from flask import Flask, render_template, jsonify, request from ytmusicapi import YTMusic import os import logging import requests from datetime import datetime, timedelta import time import asyncio import cloudscraper from pydantic import BaseModel from urllib.parse import urlparse, parse_qs from collections import defaultdict import threading app = Flask(__name__) ytmusic = YTMusic() # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @app.route('/') def index(): return render_template('index.html') @app.route('/search', methods=['POST']) def search(): query = request.json.get('query', '') search_results = ytmusic.search(query, filter="songs") return jsonify(search_results) @app.route('/searcht', methods=['POST']) def searcht(): query = request.json.get('query', '') logger.info(f"serch query: {query}") search_results = ytmusic.search(query, filter="songs") first_song = next((song for song in search_results if 'videoId' in song and song['videoId']), {}) if search_results else {} return jsonify(first_song) # Function to extract track ID from Amazon Music URL def extract_amazon_track_id(url: str): if "music.amazon.com" in url: # Case 1: URL contains trackAsin (e.g., https://music.amazon.com/albums/B01N48U32A?trackAsin=B01NAE38YO&do=play) parsed_url = urlparse(url) query_params = parse_qs(parsed_url.query) if "trackAsin" in query_params: return query_params["trackAsin"][0] # Case 2: URL is a direct track link (e.g., https://music.amazon.com/tracks/B0DNTPYT5S) if "/tracks/" in url: return url.split("/tracks/")[-1].split("?")[0] return None # Function to get track info from Song.link API def get_song_link_info(url: str): # Check if the URL is from Amazon Music if "music.amazon.com" in url: track_id = extract_amazon_track_id(url) if track_id: # Use the working format for Amazon Music tracks api_url = f"https://api.song.link/v1-alpha.1/links?type=song&platform=amazonMusic&id={track_id}&userCountry=US" else: # If no track ID is found, use the original URL api_url = f"https://api.song.link/v1-alpha.1/links?url={url}&userCountry=US" else: # For non-Amazon Music URLs, use the standard format api_url = f"https://api.song.link/v1-alpha.1/links?url={url}&userCountry=US" # Make the API call response = requests.get(api_url) if response.status_code == 200: return response.json() else: return None # Function to extract Tidal or YouTube URL def extract_url(links_by_platform: dict, platform: str): if platform in links_by_platform: return links_by_platform[platform]["url"] return None # Function to extract track title and artist from entities def extract_track_info(entities_by_unique_id: dict, platform: str): for entity in entities_by_unique_id.values(): if entity["apiProvider"] == platform: return entity["title"], entity["artistName"] return None, None @app.route('/match', methods=['POST']) async def match(): data = request.json track_url = data.get('url') if not track_url: raise HTTPException(status_code=400, detail="No URL provided") track_info = get_song_link_info(track_url) if not track_info: raise HTTPException(status_code=404, detail="Could not fetch track info") youtube_url = extract_url(track_info["linksByPlatform"], "youtube") entityUniqueId = track_info["entityUniqueId"] logger.info(f"songlink info: {entityUniqueId}") title = track_info["entitiesByUniqueId"][entityUniqueId]["title"] artist = track_info["entitiesByUniqueId"][entityUniqueId]["artistName"] if youtube_url: video_id = youtube_url.split("v=")[1] if "v=" in youtube_url else None if title and artist: filename = f"{title} - {artist}" return {"url": youtube_url, "filename": filename, "track_id": video_id} else: return {"url": youtube_url, "filename": "Unknown Track - Unknown Artist", "track_id": video_id} else: search_query = f'{title}+{artist}' search_results = ytmusic.search(search_query, filter="songs") first_song = next((song for song in search_results if 'videoId' in song and song['videoId']), {}) if search_results else {} if 'videoId' in first_song: videoId = first_song["videoId"] ym_url = f'https://www.youtube.com/watch?v={videoId}' return {"filename": search_query, "url": ym_url, "track_id": videoId} else: raise HTTPException(status_code=404, detail="Video ID not found") # If no URLs found, return an error raise HTTPException(status_code=404, detail="No matching URL found") class ApiRotator: def __init__(self, apis): self.apis = apis self.last_successful_index = None def get_prioritized_apis(self): if self.last_successful_index is not None: # Move the last successful API to the front rotated_apis = ( [self.apis[self.last_successful_index]] + self.apis[:self.last_successful_index] + self.apis[self.last_successful_index+1:] ) return rotated_apis return self.apis def update_last_successful(self, index): self.last_successful_index = index # In your function: api_rotator = ApiRotator([ "https://cobalt-api.ayo.tf", "https://dwnld.nichind.dev", "https://cobalt-api.kwiatekmiki.com", "http://34.107.254.11" ]) async def get_track_download_url(track_id: str, quality: str) -> str: apis = api_rotator.get_prioritized_apis() session = cloudscraper.create_scraper() # Requires cloudscraper package headers = { "Accept": "application/json", "Content-Type": "application/json", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" } for i, api_url in enumerate(apis): try: logger.info(f"Attempting to get download URL from: {api_url}") y_url = f"https://youtu.be/{track_id}" response = session.post( api_url, timeout=20, json={"url": y_url, "audioFormat": "mp3", "downloadMode": "audio", "audioBitrate": quality}, headers=headers ) logger.info(f"Response status: {response.status_code}") logger.info(f"Response content: {response.content}") if response.headers.get('content-type', '').startswith('application/json'): json_response = response.json() error_code = json_response.get("error", {}).get("code", "") if error_code == "error.api.content.video.unavailable": logger.warning(f"Video unavailable error from {api_url}") break # Only break for specific error if "url" in json_response: api_rotator.update_last_successful(i) return json_response["url"] except Exception as e: logger.error(f"Failed with {api_url}: {str(e)}") continue logger.error(f"No download URL found") return {"error": "Download URL not found"} async def get_audio_download_url(track_id: str, quality: str) -> str: youtube_url = f'https://www.youtube.com/watch?v={track_id}' donwnload_url = f'https://chrunos-shadl.hf.space/yt/dl?url={youtube_url}&type=audio' return donwnload_url @app.route('/track_dl', methods=['POST']) async def track_dl(): data = request.get_json() track_id = data.get('track_id') quality = data.get('quality', '128') try: quality_num = int(quality) if quality_num > 128 or quality.upper() == 'FLAC': return jsonify({ "error": "Quality above 128 or FLAC is for Premium users Only.", "premium": "https://chrunos.com/premium-shortcuts/" }), 400 dl_url = await get_track_download_url(track_id, quality) logger.info(dl_url) if dl_url and "http" in dl_url: result = { "url": dl_url, "premium": "https://chrunos.com/premium-shortcuts/" } return jsonify(result) else: return jsonify({ "error": "Failed to Fetch the Track.", "premium": "https://chrunos.com/premium-shortcuts/" }), 400 except ValueError: return jsonify({ "error": "Invalid quality value provided. It should be a valid integer or FLAC.", "premium": "https://chrunos.com/premium-shortcuts/" }), 400 @app.route('/get_artist', methods=['GET']) def get_artist(): artist_id = request.args.get('id') artist_info = ytmusic.get_artist(artist_id) return jsonify(artist_info) @app.route('/get_album', methods=['GET']) def get_album(): album_id = request.args.get('id') album_info = ytmusic.get_album(album_id) return jsonify(album_info) @app.route('/get_song', methods=['GET']) def get_song(): song_id = request.args.get('id') song_info = ytmusic.get_song(song_id) return jsonify(song_info) if __name__ == '__main__': app.run(host='0.0.0.0', port=7860)