Spaces:
Sleeping
Sleeping
| import os | |
| import requests | |
| import pandas as pd | |
| from typing import Annotated | |
| from typing_extensions import TypedDict | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from langgraph.graph import StateGraph, START, END | |
| from langgraph.graph.message import add_messages | |
| from langgraph.prebuilt import ToolNode | |
| from langchain_core.tools import tool | |
| from langchain_core.messages import AIMessage, ToolMessage, HumanMessage, SystemMessage | |
| from smolagents import DuckDuckGoSearchTool | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import wikipedia | |
| import pandas as pd | |
# (Keep Constants as is)
# --- Constants ---
# Presumably the base URL of the course scoring service; nothing in this part
# of the file reads it, so confirm against the caller before changing it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# --- Basic Agent Definition ---
# ----- THIS IS WHERE YOU CAN BUILD WHAT YOU WANT ------
class OrderState(TypedDict):
    """State dictionary handed from node to node in the agent graph."""

    # Conversation so far. The add_messages annotation is langgraph's hook for
    # merging the messages a node returns into this list (see langgraph docs).
    messages: Annotated[list, add_messages]

    # List of strings. The code in this file only initialises it to [] and
    # passes it along; nothing visible here reads its contents.
    order: list[str]

    # True once the conversation should stop; the routing functions check it.
    finished: bool
# System instruction for the Agent.
#
# Kept as a (role, text) pair: element 0 is the role label, element 1 is the
# prompt text. The prompt is written as adjacent string literals, which Python
# joins with nothing in between, so every sentence except the last carries its
# own trailing space. Without those spaces the sentences run together
# ("question.The question ...").
SYSINT = (
    "system",
    "You are a general AI assistant. I will ask you a question. "
    "The question requires a tool to solve. You must attempt to use at least one of the available tools before returning an answer. "
    "Report your thoughts, and finish your answer with the following template: "
    "FINAL ANSWER: [YOUR FINAL ANSWER]. YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. "
    "If you are asked for a number, don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise. "
    "If you are asked for a string, don't use articles, neither abbreviations (e.g. for cities), and write the digits in plain text unless specified otherwise. "
    "If you are asked for a comma separated list, apply the above rules depending of whether the element to be put in the list is a number or a string. "
    "If a tool required for task completion is not functioning, return 0."
)

# Greeting text; not referenced by the code visible in this part of the file.
WELCOME_MSG = "Welcome to my general-purpose AI agent. Type `q` to quit. How shall I fail to serve you today?"
# Tool: look up a Wikipedia article by its exact title (auto-suggest is off)
# and return at most the first 3000 characters of its text. Any failure is
# reported as an "Error during processing: ..." string instead of being raised.
# NOTE(review): the docstring presumably doubles as the tool description once
# the function is passed to bind_tools, so it is kept word for word.
def wikipedia_search_tool(title: str) -> str:
    """Provides an excerpt from a Wikipedia article with the given title."""
    try:
        return wikipedia.page(title, auto_suggest=False).content[:3000]
    except Exception as exc:
        return f"Error during processing: {exc}"
# Tool placeholder: ignores file_path and always answers with a fixed notice
# that media handling is not implemented.
# NOTE(review): the docstring presumably doubles as the tool description once
# the function is passed to bind_tools, so it is kept word for word.
def media_tool(file_path: str) -> str:
    """Used for deciphering video and audio files."""
    not_implemented_notice = (
        "This tool hasn't been implemented yet. "
        "Please return 0 if the task cannot be solved without knowing the contents of this file."
    )
    return not_implemented_notice
# Tool: run search_query through a freshly created DuckDuckGoSearchTool and
# hand back whatever it returns. Any failure is reported as an
# "Error during processing: ..." string instead of being raised.
# NOTE(review): the docstring presumably doubles as the tool description once
# the function is passed to bind_tools, so it is kept word for word (even
# though the search backend used here is DuckDuckGo).
def internet_search_tool(search_query: str) -> str:
    """Does a google search with using the input as the search query. Returns a long batch of textual information related to the query."""
    try:
        return DuckDuckGoSearchTool()(search_query)
    except Exception as exc:
        return f"Error during processing: {exc}"
# Tool: fetch url and return the text that BeautifulSoup extracts from the
# page (markup stripped via get_text()).
#
# Returns:
#   - the extracted page text on HTTP 200,
#   - "Failed to retrieve the webpage. Status code: <code>" on any other status,
#   - "Error during processing: <error>" if the request or parsing raises.
#
# NOTE(review): the docstring presumably doubles as the tool description once
# the function is passed to bind_tools, so it is kept word for word.
def webscraper_tool(url: str) -> str:
    """Returns the page's html content from the input url."""
    try:
        # timeout: without one a stalled server blocks the agent indefinitely.
        # with-block: a streamed response holds its connection until the body
        # is read or the response is closed; the non-200 branch never reads
        # the body, so close explicitly on every path.
        with requests.get(url, stream=True, timeout=30) as response:
            if response.status_code != 200:
                return f"Failed to retrieve the webpage. Status code: {response.status_code}"
            soup = BeautifulSoup(response.content, 'html.parser')
            return soup.get_text()
    except Exception as e:
        return f"Error during processing: {e}"
# Tool: load the workbook at file_path with pandas (openpyxl engine) and return
# it rendered as plain text without the index column. Any failure is reported
# as an "Error during processing: ..." string instead of being raised.
# NOTE(review): the docstring presumably doubles as the tool description once
# the function is passed to bind_tools, so it is kept word for word (the value
# actually returned is the string rendering, not the DataFrame object).
def read_excel_tool(file_path: str) -> str:
    """Returns the contents of an Excel file as a Pandas dataframe."""
    try:
        sheet = pd.read_excel(file_path, engine="openpyxl")
        rendered = sheet.to_string(index=False)
    except Exception as exc:
        return f"Error during processing: {exc}"
    return rendered
class AgenticAI:
    """Single-question agent wired as a LangGraph state graph.

    Graph layout (as built in ``__init__``):
    START -> "human" -> "agent" <-> "interactive_tools", with END reachable
    from both "human" and "agent" through the two routing methods.

    ``ask`` is the public entry point; the underscore-prefixed methods are the
    graph's node and routing callbacks.
    """

    # Short names used in the per-call debug print, keyed by tool name.
    _TOOL_LOG_LABELS = {
        "wikipedia_search_tool": "wikipedia",
        "media_tool": "media",
        "internet_search_tool": "internet",
        "webscraper_tool": "webscraper",
        "read_excel_tool": "excel",
    }

    # Search results are the only tool output that is truncated at this level.
    _SEARCH_RESULT_MAX_CHARS = 3000

    def __init__(self):
        # initialize LLM
        self.llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")

        # prepare tool list
        self.tools = [
            wikipedia_search_tool,
            media_tool,
            internet_search_tool,
            webscraper_tool,
            read_excel_tool,
        ]
        # name -> callable lookup used when executing the model's tool calls
        self._tools_by_name = {tool_fn.__name__: tool_fn for tool_fn in self.tools}

        # bind tools
        self.llm_with_tools = self.llm.bind_tools(self.tools)

        # standalone ToolNode for any non-interactive tools (none here)
        self.tool_node = ToolNode([])

        # build state graph
        self.graph = StateGraph(OrderState)
        self.graph.add_node("agent", self._agent_node)
        self.graph.add_node("interactive_tools", self._interactive_tools_node)
        self.graph.add_node("human", self._human_node)

        # routing
        self.graph.add_conditional_edges("agent", self._maybe_route_to_tools)
        self.graph.add_conditional_edges("human", self._maybe_exit_human_node)
        self.graph.add_edge("interactive_tools", "agent")
        self.graph.add_edge(START, "human")
        self.chat_graph = self.graph.compile()

    def ask(self, human_input: str) -> str:
        """Run one question through the graph and return the last message's content.

        A fresh state is built on every call, so nothing carries over between
        calls. Any exception raised while running the graph is returned as an
        "Error during processing: ..." string rather than propagated.
        """
        # build initial messages
        init_msgs = [
            SystemMessage(content=self._system_prompt()),
            HumanMessage(content=human_input),
        ]
        state = {"messages": init_msgs, "order": [], "finished": False}
        try:
            final_state = self.chat_graph.invoke(state, {"recursion_limit": 15})
            # last message should be from the AI
            return final_state["messages"][-1].content
        except Exception as e:
            return f"Error during processing: {e}"

    # --- internal helpers ---

    @staticmethod
    def _system_prompt() -> str:
        """Return the system prompt text.

        SYSINT is defined as a ("system", text) pair; only the text belongs in
        a SystemMessage, otherwise the role label ends up inside the prompt.
        A plain-string SYSINT is accepted as well.
        """
        return SYSINT[1] if isinstance(SYSINT, tuple) else SYSINT

    def _run_tool(self, tool_name: str, tool_args: dict) -> str:
        """Execute one requested tool call and return its result as a string.

        Unknown tool names and exceptions are turned into explanatory strings
        so that the value can always be used as ToolMessage content.
        """
        tool_fn = self._tools_by_name.get(tool_name)
        if tool_fn is None:
            return f'Unknown tool call: {tool_name}'

        label = self._TOOL_LOG_LABELS.get(tool_name, tool_name)
        print(f"called {label} with {str(tool_args)}")
        try:
            result = tool_fn(**tool_args)
        except Exception as e:
            # e.g. the model supplied missing or unexpected argument names
            return f"Error during processing: {e}"

        text = result if isinstance(result, str) else str(result)
        if tool_name == "internet_search_tool":
            text = text[:self._SEARCH_RESULT_MAX_CHARS]
        return text

    # --- internal node functions ---

    def _agent_node(self, state: OrderState) -> OrderState:
        """Call the tool-bound LLM on the conversation and return its reply.

        The returned update also resets "order" to [] and "finished" to False.
        If the LLM call raises, the error is reported as an AIMessage.
        """
        msgs = state.get("messages", [])
        print(f"Messagelist sent to agent node: {[msg.content for msg in msgs]}")
        defaults = {"order": [], "finished": False}
        prompt = self._system_prompt()
        if not msgs:
            # no prior messages: seed with system + empty AI message
            return {**defaults, "messages": [SystemMessage(content=prompt), AIMessage(content="")]}
        try:
            # ensure the system prompt is first, without duplicating the one
            # that ask() already put at the head of the conversation
            if not isinstance(msgs[0], SystemMessage):
                msgs = [SystemMessage(content=prompt)] + msgs
            new_output = self.llm_with_tools.invoke(msgs)
            return {**defaults, "messages": [new_output]}
        except Exception as e:
            return {**defaults, "messages": [AIMessage(content=f"I'm having trouble: {e}")]}

    def _interactive_tools_node(self, state: OrderState) -> OrderState:
        """Run every tool call on the last message and return one ToolMessage per call."""
        tool_msg = state["messages"][-1]
        outbound_msgs = [
            ToolMessage(
                content=self._run_tool(tool_call["name"], tool_call["args"]),
                name=tool_call["name"],
                tool_call_id=tool_call["id"],
            )
            for tool_call in tool_msg.tool_calls
        ]
        return {"messages": outbound_msgs, "order": state.get("order", []), "finished": False}

    def _human_node(self, state: OrderState) -> OrderState:
        """Mark the state finished when the last human message is a quit word.

        The incoming state is updated in place and returned.
        """
        print(f"Messagelist sent to human node: {[msg.content for msg in state.get('messages', [])]}")
        last = state["messages"][-1]
        if isinstance(last, HumanMessage) and last.content.strip().lower() in {"q", "quit", "exit", "goodbye"}:
            state["finished"] = True
        return state

    def _maybe_route_to_tools(self, state: OrderState) -> str:
        """Route after the agent: to the tools node if tool calls were requested, else END."""
        msgs = state.get("messages", [])
        if state.get("finished"):
            print("from agent GOTO End node")
            return END
        last = msgs[-1]
        if hasattr(last, "tool_calls") and last.tool_calls:
            print("from agent GOTO tools node")
            # go run interactive tools
            return "interactive_tools"
        # else, end conversation
        print("tool call failed, quitting")
        return END

    def _maybe_exit_human_node(self, state: OrderState) -> str:
        """Route after the human node: END if finished or the last message is from the AI, else "agent"."""
        if state.get("finished"):
            print("from human GOTO End node")
            return END
        last = state["messages"][-1]
        # if AIMessage then end after one turn
        print("from human GOTO agent node or quit")
        return END if isinstance(last, AIMessage) else "agent"