import time
import threading
from typing import Iterator

import gradio as gr
import numpy as np
import soundfile as sf
import librosa
import torch
from transformers import set_seed

from vibevoice.modular.modeling_vibevoice_inference import (
    VibeVoiceForConditionalGenerationInference,
)
from vibevoice.processor.vibevoice_processor import VibeVoiceProcessor
from vibevoice.modular.streamer import AudioStreamer

MODEL_ID = "microsoft/VibeVoice-1.5B"
def convert_to_16bit(data: np.ndarray) -> np.ndarray:
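    """Convert float audio (torch tensor or ndarray) to int16 PCM, peak-normalizing if |x| > 1."""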
    if torch.is_tensor(data):
        data = data.detach().cpu().numpy()
    data = np.array(data, dtype=np.float32, copy=False)
    amax = np.max(np.abs(data)) if data.size else 1.0
    if amax > 1.0:
        data = data / amax
    return (data * 32767.0).astype(np.int16)

def read_audio(path: str, target_sr: int = 24000) -> np.ndarray:
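    """Load audio as mono float32 at `target_sr` (24 kHz, the rate VibeVoice works at)."""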
    wav, sr = sf.read(path)
    if wav.ndim > 1:
        wav = wav.mean(axis=1)
    if sr != target_sr:
        wav = librosa.resample(wav, orig_sr=sr, target_sr=target_sr)
    return wav.astype(np.float32)

class VibeMiniDemo:
    def __init__(self, model_path: str, device: str = "cuda", inference_steps: int = 10):
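        # Note: `device` is stored for reference only; `_load()` decides actual
        # placement itself via `torch.cuda.is_available()`.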
        self.model_path = model_path
        self.device = device
        self.inference_steps = inference_steps
        self._stop = False
        self._streamer = None
        self._load()
    def _load(self):
        print(f"🚀 Loading VibeVoice from {self.model_path} ...")
        # Processor pulls tokenizer/config from HF automatically if model_path is a repo id
        self.processor = VibeVoiceProcessor.from_pretrained(self.model_path)
        # Try flash-attn2 first; fall back to SDPA if the env doesn't have it
        try:
            self.model = VibeVoiceForConditionalGenerationInference.from_pretrained(
                self.model_path,
                torch_dtype=torch.bfloat16,
                device_map="cuda" if torch.cuda.is_available() else "cpu",
                attn_implementation="flash_attention_2",
            )
        except Exception as e:
            print(f"⚠️ flash_attention_2 unavailable ({type(e).__name__}: {e}); falling back to SDPA")
            self.model = VibeVoiceForConditionalGenerationInference.from_pretrained(
                self.model_path,
                torch_dtype=torch.bfloat16,
                device_map="cuda" if torch.cuda.is_available() else "cpu",
                attn_implementation="sdpa",
            )
        self.model.eval()
        # Configure diffusion steps (matches upstream demo defaults)
        self.model.model.noise_scheduler = self.model.model.noise_scheduler.from_config(
            self.model.model.noise_scheduler.config,
            algorithm_type="sde-dpmsolver++",
            beta_schedule="squaredcos_cap_v2",
        )
        self.model.set_ddpm_inference_steps(num_steps=self.inference_steps)
        print("✅ Model ready")
    def stop(self):
        self._stop = True
        if self._streamer is not None:
            try:
                self._streamer.end()
            except Exception as e:
                print(f"stop error: {e}")
    def generate_stream(
        self,
        script: str,
        voice_files: list[str],
        cfg_scale: float = 1.3,
    ) -> Iterator[tuple]:
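        """Yield (live_chunk, full_take, log_message, badge_update) tuples.

        `live_chunk` is a (sample_rate, int16 ndarray) pair for the streaming
        player and `full_take` is None, except on the final yield, which
        carries the complete take in `full_take` instead.
        """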
        if not script.strip():
            yield None, None, "❌ Please provide a script.", gr.update(visible=False)
            return
        # Load voice samples (1..4)
        voice_samples = [read_audio(p) for p in voice_files if p]
        if not voice_samples:
            yield None, None, "❌ Provide at least one voice sample (WAV/MP3/etc).", gr.update(visible=False)
            return
        # Normalize speaker labels if the user didn't prefix lines
        lines = []
        for i, raw in enumerate([ln for ln in script.splitlines() if ln.strip()]):
            if raw.lower().startswith("speaker") and ":" in raw:
                lines.append(raw)
            else:
                lines.append(f"Speaker {i % max(1, len(voice_samples))}: {raw}")
        formatted = "\n".join(lines)
        # Pack inputs
        inputs = self.processor(
            text=[formatted],
            voice_samples=[voice_samples],
            padding=True,
            return_tensors="pt",
            return_attention_mask=True,
        )
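        # NOTE (assumption): the inference class appears to handle device
        # placement of these tensors inside generate(); if you hit a CPU/CUDA
        # mismatch here, move the tensors in `inputs` to self.model.device.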
        self._stop = False
        streamer = AudioStreamer(batch_size=1, stop_signal=None, timeout=None)
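        # AudioStreamer (upstream VibeVoice helper) keeps one chunk queue per
        # batch item; get_stream(0) below iterates chunks for our single item
        # as the model emits them.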
        self._streamer = streamer

        # Kick off generation on a worker thread
        def _worker():
            try:
                self.model.generate(
                    **inputs,
                    max_new_tokens=None,
                    cfg_scale=cfg_scale,
                    tokenizer=self.processor.tokenizer,
                    generation_config={"do_sample": False},
                    audio_streamer=streamer,
                    stop_check_fn=lambda: self._stop,
                    verbose=False,
                    refresh_negative=True,
                )
            except Exception as e:
                print(f"gen error: {e}")
                streamer.end()

        t = threading.Thread(target=_worker, daemon=True)
        t.start()
        # Stream chunks out
        sr = 24000
        all_chunks, pending = [], []
        last_yield = time.time()
        min_chunk = sr * 30  # ~30s per push feels smooth for Spaces audio
        min_interval = 15.0  # or every 15s if chunks are small
        stream0 = streamer.get_stream(0)
        got_any = False
        yielded_any = False
        chunk_idx = 0
        log_prefix = f"🎙️ VibeVoice streaming (CFG={cfg_scale})\n"
        for chunk in stream0:
            if self._stop:
                streamer.end()
                break
            got_any = True
            chunk_idx += 1
            if torch.is_tensor(chunk):
                if chunk.dtype == torch.bfloat16:
                    chunk = chunk.float()
                audio_np = chunk.cpu().numpy().astype(np.float32)
            else:
                audio_np = np.asarray(chunk, dtype=np.float32)
            if audio_np.ndim > 1:
                audio_np = audio_np.squeeze(-1)
            pcm16 = convert_to_16bit(audio_np)
            all_chunks.append(pcm16)
            pending.append(pcm16)
            need_push = False
            if not yielded_any and sum(len(c) for c in pending) >= min_chunk:
                need_push = True
                yielded_any = True
            elif yielded_any and (
                sum(len(c) for c in pending) >= min_chunk
                or (time.time() - last_yield) >= min_interval
            ):
                need_push = True
            if need_push and pending:
                new_audio = np.concatenate(pending)
                total_sec = sum(len(c) for c in all_chunks) / sr
                msg = log_prefix + f"🎵 {total_sec:.1f}s generated (chunk {chunk_idx})"
                yield (sr, new_audio), None, msg, gr.update(visible=True)
                pending, last_yield = [], time.time()
        # Flush any remainder
        if pending:
            final = np.concatenate(pending)
            total_sec = sum(len(c) for c in all_chunks) / sr
            yield (sr, final), None, log_prefix + f"🎵 final chunk: {total_sec:.1f}s", gr.update(visible=True)
            yielded_any = True
        # Join worker quickly; then deliver full take
        t.join(timeout=5.0)
        self._streamer = None
        if not got_any:
            yield None, None, "❌ No audio chunks received from the model.", gr.update(visible=False)
            return
        if all_chunks:
            complete = np.concatenate(all_chunks)
            final_sec = len(complete) / sr
            msg = f"✅ Done. Total: {final_sec:.1f}s"
            yield None, (sr, complete), msg, gr.update(visible=False)

def build_ui(demo: VibeMiniDemo):
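    """Assemble the Gradio Blocks UI and wire the generate/stop callbacks."""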
    with gr.Blocks(title="VibeVoice – Minimal") as app:
        gr.Markdown("## 🎙️ VibeVoice – Minimal Space\nProvide a script and 1–4 short voice samples.")
        with gr.Row():
            with gr.Column():
                script = gr.Textbox(
                    label="Script",
                    value="Speaker 0: Welcome to VibeVoice!\nSpeaker 0: This is a minimal Space demo.",
                    lines=8,
                )
                cfg = gr.Slider(1.0, 2.0, step=0.05, value=1.3, label="CFG Scale")
                voices = gr.Files(
                    label="Voice samples (WAV/MP3/FLAC/OGG/M4A/AAC) – 1 to 4 files",
                    file_count="multiple",
                    type="filepath",
                )
                with gr.Row():
                    go = gr.Button("🚀 Generate")
                    stop = gr.Button("🛑 Stop", variant="stop")
            with gr.Column():
                live = gr.Audio(label="Live Stream", streaming=True, autoplay=True)
                full = gr.Audio(label="Complete Take (downloadable)")
                log = gr.Textbox(label="Log", interactive=False)
                badge = gr.HTML(visible=False, value="""
                    <div style="background:#dcfce7;border:1px solid #86efac;padding:8px;border-radius:8px;text-align:center">
                        <strong>LIVE STREAMING</strong>
                    </div>
                """)

        def on_go(script, cfg, voices):
            paths = [f.name if hasattr(f, "name") else f for f in (voices or [])][:4]
            # Clear outputs first
            yield None, gr.update(value=None), "⏳ Starting…", gr.update(visible=True)
            # Stream generation
            for s_chunk, full_take, msg, badge_vis in demo.generate_stream(
                script=script,
                voice_files=paths,
                cfg_scale=cfg,
            ):
                if full_take is not None:
                    # final: hide live, show full
                    yield None, full_take, msg, gr.update(visible=False)
                else:
                    # live streaming
                    yield s_chunk, gr.update(), msg, badge_vis

        go.click(
            on_go,
            inputs=[script, cfg, voices],
            outputs=[live, full, log, badge],
        )

        def on_stop():
            demo.stop()
            return "🛑 Stopped.", gr.update(visible=False)

        stop.click(on_stop, outputs=[log, badge])
    return app

def main():
    set_seed(42)
    demo = VibeMiniDemo(model_path=MODEL_ID, device="cuda" if torch.cuda.is_available() else "cpu")
    app = build_ui(demo)
    app.queue(max_size=20, default_concurrency_limit=1).launch(server_name="0.0.0.0", show_api=False)


if __name__ == "__main__":
    main()