# EchoX/app.py
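# Pin dependencies at startup for the Space runtime. Assumption: pip is held at
# 24.0 because newer releases reject the legacy metadata in these older
# packages, and fairseq 0.12.2 requires omegaconf 2.0.x.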
import subprocess
import sys
subprocess.check_call([sys.executable, "-m", "pip", "install", "pip==24.0"])
subprocess.check_call([sys.executable, "-m", "pip", "install", "omegaconf==2.0.6"])
subprocess.check_call([sys.executable, "-m", "pip", "install", "git+https://github.com/facebookresearch/[email protected]"])
import gradio as gr
import os
import torch
import librosa
import soundfile as sf
import tempfile
import spaces # ZeroGPU requirement
# Import the EchoX streaming inference module
import Echox_copy_stream as Echox
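# Disable tokenizers parallelism to avoid fork-related warnings and deadlocks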
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# Global state
_MODEL_ON_CUDA = False
inference_model = None
def init_model():
    """Initialize the model on the CPU."""
    global inference_model
    if inference_model is None:
        inference_model = Echox.EchoxAssistant()
    return inference_model
def process_audio_input(audio):
    """Normalize the Gradio audio input to a WAV file path."""
    if audio is None:
        return None
    try:
        # Already a file path: return it unchanged
        if isinstance(audio, str):
            return audio
        # Gradio numpy format: a (sample_rate, data) tuple
        if isinstance(audio, tuple):
            sr, y = audio
            if y.ndim > 1:
                y = y[:, 0]  # keep only the first channel
        else:
            # Bare array: assume the default sample rate
            y = audio
            sr = 16000
        # Persist to a temporary WAV file
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file:
            sf.write(tmp_file.name, y, sr)
            return tmp_file.name
    except Exception as e:
        print(f"Error processing audio: {e}")
        return None
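# ZeroGPU attaches a GPU only while the decorated function runs;
# duration=120 requests an allocation of up to 120 seconds per call.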
@spaces.GPU(duration=120)
def process_audio_text(text, audio):
    global _MODEL_ON_CUDA, inference_model
    if inference_model is None:
        init_model()
    # Move the model to the GPU once per worker
    if not _MODEL_ON_CUDA:
        try:
            if hasattr(inference_model, 'model'):
                inference_model.model = inference_model.model.to("cuda")
            if hasattr(inference_model, 'unit_translator'):
                inference_model.unit_translator = inference_model.unit_translator.to("cuda")
            inference_model.device = "cuda"
            _MODEL_ON_CUDA = True
            print("Model moved to GPU")
        except Exception as e:
            print(f"Error moving model to GPU: {e}")
    audio_path = process_audio_input(audio)
    tmp = [{
        "conversations": [
            {
                "from": "user",
                "value": text,
                "audio": audio_path
            }
        ]
    }]
    accumulated_text = ""
    try:
        # Stream (audio, text) updates from the model as they are produced
        for text_response, audio_data in inference_model._inference(tmp):
            if text_response:
                accumulated_text = text_response
            if audio_data is not None:
                sr, audio_array = audio_data
                yield (sr, audio_array), accumulated_text
            else:
                yield None, accumulated_text
    except Exception as e:
        yield None, f"Error: {str(e)}"
    finally:
        # Remove the temporary WAV file, but never a user-supplied path
        if audio_path and audio_path != audio and os.path.exists(audio_path):
            try:
                os.unlink(audio_path)
            except OSError:
                pass
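# Warm up at import time: build the model on CPU so each GPU request
# only pays for the CPU-to-GPU transfer, not model construction.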
init_model()
if __name__ == "__main__":
    examples = [
        ["Listen to the audio and answer the question.", "./show_case/1.wav"],
        ["", "./show_case/2.wav"],
    ]
    iface = gr.Interface(
        fn=process_audio_text,
        inputs=[
            gr.Textbox(label="Input Text", value=examples[0][0]),
            gr.Audio(type="filepath", label="Upload Audio", value=examples[0][1])
        ],
        outputs=[
            gr.Audio(label="Streamed Audio", streaming=True, autoplay=True),
            gr.Textbox(label="Model output")
        ],
        examples=examples,
        live=False,
        allow_flagging="never"
    )
    iface.launch(server_name="0.0.0.0", server_port=7860, share=False)