from queue import SimpleQueue from dotenv import load_dotenv import re from langchain.callbacks.base import BaseCallbackHandler job_done = object() # signals the processing is done class StreamingGradioCallbackHandler(BaseCallbackHandler): """Callback handler for streaming. Only works with LLMs that support streaming.""" def __init__(self, q): self.q = q def on_llm_start(self, serialized, prompts, **kwargs) -> None: """Run when LLM starts running.""" while not self.q.empty(): try: self.q.get(block=False) except SimpleQueue.empty: continue def on_llm_new_token(self, token, **kwargs) -> None: """Run on new LLM token. Only available when streaming is enabled.""" self.q.put(token) def on_llm_end(self, response, **kwargs) -> None: """Run when LLM ends running.""" self.q.put(job_done) def on_llm_error(self, error, **kwargs) -> None: """Run when LLM errors.""" self.q.put(job_done) def add_gradio_streaming(llm): q = SimpleQueue() job_done = object() # signals the processing is done llm.callbacks = [StreamingGradioCallbackHandler(q)] return llm, q def gradio_stream(llm, prompt): thread = Thread(target=llm.predict, kwargs={"text": prompt}) thread.start() text = "" while True: next_token = q.get(block=True) # Blocks until an input is available if next_token is job_done: break text += next_token time.sleep(0.03) yield text thread.join() def get_source_link(metadata): return metadata["file_url"] + f"#page={metadata['content_page_number'] + 1}" def make_html_source(source, i, score, config): meta = source.metadata if meta["file_source_type"] == "AFP": return f"""
{source.page_content}
{source.page_content}
{source.page_content}
{source.page_content.replace(config["passage_preprompt"], "")}
{source.page_content.replace(config["passage_preprompt"], "")}