# Page banner captured together with this file (not part of the program):
# "Spaces: Runtime error"
import json
import logging
import os
from typing import List, Optional

import gradio as gr
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from datasets import load_dataset
from fastapi import FastAPI, HTTPException, Form, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from sklearn import preprocessing as pp
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import MinMaxScaler
from torch_geometric.nn.conv import MessagePassing
from torch_geometric.utils import degree

from scheduler import get_latest_model
# Root directory that holds every Hugging Face cache used by this service.
cache_base = "/app/cache"

# Create the cache sub-directories up front; existing directories are fine.
for _cache_subdir in ("huggingface", "transformers", "datasets"):
    os.makedirs(f"{cache_base}/{_cache_subdir}", exist_ok=True)

# Set all possible Hugging Face cache environment variables.
# NOTE(review): `datasets` is imported at the top of the file, before these
# variables are assigned - confirm the libraries still pick them up.
os.environ.update(
    {
        "HF_HOME": f"{cache_base}/huggingface",
        "TRANSFORMERS_CACHE": f"{cache_base}/transformers",
        "HF_DATASETS_CACHE": f"{cache_base}/datasets",
        "HUGGINGFACE_HUB_CACHE": f"{cache_base}/huggingface",
        "HF_HUB_CACHE": f"{cache_base}/huggingface",
    }
)

# Initialize FastAPI and logging.
app = FastAPI()
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Maps a product name to the gender it is targeted at ("Male", "Female" or
# "Unisex"). Each product appears exactly once: the original literal repeated
# several keys (always with the same value), which Python silently collapses.
# NOTE(review): "Conditionar" is kept as spelled because it is a lookup key -
# confirm it matches the product names in the dataset before renaming it.
product_gender_mapping = {
    "Dental Care Kits": "Unisex",
    "Lamb Meat": "Unisex",
    "Whole Chicken": "Unisex",
    "Hyaluronic Acid": "Female",
    "Whitening Toothpaste": "Unisex",
    "Pure Sesame Oil": "Unisex",
    "Modern Literature": "Unisex",
    "Organic Sesame Oil": "Unisex",
    "Premium Olive Oil": "Unisex",
    "Historical Fiction": "Unisex",
    "Home Decorations": "Unisex",
    "Minced Meat": "Unisex",
    "Fresh Milk": "Unisex",
    "Skin Health Products": "Female",
    "Kitchen Towels": "Unisex",
    "Mineral Water": "Unisex",
    "Frozen Chicken Drumsticks": "Unisex",
    "Premium Bedding": "Unisex",
    "Pepsi Soft Drink": "Unisex",
    "Organic Milk": "Unisex",
    "Refined Olive Oil": "Unisex",
    "Tomato Paste": "Unisex",
    "Burger Sauce": "Unisex",
    "Xbox Series X": "Male",
    "Smart LED TV": "Unisex",
    "MacBook Pro 16-inch": "Unisex",
    "iPhone15": "Unisex",
    "Innovative Home Appliances": "Unisex",
    "Windbreaker Jacket": "Male",
    "Natural Shampoo": "Female",
    "Classic Fiction": "Unisex",
    "Eyeliner": "Female",
    "Creamy Mayonnaise": "Unisex",
    "Coca-Cola Soft Drink": "Unisex",
    "Training Shorts": "Male",
    "Pavilion Laptop": "Unisex",
    "Inspiron Laptop": "Unisex",
    "Snack Bars": "Unisex",
    "Tomato Ketchup": "Unisex",
    "Blender": "Unisex",
    "Energy-Efficient Air Conditioner": "Unisex",
    "Conditionar": "Female",
    "Advanced Washing Machine": "Unisex",
    "Hand Cream": "Female",
    "Hair Cream": "Female",
    "Mascara": "Female",
    "Bluetooth Audio System": "Unisex",
    "Sports Shoes": "Unisex",
    "PlayStation Console": "Male",
    "Chili Sauce": "Unisex",
    "Smart Refrigerator": "Unisex",
    "Bravia Television": "Unisex",
    "Formal Shirt": "Male",
    "ThinkPad Laptop": "Unisex",
    "Blended Sunflower Oil": "Unisex",
    "iPhone14": "Unisex",
    "Split Air Conditioner": "Unisex",
    "MacBook Pro 13-inch": "Unisex",
    "Athletic T-shirt": "Male",
    "iPad": "Unisex",
    "Galaxy Tablet": "Unisex",
    "Popular Non-Fiction": "Unisex",
    "High-Capacity Washing Machine": "Unisex",
    "iPhone13": "Unisex",
    "Hair Repair Shampoo": "Female",
    "Microwave Oven": "Unisex",
    "Consumer Electronics": "Unisex",
    "Durable Home Appliances": "Unisex",
    "Multi-Function Home Appliances": "Unisex",
    "Hydrating Skincare": "Female",
    "MacBook Air": "Unisex",
    "Fruit Juice": "Unisex",
    "Healthy Juice": "Unisex",
    "Evening Dress": "Female",
    "Body Care Essentials": "Female",
    "Frozen Chicken": "Unisex",
    "Hair Serum": "Female",
    "Ground Meat": "Unisex",
    "Workout T-shirt": "Male",
    "Living Room Furniture": "Unisex",
    "Milk Chocolate": "Unisex",
    "Shampoo": "Female",
    "Frozen Chicken Wings": "Unisex",
    "Beef Cuts": "Unisex",
    "Instant Coffee": "Unisex",
    "Power Tools": "Male",
    "Coffee Maker": "Unisex",
    "Modular Furniture": "Unisex",
    "Smart TV": "Unisex",
    "Sunflower Cooking Oil": "Unisex",
    "Running Shoes": "Unisex",
    "Gentle Body Care": "Female",
    "Bathroom Accessories": "Unisex",
    "Comfort Bedding": "Unisex",
    "Thriller Novel": "Unisex",
    "Track Jacket": "Male",
    "MacBook Pro 14-inch": "Unisex",
    "LED Lighting": "Unisex",
    "Galaxy Smartphone": "Unisex",
    "Contemporary Literature": "Unisex",
    "Bathroom Essentials": "Unisex",
    "Natural Juice": "Unisex",
    "Smart Watch": "Unisex",
    "Casual Jacket": "Male",
    "iPhone16": "Unisex",
    "iPhone11": "Unisex",
}
# Set device to GPU if available.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Set a writable cache directory: /tmp is writable in Spaces. This replaces
# the HF_HOME value assigned earlier in the file.
os.environ["HF_HOME"] = "/tmp/hf_cache"
os.makedirs(os.environ["HF_HOME"], exist_ok=True)

# Load the transactions dataset with the custom cache directory and convert
# its "train" split to a pandas DataFrame.
dataset_all = load_dataset("FarahMohsenSamy1/Transactions", cache_dir=os.environ["HF_HOME"])
df = dataset_all["train"].to_pandas()

# Encode raw customer / item ids as contiguous integer indices. The encoders
# are created and fitted exactly once (the original fitted the user encoder,
# discarded both encoders and then fitted fresh ones on the same columns).
user_encoder = pp.LabelEncoder()
item_encoder = pp.LabelEncoder()
df["user_id_idx"] = user_encoder.fit_transform(df["Customer_ID"])
df["item_id_idx"] = item_encoder.fit_transform(df["Item_ID"])

# Model hyper-parameters and graph size.
latent_dim = 64
n_layers = 3
n_users = df["user_id_idx"].nunique()
n_items = df["item_id_idx"].nunique()

# Declared blend weights for the two recommenders.
COLLAB_WEIGHT = 0.5
CONTENT_WEIGHT = 0.5

# Label encoding and scaling objects handed to preprocess_data below.
user_label_encoder = pp.LabelEncoder()
item_label_encoder = pp.LabelEncoder()
date_scaler = MinMaxScaler()
def preprocess_data(df, le_user=None, le_item=None, scaler=None):
    """Encode ids and timestamps of a transactions frame, in place.

    Args:
        df: DataFrame with a "Timestamp" column holding seconds since the
            Unix epoch, plus "Customer_ID" / "Item_ID" when the matching
            encoder is supplied.
        le_user: Optional encoder exposing ``fit_transform``; when given,
            "user_id_idx" is (re)computed from "Customer_ID".
        le_item: Optional encoder exposing ``fit_transform``; when given,
            "item_id_idx" is (re)computed from "Item_ID".
        scaler: Optional scaler exposing ``fit_transform``; when given,
            "Timestamp_numeric" (whole epoch seconds) and "Date" (the scaled
            value) are added.

    Returns:
        The same ``df`` object that was passed in. It is modified in place,
        so "Timestamp" holds datetimes afterwards for every holder of the
        frame.
    """
    if le_user is not None:
        df["user_id_idx"] = le_user.fit_transform(df["Customer_ID"].values)
    if le_item is not None:
        df["item_id_idx"] = le_item.fit_transform(df["Item_ID"].values)

    df["Timestamp"] = pd.to_datetime(df["Timestamp"], unit="s")

    if scaler is not None:
        # Derive whole seconds by subtracting the epoch rather than casting
        # the datetime column to int64: the cast yields counts in whatever
        # resolution the column happens to have, so dividing by 10**9 is only
        # right for nanosecond data.
        elapsed = df["Timestamp"] - pd.Timestamp(0)
        df["Timestamp_numeric"] = elapsed // pd.Timedelta(seconds=1)
        df["Date"] = scaler.fit_transform(df[["Timestamp_numeric"]])
    return df
# Run the shared preprocessing step; it fits the two label encoders and the
# date scaler on the transactions frame.
preprocessed_df = preprocess_data(
    df,
    le_user=user_label_encoder,
    le_item=item_label_encoder,
    scaler=date_scaler,
)

# Prepare edge_index for the graph-based model. User nodes keep their encoded
# index; item nodes are shifted by n_users so both share one id space. Every
# interaction is added in both directions.
u_t = torch.LongTensor(preprocessed_df["user_id_idx"].values)
i_t = torch.LongTensor(preprocessed_df["item_id_idx"].values) + n_users
edge_index = torch.stack(
    (
        torch.cat([u_t, i_t]),
        torch.cat([i_t, u_t]),
    )
).to(device)
# Define LightGCNConv model | |
class LightGCNConv(MessagePassing):
    """Parameter-free graph convolution with symmetric degree normalisation.

    Each message is the source node embedding scaled by
    ``deg(source) ** -0.5 * deg(target) ** -0.5``.
    """

    def __init__(self, **kwargs):
        """Create the layer.

        Args:
            **kwargs: Forwarded to ``MessagePassing``. Aggregation defaults
                to "add" unless the caller passes ``aggr`` explicitly. (The
                original accepted these arguments but dropped them.)
        """
        kwargs.setdefault("aggr", "add")
        super().__init__(**kwargs)

    def forward(self, x, edge_index):
        """Propagate ``x`` along ``edge_index`` with normalised weights.

        Args:
            x: Node embedding matrix; its first dimension is the node count.
            edge_index: Pair of index tensors unpacked as (source, target).

        Returns:
            The aggregated embeddings returned by ``propagate``.
        """
        from_, to_ = edge_index
        deg = degree(to_, x.size(0), dtype=x.dtype)
        deg_inv_sqrt = deg.pow(-0.5)
        # Nodes with degree 0 give inf after pow(-0.5); zero their weight.
        deg_inv_sqrt[deg_inv_sqrt == float("inf")] = 0
        norm = deg_inv_sqrt[from_] * deg_inv_sqrt[to_]
        return self.propagate(edge_index, x=x, norm=norm)

    def message(self, x_j, norm):
        """Scale each neighbour embedding by its edge normalisation weight."""
        return norm.view(-1, 1) * x_j
class RecSysGNN(nn.Module):
    """Embedding table followed by a stack of LightGCNConv layers."""

    def __init__(self, latent_dim, num_layers, num_users, num_items):
        """Build the model.

        Args:
            latent_dim: Width of every node embedding.
            num_layers: Number of LightGCNConv layers to stack.
            num_users: Number of user nodes.
            num_items: Number of item nodes.
        """
        super().__init__()
        # A single table holds user rows followed by item rows.
        self.embedding = nn.Embedding(num_users + num_items, latent_dim)
        layers = [LightGCNConv() for _ in range(num_layers)]
        self.convs = nn.ModuleList(layers)
        self.init_parameters()

    def init_parameters(self):
        """Re-initialise the embedding weights from a normal with std 0.1."""
        nn.init.normal_(self.embedding.weight, std=0.1)

    def forward(self, edge_index):
        """Return the raw embeddings and their layer-averaged propagation.

        Args:
            edge_index: Edge tensor handed to every convolution layer.

        Returns:
            Tuple ``(emb0, out)``: ``emb0`` is the embedding weight matrix and
            ``out`` is the mean of ``emb0`` and each layer's output.
        """
        emb0 = self.embedding.weight
        layer_outputs = [emb0]
        for conv in self.convs:
            # Each layer consumes the output of the previous one.
            layer_outputs.append(conv(x=layer_outputs[-1], edge_index=edge_index))
        out = torch.stack(layer_outputs, dim=0).mean(dim=0)
        return emb0, out
# model_path = get_latest_model() | |
# Text file whose content is the path of the most recent model checkpoint.
MODEL_PATH_FILE = "/app/models/latest_model.txt"

# Use CUDA when it is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


def get_model_path():
    """Return the stripped content of MODEL_PATH_FILE, or None if it is absent."""
    if not os.path.exists(MODEL_PATH_FILE):
        return None
    with open(MODEL_PATH_FILE, "r") as path_file:
        content = path_file.read()
    return content.strip()
# Retrieve the model path from the file and fail fast when no usable
# checkpoint is available.
model_path = get_model_path()
if not model_path:
    raise FileNotFoundError("Model path file is missing or empty. Please train the model first.")
if not os.path.exists(model_path):
    raise FileNotFoundError(f"Model file not found at '{model_path}'. Please train the model first.")
print(f" Loading model from: {model_path}")

# Initialize the model with the hyper-parameters declared earlier in the file
# (latent_dim = 64, n_layers = 3) instead of repeating the literals.
model = RecSysGNN(
    latent_dim=latent_dim, num_layers=n_layers, num_users=n_users, num_items=n_items
).to(device)

# Load the state dictionary.
# NOTE(review): torch.load unpickles the file; only load checkpoints produced
# by a trusted training run (consider weights_only=True where supported).
state_dict = torch.load(model_path, map_location=device)
model_state = model.state_dict()

# Keep only parameters whose name and shape match the freshly built model.
filtered_state_dict = {
    k: v for k, v in state_dict.items()
    if k in model_state and v.size() == model_state[k].size()
}

# Anything not restored keeps its random initialisation (for example the
# embedding table when the user/item counts changed since training). Report
# it instead of dropping it silently.
_skipped_keys = [k for k in state_dict if k not in filtered_state_dict]
_missing_keys = [k for k in model_state if k not in filtered_state_dict]
if _skipped_keys or _missing_keys:
    logging.warning(
        "Checkpoint only partially loaded from %s: skipped checkpoint keys %s; "
        "model parameters left at their initial values %s",
        model_path,
        _skipped_keys,
        _missing_keys,
    )

# Update the model state dictionary with the filtered parameters and load it.
model_state.update(filtered_state_dict)
model.load_state_dict(model_state)

# Set the model to evaluation mode.
model.eval()
print(f" Model loaded successfully from: {model_path}")
# Create the user-product rating matrix: one row per encoded user, one column
# per Item_ID, cells aggregated by pivot_table's default; pairs without any
# rating become 0.
user_product_rating = preprocessed_df.pivot_table(
    index="user_id_idx", columns="Item_ID", values="rating"
).fillna(0)

# Cosine similarity for content-based filtering: one row of descriptive
# attributes per Item_ID, with the text columns expanded by get_dummies.
_feature_columns = ["Item_ID", "Product_Name", "Product_Category", "Product_Brand", "Price"]
product_features = preprocessed_df[_feature_columns].drop_duplicates().set_index("Item_ID")
product_features_encoded = pd.get_dummies(product_features)
cosine_sim_df = pd.DataFrame(
    cosine_similarity(product_features_encoded),
    index=product_features_encoded.index,
    columns=product_features_encoded.index,
).fillna(0)

# Lookup tables: encoded item index -> Item_ID, Item_ID -> product name and
# encoded user index -> customer gender.
item_id_mapping = dict(
    zip(preprocessed_df["item_id_idx"], preprocessed_df["Item_ID"])
)
product_name_mapping = dict(
    zip(preprocessed_df["Item_ID"], preprocessed_df["Product_Name"])
)
user_gender_mapping = dict(
    zip(preprocessed_df["user_id_idx"], preprocessed_df["Customer_Gender"])
)

# Set up logging configuration (same settings as the call near the top of the
# file).
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
def _recency_factor(transaction, time_weight):
    """Return a sigmoid weight in (0, 1) that grows with transaction time.

    The scaled "Date" value written by preprocess_data is preferred; when it
    is absent the "Timestamp" is used, converted to epoch seconds if it is a
    pandas Timestamp.
    """
    moment = transaction.get("Date")
    if moment is None or pd.isna(moment):
        moment = transaction["Timestamp"]
        if isinstance(moment, pd.Timestamp):
            moment = moment.timestamp()
    return 1.0 / (1.0 + np.exp(-time_weight * float(moment)))


def content_based_filtering(user_id, top_k=20, time_weight=0.5):
    """Score items similar to those the user has already interacted with.

    For every transaction of the user, the ``top_k`` most similar products
    (by ``cosine_sim_df``) receive ``similarity * rating * recency``;
    contributions for the same item are summed so each item appears once.

    Args:
        user_id: Encoded user index (``user_id_idx``).
        top_k: Number of neighbours per purchased product and maximum number
            of results.
        time_weight: Slope of the sigmoid applied to the transaction time.

    Returns:
        List of ``{"item_id", "score"}`` dicts sorted by descending score; an
        empty list when the user is unknown or an error occurs (best effort,
        the error is logged with its traceback).
    """
    logging.info(f"Started content-based filtering for user {user_id}")
    try:
        if user_id not in user_product_rating.index:
            logging.warning(f"User {user_id} not found in rating matrix.")
            return []
        user_ratings = user_product_rating.loc[user_id]
        user_transactions = df[df["user_id_idx"] == user_id].sort_values(
            by="Timestamp", ascending=False
        )

        totals = {}
        for _, transaction in user_transactions.iterrows():
            product = transaction["Item_ID"]
            if product not in cosine_sim_df.index:
                continue
            # The Timestamp column holds datetimes after preprocessing, so it
            # cannot be passed to np.exp directly (the original always raised
            # here and therefore always returned []).
            weight = user_ratings.get(product, 0) * _recency_factor(transaction, time_weight)
            for similar_product, score in cosine_sim_df.loc[product].nlargest(top_k).items():
                totals[similar_product] = totals.get(similar_product, 0.0) + score * weight

        ranked = sorted(totals.items(), key=lambda pair: pair[1], reverse=True)[:top_k]
        return [{"item_id": item_id, "score": float(score)} for item_id, score in ranked]
    except Exception as e:
        # Deliberate best effort: a content-side failure must not take the
        # whole recommendation down, but keep the traceback in the log.
        logging.exception(f"Error in content-based filtering for user {user_id}: {e}")
        return []
# Define the class to receive the new user's preferences | |
class NewUserPreferences(BaseModel):
    """Preferences supplied for a new user."""

    # Identifier of the new user.
    user_id: int
    # Categories the user likes (element type is not constrained).
    liked_categories: list
# Find the most similar user based on liked categories | |
def get_most_similar_user_by_categories(liked_categories):
    """Return the user with the most transactions in the given categories.

    Args:
        liked_categories: Collection of "Product_Category" values.

    Returns:
        The encoded user index (``int``) with the highest transaction count
        in those categories, or ``None`` when the collection is empty or no
        transaction matches.
    """
    if not liked_categories:
        return None

    # Count, per user, the transactions whose category is among the liked ones.
    in_liked_category = preprocessed_df["Product_Category"].isin(liked_categories)
    similar_users = preprocessed_df.loc[in_liked_category, "user_id_idx"].value_counts()
    logging.info(f"Most Similar Users: {similar_users}")

    if similar_users.empty:
        return None
    return int(similar_users.idxmax())
# Recommendation Function | |
def _resolve_user_index(customer_id):
    """Return the encoded index of ``customer_id``, or None when unknown.

    A string id is also tried after conversion to the encoder's numeric
    dtype, so an id typed into a text field can match numeric ids.
    """
    known_ids = user_label_encoder.classes_
    candidates = [customer_id]
    if isinstance(customer_id, str) and known_ids.dtype.kind in "iuf":
        try:
            candidates.append(known_ids.dtype.type(customer_id))
        except (TypeError, ValueError):
            pass  # not a number: only the raw value can match
    for candidate in candidates:
        if candidate in known_ids:
            return int(user_label_encoder.transform([candidate])[0])
    return None


def recommend(customer_id: str, top_k: int = 20, liked_categories: str = ""):
    """Return hybrid (collaborative + content-based) recommendations as JSON.

    Args:
        customer_id: Raw customer id. Unknown ids trigger the cold-start path.
        top_k: Maximum number of recommendations; capped at the item count.
        liked_categories: Comma-separated categories, used only for unknown
            customers to pick the most similar existing user.

    Returns:
        JSON string: a list of ``{"item_id", "product_name", "score"}``
        objects sorted by descending score, an empty list when no similar
        user exists, or ``{"error": ...}`` when an unknown customer provides
        no categories.
    """
    # UI widgets may deliver a float; topk and slicing need a non-negative int.
    top_k = max(int(top_k), 0)

    user_id = _resolve_user_index(customer_id)
    if user_id is None:
        # Cold start: fall back to the most similar existing user.
        categories = [part.strip() for part in (liked_categories or "").split(",") if part.strip()]
        if not categories:
            return json.dumps({"error": "Customer ID not found. New users must provide liked categories"}, indent=2)
        most_similar_user = get_most_similar_user_by_categories(categories)
        if most_similar_user is None:
            logging.warning(f"No similar users found for liked categories: {categories}")
            return json.dumps([], indent=2)  # Return an empty list instead of hanging
        user_id = most_similar_user

    # Collaborative filtering: dot product of the user embedding with every
    # item embedding.
    logging.info("Starting collaborative filtering")
    with torch.no_grad():
        _, out = model(edge_index)
        user_emb, item_emb = torch.split(out, (n_users, n_items))
        collab_scores = torch.matmul(user_emb[user_id], item_emb.T)
        # topk raises when k exceeds the number of candidates.
        k = min(top_k, collab_scores.numel())
        collab_top_k_indices = torch.topk(collab_scores, k=k).indices.tolist()

    hybrid_scores = {}
    product_names = {}
    for idx in collab_top_k_indices:
        if idx not in item_id_mapping:
            continue
        raw_item_id = item_id_mapping[idx]
        item_id = int(raw_item_id)
        hybrid_scores[item_id] = float(collab_scores[idx])
        # product_name_mapping is keyed by Item_ID, not by the encoded index.
        product_names[item_id] = product_name_mapping.get(raw_item_id, "Unknown")

    # Content-based filtering, merged by summing scores per item.
    for rec in content_based_filtering(user_id, top_k):
        raw_item_id = rec["item_id"]
        try:
            item_id = int(raw_item_id)
        except (TypeError, ValueError):
            item_id = raw_item_id
        hybrid_scores[item_id] = hybrid_scores.get(item_id, 0.0) + float(rec["score"])
        product_names.setdefault(item_id, product_name_mapping.get(raw_item_id, "Unknown"))

    # Sort recommendations based on hybrid scores and keep the top-k.
    ranked = sorted(hybrid_scores.items(), key=lambda pair: pair[1], reverse=True)[:top_k]
    hybrid_recommendations = [
        {"item_id": item_id, "product_name": product_names[item_id], "score": score}
        for item_id, score in ranked
    ]
    return json.dumps(hybrid_recommendations, indent=2)
# import gradio as gr | |
# iface = gr.Interface( | |
# fn=recommend, | |
# inputs=[ | |
# gr.Textbox(label="User ID"), | |
# gr.Number(label="Top K", value=20), | |
# gr.Textbox(label="Liked Categories (comma-separated)") | |
# ], | |
# outputs=gr.JSON(label="Recommendations"), # JSON output | |
# title="AI-Powered Product Recommendation System", | |
# description="Enter a user ID and get personalized product recommendations based on collaborative & content filtering." | |
# ) | |
# NOTE(review): no route decorator is visible on this function, so the FastAPI
# app may not expose it - confirm how it is meant to be registered.
def get_recommendations(user_id: str, top_k: int = 20, liked_categories: str = ""):
    """Run ``recommend`` and wrap its decoded JSON payload in a JSONResponse."""
    payload = json.loads(recommend(user_id, top_k, liked_categories))
    return JSONResponse(content=payload)
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file runs as a script.
    import uvicorn

    # Serve the FastAPI app on every interface, port 7860.
    uvicorn.run(app, port=7860, host="0.0.0.0")
# if __name__ == "__main__": | |
# iface.launch(server_name="0.0.0.0", server_port=7860) | |