RocketMQ为什么要保证订阅关系一致

news2024/11/19 16:23:21

这篇文章,笔者想聊聊 RocketMQ 最佳实践之一:保证订阅关系一致。

订阅关系一致指的是同一个消费者 Group ID 下所有 Consumer 实例所订阅的 Topic 、Tag 必须完全一致。

如果订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。

1 订阅关系演示

首先我们展示正确的订阅关系:多个 Group ID 订阅了多个 Topic,并且每个 Group ID 里的多个消费者的订阅关系保持了一致。

接下来,我们展示错误的订阅关系。

从上图中,单个 Group ID 订阅了多个 Topic,但是该 Group ID 里的多个消费者的订阅关系并没有保持一致。 

代码逻辑角度来看,每个消费者实例内订阅方法的主题、 TAG、监听逻辑都需要保持一致。

接下来,我们实验相同消费组,两种不正确的场景,看看消费者和 Broker 服务有什么异常。

订阅主题不同,标签相同

订阅主题相同,标签不同

2 订阅主题不同,标签相同


当我们启动两个消费者后,消费者组名:myconsumerGroup。C1 消费者订阅主题 TopicTest , C2 消费者订阅主题 mytest

在 Broker 端的日志里,会不停的打印拉取消息失败的日志 :

2023-10-09 14:52:53 WARN PullMessageThread_2 -

the consumer’s subscription not exist, group: myconsumerGroup, topic:TopicTest

那么在这种情况下,C1 消费者是不可能拉取到消息,也就不可能消费到最新的消息。

为什么呢 ? 我们知道客户端会定时的发送心跳包到 Broker 服务,心跳包中会包含消费者订阅信息,数据格式样例如下:

"subscriptionDataSet": [

 {

 "classFilterMode": false,

 "codeSet": [],

 "expressionType": "TAG",

 "subString": "*",

 "subVersion": 1696832107020,

 "tagsSet": [],

 "topic": "TopicTest"

 },

 {

 "classFilterMode": false,

 "codeSet": [],

 "expressionType": "TAG",

 "subString": "*",

 "subVersion": 1696832098221,

 "tagsSet": [],

 "topic": "%RETRY%myconsumerGroup"

 }

]

Broker 服务会调用 ClientManageProcessorheartBeat 方法处理心跳请求。

最终跟踪到代码: org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer

Broker 服务的会保存消费者信息,消费者信息存储在消费者表 consumerTable 。消费者表以消费组名为 key , 值为消费者组信息 ConsumerGroupInfo

#org.apache.rocketmq.broker.client.ConsumerManager

private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =

new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);

如果消费组的消费者信息 ConsumerGroupInfo 为空,则新建新的对象。

更新订阅信息时,订阅信息是按照消费组存放的,这步骤就会导致同一个消费组内的各个消费者客户端的订阅信息相互被覆盖。

回到消费者客户端,当消费者拉取消息时,Broker 服务会调用 PullMessageProcessor processRequest 方法 。

首先会进行前置判断,查询当前的主题的订阅信息若该主题的订阅信息为空,则打印告警日志,并返回异常的响应结果。


subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());    

if (null == subscriptionData) {

 log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), 

 response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);

 response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));

 return response;

}

通过调研 Broker 端的代码,我们发现:相同消费组的订阅信息必须保持一致,否则同一个消费组内的各个消费者客户端的订阅信息相互被覆盖,从而导致某个消费者客户端无法拉取到新的消息。

C1 消费者无法消费主题 TopicTest 的消息数据,那么 C2 消费者订阅主题 mytest,消费会正常吗 ?

上图来看,依然有问题。 主题 mytest 有四个队列,但只有两个队列被分配了, 另外两个队列的消息就没有办法消费了。

要解释这个问题,我们需要重新温习负载均衡的原理。

负载均衡服务会根据消费模式为” 广播模式” 还是 “集群模式” 做不同的逻辑处理,这里主要来看下集群模式下的主要处理流程:

(1) 获取该主题下的消息消费队列集合;

(2) 查询 Broker 端获取该消费组下消费者 Id 列表;

(3) 先对 Topic 下的消息消费队列、消费者 Id 排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列;

这里的平均分配算法,类似于分页的算法,将所有 MessageQueue 排好序类似于记录,将所有消费端排好序类似页数,并求出每一页需要包含的平均 size 和每个页面记录的范围 range ,最后遍历整个 range 而计算出当前消费端应该分配到的记录。

(4) 分配到的消息队列集合与 processQueueTable 做一个过滤比对操作。

 

消费者实例内 ,processQueueTable 对象存储着当前负载均衡的队列 ,以及该队列的处理队列 processQueue (消费快照)。

  1. 标红的 Entry 部分表示与分配到的消息队列集合互不包含,则需要将这些红色队列 Dropped 属性为 true , 然后从 processQueueTable 对象中移除。
  2. 绿色的 Entry 部分表示与分配到的消息队列集合的交集,processQueueTable 对象中已经存在该队列。
  3. 黄色的 Entry 部分表示这些队列需要添加到 processQueueTable 对象中,为每个分配的新队列创建一个消息拉取请求 pullRequest , 在消息拉取请求中保存一个处理队列 processQueue (队列消费快照),内部是红黑树(TreeMap),用来保存拉取到的消息。

最后创建拉取消息请求列表,并将请求分发到消息拉取服务,进入拉取消息环节。

通过上面的介绍 ,通过负载均衡的原理推导,原因就显而易见了。

C1 消费者被分配了队列 0、队列 1 ,但是 C1 消费者本身并没有订阅主题 mytest , 所以无法消费该主题的数据。

从本次实验来看,C1 消费者无法消费主题 TopicTest 的消息数据,C2 消费者只能部分消费主题 mytest 的消息数据。

但是因为在 Broker 端,同一个消费组内的各个消费者客户端的订阅信息相互被覆盖,所以这种消费状态非常混乱,偶尔也会切换成:C1 消费者可以部分消费主题 TopicTest 的消息数据,C2 消费者无法消费主题 mytest 的消息数据。

3 订阅主题相同,标签不同

如图,C1 消费者和 C2 消费者订阅主题 TopicTest ,但两者的标签 TAG 并不相同。

启动消费者服务之后,从控制台观察,负载均衡的效果也如预期一般正常。

笔者在 Broker 端打印埋点日志,发现主题 TopicTest 的订阅信息为 :


{

 "classFilterMode": false,

 "codeSet": [66],

 "expressionType": "TAG",

 "subString": "B",

 "subVersion": 1696901014319,

 "tagsSet": ["B"],

 "topic": "TopicTest"

}

 

那么这种状态,消费正常吗 ?笔者做了一组实验,消费依然混乱:

C1 消费者无法消费 TAG 值为 A 的消息 ,C2 消费者只能消费部分 TAG 值为 B 的消息

想要理解原因,我们需要梳理消息过滤机制。

首先 ConsumeQueue 文件的格式如下 :

  1. Broker 端在接收到拉取请求后,根据请求参数定位 ConsumeQueue 文件,然后遍历 ConsumeQueue 待检索的条目, 判断条目中存储 Tag 的 hashcode 是否和订阅信息中 TAG 的 hashcode 是否相同,若不符合,则跳过,继续对比下一个, 符合条件的聚合后返回给消费者客户端。
  2. 消费者在收到过滤后的消息后,也要执行过滤机制,只不过过滤的是 TAG 字符串的值,而不是 hashcode

我们模拟下消息过滤的过程:

首先,生产者将不同的消息发送到 Broker 端,不同的 TAG 的消息会发送到保存的不同的队列中。

C1 消费者从队列 0 ,队列 1 中拉取消息时,因为 Broker 端该主题的订阅信息中 TAG 值为 B ,经过服务端过滤后, C1 消费者拉取到的消息的 TAG 值都是 B , 但消费者在收到过滤的消息后,也需要进行客户端过滤,A 并不等于 B ,所以 C1 消费者无法消费 TAG 值为 A 的消息。

C2 消费者从队列 2, 队列 3 中拉取消息,整个逻辑链路是正常的 ,但是因为负载均衡的缘故,它无法消费队列 0 ,队列 1 的消息。

4 总结

什么是消费组 ?消费同一类消息且消费逻辑一致 。RocketMQ 4.X 源码实现就是为了和消费组的定义保持一致 。

规避订阅关系不一致这个问题有两种方式:

  • 合理定义好主题和标签

当我们定义好主题和标签后,需要添加新的标签时,是否可以换一个思路:换一个新的消费组或者新建一个主题。

  • 严格规范上线流程

在上线之前,梳理好相关依赖服务,梳理好上线流程,做好上线评审,并严格按照流程执行。

最后的思考:

假如从基础架构层面来思考,将订阅关系信息中心化来设计,应该也可以实现 ,但成本较高,对于中小企业来讲,并不合算。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1088842.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

FreeSOLO: Learning to Segment Objects without Annotations*(论文解析)

FreeSOLO: Learning to Segment Objects without Annotations* 摘要引言 摘要 实例分割是一项基本的计算机视觉任务&#xff0c;旨在识别并分割图像中的每个对象。然而&#xff0c;要学习实例分割通常需要昂贵的注释&#xff0c;例如边界框和分割掩模。在这项工作中&#xff0…

使用PyQt5创建图片查看器应用程序

使用PyQt5创建图片查看器应用程序 作者&#xff1a;安静到无声 个人主页 在本教程中&#xff0c;我们将使用PyQt5库创建一个简单的图片查看器应用程序。这个应用程序可以显示一系列图片&#xff0c;并允许用户通过按钮切换、跳转到不同的图片。 1. 准备工作 首先&#xff0…

小黑子—MyBatis:第四章

MyBatis入门4.0 十 小黑子进行MyBatis参数处理10.1 单个简单类型参数10.1.1 单个参数Long类型10.1.2 单个参数Date类型 10.2 Map参数10.3 实体类参数&#xff08;POJO参数&#xff09;10.4 多参数10.5 Param注解&#xff08;命名参数&#xff09;10.6 Param注解源码分析 十一 小…

CVE-2017-15715 apache换行解析文件上传漏洞

影响范围 httpd 2.4.0~2.4.29 复现环境 vulhub/httpd/CVE-2017-15715 docker-compose 漏洞原理 在apache2的配置文件&#xff1a; /etc/apache2/conf-available/docker-php.conf 中&#xff0c;php的文件匹配以正则形式表达 ".php$"的正则匹配模式意味着以.ph…

金蝶EAS、EAS Cloud远程代码执行漏洞

【漏洞概述】 金蝶 EAS 及 EAS Cloud 是金蝶软件公司推出的一套企业级应用软件套件&#xff0c;旨在帮助企业实现全面的管理和业务流程优化。 【漏洞介绍】 金蝶 EAS 及 EAS Cloud 存在远程代码执行漏洞 【影响版本】 金蝶 EAS 8.0&#xff0c;8.1&#xff0c;8.2&#xf…

风电光伏混合储能功率小波包分解、平抑前后波动性分析、容量配置、频谱分析、并网功率波动分析(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

流计算概述(林子雨慕课课程)

文章目录 11. 流计算概述11.1 流计算概述11.1.1 数据的处理模型11.1.2 流计算概念与典型框架 11.2 流计算处理流程11.3 流计算的应用11.4 开源流计算框架Storm11.4.1 Storm 简介11.4.2 Storm设计思想11.4.3 Storm框架设计 11.5 Spark Spark Streaming Samza以及三种流计算框架比…

Python如何17行代码画一个爱心

&#x1f308;write in front&#x1f308; &#x1f9f8;大家好&#xff0c;我是Aileen&#x1f9f8;.希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流. &#x1f194;本文由Aileen_0v0&#x1f9f8; 原创 CSDN首发&#x1f412; 如…

xtrabackup全备 增备

版本针对mysql8.0版本 官方下载地址 https://www.percona.com/downloads 自行选择下载方式 yum安装方式 1、下载上传服务器 安装软件 [rootmaster mysql]# ll percona-xtrabackup-80-8.0.33-28.1.el7.x86_64.rpm -rw-r--r--. 1 root root 44541856 Oct 10 13:25 percona-x…

android 判断是否打开了蓝牙网络共享

最近做项目遇到需要判断手机是否打开了蓝牙网络共享的开关 //调用isBluetoothPanTetheringOn(context) {if (it) {Log.i("TAG","已打开")} else {Log.i("TAG","未打开")context.gotoBleShareSettings()} }/*** 是否打开蓝牙网络共享**…

idea中取消class文件显示所有方法的显示

一 idea中class文件取消显示方法 1.1 取消显示方法 1.显示如下 2.操作如下 3.显示如下

2023年中国固废处理行业研究报告

第一章 行业概况 1.1 定义 固体废物处理是一个日益重要的领域&#xff0c;随着中国城市化进程的加速和工业产值的持续增长&#xff0c;固体废物的产生量也在不断上升。根据《固体废物污染环境防治法》的定义&#xff0c;固体废物包括了人类在生产、生活和其他活动中产生的固态…

基于单目的光流法测速

目录 1.简介 2.代码实现 1.简介 基于单目的光流法是一种常见的计算机视觉技术&#xff0c;用于估计图像序列中物体的运动速度。它通过分析连续帧之间的像素变化来推断物体在图像中的移动情况。 背景&#xff1a; 光流法是计算机视觉领域中最早的运动估计方法之一&#xff0c…

BUUCTF 金三 1

BUUCTF:https://buuoj.cn/challenges 题目描述&#xff1a; 只有一个附件&#xff0c;下载下来有一张GIF图片。 解题思路&#xff1a; 本题一共有2种解法&#xff08;本人找到的&#xff09; 方法一&#xff1a; 1、打开这张GIF图片&#xff0c;观察到不正常闪动&#xff0c;似…

《YOLO医学影像检测》专栏介绍 CSDN独家改进实战

&#x1f4a1;&#x1f4a1;&#x1f4a1;YOLO医学影像检测&#xff1a;http://t.csdnimg.cn/N4zBP ✨✨✨实战医学影像检测项目&#xff0c;通过创新点验证涨点可行性&#xff1b; ✨✨✨入门医学影像检测到创新&#xff0c;不断打怪进阶&#xff1b; 1.血细胞检测介绍 数据…

数据结构 - 2(顺序表10000字详解)

一&#xff1a;List 1.1 什么是List 在集合框架中&#xff0c;List是一个接口&#xff0c;继承自Collection。 Collection也是一个接口&#xff0c;该接口中规范了后序容器中常用的一些方法&#xff0c;具体如下所示&#xff1a; Iterable也是一个接口&#xff0c;Iterabl…

security+JWT

securityJWT 添加依赖准备工作sqlUserInfoUserMapperUserService、UserServiceImpl 创建JwtUtils工具类&#xff0c;做token的生成和校验进入Security创建AccountDetailsServiceImpl&#xff0c;并且实现UserDetailsService编写登录操作 创建拦截器JWTAuthenticationFilter继承…

mac电影特效合成软件nuke15 完美激活版下载

Nuke 15是一款由英国The Foundry公司开发的专业的合成软件&#xff0c;被广泛用于电影、电视和广告制作中的后期合成和特效制作。 Mac软件下载&#xff1a;nuke15 完美激活版下载 Win软件下载&#xff1a;NUKE 13 中文激活版 Nuke 15拥有强大的功能和灵活性&#xff0c;可以帮助…

TartanVO: A Generalizable Learning-based VO 服务器复现(rtx3090 py3)

源码地址 代码地址&#xff1a;https://github.com/castacks/tartanvo/tree/python3 配环境 git clone https://github.com/castacks/tartanvo.git -b python3创建conda环境&#xff1a; conda create -n tartanvo python3.8安装pytorch conda install pytorch1.10.1 torc…

路由router

什么是路由? 一个路由就是一组映射关系&#xff08;key - value&#xff09;key 为路径&#xff0c;value 可能是 function 或 component 2、安装\引入\基础使用 只有vue-router3&#xff0c;才能应用于vue2&#xff1b;vue-router4可以应用于vue3中 这里我们安装vue-router3…