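"""Scrape video metadata from YouTube channel pages with crawl4ai.

Pulls title, URL, view count, and upload date for each video tile via CSS
selectors, filters by upload date range, and returns normalized records.
The selectors track YouTube's DOM and may break when the markup changes.
"""
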
import asyncio
import json
import logging
import re
from datetime import datetime
from typing import Dict, List, Optional

from tqdm import tqdm

from crawl4ai import AsyncWebCrawler
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class YouTubeScraper:
    """Scrape video metadata from a YouTube channel's video listing."""

    # Extraction schema for JsonCssExtractionStrategy: one record per
    # rendered video tile (ytd-rich-item-renderer).
    YOUTUBE_SCHEMA = {
        "name": "YouTubeVideoData",
        "baseSelector": "ytd-rich-item-renderer",
        "fields": [
            {"name": "title", "selector": "#video-title", "type": "text"},
            {"name": "url", "selector": "#video-title", "type": "link"},
            {"name": "views", "selector": "#metadata-line span:first-child", "type": "text"},
            {"name": "upload_date", "selector": "#metadata-line span:last-child", "type": "text"},
            {"name": "transcript", "selector": "#segments-container", "type": "text"},
        ],
    }

    def __init__(self):
        # Browser options as keyword arguments; note that newer crawl4ai
        # releases move these into a BrowserConfig object and expect the
        # crawler to be used as an async context manager.
        self.crawler = AsyncWebCrawler(
            headless=True,
            browser="chromium",
            stealth=True,
            timeout=60,
        )

    async def scrape_channel(self, url: str, start_date: str, end_date: str, max_videos: int = 10):
        """Scrape and process one YouTube channel's video listing."""
        try:
            logger.info(f"Scraping channel: {url}")
            result = await self.crawler.arun(
                url=url,
                extraction_strategy=JsonCssExtractionStrategy(self.YOUTUBE_SCHEMA),
                # crawl4ai's wait condition takes a "css:" or "js:" prefix.
                wait_for="css:#video-title",
            )
            # crawl4ai returns the extracted records as a JSON string in
            # `extracted_content`; it is None when nothing was extracted.
            raw_data = json.loads(result.extracted_content) if result.extracted_content else []
            return self._process_results(raw_data, start_date, end_date, max_videos)
        except Exception as e:
            logger.error(f"Failed to scrape {url}: {e}")
            return []

    def _process_results(self, raw_data: List[Dict], start_date: str, end_date: str, max_videos: int):
        """Filter scraped records by date range and normalize their fields."""
        processed = []
        # YouTube's metadata line may show relative dates ("3 days ago")
        # rather than absolute ones ("Jan 05, 2024"); relative dates fail
        # to parse here and the row is skipped with a warning.
        date_format = "%b %d, %Y"
        start = datetime.strptime(start_date, "%Y-%m-%d")
        end = datetime.strptime(end_date, "%Y-%m-%d")
        for item in raw_data:
            # Stop once enough videos pass the date filter (filter first,
            # then count, rather than truncating the raw list up front).
            if len(processed) >= max_videos:
                break
            try:
                if not item.get("url"):
                    continue
                upload_date = datetime.strptime(item["upload_date"], date_format)
                if not (start <= upload_date <= end):
                    continue
                # Extracted hrefs are usually relative ("/watch?v=...").
                video_url = item["url"] if item["url"].startswith("http") else f"https://youtube.com{item['url']}"
                processed.append({
                    "id": self._extract_video_id(item["url"]),
                    "title": item.get("title", "Untitled"),
                    "url": video_url,
                    "views": self._parse_views(item.get("views", "0")),
                    "upload_date": upload_date.strftime("%Y-%m-%d"),
                    "transcript": self._process_transcript(item.get("transcript", "")),
                })
            except Exception as e:
                logger.warning(f"Skipping invalid video data: {e}")
        return processed

    @staticmethod
    def _parse_views(views_str: str) -> int:
        """Convert a view-count string such as '1.2M views' to an integer."""
        if not views_str or not views_str.strip():
            return 0
        token = views_str.split()[0].upper()
        # Scale abbreviated counts (K/M/B); plain numbers fall through at 1x.
        mult = {"K": 1_000, "M": 1_000_000, "B": 1_000_000_000}.get(token[-1], 1)
        return int(float(re.sub(r"[^\d.]", "", token) or 0) * mult)

    @staticmethod
    def _process_transcript(raw: str) -> List[Dict]:
        """Structure raw transcript text into fixed five-second segments.

        The scraped text carries no timing data, so timestamps are
        synthetic: line i is assigned the window [i*5, (i+1)*5) seconds.
        """
        return [
            {"start": i * 5, "end": (i + 1) * 5, "text": line.strip()}
            for i, line in enumerate(raw.split("\n") if raw else [])
        ]

    @staticmethod
    def _extract_video_id(url: str) -> Optional[str]:
        """Extract the 11-character YouTube video ID from a watch URL."""
        match = re.search(r"v=([a-zA-Z0-9_-]{11})", url)
        return match.group(1) if match else None


async def scrape_multiple_channels(urls: List[str], start_date: str, end_date: str, num_videos: int = 10):
    """Scrape multiple YouTube channels concurrently with progress tracking.

    Note: results are collected in completion order, which may differ
    from the order of `urls`.
    """
    scraper = YouTubeScraper()
    tasks = [scraper.scrape_channel(url, start_date, end_date, num_videos) for url in urls]
    results = []
    with tqdm(total=len(tasks), desc="Processing Channels") as pbar:
        for future in asyncio.as_completed(tasks):
            results.append(await future)
            pbar.update(1)
    return results
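

# Minimal usage sketch (not part of the original script): the channel URL
# and date range below are hypothetical placeholders.
if __name__ == "__main__":
    async def _demo():
        channels = ["https://www.youtube.com/@example_channel/videos"]
        results = await scrape_multiple_channels(
            channels, start_date="2024-01-01", end_date="2024-12-31", num_videos=5
        )
        for channel_videos in results:
            for video in channel_videos:
                print(video["upload_date"], video["views"], video["title"])

    asyncio.run(_demo())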