import logging
import os
import random
import tempfile
import time
from typing import List

import gradio as gr
import PyPDF2
import requests
import torch
from bs4 import BeautifulSoup
from tqdm import tqdm
from transformers import pipeline

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ContentQuestionGenerator:

    def __init__(self):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        logger.info(f"Using device: {self.device}")

        # Summarization pipeline used to condense long inputs before
        # question generation.
        self.summarizer = pipeline(
            "summarization",
            model="facebook/bart-large-cnn",
            device=0 if self.device == "cuda" else -1,
        )

        # Question-generation pipeline (T5 fine-tuned for SQuAD-style
        # question generation).
        self.question_generator = pipeline(
            "text2text-generation",
            model="lmqg/t5-base-squad-qg",
            device=0 if self.device == "cuda" else -1,
        )

    def process_large_pdf(self, file_obj, chunk_size=50) -> str:
        """Process a large PDF in page chunks to limit memory use."""
        temp_file_path = None
        try:
            # Persist the upload to a temporary file so PyPDF2 can seek in it.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
                if isinstance(file_obj, bytes):
                    temp_file.write(file_obj)
                else:
                    temp_file.write(file_obj.read())
                temp_file_path = temp_file.name

            with open(temp_file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                total_pages = len(pdf_reader.pages)
                logger.info(f"Processing PDF with {total_pages} pages")

                all_text = []

                for i in range(0, total_pages, chunk_size):
                    chunk_text = ""
                    end_page = min(i + chunk_size, total_pages)

                    logger.info(f"Processing pages {i + 1} to {end_page}")
                    for page_num in range(i, end_page):
                        try:
                            page = pdf_reader.pages[page_num]
                            chunk_text += page.extract_text() + "\n"
                        except Exception as e:
                            logger.warning(f"Error extracting text from page {page_num + 1}: {e}")
                            continue

                    if chunk_text.strip():
                        all_text.append(chunk_text)

            os.unlink(temp_file_path)
            return "\n".join(all_text)

        except Exception as e:
            logger.error(f"Error processing large PDF: {e}")
            # Best-effort cleanup of the temporary file before re-raising.
            if temp_file_path is not None:
                try:
                    os.unlink(temp_file_path)
                except OSError:
                    pass
            raise

    def extract_text_from_url(self, url: str) -> str:
        """Extract readable text content from a webpage."""
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }

            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')

            # Drop non-content elements before collecting paragraph text.
            for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside']):
                element.decompose()

            # Wikipedia keeps article text inside #mw-content-text; fall back
            # to the whole page if that container is missing.
            if 'wikipedia.org' in url:
                main_content = soup.find('div', {'id': 'mw-content-text'})
                text = ' '.join(p.get_text() for p in (main_content or soup).find_all('p'))
            else:
                text = ' '.join(p.get_text() for p in soup.find_all('p'))

            # Collapse runs of whitespace into single spaces.
            text = ' '.join(text.split())

            if not text:
                raise ValueError("No text content could be extracted from the URL")

            return text

        except Exception as e:
            logger.error(f"Error extracting text from URL: {e}")
            raise ValueError(f"Could not extract text from URL: {e}")

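    # Usage sketch (hypothetical URL; any article-style page should work):
    #
    #     generator = ContentQuestionGenerator()
    #     text = generator.extract_text_from_url("https://en.wikipedia.org/wiki/Topic")
    #
    # The result is the page's paragraph text as one whitespace-normalized string.
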
    def chunk_text(self, text: str, max_chunk_size: int = 1024) -> List[str]:
        """Split text into sentence-aligned chunks for processing."""
        chunks = []
        current_chunk = []
        current_size = 0

        for sentence in text.split('.'):
            sentence = sentence.strip()
            # Skip empty fragments (e.g. after a trailing period).
            if not sentence:
                continue
            sentence += '.'
            if current_size + len(sentence) + 1 <= max_chunk_size:
                current_chunk.append(sentence)
                current_size += len(sentence) + 1
            else:
                if current_chunk:
                    chunks.append(' '.join(current_chunk))
                current_chunk = [sentence]
                current_size = len(sentence) + 1

        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return chunks

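    # Worked example (hypothetical input): chunk_text keeps whole sentences
    # together, so with a small budget,
    #
    #     chunk_text("First sentence. Second one.", max_chunk_size=20)
    #
    # returns ["First sentence.", "Second one."], because appending the
    # second sentence would push the first chunk past the 20-character limit.
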
    def summarize_text(self, text: str) -> str:
        """Summarize text chunk by chunk to keep memory use bounded."""
        chunks = self.chunk_text(text)
        summaries = []

        for chunk in tqdm(chunks, desc="Summarizing text"):
            # Skip fragments too short to summarize meaningfully.
            if len(chunk.strip()) > 50:
                try:
                    summary = self.summarizer(
                        chunk,
                        max_length=150,
                        min_length=40,
                        do_sample=False,
                    )[0]['summary_text']
                    summaries.append(summary)
                except Exception as e:
                    logger.warning(f"Error summarizing chunk: {e}")
                    continue

        # Release cached GPU memory before the next pipeline stage.
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        return " ".join(summaries)

    def generate_questions(self, text: str, num_questions: int = 20) -> List[str]:
        """Generate diverse questions with periodic GPU memory cleanup."""
        try:
            all_questions = set()
            sentences = text.split('.')

            for sentence in tqdm(sentences, desc="Generating questions"):
                # Oversample by 2x so the final random selection has variety.
                if len(all_questions) >= num_questions * 2:
                    break

                if len(sentence.strip()) > 30:
                    try:
                        generated = self.question_generator(
                            sentence.strip(),
                            max_length=64,
                            num_return_sequences=2,
                            do_sample=True,
                            temperature=0.8,
                        )

                        # Keep only well-formed questions of reasonable length.
                        for gen in generated:
                            question = gen['generated_text'].strip()
                            if question.endswith('?') and len(question.split()) > 3:
                                all_questions.add(question)

                        if torch.cuda.is_available():
                            torch.cuda.empty_cache()

                    except Exception as e:
                        logger.warning(f"Error generating question: {e}")
                        continue

            questions_list = list(all_questions)
            random.shuffle(questions_list)

            return questions_list[:num_questions]

        except Exception as e:
            logger.error(f"Error generating questions: {e}")
            raise

    def process_input(self, input_data) -> str:
        """Process either a PDF file or a URL with progress logging."""
        try:
            start_time = time.time()

            if isinstance(input_data, str) and input_data.startswith(('http://', 'https://')):
                logger.info("Processing URL content...")
                text = self.extract_text_from_url(input_data)
            else:
                logger.info("Processing PDF content...")
                text = self.process_large_pdf(input_data)

            logger.info(f"Extracted {len(text)} characters of text")

            logger.info("Summarizing content...")
            summarized_text = self.summarize_text(text)
            logger.info(f"Summarized to {len(summarized_text)} characters")

            logger.info("Generating questions...")
            questions = self.generate_questions(summarized_text)
            logger.info(f"Generated {len(questions)} questions")

            if not questions:
                return "Could not generate any valid questions from the content."

            formatted_output = "\n".join(f"{i + 1}. {q}" for i, q in enumerate(questions))
            processing_time = time.time() - start_time
            logger.info(f"Total processing time: {processing_time:.2f} seconds")

            return formatted_output

        except Exception as e:
            error_msg = f"Error processing input: {e}"
            logger.error(error_msg)
            return f"An error occurred: {error_msg}"


def create_gradio_interface():
    """Create and configure the Gradio interface."""
    generator = ContentQuestionGenerator()

    def process_input(file, url):
        if file is None and not url:
            return "Please provide either a PDF file or a webpage URL."
        if file is not None and url:
            return "Please provide either a PDF file or a URL, not both."

        try:
            if url:
                if not url.startswith(('http://', 'https://')):
                    return "Please provide a valid URL starting with http:// or https://"
                return generator.process_input(url)

            return generator.process_input(file)

        except Exception as e:
            logger.error("Error processing input:", exc_info=True)
            return f"Error processing input: {e}"

    interface = gr.Interface(
        fn=process_input,
        inputs=[
            gr.File(
                label="Upload PDF Document",
                type="binary",
                file_types=[".pdf"],
                file_count="single",
            ),
            gr.Textbox(
                label="Or enter webpage URL",
                placeholder="https://example.com/page or https://en.wikipedia.org/wiki/Topic",
            ),
        ],
        outputs=gr.Textbox(
            label="Generated Questions",
            lines=20,
        ),
        title="Content Question Generator",
        description="""
        Upload a PDF document of any size or provide a webpage URL to generate relevant questions.

        Features:
        - Supports large PDF files (100MB+)
        - Works with any webpage URL
        - Special handling for Wikipedia pages
        - Generates 20 unique random questions
        - Shows progress during processing

        Note: Large files may take several minutes to process.
        """,
        allow_flagging="never",
    )

    return interface


if __name__ == "__main__":
    interface = create_gradio_interface()
    interface.queue().launch(share=True)
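
# Minimal programmatic usage sketch (bypasses the Gradio UI; "example.pdf"
# is a hypothetical local file):
#
#     generator = ContentQuestionGenerator()
#     with open("example.pdf", "rb") as f:
#         print(generator.process_input(f.read()))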