oussamatahkoubit committed
Commit 535c49d · verified · 1 parent: 7cccd78

Delete backend

backend/extractors.py DELETED
@@ -1,57 +0,0 @@
- from pathlib import Path
- from typing import Union
-
- import fitz  # PyMuPDF
- from docx import Document
- from pptx import Presentation
-
- # PDF
- def extract_text_pdf(file_path: str) -> str:
-     text = ""
-     with fitz.open(file_path) as doc:
-         for page in doc:
-             text += page.get_text()
-     return text.strip()
-
- # DOCX
- def extract_text_docx(file_path: str) -> dict:
-     doc = Document(file_path)
-     paragraphs = []
-
-     for para in doc.paragraphs:
-         text = para.text.strip()
-         if text:  # Only include non-empty paragraphs
-             paragraphs.append({
-                 "style": para.style.name,
-                 "text": text
-             })
-
-     return {"content": paragraphs}
-
- # TXT
- def extract_text_txt(file_path: str) -> dict:
-     with open(file_path, "r", encoding="utf-8") as f:
-         lines = f.read().splitlines()  # ✅ split into clean lines
-     return {"content": lines}
-
- # PPTX
- def extract_text_pptx(file_path: str) -> str:
-     prs = Presentation(file_path)
-     text = []
-     for slide in prs.slides:
-         for shape in slide.shapes:
-             if hasattr(shape, "text"):
-                 text.append(shape.text)
-     return "\n".join(text).strip()
-
- # Dispatcher (PDF/PPTX return plain strings; DOCX/TXT return {"content": ...} dicts)
- def extract_text(file_path: str) -> Union[str, dict]:
-     ext = Path(file_path).suffix.lower()
-     if ext == ".pdf":
-         return extract_text_pdf(file_path)
-     elif ext == ".docx":
-         return extract_text_docx(file_path)
-     elif ext == ".txt":
-         return extract_text_txt(file_path)
-     elif ext == ".pptx":
-         return extract_text_pptx(file_path)
-     else:
-         raise ValueError(f"Unsupported file extension: {ext}")
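
For reference, a minimal sketch of how the dispatcher was consumed (the paths and import spelling are illustrative; note the mixed return types the caller must handle):

    from extractors import extract_text

    text = extract_text("/tmp/uploads/report.pdf")   # plain string for .pdf / .pptx
    doc = extract_text("/tmp/uploads/notes.docx")    # {"content": [{"style": ..., "text": ...}, ...]}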
 
backend/file_handler.py DELETED
@@ -1,49 +0,0 @@
- from pathlib import Path
- from uuid import uuid4
-
- from fastapi import UploadFile, HTTPException
-
- # Accepted file types (MIME type -> extension)
- ALLOWED_TYPES = {
-     # Document types
-     "application/pdf": ".pdf",
-     "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
-     "text/plain": ".txt",
-     "application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx",
-
-     # Image types ("image/jpg" is non-standard but sent by some clients)
-     "image/jpeg": ".jpg",
-     "image/jpg": ".jpg",
-     "image/png": ".png",
-     "image/gif": ".gif",
-     "image/bmp": ".bmp",
-     "image/webp": ".webp"
- }
-
- MAX_FILE_SIZE = 10 * 1024 * 1024  # 10 MB
-
- UPLOAD_DIR = Path("/tmp/uploads")
- UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
-
- def save_upload(file: UploadFile) -> tuple[str, str]:
-     """Validate an uploaded file, persist it, and return (file_id, stored filename)."""
-     # Log the content type for debugging
-     print(f"File content type: {file.content_type}")
-
-     if file.content_type not in ALLOWED_TYPES:
-         raise HTTPException(status_code=400, detail=f"Unsupported file type: {file.content_type}")
-
-     # Read the file into memory to check its size before saving
-     file_bytes = file.file.read()
-
-     if len(file_bytes) > MAX_FILE_SIZE:
-         raise HTTPException(status_code=413, detail="File is too large. Maximum size is 10MB.")
-
-     file_ext = ALLOWED_TYPES[file.content_type]
-     file_id = str(uuid4())
-     file_path = UPLOAD_DIR / f"{file_id}{file_ext}"
-
-     with open(file_path, "wb") as f:
-         f.write(file_bytes)
-
-     return file_id, file_path.name
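
A minimal sketch of how save_upload would plug into a FastAPI route (the endpoint name and response shape are assumptions, not part of the deleted code):

    from fastapi import FastAPI, UploadFile, File
    from file_handler import save_upload

    app = FastAPI()

    @app.post("/upload")  # hypothetical route
    async def upload(file: UploadFile = File(...)):
        file_id, filename = save_upload(file)
        return {"file_id": file_id, "filename": filename}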
 
backend/image_processor.py DELETED
@@ -1,101 +0,0 @@
- from typing import Dict, Any
-
- import torch
- from PIL import Image
- from transformers import (
-     AutoTokenizer,
-     ViltForQuestionAnswering,
-     ViltProcessor,
-     VisionEncoderDecoderModel,
-     ViTImageProcessor,
- )
-
- class ImageProcessor:
-     def __init__(self, caption_model_name: str = "nlpconnect/vit-gpt2-image-captioning",
-                  vqa_model_name: str = "dandelin/vilt-b32-finetuned-vqa"):
-         # Image captioning model
-         self.caption_processor = ViTImageProcessor.from_pretrained(caption_model_name)
-         self.tokenizer = AutoTokenizer.from_pretrained(caption_model_name)
-         self.caption_model = VisionEncoderDecoderModel.from_pretrained(caption_model_name)
-
-         # VQA model
-         self.vqa_processor = ViltProcessor.from_pretrained(vqa_model_name)
-         self.vqa_model = ViltForQuestionAnswering.from_pretrained(vqa_model_name)
-
-         self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
-         self.caption_model.to(self.device)
-         self.vqa_model.to(self.device)
-
-     def generate_caption(self, image_path: str) -> str:
-         """Generate a descriptive caption for the provided image."""
-         try:
-             image = Image.open(image_path).convert("RGB")
-             pixel_values = self.caption_processor(image, return_tensors="pt").pixel_values.to(self.device)
-
-             gen_kwargs = {
-                 "max_length": 50,
-                 "num_beams": 4,
-                 "early_stopping": True
-             }
-
-             output_ids = self.caption_model.generate(pixel_values, **gen_kwargs)
-             caption = self.tokenizer.decode(output_ids[0], skip_special_tokens=True)
-
-             return caption
-         except Exception as e:
-             return f"Error processing image: {str(e)}"
-
-     def answer_image_question(self, image_path: str, question: str) -> Dict[str, Any]:
-         """Answer a question about the provided image using a visual QA model."""
-         try:
-             # Open image
-             image = Image.open(image_path).convert("RGB")
-
-             # Prepare inputs
-             inputs = self.vqa_processor(image, question, return_tensors="pt")
-             inputs = {k: v.to(self.device) for k, v in inputs.items()}
-
-             # Forward pass
-             with torch.no_grad():
-                 outputs = self.vqa_model(**inputs)
-
-             # Pick the highest-scoring answer label
-             logits = outputs.logits
-             idx = logits.argmax(-1).item()
-             answer = self.vqa_model.config.id2label[idx]
-             confidence = torch.softmax(logits, dim=-1)[0, idx].item()
-
-             return {"answer": answer, "confidence": confidence}
-
-         except Exception as e:
-             print(f"VQA Error: {str(e)}")
-             # Fall back to a caption-based answer
-             try:
-                 caption = self.generate_caption(image_path)
-                 return {
-                     "answer": f"Based on the image which shows {caption}, I cannot provide a specific answer.",
-                     "confidence": 0.0
-                 }
-             except Exception as e2:
-                 return {"answer": f"Error processing image: {str(e)}, {str(e2)}", "confidence": 0.0}
-
- class SimpleImageProcessor:
-     """A simple fallback image processor that doesn't require external models."""
-
-     def __init__(self):
-         """Initialize without any models."""
-         print("Using SimpleImageProcessor fallback")
-
-     def generate_caption(self, image_path: str) -> str:
-         """Generate a basic caption for the provided image."""
-         try:
-             # Just extract basic image information
-             img = Image.open(image_path)
-             return f"an image of size {img.width}x{img.height}"
-         except Exception as e:
-             return f"an image (could not process: {str(e)})"
-
-     def answer_image_question(self, image_path: str, question: str) -> Dict[str, Any]:
-         """Provide a fallback answer for image questions."""
-         return {
-             "answer": "I cannot analyze this image right now. The image processing system is not fully functional.",
-             "confidence": 0.0
-         }
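
A sketch of the intended usage, assuming the caller falls back to SimpleImageProcessor when model loading fails (the try/except wiring and file name are assumptions):

    try:
        processor = ImageProcessor()        # downloads both models on first run
    except Exception:
        processor = SimpleImageProcessor()  # model-free fallback

    print(processor.generate_caption("photo.jpg"))
    print(processor.answer_image_question("photo.jpg", "What color is the car?"))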
 
backend/models.py DELETED
@@ -1,61 +0,0 @@
- from typing import Any, Optional
-
- from transformers import pipeline
-
- # Singleton model manager
- class ModelManager:
-     _instance = None
-
-     @classmethod
-     def get_instance(cls):
-         if cls._instance is None:
-             cls._instance = cls()
-         return cls._instance
-
-     def __init__(self):
-         self.pipelines = {}
-
-         # Default models - preferably small ones that are publicly accessible
-         self.model_configs = {
-             # Shadowed duplicate "document_qa" entry (the definition below wins):
-             # "document_qa": {
-             #     "name": "distilbert-base-cased-distilled-squad",  # Smaller, public model
-             #     "type": "question-answering"
-             # },
-             # "image_captioning": {
-             #     "name": "Salesforce/blip-image-captioning-base",  # Public model
-             #     "type": "image-to-text"
-             # },
-             "document_qa": {
-                 "name": "impira/layoutlm-document-qa",  # Document-specialized model
-                 "type": "document-question-answering"
-             }
-         }
-
-     def load_pipeline(self, pipeline_type: str) -> bool:
-         """Load a specific pipeline if it's not already loaded."""
-         if pipeline_type not in self.model_configs:
-             return False
-
-         if pipeline_type in self.pipelines:
-             return True
-
-         config = self.model_configs[pipeline_type]
-         model_name = config["name"]
-
-         try:
-             if config["type"] == "document-question-answering":
-                 self.pipelines[pipeline_type] = pipeline("document-question-answering", model=model_name)
-             elif config["type"] == "image-to-text":
-                 self.pipelines[pipeline_type] = pipeline("image-to-text", model=model_name)
-             else:
-                 return False  # unknown task type; don't report a load that never happened
-
-             return True
-         except Exception as e:
-             print(f"Error loading pipeline {model_name}: {str(e)}")
-             return False
-
-     def get_pipeline(self, pipeline_type: str) -> Optional[Any]:
-         """Get a loaded pipeline, loading it first if necessary."""
-         if pipeline_type not in self.pipelines and not self.load_pipeline(pipeline_type):
-             return None
-         return self.pipelines[pipeline_type]
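
The singleton would then be consumed roughly as follows (the document image and question are illustrative; the document-question-answering pipeline takes an image of a page plus a question):

    manager = ModelManager.get_instance()
    doc_qa = manager.get_pipeline("document_qa")
    if doc_qa is not None:
        result = doc_qa(image="/tmp/uploads/invoice.png", question="What is the total?")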
 
backend/qa_engine.py DELETED
@@ -1,65 +0,0 @@
- from typing import Any, Dict, List
-
- from transformers import pipeline
-
- class QAEngine:
-     def __init__(self, model_name: str = "deepset/roberta-base-squad2"):
-         # The pipeline API works well on Hugging Face Spaces
-         self.qa_pipeline = pipeline("question-answering", model=model_name)
-
-     def answer_question(self, context: str, question: str) -> Dict[str, Any]:
-         """Answer a question based on the provided context."""
-         try:
-             # Use the pipeline directly
-             result = self.qa_pipeline(question=question, context=context)
-
-             return {
-                 "answer": result["answer"],
-                 "confidence": result["score"],
-                 "start_position": result["start"],
-                 "end_position": result["end"]
-             }
-         except Exception as e:
-             return {
-                 "answer": f"Error processing question: {str(e)}",
-                 "confidence": 0.0,
-                 "start_position": 0,
-                 "end_position": 0
-             }
-
-     def answer_multiple_questions(self, context: str, questions: List[str]) -> List[Dict[str, Any]]:
-         """Answer multiple questions against the same context."""
-         return [self.answer_question(context, question) for question in questions]
-
- class SimpleQAEngine:
-     """A simple keyword-based QA engine used as a fallback when models fail."""
-
-     def answer_question(self, context: str, question: str) -> Dict[str, Any]:
-         """Return the context sentence that shares the most words with the question."""
-         import nltk
-         from nltk.tokenize import sent_tokenize
-         try:
-             nltk.download('punkt', quiet=True)
-         except Exception:
-             pass  # tokenizer data may already be present, or the host may be offline
-
-         question_words = set(question.lower().split())
-         best_sentence = ""
-         best_score = 0
-
-         for sentence in sent_tokenize(context):
-             sentence_words = set(sentence.lower().split())
-             overlap = len(question_words.intersection(sentence_words))
-             if overlap > best_score:
-                 best_score = overlap
-                 best_sentence = sentence
-
-         return {
-             "answer": best_sentence if best_score > 0 else "No relevant information found.",
-             "confidence": min(best_score / max(1, len(question_words)), 1.0),
-             "start_position": context.find(best_sentence) if best_sentence else 0,
-             "end_position": context.find(best_sentence) + len(best_sentence) if best_sentence else 0
-         }
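
Both engines expose the same answer_question contract, so callers can swap one for the other; a brief sketch (the context and question are made up):

    engine = QAEngine()  # or SimpleQAEngine() when model downloads are unavailable
    result = engine.answer_question(
        context="The invoice was issued on 4 March and is due within 30 days.",
        question="When was the invoice issued?",
    )
    print(result["answer"], result["confidence"])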
 
backend/response_formatter.py DELETED
@@ -1,48 +0,0 @@
- from typing import Any, Dict
-
- class ResponseFormatter:
-     @staticmethod
-     def format_document_qa_response(qa_result: Dict[str, Any], document_name: str) -> Dict[str, Any]:
-         """Format the QA engine's response for document questions."""
-         formatted_response = {
-             "document": document_name,
-             "answer": qa_result.get("answer", "No answer found"),
-             "confidence": round(qa_result.get("confidence", 0) * 100, 2),  # as a percentage
-             "metadata": {
-                 "source_type": "document",
-                 "timestamp": qa_result.get("timestamp")
-             }
-         }
-
-         # Add highlighted text positions if available
-         if "start_position" in qa_result and "end_position" in qa_result:
-             formatted_response["highlight"] = {
-                 "start": qa_result["start_position"],
-                 "end": qa_result["end_position"]
-             }
-
-         return formatted_response
-
-     @staticmethod
-     def format_image_qa_response(vqa_result: Dict[str, Any], image_name: str) -> Dict[str, Any]:
-         """Format the image QA engine's response."""
-         formatted_response = {
-             "image": image_name,
-             "answer": vqa_result.get("answer", "No answer found"),
-             "confidence": round(vqa_result.get("confidence", 0) * 100, 2),  # as a percentage
-             "metadata": {
-                 "source_type": "image",
-                 "timestamp": vqa_result.get("timestamp")
-             }
-         }
-
-         return formatted_response
-
-     @staticmethod
-     def format_error_response(error_message: str, status_code: int = 400) -> Dict[str, Any]:
-         """Format an error response."""
-         return {
-             "error": True,
-             "message": error_message,
-             "status_code": status_code
-         }
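
A brief sketch of the formatter chained onto a QA result (field names follow the code above; the document name is illustrative):

    qa_result = {"answer": "4 March", "confidence": 0.91, "start_position": 27, "end_position": 34}
    response = ResponseFormatter.format_document_qa_response(qa_result, "invoice.pdf")
    # -> {"document": "invoice.pdf", "answer": "4 March", "confidence": 91.0,
    #     "metadata": {...}, "highlight": {"start": 27, "end": 34}}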