【Python实现机器遗忘算法】复现2023年TNNLS期刊算法UNSIR
1 算法原理
Tarun A K, Chundawat V S, Mandal M, et al. Fast yet effective machine unlearning[J]. IEEE Transactions on Neural Networks and Learning Systems, 2023.
本文提出了一种名为 UNSIR(Unlearning with Single Pass Impair and Repair) 的机器遗忘框架,用于从深度神经网络中高效地卸载(遗忘)特定类别数据,同时保留模型对其他数据的性能。以下是算法的主要步骤:
1. 零隐私设置(Zero-Glance Privacy Setting)
- 假设:用户请求从已训练的模型中删除其数据(例如人脸图像),并且模型无法再访问这些数据,即使是为了权重调整。
- 目标:在不重新训练模型的情况下,使模型忘记特定类别的数据,同时保留对其他数据的性能。
2. 学习误差最大化噪声矩阵(Error-Maximizing Noise Matrix)
-
初始化:随机初始化噪声矩阵 N,其大小与模型输入相同。
-
优化目标:通过最大化模型对目标类别的损失函数来优化噪声矩阵 N。具体优化问题为:
a r g N m i n E ( θ ) = − L ( f , y ) + λ ∥ w n o i s e ∥ argNminE(θ)=−L(f,y)+λ∥wnoise∥ argNminE(θ)=−L(f,y)+λ∥wnoise∥其中:
- L(f,y) 是针对要卸载的类别的分类损失函数。
- λ∥wnoise∥ 是正则化项,防止噪声值过大。
- 使用交叉熵损失函数 L 和 L2 归一化。
-
噪声矩阵的作用:生成的噪声矩阵 N 与要卸载的类别标签相关联,用于在后续步骤中破坏模型对这些类别的记忆。
3. 单次损伤与修复(Single Pass Impair and Repair)
- 损伤步骤(Impair Step):
- 操作:将噪声矩阵 N 与保留数据子集Dr结合,训练模型一个周期(epoch)。
- 目的:通过高学习率(例如 0.02)快速破坏模型对要卸载类别的权重。
- 结果:模型对要卸载类别的性能显著下降,同时对保留类别的性能也会受到一定影响。
- 修复步骤(Repair Step):
- 操作:仅使用保留数据子集 Dr再次训练模型一个周期(epoch),学习率较低(例如 0.01)。
- 目的:恢复模型对保留类别的性能,同时保持对要卸载类别的遗忘效果。
- 结果:最终模型在保留数据上保持较高的准确率,而在卸载数据上准确率接近于零。
2 Python代码实现
相关函数
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, Subset,TensorDataset
from torch.amp import autocast, GradScaler
import numpy as np
import matplotlib.pyplot as plt
import os
import warnings
import random
from copy import deepcopy
random.seed(2024)
torch.manual_seed(2024)
np.random.seed(2024)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
warnings.filterwarnings("ignore")
MODEL_NAMES = "MLP"
# 设置设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 定义三层全连接网络
class MLP(nn.Module):
def __init__(self):
super(MLP, self).__init__()
self.fc1 = nn.Linear(28 * 28, 256)
self.fc2 = nn.Linear(256, 128)
self.fc3 = nn.Linear(128, 10)
def forward(self, x):
x = x.view(-1, 28 * 28)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
# 加载MNIST数据集
def load_MNIST_data(batch_size,forgotten_classes,ratio):
transform = transforms.Compose([transforms.ToTensor()])
train_data = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_data = datasets.MNIST(root='./data', train=False, download=True, transform=transform)
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)
forgotten_train_data,_ = generate_subset_by_ratio(train_data, forgotten_classes,ratio)
retain_train_data,_ = generate_subset_by_ratio(train_data, [i for i in range(10) if i not in forgotten_classes])
forgotten_train_loader= DataLoader(forgotten_train_data, batch_size=batch_size, shuffle=True)
retain_train_loader= DataLoader(retain_train_data, batch_size=batch_size, shuffle=True)
return train_loader, test_loader, retain_train_loader, forgotten_train_loader
# worker_init_fn 用于初始化每个 worker 的随机种子
def worker_init_fn(worker_id):
random.seed(2024 + worker_id)
np.random.seed(2024 + worker_id)
def get_transforms():
train_transform = transforms.Compose([
transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)), # 标准化为[-1, 1]
])
test_transform = transforms.Compose([
transforms.Resize((32, 32)),
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)), # 标准化为[-1, 1]
])
return train_transform, test_transform
# 模型训练函数
def train_model(model, train_loader, criterion, optimizer, scheduler=None,use_fp16 = False):
use_fp16 = True
# 使用新的初始化方式:torch.amp.GradScaler("cuda")
scaler = GradScaler("cuda") # 用于混合精度训练
model.train()
running_loss = 0.0
for images, labels in train_loader:
images, labels = images.to(device), labels.to(device)
# 前向传播
with autocast(enabled=use_fp16, device_type="cuda"): # 更新为使用 "cuda"
outputs = model(images)
loss = criterion(outputs, labels)
# 反向传播和优化
optimizer.zero_grad()
if use_fp16:
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
else:
loss.backward()
optimizer.step()
running_loss += loss.item()
if scheduler is not None:
# 更新学习率
scheduler.step()
print(f"Loss: {running_loss/len(train_loader):.4f}")
# 模型评估(计算保留和遗忘类别的准确率)
def test_model(model, test_loader, forgotten_classes=[0]):
"""
测试模型的性能,计算总准确率、遗忘类别准确率和保留类别准确率。
:param model: 要测试的模型
:param test_loader: 测试数据加载器
:param forgotten_classes: 需要遗忘的类别列表
:return: overall_accuracy, forgotten_accuracy, retained_accuracy
"""
model.eval()
correct = 0
total = 0
forgotten_correct = 0
forgotten_total = 0
retained_correct = 0
retained_total = 0
with torch.no_grad():
for images, labels in test_loader:
images, labels = images.to(device), labels.to(device)
outputs = model(images)
_, predicted = torch.max(outputs.data, 1)
# 计算总的准确率
total += labels.size(0)
correct += (predicted == labels).sum().item()
# 计算遗忘类别的准确率
mask_forgotten = torch.isin(labels, torch.tensor(forgotten_classes, device=device))
forgotten_total += mask_forgotten.sum().item()
forgotten_correct += (predicted[mask_forgotten] == labels[mask_forgotten]).sum().item()
# 计算保留类别的准确率(除遗忘类别的其他类别)
mask_retained = ~mask_forgotten
retained_total += mask_retained.sum().item()
retained_correct += (predicted[mask_retained] == labels[mask_retained]).sum().item()
overall_accuracy = correct / total
forgotten_accuracy = forgotten_correct / forgotten_total if forgotten_total > 0 else 0
retained_accuracy = retained_correct / retained_total if retained_total > 0 else 0
# return overall_accuracy, forgotten_accuracy, retained_accuracy
return round(overall_accuracy, 4), round(forgotten_accuracy, 4), round(retained_accuracy, 4)
主函数
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset
import numpy as np
from models.Base import load_MNIST_data, test_model, load_CIFAR100_data, init_model
class UNSIRForget:
def __init__(self, model):
self.model = model
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 学习误差最大化噪声矩阵
def learn_error_maximizing_noise(self, train_loader, forgotten_classes, lambda_reg=0.01, learning_rate=0.01, num_epochs=5):
self.model.eval()
# 初始化噪声矩阵 N,大小与输入图像相同(例如28x28图像)
noise_matrix = torch.randn(1, 1, 28, 28, device=self.device, requires_grad=True) # 假设输入是28x28的图像
# 优化器用于优化噪声矩阵
optimizer = torch.optim.SGD([noise_matrix], lr=learning_rate)
noise_data = []
noise_labels = []
# 生成噪声数据集
for epoch in range(num_epochs):
total_loss = 0.0
for images, labels in train_loader:
images, labels = images.to(self.device), labels.to(self.device)
# 只对属于遗忘类别的数据进行优化
mask_forgotten = torch.isin(labels, torch.tensor(forgotten_classes, device=self.device))
noisy_images = images.clone()
# 对遗忘类别的图像添加噪声
noisy_images[mask_forgotten] += noise_matrix
# 保存噪声数据
noise_data.append(noisy_images)
noise_labels.append(labels)
# 前向传播
outputs = self.model(noisy_images.view(-1, 28 * 28)) # 假设模型的输入是28x28的图像
loss = F.cross_entropy(outputs, labels)
# L2 正则化项(噪声矩阵的L2范数)
l2_reg = lambda_reg * torch.norm(noise_matrix)
# 总损失(包含交叉熵损失和L2正则化)
total_loss = loss + l2_reg
# 反向传播并更新噪声矩阵
optimizer.zero_grad()
total_loss.backward()
optimizer.step()
print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss.item():.4f}")
# 返回包含噪声数据和标签的噪声数据集
return torch.cat(noise_data), torch.cat(noise_labels), noise_matrix.detach()
# 实现机器遗忘(针对特定类别,使用噪声矩阵进行干扰)
def unlearn(self, train_loader, forgotten_classes, noise_data, noise_labels, noise_matrix, alpha_impair, alpha_repair, num_epochs=1):
# 损伤步骤
self.model.train()
print("执行损伤中...")
for epoch in range(num_epochs):
for images, labels in train_loader:
images, labels = images.to(self.device), labels.to(self.device)
# 仅选择保留类别的数据
mask_retained = ~torch.isin(labels, torch.tensor(forgotten_classes, device=self.device))
retained_images = images[mask_retained]
retained_labels = labels[mask_retained]
# 生成新的数据集,将噪声数据添加到保留数据中
augmented_images = torch.cat([retained_images, noise_data], dim=0)
augmented_labels = torch.cat([retained_labels, noise_labels], dim=0)
# 前向传播
outputs = self.model(augmented_images.view(-1, 28 * 28)) # 假设模型的输入是28x28的图像
loss = F.cross_entropy(outputs, augmented_labels)
# 更新模型权重
self.model.zero_grad()
loss.backward()
with torch.no_grad():
for param in self.model.parameters():
param.data -= alpha_impair * param.grad.data
# 修复步骤
print("执行修复中...")
for epoch in range(num_epochs):
for images, labels in train_loader:
images, labels = images.to(self.device), labels.to(self.device)
# 仅使用保留类别的数据进行修复
mask_retained = ~torch.isin(labels, torch.tensor(forgotten_classes, device=self.device))
retained_images = images[mask_retained]
retained_labels = labels[mask_retained]
if retained_images.size(0) == 0:
continue
# 前向传播和损失计算
outputs = self.model(retained_images.view(-1, 28 * 28))
loss = F.cross_entropy(outputs, retained_labels)
# 更新模型权重
self.model.zero_grad()
loss.backward()
with torch.no_grad():
for param in self.model.parameters():
param.data -= alpha_repair * param.grad.data
return self.model
# UNSIR算法的主要流程
def unsir_unlearning(model_before, retrain_data, forget_data, all_data, forgotten_classes, lambda_reg=0.01, learning_rate=0.01, alpha_impair=0.5, alpha_repair=0.001, num_epochs=5):
"""
执行 UNSIR 算法的主要流程,包括学习误差最大化噪声矩阵、损伤、修复步骤,最终返回遗忘后的模型。
"""
unsir_forgetter = UNSIRForget(model_before)
# 计算学习误差最大化噪声矩阵
noise_data, noise_labels, noise_matrix = unsir_forgetter.learn_error_maximizing_noise(all_data, forgotten_classes, lambda_reg, learning_rate, num_epochs)
# 执行 unlearn(损伤与修复步骤)
unlearned_model = unsir_forgetter.unlearn(all_data, forgotten_classes, noise_data, noise_labels, noise_matrix, alpha_impair, alpha_repair, num_epochs)
return unlearned_model
def main():
# 超参数设置
batch_size = 256
forgotten_classes = [0]
ratio = 1
model_name = "MLP"
# 加载数据
train_loader, test_loader, retain_loader, forget_loader = load_MNIST_data(batch_size, forgotten_classes, ratio)
model_before = init_model(model_name, train_loader)
# 在训练之前测试初始模型准确率
overall_acc_before, forgotten_acc_before, retained_acc_before = test_model(model_before, test_loader)
print("执行 UNSIR 遗忘...")
model_after = unsir_unlearning(
model_before,
retain_loader,
forget_loader,
train_loader,
forgotten_classes,
lambda_reg=0.01,
learning_rate=0.01,
alpha_impair=0.5,
alpha_repair=0.001,
num_epochs=5,
)
# 测试遗忘后的模型
overall_acc_after, forgotten_acc_after, retained_acc_after = test_model(model_after, test_loader)
# 输出遗忘前后的准确率变化
print(f"Unlearning前遗忘准确率: {100 * forgotten_acc_before:.2f}%")
print(f"Unlearning后遗忘准确率: {100 * forgotten_acc_after:.2f}%")
print(f"Unlearning前保留准确率: {100 * retained_acc_before:.2f}%")
print(f"Unlearning后保留准确率: {100 * retained_acc_after:.2f}%")
if __name__ == "__main__":
main()
3 总结
当前方法不支持随机样本或类别子集的卸载,这可能违反零隐私假设。
仍属于重新优化的算法,即还需要训练。