Chapter12-主从同步机制

news2025/4/16 9:51:05

12.1 同步属性信息

        Slave 需要和 Master 同步的不只 是 消息本身,一些元数据信息也需要 同步,比如 TopicConfig 信息 、 ConsumerOffset 信息 、 DelayOffset 和SubscriptionGroupConfig 信息 。 Broker 在启动的时候,判断自己的角色是否是Slave ,是的话就启动定时同步任务

        在 syncAll 函数里,调用 syncTopicConfig ()、 syncConsumerOffset ()、
syncDelayOffset ()和 syncSubscriptionGroupConfig ()进行元数据同步 。 我们以
syncConsumerOffset 为例,来看看底层的具体实现。

        12.2 同步消息体

        本 节介绍 Master 和 Slave 之间同步消息体内容的方法,也就 是 同步CommitLog 内容的方法。 CommitLog 和元数据信息不同: 首先, CommitLog的数据量比元数据要大 ;其次 ,对实 时性和可靠性要求也不一样。 元数据信息是定时同步的 ,在两次同步的时间差里,如果出现异常可能会造成 Mastel" 上的元数据内容和 Slave 上 的元数据内容不一致 ,不过这种情况还可以补救 (手动调整 Offset ,重启 Consumer 等) 。 CommitLog 在高可靠性场景下如果没有及时同步, 一旦 Master 机器出故障, 消息就彻底丢失 了 。 所以有专 门的代码来实现Master 和Slave 之间消息体内容的同步 。

         主要 的实现代码在 Broker 模块的 org.apache.rocketmq.store.ha 包中 ,里 面包括 HAService 、 HAConnection 和 WaitNotifyObject 这三个类。

HAService 是实 现 commitLog 同步的 主体,它在 Master 机器和 Slave 机器上执行的逻辑不同, 默认是在 Master 机器上执行

        当 Broker 角色是 Slave 的时候 , MasterAddr 的值会被正确设置, 这样HAService 在启动的时候,在 HAClient 这个内部类中, connectMaster 会被正确执行 。

// org.apache.rocketmq.store.ha.DefaultHAClient#run
    public boolean connectMaster() throws ClosedChannelException {
        if (null == socketChannel) {
            String addr = this.masterHaAddress.get();
            if (addr != null) {
                SocketAddress socketAddress = NetworkUtil.string2SocketAddress(addr);
                this.socketChannel = RemotingHelper.connect(socketAddress);
                if (this.socketChannel != null) {
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                    log.info("HAClient connect to master {}", addr);
                    this.changeCurrentState(HAConnectionState.TRANSFER);
                }
            }

            this.currentReportedOffset = this.defaultMessageStore.getMaxPhyOffset();

            this.lastReadTimestamp = System.currentTimeMillis();
        }

        return this.socketChannel != null;
    }

        从代码中可以看出, HAClient 试图通过 Java NIO 函数去连接 Master 角色的 Broker 。 Master 角色有相应的监听代码。

        CommitLog 的同步,不是经过 nettycommand 的方式, 而是直接进行 TCP 连接,这样效率更高。 连接成功以后,通过对比 Master 和 Slave 的Offset ,不断进行同步。

        12.3 sync_master 和 async_master

        sync_ master 和 async_master 是 写 在 Broker 配置文件里的配置参数,这个参数影响的是主从同步的方式。 从字面意思理解, sync_master 是同步方式,也就是 Master 角色 Broker 中的消息要立刻同步过去; async_ master 是异步方式 ,也就是 Master 角色 Broker 中的消息是通过异步处理的方式 同步到 Slave 角色的机器上的 。 

    private CompletableFuture<PutMessageStatus> handleHA(AppendMessageResult result, PutMessageResult putMessageResult,
        int needAckNums) {
        if (needAckNums >= 0 && needAckNums <= 1) {
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }

        HAService haService = this.defaultMessageStore.getHaService();

        long nextOffset = result.getWroteOffset() + result.getWroteBytes();

        // Wait enough acks from different slaves
        GroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums);
        haService.putRequest(request);
        haService.getWaitNotifyObject().wakeupAll();
        return request.future();
    }

        在 CommitLog 类的 putMessage 函数末尾,调用 handleHA 函数 。 代码中的关键词是 wakeupAll 和 waitForFlush ,在同步方式下, Master 每次写消息的时候,都会等待向 Slave 同 步消息的过程 , 同步完成后再返回。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/434131.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DPDK简介

什么是DPDK 对于用户来说&#xff0c;它可能是一个性能出色的包数据处理加速软件库&#xff1b;对于开发者来说&#xff0c;它可能是一个实践包处理新想法的创新工场&#xff1b;对于性能调优者来说&#xff0c;它可能又是一个绝佳的成果分享平台。 DPDK用软件的方式在通用多…

使用git clone 拉去git仓库地址时报错 Failed to connect to github.com port 443: Timed out

问题描述 最近发现访问不了GitHub 使用ping 连接失败 ping github.com甚至连GitHub中的项目拉取不下来&#xff0c;报错信息如下&#xff1a; fatal: unable to access ‘https://github.com/josdejong/mathjs.git/’: Failed to connect to github.com port 443: Timed out …

在IDE中使用Bito - 一个不需要VPN就可以使用的chatgpt

文章目录 在IDE中使用Bito什么是Bito为什么要使用BitoBito可以做什么如何在IDE中安装Bito使用Bito 在IDE中使用Bito 什么是Bito 用他自己的介绍就是&#xff1a; Bito’s AI helps developers dramatically accelerate their impact. It’s a Swiss Army knife of capabilit…

Springboot如何启动内嵌tomcat

只需要引入如下依赖即可&#xff1a; 1.首先判断容器的类型&#xff0c;这里是servlet类型 org.springframework.boot.WebApplicationType#deduceFromClasspath 2.根据容器类型创建容器&#xff1a; org.springframework.boot.SpringApplication#createApplicationContext …

单链表面试题思路分享二

单链表面试题思路分享二 前言1.合并两个有序链表1.1 审题1.2 代码实现1.3 代码优化 2. 链表的分割2.1 审题2.2 代码实现 3. 链表的回文结构3.1 审题3.2 代码实现 4. 链表相交4.1 审题4.2 代码实现4.3 方法二的实现 5. 总结 前言 我们紧接上文单链表面试题分享一来看看本章我要分…

解决PySide6/PyQT的界面卡死问题(PySide6/PyQT多线程

前言 问&#xff1a;在使用 PySide6 时候&#xff0c;会出现应用程序卡死的问题。 答&#xff1a;为什么会出现这个问题呢&#xff1f;PySide6 应用程序是基于事件驱动的&#xff0c;主线程负责处理GUI事件。如果有耗时的操作任务&#xff0c;GUI 事件将被阻塞&#xff0c;应用…

发送封包协议实现XXZ批量秒分解装备

通过发送封包&#xff0c;我们可以让一些反复的枯燥的行为变的简单&#xff0c;高效。 比如XXZ的萃取装备&#xff0c;我们可以一瞬间萃取大量的装备&#xff0c;而省去读条的过程。 我们来萃取一下看看效果 手动萃取是有读条的&#xff0c;那么如果很多装备的话&#xff0c;…

OAuth2.0 实践 Spring Authorization Server 搭建授权服务器 + Resource + Client

title: OAuth2.0 实践 Spring Authorization Server 搭建授权服务器 Resource Client date: 2023-03-27 01:41:26 tags: OAuth2.0Spring Authorization Server categories:开发实践 cover: https://cover.png feature: false 1. 授权服务器 目前 Spring 生态中的 OAuth2 授…

ArcGISPRO 和 ChatGPT集成思路

“我们如何一起使用 ArcGIS PRO 和 ChatGPT&#xff1f;”ArcGIS Pro 是一款功能强大的桌面 GIS 软件&#xff0c;用于制图、空间分析和数据管理。ChatGPT 是一种 AI 语言模型&#xff0c;可用于自然语言处理任务&#xff0c;例如文本生成和响应。 结合使用 ArcGIS Pro 和 Chat…

工业互联网业务知识

文章目录 背景第四次工业革命带动制造业产业升级主要工业大国不同路径 架构ISA95体系架构变革趋势基础通用架构数据采集平台 工业互联网应用软件工业互联网全要素连接产品视角&#xff1a;产销服务企业的业务流程企业数字化改造&#xff1a;车间级全要素连接 工业互联网的产品体…

Perl检查环境配置

最近部署Perl环境&#xff0c;但是不确定安装完成&#xff0c;看到有个内置监测的&#xff0c;记录下 perl bin/otrs.CheckModules.pl

数据类型及变量的定义、使用和注意事项

数据类型 计算机存储单元 变量的定义格式&#xff1a; 数据类型 变量名数据值; 我们知道计算机是可以用来存储数据的&#xff0c;但是无论是内存还是硬盘&#xff0c;计算机存储设备的最小信息单元叫“位( bit ) "&#xff0c;我们又称之为“比特位”&#xff0c;通常用…

生态-化学反应

生态&#xff0c;确实需要化学反应。但是如果不知道化学反应的各种前置条件&#xff0c;化学反应是不可能反应的。所以我们需要了解这些知识&#xff0c;并且把这些知识迁移到人类社会经济活动中。最厉害的人就是&#xff1a;范式提炼-范式迁移。 老贾就是不知道这些知识&#…

Poseidon Hash

之前我们介绍了zk友好的哈希函数Anemoi&#xff0c;今天我们介绍另一种zk友好的哈希函数Poseidon Poseidon采用 sponge/squeeze 结构&#xff0c;该结构吸纳万物并生成固定大小的输出&#xff0c;内部有一个状态 S ( s 1 , s 2 , . . . , s t ) S(s_1,s_2,...,s_t) S(s1​,s2…

真题详解(UML部署图)-软件设计(五十二)

真题详解&#xff08;地址索引&#xff09;-软件设计&#xff08;五十一)https://blog.csdn.net/ke1ying/article/details/130211684 瀑布模式&#xff1a;适应 开发大型项目&#xff0c;且需求明确。 演化模式&#xff1a;适应 对软件需求缺乏准确认知。 螺旋模式&#xff…

C语言CRC-32 MPEG-2格式校验函数

C语言CRC-32 MPEG-2格式校验函数 CRC-32校验产生4个字节长度的数据校验码&#xff0c;通过计算得到的校验码和获得的校验码比较&#xff0c;用于验证获得的数据的正确性。基本的CRC-32校验算法实现&#xff0c;参考&#xff1a; C语言标准CRC-32校验函数 不同应用规范通过对输…

阿里JAVA架构师面试136题含答案:JVM+spring+分布式+并发编程

此文包含 Java 面试的各个方面&#xff0c;史上最全&#xff0c;苦心整理最全Java面试题目整理包括基JVM算法数据库优化算法数据结构分布式并发编程缓存等&#xff0c;使用层面广&#xff0c;知识量大&#xff0c;涉及你的知识盲点。要想在面试者中出类拔萃就要比人付出更多的努…

Baklib在线知识库/帮助中心:让知识无限延伸

在今天这个信息爆炸的时代&#xff0c;各行各业都需要一个高效的知识管理系统来帮助他们更好地组织和分享知识。Baklib在线知识库/帮助中心就是这样一个优秀的工具&#xff0c;它可以帮助您轻松地创建、管理和分享知识&#xff0c;让您的团队和客户更加高效地工作。 什么是Bakl…

Linux进程控制【进程程序替换】

✨个人主页&#xff1a; Yohifo &#x1f389;所属专栏&#xff1a; Linux学习之旅 &#x1f38a;每篇一句&#xff1a; 图片来源 &#x1f383;操作环境&#xff1a; CentOS 7.6 阿里云远程服务器 Good judgment comes from experience, and a lot of that comes from bad jud…

ESXI 6.7全面系统教程~汇总

ESXI 6.7全面系统教程 许可证&#xff1a;0A65P-00HD0-375M1-M097M-22P7H esxi 是一个脱机系统&#xff0c;也是一个虚拟机系统与vmware 相比&#xff0c;它可以直接运行在硬件上&#xff0c;这样可以减少资源浪费&#xff0c;一般用于服务器上&#xff1b;下面是esxi 的完整…