RocketMQ源码解析-主从同步原理(HA)

news2024/12/25 10:31:02

1、关键组件

主从同步的实现逻辑主要在HAService中,在它的构造函数中实例化了几个对象同时在start()方法内执行启动:

public class HAService {
    public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
        this.defaultMessageStore = defaultMessageStore;
        this.acceptSocketService =
            new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
        this.groupTransferService = new GroupTransferService();
        this.haClient = new HAClient();
    }

    ......

    public void start() throws Exception {
        this.acceptSocketService.beginAccept();
        this.acceptSocketService.start();
        this.groupTransferService.start();
        this.haClient.start();
    }
}

首先了解一下HAService的构造函数中的内容究竟是干什么的:

  • AcceptSocketService:主要是处理从节点的连接,调用AcceptSocketService#beginAccept()方法,这一步主要是进行端口绑定,在端口上监听从节点的连接请求;调用AcceptSocketService#start()方法启动服务,这一步主要为了处理从节点的连接请求,与从节点建立连接(可以看做是运行在master节点的)。
  • GroupTransferService:主要用于在主从同步的时候,等待数据传输完毕(可以看做是运行在master节点的。
  • HAClient:里面与master节点建立连接,向master汇报主从同步进度并存储master发送过来的同步数据(可以看做是运行在slave从节点的)。

了解完HAService中的组件,而且看到在start()方法中启动了各个组件,那么HAService在何时被启动的呢?

还记得之前在记录broker时,看过BrokerController#initialize()初始化方法内,同时也构建了DefaultMessageStore对象,它作为HAService构造函数的入参,定义的start()方法中就包含HAService的启动

1).构建DefaultMessageStore以及start()启动

//BrokerController.class
public class BrokerController {
    private MessageStore messageStore;
    //broekr初始化
    public boolean initialize() throws CloneNotSupportedException {
         .......
         this.messageStore =  new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,this.brokerConfig);
         .......
    }
    //borker启动
    public void start() throws Exception {
        if (this.messageStore != null) {
            this.messageStore.start();
        }
      }
    }
}

2)实例化HAServer以及start()启动

//DefaultMessageStore.class
public class DefaultMessageStore implements MessageStore {
    private final HAService haService;
    ......

    public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
        final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
        ......
        //实例化HAService
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            this.haService = new HAService(this);
        } else {
            this.haService = null;
        }
        ......
    }

public void start() throws Exception {
        ......
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            //启动HA
            this.haService.start();
            this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
        }
        ......
    }
}

2.主从同步流程 

2.1.绑定端口,监听连接请求

AcceptSocketService#beginAccept方法里面首先获取了ServerSocketChannel,然后进行端口绑定,并在selector上面注册了OP_ACCEPT事件的监听,监听从节点的连接请求:

class AcceptSocketService extends ServiceThread {
        /**
         * 监听从节点的连接
         *
         * @throws Exception If fails.
         */
        public void beginAccept() throws Exception {
            // 创建ServerSocketChannel
            this.serverSocketChannel = ServerSocketChannel.open();
            // 获取selector
            this.selector = RemotingUtil.openSelector();
            this.serverSocketChannel.socket().setReuseAddress(true);
            // 绑定端口:10912
            this.serverSocketChannel.socket().bind(this.socketAddressListen);
            // 设置非阻塞
            this.serverSocketChannel.configureBlocking(false);
            // 注册OP_ACCEPT连接事件的监听
            this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        }
}

2.2master节点处理连接 

 因为继承了ServiceThread,所以被调用start()启动方法后,会另外开启一个线程执行run()代码,这块就是处理连接请求:

public class HAService {
    class AcceptSocketService extends ServiceThread {
        @Override
        public void run() {
            log.info(this.getServiceName() + " service started");
            // 如果服务未停止
            while (!this.isStopped()) {
                try {
                    this.selector.select(1000);
                    // 获取监听到的事件
                    Set<SelectionKey> selected = this.selector.selectedKeys();
                    // 处理事件
                    if (selected != null) {
                        for (SelectionKey k : selected) {
                            // 如果是连接事件
                            if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                                SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                                if (sc != null) {
                                    HAService.log.info("HAService receive new connection, "
                                        + sc.socket().getRemoteSocketAddress());
                                    try {
                                        // 创建HAConnection,建立连接
                                        HAConnection conn = new HAConnection(HAService.this, sc);
                                        // 启动
                                        conn.start();
                                        //添加连接
                                        HAService.this.addConnection(conn);
                                    }
                                    ...
                       
        }
    }
}
  1.  从selector中获取到监听到的事件;
  2. 如果是OP_ACCEPT连接事件,创建与从节点的连接对象HAConnection,与从节点建立连接,然后调用HAConnectionstart方法进行启动,并创建的HAConnection对象加入到连接集合中,HAConnection中封装了Master节点和从节点的数据同步逻辑;

2.3HAClient

HAClient同样也继承了ServiceThread

public void run() {
            log.info(this.getServiceName() + " service started");
            //是否执行
            while (!this.isStopped()) {
                try {
                    //连接Master
                    if (this.connectMaster()) {
                        //判断时间间隔是否合法
                        if (this.isTimeToReportOffset()) {
                            // 发送同步偏移量,传入的参数是当前的主从复制偏移量currentReportedOffset
                            boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                            //返回不对则关闭连接
                            if (!result) {
                                this.closeMaster();
                            }
                        }

                        ......
}
        }
2.3.1slave与主节点建立连接

connectMaster()方法执行连接主节点操作

 class HAClient extends ServiceThread {
    // 当前的主从复制进度
    private long currentReportedOffset = 0;

        private boolean connectMaster() throws ClosedChannelException {
            if (null == socketChannel) {
                String addr = this.masterAddress.get();
                if (addr != null) {
                    // 将地址转为SocketAddress
                    SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
                    if (socketAddress != null) {
                        // 连接master
                        this.socketChannel = RemotingUtil.connect(socketAddress);
                        if (this.socketChannel != null) {
                            // 注册OP_READ可读事件监听
                            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                        }
                    }
                }
                // 获取CommitLog中当前最大的偏移量
                this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
                // 更新上次写入时间
                this.lastWriteTimestamp = System.currentTimeMillis();
            }
            return this.socketChannel != null;
        }
}
2.3.2处理网络可读事件

processReadEvent()方法中处理了可读事件,也就是处理Master节点发送的同步数据, 首先从socketChannel中读取数据到byteBufferRead中,byteBufferRead是读缓冲区,读取数据的方法会返回读取到的字节数,对字节数大小进行判断: 

  class HAClient extends ServiceThread {
        // 读缓冲区,会将从socketChannel读入缓冲区
        private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

        private boolean processReadEvent() {
            int readSizeZeroTimes = 0;
            while (this.byteBufferRead.hasRemaining()) {
                try {
                    // 从socketChannel中读取数据到byteBufferRead中,返回读取到的字节数
                    int readSize = this.socketChannel.read(this.byteBufferRead);
                    if (readSize > 0) {
                        // 重置readSizeZeroTimes
                        readSizeZeroTimes = 0;
                        // 处理数据
                        boolean result = this.dispatchReadRequest();
                        if (!result) {
                            log.error("HAClient, dispatchReadRequest error");
                            return false;
                        }
                    } else if (readSize == 0) {
                        // 记录读取到空数据的次数
                        if (++readSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        log.info("HAClient, processReadEvent read socket < 0");
                        return false;
                    }
                } catch (IOException e) {
                    log.info("HAClient, processReadEvent read socket exception", e);
                    return false;
                }
            }

            return true;
        }
  }
  • 如果可读字节数大于0表示有数据需要处理,调用dispatchReadRequest方法进行处理;
  • 如果可读字节数为0表示没有可读数据,此时记录读取到空数据的次数,如果连续读到空数据的次数大于3次,将终止本次处理;
2.3.3消息写入ComitLog

dispatchReadRequest方法中会将从节点读取到的数据写入CommitLog,dispatchPosition记录了已经处理的数据在读缓冲区中的位置,从读缓冲区byteBufferRead获取剩余可读取的字节数,如果可读数据的字节数大于一个消息头的字节数(12个字节),表示有数据还未处理完毕,反之表示消息已经处理完毕结束处理。 

 private boolean dispatchReadRequest() {
            // 消息头大小
            final int msgHeaderSize = 8 + 4; // phyoffset + size
            int readSocketPos = this.byteBufferRead.position();
            // 开启循环不断读取数据
            while (true) {
                    ......
                    // 如果可读取的字节数大于一个消息头的字节数 + 消息体大小
                    if (diff >= (msgHeaderSize + bodySize)) {
                        byte[] bodyData = new byte[bodySize];
                        this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
                        this.byteBufferRead.get(bodyData);
                        // 从读缓冲区中根据消息的位置,读取消息内容,将消息追加到从节点的CommitLog中
                        HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

                        this.byteBufferRead.position(readSocketPos);
                        // 更新dispatchPosition的值为消息头大小+消息体大小
                        this.dispatchPosition += msgHeaderSize + bodySize;

                        if (!reportSlaveMaxOffsetPlus()) {
                            return false;
                        }

                        continue;
                    }
                }

                if (!this.byteBufferRead.hasRemaining()) {
                    this.reallocateByteBuffer();
                }

                break;
            }

            return true;
        }

2.4向Master发送主从同步消息拉取偏移量

HAClient#run()中与主节点建立连接后,会向主节点发送同步消息拉取偏移量,调用reportSlaveMaxOffset()

 private boolean reportSlaveMaxOffset(final long maxOffset) {
            this.reportOffset.position(0);
            this.reportOffset.limit(8); // 设置数据传输大小为8个字节
            this.reportOffset.putLong(maxOffset);// 设置同步偏移量
            this.reportOffset.position(0);
            this.reportOffset.limit(8);

            for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
                try {
                    // 向Master节点发送拉取偏移量
                    this.socketChannel.write(this.reportOffset);
                } catch (IOException e) {
                    log.error(this.getServiceName()
                            + "reportSlaveMaxOffset this.socketChannel.write exception", e);
                    return false;
                }
            }
            // 更新发送时间
            lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
            return !this.reportOffset.hasRemaining();
        }

2.5HAConnection

前面知道HAClientSlave节点会定时向Master节点汇报从节点的消息同步偏移量,那么Master节点是如何处理的呢?

 HAConnection中封装了Master节点与从节点的网络通信处理,分别在ReadSocketService(负责读Socket的服务)和WriteSocketService(负责读Socket的服务)。

暂时不做深究了有兴趣的可以去看看。这边值注意的一点是,消息消费时用的是netty,而主从同步时用的是java.nio下原生的SocketChannel 

3.有新消息写入之后的同步流程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1350588.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何本地快速部署Apache服务器并使用内网穿透工具实现公网访问内网服务

文章目录 前言1.Apache服务安装配置1.1 进入官网下载安装包1.2 Apache服务配置 2.安装cpolar内网穿透2.1 注册cpolar账号2.2 下载cpolar客户端 3. 获取远程桌面公网地址3.1 登录cpolar web ui管理界面3.2 创建公网地址 4. 固定公网地址 前言 Apache作为全球使用较高的Web服务器…

Python零基础入门技能学习,实现办公自动化与数据分析

一、教程描述 Python零基础入门&#xff0c;写下你的第一行人工智能代码&#xff0c;Python实现办公自动化&#xff0c;这是比同事早下班2小时的技能&#xff0c;Python实现网络爬虫&#xff0c;用Python抓取网页信息提升职场竞争力&#xff0c;Python实现数据分析&#xff0c…

使用 go-elasticsearch v8 基本请求

使用 go-elasticsearch 请求示例 你可以通过参考Go 官方文档找到简单的示例&#xff0c;所以我认为先看看这个是个好主意。 连接客户端有两种方式&#xff0c;如下图。 至于两者的特点&#xff0c;TypedClient有类型&#xff0c;更容易编写&#xff0c;但文档较少。另外&…

jdk更改版本

要改三个地方 1、pom.xml里 2、File -> Project Structure -> Project File -> Project Structure -> Modules 3、File -> Settings -> targrt bytecode设为1.8

C语言学习----存储类别

存储类别 &#x1f33f;本文是C Primer Pluse 中文版第12章的部分内容整理 &#x1f331;主要是围绕C中作用域 链接 存储期 展开 &#xff0c;是后面进行多文件管理的基础~ &#x1f308;概要 &#x1f34e;明确对象 变量名 标识符的基本概念和含义 &#x1f350;作用域和链接描…

西尔维斯特方程(Sylvester equation)官方求解有问题

西尔维斯特方程&#xff08;Sylvester equation&#xff09;官方求解有问题 1. 西尔维斯特方程&#xff08;Sylvester equation&#xff09;1.1 以上所有矩阵都是2x2矩阵的演算形式 2. 官方求解Sylvester方程-错误2.1 官方求解形式2.2 2x2矩阵举例 3. Nicolas Andreff作者文章O…

PostgreSQL数据库的json操作

1.操作符 select json字段::json->key值 from order -- 对象域 select json字段::json->>key值 from order -- 文本 select json字段::json#>{key值} from order -- 对象域 select json字段::json#>>{key值} from order -- 文本对象域表示还能继续操作&#…

微服务智慧工地信息化解决方案(IOT云平台源码)

智慧工地是指应用智能技术和互联网手段对施工现场进行管理和监控的一种工地管理模式。它利用传感器、监控摄像头、人工智能、大数据等技术&#xff0c;实现对施工现场的实时监测、数据分析和智能决策&#xff0c;以提高工地的安全性、效率和质量。 智慧工地平台是一种智慧型、系…

听GPT 讲Rust源代码--library/alloc

File: rust/library/alloc/benches/slice.rs 在Rust源代码中&#xff0c;rust/library/alloc/benches/slice.rs文件的作用是对&[T]类型&#xff08;切片类型&#xff09;进行性能基准测试。该文件包含了对切片类型的一系列操作的基准测试&#xff0c;例如切片迭代、切片排序…

透明OLED屏批发:从价格到产品选择的全方位指南

随着科技的进步&#xff0c;透明OLED屏作为一种创新的显示技术&#xff0c;在各个领域都展现出巨大的应用潜力。对于希望在商业或个人应用中采用透明OLED屏的企业或个人&#xff0c;批发购买可能是一个经济高效的选择。尼伽小编将从本文将为您详细解析透明OLED屏批发的各个环节…

x-cmd pkg | bit - 实验性的现代化 git CLI

目录 简介首次用户功能特点竞品和相关作品进一步探索 简介 bit&#xff0c;由 Chris Walz 于 2020 年使用 Go 语言开发&#xff0c;提供直观的命令行补全提示和建立在 git 命令之上的封装命令&#xff0c;旨在建立完全兼容 git 命令的现代化 CLI。 首次用户 使用 x bit 即可自…

EasyCode代码生成器插件

EasyCode文档&#xff1a;https://gitee.com/makejava/EasyCode/wikis/pages?sort_id725187&doc_id166248 EasyCode 优点 可以生成controller层的代码。可以一次性生成多张表的各层代码。可以自定义模板。 EasyCode使用 在插件市场下载easy code插件 在idea中进行数据…

MySQL基础学习: linux系统mysql 密码插件 validate_password安装

1、没有安装mysql密码插件&#xff0c;执行命令&#xff1a;SHOW VARIABLES LIKE ‘validate_password%’; 2、安装mysql密码插件&#xff0c;执行命令&#xff1a;install plugin validate_password soname ‘validate_password.so’; 3、再次执行&#xff1a;SHOW VARIABLE…

Docker 实践之旅:项目迁移与高效部署

目录 1 引言2 初识 Docker2.1 Docker简介2.2 Docker优势 3 传统部署流程的问题4 学习 Docker 的过程5 Docker 解决项目部署的实践5.1 迁移关键服务5.2 定制化打包与快速部署 6 项目实践收获6.1 简化了部署流程6.2 节约了部署成本 7 克服难点和经验分享7.1 版本兼容性问题7.2 网…

Python列表数据处理全攻略(七):常用内置方法轻松掌握

引言 亲爱的读者&#xff0c;你好&#xff01;Python的列表在数据结构中占据着核心地位&#xff0c;对于学习与应用Python来说&#xff0c;它无疑是我们不可或缺的得力助手。它不仅能帮助我们有效地存储和整理数据&#xff0c;还为我们提供了众多内置方法&#xff0c;让数据处…

基于RetinaFace+Jetson Nano的智能门锁系统——第二篇(配置环境)

文章目录 设备一、安装远程登录终端Xshell1.1下载Xshell1.2新建回话1.3查询ip地址1.4启动连接 二、安装远程文件管理WinScp2.1下载WinScp2.2连接Jetson Nano2.3连接成功 三、安装远程桌面VNC Viewer3.1下载VNC Viewer3.2在Jetson Nano安装VNC Viewer3.3设置VINO登录选项3.4将网…

Git原理与使用(二):分支管理

Git原理与使用[二]:分支管理 一.分支的基本操作1.理解分支2.创建分支3.切换分支4.删除分支5.补充:创建并切换分支 二.合并分支1.合并分支的基础操作2.分支冲突 三.分支管理策略1.Fast-forward模式2.--no--ff(即:禁用Fast-forward模式)3.分支策略 四.创建临时分支修复bug1.git s…

C#核心学习(面向对象)

目录 封装构造函数基本概念&#xff1a;写法&#xff1a;特殊写法注意&#xff1a; 析构函数基本语法 垃圾回收机制&#xff08;GC&#xff0c;Garbage Collector&#xff09;回收算法C# 中内存回收原理自动回收步骤注意 手动回收&#xff1a; 成员属性基本概念基本语法注意&am…

什么是软件测试?这是我听过最通俗易懂的解释

很多人总是说我要学习软件测试&#xff0c;因为他可以拿到一个不错的薪资。 但是当我问他你知道什么是软件测试吗&#xff1f;这个时候&#xff0c;他总会愣住了&#xff0c;一脸不屑的表情说着&#xff0c;不就是找bug&#xff0c;给软件找问题&#xff0c;找茬吗&#xff1f…

超好用的IDEA插件!免费

IDEA是一款功能强大的集成开发环境&#xff08;IDE&#xff09;&#xff0c;它可以帮助开发人员更加高效地编写、调试和部署软件应用程序。我们在编写完接口代码后需要进行接口调试等操作&#xff0c;一般需要打开额外的调试工具。 今天给大家介绍一款IDEA插件&#xff1a;Api…