推荐系统之召回集读取服务

news2024/10/5 12:49:50

5.4 召回集读取服务

学习目标

  • 目标
  • 应用

5.4.1 召回集读取服务

 

 

  • 添加一个server的目录
    • 会添加推荐中心,召回读取服务,模型排序服务,缓存服务
    • 这里先添加一个召回集的结果读取服务recall_service.py
    • utils.py中装有自己封装的hbase数据库读取存储工具

5.4.1 Hbase读取存储等工具类封装

为什么封装?

在写happybase代码的时候会有过多的重复代码,将这些封装成简便的工具,减少代码冗余

  • 包含方法
    • get_table_row(self, table_name, key_format, column_format=None, include_timestamp=False):
      • 获取具体表中的键、列族中的行数据
    • get_table_cells(self, table_name, key_format, column_format=None, timestamp=None, include_timestamp=False):
      • 获取Hbase中多个版本数据
    • get_table_put(self, table_name, key_format, column_format, data, timestamp=None):
      • 存储数据到Hbase当中
    • get_table_delete(self, table_name, key_format, column_format):
      • 删除Hbase中的数据
class HBaseUtils(object):
    """HBase数据库读取工具类
    """
    def __init__(self, connection):
        self.pool = connection

    def get_table_row(self, table_name, key_format, column_format=None, include_timestamp=False):
        """
        获取HBase数据库中的行记录数据
        :param table_name: 表名
        :param key_format: key格式字符串, 如表的'user:reco:1', 类型为bytes
        :param column_format: column, 列族字符串,如表的 column 'als:18',类型为bytes
        :param include_timestamp: 是否包含时间戳
        :return: 返回数据库结果data
        """
        if not isinstance(key_format, bytes):
            raise KeyError("key_format or column type error")

        if not isinstance(table_name, str):
            raise KeyError("table_name should str type")

        with self.pool.connection() as conn:
            table = conn.table(table_name)

            if column_format:
                data = table.row(row=key_format, columns=[column_format], include_timestamp=include_timestamp)
            else:
                data = table.row(row=key_format)
            conn.close()

        if column_format:
            return data[column_format]
        else:
            # {b'als:5': (b'[141440]', 1555519429582)}
            # {b'als:5': '[141440]'}
            return data

    def get_table_cells(self, table_name, key_format, column_format=None, timestamp=None, include_timestamp=False):
        """
        获取HBase数据库中多个版本数据
        :param table_name: 表名
        :param key_format: key格式字符串, 如表的'user:reco:1', 类型为bytes
        :param column_format: column, 列族字符串,如表的 column 'als:18',类型为bytes
        :param timestamp: 指定小于该时间戳的数据
        :param include_timestamp: 是否包含时间戳
        :return: 返回数据库结果data
        """
        if not isinstance(key_format, bytes) or not isinstance(column_format, bytes):
            raise KeyError("key_format or column type error")

        if not isinstance(table_name, str):
            raise KeyError("table_name should str type")

        with self.pool.connection() as conn:
            table = conn.table(table_name)

            data = table.cells(row=key_format, column=column_format, timestamp=timestamp,
                               include_timestamp=include_timestamp)

            conn.close()
        # [(,), ()]
        return data

    def get_table_put(self, table_name, key_format, column_format, data, timestamp=None):
        """

        :param table_name: 表名
        :param key_format: key格式字符串, 如表的'user:reco:1', 类型为bytes
        :param column_format: column, 列族字符串,如表的 column 'als:18',类型为bytes
        :param data: 插入的数据
        :param timestamp: 指定拆入数据的时间戳
        :return: None
        """
        if not isinstance(key_format, bytes) or not isinstance(column_format, bytes) or not isinstance(data, bytes):
            raise KeyError("key_format or column or data type error")

        if not isinstance(table_name, str):
            raise KeyError("table_name should str type")

        with self.pool.connection() as conn:
            table = conn.table(table_name)

            table.put(key_format, {column_format: data}, timestamp=timestamp)

            conn.close()
        return None

    def get_table_delete(self, table_name, key_format, column_format):
        """
        删除列族中的内容
        :param table_name: 表名称
        :param key_format: key
        :param column_format: 列格式
        :return:
        """
        if not isinstance(key_format, bytes) or not isinstance(column_format, bytes):
            raise KeyError("key_format or column type error")

        if not isinstance(table_name, str):
            raise KeyError("table_name should str type")
        with self.pool.connection() as conn:
            table = conn.table(table_name)
            table.delete(row=key_format, columns=[column_format])
            conn.close()
        return None

5.4.2 多路召回结果读取

  • 目的:读取离线和在线存储的召回结果

    • hbase的存储:cb_recall, als, content, online
  • 步骤:

    • 1、初始化redis,hbase相关工具
    • 2、在线画像召回,离线画像召回,离线协同召回数据的读取
    • 3、redis新文章和热门文章结果读取
    • 4、相似文章读取接口

初始化redis,hbase相关工具

import os
import sys
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(BASE_DIR))

from server import redis_client
from server import pool
import logging
from datetime import datetime
from server.utils import HBaseUtils

logger = logging.getLogger('recommend')


class ReadRecall(object):
    """读取召回集的结果
    """
    def __init__(self):
        self.client = redis_client
        self.hbu = HBaseUtils(pool)

并且添加了获取结果打印日志设置

# 实施推荐日志
# 离线处理更新打印日志
trace_file_handler = logging.FileHandler(
  os.path.join(logging_file_dir, 'recommend.log')
)
trace_file_handler.setFormatter(logging.Formatter('%(message)s'))
log_trace = logging.getLogger('recommend')
log_trace.addHandler(trace_file_handler)
log_trace.setLevel(logging.INFO)

在init文件中添加相关初始化数据库变量

import redis
import happybase
from setting.default import DefaultConfig
from pyspark import SparkConf
from pyspark.sql import SparkSession


pool = happybase.ConnectionPool(size=10, host="hadoop-master", port=9090)

redis_client = redis.StrictRedis(host=DefaultConfig.REDIS_HOST,
                                 port=DefaultConfig.REDIS_PORT,
                                 db=10,
                                 decode_responses=True)

# 缓存在8号当中
cache_client = redis.StrictRedis(host=DefaultConfig.REDIS_HOST,
                                 port=DefaultConfig.REDIS_PORT,
                                 db=8,
                                 decode_responses=True)

2、在线画像召回,离线画像召回,离线协同召回数据的读取

  • 读取用户的指定列族的召回数据,并且读取之后要删除原来的推荐召回结果'cb_recall'
    def read_hbase_recall_data(self, table_name, key_format, column_format):
        """
        读取cb_recall当中的推荐数据
        读取的时候可以选择列族进行读取als, online, content

        :return:
        """
        recall_list = []
        try:
            data = self.hbu.get_table_cells(table_name, key_format, column_format)

            # data是多个版本的推荐结果[[],[],[],]
            for _ in data:
                recall_list = list(set(recall_list).union(set(eval(_))))

            # self.hbu.get_table_delete(table_name, key_format, column_format)
        except Exception as e:
            logger.warning("{} WARN read {} recall exception:{}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                                                                     table_name, e))
        return recall_list

测试:

if __name__ == '__main__':
    rr = ReadRecall()
    # 召回结果的读取封装
    # print(rr.read_hbase_recall_data('cb_recall', b'recall:user:1114864874141253632', b'online:18'))

3、redis新文章和热门文章结果读取

    def read_redis_new_article(self, channel_id):
        """
        读取新闻章召回结果
        :param channel_id: 提供频道
        :return:
        """
        logger.warning("{} WARN read channel {} redis new article".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                                                                 channel_id))
        _key = "ch:{}:new".format(channel_id)
        try:
            res = self.client.zrevrange(_key, 0, -1)
        except Exception as e:
            logger.warning("{} WARN read new article exception:{}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e))
            res = []

        return list(map(int, res))

热门文章读取:热门文章记录了很多,可以选取前K个

    def read_redis_hot_article(self, channel_id):
        """
        读取新闻章召回结果
        :param channel_id: 提供频道
        :return:
        """
        logger.warning("{} WARN read channel {} redis hot article".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), channel_id))
        _key = "ch:{}:hot".format(channel_id)
        try:
            res = self.client.zrevrange(_key, 0, -1)

        except Exception as e:
            logger.warning("{} WARN read new article exception:{}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e))
            res = []

        # 由于每个频道的热门文章有很多,因为保留文章点击次数
        res = list(map(int, res))
        if len(res) > self.hot_num:
            res = res[:self.hot_num]
        return res

测试:

print(rr.read_redis_new_article(18))
print(rr.read_redis_hot_article(18))

4、相似文章读取接口

最后相似文章读取接口代码

  • 会有接口获取固定的文章数量(用在黑马头条APP中的猜你喜欢接口)
    def read_hbase_article_similar(self, table_name, key_format, article_num):
        """获取文章相似结果
        :param article_id: 文章id
        :param article_num: 文章数量
        :return:
        """
        # 第一种表结构方式测试:
        # create 'article_similar', 'similar'
        # put 'article_similar', '1', 'similar:1', 0.2
        # put 'article_similar', '1', 'similar:2', 0.34
        try:
            _dic = self.hbu.get_table_row(table_name, key_format)

            res = []
            _srt = sorted(_dic.items(), key=lambda obj: obj[1], reverse=True)
            if len(_srt) > article_num:
                _srt = _srt[:article_num]
            for _ in _srt:
                res.append(int(_[0].decode().split(':')[1]))
        except Exception as e:
            logger.error(
                "{} ERROR read similar article exception: {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e))
            res = []
        return res

完整代码:

import os
import sys
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(BASE_DIR))

from server import redis_client
from server import pool
import logging
from datetime import datetime
from abtest.utils import HBaseUtils

logger = logging.getLogger('recommend')


class ReadRecall(object):
    """读取召回集的结果
    """
    def __init__(self):
        self.client = redis_client
        self.hbu = HBaseUtils(pool)

    def read_hbase_recall_data(self, table_name, key_format, column_format):
        """获取指定用户的对应频道的召回结果,在线画像召回,离线画像召回,离线协同召回
        :return:
        """
        # 获取family对应的值
        # 数据库中的键都是bytes类型,所以需要进行编码相加
        # 读取召回结果多个版本合并
        recall_list = []
        try:

            data = self.hbu.get_table_cells(table_name, key_format, column_format)
            for _ in data:
                recall_list = list(set(recall_list).union(set(eval(_))))

            # 读取所有这个用户的在线推荐的版本,清空该频道的数据
            # self.hbu.get_table_delete(table_name, key_format, column_format)
        except Exception as e:
            logger.warning(
                "{} WARN read recall data exception:{}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e))
        return recall_list

    def read_redis_new_data(self, channel_id):
        """获取redis新文章结果
        :param channel_id:
        :return:
        """
        # format结果
        logger.info("{} INFO read channel:{} new recommend data".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), channel_id))
        _key = "ch:{}:new".format(channel_id)
        try:
            res = self.client.zrevrange(_key, 0, -1)
        except redis.exceptions.ResponseError as e:
            logger.warning("{} WARN read new article exception:{}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e))
            res = []
        return list(map(int, res))

    def read_redis_hot_data(self, channel_id):
        """获取redis热门文章结果
        :param channel_id:
        :return:
        """
        # format结果
        logger.info("{} INFO read channel:{} hot recommend data".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), channel_id))
        _key = "ch:{}:hot".format(channel_id)
        try:
            _res = self.client.zrevrange(_key, 0, -1)
        except redis.exceptions.ResponseError as e:
            logger.warning("{} WARN read hot article exception:{}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e))
            _res = []
        # 每次返回前50热门文章
        res = list(map(int, _res))
        if len(res) > 50:
            res = res[:50]
        return res

    def read_hbase_article_similar(self, table_name, key_format, article_num):
        """获取文章相似结果
        :param article_id: 文章id
        :param article_num: 文章数量
        :return:
        """
        # 第一种表结构方式测试:
        # create 'article_similar', 'similar'
        # put 'article_similar', '1', 'similar:1', 0.2
        # put 'article_similar', '1', 'similar:2', 0.34
        try:
            _dic = self.hbu.get_table_row(table_name, key_format)

            res = []
            _srt = sorted(_dic.items(), key=lambda obj: obj[1], reverse=True)
            if len(_srt) > article_num:
                _srt = _srt[:article_num]
            for _ in _srt:
                res.append(int(_[0].decode().split(':')[1]))
        except Exception as e:
            logger.error("{} ERROR read similar article exception: {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e))
            res = []
        return res


if __name__ == '__main__':

    rr = ReadRecall()
    print(rr.read_hbase_article_similar('article_similar', b'13342', 10))
    print(rr.read_hbase_recall_data('cb_recall', b'recall:user:1115629498121846784', b'als:18'))

    # rr = ReadRecall()
    # print(rr.read_redis_new_data(18))

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/185873.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【GD32F427开发板试用】GD32的ISP进行程序烧录

本篇文章来自极术社区与兆易创新组织的GD32F427开发板评测活动,更多开发板试用活动请关注极术社区网站。作者:羁傲不驯ぃ 什么是ISP? ISP是In-System Program的缩写,即在系统编程。通常我们开发使用仿真器来下载和调试程序,当固…

linux下安装elasticsearch

一:安装JDK1:java安装地址最新版:https://www.oracle.com/java/technologies/javase-downloads.html历史版:https://www.oracle.com/java/technologies/oracle-java-archive-downloads.html2:安装java(1):安…

vue使用echarts画可视化大屏

画出来的页面效果如下: 一、布局 整体使用element-ui的layout布局,即el-rowel-col,便于自适应 二、配置跨域 首先创建个vue.config.js的文件 module.exports {lintOnSave: false,devServer: {proxy: { //配置跨域/: {target: , //填写…

Python——matplotlib绘图可视化知识点整理

无论你工作在什么项目上,IPython都是值得推荐的。利用ipython --pylab,可以进入PyLab模式,已经导入了matplotlib库与相关软件包(例如Numpy和Scipy),额可以直接使用相关库的功能。 本文作为学习过程中对matplotlib一些…

Oracle Blogs上的Flashback文章

Oracle Database und Temporal Validity Temporal Validity和Flashback的区别&#xff1f;两者通常配合使用。 延伸阅读&#xff1a;Oracle 数据库和时间有效性 时间有效性也称为 Flashback Time Travel。 显示不可见的列&#xff1a; set colinvisible on desc <table_…

PyQt5开发环境搭建 1.3 Python语法练习

第一组练习1阅读理解。输入红色框框中命令。说出文中大致意思。&#xff08;限30个字&#xff09;解&#xff1a;点击运行之后会跳到一个网站https://xkcd.com/353/练习2建立如下py文件并运行&#xff0c;贴出在Eric6下运行输出结果&#xff08;注意文件名中的bkjtest用自己的姓…

NSSCTF Round#7部分wp

Web ec_RCE 源码: <!-- A EZ RCE IN REALWORLD _ FROM CHINA.TW --> <!-- By 探姬 --> <?PHPif(!isset($_POST["action"]) && !isset($_POST["data"]))show_source(__FILE__);putenv(LANGzh_TW.utf8); $action $_POST["a…

layui遇到的一些问题

目录一、layui nav 菜单栏默认收缩二、layui 数据表格 单元格 颜色设置三、layui表格没有数据的时候&#xff0c;表头没有横向滚动条四、layui layer.open 弹窗全屏显示五、layui表格通过点击tr改变这一行的颜色六、 layer.open弹框弹出后父页面滚动问题七、LayUI下拉框中取值和…

Cuba勒索软件深度分析及防护建议

0 1. Cuba勒索软件的部署方式 Cuba勒索软件家族于2019年12月首次浮出水面。此后&#xff0c;该勒索软件家族背后的攻击者改变了策略和工具&#xff0c;成为2022年成为更普遍的威胁。该勒索软件历来通过Hancitor分发&#xff0c;通常通过恶意附件传递。 Hancitor也被称为Chani…

C++001-对比编程语言C++和python

文章目录C001-对比编程语言C和python编程语言发展史计算机 ENIAC机器语言&#xff1a;汇编语言&#xff1a;高级语言&#xff1a;机器汇编高级语言对比C语言与汇编不同高级语言的应用场景C和python语法对比Print Hello WorldPrint Hello 10 timesCreate a procedureCheck if li…

Kotlin的5种单例模式

前言最近在学习Kotlin这门语言&#xff0c;在项目开发中&#xff0c;运用到了单例模式。因为其表达方式与Java是不同的。所以对不同单例模式的实现进行了分别探讨。主要单例模式实现如下&#xff1a;饿汉式懒汉式线程安全的懒汉式双重校验锁式静态内部类式PS:该篇文章不讨论单例…

Windows并发测试工具

Apache安装目录cmd用ab并发测试工具&#xff0c;请求10次&#xff0c;并发为5ab -n 10 -c 5 http://www.ysbm.com/api.php/task/testBingfa

安装kali linuxnmap使用(一)

安装环境 vmware17 kali linux 怎么安装可以查看这个博主的文章 这么说你需要重置root密码 sudo passwd root 或者你打出node -v但是kali linux没有nodejs,则会询问你是否需要安装。开玩笑,你可以使用sudo 指令来获取权限(第一次输入需要你的密码) nmap 这是kali linux自带…

沁恒CH32V307单片机入门(02):官方库与工程模板介绍

文章目录目的官方库工程模板使用例程总结目的 现在开发单片机大多数时候都是面向库开发的&#xff0c;这里将简单介绍下CH32V307的官方库。 在开发过程中新建项目时通常会从某些模板开始&#xff0c;模板包含了库和初始化代码等内容&#xff0c;有一定的组织好的目录结构&…

【C++】模板

模板一、非类型模板参数二、模板的特化2.1 函数模板的特化2.2 类模板的特化2.2.1 全特化2.2.2 偏特化三、模板的分离编译一、非类型模板参数 模板参数分为类型形参与非类型形参。 类型形参&#xff1a;出现在模板参数列表中&#xff0c;跟在class或者typename之类的参数类型名称…

为什么 Go 不支持 []T 转换为 []interface

在 Go 中&#xff0c;如果 interface{} 作为函数参数的话&#xff0c;是可以传任意参数的&#xff0c;然后通过类型断言来转换。 举个例子&#xff1a; package mainimport "fmt"func foo(v interface{}) {if v1, ok1 : v.(string); ok1 {fmt.Println(v1)} else if…

【测试设计】使用jenkins 插件Allure生成自动化测试报告

前言 以前做自动化测试的时候一直用的HTMLTestRunner来生成测试报告&#xff0c;后来也尝试过用Python的PyH模块自己构建测试报告&#xff0c;在后来看到了RobotFramework的测试报告&#xff0c;感觉之前用的测试报告都太简陋&#xff0c;它才是测试报告应该有的样子。也就是在…

【01Studio MaixPy AI K210】25.云训练模型文件

采集数据 根据它云训练平台的要求&#xff0c;它要求的图片格式必须是224*224的&#xff08;重点之重点&#xff09;&#xff0c;所以可以利用K210跑脚本直接采集数据。 数据采集脚本 main.py实验名称&#xff1a;照相机 说明&#xff1a;通过按键拍照并在LCD上显示&#xff08…

windows自建免费无限的开源图片识别公式转换为Latex

一、准备 python3.9.6下载 在最开始勾选添加环境变量 https://www.python.org/ftp/python/3.9.6/python-3.9.6-amd64.exe 验证&#xff0c;右键终端&#xff08;管理员&#xff09;&#xff0c;输入&#xff1a;python --version安装Anaconda https://mirrors.bfsu.edu.cn/ana…

算法拾遗二十五之暴力递归到动态规划三

算法拾遗二十五之暴力递归到动态规划三最长回文子串返回象棋从一个位置到另一个位置的方法有多少种返回咖啡机从开始到干净的最短时间最长回文子串 测试链接&#xff1a;https://leetcode.cn/problems/longest-palindromic-subsequence/ 子序列&#xff1a;是可以不连续的 子…