"""Flask front-end that labels user-supplied text with a selectable pipeline.

The page offers the pipelines listed in ``PIPELINES``. On submit, the chosen
pipeline file is downloaded from the Hugging Face Hub, loaded with joblib,
applied to the tokenised input, and the token -> label mapping is rendered
and appended to a usage log.
"""

# NOTE: the relative order of these imports is kept as in the original file.
# ``from customFunctions import *`` can rebind any name imported before it and
# be overridden by any name imported after it, so regrouping could change
# which object a name refers to.
from flask import Flask, abort, render_template, request, redirect, url_for
from joblib import load
import pandas as pd
import re
# Presumably provides the custom classes/functions referenced by the pickled
# pipelines -- confirm before narrowing or removing this star import.
from customFunctions import *  # noqa: F401,F403
import json
import datetime
import numpy as np
from huggingface_hub import hf_hub_download
import torch
import os
import functools
import logging

pd.set_option('display.max_colwidth', 1000)

# Patch torch.load to always load on CPU.
original_torch_load = torch.load


@functools.wraps(original_torch_load)
def cpu_load(*args, **kwargs):
    """Call the original ``torch.load`` with ``map_location`` forced to CPU.

    Any ``map_location`` supplied by the caller (as a keyword, or positionally
    as the second argument of ``torch.load``) is overridden rather than
    colliding with the forced value, which previously raised ``TypeError``.
    """
    cpu = torch.device('cpu')
    if len(args) >= 2:
        # map_location was given positionally: replace it in place.
        args = (args[0], cpu) + tuple(args[2:])
    else:
        kwargs['map_location'] = cpu
    return original_torch_load(*args, **kwargs)


torch.load = cpu_load


def load_pipeline_from_hub(filename):
    """Download ``filename`` from the coursework Hub repo and load it with joblib.

    Args:
        filename: Name of the ``.joblib`` file inside the repository.

    Returns:
        Whatever object ``joblib.load`` deserialises from the downloaded file.
    """
    cache_dir = "/tmp/hf_cache"
    os.environ["HF_HUB_CACHE"] = cache_dir  # optional but informative
    repo_id = 'hw01558/nlp-coursework-pipelines'
    local_path = hf_hub_download(repo_id=repo_id, filename=filename, cache_dir=cache_dir)
    # SECURITY NOTE: joblib.load unpickles the file, which can execute
    # arbitrary code. Only point repo_id at a repository you trust.
    return load(local_path)


# Selectable pipelines. The list order is preserved from the original file;
# it is passed (via pipeline_metadata) to the template.
PIPELINES = [
    {
        'id': 8,
        'name': 'Embedded using BioWordVec',
        'filename': "pipeline_ex3_s4.joblib",
    },
    {
        'id': 1,
        'name': 'Baseline',
        'filename': "pipeline_ex1_s1.joblib",
    },
    {
        'id': 2,
        'name': 'Trained on a FeedForward NN',
        'filename': "pipeline_ex1_s2.joblib",
    },
    {
        'id': 3,
        'name': 'Trained on a CRF',
        'filename': "pipeline_ex1_s3.joblib",
    },
    {
        'id': 4,
        'name': 'Trained on a small dataset',
        'filename': "pipeline_ex2_s3.joblib",
    },
    {
        'id': 5,
        'name': 'Trained on a large dataset',
        'filename': "pipeline_ex2_s2.joblib",
    },
    {
        'id': 6,
        'name': 'Embedded using TFIDF',
        'filename': "pipeline_ex3_s2.joblib",
    },
    {
        'id': 7,
        'name': 'Embedded using GloVe',
        'filename': "pipeline_ex3_s3.joblib",
    },
]

# Subset of PIPELINES (id and name only) handed to the template.
pipeline_metadata = [{'id': p['id'], 'name': p['name']} for p in PIPELINES]


def get_pipeline_by_id(pipelines, pipeline_id):
    """Return the ``filename`` of the first entry whose id matches, else ``None``."""
    return next((p['filename'] for p in pipelines if p['id'] == pipeline_id), None)


def get_name_by_id(pipelines, pipeline_id):
    """Return the ``name`` of the first entry whose id matches, else ``None``."""
    return next((p['name'] for p in pipelines if p['id'] == pipeline_id), None)


def requestResults(text, pipeline):
    """Run ``pipeline.predict`` on ``text`` and return the first prediction.

    A NumPy array result is converted to a plain list before indexing.
    """
    labels = pipeline.predict(text)
    if isinstance(labels, np.ndarray):
        labels = labels.tolist()
    return labels[0]


# Path is relative to the working directory.
# NOTE(review): the original comment described this as a temporary path for
# Hugging Face Spaces -- confirm the working directory is writable there.
LOG_FILE = "./usage_log.jsonl"

# Dedicated logger so the usage file holds exactly one JSON object per line.
# delay=True defers opening the file until the first record is written, so
# importing this module performs no file I/O.
_usage_logger = logging.getLogger("usage_log")
_usage_logger.setLevel(logging.INFO)
_usage_logger.propagate = False
if not _usage_logger.handlers:
    _usage_handler = logging.FileHandler(LOG_FILE, encoding="utf-8", delay=True)
    _usage_handler.setFormatter(logging.Formatter("%(message)s"))
    _usage_logger.addHandler(_usage_handler)


def log_interaction(user_input, model_name, predictions):
    """Append one JSON line describing an interaction to ``LOG_FILE``.

    Logging is best-effort: a failure is reported on stdout and never
    propagates to the caller.

    Args:
        user_input: Raw text submitted by the user.
        model_name: Display name of the pipeline that was used.
        predictions: Mapping of token to predicted label.
    """
    # https://betterstack.com/community/guides/logging/how-to-start-logging-with-python/
    log_entry = {
        # Timezone-aware replacement for the deprecated datetime.utcnow().
        "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "model": model_name,
        "user_input": user_input,
        "predictions": predictions,
    }
    try:
        # default=str keeps logging best-effort if a label is not a native
        # JSON type (e.g. a NumPy scalar).
        _usage_logger.info(json.dumps(log_entry, ensure_ascii=False, default=str))
        print("[INFO] Logged interaction successfully.")
    except Exception as e:
        print(f"[ERROR] Could not write log entry: {e}")


app = Flask(__name__)


@app.route('/')
def index():
    """Render the landing page with the list of selectable pipelines."""
    return render_template('index.html', pipelines=pipeline_metadata)


@app.route('/', methods=['POST'])
def get_data():
    """Label the submitted text with the selected pipeline and render the result.

    Reads the ``search`` and ``pipeline_select`` form fields. Responds with
    HTTP 400 when ``pipeline_select`` is not an integer or does not match a
    known pipeline id (previously these cases surfaced as HTTP 500).
    """
    text = request.form['search']
    # Words, or single non-word / non-space characters (punctuation).
    tokens = re.findall(r"\w+|[^\w\s]", text)

    try:
        pipeline_id = int(request.form['pipeline_select'])
    except ValueError:
        abort(400, description="pipeline_select must be an integer pipeline id.")

    filename = get_pipeline_by_id(PIPELINES, pipeline_id)
    if filename is None:
        abort(400, description=f"Unknown pipeline id: {pipeline_id}")
    name = get_name_by_id(PIPELINES, pipeline_id)

    if tokens:
        pipeline = load_pipeline_from_hub(filename)
        # One outer element: a single "document" holding the token series.
        tokens_formatted = pd.Series([pd.Series(tokens)])
        labels = requestResults(tokens_formatted, pipeline)
        # NOTE(review): a dict collapses repeated tokens (the last label
        # wins). Kept because the template's expectations are not visible
        # here.
        results = dict(zip(tokens, labels))
    else:
        # Nothing to label: skip the model download and prediction entirely.
        results = {}

    log_interaction(text, name, results)
    return render_template('index.html', results=results, name=name, pipelines=pipeline_metadata)


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=7860)