hedtorresca's picture
Update app.py
166d718 verified
# app.py – Explorador geoespacial Vasculitis ANCA (Bogotá)
# ─────────────────────────────────────────────────────────────
# • Carga el Excel “Vasculitis…2025‑04‑16_1949 (1).xlsx”.
# • Normaliza los nombres de columna a snake_case ASCII.
# • Renombra dinámicamente latitud / longitud.
# • Deriva:
# – edad_cat (quinquenios)
# – flags de antecedentes
# – patrón de biopsia resumido
# • Filtros completos por género, edad, localidad, ANCA, MPO, PR3,
# antecedentes, patrón de biopsia y compromiso renal.
# • Mapa Folium con:
# – coroplético pacientes / localidad
# – capas ambientales (PM10, PM2.5, Ozono, Temp, Precip, Viento, WQI)
# – heatmap opcional
# – una sola capa de clústeres 1 km con pop‑ups resumidos
# • Gráficos Univariado y Bivariado que aceptan TODAS las variables
# (numéricas → histograma / dispersión; categóricas → barras / box‑plot).
# • Toda etiqueta de biopsia u antecedente usa la forma corta
# (p.ej. “Crescéntica”, “Vasculitis + glom.”, “Hipertensión”, “EPOC”…).
import re, unicodedata, warnings, branca, folium, gradio as gr
import pandas as pd, geopandas as gpd, numpy as np
from shapely.geometry import Point
from folium.plugins import HeatMap
from sklearn.cluster import DBSCAN
import plotly.express as px, plotly.graph_objects as go
import pandas.api.types as ptypes
import math
warnings.filterwarnings("ignore")
def snake(cols):
out = []
for col in cols:
txt = unicodedata.normalize("NFKD", col)
txt = txt.encode("ascii", "ignore").decode("utf-8")
txt = re.sub(r"[^\w]+", "_", txt.strip().lower())
out.append(txt.strip("_"))
return out
DATA_XLSX = "VasculitisAsociadasA-Bdd3_DATA_LABELS_2025-04-16_1949 (1).xlsx"
LOCALIDADES = "loca.json"
GEO_AMBIENTALES = {
"PM10": "pm10_prom_anual.geojson",
"PM2.5": "pm25_prom_anual_2023 (2).geojson",
"Ozono": "ozono_prom_anual_2022 (2).geojson",
"Temperatura": "temp_anualprom_2023 (2).geojson",
"Precipitación": "precip_anualacum_2023 (2).geojson",
"Viento": "vel_viento_0_23h_anual_2023.geojson",
"WQI": "tramo_wqi.geojson",
"Heatmap pacientes": None
}
META_CAPAS = {
"PM10": ("conc_pm10", "µg/m³", branca.colormap.linear.OrRd_09, "id", "Zona"),
"PM2.5": ("conc_pm25", "µg/m³", branca.colormap.linear.Reds_09, "id", "Zona"),
"Ozono": ("conc_ozono", "ppb", branca.colormap.linear.PuBuGn_09, "id", "Zona"),
"Temperatura": ("temperatur", "°C", branca.colormap.linear.YlOrBr_09, "id", "Zona"),
"Precipitación": ("precip_per", "mm", branca.colormap.linear.Blues_09, "id", "Zona"),
"Viento": ("velocidad", "m/s", branca.colormap.linear.GnBu_09, "id", "Zona"),
"WQI": ("wqi", "", branca.colormap.linear.Greens_09, "tramo", "Tramo")
}
# ─── 1. Pacientes ────────────────────────────────────────
df = pd.read_excel(DATA_XLSX, dtype=str)
df.columns = snake(df.columns)
col_lat = next(c for c in df.columns if "residencia" in c and "latitud" in c)
col_lon = next(c for c in df.columns if "residencia" in c and "longitud" in c)
df = df.rename(columns={col_lat:"latitud", col_lon:"longitud"})
df["latitud"] = pd.to_numeric(df["latitud"].str.replace(",", "."), errors="coerce")
df["longitud"] = pd.to_numeric(df["longitud"].str.replace(",", "."), errors="coerce")
df = df.dropna(subset=["latitud","longitud"])
df["geometry"] = df.apply(lambda r: Point(r["longitud"], r["latitud"]), axis=1)
df = gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326")
# ─── 2. Localidades ─────────────────────────────────────
geo_loc = gpd.read_file(LOCALIDADES).to_crs("EPSG:4326")
geo_loc.columns = snake(geo_loc.columns)
loc_col = next(c for c in geo_loc.columns if "localidad" in c or "locnombre" in c)
geo_loc = geo_loc.rename(columns={loc_col:"localidad"})
geo_loc["localidad"] = geo_loc["localidad"].str.upper()
df = gpd.sjoin(df, geo_loc[["localidad","geometry"]], how="left", predicate="within") \
.drop(columns="index_right")
# ─── 3. Capas ambientales ───────────────────────────────
def load_gjson(path):
g = gpd.read_file(path).to_crs("EPSG:4326")
g.columns = snake(g.columns)
for c in g.columns:
if ptypes.is_datetime64_any_dtype(g[c].dtype):
g[c] = g[c].astype(str)
elif g[c].dtype == object:
txt = g[c].str.strip()
if txt.str.match(r"^-?\d+(\.\d+)?$").all():
g[c] = txt.astype(float)
else:
g[c] = txt
return g
caps_amb = {k: load_gjson(v) for k,v in GEO_AMBIENTALES.items() if v}
wqi_bins = [0, 20, 35, 50, 70, 100]
wqi_labels = ["Pobre", "Marginal", "Regular", "Buena", "Excelente"]
wqi_colors = ["red", "olive", "purple", "green", "blue"]
# 2) Extrae el GeoDataFrame de WQI y conviértelo a numérico
g_wqi = caps_amb["WQI"].copy()
g_wqi["wqi_val"] = pd.to_numeric(g_wqi["wqi"], errors="coerce")
# 3) Crea la categoría
g_wqi["wqi_cat"] = pd.cut(
g_wqi["wqi_val"],
bins=wqi_bins,
labels=wqi_labels,
include_lowest=True
)
# 4) Construye el colormap por pasos
WQI_COLORMAP = branca.colormap.StepColormap(
colors=wqi_colors,
index=wqi_bins,
vmin=wqi_bins[0],
vmax=wqi_bins[-1],
caption="WQI"
)
# 5) Guarda de nuevo en caps_amb
caps_amb["WQI"] = g_wqi
# ─── 4. Derivadas y flags ───────────────────────────────
df["genero_cat"] = df.get("genero","").str.capitalize()
df["estrato_cat"] = df.get("estrato_socioeconomico","").str.capitalize()
df["edad"] = pd.to_numeric(df.get("edad_en_anos_del_paciente","").str.replace(",", "."), errors="coerce")
bins = list(range(0,105,5))
labels = [f"{b}-{b+4}" for b in bins[:-1]]
df["edad_cat"] = pd.cut(df["edad"], bins=bins, labels=labels, right=False)
df["anca_cat"] = df.get("ancas")
df["mpo_cat"] = df.get("mpo")
df["pr3_cat"] = df.get("pr3")
df["sindrome_renal"] = df.get("sindrome_renal_al_ingreso","").str.capitalize()
df["manifestaciones_extrarenales"] = df.get("manifestaciones_extrarenales","").str.capitalize()
df["proteinuria"] = df.get("proteinuria","").str.capitalize()
df["creatinina"] = pd.to_numeric(df.get("creatinina","").str.replace(",", "."), errors="coerce")
ante_cols = {
"diabetes":"antecedente_personal_de_diabetes",
"falla_cardiaca":"antecedente_personal_de_falla_cardiaca",
"epoc":"antecedente_personal_de_epoc",
"hipertension":"antecedente_personal_de_hipertension_arterial",
"vih":"antecedente_personal_de_vih",
"autoinmune":"antecedente_personal_de_otra_enfermedad_autoinmune",
"cancer":"antecedente_personal_de_cancer"
}
resumen_ante = {
"diabetes":"Diabetes",
"falla_cardiaca":"Falla cardíaca",
"epoc":"EPOC",
"hipertension":"Hipertensión",
"vih":"VIH",
"autoinmune":"Enf. autoinmune",
"cancer":"Cáncer"
}
for key,col in ante_cols.items():
df[key] = (df.get(col,"0").astype(str).str.lower()
.map({"si":1,"sí":1,"checked":1,"1":1})
.fillna(0).astype(int)
)
bio_raw = [c for c in df.columns if c.startswith("hallazgos_histologicos_en_biopsia")]
ren_bio = {c:f"bio_{i}" for i,c in enumerate(bio_raw,1)}
df = df.rename(columns=ren_bio)
bio_cols = list(ren_bio.values())
BIO_REGEX = [
(r"sin_alteraciones$", "Sin alteraciones"),
(r"sin_proliferacion_extracapilar", "Necrosis sin PC"),
(r"menos_del_50.*focal", "Focal"),
(r"clase_mixta", "Mixta"),
(r"mas_del_50.*cresc", "Crescéntica"),
(r"sin_compromiso_glomerular$", "Vasculitis sin glom."),
(r"con_compromiso_glomerular$", "Vasculitis + glom."),
(r"sin_dato$", "Sin dato")
]
# crear un dict raw_col → short
raw2short = {}
for patt, short in BIO_REGEX:
raw = next(c for c in bio_raw if re.search(patt, c))
raw2short[raw] = short
# después de raw2short = { … }
resumen_bio_map = raw2short.copy()
def patron_bio(row):
for raw, flag in ren_bio.items():
if str(row[flag]).strip().lower() in ("si","sí","checked","1"):
return raw2short.get(raw, "Sin dato")
return "Sin dato"
df["biopsia_patron"] = df.apply(patron_bio, axis=1)
df["biopsia_positiva"] = np.where(df["biopsia_patron"]=="Sin dato","No","Si")
# ─── 5. Filtrado ────────────────────────────────────────
def filtrar(d, gen, edades, locs, renal, ants, bios, anca, mpo, pr3):
d2 = d.copy()
if gen!="Todos": d2 = d2[d2["genero_cat"]==gen]
if edades: d2 = d2[d2["edad_cat"].isin(edades)]
if locs: d2 = d2[d2["localidad"].isin(locs)]
if renal!="Todos": d2 = d2[d2["biopsia_positiva"]==renal]
if bios and bios!=["Todos"]:
d2 = d2[d2["biopsia_patron"].isin(bios)]
if anca!="Todos": d2 = d2[d2["anca_cat"]==anca]
if mpo!="Todos": d2 = d2[d2["mpo_cat"]==mpo]
if pr3!="Todos": d2 = d2[d2["pr3_cat"]==pr3]
for ant in ants:
if ant=="Todos": continue
key = next(k for k,v in resumen_ante.items() if v==ant)
d2 = d2[d2[key]==1]
return d2
# ─── 6. Mapas ───────────────────────────────────────────
# ─── 6. Mapas ───────────────────────────────────────────
def choropleth(m, g, val, title, cmap, zfield, zalias):
g = g.copy()
g[val] = pd.to_numeric(g[val], errors="coerce")
vmin, vmax = g[val].min(), g[val].max()
cm = cmap.scale(vmin, vmax)
cm.caption = title
cm.add_to(m)
is_line = g.geometry.iloc[0].geom_type.startswith("Line")
style = (
lambda f,vc=val: {"color":cm(f["properties"][vc]),"weight":4,"opacity":0.9}
) if is_line else (
lambda f,vc=val: {"fillColor":cm(f["properties"][vc]),"fillOpacity":0.8,
"color":"black","weight":0.3}
)
fields = [zfield, val]
aliases = [zalias, title]
for extra in ("nombre","rio"):
if extra in g.columns:
fields.append(extra); aliases.append("Río"); break
folium.GeoJson(
g, name=title,
style_function=style,
highlight_function=lambda f: {"weight":2,"color":"#444","fillOpacity":0.95},
tooltip=folium.GeoJsonTooltip(fields=fields, aliases=aliases, sticky=True)
).add_to(m)
def capa_clusters(m, d):
"""
Añade al mapa m una capa de clústeres de pacientes (DBSCAN 1 km),
con popups que muestran género, edad (si existe), patrón biopsia y antecedentes.
"""
if d.empty:
return
coords = np.radians(d[["latitud", "longitud"]].astype(float))
if len(coords) < 3:
return
labels = DBSCAN(eps=1/6371, min_samples=3, metric="haversine").fit_predict(coords)
d = d.copy()
d["cluster"] = labels
pal = branca.colormap.linear.Set1_09
fg = folium.FeatureGroup(name="Clústeres (1 km)", overlay=True)
for cl in sorted([c for c in d["cluster"].unique() if c != -1]):
color = pal(cl / max(1, d["cluster"].nunique() - 1))
for _, r in d[d["cluster"] == cl].iterrows():
# Edad segura
if pd.notna(r["edad"]) and not math.isnan(r["edad"]):
edad_txt = f"{int(r['edad'])} años"
else:
edad_txt = "Sin dato edad"
# Antecedentes resumidos
ant = [v for k, v in resumen_ante.items() if r.get(k) == 1]
ants_txt = "; ".join(ant) if ant else "Ninguno"
popup = (
f"Clúster #{cl}<br>"
f"Género: {r['genero_cat']}<br>"
f"Edad: {edad_txt}<br>"
f"Biopsia: {r['biopsia_patron']}<br>"
f"Antecedentes: {ants_txt}"
)
folium.CircleMarker(
location=(r["latitud"], r["longitud"]),
radius=6,
color=color,
fill=True, fill_color=color, fill_opacity=0.9,
weight=1,
popup=popup
).add_to(fg)
fg.add_to(m)
def crear_mapa(d_filt, capas, ver_cluster):
"""
Construye el mapa completo:
- coroplético de pacientes por localidad
- capas ambientales
- heatmap de puntos
- marcadores individuales con popups seguros
- clústeres si ver_cluster=True
"""
# 1) Coroplético por localidad
g = d_filt.groupby("localidad").size().reset_index(name="pacientes")
geo = geo_loc.merge(g, on="localidad", how="left").fillna({"pacientes": 0})
m = folium.Map(location=[4.65, -74.1], zoom_start=11, tiles="CartoDB positron")
choropleth(
m, geo, "pacientes", "Pacientes por localidad (N)",
branca.colormap.linear.Reds_09, "localidad", "Localidad"
)
# 2) Capas ambientales
for capa in capas:
# 1) Saltar el heatmap aquí
if capa == "Heatmap pacientes":
continue
# 2) WQI: paso discreto + leyenda
if capa == "WQI":
# Añadir la leyenda de WQI (continua o en pasos, como prefieras)
WQI_COLORMAP.add_to(m)
folium.GeoJson(
caps_amb["WQI"],
name="WQI (valor y categoría)",
style_function=lambda f: {
"color": WQI_COLORMAP(f["properties"]["wqi_val"]),
"fillColor": WQI_COLORMAP(f["properties"]["wqi_val"]),
"weight": 3,
"fillOpacity": 0.7
},
tooltip=folium.GeoJsonTooltip(
fields=["nombre", # nombre del río
"tramo", # identificador de tramo
"wqi_val"], # valor numérico de WQI
aliases=["Río", # alias para nombre
"Tramo", # alias para tramo
"WQI (valor)"], # alias para wqi_val
sticky=True
)
).add_to(m)
continue # no volver a procesar esta capa
# 3) Resto de capas: color continuo con tu choropleth genérico
gdf = caps_amb.get(capa)
val, uni, cmap, zfield, zalias = META_CAPAS[capa]
if gdf is not None and val in gdf.columns:
choropleth(
m,
gdf,
val,
f"{capa}{' ('+uni+')' if uni else ''}",
cmap,
zfield,
zalias
)
# 3) Heatmap de puntos
if "Heatmap pacientes" in capas and not d_filt.empty:
HeatMap(
d_filt[["latitud", "longitud"]].astype(float).values,
radius=18, name="Heatmap pacientes"
).add_to(m)
# 4) Marcadores individuales
fg_pts = folium.FeatureGroup(name="Puntos pacientes", overlay=True)
for _, r in d_filt.iterrows():
# Edad segura
if pd.notna(r["edad"]) and not math.isnan(r["edad"]):
edad_txt = f"{int(r['edad'])} años"
else:
edad_txt = "Sin dato edad"
# Antecedentes resumidos
ant = [v for k, v in resumen_ante.items() if r.get(k) == 1]
ants_txt = "<br>".join(ant) if ant else "Ninguno"
popup_html = (
f"Localidad: {r['localidad']}<br>"
f"Edad: {edad_txt}<br>"
f"Género: {r['genero_cat']}<br>"
f"Biopsia: {r['biopsia_patron']}<br>"
f"Antecedentes:<br>{ants_txt}"
)
folium.CircleMarker(
location=(r["latitud"], r["longitud"]),
radius=5,
color="#c00",
fill=True, fill_color="white",
fill_opacity=0.85, weight=1,
popup=popup_html
).add_to(fg_pts)
fg_pts.add_to(m)
# 5) Capa de clústeres opcional
if ver_cluster:
capa_clusters(m, d_filt)
folium.LayerControl(collapsed=False).add_to(m)
return m._repr_html_()
# ─── 7. Gráficos ─────────────────────────────────────────
def col_of(v):
"""Mapea nombre legible a columna interna."""
if v in resumen_ante.values():
return next(k for k,val in resumen_ante.items() if val==v)
if v in raw2short.values() or v=="Patrón biopsia":
return "biopsia_patron"
return v
def g_uni(var, d):
if d.empty:
return go.Figure()
col = col_of(var)
# 1) Flags de antecedentes (0/1) → barras de conteo "No"/"Si"
if var in resumen_ante.values():
s = d[col].map({0:"No",1:"Si"})
fig = px.histogram(s, x=s,
category_orders={col:["No","Si"]},
text_auto=True,
title=var)
# 2) Patrón biopsia → barras de conteo de cada categoría
elif var=="Patrón biopsia" or var in raw2short.values():
fig = px.histogram(d, x="biopsia_patron",
category_orders={"biopsia_patron": list(raw2short.values())},
text_auto=True,
title="Patrón biopsia")
# 3) Variables numéricas → histograma
elif d[col].dtype.kind in "if":
fig = px.histogram(d, x=col, nbins=20, title=var)
# 4) Resto categóricas → barras de conteo con color
else:
fig = px.histogram(d, x=col, color=col, text_auto=True, title=var)
fig.update_layout(bargap=0.1)
return fig
def g_bi(x, y, d):
"""
Gráfico bivariado:
- num vs num → scatter con trendline
- num vs cat → boxplot
- cat vs cat → barras agrupadas
Reconoce correctamente:
• Patrones de biopsia (incluida la etiqueta "Patrón biopsia")
• Etiquetas de antecedentes.
"""
if d.empty:
return go.Figure()
# Mapeo de la variable de UI al nombre real de columna en df
def map_var(v):
# Dropdown de patrón de biopsia (UI) → columna biop_patron
if v == "Patrón biopsia":
return "biopsia_patron"
# Cualquier etiqueta corta de biopsia
if v in resumen_bio_map.values():
return "biopsia_patron"
# Etiqueta de antecedente → nombre de flag en df
for key, lab in resumen_ante.items():
if v == lab:
return key
# Variables numéricas o de texto sin transformar
return v
cx = map_var(x)
cy = map_var(y)
# Determinar si cada una es categórica (flags, biopsia o texto)
is_cat = {}
for var in (cx, cy):
is_cat[var] = (
var == "biopsia_patron"
or var in resumen_ante.keys()
or d[var].dtype == object
)
# 1) cat vs cat → histograma agrupado
if is_cat[cx] and is_cat[cy]:
fig = px.histogram(
d,
x=cx,
color=cy,
barmode="group",
category_orders={
cx: list(resumen_bio_map.values()) if cx=="biopsia_patron" else list(resumen_ante.values()),
cy: list(resumen_bio_map.values()) if cy=="biopsia_patron" else list(resumen_ante.values()),
},
labels={cx: x, cy: y},
title=f"{x} vs {y}"
)
# 2) num vs cat → boxplot
elif is_cat[cx] ^ is_cat[cy]:
# uno es categórico, otro numérico
if is_cat[cx]:
fig = px.box(
d,
x=cx,
y=cy,
points="all",
category_orders={cx: list(resumen_bio_map.values()) if cx=="biopsia_patron" else list(resumen_ante.values())},
labels={cx: x, cy: y},
title=f"{x} vs {y}"
)
else:
fig = px.box(
d,
x=cy,
y=cx,
points="all",
category_orders={cy: list(resumen_bio_map.values()) if cy=="biopsia_patron" else list(resumen_ante.values())},
labels={cx: x, cy: y},
title=f"{x} vs {y}"
)
# 3) num vs num → scatter + trendline
else:
fig = px.scatter(
d,
x=cx,
y=cy,
trendline="ols",
labels={cx: x, cy: y},
title=f"{x} vs {y}"
)
fig.update_layout(bargap=0.1)
return fig
# ─── 8. Interfaz Gradio ───────────────────────────────────
def interfaz():
gen = ["Todos"] + sorted(df["genero_cat"].dropna().unique())
ages = sorted(df["edad_cat"].dropna().unique())
locs = sorted(df["localidad"].dropna().unique())
ancas = ["Todos"] + sorted(df["anca_cat"].dropna().unique())
mpos = ["Todos"] + sorted(df["mpo_cat"].dropna().unique())
pr3s = ["Todos"] + sorted(df["pr3_cat"].dropna().unique())
vars_cat = [
"genero_cat","estrato_cat","edad_cat","sindrome_renal",
"manifestaciones_extrarenales","proteinuria",
"anca_cat","mpo_cat","pr3_cat"
] + ["Patrón biopsia"] + list(resumen_ante.values())
vars_num = ["edad","creatinina"]
vars_all = vars_cat + vars_num
with gr.Blocks(title="Vasculitis ANCA Bogotá") as demo:
gr.Markdown("## Explorador geoespacial – Vasculitis ANCA (Bogotá)")
with gr.Row():
ui_gen = gr.Dropdown(gen, label="Género", value="Todos")
ui_age = gr.CheckboxGroup(ages, label="Edad (quinquenios)")
ui_loc = gr.Dropdown(locs, multiselect=True, label="Localidades")
ui_renal = gr.Dropdown(["Todos","Si","No"], value="Todos", label="Compromiso renal")
ui_ant = gr.CheckboxGroup(["Todos"]+list(resumen_ante.values()), label="Antecedentes")
ui_bio = gr.CheckboxGroup(["Todos"]+list(raw2short.values()), label="Patrón biopsia")
with gr.Row():
ui_anca = gr.Dropdown(ancas, label="ANCA", value="Todos")
ui_mpo = gr.Dropdown(mpos, label="MPO", value="Todos")
ui_pr3 = gr.Dropdown(pr3s, label="PR3", value="Todos")
ui_capas = gr.CheckboxGroup(list(GEO_AMBIENTALES.keys()), label="Capas mapa")
ui_clu = gr.Checkbox(label="Mostrar clústeres (1 km)")
with gr.Tab("Mapa"):
btn_map = gr.Button("Generar mapa")
out_map = gr.HTML()
btn_map.click(
lambda *i: crear_mapa(filtrar(df,*i[:-2]), i[-2], i[-1]),
inputs=[ui_gen,ui_age,ui_loc,ui_renal,
ui_ant,ui_bio,ui_anca,ui_mpo,ui_pr3,
ui_capas,ui_clu],
outputs=out_map
)
with gr.Tab("Univariado"):
ui_var = gr.Dropdown(vars_all, label="Variable")
btn_uni = gr.Button("Graficar")
out_uni = gr.Plot()
btn_uni.click(
lambda v,*i: g_uni(v, filtrar(df,*i)),
inputs=[ui_var,ui_gen,ui_age,ui_loc,ui_renal,
ui_ant,ui_bio,ui_anca,ui_mpo,ui_pr3],
outputs=out_uni
)
with gr.Tab("Bivariado"):
ui_x = gr.Dropdown(vars_all, label="Variable X")
ui_y = gr.Dropdown(vars_all, label="Variable Y")
btn_bi = gr.Button("Graficar")
out_bi = gr.Plot()
btn_bi.click(
lambda x,y,*i: g_bi(x,y, filtrar(df,*i)),
inputs=[ui_x,ui_y,ui_gen,ui_age,ui_loc,ui_renal,
ui_ant,ui_bio,ui_anca,ui_mpo,ui_pr3],
outputs=out_bi
)
demo.launch()
if __name__ == "__main__":
interfaz()