import os
import asyncio
import time
import numpy as np
import cv2
import gradio as gr
from fastrtc import Stream, AsyncAudioVideoStreamHandler, get_cloudflare_turn_credentials_async
from google import genai
from google.genai import types
# Environment variable for API key
API_KEY = os.getenv("GEMINI_API_KEY", "")
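# NOTE: on Hugging Face Spaces this is usually supplied as a repository
# secret; for local runs, export GEMINI_API_KEY before launching the app.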
class EnhancedScreenAssistantHandler(AsyncAudioVideoStreamHandler):
"""Enhanced real-time screen assistant with voice activity detection"""
def __init__(self):
        # fastrtc's handler base takes `expected_layout` for the input channel
        # layout; 16 kHz in / 24 kHz out matches the Live API audio formats
        super().__init__(expected_layout="mono", output_sample_rate=24000, input_sample_rate=16000)
self.session = None
self.last_frame_time = 0
self.audio_queue = asyncio.Queue()
self.text_queue = asyncio.Queue()
self.connected = False
self.frame_interval = 1.0 # Send one frame per second
self.conversation_history = []
async def start_up(self):
"""Initialize Google GenAI Live session with enhanced configuration"""
try:
if not API_KEY:
print("❌ No GEMINI_API_KEY found in environment")
return
# Initialize Google GenAI client with alpha API access
client = genai.Client(api_key=API_KEY, http_options={"api_version": "v1alpha"})
# Enhanced configuration for live session
config = {
"response_modalities": ["AUDIO", "TEXT"],
"input_audio_transcription": {"model": "latest"},
"output_audio_transcription": {"model": "latest"},
"system_instruction": (
"You are an expert real-time screen assistant. You can see the user's screen "
"and hear their voice. Provide clear, actionable guidance based on what you observe. "
"Be proactive - if you see the user struggling or notice something important, "
"offer helpful suggestions even without being asked. Keep responses concise but thorough. "
"When giving instructions, be specific about what to click, where to look, "
"and what to expect next."
),
"generation_config": {"response_mime_type": "text/plain", "temperature": 0.7, "max_output_tokens": 512},
}
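            # NOTE: these field names follow the Live API config surface as
            # exposed through the v1alpha http_options above; exact options
            # (e.g. the transcription settings) can vary between google-genai
            # SDK releases, so treat this dict as a best-effort sketch.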
# Connect to Live API
self.session = await client.aio.live.connect(model="gemini-2.0-flash-live-preview", config=config)
self.connected = True
            print("✅ Connected to Google GenAI Live API with enhanced configuration")
# Start background tasks with proper management
self.background_tasks = set()
response_task = asyncio.create_task(self._handle_responses())
context_task = asyncio.create_task(self._periodic_context_update())
self.background_tasks.add(response_task)
self.background_tasks.add(context_task)
response_task.add_done_callback(self.background_tasks.discard)
context_task.add_done_callback(self.background_tasks.discard)
except Exception as e:
print(f"❌ Failed to connect to GenAI: {e}")
self.connected = False
async def _handle_responses(self):
"""Handle incoming responses from AI with enhanced processing"""
try:
current_text = ""
async for msg in self.session.receive():
if msg.data: # Audio response from AI
# Convert raw PCM bytes to numpy array for FastRTC
audio_array = np.frombuffer(msg.data, dtype=np.int16)
if len(audio_array) > 0:
audio_array = audio_array.reshape(1, -1) # Shape: (1, N)
await self.audio_queue.put(audio_array)
if msg.text: # Text response from AI
current_text += msg.text
                    print(f"🤖 AI: {msg.text}")
# Add to conversation history when response is complete
if msg.text.endswith((".", "!", "?", "\n")):
self.conversation_history.append({"role": "assistant", "content": current_text.strip(), "timestamp": time.time()})
current_text = ""
# Keep conversation history manageable
if len(self.conversation_history) > 20:
self.conversation_history = self.conversation_history[-15:]
await self.text_queue.put(msg.text)
except Exception as e:
print(f"❌ Error handling AI responses: {e}")
async def _periodic_context_update(self):
"""Periodically send context updates to maintain session state"""
while self.connected:
await asyncio.sleep(30) # Update every 30 seconds
if self.session and len(self.conversation_history) > 0:
try:
# Send a subtle context maintenance message
context_msg = "Continue monitoring and providing assistance as needed."
await self.session.send_realtime_input(text=context_msg)
except Exception as e:
print(f"⚠️ Context update failed: {e}")
async def receive(self, frame: tuple[int, np.ndarray]):
"""Handle incoming audio with voice activity detection"""
if not self.connected or not self.session:
return
try:
_, audio_np = frame
            # Basic voice activity detection: normalize int16 PCM to [0, 1]
            # so the fixed 0.01 gate is meaningful for the incoming dtype
            audio_level = np.abs(audio_np.astype(np.float32)).mean() / 32768.0
            if audio_level > 0.01:  # Threshold for voice activity
audio_bytes = audio_np.tobytes()
# Send audio to Google GenAI Live API
await self.session.send_realtime_input(media=types.Blob(data=audio_bytes, mime_type="audio/pcm;rate=16000"))
except Exception as e:
print(f"❌ Error processing audio: {e}")
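    @staticmethod
    def _rms_level(audio_np: np.ndarray) -> float:
        """RMS level of int16 PCM, normalized to [0, 1].

        A small optional sketch, not used by the original flow: an RMS gate
        is usually more robust to DC offset and clicks than the mean-absolute
        threshold used in receive() above.
        """
        samples = audio_np.astype(np.float32) / 32768.0
        return float(np.sqrt(np.mean(np.square(samples))))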
async def video_receive(self, frame: np.ndarray):
"""Handle incoming video frames with intelligent frame selection"""
if not self.connected or not self.session:
return
try:
current_time = time.time()
# Adaptive frame rate based on activity
# Send frames more frequently if there's likely activity
frame_diff_threshold = 0.1
if hasattr(self, "last_frame"):
frame_diff = np.abs(frame.astype(float) - self.last_frame.astype(float)).mean()
if frame_diff > frame_diff_threshold:
# More activity detected, reduce interval
effective_interval = self.frame_interval * 0.5
else:
effective_interval = self.frame_interval
else:
effective_interval = self.frame_interval
if current_time - self.last_frame_time < effective_interval:
return
self.last_frame_time = current_time
self.last_frame = frame.copy()
# Resize frame for efficiency while maintaining quality
height, width = frame.shape[:2]
if width > 1280:
scale = 1280 / width
new_width = 1280
new_height = int(height * scale)
frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_AREA)
# Encode frame as JPEG with optimized quality
success, jpg_bytes = cv2.imencode(
".jpg",
frame,
[cv2.IMWRITE_JPEG_QUALITY, 75], # Balanced quality/size
)
if not success:
return
# Send frame to Google GenAI
await self.session.send_realtime_input(media=types.Blob(data=jpg_bytes.tobytes(), mime_type="image/jpeg"))
            print(f"📸 Sent frame ({frame.shape[1]}x{frame.shape[0]}, {len(jpg_bytes)} bytes)")
except Exception as e:
print(f"❌ Error processing video frame: {e}")
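    @staticmethod
    def _frame_delta(a: np.ndarray, b: np.ndarray) -> float:
        """Mean absolute per-pixel difference between two frames.

        Optional helper sketch mirroring the inline motion check in
        video_receive() above; assumes both frames share the same shape.
        """
        return float(np.abs(a.astype(np.float32) - b.astype(np.float32)).mean())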
async def emit(self):
"""Provide audio output back to user with queue management"""
try:
audio_chunk = self.audio_queue.get_nowait()
return (24000, audio_chunk)
except asyncio.QueueEmpty:
return None
async def get_latest_text(self):
"""Get latest text response for UI updates"""
try:
text = self.text_queue.get_nowait()
return text
except asyncio.QueueEmpty:
return None
async def shutdown(self):
"""Enhanced cleanup with proper resource management"""
self.connected = False
if self.session:
try:
# Send goodbye message
await self.session.send_realtime_input(text="Session ending. Thank you!")
await asyncio.sleep(0.5) # Brief delay for message to send
await self.session.close()
                print("🔴 Cleanly disconnected from GenAI Live API")
except Exception as e:
print(f"⚠️ Error during shutdown: {e}")
# Cancel all background tasks properly
if hasattr(self, "background_tasks"):
for task in self.background_tasks.copy():
if not task.done():
task.cancel()
# Wait for all tasks to complete or be cancelled
if self.background_tasks:
await asyncio.gather(*self.background_tasks, return_exceptions=True)
self.background_tasks.clear()
# Clear queues
while not self.audio_queue.empty():
try:
self.audio_queue.get_nowait()
except asyncio.QueueEmpty:
break
while not self.text_queue.empty():
try:
self.text_queue.get_nowait()
except asyncio.QueueEmpty:
break
self.session = None
self.conversation_history = []
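# A minimal, hypothetical smoke test for the handler's queue plumbing,
# assuming no live API connection (left commented out so importing this
# module stays side-effect free):
#
#   async def _smoke_test():
#       handler = EnhancedScreenAssistantHandler()
#       await handler.audio_queue.put(np.zeros((1, 480), dtype=np.int16))
#       rate, chunk = await handler.emit()
#       assert rate == 24000 and chunk.shape == (1, 480)
#
#   asyncio.run(_smoke_test())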
# Global state management
app_state = {"stream": None, "handler": None, "connected": False, "screen_sharing": False}
def initialize_stream():
"""Initialize the FastRTC stream with enhanced configuration"""
try:
# Create enhanced handler
handler = EnhancedScreenAssistantHandler()
app_state["handler"] = handler
# Create stream with optimized settings for HF Spaces
        stream = Stream(
            handler=handler,  # the AsyncAudioVideoStreamHandler drives audio and video
                              # directly; ReplyOnPause wraps reply functions, not handlers
modality="audio-video",
mode="send-receive",
rtc_configuration=get_cloudflare_turn_credentials_async,
time_limit=600, # 10 minute session limit
ui_args={
"audio_controls": True,
"video_controls": True,
},
)
app_state["stream"] = stream
return stream
except Exception as e:
print(f"❌ Error initializing stream: {e}")
return None
def handle_connect():
"""Enhanced connection handler"""
if not API_KEY:
return "❌ Please set GEMINI_API_KEY environment variable"
if app_state["connected"]:
        return "✅ Already connected - session is active"
if app_state["handler"]:
app_state["connected"] = True
        return "✅ Connecting to AI... Please allow microphone and camera permissions"
return "❌ Stream not initialized - please refresh the page"
def handle_screen_share():
"""Handle screen sharing toggle"""
app_state["screen_sharing"] = not app_state["screen_sharing"]
if app_state["screen_sharing"]:
        return "🖥️ Screen sharing started - AI can now see your screen"
else:
        return "📱 Switched back to camera view"
async def handle_disconnect_async():
"""Async enhanced disconnection handler"""
if app_state["handler"] and app_state["connected"]:
try:
await app_state["handler"].shutdown()
app_state["connected"] = False
app_state["screen_sharing"] = False
app_state["handler"] = None
            return "🔴 Disconnected from AI assistant"
except Exception as e:
return f"⚠️ Disconnect error: {e}"
return "Already disconnected"
def handle_disconnect():
"""Sync wrapper for enhanced disconnection handler"""
    # Reuse a single in-flight disconnect task; app_state is a plain dict,
    # so check the key directly (hasattr() never sees dict keys)
    existing = app_state.get("disconnect_task")
    if existing is None or existing.done():
        try:
            # Schedule on this thread's running loop if there is one...
            app_state["disconnect_task"] = asyncio.create_task(handle_disconnect_async())
        except RuntimeError:
            # ...otherwise (typical for sync Gradio callbacks) run the
            # async cleanup to completion here
            asyncio.run(handle_disconnect_async())
app_state["connected"] = False # Immediately mark as disconnected
app_state["screen_sharing"] = False
    return "🔄 Disconnecting... Please wait..."
# Enhanced JavaScript for screen sharing; Gradio's `js` hook expects a
# single anonymous function, so the snippet is one async arrow function
enhanced_screen_share_js = """
async () => {
try {
const videoElements = document.querySelectorAll('video');
const webrtcVideo = Array.from(videoElements).find(video =>
video.srcObject && video.srcObject.getVideoTracks().length > 0
);
if (!webrtcVideo) {
return "❌ Could not find video element";
}
const currentTrack = webrtcVideo.srcObject.getVideoTracks()[0];
const isScreenShare = currentTrack && currentTrack.label.includes('screen');
if (isScreenShare) {
// Switch back to camera
const cameraStream = await navigator.mediaDevices.getUserMedia({
video: { width: 640, height: 480 },
audio: false
});
const videoTrack = cameraStream.getVideoTracks()[0];
webrtcVideo.srcObject.removeTrack(currentTrack);
webrtcVideo.srcObject.addTrack(videoTrack);
currentTrack.stop();
            return "📱 Switched to camera view";
} else {
// Switch to screen share
const screenStream = await navigator.mediaDevices.getDisplayMedia({
video: {
mediaSource: 'screen',
width: { ideal: 1280, max: 1920 },
height: { ideal: 720, max: 1080 },
frameRate: { ideal: 2, max: 5 } // Low frame rate for efficiency
},
audio: false
});
const videoTrack = screenStream.getVideoTracks()[0];
webrtcVideo.srcObject.removeTrack(currentTrack);
webrtcVideo.srcObject.addTrack(videoTrack);
// Handle when screen sharing ends
videoTrack.onended = () => {
console.log('Screen sharing ended by user');
// Automatically switch back to camera
navigator.mediaDevices.getUserMedia({video: true, audio: false})
.then(cameraStream => {
const cameraTrack = cameraStream.getVideoTracks()[0];
webrtcVideo.srcObject.addTrack(cameraTrack);
});
};
currentTrack.stop();
            return "🖥️ Screen sharing active";
}
} catch (error) {
console.error('Screen sharing error:', error);
if (error.name === 'NotAllowedError') {
return "❌ Screen sharing permission denied";
} else if (error.name === 'NotFoundError') {
return "❌ No screen available to share";
} else {
return `❌ Error: ${error.message}`;
}
}
}
"""
def create_main_interface():
"""Create the enhanced main interface"""
# Initialize stream
stream = initialize_stream()
with gr.Blocks(
title="Enhanced Real-Time Screen Assistant",
theme=gr.themes.Soft(),
css="""
.status-connected { background: linear-gradient(90deg, #4CAF50, #45a049); color: white; }
.status-disconnected { background: linear-gradient(90deg, #f44336, #da190b); color: white; }
.status-warning { background: linear-gradient(90deg, #ff9800, #f57c00); color: white; }
.control-row { margin: 10px 0; }
.stream-container { border: 2px solid #ddd; border-radius: 10px; padding: 20px; margin: 20px 0; }
""",
) as demo:
        gr.Markdown("# 🖥️ Enhanced Real-Time Screen Assistant")
gr.Markdown("""
**Advanced AI assistant with live screen sharing, voice interaction, and real-time guidance**
Powered by Google's Gemini Live API and FastRTC for ultra-low latency communication.
""")
# Status display
status_display = gr.Textbox(
            label="🔍 Status",
value="Ready to connect - Click Connect to start your AI session",
interactive=False,
elem_classes=["status-disconnected"],
)
# Control buttons
with gr.Row(elem_classes=["control-row"]):
            connect_btn = gr.Button("🔗 Connect to AI", variant="primary", size="lg")
            screen_btn = gr.Button("🖥️ Toggle Screen Share", variant="secondary", size="lg")
            disconnect_btn = gr.Button("🔴 Disconnect", variant="stop", size="lg")
# Stream container
        if stream and stream.ui:
            with gr.Group(elem_classes=["stream-container"]):
                gr.Markdown("### 📡 Live Stream")
                stream.ui.render()  # embed the FastRTC-generated Blocks UI in place
        else:
            gr.HTML("<div>⚠️ Stream initialization failed - check console for errors</div>")
# Usage instructions
        with gr.Accordion("📋 How to Use This Assistant", open=True):
gr.Markdown("""
**Getting Started:**
1. **Connect**: Click "Connect to AI" to establish the AI session
2. **Permissions**: Allow microphone and camera access in your browser
3. **Screen Share**: Click "Toggle Screen Share" to let the AI see your screen
4. **Interact**: Simply speak naturally - the AI will respond with voice and can see your screen
**What the AI can help with:**
            - 🖥️ **Software tutorials**: "Show me how to use this feature"
            - 🔧 **Troubleshooting**: "Why isn't this working?"
            - 📊 **Data analysis**: "Help me understand this chart"
            - 🎨 **Design feedback**: "How can I improve this layout?"
            - 📝 **Writing assistance**: "Help me edit this document"
- 🌐 **Web navigation**: "Guide me through this website"
**Voice Commands:**
- "What am I looking at?"
- "What should I do next?"
- "Explain this to me"
- "Help me fix this error"
- "Is this the right approach?"
""")
# Advanced features
        with gr.Accordion("⚙️ Advanced Features", open=False):
gr.Markdown("""
**Technical Capabilities:**
            - 🎙️ **Voice Activity Detection**: AI responds when you finish speaking
            - 📸 **Intelligent Frame Sampling**: Optimized screen capture (1-2 FPS)
            - 🧠 **Context Awareness**: AI remembers your conversation history
            - 🔄 **Adaptive Quality**: Automatically adjusts based on connection
            - ⚡ **Ultra-Low Latency**: Typical response time under 500ms
**Privacy & Security:**
            - 🔒 All data encrypted in transit (WebRTC + TLS)
            - 🏠 Processing by Google's secure AI infrastructure
            - 🚫 No permanent storage of your screen or voice data
            - 👤 Each session is completely isolated and private
**Optimization for Hugging Face Spaces:**
- ☁️ Cloudflare TURN servers for reliable connectivity
            - 🔧 Automatic resource management and cleanup
            - ⏱️ Session timeout protection (10-minute limit)""")
# Wire up the interface
connect_btn.click(fn=handle_connect, outputs=[status_display])
        screen_btn.click(fn=handle_screen_share, outputs=[status_display], js=enhanced_screen_share_js)
disconnect_btn.click(fn=handle_disconnect, outputs=[status_display])
return demo
# Main execution
if __name__ == "__main__":
    print("🖥️ Enhanced Real-Time Screen Assistant")
print("=" * 55)
if not API_KEY:
print("⚠️ CRITICAL: No GEMINI_API_KEY environment variable found!")
print("Please set your Google AI API key:")
print("export GEMINI_API_KEY='your-api-key-here'")
print("\nGet your API key at: https://makersuite.google.com/app/apikey")
else:
        print(f"✅ API key configured (length: {len(API_KEY)})")
    print("\n🔧 Initializing enhanced components...")
print("- FastRTC with voice activity detection")
print("- Google GenAI Live API integration")
print("- Cloudflare TURN server configuration")
print("- Enhanced screen sharing capabilities")
try:
demo = create_main_interface()
        print("\n🚀 Launching enhanced interface...")
        # Gradio 4 removed launch(enable_queue=...); enable queuing explicitly
        demo.queue().launch(server_name="0.0.0.0", server_port=7860, share=False, show_error=True)
except Exception as e:
print(f"\n❌ Failed to launch: {e}")
print("Check that all dependencies are installed:")
print("pip install -r requirements.txt")
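# The imports above imply a requirements.txt along these lines (the usual
# PyPI distribution names; versions are an illustrative assumption, not
# pinned by this file):
#   fastrtc
#   google-genai
#   gradio
#   opencv-python
#   numpy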