# -*- coding: utf-8 -*-
"""Geospatial explorer for ANCA-associated vasculitis patients (Bogotá).

At import time this script reads the patient spreadsheet, the localities
polygon file and the environmental GeoJSON layers, derives categorical
columns used for filtering, and builds a Gradio interface (``demo``) with
three tabs: a folium map, a univariate chart and a bivariate chart.
"""
import re
import unicodedata
import warnings

import branca
import folium
import geopandas as gpd
import gradio as gr
import numpy as np
import pandas as pd
import plotly.express as px
from folium.plugins import HeatMap
from pandas.api.types import is_datetime64_any_dtype
from shapely.geometry import Point
from sklearn.cluster import DBSCAN

warnings.filterwarnings("ignore")


# ─────────────────── helpers ───────────────────
def norm(t):
    """Return *t* with diacritics removed (NFKD-decompose, drop non-ASCII)."""
    return unicodedata.normalize("NFKD", t).encode("ascii", "ignore").decode()


def snake(cols):
    """Return the column names in *cols* converted to ASCII snake_case."""
    return [re.sub(r"[^\w]+", "_", norm(c).strip().lower()).strip("_") for c in cols]


def sin_dato(s):
    """Replace missing values and empty strings in Series *s* with "Sin dato"."""
    return s.fillna("Sin dato").replace("", "Sin dato")


def _columna(df, nombre):
    """Return ``df[nombre]``, or an all-missing Series if the column is absent.

    ``DataFrame.get`` returns ``None`` for a missing column, which would make
    the ``.str`` / ``.fillna`` calls below fail with an opaque AttributeError.
    """
    if nombre in df.columns:
        return df[nombre]
    return pd.Series(np.nan, index=df.index, dtype="object")


def _a_numero(s):
    """Parse a text Series that uses a decimal comma into floats (NaN on failure)."""
    return pd.to_numeric(s.str.replace(",", ".", regex=False), errors="coerce")


def _buscar_columna(columnas, *fragmentos):
    """Return the first name in *columnas* containing every fragment.

    Raises:
        KeyError: if no column matches.
    """
    for c in columnas:
        if all(f in c for f in fragmentos):
            return c
    raise KeyError(f"No column contains all of {fragmentos!r}")


# Cell values (lower-cased) that count as an affirmative flag.
_POSITIVOS = frozenset({"si", "sí", "checked", "1", "positivo"})

NUM_VARS = ["edad", "creatinina"]
ENV_VARS_GRAFICOS = ["PM2.5", "Ozono", "Temperatura", "Precipitación", "Viento"]

# ─────────────────── paths ─────────────────────
DATA_XLSX = "VasculitisAsociadasA-Bdd3_DATA_LABELS_2025-04-16_1949 (1).xlsx"
LOCALIDADES = "loca.json"
GEO_AMBIENTALES = {
    "PM10": "pm10_prom_anual.geojson",
    "PM2.5": "pm25_prom_anual_2023 (2).geojson",
    "Ozono": "ozono_prom_anual_2022 (2).geojson",
    "Temperatura": "temp_anualprom_2023 (2).geojson",
    "Precipitación": "precip_anualacum_2023 (2).geojson",
    "Viento": "vel_viento_0_23h_anual_2023.geojson",
    "WQI": "tramo_wqi.geojson",
    "Heatmap pacientes": None,  # computed from patient points, no file
}

# ─────────── layer metadata ───────────
# layer name -> (value column, unit, colormap, zone-id column, zone-id alias)
META_CAPAS = {
    "PM10": ("conc_pm10", "µg/m³", branca.colormap.linear.OrRd_09, "id", "Zona"),
    "PM2.5": ("conc_pm25", "µg/m³", branca.colormap.linear.Reds_09, "id", "Zona"),
    "Ozono": ("conc_ozono", "ppb", branca.colormap.linear.PuBuGn_09, "id", "Zona"),
    "Temperatura": ("temperatur", "°C", branca.colormap.linear.YlOrBr_09, "id", "Zona"),
    "Precipitación": ("precip_per", "mm", branca.colormap.linear.Blues_09, "id", "Zona"),
    "Viento": ("velocidad", "m/s", branca.colormap.linear.GnBu_09, "id", "Zona"),
    "WQI": ("wqi_val", "", None, "tramo", "Tramo"),  # colormap filled in below
}

# human-readable antecedent label -> snake_case spreadsheet column
ANT_COLS_HUMAN = {
    "Diabetes": "antecedente_personal_de_diabetes",
    "Falla cardíaca": "antecedente_personal_de_falla_cardiaca",
    "EPOC": "antecedente_personal_de_epoc",
    "Hipertensión": "antecedente_personal_de_hipertension_arterial",
    "VIH": "antecedente_personal_de_vih",
    "Enf. autoinmune": "antecedente_personal_de_otra_enfermedad_autoinmune",
    "Cáncer": "antecedente_personal_de_cancer",
}
DISPLAY_MAP = {
    "Localidad": "localidad",
    "Estrato": "estrato_socioeconomico_cat",
    "Hallazgo Biopsia": "biopsia_patron_str",
    **ANT_COLS_HUMAN,
}


def _resolve(v):
    """Map a display name to its DataFrame column (identity if unmapped)."""
    return DISPLAY_MAP.get(v, v)


# ─────────── patients ───────────
df_all = pd.read_excel(DATA_XLSX, dtype=str)
df_all.columns = snake(df_all.columns)

lat_col = _buscar_columna(df_all.columns, "residencia", "latitud")
lon_col = _buscar_columna(df_all.columns, "residencia", "longitud")
df_all = df_all.rename(columns={lat_col: "latitud_raw", lon_col: "longitud_raw"})
df_all["latitud"] = _a_numero(df_all["latitud_raw"])
df_all["longitud"] = _a_numero(df_all["longitud_raw"])

for col in ("genero", "estrato_socioeconomico"):
    df_all[f"{col}_cat"] = sin_dato(_columna(df_all, col))

# Five-year age bands: "0-4", "5-9", ..., "95-99".
bins = list(range(0, 105, 5))
age_labels = [f"{b}-{b+4}" for b in bins[:-1]]
df_all["edad"] = _a_numero(_columna(df_all, "edad_en_anos_del_paciente"))
_edad_bins = pd.cut(df_all["edad"], bins=bins, labels=age_labels, right=False)
# Fill missing bands explicitly: casting the categorical to str would turn NaN
# into the literal "nan", which is not a declared category and would silently
# become NaN again instead of "Sin dato".
df_all["edad_cat"] = pd.Categorical(
    _edad_bins.astype(object).where(_edad_bins.notna(), "Sin dato"),
    categories=age_labels + ["Sin dato"],
    ordered=True,
)

# Serology columns; note the source column is "ancas" but the category is "anca_cat".
for src, dst in (("ancas", "anca_cat"), ("mpo", "mpo_cat"), ("pr3", "pr3_cat")):
    df_all[dst] = sin_dato(_columna(df_all, src))

clin_cols = {
    "sindrome_renal": "sindrome_renal_al_ingreso",
    "manifestaciones_extrarenales": "manifestaciones_extrarenales",
    "proteinuria": "proteinuria",
}
for dst, src in clin_cols.items():
    df_all[dst] = sin_dato(_columna(df_all, src)).str.capitalize()

df_all["creatinina"] = _a_numero(_columna(df_all, "creatinina"))

# Normalise every antecedent flag to "Positivo"/"Negativo"; absent columns
# are treated as all-negative.
for col in ANT_COLS_HUMAN.values():
    if col in df_all.columns:
        vals = df_all[col].astype(str).str.lower()
        df_all[col] = np.where(vals.isin(_POSITIVOS), "Positivo", "Negativo")
    else:
        df_all[col] = "Negativo"

# Biopsy findings come as one checkbox column per pattern; rename them to
# bio_1..bio_n and keep a mapping from the original name to a short label.
bio_raw = [c for c in df_all.columns if c.startswith("hallazgos_histologicos_en_biopsia")]
ren_bio = {c: f"bio_{i}" for i, c in enumerate(bio_raw, 1)}
df_all = df_all.rename(columns=ren_bio)

BIO_REGEX = [
    (r"sin_alteraciones$", "Sin alteraciones"),
    (r"sin_proliferacion_extracapilar", "Necrosis sin PC"),
    (r"menos_del_50.*focal", "Focal"),
    (r"clase_mixta", "Mixta"),
    (r"mas_del_50.*cresc", "Crescéntica"),
    (r"sin_compromiso_glomerular$", "Vasculitis sin glom."),
    (r"con_compromiso_glomerular$", "Vasculitis + glom."),
    (r"sin_dato$", "Sin dato"),
]
raw2short = {}
for _patron, _corto in BIO_REGEX:
    # A pattern with no matching column is skipped instead of raising StopIteration.
    _match = next((r for r in bio_raw if re.search(_patron, r)), None)
    if _match is not None:
        raw2short[_match] = _corto


def hallar(r):
    """Return the short labels of the biopsy patterns flagged in row *r*.

    Falls back to ``["Sin dato"]`` when no pattern is flagged. A biopsy column
    not covered by ``BIO_REGEX`` is reported under its raw column name.
    """
    hallados = [
        raw2short.get(raw, raw)
        for raw, flag in ren_bio.items()
        if str(r[flag]).strip().lower() in _POSITIVOS
    ]
    return hallados or ["Sin dato"]


df_all["biopsia_patrones"] = df_all.apply(hallar, axis=1)
df_all["biopsia_patron_str"] = df_all["biopsia_patrones"].apply("; ".join)
df_all["biopsia_positiva"] = np.where(
    df_all["biopsia_patron_str"] == "Sin dato", "Negativo", "Positivo"
)

# ─────────── localities ───────────
geo_loc = gpd.read_file(LOCALIDADES).to_crs(4326)
geo_loc.columns = snake(geo_loc.columns)
geo_loc = geo_loc.rename(columns={"locnombre": "localidad"})
geo_loc["localidad"] = geo_loc["localidad"].str.upper()

geom_pts = df_all.dropna(subset=["latitud", "longitud"]).copy()
geom_pts["geometry"] = [Point(xy) for xy in zip(geom_pts["longitud"], geom_pts["latitud"])]
geom_pts = gpd.GeoDataFrame(geom_pts, geometry="geometry", crs=4326)
geom_pts = gpd.sjoin(
    geom_pts, geo_loc[["localidad", "geometry"]], how="left", predicate="within"
).drop(columns="index_right")
# A point inside overlapping polygons yields several join rows; keep one per
# patient so the merge below cannot duplicate patients.
geom_pts = geom_pts[~geom_pts.index.duplicated(keep="first")]
df_all = df_all.merge(geom_pts[["localidad"]], left_index=True, right_index=True, how="left")


# ─────────── layers ───────────
def load_gjson(pth):
    """Read a GeoJSON layer as EPSG:4326 with snake_case columns.

    Object columns are stripped and converted to numbers when every value
    parses; otherwise the stripped text is kept. Datetime columns are
    converted to strings.
    """
    g = gpd.read_file(pth).to_crs(4326)
    g.columns = snake(g.columns)
    for c in g.columns:
        if g[c].dtype == object:
            limpio = g[c].str.strip()
            # Same outcome as the deprecated ``errors="ignore"``: keep the
            # stripped input when it is not fully numeric.
            try:
                g[c] = pd.to_numeric(limpio)
            except (ValueError, TypeError):
                g[c] = limpio
        if is_datetime64_any_dtype(g[c]):
            g[c] = g[c].astype(str)
    return g


caps_base = {k: load_gjson(v) for k, v in GEO_AMBIENTALES.items() if v}

wqi_bins = [0, 20, 35, 50, 70, 100]
wqi_labels = ["Pobre", "Marginal", "Regular", "Buena", "Excelente"]
g_wqi = caps_base["WQI"].copy()
g_wqi["wqi_val"] = pd.to_numeric(g_wqi["wqi"], errors="coerce")
g_wqi["wqi_cat"] = pd.cut(g_wqi["wqi_val"], bins=wqi_bins, labels=wqi_labels, include_lowest=True)
wqi_cmap = branca.colormap.StepColormap(
    colors=["red", "olive", "purple", "green", "blue"],
    index=wqi_bins,
    vmin=wqi_bins[0],
    vmax=wqi_bins[-1],
    caption="WQI",
)
caps_base["WQI"] = g_wqi
# Slot the step colormap into the WQI metadata tuple (position 2).
META_CAPAS["WQI"] = META_CAPAS["WQI"][:2] + (wqi_cmap,) + META_CAPAS["WQI"][3:]


# ─────────── filters ───────────
def _activo(valor):
    """Return True when a single-choice filter holds a real selection."""
    return valor not in (None, "", "Todos")


def filtrar(d, gen, edades, locs, renal, ants, bios, anca, mpo, pr3):
    """Return the rows of *d* matching every active filter.

    Single-choice filters (*gen*, *renal*, *anca*, *mpo*, *pr3*) are ignored
    when set to "Todos" or left empty. Multi-choice filters (*edades*, *locs*,
    *bios*) are ignored when empty. Every antecedent in *ants* other than
    "Todos" must be "Positivo". *d* itself is not modified.
    """
    d2 = d.copy()
    if _activo(gen):
        d2 = d2[d2["genero_cat"] == gen]
    if edades:
        d2 = d2[d2["edad_cat"].isin(edades)]
    if locs:
        d2 = d2[d2["localidad"].fillna("Sin dato").isin(locs)]
    if _activo(renal):
        d2 = d2[d2["biopsia_positiva"] == renal]
    if bios and bios != ["Todos"]:
        d2 = d2[d2["biopsia_patrones"].apply(lambda lst: any(p in lst for p in bios))]
    if _activo(anca):
        d2 = d2[d2["anca_cat"] == anca]
    if _activo(mpo):
        d2 = d2[d2["mpo_cat"] == mpo]
    if _activo(pr3):
        d2 = d2[d2["pr3_cat"] == pr3]
    for ant in ants or []:
        if ant == "Todos":
            continue
        d2 = d2[d2[ANT_COLS_HUMAN[ant]] == "Positivo"]
    return d2


# ─────────── dynamic counts ───────────
def _contar_pacientes(g0, pts):
    """Return a copy of layer *g0* with a "pacientes" count per feature."""
    g = g0.copy()
    g["pacientes"] = 0
    if pts.empty:
        return g
    join = gpd.sjoin(pts[["geometry"]], g, how="left", predicate="within")
    counts = join["index_right"].value_counts()  # NaN (no match) is dropped
    if not counts.empty:
        g.loc[counts.index, "pacientes"] = counts.values
    return g


def capas_conteos(pts):
    """Return ``{layer name: GeoDataFrame}`` with patient counts per zone.

    The WQI layer is passed through unchanged (no counts, not copied).
    """
    caps = {}
    for capa, g0 in caps_base.items():
        if capa in ("Heatmap pacientes", "WQI"):
            caps[capa] = g0
        else:
            caps[capa] = _contar_pacientes(g0, pts)
    return caps


# ─────────── chart helpers ───────────
def prep_pts(d):
    """Return the rows of *d* that have coordinates as an EPSG:4326 GeoDataFrame."""
    d2 = d.dropna(subset=["latitud", "longitud"]).copy()
    d2["geometry"] = gpd.points_from_xy(
        d2["longitud"].astype(float), d2["latitud"].astype(float), crs=4326
    )
    return gpd.GeoDataFrame(d2, geometry="geometry", crs=4326)


def env_series(var, pts):
    """Return, per point in *pts*, a label for the *var* zone containing it.

    Labels look like "Zona 3 (12.5 µg/m³)"; points outside every zone get
    "Sin dato". The result is indexed like *pts* (one entry per point).
    """
    # Only the layer's value column is needed, so read the base layer
    # directly instead of recomputing patient counts for every layer.
    g = caps_base[var]
    val, uni, *_ = META_CAPAS[var]
    join = gpd.sjoin(pts[["geometry"]], g[["geometry", val]], how="left", predicate="within")
    # Keep one zone per point so the result can be aligned back by index.
    join = join[~join.index.duplicated(keep="first")]
    if join.empty:
        return pd.Series(dtype=object, index=join.index)

    def fmt(r):
        if pd.isna(r[val]):
            return "Sin dato"
        try:
            v = float(r[val])
            return f"Zona {int(r['index_right'])} ({v:.1f} {uni})"
        except (TypeError, ValueError):
            return str(r[val])

    ser = join.apply(fmt, axis=1)
    ser.index = join.index
    return ser


def env_df(var, pts):
    """Return a frame with columns "zona" (label) and "pacientes" for layer *var*."""
    g = _contar_pacientes(caps_base[var], pts)
    val, uni, _cmap, zfield, _zalias = META_CAPAS[var]
    g["zona"] = g.apply(lambda r: f"Zona {int(r[zfield])} ({r[val]:.1f} {uni})", axis=1)
    return g[["zona", "pacientes"]]


def is_num(v):
    """Return True if column *v* is one of the numeric variables."""
    return v in NUM_VARS


# ─────────── univariate charts ───────────
def g_uni(v, d):
    """Return a plotly figure of the distribution of variable *v* in *d*.

    Environmental variables give a bar chart of patients per zone, numeric
    variables a 20-bin histogram, everything else a categorical histogram.
    """
    col = _resolve(v)
    if v in ENV_VARS_GRAFICOS:
        df = env_df(v, prep_pts(d)).sort_values("zona")
        return px.bar(df, x="zona", y="pacientes", text_auto=True, title=v,
                      labels={"zona": "Zona", "pacientes": "Pacientes"})
    if v == "Localidad":
        s = d[col].fillna("Sin dato")
        return px.histogram(s, x=s, category_orders={s.name: sorted(s.unique())},
                            text_auto=True, title="Localidad")
    if is_num(col):
        return px.histogram(d, x=col, nbins=20, title=v)
    order = sorted(d[col].astype(str).unique())
    return px.histogram(d, x=col, category_orders={col: order}, text_auto=True, title=v)


# ─────────── bivariate charts ───────────
def g_bi(x, y, d):
    """Return a plotly figure relating variables *x* and *y* in *d*.

    Two categorical variables give a grouped histogram, one numeric and one
    categorical a box plot, two numeric variables a scatter plot.
    """
    x_col = _resolve(x)
    y_col = _resolve(y)
    pts = prep_pts(d)
    # Environmental variables are not columns of *d*; attach the zone label
    # of each patient under the variable's own name.
    if x in ENV_VARS_GRAFICOS:
        d = d.assign(**{x: env_series(x, pts)})
    if y in ENV_VARS_GRAFICOS:
        d = d.assign(**{y: env_series(y, pts)})
    num_x, num_y = is_num(x_col), is_num(y_col)
    titulo = f"{x} vs {y}"
    if not num_x and not num_y:
        ord_x = sorted(map(str, d[x_col].unique()))
        ord_y = sorted(map(str, d[y_col].unique()))
        return px.histogram(d, x=x_col, color=y_col, barmode="group",
                            category_orders={x_col: ord_x, y_col: ord_y}, title=titulo)
    if num_x and not num_y:
        return px.box(d, x=y_col, y=x_col, points="all", title=titulo)
    if not num_x and num_y:
        return px.box(d, x=x_col, y=y_col, points="all", title=titulo)
    return px.scatter(d, x=x_col, y=y_col, title=titulo)


# ─────────── patient pop-up ───────────
def popup(r):
    """Return the HTML pop-up text for patient row *r*.

    NOTE(review): the line separators inside these literals were lost in the
    source this was recovered from; ``<br>`` is assumed because folium
    pop-ups render HTML. Confirm against the deployed version.
    """
    def lab(k):
        es_positivo = str(r.get(f"{k.lower()}_cat", "")).lower() == "positivo"
        return f"{k}: Positivo<br>" if es_positivo else ""

    edad = f"{int(r['edad'])} años" if pd.notna(r['edad']) else "Sin dato edad"
    localidad = r['localidad'] if pd.notna(r['localidad']) else "Sin dato"
    ants = "; ".join(
        lbl for lbl, col in ANT_COLS_HUMAN.items() if r.get(col) == "Positivo"
    ) or "Ninguno"
    return (f"Localidad: {localidad}<br>"
            f"Edad: {edad}<br>"
            f"Género: {r['genero_cat']}<br>"
            f"{lab('ANCA')}{lab('MPO')}{lab('PR3')}"
            f"Biopsia: {'; '.join(r['biopsia_patrones'])}<br>"
            f"Antecedentes: {ants}")


# ─────────── choropleth ───────────
def choropleth(m, g, val, title, cmap, zfield, zalias):
    """Add layer *g*, coloured by column *val*, plus its legend to map *m*.

    Line geometries are drawn as coloured strokes, anything else as filled
    polygons. *zfield* / *zalias* name the zone-id column and its tooltip
    label. An empty layer adds nothing.
    """
    if g.empty:
        return
    g = g.copy()
    g[val] = pd.to_numeric(g[val], errors="coerce")
    for c in g.columns:
        if is_datetime64_any_dtype(g[c]):
            g[c] = g[c].astype(str)  # datetimes are not JSON-serialisable

    # The WQI step colormap has fixed class breaks and must not be rescaled.
    cm = cmap if cmap is wqi_cmap else cmap.scale(g[val].min(), g[val].max())
    cm.caption = title
    cm.add_to(m)

    is_line = g.geometry.iloc[0].geom_type.startswith("Line")

    def color_de(feature):
        v = feature["properties"].get(val)
        # Missing values serialise as null; draw them grey instead of crashing.
        return "#cccccc" if v is None or pd.isna(v) else cm(v)

    def style(feature):
        if is_line:
            return {"color": color_de(feature), "weight": 4, "opacity": .9}
        return {"fillColor": color_de(feature), "fillOpacity": .8,
                "color": "black", "weight": .3}

    fields = [zfield, val]
    aliases = [zalias, title]
    if "pacientes" in g.columns and val != "pacientes":
        fields.append("pacientes")
        aliases.append("Pacientes")
    if "wqi_cat" in g.columns:
        fields.insert(2, "wqi_cat")
        aliases.insert(2, "Calidad")
    if "nombre" in g.columns:
        fields.insert(1, "nombre")
        aliases.insert(1, "Río")

    folium.GeoJson(
        g,
        name=title,
        style_function=style,
        highlight_function=lambda _: {"weight": 2, "color": "#444"},
        tooltip=folium.GeoJsonTooltip(fields, aliases, sticky=True),
    ).add_to(m)


# ─────────── map ───────────
def crear_mapa(d_filt, capas_sel, ver_cluster):
    """Return the HTML of a folium map for the filtered patients *d_filt*.

    The map always shows patients per locality and one marker per patient.
    *capas_sel* lists the extra layers to draw; *ver_cluster* adds DBSCAN
    clusters (haversine metric, eps = 1 km, min_samples = 3).
    """
    pts = prep_pts(d_filt)
    caps = capas_conteos(pts)
    capas_sel = capas_sel or []

    g_loc = pts.groupby("localidad").size().reset_index(name="pacientes")
    geo = geo_loc.merge(g_loc, on="localidad", how="left").fillna({"pacientes": 0})
    m = folium.Map(location=[4.65, -74.1], zoom_start=11, tiles="CartoDB positron")
    choropleth(m, geo, "pacientes", "No. Pacientes",
               branca.colormap.linear.Reds_09, "localidad", "Localidad")

    for capa in capas_sel:
        if capa == "Heatmap pacientes":
            continue  # drawn below from the points themselves
        if capa == "WQI":
            # choropleth() already adds the colormap legend to the map.
            choropleth(m, caps["WQI"], "wqi_val", "WQI", wqi_cmap, "tramo", "Tramo")
            continue
        val, uni, cmap, zf, za = META_CAPAS[capa]
        choropleth(m, caps[capa], val, f"{capa} ({uni})", cmap, zf, za)

    if "Heatmap pacientes" in capas_sel and not pts.empty:
        HeatMap(pts[["latitud", "longitud"]].values, radius=18,
                name="Heatmap pacientes").add_to(m)

    fg = folium.FeatureGroup("Pacientes", overlay=True)
    for _, r in pts.iterrows():
        folium.CircleMarker(
            (r["latitud"], r["longitud"]),
            radius=6, color="#c00", fill=True,
            fill_color="#fff", fill_opacity=.9,
            popup=popup(r),
        ).add_to(fg)
    fg.add_to(m)

    if ver_cluster and len(pts) > 2:
        # Haversine works on radians; eps = 1 km / Earth radius (6371 km).
        coords = np.radians(pts[["latitud", "longitud"]])
        lab = DBSCAN(eps=1/6371, min_samples=3, metric="haversine").fit_predict(coords)
        pts["cluster"] = lab
        cl_fg = folium.FeatureGroup("Clústeres (1 km)", overlay=True)
        pal = branca.colormap.linear.Set1_09
        escala = max(1, pts["cluster"].nunique() - 1)
        # Label -1 is DBSCAN noise and is not drawn.
        for cl in sorted(c for c in pts["cluster"].unique() if c != -1):
            color = pal(cl / escala)
            for _, r in pts[pts["cluster"] == cl].iterrows():
                folium.CircleMarker(
                    (r["latitud"], r["longitud"]),
                    radius=7, color=color, fill=True,
                    fill_color=color, fill_opacity=.9,
                    popup=f"Clúster {cl}<br>" + popup(r),
                ).add_to(cl_fg)
        cl_fg.add_to(m)

    folium.LayerControl(collapsed=False).add_to(m)
    return m._repr_html_()


# ─────────── Gradio interface ───────────
gen_opts = ["Todos"] + sorted(df_all["genero_cat"].unique())
age_opts = list(df_all["edad_cat"].dtype.categories)
loc_opts = sorted(df_all["localidad"].fillna("Sin dato").unique())
anca_opts = ["Todos"] + sorted(df_all["anca_cat"].unique())
mpo_opts = ["Todos"] + sorted(df_all["mpo_cat"].unique())
pr3_opts = ["Todos"] + sorted(df_all["pr3_cat"].unique())
bio_opts = ["Todos"] + sorted({p for lst in df_all["biopsia_patrones"] for p in lst})

vars_cat = ["Localidad"] + ENV_VARS_GRAFICOS + [
    "genero_cat", "estrato_socioeconomico_cat", "edad_cat",
    "sindrome_renal", "manifestaciones_extrarenales", "proteinuria",
    "anca_cat", "mpo_cat", "pr3_cat",
] + list(ANT_COLS_HUMAN.keys()) + ["Hallazgo Biopsia"]
vars_all = vars_cat + NUM_VARS

# NOTE(review): which widgets sit in which Row/Tab was reconstructed from
# statement order, because the source indentation was lost — confirm the layout.
with gr.Blocks(title="Vasculitis ANCA Bogotá") as demo:
    gr.Markdown("## Explorador geoespacial – Vasculitis ANCA (Bogotá)")
    with gr.Row():
        ui_gen = gr.Dropdown(gen_opts, label="Género", value="Todos")
        ui_age = gr.CheckboxGroup(age_opts, label="Edad (quinquenios)")
        ui_loc = gr.Dropdown(loc_opts, multiselect=True, label="Localidades")
        ui_renal = gr.Dropdown(["Todos", "Positivo", "Negativo"], value="Todos",
                               label="Compromiso renal")
        ui_ant = gr.CheckboxGroup(["Todos"] + list(ANT_COLS_HUMAN.keys()),
                                  label="Antecedentes")
        ui_bio = gr.CheckboxGroup(bio_opts, label="Hallazgo en Biopsia")
    with gr.Row():
        ui_anca = gr.Dropdown(anca_opts, label="ANCA", value="Todos")
        ui_mpo = gr.Dropdown(mpo_opts, label="MPO", value="Todos")
        ui_pr3 = gr.Dropdown(pr3_opts, label="PR3", value="Todos")
        ui_capas = gr.CheckboxGroup(list(GEO_AMBIENTALES.keys()), label="Capas mapa")
        ui_clu = gr.Checkbox(label="Mostrar clústeres (1 km)")

    # Filter widgets in the positional order expected by filtrar().
    filtros = [ui_gen, ui_age, ui_loc, ui_renal, ui_ant, ui_bio, ui_anca, ui_mpo, ui_pr3]

    with gr.Tab("Mapa"):
        btn_map = gr.Button("Generar mapa")
        out_map = gr.HTML()
        btn_map.click(
            lambda *i: crear_mapa(filtrar(df_all, *i[:-2]), i[-2], i[-1]),
            inputs=filtros + [ui_capas, ui_clu],
            outputs=out_map)

    with gr.Tab("Univariado"):
        ui_var = gr.Dropdown(vars_all, label="Variable")
        btn_uni = gr.Button("Graficar")
        out_uni = gr.Plot()
        btn_uni.click(
            lambda v, *i: g_uni(v, filtrar(df_all, *i)),
            inputs=[ui_var] + filtros,
            outputs=out_uni)

    with gr.Tab("Bivariado"):
        ui_x = gr.Dropdown(vars_all, label="Variable X")
        ui_y = gr.Dropdown(vars_all, label="Variable Y")
        btn_bi = gr.Button("Graficar")
        out_bi = gr.Plot()
        btn_bi.click(
            lambda x, y, *i: g_bi(x, y, filtrar(df_all, *i)),
            inputs=[ui_x, ui_y] + filtros,
            outputs=out_bi)

if __name__ == "__main__":
    demo.launch()