import importlib
import math
import os
import re
import subprocess
import sys

import fitz  # PyMuPDF
import requests
from langchain_community.retrievers import BM25Retriever
from smolagents import Tool


class DetectVisualElementsTool(Tool):
    """Tool that lists the labels a pretrained Faster R-CNN detector finds in an image."""

    name = "detect_visual_elements"
    description = """Detects objects, people, and common visual elements in an image using a pretrained object detection model."""
    inputs = {
        "image_path": {
            "type": "string",
            "description": "The full path to the image file to analyze.",
        }
    }
    output_type = "string"

    def forward(self, image_path: str) -> str:
        """Run object detection on ``image_path`` and return the confident labels.

        Args:
            image_path: Path of the image file to analyze.

        Returns:
            A comma-separated string of labels for detections scoring above 0.8,
            or a human-readable message (prefixed with an emoji) when the file is
            missing, nothing confident is detected, or detection fails. The
            result is always a ``str`` to match ``output_type``.
        """
        # Heavy optional dependencies are imported lazily so the module can be
        # loaded without them.
        import torch
        import torchvision.models.detection as models
        import torchvision.transforms as T
        from PIL import Image

        # Class id -> label. Several labels are single words and some repeat
        # (e.g. 39 and 40 are both "baseball").
        # NOTE(review): these look like truncated COCO category names - confirm.
        label_map = {
            0: "unlabeled", 1: "person", 2: "bicycle", 3: "car", 4: "motorcycle",
            5: "airplane", 6: "bus", 7: "train", 8: "truck", 9: "boat",
            10: "traffic", 11: "fire", 12: "street", 13: "stop", 14: "parking",
            15: "bench", 16: "bird", 17: "cat", 18: "dog", 19: "horse",
            20: "sheep", 21: "cow", 22: "elephant", 23: "bear", 24: "zebra",
            25: "giraffe", 26: "hat", 27: "backpack", 28: "umbrella", 29: "shoe",
            30: "eye", 31: "handbag", 32: "tie", 33: "suitcase", 34: "frisbee",
            35: "skis", 36: "snowboard", 37: "sports", 38: "kite", 39: "baseball",
            40: "baseball", 41: "skateboard", 42: "surfboard", 43: "tennis", 44: "bottle",
            45: "plate", 46: "wine", 47: "cup", 48: "fork", 49: "knife",
            50: "spoon", 51: "bowl", 52: "banana", 53: "apple", 54: "sandwich",
            55: "orange", 56: "broccoli", 57: "carrot", 58: "hot", 59: "pizza",
            60: "donut", 61: "cake", 62: "chair", 63: "couch", 64: "potted",
            65: "bed", 66: "mirror", 67: "dining", 68: "window", 69: "desk",
            70: "toilet", 71: "door", 72: "tv", 73: "laptop", 74: "mouse",
            75: "remote", 76: "keyboard", 77: "cell", 78: "microwave", 79: "oven",
            80: "toaster", 81: "sink", 82: "refrigerator", 83: "blender", 84: "book",
            85: "clock", 86: "vase", 87: "scissors", 88: "teddy", 89: "hair",
            90: "toothbrush", 91: "hair", 92: "banner", 93: "blanket", 94: "branch",
            95: "bridge", 96: "building", 97: "bush", 98: "cabinet", 99: "cage",
            100: "cardboard", 101: "carpet", 102: "ceiling", 103: "ceiling", 104: "cloth",
            105: "clothes", 106: "clouds", 107: "counter", 108: "cupboard", 109: "curtain",
            110: "desk", 111: "dirt", 112: "door", 113: "fence", 114: "floor",
            115: "floor", 116: "floor", 117: "floor", 118: "floor", 119: "flower",
            120: "fog", 121: "food", 122: "fruit", 123: "furniture", 124: "grass",
            125: "gravel", 126: "ground", 127: "hill", 128: "house", 129: "leaves",
            130: "light", 131: "mat", 132: "metal", 133: "mirror", 134: "moss",
            135: "mountain", 136: "mud", 137: "napkin", 138: "net", 139: "paper",
            140: "pavement", 141: "pillow", 142: "plant", 143: "plastic", 144: "platform",
            145: "playingfield", 146: "railing", 147: "railroad", 148: "river", 149: "road",
            150: "rock", 151: "roof", 152: "rug", 153: "salad", 154: "sand",
            155: "sea", 156: "shelf", 157: "sky", 158: "skyscraper", 159: "snow",
            160: "solid", 161: "stairs", 162: "stone", 163: "straw", 164: "structural",
            165: "table", 166: "tent", 167: "textile", 168: "towel", 169: "tree",
            170: "vegetable", 171: "wall", 172: "wall", 173: "wall", 174: "wall",
            175: "wall", 176: "wall", 177: "wall", 178: "water", 179: "waterdrops",
            180: "window", 181: "window", 182: "wood",
        }

        if not os.path.exists(image_path):
            return f"❌ Image file does not exist: {image_path}"

        try:
            model = models.fasterrcnn_resnet50_fpn(pretrained=True)
            model.eval()

            # Close the file handle as soon as the pixels are converted.
            with Image.open(image_path) as source_image:
                image = source_image.convert("RGB")

            transform = T.Compose([T.ToTensor()])
            img_tensor = transform(image).unsqueeze(0)

            with torch.no_grad():
                predictions = model(img_tensor)[0]

            labels_list = []
            for label_id, score in zip(predictions["labels"], predictions["scores"]):
                if score > 0.8:
                    class_id = label_id.item()
                    # Fall back to a synthetic name so an unmapped id can never
                    # put None into the join below.
                    labels_list.append(label_map.get(class_id, f"class_{class_id}"))

            return ",".join(labels_list) or "⚠️ No confident visual elements detected."
        except Exception as e:
            return f"❌ Failed to detect visual elements: {e}"


class ChessPositionSolverTool(Tool):
    """Tool that reads a chessboard image and asks Stockfish for the best move."""

    name = "chess_position_solver"
    description = """Analyzes a chessboard image (from a URL or a local file path), detects the position using computer vision, and returns the best move in algebraic notation using the Stockfish engine (e.g., 'Qh5#')."""
    inputs = {
        "url": {
            "type": "string",
            "description": "Optional. URL pointing to an image of a chessboard position.",
            "nullable": True,
        },
        "file_path": {
            "type": "string",
            "description": "Optional. Local file path to an image of a chessboard position.",
            "nullable": True,
        },
    }
    output_type = "string"

    def forward(self, url: str = None, file_path: str = None) -> str:
        """Return the engine's best move for the position shown in an image.

        Exactly one of ``url`` / ``file_path`` must be given.

        Args:
            url: Location of the chessboard image to download.
            file_path: Local path of the chessboard image.

        Returns:
            The move in SAN as produced by ``board.san``, or an error message
            prefixed with "❌". Exceptions are not propagated; they are turned
            into an error message.
        """
        if not url and not file_path:
            return "❌ Please provide either a URL or a local file path to the chessboard image."
        if url and file_path:
            return "❌ Provide only one of: 'url' or 'file_path', not both."

        # NOTE(review): `cv2`, `np`, `Chesscog` and `chess` are used below but are
        # not imported anywhere in the visible part of this file; until they are
        # brought into scope the resulting NameError is reported through the
        # generic failure message at the bottom.
        try:
            # Step 1 - Load image
            if url:
                response = requests.get(url, timeout=30)
                # Surface HTTP errors instead of trying to decode an error page.
                response.raise_for_status()
                img = cv2.imdecode(
                    np.frombuffer(response.content, np.uint8), cv2.IMREAD_COLOR
                )
            else:
                if not os.path.exists(file_path):
                    return f"❌ File not found: {file_path}"
                img = cv2.imread(file_path)

            if img is None:
                return "❌ Could not decode the image. Ensure the file is a valid chessboard image."

            # Step 2 - Infer FEN with chesscog
            detector = Chesscog(device="cpu")
            fen = detector.get_fen(img)
            if fen is None:
                return "❌ Could not detect chessboard or recognize position."

            board = chess.Board(fen)

            # The engine binary can be overridden through the environment.
            stockfish_path = os.getenv(
                "STOCKFISH_PATH",
                "/home/boom/Desktop/repos/boombot/engines/stockfish-ubuntu-x86-64-bmi2",
            )

            # Step 3 - Analyze with Stockfish
            engine = chess.engine.SimpleEngine.popen_uci(stockfish_path)
            try:
                result = engine.play(board, chess.engine.Limit(depth=18))  # fixed depth
            finally:
                # Always shut the engine subprocess down, even if play() raised.
                engine.quit()

            return board.san(result.move)
        except Exception as e:
            return f"❌ chess_position_solver failed: {str(e)}"


def patch_pyproject(path):
    """Rewrite the ``python = ...`` line(s) of ``<path>/pyproject.toml``.

    Every line matching ``\\s*python\\s*=`` is replaced with
    ``python = ">=3.8,<3.12"``; all other lines are written back unchanged.

    Args:
        path: Directory expected to contain ``pyproject.toml``.

    Raises:
        FileNotFoundError: If ``pyproject.toml`` does not exist in ``path``.
    """
    pyproject_path = os.path.join(path, "pyproject.toml")
    if not os.path.exists(pyproject_path):
        raise FileNotFoundError(f"No pyproject.toml found in {path}")

    with open(pyproject_path, "r", encoding="utf-8") as f:
        lines = f.readlines()

    with open(pyproject_path, "w", encoding="utf-8") as f:
        for line in lines:
            if re.match(r"\s*python\s*=", line):
                f.write('python = ">=3.8,<3.12"\n')
            else:
                f.write(line)


def install_chesscog():
    """Install ``chesscog`` from its Git repository when it cannot be imported.

    If ``chesscog`` imports successfully nothing is installed. Otherwise the
    repository is cloned into ``./chesscog`` (unless that directory already
    exists), its ``pyproject.toml`` is patched via ``patch_pyproject`` and the
    checkout is installed with pip using the current interpreter.

    Raises:
        subprocess.CalledProcessError: If ``git clone`` or ``pip install`` fails.
        FileNotFoundError: If the checkout has no ``pyproject.toml``.
    """
    TARGET_DIR = "chesscog"
    REPO_URL = "https://github.com/georg-wolflein/chesscog.git"

    try:
        # The import is the actual availability probe; without it the
        # ImportError branch below could never run.
        importlib.import_module("chesscog")
        print("✅ chesscog already installed.")
    except ImportError:
        print("⬇️ Installing chesscog...")
        if not os.path.exists(TARGET_DIR):
            subprocess.run(["git", "clone", REPO_URL, TARGET_DIR], check=True)
        patch_pyproject(TARGET_DIR)
        subprocess.run(
            [sys.executable, "-m", "pip", "install", f"./{TARGET_DIR}"], check=True
        )
        print("✅ chesscog installed successfully.")


class RetrieverTool(Tool):
    """Tool that looks up the closest known question with a BM25 retriever."""

    name = "retriever"
    description = "Retrieves the most similar known question to the query."
    inputs = {
        "query": {
            "type": "string",
            "description": "The query from the user (a question).",
        }
    }
    output_type = "string"

    def __init__(self, docs, **kwargs):
        """Build a BM25 retriever over ``docs`` that returns a single best match.

        Args:
            docs: Documents passed to ``BM25Retriever.from_documents``.
            **kwargs: Forwarded to ``Tool.__init__``.
        """
        super().__init__(**kwargs)
        self.retriever = BM25Retriever.from_documents(docs, k=1)

    def forward(self, query: str) -> str:
        """Return the best-matching document's content and its stored answer.

        Args:
            query: The user's question.

        Returns:
            The document text followed by an "EXAMPLE FINAL ANSWER" section read
            from ``doc.metadata['answer']``, or "No similar question found." when
            the retriever returns nothing.
        """
        docs = self.retriever.invoke(query)
        if docs:
            doc = docs[0]
            return f"{doc.page_content}\n\nEXAMPLE FINAL ANSWER:\n{doc.metadata['answer']}\n"
        else:
            return "No similar question found."
class CalculatorTool(Tool):
    """Tool that evaluates a math expression against the ``math`` module's names."""

    name = "calculator"
    description = """Performs basic mathematical calculations (e.g., addition, subtraction, multiplication, division, exponentiation, square root). Use this tool whenever math is required, especially for numeric reasoning."""
    inputs = {
        "expression": {
            "type": "string",
            "description": "A basic math expression, e.g., '5 + 3 * 2', 'sqrt(49)', '2 ** 3'. No variables or natural language.",
        }
    }
    output_type = "string"

    def forward(self, expression: str) -> str:
        """Evaluate ``expression`` and return the result as text.

        Args:
            expression: A Python arithmetic expression. The public names of the
                ``math`` module plus ``abs`` and ``round`` are available.

        Returns:
            ``str(result)``, or an "Error: Invalid math expression. (...)"
            message when evaluation fails or the expression is rejected.
        """
        # SECURITY: eval() on agent-supplied text. Emptying __builtins__ is not
        # a real sandbox; attribute chains through dunder names can still reach
        # arbitrary objects. Reject those outright - no math expression needs
        # them - and treat this tool as unsafe for untrusted input regardless.
        if "__" in expression:
            return "Error: Invalid math expression. (double underscores are not allowed)"

        try:
            allowed_names = {
                k: v for k, v in math.__dict__.items() if not k.startswith("__")
            }
            allowed_names.update({"abs": abs, "round": round})
            result = eval(expression, {"__builtins__": {}}, allowed_names)
            return str(result)
        except Exception as e:
            return f"Error: Invalid math expression. ({e})"


class AnalyzeChessImageTool(Tool):
    """Tool that parses a chessboard image and asks Stockfish for black's move."""

    name = "analyze_chess_image"
    description = """Extracts the board state from a chessboard image and returns the best move for black (in algebraic notation)."""
    inputs = {
        "file_path": {
            "type": "string",
            "description": "Path to the image file of the chess board.",
        }
    }
    output_type = "string"

    def forward(self, file_path: str) -> str:
        """Return the engine's move for black in the pictured position.

        Args:
            file_path: Path of the chessboard image.

        Returns:
            The move in UCI form (``result.move.uci()``), or an error message
            prefixed with "❌" when the board is invalid, it is not black's
            turn, or any step raises.
        """
        try:
            import chess
            import chess.engine
            import chessvision  # hypothetical or use OpenCV + custom board parser

            board = chessvision.image_to_board(file_path)

            if not board or board.turn != chess.BLACK:
                return "❌ Invalid board or not black's turn."

            # Honour the same STOCKFISH_PATH override as the other chess tool;
            # the previous hard-coded location remains the default.
            stockfish_path = os.getenv("STOCKFISH_PATH", "/usr/bin/stockfish")
            engine = chess.engine.SimpleEngine.popen_uci(stockfish_path)
            try:
                result = engine.play(board, chess.engine.Limit(time=0.1))
            finally:
                # Always stop the engine subprocess, even if play() raised.
                engine.quit()

            return result.move.uci()
        except Exception as e:
            return f"❌ Chess analysis failed: {e}"


class ExecutePythonCodeTool(Tool):
    """Tool that exec()s a snippet with a reduced set of builtins."""

    name = "execute_python_code"
    description = """Executes a provided Python code snippet in a controlled, sandboxed environment.
        This tool is used to safely run Python code and return the output or result of the execution."""
    inputs = {
        "code": {
            "type": "string",
            "description": "A valid Python code block that needs to be executed. It should be a string containing executable Python code.",
        }
    }
    output_type = "string"

    def forward(self, code: str) -> str:
        """Execute ``code`` and return the value it binds to ``result``.

        Args:
            code: Python source. It must assign its output to a top-level
                variable named ``result``.

        Returns:
            ``str(result)``; "❌ The code did not produce a result." when no
            ``result`` variable exists afterwards; or an error message when
            execution raises.
        """
        # SECURITY: restricting __builtins__ limits convenience functions only;
        # it is NOT a real sandbox (object introspection can escape it). Do not
        # run untrusted code here without process-level isolation.
        allowed_builtins = {
            "abs": abs,
            "all": all,
            "any": any,
            "bin": bin,
            "bool": bool,
            "chr": chr,
            "complex": complex,
            "dict": dict,
            "divmod": divmod,
            "enumerate": enumerate,
            "float": float,
            "hash": hash,
            "hex": hex,
            "int": int,
            "isinstance": isinstance,
            "len": len,
            "list": list,
            "max": max,
            "min": min,
            "oct": oct,
            "pow": pow,
            "range": range,
            "round": round,
            "set": set,
            "sorted": sorted,
            "str": str,
            "sum": sum,
            "tuple": tuple,
            "zip": zip,
        }

        try:
            # Use ONE namespace for globals and locals. With separate dicts the
            # snippet runs like a class body, so functions and comprehensions
            # defined in it cannot see its own top-level variables.
            namespace = {"__builtins__": allowed_builtins}
            exec(code, namespace)

            # If the code produces a result, we return that as output
            if "result" in namespace:
                return str(namespace["result"])
            else:
                return "❌ The code did not produce a result."
        except Exception as e:
            return f"❌ Error executing code: {str(e)}"


class ArxivSearchTool(Tool):
    """Tool that queries arXiv and formats the hits as plain text."""

    name = "arxiv_search"
    description = """Searches arXiv for academic papers and returns structured information including titles, authors, publication dates, and abstracts. Ideal for finding scientific research on specific topics."""
    inputs = {
        "query": {
            "type": "string",
            "description": "A research-related query string (e.g., 'Superstring Cosmology')",
        }
    }
    output_type = "string"

    def forward(self, query: str) -> str:
        """Search arXiv for ``query`` and return a formatted result listing.

        Args:
            query: Free-text search string.

        Returns:
            One text section per result (up to 10) with title, authors,
            publication date, summary, entry id, DOI, links and a 30-character
            content preview; "No results found for your query." when the search
            is empty; or a "❌ Arxiv search failed" message when loading raises.
        """
        max_results = 10

        try:
            # Imported here because the name is not imported at module level;
            # a missing dependency is then reported like any other failure.
            from langchain_community.document_loaders import ArxivLoader

            search_docs = ArxivLoader(
                query=query, load_max_docs=max_results, load_all_available_meta=True
            ).load()
        except Exception as e:
            return f"❌ Arxiv search failed: {e}"

        if not search_docs:
            return "No results found for your query."

        output_lines = []
        for idx, doc in enumerate(search_docs):
            meta = getattr(doc, "metadata", {}) or {}
            # Tolerate a None page_content as well as a missing attribute.
            content = (getattr(doc, "page_content", "") or "").strip()
            # Tolerate a missing/None links entry and non-string link objects.
            links = ", ".join(str(link) for link in (meta.get("links") or []))

            output_lines.append(f"🔍 RESULT {idx + 1}")
            output_lines.append(f"Title : {meta.get('Title', '[No Title]')}")
            output_lines.append(f"Authors : {meta.get('Authors', '[No Authors]')}")
            output_lines.append(f"Published : {meta.get('Published', '[No Date]')}")
            output_lines.append(f"Summary : {meta.get('Summary', '[No Summary]')}")
            output_lines.append(f"Entry ID : {meta.get('entry_id', '[N/A]')}")
            # Other metadata keys (published_first_time, comment, journal_ref,
            # primary_category, categories) are deliberately not emitted.
            output_lines.append(f"DOI : {meta.get('doi', '[N/A]')}")
            output_lines.append(f"Links : {links or '[N/A]'}")

            if content:
                preview = content[:30] + ("..." if len(content) > 30 else "")
                output_lines.append(f"Content : {preview}")

            output_lines.append("")  # spacing between results

        return "\n".join(output_lines).strip()