|
import os |
|
from dotenv import load_dotenv |
|
from supabase import create_client |
|
from supabase.client import Client |
|
from langgraph.graph import START, StateGraph, MessagesState |
|
from langgraph.prebuilt import tools_condition, ToolNode |
|
from langchain_core.messages import SystemMessage, HumanMessage |
|
from langchain_core.tools import tool |
|
from langchain_google_genai import ChatGoogleGenerativeAI |
|
from langchain_groq import ChatGroq |
|
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint, HuggingFaceEmbeddings |
|
from langchain_community.tools.tavily_search import TavilySearchResults |
|
from langchain_community.document_loaders import WikipediaLoader, ArxivLoader |
|
from langchain_community.vectorstores import SupabaseVectorStore |
|
from langchain.tools.retriever import create_retriever_tool |
|
|
|
# Load settings from a .env file (python-dotenv) before the lookups below.
load_dotenv()

# Both values are None when the corresponding variable is not defined.
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_SERVICE_KEY = os.environ.get("SUPABASE_SERVICE_KEY")

# Startup diagnostics: print only the first 10 characters of each value,
# or a "not set" notice when the variable is missing or empty.
if SUPABASE_URL:
    print(f"SUPABASE_URL: {SUPABASE_URL[:10]}...")
else:
    print("SUPABASE_URL not set")

if SUPABASE_SERVICE_KEY:
    print(f"SUPABASE_SERVICE_KEY: {SUPABASE_SERVICE_KEY[:10]}...")
else:
    print("SUPABASE_SERVICE_KEY not set")
|
|
|
|
|
def get_supabase_client():
    """Create a Supabase client from the module-level URL and service key.

    Returns:
        The client object produced by ``create_client``.

    Raises:
        ValueError: If either Supabase setting is unset or empty.
    """
    credentials_present = bool(SUPABASE_URL) and bool(SUPABASE_SERVICE_KEY)
    if credentials_present:
        return create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)
    raise ValueError("Supabase environment variables are missing.")
|
|
|
|
|
@tool
def multiply(a: int, b: int) -> int:
    """Multiply two integers."""
    # Returns the product of a and b.
    product = a * b
    return product
|
|
|
@tool
def add(a: int, b: int) -> int:
    """Add two integers."""
    # Returns the sum of a and b.
    total = a + b
    return total
|
|
|
@tool
def subtract(a: int, b: int) -> int:
    """Subtract b from a."""
    # Order matters: the result is a minus b, not b minus a.
    difference = a - b
    return difference
|
|
|
@tool
def divide(a: int, b: int) -> float:
    """Divide a by b."""
    # A zero divisor is rejected up front with ValueError rather than letting
    # the division raise ZeroDivisionError.
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    # True division: the result is a float even for integer operands.
    quotient = a / b
    return quotient
|
|
|
@tool
def modulus(a: int, b: int) -> int:
    """Modulo operation."""
    # Python's % follows the sign of the divisor; there is no guard here, so
    # b == 0 raises ZeroDivisionError.
    remainder = a % b
    return remainder
|
|
|
@tool
def wiki_search(query: str) -> str:
    """Search Wikipedia for the query and return top results."""
    # Load at most two documents and join their full text with a "---" divider.
    loader = WikipediaLoader(query=query, load_max_docs=2)
    pages = loader.load()
    separator = "\n\n---\n\n"
    return separator.join(page.page_content for page in pages)
|
|
|
@tool
def web_search(query: str) -> str:
    """Search the web and return top results."""
    # The tool input must be passed positionally (here as an explicit args
    # dict); `invoke(query=query)` leaves the required `input` argument unset.
    results = TavilySearchResults(max_results=3).invoke({"query": query})

    # On failure the Tavily tool hands back an error string instead of a
    # result list; pass it through so the model can see what went wrong.
    if isinstance(results, str):
        return results

    # Each hit is a plain dict (with "url" and "content" keys), not a
    # Document, so there is no `.page_content` attribute to read.
    snippets = []
    for hit in results:
        if isinstance(hit, dict):
            snippets.append(str(hit.get("content", "")))
        else:
            snippets.append(str(hit))
    return "\n\n---\n\n".join(snippets)
|
|
|
@tool
def arvix_search(query: str) -> str:
    """Search Arxiv for the query and return excerpts."""
    # Load at most three documents and keep only the first 1000 characters of
    # each before joining them with a "---" divider.
    loader = ArxivLoader(query=query, load_max_docs=3)
    papers = loader.load()
    excerpts = (paper.page_content[:1000] for paper in papers)
    return "\n\n---\n\n".join(excerpts)
|
|
|
|
|
# Tool registry: arithmetic helpers first, then the search tools.
tools = [
    multiply,
    add,
    subtract,
    divide,
    modulus,
    wiki_search,
    web_search,
    arvix_search,
]
|
|
|
# Read the system prompt from system_prompt.txt (relative path, so it is
# resolved against the current working directory). Surrounding whitespace
# is stripped. A missing file raises here at import time.
with open("system_prompt.txt", "r", encoding="utf-8") as f:
    system_prompt = f.read().strip()

# An empty or whitespace-only file falls back to a generic prompt.
if system_prompt == "":
    print("Warning: system_prompt.txt is empty. Using default system prompt.")
    system_prompt = "You are a helpful assistant."

# Message object wrapping the prompt text, built once at import time.
sys_msg = SystemMessage(content=system_prompt)
|
|
|
|
|
def build_graph(provider: str = "groq"):
    """Build and compile the retrieval-augmented, tool-calling agent graph.

    Flow: ``START -> retriever -> assistant``. After each assistant turn,
    ``tools_condition`` decides whether the ``tools`` node runs; its output
    is fed back to the assistant.

    Args:
        provider: Chat model backend, one of ``"google"``, ``"groq"`` or
            ``"huggingface"``.

    Returns:
        The compiled graph (the result of ``StateGraph.compile()``).

    Raises:
        ValueError: If ``provider`` is not a supported name. Errors raised by
            ``get_supabase_client`` propagate unchanged.
    """
    embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
    supabase = get_supabase_client()
    vector_store = SupabaseVectorStore(
        client=supabase,
        embedding=embeddings,
        table_name="documents",
        query_name="match_documents_langchain",
    )
    # NOTE(review): this retriever tool is built but never added to `tools`,
    # so the model cannot call it. Its name contains a space, which
    # function-calling APIs typically reject; rename it before wiring it in.
    retriever_tool = create_retriever_tool(
        retriever=vector_store.as_retriever(),
        name="Question Search",
        description="A tool to retrieve similar questions from a vector store.",
    )

    if provider == "google":
        llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)
    elif provider == "groq":
        llm = ChatGroq(model="qwen-qwq-32b", temperature=0)
    elif provider == "huggingface":
        llm = ChatHuggingFace(
            llm=HuggingFaceEndpoint(
                # `endpoint_url` is the documented keyword; `url` is not a
                # HuggingFaceEndpoint field.
                # NOTE(review): confirm this model id exists on the Hub.
                endpoint_url="https://api-inference.huggingface.co/models/Meta-DeepLearning/llama-2-7b-chat-hf",
                temperature=0,
            ),
        )
    else:
        raise ValueError("Invalid provider specified")

    llm_with_tools = llm.bind_tools(tools)

    def assistant(state: MessagesState):
        """Run the tool-bound model on the conversation and return its reply."""
        try:
            print("Assistant received messages:")
            for m in state["messages"]:
                print(f"- {m.__class__.__name__}: {m.content[:100]}")

            result = llm_with_tools.invoke(state["messages"])
            print("LLM output message:")
            if hasattr(result, "content"):
                print(result.content[:500])
            else:
                print(result)

            # A tool-call reply usually has empty `content` and carries the
            # request in `tool_calls`. Treating that as "empty" would replace
            # it with the apology below and the tools node would never run.
            has_content = bool(getattr(result, "content", None))
            has_tool_calls = bool(getattr(result, "tool_calls", None))
            if not (has_content or has_tool_calls):
                print("Warning: LLM returned empty result or no content.")
                return {"messages": [HumanMessage(content="Sorry, I couldn't generate an answer.")]}

            return {"messages": [result]}
        except Exception as e:
            # Deliberate best-effort boundary: log the failure and answer with
            # a fallback message instead of crashing the graph run.
            print(f"Error invoking LLM: {e}")
            return {"messages": [HumanMessage(content="Sorry, I encountered an error during processing.")]}

    def retriever(state: MessagesState):
        """Add the system prompt and, when available, the closest stored question."""
        similar = vector_store.similarity_search(state["messages"][0].content)
        if not similar:
            # No match in the vector store: there is nothing to reference, so
            # only the system prompt is added (avoids IndexError on similar[0]).
            return {"messages": [sys_msg] + state["messages"]}
        msg = HumanMessage(content=f"Similar question reference:\n\n{similar[0].page_content}")
        return {"messages": [sys_msg] + state["messages"] + [msg]}

    graph = StateGraph(MessagesState)
    graph.add_node("retriever", retriever)
    graph.add_node("assistant", assistant)
    graph.add_node("tools", ToolNode(tools))
    graph.add_edge(START, "retriever")
    graph.add_edge("retriever", "assistant")
    graph.add_conditional_edges("assistant", tools_condition)
    graph.add_edge("tools", "assistant")

    return graph.compile()
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke run: build the Groq-backed graph, ask one question, and
    # print every message in the final state.
    question = "When was Aquinas added to Wikipedia page on double effect?"
    agent = build_graph("groq")
    final_state = agent.invoke({"messages": [HumanMessage(content=question)]})
    for message in final_state["messages"]:
        print(f"\n[{message.__class__.__name__}] {message.content}\n")
|
|