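# app.py: Document Analyzer ML API (FastAPI).
# Dependencies implied by the imports in this file (PyPI package names are
# assumptions): fastapi, uvicorn, transformers (with a torch backend),
# pdfplumber, python-docx, yake, and optionally easyocr, opencv-python,
# and numpy for image OCR.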
import logging
import os
import tempfile

from fastapi import FastAPI, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from transformers import pipeline
import pdfplumber
import docx

# Optional easyocr import: degrade gracefully when the package is missing.
try:
    import easyocr
    _easyocr_reader = easyocr.Reader(['en'], gpu=False)
except Exception:
    _easyocr_reader = None

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

logging.basicConfig(level=logging.INFO)

_SUMMARIZER = None
_SUMMARIZER_FALLBACK = None
_SENTIMENT = None
SAFE_TEXT_CHARS = 4000  # cap on extracted text passed to the models
# ========== Model Cache / Lazy Loaders ==========
def get_summarizer():
    global _SUMMARIZER, _SUMMARIZER_FALLBACK
    # Only attempt a load when neither the primary nor the fallback is cached,
    # so a failed bart-large-cnn load is not retried on every request.
    if _SUMMARIZER is None and _SUMMARIZER_FALLBACK is None:
        try:
            _SUMMARIZER = pipeline("summarization", model="facebook/bart-large-cnn", device=-1)
            logging.info("Loaded facebook/bart-large-cnn summarizer (primary)")
        except Exception as e:
            logging.warning(f"Could not load bart-large-cnn: {e}. Falling back to t5-small.")
            _SUMMARIZER_FALLBACK = pipeline("summarization", model="t5-small", device=-1)
    return _SUMMARIZER or _SUMMARIZER_FALLBACK

def get_sentiment():
    global _SENTIMENT
    if _SENTIMENT is None:
        _SENTIMENT = pipeline("sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english", device=-1)
        logging.info("Loaded distilbert-base-uncased-finetuned-sst-2-english for sentiment.")
    return _SENTIMENT
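# Optional warm-up (a sketch, not part of the original flow): eagerly loading
# both models at import time trades a slower boot for a fast first request.
# PRELOAD_MODELS is a hypothetical opt-in environment variable.
if os.environ.get("PRELOAD_MODELS") == "1":
    get_summarizer()
    get_sentiment()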
# ========== File Extraction Handlers ==========
def extract_text_pdf(b: bytes):
    # delete=False so the file can be reopened by path after the handle closes;
    # it is removed manually in the finally block below.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
        tmp.write(b)
        tmp.flush()
        path = tmp.name
    try:
        with pdfplumber.open(path) as pdf:
            text = "\n".join((p.extract_text() or "") for p in pdf.pages)
    except Exception as e:
        raise ValueError(f"PDF parsing failed: {e}")
    finally:
        try:
            os.unlink(path)
        except Exception:
            pass
    return text

def extract_text_docx(b: bytes):
    with tempfile.NamedTemporaryFile(delete=False, suffix=".docx") as tmp:
        tmp.write(b)
        tmp.flush()
        path = tmp.name
    try:
        paragraphs = docx.Document(path).paragraphs
        text = "\n".join(p.text for p in paragraphs)
    except Exception as e:
        raise ValueError(f"DOCX parsing failed: {e}")
    finally:
        try:
            os.unlink(path)
        except Exception:
            pass
    return text
def extract_text_txt(b: bytes):
    # errors="ignore" means decoding never raises, so no fallback is needed.
    return b.decode("utf-8", errors="ignore")
def extract_text_image(b: bytes):
    # Prefer OCR via easyocr when the reader was initialized at import time.
    if _easyocr_reader is not None:
        try:
            import numpy as np
            import cv2
            arr = np.frombuffer(b, np.uint8)
            img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
            result = _easyocr_reader.readtext(img, detail=0, paragraph=True)
            return "\n".join(result)
        except Exception:
            pass
    # Fall back to BLIP captioning when easyocr is unavailable or OCR failed.
    try:
        from transformers import pipeline as hf_pipeline
        image_to_text = hf_pipeline("image-to-text", model="Salesforce/blip-image-captioning-base")
        with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp:
            tmp.write(b)
            tmp.flush()
            path = tmp.name
        try:
            return image_to_text(path)[0]["generated_text"]
        finally:
            os.unlink(path)
    except Exception:
        return None
# ========== Main Endpoint ==========
@app.get("/")
def home():
    return {"message": "Document Analyzer ML API is running!"}

@app.post("/analyze")
async def analyze(file: UploadFile = File(...)):
    try:
        contents = await file.read()
        filename = file.filename or "uploaded_file"
        ext = filename.split(".")[-1].lower() if "." in filename else ""
        logging.info(f"Loaded file: {filename}")
        if not contents:
            logging.warning("File is empty.")
            return JSONResponse({"fileName": filename, "summary": "", "keywords": [], "sentiment": "neutral"}, status_code=400)
        # Dispatch extraction by file extension.
        text = ""
        try:
            if ext == "pdf":
                text = extract_text_pdf(contents)
            elif ext == "docx":
                text = extract_text_docx(contents)
            elif ext in ("txt", "log", "csv"):
                text = extract_text_txt(contents)
            elif ext in ("jpg", "jpeg", "png", "bmp", "tiff"):
                t = extract_text_image(contents)
                if not t:
                    raise ValueError("Image could not be processed.")
                text = t
            else:
                # Unknown extensions are treated as plain text.
                text = extract_text_txt(contents)
        except Exception as e:
            logging.error(f"Extract failed: {e}")
            # Parse failures on structured formats are 422; everything else is 400.
            status = 422 if ext in ("pdf", "docx") else 400
            return JSONResponse({"fileName": filename, "summary": "", "keywords": [], "sentiment": "neutral"}, status_code=status)
        if not text.strip():
            logging.warning("No extractable text found in file.")
            return JSONResponse({"fileName": filename, "summary": "", "keywords": [], "sentiment": "neutral"}, status_code=400)
        if len(text) > SAFE_TEXT_CHARS:
            text = text[:SAFE_TEXT_CHARS]
        logging.info(f"Extracted text length: {len(text)}")
        # SUMMARY
        try:
            summarizer = get_summarizer()
            summarizer_input = text[:2048]  # keep input well under the model's token limit
            summary_out = summarizer(summarizer_input, max_length=180, min_length=40, truncation=True)[0]["summary_text"]
            logging.info("Summary OK")
        except Exception as e:
            summary_out = ""
            logging.error(f"Summary error: {e}")

        # KEYWORDS
        try:
            import yake
            kw_extractor = yake.KeywordExtractor(top=10)
            kw = kw_extractor.extract_keywords(text)
            keywords = [k[0] for k in kw][:10] if kw else []
        except Exception as e:
            keywords = []
            logging.error(f"Keywords error: {e}")
        logging.info(f"Keywords: {len(keywords)}")

        # SENTIMENT
        try:
            sentiment_pipe = get_sentiment()
            senti_result = sentiment_pipe(text[:512])
            sentiment = senti_result[0]["label"].lower() if senti_result and "label" in senti_result[0] else "neutral"
        except Exception as e:
            sentiment = "neutral"
            logging.error(f"Sentiment error: {e}")
        logging.info(f"Sentiment: {sentiment}")

        # Always return a flat response with the same keys, no nesting.
        return {
            "fileName": filename,
            "summary": summary_out,
            "keywords": keywords,
            "sentiment": sentiment,
        }
    except Exception as e:
        logging.error("/analyze error: %s", str(e))
        return JSONResponse({
            "fileName": file.filename if file else None,
            "summary": "",
            "keywords": [],
            "sentiment": "neutral",
        }, status_code=500)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app:app", host="0.0.0.0", port=7860)
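# A quick smoke test, assuming the server is running locally on port 7860
# (the file name and response values below are illustrative only):
#
#   curl -X POST -F "file=@sample.pdf" http://localhost:7860/analyze
#
# which should return the flat JSON shape the endpoint always emits:
#
#   {"fileName": "sample.pdf", "summary": "...", "keywords": ["..."], "sentiment": "positive"}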