实时推荐业务介绍 grpc接口对接

news2025/1/13 10:11:42

5.1 实时推荐业务介绍

学习目标

  • 目标
  • 应用

5.1.1 实时推荐逻辑

  • 逻辑流程

    • 1、后端发送推荐请求,实时推荐系统拿到请求参数
      • grpc对接
    • 2、根据用户进行ABTest分流
      • ABTest实验中心,用于进行分流任务,方便测试调整不同的模型上线
    • 3、推荐中心服务
      • 根据用户在ABTest分配的算法进行召回服务和排序服务读取返回结果
    • 4、返回推荐结果和埋点参数封装
  • 实时推荐的流程

  • ABTest与推荐中心逻辑

 

5.2 grpc接口对接

学习目标

  • 目标
  • 应用

5.2.1 头条推荐接口对接

  • 请求参数:

    • feed流推荐:用户ID,频道ID,推荐文章数量,请求推荐时间戳
    • 相似文章获取:文章ID,推荐文章数量
  • 返回参数:

    • feed流推荐:曝光参数,每篇文章的所有行为参数,上一条时间戳

    • # 埋点参数参考:
      # {
      #     "param": '{"action": "exposure", "userId": 1, "articleId": [1,2,3,4],  "algorithmCombine": "c1"}',
      #     "recommends": [
      #         {"article_id": 1, "param": {"click": "{"action": "click", "userId": "1", "articleId": 1, "algorithmCombine": 'c1'}", "collect": "", "share": "","read":""}},
      #         {"article_id": 2, "param": {"click": "", "collect": "", "share": "", "read":""}},
      #         {"article_id": 3, "param": {"click": "", "collect": "", "share": "", "read":""}},
      #         {"article_id": 4, "param": {"click": "", "collect": "", "share": "", "read":""}}
      #     ]
      #     "timestamp": 1546391572
      # }
      
    • 相似文章获取:文章ID列表

 

 

5.2.2 简介

  • gRPC是由Google公司开源的高性能RPC框架。

  • gRPC支持多语言

    gRPC原生使用C、Java、Go进行了三种实现,而C语言实现的版本进行封装后又支持C++、C#、Node、ObjC、 Python、Ruby、PHP等开发语言

  • gRPC支持多平台

    支持的平台包括:Linux、Android、iOS、MacOS、Windows

  • gRPC的消息协议使用Google自家开源的Protocol Buffers协议机制(proto3) 序列化

  • gRPC的传输使用HTTP/2标准,支持双向流和连接多路复用

使用方法

  1. 使用Protocol Buffers(proto3)的IDL接口定义语言定义接口服务,编写在文本文件(以.proto为后缀名)中。
  2. 使用protobuf编译器生成服务器和客户端使用的stub代码

在gRPC中推荐使用proto3版本。

5.2.3 代码结构

Protocol Buffers版本

Protocol Buffers文档的第一行非注释行,为版本申明,不填写的话默认为版本2。

syntax = "proto3";
或者
syntax = "proto2";
  • 消息类型

Protocol Buffers使用message定义消息数据。在Protocol Buffers中使用的数据都是通过message消息数据封装基本类型数据或其他消息数据,对应Python中的类。

message SearchRequest {
  string query = 1;
  int32 page_number = 2;
  int32 result_per_page = 3;
}
  • 字段编号

消息定义中的每个字段都有唯一的编号。这些字段编号用于以消息二进制格式标识字段,并且在使用消息类型后不应更改。 请注意,1到15范围内的字段编号需要一个字节进行编码,包括字段编号和字段类型16到2047范围内的字段编号占用两个字节。因此,您应该为非常频繁出现的消息元素保留数字1到15。请记住为将来可能添加的常用元素留出一些空间。

最小的标识号可以从1开始,最大到2^29 - 1,或 536,870,911。不可以使用其中的[19000-19999]的标识号, Protobuf协议实现中对这些进行了预留。如果非要在.proto文件中使用这些预留标识号,编译时就会报警。同样你也不能使用早期保留的标识号。

  • 指定字段规则

消息字段可以是以下之一:

  • singular:格式良好的消息可以包含该字段中的零个或一个(但不超过一个)。

  • repeated:此字段可以在格式良好的消息中重复任意次数(包括零)。将保留重复值的顺序。对应Python的列表。

    message Result {
      string url = 1;
      string title = 2;
      repeated string snippets = 3;
    }
    
  • 添加更多消息类型

可以在单个.proto文件中定义多个消息类型。

message SearchRequest {
  string query = 1;
  int32 page_number = 2;
  int32 result_per_page = 3;
}

message SearchResponse {
 ...
}
  • 安装protobuf编译器和grpc库
pip install grpcio-tools
  • 编译生成代码
python -m grpc_tools.protoc -I. --python_out=.. --grpc_python_out=.. itcast.proto
  • -I表示搜索proto文件中被导入文件的目录
  • --python_out表示保存生成Python文件的目录,生成的文件中包含接口定义中的数据类型
  • --grpc_python_out表示保存生成Python文件的目录,生成的文件中包含接口定义中的服务类型

5.2.4 某项目推荐接口protoco协议定义

创建abtest目录,将相关接口代码放入user_reco.proto协议文件

  • 用户刷新feed流接口
    • user_recommend(User) returns (Track)
  • 文章相似(猜你喜欢)接口
    • article_recommend(Article) returns(Similar)
syntax = "proto3";

message User {

    string user_id = 1;
    int32 channel_id = 2;
    int32 article_num = 3;
    int64 time_stamp = 4;
}
// int32 ---> int64 article_id
message Article {

    int64 article_id = 1;
    int32 article_num = 2;

}

message param2 {
    string click = 1;
    string collect = 2;
    string share = 3;
    string read = 4;
}

message param1 {
    int64 article_id = 1;
    param2 params = 2;
}

message Track {
    string exposure = 1;
    repeated param1 recommends = 2;
    int64 time_stamp = 3;
}

message Similar {
    repeated int64 article_id = 1;
}

service UserRecommend {
    // feed recommend
    rpc user_recommend(User) returns (Track) {}
    rpc article_recommend(Article) returns(Similar) {}
}

通过命令生成

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. user_reco.proto

5.2.4 某项目grpc服务端编写

创建routing.py文件,填写服务端代码:

相关包

import os
import sys

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(BASE_DIR))
from concurrent import futures
from abtest import user_reco_pb2
from abtest import user_reco_pb2_grpc
from setting.default import DefaultConfig
import grpc
import time
import json

完整程序代码

需要添加grpc服务配置:

# rpc
RPC_SERVER = '192.168.19.137:9999'

完整代码:

# 基于用户推荐的rpc服务推荐
# 定义指定的rpc服务输入输出参数格式proto
class UserRecommendServicer(user_reco_pb2_grpc.UserRecommendServicer):
    """
    对用户进行技术文章推荐
    """
    def user_recommend(self, request, context):
        """
        用户feed流推荐
        :param request:
        :param context:
        :return:
        """
        # 选择C4组合
        user_id = request.user_id
        channel_id = request.channel_id
        article_num = request.article_num
        time_stamp = request.time_stamp

        # 解析参数,并进行推荐中心推荐(暂时使用假数据替代)
        class Temp(object):
            user_id = -10
            algo = 'test'
            time_stamp = -10

        tp = Temp()
        tp.user_id = user_id
        tp.time_stamp = time_stamp
        _track = add_track([], tp)

        # 解析返回参数到rpc结果参数
        # 参数如下
        # [       {"article_id": 1, "param": {"click": "", "collect": "", "share": "", 'detentionTime':''}},
        #         {"article_id": 2, "param": {"click": "", "collect": "", "share": "", 'detentionTime':''}},
        #         {"article_id": 3, "param": {"click": "", "collect": "", "share": "", 'detentionTime':''}},
        #         {"article_id": 4, "param": {"click": "", "collect": "", "share": "", 'detentionTime':''}}
        #     ]
        # 第二个rpc参数
        _param1 = []
        for _ in _track['recommends']:
            # param的封装
            _params = user_reco_pb2.param2(click=_['param']['click'],
                                           collect=_['param']['collect'],
                                           share=_['param']['share'],
                                           read=_['param']['read'])
            _p2 = user_reco_pb2.param1(article_id=_['article_id'], params=_params)
            _param1.append(_p2)
        # param
        return user_reco_pb2.Track(exposure=_track['param'], recommends=_param1, time_stamp=_track['timestamp'])

#    def article_recommend(self, request, context):
#        """
#       文章相似推荐
#       :param request:
#       :param context:
#       :return:
#       """
#       # 获取web参数
#       article_id = request.article_id
#       article_num = request.article_num
#
#        # 进行文章相似推荐,调用推荐中心的文章相似
#       _article_list = article_reco_list(article_id, article_num, 105)
#
#       # rpc参数封装
#       return user_reco_pb2.Similar(article_id=_article_list)


def serve():

    # 多线程服务器
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    # 注册本地服务
    user_reco_pb2_grpc.add_UserRecommendServicer_to_server(UserRecommendServicer(), server)
    # 监听端口
    server.add_insecure_port(DefaultConfig.RPC_SERVER)

    # 开始接收请求进行服务
    server.start()
    # 使用 ctrl+c 可以退出服务
    _ONE_DAY_IN_SECONDS = 60 * 60 * 24
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    # 测试grpc服务
    serve()

埋点参数的接口封装:

其中:

class Temp(object):
    user_id = '1115629498121846784'
    algo = 'test'
    time_stamp = int(time.time() * 1000)
_track = add_track([], Temp())

web后台请求传入的时间戳是time.time(),Out[3]: int(1558128143.8735564) * 1000的大小

def add_track(res, temp):
    """
    封装埋点参数
    :param res: 推荐文章id列表
    :param cb: 合并参数
    :param rpc_param: rpc参数
    :return: 埋点参数
        文章列表参数
        单文章参数
    """
    # 添加埋点参数
    track = {}

    # 准备曝光参数
    # 全部字符串形式提供,在hive端不会解析问题
    _exposure = {"action": "exposure", "userId": temp.user_id, "articleId": json.dumps(res),
                 "algorithmCombine": temp.algo}

    track['param'] = json.dumps(_exposure)
    track['recommends'] = []

    # 准备其它点击参数
    for _id in res:
        # 构造字典
        _dic = {}
        _dic['article_id'] = _id
        _dic['param'] = {}

        # 准备click参数
        _p = {"action": "click", "userId": temp.user_id, "articleId": str(_id),
              "algorithmCombine": temp.algo}

        _dic['param']['click'] = json.dumps(_p)
        # 准备collect参数
        _p["action"] = 'collect'
        _dic['param']['collect'] = json.dumps(_p)
        # 准备share参数
        _p["action"] = 'share'
        _dic['param']['share'] = json.dumps(_p)
        # 准备detentionTime参数
        _p["action"] = 'read'
        _dic['param']['read'] = json.dumps(_p)

        track['recommends'].append(_dic)

    track['timestamp'] = temp.time_stamp
    return track

提供客户端测试代码:

  • 测试客户端
import os
import sys

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(BASE_DIR))
from abtest import user_reco_pb2_grpc
from abtest import user_reco_pb2
import grpc
from setting.default import DefaultConfig
import time


def test():
    article_dict = {}
    # 构造传入数据

    req_article = user_reco_pb2.User()
    req_article.user_id = '1115629498121846784'
    req_article.channel_id = 18
    req_article.article_num = 10
    req_article.time_stamp = int(time.time() * 1000)
    # req_article.time_stamp = 1555573069870

    with grpc.insecure_channel(DefaultConfig.RPC_SERVER) as rpc_cli:
        print('''''')
        try:
            stub = user_reco_pb2_grpc.UserRecommendStub(rpc_cli)
            resp = stub.user_recommend(req_article)
        except Exception as e:
            print(e)
            article_dict['param'] = []
        else:

            # 解析返回结果参数
            article_dict['exposure_param'] = resp.exposure

            reco_arts = resp.recommends

            reco_art_param = []
            reco_list = []
            for art in reco_arts:
                reco_art_param.append({
                    'artcle_id': art.article_id,
                    'params': {
                        'click': art.params.click,
                        'collect': art.params.collect,
                        'share': art.params.share,
                        'read': art.params.read
                    }
                })

                reco_list.append(art.article_id)
            article_dict['param'] = reco_art_param

            # 文章列表以及参数(曝光参数 以及 每篇文章的点击等参数)
            print(reco_list, article_dict)

if __name__ == '__main__':
    test()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/181825.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

k8s之部署有状态应用

写在前面 本文一起看下k8s对于有状态应用部署提供的解决方案。 1:有状态应用和无状态应用 如果是一个应用每次重启时依赖环境都能和第一次启动时的完全一致,则就可以称这类应用是无状态应用用,反之,就是有状态应用,如…

自动写代码的AI工具,已经支持 VsCode 插件安装使用

自动写代码的AI工具,已经支持 VsCode 插件安装使用,它的功能并不是「代码补全」,而是「代码生成」。 之前有个比较火的 GitHub Copilot,但是这是商业产品,并且没有开源,现在又被告了。 GitHub Copilot 面…

SQLSERVER 事务日志的 LSN 到底是什么?

一:背景 1. 讲故事 大家都知道数据库应用程序 它天生需要围绕着数据文件打转,诸如包含数据的 .mdf,事务日志的 .ldf,很多时候深入了解这两类文件的合成原理,差不多对数据库就能理解一半了,关于 .mdf 的合…

代码随想录--二叉树章节总结 Part II

代码随想录–二叉树章节总结 Part II 1.Leetcode222 求完全二叉树结点的个数 给你一棵 完全二叉树 的根节点 root ,求出该树的节点个数。 完全二叉树 的定义如下:在完全二叉树中,除了最底层节点可能没填满外,其余每层节点数都达…

Python机器学习:特征变换

🌕 特征变换 特征变换主要就是针对一个特征,使用合适的方法,对数据的分布、尺度等进行变换,以满足建模时对数据的需求。 特征变换可分为数据的数据的无量纲化处理和数据特征变换。 🌗 数据的无量纲化处理 常用处理…

22.0:Codejock Suite Pro for ActiveX COM:Crack

从 Visual Basic 5.0 和 6.0 开始一直到当前版本的 Visual Studio 的大多数 ActiveX 容器。与 Visual Studio 无缝集成并包含我们所有 ActiveX COM 产品的评估版本。评估版不提供 OCX 文件的 Unicode 版本。 创建包含一整套高度可定制的用户界面组件的专业应用程序,…

Flink-FinkSQL基本操作(Table API、动态表、事件窗口、分组聚合开窗查询、联结查询)

11 Table API和SQL 11.1 快速上手 引入TableAPI的依赖 桥接器 <dependency><groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</vers…

12、常用插件

文章目录12、常用插件推荐1&#xff1a;Alibaba Java Coding Guidelines推荐2&#xff1a;jclasslib bytecode viewer推荐3&#xff1a;Translation推荐4&#xff1a;GenerateAllSetter推荐5&#xff1a;Rainbow Brackets推荐6&#xff1a;CodeGlance Pro推荐7&#xff1a;Stat…

7.bWAPP -- INSECURE DIRECT OBJECT REFERENCES

7.bWAPP – INSECURE DIRECT OBJECT REFERENCES 0x01、Insecure DOR (Change Secret) 同 XSS - Stored (Change Secret) Low 仔细观察页面, 发现隐藏一个input标签, 作用是输入用户名, 并且配合提交的修改密码, 完成修改用户密码的操作: 这里就可以利用该用户名input标签达…

如何带好一个团队?团队管理的要点有哪些?

想带好一个团队并不是这么容易&#xff0c;尤其是对于新晋升管理者来说更是难上加难。团队管理可以大大提高工作效率。那么&#xff0c;团队管理的要点是什么呢&#xff1f; 1、远景和目标 成员们先要有一个共同的目标&#xff0c;在此基础上还必须要有一个好的愿景&#xff0…

即时通讯系列---如何下手做技术方案设计

1. 引出主题 IM整体涉及的内容比较多, 做技术方案设计需要慎重, 可以先从功能列表以及核心case逐步的总结出技术方案 本文结构: 1. 查看功能列表 2. 核心case分析 3. 总结技术方案设计 2. 如何做技术方案设计 1. 查看功能列表 功能清单 一级分类 二级分类 三级分类…

TCP/UDP网络编程

目录 一、常见的客户端服务端模型 二、Socket套接字 1、概念 2、分类 a、流套接字 b、数据报套接字 c、原始套接字 三、UDP数据报套接字编程 四、TCP数据报套接字编程 一、常见的客户端服务端模型 客户端&#xff1a;用户使用的程序。 服务端&#xff1a;给用户提…

miracl编译及使用

文章目录Windows平台编译网址 https://miracl.com/https://github.com/miracl/MIRACL Windows平台编译 源码目录下新建文件夹ms32或ms64&#xff0c;把/lib/ms32doit.bat或ms64doit.bat分别拷进去。 把源码include和source目录所有文件拷贝进要编译的ms32或ms64&#xff0c…

【高阶数据结构】海量数据如何处理? (位图 布隆过滤器)

&#x1f308;欢迎来到高阶数据结构专栏~~位图 & 布隆过滤器 (꒪ꇴ꒪(꒪ꇴ꒪ )&#x1f423;,我是Scort目前状态&#xff1a;大三非科班啃C中&#x1f30d;博客主页&#xff1a;张小姐的猫~江湖背景快上车&#x1f698;&#xff0c;握好方向盘跟我有一起打天下嘞&#xff0…

模拟实现list / list迭代器

前言&#xff1a;学习C的STL&#xff0c;我们不仅仅要求自己能够熟练地使用各种接口&#xff0c;我们还必须要求自己了解一下其底层的实现方法&#xff0c;这样可以帮助我们写出比较高效的代码程序&#xff01; ⭐在本篇文章中&#xff0c;list的迭代器是重点&#xff0c;它不…

WSL2配置网络代理

注意&#xff1a;本文参考自文章&#xff1a;WSL2配置代理&#xff0c;是对原文的补充&#xff0c;使其适用于河对岸云服务代理。 1 开启Windows代理 1.1 开启代理软件的局域网访问权限 请注意&#xff1a;本文的WSL2代理配置&#xff0c;需要Windows的代理软件已经能够正常…

HTTPS详解及HTTPS实验

目录 HTTPS 一&#xff0c;https在参考模型中的位置 二&#xff0c;什么是HTTPS 三&#xff0c;什么是SSL 1&#xff0c;SSL 协议分为两层&#xff1a; 2&#xff0c;SSL 协议提供的服务&#xff1a; 四&#xff0c;HTTPS的加密方式 1&#xff0c;常见的加密算法 2&#xff0c;…

mysql知识点

目录 1.mysql聚合函数&#xff1a; 2.having&#xff08;用来过滤数据&#xff09;&#xff1a; HAVING 不能单独使用&#xff0c;必须要跟 GROUP BY 一起使用 WHERE 与 HAVING 的对比 3.升序和降序 4.等于 5.实战demo&#xff1a; 1.mysql聚合函数&#xff1a; 常用的聚…

codeforces签到题之div3

前言 第一次&#xff43;&#xff4f;&#xff44;&#xff45;&#xff46;&#xff4f;&#xff52;&#xff43;&#xff45;&#xff53;&#xff0c;发现几个问题&#xff1a; 1,不知道选&#xff4c;&#xff41;&#xff4e;&#xff47;&#xff55;&#xff41;&…

17正交距阵和Gram-Schmidt正交化

标准正交向量与正交矩阵 上一节介绍过的正交向量&#xff0c;通过一个式子进行回顾&#xff0c;设q是标准正交向量组中的任意向量&#xff0c;则 这很好地表现了标准正交向量组内各向量的性质&#xff1a; 不同向量之间相互垂直&#xff08;正交&#xff09;&#xff0c;向量…