5.5 推荐中心逻辑
学习目标
- 目标
- 无
- 应用
- 无
5.5.1 推荐中心作用
推荐中一般作为整体召回结果读取与排序模型进行排序过程的作用,主要是产生推荐结果的部分。
5.5.2 推荐目录
- server目录为整个推荐中心建立的目录
- recall_service.:召回数据读取目录
- reco_centor:推荐中心逻辑代码
- redis_cache:推荐结果缓存目录
5.5.3 推荐中心刷新逻辑
-
根据时间戳
- 时间戳T小于HBASE历史推荐记录,则获取历史记录,返回该时间戳T上次的时间戳T-1
- 时间戳T大于HBASE历史推荐记录,则获取新推荐,则获取HBASE数据库中最近的一次时间戳
- 如果有缓存,从缓存中拿,并且写入推荐历史表中
- 如果没有缓存,就进行一次指定算法组合的召回结果读取,排序,然后写入待推荐wait_recommend中,其中推荐出去的放入历史推荐表中
-
HBASE 数据库表设计
- wait_recommend: 经过各种多路召回,排序之后的待推荐结果
- 只要刷新一次,没有缓存,才主动收集各种召回集合一起给wait_recommend写入,所以不用设置多个版本
- history_recommend: 每次真正推荐出去给用户的历史推荐结果列表
- 1、按照频道存储用户的历史推荐结果
- 2、需要保留多个版本,才需要建立版本信息
- wait_recommend: 经过各种多路召回,排序之后的待推荐结果
create 'wait_recommend', 'channel'
put 'wait_recommend', 'reco:1', 'channel:18', [17283, 140357, 14668, 15182, 17999, 13648, 12884, 17302, 13846, 18135]
put 'wait_recommend', 'reco:1', 'channel:0', [17283, 140357, 14668, 15182, 17999, 13648, 12884, 17302, 13846, 18135]
创建一个历史hbase结果
create 'history_recommend', {NAME=>'channel', TTL=>7776000, VERSIONS=>999999} 86400
# 每次指定一个时间戳,可以达到不同版本的效果
put 'history_recommend', 'reco:his:1', 'channel:18', [17283, 140357, 14668, 15182, 17999, 13648, 12884, 17302, 13846, 18135]
# 修改的时候必须指定family名称
hbase(main):084:0> alter 'history_recommend',NAME => 'channel', TTL => '7776000'
Updating all regions with the new schema...
1/1 regions updated.
Done.
Took 2.0578 seconds
alter 'history_recommend',NAME => 'channel', VERSIONS=>999999, TTL=>7776000
放入历史数据,存在时间戳,到时候取出历史数据就是每个用户的历史时间戳可以
get "history_recommend", 'reco:his:1', {COLUMN=>'channel:18',VERSIONS=>1000, TIMESTAMP=>1546242869000}
这里与上次召回cb_recall以及history_recall有不同用处:
- 过滤热门和新文章等推荐过的历史记录,history_recommend存入的是真正推荐过的历史记录
- history_recall只过滤召回的结果
5.5.4 feed流 推荐中心逻辑
- 目的:根据ABTest分流之后的用户,进行制定算法的召回和排序读取
- 步骤:
- 1、根据时间戳进行推荐逻辑判断
- 2、读取召回结果(无实时排序)
创建特征中心类:
import os
import sys
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(BASE_DIR))
import hashlib
from setting.default import RAParam
from server.utils import HBaseUtils
from server import pool
from server import recall_service
from datetime import datetime
import logging
import json
logger = logging.getLogger('recommend')
def add_track(res, temp):
"""
封装埋点参数
:param res: 推荐文章id列表
:param cb: 合并参数
:param rpc_param: rpc参数
:return: 埋点参数
文章列表参数
单文章参数
"""
# 添加埋点参数
track = {}
# 准备曝光参数
# 全部字符串形式提供,在hive端不会解析问题
_exposure = {"action": "exposure", "userId": temp.user_id, "articleId": json.dumps(res),
"algorithmCombine": temp.algo}
track['param'] = json.dumps(_exposure)
track['recommends'] = []
# 准备其它点击参数
for _id in res:
# 构造字典
_dic = {}
_dic['article_id'] = _id
_dic['param'] = {}
# 准备click参数
_p = {"action": "click", "userId": temp.user_id, "articleId": str(_id),
"algorithmCombine": temp.algo}
_dic['param']['click'] = json.dumps(_p)
# 准备collect参数
_p["action"] = 'collect'
_dic['param']['collect'] = json.dumps(_p)
# 准备share参数
_p["action"] = 'share'
_dic['param']['share'] = json.dumps(_p)
# 准备detentionTime参数
_p["action"] = 'read'
_dic['param']['read'] = json.dumps(_p)
track['recommends'].append(_dic)
track['timestamp'] = temp.time_stamp
return track
class RecoCenter(object):
"""推荐中心
"""
def __init__(self):
self.hbu = HBaseUtils(pool)
self.recall_service = recall_service.ReadRecall()
1、增加feed_recommend_logic函数,进行时间戳逻辑判断
- 传入temp ABTest中的获取的参数
- 根据时间戳
- 时间戳T小于HBASE历史推荐记录,则获取历史记录,返回该时间戳T上次的时间戳T-1
- 时间戳T大于HBASE历史推荐记录,则获取新推荐,则获取HBASE数据库中最近的一次时间戳
- 根据时间戳
获取这个用户该频道的历史结果
# 判断用请求的时间戳大小决定获取历史记录还是刷新推荐文章
try:
last_stamp = self.hbu.get_table_row('history_recommend', 'reco:his:{}'.format(temp.user_id).encode(),
'channel:{}'.format(temp.channel_id).encode(), include_timestamp=True)[
1]
logger.info("{} INFO get user_id:{} channel:{} history last_stamp".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))
except Exception as e:
logger.warning("{} WARN read history recommend exception:{}".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e))
last_stamp = 0
如果历史时间戳最近的一次小于用户请求时候的时间戳,Hbase的时间戳是time.time() * 1000这个值的大小,与Web后台传入的一样类型,如果Web后台传入的不是改大小,注意修改
- 然后返回推荐结果以及此次请求的上一次时间戳
- 用于用户获取历史记录
if last_stamp < temp.time_stamp:
# 1、获取缓存
# res = redis_cache.get_reco_from_cache(temp, self.hbu)
#
# # 如果没有,然后走一遍算法推荐 召回+排序,同时写入到hbase待推荐结果列表
# if not res:
# logger.info("{} INFO get user_id:{} channel:{} recall/sort data".
# format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))
#
# res = self.user_reco_list(temp)
# 2、直接拿推荐结果
res = self.user_reco_list(temp)
temp.time_stamp = int(last_stamp)
track = add_track(res, temp)
如果历史时间戳大于用户请求的这次时间戳,那么就是在获取历史记录,用户请求的历史时间戳是具体某个历史记录的时间戳T,Hbase当中不能够直接用T去获取,而需要去TT>T的时间戳获取,才能拿到包含T时间的结果,并且使用get_table_cells去获取
- 分以下情况考虑
- 1、如果没有历史数据,返回时间戳0以及结果空列表
- 2、如果历史数据只有一条,返回这一条历史数据以及时间戳正好为请求时间戳,修改时间戳为0,表示后面请求以后就没有历史数据了(APP的行为就是翻历史记录停止了)
- 3、如果历史数据多条,返回最近的第一条历史数据,然后返回之后第二条历史数据的时间戳
else:
logger.info("{} INFO read user_id:{} channel:{} history recommend data".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))
try:
row = self.hbu.get_table_cells('history_recommend',
'reco:his:{}'.format(temp.user_id).encode(),
'channel:{}'.format(temp.channel_id).encode(),
timestamp=temp.time_stamp + 1,
include_timestamp=True)
except Exception as e:
logger.warning("{} WARN read history recommend exception:{}".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e))
row = []
res = []
# 1、如果没有历史数据,返回时间戳0以及结果空列表
# 2、如果历史数据只有一条,返回这一条历史数据以及时间戳正好为请求时间戳,修改时间戳为0
# 3、如果历史数据多条,返回最近一条历史数据,然后返回
if not row:
temp.time_stamp = 0
res = []
elif len(row) == 1 and row[0][1] == temp.time_stamp:
res = eval(row[0][0])
temp.time_stamp = 0
elif len(row) >= 2:
res = eval(row[0][0])
temp.time_stamp = int(row[1][1])
res = list(map(int, res))
logger.info(
"{} INFO history:{}, {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), res, temp.time_stamp))
track = add_track(res, temp)
# 曝光参数设置为空
track['param'] = ''
return track
- 完整代码:
def feed_recommend_logic(self, temp):
"""推荐流业务逻辑
:param temp:ABTest传入的业务请求参数
"""
# 判断用请求的时间戳大小决定获取历史记录还是刷新推荐文章
try:
last_stamp = self.hbu.get_table_row('history_recommend', 'reco:his:{}'.format(temp.user_id).encode(),
'channel:{}'.format(temp.channel_id).encode(), include_timestamp=True)[1]
logger.info("{} INFO get user_id:{} channel:{} history last_stamp".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))
except Exception as e:
logger.warning("{} WARN read history recommend exception:{}".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e))
last_stamp = 0
# 如果小于,走一遍正常的推荐流程,缓存或者召回排序
logger.info("{} INFO history last_stamp:{},temp.time_stamp:{}".
format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), last_stamp, temp.time_stamp))
if last_stamp < temp.time_stamp:
# 获取
res = redis_cache.get_reco_from_cache(temp, self.hbu)
# 如果没有,然后走一遍算法推荐 召回+排序,同时写入到hbase待推荐结果列表
if not res:
logger.info("{} INFO get user_id:{} channel:{} recall/sort data".
format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))
res = self.user_reco_list(temp)
temp.time_stamp = int(last_stamp)
track = add_track(res, temp)
else:
logger.info("{} INFO read user_id:{} channel:{} history recommend data".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))
try:
row = self.hbu.get_table_cells('history_recommend',
'reco:his:{}'.format(temp.user_id).encode(),
'channel:{}'.format(temp.channel_id).encode(),
timestamp=temp.time_stamp + 1,
include_timestamp=True)
except Exception as e:
logger.warning("{} WARN read history recommend exception:{}".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e))
row = []
res = []
# 1、如果没有历史数据,返回时间戳0以及结果空列表
# 2、如果历史数据只有一条,返回这一条历史数据以及时间戳正好为请求时间戳,修改时间戳为0
# 3、如果历史数据多条,返回最近一条历史数据,然后返回
if not row:
temp.time_stamp = 0
res = []
elif len(row) == 1 and row[0][1] == temp.time_stamp:
res = eval(row[0][0])
temp.time_stamp = 0
elif len(row) >= 2:
res = eval(row[0][0])
temp.time_stamp = int(row[1][1])
res = list(map(int, res))
logger.info(
"{} INFO history:{}, {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), res, temp.time_stamp))
track = add_track(res, temp)
# 曝光参数设置为空
track['param'] = ''
return track
修改ABTest中的推荐调用
from server.reco_center import RecoCenter
# 推荐
track = RecoCenter().feed_recommend_logic(temp)
获取多路召回结果,过滤历史记录逻辑
user_reco_list
- 1、循环算法组合参数,遍历不同召回结果进行过滤
reco_set = []
# 1、循环算法组合参数,遍历不同召回结果进行过滤
for _num in RAParam.COMBINE[temp.algo][1]:
# 进行每个召回结果的读取100,101,102,103,104
if _num == 103:
# 新文章召回读取
_res = self.recall_service.read_redis_new_article(temp.channel_id)
reco_set = list(set(reco_set).union(set(_res)))
elif _num == 104:
# 热门文章召回读取
_res = self.recall_service.read_redis_hot_article(temp.channel_id)
reco_set = list(set(reco_set).union(set(_res)))
else:
_res = self.recall_service.\
read_hbase_recall_data(RAParam.RECALL[_num][0],
'recall:user:{}'.format(temp.user_id).encode(),
'{}:{}'.format(RAParam.RECALL[_num][1], temp.channel_id).encode())
# 进行合并某个协同过滤召回的结果
reco_set = list(set(reco_set).union(set(_res)))
- 2、过滤当前该请求频道推荐历史结果,如果不是0频道需要过滤0频道推荐结果,防止出现
- 比如Python频道和0频道相同的推荐结果
# reco_set都是新推荐的结果,进行过滤
history_list = []
try:
data = self.hbu.get_table_cells('history_recommend',
'reco:his:{}'.format(temp.user_id).encode(),
'channel:{}'.format(temp.channel_id).encode())
for _ in data:
history_list = list(set(history_list).union(set(eval(_))))
logger.info("{} INFO filter user_id:{} channel:{} history data".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))
except Exception as e:
logger.warning(
"{} WARN filter history article exception:{}".format(datetime.now().
strftime('%Y-%m-%d %H:%M:%S'), e))
# 如果0号频道有历史记录,也需要过滤
try:
data = self.hbu.get_table_cells('history_recommend',
'reco:his:{}'.format(temp.user_id).encode(),
'channel:{}'.format(0).encode())
for _ in data:
history_list = list(set(history_list).union(set(eval(_))))
logger.info("{} INFO filter user_id:{} channel:{} history data".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, 0))
except Exception as e:
logger.warning(
"{} WARN filter history article exception:{}".format(datetime.now().
strftime('%Y-%m-%d %H:%M:%S'), e))
# 过滤操作 reco_set 与history_list进行过滤
reco_set = list(set(reco_set).difference(set(history_list)))
- 3、过滤之后,推荐出去指定个数的文章列表,写入历史记录,剩下多的写入待推荐结果
# 如果没有内容,直接返回
if not reco_set:
return reco_set
else:
# 类型进行转换
reco_set = list(map(int, reco_set))
# 跟后端需要推荐的文章数量进行比对 article_num
# article_num > reco_set
if len(reco_set) <= temp.article_num:
res = reco_set
else:
# 之取出推荐出去的内容
res = reco_set[:temp.article_num]
# 剩下的推荐结果放入wait_recommend等待下次帅新的时候直接推荐
self.hbu.get_table_put('wait_recommend',
'reco:{}'.format(temp.user_id).encode(),
'channel:{}'.format(temp.channel_id).encode(),
str(reco_set[temp.article_num:]).encode(),
timestamp=temp.time_stamp)
logger.info(
"{} INFO put user_id:{} channel:{} wait data".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))
# 放入历史记录表当中
self.hbu.get_table_put('history_recommend',
'reco:his:{}'.format(temp.user_id).encode(),
'channel:{}'.format(temp.channel_id).encode(),
str(res).encode(),
timestamp=temp.time_stamp)
# 放入历史记录日志
logger.info(
"{} INFO store recall/sorted user_id:{} channel:{} history_recommend data".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))
return res
修改调用读取召回数据的部分
# 2、不开启缓存
res = self.user_reco_list(temp)
temp.time_stamp = int(last_stamp)
track = add_track(res, temp)
运行grpc服务之后,测试结果
hbase(main):007:0> get "history_recommend", 'reco:his:1115629498121846784', {COLUMN=>'channel:18',VERSIONS=>1000}
COLUMN CELL
channel:18 timestamp=1558189615378, value=[13890, 14915, 13891, 15429, 15944, 44371, 18
005, 15196, 13410, 13672]
channel:18 timestamp=1558189317342, value=[17966, 17454, 14125, 16174, 14899, 44339, 16
437, 18743, 44090, 18238]
channel:18 timestamp=1558143073173, value=[19200, 17665, 16151, 16411, 19233, 13090, 15
140, 16421, 19494, 14381]
待推荐表中有
hbase(main):008:0> scan 'wait_recommend'
ROW COLUMN+CELL
reco:1115629498121846784 column=channel:18, timestamp=1558189615378, value=[44137, 18795, 19052, 4465
2, 44654, 44657, 14961, 17522, 43894, 44412, 16000, 14208, 44419, 17802, 142
23, 18836, 140956, 18335, 13728, 14498, 44451, 44456, 18609, 18353, 44468, 1
8103, 135869, 16062, 14015, 13757, 13249, 44483, 17605, 14021, 15309, 18127,
43983, 44754, 43986, 19413, 14805, 18904, 44761, 17114, 13272, 14810, 18907
, 13022, 14300, 17120, 17632, 14299, 43997, 17889, 17385, 18156, 15085, 1329
5, 44020, 14839, 44024, 14585, 18172, 44541]
完整代码:
def user_reco_list(self, temp):
"""
获取用户的召回结果进行推荐
:param temp:
:return:
"""
reco_set = []
# 1、循环算法组合参数,遍历不同召回结果进行过滤
for _num in RAParam.COMBINE[temp.algo][1]:
# 进行每个召回结果的读取100,101,102,103,104
if _num == 103:
# 新文章召回读取
_res = self.recall_service.read_redis_new_article(temp.channel_id)
reco_set = list(set(reco_set).union(set(_res)))
elif _num == 104:
# 热门文章召回读取
_res = self.recall_service.read_redis_hot_article(temp.channel_id)
reco_set = list(set(reco_set).union(set(_res)))
else:
_res = self.recall_service.\
read_hbase_recall_data(RAParam.RECALL[_num][0],
'recall:user:{}'.format(temp.user_id).encode(),
'{}:{}'.format(RAParam.RECALL[_num][1], temp.channel_id).encode())
# 进行合并某个协同过滤召回的结果
reco_set = list(set(reco_set).union(set(_res)))
# reco_set都是新推荐的结果,进行过滤
history_list = []
try:
data = self.hbu.get_table_cells('history_recommend',
'reco:his:{}'.format(temp.user_id).encode(),
'channel:{}'.format(temp.channel_id).encode())
for _ in data:
history_list = list(set(history_list).union(set(eval(_))))
logger.info("{} INFO filter user_id:{} channel:{} history data".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))
except Exception as e:
logger.warning(
"{} WARN filter history article exception:{}".format(datetime.now().
strftime('%Y-%m-%d %H:%M:%S'), e))
# 如果0号频道有历史记录,也需要过滤
try:
data = self.hbu.get_table_cells('history_recommend',
'reco:his:{}'.format(temp.user_id).encode(),
'channel:{}'.format(0).encode())
for _ in data:
history_list = list(set(history_list).union(set(eval(_))))
logger.info("{} INFO filter user_id:{} channel:{} history data".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, 0))
except Exception as e:
logger.warning(
"{} WARN filter history article exception:{}".format(datetime.now().
strftime('%Y-%m-%d %H:%M:%S'), e))
# 过滤操作 reco_set 与history_list进行过滤
reco_set = list(set(reco_set).difference(set(history_list)))
# 排序代码逻辑
# _sort_num = RAParam.COMBINE[temp.algo][2][0]
# reco_set = sort_dict[RAParam.SORT[_sort_num]](reco_set, temp, self.hbu)
# 如果没有内容,直接返回
if not reco_set:
return reco_set
else:
# 类型进行转换
reco_set = list(map(int, reco_set))
# 跟后端需要推荐的文章数量进行比对 article_num
# article_num > reco_set
if len(reco_set) <= temp.article_num:
res = reco_set
else:
# 之取出推荐出去的内容
res = reco_set[:temp.article_num]
# 剩下的推荐结果放入wait_recommend等待下次帅新的时候直接推荐
self.hbu.get_table_put('wait_recommend',
'reco:{}'.format(temp.user_id).encode(),
'channel:{}'.format(temp.channel_id).encode(),
str(reco_set[temp.article_num:]).encode(),
timestamp=temp.time_stamp)
logger.info(
"{} INFO put user_id:{} channel:{} wait data".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))
# 放入历史记录表当中
self.hbu.get_table_put('history_recommend',
'reco:his:{}'.format(temp.user_id).encode(),
'channel:{}'.format(temp.channel_id).encode(),
str(res).encode(),
timestamp=temp.time_stamp)
# 放入历史记录日志
logger.info(
"{} INFO store recall/sorted user_id:{} channel:{} history_recommend data".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))
return res