# NOTE: the three lines "Spaces:" / "Running" / "Running" were hosting-page
# status residue from the file scrape, not Python code; kept here as a comment
# so the file stays syntactically valid.
# main.py (Revised with Starlette-only routes and fixes) | |
import os | |
import re | |
import logging | |
import asyncio | |
import json | |
import html | |
import contextlib | |
import traceback | |
from typing import Optional, Dict, Any | |
# --- Frameworks --- | |
# Removed Flask imports | |
from starlette.applications import Starlette | |
from starlette.routing import Route # Changed from Mount | |
from starlette.responses import PlainTextResponse, JSONResponse, Response # Starlette responses | |
from starlette.requests import Request # Starlette request object | |
# --- Telegram Bot --- | |
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, Bot | |
from telegram.ext import ( | |
Application, | |
CommandHandler, | |
MessageHandler, | |
filters, | |
ContextTypes, | |
CallbackQueryHandler, | |
) | |
from telegram.constants import ParseMode | |
from telegram.error import NetworkError, RetryAfter, TimedOut, BadRequest, TelegramError | |
from telegram.request import HTTPXRequest | |
# --- Other Libraries --- | |
import httpx | |
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound | |
import requests | |
from bs4 import BeautifulSoup | |
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, before_sleep_log | |
# Apify is an optional dependency: import it only when a token is configured so
# the app still starts in environments without the package or the token.
_apify_token_exists = bool(os.environ.get('APIFY_API_TOKEN'))
if _apify_token_exists:
    from apify_client import ApifyClient
    from apify_client.consts import ActorJobStatus
else:
    ApifyClient = None  # type: ignore # Make type checkers happy
    # BUGFIX: keep ActorJobStatus defined on this branch too, otherwise any
    # later reference to it raises NameError when no token is configured.
    ActorJobStatus = None  # type: ignore
# --- Logging Setup --- | |
logging.basicConfig( | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
level=logging.INFO # Changed default to INFO, DEBUG is very verbose | |
) | |
logging.getLogger("httpx").setLevel(logging.WARNING) | |
if ApifyClient: logging.getLogger("apify_client").setLevel(logging.WARNING) | |
logging.getLogger("telegram.ext").setLevel(logging.INFO) | |
logging.getLogger('telegram.bot').setLevel(logging.INFO) | |
logging.getLogger("urllib3").setLevel(logging.INFO) | |
logging.getLogger('gunicorn.error').setLevel(logging.INFO) | |
logging.getLogger('uvicorn').setLevel(logging.INFO) | |
logging.getLogger('starlette').setLevel(logging.INFO) | |
logger = logging.getLogger(__name__) | |
logger.info("Logging configured.") | |
# --- Global variable for PTB app ---
# Holds the singleton telegram.ext.Application once configured; stays None
# until setup_bot_config() has run.
ptb_app: Optional[Application] = None
# --- Environment Variable Loading ---
logger.info("Attempting to load secrets...")
def get_secret(secret_name):
    """Return the environment variable *secret_name* (or None), logging its presence.

    Only the value's length is logged, never the value itself.
    """
    log = logging.getLogger(__name__)
    value = os.environ.get(secret_name)
    if value:
        log.info(f"Secret '{secret_name}': Found (Value length: {len(value)})")
    else:
        log.warning(f"Secret '{secret_name}': Not Found")
    return value
# Secrets are read once at import time; missing optional values degrade
# features gracefully rather than crashing the app.
TELEGRAM_TOKEN = get_secret('TELEGRAM_TOKEN')          # required for the bot to run
OPENROUTER_API_KEY = get_secret('OPENROUTER_API_KEY')  # required for summarization
URLTOTEXT_API_KEY = get_secret('URLTOTEXT_API_KEY')    # optional website-scrape fallback
SUPADATA_API_KEY = get_secret('SUPADATA_API_KEY')      # optional transcript source
APIFY_API_TOKEN = get_secret('APIFY_API_TOKEN')        # optional transcript source
logger.info("Secret loading attempt finished.")
# --- Retry Wrapper for Bot Operations ---
async def retry_bot_operation(func, *args, **kwargs):
    """Call a bot method, retrying transient Telegram errors with backoff.

    Usage: ``await retry_bot_operation(bot.send_message, chat_id=..., text=...)``

    BUGFIX: the docstring always promised "exponential backoff" but the old
    body called ``func`` exactly once and re-raised everything; this version
    actually retries. Behavior:
    - BadRequest is treated as permanent and re-raised immediately, except the
      harmless "message is not modified" case, which returns None.
    - RetryAfter sleeps for the server-provided delay before retrying.
    - Other TelegramError subclasses (NetworkError, TimedOut, ...) retry with
      exponential backoff up to ``max_attempts``, then re-raise.
    - Any non-Telegram exception is logged and re-raised unchanged.
    """
    max_attempts = 4
    for attempt in range(1, max_attempts + 1):
        try:
            return await func(*args, **kwargs)
        except BadRequest as e:
            # Likely-permanent client error: retrying will not help.
            if "message is not modified" in str(e).lower():
                logger.warning(f"Ignoring 'message not modified' error: {e}")
                return None  # No action needed; treat as success
            logger.error(f"BadRequest during bot operation (will not retry unless specific): {e}")
            raise
        except RetryAfter as e:
            if attempt == max_attempts:
                raise
            delay = float(e.retry_after) + 0.5  # honour Telegram's flood-control hint
            logger.warning(f"Rate limited (attempt {attempt}/{max_attempts}); sleeping {delay:.1f}s: {e}")
            await asyncio.sleep(delay)
        except (TimedOut, NetworkError, TelegramError) as e:
            if attempt == max_attempts:
                logger.error(f"TelegramError persisted after {max_attempts} attempts: {e}")
                raise
            delay = min(2 ** attempt, 10)  # 2s, 4s, 8s (capped)
            logger.warning(f"TelegramError during bot operation (attempt {attempt}/{max_attempts}, retrying in {delay}s): {e}")
            await asyncio.sleep(delay)
        except Exception as e:
            logger.error(f"Unexpected error during bot operation: {e}", exc_info=True)
            raise
# --- Helper Functions ---
def is_youtube_url(url):
    """Return True when *url* looks like a YouTube video/shorts/live link."""
    # Accepts youtube.com, youtube-nocookie.com, youtu.be, mobile and embed
    # variants; group 1 captures the 11-character video ID.
    pattern = re.compile(
        r'(?:https?://)?(?:www\.)?(?:m\.)?(?:youtube(?:-nocookie)?\.com|youtu\.be)/'
        r'(?:watch\?v=|embed/|v/|shorts/|live/|attribution_link\?a=.*&u=/watch\?v=)?'
        r'([\w-]{11})'   # Group 1: Video ID
        r'(?:\S+)?',     # Optional non-whitespace tail after the ID
        re.IGNORECASE)
    match = pattern.search(url)
    logging.getLogger(__name__).debug(
        f"is_youtube_url check for '{url}': {'Match found' if match else 'No match'}")
    return bool(match)
def extract_youtube_id(url):
    """Return the 11-character YouTube video ID found in *url*, or None."""
    log = logging.getLogger(__name__)
    # Same matcher as is_youtube_url so the two helpers always agree.
    pattern = re.compile(
        r'(?:https?://)?(?:www\.)?(?:m\.)?(?:youtube(?:-nocookie)?\.com|youtu\.be)/'
        r'(?:watch\?v=|embed/|v/|shorts/|live/|attribution_link\?a=.*&u=/watch\?v=)?'
        r'([\w-]{11})'   # Group 1: Video ID
        r'(?:\S+)?',
        re.IGNORECASE)
    match = pattern.search(url)
    if not match:
        log.warning(f"Could not extract YouTube ID from URL: {url}")
        return None
    video_id = match.group(1)
    log.debug(f"Extracted YouTube ID '{video_id}' from URL: {url}")
    return video_id
# --- Content Fetching Functions ---
# Using httpx for async requests
async def fetch_url_content(url: str, timeout: int = 20) -> Optional[str]:
    """Fetch *url* and return the response body as text, or None on any failure.

    Follows redirects and sends a desktop-browser User-Agent, since some sites
    reject obvious bot clients. Errors are logged, never raised.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    try:
        async with httpx.AsyncClient(follow_redirects=True, timeout=timeout, headers=headers) as client:
            response = await client.get(url)
            response.raise_for_status()  # Raise an exception for bad status codes
            # BUGFIX: `apparent_encoding` is a `requests` attribute; httpx's
            # Response has no such property, so the old code raised
            # AttributeError on every successful fetch (silently swallowed by
            # the broad except below, making this function always return None).
            # httpx derives the charset from the response headers itself; only
            # force a UTF-8 fallback when no charset could be determined.
            if response.encoding is None:
                response.encoding = 'utf-8'
            return response.text
    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error fetching {url}: {e.response.status_code} - {e}")
    except httpx.RequestError as e:
        logger.error(f"Request error fetching {url}: {e}")
    except Exception as e:
        logger.error(f"Unexpected error fetching {url}: {e}", exc_info=True)
    return None
async def get_transcript_via_supadata(video_id: str, api_key: str) -> Optional[str]:
    """Fetch a YouTube transcript from the Supadata API; None on any failure."""
    if not api_key:
        return None
    api_url = f"https://api.supadata.net/youtube/transcript?video_id={video_id}"
    request_headers = {'X-API-Key': api_key, 'Accept': 'application/json'}
    logger.info(f"Attempting transcript fetch via Supadata for {video_id}")
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(api_url, headers=request_headers)
            response.raise_for_status()
            data = response.json()
        # Expected shape: a non-empty list of {"text": ...} segments.
        if data and isinstance(data, list) and data[0].get("text"):
            transcript = " ".join(item["text"] for item in data if "text" in item)
            logger.info(f"Supadata transcript fetched successfully for {video_id} (length: {len(transcript)})")
            return transcript
        logger.warning(f"Supadata response format unexpected or empty for {video_id}: {data}")
        return None
    except httpx.HTTPStatusError as e:
        logger.error(f"Supadata API HTTP error for {video_id}: {e.response.status_code} - {e}")
    except Exception as e:
        logger.error(f"Error fetching transcript via Supadata for {video_id}: {e}", exc_info=True)
    return None
async def get_transcript_via_apify(video_id: str, api_token: str) -> Optional[str]:
    """Fetch a YouTube transcript via the Apify YouTube Scraper actor.

    FIX: apify_client is a *synchronous* SDK — ``actor(...).call()`` blocks
    until the actor run finishes (potentially minutes). The old code ran it
    directly inside this coroutine, stalling the whole event loop; it now runs
    in a worker thread via asyncio.to_thread (matching how
    youtube-transcript-api is handled elsewhere in this file).
    Returns None when Apify is unavailable, the run fails, or no transcript is
    present in the results.
    """
    if not ApifyClient or not api_token:
        return None
    logger.info(f"Attempting transcript fetch via Apify for {video_id}")

    def _run_actor_sync():
        # Entire blocking Apify interaction lives in this helper so a single
        # to_thread call covers both the actor run and the dataset download.
        client = ApifyClient(api_token)
        actor_call = client.actor("clockworks/youtube-scraper").call(
            run_input={
                "startUrls": [f"https://www.youtube.com/watch?v={video_id}"],
                "maxResultStreams": 0,
                "maxResults": 0,
                "maxResultCommentStreams": 0,
                "proxyConfiguration": {"useApifyProxy": True},
                "subtitles": True,  # Explicitly request subtitles/transcript
                "extendOutputFunction": 'async ({ data, item, page, request, customData, Apify }) => { return item; }',
            }
        )
        return client.dataset(actor_call["defaultDatasetId"]).list_items().items

    try:
        dataset_items = await asyncio.to_thread(_run_actor_sync)
        if dataset_items:
            # Look for transcript data within the results.
            for item in dataset_items:
                if 'subtitles' in item and isinstance(item['subtitles'], list) and len(item['subtitles']) > 0:
                    # Combine transcript lines, assuming the standard format.
                    transcript = " ".join(line.get('text', '') for line in item['subtitles'][0].get('lines', []))
                    if transcript.strip():
                        logger.info(f"Apify transcript fetched successfully for {video_id} (length: {len(transcript)})")
                        return transcript.strip()
            logger.warning(f"Apify run completed for {video_id}, but no transcript found in results.")
        else:
            logger.warning(f"Apify run completed for {video_id}, but dataset was empty.")
    except Exception as e:
        logger.error(f"Error fetching transcript via Apify for {video_id}: {e}", exc_info=True)
    return None
async def get_youtube_transcript(video_id: str, url: str, supadata_key: Optional[str], apify_token: Optional[str]) -> Optional[str]:
    """Try each transcript source in preference order and return the first hit.

    Order: Supadata API (if keyed) -> youtube-transcript-api -> Apify actor.
    Returns None when every available method fails.
    """
    # 1. Supadata API, when a key is configured.
    if supadata_key:
        via_supadata = await get_transcript_via_supadata(video_id, supadata_key)
        if via_supadata:
            return via_supadata
    # 2. Direct fetch with youtube-transcript-api (sync library -> worker thread).
    logger.info(f"Attempting transcript fetch via youtube-transcript-api for {video_id}")
    try:
        entries = await asyncio.to_thread(YouTubeTranscriptApi.get_transcript, video_id)
        joined = " ".join([entry['text'] for entry in entries])
        logger.info(f"youtube-transcript-api transcript fetched successfully for {video_id} (length: {len(joined)})")
        return joined
    except (TranscriptsDisabled, NoTranscriptFound):
        logger.warning(f"Transcripts disabled or unavailable via youtube-transcript-api for {video_id}.")
    except Exception as e:
        logger.error(f"Error using youtube-transcript-api for {video_id}: {e}")
    # 3. Apify actor as the last resort.
    if apify_token:
        via_apify = await get_transcript_via_apify(video_id, apify_token)
        if via_apify:
            return via_apify
    logger.warning(f"Failed to retrieve transcript for YouTube video {video_id} using all available methods.")
    return None
async def get_website_content_via_requests(url: str) -> Optional[str]:
    """Fetch *url* and extract its main text content with BeautifulSoup.

    Returns the cleaned text when it exceeds a minimal-length sanity check,
    otherwise None (caller may fall back to the UrlToText API).
    """
    logger.info(f"Attempting website scrape via requests/BeautifulSoup for: {url}")
    html_content = await fetch_url_content(url)
    if not html_content:
        return None
    try:
        # Parsing is CPU-bound; run it in a worker thread.
        def parse_html(content):
            soup = BeautifulSoup(content, 'html.parser')
            # Drop chrome/non-content elements before extracting text.
            for script_or_style in soup(["script", "style", "nav", "footer", "aside"]):
                script_or_style.decompose()
            text = soup.get_text(separator='\n', strip=True)
            lines = (line.strip() for line in text.splitlines())
            # BUGFIX: split on a double space (the classic BeautifulSoup
            # clean-up recipe, separating headlines jammed onto one line) —
            # the old single-space split put every individual word on its own
            # line and destroyed the article layout.
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            text = '\n'.join(chunk for chunk in chunks if chunk)
            return text
        text_content = await asyncio.to_thread(parse_html, html_content)
        if text_content and len(text_content) > 100:  # Basic check for meaningful content
            logger.info(f"Successfully scraped content via requests/BeautifulSoup for {url} (length: {len(text_content)})")
            return text_content
        else:
            logger.warning(f"Scraping via requests/BeautifulSoup for {url} yielded minimal content (length: {len(text_content) if text_content else 0}).")
            return None
    except Exception as e:
        logger.error(f"Error parsing website content with BeautifulSoup for {url}: {e}", exc_info=True)
        return None
async def get_website_content_via_urltotext_api(url: str, api_key: str) -> Optional[str]:
    """Fetch website text through the UrlToText API; None on any failure."""
    if not api_key:
        return None
    api_endpoint = "https://api.urltotext.ai/text"
    request_headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    payload = {"url": url, "text_only": True}  # plain text only, no markup
    logger.info(f"Attempting website content fetch via UrlToText API for: {url}")
    try:
        # Generous timeout: the service renders the page server-side.
        async with httpx.AsyncClient(timeout=45.0) as client:
            response = await client.post(api_endpoint, headers=request_headers, json=payload)
            response.raise_for_status()
            data = response.json()
        if "text" in data and data["text"]:
            content = data["text"]
            logger.info(f"Successfully fetched content via UrlToText API for {url} (length: {len(content)})")
            return content
        logger.warning(f"UrlToText API response did not contain text for {url}. Response: {data}")
        return None
    except httpx.HTTPStatusError as e:
        logger.error(f"UrlToText API HTTP error for {url}: {e.response.status_code} - {e}")
    except Exception as e:
        logger.error(f"Error fetching content via UrlToText API for {url}: {e}", exc_info=True)
    return None
# --- Summarization Function ---
async def generate_summary(content: str, summary_type: str, api_key: Optional[str]) -> str:
    """Summarise *content* via the OpenRouter chat-completions API.

    *summary_type* is "paragraph" or "points". Always returns a string: the
    summary on success, otherwise a human-readable error message the caller
    can forward straight to the user (this function never raises).
    """
    # Guard clauses for unusable input / missing configuration.
    if not api_key:
        return "Error: OpenRouter API key is not configured."
    if not content:
        return "Error: No content provided to summarize."
    if len(content) < 50:
        return "The provided content is too short to summarize effectively."
    # Stay well inside typical model context windows; ~100k chars is a
    # safe-ish bound for many models (Claude 3.5 Sonnet allows far more).
    max_chars = 100000
    if len(content) > max_chars:
        logger.warning(f"Content length ({len(content)}) exceeds max_chars ({max_chars}), truncating.")
        content = content[:max_chars]
    prompt_template = """
Please summarize the following text.
Provide the summary in {format_style} format.
Text to summarize:
---
{text}
---
Summary ({format_style}):
"""
    if summary_type == "paragraph":
        format_style = "a concise paragraph"
    else:
        format_style = "bullet points (using * or - for each point)"
    prompt = prompt_template.format(text=content, format_style=format_style)
    # Model override via env; defaults to Claude 3.5 Sonnet.
    model_to_use = os.environ.get("OPENROUTER_MODEL", "anthropic/claude-3.5-sonnet")
    logger.info(f"Sending request to OpenRouter (Model: {model_to_use}) for {summary_type} summary.")
    try:
        # Long timeout: generation can take a while for big inputs.
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.post(
                url="https://openrouter.ai/api/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json",
                },
                json={
                    "model": model_to_use,
                    "messages": [{"role": "user", "content": prompt}],
                    "max_tokens": 1024,
                },
            )
            response.raise_for_status()
            data = response.json()
        choices = data.get("choices")
        if not choices:
            logger.error(f"OpenRouter response format unexpected: {data}")
            return "Sorry, I received an unexpected response from the summarization service."
        summary = choices[0].get("message", {}).get("content", "").strip()
        if not summary:
            logger.error("OpenRouter response successful, but summary content is empty.")
            return "Sorry, the AI generated an empty summary. Please try again."
        logger.info(f"Summary generated successfully (length: {len(summary)})")
        # Escape Markdown control characters so Telegram parsing cannot fail.
        return summary.replace('_', r'\_').replace('*', r'\*').replace('[', r'\[').replace('`', r'\`')
    except httpx.HTTPStatusError as e:
        error_body = ""
        try:
            error_body = e.response.text
        except Exception:
            pass  # Ignore if reading body fails
        logger.error(f"OpenRouter API HTTP error: {e.response.status_code} - {e}. Response body: {error_body}")
        return f"Sorry, there was an error communicating with the summarization service (HTTP {e.response.status_code})."
    except Exception as e:
        logger.error(f"Error generating summary via OpenRouter: {e}", exc_info=True)
        return "Sorry, an unexpected error occurred while generating the summary."
# --- Background Task Processing ---
async def process_summary_task(
    user_id: int,
    chat_id: int,
    message_id_to_edit: Optional[int],  # None if the original message couldn't be edited
    url: str,
    summary_type: str,
    bot_token: str
) -> None:
    """Fetch content for *url*, summarise it, and deliver the result to *chat_id*.

    Runs as a detached asyncio task with its own Bot instance (longer timeouts
    than the main application's, since fetching/summarising is slow). Never
    raises: every failure path is reported to the user and logged.

    FIX: the cleanup in the ``finally`` block guarded on ``bot.session``, an
    attribute python-telegram-bot's ``Bot`` does not have — that raised
    AttributeError out of the finally block on every run. The guard now checks
    only ``bot`` and lets ``bot.shutdown()`` close the HTTP resources.
    """
    task_id = f"{user_id}-{message_id_to_edit or 'new'}"
    logger.info(f"[Task {task_id}] Starting processing for URL: {url}")
    # Dedicated bot instance with generous timeouts for heavy network I/O.
    custom_request = HTTPXRequest(
        connect_timeout=15.0, read_timeout=60.0, write_timeout=60.0, pool_timeout=60.0, http_version="1.1"
    )
    bot = Bot(token=bot_token, request=custom_request)
    content = None
    user_feedback_message = None
    success = False
    final_summary = ""
    status_message_id = message_id_to_edit  # Start assuming we can edit the original
    try:
        # --- Inform user processing has started (edit original or send new) ---
        processing_message_text = f"⏳ Working on your '{summary_type}' summary for:\n`{url}`\n\n_(Fetching & summarizing...)_"
        if status_message_id:
            try:
                await retry_bot_operation(
                    bot.edit_message_text,
                    chat_id=chat_id,
                    message_id=status_message_id,
                    text=processing_message_text,
                    parse_mode=ParseMode.MARKDOWN,
                    reply_markup=None  # Remove the format-choice buttons
                )
                logger.debug(f"[Task {task_id}] Successfully edited message {status_message_id} to 'Processing'")
            except Exception as e:
                logger.warning(f"[Task {task_id}] Could not edit original message {status_message_id}: {e}. Will send a new status message.")
                status_message_id = None  # Flag to send a new message
        if not status_message_id:  # Editing failed or wasn't possible
            try:
                status_message = await retry_bot_operation(
                    bot.send_message,
                    chat_id=chat_id,
                    text=processing_message_text,
                    parse_mode=ParseMode.MARKDOWN
                )
                status_message_id = status_message.message_id
                logger.debug(f"[Task {task_id}] Sent new status message {status_message_id}")
            except Exception as e:
                logger.error(f"[Task {task_id}] Failed to send new status message: {e}")
                # Cannot proceed without informing the user.
                raise RuntimeError("Failed to send initial status message") from e
        # --- Main content fetching and summarization ---
        try:
            await retry_bot_operation(bot.send_chat_action, chat_id=chat_id, action='typing')
            # --- Determine content type and fetch ---
            is_yt = is_youtube_url(url)
            logger.debug(f"[Task {task_id}] URL is YouTube: {is_yt}")
            if is_yt:
                video_id = extract_youtube_id(url)
                if video_id:
                    logger.info(f"[Task {task_id}] Fetching YouTube transcript for {video_id}")
                    content = await get_youtube_transcript(video_id, url, SUPADATA_API_KEY, APIFY_API_TOKEN)
                    if not content:
                        user_feedback_message = "⚠️ Sorry, I couldn't retrieve the transcript for that YouTube video. It might be disabled or unavailable."
                else:
                    user_feedback_message = "⚠️ Couldn't extract a valid YouTube video ID from the link."
            else:
                logger.info(f"[Task {task_id}] Attempting website scrape for: {url}")
                content = await get_website_content_via_requests(url)
                if not content and URLTOTEXT_API_KEY:
                    logger.info(f"[Task {task_id}] Basic scrape failed or insufficient, trying UrlToText API...")
                    await retry_bot_operation(bot.send_chat_action, chat_id=chat_id, action='typing')
                    content = await get_website_content_via_urltotext_api(url, URLTOTEXT_API_KEY)
                if not content:
                    user_feedback_message = "⚠️ Sorry, I couldn't fetch or extract meaningful content from that website."
            # --- Generate summary if content was fetched ---
            if content:
                logger.info(f"[Task {task_id}] Content fetched (length: {len(content)}). Generating '{summary_type}' summary.")
                await retry_bot_operation(bot.send_chat_action, chat_id=chat_id, action='typing')
                final_summary = await generate_summary(content, summary_type, OPENROUTER_API_KEY)
                if final_summary.startswith("Error:") or final_summary.startswith("Sorry,"):
                    user_feedback_message = f"⚠️ {final_summary}"  # Forward generate_summary's error text
                else:
                    success = True  # Summary generated successfully
            # If content fetching failed, user_feedback_message is already set.
        except Exception as e:
            logger.error(f"[Task {task_id}] Error during content fetching or summarization: {e}", exc_info=True)
            user_feedback_message = "❌ An unexpected error occurred while processing your request."
        # --- Send final result or error ---
        if success and final_summary:
            # Telegram caps messages at 4096 chars; split long summaries.
            max_length = 4096
            summary_parts = [final_summary[i:i+max_length] for i in range(0, len(final_summary), max_length)]
            # Send the first part (or the only part).
            await retry_bot_operation(
                bot.send_message,
                chat_id=chat_id,
                text=summary_parts[0],
                parse_mode=ParseMode.MARKDOWN,
                link_preview_options={'is_disabled': True}  # dict form for PTB v21+
            )
            # Send subsequent parts if any.
            for part in summary_parts[1:]:
                await asyncio.sleep(0.5)  # Small delay between parts
                await retry_bot_operation(
                    bot.send_message,
                    chat_id=chat_id,
                    text=part,
                    parse_mode=ParseMode.MARKDOWN,
                    link_preview_options={'is_disabled': True}
                )
            logger.info(f"[Task {task_id}] Successfully sent summary ({len(summary_parts)} parts).")
        elif user_feedback_message:  # Errors from fetching or summarization
            logger.warning(f"[Task {task_id}] Sending feedback/error message: {user_feedback_message}")
            await retry_bot_operation(
                bot.send_message,
                chat_id=chat_id,
                text=user_feedback_message,
                link_preview_options={'is_disabled': True}
            )
        else:  # Safety net -- should be unreachable
            logger.error(f"[Task {task_id}] Reached end of task without success or specific error message.")
            await retry_bot_operation(
                bot.send_message,
                chat_id=chat_id,
                text="❓ Something went wrong, but no specific error was identified.",
                link_preview_options={'is_disabled': True}
            )
    except Exception as e:
        logger.critical(f"[Task {task_id}] Critical error within task processing: {e}", exc_info=True)
        try:
            await retry_bot_operation(
                bot.send_message,
                chat_id=chat_id,
                text="❌ A critical internal error occurred. Please report this if it persists."
            )
        except Exception:
            logger.exception(f"[Task {task_id}] Failed even to send critical error message.")
    finally:
        # --- Clean up the status message ---
        if status_message_id:
            try:
                await retry_bot_operation(bot.delete_message, chat_id=chat_id, message_id=status_message_id)
                logger.debug(f"[Task {task_id}] Deleted status message {status_message_id}")
            except Exception as e:
                logger.warning(f"[Task {task_id}] Failed to delete status message {status_message_id}: {e}")
        # Ensure the bot's HTTP resources are released (see FIX note above).
        if bot:
            try:
                await bot.shutdown()  # Includes closing the underlying HTTPX client
                logger.debug(f"[Task {task_id}] Background bot instance shut down.")
            except Exception as e:
                logger.warning(f"[Task {task_id}] Error shutting down background bot instance: {e}")
        logger.info(f"[Task {task_id}] Task completed. Success: {success}")
# --- Telegram Bot Handlers ---
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handles the /start command with a short greeting and usage hint."""
    user = update.effective_user
    if not user or not update.message:
        return
    logger.info(f"User {user.id} initiated /start.")
    mention = user.mention_html()  # HTML mention avoids Markdown-escaping issues
    greeting = (
        f"👋 Hello {mention}!\n\n"
        "I can summarise YouTube videos or web articles for you.\n\n"
        "Just send me a link (URL) and I'll ask you whether you want the summary as a paragraph or bullet points.\n\n"
        "Type /help for more details."
    )
    await update.message.reply_html(greeting)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handles the /help command with usage instructions and caveats."""
    user = update.effective_user
    if not user or not update.message:
        return
    logger.info(f"User {user.id} requested /help.")
    usage_text = (
        "**How to Use Me:**\n"
        "1. Send me a direct link (URL) to a YouTube video or a web article.\n"
        "2. I will ask you to choose the summary format: `Paragraph` or `Points`.\n"
        "3. Click the button for your preferred format.\n"
        "4. I'll fetch the content, summarise it using AI, and send it back to you!\n\n"
        "**Important Notes:**\n"
        "- **YouTube:** Getting transcripts can sometimes fail if they are disabled or unavailable.\n"
        "- **Websites:** I try my best, but complex or JavaScript-heavy sites might not work perfectly.\n"
        "- **AI Summaries:** The AI aims for accuracy but may occasionally miss nuances or make errors.\n"
        "- **Length Limits:** Very long videos or articles might be truncated before summarization.\n\n"
        "Just send a link to get started!"
    )
    # Plain MARKDOWN suffices here; MarkdownV2 would require heavier escaping.
    await update.message.reply_text(usage_text, parse_mode=ParseMode.MARKDOWN)
async def handle_potential_url(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle a text message that may contain a URL: stash it and offer formats."""
    if not update.message or not update.message.text:
        return
    message_text = update.message.text.strip()
    user = update.effective_user
    if not user:
        return
    # Loose matcher: find something URL-shaped; strict validation isn't needed.
    match = re.search(r'https?://[^\s<>"]+|www\.[^\s<>"]+', message_text)
    if match:
        # Trim trailing punctuation that usually isn't part of the link.
        url = re.sub(r'[.,!?)\]>]+$', '', match.group(0))
        logger.info(f"User {user.id} sent potential URL: {url}")
        # Remember the link and the triggering message for the button callback.
        context.user_data['url_to_summarize'] = url
        context.user_data['original_message_id'] = update.message.message_id
        format_buttons = InlineKeyboardMarkup([
            [
                InlineKeyboardButton("📜 Paragraph", callback_data="paragraph"),
                InlineKeyboardButton("🔹 Bullet Points", callback_data="points"),
            ]
        ])
        await update.message.reply_text(
            f"✅ Link received:\n`{url}`\n\nChoose your desired summary format:",
            reply_markup=format_buttons,
            parse_mode=ParseMode.MARKDOWN,
            link_preview_options={'is_disabled': True}
        )
        return
    if message_text.startswith('/'):
        return  # Commands are handled elsewhere; stay silent here.
    # Non-URL, non-command chatter: stay quiet to avoid noise, but offer a
    # hint when the text looks like a mistyped link.
    logger.debug(f"User {user.id} sent non-URL, non-command text: '{message_text[:50]}...'")
    if "http" in message_text or "www." in message_text:
        await update.message.reply_text("Hmm, that looks like it might be a link, but please ensure it starts with `http://` or `https://` and is a valid URL.")
async def handle_summary_type_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle a format-choice button press: validate, then spawn the summary task.

    Flow: answer the callback early (stops the client spinner), look up the URL
    stashed by handle_potential_url, clear the per-user state, and schedule
    process_summary_task as a fire-and-forget asyncio task.
    """
    query = update.callback_query
    if not query or not query.message or not query.from_user:
        logger.warning("Callback query received without essential data.")
        if query: await query.answer()  # Try to answer anyway
        return
    user = query.from_user
    summary_type = query.data  # "paragraph" or "points" (the button callback_data)
    query_id = query.id
    # --- Acknowledge button press early ---
    # Answering isn't strictly critical, so a failure here is logged and
    # execution continues.
    try:
        await query.answer()
        logger.debug(f"Acknowledged callback query {query_id} from user {user.id}")
    except Exception as e:
        logger.error(f"Error answering callback query {query_id} from user {user.id}: {e}", exc_info=True)
        # NOTE(review): a persistent NetworkError here points at deeper
        # httpx/anyio/uvloop/PTB interaction issues in some environments.
    # --- Retrieve URL and original message ID ---
    url = context.user_data.get('url_to_summarize')
    # The callback's message IS the one carrying the buttons -- edit that one.
    message_id_to_edit = query.message.message_id
    logger.info(f"User {user.id} chose summary type '{summary_type}' for URL associated with message {message_id_to_edit}")
    if not url:
        # State lost (e.g. bot restarted between link and button press).
        logger.warning(f"No URL found in user_data for user {user.id} (callback query {query_id}). Editing message.")
        try:
            await query.edit_message_text(text="⚠️ Oops! I couldn't find the link associated with this request. Please send the link again.")
        except Exception as e:
            logger.error(f"Failed to edit message to show 'URL not found' error: {e}")
        return
    # --- Clear user_data and schedule the background task ---
    context.user_data.pop('url_to_summarize', None)
    context.user_data.pop('original_message_id', None)  # No longer needed
    if not TELEGRAM_TOKEN:
        logger.critical("TELEGRAM_TOKEN is missing, cannot start background task!")
        try:
            await query.edit_message_text(text="❌ Internal configuration error. Cannot process request.")
        except Exception: pass
        return
    logger.info(f"Scheduling background task for user {user.id}, chat {query.message.chat_id}, message {message_id_to_edit}, type {summary_type}")
    # Fire-and-forget: process_summary_task creates its own Bot instance and
    # reports every outcome to the chat itself.
    asyncio.create_task(
        process_summary_task(
            user_id=user.id,
            chat_id=query.message.chat_id,
            message_id_to_edit=message_id_to_edit,  # The message carrying the buttons
            url=url,
            summary_type=summary_type,
            bot_token=TELEGRAM_TOKEN
        ),
        name=f"SummaryTask-{user.id}-{message_id_to_edit}"
    )
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Global PTB error handler: record any exception raised while handling
    an update or inside a scheduled background task."""
    # Pass the original exception via exc_info so the full traceback is logged.
    err = context.error
    logger.error("Exception while handling an update:", exc_info=err)
    # Possible extension: notify the user on transient failures
    # (NetworkError / TimedOut) or branch on specific TelegramError
    # subclasses — but take care not to spam users when an error persists.
# --- Bot Setup Function ---
async def setup_bot_config() -> Application:
    """Build and wire up the python-telegram-bot Application.

    Returns:
        A fully configured (but not yet initialized) PTB Application.

    Raises:
        ValueError: if the TELEGRAM_TOKEN environment variable is not set.
    """
    logger.info("Configuring Telegram Application...")
    if not TELEGRAM_TOKEN:
        raise ValueError("TELEGRAM_TOKEN environment variable not set.")

    # Explicit timeouts on the shared httpx transport keep Bot API calls
    # (getMe, getUpdates, sendMessage, ...) from hanging indefinitely.
    request_client = HTTPXRequest(
        connect_timeout=10.0,
        read_timeout=30.0,
        write_timeout=30.0,
        pool_timeout=60.0,      # allow pooled connections to stay open longer
        http_version="1.1",     # HTTP/1.1 for widest compatibility
    )

    builder = Application.builder().token(TELEGRAM_TOKEN).request(request_client)
    # Consider builder.connection_pool_size(...) if high concurrency is expected.
    bot_app = builder.build()

    # Command handlers
    bot_app.add_handler(CommandHandler("start", start))
    bot_app.add_handler(CommandHandler("help", help_command))
    # Any non-command text may contain a URL; the handler itself decides
    # (via its own regex check) whether the message is summarizable.
    bot_app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_potential_url))
    # Inline-keyboard button presses (summary type selection).
    bot_app.add_handler(CallbackQueryHandler(handle_summary_type_callback))
    bot_app.add_error_handler(error_handler)

    logger.info("Telegram application handlers configured.")
    return bot_app
# --- ASGI Lifespan Context Manager ---
def build_webhook_url(space_host: Optional[str], webhook_path: str = "/webhook") -> Optional[str]:
    """Normalize the SPACE_HOST env value into a full HTTPS webhook URL.

    Accepts a bare host ('user-repo.hf.space'), an 'http://' URL, or an
    'https://' URL and always produces 'https://<host><webhook_path>'.

    Returns:
        The webhook URL, or None when no usable host is given.
    """
    if not space_host:
        return None
    host = space_host.strip()
    # Drop any scheme; Hugging Face Spaces are always served over HTTPS.
    if "://" in host:
        host = host.split("://", 1)[1]
    host = host.rstrip("/")
    if not host:
        return None
    return f"https://{host}{webhook_path}"


async def lifespan(app: "Starlette"):
    """Handles PTB startup and shutdown during ASGI lifespan.

    Startup: build the PTB application, clear any stale webhook, register the
    new webhook derived from SPACE_HOST, then start PTB's runners. Shutdown
    (the finally block) stops and shuts PTB down whether or not startup
    succeeded.

    Raises:
        RuntimeError: when the token is missing, SPACE_HOST is absent, no
            valid webhook URL can be built, or setting the webhook fails.
    """
    global ptb_app
    logger.info("ASGI Lifespan: Startup sequence initiated...")
    if not TELEGRAM_TOKEN:
        logger.critical("TELEGRAM_TOKEN is not set. Bot cannot start.")
        raise RuntimeError("Telegram token missing.")
    try:
        ptb_app = await setup_bot_config()
        await ptb_app.initialize()  # Initialize application resources
        bot_info = await ptb_app.bot.get_me()
        logger.info(f"Bot initialized: @{bot_info.username} (ID: {bot_info.id})")

        # Delete any stale webhook before registering a new one.
        current_webhook_info = await ptb_app.bot.get_webhook_info()
        if current_webhook_info and current_webhook_info.url:
            logger.info(f"Found existing webhook: {current_webhook_info.url}. Attempting to delete it.")
            try:
                await ptb_app.bot.delete_webhook(drop_pending_updates=True)
                logger.info("Existing webhook deleted successfully.")
            except Exception as e:
                logger.warning(f"Could not delete existing webhook: {e}")
                await asyncio.sleep(1)  # Short pause before setting new one

        # Webhook mode is required on Hugging Face Spaces; polling would
        # block the ASGI event loop and is not attempted here.
        space_host = os.environ.get("SPACE_HOST")
        if not space_host:
            logger.warning("SPACE_HOST environment variable not found. Cannot set webhook.")
            raise RuntimeError("SPACE_HOST env var missing, cannot run in webhook mode.")

        # BUGFIX: the previous inline logic never assigned a URL when
        # SPACE_HOST already started with 'https://', aborting startup for a
        # perfectly valid host. build_webhook_url handles bare hosts and both
        # schemes uniformly.
        full_webhook_url = build_webhook_url(space_host)
        if not full_webhook_url:
            logger.warning("Could not construct valid HTTPS webhook URL from SPACE_HOST.")
            raise RuntimeError("Webhook URL could not be determined.")

        logger.info(f"Attempting to set webhook to: {full_webhook_url}")
        await asyncio.sleep(2.0)  # Give network time to settle
        try:
            await ptb_app.bot.set_webhook(
                url=full_webhook_url,
                allowed_updates=Update.ALL_TYPES,  # Or restrict to ['message', 'callback_query']
                drop_pending_updates=True,
                secret_token=os.environ.get("WEBHOOK_SECRET"),  # Optional shared secret
            )
            webhook_info = await ptb_app.bot.get_webhook_info()
            logger.info(f"Webhook successfully set: URL='{webhook_info.url}', Pending={webhook_info.pending_update_count}")
            # Start PTB's internal runners *after* the webhook is in place.
            await ptb_app.start()
            logger.info("PTB Application started (webhook mode). Ready for updates.")
        except Exception as e:
            logger.error(f"FATAL: Failed to set webhook to {full_webhook_url}: {e}", exc_info=True)
            raise RuntimeError(f"Failed to set webhook: {e}") from e

        logger.info("ASGI Lifespan: Startup complete.")
        yield  # Application runs here
    except Exception as startup_err:
        logger.critical(f"Application startup failed: {startup_err}", exc_info=True)
        # Cleanup is owned by the finally block below; the previous version
        # also stopped/shut down PTB here, causing a double shutdown.
        raise  # Reraise the error to stop the ASGI server
    finally:
        logger.info("ASGI Lifespan: Shutdown sequence initiated...")
        if ptb_app:
            if ptb_app.running:
                logger.info("Stopping PTB application...")
                await ptb_app.stop()
            logger.info("Shutting down PTB application...")
            await ptb_app.shutdown()
            logger.info("PTB Application shut down gracefully.")
        else:
            logger.info("PTB application was not initialized or startup failed.")
        logger.info("ASGI Lifespan: Shutdown complete.")
# --- Starlette Route Handlers ---
async def health_check(request: Request) -> PlainTextResponse:
    """Basic health check endpoint."""
    # Default status when the PTB application has not been created yet.
    status = "Not Initialized"
    if ptb_app and ptb_app.bot:
        try:
            if not ptb_app.running:
                status = "Initialized but not running"
            else:
                # get_me() is usually served from PTB's cache, so this stays cheap.
                me = await ptb_app.bot.get_me()
                status = f"Running (@{me.username})"
        except Exception as e:
            status = f"Error checking status: {e}"
    return PlainTextResponse(f"Telegram Bot Summarizer - Status: {status}")
async def telegram_webhook(request: Request) -> Response:
    """Webhook endpoint called by Telegram.

    Validates the optional shared secret, decodes the update JSON and hands
    it to the PTB application. Returns 200 even when processing fails so
    Telegram does not keep retrying an update our own logic cannot handle.
    """
    import hmac  # local import: used only for the constant-time token comparison

    if not ptb_app:
        logger.error("Webhook received but PTB application not initialized.")
        return PlainTextResponse('Bot not initialized', status_code=503)
    if not ptb_app.running:
        logger.warning("Webhook received but PTB application not running.")
        # 503 so Telegram retries later once the app is up.
        return PlainTextResponse('Bot initialized but not running', status_code=503)
    try:
        # Check the secret token if configured. SECURITY FIX: use a
        # constant-time comparison (was a plain `!=`) so the secret cannot be
        # probed via a timing side channel; also tolerate a missing header.
        secret_token = os.environ.get("WEBHOOK_SECRET")
        if secret_token:
            provided = request.headers.get("X-Telegram-Bot-Api-Secret-Token") or ""
            if not hmac.compare_digest(provided, secret_token):
                logger.warning("Webhook received with invalid secret token.")
                return Response(status_code=403)  # Forbidden
        # Process the update
        update_data = await request.json()
        update = Update.de_json(data=update_data, bot=ptb_app.bot)
        logger.debug(f"Processing update_id: {update.update_id}")
        await ptb_app.process_update(update)
        return Response(status_code=200)  # OK
    except json.JSONDecodeError:
        logger.error("Webhook received invalid JSON.")
        return PlainTextResponse('Bad Request: Invalid JSON', status_code=400)
    except Exception as e:
        logger.error(f"Error processing webhook update: {e}", exc_info=True)
        # 200 despite the error: prevents Telegram's retry storm when the
        # failure is in our processing logic rather than in transport.
        return Response(status_code=200)  # OK despite error
# --- Create Starlette ASGI Application ---
# Two native routes: a GET health probe at "/" and the Telegram webhook
# receiver at "/webhook" (POST only).
_route_table = [
    Route("/", endpoint=health_check, methods=["GET"]),
    Route("/webhook", endpoint=telegram_webhook, methods=["POST"]),
]
app = Starlette(
    debug=False,  # set per-environment if debug output is ever needed
    lifespan=lifespan,
    routes=_route_table,
)
logger.info("Starlette ASGI application created with native routes.")
# --- Development Server Execution Block (Optional) ---
# Not used when deploying behind Gunicorn/Uvicorn; kept for local testing.
if __name__ == '__main__':
    import uvicorn

    logger.warning("Running in development mode using Uvicorn directly (not for production)")
    # Honour the PORT environment variable, defaulting to 8080.
    dev_port = int(os.environ.get('PORT', 8080))
    uvicorn.run(app, host='0.0.0.0', port=dev_port, log_level="info")