|
|
import os, shutil, zipfile, pickle |
|
|
from typing import List, Tuple |
|
|
import pandas as pd |
|
|
import gradio as gr |
|
|
|
|
|
from datasets import load_dataset |
|
|
from huggingface_hub import hf_hub_download, snapshot_download |
|
|
from autogluon.tabular import TabularPredictor |
|
|
|
|
|
# Hugging Face Hub repository that hosts the trained AutoGluon predictor.
MODEL_REPO_ID = "rlogh/cheese-texture-autogluon-classifier"

# Hugging Face dataset used to derive slider ranges and category choices.
DATASET_ID = "aslan-ng/cheese-tabular"
|
|
|
|
|
import os, json |
|
|
import urllib.request |
|
|
|
|
|
def _env_flag(name: str) -> bool:
    """Return True when environment variable ``name`` is "1", "true" or "yes" (any case)."""
    return os.getenv(name, "0").lower() in {"1", "true", "yes"}


# Opt-in switches for the optional text-explanation engines.
USE_HF_LOCAL = _env_flag("USE_HF")        # run a local transformers pipeline
USE_HF_API = _env_flag("USE_HF_API")      # call the hosted Inference API instead

# Model id used by both explanation engines, and the API token (may be None).
HF_MODEL = os.getenv("HF_MODEL", "google/flan-t5-small")
HF_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN")
|
|
|
|
|
def _hf_inference_api(prompt: str) -> str:
    """Call the HF Inference API for text2text generation; avoids local downloads.

    Posts ``{"inputs": prompt}`` as JSON to the endpoint of ``HF_MODEL`` and
    returns the stripped ``generated_text`` of the response. The response may
    be a list of result dicts or a single dict; any other shape is returned
    as ``str(data)`` so the caller still gets something printable.

    Args:
        prompt: Text sent as the model input.

    Returns:
        The generated text, or the stringified raw response.

    Raises:
        Whatever ``urllib.request.urlopen`` or ``json.loads`` raise (network
        failures, HTTP errors, invalid JSON); nothing is caught here.
    """
    url = f"https://api-inference.huggingface.co/models/{HF_MODEL}"
    headers = {"Content-Type": "application/json"}
    if HF_TOKEN:
        # Only authenticate when a token exists; otherwise the header would
        # literally read "Bearer None".
        headers["Authorization"] = f"Bearer {HF_TOKEN}"
    req = urllib.request.Request(
        url,
        data=json.dumps({"inputs": prompt}).encode("utf-8"),
        headers=headers,
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=60) as resp:
        data = json.loads(resp.read().decode("utf-8"))

    # Unwrap the first list element, then require a dict: a plain `in` test on
    # a string element would be a substring match and indexing it would fail.
    candidate = data[0] if isinstance(data, list) and data else data
    if isinstance(candidate, dict) and "generated_text" in candidate:
        return str(candidate["generated_text"]).strip()

    return str(data).strip()
|
|
|
|
|
def explain(structured: dict, engine: str = "auto") -> str:
    """Produce a textual explanation for ``structured``.

    Engines are tried in this order: the hosted Inference API (when
    ``USE_HF_API`` and ``HF_TOKEN`` are set), a local transformers pipeline
    (when ``USE_HF_LOCAL`` is set), and finally the plain template. If an
    enabled engine raises, the template text is returned with a note that
    carries the error message.

    ``engine`` is accepted but never read by this function.
    NOTE(review): ``build_prompt`` and ``explain_template`` are not defined in
    the visible source; confirm they exist before calling this.
    """
    prompt = build_prompt(structured)

    def _template_with_note(err: Exception) -> str:
        # Same fallback text for both engines.
        return explain_template(structured) + f"\n\n_Explanation engine fell back to template: {err}_"

    if USE_HF_API and HF_TOKEN:
        try:
            return _hf_inference_api(prompt)
        except Exception as err:
            return _template_with_note(err)

    if USE_HF_LOCAL:
        try:
            # Imported lazily so transformers is only needed when enabled.
            from transformers import pipeline

            generator = pipeline("text2text-generation", model=HF_MODEL)
            generated = generator(prompt, max_new_tokens=180, do_sample=False)
            return generated[0]["generated_text"].strip()
        except Exception as err:
            return _template_with_note(err)

    return explain_template(structured)
|
|
|
|
|
def _safe_concat_splits(ds):
    """Concatenate every convertible split of ``ds`` into a single DataFrame.

    Args:
        ds: Mapping-like object with ``keys()`` whose values expose
            ``to_pandas()``.

    Returns:
        One DataFrame with the rows of all splits and a fresh integer index.

    Raises:
        ValueError: If no split could be converted.
    """
    frames = []
    for split in ds.keys():
        try:
            frames.append(ds[split].to_pandas())
        except Exception as e:
            # Best-effort: one bad split must not block the others, but the
            # failure is reported instead of being silently dropped.
            print(f"[loader] Skipping split '{split}': {e}")
    if not frames:
        raise ValueError("Could not load any splits from the dataset.")
    return pd.concat(frames, ignore_index=True)
|
|
|
|
|
def load_cheese_dataset(dataset_id: str) -> pd.DataFrame:
    """Load ``dataset_id`` and return all of its splits as one DataFrame.

    The "default" configuration is requested first. If that attempt fails for
    any reason, the dataset is loaded again without a configuration name, and
    errors from this second attempt propagate to the caller.
    """
    try:
        return _safe_concat_splits(load_dataset(dataset_id, "default"))
    except Exception:
        # Deliberate: fall through to the retry without a config name.
        pass
    return _safe_concat_splits(load_dataset(dataset_id))
|
|
|
|
|
def _find_dir_with_any_predictor_marker(start_dir: str) -> str:
    '''Return the first directory under ``start_dir`` holding a predictor marker.

    A marker is a file named 'learner.pkl' or 'predictor.pkl'. Directories are
    visited in ``os.walk`` order and the first match wins, whichever of the
    two files it contains. Returns "" when nothing matches.
    '''
    markers = {"learner.pkl", "predictor.pkl"}
    for current, _subdirs, filenames in os.walk(start_dir):
        if markers.intersection(filenames):
            return current
    return ""
|
|
|
|
|
def _symlink_or_copytree(src: str, dst: str):
    """Make ``dst`` mirror directory ``src``, via symlink if possible, else a copy.

    Nothing happens when ``dst`` already exists. A dangling symlink at ``dst``
    is removed and recreated: ``os.path.exists`` reports it as missing, yet it
    would make both ``os.symlink`` and ``shutil.copytree`` fail.

    Args:
        src: Existing directory to expose.
        dst: Path where the link or copy should appear.
    """
    if os.path.islink(dst) and not os.path.exists(dst):
        # Stale link whose target is gone; replace it.
        os.unlink(dst)
    elif os.path.lexists(dst):
        return
    try:
        os.symlink(src, dst)
    except (OSError, NotImplementedError):
        # Symlinks may be unavailable (platform or permissions); copy instead.
        shutil.copytree(src, dst)
|
|
|
|
|
def _materialize_flat_model_layout(predictor_dir: str, extract_root: str):
    '''Expose each model sub-directory under the alternative locations.

    For every directory ``predictor_dir/models/<name>`` a link (or copy) is
    created at:
      - extract_root/<name>    (flat layout some predictors still reference)
      - predictor_dir/<name>   (defensive)

    Does nothing when ``predictor_dir/models`` is not a directory.
    '''
    models_root = os.path.join(predictor_dir, "models")
    if os.path.isdir(models_root):
        for entry in os.listdir(models_root):
            model_path = os.path.join(models_root, entry)
            if os.path.isdir(model_path):
                for parent in (extract_root, predictor_dir):
                    _symlink_or_copytree(model_path, os.path.join(parent, entry))
|
|
|
|
|
def _extract_predictor_from_zips(repo_id: str, extract_root: str) -> str:
    """Unpack known predictor archives into ``extract_root``; return the predictor dir or ""."""
    zip_candidates = ["cheese_texture_predictor_dir.zip", "predictor_dir.zip", "agModels-predictor.zip"]
    for fname in zip_candidates:
        try:
            zpath = hf_hub_download(repo_id=repo_id, filename=fname)
            with zipfile.ZipFile(zpath, "r") as zf:
                zf.extractall(extract_root)
            predictor_dir = _find_dir_with_any_predictor_marker(extract_root)
            if predictor_dir:
                return predictor_dir
        except Exception as e:
            # A missing or unreadable archive just means "try the next one".
            print(f"[loader] Zip candidate '{fname}' not usable: {e}")
    return ""


def _load_pickled_predictor(repo_id: str):
    """Return ``(predictor, placeholder_dir)`` from the first usable pickle, else ``None``."""
    for fname in ("cheese_texture_predictor.pkl", "predictor.pkl"):
        try:
            pkl_path = hf_hub_download(repo_id=repo_id, filename=fname)
            with open(pkl_path, "rb") as f:
                # SECURITY NOTE: unpickling executes arbitrary code from the
                # downloaded file. Only point this loader at trusted repos.
                obj = pickle.load(f)
            if isinstance(obj, TabularPredictor) or hasattr(obj, "predict"):
                # No predictor directory exists on this path; create an empty
                # one so callers still receive a valid path.
                predictor_dir = os.path.join(os.getcwd(), "ag_predictor_from_pkl")
                os.makedirs(predictor_dir, exist_ok=True)
                return obj, predictor_dir
        except Exception as e:
            print(f"[loader] PKL candidate '{fname}' not usable: {e}")
    return None


def load_predictor_from_hub(repo_id: str) -> Tuple[TabularPredictor, str, str]:
    """Download and load the AutoGluon predictor stored in ``repo_id``.

    Sources are tried in order:
      1. Known zip archives, unpacked into a fresh ``./ag_predictor_unpack``.
      2. A full repo snapshot searched for a predictor directory.
      3. A pickled predictor object.

    Args:
        repo_id: Hugging Face Hub repository id.

    Returns:
        ``(predictor, predictor_dir, extract_root)``. On the pickle path
        ``predictor_dir`` is an empty placeholder directory.

    Raises:
        FileNotFoundError: If none of the sources yields a predictor.
    """
    # Start from a clean unpack directory on every call.
    extract_root = os.path.join(os.getcwd(), "ag_predictor_unpack")
    if os.path.exists(extract_root):
        shutil.rmtree(extract_root)
    os.makedirs(extract_root, exist_ok=True)

    predictor_dir = _extract_predictor_from_zips(repo_id, extract_root)

    if not predictor_dir:
        repo_path = snapshot_download(repo_id=repo_id)
        predictor_dir = _find_dir_with_any_predictor_marker(repo_path)

    if not predictor_dir:
        pickled = _load_pickled_predictor(repo_id)
        if pickled is None:
            raise FileNotFoundError("Could not locate an AutoGluon predictor directory.")
        predictor, predictor_dir = pickled
        return predictor, predictor_dir, extract_root

    # Version and package checks are relaxed so a predictor trained in a
    # different environment can still be loaded.
    predictor = TabularPredictor.load(
        predictor_dir,
        require_version_match=False,
        require_py_version_match=False,
        check_packages=False,
    )

    _materialize_flat_model_layout(predictor_dir, extract_root)

    return predictor, predictor_dir, extract_root
|
|
|
|
|
|
|
|
# Load the dataset once at import time; the UI ranges and choices come from it.
df_all = load_cheese_dataset(DATASET_ID)
expected_cols = {"fat", "origin", "holed", "price", "protein", "texture"}
missing = expected_cols.difference(df_all.columns)
if missing:
    raise ValueError(f"Dataset missing expected columns: {missing}")

# Observed numeric ranges, used to bound sliders and clamp user input.
FAT_MIN, FAT_MAX = float(df_all["fat"].min()), float(df_all["fat"].max())
PRICE_MIN, PRICE_MAX = float(df_all["price"].min()), float(df_all["price"].max())
PROTEIN_MIN, PROTEIN_MAX = float(df_all["protein"].min()), float(df_all["protein"].max())

# Distinct string origins, sorted for a stable dropdown order.
ORIGINS: List[str] = sorted(o for o in df_all["origin"].dropna().unique().tolist() if isinstance(o, str))

PREDICTOR, PREDICTOR_DIR, EXTRACT_ROOT = load_predictor_from_hub(MODEL_REPO_ID)

# Prefer the predictor's own labels. The attribute can exist but be None, so
# guard before list() and fall back to the labels seen in the dataset.
_predictor_labels = getattr(PREDICTOR, "class_labels", None)
CLASSES = list(_predictor_labels) if _predictor_labels is not None else []
if not CLASSES:
    CLASSES = sorted(df_all["texture"].dropna().unique().tolist())
|
|
|
|
|
|
|
|
def _discover_model_names(predictor) -> List[str]:
    """Return the predictor's model names, trying each known way to get them.

    ``model_names`` is tried first, then the older ``get_model_names``. If an
    accessor is missing, raises, or returns nothing, the next one is tried.
    The last resort is the "model" column of the leaderboard.
    """
    for accessor in ("model_names", "get_model_names"):
        getter = getattr(predictor, accessor, None)
        if not callable(getter):
            continue
        try:
            names = list(getter())
        except Exception as e:
            # A present-but-failing accessor must not block the fallbacks.
            print(f"Warning: predictor.{accessor}() failed: {e}")
            continue
        if names:
            return names

    lb = predictor.leaderboard(silent=True)
    if hasattr(lb, "columns") and "model" in lb.columns:
        return lb["model"].tolist()
    return []


# "best" lets AutoGluon choose; discovered model names are appended after it.
MODEL_NAMES = ["best"]
try:
    MODEL_NAMES += [name for name in _discover_model_names(PREDICTOR) if name != "best"]
except Exception as e:
    print("Warning: couldn't fetch base model names; using ['best'] only. Details:", e)
|
|
|
|
|
|
|
|
# CSS for the summary table (elem_id "summary_table"): let columns size
# themselves and let long cell text wrap instead of being clipped.
SUMMARY_CSS = (
    "#summary_table table { table-layout: auto !important; width: 100% !important; }\n"
    "#summary_table table th,\n"
    "#summary_table table td {\n"
    "  white-space: normal !important;\n"
    "  overflow: visible !important;\n"
    "  text-overflow: clip !important;\n"
    "  max-width: none !important;\n"
    "}\n"
)
|
|
|
|
|
def _make_df(**kwargs):
    """Build a ``gr.Dataframe``, retrying without ``wrap``/``column_widths``.

    If the first construction raises ``TypeError``, it is retried once
    without those two keyword arguments.
    """
    try:
        component = gr.Dataframe(**kwargs)
    except TypeError:
        reduced = {key: value for key, value in kwargs.items() if key not in ("wrap", "column_widths")}
        component = gr.Dataframe(**reduced)
    return component
|
|
|
|
|
def _clamp_to_range(val, lo, hi, name, warn):
    """Convert ``val`` to float and force it into ``[lo, hi]``.

    Non-numeric input and NaN both yield the midpoint of the range. NaN needs
    an explicit check because it compares false against both bounds and would
    otherwise pass through unclamped. ``warn`` is called with a message
    whenever the value had to be changed.
    """
    try:
        v = float(val)
    except (TypeError, ValueError, OverflowError):
        v = float("nan")
    if v != v:  # NaN is the only float that is not equal to itself
        warn(f"{name} must be numeric; falling back to dataset default.")
        return float((lo + hi) / 2.0)
    if v < lo:
        warn(f"{name} below dataset min ({lo:.2f}); clamping to min.")
        return lo
    if v > hi:
        warn(f"{name} above dataset max ({hi:.2f}); clamping to max.")
        return hi
    return v


def _coerce_and_validate(fat, origin, holed, price, protein, top_k):
    """Sanitise raw UI values before they are fed to the predictor.

    Numeric inputs are clamped to the dataset ranges. An unknown origin is
    replaced by the first known origin when one exists. ``top_k`` becomes an
    int within ``[1, len(CLASSES)]``, with 3 used when it cannot be converted.
    Adjustments are surfaced through ``gr.Warning`` / ``gr.Info``.

    Returns:
        ``(fat, origin, holed, price, protein, k)`` with ``holed`` as bool
        and ``k`` as int.
    """
    fat_s = _clamp_to_range(fat, FAT_MIN, FAT_MAX, "fat (g/100g)", gr.Warning)
    price_s = _clamp_to_range(price, PRICE_MIN, PRICE_MAX, "price (unit)", gr.Warning)
    protein_s = _clamp_to_range(protein, PROTEIN_MIN, PROTEIN_MAX, "protein (g/100g)", gr.Warning)

    origin_s = origin if origin in ORIGINS else (ORIGINS[0] if ORIGINS else origin)
    if origin_s != origin:
        gr.Warning("origin not in dataset categories; resetting to a valid choice.")

    try:
        k = int(top_k)
    except (TypeError, ValueError, OverflowError):
        gr.Warning("Top‑k must be an integer; using 3.")
        k = 3
    k = max(1, min(k, len(CLASSES)))
    if k != top_k:
        gr.Info(f"Top‑k adjusted to {k}.")
    return fat_s, origin_s, bool(holed), price_s, protein_s, k
|
|
|
|
|
def _predict_with_fallback(X, base_model):
    '''Predict with the requested model, falling back to non-NN base models.

    The first attempt uses ``base_model``; None, "" and "best" mean the
    predictor's default. Further attempts go through the entries of
    ``MODEL_NAMES`` whose lower-cased name neither starts with "nn" nor
    contains "neuralnet" or "weightedensemble".

    Returns:
        ``(label, proba_df)`` from the first attempt that succeeds.

    Raises:
        RuntimeError: If every attempt fails; the message joins all errors.
    '''
    primary = None if base_model in (None, "", "best") else base_model

    def _is_fallback_candidate(name) -> bool:
        if name in (None, "", "best"):
            return False
        lowered = name.lower()
        return not (lowered.startswith("nn") or "neuralnet" in lowered or "weightedensemble" in lowered)

    attempts = [primary]
    attempts.extend([name for name in MODEL_NAMES if _is_fallback_candidate(name) and name not in (primary,)])

    failures = []
    for candidate in attempts:
        # The default model is selected by omitting the ``model`` argument.
        model_kwargs = {} if candidate is None else {"model": candidate}
        try:
            label_pred = PREDICTOR.predict(X, **model_kwargs).iloc[0]
            proba_df = PREDICTOR.predict_proba(X, **model_kwargs)
            if candidate is not None:
                gr.Info(f"Using base model: {candidate}")
            return label_pred, proba_df
        except Exception as err:
            failures.append(str(err))
            gr.Warning(f"Model '{candidate or 'best'}' failed; trying a fallback...")
    raise RuntimeError("All model attempts failed: " + " | ".join(failures))
|
|
|
|
|
def do_predict(fat, origin, holed, price, protein, base_model, output_mode, top_k):
    """Run one prediction and shape the results for the three output widgets.

    Returns:
        ``(label_text, probability_table, summary_table)``. In "Label only"
        mode the probability table is empty; in "Probabilities only" mode the
        label text is "". If anything raises, a warning is shown and an empty
        label plus two empty tables are returned.
    """
    proba_columns = ["texture", "probability (%)"]
    summary_columns = [
        "fat (g/100g)", "price (unit)", "protein (g/100g)", "origin",
        "holed (Swiss-style holes)", "Base model to use", "Output", "Top‑k",
        "Predicted texture", "Top‑k probabilities",
    ]
    try:
        fat_s, origin_s, holed_s, price_s, protein_s, k = _coerce_and_validate(
            fat, origin, holed, price, protein, top_k
        )
        features = pd.DataFrame([{
            "fat": float(fat_s),
            "origin": origin_s,
            "holed": 1 if holed_s else 0,
            "price": float(price_s),
            "protein": float(protein_s),
        }])

        label_pred, proba_df = _predict_with_fallback(features, base_model)

        # Highest-probability classes first, limited to the requested top-k.
        ranked = proba_df.iloc[0].sort_values(ascending=False).head(k)
        proba_table_df = pd.DataFrame({
            "texture": ranked.index,
            "probability (%)": (ranked.values * 100).round(2),
        }).reset_index(drop=True)

        topk_text = ", ".join(f"{cls}: {(prob*100):.2f}%" for cls, prob in ranked.items())
        summary_values = [
            float(fat_s),
            float(price_s),
            float(protein_s),
            origin_s,
            bool(holed_s),
            (base_model or "best"),
            output_mode,
            int(k),
            str(label_pred),
            topk_text,
        ]
        summary_df = pd.DataFrame([dict(zip(summary_columns, summary_values))])

        label_text = str(label_pred)
        if output_mode == "Label only":
            return label_text, pd.DataFrame(columns=proba_columns), summary_df
        if output_mode == "Probabilities only":
            return "", proba_table_df, summary_df
        return label_text, proba_table_df, summary_df
    except Exception as e:
        gr.Warning(f"Something went wrong while predicting: {e}")
        return "", pd.DataFrame(columns=proba_columns), pd.DataFrame(columns=summary_columns)
|
|
|
|
|
def toggle_visibility(output_mode):
    """Return three visibility updates for the chosen output mode.

    The first update is hidden only for "Probabilities only"; the second and
    third are hidden only for "Label only".
    """
    label_visible = output_mode != "Probabilities only"
    probs_visible = output_mode != "Label only"
    return tuple(gr.update(visible=flag) for flag in (label_visible, probs_visible, probs_visible))
|
|
|
|
|
def _bounded_default(preferred, lo, hi):
    """Clamp ``preferred`` into ``[lo, hi]`` so a slider never starts outside its own range."""
    return float(min(max(preferred, lo), hi))


# NOTE(review): the source's indentation was lost, so the nesting below is
# inferred. In particular the summary section is placed inside the right-hand
# column; confirm against the intended layout.
with gr.Blocks(title="Cheese Texture Classifier", css=SUMMARY_CSS) as demo:
    gr.Markdown(
        "## Cheese Texture (Tabular) — AutoGluon model\n"
        "Predicts **texture** from nutritional/origin features.\n"
        "Model: [`rlogh/cheese-texture-autogluon-classifier`](https://huggingface.co/rlogh/cheese-texture-autogluon-classifier).\n"
        "Set inputs on the left; results on the right."
    )

    with gr.Row():
        # Left column: feature inputs and inference parameters.
        with gr.Column():
            # Slider minimums are floored at 0; each default is clamped into
            # the slider's range in case the dataset minimum exceeds it.
            fat_lo = float(max(0.0, FAT_MIN))
            fat_slider = gr.Slider(
                minimum=fat_lo,
                maximum=float(FAT_MAX),
                step=0.1,
                value=_bounded_default(30.0, fat_lo, FAT_MAX),
                label="fat (g/100g)",
                info="Fat content per 100g of cheese (from dataset range)",
            )
            price_lo = float(max(0.0, PRICE_MIN))
            price_slider = gr.Slider(
                minimum=price_lo,
                maximum=float(PRICE_MAX),
                step=0.01,
                value=_bounded_default(3.0, price_lo, PRICE_MAX),
                label="price (unit)",
                info="Price per unit (dataset units)",
            )
            protein_lo = float(max(0.0, PROTEIN_MIN))
            protein_slider = gr.Slider(
                minimum=protein_lo,
                maximum=float(PROTEIN_MAX),
                step=0.1,
                value=_bounded_default(22.0, protein_lo, PROTEIN_MAX),
                label="protein (g/100g)",
                info="Protein content per 100g of cheese (from dataset range)",
            )
            # Guard the empty case: ORIGINS[0] would raise IndexError.
            default_origin = "Italy" if "Italy" in ORIGINS else (ORIGINS[0] if ORIGINS else None)
            origin_dd = gr.Dropdown(
                choices=ORIGINS,
                value=default_origin,
                label="origin",
                info="Country or region of origin",
            )
            holed_cb = gr.Checkbox(value=False, label="holed (Swiss-style holes)", info="Typical Emmental-style holes?")

            gr.Markdown("### Inference parameters")
            base_model_dd = gr.Dropdown(
                choices=MODEL_NAMES,
                value=MODEL_NAMES[0] if MODEL_NAMES else "best",
                label="Base model to use",
                info="Pick the best AutoGluon-ensembled model or a specific base model (if available)",
            )
            output_mode_radio = gr.Radio(
                choices=["Label only", "Probabilities only", "Label + Probabilities"],
                value="Label + Probabilities",
                label="Output",
                info="Choose which outputs to display",
            )
            topk_slider = gr.Slider(
                minimum=1,
                maximum=max(1, len(CLASSES)),
                step=1,
                value=min(3, max(1, len(CLASSES))),
                label="Top‑k probabilities to show",
                info="How many top classes to show in the probability table",
            )

        # Right column: prediction outputs.
        with gr.Column():
            pred_label = gr.Textbox(label="Predicted texture", interactive=False, visible=True)
            proba_table = _make_df(
                headers=["texture", "probability (%)"],
                datatype=["str", "number"],
                row_count=(1, "dynamic"),
                type="pandas",
                interactive=False,
                label="Predicted probabilities (sorted)",
                visible=True,
                wrap=True,
                column_widths=[220, 160],
            )

            gr.Markdown("### Summary (inputs and results)")
            summary_table = _make_df(
                headers=[
                    "fat (g/100g)", "price (unit)", "protein (g/100g)", "origin",
                    "holed (Swiss-style holes)", "Base model to use", "Output", "Top‑k",
                    "Predicted texture", "Top‑k probabilities",
                ],
                datatype=[
                    "number", "number", "number", "str", "bool", "str", "str", "number", "str", "str",
                ],
                row_count=(1, "dynamic"),
                type="pandas",
                interactive=False,
                visible=True,
                elem_id="summary_table",
                wrap=True,
                column_widths=[140, 120, 160, 160, 200, 170, 130, 90, 170, 420],
            )

    # Order must match the parameter order of do_predict.
    inputs = [fat_slider, origin_dd, holed_cb, price_slider, protein_slider, base_model_dd, output_mode_radio, topk_slider]
    outputs = [pred_label, proba_table, summary_table]

    # Sliders trigger on release; the other controls trigger on change.
    for s in [fat_slider, price_slider, protein_slider, topk_slider]:
        s.release(fn=do_predict, inputs=inputs, outputs=outputs)
    for c in [origin_dd, holed_cb, base_model_dd, output_mode_radio]:
        c.change(fn=do_predict, inputs=inputs, outputs=outputs)

    # Show or hide the label, probability table and top-k slider per output mode.
    output_mode_radio.change(fn=toggle_visibility, inputs=[output_mode_radio], outputs=[pred_label, proba_table, topk_slider], queue=False)

    # Run one prediction with the default inputs when the page loads.
    demo.load(fn=do_predict, inputs=inputs, outputs=outputs)

    gr.Examples(
        label="Examples",
        examples=[
            [27.8, "Switzerland", True, 2.20, 26.9, "best", "Label + Probabilities", 3],
            [4.3, "USA", False, 1.31, 11.1, "best", "Label + Probabilities", 3],
            [29.0, "Italy", False, 5.73, 28.4, "best", "Label + Probabilities", 3],
            [27.7, "France", False, 4.63, 20.8, "best", "Label + Probabilities", 3],
        ],
        inputs=inputs,
    )

# Start the app; this runs whenever the module is executed or imported.
demo.launch()
|
|
|