# --- Standard Libraries ---
import os
import re
import logging
import asyncio
import json
import html
import contextlib
import traceback
from typing import Optional, Dict, Any, Tuple

# --- Frameworks ---
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import PlainTextResponse, JSONResponse, Response
from starlette.requests import Request

# --- Telegram Bot ---
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, Bot
from telegram.ext import (
    Application,
    CommandHandler,
    MessageHandler,
    filters,
    ContextTypes,
    CallbackQueryHandler,
)
from telegram.constants import ParseMode
from telegram.error import NetworkError, RetryAfter, TimedOut, BadRequest, TelegramError
from telegram.request import HTTPXRequest, BaseRequest

# --- Other Libraries ---
import httpx
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
from bs4 import BeautifulSoup
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, before_sleep_log

try:
    import lxml
    DEFAULT_PARSER = 'lxml'
except ImportError:
    DEFAULT_PARSER = 'html.parser'

# --- Crawl4AI (New Primary Web Scraper) ---
try:
    from crawl4ai import AsyncWebCrawler
    _crawl4ai_available = True
except ImportError:
    AsyncWebCrawler = None
    _crawl4ai_available = False
    # logger defined later

# --- Google Gemini ---
try:
    import google.generativeai as genai
    # Import specific types needed; check the library for exact names if errors occur
    from google.generativeai.types import HarmCategory, HarmBlockThreshold, GenerateContentResponse
    _gemini_available = True
except ImportError:
    genai = None
    HarmCategory = None
    HarmBlockThreshold = None
    GenerateContentResponse = None  # Kept for type hinting
    _gemini_available = False
    # logger defined later

# --- Logging Setup ---
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("telegram.ext").setLevel(logging.INFO)
logging.getLogger('telegram.bot').setLevel(logging.INFO)
logging.getLogger("urllib3").setLevel(logging.INFO)
logging.getLogger('gunicorn.error').setLevel(logging.INFO)
logging.getLogger('uvicorn').setLevel(logging.INFO)
logging.getLogger('starlette').setLevel(logging.INFO)
if _gemini_available:
    logging.getLogger("google.ai.generativelanguage").setLevel(logging.WARNING)
# Suppress noisy crawl4ai/playwright logs if needed
logging.getLogger("crawl4ai").setLevel(logging.INFO)  # Adjust level as needed
logging.getLogger("playwright").setLevel(logging.WARNING)

logger = logging.getLogger(__name__)
logger.info(f"Logging configured. Using BS4 parser: {DEFAULT_PARSER}")
if not _gemini_available:
    logger.warning("google-generativeai library not found. Gemini functionality disabled.")
if not _crawl4ai_available:
    logger.warning("crawl4ai library not found. Primary web scraping disabled.")
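
# Dependency sketch derived from the imports above (package names assumed from
# their conventional PyPI distributions; pin versions for your environment):
#   pip install starlette python-telegram-bot httpx youtube-transcript-api \
#       beautifulsoup4 tenacity lxml crawl4ai google-generativeai uvicorn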

# --- Global variable for PTB app ---
ptb_app: Optional[Application] = None

# --- Environment Variable Loading & Configuration ---
logger.info("Attempting to load secrets and configuration...")

def get_secret(secret_name):
    value = os.environ.get(secret_name)
    if value:
        status = "Found"
        log_length = min(len(value), 8)
        value_start = value[:log_length]
        logger.info(f"Secret '{secret_name}': {status} (Value starts with: {value_start}...)")
    else:
        status = "Not Found"
        logger.warning(f"Secret '{secret_name}': {status}")
    return value

TELEGRAM_TOKEN = get_secret('TELEGRAM_TOKEN')
OPENROUTER_API_KEY = get_secret('OPENROUTER_API_KEY')  # Fallback Summarizer
URLTOTEXT_API_KEY = get_secret('URLTOTEXT_API_KEY')    # Fallback Web Scraper 2
SUPADATA_API_KEY = get_secret('SUPADATA_API_KEY')      # Fallback YT Transcript 1
APIFY_API_TOKEN = get_secret('APIFY_API_TOKEN')        # Fallback YT Transcript 2
WEBHOOK_SECRET = get_secret('WEBHOOK_SECRET')
GEMINI_API_KEY = get_secret('GEMINI_API_KEY')          # Primary Summarizer

# Models (configurable via env vars)
OPENROUTER_MODEL = os.environ.get("OPENROUTER_MODEL", "deepseek/deepseek-chat-v3-0324:free")  # Fallback Model
APIFY_ACTOR_ID = os.environ.get("APIFY_ACTOR_ID", "karamelo~youtube-transcripts")
GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-2.0-flash-001")  # Primary Model

# --- Configuration Checks ---
if not TELEGRAM_TOKEN:
    logger.critical("❌ FATAL: TELEGRAM_TOKEN not found.")
    raise RuntimeError("Exiting: Telegram token missing.")
if not GEMINI_API_KEY:
    logger.error("❌ ERROR: GEMINI_API_KEY not found. Primary summarization (Gemini) will fail.")
if not OPENROUTER_API_KEY:
    logger.warning("⚠️ WARNING: OPENROUTER_API_KEY not found. Fallback summarization will fail.")

_gemini_primary_enabled = _gemini_available and bool(GEMINI_API_KEY)
if not _gemini_available:
    logger.warning("⚠️ WARNING: google-generativeai library missing. Gemini disabled.")
elif not GEMINI_API_KEY:
    logger.warning("⚠️ WARNING: GEMINI_API_KEY not found or empty. Gemini disabled.")

_openrouter_fallback_enabled = bool(OPENROUTER_API_KEY)
if not _openrouter_fallback_enabled:
    logger.warning("⚠️ WARNING: OPENROUTER_API_KEY not found. Fallback disabled.")

_crawl4ai_primary_web_enabled = _crawl4ai_available
if not _crawl4ai_primary_web_enabled:
    logger.warning("⚠️ WARNING: crawl4ai library missing. Primary Web Scraper disabled.")

_urltotext_fallback_enabled = bool(URLTOTEXT_API_KEY)
if not _urltotext_fallback_enabled:
    logger.info("ℹ️ INFO: URLTOTEXT_API_KEY not found. Fallback Web Scraper 2 (API) disabled.")
else:
    logger.info("ℹ️ INFO: URLTOTEXT_API_KEY found. Fallback Web Scraper 2 (API) enabled.")

if not SUPADATA_API_KEY:
    logger.info("ℹ️ INFO: SUPADATA_API_KEY not found. Fallback YT Transcript 1 (API) disabled.")
if not APIFY_API_TOKEN:
    logger.info("ℹ️ INFO: APIFY_API_TOKEN not found. Fallback YT Transcript 2 (API) disabled.")
if not WEBHOOK_SECRET:
    logger.info("ℹ️ INFO: Optional secret 'WEBHOOK_SECRET' not found. Webhook security disabled.")
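
# Example environment for local runs (illustrative values only; keys match
# the get_secret() calls above):
#   TELEGRAM_TOKEN=123456:ABC...   (required)
#   GEMINI_API_KEY=...             (primary summarizer)
#   OPENROUTER_API_KEY=...         (fallback summarizer)
#   URLTOTEXT_API_KEY=... / SUPADATA_API_KEY=... / APIFY_API_TOKEN=...  (optional fallbacks)
#   WEBHOOK_SECRET=...             (optional webhook hardening)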

logger.info("Secret loading and configuration check finished.")
logger.info(f"Primary Web Scraper: {'Crawl4AI' if _crawl4ai_primary_web_enabled else 'DISABLED'}")
logger.info("Fallback Web Scraper 1: BeautifulSoup (Always available)")
logger.info(f"Fallback Web Scraper 2: urltotext.com API {'ENABLED' if _urltotext_fallback_enabled else 'DISABLED'}")
logger.info(f"Primary Summarizer: Gemini ({GEMINI_MODEL if _gemini_primary_enabled else 'DISABLED'})")
logger.info(f"Fallback Summarizer: OpenRouter ({OPENROUTER_MODEL if _openrouter_fallback_enabled else 'DISABLED'})")
logger.info("Primary YT Transcript: youtube-transcript-api (Always available)")
logger.info(f"Fallback YT Transcript 1: Supadata API {'ENABLED' if SUPADATA_API_KEY else 'DISABLED'}")
logger.info(f"Fallback YT Transcript 2: Apify REST API {'ENABLED' if APIFY_API_TOKEN else 'DISABLED'}")
_apify_token_exists = bool(APIFY_API_TOKEN)  # Keep this for the health check

if _gemini_primary_enabled:
    try:
        genai.configure(api_key=GEMINI_API_KEY)
        logger.info("Google GenAI client configured successfully.")
    except Exception as e:
        logger.error(f"Failed to configure Google GenAI client: {e}")
        _gemini_primary_enabled = False

# --- Constants ---
MAX_SUMMARY_CHUNK_SIZE = 4000
MAX_INPUT_TOKEN_APPROX = 500000

# --- Retry Decorator ---
@retry(
    stop=stop_after_attempt(4),
    wait=wait_exponential(multiplier=1, min=2, max=15),
    retry=retry_if_exception_type((NetworkError, RetryAfter, TimedOut, BadRequest)),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True
)
async def retry_bot_operation(func, *args, **kwargs):
    try:
        return await func(*args, **kwargs)
    except BadRequest as e:
        ignore_errors = [
            "message is not modified",
            "query is too old",
            "message to edit not found",
            "chat not found",
            "bot was blocked by the user",
        ]
        if any(err in str(e).lower() for err in ignore_errors):
            logger.warning(f"Ignoring non-critical BadRequest: {e}")
            return None
        logger.error(f"Potentially critical BadRequest: {e}")
        raise
    except TelegramError as e:
        if isinstance(e, (TimedOut, NetworkError, RetryAfter)):
            logger.warning(f"Telegram transient error (will retry): {e}")
        else:
            logger.error(f"Unhandled TelegramError: {e}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error during bot operation: {e}", exc_info=True)
        raise

# --- Helper Functions ---
# Shared YouTube URL pattern. The escaped dots and '?' restore the intended
# matching; the previous unescaped version matched far more than YouTube URLs.
_YOUTUBE_REGEX = re.compile(
    r'(?:https?://)?(?:www\.)?(?:m\.)?(?:youtube(?:-nocookie)?\.com|youtu\.be)/'
    r'(?:watch\?v=|embed/|v/|shorts/|live/|attribution_link\?a=.*&u=/watch\?v=)?'
    r'([\w-]{11})(?:\S+)?',
    re.IGNORECASE
)

def is_youtube_url(url):
    match = _YOUTUBE_REGEX.search(url)
    logger.debug(f"is_youtube_url '{url}': {bool(match)}")
    return bool(match)

def extract_youtube_id(url):
    match = _YOUTUBE_REGEX.search(url)
    if match:
        video_id = match.group(1)
        logger.debug(f"Extracted YT ID '{video_id}' from {url}")
        return video_id
    logger.warning(f"Could not extract YT ID from {url}")
    return None

# --- Content Fetching Functions ---
async def get_transcript_via_supadata(video_id: str, api_key: str) -> Optional[str]:
    if not video_id:
        logger.error("[Supadata] No video_id provided")
        return None
    if not api_key:
        logger.error("[Supadata] API key missing.")
        return None
    logger.info(f"[Supadata] Attempting fetch for video ID: {video_id}")
    api_endpoint = "https://api.supadata.ai/v1/youtube/transcript"
    # Note: Param name might be 'videoId' based on earlier logs; adjust if needed
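    # Roughly equivalent request (endpoint and params assumed from the code
    # here, not verified against Supadata's API reference):
    #   curl -H "X-API-Key: $SUPADATA_API_KEY" \
    #        "https://api.supadata.ai/v1/youtube/transcript?videoId=VIDEO_ID&format=text"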
    params = {"videoId": video_id, "format": "text"}
    headers = {"X-API-Key": api_key}
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(api_endpoint, headers=headers, params=params)
            logger.debug(f"[Supadata] Status code {response.status_code} for {video_id}")
            if response.status_code == 200:
                try:
                    # Attempt to decode JSON, fall back to raw text if needed
                    try:
                        data = response.json()
                    except json.JSONDecodeError:
                        data = None
                    content = None
                    # Check various possible response structures
                    if data:
                        content = data if isinstance(data, str) else data.get("transcript") or data.get("text") or data.get("data")
                    # If JSON parsing failed or no content key was found, try raw text
                    if not content and response.text:
                        content = response.text
                    if content and isinstance(content, str):
                        logger.info(f"[Supadata] Success for {video_id}. Length: {len(content)}")
                        return content.strip()
                    else:
                        logger.warning(f"[Supadata] Success but content empty/invalid for {video_id}. Response: {response.text[:200]}")
                        return None
                except Exception as e:
                    logger.error(f"[Supadata] Error processing success response for {video_id}: {e}", exc_info=True)
                    return None
            elif response.status_code in [401, 403]:
                logger.error(f"[Supadata] Auth error ({response.status_code}). Check API key.")
                return None
            elif response.status_code == 404:
                logger.warning(f"[Supadata] Not found (404) for {video_id}.")
                return None
            else:
                logger.error(f"[Supadata] Unexpected status {response.status_code} for {video_id}. Resp: {response.text[:200]}")
                return None
    except httpx.TimeoutException:
        logger.error(f"[Supadata] Timeout connecting for {video_id}")
        return None
    except httpx.RequestError as e:
        # Log specific errors like SSL verification failure
        if "CERTIFICATE_VERIFY_FAILED" in str(e):
            logger.error(f"[Supadata] SSL Cert Verify Failed for {video_id}: {e}")
        else:
            logger.error(f"[Supadata] Request error for {video_id}: {e}")
        return None
    except Exception as e:
        logger.error(f"[Supadata] Unexpected error for {video_id}: {e}", exc_info=True)
        return None

async def get_transcript_via_apify(video_url: str, api_token: str) -> Optional[str]:
    global APIFY_ACTOR_ID
    if not video_url:
        logger.error("[Apify] No video_url provided")
        return None
    if not api_token:
        logger.error("[Apify] API token missing.")
        return None
    logger.info(f"[Apify] Attempting fetch for URL: {video_url} (Actor: {APIFY_ACTOR_ID})")
    sync_items_endpoint = f"https://api.apify.com/v2/acts/{APIFY_ACTOR_ID}/run-sync-get-dataset-items"
    params = {"token": api_token}
    # Full payload for the transcript actor
    payload = {
        "urls": [video_url],
        "outputFormat": "singleStringText",
        "maxRetries": 5,
        "channelHandleBoolean": False,
        "channelNameBoolean": False,
        "datePublishedBoolean": False,
        "relativeDateTextBoolean": False,
    }
    headers = {"Content-Type": "application/json"}
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            # Redact the token so it never reaches the logs
            log_params = {**params, "token": "***REDACTED***"}
            logger.debug(f"[Apify] POST Request Details:\nURL: {sync_items_endpoint}\nParams: {log_params}\nHeaders: {headers}\nPayload: {json.dumps(payload)}")
            response = await client.post(sync_items_endpoint, headers=headers, params=params, json=payload)
            logger.debug(f"[Apify] Received status code {response.status_code} for {video_url}")
            if response.status_code == 200:
                try:
                    results = response.json()
                    if isinstance(results, list) and len(results) > 0:
                        item = results[0]
                        content = None
                        # Prioritize specific keys, fall back to others
                        if "captions" in item and isinstance(item["captions"], str):
isinstance(item["captions"], str): content = item["captions"] elif "text" in item and isinstance(item["text"], str): content = item["text"] elif "transcript" in item and isinstance(item["transcript"], str): content = item["transcript"] # Handle list of caption segments if primary keys fail elif "captions" in item and isinstance(item["captions"], list): if len(item["captions"]) > 0 and isinstance(item["captions"][0], dict) and 'text' in item["captions"][0]: content = " ".join(line.get("text", "") for line in item["captions"] if line.get("text")) elif len(item["captions"]) > 0 and isinstance(item["captions"][0], str): content = " ".join(item["captions"]) # Final check if content was found and is a string if content and isinstance(content, str): logger.info(f"[Apify] Success via REST for {video_url}. Length: {len(content)}") return content.strip() else: logger.warning(f"[Apify] Dataset item parsed but transcript content empty/invalid format for {video_url}. Item keys: {list(item.keys())}") return None else: logger.warning(f"[Apify] Actor success but dataset was empty for {video_url}. Response: {results}") return None except json.JSONDecodeError: logger.error(f"[Apify] Failed JSON decode. Status:{response.status_code}. Resp:{response.text[:200]}") return None except Exception as e: logger.error(f"[Apify] Error processing success response for {video_url}: {e}", exc_info=True) return None elif response.status_code == 400: logger.error(f"[Apify] Bad Request (400) for {video_url}. Check payload. Resp:{response.text[:200]}") return None elif response.status_code == 401: logger.error("[Apify] Auth error (401). Check token.") return None elif response.status_code == 404: error_info = "Unknown 404 Error" try: error_data = response.json() error_info = error_data.get("error", {}).get("message", "No specific message") except Exception: error_info = response.text[:200] logger.error(f"[Apify] Endpoint/Actor Not Found (404). Error: '{error_info}'") return None else: logger.error(f"[Apify] Unexpected status {response.status_code} for {video_url}. 
    except httpx.TimeoutException as e:
        logger.error(f"[Apify] Timeout during API interaction for {video_url}: {e}")
        return None
    except httpx.HTTPStatusError as e:
        logger.error(f"[Apify] HTTP Status Error during API interaction for {video_url}: {e}")
        return None
    except httpx.RequestError as e:
        logger.error(f"[Apify] Request error during API interaction for {video_url}: {e}")
        return None
    except Exception as e:
        logger.error(f"[Apify] Unexpected error during Apify REST call for {video_url}: {e}", exc_info=True)
        return None

async def get_youtube_transcript(video_id: str, video_url: str) -> Optional[str]:
    global SUPADATA_API_KEY, APIFY_API_TOKEN
    if not video_id:
        logger.error("YT transcript: No video_id provided")
        return None
    logger.info(f"Fetching YT transcript for video ID: {video_id} (URL: {video_url})")
    transcript_text = None

    logger.info("[Primary YT] Attempting youtube-transcript-api...")
    try:
        # Run the entire blocking fetch chain in a worker thread; previously the
        # chain was built on the event loop and only .fetch ran in the thread.
        def _fetch_transcript():
            return YouTubeTranscriptApi.list_transcripts(video_id).find_generated_transcript(['en', 'en-GB', 'en-US']).fetch()
        transcript_list = await asyncio.to_thread(_fetch_transcript)
        if transcript_list:
            transcript_text = " ".join([item['text'] for item in transcript_list if 'text' in item])
        if transcript_text:
            logger.info(f"[Primary YT] Success via lib for {video_id} (len: {len(transcript_text)})")
            return transcript_text.strip()
        else:
            logger.warning(f"[Primary YT] Transcript list/text empty for {video_id}")
            transcript_text = None
    except TranscriptsDisabled:
        logger.warning(f"[Primary YT] Transcripts are disabled for video {video_id}")
        transcript_text = None
    except NoTranscriptFound:
        logger.warning(f"[Primary YT] No English transcript found for video {video_id}")
        transcript_text = None
    except Exception as e:
        logger.warning(f"[Primary YT] Error via lib for {video_id}: {e}")
        transcript_text = None

    # Fallback 1: Supadata
    if transcript_text is None:
        logger.info("[Fallback YT 1] Trying Supadata API...")
        if SUPADATA_API_KEY:
            transcript_text = await get_transcript_via_supadata(video_id, SUPADATA_API_KEY)
            if transcript_text:
                logger.info(f"[Fallback YT 1] Success via Supadata for {video_id}")
                return transcript_text  # Already stripped
            else:
                logger.warning(f"[Fallback YT 1] Supadata failed or no content for {video_id}.")
        else:
            logger.warning("[Fallback YT 1] Supadata API key unavailable. Skipping.")

    # Fallback 2: Apify
    if transcript_text is None:
        logger.info("[Fallback YT 2] Trying Apify REST API...")
        if APIFY_API_TOKEN:
            transcript_text = await get_transcript_via_apify(video_url, APIFY_API_TOKEN)
            if transcript_text:
                logger.info(f"[Fallback YT 2] Success via Apify REST for {video_url}")
                return transcript_text  # Already stripped
            else:
                logger.warning(f"[Fallback YT 2] Apify REST failed or no content for {video_url}.")
        else:
            logger.warning("[Fallback YT 2] Apify API token unavailable. Skipping.")
Skipping.") if transcript_text is None: logger.error(f"All methods failed for YT transcript: {video_id}") return None return transcript_text # Should be stripped if found async def get_website_content_via_crawl4ai(url: str) -> Optional[str]: """Fetches website content using Crawl4AI (Primary Method).""" global _crawl4ai_primary_web_enabled if not _crawl4ai_primary_web_enabled: logger.error("[Crawl4AI] Lib not available."); return None if not url: logger.error("[Crawl4AI] No URL provided."); return None logger.info(f"[Crawl4AI] Attempting crawl: {url}") try: # Initialize with ignore_robots=True to bypass cache/permission issues async with AsyncWebCrawler(ignore_robots=True) as crawler: logger.info(f"[Crawl4AI] Initialized crawler (ignore_robots=True).") result = await crawler.arun(url=url, crawler_strategy="playwright", timeout=90) content = None if result and result.markdown: content = result.markdown.strip() elif result and result.text: # Fallback if markdown is missing content = result.text.strip() if content: logger.info(f"[Crawl4AI] Success crawling {url}. Content length: {len(content)}") return content else: logger.warning(f"[Crawl4AI] Crawl successful for {url}, but extracted content (markdown/text) is empty.") return None except asyncio.TimeoutError: logger.error(f"[Crawl4AI] Timeout occurred while crawling {url}") return None except PermissionError as e: # Should not happen with ignore_robots=True, but keep for logging logger.error(f"[Crawl4AI] Permission denied during crawl for {url}. Path: '{e.filename}'. Error: {e}", exc_info=True) return None except Exception as e: logger.error(f"[Crawl4AI] Unexpected error during crawl for {url}: {e}", exc_info=True) return None async def fetch_url_content_for_scrape(url: str, timeout: int = 25) -> Optional[str]: """Fetches HTML content for BeautifulSoup.""" headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', # Allow image/webp 'Accept-Language': 'en-US,en;q=0.5', 'Connection': 'keep-alive', 'DNT': '1', # Do Not Track 'Upgrade-Insecure-Requests': '1' } try: async with httpx.AsyncClient(follow_redirects=True, timeout=timeout, headers=headers) as client: logger.debug(f"[BS4 Fetch] Sending request to {url}") response = await client.get(url) logger.debug(f"[BS4 Fetch] Received response {response.status_code} from {url}") response.raise_for_status() # Raise HTTP errors content_type = response.headers.get('content-type', '').lower() if 'html' not in content_type: logger.warning(f"[BS4 Fetch] Non-HTML content type from {url}: {content_type}") return None try: # Let httpx handle decoding return response.text except Exception as e: logger.error(f"[BS4 Fetch] Error reading response text for {url}: {e}") return None except httpx.HTTPStatusError as e: logger.error(f"[BS4 Fetch] HTTP error {e.response.status_code} fetching {url}: {e}") except httpx.TimeoutException: logger.error(f"[BS4 Fetch] Timeout error fetching {url}") except httpx.TooManyRedirects: logger.error(f"[BS4 Fetch] Too many redirects fetching {url}") except httpx.RequestError as e: logger.error(f"[BS4 Fetch] Request error fetching {url}: {e}") except Exception as e: logger.error(f"[BS4 Fetch] Unexpected error fetching {url}: {e}", exc_info=True) return None async def get_website_content_bs4(url: str) -> Optional[str]: """Fetches and parses website content using BeautifulSoup (Fallback 1).""" if not url: 
logger.error("[BS4] No URL provided"); return None logger.info(f"[BS4] Attempting basic fetch & parse for: {url}") html_content = await fetch_url_content_for_scrape(url) if not html_content: logger.warning(f"[BS4] Failed to fetch HTML for {url}") return None try: # Inner function for parsing (runs in thread) def parse_html(content): soup = BeautifulSoup(content, DEFAULT_PARSER) # Remove common non-content elements more aggressively for element in soup(["script", "style", "header", "footer", "nav", "aside", "form", "button", "input", "iframe", "img", "svg", "link", "meta", "noscript", "figure", "figcaption", "video", "audio", ".advertisement", ".ad", ".sidebar", ".popup", ".modal"]): try: element.extract() except: pass # Ignore if element already removed # Try various selectors for main content selectors = ['main', 'article', '[role="main"]', '#content', '.content', '#main-content', '.main-content', '#body', '.body', '#article-body', '.article-body', '.post-content', '.entry-content', '.page-content'] target_element = None for selector in selectors: try: target_element = soup.select_one(selector) if target_element: break except Exception as sel_e: logger.warning(f"[BS4] Invalid selector '{selector}': {sel_e}") continue # Fallback to body if no main element found if not target_element: target_element = soup.body if not target_element: logger.warning(f"[BS4] Could not find body or main content area for parsing {url}") return None # Extract text, clean up whitespace lines = [line.strip() for line in target_element.get_text(separator='\n', strip=True).splitlines() if line.strip()] text = " ".join(lines) # Join lines with single spaces text = re.sub(r'\s{2,}', ' ', text).strip() # Consolidate multiple spaces if not text: logger.warning(f"[BS4] Extracted text is empty after cleaning for {url}") return None return text # Run parsing in a separate thread text_content = await asyncio.to_thread(parse_html, html_content) if text_content: logger.info(f"[BS4] Success scrape/parse for {url} (final len: {len(text_content)})") return text_content else: logger.warning(f"[BS4] Parsing resulted in empty content for {url}") return None except Exception as e: logger.error(f"[BS4] Error scraping/parsing {url}: {e}", exc_info=True) return None async def get_website_content_via_api(url: str, api_key: str) -> Optional[str]: """Fetches website content using urltotext.com API (Fallback 2).""" if not url: logger.error("[API] No URL provided"); return None if not api_key: logger.error("[API] urltotext.com API key missing."); return None logger.info(f"[API] Attempting fetch via urltotext.com for: {url}") api_endpoint = "https://urltotext.com/api/v1/urltotext/" payload = { "url": url, "output_format": "text", "extract_main_content": True, "render_javascript": True, "residential_proxy": False } headers = { "Authorization": f"Token {api_key}", "Content-Type": "application/json" } try: async with httpx.AsyncClient(timeout=45.0) as client: logger.debug(f"[API] Sending request to urltotext.com API for {url}") response = await client.post(api_endpoint, headers=headers, json=payload) logger.debug(f"[API] Received status {response.status_code} from urltotext.com API for {url}") if response.status_code == 200: try: data = response.json() content = data.get("data", {}).get("content"); credits = data.get("credits_used", "N/A"); warning = data.get("data", {}).get("warning") if warning: logger.warning(f"[API] urltotext.com API Warning for {url}: {warning}") if content: logger.info(f"[API] Success via urltotext.com API for {url}. 
                    else:
                        logger.warning(f"[API] urltotext.com API success but content empty for {url}. Resp: {data}")
                        return None
                except json.JSONDecodeError:
                    logger.error(f"[API] Failed JSON decode urltotext.com for {url}. Resp:{response.text[:500]}")
                    return None
                except Exception as e:
                    logger.error(f"[API] Error processing urltotext.com success response for {url}: {e}", exc_info=True)
                    return None
            elif response.status_code == 402:
                logger.error(f"[API] Error 402 (Insufficient Credits) from urltotext.com API for {url}. Resp:{response.text[:200]}")
                return None
            elif response.status_code == 400 and "url" in response.text.lower():
                logger.error(f"[API] Error 400 (Likely Bad URL) from urltotext.com API for {url}. Resp:{response.text[:200]}")
                return None
            elif response.status_code in [400, 401, 403, 422, 500]:
                logger.error(f"[API] Error {response.status_code} from urltotext.com API for {url}. Resp:{response.text[:200]}")
                return None
            else:
                logger.error(f"[API] Unexpected status {response.status_code} from urltotext.com API for {url}. Resp:{response.text[:200]}")
                return None
    except httpx.TimeoutException:
        logger.error(f"[API] Timeout connecting to urltotext.com API for {url}")
        return None
    except httpx.RequestError as e:
        logger.error(f"[API] Request error connecting to urltotext.com API for {url}: {e}")
        return None
    except Exception as e:
        logger.error(f"[API] Unexpected error during urltotext.com API call for {url}: {e}", exc_info=True)
        return None

# --- Summarization Functions ---
async def _call_gemini(text: str, summary_type: str) -> Tuple[Optional[str], Optional[str]]:
    """Calls the Google Gemini API to generate a summary."""
    global GEMINI_MODEL, _gemini_primary_enabled
    if not _gemini_primary_enabled:
        logger.error("[Gemini] Disabled.")
        return None, "Error: Primary AI service unavailable."
    if len(text) > MAX_INPUT_TOKEN_APPROX:
        logger.warning(f"[Gemini] Truncating input ({len(text)} > {MAX_INPUT_TOKEN_APPROX})")
        text = text[:MAX_INPUT_TOKEN_APPROX]
    logger.info(f"[Gemini] Generating {summary_type} summary using {GEMINI_MODEL}. Input len: {len(text)}")

    if summary_type == "paragraph":
        prompt = f"""Please summarise the following text into a concise paragraph. Focus on the main points and key information. Avoid unnecessary jargon or overly complex sentences.

Text to summarise:
---
{text}
---

Concise Paragraph Summary:"""
    elif summary_type == "points":
        prompt = f"""Please summarise the following text into a list of key bullet points. Each point should capture a distinct main idea or important piece of information. Aim for clarity and conciseness.

Text to summarise:
---
{text}
---

Key Bullet Points Summary:"""
    else:
        logger.error(f"[Gemini] Invalid summary type: {summary_type}")
        return None, f"Error: Invalid summary type '{summary_type}' specified."

    # Safety settings deliberately disabled (BLOCK_NONE) for this bot.
    # Note: if the SDK rejects HARM_CATEGORY_UNSPECIFIED, filter it out here.
    safety_settings = {category: HarmBlockThreshold.BLOCK_NONE for category in HarmCategory}
    logger.info("[Gemini] Safety settings disabled (BLOCK_NONE).")
    generation_config = genai.types.GenerationConfig(max_output_tokens=2048, temperature=0.7)
    try:
        model = genai.GenerativeModel(GEMINI_MODEL)
        logger.debug("[Gemini] Sending request...")
        response: GenerateContentResponse = await model.generate_content_async(
            prompt,
            generation_config=generation_config,
            safety_settings=safety_settings,
        )
        # 1. Check prompt feedback for blocks (even with BLOCK_NONE, core harms might trigger)
        if not response.candidates:
            block_reason = "Unknown"
            safety_ratings_str = "N/A"
            if hasattr(response, 'prompt_feedback') and response.prompt_feedback:
                block_reason = str(response.prompt_feedback.block_reason or "Not specified")
                if response.prompt_feedback.safety_ratings:
                    safety_ratings_str = ', '.join(f"{r.category.name}: {r.probability.name}" for r in response.prompt_feedback.safety_ratings)
            error_msg = f"Error: Gemini response blocked (Prompt). Reason: {block_reason}. Safety: {safety_ratings_str}"
            logger.error(f"[Gemini] {error_msg}")
            return None, error_msg

        # 2. Check the candidate's finish reason
        candidate = response.candidates[0]
        finish_reason_val = candidate.finish_reason
        finish_reason_str = str(finish_reason_val).upper() if finish_reason_val is not None else "UNSPECIFIED"
        logger.debug(f"[Gemini] Finish reason value: {finish_reason_val} -> {finish_reason_str}")
        candidate_safety_ratings_str = "N/A"
        if hasattr(candidate, 'safety_ratings') and candidate.safety_ratings:
            candidate_safety_ratings_str = ', '.join(f"{r.category.name}: {r.probability.name}" for r in candidate.safety_ratings)
        # A finish reason of STOP or MAX_TOKENS indicates successful completion.
        # Compare the end of the string representation to handle potential enum prefixes.
        success = any(finish_reason_str.endswith(reason) for reason in ["STOP", "MAX_TOKENS"])
        if not success:
            # Treat SAFETY, RECITATION, OTHER, UNSPECIFIED as errors
            error_msg = f"Error: Gemini generation failed or finished unexpectedly. Reason: {finish_reason_str}. Safety: {candidate_safety_ratings_str}"
            logger.error(f"[Gemini] {error_msg}")
            return None, error_msg  # Return the specific error
        # Log a warning if truncated, but proceed
        if finish_reason_str.endswith("MAX_TOKENS"):
            logger.warning("[Gemini] Output may be truncated due to max_tokens limit.")

        # 3. Extract the text
        summary_text = ""
        extracted = False
        try:
            summary_text = response.text.strip()  # Use the shortcut if available
            extracted = True
        except Exception as e:
            logger.warning(f"[Gemini] Error accessing response.text: {e}. Trying parts.")
            # Fall back to parts if .text fails
            if candidate.content and candidate.content.parts:
                summary_text = "".join(part.text for part in candidate.content.parts if hasattr(part, "text")).strip()
                extracted = True
        # Check whether the text is empty even after a successful finish reason
        if not extracted or not summary_text:
            logger.warning(f"[Gemini] Gemini returned empty summary despite finish reason '{finish_reason_str}'.")
            return None, "Error: AI generated an empty summary."
        logger.info(f"[Gemini] Summary extracted successfully (len: {len(summary_text)}).")
        return summary_text, None
    except AttributeError as e:
        logger.error(f"[Gemini] Attribute error accessing Gemini response: {e}. Structure might have changed.", exc_info=True)
        return None, f"Error: Failed to parse Gemini response ({e})."
    except Exception as e:
        logger.error(f"[Gemini] Error during API call: {e}", exc_info=True)
        return None, f"Error: Failed communication with Gemini ({e})."

async def _call_openrouter(text: str, summary_type: str) -> Tuple[Optional[str], Optional[str]]:
    """Calls the OpenRouter API to generate a summary."""
    global OPENROUTER_API_KEY, OPENROUTER_MODEL, _openrouter_fallback_enabled
    if not _openrouter_fallback_enabled:
        logger.error("[OR] Disabled.")
        return None, "Error: Fallback AI unavailable."
    # Truncate if needed
    max_len = 100000
    if len(text) > max_len:
        logger.warning(f"[OR] Truncating input ({len(text)} > {max_len})")
        text = text[:max_len]
    logger.info(f"[OR] Generating {summary_type} summary ({OPENROUTER_MODEL}). Input len: {len(text)}")

    if summary_type == "paragraph":
        prompt_content = f"Please summarise the following text into a concise paragraph...\n\nText:\n---\n{text}\n---\n\nSummary:"
    elif summary_type == "points":
        prompt_content = f"Please summarise the following text into key bullet points...\n\nText:\n---\n{text}\n---\n\nSummary:"
    else:
        logger.error(f"[OR] Invalid type: {summary_type}")
        return None, f"Error: Invalid summary type '{summary_type}'."

    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        "HTTP-Referer": "https://github.com/fmab777/telegram-summary-bot",
        "X-Title": "Telegram Summary Bot",
    }
    payload = {
        "model": OPENROUTER_MODEL,
        "messages": [
            {"role": "system", "content": "You are an expert summarizer. Provide summaries as requested."},
            {"role": "user", "content": prompt_content}
        ],
        "max_tokens": 2048,
        "temperature": 0.7,
    }
    api_url = "https://openrouter.ai/api/v1/chat/completions"
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            logger.debug(f"[OR] Sending request to {api_url}...")
            response = await client.post(api_url, headers=headers, json=payload)
            logger.debug(f"[OR] Received status code {response.status_code}")
            if response.status_code == 200:
                try:
                    data = response.json()
                    if data.get("choices") and len(data["choices"]) > 0:
                        choice = data["choices"][0]
                        message = choice.get("message")
                        finish_reason = choice.get("finish_reason", "N/A")
                        if message and message.get("content"):
                            summary_text = message["content"].strip()
                            if summary_text:
                                logger.info(f"[OR] Summary generated successfully (len: {len(summary_text)}). Finish: {finish_reason}")
                                if finish_reason == 'length':
                                    logger.warning("[OR] Summary may be truncated (max_tokens).")
                                return summary_text, None
                            else:
                                logger.warning(f"[OR] OpenRouter returned empty summary content. Data: {data}")
                                return None, "Error: Fallback AI generated empty summary."
                        else:
                            logger.error(f"[OR] Invalid response structure (missing message/content). Data: {data}")
                            return None, "Error: Fallback AI returned invalid response format."
                    else:
                        logger.error(f"[OR] Invalid response structure (missing choices). Data: {data}")
                        api_error = data.get("error", {}).get("message", "Unknown API error")
                        return None, f"Error: Fallback AI response missing summary. API msg: {api_error}"
                except json.JSONDecodeError:
                    logger.error(f"[OR] Failed to decode JSON response. Status: {response.status_code}, Text: {response.text[:500]}")
                    return None, "Error: Fallback AI sent invalid JSON response."
                except Exception as e:
                    logger.error(f"[OR] Error processing success response: {e}", exc_info=True)
                    return None, f"Error: Failed to process Fallback AI response ({e})."
            else:
                # Handle API errors
                error_message = f"Error: Fallback AI service ({OPENROUTER_MODEL}) returned status {response.status_code}."
                try:
                    error_details = response.json().get("error", {}).get("message", response.text[:200])
                    error_message += f" Details: {error_details}"
                except Exception:
                    error_message += f" Response: {response.text[:200]}"
                logger.error(f"[OR] {error_message}")
                return None, error_message
    except httpx.TimeoutException:
        logger.error(f"[OR] Timeout connecting to OpenRouter API for {OPENROUTER_MODEL}")
        return None, "Error: Timed out connecting to fallback AI."
    except httpx.RequestError as e:
        logger.error(f"[OR] Request error connecting to OpenRouter API: {e}")
        return None, f"Error: Network error connecting to fallback AI ({e})."
    except Exception as e:
        logger.error(f"[OR] Unexpected error during OpenRouter API call: {e}", exc_info=True)
        return None, f"Error: Unexpected issue with fallback AI ({e})."

async def generate_summary(text: str, summary_type: str) -> str:
    """Generates a summary using primary (Gemini), then fallback (OpenRouter)."""
    global _gemini_primary_enabled, _openrouter_fallback_enabled, GEMINI_MODEL, OPENROUTER_MODEL
    logger.info("[Summary] Starting process...")
    error_message: Optional[str] = None

    # Try Gemini
    if _gemini_primary_enabled:
        logger.info(f"[Summary] Attempting primary AI: Gemini ({GEMINI_MODEL})")
        primary_summary, primary_error = await _call_gemini(text, summary_type)
        if primary_summary:
            logger.info("[Summary] Success with primary AI (Gemini).")
            return primary_summary
        else:
            logger.warning(f"[Summary] Primary AI (Gemini) failed: {primary_error}. Falling back.")
            error_message = f"Primary AI failed: {primary_error}"
    else:
        logger.warning("[Summary] Primary AI (Gemini) disabled. Falling back.")
        error_message = "Primary AI unavailable."

    # Try OpenRouter if Gemini failed or was disabled
    if _openrouter_fallback_enabled:
        logger.info(f"[Summary] Attempting fallback AI: OpenRouter ({OPENROUTER_MODEL})")
        fallback_summary, fallback_error = await _call_openrouter(text, summary_type)
        if fallback_summary:
            logger.info("[Summary] Success with fallback AI (OpenRouter).")
            return fallback_summary
        else:
            logger.error(f"[Summary] Fallback AI (OpenRouter) also failed: {fallback_error}")
            # Combine errors for the final message
            if error_message:
                return f"{error_message}\nFallback failed: {fallback_error}"
            else:
                # Should only happen if Gemini was disabled and OR failed
                return f"Fallback AI failed: {fallback_error}"
    else:
        # OpenRouter is disabled
        logger.error("[Summary] Fallback AI (OpenRouter) is disabled.")
        if error_message:
            # Primary failed AND fallback disabled
            return f"{error_message}\nFallback unavailable."
        else:
            # Primary disabled AND fallback disabled
            return "Error: Both primary and fallback AI services are unavailable."

    # Should not be reached if the logic above is correct
    logger.error("[Summary] Reached end of function unexpectedly. No summary generated.")
    final_error = error_message or "Unknown summary generation error."
    return f"Sorry, an error occurred: {final_error}"

# --- Main Processing Task ---
async def process_summary_task(
    user_id: int,
    chat_id: int,
    message_id_to_edit: Optional[int],
    url: str,
    summary_type: str,
    bot_token: str
) -> None:
    task_id = f"{user_id}-{message_id_to_edit or 'new'}"
    logger.info(f"[Task {task_id}] Starting processing for URL: {url}")
    bot: Optional[Bot] = None
    content: Optional[str] = None
    feedback: Optional[str] = None  # Final user message (error, or result part 1)
    success = False
    original_msg_id = message_id_to_edit
    try:
        # Initialize the background bot
        bg_request = HTTPXRequest(connect_timeout=15.0, read_timeout=60.0, write_timeout=60.0, pool_timeout=60.0)
        bot = Bot(token=bot_token, request=bg_request)
    except Exception as e:
        logger.critical(f"[Task {task_id}] Failed background bot init: {e}", exc_info=True)
        return  # Cannot proceed
    try:
        # Edit the original message to "Processing..."
        proc_text = f"Generating '{summary_type}' summary...\nThis might take a moment..."
        if original_msg_id:
            try:
                await retry_bot_operation(
                    bot.edit_message_text,
                    chat_id=chat_id,
                    message_id=original_msg_id,
                    text=proc_text,
                    parse_mode=ParseMode.HTML,  # Keep HTML for potential future formatting
                    reply_markup=None,
                    link_preview_options={'is_disabled': True}
                )
                logger.debug(f"[Task {task_id}] Edited original msg {original_msg_id} to 'Processing'")
            except Exception as e:
                logger.warning(f"[Task {task_id}] Failed edit original msg {original_msg_id}: {e}.")
                # Continue anyway; the result will just be sent as a new message later

        # Best-effort 'typing' indicator
        try:
            await retry_bot_operation(bot.send_chat_action, chat_id=chat_id, action='typing')
        except Exception:
            pass

        # --- Get Content ---
        is_yt = is_youtube_url(url)
        logger.debug(f"[Task {task_id}] URL Type: {'YT' if is_yt else 'Web'}")
        if is_yt:
            vid = extract_youtube_id(url)
            if vid:
                content = await get_youtube_transcript(vid, url)
            else:
                feedback = "Invalid YouTube URL format."
            if not content and not feedback:
                feedback = "Could not get YouTube transcript (unavailable/no captions?)."
        else:
            # Website
            logger.info(f"[Task {task_id}] Trying Crawl4AI...")
            content = await get_website_content_via_crawl4ai(url)
            if not content:
                logger.warning(f"[Task {task_id}] Crawl4AI failed. Trying BS4...")
                try:
                    await retry_bot_operation(bot.send_chat_action, chat_id, action='typing')
                except Exception:
                    pass
                content = await get_website_content_bs4(url)
                if not content:
                    logger.warning(f"[Task {task_id}] BS4 failed. Trying API...")
                    global URLTOTEXT_API_KEY, _urltotext_fallback_enabled
                    if _urltotext_fallback_enabled:
                        try:
                            await retry_bot_operation(bot.send_chat_action, chat_id, action='typing')
                        except Exception:
                            pass
                        content = await get_website_content_via_api(url, URLTOTEXT_API_KEY)
                        if not content:
                            feedback = "Fetch failed (Crawl4AI/BS4/API error or credits)."
                    else:
                        feedback = "Fetch failed (Crawl4AI/BS4 error, API disabled)."
            # Final check if the web content fetch failed
            if not content and not feedback:
                feedback = "Could not fetch website content using any method."

        # --- Generate Summary ---
        if content and not feedback:
            logger.info(f"[Task {task_id}] Content fetched (len:{len(content)}). Generating summary...")
            try:
                await retry_bot_operation(bot.send_chat_action, chat_id, action='typing')
            except Exception:
                pass
            final_summary = await generate_summary(content, summary_type)
            if final_summary.startswith("Error:") or final_summary.startswith("Sorry,"):
                feedback = final_summary  # Use the AI error as user feedback
                logger.warning(f"[Task {task_id}] Summary generation failed: {feedback}")
                success = False
            else:
                # Success: split into Telegram-sized parts
                summary_parts = []
                current_part = ""
                lines = final_summary.splitlines(keepends=True)
                for line in lines:
                    if len(current_part) + len(line) > MAX_SUMMARY_CHUNK_SIZE:
                        if current_part.strip():
                            summary_parts.append(current_part.strip())
                        current_part = line[:MAX_SUMMARY_CHUNK_SIZE] if len(line) > MAX_SUMMARY_CHUNK_SIZE else line
                    else:
                        current_part += line
                if current_part.strip():
                    summary_parts.append(current_part.strip())
                if not summary_parts:
                    summary_parts.append("Summary generated, but empty.")
                    logger.warning(f"[Task {task_id}] Summary generated but resulted in empty parts.")
                logger.info(f"[Task {task_id}] Summary OK (len: {len(final_summary)}). Sending {len(summary_parts)} part(s).")
                feedback = summary_parts[0]  # First part becomes the feedback message
                success = True  # Assume success initially for sending parts
                # Send remaining parts, if any
                if len(summary_parts) > 1:
                    for i, part in enumerate(summary_parts[1:], 2):
                        await asyncio.sleep(0.5)
                        try:
                            await retry_bot_operation(
                                bot.send_message,
                                chat_id=chat_id,
                                text=part,
                                parse_mode=None,
                                link_preview_options={'is_disabled': True}
                            )
                            logger.debug(f"[Task {task_id}] Sent part {i}/{len(summary_parts)}.")
                        except Exception as part_err:
                            # If sending a later part fails, update feedback and mark overall failure
                            feedback = f"Sent part 1, but failed to send part {i}. Error: {part_err}"
                            success = False
                            logger.error(f"[Task {task_id}] {feedback}")
                            break  # Stop sending

        # --- Send Final Feedback/Result ---
        if feedback:  # Either an error or the first summary part
            final_text = feedback
            logger.info(f"[Task {task_id}] Sending final message (Success: {success}).")
            try:
                # Try to edit the original message first
                edited = False
                if original_msg_id:
                    try:
                        await retry_bot_operation(
                            bot.edit_message_text,
                            chat_id=chat_id,
                            message_id=original_msg_id,
                            text=final_text,
                            parse_mode=None,  # None for safety, assuming plain-text output from the AI
                            reply_markup=None,
                            link_preview_options={'is_disabled': True}
                        )
                        logger.debug(f"[Task {task_id}] Edited original msg {original_msg_id} with final content.")
                        edited = True
                    except Exception as edit_err:
                        logger.warning(f"[Task {task_id}] Failed final edit original msg {original_msg_id}: {edit_err}. Sending new.")
                # If editing failed or wasn't applicable, send as a new message
                if not edited:
                    await retry_bot_operation(
                        bot.send_message,
                        chat_id=chat_id,
                        text=final_text,
                        parse_mode=None,
                        link_preview_options={'is_disabled': True}
                    )
                    logger.debug(f"[Task {task_id}] Sent final content as new message.")
            except Exception as send_err:
                # Log failure to send even the final message
                logger.error(f"[Task {task_id}] CRITICAL: Failed to send final feedback/result message: {send_err}")
                success = False  # Mark failure if we couldn't even send the result/error
        else:
            # This case should not happen if the logic above is correct
            logger.error(f"[Task {task_id}] No feedback message set at end of task (Success: {success}).")
            if not success:
                try:
                    await retry_bot_operation(bot.send_message, chat_id, text="An unknown error occurred processing the request.")
                except Exception:
                    pass

    except Exception as e:
        # Catch-all for unexpected task errors
        logger.error(f"[Task {task_id}] Unexpected error during processing task: {e}", exc_info=True)
        success = False
        feedback = "Oops! An unexpected error occurred. Please try again later."
        # Try to send this final crash feedback
        if bot:
            try:
                edited_crash = False
                if original_msg_id:
                    try:
                        await retry_bot_operation(
                            bot.edit_message_text,
                            chat_id,
                            original_msg_id,
                            text=feedback,
                            reply_markup=None,
                            link_preview_options={'is_disabled': True}
                        )
                        edited_crash = True
                    except Exception:
                        pass
                if not edited_crash:
                    await retry_bot_operation(bot.send_message, chat_id, text=feedback)
            except Exception as final_err:
                logger.error(f"[Task {task_id}] Failed to send final CRASH error feedback: {final_err}")
    finally:
        # Cleanup: close the background bot's HTTP client
        if bot and bot.request and hasattr(bot.request, '_client') and bot.request._client:
            try:
                await bot.request._client.aclose()
                logger.debug(f"[Task {task_id}] BG client closed.")
            except Exception as e:
                logger.warning(f"[Task {task_id}] Error closing BG client: {e}")
        logger.info(f"[Task {task_id}] Task completed. Overall Success: {success}")

# --- Telegram Handlers ---
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    user = update.effective_user
    if not user or not update.message:
        return
    mention = user.mention_html()  # Only call after the None check above
    logger.info(f"User {user.id} ({user.username or 'N/A'}) /start.")
    await update.message.reply_html(f"👋 {mention}! Send YT/website link to summarise.")

async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    user = update.effective_user
    if not user or not update.message:
        return
    logger.info(f"User {user.id} ({user.username or 'N/A'}) /help.")
    help_text = (
        "🔍 **How:**\n1. Send link.\n2. Choose type.\n3. Wait.\n\n"
        "⚙️ **Tech:**\n"
        "• Web: `Crawl4AI` (ignores robots), `BS4`, `urltotext`.\n"
        "• YT: `youtube-transcript-api`, `Supadata`, `Apify`.\n"
        f"• AI: `{GEMINI_MODEL}`, `{OPENROUTER_MODEL}`.\n\n"
        "`/start`, `/help`"
    )
    await update.message.reply_text(help_text, parse_mode=ParseMode.MARKDOWN)

async def handle_potential_url(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    if not update.message or not update.message.text:
        return
    url = update.message.text.strip()
    user = update.effective_user
    if not user:
        return
    if not re.match(r'https?://[^\s/$.?#].[^\s]*', url, re.I):
        logger.debug(f"Ignoring non-URL from {user.id}: {url}")
        await update.message.reply_text("Invalid URL format. Use http(s)://...")
        return
    logger.info(f"User {user.id} sent URL: {url}")
    context.user_data['url_to_summarize'] = url
    context.user_data['original_message_id'] = update.message.message_id
    keys = [[
        InlineKeyboardButton("Paragraph", callback_data="paragraph"),
        InlineKeyboardButton("Points", callback_data="points")
    ]]
    markup = InlineKeyboardMarkup(keys)
    await update.message.reply_html(
        f"Link:\n{html.escape(url)}\n\nSummary type?",
        reply_markup=markup,
        disable_web_page_preview=True
    )

async def handle_summary_type_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    query = update.callback_query
    if not query or not query.message or not query.from_user:
        logger.warning("Callback missing data.")
        return
    user = query.from_user
    summary_type = query.data
    qid = query.id
    try:
        await query.answer()
        logger.debug(f"Ack cb {qid} from {user.id}")
    except Exception as e:
        logger.warning(f"Err answering cb {qid}: {e}")
    url = context.user_data.get('url_to_summarize')
    msg_id = query.message.message_id
    logger.info(f"User {user.id} chose '{summary_type}' for msg {msg_id}. URL context: {'Yes' if url else 'No'}")
    if not url:
        logger.warning(f"No URL context user {user.id} (cb {qid}).")
        try:
            await query.edit_message_text("Context lost. Please send URL again.", reply_markup=None)
        except Exception as e:
            logger.error(f"Failed edit 'URL not found': {e}")
        return

    global TELEGRAM_TOKEN, _gemini_primary_enabled, _openrouter_fallback_enabled
    # Check configs
    if not TELEGRAM_TOKEN:
        logger.critical("TG TOKEN missing!")
        try:
            await query.edit_message_text("❌ Bot config error (Token).", reply_markup=None)
        except Exception as e:
            logger.error(f"Failed edit msg for TOKEN error: {e}")
        return
    if not _gemini_primary_enabled and not _openrouter_fallback_enabled:
        logger.critical("No AI models available!")
        try:
            await query.edit_message_text("❌ AI config error (Models).", reply_markup=None)
        except Exception as e:
            logger.error(f"Failed edit msg for AI config error: {e}")
        return
    if not _gemini_primary_enabled:
        logger.warning("Primary AI unavailable, using fallback.")
    elif not _openrouter_fallback_enabled:
        logger.warning("Fallback AI unavailable, using primary.")

    # Schedule the background task
    logger.info(f"Scheduling task user {user.id}...")
    asyncio.ensure_future(
        process_summary_task(
            user_id=user.id,
            chat_id=query.message.chat_id,
            message_id_to_edit=msg_id,
            url=url,
            summary_type=summary_type,
            bot_token=TELEGRAM_TOKEN
        )
    )
    # Clear context after scheduling
    context.user_data.pop('url_to_summarize', None)
    context.user_data.pop('original_message_id', None)
    logger.debug(f"Cleared context user {user.id} post-schedule.")

async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
    ignore_errors = (AttributeError, BadRequest, TimedOut, NetworkError, RetryAfter)
    if isinstance(context.error, ignore_errors):
        ignore_messages = ["message is not modified", "query is too old", "message to edit not found", "chat not found", "bot was blocked by the user"]
        err_str = str(context.error).lower()
        if any(msg in err_str for msg in ignore_messages) or isinstance(context.error, (TimedOut, NetworkError, RetryAfter)):
            logger.warning(f"Ignoring known/transient error: {context.error}")
            return
    logger.error("Exception handling update:", exc_info=context.error)

# --- Application Setup ---
async def setup_bot_config() -> Application:
    logger.info("Configuring Telegram App...")
    global TELEGRAM_TOKEN
    if not TELEGRAM_TOKEN:
        raise ValueError("TG TOKEN missing.")
    req = HTTPXRequest(connect_timeout=10.0, read_timeout=30.0, write_timeout=30.0, pool_timeout=60.0)
    app = Application.builder().token(TELEGRAM_TOKEN).request(req).build()
    app.add_handler(CommandHandler("start", start))
    app.add_handler(CommandHandler("help", help_command))
    url_filter = filters.TEXT & ~filters.COMMAND & (filters.Entity("url") | filters.Entity("text_link") | filters.Regex(r'https?://[^\s]+'))
    app.add_handler(MessageHandler(url_filter, handle_potential_url))
    app.add_handler(CallbackQueryHandler(handle_summary_type_callback))
    app.add_error_handler(error_handler)
    logger.info("TG handlers configured.")
    return app

# --- ASGI Lifespan & Routes ---
@contextlib.asynccontextmanager
async def lifespan(app: Starlette):
    global ptb_app, WEBHOOK_SECRET, TELEGRAM_TOKEN
    logger.info("Lifespan: Startup...")
    if not TELEGRAM_TOKEN:
        raise RuntimeError("Telegram token missing.")
    try:
        ptb_app = await setup_bot_config()
        await ptb_app.initialize()
        bot_info = await ptb_app.bot.get_me()
        logger.info(f"Bot init: @{bot_info.username} ({bot_info.id})")
        # Webhook setup
        current_info = await ptb_app.bot.get_webhook_info()
        deleted_ok = True
        if current_info and current_info.url:
            logger.info(f"Deleting existing webhook: {current_info.url}...")
            try:
                deleted_ok = await ptb_app.bot.delete_webhook(drop_pending_updates=True)
                logger.info("WH deleted." if deleted_ok else "WH delete failed.")
logger.info("WH deleted." if deleted_ok else "WH delete failed.") except Exception as e: logger.warning(f"WH delete error: {e}"); deleted_ok = False; await asyncio.sleep(1) host = os.environ.get("SPACE_HOST"); path="/webhook"; if not host: raise RuntimeError("SPACE_HOST missing.") wh_url = f"https://{host.split('://')[-1].rstrip('/')}{path}" if wh_url and deleted_ok: logger.info(f"Setting webhook: {wh_url}") args = { "url": wh_url, "allowed_updates": Update.ALL_TYPES, "drop_pending_updates": True } if WEBHOOK_SECRET: args["secret_token"] = WEBHOOK_SECRET; logger.info("Using webhook secret.") await asyncio.sleep(1.0) try: if not await ptb_app.bot.set_webhook(**args): raise RuntimeError("set_webhook returned False.") await asyncio.sleep(1.5); info = await ptb_app.bot.get_webhook_info() if not (info and info.url == wh_url): raise RuntimeError(f"WH verify fail! Expected '{wh_url}', Got: {info}") # --- Both Previous Fixes Applied --- logger.info(f"WH set & verified: URL='{info.url}', Secret={'YES' if WEBHOOK_SECRET else 'NO'}") if info.last_error_message: logger.warning(f"WH status error: {info.last_error_message}") # --- This Line Corrected Now --- await ptb_app.start() logger.info("PTB started (webhook).") except Exception as e: logger.error(f"FATAL: WH setup error: {e}", exc_info=True) raise RuntimeError(f"WH setup fail: {e}") from e elif not deleted_ok: raise RuntimeError("Failed to delete previous webhook.") logger.info("Lifespan: Startup complete."); yield except Exception as startup_err: logger.critical(f"Startup failed: {startup_err}", exc_info=True); raise # Reraise to stop Gunicorn finally: # Shutdown logger.info("Lifespan: Shutdown..."); if ptb_app: try: if ptb_app.running: await ptb_app.stop(); logger.info("PTB stopped.") if ptb_app._initialized: await ptb_app.shutdown(); logger.info("PTB shutdown.") except Exception as e: logger.error(f"Error during PTB shutdown: {e}", exc_info=True) logger.info("Lifespan: Shutdown complete.") async def health_check(request: Request) -> PlainTextResponse: global OPENROUTER_MODEL, GEMINI_MODEL, APIFY_ACTOR_ID, _apify_token_exists, _gemini_primary_enabled, _openrouter_fallback_enabled, _crawl4ai_primary_web_enabled, _urltotext_fallback_enabled, SUPADATA_API_KEY bot_status = "Not Initialized"; bot_username = "N/A" if ptb_app and ptb_app.bot and ptb_app._initialized: try: wh_info = await ptb_app.bot.get_webhook_info() if ptb_app.running and wh_info and wh_info.url: bot_info = await ptb_app.bot.get_me(); bot_username = f"@{bot_info.username}" bot_status = f"Running (Webhook OK, {bot_username})" elif ptb_app.running: bot_status = f"Running (Webhook Status: {wh_info.url if wh_info else 'N/A'}, Last Error: {wh_info.last_error_message if wh_info else 'N/A'})" else: bot_status = "Initialized/Not running" except Exception as e: logger.error(f"Health check status error: {e}", exc_info=True); bot_status = f"Error checking status: {e}" elif ptb_app: bot_status = "Initializing..." 
    health_info = [
        "=== Bot Status ===",
        f"Application: {bot_status}",
        "--- Services ---",
        f"Web Scraper 1 (Primary): {'Crawl4AI (ignore_robots)' if _crawl4ai_primary_web_enabled else 'DISABLED'}",
        "Web Scraper 2 (Fallback): BeautifulSoup",
        f"Web Scraper 3 (Fallback): {'urltotext.com API' if _urltotext_fallback_enabled else 'DISABLED'}",
        f"Summarizer 1 (Primary): {'Gemini (' + GEMINI_MODEL + ')' if _gemini_primary_enabled else 'DISABLED'}",
        f"Summarizer 2 (Fallback): {'OpenRouter (' + OPENROUTER_MODEL + ')' if _openrouter_fallback_enabled else 'DISABLED'}",
        "YT Transcript 1 (Primary): youtube-transcript-api",
        f"YT Transcript 2 (Fallback): {'Supadata API' if SUPADATA_API_KEY else 'DISABLED'}",
        f"YT Transcript 3 (Fallback): {'Apify (' + APIFY_ACTOR_ID + ')' if _apify_token_exists else 'DISABLED'}"
    ]
    return PlainTextResponse("\n".join(health_info))

async def telegram_webhook(request: Request) -> Response:
    global WEBHOOK_SECRET, ptb_app
    # Check whether the app is ready
    if not ptb_app or not ptb_app._initialized or not ptb_app.running:
        status = "Not Initialized" if not ptb_app else ("Initializing" if not ptb_app._initialized else "Not Running")
        logger.error(f"Webhook received but PTB application {status}.")
        return PlainTextResponse(f'Bot {status}', status_code=503)
    # Validate the secret
    if WEBHOOK_SECRET:
        token_header = request.headers.get("X-Telegram-Bot-Api-Secret-Token")
        if token_header != WEBHOOK_SECRET:
            logger.warning("Webhook received with invalid secret token.")
            return Response(content="Invalid secret token", status_code=403)
    # Process the update
    try:
        update_data = await request.json()
        update = Update.de_json(data=update_data, bot=ptb_app.bot)
        logger.debug(f"Processing update_id: {update.update_id} via webhook")
        await ptb_app.process_update(update)
        return Response(status_code=200)  # OK to Telegram
    except json.JSONDecodeError:
        logger.error("Webhook received invalid JSON.")
        return PlainTextResponse('Bad Request: Invalid JSON', status_code=400)
    except Exception as e:
        logger.error(f"Error processing webhook update: {e}", exc_info=True)
        return Response(status_code=200)  # Still return OK to TG to prevent retries for processing errors

# --- ASGI App Definition ---
app = Starlette(
    debug=False,
    lifespan=lifespan,
    routes=[
        Route("/", endpoint=health_check, methods=["GET"]),
        Route("/webhook", endpoint=telegram_webhook, methods=["POST"]),
    ]
)
logger.info("Starlette ASGI application created.")

# --- Development Runner ---
if __name__ == '__main__':
    import uvicorn
    logger.warning("Running DEV mode - FOR LOCAL TESTING ONLY")
    log_level = os.environ.get("LOGGING_LEVEL", "info").lower()
    port = int(os.environ.get('PORT', 8080))
    try:
        from dotenv import load_dotenv
        load_dotenv()
        logger.info(".env file loaded for local development.")
    except ImportError:
        logger.info(".env file not found or python-dotenv not installed.")
    # Check required secrets for local dev
    if not get_secret('TELEGRAM_TOKEN'):
        logger.critical("Local Dev: TELEGRAM_TOKEN missing.")
    if not get_secret('GEMINI_API_KEY'):
        logger.error("Local Dev: GEMINI_API_KEY missing.")
    uvicorn.run("main:app", host='0.0.0.0', port=port, log_level=log_level, reload=True)
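
# Production deployment sketch (assumes this module is named main.py; the
# UvicornWorker class ships with the uvicorn package):
#   gunicorn main:app -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8080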