momenaca's picture
add features to ease hackathon
f07b5e8
raw
history blame
7.88 kB
import gradio as gr
import os
import yaml
from langchain.prompts.chat import ChatPromptTemplate
from huggingface_hub import hf_hub_download
from spinoza_project.source.frontend.utils import make_html_source
from spinoza_project.source.backend.prompt_utils import (
to_chat_instruction,
SpecialTokens,
)
from spinoza_project.source.backend.get_prompts import get_qa_prompts
from spinoza_project.source.backend.document_store import pickle_to_document_store
def get_config():
    """Load the project configuration from YAML.

    Reads ``./spinoza_project/config.yaml`` when the
    ``EKI_OPENAI_LLM_DEPLOYMENT_NAME`` environment variable is set to a
    non-empty value, and ``./spinoza_project/config_public.yaml`` otherwise.

    Returns:
        The object produced by ``yaml.full_load`` for the selected file.

    Raises:
        OSError: If the selected file cannot be opened.
    """
    if os.getenv("EKI_OPENAI_LLM_DEPLOYMENT_NAME"):
        config_path = "./spinoza_project/config.yaml"
    else:
        config_path = "./spinoza_project/config_public.yaml"
    # Decode explicitly as UTF-8: the default text encoding depends on the
    # platform locale and would garble non-ASCII text on non-UTF-8 systems.
    with open(config_path, encoding="utf-8") as f:
        return yaml.full_load(f)
def get_prompts(config):
    """Load one prompt definition per source listed in the configuration.

    For every ``source`` found when iterating ``config["prompt_naming"]``,
    the file ``./spinoza_project/prompt_{source}.yaml`` is parsed with
    ``yaml.full_load``.

    Args:
        config: Mapping with a ``"prompt_naming"`` entry that iterates over
            source names.

    Returns:
        A dict mapping each source name to its parsed prompt file.

    Raises:
        OSError: If a prompt file cannot be opened.
    """
    prompts = {}
    for source in config["prompt_naming"]:
        # Decode explicitly as UTF-8 rather than relying on the
        # locale-dependent default encoding of ``open``.
        with open(f"./spinoza_project/prompt_{source}.yaml", encoding="utf-8") as f:
            prompts[source] = yaml.full_load(f)
    return prompts
def set_prompts(prompts, config):
    """Build the per-source QA and reformulation chat prompts.

    ``get_qa_prompts(config, prompt)`` is called once for every
    ``source -> prompt`` entry of ``prompts``; its two results are stored
    under the same source key in two separate dicts.

    Returns:
        A ``(chat_qa_prompts, chat_reformulation_prompts)`` tuple of dicts.
    """
    qa_by_source = {}
    reformulation_by_source = {}
    for source, prompt in prompts.items():
        qa_prompt, reformulation_prompt = get_qa_prompts(config, prompt)
        qa_by_source[source] = qa_prompt
        reformulation_by_source[source] = reformulation_prompt
    return qa_by_source, reformulation_by_source
def get_assets():
    """Read the static front-end assets from disk.

    Returns:
        A ``(css, source_information)`` tuple: the text of
        ``./assets/style.css`` followed by the text of
        ``./assets/source_information.md``.

    Raises:
        OSError: If either file cannot be opened.
    """
    # Decode explicitly as UTF-8: the default text encoding depends on the
    # platform locale and would garble non-ASCII text on non-UTF-8 systems.
    with open("./assets/style.css", "r", encoding="utf-8") as f:
        css = f.read()
    with open("./assets/source_information.md", "r", encoding="utf-8") as f:
        source_information = f.read()
    return css, source_information
def get_qdrants(config):
    """Download and load the document stores for a fixed set of tabs.

    Only the tabs of ``config["prompt_naming"]`` named "Science", "Loi",
    "Organismes publics" or "ADEME" are handled. For each of them the file
    ``database_{tab}.pickle`` is fetched from the
    ``SpinozaProject/spinoza-database`` dataset repository and passed to
    ``pickle_to_document_store``.

    Returns:
        A dict mapping each handled tab name to its loaded document store.
    """
    handled_tabs = ["Science", "Loi", "Organismes publics", "ADEME"]
    stores = {}
    for tab in config["prompt_naming"]:
        if tab not in handled_tabs:
            continue
        # NOTE(review): the file is a pickle; presumably it is unpickled by
        # pickle_to_document_store, so the repository must be trusted.
        local_path = hf_hub_download(
            repo_id="SpinozaProject/spinoza-database",
            filename=f"database_{tab}.pickle",
            repo_type="dataset",
        )
        stores[tab] = pickle_to_document_store(local_path)
    return stores
def get_qdrants_public(config):
    """Download and load the document stores for the remaining tabs.

    Handles every tab of ``config["prompt_naming"]`` except "Science", "Loi",
    "Organismes publics", "ADEME", "Presse" and "AFP". For each handled tab
    the file ``database_{tab}.pickle`` is fetched from a Hugging Face dataset
    repository and passed to ``pickle_to_document_store``.

    Returns:
        A dict mapping each handled tab name to its loaded document store.
    """
    excluded_tabs = ["Science", "Loi", "Organismes publics", "ADEME", "Presse", "AFP"]
    qdrants = {
        tab: pickle_to_document_store(
            hf_hub_download(
                # The previous code passed config["prompt_naming"] (the
                # collection of tab names being iterated) as the repository
                # id, which is not a valid repo id string.
                # NOTE(review): aligned with the repository used by
                # get_qdrants; confirm the public databases live there.
                repo_id="SpinozaProject/spinoza-database",
                filename=f"database_{tab}.pickle",
                repo_type="dataset",
            )
        )
        for tab in config["prompt_naming"]
        if tab not in excluded_tabs
    }
    return qdrants
def get_theme():
    """Return the Gradio theme for the app.

    The theme is ``gr.themes.Base`` with a blue primary hue, a red secondary
    hue, and the Google font "Poppins" followed by generic fallbacks.
    """
    font_stack = [
        gr.themes.GoogleFont("Poppins"),
        "ui-sans-serif",
        "system-ui",
        "sans-serif",
    ]
    return gr.themes.Base(primary_hue="blue", secondary_hue="red", font=font_stack)
def get_init_prompt():
    """Return the French welcome message shown as the assistant's first text.

    The message introduces the assistant, states its limitations, and asks
    the user what they want to learn.
    """
    welcome_message = """
Bonjour, je suis Spinoza, un assistant conversationnel expert sur le climat conçu pour vous aider dans votre parcours journalistique. Je répondrai à vos questions en lien avec le climat en me basant **sur les sources fournies**.
⚠️ Limitations
*Veuillez noter que ce système de questionnement est à un stade précoce, il n'est pas parfait et peut parfois donner des réponses non pertinentes. Si vous n'êtes pas satisfait de la réponse, veuillez poser une question plus spécifique ou signaler vos commentaires pour nous aider à améliorer le système.*
Que voulez-vous apprendre ?
"""
    return welcome_message
def get_synthesis_prompt(config):
    """Build the chat prompt template used for the synthesis answer.

    The content of ``./spinoza_project/prompt_Spinoza.yaml`` is read as raw
    text (it is not parsed as YAML here), converted with
    ``to_chat_instruction`` using ``SpecialTokens(config)``, and wrapped in a
    single-message ``ChatPromptTemplate``.

    Args:
        config: Configuration object passed to ``SpecialTokens``.

    Returns:
        The ``ChatPromptTemplate`` built from the synthesis instruction.

    Raises:
        OSError: If the prompt file cannot be opened.
    """
    special_tokens = SpecialTokens(config)
    # Plain string path (the original f-string had no placeholders) and
    # explicit UTF-8 instead of the locale-dependent default encoding.
    with open("./spinoza_project/prompt_Spinoza.yaml", "r", encoding="utf-8") as f:
        synthesis_template = f.read()
    synthesis_prompt = to_chat_instruction(synthesis_template, special_tokens)
    return ChatPromptTemplate.from_messages([synthesis_prompt])
def zip_longest_fill(*args, fillvalue=None):
    """Zip iterables in rounds, repeating the last value of exhausted ones.

    Each round pulls one item from every iterable and yields the items as a
    tuple. Two rules apply while pulling:

    * a falsy item is discarded once and the following item is taken instead;
    * when an iterable is exhausted, its value from the previous round is
      reused (``fillvalue`` during the first round).

    The generator stops right after yielding a tuple that equals the previous
    round's tuple, so that unchanged tuple is the last one emitted. With no
    iterables at all, nothing is yielded.
    """
    streams = [iter(arg) for arg in args]
    if not streams:
        return
    previous = [fillvalue] * len(streams)
    changed = True
    while changed:
        current = []
        for last_seen, stream in zip(previous, streams):
            try:
                item = next(stream)
                if not item:
                    # Skip a single falsy item; exhaustion here also falls
                    # back to the previous round's value.
                    item = next(stream)
            except StopIteration:
                item = last_seen
            current.append(item)
        # Keep going only while at least one position changed this round.
        changed = any(new != old for new, old in zip(current, previous))
        previous = list(current)
        yield tuple(current)
def start_agents():
    """Announce that the agents are starting and return a placeholder chat.

    Emits a ``gr.Info`` notification (``duration=3``) and returns a
    one-element list holding a ``(None, message)`` pair whose message says
    the answer will be generated once all agents have finished.
    """
    gr.Info(message="Les agents et Spinoza démarent leurs analyses...", duration=3)
    waiting_message = (
        "J'attends que tous les agents aient terminé pour générer une réponse..."
    )
    return [(None, waiting_message)]
def end_agents():
    """Emit a ``gr.Info`` notification saying the agents have finished.

    Returns ``None``.
    """
    done_message = "Les agents et Spinoza ont fini de répondre à votre question"
    gr.Info(message=done_message, duration=3)
def next_call():
    """Do nothing and return ``None``."""
    return None
def format_question(question):
    """Return ``question`` formatted as a string with an empty format spec."""
    return format(question)
def parse_question(question):
    """Strip paragraph markup and extract the text after a ``### `` marker.

    Every ``<p>`` and every ``</p>`` followed by a newline is removed. If the
    result contains ``"### "``, the segment between the first marker and the
    next one (or the end of the text) is returned; otherwise the cleaned text
    is returned unchanged.
    """
    cleaned = question.replace("<p>", "").replace("</p>\n", "")
    segments = cleaned.split("### ")
    # More than one segment means the marker occurred at least once.
    return segments[1] if len(segments) > 1 else cleaned
def reformulate(llm, chat_reformulation_prompts, question, tab, config):
    """Stream a reformulation of ``question`` for the given tab.

    When ``tab`` is not a key of ``config["tabs"]``, an iterator over five
    ``None`` values is returned instead of calling the model. Otherwise
    ``llm.stream`` is called with the reformulation prompt selected through
    ``config["source_mapping"][tab]`` and a ``{"question": ...}`` payload
    holding the parsed question.
    """
    if tab not in list(config["tabs"].keys()):
        return iter([None] * 5)
    prompt = chat_reformulation_prompts[config["source_mapping"][tab]]
    return llm.stream(prompt, {"question": parse_question(question)})
def add_question(question):
    """Return ``question`` unchanged."""
    unchanged = question
    return unchanged
def answer(llm, chat_qa_prompts, question, source, tab, config):
    """Stream an answer to ``question`` for the given tab.

    * If ``tab`` is not a key of ``config["tabs"]``, an iterator over five
      ``None`` values is returned.
    * If ``source`` is shorter than 10 characters, an iterator over a single
      "no source found" message is returned.
    * Otherwise ``llm.stream`` is called with the QA prompt selected through
      ``config["source_mapping"][tab]`` and a payload holding the parsed
      question and the sources with paragraph markup removed.
    """
    if tab not in list(config["tabs"].keys()):
        return iter([None] * 5)
    if len(source) < 10:
        return iter(["Aucune source trouvée, veuillez reformuler votre question"])
    prompt = chat_qa_prompts[config["source_mapping"][tab]]
    payload = {
        "question": parse_question(question),
        "sources": source.replace("<p>", "").replace("</p>\n", ""),
    }
    return llm.stream(prompt, payload)
def get_sources(questions, qdrants, bdd_presse, bdd_afp, config):
    """Retrieve, filter and format the sources for each tab's question.

    Questions are paired in order with the keys of ``config["tabs"]``. For
    each pair the question is stripped of paragraph markup and searched with
    ``similarity_search_with_relevance_scores``:

    * tab "Presse" queries ``bdd_presse``;
    * tab "AFP" queries ``bdd_afp``;
    * any other tab queries ``qdrants[config["source_mapping"][tab]]`` with
      ``config["query_preprompt"]`` prepended to the question.

    Hits scoring below ``config["min_similarity"]`` are dropped. With
    ``k = config["num_document_retrieved"]``, the kept hits of the i-th tab
    are numbered starting at ``k * i + 1``.

    Returns:
        A ``(formated, text)`` tuple: ``formated`` is the concatenation of
        the ``make_html_source`` output of every kept hit, and ``text`` is a
        list with one string per processed tab joining that tab's
        "Doc N ..." entries with blank lines.
    """
    top_k = config["num_document_retrieved"]
    threshold = config["min_similarity"]
    html_parts, text_parts = [], []
    tab_names = list(config["tabs"].keys())
    for position, (question, tab) in enumerate(zip(questions, tab_names)):
        query = question.replace("<p>", "").replace("</p>\n", "")
        if tab == "Presse":
            hits = bdd_presse.similarity_search_with_relevance_scores(query, k=top_k)
        elif tab == "AFP":
            hits = bdd_afp.similarity_search_with_relevance_scores(query, k=top_k)
        else:
            store = qdrants[config["source_mapping"][tab]]
            hits = store.similarity_search_with_relevance_scores(
                config["query_preprompt"] + query,
                k=top_k,
            )
        kept = [(doc, score) for doc, score in hits if score >= threshold]

        # Each tab owns the document numbers top_k*position+1 .. top_k*(position+1).
        first_number = top_k * position + 1
        numbered = list(zip(range(first_number, first_number + top_k), kept))

        for number, (doc, score) in numbered:
            html_parts.append(make_html_source(doc, number, score, config))

        entries = [
            f"Doc {number} with source type {doc.metadata.get('file_source_type')}:\n"
            + doc.page_content
            for number, (doc, _score) in numbered
        ]
        text_parts.append("\n\n".join(entries))
    return "".join(html_parts), text_parts