print("start1") |
|
import os |
|
import sys |
|
import subprocess |
|
import gradio as gr |
|
from PyPDF2 import PdfReader |
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
from langchain_community.vectorstores import FAISS |
|
from langchain.prompts import PromptTemplate |
|
from langchain.chains import LLMChain |
|
from langchain_community.embeddings import HuggingFaceEmbeddings |
|
from langchain.schema import Document |
|
print("start2") |
|
|
|
|
|
# Install llama-cpp-python only if it is not already importable
try:
    import llama_cpp
    print("llama_cpp already installed.")
except ImportError:
    print("Installing llama-cpp-python (building from source, this may take a while)...")
    subprocess.check_call([
        sys.executable, "-m", "pip", "install",
        "llama-cpp-python", "--no-binary", ":all:", "--force-reinstall"
    ])

from llama_cpp import Llama

print("start3") |
|
import warnings |
|
warnings.filterwarnings("ignore") |
|
|
|
print("Start") |
|
import subprocess |
|
|
|
subprocess.run([ |
|
"huggingface-cli", "download", |
|
"TheBloke/Mistral-7B-Instruct-v0.1-GGUF", |
|
"mistral-7b-instruct-v0.1.Q2_K.gguf", |
|
"--local-dir", "./models", |
|
"--local-dir-use-symlinks", "False" |
|
], check=True) |
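
# An equivalent way to fetch the model without shelling out to the CLI is the
# huggingface_hub Python API. Shown here only as a commented sketch (not executed);
# it assumes the huggingface_hub package behind the CLI above is importable:
#
#   from huggingface_hub import hf_hub_download
#   hf_hub_download(
#       repo_id="TheBloke/Mistral-7B-Instruct-v0.1-GGUF",
#       filename="mistral-7b-instruct-v0.1.Q2_K.gguf",
#       local_dir="./models",
#   )
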
# ------------------------------
# Device and Embedding Setup (CPU optimized)
# ------------------------------
modelPath = "sentence-transformers/all-mpnet-base-v2"
model_kwargs = {"device": "cpu"}  # Force CPU usage
encode_kwargs = {"normalize_embeddings": False}  # sentence-transformers expects "normalize_embeddings"

embeddings = HuggingFaceEmbeddings(
    model_name=modelPath,
    model_kwargs=model_kwargs,
    encode_kwargs=encode_kwargs
)
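
# Optional sanity check (commented out, not part of the pipeline): all-mpnet-base-v2
# produces 768-dimensional vectors, so a quick way to confirm the embedder loaded is:
#
#   vec = embeddings.embed_query("hello world")
#   print(len(vec))  # expected: 768
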
# ------------------------------
# Load Mistral GGUF via llama.cpp (CPU optimized)
# ------------------------------
llm_cpp = Llama(
    model_path="./models/mistral-7b-instruct-v0.1.Q2_K.gguf",
    n_ctx=2048,
    n_threads=4,    # Adjust based on your CPU cores
    n_gpu_layers=0  # Force CPU-only
)
# Note: sampling settings (temperature, top_p, repeat_penalty) are per-call
# arguments in llama-cpp-python, so they are passed in mistral_llm() below
# rather than to the Llama constructor, where they would be ignored.

# ------------------------------
# LangChain-compatible wrapper
# ------------------------------
def mistral_llm(prompt):
    output = llm_cpp(
        prompt,
        max_tokens=512,  # Reduced for CPU performance
        temperature=0.7,
        top_p=0.9,
        repeat_penalty=1.1,
        stop=["</s>", "[INST]", "[/INST]"]
    )
    return output["choices"][0]["text"].strip()
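
# Usage sketch (not executed): the wrapper expects a fully formatted Mistral-instruct
# prompt and returns plain text, e.g.
#
#   print(mistral_llm("<s>[INST] Say hello in one short sentence. [/INST]"))
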
# ------------------------------
# Prompt Template (unchanged)
# ------------------------------
def get_qa_prompt():
    template = """<s>[INST] \
You are a helpful, knowledgeable AI assistant. Answer the user's question based on the provided context.

Guidelines:
- Respond in a natural, conversational tone
- Be detailed but concise
- Use paragraphs and bullet points when appropriate
- If you don't know, say so
- Maintain a friendly and professional demeanor

Conversation History:
{chat_history}

Relevant Context:
{context}

Current Question: {question}

Provide a helpful response: [/INST]"""
    return PromptTemplate(
        template=template,
        input_variables=["context", "question", "chat_history"]
    )

# ------------------------------
# PDF and Chat Logic (optimized for CPU)
# ------------------------------
def pdf_text(pdf_docs):
    # Extract raw text from every page of every uploaded PDF
    text = ""
    for doc in pdf_docs:
        reader = PdfReader(doc)
        for page in reader.pages:
            page_text = page.extract_text()
            if page_text:
                text += page_text + "\n"
    return text


def get_chunks(text):
    # Split the text into overlapping chunks wrapped as LangChain Documents
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,  # Smaller chunks for CPU
        chunk_overlap=100,
        length_function=len
    )
    chunks = splitter.split_text(text)
    return [Document(page_content=chunk) for chunk in chunks]


def get_vectorstore(documents):
    # Build a FAISS index over the chunks and persist it to ./faiss_index
    db = FAISS.from_documents(documents, embedding=embeddings)
    db.save_local("faiss_index")
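
# The persisted index is reloaded on every question (see user_query below):
#
#   db = FAISS.load_local("faiss_index", embeddings, allow_dangerous_deserialization=True)
#
# The flag is needed because load_local unpickles the stored docstore; only enable it
# for indexes you created yourself.
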
def format_chat_history(history):
    # Keep only the last 2 turns so the prompt stays small enough for CPU inference
    return "\n".join([f"User: {q}\nAssistant: {a}" for q, a in history[-2:]])


def handle_pdf_upload(pdf_files):
    if not pdf_files:
        return "⚠️ Upload at least one PDF"
    try:
        text = pdf_text(pdf_files)
        if not text.strip():
            return "⚠️ Could not extract text"
        chunks = get_chunks(text)
        get_vectorstore(chunks)
        return f"✅ Processed {len(pdf_files)} PDF(s) with {len(chunks)} chunks"
    except Exception as e:
        return f"❌ Error: {str(e)}"

def user_query(msg, chat_history):
    if not os.path.exists("faiss_index"):
        chat_history.append((msg, "Please upload PDF documents first."))
        return "", chat_history

    try:
        db = FAISS.load_local("faiss_index", embeddings, allow_dangerous_deserialization=True)
        retriever = db.as_retriever(search_kwargs={"k": 2})  # Fewer documents for CPU
        docs = retriever.get_relevant_documents(msg)
        context = "\n\n".join([d.page_content for d in docs][:2])  # Limit context

        prompt = get_qa_prompt()
        final_prompt = prompt.format(
            context=context[:1500],  # Further limit context size
            question=msg,
            chat_history=format_chat_history(chat_history)
        )

        response = mistral_llm(final_prompt)
        chat_history.append((msg, response))
        return "", chat_history
    except Exception as e:
        error_msg = f"Sorry, I encountered an error: {str(e)}"
        chat_history.append((msg, error_msg))
        return "", chat_history

# ------------------------------
# Gradio Interface (your exact requested format)
# ------------------------------
with gr.Blocks(theme=gr.themes.Soft(), title="PDF Chat Assistant") as demo:
    with gr.Row():
        gr.Markdown("""
# 📚 PDF Chat Assistant
### Have natural conversations with your documents (Note: this Space runs on CPU, so responses may take a few minutes.)
        """)

    with gr.Row():
        with gr.Column(scale=1, min_width=300):
            gr.Markdown("### Document Upload")
            pdf_input = gr.File(
                file_types=[".pdf"],
                file_count="multiple",
                label="Upload PDFs",
                height=100
            )
            upload_btn = gr.Button("Process Documents", variant="primary")
            status_box = gr.Textbox(label="Status", interactive=False)
            gr.Markdown("""
**Instructions:**
1. Upload PDF documents
2. Click Process Documents
3. Start chatting in the right panel
            """)

        with gr.Column(scale=2):
            chatbot = gr.Chatbot(
                height=600,
                bubble_full_width=False,
                avatar_images=(
                    "user.png",
                    "bot.png"
                )
            )

            with gr.Row():
                message = gr.Textbox(
                    placeholder="Type your question about the documents...",
                    show_label=False,
                    container=False,
                    scale=7,
                    autofocus=True
                )
                submit_btn = gr.Button("Send", variant="primary", scale=1)

            with gr.Row():
                clear_chat = gr.Button("🧹 Clear Conversation")
                examples = gr.Examples(
                    examples=[
                        "Summarize the key points from the documents",
                        "What are the main findings?",
                        "Explain this in simpler terms"
                    ],
                    inputs=message,
                    label="Example Questions"
                )

    upload_btn.click(handle_pdf_upload, inputs=pdf_input, outputs=status_box)
    submit_btn.click(user_query, inputs=[message, chatbot], outputs=[message, chatbot])
    message.submit(user_query, inputs=[message, chatbot], outputs=[message, chatbot])
    clear_chat.click(lambda: [], None, chatbot, queue=False)
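
    # Note: "Clear Conversation" only resets the chat display; the FAISS index built
    # from the uploaded PDFs stays on disk, so processed documents remain queryable.
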
if __name__ == "__main__":
    demo.launch()  # Sharing is off by default, which suits local CPU use