import streamlit as st
import pandas as pd
from datasets import load_dataset
import re
from datetime import datetime, date
from io import StringIO
from typing import Optional, Tuple, List, Dict, Any

# Constants
DEFAULT_SAMPLE_SIZE = 1000
DATE_FORMAT = "%Y%m%d"
FULL_DATE_FORMAT = f"{DATE_FORMAT}%H%M%S"


# Load dataset with enhanced caching and validation
@st.cache_data(ttl=3600, show_spinner="Loading dataset...")
def load_data(sample_size: int = DEFAULT_SAMPLE_SIZE) -> pd.DataFrame:
    """
    Load and validate dataset with error handling.

    Args:
        sample_size (int): Number of records to load

    Returns:
        pd.DataFrame: Loaded and validated dataframe
    """
    try:
        dataset = load_dataset(
            "dwb2023/gdelt-gkg-2025-v2",
            data_files={
                "train": [
                    "gdelt_gkg_20250210.parquet",
                    "gdelt_gkg_20250211.parquet",
                ]
            },
            split="train",
        )
        df = pd.DataFrame(dataset)

        # Basic data validation
        if df.empty:
            st.error("Loaded dataset is empty")
            return pd.DataFrame()
        if "DATE" not in df.columns:
            st.error("Dataset missing required DATE column")
            return pd.DataFrame()

        return df

    except Exception as e:
        st.error(f"Error loading dataset: {str(e)}")
        st.stop()
        return pd.DataFrame()


def initialize_app(df: pd.DataFrame) -> None:
    """Initialize the Streamlit app interface."""
    st.title("GDELT GKG 2025 Dataset Explorer")
    with st.sidebar:
        st.header("Search Criteria")
        st.markdown("🔍 Filter dataset using the controls below")


def extract_unique_themes(df: pd.DataFrame, column: str) -> List[str]:
    """
    Extract and clean unique themes from semicolon-separated column.

    Args:
        df (pd.DataFrame): Input dataframe
        column (str): Column name containing themes

    Returns:
        List[str]: Sorted list of unique themes
    """
    if df.empty:
        return []
    return sorted({
        theme.split(",")[0].strip()
        for themes in df[column].dropna().str.split(";")
        for theme in themes
        if theme.strip()
    })


def get_date_range(df: pd.DataFrame, date_col: str) -> Tuple[date, date]:
    """
    Get min/max dates from dataset with fallback defaults.

    Args:
        df (pd.DataFrame): Input dataframe
        date_col (str): Column name containing dates

    Returns:
        Tuple[date, date]: (min_date, max_date) as date objects
    """
    try:
        # Convert YYYYMMDDHHMMSS string format to datetime using constant
        dates = pd.to_datetime(df[date_col], format=FULL_DATE_FORMAT)
        return dates.min().date(), dates.max().date()
    except Exception as e:
        st.warning(f"Date range detection failed: {str(e)}")
        return datetime(2025, 2, 10).date(), datetime(2025, 2, 11).date()
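
# Illustrative example (not executed; the cell layout "THEME,offset;THEME,offset;..."
# is an assumption based on how the code parses the column): given a V2EnhancedThemes
# cell such as "TAX_FNCACT,123;EPU_POLICY,456;TAX_FNCACT,789", extract_unique_themes()
# splits on ";", keeps the text before each ",", de-duplicates, and returns
# ["EPU_POLICY", "TAX_FNCACT"].
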
def create_filters(df: pd.DataFrame) -> Dict[str, Any]:
    """
    Generate sidebar filters and return filter state.

    Args:
        df (pd.DataFrame): Input dataframe

    Returns:
        Dict[str, Any]: Dictionary of filter settings
    """
    filters = {}
    with st.sidebar:
        # Theme multi-select
        filters["themes"] = st.multiselect(
            "V2EnhancedThemes (exact match)",
            options=extract_unique_themes(df, "V2EnhancedThemes"),
            help="Select exact themes to include (supports multiple selection)",
        )

        # Text-based filters
        text_filters = {
            "source_common_name": ("SourceCommonName", "partial name match"),
            "document_identifier": ("DocumentIdentifier", "partial identifier match"),
            "sharing_image": ("V2.1SharingImage", "partial image URL match"),
        }
        for key, (label, help_text) in text_filters.items():
            filters[key] = st.text_input(
                f"{label} ({help_text})",
                placeholder=f"Enter {help_text}...",
                help=f"Case-insensitive {help_text}",
            )

        # Date range with dataset-based defaults
        date_col = "DATE"
        min_date, max_date = get_date_range(df, date_col)
        filters["date_range"] = st.date_input(
            "Date range",
            value=(min_date, max_date),
            min_value=min_date,
            max_value=max_date,
        )

        # Record limit
        filters["record_limit"] = st.number_input(
            "Max records to display",
            min_value=100,
            max_value=5000,
            value=1000,
            step=100,
            help="Limit results for better performance",
        )
    return filters


def apply_filters(df: pd.DataFrame, filters: Dict[str, Any]) -> pd.DataFrame:
    """
    Apply all filters to dataframe using vectorized operations.

    Args:
        df (pd.DataFrame): Input dataframe to filter
        filters (Dict[str, Any]): Dictionary containing filter parameters:
            - themes (list): List of themes to match exactly
            - source_common_name (str): Partial match for source name
            - document_identifier (str): Partial match for document ID
            - sharing_image (str): Partial match for image URL
            - date_range (tuple): (start_date, end_date) tuple
            - record_limit (int): Maximum number of records to return

    Returns:
        pd.DataFrame: Filtered dataframe
    """
    filtered_df = df.copy()

    # Theme exact match filter - set regex groups to be non-capturing using (?:) syntax
    if filters["themes"]:
        pattern = r'(?:^|;)(?:{})(?:$|,|;)'.format('|'.join(map(re.escape, filters["themes"])))
        filtered_df = filtered_df[filtered_df["V2EnhancedThemes"].str.contains(pattern, na=False)]

    # Text partial match filters
    text_columns = {
        "source_common_name": "SourceCommonName",
        "document_identifier": "DocumentIdentifier",
        "sharing_image": "V2.1SharingImage",
    }
    for filter_key, col_name in text_columns.items():
        if value := filters.get(filter_key):
            filtered_df = filtered_df[
                filtered_df[col_name]
                .str.contains(re.escape(value), case=False, na=False)
            ]

    # Date range filter with validation
    if len(filters["date_range"]) == 2:
        start_date, end_date = filters["date_range"]

        # Validate date range
        if start_date > end_date:
            st.error("Start date must be before end date")
            return filtered_df

        date_col = "DATE"
        try:
            # Convert full datetime strings to datetime objects using constant
            date_series = pd.to_datetime(filtered_df[date_col], format=FULL_DATE_FORMAT)

            # Create timestamps for start/end of day
            start_timestamp = pd.Timestamp(start_date).normalize()  # Start of day
            end_timestamp = pd.Timestamp(end_date) + pd.Timedelta(days=1) - pd.Timedelta(seconds=1)  # End of day

            filtered_df = filtered_df[
                (date_series >= start_timestamp) & (date_series <= end_timestamp)
            ]
        except Exception as e:
            st.error(f"Error applying date filter: {str(e)}")
            return filtered_df

    # Apply record limit
    return filtered_df.head(filters["record_limit"])
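
# Illustrative example (not executed): selecting the single theme "TAX_FNCACT"
# makes apply_filters() build the pattern r'(?:^|;)(?:TAX_FNCACT)(?:$|,|;)'.
# The non-capturing groups anchor the match to the start of the cell or a
# preceding ";" and to a following ",", ";", or end of cell, so a longer theme
# such as "TAX_FNCACT_POLICE" does not match while "TAX_FNCACT,123;..." does.
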
def main():
    """Main application entry point."""
    df = load_data()
    if df.empty:
        st.warning("No data available - check data source")
        return

    initialize_app(df)
    filters = create_filters(df)
    filtered_df = apply_filters(df, filters)

    # Display results
    st.subheader(f"Results: {len(filtered_df)} records")
    st.dataframe(filtered_df, use_container_width=True)

    st.download_button(
        label="Download CSV",
        data=filtered_df.to_csv(index=False).encode(),
        file_name="filtered_results.csv",
        mime="text/csv",
        help="Download filtered results as CSV",
    )


main()
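
# Usage note (assumption: this script is saved as app.py):
#   streamlit run app.py
# Streamlit re-executes the script on every widget interaction; @st.cache_data(ttl=3600)
# keeps the Hugging Face dataset load cached for an hour between reruns.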