Spaces:
Running
Running
# app.py - Gradio version for Hugging Face Spaces | |
import gradio as gr | |
import requests | |
import json | |
import time | |
from typing import Dict, Any, Tuple, List | |
# Configuration - Can switch between local and AWS | |
LOCAL_API = "http://localhost:8080" | |
AWS_API = "https://rj92jktas7.us-east-2.awsapprunner.com" | |
# Global variables to maintain state | |
current_session = { | |
'session_id': None, | |
'validation_started': False, | |
'all_questions': [], | |
'current_question_id': None, | |
'validation_complete': False, | |
'api_base_url': AWS_API | |
} | |
# Sample records for testing | |
SAMPLE_RECORDS = { | |
"Medical Record 1 - Multiple Issues": { | |
"UUID": "sample-001", | |
"patient_info": { | |
"name": "John Doe", | |
"age": 28 | |
}, | |
"clinical_risk": { | |
"thyroid_disease": "how are you?", | |
"anemia": None, | |
"preeclampsia_current": "not sure", | |
"geriatric_mother": "maybe", | |
"malaria_chronic_history": ".", | |
"preeclampsia_prior": "[insert explanation here]" | |
}, | |
"rmc1": { | |
"rmc1_respect": True, | |
"rmc1_explanation": "[insert explanation here]", | |
"rmc1_facility": "Test Hospital" | |
}, | |
"immunization_birth": { | |
"birth_bcg": False, | |
"birth_oral_polio": True, | |
"birth_no_vax_reason": "not sure" | |
} | |
}, | |
"Medical Record 2 - Few Issues": { | |
"UUID": "sample-002", | |
"patient_info": { | |
"name": "Jane Smith", | |
"age": 35 | |
}, | |
"clinical_risk": { | |
"thyroid_disease": True, | |
"anemia": "maybe", | |
"preeclampsia_current": False, | |
"malaria_chronic_history": "[insert explanation here]" | |
}, | |
"rmc2": { | |
"rmc2_respect": False, | |
"rmc2_explanation": ".", | |
"rmc2_facility": "Another Hospital" | |
} | |
} | |
} | |
def check_api_health(): | |
"""Check if the API is running and get feature info""" | |
try: | |
response = requests.get(f"{current_session['api_base_url']}/health", timeout=10) | |
if response.status_code == 200: | |
health_data = response.json() | |
return True, health_data | |
else: | |
return False, f"Status code: {response.status_code}" | |
except Exception as e: | |
return False, f"Error: {str(e)}" | |
def start_validation_session(record: Dict[str, Any]): | |
"""Start a validation session with the API""" | |
try: | |
response = requests.post(f"{current_session['api_base_url']}/start-validation", json=record, timeout=30) | |
if response.status_code == 200: | |
return response.json() | |
else: | |
return None, f"Failed to start validation: {response.text}" | |
except Exception as e: | |
return None, f"Error connecting to API: {str(e)}" | |
def get_question_by_id(question_id: str): | |
"""Get a specific question by its unique ID""" | |
try: | |
response = requests.get(f"{current_session['api_base_url']}/get-question/{question_id}", timeout=15) | |
if response.status_code == 200: | |
return response.json() | |
else: | |
return None | |
except Exception as e: | |
return None | |
def list_all_questions(session_id: str): | |
"""Get list of all questions for a session""" | |
try: | |
response = requests.get(f"{current_session['api_base_url']}/list-questions/{session_id}", timeout=15) | |
if response.status_code == 200: | |
return response.json() | |
else: | |
return None | |
except Exception as e: | |
return None | |
def submit_response_to_question(question_id: str, user_response: str): | |
"""Submit a response to a specific question""" | |
try: | |
response = requests.post(f"{current_session['api_base_url']}/submit-response/{question_id}", | |
json={"response": user_response}, timeout=30) | |
if response.status_code == 200: | |
return response.json() | |
else: | |
return None | |
except Exception as e: | |
return None | |
def get_final_results(session_id: str): | |
"""Get the final validation results""" | |
try: | |
response = requests.get(f"{current_session['api_base_url']}/get-results/{session_id}", timeout=15) | |
if response.status_code == 200: | |
return response.json() | |
else: | |
return None | |
except Exception as e: | |
return None | |
# Gradio Interface Functions | |
def update_api_endpoint(api_choice, custom_url=""): | |
"""Update the API endpoint based on user selection""" | |
if api_choice == "AWS Deployment": | |
current_session['api_base_url'] = AWS_API | |
status = f"β Using AWS App Runner: {AWS_API}" | |
elif api_choice == "Local Development": | |
current_session['api_base_url'] = LOCAL_API | |
status = f"βΉοΈ Using Local API: {LOCAL_API}" | |
else: # Custom URL | |
current_session['api_base_url'] = custom_url if custom_url else AWS_API | |
status = f"π§ Using Custom URL: {current_session['api_base_url']}" | |
# Check API health | |
is_healthy, health_info = check_api_health() | |
if is_healthy: | |
if isinstance(health_info, dict): | |
status += f"\n\nπ API Status:\n" | |
status += f"β’ Version: {health_info.get('version', 'N/A')}\n" | |
status += f"β’ Active Sessions: {health_info.get('active_sessions', 0)}\n" | |
status += f"β’ Total Questions: {health_info.get('total_questions', 0)}\n" | |
features = health_info.get('features', []) | |
if features: | |
status += f"β’ Features: {', '.join(features)}" | |
else: | |
status += f"\n\nβ API Error: {health_info}" | |
return status | |
def load_sample_record(sample_choice): | |
"""Load a sample record based on user selection""" | |
if sample_choice and sample_choice in SAMPLE_RECORDS: | |
record_json = json.dumps(SAMPLE_RECORDS[sample_choice], indent=2) | |
return record_json, f"β Loaded: {sample_choice}" | |
return "", "Please select a sample record" | |
def parse_json_record(json_text): | |
"""Parse and validate JSON record""" | |
if not json_text.strip(): | |
return "β Please provide a JSON record", "" | |
try: | |
record = json.loads(json_text) | |
preview = json.dumps(record, indent=2) | |
return f"β Valid JSON record parsed successfully!", preview | |
except json.JSONDecodeError as e: | |
return f"β Invalid JSON: {str(e)}", "" | |
def start_validation(json_text): | |
"""Start the validation process""" | |
if not json_text.strip(): | |
return "β Please provide a JSON record first", "", gr.update(visible=False) | |
try: | |
record = json.loads(json_text) | |
except json.JSONDecodeError as e: | |
return f"β Invalid JSON: {str(e)}", "", gr.update(visible=False) | |
# Start validation session | |
result = start_validation_session(record) | |
if isinstance(result, tuple): # Error case | |
return f"β {result[1]}", "", gr.update(visible=False) | |
if result: | |
current_session['session_id'] = result["session_id"] | |
current_session['all_questions'] = result.get("questions", []) | |
current_session['validation_started'] = True | |
if result["total_questions"] == 0: | |
current_session['validation_complete'] = True | |
return "β No missing fields found! Record is already complete.", "", gr.update(visible=False) | |
else: | |
status = f"β Created {result['total_questions']} individual questions with unique tracking IDs\n" | |
# Show tracking info | |
tracking_info = result.get("tracking", {}) | |
if tracking_info.get("question_tracking_enabled"): | |
status += "π Each question now has a unique ID for precise tracking" | |
# Get first question | |
question_status = list_all_questions(current_session['session_id']) | |
if question_status: | |
questions_list = question_status["questions"] | |
if questions_list: | |
first_question = questions_list[0] | |
current_session['current_question_id'] = first_question['question_id'] | |
# Get question details | |
question_details = get_question_by_id(first_question['question_id']) | |
if question_details: | |
question_display = f"**Field:** {question_details['field_path']}\n" | |
question_display += f"**Question ID:** {question_details['question_id']}\n\n" | |
question_display += f"### {question_details['question']}" | |
progress_info = f"Progress: 0/{result['total_questions']} (0.0%)" | |
return status, f"{progress_info}\n\n{question_display}", gr.update(visible=True) | |
return status, "Loading questions...", gr.update(visible=True) | |
else: | |
return "β Failed to start validation session", "", gr.update(visible=False) | |
def submit_answer(answer_text): | |
"""Submit an answer to the current question""" | |
if not answer_text.strip(): | |
return "β Please provide an answer", "", gr.update(visible=True) | |
if not current_session.get('current_question_id'): | |
return "β No active question", "", gr.update(visible=True) | |
# Submit response | |
response = submit_response_to_question(current_session['current_question_id'], answer_text) | |
if not response: | |
return "β Failed to submit response", "", gr.update(visible=True) | |
# Process response | |
result_text = "" | |
if response.get("status") == "needs_clarification": | |
result_text = f"β οΈ Your answer needs clarification\n\n" | |
result_text += f"**Follow-up question:** {response['clarification_question']}\n\n" | |
result_text += "Please provide a clearer, more specific answer." | |
# Update current question to the clarification | |
current_session['current_question_id'] = response.get('clarification_question_id', current_session['current_question_id']) | |
return result_text, "", gr.update(visible=True) | |
elif response.get("status") in ["completed", "low_confidence"]: | |
validation = response["validation"] | |
confidence = validation["confidence"] | |
if response.get("status") == "completed": | |
result_text = f"β Answer accepted! (Confidence: {confidence:.2f})\n\n" | |
else: | |
result_text = f"β οΈ Answer accepted with low confidence: {confidence:.2f}\n\n" | |
# Show tracking details | |
tracking_info = response.get("tracking", {}) | |
if tracking_info: | |
result_text += f"π **Tracking Details:**\n" | |
result_text += f"β’ Question ID: {tracking_info.get('question_unique_id', 'N/A')}\n" | |
result_text += f"β’ Response Time: {tracking_info.get('response_timestamp', 'N/A')}\n" | |
result_text += f"β’ Attempt Count: {tracking_info.get('attempt_count', 'N/A')}\n" | |
result_text += f"β’ Validation Confidence: {tracking_info.get('validation_confidence', 0):.3f}\n\n" | |
# Check if all completed | |
progress = response.get("progress", {}) | |
if progress.get("all_completed"): | |
current_session['validation_complete'] = True | |
result_text += "π **All questions completed!**" | |
return result_text, "All questions have been answered. You can now view the final results.", gr.update(visible=False) | |
else: | |
# Get next question | |
question_status = list_all_questions(current_session['session_id']) | |
if question_status: | |
questions_list = question_status["questions"] | |
completed_count = question_status["completed_questions"] | |
total_count = question_status["total_questions"] | |
# Find next pending question | |
next_question = None | |
for q in questions_list: | |
if q['status'] in ['pending', 'needs_clarification']: | |
next_question = q | |
break | |
if next_question: | |
current_session['current_question_id'] = next_question['question_id'] | |
# Get question details | |
question_details = get_question_by_id(next_question['question_id']) | |
if question_details: | |
question_display = f"**Field:** {question_details['field_path']}\n" | |
question_display += f"**Question ID:** {question_details['question_id']}\n\n" | |
question_display += f"### {question_details['question']}" | |
progress_info = f"Progress: {completed_count}/{total_count} ({completed_count/total_count*100:.1f}%)" | |
return result_text, f"{progress_info}\n\n{question_display}", gr.update(visible=True) | |
return result_text, "Loading next question...", gr.update(visible=True) | |
return "β Unexpected response", "", gr.update(visible=True) | |
def get_results(): | |
"""Get the final validation results""" | |
if not current_session.get('session_id'): | |
return "β No active session" | |
results = get_final_results(current_session['session_id']) | |
if not results: | |
return "β Failed to get results" | |
# Format results | |
result_text = "# π Validation Complete!\n\n" | |
# Summary metrics | |
result_text += "## π Summary\n" | |
result_text += f"β’ **Fields Processed:** {results['summary']['total_fields_processed']}\n" | |
result_text += f"β’ **Success Rate:** {results['summary']['success_rate']:.1f}%\n" | |
result_text += f"β’ **Average Confidence:** {results['summary']['average_confidence']:.2f}\n" | |
result_text += f"β’ **Average Attempts per Question:** {results['summary']['average_attempts_per_question']:.1f}\n\n" | |
# Tracking metrics | |
result_text += "## π Question Tracking\n" | |
result_text += f"β’ **Total Questions:** {results['summary']['total_questions']}\n" | |
result_text += f"β’ **Questions with Clarification:** {results['summary']['questions_with_clarification']}\n" | |
result_text += f"β’ **Completed Questions:** {results['summary']['completed_questions']}\n\n" | |
# Updated record | |
result_text += "## π Updated Record\n" | |
result_text += "```json\n" | |
result_text += json.dumps(results["updated_record"], indent=2) | |
result_text += "\n```\n\n" | |
# Download files | |
updated_record_json = json.dumps(results["updated_record"], indent=2) | |
tracking_data = { | |
"session_info": results.get("session_info", {}), | |
"question_details": results.get("question_details", {}), | |
"summary": results.get("summary", {}) | |
} | |
tracking_json = json.dumps(tracking_data, indent=2) | |
full_results_json = json.dumps(results, indent=2) | |
return result_text, updated_record_json, tracking_json, full_results_json | |
def reset_session(): | |
"""Reset the current session""" | |
global current_session | |
session_id = current_session.get('session_id', 'new') | |
current_session = { | |
'session_id': None, | |
'validation_started': False, | |
'all_questions': [], | |
'current_question_id': None, | |
'validation_complete': False, | |
'api_base_url': current_session['api_base_url'] # Keep the API setting | |
} | |
return f"π Session reset. Ready for new validation.", "", "", gr.update(visible=False) | |
# Create Gradio Interface | |
def create_interface(): | |
with gr.Blocks(title="Medical Record Validation", theme=gr.themes.Soft()) as demo: | |
gr.Markdown("# π₯ Medical Record Data Validation") | |
gr.Markdown("**Enhanced with Question-Level Tracking**") | |
with gr.Tab("π Setup & Configuration"): | |
gr.Markdown("## API Configuration") | |
api_choice = gr.Radio( | |
choices=["AWS Deployment", "Local Development", "Custom URL"], | |
value="AWS Deployment", | |
label="Choose API endpoint" | |
) | |
custom_url = gr.Textbox( | |
label="Custom API URL", | |
placeholder="https://your-api-url.com", | |
visible=False | |
) | |
api_status = gr.Markdown("π Checking API status...") | |
# Update API status when choice changes | |
def update_custom_url_visibility(choice): | |
return gr.update(visible=(choice == "Custom URL")) | |
api_choice.change( | |
update_custom_url_visibility, | |
inputs=[api_choice], | |
outputs=[custom_url] | |
) | |
api_choice.change( | |
update_api_endpoint, | |
inputs=[api_choice, custom_url], | |
outputs=[api_status] | |
) | |
custom_url.change( | |
update_api_endpoint, | |
inputs=[api_choice, custom_url], | |
outputs=[api_status] | |
) | |
with gr.Tab("π Record Input"): | |
gr.Markdown("## Step 1: Provide Medical Record") | |
with gr.Row(): | |
with gr.Column(): | |
sample_choice = gr.Dropdown( | |
choices=list(SAMPLE_RECORDS.keys()), | |
label="Choose from sample records", | |
value=None | |
) | |
load_sample_btn = gr.Button("Load Sample Record", variant="secondary") | |
with gr.Column(): | |
upload_file = gr.File( | |
label="Upload JSON file", | |
file_types=[".json"] | |
) | |
json_input = gr.Textbox( | |
label="Medical Record JSON", | |
placeholder="Paste your JSON record here or use the options above...", | |
lines=10 | |
) | |
parse_status = gr.Markdown("") | |
json_preview = gr.Code(label="Record Preview", language="json", visible=False) | |
validate_btn = gr.Button("π Start Validation", variant="primary", size="lg") | |
validation_status = gr.Markdown("") | |
# Event handlers | |
load_sample_btn.click( | |
load_sample_record, | |
inputs=[sample_choice], | |
outputs=[json_input, parse_status] | |
) | |
json_input.change( | |
parse_json_record, | |
inputs=[json_input], | |
outputs=[parse_status, json_preview] | |
) | |
with gr.Tab("β Answer Questions"): | |
gr.Markdown("## Step 2: Answer Validation Questions") | |
question_display = gr.Markdown("Start validation to see questions here...") | |
with gr.Row(): | |
answer_input = gr.Textbox( | |
label="Your Answer", | |
placeholder="Enter your response here...", | |
scale=3 | |
) | |
submit_btn = gr.Button("Submit Answer", variant="primary", scale=1) | |
answer_result = gr.Markdown("") | |
question_section = gr.Group(visible=False) | |
with question_section: | |
submit_btn.click( | |
submit_answer, | |
inputs=[answer_input], | |
outputs=[answer_result, question_display, question_section] | |
) | |
with gr.Tab("π Results"): | |
gr.Markdown("## Step 3: Validation Results") | |
get_results_btn = gr.Button("π₯ Get Final Results", variant="primary") | |
results_display = gr.Markdown("Complete validation to see results here...") | |
with gr.Row(): | |
download_record = gr.File(label="Updated Record", visible=False) | |
download_tracking = gr.File(label="Tracking Details", visible=False) | |
download_full = gr.File(label="Complete Results", visible=False) | |
reset_btn = gr.Button("π Start New Validation", variant="secondary") | |
reset_status = gr.Markdown("") | |
# Main event handlers | |
validate_btn.click( | |
start_validation, | |
inputs=[json_input], | |
outputs=[validation_status, question_display, question_section] | |
) | |
get_results_btn.click( | |
get_results, | |
outputs=[results_display, download_record, download_tracking, download_full] | |
) | |
reset_btn.click( | |
reset_session, | |
outputs=[reset_status, validation_status, question_display, question_section] | |
) | |
# Initialize API status | |
demo.load( | |
lambda: update_api_endpoint("AWS Deployment"), | |
outputs=[api_status] | |
) | |
return demo | |
# Launch the app | |
if __name__ == "__main__": | |
demo = create_interface() | |
demo.launch( | |
server_name="0.0.0.0", | |
server_port=7860, | |
share=False | |
) |