SongPorter / app.py
MonilM's picture
Reduced Logging
febe1b6
import asyncio
import logging
import os
import re # Added
import shutil # Added
import tempfile # Added
import time
from typing import List, Dict, Optional
from urllib.parse import quote, urlparse

import requests # Added
import uvicorn
import yt_dlp # Added
from fastapi import FastAPI, HTTPException, Request, Body
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, FileResponse # Added FileResponse
from fastapi.staticfiles import StaticFiles
from mutagen.id3 import ID3, TIT2, TPE1, TALB, APIC, TDRC, COMM # Added
from mutagen.mp3 import MP3 # Added
from pydantic import BaseModel, Field
# Optional project modules: the recommender and the artist helpers.
# When either module cannot be imported, inert stand-ins with the same call
# signatures are bound so the application can still start.
try:
    from recommendation import MusicRecommender, get_hardcoded_recommendations
    # load_artist_data is required by the startup code further down
    from artist_utils import get_bulk_artist_info, load_artist_data, get_artist_info
except ImportError:
    print("Warning: Recommendation or artist utils not found. Related endpoints might fail.") # Replaced logger
    MusicRecommender = None

    def get_hardcoded_recommendations(limit):
        """Stand-in: no hardcoded recommendations are available."""
        return []

    def get_bulk_artist_info(names):
        """Stand-in: no artist information is available."""
        return {}

    def load_artist_data():
        """Stand-in: always reports that artist data could not be loaded."""
        return False

    def get_artist_info(name):
        """Stand-in: echo the artist name with placeholder details."""
        return {
            'artist': name,
            'artist_img': None,
            'country': 'Unknown',
            'artist_genre': 'Unknown',
        }
# Logging setup: INFO and above, each record prefixed with timestamp,
# logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
# --- Initialize Artist Data ---
# The data is loaded exactly once; the outcome is remembered so the failure can
# also be reported through the logger further down without loading it again.
print("Attempting to load/verify artist data...")
_artist_data_loaded = bool(load_artist_data())
if _artist_data_loaded:
    print("Artist data loaded/verified successfully.")
else:
    # Using print instead of logger.error
    print("CRITICAL: Failed to load artist data on startup. Artist info endpoints might fail.")
    # Consider if the app should exit or continue with degraded functionality

# Initialize FastAPI app
app = FastAPI(
    title="SongPorter API",
    description="Music recommendation, artist info, and download API", # Updated description
    version="1.1.0", # Incremented version
)

# Add CORS middleware
# NOTE(review): wildcard origins combined with allow_credentials=True is
# discouraged by the FastAPI CORS documentation; list the real front-end
# origins here if credentialed requests are actually needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], # Allows all origins
    allow_credentials=True,
    allow_methods=["*"], # Allows all methods
    allow_headers=["*"], # Allows all headers
)

# --- Initialize Recommender ---
# `recommender` stays None when the class is unavailable or fails to construct;
# the recommendation endpoint checks for that and serves a fallback.
recommender = None
if MusicRecommender:
    try:
        logger.info("Initializing Music Recommender...")
        recommender = MusicRecommender()
        logger.info("Music Recommender loaded successfully.")
    except Exception as e:
        logger.error(f"Failed to load Music Recommender: {e}", exc_info=True)

# --- Report Artist Data status through the logger ---
if not _artist_data_loaded:
    logger.error("CRITICAL: Failed to load artist data on startup.")
    # App will run without artist data, but log errors.
# --- Utility Functions (Adapted from download_utils.py) ---
def sanitize_filename(filename, max_length=200):
    """Return *filename* reduced to text that is safe to use as a file name.

    Processing steps:
      * ``None`` is treated as an empty name; other non-string values are
        converted with ``str()``.
      * The characters ``\\ / * ? : " < > |`` and non-whitespace ASCII control
        characters (NUL, ESC, DEL, ...) are removed.
      * Runs of whitespace collapse to a single space and the ends are trimmed.
      * Names longer than *max_length* are cut at the last space inside the
        limit, or hard-cut at *max_length* when there is no space.

    Args:
        filename: Proposed file name (usually a title or artist string).
        max_length: Maximum number of characters to keep.

    Returns:
        The cleaned name, or ``"downloaded_file"`` when nothing is left.
    """
    text = "" if filename is None else str(filename)
    # Whitespace control characters (tab, newline, ...) are deliberately NOT in
    # this class: the next substitution turns them into a single space.
    text = re.sub(r'[\\/*?:"<>|\x00-\x08\x0e-\x1b\x7f]', "", text)
    text = re.sub(r'\s+', ' ', text).strip()
    if len(text) > max_length:
        head = text[:max_length]
        cut = head.rfind(' ')
        # Prefer breaking on a word boundary so a word is not chopped in half.
        text = head[:cut] if cut != -1 else head
    return text or "downloaded_file"
def embed_metadata_fastapi(mp3_path, title, artist, album=None, thumbnail_url=None, year=None, youtube_id=None):
    """Embed ID3 metadata (and optional cover art) into an MP3 file.

    Title and artist are always written. Album, year, YouTube ID and cover art
    are written only when the corresponding argument is truthy. Cover art is
    best-effort: a failed download or an unsupported image type is reported on
    stdout and the remaining tags are still saved.

    Args:
        mp3_path: Path of the MP3 file to tag in place.
        title: Track title (TIT2 frame).
        artist: Artist name (TPE1 frame).
        album: Optional album name (TALB frame).
        thumbnail_url: Optional URL of a JPEG or PNG image used as front cover.
        year: Optional year, stored as text (TDRC frame).
        youtube_id: Optional video ID, stored as a comment described
            ``'YouTube ID'``.

    Returns:
        True when the tags were saved, False when any unexpected error occurred.
    """
    try:
        print(f"Embedding metadata into: {mp3_path}") # Replaced logger
        audio = MP3(mp3_path, ID3=ID3)
        if audio.tags is None:
            audio.add_tags()
        audio.tags.add(TIT2(encoding=3, text=title))
        audio.tags.add(TPE1(encoding=3, text=artist))
        if album:
            audio.tags.add(TALB(encoding=3, text=album))
        if year:
            try:
                audio.tags.add(TDRC(encoding=3, text=str(year)))
            except ValueError:
                print(f"Warning: Invalid year format for metadata: {year}") # Replaced logger
        if youtube_id:
            audio.tags.add(COMM(encoding=3, lang='eng', desc='YouTube ID', text=youtube_id))
        if thumbnail_url:
            try:
                # stream=True postpones the body download until `.content` is
                # read, so unsupported image types are rejected from the headers
                # alone. The context manager closes the response on every path,
                # including the one where the body is never read.
                with requests.get(thumbnail_url, stream=True, timeout=10) as response:
                    response.raise_for_status()
                    mime_type = response.headers.get('content-type', 'image/jpeg').lower()
                    if 'jpeg' in mime_type or 'jpg' in mime_type:
                        img_format = 'image/jpeg'
                    elif 'png' in mime_type:
                        img_format = 'image/png'
                    else:
                        img_format = None
                    if img_format:
                        image_data = response.content
                        audio.tags.add(APIC(encoding=3, mime=img_format, type=3, desc='Cover', data=image_data))
                        print(f"Successfully embedded cover art from {thumbnail_url}") # Replaced logger
                    else:
                        print(f"Warning: Unsupported image format for cover art: {mime_type}") # Replaced logger
            except requests.exceptions.RequestException as e:
                print(f"Error: Failed to download cover art from {thumbnail_url}: {e}") # Replaced logger
            except Exception as e:
                print(f"Error: Failed to embed cover art: {e}") # Replaced logger
        audio.save()
        print(f"Metadata embedded successfully for {os.path.basename(mp3_path)}") # Replaced logger
        return True # Indicate success
    except Exception as e:
        print(f"Error embedding metadata for {os.path.basename(mp3_path)}: {e}") # Replaced logger
        # Consider adding traceback print here: import traceback; traceback.print_exc()
        return False # Indicate failure
# --- API Input Models ---
class ArtistInfoRequestData(BaseModel):
    # Request body of POST /artist-info/.
    # artist_name is required; the example value only feeds the API docs.
    artist_name: str = Field(default=..., example="Artist Name 1")
class SongInput(BaseModel):
    # One song reference inside a recommendation request.
    # Every field is optional and defaults to None; the recommendation
    # endpoint only reads spotify_id.
    spotify_id: Optional[str] = None
    title: Optional[str] = None
    artist: Optional[str] = None
class RecommendationRequestData(BaseModel):
    # Request body of POST /recommendations/.
    # songs: required list of seed songs.
    songs: List[SongInput] = Field(
        default=...,
        example=[{"spotify_id": "id1", "title": "Song A", "artist": "Artist X"}],
    )
    # limit: how many recommendations to return (10 when omitted).
    limit: Optional[int] = 10
# --- NEW: Download Request Model ---
class DownloadRequestData(BaseModel):
    # Request body of POST /download-youtube/.
    # url: required YouTube video (or search results) URL.
    url: str = Field(default=..., example="https://www.youtube.com/watch?v=dQw4w9WgXcQ")
    # format: requested audio format, "mp3" when omitted.
    format: Optional[str] = Field(default="mp3", example="mp3") # Default to mp3
# --- Request timing middleware ---
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Attach an ``X-Process-Time`` header holding the handling time in seconds.

    Args:
        request: The incoming request.
        call_next: Callable that passes the request on and returns the response.

    Returns:
        The downstream response with the extra header set.
    """
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or go negative) when the system clock is adjusted mid-request.
    start_time = time.perf_counter()
    response = await call_next(request)
    process_time = time.perf_counter() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    return response
# --- API Endpoints ---
def _fetch_spotify_thumbnail(spotify_id):
    """Return the Spotify oEmbed thumbnail URL for a track, or None on any failure.

    Blocking network call; failures are logged and never raised.
    """
    oembed_url = f"https://open.spotify.com/oembed?url=https://open.spotify.com/track/{spotify_id}"
    try:
        response = requests.get(oembed_url, timeout=5) # Add timeout
        response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
        return response.json().get('thumbnail_url')
    except requests.exceptions.RequestException as e:
        logger.warning(f"Failed to fetch thumbnail for Spotify ID {spotify_id}: {e}")
    except Exception as e:
        logger.error(f"Unexpected error fetching thumbnail for {spotify_id}: {e}", exc_info=True)
    return None


@app.post("/recommendations/")
async def get_recommendations_endpoint(request_data: RecommendationRequestData):
    """Return song recommendations for the songs in the request.

    Similar songs are collected for the first five Spotify IDs, de-duplicated,
    ranked by how often they were suggested and then by popularity, truncated
    to ``limit`` and decorated with a Spotify track URL and thumbnail.

    Falls back to hardcoded recommendations when no recommender is loaded and
    to the recommender's popular songs when nothing similar is found.

    Raises:
        HTTPException: 500 when recommendation generation fails unexpectedly.
    """
    # A missing, zero or negative limit falls back to the default of 10;
    # a negative value would otherwise silently slice from the wrong end.
    requested_limit = request_data.limit
    limit = requested_limit if requested_limit and requested_limit > 0 else 10

    if recommender is None:
        logger.error("Recommender not available.")
        # Return hardcoded or popular songs as fallback
        return {"recommendations": get_hardcoded_recommendations(limit),
                "message": "Recommender unavailable, returning popular songs."}
    try:
        recent_song_ids = [song.spotify_id for song in request_data.songs if song.spotify_id]
        top_genres = [] # Derive if needed, or let recommender handle empty
        logger.info(f"Received recommendation request. Extracted IDs: {recent_song_ids}, Derived Genres: {top_genres}, Limit: {limit}")

        all_recommendations = []
        for song_id in recent_song_ids[:5]:
            song_recommendations = recommender.find_similar_songs(song_id, n=20)
            if song_recommendations:
                all_recommendations.extend(song_recommendations)
        # top_genres is currently always empty, so this loop is a no-op kept as
        # the hook for genre-based recommendations.
        for genre in top_genres[:3]:
            if genre and genre != 'Unknown':
                genre_recommendations = recommender.get_recommendations_by_genre(genre, n=10)
                if genre_recommendations:
                    all_recommendations.extend(genre_recommendations)

        if not all_recommendations:
            logger.warning("No recommendations generated, falling back to popular.")
            popular_songs = recommender.get_popular_songs(limit)
            return {"recommendations": popular_songs}

        # De-duplicate by Spotify ID (or title) and count repeated suggestions.
        recommendation_dict = {}
        for rec in all_recommendations:
            key = rec.get('spotify_id') or rec.get('title')
            if not key:
                continue
            if key in recommendation_dict:
                recommendation_dict[key]['count'] = recommendation_dict[key].get('count', 1) + 1
            else:
                rec['count'] = 1
                recommendation_dict[key] = rec

        final_recommendations = list(recommendation_dict.values())
        final_recommendations.sort(key=lambda x: (x.get('count', 0), x.get('popularity', 0)), reverse=True)
        # Limit the list before fetching thumbnails
        limited_recommendations = final_recommendations[:limit]

        # --- Fetch Thumbnails and Construct Song URL ---
        logger.info(f"Fetching thumbnails and constructing URLs for {len(limited_recommendations)} recommendations...")
        # The oEmbed lookups are blocking HTTP calls. They are handed to the
        # default executor so they run concurrently and do not stall the event
        # loop for other requests.
        loop = asyncio.get_running_loop()
        pending_thumbnails = []
        for rec in limited_recommendations:
            spotify_id = rec.get('spotify_id')
            rec['thumbnail_url'] = None # Initialize thumbnail_url
            rec['song_url'] = None # Initialize song_url
            if spotify_id:
                rec['song_url'] = f"https://open.spotify.com/track/{spotify_id}" # Construct song URL
                pending_thumbnails.append(
                    (rec, loop.run_in_executor(None, _fetch_spotify_thumbnail, spotify_id))
                )
        for rec, thumbnail_future in pending_thumbnails:
            rec['thumbnail_url'] = await thumbnail_future
        logger.info(f"Generated {len(limited_recommendations)} recommendations with thumbnails and URLs.")

        # Ensure the response structure matches the desired format
        response_recommendations = [
            {
                "title": rec.get("title"),
                "artist": rec.get("artist"),
                "album": rec.get("album"),
                "spotify_id": rec.get("spotify_id"),
                "popularity": rec.get("popularity"),
                "thumbnail_url": rec.get("thumbnail_url"),
                "song_url": rec.get("song_url")
            }
            for rec in limited_recommendations # Use the limited list
        ]
        return {"recommendations": response_recommendations}
    except Exception as e:
        logger.error(f"Error generating recommendations: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to generate recommendations: {str(e)}")
@app.post("/artist-info/")
async def get_artist_info_endpoint(request_data: ArtistInfoRequestData):
    """Return image, country and genre information for one artist.

    Returns:
        A dict with the keys ``artist``, ``artist_img``, ``country`` and
        ``artist_genre``; missing values fall back to the requested name,
        ``None`` and ``'Unknown'`` respectively.

    Raises:
        HTTPException: 400 when ``artist_name`` is empty, 500 when the lookup
            fails unexpectedly.
    """
    artist_name = request_data.artist_name
    logger.info(f"Received artist info request for artist: {artist_name}")
    # Validate before entering the try block: inside it, the generic handler
    # below would turn this client error into a 500.
    if not artist_name:
        raise HTTPException(status_code=400, detail="artist_name field cannot be empty")
    try:
        # A None result is treated like "nothing known" so the defaults apply.
        artist_info = get_artist_info(artist_name) or {}
        logger.info(f"Returning info for artist: {artist_name}")
        # Return in the exact format expected by Django's ArtistSerializer
        # (standard JSON key/value pairs).
        return {
            'artist': artist_info.get('artist', artist_name),
            'artist_img': artist_info.get('artist_img'),
            'country': artist_info.get('country', 'Unknown'),
            'artist_genre': artist_info.get('artist_genre', 'Unknown')
        }
    except HTTPException:
        # Deliberate HTTP errors must reach the client with their own status.
        raise
    except Exception as e:
        logger.error(f"Error fetching artist info: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to fetch artist info: {str(e)}")
# --- NEW: YouTube Download Endpoint ---
# Read size used when streaming the finished audio file back to the client.
_STREAM_CHUNK_BYTES = 64 * 1024


def _is_youtube_url(url):
    """Return True when the host of *url* is youtube.com, youtu.be or a subdomain."""
    candidate = url.strip()
    # Accept scheme-less input such as "youtube.com/watch?v=..." as before.
    if not re.match(r'^[A-Za-z][A-Za-z0-9+.-]*://', candidate):
        candidate = f"https://{candidate}"
    try:
        parts = urlparse(candidate)
        host = (parts.hostname or "").lower()
    except ValueError:
        return False
    if parts.scheme not in ("http", "https"):
        return False
    return any(host == domain or host.endswith(f".{domain}") for domain in ("youtube.com", "youtu.be"))


def _header_safe(value):
    """Return *value* as text that can be sent as an HTTP header value.

    Control characters become spaces. Text that cannot be encoded as latin-1
    is percent-encoded as UTF-8; everything else is passed through unchanged.
    """
    text = re.sub(r'[\x00-\x1f\x7f]+', ' ', str(value)).strip()
    try:
        text.encode("latin-1")
    except UnicodeEncodeError:
        return quote(text)
    return text


def _content_disposition(filename):
    """Build an ``attachment`` Content-Disposition value for *filename*.

    Names outside latin-1 use the ``filename*=UTF-8''...`` form.
    """
    clean = re.sub(r'[\x00-\x1f\x7f"\\]+', '', filename)
    try:
        clean.encode("latin-1")
    except UnicodeEncodeError:
        return f"attachment; filename*=UTF-8''{quote(clean)}"
    return f'attachment; filename="{clean}"'


def _resolve_first_search_result(url, ydl_opts):
    """Return the watch URL of the first video listed by a YouTube search URL.

    Raises:
        HTTPException: 404 when the search yields no usable video.
    """
    print(f"Finding first video from search: {url}")
    search_opts = dict(ydl_opts)
    search_opts['quiet'] = False # Show output for debugging
    search_opts['extract_flat'] = True
    search_opts['force_generic_extractor'] = False
    search_opts['noplaylist'] = False
    # Add options to limit search results
    search_opts['playlistend'] = 1 # Stop after first result
    search_opts['max_downloads'] = 1 # Only download 1 entry
    with yt_dlp.YoutubeDL(search_opts) as ydl:
        info = ydl.extract_info(url, download=False)
    entries = info.get('entries') if info else None
    # next(iter(...)) works whether the entries are a list or a lazy iterable.
    first_entry = next(iter(entries), None) if entries else None
    if first_entry is None:
        print("No search results found")
        raise HTTPException(status_code=404, detail="No videos found in search results")
    if first_entry.get('_type') != 'playlist' and first_entry.get('id'):
        print(f"Found first result: {first_entry.get('title', 'Unknown')} (ID: {first_entry['id']})")
        return f"https://www.youtube.com/watch?v={first_entry['id']}"
    print("No suitable video found in search results")
    raise HTTPException(status_code=404, detail="No videos found in search results")


def _locate_audio_file(temp_dir, expected_prefix, output_format):
    """Return the path of the converted audio file inside *temp_dir*, or None."""
    suffix = f'.{output_format}'
    names = os.listdir(temp_dir)
    # Find the downloaded audio file (yt-dlp might rename it slightly)
    for filename in names:
        if filename.startswith(expected_prefix) and filename.endswith(suffix):
            audio_path = os.path.join(temp_dir, filename)
            print(f"Found downloaded audio file: {audio_path}") # Replaced logger
            return audio_path
    # Fallback search if exact name match failed
    for filename in names:
        if filename.endswith(suffix):
            audio_path = os.path.join(temp_dir, filename)
            print(f"Warning: Found fallback audio file: {audio_path}") # Replaced logger
            return audio_path
    return None


def _download_audio(url, output_format, temp_dir, is_search_query):
    """Download *url* with yt-dlp into *temp_dir* and convert it to *output_format*.

    This function blocks for the whole download and conversion, so it must be
    run off the event loop.

    Returns:
        ``(audio_path, info)`` where *info* is the dict returned by yt-dlp.

    Raises:
        HTTPException: 404 for empty searches, 502 for yt-dlp download errors,
            500 when no audio file is produced or anything else fails.
    """
    base_temp_filename = os.path.join(temp_dir, f'download_{int(time.time())}')
    cookie_src_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cookies.txt")
    cookie_dst_path = os.path.join(temp_dir, "cookies.txt")
    if os.path.exists(cookie_src_path):
        try:
            # Copy cookies.txt to a temp location to avoid permission issues
            shutil.copy(cookie_src_path, cookie_dst_path)
            os.chmod(cookie_dst_path, 0o644)
        except Exception as e:
            print(f"Warning: Could not copy/chmod cookies.txt: {e}")

    # Base yt-dlp options
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': output_format,
            'preferredquality': '192', # Standard quality
        }],
        'outtmpl': f'{base_temp_filename}.%(ext)s',
        'noplaylist': True,
        'quiet': True,
        'no_warnings': True,
        'logtostderr': False,
        'ignoreerrors': False,
        'max_filesize': 500 * 1024 * 1024, # Limit download size to 500MB
    }
    if os.path.exists(cookie_dst_path):
        ydl_opts['cookiefile'] = cookie_dst_path

    try:
        # If this is a search URL, find the first video result
        actual_download_url = url
        if is_search_query:
            actual_download_url = _resolve_first_search_result(url, ydl_opts)

        # Now download the actual video
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            print(f"Starting yt-dlp download for: {actual_download_url}") # Replaced logger
            downloaded_info = ydl.extract_info(actual_download_url, download=True)
            print(f"yt-dlp download finished for: {actual_download_url}") # Replaced logger

        final_audio_path = _locate_audio_file(temp_dir, os.path.basename(base_temp_filename), output_format)
        if not final_audio_path or not os.path.exists(final_audio_path):
            raise FileNotFoundError(f"Could not locate the downloaded {output_format} file in {temp_dir}")
        return final_audio_path, downloaded_info
    except HTTPException:
        # Keep deliberate statuses (e.g. the 404 for an empty search) intact
        # instead of letting the generic handler below rewrite them to 500.
        raise
    except yt_dlp.utils.DownloadError as e:
        print(f"Error: yt-dlp download error for {url}: {e}") # Replaced logger
        raise HTTPException(status_code=502, detail=f"Failed to download from YouTube: {e}")
    except FileNotFoundError as e:
        print(f"Error: File not found after download attempt for {url}: {e}") # Replaced logger
        raise HTTPException(status_code=500, detail="Download process failed to produce audio file.")
    except Exception as e:
        print(f"Error: Unexpected error during YouTube download for {url}: {e}") # Replaced logger
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred during download: {e}")


@app.post("/download-youtube/")
async def download_youtube_endpoint(request_data: DownloadRequestData):
    """
    Downloads audio from a YouTube URL, embeds metadata, and returns the file.
    If a search query URL is provided, downloads the first result.

    The requested format is honoured for 'mp3' and 'aac'; anything else falls
    back to 'mp3'. Metadata is embedded for mp3 only. The audio is streamed
    from a per-request temporary directory which is removed once streaming
    ends, or immediately when an error occurs.

    Raises:
        HTTPException: 400 for non-YouTube or playlist URLs, 404 when a search
            has no results, 502 when yt-dlp cannot download, 500 otherwise.
    """
    url = request_data.url
    output_format = request_data.format if request_data.format in ['mp3', 'aac'] else 'mp3' # Validate format
    temp_dir = None # Initialize temp_dir

    # Check if URL is a search query
    is_search_query = False
    if "youtube.com/results" in url or "youtube.com/search" in url:
        is_search_query = True
        print(f"Detected YouTube search URL: {url}, will attempt to download first result")

    # URL validation: check the host itself, because a substring test also
    # accepts unrelated sites that merely mention youtube.com in their URL.
    if not _is_youtube_url(url):
        raise HTTPException(status_code=400, detail="Invalid YouTube URL provided.")
    if 'list=' in url and not is_search_query:
        raise HTTPException(status_code=400, detail="Playlist URLs are not supported by this endpoint.")

    try:
        print(f"Received download request for YouTube URL: {url} (Format: {output_format})") # Replaced logger
        temp_dir = tempfile.mkdtemp()

        # yt-dlp and ffmpeg block for a long time; run them in the default
        # executor so other requests keep being served meanwhile.
        loop = asyncio.get_running_loop()
        final_audio_path, downloaded_info = await loop.run_in_executor(
            None, _download_audio, url, output_format, temp_dir, is_search_query
        )

        # `or` (rather than a .get default) also covers keys that are present
        # with a None value, which would otherwise break filename and header
        # construction below.
        title = artist = None
        if downloaded_info:
            title = downloaded_info.get('title') or 'Unknown Title'
            artist = downloaded_info.get('uploader') or downloaded_info.get('channel') or 'Unknown Artist'

        # --- Metadata Embedding ---
        if downloaded_info and output_format == 'mp3': # Only embed for mp3 currently
            upload_date = downloaded_info.get('upload_date')
            year = str(upload_date)[:4] if upload_date else None
            # Cover-art download and tag writing are blocking as well.
            await loop.run_in_executor(
                None,
                lambda: embed_metadata_fastapi(
                    mp3_path=final_audio_path,
                    title=title,
                    artist=artist,
                    album=downloaded_info.get('album'),
                    thumbnail_url=downloaded_info.get('thumbnail'),
                    year=year,
                    youtube_id=downloaded_info.get('id'),
                ),
            )
        elif output_format != 'mp3':
            print(f"Warning: Metadata embedding skipped for non-mp3 format: {output_format}") # Replaced logger
        else:
            print("Warning: Metadata embedding skipped as download info was not available.") # Replaced logger

        # --- Prepare Response ---
        # Generate a user-friendly filename
        final_filename_user = "downloaded_track." + output_format
        if downloaded_info:
            final_filename_user = f"{sanitize_filename(title)} - {sanitize_filename(artist)}.{output_format}"
        print(f"Preparing FileResponse for '{final_filename_user}'") # Replaced logger

        # Header values must be latin-1 encodable; _header_safe and
        # _content_disposition take care of titles in other scripts.
        headers = {'Content-Disposition': _content_disposition(final_filename_user)}
        if downloaded_info:
            headers['X-Song-Title'] = _header_safe(title)
            headers['X-Song-Artist'] = _header_safe(artist)
            if downloaded_info.get('album'):
                headers['X-Song-Album'] = _header_safe(downloaded_info.get('album'))
            if downloaded_info.get('upload_date'):
                headers['X-Song-Year'] = _header_safe(str(downloaded_info.get('upload_date'))[:4])
            if downloaded_info.get('thumbnail'):
                headers['X-Thumbnail-URL'] = _header_safe(downloaded_info.get('thumbnail'))
            if downloaded_info.get('id'):
                headers['X-YouTube-ID'] = _header_safe(downloaded_info.get('id'))
            if downloaded_info.get('duration'):
                headers['X-Duration-Seconds'] = str(int(downloaded_info.get('duration')))

        def iterfile(path=final_audio_path, cleanup_dir=temp_dir):
            # Stream straight from the temporary directory and remove it when
            # the generator finishes or is closed, so no copy is left behind
            # and the file cannot disappear while it is still being sent.
            try:
                with open(path, mode="rb") as file_like:
                    yield from iter(lambda: file_like.read(_STREAM_CHUNK_BYTES), b"")
            finally:
                shutil.rmtree(cleanup_dir, ignore_errors=True)
                print(f"Cleaned up temporary directory: {cleanup_dir}")

        return StreamingResponse(
            iterfile(),
            media_type=f'audio/{output_format}',
            headers=headers
        )
    except HTTPException as http_exc:
        # Re-raise HTTPExceptions directly
        print(f"HTTP Exception: {http_exc.status_code} - {http_exc.detail}") # Optional: print HTTP exceptions
        if temp_dir and os.path.exists(temp_dir):
            shutil.rmtree(temp_dir, ignore_errors=True)
            print(f"Cleaned up temp dir due to HTTPException: {temp_dir}")
        raise http_exc
    except Exception as e:
        print(f"Error: Unexpected error in download endpoint for {url}: {e}") # Replaced logger
        if temp_dir and os.path.exists(temp_dir):
            shutil.rmtree(temp_dir, ignore_errors=True) # Cleanup on unexpected errors
            print(f"Cleaned up temp dir due to unexpected error: {temp_dir}")
        raise HTTPException(status_code=500, detail=f"An unexpected server error occurred: {str(e)}")
@app.get("/")
async def root():
    """Describe the API: its name, the available endpoints and the version."""
    # (path, method, description) for every route listed in the help message.
    routes = (
        ("/", "GET", "This help message"),
        ("/recommendations/", "POST", "Get song recommendations"),
        ("/artist-info/", "POST", "Get artist information"),
        ("/download-youtube/", "POST", "Download audio from YouTube URL"), # Added endpoint info
    )
    endpoint_list = [
        {"path": path, "method": method, "description": description}
        for path, method, description in routes
    ]
    return {
        "message": "SongPorter API",
        "endpoints": endpoint_list,
        "version": "1.1.0", # Updated version
    }
# --- Static files ---
# A "static" directory sitting next to this module is served under /static;
# when no such path exists nothing is mounted.
static_dir = os.path.join(os.path.split(os.path.abspath(__file__))[0], "static")
if os.path.exists(static_dir):
    app.mount("/static", StaticFiles(directory=static_dir), name="static")
    logger.info(f"Mounted static files from {static_dir}")
#final 2
# --- Local development entry point ---
# Starts uvicorn with auto-reload when this file is executed directly, unless
# DEPLOYMENT_ENV is set to 'huggingface'.
if __name__ == "__main__":
    if os.environ.get('DEPLOYMENT_ENV') != 'huggingface':
        # PORT may be overridden through the environment; 8000 is the local default.
        port = int(os.environ.get("PORT", 8000))
        uvicorn.run("app:app", host="0.0.0.0", port=port, reload=True)