""" | |
Complete Weather App with AI Integration | |
Full LlamaIndex and Gemini API integration for intelligent conversations | |
""" | |
import gradio as gr | |
import asyncio | |
import logging | |
import os | |
import sys | |
from datetime import datetime | |
import json | |
from typing import Dict, List | |
import plotly.graph_objects as go | |
import pandas as pd | |
import plotly.express as px | |
# Load environment variables from .env file | |
try: | |
from dotenv import load_dotenv | |
load_dotenv() | |
print("β Loaded .env file successfully") | |
except ImportError: | |
print("β οΈ python-dotenv not installed. Install with: pip install python-dotenv") | |
print("π Trying to read environment variables directly...") | |
# Add src to path | |
sys.path.append(os.path.join(os.path.dirname(__file__), 'src')) | |
# Import enhanced modules | |
try: | |
from api.weather_client import create_weather_client | |
from chatbot.nlp_processor import create_nlp_processor | |
from chatbot.enhanced_chatbot import create_enhanced_chatbot | |
from geovisor.map_manager import create_map_manager | |
from analysis.climate_analyzer import create_climate_analyzer | |
except ImportError as e: | |
print(f"Import error: {e}") | |
print("Please check that all required modules are properly installed.") | |
exit(1) | |
# Configure logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
class WeatherAppProEnhanced: | |
"""Enhanced Weather App with full AI integration""" | |
def __init__(self): | |
"""Initialize the enhanced weather app""" | |
# Get Gemini API key from environment (now supports .env files) | |
self.gemini_api_key = os.getenv("GEMINI_API_KEY") | |
if self.gemini_api_key: | |
print("π€ Gemini API key found - AI features enabled!") | |
print(f"π API key starts with: {self.gemini_api_key[:10]}...") | |
else: | |
print("β οΈ GEMINI_API_KEY not found in environment variables or .env file.") | |
print("π‘ Create a .env file with: GEMINI_API_KEY=your-api-key") | |
print("π App will work in basic mode without AI features.") | |
# Initialize components | |
self.weather_client = create_weather_client() | |
self.nlp_processor = create_nlp_processor() | |
self.enhanced_chatbot = create_enhanced_chatbot( | |
self.weather_client, | |
self.nlp_processor, | |
self.gemini_api_key | |
) | |
self.map_manager = create_map_manager() | |
self.climate_analyzer = create_climate_analyzer(self.weather_client) | |
# Initialize MCP client for enhanced weather intelligence | |
try: | |
from mcp_client.weather_mcp_client import WeatherMCPClient | |
self.mcp_client = WeatherMCPClient( | |
weather_client=self.weather_client, | |
climate_analyzer=self.climate_analyzer | |
) | |
logger.info("MCP client initialized successfully") | |
except Exception as e: | |
logger.warning(f"MCP client initialization failed: {e}") | |
self.mcp_client = None | |
# App state | |
self.current_cities = [] | |
self.chat_history = [] | |
self.last_weather_data = {} | |
logger.info("Enhanced Weather App initialized successfully") | |
async def process_chat_message(self, message: str, history: list) -> tuple: | |
"""Process chat message with enhanced AI""" | |
try: | |
if not message.strip(): | |
return history, "", self._create_default_map(), "Please enter a weather question!" | |
# DEBUG: Check if LLM is enabled | |
llm_status = "π€ LLM Enabled" if self.gemini_api_key else "β οΈ Basic mode (no LLM)" | |
print(f"Processing: '{message}' | {llm_status}") | |
# Add user message to history in messages format | |
history.append({"role": "user", "content": message}) | |
# Process with appropriate method based on AI availability | |
if self.gemini_api_key: | |
result = await self._process_with_ai(message, history) | |
else: | |
result = self._process_basic(message, history) | |
# DEBUG: Show what the processing returned | |
print(f"Processing result keys: {list(result.keys()) if isinstance(result, dict) else 'Not a dict'}") | |
ai_response = result.get('response', 'Sorry, I could not process your request.') | |
cities = result.get('cities', []) | |
weather_data = result.get('weather_data', {}) | |
map_data = result.get('map_data', []) | |
comparison_mode = result.get('comparison_mode', False) | |
# Update app state | |
self.current_cities = cities | |
self.last_weather_data = weather_data | |
# Add AI response to history in messages format | |
history.append({"role": "assistant", "content": ai_response}) | |
# Create updated map | |
if map_data: | |
map_html = self.map_manager.create_weather_map( | |
map_data, | |
comparison_mode=comparison_mode, | |
show_weather_layers=True | |
) | |
else: | |
map_html = self._create_default_map() | |
# Create status message | |
            if cities:
                status = f"🎯 Found weather data for: {', '.join([c.title() for c in cities])}"
                if comparison_mode:
                    status += " (Comparison mode active)"
            else:
                status = "💬 General weather assistance"
return history, "", map_html, status | |
except Exception as e: | |
logger.error(f"Error processing chat message: {e}") | |
error_response = f"I encountered an error processing your request: {str(e)}" | |
history.append({"role": "assistant", "content": error_response}) | |
return history, "", self._create_default_map(), "β Error processing request" | |
def get_detailed_forecast(self, city_input: str) -> str: | |
"""Get detailed forecast text for a city""" | |
try: | |
if not city_input.strip(): | |
return "Please enter a city name" | |
coords = self.weather_client.geocode_location(city_input) | |
if not coords: | |
return f"City '{city_input}' not found" | |
lat, lon = coords | |
forecast = self.weather_client.get_forecast(lat, lon) | |
if not forecast: | |
return f"No forecast data available for {city_input}" | |
# Create detailed forecast text with emojis | |
forecast_text = f"# π 7-Day Forecast for {city_input.title()}\n\n" | |
for i, period in enumerate(forecast[:7]): # 7-day forecast | |
condition = period.get('shortForecast', 'N/A') | |
weather_emoji = self._get_weather_emoji(condition) | |
temp = period.get('temperature', 'N/A') | |
temp_unit = period.get('temperatureUnit', 'F') | |
wind_speed = period.get('windSpeed', 'N/A') | |
wind_dir = period.get('windDirection', '') | |
precip = period.get('precipitationProbability', 0) | |
# Temperature emoji based on value | |
if isinstance(temp, (int, float)): | |
if temp >= 85: | |
temp_emoji = "π₯" | |
elif temp >= 75: | |
temp_emoji = "π‘οΈ" | |
elif temp >= 60: | |
temp_emoji = "π‘οΈ" | |
elif temp >= 40: | |
temp_emoji = "π§" | |
else: | |
temp_emoji = "βοΈ" | |
else: | |
temp_emoji = "π‘οΈ" | |
# Day/Night emoji | |
day_night_emoji = "βοΈ" if period.get('isDaytime', True) else "π" | |
forecast_text += f"## {day_night_emoji} {period.get('name', f'Period {i+1}')}\n" | |
forecast_text += f"**{temp_emoji} Temperature:** {temp}Β°{temp_unit}\n" | |
forecast_text += f"**{weather_emoji} Conditions:** {condition}\n" | |
forecast_text += f"**π¨ Wind:** {wind_speed} {wind_dir}\n" | |
forecast_text += f"**π§οΈ Rain Chance:** {precip}%\n" | |
# Add detailed forecast with line breaks for readability | |
details = period.get('detailedForecast', 'No details available') | |
if len(details) > 100: | |
details = details[:100] + "..." | |
forecast_text += f"**π Details:** {details}\n\n" | |
forecast_text += "---\n\n" | |
return forecast_text | |
except Exception as e: | |
logger.error(f"Error getting detailed forecast: {e}") | |
return f"Error getting forecast: {str(e)}" | |
def get_weather_alerts(self) -> str: | |
"""Get current weather alerts""" | |
try: | |
alerts = self.weather_client.get_alerts() | |
if not alerts: | |
return "# π’ No Active Weather Alerts\n\nThere are currently no active weather alerts in the system." | |
alerts_text = f"# π¨ Active Weather Alerts ({len(alerts)} alerts)\n\n" | |
for alert in alerts[:10]: # Limit to 10 alerts | |
severity = alert.get('severity', 'Unknown') | |
event = alert.get('event', 'Weather Alert') | |
headline = alert.get('headline', 'No headline available') | |
areas = alert.get('areas', 'Unknown areas') | |
expires = alert.get('expires', 'Unknown expiration') | |
                # Color code by severity
                if severity.lower() == 'severe':
                    icon = "🔴"
                elif severity.lower() == 'moderate':
                    icon = "🟡"
                else:
                    icon = "🟠"
alerts_text += f"## {icon} {event}\n" | |
alerts_text += f"**Severity:** {severity}\n" | |
alerts_text += f"**Areas:** {areas}\n" | |
alerts_text += f"**Expires:** {expires}\n" | |
alerts_text += f"**Details:** {headline}\n\n" | |
alerts_text += "---\n\n" | |
return alerts_text | |
except Exception as e: | |
logger.error(f"Error getting weather alerts: {e}") | |
return f"# β Error Getting Alerts\n\nError retrieving weather alerts: {str(e)}" | |
def _create_default_map(self) -> str: | |
"""Create default map view""" | |
try: | |
return self.map_manager.create_weather_map([]) | |
except Exception as e: | |
logger.error(f"Error creating default map: {e}") | |
return """ | |
<div style="width: 100%; height: 400px; background: #2c3e50; color: white; | |
display: flex; align-items: center; justify-content: center; | |
font-family: Arial, sans-serif; border-radius: 10px;"> | |
<div style="text-align: center;"> | |
<h3>πΊοΈ Weather Map</h3> | |
<p>Ask about weather in a city to see it on the map!</p> | |
</div> | |
</div> """ | |
def _create_default_chart(self) -> go.Figure: | |
"""Create default empty chart with professional dark styling""" | |
fig = go.Figure() | |
# Add placeholder data | |
fig.add_trace(go.Scatter( | |
x=['Day 1', 'Day 2', 'Day 3', 'Day 4', 'Day 5', 'Day 6', 'Day 7'], | |
y=[72, 75, 71, 68, 73, 76, 74], | |
mode='lines+markers', | |
            name='Temperature (°F)',
line=dict(color='#ff6b6b', width=3), | |
marker=dict(size=8, color='#ff6b6b') | |
)) | |
fig.add_trace(go.Bar( | |
x=['Day 1', 'Day 2', 'Day 3', 'Day 4', 'Day 5', 'Day 6', 'Day 7'], | |
y=[20, 10, 40, 60, 30, 15, 25], | |
name='Precipitation (%)', | |
yaxis='y2', | |
opacity=0.7, | |
marker_color='#4ecdc4' | |
)) | |
fig.update_layout( | |
title="π 7-Day Weather Forecast", | |
xaxis_title="Days", | |
yaxis_title="Temperature (Β°F)", | |
yaxis2=dict( | |
title="Precipitation (%)", | |
overlaying='y', | |
side='right', | |
range=[0, 100] | |
), | |
template='plotly_dark', | |
height=400, | |
showlegend=True, | |
paper_bgcolor='rgba(0,0,0,0)', | |
plot_bgcolor='rgba(0,0,0,0)', | |
font=dict(color='white'), | |
title_font=dict(size=16, color='white') | |
) | |
return fig | |
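    # Note: the placeholder figure above overlays the precipitation bars on a secondary
    # y-axis (yaxis2 with overlaying='y', plotted on the right); the per-metric chart
    # builders below each use a single-axis figure instead.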
def _create_temperature_chart(self, forecast_data) -> go.Figure: | |
"""Create temperature chart from forecast data""" | |
try: | |
# Handle both list and dict formats | |
if isinstance(forecast_data, list): | |
periods = forecast_data[:7] # Direct list of periods | |
elif isinstance(forecast_data, dict): | |
periods = forecast_data.get('periods', [])[:7] # Dict with periods key | |
else: | |
periods = [] | |
if not periods: | |
return self._create_default_chart() | |
times = [p.get('name', f'Day {i+1}') for i, p in enumerate(periods)] | |
temps = [p.get('temperature', 0) for p in periods] | |
fig = go.Figure() | |
fig.add_trace(go.Scatter( | |
x=times, y=temps, | |
mode='lines+markers', | |
name='Temperature', | |
line=dict(color='#3b82f6', width=3), | |
marker=dict(size=8) | |
)) | |
fig.update_layout( | |
title="π‘οΈ 7-Day Temperature Forecast", | |
xaxis_title="Time Period", | |
yaxis_title="Temperature (Β°F)", | |
template="plotly_dark", | |
height=300, | |
paper_bgcolor='rgba(0,0,0,0)', | |
plot_bgcolor='rgba(0,0,0,0)', | |
font=dict(color='white') | |
) | |
return fig | |
except Exception as e: | |
logger.error(f"Error creating temperature chart: {e}") | |
return self._create_default_chart() | |
def _create_precipitation_chart(self, forecast_data) -> go.Figure: | |
"""Create precipitation chart""" | |
try: | |
# Handle both list and dict formats | |
if isinstance(forecast_data, list): | |
periods = forecast_data[:7] # Direct list of periods | |
elif isinstance(forecast_data, dict): | |
periods = forecast_data.get('periods', [])[:7] # Dict with periods key | |
else: | |
periods = [] | |
if not periods: | |
return self._create_default_chart() | |
times = [p.get('name', f'Day {i+1}') for i, p in enumerate(periods)] | |
precip = [p.get('precipitationProbability', 0) or 0 for p in periods] | |
fig = go.Figure() | |
fig.add_trace(go.Bar( | |
x=times, y=precip, | |
name='Precipitation Chance', | |
marker_color='#06b6d4' | |
)) | |
fig.update_layout( | |
title="π§οΈ Precipitation Probability", | |
xaxis_title="Time Period", | |
yaxis_title="Chance of Rain (%)", | |
template="plotly_dark", | |
height=300, | |
paper_bgcolor='rgba(0,0,0,0)', | |
plot_bgcolor='rgba(0,0,0,0)', | |
font=dict(color='white') | |
) | |
return fig | |
except Exception as e: | |
logger.error(f"Error creating precipitation chart: {e}") | |
return self._create_default_chart() | |
def _create_wind_chart(self, forecast_data) -> go.Figure: | |
"""Create wind speed chart""" | |
try: | |
# Handle both list and dict formats | |
if isinstance(forecast_data, list): | |
periods = forecast_data[:7] # Direct list of periods | |
elif isinstance(forecast_data, dict): | |
periods = forecast_data.get('periods', [])[:7] # Dict with periods key | |
else: | |
periods = [] | |
if not periods: | |
return self._create_default_chart() | |
times = [p.get('name', f'Day {i+1}') for i, p in enumerate(periods)] | |
# Extract wind speed from wind string (e.g., "SW 10 mph") | |
winds = [] | |
for p in periods: | |
wind_str = p.get('windSpeed', '0 mph') | |
try: | |
# Extract numeric part from wind speed string | |
import re | |
wind_match = re.search(r'\d+', str(wind_str)) | |
wind_val = int(wind_match.group()) if wind_match else 0 | |
winds.append(wind_val) | |
except: | |
winds.append(0) | |
fig = go.Figure() | |
fig.add_trace(go.Scatter( | |
x=times, y=winds, | |
mode='lines+markers', | |
name='Wind Speed', | |
line=dict(color='#10b981', width=3), | |
marker=dict(size=8) | |
)) | |
fig.update_layout( | |
title="π¬οΈ Wind Speed Forecast", | |
xaxis_title="Time Period", | |
yaxis_title="Wind Speed (mph)", | |
template="plotly_dark", | |
height=300, | |
paper_bgcolor='rgba(0,0,0,0)', | |
plot_bgcolor='rgba(0,0,0,0)', | |
font=dict(color='white') | |
) | |
return fig | |
except Exception as e: | |
logger.error(f"Error creating wind chart: {e}") | |
return self._create_default_chart() | |
def _create_humidity_chart(self, forecast_data) -> go.Figure: | |
"""Create humidity chart""" | |
try: | |
# Handle both list and dict formats | |
if isinstance(forecast_data, list): | |
periods = forecast_data[:7] # Direct list of periods | |
elif isinstance(forecast_data, dict): | |
periods = forecast_data.get('periods', [])[:7] # Dict with periods key | |
else: | |
periods = [] | |
if not periods: | |
return self._create_default_chart() | |
times = [p.get('name', f'Day {i+1}') for i, p in enumerate(periods)] | |
# Try to get real humidity data, or simulate if not available | |
humidity = [] | |
for i, p in enumerate(periods): | |
# Check if humidity data exists | |
rel_humidity = p.get('relativeHumidity') | |
if rel_humidity and isinstance(rel_humidity, dict): | |
humidity_val = rel_humidity.get('value', 60 + (i * 5) % 40) | |
elif rel_humidity and isinstance(rel_humidity, (int, float)): | |
humidity_val = rel_humidity | |
else: | |
# Simulate realistic humidity values (50-90%) | |
humidity_val = 60 + (i * 5) % 40 | |
humidity.append(humidity_val) | |
fig = go.Figure() | |
fig.add_trace(go.Bar( | |
x=times, y=humidity, | |
name='Relative Humidity', | |
marker_color='#8b5cf6' | |
)) | |
fig.update_layout( | |
title="π§ Humidity Levels", | |
xaxis_title="Time Period", | |
yaxis_title="Relative Humidity (%)", | |
template="plotly_dark", | |
height=300, | |
paper_bgcolor='rgba(0,0,0,0)', | |
plot_bgcolor='rgba(0,0,0,0)', | |
font=dict(color='white') | |
) | |
return fig | |
except Exception as e: | |
logger.error(f"Error creating humidity chart: {e}") | |
return self._create_default_chart() | |
def _get_weather_emoji(self, condition: str) -> str: | |
"""Get appropriate emoji for weather condition""" | |
condition_lower = condition.lower() | |
        if any(word in condition_lower for word in ['sunny', 'clear', 'fair']):
            return '☀️'
        elif any(word in condition_lower for word in ['partly cloudy', 'partly sunny']):
            return '⛅'
        elif any(word in condition_lower for word in ['cloudy', 'overcast']):
            return '☁️'
        elif any(word in condition_lower for word in ['rain', 'shower', 'drizzle']):
            return '🌧️'
        elif any(word in condition_lower for word in ['thunderstorm', 'storm', 'lightning']):
            return '⛈️'
        elif any(word in condition_lower for word in ['snow', 'snowy', 'blizzard']):
            return '❄️'
        elif any(word in condition_lower for word in ['fog', 'mist', 'haze']):
            return '🌫️'
        elif any(word in condition_lower for word in ['wind', 'breezy', 'gusty']):
            return '💨'
        else:
            return '🌤️'
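    # Illustrative results of the mapping above (the first matching branch wins):
    #   _get_weather_emoji("Partly Cloudy")       -> '⛅'
    #   _get_weather_emoji("Chance Rain Showers") -> '🌧️'
    #   _get_weather_emoji("Patchy Fog")          -> '🌫️'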
# ...existing code... | |
async def _process_with_ai(self, message: str, history: list) -> dict: | |
"""Process message with full AI integration and MCP enhancement""" | |
try: | |
# Check if query lacks cities but might reference previous context | |
enhanced_message = await self._inject_context_if_needed(message, history) | |
# Use the enhanced chatbot's process_weather_query method for full AI integration | |
result = await self.enhanced_chatbot.process_weather_query( | |
enhanced_message, | |
chat_history=history | |
) | |
# If MCP client is available, enhance the response further | |
if self.mcp_client and result.get('cities'): | |
mcp_enhancements = await self._get_mcp_enhancements( | |
enhanced_message, | |
result.get('cities', []), | |
result.get('weather_data', {}) | |
) | |
if mcp_enhancements: | |
# Integrate MCP data into the response | |
result['response'] = self._integrate_mcp_response( | |
result['response'], | |
mcp_enhancements | |
) | |
result['mcp_enhanced_data'] = mcp_enhancements | |
return result | |
except Exception as e: | |
logger.error(f"Error in enhanced MCP processing: {e}") | |
return { | |
'response': f"I encountered an error processing your request: {str(e)}", | |
'cities': [], | |
'weather_data': {}, | |
'map_data': [], | |
'query_analysis': {}, | |
'map_update_needed': False, | |
'comparison_mode': False, | |
'mcp_enhanced_data': {} | |
} | |
def _process_basic(self, message: str, history: list) -> dict: | |
"""Process message with basic functionality (no AI)""" | |
try: | |
# Parse the query using NLP | |
query_analysis = self.nlp_processor.process_query(message) | |
cities = query_analysis.get('cities', []) | |
query_type = query_analysis.get('query_type', 'general') | |
is_comparison = query_analysis.get('comparison_info', {}).get('is_comparison', False) | |
# Basic city geocoding | |
geocoded_cities = [] | |
weather_data = {} | |
for city in cities: | |
coords = self.weather_client.geocode_location(city) | |
if coords: | |
lat, lon = coords | |
forecast = self.weather_client.get_forecast(lat, lon) | |
current_obs = self.weather_client.get_current_observations(lat, lon) | |
weather_data[city] = { | |
'name': city, | |
'coordinates': coords, | |
'forecast': forecast, | |
'current': current_obs | |
} | |
geocoded_cities.append(city) | |
# Generate basic response | |
basic_response = self._generate_basic_response( | |
message, weather_data, query_analysis | |
) | |
return { | |
'response': basic_response, | |
'cities': geocoded_cities, | |
'weather_data': weather_data, | |
'map_data': self._prepare_map_data(geocoded_cities, weather_data), | |
'query_analysis': query_analysis, | |
'map_update_needed': len(geocoded_cities) > 0, | |
'comparison_mode': is_comparison | |
} | |
except Exception as e: | |
logger.error(f"Error in basic processing: {e}") | |
return { | |
'response': f"I encountered an error: {str(e)}", | |
'cities': [], | |
'weather_data': {}, | |
'map_data': [], | |
'query_analysis': {}, | |
'map_update_needed': False, | |
'comparison_mode': False | |
} | |
async def _geocode_with_disambiguation(self, city: str) -> dict: | |
"""Enhanced geocoding with disambiguation for multiple matches""" | |
try: | |
# Common city disambiguation patterns | |
city_mappings = { | |
'wichita': 'Wichita, KS', # Default to Kansas | |
'portland': 'Portland, OR', # Default to Oregon | |
'springfield': 'Springfield, IL', # Default to Illinois | |
'columbia': 'Columbia, SC', # Default to South Carolina | |
'franklin': 'Franklin, TN', # Default to Tennessee | |
'manchester': 'Manchester, NH', # Default to New Hampshire | |
'canton': 'Canton, OH', # Default to Ohio | |
'auburn': 'Auburn, AL', # Default to Alabama | |
} | |
# Check for disambiguation | |
city_lower = city.lower().strip() | |
if city_lower in city_mappings: | |
disambiguated_city = city_mappings[city_lower] | |
coords = self.weather_client.geocode_location(disambiguated_city) | |
else: | |
coords = self.weather_client.geocode_location(city) | |
if not coords: | |
return None | |
lat, lon = coords | |
# Get comprehensive weather data | |
forecast = self.weather_client.get_forecast(lat, lon) | |
current_obs = self.weather_client.get_current_observations(lat, lon) | |
return { | |
'name': city, | |
'original_name': city, | |
'disambiguated_name': city_mappings.get(city_lower, city), | |
'coordinates': coords, | |
'lat': lat, | |
'lon': lon, | |
'forecast': forecast, | |
'current': current_obs | |
} | |
except Exception as e: | |
logger.error(f"Error geocoding {city}: {e}") | |
return None | |
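    # Illustrative behaviour of the mapping above: ambiguous bare names are expanded before
    # geocoding, everything else is passed through unchanged, e.g.
    #   "portland"  -> geocoded as "Portland, OR"
    #   "Boise, ID" -> geocoded as-is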
async def _generate_contextual_response(self, message: str, weather_data: dict, query_analysis: dict) -> str: | |
"""Generate AI-powered contextual response with weather data""" | |
try: | |
# Format weather context for AI | |
weather_context = self._format_weather_context_for_ai(weather_data, query_analysis) | |
# Enhanced prompt with detailed weather data | |
enhanced_prompt = f""" | |
User Query: {message} | |
Current Weather Data: | |
{weather_context} | |
Query Analysis: | |
- Cities mentioned: {query_analysis.get('cities', [])} | |
- Query type: {query_analysis.get('query_type', 'general')} | |
- Is comparison: {query_analysis.get('comparison_info', {}).get('is_comparison', False)} | |
Please provide a helpful, accurate, and engaging response about the weather. | |
Include specific data from the weather information provided. | |
If comparing cities, highlight key differences with specific numbers. | |
Offer practical advice or insights when relevant. | |
Be conversational and friendly. | |
""" | |
# Use the enhanced chatbot to generate response | |
if self.enhanced_chatbot.chat_engine: | |
response = await self.enhanced_chatbot._get_llamaindex_response(enhanced_prompt) | |
elif self.enhanced_chatbot.llm: | |
response = await self.enhanced_chatbot._get_direct_llm_response(enhanced_prompt) | |
elif self.gemini_api_key: | |
response = await self.enhanced_chatbot._get_gemini_response(enhanced_prompt) | |
else: | |
response = self._generate_basic_response(message, weather_data, query_analysis) | |
return response | |
except Exception as e: | |
logger.error(f"Error generating contextual response: {e}") | |
return self._generate_basic_response(message, weather_data, query_analysis) | |
def _format_weather_context_for_ai(self, weather_data: dict, query_analysis: dict) -> str: | |
"""Format weather data for AI context with rich details""" | |
if not weather_data: | |
return "No weather data available." | |
context_parts = [] | |
for city, data in weather_data.items(): | |
forecast = data.get('forecast', []) | |
current = data.get('current', {}) | |
city_context = f"\n{city.title()}:" | |
if forecast: | |
current_period = forecast[0] | |
temp = current_period.get('temperature', 'N/A') | |
temp_unit = current_period.get('temperatureUnit', 'F') | |
conditions = current_period.get('shortForecast', 'N/A') | |
wind_speed = current_period.get('windSpeed', 'N/A') | |
wind_dir = current_period.get('windDirection', '') | |
precip = current_period.get('precipitationProbability', 0) | |
city_context += f""" | |
            - Current Temperature: {temp}°{temp_unit}
- Conditions: {conditions} | |
- Wind: {wind_speed} {wind_dir} | |
- Precipitation Chance: {precip}% | |
- Detailed Forecast: {current_period.get('detailedForecast', 'N/A')[:200]}... | |
""" | |
# Add next few periods for context | |
if len(forecast) > 1: | |
city_context += "\n Next periods:" | |
for i, period in enumerate(forecast[1:4], 1): | |
name = period.get('name', f'Period {i+1}') | |
temp = period.get('temperature', 'N/A') | |
conditions = period.get('shortForecast', 'N/A') | |
city_context += f"\n - {name}: {temp}Β°F, {conditions}" | |
if current: | |
temp_c = current.get('temperature') | |
if temp_c: | |
temp_f = (temp_c * 9/5) + 32 | |
city_context += f"\n- Observed Temperature: {temp_f:.1f}Β°F" | |
humidity = current.get('relativeHumidity', {}) | |
if isinstance(humidity, dict): | |
humidity_val = humidity.get('value') | |
if humidity_val: | |
city_context += f"\n- Humidity: {humidity_val}%" | |
context_parts.append(city_context) | |
return "\n".join(context_parts) | |
def _generate_basic_response(self, message: str, weather_data: dict, query_analysis: dict) -> str: | |
"""Generate basic response without AI""" | |
cities = query_analysis.get('cities', []) | |
query_type = query_analysis.get('query_type', 'general') | |
is_comparison = query_analysis.get('comparison_info', {}).get('is_comparison', False) | |
if not cities: | |
return "I'd be happy to help with weather information! Please specify a city you're interested in." | |
if len(cities) == 1: | |
city = cities[0] | |
if city in weather_data: | |
return self._generate_single_city_response(city, weather_data[city], query_type) | |
else: | |
return f"I couldn't find weather data for {city.title()}. Please check the city name." | |
elif is_comparison and len(cities) >= 2: | |
return self._generate_comparison_response(cities, weather_data, query_type) | |
return "I can help you with weather information for US cities. Try asking about temperature, conditions, or comparing cities!" | |
def _generate_single_city_response(self, city: str, city_data: dict, query_type: str) -> str: | |
"""Generate response for single city""" | |
forecast = city_data.get('forecast', []) | |
if not forecast: | |
return f"Sorry, I couldn't get weather data for {city.title()}." | |
current = forecast[0] | |
temp = current.get('temperature', 'N/A') | |
temp_unit = current.get('temperatureUnit', 'F') | |
conditions = current.get('shortForecast', 'N/A') | |
wind = current.get('windSpeed', 'N/A') | |
wind_dir = current.get('windDirection', '') | |
precip = current.get('precipitationProbability', 0) | |
        if query_type == 'temperature':
            return f"🌡️ The current temperature in **{city.title()}** is **{temp}°{temp_unit}**. Conditions are {conditions.lower()}."
        elif query_type == 'precipitation':
            return f"🌧️ In **{city.title()}**, there's a **{precip}% chance of precipitation**. Current conditions: {conditions}."
        elif query_type == 'wind':
            return f"💨 Wind in **{city.title()}** is **{wind} {wind_dir}**. Current conditions: {conditions}."
        else: # general
            return f"""
🌤️ **Weather in {city.title()}:**
- **Temperature:** {temp}°{temp_unit}
- **Conditions:** {conditions}
- **Wind:** {wind} {wind_dir}
- **Rain Chance:** {precip}%
*I've updated the map to show {city.title()}. Click the marker for more details!*
"""
def _generate_comparison_response(self, cities: list, weather_data: dict, query_type: str) -> str: | |
"""Generate response for city comparison""" | |
if len(cities) < 2: | |
return "I need at least two cities to make a comparison." | |
comparison_text = f"Weather comparison between {' and '.join([c.title() for c in cities])}:\n\n" | |
for city in cities: | |
if city in weather_data: | |
forecast = weather_data[city].get('forecast', []) | |
if forecast: | |
current = forecast[0] | |
temp = current.get('temperature', 'N/A') | |
conditions = current.get('shortForecast', 'N/A') | |
precip = current.get('precipitationProbability', 0) | |
comparison_text += f"π **{city.title()}**: {temp}Β°F, {conditions}, {precip}% rain chance\n" | |
comparison_text += "\n*Check the map to see both locations with detailed weather data!*" | |
return comparison_text | |
def _generate_fallback_response(self, message: str, query_analysis: dict) -> str: | |
"""Generate fallback response when no weather data is available""" | |
cities = query_analysis.get('cities', []) | |
if cities: | |
return f"I couldn't find weather data for {', '.join([c.title() for c in cities])}. Please check the city names and try again." | |
else: | |
return "I can help you with weather information for US cities. Please mention a city you're interested in!" | |
def _prepare_map_data(self, cities: list, weather_data: dict) -> list: | |
"""Prepare data for map visualization""" | |
map_data = [] | |
for city in cities: | |
if city in weather_data: | |
city_data = weather_data[city] | |
coords = city_data.get('coordinates') | |
forecast = city_data.get('forecast', []) | |
if coords and forecast: | |
lat, lon = coords | |
map_data.append({ | |
'name': city, | |
'lat': lat, | |
'lon': lon, | |
'forecast': forecast | |
}) | |
return map_data | |
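    # Each entry appended above has the shape the map manager consumes, roughly:
    #   {'name': 'miami', 'lat': 25.77, 'lon': -80.19, 'forecast': [...]}
    # (the coordinates shown are illustrative)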
async def _inject_context_if_needed(self, message: str, history: list) -> str: | |
"""Inject previous city context if the query lacks explicit cities but seems weather-related""" | |
try: | |
# Quick NLP analysis to see if message has cities | |
query_analysis = self.nlp_processor.process_query(message) | |
cities_in_query = query_analysis.get('cities', []) | |
# If query already has cities, no context injection needed | |
if cities_in_query: | |
return message | |
# Check if this looks like a follow-up weather query | |
weather_related_keywords = [ | |
'weather', 'temperature', 'rain', 'snow', 'wind', 'forecast', | |
'conditions', 'humidity', 'precipitation', 'cloudy', 'sunny', | |
'hot', 'cold', 'warm', 'cool', 'transport', 'transportation', | |
'recommended', 'should i', 'can i', 'good for', 'advice', | |
'biking', 'walking', 'driving', 'outdoor', 'activity' | |
] | |
message_lower = message.lower() | |
is_weather_related = any(keyword in message_lower for keyword in weather_related_keywords) | |
if not is_weather_related: | |
return message | |
# Look for the most recent cities mentioned in conversation history | |
recent_cities = [] | |
# Search through recent conversation history (last 10 messages) | |
for entry in reversed(history[-10:]): | |
if isinstance(entry, dict) and entry.get('role') == 'user': | |
content = entry.get('content', '') | |
elif isinstance(entry, list) and len(entry) >= 1: | |
content = entry[0] # User message in [user, assistant] format | |
else: | |
continue | |
# Extract cities from this historical message | |
historical_analysis = self.nlp_processor.process_query(content) | |
historical_cities = historical_analysis.get('cities', []) | |
if historical_cities: | |
recent_cities.extend(historical_cities) | |
break # Use the most recent cities found | |
# If we found recent cities, inject them into the current query | |
if recent_cities: | |
# Remove duplicates while preserving order | |
unique_cities = [] | |
for city in recent_cities: | |
if city not in unique_cities: | |
unique_cities.append(city) | |
# Inject context into the message | |
cities_context = ", ".join(unique_cities[:2]) # Limit to 2 most recent cities | |
enhanced_message = f"{message} (referring to {cities_context})" | |
logger.info(f"Context injection: '{message}' -> '{enhanced_message}'") | |
return enhanced_message | |
return message | |
except Exception as e: | |
logger.error(f"Error in context injection: {e}") | |
return message | |
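    # Example of the injection above: if an earlier user turn mentioned Seattle, a follow-up
    # like "Is it good for biking?" becomes "Is it good for biking? (referring to seattle)"
    # (city casing depends on what the NLP processor returns).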
async def _get_mcp_enhancements(self, message: str, cities: list, weather_data: dict) -> dict: | |
"""Get MCP tool enhancements for the response""" | |
if not self.mcp_client: | |
return {} | |
enhancements = {} | |
message_lower = message.lower() | |
try: | |
# Connect MCP client if not connected | |
if not self.mcp_client.is_connected: | |
await self.mcp_client.connect() | |
# Determine which MCP tools to use based on query | |
tools_to_use = [] | |
if any(keyword in message_lower for keyword in ['historical', 'history', 'past', 'compare to last year', 'last month']): | |
tools_to_use.append('get_historical_weather') | |
if any(keyword in message_lower for keyword in ['air quality', 'pollution', 'aqi', 'air']): | |
tools_to_use.append('get_air_quality_data') | |
if any(keyword in message_lower for keyword in ['severe', 'storm', 'warning', 'alert', 'tornado', 'hurricane']): | |
tools_to_use.append('get_severe_weather_outlook') | |
if any(keyword in message_lower for keyword in ['travel', 'trip', 'activity', 'outdoor', 'vacation']): | |
tools_to_use.append('get_weather_forecast') | |
if any(keyword in message_lower for keyword in ['marine', 'ocean', 'sea', 'coastal', 'beach']): | |
tools_to_use.append('get_marine_weather') | |
if any(keyword in message_lower for keyword in ['climate', 'trend', 'pattern', 'analysis']): | |
tools_to_use.append('analyze_climate_patterns') | |
# Execute MCP tools | |
for tool_name in tools_to_use: | |
for city in cities[:2]: # Limit to first 2 cities to avoid rate limits | |
try: | |
params = self._prepare_mcp_params(tool_name, city, weather_data.get(city, {})) | |
# Call the appropriate tool method directly | |
if tool_name == 'get_historical_weather': | |
result = await self.mcp_client._get_historical_weather(params) | |
elif tool_name == 'get_air_quality_data': | |
result = await self.mcp_client._get_air_quality_data(params) | |
elif tool_name == 'get_severe_weather_outlook': | |
result = await self.mcp_client._get_severe_weather_outlook(params) | |
elif tool_name == 'get_weather_forecast': | |
result = await self.mcp_client._get_weather_forecast(params) | |
elif tool_name == 'get_marine_weather': | |
result = await self.mcp_client._get_marine_conditions(params) | |
elif tool_name == 'analyze_climate_patterns': | |
result = await self.mcp_client._analyze_climate_patterns(params) | |
else: | |
continue | |
if result and not result.get('error'): | |
if city not in enhancements: | |
enhancements[city] = {} | |
enhancements[city][tool_name] = result | |
logger.info(f"Got MCP enhancement for {city}: {tool_name}") | |
except Exception as e: | |
logger.warning(f"MCP tool {tool_name} failed for {city}: {e}") | |
continue | |
return enhancements | |
except Exception as e: | |
logger.error(f"Error getting MCP enhancements: {e}") | |
return {} | |
def _prepare_mcp_params(self, tool_name: str, city: str, city_data: dict) -> dict: | |
"""Prepare parameters for MCP tool calls""" | |
coords = city_data.get('coordinates') | |
base_params = {'city': city} | |
if coords: | |
base_params['location'] = f"{coords[0]},{coords[1]}" | |
if tool_name == 'get_historical_weather': | |
base_params.update({ | |
'date': '2024-06-01', # Example date for historical data | |
'data_type': 'temperature,precipitation' | |
}) | |
elif tool_name == 'get_severe_weather_outlook': | |
base_params.update({ | |
'region': city, | |
'outlook_period': '3_day' | |
}) | |
elif tool_name == 'get_weather_forecast': | |
base_params.update({ | |
'days': 7, | |
'include_hourly': True | |
}) | |
elif tool_name == 'get_marine_weather': | |
base_params.update({ | |
'location': city, | |
'data_type': 'waves,wind,visibility' | |
}) | |
elif tool_name == 'analyze_climate_patterns': | |
base_params.update({ | |
'location': city, | |
'analysis_type': 'temperature_trends', | |
'time_period': '30_days' | |
}) | |
elif tool_name == 'get_air_quality_data': | |
base_params.update({ | |
'pollutants': ['pm2.5', 'ozone', 'co'] }) | |
return base_params | |
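    # For example, the parameters assembled above for 'get_weather_forecast' look like
    #   {'city': 'denver', 'location': '39.74,-104.99', 'days': 7, 'include_hourly': True}
    # where 'location' is only present when geocoding succeeded (coordinates illustrative).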
def _integrate_mcp_response(self, base_response: str, mcp_data: dict) -> str: | |
"""Integrate MCP tool results into the base response""" | |
if not mcp_data: | |
return base_response | |
enhanced_response = base_response + "\n\n" | |
for city, city_mcp_data in mcp_data.items(): | |
if city_mcp_data: | |
enhanced_response += f"## π§ Enhanced Analysis for {city.title()}\n\n" | |
for tool_name, tool_result in city_mcp_data.items(): | |
if tool_name == 'get_historical_weather': | |
# Extract and format historical weather data properly | |
if isinstance(tool_result, dict) and 'result' in tool_result: | |
hist_data = tool_result['result'] | |
if 'historical_data' in hist_data: | |
hist_info = hist_data['historical_data'] | |
city_name = hist_data.get('city', city) # Use city from result or fallback | |
enhanced_response += f"π **Historical Context for {city_name.title() if city_name else city.title()}:**\n" | |
enhanced_response += f"β’ 30-day average: {hist_info.get('average_temperature', 'N/A')}Β°F\n" | |
enhanced_response += f"β’ Temperature range: {hist_info.get('temperature_range', {}).get('min', 'N/A')}Β°F to {hist_info.get('temperature_range', {}).get('max', 'N/A')}Β°F\n" | |
enhanced_response += f"β’ Precipitation days: {hist_info.get('precipitation_days', 'N/A')} out of 30\n" | |
enhanced_response += f"β’ Conditions vs normal: {hist_info.get('comparison_to_normal', 'N/A')}\n\n" | |
else: | |
enhanced_response += f"π **Historical Context:** {str(tool_result)[:200]}...\n\n" | |
else: | |
enhanced_response += f"π **Historical Context:** {str(tool_result)[:200]}...\n\n" | |
elif tool_name == 'get_air_quality_data': | |
# Use formatted response if available | |
if isinstance(tool_result, dict) and 'result' in tool_result: | |
result_data = tool_result['result'] | |
if 'formatted_response' in result_data: | |
enhanced_response += f"{result_data['formatted_response']}\n\n" | |
else: | |
enhanced_response += f"π¬οΈ **Air Quality:** {str(tool_result)[:150]}...\n\n" | |
else: | |
enhanced_response += f"π¬οΈ **Air Quality:** {str(tool_result)[:150]}...\n\n" | |
elif tool_name == 'get_severe_weather_outlook': | |
enhanced_response += f"β οΈ **Severe Weather Outlook:** {str(tool_result)[:200]}...\n\n" | |
elif tool_name == 'get_weather_forecast': | |
enhanced_response += f"πΊοΈ **Detailed Forecast:** {str(tool_result)[:200]}...\n\n" | |
elif tool_name == 'get_marine_weather': | |
enhanced_response += f"π **Marine Conditions:** {str(tool_result)[:150]}...\n\n" | |
elif tool_name == 'analyze_climate_patterns': | |
enhanced_response += f"π **Climate Analysis:** {str(tool_result)[:200]}...\n\n" | |
return enhanced_response | |
def create_interface(self): | |
"""Create the enhanced Gradio interface with integrated forecast functionality""" | |
# Enhanced custom CSS for premium dark theme and modern UI | |
custom_css = """ | |
.gradio-container { | |
background: linear-gradient(135deg, #0f172a, #1e293b, #334155) !important; | |
min-height: 100vh; | |
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif !important; | |
} | |
.gr-button { | |
background: linear-gradient(45deg, #3b82f6, #6366f1, #8b5cf6) !important; | |
border: none !important; | |
border-radius: 12px !important; | |
color: white !important; | |
font-weight: 600 !important; | |
box-shadow: 0 4px 15px rgba(59, 130, 246, 0.3) !important; | |
transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1) !important; | |
padding: 12px 24px !important; | |
font-size: 14px !important; | |
} | |
.gr-button:hover { | |
transform: translateY(-2px) scale(1.02) !important; | |
box-shadow: 0 8px 25px rgba(59, 130, 246, 0.4) !important; | |
background: linear-gradient(45deg, #2563eb, #4f46e5, #7c3aed) !important; | |
} | |
.gr-textbox { | |
background: rgba(15, 23, 42, 0.8) !important; | |
border: 2px solid rgba(59, 130, 246, 0.3) !important; | |
color: #e2e8f0 !important; | |
border-radius: 12px !important; | |
backdrop-filter: blur(10px) !important; | |
transition: all 0.3s ease !important; | |
} | |
.gr-textbox:focus { | |
border-color: rgba(59, 130, 246, 0.6) !important; | |
box-shadow: 0 0 0 3px rgba(59, 130, 246, 0.1) !important; | |
} | |
.gr-chatbot { | |
background: rgba(15, 23, 42, 0.6) !important; | |
border-radius: 16px !important; | |
border: 2px solid rgba(59, 130, 246, 0.2) !important; | |
backdrop-filter: blur(20px) !important; | |
box-shadow: inset 0 2px 10px rgba(0, 0, 0, 0.3) !important; | |
} | |
.premium-header { | |
background: linear-gradient(135deg, #1e293b, #334155, #475569); | |
color: white; | |
padding: 40px 30px; | |
border-radius: 20px; | |
margin-bottom: 30px; | |
text-align: center; | |
box-shadow: 0 20px 40px rgba(0, 0, 0, 0.4); | |
border: 1px solid rgba(59, 130, 246, 0.2); | |
position: relative; | |
overflow: hidden; | |
} | |
.premium-header::before { | |
content: ''; | |
position: absolute; | |
top: 0; | |
left: 0; | |
right: 0; | |
height: 2px; | |
background: linear-gradient(90deg, #3b82f6, #6366f1, #8b5cf6); | |
} | |
.section-container { | |
background: rgba(15, 23, 42, 0.4) !important; | |
border-radius: 16px !important; | |
padding: 25px !important; | |
margin: 20px 0 !important; | |
border: 2px solid rgba(59, 130, 246, 0.15) !important; | |
backdrop-filter: blur(20px) !important; | |
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.3) !important; | |
transition: all 0.3s ease !important; | |
} | |
.section-container:hover { | |
border-color: rgba(59, 130, 246, 0.3) !important; | |
transform: translateY(-2px) !important; | |
box-shadow: 0 12px 40px rgba(0, 0, 0, 0.4) !important; | |
} | |
.chart-container { | |
background: rgba(15, 23, 42, 0.6) !important; | |
border-radius: 12px !important; | |
padding: 15px !important; | |
border: 1px solid rgba(59, 130, 246, 0.2) !important; | |
backdrop-filter: blur(10px) !important; | |
} | |
.feature-grid { | |
display: grid; | |
grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); | |
gap: 20px; | |
margin: 20px 0; | |
} | |
.feature-card { | |
background: rgba(15, 23, 42, 0.6); | |
border-radius: 12px; | |
padding: 20px; | |
border: 1px solid rgba(59, 130, 246, 0.2); | |
backdrop-filter: blur(10px); | |
transition: all 0.3s ease; | |
} | |
.feature-card:hover { | |
transform: translateY(-4px); | |
border-color: rgba(59, 130, 246, 0.4); | |
box-shadow: 0 8px 25px rgba(0, 0, 0, 0.3); | |
} | |
.sync-indicator { | |
background: linear-gradient(45deg, #10b981, #059669); | |
color: white; | |
padding: 10px 20px; | |
border-radius: 25px; | |
font-size: 12px; | |
font-weight: 600; | |
text-align: center; | |
margin: 15px 0; | |
box-shadow: 0 4px 15px rgba(16, 185, 129, 0.3); | |
display: inline-block; | |
} | |
.section-title { | |
font-size: 20px; | |
font-weight: 700; | |
color: #60a5fa; | |
margin-bottom: 20px; | |
display: flex; | |
align-items: center; | |
gap: 10px; | |
} | |
.status-badge { | |
background: linear-gradient(45deg, #f59e0b, #d97706); | |
color: white; | |
padding: 6px 12px; | |
border-radius: 20px; | |
font-size: 11px; | |
font-weight: 600; | |
text-transform: uppercase; | |
letter-spacing: 0.5px; | |
} | |
.ai-badge { | |
background: linear-gradient(45deg, #8b5cf6, #7c3aed); | |
color: white; | |
padding: 6px 12px; | |
border-radius: 20px; | |
font-size: 11px; | |
font-weight: 600; | |
text-transform: uppercase; | |
letter-spacing: 0.5px; | |
} | |
.forecast-details-markdown { | |
font-size: 0.85rem !important; /* Smaller text */ | |
line-height: 1.4 !important; /* Adjust line spacing */ | |
} | |
.forecast-details-markdown p { | |
margin-bottom: 0.5em !important; /* Smaller paragraph spacing */ | |
} | |
.forecast-details-markdown strong { | |
font-weight: 600 !important; /* Ensure strong is still visible */ | |
} | |
/* Mobile responsiveness */ | |
@media (max-width: 768px) { | |
.premium-header { | |
padding: 25px 20px; | |
} | |
.section-container { | |
padding: 20px; | |
margin: 15px 0; | |
} | |
.feature-grid { | |
grid-template-columns: 1fr; | |
gap: 15px; | |
} | |
} | |
""" | |
with gr.Blocks(css=custom_css, title="π€οΈ Weather App", theme=gr.themes.Base()) as app: | |
# Premium Header | |
mcp_status = "π§ MCP ACTIVE" if self.mcp_client else "β οΈ MCP OFFLINE" | |
mcp_badge_class = "ai-badge" if self.mcp_client else "status-badge" | |
gr.HTML(f""" | |
<div class="premium-header"> | |
<h1 style="font-size: 2.5rem; margin-bottom: 15px; font-weight: 800;"> | |
🌤️ Weather App
</h1> | |
<p style="font-size: 1.2rem; margin-bottom: 10px; opacity: 0.9;"> | |
🤖 AI-Powered Weather Intelligence Platform
</p> | |
<div style="display: flex; justify-content: center; gap: 10px; margin: 20px 0; flex-wrap: wrap;"> | |
<span class="ai-badge">π§ AI ENABLED</span> | |
<span class="status-badge">π SYNC ACTIVE</span> | |
<span class="status-badge">π REAL-TIME</span> | |
<span class="{mcp_badge_class}">{mcp_status}</span> | |
</div> | |
<p style="font-size: 1rem; opacity: 0.8; max-width: 600px; margin: 0 auto;"> | |
<strong>β¨ Experience intelligent weather conversations with enhanced MCP tools β¨</strong><br> | |
Ask about historical data, air quality, severe weather, and travel advice | |
</p> </div> | |
""") | |
# Main AI Weather Assistant Section (Chat and Map) | |
with gr.Group(elem_classes=["section-container"]): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.HTML('<div class="section-title">π¬ AI Weather Assistant</div>') | |
chatbot = gr.Chatbot( | |
label="Weather Assistant π€", | |
elem_id="weather_chatbot", | |
height=600, | |
show_label=False | |
) | |
with gr.Row(): | |
msg_input = gr.Textbox( | |
placeholder="Ask about weather: 'Compare rain in Seattle and Portland' or 'What's the forecast for Miami?'", | |
label="Your Question", | |
scale=4, | |
elem_classes=["dark"] | |
) | |
send_btn = gr.Button("π Send", scale=1) | |
clear_btn = gr.Button("ποΈ Clear Chat") | |
with gr.Column(scale=1): | |
gr.HTML('<div class="section-title">πΊοΈ Dynamic Weather Map</div>') | |
weather_map = gr.HTML( | |
value=self._get_current_map(), | |
label="Interactive Map" | |
) | |
# Professional Weather Forecasts Section - NOW ABOVE TABS | |
with gr.Group(elem_classes=["section-container"]): | |
gr.HTML('<div class="section-title">π Professional Weather Forecasts</div>') | |
gr.HTML('<div class="sync-indicator">π Auto-synced with AI Assistant</div>') | |
gr.Markdown("Get comprehensive 7-day forecasts with professional charts, detailed analysis, and weather emojis") | |
with gr.Row(): | |
forecast_city_input = gr.Textbox( | |
placeholder="Enter city name (e.g., New York, Los Angeles, Chicago)", | |
label="ποΈ City for Detailed Forecast", | |
scale=3 | |
) | |
get_forecast_btn = gr.Button("π Get Forecast", scale=1) | |
# Horizontal layout for forecast text and charts | |
with gr.Row(): | |
with gr.Column(scale=1): # Column for forecast text | |
forecast_output = gr.Markdown( | |
"Enter a city name above to see detailed forecast with weather emojis! π€οΈ", | |
elem_classes=["forecast-details-markdown"] # Apply new CSS class | |
) | |
with gr.Column(scale=2): # Column for charts (2x2 grid) | |
with gr.Row(): | |
with gr.Column(elem_classes=["chart-container"]): | |
temp_chart = gr.Plot(label="π‘οΈ Temperature Analysis") | |
with gr.Column(elem_classes=["chart-container"]): | |
precip_chart = gr.Plot(label="π§οΈ Precipitation Forecast") | |
with gr.Row(): | |
with gr.Column(elem_classes=["chart-container"]): | |
wind_chart = gr.Plot(label="π¬οΈ Wind Analysis") # New chart | |
with gr.Column(elem_classes=["chart-container"]): | |
humidity_chart = gr.Plot(label="π§ Humidity Levels") # New chart | |
# Tabs for Alerts and About - POSITIONED AT THE VERY BOTTOM | |
with gr.Tabs(elem_classes=["section-container"]): | |
with gr.TabItem("π¨ Live Weather Alerts", elem_id="alerts-tab"): | |
gr.HTML('<div class="section-title">π¨ Live Weather Alerts</div>') | |
with gr.Row(): | |
refresh_alerts_btn = gr.Button("π Refresh Alerts") | |
alerts_output = gr.Markdown("Click 'Refresh Alerts' to see current weather alerts.") | |
with gr.TabItem("βΉοΈ About Weather App", elem_id="about-tab"): | |
gr.HTML('<div class="section-title">βΉοΈ About Weather App</div>') | |
gr.HTML(""" | |
<div class="feature-grid"> | |
<div class="feature-card"> | |
<h3>π€ AI-Powered Chat</h3> | |
<p>Natural language processing with Google Gemini for intelligent weather conversations</p> | |
</div> | |
<div class="feature-card"> | |
<h3>π Professional Charts</h3> | |
<p>Interactive visualizations with temperature trends and precipitation forecasts</p> | |
</div> | |
<div class="feature-card"> | |
<h3>πΊοΈ Dynamic Maps</h3> | |
<p>Real-time weather visualization with auto-zoom and comparison features</p> | |
</div> | |
<div class="feature-card"> | |
<h3>π Smart Sync</h3> | |
<p>Automatic synchronization between chat and forecast sections</p> | |
</div> | |
<div class="feature-card"> | |
<h3>β οΈ Live Alerts</h3> | |
<p>Real-time weather alert monitoring and notifications</p> | |
</div> | |
<div class="feature-card"> | |
<h3>π‘οΈ Detailed Forecasts</h3> | |
<p>Comprehensive 7-day forecasts with weather emojis and analysis</p> | |
</div> | |
</div> | |
""") | |
gr.HTML(""" | |
<div style="text-align: center; margin-top: 30px; padding: 20px; background: rgba(59, 130, 246, 0.1); border-radius: 12px; border: 1px solid rgba(59, 130, 246, 0.2);"> | |
<h3 style="color: #60a5fa; margin-bottom: 15px;">π Built with β€οΈ for weather enthusiasts</h3> | |
<p style="color: #cbd5e1; margin-bottom: 10px;"> | |
<strong>Powered by:</strong> National Weather Service API & Google Gemini AI | |
</p> | |
<p style="color: #94a3b8; font-size: 0.9rem;"> | |
Experience the future of weather intelligence with seamless AI integration | |
</p> </div> | |
""") | |
# Event handlers | |
def handle_chat(message, history): | |
"""Handle chat messages with enhanced AI processing and MCP integration.""" | |
try: | |
if self.gemini_api_key: | |
# π Convert tuple history to messages format for AI processing | |
messages_history = [] | |
for entry in history: | |
if isinstance(entry, list) and len(entry) >= 2: | |
messages_history.append({"role": "user", "content": entry[0]}) | |
messages_history.append({"role": "assistant", "content": entry[1]}) | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
result = loop.run_until_complete( | |
self._process_with_ai(message, messages_history) | |
) | |
loop.close() | |
else: | |
# For basic processing, pass empty history since it doesn't use it | |
result = self._process_basic(message, []) | |
# β Always use tuple format for Gradio chatbot | |
if not history: | |
history = [] | |
new_history = history + [[message, result['response']]] | |
# πΊοΈ Enhanced map update with MCP data | |
new_map = self._get_current_map() | |
if result.get('map_update_needed', False): | |
new_map = self._update_map_with_cities( | |
result.get('cities', []), | |
result.get('weather_data', {}), | |
mcp_data=result.get('mcp_enhanced_data', {}) | |
) | |
cities = result.get('cities', []) | |
forecast_city_sync = cities[0] if cities else "" | |
# π Auto-sync forecast with MCP enhancements | |
synced_forecast_text = gr.update() | |
synced_temp_chart = gr.update() | |
synced_precip_chart = gr.update() | |
synced_wind_chart = gr.update() | |
synced_humidity_chart = gr.update() | |
if forecast_city_sync: | |
try: | |
# π Enhanced forecast with MCP data | |
f_text, t_chart, p_chart, w_chart, h_chart, _ = self._handle_enhanced_forecast( | |
forecast_city_sync, | |
mcp_data=result.get('mcp_enhanced_data', {}) | |
) | |
synced_forecast_text = f_text | |
synced_temp_chart = t_chart | |
synced_precip_chart = p_chart | |
synced_wind_chart = w_chart | |
synced_humidity_chart = h_chart | |
except Exception as e_forecast: | |
logger.error(f"Error in enhanced forecast: {e_forecast}") | |
synced_forecast_text = f"β οΈ Error loading enhanced forecast for {forecast_city_sync}" | |
return new_history, "", new_map, forecast_city_sync, synced_forecast_text, synced_temp_chart, synced_precip_chart, synced_wind_chart, synced_humidity_chart | |
except Exception as e: | |
logger.error(f"Error in enhanced chat handler: {e}") | |
if not history: | |
history = [] | |
error_history = history + [[message, f"π¨ Sorry, I encountered an error: {str(e)}"]] | |
return error_history, "", self._get_current_map(), "", gr.update(), gr.update(), gr.update(), gr.update(), gr.update() | |
def handle_forecast(city_name): | |
"""Handle detailed forecast requests with auto-clear and enhanced styling, including wind and humidity charts.""" | |
try: | |
if not city_name.strip(): | |
return ( | |
"Please enter a city name to get the forecast! ποΈ", | |
self._create_default_chart(), # temp | |
self._create_default_chart(), # precip | |
self._create_default_chart(), # wind | |
self._create_default_chart(), # humidity | |
"" # Clear input | |
) | |
forecast_text = self.get_detailed_forecast(city_name) | |
coords = self.weather_client.geocode_location(city_name) | |
if not coords: | |
return ( | |
f"β Could not find weather data for '{city_name}'. Please check the city name and try again.", | |
self._create_default_chart(), | |
self._create_default_chart(), | |
self._create_default_chart(), | |
self._create_default_chart(), | |
"" | |
) | |
lat, lon = coords | |
forecast_data = self.weather_client.get_forecast(lat, lon) | |
# Debug: Check what format forecast_data is | |
logger.info(f"Forecast data type: {type(forecast_data)}") | |
if forecast_data: | |
logger.info(f"Forecast data sample: {forecast_data[:1] if isinstance(forecast_data, list) else list(forecast_data.keys()) if isinstance(forecast_data, dict) else 'Unknown format'}") | |
# Use the same forecast data for all charts since they now handle both formats | |
temp_fig = self._create_temperature_chart(forecast_data) if forecast_data else self._create_default_chart() | |
precip_fig = self._create_precipitation_chart(forecast_data) if forecast_data else self._create_default_chart() | |
wind_fig = self._create_wind_chart(forecast_data) if forecast_data else self._create_default_chart() | |
humidity_fig = self._create_humidity_chart(forecast_data) if forecast_data else self._create_default_chart() | |
return forecast_text, temp_fig, precip_fig, wind_fig, humidity_fig, "" # Clear input | |
except Exception as e: | |
logger.error(f"Error in forecast handler: {e}") | |
return ( | |
f"β Error getting forecast for '{city_name}': {str(e)}", | |
self._create_default_chart(), | |
self._create_default_chart(), | |
self._create_default_chart(), | |
self._create_default_chart(), | |
"" | |
) | |
# Wire up event handlers with enhanced synchronization | |
send_btn.click( | |
fn=handle_chat, | |
inputs=[msg_input, chatbot], | |
outputs=[chatbot, msg_input, weather_map, forecast_city_input, forecast_output, temp_chart, precip_chart, wind_chart, humidity_chart] | |
) | |
msg_input.submit( | |
fn=handle_chat, | |
inputs=[msg_input, chatbot], | |
outputs=[chatbot, msg_input, weather_map, forecast_city_input, forecast_output, temp_chart, precip_chart, wind_chart, humidity_chart] | |
) | |
clear_btn.click( | |
fn=lambda: ([], "", ""), | |
outputs=[chatbot, msg_input, forecast_city_input] | |
) | |
get_forecast_btn.click( | |
fn=handle_forecast, | |
inputs=[forecast_city_input], | |
outputs=[forecast_output, temp_chart, precip_chart, wind_chart, humidity_chart, forecast_city_input] | |
) | |
forecast_city_input.submit( | |
fn=handle_forecast, | |
inputs=[forecast_city_input], | |
outputs=[forecast_output, temp_chart, precip_chart, wind_chart, humidity_chart, forecast_city_input] | |
) | |
refresh_alerts_btn.click( | |
fn=self.get_weather_alerts, | |
outputs=[alerts_output] | |
) | |
return app | |
def _get_current_map(self) -> str: | |
"""Get the current map HTML""" | |
try: | |
return self.map_manager.create_weather_map([]) | |
except Exception as e: | |
logger.error(f"Error getting current map: {e}") | |
return "<div>Map temporarily unavailable</div>" | |
def _update_map_with_cities(self, cities: list, weather_data: dict, mcp_data: dict = None) -> str: | |
"""Update map with city markers and MCP enhancements""" | |
try: | |
cities_data = [] | |
for city in cities: | |
if city in weather_data: | |
city_data = weather_data[city] | |
coords = city_data.get('coordinates') | |
if coords: | |
lat, lon = coords | |
# Base city data | |
city_info = { | |
'name': city, | |
'lat': lat, | |
'lon': lon, | |
'forecast': city_data.get('forecast', []), | |
'current': city_data.get('current', {}) | |
} | |
# Add MCP enhancements if available | |
if mcp_data and city in mcp_data: | |
city_info['mcp_enhancements'] = mcp_data[city] | |
# Add air quality info to popup if available | |
if 'get_air_quality_data' in mcp_data[city]: | |
city_info['air_quality'] = mcp_data[city]['get_air_quality_data'] | |
cities_data.append(city_info) | |
return self.map_manager.create_enhanced_weather_map(cities_data) | |
except Exception as e: | |
logger.error(f"Error updating enhanced map: {e}") | |
return self._get_current_map() | |
def _handle_enhanced_forecast(self, city_name: str, mcp_data: dict = None): | |
"""Handle forecast with MCP enhancements""" | |
try: | |
# Get basic forecast | |
basic_forecast = self.get_detailed_forecast(city_name) | |
# Enhance with MCP data if available | |
if mcp_data and city_name.lower() in mcp_data: | |
city_mcp = mcp_data[city_name.lower()] | |
                enhanced_forecast = basic_forecast + "\n\n## 🔧 Enhanced Insights\n\n"
                for tool_name, tool_result in city_mcp.items():
                    if tool_name == 'get_historical_weather':
                        enhanced_forecast += f"📊 **Historical Comparison:** {str(tool_result)[:200]}...\n\n"
                    elif tool_name == 'get_air_quality_data':
                        enhanced_forecast += f"🌬️ **Air Quality:** {str(tool_result)[:150]}...\n\n"
                    elif tool_name == 'analyze_climate_patterns':
                        enhanced_forecast += f"📈 **Climate Trends:** {str(tool_result)[:200]}...\n\n"
                    elif tool_name == 'get_severe_weather_outlook':
                        enhanced_forecast += f"⚠️ **Severe Weather:** {str(tool_result)[:150]}...\n\n"
else: | |
enhanced_forecast = basic_forecast | |
# Get charts | |
coords = self.weather_client.geocode_location(city_name) | |
if coords: | |
lat, lon = coords | |
forecast_data = self.weather_client.get_forecast(lat, lon) | |
# Use the same forecast data for all charts (they handle the format internally) | |
temp_fig = self._create_temperature_chart(forecast_data) if forecast_data else self._create_default_chart() | |
precip_fig = self._create_precipitation_chart(forecast_data) if forecast_data else self._create_default_chart() | |
wind_fig = self._create_wind_chart(forecast_data) if forecast_data else self._create_default_chart() | |
humidity_fig = self._create_humidity_chart(forecast_data) if forecast_data else self._create_default_chart() | |
return enhanced_forecast, temp_fig, precip_fig, wind_fig, humidity_fig, "" | |
else: | |
return enhanced_forecast, self._create_default_chart(), self._create_default_chart(), self._create_default_chart(), self._create_default_chart(), "" | |
except Exception as e: | |
logger.error(f"Error in enhanced forecast: {e}") | |
return f"β Error getting enhanced forecast: {str(e)}", self._create_default_chart(), self._create_default_chart(), self._create_default_chart(), self._create_default_chart(), "" # Chart creation methods are defined above in the main class | |
# ...existing code... | |
def main(): | |
"""Main application entry point""" | |
print("π€οΈ Starting Weather App") | |
print("π€ AI Features:", "β Enabled" if os.getenv("GEMINI_API_KEY") else "β οΈ Limited (no API key)") | |
print("π± App will be available at: http://localhost:7860") | |
try: | |
app_instance = WeatherAppProEnhanced() | |
app = app_instance.create_interface() | |
app.launch( | |
share=True, | |
debug=True, | |
server_name="0.0.0.0", | |
server_port=7860 | |
) | |
print("β App stopped successfully") | |
except Exception as e: | |
logger.error(f"Error starting app: {e}") | |
print(f"β Failed to start app: {e}") | |
if __name__ == "__main__": | |
main() |