RocketMQ 5.0 API 与 SDK 的演进

news2025/3/1 23:08:17

作者: 艾阳坤

RocketMQ 5.0 SDK 采用了全新的 API,使用 gRPC 作为通信层的实现,并在可观测性上做了很大幅度的提升。

全新统一的 API

此处的 API 并不单单只是接口上的定义,同时也规定了各个接口不同的方法和行为,明确了整个消息模型。

在这里插入图片描述

RocketMQ 过去的 API 从第一版开始,至今已经过了很长时间,长期依赖是一个缺乏变革的状态,对于一些中途打算废弃或者变更的 API 也没有进行后续的迭代。此外,接口的定义也不够清晰。因此,RocketMQ 希望在 5.0 中能够建立一个统一的规范,精简整个 API,通过引入 builder 模式来引入更多的不变性,同时做好异常管理,给开发者和用户一个更加清爽的面貌。

目前 C++ 和 Java 已经进行了 5.0 API 的定义与实现,更多语言的支持也陆续在路上了。我们也欢迎更多的开发者可以参与到社区的工作中来。这里给出 5.0 客户端的仓库链接:

https://github.com/apache/rocketmq-clients

除了在上述接口上做的一些修改之外, RocketMQ 5.0 还规定了四种新的不同的客户端类型,即 Producer/Push Consumer/Simple Consumer/Pull Consumer。

其中 Pull Consumer 还在开发中;Producer 主要还是做了接口裁剪,规范了异常管理。在功能上其实并没有做一些颠覆性的改变。Push Consumer 也是比较类似的;Simple consumer 将更多的权利将下发给用户,是一种用户可以主动控制消息接收与处理过程的消费者,特别的,5.0 的 SDK 中,Push Consumer 和 Simple Consumer 都采用 RocketMQ 的 pop 机制进行实现,一些社区的同学可能已经熟悉了。

如果用户并不一定想控制或者关心整个消息的接收过程,只在乎消息的消费过程的话,这个时候 Push Consumer 可能是一个更好的选择。

RocketMQ 5.0 定义了四种不同的消息类型。过去的开源版本中其实我们并没有去突出消息类型这样一个概念,后续出于维护及运维方面的需要以及模型定义的完备,才让今天的 5.0 有了消息类型的这样一个概念。

在这里插入图片描述

1、NORMAL:普通消息。

2、FIFO:满足先入先出的语义。用户可以通过定义 message group 来控制消息间的消费顺序。例如图中的 fruit 这个 topic 下,可以定义不同的 message group,在 apple 这个 message group 下,会按照发送顺序决定消息被消费的顺序,并且不同的 message group 之间不会互相干扰。

3、TRANSACTIONAL:可以将一条或多条消息包成一个事务,最终用户可以根据自己的业务结果选择提交或者回滚。

4、DELAY:用户可以自主地设置消息的定时时间,相比之前开源版本仅允许用户设置定时/延迟级别,5.0 的实现中还讲允许用户设置更精确的时间戳。

以上四种消息是互斥的,我们会在 topic 的元数据去标识它的类型。实际在消息发送的时候如果如果出现了尝试发送的消息类型与 topic 类型不匹配的情况,也会做一些限制。

实现

RocketMQ 5.0 在客户端的启动过程中提前进行了更多的准备工作。比如用户提前设置要发送消息的 topic 时,Producer 会在启动过程中尝试获取对应 topic 的路由。在过去的客户端实现中,在针对于某个 topic 第一次发送消息时,需要先获取路由,这里就会有一个类似冷启动的过程。

在这里插入图片描述

提前获取 Topic 的路由信息有两点好处:

  1. 不阻塞后面的发送,让消息的发送仅仅触发发送这一个动作。

  2. 错误前置,比如用户要往一个不存在 Topic 发送消息时,因为路由的获取参与到整个客户端的启动过程,获取路由不成功,那整个客户端启动可能就会失败,用户也是拿不到对应的 Producer 对象的。

类似的,Consumer 的启动也会有这样的一个过程。

除此之外,我们在客户端和服务端之间增加了一个 Telemetry 的部分,它会在客户端和服务端之间建立起了一个进行双向数据通讯的通道,客户端和服务端会在这个过程中沟通配置,比如服务端可以实现对客户端配置的下发,更好地管理客户端。此外,Telemetry 也可以将本地配置主动上报给服务端,让服务端也可以对客户端的设置有更好的了解。Telemetry 通道也会在客户端启动时尝试建立,如果这个通道没有建立成功,也会影响客户端的启动。

总的来说,客户端的启动过程会尽可能将所有准备工作做好。同时在客户端和服务端之间建立 Telemetry 这样一个通讯通道。

在这里插入图片描述

客户端内部存在一些周期性的任务,比如路由的定时更新以及客户端往服务端发送心跳等。对于上文中提到的 Telemetry 过程中,客户端的配置上报也是周期性的。

在这里插入图片描述

Producer 在 RocketMQ 5.0 中的具体工作流程

消息在发送时,会检查是否已经获取对应 topic 的路由信息。如果已经获取,则尝试在路由中选取队列,随即查看要发送的消息的类型是否与 topic 类型匹配,如果匹配,则进行消息发送。如果发送成功,则返回;否则,判断当前重试次数是否超出用户设置的上限,如果超出,则返回失败;否则轮转到下一个队列,然后对新的队列进行重试直到消费次数超出上线。而如果启动过程中没有提前获取路由,那么消息发送时依然会先尝试获取路由,然后再进行下一步操作。

另外一点相对于老客户端较大的改变在于,客户端从底层 RPC 交互到上层的业务逻辑全部采用异步实现。Producer 依然会提供一个同步发送接口和异步发送接口,但同步的方法也是使用异步来实现,整个逻辑非常统一和清爽。

在这里插入图片描述

Push Consumer 分为两部分,消息的接收和消费。

消息接收流程为:客户端需要不断地从服务端拉取消息,并将消息缓存。Push Consumer 会将消息先缓存到客户端的本地,再进行消费,因此它会判断客户端本地的 Cache 是否已满,如果已满,则隔一段时间再判断,直到消息被客户端消费,Cache 尚有余量时再进行消息拉取。为了避免出现一些内存问题,Cache 的大小也是被严格限制的。

在这里插入图片描述

消息消费过程分为两个类型,顺序类型和非顺序类型。

其中非顺序类型即并发消费。消费者会先从 Cache 中获取消息,然后尝试消费消息,消费后再将消息从 Cache 中移除。消息消费成功时,会尝试将消息 ACK ,消费流程结束;如果消费失败,则尝试修改消息的可见时间,即决定下一次什么时候可见。

顺序消费指对于同一个 Group 的消息,最终消费时一定是前一条消息被消费过并且得到确认后,后面的消息才能够继续消费。而消费过程与非顺序消费类似,首先尝试从 Cache 中拉取消息,如果消费成功,则将消息 ACK。ACK 成功后,将其从 Cache 中移除。特别地,如果消费失败,会 suspend 一段时间,然后继续尝试对消息进行消费。此时会判断消费次数是否超限,如果超限,则会尝试将消息放入死信队列中。

相对于非顺序消费,顺序消费更复杂,因为其需要保证前一个消息消费成功后才能对后面的消息进行消费。顺序消费的消费逻辑是基于 message group 隔离的。message group 会在发送时做哈希,从而保证 message group 的消息最终会落在一个队列上,顺序消费模式本质上保证队列内部消费的顺序。

此外,因为不同 message group 的顺序消息最终可能会映射到同一个队列上,这可能会导致不同的 message group 之间的消费形成阻塞,因此服务端未来会实现一个虚拟队列,让不同的 message group 映射到客户端的虚拟队列,保证他们之间没有任何阻塞,从而加速数据消息的消费过程。

在这里插入图片描述

对于 Simple Consumer,用户可以主动控制消息接收和确认的流程。比如用户收到消息后,可以根据业务决定是否过一段时间再消费该消息,或者不需要再收到该消息。消费成功后将消息 ACK 掉,如果失败则主动修改可见时间,选择该消息下一次什么时候可见,即由用户自发地控制整个过程。

可观测性

Shaded Logback

在这里插入图片描述

因为历史原因,RocketMQ 的老客户端并不是面向 SLF4J 进行编程的,而是面向 logback 的。这么做的目的其实是为了方便快捷地获取日志,不需要让用户自己去手动配置。

RocketMQ 中专门有一个 logging 模块是负责日志部分的,像用户自己使用了 logback ,RocketMQ SDK 如果也直接去使用 logback,两者就会产生各种各样的冲突,这个 logging 模块就是用来保证这一层隔离性的。

但是 logging 模块本身的实现并不是很优雅,也带来了一定的维护成本。因此我们采用了 shade logback 的方式来达到上文提到的隔离性。shaded logback 不仅能够避免用户的 logback 与 RocketMQ 自己的 logback 冲突,还能保持较好的可维护性,将来要想在日志上去做一些修改,也来得容易的多。

具体来说,用户的 logback 会采用 logback.xml 的配置文件,通过 shade logback, RocketMQ 5.0 的客户端会使用 rocketmq.logback.xml 的配置文件,因此在配置部分就已经完全隔离了,同时在 shade 的过程中,还对原生 logback 中使用到的一些环境变量和系统变量也进行了修改,这样就保证了两者的彻底隔离。

另外,使用 shadeed logback 之后,RocketMQ 5.0 客户端中的日志部分就全都是面向 SLF4J 来进行编程的了,这样一来,如果我们未来想让用户自己去完全控制日志的话,提供一个去除 logback 的 SDK 就可以了,非常方便。

Trace

5.0 的消息轨迹基于 OpenTelemetry 模型进行定义与实现,消息发送或接收消息的流程被定义为一个个独立的 span ,这一套 span 规范参照了 OpenTelemetry 关于 Messaging 的定义。图中这里 Process P 表示 Producer ,Process C 表示 Consumer。消息的全生命周期,从发送到接收到消费,就可以具象化为这样一个个的 span。

在这里插入图片描述

比如,针对 Push Consumer 而言,先会有一个 receive 的 span 来表示从服务端获取消息的过程,收到消息后到会先等待消息被处理,这个也就是 await span 表示的过程,消息被处理则对应图中的 process span,消息消费结束之后,向服务端反馈消息处理结果也会有专门的 span 进行描述。

我们通过 parent 和 link 来讲所有的这些 span 关联起来,这样通过一条消息的任意一个 span,就可以获得这条消息全生命周期的所有 span。

不仅如此,用户还将允许可以设置一个 span context 与自己的业务链路进行关联,将 RocketMQ 5.0 的消息轨迹本身嵌入进自己的全链路可观测系统中去。

Metrics

Tracing 相对来说成本是比较高的,因为一条消息从发送到接收,可能会有很多流程,这就伴随着很多的 span,这就导致相对来说,tracing 数据的存储查询成本相对来说比较高。我们希望诊断整个 SDK 的健康状况,同时又不希望收集太多的 tracing 信息提高成本,此时提供一份 metrics 数据就能比较好地满足我们的需求。

在这里插入图片描述

在 SDK 的 metrics 中我们新增了诸多指标,包括不限于 Producer 中消息发送延时,Push Consumer 中消息的消费耗时和消息缓存量,可以帮助用户和运维者更快更好地发现异常。

5.0 中 SDK 的 metrics 也是基于 OpenTelemetry 进行实现的。以 Java程序为例,OpenTelemetry 对于 Java 的实现本身提供了一个 agent,agent 在运行时会打点采集 SDK 的一些 tracing/metrics 信息,并将它上报到对应的 metric collector 中,这种通过 agent 来降低无侵入式数据采集的方式被称之为 automatic instrumentation,而手动在代码中实现打点采集的方式则被称之 manual instrumentation。对于 metrics 我们目前还是采用 manual instrumentation 的方式来进行数据的采集和上报的。服务端会告知客户端对应的 collector 的地址,然后客户端将 Metrics 数据上传到对应的 collector 当中去。

作者介绍:

艾阳坤,Apache RocketMQ 5.0 Java SDK 作者,CNCF Envoy Contributor,CNCF OpenTelemetry Contributor,阿里云智能高级开发工程师。

加入 Apache RocketMQ 社区

十年铸剑,Apache RocketMQ 的成长离不开全球接近 500 位开发者的积极参与贡献,相信在下个版本你就是 Apache RocketMQ 的贡献者,在社区不仅可以结识社区大牛,提升技术水平,也可以提升个人影响力,促进自身成长。

社区 5.0 版本正在进行着如火如荼的开发,另外还有接近 30 个 SIG(兴趣小组)等你加入,欢迎立志打造世界级分布式系统的同学加入社区,添加社区开发者微信:rocketmq666 即可进群,参与贡献,打造下一代消息、事件、流融合处理平台。
欢迎您扫描下方二维码加入钉钉群一起沟通交流~

在这里插入图片描述

点击此处,进入官网了解更多详情~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/9254.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【案例实战】分布式应用下登录检验解决方案(JWT)

文章目录1.需求背景以及JWT简介2.创建Maven项目,搭建SpringBoot项目3.容器化急速部署MySQL4.数据库表准备5.SpringBoot整合MySQLMyBatisPlus6.MyBatisPlus逆向工程自动生成7.SpringBoot整合JWT8.开发测试接口9.开发登录接口10.开发登录拦截器11.启动验证1.需求背景以…

cookie介绍:cookie实现增删改查功能

1、cookie介绍 只能存储文本,只能实现在本地的存储;单条存储有大小限制4kB左右,数量限制(一般浏览器,限制大概在50条左右);读取有域名限制:不可跨域读取,只能由来自写入…

集成学习与随机森林

1、集成学习概念 集成学习(ensemble learning)是一类机器学习框架,通过构建并结合多个学习器来完成学习任务。一般结构是:先产生一组“个体学习器”,再用某种策略将它们结合起来。结合策略主要有平均法、投票法和学习法等 集成学习包含三个…

仿大众点评——秒杀系统部分01

秒杀系统 全局ID生成器 全局唯一ID生成策略: UUIDRedis自增snowflake算法数据库自增 这里使用Redis自增的数值,并拼接一些其它信息 Redis自增ID策略: 每天一个key,方便统计订单量ID构造是 时间戳 计数器 ID的组成部分&#…

Unity接入日志插件Log4Net

前言 log4net是一个日志插件,可以帮助我们把控制台输出的日志写入到本地。这个功能说简单就简单,说复杂其实还挺复杂。 为什么这么说呢,首先文件写入本地确实简单,但是如果你要实现一下功能就没那么简单了。 1.把每行日志按照指…

2022年“新一代”设备管理系统——支持低代码平台

在现代化企业的信息化管理体系建设中,设备管理系统被看作是重中之重。因为设备是工厂的主要生产要素,而且随着生产设备的日益增多,设备的重要性日益凸显。如何妥善管理这些设备也成了企业管理者经常考虑的问题。单纯依靠人工管理逐渐不能满足…

uniapp实现下拉刷新及上拉(分页)加载更多(app,H5,小程序均可使用)

开门见山地说,在移动端开发中,80%的项目都会涉及到列表展示,而有了列表不可避免的需求就是列表的下拉刷新和上拉加载更多。本篇文章主要介绍在使用uniapp开发移动端的过程中,比较好用的一个下拉及上拉组件,节约大家选择…

5.2启动内存分页机制,畅游虚拟空间

5.2启动内存分页机制,畅游虚拟空间 即使机器上只有512MB的内存,每个进程自己的内存空间也是4GB,这4GB便是指的虚拟内存空间。下面就是讲解虚拟内存空间是怎么来的。 5.2.1内存为什么要分页 问题场景:由于多进程的发展&#xff…

软考 - 计算机组成与结构

数据计算 数据的进制转化 十六进制符号 0X 或 H,可表示为0x18F 或 18FH m进制转n进制:先将m进制转化为十进制数,再将十进制数转化为n进制数(2进制可直接转8进制(3位)和16进制(4位&#xff09…

Python如何自动操作电脑桌面应用程序

前言 本文是该专栏的第2篇,后面会持续分享python的各种黑科技知识,值得关注。 熟悉python的朋友,都知道python可以做自动化,比如说selenium,pyppeteer,airtest等等。 但你是否听说过python可以来自动操作电脑桌面的应用程序呢,趟若临时接到某个需求,让你用python脚本…

基于zynq7100的OV5640摄像头照相机实验,提供工程源码和技术支持

目录1.设计架构2.工程简介3.zynq配置4.sd卡文件系统FATFS配置5.sd卡文件系统FATFS读写测试6.OV5640摄像头显示测试7.OV5640摄像头循环拍照测试8.OV5640摄像头按键拍照测试9.上板调试10.福利领取1.设计架构 设计框图如下: 采用Xilinx官方推荐的VDMA架构实现图像缓存…

TiDB丨一次TiDB GC阻塞引发的“惨案”......

前不久,从项目一线同学得到某集群的告警信息,某个时间段 TiDB duration 突然异常升高,持续时间6小时左右,需要定位到具体原因。 于是乎,我们就来一场关于TiDB GC阻塞的排查...... 分析过程 第一招 初步判断 既然…

Docker+nginx在CVM的机器远程发布hellogin

有两种方式,一种通过docker容器安装,一种是直接安装, 这里我们通过docker服务安装 常用操作 images是查询当前机器上所有的镜像有哪些 docker images删除镜像 docker rmi [MAGE ID ]可以查当前运行中的容器 docker ps -a开始/停止/删除容器 docker…

动态树的最值

一 问题描述 一棵树有 N 个节点,每个节点都有一个权值 Wi ,有 4 种操作。 ① 1 x y ,在两个节点 x、y 之间添加一条新边。因此,在这种操作之后,两棵树将连接成一棵新树。 ② 2 x y ,在树集合中找到包含节…

LeetCOde-剑指46-把数字翻译成字符串

1、动态规划法 我们通过观察可以发现,假如我们使用数组dp[i]dp[i]dp[i]来记录前iii位可能构成的字符串个数:1、当新加入的第i1i1i1位和第iii位能够构成一个大于9小于26的数字时,dp[i1]dp[i]dp[i−1]dp[i1]dp[i]dp[i-1]dp[i1]dp[i]dp[i−1]&a…

容器化部署(k8s)任务调度平台xxl-job(部署过程及踩坑问题记录)

文章预览:1 部署过程(下方ip代表服务器的ip哈)1.1 制作服务打包镜像DockerFile1.2 制作执行脚本run.sh1.3 jar包上上传1.4 kuboard创建----配置信息2 踩坑问题记录2.1 日志抛出异常2.2 原因分析2.3 过程分析及解决2.4 执行调度测试导入sql等过…

Baklib|SaaS产品,实现企业流程数字化

正如许多科技潮流一样,“SaaS”这个词也逐渐成为企业经理们谈论的话题。然而,如果您对“SaaS”一无所知,您可能会感到困惑并容易忽略它。那么,什么是“SaaS”?它的优点是什么?它如何帮助企业实现数字化转型…

SSM+VUE+ElementUI实现宠物领养系统,期末大作业

SSMVUEElementUI实现宠物领养系统 系统角色 领养人,管理员 系统功能 本系统的功能主要分为四大模块: 领养人用户模块:注册、领养人登录、申请领养、查看小动物信息、发布留言领养机构员工用户模块:领养机构员工登录、增加小动…

地平线开发者社区真心话大冒险,邀你闯关!

Hello! 各位初次见面的萌新和久经沙场的社牛 目前开发者社区已成立两年有余 感谢大家一路上的支持和理解 今天,我们也准备了一些小礼品 希望倾听大家作为用户和开发者的真心话 同时,也欢迎初次见面的萌新们一同冒险 期待陪伴大家走过更…

Protect Privacy from Gradient Leakage Attack in Federated Learning

wangjunxiao/GradDefense: Defense against Gradient Leakage Attack (github.com) Summary 针对DGA和DIA攻击,提出了一个轻量、保证训练准确性、够用的的防御机制。防御机制主要包括随机layer添加扰动,然后进行梯度补偿来减少噪声对模型准确性的影响。…