RocketMQ你不得不了解的 Rebalance机制源码分析

news2025/1/12 15:47:17

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

RocketMQ版本

  • version: 5.1.0

RocketMQ中consumer消费模型

在了解RocketMQ的Rebalance机制之前,我们必须先简单了解下rocketmq的消费模型

我们知道在我们创建topic的时候需要指定一个参数就是读队列数

这里假设我们的topic是xiaozoujishu-topic,我们的读队列数
是4个,我们同一gid下的集群消费模式的消费者有两个,那么我们消费者是如何消费消息的呢
首先需要明确的是:

  1. 这里我们的消费模式是集群消费
  2. queue的负载均衡算法是使用默认的AllocateMessageQueueAveragely(平均分配)
    假设我们项目刚开始只有一个消费者,那么我们的消费队列分配就如下:

四个队列分配给一个消费者

此时如果我们再启动一个消费者,那么这时候就会进行Rebalance,然后此时我们的队列分配就变成如下:

所以通过上面的队列分配我就知道Rebalance是个啥了,我们下面对Rebalance进行一些定义

RocketMQ的Rebalance是什么

Rebalance(重新平衡)机制指的是:将一个Topic下的多个队列(queue),在同一个消费者组(consumer group)(gid)下的多个消费者实例(consumer instance)之间进行重新分配

Rebalance的目的

从上面可以看出Rebalance的本意是把一个topic的queue分配给合适的consumer,本意其实是为了提升消息的并行处理能力

但是Rebalance也带来了一些危害,后面我们会重点分析下

Rebalance的触发原因

我们这里先说结论

  1. 订阅Topic的队列数量变化
  2. 消费者组信息变化

这里是最深层的原因,就是topic的队列数量、消费组信息
实际我们可以将这些归结为Rebalance的元数据,这些元数据的变更,就会引起clinet的Rebalance

注意RocketMQ的Rebalance是发生在client

这些元数据都在管broker管理
核心就是这三个类

  • TopicConfigManager
  • SubscriptionGroupManager
  • ConsumerManager

只要这个三个类的信息有变化,client就会进行Rebalance
下面我们可以具体说下什么情况下会让这三个类变化

订阅Topic的队列数量变化

什么情况下订阅Topic的队列数量会变化呢?

  1. broker扩容
  2. broker缩容
  3. broker宕机(本质也是类似缩容)

消费者组信息变化

什么时候消费者组信息会变化呢?

核心就是consumer的上下线,具体细分又可以分为如下原因:

  1. 服务日常滚动升级
  2. 服务扩容
  3. 服务订阅消息发生变化

源码分析

上面大致介绍了Rebalance的触发原因,现在我们结合源码来具体分析下

我们就从consumer的启动开始分析吧

这里我们以最简单的demo为例

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
    consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe(TOPIC, "*");
    consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
//            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();

        System.out.printf("Consumer Started.%n");
    

这里我们直接注意到
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
这个方法,看名字就知道是client向所有的broker发送心跳

我们进入到sendHeartbeatToAllBrokerWithLock方法看看

    private void sendHeartbeatToAllBroker() {
        final HeartbeatData heartbeatData = this.prepareHeartbeatData();
        final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
        final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
        if (producerEmpty && consumerEmpty) {
            log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
            return;
        }

        if (this.brokerAddrTable.isEmpty()) {
            return;
        }
        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
        for (Entry<String, HashMap<Long, String>> brokerClusterInfo : this.brokerAddrTable.entrySet()) {
            String brokerName = brokerClusterInfo.getKey();
            HashMap<Long, String> oneTable = brokerClusterInfo.getValue();
            if (oneTable == null) {
                continue;
            }
            for (Entry<Long, String> singleBrokerInstance : oneTable.entrySet()) {
                Long id = singleBrokerInstance.getKey();
                String addr = singleBrokerInstance.getValue();
                if (addr == null) {
                    continue;
                }
                if (consumerEmpty && MixAll.MASTER_ID != id) {
                    continue;
                }

                try {
                    int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
                    if (!this.brokerVersionTable.containsKey(brokerName)) {
                        this.brokerVersionTable.put(brokerName, new HashMap<>(4));
                    }
                    this.brokerVersionTable.get(brokerName).put(addr, version);
                    if (times % 20 == 0) {
                        log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                        log.info(heartbeatData.toString());
                    }
                } catch (Exception e) {
                    if (this.isBrokerInNameServer(addr)) {
                        log.warn("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
                    } else {
                        log.warn("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                                id, addr, e);
                    }
                }
            }
        }
    }

这段代码主要是通过this.brokerAddrTable.entrySet()获取到所有的master broker地址,然后进行心跳发送

具体的心跳发送代码实际是在下面代码中进行的

                    int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());

我们进入到该方法会发现和我们之前分析的一样,就是发送一个请求到broker,请求码是RequestCode.HEART_BEAT

我们看看RequestCode.HEART_BEAT的调用找到`broker的处理逻辑

很快我们通过方法名就能定位到处理client的请求的方法是ClientManageProcessor类的processRequest

我们具体进去看看这个方法

可以看到具体的逻辑被封装在return this.heartBeat(ctx, request);这个方法中,所以我们需要再进去看看

进去这个方法我们能看到一个比较核心的方法
registerConsumer

很明显这个方法就是注册consumer的方法

这个方法里面和Rebalance相关比较核心的方法就是这三个

  1. consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);

这里我们可以看看clientChannelInfo里面是个啥玩意

具体深入到updateChannel方法里面就是判断是否为新的client,是就更新channelInfoTable

2.updateSubscription

这个方法就是判断订阅关系是否发生了变化并更新订阅关系

  1. callConsumerIdsChangeListener
  • callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
    这个方法就是通知client进行Rebalance,具体的实现是参考了类似事件订阅的方式去实现的,这里是发送了一个CHANGE事件

这里我们可以简单看看事件定义的类型有哪些

我们直接看看具体的事件处理类

可以看到实现类有多个,我们直接看broker模块的DefaultConsumerIdsChangeListener类即可

可以看到这里是给该group所有的client发送Rebalance消息

具体的消息状态码是
RequestCode.NOTIFY_CONSUMER_IDS_CHANGED

client Rebalance

通过上面我们大致找到了整个通信过程,但是实际的Rebalance是发生在client,所以我们还是需要继续回到client的代码

我们通过状态码RequestCode.NOTIFY_CONSUMER_IDS_CHANGED
找到client的处理类ClientRemotingProcessor

实际处理方法就是

this.mqClientFactory.rebalanceImmediately();

我们进入这个方法看看这里最终就是唤醒阻塞的Rebalance线程

所以实际的方法调用还是在RebalanceServicerun方法

最终还是调用的是MQConsumerInner接口中的doRebalance方法

这里有个细节,为什么不是直接调用一个静态方法,要搞这么多花里胡哨的唤醒线程操作?

原来是cleint也会定时去Rebalance
默认是20s一次,可以配置

可以通过参数rocketmq.client.rebalance.waitInterval去配置

那么为什么client还要自己去循环Rebalance

原来这里是防止因为网络等其他原因丢失了broker的请求,后续网络回复了,也能进行进行Rebalance

下面我们继续看看Rebalance的实现细节

这里我们以常用的DefaultMQPushConsumerImpl为例

实际这里最终调用的还是抽象类RebalanceImpldoRebalance方法

可以看到这里的Rebalance是按照topic的维度

我们先理解订阅单个topic的原理

这里的就是先对topic的queue排序,然后对consumer排序,
然后调用AllocateMessageQueueStrategy的allocate方法
这里我们暂时只分析默认的平均分配算法(AllocateMessageQueueAveragely),也就是我们最先说的分配算法。其他算法可以详细分析

这里的分配方式就是我们前面画图的,比如4个queue,2个consumer,那么就是每个consumer2个queue。
简单举例就是我们的 queue有q1、q2、q3、q4
consumer有 c1、c2

那么就是
c1:q1、q2
c2:q2、q3

需要注意的是如果consumer大于queue数量,多出的consumer就不会被分配到queue

client什么时候触发Rebalance

上面分析了这么多原理,这里我们总结下client什么时候会触发Rebalance

  1. consumer启动时会向所有master broker发送心跳,然后broker发送信息通知所有consumer触发Rebalance
  2. 启动完成后consumer会周期的触发Rebalance,防止因为网络等问题丢失broker的通知而没有Rebalance
  3. 当consumer停止时,也会通过之前分析的事件机制,触发注销comsuer事件然后通知所有的comsuer触发Rebalance

总结

这里我们详细介绍了client是如何触发Rebalance的,以及触发Rebalance的时机,也介绍了Rebalance的好处。
实际还有很多细节我们限于篇幅暂未分析。
后面我们会继续分析Rebalance的坏处和一些详细的Rebalance算法

参考

  • RocketMQ源码
  • 博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/537960.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

chatgpt赋能Python-left函数_python

Left 函数在Python中的使用及其优点 在Python编程语言中&#xff0c;字符串处理是不可避免的任务。Python提供了许多内置函数来处理字符串&#xff0c;其中left()函数是其中一个非常重要的函数。本文将介绍left()函数的用法、优点和一些实例&#xff0c;以便更好的理解该函数。…

redis高级篇(2)---主从

一)搭建主从架构: 单节点Redis的并发能力是有限的&#xff0c;所以说要想进一步提高Redis的并发能力&#xff0c;就需要搭建主从集群&#xff0c;实现读写分离&#xff0c;因为对于Redis来说大部分都是读多写少的场景&#xff0c;更多的要进行读的压力&#xff0c;最基本都要是…

【Android学习专题】java基本语法和概念(学习记录)

学习记录来自菜鸟教程 Java 变量 Java 中主要有如下几种类型的变量 局部变量 在方法、构造方法或者语句块中定义的变量被称为局部变量。变量声明和初始化都是在方法中&#xff0c;方法结束后&#xff0c;变量就会自动销毁类变量&#xff08;静态变量&#xff09; 类变量也声…

chatgpt赋能Python-libreoffice_python宏

介绍 LibreOffice是一套免费开源的办公软件&#xff0c;其中包含一个强大的Python宏系统&#xff0c;可以使用Python编写脚本来增强办公软件的功能。本文将介绍LibreOffice Python宏是什么&#xff0c;如何使用Python编写宏&#xff0c;并提供一些示例&#xff0c;以便读者可以…

去付款--支付宝沙箱的简单测试

alipay-demo 进入开发者中心–开发工具–沙箱–设置公钥 搜索电脑网上支付–查看Demo–查看配置类–查看业务逻辑 我们的基础配置类主要是初始化我们的alipay客户端 真正去付款的时候是提交了一个form表单达到一个真正的支付jsp,java代码首先初始化我买的Alipay客户端&#xf…

瑞吉外卖 - 新增分类功能(11)

某马瑞吉外卖单体架构项目完整开发文档&#xff0c;基于 Spring Boot 2.7.11 JDK 11。预计 5 月 20 日前更新完成&#xff0c;有需要的胖友记得一键三连&#xff0c;关注主页 “瑞吉外卖” 专栏获取最新文章。 相关资料&#xff1a;https://pan.baidu.com/s/1rO1Vytcp67mcw-PD…

网安学习踩坑经验篇

回想学习网络安全一年来&#xff0c;踩了不少坑走了不少弯路&#xff0c;在此稍作总结&#xff0c;希望可以帮助那些想要入门 web 安全或者是想打CTF的同学们一些建议 坑点 先总结一下&#xff0c;我在学习中遇到的坑点 只看视频&#xff0c;眼高手低&#xff0c;不练习&…

【嵌入式Linux】设备树基本语法

设备树基本语法 1_总领-本期设备树视频要怎么讲&#xff1f;讲什么&#xff1f;_哔哩哔哩_bilibili 基本的 特殊的 中断控制 描述GIC控制器 时钟 CPU GPIO 个数&#xff0c;保留范围&#xff08;起始、长度&#xff09;&#xff0c;个数对应的名字 GPIO映射-这个脚被用了换一…

chatgpt赋能Python-numpy_归一化

NumPy归一化&#xff1a;理解数据规范化的重要性 什么是归一化&#xff1f; 在数据科学和机器学习中&#xff0c;归一化是预处理数据的一种常用技术。归一化是指将数据缩放到一个特定的范围内&#xff0c;通常是0到1或-1到1之间。 例如&#xff0c;我们可能比较一家医院的三…

渗透测试--5.3.使用john破解密码

前言 由于Linux是Internet最流行的服务器操作系统&#xff0c;因此它的安全性备受关注。这种安全主要靠口令实现。 Linux使用一个单向函数crypt&#xff08;&#xff09;来加密用户口令。单向函数crypt&#xff08;&#xff09;从数学原理上保证了从加密的密文得到加密前的明…

Java笔记_22(反射和动态代理)

Java笔记_22 一、反射1.1、反射的概述1.2、获取class对象的三种方式1.3、反射获取构造方法1.4、反射获取成员变量1.5、反射获取成员方法1.6、综合练习1.6.1、保存信息1.6.2、跟配置文件结合动态创建 一、反射 1.1、反射的概述 什么是反射? 反射允许对成员变量&#xff0c;成…

基于IC5000烧录器使用winIDEA烧写+调试程序(S32K324的软件烧写与调试)

目录 一、iSYSTEM简介二、如何使用iSYSTEM winIDEA烧写调试程序2.1 打开winIDEA&#xff1a;2.2 新建一个Workspace;2.3 硬件配置:2.4 选择CPU芯片型号&#xff1a;2.5 加载烧写文件&#xff1a;2.6 开始烧录程序&#xff1a;2.7 程序调试Debug&#xff1a;2.7.1 运行程序&…

PCL点云处理之单点选择的交互操作(一百六十七)

PCL点云处理之单点选择的交互操作(一百六十七) 一、效果展示二、实现代码一、效果展示 交互选择点,输出点信息,具体如下图所示 二、实现代码 #include <pcl/io/pcd_io.h> #include <pcl/point_types.h> #include <pcl/visualization/pcl_visu

Spring Cloud Alibaba(二)Nacos统一配置管理

目录 一、为什么需要配置中心 二、常用的配置中心 Nacos 的几个概念 三、Nacos配置中心的使用 &#xff08;一&#xff09;properties格式 1、导入依赖 2、在配置中心新建配置 3、修改配置文件名为bootstrap.yml 4、在微服务中添加nacos config服务地址的配置 5、测试…

chatgpt赋能Python-numpy精度

Numpy精度介绍 Numpy是一个用于进行科学计算的Python库&#xff0c;它提供了多维数组对象以及一系列用于操作数组的函数。Numpy的广泛使用使其成为数据科学中的重要组成部分。然而&#xff0c;Numpy中的精度问题却常常被忽视。 浮点数精度问题 在Numpy中&#xff0c;浮点数是…

STL与string类的认识及简单使用

STL与string类的认识及简单使用 一、STL二、string类构造函数容量操作访问及遍历操作迭代器 修改操作非成员函数重载关系运算符重载getline 三、总结 一、STL STL(standard template libaray-标准模板库)&#xff1a;是C标准库的重要组成部分&#xff0c;不仅是一个可复用的组…

【计算机毕设】基于SringBoot+Vue的校园二手交易平台(含支付)

在导师的严格指导下&#xff0c;我的毕业设计终于完成了&#xff0c;毕设被推优算是给大学生活画上了圆满的句号&#xff0c;几个月的努力也没白费。在开发的过程中收获了很多&#xff0c;也遇到很多问题&#xff0c;但因怕时间来不及&#xff08;根本不知道截止时间TvT&#x…

点餐小程序实战教程04-餐品分类及餐品数据源设计

我们已经利用一定篇幅实现了店铺信息展示的功能,本篇我们来实现一下点餐的逻辑。点餐的逻辑有以下: 用户打开点餐页面,利用侧边栏导航来切换菜品初始状态用户未点餐,显示一个加号的图标点击加号显示数量,需要将菜品加入购物车,购物车显示选购菜品的数量和总价点击减号可以…

[日记]LeetCode算法·二十五——二叉树⑤ AVL树(插入+删除)附代码实现

本章的代码实现基于上一篇BST与优先队列的基类进行平衡二叉树&#xff0c;即AVL树。 文章目录 AVL的概念AVL查询效率AVL的插入1.插入节点2.更新平衡因子BF3.旋转调整树的结构3.1 LL 右旋3.2 RR 左旋3.3 LR 左右双旋3.4 RL 右左双旋 4 插入总结 AVL的删除1.寻找删除节点2.更新平…