【RcoketMQ】RcoketMQ 5.0新特性(二)- Pop消费模式

news2025/2/27 8:15:43

Pop模式消费和消息粒度负载均衡

在RocketMQ 5.0之前,消费有两种方式可以从Broker获取消息,分别为Pull模式和Push模式。

  • Pull模式:消费需要不断的从阻塞队列中获取数据,如果没有数据就等待,这个阻塞队列中的数据由消息拉取线程从Broker拉取消息之后加入的,所以Pull模式下消费需要不断主动从Broker拉取消息。
  • Push模式:需要注册消息监听器,当有消息到达时会通过回调函数进行消息消费,从表面上看就像是Broker主动推送给消费者一样,所以叫做推模式,底层依旧是消费者从Broker拉取数据然后触发回调函数进行消息消费,只不过不需要像Pull模式一样不断判断是否有消息到来。


注:图片来自RocketMQ官方文档

不过不管是Pull模式还是Push模式,在集群模式下,一个消息队列只能分配给同一个消费组内的某一个消费者进行消费,所以需要进行Rebalance负载均衡为每个消费者分配消息队列之后才可以进行消息消费。
Rebalance的工作是在每个消费者端进行的,消费端负责的工作太多,除了负载均衡还有消费位点管理等功能,如果新增一种语言的支持,就需要重新实现一遍对应的业务逻辑代码。

除此以外,在RocketMQ 5.0以前负载均衡是以消息队列为维度为每个消费者分配的,一个消息队列只能分给组内一个消费者消费,所以会存在以下问题:

(1)队列只能分给组内一个消费者消费,也就无法通过扩展消费者的数量来提升消费能力;
(2)消息队列数量与消费者数量比例不均衡时,可能会导致某些消费者没有消息队列可以分配或者某些消费者承担过多的消息队列,分配不均匀;
(3)如果某个消费者hang主,会导致分配到该消费者的消息队列中的消息无法消费,导致消息积压;

在RocketMQ 5.0增加了Pop模式消费,将负载均衡、消费位点管理等功能放到了Broker端,减少客户端的负担,使其变得轻量级,并且5.0之后支持消息粒度的负载均衡。

消息粒度负载均衡

对于PushConsumer和SimpleConsumer类型的消费者,默认且仅使用消息粒度负载均衡策略。

注:图片来自RocketMQ官方文档

消息粒度负载均衡策略中,同一消费组内的多个消费者将按照消息粒度平均分摊主题中的所有消息,即同一个队列中的消息,可被平均分配给组内多个消费者共同消费。

消息粒度负载均衡策略保证同一个队列的消息可以被组内多个消费者共同处理,但是该策略使用的消息分配算法结果是随机的,不能指定消息被哪一个特定的消费者处理。当消费者获取到某条消息后,服务端会对该消息加锁,保证该消息对其他消费者不可见,直到消息消费成功或者超时,所以多个消费者同时消费同一个消息队列中的消息,服务端也可以保证消息不会被多个消费者重复消费。

消息粒度负载均衡策略适用于绝大多数在线处理的业务场景。

Pop消息消费

首先客户端(消费者)向服务端(Broker)发送Pop请求,Broker端收到请求后以Pop模式获取消息,之后返回给客户端,客户端消费消息成功之后,向Broker发送ACK请求确认消息消费成功。

当POP出一条消息之后,这条消息就会在一段时间内不可见,在这个时间段内,这条消息不会再被POP出来,如果在这个期间未能收到该消息的ACK请求,过了这个不可见的时间之后,消息就会恢复可见状态,重新被消费。

POP的消费位点由Broker保存和控制,并且POP模式可以使多个消费者端消费同一个消息队列中的消息,消费者端不再需要在本地做负载均衡分配消息队列,只需要调用服务端提供的POP接口获取消息进行消费即可,即便某个消费者hang住,其他消费者依旧可以继续消费队列中的数据,不会造成消息堆积。

POP消息在Broker端的实现

  1. Broker端在处理POP请求时,先在队列维度加锁,保证同一时间只有一个消费者可以从该队列中获取消息;

  2. Broker端会从队列中获取一批消息,并构建这批消息对应的CheckPoint信息保存在Broker中,之后会与ACK的消息进行匹配;
    CheckPoint主要包括消息的 Topic,ConsumerGroup,QueueId,offset,POPTime,msgCout,reviveQueueId等信息。

  3. CheckPoint会优先保存在内存中,如果在一段时间内收到了客户端的ACK消息,就会将对应的CheckPoint清除,并更新消费进度;

  4. 对于一段时间内为收到ACK消息的CheckPoint,会将其从内存中删除,然后发送到延时主题SCHEDULE_TOPIC_XXXX中,到达延时时间之后,消息会再被转发到REVIVE_TOPIC(会使用REVIVE_LOG_ + 集群名称作为主题)中,有一个线程去处理REVIVE_TOPIC中的数据,将里面的消息拉取放入到一个
    MAP中,如果后续收到对应的ACK消息,则会更新REVIVE_TOPIC主题中的消费位点标识消息消费完成,如果过了一定时间依旧未收到对应的ACK消息,会查找这个CheckPoint对应的真实消息,将其放入到重试队列中,等待客户端消费,所以消费者消费的时候有一定概率可以消费到重试队列中的消息。

由于一个消息队列中的消息可以被多个消费者消费,如果某个消费者在消费某条消息之后一直未发生ACK消息,那么Broker是如何管理消费进度的,比如队列1中有1、2、3、4、5条消息,此时有三个消费者1、2、3,分别分配到了队列中的1、2、3条消息,此时消费者1已经对消息1ACK完毕,消费者3也对消息3ACK完毕,消费者2一直未ACK消息2,那么Broker如何设置消费进度?

个人认为,在一段时间内消息2对应的CheckPoint未匹配到对应的ACK消息,为了保证消费可以继续向后消费消息,应该会推进消费进度跳过这个消息,对于消息2,会按照超时处理逻辑,将其对应的CheckPoint先放入延时队列,再放入REVIVE_TOPIC中,之后等待ACK,如果之后一直还未收到ACK再将其放入重试队列,等待重新消费。

参考
RocketMQ官方文档

RocketMQ 5.0 POP 消费模式探秘

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1087426.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springBoot 自动配置机制

springBoot 自动配置机制 自动配置的tomcat、springmvc等默认包扫描规则自定义扫描路径配置默认值按需加载自动配置总结 自动配置的tomcat、springmvc等 导入场景,容器中就会自动配置好这个场景的核心组件 以前:dispatcherservlet、viewresolver、chara…

MyCat分片规则

MyCat是一个开源的数据库中间件,它可以实现对MySQL数据库进行分片和负载均衡。在分片规则方面,MyCat支持以下几种常见的分片方式: 范围分片 根据指定的字段及其配置的范围与数据节点的对应情况, 来决定该数据属于哪一个分片。 …

LeetCode【438】找到字符串中所有字母异位词

题目&#xff1a; 注意&#xff1a;下面代码勉强通过&#xff0c;每次都对窗口内字符排序。然后比较字符串。 代码&#xff1a; public List<Integer> findAnagrams(String s, String p) {int start 0, end p.length() - 1;List<Integer> result new ArrayL…

Leetcode——统计数组中的元素练习

错误的集合 此题没做出来 class Solution { public:vector<int> findErrorNums(vector<int>& nums) {vector<int> errorNums(2);int n nums.size();sort(nums.begin(), nums.end());// 记录上一个元素int prev 0;for (int i 0; i < n; i) {int cu…

什么是指标体系,怎么搭建一套完整的指标体系?(附PDF素材)

什么是指标体系&#xff0c;怎么搭建一套完整的指标体系&#xff1f;数字化转型过程中&#xff0c;这个问题一直困扰着数据分析师。主要体现在&#xff1a; 各部门根据业务需求&#xff0c;都有一部分量化指标&#xff0c;但不够全面&#xff0c;对企业整体数据分析应用能力提…

4.0 Beta2版本编译RK3588错误问题解决

最近有小伙伴在问4.0 Beta2版本编译RK3588&#xff08;也就是dayu210&#xff09;时&#xff0c;会有各种莫名奇妙的报错 &#xff08;1&#xff09;subsystem name config incorrect in ....... 这个原因是OH代码加入了编译检查&#xff0c;临时措施是把需要编译检查的文件放…

提升自媒体影音创作效率,这 10 款 AI 工具打工人必备!

随着AI工具的不断涌现&#xff0c;自媒体影音创作的效率也得到了提升&#xff0c;本次为大家介绍10款AI影音工具&#xff0c;为你的自媒体创作助力&#xff01; 还是先上一张脑图&#xff1a; 自媒体必备AI工具 1. Runway AI视频编辑工具&#xff0c;支持文字转视频 目前最…

Elasticsearch:使用 Langchain 和 OpenAI 进行问答

这款交互式 jupyter notebook 使用 Langchain 将虚构的工作场所文档拆分为段落 (chunks)&#xff0c;并使用 OpenAI 将这些段落转换为嵌入并将其存储到 Elasticsearch 中。然后&#xff0c;当我们提出问题时&#xff0c;我们从向量存储中检索相关段落&#xff0c;并使用 langch…

UnityShaderLab —— 简单的流光shader

原理&#xff1a; 就是在原先的模型表面叠加一层可以流动的图片&#xff0c; 算法代码&#xff1a; float2 tex; tex float2(i.uv.x - _Time.x * _Speed,i.uv.y); fixed4 col0 tex2D(_Tex, tex)* _Strenth; fixed4 col1 tex2D(_MainTex, i.uv); return col0 col1; 这里…

ADS版图中连接提示线设置

ADS版图连接提示线设置 简述solve 简述 在ADS版中连接提示线设置&#xff0c;如下图1所示&#xff0c;有点类似于AD中“金线”&#xff0c;提示同一网络的焊盘&#xff0c;但在ads中&#xff0c;是产生了同一层的wire&#xff0c;证据如图2所示。如果没有设置的话&#xff0c;…

【Java学习之道】字符串处理工具类

引言 在Java中&#xff0c;我们常常需要处理字符串&#xff0c;比如读取用户输入、解析文件、网络通信等等。Java提供了一个强大的字符串处理工具类——java.lang.String。这个类包含了各种有用的方法&#xff0c;能帮你轻松地处理字符串。在这一节中&#xff0c;我们将介绍几…

基于 SpringBoot+Hikvision SDK 远程查看配置海康网络摄像头配置

写在前面 工作中遇到&#xff0c;简单整理理解不足小伙伴帮忙指正 对每个人而言&#xff0c;真正的职责只有一个&#xff1a;找到自我。然后在心中坚守其一生&#xff0c;全心全意&#xff0c;永不停息。所有其它的路都是不完整的&#xff0c;是人的逃避方式&#xff0c;是对大…

【C++杂货铺】一文带你走进哈希:哈希冲突 | 哈希函数 | 闭散列 | 开散列

文章目录 一、unordered 系列关联式容器二、unordered_map1.1 unordered_map 介绍1.2 unordered_map 的接口说明1.2.1 unordered_map 的构造1.2.2 unordered_map 的容量1.2.3 unordered_map 的迭代器1.2.4 unordered_map 的元素访问1.2.5 unordered_map 的查询1.2.6 unordered_…

【正点原子STM32连载】 第四十二章 IIC实验 摘自【正点原子】APM32F407最小系统板使用指南

1&#xff09;实验平台&#xff1a;正点原子stm32f103战舰开发板V4 2&#xff09;平台购买地址&#xff1a;https://detail.tmall.com/item.htm?id609294757420 3&#xff09;全套实验源码手册视频下载地址&#xff1a; http://www.openedv.com/thread-340252-1-1.html## 第四…

Zookeeper【Curator客户端Java版】从0到1——万字学习笔记

目录 初识Zookeeper Zookeeper作用 维护配置信息 分布式锁服务 集群管理 生产分布式唯一ID Zookeeper的设计目标 Zookeeper 工作机制 数据模型 ZooKeeper 命令操作 服务端常用命令 客户端常用命令 ZooKeeper JavaAPI操作 Curator 介绍 Curator API 常用操作 导入依赖 建立连接 …

腾讯wifi码推广如何代理?方法详解!

腾讯wifi码推广是一种利用微信扫码连接商家wifi的方式&#xff0c;用户看完广告后就可以免费上网&#xff0c;而推广者则可以获得广告收益。 那么怎样代理腾讯wifi码推广呢&#xff1f; 答案是腾讯官方没有这个项目&#xff0c;那是怎么回事呢&#xff0c;腾讯wifi码正确的名称…

时间复杂度O(n)

目录 一. 前言 二. 时间频度和空间复杂度 2.1. 时间频度 2.2. 空间复杂度 三. 时间复杂度 3.1. 概念 3.2. 常见的时间复杂度 3.3. 计算实例 四. 大O记法 五. 对数log小知识 一. 前言 同一问题可用不同算法解决&#xff0c;而一个算法的质量优劣将影响到算法乃至程序的…

【计算机网络笔记】数据交换之电路交换

系列文章目录 什么是计算机网络&#xff1f; 什么是网络协议&#xff1f; 计算机网络的结构 文章目录 系列文章目录为什么需要数据交换数据交换的类型电路交换什么是多路复用&#xff1f;频分多路复用&#xff08;FDM&#xff09;时分多路复用&#xff08;TDM&#xff09;波分…

深度神经网络压缩与加速技术

// 深度神经网络是深度学习的一种框架&#xff0c;它是一种具备至少一个隐层的神经网络。与浅层神经网络类似&#xff0c;深度神经网络也能够为复杂非线性系统提供建模&#xff0c;但多出的层次为模型提供了更高的抽象层次&#xff0c;因而提高了模型的能力。深度神经网络是一…