SongPorter / artist_utils.py
MonilM's picture
HF Space Fix#8
26adcbb
import sqlite3
import csv
import os
import re
# Database path
DB_NAME = 'artists.db'
# Get the directory of the current script
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
DATASETS_PATH = os.path.join(CURRENT_DIR, 'datasets')
DB_PATH = os.path.join(DATASETS_PATH, DB_NAME)
CSV_PATH = os.path.join(DATASETS_PATH, 'Global Music Artists.csv')
ALT_CSV_PATH = os.path.join(DATASETS_PATH, 'Global_Music_Artists.csv')
def _normalize_artist_name_for_db(name):
"""Normalize artist name specifically for DB key/lookup."""
if not name:
return ""
name = str(name).lower().strip()
# Basic normalization, can be expanded
name = re.sub(r'\s+', ' ', name)
return name
def _get_db_connection():
"""Establishes a connection to the SQLite database."""
try:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row # Return rows as dictionary-like objects
return conn
except sqlite3.Error as e:
print(f"Error connecting to database {DB_PATH}: {e}")
return None
def load_artist_data():
"""
Load artist data from CSV into SQLite database if it doesn't exist.
Returns True if DB is ready, False otherwise.
"""
# Check if CSV exists
actual_csv_path = None
if os.path.exists(CSV_PATH):
actual_csv_path = CSV_PATH
elif os.path.exists(ALT_CSV_PATH):
actual_csv_path = ALT_CSV_PATH
else:
print(f"Error: Artist CSV not found at {CSV_PATH} or {ALT_CSV_PATH}")
return False
# Check if DB needs to be created/populated
if not os.path.exists(DB_PATH):
print(f"Database {DB_PATH} not found. Creating and populating from {actual_csv_path}...")
os.makedirs(DATASETS_PATH, exist_ok=True) # Ensure datasets directory exists
conn = None
try:
conn = _get_db_connection()
if not conn: return False
cursor = conn.cursor()
# Create table
cursor.execute('''
CREATE TABLE IF NOT EXISTS artists (
artist_name_lower TEXT PRIMARY KEY,
artist_name TEXT,
artist_genre TEXT,
artist_img TEXT,
country TEXT
)
''')
conn.commit()
print("Table 'artists' created successfully (if it didn't exist).")
# Read CSV and insert data
count = 0
inserted_count = 0
skipped_count = 0
encodings_to_try = ['utf-8', 'latin1', 'iso-8859-1'] # Add more if needed
for encoding in encodings_to_try:
try:
print(f"Trying to read CSV with encoding: {encoding}")
with open(actual_csv_path, 'r', encoding=encoding, errors='ignore') as csvfile: # Use errors='ignore' as fallback
# Use csv.DictReader to handle header mapping
reader = csv.DictReader(csvfile)
# Dynamically find column names (case-insensitive)
if not reader.fieldnames:
print(f"Error: CSV file {actual_csv_path} seems empty or has no header.")
return False
headers = [h.lower().strip() for h in reader.fieldnames]
col_map = {
# Try common variations for column names
'name': next((h for h in reader.fieldnames if h.lower().strip() in ['artist_name', 'artist']), None),
'genre': next((h for h in reader.fieldnames if h.lower().strip() in ['artist_genre', 'genre']), None),
'img': next((h for h in reader.fieldnames if h.lower().strip() in ['artist_img', 'artist img', 'image']), None),
'country': next((h for h in reader.fieldnames if h.lower().strip() == 'country'), None)
}
if not col_map['name']:
print(f"Error: Could not find 'artist_name' or 'artist' column in CSV header: {reader.fieldnames}")
return False # Cannot proceed without artist name
print(f"CSV Headers mapped: name='{col_map['name']}', genre='{col_map['genre']}', img='{col_map['img']}', country='{col_map['country']}'")
for row in reader:
count += 1
artist_name_raw = row.get(col_map['name'])
if not artist_name_raw:
skipped_count += 1
continue # Skip rows without an artist name
artist_name_lower = _normalize_artist_name_for_db(artist_name_raw)
if not artist_name_lower: # Skip if normalization results in empty string
skipped_count += 1
continue
# Prepare data for insertion, using None for missing optional columns
data = (
artist_name_lower,
artist_name_raw,
row.get(col_map['genre']) if col_map['genre'] else None,
row.get(col_map['img']) if col_map['img'] else None,
row.get(col_map['country']) if col_map['country'] else None
)
try:
cursor.execute('''
INSERT OR IGNORE INTO artists
(artist_name_lower, artist_name, artist_genre, artist_img, country)
VALUES (?, ?, ?, ?, ?)
''', data)
if cursor.rowcount > 0:
inserted_count += 1
else:
# This means the key already existed (due to OR IGNORE)
skipped_count += 1
except sqlite3.Error as insert_err:
print(f"Warning: Error inserting row {count} ({artist_name_raw}): {insert_err}. Skipping row.")
skipped_count += 1
# Commit periodically to avoid holding locks for too long
if count % 5000 == 0: # Increased commit interval
conn.commit()
print(f"Processed {count} rows...")
conn.commit() # Final commit
print(f"Successfully populated database from CSV using {encoding}.")
print(f"Total rows processed: {count}, Inserted: {inserted_count}, Skipped/Duplicates: {skipped_count}")
return True # Success
except UnicodeDecodeError as ude:
print(f"Encoding {encoding} failed: {ude}. Trying next encoding.")
if conn: conn.rollback() # Rollback any partial inserts from the failed encoding attempt
except FileNotFoundError:
print(f"Error: CSV file not found at {actual_csv_path}")
return False
except Exception as e:
print(f"Error processing CSV with encoding {encoding}: {e}")
if conn: conn.rollback()
# Don't return False immediately, try next encoding
# If all encodings failed
print("Error: Could not read CSV with any attempted encoding.")
return False
except sqlite3.Error as e:
print(f"Database error during population: {e}")
if conn: conn.rollback()
return False
finally:
if conn:
conn.close()
print("Database connection closed.")
else:
print(f"Database {DB_PATH} already exists. Skipping population.")
# Optional: Add check here to see if CSV is newer than DB and repopulate if needed.
# Basic check: DB exists, ensure table exists
conn = None
try:
conn = _get_db_connection()
if not conn: return False # Cannot verify if connection fails
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='artists';")
if cursor.fetchone():
print("Table 'artists' confirmed to exist.")
return True
else:
print("Error: Database file exists, but 'artists' table is missing. Consider deleting the .db file and restarting.")
return False
except sqlite3.Error as e:
print(f"Database error checking existing DB: {e}")
return False
finally:
if conn:
conn.close()
def get_artist_info(artist_name):
"""
Get artist information from the SQLite database.
Returns information in the format expected by Django views.
"""
default_img = "https://media.istockphoto.com/id/1298261537/vector/blank-man-profile-head-icon-placeholder.jpg?s=612x612&w=0&k=20&c=CeT1RVWZzQDay4t54ookMaFsdi7ZHVFg2Y5v7hxigCA="
default_info = {
'artist': artist_name or "Unknown Artist",
'artist_img': default_img,
'country': 'Unknown',
'artist_genre': 'Unknown'
}
if not artist_name:
return default_info
artist_lower = _normalize_artist_name_for_db(artist_name)
if not artist_lower:
return default_info
conn = None
try:
conn = _get_db_connection()
if not conn: return default_info
cursor = conn.cursor()
cursor.execute("SELECT artist_name, artist_genre, artist_img, country FROM artists WHERE artist_name_lower = ?", (artist_lower,))
row = cursor.fetchone()
if row:
# Use fetched name for consistency, fallback to input name if somehow null
fetched_artist_name = row['artist_name'] or artist_name
return {
'artist': fetched_artist_name,
'artist_img': row['artist_img'] or default_img,
'country': row['country'] or 'Unknown',
'artist_genre': row['artist_genre'] or 'Unknown'
}
else:
# Simple fallback if exact match fails (no fuzzy matching here for performance)
# print(f"Artist '{artist_name}' (normalized: '{artist_lower}') not found in DB.") # Keep this commented unless debugging
return default_info
except sqlite3.Error as e:
print(f"Database error fetching artist '{artist_name}': {e}")
return default_info
finally:
if conn:
conn.close()
def get_bulk_artist_info(artist_names):
"""
Get information for multiple artists at once from the SQLite database.
Returns a dictionary mapping original artist names to their information.
"""
results = {}
default_img = "https://media.istockphoto.com/id/1298261537/vector/blank-man-profile-head-icon-placeholder.jpg?s=612x612&w=0&k=20&c=CeT1RVWZzQDay4t54ookMaFsdi7ZHVFg2Y5v7hxigCA="
# Initialize results with default info for all requested names
valid_artist_names = [name for name in artist_names if name] # Filter out None or empty strings
for name in valid_artist_names:
results[name] = {
'artist': name,
'artist_img': default_img,
'country': 'Unknown',
'artist_genre': 'Unknown'
}
# Add entry for None/empty string if it was in the original list
if None in artist_names or "" in artist_names:
if None not in results: results[None] = {'artist': 'Unknown Artist', 'artist_img': default_img, 'country': 'Unknown', 'artist_genre': 'Unknown'}
if "" not in results: results[""] = {'artist': 'Unknown Artist', 'artist_img': default_img, 'country': 'Unknown', 'artist_genre': 'Unknown'}
# Filter out empty names and normalize valid ones for query
normalized_map = {} # Map normalized name back to original(s)
normalized_names_to_query = []
for name in valid_artist_names:
normalized = _normalize_artist_name_for_db(name)
if normalized:
# Only add unique normalized names to the query list
if normalized not in normalized_map:
normalized_names_to_query.append(normalized)
normalized_map[normalized] = []
normalized_map[normalized].append(name) # Store original name(s)
if not normalized_names_to_query:
return results # Return defaults if no valid names provided
conn = None
try:
conn = _get_db_connection()
if not conn: return results # Return defaults if DB connection fails
cursor = conn.cursor()
# Create placeholders for the IN clause
placeholders = ','.join('?' * len(normalized_names_to_query))
query = f"SELECT artist_name_lower, artist_name, artist_genre, artist_img, country FROM artists WHERE artist_name_lower IN ({placeholders})"
cursor.execute(query, normalized_names_to_query)
rows = cursor.fetchall()
# Update results with fetched data
processed_originals = set()
for row in rows:
normalized_key = row['artist_name_lower']
if normalized_key in normalized_map:
# Update all original names that mapped to this normalized key
for original_name in normalized_map[normalized_key]:
# Use fetched name for consistency, fallback to original if somehow null
fetched_artist_name = row['artist_name'] or original_name
results[original_name] = {
'artist': fetched_artist_name,
'artist_img': row['artist_img'] or default_img,
'country': row['country'] or 'Unknown',
'artist_genre': row['artist_genre'] or 'Unknown'
}
processed_originals.add(original_name)
# Any original names whose normalized form wasn't found retain defaults
return results
except sqlite3.Error as e:
print(f"Database error during bulk fetch: {e}")
# Return the results dictionary which contains defaults for failed lookups
return results
finally:
if conn:
conn.close()
# Note: The global ARTIST_DATA and ARTIST_MAP are no longer used or needed.
# The load_artist_data function should be called once at application startup.
# The old normalize_artist_name function is replaced by _normalize_artist_name_for_db.