@woai
Add HybridGAIAAgent and clean up project structure
04ffb15
#!/usr/bin/env python3
"""
YouTube Tools for GAIA Agent
Provides functionality to extract information from YouTube videos
"""
import os
import re
import logging
from typing import Dict, Any, Optional, List
import requests
from urllib.parse import urlparse, parse_qs
# Module-level logger, named after this module via __name__; used by YouTubeTools below.
logger = logging.getLogger(__name__)
class YouTubeTools:
    """Tools for working with YouTube videos.

    Metadata comes from the YouTube Data API (when ``YOUTUBE_API_KEY`` is set)
    with ``yt-dlp`` as a fallback; transcripts come from
    ``youtube-transcript-api``.  Both third-party packages are optional: when
    one is missing the corresponding methods return ``{"error": ...}`` dicts
    instead of raising.
    """

    # Seconds to wait on the YouTube Data API before giving up.  Without an
    # explicit timeout ``requests`` can block indefinitely on a stalled socket.
    REQUEST_TIMEOUT = 10

    # URL shapes from which the 11-character video ID can be pulled.  The
    # second pattern covers watch URLs where ``v=`` is not the first parameter.
    _VIDEO_ID_PATTERNS = (
        re.compile(
            r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/'
            r'|youtube-nocookie\.com/embed/|youtube\.com/shorts/'
            r'|youtube\.com/live/|youtube\.com/v/)([a-zA-Z0-9_-]{11})'
        ),
        re.compile(r'youtube\.com/watch\?.*v=([a-zA-Z0-9_-]{11})'),
    )

    def __init__(self):
        """Read the API key from the environment and probe optional dependencies."""
        self.youtube_api_key = os.getenv('YOUTUBE_API_KEY')
        if not self.youtube_api_key:
            logger.warning("YOUTUBE_API_KEY not found. YouTube functionality will be limited.")

        # Optional dependencies are imported lazily so the class stays usable
        # (in degraded form) when they are not installed.
        try:
            import yt_dlp
            self.yt_dlp = yt_dlp
            self.has_yt_dlp = True
            logger.info("yt-dlp available for YouTube processing")
        except ImportError:
            self.yt_dlp = None
            self.has_yt_dlp = False
            logger.warning("yt-dlp not available. Install with: pip install yt-dlp")

        try:
            from youtube_transcript_api import YouTubeTranscriptApi
            self.transcript_api = YouTubeTranscriptApi
            self.has_transcript_api = True
            logger.info("youtube-transcript-api available for transcript extraction")
        except ImportError:
            self.transcript_api = None
            self.has_transcript_api = False
            logger.warning("youtube-transcript-api not available. Install with: pip install youtube-transcript-api")

    def extract_video_id(self, url: str) -> Optional[str]:
        """Extract the 11-character video ID from a YouTube URL.

        Recognises watch, youtu.be, embed, shorts, live and /v/ URLs.

        Returns:
            The video ID, or ``None`` when ``url`` is empty, not a string, or
            does not match any known YouTube URL shape.
        """
        if not url or not isinstance(url, str):
            return None
        for pattern in self._VIDEO_ID_PATTERNS:
            match = pattern.search(url)
            if match:
                return match.group(1)
        return None

    def get_video_metadata(self, video_url: str) -> Dict[str, Any]:
        """Get video metadata using the YouTube API, falling back to yt-dlp.

        Returns:
            A metadata dict (see the ``_get_metadata_via_*`` helpers), or
            ``{"error": ...}`` when the URL is invalid or every backend fails.
        """
        video_id = self.extract_video_id(video_url)
        if not video_id:
            return {"error": "Invalid YouTube URL"}

        # Try YouTube API first
        if self.youtube_api_key:
            try:
                return self._get_metadata_via_api(video_id)
            except Exception as e:
                logger.error("YouTube API failed: %s", e)

        # Fallback to yt-dlp
        if self.has_yt_dlp:
            try:
                return self._get_metadata_via_ytdlp(video_url)
            except Exception as e:
                logger.error("yt-dlp failed: %s", e)

        return {"error": "Could not extract video metadata"}

    def _get_metadata_via_api(self, video_id: str) -> Dict[str, Any]:
        """Get metadata using the YouTube Data API ``videos`` endpoint.

        Raises:
            requests.RequestException: on network failure, timeout or a
                non-2xx response (via ``raise_for_status``).
        """
        url = "https://www.googleapis.com/youtube/v3/videos"
        params = {
            'id': video_id,
            'key': self.youtube_api_key,
            'part': 'snippet,statistics,contentDetails'
        }
        # A timeout keeps a stalled connection from hanging the caller forever.
        response = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        data = response.json()

        if not data.get('items'):
            return {"error": "Video not found"}

        item = data['items'][0]
        snippet = item.get('snippet', {})
        statistics = item.get('statistics', {})
        content_details = item.get('contentDetails', {})
        return {
            'title': snippet.get('title', ''),
            'description': snippet.get('description', ''),
            'channel_title': snippet.get('channelTitle', ''),
            'published_at': snippet.get('publishedAt', ''),
            'duration': content_details.get('duration', ''),
            'view_count': statistics.get('viewCount', ''),
            'like_count': statistics.get('likeCount', ''),
            'comment_count': statistics.get('commentCount', ''),
            'tags': snippet.get('tags', []),
            'category_id': snippet.get('categoryId', ''),
            'language': snippet.get('defaultLanguage', ''),
            'source': 'youtube_api'
        }

    def _get_metadata_via_ytdlp(self, video_url: str) -> Dict[str, Any]:
        """Get metadata using yt-dlp without downloading the video.

        Fields that yt-dlp reports as ``None`` are returned as empty
        strings / an empty list rather than the literal text ``'None'``.

        Raises:
            ValueError: if yt-dlp returns no info dict for the URL.
        """
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
        }
        with self.yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(video_url, download=False)
        if not info:
            raise ValueError("yt-dlp returned no video information")

        def as_text(key: str) -> str:
            # str(None) would leak the text 'None' into the metadata.
            value = info.get(key)
            return '' if value is None else str(value)

        return {
            'title': info.get('title') or '',
            'description': info.get('description') or '',
            'channel_title': info.get('uploader') or '',
            'published_at': info.get('upload_date') or '',
            'duration': as_text('duration'),
            'view_count': as_text('view_count'),
            'like_count': as_text('like_count'),
            'tags': info.get('tags') or [],
            'source': 'yt_dlp'
        }

    @staticmethod
    def _normalize_entries(raw: Any) -> List[Dict[str, Any]]:
        """Convert fetched transcript data into a list of plain dicts.

        Accepts a list of dicts, an object offering ``to_raw_data()``, or an
        iterable of objects with ``text`` / ``start`` / ``duration`` attributes.
        """
        if hasattr(raw, 'to_raw_data'):
            raw = raw.to_raw_data()
        entries = []
        for entry in raw:
            if isinstance(entry, dict):
                entries.append(entry)
            else:
                entries.append({
                    'text': entry.text,
                    'start': getattr(entry, 'start', 0),
                    'duration': getattr(entry, 'duration', 0),
                })
        return entries

    @staticmethod
    def _build_transcript_result(entries: List[Dict[str, Any]], language: str) -> Dict[str, Any]:
        """Assemble the transcript result dict from normalized entries."""
        text = ' '.join(entry['text'] for entry in entries)
        return {
            'transcript': text,
            'language': language,
            'entries': entries,
            'word_count': len(text.split()),
            'source': 'youtube_transcript_api'
        }

    def _fetch_transcript_entries(self, video_id: str, lang: str) -> List[Dict[str, Any]]:
        """Fetch a transcript in one language and normalize its entries.

        Uses the static ``get_transcript`` when the installed
        youtube-transcript-api offers it, otherwise the instance-based
        ``fetch`` (the API shape is probed with ``hasattr``).
        """
        api = self.transcript_api
        if hasattr(api, 'get_transcript'):
            raw = api.get_transcript(video_id, languages=[lang])
        else:
            raw = api().fetch(video_id, languages=[lang])
        return self._normalize_entries(raw)

    def _fetch_generated_entries(self, video_id: str, language_codes: List[str]) -> List[Dict[str, Any]]:
        """Fetch an auto-generated transcript and normalize its entries."""
        api = self.transcript_api
        if hasattr(api, 'list_transcripts'):
            transcript_list = api.list_transcripts(video_id)
        else:
            transcript_list = api().list(video_id)
        generated = transcript_list.find_generated_transcript(language_codes)
        return self._normalize_entries(generated.fetch())

    def get_video_transcript(self, video_url: str, languages: Optional[List[str]] = None) -> Dict[str, Any]:
        """Get video transcript/captions.

        Args:
            video_url: Any YouTube URL accepted by ``extract_video_id``.
            languages: Language codes to try in order; defaults to
                ``['en', 'ru', 'auto']``.  A single string is treated as a
                one-element list.

        Returns:
            A dict with ``transcript``, ``language``, ``entries`` (list of
            dicts), ``word_count`` and ``source``; or ``{"error": ...}``.
            This method does not raise.
        """
        if not self.has_transcript_api:
            return {"error": "youtube-transcript-api not available"}

        video_id = self.extract_video_id(video_url)
        if not video_id:
            return {"error": "Invalid YouTube URL"}

        if languages is None:
            languages = ['en', 'ru', 'auto']
        elif isinstance(languages, str):
            # Iterating a bare string would try each character as a language.
            languages = [languages]

        try:
            # Try to get transcript in preferred languages
            for lang in languages:
                try:
                    entries = self._fetch_transcript_entries(video_id, lang)
                    return self._build_transcript_result(entries, lang)
                except Exception as e:
                    logger.debug("Failed to get transcript in %s: %s", lang, e)

            # If no specific language worked, try auto-generated
            try:
                entries = self._fetch_generated_entries(video_id, ['en'])
                return self._build_transcript_result(entries, 'auto-generated')
            except Exception as e:
                logger.error("Failed to get auto-generated transcript: %s", e)
                return {"error": "No transcript available"}
        except Exception as e:
            # Reached e.g. when ``languages`` is not iterable.
            logger.error("Transcript extraction failed: %s", e)
            return {"error": f"Transcript extraction failed: {str(e)}"}

    def analyze_video(self, video_url: str) -> Dict[str, Any]:
        """Comprehensive video analysis.

        Returns:
            A dict with ``url``, ``video_id``, the raw ``metadata`` and
            ``transcript`` results, and an ``analysis`` summary of both.
        """
        logger.info("Analyzing YouTube video: %s", video_url)
        result = {
            'url': video_url,
            'video_id': self.extract_video_id(video_url),
            'metadata': {},
            'transcript': {},
            'analysis': {}
        }

        # Get metadata
        metadata = self.get_video_metadata(video_url)
        result['metadata'] = metadata

        # Get transcript
        transcript = self.get_video_transcript(video_url)
        result['transcript'] = transcript

        # Basic analysis
        analysis = {}
        if 'error' not in metadata:
            analysis['has_metadata'] = True
            analysis['title'] = metadata.get('title', '')
            analysis['duration'] = metadata.get('duration', '')
            analysis['view_count'] = metadata.get('view_count', '')
            analysis['channel'] = metadata.get('channel_title', '')
        else:
            analysis['has_metadata'] = False
            analysis['metadata_error'] = metadata.get('error', '')

        if 'error' not in transcript:
            analysis['has_transcript'] = True
            analysis['transcript_language'] = transcript.get('language', '')
            analysis['word_count'] = transcript.get('word_count', 0)
            # Only mark the preview with an ellipsis when text was actually cut.
            full_text = transcript.get('transcript') or ''
            preview = full_text[:200]
            if len(full_text) > 200:
                preview += '...'
            analysis['transcript_preview'] = preview
        else:
            analysis['has_transcript'] = False
            analysis['transcript_error'] = transcript.get('error', '')

        result['analysis'] = analysis
        logger.info(
            "Video analysis complete. Metadata: %s, Transcript: %s",
            analysis.get('has_metadata'), analysis.get('has_transcript'),
        )
        return result

    def format_video_info_for_llm(self, video_analysis: Dict[str, Any]) -> str:
        """Format video information for LLM consumption.

        Args:
            video_analysis: A dict shaped like the return of ``analyze_video``.

        Returns:
            A newline-separated summary; the description is capped at 500
            characters, tags at 10 items and the transcript at 1000 characters.
        """
        info_parts = []

        # Basic info ("or" also covers an explicit None stored for a bad URL)
        video_id = video_analysis.get('video_id') or 'unknown'
        url = video_analysis.get('url', '')
        info_parts.append(f"YouTube Video ID: {video_id}")
        info_parts.append(f"URL: {url}")

        # Metadata
        metadata = video_analysis.get('metadata', {})
        if 'error' not in metadata:
            info_parts.append(f"Title: {metadata.get('title', 'N/A')}")
            info_parts.append(f"Channel: {metadata.get('channel_title', 'N/A')}")
            info_parts.append(f"Duration: {metadata.get('duration', 'N/A')}")
            info_parts.append(f"Views: {metadata.get('view_count', 'N/A')}")
            info_parts.append(f"Published: {metadata.get('published_at', 'N/A')}")
            description = metadata.get('description')
            if description:
                if len(description) > 500:
                    description = description[:500] + '...'
                info_parts.append(f"Description: {description}")
            tags = metadata.get('tags')
            if tags:
                # str() guards the join against non-string tag values.
                info_parts.append(f"Tags: {', '.join(str(tag) for tag in tags[:10])}")
        else:
            info_parts.append(f"Metadata Error: {metadata.get('error', 'Unknown error')}")

        # Transcript
        transcript = video_analysis.get('transcript', {})
        if 'error' not in transcript:
            info_parts.append(f"Transcript Language: {transcript.get('language', 'N/A')}")
            info_parts.append(f"Transcript Word Count: {transcript.get('word_count', 0)}")
            transcript_text = transcript.get('transcript')
            if transcript_text:
                # Include first part of transcript
                if len(transcript_text) > 1000:
                    transcript_text = transcript_text[:1000] + '...'
                info_parts.append(f"Transcript: {transcript_text}")
        else:
            info_parts.append(f"Transcript Error: {transcript.get('error', 'Unknown error')}")

        return '\n'.join(info_parts)

    def search_in_transcript(self, video_analysis: Dict[str, Any], query: str) -> Dict[str, Any]:
        """Search for specific content in video transcript (case-insensitive).

        Args:
            video_analysis: A dict shaped like the return of ``analyze_video``.
            query: Substring to look for; must contain non-whitespace text.

        Returns:
            A dict with ``query``, ``found``, ``match_count``, up to ten
            ``matches`` (text/start/duration) and ``full_transcript_contains``;
            or ``{"error": ...}`` when there is no transcript or no query.
            ``full_transcript_contains`` can be true while ``found`` is false
            when the query spans two transcript entries.
        """
        transcript = video_analysis.get('transcript', {})
        if 'error' in transcript:
            return {"error": "No transcript available"}

        transcript_text = transcript.get('transcript', '')
        entries = transcript.get('entries', [])
        if not transcript_text:
            return {"error": "Empty transcript"}

        # An empty needle is "contained" in every string and would report
        # every entry as a match, so reject it up front.
        if not query or not query.strip():
            return {"error": "Empty query"}

        # Simple text search
        query_lower = query.lower()
        contains = query_lower in transcript_text.lower()

        matches = []
        if contains:
            # Find specific entries that contain the query
            matches = [
                {
                    'text': entry.get('text', ''),
                    'start': entry.get('start', 0),
                    'duration': entry.get('duration', 0)
                }
                for entry in entries
                if query_lower in entry.get('text', '').lower()
            ]

        return {
            'query': query,
            'found': len(matches) > 0,
            'match_count': len(matches),
            'matches': matches[:10],  # Limit to first 10 matches
            'full_transcript_contains': contains
        }