# Spaces: Running
import os
import io
import sys
import uuid
import base64
import traceback
import contextlib
import tempfile
import subprocess
import sqlite3
from typing import Dict, List, Any, Optional, Union

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
class CodeInterpreter:
    """Run code snippets in Python, Bash, SQL (SQLite), C or Java.

    Every execution method returns a dictionary with the same keys:

    - ``execution_id``: UUID string identifying the run.
    - ``status``: ``"success"`` or ``"error"``.
    - ``stdout`` / ``stderr``: captured text output.
    - ``result``: always ``None`` (no runner produces a value).
    - ``plots``: base64-encoded PNGs of open matplotlib figures (Python only).
    - ``dataframes``: summaries of pandas DataFrames (Python and SQL only).

    NOTE(review): nothing in this class sandboxes the executed code.
    ``allowed_modules`` is stored but never consulted, and
    ``max_execution_time`` is only applied to the subprocess-based runners
    (Bash, C, Java), not to in-process Python or SQL execution.
    """

    def __init__(self, allowed_modules=None, max_execution_time=30, working_directory=None):
        """Set up the interpreter state.

        Args:
            allowed_modules: List of module names; a falsy value selects the
                built-in default list. Stored on the instance only.
            max_execution_time: Timeout in seconds passed to ``subprocess.run``
                by the Bash, C and Java runners.
            working_directory: Directory under which per-execution folders for
                Python plots are created. Defaults to the current working
                directory. It is created if it does not exist.
        """
        self.allowed_modules = allowed_modules or [
            "numpy", "pandas", "matplotlib", "scipy", "sklearn",
            "math", "random", "statistics", "datetime", "collections",
            "itertools", "functools", "operator", "re", "json",
            "sympy", "networkx", "nltk", "PIL", "pytesseract",
            "cmath", "uuid", "tempfile", "requests", "urllib"
        ]
        self.max_execution_time = max_execution_time
        self.working_directory = working_directory or os.getcwd()
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(self.working_directory, exist_ok=True)
        # Namespace shared by every Python execution on this instance, so
        # variables defined by one snippet remain visible to later ones.
        self.globals = {
            "__builtins__": __builtins__,
            "np": np,
            "pd": pd,
            "plt": plt,
            "Image": Image,
        }
        self.temp_sqlite_db = os.path.join(tempfile.gettempdir(), "code_exec.db")

    @staticmethod
    def _new_result(execution_id, status="error", stdout="", stderr=""):
        """Build the result dictionary shared by all runners."""
        return {
            "execution_id": execution_id,
            "status": status,
            "stdout": stdout,
            "stderr": stderr,
            "result": None,
            "plots": [],
            "dataframes": [],
        }

    def execute_code(self, code: str, language: str = "python") -> Dict[str, Any]:
        """Execute the provided code in the selected programming language.

        Args:
            code: Source text to run.
            language: One of ``python``, ``bash``, ``sql``, ``c`` or ``java``
                (case-insensitive).

        Returns:
            The result dictionary described on the class. An unsupported
            language, or any exception escaping a runner, yields
            ``status == "error"`` with the message in ``stderr``.
        """
        language = language.lower()
        execution_id = str(uuid.uuid4())
        runners = {
            "python": self._execute_python,
            "bash": self._execute_bash,
            "sql": self._execute_sql,
            "c": self._execute_c,
            "java": self._execute_java,
        }
        result = self._new_result(execution_id)
        runner = runners.get(language)
        if runner is None:
            result["stderr"] = f"Unsupported language: {language}"
            return result
        try:
            return runner(code, execution_id)
        except Exception as e:
            result["stderr"] = str(e)
        return result

    def _execute_python(self, code: str, execution_id: str) -> dict:
        """Run Python source in-process and collect output, plots and DataFrames.

        The code runs via ``exec`` in ``self.globals``. Open matplotlib figures
        are saved as PNG files under ``<working_directory>/<execution_id>/`` and
        returned base64-encoded. Every non-empty DataFrame found in
        ``self.globals`` is summarised.
        """
        output_buffer = io.StringIO()
        error_buffer = io.StringIO()
        result = self._new_result(execution_id)
        try:
            exec_dir = os.path.join(self.working_directory, execution_id)
            os.makedirs(exec_dir, exist_ok=True)
            plt.switch_backend('Agg')
            with contextlib.redirect_stdout(output_buffer), contextlib.redirect_stderr(error_buffer):
                # SECURITY: exec() runs the snippet with full builtins inside
                # this process; no sandbox or timeout is applied here.
                exec(code, self.globals)
            for i, fig_num in enumerate(plt.get_fignums()):
                fig = plt.figure(fig_num)
                img_path = os.path.join(exec_dir, f"plot_{i}.png")
                fig.savefig(img_path)
                with open(img_path, "rb") as img_file:
                    img_data = base64.b64encode(img_file.read()).decode('utf-8')
                result["plots"].append({
                    "figure_number": fig_num,
                    "data": img_data
                })
            for var_name, var_value in self.globals.items():
                if isinstance(var_value, pd.DataFrame) and len(var_value) > 0:
                    result["dataframes"].append({
                        "name": var_name,
                        "head": var_value.head().to_dict(),
                        "shape": var_value.shape,
                        "dtypes": str(var_value.dtypes)
                    })
            result["status"] = "success"
            # Anything the snippet wrote to stderr (e.g. warnings) is reported
            # even when it finished without raising.
            result["stderr"] = error_buffer.getvalue()
        except Exception:
            result["status"] = "error"
            result["stderr"] = f"{error_buffer.getvalue()}\n{traceback.format_exc()}"
        finally:
            # Output printed before a failure is still useful to the caller.
            result["stdout"] = output_buffer.getvalue()
            # Release the figures so they do not accumulate in pyplot's
            # global registry across executions.
            plt.close("all")
        # exec() never yields a value, so result["result"] stays None.
        return result

    def _execute_bash(self, code: str, execution_id: str) -> dict:
        """Run ``code`` through the system shell with a timeout.

        SECURITY: the text is passed to the shell as-is (``shell=True``).
        """
        try:
            completed = subprocess.run(
                code, shell=True, capture_output=True, text=True, timeout=self.max_execution_time
            )
        except subprocess.TimeoutExpired:
            return self._new_result(execution_id, stderr="Execution timed out.")
        return self._new_result(
            execution_id,
            status="success" if completed.returncode == 0 else "error",
            stdout=completed.stdout,
            stderr=completed.stderr,
        )

    def _execute_sql(self, code: str, execution_id: str) -> dict:
        """Run a single SQL statement against the SQLite file ``self.temp_sqlite_db``.

        Statements that produce a result set are summarised as a DataFrame
        entry named ``query_result``. Changes are committed before the
        connection is closed.
        """
        result = self._new_result(execution_id)
        conn = None
        try:
            conn = sqlite3.connect(self.temp_sqlite_db)
            cur = conn.cursor()
            cur.execute(code)
            # description is None for statements without a result set; checking
            # it (rather than the text prefix) also covers WITH/PRAGMA queries.
            if cur.description is not None:
                columns = [description[0] for description in cur.description]
                rows = cur.fetchall()
                df = pd.DataFrame(rows, columns=columns)
                result["dataframes"].append({
                    "name": "query_result",
                    "head": df.head().to_dict(),
                    "shape": df.shape,
                    "dtypes": str(df.dtypes)
                })
            conn.commit()
            result["status"] = "success"
            result["stdout"] = "Query executed successfully."
        except Exception as e:
            result["stderr"] = str(e)
        finally:
            # conn is still None when connect() itself failed.
            if conn is not None:
                conn.close()
        return result

    def _execute_c(self, code: str, execution_id: str) -> dict:
        """Compile ``code`` with ``gcc`` and run the resulting binary.

        Compilation and execution each get ``max_execution_time`` seconds.
        A non-zero compiler exit status returns the compiler output with
        ``status == "error"``.
        """
        try:
            # The temporary directory is removed when the block exits, whether
            # the run succeeded, failed or raised.
            with tempfile.TemporaryDirectory() as temp_dir:
                source_path = os.path.join(temp_dir, "program.c")
                binary_path = os.path.join(temp_dir, "program")
                with open(source_path, "w") as f:
                    f.write(code)
                compile_proc = subprocess.run(
                    ["gcc", source_path, "-o", binary_path],
                    capture_output=True, text=True, timeout=self.max_execution_time
                )
                if compile_proc.returncode != 0:
                    return self._new_result(
                        execution_id,
                        stdout=compile_proc.stdout,
                        stderr=compile_proc.stderr,
                    )
                run_proc = subprocess.run(
                    [binary_path],
                    capture_output=True, text=True, timeout=self.max_execution_time
                )
                return self._new_result(
                    execution_id,
                    status="success" if run_proc.returncode == 0 else "error",
                    stdout=run_proc.stdout,
                    stderr=run_proc.stderr,
                )
        except Exception as e:
            return self._new_result(execution_id, stderr=str(e))

    def _execute_java(self, code: str, execution_id: str) -> dict:
        """Compile ``code`` as ``Main.java`` with ``javac`` and run class ``Main``.

        Compilation and execution each get ``max_execution_time`` seconds.
        A non-zero compiler exit status returns the compiler output with
        ``status == "error"``.
        """
        try:
            # The temporary directory is removed when the block exits, whether
            # the run succeeded, failed or raised.
            with tempfile.TemporaryDirectory() as temp_dir:
                source_path = os.path.join(temp_dir, "Main.java")
                with open(source_path, "w") as f:
                    f.write(code)
                compile_proc = subprocess.run(
                    ["javac", source_path],
                    capture_output=True, text=True, timeout=self.max_execution_time
                )
                if compile_proc.returncode != 0:
                    return self._new_result(
                        execution_id,
                        stdout=compile_proc.stdout,
                        stderr=compile_proc.stderr,
                    )
                run_proc = subprocess.run(
                    ["java", "-cp", temp_dir, "Main"],
                    capture_output=True, text=True, timeout=self.max_execution_time
                )
                return self._new_result(
                    execution_id,
                    status="success" if run_proc.returncode == 0 else "error",
                    stdout=run_proc.stdout,
                    stderr=run_proc.stderr,
                )
        except Exception as e:
            return self._new_result(execution_id, stderr=str(e))