haloscope/visualization.py

316 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Script to check the shape of *.npy files in save_for_eval/AdvBench/alpaca_7B_jailbreak
load them as benign vs adverse for comparison, and perform PCA analysis to identify
the most discriminative layers.
"""
import numpy as np
import os
import glob
from pathlib import Path
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
def check_npy_shapes(directory_path):
    """Load all .npy files in *directory_path* and compare benign/adverse pairs.

    Files whose basename contains "benign" (case-insensitive) form one group,
    "adverse" the other; a pair is matched by replacing "benign" with
    "adverse" in the filename. For each matched pair with equal shapes, basic
    absolute-difference statistics are printed.

    Parameters
    ----------
    directory_path : str
        Directory scanned (non-recursively) for ``*.npy`` files.

    Returns
    -------
    tuple[dict, dict]
        ``(benign_data, adverse_data)`` mapping basename -> loaded array.
        Both dicts are empty when no .npy files exist (the previous version
        returned None here, which made callers that unpack the result crash).
    """
    npy_files = glob.glob(os.path.join(directory_path, "*.npy"))
    if not npy_files:
        print(f"No .npy files found in {directory_path}")
        # Return an empty pair instead of None so callers can always unpack.
        return {}, {}
    print(f"Found {len(npy_files)} .npy files:")
    print("-" * 50)
    # Partition files by keyword in the (lower-cased) path.
    benign_files = [f for f in npy_files if "benign" in f.lower()]
    adverse_files = [f for f in npy_files if "adverse" in f.lower()]
    benign_data = {}
    adverse_data = {}
    print("BENIGN FILES:")
    for file_path in benign_files:
        try:
            data = np.load(file_path)
            file_name = os.path.basename(file_path)
            benign_data[file_name] = data
            print(f" {file_name}: shape {data.shape}, dtype {data.dtype}")
        except Exception as e:
            # Best-effort: report the unloadable file and keep going.
            print(f" Error loading {file_path}: {e}")
    print("\nADVERSE FILES:")
    for file_path in adverse_files:
        try:
            data = np.load(file_path)
            file_name = os.path.basename(file_path)
            adverse_data[file_name] = data
            print(f" {file_name}: shape {data.shape}, dtype {data.dtype}")
        except Exception as e:
            print(f" Error loading {file_path}: {e}")
    print("\n" + "=" * 50)
    print("COMPARISON SUMMARY:")
    print("=" * 50)
    # Compare each benign file with its name-matched adverse counterpart.
    for benign_name in benign_data:
        adverse_name = benign_name.replace("benign", "adverse")
        if adverse_name not in adverse_data:
            continue
        benign_array = benign_data[benign_name]
        adverse_array = adverse_data[adverse_name]
        print(f"\nComparing {benign_name} vs {adverse_name}:")
        print(f" Shapes: {benign_array.shape} vs {adverse_array.shape}")
        print(f" Dtypes: {benign_array.dtype} vs {adverse_array.dtype}")
        if benign_array.shape == adverse_array.shape:
            print(f" ✓ Shapes match")
            # Element-wise difference statistics as a quick sanity check.
            diff = np.abs(benign_array - adverse_array)
            print(f" Mean absolute difference: {np.mean(diff):.6f}")
            print(f" Max absolute difference: {np.max(diff):.6f}")
            print(f" Min absolute difference: {np.min(diff):.6f}")
        else:
            print(f" ✗ Shapes don't match!")
        print("-" * 30)
    return benign_data, adverse_data
def reshape_for_layer_analysis(data_array, n_samples=234, n_layers=15):
    """Make the layer axis explicit in a stacked activation array.

    Axis 0 of *data_array* is assumed to stack ``n_samples * n_layers`` rows
    (the defaults, 234 * 15 = 3510, match the AdvBench dump). The row axis is
    split into separate sample and layer axes; trailing axes are preserved.

    Examples: (3510, 32, 4096) -> (234, 15, 32, 4096) for head/mlp data,
    (3510, 33, 4096) -> (234, 15, 33, 4096) for layer-wise data,
    (3510, 4096) -> (234, 15, 4096) for flat per-row features.

    Parameters
    ----------
    data_array : np.ndarray
        2-D or 3-D array whose first axis has ``n_samples * n_layers`` rows.
    n_samples, n_layers : int
        Factorization of the first axis.

    Returns
    -------
    np.ndarray
        View with shape ``(n_samples, n_layers, *data_array.shape[1:])``.

    Raises
    ------
    ValueError
        If the first axis does not equal ``n_samples * n_layers`` (previously
        hard-coded to 3510, which made the parameters unusable) or if the
        array is not 2-D or 3-D.
    """
    expected_rows = n_samples * n_layers
    if data_array.shape[0] != expected_rows:
        raise ValueError(
            f"Expected first dimension to be {expected_rows} "
            f"({n_samples} samples x {n_layers} layers), got {data_array.shape[0]}"
        )
    if data_array.ndim in (2, 3):
        # Splitting axis 0 preserves row order: sample-major, layer-minor.
        return data_array.reshape(n_samples, n_layers, *data_array.shape[1:])
    raise ValueError(f"Unsupported array shape: {data_array.shape}")
def perform_pca_analysis(benign_data, adverse_data):
    """Score each axis-1 component (head/layer/mlp position) by how well a
    2-component PCA separates benign from adverse activations.

    For every benign/adverse file pair (matched by replacing "benign" with
    "adverse" in the basename), each component's activations are standardized,
    projected with PCA, ranked by PC1 explained variance, and the top
    component is re-analyzed via class-centroid distance in PC space.

    Parameters
    ----------
    benign_data, adverse_data : dict
        basename -> array of shape (n_rows, n_components, hidden_dim).

    Side effects: creates ``visualizations/`` and writes PNGs through
    create_component_visualizations; prints the full analysis to stdout.
    """
    print("\n" + "="*60)
    print("PCA ANALYSIS - IDENTIFYING DISCRIMINATIVE COMPONENTS")
    print("="*60)
    # Output directory for the PNGs written by create_component_visualizations.
    os.makedirs("visualizations", exist_ok=True)
    for benign_name in benign_data:
        adverse_name = benign_name.replace("benign", "adverse")
        if adverse_name not in adverse_data:
            continue
        benign_array = benign_data[benign_name]
        adverse_array = adverse_data[adverse_name]
        print(f"\nAnalyzing {benign_name}:")
        print(f" Original shape: {benign_array.shape}")
        try:
            # Number of components along axis 1 (32 heads/mlp, 33 layers).
            n_components = benign_array.shape[1]
            # Infer the hidden size from the data instead of hard-coding 4096
            # so dumps from models with other hidden dims work unchanged.
            hidden_dim = benign_array.shape[-1]
            component_type = "head" if "head" in benign_name else ("layer" if "layer" in benign_name else "mlp")
            print(f" Analyzing {n_components} {component_type}s")
            component_discrimination_scores = []
            all_pca_results = []
            for comp_idx in range(n_components):
                # All rows for this single component, flattened to (n, hidden).
                benign_comp_data = benign_array[:, comp_idx, :].reshape(-1, hidden_dim)
                adverse_comp_data = adverse_array[:, comp_idx, :].reshape(-1, hidden_dim)
                # Stack both classes; label benign rows 0 and adverse rows 1.
                X = np.vstack([benign_comp_data, adverse_comp_data])
                y = np.hstack([np.zeros(len(benign_comp_data)), np.ones(len(adverse_comp_data))])
                scaler = StandardScaler()
                X_scaled = scaler.fit_transform(X)
                pca = PCA(n_components=2)
                X_pca = pca.fit_transform(X_scaled)
                # NOTE(review): PC1 explained-variance ratio is an unsupervised
                # proxy for separability (it never looks at y); confirm this is
                # the intended ranking criterion vs. e.g. centroid distance.
                separation_score = pca.explained_variance_ratio_[0]
                component_discrimination_scores.append(separation_score)
                all_pca_results.append({
                    'component': comp_idx + 1,
                    'X_pca': X_pca,
                    'y': y,
                    'pca': pca,
                    'score': separation_score
                })
                print(f" {component_type.capitalize()} {comp_idx + 1}: PCA separation score = {separation_score:.4f}")
            create_component_visualizations(all_pca_results, component_discrimination_scores, component_type)
            # Rank components from most to least discriminative.
            sorted_components = np.argsort(component_discrimination_scores)[::-1]
            print(f"\n Most discriminative {component_type}s (highest to lowest):")
            for i, comp_idx in enumerate(sorted_components):
                print(f" {component_type.capitalize()} {comp_idx + 1}: {component_discrimination_scores[comp_idx]:.4f}")
            # Re-run PCA on the single best component for a closer look.
            top_comp_idx = sorted_components[0]
            print(f"\n Detailed analysis of top {component_type} {top_comp_idx + 1}:")
            benign_top_comp = benign_array[:, top_comp_idx, :].reshape(-1, hidden_dim)
            adverse_top_comp = adverse_array[:, top_comp_idx, :].reshape(-1, hidden_dim)
            X_top = np.vstack([benign_top_comp, adverse_top_comp])
            y_top = np.hstack([np.zeros(len(benign_top_comp)), np.ones(len(adverse_top_comp))])
            scaler_top = StandardScaler()
            X_top_scaled = scaler_top.fit_transform(X_top)
            pca_top = PCA(n_components=2)
            X_top_pca = pca_top.fit_transform(X_top_scaled)
            # Distance between class centroids in PC space — a simple
            # supervised separation measure for the winning component.
            benign_pca = X_top_pca[y_top == 0]
            adverse_pca = X_top_pca[y_top == 1]
            centroid_benign = np.mean(benign_pca, axis=0)
            centroid_adverse = np.mean(adverse_pca, axis=0)
            distance = np.linalg.norm(centroid_benign - centroid_adverse)
            print(f" Distance between class centroids: {distance:.4f}")
            print(f" Variance explained: PC1={pca_top.explained_variance_ratio_[0]:.4f}, PC2={pca_top.explained_variance_ratio_[1]:.4f}")
            print(f" Total samples analyzed: {len(X_top)}")
        except Exception as e:
            # Best-effort per-file: report and move on to the next pair.
            print(f" Error analyzing data: {e}")
            continue
def create_component_visualizations(pca_results, discrimination_scores, component_type):
    """Write three PNG summaries of the per-component PCA results.

    Saves, under ``visualizations/``:
    1. a bar chart of every component's separation score, top five recolored,
    2. PC1/PC2 scatter plots for the three best-separating components,
    3. the cumulative score curve with components taken best-first.

    Parameters
    ----------
    pca_results : list[dict]
        One entry per component with keys 'component', 'X_pca', 'y', 'pca'
        and 'score', as assembled by perform_pca_analysis.
    discrimination_scores : list[float]
        Per-component separation score, aligned with ``pca_results``.
    component_type : str
        'head', 'layer', or 'mlp'; used in titles and output file names.
    """
    cap = component_type.capitalize()
    comp_labels = [entry['component'] for entry in pca_results]

    # --- Figure 1: bar chart of all scores, best five re-drawn in color ---
    plt.figure(figsize=(15, 6))
    plt.bar(comp_labels, discrimination_scores, alpha=0.7, color='skyblue')
    plt.xlabel(f'{cap} Number')
    plt.ylabel('PCA Separation Score')
    plt.title(f'{cap} Discrimination Scores')
    plt.xticks(comp_labels)
    plt.grid(True, alpha=0.3)
    best_five = np.argsort(discrimination_scores)[-5:][::-1]
    highlight = ['red', 'orange', 'green', 'purple', 'brown']
    for color, pos in zip(highlight, best_five):
        plt.bar(comp_labels[pos], discrimination_scores[pos], color=color, alpha=0.8,
                label=f'{cap} {comp_labels[pos]}: {discrimination_scores[pos]:.3f}')
    plt.legend()
    plt.tight_layout()
    plt.savefig(f'visualizations/{component_type}_scores.png', dpi=300, bbox_inches='tight')
    plt.close()

    # --- Figure 2: PC1/PC2 scatter for the three strongest components ---
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    best_three = np.argsort(discrimination_scores)[-3:][::-1]
    for ax, pos in zip(axes, best_three):
        entry = pca_results[pos]
        coords = entry['X_pca']
        is_benign = entry['y'] == 0
        is_adverse = entry['y'] == 1
        ax.scatter(coords[is_benign, 0], coords[is_benign, 1],
                   alpha=0.6, label='Benign', s=10, color='blue')
        ax.scatter(coords[is_adverse, 0], coords[is_adverse, 1],
                   alpha=0.6, label='Adverse', s=10, color='red')
        ax.set_xlabel('PC1')
        ax.set_ylabel('PC2')
        ax.set_title(f'{cap} {entry["component"]} (Score: {entry["score"]:.3f})')
        ax.legend()
        ax.grid(True, alpha=0.3)
    plt.suptitle(f'PCA Visualization - Top 3 Discriminative {cap}s')
    plt.tight_layout()
    plt.savefig(f'visualizations/{component_type}_pca_scatter.png', dpi=300, bbox_inches='tight')
    plt.close()

    # --- Figure 3: cumulative score when adding components best-first ---
    plt.figure(figsize=(12, 6))
    n = len(discrimination_scores)
    ranked_desc = np.sort(discrimination_scores)[::-1]
    plt.plot(range(1, n + 1), np.cumsum(ranked_desc), 'o-', linewidth=2, markersize=8)
    plt.xlabel(f'Number of {cap}s')
    plt.ylabel('Cumulative Discrimination Score')
    plt.title(f'Cumulative Discrimination - {cap}s')
    plt.grid(True, alpha=0.3)
    plt.xticks(range(1, n + 1))
    plt.tight_layout()
    plt.savefig(f'visualizations/{component_type}_cumulative_scores.png', dpi=300, bbox_inches='tight')
    plt.close()
    print(f" Visualizations saved to visualizations/{component_type}_*.png")
def main():
    """Entry point: validate the data directory, load the .npy dumps, and run
    the PCA discriminability analysis.

    Returns
    -------
    tuple[dict, dict] | None
        ``(benign_data, adverse_data)`` when data was loaded, ``None`` when
        the target directory is missing or yielded no data.
    """
    target_directory = "save_for_eval/AdvBench/alpaca_7B_jailbreak"
    print(f"Checking .npy files in: {target_directory}")
    print("=" * 60)
    if not os.path.exists(target_directory):
        print(f"Error: Directory '{target_directory}' does not exist!")
        return None
    result = check_npy_shapes(target_directory)
    # check_npy_shapes historically returned None for an empty directory;
    # guard so the unpack below cannot raise TypeError on that path.
    if not result:
        return None
    benign_data, adverse_data = result
    # Only run the (expensive) PCA stage when both groups have data.
    if benign_data and adverse_data:
        perform_pca_analysis(benign_data, adverse_data)
    print("\nScript completed successfully!")
    return benign_data, adverse_data
# Run the analysis only when executed as a script (not on import).
if __name__ == "__main__":
    main()