import os
import cv2
import base64
import uuid
import smtplib
import ssl
import json
import logging
import requests
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
from datetime import datetime
import numpy as np
# Set the DeepFace home directory before importing DeepFace: this assignment
# must stay above the `from deepface import DeepFace` line further down.
# NOTE(review): presumably DeepFace reads DEEPFACE_HOME at import time to decide
# where it stores model weights - confirm, and confirm /tmp suits the deployment.
os.environ['DEEPFACE_HOME'] = '/tmp/.deepface'
from fastapi import FastAPI, HTTPException, File, Form, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pymongo import MongoClient
from bson import ObjectId
from dotenv import load_dotenv
from deepface import DeepFace
from ultralytics import YOLO
from typing import Optional, List, Dict, Any
# Pull settings from a .env file (if present) into the process environment.
load_dotenv()

# Module-wide logger; INFO and above are emitted.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Email credentials. The defaults are placeholder values and are treated as
# "not configured" by the selection logic below.
EMAIL_SENDER = os.getenv("EMAIL_SENDER", "your_email@gmail.com")
EMAIL_PASSWORD = os.getenv("EMAIL_PASSWORD", "your_password")
SENDGRID_API_KEY = os.getenv("SENDGRID_API_KEY")

# Choose the delivery backend: SendGrid wins when a real API key is present,
# otherwise SMTP when a non-placeholder sender and a password are available.
if SENDGRID_API_KEY not in (None, "", "your_sendgrid_api_key"):
    EMAIL_METHOD = "sendgrid"
elif EMAIL_SENDER not in ("", "your_email@gmail.com") and EMAIL_PASSWORD:
    EMAIL_METHOD = "smtp"
else:
    EMAIL_METHOD = "none"
EMAIL_CONFIGURED = EMAIL_METHOD != "none"

# Report the outcome once at import time.
if EMAIL_METHOD == "sendgrid":
    logger.info("Email configured using SendGrid")
elif EMAIL_METHOD == "smtp":
    logger.info("Email configured using SMTP")
else:
    logger.warning("Email not configured - no valid credentials found")
# MongoDB configuration (overridable through the environment / .env file)
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017")
DATABASE_NAME = os.getenv("DATABASE_NAME", "lost_and_found")
try:
    # MongoClient does not block on, or verify, the server connection in its
    # constructor; the first real operation does. Only configuration problems
    # (for example a malformed URI) can surface here, so the success message
    # must not claim that a connection exists. The /health endpoint performs
    # the actual reachability check.
    client = MongoClient(MONGO_URI)
    db = client[DATABASE_NAME]
    # One collection per record type handled by the API.
    lost_collection = db['lost_people']
    found_collection = db['found_people']
    live_feed_collection = db['live_feed']
    match_collection = db['match_records']
    logger.info("MongoDB client initialized (connection is established lazily on first operation)")
except Exception as e:
    logger.error(f"Failed to connect to MongoDB: {e}")
    raise
# FastAPI application object; the route decorators below register on it.
app = FastAPI(title="Lost & Found Person Tracker API", version="1.0.0")

# CORS: every origin, method and header is accepted, with credentials enabled.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive - in production, list the allowed origins explicitly.
_cors_settings = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_settings)
# Load the YOLO face detector once at import time. A failure is logged and
# leaves face_model as None; crop_face() turns that into an HTTP 500 later.
face_model = None
try:
    face_model = YOLO("yolov11s-face.pt")
except Exception as e:
    logger.error(f"Failed to load YOLO model: {e}")
else:
    logger.info("YOLO face detection model loaded successfully")
def crop_face(image: np.ndarray) -> np.ndarray:
    """Detect a face with the YOLO model and return the cropped face region.

    Only the first box returned by the detector is used.

    Args:
        image: Decoded image array (rows, columns, ...), as produced by
            ``cv2.imdecode``.

    Returns:
        The sub-array of ``image`` covered by the detected face box.

    Raises:
        HTTPException: 500 when the detection model failed to load; 400 when
            no face is detected or the detected box has no area inside the
            image.
    """
    if face_model is None:
        raise HTTPException(status_code=500, detail="Face detection model not available")

    results = face_model.predict(image, verbose=False)
    if results and results[0].boxes and results[0].boxes.xyxy.shape[0] > 0:
        x1, y1, x2, y2 = results[0].boxes.xyxy[0].cpu().numpy().astype(int)

        # Clamp the box to the image: a negative start index would wrap around
        # in NumPy slicing and silently crop the wrong region.
        height, width = image.shape[:2]
        x1, x2 = max(0, x1), min(width, x2)
        y1, y2 = max(0, y1), min(height, y2)

        # A zero-area crop cannot be encoded or matched; treat it as "no face".
        if x2 > x1 and y2 > y1:
            return image[y1:y2, x1:x2]

    raise HTTPException(status_code=400, detail="No face detected in the image")
def save_metadata(collection, metadata: dict) -> str:
    """Insert ``metadata`` into ``collection`` and return the new id as a string."""
    return str(collection.insert_one(metadata).inserted_id)
def convert_objectid_to_str(data: Any) -> Any:
    """Return a copy of ``data`` with every ObjectId replaced by its string form.

    Lists and dicts are rebuilt recursively; an ObjectId becomes ``str(value)``;
    any other value is returned unchanged.
    """
    if isinstance(data, list):
        return [convert_objectid_to_str(element) for element in data]
    if isinstance(data, dict):
        return {field: convert_objectid_to_str(entry) for field, entry in data.items()}
    return str(data) if isinstance(data, ObjectId) else data
def match_face_with_db(known_image: np.ndarray, collection) -> List[Dict]:
    """Compare ``known_image`` with every stored face in ``collection``.

    Each document's ``face_blob`` (base64 text) is decoded to an image and
    checked with ``DeepFace.verify`` using the ArcFace model. Documents with
    no blob or an undecodable blob are skipped. A failure on one document is
    logged and skipped; a failure of the scan itself is logged and ends it.

    Returns:
        The verified documents, with ObjectIds converted to strings.
    """
    verified_docs = []
    try:
        for record in collection.find():
            try:
                encoded_face = record.get("face_blob")
                if not encoded_face:
                    continue

                # base64 text -> raw bytes -> decoded colour image
                raw_pixels = np.frombuffer(base64.b64decode(encoded_face), np.uint8)
                candidate = cv2.imdecode(raw_pixels, cv2.IMREAD_COLOR)
                if candidate is None:
                    continue

                verdict = DeepFace.verify(
                    known_image,
                    candidate,
                    model_name="ArcFace",
                    enforce_detection=False
                )
                if verdict["verified"]:
                    # Stringify ObjectIds before handing the document back
                    verified_docs.append(convert_objectid_to_str(record))
            except Exception as e:
                logger.error(f"Error matching individual face: {str(e)}")
                continue
    except Exception as e:
        logger.error(f"Error in face matching process: {str(e)}")
    return verified_docs
def send_email_via_sendgrid(to_email: str, subject: str, html_content: str,
                            attachment_data: Optional[str] = None,
                            attachment_name: str = "attachment.jpg",
                            timeout: float = 15.0) -> bool:
    """Send an HTML email through the SendGrid v3 ``mail/send`` HTTP endpoint.

    Args:
        to_email: Recipient address.
        subject: Subject line.
        html_content: Message body, sent with content type ``text/html``.
        attachment_data: Optional attachment content. It is passed to the API
            unchanged and labelled ``image/jpeg``; callers in this file pass
            base64 text.
        attachment_name: File name reported for the attachment.
        timeout: Seconds to wait for SendGrid before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        True when SendGrid answers 200, 201 or 202. False for any other
        status or when an exception occurs (it is logged, never raised).
    """
    try:
        url = "https://api.sendgrid.com/v3/mail/send"
        headers = {
            "Authorization": f"Bearer {SENDGRID_API_KEY}",
            "Content-Type": "application/json"
        }
        # Basic email payload
        email_payload = {
            "personalizations": [
                {
                    "to": [{"email": to_email}],
                    "subject": subject
                }
            ],
            "from": {"email": EMAIL_SENDER},
            "content": [
                {
                    "type": "text/html",
                    "value": html_content
                }
            ]
        }
        # Add attachment if provided
        if attachment_data:
            email_payload["attachments"] = [
                {
                    "content": attachment_data,
                    "type": "image/jpeg",
                    "filename": attachment_name,
                    "disposition": "attachment"
                }
            ]
        # Send the email; the timeout keeps a stalled API call from hanging the request
        response = requests.post(url, headers=headers, json=email_payload, timeout=timeout)
        if response.status_code in [200, 201, 202]:
            logger.info(f"Email sent successfully via SendGrid to {to_email}")
            return True
        logger.error(f"SendGrid API returned status code: {response.status_code}, Response: {response.text}")
        return False
    except Exception as e:
        logger.error(f"Error sending email via SendGrid API: {e}")
        return False
def send_email_via_smtp(to_email: str, subject: str, html_content: str,
                        attachment_data: Optional[str] = None,
                        attachment_name: str = "attachment.jpg",
                        timeout: float = 30.0) -> bool:
    """Send an HTML email through Gmail's SMTP server over implicit TLS (port 465).

    Args:
        to_email: Recipient address.
        subject: Subject line.
        html_content: Message body, attached as an ``html`` MIME part.
        attachment_data: Optional base64 text; it is decoded and attached as
            an ``application/octet-stream`` part.
        attachment_name: File name reported for the attachment.
        timeout: Seconds allowed for blocking SMTP socket operations, so a
            stalled server cannot hang the caller indefinitely.

    Returns:
        True when the message was handed to the server, False when any step
        raised (the error is logged, never raised). A failure to build the
        attachment is logged and the mail is still sent without it.
    """
    try:
        message = MIMEMultipart()
        message["From"] = EMAIL_SENDER
        message["To"] = to_email
        message["Subject"] = subject
        message.attach(MIMEText(html_content, "html"))
        # Add attachment if provided
        if attachment_data:
            try:
                part = MIMEBase("application", "octet-stream")
                part.set_payload(base64.b64decode(attachment_data))
                encoders.encode_base64(part)
                # Pass the file name as a header parameter so the email
                # package quotes/encodes it; a raw f-string breaks on names
                # containing spaces or special characters.
                part.add_header("Content-Disposition", "attachment", filename=attachment_name)
                message.attach(part)
            except Exception as e:
                logger.error(f"Error adding attachment to email: {e}")
        context = ssl.create_default_context()
        with smtplib.SMTP_SSL("smtp.gmail.com", 465, context=context, timeout=timeout) as server:
            server.login(EMAIL_SENDER, EMAIL_PASSWORD)
            server.sendmail(EMAIL_SENDER, to_email, message.as_string())
        logger.info(f"Email sent successfully via SMTP to {to_email}")
        return True
    except Exception as e:
        logger.error(f"Error sending email via SMTP: {e}")
        return False
def send_email_alert(subject: str, body: str, to_email: str,
                     image_blob: Optional[str] = None) -> Dict[str, Any]:
    """Send one alert email through whichever backend is configured.

    Args:
        subject: Subject line.
        body: Message body. Both backends transmit it as HTML content.
        to_email: Recipient address; an empty value is rejected.
        image_blob: Optional attachment content forwarded to the backend.

    Returns:
        A dict with ``success`` and ``method``. When a send was attempted it
        also has ``to`` and ``subject``. Every unsuccessful result carries an
        ``error`` message so callers do not fall back to "Unknown error".
    """
    if not EMAIL_CONFIGURED:
        logger.warning("Email not configured - cannot send alert")
        return {"success": False, "method": "none", "error": "Email not configured"}
    if not to_email:
        logger.error("No recipient email provided")
        return {"success": False, "method": EMAIL_METHOD, "error": "No recipient email"}

    success = False
    if EMAIL_METHOD == "sendgrid":
        success = send_email_via_sendgrid(to_email, subject, body, image_blob)
    elif EMAIL_METHOD == "smtp":
        success = send_email_via_smtp(to_email, subject, body, image_blob)

    result = {
        "success": success,
        "method": EMAIL_METHOD,
        "to": to_email,
        "subject": subject
    }
    if not success:
        # The backends only return a boolean; the cause is in the server log.
        result["error"] = f"Email delivery via {EMAIL_METHOD} failed; see server logs for details"
    return result
def send_match_emails(lost_person: Dict, found_person: Dict) -> Dict[str, Any]:
    """Notify both reporters that a lost-person record matched a found record.

    Each record may be nested (``contact_details`` / ``reported_by``
    sub-dicts) or flat (``email_id``, ``mobile_no``, ``your_name`` and
    ``organization`` at top level). A sub-document that is missing or is not
    a dict is treated as empty, so flat or malformed records fall back to the
    flat fields instead of raising.

    Args:
        lost_person: Record of the person reported missing.
        found_person: Record of the matching found person or sighting.

    Returns:
        ``{"lost_email_sent": bool, "found_email_sent": bool, "errors": [str]}``.
        No email is attempted for a side whose email address is unknown.
    """

    def _section(person: Dict, key: str) -> Dict:
        # Nested sub-document, or {} when it is absent or not a dict.
        value = person.get(key)
        return value if isinstance(value, dict) else {}

    def _email_of(person: Dict) -> Optional[str]:
        # Nested email when contact_details is a dict, flat email_id otherwise.
        contact = person.get("contact_details")
        if isinstance(contact, dict):
            return contact.get("email_id")
        return person.get("email_id")

    def _mobile_of(person: Dict) -> Any:
        # Nested mobile number first, then the flat field, then a placeholder.
        return _section(person, "contact_details").get(
            'mobile_no', person.get('mobile_no', 'Not provided'))

    results = {
        "lost_email_sent": False,
        "found_email_sent": False,
        "errors": []
    }
    time_found = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Get email addresses
    lost_email = _email_of(lost_person)
    found_email = _email_of(found_person)

    # Email to person who reported lost person
    if lost_email:
        found_reporter = _section(found_person, "reported_by")
        location_found = found_person.get('location_found', found_person.get('where_found', 'Not specified'))
        found_by = found_reporter.get('name', found_person.get('your_name', 'Unknown'))
        found_org = found_reporter.get('organization', found_person.get('organization', 'Not specified'))
        found_mobile = _mobile_of(found_person)
        lost_subject = "🎉 GREAT NEWS! Your Lost Person Has Been Found!"
        lost_body = f"""
🎉 Wonderful News!
We have found a match for {lost_person.get('name', 'the person')} you reported missing!
📍 Found Person Details:
- Location Found: {location_found}
- Found By: {found_by}
- Organization: {found_org}
- Time Found: {time_found}
📞 Contact Information:
Mobile: {found_mobile}
Email: {found_email or 'Not provided'}
Please contact them immediately to coordinate the reunion.
This is an automated message from the Lost & Found Person Tracker system.
"""
        result = send_email_alert(lost_subject, lost_body, lost_email, found_person.get("face_blob"))
        results["lost_email_sent"] = result["success"]
        if not result["success"]:
            results["errors"].append(
                f"Failed to send email to lost person reporter: {result.get('error', 'Unknown error')}")

    # Email to person who found the person
    if found_email:
        lost_mobile = _mobile_of(lost_person)
        found_subject = "🔍 Match Confirmed - Lost Person Identified!"
        found_body = f"""
🔍 Match Confirmed!
The person you found has been identified as {lost_person.get('name', 'a missing person')}!
👤 Lost Person Details:
- Name: {lost_person.get('name', 'Not specified')}
- Age: {lost_person.get('age', 'Not specified')}
- Where Lost: {lost_person.get('where_lost', 'Not specified')}
- Reported By: {lost_person.get('reporter_name', lost_person.get('your_name', 'Unknown'))}
- Relation: {lost_person.get('relation_with_lost', 'Not specified')}
📞 Family Contact Information:
Mobile: {lost_mobile}
Email: {lost_email or 'Not provided'}
Thank you for helping reunite this family! Please coordinate with them for safe return.
This is an automated message from the Lost & Found Person Tracker system.
"""
        result = send_email_alert(found_subject, found_body, found_email, lost_person.get("face_blob"))
        results["found_email_sent"] = result["success"]
        if not result["success"]:
            results["errors"].append(
                f"Failed to send email to found person reporter: {result.get('error', 'Unknown error')}")

    return results
def send_live_feed_alert(live_feed_data: Dict, matched_lost_persons: List[Dict]) -> Dict[str, Any]:
    """Email an urgent sighting alert to the contact of every matched lost person.

    Args:
        live_feed_data: The live-feed record that produced the matches; it
            supplies camera, location and operator details and the attached
            face image.
        matched_lost_persons: Lost-person records matched by the sighting.

    Returns:
        ``{"alerts_sent": int, "total_matches": int, "errors": [str]}``. A
        person with no contact email is reported in ``errors`` and skipped.
    """
    summary = {
        "alerts_sent": 0,
        "total_matches": len(matched_lost_persons),
        "errors": []
    }
    detection_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Camera / operator details, with flat-field fallbacks for older records.
    reporter = live_feed_data.get("reported_by", {})
    camera_id = live_feed_data.get("camera_id", "Unknown")
    location = live_feed_data.get("location", "Unknown location")
    area = live_feed_data.get("location_details", live_feed_data.get("where_found", "No details"))
    operator = reporter.get("name", live_feed_data.get("your_name", "Unknown"))
    organization = reporter.get("organization", live_feed_data.get("organization", "Unknown"))

    for person in matched_lost_persons:
        # Contact email: nested contact_details when it is a dict, flat field otherwise
        contact = person.get("contact_details")
        recipient = contact.get("email_id") if isinstance(contact, dict) else person.get("email_id")
        if not recipient:
            summary["errors"].append(f"No email found for {person.get('name', 'unknown person')}")
            continue

        subject = f"🚨 URGENT: {person.get('name', 'Missing Person')} Spotted on Security Camera!"
        body = f"""
🚨 URGENT ALERT 🚨
Missing Person Spotted!
{person.get('name', 'Your missing person')} has been spotted on a security camera!
📹 Camera Information:
- Camera ID: {camera_id}
- Location: {location}
- Specific Area: {area}
- Detection Time: {detection_time}
- Operator: {operator} ({organization})
📞 Immediate Action Required:
- Contact local authorities immediately
- Head to the location: {location}
- Coordinate with security: {operator}
⚠️ Important:
This is a live detection. The person may still be in the area. Act quickly but safely.
Time is critical - Every moment counts!
"""
        outcome = send_email_alert(subject, body, recipient, live_feed_data.get("face_blob"))
        if not outcome["success"]:
            summary["errors"].append(
                f"Failed to send alert for {person.get('name', 'unknown')}: {outcome.get('error', 'Unknown error')}")
            continue
        summary["alerts_sent"] += 1

    return summary
@app.post("/upload_lost")
async def upload_lost_person(
    name: str = Form(...),
    gender: str = Form(...),
    age: int = Form(...),
    where_lost: str = Form(...),
    your_name: str = Form(...),
    relation_with_lost: str = Form(...),
    user_id: str = Form(...),
    mobile_no: str = Form(...),
    email_id: str = Form(...),
    file: UploadFile = File(...)
):
    """Register a lost person and match the face against found/live-feed records.

    The uploaded image is decoded, the face is cropped and stored as base64
    JPEG with the form fields, and the crop is compared with every record in
    the found and live-feed collections. For each match, notification emails
    are sent and a match record is saved.

    Returns:
        A dict with the new ``face_id``, the matched records, the match count
        and a summary of email results.

    Raises:
        HTTPException: 400 for an empty or undecodable upload (or when
            ``crop_face`` finds no face); 500 for any other failure.

    NOTE(review): detection and matching are called synchronously inside this
    async handler, so they occupy the event loop while they run.
    """
    try:
        # Read and process image
        contents = await file.read()
        # cv2.imdecode raises on an empty buffer instead of returning None,
        # so reject empty uploads here to answer 400 rather than 500.
        if not contents:
            raise HTTPException(status_code=400, detail="Invalid image file")
        image = cv2.imdecode(np.frombuffer(contents, np.uint8), cv2.IMREAD_COLOR)
        if image is None:
            raise HTTPException(status_code=400, detail="Invalid image file")
        # Generate unique face ID
        face_id = str(uuid.uuid4())
        # Extract face from image
        cropped_face = crop_face(image)
        # Encode face image to base64
        encoded_ok, buffer = cv2.imencode('.jpg', cropped_face)
        if not encoded_ok:
            raise HTTPException(status_code=500, detail="Failed to encode face image")
        image_blob = base64.b64encode(buffer).decode('utf-8')
        # Create metadata
        metadata = {
            "face_id": face_id,
            "name": name,
            "gender": gender,
            "age": age,
            "where_lost": where_lost,
            "reporter_name": your_name,
            "relation_with_lost": relation_with_lost,
            "user_id": user_id,
            "contact_details": {
                "mobile_no": mobile_no,
                "email_id": email_id
            },
            "face_blob": image_blob,
            "upload_time": datetime.now().isoformat(),
            "status": "pending"
        }
        # Save to database
        save_metadata(lost_collection, metadata)
        logger.info(f"Lost person record created: {face_id}")
        # Match against found people and live feed
        matched_found = match_face_with_db(cropped_face, found_collection)
        matched_live = match_face_with_db(cropped_face, live_feed_collection)
        all_matches = matched_found + matched_live
        email_results = {"sent_count": 0, "errors": []}
        # Process matches and send emails; one failing match must not abort the rest
        for match in all_matches:
            try:
                # Send match emails
                email_result = send_match_emails(metadata, match)
                # Create match record
                match_data = {
                    "match_id": str(uuid.uuid4()),
                    "lost_face_id": face_id,
                    "found_face_id": match.get("face_id"),
                    "match_time": datetime.now().isoformat(),
                    "match_status": "confirmed",
                    "lost_person": convert_objectid_to_str(metadata),
                    "found_person": convert_objectid_to_str(match),
                    "email_results": email_result
                }
                save_metadata(match_collection, match_data)
                if email_result.get("lost_email_sent") or email_result.get("found_email_sent"):
                    email_results["sent_count"] += 1
                email_results["errors"].extend(email_result.get("errors", []))
            except Exception as e:
                logger.error(f"Error processing match: {e}")
                email_results["errors"].append(f"Error processing match: {str(e)}")
        # Prepare response data with ObjectId conversion
        response_data = {
            "message": "Lost person uploaded successfully.",
            "face_id": face_id,
            "matched_found": convert_objectid_to_str(matched_found),
            "matched_live": convert_objectid_to_str(matched_live),
            "total_matches": len(all_matches),
            "email_results": email_results
        }
        logger.info(f"Lost person upload completed: {face_id}")
        return response_data
    except HTTPException:
        # Deliberate HTTP errors (400/500 raised above) pass through untouched
        raise
    except Exception as e:
        logger.error(f"Error uploading lost person: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@app.post("/upload_found")
async def upload_found_person(
    name: str = Form(...),
    gender: str = Form(...),
    age: int = Form(...),
    where_found: str = Form(...),
    your_name: str = Form(...),
    organization: str = Form(...),
    designation: str = Form(...),
    user_id: str = Form(...),
    mobile_no: str = Form(...),
    email_id: str = Form(...),
    file: UploadFile = File(...)
):
    """Register a found person and match the face against lost-person records.

    The uploaded image is decoded, the face is cropped and stored as base64
    JPEG with the form fields, and the crop is compared with every record in
    the lost collection. For each match, notification emails are sent and a
    match record is saved.

    Returns:
        A dict with the new ``face_id``, the matched lost records, the match
        count and a summary of email results.

    Raises:
        HTTPException: 400 for an empty or undecodable upload (or when
            ``crop_face`` finds no face); 500 for any other failure.

    NOTE(review): detection and matching are called synchronously inside this
    async handler, so they occupy the event loop while they run.
    """
    try:
        # Read and process image
        contents = await file.read()
        # cv2.imdecode raises on an empty buffer instead of returning None,
        # so reject empty uploads here to answer 400 rather than 500.
        if not contents:
            raise HTTPException(status_code=400, detail="Invalid image file")
        image = cv2.imdecode(np.frombuffer(contents, np.uint8), cv2.IMREAD_COLOR)
        if image is None:
            raise HTTPException(status_code=400, detail="Invalid image file")
        # Generate unique face ID
        face_id = str(uuid.uuid4())
        # Extract face from image
        cropped_face = crop_face(image)
        # Encode face image to base64
        encoded_ok, buffer = cv2.imencode('.jpg', cropped_face)
        if not encoded_ok:
            raise HTTPException(status_code=500, detail="Failed to encode face image")
        image_blob = base64.b64encode(buffer).decode('utf-8')
        # Create metadata
        metadata = {
            "face_id": face_id,
            "name": name,
            "gender": gender,
            "age": age,
            "location_found": where_found,
            "reported_by": {
                "name": your_name,
                "organization": organization,
                "designation": designation
            },
            "user_id": user_id,
            "contact_details": {
                "mobile_no": mobile_no,
                "email_id": email_id
            },
            "face_blob": image_blob,
            "upload_time": datetime.now().isoformat(),
            "status": "pending"
        }
        # Save to database
        save_metadata(found_collection, metadata)
        logger.info(f"Found person record created: {face_id}")
        # Match against lost people
        matched_lost = match_face_with_db(cropped_face, lost_collection)
        email_results = {"sent_count": 0, "errors": []}
        # Process matches and send emails; one failing match must not abort the rest
        for match in matched_lost:
            try:
                # Send match emails (lost record first, found record second)
                email_result = send_match_emails(match, metadata)
                # Create match record
                match_data = {
                    "match_id": str(uuid.uuid4()),
                    "lost_face_id": match.get("face_id"),
                    "found_face_id": face_id,
                    "match_time": datetime.now().isoformat(),
                    "match_status": "confirmed",
                    "lost_person": convert_objectid_to_str(match),
                    "found_person": convert_objectid_to_str(metadata),
                    "email_results": email_result
                }
                save_metadata(match_collection, match_data)
                if email_result.get("lost_email_sent") or email_result.get("found_email_sent"):
                    email_results["sent_count"] += 1
                email_results["errors"].extend(email_result.get("errors", []))
            except Exception as e:
                logger.error(f"Error processing match: {e}")
                email_results["errors"].append(f"Error processing match: {str(e)}")
        # Prepare response data with ObjectId conversion
        response_data = {
            "message": "Found person uploaded successfully.",
            "face_id": face_id,
            "matched_lost": convert_objectid_to_str(matched_lost),
            "total_matches": len(matched_lost),
            "email_results": email_results
        }
        logger.info(f"Found person upload completed: {face_id}")
        return response_data
    except HTTPException:
        # Deliberate HTTP errors (400/500 raised above) pass through untouched
        raise
    except Exception as e:
        logger.error(f"Error uploading found person: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@app.post("/upload_live_feed")
async def upload_live_feed(
    camera_id: str = Form(...),
    where_found: str = Form(...),
    location: str = Form(...),
    your_name: str = Form(...),
    organization: str = Form(...),
    designation: str = Form(...),
    user_id: str = Form(...),
    mobile_no: str = Form(...),
    email_id: str = Form(...),
    file: UploadFile = File(...)
):
    """Store a live-feed sighting and alert the contacts of matching lost persons.

    The uploaded frame is decoded, the face is cropped and stored as base64
    JPEG with the camera/operator fields, and the crop is compared with every
    record in the lost collection. Matches trigger urgent alert emails and
    one match record each. Afterwards the live-feed collection is trimmed to
    its 500 newest records (by ``upload_time``).

    Returns:
        A dict with the new ``face_id``, the matched lost records, the match
        count, the alert summary and the stored metadata.

    Raises:
        HTTPException: 400 for an empty or undecodable upload (or when
            ``crop_face`` finds no face); 500 for any other failure.

    NOTE(review): detection and matching are called synchronously inside this
    async handler, so they occupy the event loop while they run.
    """
    try:
        # Read and process image
        contents = await file.read()
        # cv2.imdecode raises on an empty buffer instead of returning None,
        # so reject empty uploads here to answer 400 rather than 500.
        if not contents:
            raise HTTPException(status_code=400, detail="Invalid image file")
        image = cv2.imdecode(np.frombuffer(contents, np.uint8), cv2.IMREAD_COLOR)
        if image is None:
            raise HTTPException(status_code=400, detail="Invalid image file")
        # Generate unique face ID
        face_id = str(uuid.uuid4())
        # Extract face from image
        cropped_face = crop_face(image)
        # Encode face image to base64
        encoded_ok, buffer = cv2.imencode('.jpg', cropped_face)
        if not encoded_ok:
            raise HTTPException(status_code=500, detail="Failed to encode face image")
        image_blob = base64.b64encode(buffer).decode('utf-8')
        # Create metadata
        metadata = {
            "face_id": face_id,
            "camera_id": camera_id,
            "location": location,
            "location_details": where_found,
            "reported_by": {
                "name": your_name,
                "organization": organization,
                "designation": designation
            },
            "user_id": user_id,
            "contact_details": {
                "mobile_no": mobile_no,
                "email_id": email_id
            },
            "face_blob": image_blob,
            "upload_time": datetime.now().isoformat(),
            "status": "active"
        }
        # Save to database
        save_metadata(live_feed_collection, metadata)
        logger.info(f"Live feed record created: {face_id}")
        # Match against lost people - this is critical for live alerts
        matched_lost = match_face_with_db(cropped_face, lost_collection)
        alert_results = {"alerts_sent": 0, "total_matches": len(matched_lost), "errors": []}
        if matched_lost:
            # Send urgent alerts for live feed matches
            alert_results = send_live_feed_alert(metadata, matched_lost)
            # Create match records for each match
            for match in matched_lost:
                try:
                    match_data = {
                        "match_id": str(uuid.uuid4()),
                        "lost_face_id": match.get("face_id"),
                        "live_feed_face_id": face_id,
                        "match_time": datetime.now().isoformat(),
                        "match_status": "live_spotted",
                        "lost_person": convert_objectid_to_str(match),
                        "live_feed_data": convert_objectid_to_str(metadata),
                        "alert_results": alert_results
                    }
                    save_metadata(match_collection, match_data)
                except Exception as e:
                    logger.error(f"Error creating match record: {e}")
                    alert_results["errors"].append(f"Error creating match record: {str(e)}")
        # Clean up old live feed records (keep only latest 500); best effort,
        # a cleanup failure is logged and does not fail the upload
        try:
            total_records = live_feed_collection.count_documents({})
            if total_records > 500:
                # Delete oldest records
                records_to_delete = total_records - 500
                oldest_records = live_feed_collection.find().sort("upload_time", 1).limit(records_to_delete)
                for record in oldest_records:
                    live_feed_collection.delete_one({"_id": record["_id"]})
                logger.info(f"Cleaned up {records_to_delete} old live feed records")
        except Exception as e:
            logger.error(f"Error cleaning up live feed records: {e}")
        # Prepare response data with ObjectId conversion
        response_data = {
            "message": "Live feed uploaded successfully.",
            "face_id": face_id,
            "matched_lost": convert_objectid_to_str(matched_lost),
            "total_matches": len(matched_lost),
            "alert_results": alert_results,
            "metadata": convert_objectid_to_str(metadata)
        }
        logger.info(f"Live feed upload completed: {face_id}")
        return response_data
    except HTTPException:
        # Deliberate HTTP errors (400/500 raised above) pass through untouched
        raise
    except Exception as e:
        logger.error(f"Error uploading live feed: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@app.get("/get_records_by_user/{user_id}")
async def get_records_by_user(user_id: str):
    """Return every record whose ``user_id`` field equals the given id.

    All four collections are queried. A failing collection is logged and
    skipped, so the response can be partial. Each hit is tagged with the name
    of the collection it came from.

    Raises:
        HTTPException: 500 when the lookup fails outside the per-collection
            handling.
    """
    try:
        sources = {
            "lost_people": lost_collection,
            "found_people": found_collection,
            "live_feed": live_feed_collection,
            "match_records": match_collection,
        }
        collected = []
        for source_name, source in sources.items():
            try:
                hits = list(source.find({"user_id": user_id}))
                for hit in hits:
                    collected.append({"source": source_name, "data": convert_objectid_to_str(hit)})
            except Exception as e:
                logger.error(f"Error querying {source_name}: {e}")

        total = len(collected)
        if collected:
            summary = f"Found {total} records for user {user_id}."
        else:
            summary = f"No records found for user {user_id}."
        return {
            "message": summary,
            "user_id": user_id,
            "total_records": total,
            "records": collected
        }
    except Exception as e:
        logger.error(f"Error getting records for user {user_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Error retrieving records: {str(e)}")
@app.get("/search_face/{face_id}")
async def search_face(face_id: str):
    """Look up ``face_id`` in each collection and return the first hit.

    Collections are checked in the order lost, found, live feed, match
    records. A failing collection is logged and skipped.

    Raises:
        HTTPException: 404 when no collection holds the id; 500 on an
            unexpected failure.
    """
    try:
        search_order = {
            "lost_people": lost_collection,
            "found_people": found_collection,
            "live_feed": live_feed_collection,
            "match_records": match_collection,
        }
        for source_name, source in search_order.items():
            try:
                hit = source.find_one({"face_id": face_id})
                if not hit:
                    continue
                return {
                    "message": f"Face found in {source_name}.",
                    "collection": source_name,
                    "face_id": face_id,
                    "record": convert_objectid_to_str(hit)
                }
            except Exception as e:
                logger.error(f"Error searching in {source_name}: {e}")
        raise HTTPException(status_code=404, detail=f"Face ID {face_id} not found in any collection.")
    except HTTPException:
        # Keep the 404 above from being rewrapped as a 500
        raise
    except Exception as e:
        logger.error(f"Error searching for face {face_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Error searching for face: {str(e)}")
# Additional endpoints for frontend functionality
@app.get("/get_all_lost")
async def get_all_lost_people():
    """Return all lost-person records, sorted by ``upload_time`` descending.

    Raises:
        HTTPException: 500 when the query or conversion fails.
    """
    try:
        cursor = lost_collection.find().sort("upload_time", -1)
        lost_records = convert_objectid_to_str(list(cursor))
        return {
            "message": "Lost people records retrieved successfully.",
            "total_count": len(lost_records),
            "records": lost_records
        }
    except Exception as e:
        logger.error(f"Error getting lost people: {e}")
        raise HTTPException(status_code=500, detail="Error retrieving lost people records")
@app.get("/get_all_found")
async def get_all_found_people():
    """Return all found-person records, sorted by ``upload_time`` descending.

    Raises:
        HTTPException: 500 when the query or conversion fails.
    """
    try:
        cursor = found_collection.find().sort("upload_time", -1)
        found_records = convert_objectid_to_str(list(cursor))
        return {
            "message": "Found people records retrieved successfully.",
            "total_count": len(found_records),
            "records": found_records
        }
    except Exception as e:
        logger.error(f"Error getting found people: {e}")
        raise HTTPException(status_code=500, detail="Error retrieving found people records")
@app.get("/get_all_live_feed")
async def get_all_live_feed():
    """Return up to 100 live-feed records, sorted by ``upload_time`` descending.

    Raises:
        HTTPException: 500 when the query or conversion fails.
    """
    try:
        cursor = live_feed_collection.find().sort("upload_time", -1).limit(100)
        feed_records = convert_objectid_to_str(list(cursor))
        return {
            "message": "Live feed records retrieved successfully.",
            "total_count": len(feed_records),
            "records": feed_records
        }
    except Exception as e:
        logger.error(f"Error getting live feed: {e}")
        raise HTTPException(status_code=500, detail="Error retrieving live feed records")
@app.get("/get_all_matches")
async def get_all_matches():
    """Return all match records, sorted by ``match_time`` descending.

    Raises:
        HTTPException: 500 when the query or conversion fails.
    """
    try:
        cursor = match_collection.find().sort("match_time", -1)
        match_records = convert_objectid_to_str(list(cursor))
        return {
            "message": "Match records retrieved successfully.",
            "total_count": len(match_records),
            "records": match_records
        }
    except Exception as e:
        logger.error(f"Error getting matches: {e}")
        raise HTTPException(status_code=500, detail="Error retrieving match records")
@app.post("/test_email")
async def test_email_configuration(test_email: str = Form(...)):
    """Send a test message to ``test_email`` and report the outcome.

    This endpoint never raises. Every outcome, including an unexpected
    exception, is returned as a dict with ``status``, ``message`` and
    ``method``, plus ``details`` when a send was attempted.
    """
    try:
        # Nothing to test without a configured backend
        if not EMAIL_CONFIGURED:
            return {
                "status": "error",
                "message": "Email is not configured. Please check your environment variables.",
                "method": "none"
            }

        subject = "Test Email - Lost & Found System"
        body = f"""
✅ Email Test Successful!
This is a test email from the Lost & Found Person Tracker system.
System Information:
- Email Method: {EMAIL_METHOD.upper()}
- Sender: {EMAIL_SENDER}
- Test Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
If you received this email, your email configuration is working correctly!
This is an automated test message.
"""
        outcome = send_email_alert(subject, body, test_email)
        if outcome["success"]:
            status = "success"
            message = f"Test email sent successfully to {test_email}"
        else:
            status = "error"
            message = f"Failed to send test email: {outcome.get('error', 'Unknown error')}"
        return {
            "status": status,
            "message": message,
            "method": EMAIL_METHOD,
            "details": outcome
        }
    except Exception as e:
        logger.error(f"Error testing email configuration: {e}")
        return {
            "status": "error",
            "message": f"Error testing email: {str(e)}",
            "method": EMAIL_METHOD
        }
@app.get("/health")
async def health_check():
    """Report database reachability, collection sizes and email/model status.

    The database is probed with a ``ping`` command. ``status`` is "healthy"
    only when that ping succeeds. Failures are reported inside the response
    rather than raised.
    """
    try:
        # Probe the database; keep the error text when the ping fails
        try:
            client.admin.command('ping')
            db_status = "connected"
        except Exception as e:
            db_status = f"error: {str(e)}"

        # Document counts per collection, or the error that prevented counting
        try:
            collections_status = {
                "lost_people": lost_collection.count_documents({}),
                "found_people": found_collection.count_documents({}),
                "live_feed": live_feed_collection.count_documents({}),
                "matches": match_collection.count_documents({})
            }
        except Exception as e:
            collections_status = {"error": str(e)}

        overall = "healthy" if db_status == "connected" else "unhealthy"
        sender = EMAIL_SENDER if EMAIL_CONFIGURED else "Not configured"
        return {
            "status": overall,
            "timestamp": datetime.now().isoformat(),
            "database": db_status,
            "collections": collections_status,
            "email_configured": EMAIL_CONFIGURED,
            "email_method": EMAIL_METHOD,
            "email_sender": sender,
            "face_model_loaded": face_model is not None
        }
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return {
            "status": "error",
            "timestamp": datetime.now().isoformat(),
            "error": str(e)
        }
@app.get("/stats")
async def get_statistics():
    """Return the document count of each collection plus a timestamp.

    Raises:
        HTTPException: 500 when any count fails.
    """
    try:
        return {
            "lost_people": lost_collection.count_documents({}),
            "found_people": found_collection.count_documents({}),
            "live_feed": live_feed_collection.count_documents({}),
            "matches": match_collection.count_documents({}),
            "last_updated": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Error getting statistics: {e}")
        raise HTTPException(status_code=500, detail="Error retrieving statistics")