|
import json |
|
import mimetypes |
|
import os |
|
import re |
|
import shutil |
|
import threading |
|
import uuid |
|
from typing import Optional |
|
from loguru import logger |
|
from datetime import datetime |
|
|
|
import gradio as gr |
|
from dotenv import load_dotenv |
|
from huggingface_hub import login, HfApi |
|
from smolagents import ( |
|
CodeAgent, |
|
InferenceClientModel, |
|
Tool, |
|
DuckDuckGoSearchTool, |
|
) |
|
from smolagents.agent_types import ( |
|
AgentAudio, |
|
AgentImage, |
|
AgentText, |
|
handle_agent_output_types, |
|
) |
|
from smolagents.gradio_ui import stream_to_gradio |
|
|
|
from scripts.text_inspector_tool import TextInspectorTool |
|
from scripts.text_web_browser import ( |
|
ArchiveSearchTool, |
|
FinderTool, |
|
FindNextTool, |
|
PageDownTool, |
|
PageUpTool, |
|
SimpleTextBrowser, |
|
VisitTool, |
|
) |
|
from scripts.visual_qa import visualizer |
|
from scripts.cvedb_tool import CVEDBTool |
|
from scripts.report_generator import ReportGeneratorTool |
|
from scripts.epss_tool import EpsTool |
|
from scripts.nvd_tool import NvdTool |
|
from scripts.kevin_tool import KevinTool |
|
|
|
|
|
# Web-search tool instance created once at import time; it is the first entry
# of the tool list built in create_tools_with_model().
web_search = DuckDuckGoSearchTool()

# Module names handed to CodeAgent via `additional_authorized_imports` in
# create_agent(), i.e. the extra imports granted to the agent on top of
# whatever it allows by default.
AUTHORIZED_IMPORTS = [
    "requests",
    "zipfile",
    "pandas",
    "numpy",
    "sympy",
    "json",
    "bs4",
    "pubchempy",
    "xml",
    "yahoo_finance",
    "Bio",
    "sklearn",
    "scipy",
    "pydub",
    "PIL",
    "chess",
    "PyPDF2",
    "pptx",
    "torch",
    "datetime",
    "fractions",
    "csv",
    "plotly",
    "plotly.express",
    "plotly.graph_objects",
    "jinja2",
]
|
|
|
# Load variables from a .env file into the process environment.
# NOTE(review): override=True presumably lets .env values replace variables
# that are already set - confirm against the python-dotenv documentation.
load_dotenv(override=True)

# Best-effort Hugging Face login at import time. A failure is only logged so
# the app still starts; users can then supply their own key through the UI.
if os.getenv("HF_TOKEN"):
    try:
        login(os.getenv("HF_TOKEN"))
        logger.info("Successfully logged in with HF_TOKEN from environment")
    except Exception as e:
        logger.warning(f"Failed to login with HF_TOKEN from environment: {e}")
        logger.info("You can still use the application by providing a valid API key in the interface")
|
|
|
|
|
# In-memory session store: session_id -> dict with the keys "hf_token",
# "agent", "max_steps" and "created_at" (populated by get_session_data()).
# The session helper functions take session_lock before touching it.
user_sessions = {}
session_lock = threading.Lock()

# NOTE(review): not referenced anywhere in the visible part of this file.
append_answer_lock = threading.Lock()

# One text browser instance, shared by every browser tool that
# create_tools_with_model() builds (and therefore by every agent).
browser = SimpleTextBrowser(request_kwargs={})

# Vulnerability-data tool instances, created once at import time and reused
# in every tool list returned by create_tools_with_model().
cvedb_tool = CVEDBTool()
epss_tool = EpsTool()
nvd_tool = NvdTool()
kevin_tool = KevinTool()
|
|
|
def validate_hf_api_key(api_key: str) -> tuple[bool, str]:
    """Check a Hugging Face API key and describe the outcome.

    The key is rejected locally when it is empty/blank or does not start with
    ``hf_``; otherwise it is verified by calling ``HfApi.whoami()``.

    Returns:
        ``(is_valid, message)`` where ``message`` is a user-facing status line.
    """
    token = api_key.strip() if api_key else ""
    if not token:
        return False, "❌ API key cannot be empty"

    if token[:3] != "hf_":
        return False, "❌ Invalid API key format. Hugging Face API keys start with 'hf_'"

    try:
        # Any failure of the remote check (bad key, network, ...) is reported
        # to the caller as an invalid key rather than raised.
        account = HfApi(token=token).whoami()
        return True, f"✅ API key validated successfully! Welcome, {account.get('name', 'User')}!"
    except Exception as e:
        return False, f"❌ Invalid API key: {str(e)}"
|
|
|
def create_model_with_api_key(hf_token: str, model_id: str = None) -> InferenceClientModel:
    """Create a model instance bound to the provided API key.

    The key is handed to ``InferenceClientModel`` through its ``token``
    argument. The process-wide ``HF_TOKEN`` environment variable is no longer
    rewritten temporarily: that approach is not safe when several user
    sessions create models concurrently, because one session could observe
    (or restore) another session's key.

    Args:
        hf_token: Hugging Face API key to use for inference calls.
        model_id: Model to use; falls back to
            ``Qwen/Qwen2.5-Coder-32B-Instruct`` when empty or ``None``.

    Returns:
        The configured ``InferenceClientModel``.
    """
    if not model_id:
        model_id = "Qwen/Qwen2.5-Coder-32B-Instruct"

    return InferenceClientModel(model_id=model_id, token=hf_token)
|
|
|
def create_tools_with_model(model: InferenceClientModel):
    """Build the tool list for an agent that uses ``model``.

    Args:
        model: Model passed to ``TextInspectorTool``; must not be ``None``.

    Returns:
        A new list containing the shared web-search tool, the browser tools
        (all bound to the module-level ``browser``), a fresh
        ``TextInspectorTool`` and the shared vulnerability-data tools.

    Raises:
        ValueError: If ``model`` is ``None``.
    """
    if model is None:
        raise ValueError("Model is None, cannot create TextInspectorTool")

    # A constructor call never yields None, so the former post-construction
    # None check was unreachable and has been dropped.
    # NOTE(review): 20000 is TextInspectorTool's second positional argument,
    # presumably a text length limit - confirm against the tool's signature.
    ti_tool = TextInspectorTool(model, 20000)

    return [
        web_search,
        VisitTool(browser),
        PageUpTool(browser),
        PageDownTool(browser),
        FinderTool(browser),
        FindNextTool(browser),
        ArchiveSearchTool(browser),
        ti_tool,
        cvedb_tool,
        epss_tool,
        nvd_tool,
        kevin_tool,
    ]
|
|
|
|
|
def create_agent(hf_token: str = None, model_id: str = None, max_steps: int = 10):
    """Create a fresh ``CodeAgent`` for one user session.

    Args:
        hf_token: Hugging Face API key used to build the model (required).
        model_id: Optional model identifier forwarded to
            ``create_model_with_api_key``.
        max_steps: Step budget given to the agent.

    Returns:
        A ``CodeAgent`` wired with the visualizer tool, the tools from
        ``create_tools_with_model`` and ``AUTHORIZED_IMPORTS``.

    Raises:
        ValueError: If no API key is given, or if the tool list lacks the
            ``inspect_file_as_text`` tool.
    """
    if not hf_token:
        raise ValueError("A valid Hugging Face API key is required to create an agent.")

    # Deliberately log nothing derived from the key: even a prefix of a
    # secret does not belong in log files.
    logger.info("Creating agent with the provided API key")

    model = create_model_with_api_key(hf_token, model_id)
    tools = create_tools_with_model(model)

    # Sanity check that the file-inspection tool made it into the tool list.
    has_text_inspector = any(
        getattr(tool, "name", None) == "inspect_file_as_text" for tool in tools
    )
    if not has_text_inspector:
        raise ValueError("TextInspectorTool not found in tools list")

    agent = CodeAgent(
        model=model,
        tools=[visualizer] + tools,
        max_steps=max_steps,
        verbosity_level=1,
        additional_authorized_imports=AUTHORIZED_IMPORTS,
        planning_interval=4,
    )

    logger.info("Agent created successfully")
    return agent
|
|
|
def get_user_session(request: gr.Request) -> str:
    """Return the session ID to use for ``request``.

    Resolution order:
      1. no request object -> a random UUID string (a warning is logged);
      2. a non-empty ``x-session-id`` header -> that header value;
      3. otherwise ``session_`` plus the first 8 hex digits of the MD5 of
         ``"<client ip>:<user agent>"``.
    """
    if not request:
        logger.warning("No request object, using random session ID")
        return str(uuid.uuid4())

    header_id = request.headers.get("x-session-id")
    if header_id:
        return header_id

    import hashlib

    if hasattr(request, 'client') and request.client:
        client_ip = request.client.host
    else:
        client_ip = "unknown"
    user_agent = request.headers.get("user-agent", "unknown")

    fingerprint = hashlib.md5(f"{client_ip}:{user_agent}".encode()).hexdigest()
    session_id = f"session_{fingerprint[:8]}"
    logger.info(f"Created stable session ID {session_id} for client {client_ip}")
    return session_id
|
|
|
def get_stable_session_id(request: gr.Request) -> str:
    """Derive a session ID that is identical for identical request fingerprints.

    The ID is ``user_`` plus the first 16 hex digits of the MD5 of
    ``"<client ip>:<user-agent>:<accept-language>:<accept-encoding>"``
    (each missing value is replaced by ``"unknown"``). Without a request
    object a random ``random_<8 chars>`` ID is returned instead.

    Two clients that send the same four values receive the same ID.
    """
    if not request:
        logger.warning("No request object, using random session ID")
        return f"random_{str(uuid.uuid4())[:8]}"

    import hashlib

    headers = request.headers
    client_ip = getattr(request.client, 'host', 'unknown') if request.client else 'unknown'
    fingerprint_parts = (
        client_ip,
        headers.get("user-agent", "unknown"),
        headers.get("accept-language", "unknown"),
        headers.get("accept-encoding", "unknown"),
    )
    session_data = ":".join(map(str, fingerprint_parts))

    digest = hashlib.md5(session_data.encode()).hexdigest()
    session_id = f"user_{digest[:16]}"

    logger.info(f"Generated session ID: {session_id}")
    logger.info(f"Session data: {session_data}")
    return session_id
|
|
|
def get_unique_session_id(request: gr.Request) -> str:
    """Get a unique session ID for each request.

    Without a request object the ID is ``unique_`` plus the first 8 characters
    of a random UUID. With a request it is ``unique_`` plus the first 16 hex
    digits of an MD5 over the millisecond timestamp, client IP, user agent and
    a random nonce.

    The nonce is what makes the ID unique per call: timestamp, IP and user
    agent alone are identical for two requests issued by the same client
    within the same millisecond.
    """
    if not request:
        return f"unique_{str(uuid.uuid4())[:8]}"

    import hashlib
    import time

    timestamp = int(time.time() * 1000)
    client_ip = getattr(request.client, 'host', 'unknown') if request.client else 'unknown'
    user_agent = request.headers.get("user-agent", "unknown")
    nonce = uuid.uuid4().hex

    session_data = f"{timestamp}:{client_ip}:{user_agent}:{nonce}"
    session_hash = hashlib.md5(session_data.encode()).hexdigest()
    session_id = f"unique_{session_hash[:16]}"

    logger.info(f"Generated unique session ID: {session_id}")
    return session_id
|
|
|
def get_persistent_session_id(request: gr.Request) -> str:
    """Derive a session ID that stays the same for the same client fingerprint.

    The ID is ``persistent_`` plus the first 16 hex digits of the MD5 of
    ``"<client ip>:<user-agent>:<accept-language>"`` (missing values become
    ``"unknown"``). Without a request object a random
    ``persistent_<8 chars>`` ID is returned.

    Clients that share IP, user agent and accept-language map to the same ID.
    """
    if not request:
        return f"persistent_{str(uuid.uuid4())[:8]}"

    import hashlib

    headers = request.headers
    client_ip = getattr(request.client, 'host', 'unknown') if request.client else 'unknown'
    fingerprint_parts = (
        client_ip,
        headers.get("user-agent", "unknown"),
        headers.get("accept-language", "unknown"),
    )
    session_data = ":".join(map(str, fingerprint_parts))

    digest = hashlib.md5(session_data.encode()).hexdigest()
    session_id = f"persistent_{digest[:16]}"

    logger.info(f"Generated persistent session ID: {session_id}")
    logger.info(f"Session data: {session_data}")
    return session_id
|
|
|
def get_session_data(session_id: str) -> dict:
    """Return the mutable session dict for ``session_id``, creating it on first use.

    A new entry starts with no API key, no agent, a step limit of 10 and the
    creation time. The lookup/insert runs under ``session_lock``; the returned
    dict is the stored object itself, so callers' changes are visible to all.
    """
    with session_lock:
        try:
            return user_sessions[session_id]
        except KeyError:
            fresh_entry = {
                "hf_token": None,
                "agent": None,
                "max_steps": 10,
                "created_at": datetime.now()
            }
            user_sessions[session_id] = fresh_entry
            return fresh_entry
|
|
|
def clear_session_data(session_id: str):
    """Reset the API key and agent of an existing session to ``None``.

    Unknown session IDs are ignored. The entry itself (including its other
    keys) is kept.
    """
    with session_lock:
        if session_id not in user_sessions:
            return

        entry = user_sessions[session_id]
        entry.update(hf_token=None, agent=None)
        logger.info(f"Session {session_id[:8]}... cleared")
|
|
|
def clear_agent_only(session_id: str):
    """Remove the ``"agent"`` key from a session while leaving its API key in place.

    Does nothing when the session is unknown or has no ``"agent"`` key.
    """
    with session_lock:
        if session_id not in user_sessions:
            return

        entry = user_sessions[session_id]
        if "agent" not in entry:
            return

        entry.pop("agent")
        logger.info(f"Session {session_id[:8]}... agent cleared")
|
|
|
|
|
|
|
class GradioUI: |
|
"""A one-line interface to launch your agent in Gradio""" |
|
|
|
def __init__(self, file_upload_folder: str | None = None): |
|
self.file_upload_folder = file_upload_folder |
|
if self.file_upload_folder is not None: |
|
if not os.path.exists(file_upload_folder): |
|
os.mkdir(file_upload_folder) |
|
|
|
self.reports_folder = "reports" |
|
if not os.path.exists(self.reports_folder): |
|
os.mkdir(self.reports_folder) |
|
|
|
def save_report(self, html_content: str) -> str: |
|
"""Saves the HTML report and returns the file path.""" |
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
filename = f"vulnerability_report_{timestamp}.html" |
|
filepath = os.path.join(self.reports_folder, filename) |
|
|
|
with open(filepath, "w", encoding="utf-8") as f: |
|
f.write(html_content) |
|
|
|
return filepath |
|
|
|
def validate_api_key(self, api_key: str) -> tuple[str, str]:
    """Validate ``api_key`` and return ``(message, status)``.

    ``status`` is ``"success"`` for a valid key and ``"error"`` otherwise;
    ``message`` is the text produced by ``validate_hf_api_key``.
    """
    is_valid, message = validate_hf_api_key(api_key)
    status = "success" if is_valid else "error"
    return message, status
|
|
|
def interact_with_agent(self, prompt, messages, request: gr.Request):
    """Run one agent interaction for the caller's session and stream the chat.

    Generator used as a Gradio event handler. It makes sure the session has
    an agent (creating one from the session's API key, or from ``HF_TOKEN``
    in the environment as a fallback), prefixes the user prompt with the
    analyst system prompt, and streams the agent's messages.

    Args:
        prompt: User request text (possibly with internal context appended).
        messages: Chat history list; it is appended to in place.
        request: Incoming Gradio request, used to derive the session ID.

    Yields:
        ``messages`` after every update (user message, each agent message,
        or a single error message).

    Notes:
        * Only a redacted summary of the session is logged, because the raw
          session dict contains the API key.
        * The agent is discarded after every interaction, including failed or
          aborted ones, so a later request never reuses an agent whose memory
          stems from a broken run.
    """
    session_id = get_persistent_session_id(request)
    session_data = get_session_data(session_id)

    logger.info(f"Processing request for session {session_id}...")
    logger.info(f"Request client: {request.client.host if request and request.client else 'unknown'}")
    logger.info(f"Request user-agent: {request.headers.get('user-agent', 'unknown')[:50] if request else 'unknown'}")
    logger.info(f"All active sessions: {list(user_sessions.keys())}")
    logger.info(
        f"Session data for {session_id}: "
        f"has_token={bool(session_data.get('hf_token'))}, "
        f"has_agent={bool(session_data.get('agent'))}, "
        f"max_steps={session_data.get('max_steps')}"
    )

    if not session_data.get("agent"):
        hf_token = session_data.get("hf_token")

        if not hf_token:
            # No per-session key: fall back to the server-wide environment token.
            env_token = os.getenv("HF_TOKEN")
            if not env_token:
                logger.warning(f"No API key found for session {session_id[:8]}...")
                error_msg = "❌ No API key configured for your session. Please enter your Hugging Face API key in the API Configuration section above and click 'Setup API Key'."
                messages.append(gr.ChatMessage(role="assistant", content=error_msg))
                yield messages
                return

            hf_token = env_token
            session_data["hf_token"] = env_token
            session_data["max_steps"] = 10
            logger.info(f"Using HF_TOKEN from .env file for session {session_id[:8]}...")

        logger.info(f"Creating agent for session {session_id[:8]}...")
        try:
            max_steps = session_data.get("max_steps", 10)
            session_data["agent"] = create_agent(hf_token, max_steps=max_steps)
            logger.info(f"Agent created successfully for session {session_id[:8]}...")
        except Exception as e:
            logger.error(f"Failed to create agent for session {session_id[:8]}: {e}")
            error_msg = f"❌ Failed to create agent with provided API key: {str(e)}"
            messages.append(gr.ChatMessage(role="assistant", content=error_msg))
            yield messages
            return
    else:
        logger.info(f"Agent already exists for session {session_id[:8]}...")

    try:
        agent = session_data["agent"]

        has_memory = hasattr(agent, "memory")
        logger.debug(f"Agent has memory: {has_memory}")
        if has_memory:
            logger.debug(f"Memory type: {type(agent.memory)}")

        # Uses the module-level `datetime` import; no function-scope re-import.
        current_date = datetime.now().strftime("%Y-%m-%d")
        step_limit = session_data.get('max_steps', 10)

        system_prompt = f"""You are a Vulnerability Intelligence Analyst. Complete the user request in {step_limit} steps maximum.

TODAY'S DATE: {current_date}

AVAILABLE TOOLS: nvd_search, web_search, cvedb_search, kevin_search, epss_search

CRITICAL RULES:
1. VERSION ANALYSIS:
- FIRST: Check current version via web search to understand the latest available version
- PRIORITY: When user asks for specific version, focus ONLY on vulnerabilities affecting that version and newer
- EXCLUDE OLDER: Do NOT report vulnerabilities that only affect older versions
- VERSION LOGIC:
* "up to X" or "before X" = affects versions UP TO X, NOT newer versions
* "X+" or "X and later" = affects X and newer versions
* "X through Y" = affects versions X to Y inclusive
- CORRECT LOGIC: If CVE affects "up to v22.1" and user asks about v24.0 then v24.0 is NOT vulnerable
- CORRECT LOGIC: If CVE affects "v25.0+" and user asks about v24.0 then v24.0 is NOT vulnerable
- CORRECT LOGIC: If CVE affects "below v25.0" and user asks about v24.0 then v24.0 is vulnerable
2. DATES: If user provides specific date, use that date. If user mentions "today", "current", "as of today", or "recent", use TODAY'S DATE above.
3. PRODUCT SEARCH: ALWAYS use ONLY the base product name, NEVER include versions when using vulnerability tools (nvd_search, cvedb_search, kevin_search, epss_search)
4. SOURCES: Always prioritize vendor/original sources for CVE, CWE, and reference links (official vendor websites, security advisories)
5. SIMPLICITY: Keep code simple and logical. Avoid unnecessary library imports. Use only basic Python functions when needed.
6. TOOL USAGE: Use ONLY the available tools. Do not complicate tool calls with unnecessary code.
7. STRING OPERATIONS: Use simple Python methods (in, find, startswith, endswith, etc.). NEVER use .contains() - it doesn't exist in Python. Avoid complex string parsing of tool results.
8. VULNERABILITY ANALYSIS: When analyzing tool results:
- READ CAREFULLY: Pay attention to version ranges in vulnerability descriptions
- "up to X" means versions UP TO X, NOT including newer versions
- "below X" means versions BELOW X, NOT including X or newer
- "X+" means X and newer versions
- ONLY include vulnerabilities that actually affect the requested version
- If unsure about version compatibility, exclude the vulnerability
- DO NOT REASON: Don't create complex logic about version compatibility
- DO NOT ASSUME: If a CVE affects "up to 22.1", it does NOT affect 24.0
- SIMPLE RULE: Only include CVEs where the version range explicitly includes the requested version
9. REPORT GENERATION: Do NOT create complex functions, or loops for the final answer. Use the information collected and format it directly following the REPORT FORMAT.

REPORT FORMAT:
# Vulnerability Report
### [Software and Version]

#### CVE-ID: [CVE-YYYY-NNNNN]
**NIST NVD Link:** https://nvd.nist.gov/vuln/detail/CVE-YYYY-NNNNN
- **Attack Type:** [Type]
- **Published:** [Date]
- **CVSS:** [Score]
- **EPSS:** [Score]
- **KEV:** [Yes/No]
- **Affected Versions:** [Specific range]
- **Description:** [Description]
- **CWE:** [CWE-XXX] - https://cwe.mitre.org/data/definitions/XXX.html
- **Recommendations:** [Remediation advice]
- **Sources:**
- https://oficial-vendor-website.com/security-advisory
- https://official-security-source.com
- https://additional-reference-source.com

INSTRUCTIONS:
- Follow the exact format above
- Use official vendor sources when available
- Complete the task efficiently within the step limit

Now it is your turn, remember to keep code simple.
User Query: """

        full_prompt = system_prompt + prompt

        # Show only the part of the prompt that precedes an internal-context
        # marker in the chat window; the agent still receives the full text.
        display_message = prompt
        if "[INTERNAL CONTEXT:" in prompt:
            display_message = prompt.split("[INTERNAL CONTEXT:")[0].strip()

        messages.append(gr.ChatMessage(role="user", content=display_message))
        yield messages

        logger.info(f"Starting agent interaction for session {session_id[:8]}...")
        for msg in stream_to_gradio(
            agent, task=full_prompt, reset_agent_memory=False
        ):
            # A full HTML document is persisted as a report file and replaced
            # in the chat by a pointer to that file.
            if isinstance(msg.content, str) and msg.content.startswith("<!DOCTYPE html>"):
                report_path = self.save_report(msg.content)
                msg.content = f"Report generated and saved at: {report_path}\n\nYou can open the file in your browser to view the complete report."

            messages.append(msg)
            yield messages

        yield messages
    except Exception as e:
        logger.error(f"Error in interaction for session {session_id[:8]}: {str(e)}")
        error_msg = f"❌ Error during interaction: {str(e)}"
        messages.append(gr.ChatMessage(role="assistant", content=error_msg))
        yield messages
    finally:
        # Drop the agent whether the run succeeded, failed or was aborted, so
        # the next request always starts from a freshly created agent.
        if "agent" in session_data:
            del session_data["agent"]
            logger.info(f"Session {session_id[:8]}... agent cleared after interaction")
|
|
|
def setup_api_key(self, api_key: str, max_steps: int, request: gr.Request) -> str:
    """Store an API key for the caller's session and create its agent.

    The key typed into the interface is used when it is non-blank; otherwise
    ``HF_TOKEN`` from the environment is used. The chosen key is validated
    with ``validate_hf_api_key`` before it is stored.

    Args:
        api_key: Key entered in the UI (may be empty).
        max_steps: Step limit to store in the session and give to the agent.
        request: Incoming Gradio request, used to derive the session ID.

    Returns:
        A user-facing status message (success or failure).

    Only a redacted summary of the session is logged, because the raw session
    dict contains the API key.
    """
    session_id = get_persistent_session_id(request)
    session_data = get_session_data(session_id)

    logger.info(f"Setting up API key for session {session_id}...")
    logger.info(f"Setup request client: {request.client.host if request and request.client else 'unknown'}")
    logger.info(f"Setup request user-agent: {request.headers.get('user-agent', 'unknown')[:50] if request else 'unknown'}")
    logger.info(f"All active sessions before setup: {list(user_sessions.keys())}")
    logger.info(
        "Session data before setup: "
        f"has_token={bool(session_data.get('hf_token'))}, "
        f"has_agent={bool(session_data.get('agent'))}, "
        f"max_steps={session_data.get('max_steps')}"
    )

    if api_key and api_key.strip():
        token_to_use = api_key.strip()
        source = "interface"
    else:
        env_token = os.getenv("HF_TOKEN")
        if not env_token:
            return "❌ No API key provided. Please enter your Hugging Face API key or set HF_TOKEN in your .env file."
        token_to_use = env_token
        source = ".env file"

    is_valid, message = validate_hf_api_key(token_to_use)

    if not is_valid:
        logger.warning(f"Invalid API key for session {session_id[:8]}... from {source}")
        return f"❌ Invalid API key from {source}: {message}"

    session_data["hf_token"] = token_to_use
    session_data["max_steps"] = max_steps
    logger.info(f"API key stored in session {session_id[:8]}... from {source}")
    logger.info(f"Max steps set to: {max_steps}")

    try:
        session_data["agent"] = create_agent(token_to_use, max_steps=max_steps)
        logger.info(f"Agent created successfully for session {session_id[:8]}...")
        # The validation message has the form "...! Welcome, <name>!"; the
        # part between the first two "!" is appended as a greeting.
        return f"✅ API key from {source} validated and agent created successfully! {message.split('!')[1] if '!' in message else ''}"
    except Exception as e:
        logger.error(f"Failed to create agent for session {session_id[:8]}: {e}")
        return f"❌ Failed to create agent with API key from {source}: {str(e)}"
|
|
|
def upload_file(
    self,
    file,
    file_uploads_log,
    allowed_file_types=None,
):
    """
    Handle file uploads, default allowed types are .pdf, .docx, and .txt

    Args:
        file: Uploaded file object exposing the temporary path as ``.name``,
            or ``None`` when nothing was uploaded.
        file_uploads_log: List of paths uploaded so far; it is not mutated.
        allowed_file_types: Accepted MIME types. ``None`` selects the default
            set (a ``None`` default avoids sharing one mutable list between
            calls).

    Returns:
        ``(status_textbox, updated_log)``; the log gains the stored path only
        when the upload succeeded.
    """
    if allowed_file_types is None:
        allowed_file_types = [
            "application/pdf",
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            "text/plain",
        ]

    if file is None:
        return gr.Textbox("No file uploaded", visible=True), file_uploads_log

    if self.file_upload_folder is None:
        # Without a target folder os.path.join() below would raise TypeError.
        return gr.Textbox("Error: no upload folder configured", visible=True), file_uploads_log

    try:
        mime_type, _ = mimetypes.guess_type(file.name)
    except Exception as e:
        return gr.Textbox(f"Error: {e}", visible=True), file_uploads_log

    if mime_type not in allowed_file_types:
        return gr.Textbox("File type disallowed", visible=True), file_uploads_log

    # Replace every character that is not a word character, "-" or "." so the
    # stored name cannot contain path separators.
    original_name = os.path.basename(file.name)
    sanitized_name = re.sub(r"[^\w\-.]", "_", original_name)

    # Force the extension to match the detected MIME type. As before, the
    # remaining name parts are joined without dots.
    stem = "".join(sanitized_name.split(".")[:-1])
    extension = mimetypes.guess_extension(mime_type) or os.path.splitext(sanitized_name)[1]
    sanitized_name = stem + extension

    file_path = os.path.join(self.file_upload_folder, sanitized_name)
    shutil.copy(file.name, file_path)

    return gr.Textbox(
        f"File uploaded: {file_path}", visible=True
    ), file_uploads_log + [file_path]
|
|
|
def log_user_message(self, text_input, file_uploads_log):
    """Prepare the submitted prompt and lock the input widgets.

    When files were uploaded, a bracketed hint listing their base names is
    appended to the prompt so the agent knows it can inspect them.

    Args:
        text_input: Text the user submitted.
        file_uploads_log: Paths of the files uploaded so far.

    Returns:
        ``(message_for_agent, textbox_update, button_update)``: the textbox is
        emptied and made non-interactive, the button is disabled.
    """
    internal_message = text_input
    if file_uploads_log:
        file_names = [os.path.basename(f) for f in file_uploads_log]
        internal_message += f"\n\n[Uploaded files available: {', '.join(file_names)}. Use inspect_file_as_text(file_path='uploads/filename') to analyze them. Use the content as plain text context if readable, no need to parse or create complex functions.]"

    return (
        internal_message,
        gr.Textbox(
            value="",
            interactive=False,
            placeholder="Please wait while Steps are getting populated",
        ),
        gr.Button(interactive=False),
    )
|
|
|
def detect_device(self, request: gr.Request):
    """Classify the client as ``"Mobile"`` or ``"Desktop"`` from request headers.

    Checks, in order (first decisive one wins):
      1. ``sec-ch-ua-mobile`` header: ``"?1"`` means mobile, anything else desktop;
      2. mobile keywords in the lower-cased ``user-agent``;
      3. ``sec-ch-ua-platform`` header (Android/iOS vs. Windows/macOS/Linux);
      4. ``viewport-width`` header (<= 768 means mobile).

    Falls back to ``"Desktop"`` when there is no request or nothing matches.
    """
    if not request:
        return "Desktop"

    is_mobile_header = request.headers.get("sec-ch-ua-mobile")
    if is_mobile_header:
        return "Mobile" if "?1" in is_mobile_header else "Desktop"

    # One keyword scan covers all cases; the former second check for
    # "mobile"/"android"/"iphone" could never match once this one had failed.
    user_agent = request.headers.get("user-agent", "").lower()
    mobile_keywords = ("android", "iphone", "ipad", "mobile", "phone", "tablet")
    if any(keyword in user_agent for keyword in mobile_keywords):
        return "Mobile"

    # The header value is compared including its double quotes, e.g. '"Android"'.
    platform = request.headers.get("sec-ch-ua-platform", "").lower()
    if platform in ('"android"', '"ios"'):
        return "Mobile"
    if platform in ('"windows"', '"macos"', '"linux"'):
        return "Desktop"

    viewport_width = request.headers.get("viewport-width")
    if viewport_width:
        try:
            return "Mobile" if int(viewport_width) <= 768 else "Desktop"
        except ValueError:
            # Non-numeric header value: ignore it and use the default.
            pass

    return "Desktop"
|
|
|
def launch(self, **kwargs): |
|
|
|
custom_css = """ |
|
@media (max-width: 768px) { |
|
.gradio-container { |
|
max-width: 100% !important; |
|
padding: 10px !important; |
|
} |
|
.main { |
|
padding: 10px !important; |
|
} |
|
.chatbot { |
|
max-height: 60vh !important; |
|
} |
|
.textbox { |
|
font-size: 16px !important; /* Prevents zoom on iOS */ |
|
} |
|
.button { |
|
min-height: 44px !important; /* Better touch targets */ |
|
} |
|
} |
|
""" |
|
|
|
with gr.Blocks(theme="ocean", fill_height=True, css=custom_css) as demo: |
|
|
|
@gr.render() |
|
def layout(request: gr.Request): |
|
device = self.detect_device(request) |
|
print(f"device - {device}") |
|
|
|
if device == "Desktop": |
|
with gr.Blocks( |
|
fill_height=True, |
|
): |
|
file_uploads_log = gr.State([]) |
|
with gr.Sidebar(): |
|
|
|
gr.Markdown("""# Open Deep Research Vulnerability Intelligence""") |
|
gr.Markdown("""<img src="https://github.githubassets.com/images/modules/logos_page/GitHub-Mark.png" width="20" height="20" style="display: inline-block; vertical-align: middle; margin-right: 8px;"> <a href="https://github.com/mcdaqc/open-deep-research-vulnerability-intelligence" target="_blank">Github Repository</a>""") |
|
|
|
|
|
with gr.Accordion("ℹ️ About", open=False): |
|
gr.Markdown("""**What it does:** |
|
This AI agent specializes in automated vulnerability research and analysis, built on <a href="https://huggingface.co/blog/open-deep-research" target="_blank">Hugging Face's Open Deep Research</a> architecture. It can search across multiple security databases to provide comprehensive vulnerability intelligence reports. |
|
|
|
**Available Tools & APIs:** |
|
- <a href="https://nvd.nist.gov/developers/vulnerabilities" target="_blank">🛡️ NIST NVD</a> - National Vulnerability Database (free API) |
|
- <a href="https://cvedb.com/" target="_blank">📊 Shodan CVEDB</a> - Comprehensive vulnerability database (free API) |
|
- <a href="https://kevin.gtfkd.com/" target="_blank">⚠️ KEVin</a> - Known Exploited Vulnerabilities database (free API) |
|
- <a href="https://www.first.org/epss/" target="_blank">📈 EPSS</a> - Exploit Prediction Scoring System (free API) |
|
- 🌐 **Web Browser** - Navigate and extract information from web pages |
|
|
|
**Model Configuration:** |
|
- **Default Model**: Qwen/Qwen2.5-Coder-32B-Instruct (recommended) |
|
- **Alternative**: You can also use Ollama with local models for privacy |
|
|
|
**How to use:** |
|
1. Enter your Hugging Face API key below |
|
2. Ask about specific software versions, CVEs, or security vulnerabilities |
|
3. The agent will automatically search all available databases |
|
4. Receive comprehensive vulnerability reports with CVSS scores, EPSS predictions, and remediation advice""") |
|
|
|
with gr.Group(): |
|
gr.Markdown("**Your request**", container=True) |
|
text_input = gr.Textbox( |
|
lines=3, |
|
label="Your request", |
|
container=False, |
|
placeholder="Enter your prompt here and press Shift+Enter or press the button", |
|
) |
|
launch_research_btn = gr.Button( |
|
"Run", variant="primary" |
|
) |
|
|
|
|
|
with gr.Accordion("💡 Example Prompts", open=False): |
|
gr.Markdown("**Click any example below to populate your request field:**") |
|
|
|
example_btn_1 = gr.Button("🔍 MobaXterm 24.0 vulnerabilities", size="sm", variant="secondary") |
|
example_btn_2 = gr.Button("🔍 Chrome 120.0.6099.109 security issues", size="sm", variant="secondary") |
|
example_btn_3 = gr.Button("🔍 Apache Tomcat 9.0.65 KEV check", size="sm", variant="secondary") |
|
example_btn_4 = gr.Button("🔍 Windows 11 recent vulnerabilities", size="sm", variant="secondary") |
|
example_btn_5 = gr.Button("🔍 CVE-2024-0001 analysis", size="sm", variant="secondary") |
|
example_btn_6 = gr.Button("🔍 Nginx 1.24.0 security status", size="sm", variant="secondary") |
|
|
|
|
|
example_btn_1.click( |
|
lambda: "Analyze MobaXterm 24.0 for vulnerabilities as of today", |
|
None, |
|
[text_input] |
|
) |
|
example_btn_2.click( |
|
lambda: "Check Chrome 120.0.6099.109 for security vulnerabilities", |
|
None, |
|
[text_input] |
|
) |
|
example_btn_3.click( |
|
lambda: "Is Apache Tomcat 9.0.65 in KEV database as of today?", |
|
None, |
|
[text_input] |
|
) |
|
example_btn_4.click( |
|
lambda: "Check Windows 11 for recent vulnerabilities as of today", |
|
None, |
|
[text_input] |
|
) |
|
example_btn_5.click( |
|
lambda: "Analyze CVE-2024-0001 in detail", |
|
None, |
|
[text_input] |
|
) |
|
example_btn_6.click( |
|
lambda: "Check Nginx 1.24.0 for security vulnerabilities", |
|
None, |
|
[text_input] |
|
) |
|
|
|
|
|
with gr.Accordion("🔑 API Configuration", open=False): |
|
gr.Markdown("**Configure your Hugging Face API Key**") |
|
gr.Markdown("🔒 **Security**: Your API key is only kept during this session.") |
|
gr.Markdown("Get your API key from: https://huggingface.co/settings/tokens") |
|
|
|
api_key_input = gr.Textbox( |
|
label="Hugging Face API Key", |
|
placeholder="hf_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", |
|
type="password", |
|
lines=1 |
|
) |
|
api_key_status = gr.Textbox( |
|
label="Status", |
|
value="✅ HF_TOKEN found in .env file. To use a different key, enter it above and click 'Setup API Key'." if os.getenv("HF_TOKEN") else "⚠️ Please enter your Hugging Face API key above and click 'Setup API Key' to start using the application.", |
|
interactive=False |
|
) |
|
|
|
|
|
gr.Markdown("**Agent Configuration**") |
|
max_steps_slider = gr.Slider( |
|
minimum=5, |
|
maximum=30, |
|
value=10, |
|
step=1, |
|
label="Maximum Steps", |
|
info="Number of steps the agent can take per session (higher = more detailed but slower)" |
|
) |
|
|
|
setup_api_btn = gr.Button("Setup API Key", variant="secondary") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
with gr.Row(): |
|
gr.HTML("""<div style="display: flex; align-items: center; gap: 8px; font-family: system-ui, -apple-system, sans-serif;">Powered by |
|
<img src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png" style="width: 32px; height: 32px; object-fit: contain;" alt="logo"> |
|
<a target="_blank" href="https://github.com/huggingface/smolagents"><b>hf/smolagents</b></a> |
|
</div>""") |
|
|
|
|
|
stored_messages = gr.State([]) |
|
chatbot = gr.Chatbot( |
|
label="open-Deep-Research", |
|
type="messages", |
|
avatar_images=( |
|
None, |
|
"https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png", |
|
), |
|
resizeable=False, |
|
scale=1, |
|
elem_id="my-chatbot", |
|
) |
|
|
|
|
|
report_viewer = gr.HTML(label="Vulnerability Report", visible=False) |
|
|
|
|
|
setup_api_btn.click( |
|
self.setup_api_key, |
|
[api_key_input, max_steps_slider], |
|
[api_key_status] |
|
) |
|
|
|
|
|
|
|
text_input.submit( |
|
self.log_user_message, |
|
[text_input, file_uploads_log], |
|
[stored_messages, text_input, launch_research_btn], |
|
).then( |
|
self.interact_with_agent, |
|
[stored_messages, chatbot], |
|
[chatbot], |
|
).then( |
|
lambda: ( |
|
gr.Textbox( |
|
interactive=True, |
|
placeholder="Enter your prompt here and press the button", |
|
), |
|
gr.Button(interactive=True), |
|
), |
|
None, |
|
[text_input, launch_research_btn], |
|
) |
|
launch_research_btn.click( |
|
self.log_user_message, |
|
[text_input, file_uploads_log], |
|
[stored_messages, text_input, launch_research_btn], |
|
).then( |
|
self.interact_with_agent, |
|
[stored_messages, chatbot], |
|
[chatbot], |
|
).then( |
|
lambda: ( |
|
gr.Textbox( |
|
interactive=True, |
|
placeholder="Enter your prompt here and press the button", |
|
), |
|
gr.Button(interactive=True), |
|
), |
|
None, |
|
[text_input, launch_research_btn], |
|
) |
|
|
|
|
|
else: |
|
try: |
|
with gr.Blocks( |
|
fill_height=True, |
|
): |
|
|
|
gr.Markdown("""# Open Deep Research Vulnerability Intelligence""") |
|
gr.Markdown("""<img src="https://github.githubassets.com/images/modules/logos_page/GitHub-Mark.png" width="20" height="20" style="display: inline-block; vertical-align: middle; margin-right: 8px;"> <a href="https://github.com/mcdaqc/open-deep-research-vulnerability-intelligence" target="_blank">Github Repository</a>""") |
|
|
|
|
|
with gr.Accordion("ℹ️ About", open=False): |
|
gr.Markdown("""**What it does:** |
|
This AI agent specializes in automated vulnerability research and analysis, built on <a href="https://huggingface.co/blog/open-deep-research" target="_blank">Hugging Face's Open Deep Research</a> architecture. It can search across multiple security databases to provide comprehensive vulnerability intelligence reports. |
|
|
|
**Available Tools & APIs:** |
|
- <a href="https://nvd.nist.gov/developers/vulnerabilities" target="_blank">🛡️ NIST NVD</a> - National Vulnerability Database (free API) |
|
- <a href="https://cvedb.com/" target="_blank">📊 Shodan CVEDB</a> - Comprehensive vulnerability database (free API) |
|
- <a href="https://kevin.gtfkd.com/" target="_blank">⚠️ KEVin</a> - Known Exploited Vulnerabilities database (free API) |
|
- <a href="https://www.first.org/epss/" target="_blank">📈 EPSS</a> - Exploit Prediction Scoring System (free API) |
|
- 🌐 **Web Browser** - Navigate and extract information from web pages |
|
|
|
**Model Configuration:** |
|
- **Default Model**: Qwen/Qwen2.5-Coder-32B-Instruct (recommended) |
|
- **Local Models**: For privacy, you can use Ollama with local models: |
|
1. Install Ollama: https://ollama.ai/ |
|
2. Pull a model: `ollama pull qwen2.5-coder:7b` |
|
3. Set in your `.env` file: |
|
- `MODEL_ID=qwen2.5-coder:7b` |
|
- `OPENAI_API_BASE=http://localhost:11434/v1` |
|
- `OPENAI_API_KEY=ollama` |
|
|
|
**How to use:** |
|
1. Enter your Hugging Face API key below |
|
2. Ask about specific software versions, CVEs, or security vulnerabilities |
|
3. The agent will automatically search all available databases |
|
4. Receive comprehensive vulnerability reports with CVSS scores, EPSS predictions, and remediation advice""") |
|
|
|
|
|
with gr.Accordion("🔑 API Configuration", open=False): |
|
gr.Markdown("**Configure your Hugging Face API Key**") |
|
gr.Markdown("🔒 **Security**: Your API key is only kept during this session.") |
|
gr.Markdown("Get your API key from: https://huggingface.co/settings/tokens") |
|
|
|
mobile_api_key_input = gr.Textbox( |
|
label="Hugging Face API Key", |
|
placeholder="hf_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", |
|
type="password", |
|
lines=1 |
|
) |
|
mobile_api_key_status = gr.Textbox( |
|
label="Status", |
|
value="✅ HF_TOKEN found in .env file. To use a different key, enter it above and click 'Setup API Key'." if os.getenv("HF_TOKEN") else "⚠️ Please enter your Hugging Face API key above and click 'Setup API Key' to start using the application.", |
|
interactive=False |
|
) |
|
|
|
|
|
gr.Markdown("**Agent Configuration**") |
|
mobile_max_steps_slider = gr.Slider( |
|
minimum=5, |
|
maximum=30, |
|
value=10, |
|
step=1, |
|
label="Maximum Steps", |
|
info="Number of steps the agent can take per session (higher = more detailed but slower)" |
|
) |
|
|
|
mobile_setup_api_btn = gr.Button("Setup API Key", variant="secondary") |
|
|
|
|
|
stored_messages = gr.State([]) |
|
file_uploads_log = gr.State([]) |
|
chatbot = gr.Chatbot( |
|
label="open-Deep-Research", |
|
type="messages", |
|
avatar_images=( |
|
None, |
|
"https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png", |
|
), |
|
resizeable=True, |
|
scale=1, |
|
) |
|
|
|
|
|
text_input = gr.Textbox( |
|
lines=1, |
|
label="Your request", |
|
placeholder="Enter your prompt here and press the button", |
|
) |
|
launch_research_btn = gr.Button( |
|
"Run", |
|
variant="primary", |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
with gr.Accordion("💡 Example Prompts", open=False): |
|
gr.Markdown("**Click any example below to populate your request field:**") |
|
|
|
mobile_example_btn_1 = gr.Button("🔍 MobaXterm 24.0 vulnerabilities", size="sm", variant="secondary") |
|
mobile_example_btn_2 = gr.Button("🔍 Chrome 120.0.6099.109 security analysis", size="sm", variant="secondary") |
|
mobile_example_btn_3 = gr.Button("🔍 Apache Tomcat 9.0.65 KEV check", size="sm", variant="secondary") |
|
|
|
|
|
with gr.Row(): |
|
gr.HTML("""<div style="display: flex; align-items: center; gap: 8px; font-family: system-ui, -apple-system, sans-serif;">Powered by |
|
<img src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png" style="width: 32px; height: 32px; object-fit: contain;" alt="logo"> |
|
<a target="_blank" href="https://github.com/huggingface/smolagents"><b>hf/smolagents</b></a> |
|
</div>""") |
|
|
|
|
|
mobile_setup_api_btn.click( |
|
self.setup_api_key, |
|
[mobile_api_key_input, mobile_max_steps_slider], |
|
[mobile_api_key_status] |
|
) |
|
|
|
|
|
mobile_example_btn_1.click( |
|
lambda: "Analyze MobaXterm 24.0 for vulnerabilities as of today", |
|
None, |
|
[text_input] |
|
) |
|
mobile_example_btn_2.click( |
|
lambda: "Check Chrome 120.0.6099.109 for current security issues", |
|
None, |
|
[text_input] |
|
) |
|
mobile_example_btn_3.click( |
|
lambda: "Is Apache Tomcat 9.0.65 in KEV database as of today?", |
|
None, |
|
[text_input] |
|
) |
|
|
|
|
|
text_input.submit( |
|
self.log_user_message, |
|
[text_input, file_uploads_log], |
|
[stored_messages, text_input, launch_research_btn], |
|
).then( |
|
self.interact_with_agent, |
|
[stored_messages, chatbot], |
|
[chatbot], |
|
).then( |
|
lambda: ( |
|
gr.Textbox( |
|
interactive=True, |
|
placeholder="Enter your prompt here and press the button", |
|
), |
|
gr.Button(interactive=True), |
|
), |
|
None, |
|
[text_input, launch_research_btn], |
|
) |
|
launch_research_btn.click( |
|
self.log_user_message, |
|
[text_input, file_uploads_log], |
|
[stored_messages, text_input, launch_research_btn], |
|
).then( |
|
self.interact_with_agent, |
|
[stored_messages, chatbot], |
|
[chatbot], |
|
).then( |
|
lambda: ( |
|
gr.Textbox( |
|
interactive=True, |
|
placeholder="Enter your prompt here and press the button", |
|
), |
|
gr.Button(interactive=True), |
|
), |
|
None, |
|
[text_input, launch_research_btn], |
|
) |
|
except Exception as e: |
|
|
|
logger.error(f"Mobile layout failed: {e}") |
|
|
|
with gr.Blocks(fill_height=True): |
|
gr.Markdown("""# Open Deep Research Vulnerability Intelligence""") |
|
gr.Markdown("""<img src="https://github.githubassets.com/images/modules/logos_page/GitHub-Mark.png" width="20" height="20" style="display: inline-block; vertical-align: middle; margin-right: 8px;"> <a href="https://github.com/mcdaqc/open-deep-research-vulnerability-intelligence" target="_blank">Github Repository</a>""") |
|
gr.Markdown("⚠️ Mobile layout failed, using desktop layout as fallback.") |
|
|
|
|
|
stored_messages = gr.State([]) |
|
file_uploads_log = gr.State([]) |
|
chatbot = gr.Chatbot( |
|
label="open-Deep-Research", |
|
type="messages", |
|
avatar_images=( |
|
None, |
|
"https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png", |
|
), |
|
) |
|
|
|
text_input = gr.Textbox( |
|
lines=1, |
|
label="Your request", |
|
placeholder="Enter your prompt here and press the button", |
|
) |
|
launch_research_btn = gr.Button("Run", variant="primary") |
|
|
|
|
|
text_input.submit( |
|
self.log_user_message, |
|
[text_input, file_uploads_log], |
|
[stored_messages, text_input, launch_research_btn], |
|
).then( |
|
self.interact_with_agent, |
|
[stored_messages, chatbot], |
|
[chatbot], |
|
) |
|
launch_research_btn.click( |
|
self.log_user_message, |
|
[text_input, file_uploads_log], |
|
[stored_messages, text_input, launch_research_btn], |
|
).then( |
|
self.interact_with_agent, |
|
[stored_messages, chatbot], |
|
[chatbot], |
|
) |
|
|
|
|
|
|
|
is_spaces = os.getenv("SPACE_ID") is not None |
|
|
|
if is_spaces: |
|
|
|
demo.launch( |
|
debug=False, |
|
server_name="0.0.0.0", |
|
server_port=int(os.getenv("PORT", 7860)), |
|
share=True, |
|
**kwargs |
|
) |
|
else: |
|
|
|
demo.launch( |
|
debug=True, |
|
server_name="localhost", |
|
server_port=7860, |
|
share=False, |
|
**kwargs |
|
) |
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the UI with "uploads" as its file-upload
    # folder and start it.
    try:
        ui = GradioUI(file_upload_folder="uploads")
        ui.launch()
    except KeyboardInterrupt:
        # Ctrl+C is reported with a short message instead of a traceback.
        print("Application stopped by user")
    except Exception as exc:
        # Print the failure, then re-raise so the traceback is still shown
        # and the exception is not swallowed.
        print(f"Error starting application: {exc}")
        raise