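"""
Streamlit app that turns a YouTube video into question/answer pairs.

Pipeline, as implemented below: fetch the video transcript, summarize it with a
T5 model, extract candidate noun keyphrases from the transcript, keep the ones
that also appear in the summary, and generate one question per keyword with a
T5 model fine-tuned on SQuAD. Selected pairs are POSTed to a local API.
"""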
import streamlit as st
from youtube_transcript_api import YouTubeTranscriptApi
# AutoModelWithLMHead is deprecated; AutoModelForSeq2SeqLM is the current class for T5.
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import torch
import nltk
import before_run  # local setup module shipped with this app (not shown here)
#nltk.download('wordnet')
#nltk.download('punkt')
#nltk.download('brown')
#nltk.download('stopwords')
from nltk.tokenize import sent_tokenize
from flashtext import KeywordProcessor
from nltk.corpus import stopwords
import requests
import string
import traceback
import pke
link = "http://127.0.0.1:8000/question" | |
summary_tokenizer = AutoTokenizer.from_pretrained("t5-base") | |
summary_model = AutoModelWithLMHead.from_pretrained("t5-base") | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
summary_model = summary_model.to(device) | |
question_model = AutoModelWithLMHead.from_pretrained('ramsrigouthamg/t5_squad_v1') | |
question_tokenizer = AutoTokenizer.from_pretrained('ramsrigouthamg/t5_squad_v1') | |
question_model = question_model.to(device) | |
def query(url, payload):
    """POST a JSON payload to the given URL and return the response."""
    return requests.post(url, json=payload)
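# The /question endpoint above is expected to be a separate local API that
# accepts {"question": ..., "answer": ...} JSON, as sent from main() below.
# A minimal sketch of a compatible handler, assuming FastAPI with a Pydantic
# model (names and in-memory storage are illustrative, not the actual backend):
#
#     from fastapi import FastAPI
#     from pydantic import BaseModel
#
#     app = FastAPI()
#     stored = []
#
#     class QAPair(BaseModel):
#         question: str
#         answer: str
#
#     @app.post("/question")
#     def add_question(pair: QAPair):
#         stored.append(pair)
#         return {"status": "added", "count": len(stored)}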
def fetch_transcript(url):
    """Fetch the transcript for a YouTube URL of the form ...watch?v=<id> and join it into one string."""
    vid = url.split("=")[1]
    transcript = YouTubeTranscriptApi.get_transcript(vid)
    result = ""
    for i in transcript:
        result += ' ' + i['text']
    return result
def postprocesstext(content):
    """Capitalize each sentence of the generated summary."""
    final = ""
    for sent in sent_tokenize(content):
        sent = sent.capitalize()
        final = final + " " + sent
    return final
def summarizer(text, model, tokenizer):
    """Summarize `text` with the T5 summarization model."""
    text = text.strip().replace("\n", " ")
    text = "summarize: " + text
    max_len = 512
    encoding = tokenizer.encode_plus(text, max_length=max_len, padding=False,
                                     truncation=True, return_tensors="pt").to(device)
    input_ids, attention_mask = encoding["input_ids"], encoding["attention_mask"]
    outs = model.generate(input_ids=input_ids,
                          attention_mask=attention_mask,
                          early_stopping=True,
                          num_beams=3,
                          num_return_sequences=1,
                          no_repeat_ngram_size=2,
                          min_length=75,
                          max_length=300)
    dec = [tokenizer.decode(ids, skip_special_tokens=True) for ids in outs]
    summary = postprocesstext(dec[0])
    return summary.strip()
def get_nouns_multipartite(content):
    """Extract candidate keyphrases (proper nouns and nouns) with MultipartiteRank."""
    out = []
    try:
        extractor = pke.unsupervised.MultipartiteRank()
        # Exclude punctuation, bracket tokens and English stopwords from candidates.
        stoplist = list(string.punctuation)
        stoplist += ['-lrb-', '-rrb-', '-lcb-', '-rcb-', '-lsb-', '-rsb-']
        stoplist += stopwords.words('english')
        extractor.load_document(input=content, stoplist=stoplist)
        pos = {'PROPN', 'NOUN'}
        extractor.candidate_selection(pos=pos)
        extractor.candidate_weighting(alpha=1.1,
                                      threshold=0.75,
                                      method='average')
        keyphrases = extractor.get_n_best(n=15)
        for val in keyphrases:
            out.append(val[0])
    except Exception:
        out = []
        traceback.print_exc()
    return out
def get_keywords(originaltext, summarytext, count):
    """Return up to `count` keyphrases from the original text that also appear in the summary."""
    keywords = get_nouns_multipartite(originaltext)
    print("keywords unsummarized: ", keywords)
    keyword_processor = KeywordProcessor()
    for keyword in keywords:
        keyword_processor.add_keyword(keyword)
    keywords_found = keyword_processor.extract_keywords(summarytext)
    keywords_found = list(set(keywords_found))
    print("keywords_found in summarized: ", keywords_found)
    important_keywords = []
    for keyword in keywords:
        if keyword in keywords_found:
            important_keywords.append(keyword)
    return important_keywords[:int(count)]
def get_question(context, answer, model, tokenizer):
    """Generate a question whose answer is `answer`, given `context`, with the SQuAD-tuned T5 model."""
    text = "context: {} answer: {}".format(context, answer)
    encoding = tokenizer.encode_plus(text, max_length=384, padding=False,
                                     truncation=True, return_tensors="pt").to(device)
    input_ids, attention_mask = encoding["input_ids"], encoding["attention_mask"]
    outs = model.generate(input_ids=input_ids,
                          attention_mask=attention_mask,
                          early_stopping=True,
                          num_beams=5,
                          num_return_sequences=1,
                          no_repeat_ngram_size=2,
                          max_length=72)
    dec = [tokenizer.decode(ids, skip_special_tokens=True) for ids in outs]
    question = dec[0].replace("question:", "").strip()
    return question
def generate_qna(url, count):
    """Full pipeline: transcript -> summary -> keywords -> one 'question : answer' string per keyword."""
    transcript = fetch_transcript(url)
    summarized_text = summarizer(transcript, summary_model, summary_tokenizer)
    keywords = get_keywords(transcript, summarized_text, count)
    qna = []
    for answer in keywords:
        qna.append(get_question(summarized_text, answer, question_model, question_tokenizer) + ' : ' + answer)
    return qna
def main():
    if 'submitted' not in st.session_state:
        st.session_state.submitted = False
    if 'opt' not in st.session_state:
        st.session_state.opt = []

    def callback():
        st.session_state.submitted = True

    st.title('QnA pair Generator')
    url = st.text_input('Enter the Video Link')
    count = st.text_input('Enter the number of questions you want to generate')

    if st.button("Submit URL", on_click=callback) and url and count:
        st.write("Thanks for submission !")
        opt = generate_qna(url, count)
        st.session_state.opt = opt

    if st.session_state.submitted and st.session_state.opt:
        option = st.multiselect('Select the question you want to add to database ', st.session_state.opt)
        if option:
            if st.button("Add question"):
                for i in range(len(option)):
                    # Each entry has the form "<question> : <answer>"; split on the last ' : '.
                    question_text, answer_text = option[i].rsplit(' : ', 1)
                    files = {
                        "question": question_text,
                        "answer": answer_text
                    }
                    response = query(link, files)
                    st.write(response.text)


main()
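# To run the app locally (assuming this file is saved as app.py and the
# /question API sketched above is already listening on port 8000):
#
#     streamlit run app.py
#
# Then paste a YouTube link of the form https://www.youtube.com/watch?v=<id>,
# enter the number of questions to generate, and submit.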