离线召回与排序介绍

news2025/1/10 3:24:21

3.3 离线召回与排序介绍

学习目标

  • 目标
    • 了解召回排序作用
    • 知道头条推荐召回排序设计
  • 应用

3.3.1 召回与排序介绍

召回:从海量文章数据中得到若干候选文章召回集合(数量较多)

排序:从召回集合中读取推荐文章,构建样本特征进行排序过滤筛选

3.3.1.1项目召回与排序业务流程

3.3.2项目推荐的召回排序设计

  • 匿名用户:

    • 通常使用用户冷启动方案,区别在于user_id为匿名用户手机识别号(项目不允许匿名用户)
  • 所有只正针对于登录用户:

  • 用户冷启动(前期点击行为较少情况)

    • 非个性化推荐
      • 热门召回:自定义热门规则,根据当前时间段热点定期更新维护人点文章库
      • 新文章召回:为了提高新文章的曝光率,建立新文章库,进行推荐
    • 个性化推荐:
      • 基于内容的协同过滤在线召回:基于用户实时兴趣画像相似的召回结果用于首页的个性化推荐
  • 后期离线部分(用户点击行为较多,用户画像完善)
    • 建立用户长期兴趣画像(详细):包括用户各个维度的兴趣特征
    • 训练排序模型
      • LR模型、FTRL、Wide&Deep
    • 离线部分的召回:
      • 基于模型协同过滤推荐离线召回:ALS
      • 基于内容的离线召回:或者称基于用户画像的召回

3.4 召回表设计与模型召回

学习目标

  • 目标
    • 知道ALS模型推荐API使用
    • 知道StringIndexer的使用
  • 应用
    • 应用spark完成离线用户基于模型的协同过滤推荐

3.4.1 召回表设计

我们的召回方式有很多种,多路召回结果存储模型召回与内容召回的结果需要进行相应频道推荐合并。

  • 方案:基于模型与基于内容的召回结果存入同一张表,避免多张表进行读取处理
    • 由于HBASE有多个版本数据功能存在的支持
    • TTL=>7776000, VERSIONS=>999999
create 'cb_recall', {NAME=>'als', TTL=>7776000, VERSIONS=>999999}
alter 'cb_recall', {NAME=>'content', TTL=>7776000, VERSIONS=>999999}
alter 'cb_recall', {NAME=>'online', TTL=>7776000, VERSIONS=>999999}

# 例子:
put 'cb_recall', 'recall:user:5', 'als:1',[45,3,5,10]
put 'cb_recall', 'recall:user:5', 'als:1',[289,11,65,52,109,8]
put 'cb_recall', 'recall:user:5', 'als:2',[1,2,3,4,5,6,7,8,9,10]
put 'cb_recall', 'recall:user:2', 'content:1',[45,3,5,10,289,11,65,52,109,8]
put 'cb_recall', 'recall:user:2', 'content:2',[1,2,3,4,5,6,7,8,9,10]


hbase(main):084:0> desc 'cb_recall'
Table cb_recall is ENABLED                                                                             
cb_recall                                                                                              
COLUMN FAMILIES DESCRIPTION                                                                            
{NAME => 'als', VERSIONS => '999999', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false'
, KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL => 
'7776000 SECONDS (90 DAYS)', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CACHE
_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_
OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}                    
{NAME => 'content', VERSIONS => '999999', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'fa
lse', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL
 => '7776000 SECONDS (90 DAYS)', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', C
ACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS
_ON_OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}                
{NAME => 'online', VERSIONS => '999999', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'fal
se', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL 
=> '7776000 SECONDS (90 DAYS)', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CA
CHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_
ON_OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}                 
3 row(s)

在HIVE用户数据数据库下建立HIVE外部表,若hbase表有修改,则进行HIVE 表删除更新

create external table cb_recall_hbase(
user_id STRING comment "userID",
als map<string, ARRAY<BIGINT>> comment "als recall",
content map<string, ARRAY<BIGINT>> comment "content recall",
online map<string, ARRAY<BIGINT>> comment "online recall")
COMMENT "user recall table"
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,als:,content:,online:")
TBLPROPERTIES ("hbase.table.name" = "cb_recall");

增加一个历史召回结果表

create 'history_recall', {NAME=>'channel', TTL=>7776000, VERSIONS=>999999}


put 'history_recall', 'recall:user:5', 'als:1',[1,2,3]
put 'history_recall', 'recall:user:5', 'als:1',[4,5,6,7]
put 'history_recall', 'recall:user:5', 'als:1',[8,9,10]

为什么增加历史召回表?

 

 

  • 1、直接在存储召回结果部分进行过滤,比之后排序过滤,节省排序时间
  • 2、防止Redis缓存没有消耗完,造成重复推荐,从源头进行过滤

3.4.2 基于模型召回集合计算

初始化信息

import os
import sys
# 如果当前代码文件运行测试需要加入修改路径,避免出现后导包问题
BASE_DIR = os.path.dirname(os.path.dirname(os.getcwd()))
sys.path.insert(0, os.path.join(BASE_DIR))

PYSPARK_PYTHON = "/miniconda2/envs/reco_sys/bin/python"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

from offline import SparkSessionBase

class UpdateRecall(SparkSessionBase):

    SPARK_APP_NAME = "updateRecall"
    ENABLE_HIVE_SUPPORT = True

    def __init__(self):
        self.spark = self._create_spark_session()

ur = UpdateRecall()

3.4.2.1 用户日志信息处理

  • 目标:处理成ALS模型所需数据类型和格式

 

 

  • 步骤:
    • 数据类型转换,clicked
    • 用户ID与文章ID处理

数据类型转换,clicked

ur.spark.sql("use profile")
user_article_click = ur.spark.sql("select * from user_article_basic").\
            select(['user_id', 'article_id', 'clicked'])
# 更换类型
def change_types(row):
    return row.user_id, row.article_id, int(row.clicked)

user_article_click = user_article_click.rdd.map(change_types).toDF(['user_id', 'article_id', 'clicked'])

 

 

用户ID与文章ID处理,编程ID索引

from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
# 用户和文章ID超过ALS最大整数值,需要使用StringIndexer进行转换
user_id_indexer = StringIndexer(inputCol='user_id', outputCol='als_user_id')
article_id_indexer = StringIndexer(inputCol='article_id', outputCol='als_article_id')
pip = Pipeline(stages=[user_id_indexer, article_id_indexer])
pip_fit = pip.fit(user_article_click)
als_user_article_click = pip_fit.transform(user_article_click)

3.4.2.2 ALS 模型训练与推荐

ALS模型需要输出用户ID列,文章ID列以及点击列

from pyspark.ml.recommendation import ALS
# 模型训练和推荐默认每个用户固定文章个数
als = ALS(userCol='als_user_id', itemCol='als_article_id', ratingCol='clicked', checkpointInterval=1)
model = als.fit(als_user_article_click)
recall_res = model.recommendForAllUsers(100)

 

 

3.4.2.3 推荐结果处理

通过StringIndexer变换后的下标知道原来的和用户ID

# recall_res得到需要使用StringIndexer变换后的下标
# 保存原来的下表映射关系
refection_user = als_user_article_click.groupBy(['user_id']).max('als_user_id').withColumnRenamed(
'max(als_user_id)', 'als_user_id')
refection_article = als_user_article_click.groupBy(['article_id']).max('als_article_id').withColumnRenamed(
'max(als_article_id)', 'als_article_id')

# Join推荐结果与 refection_user映射关系表
# +-----------+--------------------+-------------------+
# | als_user_id | recommendations | user_id |
# +-----------+--------------------+-------------------+
# | 8 | [[163, 0.91328144]... | 2 |
#        | 0 | [[145, 0.653115], ... | 1106476833370537984 |
recall_res = recall_res.join(refection_user, on=['als_user_id'], how='left').select(
['als_user_id', 'recommendations', 'user_id'])

对推荐文章ID后处理:得到推荐列表,获取推荐列表中的ID索引

# Join推荐结果与 refection_article映射关系表
# +-----------+-------+----------------+
# | als_user_id | user_id | als_article_id |
# +-----------+-------+----------------+
# | 8 | 2 | [163, 0.91328144] |
# | 8 | 2 | [132, 0.91328144] |
import pyspark.sql.functions as F
recall_res = recall_res.withColumn('als_article_id', F.explode('recommendations')).drop('recommendations')

# +-----------+-------+--------------+
# | als_user_id | user_id | als_article_id |
# +-----------+-------+--------------+
# | 8 | 2 | 163 |
# | 8 | 2 | 132 |
def _article_id(row):
  return row.als_user_id, row.user_id, row.als_article_id[0]

进行索引对应文章ID获取

als_recall = recall_res.rdd.map(_article_id).toDF(['als_user_id', 'user_id', 'als_article_id'])
als_recall = als_recall.join(refection_article, on=['als_article_id'], how='left').select(
  ['user_id', 'article_id'])
# 得到每个用户ID 对应推荐文章
# +-------------------+----------+
# | user_id | article_id |
# +-------------------+----------+
# | 1106476833370537984 | 44075 |
# | 1 | 44075 |

获取每个文章对应的频道,推荐给用户时按照频道存储

ur.spark.sql("use toutiao")
news_article_basic = ur.spark.sql("select article_id, channel_id from news_article_basic")

als_recall = als_recall.join(news_article_basic, on=['article_id'], how='left')
als_recall = als_recall.groupBy(['user_id', 'channel_id']).agg(F.collect_list('article_id')).withColumnRenamed(
  'collect_list(article_id)', 'article_list')

als_recall = als_recall.dropna()

3.4.2.4 召回结果存储

  • 存储位置,选择HBASE

HBASE表设计:

put 'cb_recall', 'recall:user:5', 'als:1',[45,3,5,10,289,11,65,52,109,8]
put 'cb_recall', 'recall:user:5', 'als:2',[1,2,3,4,5,6,7,8,9,10]

存储代码如下:

        def save_offline_recall_hbase(partition):
            """离线模型召回结果存储
            """
            import happybase
            pool = happybase.ConnectionPool(size=10, host='hadoop-master', port=9090)
            for row in partition:
                with pool.connection() as conn:
                    # 获取历史看过的该频道文章
                    history_table = conn.table('history_recall')
                    # 多个版本
                    data = history_table.cells('reco:his:{}'.format(row.user_id).encode(),
                                               'channel:{}'.format(row.channel_id).encode())

                    history = []
                    if len(data) >= 2:
                        for l in data[:-1]:
                            history.extend(eval(l))
                    else:
                        history = []

                    # 过滤reco_article与history
                    reco_res = list(set(row.article_list) - set(history))

                    if reco_res:

                        table = conn.table('cb_recall')
                        # 默认放在推荐频道
                        table.put('recall:user:{}'.format(row.user_id).encode(),
                                  {'als:{}'.format(row.channel_id).encode(): str(reco_res).encode()})
                        conn.close()

                        # 放入历史推荐过文章
                        history_table.put("reco:his:{}".format(row.user_id).encode(),
                                          {'channel:{}'.format(row.channel_id): str(reco_res).encode()})
                    conn.close()

        als_recall.foreachPartition(save_offline_recall_hbase)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/177647.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【SpringCloud21】面试题雪花算法

目录1. 问题2.ID生成规则部分硬性要求3.ID号生成系统的可用性要求4.一般通用方案4.1UUID4. 数据库自增主键5. 基于Redis生成全局id策略6. snowflake6.1 概述6.2 结构6.3 源码6.4 工程落地经验6.4.1 糊涂工具包6.4.2 springboot整合雪花算法1.POM2.核心代码IdGeneratorSnowflake…

ElasticSearch7.6.x 学习笔记

ElasticSearch7.6.x 学习笔记 目录 ElasticSearch概述 ElasticSearch&#xff0c;简称es&#xff0c;es是一个开源的高扩展的分布式全文检索引擎&#xff0c;它可以近乎实时的存储、检索数据。且本身扩展性很好&#xff0c;可以扩展到上百台服务器&#xff0c;处理PB级别的数…

Spring事务、事务隔离级别、事务传播机制

Spring事务和事务传播机制一、为什么需要事务&#xff1f;(回顾)二、Spring中事务的实现2.1 MySQL中的事务使用 (回顾)2.2 Spring编程式事务2.3 Spring 声明式事务2.3.1 Transactional 使用2.3.2 Transactional 作用范围2.3.3 Transactional 参数说明2.3.4 注意事项2.3.4 Trans…

【微服务】Nacos注册中心

Nacos和Eureka一样也可以充当服务的注册中心&#xff0c;让我们一起看看有何区别&#xff1f; 点击跳转&#x1f449;【微服务】Eureka注册中心 一.引入 Nacos是阿里巴巴的产品&#xff0c;现在是SpringCloud中的一个组件。相比于Eureka其功能更加丰富&#xff0c;在国内受欢迎…

2. 获取数字证书,搭建nginx服务器,验证https请求

文章目录 一、 前提知识二、获取数字证书三、搭建nginx服务器3.1 安装nginx操作步骤3.2 导入证书3.3 修改nginx配置文件四、验证一、 前提知识 首先我们知道访问域名时,http请求默认端口为80,https为443。那么我们现在就需要对这两个端口进行监听,这里我们就要用到nginx服务…

数据结构 最短路径课设(源码+实验报告+视频讲解)(用了自取)

XIAN TECHNOLOGICAL UNIVERSITY 课程设计报告 实验课程名称 算法与数据结构 专 业&#xff1a; 班 级&#xff1a; 姓 名&#xff1a; 学 号&#xff1a; 实验学时&#xff1a; 指导…

插入排序实现

场景&#xff1a; 插入排序&#xff0c;一般也被称为直接插入排序。 对于少量元素的排序&#xff0c;它是一个有效的算法 。 插入排序是一种最简单的排序方法&#xff0c;它的基本思想是将一个记录插入到已经排好序的有序表中&#xff0c;从而一个新的、记录数增1的有序表。在…

分享135个ASP源码,总有一款适合您

ASP源码 分享135个ASP源码&#xff0c;总有一款适合您 下面是文件的名字&#xff0c;我放了一些图片&#xff0c;文章里不是所有的图主要是放不下...&#xff0c; 135个ASP源码下载链接&#xff1a;https://pan.baidu.com/s/1rHFniMK56P-_qXNY9kKihg?pwdl95g 提取码&#x…

day11文件夹导航条+文件从数据库和cos删除

获取临时凭证&&上传文件 1创建COS_OBJECT对象 new一个&#xff0c;然后就会向你写的url地址获取临时凭证&#xff08;需要引入一个JS&#xff09; 2.写url地址和url的函数&#xff0c;后台写函数获取到临时凭证&#xff0c;有文档直接调用就可&#xff0c;然后后台返回…

【C++】右值引用和移动语义 | 新的类功能 | 可变参数模板

​&#x1f320; 作者&#xff1a;阿亮joy. &#x1f386;专栏&#xff1a;《吃透西嘎嘎》 &#x1f387; 座右铭&#xff1a;每个优秀的人都有一段沉默的时光&#xff0c;那段时光是付出了很多努力却得不到结果的日子&#xff0c;我们把它叫做扎根 目录&#x1f449;左值引用…

HTTPS 是这样握手的

HTTP协议默认是明文传输&#xff0c;存在一定的安全隐患&#xff0c;容易被中间人窃听和攻击&#xff0c;在 加密解决HTTP协议带来的安全问题 中提到使用哈希、对称加密、非对称加密等方式对数据加密&#xff0c;能解决数据安全的问题。 以上加密方式需要我们手动的使用加密算…

python基础——列表切片操作

python基础——列表切片操作 文章目录python基础——列表切片操作一、实验目的二、实验原理三、实验环境四、实验内容五、实验步骤一、实验目的 掌握列表切片操作 二、实验原理 1、列表是写在方括号[]之间、用逗号分隔开的元素列表。列表可以完成大多数集合类的数据结构实现…

23种设计模式(二十三)——解释器模式【邻域问题】

文章目录 意图什么时候使用解释器真实世界类比解释器模式的实现文法抽象语法树解释器模式的优缺点亦称:Interpreter 意图 给定一个语言,定义它的文法表示,并定义一个解释器,这个解释器使用该标识来解释语言中的句子。 在软件系统中,如果有一些特殊的领域问题较为复杂,疑…

基于springboot物资管理系统源码含论文

摘要 目前&#xff0c;大型物资作为社会零售业态中最为重要的组成部分&#xff0c;处于社会零售商业 进入高速发展的轨道阶段&#xff0c;其在社会经济发展的作用日益明显。国内各大大型基本 上都拥有自己的社会网&#xff0c;将社会物资管理纳入网络管理系统&#xff0c;实现…

JDK8 新特性之并行的Stream流

目录 一&#xff1a;串行的Stream流 二&#xff1a;并行的Stream流 获取并行Stream流的两种方式 小结 三&#xff1a;并行和串行Stream流的效率对比 四&#xff1a;parallelStream线程安全问题 五&#xff1a;parallelStream背后的技术 Fork/Join框架介绍 Fork/Join原理…

RK3399平台开发系列讲解(内存篇)访问虚拟内存的物理内存过程

🚀返回专栏总目录 文章目录 一、虚拟地址的表示二、虚拟地址到物理地址的转换三、Linux页表沉淀、分享、成长,让自己和他人都能有所收获!😄 📢虚拟内存这一概念给进程带来错觉,使它认为内存大到几乎无限,有时甚至超过系统的实际内存。每次访问内存位置时,由CPU完成从…

static_cast,dynamic_cast,const_cast详解

目录 一.static_cast&#xff08;静态转换&#xff09; 二.dynamic_cast&#xff08;动态转换&#xff09; 三.const_cast 一.static_cast&#xff08;静态转换&#xff09; 1.语法&#xff1a; static_cast<new_type>(expression); newtype dataname static_cast…

分享133个ASP源码,总有一款适合您

ASP源码 分享133个ASP源码&#xff0c;总有一款适合您 下面是文件的名字&#xff0c;我放了一些图片&#xff0c;文章里不是所有的图主要是放不下...&#xff0c; 133个ASP源码下载链接&#xff1a;https://pan.baidu.com/s/1l_8UHQkosNF3HHTu8AFq5A?pwdyxvw 提取码&#x…

欧几里得与扩展欧几里得算法(含推导过程及代码)

文章目录前言一、欧几里得算法二、扩展欧几里得算法2.1、认识裴蜀定理2.2、推导axbygcd(a, b)得到x与y2.2.1、推导过程2.2.2、代码实现2.3、推导axbygcd(a, b)的所有解及a或者b的最小值&#xff08;结论验证&#xff09;参考文章前言 在学习Acwing c蓝桥杯辅导课第八讲数论-Ac…

Spark 常用算子02

常用Action算子 1、countByKey算子 功能&#xff1a;统计key出现的次数&#xff08;一般适用于KV型的RDD&#xff09; 用法&#xff1a; result rdd1.countByKey() print(result)代码示例&#xff1a; # coding:utf8from pyspark import SparkConf, SparkContextif __name…