推荐系统之推荐缓存服务

news2025/1/10 20:49:20

5.6 推荐缓存服务

学习目标

  • 目标
  • 应用

5.6.1 待推荐结果的redis缓存

  • 目的:对待推荐结果进行二级缓存,多级缓存减少数据库读取压力
  • 步骤:
    • 1、获取redis结果,进行判断
      • 如果redis有,读取需要推荐的文章数量放回,并删除这些文章,并且放入推荐历史推荐结果中
      • 如果redis当中不存在,则从wait_recommend中读取
        • 如果wait_recommend中也没有,直接返回
        • 如果wait_recommend有,从wait_recommend取出所有结果,定一个数量(如100篇)存入redis,剩下放回wait_recommend,不够100,全部放入redis,然后清空wait_recommend
        • 从redis中拿出要推荐的文章结果,然后放入历史推荐结果中

增加一个缓存数据库

# 缓存在8号当中
cache_client = redis.StrictRedis(host=DefaultConfig.REDIS_HOST,
                                 port=DefaultConfig.REDIS_PORT,
                                 db=8,
                                 decode_responses=True)

1、redis 8 号数据库读取

# 1、直接去redis拿取对应的键,如果为空
    # 构造读redis的键
    key = 'reco:{}:{}:art'.format(temp.user_id, temp.channel_id)
    # 读取,删除,返回结果
    pl = cache_client.pipeline()

    # 拿督redis数据
    res = cache_client.zrevrange(key, 0, temp.article_num - 1)
    if res:
        # 手动删除读取出来的缓存结果
        pl.zrem(key, *res)

2、redis没有数据,进行wait_recommend读取,放入redis中

else:

        # 如果没有redis缓存数据
        # 删除键
        cache_client.delete(key)
        try:
            # 1、# - 首先从wait_recommend中读取,没有直接返回空,进去正常召回流程
            wait_cache = eval(hbu.get_table_row('wait_recommend',
                                                'reco:{}'.format(temp.user_id).encode(),
                                                'channel:{}'.format(temp.channel_id).encode()))
        except Exception as e:
            logger.warning("{} WARN read user_id:{} wait_recommend exception:{} not exist".format(
                datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, e))
            wait_cache = []

        if not wait_cache:
            return wait_cache
        # 2、- 首先从wait_recommend中读取,有数据,读取出来放入自定义100个文章到redis当中,如有剩余放回到wait_recommend。小于自定义100,全部放入redis,wait_recommend直接清空
        # - 直接取出被推荐的结果,记录一下到历史记录当中
        # 假设是放入到redis当中为100个数据

        if len(wait_cache) > 100:
            logger.info(
                "{} INFO reduce user_id:{} channel:{} wait_recommend data".format(
                    datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))

            cache_redis = wait_cache[:100]
            # 前100个数据放入redis
            pl.zadd(key, dict(zip(cache_redis, range(len(cache_redis)))))

            # 100个后面的数据,在放回wait_recommend
            hbu.get_table_put('wait_recommend',
                              'reco:{}'.format(temp.user_id).encode(),
                              'channel:{}'.format(temp.channel_id).encode(),
                              str(wait_cache[100:]).encode())

        else:
            logger.info(
                "{} INFO delete user_id:{} channel:{} wait_recommend data".format(
                    datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))
            # 清空wait_recommend数据
            hbu.get_table_put('wait_recommend',
                              'reco:{}'.format(temp.user_id).encode(),
                              'channel:{}'.format(temp.channel_id).encode(),
                               str([]).encode())

            # 所有不足100个数据,放入redis
            pl.zadd(key, dict(zip(wait_cache, range(len(wait_cache)))))

        res = cache_client.zrange(key, 0, temp.article_num - 1)

3、推荐出去的结果放入历史结果

 # redis初始有无数据
    pl.execute()

    # 进行类型转换
    res = list(map(int, res))

    logger.info("{} INFO store user_id:{} channel:{} cache history data".format(
        datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))
    # 进行推荐出去,要做放入历史推荐结果当中
    hbu.get_table_put('history_recommend',
                      'reco:his:{}'.format(temp.user_id).encode(),
                      'channel:{}'.format(temp.channel_id).encode(),
                      str(res).encode(),
                      timestamp=temp.time_stamp
                      )
    return res

完整逻辑代码:

from server import cache_client
import logging
from datetime import datetime

logger = logging.getLogger('recommend')


def get_reco_from_cache(temp, hbu):
    """读取数据库缓存
    redis: 存储在 8 号
    """
    # 1、直接去redis拿取对应的键,如果为空
    # 构造读redis的键
    key = 'reco:{}:{}:art'.format(temp.user_id, temp.channel_id)
    # 读取,删除,返回结果
    pl = cache_client.pipeline()

    # 拿督redis数据
    res = cache_client.zrevrange(key, 0, temp.article_num - 1)
    if res:
        # 手动删除读取出来的缓存结果
        pl.zrem(key, *res)
    else:

        # 如果没有redis缓存数据
        # 删除键
        cache_client.delete(key)
        try:
            # 1、# - 首先从wait_recommend中读取,没有直接返回空,进去正常召回流程
            wait_cache = eval(hbu.get_table_row('wait_recommend',
                                                'reco:{}'.format(temp.user_id).encode(),
                                                'channel:{}'.format(temp.channel_id).encode()))
        except Exception as e:
            logger.warning("{} WARN read user_id:{} wait_recommend exception:{} not exist".format(
                datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, e))
            wait_cache = []

        if not wait_cache:
            return wait_cache
        # 2、- 首先从wait_recommend中读取,有数据,读取出来放入自定义100个文章到redis当中,如有剩余放回到wait_recommend。小于自定义100,全部放入redis,wait_recommend直接清空
        # - 直接取出被推荐的结果,记录一下到历史记录当中
        # 假设是放入到redis当中为100个数据

        if len(wait_cache) > 100:
            logger.info(
                "{} INFO reduce user_id:{} channel:{} wait_recommend data".format(
                    datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))

            cache_redis = wait_cache[:100]
            # 前100个数据放入redis
            pl.zadd(key, dict(zip(cache_redis, range(len(cache_redis)))))

            # 100个后面的数据,在放回wait_recommend
            hbu.get_table_put('wait_recommend',
                              'reco:{}'.format(temp.user_id).encode(),
                              'channel:{}'.format(temp.channel_id).encode(),
                              str(wait_cache[100:]).encode())

        else:
            logger.info(
                "{} INFO delete user_id:{} channel:{} wait_recommend data".format(
                    datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))
            # 清空wait_recommend数据
            hbu.get_table_put('wait_recommend',
                              'reco:{}'.format(temp.user_id).encode(),
                              'channel:{}'.format(temp.channel_id).encode(),
                               str([]).encode())

            # 所有不足100个数据,放入redis
            pl.zadd(key, dict(zip(wait_cache, range(len(wait_cache)))))

        res = cache_client.zrange(key, 0, temp.article_num - 1)

    # redis初始有无数据
    pl.execute()

    # 进行类型转换
    res = list(map(int, res))

    logger.info("{} INFO store user_id:{} channel:{} cache history data".format(
        datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))
    # 进行推荐出去,要做放入历史推荐结果当中
    hbu.get_table_put('history_recommend',
                      'reco:his:{}'.format(temp.user_id).encode(),
                      'channel:{}'.format(temp.channel_id).encode(),
                      str(res).encode(),
                      timestamp=temp.time_stamp
                      )
    return res

5.6.2 在推荐中心加入缓存逻辑

  • from server import redis_cache
# 1、获取缓存
res = redis_cache.get_reco_from_cache(temp, self.hbu)

# 如果没有,然后走一遍算法推荐 召回+排序,同时写入到hbase待推荐结果列表
 if not res:
     logger.info("{} INFO get user_id:{} channel:{} recall/sort data".
                 format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))

     res = self.user_reco_list(temp)

5.7 排序模型在线预测

学习目标

  • 目标
  • 应用
    • 应用spark完成

5.7.1排序模型服务

 

 

  • 提供多种不同模型排序逻辑
    • SPARK LR/Tensorflow

5.7.2 排序模型在线预测

  • 召回之后的文章结果进行排序
  • 步骤:
    • 1、读取用户特征中心特征
    • 2、读取文章特征中心特征、合并用户文章特征构造预测样本
    • 4、预测并进行排序是筛选
import os
import sys
# 如果当前代码文件运行测试需要加入修改路径,避免出现后导包问题
BASE_DIR = os.path.dirname(os.getcwd())
sys.path.insert(0, os.path.join(BASE_DIR))

PYSPARK_PYTHON = "/miniconda2/envs/reco_sys/bin/python"
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
from pyspark import SparkConf
from pyspark.sql import SparkSession
from server.utils import HBaseUtils
from server import pool
from pyspark.ml.linalg import DenseVector
from pyspark.ml.classification import LogisticRegressionModel
import pandas as pd


conf = SparkConf()
config = (
    ("spark.app.name", "sort"),
    ("spark.executor.memory", "2g"),    # 设置该app启动时占用的内存用量,默认1g
    ("spark.master", 'yarn'),
    ("spark.executor.cores", "2"),   # 设置spark executor使用的CPU核心数
)

conf.setAll(config)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

1、读取用户特征中心特征

hbu = HBaseUtils(pool)
# 排序
# 1、读取用户特征中心特征
try:
    user_feature = eval(hbu.get_table_row('ctr_feature_user',
                                               '{}'.format(1115629498121846784).encode(),
                                               'channel:{}'.format(18).encode()))
except Exception as e:
    user_feature = []

2、读取文章特征中心特征,并与用户特征进行合并,构造要推荐文章的样本

  • 合并特征向量(channel_id1个+文章向量100个+用户特征权重10个+文章关键词权重) = 121个特征

 

 

if user_feature:
    # 2、读取文章特征中心特征
    result = []

    for article_id in [17749, 17748, 44371, 44368]:
        try:
            article_feature = eval(hbu.get_table_row('ctr_feature_article',
                                                     '{}'.format(article_id).encode(),
                                                     'article:{}'.format(article_id).encode()))
        except Exception as e:
            article_feature = [0.0] * 111
        f = []
        # 第一个channel_id
        f.extend([article_feature[0]])
        # 第二个article_vector
        f.extend(article_feature[11:])
        # 第三个用户权重特征
        f.extend(user_feature)
        # 第四个文章权重特征
        f.extend(article_feature[1:11])
        vector = DenseVector(f)

        result.append([1115629498121846784, article_id, vector])

文章特征中心存的顺序

+----------+----------+--------------------+--------------------+--------------------+
|article_id|channel_id|             weights|       articlevector|            features|
+----------+----------+--------------------+--------------------+--------------------+
|        26|        17|[0.19827163395829...|[0.02069368539384...|[17.0,0.198271633...|
|        29|        17|[0.26031398249056...|[-0.1446092289546...|[17.0,0.260313982...|

最终结果:

 

 

3、处理样本格式,模型加载预测

# 4、预测并进行排序是筛选
df = pd.DataFrame(result, columns=["user_id", "article_id", "features"])
test = spark.createDataFrame(df)

# 加载逻辑回归模型
model = LogisticRegressionModel.load("hdfs://hadoop-master:9000/headlines/models/LR.obj")
predict = model.transform(test)

预测结果进行筛选

def vector_to_double(row):
    return float(row.article_id), float(row.probability[1])
res = predict.select(['article_id', 'probability']).rdd.map(vector_to_double).toDF(['article_id', 'probability']).sort('probability', ascending=False)

获取排序之后前N个文章

article_list = [i.article_id for i in res.collect()]
if len(article_list) > 100:
    article_list = article_list[:100]
reco_set = list(map(int, article_list))

5.7.3 添加实时排序的模型预测

  • 添加spark配置

grpc启动灰将spark相关信息初始化

from pyspark import SparkConf
from pyspark.sql import SparkSession
# spark配置
conf = SparkConf()
conf.setAll(DefaultConfig.SPARK_GRPC_CONFIG)

SORT_SPARK = SparkSession.builder.config(conf=conf).getOrCreate()



# SPARK grpc配置
SPARK_GRPC_CONFIG = (
  ("spark.app.name", "grpcSort"),  # 设置启动的spark的app名称,没有提供,将随机产生一个名称
  ("spark.master", "yarn"),
  ("spark.executor.instances", 4)
)
  • 添加模型服务预测函数
from server import SORT_SPARK
from pyspark.ml.linalg import DenseVector
from pyspark.ml.classification import LogisticRegressionModel
import pandas as pd
import numpy as np
from datetime import datetime
import logging

logger = logging.getLogger("recommend")

预测函数

def lr_sort_service(reco_set, temp, hbu):
    """
    排序返回推荐文章
    :param reco_set:召回合并过滤后的结果
    :param temp: 参数
    :param hbu: Hbase工具
    :return:
    """
    # 排序
    # 1、读取用户特征中心特征
    try:
        user_feature = eval(hbu.get_table_row('ctr_feature_user',
                                              '{}'.format(temp.user_id).encode(),
                                              'channel:{}'.format(temp.channel_id).encode()))
        logger.info("{} INFO get user user_id:{} channel:{} profile data".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S'), temp.user_id, temp.channel_id))
    except Exception as e:
        user_feature = []

    if user_feature:
        # 2、读取文章特征中心特征
        result = []

        for article_id in reco_set:
            try:
                article_feature = eval(hbu.get_table_row('ctr_feature_article',
                                                         '{}'.format(article_id).encode(),
                                                         'article:{}'.format(article_id).encode()))
            except Exception as e:

                article_feature = [0.0] * 111
            f = []
            # 第一个channel_id
            f.extend([article_feature[0]])
            # 第二个article_vector
            f.extend(article_feature[11:])
            # 第三个用户权重特征
            f.extend(user_feature)
            # 第四个文章权重特征
            f.extend(article_feature[1:11])
            vector = DenseVector(f)
            result.append([temp.user_id, article_id, vector])

        # 4、预测并进行排序是筛选
        df = pd.DataFrame(result, columns=["user_id", "article_id", "features"])
        test = SORT_SPARK.createDataFrame(df)

        # 加载逻辑回归模型
        model = LogisticRegressionModel.load("hdfs://hadoop-master:9000/headlines/models/LR.obj")
        predict = model.transform(test)

        def vector_to_double(row):
            return float(row.article_id), float(row.probability[1])

        res = predict.select(['article_id', 'probability']).rdd.map(vector_to_double).toDF(
            ['article_id', 'probability']).sort('probability', ascending=False)
        article_list = [i.article_id for i in res.collect()]
        logger.info("{} INFO sorting user_id:{} recommend article".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                                                                          temp.user_id))
        # 排序后,只将排名在前100个文章ID返回给用户推荐
        if len(article_list) > 100:
            article_list = article_list[:100]
        reco_set = list(map(int, article_list))

    return reco_set

推荐中心加入排序

# 配置default
RAParam = param(
    COMBINE={
        'Algo-1': (1, [100, 101, 102, 103, 104], [200]),  # 首页推荐,所有召回结果读取+LR排序
        'Algo-2': (2, [100, 101, 102, 103, 104], [200])  # 首页推荐,所有召回结果读取 排序
    },

# reco_center
from server.sort_service import lr_sort_service
sort_dict = {
    'LR': lr_sort_service,
}

# 排序代码逻辑
_sort_num = RAParam.COMBINE[temp.algo][2][0]
reco_set = sort_dict[RAParam.SORT[_sort_num]](reco_set, temp, self.hbu)

5.7.4 supervisor添加grpc实时推荐程序

[program:online]
environment=JAVA_HOME=/root/bigdata/jdk,SPARK_HOME=/root/bigdata/spark,HADOOP_HOME=/root/bigdata/hadoop,PYSPARK_PYTHON=/miniconda2/envs/reco_sys/bin/python ,PYSPARK_DRIVER_PYTHON=/miniconda2/envs/reco_sys/bin/python
command=/miniconda2/envs/reco_sys/bin/python /root/toutiao_project/reco_sys/abtest/routing.py
directory=/root/toutiao_project/reco_sys/abtest
user=root
autorestart=true
redirect_stderr=true
stdout_logfile=/root/logs/recommendsuper.log
loglevel=info
stopsignal=KILL
stopasgroup=true
killasgroup=true

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/186282.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

(深度学习快速入门)第四章第一节:基础图像处理知识

文章目录一:位图和矢量图二:图像分辨率三:颜色模式(1)RGB(2)HSB(2)灰度图四:通道五:亮度、对比度和饱和度六:图像平滑和锐化&#xff…

D3股权穿透图

前言:最近做了一个项目,主要就是实现各种类似企查查的各种图谱,欢迎交流。后期将完成的谱图全部链接上,目前已大致实现了: 【企业关系图谱】、【企业构成图谱】、【股权穿透图】、【股权结构图】、【关联方认定图】 准…

【蓝桥杯基础题】2018年省赛—日志统计

👑专栏内容:蓝桥杯刷题⛪个人主页:子夜的星的主页💕座右铭:前路未远,步履不停 目录一、题目描述1.问题描述2.输入格式3.输出格式4.一个例子二、题目分析1、暴力法2、双指针三、代码汇总1、暴力代码汇总2、双…

【Mysql第一期 数据库概述】

文章目录1. 为什么要使用数据库2. 数据库与数据库管理系统2.1 数据库的相关概念2.2 数据库特点2.3SQL优点3.常见的数据库介绍1.Oracle2.SQL Server3.MySQL4.Access5.DB26.PostgreSQL7.SQLite8.informix4. MySQL介绍4.1Mysql重大历史事件4.2 关于MySQL 8.04.3 Why choose MySQL?…

linux内核读文件代码分析

linux下“一切皆文件”,所有设备都可以被抽象成文件,用户态可以通过open、read、write、llseek等api操作一个文件,通过系统调用进入内核态,最终访问到pagecache/磁盘上的数据,然后返回给用户态。 kernel version:v6.2-rc4 社区master主干 用户态应用程序调用read接口,通…

【转载】车载传感器与云端数据交换标准SensorIS的理解与使用

原文 https://zhuanlan.zhihu.com/p/386277784 1、什么是SensorIS?SensorIS全称是Sensor Interface Specification,翻译为中文就是传感器接口规范,是由来自全球汽车行业的主机厂、地图和数据提供商、传感器制造商和电信运营商共同组成的开放团体发布的一…

JavaEE day10 初识SpringMVC

JSON简介 JSON :JavaScript Object Notation JS对象表示法 是轻量级的文本数据交换格式,但是JSON仍然独立于语言和平台。其解析器和库支持许多不同的编程语言。目前非常多的动态编程语言(java,PHP)都支持JSON。JSON…

禅道好用吗?优缺点及类似10大项目管理系统介绍

类似禅道的十大项目管理软件:1、一站式研发项目管理软件PingCode;2、通用型项目协作工具Worktile;3、开源项目管理软件Redmine;4、免费项目管理软件Trello;5、无代码项目管理软件Monday;6、IT项目追踪管理工…

面试宝典-数据库基础

数据库基础前言一、数据库1.1 sql练习题1.2 sql语句执行顺序1.3 sql语句编写前言 本文主要记录B站视频视频链接的内容,做到知识梳理和总结的作用,项目git地址。 一、数据库 1.1 sql练习题 user表数据: idusername1张三2李四3王五4小刘 user_role表数…

CrackQL:一款功能强大的图形化密码爆破和模糊测试工具

关于CrackQL CrackQL是一款功能强大的图形化密码爆破和模糊测试工具,在该工具的帮助下,广大研究人员可以针对密码安全和应用程序安全进行渗透测试。 除此之外,CrackQL同时也是一款通用的GraphQL渗透测试工具,它可以控制速率限制…

垃圾分类智能分析系统 yolov7

垃圾分类智能分析系统应用pythonyolov7网络模型深度学习识别技术,自动识别违规投放行为并现场进行语音提示实时预警。如垃圾满溢抓拍预警、人脸识别、工服识别、厨余垃圾混投未破袋识别预警、垃圾落地识别预警、人来扔垃圾语音提醒等。我们选择当下YOLO最新的卷积神…

数组去重的七种方法

数组去重的七种方法1. 双重for循环2. forindexOf3.es6 set4.filter5.includes6.创建一个新的object7.new Map()1. 双重for循环 第1种是定义一个新的空数组,再执行嵌套双循环,监测空数组中如果没有的元素,push进空数组中。这个方法考察了conti…

AcWing - 寒假每日一题2023(DAY 16——DAY 20)

文章目录一、AcWing 4455. 出行计划(简单)1. 实现思路2. 实现代码二、AcWing 4510. 寻宝!大冒险!(简单)1. 实现思路2. 实现代码三、AcWing 3422. 左孩子右兄弟(中等)1. 实现思路2. 实…

【MySQL】过年没有回老家,在出租屋里整理了一些思维导图

Xmind导图知识点Mysql知识点SQL知识点Mybatis知识点面试题分享MySQL部分Mybatis部分Mysql知识点 通过下面的图片可以看出,MySQL基础语法分为四部分:连接数据库,对数据库的操作,对表中的数据操作,对表操作等等。 SQL…

python exe程序注册为window系统服务

1、使用pyinstaller将py打包成exe 1、安装 pip install pyinstaller2、打包成exe可执行文件 pyinstaller -F packTest.py #packTest.py为待打包的py文件打包成功后会在同级目录中生成两个文件夹和一个文件,分别为dist和build文件夹,以及一份与.py文件同…

Java——最大子数组和

题目链接 leetcode在线oj题——最大子数组和 题目描述 给你一个整数数组 nums ,请你找出一个具有最大和的连续子数组(子数组最少包含一个元素),返回其最大和。 子数组 是数组中的一个连续部分。 题目示例 输入:…

Vulnhub DC-4靶机渗透

环境准备DC-4靶机 ip:???????kali攻击机 ip:192.168.153.128一、信息收集kali攻击机中,使用 arp-scan -l 扫描c段(-l为扫描c段)确定靶…

自动驾驶——智能配电

一、汽车配电 汽车配电(Power Distrubition Unit,PDU)分为低压配电与高压配电,即低压PDU与高压PDU。 二、传统控制方式——PCB式电器盒 传统配电盒(机电器件): (1)继…

为什么要做黑盒测试?黑盒测试有什么作用?

对于软件测试的从业者来说,黑盒测试是十分重要的测试方式,它可以弥补白盒测试检查不到的部分。可能刚刚入门的测试小白,对于为什么要做黑盒测试?黑盒测试有什么作用?仍然抱有很大的疑问。下面小编就来从黑盒测试的概念…

QT入门Buttons之QPushButton

目录 一、界面布局介绍 1、布局器中的位置及使用 2、控件的界面属性 3、常用基本属性介绍 3.1控件名称 3.2控件大小属性 3.3按钮上的文字设置 3.4设置按钮的样式 二、属性功能介绍 1、常用方法介绍 2、基本信号介绍 三、Demo展示 一、界面布局介绍 1、布局器中的位…