# haloscope/steer_vector.py

import os
import random
import argparse
import logging

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from datasets import load_dataset
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM
from sklearn.metrics import roc_auc_score
from torch.amp import autocast, GradScaler

from train_utils import (
    get_last_non_padded_token_rep,
    compute_ot_loss_cos,
    update_centroids_ema,
    update_centroids_ema_hard,
    get_ex_data,
    collate_fn,
    split_indices,
)
from llm_layers import add_sv_layers


def seed_everything(seed: int):
    """Fix every relevant RNG seed for reproducibility."""
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    # benchmark mode picks kernels non-deterministically, so keep it off
    torch.backends.cudnn.benchmark = False
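

# get_last_non_padded_token_rep is imported from train_utils and not defined
# in this file. Below is a minimal sketch of the assumed gather logic, under
# a hypothetical name and kept purely for reference; the imported helper is
# what the training code actually calls.
def _sketch_last_non_padded_rep(hidden: torch.Tensor, attn_mask: torch.Tensor) -> torch.Tensor:
    # hidden: [B, T, H]; attn_mask: [B, T] with 1 = valid token, 0 = padding
    last_idx = attn_mask.long().sum(dim=1) - 1  # position of the last valid token per row
    return hidden[torch.arange(hidden.size(0)), last_idx]  # [B, H]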


def train_model(model, optimizer, device, prompts, labels, args):
    """
    The two-stage training from the paper:
    - Phase 1: initial training on the human-annotated exemplars (Eqs. (3)-(6)).
    - Phase 2: self-training on pseudo-labelled data selected via OT + Sinkhorn
      (Sec. 3.3, pseudo-labeling).

    Arguments:
    - model: frozen LLM with the TSV layers already inserted (only the TSV is trained)
    - optimizer: AdamW over the TSV parameters only
    - device: cuda
    - prompts: [test_prompts, train_prompts, exemplar_prompts]
      (this script passes only the first two)
    - labels: the matching labels [test_labels, train_labels, exemplar_labels]
    - args: hyperparameters (learning rate, epoch counts, Sinkhorn settings, ...)
    """
    layer_number = -1  # use the last layer's hidden state (index -1)

    # ========= Logging & result directory =========
    dir_name = f"SV_{args.model_name}_{args.dataset_name}/{args.component}/{args.str_layer}/{args.lam}"
    os.makedirs(dir_name, exist_ok=True)
    log_file = os.path.join(dir_name, "log.txt")
    logging.basicConfig(
        filename=log_file,
        filemode="w",
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
    )
    logging.info("Starting training")
    logging.info(f"component={args.component}, str_layer={args.str_layer}")
    # Unpack the data: test split and wild-train split
    test_prompts, train_prompts = prompts[0], prompts[1]
    test_labels, train_labels = labels[0], labels[1]
    batch_size = args.batch_size
    losses = []
    best_test_auroc = -1
    scaler = GradScaler('cuda')  # gradient scaler for mixed-precision training

    # Estimate the class prior w (truthful / hallucinated) from exemplar labels:
    # ex_hallu = P(hallucinated), ex_true = P(truthful)
    # ex_hallu = (num_exemplars - exemplar_labels[:num_exemplars].sum()) / num_exemplars
    # ex_true = (exemplar_labels[:num_exemplars].sum()) / num_exemplars
    # cls_dist = torch.tensor([ex_hallu, ex_true]).float().cuda()
    # cls_dist = cls_dist.view(-1, 1)  # shape [2, 1]
    # Instantiate Sinkhorn with class-marginal constraints (the paper's OT problem):
    # sinkhorn = SinkhornKnopp_imb(args, cls_dist)
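    # A rough sketch of the Sinkhorn-Knopp iteration assumed above (the
    # SinkhornKnopp_imb class itself is not part of this file):
    #   Q = torch.exp(sims / eps)                            # [N, 2] transport scores
    #   for _ in range(n_iters):
    #       Q = Q / Q.sum(dim=0, keepdim=True) * cls_dist.T  # match the class marginals
    #       Q = Q / Q.sum(dim=1, keepdim=True)               # each sample sums to 1
    #   pseudo_labels = Q.argmax(dim=1)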

    # ========= Initialise the two class prototypes mu_c =========
    # `centroids` are the on-sphere prototypes mu_truthful, mu_hallucinated
    centroids = torch.randn((2, model.config.hidden_size)).half().cuda()
    centroids = F.normalize(centroids, p=2, dim=1)

    # Collate the train prompts/labels into one padded batch tensor
    train_prompts_, train_labels_ = train_prompts, train_labels
    train_prompts, train_labels = collate_fn(train_prompts, train_labels)
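    # collate_fn (from train_utils) is assumed to right-pad the variable-length
    # [1, T_i] prompt tensors with 0 into a single [N, T_max] batch, e.g. via
    #   pad_sequence([p.squeeze(0) for p in prompt_list], batch_first=True, padding_value=0)
    # which is why (batch_prompts != 0) below recovers the attention mask.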
    # args defines no num_train field in this script, so train on the full split
    num_trains = len(train_labels_)

    # ========= Phase 1: Initial training on exemplars =========
    num_epochs = args.init_num_epochs
    for epoch in range(num_epochs):
        running_loss = 0.0
        total = 0
        num_samples = num_trains
        # Mini-batch training over the exemplars (the paper's D_E,
        # i.e. the human-annotated data)
        for batch_start in tqdm(
            range(0, num_samples, batch_size),
            desc=f"Epoch {epoch+1}/{num_epochs} Batches",
            leave=False,
        ):
            batch_prompts = train_prompts[batch_start: batch_start + batch_size]
            batch_labels = train_labels[batch_start: batch_start + batch_size]
            # attention_mask: 1 = valid token, 0 = padding
            attention_mask = (batch_prompts != 0).half()
            batch_prompts = batch_prompts.to(device)
            batch_labels = batch_labels.to(device)
            attention_mask = attention_mask.to(batch_prompts.device)

            # ======= Forward pass: take the last layer's representation of the
            # last non-padding token as r_v =======
            with autocast('cuda', dtype=torch.float16):
                output = model(
                    batch_prompts.squeeze(),
                    attention_mask=attention_mask.squeeze(),
                    output_hidden_states=True,
                )
                hidden_states = output.hidden_states  # tuple of length L+1
                hidden_states = torch.stack(hidden_states, dim=0).squeeze()
                last_layer_hidden_state = hidden_states[layer_number]  # [B, T, H]
                # r_v in the paper: hidden state of the last non-padding token,
                # including the SV steering
                last_token_rep = get_last_non_padded_token_rep(
                    last_layer_hidden_state, attention_mask.squeeze()
                )
                # Turn the labels into 2-dimensional one-hot vectors
                batch_labels_oh = torch.nn.functional.one_hot(
                    batch_labels, num_classes=2  # explicit count; -1 would infer it per batch
                )
                # compute_ot_loss_cos corresponds to Eq. (5), in its OT variant:
                # - a softmax-style classification loss over cosine distances
                #   to the centroids
                # - temperature-scaled internally via args.cos_temp
                ot_loss, similarities = compute_ot_loss_cos(
                    last_token_rep, centroids, batch_labels_oh, batch_size, args
                )
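                # What compute_ot_loss_cos is assumed to compute (the actual
                # implementation lives in train_utils), roughly:
                #   sims = F.normalize(rep, dim=-1) @ centroids.T  # centroids already unit-norm
                #   loss = F.cross_entropy(sims / args.cos_temp, labels_oh.argmax(dim=-1))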
                loss = ot_loss
                total += batch_labels.size(0)

            # ====== Update the class prototypes mu_c via EMA (paper Eq. (6)) ======
            with torch.no_grad():
                centroids = update_centroids_ema_hard(
                    centroids, last_token_rep, batch_labels_oh, args
                )
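            # Assumed form of the hard EMA update, with m = args.ema_decay:
            #   for c in (0, 1):
            #       mean_c = last_token_rep[batch_labels_oh[:, c] == 1].mean(dim=0)
            #       centroids[c] = F.normalize(m * centroids[c] + (1 - m) * mean_c, dim=0)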
            # ====== Backpropagate into the SV only (the LLM itself is frozen) ======
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()
            running_loss += loss.item() * batch_labels.size(0)

        # Mean loss over the epoch
        epoch_loss = running_loss / total
        # ====== Evaluate on the test split and track AUROC ======
        if (epoch + 1) % 1 == 0:  # evaluate every epoch
            test_predictions, test_labels_eval = test_model(
                model, centroids, test_prompts, test_labels, device, batch_size, layer_number
            )
            test_auroc = roc_auc_score(
                test_labels_eval.cpu().numpy(), test_predictions.cpu().numpy()
            )
            print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}")
            logging.info(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}")
            losses.append(epoch_loss)
            if test_auroc > best_test_auroc:
                best_test_auroc = test_auroc
                best_test_epoch = epoch
                print(f"Best test AUROC: {best_test_auroc:.4f}, at epoch: {best_test_epoch}")
                logging.info(
                    f"Best test AUROC: {best_test_auroc:.4f}, at epoch: {best_test_epoch}"
                )
                # Save the centroids from the best-AUROC epoch to an .npy file
                centroids_np = centroids.cpu().numpy()
                centroids_file = os.path.join(dir_name, f"best_centroids_epoch_{epoch}.npy")
                np.save(centroids_file, centroids_np)
                print(f"Saved best centroids to {centroids_file}")
                logging.info(f"Saved best centroids to {centroids_file}")
            logging.info(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {epoch_loss:.4f}")
            logging.info(f"Test AUROC: {test_auroc:.4f}")
            print(f"Epoch [{epoch+1}/{num_epochs}], Test AUROC: {test_auroc:.4f}")
    return best_test_auroc


def test_model(model, centroids, test_prompts, test_labels, device, batch_size, layer_number):
    """
    In the paper's terms, this:
    - computes the TSV-steered r_v for the test set,
    - takes the cosine similarity to the two prototypes mu_c,
    - applies a softmax to get p(c = truthful | r_v),
    - and uses that probability for AUROC evaluation.
    """
    model.eval()
    val_predictions = []  # predicted probabilities p
    val_labels = []       # the matching binary labels
    num_val_samples = len(test_prompts)
    with torch.no_grad():
        with autocast('cuda', dtype=torch.float16):
            for batch_start in range(0, num_val_samples, batch_size):
                batch_prompts = test_prompts[batch_start:batch_start + batch_size]
                batch_labels = test_labels[batch_start:batch_start + batch_size]
                batch_prompts, batch_labels = collate_fn(batch_prompts, batch_labels)
                attention_mask = (batch_prompts != 0).half().to(device)
                batch_prompts = batch_prompts.to(device)
                batch_labels = batch_labels.to(device)
                # Forward pass; the hidden state of the last non-padding token
                # serves as r_v
                output = model(
                    batch_prompts.squeeze(),
                    attention_mask=attention_mask.squeeze(),
                    output_hidden_states=True,
                )
                hidden_states = output.hidden_states
                hidden_states = torch.stack(hidden_states, dim=0).squeeze()
                last_layer_hidden_state = hidden_states[layer_number]
                last_token_rep = get_last_non_padded_token_rep(
                    last_layer_hidden_state, attention_mask.squeeze()
                )
                # Project both r_v and the prototypes onto the unit sphere
                last_token_rep = F.normalize(last_token_rep, p=2, dim=-1)
                centroids = F.normalize(centroids, p=2, dim=-1)
                similarities = torch.matmul(last_token_rep, centroids.T)  # [B, 2]
                # similarity / temperature -> softmax -> probability of "truthful"
                similarity_scores = torch.softmax(similarities / 0.1, dim=-1)
                similarity_scores = similarity_scores[:, 1]  # index 1 = truthful/benign
                val_predictions.append(similarity_scores.cpu())
                val_labels.append(batch_labels.cpu())
    val_predictions = torch.cat(val_predictions)
    val_labels = torch.cat(val_labels)
    return val_predictions, val_labels


HF_NAMES = {
    'llama3.1-8B': 'meta-llama/Meta-Llama-3.1-8B',
    'qwen2.5-7B': 'Qwen/Qwen2.5-7B',
    'llama_7B': 'baffo32/decapoda-research-llama-7B-hf',
    'honest_llama_7B': 'validation/results_dump/llama_7B_seed_42_top_48_heads_alpha_15',
    'alpaca_7B': 'circulus/alpaca-7b',
    'vicuna_7B': 'AlekseyKorshuk/vicuna-7b',
    'llama2_chat_7B': 'models/Llama-2-7b-chat-hf',
    'llama2_chat_13B': 'models/Llama-2-13b-chat-hf',
    'llama2_chat_70B': 'meta-llama/Llama-2-70b-chat-hf',
}


def main():
    # ======================
    # 1. Parse command-line arguments
    # ======================
    parser = argparse.ArgumentParser()
    parser.add_argument('--model_name', type=str, default='alpaca_7B')
    parser.add_argument('--model_prefix', type=str, default='', help='optional prefix for the HF_NAMES lookup')
    parser.add_argument('--num_gene', type=int, default=1)  # how many answers to generate per question
    # argparse's type=bool would parse any non-empty string as True, so use an int flag
    parser.add_argument('--train_sv', type=int, default=1)  # 1 = train steer vectors, 0 = skip
    parser.add_argument('--steer_place', type=str, default='layer', help='where to steer (layer, mlp or head)')
    parser.add_argument('--dataset_name', type=str, default='AdvBench')  # which dataset to use
    parser.add_argument('--device', type=int, default=0)  # GPU id (mostly unused; device_map="auto" handles placement)
    parser.add_argument("--model_dir", type=str, default=None, help='local directory with model data')
    parser.add_argument("--batch_size", type=int, default=64)
    parser.add_argument("--cos_temp", type=float, default=0.1)  # softmax temperature for the cosine similarity
    parser.add_argument("--ema_decay", type=float, default=0.99)  # EMA decay for the prototype vectors
    parser.add_argument("--lr", type=float, default=0.005)  # learning rate for the SV
    parser.add_argument("--str_layer", type=int, default=9)  # hidden layer at which the SV is inserted
    parser.add_argument("--component", type=str, default='res')  # how the SV is injected (residual stream, etc.)
    parser.add_argument("--lam", type=float, default=5)  # SV strength lambda
    parser.add_argument("--init_num_epochs", type=int, default=20)  # number of SV training epochs
    parser.add_argument("--optimizer", type=str, default='AdamW')
    parser.add_argument('--train_ratio', type=float, default=0.8)
    parser.add_argument('--val_ratio', type=float, default=0.1)
    args = parser.parse_args()

    # ======================
    # 2. Resolve the HuggingFace model name
    # ======================
    # HF_NAMES is defined above; model_prefix can prepend an optional
    # prefix and defaults to the empty string.
    model_name_or_path = HF_NAMES[args.model_prefix + args.model_name]

    # ======================
    # 3. Load the chosen dataset
    # ======================
    if args.dataset_name == "AdvBench":
        dataset = load_dataset("LeeWlving/Safety_Datasets", "advbench")['test']
    elif args.dataset_name == 'HarmBench':
        dataset = load_dataset("LeeWlving/Safety_Datasets", "harmbench")['standard']
    elif args.dataset_name == 'JBB':
        dataset = load_dataset("LeeWlving/Safety_Datasets", "JBB")['test']
    else:
        raise ValueError("Unsupported dataset name!")

    # ============================================================
    # 4. Mode three: the SV training stage
    #    (gene=0 and generate_gt=0 in the full pipeline)
    # ============================================================
    if args.train_sv:
        device = torch.device("cuda")
        # Load the base LLM (the TSV layers are inserted on top of it below)
        model = AutoModelForCausalLM.from_pretrained(
            model_name_or_path,
            low_cpu_mem_usage=True,
            torch_dtype=torch.float16,
            device_map="auto",
            token='',
        )
        tokenizer = AutoTokenizer.from_pretrained(model_name_or_path, token='')

        prompts_ = []
        qa_pairs = []
        categories = []
        length = len(dataset)
        # Adversarial (question, target) pairs -> label 0
        for i in tqdm(range(length)):
            question = dataset[i]['query']
            adversary = dataset[i]['target']
            prompt = tokenizer(
                f"Answer the question concisely. Q: {question}" + " A:" + adversary,
                return_tensors='pt'
            ).input_ids.cuda()
            prompts_.append(prompt)
            qa_pairs.append({'Question': question, 'Answer': adversary})
            categories.append(0)  # 0 = adversarial
        # Benign (question, response) pairs -> label 1
        for i in tqdm(range(length)):
            question = dataset[i]['query']
            clean = dataset[i]['response']
            prompt = tokenizer(
                f"Answer the question concisely. Q: {question}" + " A:" + clean,
                return_tensors='pt'
            ).input_ids.cuda()
            prompts_.append(prompt)
            qa_pairs.append({'Question': question, 'Answer': clean})
            categories.append(1)  # 1 = benign

        train_index, val_index, test_index = split_indices(len(prompts_), args.train_ratio, args.val_ratio)
        # Plain Python lists cannot be indexed by an index collection,
        # so gather the splits explicitly
        labels = [[categories[i] for i in test_index], [categories[i] for i in train_index]]
        prompts = [[prompts_[i] for i in test_index], [prompts_[i] for i in train_index]]

        # ====== 4.1 Freeze the LLM; only the SV parameters are trained ======
        num_layers = model.config.num_hidden_layers
        hidden_size = model.config.hidden_size
        for param in model.parameters():
            param.requires_grad = False
        # One SV vector per layer (although only selected layers are steered)
        sv = nn.ParameterList(
            [nn.Parameter(torch.zeros(hidden_size), requires_grad=True) for _ in range(num_layers)]
        )
        sv.to(device)
        # Insert the SV into the chosen layers (add_sv_layers patches forward)
        add_sv_layers(model, sv, [args.lam], args)
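        # Assumed effect of add_sv_layers (implemented in llm_layers): for
        # component 'res', the patched forward of layer args.str_layer roughly does
        #   hidden_states = hidden_states + lam * sv[args.str_layer]
        # i.e. it adds the scaled steering vector to the residual stream.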
        # Hand only the SV parameters to the optimizer
        optimizer = torch.optim.AdamW(list(sv.parameters()), lr=args.lr)
        # ====== 4.2 Call train_model to run the two-stage training ======
        train_model(model, optimizer, device, prompts, labels, args=args)
    else:
        print("Skip training steer vectors.")
        raise NotImplementedError("Only training SV is implemented in this script.")


if __name__ == '__main__':
    # Fix the random seed so results are reproducible
    seed_everything(42)
    main()
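
# Example invocation (values are illustrative, matching the argparse defaults):
#   python haloscope/steer_vector.py --model_name llama3.1-8B --dataset_name AdvBench \
#       --str_layer 9 --lam 5 --lr 0.005 --init_num_epochs 20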