from flask import Flask, request, render_template
import requests
from datetime import datetime, date, timedelta
import joblib
import numpy as np
import shap
import google.generativeai as genai
import json
import logging
import os
import pandas as pd
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Attempt to import dice_ml and set a flag.
try:
    import dice_ml
    dice_ml_available = True
    logging.info("dice_ml library found and imported successfully.")
except ImportError as e_import:  # Catch the specific ImportError
    dice_ml_available = False
    # Log the actual import error, which can be very helpful for debugging.
    logging.warning(f"IMPORTANT: dice_ml library FAILED TO IMPORT: {e_import}. "
                    f"DICE explanations will be unavailable. Ensure 'dice-ml' is installed "
                    f"in your Python environment (e.g., 'pip install dice-ml').")
app = Flask(__name__)
# IMPORTANT: Set the GEMINI_API_KEY environment variable; never hard-code the key in source.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "YOUR_GEMINI_API_KEY_HERE")
if GEMINI_API_KEY == "YOUR_GEMINI_API_KEY_HERE":
    logging.warning(
        "Using a placeholder Gemini API Key. AI Analysis will likely fail. "
        "Please set the GEMINI_API_KEY environment variable.")
NOMINATIM_USER_AGENT = 'CloudburstPredictorApp/1.0 ([email protected])' # Update with your contact
# Model and explainer globals
reg = None # Regressor (for probability score)
scaler = None
explainer = None # SHAP explainer
scaled_background_data = None
dice_explainer = None # DICE explainer
dice_data_object = None # DICE Data object
feature_names = [
'Min Temp (°C)', 'Max Temp (°C)', 'Humidity (2m %)', 'Pressure (hPa)',
'Precipitation (mm)', 'Rain (mm)', 'Precipitation Probability (%)',
'Cloud Cover (%)', 'Wind Speed (km/h)', 'Wind Gust (km/h)',
'Wind Direction (Encoded)', 'Is Day (Encoded)', 'Temp (2m °C)',
'Weather Description (Encoded)'
]
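# NOTE: This order is assumed to match the column order used when the regressor and
# scaler were trained; both expect exactly these 14 features, in exactly this order.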
weather_code_mapping = {
0: {"desc": "Clear sky", "icon": "fa-sun"}, 1: {"desc": "Mainly clear", "icon": "fa-cloud-sun"},
2: {"desc": "Partly cloudy", "icon": "fa-cloud"}, 3: {"desc": "Overcast", "icon": "fa-smog"},
45: {"desc": "Fog", "icon": "fa-smog"}, 48: {"desc": "Depositing rime fog", "icon": "fa-smog"},
51: {"desc": "Light drizzle", "icon": "fa-cloud-rain"}, 53: {"desc": "Moderate drizzle", "icon": "fa-cloud-rain"},
55: {"desc": "Dense drizzle", "icon": "fa-cloud-showers-heavy"},
56: {"desc": "Light freezing drizzle", "icon": "fa-snowflake"},
57: {"desc": "Dense freezing drizzle", "icon": "fa-snowflake"},
61: {"desc": "Slight rain", "icon": "fa-cloud-rain"},
63: {"desc": "Moderate rain", "icon": "fa-cloud-showers-heavy"},
65: {"desc": "Heavy rain", "icon": "fa-cloud-pour"},
66: {"desc": "Light freezing rain", "icon": "fa-cloud-meatball"},
67: {"desc": "Heavy freezing rain", "icon": "fa-cloud-meatball"},
71: {"desc": "Slight snow fall", "icon": "fa-snowflake"},
73: {"desc": "Moderate snow fall", "icon": "fa-snowflake"},
75: {"desc": "Heavy snow fall", "icon": "fa-snowflake"}, 77: {"desc": "Snow grains", "icon": "fa-snowflake"},
80: {"desc": "Slight rain showers", "icon": "fa-cloud-sun-rain"},
81: {"desc": "Moderate rain showers", "icon": "fa-cloud-showers-heavy"},
82: {"desc": "Violent rain showers", "icon": "fa-cloud-pour"},
85: {"desc": "Slight snow showers", "icon": "fa-cloud-meatball"},
86: {"desc": "Heavy snow showers", "icon": "fa-cloud-meatball"},
95: {"desc": "Thunderstorm", "icon": "fa-bolt-lightning"},
96: {"desc": "Thunderstorm with slight hail", "icon": "fa-cloud-bolt"},
99: {"desc": "Thunderstorm with heavy hail", "icon": "fa-cloud-bolt"}
}
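# The keys above are the WMO weather interpretation codes returned by Open-Meteo,
# paired with Font Awesome icon classes for the UI.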
def get_weather_detail(code, detail_type="desc", default_desc="Unknown", default_icon="fa-question-circle"):
mapping = weather_code_mapping.get(code)
if mapping: return mapping.get(detail_type, default_desc if detail_type == "desc" else default_icon)
return default_desc if detail_type == "desc" else default_icon
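
# A minimal sketch (an assumption, not part of this repo) of how a background sample such as
# 'scaled_background_data_sample.npy' could be produced from the training pipeline, where
# X_train_scaled is the scaler-transformed training matrix:
#     rng = np.random.default_rng(0)
#     sample = X_train_scaled[rng.choice(len(X_train_scaled), size=100, replace=False)]
#     np.save('scaled_background_data_sample.npy', sample)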
def load_models():
    global reg, scaler, explainer, scaled_background_data, dice_explainer, dice_data_object, dice_ml_available, feature_names
    model_ready = False  # Set True only after all integrity checks pass; read in the finally block below.
    try:
logging.info("Attempting to load models and scaler...")
reg_path, scaler_path, background_data_path = 'cloudburst_regressor.pkl', 'scaler.pkl', 'scaled_background_data_sample.npy'
        if os.path.exists(reg_path):
            reg = joblib.load(reg_path)
            logging.info("Regressor loaded.")
        else:
            logging.warning(f"Regressor model not found at {reg_path}. Regression disabled.")
            reg = None
        if os.path.exists(scaler_path):
            scaler = joblib.load(scaler_path)
            logging.info("Scaler loaded.")
        else:
            logging.warning(f"Scaler not found at {scaler_path}. Predictions disabled.")
            scaler = None
        if os.path.exists(background_data_path):
            scaled_background_data = np.load(background_data_path, allow_pickle=True)
            logging.info(f"Background data for explainers loaded. Shape: {scaled_background_data.shape}")
            if scaled_background_data.ndim == 1 and scaler and hasattr(scaler, 'n_features_in_') and \
                    scaled_background_data.shape[0] == scaler.n_features_in_:
                scaled_background_data = scaled_background_data.reshape(1, -1)
                logging.info(f"Reshaped 1D background data to: {scaled_background_data.shape}")
        else:
            logging.warning(f"Background data not found at {background_data_path}. Explainers may be affected.")
            scaled_background_data = None
        model_ready = bool(reg and scaler and hasattr(reg, 'n_features_in_') and hasattr(scaler, 'n_features_in_')
                           and reg.n_features_in_ == scaler.n_features_in_ == len(feature_names))
if model_ready:
# SHAP Explainer
is_tree_model = any(hasattr(reg, attr) for attr in ['tree_', 'booster_', 'estimators_']) or \
reg.__class__.__name__ in ['RandomForestRegressor', 'GradientBoostingRegressor',
'XGBRegressor', 'LGBMRegressor']
            if is_tree_model:
                explainer = shap.TreeExplainer(reg)
                logging.info("SHAP TreeExplainer initialized.")
            elif scaled_background_data is not None and scaled_background_data.shape[1] == scaler.n_features_in_:
                summary_data = shap.kmeans(scaled_background_data, min(10, scaled_background_data.shape[0])) \
                    if scaled_background_data.shape[0] > 10 else scaled_background_data
                explainer = shap.KernelExplainer(reg.predict, summary_data)
                logging.info(f"SHAP KernelExplainer initialized with background summary shape {summary_data.shape}.")
            else:
                logging.warning("SHAP explainer could not be initialized "
                                "(non-tree model and no suitable background data).")
                explainer = None
# DICE Explainer
if dice_ml_available and scaled_background_data is not None and scaled_background_data.shape[1] == len(
feature_names):
try:
logging.info("Attempting to initialize DICE explainer...")
df_background_for_dice = pd.DataFrame(scaled_background_data, columns=feature_names)
df_dice_data_constructor = df_background_for_dice.copy()
df_dice_data_constructor['Cloudburst_Probability'] = reg.predict(scaled_background_data)
dice_data_object = dice_ml.Data(dataframe=df_dice_data_constructor,
continuous_features=feature_names,
outcome_name='Cloudburst_Probability')
dice_model_wrapper = dice_ml.Model(model=reg, backend='sklearn', model_type='regressor')
dice_explainer = dice_ml.Dice(dice_data_object, dice_model_wrapper, method="random")
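                    # method="random" samples random feature perturbations; with model_type='regressor',
                    # generate_counterfactuals targets a desired_range of predicted values rather than a class.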
logging.info("DICE explainer initialized successfully.")
                except Exception as e_dice_init:
                    logging.error(f"Error initializing DICE explainer: {e_dice_init}", exc_info=True)
                    dice_explainer = None  # Keep dice_ml_available True; only the explainer object is absent.
            elif not dice_ml_available:
                logging.warning("DICE explainer not initialized: dice_ml library not available.")  # Already logged at import.
            else:
                logging.warning("DICE explainer not initialized: background data for DICE missing or mismatched.")
                dice_explainer = None
        else:
            logging.warning("SHAP and DICE explainers disabled due to missing models, scaler, or feature count mismatch.")
            explainer = None
            dice_explainer = None
except Exception as e_load:
logging.error(f"Error during model loading: {e_load}", exc_info=True)
    finally:
        if not model_ready:
            logging.critical("CRITICAL: Model/Scaler/Feature_names integrity check failed. Predictions unreliable.")
load_models()
wind_direction_mapping = {"E": 0, "N": 1, "NE": 2, "NW": 3, "S": 4, "SE": 5, "SW": 6, "W": 7}
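# The integer codes above run alphabetically (E, N, NE, NW, S, SE, SW, W), consistent with a
# sklearn LabelEncoder fit at training time (assumed).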
is_day_mapping = {1: 0, 0: 1} # API: 1=Day (model:0), 0=Night (model:1)
wind_direction_full_names = {"N": "North", "NE": "Northeast", "E": "East", "SE": "Southeast", "S": "South",
"SW": "Southwest", "W": "West", "NW": "Northwest"}
def map_weather_description_to_encoding(code):
if code is None: return 0
try:
code = int(code)
except (ValueError, TypeError):
return 0
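    # Coarse grouping of WMO codes into the 8 encoded weather classes the model was trained on
    # (mapping assumed to mirror the training pipeline's label encoding).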
weather_encoding_map = {0: 0, 1: 0, 2: 5, 3: 4, 45: 2, 48: 2, 51: 1, 53: 1, 55: 1, 56: 1, 57: 1, 61: 6, 63: 6,
65: 3, 66: 6, 67: 3, 71: 6, 73: 6, 75: 6, 77: 6, 80: 6, 81: 6, 82: 3, 85: 6, 86: 6, 95: 7,
96: 7, 99: 7}
return weather_encoding_map.get(code, 0)
def get_previous_week_data(lat, lon):
    today = date.today()
    start_date, end_date = today - timedelta(days=8), today - timedelta(days=1)
    url = "https://archive-api.open-meteo.com/v1/archive"  # Historical data lives on the archive-api host, not api.open-meteo.com.
params = {"latitude": lat, "longitude": lon, "daily": 'precipitation_sum,rain_sum',
"hourly": 'temperature_2m,relativehumidity_2m,pressure_msl,cloudcover,windspeed_10m,windgusts_10m,precipitation_probability',
"timezone": "auto", "start_date": start_date.strftime("%Y-%m-%d"),
"end_date": end_date.strftime("%Y-%m-%d")}
hist_avgs = {"avg_precipitation_sum": 0.1, "avg_rain_sum": 0.1, "avg_relativehumidity_2m": 65.0,
"avg_pressure_msl": 1012.0,
"avg_cloudcover": 40.0, "avg_temp": 22.0, "avg_wind_speed": 8.0, "avg_wind_gust": 12.0,
"avg_precip_prob": 15.0}
    try:
        response = requests.get(url, params=params, timeout=15)
        response.raise_for_status()
        data_hist = response.json()
key_map = [(("daily", "precipitation_sum"), "avg_precipitation_sum"), (("daily", "rain_sum"), "avg_rain_sum"),
(("hourly", "temperature_2m"), "avg_temp"),
(("hourly", "relativehumidity_2m"), "avg_relativehumidity_2m"),
(("hourly", "pressure_msl"), "avg_pressure_msl"), (("hourly", "cloudcover"), "avg_cloudcover"),
(("hourly", "windspeed_10m"), "avg_wind_speed"), (("hourly", "windgusts_10m"), "avg_wind_gust"),
(("hourly", "precipitation_probability"), "avg_precip_prob")]
for (data_type, param_name), avg_key in key_map:
values = [x for x in data_hist.get(data_type, {}).get(param_name, []) if
x is not None and not (isinstance(x, float) and np.isnan(x))]
if values: hist_avgs[avg_key] = float(np.mean(values))
except Exception as e:
logging.error(f"Error fetching/processing historical data: {e}. Using defaults.")
return hist_avgs
def degrees_to_cardinal(deg):
    if deg is None: return "N/A"
    try:
        deg = float(deg)
    except (ValueError, TypeError):
        return "N/A"
return ["N", "NE", "E", "SE", "S", "SW", "W", "NW"][int((deg + 22.5) / 45) % 8]
def reverse_geocode(lat, lon):
    url = f"https://nominatim.openstreetmap.org/reverse?format=jsonv2&lat={lat}&lon={lon}"
    headers = {'User-Agent': NOMINATIM_USER_AGENT}
    try:
        r = requests.get(url, headers=headers, timeout=10)
        r.raise_for_status()
        data = r.json()
        addr = data.get('address', {})
        name = addr.get('city') or addr.get('town') or addr.get('village') or data.get('display_name')
        country = addr.get('country')
        return f"{name}, {country}" if name and country else data.get('display_name', f'Lat: {lat:.3f}, Lon: {lon:.3f}')
    except Exception as e:
        logging.error(f"Geocoding failed: {e}")
        return f'Lat: {lat:.3f}, Lon: {lon:.3f} (No Address)'
def map_daily_to_model_features(daily_data_point, historical_avgs, expected_n_features):
    try:
        min_temp, max_temp = daily_data_point.get('temperature_2m_min'), daily_data_point.get('temperature_2m_max')
        precip_sum, rain_sum_val = daily_data_point.get('precipitation_sum', 0.0), daily_data_point.get('rain_sum', 0.0)
        precip_prob_max = daily_data_point.get('precipitation_probability_max', 0.0)
        wind_speed_max = daily_data_point.get('windspeed_10m_max', 0.0)
        wind_gust_max = daily_data_point.get('windgusts_10m_max', 0.0)
        wind_dir_deg = daily_data_point.get('winddirection_10m_dominant')
        weathercode = daily_data_point.get('weathercode')
        humidity = historical_avgs['avg_relativehumidity_2m']
        pressure = historical_avgs['avg_pressure_msl']
        cloudcover = historical_avgs['avg_cloudcover']
        temp_avg_day = (min_temp + max_temp) / 2 if min_temp is not None and max_temp is not None \
            else historical_avgs['avg_temp']
        is_day_enc = is_day_mapping.get(1, 0)
        features_raw = [min_temp, max_temp, humidity, pressure, precip_sum, rain_sum_val, precip_prob_max, cloudcover,
                        wind_speed_max, wind_gust_max, wind_direction_mapping.get(degrees_to_cardinal(wind_dir_deg), 1),
                        is_day_enc, temp_avg_day, map_weather_description_to_encoding(weathercode)]
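        # Order mirrors feature_names above; any None/NaN entry falls back to the
        # historical-average defaults below.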
default_map = {'Min Temp (°C)': historical_avgs['avg_temp'] - 5,
'Max Temp (°C)': historical_avgs['avg_temp'] + 5,
'Humidity (2m %)': historical_avgs['avg_relativehumidity_2m'],
'Pressure (hPa)': historical_avgs['avg_pressure_msl'],
'Precipitation (mm)': historical_avgs['avg_precipitation_sum'],
'Rain (mm)': historical_avgs['avg_rain_sum'],
'Precipitation Probability (%)': historical_avgs['avg_precip_prob'],
'Cloud Cover (%)': historical_avgs['avg_cloudcover'],
'Wind Speed (km/h)': historical_avgs['avg_wind_speed'],
'Wind Gust (km/h)': historical_avgs['avg_wind_gust'],
'Wind Direction (Encoded)': 1, 'Is Day (Encoded)': 0,
'Temp (2m °C)': historical_avgs['avg_temp'],
'Weather Description (Encoded)': 0}
        proc_features = [float(val) if val is not None and not (isinstance(val, float) and np.isnan(val))
                         else float(default_map[feature_names[i]]) for i, val in enumerate(features_raw)]
        features_arr = np.array([proc_features])
        if features_arr.shape[1] != expected_n_features:
            logging.error(f"Daily mapped features count ({features_arr.shape[1]}) != expected ({expected_n_features}).")
            return None
        return features_arr
    except Exception as e:
        logging.error(f"Error mapping daily to features: {e}", exc_info=True)
        return None
def get_gemini_analysis(current_weather, current_prediction, future_daily_predictions, shap_explanation, location,
dice_explanation=None):
    if not GEMINI_API_KEY or "YOUR_GEMINI_API_KEY_HERE" in GEMINI_API_KEY:
        return {"error": "AI analysis key not configured or is placeholder."}
try:
genai.configure(api_key=GEMINI_API_KEY)
model = genai.GenerativeModel('gemini-1.5-flash',
generation_config={"temperature": 0.5, "top_p": 0.9, "max_output_tokens": 3500})
cw = {k: (v if v is not None else 'N/A') for k, v in (current_weather or {}).items()}
cp = {k: (v if v is not None else 'N/A') for k, v in (current_prediction or {}).items()}
future_preds_text = [
f"- **{p.get('date', 'N/A')}**: Risk {p.get('probability', 'N/A')}% ({p.get('status', 'N/A')}). Weather: {p.get('weather_description', 'N/A')}, Temp: {p.get('min_temp', 'N/A')} to {p.get('max_temp', 'N/A')}. Max Precip Prob: {p.get('precip_prob_max', 'N/A')}."
for p in future_daily_predictions or []]
future_summary = "\n".join(future_preds_text) if future_preds_text else "No detailed future forecast available."
base_val_text = "Base model prediction (average probability): Not available."
shap_text_parts = []
if shap_explanation and not any(
item.get('feature', '').lower().startswith(('shap error', 'prediction disabled', 'shap n/a')) for item
in shap_explanation):
for item in shap_explanation:
if item['feature'] == 'Base Value (Average Prediction)':
base_val_text = f"Base model prediction (average probability): {item['impact']:.1f}%"
elif isinstance(item['impact'], float) and abs(item['impact']) > 0.01:
shap_text_parts.append(f" - {item['feature']}: influence of {item['impact']:.1f}% on probability")
shap_summary = "Key factors influencing *current* prediction (SHAP values show % change from base):\n" + "\n".join(
shap_text_parts[:5]) if shap_text_parts else "SHAP analysis not available or not significant."
dice_summary_text = "Counterfactual analysis (what-if scenarios to lower risk): Not available or not run."
if dice_explanation and dice_explanation.get("counterfactuals"):
dice_parts = ["**Insights from Counterfactual Analysis (What could lower the risk?):**"]
        for i, cf in enumerate(dice_explanation["counterfactuals"][:2]):
            dice_parts.append(f" *Scenario {i + 1} (to achieve ~{cf.get('achieved_probability', 'target')}% risk):*")
            for change in cf["changes"][:3]:
                orig_val = change.get('original_value_unscaled', change.get('original_value_scaled'))
                cf_val = change.get('cf_value_unscaled', change.get('cf_value_scaled'))
                dice_parts.append(f" - **{change['feature']}**: change from `{orig_val}` to `{cf_val}`")
dice_summary_text = "\n".join(dice_parts)
elif dice_explanation and (dice_explanation.get("message") or dice_explanation.get("error")):
dice_summary_text = f"Counterfactual analysis: {dice_explanation.get('message') or dice_explanation.get('error')}"
prompt = f"""
You are an expert meteorologist. Analyze the following cloudburst risk data for **{location}** and provide a comprehensive summary.
Use Markdown for all formatting (e.g., `## Heading 2`, `### Heading 3`, `* list item`, `**bold text**`, `_italic text_`).
**Current Weather Snapshot (as of {cw.get('Current Time', 'N/A')}):**
- Conditions: {cw.get('Weather Description (Current)', 'N/A')}
- Temperature: {cw.get('Temp (2m °C)', 'N/A')}°C (Today's Range: {cw.get('Min Temp (°C)', 'N/A')}°C - {cw.get('Max Temp (°C)', 'N/A')}°C)
- Humidity (Recent Avg): {cw.get('Humidity (Past Week Avg %)', 'N/A')}%
- Wind: {cw.get('Wind Speed (Current km/h)', 'N/A')} km/h from {cw.get('Wind Direction (Current)', 'N/A')}
- Today's Precipitation: {cw.get('Precipitation Today (Accumulated mm)', 'N/A')} mm
- Current Hour Precip. Chance: {cw.get('Precipitation Probability (Current Hour %)', 'N/A')}%
**Immediate Cloudburst Risk Assessment (Now):**
- Predicted Likelihood: **{cp.get('Predicted Cloudburst', 'N/A')}**
- Probability Score: **{cp.get('Predicted Cloudburst (%)', 'N/A')}%**
- {base_val_text}
{shap_summary}
{dice_summary_text}
**Cloudburst Risk Outlook (Next ~{len(future_daily_predictions) if future_daily_predictions else 0} Days):**
{future_summary}
---
**YOUR DETAILED ANALYSIS & ADVICE (Use Markdown formatting as specified above):**
## Executive Summary
_(A concise overview: current cloudburst risk level at {location}, the trend for upcoming days, and critical factors. Subtly weave in SHAP/DICE insights if available for the *immediate* forecast.)_
## Detailed Risk Breakdown
_(Elaborate on the current situation. For the future outlook, if any days show moderate or high risk (e.g., > 40-50% probability), create sub-sections like `### Tuesday: Elevated Risk` and explain the contributing factors for that day.)_
## Actionable Recommendations & Safety Tips
_(Provide 3-5 clear, practical bullet points based on the overall risk. E.g., preparations, travel advice, monitoring official alerts.)_
## Understanding the Forecast
_(Briefly explain that these are model-based predictions with inherent uncertainties and encourage users to stay updated with official meteorological sources.)_
**Important:** Maintain a factual, clear, and safety-conscious tone. Avoid sensationalism. Ensure all structured text (headings, lists) uses Markdown.
Give the response in HTML for proper rendering on the webpage.
"""
logging.info("Sending refined prompt to Gemini API...")
response = model.generate_content(prompt)
if response.prompt_feedback and response.prompt_feedback.block_reason:
reason = response.prompt_feedback.block_reason_message or response.prompt_feedback.block_reason.name
logging.warning(f"Gemini API call blocked. Reason: {reason}")
return {"error": f"AI analysis blocked by content policy ({reason})."}
analysis_text = "".join(part.text for part in response.candidates[0].content.parts) if response.candidates and \
response.candidates[
0].content else None
if analysis_text: return {"analysis": analysis_text}
logging.warning(f"Gemini API returned empty or unexpected response. Full response: {response}")
return {"error": "AI analysis response was empty or malformed."}
    except Exception as e:
        logging.error(f"Error calling Gemini API: {e}", exc_info=True)
        err_msg = str(e).lower()
        if any(s in err_msg for s in ["api_key_invalid", "permission_denied", "authentication"]):
            return {"error": "AI analysis failed: Invalid API Key or auth issue."}
        if "quota" in err_msg:
            return {"error": "AI analysis failed: API quota exceeded."}
        if "rate limit" in err_msg:
            return {"error": "AI analysis failed: Rate limit. Try again later."}
        if "deadline" in err_msg or "timeout" in err_msg:
            return {"error": "AI analysis failed: Request timed out."}
        return {"error": f"Failed to get AI analysis: Unexpected error ({type(e).__name__})."}
@app.route('/', methods=['GET'])
def index():
return render_template('index.html', show_results=False, current_year=datetime.now().year,
lat_initial=20.5937, lon_initial=78.9629,
weather_code_mapping_json=json.dumps(weather_code_mapping))
@app.route('/forecast', methods=['GET'])
def forecast():
lat_str, lon_str = request.args.get('lat'), request.args.get('lon')
render_args = {'show_results': True, 'current_year': datetime.now().year, 'lat_initial': lat_str,
'lon_initial': lon_str,
'current_weather': None,
'current_prediction': {"Predicted Cloudburst": "Error", "Predicted Cloudburst (%)": "Error"},
'future_predictions': [],
'shap_explanation': [{"feature": "SHAP N/A", "impact": "Not run or error."}],
'dice_explanation': {"error": "DICE N/A"}, 'gemini_analysis': {"error": "Analysis pending."},
'future_prob_chart_data_json': "{}", 'shap_chart_data_json': "{}", 'prediction_error': None,
'error': None,
'weather_code_mapping_json': json.dumps(weather_code_mapping)}
    if not lat_str or not lon_str:
        render_args.update({'error': "Latitude and longitude are required.", 'show_results': False})
        return render_template('index.html', **render_args)
    try:
        lat, lon = float(lat_str), float(lon_str)
        assert -90 <= lat <= 90 and -180 <= lon <= 180, "Coords out of range."
    except (ValueError, AssertionError) as e:
        render_args.update({'error': f"Invalid coordinates: {e}", 'show_results': False})
        return render_template('index.html', **render_args)
    if not (scaler and reg and hasattr(scaler, 'n_features_in_') and hasattr(reg, 'n_features_in_')
            and scaler.n_features_in_ == reg.n_features_in_ == len(feature_names)):
        err_msg = "Core prediction models/config missing/mismatched. Cannot forecast."
        logging.critical(err_msg)
        render_args.update({'prediction_error': err_msg, 'gemini_analysis': {"error": err_msg}})
        return render_template('index.html', **render_args)
weather_api_url = "https://api.open-meteo.com/v1/forecast"
api_params_curr = {'latitude': lat, 'longitude': lon, 'current_weather': True, 'timezone': 'auto',
'forecast_days': 1,
'hourly': 'temperature_2m,relativehumidity_2m,pressure_msl,precipitation,rain,cloudcover,windspeed_10m,windgusts_10m,winddirection_10m,is_day,weathercode,precipitation_probability',
'daily': 'temperature_2m_max,temperature_2m_min,precipitation_sum,rain_sum,precipitation_probability_max,weathercode,windspeed_10m_max,windgusts_10m_max,winddirection_10m_dominant'}
    try:
        r_curr = requests.get(weather_api_url, params=api_params_curr, timeout=15)
        r_curr.raise_for_status()
        data_curr_api = r_curr.json()
    except requests.exceptions.RequestException as e:
        render_args['prediction_error'] = f"Weather API error (current): {e}"
        return render_template('index.html', **render_args)
    api_cw = data_curr_api.get("current_weather", {})
    api_hrly = data_curr_api.get("hourly", {})
    api_dly_today = data_curr_api.get("daily", {})
    if not (api_cw and api_hrly.get("time") and api_dly_today.get("time")):
        render_args['prediction_error'] = "Incomplete current weather data from API."
        return render_template('index.html', **render_args)
    curr_time_api = api_cw.get("time")
    curr_idx = api_hrly["time"].index(curr_time_api) if curr_time_api and curr_time_api in api_hrly["time"] else 0
    def get_val(src, key, idx, default=None):
        vals = src.get(key)
        return vals[idx] if vals and idx < len(vals) and vals[idx] is not None else default
hist_avgs = get_previous_week_data(lat, lon)
    min_T = get_val(api_dly_today, "temperature_2m_min", 0, hist_avgs['avg_temp'] - 5)
    max_T = get_val(api_dly_today, "temperature_2m_max", 0, hist_avgs['avg_temp'] + 5)
curr_T = get_val(api_hrly, "temperature_2m", curr_idx, hist_avgs['avg_temp'])
is_day_api = get_val(api_hrly, "is_day", curr_idx, 1)
curr_feat_vals = [min_T, max_T,
get_val(api_hrly, "relativehumidity_2m", curr_idx, hist_avgs['avg_relativehumidity_2m']),
get_val(api_hrly, "pressure_msl", curr_idx, hist_avgs['avg_pressure_msl']),
get_val(api_dly_today, "precipitation_sum", 0, hist_avgs['avg_precipitation_sum']),
get_val(api_dly_today, "rain_sum", 0, hist_avgs['avg_rain_sum']),
get_val(api_dly_today, "precipitation_probability_max", 0,
get_val(api_hrly, "precipitation_probability", curr_idx, hist_avgs['avg_precip_prob'])),
get_val(api_hrly, "cloudcover", curr_idx, hist_avgs['avg_cloudcover']),
get_val(api_dly_today, "windspeed_10m_max", 0,
get_val(api_hrly, "windspeed_10m", curr_idx, hist_avgs['avg_wind_speed'])),
get_val(api_dly_today, "windgusts_10m_max", 0,
get_val(api_hrly, "windgusts_10m", curr_idx, hist_avgs['avg_wind_gust'])),
wind_direction_mapping.get(degrees_to_cardinal(
get_val(api_dly_today, "winddirection_10m_dominant", 0,
get_val(api_hrly, "winddirection_10m", curr_idx, 0))), 1),
is_day_mapping.get(is_day_api, 0), curr_T,
map_weather_description_to_encoding(
get_val(api_dly_today, "weathercode", 0, get_val(api_hrly, "weathercode", curr_idx, 0)))]
curr_feat_proc = [float(x) if x is not None and not (isinstance(x, float) and np.isnan(x)) else 0.0 for x in
curr_feat_vals]
curr_feat_np = np.array([curr_feat_proc])
# Current Prediction & Explainability
if curr_feat_np.shape[1] == scaler.n_features_in_:
try:
curr_feat_scaled = scaler.transform(curr_feat_np)
prob_raw = reg.predict(curr_feat_scaled)[0]
prob_clmp = max(0, min(100, int(round(prob_raw))))
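            # The regressor is assumed to output a 0-100 probability score; clamp it and
            # threshold at 50% for the binary Yes/No label below.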
render_args['current_prediction'] = {"Predicted Cloudburst (%)": prob_clmp,
"Predicted Cloudburst": "Yes" if prob_clmp > 50 else "No"}
# SHAP Explanations
if explainer:
                try:
                    shap_vals_raw = explainer.shap_values(curr_feat_scaled)
                    shap_vals = np.asarray(
                        shap_vals_raw[0] if isinstance(shap_vals_raw, list) else shap_vals_raw).squeeze()
                    if shap_vals.ndim > 1 and shap_vals.shape[0] == 1:
                        shap_vals = shap_vals[0]
if len(shap_vals) == len(feature_names):
shap_pairs = sorted(zip(feature_names, shap_vals * 100), key=lambda x: abs(x[1]), reverse=True)
render_args['shap_explanation'] = [{"feature": name, "impact": float(val)} for name, val in
shap_pairs]
                        if hasattr(explainer, 'expected_value'):
                            base_val = explainer.expected_value
                            base_val = base_val.mean() if isinstance(base_val, np.ndarray) else base_val
                            render_args['shap_explanation'].insert(0, {"feature": "Base Value (Average Prediction)",
                                                                       "impact": float(base_val * 100)})
else:
render_args['shap_explanation'] = [
{"feature": "SHAP Error", "impact": "SHAP values length mismatch."}]
                except Exception as e_s:
                    logging.error(f"SHAP error: {e_s}", exc_info=True)
                    render_args['shap_explanation'] = [{"feature": "SHAP Error", "impact": str(e_s)}]
            else:
                render_args['shap_explanation'] = [{"feature": "SHAP Disabled", "impact": "Explainer not initialized."}]
# DICE Counterfactuals
if dice_explainer and dice_ml_available: # Check both the explainer object and the import flag
try:
query_instance_df = pd.DataFrame(curr_feat_scaled, columns=feature_names)
desired_prob_range = [0, max(0, prob_clmp - 30)]
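                    # Ask DiCE for scenarios whose predicted probability lands at least
                    # 30 points below the current score (floored at 0).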
if prob_clmp < 30:
render_args['dice_explanation'] = {
"message": "Current risk is already low. Counterfactuals for further reduction may not be very distinct or meaningful."}
else:
cfs_object = dice_explainer.generate_counterfactuals(
query_instance_df, total_CFs=3, desired_range=desired_prob_range,
features_to_vary='all'
)
if cfs_object and cfs_object.cf_examples_list:
processed_cfs = []
original_unscaled_features = pd.Series(curr_feat_np[0], index=feature_names)
for cf_example in cfs_object.cf_examples_list:
cf_df_final = cf_example.final_cfs_df
if cf_df_final is not None and not cf_df_final.empty:
for _, cf_row_scaled_series in cf_df_final.iterrows():
achieved_prob = cf_row_scaled_series['Cloudburst_Probability']
cf_scaled_values = cf_row_scaled_series.drop(
'Cloudburst_Probability').values.reshape(1, -1)
cf_unscaled_values = scaler.inverse_transform(cf_scaled_values)[0]
cf_unscaled_series = pd.Series(cf_unscaled_values, index=feature_names)
changes_list = []
for feat_name in feature_names:
original_val_display = f"{original_unscaled_features[feat_name]:.2f}"
cf_val_display = f"{cf_unscaled_series[feat_name]:.2f}"
if not np.isclose(original_unscaled_features[feat_name],
cf_unscaled_series[feat_name], atol=1e-2):
changes_list.append({
"feature": feat_name,
"original_value_unscaled": original_val_display,
"cf_value_unscaled": cf_val_display
})
if changes_list:
processed_cfs.append({
"target_probability_range": f"{desired_prob_range[0]}-{desired_prob_range[1]}%",
"achieved_probability": f"{achieved_prob:.1f}",
"changes": changes_list
})
render_args['dice_explanation'] = {"counterfactuals": processed_cfs} if processed_cfs else {
"message": "No distinct counterfactuals found to significantly lower the risk."}
else:
render_args['dice_explanation'] = {"message": "No counterfactuals generated by DiCE."}
except Exception as e_d:
logging.error(f"DICE error during generation: {e_d}", exc_info=True)
render_args['dice_explanation'] = {"error": f"DICE generation failed: {str(e_d)}"}
elif not dice_ml_available: # Condition for "dice-ml library not loaded"
render_args['dice_explanation'] = {
"error": "The 'dice-ml' library (for counterfactuals) failed to import. "
"Please ensure it is installed correctly in your active Python environment. "
"You can typically install it using: pip install dice-ml. "
"Check the server console/logs for specific import error messages that occurred at startup."
}
else: # dice_ml_available is True, but dice_explainer object is None (init failed for other reasons)
render_args['dice_explanation'] = {
"error": "DICE explainer could not be initialized. This might be due to issues with "
"background data ('scaled_background_data_sample.npy'), model compatibility, "
"or other setup problems. Check server console/logs for detailed initialization errors that occurred at startup."
}
except Exception as e_p:
logging.error(f"Prediction engine error: {e_p}", exc_info=True)
render_args.update({'prediction_error': f"Prediction engine error: {e_p}",
'shap_explanation': [{"feature": "Prediction Error", "impact": str(e_p)}],
'dice_explanation': {"error": "Prediction failed, so DICE analysis was not run."}})
    else:
        render_args['prediction_error'] = (f"Feature mismatch for current prediction. "
                                           f"Expected {scaler.n_features_in_} features, got {curr_feat_np.shape[1]}.")
render_args['current_weather'] = {
"Location Address": reverse_geocode(lat, lon),
"Current Time": datetime.fromisoformat(curr_time_api).strftime("%Y-%m-%d %H:%M %Z") if curr_time_api else "N/A",
"Min Temp (°C)": f"{min_T:.1f}" if min_T is not None else "N/A",
"Max Temp (°C)": f"{max_T:.1f}" if max_T is not None else "N/A",
"Temp (2m °C)": f"{curr_T:.1f}" if curr_T is not None else "N/A",
"Weather Code": get_val(api_hrly, "weathercode", curr_idx, 0),
"Weather Description (Current)": get_weather_detail(get_val(api_hrly, "weathercode", curr_idx, 0), "desc"),
"Humidity (Past Week Avg %)": f"{hist_avgs['avg_relativehumidity_2m']:.0f}",
"Wind Speed (Current km/h)": f"{get_val(api_hrly, 'windspeed_10m', curr_idx, 0):.1f}",
"Wind Direction (Current)": wind_direction_full_names.get(
degrees_to_cardinal(get_val(api_hrly, 'winddirection_10m', curr_idx)), "N/A"),
"Precipitation Today (Accumulated mm)": f"{get_val(api_dly_today, 'precipitation_sum', 0, 0.0):.1f}",
"Precipitation Probability (Current Hour %)": f"{get_val(api_hrly, 'precipitation_probability', curr_idx, 0):.0f}"}
# Future Predictions
api_params_fut = {'latitude': lat, 'longitude': lon, 'timezone': 'auto', 'forecast_days': 8,
'daily': 'weathercode,temperature_2m_max,temperature_2m_min,precipitation_sum,rain_sum,precipitation_probability_max,windspeed_10m_max,windgusts_10m_max,winddirection_10m_dominant'}
    try:
        r_fut = requests.get(weather_api_url, params=api_params_fut, timeout=15)
        r_fut.raise_for_status()
        data_fut_api_dly = r_fut.json().get("daily", {})
if data_fut_api_dly.get("time") and len(data_fut_api_dly["time"]) > 1:
for i in range(1, len(data_fut_api_dly["time"])): # Skip today
day_data = {key: get_val(data_fut_api_dly, key, i) for key in data_fut_api_dly.keys()}
day_feat_np = map_daily_to_model_features(day_data, hist_avgs, scaler.n_features_in_)
day_pred = {"date": day_data.get('time'),
"min_temp": f"{day_data.get('temperature_2m_min'):.1f}" if day_data.get(
'temperature_2m_min') is not None else "N/A",
"max_temp": f"{day_data.get('temperature_2m_max'):.1f}" if day_data.get(
'temperature_2m_max') is not None else "N/A",
"weather_code": day_data.get('weathercode'),
"weather_description": get_weather_detail(day_data.get('weathercode'), "desc"),
"precip_prob_max": f"{day_data.get('precipitation_probability_max'):.0f}%" if day_data.get(
'precipitation_probability_max') is not None else "N/A",
"probability": "N/A", "status": "Error"}
                if day_feat_np is not None:
                    try:
                        day_prob_raw = reg.predict(scaler.transform(day_feat_np))[0]
                        day_prob_clmp = max(0, min(100, int(round(day_prob_raw))))
                    except Exception as e_fut_pred:
                        day_prob_clmp = "N/A"
                        day_pred["status"] = "Pred. Err"
                        logging.error(f"Future day pred err: {e_fut_pred}")
                    day_pred.update({
                        "probability": day_prob_clmp,
                        "status": ("Yes" if isinstance(day_prob_clmp, int) and day_prob_clmp > 50
                                   else "No" if isinstance(day_prob_clmp, int) else day_pred["status"])})
                else:
                    day_pred["status"] = "Data Err"
                render_args['future_predictions'].append(day_pred)
except Exception as e_fut_api:
logging.error(f"Future forecast API/processing error: {e_fut_api}", exc_info=True)
# Gemini AI Analysis
    location_label = (render_args['current_weather'].get('Location Address', f'Lat: {lat:.2f}, Lon: {lon:.2f}')
                      if render_args['current_weather'] else f'Lat: {lat:.2f}, Lon: {lon:.2f}')
    render_args['gemini_analysis'] = get_gemini_analysis(render_args['current_weather'],
                                                         render_args['current_prediction'],
                                                         render_args['future_predictions'],
                                                         render_args['shap_explanation'],
                                                         location_label,
                                                         render_args['dice_explanation'])
# Chart Data
    plottable_future = [p for p in render_args['future_predictions'] if isinstance(p.get('probability'), int)]
    if plottable_future:
        render_args['future_prob_chart_data_json'] = json.dumps(
            {"labels": [p['date'] for p in plottable_future], "data": [p['probability'] for p in plottable_future]})
    plottable_shap = [s for s in render_args['shap_explanation']
                      if s.get('feature') != 'Base Value (Average Prediction)'
                      and isinstance(s.get('impact'), float)
                      and not s.get('feature', '').lower().startswith(('shap', 'error', 'prediction', 'disabled', 'n/a'))]
    if plottable_shap:
        render_args['shap_chart_data_json'] = json.dumps(
            {"labels": [s['feature'] for s in plottable_shap], "data": [s['impact'] for s in plottable_shap]})
return render_template('index.html', **render_args)
if __name__ == '__main__':
    app.run(debug=True, port=int(os.environ.get("PORT", 5000)))
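
# Example request (assuming the app runs locally on the default port; coordinates are illustrative):
#     curl "http://127.0.0.1:5000/forecast?lat=19.0760&lon=72.8777"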