import io
import logging
import re
import traceback
from collections import Counter
from datetime import datetime

# NOTE(review): the third-party imports keep their original relative order so
# that `spaces` is still imported ahead of `torch`; confirm before re-sorting.
import gradio as gr
import spaces
import torch
import numpy as np
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline as hf_pipeline
import matplotlib.pyplot as plt
from PIL import Image
from torch.nn.functional import sigmoid

# Root logging configuration (kept so other libraries still emit at DEBUG).
logging.basicConfig(level=logging.DEBUG)


class CustomFormatter(logging.Formatter):
    """Custom formatter with colors and better formatting.

    Only the rendered message is emitted (no timestamp, level or logger
    name), wrapped in an ANSI color chosen from the record's level.
    """

    grey = "\x1b[38;21m"
    blue = "\x1b[38;5;39m"
    yellow = "\x1b[38;5;226m"
    red = "\x1b[38;5;196m"
    bold_red = "\x1b[31;1m"
    reset = "\x1b[0m"

    # Level -> color prefix. Levels missing from this table are emitted uncolored.
    _LEVEL_COLORS = {
        logging.DEBUG: blue,
        logging.INFO: grey,
        logging.WARNING: yellow,
        logging.ERROR: red,
        logging.CRITICAL: bold_red,
    }

    def format(self, record):
        """Return the record's message wrapped in the color for its level."""
        message = record.getMessage()
        color = self._LEVEL_COLORS.get(record.levelno)
        if color is None:
            return message
        return f"{color}{message}{self.reset}"


# Setup logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Remove any existing handlers
logger.handlers = []
# The root logger already has a handler from basicConfig(); without this every
# record would be printed twice (once colored here, once plain by the root).
logger.propagate = False
# Create console handler with custom formatter
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(CustomFormatter())
logger.addHandler(ch)

# Device configuration (logged only after the logger is fully configured)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
logger.info(f"Using device: {device}")

# Model initialization
model_name = "SamanthaStorm/tether-multilabel-v4"
model = AutoModelForSequenceClassification.from_pretrained(model_name).to(device)
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)

# Sentiment model
sentiment_model = AutoModelForSequenceClassification.from_pretrained("SamanthaStorm/tether-sentiment").to(device)
sentiment_tokenizer = AutoTokenizer.from_pretrained("SamanthaStorm/tether-sentiment", use_fast=False)

emotion_pipeline = hf_pipeline(
    "text-classification",
    model="j-hartmann/emotion-english-distilroberta-base",
    return_all_scores=True,  # Get all emotion scores
    top_k=None,  # Don't limit to top k predictions
    truncation=True,
    device=0 if torch.cuda.is_available() else -1
)

# DARVO model
darvo_model = AutoModelForSequenceClassification.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1").to(device)
darvo_tokenizer = AutoTokenizer.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1", use_fast=False)
darvo_model.eval()

# Constants and Labels
# Order matters: LABELS[i] names the i-th output of the multilabel model.
LABELS = [
    "recovery phase", "control", "gaslighting", "guilt tripping", "dismissiveness",
    "blame shifting", "nonabusive", "projection", "insults",
    "contradictory statements", "obscure language"
]

SENTIMENT_LABELS = ["undermining", "supportive"]

# Per-label probability a score must exceed to count as a detected pattern.
THRESHOLDS = {
    "recovery phase": 0.324,
    "control": 0.433,
    "gaslighting": 0.285,
    "guilt tripping": 0.267,
    "dismissiveness": 0.123,
    "blame shifting": 0.116,
    "projection": 0.425,
    "insults": 0.347,
    "contradictory statements": 0.378,
    "obscure language": 0.206,
    "nonabusive": 0.094
}

# The model emits "recovery phase"; the short form "recovery" is kept as an
# alias because older code in this file used it.
RECOVERY_LABELS = ("recovery phase", "recovery")

PATTERN_WEIGHTS = {
    "recovery phase": 0.7,  # key now matches the label the model actually emits
    "recovery": 0.7,  # legacy alias
    "control": 1.4,
    "gaslighting": 1.3,
    "guilt tripping": 1.2,
    "dismissiveness": 0.9,
    "blame shifting": 1.0,  # Increased from 0.8
    "projection": 0.5,
    "insults": 1.4,  # Reduced from 2.1
    "contradictory statements": 1.0,
    "obscure language": 0.9,
    "nonabusive": 0.0
}

# Severity buckets used by the composite analysis.
SEVERITY_HIGH = frozenset({'control'})
SEVERITY_MODERATE = frozenset({
    'gaslighting', 'dismissiveness', 'obscure language',
    'insults', 'contradictory statements', 'guilt tripping'
})
SEVERITY_LOW = frozenset({'blame shifting', 'projection', 'recovery', 'recovery phase'})

# (question text, points) pairs; the points of ticked items are summed.
ESCALATION_QUESTIONS = [
    ("Partner has access to firearms or weapons", 4),
    ("Partner threatened to kill you", 3),
    ("Partner threatened you with a weapon", 3),
    ("Partner has ever choked you, even if you considered it consensual at the time", 4),
    ("Partner injured or threatened your pet(s)", 3),
    ("Partner has broken your things, punched or kicked walls, or thrown things ", 2),
    ("Partner forced or coerced you into unwanted sexual acts", 3),
    ("Partner threatened to take away your children", 2),
    ("Violence has increased in frequency or severity", 3),
    ("Partner monitors your calls/GPS/social media", 2)
]

RISK_STAGE_LABELS = {
    1: "πŸŒ€ Risk Stage: Tension-Building\nThis message reflects rising emotional pressure or subtle control attempts.",
    2: "πŸ”₯ Risk Stage: Escalation\nThis message includes direct or aggressive patterns, suggesting active harm.",
    3: "🌧️ Risk Stage: Reconciliation\nThis message reflects a reset attemptβ€”apologies or emotional repair without accountability.",
    4: "🌸 Risk Stage: Calm / Honeymoon\nThis message appears supportive but may follow prior harm, minimizing it."
}

THREAT_MOTIFS = [
    "i'll kill you", "i'm going to hurt you", "you're dead", "you won't survive this",
    "i'll break your face", "i'll bash your head in", "i'll snap your neck",
    "i'll come over there and make you shut up", "i'll knock your teeth out",
    "you're going to bleed", "you want me to hit you?", "i won't hold back next time",
    "i swear to god i'll beat you", "next time, i won't miss", "i'll make you scream",
    "i know where you live", "i'm outside", "i'll be waiting", "i saw you with him",
    "you can't hide from me", "i'm coming to get you", "i'll find you",
    "i know your schedule", "i watched you leave", "i followed you home",
    "you'll regret this", "you'll be sorry", "you're going to wish you hadn't",
    "you brought this on yourself", "don't push me",
    "you have no idea what i'm capable of", "you better watch yourself",
    "i don't care what happens to you anymore", "i'll make you suffer",
    "you'll pay for this", "i'll never let you go", "you're nothing without me",
    "if you leave me, i'll kill myself", "i'll ruin you",
    "i'll tell everyone what you did", "i'll make sure everyone knows",
    "i'm going to destroy your name", "you'll lose everyone", "i'll expose you",
    "your friends will hate you", "i'll post everything", "you'll be cancelled",
    "you'll lose everything", "i'll take the house", "i'll drain your account",
    "you'll never see a dime", "you'll be broke when i'm done",
    "i'll make sure you lose your job", "i'll take your kids",
    "i'll make sure you have nothing", "you can't afford to leave me",
    "don't make me do this", "you know what happens when i'm mad",
    "you're forcing my hand", "if you just behaved, this wouldn't happen",
    "this is your fault", "you're making me hurt you", "i warned you",
    "you should have listened"
]

# Profanity check. Most words keep plain substring semantics; "ass" is matched
# as a whole word (plus a few compounds) so that "class", "pass" or "assume"
# no longer force a high abuse score.
_EXPLICIT_ABUSE_PATTERN = re.compile(
    r"fuck|bitch|shit|dick|\b(?:dumb|jack|smart|bad)?ass(?:es|holes?)?\b"
)

# Weapon keywords must start a word, so "begun" or "skill" do not match while
# "guns", "killing" and "stabbed" still do.
_WEAPON_PATTERN = re.compile(r"\b(?:knife|gun|bomb|weapon|kill|stab)")

# Numeric rank of each risk label when the component risks are combined.
_RISK_RANK = {"Low": 0, "Moderate": 1, "High": 2, "Critical": 3, "Unknown": 0}

RISK_DESCRIPTIONS = {
    "Critical": (
        "🚨 **Risk Level: Critical**\n"
        "Multiple severe abuse patterns detected. This situation shows signs of "
        "dangerous escalation and immediate intervention may be needed."
    ),
    "High": (
        "⚠️ **Risk Level: High**\n"
        "Strong abuse patterns detected. This situation shows concerning "
        "signs of manipulation and control."
    ),
    "Moderate": (
        "⚑ **Risk Level: Moderate**\n"
        "Concerning patterns detected. While not severe, these behaviors "
        "indicate unhealthy relationship dynamics."
    ),
    "Low": (
        "πŸ“ **Risk Level: Low**\n"
        "Minor concerning patterns detected. While present, the detected "
        "behaviors are subtle or infrequent."
    )
}


def get_emotion_profile(text):
    """Get emotion profile from text with all scores.

    Returns a dict mapping lowercase emotion label to its score rounded to
    three decimals. An unexpected pipeline result yields an empty dict; an
    exception is logged and yields a zero-filled profile.
    """
    try:
        emotions = emotion_pipeline(text)
        if isinstance(emotions, list) and emotions:
            first = emotions[0]
            # The pipeline may wrap the per-label scores in an outer list
            # (one entry per input) or return them flat; accept both.
            scores = first if isinstance(first, list) else emotions
            return {
                entry['label'].lower(): round(entry['score'], 3)
                for entry in scores
                if isinstance(entry, dict)
            }
        return {}
    except Exception as e:
        logger.error(f"Error in get_emotion_profile: {e}")
        return {
            "sadness": 0.0,
            "joy": 0.0,
            "neutral": 0.0,
            "disgust": 0.0,
            "anger": 0.0,
            "fear": 0.0
        }


def get_emotional_tone_tag(text, sentiment, patterns, abuse_score):
    """Get emotional tone tag based on emotions and patterns.

    The rules are evaluated in order and the first one that matches wins;
    "neutral" is returned when none applies.

    Args:
        text: Message text passed to the emotion pipeline.
        sentiment: "undermining" or "supportive".
        patterns: Collection of detected pattern labels.
        abuse_score: Abuse score on a 0-100 scale.
    """
    emotions = get_emotion_profile(text)
    sadness = emotions.get("sadness", 0)
    joy = emotions.get("joy", 0)
    neutral = emotions.get("neutral", 0)
    disgust = emotions.get("disgust", 0)
    anger = emotions.get("anger", 0)
    fear = emotions.get("fear", 0)

    undermining = sentiment == "undermining"

    def has_any(*names):
        return any(name in patterns for name in names)

    # 1. Performative Regret
    if (
        sadness > 0.3
        and has_any("blame shifting", "guilt tripping", *RECOVERY_LABELS)
        and (undermining or abuse_score > 40)
    ):
        return "performative regret"

    # 2. Coercive Warmth
    if (joy > 0.2 or sadness > 0.3) and has_any("control", "gaslighting") and undermining:
        return "coercive warmth"

    # 3. Cold Invalidation
    if (
        (neutral + disgust) > 0.4
        and has_any("dismissiveness", "projection", "obscure language")
        and undermining
    ):
        return "cold invalidation"

    # 4. Genuine Vulnerability (only recovery patterns, or none at all)
    if (
        (sadness + fear) > 0.4
        and sentiment == "supportive"
        and all(p in RECOVERY_LABELS for p in patterns)
    ):
        return "genuine vulnerability"

    # 5. Emotional Threat
    if (
        (anger + disgust) > 0.4
        and has_any("control", "insults", "dismissiveness")
        and undermining
    ):
        return "emotional threat"

    # 6. Weaponized Sadness
    if sadness > 0.5 and has_any("guilt tripping", "projection") and undermining:
        return "weaponized sadness"

    # 7. Toxic Resignation
    if neutral > 0.4 and has_any("dismissiveness", "obscure language") and undermining:
        return "toxic resignation"

    # 8. Aggressive Dismissal
    if anger > 0.4 and has_any("insults", "control") and undermining:
        return "aggressive dismissal"

    # 9. Deflective Hostility
    if (
        (0.15 < anger < 0.6 or 0.15 < disgust < 0.6)
        and has_any("projection")
        and undermining
    ):
        return "deflective hostility"

    # 10. Contradictory Gaslight
    if (
        (joy + anger + sadness) > 0.4
        and has_any("gaslighting", "contradictory statements")
        and undermining
    ):
        return "contradictory gaslight"

    # 11. Forced Accountability Flip
    if (
        (anger + disgust) > 0.4
        and has_any("blame shifting", "projection")
        and undermining
    ):
        return "forced accountability flip"

    # Emotional Instability Fallback
    if (anger + sadness + disgust) > 0.5 and undermining:
        return "emotional instability"

    return "neutral"


@spaces.GPU
def predict_darvo_score(text):
    """Predict DARVO score for given text.

    Returns the sigmoid of the DARVO model's logit rounded to four decimals,
    or 0.0 if inference fails (the error is logged).
    """
    try:
        inputs = darvo_tokenizer(text, return_tensors="pt", truncation=True, padding=True)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            logits = darvo_model(**inputs).logits
        return round(sigmoid(logits.cpu()).item(), 4)
    except Exception as e:
        logger.error(f"Error in DARVO prediction: {e}")
        return 0.0


def detect_weapon_language(text):
    """Detect weapon-related language in text.

    Returns True when a weapon keyword starts any word of the lowercased text.
    """
    return _WEAPON_PATTERN.search(text.lower()) is not None


def get_risk_stage(patterns, sentiment):
    """Determine risk stage based on patterns and sentiment.

    Returns a key of RISK_STAGE_LABELS (1-4); 1 is the default and is also
    returned if the inputs cannot be evaluated.
    """
    try:
        if "insults" in patterns:
            return 2
        if any(label in patterns for label in RECOVERY_LABELS):
            return 3
        if "control" in patterns or "guilt tripping" in patterns:
            return 1
        if sentiment == "supportive" and any(
            p in patterns for p in ["projection", "dismissiveness"]
        ):
            return 4
        return 1
    except Exception as e:
        logger.error(f"Error determining risk stage: {e}")
        return 1


@spaces.GPU
def compute_abuse_score(matched_scores, sentiment):
    """Compute abuse score from matched patterns and sentiment.

    Args:
        matched_scores: List of (label, score, weight) tuples.
        sentiment: "supportive" reduces the score by 15%.

    Returns:
        The weighted-average score scaled to 0-100, boosted for multiple
        patterns and capped at 85 (90 when control/gaslighting is present).
        Returns 0.0 when nothing matched, when all weights are zero, or on error.
    """
    try:
        if not matched_scores:
            logger.debug("No matched scores, returning 0")
            return 0.0

        # Calculate weighted score
        total_weight = sum(weight for _, _, weight in matched_scores)
        if total_weight == 0:
            logger.debug("Total weight is 0, returning 0")
            return 0.0

        # Get highest pattern scores (for the debug log only)
        pattern_scores = [(label, score) for label, score, _ in matched_scores]
        sorted_scores = sorted(pattern_scores, key=lambda x: x[1], reverse=True)
        logger.debug(f"Sorted pattern scores: {sorted_scores}")

        # Base score calculation
        weighted_sum = sum(score * weight for _, score, weight in matched_scores)
        base_score = (weighted_sum / total_weight) * 100
        logger.debug(f"Initial base score: {base_score:.1f}")

        # Cap maximum score based on pattern severity
        max_score = 85.0  # Set maximum possible score
        if any(label in {'control', 'gaslighting'} for label, _, _ in matched_scores):
            max_score = 90.0
            logger.debug(f"Increased max score to {max_score} due to high severity patterns")

        # Boost the score by 10% for every additional matched pattern
        if len(matched_scores) > 1:
            multiplier = 1 + (0.1 * (len(matched_scores) - 1))
            base_score *= multiplier
            logger.debug(f"Applied multiplier {multiplier:.2f} for {len(matched_scores)} patterns")

        # Apply sentiment modifier
        if sentiment == "supportive":
            base_score *= 0.85
            logger.debug("Applied 15% reduction for supportive sentiment")

        final_score = min(round(base_score, 1), max_score)
        logger.debug(f"Final abuse score: {final_score}")
        return final_score

    except Exception as e:
        logger.error(f"Error computing abuse score: {e}")
        return 0.0


@spaces.GPU
def analyze_single_message(text, thresholds):
    """Analyze a single message for abuse patterns.

    Args:
        text: The message to analyze.
        thresholds: Mapping of label -> minimum score (0.25 if a label is absent).

    Returns:
        A 7-tuple ``(abuse_score, pattern_labels, matched_scores,
        {"label": sentiment}, stage, darvo_score, tone_tag)``.
        Empty text yields zeros with sentiment "none"; a failure yields zeros
        with sentiment "error"; a message judged non-abusive yields zeros
        with sentiment "supportive" and tone "neutral".
    """
    logger.debug("\n=== DEBUG START ===")
    logger.debug(f"Input text: {text}")

    try:
        if not text.strip():
            logger.debug("Empty text, returning zeros")
            return 0.0, [], [], {"label": "none"}, 1, 0.0, None

        # Check for explicit abuse
        explicit_abuse = _EXPLICIT_ABUSE_PATTERN.search(text.lower()) is not None
        logger.debug(f"Explicit abuse detected: {explicit_abuse}")

        # Abuse model inference
        inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = model(**inputs)
        raw_scores = torch.sigmoid(outputs.logits.squeeze(0)).cpu().numpy()

        # Log raw model outputs
        logger.debug("\nRaw model scores:")
        for label, score in zip(LABELS, raw_scores):
            logger.debug(f"{label}: {score:.3f}")

        # Get predictions and sort them
        predictions = list(zip(LABELS, raw_scores))
        sorted_predictions = sorted(predictions, key=lambda x: x[1], reverse=True)
        logger.debug("\nTop 3 predictions:")
        for label, score in sorted_predictions[:3]:
            logger.debug(f"{label}: {score:.3f}")

        # Apply thresholds
        threshold_labels = []
        if explicit_abuse:
            threshold_labels.append("insults")
            logger.debug("\nForced inclusion of 'insults' due to explicit abuse")

        for label, score in sorted_predictions:
            base_threshold = thresholds.get(label, 0.25)
            if explicit_abuse:
                # Explicit language halves every threshold.
                base_threshold *= 0.5
            if score > base_threshold and label not in threshold_labels:
                threshold_labels.append(label)

        logger.debug("\nLabels that passed thresholds: %s", threshold_labels)

        # Calculate matched scores
        matched_scores = []
        for label in threshold_labels:
            score = raw_scores[LABELS.index(label)]
            weight = PATTERN_WEIGHTS.get(label, 1.0)
            if explicit_abuse and label == "insults":
                weight *= 1.5
            matched_scores.append((label, score, weight))

        # Get sentiment
        sent_inputs = sentiment_tokenizer(text, return_tensors="pt", truncation=True, padding=True)
        sent_inputs = {k: v.to(device) for k, v in sent_inputs.items()}
        with torch.no_grad():
            sent_logits = sentiment_model(**sent_inputs).logits[0]
        sent_probs = torch.softmax(sent_logits, dim=-1).cpu().numpy()
        sentiment = SENTIMENT_LABELS[int(np.argmax(sent_probs))]

        # Calculate abuse score
        abuse_score = compute_abuse_score(matched_scores, sentiment)
        if explicit_abuse:
            abuse_score = max(abuse_score, 70.0)

        # Get DARVO score
        darvo_score = predict_darvo_score(text)

        # Get tone using emotion-based approach
        tone_tag = get_emotional_tone_tag(text, sentiment, threshold_labels, abuse_score)

        # Supportive + neutral tone + "obscure language" as the strongest
        # pattern is treated as a likely false positive.
        highest_pattern = max(matched_scores, key=lambda x: x[1])[0] if matched_scores else None
        if sentiment == "supportive" and tone_tag == "neutral" and highest_pattern == "obscure language":
            logger.debug("Message classified as likely non-abusive (supportive, neutral, and obscure language). Returning low risk.")
            return 0.0, [], [], {"label": "supportive"}, 1, 0.0, "neutral"  # Return non-abusive values

        # Set stage
        stage = 2 if explicit_abuse or abuse_score > 70 else 1

        logger.debug("=== DEBUG END ===\n")

        return abuse_score, threshold_labels, matched_scores, {"label": sentiment}, stage, darvo_score, tone_tag

    except Exception as e:
        logger.error(f"Error in analyze_single_message: {e}")
        return 0.0, [], [], {"label": "error"}, 1, 0.0, None


def generate_abuse_score_chart(dates, scores, patterns):
    """Generate a timeline chart of abuse scores.

    Args:
        dates: X-axis tick labels, one per score.
        scores: Abuse scores (0-100), one per message.
        patterns: Label annotated next to each point.

    Returns:
        A PIL image of the rendered chart, or None if rendering fails.
    """
    fig = None
    try:
        fig, ax = plt.subplots(figsize=(10, 6))

        # Plot points and lines
        x = list(range(len(scores)))
        ax.plot(x, scores, 'bo-', linewidth=2, markersize=8)

        # Add labels for each point with highest scoring pattern
        for i, (score, pattern) in enumerate(zip(scores, patterns)):
            ax.annotate(
                f'{pattern}\n{score:.0f}%',
                (i, score),
                textcoords="offset points",
                xytext=(0, 10),
                ha='center',
                bbox=dict(
                    boxstyle='round,pad=0.5',
                    fc='white',
                    ec='gray',
                    alpha=0.8
                )
            )

        # Customize the plot
        ax.set_ylim(-5, 105)
        ax.grid(True, linestyle='--', alpha=0.7)
        ax.set_title('Abuse Pattern Timeline', pad=20, fontsize=12)
        ax.set_ylabel('Abuse Score %')

        # X-axis labels
        ax.set_xticks(x)
        ax.set_xticklabels(dates, rotation=45)

        # Risk level bands with better colors
        ax.axhspan(0, 50, color='#90EE90', alpha=0.2)  # light green - Low Risk
        ax.axhspan(50, 70, color='#FFD700', alpha=0.2)  # gold - Moderate Risk
        ax.axhspan(70, 85, color='#FFA500', alpha=0.2)  # orange - High Risk
        ax.axhspan(85, 100, color='#FF6B6B', alpha=0.2)  # light red - Critical Risk

        # Add risk level labels
        ax.text(-0.2, 25, 'Low Risk', rotation=90, va='center')
        ax.text(-0.2, 60, 'Moderate Risk', rotation=90, va='center')
        ax.text(-0.2, 77.5, 'High Risk', rotation=90, va='center')
        ax.text(-0.2, 92.5, 'Critical Risk', rotation=90, va='center')

        # Adjust layout
        fig.tight_layout()

        # Convert plot to image
        buf = io.BytesIO()
        fig.savefig(buf, format='png', bbox_inches='tight')
        buf.seek(0)
        return Image.open(buf)
    except Exception as e:
        logger.error(f"Error generating abuse score chart: {e}")
        return None
    finally:
        # Always release the figure, including on failure, to prevent leaks.
        if fig is not None:
            plt.close(fig)


def _normalize_for_matching(text):
    """Lowercase, NFKD-normalize and strip everything except [a-z0-9 ]."""
    import unicodedata

    text = unicodedata.normalize("NFKD", text.lower().strip())
    return re.sub(r"[^a-z0-9 ]", "", text)


def _detect_threat_motifs(message, normalized_motifs):
    """Return the motifs whose normalized form occurs in the normalized message."""
    norm_msg = _normalize_for_matching(message)
    return [motif for motif, norm_motif in normalized_motifs if norm_motif in norm_msg]


def _score_checklist(answers_and_none):
    """Return the checklist score, or None when the checklist was not completed."""
    logger.debug("\nπŸ“‹ CHECKLIST PROCESSING")
    logger.debug("=" * 50)

    if not answers_and_none:
        # No checklist values were supplied at all.
        logger.debug("\n❗ Checklist: Not completed")
        return None

    # The final value is the "None of the above" box; the rest are the questions.
    *answers, none_selected_checked = answers_and_none
    responses_checked = any(answers)
    none_selected = not responses_checked and none_selected_checked

    logger.debug("Checklist Status:")
    logger.debug(f" β€’ None Selected Box: {'βœ“' if none_selected_checked else 'βœ—'}")
    logger.debug(f" β€’ Has Responses: {'βœ“' if responses_checked else 'βœ—'}")
    logger.debug(f" β€’ Final Status: {'None Selected' if none_selected else 'Has Selections'}")

    if none_selected:
        logger.debug("\nβœ“ Checklist: No items selected")
        return 0

    if responses_checked:
        selected = [(q, w) for (q, w), a in zip(ESCALATION_QUESTIONS, answers) if a]
        escalation_score = sum(w for _, w in selected)
        logger.debug(f"\nπŸ“Š Checklist Score: {escalation_score}")
        # Log checked items
        logger.debug("\n⚠️ Selected Risk Factors:")
        for q, w in selected:
            logger.debug(f" β€’ [{w} points] {q}")
        return escalation_score

    logger.debug("\n❗ Checklist: Not completed")
    return None


def _is_non_abusive_result(result):
    """Return True for the sentinel tuple analyze_single_message uses for non-abusive text."""
    return (
        result[0] == 0.0
        and result[1] == []
        and result[3] == {"label": "supportive"}
        and result[4] == 1
        and result[5] == 0.0
        and result[6] == "neutral"
    )


def _log_message_result(result, label):
    """Write one message's metrics and detected patterns to the debug log."""
    abuse_score, patterns, matched_scores, sentiment, stage, darvo_score, tone = result
    logger.debug(f"\nπŸ“Š Results for {label}:")
    logger.debug(f" β€’ Abuse Score: {abuse_score:.1f}%")
    logger.debug(f" β€’ DARVO Score: {darvo_score:.3f}")
    logger.debug(f" β€’ Risk Stage: {stage}")
    logger.debug(f" β€’ Sentiment: {sentiment['label']}")
    logger.debug(f" β€’ Tone: {tone}")

    if not patterns:
        logger.debug("\nβœ“ No abuse patterns detected")
        return

    logger.debug(" β€’ Patterns: " + ", ".join(patterns))
    logger.debug("\n🎯 DETECTED PATTERNS")
    for pattern, score, weight in matched_scores:
        if pattern in SEVERITY_HIGH:
            severity = "❗HIGH"
        elif pattern in SEVERITY_MODERATE:
            severity = "⚠️ MODERATE"
        else:
            severity = "πŸ“ LOW"
        logger.debug(f" β€’ {severity} | {pattern}: {score:.3f} (weight: {weight})")


def _log_pattern_summary(predicted_labels):
    """Log how often each pattern occurred across all messages, grouped by severity."""
    logger.debug("\nπŸ“ˆ PATTERN ANALYSIS SUMMARY")
    logger.debug("=" * 50)
    if not predicted_labels:
        logger.debug("βœ“ No patterns detected across messages")
        return

    logger.debug("Detected Patterns Across All Messages:")
    pattern_counts = Counter(predicted_labels)
    groups = (
        ("\n❗ HIGH SEVERITY PATTERNS:", SEVERITY_HIGH),
        ("\n⚠️ MODERATE SEVERITY PATTERNS:", SEVERITY_MODERATE),
        ("\nπŸ“ LOW SEVERITY PATTERNS:", SEVERITY_LOW),
    )
    for header, members in groups:
        present = [p for p in pattern_counts if p in members]
        if present:
            logger.debug(header)
            for p in present:
                logger.debug(f" β€’ {p} (Γ—{pattern_counts[p]})")


def _count_severities(predicted_labels):
    """Count pattern occurrences per severity bucket and log the distribution."""
    logger.debug("\nβš–οΈ SEVERITY ANALYSIS")
    logger.debug("=" * 50)
    counts = {'high': 0, 'moderate': 0, 'low': 0}
    for label in predicted_labels:
        if label in SEVERITY_HIGH:
            counts['high'] += 1
        elif label in SEVERITY_MODERATE:
            counts['moderate'] += 1
        elif label in SEVERITY_LOW:
            counts['low'] += 1

    logger.debug("Pattern Distribution:")
    if counts['high'] > 0:
        logger.debug(f" ❗ High Severity: {counts['high']}")
    if counts['moderate'] > 0:
        logger.debug(f" ⚠️ Moderate Severity: {counts['moderate']}")
    if counts['low'] > 0:
        logger.debug(f" πŸ“ Low Severity: {counts['low']}")

    total_patterns = sum(counts.values())
    if total_patterns > 0:
        logger.debug(f"\nSeverity Percentages:")
        logger.debug(f" β€’ High: {(counts['high']/total_patterns)*100:.1f}%")
        logger.debug(f" β€’ Moderate: {(counts['moderate']/total_patterns)*100:.1f}%")
        logger.debug(f" β€’ Low: {(counts['low']/total_patterns)*100:.1f}%")
    return counts


def _assess_pattern_risk(counts):
    """Map severity counts to "Critical", "High", "Moderate" or "Low"."""
    logger.debug("\n🎯 RISK ASSESSMENT")
    logger.debug("=" * 50)
    high, moderate, low = counts['high'], counts['moderate'], counts['low']

    if high >= 2 and moderate >= 2:
        logger.debug("❗❗ CRITICAL RISK")
        logger.debug(" β€’ Multiple high and moderate patterns detected")
        logger.debug(f" β€’ High patterns: {high}")
        logger.debug(f" β€’ Moderate patterns: {moderate}")
        return "Critical"

    if (high >= 2 and moderate >= 1) or moderate >= 3 or (high >= 1 and moderate >= 2):
        logger.debug("❗ HIGH RISK")
        logger.debug(" β€’ Significant pattern combination detected")
        logger.debug(f" β€’ High patterns: {high}")
        logger.debug(f" β€’ Moderate patterns: {moderate}")
        return "High"

    if (
        moderate == 2
        or (high == 1 and moderate == 1)
        or (moderate == 1 and low >= 2)
        or (high == 1 and sum(counts.values()) == 1)
    ):
        logger.debug("⚠️ MODERATE RISK")
        logger.debug(" β€’ Concerning pattern combination detected")
        logger.debug(f" β€’ Pattern distribution: H:{high}, M:{moderate}, L:{low}")
        return "Moderate"

    logger.debug("πŸ“ LOW RISK")
    logger.debug(" β€’ Limited pattern severity detected")
    logger.debug(f" β€’ Pattern distribution: H:{high}, M:{moderate}, L:{low}")
    return "Low"


def _assess_checklist_risk(escalation_score):
    """Map the checklist score to a risk label ("Unknown" when the score is None)."""
    logger.debug("\nπŸ“‹ CHECKLIST RISK ASSESSMENT")
    logger.debug("=" * 50)
    if escalation_score is None:
        logger.debug("❓ Risk Level: Unknown (checklist not completed)")
        return "Unknown"

    if escalation_score >= 20:
        risk, note = "Critical", "❗❗ CRITICAL: Score indicates severe risk"
    elif escalation_score >= 10:
        risk, note = "Moderate", "⚠️ MODERATE: Score indicates concerning risk"
    else:
        risk, note = "Low", "πŸ“ LOW: Score indicates limited risk"

    logger.debug(f"Score: {escalation_score}/29")
    logger.debug(f"Risk Level: {risk}")
    logger.debug(note)
    return risk


def _compute_escalation_bump(results):
    """Sum the per-message escalation points (DARVO, tone, abuse score, stage)."""
    logger.debug("\nπŸ“ˆ ESCALATION ANALYSIS")
    logger.debug("=" * 50)
    escalation_bump = 0
    for result, msg_id in results:
        abuse_score, _, _, _, stage, darvo_score, tone_tag = result
        logger.debug(f"\nπŸ” {msg_id} Risk Factors:")

        factors = []
        if darvo_score > 0.65:
            escalation_bump += 3
            factors.append(f"β–² +3: High DARVO score ({darvo_score:.3f})")
        if tone_tag in ["forced accountability flip", "emotional threat"]:
            escalation_bump += 2
            factors.append(f"β–² +2: Concerning tone ({tone_tag})")
        if abuse_score > 80:
            escalation_bump += 2
            factors.append(f"β–² +2: High abuse score ({abuse_score:.1f}%)")
        if stage == 2:
            escalation_bump += 3
            factors.append("β–² +3: Escalation stage")

        if factors:
            for factor in factors:
                logger.debug(f" {factor}")
        else:
            logger.debug(" βœ“ No escalation factors")

    logger.debug(f"\nπŸ“Š Total Escalation Bump: +{escalation_bump}")
    return escalation_bump


def _combine_escalation_risk(pattern_risk, checklist_risk, escalation_bump):
    """Combine pattern risk, checklist risk and bump into the final escalation label."""
    logger.debug("\n🎯 FINAL RISK CALCULATION")
    logger.debug("=" * 50)
    pattern_rank = _RISK_RANK.get(pattern_risk, 0)
    checklist_rank = _RISK_RANK.get(checklist_risk, 0)
    combined_score = pattern_rank + checklist_rank + escalation_bump

    logger.debug("Risk Components:")
    logger.debug(f" β€’ Pattern Risk ({pattern_risk}): +{pattern_rank}")
    logger.debug(f" β€’ Checklist Risk ({checklist_risk}): +{checklist_rank}")
    logger.debug(f" β€’ Escalation Bump: +{escalation_bump}")
    logger.debug(f" = Combined Score: {combined_score}")

    if combined_score >= 6:
        escalation_risk = "Critical"
    elif combined_score >= 4:
        escalation_risk = "High"
    elif combined_score >= 2:
        escalation_risk = "Moderate"
    else:
        escalation_risk = "Low"
    logger.debug(f"\n⚠️ Final Escalation Risk: {escalation_risk}")
    return escalation_risk


def _build_escalation_text(escalation_score, escalation_bump, escalation_risk,
                           pattern_risk, checklist_risk):
    """Return ``(escalation_text, hybrid_score)`` for the report."""
    logger.debug("\nπŸ“ GENERATING OUTPUT")
    logger.debug("=" * 50)

    if escalation_score is None:
        escalation_text = (
            "🚫 **Escalation Potential: Unknown** (Checklist not completed)\n"
            "⚠️ This section was not completed. Escalation potential is estimated using message data only.\n"
        )
        logger.debug("Generated output for incomplete checklist")
        return escalation_text, 0

    if escalation_score == 0:
        escalation_text = (
            "βœ… **Escalation Checklist Completed:** No danger items reported.\n"
            "🧭 **Escalation potential estimated from detected message patterns only.**\n"
            f"β€’ Pattern Risk: {pattern_risk}\n"
            f"β€’ Checklist Risk: None reported\n"
            f"β€’ Escalation Bump: +{escalation_bump} (from DARVO, tone, intensity, etc.)"
        )
        logger.debug("Generated output for no-risk checklist")
        return escalation_text, escalation_bump

    hybrid_score = escalation_score + escalation_bump
    escalation_text = (
        f"πŸ“ˆ **Escalation Potential: {escalation_risk} ({hybrid_score}/29)**\n"
        "πŸ“‹ This score combines your safety checklist answers *and* detected high-risk behavior.\n"
        f"β€’ Pattern Risk: {pattern_risk}\n"
        f"β€’ Checklist Risk: {checklist_risk}\n"
        f"β€’ Escalation Bump: +{escalation_bump} (from DARVO, tone, intensity, etc.)"
    )
    logger.debug(f"Generated output with hybrid score: {hybrid_score}/29")
    return escalation_text, hybrid_score


def _format_threat_section(flat_threats):
    """Return the report section listing matched threat phrases (or their absence)."""
    logger.debug("\n⚠️ Adding Threat Analysis")
    if not flat_threats:
        logger.debug("No threats to add")
        return (
            "\n\n🧩 **Immediate Danger Threats:** None explicitly detected.\n"
            "This does *not* rule out risk, but no direct threat phrases were matched."
        )

    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # report is stable between runs.
    unique_threats = list(dict.fromkeys(flat_threats))
    section = "\n\n🚨 **Immediate Danger Threats Detected:**\n"
    for t in unique_threats:
        section += f"β€’ \"{t}\"\n"
    section += "\n⚠️ These phrases may indicate an imminent risk to physical safety."
    logger.debug(f"Added {len(unique_threats)} unique threat warnings")
    return section


def analyze_composite(msg1, msg2, msg3, *answers_and_none):
    """Analyze multiple messages and checklist responses.

    Args:
        msg1, msg2, msg3: Message texts; blank or None entries are ignored.
        *answers_and_none: One truthy/falsy value per entry of
            ESCALATION_QUESTIONS, followed by the "None of the above" value.

    Returns:
        ``(report_text, timeline_image)``. ``timeline_image`` is None when no
        message was provided, when every message was classified as
        non-abusive, when the chart could not be rendered, or on error.
    """
    logger.debug("\nπŸ”„ STARTING NEW ANALYSIS")
    logger.debug("=" * 50)

    try:
        # Process checklist responses
        escalation_score = _score_checklist(answers_and_none)

        # Process messages
        logger.debug("\nπŸ“ MESSAGE PROCESSING")
        logger.debug("=" * 50)
        messages = [msg1, msg2, msg3]
        active = [(m, f"Message {i+1}") for i, m in enumerate(messages) if m and m.strip()]
        logger.debug(f"Active Messages: {len(active)} of 3")

        if not active:
            logger.debug("❌ Error: No messages provided")
            return "Please enter at least one message.", None

        # Detect threats
        logger.debug("\n🚨 THREAT DETECTION")
        logger.debug("=" * 50)
        normalized_motifs = [(motif, _normalize_for_matching(motif)) for motif in THREAT_MOTIFS]
        flat_threats = [
            threat
            for m, _ in active
            for threat in _detect_threat_motifs(m, normalized_motifs)
        ]

        # Analyze each message
        logger.debug("\nπŸ” INDIVIDUAL MESSAGE ANALYSIS")
        logger.debug("=" * 50)
        results = []
        for m, d in active:
            logger.debug(f"\nπŸ“ ANALYZING {d}")
            logger.debug("-" * 40)  # Separator for each message
            result = analyze_single_message(m, THRESHOLDS.copy())

            # Check for non-abusive classification and skip further analysis
            if _is_non_abusive_result(result):
                logger.debug(f"βœ“ {d} classified as non-abusive, skipping further analysis.")
                continue

            results.append((result, d))
            _log_message_result(result, d)

        if not results:
            # Every message was skipped; the averages below would otherwise
            # divide by zero and surface as a generic error.
            out = "No abuse patterns were detected in the message(s) provided."
            out += _format_threat_section(flat_threats)
            return out, None

        # Extract scores and metadata
        abuse_scores = [r[0][0] for r in results]
        stages = [r[0][4] for r in results]
        darvo_scores = [r[0][5] for r in results]
        tone_tags = [r[0][6] for r in results]
        dates_used = [r[1] for r in results]

        # Pattern analysis and risk components
        predicted_labels = [label for r in results for label in r[0][1]]
        _log_pattern_summary(predicted_labels)
        counts = _count_severities(predicted_labels)
        pattern_escalation_risk = _assess_pattern_risk(counts)
        checklist_escalation_risk = _assess_checklist_risk(escalation_score)
        escalation_bump = _compute_escalation_bump(results)
        escalation_risk = _combine_escalation_risk(
            pattern_escalation_risk, checklist_escalation_risk, escalation_bump
        )
        escalation_text, hybrid_score = _build_escalation_text(
            escalation_score, escalation_bump, escalation_risk,
            pattern_escalation_risk, checklist_escalation_risk
        )

        # Final Metrics
        logger.debug("\nπŸ“Š FINAL METRICS")
        logger.debug("=" * 50)
        composite_abuse = int(round(sum(abuse_scores) / len(abuse_scores)))
        logger.debug(f"Composite Abuse Score: {composite_abuse}%")
        most_common_stage = max(set(stages), key=stages.count)
        logger.debug(f"Most Common Stage: {most_common_stage}")
        avg_darvo = round(sum(darvo_scores) / len(darvo_scores), 3)
        logger.debug(f"Average DARVO Score: {avg_darvo}")

        # Generate Final Report
        logger.debug("\nπŸ“„ GENERATING FINAL REPORT")
        logger.debug("=" * 50)
        out = f"Abuse Intensity: {composite_abuse}%\n"
        out += "πŸ“Š This reflects the strength and severity of detected abuse patterns in the message(s).\n\n"

        # Risk Level Assessment
        if composite_abuse >= 85 or hybrid_score >= 20:
            risk_level = "Critical"
        elif composite_abuse >= 70 or hybrid_score >= 15:
            risk_level = "High"
        elif composite_abuse >= 50 or hybrid_score >= 10:
            risk_level = "Moderate"
        else:
            risk_level = "Low"
        logger.debug(f"Final Risk Level: {risk_level}")

        # Add Risk Description
        out += RISK_DESCRIPTIONS[risk_level]
        out += f"\n\n{RISK_STAGE_LABELS[most_common_stage]}"
        logger.debug("Added risk description and stage information")

        # Add DARVO Analysis
        if avg_darvo > 0.25:
            level = "moderate" if avg_darvo < 0.65 else "high"
            out += f"\n\n🎭 **DARVO Score: {avg_darvo}** β†’ This indicates a **{level} likelihood** of narrative reversal (DARVO), where the speaker may be denying, attacking, or reversing blame."
            logger.debug(f"Added DARVO analysis ({level} level)")

        # Add Emotional Tones, labelled with the real message name so the
        # numbering stays correct when a message was blank or skipped.
        logger.debug("\n🎭 Adding Emotional Tones")
        out += "\n\n🎭 **Emotional Tones Detected:**\n"
        for label, tone in zip(dates_used, tone_tags):
            out += f"β€’ {label}: *{tone or 'none'}*\n"
            logger.debug(f"{label} tone: {tone}")

        # Add Threats Section
        out += _format_threat_section(flat_threats)

        # Generate Timeline
        logger.debug("\nπŸ“ˆ Generating Timeline")
        pattern_labels = []
        for result, _ in results:
            matched_scores = result[2]
            if matched_scores:
                # Annotate each point with its highest-scoring pattern
                pattern_labels.append(max(matched_scores, key=lambda x: x[1])[0])
            else:
                pattern_labels.append("none")

        logger.debug("Pattern labels for timeline:")
        for label, pattern, score in zip(dates_used, pattern_labels, abuse_scores):
            logger.debug(f"{label}: {pattern} ({score:.1f}%)")

        timeline_image = generate_abuse_score_chart(dates_used, abuse_scores, pattern_labels)
        logger.debug("Timeline generated successfully")

        # Add Escalation Text
        out += "\n\n" + escalation_text
        logger.debug("Added escalation text to output")

        logger.debug("\nβœ… ANALYSIS COMPLETE")
        logger.debug("=" * 50)
        return out, timeline_image

    except Exception as e:
        logger.error("\n❌ ERROR IN ANALYSIS")
        logger.error("=" * 50)
        logger.error(f"Error type: {type(e).__name__}")
        logger.error(f"Error message: {str(e)}")
        logger.error(f"Traceback:\n{traceback.format_exc()}")
        return "An error occurred during analysis.", None


# Gradio Interface Setup
def create_interface():
    """Build the Gradio interface: three message boxes plus the checklist.

    The input order (messages, one checkbox per escalation question, then
    "None of the above") matches analyze_composite's positional parameters.
    Any construction error is logged and re-raised.
    """
    try:
        textbox_inputs = [gr.Textbox(label=f"Message {i+1}") for i in range(3)]
        quiz_boxes = [gr.Checkbox(label=q) for q, _ in ESCALATION_QUESTIONS]
        none_box = gr.Checkbox(label="None of the above")

        demo = gr.Interface(
            fn=analyze_composite,
            inputs=textbox_inputs + quiz_boxes + [none_box],
            outputs=[
                gr.Textbox(label="Results"),
                gr.Image(label="Abuse Score Timeline", type="pil")
            ],
            title="Abuse Pattern Detector + Escalation Quiz",
            description=(
                "Enter up to three messages that concern you. "
                "For the most accurate results, include messages from a recent emotionally intense period."
            ),
            flagging_mode="manual"
        )
        return demo
    except Exception as e:
        logger.error(f"Error creating interface: {e}")
        raise


# Main execution
if __name__ == "__main__":
    try:
        demo = create_interface()
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=False
        )
    except Exception as e:
        logger.error(f"Failed to launch app: {e}")
        raise