# import threading
# import pyaudio
# import wave
# import io
# import time
# from groq import Groq
# from pydub import AudioSegment
# import numpy as np
#
# class SpeechTotext:
#     def __init__(self):
#         self.client = Groq()
#         self.is_recording = False
#         self.frames = []
#         self.chunk = 1024
#         self.format = pyaudio.paInt16
#         self.channels = 1
#         self.rate = 44100
#         self.p = pyaudio.PyAudio()
#
#         # Silence detection parameters
#         self.silence_threshold = -35.0  # Adjusted threshold to be more lenient
#         self.silence_duration = 3.0  # seconds
#         self.buffer_duration = 0.1  # seconds for each audio chunk analysis
#         self.silent_chunks = 0
#         self.chunks_per_second = int(1 / self.buffer_duration)
#
#     def detect_silence(self, audio_segment):
#         """Check if the audio chunk is silent using pydub"""
#         return audio_segment.dBFS < self.silence_threshold
#
#     def record_audio(self):
#         stream = self.p.open(
#             format=self.format,
#             channels=self.channels,
#             rate=self.rate,
#             input=True,
#             frames_per_buffer=self.chunk
#         )
#         self.frames = []
#         buffer_samples = int(self.buffer_duration * self.rate)
#
#         while self.is_recording:
#             # Read enough chunks to fill our buffer duration
#             buffer_data = b''
#             chunks_needed = max(1, int(buffer_samples / self.chunk))
#             for _ in range(chunks_needed):
#                 data = stream.read(self.chunk)
#                 buffer_data += data
#                 self.frames.append(data)
#
#             # Convert the buffer to a pydub AudioSegment
#             audio_buffer = AudioSegment(
#                 data=buffer_data,
#                 sample_width=self.p.get_sample_size(self.format),
#                 frame_rate=self.rate,
#                 channels=self.channels
#             )
#
#             # Check for silence
#             if self.detect_silence(audio_buffer):
#                 self.silent_chunks += 1
#                 if self.silent_chunks >= self.silence_duration * self.chunks_per_second:
#                     print(f"Silence detected for {self.silence_duration} seconds, stopping recording...")
#                     self.is_recording = False
#                     break
#             else:
#                 self.silent_chunks = 0  # Reset silent chunk counter when sound is detected
#
#         stream.stop_stream()
#         stream.close()
#
#     def start_recording(self):
#         """Start recording audio"""
#         self.is_recording = True
#         self.silent_chunks = 0
#         threading.Thread(target=self.record_audio).start()
#
#     def stop_recording(self):
#         """Stop recording audio and transcribe"""
#         self.is_recording = False
#         print("Recording stopped")
#
#         # Save the recorded audio to a BytesIO object
#         wav_buffer = io.BytesIO()
#         with wave.open(wav_buffer, 'wb') as wf:
#             wf.setnchannels(self.channels)
#             wf.setsampwidth(self.p.get_sample_size(self.format))
#             wf.setframerate(self.rate)
#             wf.writeframes(b''.join(self.frames))
#
#         # Rewind the buffer and transcribe
#         wav_buffer.seek(0)
#         try:
#             transcription = self.client.audio.transcriptions.create(
#                 file=("audio.wav", wav_buffer),
#                 model="whisper-large-v3-turbo"
#             )
#             print(f"Transcript: {transcription.text}")
#         except Exception as e:
#             print(f"Error while transcribing audio: {str(e)}")
#         finally:
#             wav_buffer.close()
#
#     def cleanup(self):
#         """Cleanup PyAudio"""
#         self.p.terminate()
#
# if __name__ == "__main__":
#     recorder = SpeechTotext()
#     try:
#         print("Starting recording... (will stop after 3 seconds of silence)")
#         recorder.start_recording()
#
#         # Wait for recording to finish
#         while recorder.is_recording:
#             time.sleep(0.1)
#
#         recorder.stop_recording()
#     finally:
#         recorder.cleanup()

# The commented-out version above uses pydub's dBFS measurement for silence
# detection; it is kept in case it is needed in future versions.

import io
import wave
from array import array

import pyaudio
from groq import Groq


class SpeechToText:
    """Record microphone audio until a stretch of silence, then transcribe it via Groq."""

    def __init__(self):
        self.client = Groq()
        self.chunk = 4096
        self.format = pyaudio.paInt16
        self.channels = 1
        self.rate = 16000
        self.silence_threshold = 1000  # peak amplitude (16-bit samples) below which a chunk counts as silent
        self.silence_duration = 3.0  # seconds of continuous silence before recording stops
        self.seconds_per_chunk = self.chunk / self.rate  # duration of one audio chunk, in seconds
        self.chunks_for_silence = int(self.silence_duration / self.seconds_per_chunk)

    def record_and_transcribe(self):
        """Record audio until `silence_duration` seconds of silence, then return the transcription."""
        # Create a new PyAudio instance for each call: when the instance was
        # created in the constructor, the method could not run more than once.
        p = pyaudio.PyAudio()
        stream = p.open(
            format=self.format,
            channels=self.channels,
            rate=self.rate,
            input=True,
            frames_per_buffer=self.chunk
        )

        frames = []
        silent_chunk_counter = 0
        print("Recording started... (will stop after 3 seconds of silence)")

        while True:
            try:
                data = stream.read(self.chunk, exception_on_overflow=False)
                frames.append(data)

                # Detect silence: the chunk is silent if its peak amplitude
                # stays below the threshold.
                audio_data = array('h', data)
                if max(abs(x) for x in audio_data) < self.silence_threshold:
                    silent_chunk_counter += 1
                    if silent_chunk_counter >= self.chunks_for_silence:
                        print(f"Detected {self.silence_duration} seconds of silence, stopping...")
                        break
                else:
                    silent_chunk_counter = 0
            except IOError as e:
                print(f"Error recording: {e}")
                break

        stream.stop_stream()
        stream.close()
        p.terminate()  # Ensure PyAudio is completely closed

        wav_buffer = io.BytesIO()
        try:
            with wave.open(wav_buffer, 'wb') as wf:
                wf.setnchannels(self.channels)
                # Use the module-level helper: the PyAudio instance above has
                # already been terminated.
                wf.setsampwidth(pyaudio.get_sample_size(self.format))
                wf.setframerate(self.rate)
                wf.writeframes(b''.join(frames))

            wav_buffer.seek(0)
            transcription = self.client.audio.transcriptions.create(
                file=("audio.wav", wav_buffer),
                model="whisper-large-v3-turbo"
            )
            return transcription.text
        except Exception as e:
            print(f"Error transcribing: {e}")
            return None  # Signal failure instead of returning the error text as a transcript
        finally:
            wav_buffer.close()
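
# The silence_threshold above (1000) is a rough magic number that depends on
# the microphone and the room. A minimal calibration sketch, not part of the
# class above: record a short sample of ambient noise and print its peak
# amplitude, then pick a silence_threshold somewhat above the printed value.
# The helper name and the one-second window are illustrative assumptions.
def print_ambient_peak(seconds=1.0, rate=16000, chunk=4096):
    """Print the peak amplitude of `seconds` of ambient audio (hypothetical helper)."""
    p = pyaudio.PyAudio()
    stream = p.open(format=pyaudio.paInt16, channels=1, rate=rate,
                    input=True, frames_per_buffer=chunk)
    peak = 0
    for _ in range(max(1, int(seconds * rate / chunk))):
        samples = array('h', stream.read(chunk, exception_on_overflow=False))
        peak = max(peak, max(abs(s) for s in samples))
    stream.stop_stream()
    stream.close()
    p.terminate()
    print(f"Ambient peak amplitude: {peak} (choose a silence_threshold above this)")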

if __name__ == "__main__":
    recorder = SpeechToText()
    transcribed_text = recorder.record_and_transcribe()
    if transcribed_text:
        print(f"Transcription: {transcribed_text}")
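
# Note: by default the Groq client reads its API key from the GROQ_API_KEY
# environment variable, so it must be set before running. The script name
# below is an assumption:
#
#   export GROQ_API_KEY="..."
#   python speech_to_text.py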