离线用户召回定时更新

news2025/1/16 9:13:33

3.6 离线用户召回定时更新

学习目标

  • 目标
    • 知道离线内容召回的概念
    • 知道如何进行内容召回计算存储规则
  • 应用
    • 应用spark完成离线用户基于内容的协同过滤推荐

3.6.1 定时更新代码

  • 完整代码
import os
import sys
# 如果当前代码文件运行测试需要加入修改路径,否则后面的导包出现问题
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(BASE_DIR))
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

from pyspark.ml.recommendation import ALS
from offline import SparkSessionBase
from datetime import datetime
import time
import numpy as np


class UpdateRecall(SparkSessionBase):

    SPARK_APP_NAME = "updateRecall"
    ENABLE_HIVE_SUPPORT = True

    def __init__(self, number):

        self.spark = self._create_spark_session()
        self.N = number

    def update_als_recall(self):
        """
        更新基于模型(ALS)的协同过滤召回集
        :return:
        """
        # 读取用户行为基本表
        self.spark.sql("use profile")
        user_article_click = self.spark.sql("select * from user_article_basic").\
            select(['user_id', 'article_id', 'clicked'])

        # 更换类型
        def change_types(row):
            return row.user_id, row.article_id, int(row.clicked)

        user_article_click = user_article_click.rdd.map(change_types).toDF(['user_id', 'article_id', 'clicked'])
        # 用户和文章ID超过ALS最大整数值,需要使用StringIndexer进行转换
        user_id_indexer = StringIndexer(inputCol='user_id', outputCol='als_user_id')
        article_id_indexer = StringIndexer(inputCol='article_id', outputCol='als_article_id')
        pip = Pipeline(stages=[user_id_indexer, article_id_indexer])
        pip_fit = pip.fit(user_article_click)
        als_user_article_click = pip_fit.transform(user_article_click)

        # 模型训练和推荐默认每个用户固定文章个数
        als = ALS(userCol='als_user_id', itemCol='als_article_id', ratingCol='clicked', checkpointInterval=1)
        model = als.fit(als_user_article_click)
        recall_res = model.recommendForAllUsers(self.N)

        # recall_res得到需要使用StringIndexer变换后的下标
        # 保存原来的下表映射关系
        refection_user = als_user_article_click.groupBy(['user_id']).max('als_user_id').withColumnRenamed(
            'max(als_user_id)', 'als_user_id')
        refection_article = als_user_article_click.groupBy(['article_id']).max('als_article_id').withColumnRenamed(
            'max(als_article_id)', 'als_article_id')

        # Join推荐结果与 refection_user映射关系表
        # +-----------+--------------------+-------------------+
        # | als_user_id | recommendations | user_id |
        # +-----------+--------------------+-------------------+
        # | 8 | [[163, 0.91328144]... | 2 |
        #        | 0 | [[145, 0.653115], ... | 1106476833370537984 |
        recall_res = recall_res.join(refection_user, on=['als_user_id'], how='left').select(
            ['als_user_id', 'recommendations', 'user_id'])

        # Join推荐结果与 refection_article映射关系表
        # +-----------+-------+----------------+
        # | als_user_id | user_id | als_article_id |
        # +-----------+-------+----------------+
        # | 8 | 2 | [163, 0.91328144] |
        # | 8 | 2 | [132, 0.91328144] |
        import pyspark.sql.functions as F
        recall_res = recall_res.withColumn('als_article_id', F.explode('recommendations')).drop('recommendations')

        # +-----------+-------+--------------+
        # | als_user_id | user_id | als_article_id |
        # +-----------+-------+--------------+
        # | 8 | 2 | 163 |
        # | 8 | 2 | 132 |
        def _article_id(row):
            return row.als_user_id, row.user_id, row.als_article_id[0]

        als_recall = recall_res.rdd.map(_article_id).toDF(['als_user_id', 'user_id', 'als_article_id'])
        als_recall = als_recall.join(refection_article, on=['als_article_id'], how='left').select(
            ['user_id', 'article_id'])
        # 得到每个用户ID 对应推荐文章
        # +-------------------+----------+
        # | user_id | article_id |
        # +-------------------+----------+
        # | 1106476833370537984 | 44075 |
        # | 1 | 44075 |
        # 分组统计每个用户,推荐列表
        # als_recall = als_recall.groupby('user_id').agg(F.collect_list('article_id')).withColumnRenamed(
        #     'collect_list(article_id)', 'article_list')
        self.spark.sql("use toutiao")
        news_article_basic = self.spark.sql("select article_id, channel_id from news_article_basic")
        als_recall = als_recall.join(news_article_basic, on=['article_id'], how='left')
        als_recall = als_recall.groupBy(['user_id', 'channel_id']).agg(F.collect_list('article_id')).withColumnRenamed(
            'collect_list(article_id)', 'article_list')
        als_recall = als_recall.dropna()

        # 存储
        def save_offline_recall_hbase(partition):
            """离线模型召回结果存储
            """
            import happybase
            pool = happybase.ConnectionPool(size=10, host='hadoop-master', port=9090)
            for row in partition:
                with pool.connection() as conn:
                    # 获取历史看过的该频道文章
                    history_table = conn.table('history_recall')
                    # 多个版本
                    data = history_table.cells('reco:his:{}'.format(row.user_id).encode(),
                                               'channel:{}'.format(row.channel_id).encode())

                    history = []
                    if len(data) >= 2:
                        for l in data[:-1]:
                            history.extend(eval(l))
                    else:
                        history = []

                    # 过滤reco_article与history
                    reco_res = list(set(row.article_list) - set(history))

                    if reco_res:

                        table = conn.table('cb_recall')
                        # 默认放在推荐频道
                        table.put('recall:user:{}'.format(row.user_id).encode(),
                                  {'als:{}'.format(row.channel_id).encode(): str(reco_res).encode()})
                        conn.close()

                        # 放入历史推荐过文章
                        history_table.put("reco:his:{}".format(row.user_id).encode(),
                                          {'channel:{}'.format(row.channel_id): str(reco_res).encode()})
                    conn.close()

        als_recall.foreachPartition(save_offline_recall_hbase)

    def update_content_recall(self):
        """
        更新基于内容(画像)的推荐召回集, word2vec相似
        :return:
        """
        # 基于内容相似召回(画像召回)
        ur.spark.sql("use profile")
        user_article_basic = self.spark.sql("select * from user_article_basic")
        user_article_basic = user_article_basic.filter("clicked=True")

        def save_content_filter_history_to__recall(partition):
            """计算每个用户的每个操作文章的相似文章,过滤之后,写入content召回表当中(支持不同时间戳版本)
            """
            import happybase
            pool = happybase.ConnectionPool(size=10, host='hadoop-master')

            # 进行为相似文章获取
            with pool.connection() as conn:

                # key:   article_id,    column:  similar:article_id
                similar_table = conn.table('article_similar')
                # 循环partition
                for row in partition:
                    # 获取相似文章结果表
                    similar_article = similar_table.row(str(row.article_id).encode(),
                                                        columns=[b'similar'])
                    # 相似文章相似度排序过滤,召回不需要太大的数据, 百个,千
                    _srt = sorted(similar_article.items(), key=lambda item: item[1], reverse=True)
                    if _srt:
                        # 每次行为推荐10篇文章
                        reco_article = [int(i[0].split(b':')[1]) for i in _srt][:10]

                        # 获取历史看过的该频道文章
                        history_table = conn.table('history_recall')
                        # 多个版本
                        data = history_table.cells('reco:his:{}'.format(row.user_id).encode(),
                                                   'channel:{}'.format(row.channel_id).encode())

                        history = []
                        if len(data) >= 2:
                            for l in data[:-1]:
                                history.extend(eval(l))
                        else:
                            history = []

                        # 过滤reco_article与history
                        reco_res = list(set(reco_article) - set(history))

                        # 进行推荐,放入基于内容的召回表当中以及历史看过的文章表当中
                        if reco_res:
                            # content_table = conn.table('cb_content_recall')
                            content_table = conn.table('cb_recall')
                            content_table.put("recall:user:{}".format(row.user_id).encode(),
                                              {'content:{}'.format(row.channel_id).encode(): str(reco_res).encode()})

                            # 放入历史推荐过文章
                            history_table.put("reco:his:{}".format(row.user_id).encode(),
                                              {'channel:{}'.format(row.channel_id).encode(): str(reco_res).encode()})

                conn.close()

        user_article_basic.foreachPartition(save_content_filter_history_to__recall)


if __name__ == '__main__':
    ur = UpdateRecall(500)
    ur.update_als_recall()
    ur.update_content_recall()
  • 定时更新代码,在main.py和update.py中添加以下代码:
from offline.update_recall import UpdateRecall
from schedule.update_profile import update_user_profile, update_article_profile, update_recall

def update_recall():
    """
    更新用户的召回集
    :return:
    """
    udp = UpdateRecall(200)
    udp.update_als_recall()
    udp.update_content_recall()

main中添加

scheduler.add_job(update_recall, trigger='interval', hour=3)

 

3.6 离线排序模型训练

学习目标

  • 目标
    • 了解文章CTR预估主要作用
    • 知道常见点击率预测的种类和模型
    • 知道常见CTR中特征处理方式
  • 应用
    • 应用spark lr完成模型训练预测评估

3.6.1 离线排序模型-CTR预估

  • CTR(Click-Through Rate)预估:给定一个Item,预测该Item会被点击的概率

    • 离线的模型训练:排序的各种模型训练评估
    • 特征服务平台:为了提高模型在排序时候的特征读取处理速率,直接将处理好的特征写入HBASE

 

 

3.6.2 排序模型

最基础的模型目前都是基于LR的点击率预估策略,目前在工业使用模型做预估的有这么几种类型

  • 宽模型 + 特征⼯程

    • LR/MLR + 非ID类特征(⼈⼯离散/GBDT/FM)
    • spark 中可以直接使用
  • 宽模型 + 深模型

    • wide&deep,DeepFM
    • 使用TensorFlow进行训练
  • 深模型:
    • DNN + 特征embedding
    • 使用TensorFlow进行训练

这里使用LR做基本模型使用先,去进行模型的评估,使用模型进行预测

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/178463.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

游戏启动器:LaunchBox Premium with Big Box v13.1

LaunchBox知道您会喜欢的功能,具有风格的游戏启动器,我们最初将 Launchbox 构建为 DOSBox 的一个有吸引力的前端,但它现在拥有对现代游戏和复古游戏模拟的支持。我们让您的所有游戏看起来都很漂亮。 整理您的游戏收藏 我们不仅漂亮&#xff…

基于微信小程序奶茶店在线点餐下单系统

奶茶在线下单系统用户端是基于微信小程序端,管理员端是基于web端,基于java编程语言,mysql数据库,idea工具开发,用户微信端可以注册登陆小程序,查看奶茶详情,搜索下单奶茶,在线奶茶评…

CSS @property(CSS 自定义属性)

CSS property(CSS 自定义属性)参考描述propertyHoudiniproperty兼容性描述符规则syntax扩展initial-valueinherits示例描述符的注意事项使用 JavaScript 来创建自定义属性CSS 变量与自定义属性重复赋值过渡简单的背景过渡动画更复杂的背景过渡动画错误示…

【ARM体系结构】之数据类型约定与工作模式

1、RISC和CISC的区别 1.1 RISC : 精简指令集 使用精简指令集的架构:ARM架构 RISC-V架构 PowerPC架构 MIPS架构ARM架构 :目前使用最广泛的架构,ARM面向的低端消费类市场RISC-V架构 :第五代,精简指令集的架构&#xff…

这样定义通用人工智能

🍿*★,*:.☆欢迎您/$:*.★* 🍿 正文 人类解决问题的途径,大体可以分为两种。一种是事实推理,另一种是事实验证。 为什么只是两种分类,因为根据和环境的交互与否。 事实推理解释为当遇到事件发生的时候,思考的过程。可以使用概率模型,或者更复杂的模型(目前没…

Out of Vocabulary处理方法

Out of Vocabulary 我们在NLP任务中一般都会有一个词表,这个词表一般可以使用一些大牛论文中的词表或者一些大公司的词表,或者是从自己的数据集中提取的词。但是无论当后续的训练还是预测,总有可能会出现并不包含在词表中的词,这…

(教程)如何在BERT模型中添加自己的词汇(pytorch版)

来源:投稿 作者:皮皮雷 编辑:学姐 参考文章: NLP | How to add a domain-specific vocabulary (new tokens) to a subword tokenizer already trained like BERT WordPiece | by Pierre Guillou | Medium https://medium.com/pi…

ROS2机器人编程简述humble-第三章-BUMP AND GO IN C++ .3

简述本章项目,参考如下:ROS2机器人编程简述humble-第三章-PERCEPTION AND ACTUATION MODELS .1流程图绘制,参考如下:ROS2机器人编程简述humble-第三章-COMPUTATION GRAPH .2然后,在3.3和3.4分别用C和Python编程实现&am…

Bus Hound 工具抓取串口数据(PC端抓取USB转串口数据)

测试环境: PC端 USB转串口 链接终端板卡串口 目标:抓取通信过程中的通信数据 工具介绍:Bus Hound是是由美国perisoft公司研制的一款超级软件总线协议分析器,它是一种专用于PC机各种总线数据包监视和控制的开发工具软件&#xff0c…

通信原理简明教程 | 数字调制传输

文章目录1 二进制数字调制和解调1.1 二进制数字调制的基本原理1.2 二进制数字调制信号的特性1.3 解调方法2 二进制差分相移键控2.1 2PSK的倒π现象2.2 2DPSK调制和解调3 二进制调制系统的抗噪性能3.1 2ASK系统的抗噪声性能3.2 2FSK系统的抗噪声性能4 二进制数字调制系统性能比较…

服务器配置定时脚本 crontab + Python;centos6或centos 7或centos8 实现定时执行 python 脚本

一、crontab的安装 默认情况下,CentOS 7中已经安装有crontab,如果没有安装,可以通过yum进行安装。 yum install crontabs 二、crontab的定时语法说明 corntab中,一行代码就是一个定时任务,其语法结构可以通过这个图来理解。 字符含义如下: * 代表取值范围内的数字 /…

Linux内核驱动初探(三) 以太网卡

目录 0. 前言 1. menuconfig 2. 设备树 0. 前言 这次的网卡驱动就比较顺利,基本就是参考 4.19.x 内核以及 imx6qdl-sabrelite.dtsi、imx6qdl-sabreauto.dtsi 中的设备树,来设置以太网各项参数。 1. menuconfig 其实笔者接手的时候,网口这…

本质安全设备标准(IEC60079-11)的理解(三)

本质安全设备标准(IEC60079-11)的理解(三) 对于标准中“fault”的理解 第一,标准中对fault的定义是这样的: 3.7.2 fault any defect of any component, separation, insulation or connection between c…

C++空间命名

前言 提示:由于C是在C语言基础之上,增加了很多新的东西。 本文讲解命名空间的具体使用方法 文章目录 目录 前言 一、命名空间 二、命名空间定义 1.嵌套性 2.和并性 总结 提示:以下是本篇文章正文内容,下面案例可供参考 一…

【华为上机真题】区间交集

🎈 作者:Linux猿 🎈 简介:CSDN博客专家🏆,华为云享专家🏆,Linux、C/C、云计算、物联网、面试、刷题、算法尽管咨询我,关注我,有问题私聊! &…

CleanMyMac X真的有必要买吗?CleanMyMac2023最新版下载

CleanMyMac X是一款集所有功能于一身的先进程序卸载清理器,只需两个简单步骤就可以把系统里那些乱七八糟的无用文件统统清理掉,节省宝贵的磁盘空间。CleanMyMac为您喜爱的东西腾出空间。它不仅有着赏心悦目的UI交互页面,更有着强大的“超能力…

HTB-BountyHunter

HTB-BountyHunter信息收集开机提权信息收集 80端口的网页如下。 注意有一个db.php,虽然现在打不开,估计后面会用上。 还有resources里面的readme文件。 完成了tracker提交编写和developer组权限。没有完成portal的test用户禁用、选择哈希加密的密码以…

Webshell(网页后门)

数据来源 本文仅用于信息安全的学习,请遵守相关法律法规,严禁用于非法途径。若观众因此作出任何危害网络安全的行为,后果自负,与本人无关。 一、Webshell简介 01 什么是 Webshell webshell是以 asp、php、jsp或者cgi等网页文…

【数据结构与算法】第十九篇:回溯,剪枝,N皇后问题

知识导航一、回溯思想概述二、八皇后问题引入八皇后问题的解决思路(1)思路一:暴力出奇迹(2)思路二:根据题意减小暴力程度(3)思路三:回溯法剪枝三、四皇后问题八皇后问题四、N皇后的实现1.实现方法一:利用数…

程序员的自我修养第七章——动态链接 (上)

继续更新《程序员的自我修养》这个系列,主要是夏天没把它看完,补上遗憾。本篇来自书中第七章。 再说动态链接前,我们先阐明为什么要动态链接: 动态链接的产生来自静态链接的局限性。随着静态链接的发展,其限制也越来越…