# main.py - FastAPI application for Pokemon Livestream
import asyncio
import os
import random
import time
import traceback
import logging
from typing import List, Dict, Optional, Set
import html # --- ADDED FOR /last_action --- (HTML escaping)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
# --- Imports for poke_env and agents ---
from poke_env.player import Player
from poke_env import AccountConfiguration, ServerConfiguration
from poke_env.environment.battle import Battle
# Import the actual agent classes
from agents import OpenAIAgent, GeminiAgent, MistralAgent
# --- Configuration ---
CUSTOM_SERVER_URL = "wss://jofthomas.com/showdown/websocket"
CUSTOM_ACTION_URL = 'https://play.pokemonshowdown.com/action.php?'
CUSTOM_BATTLE_VIEW_URL_TEMPLATE = "https://jofthomas.com/play.pokemonshowdown.com/testclient.html#{battle_id}"
custom_config = ServerConfiguration(CUSTOM_SERVER_URL, CUSTOM_ACTION_URL)
DEFAULT_BATTLE_FORMAT = "gen9randombattle"
LAST_ACTION_FILE = "last_action.txt" # --- ADDED FOR /last_action --- (Filename)
# Define available agents with their corresponding classes
AGENT_CONFIGS = {
"OpenAIAgent": {"class": OpenAIAgent, "password_env_var": "OPENAI_AGENT_PASSWORD"},
"GeminiAgent": {"class": GeminiAgent, "password_env_var": "GEMINI_AGENT_PASSWORD"},
"MistralAgent": {"class": MistralAgent, "password_env_var": "MISTRAL_AGENT_PASSWORD"},
}
# Filter out agents with missing passwords
AVAILABLE_AGENT_NAMES = [
name for name, cfg in AGENT_CONFIGS.items()
if os.environ.get(cfg.get("password_env_var", ""))
]
if not AVAILABLE_AGENT_NAMES:
print("FATAL ERROR: No agent configurations have their required password environment variables set. Exiting.")
exit(1)
# --- Global State Variables ---
active_agent_name: Optional[str] = None
active_agent_instance: Optional[Player] = None
active_agent_task: Optional[asyncio.Task] = None
current_battle_instance: Optional[Battle] = None
background_task_handle: Optional[asyncio.Task] = None
# --- Create FastAPI app ---
app = FastAPI(title="Pokemon Battle Livestream")
# --- Helper Functions ---
def get_active_battle(agent: Player) -> Optional[Battle]:
"""Returns the first non-finished battle for an agent."""
if agent and agent._battles:
active_battles = [b for b in agent._battles.values() if not b.finished]
if active_battles:
# Ensure the battle object has a battle_tag before returning
if hasattr(active_battles[0], 'battle_tag') and active_battles[0].battle_tag:
# Check if the battle_tag has the expected format (starts with 'battle-')
if active_battles[0].battle_tag.startswith("battle-"):
return active_battles[0]
else:
# This handles cases where the battle object might exist but tag isn't ready
# print(f"DEBUG: Found active battle for {agent.username} but tag '{active_battles[0].battle_tag}' not ready.")
return None
else:
# print(f"DEBUG: Found active battle for {agent.username} but it has no battle_tag attribute yet.")
return None
return None
def create_battle_iframe(battle_id: str) -> str:
"""Creates JUST the HTML for the battle iframe tag."""
print("Creating iframe content for battle ID: ", battle_id)
# Use the official client URL unless you specifically need the test client
# battle_url = f"https://play.pokemonshowdown.com/{battle_id}"
battle_url = f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#{battle_id}" # Using your custom URL
# Return ONLY the iframe tag with a class for styling
return f"""
"""
def create_idle_html(status_message: str, instruction: str) -> str:
"""Creates a visually appealing idle screen HTML fragment."""
# Returns ONLY the content div, not the full HTML page
return f"""
{status_message}
{instruction}
"""
def create_error_html(error_msg: str) -> str:
"""Creates HTML fragment to display an error message."""
# Returns ONLY the content div, not the full HTML page
return f"""
"""
@app.get("/last_action", response_class=HTMLResponse)
async def get_last_action():
"""
Serves a simple HTML page displaying the content of last_action.txt,
styled for OBS integration.
"""
file_content_raw = ""
error_message = None
try:
# Read the file content fresh on each request
with open(LAST_ACTION_FILE, "r", encoding="utf-8") as f:
file_content_raw = f.read()
except FileNotFoundError:
error_message = f"Error: File '{LAST_ACTION_FILE}' not found."
print(f"WARN: {error_message}") # Log server-side
except Exception as e:
error_message = f"An unexpected error occurred while reading '{LAST_ACTION_FILE}': {e}"
print(f"ERROR: {error_message}") # Log server-side
traceback.print_exc() # Log full traceback for debugging
# Escape the raw content to prevent XSS if the file contains HTML/JS
display_content = html.escape(file_content_raw) if not error_message else error_message
# Use a class to differentiate normal content from error messages for styling
content_class = "error" if error_message else "log-content"
# Create the simple HTML response with updated styles for OBS
html_output = f"""
Last Action Log
{display_content}
"""
return HTMLResponse(content=html_output)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
# Keep connection alive. Client doesn't send messages in this setup.
# FastAPI's WebSocket implementation handles ping/pong internally usually.
# If needed, you could implement explicit keepalive here.
data = await websocket.receive_text()
# We don't expect messages from the client in this design,
# but log if received for debugging.
print(f"Received unexpected message from client: {data}")
# Or simply keep listening:
# await asyncio.sleep(60) # Example keepalive interval if needed
except WebSocketDisconnect as e:
print(f"WebSocket disconnected: Code {e.code}, Reason: {getattr(e, 'reason', 'N/A')}")
await manager.disconnect(websocket) # Use await here
except Exception as e:
# Catch other potential errors on the connection
print(f"WebSocket error: {e}")
traceback.print_exc()
await manager.disconnect(websocket) # Ensure disconnect on error
@app.on_event("startup")
async def startup_event():
"""Start background tasks when the application starts."""
global background_task_handle
# Mount static files directory (make sure 'static' folder exists)
# Place your 'pokemon_huggingface.png' inside this 'static' folder
static_dir = "static"
if not os.path.exists(static_dir):
os.makedirs(static_dir)
print(f"Created static directory at: {os.path.abspath(static_dir)}")
print("!!! Please add 'pokemon_huggingface.png' to this directory! !!!")
app.mount("/static", StaticFiles(directory=static_dir), name="static")
print(f"Mounted static directory '{static_dir}' at '/static'")
# --- ADDED FOR /last_action --- Check if last_action.txt exists ---
if not os.path.exists(LAST_ACTION_FILE):
print(f"WARN: '{LAST_ACTION_FILE}' not found. Creating an empty file.")
try:
with open(LAST_ACTION_FILE, "w", encoding="utf-8") as f:
f.write("No actions recorded yet.")
except Exception as e:
print(f"ERROR: Could not create '{LAST_ACTION_FILE}': {e}")
# --- END ADDED SECTION ---
print("π Starting background tasks")
# Start the main lifecycle manager task
background_task_handle = asyncio.create_task(manage_agent_lifecycle(), name="LifecycleManager")
# Add the exception logging callback
background_task_handle.add_done_callback(log_task_exception)
print("β Background tasks started")
@app.on_event("shutdown")
async def shutdown_event():
"""Clean up tasks when shutting down."""
global background_task_handle, active_agent_instance
print("\nπ Shutting down application. Cleaning up...")
# 1. Cancel the main lifecycle manager task
if background_task_handle and not background_task_handle.done():
print("Cancelling background task...")
background_task_handle.cancel()
try:
await asyncio.wait_for(background_task_handle, timeout=5.0)
print("Background task cancelled successfully.")
except asyncio.CancelledError:
print("Background task cancellation confirmed (CancelledError).")
except asyncio.TimeoutError:
print("Background task did not finish cancelling within timeout.")
except Exception as e:
print(f"Error during background task cancellation: {e}")
# 2. Deactivate and disconnect any currently active agent
# Use a copy of the instance in case it gets cleared elsewhere during shutdown.
agent_to_disconnect = active_agent_instance
if agent_to_disconnect:
agent_name = agent_to_disconnect.username if hasattr(agent_to_disconnect, 'username') else 'Unknown Agent'
print(f"Disconnecting active agent '{agent_name}'...")
try:
# Check websocket status before disconnecting
if hasattr(agent_to_disconnect, '_websocket') and agent_to_disconnect._websocket and agent_to_disconnect._websocket.open:
await agent_to_disconnect.disconnect()
print(f"Agent '{agent_name}' disconnected.")
else:
print(f"Agent '{agent_name}' already disconnected or websocket not available.")
except Exception as e:
print(f"Error during agent disconnect on shutdown for '{agent_name}': {e}")
# 3. Close all active WebSocket connections cleanly
print(f"Closing {len(manager.active_connections)} client WebSocket connections...")
# Create tasks to close all connections concurrently
close_tasks = [
conn.close(code=1000, reason="Server shutting down") # 1000 = Normal Closure
for conn in list(manager.active_connections) # Iterate over a copy
]
if close_tasks:
await asyncio.gather(*close_tasks, return_exceptions=True) # Log potential errors during close
print("β Cleanup complete. Application shutdown.")
# For direct script execution
if __name__ == "__main__":
import uvicorn
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Reduce noise from poke_env's default INFO logging if desired
logging.getLogger('poke_env').setLevel(logging.WARNING)
logging.getLogger('websockets.client').setLevel(logging.INFO) # Show websocket connection attempts
print("Starting Pokemon Battle Livestream Server...")
print("="*60)
if not AVAILABLE_AGENT_NAMES:
print("βββββββββββββββββββββ FATAL ERROR βββββββββββββββββββββ")
print(" No agents found with configured passwords!")
print(" Please set the required environment variables:")
for name, cfg in AGENT_CONFIGS.items():
print(f" - {cfg.get('password_env_var', 'N/A')} (for agent: {name})")
print("="*60)
exit("Exiting due to missing agent passwords.")
else:
print("β¨ Available Agents Found:")
for name in AVAILABLE_AGENT_NAMES:
print(f" - {name}")
print("="*60)
print(f"Server will run on http://0.0.0.0:7860")
print(f"Last action log available at http://0.0.0.0:7860/last_action") # --- ADDED INFO ---
print("="*60)
# Run with uvicorn
uvicorn.run(
"main:app", # Point to the FastAPI app instance
host="0.0.0.0",
port=7860,
reload=False, # Disable reload for production/stable testing
log_level="info" # Uvicorn's log level
)