import os
import shutil
import gradio as gr
# LangChain imports
from langchain_community.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import ConversationalRetrievalChain
from langchain_community.llms import OpenAI
# from langchain_openai import OpenAIEmbeddings, OpenAI  # Alternative if the langchain-openai package is installed
# API key configuration
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
if not OPENAI_API_KEY:
    raise ValueError("The OPENAI_API_KEY environment variable is not set. "
                     "Please configure it as a Secret in your Hugging Face Space.")
# Directory of the current application
APP_DIR = os.path.dirname(os.path.abspath(__file__))
logo_filename = "logo.jpeg"  # File name only, for Gradio
full_logo_path_for_os_check = os.path.join(APP_DIR, logo_filename)  # Full path for os.path.exists
# Persistent directory for ChromaDB (inside the app directory)
PERSIST_DIRECTORY_NAME = "chroma_db_persistent_data"
persist_directory = os.path.join(APP_DIR, PERSIST_DIRECTORY_NAME)
# Make sure the persistent directory exists
try:
    os.makedirs(persist_directory, exist_ok=True)
    print(f"Persistent directory '{persist_directory}' ready.")
except OSError as e:
    print(f"CRITICAL: Error creating persistent directory '{persist_directory}': {e}")
    raise
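# Caveat (deployment assumption): a standard Hugging Face Space runs on
# ephemeral disk unless persistent storage is enabled, so this "persistent"
# Chroma index may be rebuilt from the source PDFs after every restart.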
def initialize_vectorstore(documents=None, force_recreate=False, db_path=persist_directory):
    """Initialize or load the Chroma vector database."""
    try:
        os.makedirs(db_path, exist_ok=True)
        if force_recreate and os.path.exists(db_path) and os.listdir(db_path):
            print(f"Deleting existing ChromaDB directory: {db_path}")
            shutil.rmtree(db_path)
            os.makedirs(db_path, exist_ok=True)
        embeddings = OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY)
        db_file_path = os.path.join(db_path, "chroma.sqlite3")
        if not force_recreate and os.path.exists(db_file_path):
            print(f"Trying to load existing vector database from: {db_path}")
            vectorstore = Chroma(persist_directory=db_path, embedding_function=embeddings)
            print("Vector database loaded successfully.")
        elif documents:
            print(f"Creating new vector database from {len(documents)} documents in: {db_path}")
            vectorstore = Chroma.from_documents(documents, embeddings, persist_directory=db_path)
            print("New vector database created and saved.")
        else:
            print(f"Initializing an empty vector database in: {db_path}. "
                  "No documents were provided and no existing database was found (or recreation was forced).")
            vectorstore = Chroma(persist_directory=db_path, embedding_function=embeddings)
            print("Empty vector database initialized.")
        return vectorstore
    except Exception as e:
        print(f"Critical error while initializing the vector database in '{db_path}': {e}")
        raise
# --- Document loading and processing ---
documents_for_vectorstore = None
db_file_check = os.path.join(persist_directory, "chroma.sqlite3")
if not os.path.exists(db_file_check):
    print(f"Vector database not found at '{db_file_check}'. Processing documents...")
    DOCUMENTS_DIR = os.path.join(APP_DIR, 'archivos')
    if not os.path.isdir(DOCUMENTS_DIR):
        print(f"WARNING: The documents directory '{DOCUMENTS_DIR}' does not exist. "
              "The chatbot may lack domain-specific knowledge.")
    else:
        pdf_loader = DirectoryLoader(DOCUMENTS_DIR, glob="**/*.pdf", show_progress=True, use_multithreading=True)
        loaded_documents = pdf_loader.load()
        if not loaded_documents:
            print("WARNING: No documents were loaded from the 'archivos' directory.")
        else:
            print(f"Loaded {len(loaded_documents)} documents.")
            text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200, length_function=len)
            documents_for_vectorstore = text_splitter.split_documents(loaded_documents)
            print(f"Documents split into {len(documents_for_vectorstore)} chunks.")
    vectorstore = initialize_vectorstore(documents=documents_for_vectorstore, force_recreate=True)
else:
    print(f"Existing vector database found at '{persist_directory}'. Loading...")
    vectorstore = initialize_vectorstore(documents=None, force_recreate=False)
# Build the conversational retrieval chain
retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 3})
qa_chain = ConversationalRetrievalChain.from_llm(
    OpenAI(temperature=0.5, openai_api_key=OPENAI_API_KEY),
    retriever,
    return_source_documents=True
)
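# How the chain answers (standard ConversationalRetrievalChain flow): it first
# condenses the incoming question plus the chat history into a standalone
# question with the LLM, retrieves the k=3 most similar chunks from Chroma,
# then answers from those chunks. With return_source_documents=True the
# retrieved chunks are also exposed in the result dict under "source_documents".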
# --- Gradio user interface ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    # Show the logo. Note: assigning a .css attribute to a component has no
    # effect in Gradio; size the image through its constructor instead.
    if os.path.exists(full_logo_path_for_os_check):
        gr.Image(logo_filename, show_label=False, container=False, height=200, width=200)
    gr.Markdown(
        """
        <div style='text-align: center;'>
        <h1>CHATBOT EDUCONVIVE</h1>
        <p><em>School coexistence, respect, and peaceful conflict resolution.</em></p>
        <p>Describe your case and receive guidance on the recommended route of action to resolve it.</p>
        </div>
        """
    )
    gradio_chat_history_state = gr.State([])
    chatbot_display = gr.Chatbot(
        value=[],
        elem_id="chatbot",
        type="messages",
        height=500,
        # For avatar_images, when the logo sits in the same directory as app.py,
        # the bare file name should work.
        avatar_images=(None, logo_filename if os.path.exists(full_logo_path_for_os_check) else None)
    )
    with gr.Row():
        msg_input = gr.Textbox(
            placeholder="Type your question or case here...",
            show_label=False,
            scale=7
        )
        submit_button = gr.Button("Send", variant="primary", scale=1)
    clear_button = gr.ClearButton([msg_input, chatbot_display, gradio_chat_history_state], value="Clear Chat")
    def respond(user_message, current_gradio_history):
        if not user_message.strip():
            return current_gradio_history, current_gradio_history
        print(f"User message: {user_message}")
        # Convert Gradio's "messages" history (role/content dicts) into the
        # (question, answer) tuples that ConversationalRetrievalChain expects.
        langchain_chat_history_tuples = []
        temp_user_msg = None
        for msg_obj in current_gradio_history:
            if msg_obj["role"] == "user":
                temp_user_msg = msg_obj["content"]
            elif msg_obj["role"] == "assistant" and temp_user_msg is not None:
                langchain_chat_history_tuples.append((temp_user_msg, msg_obj["content"]))
                temp_user_msg = None
        result = qa_chain.invoke({
            "question": user_message,
            "chat_history": langchain_chat_history_tuples
        })
        answer = result["answer"]
        updated_gradio_history = current_gradio_history + [
            {"role": "user", "content": user_message},
            {"role": "assistant", "content": answer}
        ]
        # Return the same list twice: once for the Chatbot display, once for the State.
        return updated_gradio_history, updated_gradio_history

    def clear_user_input():
        return ""
    # Wire up events: submitting the textbox or clicking the button runs
    # respond, then clears the input box.
    msg_input.submit(
        respond,
        [msg_input, gradio_chat_history_state],
        [chatbot_display, gradio_chat_history_state]
    ).then(clear_user_input, [], [msg_input])
    submit_button.click(
        respond,
        [msg_input, gradio_chat_history_state],
        [chatbot_display, gradio_chat_history_state]
    ).then(clear_user_input, [], [msg_input])
# Launch the demo
if __name__ == "__main__":
    demo.queue().launch(debug=True, server_name="0.0.0.0", server_port=7860, share=False)
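# A plausible requirements.txt for this Space (package names inferred from the
# imports above; versions are assumptions and should be pinned as needed):
#   gradio
#   langchain
#   langchain-community
#   chromadb
#   openai
#   tiktoken
#   unstructured[pdf]  # DirectoryLoader's default loader parses files via unstructured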