#!/usr/bin/env python3
import logging
import random
import subprocess
import soundfile as sf
import gradio as gr
import numpy as np
import sherpa_onnx
from huggingface_hub import hf_hub_download
# Sample rate (Hz) shared by the VAD model and the ffmpeg resample target.
sample_rate = 16000
def _get_nn_model_filename(
    repo_id: str,
    filename: str,
    subfolder: str = "exp",
) -> str:
    """Fetch *filename* from the given Hugging Face Hub repo.

    Returns the local filesystem path of the (cached) downloaded file.
    """
    return hf_hub_download(
        repo_id=repo_id,
        filename=filename,
        subfolder=subfolder,
    )
def get_vad() -> sherpa_onnx.VoiceActivityDetector:
    """Construct a silero-based voice activity detector for 16 kHz audio."""
    model_path = _get_nn_model_filename(
        repo_id="csukuangfj/vad",
        filename="silero_vad.onnx",
        subfolder=".",
    )

    cfg = sherpa_onnx.VadModelConfig()
    cfg.sample_rate = sample_rate
    cfg.silero_vad.model = model_path
    cfg.silero_vad.threshold = 0.5
    cfg.silero_vad.min_silence_duration = 0.1
    cfg.silero_vad.min_speech_duration = 0.25
    cfg.silero_vad.max_speech_duration = 20  # seconds

    return sherpa_onnx.VoiceActivityDetector(
        cfg,
        buffer_size_in_seconds=180,
    )
def build_html_output(s: str, style: str = "result_item_success"):
    """Wrap message *s* in a styled HTML snippet for the info panel.

    Bug fix: the previous f-string ignored both arguments and returned an
    effectively empty string, so every status message rendered blank.
    *style* must be one of the CSS classes defined in the module-level
    ``css`` string (``result_item_success`` or ``result_item_error``).
    """
    return f"""
    <div class='result'>
        <div class='result_item {style}'>
          {s}
        </div>
    </div>
    """
def process_uploaded_audio_file(
    in_filename: str,
):
    """Handle the audio-tab Submit click.

    Returns a 2-tuple ``(output_wav_path, info_html)`` matching the two
    Gradio outputs this handler is wired to.

    Bug fix: the empty-input branch previously returned a 4-tuple
    ``("", html, "", "")`` while the click handler declares exactly two
    outputs, which makes Gradio fail on the arity mismatch.
    """
    logging.warning(f"Processing audio {in_filename}")

    if in_filename is None or in_filename == "":
        return (
            "",
            build_html_output(
                "Please first upload a file and then click " 'the button "Submit"',
                "result_item_error",
            ),
        )

    return process_file(in_filename)
def process_uploaded_video_file(
    in_filename: str,
):
    """Handle the video-tab Submit click.

    Returns a 2-tuple ``(output_path, info_html)`` matching the two Gradio
    outputs this handler is wired to.

    Bug fixes: the empty-input branch previously returned a 4-tuple while
    the click handler declares exactly two outputs (Gradio arity mismatch),
    and the same upload was logged twice.
    """
    logging.warning(f"Processing uploaded video file: {in_filename}")

    if in_filename is None or in_filename == "":
        return (
            "",
            build_html_output(
                "Please first upload a file and then click " 'the button "Submit"',
                "result_item_error",
            ),
        )

    return process_file(in_filename)
def process_file(filename: str):
    """Extract the speech-only portions of *filename* into a new .wav file.

    Decodes the input (audio or video) with ffmpeg to 16 kHz mono s16le PCM
    streamed over a pipe, runs it through the silero VAD in 512-sample
    windows, concatenates every detected speech segment, and writes the
    result to a randomly-suffixed .wav file.

    Returns a 2-tuple ``(output_wav_path, info_html)``.

    Bug fix: the ffmpeg child process was never reaped, leaving a zombie
    process per request; we now close the pipe and wait() on it.
    """
    vad = get_vad()

    # Decode anything ffmpeg understands to raw 16 kHz mono int16 on stdout.
    ffmpeg_cmd = [
        "ffmpeg",
        "-i",
        filename,
        "-f",
        "s16le",
        "-acodec",
        "pcm_s16le",
        "-ac",
        "1",
        "-ar",
        str(sample_rate),
        "-",
    ]

    process = subprocess.Popen(
        ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
    )

    frames_per_read = int(sample_rate * 100)  # 100 seconds of audio per read
    window_size = 512  # silero VAD window size (samples) at 16 kHz

    buffer = []
    all_samples = []

    is_last = False
    while True:
        # *2 because int16_t has two bytes
        data = process.stdout.read(frames_per_read * 2)
        if not data:
            if is_last:
                break
            # Append one second of silence so the VAD can close any
            # speech segment still open at end-of-stream, then do one
            # final pass through the loop body.
            is_last = True
            data = np.zeros(sample_rate, dtype=np.int16)

        samples = np.frombuffer(data, dtype=np.int16)
        samples = samples.astype(np.float32) / 32768

        buffer = np.concatenate([buffer, samples])
        while len(buffer) > window_size:
            vad.accept_waveform(buffer[:window_size])
            buffer = buffer[window_size:]

        if is_last:
            vad.flush()

        # Drain completed speech segments as they become available.
        while not vad.empty():
            all_samples.extend(vad.front.samples)
            vad.pop()

    # Reap the ffmpeg child so it does not linger as a zombie process.
    process.stdout.close()
    process.wait()

    suffix = random.randint(1000, 10000)
    out_filename = f"(unknown)-{suffix}.wav"

    speech_samples = np.array(all_samples, dtype=np.float32)
    sf.write(out_filename, speech_samples, samplerate=sample_rate)

    return (
        out_filename,
        build_html_output(
            "Done! Please download the generated .wav file", "result_item_success"
        ),
    )
# Styling for the HTML snippets emitted by build_html_output();
# the class names here must match the *style* values used there.
css = """
.result {display:flex;flex-direction:column}
.result_item {padding:15px;margin-bottom:8px;border-radius:15px;width:100%}
.result_item_success {background-color:mediumaquamarine;color:white;align-self:start}
.result_item_error {background-color:#ff7070;color:white;align-self:start}
"""
# Gradio UI: two tabs (audio upload, video upload), each with a Submit
# button wired to the matching process_uploaded_* handler. Event wiring
# must happen inside the Blocks context.
demo = gr.Blocks(css=css)

with demo:
    gr.Markdown("Remove non-speeches")

    with gr.Tabs():
        with gr.TabItem("Upload audio from disk (音频)"):
            uploaded_audio_file = gr.Audio(
                sources=["upload"],  # Choose between "microphone", "upload"
                type="filepath",
                label="Upload audio from disk",
            )
            upload_audio_button = gr.Button("Submit")
            output_audio = gr.Audio(label="Output")
            output_info_audio = gr.HTML(label="Info")

        with gr.TabItem("Upload video from disk (视频)"):
            uploaded_video_file = gr.Video(
                sources=["upload"],
                label="Upload from disk",
                show_share_button=True,
            )
            upload_video_button = gr.Button("Submit")
            output_video = gr.Video(label="Output")
            output_info_video = gr.HTML(label="Info")

    upload_video_button.click(
        process_uploaded_video_file,
        inputs=[
            uploaded_video_file,
        ],
        outputs=[
            output_video,
            output_info_video,
        ],
    )

    upload_audio_button.click(
        process_uploaded_audio_file,
        inputs=[
            uploaded_audio_file,
        ],
        outputs=[
            output_audio,
            output_info_audio,
        ],
    )
if __name__ == "__main__":
    # Timestamp, level, and source location on every log line.
    log_format = "%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] %(message)s"
    logging.basicConfig(format=log_format, level=logging.WARNING)

    demo.launch(share=True)