消息中间件之RocketMQ源码分析(四)

news2024/11/27 22:25:58

消费者的Rebalance机制

客户端是通过Rebalance服务做到高可靠的。当发生Broker掉线、消费者实例掉线、
Topic扩容等各种突发情况时,消费者组中的消费者实例是怎么重平衡的,以支持全部队列的正常消费的?

Rebalance服务的类图

在这里插入图片描述

RebalanceImpl的核心属性

在这里插入图片描述

  • ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable:记录MessageQueue和ProcessQueue的关系,MessageQueue可以简单地理解为ConsumeQueue的客户端实现;ProcessQueue是保存Pull消息的本地容器
  • ConcurrentMap<String, Set< MessageQueue >> topicSubscribeInfoTable:Topic路由信息。保存Topic和MessageQueue的关系
  • ConcurrentMap<String(Topic), SubscriptionData> subscriptionInner:真正的订阅关系,保存当前消费者组订阅了哪些Topic的哪些Tag
  • AllocateMessageQueueStrategy allocateMessageQueueStrategy:MessageQUeue消息分配策略的实现
  • MQClientInstance mQClientFactory:client实例对象

RebalanceImpl的核心方法

  • boolean lock():为MessageQueue加锁
    在这里插入图片描述

  • void doRebalance():执行Rebalance操作
    在这里插入图片描述

  • void messageQueueChanged():通知Message发生变化,这个方法在Push和Pull两个类中被重写
    在这里插入图片描述

  • boolean removeUnnecessaryMessageQueue():去掉不再需要的MessageQueue
    在这里插入图片描述

  • void dispatchPullRequest():执行消息拉取请求

+

  • boolean updateProcessQueueTableInRebalance():在Rebalance中更新processQueue
    在这里插入图片描述

Rebalance过程

在这里插入图片描述

  • 消费者实例在收到Broker通知后是怎么执行Reblance的?这个操作是通过调用
    MQClientInstance.rebalanceImmediately()来实现的
    在这里插入图片描述
  • 这种设计是RocketMQ种典型的锁方式,执行wakeup命令后,this.waitForRunning()就会暂停,再执行this.mqClientFactory.doRebalance()
    在这里插入图片描述
  • doRebalance()方法主要有以下几个步骤
    1.查找当前clientId对应的全部的消费者组,全部执行一次Rebalance.
    虽然消费者实现分别为Pull消费和Push消费两种默认实现,调用的是不同实现类的Rebalance方法,但是实现逻辑都差不多
    在这里插入图片描述
    2.判断Rebalance开关,如果没有被暂停,则调用RebalancePushImpl.rebalance()方法
    在这里插入图片描述
    3.在RebalancePushImpl.rebalance()方法中,获取当前消费者全部订阅关系中的Topic,
    循环对每个Topic进行Rebalance.待全部的Rebalance都执行完之后,将不属于当前
    消费者的队列删除
    在这里插入图片描述
    在这里插入图片描述
    4.Topic队列重新分配,这里也就是客户端Rebalance的核心逻辑之处,根据是集群消费还是广播消费分别执行MessageQueue重新分配的逻辑,以集群消费为例分析
    在这里插入图片描述
    4.1.获取当前Topic的全部MessageQueue(代码中是mqSet)和该Topic的所有消费者的clientId(代码中是cidAll),只有当两者都不为空时,才执行Rebalance
    4.2.将全部的MessageQueue(代码中时mqAll)和消费者客户端(cidAll)进行排序。
    由于不是所有消费者的客户端都能彼此通信,所以将mqAll和cidAll排序的目的在于,
    保证所有消费者客户端在做Rebalance的时候,看到的MessageQueue列表和消费者
    客户端都是一样的试图,做Rebalance时才不会分配错
    4.3.按照当前设置的队列分配策略执行Queue分配。队列分配策略接口AllocateMessageQueueStrategy,该接口中,有两个方法allocate()和getName()
    在这里插入图片描述

allocate():执行队列分配操作,该方法必须满足全部队列都能分配到消费者
getName():获取当前分配算法的名字

目前队列分配策略有五种实现:
AllocateMessageQueueAveragely:平均分配,也就是默认使用的策略(强烈推荐)
AllocateMessageQueueAveragelyByCircle:环形分配策略
AllocateMessageQueueByConfig:手动配置
AllocateMessageQueueConsistentHash:一致性Hash分配
AllocateMessageQueueByMachineRoom:机房分配策略

4.4.动态更新ProcessQueue,在队列重新分配后,当前消费者消费的队列可能不会发生变化,也可能发生变化,不管时新增加了队列需要消费,还是减少了队列,都需要执行updateProcessQueueTableInRebalance()方法来更新ProcessQueue,如果有MessageQueue不再分配给当前的消费者消费,则设置ProcessQueue.setDropped(true),表示放其当前MessageQueue的Pull消息
在这里插入图片描述

如果在重新分配MessageQueue后,新增加了MessageQueue,
则添加一个对应的ProcessQueue,查询Queue拉取位点,包装一个新的PullRequest
来Pull消息;同理如果减少了MessageQueue,则将其对应的ProcessQueue删除,
不管MessageQueue时新增还是减少,都会设置changed为True,表示当前消费者
消费的MessageQueue有变化,源码中是分别两个集合遍历来判断是新增还是减少的。
在这里插入图片描述

PullRequest初始化的具体实现,新增的PullRequest对象将被分配出去拉取MessageQueue中的消息

在这里插入图片描述
4.5.执行messageQueueChanged()方法,如果有MessageQueue订阅关系发生变化,
则更新本地订阅关系版本,修改本地消费者有限流的一些参数,然后发送心跳,
通知所有Broker,当前订阅关系发生了改变
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1425675.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

面试了字节大模型算法岗(实习),快被问哭了。。。。

最近技术群组织了一次算法面试讨论会&#xff0c;今天分享的是一位小伙子的痛苦面试经历&#xff0c;如果你想加入我们的讨论群&#xff0c;见文末。 本次分享的内容如下&#xff1a; 应聘岗位&#xff1a;字节大模型算法实习生 面试轮数&#xff1a;第一轮 整体面试感觉&…

银行数据仓库体系实践(17)--数据应用之营销分析

营销是每个银行业务部门重要的工作任务&#xff0c;银行产品市场竞争激烈&#xff0c;没有好的营销体系是不可能有立足之地&#xff0c;特别是随着互联网金融发展,金融脱媒”已越来越普遍&#xff0c;数字化营销方兴未艾&#xff0c;银行的营销体系近些年也不断发展&#xff0c…

基于SpringBoot开发的JavaWeb智能家电商城[附源码]

基于SpringBoot开发的JavaWeb智能家电商城[附源码] &#x1f345; 作者主页 央顺技术团队 &#x1f345; 欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; &#x1f345; 文末获取源码联系方式 &#x1f4dd; &#x1f345; 查看下方微信号获取联系方式 承接各种定制系统 &a…

Linux系统中的日志管理工具和技术

Linux系统中的日志管理工具和技术 在Linux系统中&#xff0c;日志管理对于系统管理员和开发人员来说至关重要。它能够帮助用户监视系统运行时的行为、故障和安全问题。以下是一些常用的日志管理工具和技术&#xff0c;包括使用logrotate和ELK堆栈。 1. 日志管理工具 - **rsys…

Flutter canvas 画一条会动的波浪线 进度条

之前用 Flutter Canvas 画过一个三角三角形&#xff0c;html 的 Canvas 也画过一次类似的&#xff0c; 今天用 Flutter Canvas 试了下 感觉差不多&#xff1a; html 版本 大致效果如下&#xff1a; 思路和 html 实现的类似&#xff1a; 也就是找出点的位置&#xff0c;使用二阶…

PySimpleGUI 综合应用|英语文本朗读以及转换为语音Mp3

PySimpleGUI 综合应用 目录 PySimpleGUI 综合应用 应用界面 完整代码 所需模块 PySimpleGUI pyttsx3 pyaudio rapidfuzz 字典格式 应用界面 完整代码 英语朗读器.pyw import PySimpleGUI as sg import pyttsx3,pyaudio,pyperclip import os,re,datetime,wave,threa…

如何分辨坏信息?

每当有社会热点&#xff0c;大家也许都会遇到一个困扰&#xff1a; 铺天盖地的信息&#xff0c;实在是太多了。究竟哪一些值得信任&#xff0c;哪些不值得信任&#xff1f;哪些可以接受&#xff0c;哪些最好保持怀疑&#xff1f; 我想用这篇文章&#xff0c;彻底把这个问题讲清…

python数据类型-元组

1 元组(tuple)的定义 元组是集合类型&#xff0c;和列表类似&#xff0c;元组中的元组类型可以不同&#xff0c;元素间用逗号隔开&#xff0c;和列表的不同之处在于&#xff1a; 1 元组的元组不可改变&#xff0c;也被称为只读列表 2 且元组用括号()表示&#xff0c;列表用方括…

HTML+JavaScript-06

节点操作 目前对于节点操作还是有些困惑&#xff0c;只是了解简单的案例 具体操作可以看菜鸟教程&#xff1a;https://www.runoob.com/js/js-htmldom-elements.html 案例-1 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8…

前端构建变更:从 webpack 换 vite

现状 这里以一个 op &#xff08;内部运营管理用&#xff09;项目为例&#xff0c;从 webpack 构建改为 vite 构建&#xff0c;提高本地开发效率&#xff0c;顺便也加深对 webpack 、 vite 的了解。 vite 是前端构建工具&#xff0c;使用 一系列预配置进行rollup 打包&#x…

获客助手助力提升企业微信添加客户的效率和精准度

数字化营销时代&#xff0c;企业微信成为企业与客户沟通的重要渠道&#xff0c;但如何快速、精准地添加潜在客户&#xff0c;一直是企业面临的一大挑战。获客助手作为一款企业微信服务商数灵通外链工具提供的营销工具&#xff0c;为企业提供了一种高效、精准的解决方案。 获客助…

【C++】 C++入门 — auto关键字

C入门 auto 关键字1 介绍2 使用细则3 注意事项 Thanks♪(&#xff65;ω&#xff65;)&#xff89;谢谢阅读下一篇文章见&#xff01;&#xff01;&#xff01; auto 关键字 1 介绍 编程时常常需要把表达式的值赋给变量&#xff0c;这就要求在声明变量时清楚地知道表达式的类…

服务网格(Service Mesh)流行工具

在这篇博客中&#xff0c;我们将介绍微服务的最佳服务网格工具列表&#xff0c;这些工具提供安全性、金丝雀部署、遥测、负载均衡等。 用于部署和操作微服务的服务网格工具的数量不断增加。在这篇文章中&#xff0c;我们将探讨您应该用来构建自己的服务网格架构的顶级服务网格…

(已解决)spingboot项目如何做QQ邮箱注册功能,如何在邮箱注册中进行随机数添加作为动态验证码,并满足分层解耦

前面我们已经完成了发送静态验证码&#xff0c;现在用随机数作为动态验证码。 文章地址&#xff1a;spingboot 后端发送QQ邮箱验证码 使用注解Component进行分层解耦加入ioc容器&#xff0c;方便调用。 package com.example.tianyidemo.utils; import org.springframework.st…

深度学习(7)--Keras项目详解(卷积神经网络)

目录 一.项目介绍 二.卷积神经网络构造 2.1.判断是否是channels first的back end 2.2.卷积层构造 2.3.添加激活函数 2.4.池化层构造 2.5.全连接FC层构造 三.完整代码 3.1.学习率衰减设置 四.首次运行结果 五.数据增强对结果的影响 六.BatchNormalization对结果的影…

MIT6.5830 实验3

前置回顾 在实验2中&#xff0c;完成了增删查改、排序、分组、聚合、连接等基本操作&#xff0c;在已提供 sql 解析器的基础上&#xff0c;能够运行进本的 sql 语句。都是逻辑层的实现&#xff0c;没有涉及物理存储方面的内容。 实验目标 实现最简单的基于锁的transaction&am…

RabbitMQ之死信交换机

前言 消息队列是分布式系统中常用的组件&#xff0c;用于异步通信、解耦和提高系统可靠性。然而&#xff0c;在实际应用中&#xff0c;难免会遇到一些异常情况&#xff0c;例如消息处理失败、超时等。为了更好地处理这些异常情况&#xff0c;死信交换机&#xff08;Dead Lette…

搭建高效企业培训平台:教育系统源码开发详解

为了更好地满足企业培训的需求&#xff0c;许多组织纷纷转向数字化教育&#xff0c;搭建高效的企业培训平台成为当务之急。本篇文章&#xff0c;小编将为您讲解教育系统源码的开发细节&#xff0c;为搭建一个功能强大、灵活高效的企业培训平台提供详尽的指南。 一、教育系统的…

“2024成都国际自动驾驶技术展览会”展示前沿技术与创新融合

近年来&#xff0c;新一轮科技革命和产业革命正向纵深发展&#xff0c;以互联网为代表的新一代信息技术与汽车产业的加速融合推动了汽车产品形态和分布的深刻变革&#xff0c;汽车已开始向大型移动智能终端的方向演变。汽车、信息、互联网等企业、研究院所、高校及各国政府纷纷…

单细胞scRNA-seq测序基础知识笔记

单细胞scRNA-seq测序基础知识笔记 scRNA-seq技术scRNA-seq 分析流程数据预处理聚类标准化数据筛选有用的数据数据降维聚类 Clustering 注释细胞类型 scRNA数据分析结尾 该笔记来源于 B站up 江湾青年 scRNA-seq技术 首先是如何测序&#xff0c;上图瓶中有很多细胞&#xff0c;…