# FirstAgent / app.py
# Last commit: "add custom tools" (637de75, verified) by helmo
from smolagents import CodeAgent,DuckDuckGoSearchTool, HfApiModel,load_tool,tool
import datetime
import requests
import pytz
import yaml
from tools.final_answer import FinalAnswerTool
from Gradio_UI import GradioUI
@tool
def get_current_time_in_timezone(timezone: str) -> str:
    """A tool that fetches the current local time in a specified timezone.
    Args:
        timezone: A string representing a valid timezone (e.g., 'America/New_York').
    """
    try:
        # Resolve the zone name, then read and format the clock in that zone.
        zone = pytz.timezone(timezone)
        now = datetime.datetime.now(zone)
        stamp = f"{now:%Y-%m-%d %H:%M:%S}"
    except Exception as exc:
        # Failures are reported as text rather than raised.
        return f"Error fetching time for timezone '{timezone}': {exc!s}"
    return f"The current local time in {timezone} is: {stamp}"
# my custom tools
@tool
def convert_currency(amount: float, from_currency: str, to_currency: str) -> str:
    """A tool that converts an amount from one currency to another using current exchange rates.
    Args:
        amount: The amount to convert.
        from_currency: The source currency code (e.g., 'USD', 'EUR', 'JPY').
        to_currency: The target currency code (e.g., 'USD', 'EUR', 'JPY').
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    try:
        from_currency = from_currency.strip().upper()
        to_currency = to_currency.strip().upper()

        # Using a free currency API. The code is escaped so stray characters
        # cannot alter the request path, and a timeout stops a stalled
        # connection from blocking the caller indefinitely.
        response = requests.get(
            f"https://open.er-api.com/v6/latest/{quote(from_currency, safe='')}",
            timeout=10,
        )
        if response.status_code != 200:
            return f"Error fetching exchange rates: HTTP Status {response.status_code}"

        data = response.json()
        if data.get("result") != "success":
            return f"API error: {data.get('error-type', 'Unknown error')}"

        rates = data.get("rates", {})
        if to_currency not in rates:
            return f"Currency {to_currency} not found in exchange rates."

        exchange_rate = rates[to_currency]
        converted_amount = amount * exchange_rate
        return (
            f"{amount} {from_currency} = {converted_amount:.2f} {to_currency} "
            f"(Rate: 1 {from_currency} = {exchange_rate} {to_currency})"
        )
    except Exception as e:
        # Any failure (network, JSON decoding, bad argument types) is
        # returned as text instead of being raised.
        return f"Error converting currency: {str(e)}"
@tool
def get_ip_info(ip: str = "") -> str:
    """A tool that provides information about an IP address using a free IP geolocation API.
    If no IP is provided, it returns information about the public IP of the machine running this tool.
    Args:
        ip: Optional IP address (e.g., '8.8.8.8'). If empty, uses the current public IP.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    try:
        ip = (ip or "").strip()
        if ip:
            # ':' stays unescaped so IPv6 literals survive; everything else
            # that could alter the request path is percent-encoded.
            url = f"https://ipapi.co/{quote(ip, safe=':')}/json/"
        else:
            url = "https://ipapi.co/json/"

        # Timeout stops a stalled connection from blocking the caller indefinitely.
        response = requests.get(url, timeout=10)
        if response.status_code != 200:
            return f"Error fetching IP information: HTTP Status {response.status_code}"

        data = response.json()
        if "error" in data:
            # 'reason' is read defensively so a payload without it still
            # yields a readable message instead of a KeyError.
            return f"API Error: {data.get('reason', 'Unknown error')}"

        lines = [
            f"IP: {data.get('ip', 'N/A')}",
            f"Location: {data.get('city', 'N/A')}, {data.get('region', 'N/A')}, {data.get('country_name', 'N/A')}",
            f"ISP: {data.get('org', 'N/A')}",
            f"Timezone: {data.get('timezone', 'N/A')}",
            f"Coordinates: {data.get('latitude', 'N/A')}, {data.get('longitude', 'N/A')}",
        ]
        return "\n".join(lines)
    except Exception as e:
        # Any failure (network, JSON decoding) is returned as text instead of raised.
        return f"Error getting IP information: {str(e)}"
@tool
def dictionary_lookup(word: str) -> str:
    """A tool that provides definitions and information about English words using a free dictionary API.
    Args:
        word: The English word to look up.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    try:
        word = word.lower().strip()
        if not word:
            return "Error: no word provided to look up."

        # The word is escaped so spaces or slashes cannot alter the request
        # path; the timeout stops a stalled connection from blocking forever.
        response = requests.get(
            f"https://api.dictionaryapi.dev/api/v2/entries/en/{quote(word, safe='')}",
            timeout=10,
        )
        if response.status_code == 404:
            return f"Word '{word}' not found in the dictionary."
        if response.status_code != 200:
            return f"Error looking up word: HTTP Status {response.status_code}"

        data = response.json()
        if not isinstance(data, list) or not data:
            # A 200 response without entries is treated like a miss.
            return f"Word '{word}' not found in the dictionary."

        # Process the first entry
        entry = data[0]
        meanings = entry.get('meanings', [])
        parts = [f"Definitions for '{word}':\n\n"]

        # Include phonetics if available
        if entry.get('phonetic'):
            parts.append(f"Pronunciation: {entry['phonetic']}\n\n")

        # Limit to the first 3 parts of speech, 2 definitions each
        for meaning in meanings[:3]:
            part_of_speech = meaning.get('partOfSpeech', 'unknown')
            parts.append(f"• {part_of_speech.capitalize()}:\n")
            for i, definition in enumerate(meaning.get('definitions', [])[:2], 1):
                parts.append(f" {i}. {definition.get('definition', 'No definition available')}\n")
                if definition.get('example'):
                    parts.append(f" Example: \"{definition['example']}\"\n")
            parts.append("\n")

        # Collect synonyms across all meanings, dropping repeats while keeping
        # first-seen order, then show at most 5.
        synonyms = list(
            dict.fromkeys(
                synonym
                for meaning in meanings
                for synonym in meaning.get('synonyms', [])
            )
        )
        if synonyms:
            parts.append(f"Synonyms: {', '.join(synonyms[:5])}\n")

        return "".join(parts)
    except Exception as e:
        # Any failure (network, JSON decoding, unexpected payload shape) is
        # returned as text instead of being raised.
        return f"Error performing dictionary lookup: {str(e)}"
@tool
def wikipedia_search(query: str, sentences: int = 3) -> str:
    """A tool that searches Wikipedia and returns a summary of the topic.
    Args:
        query: The topic to search for on Wikipedia.
        sentences: Number of sentences to include in the summary (default: 3).
    """
    api_url = "https://en.wikipedia.org/w/api.php"
    try:
        # Sanitize input
        query = query.strip()
        sentences = max(1, min(10, int(sentences)))  # Ensure between 1-10 sentences

        # Passing the query through `params` lets requests URL-encode it, so
        # characters such as '&', '#' or '+' no longer corrupt the request.
        search_response = requests.get(
            api_url,
            params={
                "action": "query",
                "list": "search",
                "srsearch": query,
                "format": "json",
            },
            timeout=10,
        )
        if search_response.status_code != 200:
            return f"Error: Failed to search Wikipedia. Status code: {search_response.status_code}"

        search_results = search_response.json().get('query', {}).get('search', [])
        if not search_results:
            return f"No Wikipedia articles found for '{query}'."

        # Get the page ID of the first result
        page_id = search_results[0]['pageid']

        # Get the plain-text intro extract for that page ID
        summary_response = requests.get(
            api_url,
            params={
                "action": "query",
                "prop": "extracts",
                "exintro": "true",
                "explaintext": "true",
                "pageids": page_id,
                "format": "json",
            },
            timeout=10,
        )
        if summary_response.status_code != 200:
            return f"Error: Failed to get Wikipedia summary. Status code: {summary_response.status_code}"

        pages = summary_response.json().get('query', {}).get('pages', {})
        page_data = pages.get(str(page_id), {})
        title = page_data.get('title', 'Unknown')
        # `or` also covers an extract that is present but empty, which would
        # otherwise produce a summary consisting of a lone ".".
        extract = page_data.get('extract') or 'No summary available.'

        # Split into sentences (naive '. ' split) and keep the first few
        limited_extract = '. '.join(extract.split('. ')[:sentences])
        # Only add a full stop when the text does not already end a sentence.
        if not limited_extract.endswith(('.', '!', '?')):
            limited_extract += '.'

        return f"Wikipedia: {title}\n\n{limited_extract}\n\nSource: Wikipedia"
    except Exception as e:
        # Any failure (network, JSON decoding, bad `sentences` value) is
        # returned as text instead of being raised.
        return f"Error searching Wikipedia: {str(e)}"
@tool
def string_utilities(action: str, text: str, additional_param: str = "") -> str:
    """A tool that performs various operations on strings without requiring internet access.
    Args:
        action: The operation to perform ('count', 'reverse', 'uppercase', 'lowercase', 'find', 'replace').
        text: The input text to process.
        additional_param: Additional parameter needed for some operations (e.g., text to find or replace).
    """
    try:
        # Normalise once instead of re-lowercasing in every branch.
        operation = action.strip().lower()

        if operation == "count":
            char_count = len(text)
            word_count = len(text.split())
            # An empty string is still reported as one (empty) line.
            line_count = len(text.splitlines()) or 1
            return f"Character count: {char_count}\nWord count: {word_count}\nLine count: {line_count}"

        if operation == "reverse":
            return f"Reversed text: {text[::-1]}"

        if operation == "uppercase":
            return f"Uppercase text: {text.upper()}"

        if operation == "lowercase":
            return f"Lowercase text: {text.lower()}"

        if operation == "find":
            if not additional_param:
                return "Error: 'find' action requires text to search for in the additional_param."
            # Every start index where the needle matches, overlapping matches
            # included. The count is derived from this list so the reported
            # number always agrees with the listed positions (str.count skips
            # overlapping matches and could disagree).
            positions = [
                str(i) for i in range(len(text)) if text.startswith(additional_param, i)
            ]
            if not positions:
                return f"'{additional_param}' not found in the text."
            return f"Found '{additional_param}' {len(positions)} times at positions: {', '.join(positions)}"

        if operation == "replace":
            if not additional_param:
                return "Error: 'replace' action requires 'old_text:new_text' format in the additional_param."
            if ":" not in additional_param:
                return "Error: For 'replace' action, additional_param must be in 'old_text:new_text' format."
            # Split on the first ':' only, so the replacement may contain colons.
            old_text, new_text = additional_param.split(":", 1)
            result = text.replace(old_text, new_text)
            return f"Text after replacing '{old_text}' with '{new_text}':\n{result}"

        return f"Error: Unknown action '{action}'. Valid actions are 'count', 'reverse', 'uppercase', 'lowercase', 'find', and 'replace'."
    except Exception as e:
        # Unexpected failures (e.g. non-string arguments) are returned as text.
        return f"Error processing string: {str(e)}"
final_answer = FinalAnswerTool()

# If the agent does not answer, the model may be overloaded. In that case use
# another model, or the following Hugging Face Endpoint that also contains qwen2.5 coder:
# model_id='https://pflgm2locj2t89co.us-east-1.aws.endpoints.huggingface.cloud'
model = HfApiModel(
    max_tokens=2096,
    temperature=0.5,
    model_id='Qwen/Qwen2.5-Coder-32B-Instruct',  # it is possible that this model may be overloaded
    custom_role_conversions=None,
)

# Import tool from Hub
# NOTE(review): image_generation_tool is loaded here but is not included in the
# agent's tool list below - confirm whether that is intentional.
image_generation_tool = load_tool("agents-course/text-to-image", trust_remote_code=True)

# Explicit encoding so the prompt templates are read the same way regardless of
# the platform's default locale.
with open("prompts.yaml", 'r', encoding="utf-8") as stream:
    prompt_templates = yaml.safe_load(stream)

# NOTE(review): get_ip_info is defined in this file but not registered below -
# confirm whether it was left out deliberately.
agent = CodeAgent(
    model=model,
    tools=[
        get_current_time_in_timezone,
        convert_currency,
        dictionary_lookup,
        wikipedia_search,
        string_utilities,
        final_answer,
    ],
    max_steps=6,
    verbosity_level=1,
    grammar=None,
    planning_interval=None,
    name=None,
    description=None,
    prompt_templates=prompt_templates,
)

GradioUI(agent).launch()