RocketMQ 5.0 无状态实时性消费详解

news2024/12/26 22:43:16

作者:绍舒

背景

RocketMQ 5.0 版本引入了 Proxy 模块、无状态 pop 消费机制和 gRPC 协议等创新功能,同时还推出了一种全新的客户端类型:SimpleConsumer。

SimpleConsumer 客户端采用了无状态的 pop 机制,彻底解决了在客户端发布消息、上下线时可能出现的负载均衡问题。然而,这种新机制也带来了一个新的挑战:当客户端数量较少且消息数量较少时,可能会出现消息消费延时的情况。

在当前的消息产品中,消费普遍使用了长轮询机制,即客户端向服务端发送一个超时时间相对较长的请求,该请求会一直挂起,除非队列中存在消息或该请求到达设定的长轮询时间。

然而,在引入 Proxy 之后,目前的长轮询机制出现了一个问题。客户端层面的长轮询和 Proxy 与 Broker 内部的长轮询之间互相耦合,也就是说,一次客户端对 Proxy 的长轮询只对应一次 Proxy 对 Broker 的长轮询。因此,在以下情况下会出现问题:当客户端数量较少且后端存在多个可用的 Broker 时,如果请求到达了没有消息的 Broker,就会触发长轮询挂起逻辑。此时,即使另一台 Broker 存在消息,由于请求挂在了另一个 Broker 上,也无法拉取到消息。这导致客户端无法实时接收到消息,即 false empty response。

这种情况可能导致以下现象:用户发送一条消息后,再次发起消费请求,但该请求却无法实时拉取到消息。这种情况对于消息传递的实时性和可靠性产生了不利影响。

AWS 的文档里也有描述此等现象,他们的解决方案是通过查询是所有的后端服务,减少 false empty response。

在这里插入图片描述

其他产品

在设计方案时,首先是需要目前存在的消息商业化产品是如何处理该问题的。

MNS 采取了以下策略,主要是将长轮询时间切割为多个短轮询时间片,以尽可能覆盖所有的 Broker。

首先,在长轮询时间内,会对后端的 Broker 进行多次请求。其次,当未超过短轮询配额时,优先使用短轮询消费请求来与 Broker 进行通信,否则将使用长轮询,其时间等于客户端的长轮询时间。此外,考虑到过多的短轮询可能会导致 CPU 和网络资源消耗过多的问题,因此在短轮询超过一定数量且剩余时间充足时,最后一次请求将转为长轮询。

然而,上述策略虽以尽可能轮询完所有的 Broker 为目标,但并不能解决所有问题。当轮询时间较短或 Broker 数量较多时,无法轮询完所有的 Broker。即使时间足够充足的情况下,也有可能出现时间错位的情况,即在短轮询请求结束后,才有消息在该 Broker 上就绪,导致无法及时取回该消息。

解法

技术方案

首先,需要明确该问题的范围和条件。该问题只会在客户端数量较少且请求较少的情况下出现。当客户端数量较多且具备充足的请求能力时,该问题不会出现。因此,理想情况是设计一个自适应的方案,能够在客户端数量较多时不引入额外成本来解决该问题。

为了解决该问题,关键在于将前端的客户端长轮询和后端的 Broker 长轮询解耦,并赋予 Proxy 感知后端消息个数的能力,使其能够优先选择有消息的 Broker,避免 false empty response。

考虑到 Pop 消费本身的无状态属性,期望设计方案的逻辑与 Pop 一致,而不在代理中引入额外的状态来处理该问题。

另外,简洁性是非常重要的,因此期望该方案能够保持简单可靠,不引入过多的复杂性。

  1. 为了解决该问题,本质上是要将前端的客户端长轮询和后端的 Broker 长轮询解耦开来,并赋予 Proxy 感知后端消息个数的能力,能够优先选择有消息的 Broker,避免 false empty response。
  2. 由于 Pop 消费本身的无状态属性,因此期望该方案的设计逻辑和 Pop 一致,而不在 Proxy 引入额外的状态来处理这个事情。
  3. Simplicity is ALL,因此期望这个方案简单可靠。

我们使用了 NOTIFICATION,可以获取到后端是否有尚未消费的消息。拥有了上述后端消息情况的信息,就能够更加智能地指导 Proxy 侧的消息拉取。

通过重构 NOTIFICATION,我们对其进行了一些改进,以更好地适应这个方案的要求。

pop with notify

一个客户端的请求可以被抽象为一个长轮询任务,该轮询任务由通知任务和请求任务组成。

通知任务的目的是获取 Broker 是否存在可消费的消息,对应的是 Notification 请求;而请求任务的目的是消费 Broker 上的消息,对应的是 Pop 请求。

首先,长轮询任务会执行一次 Pop 请求,以确保在消息积压的情况下能够高效处理。如果成功获取到消息,则会正常返回结果并结束任务。如果没有获取到消息,并且还有剩余的轮询时间,则会向每个 Broker 提交一个异步通知任务。

在任务通知返回时,如果不存在任何消息,长轮询任务将被标记为已完成状态。然而,如果相关的 Broker 存在消息,该结果将被添加到队列中,并且消费任务将被启动。该队列的目的在于缓存多个返回结果,以备将来的重试之需。对于单机代理而言,只要存在一个通知结果返回消息,Proxy 即可进行消息拉取操作。然而,在实际的分布式环境中,可能会存在多个代理,因此即使通知结果返回消息存在,也不能保证客户端能够成功拉取消息。因此,该队列的设计旨在避免发生这种情况。

在这里插入图片描述

消费任务会从上述队列中获取结果,若无结果,则直接返回。这是因为只有在通知任务返回该 Broker 存在消息时,消费任务才会被触发。因此,若消费任务无法获取结果,可推断其他并发的消费任务已经处理了该消息。

消费任务从队列获取到结果后,会进行加锁,以确保一个长轮询任务只有一个正在进行的消费任务,以避免额外的未被处理的消息。

在这里插入图片描述

如果获取到消息或长轮询时间结束,该任务会被标记完成并返回结果。但如果没有获取到消息(可能是其他客户端的并发操作),则会继续发起该路由所对应的异步通知任务,并尝试进行消费。

自适应切换

考虑到当请求较多时,无需采用 pop with notify 机制,可使用原先的 pop 长轮询 broker 方案,但是需要考虑的是,如何在两者之间进行自适应切换。目前是基于当前 Proxy 统计的 pop 请求数做判断,当请求数少于某一值时,则认为当前请求较少,使用 pop with notify;反之则使用 pop 长轮询。

由于上述方案基于的均为单机视角,因此当消费请求在 proxy 侧不均衡时,可能会导致判断条件结果有所偏差。

Metric

为了之后进一步调优长轮询和观察长轮询的效果,我们设计了一组 metric 指标,来记录并观测实时长轮询的表现和损耗。

  1. 客户端发起的长轮询次数 (is_long_polling)
  2. pop with notify 次数 (通过现有 rpc metric 统计)
  3. 首次 pop 请求命中消息次数 (未触发 notify) (is_short_polling_hit)

使用方式

在使用时需明确长轮询和短轮询的区分,可以参考 AWS 的定义,当轮询时间大于 0 时,长轮询生效。

在这里插入图片描述

可以看到需明确一个长轮询最小时间,因为长轮询时间过小时无意义,AWS 的最小值采取了 1 秒。

在这里插入图片描述

在目前版本的 Apache RocketMQ 服务端中,采用了最小 5 秒的限制,即需超过 5 秒才能触发长轮询,该值可在 ProxyConfig#grpcClientConsumerMinLongPollingTimeoutMillis 中配置或修改。

对于 SimpleConsumer 而言,可以通过 awaitDuration 字段来调整长轮询时间。

SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    .setConsumerGroup(consumerGroup)
    // set await duration for long-polling.
    .setAwaitDuration(awaitDuration)
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    .build();

总结

通过如上方案,我们成功设计了一套基于无状态消费方式的实时消费方案,在做到客户端无状态消费的同时,还能够避免 false empty response,保证消费的实时性,同时,相较于原先 PushConsumer 的长轮询方案,能够大量减少用户侧无效请求数量,降低网络开销。

RocketMQ 学习社区体验地址

RocketMQ 学习社区重磅上线!AI 互动,一秒了解 RocketMQ 功能源码。RocketMQ 学习社区是国内首个基于 AIGC 提供的知识服务社区,旨在成为 RocketMQ 学习路上的“贴身小二”。

PS:RocketMQ 社区以 RocketMQ 5.0 资料为主要训练内容,持续优化迭代中,回答内容均由人工智能模型生成,其准确性和完整性无法保证,且不代表 RocketMQ 学习社区的态度或观点。
立即体验 RocketMQ 学习社区(建议 PC 端体验完整功能):https://rocketmq-learning.com/

为了帮助用户更全面的了解 RocketMQ 5.0,同时收集更多反馈,**「寻找 RocketMQ 首席评测官」**活动惊喜上线!

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WRUz4bGL-1690130818194)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/df4c1c35104e46c086cbd7f6bdc7c5aa~tplv-k3u1fbpfcp-zoom-1.image “image”)]

点击此处,即可报名参加!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/784190.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot原理分析 | Redis集成

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; Springboot集成Redis 依赖导入 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis<…

九、数据结构——顺序队列中的循环队列

目录 一、循环队列的定义 二、循环队列的实现 三、循环队列的基本操作 ①初始化 ②判空 ③判满 ④入队 ⑤出队 ⑥获取长度 ⑦打印 四、循环队列的应用 五、全部代码 数据结构中的循环队列 在数据结构中&#xff0c;队列&#xff08;Queue&#xff09;是一种常见的线性数据结…

防火墙NAT地址转换的四种应用实验与防火墙的双机热备实验

一、NAT实验 一、源地址转换 1、首先搭建NAT实验环境的拓扑&#xff1a; 这里需要配置各个设备的ip、掩码、网关&#xff1b;省略 2、登录防火墙设备并且为防火墙设备的0/0/0接口配置与虚拟网卡一个网段的ip&#xff0c;并且开启该接口的全部服务 [USG6000V1]int gi 0/0/0…

Keil系列教程11_工程窗口图标说明

1写在前面 很多朋友看到如下工程窗口的图标&#xff08;如&#xff1a;带有“叹号”、“星号”、“钥匙”、“禁止驶入”标志&#xff09;&#xff0c;就会产生疑问&#xff1a;这些图标到底是啥意思呢&#xff1f; 其实&#xff0c;这些不同标志的图标是代表着不同的含义&…

AWVS 15.6 使用教程

目录 介绍 版本 AWVS具有以下特点和功能&#xff1a; 功能介绍&#xff1a; Dashboard功能&#xff1a; Targets功能&#xff1a; Scans功能&#xff1a; Vulnerabilities功能&#xff1a; Reports功能&#xff1a; Users功能&#xff1a; Scan Profiles功能&#x…

MyBatis查询数据库(2)

目录 前言&#x1f36d; 一、增删查改操作 1、查 Ⅰ、mapper接口&#xff1a; Ⅱ、UserMapper.xml 查询所有用户的具体实现 SQL&#xff1a; Ⅲ、进行单元测试 2、增、删、改操作 Ⅰ、增 添加用户 添加用户并且返回自增 id Ⅱ、改 根据id修改用户名 开启 MyBatis …

leetcode每日一练-第141题-环形链表

一、思路 双指针 二、解题方法 使用了正确的快慢环指针方法来判断链表。快指针每次向前移动两步&#xff0c;慢指针每次移动一步&#xff0c;如果链表中向前移动一步&#xff0c;它们最终会相遇。如果链表不存在环&#xff0c;快指针会先到达链表是否存在&#xff0c;此时存在…

【C#】using

文章目录 global 修饰符using 别名结合“global 修饰符”和“using 别名”static 修饰符来源 global 修饰符 向 using 指令添加 global 修饰符意味着 using 将应用于编译中的所有文件&#xff08;通常是一个项目&#xff09;。 global using 指令被添加到 C# 10 中。 其语法为…

怎么快速定位bug?怎么编写测试用例?

目录 01定位问题的重要性 02问题定位技巧 03初次怎么写用例 作为一名测试人员如果连常见的系统问题都不知道如何分析&#xff0c;频繁将前端人员问题指派给后端人员&#xff0c;后端人员问题指派给前端人员&#xff0c;那么在团队里你在开发中的地位显而易见 &#xff0c;口碑…

什么?按Home键SingleInstance Activity销毁了???

前段时间&#xff0c;突然有朋友询问&#xff0c;自己写的SingleInstance Activity在按home键的时候被销毁了&#xff0c;刚听到这个问题的时候&#xff0c;我直觉怀疑是Activity在onPause或者onStop中发生了Crash导致闪退了&#xff0c;但是安装apk查看现象&#xff0c;没有发…

摸索graphQL在前端vue中使用过程(四)

请求网址https://hasura.io/learn/graphql&#xff0c;他这个Authorization好像每天就会一次变化&#xff0c;需要注意。 之前用到了一种类型ID&#xff0c;也就是说&#xff0c;在GraphQL的查询标量的过程中。 标量:就是被查询的字段名称。 这里再补充一点知识&#xff0c;统…

Android 包体积资源优化实践

1 插件优化 插件优化资源在得物App最新版本上收益12MB。插件优化的日志在包体积平台有具体的展示&#xff0c;也是为了提供一个资源问题追溯的能力。 1.1 插件环境配置 插件首先会初始化环境配置&#xff0c;如果机器上未安装运行环境则会去oss下载对应的可执行文件。 1.2 图…

Windows 在VMware16.x安装Win11系统详细教程

文章目录 一、准备二、创建虚拟机1. 创建新的虚拟机2. 选择虚拟机硬件兼容性3. 安装客户机操作系统4. 选择客户机操作系统5. 命名虚拟机6. 固件类型7. 处理器配置8. 此虚拟机内存9. 网络类型10. 选择I/O控制器类型11. 选择磁盘类型12. 选择磁盘13. 指定磁盘容量14. 指定磁盘文件…

【深度学习】日常笔记15

训练集和测试集并不来⾃同⼀个分布。这就是所谓的分布偏移。 真实⻛险是从真实分布中抽取的所有数据的总体损失的预期&#xff0c;然⽽&#xff0c;这个数据总体通常是⽆法获得的。计算真实风险公式如下&#xff1a; 为概率密度函数 经验⻛险是训练数据的平均损失&#xff0c;⽤…

python机器学习(四)线性代数回顾、多元线性回归、多项式回归、标准方程法求解、线性回归案例

回顾线性代数 矩阵 矩阵可以理解为二维数组的另一种表现形式。A矩阵为三行两列的矩阵&#xff0c;B矩阵为两行三列的矩阵&#xff0c;可以通过下标来获取矩阵的元素&#xff0c;下标默认都是从0开始的。 A i j : A_{ij}: Aij​:表示第 i i i行&#xff0c;第 j j j列的元素。…

N位分频器的实现

N位分频器的实现 一、 目的 使用verilog实现n位的分频器&#xff0c;可以是偶数&#xff0c;也可以是奇数 二、 原理 FPGA中n位分频器的工作原理可以简要概括为: 分频器的作用是将输入时钟频率分频,输出低于输入时钟频率的时钟信号。n位分频器可以将输入时钟频率分频2^n倍…

linux进阶-I.MX 6ULL

目录 启动模式&#xff08;8引脚设置启动模式&#xff09; 对应原理图 boot ROM程序 空偏移 映像向量表&#xff08;Image vector table&#xff0c;IVT&#xff09; IVT结构体 Boot data DCD&#xff08;外设寄存器配置信息&#xff0c;初始化关键外设&#xff09; NXP…

如何使用 After Effects 导出摄像机跟踪数据到 3ds Max

推荐&#xff1a; NSDT场景编辑器助你快速搭建可二次开发的3D应用场景 在本教程中&#xff0c;我将展示如何在After Effects中跟踪实景场景&#xff0c;然后将相机数据导出到3ds Max。 1. 项目设置 步骤 1 打开“后效”。 打开后效果 步骤 2 转到合成>新合成以创建新合…

Docker Compose(九)

一、背景&#xff1a; 对于现代应用来说&#xff0c;大多数都是通过很多的微服务互相协同组成一个完整的应用。例如&#xff0c;订单管理、用户管理、品类管理、缓存服务、数据库服务等&#xff0c;他们构成了一个电商平台的应用。而部署和管理大量的服务容器是一件非常繁琐的事…

【图像处理】使用自动编码器进行图像降噪(改进版)

阿里雷扎凯沙瓦尔兹 一、说明 自动编码器是一种学习压缩和重建输入数据的神经网络。它由一个将数据压缩为低维表示的编码器和一个从压缩表示中重建原始数据的解码器组成。该模型使用无监督学习进行训练&#xff0c;旨在最小化输入和重建输出之间的差异。自动编码器可用于降维、…