import asyncio
import json
import re
import logging
from datetime import datetime
from typing import List, Dict, Optional

from tqdm import tqdm
from crawl4ai import AsyncWebCrawler
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class YouTubeScraper:
    # CSS extraction schema: one record per rendered video card on the channel page.
    YOUTUBE_SCHEMA = {
        "name": "YouTubeVideoData",
        "baseSelector": "ytd-rich-item-renderer",
        "fields": [
            {"name": "title", "selector": "#video-title", "type": "text"},
            {"name": "url", "selector": "#video-title", "type": "link"},
            {"name": "views", "selector": "#metadata-line span:first-child", "type": "text"},
            {"name": "upload_date", "selector": "#metadata-line span:last-child", "type": "text"},
            {"name": "transcript", "selector": "#segments-container", "type": "text"}
        ]
    }

    def __init__(self):
        # Browser options are passed through to crawl4ai; the exact keyword names
        # (headless, stealth, timeout, ...) depend on the installed crawl4ai version.
        self.crawler = AsyncWebCrawler(
            headless=True,
            browser="chromium",
            stealth=True,
            timeout=60
        )

    async def scrape_channel(self, url: str, start_date: str, end_date: str,
                             max_videos: int = 10) -> List[Dict]:
        """Scrape and process YouTube channel content."""
        try:
            logger.info(f"Scraping channel: {url}")
            # Note: recent crawl4ai releases expect the crawler to be used as an
            # async context manager (or started explicitly) before calling arun().
            result = await self.crawler.arun(
                url=url,
                extraction_strategy=JsonCssExtractionStrategy(self.YOUTUBE_SCHEMA),
                wait_for="css:#video-title"  # wait until video cards are rendered
            )
            # JsonCssExtractionStrategy returns its records as a JSON string
            # in result.extracted_content.
            raw_data = json.loads(result.extracted_content or "[]")
            return self._process_results(raw_data, start_date, end_date, max_videos)
        except Exception as e:
            logger.error(f"Failed to scrape {url}: {e}")
            return []

    def _process_results(self, raw_data: List[Dict], start_date: str,
                         end_date: str, max_videos: int) -> List[Dict]:
        """Filter scraped records by date range and normalize their fields."""
        processed = []
        date_format = "%b %d, %Y"
        start = datetime.strptime(start_date, "%Y-%m-%d")
        end = datetime.strptime(end_date, "%Y-%m-%d")

        for item in raw_data[:max_videos]:
            try:
                if not item.get("url"):
                    continue
                # YouTube often renders relative dates ("3 days ago"); those records
                # fail to parse here and are skipped by the except clause below.
                upload_date = datetime.strptime(item["upload_date"], date_format)
                if not (start <= upload_date <= end):
                    continue
                processed.append({
                    "id": self._extract_video_id(item["url"]),
                    "title": item.get("title", "Untitled"),
                    "url": f"https://youtube.com{item['url']}",
                    "views": self._parse_views(item.get("views", "0")),
                    "upload_date": upload_date.strftime("%Y-%m-%d"),
                    "transcript": self._process_transcript(item.get("transcript", ""))
                })
            except Exception as e:
                logger.warning(f"Skipping invalid video data: {e}")
        return processed

    @staticmethod
    def _parse_views(views_str: str) -> int:
        """Convert a view-count string such as '1.2M views' to an integer."""
        if not views_str:
            return 0
        token = views_str.split()[0].upper().replace(",", "")
        multipliers = {"K": 1_000, "M": 1_000_000, "B": 1_000_000_000}
        if token and token[-1] in multipliers:
            return int(float(token[:-1]) * multipliers[token[-1]])
        digits = re.sub(r"[^\d]", "", token)
        return int(digits) if digits else 0

    @staticmethod
    def _process_transcript(raw: str) -> List[Dict]:
        """Structure raw transcript text into segments with placeholder timings."""
        # Timestamps are fixed 5-second windows per line, not real transcript timings.
        return [{
            "start": i * 5,
            "end": (i + 1) * 5,
            "text": line.strip()
        } for i, line in enumerate(raw.split("\n") if raw else [])]

    @staticmethod
    def _extract_video_id(url: str) -> Optional[str]:
        """Extract the 11-character YouTube video ID from a watch URL."""
        match = re.search(r"v=([a-zA-Z0-9_-]{11})", url)
        return match.group(1) if match else None


async def scrape_multiple_channels(urls: List[str], start_date: str,
                                   end_date: str, num_videos: int = 10) -> List[List[Dict]]:
    """Scrape multiple YouTube channels with progress tracking."""
    scraper = YouTubeScraper()
    tasks = [scraper.scrape_channel(url, start_date, end_date, num_videos) for url in urls]
    results = []
    with tqdm(total=len(tasks), desc="Processing Channels") as pbar:
        for future in asyncio.as_completed(tasks):
            results.append(await future)
            pbar.update(1)
    return results
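

# Example usage: a minimal sketch of driving scrape_multiple_channels from the
# command line. The channel URL and date range below are placeholders, not
# values from the original module.
if __name__ == "__main__":
    channels = [
        "https://www.youtube.com/@examplechannel/videos",  # hypothetical channel URL
    ]
    all_results = asyncio.run(
        scrape_multiple_channels(
            channels,
            start_date="2024-01-01",
            end_date="2024-12-31",
            num_videos=5
        )
    )
    for channel_videos in all_results:
        for video in channel_videos:
            print(video["upload_date"], video["title"], video["url"])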