# Wild Fire Tracker - Fire Detection MCP Server
# Copyright (c) 2024 Wild Fire Tracker
# Licensed under MIT License - see LICENSE file for details

import gradio as gr
import cv2
import numpy as np
import threading
import time
from datetime import datetime
from PIL import Image
from transformers import BlipProcessor, BlipForQuestionAnswering
import torch

# Load BLIP VQA model (Salesforce/blip-vqa-base)
print("Loading BLIP VQA model...")
device = "cuda" if torch.cuda.is_available() else "cpu"
# float16 halves memory on GPU but is not reliably supported on CPU
dtype = torch.float16 if device == "cuda" else torch.float32
vqa_processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base")
vqa_model = BlipForQuestionAnswering.from_pretrained(
    "Salesforce/blip-vqa-base", torch_dtype=dtype
).to(device)
print(f"Model loaded on {device}")


class FireDetectionMCP:
    def __init__(self):
        self.running = False
        self.current_frame = None
        self.status = "No video source"
        self.status_color = "#808080"  # Gray
        self.last_analysis_time = 0
        self.frame_count = 0
        self.last_detection_time = None
        self.display_status = "No video source"  # For video overlay (no emojis)

    def analyze_frame(self, frame):
        """Analyze a frame for fire/smoke via visual question answering."""
        try:
            # Convert BGR (OpenCV) to RGB and downscale for the model
            image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            image = image.resize((224, 224))

            # Ask two separate questions for better detection
            fire_question = "Is there fire or flames in this image?"
            smoke_question = "Is there smoke in this image?"

            # Check for fire with confidence (inputs cast to the model's dtype)
            fire_inputs = vqa_processor(image, fire_question, return_tensors="pt").to(device, dtype)
            with torch.no_grad():
                fire_outputs = vqa_model.generate(
                    **fire_inputs,
                    max_length=10,
                    return_dict_in_generate=True,
                    output_scores=True,
                )
            fire_answer = vqa_processor.decode(fire_outputs.sequences[0], skip_special_tokens=True).lower()
            fire_confidence = torch.softmax(fire_outputs.scores[0][0], dim=0).max().item() * 100

            # Check for smoke with confidence
            smoke_inputs = vqa_processor(image, smoke_question, return_tensors="pt").to(device, dtype)
            with torch.no_grad():
                smoke_outputs = vqa_model.generate(
                    **smoke_inputs,
                    max_length=10,
                    return_dict_in_generate=True,
                    output_scores=True,
                )
            smoke_answer = vqa_processor.decode(smoke_outputs.sequences[0], skip_special_tokens=True).lower()
            smoke_confidence = torch.softmax(smoke_outputs.scores[0][0], dim=0).max().item() * 100

            # Determine result
            has_fire = 'yes' in fire_answer or 'fire' in fire_answer or 'flame' in fire_answer
            has_smoke = 'yes' in smoke_answer or 'smoke' in smoke_answer

            if has_fire and has_smoke:
                status = f"FIRE & SMOKE DETECTED (F:{fire_confidence:.0f}% S:{smoke_confidence:.0f}%)"
                return f"🔥💨 {status}", status, "#FF0000"  # Red
            elif has_fire:
                status = f"FIRE DETECTED ({fire_confidence:.0f}%)"
                return f"🔥 {status}", status, "#FF4500"  # Orange
            elif has_smoke:
                status = f"SMOKE DETECTED ({smoke_confidence:.0f}%)"
                return f"💨 {status}", status, "#696969"  # Gray
            else:
                status = f"ALL CLEAR (F:{fire_confidence:.0f}% S:{smoke_confidence:.0f}%)"
                return f"✅ {status}", status, "#32CD32"  # Green
        except Exception as e:
            # Callers unpack three values, so the error path must match
            return f"❌ ERROR: {e}", f"ERROR: {e}", "#FF0000"  # Red

    def monitor_video(self, video_source):
        """Monitor a video source, analyzing one frame every 10 seconds."""
        # A purely numeric source (e.g. "0") selects a local camera index
        if isinstance(video_source, str) and video_source.isdigit():
            cap = cv2.VideoCapture(int(video_source))
        else:
            cap = cv2.VideoCapture(video_source)
        if not cap.isOpened():
            self.status = "❌ Cannot open video source"
            self.status_color = "#FF0000"
            return

        # Check if MP4 so we can loop playback
        is_mp4 = isinstance(video_source, str) and video_source.lower().endswith('.mp4')

        self.running = True
        self.frame_count = 0

        while self.running:
            ret, frame = cap.read()

            # Loop MP4 files
            if not ret and is_mp4:
                cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                ret, frame = cap.read()
                self.frame_count = 0

            if not ret:
                break

            self.frame_count += 1
            current_time = time.time()

            # Resize for display
            display_frame = cv2.resize(frame, (640, 480))

            # Analyze every 10 seconds (only if still running)
            if self.running and current_time - self.last_analysis_time >= 10.0:
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Analyzing frame {self.frame_count}...")
                self.status, self.display_status, self.status_color = self.analyze_frame(frame)
                self.last_analysis_time = current_time
                self.last_detection_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Result: {self.status}")

            # Draw a black banner for the status overlay
            cv2.rectangle(display_frame, (0, 0), (640, 80), (0, 0, 0), -1)

            # Convert hex color to BGR for OpenCV
            if self.status_color == "#32CD32":  # Green
                color = (50, 205, 50)
            elif self.status_color == "#FF4500":  # Orange
                color = (0, 69, 255)
            elif self.status_color == "#696969":  # Gray
                color = (105, 105, 105)
            else:  # Red
                color = (0, 0, 255)

            # Draw the emoji-free status (cv2.putText cannot render emojis)
            cv2.putText(display_frame, self.display_status, (10, 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

            # Add full timestamp
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            cv2.putText(display_frame, f"Time: {timestamp}", (10, 460),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

            # Store frame for the Gradio display (RGB)
            self.current_frame = cv2.cvtColor(display_frame, cv2.COLOR_BGR2RGB)

            time.sleep(0.04)  # ~25 FPS

        cap.release()
        self.status = "Monitoring stopped"
        self.status_color = "#808080"

    def start_monitoring(self, video_source):
        """Start monitoring in a background thread."""
        if self.running:
            return "Already monitoring"
        if not video_source or (isinstance(video_source, str) and not video_source.strip()):
            return "Please provide a video source"

        thread = threading.Thread(target=self.monitor_video, args=(video_source,), daemon=True)
        thread.start()
        return f"✅ Started monitoring: {video_source}"

    def stop_monitoring(self):
        """Stop monitoring."""
        self.running = False
        self.current_frame = None
        self.status = "🛑 Monitoring stopped"
        self.display_status = "Monitoring stopped"
        self.status_color = "#808080"
        return "🛑 Monitoring stopped"

    def get_frame(self):
        """Get the current annotated frame, or a placeholder."""
        if self.current_frame is not None:
            return self.current_frame
        placeholder = np.zeros((480, 640, 3), dtype=np.uint8)
        cv2.putText(placeholder, "Waiting for video stream...", (150, 240),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)
        return placeholder

    def get_status(self):
        """Get the current status string."""
        if self.last_detection_time:
            return f"{self.status} (Last check: {self.last_detection_time})"
        return self.status


# Initialize MCP server state
mcp_server = FireDetectionMCP()


def create_interface():
    """Create the Gradio interface."""
    with gr.Blocks(title="🔥 Fire Detection MCP Server", theme=gr.themes.Soft()) as interface:
        gr.Markdown("# 🔥 Fire Detection MCP Server")
        gr.Markdown("Real-time fire and smoke detection from video streams (analyzes every 10 seconds)")
        gr.Markdown(
            "⚠️ **Usage**: Upload your own video file or use live sources (webcam/RTSP). "
            "It may take a few seconds to load the stream and show analysis. "
            "Webcam may not work on HF Spaces."
        )
        gr.Markdown(
            "🔗 **Sample Videos**: "
            "[Fire Test Video](https://www.pexels.com/video/a-man-carrying-gear-walking-away-from-a-controlled-fire-8552246/) | "
            "[Smoke Test Video](https://www.pexels.com/video/aerial-view-of-controlled-forest-fire-in-spring-31361444/)"
        )

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### Video Source Options")

                with gr.Tabs():
                    with gr.Tab("📁 Upload Video"):
                        video_upload = gr.File(
                            label="Upload MP4 Video",
                            file_types=[".mp4", ".avi", ".mov"],
                            type="filepath",
                        )
                        upload_btn = gr.Button("🚀 Start Monitoring", variant="primary")

                    with gr.Tab("📹 Live Sources"):
                        video_input = gr.Textbox(
                            label="Video Source",
                            placeholder="0 (webcam), rtsp://url, or path/to/video.mp4",
                            value="0",
                        )
                        live_btn = gr.Button("🚀 Start Monitoring", variant="primary")

                stop_btn = gr.Button("🛑 Stop Monitoring", variant="secondary")
                control_output = gr.Textbox(label="Control Status", interactive=False)

                gr.Markdown("### Detection Status")
                status_display = gr.Textbox(label="Current Status", interactive=False)

                gr.Markdown("### Status Legend")
                gr.Markdown("🟢 Clear | 🟠 Fire | ⚫ Smoke | 🔴 Fire & Smoke / Error")

            with gr.Column(scale=2):
                gr.Markdown("### Live Video Stream")
                video_display = gr.Image(
                    label="Video Feed",
                    height=480,
                    width=640,
                    interactive=False,
                )

        # Pull the latest frame and status for the auto-refresh timer
        def update_display():
            return mcp_server.get_frame(), mcp_server.get_status()

        # Event handlers
        def start_from_upload(video_file):
            mcp_server.stop_monitoring()  # Stop any current stream
            if video_file is None:
                return "❌ Please upload a video file first"
            return mcp_server.start_monitoring(video_file)

        def start_live_source(video_source):
            mcp_server.stop_monitoring()  # Stop any current stream
            return mcp_server.start_monitoring(video_source)

        upload_btn.click(fn=start_from_upload, inputs=video_upload, outputs=control_output)
        live_btn.click(fn=start_live_source, inputs=video_input, outputs=control_output)
        stop_btn.click(fn=mcp_server.stop_monitoring, outputs=control_output)

        # Auto-refresh the video feed and status every 0.5 seconds
        timer = gr.Timer(0.5)
        timer.tick(fn=update_display, outputs=[video_display, status_display])

    return interface


if __name__ == "__main__":
    interface = create_interface()
    interface.launch(mcp_server=True, server_port=7860, share=False)
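

# --- Illustrative MCP client sketch -------------------------------------------
# A minimal, hedged example of how an external MCP client could discover this
# app's tools once it is running with launch(mcp_server=True). Assumptions: the
# `mcp` Python SDK is installed, and Gradio serves its MCP endpoint over SSE at
# /gradio_api/mcp/sse (adjust the URL for your deployment). This function is a
# reference sketch only; nothing in this script calls it.
async def example_list_mcp_tools(url="http://localhost:7860/gradio_api/mcp/sse"):
    # Local imports so the server runs even without the `mcp` package installed
    from mcp import ClientSession
    from mcp.client.sse import sse_client

    async with sse_client(url) as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            tools = await session.list_tools()
            for tool in tools.tools:
                print(f"{tool.name}: {tool.description}")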