Spaces:
Sleeping
Sleeping
from flask import Flask, render_template, request, jsonify, session, redirect, url_for, send_file | |
from flask_sqlalchemy import SQLAlchemy | |
import google.generativeai as genai | |
import PyPDF2 | |
import os | |
import re | |
import json | |
from datetime import datetime | |
import io | |
from twilio.rest import Client | |
import base64 | |
import uuid | |
from reportlab.lib.pagesizes import letter | |
from reportlab.pdfgen import canvas | |
from reportlab.lib import colors | |
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle | |
from reportlab.lib.enums import TA_LEFT, TA_RIGHT, TA_CENTER, TA_JUSTIFY | |
import logging | |
# Set up basic logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Flask application; its instance folder is set to /tmp.
app = Flask(__name__, instance_path='/tmp')

# Flask signs the session cookie with this key, so it must never be a value
# that is committed to source control. When SECRET_KEY is not provided we
# generate a random key for this process instead of using a known constant.
app.secret_key = os.getenv('SECRET_KEY')
if not app.secret_key:
    logger.warning(
        "SECRET_KEY environment variable not set; using an ephemeral random key. "
        "Sessions will not survive a restart or be shared between worker processes."
    )
    app.secret_key = os.urandom(32).hex()

# Database configuration: DATABASE_URL wins, otherwise a SQLite file in /tmp.
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL', 'sqlite:////tmp/patients.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
class Patient(db.Model):
    """One feedback submission together with the care plans tied to it."""

    # Primary key: a random UUID4 rendered as a 36-character string.
    id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    name = db.Column(db.String(100), nullable=False)
    age = db.Column(db.Integer)
    gender = db.Column(db.String(50))
    feedback = db.Column(db.Text, nullable=False)
    # Text taken from the uploaded PDF (optional).
    original_plan = db.Column(db.Text)
    # Text of the generated plan (required).
    updated_plan = db.Column(db.Text, nullable=False)
    status = db.Column(db.String(50), default='stable')
    # Creation time; datetime.utcnow yields a naive UTC value.
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)

    def __repr__(self):
        return "<Patient {} ({})>".format(self.name, self.id)

    def to_dict(self):
        """Return the row as a plain dict; the timestamp becomes a string or None."""
        plain_fields = (
            'id', 'name', 'age', 'gender', 'feedback',
            'original_plan', 'updated_plan', 'status',
        )
        payload = {field: getattr(self, field) for field in plain_fields}

        formatted_timestamp = None
        if self.timestamp:
            formatted_timestamp = self.timestamp.strftime("%Y-%m-%d %H:%M:%S")
        payload['timestamp'] = formatted_timestamp
        return payload
def create_tables():
    """Create the database tables once per process if "patient" is missing.

    The '_tables_created' entry in app.config acts as a run-once flag so the
    table inspection is not repeated on later calls.
    """
    if app.config.get('_tables_created', False):
        return

    with app.app_context():
        # Only the "patient" table is probed; its absence triggers create_all().
        patient_table_present = db.inspect(db.engine).has_table("patient")
        if not patient_table_present:
            logger.info("Creating database tables.")
            db.create_all()

    app.config['_tables_created'] = True
# Uploaded PDFs go to <UPLOAD_DIR>/pdfs; UPLOAD_DIR defaults to /tmp/uploads.
upload_base = os.getenv('UPLOAD_DIR', '/tmp/uploads')
upload_folder = os.path.join(upload_base, 'pdfs')
# Create the directory (and missing parents) now; an existing one is fine.
os.makedirs(upload_folder, exist_ok=True)
app.config['UPLOAD_FOLDER'] = upload_folder
# Twilio configuration, read entirely from the environment.
ACCOUNT_SID = os.getenv('TWILIO_ACCOUNT_SID')
AUTH_TOKEN = os.getenv('TWILIO_AUTH_TOKEN')
TWILIO_FROM = os.getenv('TWILIO_FROM_NUMBER')
# A single recipient for every message; consider making this configurable per patient.
TWILIO_TO = os.getenv('TWILIO_TO_NUMBER')

# Stays None (WhatsApp sending disabled) unless all four settings are present
# and the client can be constructed.
twilio_client = None
if not all((ACCOUNT_SID, AUTH_TOKEN, TWILIO_FROM, TWILIO_TO)):
    logger.warning("Twilio environment variables (SID, Token, From, To) not fully set. WhatsApp sending disabled.")
else:
    try:
        twilio_client = Client(ACCOUNT_SID, AUTH_TOKEN)
    except Exception as e:
        logger.error(f"Error initializing Twilio client (check SID/Token/Network): {e}")
        twilio_client = None
    else:
        logger.info("Twilio client initialized successfully.")
# Gemini API configuration. `model` stays None (AI generation disabled) when
# the key is missing or when configuring/loading the model fails.
GENAI_API_KEY = os.getenv('GENAI_API_KEY')
model = None
if not GENAI_API_KEY:
    logger.warning("GENAI_API_KEY environment variable not set. AI generation disabled.")
else:
    try:
        genai.configure(api_key=GENAI_API_KEY)
        logger.info("Gemini API configured successfully.")
        # Sampling settings passed to the model; no safety_settings are supplied.
        generation_config = dict(
            temperature=0.8,
            top_p=0.9,
            top_k=30,
            max_output_tokens=4096,
        )
        model = genai.GenerativeModel(
            model_name="gemini-1.5-flash-latest",
            generation_config=generation_config,
        )
        logger.info(f"Using Gemini model: {model.model_name}")
    except Exception as e:
        logger.error(f"Error configuring Gemini API or loading model: {e}")
        model = None
def extract_text_from_pdf(pdf_file):
    """Extract the text of an uploaded PDF as a single string.

    Args:
        pdf_file: Seekable binary file-like object containing the PDF.

    Returns:
        The page texts joined by newlines, or a bracketed placeholder string
        ("[PDF Content Unavailable: ...]", "[No readable text found in PDF]",
        "[Error extracting PDF text: ...]") when nothing usable could be read.
        Errors are logged and reported through the placeholder; the function
        does not raise.
    """
    try:
        # The upload may already have been read; rewind before parsing.
        pdf_file.seek(0)
        pdf_reader = PyPDF2.PdfReader(pdf_file)

        if pdf_reader.is_encrypted:
            # Only the empty password is tried. decrypt() signals a wrong
            # password through a falsy return value rather than an exception,
            # so the result must be checked explicitly.
            try:
                decrypted = pdf_reader.decrypt('')
            except Exception as dec_e:
                logger.error(f"Error during PDF decryption attempt: {dec_e}")
                return "[PDF Content Unavailable: Error decrypting file]"
            if not decrypted:
                logger.warning("PDF is encrypted and requires a password.")
                return "[PDF Content Unavailable: File is encrypted and requires a password]"

        page_chunks = []
        for page_number in range(1, len(pdf_reader.pages) + 1):
            # Page lookup stays inside the try so one broken page only yields
            # a placeholder instead of aborting the whole extraction.
            try:
                page_text = pdf_reader.pages[page_number - 1].extract_text()
            except Exception as page_e:
                logger.error(f"Error extracting text from page {page_number}: {page_e}")
                page_chunks.append(f"[Error extracting page {page_number}]")
                continue
            if page_text:
                page_chunks.append(page_text)

        text = "".join(f"{chunk}\n" for chunk in page_chunks)
        return text.strip() or "[No readable text found in PDF]"
    except Exception as e:
        logger.error(f"Error extracting PDF text: {e}", exc_info=True)
        return f"[Error extracting PDF text: {e}]"
def extract_care_plan_format(pdf_text):
    """Build a section-header template from the text of a care-plan PDF.

    Three heuristics are tried in order; the first that finds anything wins:

    1. Upper-case headers followed by a colon ("MEDICATIONS:"). Each header
       is emitted as "HEADER:\\n- [Details]\\n".
    2. Capitalised lines ending in a colon or period ("Patient Information:").
    3. Short capitalised lines (more than 5 characters, fewer than 6 words).

    Heuristics 2 and 3 emit one "Header:" per line. Headers are de-duplicated
    and sorted alphabetically.

    Args:
        pdf_text: Text extracted from the PDF, or one of the bracketed
            placeholder strings produced when extraction failed.

    Returns:
        The template string, or None when the text is empty, is a failure
        placeholder, or contains no recognisable header.
    """
    failure_markers = (
        "[No readable text found",
        "[Error extracting PDF text",
        "[PDF Content Unavailable",
    )
    if not pdf_text or any(marker in pdf_text for marker in failure_markers):
        logger.info("No valid PDF text available to extract format.")
        return None

    def unique_sorted(candidates):
        """De-duplicate, drop blanks and sort for a stable result."""
        return sorted({c.strip() for c in candidates if c.strip()})

    # NOTE: every character class below uses "[ \t]" rather than "\s". "\s"
    # also matches newlines, which let a single "header" swallow several
    # consecutive lines (e.g. "INTRO\nMEDICATIONS").
    potential_headers = re.findall(
        r'^\b([A-Z][A-Z \t]*)\b[ \t]*:(?:\s|$)', pdf_text, re.MULTILINE
    )
    if potential_headers:
        unique_headers = unique_sorted(potential_headers)
        if not unique_headers:
            logger.info("Extracted headers are empty after cleaning.")
            return None
        logger.info(f"Extracted sections: {unique_headers}")
        return "".join(f"{header}:\n- [Details]\n" for header in unique_headers)

    # Fallback 1 (strict): a capitalised line that ends with ':' or '.'.
    fallback_headers_strict = re.findall(
        r'^[A-Z][A-Za-z \t]*[:.][ \t]*$', pdf_text, re.MULTILINE
    )
    if fallback_headers_strict:
        # Drop the trailing colon/period and whitespace before re-adding ':'.
        unique_headers = unique_sorted(
            re.sub(r'[:.\s]*$', '', header) for header in fallback_headers_strict
        )
        logger.info(f"Extracted potential headers (fallback - strict): {unique_headers}")
        format_template = "\n".join(f"{header}:" for header in unique_headers)
        return format_template if format_template.strip() else None

    # Fallback 2 (loose): a short capitalised phrase alone on its line.
    fallback_headers_loose = re.findall(
        r'^[A-Z][A-Za-z \t]{3,}[ \t]*$', pdf_text, re.MULTILINE
    )
    # Length and word-count limits weed out lines that are really sentences.
    unique_headers = unique_sorted(
        header for header in fallback_headers_loose
        if len(header.strip()) > 5 and len(header.split()) < 6
    )
    if unique_headers:
        logger.info(f"Extracted potential headers (fallback - loose): {unique_headers}")
        format_template = "\n".join(f"{header}:" for header in unique_headers)
        return format_template if format_template.strip() else None

    logger.info("No sections or potential headers found in PDF.")
    return None
def determine_patient_status(original_plan, updated_plan, feedback):
    """Classify a patient as emergency, deteriorating, improving or stable.

    Classification is keyword based and evaluated in strict priority order:

    1. emergency     - keyword in the feedback/original plan, or in the
                       final generated plan
    2. deteriorating - keyword in the feedback/original plan
    3. improving     - keyword in the feedback/original plan
    4. stable        - nothing matched

    Emergency is tested first so that feedback such as "severe chest pain,
    getting worse" is never downgraded by a lower-priority keyword.

    Args:
        original_plan: Text of the previous care plan (may be None/empty).
        updated_plan: Text of the newly generated care plan (may be None/empty).
        feedback: Free-text patient feedback (may be None/empty).

    Returns:
        One of "emergency", "deteriorating", "improving", "stable".
    """
    logger.info("Determining patient status...")

    def normalise(text):
        """Lower-case and collapse whitespace so multi-word keywords match across line breaks."""
        return re.sub(r'\s+', ' ', (text or "").lower()).strip()

    # Emergency keywords: immediate threat to life or limb, urgent intervention needed.
    emergency_keywords = [
        "severe chest pain", "heart attack", "sudden shortness of breath",
        "difficulty breathing severely", "loss of consciousness", "unresponsive",
        "extreme pain level 10", "sudden weakness one side", "signs of stroke",
        "slurred speech sudden", "severe headache sudden", "worst headache of my life",
        "severe abdominal pain acute", "persistent vomiting blood", "uncontrolled bleeding",
        "severe allergic reaction", "anaphylaxis", "immediate medical attention",
        "emergency", "call 911", "go to er", "hospital immediately", "critical condition", "ambulance",
        "collapsed", "unconscious", "seizure activity", "convulsion",
        "suffocating", "not breathing", "blue lips", "blue face", "cardiac arrest",
        "signs of shock", "severe dehydration symptoms", "acute change in mental status",
        "unstable vitals", "rapidly worsening symptoms", "can't breathe", "chest tight severe",
        "severe difficulty swallowing suddenly", "new onset paralysis", "severe burns",
        "major trauma", "suspected poisoning", "overdose", "suicidal thoughts active",
        "unstable", "critically ill", "life-threatening", "no pulse", "low oxygen saturation severe",
    ]
    # Deteriorating keywords: worsening condition or new concerning symptoms.
    deteriorating_keywords = [
        "worsening", "increased pain", "not improving", "deteriorating",
        "getting worse", "more frequent symptoms", "elevated", "higher",
        "concerning change", "decline", "decreased function", "less able", "more difficult",
        "aggravated", "intensified", "escalating", "weakening", "relapse of symptoms",
        "recurrence", "regressing", "not responding to treatment", "increased severity",
        "progressing", "progressive symptoms", "complicated", "adverse change",
        "needs urgent attention", "significant increase in", "new symptoms appeared",
        "trouble with walking", "reduced appetite significantly", "difficulty sleeping constantly",
        "tired all the time", "much weaker", "feeling noticeably worse", "consistent high blood pressure",
        "uncontrolled blood sugar levels", "increased swelling", "persistent cough getting worse",
        "unexplained weight loss significant", "gradual decline", "unmanaged symptoms",
        "not resolving", "changes in condition", "difficulty performing daily tasks",
    ]
    # Improvement keywords: symptoms resolving or goals being met.
    improvement_keywords = [
        "improving", "better today", "reduced pain", "lower", "less frequent",
        "healing well", "recovery on track", "making progress", "stable condition",
        "maintained progress", "consistent improvement", "well-controlled", "responding well",
        "good progress", "getting better", "positive change", "improved significantly",
        "enhancement", "advancement", "resolving", "resolved completely", "recovering",
        "normalized", "normal range", "responding positively", "effective treatment",
        "successful treatment", "managed well", "under control", "symptoms decreased",
        "feeling stronger", "better sleep quality", "increased appetite", "pain decreased",
        "more energy", "walking further", "blood pressure normal range",
        "blood sugar stable", "swelling reduced", "easier breathing", "cough improving",
        "weight gain healthy", "feeling like myself again", "in remission", "managing well at home",
        "tolerating well", "no issues reported", "feeling good", "symptoms gone",
    ]

    def contains_any(text, keywords):
        """True when any keyword occurs in text as a whole word/phrase."""
        pattern = r'\b(?:' + '|'.join(re.escape(kw) for kw in keywords) + r')\b'
        return re.search(pattern, text) is not None

    # Feedback and the previous plan are assessed together.
    initial_text = normalise(f"{feedback or ''} {original_plan or ''}")
    final_plan_text = normalise(updated_plan)

    # 1. EMERGENCY (highest priority).
    if contains_any(initial_text, emergency_keywords):
        logger.info("Status determined: EMERGENCY (keyword found in feedback/original).")
        return "emergency"
    # The generated plan is also consulted: it may flag an emergency that the
    # patient's own wording did not express with one of the exact keywords.
    if contains_any(final_plan_text, emergency_keywords):
        logger.info("Status determined: EMERGENCY (keyword found in final plan).")
        return "emergency"

    # 2. DETERIORATING.
    if contains_any(initial_text, deteriorating_keywords):
        logger.info("Status determined: DETERIORATING (keyword found in feedback/original).")
        return "deteriorating"

    # 3. IMPROVING.
    if contains_any(initial_text, improvement_keywords):
        logger.info("Status determined: IMPROVING (keyword found in feedback/original).")
        return "improving"

    # 4. Default.
    logger.info("Status determined: STABLE (no specific status keywords found).")
    return "stable"
def generate_care_plan_pdf(patient_info, care_plan_text, status):
    """Render a patient's care plan as a styled PDF.

    Args:
        patient_info: Mapping that may hold 'name', 'age', 'gender' and
            'feedback'. Missing, None or blank name/age/gender are shown as
            "N/A"; the feedback section is omitted when feedback is blank.
        care_plan_text: Plain-text care plan. Lines that look like an
            upper-case header ending in ':' or '.' become headings, lines
            starting with '-', '*' or '•' become bullets, and everything
            else is body text. None is treated as empty.
        status: 'emergency', 'deteriorating', 'improving' or 'stable'; any
            other value is rendered with the "unknown" banner.

    Returns:
        An io.BytesIO positioned at offset 0 that contains the PDF. If
        building the document fails, a one-page PDF describing the error is
        returned instead.
    """
    # Imported locally so this function does not rely on the module's import block.
    from xml.sax.saxutils import escape as xml_escape

    def to_markup(value, default='N/A'):
        """Turn free text into Paragraph-safe markup.

        Paragraph parses its text as XML-like markup, so '&', '<' and '>'
        coming from users or the model must be escaped or parsing fails.
        Newlines become <br/>; None/blank values become `default`.
        """
        text = '' if value is None else str(value).strip()
        if not text:
            return default
        return xml_escape(text).replace('\n', '<br/>')

    buffer = io.BytesIO()
    doc = SimpleDocTemplate(buffer, pagesize=letter,
                            leftMargin=72, rightMargin=72,
                            topMargin=72, bottomMargin=72)
    styles = getSampleStyleSheet()

    # Custom styles
    title_style = ParagraphStyle(
        'Title',
        parent=styles['Heading1'],
        fontSize=22, alignment=TA_CENTER, spaceAfter=25, textColor=colors.HexColor("#4e73df"),
        fontName='Helvetica-Bold',
    )
    heading_style = ParagraphStyle(
        'Heading',
        parent=styles['Heading2'],
        fontSize=15, spaceAfter=8, spaceBefore=18, textColor=colors.HexColor("#1cc88a"),
        fontName='Helvetica-Bold',
    )
    normal_style = ParagraphStyle(
        'Normal',
        parent=styles['Normal'],
        fontSize=11, spaceAfter=6, leading=14, textColor=colors.HexColor("#5a5c69"),
        alignment=TA_JUSTIFY,
    )
    bullet_style = ParagraphStyle(
        'Bullet',
        parent=styles['Normal'],
        fontSize=11, spaceAfter=3, leftIndent=20, leading=14, bulletIndent=10,
        textColor=colors.HexColor("#5a5c69"),
        alignment=TA_JUSTIFY,
    )
    feedback_style = ParagraphStyle(
        'Feedback',
        parent=normal_style,
        spaceBefore=10, spaceAfter=10, backColor=colors.HexColor("#f8f9fc"),
        borderWidth=0.5, borderColor=colors.HexColor("#dee2e6"), borderPadding=6,
        borderRadius=5,
        textColor=colors.HexColor("#212529"),  # darker text on the light background
    )

    status_colors = {
        'emergency': colors.HexColor("#e74a3b"),
        'deteriorating': colors.HexColor("#f6c23e"),
        'improving': colors.HexColor("#1cc88a"),
        'stable': colors.HexColor("#36b9cc"),
        'unknown': colors.black,
    }
    # One banner style per status; only the emergency banner is slightly larger.
    status_text_styles = {
        key: ParagraphStyle(
            f"Status{key.capitalize()}",
            parent=styles['Heading2'],
            fontSize=16 if key == 'emergency' else 15,
            spaceBefore=10, spaceAfter=15,
            textColor=color, alignment=TA_CENTER, fontName='Helvetica-Bold',
        )
        for key, color in status_colors.items()
    }
    # NOTE(review): the banners are drawn with Helvetica-Bold; confirm the
    # emoji below actually render with that font in the produced PDF.
    status_map_text = {
        'emergency': "🚨 EMERGENCY - IMMEDIATE ACTION REQUIRED 🚨",
        'deteriorating': "⚠️ HIGH RISK - Condition Deteriorating ⚠️",
        'improving': "✅ LOW RISK - Condition Improving ✅",
        'stable': "🟦 STABLE - Maintain Current Care 🟦",
        'unknown': "Status: Unknown",
    }

    story = [Paragraph("Patient Care Plan", title_style)]

    # Unrecognised statuses fall back to the 'unknown' entry for both text and style.
    banner_text = status_map_text.get(status, status_map_text['unknown'])
    banner_style = status_text_styles.get(status, status_text_styles['unknown'])
    story.append(Paragraph(banner_text, banner_style))

    patient_data = [
        [Paragraph("<b>Patient Name:</b>", normal_style), Paragraph(to_markup(patient_info.get('name')), normal_style)],
        [Paragraph("<b>Age:</b>", normal_style), Paragraph(to_markup(patient_info.get('age')), normal_style)],
        [Paragraph("<b>Gender:</b>", normal_style), Paragraph(to_markup(patient_info.get('gender')), normal_style)],
        [Paragraph("<b>Generated Date:</b>", normal_style), Paragraph(datetime.now().strftime("%Y-%m-%d %H:%M"), normal_style)],
    ]
    # Label column takes one third of the printable width, value column two thirds.
    available_width = letter[0] - (doc.leftMargin + doc.rightMargin)
    patient_table = Table(patient_data, colWidths=[available_width / 3, available_width / 3 * 2])
    patient_table.setStyle(TableStyle([
        ('BACKGROUND', (0, 0), (0, -1), colors.HexColor("#f2f2f2")),
        ('TEXTCOLOR', (0, 0), (-1, -1), colors.black),
        ('ALIGN', (0, 0), (0, -1), 'LEFT'),
        ('ALIGN', (1, 0), (-1, -1), 'LEFT'),
        ('VALIGN', (0, 0), (-1, -1), 'TOP'),
        ('INNERGRID', (0, 0), (-1, -1), 0.25, colors.HexColor("#dddddd")),
        ('BOX', (0, 0), (-1, -1), 0.25, colors.HexColor("#dddddd")),
        ('LEFTPADDING', (0, 0), (-1, -1), 6),
        ('RIGHTPADDING', (0, 0), (-1, -1), 6),
        ('TOPPADDING', (0, 0), (-1, -1), 6),
        ('BOTTOMPADDING', (0, 0), (-1, -1), 6),
        ('FONTNAME', (0, 0), (-1, -1), 'Helvetica'),
        ('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'),
    ]))
    story.append(patient_table)
    story.append(Spacer(1, 25))

    # Feedback section (only when there is non-blank feedback).
    feedback = patient_info.get('feedback')
    if feedback is not None and str(feedback).strip():
        story.append(Paragraph("Patient Feedback:", heading_style))
        story.append(Spacer(1, 5))
        story.append(Paragraph(to_markup(feedback), feedback_style))
        story.append(Spacer(1, 15))

    story.append(Paragraph("Care Plan Details:", heading_style))
    story.append(Spacer(1, 10))

    # Upper-case words followed by ':' or '.' mark a section header.
    header_pattern = re.compile(r'^([A-Z][A-Z\s]*)\b[ \t]*[:.](\s|$)')
    for raw_line in (care_plan_text or '').strip().split('\n'):
        line = raw_line.strip()
        if not line:
            continue

        if header_pattern.match(line):
            # Drop the trailing colon/period, then re-add a single colon.
            header_text = re.sub(r'[:.\s]*$', '', line).strip()
            story.append(Spacer(1, 8))
            story.append(Paragraph(xml_escape(header_text) + ":", heading_style))
        elif line.startswith(('-', '*', '•')):
            # Strip the marker from the already-trimmed line so indented
            # bullets do not keep their original '-'/'*'.
            bullet_text = re.sub(r'^[-*•][ \t]*', '', line).strip()
            story.append(Paragraph("• " + xml_escape(bullet_text), bullet_style))
        else:
            story.append(Paragraph(xml_escape(line), normal_style))

    story.append(Spacer(1, 20))
    footer_style = ParagraphStyle('Footer', parent=styles['Normal'], fontSize=9,
                                  alignment=TA_CENTER, textColor=colors.grey)
    story.append(Paragraph(
        f"Generated by Patient Care Management System on {datetime.now().strftime('%Y-%m-%d %H:%M')}",
        footer_style,
    ))

    try:
        doc.build(story)
        buffer.seek(0)
        logger.info("PDF generated successfully.")
        return buffer
    except Exception as e:
        logger.error(f"Error building PDF: {e}", exc_info=True)
        # Still hand back a valid PDF so callers can stream something.
        error_buffer = io.BytesIO()
        c = canvas.Canvas(error_buffer, pagesize=letter)
        c.drawString(100, 750, "Error Generating Care Plan PDF")
        c.drawString(100, 735, f"Details: {str(e)}")
        c.save()
        error_buffer.seek(0)
        return error_buffer
def _split_whatsapp_body(text, limit):
    """Split text into pieces of at most `limit` characters, preferring line breaks."""
    pieces = []
    remaining = text
    while len(remaining) > limit:
        # Last newline that still keeps the piece within the limit.
        cut = remaining.rfind('\n', 0, limit + 1)
        if cut <= 0:
            cut = limit  # no usable line break: hard split
        piece = remaining[:cut].rstrip('\n')
        if piece:
            pieces.append(piece)
        remaining = remaining[cut:].lstrip('\n')
    if remaining:
        pieces.append(remaining)
    return pieces


def send_whatsapp_care_plan(patient_info, care_plan_text, status):
    """Send the care plan to the configured WhatsApp recipient via Twilio.

    The plan is lightly reformatted for WhatsApp (bullets, bold section
    headers). Twilio rejects message bodies longer than 1600 characters, so
    long plans are sent as several numbered messages.

    Args:
        patient_info: Mapping that may hold 'name', 'age', 'gender', 'feedback'.
        care_plan_text: Plain-text care plan; must not be blank.
        status: 'emergency', 'deteriorating', 'improving' or 'stable'; any
            other value is shown as unknown.

    Returns:
        (True, message) when every part was accepted by Twilio, otherwise
        (False, reason). The function does not raise.
    """
    if not twilio_client:
        logger.warning("Twilio client not configured. Cannot send WhatsApp message.")
        return False, "Twilio client not configured."
    if not TWILIO_TO or not TWILIO_FROM:
        logger.warning("Twilio TO or FROM number not set.")
        return False, "Twilio TO or FROM number not configured."
    if not care_plan_text or not care_plan_text.strip():
        logger.warning("Care plan text is empty. Cannot send WhatsApp message.")
        return False, "Care plan text is empty."

    status_emoji = {
        'emergency': "🚨 EMERGENCY",
        'deteriorating': "⚠️ HIGH RISK",
        'improving': "✅ IMPROVING",
        'stable': "🟦 STABLE",
        'unknown': "⚪ Status: Unknown",
    }
    # Twilio's documented maximum body length, minus room for a "(n/m)" label.
    body_limit = 1600 - 16

    try:
        formatted_plan = care_plan_text.strip()
        # Collapse runs of blank lines to a single blank line.
        formatted_plan = re.sub(r'\n{2,}', '\n\n', formatted_plan)
        # Convert list markers to '•' only at the start of a line; a mid-line
        # "- " or "* " (e.g. "120 - 80") is ordinary text and stays untouched.
        formatted_plan = re.sub(r'^([ \t]*)[-*][ \t]+', r'\1• ', formatted_plan, flags=re.MULTILINE)

        # Bold lines that look like upper-case section headers.
        header_pattern = re.compile(r'^[A-Z][A-Z\s]*\b[ \t]*[:.](\s|$)')
        formatted_lines = []
        for line in formatted_plan.split('\n'):
            stripped_line = line.strip()
            if header_pattern.match(stripped_line):
                header_text = re.sub(r'[:.\s]*$', '', stripped_line).strip()
                formatted_lines.append(f"*{header_text}:*")
            else:
                formatted_lines.append(line)
        formatted_plan = '\n'.join(formatted_lines)

        def shown(key):
            """Patient field for display; missing/None/blank becomes N/A."""
            value = patient_info.get(key)
            return 'N/A' if value is None or not str(value).strip() else value

        message = "*Care Plan Update*\n\n"
        message += f"*Patient Name:* {shown('name')}\n"
        message += f"*Age:* {shown('age')}\n"
        message += f"*Gender:* {shown('gender')}\n"
        message += f"*Status:* {status_emoji.get(status, status_emoji['unknown'])}\n\n"

        feedback_text = patient_info.get('feedback')
        if feedback_text and feedback_text.strip():
            message += f"*Latest Feedback:*\n{feedback_text.strip()}\n\n"
        message += f"*Care Plan Details:*\n{formatted_plan}"

        parts = _split_whatsapp_body(message, body_limit)
        total = len(parts)
        logger.info(f"Attempting to send WhatsApp message to {TWILIO_TO}...")
        for number, part in enumerate(parts, start=1):
            # Label the parts so the reader can restore the order if they
            # arrive out of sequence.
            body = part if total == 1 else f"({number}/{total})\n{part}"
            message_sent = twilio_client.messages.create(
                from_=TWILIO_FROM,
                body=body,
                to=TWILIO_TO,
            )
            logger.info(f"WhatsApp message sent, SID: {message_sent.sid}")
        return True, "WhatsApp message sent successfully."
    except Exception as e:
        logger.error(f"Error sending WhatsApp message: {e}", exc_info=True)
        twilio_error_message = str(e)
        # Twilio's REST exception carries the API message and error code as
        # `msg` and `code`; prefer them when both are present.
        if hasattr(e, 'msg') and hasattr(e, 'code') and e.msg:
            twilio_error_message = f"Twilio API error: {e.msg} (Code: {e.code})"
        return False, f"Error sending WhatsApp: {twilio_error_message}"
def index():
    """Landing page: doctors are redirected to their dashboard, everyone else sees index.html."""
    # A session without a role is treated as a patient session.
    if session.get('role', 'patient') == 'doctor':
        return redirect(url_for('doctor_dashboard'))
    return render_template('index.html')
def switch_role():
    """Store the role posted in the form and redirect to that role's page.

    Returns a redirect for 'patient'/'doctor', or a JSON error with HTTP 400
    for any other (or missing) role value.
    """
    requested_role = request.form.get('role')
    if requested_role not in ('patient', 'doctor'):
        logger.warning(f"Invalid role switch attempted: {requested_role}")
        return jsonify({'success': False, 'error': 'Invalid role'}), 400

    session['role'] = requested_role
    logger.info(f"Role switched to: {requested_role}")
    # Doctors land on the dashboard, patients on the home page.
    destination = 'doctor_dashboard' if requested_role == 'doctor' else 'index'
    return redirect(url_for(destination))
def doctor_dashboard():
    """Render the doctor dashboard, marking the session as a doctor session.

    NOTE(review): no credential check is visible here; any visitor reaching
    this view is given the 'doctor' role. Confirm that access is restricted
    elsewhere.
    """
    already_doctor = session.get('role') == 'doctor'
    if not already_doctor:
        session['role'] = 'doctor'
        logger.info("Accessed doctor dashboard, setting role to doctor.")
    return render_template('doctor_dashboard.html')
def submit_feedback():
    """Handle a patient feedback submission and produce an updated care plan.

    Reads ``name``, ``age``, ``gender`` and ``feedback`` from the posted form,
    plus an optional ``care_plan_pdf`` upload. The flow is:

    1. Validate the form (name and feedback required, age an integer in 1-150).
    2. Extract text from the uploaded PDF, if any, and derive a format template.
    3. Determine a status from the original plan and the feedback.
    4. Build the plan text: a fixed emergency plan when the status is
       ``'emergency'``; otherwise an AI-generated plan when ``model`` is
       available; otherwise a fallback that shows the original plan (if usable).
    5. Persist a ``Patient`` row, render a PDF, attempt a WhatsApp send.

    Returns:
        A JSON response. On success it carries the saved plan, the
        base64-encoded PDF, the patient id, the final status, the WhatsApp
        outcome and an ``ai_error`` flag. Validation failures return HTTP 400;
        any unexpected exception rolls back the DB session and returns HTTP 500.
    """
    # Substrings that the rest of this function treats as "PDF text is not usable".
    pdf_error_markers = (
        "[Error extracting PDF text",
        "[No readable text found",
        "[PDF Content Unavailable",
    )

    def has_pdf_error_marker(text):
        """Return True when *text* contains one of the PDF failure markers."""
        return any(marker in text for marker in pdf_error_markers)

    ai_enabled = bool(model)  # Check if model is initialized
    ai_error_message = None  # Set when AI generation is skipped or fails
    try:
        # Strip the name so a whitespace-only value is rejected like an empty one.
        name = request.form.get('name', 'Unnamed Patient').strip()
        age = request.form.get('age')
        gender = request.form.get('gender', 'N/A')
        feedback = request.form.get('feedback', '').strip()
        if not name or not feedback:
            logger.warning("Submission failed: Patient Name or Feedback missing.")
            return jsonify({'success': False, 'error': 'Patient Name and Feedback are required.'}), 400

        # Basic sanitization/validation for age
        if age:
            try:
                age = int(age)
                if age <= 0:
                    raise ValueError("Age must be positive")
                if age > 150:
                    raise ValueError("Age seems unreasonably high")
            except ValueError:
                logger.warning(f"Submission failed: Invalid Age provided: {age}")
                return jsonify({'success': False, 'error': 'Invalid Age provided.'}), 400
        else:
            age = None  # Store as None if not provided
        age_display = age if age is not None else 'N/A'

        care_plan_text = ""  # Text extracted from the uploaded PDF (may be a failure marker)
        care_plan_format = None  # Format template detected in the uploaded plan
        if 'care_plan_pdf' in request.files:
            pdf_file = request.files['care_plan_pdf']
            if pdf_file and pdf_file.filename != '':
                # Only the file extension is checked here, not the file content.
                if not pdf_file.filename.lower().endswith('.pdf'):
                    logger.warning("Submission failed: Invalid file type uploaded.")
                    return jsonify({'success': False, 'error': 'Invalid file type. Only PDF files are allowed.'}), 400
                logger.info(f"Processing uploaded PDF: {pdf_file.filename}")
                care_plan_text = extract_text_from_pdf(pdf_file)
                # If extraction resulted in an error message, leave the format unset
                if has_pdf_error_marker(care_plan_text):
                    care_plan_format = None
                    logger.warning(f"PDF text extraction failed or empty: {care_plan_text}")
                else:
                    care_plan_format = extract_care_plan_format(care_plan_text)
                    logger.info(f"Extracted text length: {len(care_plan_text)}. Format found: {care_plan_format is not None}")
            else:
                logger.info("No PDF file uploaded or file is empty.")

        # The original plan is usable only when it is non-empty and carries no
        # failure marker. Computed once; care_plan_text does not change below.
        original_plan_usable = bool(care_plan_text) and not has_pdf_error_marker(care_plan_text)

        # Determine the initial status based on feedback and original plan.
        # "" is passed for the updated plan because it has not been generated yet.
        initial_status = determine_patient_status(care_plan_text, "", feedback)
        logger.info(f"Initial status determined based on feedback/original plan: {initial_status}")
        generated_plan_text = ""  # AI-generated, emergency, or fallback plan text
        final_status_to_save = initial_status

        if final_status_to_save == 'emergency':
            # Emergency feedback bypasses the AI entirely and uses a fixed plan.
            logger.info("Emergency status detected. Generating fixed emergency plan.")
            generated_plan_text = (
                "🚨 *EMERGENCY - IMMEDIATE ACTION REQUIRED* 🚨\n\n"
                "PATIENT INFORMATION:\n"
                f"- Name: {name}\n"
                f"- Age: {age_display}\n"
                f"- Gender: {gender}\n\n"
                "ASSESSMENT:\n"
                f"- Emergency symptoms reported: {feedback}. Immediate medical attention required.\n\n"
                "EMERGENCY ACTION PLAN:\n"
                "- *Call emergency services immediately* at your local emergency number (e.g., 104/108/109/112).\n"
                "- Do not delay seeking medical help. If possible, have someone stay with the patient.\n"
                "- If conscious, help the patient into a comfortable position (e.g., upright for breathing difficulties, on back with legs elevated for shock).\n"
                "- Do not give food or drink until evaluated by medical professionals.\n"
                "- Prepare relevant medical history, medication list, and previous care plan if available for emergency responders.\n"
                "- Follow *all* instructions from emergency responders.\n\n"
                "RED FLAGS / WHEN TO SEEK HELP:\n"
                "- *Any* worsening of reported emergency symptoms requires urgent re-evaluation by medical professionals.\n"
                "- Do not attempt to manage severe symptoms at home once emergency signs are present.\n\n"
                "FOLLOW-UP:\n"
                "- Immediate hospitalization or urgent medical evaluation is necessary.\n"
                "- Inform your primary physician/care team as soon as medically stable.\n"
                "- A new care plan will be developed after the emergency situation is resolved and evaluated by medical professionals.\n"
            )
            # The status stays 'emergency'; the fixed text is what gets saved and sent.
        elif ai_enabled:
            # Use the format detected in the uploaded plan, or the default template.
            if not care_plan_format or not care_plan_format.strip():
                logger.info("Using default care plan format as extraction failed or returned empty.")
                care_plan_format = """
PATIENT INFORMATION:
- Name: [Patient Name]
- Age: [Age]
- Gender: [Gender]
ASSESSMENT:
- [Summary of patient's current condition based on feedback and previous plan]
DAILY CARE PLAN:
Morning:
- [Morning activities/medications/checks]
Afternoon:
- [Afternoon activities/medications/checks]
Night:
- [Night activities/medications/sleep instructions/checks]
MEDICATIONS:
- list of medicines (speciifc ones, based on user symptomms and dosage and frequency like (ex: fro fever paracetamol 500mg 3 times a day))
DIET AND HYDRATION
- [what to eat , what to not eat ,speciific one slike ex : ooat,green vegetables, Beetrroot for blood defieicneny, avoid brger,pizza,samosa junk food etc...just ex]
- [ how many litres of water to drink in day, and other hydrated things toi dirnk etc...]
Fitness , Exercise and Home Remedies :
- [ what exercise to do for fitness and increase immunity, any Indian Hoem remeides on disease like (haldi ka kadha for cold,clough just an example)]
SYMPTOM MANAGEMENT:
- [Short instructions for managing specific symptoms (e.g., pain, nausea, shortness of breath), non-pharmacological interventions]
"""  # Enhanced default format

            previous_plan_for_prompt = (
                care_plan_text
                if original_plan_usable
                else "No previous care plan provided or could not be read."
            )
            # NOTE(review): the prompt below tells the model to omit safety
            # disclaimers and to recommend specific medications and dosages, and
            # it interpolates unvalidated patient free text. Flagged for
            # clinical/safety review; the wording is left untouched here.
            prompt = f"""
You are a helpful and highly structured AI assistant generating updated patient care plans.
Your task is to create a NEW, comprehensive, and well-structured daily care plan based on the provided patient's personal information, their current symptoms/feedback, and their previous care plan (if available).
Patient Information:
Name: {name}
Age: {age_display}
Gender: {gender}
Patient Feedback/Symptoms Update:
{feedback}
Previous Care Plan Details (if available):
{previous_plan_for_prompt}
Instructions:
1. Generate the updated care plan strictly using the exact following format template.
2. Populate each section of the template based on the patient's information, their *latest feedback/symptoms*, and integrate relevant, SAFE, and appropriate elements from the previous plan if they are still applicable and helpful given the feedback.
3. Prioritize addressing the issues raised in the patient feedback and give only prp[er formated,allinged,jsutifed and spaced text only, no tbales should be there.
4. Be specific, actionable, and realistic in your recommendations (e.g., specify *what* activities, *when* to take medications and *what" medications and diet to follow).
5. Ensure the language is clear, empathetic, and easy for a patient or caregiver to understand. Avoid overly technical jargon where possible.
7. Do NOT include any introductory phrases (like "Here is the updated plan:", "Based on your feedback...") or concluding sentences outside the plan structure. Provide ONLY the structured content that fits within the templateand dont follow ai ethics like as an ai i ammnot able to give ,tehcnical consulatation etc
8. If the previous care plan was unavailable or unreadable, create the plan based solely on the patient information and feedback, still following the template.
9. Ensure the plan is medically sound and reflects standard care principles. also give specific medications rexomemndations based on patient symptoms (ex : paracetamol for fever etc)
10. If the feedback indicates significant improvement, the plan should reflect this (e.g., adjusting activity levels up, noting successful symptom management) while still including monitoring and red flags.
11. Review the feedback and previous plan carefully to determine the most likely current STATUS (e.g., Emergency, Deteriorating, Improving, Stable) and ensure the plan content aligns with that status, especially the RED FLAGS section. ensure plan does not exist whatsapp message character limit
12. kindly, do not produce emergency contact section drop it okay,also in any section dont give like not provided, not known,need to be added like stufff, cosnider youself super intellgient and real physician
13. also ensure that only three patient info paprmaeters name ,age,gender provided and nothing else okay. do not give vague parameters like contact no, caretkae no,emegency no etc..also do not give nay useless or vague content.also dont mention that as an ai i cant give ans, consult to professional,if you dont know give near about answers
Care Plan Format Template (hey only give in text format do not use tbales okay ,a lso ensure that plan is short and no section great than 4 sentences okay):
{care_plan_format}
"""
            logger.info("Sending prompt to AI model...")
            try:
                response = model.generate_content(prompt)
                generated_plan_text = response.text.strip()
                # Remove markdown code block formatting if present
                if generated_plan_text.startswith('```') and generated_plan_text.endswith('```'):
                    # Find the first newline to potentially strip a language name
                    first_newline_after_code = generated_plan_text.find('\n')
                    if first_newline_after_code != -1:
                        potential_lang = generated_plan_text[3:first_newline_after_code].strip()
                        if re.match(r'^[a-zA-Z0-9]+$', potential_lang):
                            # Opening line is "```lang": drop the whole first line.
                            generated_plan_text = generated_plan_text[first_newline_after_code:].strip()
                        else:
                            # No language name or unexpected format, just strip ```
                            generated_plan_text = generated_plan_text[3:].strip()
                    else:
                        # Single-line response: only the leading ``` can be removed here
                        generated_plan_text = generated_plan_text[3:].strip()
                    # Strip ending ```
                    if generated_plan_text.endswith('```'):
                        generated_plan_text = generated_plan_text[:-3].strip()
                logger.info(f"AI Response received. Length: {len(generated_plan_text)}")
                # Re-determine the status with the generated plan included, since
                # the plan text itself may contain stronger status indicators.
                final_status_to_save = determine_patient_status(care_plan_text, generated_plan_text, feedback)
                logger.info(f"Final status determined after AI generation: {final_status_to_save}")
            except Exception as ai_error:
                logger.error(f"Error generating content from AI: {ai_error}", exc_info=True)
                # If AI fails, construct an error message plan
                generated_plan_text = f"[Error generating updated plan from AI: {ai_error}]\n\n"
                if original_plan_usable:
                    generated_plan_text += "Falling back to original plan if available:\n\n" + care_plan_text
                    # The original plan stands in for the updated plan in the status check.
                    final_status_to_save = determine_patient_status(care_plan_text, care_plan_text, feedback)
                    ai_error_message = f"AI generation failed. Showing original plan if available. Error: {ai_error}"
                else:
                    generated_plan_text += "No previous plan available."
                    # No usable original plan: the status depends on the feedback only.
                    final_status_to_save = determine_patient_status(None, None, feedback)
                    ai_error_message = f"AI generation failed. No previous plan available. Error: {ai_error}"
                logger.info(f"AI failed, final status after fallback: {final_status_to_save}")
        else:
            # AI is not enabled and the status is not emergency
            logger.warning("AI generation is disabled.")
            generated_plan_text = "[AI generation is currently disabled.]\n\n"
            if original_plan_usable:
                generated_plan_text += "Showing original plan if available:\n\n" + care_plan_text
                final_status_to_save = determine_patient_status(care_plan_text, care_plan_text, feedback)
                ai_error_message = 'AI generation is disabled. Using original plan or feedback.'
            else:
                generated_plan_text += "No previous plan available."
                final_status_to_save = determine_patient_status(None, None, feedback)
                ai_error_message = 'AI generation is disabled. No previous plan available.'
            logger.info(f"AI disabled, final status: {final_status_to_save}")

        # Create and store patient record in the database
        new_patient = Patient(
            name=name,
            age=age,
            gender=gender,
            feedback=feedback,
            original_plan=care_plan_text,  # Extracted text; may be a failure marker
            updated_plan=generated_plan_text,  # Generated/fallback plan text
            status=final_status_to_save,
            timestamp=datetime.utcnow()
        )
        db.session.add(new_patient)
        db.session.commit()
        patient_id = new_patient.id
        logger.info(f"Patient {patient_id} added to DB with status: {final_status_to_save}.")

        # NOTE(review): the record is already committed at this point. If PDF
        # generation below raises, the client receives a 500 although the
        # patient row exists, so a retry would create a duplicate.
        pdf_buffer = generate_care_plan_pdf(new_patient.to_dict(), new_patient.updated_plan, new_patient.status)
        pdf_buffer.seek(0)  # Ensure buffer is at the start before base64 encoding
        pdf_base64 = base64.b64encode(pdf_buffer.getvalue()).decode('utf-8')
        logger.info("PDF generated and base64 encoded.")

        # Send care plan via WhatsApp (using the final saved data)
        whatsapp_sent, whatsapp_message = send_whatsapp_care_plan(new_patient.to_dict(), new_patient.updated_plan, new_patient.status)
        logger.info(f"WhatsApp message attempt sent: {whatsapp_sent}, message: {whatsapp_message}")

        return jsonify({
            'success': True,
            'updated_plan': new_patient.updated_plan,
            'pdf_data': pdf_base64,
            'patient_id': patient_id,
            'status': new_patient.status,
            'whatsapp_sent': whatsapp_sent,
            'whatsapp_message': whatsapp_message,
            # True when AI was not enabled or AI generation failed
            'ai_error': not ai_enabled or (ai_error_message is not None)
        })
    except Exception as e:
        logger.error(f"An unexpected error occurred during submission: {str(e)}", exc_info=True)
        db.session.rollback()
        return jsonify({
            'success': False,
            'error': f'An unexpected server error occurred: {str(e)}'
        }), 500
# --- Doctor dashboard actions: save an edited plan, resend it over WhatsApp ---
def update_care_plan(patient_id):
    """Endpoint for doctor to save edited care plan text.

    Expects a JSON object with a string ``updated_plan`` field. The patient's
    status is re-derived from the original plan, the edited plan and the stored
    feedback, and the record's timestamp is refreshed.

    Args:
        patient_id: Primary key of the ``Patient`` row to update.

    Returns:
        JSON with ``success``, ``message``, ``patient_id``, the new ``status``
        and the new ``timestamp`` (``YYYY-MM-DD HH:MM:SS``) on success;
        HTTP 400 for a missing or malformed payload, 404 when the patient does
        not exist, 500 (after a rollback) on any other error.
    """
    try:
        # silent=True returns None for a non-JSON or unparsable body instead of
        # raising, so bad client input is answered with 400 below rather than
        # falling into the generic 500 handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not isinstance(data.get('updated_plan'), str):
            logger.warning(f"Update failed for ID {patient_id}: Invalid data.")
            return jsonify({'success': False, 'error': 'Invalid data provided.'}), 400
        updated_plan_text = data['updated_plan'].strip()

        patient = Patient.query.get(patient_id)
        if not patient:
            logger.warning(f"Update failed for ID {patient_id}: Patient not found.")
            return jsonify({'success': False, 'error': 'Patient not found.'}), 404

        # A doctor's edits may carry stronger language about severity or
        # improvement, so the status is recomputed from the edited text.
        patient.status = determine_patient_status(patient.original_plan, updated_plan_text, patient.feedback)
        patient.updated_plan = updated_plan_text
        patient.timestamp = datetime.utcnow()  # Update timestamp when plan is saved
        db.session.commit()
        logger.info(f"Updated plan saved for patient ID: {patient_id}, new status: {patient.status}")
        return jsonify({
            'success': True,
            'message': 'Care plan saved successfully.',
            'patient_id': patient_id,
            'status': patient.status,
            'timestamp': patient.timestamp.strftime("%Y-%m-%d %H:%M:%S")
        })
    except Exception as e:
        logger.error(f"Error updating care plan for ID {patient_id}: {str(e)}", exc_info=True)
        db.session.rollback()
        return jsonify({'success': False, 'error': f'An error occurred while saving the plan: {str(e)}'}), 500
def send_whatsapp_doctor(patient_id):
    """Endpoint for doctor to send WhatsApp message for a patient.

    Loads the patient by ``patient_id`` and passes the stored record, updated
    plan and status to ``send_whatsapp_care_plan``. Responds with JSON:
    success with the helper's message, 404 when the patient is unknown, or
    500 when the helper reports failure or an exception is raised.
    """
    try:
        patient = Patient.query.get(patient_id)
        if not patient:
            logger.warning(f"WhatsApp send failed for ID {patient_id}: Patient not found.")
            return jsonify({'success': False, 'error': 'Patient not found.'}), 404

        # Always send what is currently stored in the database.
        sent_ok, result_text = send_whatsapp_care_plan(
            patient.to_dict(),
            patient.updated_plan,
            patient.status,
        )

        if not sent_ok:
            # A failed send is reported as a server-side (500) failure.
            logger.error(f"WhatsApp failed for patient ID: {patient_id} - {result_text}")
            return jsonify({'success': False, 'error': result_text}), 500

        logger.info(f"WhatsApp sent successfully for patient ID: {patient_id}")
        return jsonify({'success': True, 'message': result_text})
    except Exception as e:
        logger.error(f"Error triggering WhatsApp send for ID {patient_id}: {str(e)}", exc_info=True)
        return jsonify({'success': False, 'error': f'An error occurred while sending WhatsApp: {str(e)}'}), 500
# --- Patient record routes: PDF download, emergency alerts, listing, lookup, deletion ---
def download_pdf(patient_id):
    """Generate and serve the care-plan PDF for one patient as a download.

    Args:
        patient_id: Primary key of the ``Patient`` row.

    Returns:
        The PDF as an attachment named
        ``care_plan_<sanitised name>_<YYYYmmdd_HHMMSS>.pdf``; a plain-text 404
        when the patient does not exist; a plain-text 500 if generation fails.
    """
    logger.info(f"Download requested for patient ID: {patient_id}")
    try:
        patient = Patient.query.get(patient_id)
        if not patient:
            logger.warning(f"Patient ID {patient_id} not found in database for download.")
            return "Patient data not found.", 404

        # Pass the full patient dict to the PDF generator along with plan and status
        pdf_buffer = generate_care_plan_pdf(
            patient.to_dict(),
            patient.updated_plan,
            patient.status
        )
        pdf_buffer.seek(0)

        # Keep only ASCII letters, digits, '_' and '-' in the file name. A name
        # made entirely of other characters would sanitise to an empty string,
        # so fall back to 'patient' in that case as well.
        safe_name = re.sub(r'[^a-zA-Z0-9_\-]', '', patient.name or 'patient').lower() or 'patient'
        download_name = f"care_plan_{safe_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
        logger.info(f"Serving PDF for {patient_id} as {download_name}")
        return send_file(
            pdf_buffer,
            as_attachment=True,
            download_name=download_name,
            mimetype='application/pdf'
        )
    except Exception as e:
        logger.error(f"PDF Download Error for ID {patient_id}: {str(e)}", exc_info=True)
        return f"Error generating PDF for download: {str(e)}", 500
def get_emergency_notifications():
    """Return the patients whose stored status is ``'emergency'``.

    Rows are ordered newest first by timestamp. Each notification carries only
    ``id``, ``name`` and ``status``, not the full patient record.
    """
    emergency_rows = (
        Patient.query
        .filter_by(status='emergency')
        .order_by(Patient.timestamp.desc())
        .all()
    )

    # Build the trimmed-down alert entries one by one.
    notifications = []
    for row in emergency_rows:
        notifications.append({'id': row.id, 'name': row.name, 'status': row.status})

    logger.info(f"Found {len(notifications)} emergency notifications.")
    payload = {
        'success': True,
        'notifications': notifications
    }
    return jsonify(payload)
def get_patients():
    """Return every stored patient as JSON, newest first by timestamp.

    Each entry is the patient's ``to_dict()`` representation.
    """
    ordered_rows = Patient.query.order_by(Patient.timestamp.desc()).all()

    patients_list = []
    for row in ordered_rows:
        patients_list.append(row.to_dict())

    logger.info(f"Retrieved {len(patients_list)} patients from DB.")
    payload = {
        'success': True,
        'patients': patients_list
    }
    return jsonify(payload)
def get_patient(patient_id):
    """Return one patient's ``to_dict()`` data as JSON, or a 404 JSON error.

    Args:
        patient_id: Primary key of the ``Patient`` row to look up.
    """
    logger.info(f"API request for patient ID: {patient_id}")
    record = Patient.query.get(patient_id)

    if record:
        logger.info(f"Found patient {patient_id}: {record.name}")
        payload = {
            'success': True,
            'patient': record.to_dict()
        }
        return jsonify(payload)

    logger.warning(f"Patient ID {patient_id} not found in database for API request.")
    return jsonify({'success': False, 'error': 'Patient not found.'}), 404
# Handler for removing a patient record
def delete_patient(patient_id):
    """Delete the patient with the given id and report the outcome as JSON.

    Returns a success message when the row was removed, HTTP 404 when no such
    patient exists, and HTTP 500 (after rolling the session back) when the
    deletion raises.
    """
    logger.info(f"Delete requested for patient ID: {patient_id}")
    try:
        target = Patient.query.get(patient_id)
        if target:
            db.session.delete(target)
            db.session.commit()
            logger.info(f"Patient ID {patient_id} deleted successfully.")
            return jsonify({'success': True, 'message': 'Patient deleted successfully.'})

        # Nothing to delete for this id.
        logger.warning(f"Patient ID {patient_id} not found for deletion.")
        return jsonify({'success': False, 'error': 'Patient not found.'}), 404
    except Exception as e:
        logger.error(f"Error deleting patient ID {patient_id}: {str(e)}", exc_info=True)
        # Undo any partial work so the session stays usable.
        db.session.rollback()
        return jsonify({'success': False, 'error': f'An error occurred during deletion: {str(e)}'}), 500
if __name__ == '__main__':
    # Create database tables if they don't exist, within the application context.
    # NOTE(review): this only runs when the file is executed directly; under a
    # WSGI server that imports the module, nothing here creates the tables.
    with app.app_context():
        inspector = db.inspect(db.engine)
        if not inspector.has_table("patient"):  # Check for at least one model's table
            logger.info("Database tables not found, creating.")
            db.create_all()
        else:
            logger.info("Database tables already exist.")

    # Debug mode turns on the interactive Werkzeug debugger, which lets anyone
    # who can reach the server execute arbitrary code. It is therefore opt-in
    # through the FLASK_DEBUG environment variable instead of always on.
    # Use a production WSGI server such as Waitress or Gunicorn when deploying.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled)