import streamlit as st
import os
import asyncio
import base64
import io
import traceback
import threading
import time
from typing import Optional, Dict, Any
import queue
import tempfile
import json
import cv2
import PIL.Image
import numpy as np
from streamlit_webrtc import webrtc_streamer, WebRtcMode, RTCConfiguration
import av
from google import genai
from google.genai import types
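# The imports above imply these pip packages for the Space; this is an assumed,
# unpinned requirements sketch rather than a verified manifest:
#   streamlit, streamlit-webrtc, av, opencv-python-headless, pillow,
#   numpy, google-genai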
# Constants
MODEL = "models/gemini-2.5-flash-preview-native-audio-dialog"

# Streamlit page config
st.set_page_config(
    page_title="Gemini Live API",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded"
)
class HuggingFaceGeminiInterface:
    def __init__(self):
        self.session = None
        self.is_connected = False
        self.current_frame = None
        self.client = None
        self.config = None

        # Initialize session state
        if 'session_active' not in st.session_state:
            st.session_state.session_active = False
        if 'messages' not in st.session_state:
            st.session_state.messages = []
        if 'audio_chunks' not in st.session_state:
            st.session_state.audio_chunks = []
    def setup_client(self, api_key: str):
        """Set up the Gemini client with the provided API key."""
        try:
            # Prefer the key entered in the UI; fall back to the env var if present
            self.client = genai.Client(
                http_options={"api_version": "v1beta"},
                api_key=api_key or os.getenv("GEMINI_API_KEY"),
            )
            tools = [types.Tool(google_search=types.GoogleSearch())]
            self.config = types.LiveConnectConfig(
                response_modalities=["AUDIO", "TEXT"],
                media_resolution="MEDIA_RESOLUTION_MEDIUM",
                speech_config=types.SpeechConfig(
                    voice_config=types.VoiceConfig(
                        prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Kore")
                    )
                ),
                context_window_compression=types.ContextWindowCompressionConfig(
                    trigger_tokens=25600,
                    sliding_window=types.SlidingWindow(target_tokens=12800),
                ),
                tools=tools,
            )
            return True
        except Exception as e:
            st.error(f"Error setting up client: {e}")
            return False
    def process_uploaded_audio(self, uploaded_file):
        """Process an uploaded audio file and return its raw bytes."""
        try:
            # Save uploaded file temporarily
            with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tmp_file:
                tmp_file.write(uploaded_file.getvalue())
                tmp_file_path = tmp_file.name

            # Read audio data
            with open(tmp_file_path, 'rb') as f:
                audio_data = f.read()

            # Clean up temp file
            os.unlink(tmp_file_path)
            return audio_data
        except Exception as e:
            st.error(f"Error processing audio: {e}")
            return None

    def process_webcam_frame(self, frame):
        """Resize a webcam frame and encode it as base64 JPEG."""
        try:
            # Convert frame to PIL Image
            img = PIL.Image.fromarray(frame)
            img.thumbnail([1024, 1024])

            # Convert to base64
            image_io = io.BytesIO()
            img.save(image_io, format="jpeg")
            image_io.seek(0)
            image_bytes = image_io.read()

            return {
                "mime_type": "image/jpeg",
                "data": base64.b64encode(image_bytes).decode(),
                "display_frame": frame
            }
        except Exception as e:
            st.error(f"Error processing frame: {e}")
            return None
    async def send_text_message(self, text: str):
        """Send a text message to Gemini."""
        try:
            if not self.client:
                st.error("Client not initialized")
                return

            # For HuggingFace deployment, we'll use the simpler generate_content method
            response = await self.client.aio.models.generate_content(
                model=MODEL.replace('-preview-native-audio-dialog', ''),
                contents=[text]
            )

            if response.text:
                st.session_state.messages.append({
                    "role": "assistant",
                    "content": response.text,
                    "timestamp": time.time()
                })
            return response.text
        except Exception as e:
            st.error(f"Error sending message: {e}")
            return None
    async def send_multimodal_message(self, text: str, image_data: Optional[Dict] = None, audio_data: Optional[bytes] = None):
        """Send a multimodal message (text, image, and/or audio) to Gemini."""
        try:
            if not self.client:
                st.error("Client not initialized")
                return

            contents = []

            # Add text
            if text:
                contents.append(text)

            # Add image (decode the stored base64 payload back to raw bytes)
            if image_data:
                contents.append(types.Part.from_bytes(
                    data=base64.b64decode(image_data["data"]),
                    mime_type=image_data["mime_type"],
                ))

            # Add audio (raw bytes from the uploaded file)
            if audio_data:
                contents.append(types.Part.from_bytes(
                    data=audio_data,
                    mime_type="audio/wav",
                ))

            # Use generate_content for multimodal input
            response = await self.client.aio.models.generate_content(
                model="gemini-1.5-flash",  # Use standard model for multimodal
                contents=contents
            )

            if response.text:
                st.session_state.messages.append({
                    "role": "assistant",
                    "content": response.text,
                    "timestamp": time.time()
                })
            return response.text
        except Exception as e:
            st.error(f"Error sending multimodal message: {e}")
            return None
# Initialize the interface
if 'gemini_interface' not in st.session_state:
    st.session_state.gemini_interface = HuggingFaceGeminiInterface()

# Main UI
st.title("🤖 Gemini Live API Interface (HuggingFace)")
st.markdown("Interactive chat with Google Gemini - Web-compatible version")

# Sidebar configuration
st.sidebar.header("Configuration")

# API Key input
api_key = st.sidebar.text_input(
    "Gemini API Key",
    type="password",
    help="Enter your Google Gemini API key"
)

# Input mode selection
input_mode = st.sidebar.selectbox(
    "Input Mode",
    ["text", "webcam", "audio_upload", "multimodal"],
    help="Choose your input method"
)

# Connection status
if st.session_state.session_active:
    st.sidebar.success("🟢 API Ready")
else:
    st.sidebar.error("🔴 Not Connected")

# Setup client
if api_key and not st.session_state.session_active:
    if st.sidebar.button("🚀 Initialize API"):
        if st.session_state.gemini_interface.setup_client(api_key):
            st.session_state.session_active = True
            st.sidebar.success("API initialized!")
            st.rerun()
# Main interface
col1, col2 = st.columns([2, 1])

with col1:
    st.subheader("Chat Interface")

    # Display chat messages
    chat_container = st.container()
    with chat_container:
        for msg in st.session_state.messages[-10:]:
            with st.chat_message(msg["role"]):
                st.write(msg["content"])
                if "timestamp" in msg:
                    st.caption(f"At {time.strftime('%H:%M:%S', time.localtime(msg['timestamp']))}")

with col2:
    st.subheader("Input Controls")

    if st.session_state.session_active:
        # Text Input Mode
        if input_mode == "text":
            st.write("**Text Chat**")
            text_input = st.text_area("Your message:", height=100)

            if st.button("Send Message", disabled=not text_input):
                # Add user message to history
                st.session_state.messages.append({
                    "role": "user",
                    "content": text_input,
                    "timestamp": time.time()
                })

                # Send message
                with st.spinner("Sending..."):
                    response = asyncio.run(
                        st.session_state.gemini_interface.send_text_message(text_input)
                    )

                if response:
                    st.success("Message sent!")
                    st.rerun()
        # Webcam Input Mode
        elif input_mode == "webcam":
            st.write("**Webcam Input**")

            # WebRTC component for camera access
            webrtc_ctx = webrtc_streamer(
                key="webcam",
                mode=WebRtcMode.SENDONLY,
                rtc_configuration=RTCConfiguration({
                    "iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]
                }),
                media_stream_constraints={"video": True, "audio": False},
            )

            text_prompt = st.text_input("Describe what you want to know about the image:")

            if st.button("Analyze Current Frame") and webrtc_ctx.video_receiver:
                if text_prompt:
                    with st.spinner("Analyzing..."):
                        # Get the latest frame
                        try:
                            frame = webrtc_ctx.video_receiver.get_frame(timeout=1)
                            if frame:
                                img_array = frame.to_ndarray(format="rgb24")
                                image_data = st.session_state.gemini_interface.process_webcam_frame(img_array)

                                if image_data:
                                    # Add user message
                                    st.session_state.messages.append({
                                        "role": "user",
                                        "content": f"[Image] {text_prompt}",
                                        "timestamp": time.time()
                                    })

                                    # Send multimodal message
                                    response = asyncio.run(
                                        st.session_state.gemini_interface.send_multimodal_message(
                                            text_prompt, image_data=image_data
                                        )
                                    )

                                    if response:
                                        st.success("Image analyzed!")
                                        st.rerun()
                        except Exception as e:
                            st.error(f"Error capturing frame: {e}")
        # Audio Upload Mode
        elif input_mode == "audio_upload":
            st.write("**Audio Upload**")

            uploaded_audio = st.file_uploader(
                "Upload audio file",
                type=['wav', 'mp3', 'ogg'],
                help="Upload an audio file to transcribe and analyze"
            )

            text_context = st.text_input("Additional context (optional):")

            if st.button("Process Audio") and uploaded_audio:
                with st.spinner("Processing audio..."):
                    audio_data = st.session_state.gemini_interface.process_uploaded_audio(uploaded_audio)

                    if audio_data:
                        # Add user message
                        st.session_state.messages.append({
                            "role": "user",
                            "content": f"[Audio Upload] {text_context if text_context else 'Please transcribe and analyze this audio'}",
                            "timestamp": time.time()
                        })

                        # Send audio message
                        response = asyncio.run(
                            st.session_state.gemini_interface.send_multimodal_message(
                                text_context if text_context else "Please transcribe and analyze this audio",
                                audio_data=audio_data
                            )
                        )

                        if response:
                            st.success("Audio processed!")
                            st.rerun()
        # Multimodal Mode
        elif input_mode == "multimodal":
            st.write("**Multimodal Input**")

            text_input = st.text_area("Text prompt:", height=80)

            col_img, col_aud = st.columns(2)
            with col_img:
                uploaded_image = st.file_uploader(
                    "Upload image",
                    type=['jpg', 'jpeg', 'png'],
                    help="Optional image input"
                )
            with col_aud:
                uploaded_audio = st.file_uploader(
                    "Upload audio",
                    type=['wav', 'mp3', 'ogg'],
                    help="Optional audio input"
                )

            if st.button("Send Multimodal Message"):
                if text_input or uploaded_image or uploaded_audio:
                    with st.spinner("Processing..."):
                        # Process image
                        image_data = None
                        if uploaded_image:
                            img = PIL.Image.open(uploaded_image)
                            img.thumbnail([1024, 1024])
                            # JPEG cannot store an alpha channel, so force RGB before saving
                            img = img.convert("RGB")
                            image_io = io.BytesIO()
                            img.save(image_io, format="jpeg")
                            image_io.seek(0)
                            image_bytes = image_io.read()
                            image_data = {
                                "mime_type": "image/jpeg",
                                "data": base64.b64encode(image_bytes).decode()
                            }

                        # Process audio
                        audio_data = None
                        if uploaded_audio:
                            audio_data = st.session_state.gemini_interface.process_uploaded_audio(uploaded_audio)

                        # Create message description
                        msg_parts = []
                        if text_input:
                            msg_parts.append(f"Text: {text_input}")
                        if uploaded_image:
                            msg_parts.append("Image")
                        if uploaded_audio:
                            msg_parts.append("Audio")

                        # Add user message
                        st.session_state.messages.append({
                            "role": "user",
                            "content": f"[{', '.join(msg_parts)}]",
                            "timestamp": time.time()
                        })

                        # Send multimodal message
                        response = asyncio.run(
                            st.session_state.gemini_interface.send_multimodal_message(
                                text_input, image_data=image_data, audio_data=audio_data
                            )
                        )

                        if response:
                            st.success("Multimodal message sent!")
                            st.rerun()
                else:
                    st.warning("Please provide at least one input (text, image, or audio)")
    else:
        st.info("Enter your API key and click 'Initialize API' to get started.")

# Footer
st.markdown("---")
st.markdown("""
**Instructions:**
1. Enter your Gemini API key in the sidebar
2. Click 'Initialize API' to connect
3. Choose your input mode:
   - **Text**: Simple text chat
   - **Webcam**: Analyze camera feed with text prompts
   - **Audio Upload**: Upload and analyze audio files
   - **Multimodal**: Combine text, images, and audio
4. Interact with Gemini using your chosen method

**Note**: This version is optimized for Hugging Face Spaces deployment without audio streaming dependencies.
""")

# Display current mode info
st.info(f"Current mode: **{input_mode}**")

# Auto-refresh indicator
if st.session_state.session_active:
    st.markdown("🟢 **Status**: Ready for interaction")