【RocketMQ】(十一)Dledger模式下的日志复制

news2025/1/16 18:02:34

RocketMQ在开启Dledger时,使用DLedgerCommitLog,其他情况使用的是CommitLog来管理消息的存储。在Dledger模式下,消息写入时Leader节点还需要将消息转发给Follower节点,有过半的节点响应成功,消息才算写入成功。

Leader消息写入

Dledger下有DLedgerMemoryStore(基于内存存储)和DLedgerMmapFileStore(基于Mmap文件映射)两种方式写入,接下来以DLedgerMmapFileStore为例,看下消息的写入过程。
Leader节点在写入前会为消息构建DLedgerEntry对象,之后本地写入以及转发给Follower节点都会使用这个对象:

  1. 进行Leader节点校验和磁盘已满校验;

  2. 在DLedgerEntry对象中设置消息的index(为每条消息进行了编号),值为ledgerEndIndex + 1,ledgerEndIndex初始值为-1,新增一条消息ledgerEndIndex的值也会增1,ledgerEndIndex是随着消息的增加而递增的,写入成功之后会更新ledgerEndIndex的值,ledgerEndIndex记录最后一条成功写入消息的index:

  3. 将消息内容写入CommitLog文件;

  4. 更新MemberState中记录的LedgerEndIndex和LedgerEndTerm的值;

等待Follower节点响应

  • pendingAppendResponsesByTerm:key为Term的值,value是一个ConcurrentHashMap,这个ConcurrentHashMap中的KEY为消息的index(每条消息的编号,从0开始,后面会提到),ConcurrentMap的KEY为消息的index,value为此条消息转发给Follower节点的异步响应对象AppendEntryResponse:

在消息写入Leader节点之后,Leader节点需要向Follwer节点转发日志,这个过程是异步处理的,会开启一个线程来进行消息转发,所以这里会先为每个请求创建异步响应对象,主要处理逻辑如下:

  1. 如果集群中只有一个节点,设置处理状态为完成并返回响应即可;
  2. 如果集群中有多个节点,由于日志转发是异步进行的,所以会先创建响应对象AppendFuture,并将创建的对象加入到pendingAppendResponsesByTerm中,pendingAppendResponsesByTerm的数据就是在这里加入的,之后有另外一个线程会处理消息转发,当消息转发成功之后会从这里取出响应对象,并将其处理状态置为完成;

Leader与Follower的日志复制

Leader消息转发与Follower的处理是单独开启线程异步进行的,主要有以下几个线程:

  1. EntryDispatcher(运行于Leader节点):用于Leader节点向Follwer节点转发日志,Leader节点会为每个Follower节点创建一个EntryDispatcher转发器,一个EntryDispatcher负责一个节点的日志转发,多个节点之间是并行处理的;
  2. EntryHandler(运行于Follower节点):用于Follower节点处理Leader节点发送的日志;
  3. QuorumAckChecker(运行于Leader节点):用于Leader节点等待Follower节点同步;

EntryDispatcher(Leader日志转发)

EntryDispatcher中会启动一个线程,用于向Follower转发日志,处理逻辑如下:

  1. 校验节点的角色是否是Leader节点;
  2. 对消息的转发类型进行判断,有以下两种状态:
    • APPEND:消息追加,用于向Follower转发消息;
    • COMPARE:消息对比,一般出现在数据不一致的情况下,需要与Follower节点的日志进行对比;

EntryDispatcher中会记录向当前的Term与Leader ID,处于以下三种条件之一,会认为集群可能发送了变化,数据处于不一致的状态,此时会将推送类型更改为COMPARE:
(1)EntryDispatcher记录的Term与MemberState中记录的不一致;
(2)EntryDispatcher记录的LeaderId为空;
(3)EntryDispatcher记录的LeaderId与MemberState中记录的不一致;

Append
  • committedIndex:得到了集群中大多数节点的响应的消息的index,记为committedIndex,committedIndex之前的消息表示都已提交,可以被消费者消费;

Append状态下,Leader节点将消息转发给Follower节点进行同步,Leader方的发送逻辑如下:

  1. 校验推送类型是否是APPEND,如果不是终止处理;

  2. writeIndex为待转发消息的Index,默认值为-1,判断是否大于LedgerEndIndex,如果大于会发送COMMIT请求到Follower节点,通知Follower节点更新committedIndex(后面再说);

这里可以看出转发日志的时候也使用了一个计数器writeIndex来记录待转发消息的index,每次根据writeIndex的值从日志中取出消息进行转发,转发成后更新writeIndex的值(自增)指向下一条数据。

  1. 向Follower发送消息转发请求并处理请求响应结果;

  2. 更新writeIndex的值,做自增操作指向下一条待转发的消息index;

消息转发请求发送与响应结果处理

pendingMap:pendingMap是一个ConcurrentMap,KEY为消息的INDEX,value为该条消息向Follwer节点转发的时间,在发送消息转发请求后会加入到pendingMap中,请求响应成功之后会从pendingMap移除;

peerWaterMarksByTerm:记录了每个Follower节点的消息复制进度,KEY为当前的Term值,VALUE是一个ConcurrentMap,ConcurrentMap中的KEY为Follower节点的ID(peerId),VALUE为该节点已经同步完毕的最新的那条消息的index;

请求发送

  1. 根据消息的index从日志获取消息内容;
  2. 构建日志转发请求,在请求中设置了消息、当前Term、Leader节点的commitIndex(最后一条得到集群中大多数节点响应的消息index)等信息
  3. 发送请求给Follower节点;
  4. 将本条消息对应的index加入到pendingMap中记录消息的发送时间(key为消息的index,value为当前时间);

响应处理

  1. 如果响应状态为SUCCESS, 表示节点写入成功,然后做如下处理:
    (1)从pendingMap中移除本条消息index的信息;
    (2)更新当前这个Follower节点的复制进度,也就是peerWaterMarksByTerm中的值;
    (3)唤醒QuorumAckChecker线程,主要是为了统计是否有过半的节点写入成功(后面再说);
  2. 如果Follower响应状态为INCONSISTENT_STATE,表示Follower节点数据出现了不一致的情况,Leader节点会将状态改为COMPARE;

这里再区分一下pendingAppendResponsesByTerm和peerWaterMarksByTerm:
pendingAppendResponsesByTerm:记录的是转发给Follower节点的消息复制请求的异步响应对象AppendEntryResponse,因为要等待集群中大多数节点的响应,所以使用了异步处理,消息写入成功之后会将处理状态置为完成。
peerWaterMarksByTerm:记录的是每个Follower节点的消息复制进度,保存的是每个节点最后一条成功写入的消息的index。

Compare

compareIndex:需要比较的那条消息的index,初始值为-1,每次更改状态为COMPARE时都会重置为-1;
truncateIndex:要删除消息的index,在数据不一致时,会发送请求通知Follower节点将数据不一致的那条消息删除;

当出现数据不一致的情况时,日志转发状态会被置为Compare,然后Leader节点会发送Compare请求,通知Follower节点进行消息对比,找到数据不一致的那条消息的index。
判断数据不一致的条件如下,满足其中之一就会被认定数据不一致:
(1)Leader节点在调用checkAndFreshState检查的时候,发现当前Term与memberState记录的不一致或者LeaderId为空或者LeaderId与memberState记录的LeaderId不一致;
(2)Follower节点在处理消息APPEND请求在进行校验的时候(Follower节点请求校验链接),发现数据出现了不一致,会在请求的响应中设置不一致的状态INCONSISTENT_STATE,通知Leader节点;

Leader节点发送Compare请求

在COMPARE状态下,向Follower节点发送比较请求的处理逻辑如下:

  1. 校验当前状态是否是COMPARE或者TRUNCATE请求,如果不是终止处理;
  2. 设置compareIndex的值:
    (1)如果compareIndex值为-1(初始值),获取LedgerEndIndex值作为compareIndex的值进行更新,从最近的那条消息开始比较;
    (2)如果compareIndex的值大于LedgerEndIndex(超过最大值)或者小于LedgerBeginIndex(低于最小值),说明值不合法,同样从最近的那条消息也就是LedgerEndIndex的位置开始比较;
  3. 根据compareIndex的值获取对应的消息内容,然后构建COMPARE请求;
  4. 向Follower节点发送COMPARE请求;
  5. 等待COMPARE请求返回响应;

Leader节点对Compare请求的响应结果处理

在Follower节点的请求响应中,会返回Follower节点最后成功写入的消息的index设置在endIndex变量中,第一条写入的消息设置在beginIndex变量中,Leader节点拿compareIndex值与其进行对比,处理逻辑如下:

  1. 请求响应码为SUCCESS:
    (1)如果compareIndex与Follower返回请求中的EndIndex相等,表示没有数据不一致的情况,将状态更改为APPEND
    (1)其他情况,将truncateIndex的值置为compareIndex;

  2. 如果endIndex小于当前节点的ledgerBeginIndex,或者beginIndex大于ledgerEndIndex,也就是follower与leader的index不相交时, 将truncateIndex设置为Leader的BeginIndex,也就通知Follower节点从Leader节点第一条消息那个位置往后删除;

  3. compareIndex比Follower的BeginIndex小,将truncateIndex设置为Leader的BeginIndex,同样通知Follower节点从Leader节点第一条消息那个位置往后删除;

  4. 其他情况,说明还未找到不一致的消息位置,将compareIndex的值减一,从上一条消息开始继续对比;

  5. 如果truncateIndex的值不为-1,调用doTruncate方法进入消息删除的处理逻辑;

在doTruncate方法中,会构建TRUNCATE请求设置truncateIndex(要删除的消息的index),发送给Follower节点,通知Follower节点从数据不一致的那条消息开始删除,如果Follower删除成功,Leader节点会将状态改为APPEND,并更新节点的复制进度为出现数据不一致的那条消息的index,同时也更新了writeIndex,下次从writeIndex处重新给Follower节点发送APPEND请求进行消息写入。

总结
在Leader节点判断数据不一致时,会向Follower节点发送COMPARE请求,请求中会携带要比较那条日志的index,通知Follower节点对此条消息进行对比,并返回对比的结果,如果Follower节点发现数据并没有不一致,那么Leader节点收到响应后就更改为APPEND状态,继续日志转发;

如果Follower节点认为数据不一致,会返回BeginIndex和EndIndex,Leader节点会拿compareIndex的值进行对比,如果不在以上情况内,compareIndex的值减一,从上一条消息开始继续对比,直到找到数据不一致的那条消息的index。

EntryHandler(Follower节点)

EntryHandler用于Follower节点处理Leader发送的消息请求,主要有四种请求类型,分别为Append、Compare、Truncate和Commit类型。

APPEND请求处理

APPEND请求处理逻辑如下:
(1)计算writeIndex的值,Follower的LedgerEndIndex记录了最后一条成功写入消息的index,对其 + 1表示下一条待写入消息的index,也就是writeIndex的值;
(2)从请求中获取消息内容,将消息写入CommitLog文件;
(3)上面说过Leader节点发送Append请求时,也会将记录的commitIndex设置到请求中,这里会从中取出commitIndex更新到Follower本地,后面讲QuorumAckChecker时候会提到;

Compare请求处理

COMPARE的请求处理逻辑如下,compareIndex为需要比较的index,处理逻辑如下:
(1)校验请求中的类型是否是COMPARE;
(2)根据compareIndex的值从本地获取消息内容;
(3)将上一步获取到的消息内容,与请求中携带的消息内容做对比,如果一致进入下一步,如果不一致会进入异常处理;
(4)构建响应体,响应状态为SUCCESS,并在响应体中设置当前节点同步的消息的BeginIndex和EndIndex;
异常处理:会返回INCONSISTENT_STATE状态,表示数据不一致,响应体中也会设置当前节点同步的消息的BeginIndex和EndIndex;

Truncate请求处理

Truncate请求用于Follower节点根据Leader节点发送的truncateIndex,将truncateIndex位置后面的消息从本地删除。

COMMIT请求处理

前面讲到Leader节点会向Follower节点发送COMMIT请求,COMMIT请求主要是更新Follower节点本地的committedIndex的值,记录集群中最新的那条获取大多数响应的消息的index,committedIndex之前的消息都已提交,已提交的消息可以被消费者消费,下面讲QuorumAckChecker的时候还会说到。
需要注意,Leader节点除了专门向Follower节点发送COMMIT请求外,Leader节点在发送Append请求时也会设置这个提交点,Follower节点处理APPEND的请求时会顺带更新。

QuorumAckChecker(Leader节点)

QuorumAckChecker用于Leader节点等待Follower节点复制完毕,处理逻辑如下:

  1. 如果pendingAppendResponsesByTerm的个数大于1,对其进行遍历,如果KEY的值与当前Term不一致,说明数据已过期,将过期数据置为完成状态并从pendingAppendResponsesByTerm中移除;

  2. 如果peerWaterMarksByTerm个数大于1,对其进行遍历,同样找出与当前TERM不一致的数据,进行清理;

以上两步主要是为了清理过期的数据。

  1. 从peerWaterMarksByTerm中获取当前Term的数据,里面记录了每个Follower节点的日志复制进度,然后对所有的复制进度进行排序,取出处于中间位置的那个进度值,也就是消息的index值,这里不太好理解,举个例子,假如一个Leader有5个Follower节点,当前Term为1:
{
   "1" : { // TERM的值,对应peerWaterMarksByTerm中的Key
    "节点1" : "1", // 节点1复制到第1条消息
    "节点2" : "1", // 节点2复制到第1条消息
    "节点3" : "2", // 节点3复制到第2条消息
    "节点4" : "3", // 节点4复制到第3条消息
    "节点5" : "3"  // 节点5复制到第3条消息
   }
}

对所有Follower节点的复制进度倒序排序之后的list如下:

[3, 3, 2, 1, 1]

取5 / 2 的整数部分为2,也就是下标为2处的值,对应节点3的复制进度(消息index为2),记录在quorumIndex变量中,节点4和5对应的消息进度大于消息2的,所以对于消息2,集群已经有三个节点复制成功,满足了集群中大多数节点复制成功的条件。

如果要判断某条消息是否集群中大多数节点已经成功写入,一种常规的处理方法,对每个节点的复制进度进行判断,记录已经复制成功的节点个数,这样需要每次遍历整个节点,效率比较低,所以这里RocketMQ使用了一种更高效的方式来判断某个消息是否获得了集群中大多数节点的响应。

  1. quorumIndex之前的消息都已经获得集群中大多数节点响应,所以此时可以更新提交点,更新当前Leader节点记录的committedIndex的值;

  2. 从pendingAppendResponsesByTerm中移除已经写入成功消息的数据,主要是清理数据;

  3. 处理pendingAppendResponsesByTerm中的超时数据,这一步主要是为了处理超时的数据;

持久化

QuorumAckChecker中可知Leader节点在某个消息的写入得到集群中大多数Follower节点的响应之后,会更新committedIndex的值,上面也提到过,Follower节点在收到Leader节点的APPEND或者Commit请求的时候,也会将请求中设置的Leader节点的committedIndex更新到本地。之后Broker停止或者FLUSH的时候,会将ledgerEndIndex和committedIndex写入到文件(ChecktPoint)进行持久化:

  • ledgerEndIndex:Leader或者Follower节点最后一条成功写入的消息的index;

  • committedIndex:如果某条消息转发给Follower节点之后得到了集群中大多数节点的响应成功,将对应的index记在committedIndex表示该index之前的消息都已提交,已提交的消息可以被消费者消费,Leader节点会将值设置在APPEND请求中发送给Follower节点进行更新或者发送COMMIT请求进行更新;

个人公众号在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1079915.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

spring学习小笔记

spring学习小笔记(1) 一、Spring开发1.1 Spring简介1.2 Spring Framework系统架构1.3 Spring Framework学习路线1.4 Spring Farmework核心概念1.5 Spring入门 二、Bean的基础配置2.1 Bean的别名配置2.2 Bean的作用范围2.3 Bean的实例化2.3.1 构造方法实例…

本地vscode安装GPU版本PyTorch

操作系统 windows, IDE环境vscode,本地GPU 可以新建一个jupyter文件,运行一些测试代码 确保装好显卡驱动 在底下调出终端窗口,默认是power shell,我喜欢用cmd窗口 激活自己的虚拟环境,输入命令 nvidia-smi 确保自己…

ctfshow-web12(glob绕过)

打开链接,在网页源码里找到提示 要求以get请求方式给cmd传入参数 尝试直接调用系统命令,没有回显,可能被过滤了 测试phpinfo,回显成功,确实存在了代码执行 接下来我们尝试读取一下它存在的文件,这里主要介…

E. Li Hua and Array

Problem - E - Codeforces 思路:观察给定的函数,其实就是求与这个数互质的数的个数,即欧拉函数,我们发现一个数迭代欧拉函数不会很多,那么对于第一个操作来说我们可以直接暴力修改,而对于第二个操作来说&am…

软件测试/测试开发丨为什么接口自动化测试是提升职业技能的关键?

接口测试背景和必要性 接口测试是测试系统组件间接口(API)的一种测试,主要用于检测内部与外部系统、内部子系统之间的交互质量,其测试重点是检查数据交换、传递的准确性,控制和交互管理过程,以及系统间相互…

ElementPlus Switch 开关基础使用

昨天开发用到开关组件 后台返回字段是 can_write 默认是0 or 1 但是Switch 组件绑定的默认值默认是 true or false 直接绑定会导致默认是关闭状态 在页面一加载 值发生变化时 会自己调用 查了文档 需要使用 active-value 和 inactive-value 来指定绑定的数据类型 …

C#,工业化软件与院校软件的对比及编程语言的选择建议

飞机发动之之一,涡轮喷气航空发动机(JET ENGINE) 火箭发动机之一,俄罗斯RD-180煤油和液氧发动机(ROCKET ENGINE) 1 飞机发动机与火箭发动机的简明对比 2 工业软件与院校软件的简单对比 除了以上类似的对比…

【java学习】方法的参数传递(21)

文章目录 相关概念1. 方法传递之基本数据类型2. 方法的参数传递之引用对象3. 总结 相关概念 方法,必须有其所在类或对象调用才有意义。若方法含有参数: 形参:方法声明时的参数 实参:方法调用时实际传给形参的参数值 问题&#xf…

数据建模设计

数据库系统——建模与设计 一、数据建模 数据库的设计不仅需要处理规则的理解,更重要的是数据需求的理解与表达。 表达计算机世界的模型称为数据模型,而表达信息世界的模型称为概念模型。抽象是具有层次的,将现实世界的问题抽象成概念模型…

[ValueError: not enough values to unpack (expected 3, got 2)]

项目场景: 在使用opencv进行关键点识别、边缘轮廓提取的时候,提示以上错误。 import cv2 import numpy as npdef preprocess(image):# 进行图像预处理(例如灰度化、高斯模糊等)gray cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)blu…

Vuex的使用,详细易懂

目录 一.前言 二.Vuex的简介 三.vuex的使用 3.1 安装Vuex 3.2 使用Vuex的步骤: 四.vuex的存值取值(改变值) 五.vuex的异步请求 好啦,今天的分享就到这啦!!! 一.前言 今天我们继续前面的E…

openGauss Meetup(天津站.10月13日),欢迎报名

由openGauss社区、天开发展集团、天津市软件行业协会、天大智图(科技)有限公司联合主办,天津鲲鹏生态创新中心、天津市计算机学会、天津市人工智能学会、天津市系统集成协会、麒麟软件有限公司、天津南大通用数据技术股份有限公司、AI知学社协办的“open…

【Vuex+ElementUI】Vuex中取值存值以及异步加载的使用

一、导言 1、引言 Vuex是一个用于Vue.js应用程序的状态管理模式和库。它建立在Vue.js的响应式系统之上,提供了一种集中管理应用程序状态的方式。使用Vuex,您可以将应用程序的状态存储在一个单一的位置(即“存储”)中,…

北斗高精度定位为无人车成为机场运营新常态提供技术保障

在现代快节奏的生活中,人们对交通效率和安全性的需求越来越高。为了满足这一需求,无人驾驶技术被广泛研究和应用。而随着北斗卫星系统的发展,机场无人车正成为潜在的未来运输解决方案。本文将深入探讨北斗卫星如何改变机场运营,以…

Vega Prime入门教程14.01:调用VAPS XT DLL

本文首发于:Vega Prime入门教程14.01:调用VAPS XT DLL 在VAPS XT系列教程中提到过Vega Prime可以直接调用Drawing Integration生成的dll,本文来测试这个功能效果。 本系列使用的是VP18.0,使用的是VC14.0(VS2015&…

java装箱和拆箱

package daysreplace;import com.sun.jdi.IntegerValue;import java.util.Arrays;public class Test {public static void main(String[] args) {//装箱:自动将基本数据类型转成包装类 基本数据类型->包装类型//拆箱:自动将包装类转成基本数据类型 包…

【LeetCode:2512. 奖励最顶尖的 K 名学生 | 模拟+哈希表+堆】

🚀 算法题 🚀 🌲 算法刷题专栏 | 面试必备算法 | 面试高频算法 🍀 🌲 越难的东西,越要努力坚持,因为它具有很高的价值,算法就是这样✨ 🌲 作者简介:硕风和炜,…

练[GYCTF2020]EasyThinking

[GYCTF2020]EasyThinking 文章目录 [GYCTF2020]EasyThinking掌握知识解题思路还得靠大佬正式开始 关键paylaod 掌握知识 ​ thinkphpV6任意文件操作漏洞,代码分析写入session文件的参数,源码泄露,使用蚁剑插件disable_functions绕过终端无回…

Android Native 开发 要点实录

Android Studio 中写 C 代码 android studio创建C项目_android studio native c-CSDN博客 项目配置参考 【CMake】CMakeLists.txt的超傻瓜手把手教程(附实例源码)_【cmake】cmakelists.txt的超傻瓜手把手教程(附实例源码)-CSDN博客 CMakeLists.txt 讲解…

一文区分路由策略和策略路由!

一、路由策略 在复杂的数据通信网络中,根据实际组网需求,往往需要实施一些路由策略对路由信息进行过滤、属性设置等操作,通过对路由的控制,可以影响数据流量转发。路由策略并非单一的技术或者协议,而是一个技术专题或…