"""Gradio client that monitors video sources for fire and smoke.

Each source is read on its own daemon thread. Sampled frames are sent to an
MCP server for detection; if that call fails, a simple colour heuristic is
used instead. Alerts are printed to the console.
"""
import base64
import io
import json
import threading
import time
from datetime import datetime

import cv2
import gradio as gr
import requests

# One frame out of every FRAME_ANALYSIS_INTERVAL frames read from a source is
# analyzed. Shared by the monitoring loop and the on-screen instructions so
# the two cannot drift apart.
FRAME_ANALYSIS_INTERVAL = 200


class FireDetectionClient:
    """Start, run and stop per-source fire/smoke monitoring threads."""

    def __init__(self):
        self.video_sources = {}
        # source_id -> threading.Thread of the most recently started monitor.
        self.detection_threads = {}
        # source_id -> bool; a monitor loop keeps going while its flag is True.
        self.running = {}
        self.mcp_server_url = "http://localhost:7860"

    def detect_fire_mcp(self, frame):
        """Send a frame to the MCP server for fire detection.

        The frame is converted from BGR to RGB, resized to 320x240, JPEG
        encoded at quality 70 and posted as base64 JSON to
        ``{mcp_server_url}/detect_fire``.

        Args:
            frame: Image array as produced by ``cv2.VideoCapture.read``.

        Returns:
            The decoded JSON response on HTTP 200, otherwise a dict with an
            ``"error"`` key. Never raises: any failure (including a missing
            Pillow install) is reported through the ``"error"`` key.
        """
        try:
            # Imported lazily so that a missing Pillow install is reported as
            # an error dict and the caller can fall back to colour detection.
            from PIL import Image

            image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            # Smaller size and lower JPEG quality keep the request fast.
            image = image.resize((320, 240))

            buffer = io.BytesIO()
            image.save(buffer, format="JPEG", quality=70)
            img_str = base64.b64encode(buffer.getvalue()).decode()

            response = requests.post(
                f"{self.mcp_server_url}/detect_fire",
                json={"image": img_str},
                timeout=30,
            )
            if response.status_code == 200:
                return response.json()
            return {"error": "MCP server error"}
        except Exception as e:
            return {"error": str(e)}

    def monitor_video_source(self, source_id, video_source):
        """Monitor a video source for fire/smoke detection (blocking).

        Marks ``source_id`` as running and reads frames until the stream ends
        or the running flag is cleared by ``stop_monitoring``.

        Args:
            source_id: Label used in console messages and as the key in
                ``self.running``.
            video_source: Anything accepted by ``cv2.VideoCapture`` (camera
                index, file path or stream URL).

        Returns:
            An error string if the source could not be opened, else ``None``.
        """
        self.running[source_id] = True
        return self._monitor_loop(source_id, video_source)

    def _monitor_loop(self, source_id, video_source):
        """Read frames while ``self.running[source_id]`` is True.

        The caller must set the running flag beforehand. That lets a stop
        request issued while the capture is still opening take effect.
        """
        cap = cv2.VideoCapture(video_source)
        try:
            if not cap.isOpened():
                message = f"Error: Could not open video source {source_id}"
                # The return value is discarded when run as a thread target,
                # so surface the failure on the console as well.
                print(message)
                return message

            frame_count = 0
            while self.running.get(source_id, False):
                ret, frame = cap.read()
                if not ret:
                    break

                frame_count += 1
                if frame_count % FRAME_ANALYSIS_INTERVAL == 0:
                    self._analyze_frame(source_id, frame, frame_count)

                time.sleep(0.03)  # Brief pause between reads to reduce load
        finally:
            # Always free the capture and clear the flag, even if analysis
            # raised or the stream simply ended.
            cap.release()
            self.running[source_id] = False

        print(f"Stopped monitoring source {source_id}")
        return None

    def _analyze_frame(self, source_id, frame, frame_count):
        """Run detection on one frame and print an alert if anything is found."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        print(f"[{timestamp}] Source {source_id}: Analyzing frame {frame_count}")

        try:
            result = self.detect_fire_mcp(frame)
            if "error" in result:
                print(f"MCP Error: {result['error']}")
                fire_detected, smoke_detected = self.simple_fire_detection(frame)
            else:
                fire_detected = result.get("fire_detected", False)
                smoke_detected = result.get("smoke_detected", False)
        except Exception as e:
            # Reached e.g. when the server replies with JSON that is not a dict.
            print(f"Connection error: {e}")
            fire_detected, smoke_detected = self.simple_fire_detection(frame)

        if fire_detected or smoke_detected:
            alert = f"🚨 ALERT - Source {source_id} at {timestamp}:\n"
            if fire_detected:
                alert += "🔥 FIRE DETECTED!\n"
            if smoke_detected:
                alert += "💨 SMOKE DETECTED!\n"
            alert += f"Frame: {frame_count}"
            print(alert)
            # Here you could send notifications, save alerts, etc.

    def simple_fire_detection(self, frame):
        """Simple color-based fire/smoke detection used as a fallback.

        Args:
            frame: BGR image array.

        Returns:
            ``(fire_detected, smoke_detected)``. Fire is flagged when more
            than 1000 pixels fall in the red hue bands (H 0-10 or 170-180,
            S and V >= 50); smoke when more than 5000 pixels have a gray
            level between 100 and 200.
        """
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

        # Red wraps around the hue axis in HSV, hence two ranges.
        fire_lower1 = (0, 50, 50)
        fire_upper1 = (10, 255, 255)
        fire_lower2 = (170, 50, 50)
        fire_upper2 = (180, 255, 255)

        mask1 = cv2.inRange(hsv, fire_lower1, fire_upper1)
        mask2 = cv2.inRange(hsv, fire_lower2, fire_upper2)
        fire_mask = cv2.bitwise_or(mask1, mask2)

        # Smoke is approximated as mid-gray pixels.
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        smoke_mask = cv2.inRange(gray, 100, 200)

        # NOTE(review): thresholds are absolute pixel counts, so sensitivity
        # varies with frame resolution.
        fire_area = cv2.countNonZero(fire_mask)
        smoke_area = cv2.countNonZero(smoke_mask)

        fire_detected = fire_area > 1000
        smoke_detected = smoke_area > 5000

        return fire_detected, smoke_detected

    def start_monitoring(self, sources):
        """Start a monitoring thread for every non-empty source.

        Args:
            sources: Sequence of source strings. An all-digit entry is
                treated as a camera index, anything else as a path or URL.
                Blank or ``None`` entries are skipped. Position ``i`` maps to
                the id ``Source_{i+1}``.

        Returns:
            A newline-separated status report, or
            ``"No valid sources provided"`` when nothing was started.
        """
        results = []
        for i, raw_source in enumerate(sources):
            source = "" if raw_source is None else str(raw_source).strip()
            if not source:
                continue

            source_id = f"Source_{i+1}"

            # Avoid two threads reading the same slot when Start is pressed
            # again while a previous monitor is still alive.
            existing = self.detection_threads.get(source_id)
            if existing is not None and existing.is_alive():
                results.append(
                    f"⚠️ {source_id} is already being monitored; stop it first"
                )
                continue

            # isdecimal() only accepts characters that int() can parse.
            video_source = int(source) if source.isdecimal() else source

            # Set the flag before the thread starts so a stop request issued
            # while the capture is still opening is honoured.
            self.running[source_id] = True
            thread = threading.Thread(
                target=self._monitor_loop,
                args=(source_id, video_source),
                daemon=True,
            )
            self.detection_threads[source_id] = thread
            thread.start()

            results.append(f"✅ Started monitoring {source_id}: {source}")

        return "\n".join(results) if results else "No valid sources provided"

    def stop_monitoring(self):
        """Ask every monitoring thread to stop and return a status string."""
        # Iterate over a snapshot: other threads may add keys concurrently.
        for source_id in list(self.running):
            self.running[source_id] = False
        return "🛑 Stopped all monitoring"


# Initialize client
client = FireDetectionClient()


def create_interface():
    """Create the Gradio interface for the fire detection client.

    Returns:
        The ``gr.Blocks`` app with four source textboxes, start/stop buttons
        and a status box wired to the module-level ``client``.
    """
    instructions = "\n".join(
        [
            "- **Webcam**: Enter `0` for default webcam, `1` for second camera",
            "- **Video File**: Enter full path like `C:/videos/fire.mp4`",
            "- **RTSP Stream**: Enter URL like `rtsp://localhost:8554/stream`",
            f"- **Detection**: Analyzes one of every {FRAME_ANALYSIS_INTERVAL} "
            "frames for real-time performance",
            "- **Alerts**: Check console output for fire/smoke detection alerts",
        ]
    )

    with gr.Blocks(title="Fire Detection Client") as interface:
        gr.Markdown("# 🔥 Fire Detection Client")
        gr.Markdown("Monitor up to 4 video sources for fire and smoke detection")

        with gr.Row():
            with gr.Column():
                gr.Markdown("### Video Sources")
                source1 = gr.Textbox(
                    label="Source 1 (webcam: 0, file path, or RTSP URL)",
                    placeholder="0",
                )
                source2 = gr.Textbox(
                    label="Source 2",
                    placeholder="rtsp://localhost:8554/stream",
                )
                source3 = gr.Textbox(
                    label="Source 3",
                    placeholder="C:/path/to/video.mp4",
                )
                source4 = gr.Textbox(label="Source 4", placeholder="")

                with gr.Row():
                    start_btn = gr.Button("🚀 Start Monitoring", variant="primary")
                    stop_btn = gr.Button("🛑 Stop Monitoring", variant="secondary")

            with gr.Column():
                gr.Markdown("### Status")
                status_output = gr.Textbox(
                    label="Monitoring Status",
                    lines=10,
                    interactive=False,
                )

        gr.Markdown("### Instructions")
        gr.Markdown(instructions)

        # Event handlers
        start_btn.click(
            fn=lambda s1, s2, s3, s4: client.start_monitoring([s1, s2, s3, s4]),
            inputs=[source1, source2, source3, source4],
            outputs=status_output,
        )
        stop_btn.click(
            fn=client.stop_monitoring,
            outputs=status_output,
        )

    return interface


if __name__ == "__main__":
    interface = create_interface()
    interface.launch(server_port=7861, share=False)