# Source: Hugging Face Spaces app (status at capture: Running)
import html
import json
import re
import time
from datetime import datetime

import gradio as gr
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
import torch
from plotly.subplots import make_subplots
from transformers import AutoModelForSequenceClassification, AutoTokenizer
# Initialize SecBERT model
# The tokenizer and model are loaded once, at import time, from the checkpoint
# named by MODEL_NAME.
# NOTE(review): none of the analysis functions visible in this file reference
# `tokenizer` or `model`; detection below is keyword-based. Confirm whether the
# model is used elsewhere before relying on it (or on the load cost it adds).
MODEL_NAME = "jackaduma/SecBERT"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
# Keyword table driving the detector: threat category -> lowercase keywords.
# A category's score is the fraction of its keywords found in the text, so the
# length of each list directly affects how much one hit is worth.
THREAT_PATTERNS = {
    "malware": [
        "trojan",
        "virus",
        "worm",
        "ransomware",
        "backdoor",
        "rootkit",
        "botnet",
        "keylogger",
    ],
    "phishing": [
        "click here",
        "urgent action",
        "verify account",
        "suspended",
        "limited time",
        "act now",
    ],
    "social_engineering": [
        "confidential",
        "insider",
        "exclusive",
        "secret",
        "don't tell",
        "between us",
    ],
    "data_breach": [
        "leaked",
        "exposed",
        "unauthorized access",
        "data dump",
        "credentials",
        "database",
    ],
    "network_attack": [
        "ddos",
        "injection",
        "exploit",
        "payload",
        "shell",
        "reverse",
        "buffer overflow",
    ],
    "apt": [
        "advanced persistent",
        "nation state",
        "targeted",
        "spear phishing",
        "zero day",
        "lateral movement",
    ],
}

# Sample inputs offered as one-click example buttons in the UI.
DEMO_EXAMPLES = [
    (
        "Urgent: Your account has been suspended! Click this link immediately "
        "to verify your identity and restore access."
    ),
    (
        "Our new trojan variant uses advanced persistence mechanisms and "
        "lateral movement techniques to maintain access."
    ),
    (
        "The APT group deployed custom malware with zero-day exploits "
        "targeting financial institutions."
    ),
    (
        "Database containing 2.3 million user credentials was found exposed "
        "on an unprotected server."
    ),
    (
        "Hi there! Hope you're having a great day. Looking forward to our "
        "meeting tomorrow at 2 PM."
    ),
    (
        "The SQL injection vulnerability allows attackers to dump the entire "
        "user database via union select queries."
    ),
]
def analyze_threat_patterns(text, patterns=None):
    """Scan text for the keywords of each cybersecurity threat category.

    Matching is a case-insensitive substring test: the text is lowercased and
    each keyword is looked up with ``in``, so keywords are expected to be
    lowercase and may match inside longer words.

    Args:
        text: The text to scan. ``None`` or an empty string yields no
            detections instead of raising.
        patterns: Optional mapping of threat category -> list of lowercase
            keywords. Defaults to the module-level ``THREAT_PATTERNS``.

    Returns:
        A ``(threat_scores, detected_threats)`` tuple. ``threat_scores`` maps
        each category with at least one hit to
        ``{"score": <fraction of its keywords found>, "keywords": [<hits>]}``;
        ``detected_threats`` lists those categories in table order.
    """
    if not text:
        return {}, []
    if patterns is None:
        patterns = THREAT_PATTERNS

    text_lower = text.lower()
    threat_scores = {}
    for threat_type, keywords in patterns.items():
        found_keywords = [keyword for keyword in keywords if keyword in text_lower]
        if found_keywords:
            threat_scores[threat_type] = {
                # Fraction of this category's keywords that appeared, capped at 1.
                "score": min(len(found_keywords) / len(keywords), 1.0),
                "keywords": found_keywords,
            }

    # Dict order is insertion order, i.e. the order categories were detected.
    return threat_scores, list(threat_scores)
def get_threat_level(threat_scores):
    """Map the highest category score to an overall threat level.

    Args:
        threat_scores: Mapping of category -> dict carrying a ``"score"`` value.

    Returns:
        ``(level, score)`` where ``level`` is one of ``"safe"``, ``"low"``,
        ``"medium"``, ``"high"`` or ``"critical"`` and ``score`` is the highest
        category score (``0.0`` when nothing was detected).
    """
    if not threat_scores:
        return "safe", 0.0

    peak = max(info["score"] for info in threat_scores.values())

    # Checked from most to least severe; the first floor reached wins.
    severity_floors = (
        ("critical", 0.4),
        ("high", 0.25),
        ("medium", 0.15),
    )
    for label, floor in severity_floors:
        if peak >= floor:
            return label, peak
    return "low", peak
def create_threat_visualization(threat_scores, overall_level, overall_score):
    """Build the gauge figure summarising the overall threat level.

    Args:
        threat_scores: Per-category results; when empty, a fixed "safe" gauge
            at value 0 is drawn and the other arguments are not used.
        overall_level: Level name; selects the bar colour and appears in the title.
        overall_score: Value shown on the gauge (axis runs up to 1).

    Returns:
        A Plotly ``Figure`` holding a single ``Indicator`` trace.
    """
    full_domain = {"x": [0, 1], "y": [0, 1]}

    if threat_scores:
        # Bar colour per threat level.
        bar_colors = {
            "safe": "green",
            "low": "yellow",
            "medium": "orange",
            "high": "red",
            "critical": "darkred",
        }
        # Background bands share the 0.15 / 0.25 / 0.4 boundaries used when
        # classifying the overall level.
        band_edges = [0, 0.15, 0.25, 0.4, 1]
        band_colors = ["lightgreen", "yellow", "orange", "red"]
        bands = [
            {"range": [lower, upper], "color": shade}
            for lower, upper, shade in zip(band_edges, band_edges[1:], band_colors)
        ]
        indicator = go.Indicator(
            mode="gauge+number+delta",
            value=overall_score,
            domain=full_domain,
            title={"text": f"π¨ {overall_level.upper()} THREAT"},
            delta={"reference": 0.5},
            gauge={
                "axis": {"range": [None, 1]},
                "bar": {"color": bar_colors[overall_level]},
                "steps": bands,
                "threshold": {
                    "line": {"color": "black", "width": 4},
                    "thickness": 0.75,
                    "value": 0.8,
                },
            },
        )
    else:
        # Nothing detected: neutral gauge pinned at zero.
        indicator = go.Indicator(
            mode="gauge+number",
            value=0,
            domain=full_domain,
            title={"text": "π’ SAFE"},
            gauge={
                "axis": {"range": [None, 1]},
                "bar": {"color": "green"},
                "steps": [{"range": [0, 1], "color": "lightgray"}],
                "threshold": {
                    "line": {"color": "red", "width": 4},
                    "thickness": 0.75,
                    "value": 0.8,
                },
            },
        )

    fig = go.Figure(indicator)
    fig.update_layout(
        height=300,
        font={"color": "white", "family": "Arial"},
        paper_bgcolor="rgba(0,0,0,0.1)",
        plot_bgcolor="rgba(0,0,0,0)",
    )
    return fig
def create_threat_breakdown(threat_scores):
    """Build a bar chart with one bar per detected threat category.

    Args:
        threat_scores: Mapping of category -> dict carrying a ``"score"`` value.

    Returns:
        A Plotly ``Figure``, or ``None`` when there is nothing to plot.
    """
    if not threat_scores:
        return None

    names = []
    values = []
    for name, info in threat_scores.items():
        names.append(name)
        values.append(info["score"])

    bars = go.Bar(
        x=names,
        y=values,
        # One palette entry per bar, taken from the front of Set3.
        marker_color=px.colors.qualitative.Set3[:len(names)],
        text=[format(value, ".1%") for value in values],
        textposition="auto",
    )
    fig = go.Figure(data=[bars])
    fig.update_layout(
        title="Threat Categories Detected",
        xaxis_title="Threat Type",
        yaxis_title="Threat Score",
        height=400,
        font={"color": "white"},
        paper_bgcolor="rgba(0,0,0,0.1)",
        plot_bgcolor="rgba(0,0,0,0)",
    )
    return fig
def highlight_threats_in_text(text, threat_scores):
    """Return *text* as HTML with every detected keyword wrapped in ``<mark>``.

    The input is user-supplied and the result is rendered as HTML, so all text
    (matched and unmatched) is HTML-escaped; only the ``<mark>`` wrappers this
    function adds are real markup. Matching is case-insensitive and the
    highlighted span keeps the casing the user typed.

    Args:
        text: The original text. ``None`` is treated as an empty string.
        threat_scores: Mapping of category -> dict with a ``"keywords"`` list.
            Each category gets one colour from a fixed palette, cycling if
            there are more categories than colours.

    Returns:
        An HTML-safe string.
    """
    text = text or ""
    if not threat_scores:
        return html.escape(text)

    palette = ["#ff6b6b", "#4ecdc4", "#45b7d1", "#96ceb4", "#ffeaa7", "#dda0dd"]

    # Colour is chosen per category; a keyword listed by several categories
    # keeps the colour of the first one.
    color_by_keyword = {}
    for category_idx, threat_info in enumerate(threat_scores.values()):
        color = palette[category_idx % len(palette)]
        for keyword in threat_info["keywords"]:
            if keyword:  # an empty keyword would match at every position
                color_by_keyword.setdefault(keyword, color)
    if not color_by_keyword:
        return html.escape(text)

    # A single alternation, longest keyword first, so overlapping phrases
    # prefer the longer match and inserted markup is never re-scanned.
    ordered_keywords = sorted(color_by_keyword, key=len, reverse=True)
    pattern = re.compile(
        "|".join(f"({re.escape(keyword)})" for keyword in ordered_keywords),
        re.IGNORECASE,
    )

    pieces = []
    cursor = 0
    for match in pattern.finditer(text):
        pieces.append(html.escape(text[cursor:match.start()]))
        # Exactly one top-level group matches; its index identifies the keyword.
        color = color_by_keyword[ordered_keywords[match.lastindex - 1]]
        pieces.append(
            f'<mark style="background-color: {color}; padding: 2px 4px; '
            f'border-radius: 3px; font-weight: bold;">'
            f"{html.escape(match.group(0))}</mark>"
        )
        cursor = match.end()
    pieces.append(html.escape(text[cursor:]))
    return "".join(pieces)
def analyze_cybersecurity_threat(text, progress=gr.Progress()):
    """Run the full analysis pipeline for the "Analyze" button.

    Args:
        text: Text from the input box. ``None`` is treated as an empty string.
        progress: Gradio progress tracker; the default instance is how Gradio
            injects a tracker into event handlers.

    Returns:
        A 5-tuple in the order of the click handler's outputs: gauge figure,
        update for the breakdown plot, highlighted-text HTML, Markdown report,
        and an update making the results section visible.
    """
    text = text or ""

    # NOTE(review): the sleeps below add about 1.4 s per request; they look
    # like pacing for the progress bar. Confirm before removing.
    progress(0, desc="Initializing analysis...")
    time.sleep(0.5)

    progress(0.2, desc="Scanning for threat patterns...")
    threat_scores, detected_threats = analyze_threat_patterns(text)
    time.sleep(0.3)

    progress(0.5, desc="Calculating threat levels...")
    overall_level, overall_score = get_threat_level(threat_scores)
    time.sleep(0.3)

    progress(0.7, desc="Generating visualizations...")
    gauge_chart = create_threat_visualization(threat_scores, overall_level, overall_score)
    breakdown_chart = create_threat_breakdown(threat_scores)
    time.sleep(0.3)

    progress(0.9, desc="Highlighting threats in text...")
    highlighted_text = highlight_threats_in_text(text, threat_scores)

    # Generate detailed analysis
    analysis_report = generate_analysis_report(
        threat_scores, overall_level, overall_score, detected_threats
    )

    progress(1.0, desc="Analysis complete!")

    # The breakdown plot is created hidden, and returning a bare figure only
    # sets its value, so visibility must be switched explicitly both ways.
    if breakdown_chart is None:
        breakdown_update = gr.update(value=None, visible=False)
    else:
        breakdown_update = gr.update(value=breakdown_chart, visible=True)

    return (
        gauge_chart,
        breakdown_update,
        f"<div style='padding: 15px; background: rgba(0,0,0,0.1); border-radius: 10px; color: white;'>{highlighted_text}</div>",
        analysis_report,
        gr.update(visible=True),
    )
def generate_analysis_report(threat_scores, overall_level, overall_score, detected_threats):
    """Render the Markdown report shown under the charts.

    Args:
        threat_scores: Mapping of category -> ``{"score": float, "keywords": list}``.
        overall_level: Overall level name; shown upper-cased and used to pick
            the recommendation.
        overall_score: Overall score, formatted as a percentage.
        detected_threats: List of detected categories; only its length is used.

    Returns:
        The report as a Markdown string, stamped with the current local time.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    sections = [
        "\n"
        "## π Cybersecurity Threat Analysis Report\n"
        f"**Timestamp:** {timestamp}\n"
        f"**Overall Threat Level:** {overall_level.upper()} ({overall_score:.1%})\n"
        "### π Summary\n"
    ]

    if not threat_scores:
        sections.append(
            "\n"
            "β **No cybersecurity threats detected!**\n"
            "The analyzed text appears to be safe and doesn't contain indicators of:\n"
            "- Malware or malicious software\n"
            "- Phishing attempts\n"
            "- Social engineering tactics\n"
            "- Data breach indicators\n"
            "- Network attack patterns\n"
            "- Advanced Persistent Threat (APT) activities\n"
        )
    else:
        count = len(detected_threats)
        noun = "category" if count == 1 else "categories"
        sections.append(f"\nβ οΈ **{count} threat {noun} detected:**\n")
        for threat_type, threat_info in threat_scores.items():
            score_percent = threat_info["score"] * 100
            keywords = ", ".join(f"`{kw}`" for kw in threat_info["keywords"])
            sections.append(
                "\n"
                f"**{threat_type.upper()}** - {score_percent:.1f}% confidence\n"
                f"- Detected keywords: {keywords}\n"
            )

    sections.append("\n### π‘οΈ Recommendations\n")

    if not threat_scores:
        # A clean result must not fall through to the "minor indicators
        # detected" wording, which would contradict the summary above.
        sections.append(
            "**NO ACTION REQUIRED** - No threat indicators were detected in this content"
        )
    elif overall_level == "critical":
        sections.append(
            "π¨ **IMMEDIATE ACTION REQUIRED** - This content shows strong indicators of cybersecurity threats"
        )
    elif overall_level == "high":
        sections.append(
            "β οΈ **HIGH PRIORITY** - Review and investigate this content immediately"
        )
    elif overall_level == "medium":
        sections.append(
            "πΆ **MODERATE CONCERN** - Monitor and verify the source of this content"
        )
    else:
        sections.append(
            "π‘ **LOW PRIORITY** - Minor indicators detected, routine monitoring recommended"
        )

    return "".join(sections)
# Custom CSS for the interface
# Passed to gr.Blocks(css=...) when the UI is built. It sets a gradient page
# background, restyles buttons, textboxes and Markdown for white-on-dark text,
# and defines a pulsing `.threat-highlight` animation.
# NOTE(review): nothing in the visible code applies the `threat-highlight`
# class (highlighted keywords use inline styles), so that rule and the `pulse`
# keyframes appear unused. Confirm before removing.
custom_css = """
.gradio-container {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
color: white;
}
.gr-button {
background: linear-gradient(45deg, #FF6B6B, #4ECDC4) !important;
border: none !important;
color: white !important;
font-weight: bold !important;
transition: all 0.3s ease !important;
}
.gr-button:hover {
transform: translateY(-2px) !important;
box-shadow: 0 5px 15px rgba(0,0,0,0.3) !important;
}
.gr-textbox textarea {
background: rgba(255,255,255,0.1) !important;
border: 2px solid rgba(255,255,255,0.3) !important;
color: white !important;
}
.gr-markdown {
color: white !important;
}
.threat-highlight {
animation: pulse 2s infinite;
}
@keyframes pulse {
0% { opacity: 1; }
50% { opacity: 0.7; }
100% { opacity: 1; }
}
"""
# Build the Gradio interface
# Layout: intro text, an input column (textbox, action buttons, example
# buttons), then a results section that stays hidden until the first analysis.
with gr.Blocks(css=custom_css, title="π¨ Cyber Threat Radar") as demo:
    gr.Markdown(
        "\n"
        "# π¨ Cyber Threat Radar Dashboard\n"
        "### Powered by SecBERT - Real-time Cybersecurity Threat Detection\n"
        "**Upload any text and watch our AI detect hidden cybersecurity threats in real-time!**\n"
        "Try pasting emails, code snippets, news articles, or any suspicious content. Our advanced AI will analyze it for:\n"
        "π¦ Malware indicators β’ π£ Phishing attempts β’ π₯ Social engineering β’ πΎ Data breaches β’ π Network attacks β’ π― APT activities\n"
    )

    with gr.Row():
        with gr.Column(scale=2):
            input_text = gr.Textbox(
                label="π Enter text to analyze",
                placeholder="Paste any text here - emails, articles, code, reports...",
                lines=8,
                max_lines=15,
            )
            with gr.Row():
                analyze_btn = gr.Button("π Analyze Threats", variant="primary", size="lg")
                clear_btn = gr.Button("ποΈ Clear", variant="secondary")

            gr.Markdown("### π― Try These Examples:")
            example_buttons = []
            for i, example in enumerate(DEMO_EXAMPLES):
                btn = gr.Button(f"Example {i+1}: {example[:50]}...", variant="secondary", size="sm")
                # Bind the current example as a default argument so each button
                # keeps its own text rather than the loop's final value.
                btn.click(lambda x=example: x, outputs=input_text)
                example_buttons.append(btn)

    # The result components live inside the group so that the visibility
    # updates sent by the handlers actually show and hide them; a group with
    # no children would toggle nothing.
    with gr.Group(visible=False) as results_section:
        with gr.Row():
            with gr.Column():
                threat_gauge = gr.Plot(label="π― Threat Level Gauge")
            with gr.Column():
                threat_breakdown = gr.Plot(label="π Threat Categories", visible=False)
        with gr.Row():
            highlighted_text = gr.HTML(label="π Text Analysis (Threats Highlighted)")
        with gr.Row():
            analysis_report = gr.Markdown(label="π Detailed Analysis Report")

    # Event handlers
    analyze_btn.click(
        analyze_cybersecurity_threat,
        inputs=[input_text],
        outputs=[threat_gauge, threat_breakdown, highlighted_text, analysis_report, results_section],
    )
    # Clear resets every result component, including the highlighted text, so
    # no stale analysis stays on screen, and hides the breakdown plot again.
    clear_btn.click(
        lambda: (
            "",
            None,
            gr.update(value=None, visible=False),
            "",
            "",
            gr.update(visible=False),
        ),
        outputs=[
            input_text,
            threat_gauge,
            threat_breakdown,
            highlighted_text,
            analysis_report,
            results_section,
        ],
    )
if __name__ == "__main__":
    # Launch settings gathered in one place and passed through unchanged.
    # NOTE(review): share=True requests a public share link and debug=True
    # enables debug mode while the server listens on all interfaces. Confirm
    # these are intended outside local experimentation.
    launch_options = {
        "share": True,
        "show_error": True,
        "debug": True,
        "server_name": "0.0.0.0",
        "server_port": 7860,
    }
    demo.launch(**launch_options)