weather-app_v1 / src /api /storm_events_client.py
chirfort's picture
Add Weather MCP Client and Server for Weather Intelligence Tools
bf6a6f7
"""
NOAA Storm Events API Client
Access to severe weather events and historical storm data
"""
import requests
import logging
from typing import List, Dict, Optional, Any
from datetime import datetime, timedelta
import json
logger = logging.getLogger(__name__)
class StormEventsClient:
"""Client for NOAA Storm Events Database API"""
def __init__(self):
"""Initialize Storm Events client"""
self.base_url = "https://www.ncdc.noaa.gov/stormevents/services/v1"
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'WeatherAppPro/1.0 (enhanced-weather-app)'
})
def is_available(self) -> bool:
"""Check if Storm Events API is available"""
return True # No API key required for NOAA Storm Events
def get_storm_events(
self,
state: str,
county: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
event_types: Optional[List[str]] = None
) -> List[Dict]:
"""
Get storm events for a location
Args:
state: Two-letter state code (e.g., 'KS', 'TX')
county: County name (optional)
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format
event_types: List of event types to filter
"""
try:
# Default to last 30 days if no dates provided
# But ensure we're not asking for future dates
# NOAA Storm Events data usually has a delay, so we go back further
if not start_date:
end_dt = datetime.now() - timedelta(days=7) # One week back as end date
start_dt = end_dt - timedelta(days=30) # 30 days before that
start_date = start_dt.strftime('%Y-%m-%d')
end_date = end_dt.strftime('%Y-%m-%d')
url = f"{self.base_url}/query"
params = {
'format': 'json',
'startdate': start_date,
'enddate': end_date,
'state': state.upper(),
'limit': 100
}
if county:
params['county'] = county.upper()
if event_types:
# Convert event types to API format
api_event_types = []
for event_type in event_types:
api_event_types.append(self._map_event_type(event_type))
params['eventtype'] = ','.join(api_event_types)
logger.info(f"Storm Events API request: {url} with params: {params}")
response = self.session.get(url, params=params, timeout=15)
response.raise_for_status()
data = response.json()
events = data.get('results', [])
# Process and enhance event data
processed_events = []
for event in events:
processed_event = self._process_event_data(event)
processed_events.append(processed_event)
return processed_events
except Exception as e:
logger.error(f"Error getting storm events: {e}")
return []
def _map_event_type(self, event_type: str) -> str:
"""Map common event type names to API format"""
event_mapping = {
'tornado': 'Tornado',
'hail': 'Hail',
'wind': 'Thunderstorm Wind',
'thunderstorm': 'Thunderstorm Wind',
'flood': 'Flood',
'flash flood': 'Flash Flood',
'winter storm': 'Winter Storm',
'blizzard': 'Blizzard',
'ice storm': 'Ice Storm',
'drought': 'Drought',
'wildfire': 'Wildfire',
'hurricane': 'Hurricane (Typhoon)',
'lightning': 'Lightning'
}
return event_mapping.get(event_type.lower(), event_type)
def _process_event_data(self, event: Dict) -> Dict:
"""Process and enhance raw event data"""
processed = {
'event_id': event.get('EVENT_ID'),
'event_type': event.get('EVENT_TYPE'),
'state': event.get('STATE'),
'county': event.get('CZ_NAME'),
'begin_date': event.get('BEGIN_DATE_TIME'),
'end_date': event.get('END_DATE_TIME'),
'injuries_direct': event.get('INJURIES_DIRECT', 0),
'injuries_indirect': event.get('INJURIES_INDIRECT', 0),
'deaths_direct': event.get('DEATHS_DIRECT', 0),
'deaths_indirect': event.get('DEATHS_INDIRECT', 0),
'damage_property': event.get('DAMAGE_PROPERTY'),
'damage_crops': event.get('DAMAGE_CROPS'),
'magnitude': event.get('MAGNITUDE'),
'magnitude_type': event.get('MAGNITUDE_TYPE'),
'source': event.get('SOURCE'),
'episode_narrative': event.get('EPISODE_NARRATIVE'),
'event_narrative': event.get('EVENT_NARRATIVE')
}
# Add severity classification
processed['severity'] = self._calculate_severity(processed)
return processed
def _calculate_severity(self, event: Dict) -> str:
"""Calculate event severity based on impacts"""
deaths = (event.get('deaths_direct', 0) or 0) + (event.get('deaths_indirect', 0) or 0)
injuries = (event.get('injuries_direct', 0) or 0) + (event.get('injuries_indirect', 0) or 0)
if deaths > 0:
return 'Extreme'
elif injuries > 10:
return 'Severe'
elif injuries > 0:
return 'Moderate'
else:
return 'Minor'
def get_events_by_location(self, location: str, days_back: int = 30, event_types: Optional[List[str]] = None) -> List[Dict]:
"""
Get storm events by location name (convenience method)
Args:
location: State code (e.g., 'NY', 'CA') or state name
days_back: Number of days to look back
event_types: List of event types to filter
"""
# Convert state names to codes if needed
state_mapping = {
'new york': 'NY', 'california': 'CA', 'texas': 'TX', 'florida': 'FL',
'illinois': 'IL', 'ohio': 'OH', 'michigan': 'MI', 'pennsylvania': 'PA',
'georgia': 'GA', 'north carolina': 'NC', 'south carolina': 'SC',
'virginia': 'VA', 'washington': 'WA', 'oregon': 'OR', 'colorado': 'CO',
'arizona': 'AZ', 'nevada': 'NV', 'utah': 'UT', 'new mexico': 'NM',
'kansas': 'KS', 'nebraska': 'NE', 'iowa': 'IA', 'missouri': 'MO',
'arkansas': 'AR', 'louisiana': 'LA', 'oklahoma': 'OK', 'mississippi': 'MS',
'alabama': 'AL', 'tennessee': 'TN', 'kentucky': 'KY', 'indiana': 'IN',
'wisconsin': 'WI', 'minnesota': 'MN', 'north dakota': 'ND', 'south dakota': 'SD',
'montana': 'MT', 'wyoming': 'WY', 'idaho': 'ID', 'alaska': 'AK',
'hawaii': 'HI', 'maine': 'ME', 'new hampshire': 'NH', 'vermont': 'VT',
'massachusetts': 'MA', 'rhode island': 'RI', 'connecticut': 'CT',
'new jersey': 'NJ', 'delaware': 'DE', 'maryland': 'MD', 'west virginia': 'WV'
}
location_lower = location.lower().strip()
state = state_mapping.get(location_lower, location.upper())
# Calculate date range
end_date = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d')
start_date = (datetime.now() - timedelta(days=days_back + 7)).strftime('%Y-%m-%d')
return self.get_storm_events(
state=state,
start_date=start_date,
end_date=end_date,
event_types=event_types
)
def get_events_by_coordinates(self, lat: float, lon: float, radius_miles: int = 50, days_back: int = 30) -> List[Dict]:
"""
Get storm events near coordinates (simplified implementation)
Args:
lat: Latitude
lon: Longitude
radius_miles: Search radius in miles (currently not implemented, gets state events)
days_back: Number of days to look back
"""
# For now, we'll get the state for the coordinates and return state-wide events
# A full implementation would need geographic filtering
# Simple state mapping based on coordinates (US only)
state = self._get_state_from_coordinates(lat, lon)
if not state:
return []
return self.get_events_by_location(state, days_back)
def _get_state_from_coordinates(self, lat: float, lon: float) -> Optional[str]:
"""Get state code from coordinates (simplified mapping for major states)"""
# This is a simplified mapping for demonstration
# A production system would use a proper geographic lookup
state_bounds = {
'NY': {'lat_min': 40.5, 'lat_max': 45.0, 'lon_min': -79.8, 'lon_max': -71.9},
'CA': {'lat_min': 32.5, 'lat_max': 42.0, 'lon_min': -124.5, 'lon_max': -114.1},
'TX': {'lat_min': 25.8, 'lat_max': 36.5, 'lon_min': -106.6, 'lon_max': -93.5},
'FL': {'lat_min': 24.4, 'lat_max': 31.0, 'lon_min': -87.6, 'lon_max': -80.0},
'IL': {'lat_min': 36.9, 'lat_max': 42.5, 'lon_min': -91.5, 'lon_max': -87.0},
'OH': {'lat_min': 38.4, 'lat_max': 42.3, 'lon_min': -84.8, 'lon_max': -80.5},
'MI': {'lat_min': 41.7, 'lat_max': 48.3, 'lon_min': -90.4, 'lon_max': -82.4},
'PA': {'lat_min': 39.7, 'lat_max': 42.3, 'lon_min': -80.5, 'lon_max': -74.7},
'GA': {'lat_min': 30.4, 'lat_max': 35.0, 'lon_min': -85.6, 'lon_max': -80.8},
'NC': {'lat_min': 33.8, 'lat_max': 36.6, 'lon_min': -84.3, 'lon_max': -75.5},
'WA': {'lat_min': 45.5, 'lat_max': 49.0, 'lon_min': -124.8, 'lon_max': -116.9},
'CO': {'lat_min': 37.0, 'lat_max': 41.0, 'lon_min': -109.1, 'lon_max': -102.0},
'AZ': {'lat_min': 31.3, 'lat_max': 37.0, 'lon_min': -114.8, 'lon_max': -109.0},
'NV': {'lat_min': 35.0, 'lat_max': 42.0, 'lon_min': -120.0, 'lon_max': -114.0},
'OR': {'lat_min': 42.0, 'lat_max': 46.3, 'lon_min': -124.6, 'lon_max': -116.5},
}
for state, bounds in state_bounds.items():
if (bounds['lat_min'] <= lat <= bounds['lat_max'] and
bounds['lon_min'] <= lon <= bounds['lon_max']):
return state
# Default fallback for central US
if 38.0 <= lat <= 42.0 and -100.0 <= lon <= -90.0:
return 'KS' # Kansas as central fallback
return None
def create_storm_events_client() -> StormEventsClient:
"""Factory function to create Storm Events client"""
return StormEventsClient()