|
import asyncio |
|
import numpy as np |
|
import time |
|
from faster_whisper import WhisperModel |
|
import pyaudio |
|
import wave |
|
import threading |
|
from collections import deque |
|
import os |
|
from silero_vad import load_silero_vad, VADIterator |
|
|
|
# 16 kHz mono audio, as required by both Silero VAD and Whisper.
sampling_rate = 16_000

# Silero VAD model plus the streaming iterator used by the audio callback.
vad_model = load_silero_vad()

vad_iter = VADIterator(vad_model, sampling_rate=sampling_rate)

# Samples per VAD frame (Silero expects 512-sample chunks at 16 kHz).
frame_size = 512


# NOTE(review): despite the name, this value is used as a deque *element*
# (sample) limit, not a byte limit: frame_size * 2 bytes/sample * 20 frames.
# As a sample count it keeps ~40 frames of pre-roll audio — confirm intent.
PRE_CHUNK_LIMIT_BYTES = frame_size * 2 * 20
|
|
|
|
|
class FasterWhisperTerminalTest:
    """Microphone -> Silero VAD -> Faster-Whisper terminal transcription demo.

    Audio is captured via a PyAudio callback, gated by the module-level
    ``vad_iter``, buffered while speech is active, and handed to a worker
    thread for transcription after ~``silence_duration`` seconds of silence.
    """

    def __init__(self):
        # Whisper "small" model on CPU with int8 quantization, cached under
        # ./models so repeated runs do not re-download.
        self.model = WhisperModel(
            "small",
            device="cpu",
            compute_type="int8",
            cpu_threads=4,
            download_root="./models"
        )

        # Capture parameters: 16 kHz mono 16-bit PCM in 512-sample chunks,
        # matching the Silero VAD frame size.
        self.sample_rate = 16000
        self.chunk_size = 512
        self.channels = 1
        self.format = pyaudio.paInt16

        # Streaming state shared between the PyAudio callback thread and the
        # transcription worker thread.
        self.is_recording = False
        self.audio_buffer = deque()   # int16 samples of the current utterance
        self.silence_threshold = 500  # amplitude threshold (unused; VAD decides)
        self.silence_duration = 0.4   # seconds of silence that end an utterance
        self.silence_counter = 0
        # Rolling pre-roll of recent samples, replayed into the utterance
        # buffer when speech starts so the onset is not clipped.
        # NOTE(review): maxlen is a sample count even though the constant is
        # named ..._BYTES — confirm the intended pre-roll length.
        self.pre_chunks = deque(maxlen=PRE_CHUNK_LIMIT_BYTES)
        self.audio_count = 0          # speech chunks seen in current utterance
        self.in_speech = False

        print("Faster-Whisper Terminal Test Initialized")
        print("Model loaded successfully")

    def start_recording(self):
        """Start recording audio from microphone"""
        self.audio = pyaudio.PyAudio()
        self.stream = self.audio.open(
            format=self.format,
            channels=self.channels,
            rate=self.sample_rate,
            input=True,
            frames_per_buffer=self.chunk_size,
            stream_callback=self.audio_callback
        )

        self.is_recording = True
        self.stream.start_stream()
        print("Recording started. Speak into microphone...")
        print("Press Ctrl+C to stop")

    def audio_callback(self, in_data, frame_count, time_info, status):
        """Audio callback for real-time processing.

        Runs on PyAudio's callback thread for every ``chunk_size``-sample
        buffer: VAD-gates the chunk, segments utterances, and spawns a
        transcription thread once silence follows speech.  Always returns
        ``(in_data, paContinue)`` so the stream keeps running.
        """
        if self.is_recording:
            audio_data = np.frombuffer(in_data, dtype=np.int16)

            # Silero expects float32 samples in [-1, 1].
            float_chunk = audio_data.astype(np.float32) / 32768.0
            vad_result = vad_iter(float_chunk)

            # Maintain the rolling pre-roll (now includes the current chunk).
            self.pre_chunks.extend(audio_data)

            if vad_result and "start" in vad_result:
                self.in_speech = True
                # Replay the pre-roll; it already contains the current chunk,
                # so the current chunk must not be appended again below.
                self.audio_buffer.extend(self.pre_chunks)
            elif self.in_speech:
                # BUGFIX: previously every chunk (speech or silence) was ALSO
                # appended unconditionally at the top of the callback,
                # duplicating speech audio in the buffer.
                self.audio_buffer.extend(audio_data)

            if vad_result and "end" in vad_result:
                self.in_speech = False

            if self.in_speech:
                self.silence_counter = 0.0
                self.audio_count += 1
            else:
                # Accumulate elapsed silence in seconds.
                # BUGFIX: uses self.sample_rate instead of a shadowing
                # hard-coded 16000 local.
                self.silence_counter += len(audio_data) / self.sample_rate

            if self.silence_counter >= self.silence_duration and len(self.audio_buffer) > 0:
                self.silence_counter = 0
                if self.audio_count < 2:
                    # Too little speech to be worth transcribing.
                    # BUGFIX: the discarded blip is now cleared instead of
                    # leaking into the next utterance.
                    self.audio_count = 0
                    self.audio_buffer.clear()
                    return (in_data, pyaudio.paContinue)
                self.audio_count = 0
                print("Silence ")
                threading.Thread(target=self.process_audio,
                                 daemon=True).start()

        return (in_data, pyaudio.paContinue)

    def process_audio(self):
        """Process accumulated audio buffer"""
        if len(self.audio_buffer) == 0:
            return

        # Snapshot the shared buffer and convert int16 -> float32 in [-1, 1]
        # as faster-whisper expects, then reset the streaming state.
        audio_array = np.array(list(self.audio_buffer),
                               dtype=np.float32) / 32768.0
        self.audio_buffer.clear()
        self.silence_counter = 0

        # Skip clips shorter than half a second.
        if len(audio_array) < self.sample_rate * 0.5:
            return

        print("\nπ€ Processing audio...")
        start_time = time.time()

        try:
            segments, info = self.model.transcribe(
                audio_array,
                language="en",
                beam_size=2,
                vad_filter=True,
                vad_parameters=dict(
                    min_speech_duration_ms=500,
                    max_speech_duration_s=60
                )
            )

            # `segments` is a lazy generator; joining drives the decode.
            transcription = "".join(segment.text for segment in segments)

            end_time = time.time()
            processing_time = end_time - start_time

            if transcription.strip():
                print(f"π Transcription: {transcription.strip()}")
                print(f"β±οΈ Processing time: {processing_time:.2f}s")
                print(
                    f"π Language: {info.language} (confidence: {info.language_probability:.2f})")
                print("-" * 50)
            else:
                print("π No speech detected")

        except Exception as e:
            print(f"β Error during transcription: {e}")

    def stop_recording(self):
        """Stop recording and cleanup"""
        self.is_recording = False
        if hasattr(self, 'stream'):
            self.stream.stop_stream()
            self.stream.close()
        if hasattr(self, 'audio'):
            self.audio.terminate()
        print("\nπ Recording stopped")
|
|
|
|
|
def test_faster_whisper():
    """Main function to test Faster-Whisper.

    Creates the tester, starts the callback-driven recording stream, and
    idles until the user interrupts with Ctrl+C, then shuts down cleanly.
    """
    tester = FasterWhisperTerminalTest()

    try:
        tester.start_recording()

        # Audio flows on PyAudio's callback thread; the main thread only
        # keeps the process alive.
        while True:
            time.sleep(0.1)

    except KeyboardInterrupt:
        print("\n\nπ Stopping test...")
        tester.stop_recording()
        # BUGFIX: this message was split across two physical source lines,
        # which is a SyntaxError for a plain (non-triple-quoted) string.
        print("β Faster-Whisper test completed")
|
|
|
|
|
# Run the interactive microphone test only when executed as a script,
# not when imported as a module.
if __name__ == "__main__":
    test_faster_whisper()
|
|