Spaces:
Sleeping
Sleeping
import os | |
import cv2 | |
import base64 | |
import uuid | |
import smtplib | |
import ssl | |
import json | |
import logging | |
import requests | |
from email.mime.multipart import MIMEMultipart | |
from email.mime.text import MIMEText | |
from email.mime.base import MIMEBase | |
from email import encoders | |
from datetime import datetime | |
import numpy as np | |
# Set DeepFace home directory before importing DeepFace | |
os.environ['DEEPFACE_HOME'] = '/tmp/.deepface' | |
from fastapi import FastAPI, HTTPException, File, Form, UploadFile | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.responses import JSONResponse | |
from pymongo import MongoClient | |
from bson import ObjectId | |
from dotenv import load_dotenv | |
from deepface import DeepFace | |
from ultralytics import YOLO | |
from typing import Optional, List, Dict, Any | |
# Load environment variables | |
load_dotenv() | |
# Configure logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
# Email configuration | |
EMAIL_SENDER = os.getenv("EMAIL_SENDER", "[email protected]") | |
EMAIL_PASSWORD = os.getenv("EMAIL_PASSWORD", "your_password") | |
SENDGRID_API_KEY = os.getenv("SENDGRID_API_KEY") | |
# Determine email method based on available credentials | |
if SENDGRID_API_KEY and SENDGRID_API_KEY != "your_sendgrid_api_key": | |
EMAIL_CONFIGURED = True | |
EMAIL_METHOD = "sendgrid" | |
logger.info("Email configured using SendGrid") | |
elif EMAIL_SENDER and EMAIL_PASSWORD and EMAIL_SENDER != "[email protected]": | |
EMAIL_CONFIGURED = True | |
EMAIL_METHOD = "smtp" | |
logger.info("Email configured using SMTP") | |
else: | |
EMAIL_CONFIGURED = False | |
EMAIL_METHOD = "none" | |
logger.warning("Email not configured - no valid credentials found") | |
# MongoDB configuration | |
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017") | |
DATABASE_NAME = os.getenv("DATABASE_NAME", "lost_and_found") | |
try: | |
# Connect to MongoDB | |
client = MongoClient(MONGO_URI) | |
db = client[DATABASE_NAME] | |
lost_collection = db['lost_people'] | |
found_collection = db['found_people'] | |
live_feed_collection = db['live_feed'] | |
match_collection = db['match_records'] | |
logger.info("Connected to MongoDB successfully") | |
except Exception as e: | |
logger.error(f"Failed to connect to MongoDB: {e}") | |
raise | |
# FastAPI setup | |
app = FastAPI(title="Lost & Found Person Tracker API", version="1.0.0") | |
# CORS middleware | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], # In production, specify allowed origins | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
# Initialize YOLO face detection model | |
try: | |
face_model = YOLO("yolov11s-face.pt") | |
logger.info("YOLO face detection model loaded successfully") | |
except Exception as e: | |
logger.error(f"Failed to load YOLO model: {e}") | |
face_model = None | |
def crop_face(image: np.ndarray) -> np.ndarray: | |
"""Extract and crop face from image using YOLO face detection.""" | |
if face_model is None: | |
raise HTTPException(status_code=500, detail="Face detection model not available") | |
results = face_model.predict(image, verbose=False) | |
if results and results[0].boxes and results[0].boxes.xyxy.shape[0] > 0: | |
x1, y1, x2, y2 = results[0].boxes.xyxy[0].cpu().numpy().astype(int) | |
cropped_face = image[y1:y2, x1:x2] | |
return cropped_face | |
raise HTTPException(status_code=400, detail="No face detected in the image") | |
def save_metadata(collection, metadata: dict) -> str: | |
"""Save metadata to MongoDB collection and return the inserted ID as string.""" | |
result = collection.insert_one(metadata) | |
return str(result.inserted_id) | |
def convert_objectid_to_str(data: Any) -> Any: | |
"""Recursively convert ObjectId fields to strings in MongoDB documents.""" | |
if isinstance(data, list): | |
return [convert_objectid_to_str(item) for item in data] | |
elif isinstance(data, dict): | |
converted = {} | |
for key, value in data.items(): | |
if isinstance(value, ObjectId): | |
converted[key] = str(value) | |
elif isinstance(value, dict): | |
converted[key] = convert_objectid_to_str(value) | |
elif isinstance(value, list): | |
converted[key] = [convert_objectid_to_str(item) for item in value] | |
else: | |
converted[key] = value | |
return converted | |
elif isinstance(data, ObjectId): | |
return str(data) | |
else: | |
return data | |
def match_face_with_db(known_image: np.ndarray, collection) -> List[Dict]: | |
"""Match a face against faces in a MongoDB collection using DeepFace.""" | |
matches = [] | |
try: | |
for doc in collection.find(): | |
try: | |
face_blob = doc.get("face_blob") | |
if not face_blob: | |
continue | |
# Decode base64 image | |
img_bytes = base64.b64decode(face_blob) | |
np_arr = np.frombuffer(img_bytes, np.uint8) | |
compare_image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR) | |
if compare_image is None: | |
continue | |
# Perform face verification | |
result = DeepFace.verify( | |
known_image, | |
compare_image, | |
model_name="ArcFace", | |
enforce_detection=False | |
) | |
if result["verified"]: | |
# Convert ObjectIds to strings before adding to matches | |
match_doc = convert_objectid_to_str(doc) | |
matches.append(match_doc) | |
except Exception as e: | |
logger.error(f"Error matching individual face: {str(e)}") | |
continue | |
except Exception as e: | |
logger.error(f"Error in face matching process: {str(e)}") | |
return matches | |
def send_email_via_sendgrid(to_email: str, subject: str, html_content: str, | |
attachment_data: Optional[str] = None, | |
attachment_name: str = "attachment.jpg") -> bool: | |
"""Send email using SendGrid API via direct HTTP requests.""" | |
try: | |
url = "https://api.sendgrid.com/v3/mail/send" | |
headers = { | |
"Authorization": f"Bearer {SENDGRID_API_KEY}", | |
"Content-Type": "application/json" | |
} | |
# Basic email payload | |
email_payload = { | |
"personalizations": [ | |
{ | |
"to": [{"email": to_email}], | |
"subject": subject | |
} | |
], | |
"from": {"email": EMAIL_SENDER}, | |
"content": [ | |
{ | |
"type": "text/html", | |
"value": html_content | |
} | |
] | |
} | |
# Add attachment if provided | |
if attachment_data: | |
try: | |
email_payload["attachments"] = [ | |
{ | |
"content": attachment_data, | |
"type": "image/jpeg", | |
"filename": attachment_name, | |
"disposition": "attachment" | |
} | |
] | |
except Exception as e: | |
logger.error(f"Error adding attachment to SendGrid email: {e}") | |
# Send the email | |
response = requests.post(url, headers=headers, json=email_payload) | |
if response.status_code in [200, 201, 202]: | |
logger.info(f"Email sent successfully via SendGrid to {to_email}") | |
return True | |
else: | |
logger.error(f"SendGrid API returned status code: {response.status_code}, Response: {response.text}") | |
return False | |
except Exception as e: | |
logger.error(f"Error sending email via SendGrid API: {e}") | |
return False | |
def send_email_via_smtp(to_email: str, subject: str, html_content: str, | |
attachment_data: Optional[str] = None, | |
attachment_name: str = "attachment.jpg") -> bool: | |
"""Send email using SMTP (Gmail).""" | |
try: | |
message = MIMEMultipart() | |
message["From"] = EMAIL_SENDER | |
message["To"] = to_email | |
message["Subject"] = subject | |
message.attach(MIMEText(html_content, "html")) | |
# Add attachment if provided | |
if attachment_data: | |
try: | |
part = MIMEBase("application", "octet-stream") | |
part.set_payload(base64.b64decode(attachment_data)) | |
encoders.encode_base64(part) | |
part.add_header( | |
"Content-Disposition", | |
f"attachment; filename={attachment_name}" | |
) | |
message.attach(part) | |
except Exception as e: | |
logger.error(f"Error adding attachment to email: {e}") | |
context = ssl.create_default_context() | |
with smtplib.SMTP_SSL("smtp.gmail.com", 465, context=context) as server: | |
server.login(EMAIL_SENDER, EMAIL_PASSWORD) | |
server.sendmail(EMAIL_SENDER, to_email, message.as_string()) | |
logger.info(f"Email sent successfully via SMTP to {to_email}") | |
return True | |
except Exception as e: | |
logger.error(f"Error sending email via SMTP: {e}") | |
return False | |
def send_email_alert(subject: str, body: str, to_email: str, | |
image_blob: Optional[str] = None) -> Dict[str, Any]: | |
"""Send email alert using configured method.""" | |
if not EMAIL_CONFIGURED: | |
logger.warning("Email not configured - cannot send alert") | |
return {"success": False, "method": "none", "error": "Email not configured"} | |
if not to_email: | |
logger.error("No recipient email provided") | |
return {"success": False, "method": EMAIL_METHOD, "error": "No recipient email"} | |
success = False | |
if EMAIL_METHOD == "sendgrid": | |
success = send_email_via_sendgrid(to_email, subject, body, image_blob) | |
elif EMAIL_METHOD == "smtp": | |
success = send_email_via_smtp(to_email, subject, body, image_blob) | |
return { | |
"success": success, | |
"method": EMAIL_METHOD, | |
"to": to_email, | |
"subject": subject | |
} | |
def send_match_emails(lost_person: Dict, found_person: Dict) -> Dict[str, Any]: | |
"""Send email notifications for matched persons.""" | |
results = { | |
"lost_email_sent": False, | |
"found_email_sent": False, | |
"errors": [] | |
} | |
time_found = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
# Get email addresses | |
lost_email = None | |
found_email = None | |
if isinstance(lost_person.get("contact_details"), dict): | |
lost_email = lost_person["contact_details"].get("email_id") | |
else: | |
lost_email = lost_person.get("email_id") | |
if isinstance(found_person.get("contact_details"), dict): | |
found_email = found_person["contact_details"].get("email_id") | |
else: | |
found_email = found_person.get("email_id") | |
# Email to person who reported lost person | |
if lost_email: | |
lost_subject = "🎉 GREAT NEWS! Your Lost Person Has Been Found!" | |
lost_body = f""" | |
<div style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto;"> | |
<h2 style="color: #2e7d32;">🎉 Wonderful News!</h2> | |
<p>We have found a match for <strong>{lost_person.get('name', 'the person')}</strong> you reported missing!</p> | |
<div style="background: #f5f5f5; padding: 15px; border-radius: 8px; margin: 20px 0;"> | |
<h3 style="color: #1976d2;">📍 Found Person Details:</h3> | |
<ul> | |
<li><strong>Location Found:</strong> {found_person.get('location_found', found_person.get('where_found', 'Not specified'))}</li> | |
<li><strong>Found By:</strong> {found_person.get('reported_by', {}).get('name', found_person.get('your_name', 'Unknown'))}</li> | |
<li><strong>Organization:</strong> {found_person.get('reported_by', {}).get('organization', found_person.get('organization', 'Not specified'))}</li> | |
<li><strong>Time Found:</strong> {time_found}</li> | |
</ul> | |
</div> | |
<div style="background: #e3f2fd; padding: 15px; border-radius: 8px; margin: 20px 0;"> | |
<h3 style="color: #1976d2;">📞 Contact Information:</h3> | |
<p><strong>Mobile:</strong> {found_person.get('contact_details', {}).get('mobile_no', found_person.get('mobile_no', 'Not provided'))}</p> | |
<p><strong>Email:</strong> {found_email or 'Not provided'}</p> | |
</div> | |
<p style="color: #666;">Please contact them immediately to coordinate the reunion.</p> | |
<p style="font-size: 12px; color: #999;">This is an automated message from the Lost & Found Person Tracker system.</p> | |
</div> | |
""" | |
result = send_email_alert(lost_subject, lost_body, lost_email, found_person.get("face_blob")) | |
results["lost_email_sent"] = result["success"] | |
if not result["success"]: | |
results["errors"].append( | |
f"Failed to send email to lost person reporter: {result.get('error', 'Unknown error')}") | |
# Email to person who found the person | |
if found_email: | |
found_subject = "🔍 Match Confirmed - Lost Person Identified!" | |
found_body = f""" | |
<div style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto;"> | |
<h2 style="color: #d32f2f;">🔍 Match Confirmed!</h2> | |
<p>The person you found has been identified as <strong>{lost_person.get('name', 'a missing person')}</strong>!</p> | |
<div style="background: #f5f5f5; padding: 15px; border-radius: 8px; margin: 20px 0;"> | |
<h3 style="color: #1976d2;">👤 Lost Person Details:</h3> | |
<ul> | |
<li><strong>Name:</strong> {lost_person.get('name', 'Not specified')}</li> | |
<li><strong>Age:</strong> {lost_person.get('age', 'Not specified')}</li> | |
<li><strong>Where Lost:</strong> {lost_person.get('where_lost', 'Not specified')}</li> | |
<li><strong>Reported By:</strong> {lost_person.get('reporter_name', lost_person.get('your_name', 'Unknown'))}</li> | |
<li><strong>Relation:</strong> {lost_person.get('relation_with_lost', 'Not specified')}</li> | |
</ul> | |
</div> | |
<div style="background: #e8f5e8; padding: 15px; border-radius: 8px; margin: 20px 0;"> | |
<h3 style="color: #2e7d32;">📞 Family Contact Information:</h3> | |
<p><strong>Mobile:</strong> {lost_person.get('contact_details', {}).get('mobile_no', lost_person.get('mobile_no', 'Not provided'))}</p> | |
<p><strong>Email:</strong> {lost_email or 'Not provided'}</p> | |
</div> | |
<p style="color: #666;">Thank you for helping reunite this family! Please coordinate with them for safe return.</p> | |
<p style="font-size: 12px; color: #999;">This is an automated message from the Lost & Found Person Tracker system.</p> | |
</div> | |
""" | |
result = send_email_alert(found_subject, found_body, found_email, lost_person.get("face_blob")) | |
results["found_email_sent"] = result["success"] | |
if not result["success"]: | |
results["errors"].append( | |
f"Failed to send email to found person reporter: {result.get('error', 'Unknown error')}") | |
return results | |
def send_live_feed_alert(live_feed_data: Dict, matched_lost_persons: List[Dict]) -> Dict[str, Any]: | |
"""Send urgent alerts when lost persons are spotted in live feed.""" | |
results = { | |
"alerts_sent": 0, | |
"total_matches": len(matched_lost_persons), | |
"errors": [] | |
} | |
detection_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
camera_info = { | |
"camera_id": live_feed_data.get("camera_id", "Unknown"), | |
"location": live_feed_data.get("location", "Unknown location"), | |
"details": live_feed_data.get("location_details", live_feed_data.get("where_found", "No details")), | |
"operator": live_feed_data.get("reported_by", {}).get("name", live_feed_data.get("your_name", "Unknown")), | |
"organization": live_feed_data.get("reported_by", {}).get("organization", | |
live_feed_data.get("organization", "Unknown")) | |
} | |
for lost_person in matched_lost_persons: | |
# Get contact email | |
contact_email = None | |
if isinstance(lost_person.get("contact_details"), dict): | |
contact_email = lost_person["contact_details"].get("email_id") | |
else: | |
contact_email = lost_person.get("email_id") | |
if not contact_email: | |
results["errors"].append(f"No email found for {lost_person.get('name', 'unknown person')}") | |
continue | |
subject = f"🚨 URGENT: {lost_person.get('name', 'Missing Person')} Spotted on Security Camera!" | |
body = f""" | |
<div style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto;"> | |
<div style="background: #d32f2f; color: white; padding: 20px; text-align: center; border-radius: 8px 8px 0 0;"> | |
<h1>🚨 URGENT ALERT 🚨</h1> | |
<h2>Missing Person Spotted!</h2> | |
</div> | |
<div style="padding: 20px; border: 2px solid #d32f2f; border-top: none; border-radius: 0 0 8px 8px;"> | |
<p style="font-size: 18px; color: #d32f2f; font-weight: bold;"> | |
<strong>{lost_person.get('name', 'Your missing person')}</strong> has been spotted on a security camera! | |
</p> | |
<div style="background: #fff3e0; padding: 15px; border-radius: 8px; margin: 20px 0; border-left: 4px solid #ff9800;"> | |
<h3 style="color: #e65100;">📹 Camera Information:</h3> | |
<ul> | |
<li><strong>Camera ID:</strong> {camera_info['camera_id']}</li> | |
<li><strong>Location:</strong> {camera_info['location']}</li> | |
<li><strong>Specific Area:</strong> {camera_info['details']}</li> | |
<li><strong>Detection Time:</strong> {detection_time}</li> | |
<li><strong>Operator:</strong> {camera_info['operator']} ({camera_info['organization']})</li> | |
</ul> | |
</div> | |
<div style="background: #e3f2fd; padding: 15px; border-radius: 8px; margin: 20px 0;"> | |
<h3 style="color: #1976d2;">📞 Immediate Action Required:</h3> | |
<ol> | |
<li>Contact local authorities immediately</li> | |
<li>Head to the location: <strong>{camera_info['location']}</strong></li> | |
<li>Coordinate with security: <strong>{camera_info['operator']}</strong></li> | |
</ol> | |
</div> | |
<div style="background: #f3e5f5; padding: 15px; border-radius: 8px; margin: 20px 0;"> | |
<h3 style="color: #7b1fa2;">⚠️ Important:</h3> | |
<p>This is a live detection. The person may still be in the area. Act quickly but safely.</p> | |
</div> | |
<p style="text-align: center; margin-top: 30px;"> | |
<strong>Time is critical - Every moment counts!</strong> | |
</p> | |
</div> | |
</div> | |
""" | |
result = send_email_alert(subject, body, contact_email, live_feed_data.get("face_blob")) | |
if result["success"]: | |
results["alerts_sent"] += 1 | |
else: | |
results["errors"].append( | |
f"Failed to send alert for {lost_person.get('name', 'unknown')}: {result.get('error', 'Unknown error')}") | |
return results | |
async def upload_lost_person( | |
name: str = Form(...), | |
gender: str = Form(...), | |
age: int = Form(...), | |
where_lost: str = Form(...), | |
your_name: str = Form(...), | |
relation_with_lost: str = Form(...), | |
user_id: str = Form(...), | |
mobile_no: str = Form(...), | |
email_id: str = Form(...), | |
file: UploadFile = File(...) | |
): | |
"""Upload a lost person record with face recognition.""" | |
try: | |
# Read and process image | |
contents = await file.read() | |
image = cv2.imdecode(np.frombuffer(contents, np.uint8), cv2.IMREAD_COLOR) | |
if image is None: | |
raise HTTPException(status_code=400, detail="Invalid image file") | |
# Generate unique face ID | |
face_id = str(uuid.uuid4()) | |
# Extract face from image | |
cropped_face = crop_face(image) | |
# Encode face image to base64 | |
_, buffer = cv2.imencode('.jpg', cropped_face) | |
image_blob = base64.b64encode(buffer).decode('utf-8') | |
# Create metadata | |
metadata = { | |
"face_id": face_id, | |
"name": name, | |
"gender": gender, | |
"age": age, | |
"where_lost": where_lost, | |
"reporter_name": your_name, | |
"relation_with_lost": relation_with_lost, | |
"user_id": user_id, | |
"contact_details": { | |
"mobile_no": mobile_no, | |
"email_id": email_id | |
}, | |
"face_blob": image_blob, | |
"upload_time": datetime.now().isoformat(), | |
"status": "pending" | |
} | |
# Save to database | |
save_metadata(lost_collection, metadata) | |
logger.info(f"Lost person record created: {face_id}") | |
# Match against found people and live feed | |
matched_found = match_face_with_db(cropped_face, found_collection) | |
matched_live = match_face_with_db(cropped_face, live_feed_collection) | |
all_matches = matched_found + matched_live | |
email_results = {"sent_count": 0, "errors": []} | |
# Process matches and send emails | |
for match in all_matches: | |
try: | |
# Send match emails | |
email_result = send_match_emails(metadata, match) | |
# Create match record | |
match_data = { | |
"match_id": str(uuid.uuid4()), | |
"lost_face_id": face_id, | |
"found_face_id": match.get("face_id"), | |
"match_time": datetime.now().isoformat(), | |
"match_status": "confirmed", | |
"lost_person": convert_objectid_to_str(metadata), | |
"found_person": convert_objectid_to_str(match), | |
"email_results": email_result | |
} | |
save_metadata(match_collection, match_data) | |
if email_result.get("lost_email_sent") or email_result.get("found_email_sent"): | |
email_results["sent_count"] += 1 | |
email_results["errors"].extend(email_result.get("errors", [])) | |
except Exception as e: | |
logger.error(f"Error processing match: {e}") | |
email_results["errors"].append(f"Error processing match: {str(e)}") | |
# Prepare response data with ObjectId conversion | |
response_data = { | |
"message": "Lost person uploaded successfully.", | |
"face_id": face_id, | |
"matched_found": convert_objectid_to_str(matched_found), | |
"matched_live": convert_objectid_to_str(matched_live), | |
"total_matches": len(all_matches), | |
"email_results": email_results | |
} | |
logger.info(f"Lost person upload completed: {face_id}") | |
return response_data | |
except HTTPException: | |
raise | |
except Exception as e: | |
logger.error(f"Error uploading lost person: {e}") | |
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") | |
async def upload_found_person( | |
name: str = Form(...), | |
gender: str = Form(...), | |
age: int = Form(...), | |
where_found: str = Form(...), | |
your_name: str = Form(...), | |
organization: str = Form(...), | |
designation: str = Form(...), | |
user_id: str = Form(...), | |
mobile_no: str = Form(...), | |
email_id: str = Form(...), | |
file: UploadFile = File(...) | |
): | |
"""Upload a found person record with face recognition.""" | |
try: | |
# Read and process image | |
contents = await file.read() | |
image = cv2.imdecode(np.frombuffer(contents, np.uint8), cv2.IMREAD_COLOR) | |
if image is None: | |
raise HTTPException(status_code=400, detail="Invalid image file") | |
# Generate unique face ID | |
face_id = str(uuid.uuid4()) | |
# Extract face from image | |
cropped_face = crop_face(image) | |
# Encode face image to base64 | |
_, buffer = cv2.imencode('.jpg', cropped_face) | |
image_blob = base64.b64encode(buffer).decode('utf-8') | |
# Create metadata | |
metadata = { | |
"face_id": face_id, | |
"name": name, | |
"gender": gender, | |
"age": age, | |
"location_found": where_found, | |
"reported_by": { | |
"name": your_name, | |
"organization": organization, | |
"designation": designation | |
}, | |
"user_id": user_id, | |
"contact_details": { | |
"mobile_no": mobile_no, | |
"email_id": email_id | |
}, | |
"face_blob": image_blob, | |
"upload_time": datetime.now().isoformat(), | |
"status": "pending" | |
} | |
# Save to database | |
save_metadata(found_collection, metadata) | |
logger.info(f"Found person record created: {face_id}") | |
# Match against lost people | |
matched_lost = match_face_with_db(cropped_face, lost_collection) | |
email_results = {"sent_count": 0, "errors": []} | |
# Process matches and send emails | |
for match in matched_lost: | |
try: | |
# Send match emails | |
email_result = send_match_emails(match, metadata) | |
# Create match record | |
match_data = { | |
"match_id": str(uuid.uuid4()), | |
"lost_face_id": match.get("face_id"), | |
"found_face_id": face_id, | |
"match_time": datetime.now().isoformat(), | |
"match_status": "confirmed", | |
"lost_person": convert_objectid_to_str(match), | |
"found_person": convert_objectid_to_str(metadata), | |
"email_results": email_result | |
} | |
save_metadata(match_collection, match_data) | |
if email_result.get("lost_email_sent") or email_result.get("found_email_sent"): | |
email_results["sent_count"] += 1 | |
email_results["errors"].extend(email_result.get("errors", [])) | |
except Exception as e: | |
logger.error(f"Error processing match: {e}") | |
email_results["errors"].append(f"Error processing match: {str(e)}") | |
# Prepare response data with ObjectId conversion | |
response_data = { | |
"message": "Found person uploaded successfully.", | |
"face_id": face_id, | |
"matched_lost": convert_objectid_to_str(matched_lost), | |
"total_matches": len(matched_lost), | |
"email_results": email_results | |
} | |
logger.info(f"Found person upload completed: {face_id}") | |
return response_data | |
except HTTPException: | |
raise | |
except Exception as e: | |
logger.error(f"Error uploading found person: {e}") | |
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") | |
async def upload_live_feed( | |
camera_id: str = Form(...), | |
where_found: str = Form(...), | |
location: str = Form(...), | |
your_name: str = Form(...), | |
organization: str = Form(...), | |
designation: str = Form(...), | |
user_id: str = Form(...), | |
mobile_no: str = Form(...), | |
email_id: str = Form(...), | |
file: UploadFile = File(...) | |
): | |
"""Upload live feed data and check for matches with lost persons.""" | |
try: | |
# Read and process image | |
contents = await file.read() | |
image = cv2.imdecode(np.frombuffer(contents, np.uint8), cv2.IMREAD_COLOR) | |
if image is None: | |
raise HTTPException(status_code=400, detail="Invalid image file") | |
# Generate unique face ID | |
face_id = str(uuid.uuid4()) | |
# Extract face from image | |
cropped_face = crop_face(image) | |
# Encode face image to base64 | |
_, buffer = cv2.imencode('.jpg', cropped_face) | |
image_blob = base64.b64encode(buffer).decode('utf-8') | |
# Create metadata | |
metadata = { | |
"face_id": face_id, | |
"camera_id": camera_id, | |
"location": location, | |
"location_details": where_found, | |
"reported_by": { | |
"name": your_name, | |
"organization": organization, | |
"designation": designation | |
}, | |
"user_id": user_id, | |
"contact_details": { | |
"mobile_no": mobile_no, | |
"email_id": email_id | |
}, | |
"face_blob": image_blob, | |
"upload_time": datetime.now().isoformat(), | |
"status": "active" | |
} | |
# Save to database | |
save_metadata(live_feed_collection, metadata) | |
logger.info(f"Live feed record created: {face_id}") | |
# Match against lost people - this is critical for live alerts | |
matched_lost = match_face_with_db(cropped_face, lost_collection) | |
alert_results = {"alerts_sent": 0, "total_matches": len(matched_lost), "errors": []} | |
if matched_lost: | |
# Send urgent alerts for live feed matches | |
alert_results = send_live_feed_alert(metadata, matched_lost) | |
# Create match records for each match | |
for match in matched_lost: | |
try: | |
match_data = { | |
"match_id": str(uuid.uuid4()), | |
"lost_face_id": match.get("face_id"), | |
"live_feed_face_id": face_id, | |
"match_time": datetime.now().isoformat(), | |
"match_status": "live_spotted", | |
"lost_person": convert_objectid_to_str(match), | |
"live_feed_data": convert_objectid_to_str(metadata), | |
"alert_results": alert_results | |
} | |
save_metadata(match_collection, match_data) | |
except Exception as e: | |
logger.error(f"Error creating match record: {e}") | |
alert_results["errors"].append(f"Error creating match record: {str(e)}") | |
# Clean up old live feed records (keep only latest 500) | |
try: | |
total_records = live_feed_collection.count_documents({}) | |
if total_records > 500: | |
# Delete oldest records | |
records_to_delete = total_records - 500 | |
oldest_records = live_feed_collection.find().sort("upload_time", 1).limit(records_to_delete) | |
for record in oldest_records: | |
live_feed_collection.delete_one({"_id": record["_id"]}) | |
logger.info(f"Cleaned up {records_to_delete} old live feed records") | |
except Exception as e: | |
logger.error(f"Error cleaning up live feed records: {e}") | |
# Prepare response data with ObjectId conversion | |
response_data = { | |
"message": "Live feed uploaded successfully.", | |
"face_id": face_id, | |
"matched_lost": convert_objectid_to_str(matched_lost), | |
"total_matches": len(matched_lost), | |
"alert_results": alert_results, | |
"metadata": convert_objectid_to_str(metadata) | |
} | |
logger.info(f"Live feed upload completed: {face_id}") | |
return response_data | |
except HTTPException: | |
raise | |
except Exception as e: | |
logger.error(f"Error uploading live feed: {e}") | |
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") | |
async def get_records_by_user(user_id: str): | |
"""Get all records uploaded by a specific user.""" | |
try: | |
collections_info = [ | |
("lost_people", lost_collection), | |
("found_people", found_collection), | |
("live_feed", live_feed_collection), | |
("match_records", match_collection) | |
] | |
records = [] | |
for collection_name, collection in collections_info: | |
try: | |
docs = list(collection.find({"user_id": user_id})) | |
for doc in docs: | |
doc = convert_objectid_to_str(doc) | |
records.append({"source": collection_name, "data": doc}) | |
except Exception as e: | |
logger.error(f"Error querying {collection_name}: {e}") | |
if records: | |
return { | |
"message": f"Found {len(records)} records for user {user_id}.", | |
"user_id": user_id, | |
"total_records": len(records), | |
"records": records | |
} | |
else: | |
return { | |
"message": f"No records found for user {user_id}.", | |
"user_id": user_id, | |
"total_records": 0, | |
"records": [] | |
} | |
except Exception as e: | |
logger.error(f"Error getting records for user {user_id}: {e}") | |
raise HTTPException(status_code=500, detail=f"Error retrieving records: {str(e)}") | |
async def search_face(face_id: str): | |
"""Search for a specific face ID across all collections.""" | |
try: | |
collections_info = [ | |
("lost_people", lost_collection), | |
("found_people", found_collection), | |
("live_feed", live_feed_collection), | |
("match_records", match_collection) | |
] | |
for collection_name, collection in collections_info: | |
try: | |
record = collection.find_one({"face_id": face_id}) | |
if record: | |
record = convert_objectid_to_str(record) | |
return { | |
"message": f"Face found in {collection_name}.", | |
"collection": collection_name, | |
"face_id": face_id, | |
"record": record | |
} | |
except Exception as e: | |
logger.error(f"Error searching in {collection_name}: {e}") | |
raise HTTPException(status_code=404, detail=f"Face ID {face_id} not found in any collection.") | |
except HTTPException: | |
raise | |
except Exception as e: | |
logger.error(f"Error searching for face {face_id}: {e}") | |
raise HTTPException(status_code=500, detail=f"Error searching for face: {str(e)}") | |
# Additional endpoints for frontend functionality | |
async def get_all_lost_people(): | |
"""Get all lost people records.""" | |
try: | |
records = list(lost_collection.find().sort("upload_time", -1)) | |
records = convert_objectid_to_str(records) | |
return { | |
"message": "Lost people records retrieved successfully.", | |
"total_count": len(records), | |
"records": records | |
} | |
except Exception as e: | |
logger.error(f"Error getting lost people: {e}") | |
raise HTTPException(status_code=500, detail="Error retrieving lost people records") | |
async def get_all_found_people(): | |
"""Get all found people records.""" | |
try: | |
records = list(found_collection.find().sort("upload_time", -1)) | |
records = convert_objectid_to_str(records) | |
return { | |
"message": "Found people records retrieved successfully.", | |
"total_count": len(records), | |
"records": records | |
} | |
except Exception as e: | |
logger.error(f"Error getting found people: {e}") | |
raise HTTPException(status_code=500, detail="Error retrieving found people records") | |
async def get_all_live_feed(): | |
"""Get all live feed records.""" | |
try: | |
records = list(live_feed_collection.find().sort("upload_time", -1).limit(100)) | |
records = convert_objectid_to_str(records) | |
return { | |
"message": "Live feed records retrieved successfully.", | |
"total_count": len(records), | |
"records": records | |
} | |
except Exception as e: | |
logger.error(f"Error getting live feed: {e}") | |
raise HTTPException(status_code=500, detail="Error retrieving live feed records") | |
async def get_all_matches(): | |
"""Get all match records.""" | |
try: | |
records = list(match_collection.find().sort("match_time", -1)) | |
records = convert_objectid_to_str(records) | |
return { | |
"message": "Match records retrieved successfully.", | |
"total_count": len(records), | |
"records": records | |
} | |
except Exception as e: | |
logger.error(f"Error getting matches: {e}") | |
raise HTTPException(status_code=500, detail="Error retrieving match records") | |
async def test_email_configuration(test_email: str = Form(...)): | |
"""Test email configuration by sending a test email.""" | |
try: | |
if not EMAIL_CONFIGURED: | |
return { | |
"status": "error", | |
"message": "Email is not configured. Please check your environment variables.", | |
"method": "none" | |
} | |
subject = "Test Email - Lost & Found System" | |
body = f""" | |
<div style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto;"> | |
<h2 style="color: #2e7d32;">✅ Email Test Successful!</h2> | |
<p>This is a test email from the Lost & Found Person Tracker system.</p> | |
<div style="background: #f5f5f5; padding: 15px; border-radius: 8px; margin: 20px 0;"> | |
<h3>System Information:</h3> | |
<ul> | |
<li><strong>Email Method:</strong> {EMAIL_METHOD.upper()}</li> | |
<li><strong>Sender:</strong> {EMAIL_SENDER}</li> | |
<li><strong>Test Time:</strong> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</li> | |
</ul> | |
</div> | |
<p>If you received this email, your email configuration is working correctly!</p> | |
<p style="font-size: 12px; color: #999;">This is an automated test message.</p> | |
</div> | |
""" | |
result = send_email_alert(subject, body, test_email) | |
if result["success"]: | |
return { | |
"status": "success", | |
"message": f"Test email sent successfully to {test_email}", | |
"method": EMAIL_METHOD, | |
"details": result | |
} | |
else: | |
return { | |
"status": "error", | |
"message": f"Failed to send test email: {result.get('error', 'Unknown error')}", | |
"method": EMAIL_METHOD, | |
"details": result | |
} | |
except Exception as e: | |
logger.error(f"Error testing email configuration: {e}") | |
return { | |
"status": "error", | |
"message": f"Error testing email: {str(e)}", | |
"method": EMAIL_METHOD | |
} | |
async def health_check(): | |
"""Check system health and configuration.""" | |
try: | |
# Test database connection | |
db_status = "connected" | |
try: | |
client.admin.command('ping') | |
except Exception as e: | |
db_status = f"error: {str(e)}" | |
# Check collections | |
collections_status = {} | |
try: | |
collections_status = { | |
"lost_people": lost_collection.count_documents({}), | |
"found_people": found_collection.count_documents({}), | |
"live_feed": live_feed_collection.count_documents({}), | |
"matches": match_collection.count_documents({}) | |
} | |
except Exception as e: | |
collections_status = {"error": str(e)} | |
return { | |
"status": "healthy" if db_status == "connected" else "unhealthy", | |
"timestamp": datetime.now().isoformat(), | |
"database": db_status, | |
"collections": collections_status, | |
"email_configured": EMAIL_CONFIGURED, | |
"email_method": EMAIL_METHOD, | |
"email_sender": EMAIL_SENDER if EMAIL_CONFIGURED else "Not configured", | |
"face_model_loaded": face_model is not None | |
} | |
except Exception as e: | |
logger.error(f"Health check failed: {e}") | |
return { | |
"status": "error", | |
"timestamp": datetime.now().isoformat(), | |
"error": str(e) | |
} | |
async def get_statistics(): | |
"""Get system statistics.""" | |
try: | |
stats = { | |
"lost_people": lost_collection.count_documents({}), | |
"found_people": found_collection.count_documents({}), | |
"live_feed": live_feed_collection.count_documents({}), | |
"matches": match_collection.count_documents({}), | |
"last_updated": datetime.now().isoformat() | |
} | |
return stats | |
except Exception as e: | |
logger.error(f"Error getting statistics: {e}") | |
raise HTTPException(status_code=500, detail="Error retrieving statistics") | |