Upload 6 files
- backend/extractors.py +57 -0
- backend/file_handler.py +49 -0
- backend/image_processor.py +101 -0
- backend/models.py +129 -0
- backend/qa_engine.py +65 -0
- backend/response_formatter.py +48 -0
backend/extractors.py
ADDED
@@ -0,0 +1,57 @@
from pathlib import Path
import fitz  # PyMuPDF
from docx import Document
from pptx import Presentation

# PDF
def extract_text_pdf(file_path: str) -> str:
    text = ""
    with fitz.open(file_path) as doc:
        for page in doc:
            text += page.get_text()
    return text.strip()

# DOCX
def extract_text_docx(file_path: str) -> dict:
    doc = Document(file_path)
    paragraphs = []

    for para in doc.paragraphs:
        text = para.text.strip()
        if text:  # Only include non-empty paragraphs
            paragraphs.append({
                "style": para.style.name,
                "text": text
            })

    return {"content": paragraphs}

# TXT
def extract_text_txt(file_path: str) -> dict:
    with open(file_path, "r", encoding="utf-8") as f:
        lines = f.read().splitlines()  # split into clean lines
    return {"content": lines}

# PPTX
def extract_text_pptx(file_path: str) -> str:
    prs = Presentation(file_path)
    text = []
    for slide in prs.slides:
        for shape in slide.shapes:
            if hasattr(shape, "text"):
                text.append(shape.text)
    return "\n".join(text).strip()

# Dispatcher: PDF/PPTX return a plain string, DOCX/TXT return {"content": [...]}
def extract_text(file_path: str) -> str | dict:
    ext = Path(file_path).suffix.lower()
    if ext == ".pdf":
        return extract_text_pdf(file_path)
    elif ext == ".docx":
        return extract_text_docx(file_path)
    elif ext == ".txt":
        return extract_text_txt(file_path)
    elif ext == ".pptx":
        return extract_text_pptx(file_path)
    else:
        raise ValueError("Unsupported file extension")
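A minimal usage sketch of the dispatcher above (the file path is hypothetical): extract_text returns a plain string for PDF and PPTX but a {"content": [...]} dict for DOCX and TXT, so callers should normalize both shapes before handing text to the QA engine.

from backend.extractors import extract_text

result = extract_text("/tmp/uploads/example.docx")  # hypothetical file
if isinstance(result, dict):
    # DOCX entries are {"style", "text"} dicts; TXT entries are plain strings
    text = "\n".join(p["text"] if isinstance(p, dict) else p for p in result["content"])
else:
    text = result
print(text[:200])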
backend/file_handler.py
ADDED
@@ -0,0 +1,49 @@
from fastapi import UploadFile, HTTPException
from pathlib import Path
from uuid import uuid4

# Accepted file types
ALLOWED_TYPES = {
    # Document types
    "application/pdf": ".pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
    "text/plain": ".txt",
    "application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx",

    # Image types
    "image/jpeg": ".jpg",
    "image/jpg": ".jpg",
    "image/png": ".png",
    "image/gif": ".gif",
    "image/bmp": ".bmp",
    "image/webp": ".webp"
}

MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB

tmp_dir = Path("/tmp/uploads")
tmp_dir.mkdir(parents=True, exist_ok=True)

UPLOAD_DIR = tmp_dir

def save_upload(file: UploadFile) -> tuple[str, str]:
    # Print file content type for debugging
    print(f"File content type: {file.content_type}")

    if file.content_type not in ALLOWED_TYPES:
        raise HTTPException(status_code=400, detail=f"Unsupported file type: {file.content_type}")

    # Read file into memory to check size and save it
    file_bytes = file.file.read()

    if len(file_bytes) > MAX_FILE_SIZE:
        raise HTTPException(status_code=413, detail="File is too large. Maximum size is 10MB.")

    file_ext = ALLOWED_TYPES[file.content_type]
    file_id = str(uuid4())
    file_path = UPLOAD_DIR / f"{file_id}{file_ext}"

    with open(file_path, "wb") as f:
        f.write(file_bytes)

    return file_id, file_path.name
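For context, a sketch of how save_upload could be wired into an upload route; the FastAPI app object and the route name are assumptions, not part of this commit.

from fastapi import FastAPI, UploadFile, File
from backend.file_handler import save_upload, UPLOAD_DIR

app = FastAPI()  # hypothetical entrypoint, not included in this upload

@app.post("/upload")
async def upload(file: UploadFile = File(...)):
    # save_upload raises HTTPException 400/413 for unsupported types or oversized files
    file_id, stored_name = save_upload(file)
    return {"file_id": file_id, "filename": stored_name, "path": str(UPLOAD_DIR / stored_name)}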
backend/image_processor.py
ADDED
@@ -0,0 +1,101 @@
from transformers import VisionEncoderDecoderModel, ViTImageProcessor, AutoTokenizer
from transformers import ViltProcessor, ViltForQuestionAnswering
from PIL import Image
import torch
from typing import Dict, Any, Union, List

class ImageProcessor:
    def __init__(self, caption_model_name: str = "nlpconnect/vit-gpt2-image-captioning",
                 vqa_model_name: str = "dandelin/vilt-b32-finetuned-vqa"):
        # Image captioning model
        self.caption_processor = ViTImageProcessor.from_pretrained(caption_model_name)
        self.tokenizer = AutoTokenizer.from_pretrained(caption_model_name)
        self.caption_model = VisionEncoderDecoderModel.from_pretrained(caption_model_name)

        # VQA model
        self.vqa_processor = ViltProcessor.from_pretrained(vqa_model_name)
        self.vqa_model = ViltForQuestionAnswering.from_pretrained(vqa_model_name)

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.caption_model.to(self.device)
        self.vqa_model.to(self.device)

    def generate_caption(self, image_path: str) -> str:
        """Generate a descriptive caption for the provided image"""
        try:
            image = Image.open(image_path).convert("RGB")
            pixel_values = self.caption_processor(image, return_tensors="pt").pixel_values.to(self.device)

            gen_kwargs = {
                "max_length": 50,
                "num_beams": 4,
                "early_stopping": True
            }

            output_ids = self.caption_model.generate(pixel_values, **gen_kwargs)
            caption = self.tokenizer.decode(output_ids[0], skip_special_tokens=True)

            return caption
        except Exception as e:
            return f"Error processing image: {str(e)}"

    def answer_image_question(self, image_path: str, question: str) -> Dict[str, Any]:
        """Answer a question about the provided image using a Visual QA model"""
        try:
            # Open image
            image = Image.open(image_path).convert("RGB")

            # Prepare inputs
            inputs = self.vqa_processor(image, question, return_tensors="pt")
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

            # Forward pass
            with torch.no_grad():
                outputs = self.vqa_model(**inputs)

            # Get answer
            logits = outputs.logits
            idx = logits.argmax(-1).item()
            answer = self.vqa_model.config.id2label[idx]
            confidence = torch.softmax(logits, dim=-1)[0, idx].item()

            return {"answer": answer, "confidence": confidence}

        except Exception as e:
            print(f"VQA Error: {str(e)}")
            # Fallback to caption
            try:
                caption = self.generate_caption(image_path)
                return {
                    "answer": f"Based on the image which shows {caption}, I cannot provide a specific answer.",
                    "confidence": 0.0
                }
            except Exception as e2:
                return {"answer": f"Error processing image: {str(e)}, {str(e2)}", "confidence": 0.0}


class SimpleImageProcessor:
    """A simple fallback image processor that doesn't require external models"""

    def __init__(self):
        """Initialize without any models"""
        print("Using SimpleImageProcessor fallback")

    def generate_caption(self, image_path: str) -> str:
        """Generate a basic caption for the provided image"""
        try:
            # Just extract basic image information
            img = Image.open(image_path)
            return f"an image of size {img.width}x{img.height}"
        except Exception as e:
            return f"an image (could not process: {str(e)})"

    def answer_image_question(self, image_path: str, question: str) -> Dict[str, Any]:
        """Provide a fallback answer for image questions"""
        return {
            "answer": "I cannot analyze this image right now. The image processing system is not fully functional.",
            "confidence": 0.0
        }
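A short sketch of how the two processors might be used together (the image path is hypothetical). Constructing ImageProcessor downloads the ViT-GPT2 captioning and ViLT VQA weights on first run; SimpleImageProcessor is the no-model fallback with the same method names.

from backend.image_processor import ImageProcessor, SimpleImageProcessor

try:
    proc = ImageProcessor()          # downloads model weights on first use
except Exception:
    proc = SimpleImageProcessor()    # metadata-only fallback

image = "/tmp/uploads/photo.jpg"     # hypothetical upload
print(proc.generate_caption(image))
print(proc.answer_image_question(image, "What color is the car?"))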
backend/models.py
ADDED
@@ -0,0 +1,129 @@
from transformers import pipeline, Blip2Processor, Blip2ForConditionalGeneration  # BLIP-2 checkpoints use Blip2Processor
import torch
import os
from typing import Dict, Any, Optional
from PIL import Image

# Singleton model manager
class ModelManager:
    _instance = None

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def __init__(self):
        self.pipelines = {}
        self.specialized_models = {}

        # Set default models - preferably small ones that are publicly accessible
        self.model_configs = {
            "image_captioning": {
                "name": "Salesforce/blip-image-captioning-base",  # Public model
                "type": "image-to-text"
            },
            "document_qa": {
                "name": "impira/layoutlm-document-qa",  # Document-specialized model
                "type": "document-question-answering"
            },
            "blip2_vqa": {
                "name": "Salesforce/blip2-flan-t5-xl",  # BLIP-2 model for improved VQA
                "type": "specialized"
            }
        }

        # Check if CUDA is available
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

    def load_pipeline(self, pipeline_type: str) -> bool:
        """Load a specific pipeline if it's not already loaded"""
        if pipeline_type not in self.model_configs:
            return False

        if pipeline_type in self.pipelines or pipeline_type in self.specialized_models:
            return True

        config = self.model_configs[pipeline_type]
        model_name = config["name"]

        try:
            if config["type"] == "document-question-answering":
                self.pipelines[pipeline_type] = pipeline("document-question-answering", model=model_name)
            elif config["type"] == "image-to-text":
                self.pipelines[pipeline_type] = pipeline("image-to-text", model=model_name)
            elif config["type"] == "specialized" and pipeline_type == "blip2_vqa":
                # Load BLIP-2 model components
                processor = Blip2Processor.from_pretrained(model_name)
                model = Blip2ForConditionalGeneration.from_pretrained(
                    model_name,
                    torch_dtype=torch.float16 if self.device == "cuda" else torch.float32
                )
                model.to(self.device)

                self.specialized_models[pipeline_type] = {
                    "processor": processor,
                    "model": model
                }

            return True
        except Exception as e:
            print(f"Error loading pipeline {model_name}: {str(e)}")
            return False

    def get_pipeline(self, pipeline_type: str) -> Optional[Any]:
        """Get a loaded pipeline or load it if not already loaded"""
        if pipeline_type not in self.pipelines and \
           pipeline_type not in self.specialized_models and \
           not self.load_pipeline(pipeline_type):
            return None

        if pipeline_type in self.pipelines:
            return self.pipelines[pipeline_type]
        else:
            return self.specialized_models[pipeline_type]

    def blip2_vqa(self, image_path: str, question: str, max_length: int = 100) -> str:
        """
        Perform VQA using the BLIP-2 model

        Args:
            image_path: Path to the image file
            question: The question to ask about the image
            max_length: Maximum length of the generated answer

        Returns:
            Answer to the question
        """
        # Make sure the model is loaded
        if not self.load_pipeline("blip2_vqa"):
            return "Model could not be loaded"

        # Get the model components
        components = self.specialized_models["blip2_vqa"]
        processor = components["processor"]
        model = components["model"]

        try:
            # Load and process the image
            image = Image.open(image_path).convert('RGB')

            # Process inputs
            inputs = processor(image, question, return_tensors="pt").to(self.device)

            # Generate answer
            with torch.no_grad():
                generated_ids = model.generate(
                    **inputs,
                    max_length=max_length,
                    do_sample=False,
                    num_beams=5,
                )

            # Decode the generated answer
            answer = processor.decode(generated_ids[0], skip_special_tokens=True)
            return answer

        except Exception as e:
            return f"Error processing image: {str(e)}"
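A sketch of how ModelManager might be exercised (paths are hypothetical). Note that the document-question-answering pipeline generally needs an OCR backend such as pytesseract when given a raw page image, and the BLIP-2 checkpoint is a multi-gigabyte download that is only practical on a GPU.

from backend.models import ModelManager

manager = ModelManager.get_instance()   # singleton: models load once and are reused

doc_qa = manager.get_pipeline("document_qa")
if doc_qa is not None:
    print(doc_qa(image="/tmp/uploads/invoice.png", question="What is the total amount?"))

print(manager.blip2_vqa("/tmp/uploads/photo.jpg", "What is happening in this picture?"))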
backend/qa_engine.py
ADDED
@@ -0,0 +1,65 @@
from transformers import pipeline
import torch
from typing import Dict, List, Any

class QAEngine:
    def __init__(self, model_name: str = "deepset/roberta-base-squad2"):
        # Use the pipeline API which works better with Hugging Face Spaces
        self.qa_pipeline = pipeline("question-answering", model=model_name)

    def answer_question(self, context: str, question: str) -> Dict[str, Any]:
        """Answer a question based on the provided context"""
        try:
            # Use the pipeline directly
            result = self.qa_pipeline(question=question, context=context)

            return {
                "answer": result["answer"],
                "confidence": result["score"],
                "start_position": result["start"],
                "end_position": result["end"]
            }
        except Exception as e:
            return {
                "answer": f"Error processing question: {str(e)}",
                "confidence": 0.0,
                "start_position": 0,
                "end_position": 0
            }

    def answer_multiple_questions(self, context: str, questions: List[str]) -> List[Dict[str, Any]]:
        """Answer multiple questions from the same context"""
        return [self.answer_question(context, question) for question in questions]


class SimpleQAEngine:
    """A simple QA engine that doesn't rely on complex models"""

    def answer_question(self, context: str, question: str) -> Dict[str, Any]:
        """Basic keyword-based answer extraction (fallback when models fail)"""
        # Very basic implementation - just finds sentences with keywords from the question
        import nltk
        from nltk.tokenize import sent_tokenize
        try:
            nltk.download('punkt', quiet=True)
        except Exception:
            pass

        question_words = set(question.lower().split())
        best_sentence = ""
        best_score = 0

        for sentence in sent_tokenize(context):
            sentence_words = set(sentence.lower().split())
            overlap = len(question_words.intersection(sentence_words))
            if overlap > best_score:
                best_score = overlap
                best_sentence = sentence

        return {
            "answer": best_sentence if best_score > 0 else "No relevant information found.",
            "confidence": min(best_score / max(1, len(question_words)), 1.0),
            "start_position": context.find(best_sentence) if best_sentence in context else 0,
            "end_position": context.find(best_sentence) + len(best_sentence) if best_sentence in context else 0
        }
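A minimal sketch of the extractive QA path, with SimpleQAEngine as the fallback when the transformer model cannot be loaded; both classes expose the same answer_question signature, so they are interchangeable to callers. The context string is made up for illustration.

from backend.qa_engine import QAEngine, SimpleQAEngine

context = "The invoice was issued on 3 March and payment is due within 30 days."

try:
    engine = QAEngine()        # downloads deepset/roberta-base-squad2 on first use
except Exception:
    engine = SimpleQAEngine()  # keyword-overlap fallback, no model required

print(engine.answer_question(context, "When was the invoice issued?"))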
backend/response_formatter.py
ADDED
@@ -0,0 +1,48 @@
from typing import Dict, Any, List, Union

class ResponseFormatter:
    @staticmethod
    def format_document_qa_response(qa_result: Dict[str, Any], document_name: str) -> Dict[str, Any]:
        """Format the response from the QA engine for document questions"""
        formatted_response = {
            "document": document_name,
            "answer": qa_result.get("answer", "No answer found"),
            "confidence": round(qa_result.get("confidence", 0) * 100, 2),
            "metadata": {
                "source_type": "document",
                "timestamp": qa_result.get("timestamp")
            }
        }

        # Add highlighted text positions if available
        if "start_position" in qa_result and "end_position" in qa_result:
            formatted_response["highlight"] = {
                "start": qa_result["start_position"],
                "end": qa_result["end_position"]
            }

        return formatted_response

    @staticmethod
    def format_image_qa_response(vqa_result: Dict[str, Any], image_name: str) -> Dict[str, Any]:
        """Format the response from the image QA engine"""
        formatted_response = {
            "image": image_name,
            "answer": vqa_result.get("answer", "No answer found"),
            "confidence": round(vqa_result.get("confidence", 0) * 100, 2),
            "metadata": {
                "source_type": "image",
                "timestamp": vqa_result.get("timestamp")
            }
        }

        return formatted_response

    @staticmethod
    def format_error_response(error_message: str, status_code: int = 400) -> Dict[str, Any]:
        """Format error responses"""
        return {
            "error": True,
            "message": error_message,
            "status_code": status_code
        }
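For illustration, a hedged sketch of the formatter applied to a QA result; the document name is made up, and only the shape of the output (not the values) follows from the code above.

from backend.qa_engine import QAEngine
from backend.response_formatter import ResponseFormatter

qa_result = QAEngine().answer_question("Paris is the capital of France.",
                                       "What is the capital of France?")
response = ResponseFormatter.format_document_qa_response(qa_result, "notes.txt")
# Shape: {"document": "notes.txt", "answer": "...", "confidence": <percent>,
#         "metadata": {"source_type": "document", "timestamp": None},
#         "highlight": {"start": ..., "end": ...}}
print(response)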