#!/usr/bin/env python3
"""Gradio app that removes non-speech from uploaded audio/video files.

The input is decoded with ffmpeg to 16 kHz mono 16-bit PCM, run through a
silero VAD (via sherpa-onnx), and only the detected speech segments are
concatenated and written to a .wav file offered back to the user.
"""

import logging
import random
import subprocess

import gradio as gr
import numpy as np
import sherpa_onnx
import soundfile as sf
from huggingface_hub import hf_hub_download

# Everything downstream (VAD, output file) operates at this rate; ffmpeg
# resamples the input to match.
sample_rate = 16000


def _get_nn_model_filename(
    repo_id: str,
    filename: str,
    subfolder: str = "exp",
) -> str:
    """Download *filename* from the Hugging Face Hub repo *repo_id*.

    Returns the local path of the cached file.
    """
    return hf_hub_download(
        repo_id=repo_id,
        filename=filename,
        subfolder=subfolder,
    )


def get_vad() -> sherpa_onnx.VoiceActivityDetector:
    """Build a silero-VAD based voice activity detector.

    The model is fetched from the csukuangfj/vad repo on the HF Hub.
    """
    vad_model = _get_nn_model_filename(
        repo_id="csukuangfj/vad",
        filename="silero_vad.onnx",
        subfolder=".",
    )

    config = sherpa_onnx.VadModelConfig()
    config.silero_vad.model = vad_model
    config.silero_vad.threshold = 0.5
    config.silero_vad.min_silence_duration = 0.1  # seconds
    config.silero_vad.min_speech_duration = 0.25  # seconds
    config.silero_vad.max_speech_duration = 20  # seconds
    config.sample_rate = sample_rate

    return sherpa_onnx.VoiceActivityDetector(
        config,
        buffer_size_in_seconds=180,
    )


def build_html_output(s: str, style: str = "result_item_success") -> str:
    """Wrap message *s* in the result <div>s styled by the css below.

    *style* selects ``result_item_success`` (green) or
    ``result_item_error`` (red).
    """
    # NOTE(review): the HTML markup had been lost from this string; it is
    # reconstructed from the .result/.result_item{_success,_error} CSS
    # classes defined in this file and the styles passed by callers.
    return f"""
    <div class='result'>
        <div class='result_item {style}'>
          {s}
        </div>
    </div>
    """


def process_uploaded_audio_file(in_filename: str):
    """Submit handler for the audio tab.

    Returns (output_wav_path, info_html) — exactly the two outputs wired
    to this callback.
    """
    logging.warning(f"Processing audio {in_filename}")
    if not in_filename:
        # Only two outputs (audio + info HTML) are connected to this
        # callback, so the error branch must also return two values.
        return (
            "",
            build_html_output(
                "Please first upload a file and then click "
                'the button "Submit"',
                "result_item_error",
            ),
        )

    return process_file(in_filename)


def process_uploaded_video_file(in_filename: str):
    """Submit handler for the video tab.

    Returns (output_path, info_html) — exactly the two outputs wired to
    this callback.
    """
    logging.warning(f"Processing video {in_filename}")
    if not in_filename:
        return (
            "",
            build_html_output(
                "Please first upload a file and then click "
                'the button "Submit"',
                "result_item_error",
            ),
        )

    return process_file(in_filename)


def process_file(filename: str):
    """Run VAD over *filename* and write the kept speech to a .wav file.

    The input is streamed through ffmpeg as 16 kHz mono s16le PCM, fed to
    the VAD in 512-sample windows, and all detected speech samples are
    concatenated.  Returns (out_filename, info_html).
    """
    vad = get_vad()

    # Decode anything ffmpeg understands to raw 16 kHz mono s16le on stdout.
    ffmpeg_cmd = [
        "ffmpeg",
        "-i",
        filename,
        "-f",
        "s16le",
        "-acodec",
        "pcm_s16le",
        "-ac",
        "1",
        "-ar",
        str(sample_rate),
        "-",
    ]

    process = subprocess.Popen(
        ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
    )

    frames_per_read = int(sample_rate * 100)  # read 100 seconds at a time
    window_size = 512  # samples per VAD step

    buffer = np.array([], dtype=np.float32)
    all_samples = []
    is_last = False

    while True:
        # *2 because each int16 sample occupies two bytes
        data = process.stdout.read(frames_per_read * 2)
        if not data:
            if is_last:
                break
            # Append one second of silence so the VAD can close any speech
            # segment still in flight, then flush on the final pass.
            is_last = True
            data = np.zeros(sample_rate, dtype=np.int16)

        samples = np.frombuffer(data, dtype=np.int16)
        samples = samples.astype(np.float32) / 32768

        buffer = np.concatenate([buffer, samples])
        while len(buffer) > window_size:
            vad.accept_waveform(buffer[:window_size])
            buffer = buffer[window_size:]

        if is_last:
            vad.flush()

        while not vad.empty():
            all_samples.extend(vad.front.samples)
            vad.pop()

    # Reap the ffmpeg child so it does not linger as a zombie.
    process.wait()

    suffix = random.randint(1000, 10000)
    # NOTE(review): "(unknown)" looks like a lost placeholder for the input
    # file's stem — confirm against the original deployment.
    out_filename = f"(unknown)-{suffix}.wav"

    speech_samples = np.array(all_samples, dtype=np.float32)
    sf.write(out_filename, speech_samples, samplerate=sample_rate)

    return (
        out_filename,
        build_html_output(
            "Done! Please download the generated .wav file",
            "result_item_success",
        ),
    )


css = """
.result {display:flex;flex-direction:column}
.result_item {padding:15px;margin-bottom:8px;border-radius:15px;width:100%}
.result_item_success {background-color:mediumaquamarine;color:white;align-self:start}
.result_item_error {background-color:#ff7070;color:white;align-self:start}
"""

demo = gr.Blocks(css=css)

with demo:
    gr.Markdown("Remove non-speeches")

    with gr.Tabs():
        with gr.TabItem("Upload audio from disk (音频)"):
            uploaded_audio_file = gr.Audio(
                sources=["upload"],  # Choose between "microphone", "upload"
                type="filepath",
                label="Upload audio from disk",
            )
            upload_audio_button = gr.Button("Submit")

            output_audio = gr.Audio(label="Output")
            output_info_audio = gr.HTML(label="Info")

        with gr.TabItem("Upload video from disk (视频)"):
            uploaded_video_file = gr.Video(
                sources=["upload"],
                label="Upload from disk",
                show_share_button=True,
            )
            upload_video_button = gr.Button("Submit")

            output_video = gr.Video(label="Output")
            output_info_video = gr.HTML(label="Info")

    upload_video_button.click(
        process_uploaded_video_file,
        inputs=[
            uploaded_video_file,
        ],
        outputs=[
            output_video,
            output_info_video,
        ],
    )

    upload_audio_button.click(
        process_uploaded_audio_file,
        inputs=[
            uploaded_audio_file,
        ],
        outputs=[
            output_audio,
            output_info_audio,
        ],
    )

if __name__ == "__main__":
    formatter = "%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] %(message)s"
    logging.basicConfig(format=formatter, level=logging.WARNING)

    demo.launch(share=True)