"""Compute a Cluster Sensitivity Index (CSI) between two token clusterings.

Two cluster files are loaded, every cluster of the first is compared with
every cluster of the second using Jaccard similarity, clusters are paired
one-to-one so that the total similarity is maximal, and
``CSI = 1 - mean(matched Jaccard)`` is printed and appended to a CSV file.
"""
import csv
import os
from collections import defaultdict

import numpy as np
from scipy.optimize import linear_sum_assignment


def load_clusters(path):
    """Read a cluster file and group its tokens by cluster id.

    Each line is split on ``"|||"``; the first field is taken as the token
    and the last field as the cluster id. Lines that yield fewer than two
    fields are skipped.

    Args:
        path: Path of the UTF-8 encoded cluster file.

    Returns:
        A ``defaultdict(set)`` mapping cluster id (str) to the set of tokens
        assigned to that cluster.
    """
    cluster_to_tokens = defaultdict(set)
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            parts = line.strip().split("|||")
            if len(parts) < 2:
                continue
            cluster_to_tokens[parts[-1]].add(parts[0])
    return cluster_to_tokens


def compute_jaccard_matrix(clusters_a, clusters_b):
    """Build the pairwise Jaccard similarity matrix of two clusterings.

    Args:
        clusters_a: Mapping of cluster id to a set of tokens.
        clusters_b: Mapping of cluster id to a set of tokens.

    Returns:
        A tuple ``(matrix, a_keys, b_keys)`` where ``matrix[i, j]`` is the
        Jaccard similarity between ``clusters_a[a_keys[i]]`` and
        ``clusters_b[b_keys[j]]`` (0.0 when both sets are empty).
    """
    a_keys = list(clusters_a.keys())
    b_keys = list(clusters_b.keys())
    matrix = np.zeros((len(a_keys), len(b_keys)))
    for i, ca in enumerate(a_keys):
        set_a = clusters_a[ca]
        for j, cb in enumerate(b_keys):
            set_b = clusters_b[cb]
            intersection = len(set_a & set_b)
            # |A u B| = |A| + |B| - |A n B|; avoids materialising the union.
            union = len(set_a) + len(set_b) - intersection
            matrix[i, j] = intersection / union if union > 0 else 0.0
    return matrix, a_keys, b_keys


# Dictionary mapping perturbation names to their descriptions.
# The texts are written verbatim into the CSV "Description" column.
perturbation_descriptions = {
    "Scope Modification": "Identifies variables in complex scopes and moves them to unrelated blocks.",
    "Log Modification": "Adds logging statements to blocks of code for tracking execution flow.",
    "Operator Modification": "Modifies boolean expressions by negating them in various contexts.",
    "Pointer Modification": "Add C style pointer to the code.",
    "POS finetuned": "Clusters based on finetuned POS codebert model",
    "Random Modification": "Permutes statements within basic blocks, allowing different execution orders.",
    # NOTE(review): this text describes switch/if conversion, not try/catch;
    # it looks copy-pasted -- confirm the intended description.
    "Try Catch Modification": "Converts switch statements into equivalent if statements.",
    "Unused Statement Modification": "Inserts unused statements into blocks of code for testing/debugging.",
    "Exact Name Class Variable Modification": "Renames classes and variables to a specific randomly generated name.",
    "Casing Class Variable Modification": "Generates lexical variations of class and variable names with different casing.",
    # NOTE(review): "wither" / "anme" look like typos for "either" / "name";
    # kept unchanged so rows stay identical to those already logged.
    "Onecase Modification": (
        "Generates lexical variations of class and variable names with just 1 letter uppercase wither for "
        "class anme or variable name."
    ),
    "Example Modification": "Generates lexical variations of class and variable names with Example being the class name and example being the variable name or vice versa.",
    "Finetuned on compile error": "Clusters based on finetuned codebert model on compile errors",
    "Finetuned on language classification": "Clusters based on finetuned codebert model on language classification",
}


def compute_and_log_csi(file_orig, file_pert, perturbation_name, output_csv="results/csi_summary.csv"):
    """Compute the CSI between two cluster files and append it to a CSV log.

    Clusters of the two files are matched one-to-one with
    ``linear_sum_assignment`` on the negated Jaccard matrix (i.e. the
    assignment maximising total similarity). The result is printed and one
    row ``[perturbation_name, avg_jaccard, csi, description]`` is appended to
    ``output_csv``; a header row is written first when the file does not
    exist yet or is empty.

    Args:
        file_orig: Path of the cluster file of the original data.
        file_pert: Path of the cluster file of the perturbed data.
        perturbation_name: Label for the CSV row; also the lookup key into
            ``perturbation_descriptions``.
        output_csv: CSV file to append to. Its parent directory is created
            if the path has one.

    Returns:
        A tuple ``(avg_jaccard, csi)`` with ``csi = 1.0 - avg_jaccard``.

    Raises:
        ValueError: If the two files contain a different number of clusters.
    """
    clusters_orig = load_clusters(file_orig)
    clusters_pert = load_clusters(file_pert)

    if len(clusters_orig) != len(clusters_pert):
        raise ValueError(f"Cluster count mismatch: {len(clusters_orig)} (original) vs {len(clusters_pert)} (perturbed)")

    jaccard_matrix, _orig_ids, _pert_ids = compute_jaccard_matrix(clusters_orig, clusters_pert)

    # linear_sum_assignment minimises cost, so negate to maximise similarity.
    row_ind, col_ind = linear_sum_assignment(-jaccard_matrix)
    avg_jaccard = np.mean(jaccard_matrix[row_ind, col_ind])
    csi = 1.0 - avg_jaccard

    print(f"Perturbation: {perturbation_name}")
    print(f"  Average Jaccard Similarity: {avg_jaccard:.4f}")
    print(f"  Cluster Sensitivity Index (CSI): {csi:.4f}")

    # Append to CSV.
    # os.makedirs("") raises FileNotFoundError, so only create the directory
    # when output_csv actually has a directory component.
    out_dir = os.path.dirname(output_csv)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    # An existing but empty file still needs its header row.
    needs_header = not os.path.isfile(output_csv) or os.path.getsize(output_csv) == 0
    with open(output_csv, mode="a", newline='', encoding="utf-8") as file:
        writer = csv.writer(file)
        if needs_header:
            writer.writerow(["Perturbation", "Average Jaccard", "CSI", "Description"])
        writer.writerow([
            perturbation_name,
            avg_jaccard,
            csi,
            perturbation_descriptions.get(perturbation_name, "No description available"),
        ])

    return avg_jaccard, csi


def _cluster_file(dataset, lang_dir):
    """Return the layer-12 k-means (k=350) cluster file path of a dataset."""
    return f"{dataset}/{lang_dir}/layer12/kmeans/clusters-kmeans-350.txt"


_BASELINE = _cluster_file("codenet_4000_del_15000", "Java")

# (original cluster file, perturbed cluster file, perturbation name).
# Add further entries here to evaluate other perturbations.
EXPERIMENTS = [
    (_BASELINE, _cluster_file("codenet_4000_scope_error", "java"), "Scope Modification"),
    (_BASELINE, _cluster_file("codenet_4000_log", "java"), "Log Modification"),
    (_BASELINE, _cluster_file("codenet_4000_operator", "java"), "Operator Modification"),
    (_BASELINE, _cluster_file("codenet_4000_pointer", "java"), "Pointer Modification"),
    (_BASELINE, _cluster_file("codenet_4000_POS", "java"), "POS finetuned"),
    (_BASELINE, _cluster_file("codenet_4000_random", "java"), "Random Modification"),
    (_BASELINE, _cluster_file("codenet_4000_trycatch", "java"), "Try Catch Modification"),
    (_BASELINE, _cluster_file("codenet_4000_unusedStatement", "java"), "Unused Statement Modification"),
    (_BASELINE, _cluster_file("codenet_4000_exactNameClassVariable", "java"), "Exact Name Class Variable Modification"),
    (_BASELINE, _cluster_file("codenet_4000_CasingClassVariable", "java"), "Casing Class Variable Modification"),
    (_BASELINE, _cluster_file("codenet_4000_Onecase", "java"), "Onecase Modification"),
    # NOTE(review): the two paths below differ only in the case of the
    # "Java"/"java" directory -- confirm the original side is intended.
    (
        _cluster_file("codenet_4000_Example", "Java"),
        _cluster_file("codenet_4000_Example", "java"),
        "Example Modification",
    ),
    # NOTE(review): the next two entries compare a file with itself, which
    # always yields CSI = 0 -- presumably the original side should be the
    # baseline; confirm before relying on these rows.
    (
        _cluster_file("codenet_4000_finetuned_compile_error", "java"),
        _cluster_file("codenet_4000_finetuned_compile_error", "java"),
        "Finetuned on compile error",
    ),
    (
        _cluster_file("codenet_4000_finetuned_language_classification", "java"),
        _cluster_file("codenet_4000_finetuned_language_classification", "java"),
        "Finetuned on language classification",
    ),
]


def main():
    """Run every configured experiment and log it to results/csi_summary.csv."""
    for file_orig, file_pert, name in EXPERIMENTS:
        compute_and_log_csi(
            file_orig,
            file_pert,
            perturbation_name=name,
            output_csv="results/csi_summary.csv",
        )


# Example usage
if __name__ == "__main__":
    main()