import random
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import pandas as pd
import torch_geometric
from torch_geometric.nn.conv import MessagePassing
from torch_geometric.utils import degree
from tqdm.notebook import tqdm
from sklearn import preprocessing as pp
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score
import scipy.sparse as sp
import datetime
import os
import logging
import requests
from filelock import FileLock
from datasets import load_dataset, Dataset
from huggingface_hub import HfApi, create_repo, Repository
import gc
from typing import Tuple, Optional, Dict, List
import json
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
logging.info(f"Using device: {device}")

cache_base = "/app/cache"
os.makedirs(f"{cache_base}/huggingface", exist_ok=True)
os.makedirs(f"{cache_base}/transformers", exist_ok=True)
os.makedirs(f"{cache_base}/datasets", exist_ok=True)

# Set all possible Hugging Face cache environment variables
os.environ['HF_HOME'] = f"{cache_base}/huggingface"
os.environ['TRANSFORMERS_CACHE'] = f"{cache_base}/transformers"
os.environ['HF_DATASETS_CACHE'] = f"{cache_base}/datasets"
os.environ['HUGGINGFACE_HUB_CACHE'] = f"{cache_base}/huggingface"
os.environ['HF_HUB_CACHE'] = f"{cache_base}/huggingface"

API_BASE_URL = os.getenv("API_BASE_URL", "https://ssc-grad.up.railway.app/api/transaction/json")
REPO_ID = "FarahMohsenSamy1/Transactions"
LOCAL_PATH = "/tmp/transactions.csv"
METADATA_PATH = "/tmp/dataset_metadata.json"
UNIQUE_KEYS = ["Transaction_ID", "Customer_ID", "Item_ID"]
MODEL_DIR = "/app/models"
LATEST_MODEL_PATH = os.path.join(MODEL_DIR, "latest_model.txt")
ENCODERS_PATH = os.path.join(MODEL_DIR, "encoders.pkl")

LATENT_DIM = 64
NUM_LAYERS = 3
LEARNING_RATE = 0.001
BATCH_SIZE = 64
NUM_EPOCHS = 100
MIN_RATING = 3.0
REG_LAMBDA = 1e-4

# Evaluation parameters
TOP_K = [5, 10, 20, 30]  # Top-K cutoffs for precision and recall calculation
EVALUATION_THRESHOLD = 3.5  # Rating threshold for binary relevance
try:
    api = HfApi(token=os.getenv("HF_TOKEN"))
    logging.info("HuggingFace API initialized successfully")
except Exception as e:
    logging.error(f"Failed to initialize HF API: {e}")
    api = None
def save_metadata(metadata: dict):
    """Save dataset metadata locally."""
    with open(METADATA_PATH, 'w') as f:
        json.dump(metadata, f, indent=2)
    logging.info(f"Metadata saved to {METADATA_PATH}")


def load_metadata() -> dict:
    """Load dataset metadata if it exists."""
    if os.path.exists(METADATA_PATH):
        with open(METADATA_PATH, 'r') as f:
            return json.load(f)
    return {}
def clean_dataframe_for_upload(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and standardize a DataFrame for HuggingFace upload."""
    df_clean = df.copy()
    # Convert object columns to numeric where possible; otherwise keep them as clean strings
    for col in df_clean.columns:
        if df_clean[col].dtype == 'object':
            try:
                df_clean[col] = pd.to_numeric(df_clean[col])
            except (ValueError, TypeError):
                # Not numeric: normalize missing values and force string type
                df_clean[col] = df_clean[col].fillna('').astype(str)
    # Ensure ID columns are strings to avoid type-conversion issues
    id_columns = ['Transaction_ID', 'Customer_ID', 'Item_ID', 'userId', 'itemId']
    for col in id_columns:
        if col in df_clean.columns:
            df_clean[col] = df_clean[col].astype(str)
    # Handle any remaining NaN values
    df_clean = df_clean.fillna('')
    logging.info(f"Cleaned DataFrame: {len(df_clean)} rows, {len(df_clean.columns)} columns")
    logging.info(f"Column types: {dict(df_clean.dtypes)}")
    return df_clean
def fetch_api_data(api_url: str) -> pd.DataFrame:
    """Fetch data from the API and return it as a DataFrame."""
    try:
        logging.info(f"Fetching data from {api_url}")
        response = requests.get(api_url, timeout=300)
        if response.status_code == 200:
            data = response.json()
            new_transactions = data.get("data", {}).get("newTransactions", [])
            rating_updates = data.get("data", {}).get("ratingUpdates", [])
            combined = new_transactions + rating_updates
            if combined:
                df = pd.DataFrame(combined)
                logging.info(f"Fetched {len(df)} records from API")
                return df
            else:
                logging.info("No new data found in API response")
                return pd.DataFrame()
        else:
            logging.error(f"Failed to fetch from {api_url} - Status code: {response.status_code}")
            return pd.DataFrame()
    except Exception as e:
        logging.error(f"Error fetching from {api_url}: {e}")
        return pd.DataFrame()
def load_existing_dataset() -> pd.DataFrame:
    """Load the existing dataset from HuggingFace."""
    try:
        logging.info(f"Loading existing dataset from {REPO_ID}")
        dataset = load_dataset(REPO_ID, split='train')
        df = dataset.to_pandas()
        logging.info(f"Loaded {len(df)} existing records from HuggingFace")
        return df
    except Exception as e:
        logging.warning(f"Could not load existing dataset: {e}")
        return pd.DataFrame()
def upload_dataset_to_hf(df: pd.DataFrame) -> bool:
    """Upload dataset to HuggingFace with proper data type handling."""
    try:
        if api is None:
            logging.error("HuggingFace API not initialized")
            return False
        # Clean the DataFrame before upload
        df_clean = clean_dataframe_for_upload(df)
        # Create dataset with explicit feature types to avoid conversion issues
        try:
            dataset = Dataset.from_pandas(df_clean)
        except Exception as e:
            logging.error(f"Error creating dataset from pandas: {e}")
            # Retry with string conversion for problematic ID columns
            for col in df_clean.columns:
                if col in ['Item_ID', 'Customer_ID', 'Transaction_ID', 'userId', 'itemId']:
                    df_clean[col] = df_clean[col].astype(str)
            dataset = Dataset.from_pandas(df_clean)
        # Push to hub
        dataset.push_to_hub(REPO_ID, token=os.getenv("HF_TOKEN"))
        logging.info(f"Successfully uploaded {len(df_clean)} records to {REPO_ID}")
        # Update metadata
        metadata = {
            "last_updated": datetime.datetime.now().isoformat(),
            "total_records": len(df_clean),
            "columns": list(df_clean.columns),
            "data_types": {col: str(dtype) for col, dtype in df_clean.dtypes.items()}
        }
        save_metadata(metadata)
        return True
    except Exception as e:
        logging.error(f"Failed to upload dataset to HuggingFace: {e}")
        # Save locally as a backup
        try:
            df.to_csv(LOCAL_PATH, index=False)
            logging.info(f"Dataset saved locally as backup: {LOCAL_PATH}")
        except Exception as backup_e:
            logging.error(f"Failed to save backup: {backup_e}")
        return False
def sync_and_update_dataset() -> Tuple[bool, int, pd.DataFrame]:
    """Sync the dataset: fetch new records from the API and merge them with the existing data."""
    logging.info("Starting dataset sync and update process...")
    # Fetch new data from API
    df_new = fetch_api_data(API_BASE_URL)
    # Load existing dataset
    df_existing = load_existing_dataset()
    if df_new.empty and df_existing.empty:
        logging.warning("No data available from API or existing dataset")
        return False, 0, pd.DataFrame()
    # Combine datasets
    if df_existing.empty:
        df_combined = df_new.copy()
        new_records = len(df_new)
    elif df_new.empty:
        df_combined = df_existing.copy()
        new_records = 0
    else:
        df_combined = pd.concat([df_existing, df_new], ignore_index=True)
        # Remove duplicates based on the unique keys, keeping the most recent record
        original_len = len(df_combined)
        df_combined = df_combined.drop_duplicates(subset=UNIQUE_KEYS, keep="last").reset_index(drop=True)
        new_records = len(df_combined) - len(df_existing)
        logging.info(f"Removed {original_len - len(df_combined)} duplicate records")
    # Save locally first
    try:
        df_combined.to_csv(LOCAL_PATH, index=False)
        logging.info(f"Saved combined dataset locally: {len(df_combined)} total records")
    except Exception as e:
        logging.error(f"Failed to save locally: {e}")
    # Try to upload to HuggingFace, but don't abort the pipeline if it fails
    upload_success = upload_dataset_to_hf(df_combined)
    if not upload_success:
        logging.warning("Upload to HuggingFace failed; continuing with the local copy")
    # Report success as long as we have data, regardless of upload status
    return True, new_records, df_combined
def prepare_data_for_training(df: pd.DataFrame) -> Tuple[pd.DataFrame, dict]:
    """Prepare data for collaborative filtering training."""
    logging.info("Preparing data for training...")
    # Standardize column names
    column_mapping = {
        'Customer_ID': 'userId',
        'Item_ID': 'itemId',
        'Timestamp': 'timestamp'
    }
    # Rename columns if they exist
    for old_col, new_col in column_mapping.items():
        if old_col in df.columns:
            df = df.rename(columns={old_col: new_col})
    # Select relevant columns for collaborative filtering
    required_cols = ['userId', 'itemId', 'rating']
    optional_cols = ['timestamp']
    available_cols = [col for col in required_cols if col in df.columns]
    available_cols.extend([col for col in optional_cols if col in df.columns])
    df_collab = df[available_cols].copy()
    # Clean data: drop incomplete rows and coerce types before filtering on rating
    df_collab = df_collab.dropna(subset=required_cols)
    df_collab['userId'] = df_collab['userId'].astype(str)
    df_collab['itemId'] = df_collab['itemId'].astype(str)
    df_collab['rating'] = pd.to_numeric(df_collab['rating'], errors='coerce')
    # Remove rows with invalid ratings, then keep only ratings at or above the minimum
    df_collab = df_collab.dropna(subset=['rating'])
    df_collab = df_collab[df_collab['rating'] >= MIN_RATING]
    logging.info(f"Prepared {len(df_collab)} records for training")
    # Check minimum requirements
    n_users = df_collab['userId'].nunique()
    n_items = df_collab['itemId'].nunique()
    stats = {
        'n_users': n_users,
        'n_items': n_items,
        'n_interactions': len(df_collab),
        'sparsity': 1 - (len(df_collab) / (n_users * n_items)) if n_users and n_items else 0.0
    }
    logging.info(f"Dataset stats: {stats}")
    if n_users < 2 or n_items < 2:
        raise ValueError(f"Insufficient data for training: {n_users} users, {n_items} items")
    return df_collab, stats
def create_train_test_split(df: pd.DataFrame, test_size: float = 0.2) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Create a train-test split, keeping only test users and items that appear in training."""
    logging.info("Creating train-test split...")
    if len(df) < 10:
        # For very small datasets, keep most of the data for training
        test_size = min(0.1, (len(df) - 1) / len(df))
    train_df, test_df = train_test_split(df, test_size=test_size, random_state=42, stratify=None)
    # Ensure the test set only contains users and items present in the training set
    train_users = set(train_df['userId'].unique())
    train_items = set(train_df['itemId'].unique())
    test_df = test_df[
        (test_df['userId'].isin(train_users)) &
        (test_df['itemId'].isin(train_items))
    ]
    logging.info(f"Train set: {len(train_df)} interactions")
    logging.info(f"Test set: {len(test_df)} interactions")
    return train_df.reset_index(drop=True), test_df.reset_index(drop=True)
def encode_users_items(train_df: pd.DataFrame, test_df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, dict]:
    """Encode users and items to sequential integer indices."""
    logging.info("Encoding users and items...")
    le_user = pp.LabelEncoder()
    le_item = pp.LabelEncoder()
    # Create copies to avoid modifying the original data
    train_df = train_df.copy()
    test_df = test_df.copy()
    # Fit encoders on training data
    train_df['user_id_idx'] = le_user.fit_transform(train_df['userId'].values)
    train_df['item_id_idx'] = le_item.fit_transform(train_df['itemId'].values)
    # Transform test data (test users and items are guaranteed to appear in training)
    test_df['user_id_idx'] = le_user.transform(test_df['userId'].values)
    test_df['item_id_idx'] = le_item.transform(test_df['itemId'].values)
    encoders = {
        'user_encoder': le_user,
        'item_encoder': le_item,
        'n_users': len(le_user.classes_),
        'n_items': len(le_item.classes_)
    }
    # Save encoders for later inference
    import pickle
    os.makedirs(os.path.dirname(ENCODERS_PATH), exist_ok=True)
    with open(ENCODERS_PATH, 'wb') as f:
        pickle.dump(encoders, f)
    logging.info(f"Encoded {encoders['n_users']} users and {encoders['n_items']} items")
    return train_df, test_df, encoders
# Data loader: sample BPR triplets (user, positive item, negative item)
def data_loader(data: pd.DataFrame, batch_size: int, n_usr: int, n_itm: int):
    """Generate one training batch with uniform negative sampling."""
    def sample_neg(user_items: set) -> int:
        # Rejection-sample an item the user has not interacted with
        while True:
            neg_id = random.randint(0, n_itm - 1)
            if neg_id not in user_items:
                return neg_id
    # Group interacted items by user
    user_items_dict = data.groupby('user_id_idx')['item_id_idx'].apply(set).to_dict()
    # Sample users (with replacement if there are fewer users than the batch size)
    available_users = list(user_items_dict.keys())
    if len(available_users) < batch_size:
        users = random.choices(available_users, k=batch_size)
    else:
        users = random.sample(available_users, batch_size)
    pos_items = []
    neg_items = []
    for user in users:
        user_items = user_items_dict[user]
        pos_item = random.choice(list(user_items))
        neg_item = sample_neg(user_items)
        pos_items.append(pos_item)
        neg_items.append(neg_item)
    return (
        torch.LongTensor(users).to(device),
        torch.LongTensor(pos_items).to(device) + n_usr,
        torch.LongTensor(neg_items).to(device) + n_usr
    )
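# Each call to data_loader() yields one BPR training batch of three aligned
# LongTensors of shape (batch_size,): user indices, positive-item indices, and
# negative-item indices. The item indices are already shifted by n_usr so that
# users and items address the same shared embedding table.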
# LightGCN Convolution Layer
class LightGCNConv(MessagePassing):
    def __init__(self):
        super().__init__(aggr='add')

    def forward(self, x, edge_index):
        from_, to_ = edge_index
        deg = degree(to_, x.size(0), dtype=x.dtype)
        deg_inv_sqrt = deg.pow(-0.5)
        deg_inv_sqrt[deg_inv_sqrt == float('inf')] = 0
        norm = deg_inv_sqrt[from_] * deg_inv_sqrt[to_]
        return self.propagate(edge_index, x=x, norm=norm)

    def message(self, x_j, norm):
        return norm.view(-1, 1) * x_j
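# Note: this layer implements the standard, parameter-free LightGCN propagation
#   e_i^(k+1) = sum over neighbors j of e_j^(k) / sqrt(|N(i)| * |N(j)|),
# i.e. symmetrically normalized neighborhood aggregation with no feature
# transformation and no nonlinearity.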
# Recommendation System Model
class RecSysGNN(nn.Module):
    def __init__(self, latent_dim: int, num_layers: int, num_users: int, num_items: int, dropout: float = 0.1):
        super(RecSysGNN, self).__init__()
        self.num_users = num_users
        self.num_items = num_items
        self.latent_dim = latent_dim
        self.num_layers = num_layers
        # One shared embedding table: users occupy rows [0, num_users),
        # items occupy rows [num_users, num_users + num_items)
        self.embedding = nn.Embedding(num_users + num_items, latent_dim)
        self.convs = nn.ModuleList([LightGCNConv() for _ in range(num_layers)])
        self.dropout = nn.Dropout(dropout)
        self.init_parameters()

    def init_parameters(self):
        nn.init.normal_(self.embedding.weight, std=0.1)

    def forward(self, edge_index):
        emb0 = self.embedding.weight
        embs = [emb0]
        emb = emb0
        for conv in self.convs:
            emb = conv(x=emb, edge_index=edge_index)
            emb = self.dropout(emb)
            embs.append(emb)
        return emb0, torch.mean(torch.stack(embs, dim=0), dim=0)

    def encode_minibatch(self, users, pos_items, neg_items, edge_index):
        emb0, final_emb = self(edge_index)
        return (
            final_emb[users], final_emb[pos_items], final_emb[neg_items],
            emb0[users], emb0[pos_items], emb0[neg_items]
        )

    def predict(self, users, items, edge_index):
        """Predict preference scores for (user, item) index pairs; item indices must already be offset by num_users."""
        with torch.no_grad():
            _, final_emb = self(edge_index)
            user_emb = final_emb[users]
            item_emb = final_emb[items]
            predictions = torch.mul(user_emb, item_emb).sum(dim=1)
            return predictions
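# The forward pass above follows LightGCN's layer combination: the final
# representation is the mean of the embeddings at every propagation depth,
#   e_final = (1 / (num_layers + 1)) * (e^(0) + e^(1) + ... + e^(num_layers)),
# and forward() returns both e^(0) (used for regularization) and e_final.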
# Compute BPR Loss with regularization
def compute_bpr_loss(users, users_emb, pos_emb, neg_emb, user_emb0, pos_emb0, neg_emb0, reg_lambda=REG_LAMBDA):
    reg_loss = reg_lambda * (1 / 2) * (
        user_emb0.norm().pow(2) +
        pos_emb0.norm().pow(2) +
        neg_emb0.norm().pow(2)
    ) / float(len(users))
    pos_scores = torch.mul(users_emb, pos_emb).sum(dim=1)
    neg_scores = torch.mul(users_emb, neg_emb).sum(dim=1)
    bpr_loss = torch.mean(F.softplus(neg_scores - pos_scores))
    return bpr_loss + reg_loss, reg_loss
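# The loss above is the standard BPR objective: softplus(s_neg - s_pos) equals
# -log(sigmoid(s_pos - s_neg)), where s_pos and s_neg are dot products between
# the user embedding and the positive/negative item embeddings. The L2 penalty
# is applied to the layer-0 embeddings only and averaged over the batch.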
def compute_precision_recall_at_k(model: nn.Module, test_df: pd.DataFrame, train_df: pd.DataFrame,
                                  encoders: dict, edge_index: torch.Tensor,
                                  k_values: List[int] = TOP_K) -> Dict[str, float]:
    """
    Compute precision and recall at K for the recommendation model.

    Args:
        model: Trained recommendation model
        test_df: Test dataset
        train_df: Training dataset (used to exclude known interactions)
        encoders: User and item encoders
        edge_index: Graph edge index
        k_values: List of K values to compute metrics for

    Returns:
        Dictionary with precision, recall, and F1 metrics
    """
    model.eval()
    logging.info("Computing precision and recall metrics...")
    n_users = encoders['n_users']
    n_items = encoders['n_items']
    # Ground-truth items per user in the test and training sets
    test_user_items = test_df.groupby('user_id_idx')['item_id_idx'].apply(set).to_dict()
    train_user_items = train_df.groupby('user_id_idx')['item_id_idx'].apply(set).to_dict()
    # Only evaluate test users with enough interactions
    valid_test_users = [user for user, items in test_user_items.items() if len(items) >= min(k_values)]
    if not valid_test_users:
        logging.warning("No valid test users found for evaluation")
        return {f"precision@{k}": 0.0 for k in k_values} | {f"recall@{k}": 0.0 for k in k_values}
    metrics = {f"precision@{k}": [] for k in k_values}
    metrics.update({f"recall@{k}": [] for k in k_values})
    with torch.no_grad():
        _, final_emb = model(edge_index)
        for user_idx in valid_test_users:
            # Ground-truth items for this user
            true_items = test_user_items[user_idx]
            # Training items to exclude from recommendations
            train_items = train_user_items.get(user_idx, set())
            # Get user embedding
            user_tensor = torch.LongTensor([user_idx]).to(device)
            user_emb = final_emb[user_tensor]
            # Compute scores for all items
            item_indices = torch.arange(n_items).to(device) + n_users
            item_embs = final_emb[item_indices]
            scores = torch.mm(user_emb, item_embs.t()).squeeze(0)
            # Mask out training items so they cannot be recommended again
            for train_item in train_items:
                scores[train_item] = float('-inf')
            # Get top-K recommendations for each K value (clamped to the catalogue size)
            for k in k_values:
                _, top_k_items = torch.topk(scores, min(k, n_items))
                top_k_items = top_k_items.cpu().numpy()
                # Calculate precision and recall for this user
                relevant_items = len(set(top_k_items) & true_items)
                precision = relevant_items / k if k > 0 else 0.0
                recall = relevant_items / len(true_items) if len(true_items) > 0 else 0.0
                metrics[f"precision@{k}"].append(precision)
                metrics[f"recall@{k}"].append(recall)
    # Average the per-user metrics
    avg_metrics = {}
    for metric_name, values in metrics.items():
        avg_metrics[metric_name] = np.mean(values) if values else 0.0
    # Add F1 scores
    for k in k_values:
        precision = avg_metrics[f"precision@{k}"]
        recall = avg_metrics[f"recall@{k}"]
        if precision + recall > 0:
            f1 = 2 * (precision * recall) / (precision + recall)
        else:
            f1 = 0.0
        avg_metrics[f"f1@{k}"] = f1
    return avg_metrics
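# Metric definitions used above, per evaluated user:
#   precision@K = |top-K recommendations ∩ held-out test items| / K
#   recall@K    = |top-K recommendations ∩ held-out test items| / |held-out test items|
# The reported values are unweighted means over users, and f1@K is the harmonic
# mean of the averaged precision@K and recall@K.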
def evaluate_model_ratings(model: nn.Module, test_df: pd.DataFrame, encoders: dict, edge_index: torch.Tensor) -> Dict[str, float]:
    """
    Evaluate model performance on the rating prediction task.

    Args:
        model: Trained recommendation model
        test_df: Test dataset with ratings
        encoders: User and item encoders
        edge_index: Graph edge index

    Returns:
        Dictionary with rating prediction metrics
    """
    model.eval()
    logging.info("Evaluating rating prediction performance...")
    n_users = encoders['n_users']
    with torch.no_grad():
        # Predictions for the test set (item indices offset into the shared embedding table)
        user_indices = torch.LongTensor(test_df['user_id_idx'].values).to(device)
        item_indices = torch.LongTensor(test_df['item_id_idx'].values).to(device) + n_users
        predictions = model.predict(user_indices, item_indices, edge_index)
        predictions = predictions.cpu().numpy()
    # Actual ratings
    actual_ratings = test_df['rating'].values
    # Convert to binary relevance: ratings above the threshold are relevant;
    # predicted relevance uses the mean predicted score as the cutoff
    binary_actual = (actual_ratings >= EVALUATION_THRESHOLD).astype(int)
    binary_pred = (predictions >= np.mean(predictions)).astype(int)
    # Classification-style metrics
    try:
        precision = precision_score(binary_actual, binary_pred, zero_division=0)
        recall = recall_score(binary_actual, binary_pred, zero_division=0)
        f1 = f1_score(binary_actual, binary_pred, zero_division=0)
    except Exception as e:
        logging.warning(f"Error calculating rating-based metrics: {e}")
        precision = recall = f1 = 0.0
    # RMSE and MAE between raw model scores and actual ratings
    mse = np.mean((predictions - actual_ratings) ** 2)
    rmse = np.sqrt(mse)
    mae = np.mean(np.abs(predictions - actual_ratings))
    return {
        'rating_precision': precision,
        'rating_recall': recall,
        'rating_f1': f1,
        'rmse': rmse,
        'mae': mae
    }
def train_model(train_df: pd.DataFrame, test_df: pd.DataFrame, encoders: dict) -> Tuple[nn.Module, Dict[str, float]]:
    """Train the recommendation model and return evaluation metrics."""
    logging.info("Starting model training...")
    n_users = encoders['n_users']
    n_items = encoders['n_items']
    # Create edge index for the user-item graph (edges in both directions)
    u_t = torch.LongTensor(train_df['user_id_idx'].values)
    i_t = torch.LongTensor(train_df['item_id_idx'].values) + n_users
    train_edge_index = torch.stack((
        torch.cat([u_t, i_t]),
        torch.cat([i_t, u_t])
    )).to(device)
    # Initialize model
    model = RecSysGNN(
        latent_dim=LATENT_DIM,
        num_layers=NUM_LAYERS,
        num_users=n_users,
        num_items=n_items
    ).to(device)
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.5)
    model.train()
    best_loss = float('inf')
    for epoch in range(NUM_EPOCHS):
        epoch_loss = 0.0
        epoch_reg_loss = 0.0
        num_batches = 0
        # Multiple batches per epoch
        for _ in range(max(1, len(train_df) // BATCH_SIZE)):
            users, pos_items, neg_items = data_loader(train_df, BATCH_SIZE, n_users, n_items)
            optimizer.zero_grad()
            users_emb, pos_emb, neg_emb, user_emb0, pos_emb0, neg_emb0 = model.encode_minibatch(
                users, pos_items, neg_items, train_edge_index
            )
            loss, reg_loss = compute_bpr_loss(
                users, users_emb, pos_emb, neg_emb,
                user_emb0, pos_emb0, neg_emb0
            )
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
            epoch_reg_loss += reg_loss.item()
            num_batches += 1
            # Clean up batch tensors
            del users, pos_items, neg_items, users_emb, pos_emb, neg_emb
        scheduler.step()
        avg_loss = epoch_loss / num_batches
        avg_reg_loss = epoch_reg_loss / num_batches
        if avg_loss < best_loss:
            best_loss = avg_loss
        if epoch % 10 == 0:
            logging.info(f"Epoch {epoch + 1}/{NUM_EPOCHS}: Loss = {avg_loss:.4f}, Reg Loss = {avg_reg_loss:.4f}")
        # Memory cleanup
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
    logging.info(f"Training completed. Best loss: {best_loss:.4f}")
    # Evaluate model after training
    logging.info("Starting model evaluation...")
    # Compute precision and recall at K
    ranking_metrics = compute_precision_recall_at_k(
        model, test_df, train_df, encoders, train_edge_index, TOP_K
    )
    # Compute rating prediction metrics
    rating_metrics = evaluate_model_ratings(
        model, test_df, encoders, train_edge_index
    )
    # Combine all metrics
    all_metrics = {**ranking_metrics, **rating_metrics}
    # Log all metrics
    logging.info("=== Model Evaluation Results ===")
    for metric_name, value in all_metrics.items():
        logging.info(f"{metric_name}: {value:.4f}")
    return model, all_metrics
def save_model(model: nn.Module, encoders: dict, metrics: Dict[str, float]):
    """Save the trained model, encoders, and evaluation metrics."""
    os.makedirs(MODEL_DIR, exist_ok=True)
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    model_path = os.path.join(MODEL_DIR, f"model_{timestamp}.pth")
    # Save model state dict with metrics
    torch.save({
        'model_state_dict': model.state_dict(),
        'model_config': {
            'latent_dim': LATENT_DIM,
            'num_layers': NUM_LAYERS,
            'num_users': encoders['n_users'],
            'num_items': encoders['n_items']
        },
        'encoders': encoders,
        'metrics': metrics,
        'timestamp': timestamp
    }, model_path)
    # Update latest model path
    with open(LATEST_MODEL_PATH, 'w') as f:
        f.write(model_path)
    # Save metrics separately for easy access (cast to plain floats so numpy
    # values are JSON serializable)
    metrics_path = os.path.join(MODEL_DIR, f"metrics_{timestamp}.json")
    with open(metrics_path, 'w') as f:
        json.dump({name: float(value) for name, value in metrics.items()}, f, indent=2)
    logging.info(f"Model saved: {model_path}")
    logging.info(f"Metrics saved: {metrics_path}")
    logging.info(f"Latest model path updated: {LATEST_MODEL_PATH}")
def print_evaluation_summary(metrics: Dict[str, float], encoders: dict):
    """Print a comprehensive evaluation summary."""
    print("\n" + "=" * 60)
    print("MODEL EVALUATION SUMMARY")
    print("=" * 60)
    print("\nDataset Information:")
    print(f"  - Number of users: {encoders['n_users']}")
    print(f"  - Number of items: {encoders['n_items']}")
    print(f"  - Possible user-item pairs: {encoders['n_users'] * encoders['n_items']}")
    print("\nRanking Metrics (Top-K Recommendations):")
    for k in TOP_K:
        precision = metrics.get(f"precision@{k}", 0)
        recall = metrics.get(f"recall@{k}", 0)
        f1 = metrics.get(f"f1@{k}", 0)
        print(f"  Top-{k}:")
        print(f"    - Precision: {precision:.4f}")
        print(f"    - Recall: {recall:.4f}")
        print(f"    - F1-Score: {f1:.4f}")
    print("\nRating Prediction Metrics:")
    print(f"  - Precision: {metrics.get('rating_precision', 0):.4f}")
    print(f"  - Recall: {metrics.get('rating_recall', 0):.4f}")
    print(f"  - F1-Score: {metrics.get('rating_f1', 0):.4f}")
    print(f"  - RMSE: {metrics.get('rmse', 0):.4f}")
    print(f"  - MAE: {metrics.get('mae', 0):.4f}")
    print("\n" + "=" * 60)
def main():
    """Main training pipeline with evaluation."""
    try:
        logging.info("=== Starting Enhanced Training Pipeline with Evaluation ===")
        # Step 1: Sync and update dataset
        sync_success, new_records, df_complete = sync_and_update_dataset()
        if not sync_success or df_complete.empty:
            logging.error("Dataset sync failed or no data available. Aborting training.")
            return
        logging.info(f"Dataset sync completed. New records: {new_records}, Total records: {len(df_complete)}")
        # Step 2: Prepare data for training
        df_collab, stats = prepare_data_for_training(df_complete)
        if len(df_collab) < 10:
            logging.warning(f"Insufficient data for meaningful training: {len(df_collab)} interactions")
            return
        # Step 3: Create train-test split
        train_df, test_df = create_train_test_split(df_collab)
        if len(test_df) == 0:
            logging.warning("No test data available for evaluation")
            return
        # Step 4: Encode users and items
        train_df, test_df, encoders = encode_users_items(train_df, test_df)
        # Step 5: Train model and get evaluation metrics
        model, metrics = train_model(train_df, test_df, encoders)
        # Step 6: Save model with metrics
        save_model(model, encoders, metrics)
        # Step 7: Print comprehensive evaluation summary
        print_evaluation_summary(metrics, encoders)
        logging.info("=== Training Pipeline Completed Successfully ===")
        logging.info("Final model performance summary:")
        logging.info(f"  - Users: {encoders['n_users']}, Items: {encoders['n_items']}")
        logging.info(f"  - Best Precision@10: {metrics.get('precision@10', 0):.4f}")
        logging.info(f"  - Best Recall@10: {metrics.get('recall@10', 0):.4f}")
        logging.info(f"  - RMSE: {metrics.get('rmse', 0):.4f}")
    except Exception as e:
        logging.error(f"Training pipeline failed: {e}")
        import traceback
        logging.error(f"Full traceback: {traceback.format_exc()}")
        raise
def load_and_evaluate_saved_model(model_path: Optional[str] = None) -> Dict[str, float]:
    """
    Load a saved model checkpoint and return its stored evaluation metrics.

    Args:
        model_path: Path to the saved model. If None, the latest model is loaded.

    Returns:
        Dictionary with evaluation metrics
    """
    if model_path is None:
        if os.path.exists(LATEST_MODEL_PATH):
            with open(LATEST_MODEL_PATH, 'r') as f:
                model_path = f.read().strip()
        else:
            raise FileNotFoundError("No saved model found")
    # Load the checkpoint (weights_only=False because it also contains the sklearn encoders)
    checkpoint = torch.load(model_path, map_location=device, weights_only=False)
    if 'metrics' in checkpoint:
        logging.info("Loaded evaluation metrics from saved model:")
        for metric_name, value in checkpoint['metrics'].items():
            logging.info(f"  {metric_name}: {value:.4f}")
        return checkpoint['metrics']
    else:
        logging.warning("No evaluation metrics found in saved model")
        return {}
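# The helper below is NOT part of the original pipeline: it is a minimal,
# hypothetical sketch of how a saved checkpoint could be used at inference time
# to produce top-K recommendations for a single known user. It assumes the
# checkpoint layout written by save_model() and a DataFrame that already carries
# the 'user_id_idx'/'item_id_idx' columns produced by encode_users_items().
def recommend_top_k_for_user(user_id: str, train_df: pd.DataFrame, k: int = 10,
                             model_path: Optional[str] = None) -> List[str]:
    """Hypothetical example: return the top-k item IDs for one user from the latest checkpoint."""
    if model_path is None:
        with open(LATEST_MODEL_PATH, 'r') as f:
            model_path = f.read().strip()
    checkpoint = torch.load(model_path, map_location=device, weights_only=False)
    cfg, ckpt_encoders = checkpoint['model_config'], checkpoint['encoders']
    model = RecSysGNN(cfg['latent_dim'], cfg['num_layers'],
                      cfg['num_users'], cfg['num_items']).to(device)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.eval()
    n_users, n_items = cfg['num_users'], cfg['num_items']
    # Rebuild the bidirectional user-item graph from the training interactions
    u_t = torch.LongTensor(train_df['user_id_idx'].values)
    i_t = torch.LongTensor(train_df['item_id_idx'].values) + n_users
    edge_index = torch.stack((torch.cat([u_t, i_t]), torch.cat([i_t, u_t]))).to(device)
    with torch.no_grad():
        _, final_emb = model(edge_index)
    # Score every item for this user and mask items already seen in training
    user_idx = int(ckpt_encoders['user_encoder'].transform([user_id])[0])
    scores = final_emb[user_idx] @ final_emb[n_users:n_users + n_items].t()
    seen = torch.as_tensor(
        train_df.loc[train_df['user_id_idx'] == user_idx, 'item_id_idx'].values,
        dtype=torch.long, device=device
    )
    scores[seen] = float('-inf')
    top_idx = torch.topk(scores, min(k, n_items)).indices.cpu().numpy()
    return list(ckpt_encoders['item_encoder'].inverse_transform(top_idx))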
if __name__ == "__main__":
    main()