Spaces:
Running
Running
import gradio as gr | |
import groq | |
import os | |
import tempfile | |
import uuid | |
from dotenv import load_dotenv | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain_community.vectorstores import FAISS | |
from langchain_huggingface import HuggingFaceEmbeddings | |
import fitz # PyMuPDF | |
import base64 | |
from PIL import Image | |
import io | |
import requests | |
import json | |
import re | |
from datetime import datetime, timedelta | |
import arxiv | |
# Load environment variables | |
load_dotenv() | |
client = groq.Client(api_key=os.getenv("GROQ_LEGAL_API_KEY")) | |
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") | |
# Directory to store FAISS indexes | |
FAISS_INDEX_DIR = "faiss_indexes_academic" | |
if not os.path.exists(FAISS_INDEX_DIR): | |
os.makedirs(FAISS_INDEX_DIR) | |
# Dictionary to store user-specific vectorstores | |
user_vectorstores = {} | |
# Custom CSS for Academic theme | |
custom_css = """ | |
:root { | |
--primary-color: #003366; /* Deep Blue */ | |
--secondary-color: #000080; /* Navy */ | |
--light-background: #F5F5F5; /* Light Gray */ | |
--dark-text: #333333; | |
--white: #FFFFFF; | |
--border-color: #E5E7EB; | |
} | |
body { background-color: var(--light-background); font-family: 'Inter', sans-serif; } | |
.container { max-width: 1200px !important; margin: 0 auto !important; padding: 10px; } | |
.header { background-color: var(--white); border-bottom: 2px solid var(--border-color); padding: 15px 0; margin-bottom: 20px; border-radius: 12px 12px 0 0; box-shadow: 0 2px 4px rgba(0,0,0,0.05); } | |
.header-title { color: var(--secondary-color); font-size: 1.8rem; font-weight: 700; text-align: center; } | |
.header-subtitle { color: var(--dark-text); font-size: 1rem; text-align: center; margin-top: 5px; } | |
.chat-container { border-radius: 12px !important; box-shadow: 0 4px 6px rgba(0,0,0,0.1) !important; background-color: var(--white) !important; border: 1px solid var(--border-color) !important; min-height: 500px; } | |
.message-user { background-color: var(--primary-color) !important; color: var(--white) !important; border-radius: 18px 18px 4px 18px !important; padding: 12px 16px !important; margin-left: auto !important; max-width: 80% !important; } | |
.message-bot { background-color: #F0F0F0 !important; color: var(--dark-text) !important; border-radius: 18px 18px 18px 4px !important; padding: 12px 16px !important; margin-right: auto !important; max-width: 80% !important; } | |
.input-area { background-color: var(--white) !important; border-top: 1px solid var(--border-color) !important; padding: 12px !important; border-radius: 0 0 12px 12px !important; } | |
.input-box { border: 1px solid var(--border-color) !important; border-radius: 24px !important; padding: 12px 16px !important; box-shadow: 0 2px 4px rgba(0,0,0,0.05) !important; } | |
.send-btn { background-color: var(--secondary-color) !important; border-radius: 24px !important; color: var(--white) !important; padding: 10px 20px !important; font-weight: 500 !important; } | |
.clear-btn { background-color: #F0F0F0 !important; border: 1px solid var(--border-color) !important; border-radius: 24px !important; color: var(--dark-text) !important; padding: 8px 16px !important; font-weight: 500 !important; } | |
.pdf-viewer-container { border-radius: 12px !important; box-shadow: 0 4px 6px rgba(0,0,0,0.1) !important; background-color: var(--white) !important; border: 1px solid var(--border-color) !important; padding: 20px; } | |
.pdf-viewer-image { max-width: 100%; height: auto; border: 1px solid var(--border-color); border-radius: 12px; box-shadow: 0 2px 4px rgba(0,0,0,0.05); } | |
.stats-box { background-color: #E6E6FA; padding: 10px; border-radius: 8px; margin-top: 10px; } | |
.tool-container { background-color: var(--white); border-radius: 12px; box-shadow: 0 4px 6px rgba(0,0,0,0.1); padding: 15px; margin-bottom: 20px; } | |
.paper-card { border-left: 3px solid var(--primary-color); padding: 10px; margin: 15px 0; background-color: #F8F9FA; border-radius: 8px; } | |
.paper-title { font-weight: bold; color: var(--primary-color); font-size: 1.1rem; margin-bottom: 5px; } | |
.paper-authors { color: var(--dark-text); font-size: 0.9rem; margin-bottom: 5px; } | |
.paper-abstract { font-size: 0.95rem; margin: 10px 0; } | |
.paper-meta { color: #666; font-size: 0.85rem; display: flex; justify-content: space-between; } | |
.citation-box { background-color: #F0F0F8; border: 1px solid #D1D5DB; border-radius: 8px; padding: 15px; margin: 10px 0; font-family: monospace; white-space: pre-wrap; } | |
""" | |
# Function to process PDF files (unchanged) | |
def process_pdf(pdf_file): | |
if pdf_file is None: | |
return None, "No file uploaded", {"page_images": [], "total_pages": 0, "total_words": 0} | |
try: | |
session_id = str(uuid.uuid4()) | |
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file: | |
temp_file.write(pdf_file) | |
pdf_path = temp_file.name | |
doc = fitz.open(pdf_path) | |
texts = [page.get_text() for page in doc] | |
page_images = [] | |
for page in doc: | |
pix = page.get_pixmap() | |
img_bytes = pix.tobytes("png") | |
img_base64 = base64.b64encode(img_bytes).decode("utf-8") | |
page_images.append(img_base64) | |
total_pages = len(doc) | |
total_words = sum(len(text.split()) for text in texts) | |
doc.close() | |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200) | |
chunks = text_splitter.create_documents(texts) | |
vectorstore = FAISS.from_documents(chunks, embeddings) | |
index_path = os.path.join(FAISS_INDEX_DIR, session_id) | |
vectorstore.save_local(index_path) | |
user_vectorstores[session_id] = vectorstore | |
os.unlink(pdf_path) | |
pdf_state = {"page_images": page_images, "total_pages": total_pages, "total_words": total_words} | |
return session_id, f"✅ Successfully processed {len(chunks)} text chunks from your PDF", pdf_state | |
except Exception as e: | |
if "pdf_path" in locals() and os.path.exists(pdf_path): | |
os.unlink(pdf_path) | |
return None, f"Error processing PDF: {str(e)}", {"page_images": [], "total_pages": 0, "total_words": 0} | |
# Function to generate chatbot responses with Academic theme | |
def generate_response(message, session_id, model_name, history): | |
if not message: | |
return history | |
try: | |
context = "" | |
if session_id and session_id in user_vectorstores: | |
vectorstore = user_vectorstores[session_id] | |
docs = vectorstore.similarity_search(message, k=3) | |
if docs: | |
context = "\n\nRelevant information from uploaded PDF:\n" + "\n".join(f"- {doc.page_content}" for doc in docs) | |
# Check if it's a special command for paper search | |
if message.lower().startswith("/paper ") or message.lower().startswith("/arxiv "): | |
query = message.split(" ", 1)[1] | |
paper_results = search_arxiv(query) | |
if paper_results: | |
response = "**Academic Paper Search Results:**\n\n" | |
for paper in paper_results[:3]: # Limit to top 3 results | |
response += f"**{paper['title']}**\n" | |
response += f"Authors: {paper['authors']}\n" | |
response += f"Published: {paper['published']}\n" | |
response += f"Summary: {paper['summary'][:250]}...\n" | |
response += f"Link: {paper['url']}\n\n" | |
history.append((message, response)) | |
return history | |
else: | |
history.append((message, "No paper results found for your query.")) | |
return history | |
# Check if it's a citation request | |
citation_match = re.search(r'/cite\s+(.+)', message, re.IGNORECASE) | |
if citation_match: | |
search_term = citation_match.group(1).strip() | |
try: | |
paper = search_paper_by_title(search_term) | |
if paper: | |
citations = generate_citations(paper) | |
response = f"**Citation for '{paper['title']}':**\n\n" | |
response += f"APA: {citations['apa']}\n\n" | |
response += f"MLA: {citations['mla']}\n\n" | |
response += f"Chicago: {citations['chicago']}\n\n" | |
history.append((message, response)) | |
return history | |
else: | |
history.append((message, f"Sorry, I couldn't find a paper matching '{search_term}'. Please try a more specific title.")) | |
return history | |
except Exception as e: | |
history.append((message, f"Error generating citation: {str(e)}")) | |
return history | |
system_prompt = "You are an academic assistant specializing in analyzing research papers, theses, and scholarly articles." | |
system_prompt += " You can help with understanding academic content, summarizing research findings, and explaining scholarly concepts." | |
if context: | |
system_prompt += " Use the following context to answer the question if relevant: " + context | |
completion = client.chat.completions.create( | |
model=model_name, | |
messages=[ | |
{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": message} | |
], | |
temperature=0.7, | |
max_tokens=1024 | |
) | |
response = completion.choices[0].message.content | |
history.append((message, response)) | |
return history | |
except Exception as e: | |
history.append((message, f"Error generating response: {str(e)}")) | |
return history | |
# Functions to update PDF viewer (unchanged) | |
def update_pdf_viewer(pdf_state): | |
if not pdf_state["total_pages"]: | |
return 0, None, "No PDF uploaded yet" | |
try: | |
img_data = base64.b64decode(pdf_state["page_images"][0]) | |
img = Image.open(io.BytesIO(img_data)) | |
return pdf_state["total_pages"], img, f"**Total Pages:** {pdf_state['total_pages']}\n**Total Words:** {pdf_state['total_words']}" | |
except Exception as e: | |
print(f"Error decoding image: {e}") | |
return 0, None, "Error displaying PDF" | |
def update_image(page_num, pdf_state): | |
if not pdf_state["total_pages"] or page_num < 1 or page_num > pdf_state["total_pages"]: | |
return None | |
try: | |
img_data = base64.b64decode(pdf_state["page_images"][page_num - 1]) | |
img = Image.open(io.BytesIO(img_data)) | |
return img | |
except Exception as e: | |
print(f"Error decoding image: {e}") | |
return None | |
# Academic-specific tools | |
def search_arxiv(query, max_results=10, sort_by=arxiv.SortCriterion.Relevance): | |
"""Search for papers on arXiv""" | |
try: | |
search = arxiv.Search( | |
query=query, | |
max_results=max_results, | |
sort_by=sort_by | |
) | |
results = [] | |
for paper in search.results(): | |
results.append({ | |
"title": paper.title, | |
"authors": ", ".join(author.name for author in paper.authors), | |
"summary": paper.summary, | |
"published": paper.published.strftime("%Y-%m-%d"), | |
"url": paper.pdf_url, | |
"arxiv_id": paper.entry_id.split("/")[-1], | |
"categories": ", ".join(paper.categories) | |
}) | |
return results | |
except Exception as e: | |
print(f"Error searching arXiv: {e}") | |
return [] | |
def search_semantic_scholar(query, fields="title,authors,abstract,year,venue,externalIds"): | |
"""Search for papers using Semantic Scholar API""" | |
api_key = os.getenv("SEMANTIC_SCHOLAR_API_KEY", "") | |
try: | |
headers = {} | |
if api_key: | |
headers["x-api-key"] = api_key | |
params = { | |
"query": query, | |
"fields": fields, | |
"limit": 10 | |
} | |
response = requests.get( | |
"https://api.semanticscholar.org/graph/v1/paper/search", | |
headers=headers, | |
params=params | |
) | |
if response.status_code != 200: | |
print(f"API Error: {response.status_code} - {response.text}") | |
return [] | |
data = response.json() | |
results = [] | |
for paper in data.get("data", []): | |
authors = ", ".join([author.get("name", "") for author in paper.get("authors", [])]) | |
paper_data = { | |
"title": paper.get("title", "Unknown Title"), | |
"authors": authors, | |
"abstract": paper.get("abstract", "No abstract available"), | |
"year": paper.get("year", "Unknown Year"), | |
"venue": paper.get("venue", "Unknown Venue"), | |
"s2_id": paper.get("paperId", ""), | |
"url": f"https://www.semanticscholar.org/paper/{paper.get('paperId', '')}" | |
} | |
# Extract external IDs if available | |
external_ids = paper.get("externalIds", {}) | |
if external_ids: | |
if "DOI" in external_ids: | |
paper_data["doi"] = external_ids["DOI"] | |
if "ArXiv" in external_ids: | |
paper_data["arxiv_id"] = external_ids["ArXiv"] | |
results.append(paper_data) | |
return results | |
except Exception as e: | |
print(f"Error in Semantic Scholar search: {e}") | |
return [] | |
def search_paper_by_title(title): | |
"""Search for a specific paper by title to generate citations""" | |
try: | |
# Try Semantic Scholar first | |
results = search_semantic_scholar(title) | |
if results: | |
return results[0] # Return the top match | |
# Fallback to arXiv | |
results = search_arxiv(title, max_results=1) | |
if results: | |
return results[0] | |
return None | |
except Exception as e: | |
print(f"Error searching for paper: {e}") | |
return None | |
def generate_citations(paper): | |
"""Generate citations in various formats""" | |
try: | |
# Get current year for citations | |
current_year = datetime.now().year | |
# Extract author surnames for citations | |
author_list = paper.get("authors", "").split(", ") | |
first_author_surname = author_list[0].split()[-1] if author_list else "Unknown" | |
# Publication year | |
year = paper.get("year", current_year) | |
# Title | |
title = paper.get("title", "Unknown Title") | |
# Publication venue | |
venue = paper.get("venue", "") | |
# URLs | |
url = paper.get("url", "") | |
doi = paper.get("doi", "") | |
doi_url = f"https://doi.org/{doi}" if doi else "" | |
# Create citations | |
apa = f"{first_author_surname}" | |
if len(author_list) > 1: | |
apa += " et al." | |
apa += f" ({year}). {title}. " | |
if venue: | |
apa += f"{venue}. " | |
if doi: | |
apa += f"https://doi.org/{doi}" | |
elif url: | |
apa += url | |
mla = f"{first_author_surname}" | |
if len(author_list) > 1: | |
mla += " et al." | |
mla += f". \"{title}.\" " | |
if venue: | |
mla += f"{venue}, " | |
mla += f"{year}. " | |
if doi: | |
mla += f"DOI: {doi}." | |
elif url: | |
mla += f"Web: {url}." | |
chicago = f"{first_author_surname}" | |
if len(author_list) > 1: | |
chicago += " et al." | |
chicago += f". \"{title}.\" " | |
if venue: | |
chicago += f"{venue} " | |
chicago += f"({year})" | |
if doi or url: | |
chicago += f". Accessed {datetime.now().strftime('%B %d, %Y')}" | |
return { | |
"apa": apa, | |
"mla": mla, | |
"chicago": chicago | |
} | |
except Exception as e: | |
print(f"Error generating citations: {e}") | |
return { | |
"apa": "Error generating APA citation", | |
"mla": "Error generating MLA citation", | |
"chicago": "Error generating Chicago citation" | |
} | |
def perform_paper_search(query, source, category, sort_by, max_results): | |
"""Unified paper search handler""" | |
try: | |
if not query: | |
return "Please enter a search query" | |
if source == "arXiv": | |
# Map string sort option to arXiv sort criterion | |
sort_criterion = arxiv.SortCriterion.Relevance | |
if sort_by == "date": | |
sort_criterion = arxiv.SortCriterion.SubmittedDate | |
# Refine query with category if specified | |
search_query = query | |
if category and category != "all": | |
search_query = f"{query} AND cat:{category}" | |
results = search_arxiv(search_query, int(max_results), sort_criterion) | |
else: # Semantic Scholar | |
results = search_semantic_scholar(query) | |
if not results: | |
return "No results found. Try different search terms." | |
# Format results as markdown | |
markdown = "## Paper Search Results\n\n" | |
for i, paper in enumerate(results, 1): | |
markdown += f"### {i}. {paper['title']}\n\n" | |
markdown += f"**Authors:** {paper['authors']}\n\n" | |
if source == "arXiv": | |
markdown += f"**Published:** {paper['published']}\n" | |
markdown += f"**Categories:** {paper.get('categories', 'N/A')}\n" | |
markdown += f"**arXiv ID:** {paper.get('arxiv_id', 'N/A')}\n\n" | |
else: | |
markdown += f"**Year:** {paper.get('year', 'N/A')}\n" | |
markdown += f"**Venue:** {paper.get('venue', 'N/A')}\n" | |
if paper.get('doi'): | |
markdown += f"**DOI:** {paper.get('doi')}\n\n" | |
markdown += f"**Abstract:** {paper.get('summary', paper.get('abstract', 'No abstract available'))[:500]}...\n\n" | |
markdown += f"[View Paper]({paper['url']})\n\n" | |
markdown += "---\n\n" | |
return markdown | |
except Exception as e: | |
return f"Error searching for papers: {str(e)}" | |
def generate_citation_from_search(query): | |
"""Search for a paper and generate citations""" | |
try: | |
if not query: | |
return "Please enter a paper title to cite" | |
paper = search_paper_by_title(query) | |
if not paper: | |
return "No matching papers found. Try a more specific title." | |
citations = generate_citations(paper) | |
markdown = f"## Citation for: {paper['title']}\n\n" | |
markdown += "### APA Format\n" | |
markdown += f"```\n{citations['apa']}\n```\n\n" | |
markdown += "### MLA Format\n" | |
markdown += f"```\n{citations['mla']}\n```\n\n" | |
markdown += "### Chicago Format\n" | |
markdown += f"```\n{citations['chicago']}\n```\n\n" | |
if paper.get('url'): | |
markdown += f"[View Original Paper]({paper['url']})\n" | |
return markdown | |
except Exception as e: | |
return f"Error generating citation: {str(e)}" | |
# Gradio interface | |
with gr.Blocks(css=custom_css, theme=gr.themes.Soft()) as demo: | |
current_session_id = gr.State(None) | |
pdf_state = gr.State({"page_images": [], "total_pages": 0, "total_words": 0}) | |
gr.HTML(""" | |
<div class="header"> | |
<div class="header-title">Scholar-Vision</div> | |
<div class="header-subtitle">Analyze academic papers with Groq's LLM API.</div> | |
</div> | |
""") | |
with gr.Row(elem_classes="container"): | |
with gr.Column(scale=1, min_width=300): | |
pdf_file = gr.File(label="Upload PDF Document", file_types=[".pdf"], type="binary") | |
upload_button = gr.Button("Process PDF", variant="primary") | |
pdf_status = gr.Markdown("No PDF uploaded yet") | |
model_dropdown = gr.Dropdown( | |
choices=["llama3-70b-8192", "llama3-8b-8192", "mixtral-8x7b-32768", "gemma-7b-it"], | |
value="llama3-70b-8192", | |
label="Select Groq Model" | |
) | |
# Academic Tools Section | |
gr.Markdown("### Academic Tools", elem_classes="tool-title") | |
with gr.Group(elem_classes="tool-container"): | |
with gr.Tabs(): | |
with gr.TabItem("Paper Search"): | |
paper_query = gr.Textbox(label="Search Query", placeholder="Enter keywords to search for papers") | |
with gr.Row(): | |
source = gr.Dropdown( | |
choices=["arXiv", "Semantic Scholar"], | |
value="arXiv", | |
label="Source" | |
) | |
category = gr.Dropdown( | |
choices=["all", "cs.AI", "cs.CL", "cs.CV", "cs.LG", "physics", "math", "q-bio"], | |
value="all", | |
label="Category (arXiv only)" | |
) | |
with gr.Row(): | |
sort_by = gr.Dropdown( | |
choices=["relevance", "date"], | |
value="relevance", | |
label="Sort By" | |
) | |
max_results = gr.Dropdown( | |
choices=["5", "10", "15", "20"], | |
value="10", | |
label="Max Results" | |
) | |
search_btn = gr.Button("Search Papers") | |
with gr.TabItem("Citation Generator"): | |
citation_query = gr.Textbox(label="Paper Title", placeholder="Enter the title of the paper to cite") | |
citation_btn = gr.Button("Generate Citations") | |
with gr.Column(scale=2, min_width=600): | |
with gr.Tabs(): | |
with gr.TabItem("PDF Viewer"): | |
with gr.Column(elem_classes="pdf-viewer-container"): | |
page_slider = gr.Slider(minimum=1, maximum=1, step=1, label="Page Number", value=1) | |
pdf_image = gr.Image(label="PDF Page", type="pil", elem_classes="pdf-viewer-image") | |
stats_display = gr.Markdown("No PDF uploaded yet", elem_classes="stats-box") | |
with gr.TabItem("Paper Search Results"): | |
paper_results = gr.Markdown("Search for papers to see results here") | |
with gr.TabItem("Citations"): | |
citation_results = gr.Markdown("Generate citations to see results here") | |
with gr.Row(elem_classes="container"): | |
with gr.Column(scale=2, min_width=600): | |
chatbot = gr.Chatbot(height=500, bubble_full_width=False, show_copy_button=True, elem_classes="chat-container") | |
with gr.Row(): | |
msg = gr.Textbox(show_label=False, placeholder="Ask about your academic document, type /paper to search, or /cite to generate citations...", scale=5) | |
send_btn = gr.Button("Send", scale=1) | |
clear_btn = gr.Button("Clear Conversation") | |
# Event Handlers | |
upload_button.click( | |
process_pdf, | |
inputs=[pdf_file], | |
outputs=[current_session_id, pdf_status, pdf_state] | |
).then( | |
update_pdf_viewer, | |
inputs=[pdf_state], | |
outputs=[page_slider, pdf_image, stats_display] | |
) | |
msg.submit( | |
generate_response, | |
inputs=[msg, current_session_id, model_dropdown, chatbot], | |
outputs=[chatbot] | |
).then(lambda: "", None, [msg]) | |
send_btn.click( | |
generate_response, | |
inputs=[msg, current_session_id, model_dropdown, chatbot], | |
outputs=[chatbot] | |
).then(lambda: "", None, [msg]) | |
clear_btn.click( | |
lambda: ([], None, "No PDF uploaded yet", {"page_images": [], "total_pages": 0, "total_words": 0}, 0, None, "No PDF uploaded yet"), | |
None, | |
[chatbot, current_session_id, pdf_status, pdf_state, page_slider, pdf_image, stats_display] | |
) | |
page_slider.change( | |
update_image, | |
inputs=[page_slider, pdf_state], | |
outputs=[pdf_image] | |
) | |
# Academic tool handlers | |
search_btn.click( | |
perform_paper_search, | |
inputs=[paper_query, source, category, sort_by, max_results], | |
outputs=[paper_results] | |
) | |
citation_btn.click( | |
generate_citation_from_search, | |
inputs=[citation_query], | |
outputs=[citation_results] | |
) | |
# Add footer with creator attribution | |
gr.HTML(""" | |
<div style="text-align: center; margin-top: 20px; padding: 10px; color: #666; font-size: 0.8rem; border-top: 1px solid #eee;"> | |
Created by Calvin Allen Crawford | |
</div> | |
""") | |
# Launch the app | |
if __name__ == "__main__": | |
demo.launch() |