import spaces
from snac import SNAC
import torch
import gradio as gr
from transformers import AutoModelForCausalLM, AutoTokenizer
from huggingface_hub import snapshot_download
from dotenv import load_dotenv
load_dotenv()

# Check if CUDA is available
device = "cuda" if torch.cuda.is_available() else "cpu"

print("Loading SNAC model...")
snac_model = SNAC.from_pretrained("hubertsiuzdak/snac_24khz")
snac_model = snac_model.to(device)

model_name = "syvai/tts-v1-finetuned"

# Download only model config and safetensors
snapshot_download(
    repo_id=model_name,
    allow_patterns=[
        "config.json",
        "*.safetensors",
        "model.safetensors.index.json",
    ],
    ignore_patterns=[
        "optimizer.pt",
        "pytorch_model.bin",
        "training_args.bin",
        "scheduler.pt",
        "tokenizer.json",
        "tokenizer_config.json",
        "special_tokens_map.json",
        "vocab.json",
        "merges.txt",
        "tokenizer.*"
    ]
)

model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16)
model.to(device)
tokenizer = AutoTokenizer.from_pretrained(model_name)
print(f"Orpheus model loaded to {device}")

# Process text prompt
def process_prompt(prompt, voice, tokenizer, device):
    prompt = f"{voice}: {prompt}"
    input_ids = tokenizer(prompt, return_tensors="pt").input_ids
    
    start_token = torch.tensor([[128259]], dtype=torch.int64)  # Start of human
    end_tokens = torch.tensor([[128009, 128260]], dtype=torch.int64)  # End of text, End of human
    
    modified_input_ids = torch.cat([start_token, input_ids, end_tokens], dim=1)  # SOH SOT Text EOT EOH
    
    # No padding needed for single input
    attention_mask = torch.ones_like(modified_input_ids)
    
    return modified_input_ids.to(device), attention_mask.to(device)

# Parse output tokens to audio
def parse_output(generated_ids):
    token_to_find = 128257
    token_to_remove = 128258
    
    token_indices = (generated_ids == token_to_find).nonzero(as_tuple=True)

    if len(token_indices[1]) > 0:
        last_occurrence_idx = token_indices[1][-1].item()
        cropped_tensor = generated_ids[:, last_occurrence_idx+1:]
    else:
        cropped_tensor = generated_ids

    processed_rows = []
    for row in cropped_tensor:
        masked_row = row[row != token_to_remove]
        processed_rows.append(masked_row)

    code_lists = []
    for row in processed_rows:
        row_length = row.size(0)
        new_length = (row_length // 7) * 7
        trimmed_row = row[:new_length]
        trimmed_row = [t - 128266 for t in trimmed_row]
        code_lists.append(trimmed_row)
        
    return code_lists[0]  # Return just the first one for single sample

# Redistribute codes for audio generation
def redistribute_codes(code_list, snac_model):
    device = next(snac_model.parameters()).device  # Get the device of SNAC model
    
    layer_1 = []
    layer_2 = []
    layer_3 = []
    for i in range((len(code_list)+1)//7):
        layer_1.append(code_list[7*i])
        layer_2.append(code_list[7*i+1]-4096)
        layer_3.append(code_list[7*i+2]-(2*4096))
        layer_3.append(code_list[7*i+3]-(3*4096))
        layer_2.append(code_list[7*i+4]-(4*4096))
        layer_3.append(code_list[7*i+5]-(5*4096))
        layer_3.append(code_list[7*i+6]-(6*4096))
        
    # Move tensors to the same device as the SNAC model
    codes = [
        torch.tensor(layer_1, device=device).unsqueeze(0),
        torch.tensor(layer_2, device=device).unsqueeze(0),
        torch.tensor(layer_3, device=device).unsqueeze(0)
    ]
    
    audio_hat = snac_model.decode(codes)
    return audio_hat.detach().squeeze().cpu().numpy()  # Always return CPU numpy array

# Main generation function
@spaces.GPU()
def generate_speech(text, voice, temperature, top_p, repetition_penalty, max_new_tokens, progress=gr.Progress()):
    if not text.strip():
        return None
    
    try:
        progress(0.1, "Processing text...")
        input_ids, attention_mask = process_prompt(text, voice, tokenizer, device)
        
        progress(0.3, "Generating speech tokens...")
        with torch.no_grad():
            generated_ids = model.generate(
                input_ids=input_ids,
                attention_mask=attention_mask,
                max_new_tokens=max_new_tokens,
                do_sample=True,
                temperature=temperature,
                top_p=top_p,
                repetition_penalty=repetition_penalty,
                num_return_sequences=1,
                eos_token_id=128258,
            )
        
        progress(0.6, "Processing speech tokens...")
        code_list = parse_output(generated_ids)
        
        progress(0.8, "Converting to audio...")
        audio_samples = redistribute_codes(code_list, snac_model)
        
        return (24000, audio_samples)  # Return sample rate and audio
    except Exception as e:
        print(f"Error generating speech: {e}")
        return None

# Examples for the UI
examples = [
    ["Spørger man lykke friis, der er tysklandskender og direktør i Tænketanken europa, så kan man kun gætte på årsagerne, men  er ikke gode venner med alle i regeringen.", "mic", 0.2, 0.95, 1.1, 1200],
    ["Det burde have været en formssag i Den Tyske Forbundsdag, men det endte som alt andet end det. For første gang i Forbundsrepublikkens historie fik kanslerkandidaten ikke nok stemmer til at sikre sig den fornemme titel som kansler, da der skulle stemmes i parlamentet.", "nic", 0.2, 0.95, 1.1, 2000],
]

# Available voices
VOICES = ["nic", "mic"]

# Available Emotive Tags
EMOTIVE_TAGS = []

# Create Gradio interface
with gr.Blocks(title="Syv.ai TTS v0.1") as demo:
    gr.Markdown(f"""
    # 🎵 [Syv.ai TTS v0.1](https://huggingface.co/syvai/tts-v1-finetuned)
    Skriv din tekst (gerne kortere end 200 tegn) nedenfor og hør hvad den kan.
    
    Vi har pt. kun 2 stemmer, og ingen måde at styre tone, grin eller andre paralinguistiske elementer. Vi arbejder dog på at udgive en model med bedre stemmestying.
                
    Syvai TTS er trænet på +1000 timer af dansk tale og bygger ovenpå en model fra [Orpheus TTS](https://huggingface.co/canopyai/Orpheus-TTS).
    """)    
    with gr.Row():
        with gr.Column(scale=3):
            text_input = gr.Textbox(
                label="Tekst at tale", 
                placeholder="Indtast din tekst her...",
                lines=5
            )
            voice = gr.Dropdown(
                choices=VOICES, 
                value="mic", 
                label="Stemme"
            )
            
            with gr.Accordion("Advanced Settings", open=False):
                temperature = gr.Slider(
                    minimum=0.1, maximum=1.5, value=0.6, step=0.05,
                    label="Temperature", 
                    info="Higher values (0.7-1.0) create more expressive but less stable speech"
                )
                top_p = gr.Slider(
                    minimum=0.1, maximum=1.0, value=0.95, step=0.05,
                    label="Top P", 
                    info="Nucleus sampling threshold"
                )
                repetition_penalty = gr.Slider(
                    minimum=1.0, maximum=2.0, value=1.1, step=0.05,
                    label="Repetition Penalty", 
                    info="Higher values discourage repetitive patterns"
                )
                max_new_tokens = gr.Slider(
                    minimum=100, maximum=2000, value=1200, step=100,
                    label="Max Length", 
                    info="Maximum length of generated audio (in tokens)"
                )
            
            with gr.Row():
                submit_btn = gr.Button("Generer tale", variant="primary")
                clear_btn = gr.Button("Ryd")
                
        with gr.Column(scale=2):
            audio_output = gr.Audio(label="Genereret tale", type="numpy")
            
    # Set up examples
    gr.Examples(
        examples=examples,
        inputs=[text_input, voice, temperature, top_p, repetition_penalty, max_new_tokens],
        outputs=audio_output,
        fn=generate_speech,
        cache_examples=True,
    )
    
    # Set up event handlers
    submit_btn.click(
        fn=generate_speech,
        inputs=[text_input, voice, temperature, top_p, repetition_penalty, max_new_tokens],
        outputs=audio_output
    )
    
    clear_btn.click(
        fn=lambda: (None, None),
        inputs=[],
        outputs=[text_input, audio_output]
    )

# Launch the app
if __name__ == "__main__":
    demo.queue().launch(share=False, ssr_mode=False)