MaskNet is a model proposed by Weibo in 2021; it is also the backbone of the ranking module in the recommender system that Twitter (X) open-sourced in 2023.
The core idea: feature interaction in a plain DNN is additive and therefore not very efficient at crossing features, so the paper designs a multiplicative form of feature interaction.
How is this multiplicative feature interaction designed?
1) First, an instance-guided mask is introduced. The figure below shows its design: it is simply two feed-forward layers, where the first layer expands the input dimension and the second maps it back. In short, it is this formula:
V_{mask} = W_{d2}(\mathrm{ReLU}(W_{d1} V_{emb} + \beta_{d1})) + \beta_{d2}
V_{emb} \in R^{m = f \times k} is the input embedding, where f is the number of input features and k is the embedding dimension of each feature.
The final output is a transformed embedding-sized vector, referred to below simply as the mask.
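To make the two feed-forward layers concrete, here is a minimal PyTorch sketch of the instance-guided mask (the names `InstanceGuidedMask`, `emb_dim`, and `reduction_ratio` are illustrative, not taken from the paper or the Twitter code; the full implementations follow in the Implementation section below):

import torch

class InstanceGuidedMask(torch.nn.Module):
  """Minimal sketch: V_mask = W_d2(ReLU(W_d1 V_emb + b_d1)) + b_d2."""

  def __init__(self, emb_dim: int, output_dim: int, reduction_ratio: float = 2.0):
    super().__init__()
    aggregation_size = int(emb_dim * reduction_ratio)                # first layer expands the dimension
    self.aggregation = torch.nn.Linear(emb_dim, aggregation_size)
    self.projection = torch.nn.Linear(aggregation_size, output_dim)  # second layer maps it back

  def forward(self, v_emb: torch.Tensor) -> torch.Tensor:
    return self.projection(torch.relu(self.aggregation(v_emb)))

# v_emb is the flattened concatenation of f feature embeddings of dimension k, so m = f * k.
v_emb = torch.randn(32, 10 * 16)                     # batch of 32, f = 10, k = 16
mask = InstanceGuidedMask(10 * 16, 10 * 16)(v_emb)   # same size as the embedding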
2) Once this mask vector is obtained, how is it used? That is what the MaskBlock does.
There are two main ways to use it. The first operates on the embedding; in the figure, the LN in LN-EMB stands for Layer Normalization:
V_{maskedEMB} = V_{mask} \odot LN\_EMB(V_{emb})
That is, the mask is multiplied element-wise with the layer-normalized embedding. The result then goes through a linear layer, a layer normalization, and a ReLU non-linearity. That is the entire MaskBlock, summarized in one formula:
V_{output} = LN\_HID(W_i V_{maskedEMB}) = \mathrm{ReLU}(LN(W_i (V_{mask} \odot LN\_EMB(V_{emb}))))
Besides the element-wise product with the embedding, the mask can also be applied to the output of a previous hidden layer; that is the second way to use the mask:
V_{output} = LN\_HID(W_i V_{maskedHID}) = \mathrm{ReLU}(LN(W_i (V_{mask} \odot V_{output}^{p})))
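The two variants differ only in what the mask multiplies (the layer-normalized embedding, or the output of the previous block). Below is a minimal sketch of one MaskBlock, reusing the illustrative `InstanceGuidedMask` defined above; to keep the sketch short, layer normalization of the input is assumed to happen outside the block:

class MaskBlockSketch(torch.nn.Module):
  """Sketch of one MaskBlock: instance-guided mask -> element-wise product -> Linear -> LN -> ReLU."""

  def __init__(self, input_dim: int, mask_input_dim: int, output_dim: int):
    super().__init__()
    self.mask = InstanceGuidedMask(mask_input_dim, input_dim)  # V_mask, sized to match the masked input
    self.hidden = torch.nn.Linear(input_dim, output_dim)       # W_i
    self.layer_norm = torch.nn.LayerNorm(output_dim)           # LN_HID

  def forward(self, net: torch.Tensor, v_emb: torch.Tensor) -> torch.Tensor:
    # net is LN_EMB(V_emb) for "MaskBlock on feature embedding",
    # or the previous block's output for "MaskBlock on MaskBlock".
    masked = net * self.mask(v_emb)                            # V_mask ⊙ net
    return torch.relu(self.layer_norm(self.hidden(masked)))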
After 1) and 2), the core content of the paper is essentially covered; 3) below is how MaskBlocks are assembled.
3) MaskNet
All features interact with the instance-guided mask, and the blocks can be arranged either serially or in parallel (sketched below).
In the serial model, the first block is a MaskBlock on the feature embedding and every following block is a MaskBlock on the previous MaskBlock's output;
the parallel model is simpler: every block is a MaskBlock on the feature embedding, and their outputs are concatenated.
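A small sketch of the two wirings, using the illustrative `MaskBlockSketch` above (the block count and sizes are made up for the example):

emb_dim, block_out = 10 * 16, 64
v_emb = torch.randn(32, emb_dim)
ln_emb = torch.nn.LayerNorm(emb_dim)(v_emb)

# Parallel: every block masks the layer-normalized embedding; the outputs are concatenated
# (and usually fed to a small MLP afterwards).
parallel_blocks = [MaskBlockSketch(emb_dim, emb_dim, block_out) for _ in range(3)]
parallel_out = torch.cat([blk(ln_emb, v_emb) for blk in parallel_blocks], dim=1)  # shape (32, 3 * 64)

# Serial: the first block masks the embedding, each later block masks the previous block's output;
# the mask itself is always computed from the feature embedding.
serial_blocks = [MaskBlockSketch(emb_dim, emb_dim, block_out),
                 MaskBlockSketch(block_out, emb_dim, block_out)]
net = ln_emb
for blk in serial_blocks:
  net = blk(net, v_emb)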
II. Implementation
1) PyTorch implementation, excerpted from Twitter's open-source code:
def _init_weights(module):
  if isinstance(module, torch.nn.Linear):
    torch.nn.init.xavier_uniform_(module.weight)
    torch.nn.init.constant_(module.bias, 0)


class MaskBlock(torch.nn.Module):
  def __init__(self,
               mask_block_config: config.MaskBlockConfig,
               input_dim: int,
               mask_input_dim: int) -> None:
    super(MaskBlock, self).__init__()
    self.mask_block_config = mask_block_config
    output_size = mask_block_config.output_size

    if mask_block_config.input_layer_norm:
      # In Twitter's implementation the input layer normalization is configurable.
      self._input_layer_norm = torch.nn.LayerNorm(input_dim)
    else:
      self._input_layer_norm = None

    # Number of units in the first (aggregation) layer of the instance-guided mask:
    # either input size * reduction factor, or an explicitly configured size.
    if mask_block_config.reduction_factor:
      aggregation_size = int(mask_input_dim * mask_block_config.reduction_factor)
    elif mask_block_config.aggregation_size is not None:
      aggregation_size = mask_block_config.aggregation_size
    else:
      raise ValueError("Need one of reduction factor or aggregation size.")

    # The instance-guided mask is here: two linear layers.
    self._mask_layer = torch.nn.Sequential(
      torch.nn.Linear(mask_input_dim, aggregation_size),
      torch.nn.ReLU(),
      torch.nn.Linear(aggregation_size, input_dim),
    )
    # Weight initialization.
    self._mask_layer.apply(_init_weights)
    self._hidden_layer = torch.nn.Linear(input_dim, output_size)
    self._hidden_layer.apply(_init_weights)
    self._layer_norm = torch.nn.LayerNorm(output_size)

  def forward(self, net: torch.Tensor, mask_input: torch.Tensor):
    # LN
    if self._input_layer_norm:
      net = self._input_layer_norm(net)
    # self._mask_layer(mask_input) is V_mask; multiply it element-wise with net.
    hidden_layer_output = self._hidden_layer(net * self._mask_layer(mask_input))
    return self._layer_norm(hidden_layer_output)
class MaskNet(torch.nn.Module):
  def __init__(self,
               mask_net_config: config.MaskNetConfig,
               in_features: int):
    super().__init__()
    self.mask_net_config = mask_net_config
    mask_blocks = []

    if mask_net_config.use_parallel:
      total_output_mask_blocks = 0
      # Judging from the local_prod config, 4 blocks are used.
      for mask_block_config in mask_net_config.mask_blocks:
        mask_blocks.append(MaskBlock(mask_block_config, in_features, in_features))
        total_output_mask_blocks += mask_block_config.output_size
      self._mask_blocks = torch.nn.ModuleList(mask_blocks)
    else:
      input_size = in_features
      for mask_block_config in mask_net_config.mask_blocks:
        mask_blocks.append(MaskBlock(mask_block_config, input_size, in_features))
        input_size = mask_block_config.output_size
      self._mask_blocks = torch.nn.ModuleList(mask_blocks)
      total_output_mask_blocks = mask_block_config.output_size

    if mask_net_config.mlp:
      self._dense_layers = mlp.Mlp(total_output_mask_blocks, mask_net_config.mlp)
      self.out_features = mask_net_config.mlp.layer_sizes[-1]
    else:
      self.out_features = total_output_mask_blocks
    self.shared_size = total_output_mask_blocks

  def forward(self, inputs: torch.Tensor):
    if self.mask_net_config.use_parallel:
      # Parallel network structure.
      mask_outputs = []
      # Every block receives the same input; only the learned parameters differ.
      for mask_layer in self._mask_blocks:
        # Both mask_input and net are the raw inputs.
        mask_outputs.append(mask_layer(mask_input=inputs, net=inputs))
      # Share the outputs of the MaskBlocks.
      all_mask_outputs = torch.cat(mask_outputs, dim=1)
      # Final output.
      output = (
        all_mask_outputs
        if self.mask_net_config.mlp is None
        else self._dense_layers(all_mask_outputs)["output"])
      return {"output": output, "shared_layer": all_mask_outputs}
    else:
      # Serial.
      net = inputs
      for mask_layer in self._mask_blocks:
        # mask_input is always the raw inputs; net is the previous block's output.
        net = mask_layer(net=net, mask_input=inputs)
      # Share the output of the stacked MaskBlocks.
      output = net if self.mask_net_config.mlp is None else self._dense_layers(net)["output"]
      return {"output": output, "shared_layer": net}
2) TensorFlow implementation
Excerpted from EasyRec (Alibaba's open-source recommendation toolkit):
# Copyright (c) Alibaba, Inc. and its affiliates.
import logging

import tensorflow as tf
from tensorflow.python.keras.layers import Activation
from tensorflow.python.keras.layers import Dense
from tensorflow.python.keras.layers import Layer

from easy_rec.python.layers.keras.blocks import MLP
from easy_rec.python.layers.keras.layer_norm import LayerNormalization
from easy_rec.python.layers.utils import Parameter
class MaskBlock(Layer):
  """MaskBlock used in MaskNet.

  Args:
    projection_dim: project dimension to reduce the computational cost.
      Default is `None` such that a full (`input_dim` by `aggregation_size`)
      matrix W is used. If enabled, a low-rank matrix W = U*V will be used,
      where U is of size `input_dim` by `projection_dim` and V is of size
      `projection_dim` by `aggregation_size`. `projection_dim` needs to be
      smaller than `aggregation_size`/2 to improve the model efficiency. In
      practice, we've observed that `projection_dim` = d/4 consistently
      preserved the accuracy of a full-rank version.
  """

  def __init__(self, params, name='mask_block', reuse=None, **kwargs):
    super(MaskBlock, self).__init__(name=name, **kwargs)
    self.config = params.get_pb_config()
    self.l2_reg = params.l2_regularizer
    self._projection_dim = params.get_or_default('projection_dim', None)
    self.reuse = reuse
    self.final_relu = Activation('relu', name='relu')
  def build(self, input_shape):
    if type(input_shape) in (tuple, list):
      assert len(input_shape) >= 2, 'MaskBlock must have at least two inputs'
      input_dim = int(input_shape[0][-1])
      mask_input_dim = int(input_shape[1][-1])
    else:
      input_dim, mask_input_dim = input_shape[-1], input_shape[-1]
    # Same logic as the PyTorch implementation.
    if self.config.HasField('reduction_factor'):
      aggregation_size = int(mask_input_dim * self.config.reduction_factor)
    elif self.config.HasField('aggregation_size'):
      aggregation_size = self.config.aggregation_size
    else:
      raise ValueError('Need one of reduction factor or aggregation size for MaskBlock.')

    # First layer of the instance-guided mask.
    self.aggr_layer = Dense(
        aggregation_size,
        activation='relu',
        kernel_initializer='he_uniform',
        kernel_regularizer=self.l2_reg,
        name='aggregation')
    # Second layer of the instance-guided mask.
    self.weight_layer = Dense(input_dim, name='weights')
    # Compared with the PyTorch implementation, a projection_dim is added
    # (low-rank factorization, see DCN).
    if self._projection_dim is not None:
      logging.info('%s project dim is %d', self.name, self._projection_dim)
      self.project_layer = Dense(
          self._projection_dim,
          kernel_regularizer=self.l2_reg,
          use_bias=False,
          name='project')
    if self.config.input_layer_norm:
      # It is better to layer-normalize the input before calling MaskBlock;
      # otherwise the input has to be normalized on every call.
      if tf.__version__ >= '2.0':
        self.input_layer_norm = tf.keras.layers.LayerNormalization(
            name='input_ln')
      else:
        self.input_layer_norm = LayerNormalization(name='input_ln')
    if self.config.HasField('output_size'):
      self.output_layer = Dense(
          self.config.output_size, use_bias=False, name='output')
    # TensorFlow legacy: compatible with both 1.x and 2.x.
    if tf.__version__ >= '2.0':
      self.output_layer_norm = tf.keras.layers.LayerNormalization(
          name='output_ln')
    else:
      self.output_layer_norm = LayerNormalization(name='output_ln')
    super(MaskBlock, self).build(input_shape)

  def call(self, inputs, training=None, **kwargs):
    if type(inputs) in (tuple, list):
      net, mask_input = inputs[:2]
    else:
      net, mask_input = inputs, inputs
    # LN
    if self.config.input_layer_norm:
      net = self.input_layer_norm(net)
    # Unlike the PyTorch version (one Sequential), the aggregation and
    # projection layers are kept separate here.
    if self._projection_dim is None:
      aggr = self.aggr_layer(mask_input)
    else:
      u = self.project_layer(mask_input)
      aggr = self.aggr_layer(u)
    # The mask itself.
    weights = self.weight_layer(aggr)
    # Element-wise product.
    masked_net = net * weights
    if not self.config.HasField('output_size'):
      return masked_net
    # Final step: a linear layer plus layer norm (and ReLU).
    hidden = self.output_layer(masked_net)
    ln_hidden = self.output_layer_norm(hidden)
    return self.final_relu(ln_hidden)
class MaskNet(Layer):

  def __init__(self, params, name='mask_net', reuse=None, **kwargs):
    super(MaskNet, self).__init__(name=name, **kwargs)
    self.reuse = reuse
    self.params = params
    self.config = params.get_pb_config()
    if self.config.HasField('mlp'):
      p = Parameter.make_from_pb(self.config.mlp)
      p.l2_regularizer = params.l2_regularizer
      self.mlp = MLP(p, name='mlp', reuse=reuse)
    else:
      self.mlp = None

    self.mask_layers = []
    for i, block_conf in enumerate(self.config.mask_blocks):
      params = Parameter.make_from_pb(block_conf)
      params.l2_regularizer = self.params.l2_regularizer
      mask_layer = MaskBlock(params, name='block_%d' % i, reuse=self.reuse)
      self.mask_layers.append(mask_layer)

    if self.config.input_layer_norm:
      if tf.__version__ >= '2.0':
        self.input_layer_norm = tf.keras.layers.LayerNormalization(
            name='input_ln')
      else:
        self.input_layer_norm = LayerNormalization(name='input_ln')

  def call(self, inputs, training=None, **kwargs):
    # Compared with the PyTorch version, the inputs can also be layer-normalized once here.
    if self.config.input_layer_norm:
      inputs = self.input_layer_norm(inputs)

    # The parallel / serial logic below is the same as in the PyTorch implementation.
    if self.config.use_parallel:
      mask_outputs = [
          mask_layer((inputs, inputs)) for mask_layer in self.mask_layers
      ]
      all_mask_outputs = tf.concat(mask_outputs, axis=1)
      if self.mlp is not None:
        output = self.mlp(all_mask_outputs, training=training)
      else:
        output = all_mask_outputs
      return output
    else:
      net = inputs
      for i, _ in enumerate(self.config.mask_blocks):
        mask_layer = self.mask_layers[i]
        net = mask_layer((net, inputs))
      if self.mlp is not None:
        output = self.mlp(net, training=training)
      else:
        output = net
      return output
References:
MaskNet: Introducing Feature-Wise Multiplication to CTR Ranking Models by Instance-Guided Mask
TensorFlow implementation (EasyRec)
twitter-alg-ml