# Final_Assignment_Template / tools copy.py
# mdicio's picture
# restructuring
# ef7a70c
import os
import re
import tempfile
import traceback
import fitz # PyMuPDF
import pandas as pd
import requests
from smolagents import Tool
class DownloadFileFromTaskTool(Tool):
    """Tool that downloads the attachment of a GAIA task into a fresh temp directory."""

    name = "download_file_from_task"
    description = """Downloads a file for a GAIA task ID and saves it in a temporary directory.
Use this when question requires information from a mentioned file, before reading a file."""
    inputs = {
        "task_id": {"type": "string", "description": "The GAIA task ID (REQUIRED)."},
        "filename": {
            "type": "string",
            "description": "Optional custom filename to save the file as (e.g., 'data.xlsx').",
            "nullable": True,
        },
    }
    output_type = "string"

    def forward(self, task_id: str, filename: str = None) -> str:
        """Download the file attached to ``task_id`` and return its local path.

        Args:
            task_id: 36-character identifier made of lowercase hex digits and dashes.
            filename: Optional name for the saved file. Only its final path
                component is used, so the file always lands inside the newly
                created temporary directory.

        Returns:
            The path of the saved file on success, otherwise a human-readable
            message starting with "❌" or "⚠️".
        """
        # fullmatch (unlike match + "$") also rejects a trailing newline, which
        # would otherwise end up inside the request URL.
        if not task_id or not re.fullmatch(r"[0-9a-f\-]{36}", task_id):
            return "❌ Invalid or missing task_id."

        file_url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
        try:
            response = requests.get(file_url, timeout=15)
            if response.status_code == 404:
                return "⚠️ No file found for this task."
            response.raise_for_status()

            # Try extracting the extension from the Content-Disposition header.
            # Both the quoted (filename="a.pdf") and bare (filename=a.pdf) forms
            # are accepted.
            disposition = response.headers.get("content-disposition", "")
            header_match = re.search(
                r'filename=(?:"([^"]+)"|([^;\s]+))', disposition
            )
            ext = ""
            if header_match:
                header_name = header_match.group(1) or header_match.group(2)
                ext = os.path.splitext(header_name)[1]

            # Keep only the last path component of a caller-supplied name so
            # values such as "../x" or "/etc/x" cannot escape the temp dir.
            safe_name = os.path.basename((filename or "").replace("\\", "/"))
            if safe_name in ("", ".", ".."):
                safe_name = f"{task_id}{ext or '.bin'}"

            temp_dir = tempfile.mkdtemp()
            file_path = os.path.join(temp_dir, safe_name)
            with open(file_path, "wb") as f:
                f.write(response.content)

            print(f"File saved at: {file_path}")
            return file_path
        except Exception as e:
            # Tool boundary: failures are reported to the agent as text.
            return f"❌ Error: {e}"
class ReadFileContentTool(Tool):
    """Tool that returns a textual view of a local file, chosen by file extension."""

    name = "read_file_content"
    description = """Reads and returns the content of a file. Use after downloading a file using `download_file_from_task`."""
    inputs = {
        "file_path": {"type": "string", "description": "Full path to a file to read."}
    }
    output_type = "string"

    def forward(self, file_path: str) -> str:
        """Return the content (or a short notice) for ``file_path``.

        Behaviour by extension (case-insensitive):
            * .txt / .json / .py: full text, decoded as UTF-8.
            * .csv / .xlsx: the first rows (``DataFrame.head()``) rendered
              without the index.
            * .pdf: concatenated text of all pages.
            * audio / video: an informational message pointing to other tools.
            * anything else: an "unsupported" message.

        Returns:
            The text described above, or a message starting with "❌" when the
            file does not exist or cannot be read.
        """
        if not os.path.exists(file_path):
            return f"❌ File does not exist: {file_path}"

        ext = os.path.splitext(file_path)[1].lower()
        try:
            if ext in (".txt", ".json", ".py"):
                with open(file_path, "r", encoding="utf-8") as f:
                    return f.read()
            if ext == ".csv":
                return pd.read_csv(file_path).head().to_string(index=False)
            if ext == ".xlsx":
                return pd.read_excel(file_path).head().to_string(index=False)
            if ext == ".pdf":
                # The context manager closes the document even when text
                # extraction raises on one of the pages.
                with fitz.open(file_path) as doc:
                    text = "".join(page.get_text() for page in doc)
                return text.strip() or "⚠️ PDF contains no readable text."
            if ext in (".mp3", ".wav"):
                return f"ℹ️ Audio file detected: {os.path.basename(file_path)}. Use audio processing tool if needed."
            if ext in (".mp4", ".mov", ".avi"):
                return f"ℹ️ Video file detected: {os.path.basename(file_path)}. Use video analysis tool if available."
            return f"ℹ️ Unsupported file type: {ext}. File saved at {file_path}"
        except Exception as e:
            # Tool boundary: failures are reported to the agent as text.
            return f"❌ Could not read {file_path}: {e}"
class GetWikipediaInfoTool(Tool):
    """Tool that returns the intro extract of the best-matching English Wikipedia page."""

    name = "get_wikipedia_info"
    description = """Fetches a short summary about a topic from Wikipedia.
Use this when a user asks for background information, an explanation, or context on a well-known subject."""
    inputs = {
        "topic": {
            "type": "string",
            "description": "The topic to search for on Wikipedia.",
        }
    }
    output_type = "string"

    def forward(self, topic: str) -> str:
        """Search Wikipedia for ``topic`` and return the top hit's intro text.

        Args:
            topic: Free-text search query.

        Returns:
            "Wikipedia summary for '<topic>':" followed by the plain-text intro
            (cut to 1500 characters plus "..."), a "No Wikipedia info" message
            when nothing usable is found, or "Error wiki: ..." on failure.
        """
        print(f"EXECUTING TOOL: get_wikipedia_info(topic='{topic}')")
        api_url = "https://en.wikipedia.org/w/api.php"
        try:
            # Passing the query via ``params`` lets requests URL-encode it, so
            # topics containing "&", "#", "+" or spaces are sent intact.
            search_response = requests.get(
                api_url,
                params={
                    "action": "query",
                    "list": "search",
                    "srsearch": topic,
                    "format": "json",
                },
                timeout=10,
            )
            search_response.raise_for_status()
            hits = search_response.json().get("query", {}).get("search", [])
            if not hits:
                return f"No Wikipedia info for '{topic}'."

            page_id = hits[0]["pageid"]
            content_response = requests.get(
                api_url,
                params={
                    "action": "query",
                    "prop": "extracts",
                    "exintro": 1,
                    "explaintext": 1,
                    "pageids": page_id,
                    "format": "json",
                },
                timeout=10,
            )
            content_response.raise_for_status()
            pages = content_response.json().get("query", {}).get("pages", {})
            # A page without an "extract" entry is reported as "no info" rather
            # than surfacing a bare KeyError message.
            extract = pages.get(str(page_id), {}).get("extract", "")
            if not extract:
                return f"No Wikipedia info for '{topic}'."

            if len(extract) > 1500:
                extract = extract[:1500] + "..."
            result = f"Wikipedia summary for '{topic}':\n{extract}"
            print(f"-> Tool Result (Wikipedia): {result[:100]}...")
            return result
        except Exception as e:
            print(f"❌ Error in get_wikipedia_info: {e}")
            traceback.print_exc()
            return f"Error wiki: {e}"
class VisitWebpageTool(Tool):
    """Tool that fetches a web page and returns its main elements as a JSON string."""

    name = "visit_webpage"
    description = """
Visits a given URL and returns structured page content including title, metadata, headings, paragraphs,
tables, lists, and links.
"""
    inputs = {
        "url": {
            "type": "string",
            "description": "The full URL of the webpage to visit.",
        }
    }
    output_type = "string"

    def forward(self, url: str) -> str:
        """Fetch ``url`` and return a JSON document describing the page.

        The JSON object has the keys "title", "meta", "headings" (h1-h3),
        "paragraphs", "lists", "tables" and "links".

        Returns:
            The indented JSON string, or a message starting with "❌" when the
            page cannot be fetched or parsed.
        """
        try:
            import json

            from bs4 import BeautifulSoup

            response = requests.get(url, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")

            def clean(text):
                # Collapse every run of whitespace into a single space.
                return " ".join(text.strip().split())

            def extract_tables(soup):
                # Only tables with at least one <th> and one row of <td> cells
                # are reported.
                tables_data = []
                for table in soup.find_all("table"):
                    headers = [clean(th.get_text()) for th in table.find_all("th")]
                    rows = []
                    for row in table.find_all("tr"):
                        cells = [clean(td.get_text()) for td in row.find_all("td")]
                        if cells:
                            rows.append(cells)
                    if headers and rows:
                        tables_data.append({"headers": headers, "rows": rows})
                return tables_data

            def extract_lists(soup):
                # All <ul> lists first, then all <ol> lists; empty lists are skipped.
                all_lists = []
                for tag in ("ul", "ol"):
                    for list_node in soup.find_all(tag):
                        items = [clean(li.get_text()) for li in list_node.find_all("li")]
                        if items:
                            all_lists.append(items)
                return all_lists

            def extract_meta(soup):
                # Keyed by lower-cased "name" (or "property" when name is absent).
                metas = {}
                for meta in soup.find_all("meta"):
                    name = meta.get("name") or meta.get("property")
                    content = meta.get("content")
                    if name and content:
                        metas[name.lower()] = clean(content)
                return metas

            # ``soup.title.string`` is None for an empty <title> or one with
            # nested tags, which used to make the whole call fail; get_text()
            # always returns a string.
            title = clean(soup.title.get_text()) if soup.title else None

            result = {
                "title": title,
                "meta": extract_meta(soup),
                "headings": {
                    level: [clean(h.get_text()) for h in soup.find_all(level)]
                    for level in ("h1", "h2", "h3")
                },
                "paragraphs": [clean(p.get_text()) for p in soup.find_all("p")],
                "lists": extract_lists(soup),
                "tables": extract_tables(soup),
                "links": [
                    {"text": clean(a.get_text()), "href": a["href"]}
                    for a in soup.find_all("a", href=True)
                ],
            }
            return json.dumps(result, indent=2)
        except Exception as e:
            # Tool boundary: failures are reported to the agent as text.
            return f"❌ Failed to fetch or parse webpage: {str(e)}"
class TranscribeAudioTool(Tool):
    """Tool that transcribes an audio file with Google's speech recognition."""

    name = "transcribe_audio"
    description = (
        """Transcribes spoken audio (e.g. voice memos, lectures) into plain text."""
    )
    inputs = {"file_path": {"type": "string", "description": "Path to an audio file."}}
    output_type = "string"

    def forward(self, file_path: str) -> str:
        """Return the transcript of the audio at ``file_path``.

        Non-WAV input is first converted to a temporary WAV file with pydub;
        that file is removed again whether or not transcription succeeds.

        Returns:
            The stripped transcript, or a message starting with
            "❌ Transcription failed:" on any error.
        """
        # Import separately so the ``except sr....`` clause below can never be
        # evaluated while ``sr`` is still unbound.
        try:
            import speech_recognition as sr
            from pydub import AudioSegment
        except Exception as e:
            return f"❌ Transcription failed: {str(e)}"

        temp_wav = None
        try:
            recognizer = sr.Recognizer()

            # Convert to WAV if not already (needed for speech_recognition).
            file_ext = os.path.splitext(file_path)[1].lower()
            if file_ext != ".wav":
                # Close the handle right away; only the reserved path is needed.
                with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                    temp_wav = tmp.name
                AudioSegment.from_file(file_path).export(temp_wav, format="wav")
                audio_path = temp_wav
            else:
                audio_path = file_path

            # Transcribe audio using Google's speech recognition.
            with sr.AudioFile(audio_path) as source:
                audio_data = recognizer.record(source)
            transcript = recognizer.recognize_google(audio_data)
            return transcript.strip()
        except sr.UnknownValueError:
            # This exception carries no message, so str(e) would be empty.
            return "❌ Transcription failed: speech could not be understood."
        except Exception as e:
            return f"❌ Transcription failed: {str(e)}"
        finally:
            # Clean up the temp file on success and on failure alike.
            if temp_wav is not None and os.path.exists(temp_wav):
                try:
                    os.remove(temp_wav)
                except OSError:
                    # Best effort: a leftover temp file must not mask the result.
                    pass
class TranscibeVideoFileTool(Tool):
    """Tool that transcribes the speech of a video file.

    NOTE: the class name is misspelled ("Transcibe"); it is kept unchanged so
    existing imports keep working.
    """

    name = "transcribe_video"
    description = """Transcribes speech from a video file. Use this to understand video lectures, tutorials, or visual demos."""
    inputs = {
        "file_path": {
            "type": "string",
            "description": "Path to the video file (e.g., .mp4, .mov).",
        }
    }
    output_type = "string"

    def forward(self, file_path: str) -> str:
        """Return the transcript of the audio track of ``file_path``.

        The audio track is written to a temporary WAV file, transcribed with
        Google's speech recognition, and the temporary file and the video clip
        are released whether or not transcription succeeds.

        Returns:
            The stripped transcript, or a message starting with
            "❌ Video processing failed:" on any error.
        """
        # Import separately so the ``except sr....`` clause below can never be
        # evaluated while ``sr`` is still unbound.
        try:
            import moviepy.editor as mp
            import speech_recognition as sr
        except Exception as e:
            return f"❌ Video processing failed: {str(e)}"

        video = None
        temp_audio = None
        try:
            video = mp.VideoFileClip(file_path)
            if video.audio is None:
                # Without this check the failure would be an opaque
                # "'NoneType' object has no attribute ..." message.
                return "❌ Video processing failed: video has no audio track."

            # Close the handle right away; only the reserved path is needed.
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                temp_audio = tmp.name

            # Extract audio to WAV format (required for speech_recognition).
            video.audio.write_audiofile(temp_audio, verbose=False, logger=None)

            recognizer = sr.Recognizer()
            with sr.AudioFile(temp_audio) as source:
                audio_data = recognizer.record(source)
            transcript = recognizer.recognize_google(audio_data)
            return transcript.strip()
        except sr.UnknownValueError:
            # This exception carries no message, so str(e) would be empty.
            return "❌ Video processing failed: speech could not be understood."
        except Exception as e:
            return f"❌ Video processing failed: {str(e)}"
        finally:
            # Best-effort cleanup: neither step may mask the result above.
            if video is not None:
                try:
                    video.close()
                except Exception:
                    pass
            if temp_audio is not None and os.path.exists(temp_audio):
                try:
                    os.remove(temp_audio)
                except OSError:
                    pass