纵向联邦学习
纵向联邦学习的参与方拥有相同样本空间、不同特征空间的数据,通过共有样本数据进行安全联合建模,在金融、广告等领域拥有广泛的应用场景。和横向联邦学习相比,纵向联邦学习的参与方之间需要协同完成数据求交集、模型联合训练和模型联合推理。并且,参与方越多,纵向联邦学习系统的复杂度就越高。
下面以企业A和企业B两方为例来介绍纵向联邦学习的基本架构和流程。假设企业A有特征数据和标签数据,可以独立建模;企业B有特征数据,缺乏标签数据,因此无法独立建模。由于隐私法规和行业规范等原因,两个企业之间的数据无法直接互通。企业A和企业B可采用纵向联邦学习解决方案进行合作,数据不出本地,使用双方共同样本数据进行联合建模和训练。最终双方都能获得一个更强大的模型。
纵向联邦架构
纵向联邦学习系统中的模型训练一般分为如下阶段: - 样本对齐:首先对齐企业A和企业B中具有相同ID(Identification)的样本数据。在数据对齐阶段,系统会采用加密算法对数据进行保护,确保任何一方的用户数据不会暴露。 - 联合训练:在确定企业A和企业B共有用户数据后,可以使用这些共有的数据来协同训练一个业务模型。模型训练过程中,模型参数信息以加密方式进行传递。已训练好的联邦学习模型可以部署在联邦学习系统的各参与方。
14.3. 纵向联邦学习 — 机器学习系统:设计和实现 1.0.0 documentation
链接中的文章对纵向联邦学习的样本对齐和联合训练进行了进一步的解释。
实践案例:银行营销
任务
市场营销是银行业在不断变化的市场环境中,为满足客户需要、实现经营目标的整体性经营和销售的活动。在目前大数据的环境下,数据分析为银行业提供了更有效的分析手段。对客户需求分析,了解目标市场趋势以及更宏观的市场策略都可以提供依据与方向。
数据来自 kaggle 上的经典银行营销数据集,是一家葡萄牙银行机构电话直销的活动,目标变量是客户是否订阅存款产品。
数据
- 样本量总计11162个,其中训练集8929, 测试集2233
- 特征16维,标签为2分类
- 我们预先对数据进行了切割,alice持有其中的4维基础属性特征,bob持有12维银行交易特征,对应的label只有alice方持有
我们先来看看我们的银行市场营销数据长什么样的?
原始数据被拆分为bank_alice和bank_bob,分别存在alice和bob两方。这里的csv是仅经过拆分没有做预处理的原始数据,我们将使用secretflow preprocess进行FedData预处理。
下面在 Secretflow 环境中创建 2 个实体 [Alice, Bob],其中 ‘Alice’ 和 ‘Bob’ 是两个 PYU。
%load_ext autoreload
%autoreload 2
import secretflow as sf
import matplotlib.pyplot as plt
sf.init(['alice', 'bob'], address='local')
alice, bob = sf.PYU('alice'), sf.PYU('bob')
import pandas as pd
from secretflow.utils.simulation.datasets import dataset
df = pd.read_csv(dataset('bank_marketing'), sep=';')
我们假设Alice是一个新银行,他们只有用户的基本信息,和是否从其他银行购买过理财产品的label
alice_data = df[["age", "job", "marital", "education", "y"]]
alice_data
Bob端是一个老银行,他们有用户的账户余额,是否有房,是否有贷款,以及最近的营销反馈
bob_data = df[
[
"default",
"balance",
"housing",
"loan",
"contact",
"day",
"month",
"duration",
"campaign",
"pdays",
"previous",
"poutcome",
]
]
bob_data
创建联邦表
联邦表是一个跨多方的虚拟概念,我们定义 VDataFrame
用于垂直场景设置。
- 联邦表中各方的数据存储在本地,不允许出域。
- 除了拥有数据的一方之外,没有人可以访问数据存储。
- 联邦表的任何操作都会由driver调度给每个worker,执行指令会逐层传递,直到特定worker的Python Runtime。 框架确保只有当worker的 worker.device 和 Object.device 相同时,才能够操作数据。
- 联邦表旨在从中心角度管理和操作多方数据。
Federated Table
的接口与 pandas.DataFrame 对齐,以降低多方数据操作的成本。- SecretFlow 框架提供 Plain&Ciphertext (明密文)混合编程能力。垂直联邦表是使用
SPU
构建的,MPC-PSI
用于安全地获取来自各方的交集和对齐数据。
VDataFrame 提供类似于 pandas 的 read_csv 接口,不同之处在于secretflow.read_csv 接收一个定义双方数据路径的字典。我们可以使用 secretflow.vertical.read_csv
来构建 VDataFrame
。
from secretflow.data.split import train_test_split
from secretflow.ml.nn import SLModel
read_csv(file_dict,delimiter,ppu,keys,drop_key)
filepath: Path of the participant file. The address can be a relative or absolute path to a local file
spu: SPU Device for PSI; If this parameter is not specified, data must be prealigned
keys: Key for intersection.
spu = sf.SPU(sf.utils.testing.cluster_def(['alice', 'bob']))
from secretflow.utils.simulation.datasets import load_bank_marketing
# Alice has the first four features,
# while bob has the left features
data = load_bank_marketing(parts={alice: (0, 4), bob: (4, 16)}, axis=1)
# Alice holds the label.
label = load_bank_marketing(parts={alice: (16, 17)}, axis=1)
data 为构建好的垂直联邦表,它从全局上只拥有所有数据的 Schema
我们进一步来看一下VDF的数据管理
通过一个实例可以看出,age这个字段是属于alice的,所以在alice方的partition可以得到对应的列,但是bob方想要去获取age的时候会报`KeyError`错误。
这里有一个Partition的概念,是我们定义的一个数据分片,每个Partition都会有自己的device归属,只有归属的device才可以操作数据。
data['age'].partitions[alice].data
输出结果:
<secretflow.device.device.pyu.PYUObject at 0x7fd7b1e8cb20>
data['age'].partitions[bob]
会引发报错:
KeyError: PYURuntime(bob)
我们接着对生成的联邦表做数据预处理。
我们这里以LabelEncoder和MinMaxScaler为例,这两个预处理函数在`sklearn`中有对应的概念,它们的使用方法和sklearn中是类似的
from secretflow.preprocessing.scaler import MinMaxScaler
from secretflow.preprocessing.encoder import LabelEncoder
encoder = LabelEncoder()
data['job'] = encoder.fit_transform(data['job'])
data['marital'] = encoder.fit_transform(data['marital'])
data['education'] = encoder.fit_transform(data['education'])
data['default'] = encoder.fit_transform(data['default'])
data['housing'] = encoder.fit_transform(data['housing'])
data['loan'] = encoder.fit_transform(data['loan'])
data['contact'] = encoder.fit_transform(data['contact'])
data['poutcome'] = encoder.fit_transform(data['poutcome'])
data['month'] = encoder.fit_transform(data['month'])
label = encoder.fit_transform(label)
print(f"label= {type(label)},\ndata = {type(data)}")
通过MinMaxScaler做数据标准化
scaler = MinMaxScaler()
data = scaler.fit_transform(data)
接着我们将数据集划分成训练集(train-set)和测试集(test-set)
from secretflow.data.split import train_test_split
random_state = 1234
train_data, test_data = train_test_split(
data, train_size=0.8, random_state=random_state
)
train_label, test_label = train_test_split(
label, train_size=0.8, random_state=random_state
)
小结 :到这里为止,我们就完成了联邦表的定义,数据的预处理,以及训练集和测试集的划分。 SecretFlow框架定义了跨越多方的 联邦表 概念,同时定义了一套构建在联邦表上的操作(它在逻辑上对等 pandas.DataFrame) ,同时定义了对于联邦表的预处理操作(它在逻辑上对等 sklearn)。
模型构建
单方模型:
对于该任务一个基本的DNN就可以完成,输入16维特征,经过一个DNN网络,输出对于正负样本的概率。
多方模型:
- Alice:
- base_net:输入4维特征,经过一个dnn网络得到hidden.
- fuse_net:接收_alice,以及bob计算得到的hidden特征,输入这些特征到fuse_net,进行特征融合,送入之后的网络完成整个前向传播过程和反向传播过程。
- Bob:
- base_net:输入12维特征,经过一个dnn网络得到hidden,然后将hidden发送给alice方,完成接下来的运算。
def create_base_model(input_dim, output_dim, name='base_model'):
# Create model
def create_model():
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow as tf
model = keras.Sequential(
[
keras.Input(shape=input_dim),
layers.Dense(100, activation="relu"),
layers.Dense(output_dim, activation="relu"),
]
)
# Compile model
model.summary()
model.compile(
loss='binary_crossentropy',
optimizer='adam',
metrics=["accuracy", tf.keras.metrics.AUC()],
)
return model
return create_model
我们使用create_base_model分别为 Alice 和 Bob 创建他们的base model
# prepare model
hidden_size = 64
model_base_alice = create_base_model(4, hidden_size)
model_base_bob = create_base_model(12, hidden_size)
model_base_alice()
model_base_bob()
接下来我们定义有label的一方,或者server端的模型——fuse_model。在fuse_model的定义中,我们需要正确的定义loss,optimizer,metrics。
def create_fuse_model(input_dim, output_dim, party_nums, name='fuse_model'):
def create_model():
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow as tf
# input
input_layers = []
for i in range(party_nums):
input_layers.append(
keras.Input(
input_dim,
)
)
merged_layer = layers.concatenate(input_layers)
fuse_layer = layers.Dense(64, activation='relu')(merged_layer)
output = layers.Dense(output_dim, activation='sigmoid')(fuse_layer)
model = keras.Model(inputs=input_layers, outputs=output)
model.summary()
model.compile(
loss='binary_crossentropy',
optimizer='adam',
metrics=["accuracy", tf.keras.metrics.AUC()],
)
return model
return create_model
model_fuse = create_fuse_model(input_dim=hidden_size, party_nums=2, output_dim=1)
model_fuse()
我们需要三个参数来初始化SLModel
- base_model_dict:一个字典,它需要传入参与训练的所有client以及base_model映射。
- device_y:PYU对象,哪一方持有label
- model_fuse:融合模型,具体的优化器以及损失函数都在这个模型中进行定义
定义 base_model_dict
base_model_dict:Dict[PYU,model_fn]
from secretflow.security.privacy import DPStrategy, LabelDP
from secretflow.security.privacy.mechanism.tensorflow import GaussianEmbeddingDP
# Define DP operations
train_batch_size = 128
gaussian_embedding_dp = GaussianEmbeddingDP(
noise_multiplier=0.5,
l2_norm_clip=1.0,
batch_size=train_batch_size,
num_samples=train_data.values.partition_shape()[alice][0],
is_secure_generator=False,
)
label_dp = LabelDP(eps=64.0)
dp_strategy_alice = DPStrategy(label_dp=label_dp)
dp_strategy_bob = DPStrategy(embedding_dp=gaussian_embedding_dp)
dp_strategy_dict = {alice: dp_strategy_alice, bob: dp_strategy_bob}
dp_spent_step_freq = 10
sl_model = SLModel(
base_model_dict=base_model_dict,
device_y=alice,
model_fuse=model_fuse,
dp_strategy_dict=dp_strategy_dict,
)
sf.reveal(test_data.partitions[alice].data), sf.reveal(
test_label.partitions[alice].data
)
sf.reveal(train_data.partitions[alice].data), sf.reveal(
train_label.partitions[alice].data
)
history = sl_model.fit(
train_data,
train_label,
validation_data=(test_data, test_label),
epochs=10,
batch_size=train_batch_size,
shuffle=True,
verbose=1,
validation_freq=1,
dp_spent_step_freq=dp_spent_step_freq,
)
下面可视化训练过程
# Plot the change of loss during training
plt.plot(history['train_loss'])
plt.plot(history['val_loss'])
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Val'], loc='upper right')
plt.show()
# Plot the change of accuracy during training
plt.plot(history['train_accuracy'])
plt.plot(history['val_accuracy'])
plt.title('Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Val'], loc='upper left')
plt.show()
# Plot the Area Under Curve(AUC) of loss during training
plt.plot(history['train_auc_1'])
plt.plot(history['val_auc_1'])
plt.title('Model Area Under Curve')
plt.ylabel('Area Under Curve')
plt.xlabel('Epoch')
plt.legend(['Train', 'Val'], loc='upper left')
plt.show()
调用一下评估函数,看下训练效果怎么样
global_metric = sl_model.evaluate(test_data, test_label, batch_size=128)
(运行以上代码是对于性能有一定要求,可能碰到OutOfMemoryError)
下面对比下单方模型
1.数据
数据也使用 Kaggle 的反欺诈数据集。在这里,我们只使用 Alice 的新银行数据。(特征个数为四个)
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow as tf
from sklearn.model_selection import train_test_split
def create_model():
model = keras.Sequential(
[
keras.Input(shape=4),
layers.Dense(100, activation="relu"),
layers.Dense(64, activation='relu'),
layers.Dense(64, activation='relu'),
layers.Dense(1, activation='sigmoid'),
]
)
model.compile(
loss='binary_crossentropy',
optimizer='adam',
metrics=["accuracy", tf.keras.metrics.AUC()],
)
return model
single_model = create_model()
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import LabelEncoder
encoder = LabelEncoder()
single_part_data = alice_data.copy()
single_part_data['job'] = encoder.fit_transform(alice_data['job'])
single_part_data['marital'] = encoder.fit_transform(alice_data['marital'])
single_part_data['education'] = encoder.fit_transform(alice_data['education'])
single_part_data['y'] = encoder.fit_transform(alice_data['y'])
y = single_part_data['y']
alice_data = single_part_data.drop(columns=['y'], inplace=False)
scaler = MinMaxScaler()
alice_data = scaler.fit_transform(alice_data)
train_data, test_data = train_test_split(
alice_data, train_size=0.8, random_state=random_state
)
train_label, test_label = train_test_split(y, train_size=0.8, random_state=random_state)
alice_data.shape
2.训练
single_model.fit(
train_data,
train_label,
validation_data=(test_data, test_label),
batch_size=128,
epochs=10,
shuffle=False,
)
single_model.evaluate(test_data, test_label, batch_size=128)
小结
上面两个实验模拟了一个典型的垂直场景的训练问题,Alice和Bob拥有相同的样本群体,但每一方只有样本的一部分特征数据,如果Alice只用自己的一方数据来训练模型,能够得到一个准确率为0.583,auc 分数为0.53的模型,但是如果联合Bob的数据之后,可以获得一个准确率为0.893,auc分数为0.883的模型。