import streamlit as st
from transformers import T5ForConditionalGeneration, T5Tokenizer
import spacy
import nltk
from sklearn.feature_extraction.text import TfidfVectorizer
from rake_nltk import Rake
import pandas as pd
from fpdf import FPDF
import wikipediaapi
from functools import lru_cache

nltk.download('punkt')
nltk.download('stopwords')
nltk.download('brown')
nltk.download('wordnet')
from nltk.tokenize import sent_tokenize
from nltk.corpus import wordnet

import random
import sense2vec
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import json
import os
from sentence_transformers import SentenceTransformer, util
import textstat
from spellchecker import SpellChecker
from transformers import pipeline
import re
import fitz  # PyMuPDF
import pytesseract
from PIL import Image
import io
import uuid
import time
import asyncio
import aiohttp
import easyocr
# '-----------------'
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
# '------------------'

print("***************************************************************")

st.set_page_config(
    page_icon='cyclone',
    page_title="Question Generator",
    initial_sidebar_state="auto",
    menu_items={
        "About": "Hi, this is our project."
    }
)
st.set_option('deprecation.showPyplotGlobalUse', False)


class QuestionGenerationError(Exception):
    """Custom exception for question generation errors."""
    pass


# Initialize Wikipedia API with a user agent
user_agent = 'QGen/1.2'
wiki_wiki = wikipediaapi.Wikipedia(user_agent=user_agent, language='en')


def get_session_id():
    if 'session_id' not in st.session_state:
        st.session_state.session_id = str(uuid.uuid4())
    return st.session_state.session_id


def initialize_state(session_id):
    if 'session_states' not in st.session_state:
        st.session_state.session_states = {}
    if session_id not in st.session_state.session_states:
        st.session_state.session_states[session_id] = {
            'generated_questions': [],
            # add other state variables as needed
        }
    return st.session_state.session_states[session_id]


def get_state(session_id):
    return st.session_state.session_states[session_id]


def set_state(session_id, key, value):
    st.session_state.session_states[session_id][key] = value


@st.cache_resource
def load_model(modelname):
    model_name = modelname
    model = T5ForConditionalGeneration.from_pretrained(model_name)
    tokenizer = T5Tokenizer.from_pretrained(model_name)
    return model, tokenizer


# Load spaCy model and sense2vec vectors
@st.cache_resource
def load_nlp_models():
    nlp = spacy.load("en_core_web_md")
    s2v = sense2vec.Sense2Vec().from_disk('s2v_old')
    return nlp, s2v


# Load quality-assessment models
@st.cache_resource
def load_qa_models():
    # Initialize BERT model for sentence similarity
    similarity_model = SentenceTransformer('all-MiniLM-L6-v2')
    spell = SpellChecker()
    return similarity_model, spell


with st.sidebar:
    select_model = st.selectbox("Select Model", ("T5-large", "T5-small"))

if select_model == "T5-large":
    modelname = "DevBM/t5-large-squad"
elif select_model == "T5-small":
    modelname = "AneriThakkar/flan-t5-small-finetuned"

nlp, s2v = load_nlp_models()
similarity_model, spell = load_qa_models()
context_model = similarity_model
model, tokenizer = load_model(modelname)


# Info Section
def display_info():
    st.sidebar.title("Information")
    st.sidebar.markdown("""
### Question Generator System
This system is designed to generate questions based on the provided context.
It uses various NLP techniques and models to:
- Extract keywords from the text
- Map keywords to sentences
- Generate questions
- Provide multiple choice options
- Assess the quality of generated questions

#### Key Features:
- **Keyword Extraction:** Combines RAKE, TF-IDF, and spaCy for comprehensive keyword extraction.
- **Question Generation:** Utilizes a pre-trained T5 model for generating questions.
- **Options Generation:** Creates contextually relevant multiple-choice options.
- **Question Assessment:** Scores questions based on relevance, complexity, and spelling correctness.
- **Feedback Collection:** Allows users to rate the generated questions and provides statistics on feedback.

#### Customization Options:
- Number of beams for question generation
- Context window size for mapping keywords to sentences
- Number of questions to generate
- Additional display elements (context, answer, options, entity link, QA scores)

#### Outputs:
- Generated questions with multiple-choice options
- Download options for CSV and PDF formats
- Visualization of overall scores
""")


import numpy as np


def _open_pdf(pdf_source):
    """Open a PDF with PyMuPDF from a filesystem path, raw bytes, or a
    file-like object (e.g. a Streamlit UploadedFile)."""
    if isinstance(pdf_source, (bytes, bytearray)):
        return fitz.open(stream=pdf_source, filetype="pdf")
    if hasattr(pdf_source, "read"):
        return fitz.open(stream=pdf_source.read(), filetype="pdf")
    return fitz.open(pdf_source)


def extract_text_from_pdf(pdf_path):
    """Extract the embedded text from the given PDF file."""
    try:
        pdf_file = _open_pdf(pdf_path)
        all_text = ""
        for page_index in range(len(pdf_file)):
            page = pdf_file.load_page(page_index)
            text = page.get_text("text")
            if text.strip():  # Check if the text is not empty
                all_text += text.replace('\n', ' ') + " "
        pdf_file.close()
        if not all_text.strip():
            print("No direct text found in the PDF.")
        return all_text.strip()  # Strip any leading/trailing whitespace
    except Exception as e:
        print(f"Error extracting text from PDF: {e}")
        return ""


def extract_images_from_pdf(pdf_path):
    """Extract images from the given PDF file."""
    try:
        pdf_file = _open_pdf(pdf_path)
        images = []
        for page_index in range(len(pdf_file)):
            page = pdf_file.load_page(page_index)
            image_list = page.get_images(full=True)
            for img_index, img in enumerate(image_list):
                xref = img[0]
                base_image = pdf_file.extract_image(xref)
                image_bytes = base_image["image"]
                image_ext = base_image["ext"]
                image = Image.open(io.BytesIO(image_bytes))
                images.append(image)
        pdf_file.close()
        if not images:
            print("No images found in the PDF.")
        return images
    except Exception as e:
        print(f"Error extracting images from PDF: {e}")
        return []


def recognize_text(image):
    """Recognize text from a single image using EasyOCR."""
    try:
        reader = easyocr.Reader(['en'])
        image_np = np.array(image)  # Convert PIL image to numpy array
        result = reader.readtext(image_np)
        recognized_text = ""
        for (bbox, text, prob) in result:
            if prob > 0.2:  # Keep only reasonably confident detections
                recognized_text += f'{text} '
        if not recognized_text.strip():
            print("No text recognized from the image.")
        return recognized_text.strip()  # Strip any leading/trailing whitespace
    except Exception as e:
        print(f"Error recognizing text from image: {e}")
        return ""


def ocr_text_from_pdf(pdf_path):
    """Extract text from all images in the PDF."""
    images = extract_images_from_pdf(pdf_path)
    all_text = ""
    for image in images:
        text = recognize_text(image)
        if text.strip():  # Check if the recognized text is not empty
            all_text += text + " "
    if not all_text.strip():
        print("No OCR text found in the PDF images.")
    return all_text.strip()  # Strip any leading/trailing whitespace


def extract_all_text_from_pdf(pdf_path):
    """Extract both direct text and OCR text from a PDF."""
    # Read file-like inputs (e.g. Streamlit uploads) into bytes once, so the
    # PDF can be opened twice below without exhausting the stream.
    if hasattr(pdf_path, "read"):
        pdf_path = pdf_path.read()
    direct_text = extract_text_from_pdf(pdf_path)
    ocr_text = ocr_text_from_pdf(pdf_path)
    all_text = direct_text + " " + ocr_text + " "
    if not all_text.strip():
        print("No text extracted from the PDF.")
    return all_text.strip()  # Strip any leading/trailing whitespace


def save_feedback(question, answer, rating, options, context):
    feedback_file = 'question_feedback.json'
    if os.path.exists(feedback_file):
        with open(feedback_file, 'r') as f:
            feedback_data = json.load(f)
    else:
        feedback_data = []
    tpl = {
        'question': question,
        'answer': answer,
        'context': context,
        'options': options,
        'rating': rating,
    }
    # feedback_data[question] = rating
    feedback_data.append(tpl)
    print(feedback_data)
    with open(feedback_file, 'w') as f:
        json.dump(feedback_data, f)
    return feedback_file


# -----------------------------------------------------------------------------------------
def send_email_with_attachment(email_subject, email_body, recipient_emails, sender_email, sender_password, attachment_path):
    msg = MIMEMultipart()
    msg['From'] = sender_email
    msg['To'] = ", ".join(recipient_emails)  # Join the list of recipients with commas
    msg['Subject'] = email_subject
    msg.attach(MIMEText(email_body, 'plain'))
    with open(attachment_path, 'rb') as attachment:
        part = MIMEBase('application', 'octet-stream')
        part.set_payload(attachment.read())
    encoders.encode_base64(part)
    part.add_header('Content-Disposition', f'attachment; filename={os.path.basename(attachment_path)}')
    msg.attach(part)
    with smtplib.SMTP('smtp.gmail.com', 587) as server:
        server.starttls()
        server.login(sender_email, sender_password)
        text = msg.as_string()
        server.sendmail(sender_email, recipient_emails, text)
# ----------------------------------------------------------------------------------


# Function to clean text
def clean_text(text):
    text = re.sub(r"[^\x00-\x7F]", " ", text)
    text = re.sub(r"[\n]", " ", text)
    return text


# Function to create text chunks
def segment_text(text, max_segment_length=700, batch_size=7):
    sentences = sent_tokenize(text)
    segments = []
    current_segment = ""
    for sentence in sentences:
        if len(current_segment) + len(sentence) <= max_segment_length:
            current_segment += sentence + " "
        else:
            segments.append(current_segment.strip())
            current_segment = sentence + " "
    if current_segment:
        segments.append(current_segment.strip())
    # Create batches
    batches = [segments[i:i + batch_size] for i in range(0, len(segments), batch_size)]
    return batches


# Function to extract keywords using combined techniques
def extract_keywords(text, extract_all):
    try:
        doc = nlp(text)
        spacy_keywords = set([ent.text for ent in doc.ents])
        spacy_entities = spacy_keywords
        print(f"\n\nSpacy Entities: {spacy_entities} \n\n")

        # Use only spaCy entities
        if extract_all is False:
            return list(spacy_entities)

        # Use RAKE
        rake = Rake()
        rake.extract_keywords_from_text(text)
        rake_keywords = set(rake.get_ranked_phrases())
        print(f"\n\nRake Keywords: {rake_keywords} \n\n")

        # Use spaCy for NER and POS tagging
        spacy_keywords.update([token.text for token in doc if token.pos_ in ["NOUN", "PROPN", "VERB", "ADJ"]])
        print(f"\n\nSpacy Keywords: {spacy_keywords} \n\n")

        # Use TF-IDF
        vectorizer = TfidfVectorizer(stop_words='english')
        X = vectorizer.fit_transform([text])
        tfidf_keywords = set(vectorizer.get_feature_names_out())
        print(f"\n\nTFIDF Entities: {tfidf_keywords} \n\n")

        # Combine all keywords
        combined_keywords = rake_keywords.union(spacy_keywords).union(tfidf_keywords)
        return list(combined_keywords)
    except Exception as e:
        raise QuestionGenerationError(f"Error in keyword extraction: {str(e)}")


def get_similar_words_sense2vec(word, n=3):
    # Try to find the word with its most likely part-of-speech
    word_with_pos = word + "|NOUN"
    if word_with_pos in s2v:
        similar_words = s2v.most_similar(word_with_pos, n=n)
        return [w.split("|")[0] for w, _ in similar_words]
    # If not found, try without POS
    if word in s2v:
        similar_words = s2v.most_similar(word, n=n)
        return [w.split("|")[0] for w, _ in similar_words]
    return []


def get_synonyms(word, n=3):
    synonyms = []
    for syn in wordnet.synsets(word):
        for lemma in syn.lemmas():
            if lemma.name() != word and lemma.name() not in synonyms:
                synonyms.append(lemma.name())
                if len(synonyms) == n:
                    return synonyms
    return synonyms


def generate_options(answer, context, n=3):
    options = [answer]

    # Add contextually relevant words using a pre-trained model
    context_embedding = context_model.encode(context)
    answer_embedding = context_model.encode(answer)
    context_words = [token.text for token in nlp(context) if token.is_alpha and token.text.lower() != answer.lower()]

    # Compute similarity scores and sort context words
    similarity_scores = [util.pytorch_cos_sim(context_model.encode(word), answer_embedding).item() for word in context_words]
    sorted_context_words = [word for _, word in sorted(zip(similarity_scores, context_words), reverse=True)]
    options.extend(sorted_context_words[:n])

    # Try to get similar words based on sense2vec
    similar_words = get_similar_words_sense2vec(answer, n)
    options.extend(similar_words)

    # If we don't have enough options, try synonyms
    if len(options) < n + 1:
        synonyms = get_synonyms(answer, n - len(options) + 1)
        options.extend(synonyms)

    # If we still don't have enough options, extract other entities from the context
    if len(options) < n + 1:
        doc = nlp(context)
        entities = [ent.text for ent in doc.ents if ent.text.lower() != answer.lower()]
        options.extend(entities[:n - len(options) + 1])

    # If we still need more options, add some random words from the context
    if len(options) < n + 1:
        context_words = [token.text for token in nlp(context) if token.is_alpha and token.text.lower() != answer.lower()]
        options.extend(random.sample(context_words, min(n - len(options) + 1, len(context_words))))
    print(f"\n\nAll Possible Options: {options}\n\n")

    # Ensure we have the correct number of unique options
    options = list(dict.fromkeys(options))[:n + 1]

    # Shuffle the options
    random.shuffle(options)
    return options


# Function to map keywords to sentences with customizable context window size
def map_keywords_to_sentences(text, keywords, context_window_size):
    sentences = sent_tokenize(text)
    keyword_sentence_mapping = {}
    print(f"\n\nSentences: {sentences}\n\n")
    for keyword in keywords:
        for i, sentence in enumerate(sentences):
            if keyword in sentence:
                # Combine current sentence with surrounding sentences for context
                start = max(0, i - context_window_size)
                end = min(len(sentences), i + context_window_size + 1)
                context = ' '.join(sentences[start:end])
                if keyword not in keyword_sentence_mapping:
                    keyword_sentence_mapping[keyword] = context
                else:
                    keyword_sentence_mapping[keyword] += ' ' + context
    return keyword_sentence_mapping


# Function to perform entity linking using Wikipedia API
@lru_cache(maxsize=128)
def entity_linking(keyword):
    page = wiki_wiki.page(keyword)
    if page.exists():
        return page.fullurl
    return None


async def generate_question_async(context, answer, num_beams):
    try:
        input_text = f" {context} {answer}"
        print(f"\n{input_text}\n")
        input_ids = tokenizer.encode(input_text, return_tensors='pt')
        outputs = await asyncio.to_thread(model.generate, input_ids, num_beams=num_beams, early_stopping=True, max_length=250)
        question = tokenizer.decode(outputs[0], skip_special_tokens=True)
        print(f"\n{question}\n")
        return question
    except Exception as e:
        raise QuestionGenerationError(f"Error in question generation: {str(e)}")


async def generate_options_async(answer, context, n=3):
    try:
        options = [answer]

        # Add contextually relevant words using a pre-trained model
        context_embedding = await asyncio.to_thread(context_model.encode, context)
        answer_embedding = await asyncio.to_thread(context_model.encode, answer)
        context_words = [token.text for token in nlp(context) if token.is_alpha and token.text.lower() != answer.lower()]

        # Compute similarity scores and sort context words
        similarity_scores = [util.pytorch_cos_sim(await asyncio.to_thread(context_model.encode, word), answer_embedding).item() for word in context_words]
        sorted_context_words = [word for _, word in sorted(zip(similarity_scores, context_words), reverse=True)]
        options.extend(sorted_context_words[:n])

        # Try to get similar words based on sense2vec
        similar_words = await asyncio.to_thread(get_similar_words_sense2vec, answer, n)
        options.extend(similar_words)

        # If we don't have enough options, try synonyms
        if len(options) < n + 1:
            synonyms = await asyncio.to_thread(get_synonyms, answer, n - len(options) + 1)
            options.extend(synonyms)

        # Ensure we have the correct number of unique options
        options = list(dict.fromkeys(options))[:n + 1]

        # Shuffle the options
        random.shuffle(options)
        return options
    except Exception as e:
        raise QuestionGenerationError(f"Error in generating options: {str(e)}")


# Function to generate questions using beam search
async def generate_questions_async(text, num_questions, context_window_size, num_beams, extract_all_keywords):
    try:
        batches = segment_text(text)
        keywords = extract_keywords(text, extract_all_keywords)
        all_questions = []
        progress_bar = st.progress(0)
        status_text = st.empty()
        for i, batch in enumerate(batches):
            status_text.text(f"Processing batch {i+1} of {len(batches)}...")
            batch_questions = await process_batch(batch, keywords, context_window_size, num_beams)
            all_questions.extend(batch_questions)
            progress_bar.progress((i + 1) / len(batches))
            if len(all_questions) >= num_questions:
                break
        progress_bar.empty()
        status_text.empty()
        return all_questions[:num_questions]
    except QuestionGenerationError as e:
        st.error(f"An error occurred during question generation: {str(e)}")
        return []
    except Exception as e:
        st.error(f"An unexpected error occurred: {str(e)}")
        return []


async def process_batch(batch, keywords, context_window_size, num_beams):
    questions = []
    for text in batch:
        keyword_sentence_mapping = map_keywords_to_sentences(text, keywords, context_window_size)
        for keyword, context in keyword_sentence_mapping.items():
            question = await generate_question_async(context, keyword, num_beams)
            options = await generate_options_async(keyword, context)
            overall_score, relevance_score, complexity_score, spelling_correctness = assess_question_quality(context, question, keyword)
            if overall_score >= 0.5:
                questions.append({
                    "question": question,
                    "context": context,
                    "answer": keyword,
                    "options": options,
                    "overall_score": overall_score,
                    "relevance_score": relevance_score,
                    "complexity_score": complexity_score,
                    "spelling_correctness": spelling_correctness,
                })
    return questions


# Function to export questions to CSV
def export_to_csv(data):
    # df = pd.DataFrame(data, columns=["Context", "Answer", "Question", "Options"])
    df = pd.DataFrame(data)
    # csv = df.to_csv(index=False, encoding='utf-8')
    csv = df.to_csv(index=False)
    return csv


# Function to export questions to PDF
def export_to_pdf(data):
    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", size=12)
    for item in data:
        pdf.multi_cell(0, 10, f"Context: {item['context']}")
        pdf.multi_cell(0, 10, f"Question: {item['question']}")
        pdf.multi_cell(0, 10, f"Answer: {item['answer']}")
        pdf.multi_cell(0, 10, f"Options: {', '.join(item['options'])}")
        pdf.multi_cell(0, 10, f"Overall Score: {item['overall_score']:.2f}")
        pdf.ln(10)
    return pdf.output(dest='S').encode('latin-1')


def display_word_cloud(generated_questions):
    word_frequency = {}
    for question in generated_questions:
        words = question.split()
        for word in words:
            word_frequency[word] = word_frequency.get(word, 0) + 1
    wordcloud = WordCloud(width=800, height=400, background_color='white').generate_from_frequencies(word_frequency)
    plt.figure(figsize=(10, 5))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis('off')
    st.pyplot()


def assess_question_quality(context, question, answer):
    # Assess relevance using cosine similarity
    context_doc = nlp(context)
    question_doc = nlp(question)
    relevance_score = context_doc.similarity(question_doc)

    # Assess complexity using token length (as a simple metric)
    complexity_score = min(len(question_doc) / 20, 1)  # Normalize to 0-1

    # Assess spelling correctness
    misspelled = spell.unknown(question.split())
    spelling_correctness = 1 - (len(misspelled) / max(len(question.split()), 1))  # Normalize to 0-1, guard against empty questions

    # Calculate overall score (you can adjust weights as needed)
    overall_score = (
        0.4 * relevance_score +
        0.4 * complexity_score +
        0.2 * spelling_correctness
    )
    return overall_score, relevance_score, complexity_score, spelling_correctness


def main():
    # Streamlit interface
    st.title(":blue[Question Generator System]")
    session_id = get_session_id()
    state = initialize_state(session_id)

    with st.sidebar:
        show_info = st.toggle('Show Info', False)
        if show_info:
            display_info()
        st.subheader("Customization Options")
        # Customization options
        input_type = st.radio("Select Input Preference", ("Text Input", "Upload PDF"))
        with st.expander("Choose the Additional Elements to show"):
            show_context = st.checkbox("Context", True)
            show_answer = st.checkbox("Answer", True)
            show_options = st.checkbox("Options", False)
            show_entity_link = st.checkbox("Entity Link For Wikipedia", True)
            show_qa_scores = st.checkbox("QA Score", False)
        num_beams = st.slider("Select number of beams for question generation", min_value=2, max_value=10, value=2)
        context_window_size = st.slider("Select context window size (number of sentences before and after)", min_value=1, max_value=5, value=1)
        num_questions = st.slider("Select number of questions to generate", min_value=1, max_value=1000, value=5)
        col1, col2 = st.columns(2)
        with col1:
            extract_all_keywords = st.toggle("Extract Max Keywords", value=False)
        with col2:
            enable_feedback_mode = st.toggle("Enable Feedback Mode", False)

    text = None
    if input_type == "Text Input":
        text = st.text_area("Enter text here:", value="Joe Biden, the current US president is on a weak wicket going in for his reelection later this November against former President Donald Trump.", help="Enter or paste your text here")
    elif input_type == "Upload PDF":
        file = st.file_uploader("Upload PDF Files")
        if file is not None:
            try:
                text = extract_all_text_from_pdf(file)
                # text = get_pdf_text(file)
            except Exception as e:
                st.error(f"Error reading PDF file: {str(e)}")
                text = None
    if text:
        text = clean_text(text)

    generate_questions_button = st.button("Generate Questions")
    st.markdown('Above is the generate questions button', unsafe_allow_html=True)

    # if generate_questions_button:
    if generate_questions_button and text:
        start_time = time.time()
        with st.spinner("Generating questions..."):
            try:
                state['generated_questions'] = asyncio.run(generate_questions_async(text, num_questions, context_window_size, num_beams, extract_all_keywords))
                if not state['generated_questions']:
                    st.warning("No questions were generated. The text might be too short or lack suitable content.")
                else:
                    st.success(f"Successfully generated {len(state['generated_questions'])} questions!")
            except QuestionGenerationError as e:
                st.error(f"An error occurred during question generation: {str(e)}")
            except Exception as e:
                st.error(f"An unexpected error occurred: {str(e)}")

        print("\n\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n\n")
        data = get_state(session_id)
        print(data)
        end_time = time.time()
        print(f"Time Taken to generate: {end_time - start_time}")

        set_state(session_id, 'generated_questions', state['generated_questions'])
        # Sort questions based on their quality score
        state['generated_questions'] = sorted(state['generated_questions'], key=lambda x: x['overall_score'], reverse=True)

    # Display generated questions
    if state['generated_questions']:
        st.header("Generated Questions:", divider='blue')
        for i, q in enumerate(state['generated_questions']):
            st.subheader(body=f":orange[Q{i+1}:] {q['question']}")
            if show_context is True:
                st.write(f"**Context:** {q['context']}")
            if show_answer is True:
                st.write(f"**Answer:** {q['answer']}")
            if show_options is True:
                st.write("**Options:**")
                for j, option in enumerate(q['options']):
                    st.write(f"{chr(65+j)}. {option}")
            if show_entity_link is True:
                linked_entity = entity_linking(q['answer'])
                if linked_entity:
                    st.write(f"**Entity Link:** {linked_entity}")
            if show_qa_scores is True:
                m1, m2, m3, m4 = st.columns([1.7, 1, 1, 1])
                m1.metric("Overall Quality Score", value=f"{q['overall_score']:,.2f}")
                m2.metric("Relevance Score", value=f"{q['relevance_score']:,.2f}")
                m3.metric("Complexity Score", value=f"{q['complexity_score']:,.2f}")
                m4.metric("Spelling Correctness", value=f"{q['spelling_correctness']:,.2f}")
            # q['context'] = st.text_area(f"Edit Context {i+1}:", value=q['context'], key=f"context_{i}")
            if enable_feedback_mode:
                q['question'] = st.text_input(f"Edit Question {i+1}:", value=q['question'], key=f"question_{i}")
                q['rating'] = st.select_slider("Rate this question (1-5)", options=[1, 2, 3, 4, 5], key=f"rating_{i}")
                if st.button(f"Submit Feedback for Question {i+1}", key=f"submit_{i}"):
                    feedback_file = save_feedback(q['question'], q['answer'], q['rating'], q['options'], q['context'])
                    st.success(f"Feedback submitted for Question {i+1}")
                    pswd = st.secrets['EMAIL_PASSWORD']
                    send_email_with_attachment(
                        email_subject='feedback from QGen',
                        email_body='Please find the attached feedback JSON file.',
                        recipient_emails=['apjc01unique@gmail.com', 'channingfisher7@gmail.com'],
                        sender_email='apjc01unique@gmail.com',
                        sender_password=pswd,
                        attachment_path=feedback_file)
                    st.write("Feedback sent to admin")
            st.write("---")

    # Export buttons
    # if st.session_state.generated_questions:
    if state['generated_questions']:
        with st.sidebar:
            csv_data = export_to_csv(state['generated_questions'])
            st.download_button(label="Download CSV", data=csv_data, file_name='questions.csv', mime='text/csv')
            pdf_data = export_to_pdf(state['generated_questions'])
            st.download_button(label="Download PDF", data=pdf_data, file_name='questions.pdf', mime='application/pdf')
            with st.expander("View Visualizations"):
                questions = [tpl['question'] for tpl in state['generated_questions']]
                overall_scores = [tpl['overall_score'] for tpl in state['generated_questions']]
                st.subheader('WordCloud of Questions', divider='rainbow')
                display_word_cloud(questions)
                st.subheader('Overall Scores', divider='violet')
                overall_scores = pd.DataFrame(overall_scores, columns=['Overall Scores'])
                st.line_chart(overall_scores)

    # View Feedback Statistics
    with st.expander("View Feedback Statistics"):
        feedback_file = 'question_feedback.json'
        if os.path.exists(feedback_file):
            with open(feedback_file, 'r') as f:
                feedback_data = json.load(f)
            st.subheader("Feedback Statistics")

            # Calculate average rating
            ratings = [feedback['rating'] for feedback in feedback_data]
            avg_rating = sum(ratings) / len(ratings) if ratings else 0
            st.write(f"Average Question Rating: {avg_rating:.2f}")

            # Show distribution of ratings
            rating_counts = {i: ratings.count(i) for i in range(1, 6)}
            st.bar_chart(rating_counts)

            # Show some highly rated questions
            st.subheader("Highly Rated Questions")
            sorted_feedback = sorted(feedback_data, key=lambda x: x['rating'], reverse=True)
            top_questions = sorted_feedback[:5]
            for feedback in top_questions:
                st.write(f"Question: {feedback['question']}")
                st.write(f"Answer: {feedback['answer']}")
                st.write(f"Rating: {feedback['rating']}")
                st.write("---")
        else:
            st.write("No feedback data available yet.")

    print("********************************************************************************")


if __name__ == '__main__':
    try:
        main()
    except Exception as e:
        st.error(f"An unexpected error occurred: {str(e)}")
        st.error("Please try refreshing the page. If the problem persists, contact support.")
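# ---------------------------------------------------------------------------
# Usage note (a minimal sketch, not part of the app logic above): assuming this
# script is saved as app.py, the spaCy model "en_core_web_md" is installed, the
# sense2vec vectors are unpacked to ./s2v_old, and EMAIL_PASSWORD is defined in
# .streamlit/secrets.toml, the app would typically be launched with:
#
#   python -m spacy download en_core_web_md
#   streamlit run app.py
# ---------------------------------------------------------------------------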