VocRT / fastAPI.py
Anurag
version-2 initial version
5306da4
import threading
import uvicorn
import asyncio
from pathlib import Path
from fastapi import FastAPI, File, UploadFile, HTTPException, Form, status
from pydantic import BaseModel, HttpUrl
from chat_database import create_chat_entry, get_all_chat_details, rename_chat_title, save_context_detail, clear_context_detail, delete_chat, save_system_prompt
from fastapi.responses import JSONResponse
from pdfminer.high_level import extract_text
from io import BytesIO
import httpx
import pandas as pd
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from embeddings import get_and_store_embeddings
from qdrent import delete_embeddings
import re
from readability import Document as ReadabilityDocument
from providers.ppt_and_docx_helper import extract_text_from_docx, extract_text_from_pptx
ALLOWED_EXT = {
".pdf", ".csv", ".txt",
".ppt", ".pptx",
".doc", ".docx",
".xls", ".xlsx"
}
app = FastAPI()
class Document(BaseModel):
session_id: str
data: str
filename: str
class RenameChatRequest(BaseModel):
sessionId: str
title: str
class SavePromptRequest(BaseModel):
sessionId: str
prompt: str
class LinkInput(BaseModel):
link: HttpUrl
sessionId: str
title: str
summary: str
categories: str
class TextInput(BaseModel):
text: str
sessionId: str
title: str
name: str
summary: str
categories: str
class clearContextInput(BaseModel):
sessionId: str
@app.get('/get-chats')
async def get_chat_names():
chat_history = get_all_chat_details()
return chat_history
@app.post('/create-chat/{sessionId}')
async def createChat(sessionId: str):
chat_history = create_chat_entry(sessionId)
return chat_history
@app.post('/save-prompt')
async def savePrompt(req: SavePromptRequest):
saved = save_system_prompt(req.sessionId, req.prompt)
return saved
@app.post('/rename-chat')
async def renameChat(req: RenameChatRequest):
renamed = rename_chat_title(req.sessionId, req.title)
return renamed
def _process_documents(contents: bytes, session_id: str, name: str, title: str, summary: str, categories: str) -> str:
ext = Path(name).suffix.lower()
# 1) extract text (blocking)
if ext == ".pdf":
text = extract_text(BytesIO(contents))
elif ext in {".doc", ".docx"}:
text = extract_text_from_docx(contents)
elif ext in {".ppt", ".pptx"}:
text = extract_text_from_pptx(contents)
elif ext in {".xls", ".xlsx"}:
xls = pd.read_excel(BytesIO(contents), sheet_name=None)
parts = []
for sheet, df in xls.items():
parts.append(f"--- Sheet: {sheet} ---")
parts.append(df.to_csv(index=False))
text = "\n".join(parts)
elif ext in {".csv", ".txt"}:
text = contents.decode("utf-8", errors="ignore")
else:
raise ValueError(f"Unsupported extension {ext!r}")
asyncio.run(save_context_detail(
session_id, name, title, summary, categories))
asyncio.run(get_and_store_embeddings(
text, session_id, name, title, summary, categories))
return text
@app.post("/upload-pdf")
async def upload_pdf(
pdf_file: UploadFile = File(...),
name: str = Form(...),
sessionId: str = Form(...),
title: str = Form(...),
summary: str = Form(...),
categories: str = Form(...)
):
try:
ext = Path(name).suffix.lower()
if ext not in ALLOWED_EXT:
raise HTTPException(
400,
detail=(
f"Invalid file type {ext!r}. "
"Allowed: PDF, CSV, TXT, PPT(X), DOC(X), XLS(X)."
)
)
contents = await pdf_file.read()
loop = asyncio.get_running_loop()
text_content = await loop.run_in_executor(
None, # use default ThreadPoolExecutor
_process_documents, # the blocking function
contents, sessionId, name, title, summary, categories
)
# pdf_stream = BytesIO(contents)
# print("Content : ", contents)
# print("pdf_stream : ", pdf_stream)
# try:
# text_content = extract_text(pdf_stream)
# print("pdf content : ", text_content)
# except Exception as e:
# print("error in pdf content : ", e)
# raise HTTPException(
# status_code=400, detail=f"Error extracting text from PDF: {e}")
# print("\n\nSaving details")
# await save_context_detail(sessionId, name)
# print("\n\nSaving embeddings")
# embeddded = await get_and_store_embeddings(text_content, sessionId, name)
return JSONResponse(status_code=200, content={"status": "received", "text": text_content})
except Exception as e:
print("Error in embedding pdf : ", e)
return JSONResponse(status_code=500, content={"status": "failed", "detail": e})
async def fetch_url_content(link: str):
try:
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get(link)
response.raise_for_status() # Raise exception for HTTP errors
return response
except httpx.RequestError as exc:
raise HTTPException(
status_code=400, detail=f"Error fetching the URL: {exc}") from exc
except httpx.HTTPStatusError as exc:
raise HTTPException(
status_code=exc.response.status_code,
detail=f"Error response {exc.response.status_code} while requesting {exc.request.url}"
) from exc
def get_content_type(response: httpx.Response) -> str:
content_type = response.headers.get('Content-Type', '').lower()
if ';' in content_type:
content_type = content_type.split(';')[0].strip()
return content_type
def extract_text_from_pdf(pdf_content: bytes) -> str:
pdf_stream = BytesIO(pdf_content)
try:
text = extract_text(pdf_stream)
return text
except Exception as e:
raise HTTPException(
status_code=400, detail=f"Error extracting text from PDF: {e}") from e
def extract_text_from_html(html_content: str) -> str:
soup = BeautifulSoup(html_content, 'html.parser')
for script_or_style in soup(['script', 'style']):
script_or_style.decompose()
text = soup.get_text(separator='\n')
lines = [line.strip() for line in text.splitlines()]
text = '\n'.join(line for line in lines if line)
return text
def is_supported_domain(url: str) -> bool:
parsed_url = urlparse(url)
unsupported_domains = ['drive.google.com', 'docs.google.com']
return parsed_url.netloc not in unsupported_domains
@app.post("/process-link")
async def process_link(input_data: LinkInput):
try:
link = str(input_data.link)
session_id = input_data.sessionId
title = input_data.title
summary = input_data.summary
categories = input_data.categories
blocked_domains = ("drive.google.com",
"docs.google.com", "dropbox.com")
if any(blocked in link for blocked in blocked_domains):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Links from Google Drive or similar services are not supported. Please provide a direct link to a PDF or a public web page."
)
if not is_supported_domain(link):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Links from Google Drive or similar services are not supported. Please provide a direct link to a PDF or a public web page."
)
response = await fetch_url_content(link)
content_type = get_content_type(response)
text_content = None
extracted_from = None
if content_type.startswith('application/pdf'):
text_content = extract_text_from_pdf(response.content)
extracted_from = 'pdf'
elif content_type.startswith('application/vnd.openxmlformats-officedocument.wordprocessingml.document'):
text_content = extract_text_from_docx(response.content)
extracted_from = 'docx'
elif content_type.startswith('application/vnd.openxmlformats-officedocument.presentationml.presentation'):
text_content = extract_text_from_pptx(response.content)
extracted_from = 'pptx'
elif content_type.startswith('text/html'):
html = response.text
async def try_fetch_readme_raw(urls):
for raw_url in urls:
try:
raw_resp = await fetch_url_content(raw_url)
if raw_resp.status_code == 200 and raw_resp.text.strip():
return raw_resp.text
except Exception:
continue
return None
github_repo_match = re.match(
r"https://github\.com/([^/]+)/([^/]+)(/)?$", link)
if github_repo_match:
user, repo = github_repo_match.group(
1), github_repo_match.group(2)
raw_urls = [
f"https://raw.githubusercontent.com/{user}/{repo}/main/README.md",
f"https://raw.githubusercontent.com/{user}/{repo}/master/README.md"
]
text_content = await try_fetch_readme_raw(raw_urls)
if text_content:
extracted_from = 'github_readme'
if text_content is None:
gitlab_repo_match = re.match(
r"https://gitlab\.com/([^/]+)/([^/]+)(/)?$", link)
if gitlab_repo_match:
user, repo = gitlab_repo_match.group(
1), gitlab_repo_match.group(2)
raw_urls = [
f"https://gitlab.com/{user}/{repo}/-/raw/main/README.md",
f"https://gitlab.com/{user}/{repo}/-/raw/master/README.md"
]
text_content = await try_fetch_readme_raw(raw_urls)
if text_content:
extracted_from = 'gitlab_readme'
if text_content is None and "huggingface.co/" in link:
raw_readme_url = link.rstrip("/") + "/raw/main/README.md"
try:
raw_resp = await fetch_url_content(raw_readme_url)
if raw_resp.status_code == 200 and raw_resp.text.strip():
text_content = raw_resp.text
extracted_from = 'huggingface_readme'
except Exception:
pass
if text_content is None:
try:
doc = ReadabilityDocument(html)
except Exception as e:
print(f"Error creating Document: {e}")
summary_html = doc.summary()
soup = BeautifulSoup(summary_html, "html.parser")
text_content = "\n".join(soup.stripped_strings)
# print("\n\n\n\n\n\nScraped Text : ", text_content)
extracted_from = 'html'
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Unsupported content type: {content_type}"
)
await save_context_detail(session_id, link, title, summary, categories)
await get_and_store_embeddings(text_content, session_id, link, title, summary, categories)
return JSONResponse(
status_code=status.HTTP_200_OK,
content={
"status": "success",
"content_type": extracted_from,
"text": text_content
}
)
except HTTPException as http_exc:
raise http_exc
except Exception as e:
print("Error in uploding link : ", e)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"success": False,
"detail": str(e)
}
)
@app.post("/process-text")
async def process_text(input_data: TextInput):
try:
text = str(input_data.text)
session_id = input_data.sessionId
name = input_data.name
title = input_data.title
summary = input_data.summary
categories = input_data.categories
await save_context_detail(session_id, name, title, summary, categories)
await get_and_store_embeddings(text, session_id, name, title, summary, categories)
return JSONResponse(
status_code=status.HTTP_200_OK,
content={
"status": "success",
"text": text
}
)
except HTTPException as http_exc:
raise http_exc
except Exception as e:
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"success": False,
"detail": str(e)
}
)
@app.post("/clear-context")
async def clearContext(body: clearContextInput):
sessionId = body.sessionId
deleted = delete_embeddings(sessionId)
if deleted:
clear_context_detail(sessionId)
return JSONResponse(
status_code=status.HTTP_200_OK,
content={
"status": "success",
"message": "all the embedding are deleted"
}
)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"status": "failed",
"message": "failed to delete"
}
)
@app.post("/delete-chat")
async def clearChat(body: clearContextInput):
sessionId = body.sessionId
deleted = delete_embeddings(sessionId)
if deleted:
delete_chat(sessionId)
return JSONResponse(
status_code=status.HTTP_200_OK,
content={
"status": "success",
"message": "all the embedding are deleted"
}
)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"status": "failed",
"message": "failed to delete"
}
)
def run_fastapi():
uvicorn.run(app, host="0.0.0.0", port=8082, log_level="info")
threading.Thread(target=run_fastapi, daemon=True).start()