# PDFChat / app.py
# getGO007's picture
# Update app.py
# f7831ce verified
import os
# Startup diagnostics: report WHETHER each API key is configured.
# The secret values themselves are deliberately never printed, because
# anything written to stdout ends up in the application logs.
print(">>> DEBUG: Environment Variables at Startup <<<")
for var in ("OPENAI_API_KEY", "LLAMA_CLOUD_API_KEY"):
    print(f"{var} = {'<set>' if os.getenv(var) else '<missing>'}")
# import openai
import shutil
import asyncio
from pathlib import Path
import nest_asyncio
# Applied once at import time, before any other library sets up an event loop.
# NOTE(review): presumably needed so that synchronous LlamaParse / LlamaIndex
# calls can run from inside the async Gradio handler defined later in this
# file (nested event loops) — confirm before removing.
nest_asyncio.apply()
import gradio as gr
from PyPDF2 import PdfReader # pip install PyPDF2
from llama_parse import LlamaParse
from llama_index.core import (
Settings, VectorStoreIndex, StorageContext, load_index_from_storage
)
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.tools import QueryEngineTool
from llama_index.core.query_engine import SubQuestionQueryEngine
from llama_index.core.agent.workflow import FunctionAgent
from llama_index.core.workflow import Context
# ---- 1. Global Settings & API Keys ----
# Keys are read once from the environment at import time. Names assigned at
# module level are already global, so no `global` declaration is needed.
# NOTE(review): OPENAI_API_KEY is not passed explicitly to any client in this
# file — presumably the OpenAI classes pick it up from the environment; confirm.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
LLAMA_CLOUD_API_KEY = os.getenv("LLAMA_CLOUD_API_KEY")

# Process-wide LlamaIndex defaults: the LLM and embedding model used when a
# component is not given one explicitly, plus the chunking parameters.
Settings.llm = OpenAI(model="gpt-4o")
Settings.embed_model = OpenAIEmbedding(model_name="text-embedding-3-large")
Settings.chunk_size = 512
Settings.chunk_overlap = 64
# ---- 2. Parser Setup ----
# Instruction text handed to LlamaParse describing how each slide deck should
# be rendered.
_SLIDE_DECK_GUIDELINE = (
    "You are processing a PDF slide deck. Produce Markdown with slide metadata, "
    "cleaned bullets, tables, charts summaries, figures captions, metrics, "
    "and a 1–2 sentence takeaway."
)

# Log only whether the key is present, never its value.
print(
    ">>> DEBUG: About to init LlamaParse with key:",
    os.environ.get("LLAMA_CLOUD_API_KEY") is not None,
)

# Shared parser instance used by the upload handler.
# A custom base URL (LLAMA_CLOUD_BASE_URL) is currently not configured.
parser = LlamaParse(
    api_key=LLAMA_CLOUD_API_KEY,
    result_type="markdown",
    content_guideline_instruction=_SLIDE_DECK_GUIDELINE,
    verbose=True,
)
# Create the working directories up front; existing ones are left untouched.
for _dirname in ("./user_data", "./index_data"):
    Path(_dirname).mkdir(exist_ok=True)
# ---- 3a. Upload + Answer Logic ----
_MAX_FILES = 5   # maximum number of PDFs accepted per request
_MAX_PAGES = 50  # maximum number of pages accepted per PDF


def _upload_path(file_obj) -> str:
    """Return the on-disk path of one uploaded file.

    Accepts either a plain path (``str`` / ``os.PathLike``) or an object that
    exposes its path through a ``.name`` attribute.
    """
    if isinstance(file_obj, (str, os.PathLike)):
        return str(os.fspath(file_obj))
    return str(file_obj.name)


def _unique_tool_name(stem: str, taken: set) -> str:
    """Derive a tool name from a file stem and record it in *taken*.

    Tool names are sent to the OpenAI API as function names, which only allow
    ASCII letters, digits, ``_`` and ``-`` with a maximum length of 64, so any
    other character (spaces, dots, non-ASCII, ...) is replaced by ``_``.
    A numeric suffix is added if the cleaned name was already used.
    """
    cleaned = "".join(
        ch if ch.isascii() and (ch.isalnum() or ch in "_-") else "_"
        for ch in stem
    )
    base = f"vector_index_{cleaned}"[:64]
    name, counter = base, 1
    while name in taken:
        suffix = f"_{counter}"
        name = base[: 64 - len(suffix)] + suffix
        counter += 1
    taken.add(name)
    return name


async def answer(uploaded_files: list[gr.FileData], question: str) -> str:
    """Answer *question* using the uploaded PDF slide decks.

    For each upload the handler checks the page count, copies the PDF into
    ``./user_data``, then loads the index persisted under
    ``./index_data/<file stem>`` or, if none exists yet, parses the PDF with
    LlamaParse and builds and persists a new vector index. Every index is
    wrapped in a query tool; a sub-question engine over all of them is added
    as an extra tool, and a function-calling agent produces the answer.

    Args:
        uploaded_files: Uploaded PDFs (at most 5, each at most 50 pages).
        question: The user's question.

    Returns:
        The agent's answer as text, or a message starting with "❗" when the
        input is rejected or a PDF cannot be read.
    """
    print(f">>> DEBUG: answer() called. OPENAI key set? {os.getenv('OPENAI_API_KEY') is not None}")
    print(f">>> DEBUG: answer() called. LLAMA key set? {os.getenv('LLAMA_CLOUD_API_KEY') is not None}")

    if not uploaded_files:
        return "❗ Please upload at least one PDF."
    if len(uploaded_files) > _MAX_FILES:
        return f"❗ You can upload up to {_MAX_FILES} PDF files."
    if not question or not question.strip():
        return "❗ Please enter a question."

    tools = []
    used_names = set()
    for file_obj in uploaded_files:
        src = _upload_path(file_obj)

        # 1) Page-count check. Reading `.pages` stays inside the try so that a
        #    malformed PDF is reported to the user instead of raising.
        try:
            page_count = len(PdfReader(src).pages)
        except Exception as e:
            return f"❗ Error reading {src}: {e}"
        if page_count > _MAX_PAGES:
            return f"❗ {Path(src).name} has {page_count} pages (>{_MAX_PAGES})."

        # 2) Copy PDF into user_data
        dest = Path("./user_data") / Path(src).name
        shutil.copyfile(src, dest)

        # 3) Index folder per file stem
        stem = dest.stem
        idx_dir = Path("./index_data") / stem

        # 4) Load the persisted index, or parse + build + persist a new one.
        #    Parsing is only done on the build path: when an index is loaded
        #    from disk the parsed documents would never be used.
        if idx_dir.is_dir() and any(idx_dir.iterdir()):
            sc = StorageContext.from_defaults(persist_dir=str(idx_dir))
            idx = load_index_from_storage(sc)
        else:
            docs = parser.load_data(dest)
            sc = StorageContext.from_defaults()
            idx = VectorStoreIndex.from_documents(docs, storage_context=sc)
            sc.persist(persist_dir=str(idx_dir))

        # 5) Wrap in QueryEngineTool
        tools.append(
            QueryEngineTool.from_defaults(
                query_engine=idx.as_query_engine(),
                name=_unique_tool_name(stem, used_names),
                description=f"Query engine for {stem}.pdf",
            )
        )

    # 6) Combine the per-file tools into a SubQuestionQueryEngine. A copy of
    #    the list is passed so that appending the combined tool afterwards
    #    cannot feed the engine back into itself.
    subq = SubQuestionQueryEngine.from_defaults(query_engine_tools=list(tools))
    tools.append(
        QueryEngineTool.from_defaults(
            query_engine=subq,
            name="sub_question_query_engine",
            description="Multi-file comparative queries",
        )
    )
    agent = FunctionAgent(tools=tools, llm=OpenAI(model="gpt-4o"))
    ctx = Context(agent)

    # 7) Run agent
    resp = await agent.run(question, ctx=ctx)
    return str(resp)
# ---- 3b. Remove Documents Logic ----
def remove_docs(filenames: str) -> str:
    """Delete stored PDFs and their index folders.

    Args:
        filenames: Comma-separated list of exact PDF filenames (with .pdf).
            Blank entries (e.g. from a trailing comma) are ignored.

    Returns:
        A status message listing the names that were removed and the names
        for which nothing was found. A name counts as removed when its PDF
        under ./user_data/ and/or its index folder under ./index_data/ was
        deleted.

    Only bare filenames are accepted. Anything containing a directory part
    (``../x.pdf``, ``/abs/x.pdf``) or equal to ``.`` / ``..`` is reported as
    not found and never touched, so the deletion cannot escape the two data
    directories or wipe them wholesale.
    """
    prompt = "❗ Enter at least one filename to remove."
    if not filenames or not filenames.strip():
        return prompt

    names = [part.strip() for part in filenames.split(",") if part.strip()]
    if not names:
        return prompt

    removed, not_found = [], []
    for name in names:
        # Path(name).name differs from name as soon as a separator is present;
        # ".." has to be rejected explicitly because its .name is "..".
        if name in (".", "..") or Path(name).name != name:
            not_found.append(name)
            continue

        pdf_path = Path("./user_data") / name
        idx_path = Path("./index_data") / Path(name).stem

        deleted = False
        if pdf_path.is_file():
            pdf_path.unlink()
            deleted = True
        if idx_path.is_dir():
            shutil.rmtree(idx_path)
            deleted = True

        (removed if deleted else not_found).append(name)

    lines = []
    if removed:
        lines.append(f"✅ Removed: {', '.join(removed)}.")
    if not_found:
        lines.append(f"⚠️ Not found: {', '.join(not_found)}.")
    return "\n".join(lines)
# ---- 4. Gradio UI ----
# Two tabs: one to upload PDFs and ask a question (wired to `answer`), one to
# delete previously stored documents by filename (wired to `remove_docs`).
with gr.Blocks() as demo:
    gr.Markdown("# 📄 PDF Slide Deck Q&A Bot")

    with gr.Tab("Ask Questions"):
        # NOTE(review): the upload button and the question box are assumed to
        # share one row; confirm against the intended layout.
        with gr.Row():
            file_input = gr.UploadButton(
                "Upload up to 5 PDFs",
                file_types=[".pdf"],
                file_count="multiple"
            )
            question = gr.Textbox(
                lines=2,
                placeholder="Ask your question about the uploaded slide decks..."
            )
        output = gr.Textbox(label="Answer")
        ask_btn = gr.Button("Ask")
        ask_btn.click(
            fn=answer,
            inputs=[file_input, question],
            outputs=output
        )

    with gr.Tab("Remove Documents"):
        remove_input = gr.Textbox(
            lines=1,
            placeholder="e.g. Q1-Slides.pdf, Q2-Slides.pdf"
        )
        remove_output = gr.Textbox(label="Removal Status")
        remove_btn = gr.Button("Remove Docs")
        remove_btn.click(
            fn=remove_docs,
            inputs=remove_input,
            outputs=remove_output
        )

# Start the web server only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch()