RocketMQ 消费者Rebalance 解析——图解、源码级解析

news2025/1/12 1:35:26

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年4月15日

🍊 新专栏筹备中,还是熟悉的源码,还是熟悉的感觉!

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • 什么是消息负载均衡?
  • Rebalance的触发条件
  • 负载策略使用方法
  • 消息消费默认策略

什么是消息负载均衡?

Rebalance机制: 将一个Topic下的多个队列在同一个消费者组下的多个消费者实例之间进行重新分配。该机制的目的是为了提高消息的并行处理能力

例如: 一个Topic下有5个队列,如果只有一个消费者的话,这个消费者就会处理所有队列的消息。如果有两个消费者的话,就可以两个消费者共同处理这5个队列

在这里插入图片描述


但Rebalance机制也存在明显的限制与危害:

  • 如果消费者组下的消费者实例数量大于队列数量时,多余的消费者将分配不到任何实例
  • 消费暂停: 只有Concumer 1时,其负责所有队列的消息处理;如果此时新增了Consumer 2,触发Rebalance时,需要给它分配几个队列,此时Concumer 1就需要停止这几个队列的消费
  • 重复消费: Concumer 2在消费分配到自己的队列时,必须从Concumer 1已经消费到的地方(offset)开始继续消费。默认情况下,offset是异步提交的,如果Concumer 1之前消费到了第10条数据,但此时Broker记录的offset还是第8条数据,如果Concumer 2从第8条数据开始消费的话,就会有两条消息重复。异步提交offset的间隔时间越久,可能造成的重复消费就越多
  • 消费洪峰: 如果因为Rebalance机制造成了需要重复消费的消息过多 或者 由于Rebalance导致的暂停时间过长,导致有消息积压,就有可能在Rebalance之后需要瞬间消费很多消息

Broker起到的作用: Broker端主要负责Rebalance相关的元数据的维护,通知机制,扮演协调者的角色



Rebalance的触发条件

从根本上来看,触发Rebalance的原因只有两个:

  1. Topic下的队列数量变化
  2. Consumer组信息变化

其中会导致这两项变化的典型场景为:

队列数量变化典型场景:
1. Broker宕机
2. Broker停机维护
3. 队列扩容 / 缩容
Consumer组信息变化典型场景:
1. 消费者机器启动 / 停止
2. 消费者宕机
3. 网络异常导致消费者与Broker断联
4. 消费者扩容 / 缩容
5. Topic订阅信息变化

无论发生上面的哪种情况,Broker都会主动通知消费者组下的所有实例进行Rebalance,相关源码如ConsumerManagerregisterConsumer方法所示:

public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
        final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
    // 查找consumer组信息,如果没有则创建一个新的
    // consumerTable:维护所有的Consumer
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    
    if (null == consumerGroupInfo) {
        ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        // put到map里
        ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
        consumerGroupInfo = prev != null ? prev : tmp;
    }
    // 更新Consumer信息,客户端信息,返回消费者组下实例信息是否变化
    boolean r1 =
        consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                                        consumeFromWhere);
    // 更新订阅Topic信息,返回消费者订阅信息是否变化
    boolean r2 = consumerGroupInfo.updateSubscription(subList);
    
    // 如果消费者实例 或 消费者订阅信息变化,则rebalance
    if (r1 || r2) {
        if (isNotifyConsumerIdsChangedEnable) {
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }
    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
    return r1 || r2;
}

负载策略使用方法

消费者使用不同的消费附在策略只需要简单地使用set方法即可,即直接使用consumer.setAllocateMessageQueueStrategy(策略对象)即可,源码如下所示

public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group Name");
        consumer.subscribe("Target Topic", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 设置平均负载策略
        consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
        
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }

消息消费默认策略

如果没有手动指定负载均衡均衡策略算法,RocketMQ将会使用内部的默认策略,DefaultMQPushConsumer通过构造函数内设置AllocateMessageQueueAveragely来指定默认策略。

在这里插入图片描述

在RocketMQ的实现中,Producer会把消息发送给对应的Topic,同一个Topic下的所有消息会被负载均衡至多个Queue里,之后消息队列的Broker会将同一个Topic下的所有Queue再分配至订阅了该Topic的Consumer组里,再由组内所有消费者进行消费。

消费者机器与Queue的个数可能有以下几种情况:

  1. 消费者机器数大于Queue个数时:部分机器会处理不到消息
    在这里插入图片描述

  2. 消费者机器数等于Queue个数时:每个消费者负责消费一个Queue里的消息
    在这里插入图片描述

  3. 消费者机器数小于Queue个数时:每个消费者可能消费多个Queue里的消息
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/429238.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

gRPC 四种RPC类型异同

gRPC定义了如下四种RPC&#xff0c;刚开始接触的时候&#xff0c;感觉挺奇怪的&#xff0c;RPC不就是接口调用吗&#xff0c;区分这么多干啥&#xff1f;难道实现原理不一样&#xff1f;未读源码之前&#xff0c;还想着有啥神秘的地方&#xff0c;看完源码之后&#xff0c;才发…

Guns社区医疗项目

又是一年毕业季&#xff0c;计算机专业大四的同学们要接受毕业设计的考验啦。又有多少同学为了毕业设计而愁眉苦脸&#xff0c;心力憔悴。考虑到这些&#xff0c;这里为同学们分享一个适合你们毕业设计的作品以及详细介绍&#xff0c;让正在焦头烂额的同学们有所启发&#xff0…

MPC的560x系列的运行模式的介绍

一、模式简介 1、运行模式 一共11种模式&#xff0c;分别为RESET、DRUN、SAFE、TEST、RUN0、RUN1、RUN2、RUN3、HALT、STOP、STANDBY。其中RESET、DRUN、SAFE、TEST是系统工作模式&#xff0c;用户不用个特别关系&#xff0c;而后面几种是用于经常使用到的工作模式。 RESET&a…

ASP.NET Core - 依赖注入(二)

2&#xff0c;NET Core 依赖注入的基本用法 话接上篇&#xff0c;这一章介绍 .NET Core 框架自带的轻量级 Ioc 容器下服务使用的一些知识点&#xff0c;大家可以先看看上一篇文章 [ASP.NET Core - 依赖注入(一)] 2.3 服务解析 通过 IServiceCollection 注册了服务之后&#xf…

五一假期将近,给景区视频监控方案提几点建议

一、行业背景 随着旅游业的不断发展&#xff0c;旅游安全问题越来越受到重视。尤其是五一假期将近&#xff0c;为确保游客在景区内的人身安全和财产安全&#xff0c;景区必须采用高效、可靠的安防视频监控系统&#xff0c;并进行科学规划和设计&#xff0c;从而实现及时发现安…

Simulink simscape绳索和滑轮的使用总结

在做仿真的时候使用了绳索和滑轮&#xff0c;网上的资料不是很多&#xff0c;所以想想还是自己做一下记录&#xff0c;提供了一个小案例&#xff0c;文件上传到我的资源里了。 1.滑轮约束 Pulley 这个模块可以建立滑轮和绳索之间的约束。这个约束保证了绳子和滑轮之间 The pull…

Memory Analyzer Mat

目录 一、JDK 、JRE和JVM 的关系 二、Java进程内存占用查询命令 2.1JAVA 代码是如何执行的 2.2何时用hrpof文件分析内存 三、Memory Analyzer Mat 3.1Memory Analyzer Mat安装 3.2 Overview视图 3.2.1直方图视图&#xff08;histogram&#xff09; 3.2.2 Dominator Tr…

计算机图形学 Animation 学习笔记

1、做插值&#xff1a; 线性插值和用贝塞尔曲线做插值&#xff0c;贝塞尔曲线做插值可以看起来更生动形象 2、物理模拟&#xff08;Simulate Physics&#xff09; 原理是利用“ 力 质量 x 加速度”&#xff0c;知道了这些信息&#xff0c;那么下一帧的位置就可以提前被运算出来…

Ethercat学习-从站FOE固件更新(TwinCAT主站)

文章目录简介协议说明1.读请求2.写请求3.数据4.应答5.错误码6.忙数据传输流程1.读流程2.写流程3.忙操作代码实现1.源码生成与移植2.代码解析1.FOE_ServiceInd2.FOE_Read3.FOE_Write4.FOE_Ack5.FOE_Data6.FOE_Error7.FOE_Busy其他TwinCAT测试简介 FOE(File Access over Etherca…

React从入门到入土系列3-使用React构建你的应用

这是我自己系统整理的React系列博客&#xff0c;主要参考2023年3月开放的最新版本react官网内容&#xff0c;欢迎你阅读本系列内容&#xff0c;希望能有所收货。 本文是该系列的第3篇文章&#xff0c;阅读完本文后你将收获&#xff1a; 如何使用React逐步构建你的应用了解prop…

《分布式商业》

读完《分布式商业&#xff1a;数字化时代的新商业变革》&#xff0c;说实话&#xff0c;如果读者没有技术研发背景&#xff0c;读完此书&#xff0c;是很难在技术和商业层面引起共鸣。我甚至觉得&#xff0c;这本书就是写给技术类岗位的同学看的&#xff1b;而写这本书的人&…

Softing工业边缘产品的新功能助力工业数据集成到IT解决方案中

Softing的edgeConnector和edgeAggregator产品在3.40版本中新增了一系列功能&#xff0c;使数据集成到IT解决方案变得更加容易。 基于Docker的edgeConnector产品系列支持访问SIMATIC S7、SINUMERIK 840D和Modbus TCP等控制器中的过程数据。同样基于Docker的edgeAggregator产品可…

柔性数组【结构体和动态内存的结合】

全文目录前言柔性数组的定义语法柔性数组的特点柔性数组的使用柔性数组的优势前言 很多人可能没有听过柔性数组这个概念&#xff0c;但是在C99中柔性数组是确实存在的。我个人感觉有点像动态内存和结构体的结合。 柔性数组的定义语法 结构中的最后一个元素允许是未知大小的数…

一起学 WebGL:绘制三角形

大家好&#xff0c;我是前端西瓜哥。画了好几节课的点&#xff0c;这次我们来画三角形了。 三角形可太重要了&#xff0c;再复杂的三维模型都是由一个个小三角形组合而成&#xff0c;越多越精细越真实。 绘制三角形 这次绘制三角形&#xff0c;要绘制的点就有三个了&#xf…

C语言之 单链表1(simply linked list)

单链表 链表优点&#xff1a; 1.按需申请空间&#xff0c;需要就申请&#xff0c;不需要就释放 2.头部或中间插入数据&#xff0c;不需要挪动数据 3.不存在空间浪费 缺点&#xff1a; 1.每次存放一个数据&#xff0c;到要存一个指针去链接后面的数据节点 2.不支持随机访问&a…

让你的ChatGPT更加强大——200+小白用来解锁ChatGPT高级功能的提示(Prompts)

让你的ChatGPT更加强大——200小白用来解锁ChatGPT高级功能的提示&#xff08;Prompts&#xff09;使用说明标签筛选关键词搜索展示区复制语言切换常见问题为什么提示词用英文&#xff1f;中文搜索出错输出虚假信息提示词不好用为什么执着于 ChatGPT&#xff1f;最后参考博客其…

Vue.js 2.0 单文件组件

Vue.js 2.0 单文件组件介绍 在很多Vue项目中&#xff0c;我们使用 Vue.component 来定义全局组件&#xff0c;紧接着用 new Vue({ el: #container }) 在每个页面内指定一个容器元素。 这种方案在只是使用 JavaScript 增强某个视图的中小型项目中表现得很好。然而在更复杂的项…

对象树、QT的坐标系、信号和槽机制

目录 1、QT中什么是对象树 2、QT的坐标系 3、信号和槽机制 3.1、信号槽的理解 3.2、信号槽的工作原理 3.3、信号槽的使用 3.3.1、系统的信号和槽 3.3.2、自定义信号和槽函数 3.3.3、信号和槽函数之间的参数传递 3.3.4、信号和槽的注意 1、QT中什么是对象树 在创建 QO…

重构·改善既有代码的设计.04之重构手法(下)完结

1. 前言 本文是代码重构系列的最后一篇啦。前面三篇《重构改善既有代码的设计.01之入门基础》、《重构改善既有代码的设计.02之代码的“坏味道”》、《重构改善既有代码的设计.03之重构手法&#xff08;上&#xff09;》介绍了基础入门&#xff0c;代码异味&#xff0c;还有部…

如何在ubuntu上搭建minio

由于腾讯的对象存储服务器&#xff08;COS&#xff09;的半年免费试用期已过&#xff0c;所以寻思鼓捣一下minio&#xff0c;试着在自己的服务器上搭建一套开源的minio对象存储系统。 单机部署基本上有以下两种方式。 一、直接安装 最基础的一种单机安装&#xff0c;这里不做…