import os
import base64
import asyncio

import streamlit as st
import speech_recognition as sr
from gtts import gTTS
import google.generativeai as genai
from transformers import pipeline

# Ensure an event loop exists (fix for Streamlit async issue)
try:
    asyncio.get_running_loop()
except RuntimeError:
    asyncio.set_event_loop(asyncio.new_event_loop())

# Configure Generative AI (ensure the API key is handled securely, e.g. via an environment variable)
GOOGLE_API_KEY = "------------------------------------------------"
genai.configure(api_key=GOOGLE_API_KEY)

# Initialize speech recognizer
recognizer = sr.Recognizer()

# Emotion detection model
emotion_model = pipeline(
    "text-classification",
    model="bhadresh-savani/distilbert-base-uncased-emotion",
)


def detect_emotion(text):
    """Detects emotion from text."""
    try:
        return emotion_model(text)[0]["label"]
    except Exception as e:
        return f"Error detecting emotion: {str(e)}"


def listen_to_customer():
    """Captures voice input and converts it to text."""
    with sr.Microphone() as source:
        st.write("Listening...")
        audio = recognizer.listen(source)
    try:
        return recognizer.recognize_google(audio)
    except (sr.UnknownValueError, sr.RequestError):
        return None


def process_text(customer_input):
    """Processes customer input using Generative AI."""
    try:
        model = genai.GenerativeModel("gemini-1.5-flash")
        response = model.generate_content(customer_input)
        return response.text
    except Exception as e:
        return f"Error in AI response: {str(e)}"


def text_to_speech(text, voice_option, language):
    """Converts AI response text to speech."""
    try:
        lang_code = {"English": "en", "Spanish": "es", "French": "fr", "Hindi": "hi"}.get(language, "en")
        # gTTS offers a single voice per locale; switching the TLD only varies the accent,
        # so "Male"/"Female" here is an approximation rather than a true voice change.
        tts = gTTS(text=text, lang=lang_code, tld="com" if voice_option == "Male" else "co.uk")
        file_path = "response.mp3"
        tts.save(file_path)
        return file_path
    except Exception as e:
        st.error(f"Text-to-Speech Error: {str(e)}")
        return None


def autoplay_audio(file_path):
    """Autoplays generated speech audio in Streamlit."""
    try:
        with open(file_path, "rb") as f:
            data = f.read()
        b64 = base64.b64encode(data).decode()
        # Embed the MP3 as a base64 data URI inside an autoplaying HTML <audio> tag
        st.markdown(
            f"""
            <audio autoplay>
                <source src="data:audio/mp3;base64,{b64}" type="audio/mp3">
            </audio>
            """,
            unsafe_allow_html=True,
        )
    except Exception as e:
        st.error(f"Error playing audio: {str(e)}")


def main():
    st.title("Vocacity AI Voice Agent 🎙️")
    st.sidebar.header("Settings")

    language = st.sidebar.selectbox("Choose Language:", ["English", "Spanish", "French", "Hindi"])
    voice_option = st.sidebar.selectbox("Choose AI Voice:", ["Male", "Female"])
    clear_chat = st.sidebar.button("🗑️ Clear Chat")

    if "chat_history" not in st.session_state:
        st.session_state.chat_history = []

    user_text_input = st.text_input("Type your query here:", "")

    # Voice input takes priority when the Speak button is pressed; otherwise fall back to typed text
    if st.button("🎙️ Speak"):
        customer_input = listen_to_customer()
    else:
        customer_input = user_text_input.strip() if user_text_input else None

    if customer_input:
        emotion = detect_emotion(customer_input)
        ai_response = process_text(customer_input)
        st.session_state.chat_history.append((customer_input, ai_response))

        st.write(f"**AI Response:** {ai_response} (Emotion: {emotion})")

        audio_file = text_to_speech(ai_response, voice_option, language)
        if audio_file:
            autoplay_audio(audio_file)
            os.remove(audio_file)

    st.write("### Chat History")
    for user, ai in st.session_state.chat_history[-5:]:
        st.write(f"👤 {user}")
        st.write(f"🤖 {ai}")

    if clear_chat:
        st.session_state.chat_history = []
        st.rerun()


if __name__ == "__main__":
    main()