rock / app.py
itschristine73's picture
Update app.py
4ce4d52 verified
import gradio as gr
import json
import os
import io
import pdfplumber
import requests
import together
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import re
import unicodedata
from dotenv import load_dotenv
from flask import jsonify
# Load settings from a local .env file (if one exists) before any
# os.getenv() lookup below reads the environment.
load_dotenv()
# Base URL of the remote storage service. The endpoints derived from it are
# used to upload/download the FAISS index, fetch the index -> filename
# metadata and download the stored PDF files.
# NOTE: the value must be a complete "https://" URL with no surrounding
# whitespace, otherwise every endpoint built from it is invalid.
API_URL = "https://1611-223-233-35-112.ngrok-free.app"
API_URL_FILES = f"{API_URL}/file"
API_URL_EMBEDDINGS = f"{API_URL}/embeddings"
API_URL_METADATA = f"{API_URL}/metadata"
# FAISS index setup
# Dimensionality of the vectors stored in the FAISS index; adjust it to the
# output size of the embedding model configured below.
DIM = 768

# The Together.AI API key is taken from the TOGETHER_API_KEY environment
# variable. Fail fast with an explicit exception rather than an `assert`,
# because assertions are stripped when Python runs with -O.
if not os.getenv("TOGETHER_API_KEY"):
    raise RuntimeError("api key missing")

# Sentence transformer used to embed both documents and queries.
# Alternative model kept for reference: "BAAI/bge-base-en-v1.5".
embedding_model = SentenceTransformer(
    "togethercomputer/m2-bert-80M-8k-retrieval",
    # Allows code shipped with the model repository to be executed on load;
    # only enable this for repositories you trust.
    trust_remote_code=True,
)
# Same value as DIM; kept under its original name for backward compatibility.
embedding_dim = DIM
def store_document_data(PDF_FILE):
    """Embed an uploaded PDF and upload the resulting FAISS index.

    The PDF text is extracted, encoded into a single embedding, stored in a
    freshly created ``IndexFlatL2`` index, written to ``index.bin`` and posted
    to ``API_URL_EMBEDDINGS``.

    Args:
        PDF_FILE: The uploaded PDF as handed over by the Gradio file input
            (anything ``extract_text_from_pdf`` accepts).

    Returns:
        * ``"No PDF file provided."`` when ``PDF_FILE`` is falsy.
        * ``"Could not extract any text from the PDF."`` when extraction
          yields nothing.
        * ``{"error": <message>}`` when the upload fails (network error,
          timeout or non-2xx HTTP status).
        * Otherwise the position of the document inside the index (``int``).

    Raises:
        ValueError: If the FAISS index is still empty after adding the
            embedding.
    """
    print(" Storing document...")
    if not PDF_FILE:
        return "No PDF file provided."

    # Extract text from the PDF
    text = extract_text_from_pdf(PDF_FILE)
    if not text:
        return "Could not extract any text from the PDF."

    # FAISS expects float32 vectors.
    embedding = embedding_model.encode([text]).astype(np.float32)
    print("Embeddings generated")
    print("Embedding shape:", embedding.shape)

    # NOTE(review): a brand-new index is built on every call, so the uploaded
    # file only ever contains this one document and doc_index is its position
    # in that fresh index - confirm this is what the storage API expects.
    index = faiss.IndexFlatL2(embedding.shape[1])
    index.add(embedding)
    print(index, index.ntotal)
    if index.ntotal == 0:
        raise ValueError("FAISS index is empty. No embeddings added.")
    doc_index = index.ntotal - 1

    index_file = "index.bin"
    faiss.write_index(index, index_file)

    print(f"sending to {API_URL_EMBEDDINGS}")
    try:
        with open(index_file, "rb") as f:
            response = requests.post(
                API_URL_EMBEDDINGS,
                files={"file": ("index.bin", f, "application/octet-stream")},
                timeout=30,  # never let a dead backend hang the UI forever
            )
        # A 4xx/5xx answer means the index was NOT stored; report it instead
        # of pretending the upload succeeded.
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        return {"error": str(e)}

    # The response body is only logged; a non-JSON body must not turn a
    # successful upload into an error.
    try:
        print("sent", response.json())
    except ValueError:
        print("sent", response.text)
    return doc_index
def retrieve_document(query):
    """Return the cleaned text of the stored PDF closest to *query*.

    Downloads the FAISS index and the index -> filename metadata from the
    storage API, embeds the query, looks up the nearest vector, downloads the
    matching PDF to ``document.pdf`` and extracts its text.

    Args:
        query: The user's question used for the nearest-neighbour search.

    Returns:
        The extracted document text, or ``None`` when any step fails (network
        error, non-200 response, malformed metadata, unreadable index, no
        matching document, failed file download or failed text extraction).
    """
    print(f"Retrieving document based on:\n{query}")
    timeout = 30  # seconds; never let a dead backend hang the UI forever

    try:
        embeddings_ = requests.get(API_URL_EMBEDDINGS, timeout=timeout)
        metadata_ = requests.get(API_URL_METADATA, timeout=timeout)
    except requests.exceptions.RequestException as e:
        print(f"Error contacting storage API: {e}")
        return None

    # Check for errors before parsing anything
    if embeddings_.status_code != 200:
        print(f"Error fetching embeddings: {embeddings_.status_code} - {embeddings_.text}")
        return None
    if metadata_.status_code != 200:
        print(f"Error fetching metadata: {metadata_.status_code} - {metadata_.text}")
        return None

    try:
        metadata = metadata_.json()['metadata_file']
        print(metadata)
    except (ValueError, KeyError, TypeError) as e:
        # ValueError covers JSON decoding errors; KeyError/TypeError cover a
        # payload that is valid JSON but lacks the expected 'metadata_file'.
        print(f"Error decoding metadata JSON: {e}")
        return None

    try:
        print("Response content length:", len(embeddings_.content))  # Debugging
        if not embeddings_.content:
            raise ValueError("Received empty FAISS index file")
        # faiss.read_index reads from a path, so persist the download first.
        with open("downloaded_index.bin", "wb") as f:
            f.write(embeddings_.content)
        index = faiss.read_index("downloaded_index.bin")
        print(f"✅ Successfully loaded FAISS index with {index.ntotal} vectors.")
    except Exception as e:
        print(f"Error loading FAISS index: {e}")
        return None
    print(index, metadata)

    # Generate query embedding (FAISS expects float32)
    query_embedding = embedding_model.encode([query]).astype(np.float32)

    # Search for the closest document in the FAISS index
    _, closest_idx = index.search(query_embedding, 1)
    best_match = closest_idx[0][0]
    doc_key = str(best_match)  # metadata is keyed by the index position as a string

    # FAISS reports -1 when no neighbour was found
    if best_match == -1 or doc_key not in metadata:
        print("No relevant document found")
        return None

    filename = metadata[doc_key]
    print(filename)
    try:
        response = requests.get(API_URL_FILES, params={"file": filename}, timeout=timeout)
    except requests.exceptions.RequestException as e:
        print(f"Error fetching file: {e}")
        return None

    if response.status_code != 200:
        # Use .text rather than .json(): an error body is not guaranteed to be JSON.
        print(f"Error: {response.status_code}, {response.text}")
        return None

    print("Received file size:", len(response.content))
    received_file = "document.pdf"
    with open(received_file, "wb") as f:
        f.write(response.content)
    prompt_doc = extract_text_from_pdf(received_file)
    print(f"PDF received successfully: received_{filename}")
    return prompt_doc
def clean_text(text):
    """Clean extracted text for better processing by the model.

    Steps, in order:
      1. Normalize Unicode with NFKC.
      2. Replace every character that is not an ASCII letter, a digit or one
         of the kept punctuation marks with a space.
      3. Remove "page <number>" markers (case-insensitive).
      4. Collapse whitespace runs into single spaces and trim both ends.

    Whitespace is collapsed last because steps 2 and 3 themselves introduce
    or leave behind extra spaces.

    Args:
        text: The raw text to clean.

    Returns:
        The cleaned, single-line text.
    """
    print("cleaning")
    text = unicodedata.normalize("NFKC", text)  # Normalize Unicode characters
    text = re.sub(r'[^a-zA-Z0-9.,!?;:\\"()\-]', ' ', text)  # Keep basic punctuation
    text = re.sub(r'(?i)(page\s*\d+)', '', text)  # Remove page numbers
    return re.sub(r'\s+', ' ', text).strip()  # Remove extra spaces and newlines
def extract_text_from_pdf(pdf_file):
    """Return the cleaned text of all pages of *pdf_file*, joined by spaces.

    Pages for which no text could be extracted are skipped. Any failure while
    opening or reading the PDF is printed and reported as ``None``.
    """
    print("extracting")
    try:
        cleaned_pages = []
        with pdfplumber.open(pdf_file) as pdf:
            for page in pdf.pages:
                raw_text = page.extract_text()
                if raw_text:
                    cleaned_pages.append(clean_text(raw_text))
        return " ".join(cleaned_pages)
    except Exception as e:
        print(f"Error extracting text: {e}{pdf_file}")
        return None
def split_text(text, chunk_size=500):
    """Cut *text* into consecutive pieces of at most *chunk_size* characters."""
    print("splitting")
    pieces = []
    for start in range(0, len(text), chunk_size):
        stop = start + chunk_size
        pieces.append(text[start:stop])
    return pieces
def chatbot(user_question):
    """Answer *user_question*, using a retrieved document as context if one is found.

    Returns the model's completion text, or an error message string when the
    completion request fails.
    """
    print("chatbot start")
    # Look up the stored document most relevant to the question.
    doc = retrieve_document(user_question)
    if not doc:
        # Nothing retrieved: the bare question is sent to the model.
        prompt = user_question
    else:
        print(f"found doc:\n{doc}\n")
        pieces = split_text(doc)
        # Only the first chunk goes into the prompt (to optimize token usage).
        prompt = f"Based on this document, answer the question:\n\nDocument:\n{pieces[0]}\n\nQuestion: {user_question}"
        print(f"prompt:\n{prompt}")

    # Send to Together.AI (Mistral-7B)
    request_args = {
        "model": "mistralai/Mistral-7B-Instruct-v0.1",
        "prompt": prompt,
        "max_tokens": 200,
        "temperature": 0.7,
    }
    try:
        print("asking")
        completion = together.Completion.create(**request_args)
        # Hand back the chatbot's response text.
        return completion.choices[0].text
    except Exception as e:
        return f"Error generating response: {e}"
def helloWorld(text):
    """Return *text* followed by the fixed suffix " : hello world"."""
    suffix = " : hello world"
    return "{}{}".format(text, suffix)
# Gradio Interface: each entry point gets its own tab, in this order.
_qa_tab = gr.Interface(
    fn=chatbot,
    inputs=gr.Textbox(label="Ask a Question"),
    outputs=gr.Textbox(label="Answer"),
    title="PDF Q&A Chatbot (Powered by Together.AI)",
)
_hello_tab = gr.Interface(
    fn=helloWorld,
    inputs="text",
    outputs="text",
)
_upload_tab = gr.Interface(
    fn=store_document_data,
    inputs=[gr.File(label="PDF_FILE")],
    outputs=gr.Textbox(label="Answer"),
    title="pdf file, metadata, index parsing and storing",
)
iface = gr.TabbedInterface([_qa_tab, _hello_tab, _upload_tab])

# Launch Gradio app
iface.launch(show_error=True)