8.5 排序模型进阶-Wide&Deep
学习目标
- 目标
- 无
- 应用
- 无
8.5.1 wide&deep
- Wide部分的输入特征:
- raw input features and transformed features
- notice: W&D这里的cross-product transformation:
- 只在离散特征之间做组合,不管是文本策略型的,还是离散值的;没有连续值特征的啥事,至少在W&D的paper里面是这样使用的。
- Deep部分的输入特征:
- raw input+embeding处理
- 对非连续值之外的特征做embedding处理,这里都是策略特征,就是乘以个embedding-matrix。在TensorFlow里面的接口是:tf.feature_column.embedding_column,默认trainable=True.
- 对连续值特征的处理是:将其按照累积分布函数P(X≤x),压缩至[0,1]内。
- notice:** Wide部分用FTRL+L1来训练;Deep部分用AdaGrad来训练。
- Wide&Deep在TensorFlow里面的API接口为:tf.estimator.DNNLinearCombinedClassifier
代码:
import tensorflow as tf
from tensorflow.python import keras
class WDL(object):
"""wide&deep模型
"""
def __init__(self):
pass
@staticmethod
def read_ctr_records():
# 定义转换函数,输入时序列化的
def parse_tfrecords_function(example_proto):
features = {
"label": tf.FixedLenFeature([], tf.int64),
"feature": tf.FixedLenFeature([], tf.string)
}
parsed_features = tf.parse_single_example(example_proto, features)
feature = tf.decode_raw(parsed_features['feature'], tf.float64)
feature = tf.reshape(tf.cast(feature, tf.float32), [1, 121])
# 特征顺序 1 channel_id, 100 article_vector, 10 user_weights, 10 article_weights
# 1 channel_id类别型特征, 100维文章向量求平均值当连续特征,10维用户权重求平均值当连续特征
channel_id = tf.cast(tf.slice(feature, [0, 0], [1, 1]), tf.int32)
vector = tf.reduce_sum(tf.slice(feature, [0, 1], [1, 100]), axis=1, keep_dims=True)
user_weights = tf.reduce_sum(tf.slice(feature, [0, 101], [1, 10]), axis=1, keep_dims=True)
article_weights = tf.reduce_sum(tf.slice(feature, [0, 111], [1, 10]), axis=1, keep_dims=True)
label = tf.reshape(tf.cast(parsed_features['label'], tf.float32), [1, 1])
# 构造字典 名称-tensor
FEATURE_COLUMNS = ['channel_id', 'vector', 'user_weigths', 'article_weights']
tensor_list = [channel_id, vector, user_weights, article_weights]
feature_dict = dict(zip(FEATURE_COLUMNS, tensor_list))
return feature_dict, label
dataset = tf.data.TFRecordDataset(["./train_ctr_201905.tfrecords"])
dataset = dataset.map(parse_tfrecords_function)
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.repeat(10000)
return dataset.make_one_shot_iterator().get_next()
def build_estimator(self):
"""建立模型
:param dataset:
:return:
"""
# 离散分类
article_id = tf.feature_column.categorical_column_with_identity('channel_id', num_buckets=25)
# 连续类型
vector = tf.feature_column.numeric_column('vector')
user_weigths = tf.feature_column.numeric_column('user_weigths')
article_weights = tf.feature_column.numeric_column('article_weights')
wide_columns = [article_id]
# embedding_column用来表示类别型的变量
deep_columns = [tf.feature_column.embedding_column(article_id, dimension=25),
vector, user_weigths, article_weights]
estimator = tf.estimator.DNNLinearCombinedClassifier(model_dir="./ckpt/wide_and_deep",
linear_feature_columns=wide_columns,
dnn_feature_columns=deep_columns,
dnn_hidden_units=[1024, 512, 256])
return estimator
if __name__ == '__main__':
wdl = WDL()
# dataset = wdl.read_ctr_records()
# lwf.train(dataset)
# lwf.train_v2(dataset)
estimator = wdl.build_estimator()
estimator.train(input_fn=wdl.read_ctr_records, steps=10000)
# eval_result = estimator.evaluate(input_fn=wdl.read_ctr_records, steps=10000)
# print(eval_result)
WDL模型导出
3.2 线上预估
线上流量是模型效果的试金石。离线训练好的模型只有参与到线上真实流量预估,才能发挥其价值。在演化的过程中,我们开发了一套稳定可靠的线上预估体系,提高了模型迭代的效率。
模型同步
我们开发了一个高可用的同步组件:用户只需要提供线下训练好的模型的HDFS路径,该组件会自动同步到线上服务机器上。该组件基于HTTPFS实现,它是美团离线计算组提供的HDFS的HTTP方式访问接口。同步过程如下:
- 同步前,检查模型md5文件,只有该文件更新了,才需要同步。
- 同步时,随机链接HTTPFS机器并限制下载速度。
- 同步后,校验模型文件md5值并备份旧模型。
同步过程中,如果发生错误或者超时,都会触发报警并重试。依赖这一组件,我们实现了在2min内可靠的将模型文件同步到线上。
模型计算
当前我们线上有两套并行的预估计算服务。
基于TF Serving的模型服务
TF Serving是TensorFlow官方提供的一套用于在线实时预估的框架。它的突出优点是:和TensorFlow无缝链接,具有很好的扩展性。使用TF serving可以快速支持RNN、LSTM、GAN等多种网络结构,而不需要额外开发代码。这非常有利于我们模型快速实验和迭代。
使用这种方式,线上服务需要将特征发送给TF Serving,这不可避免引入了网络IO,给带宽和预估时延带来压力。我们尝试了以下优化,效果显著。
- 并发请求。一个请求会召回很多符合条件的广告。在客户端多个广告并发请求TF Serving,可以有效降低整体预估时延。
- 特征ID化。通过将字符串类型的特征名哈希到64位整型空间,可以有效减少传输的数据量,降低使用的带宽。
导出代码:
# 导出serving_model
wide_columns = [tf.feature_column.categorical_column_with_identity('channel_id', num_buckets=25)]
deep_columns = [tf.feature_column.embedding_column(tf.feature_column.categorical_column_with_identity('channel_id', num_buckets=25), dimension=25),
tf.feature_column.numeric_column('vector'),
tf.feature_column.numeric_column('user_weigths'),
tf.feature_column.numeric_column('article_weights')
]
columns = wide_columns + deep_columns
feature_spec = tf.feature_column.make_parse_example_spec(columns)
serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
estimator.export_savedmodel("./serving_model/wdl/", serving_input_receiver_fn)