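# NOTE: the original first lines here ("Spaces: Running on Zero", repeated)
# were page-header residue from the Hugging Face Spaces listing, not program
# text. They are kept only as this comment.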
import json
import math
import traceback

import gradio as gr
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
# ZeroGPU support
try:
    import spaces
    ZEROGPU_AVAILABLE = True
    print("ZeroGPU support enabled")
except ImportError:
    ZEROGPU_AVAILABLE = False
    print("ZeroGPU not available, running in standard mode")

    # Create dummy decorator for local development
    def spaces_gpu_decorator(func=None, duration=60, **_options):
        """No-op stand-in for ``spaces.GPU`` when the package is missing.

        Works both as a bare decorator (``@spaces.GPU``) and as a decorator
        factory (``@spaces.GPU(duration=120)``); in either case the decorated
        function is returned unchanged.

        Args:
            func: The function being decorated when used bare; anything
                non-callable (including a positional duration) is treated as
                "called as a factory".
            duration: Accepted for signature compatibility and ignored.
            **_options: Any further keyword options; ignored.

        Returns:
            ``func`` itself when used bare, otherwise an identity decorator.
        """
        if callable(func):
            # Bare usage: @spaces.GPU
            return func

        def decorator(wrapped):
            return wrapped

        return decorator

    # staticmethod keeps attribute access on the shim class returning the
    # plain function, so spaces.GPU(...) behaves like a module-level callable.
    spaces = type("spaces", (), {"GPU": staticmethod(spaces_gpu_decorator)})
# Model configuration - Foundation-Sec-8B only
MODEL_NAME = "fdtn-ai/Foundation-Sec-8B"

# Initialize tokenizer and model using pipeline approach.
# NOTE(review): trust_remote_code=True lets transformers run code shipped in
# the model repository; confirm this is required for this checkpoint.
print(f"Loading model: {MODEL_NAME}")
try:
    print("Initializing Foundation-Sec-8B model...")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    text_pipeline = pipeline(
        "text-generation",
        model=MODEL_NAME,
        tokenizer=tokenizer,
        torch_dtype=torch.bfloat16,
        device_map="auto",
        trust_remote_code=True,
    )
    print("Foundation-Sec-8B model initialized successfully")
    # Extract model and tokenizer from pipeline for direct access
    model = text_pipeline.model
    tok = text_pipeline.tokenizer
except Exception as e:
    # Deliberately broad: any failure of the preferred configuration triggers
    # one retry without the explicit tokenizer, dtype and device_map.
    print(f"Error initializing Foundation-Sec-8B model: {str(e)}")
    print("Trying with simplified parameters...")
    try:
        # Try with simpler parameters
        text_pipeline = pipeline(
            "text-generation",
            model=MODEL_NAME,
            trust_remote_code=True,
        )
        model = text_pipeline.model
        tok = text_pipeline.tokenizer
        print("Foundation-Sec-8B model loaded with simplified parameters")
    except Exception as e2:
        print(f"Failed to load Foundation-Sec-8B model: {str(e2)}")
        # Chain explicitly so the traceback shows the underlying load failure.
        raise RuntimeError(
            f"Could not load Foundation-Sec-8B model. Please ensure the model is accessible and try again. Error: {str(e2)}"
        ) from e2

# Log device information
if hasattr(model, 'device'):
    print(f"Model loaded on device: {model.device}")
else:
    device_info = next(model.parameters()).device
    print(f"Model parameters on device: {device_info}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device count: {torch.cuda.device_count()}")
    print(f"Current CUDA device: {torch.cuda.current_device()}")
    print(f"CUDA device name: {torch.cuda.get_device_name()}")
# Scoring configuration
# Exponent applied to the phrase's token count when binary_assoc_score divides
# by n_tok ** LEN_ALPHA (0 disables the length correction entirely).
LEN_ALPHA = 0.7

# Sample data for testing
CAMPAIGN_LIST = [
    "Operation Aurora", "Dust Storm", "ShadowHammer", "NotPetya", "SolarWinds",
]

ACTOR_LIST = [
    "APT1",
    "APT28",
    "APT33",
    "APT38",
    "FIN8",
]

# Sample ATT&CK techniques, rendered as "<technique id> <technique name>"
TECHNIQUE_LIST = [
    f"{technique_id} {technique_name}"
    for technique_id, technique_name in (
        ("T1059", "Command and Scripting Interpreter"),
        ("T1566", "Phishing"),
        ("T1027", "Obfuscated/Stored Files"),
        ("T1036", "Masquerading"),
        ("T1105", "Ingress Tool Transfer"),
        ("T1018", "Remote System Discovery"),
        ("T1568", "Dynamic Resolution"),
    )
]
def phrase_log_prob(prompt, phrase):
    """Return the log-probability of ``phrase`` as a continuation of ``prompt``.

    The value is the sum, over the phrase's tokens, of the log-probability
    the module-level ``model`` assigns to each token given the prompt and the
    preceding phrase tokens. All positions are scored in one forward pass
    under ``torch.no_grad()`` rather than one pass per token.

    Args:
        prompt: Context string; tokenized with the tokenizer's default
            special-token handling.
        phrase: Continuation string; tokenized with
            ``add_special_tokens=False`` and appended to the prompt ids.

    Returns:
        The summed log-probability as a Python float (0.0 when the phrase
        tokenizes to nothing).

    Raises:
        ValueError: If the prompt tokenizes to zero tokens, leaving no
            position from which to predict the first phrase token.
        Exception: Any tokenizer/model error is logged and re-raised.
    """
    try:
        # Log GPU usage information
        device_info = next(model.parameters()).device
        print(f"Running phrase_log_prob on device: {device_info}")

        target_device = model.device
        prompt_ids = tok(prompt, return_tensors="pt")["input_ids"].to(target_device)
        # NOTE(review): the phrase is tokenized on its own, without a leading
        # space, so its tokens may differ from how the same text would be
        # tokenized after the prompt in running text — confirm this is intended.
        phrase_ids = tok(phrase, add_special_tokens=False)["input_ids"]
        if not phrase_ids:
            return 0.0

        n_prompt = prompt_ids.shape[1]
        if n_prompt == 0:
            raise ValueError("prompt must tokenize to at least one token")

        phrase_tensor = torch.tensor(
            [phrase_ids], dtype=prompt_ids.dtype, device=target_device
        )
        full_ids = torch.cat([prompt_ids, phrase_tensor], dim=1)

        # Inference only: skip autograd bookkeeping.
        with torch.no_grad():
            logits = model(full_ids).logits[0]

        # The logits at position i score the token at position i + 1, so the
        # phrase tokens are predicted by positions n_prompt - 1 .. end - 1.
        step_logits = logits[n_prompt - 1:-1].float()
        log_probs = torch.log_softmax(step_logits, dim=-1)
        picked = log_probs.gather(1, phrase_tensor[0].unsqueeze(1))
        return picked.double().sum().item()
    except Exception as e:
        print(f"Error in phrase_log_prob: {e}")
        # Bare raise keeps the original traceback intact.
        raise
def binary_assoc_score(prompt: str, phrase: str, neg="does NOT use", prompt_template="typically uses") -> float:
    """Score how strongly ``phrase`` is associated with ``prompt``.

    Computes p ≈ P(use) / (P(use) + P(not use)) by comparing the phrase's
    log-probability after the positive prompt with its log-probability after
    a negated prompt (``prompt_template`` replaced by ``neg``), then divides
    by ``n_tokens ** LEN_ALPHA`` to penalize longer phrases.

    Args:
        prompt: Base prompt string; expected to contain ``prompt_template``.
        phrase: Phrase to evaluate.
        neg: Negative template substituted for the positive template.
        prompt_template: Positive template to be replaced.

    Returns:
        Length-normalized association score in the range (0, 1].
    """
    neg_prompt = prompt.replace(prompt_template, neg)
    lp_pos = phrase_log_prob(prompt, phrase)
    if neg_prompt == prompt:
        # Nothing was replaced, so both prompts are identical and the logistic
        # below is exactly 0.5; say so instead of silently re-scoring.
        print(
            f"Warning: template '{prompt_template}' not found in prompt "
            f"'{prompt}'; positive and negative prompts are identical"
        )
        lp_neg = lp_pos
    else:
        lp_neg = phrase_log_prob(neg_prompt, phrase)

    # Logistic transformation, written so math.exp only ever sees a
    # non-positive argument and therefore cannot overflow.
    margin = lp_pos - lp_neg
    if margin >= 0:
        prob = 1.0 / (1.0 + math.exp(-margin))
    else:
        odds = math.exp(margin)
        prob = odds / (1.0 + odds)

    # Length normalization; clamp to 1 so an empty phrase cannot divide by zero.
    n_tok = len(tok(phrase, add_special_tokens=False)["input_ids"])
    return prob / (max(n_tok, 1) ** LEN_ALPHA)
def campaign_actor_associations(campaigns, actors, prompt_template="is associated with", neg_template="is NOT associated with"):
    """Rank actors by binary association score for every campaign.

    The prompt is built as ``"{campaign} {prompt_template}"``, the same way
    ``campaign_technique_matrix`` builds its prompts, and the positive
    template is passed through to ``binary_assoc_score`` so the negative
    prompt really differs from the positive one.

    Args:
        campaigns: Iterable of campaign names.
        actors: Iterable of actor names.
        prompt_template: Template for positive association.
        neg_template: Template for negative association.

    Returns:
        Dict mapping each campaign to a list of ``(actor, score)`` tuples
        sorted by score, highest first.
    """
    results = {}
    for camp in campaigns:
        prompt_base = f"{camp} {prompt_template}"
        actor_scores = {
            actor: binary_assoc_score(
                prompt_base, actor, neg=neg_template, prompt_template=prompt_template
            )
            for actor in actors
        }
        # Sort by score, best match first
        results[camp] = sorted(actor_scores.items(), key=lambda item: item[1], reverse=True)
    return results
# Run on a ZeroGPU-allocated device when the `spaces` package is present; the
# local fallback shim makes this decorator a no-op.
@spaces.GPU(duration=120)
def campaign_technique_matrix(campaigns, techniques, prompt_template="typically uses", neg_template="typically does NOT use"):
    """
    Generate Campaign × Technique association matrix using binary scoring.

    Each cell is ``binary_assoc_score`` evaluated on the prompt
    ``"{campaign} {prompt_template}"`` and the technique text, so cells are
    scored independently of one another.

    Args:
        campaigns: List of campaign names
        techniques: List of technique names
        prompt_template: Template for positive association
        neg_template: Template for negative association

    Returns:
        DataFrame with campaigns as rows, techniques as columns, scores as values
    """
    rows = {}
    for camp in campaigns:
        prompt_base = f"{camp} {prompt_template}"
        rows[camp] = {
            tech: binary_assoc_score(
                prompt_base, tech, neg=neg_template, prompt_template=prompt_template
            )
            for tech in techniques
        }
    return pd.DataFrame.from_dict(rows, orient="index")
def campaign_actor_matrix(campaigns, actors, prompt_template="is associated with", neg_template="is NOT associated with"):
    """Generate the Campaign × Actor association matrix using binary scoring.

    The prompt is built as ``"{campaign} {prompt_template}"`` and the positive
    template is passed through to ``binary_assoc_score`` so it can be swapped
    for ``neg_template`` when the negative prompt is formed.

    Args:
        campaigns: Iterable of campaign names.
        actors: Iterable of actor names.
        prompt_template: Template for positive association.
        neg_template: Template for negative association.

    Returns:
        DataFrame with campaigns as rows, actors as columns, scores as values.
    """
    rows = {}
    for camp in campaigns:
        prompt_base = f"{camp} {prompt_template}"
        rows[camp] = {
            actor: binary_assoc_score(
                prompt_base, actor, neg=neg_template, prompt_template=prompt_template
            )
            for actor in actors
        }
    return pd.DataFrame.from_dict(rows, orient="index")
# Run on a ZeroGPU-allocated device when the `spaces` package is present; the
# local fallback shim makes this decorator a no-op.
@spaces.GPU(duration=120)
def campaign_actor_probs(campaigns, actors, prompt_template="is conducted by"):
    """
    Generate Campaign × Actor probability matrix using softmax normalization.

    For each campaign the prompt ``"{campaign} {prompt_template}"`` is scored
    against every actor with ``phrase_log_prob``; the log-probabilities are
    then softmax-normalized so each row sums to 1.

    Args:
        campaigns: List of campaign names
        actors: List of actor names
        prompt_template: Template for actor association prompt

    Returns:
        DataFrame with campaigns as rows, actors as columns, probabilities as
        values. With no actors, an empty frame indexed by campaign.
    """
    if not actors:
        # max() of an empty list would raise; there is nothing to normalize.
        return pd.DataFrame(index=list(campaigns))

    rows = {}
    for camp in campaigns:
        prompt = f"{camp} {prompt_template}"
        logps = [phrase_log_prob(prompt, actor) for actor in actors]
        # Softmax normalization (with max-shift for numerical stability)
        peak = max(logps)
        weights = [math.exp(lp - peak) for lp in logps]
        total = sum(weights)
        rows[camp] = {actor: w / total for actor, w in zip(actors, weights)}
    return pd.DataFrame.from_dict(rows, orient="index")
def _split_csv(text):
    """Split a comma-separated string into stripped, non-empty items."""
    return [item.strip() for item in (text or "").split(",") if item.strip()]


def _message_figure(message, fontsize=16, color=None):
    """Build a blank figure showing ``message`` centered (hint or error)."""
    fig, ax = plt.subplots(figsize=(8, 6))
    text_options = {"ha": "center", "va": "center", "fontsize": fontsize}
    if color is not None:
        text_options["color"] = color
    ax.text(0.5, 0.5, message, **text_options)
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.axis('off')
    # Drop the figure from pyplot's registry so repeated requests do not pile
    # up open figures; the returned Figure object itself is still usable.
    plt.close(fig)
    return fig


def _log_compute_device():
    """Print whether CUDA is visible to torch in the current process."""
    if torch.cuda.is_available():
        print(f"GPU computation enabled - Device: {torch.cuda.get_device_name()}")
    else:
        print("Running on CPU")


def _heatmap_figure(df, *, cmap, cbar_label, title, xlabel, ylabel):
    """Render ``df`` as an annotated seaborn heatmap and return the figure."""
    n_rows, n_cols = df.shape
    fig, ax = plt.subplots(figsize=(max(8, n_cols * 1.2), max(6, n_rows * 0.8)))
    try:
        sns.heatmap(df, annot=True, cmap=cmap, fmt='.3f',
                    cbar_kws={'label': cbar_label}, ax=ax)
        ax.set_title(title, fontsize=14, pad=20)
        ax.set_xlabel(xlabel, fontsize=12)
        ax.set_ylabel(ylabel, fontsize=12)
        # Adjust label rotation
        plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
        plt.setp(ax.get_yticklabels(), rotation=0)
        # Layout this specific figure instead of pyplot's "current" one.
        fig.tight_layout()
    finally:
        # Always unregister from pyplot, even when drawing fails.
        plt.close(fig)
    return fig


def generate_actor_heatmap(c_list, a_list, actor_prompt_template):
    """Generate Campaign-Actor association heatmap with probability visualization.

    Args:
        c_list: Comma-separated campaign names.
        a_list: Comma-separated actor names.
        actor_prompt_template: Text placed between the campaign name and the
            actor when building the scoring prompt.

    Returns:
        A matplotlib figure: the heatmap, a hint figure when either list is
        empty, or a red error figure if anything fails.
    """
    try:
        campaigns = _split_csv(c_list)
        actors = _split_csv(a_list)
        if not campaigns or not actors:
            return _message_figure('Please enter both Campaigns and Actors')

        print(f"Processing {len(campaigns)} campaigns and {len(actors)} actors...")
        print(f"Using prompt template: '{actor_prompt_template}'")
        # Check GPU availability
        _log_compute_device()

        # Calculate probability matrix
        df_ca = campaign_actor_probs(campaigns, actors, actor_prompt_template)
        print(f"Actor probability matrix shape: {df_ca.shape}")
        print("Actor probability matrix:")
        print(df_ca.round(4))

        # Create heatmap with matplotlib/seaborn
        fig = _heatmap_figure(
            df_ca,
            cmap='plasma',
            cbar_label='P(actor)',
            title='Campaign-Actor Probabilities (softmax normalized)',
            xlabel='Actor',
            ylabel='Campaign',
        )
        print("Actor heatmap generated successfully!")
        return fig
    except Exception as e:
        # UI boundary: report the failure in the plot instead of crashing the
        # request, and keep the full traceback in the server log.
        print(f"Error in generate_actor_heatmap: {e}")
        traceback.print_exc()
        return _message_figure(f'Error occurred: {str(e)}', fontsize=12, color='red')


def generate_technique_heatmap(c_list, t_list, technique_prompt_template, technique_neg_template):
    """Generate Campaign-Technique association heatmap with binary scoring visualization.

    Args:
        c_list: Comma-separated campaign names.
        t_list: Comma-separated technique names.
        technique_prompt_template: Positive template placed after the campaign
            name in the scoring prompt.
        technique_neg_template: Negative template substituted for the positive
            one when scoring the negated prompt.

    Returns:
        A matplotlib figure: the heatmap, a hint figure when either list is
        empty, or a red error figure if anything fails.
    """
    try:
        campaigns = _split_csv(c_list)
        techniques = _split_csv(t_list)
        if not campaigns or not techniques:
            return _message_figure('Please enter both Campaigns and Techniques')

        print(f"Processing {len(campaigns)} campaigns and {len(techniques)} techniques...")
        print(f"Using prompt templates: '{technique_prompt_template}' / '{technique_neg_template}'")
        # Check GPU availability
        _log_compute_device()

        # Calculate score matrix
        df_ct = campaign_technique_matrix(campaigns, techniques, technique_prompt_template, technique_neg_template)
        print(f"Score matrix shape: {df_ct.shape}")
        print("Score matrix:")
        print(df_ct.round(4))

        # Create heatmap with matplotlib/seaborn
        fig = _heatmap_figure(
            df_ct,
            cmap='viridis',
            cbar_label='Association Score',
            title='Campaign-Technique Associations (len-norm, independent)',
            xlabel='Technique',
            ylabel='Campaign',
        )
        print("Technique heatmap generated successfully!")
        return fig
    except Exception as e:
        # UI boundary: report the failure in the plot instead of crashing the
        # request, and keep the full traceback in the server log.
        print(f"Error in generate_technique_heatmap: {e}")
        traceback.print_exc()
        return _message_figure(f'Error occurred: {str(e)}', fontsize=12, color='red')
# Explanatory Markdown shown above each section's controls.
_ACTOR_METHOD_MD = """
**Calculation Method**: `P(actor | "{campaign} is conducted by") (softmax normalized)`
1. Calculate `phrase_log_prob("{campaign} is conducted by", actor)` for each Actor
2. Apply softmax normalization to create probability distribution (probabilities sum to 1.0 per Campaign)
3. Result: Shows relative likelihood of each Actor conducting each Campaign
"""

_TECHNIQUE_METHOD_MD = """
**Calculation Method**: `Binary Association Score (length-normalized, independent)`
1. For each Technique, calculate:
- `lp_pos = phrase_log_prob("{campaign} typically uses", technique)`
- `lp_neg = phrase_log_prob("{campaign} typically does NOT use", technique)`
2. Apply logistic transformation: `prob = 1 / (1 + exp(lp_neg - lp_pos))`
3. Length normalization: `score = prob / (n_tokens^0.7)` (penalty for longer phrases)
4. Result: Independent association scores (0-1) for each Campaign-Technique pair
"""

# NOTE(review): the source's indentation was lost, so which widgets sit inside
# each gr.Row() below is a reconstruction — confirm against the deployed layout.
with gr.Blocks(title="LLM Threat Graph Demo") as demo:
    gr.Markdown("# 🕸️ LLM Threat Association Analysis\n*Visualizing Campaign-Actor-Technique relationships using Language Models*")

    # Common inputs
    with gr.Row():
        campaigns = gr.Textbox(
            value=", ".join(CAMPAIGN_LIST),
            label="Campaigns (comma-separated)",
            placeholder="e.g., Operation Aurora, NotPetya, Stuxnet",
        )

    # Campaign-Actor section (probabilistic)
    gr.Markdown("## 👤 Campaign-Actor Associations")
    gr.Markdown("Visualizing Campaign-Actor relationships with probabilistic heatmaps")
    gr.Markdown(_ACTOR_METHOD_MD)
    with gr.Row():
        actor_prompt_template = gr.Textbox(
            value="is conducted by",
            label="Actor Prompt Template",
            placeholder="e.g., is conducted by, is attributed to",
        )
        actors = gr.Textbox(
            value=", ".join(ACTOR_LIST),
            label="Actors (comma-separated)",
            placeholder="e.g., APT1, Lazarus Group, Cozy Bear",
        )
    btn_actor = gr.Button("Generate Actor Heatmap", variant="primary")
    plot_actor = gr.Plot(label="Campaign-Actor Heatmap")
    actor_inputs = [campaigns, actors, actor_prompt_template]
    btn_actor.click(
        fn=generate_actor_heatmap,
        inputs=actor_inputs,
        outputs=plot_actor,
        show_progress=True,
    )

    # Campaign-Technique section (independent scoring)
    gr.Markdown("## 🛠️ Campaign-Technique Associations")
    gr.Markdown("Visualizing Campaign-Technique relationships with independent association scores")
    gr.Markdown(_TECHNIQUE_METHOD_MD)
    with gr.Row():
        technique_prompt_template = gr.Textbox(
            value="typically uses",
            label="Technique Prompt Template (positive)",
            placeholder="e.g., typically uses, commonly employs",
        )
        technique_neg_template = gr.Textbox(
            value="typically does NOT use",
            label="Technique Prompt Template (negative)",
            placeholder="e.g., typically does NOT use, never employs",
        )
    techniques = gr.Textbox(
        value=", ".join(TECHNIQUE_LIST),
        label="Techniques (comma-separated)",
        placeholder="e.g., T1059 Command and Scripting Interpreter, T1566 Phishing",
    )
    btn_technique = gr.Button("Generate Technique Heatmap", variant="primary")
    plot_technique = gr.Plot(label="Campaign-Technique Heatmap")
    technique_inputs = [campaigns, techniques, technique_prompt_template, technique_neg_template]
    btn_technique.click(
        fn=generate_technique_heatmap,
        inputs=technique_inputs,
        outputs=plot_technique,
        show_progress=True,
    )

demo.launch()