【RocketMQ】(九)主从同步实现原理

news2025/1/4 15:51:24

RocketMQ支持集群部署来保证高可用。它基于主从模式,将节点分为Master、Slave两个角色,集群中可以有多个Master节点,一个Master节点可以有多个Slave节点。Master节点负责接收生产者发送的写入请求,将消息写入CommitLog文件,Slave节点会与Master节点建立连接,从Master节点同步消息数据。消费者可以从Master节点拉取消息,也可以从Slave节点拉取消息。

在RocketMQ 4.5版本之前,如果Master宕机,不支持自动将Slave切换为Master,需要人工介入,在4.5版本之后引入了DLedger来解决Master/Slave的主从切换问题,今天先来看主从模式下的数据同步原理。

RocketMQ主从模式下,是通过Slave节点主动向Master节点发送请求通知主节点进行数据同步的。

主从消息同步

建立连接

主节点监听连接事件

主从节点既然需要传输数据,那么肯定会先建立一个连接,所以主节点在启动的时候,会开启一个端口(haListenPort)用于监听从节点的连接请求(注册了ACCEPT连接事件的监听),默认端口是10912,当然也可以通过配置修改haListenPort的值,使用其他端口。

在端口绑定之后,主节点会专门开启一个线程,用于监听到从节点的连接事件,如果从节点发起了连接请求,会与从节点建立连接,与从节点的连接信息会封装在HAConnection类中,主节点和从节点的数据同步逻辑也在HAConnection中。

从节点发起连接请求

从节点在启动时会向主节点发起连接请求,上面说过主节点会监听从节点的连接请求,所以经过这一步主从节点的连接建立完成,建立成功后,从节点会在连接上注册READ可读事件监听,处理连接上的可读事件。

消息同步

从节点主从同步处理

从节点主从同步的逻辑主要在HAClient中,它开启了一个线程处理主从同步,只要Slave节点未停止,就会不断循环进行如下处理:

  1. 从节点会定时向主节点汇报消息同步的偏移量(同步进度),所以每次循环开始都会判断是否需要向主节点发送消息同步偏移量,如果已经有一段时间内没有向主节点汇报,此时就会向主节点发送消息同步偏移量,告诉主节点现在同步到哪条消息;
  2. 等待与Master节点建立的连接上产生可读事件;
  3. 处理可读事件,主要是判断Master节点是否发来了数据,如果Master节点发送了数据,就要从网络中读取数据,将读取到的消息内容写到从节点自己的CommmitLog。
主节点处理从节点发送请求
读事件处理(处理从节点发送的请求)

上面说到从节点会定时向主节点汇报消息同步的进度,主节点会开启一个线程专门处理监听到的可读事件,也就是处理从节点发来的请求,处理逻辑在ReadSocketService中。
主节点会将从节点发送的消息同步偏移量记录在slaveAckOffset中,表示从节点已经同步的消息位置,同时也会将消息同步偏移量更新到push2SlaveMaxOffset中,它代表了主节点向从节点推送消息的偏移量。slaveRequestOffset的值如果小于0,也会将其更新为从节点反馈的同步偏移量。

这里再对比一下这三个变量的值,之后主节点向从节点发送消息数据会使用slaveRequestOffset来判断是否需要向从节点推送数据:

  • slaveAckOffset从节点响应的同步消息的偏移量,记录从节点已经同步的消息位置,每次收到从节点反馈的同步偏移量都会对这个值进行更新;

  • slaveRequestOffset默认值为-1,此时表示还未收到从节点反馈的消息偏移量,在收到从节点发送的消息同步偏移量之后,如果slaveRequestOffset的值小于0才会对其进行更新,也就是主节点首次收到从节点的反馈进度或者主节点重启等原因值又被恢复成了默认值-1再次收到反馈进度才会更新,之后不会对其进行更新;

  • push2SlaveMaxOffset默认值为0,在收到从节点反馈的消息偏移量时,会对该值进行更新,与slaveRequestOffset不同的是它每次收到从节点反馈的时候都会更新,表示主节点向从节点推送消息的偏移量;

写事件处理(向从节点发送消息数据)

主节点同样开启了一个线程来处理网络中的写事件,主节点向从节点发送同步消息数据的处理就是在这里进行的,它也会开启一个循环,只要主节点为停止服务,就不断进行如下处理:

  1. 首先根据slaveRequestOffset的值判断是否需要进行推送,有以下两种情况:

    • slaveRequestOffset值为-1(默认值),表示还未收到过从节点反馈的消息偏移量,所以此时会睡眠一段时间等待从节点发送消息拉取偏移量;
    • slaveRequestOffset值不为-1表示已经接收到过从节点反馈的消息偏移量(上面提到从节点向主节点反馈同步进度之后,主节点会更新这个值),此时进入下一步;
  2. 判断nextTransferFromWhere的值(默认值-1),它是主节点中记录的下次需要传输的消息在CommitLog文件中的偏移量,如果值不为-1表示已经进行过数据同步,此时可以进入下一步。这里我们看下值为-1也就是首次进行主从同步的情况:

    • slaveRequestOffset为0,表示从节点向主节点发送的消息同步偏移量为0,也就是从节点还未同步到消息,本次是首次进行同步,那么就从主节点当前CommitLog文件记录的最新的那条消息开始同步,此时更新nextTransferFromWhere的值为当前CommitLog的最大的那个偏移量,然后进入下一步;

      每个CommitLog文件大小为1G,所以可能会有多个CommitLog文件,首次进行主从同步的时候从最近那个也就是当前正使用的那个CommitLog文件中的消息开始进行同步;

    • slaveRequestOffset大于0,表示从节点之前已经同步过消息,那么就从反馈的位置处开始消息同步,也就是之前同步到哪个消息了,就从那个消息继续往后同步,此时将nextTransferFromWhere的值更新为slaveRequestOffset的值,然后进入下一步;

      这一步主要是对nextTransferFromWhere的值进行处理。

  3. 判断上次向从节点发送的消息是否已经传输完毕(有可能网络等原因数据还在发送中):

    • 如果数据都已经发送完毕,会判断距离上次发送数据的时间间隔是否超过了设置的心跳时间,如果超过,为了避免连接空闲被关闭,需要发送一个心跳包,维护长连接;
    • 如果上次发送的数据还在传输中,会继续先传输上次同步的数据;
  4. 根据nextTransferFromWhere的值从CommitLog中获取本次要同步的消息内容;

  5. 更新nextTransferFromWhere的值为下次发送消息的偏移量;

  6. 将第4步中获取到的消息内容,每次最大发送32KB的数据,发送给从节点,进行数据同步;

从节点对收到消息的处理
在从节点主从同步处理一节中,提到从节点会开启一个线程处理可读事件,当主节点向从节点推送消息数据进行同步后,从节点监听到可读事件,就会从请求中获取发送的消息数据,进行同步:

  1. 从缓冲区中读取数据,首先获取到的是消息在master节点的物理偏移量masterPhyOffset;
  2. 获取从节点当前CommitLog的最大物理偏移量slavePhyOffset,如果不为0并且不等于masterPhyOffset,表示与Master节点的传输偏移量不一致,也就是数据不一致,此时终止处理;
  3. 计算消息体在读缓冲区中的起始位置,从读缓冲区中根据起始位置,读取消息内容,将消息追加到从节点的CommitLog中;
  4. 继续处理下一条消息直到请求中的消息处理完毕;

从节点会监听到网络中的可读数据,收到消息后将消息写入从节点的CommitLog中。

等待主从复制传输结束

SYNC_MASTER同步复制:消息写入主节点之后,需要等待从节点也写入完毕才能返回成功。

ASYNC_MASTER异步复制:消息写入主节点之后即可返回成功,主从同步数据异步进行,不需要等待从节点写入完毕即可返回成功。

当主从同步开始之后,如果有新的消息写入主节点的CommitLog,如果Master节点配置的是SYNC_MASTER同步复制,在消息写入主节点之后还需要等待从节点同步完毕,主节点会开启一个线程,可以记作数据同步判断线程(GroupTransferService中实现),它专门来判断数据是否同步完毕。

首先消息在写入CommitLog之后会构建一个消息提交请求GroupCommitRequest,请求中会携带本次消息写入之后的偏移量,将其提交到一个求集合requestsRead中,这个线程可以记作主线程,然后主线程会唤醒数据同步判断线程来判断数据是否同步完毕,之后主线程进入等待状态。

在数据同步判断线程中,它会对消息提交请求集合requestsRead中的每一个请求进行处理,开启循环做如下处理:

  1. push2SlaveMaxOffset记录了从节点已经同步的消息偏移量,将push2SlaveMaxOffset与本次消息提交请求的偏移量作对比:
    • 如果push2SlaveMaxOffset值大,说明当前提交请求中的消息已经同步完毕,此时进入第2步唤醒正在等待的主线程,继续执行主线程的处理逻辑;
    • 如果push2SlaveMaxOffset值比请求中的偏移量小,表示这条消息还未同步到从节点,此时当前线程会等待一段时间再进行判断,直到数据已经同步到从节点或者超时;
  2. 唤醒主线程;

主从模式下的消息消费

在主从模式下,消费者向Broker发送拉取消息请求后,Broker对拉取请求进行处理时会设置一个broker ID,建议消费者下次从这个Broker拉取消息,接下来看下Broker是根据什么条件决定返回哪个Broker ID的。

Broker在处理消费者拉取请求时,获取消息后会在返回结果中设置一个是否建议从Slave节点拉取值放在isSuggestPullingFromSlave这个变量中,这个值的判断方式如下:
diff:当前Broker的CommitLog最大偏移量减去本次拉取消息的最大物理偏移量,表示剩余未拉取的消息
memory:消息在PageCache中的总大小,计算方式是总物理内存 * 消息存储在内存中的阀值(默认为40)/100,也就是说MQ会缓存一部分消息在操作系统的PageCache中,加速访问;

如果dif大于memory的值,表示未拉取的消息过多,已经超出了PageCache缓存的数据的大小,还需要从磁盘中获取消息,所以此时会建议下次从Slave节点拉取,将isSuggestPullingFromSlave的值置为true,否则为false。

订阅分组配置
mqadmin命令的-i参数可以指定从哪个Broker消费消息(对应SubscriptionGroupConfig中的brokerId变量,默认是MASTER节点的ID);
-w参数可以指定建议从slave节点消费的时候,从哪个slave消费(对应SubscriptionGroupConfig中的whichBrokerWhenConsumeSlowly变量,默认值为1);

接下来会用到以上两个参数。

Broker获取到isSuggestPullingFromSlave的值之后,在构建返回结果时,会根据isSuggestPullingFromSlave的值进行以下处理:

  1. 如果建议从slave节点拉取消息(isSuggestPullingFromSlave为true),会获取订阅分组配置中设置的whichBrokerWhenConsumeSlowly的值(默认-1)作为建议拉取消息的Broker ID,否则下次依旧建议从主节点拉取消息,将MASTER节点的ID设置到响应中;
  2. 如果当前Broker的角色是slave节点,并且配置了不允许从slave节点读取数据(SlaveReadEnable = false),此时依旧建议从主节点拉取消息,将MASTER节点的ID设置到响应中;
  3. 如果开启了允许从slave节点读取数据(SlaveReadEnable = true),有以下两种情况:
    • isSuggestPullingFromSlave为true,表示建议从slave节点拉消息,会使用订阅分组配置中设置的whichBrokerWhenConsumeSlowly的值(默认-1)作为建议拉取消息的Broker ID;
    • isSuggestPullingFromSlave为false,表示不建议从slave节点拉取消息,会从订阅分组配置中获取brokerId(默认值为Master节点ID)的值作为建议拉取消息的Broker ID;

当然,如果未开启允许从Slave节点读取数据,下次依旧建议从Master节点拉取;

总结
默认情况下,消费者从Master节点拉取消息,Broker在处理消息拉取时会根据消息的拉取进度,进行判断,如果未拉取消息的大小超过了总物理内存的40%,此时会建议消费者从Slave节点拉取消息,Broker会将下次建议拉取消息的BrokerID,设置到响应中返回给消费者。

从Slave节点拉取消息,需要开启配置项SlaveReadEnable,可以通过mqadmin命令更改订阅分组中的brokerId(默认值为Master节点ID)和whichBrokerWhenConsumeSlowly(默认-1),如果未设置使用默认值。

如果未开启SlaveReadEnable,依旧会从Master节点拉取消息;

消费进度管理

消费者会优先选择向主节点发送请求进行消费进度保存,假如主节点宕机等原因未能获取到主节点的信息,会迭代集合选择第一个节点返回,所以消费者也可以向从节点发送请求进行进度保存,待主节点恢复后,依旧优先选择主节点。

消费进度同步

从节点在启动时会注册定时任务,定时进行数据同步:

  1. 从节点向主节点发送请求获取消费进度数据;
  2. 从节点将获取到的消费进度数据进行持久化;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1044835.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据结构】顺序栈及其基本操作

顺序栈 栈的数组实现(创建) 栈的初始化栈的增加(压栈)栈的删除(弹栈)栈的查询栈的判空和判满 1. 顺序栈 栈是一种数据结构,其主要特点是后进先出,相当于我们在瓶子里面放东西&am…

golang工程——protobuf使用及原理

相关文档 源码:https://github.com/grpc/grpc-go 官方文档:https://www.grpc.io/docs/what-is-grpc/introduction/ protobuf编译器源码:https://github.com/protocolbuffers/protobuf proto3文档:https://protobuf.dev/programmin…

【信创】麒麟v10(arm)-mysql8-mongo-redis-oceanbase

Win10/Win11 借助qume模拟器安装arm64麒麟v10 前言 近两年的国产化进程一直在推进,基于arm架构的国产系统也在积极发展,这里记录一下基于麒麟v10arm版安装常见数据库的方案。 麒麟软件介绍: 银河麒麟高级服务器操作系统V10 - 国产操作系统、银河麒麟、中…

Tomcat(HTTP服务器)下载以及认识

Tomcat是java程序员写网页后端所用到的一个经典工具 一. 搜索Tomcat找到官网,在Download下找到Tomcat8(虽然已经有了更新的版本,但经典版的更稳定) 二. 找到Core,点击zip便能下载Tomcat的压缩包(完全绿色&…

使用API Monitor工具巧妙探测C++程序中监听某端口的模块

目录 1、问题说明 2、API Monitor工具介绍 2.1、API Monitor主要用途 2.2、如何使用API Monitor工具 3、使用API Monitor监测程序对bind函数的调用,定位启用2620端口的模块 3.1、为啥要监控socket API函数bind 3.2、编写演示代码进行说明 3.3、使用API Moni…

COTS即Commercial Off-The-Shelf 翻译为“商用现成品或技术”或者“商用货架产品”

COTS 使用“不再做修理或改进”的模式出售的商务产品 COTS即Commercial Off-The-Shelf 翻译为“商用现成品或技术”或者“商用货架产品”,指可以采购到的具有开放式标准定义的接口的软件或硬件产品,可以节省成本和时间。 中文名 商用现成品或技术 外文…

垂直行业大模型“封神”背后,AI数据服务走入“深水区”

图源:Unsplash 文 | 智能相对论 作者 | 沈浪 由ChatGPT掀起的这股大模型浪潮,从通用领域席卷垂直领域。现阶段,越来越多的行业都在开发专用垂直细分赛道的大模型产品,以加速AI应用的场景化落地进程。 譬如,在电商领…

MASA MAUI iOS 文件下载与断点续传

文章目录 背景介绍方案及代码1、新建MAUI项目2、建立NSUrlSession会话连接3、使用NSUrlSessionDownloadTask 创建下载任务4、DidWriteData 监听下载5、DidFinishDownloading 完成下载6、CancelDownload (取消/暂停)下载7、ResumeDownload 恢复下载8、杀死进程-恢复下载 效果图总…

界面组件DevExpress WPF v23.2新功能预览 - 更轻量级的主题

本文主要描述了DevExpress WPF即将在几个月之后发布的v23.2中包含的新功能,持续关注我们获取更多最新资讯哦~ P.S:DevExpress WPF拥有120个控件和库,将帮助您交付满足甚至超出企业需求的高性能业务应用程序。通过DevExpress WPF能创建有着强…

手把手带你完成安卓登录修改的案例

新建一个空的项目 到这里我们的项目就搭建完成了,接下来就来编写app的页面 提前准备 1.新建一个 登录 的java和xml文件 2.同样的步骤新建一个忘记登录的页面 3.创建一个工具类,这个工具来主要是用来隐藏软键盘的 import android.app.Activity; import an…

【斯坦福cs324w】中译版 大模型学习笔记十 环境影响

环境影响 温室气体排放水足迹:数据中心使用水进行冷却;发电需要用水释放到环境中的化学物质很多是对人类有害的 如何计算数据中心能源消耗 简单表示形式 模型训练过程 参考资料 datawhale so-large-lm学习资料

AWS-Lambda之导入自定义包-pip包

参考文档: https://repost.aws/zh-Hans/knowledge-center/lambda-import-module-error-python https://blog.csdn.net/fxtxz2/article/details/112035627 单来说,以 " alibabacloud_dyvmsapi20170525 " 包为例 mkdir /tmp cd ./tmp mkdir python pip ins…

idea开发Springboot出租车管理系统VS开发mysql数据库web结构java编程计算机网页源码maven项目

一、源码特点 springboot 出租车管理系统是一套完善的完整信息系统,结合springboot框架和bootstrap完成本系统,对理解JSP java编程开发语言有帮助系统采用springboot框架(MVC模式开发), 系统具有完整的源代码和数据…

Mysql基础【操作表中数据入门查询】

一、DML🍓 DML主要是对数据进行增(insert)删(delete)改(update)操作。 (一)、添加数据🥝 语法格式: insert into 表名 (字段名1,字段名2...&a…

2024年【MCM/ICM】美国大学生数学建模竞赛优秀论文(免费下载)

一、前言 美国大学生数学建模竞赛(MCM/ICM)由美国数学及其应用联合会主办,是最高的国际性数学建模竞赛,也是世界范围内最具影响力的数学建模竞赛,一般也指数学建模竞赛。赛题内容涉及经济、管理、环境、资源、生态、医…

APP开发费用估算方法

估算APP开发费用是一个重要的项目管理步骤,它有助于确定项目的总成本,并帮助您在项目规划阶段做出决策。APP开发费用估算的方法可以根据项目的规模、复杂性、功能和技术选择而异,以下是一些常见的APP开发费用估算方法,希望对大家有…

Vue3最佳实践 第五章 Vue 组件应用 1( Props )

本章带领大家理解组件、props、emits、slots、providers/injects,Vue 插件 等Vue组件使用的基础知识。 5.1 组件注册5.2 Props5.2.1 组件之间如何传值5.2.2 参数绑定 v-bind5.2.3 参数类型5.2.4 props 默认与必填5.2.5 验证设置5.2.6 useAttrs 属性设置 第一章 Vue3…

regexp 应用

今天同事拿出个小栗子 1 如果用like的话 1,22 的情况会被字符串2匹配到这样会有问题 这里需要用concat将uids处理下 比如第一条处理成,1,2,3, 的形式 去模糊匹配 ‘%,1,%’ 当然like这种模糊匹配不太建议使用 2 regexp 用法 单个值 &#x…

SpringBoot的excel模板导出

Word的模板导出(参考:https://easyexcel.opensource.alibaba.com/docs/current/quickstart/fill) 创建有两个sheet的excel文件模板 将模板文件放入resource\templates/doc下使用 public void exportUavInfoExcel(HttpServletResponse response, CaseExportRPO cas…

如何更改注册表使系统暂停更新时间延长

1、创建一个文本文件,命名为:“stopupdate.reg”,然后用记事本或者代码编辑器打开,复制以下代码: Windows Registry Editor Version 5.00[HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\WindowsUpdate\UX\Settings] "F…