This commit is contained in:
parent
de5d73d548
commit
fb6cf42548
|
|
@ -0,0 +1,315 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Script to check the shape of *.npy files in save_for_eval/AdvBench/alpaca_7B_jailbreak
|
||||
load them as benign vs adverse for comparison, and perform PCA analysis to identify
|
||||
the most discriminative layers.
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import os
|
||||
import glob
|
||||
from pathlib import Path
|
||||
from sklearn.decomposition import PCA
|
||||
from sklearn.preprocessing import StandardScaler
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
def check_npy_shapes(directory_path):
    """
    Check shapes of all .npy files in *directory_path* and load them as
    benign vs adverse pairs.

    Files whose basename contains "benign" / "adverse" (case-insensitive)
    are loaded into two dicts keyed by basename. For every benign file the
    counterpart name (``"benign"`` replaced by ``"adverse"``) is looked up
    and, when present, shapes, dtypes and element-wise absolute differences
    are printed.

    Parameters:
        directory_path: directory to scan (non-recursively) for ``*.npy``.

    Returns:
        tuple[dict, dict]: ``(benign_data, adverse_data)`` mapping file
        basename -> loaded numpy array. Both dicts are empty when no .npy
        files are found.
    """
    # Get all .npy files in the directory
    npy_files = glob.glob(os.path.join(directory_path, "*.npy"))

    if not npy_files:
        print(f"No .npy files found in {directory_path}")
        # Bug fix: previously this was a bare `return` (None), which made
        # callers that unpack `benign, adverse = check_npy_shapes(...)`
        # crash with a TypeError. Return an empty pair instead.
        return {}, {}

    print(f"Found {len(npy_files)} .npy files:")
    print("-" * 50)

    # Separate files into benign and adverse by naming convention.
    benign_files = [f for f in npy_files if "benign" in f.lower()]
    adverse_files = [f for f in npy_files if "adverse" in f.lower()]

    # basename -> loaded array
    benign_data = {}
    adverse_data = {}

    # Load benign files; a broken file is reported but does not abort the scan.
    print("BENIGN FILES:")
    for file_path in benign_files:
        try:
            data = np.load(file_path)
            file_name = os.path.basename(file_path)
            benign_data[file_name] = data
            print(f" {file_name}: shape {data.shape}, dtype {data.dtype}")
        except Exception as e:
            print(f" Error loading {file_path}: {e}")

    print("\nADVERSE FILES:")
    for file_path in adverse_files:
        try:
            data = np.load(file_path)
            file_name = os.path.basename(file_path)
            adverse_data[file_name] = data
            print(f" {file_name}: shape {data.shape}, dtype {data.dtype}")
        except Exception as e:
            print(f" Error loading {file_path}: {e}")

    print("\n" + "="*50)
    print("COMPARISON SUMMARY:")
    print("="*50)

    # Compare each benign file with its adverse counterpart (if any).
    for benign_name in benign_data:
        adverse_name = benign_name.replace("benign", "adverse")
        if adverse_name in adverse_data:
            benign_array = benign_data[benign_name]
            adverse_array = adverse_data[adverse_name]

            print(f"\nComparing {benign_name} vs {adverse_name}:")
            print(f" Shapes: {benign_array.shape} vs {adverse_array.shape}")
            print(f" Dtypes: {benign_array.dtype} vs {adverse_array.dtype}")

            if benign_array.shape == adverse_array.shape:
                print(f" ✓ Shapes match")

                # Basic statistics on the element-wise difference.
                diff = np.abs(benign_array - adverse_array)
                print(f" Mean absolute difference: {np.mean(diff):.6f}")
                print(f" Max absolute difference: {np.max(diff):.6f}")
                print(f" Min absolute difference: {np.min(diff):.6f}")
            else:
                print(f" ✗ Shapes don't match!")

            print("-" * 30)

    return benign_data, adverse_data
|
||||
|
||||
def reshape_for_layer_analysis(data_array, n_samples=234, n_layers=15):
    """
    Reshape stacked activations from (n_samples * n_layers, ...) to
    (n_samples, n_layers, ...).

    With the defaults this turns e.g. (3510, 32, 4096) head/mlp activations
    into (234, 15, 32, 4096) and (3510, 4096) into (234, 15, 4096), since
    3510 = 234 samples x 15 layers. Generalized: the first dimension is now
    validated against ``n_samples * n_layers`` instead of the hard-coded
    3510, so other sample/layer counts work too (defaults keep the original
    behavior and error message).

    Parameters:
        data_array: 2-D or 3-D numpy array whose leading axis interleaves
            samples and layers.
        n_samples: number of samples expected.
        n_layers: number of layers stacked per sample.

    Returns:
        A reshaped view with shape (n_samples, n_layers, *data_array.shape[1:]).

    Raises:
        ValueError: if the leading dimension does not equal
            ``n_samples * n_layers`` or the array is not 2-D/3-D.
    """
    expected = n_samples * n_layers
    if data_array.shape[0] != expected:
        raise ValueError(f"Expected first dimension to be {expected}, got {data_array.shape[0]}")

    # Both the 2-D and 3-D cases only split the leading axis; the trailing
    # axes are carried over unchanged.
    if data_array.ndim in (2, 3):
        return data_array.reshape(n_samples, n_layers, *data_array.shape[1:])

    raise ValueError(f"Unsupported array shape: {data_array.shape}")
|
||||
|
||||
def perform_pca_analysis(benign_data, adverse_data):
    """
    Perform PCA analysis to identify the most discriminative components
    (heads or layers) between benign and adverse data.

    For every benign/adverse file pair (matched by replacing "benign" with
    "adverse" in the name), a separate 2-component PCA is fit per component
    slice and the explained-variance ratio of PC1 is used as that
    component's "separation score". Results are printed and visualized via
    create_component_visualizations(); figures go to ./visualizations/.

    NOTE(review): arrays are assumed to be 3-D with shape
    (samples, components, 4096) — the per-component slice is reshaped to
    (-1, 4096) below. Non-conforming arrays are caught by the broad
    except and skipped. TODO confirm against the .npy producers.
    """
    print("\n" + "="*60)
    print("PCA ANALYSIS - IDENTIFYING DISCRIMINATIVE COMPONENTS")
    print("="*60)

    # Create output directory for visualizations
    os.makedirs("visualizations", exist_ok=True)

    for benign_name in benign_data:
        # Pair each benign file with its adverse counterpart by name.
        adverse_name = benign_name.replace("benign", "adverse")
        if adverse_name in adverse_data:
            benign_array = benign_data[benign_name]
            adverse_array = adverse_data[adverse_name]

            print(f"\nAnalyzing {benign_name}:")
            print(f" Original shape: {benign_array.shape}")

            try:
                # Get the number of components (32 for heads/mlp, 33 for layers)
                n_components = benign_array.shape[1]
                # Label inferred from the filename; "mlp" is the fallback.
                component_type = "head" if "head" in benign_name else ("layer" if "layer" in benign_name else "mlp")

                print(f" Analyzing {n_components} {component_type}s")

                # Prepare data for PCA: analyze each component separately
                component_discrimination_scores = []
                all_pca_results = []

                for comp_idx in range(n_components):
                    # Extract data for this specific component across all samples
                    # (flattened to rows of the 4096-dim feature vectors).
                    benign_comp_data = benign_array[:, comp_idx, :].reshape(-1, 4096)
                    adverse_comp_data = adverse_array[:, comp_idx, :].reshape(-1, 4096)

                    # Combine benign and adverse data for this component;
                    # labels: 0 = benign, 1 = adverse.
                    X = np.vstack([benign_comp_data, adverse_comp_data])
                    y = np.hstack([np.zeros(len(benign_comp_data)), np.ones(len(adverse_comp_data))])

                    # Standardize the data (zero mean, unit variance per feature).
                    scaler = StandardScaler()
                    X_scaled = scaler.fit_transform(X)

                    # Perform PCA down to 2 components for scoring/plotting.
                    pca = PCA(n_components=2)
                    X_pca = pca.fit_transform(X_scaled)

                    # Calculate separation score (variance explained by PC1).
                    # NOTE(review): PC1 variance measures spread, not class
                    # separation per se — a supervised metric (e.g. LDA or
                    # centroid distance) may rank components differently.
                    separation_score = pca.explained_variance_ratio_[0]
                    component_discrimination_scores.append(separation_score)

                    # Store PCA results for visualization
                    all_pca_results.append({
                        'component': comp_idx + 1,
                        'X_pca': X_pca,
                        'y': y,
                        'pca': pca,
                        'score': separation_score
                    })

                    print(f" {component_type.capitalize()} {comp_idx + 1}: PCA separation score = {separation_score:.4f}")

                # Create visualizations (bar chart, scatter plots, cumulative curve).
                create_component_visualizations(all_pca_results, component_discrimination_scores, component_type)

                # Identify the most discriminative components
                # (argsort ascending, then reversed for highest-first).
                sorted_components = np.argsort(component_discrimination_scores)[::-1]
                print(f"\n Most discriminative {component_type}s (highest to lowest):")
                for i, comp_idx in enumerate(sorted_components):
                    print(f" {component_type.capitalize()} {comp_idx + 1}: {component_discrimination_scores[comp_idx]:.4f}")

                # Analyze the top discriminative component in more detail
                top_comp_idx = sorted_components[0]
                print(f"\n Detailed analysis of top {component_type} {top_comp_idx + 1}:")

                # Extract data for top component and redo the scale+PCA fit.
                benign_top_comp = benign_array[:, top_comp_idx, :].reshape(-1, 4096)
                adverse_top_comp = adverse_array[:, top_comp_idx, :].reshape(-1, 4096)

                X_top = np.vstack([benign_top_comp, adverse_top_comp])
                y_top = np.hstack([np.zeros(len(benign_top_comp)), np.ones(len(adverse_top_comp))])

                scaler_top = StandardScaler()
                X_top_scaled = scaler_top.fit_transform(X_top)

                pca_top = PCA(n_components=2)
                X_top_pca = pca_top.fit_transform(X_top_scaled)

                # Calculate class separation in PCA space: distance between
                # the benign and adverse class centroids in (PC1, PC2).
                benign_pca = X_top_pca[y_top == 0]
                adverse_pca = X_top_pca[y_top == 1]

                centroid_benign = np.mean(benign_pca, axis=0)
                centroid_adverse = np.mean(adverse_pca, axis=0)
                distance = np.linalg.norm(centroid_benign - centroid_adverse)

                print(f" Distance between class centroids: {distance:.4f}")
                print(f" Variance explained: PC1={pca_top.explained_variance_ratio_[0]:.4f}, PC2={pca_top.explained_variance_ratio_[1]:.4f}")
                print(f" Total samples analyzed: {len(X_top)}")

            except Exception as e:
                # Best-effort: report and move on to the next file pair
                # rather than aborting the whole analysis.
                print(f" Error analyzing data: {e}")
                continue
|
||||
|
||||
def create_component_visualizations(pca_results, discrimination_scores, component_type):
    """
    Create visualizations for PCA analysis results.

    Writes three PNGs into ./visualizations/ (the caller creates the dir):
      1. ``{component_type}_scores.png`` — bar chart of per-component
         separation scores with the top 5 highlighted.
      2. ``{component_type}_pca_scatter.png`` — PCA scatter plots (benign vs
         adverse) for the top discriminative components.
      3. ``{component_type}_cumulative_scores.png`` — cumulative score curve.

    Parameters:
        pca_results: list of dicts with keys 'component' (1-based index),
            'X_pca' (n x 2 array), 'y' (0 = benign, 1 = adverse), 'score'.
        discrimination_scores: per-component scores, aligned with pca_results.
        component_type: label used in titles/filenames ("head"/"layer"/"mlp").
    """
    # Plot 1: Component discrimination scores
    plt.figure(figsize=(15, 6))
    components = [result['component'] for result in pca_results]
    scores = discrimination_scores

    plt.bar(components, scores, alpha=0.7, color='skyblue')
    plt.xlabel(f'{component_type.capitalize()} Number')
    plt.ylabel('PCA Separation Score')
    plt.title(f'{component_type.capitalize()} Discrimination Scores')
    plt.xticks(components)
    plt.grid(True, alpha=0.3)

    # Highlight up to the top 5 components by redrawing their bars in color.
    # (min() guards against fewer than 5 components.)
    n_highlight = min(5, len(scores))
    top_indices = np.argsort(scores)[-n_highlight:][::-1]
    colors = ['red', 'orange', 'green', 'purple', 'brown']
    for i, idx in enumerate(top_indices):
        plt.bar(components[idx], scores[idx], color=colors[i], alpha=0.8,
                label=f'{component_type.capitalize()} {components[idx]}: {scores[idx]:.3f}')

    plt.legend()
    plt.tight_layout()
    plt.savefig(f'visualizations/{component_type}_scores.png', dpi=300, bbox_inches='tight')
    plt.close()

    # Plot 2: PCA scatter plots for the top components.
    # Bug fix: the original hard-coded 3 subplots and indexed axes[i], which
    # breaks when fewer than 3 components exist (plt.subplots squeezes a
    # single Axes to a scalar). Clamp the count and force a 1-D axes array;
    # behavior is unchanged for the usual >= 3 component case.
    n_top = min(3, len(pca_results))
    fig, axes = plt.subplots(1, n_top, figsize=(6 * n_top, 6))
    axes = np.atleast_1d(axes)
    top_indices = np.argsort(scores)[-n_top:][::-1]

    for i, idx in enumerate(top_indices):
        result = pca_results[idx]
        X_pca = result['X_pca']
        y = result['y']

        # Plot benign vs adverse in PCA space
        benign_mask = y == 0
        adverse_mask = y == 1

        axes[i].scatter(X_pca[benign_mask, 0], X_pca[benign_mask, 1],
                        alpha=0.6, label='Benign', s=10, color='blue')
        axes[i].scatter(X_pca[adverse_mask, 0], X_pca[adverse_mask, 1],
                        alpha=0.6, label='Adverse', s=10, color='red')

        axes[i].set_xlabel('PC1')
        axes[i].set_ylabel('PC2')
        axes[i].set_title(f'{component_type.capitalize()} {result["component"]} (Score: {result["score"]:.3f})')
        axes[i].legend()
        axes[i].grid(True, alpha=0.3)

    plt.suptitle(f'PCA Visualization - Top 3 Discriminative {component_type.capitalize()}s')
    plt.tight_layout()
    plt.savefig(f'visualizations/{component_type}_pca_scatter.png', dpi=300, bbox_inches='tight')
    plt.close()

    # Plot 3: Cumulative variance explained (scores sorted descending).
    plt.figure(figsize=(12, 6))
    cumulative_scores = np.cumsum(np.sort(scores)[::-1])
    plt.plot(range(1, len(scores) + 1), cumulative_scores, 'o-', linewidth=2, markersize=8)
    plt.xlabel(f'Number of {component_type.capitalize()}s')
    plt.ylabel('Cumulative Discrimination Score')
    plt.title(f'Cumulative Discrimination - {component_type.capitalize()}s')
    plt.grid(True, alpha=0.3)
    plt.xticks(range(1, len(scores) + 1))
    plt.tight_layout()
    plt.savefig(f'visualizations/{component_type}_cumulative_scores.png', dpi=300, bbox_inches='tight')
    plt.close()

    print(f" Visualizations saved to visualizations/{component_type}_*.png")
|
||||
|
||||
def main():
    """Main function to run the script.

    Checks the target directory, loads benign/adverse .npy files, and runs
    the PCA discrimination analysis when both groups loaded successfully.

    Returns:
        tuple[dict, dict] of (benign_data, adverse_data), or None when the
        target directory is missing or no files could be loaded.
    """
    target_directory = "save_for_eval/AdvBench/alpaca_7B_jailbreak"

    print(f"Checking .npy files in: {target_directory}")
    print("=" * 60)

    # Check if directory exists
    if not os.path.exists(target_directory):
        print(f"Error: Directory '{target_directory}' does not exist!")
        return

    # Check and load the .npy files.
    # Bug fix: check_npy_shapes may return None (its no-files path), and the
    # original unconditional unpack raised TypeError. Guard before unpacking.
    result = check_npy_shapes(target_directory)
    if result is None:
        return
    benign_data, adverse_data = result

    # Perform PCA analysis to identify discriminative layers
    if benign_data and adverse_data:
        perform_pca_analysis(benign_data, adverse_data)

    print("\nScript completed successfully!")

    # Return the loaded data for further analysis if needed
    return benign_data, adverse_data
|
||||
|
||||
# Entry point: run the full shape-check + PCA pipeline only when executed
# as a script (not when imported as a module).
if __name__ == "__main__":
    main()
|
||||
Loading…
Reference in New Issue