#!/usr/bin/env python3
"""
Check the shapes of *.npy files in save_for_eval/AdvBench/alpaca_7B_jailbreak,
load them as benign vs adverse activation dumps for comparison, and perform a
per-component PCA analysis to identify the most discriminative heads/layers.
"""

import numpy as np
import os
import glob
from pathlib import Path
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt


def check_npy_shapes(directory_path):
    """Load every .npy file in *directory_path* and report its shape.

    Files whose basename contains "benign"/"adverse" are grouped separately
    and compared pairwise: a benign file is matched to the adverse file whose
    name is obtained by replacing "benign" with "adverse".

    Args:
        directory_path: Directory scanned (non-recursively) for .npy files.

    Returns:
        Tuple ``(benign_data, adverse_data)`` of dicts mapping file basename
        to the loaded ``np.ndarray``.  Both dicts are empty when the directory
        contains no .npy files.  (Bug fix: the original returned ``None`` in
        that case, which crashed the caller's tuple unpacking.)
    """
    npy_files = glob.glob(os.path.join(directory_path, "*.npy"))

    if not npy_files:
        print(f"No .npy files found in {directory_path}")
        # Always return a 2-tuple so callers can unpack unconditionally.
        return {}, {}

    print(f"Found {len(npy_files)} .npy files:")
    print("-" * 50)

    # Split by naming convention: "benign" vs "adverse" in the filename.
    benign_files = [f for f in npy_files if "benign" in f.lower()]
    adverse_files = [f for f in npy_files if "adverse" in f.lower()]

    benign_data = {}
    adverse_data = {}

    print("BENIGN FILES:")
    for file_path in benign_files:
        try:
            data = np.load(file_path)
            file_name = os.path.basename(file_path)
            benign_data[file_name] = data
            print(f"  {file_name}: shape {data.shape}, dtype {data.dtype}")
        except Exception as e:
            # Best-effort loading: report and keep going with the other files.
            print(f"  Error loading {file_path}: {e}")

    print("\nADVERSE FILES:")
    for file_path in adverse_files:
        try:
            data = np.load(file_path)
            file_name = os.path.basename(file_path)
            adverse_data[file_name] = data
            print(f"  {file_name}: shape {data.shape}, dtype {data.dtype}")
        except Exception as e:
            print(f"  Error loading {file_path}: {e}")

    print("\n" + "=" * 50)
    print("COMPARISON SUMMARY:")
    print("=" * 50)

    # Compare each benign array against its adverse counterpart.
    for benign_name in benign_data:
        adverse_name = benign_name.replace("benign", "adverse")
        if adverse_name in adverse_data:
            benign_array = benign_data[benign_name]
            adverse_array = adverse_data[adverse_name]

            print(f"\nComparing {benign_name} vs {adverse_name}:")
            print(f"  Shapes: {benign_array.shape} vs {adverse_array.shape}")
            print(f"  Dtypes: {benign_array.dtype} vs {adverse_array.dtype}")

            if benign_array.shape == adverse_array.shape:
                print("  ✓ Shapes match")

                # Element-wise statistics only make sense for matching shapes.
                diff = np.abs(benign_array - adverse_array)
                print(f"  Mean absolute difference: {np.mean(diff):.6f}")
                print(f"  Max absolute difference: {np.max(diff):.6f}")
                print(f"  Min absolute difference: {np.min(diff):.6f}")
            else:
                print("  ✗ Shapes don't match!")

            print("-" * 30)

    return benign_data, adverse_data


def reshape_for_layer_analysis(data_array, n_samples=234, n_layers=15):
    """Un-flatten the leading (sample × layer) axis of an activation dump.

    The dumps store ``n_samples * n_layers`` rows stacked along axis 0
    (234 × 15 = 3510 by default).  This restores the per-sample/per-layer
    structure:

    * 3-D input ``(S*L, C, D)`` → ``(S, L, C, D)``
      (e.g. head_wise/mlp_wise ``(3510, 32, 4096)``, layer_wise
      ``(3510, 33, 4096)`` — presumably components × hidden dim; confirm
      against the dump writer).
    * 2-D input ``(S*L, D)`` → ``(S, L, D)``.

    Args:
        data_array: Array whose first dimension is ``n_samples * n_layers``.
        n_samples: Number of prompts/samples in the dump.
        n_layers: Number of layers stacked per sample.

    Returns:
        The reshaped view of ``data_array``.

    Raises:
        ValueError: If the first dimension does not equal
            ``n_samples * n_layers`` (bug fix: the original compared against
            a hardcoded 3510, rejecting any non-default arguments), or if the
            array is neither 2-D nor 3-D.
    """
    expected_rows = n_samples * n_layers
    if data_array.shape[0] != expected_rows:
        raise ValueError(
            f"Expected first dimension to be {expected_rows}, got {data_array.shape[0]}"
        )

    if data_array.ndim == 3:
        # (S*L, C, D) -> (S, L, C, D); trailing axes are preserved as-is.
        return data_array.reshape(n_samples, n_layers, *data_array.shape[1:])
    if data_array.ndim == 2:
        # (S*L, D) -> (S, L, D)
        return data_array.reshape(n_samples, n_layers, data_array.shape[1])
    raise ValueError(f"Unsupported array shape: {data_array.shape}")


def perform_pca_analysis(benign_data, adverse_data):
    """Run a per-component PCA to rank components by benign/adverse separability.

    For each matched benign/adverse pair, every component (head, layer, or
    MLP — inferred from the filename) is analysed independently: the two
    classes' feature vectors are stacked, standardized, projected onto two
    principal components, and scored by the variance explained by PC1.
    Results and plots are written to ``visualizations/`` via
    :func:`create_component_visualizations`.

    Args:
        benign_data: Dict of basename -> array, as returned by
            :func:`check_npy_shapes`.  Arrays are assumed to be 3-D with the
            component axis at position 1 — TODO confirm against the dump
            layout.
        adverse_data: Dict of basename -> array, matched by filename.

    Returns:
        None.  All output is printed or saved as PNG files.
    """
    print("\n" + "=" * 60)
    print("PCA ANALYSIS - IDENTIFYING DISCRIMINATIVE COMPONENTS")
    print("=" * 60)

    # All plots produced below land in this directory.
    os.makedirs("visualizations", exist_ok=True)

    for benign_name in benign_data:
        adverse_name = benign_name.replace("benign", "adverse")
        if adverse_name not in adverse_data:
            continue

        benign_array = benign_data[benign_name]
        adverse_array = adverse_data[adverse_name]

        print(f"\nAnalyzing {benign_name}:")
        print(f"  Original shape: {benign_array.shape}")

        try:
            # Axis 1 indexes the component (32 for heads/mlp, 33 for layers).
            n_components = benign_array.shape[1]
            component_type = (
                "head" if "head" in benign_name
                else ("layer" if "layer" in benign_name else "mlp")
            )

            print(f"  Analyzing {n_components} {component_type}s")

            component_discrimination_scores = []
            all_pca_results = []

            for comp_idx in range(n_components):
                # Per-component feature matrices: one row per sample.
                # reshape(len(..), -1) flattens any trailing feature axes
                # (generalized from a hardcoded 4096-wide reshape).
                benign_comp_data = benign_array[:, comp_idx, :].reshape(
                    len(benign_array), -1
                )
                adverse_comp_data = adverse_array[:, comp_idx, :].reshape(
                    len(adverse_array), -1
                )

                # Stack both classes; label benign rows 0 and adverse rows 1.
                X = np.vstack([benign_comp_data, adverse_comp_data])
                y = np.hstack([
                    np.zeros(len(benign_comp_data)),
                    np.ones(len(adverse_comp_data)),
                ])

                # Standardize so PCA is not dominated by high-variance dims.
                scaler = StandardScaler()
                X_scaled = scaler.fit_transform(X)

                pca = PCA(n_components=2)
                X_pca = pca.fit_transform(X_scaled)

                # NOTE(review): PC1 explained-variance ratio is used as a
                # proxy for class separability; PCA is unsupervised, so this
                # does not directly measure benign/adverse discrimination.
                separation_score = pca.explained_variance_ratio_[0]
                component_discrimination_scores.append(separation_score)

                all_pca_results.append({
                    'component': comp_idx + 1,   # 1-based for display
                    'X_pca': X_pca,
                    'y': y,
                    'pca': pca,
                    'score': separation_score,
                })

                print(f"    {component_type.capitalize()} {comp_idx + 1}: "
                      f"PCA separation score = {separation_score:.4f}")

            # Persist the per-component plots.
            create_component_visualizations(
                all_pca_results, component_discrimination_scores, component_type
            )

            # Rank components from most to least discriminative.
            sorted_components = np.argsort(component_discrimination_scores)[::-1]
            print(f"\n  Most discriminative {component_type}s (highest to lowest):")
            for comp_idx in sorted_components:
                print(f"    {component_type.capitalize()} {comp_idx + 1}: "
                      f"{component_discrimination_scores[comp_idx]:.4f}")

            # Drill into the single best component.
            top_comp_idx = sorted_components[0]
            print(f"\n  Detailed analysis of top {component_type} {top_comp_idx + 1}:")

            benign_top_comp = benign_array[:, top_comp_idx, :].reshape(
                len(benign_array), -1
            )
            adverse_top_comp = adverse_array[:, top_comp_idx, :].reshape(
                len(adverse_array), -1
            )

            X_top = np.vstack([benign_top_comp, adverse_top_comp])
            y_top = np.hstack([
                np.zeros(len(benign_top_comp)),
                np.ones(len(adverse_top_comp)),
            ])

            scaler_top = StandardScaler()
            X_top_scaled = scaler_top.fit_transform(X_top)

            pca_top = PCA(n_components=2)
            X_top_pca = pca_top.fit_transform(X_top_scaled)

            # Distance between class centroids in the 2-D PCA space.
            benign_pca = X_top_pca[y_top == 0]
            adverse_pca = X_top_pca[y_top == 1]
            centroid_benign = np.mean(benign_pca, axis=0)
            centroid_adverse = np.mean(adverse_pca, axis=0)
            distance = np.linalg.norm(centroid_benign - centroid_adverse)

            print(f"    Distance between class centroids: {distance:.4f}")
            print(f"    Variance explained: "
                  f"PC1={pca_top.explained_variance_ratio_[0]:.4f}, "
                  f"PC2={pca_top.explained_variance_ratio_[1]:.4f}")
            print(f"    Total samples analyzed: {len(X_top)}")

        except Exception as e:
            # Keep analysing the remaining file pairs on failure.
            print(f"  Error analyzing data: {e}")
            continue


def create_component_visualizations(pca_results, discrimination_scores, component_type):
    """Save three PNG summaries of the per-component PCA results.

    Writes into ``visualizations/`` (assumed to exist — created by
    :func:`perform_pca_analysis`):

    1. ``{component_type}_scores.png`` — bar chart of every component's
       separation score, with the top 5 highlighted.
    2. ``{component_type}_pca_scatter.png`` — 2-D PCA scatter plots of the
       top 3 components, benign vs adverse.
    3. ``{component_type}_cumulative_scores.png`` — cumulative sum of the
       sorted scores.

    Args:
        pca_results: List of dicts with keys ``component`` (1-based index),
            ``X_pca``, ``y``, ``pca``, ``score`` — as built by
            :func:`perform_pca_analysis`.
        discrimination_scores: Per-component scores, aligned with
            ``pca_results``.
        component_type: "head", "layer", or "mlp"; used in titles/filenames.
    """
    # --- Plot 1: per-component discrimination scores ----------------------
    plt.figure(figsize=(15, 6))
    components = [result['component'] for result in pca_results]
    scores = discrimination_scores

    plt.bar(components, scores, alpha=0.7, color='skyblue')
    plt.xlabel(f'{component_type.capitalize()} Number')
    plt.ylabel('PCA Separation Score')
    plt.title(f'{component_type.capitalize()} Discrimination Scores')
    plt.xticks(components)
    plt.grid(True, alpha=0.3)

    # Re-draw the top-5 bars in distinct colors (components[idx] == idx + 1,
    # so positional indices from argsort line up with the bar positions).
    top_indices = np.argsort(scores)[-5:][::-1]
    colors = ['red', 'orange', 'green', 'purple', 'brown']
    for i, idx in enumerate(top_indices):
        plt.bar(components[idx], scores[idx], color=colors[i], alpha=0.8,
                label=f'{component_type.capitalize()} {components[idx]}: '
                      f'{scores[idx]:.3f}')

    plt.legend()
    plt.tight_layout()
    plt.savefig(f'visualizations/{component_type}_scores.png',
                dpi=300, bbox_inches='tight')
    plt.close()

    # --- Plot 2: PCA scatter plots for the top 3 components ---------------
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    top_indices = np.argsort(scores)[-3:][::-1]

    for i, idx in enumerate(top_indices):
        result = pca_results[idx]
        X_pca = result['X_pca']
        y = result['y']

        benign_mask = y == 0
        adverse_mask = y == 1

        axes[i].scatter(X_pca[benign_mask, 0], X_pca[benign_mask, 1],
                        alpha=0.6, label='Benign', s=10, color='blue')
        axes[i].scatter(X_pca[adverse_mask, 0], X_pca[adverse_mask, 1],
                        alpha=0.6, label='Adverse', s=10, color='red')

        axes[i].set_xlabel('PC1')
        axes[i].set_ylabel('PC2')
        axes[i].set_title(f'{component_type.capitalize()} {result["component"]} '
                          f'(Score: {result["score"]:.3f})')
        axes[i].legend()
        axes[i].grid(True, alpha=0.3)

    plt.suptitle(f'PCA Visualization - Top 3 Discriminative '
                 f'{component_type.capitalize()}s')
    plt.tight_layout()
    plt.savefig(f'visualizations/{component_type}_pca_scatter.png',
                dpi=300, bbox_inches='tight')
    plt.close()

    # --- Plot 3: cumulative discrimination score --------------------------
    plt.figure(figsize=(12, 6))
    cumulative_scores = np.cumsum(np.sort(scores)[::-1])
    plt.plot(range(1, len(scores) + 1), cumulative_scores, 'o-',
             linewidth=2, markersize=8)
    plt.xlabel(f'Number of {component_type.capitalize()}s')
    plt.ylabel('Cumulative Discrimination Score')
    plt.title(f'Cumulative Discrimination - {component_type.capitalize()}s')
    plt.grid(True, alpha=0.3)
    plt.xticks(range(1, len(scores) + 1))
    plt.tight_layout()
    plt.savefig(f'visualizations/{component_type}_cumulative_scores.png',
                dpi=300, bbox_inches='tight')
    plt.close()

    print(f"  Visualizations saved to visualizations/{component_type}_*.png")


def main():
    """Load the dumps, compare benign vs adverse, and run the PCA analysis.

    Returns:
        Tuple ``(benign_data, adverse_data)`` for interactive follow-up, or
        ``None`` when the target directory does not exist.
    """
    target_directory = "save_for_eval/AdvBench/alpaca_7B_jailbreak"

    print(f"Checking .npy files in: {target_directory}")
    print("=" * 60)

    if not os.path.exists(target_directory):
        print(f"Error: Directory '{target_directory}' does not exist!")
        return

    # check_npy_shapes always returns two dicts (possibly empty).
    benign_data, adverse_data = check_npy_shapes(target_directory)

    # Only analyse when both sides actually loaded something.
    if benign_data and adverse_data:
        perform_pca_analysis(benign_data, adverse_data)

    print("\nScript completed successfully!")

    return benign_data, adverse_data


if __name__ == "__main__":
    main()