对于联邦学习正在学习中,下文中若有错误出现,望指正
介绍
本文在简单实现联邦平均算法时,使用客户-服务器架构,其基本流程是:
1、server初始化模型参数,所有clients将这个初始模型下载到本地
2、clients利用本地产生的数据进行训练
3、将训练得到的模型参数上传到server
4、server对得到的模型参数整合,所有的clients更新模型参数
5、重复执行2-5,直至收敛或达到预期要求
在我看来,联邦学习就是参数在客户端和服务器之间的反复调整
数据管理
加载数据集
在这部分代码段中,先加载MNIST数据集的训练集和测试集。然后将训练集和测试集的数据和标签分别提取出来。接着计算数据的均值和标准差,用于后续的归一化处理。归一化的目的是将数据缩放到一个相同的尺度,使得不同特征之间的数值差异不会对模型的学习产生过大的影响
接下来,使用transforms.Compose
创建一个转换操作序列,包括将数据转换为张量(tensor)和对数据进行归一化处理。
# 加载MNIST数据集
train_dataset = torchvision.datasets.MNIST("./dataset", train=True, download=False)
test_dataset = torchvision.datasets.MNIST("./dataset", train=False, download=False)
# 将数据与标签分离
train_data = train_dataset.data.to(torch.float)
train_labels = np.array(train_dataset.targets)
test_data = test_dataset.data.data.to(torch.float)
test_labels = np.array(test_dataset.targets)
# 对数据进行归一化处理
mean = (train_data.mean()) / (train_data.max() - train_data.min())
std = (train_data.std()) / (train_data.max() - train_data.min())
transform = transforms.Compose([
transforms.Normalize((mean,), (std,))
])
train_data = transform(train_data)
test_data = transform(test_data)
# train_data.shpe[0] <-> len(train_data) 为数据集的大小
train_data_size = train_data.shape[0]
test_data_size = test_data.shape[0]
test_DataLoader = DataLoader(TensorDataset(test_data, test_labels), batch_size=self.BatchSize)
数据划分
我们要为每个客户端分配数据,在实际场景中,每个客户端有自己独有的数据,这里为了模拟场景,手动划分数据集给每个客户端,MNIST数据集中共有60000个样本,这里设存在100个客户端,那我们就要将这60000个数据分配给100个客户端,而客户端之间的数据可能是独立分布IID,也可能是非独立同分布Non_IID的
IID
将数据集打乱,然后为每个client分配600个数据
涉及到的函数:
numpy.random.seed():生成随机种子
numpy.random.permutation():随机排列序列
numpy.array_split():将大型数组拆分为多个较小的子数组
代码
# 整体数据集(MNIST)中的类别数
nclass = np.max(train_labels) + 1
# 数据划分
client_train_data = {}
client_train_label = {}
if self.is_iid:
# 设置随机种子数可以保证每次运行代码时,np.random.permutation(self.train_data_size)产生的数列都是一样的, 这样可以确保实验的可重复性
np.random.seed(12)
# 对训练数据集(序号)进行随机排列, 得到一个索引数组idxs(实现了将数据集打乱)
idxs = np.random.permutation(train_data_size)
# 将索引数组idxs分割成与客户端数量相等的子数组batch_idxs
batch_idxs = np.array_split(idxs, self.num_of_clients)
# 遍历所有客户端
for i in range(self.num_of_clients):
# 根据索引数组batch_idxs[i], 给每个客户端分配相应的数据和标签
client_train_data[i] = train_data[batch_idxs[i]]
client_train_label[i] = train_labels[batch_idxs[i]]
# 类别分布,表示每个类别在客户端数据集中出现的次数
distribution = [client_train_label[i].tolist().count(i) for i in range(nclass)]
return client_train_data, client_train_label
Non-IID
首先根据数据标签将数据集排序(即MNIST中的数字大小),
然后将其划分为200组大小为300的数据切片,然后分给每个Client两个切片
涉及到的函数
numpy.random.dirichlet(): dirichlet分布中获取随机样本,并使用该方法返回一些随机样本的numpy数组
numpy.argwhere: 查找满足特定条件的索引
zip(): 将可迭代的对象作为参数,将对象中对应的元素打包成一个个元组,然后返回由这些元组组成的列表
numpy.cumsum(): 计算轴向的累加和
numpy.concatenate(): 拼接数组
代码
else:
n_clients = self.num_of_clients
train_label = train_labels
np.random.seed(250)
# [self.beta] * n_clients会创建一个长度为n_clients, 每个元素都为self.beta的列表
# 生成一个形状为(nclass, n_clients)的矩阵, 记录每个类别划分到每个client的比例
label_distribution = np.random.dirichlet([self.beta] * n_clients, nclass)
# 对于每一个在范围[0, nclass-1]内的整数y, 找到train_label中所有等于y的元素的索引, 并将这些索引展平成一个一维数组
# class_idcs是一个列表, 其中每个元素都是一个一维数组, 记录每个类别对应的样本索引
class_idcs = [np.argwhere(train_label == y).flatten() for y in range(nclass)]
# 创建一个名为client_idcs的列表, 其中包含n_clients个空列表, 每个空列表代表一个客户端的索引集合
client_idcs = [[] for _ in range(n_clients)]
# 使用zip函数将class_idcs和label_distribution进行配对, 并迭代处理每一对数据
# 每次迭代, 都是一个不同的类别, c是从class_ids中取出的一个一维数组, 包含了本次迭代对应类别的样本索引,
# fracs是从label_distribution中取出的一维数组, 表示当前类别的数据分配到各个客户端的比例
for c, fracs in zip(class_idcs, label_distribution):
# np.split按照比例将本次迭代对应类别的样本划分为了N个子集
# (np.cumsum(fracs)[:-1] * len(c) 先计算了fracs数组的累积和, 然后[:-1]切片操作去除累积和数组的最后一个元素,以确保
# fracs与新的累加和数组的长度相同, 最后累加和数组的每个元素乘以数组c的长度会得到一个新的数组
# 将新数组的每个元素转换为整数后即为分割c的分割点
# for i, idcs 为遍历第i个client对应样本集合的索引
for i, idcs in enumerate(np.split(c, (np.cumsum(fracs)[:-1] * len(c)).astype(int))):
client_idcs[i] += [idcs]
for i in range(self.num_of_clients):
idcs = client_idcs[i]
# 记录每个client拥有(数据)的样本数量
distribution = [len(c) for c in idcs]
client_train_data[i] = train_data[np.concatenate(idcs)]
client_train_label[i] = train_label[np.concatenate(idcs)]
return client_train_data, client_train_label
定义CNN神经网络模型
这部分不在此赘述, 不明白的可移步主页
class Mnist_CNN(nn.Module):
def __init__(self):
super().__init__()
self.conv1 = nn.Conv2d(in_channels=1, out_channels=30, kernel_size=3, stride=1, padding=1)
self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
self.conv2 = nn.Conv2d(in_channels=30, out_channels=5, kernel_size=3, stride=1, padding=1)
self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
self.fc1 = nn.Linear(7 * 7 * 5, 100)
self.fc2 = nn.Linear(100, 10)
def forward(self, inputs):
tensor = inputs.view(-1, 1, 28, 28)
tensor = F.relu(self.conv1(tensor))
tensor = self.pool1(tensor)
tensor = F.relu(self.conv2(tensor))
tensor = self.pool2(tensor)
tensor = tensor.view(-1, 7 * 7 * 5)
tensor = F.relu(self.fc1(tensor))
tensor = self.fc2(tensor)
return tensor
客户端
在客户端接收聚合后的参数,并更新神经网络模型,利用新的神经网络模型进行数据集的训练,最后返回训练好的模型参数
class client:
def __init__(self):
self.dev = dev
self.train_DataLoader = None
self.local_parameters = None
# 模型训练
def localUpdate(self, localEpoch, localBatchSize, Net, lossFun, opti, global_parameters, trainDataSet, dev):
# localEpoch: 当前Client的迭代次数
# localBatchSize: 当前Client的batchsize大小
# Net: Server共享的模型
# LossFun: 损失函数
# opti: 优化函数
# global_parmeters: 当前通讯中最全局参数
# return: 返回当前Client基于自己的数据训练得到的新的模型参数
# 加载当前通信中最新全局参数
# 并将global_parameters传入网络模型
Net.load_state_dict(global_parameters, strict=True)
# 加载本地数据, client自己的数据集
self.train_DataLoader = DataLoader(trainDataSet, batch_size=localBatchSize, shuffle=True)
# 设置迭代次数
for epoch in range(localEpoch):
for data, label in self.train_DataLoader:
# 加载到GPU上
data, label = data.to(dev), label.to(dev)
# 模型上传入数据
output = Net(data)
# 计算损失函数
loss = lossFun(output, label)
# 将梯度归零,初始化梯度
opti.zero_grad()
# 反向传播
loss.backward()
# 计算梯度,并更新梯度
opti.step()
# 返回当前Client基于自己的数据训练得到的新的模型参数
return Net.state_dict()
def local_val(self):
pass
服务器
将clients训练好的参数通过加权平均的方式合并为一个新的模型参数,并传递给clients
class server:
def __init__(self, client_params):
self.client_params = client_params
def agg_average(self):
w = self.client_params
# 将第一个权重字典赋值给weights_avg, 即为将第一个client的模型参数赋给weights_avg
weights_avg = w[0]
# 遍历weights_avg字典的所有键(代表一个神经网络模型中的所有的参数)
for k in weights_avg.keys():
# 遍历权重列表w中从第二个元素开始的所有权重字典, 即遍历所有client的参数
for i in range(1, len(w)):
# 在第一个client参数的基础上, 将其他client的参数也全部加到weights_avg(weights_avg充当累加的“容器”)
weights_avg[k] = weights_avg[k] + w[i][k]
# 联邦平均, 将所有client的参数求平均值得到新的参数
weights_avg[k] = weights_avg[k] / len(w)
return weights_avg
测试集评估模型
class test_accuracy:
def test_accuracy(self, net, parameters, testDataLoader, dev, lossFun):
# 存储损失
total_test_loss = 0
with torch.no_grad():
net.load_state_dict(parameters, strict=True)
sum_accu = 0
num = 0
# 载入测试集
for data, label in testDataLoader:
data, label = data.to(dev), label.to(dev)
output = net(data)
loss = lossFun(output, label)
# loss = 1
total_test_loss = total_test_loss + loss.item()
output = torch.argmax(output, dim=1)
sum_accu += (output == label).float().mean()
num += 1
accuracy = sum_accu / num
avg_loss = total_test_loss / num
return avg_loss, accuracy
main函数
这段代码是一个基于联邦学习的神经网络训练过程:
- 设置设备(CPU或GPU)用于训练。
- 创建客户端数据,并将其分配给各个客户端。
- 初始化一个神经网络模型(Mnist_CNN)。
- 设置训练参数,如通信轮数、批次大小、损失函数和优化器。
- 在每一轮通信中,每个客户端使用本地数据进行训练,并更新其模型参数。
- 服务器收集所有客户端的模型参数,并计算全局模型参数的平均值。
- 将全局模型参数加载到网络模型中,并在测试数据集上评估模型的性能。
- 打印每10轮的训练结果,包括每个客户端的准确率和损失值以及全局模型的准确率和损失值。
if __name__ == "__main__":
# ----------------------------------设置参数----------------------------------
dev = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# ----------------------------------创建clients, 并分配数据----------------------------------
IID = False
# 假设有10个客户端
get_data = GetData(isIID=IID, num_clients=10, dev=dev)
clients_data, clients_label, testDataLoader = get_data.load_data()
# ----------------------------------初始化模型----------------------------------
# 模型实例化
net = Mnist_CNN()
net = net.to(dev)
# 通信轮数
rounds = 100
# batch_size
batch_size = 64
# 定义损失函数
loss_func = nn.CrossEntropyLoss()
loss_func = loss_func.to(dev)
# 定义优化器
if IID:
lr = 0.00001
else:
lr = 0.0001
opti = optim.Adam(net.parameters(), lr=lr)
# 客户端数量
num_in_comm = 10
# ----------------------------------训练----------------------------------
# 定义变量global_parameters
global_parameters = net.state_dict()
# train_loss = []
# clients与server之间通信
for curr_round in range(1, rounds + 1):
local_loss = []
client_params = {}
sum_parameters = None
for k in range(num_in_comm):
# print(k)
my_client = client()
train_data = clients_data[k]
train_label = torch.tensor(clients_label[k])
# 每个client训练得到的权重
local_parameters = my_client.localUpdate(localEpoch=1, localBatchSize=batch_size, Net=net,
lossFun=loss_func,
opti=opti,
global_parameters=global_parameters,
trainDataSet=TensorDataset(train_data, train_label), dev=dev)
client_params[k] = local_parameters
accuracy = test_accuracy()
local_loss, local_acc = accuracy.test_accuracy(net, local_parameters, testDataLoader, dev, loss_func)
if curr_round % 10 == 0:
print(
'[Round: %d Client: %d] accuracy: %f loss: %f ' % (curr_round, k, local_acc, local_loss))
# 取平均值,得到本次通信中server得到的更新后的模型参数
s = server(client_params)
global_parameters = s.agg_average()
net.load_state_dict(global_parameters, strict=True)
accuracy = test_accuracy()
global_loss, global_acc = accuracy.test_accuracy(net, global_parameters, testDataLoader, dev, loss_func)
if curr_round % 10 == 0:
print(
'----------------------------------[Round: %d] accuracy: %f loss: %f----------------------------------'
% (curr_round, global_acc, global_loss))
完整代码
import torch
import torchvision
import numpy as np
from torch import nn, optim
from torch.utils.data import DataLoader, TensorDataset
from torchvision import transforms
import torch.nn.functional as F
# ----------------------------------数据管理----------------------------------
class GetData:
def __init__(self, isIID, num_clients, dev, beta=0.4, BatchSize=256):
self.dev = dev
self.beta = beta
self.is_iid = isIID
self.num_of_clients = num_clients # 客户端的数量
self.BatchSize = BatchSize
# self.clients_set = {} # 用于整合客户端信息
def load_data(self):
# 加载MNIST数据集
train_dataset = torchvision.datasets.MNIST("./dataset", train=True, download=False)
test_dataset = torchvision.datasets.MNIST("./dataset", train=False, download=False)
# 将数据与标签分离
train_data = train_dataset.data.to(torch.float)
train_labels = np.array(train_dataset.targets)
test_data = test_dataset.data.data.to(torch.float)
test_labels = test_dataset.targets
# 对数据进行归一化处理, 使得数据的均值为0, 标准差为1
mean = (train_data.mean()) / (train_data.max() - train_data.min())
std = (train_data.std()) / (train_data.max() - train_data.min())
transform = transforms.Compose([
transforms.Normalize((mean,), (std,))
])
train_data = transform(train_data)
test_data = transform(test_data)
# train_data.shpe[0] <-> len(train_data) 为数据集的大小
train_data_size = train_data.shape[0]
test_data_size = test_data.shape[0]
test_DataLoader = DataLoader(TensorDataset(test_data, test_labels), batch_size=self.BatchSize)
# 整体数据集(MNIST)中的类别数
nclass = np.max(train_labels) + 1
# 数据划分
client_train_data = {}
client_train_label = {}
distribution = {}
if self.is_iid:
# 设置随机种子数可以保证每次运行代码时,np.random.permutation(self.train_data_size)产生的数列都是一样的, 这样可以确保实验的可重复性
np.random.seed(12)
# 对训练数据集(序号)进行随机排列, 得到一个索引数组idxs(实现了将数据集打乱)
idxs = np.random.permutation(train_data_size)
# 将索引数组idxs分割成与客户端数量相等的子数组batch_idxs
batch_idxs = np.array_split(idxs, self.num_of_clients)
# 遍历所有客户端
for i in range(self.num_of_clients):
# 根据索引数组batch_idxs[i], 给每个客户端分配相应的数据和标签
client_train_data[i] = train_data[batch_idxs[i]]
client_train_label[i] = train_labels[batch_idxs[i]]
return client_train_data, client_train_label, test_DataLoader
else:
n_clients = self.num_of_clients
train_label = train_labels
np.random.seed(250)
# [self.beta] * n_clients会创建一个长度为n_clients, 每个元素都为self.beta的列表
# 生成一个形状为(nclass, n_clients)的矩阵, 记录每个类别划分到每个client的比例
label_distribution = np.random.dirichlet([self.beta] * n_clients, nclass)
# 对于每一个在范围[0, nclass-1]内的整数y, 找到train_label中所有等于y的元素的索引, 并将这些索引展平成一个一维数组
# class_idcs是一个列表, 其中每个元素都是一个一维数组, 记录每个类别对应的样本索引
class_idcs = [np.argwhere(train_label == y).flatten() for y in range(nclass)]
# 创建一个名为client_idcs的列表, 其中包含n_clients个空列表, 每个空列表代表一个客户端的索引集合
client_idcs = [[] for _ in range(n_clients)]
# 使用zip函数将class_idcs和label_distribution进行配对, 并迭代处理每一对数据
# 每次迭代, 都是一个不同的类别, c是从class_ids中取出的一个一维数组, 包含了本次迭代对应类别的样本索引,
# fracs是从label_distribution中取出的一维数组, 表示当前类别的数据分配到各个客户端的比例
for c, fracs in zip(class_idcs, label_distribution):
# np.split按照比例将本次迭代对应类别的样本划分为了N个子集
# (np.cumsum(fracs)[:-1] * len(c) 先计算了fracs数组的累积和, 然后[:-1]切片操作去除累积和数组的最后一个元素,以确保
# fracs与新的累加和数组的长度相同, 最后累加和数组的每个元素乘以数组c的长度会得到一个新的数组
# 将新数组的每个元素转换为整数后即为分割c的分割点
# for i, idcs 为遍历第i个client对应样本集合的索引
for i, idcs in enumerate(np.split(c, (np.cumsum(fracs)[:-1] * len(c)).astype(int))):
client_idcs[i] += [idcs]
for i in range(self.num_of_clients):
idcs = client_idcs[i]
# 记录每个client拥有(数据)的样本数量
distribution = [len(c) for c in idcs]
client_train_data[i] = train_data[np.concatenate(idcs)]
client_train_label[i] = train_label[np.concatenate(idcs)]
# yield client_train_data[i], client_train_label[i]
return client_train_data, client_train_label, test_DataLoader
# ----------------------------------定义CNN神经网络模型----------------------------------
class Mnist_CNN(nn.Module):
def __init__(self):
super().__init__()
self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=5, stride=1, padding=2)
self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=5, stride=1, padding=2)
self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
self.fc1 = nn.Linear(7 * 7 * 64, 512)
self.fc2 = nn.Linear(512, 10)
def forward(self, inputs):
tensor = inputs.view(-1, 1, 28, 28)
tensor = F.relu(self.conv1(tensor))
tensor = self.pool1(tensor)
tensor = F.relu(self.conv2(tensor))
tensor = self.pool2(tensor)
tensor = tensor.view(-1, 7 * 7 * 64)
tensor = F.relu(self.fc1(tensor))
tensor = self.fc2(tensor)
return tensor
# ----------------------------------客户端----------------------------------
class client:
def __init__(self):
self.dev = dev
self.train_DataLoader = None
self.local_parameters = None
# 模型训练
def localUpdate(self, localEpoch, localBatchSize, Net, lossFun, opti, global_parameters, trainDataSet, dev):
# localEpoch: 当前Client的迭代次数
# localBatchSize: 当前Client的batchsize大小
# Net: Server共享的模型
# LossFun: 损失函数
# opti: 优化函数
# global_parmeters: 当前通讯中最全局参数
# return: 返回当前Client基于自己的数据训练得到的新的模型参数
# 加载当前通信中最新全局参数
# 并将global_parameters传入网络模型
Net.load_state_dict(global_parameters, strict=True)
# 加载本地数据, client自己的数据集
self.train_DataLoader = DataLoader(trainDataSet, batch_size=localBatchSize, shuffle=True)
# 设置迭代次数
for epoch in range(localEpoch):
for data, label in self.train_DataLoader:
# 加载到GPU上
data, label = data.to(dev), label.to(dev)
# 模型上传入数据
output = Net(data)
# 计算损失函数
loss = lossFun(output, label)
# 将梯度归零,初始化梯度
opti.zero_grad()
# 反向传播
loss.backward()
# 计算梯度,并更新梯度
opti.step()
# 返回当前Client基于自己的数据训练得到的新的模型参数
return Net.state_dict()
def local_val(self):
pass
# ----------------------------------服务器----------------------------------
class server:
def __init__(self, client_params):
self.client_params = client_params
def agg_average(self):
w = self.client_params
weights_avg = w[0]
for k in weights_avg.keys():
for i in range(1, len(w)):
weights_avg[k] = weights_avg[k] + w[i][k]
weights_avg[k] = weights_avg[k] / len(w)
return weights_avg
# ----------------------------------在测试集上评估模型的性能, 计算准确率和平均损失----------------------------------
class test_accuracy:
def test_accuracy(self, net, parameters, testDataLoader, dev, lossFun):
# 存储损失
loss_collector = []
with torch.no_grad():
net.load_state_dict(parameters, strict=True)
sum_accu = 0
num = 0
loss_collector.clear()
# 载入测试集
for data, label in testDataLoader:
data, label = data.to(dev), label.to(dev)
output = net(data)
loss = lossFun(output, label)
# loss = 1
loss_collector.append(loss.item())
output = torch.argmax(output, dim=1)
sum_accu += (output == label).float().mean()
num += 1
accuracy = sum_accu / num
avg_loss = sum(loss_collector) / len(loss_collector)
return avg_loss, accuracy
if __name__ == "__main__":
# ----------------------------------设置参数----------------------------------
dev = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# ----------------------------------创建clients, 并分配数据----------------------------------
IID = False
# 假设有10个客户端
get_data = GetData(isIID=IID, num_clients=10, dev=dev)
clients_data, clients_label, testDataLoader = get_data.load_data()
# ----------------------------------初始化模型----------------------------------
# 模型实例化
net = Mnist_CNN()
net = net.to(dev)
# 通信轮数
rounds = 100
# batch_size
batch_size = 64
# 定义损失函数
loss_func = nn.CrossEntropyLoss()
loss_func = loss_func.to(dev)
# 定义优化器
if IID:
lr = 0.00001
else:
lr = 0.0001
opti = optim.Adam(net.parameters(), lr=lr)
# 客户端数量
num_in_comm = 10
# ----------------------------------训练----------------------------------
# 定义变量global_parameters
global_parameters = net.state_dict()
# clients与server之间通信
for curr_round in range(1, rounds + 1):
local_loss = []
client_params = {}
for k in range(num_in_comm):
# print(k)
my_client = client()
train_data = clients_data[k]
train_label = torch.tensor(clients_label[k])
# 每个client训练得到的权重
local_parameters = my_client.localUpdate(localEpoch=1, localBatchSize=batch_size, Net=net,
lossFun=loss_func,
opti=opti,
global_parameters=global_parameters,
trainDataSet=TensorDataset(train_data, train_label), dev=dev)
client_params[k] = local_parameters
accuracy = test_accuracy()
local_loss, local_acc = accuracy.test_accuracy(net, local_parameters, testDataLoader, dev, loss_func)
if curr_round % 10 == 0:
print(
'[Round: %d Client: %d] accuracy: %f loss: %f ' % (curr_round, k, local_acc, local_loss))
# 取平均值,得到本次通信中server得到的更新后的模型参数
s = server(client_params)
global_parameters = s.agg_average()
net.load_state_dict(global_parameters, strict=True)
accuracy = test_accuracy()
global_loss, global_acc = accuracy.test_accuracy(net, global_parameters, testDataLoader, dev, loss_func)
if curr_round % 10 == 0:
print(
'----------------------------------[Round: %d] accuracy: %f loss: %f----------------------------------'
% (curr_round, global_acc, global_loss))
运行结果
IID
None-IID
在本次实验中,模型同构时收敛效果比较好,但在异构时效果并不稳定,所以后续学习异构情况下的优化策略
参考
基于PaddlePaddle实现联邦学习算法FedAvg - 飞桨AI Studio星河社区
联邦学习方法FedAvg实现(PyTorch) - 樱桃小屋 (cherry1024.github.io)
PyTorch 实现联邦学习FedAvg (详解)_pytorch fedavg-CSDN博客