1. 背景介绍
联邦学习(Federated Learning,FL)是隐私计算中常见的一种技术范式,其本质是一种面向可信数据流通的分布式机器学习框架,允许多个参与方在不共享其本地数据的前提下,协同训练机器学习模型。与传统的集中式学习方法相比,联邦学习通过分布式以及加密算法等方式保护数据隐私和安全,避免了数据的集中存储与传输。
关于联邦学习的知识,我们在之前的文章《使用GPU加速计算联邦学习XGBOOST算法》、《大模型的安全由隐私计算来保护》、《基于横纵向的混合联邦学习原理分析》、《SGB、SS-XGB算法原理》、《逻辑回归LR与广义线性模型GLM》中,或多或少都有涉及。本文主要面向联邦学习的性能提升问题,制约联邦学习的因素归根到底主要是两类:计算以及通信,而其中计算又以密文计算耗时占据整体联邦学习的大头,因此在本文中,我们将更多的关注在密文计算上的加速。
说到密文计算的加速,通常有多种优化的方向:
(1)软硬件加速(如提升并行度);
(2)一次密文计算完成多个密文数据处理;
(3)最小化密文计算(既然密文计算耗时,那采用优化算法机制等手段尽量减少密文的处理场景);
(4)同态加密算法本身的改进,比如采用中国剩余定理来加速,参考《中国剩余定理解释以及Paillier解密加速应用》。
(5)同态算法的选型(全同态、半同态)
(6)其他还有调度策略、采用其他隐私计算技术来替代同态加密等手段,就不一一列出。
2. 全同态加密与半同态加密在联邦机器学习中的实证对比
2.1 实验设置
本文主要关注第(5)点中提到的同态算法选型方向。我们会对比半同态加密、全同态加密的密文计算性能,在联邦逻辑回归算法中的实证结果。注意:这里联邦逻辑回归算法我们会做简化处理,不对原始机器学习算法做节点区分(比如划分成host、guest的计算逻辑),而是简单的对梯度因子(即误差项)做加密,然后密文计算梯度。实际的联邦学习算法中,基本流程类似,guest方会解密host方混淆后的梯度值,然后发回给host方进行模型权重参数的更新。
密文的操作包括加密(误差)、密文@明文(梯度)、解密(梯度)。
我们选择的加密算法参照对象分别为:
半同态加密:隐语的Zpaillier【1】;
几种半同态加密算法性能对比数据【1】:
全同态加密:OpenMined的TenSeal【2,3】提供的CKKS (基于微软的SEAL)【4】。
CKKS(全称 Cheon-Kim-Kim-Song)是2017年提出的同态加密方案。它支持浮点向量在密文空间的加减乘运算并保持同态,但是只支持有限次乘法的运算。以下是CKKS的计算流程图。先对消息(向量)进行编码,然后再加密,在密文空间进行一些运算后再解密,最后解码成运算后的消息(向量)。这里的编码指的是将复数向量映射成为多项式,是为了方便下面进一步的加密【5】。对于全同态加密算法感兴趣的话,可以参考下这篇文章的分析【6】。
2.2 实验数据及代码
我们选用的模型是逻辑回归模型,做梯度部分做密文处理,实验数据使用的是风控数据,共有43个特征,样本量10万。
2.2.1 zpaillier(gpaillier)版本模型
事实上,后续的性能比较采用了gpaillier(gpu版本),gpaillier与zpaillier的接口一致,略微修改就可以切换,gpaillier的性能要优于zpaillier。
gpaillier与zpaillier的对比见【7】:
import numpy as np
import pandas as pd
import copy
from sklearn.metrics import auc, roc_curve
from heu import numpy as hnp
from heu import phe
# 初始化 Paillier 加密系统
encryption_kit = hnp.setup(phe.SchemaType.ZPaillier, 2048)
encryptor = encryption_kit.encryptor()
decryptor = encryption_kit.decryptor()
evaluator = encryption_kit.evaluator()
# 设置浮点数编码比例
scale_factor = 10**6
float_encoder = phe.FloatEncoder(phe.SchemaType.ZPaillier, scale_factor)
# 定义 Sigmoid 函数
def sigmoid(x):
return 1. / (1 + np.exp(-x))
# 数据列名
match_column = "id"
label_column = "isDefault"
feature_file = "risk_train_x.csv"
label_file = "risk_train_y.csv"
# 加载并合并特征和标签数据集
features_df = pd.read_csv(feature_file)
labels_df = pd.read_csv(label_file)
merged_df = pd.merge(features_df, labels_df, how='left', on=match_column)
merged_df = pd.DataFrame(merged_df)
# 获取所有列名并移除匹配列和标签列
all_columns = list(merged_df.columns)
feature_columns = copy.deepcopy(all_columns)
feature_columns.remove(match_column)
feature_columns.remove(label_column)
print(f"特征列名: {feature_columns}")
# 填充缺失值(使用众数填充)
X = merged_df[feature_columns].fillna(merged_df[feature_columns].mode().iloc[0])
# 特征标准化处理
mean_values = np.mean(X, axis=0)
std_values = np.std(X, axis=0)
X_normalized = (X - mean_values) / std_values
X_normalized = X_normalized.to_numpy()
# 获取标签列
Y = merged_df[label_column].to_numpy().reshape((len(merged_df), 1))
# 划分训练集和测试集(比例为 8:2)
train_ratio = 0.8
split_index = int(len(X_normalized) * train_ratio)
X_train, X_test = X_normalized[:split_index], X_normalized[split_index:]
y_train, y_test = Y[:split_index], Y[split_index:]
print(f"训练数据大小: {X_train.shape}, {y_train.shape}, {X_test.shape}, {y_test.shape}")
# 设置逻辑回归模型超参数
num_epochs = 3
batch_size = 4000
learning_rate = 0.01
# 初始化模型权重
num_features = len(feature_columns)
weights = np.zeros((num_features, 1))
# 计算批次数
num_batches = len(y_train) // batch_size
if len(y_train) % batch_size != 0:
num_batches += 1
# 迭代训练模型
for epoch in range(num_epochs):
for batch in range(num_batches):
print(f"Epoch {epoch}, Batch {batch}")
X_batch = X_train[batch * batch_size: (batch + 1) * batch_size]
y_batch = y_train[batch * batch_size: (batch + 1) * batch_size]
y_pred = sigmoid(np.matmul(X_batch, weights))
# 梯度因子加密
grad_factor = y_pred - np.array(y_batch)
grad_factor_encrypted = encryption_kit.array(grad_factor.T, float_encoder)
encrypted_gradients = encryptor.encrypt(grad_factor_encrypted)
# 密文梯度计算
X_batch_encoded = encryption_kit.array(X_batch, float_encoder)
encrypted_grad_sum = evaluator.matmul(encrypted_gradients, X_batch_encoded)
# 解密并计算权重更新
grad_sum = decryptor.decrypt(encrypted_grad_sum).to_numpy(float_encoder)
grad_sum = grad_sum.T / scale_factor
gradient = grad_sum / len(y_batch)
weights -= gradient * learning_rate
# 在测试集上进行预测
y_pred_test = sigmoid(np.matmul(X_test, weights))
# 计算模型评估指标
print("模型评估指标:")
fpr, tpr, _ = roc_curve(y_test, y_pred_test)
ks_value = max(tpr - fpr)
auc_value = auc(fpr, tpr)
print(f"KS值: {ks_value}")
print(f"AUC值: {auc_value}")
2.2.2 tenseal-ckks版本模型
import numpy as np
import pandas as pd
from sklearn.metrics import auc, roc_curve
import tenseal as ts
import sys
# 设置 TenSEAL 加密上下文,指定多项式模数度数和系数位大小
context = ts.context(
ts.SCHEME_TYPE.CKKS,
poly_modulus_degree=8192,
coeff_mod_bit_sizes=[60, 40, 40, 60]
)
# 生成 Galois 密钥,用于加密操作
context.generate_galois_keys()
context.global_scale = 2**40 # 设置全局加密比例
def sigmoid(x):
return 1. / (1 + np.exp(-x))
match_column = "id" # 匹配列名
label_column = "isDefault" # 标签列名
feature_data_file = "risk_train_x.csv" # 特征数据文件
label_data_file = "risk_train_y.csv" # 标签数据文件
# 加载并合并特征与标签数据集
features_df = pd.read_csv(feature_data_file)
labels_df = pd.read_csv(label_data_file)
merged_df = pd.merge(features_df, labels_df, how='left', on=match_column)
# 获取特征列名,排除匹配列和标签列
feature_columns = [col for col in merged_df.columns if col not in [match_column, label_column]]
# 填充缺失值(使用众数),并对特征进行标准化
X = merged_df[feature_columns].fillna(merged_df.mode().iloc[0])
X = (X - X.mean()) / X.std()
X = X.to_numpy()
# 获取标签列数据
Y = merged_df[label_column].to_numpy().reshape(-1, 1)
# 将数据集按 80/20 划分为训练集和测试集
train_ratio = 0.8
train_size = int(len(X) * train_ratio)
X_train, X_test = X[:train_size], X[train_size:] # 训练集和测试集的特征
y_train, y_test = Y[:train_size], Y[train_size:] # 训练集和测试集的标签
# 逻辑回归的超参数配置
num_epochs = 3 # 迭代次数
batch_size = 4000 # 批量大小
learning_rate = 0.01 # 学习率
# 初始化模型权重
num_features = X_train.shape[1] # 特征数目
weights = np.zeros((num_features, 1)) # 权重初始化为全零向量
# 计算批次数量
num_batches = int(np.ceil(len(y_train) / batch_size))
# 使用批量梯度下降训练逻辑回归模型
for epoch in range(num_epochs):
for batch in range(num_batches):
print(f"Epoch {epoch}, Batch {batch}")
# 获取当前批次的训练数据
X_batch = X_train[batch * batch_size: (batch + 1) * batch_size]
y_batch = y_train[batch * batch_size: (batch + 1) * batch_size]
# 预测当前批次的结果
y_pred = sigmoid(np.matmul(X_batch, weights))
# 计算梯度误差,并进行加密
gradient_error = y_pred - y_batch
encrypted_gradient = ts.ckks_vector(context, gradient_error.T.flatten().tolist())
# 内存占用
print("内存占用", sys.getsizeof(encrypted_gradient))
# 加密矩阵乘法,计算加密后的梯度
gradient_sum_encrypted = encrypted_gradient.matmul(X_batch.tolist())
# 解密梯度
gradient_sum = np.array(gradient_sum_encrypted.decrypt()).reshape(-1, 1)
# 更新权重,按照梯度下降规则
weights -= learning_rate * gradient_sum / len(y_batch)
# 在测试集上进行预测
y_pred_test = sigmoid(np.matmul(X_test, weights))
# 评估模型性能,计算 AUC 和 KS 指标
fpr, tpr, _ = roc_curve(y_test, y_pred_test)
ks_stat = max(tpr - fpr) # KS 统计量
auc_score = auc(fpr, tpr) # AUC 值
print(f'KS: {ks_stat}')
print(f'AUC: {auc_score}')
2.3 实验结果
实验迭代3个epoch,batchsize设置4000。
左边是ckks版本的耗时 407s,右边是gpaillier版本的耗时 1167s。结果显示,采用全同态ckks,在计算性能上明显优于gpaillier版本,更优于zpaillier。
3. 参考材料
【1】HEU 多种 PHE 算法选择
【2】TenSeal代码仓库
【3】TENSEAL: A LIBRARY FOR ENCRYPTED TENSOR OPERATIONS USING HOMOMORPHIC ENCRYPTION
【4】微软SEAL
【5】CKKS explained series
【6】全同态加密算法概览
【7】隐语HEU同态加密算法解读