#!/usr/bin/env python3
"""
YouTube Tools for GAIA Agent
Provides functionality to extract information from YouTube videos
"""

import os
import re
import logging
from typing import Dict, Any, Optional, List
from urllib.parse import urlparse, parse_qs

try:
    import requests
except ImportError:
    # Only the YouTube Data API backend needs requests; keep the module
    # importable so the yt-dlp and transcript backends remain usable.
    requests = None

logger = logging.getLogger(__name__)


class YouTubeTools:
    """Tools for working with YouTube videos.

    Metadata is read from the YouTube Data API (when ``YOUTUBE_API_KEY`` is
    set) with yt-dlp as a fallback; transcripts come from
    youtube-transcript-api. Both third-party backends are optional and are
    probed in ``__init__``. Public methods report failures as
    ``{"error": ...}`` dictionaries instead of raising.
    """

    # Seconds to wait for the YouTube Data API before giving up.
    REQUEST_TIMEOUT = 10

    # Maximum number of characters kept in previews and LLM-facing text.
    PREVIEW_CHARS = 200
    DESCRIPTION_CHARS = 500
    TRANSCRIPT_CHARS = 1000

    _VIDEO_ID = r'([a-zA-Z0-9_-]{11})'
    _VIDEO_ID_PATTERNS = (
        # Direct forms: watch?v=ID, youtu.be/ID, /embed/ID, /shorts/ID, ...
        re.compile(
            r'(?:youtube\.com/watch\?v=|youtu\.be/'
            r'|youtube(?:-nocookie)?\.com/(?:embed|shorts|live|v)/)' + _VIDEO_ID
        ),
        # watch?...&v=ID: the separator is required so that parameters merely
        # ending in "v" (e.g. "rv=") are not mistaken for the video ID.
        re.compile(r'youtube\.com/watch\?(?:.*?[&;])?v=' + _VIDEO_ID),
    )

    def __init__(self):
        """Initialize YouTube tools and detect optional dependencies."""
        self.youtube_api_key = os.getenv('YOUTUBE_API_KEY')
        if not self.youtube_api_key:
            logger.warning("YOUTUBE_API_KEY not found. YouTube functionality will be limited.")

        # Try to import optional dependencies
        try:
            import yt_dlp
            self.yt_dlp = yt_dlp
            self.has_yt_dlp = True
            logger.info("yt-dlp available for YouTube processing")
        except ImportError:
            self.yt_dlp = None
            self.has_yt_dlp = False
            logger.warning("yt-dlp not available. Install with: pip install yt-dlp")

        try:
            from youtube_transcript_api import YouTubeTranscriptApi
            self.transcript_api = YouTubeTranscriptApi
            self.has_transcript_api = True
            logger.info("youtube-transcript-api available for transcript extraction")
        except ImportError:
            self.transcript_api = None
            self.has_transcript_api = False
            logger.warning(
                "youtube-transcript-api not available. "
                "Install with: pip install youtube-transcript-api"
            )

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _to_text(value: Any) -> str:
        """Return ``str(value)``, mapping ``None`` to an empty string."""
        return '' if value is None else str(value)

    @staticmethod
    def _truncate(text: str, limit: int) -> str:
        """Cut *text* to *limit* characters, adding '...' only if it was cut."""
        return text[:limit] + '...' if len(text) > limit else text

    def _redact(self, message: str) -> str:
        """Remove the API key from *message* so it never reaches the logs."""
        key = getattr(self, 'youtube_api_key', None)
        return message.replace(key, '***') if key else message

    @staticmethod
    def _normalize_entries(raw: Any) -> List[Dict[str, Any]]:
        """Convert a fetched transcript to a list of plain dict entries.

        Objects exposing ``to_raw_data()`` are converted through it; anything
        else is assumed to already be an iterable of dicts.
        """
        if hasattr(raw, 'to_raw_data'):
            raw = raw.to_raw_data()
        return list(raw)

    def _fetch_transcript(self, video_id: str, lang: str) -> Any:
        """Fetch the transcript of *video_id* in *lang* via whichever API exists."""
        api = self.transcript_api
        if hasattr(api, 'get_transcript'):
            return api.get_transcript(video_id, languages=[lang])
        # No static get_transcript on the class: use the instance-based API.
        return api().fetch(video_id, languages=[lang])

    def _list_transcripts(self, video_id: str) -> Any:
        """Return the transcript listing of *video_id* via whichever API exists."""
        api = self.transcript_api
        if hasattr(api, 'list_transcripts'):
            return api.list_transcripts(video_id)
        return api().list(video_id)

    def _build_transcript_result(self, raw_entries: Any, language: str) -> Dict[str, Any]:
        """Assemble the transcript result dictionary from fetched entries."""
        entries = self._normalize_entries(raw_entries)
        text = ' '.join(entry['text'] for entry in entries)
        return {
            'transcript': text,
            'language': language,
            'entries': entries,
            'word_count': len(text.split()),
            'source': 'youtube_transcript_api',
        }

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def extract_video_id(self, url: str) -> Optional[str]:
        """Extract the 11-character video ID from a YouTube URL.

        Args:
            url: A watch, youtu.be, embed, shorts, live or /v/ URL.

        Returns:
            The video ID, or ``None`` if *url* is empty, not a string, or
            matches none of the known URL shapes.
        """
        if not url or not isinstance(url, str):
            return None
        for pattern in self._VIDEO_ID_PATTERNS:
            match = pattern.search(url)
            if match:
                return match.group(1)
        return None

    def get_video_metadata(self, video_url: str) -> Dict[str, Any]:
        """Get video metadata using the YouTube API, falling back to yt-dlp.

        Args:
            video_url: URL of the video.

        Returns:
            A metadata dictionary (see ``_get_metadata_via_api``), or
            ``{"error": ...}`` when the URL is invalid or every available
            backend failed.
        """
        video_id = self.extract_video_id(video_url)
        if not video_id:
            return {"error": "Invalid YouTube URL"}

        # Try YouTube API first. Any failure is logged and falls through to
        # the next backend, hence the deliberately broad except clauses.
        if self.youtube_api_key:
            try:
                return self._get_metadata_via_api(video_id)
            except Exception as e:
                # The exception text may contain the request URL, which
                # carries the API key as a query parameter.
                logger.error("YouTube API failed: %s", self._redact(str(e)))

        # Fallback to yt-dlp
        if self.has_yt_dlp:
            try:
                return self._get_metadata_via_ytdlp(video_url)
            except Exception as e:
                logger.error("yt-dlp failed: %s", e)

        return {"error": "Could not extract video metadata"}

    def _get_metadata_via_api(self, video_id: str) -> Dict[str, Any]:
        """Get metadata using the YouTube Data API.

        Args:
            video_id: The video ID to look up.

        Returns:
            The metadata dictionary, or ``{"error": "Video not found"}`` when
            the response contains no items.

        Raises:
            RuntimeError: If the ``requests`` package is not installed.
            Exception: Whatever ``requests`` raises for transport errors,
                timeouts or non-2xx responses.
        """
        if requests is None:
            raise RuntimeError("requests is not installed; YouTube Data API unavailable")

        url = "https://www.googleapis.com/youtube/v3/videos"
        params = {
            'id': video_id,
            'key': self.youtube_api_key,
            'part': 'snippet,statistics,contentDetails',
        }

        # Without a timeout a stalled connection would block the caller forever.
        response = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        data = response.json()

        if not data.get('items'):
            return {"error": "Video not found"}

        item = data['items'][0]
        snippet = item.get('snippet', {})
        statistics = item.get('statistics', {})
        content_details = item.get('contentDetails', {})

        return {
            'title': snippet.get('title', ''),
            'description': snippet.get('description', ''),
            'channel_title': snippet.get('channelTitle', ''),
            'published_at': snippet.get('publishedAt', ''),
            'duration': content_details.get('duration', ''),
            'view_count': statistics.get('viewCount', ''),
            'like_count': statistics.get('likeCount', ''),
            'comment_count': statistics.get('commentCount', ''),
            'tags': snippet.get('tags', []),
            'category_id': snippet.get('categoryId', ''),
            'language': snippet.get('defaultLanguage', ''),
            'source': 'youtube_api',
        }

    def _get_metadata_via_ytdlp(self, video_url: str) -> Dict[str, Any]:
        """Get metadata using yt-dlp (no download).

        Missing or ``None`` fields become empty strings (or an empty list for
        tags) so the result has the same keys as the API-based one.

        Args:
            video_url: URL of the video.

        Returns:
            The metadata dictionary, or ``{"error": ...}`` if yt-dlp returned
            no information.
        """
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
        }

        with self.yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(video_url, download=False)

        if not info:
            return {"error": "Could not extract video metadata"}

        to_text = self._to_text
        return {
            'title': info.get('title') or '',
            'description': info.get('description') or '',
            'channel_title': info.get('uploader') or '',
            'published_at': info.get('upload_date') or '',
            'duration': to_text(info.get('duration')),
            'view_count': to_text(info.get('view_count')),
            'like_count': to_text(info.get('like_count')),
            'comment_count': to_text(info.get('comment_count')),
            'tags': info.get('tags') or [],
            'category_id': '',
            'language': info.get('language') or '',
            'source': 'yt_dlp',
        }

    def get_video_transcript(self, video_url: str,
                             languages: Optional[List[str]] = None) -> Dict[str, Any]:
        """Get video transcript/captions.

        Each language is tried in order; if none yields a transcript, the
        auto-generated English transcript is tried as a last resort.

        Args:
            video_url: URL of the video.
            languages: Language codes in order of preference. A single string
                is treated as a one-element list. Defaults to
                ``['en', 'ru', 'auto']``.

        Returns:
            A dictionary with ``transcript``, ``language``, ``entries``,
            ``word_count`` and ``source``, or ``{"error": ...}``.
        """
        if not self.has_transcript_api:
            return {"error": "youtube-transcript-api not available"}

        video_id = self.extract_video_id(video_url)
        if not video_id:
            return {"error": "Invalid YouTube URL"}

        if languages is None:
            languages = ['en', 'ru', 'auto']
        elif isinstance(languages, str):
            # Iterating a bare string would try one "language" per character.
            languages = [languages]

        try:
            # Try to get transcript in preferred languages
            for lang in languages:
                try:
                    return self._build_transcript_result(
                        self._fetch_transcript(video_id, lang), lang
                    )
                except Exception as e:
                    logger.debug("Failed to get transcript in %s: %s", lang, e)

            # If no specific language worked, try auto-generated
            try:
                transcript_list = self._list_transcripts(video_id)
                generated = transcript_list.find_generated_transcript(['en'])
                return self._build_transcript_result(generated.fetch(), 'auto-generated')
            except Exception as e:
                logger.error("Failed to get auto-generated transcript: %s", e)
                return {"error": "No transcript available"}

        except Exception as e:
            logger.error("Transcript extraction failed: %s", e)
            return {"error": f"Transcript extraction failed: {str(e)}"}

    def analyze_video(self, video_url: str) -> Dict[str, Any]:
        """Run metadata and transcript extraction and summarize the outcome.

        Args:
            video_url: URL of the video.

        Returns:
            A dictionary with ``url``, ``video_id``, ``metadata``,
            ``transcript`` and an ``analysis`` summary of what was obtained.
        """
        logger.info("Analyzing YouTube video: %s", video_url)

        metadata = self.get_video_metadata(video_url)
        transcript = self.get_video_transcript(video_url)

        # Basic analysis
        analysis = {}

        if 'error' not in metadata:
            analysis['has_metadata'] = True
            analysis['title'] = metadata.get('title', '')
            analysis['duration'] = metadata.get('duration', '')
            analysis['view_count'] = metadata.get('view_count', '')
            analysis['channel'] = metadata.get('channel_title', '')
        else:
            analysis['has_metadata'] = False
            analysis['metadata_error'] = metadata.get('error', '')

        if 'error' not in transcript:
            analysis['has_transcript'] = True
            analysis['transcript_language'] = transcript.get('language', '')
            analysis['word_count'] = transcript.get('word_count', 0)
            # Ellipsis only when the preview is actually shorter than the text.
            analysis['transcript_preview'] = self._truncate(
                transcript.get('transcript') or '', self.PREVIEW_CHARS
            )
        else:
            analysis['has_transcript'] = False
            analysis['transcript_error'] = transcript.get('error', '')

        logger.info(
            "Video analysis complete. Metadata: %s, Transcript: %s",
            analysis.get('has_metadata'), analysis.get('has_transcript'),
        )

        return {
            'url': video_url,
            'video_id': self.extract_video_id(video_url),
            'metadata': metadata,
            'transcript': transcript,
            'analysis': analysis,
        }

    def format_video_info_for_llm(self, video_analysis: Dict[str, Any]) -> str:
        """Format video information for LLM consumption.

        Args:
            video_analysis: A result dictionary shaped like the output of
                ``analyze_video``.

        Returns:
            A newline-separated text block. The description is limited to 500
            characters, the tags to 10 items and the transcript to 1000
            characters.
        """
        info_parts = []

        # Basic info ("or" also covers an explicit None video_id)
        video_id = video_analysis.get('video_id') or 'unknown'
        url = video_analysis.get('url', '')
        info_parts.append(f"YouTube Video ID: {video_id}")
        info_parts.append(f"URL: {url}")

        # Metadata
        metadata = video_analysis.get('metadata') or {}
        if 'error' not in metadata:
            info_parts.append(f"Title: {metadata.get('title', 'N/A')}")
            info_parts.append(f"Channel: {metadata.get('channel_title', 'N/A')}")
            info_parts.append(f"Duration: {metadata.get('duration', 'N/A')}")
            info_parts.append(f"Views: {metadata.get('view_count', 'N/A')}")
            info_parts.append(f"Published: {metadata.get('published_at', 'N/A')}")

            description = metadata.get('description')
            if description:
                desc = self._truncate(description, self.DESCRIPTION_CHARS)
                info_parts.append(f"Description: {desc}")

            tags = metadata.get('tags')
            if tags:
                info_parts.append(f"Tags: {', '.join(str(tag) for tag in tags[:10])}")
        else:
            info_parts.append(f"Metadata Error: {metadata.get('error', 'Unknown error')}")

        # Transcript
        transcript = video_analysis.get('transcript') or {}
        if 'error' not in transcript:
            info_parts.append(f"Transcript Language: {transcript.get('language', 'N/A')}")
            info_parts.append(f"Transcript Word Count: {transcript.get('word_count', 0)}")

            transcript_text = transcript.get('transcript')
            if transcript_text:
                # Include first part of transcript
                transcript_text = self._truncate(transcript_text, self.TRANSCRIPT_CHARS)
                info_parts.append(f"Transcript: {transcript_text}")
        else:
            info_parts.append(f"Transcript Error: {transcript.get('error', 'Unknown error')}")

        return '\n'.join(info_parts)

    def search_in_transcript(self, video_analysis: Dict[str, Any], query: str) -> Dict[str, Any]:
        """Search for specific content in a video transcript.

        The search is a case-insensitive substring match done per transcript
        entry, so a phrase spanning two entries is reported through
        ``full_transcript_contains`` but produces no entry in ``matches``.

        Args:
            video_analysis: A result dictionary shaped like the output of
                ``analyze_video``.
            query: Text to look for.

        Returns:
            A dictionary with ``query``, ``found``, ``match_count``,
            ``matches`` (at most 10) and ``full_transcript_contains``, or
            ``{"error": ...}`` when there is no usable transcript.
        """
        transcript = video_analysis.get('transcript') or {}

        if 'error' in transcript:
            return {"error": "No transcript available"}

        transcript_text = transcript.get('transcript', '')
        entries = transcript.get('entries') or []

        if not transcript_text:
            return {"error": "Empty transcript"}

        # Simple text search
        query_lower = query.lower()
        in_full_text = query_lower in transcript_text.lower()

        matches = []
        if in_full_text:
            # Find specific entries that contain the query
            matches = [
                {
                    'text': entry.get('text', ''),
                    'start': entry.get('start', 0),
                    'duration': entry.get('duration', 0),
                }
                for entry in entries
                if query_lower in (entry.get('text') or '').lower()
            ]

        return {
            'query': query,
            'found': len(matches) > 0,
            'match_count': len(matches),
            'matches': matches[:10],  # Limit to first 10 matches
            'full_transcript_contains': in_full_text,
        }