import streamlit as st
from interpreter import interpreter
import os
from streamlit_extras.colored_header import colored_header
from streamlit_lottie import st_lottie
import json
import requests
import re
from datetime import datetime, timezone
from typing import Dict, Any
from streamlit.runtime.scriptrunner import get_script_run_ctx
import time
from tenacity import retry, stop_after_attempt, wait_exponential
import shutil
from pathlib import Path
import hashlib
import streamlit_file_browser as sfb
# The retry policy lives on the inner fetch helper: tenacity only retries when
# the wrapped call raises, so it must wrap code that lets exceptions escape.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    reraise=True,  # surface the original exception instead of tenacity.RetryError
)
def _fetch_lottie_json(url: str) -> Dict[str, Any]:
    """Download ``url`` and decode the body as JSON, raising on any failure."""
    response = requests.get(url, timeout=10)  # never wait forever on a dead host
    response.raise_for_status()  # turn 4xx/5xx responses into exceptions
    return response.json()


def load_lottieurl(url: str) -> Dict[str, Any]:
    """Fetch a Lottie animation description from ``url``.

    The download is attempted up to three times with exponential backoff.
    Note that a permanently failing URL therefore blocks the caller for the
    duration of the retries before the error is reported.

    Args:
        url: Address of the Lottie JSON document.

    Returns:
        The decoded JSON document, or ``None`` when every attempt failed
        (the failure is reported to the user through ``st.error``).
    """
    try:
        return _fetch_lottie_json(url)
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON.
        st.error(f"Failed to load animation: {str(e)}")
        return None
# Error-reporting wrapper for interpreter calls
def safe_interpreter_call(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` and report any failure in the UI.

    Args:
        func: Callable to invoke.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns, or ``None`` if it raised. On failure a
        message is shown with ``st.error``; API-key and rate-limit problems
        get a dedicated hint.
    """
    try:
        return func(*args, **kwargs)
    except Exception as e:
        error_msg = str(e)
        # Compare lowercase against lowercase: the previous needle "API key"
        # could never be found inside an already lowercased message.
        lowered = error_msg.lower()
        if "api key" in lowered or "api_key" in lowered:
            st.error("❌ API key error. Please check your API key in settings.")
        elif "rate limit" in lowered:
            st.error("⏳ Rate limit exceeded. Please wait a moment and try again.")
        else:
            st.error(f"❌ Error: {error_msg}")
        return None
def get_session_id():
    """Return the session id of the current Streamlit script run, or None."""
    run_ctx = get_script_run_ctx()
    if not run_ctx:
        # No script-run context available.
        return None
    return run_ctx.session_id
def save_settings():
    """Snapshot settings, model and audio track under session-specific keys.

    The snapshot is stored in ``st.session_state`` under keys suffixed with
    the current session id; nothing is stored when no session id is available.
    """
    state = st.session_state
    current_settings = state.settings
    sid = get_session_id()
    if not sid:
        return
    # Shallow copy so later in-place edits do not alter the snapshot's top level.
    state[f"persistent_settings_{sid}"] = current_settings.copy()
    state[f"persistent_model_{sid}"] = state.selected_model
    state[f"persistent_audio_{sid}"] = state.selected_audio_track
def load_settings():
    """Restore settings, model and audio track saved by ``save_settings``.

    Each value is restored only if its session-specific key exists in
    ``st.session_state``; without a session id this is a no-op.
    """
    sid = get_session_id()
    if not sid:
        return
    state = st.session_state
    settings_key = f"persistent_settings_{sid}"
    if settings_key in state:
        # Copy so the live settings and the snapshot stay separate dicts.
        state.settings = state[settings_key].copy()
    # The remaining values are restored as-is, in the same order as before.
    for prefix, attribute in (
        ("persistent_model_", "selected_model"),
        ("persistent_audio_", "selected_audio_track"),
    ):
        saved_key = f"{prefix}{sid}"
        if saved_key in state:
            setattr(state, attribute, state[saved_key])
def init_session_state():
    """Populate ``st.session_state`` with defaults and configure the interpreter.

    Steps, in order: restore any persisted settings, fill in every missing
    session-state key with its default, derive ``selected_model`` from the
    settings when unset, reset the interpreter and apply the settings to it.

    Any exception is reported through ``st.error``; in that case the default
    settings are installed if no settings exist yet.
    """
    # Built before the try block so the except handler can always reach it,
    # even when the failure happens in the very first statement of the try.
    default_settings = {
        "api_key": os.getenv("HF_API_KEY", ""),
        "api_base": "https://api-inference.huggingface.co/models/Qwen/Qwen2.5-Coder-32B-Instruct",
        "model": "huggingface/Qwen/Qwen2.5-Coder-32B-Instruct",
        "auto_run": True,
        "theme": "light",
        "code_style": "monokai",
        "custom_instructions": "",
        "safe_mode": "off",
        "conversation_history": False,
        "os_mode": False,
        "os_restricted_mode": False,
        "allowed_paths": ["/"],
        "use_workspace": True,
    }
    try:
        # Load persistent settings first
        load_settings()
        # Define all required session state keys and their defaults
        default_state = {
            "settings": default_settings,
            "selected_model": None,  # Will be set from settings["model"]
            "selected_audio_track": "Ambient",
            "uploaded_files": [],
            "settings_open": False,
            "messages": [],
            "original_system_message": interpreter.system_message,
            "session_dir": None,
            "last_request_time": time.time(),
        }
        # Initialize all required state variables (existing values win)
        for key, default_value in default_state.items():
            if key not in st.session_state:
                st.session_state[key] = default_value
        # Ensure selected_model is set from settings if not already set
        if not st.session_state.selected_model:
            st.session_state.selected_model = st.session_state.settings["model"]
        # NOTE(review): this resets the interpreter on every call, not only on
        # the first call of a session — confirm that is intended.
        interpreter.reset()
        # Apply initial settings
        apply_interpreter_settings()
    except Exception as e:
        st.error(f"Error initializing session state: {str(e)}")
        # Provide fallback values for critical settings
        if "settings" not in st.session_state:
            st.session_state.settings = default_settings
def apply_interpreter_settings():
    """Copy the values in ``st.session_state.settings`` onto the interpreter.

    Sets the LLM connection parameters, fixed sampling parameters, the
    allowed paths and finally rebuilds the system message from the stored
    original plus custom instructions and workspace path.
    """
    llm = interpreter.llm
    llm.api_key = st.session_state.settings["api_key"]
    llm.api_base = st.session_state.settings["api_base"]
    llm.model = st.session_state.settings["model"]
    interpreter.auto_run = st.session_state.settings["auto_run"]
    # Fixed values, not read from the settings dict.
    llm.temperature = 0.7
    llm.repetition_penalty = 1.2
    interpreter.safe_mode = "off"
    # NOTE(review): the earlier comment here said "Force this to False" while
    # the code assigns True — confirm which one is intended.
    interpreter.conversation_history = True
    interpreter.os = st.session_state.settings["os_mode"]
    # Allowed paths: the session workspace only, or the configured list.
    if st.session_state.settings["use_workspace"]:
        interpreter.computer.allowed_paths = [get_session_folder()]
    else:
        interpreter.computer.allowed_paths = st.session_state.settings["allowed_paths"]
    # Rebuild the system message starting from the pristine original.
    interpreter.system_message = st.session_state.original_system_message
    extra_instructions = st.session_state.settings["custom_instructions"]
    if extra_instructions:
        interpreter.system_message += f"\n\nAdditional Instructions:\n{extra_instructions}"
    if st.session_state.settings["use_workspace"]:
        workspace_path = get_session_folder()
        interpreter.system_message += f"\n\nWorkspace Path: {workspace_path}\nYou can only access files in this workspace directory."
class OutputController:
    """Heuristic detector for runaway or repetitive output chunks.

    All tunables and running state live in the ``loop_detection`` dict.
    """

    def __init__(self):
        self.loop_detection = {
            'last_content': None,
            'repeat_count': 0,
            'last_timestamp': datetime.now(),
            'number_pattern': re.compile(r'^[\d\s.]+$'),
            'terminal_spam': re.compile(r'^[0-9\s.]{20,}$'),  # Long number sequences
            'max_repeats': 3,
            'timeout_seconds': 2
        }

    def is_loop_detected(self, content: str) -> bool:
        """Decide whether ``content`` should be treated as loop output.

        Args:
            content: The chunk to inspect; it is converted with ``str`` and
                stripped before matching.

        Returns:
            True when the chunk is a long digits/whitespace/dots sequence, or
            when the same numeric-looking chunk has repeated ``max_repeats``
            times within ``timeout_seconds`` of the last reset; False otherwise.
        """
        state = self.loop_detection
        now = datetime.now()
        content = str(content).strip()
        # Immediately skip terminal spam
        if state['terminal_spam'].match(content):
            return True
        # total_seconds() covers the whole gap; ``.seconds`` drops the days
        # component and truncates fractions, which made long pauses look short.
        elapsed = (now - state['last_timestamp']).total_seconds()
        if content != state['last_content'] or elapsed > state['timeout_seconds']:
            # New content or stale state: start counting afresh.
            state.update({
                'repeat_count': 0,
                'last_content': content,
                'last_timestamp': now
            })
            return False
        # Only numeric-looking repeats are counted.
        if state['number_pattern'].match(content):
            state['repeat_count'] += 1
            # Honour the configured threshold (3 by default, i.e. "> 2").
            if state['repeat_count'] >= state['max_repeats']:
                return True
        return False
# Defined at module level (outside main)
def clear_chat_history():
    """Wipe the conversation from the interpreter and the UI, then rerun."""
    # Interpreter side: drop the transcript, then perform a full reset.
    interpreter.messages = []
    interpreter.reset()
    # UI side: forget every rendered message.
    st.session_state["messages"] = []
    # Re-save so the persisted settings survive the wipe.
    save_settings()
    st.success("Chat history cleared!")
    st.rerun()
# Defined at module level (outside main)
def update_model_settings():
    """Apply the model chosen in the ``model_select`` widget and persist it.

    Raises:
        KeyError: if the chosen model has no entry in ``model_options``.
    """
    chosen_model = st.session_state.model_select
    st.session_state.selected_model = chosen_model
    # Resolve the endpoint before touching the settings so an unknown model
    # leaves the settings dict untouched.
    endpoint = model_options[chosen_model]
    model_settings = st.session_state.settings
    model_settings.update(model=chosen_model, api_base=endpoint)
    save_settings()  # Persist right away
# Audio tracks, keyed by display label (module-level constant)
audio_tracks = dict(
    (
        ("Ambient", "https://cdn.pixabay.com/download/audio/2022/02/22/audio_d1718ab41b.mp3"),
        ("Lo-Fi", "https://cdn.pixabay.com/download/audio/2022/03/10/audio_2d7b426f87.mp3"),
        ("Focus", "https://cdn.pixabay.com/download/audio/2022/01/18/audio_d0c6bf3c0e.mp3"),
    )
)
# Selectable models (module-level constant): maps the "huggingface/<repo id>"
# model name to its inference endpoint. Every endpoint is the shared base URL
# followed by the repo id, so the table is derived from the repo ids alone.
_HF_INFERENCE_BASE = "https://api-inference.huggingface.co/models/"
_HF_MODEL_REPOS = (
    # Default and recommended model
    "Qwen/Qwen2.5-Coder-32B-Instruct",
    # Other Qwen 2.5 Coder Series
    "Qwen/Qwen2.5-Coder-14B-Instruct",
    "Qwen/Qwen2.5-Coder-7B-Instruct",
    # Qwen 2.5 General Series
    "Qwen/Qwen2.5-72B-Instruct",
    "Qwen/Qwen2.5-32B-Instruct",
    "Qwen/Qwen2.5-7B-Instruct",
    # Other verified top performers
    "mistralai/Mixtral-8x7B-Instruct-v0.1",
    "mistralai/Mistral-7B-Instruct-v0.2",
    "codellama/CodeLlama-34b-Instruct-hf",
    "codellama/CodeLlama-13b-Instruct-hf",
    "deepseek-ai/deepseek-coder-6.7b-instruct",
    "microsoft/phi-2",
    "bigcode/starcoder2-15b",
)
model_options = {
    f"huggingface/{repo}": f"{_HF_INFERENCE_BASE}{repo}"
    for repo in _HF_MODEL_REPOS
}
def get_theme_styles(theme: str = "light") -> str:
    """Return the application's CSS rules as one string.

    Args:
        theme: Accepted for interface compatibility; the same stylesheet is
            returned whatever value is passed.

    Returns:
        CSS text without any surrounding ``style`` tag.
    """
    stylesheet = """
/* Base styles */
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif;
line-height: 1.6;
}
/* Message container styles */
.stChatMessage {
margin: 1rem 0;
padding: 1rem;
border-radius: 10px;
border: 1px solid rgba(128, 128, 128, 0.2);
}
/* Code block styles */
pre {
border-radius: 8px !important;
padding: 1rem !important;
margin: 1rem 0 !important;
border: 1px solid rgba(128, 128, 128, 0.2) !important;
overflow-x: auto !important;
}
code {
font-family: 'SFMono-Regular', Consolas, 'Liberation Mono', Menlo, Courier, monospace !important;
font-size: 0.9em !important;
padding: 0.2em 0.4em !important;
border-radius: 3px !important;
}
/* Output block styles */
.output-block {
border-radius: 8px;
padding: 1rem;
margin: 0.5rem 0;
font-family: 'SFMono-Regular', Consolas, 'Liberation Mono', Menlo, Courier, monospace;
font-size: 0.9em;
border-left: 4px solid #FF4B4B;
opacity: 0.9;
}
/* Message spacing */
.stMarkdown {
line-height: 1.6;
margin: 0.5rem 0;
}
.stMarkdown p {
margin: 0.75rem 0;
}
/* Button styles */
.stButton button {
border-radius: 20px !important;
padding: 0.4rem 1rem !important;
border: 1px solid rgba(128, 128, 128, 0.2) !important;
font-weight: 500 !important;
transition: all 0.3s ease !important;
}
/* Header styles */
.stMarkdown h1, .stMarkdown h2, .stMarkdown h3 {
margin: 1.5rem 0 1rem 0;
font-weight: 600;
}
/* Input field styles */
.stTextInput > div > div > input {
border-radius: 10px !important;
border: 1px solid rgba(128, 128, 128, 0.2) !important;
padding: 0.75rem 1rem !important;
font-size: 1rem !important;
}
/* Chat message styles */
.chat-message {
padding: 1.25rem;
border-radius: 12px;
margin: 1rem 0;
border: 1px solid rgba(128, 128, 128, 0.2);
box-shadow: 0 2px 4px rgba(0,0,0,0.05);
}
/* User message specific styles */
.user-message {
margin-left: auto;
max-width: 80%;
}
/* Assistant message specific styles */
.assistant-message {
margin-right: auto;
max-width: 80%;
}
/* Error and warning styles */
.error-message {
border-left: 4px solid #DC2626;
padding: 1rem;
margin: 1rem 0;
border-radius: 8px;
opacity: 0.9;
}
.warning-message {
border-left: 4px solid #F59E0B;
padding: 1rem;
margin: 1rem 0;
border-radius: 8px;
opacity: 0.9;
}
/* Success message styles */
.success-message {
border-left: 4px solid #059669;
padding: 1rem;
margin: 1rem 0;
border-radius: 8px;
opacity: 0.9;
}
/* Floating audio player styles */
.floating-audio {
position: fixed;
bottom: 20px;
right: 20px;
z-index: 9999;
padding: 1rem;
border-radius: 12px;
box-shadow: 0 4px 6px rgba(0,0,0,0.1);
backdrop-filter: blur(10px);
border: 1px solid rgba(128, 128, 128, 0.2);
max-width: 300px;
transition: all 0.3s ease;
}
.floating-audio:hover {
transform: translateY(-2px);
box-shadow: 0 6px 8px rgba(0,0,0,0.15);
}
.floating-audio audio {
width: 250px;
height: 40px;
opacity: 0.9;
border-radius: 8px;
margin-bottom: 0.5rem;
}
.floating-audio select {
width: 100%;
padding: 0.5rem;
border-radius: 8px;
border: 1px solid rgba(128, 128, 128, 0.2);
font-size: 0.9rem;
cursor: pointer;
}
/* File browser styles */
.file-browser {
border-radius: 12px;
padding: 1rem;
margin: 1rem 0;
border: 1px solid rgba(128, 128, 128, 0.2);
}
.file-item {
display: flex;
align-items: center;
padding: 0.75rem;
border-bottom: 1px solid rgba(128, 128, 128, 0.2);
transition: all 0.2s ease;
}
.file-item:hover {
opacity: 0.8;
}
/* Settings panel styles */
.settings-panel {
border-radius: 12px;
padding: 1.5rem;
margin: 1rem 0;
border: 1px solid rgba(128, 128, 128, 0.2);
}
/* Tab styles */
.stTabs {
border-radius: 8px;
padding: 0.5rem;
}
.stTab {
border-radius: 8px !important;
padding: 0.5rem 1rem !important;
}
"""
    return stylesheet
class ChatMessage:
    """One chat entry: content, type, role, UTC creation time and an id.

    The id is the MD5 hex digest of ``"<timestamp ISO string>-<content>"``.
    """

    def __init__(self, content="", message_type="text", role="assistant"):
        self.content = content
        self.type = message_type
        self.role = role
        self.timestamp = datetime.now(timezone.utc)
        id_source = f"{self.timestamp.isoformat()}-{content}"
        self.id = hashlib.md5(id_source.encode()).hexdigest()
        # Lazily filled by get_formatted_content().
        self._processed_content = None

    def to_dict(self):
        """Return a dict of plain values; the timestamp becomes an ISO string."""
        return dict(
            content=self.content,
            type=self.type,
            role=self.role,
            timestamp=self.timestamp.isoformat(),
            id=self.id,
        )

    @classmethod
    def from_dict(cls, data):
        """Rebuild a message from ``to_dict`` output, keeping timestamp and id."""
        restored = cls(
            content=data["content"],
            message_type=data["type"],
            role=data["role"],
        )
        # Overwrite the freshly generated values with the stored ones.
        restored.timestamp = datetime.fromisoformat(data["timestamp"])
        restored.id = data["id"]
        return restored

    def get_formatted_content(self, force_refresh=False):
        """Return the formatted content, computing it on first use.

        Pass ``force_refresh=True`` to recompute instead of using the cache.
        """
        if force_refresh or self._processed_content is None:
            self._processed_content = format_message(self)
        return self._processed_content
def format_message(message: ChatMessage) -> str:
    """Format a ChatMessage's content for display according to ``message.type``.

    "code" messages are re-wrapped in a fenced code block; "error", "warning"
    and "success" messages get an emoji-prefixed line; every other type goes
    through the text clean-up path. If formatting raises, the error is shown
    with ``st.error`` and the unmodified ``message.content`` is returned.

    NOTE(review): several string literals and one regex in this function look
    truncated, as if markup had been stripped from the file — restore them
    from the original before relying on this block.
    """
    try:
        if message.type == "code":
            # Default to Python for code blocks
            lang = "python"
            content = message.content.strip()
            # A leading fence line supplies the language and is removed
            if content.startswith("```"):
                first_line = content.split("\n")[0]
                lang = first_line.replace("```", "").strip() or "python"
                content = "\n".join(content.split("\n")[1:])
                if content.endswith("```"):
                    content = content[:-3]
            # If the content ends in a known file extension, that extension
            # overrides the language chosen above
            if "." in content:
                ext_match = re.search(r'\.(py|js|html|css|json|md|sql|sh|bash|yaml|yml|java|cpp|c|go|rs|ts)$', content.lower())
                if ext_match:
                    lang_map = {
                        'py': 'python',
                        'js': 'javascript',
                        'html': 'html',
                        'css': 'css',
                        'json': 'json',
                        'md': 'markdown',
                        'sql': 'sql',
                        'sh': 'bash',
                        'bash': 'bash',
                        'yaml': 'yaml',
                        'yml': 'yaml',
                        'java': 'java',
                        'cpp': 'cpp',
                        'c': 'c',
                        'go': 'go',
                        'rs': 'rust',
                        'ts': 'typescript'
                    }
                    lang = lang_map.get(ext_match.group(1), lang)
            # Format code with proper spacing and syntax
            formatted_content = f"```{lang}\n{content.strip()}\n```"
            # Add visual separator for multiple code blocks
            if "\n\n```" in message.content:
                formatted_content = f"\n{formatted_content}\n"
            return formatted_content
        # NOTE(review): the three literals below open and close on different
        # lines; presumably wrapper markup was lost — confirm.
        elif message.type == "error":
            return f'
❌ **Error:** {message.content}
'
        elif message.type == "warning":
            return f'
⚠️ **Warning:** {message.content}
'
        elif message.type == "success":
            return f'
✅ {message.content}
'
        else:
            # Clean and format regular text
            content = message.content.strip()
            # Handle inline code with better spacing
            # NOTE(review): this call has a truncated pattern and no
            # replacement argument as it stands — restore from the original.
            content = re.sub(r'(?\1 ', content)
            # Collapses every whitespace run, newlines included, into one space.
            # NOTE(review): after this no newline remains, so the newline-based
            # steps below appear to act on a single line — confirm intent.
            content = re.sub(r'\s+', ' ', content)  # Normalize spaces
            # Replaces a markdown link with its link text only, as written here
            content = re.sub(r'\[([^\]]+)\]\(([^\)]+)\)', r'\1', content)
            # Improve list formatting
            content = re.sub(r'(\n\s*[-*]\s+[^\n]+)(?=\n\s*[^-*]|\Z)', r'\1\n', content)
            # Enhanced console output formatting
            if "$ " in content or "%" in content:
                lines = content.split("\n")
                formatted_lines = []
                in_output = False
                output_buffer = []
                for line in lines:
                    # Lines starting with "$ " or "%" are treated as commands
                    if line.strip().startswith(("$ ", "%")):
                        # Format collected output
                        if output_buffer:
                            formatted_lines.append(f'
{"".join(output_buffer)}
')
                            output_buffer = []
                        # Format command with proper styling
                        formatted_lines.append(f'{line}')
                        in_output = True
                    elif in_output:
                        # Collect output lines
                        output_buffer.append(line + "\n")
                    else:
                        # Regular text line
                        formatted_lines.append(line)
                # Handle any remaining output
                if output_buffer:
                    formatted_lines.append(f'
{"".join(output_buffer)}
')
                content = "\n".join(formatted_lines)
            # Clean up excessive newlines
            content = re.sub(r'\n{3,}', '\n\n', content)
            return content
    except Exception as e:
        # Fall back to the raw content rather than failing the render
        st.error(f"Error formatting message: {str(e)}")
        return message.content
def handle_user_input(user_input: str):
    """Send one user message to the interpreter and stream the reply into the UI.

    Blank input is ignored. Otherwise the function throttles rapid requests,
    records and renders the user message, then iterates over
    ``interpreter.chat(..., stream=True)``, accumulating message text and
    code/console chunks and re-rendering the combined markdown after each
    chunk. The final text is stored in ``st.session_state.messages`` as an
    assistant ChatMessage. Any exception from the streaming block is stored
    and shown as an "error" ChatMessage.

    NOTE(review): the nesting of this function was reconstructed from syntax;
    verify block boundaries against the original file.
    """
    if not user_input.strip():
        return
    # Rate limiting with exponential backoff
    current_time = time.time()
    if hasattr(st.session_state, 'last_request_time'):
        time_since_last = current_time - st.session_state.last_request_time
        min_interval = 1.0  # Base interval in seconds
        if hasattr(st.session_state, 'request_count'):
            st.session_state.request_count += 1
            if st.session_state.request_count > 5:
                min_interval = min(5.0, min_interval * 1.5)
        else:
            st.session_state.request_count = 1
        if time_since_last < min_interval:
            st.warning(f"Please wait {min_interval - time_since_last:.1f} seconds before sending another message...")
            # Blocks this script run for the remaining interval
            time.sleep(min_interval - time_since_last)
    st.session_state.last_request_time = current_time
    # NOTE(review): if this reset really runs on every call, the "> 5" branch
    # above can hardly trigger — confirm the intended nesting.
    st.session_state.request_count = 1
    # Add user message
    user_message = ChatMessage(user_input, "text", "user")
    st.session_state.messages.append(user_message)
    with st.chat_message("user", avatar="🧑💻"):
        st.markdown(user_message.get_formatted_content())
    # Process with interpreter
    try:
        with st.chat_message("assistant", avatar="🤖"):
            message_container = st.container()
            with st.spinner("Thinking..."):
                # Initialize buffers and state
                message_buffer = []  # finished markdown pieces, in order
                code_buffer = []  # pieces of the code block currently open
                current_chunk = {
                    'type': 'message',
                    'content': '',
                    'language': None
                }
                # Create placeholder for streaming updates
                with message_container:
                    response_placeholder = st.empty()
                # Enhanced streaming with chunking
                for chunk in interpreter.chat(user_input, stream=True, display=False):
                    # Only dict chunks are processed; anything else is ignored
                    if isinstance(chunk, dict):
                        content = str(chunk.get('content', ''))  # Convert content to string
                        chunk_type = chunk.get('type', 'message')
                        # Skip empty chunks
                        if not content:
                            continue
                        # Handle different chunk types
                        if chunk_type == 'message':
                            # Flush code buffer if exists
                            if code_buffer:
                                code_text = ''.join(str(item) for item in code_buffer)  # Convert each item to string
                                if code_text.strip():
                                    message_buffer.append(f"\n```{current_chunk['language'] or 'python'}\n{code_text.strip()}\n```\n")
                                code_buffer = []
                            # Add message content
                            message_buffer.append(content)
                            # This dict has no 'language' key; the lookups of
                            # that key elsewhere only run while code_buffer is
                            # non-empty, i.e. after a code chunk replaced it
                            current_chunk = {'type': 'message', 'content': content}
                        elif chunk_type in ['code', 'console']:
                            # 'code' and 'console' chunks share one buffer
                            # Start new code block if needed
                            if current_chunk['type'] != 'code':
                                if code_buffer:  # Flush previous code buffer
                                    code_text = ''.join(str(item) for item in code_buffer)  # Convert each item to string
                                    if code_text.strip():
                                        message_buffer.append(f"\n```{current_chunk['language'] or 'python'}\n{code_text.strip()}\n```\n")
                                    code_buffer = []
                                # The fence language comes from the chunk's 'format' field
                                current_chunk = {
                                    'type': 'code',
                                    'language': chunk.get('format', 'python')
                                }
                            # Accumulate code content
                            code_buffer.append(content)
                        # Update display with proper chunking
                        try:
                            display_content = ''.join(str(item) for item in message_buffer)  # Convert each item to string
                            if code_buffer:  # Add current code buffer if exists
                                code_text = ''.join(str(item) for item in code_buffer)  # Convert each item to string
                                if code_text.strip():
                                    display_content += f"\n```{current_chunk['language'] or 'python'}\n{code_text.strip()}\n```"
                            # Use markdown for display with proper formatting
                            response_placeholder.markdown(display_content)
                        except Exception as e:
                            st.error(f"Error updating display: {str(e)}")
                # Final cleanup and display
                try:
                    # Handle any remaining code buffer
                    if code_buffer:
                        code_text = ''.join(str(item) for item in code_buffer)  # Convert each item to string
                        if code_text.strip():
                            message_buffer.append(f"\n```{current_chunk['language'] or 'python'}\n{code_text.strip()}\n```\n")
                    # Prepare final response
                    final_response = ''.join(str(item) for item in message_buffer)  # Convert each item to string
                    # Create and store assistant message
                    assistant_message = ChatMessage(final_response, "text", "assistant")
                    st.session_state.messages.append(assistant_message)
                    # Final display update
                    response_placeholder.markdown(assistant_message.get_formatted_content())
                except Exception as e:
                    st.error(f"Error in final display update: {str(e)}")
                    # NOTE(review): final_response is unbound if the failure
                    # happened before its assignment above; the resulting
                    # NameError would be caught by the outer handler
                    response_placeholder.markdown(final_response)
    except Exception as e:
        # Record the failure in the history so it survives reruns
        error_msg = ChatMessage(str(e), "error", "assistant")
        st.session_state.messages.append(error_msg)
        st.error(error_msg.get_formatted_content())
# File handling functions
def get_session_folder() -> str:
    """Return the folder used for this session's files, creating it if needed.

    Returns:
        "/" when the ``use_workspace`` setting is off or when any step fails
        (the failure is shown with ``st.error``); otherwise the value of
        ``st.session_state.session_dir``.

    Side effects: may create directories, set ``session_dir``, clear
    ``uploaded_files`` and call ``update_custom_instructions()``.

    NOTE(review): the nesting of this function was reconstructed from syntax;
    verify block boundaries against the original file.
    """
    if not st.session_state.settings["use_workspace"]:
        return "/"
    try:
        # If a custom workspace path is set, use it
        if st.session_state.get("session_dir"):
            workspace_dir = Path(st.session_state.session_dir)
        else:
            # Use default workspace directory
            workspace_dir = Path("autointerpreter-workspace")
        # Create directory if it doesn't exist
        workspace_dir.mkdir(parents=True, exist_ok=True, mode=0o755)
        # If no session directory is set, create one
        if not st.session_state.get("session_dir"):
            # NOTE(review): the name has minute resolution and mkdir uses
            # exist_ok=True, so two sessions started within the same minute
            # would get the same folder — confirm this is acceptable.
            timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H_%M")
            session_dir = workspace_dir / f"session-{timestamp}"
            session_dir.mkdir(exist_ok=True, mode=0o755)
            # session_dir is stored before update_custom_instructions() runs
            st.session_state.session_dir = str(session_dir)
            st.session_state.uploaded_files = []
            update_custom_instructions()
        # Verify the workspace directory exists and is accessible
        if not workspace_dir.exists():
            st.error(f"Workspace directory not found: {workspace_dir}")
            # Create a new default workspace
            workspace_dir = Path("autointerpreter-workspace")
            workspace_dir.mkdir(parents=True, exist_ok=True, mode=0o755)
            # Here the workspace root itself becomes the session folder
            st.session_state.session_dir = str(workspace_dir)
            st.session_state.uploaded_files = []
            update_custom_instructions()
        return st.session_state.session_dir
    except Exception as e:
        st.error(f"Error managing workspace: {str(e)}")
        # NOTE(review): falling back to "/" on error means callers that use
        # this value as an allowed path get the filesystem root — confirm.
        return "/"
def handle_file_upload(uploaded_files):
    """Validate uploaded files and store them in the session folder.

    Per file: empty files are skipped, the name is sanitised, the extension
    must be on the allow-list, an existing name gets a timestamp suffix and
    files above 100MB are skipped. Saved files are recorded in
    ``st.session_state.uploaded_files``. Every problem is reported through
    Streamlit messages; nothing is raised to the caller.

    Args:
        uploaded_files: Iterable of uploaded file objects; falsy means no-op.
    """
    if not uploaded_files:
        return
    # Upload policy, identical for every file.
    permitted_suffixes = {'.txt', '.py', '.js', '.html', '.css', '.json', '.md', '.csv', '.yml', '.yaml'}
    size_cap = 100 * 1024 * 1024  # 100MB limit
    try:
        target_dir = Path(get_session_folder())
        if str(target_dir) == "/":
            st.error("Invalid workspace configuration!")
            return
        if not target_dir.exists():
            st.error("Session directory does not exist!")
            try:
                target_dir.mkdir(parents=True, exist_ok=True, mode=0o755)
                st.success("Created new session directory.")
            except Exception as e:
                st.error(f"Failed to create session directory: {str(e)}")
                return
        for upload in uploaded_files:
            try:
                # Nothing to store for an empty file
                if upload.size == 0:
                    st.warning(f"Skipping empty file: {upload.name}")
                    continue
                # Keep only the final path component, then replace unusual characters
                clean_name = re.sub(r'[^a-zA-Z0-9._-]', '_', Path(upload.name).name)
                suffix = Path(clean_name).suffix.lower()
                if suffix not in permitted_suffixes:
                    st.warning(f"Unsupported file type: {suffix}. Skipping {clean_name}")
                    continue
                destination = target_dir / clean_name
                # Never overwrite: add a timestamp before the extension
                if destination.exists():
                    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                    stem, dot, extension = clean_name.rpartition('.')
                    if dot:
                        clean_name = f"{stem}_{stamp}.{extension}"
                    else:
                        clean_name = f"{clean_name}_{stamp}"
                    destination = target_dir / clean_name
                # Enforce the size limit
                byte_count = len(upload.getvalue())
                if byte_count > size_cap:
                    st.warning(f"File {clean_name} is too large (>{byte_count/(1024*1024):.1f}MB). Skipping...")
                    continue
                # Write to disk and record the upload
                try:
                    with open(destination, "wb") as sink:
                        sink.write(upload.getbuffer())
                    record = {
                        "name": clean_name,
                        "path": str(destination),
                        "size": upload.size,
                        "type": upload.type or "application/octet-stream",
                        "timestamp": datetime.now(timezone.utc)
                    }
                    if "uploaded_files" not in st.session_state:
                        st.session_state.uploaded_files = []
                    # Refresh the entry with this path, or add a new one
                    for entry in st.session_state.uploaded_files:
                        if entry["path"] == str(destination):
                            entry.update(record)
                            break
                    else:
                        st.session_state.uploaded_files.append(record)
                    st.success(f"Successfully uploaded: {clean_name}")
                except Exception as e:
                    st.error(f"Error saving {clean_name}: {str(e)}")
                    # Do not leave a partial file behind
                    if destination.exists():
                        try:
                            destination.unlink()
                        except Exception as cleanup_error:
                            st.error(f"Error cleaning up partial file: {str(cleanup_error)}")
            except Exception as e:
                st.error(f"Error processing upload {upload.name}: {str(e)}")
        # Refresh the instructions once the whole batch has been processed
        update_custom_instructions()
    except Exception as e:
        st.error(f"Error handling file uploads: {str(e)}")
def update_custom_instructions():
    """Rewrite the generated workspace lines in the custom instructions.

    Removes any previously generated "Working Space:" / "Files:" lines from
    ``settings["custom_instructions"]``, appends fresh ones describing the
    current session folder and uploaded files, and re-applies the interpreter
    settings. Calling it repeatedly yields the same text.

    Note: any line that starts with "Working Space:" or "Files:" is treated
    as generated and removed.
    """
    workspace_path = get_session_folder()
    files_info = ""
    if st.session_state.get("uploaded_files"):
        files = [f"{info['name']} ({info['size'] / 1024:.1f} KB)"
                 for info in st.session_state.uploaded_files]
        files_info = "\nFiles: " + ", ".join(files)
    workspace_info = f"\nWorking Space: {workspace_path}{files_info}"
    # Update instructions
    current_instructions = st.session_state.settings.get("custom_instructions") or ""
    # Anchor on line starts rather than on a preceding newline: the generated
    # block can sit on the very first line, where the old "\n..." patterns
    # never matched and the block was appended again on every call.
    current_instructions = re.sub(
        r'^(?:Working Space|Files):.*\n?',
        '',
        current_instructions,
        flags=re.MULTILINE,
    )
    # Strip after the removal so leftover blank lines do not pile up.
    current_instructions = current_instructions.strip()
    new_instructions = current_instructions + ("\n" if current_instructions else "") + workspace_info
    st.session_state.settings["custom_instructions"] = new_instructions
    apply_interpreter_settings()
def get_file_icon(file_type: str) -> str:
    """Return an emoji icon for a MIME type string.

    Args:
        file_type: MIME type such as "text/plain"; matching ignores case and
            a missing value is treated as unknown.

    Returns:
        A single emoji. Checks run in order, so any "text/..." type gets the
        document icon before the python/javascript checks are reached.
    """
    # MIME types are case-insensitive; also tolerate None or "".
    mime = (file_type or "").lower()
    if mime.startswith('image/'):
        return "🖼️"
    elif mime.startswith('text/'):
        return "📄"
    elif mime.startswith('application/pdf'):
        return "📑"
    elif mime.startswith('application/json'):
        # Replaces an unreadable (mis-encoded) character sequence.
        return "📋"
    elif 'python' in mime:
        return "🐍"
    elif 'javascript' in mime:
        # Replaces an empty string, which rendered no icon at all.
        return "📜"
    elif 'spreadsheet' in mime:
        return "📊"
    else:
        return "📎"
# Handler for the audio track selection
def update_audio_track():
    """Copy the ``audio_select`` value into ``selected_audio_track``."""
    chosen_track = st.session_state.audio_select
    st.session_state.selected_audio_track = chosen_track
def create_audio_player():
    # Returns an f-string that, as visible here, contains only a newline.
    # NOTE(review): the player markup was presumably lost from this literal —
    # restore it from the original file.
    return f"""
"""
def cleanup_old_sessions():
    """Archive or delete session folders that are at least one day old.

    Scans ``autointerpreter-workspace`` for ``session-*`` entries, skipping
    the current session. An entry's age comes from the timestamp in its name
    or, when the name does not parse, from its modification time. Old entries
    with content are moved into ``archived_sessions``; empty ones are
    removed. Failures are printed and never raised.
    """
    try:
        workspace_root = Path("autointerpreter-workspace")
        if not workspace_root.exists():
            return
        now_utc = datetime.now(timezone.utc)
        active_session = st.session_state.get("session_dir")
        # Name formats tried in order
        known_formats = (
            "%Y-%m-%d_%H_%M",  # Standard format: 2024-01-20_14_30
            "%Y%m%d_%H%M%S",  # Compact format: 20240120_143000
        )
        for candidate in workspace_root.glob("session-*"):
            try:
                # Never touch the session currently in use
                if str(candidate) == active_session:
                    continue
                folder_name = candidate.name
                name_suffix = None
                created_at = None
                if folder_name.startswith("session-"):
                    name_suffix = folder_name.replace("session-", "")
                    # First choice: a timestamp embedded in the name
                    for pattern in known_formats:
                        try:
                            created_at = datetime.strptime(name_suffix, pattern).replace(tzinfo=timezone.utc)
                        except ValueError:
                            continue
                        else:
                            break
                    # Fallback for UUID-like names: use the modification time
                    uuid_like = (
                        len(name_suffix) == 36  # Standard UUID
                        or len(name_suffix) == 32  # Compact UUID
                        or '-' in name_suffix  # Any UUID-like string
                    )
                    if not created_at and uuid_like:
                        try:
                            modified = candidate.stat().st_mtime
                            created_at = datetime.fromtimestamp(modified, tz=timezone.utc)
                        except Exception:
                            continue
                if not created_at:
                    # Unrecognised name: skip without a warning
                    continue
                # Younger than 24 hours: leave it alone
                if (now_utc - created_at).days < 1:
                    continue
                try:
                    if candidate.exists():  # Double check existence
                        if any(candidate.iterdir()):
                            # Has content: archive instead of deleting
                            archive_root = workspace_root / "archived_sessions"
                            archive_root.mkdir(exist_ok=True)
                            archived_name = f"archived_{folder_name}_{now_utc.strftime('%Y%m%d_%H%M%S')}"
                            shutil.move(str(candidate), str(archive_root / archived_name))
                            print(f"Archived session with files: {candidate}")
                        else:
                            # Empty: remove outright
                            shutil.rmtree(candidate)
                            print(f"Cleaned up empty session: {candidate}")
                except Exception as e:
                    print(f"Error processing session directory {candidate}: {str(e)}")
            except Exception as e:
                print(f"Error processing session directory {candidate}: {str(e)}")
    except Exception as e:
        print(f"Error in cleanup: {str(e)}")
def main():
    """Render the AutoInterpreter page and process one script run.

    Runs top to bottom on every Streamlit rerun: initializes session state,
    configures the page, draws the header, the optional settings panel, the
    sidebar workspace controls and the chat history, forwards new chat input
    to ``handle_user_input`` and finally triggers cleanup of old sessions
    when the workspace is enabled.
    """
    # 1. Initialize session state
    init_session_state()

    # 2. Page config (should be at top)
    st.set_page_config(
        page_title="AutoInterpreter",
        layout="wide",
        initial_sidebar_state="collapsed",
        menu_items={
            'Get Help': 'https://github.com/samihalawa/autointerpreter',
            'Report a bug': "https://github.com/samihalawa/autointerpreter/issues",
            'About': "# AutoInterpreter\nThe Final AI Coding Experience"
        }
    )

    # 3. Load and apply styles
    st.markdown("""
""", unsafe_allow_html=True)

    # 4. Setup UI components
    lottie_coding = load_lottieurl('https://assets5.lottiefiles.com/packages/lf20_fcfjwiyb.json')

    # 5. Create header and settings UI
    col1, col2, col3 = st.columns([0.2, 0.6, 0.2])
    with col2:
        # load_lottieurl returns None when the download fails; skip the
        # animation in that case instead of handing None to st_lottie.
        if lottie_coding:
            st_lottie(lottie_coding, height=200, key="coding")
        colored_header(
            label="AutoInterpreter",
            description="Run Any Code. The Final AI Coding Experience.",
            color_name="red-70"
        )
    with col3:
        st.markdown("""
""", unsafe_allow_html=True)
        btn_col1, btn_col2 = st.columns([1, 1])
        with btn_col1:
            # Toggle settings when button is clicked
            if st.button("⚙️", help="Configure AutoInterpreter", key="settings_btn"):
                st.session_state.settings_open = not st.session_state.settings_open
        with btn_col2:
            # Two-step confirmation. A button nested inside another button's
            # `if` can never fire: clicking it reruns the script, and on that
            # rerun the outer button is False again. Remember the first click
            # in session state so the confirm button survives the rerun.
            if st.button("❌", help="Clear chat history", key="clear_btn"):
                st.session_state.confirm_clear = True
            if st.session_state.get("confirm_clear", False):
                if st.button("✓", help="Confirm clear", key="confirm_btn"):
                    # Reset the flag first in case clear_chat_history() reruns.
                    st.session_state.confirm_clear = False
                    clear_chat_history()
        st.markdown("""
""", unsafe_allow_html=True)

    # Move theme application before settings panel
    theme = st.session_state.settings.get("theme", "light")
    st.markdown(f"", unsafe_allow_html=True)

    # Enhanced Settings modal with tabs
    if st.session_state.settings_open:
        with st.expander("Settings Panel", expanded=True):
            # Add close button to top-right of settings panel
            col1, col2 = st.columns([0.9, 0.1])
            with col2:
                if st.button("✖️", help="Close settings", key="close_settings"):
                    st.session_state.settings_open = False
                    save_settings()  # Save settings before closing
                    st.rerun()

            # Settings tabs
            tab1, tab2, tab3 = st.tabs(["API & Model", "Code Settings", "Assistant Settings"])

            with tab1:
                # API Settings
                st.text_input(
                    "API Key",
                    value=st.session_state.settings["api_key"],
                    type="password",
                    key="api_key",
                    help="Enter your HuggingFace API key"
                )
                st.markdown("---")
                st.markdown("### 🤖 Model Selection")
                # Current model display
                current_model = st.session_state.selected_model.split('/')[-1]
                st.info(f"Current Model: **{current_model}**", icon="🤖")
                # Model Selection with categories
                model_category = st.radio(
                    "Model Category",
                    ["Qwen Coder Series", "Qwen General Series", "Other Models"],
                    help="Select model category"
                )
                filtered_models = {
                    k: v for k, v in model_options.items()
                    if (
                        (model_category == "Qwen Coder Series" and "Qwen2.5-Coder" in k) or
                        (model_category == "Qwen General Series" and "Qwen2.5-" in k and "Coder" not in k) or
                        (model_category == "Other Models" and "Qwen" not in k)
                    )
                }
                # The chosen value is read back through st.session_state.model_select.
                st.selectbox(
                    "Select Model",
                    options=list(filtered_models.keys()),
                    format_func=lambda x: x.split('/')[-1],
                    key="model_select",
                    help="Choose your preferred model"
                )
                # Theme Selection
                st.markdown("---")
                st.markdown("### 🎨 Theme")
                # The chosen value is read back through st.session_state.theme_select.
                st.selectbox(
                    "UI Theme",
                    options=["light", "dark"],
                    index=0 if theme == "light" else 1,
                    key="theme_select",
                    help="Select the UI theme"
                )
                # Save Settings Button - More prominent
                st.markdown("---")
                col1, col2 = st.columns([0.7, 0.3])
                with col2:
                    if st.button("💾 Save Changes", type="primary", use_container_width=True):
                        updates = {
                            "api_key": st.session_state.api_key,
                            "theme": st.session_state.theme_select
                        }
                        # An empty category leaves the selectbox value as None;
                        # only switch models when the choice is a known key so
                        # the model_options lookup cannot raise KeyError.
                        chosen_model = st.session_state.model_select
                        if chosen_model in model_options:
                            updates["model"] = chosen_model
                            updates["api_base"] = model_options[chosen_model]
                            st.session_state.selected_model = chosen_model
                        st.session_state.settings.update(updates)
                        save_settings()
                        st.session_state.settings_open = False
                        st.success("✅ Settings saved successfully!")
                        # Brief pause so the success message is visible before rerun.
                        time.sleep(0.5)
                        st.rerun()

            with tab2:
                # Code Execution Settings
                # Preselect the saved value (falling back to the first option)
                # so reopening the panel reflects the stored settings.
                code_styles = ["monokai", "github", "dracula"]
                saved_style = st.session_state.settings.get("code_style", code_styles[0])
                safe_modes = ["off", "ask", "auto"]
                saved_safe_mode = st.session_state.settings.get("safe_mode", safe_modes[0])
                col1, col2 = st.columns(2)
                with col1:
                    st.toggle(
                        "Auto Run Code",
                        value=st.session_state.settings["auto_run"],
                        key="auto_run",
                        help="Automatically execute code without confirmation",
                        on_change=lambda: st.session_state.settings.update({"auto_run": st.session_state.auto_run})
                    )
                    st.selectbox(
                        "Code Style",
                        options=code_styles,
                        index=code_styles.index(saved_style) if saved_style in code_styles else 0,
                        key="code_style",
                        help="Code highlighting theme",
                        on_change=lambda: st.session_state.settings.update({"code_style": st.session_state.code_style})
                    )
                with col2:
                    st.selectbox(
                        "Safe Mode",
                        options=safe_modes,
                        index=safe_modes.index(saved_safe_mode) if saved_safe_mode in safe_modes else 0,
                        key="safe_mode",
                        help="Code execution safety level",
                        on_change=lambda: st.session_state.settings.update({"safe_mode": st.session_state.safe_mode})
                    )

            with tab3:
                # Assistant Behavior Settings
                # The "Save Chat History" toggle is disabled. It is kept as
                # comments rather than a bare string literal, because Streamlit
                # magic would render a stand-alone string on the page.
                # st.toggle(
                #     "Save Chat History",
                #     value=st.session_state.settings["conversation_history"],
                #     key="conversation_history",
                #     help="Preserve conversation between sessions",
                #     on_change=lambda: st.session_state.settings.update({"conversation_history": st.session_state.conversation_history})
                # )
                # System Message Settings
                st.text_area(
                    "Default System Message",
                    value=interpreter.system_message,
                    disabled=True,
                    help="Base instructions for the AI assistant",
                    height=100
                )
                st.text_area(
                    "Custom Instructions",
                    value=st.session_state.settings["custom_instructions"],
                    key="custom_instructions",
                    help="Additional instructions for the assistant",
                    height=100,
                    on_change=lambda: st.session_state.settings.update({"custom_instructions": st.session_state.custom_instructions})
                )
                # OS Control Settings
                st.markdown("---")
                st.markdown("### System Access Settings")
                st.markdown("**Warning**: OS mode enables system control (mouse, keyboard, screen access)")
                st.toggle(
                    "Enable System Control",
                    value=st.session_state.settings["os_mode"],
                    key="os_mode",
                    help="Allow assistant to control system",
                    on_change=lambda: st.session_state.settings.update({"os_mode": st.session_state.os_mode})
                )
                if st.session_state.settings["os_mode"]:
                    st.toggle(
                        "Restricted Access",
                        value=st.session_state.settings["os_restricted_mode"],
                        key="os_restricted_mode",
                        help="Limit system access to specific paths",
                        on_change=lambda: st.session_state.settings.update({"os_restricted_mode": st.session_state.os_restricted_mode})
                    )
                    if st.session_state.settings["os_restricted_mode"]:
                        st.text_area(
                            "Allowed Paths",
                            value="\n".join(st.session_state.settings["allowed_paths"]),
                            help="One path per line",
                            key="allowed_paths",
                            on_change=lambda: st.session_state.settings.update({
                                "allowed_paths": [p.strip() for p in st.session_state.allowed_paths.split("\n") if p.strip()]
                            })
                        )

    # Add current model display after header
    col1, col2, col3 = st.columns([0.2, 0.6, 0.2])
    with col3:
        st.markdown(
            f"""
""",
            unsafe_allow_html=True
        )

    # Initialize selected track in session state if not exists
    if "selected_audio_track" not in st.session_state:
        st.session_state.selected_audio_track = "Ambient"

    # Create floating audio player
    st.markdown(create_audio_player(), unsafe_allow_html=True)

    # Simplified workspace control in sidebar
    with st.sidebar:
        st.markdown("### 🗂️ Workspace Control")
        # Single workspace toggle in sidebar
        st.toggle(
            "Use Workspace",
            value=st.session_state.settings["use_workspace"],
            key="use_workspace",
            help="Restrict file access to workspace directory only",
            # The tuple is only a way to run two calls from a single lambda.
            on_change=lambda: (
                st.session_state.settings.update({"use_workspace": st.session_state.use_workspace}),
                apply_interpreter_settings()
            )
        )
        if st.session_state.settings["use_workspace"]:
            st.markdown("### 📂 Workspace")
            # Add workspace path input
            workspace_path = st.text_input(
                "Workspace Path",
                value=str(Path(st.session_state.get("session_dir", "autointerpreter-workspace")).resolve()),
                help="Enter the full path to your workspace directory",
                key="workspace_path_input"
            )
            # Add Set Path button
            if st.button("📁 Set Workspace Path", use_container_width=True):
                try:
                    workspace_dir = Path(workspace_path)
                    workspace_dir.mkdir(parents=True, exist_ok=True)
                    st.session_state.session_dir = str(workspace_dir)
                    apply_interpreter_settings()
                    st.success(f"✅ Workspace set to: {workspace_path}")
                    st.rerun()
                except Exception as e:
                    st.error(f"❌ Error setting workspace path: {str(e)}")
            # File browser for workspace management
            event = sfb.st_file_browser(
                path=st.session_state.get("session_dir", "autointerpreter-workspace"),
                key="file_browser",
                show_choose_file=True,
                show_delete_file=True,
                show_new_folder=True,
                show_upload_file=True,
                show_preview=True
            )
            if event:
                # NOTE(review): the event type names and the "path" key are
                # taken from the existing code; confirm them against the
                # streamlit_file_browser event schema.
                event_type = event.get("type")
                if event_type == "file_selected":
                    st.session_state.session_dir = event["path"]
                    st.code(f"Current workspace: {event['path']}", language="bash")
                    apply_interpreter_settings()
                elif event_type == "folder_created":
                    st.success(f"Created folder: {event['path']}")
                elif event_type == "file_deleted":
                    st.warning(f"Deleted: {event['path']}")
                    # Forget the workspace if it was the path just deleted.
                    if str(event['path']) == st.session_state.get('session_dir'):
                        st.session_state.pop('session_dir', None)
                        apply_interpreter_settings()

    # After settings panel and before chat history display
    # Apply interpreter settings
    apply_interpreter_settings()

    # Display chat history with enhanced formatting
    # The user avatar is written with explicit escapes so the zero-width joiner
    # that makes it a single "technologist" emoji cannot be lost.
    user_avatar = "\U0001F9D1\u200D\U0001F4BB"
    for message in st.session_state.messages:
        with st.chat_message(message.role, avatar=user_avatar if message.role == "user" else "🤖"):
            st.markdown(message.get_formatted_content())

    # Handle new user input
    user_input = st.chat_input("Ask me anything about coding...", key="chat_input")
    if user_input:
        handle_user_input(user_input)

    # Add ARIA labels to main UI components
    st.markdown("""
""", unsafe_allow_html=True)

    # Add cleanup of old sessions at the end
    if st.session_state.settings["use_workspace"]:
        cleanup_old_sessions()
# Script entry point: build and run the app only when this file is executed
# as the main script, not when it is imported as a module.
if __name__ == "__main__":
    main()