# OpenSUNO / app.py
# Copied from the Hugging Face file view (uploader avatar: ginipick).
# Commit: "Update app.py" (fba7d92, verified); file size 21.9 kB.
import gradio as gr
import subprocess
import os
import shutil
import tempfile
import torch
import logging
import numpy as np
import re
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
# Logging configuration: INFO-level records are written both to the file
# 'yue_generation.log' and to a default stream handler.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[
        logging.FileHandler('yue_generation.log'),
        logging.StreamHandler(),
    ],
)
###############################################
# Previously defined functions and core logic #
###############################################
def optimize_gpu_settings():
    """Apply process-wide PyTorch/CUDA settings when a CUDA device is available.

    Returns immediately (doing nothing) when ``torch.cuda.is_available()`` is
    false. Otherwise it enables TF32 matmul, turns on cuDNN with benchmarking
    and non-deterministic mode, empties the CUDA cache, selects device 0, sets
    the ``PYTORCH_CUDA_ALLOC_CONF`` environment variable, and logs the device
    name and total memory. When the device name contains ``'L40S'`` the
    per-process memory fraction is set to 0.95.
    """
    if not torch.cuda.is_available():
        return

    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.enabled = True
    torch.backends.cudnn.benchmark = True
    torch.backends.cudnn.deterministic = False

    torch.cuda.empty_cache()
    torch.cuda.set_device(0)
    # NOTE(review): the stream object created here is discarded immediately;
    # confirm whether this call is needed at all.
    torch.cuda.Stream(0)

    os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:512'

    device_name = torch.cuda.get_device_name(0)
    total_memory_gb = torch.cuda.get_device_properties(0).total_memory / 1024**3
    logging.info(f"Using GPU: {device_name}")
    logging.info(f"Available GPU memory: {total_memory_gb:.2f} GB")

    if 'L40S' in device_name:
        torch.cuda.set_per_process_memory_fraction(0.95)
import logging
def analyze_lyrics(lyrics, repeat_chorus=2):
    """Split lyrics into ``[verse]`` / ``[chorus]`` / ``[bridge]`` sections.

    Lines are stripped and blank lines dropped. If nothing is left, a single
    ``'.'`` line is used; if the last line is exactly one of the three tags, a
    ``'.'`` line is appended so that the final section is not empty.

    A line that contains a tag (case-insensitive; ``[verse]`` takes priority
    over ``[chorus]``, which takes priority over ``[bridge]``) starts a new
    section. The tag line itself is never stored, and lines that appear
    before the first tag are not assigned to any section.

    Args:
        lyrics: Raw lyrics text.
        repeat_chorus: When greater than 1 and at least one chorus tag was
            seen, the collected chorus lines are repeated this many times
            in total.

    Returns:
        A 4-tuple ``(sections, total_sections, total_lines, section_lines)``:
        ``sections`` maps each section name to its tag count plus
        ``'total_lines'``; ``total_sections`` is the sum of the three tag
        counts; ``total_lines`` is the number of non-blank lines (tag lines
        included); ``section_lines`` maps each section name to its lines.
    """
    section_names = ('verse', 'chorus', 'bridge')

    # Trim every line and discard the blank ones.
    trimmed = (raw.strip() for raw in lyrics.split('\n'))
    lines = [text for text in trimmed if text]

    if not lines:
        # Completely empty input: force a single placeholder line.
        lines = ['.']
    elif lines[-1].lower() in ['[verse]', '[chorus]', '[bridge]']:
        # A trailing bare tag gets a placeholder lyric line.
        lines.append('.')

    sections = {name: 0 for name in section_names}
    sections['total_lines'] = len(lines)
    section_lines = {name: [] for name in section_names}

    current_section = None
    body_start = 0
    for index, line in enumerate(lines):
        lowered = line.lower()
        # First matching tag wins, in verse/chorus/bridge order.
        tag = next((name for name in section_names if f'[{name}]' in lowered), None)
        if tag is None:
            continue
        # Close the section that was open before this tag line.
        if current_section is not None:
            section_lines[current_section].extend(lines[body_start:index])
        current_section = tag
        sections[tag] += 1
        body_start = index + 1

    # Flush whatever follows the last tag into the last section.
    if current_section is not None:
        section_lines[current_section].extend(lines[body_start:])

    # Chorus repetition.
    if sections['chorus'] > 0 and repeat_chorus > 1:
        repeats = section_lines['chorus'] * (repeat_chorus - 1)
        section_lines['chorus'].extend(repeats)

    # Log the per-section line counts.
    logging.info(
        f"Section line counts - Verse: {len(section_lines['verse'])}, "
        f"Chorus: {len(section_lines['chorus'])}, "
        f"Bridge: {len(section_lines['bridge'])}"
    )

    total_sections = sections['verse'] + sections['chorus'] + sections['bridge']
    return sections, total_sections, len(lines), section_lines
def calculate_generation_params(lyrics):
    """Derive token budget, segment count and duration estimate from lyrics.

    The lyrics are analysed with ``analyze_lyrics`` (default chorus
    repetition). Each section line contributes a fixed number of seconds
    (verse 4, chorus 6, bridge 5); the sum is scaled by 1.2 and floored at 60.
    The token budget is 3000 base + 200 per non-blank line + 1000 extra,
    capped at 12000.

    Args:
        lyrics: Raw lyrics text.

    Returns:
        A dict with keys ``max_tokens``, ``num_segments`` (4 when a chorus
        tag is present, otherwise 3), ``sections``, ``section_lines``,
        ``estimated_duration``, ``section_durations`` and ``has_chorus``.
    """
    sections, _, total_lines, section_lines = analyze_lyrics(lyrics)
    has_chorus = sections['chorus'] > 0

    seconds_per_line = {'verse': 4, 'chorus': 6, 'bridge': 5}
    section_durations = {
        name: len(section_lines[name]) * seconds
        for name, seconds in seconds_per_line.items()
    }
    # 20% headroom on top of the raw sum, never below one minute.
    estimated_duration = max(60, int(sum(section_durations.values()) * 1.2))

    # base tokens + per-line tokens + extra tokens
    token_budget = 3000 + (total_lines * 200) + 1000

    return {
        'max_tokens': min(12000, token_budget),
        'num_segments': 4 if has_chorus else 3,
        'sections': sections,
        'section_lines': section_lines,
        'estimated_duration': estimated_duration,
        'section_durations': section_durations,
        'has_chorus': has_chorus
    }
def create_temp_file(content, prefix, suffix=".txt"):
    """Write ``content`` to a new named temporary file and return its path.

    The text is stripped, terminated with two newlines, and has ``\\r\\n`` /
    ``\\r`` line endings normalised to ``\\n`` before being written.

    Args:
        content: Text to store.
        prefix: File-name prefix for the temporary file.
        suffix: File-name suffix (defaults to ``".txt"``).

    Returns:
        The path of the created file. The file is NOT deleted automatically
        (``delete=False``); the caller is responsible for removing it.
    """
    normalized = (content.strip() + "\n\n").replace("\r\n", "\n").replace("\r", "\n")
    # The context manager guarantees the handle is closed even if write()
    # raises; delete=False keeps the file on disk after closing.
    # NOTE(review): the file is written with the platform default encoding;
    # confirm this matches what the consumer of the file expects.
    with tempfile.NamedTemporaryFile(
        delete=False, mode="w", prefix=prefix, suffix=suffix
    ) as temp_file:
        temp_file.write(normalized)
    logging.debug(f"Temporary file created: {temp_file.name}")
    return temp_file.name
def empty_output_folder(output_dir):
    """Make sure ``output_dir`` exists and is empty.

    An existing directory is removed with all of its contents and recreated.
    A directory that does not exist yet is simply created instead of raising,
    since the desired end state (an empty folder) is the same.

    Args:
        output_dir: Path of the folder to reset.

    Raises:
        Exception: Any error from removing or creating the directory is
            logged and re-raised unchanged.
    """
    try:
        if os.path.isdir(output_dir):
            shutil.rmtree(output_dir)
        os.makedirs(output_dir, exist_ok=True)
        logging.info(f"Output folder cleaned: {output_dir}")
    except Exception as e:
        logging.error(f"Error cleaning output folder: {e}")
        raise
def get_last_mp3_file(output_dir):
    """Return the most recently modified ``.mp3`` file in ``output_dir``.

    Only direct entries whose name ends with ``.mp3`` are considered.

    Args:
        output_dir: Directory to scan.

    Returns:
        The joined path of the entry with the latest modification time, or
        ``None`` (after logging a warning) when no ``.mp3`` entry exists.
    """
    candidates = [
        os.path.join(output_dir, entry)
        for entry in os.listdir(output_dir)
        if entry.endswith('.mp3')
    ]
    if not candidates:
        logging.warning("No MP3 files found")
        return None
    # Newest modification time wins.
    return max(candidates, key=os.path.getmtime)
def get_audio_duration(file_path):
    """Return the duration of an audio file as reported by ``librosa``.

    ``librosa`` is imported lazily inside the function, so a missing
    installation is handled like any other failure.

    Args:
        file_path: Path of the audio file.

    Returns:
        The value of ``librosa.get_duration(path=file_path)``, or ``None``
        when the import or the measurement raises (the error is logged).
    """
    try:
        import librosa
        return librosa.get_duration(path=file_path)
    except Exception as err:
        logging.error(f"Failed to get audio duration: {err}")
    return None
def detect_and_select_model(text):
    """Pick the stage-1 model checkpoint that matches the script of ``text``.

    Checks run in this order; the first match wins:

    1. Hangul (compatibility jamo or syllables) -> jp-kr model.
    2. Hiragana or Katakana                      -> jp-kr model.
    3. CJK unified ideographs                    -> zh model.
    4. Anything else                             -> en model.

    Kana is tested BEFORE ideographs on purpose: Japanese lyrics normally mix
    kana with kanji, and kanji live in the same code-point range as Chinese
    characters. Testing ideographs first would send such lyrics to the
    Chinese model.

    Args:
        text: Lyrics (or any text) to inspect.

    Returns:
        The model repository id as a string.
    """
    jp_kr_model = "m-a-p/YuE-s1-7B-anneal-jp-kr-cot"
    script_rules = (
        (r'[\u3131-\u318E\uAC00-\uD7A3]', jp_kr_model),                   # Hangul
        (r'[\u3040-\u309F\u30A0-\u30FF]', jp_kr_model),                   # Hiragana / Katakana
        (r'[\u4e00-\u9fff]', "m-a-p/YuE-s1-7B-anneal-zh-cot"),            # Han ideographs
    )
    for pattern, model in script_rules:
        if re.search(pattern, text):
            return model
    return "m-a-p/YuE-s1-7B-anneal-en-cot"
def install_flash_attn():
    """Install the ``flash-attn`` package at runtime if it is usable and missing.

    The installation is skipped (returning ``False``) when no CUDA device is
    available or when ``torch.version.cuda`` is ``None``. If ``flash_attn``
    can already be imported nothing is installed.

    pip is invoked as ``<current interpreter> -m pip`` so the package lands in
    the environment this process is running in, rather than in whatever
    ``pip`` happens to be first on ``PATH``.

    Returns:
        ``True`` when flash-attn is already importable or was installed
        successfully, ``False`` when installation was skipped or failed.
        Failures are logged as warnings and never raised.
    """
    import sys  # local import: only needed to locate the running interpreter

    try:
        if not torch.cuda.is_available():
            logging.warning("GPU not available, skipping flash-attn installation")
            return False

        cuda_version = torch.version.cuda
        if cuda_version is None:
            logging.warning("CUDA not available, skipping flash-attn installation")
            return False
        logging.info(f"Detected CUDA version: {cuda_version}")

        try:
            import flash_attn  # noqa: F401  (import used only as a presence check)
        except ImportError:
            logging.info("Installing flash-attn...")
        else:
            logging.info("flash-attn already installed")
            return True

        subprocess.run(
            [sys.executable or "python", "-m", "pip", "install", "flash-attn", "--no-build-isolation"],
            check=True,
            capture_output=True,
            text=True
        )
        logging.info("flash-attn installed successfully!")
        return True
    except subprocess.CalledProcessError as e:
        logging.warning(f"Failed to install flash-attn: {e}")
        # The captured pip output is the only place the real cause shows up.
        if e.stderr:
            logging.warning(f"pip stderr: {e.stderr}")
        return False
    except Exception as e:
        logging.warning(f"Failed to install flash-attn: {e}")
        return False
def initialize_system():
    """Prepare the runtime: GPU settings, flash-attn, codec download, cwd.

    Steps:
        1. Call ``optimize_gpu_settings()``.
        2. In a thread pool, run ``install_flash_attn`` and a
           ``snapshot_download`` of ``m-a-p/xcodec_mini_infer`` into
           ``./inference/xcodec_mini_infer``, then wait for both.
        3. Change the working directory to ``./inference``.

    Raises:
        FileNotFoundError: If ``./inference`` cannot be entered (logged first).
        Exception: Anything raised by a worker task is re-raised by
            ``Future.result()``.
    """
    optimize_gpu_settings()

    codec_dir = './inference/xcodec_mini_infer'
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Submitted first, so it runs while the download is being set up.
        pending = [pool.submit(install_flash_attn)]

        from huggingface_hub import snapshot_download
        os.makedirs(codec_dir, exist_ok=True)
        logging.info(f"Created folder at: {codec_dir}")

        pending.append(pool.submit(
            snapshot_download,
            repo_id="m-a-p/xcodec_mini_infer",
            local_dir=codec_dir,
            resume_download=True
        ))

        # result() blocks until the task is done and re-raises its exception.
        for task in pending:
            task.result()

    try:
        os.chdir("./inference")
        logging.info(f"Working directory changed to: {os.getcwd()}")
    except FileNotFoundError as e:
        logging.error(f"Directory error: {e}")
        raise
@lru_cache(maxsize=100)
def get_cached_file_path(content_hash, prefix):
    """Return a temp-file path for ``(content_hash, prefix)``, memoised by LRU cache.

    On a cache miss the value of ``content_hash`` itself is written to a new
    temporary file via ``create_temp_file``; on a hit the previously returned
    path is handed back without touching the file system.

    NOTE(review): a cached path is returned even if the file has since been
    deleted - callers should verify the path still exists.
    """
    return create_temp_file(content=content_hash, prefix=prefix)
def optimize_model_selection(lyrics, genre):
    """Choose a model and its generation settings for the given lyrics.

    Args:
        lyrics: Lyrics text; drives both model detection and parameter
            calculation.
        genre: Accepted for interface compatibility; not used here.

    Returns:
        A 3-tuple ``(model_path, config, params)`` where ``config`` holds
        ``max_tokens``, ``temperature``, ``batch_size``, ``num_segments`` and
        ``estimated_duration`` for the selected model, and ``params`` is the
        full result of ``calculate_generation_params``.
    """
    model_path = detect_and_select_model(lyrics)
    params = calculate_generation_params(lyrics)

    # Lyrics with a chorus get a 50% larger token budget.
    token_budget = params['max_tokens']
    if params['sections']['chorus'] > 0:
        token_budget = int(token_budget * 1.5)

    # The per-model settings differ only in sampling temperature.
    temperatures = {
        "m-a-p/YuE-s1-7B-anneal-en-cot": 0.8,
        "m-a-p/YuE-s1-7B-anneal-jp-kr-cot": 0.7,
        "m-a-p/YuE-s1-7B-anneal-zh-cot": 0.7,
    }
    model_config = {
        name: {
            "max_tokens": token_budget,
            "temperature": temperature,
            "batch_size": 16,
            "num_segments": params['num_segments'],
            "estimated_duration": params['estimated_duration'],
        }
        for name, temperature in temperatures.items()
    }
    return model_path, model_config[model_path], params
def infer(genre_txt_content, lyrics_txt_content, num_segments, max_new_tokens):
    """Generate a song by running ``infer.py`` in a subprocess.

    The lyrics get a trailing ``[chorus] bye`` line (unless already present),
    model and generation settings are derived from them, genre and lyrics are
    written to temporary files, the ``./output`` folder is emptied, and
    ``infer.py`` is executed with the interpreter that runs this process.

    Args:
        genre_txt_content: Genre/style description text.
        lyrics_txt_content: Lyrics text with section tags.
        num_segments: Not used; the segment count is computed from the lyrics.
        max_new_tokens: Not used; the token budget is computed from the lyrics.

    Returns:
        Path of the most recently modified ``.mp3`` in ``./output``, or
        ``None`` when no MP3 was produced.

    Raises:
        RuntimeError: If the subprocess exits with a non-zero return code.
        Exception: Any other error is logged and re-raised. The temporary
            input files are removed in every case.
    """
    # Local import: used to launch the child with the current interpreter
    # instead of whatever "python" resolves to on PATH.
    import sys

    genre_txt_path = None
    lyrics_txt_path = None
    try:
        # ---- (1) Not shown in the UI: append "[chorus] bye" at the end ----
        forced_line = "[chorus] bye"
        tmp_lyrics = lyrics_txt_content.strip()
        # Skip when the forced line is already present (case-insensitive).
        if forced_line.lower() not in tmp_lyrics.lower():
            tmp_lyrics += "\n" + forced_line

        # ---- (2) Select model / settings from the augmented lyrics ----
        model_path, config, params = optimize_model_selection(tmp_lyrics, genre_txt_content)
        logging.info(f"Selected model: {model_path}")
        logging.info(f"Lyrics analysis: {params}")

        # NOTE(review): because of the forced "[chorus] bye" line the lyrics
        # always contain a chorus tag, so the else branch below looks
        # unreachable - confirm before relying on it.
        has_chorus = params['sections']['chorus'] > 0
        estimated_duration = params.get('estimated_duration', 90)

        # Segment count and token budget.
        if has_chorus:
            actual_max_tokens = min(12000, int(config['max_tokens'] * 1.3))  # 30% more tokens
            actual_num_segments = min(5, params['num_segments'] + 2)  # extra segments
        else:
            actual_max_tokens = min(10000, int(config['max_tokens'] * 1.2))
            actual_num_segments = min(4, params['num_segments'] + 1)

        logging.info(f"Estimated duration: {estimated_duration} seconds")
        logging.info(f"Has chorus sections: {has_chorus}")
        logging.info(f"Using segments: {actual_num_segments}, tokens: {actual_max_tokens}")

        genre_txt_path = create_temp_file(genre_txt_content, prefix="genre_")
        # Store the augmented lyrics (with the forced line) in a temp file.
        lyrics_txt_path = create_temp_file(tmp_lyrics, prefix="lyrics_")

        output_dir = "./output"
        os.makedirs(output_dir, exist_ok=True)
        empty_output_folder(output_dir)

        command = [
            sys.executable or "python", "infer.py",
            "--stage1_model", model_path,
            "--stage2_model", "m-a-p/YuE-s2-1B-general",
            "--genre_txt", genre_txt_path,
            "--lyrics_txt", lyrics_txt_path,
            "--run_n_segments", str(actual_num_segments),
            "--stage2_batch_size", "16",
            "--output_dir", output_dir,
            "--cuda_idx", "0",
            "--max_new_tokens", str(actual_max_tokens),
            "--disable_offload_model"
        ]

        env = os.environ.copy()
        if torch.cuda.is_available():
            env.update({
                "CUDA_VISIBLE_DEVICES": "0",
                "CUDA_HOME": "/usr/local/cuda",
                "PATH": f"/usr/local/cuda/bin:{env.get('PATH', '')}",
                "LD_LIBRARY_PATH": f"/usr/local/cuda/lib64:{env.get('LD_LIBRARY_PATH', '')}",
                "PYTORCH_CUDA_ALLOC_CONF": "max_split_size_mb:512",
                "CUDA_LAUNCH_BLOCKING": "0"
            })

        # transformers cache migration (may not work on every version).
        try:
            from transformers.utils import move_cache
            move_cache()
        except Exception as e:
            logging.warning(f"Cache migration warning (non-critical): {e}")

        # check=False: the return code is inspected manually below so the
        # child's stderr can be logged and included in the raised error.
        process = subprocess.run(
            command,
            env=env,
            check=False,
            capture_output=True,
            text=True
        )

        logging.info(f"Command output: {process.stdout}")
        if process.stderr:
            logging.error(f"Command error: {process.stderr}")

        if process.returncode != 0:
            logging.error(f"Command failed with return code: {process.returncode}")
            logging.error(f"Command: {' '.join(command)}")
            raise RuntimeError(f"Inference failed: {process.stderr}")

        last_mp3 = get_last_mp3_file(output_dir)
        if last_mp3:
            # Duration reporting is best-effort and never fails the request.
            try:
                duration = get_audio_duration(last_mp3)
                logging.info(f"Generated audio file: {last_mp3}")
                if duration:
                    logging.info(f"Audio duration: {duration:.2f} seconds")
                    logging.info(f"Expected duration: {estimated_duration} seconds")
                    if duration < estimated_duration * 0.8:
                        logging.warning(
                            f"Generated audio is shorter than expected: {duration:.2f}s < {estimated_duration:.2f}s"
                        )
            except Exception as e:
                logging.warning(f"Failed to get audio duration: {e}")
            return last_mp3
        else:
            logging.warning("No output audio file generated")
            return None

    except Exception as e:
        logging.error(f"Inference error: {e}")
        raise
    finally:
        # Always clean up the temporary genre/lyrics files.
        for path in [genre_txt_path, lyrics_txt_path]:
            if path and os.path.exists(path):
                try:
                    os.remove(path)
                    logging.debug(f"Removed temporary file: {path}")
                except Exception as e:
                    logging.warning(f"Failed to remove temporary file {path}: {e}")
##########################################
# Gradio UI and main() section from here #
##########################################
def update_info(lyrics):
    """Build the estimate texts that are refreshed when the lyrics change.

    Args:
        lyrics: Current content of the lyrics textbox.

    Returns:
        A 2-tuple of strings: the estimated-duration text and the section
        summary text. For empty lyrics two fixed placeholder messages are
        returned instead.
    """
    if not lyrics:
        return "No lyrics entered", "No sections detected"

    params = calculate_generation_params(lyrics)
    counts = params['sections']
    duration_text = f"Estimated duration: {params['estimated_duration']:.1f} seconds"
    sections_text = (
        f"Verses: {counts['verse']}, Chorus: {counts['chorus']} "
        "(Expected full length including chorus)"
    )
    return duration_text, sections_text
def main():
    """Initialise the system and build the Gradio Blocks UI.

    Calls ``initialize_system()``, makes sure a ``samples`` directory exists
    next to this file, and assembles the interface: header, optional sample
    audio player, genre/lyrics inputs, read-only generation settings, song
    info labels, the generate button, the audio output and example inputs.
    Lyrics changes are wired to ``update_info`` and the button to ``infer``.

    Returns:
        The constructed ``gr.Blocks`` object (not yet launched).

    NOTE(review): the indentation of this file was lost, so the nesting of
    the layout containers below (which widgets sit inside which Group / Row /
    Column) is a best-effort reconstruction - verify against the intended UI.
    NOTE(review): the non-English example lyrics and the CSS comments inside
    the string literals appear mis-encoded; as written they contain no
    characters in the ranges tested by ``detect_and_select_model``. They are
    runtime text and are left untouched here.
    """
    # Initialise the system.
    initialize_system()
    # Samples directory and example file handling.
    current_dir = os.path.dirname(os.path.abspath(__file__))
    samples_dir = os.path.join(current_dir, 'samples')
    sample_audio_path = os.path.join(samples_dir, 'metal.mp3')
    os.makedirs(samples_dir, exist_ok=True)
    with gr.Blocks(css="""
/* ์ „์ฒด ๋ฐฐ๊ฒฝ ๋ฐ ์ปจํ…Œ์ด๋„ˆ ์Šคํƒ€์ผ */
body {
background-color: #f5f5f5;
}
.gradio-container {
max-width: 1000px;
margin: auto !important;
background-color: #ffffff;
border-radius: 8px;
padding: 20px;
box-shadow: 0 2px 10px rgba(0, 0, 0, 0.1);
}
h1, h2, h3 {
margin: 0;
padding: 0;
}
p {
margin: 5px 0;
}
/* ์˜ˆ์ œ ๋ธ”๋ก ์Šคํƒ€์ผ */
.gr-examples {
background-color: #fafafa;
border-radius: 8px;
padding: 10px;
}
""") as demo:
        # Top header.
        gr.HTML("""
<div style="text-align: center; margin-bottom: 1.5rem;">
<h1>Open SUNO: Full-Song Generation (Multi-Language Support)</h1>
<p style="font-size: 1.1rem; color: #555;">
Enter your song details below and let the AI handle the music production!
</p>
</div>
""")
        # Sample music section.
        with gr.Group():
            gr.HTML("""
<div style="padding: 1rem; margin-bottom: 1.5rem; background-color: #f8f9fa; border-radius: 8px; text-align: center;">
<h3 style="margin: 0;">Sample Generated Music</h3>
<p style="color: #666; margin: 5px 0;">Listen to this example</p>
</div>
""")
            # Show the player only when the sample file actually exists.
            if os.path.exists(sample_audio_path):
                gr.Audio(
                    value=sample_audio_path,
                    label="Sample Music",
                    type="filepath"
                )
            else:
                gr.Markdown("### Sample music file not available")
        with gr.Row():
            # Left column: inputs.
            with gr.Column():
                genre_txt = gr.Textbox(
                    label="Genre",
                    placeholder="Enter music genre and style descriptions...",
                    lines=2
                )
                lyrics_txt = gr.Textbox(
                    label="Lyrics (Supports English, Korean, Japanese, Chinese)",
                    placeholder="Enter song lyrics with [verse], [chorus], [bridge] tags...",
                    lines=10
                )
            # Right column: settings / info.
            with gr.Column():
                with gr.Group():
                    gr.Markdown("### Generation Settings")
                    # Read-only: infer() ignores this value and computes its own.
                    num_segments = gr.Number(
                        label="Number of Song Segments (Auto-adjusted)",
                        value=2,
                        minimum=1,
                        maximum=4,
                        step=1,
                        interactive=False
                    )
                    # Read-only: infer() ignores this value and computes its own.
                    max_new_tokens = gr.Slider(
                        label="Max New Tokens (Auto-adjusted)",
                        minimum=500,
                        maximum=32000,
                        step=500,
                        value=4000,
                        interactive=False
                    )
                with gr.Group():
                    gr.Markdown("### Song Info")
                    duration_info = gr.Label(label="Estimated Duration")
                    sections_info = gr.Label(label="Section Information")
                submit_btn = gr.Button("Generate Music", variant="primary")
                with gr.Group():
                    music_out = gr.Audio(label="Generated Audio")
        # Examples.
        gr.Examples(
            examples=[
                [
                    "Pop catchy uplifting romantic love song",
                    """
[verse]
Under the city lights, your hand in mine
Every step we take, feels like a sign
[chorus]
Baby, you're my everything, my heart is yours
"""
                ],
                [
                    "K-pop upbeat youthful synth electronic",
                    """
[verse]
๋…ธ์„ ์†์— ๋„ˆ์˜ ๊ธฐ์–ต์ด ๋– ์˜ฌ๋ผ
[chorus]
์–ด๋””๋“  ๋„ค ๊ณ์— ๋‚ด๊ฐ€ ์žˆ์„๊ฒŒ
[bridge]
๋ฉ€๋ฆฌ๋ผ๋„ ๋„ ์œ„ํ•ด ๋‹ฌ๋ ค๊ฐˆ๊ฒŒ
"""
                ],
                [
                    "J-pop energetic emotional dance synth",
                    """
[verse]
ๅคœใฎ่ก—ใซๅ…‰ใ‚‹ๅ›ใฎ็ฌ‘้ก”
ใฉใ‚“ใชๆ™‚ใ‚‚ใใฐใซใ„ใ‚‹ใ‚ˆ
[chorus]
ใ“ใฎๆฐ—ๆŒใกๆญขใ‚ใ‚‰ใ‚Œใชใ„
"""
                ],
                [
                    "Mandopop sentimental ballad love song piano",
                    """
[verse]
ๅคœ่‰ฒๆธฉๆŸ”ๅƒไฝ ็š„ๆ‹ฅๆŠฑ
ๅฟƒ่ทณ้š็€ไฝ ๆ…ขๆ…ขๅ˜้ซ˜
[chorus]
ๆฐธ่ฟœไธ่ฆๆ”พๅผ€ๆˆ‘็š„ๆ‰‹
"""
                ]
            ],
            inputs=[genre_txt, lyrics_txt],
            outputs=[]
        )
        # Refresh the estimate labels whenever the lyrics change.
        lyrics_txt.change(
            fn=update_info,
            inputs=[lyrics_txt],
            outputs=[duration_info, sections_info]
        )
        # Run infer when the button is clicked.
        submit_btn.click(
            fn=infer,
            inputs=[genre_txt, lyrics_txt, num_segments, max_new_tokens],
            outputs=[music_out]
        )
    return demo
if __name__ == "__main__":
    # Build the UI, enable request queuing (at most 20 waiting requests),
    # then start the server with the options below.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
        "show_api": True,
        "show_error": True,
        "max_threads": 8,
    }
    demo = main()
    demo.queue(max_size=20).launch(**launch_options)