RocketMQ5.0.0的Broker主从同步机制

news2024/9/28 7:17:00

目录

一、主从同步工作原理

1. 主从配置

2. 启动HA

二、主从同步实现机制

1. 从Broker发送连接事件

2. 主Broker接收连接事件

3. 从Broker反馈复制进度

4. ReadSocketService线程读取从Broker复制进度

5. WriteSocketService传输同步消息

6. GroupTransferService线程通知HA结果

        1):待需要HA的消息集合

        2):通知消息发送者线程

三、读写分离机制

四、参考资料


一、主从同步工作原理

        为了提高消息消费的高可用性,避免Broker发生单点故障引起存储在Broker上的消息无法及时消费, RocketMQ引入Broker主备机制,即:消息消费到达主服务器后需要将消息同步到消息从服务器,如果主服务器Broker宕机后,消息消费者可以从从服务器拉取消息。

        下图所示是Broker的HA交互机制流程图及类图。主从同步模式分为:同步、异步。

  • step1:主服务器启动,并在特定端口上监听从服务器的连接;
  • step2:从服务器主动连接主服务器,主服务器接收客户端的连接,并建立相关TCP连接;
  • step3:从服务器主动向主服务器发送待拉取消息偏移量,主服务器解析请求并返回消息给从服务器;
  • step4:从服务器保存消息并继续发送新的消息同步请求。

1. 主从配置

        参考rocketmq-distribution项目的conf目录下有:2主2从异步HA配置(2m-2s-async)、2主2从同步HA配置(2m-2s-sync)。以下1主1从异步HA配置实例如下。

        主Broker配置:

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 启动SQL过滤
enablePropertyFilter = true

namesrvAddr=192.168.1.55:9876;172.17.0.3:9876
brokerIP1=192.168.1.55

        从Broker配置:

        注意:brokerName与主机相同;brokerId > 0时,则为从,0时则为主;brokerRole角色为SLAVE(从),刷盘类型为ASYNC_FLUSH(异步刷盘)。

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
# 启动SQL过滤
enablePropertyFilter = true

namesrvAddr=192.168.1.55:9876;172.17.0.3:9876

2. 启动HA

        org.apache.rocketmq.store.DefaultMessageStore#start是Broker启动方法,如下图所示是其调用链及相关HA部分代码。 

/**
 * broker启动时,消息存储线程
 * BrokerController#startBasicService()
 * @throws Exception
 */
@Override
public void start() throws Exception {
    // 是否HA主从复制
    if (!messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
        this.haService.init(this);
    }

    ......

    if (this.haService != null) {
        this.haService.start();
    }

    ......
}

        org.apache.rocketmq.store.ha.DefaultHAService#init是HAService初始化方法,如下代码所示。注意,从Broker的broker.conf配置的brokerRole为SLAVE,才能创建HAClient(从Broker注册到主Broker)

@Override
public void init(final DefaultMessageStore defaultMessageStore) throws IOException {
    this.defaultMessageStore = defaultMessageStore;
    this.acceptSocketService = new DefaultAcceptSocketService(defaultMessageStore.getMessageStoreConfig());
    this.groupTransferService = new GroupTransferService(this, defaultMessageStore);
    if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
        this.haClient = new DefaultHAClient(this.defaultMessageStore);
    }
    this.haConnectionStateNotificationService = new HAConnectionStateNotificationService(this, defaultMessageStore);
}

        org.apache.rocketmq.store.ha.DefaultHAService#start是HAService启动方法。注意:

  • org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService:主Broker接收从Broker的连接事件;
  • org.apache.rocketmq.store.ha.GroupTransferService:负责主Broker向从Broker发送同步数据;
  • org.apache.rocketmq.store.ha.HAClient:从Broker向主Broker发送连接事件;

        Broker启动时,根据配置brokerRole配置(ASYNC_MASTER、SYNC_MASTER、SLAVE)判定Broker是主还是从。若是Slave角色,在broker配置文件中获取haMasterAddress,并更新至masterAddress;但是haMasterAddress配置为空,则启动成功,但是不会执行HA

/**
 * 启动HAService
 * step1:{@link AcceptSocketService}接收从Broker的注册事件,方法是{@link AcceptSocketService#beginAccept()}
 * step2:启动{@link AcceptSocketService}线程,监听从Broker发送心跳
 * step3:同步数据{@link GroupTransferService}线程启动,主Broker向从Broker发送数据
 * step4:启动从Broker{@link HAClient}发送心跳到主Broker
 */
@Override
public void start() throws Exception {
    // 主接收从Broker的连接事件,SelectionKey.OP_ACCEPT(连接事件)
    this.acceptSocketService.beginAccept();
    // 启动主Broker线程
    this.acceptSocketService.start();
    // 主Broker同步数据线程启动
    this.groupTransferService.start();
    this.haConnectionStateNotificationService.start();
    // 启动从Broker{@link HAClient}发送心跳到主Broker
    if (haClient != null) {
        this.haClient.start();
    }
}

二、主从同步实现机制

1. 从Broker发送连接事件

        org.apache.rocketmq.store.ha.DefaultHAClient是从Broker向主Broker的发送连接事件的核心类,是个线程。其主要属性如下代码所示。

// Socket读缓存区大小,4M
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
// 主Broker地址
private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
private final AtomicReference<String> masterAddress = new AtomicReference<>();
// 从Broker向主Broker发起HA的偏移量
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
// 网络传输通道
private SocketChannel socketChannel;
// 事件选择器
private Selector selector;
/**
 * 上次读取主Broker的时间戳
 * last time that slave reads date from master.
 */
private long lastReadTimestamp = System.currentTimeMillis();
/**
 * 上次写入主Broker的时间戳
 * last time that slave reports offset to master.
 */
private long lastWriteTimestamp = System.currentTimeMillis();

// 反馈HA的复制进度(从Broker的Commitlog文件的最大偏移量)
private long currentReportedOffset = 0;
// 本次处理读缓存区的指针
private int dispatchPosition = 0;
// 读缓存区
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
// 读缓存区备份,与byteBufferRead交换
private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private DefaultMessageStore defaultMessageStore;
// HA连接状态
private volatile HAConnectionState currentState = HAConnectionState.READY;
// 流监控
private FlowMonitor flowMonitor;

        org.apache.rocketmq.store.ha.DefaultHAClient#run是HAClient启动执行任务,其调用链和代码如下。

  • DefaultHAClient#connectMaster():从Broker连接到主Broker
  • DefaultHAClient#transferFromMaster():向主发送HA进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中。  

/**
 * 启动HAClient
 * {@link DefaultHAClient#connectMaster()}:从Broker连接到主Broker
 * {@link DefaultHAClient#transferFromMaster()}:向主发送HA进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    this.flowMonitor.start();

    while (!this.isStopped()) {
        try {
            switch (this.currentState) {
                case SHUTDOWN:
                    return;
                case READY:
                    if (!this.connectMaster()) {
                        log.warn("HAClient connect to master {} failed", this.masterHaAddress.get());
                        this.waitForRunning(1000 * 5);
                    }
                    continue;
                case TRANSFER:
                    // 向主发送HA进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中
                    if (!transferFromMaster()) {
                        // 没有可拉取消息时,设置READY状态
                        closeMasterAndWait();
                        continue;
                    }
                    break;
                default:
                    this.waitForRunning(1000 * 2);
                    continue;
            }
            long interval = this.defaultMessageStore.now() - this.lastReadTimestamp;
            if (interval > this.defaultMessageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {
                log.warn("AutoRecoverHAClient, housekeeping, found this connection[" + this.masterHaAddress
                    + "] expired, " + interval);
                this.closeMaster();
                log.warn("AutoRecoverHAClient, master not response some time, so close connection");
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.closeMasterAndWait();
        }
    }

    log.info(this.getServiceName() + " service end");
}

        注意,一旦HAClient线程启动后,在状态READY、TRANSFER来回变化,READY状态下:发送从Broker连接事件到主Broker,开启Socket连接;TRANSFER状态下:主从发送相关数据信息,如:从向主发送HA复制进度(currentReportedOffset,即:从Broker的Commitlog文件的最大偏移量);主向从发送同步消息

        org.apache.rocketmq.store.ha.DefaultHAClient#connectMaster是从Broker连接到主Broker的核心方法,其代码如下。

/**
 * 从Broker连接到主Broker
 * 注意:
 *  a. Broker启动时,若是Slave角色,从broker配置文件中获取haMasterAddress,并更新至masterAddress;
 *  b. 若是Slave角色,但是haMasterAddress配置为空,则启动成功,但是不会执行HA
 * @return true连接成功;false连接失败
 */
public boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        // 获取主Broker地址
        String addr = this.masterHaAddress.get();
        if (addr != null) {
            // 根据地址创建SocketAddress对象
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            // 获取SocketChannel
            this.socketChannel = RemotingUtil.connect(socketAddress);
            if (this.socketChannel != null) {
                // SocketChannel注册OP_READ(网络读事件)
                this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                log.info("HAClient connect to master {}", addr);
                this.changeCurrentState(HAConnectionState.TRANSFER);
            }
        }

        // 获取Commitlog最大偏移量(HA同步进度)
        this.currentReportedOffset = this.defaultMessageStore.getMaxPhyOffset();

        this.lastReadTimestamp = System.currentTimeMillis();
    }

    return this.socketChannel != null;
}

2. 主Broker接收连接事件

        org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService是主Broker接收从Broker连接事件的实现类,是一个线程。其主要属性如下代码所示。

// 主Broker监听本地的Socket(本地IP + 端口号)
private final SocketAddress socketAddressListen;
// Socket通道,基于NIO
private ServerSocketChannel serverSocketChannel;
// 事件选择器,基于NIO
private Selector selector;

        org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService#beginAccept方法定义了主Broker监听从Broker的连接事件

/**
 * 启动监听从broker的连接
 * Starts listening to slave connections.
 *
 * @throws Exception If fails.
 */
public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true); // TCP可重复使用
    this.serverSocketChannel.socket().bind(this.socketAddressListen); // 绑定监听端口
    if (0 == messageStoreConfig.getHaListenPort()) {
        messageStoreConfig.setHaListenPort(this.serverSocketChannel.socket().getLocalPort());
        log.info("OS picked up {} to listen for HA", messageStoreConfig.getHaListenPort());
    }
    this.serverSocketChannel.configureBlocking(false); // 非阻塞模式
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); // 注册OP_ACCEPT(连接事件)
}

        org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService#run监听到从Broker连接事件的任务处理,为每个连接事件创建org.apache.rocketmq.store.ha.HAConnection对象并启动(负责M-S的数据同步逻辑)

/**
 * 标准的NIO连接处理方式
 * step1:选择器每1s处理一次连接就绪事件
 * step2:是否连接事件,若是,创建{@link SocketChannel}
 * step3:每一个连接创建{@link HAConnection}(负责M-S的数据同步逻辑)
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // 选择器每1s处理一次连接就绪事件
            this.selector.select(1000);
            Set<SelectionKey> selected = this.selector.selectedKeys();

            if (selected != null) {
                for (SelectionKey k : selected) {
                    // 是否连接事件
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                        // 若是连接事件时,创建SocketChannel
                        SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

                        if (sc != null) {
                            DefaultHAService.log.info("HAService receive new connection, "
                                + sc.socket().getRemoteSocketAddress());
                            try {
                                // 每一个连接创建HAConnection并启动(负责M-S的数据同步逻辑)
                                HAConnection conn = createConnection(sc);
                                conn.start();
                                DefaultHAService.this.addConnection(conn);
                            } catch (Exception e) {
                                log.error("new HAConnection exception", e);
                                sc.close();
                            }
                        }
                    } else {
                        log.warn("Unexpected ops in select " + k.readyOps());
                    }
                }

                selected.clear();
            }
        } catch (Exception e) {
            log.error(this.getServiceName() + " service has exception.", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

        org.apache.rocketmq.store.ha.DefaultHAConnection创建并启动时,启动读、写线程服务。其关键属性如下代码所示。

  • private WriteSocketService writeSocketService:主Broker向从Broker写数据服务类
  • private ReadSocketService readSocketService:主Broker读取从Broker数据服务类
private final DefaultHAService haService;
private final SocketChannel socketChannel;
// HA客户端连接地址
private final String clientAddress;
// 主Broker向从Broker写数据服务类
private WriteSocketService writeSocketService;
// 主Broker读取从Broker数据服务类
private ReadSocketService readSocketService;
private volatile HAConnectionState currentState = HAConnectionState.TRANSFER;
// 从Broker请求拉取的偏移量
private volatile long slaveRequestOffset = -1;
// 从Broker反馈已完成的偏移量
private volatile long slaveAckOffset = -1;
private FlowMonitor flowMonitor;

3. 从Broker反馈复制进度

        org.apache.rocketmq.store.ha.DefaultHAClient#transferFromMaster是从Broker与主Broker传输数据的核心方法,代码如下所示,该方法有两大功能:

  • 从Broker向主Broker:反馈HA复制进度,即:currentReportedOffset(从Broker的Commitlog文件的最大偏移量),方法org.apache.rocketmq.store.ha.DefaultHAClient#reportSlaveMaxOffset执行。
  • 从Broker接收主Broker:HA同步消息内容,方法org.apache.rocketmq.store.ha.DefaultHAClient#processReadEvent执行。
/**
 * 向主反馈HA复制进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中
 * {@link DefaultHAClient#reportSlaveMaxOffset(long)}:向主反馈HA复制进度,即:currentReportedOffset(从Broker的Commitlog文件的最大偏移量)
 * {@link DefaultHAClient#processReadEvent()}:处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中
 */
private boolean transferFromMaster() throws IOException {
    boolean result;
    // 判断是否需要向主Broker反馈当前待拉取偏移量
    if (this.isTimeToReportOffset()) {
        log.info("Slave report current offset {}", this.currentReportedOffset);
        // 向主Broker反馈拉取偏移量
        result = this.reportSlaveMaxOffset(this.currentReportedOffset);
        if (!result) {
            return false;
        }
    }

    this.selector.select(1000);

    // 处理主Broker发送过来的消息数据
    result = this.processReadEvent();
    if (!result) {
        return false;
    }

    return reportSlaveMaxOffsetPlus();
}

        org.apache.rocketmq.store.ha.DefaultHAClient#reportSlaveMaxOffset向主Broker反馈HA复制进度,代码如下。

/**
 * 向主Broker反馈拉取偏移量
 * 注意:
 *      a. 向主Broker反馈拉取偏移量maxOffset: 对于Slave端:发送下次待拉取消息偏移量
 *                                        对于Master端:本次请求拉取的偏移量,也可以理解为同步ACK
 *      b. 手动切换ByteBuffer的写模式/读模式;
 *      c. 通过{@link Buffer#hasRemaining()}判断缓存内容是否完全写入SocketChannel(基于NIO模式的写范例)
 * @param maxOffset HA待拉取偏移量
 * @return ByteBuffer缓存的内容是否写完
 */
private boolean reportSlaveMaxOffset(final long maxOffset) {
    // 偏移量写入ByteBuffer
    this.reportOffset.position(0); // 写缓存位置
    this.reportOffset.limit(8); // 写缓存字节长度
    this.reportOffset.putLong(maxOffset); // 偏移量写入ByteBuffer
    // 将ByteBuffer的写模式 转为 读模式
    this.reportOffset.position(0);
    this.reportOffset.limit(8);

    // 循环,并判定ByteBuffer是否完全写入SocketChannel
    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName()
                + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }
    lastWriteTimestamp = this.defaultMessageStore.getSystemClock().now();
    return !this.reportOffset.hasRemaining();
}

4. ReadSocketService线程读取从Broker复制进度

        org.apache.rocketmq.store.ha.DefaultHAConnection.ReadSocketService#processReadEvent是主Broker读取从Broker拉取消息的请求,获取内容是HA复制进度。其代码如下,看出主Broker获取从Broker的HA复制进度后,赋值给DefaultHAConnection#slaveRequestOffset属性,后立即唤醒GroupTransferService线程,执行消息同步

/**
 * 主Broker读取从Broker拉取消息的请求
 * step1:判定byteBufferRead是否有剩余空间,没有则{@link Buffer#flip()}
 * step2:用剩余空间,从SocketChannel读数据到缓存中;读取到的内容是从Broker拉取消息的偏移量
 * step3:通知等待同步HA复制结果的发送消息线程
 * @return
 */
private boolean processReadEvent() {
    int readSizeZeroTimes = 0;

    /*
        byteBufferRead没有剩余空间时,则:position == limit == capacity
        调用flip()方法后,则:position == 0, limit == capacity,加上processPosition = 0,说明从头开始处理
     */
    if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip(); // ByteBuffer重置处理
        this.processPosition = 0;
    }

    // ByteBuffer有剩余空间,循环至byteBufferRead没有剩余空间
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // 从SocketChannel读数据到缓存中
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0; // 重置
                this.lastReadTimestamp = DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                // 读取内容长度 >= 8,说明收到从Broker的拉取请求(内容是offset)
                if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                    int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                    long readOffset = this.byteBufferRead.getLong(pos - 8);
                    this.processPosition = pos;

                    // 从Broker反馈已完成的偏移量
                    DefaultHAConnection.this.slaveAckOffset = readOffset;
                    // 更新从Broker请求拉取的偏移量
                    if (DefaultHAConnection.this.slaveRequestOffset < 0) {
                        DefaultHAConnection.this.slaveRequestOffset = readOffset;
                        log.info("slave[" + DefaultHAConnection.this.clientAddress + "] request offset " + readOffset);
                    }

                    // 通知等待同步HA复制结果的发送消息线程
                    DefaultHAConnection.this.haService.notifyTransferSome(DefaultHAConnection.this.slaveAckOffset);
                }
            } else if (readSize == 0) {
                // 连续读取字节数0,则终止本次读取处理
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.error("read socket[" + DefaultHAConnection.this.clientAddress + "] < 0");
                return false;
            }
        } catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }

    return true;
}

5. WriteSocketService传输同步消息

        ReadSocketService线程读取从Broker发送的HA复制进度,由org.apache.rocketmq.store.ha.DefaultHAConnection.WriteSocketService根据DefaultHAConnection#slaveRequestOffset获取主Broker还没有同步的所有消息进行HA同步。其如下代码所示WriteSocketService#run方法,是同步消息核心逻辑。

/**
 * 传输消息内容到HA客户端
 * step1:slaveRequestOffset为-1时,说明主Broker没有收到从Broker的拉取请求,忽略本次写事件
 * step2:nextTransferFromWhere为-1时,说明初次传输,计算nextTransferFromWhere(待传输offset)
 * step3:判断上次是否传输完
 *        上次传输完:当前时间 与 上次传输时间的间隔 > 发送心跳包时间间隔,
 *                  发送心跳包长度: nextTransferFromWhere + size(消息长度0,避免长连接由空闲而关闭)
 *        上次传输没有完成:继续传输,忽略本次写事件
 * step4:根据从Broker待拉取消息offset查找之后的所有可读消息
 * step5:待同步消息总大小 > 一次传输的大小,默认32KB,则截取,此时一次传输不是完整的消息
 * step6:传输消息内容
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.selector.select(1000);

            // slaveRequestOffset为-1时,说明主Broker没有收到从Broker的拉取请求,忽略本次写事件
            if (-1 == DefaultHAConnection.this.slaveRequestOffset) {
                Thread.sleep(10);
                continue;
            }

            /*
                nextTransferFromWhere为-1时,说明初次传输
                初次传输时,计算nextTransferFromWhere(待传输offset)
             */
            // 初次传输
            if (-1 == this.nextTransferFromWhere) {
                // =0 时,从Commitlog文件的最大偏移量传输
                if (0 == DefaultHAConnection.this.slaveRequestOffset) {
                    long masterOffset = DefaultHAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                    masterOffset =
                        masterOffset
                            - (masterOffset % DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                            .getMappedFileSizeCommitLog());

                    if (masterOffset < 0) {
                        masterOffset = 0;
                    }

                    this.nextTransferFromWhere = masterOffset;
                }
                // !=0 时,从Broker请求的偏移量
                else {
                    this.nextTransferFromWhere = DefaultHAConnection.this.slaveRequestOffset;
                }

                log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + DefaultHAConnection.this.clientAddress
                    + "], and slave request " + DefaultHAConnection.this.slaveRequestOffset);
            }

            /*
                判断上次是否传输完
                上次传输完:当前时间 与 上次传输时间的间隔 > 发送心跳包时间间隔,
                         发送心跳包长度: nextTransferFromWhere + size(消息长度0,避免长连接由空闲而关闭)
                上次传输没有完成:继续传输,忽略本次写事件
             */
            // lastWriteOver为true,则上次传输完
            if (this.lastWriteOver) {

                // 当前时间 与 上次传输时间的间隔
                long interval =
                    DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;

                // 时间间隔 > 发送心跳包时间间隔
                if (interval > DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaSendHeartbeatInterval()) {

                    // Build Header 发送心跳包长度: nextTransferFromWhere + size(消息长度0,避免长连接由空闲而关闭)
                    this.byteBufferHeader.position(0);
                    this.byteBufferHeader.limit(headerSize);
                    this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                    this.byteBufferHeader.putInt(0);
                    this.byteBufferHeader.flip();

                    // 传输数据
                    this.lastWriteOver = this.transferData();
                    if (!this.lastWriteOver)
                        continue;
                }
            }
            // lastWriteOver为false,则上次传输没有完成,则继续传输
            else {
                // 继续传输上次拉取请求,还未完成,则忽略本次写事件
                this.lastWriteOver = this.transferData();
                if (!this.lastWriteOver)
                    continue;
            }

            // 根据从Broker待拉取消息offset查找之后的所有可读消息
            SelectMappedBufferResult selectResult =
                DefaultHAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
            if (selectResult != null) {
                int size = selectResult.getSize();
                // 待同步消息总大小 > 一次传输的大小,默认32KB
                if (size > DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                    size = DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                }

                int canTransferMaxBytes = flowMonitor.canTransferMaxByteNum();
                if (size > canTransferMaxBytes) {
                    if (System.currentTimeMillis() - lastPrintTimestamp > 1000) {
                        log.warn("Trigger HA flow control, max transfer speed {}KB/s, current speed: {}KB/s",
                            String.format("%.2f", flowMonitor.maxTransferByteInSecond() / 1024.0),
                            String.format("%.2f", flowMonitor.getTransferredByteInSecond() / 1024.0));
                        lastPrintTimestamp = System.currentTimeMillis();
                    }
                    size = canTransferMaxBytes;
                }

                long thisOffset = this.nextTransferFromWhere;
                this.nextTransferFromWhere += size; // 下一次写入的offset

                selectResult.getByteBuffer().limit(size);
                this.selectMappedBufferResult = selectResult;

                // Build Header 传输size大小的消息内容(不一定是完整的消息)
                this.byteBufferHeader.position(0);
                this.byteBufferHeader.limit(headerSize);
                this.byteBufferHeader.putLong(thisOffset);
                this.byteBufferHeader.putInt(size);
                this.byteBufferHeader.flip();

                // 传输数据
                this.lastWriteOver = this.transferData();
            } else {

                DefaultHAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
            }
        } catch (Exception e) {

            DefaultHAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }

    DefaultHAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();

    if (this.selectMappedBufferResult != null) {
        this.selectMappedBufferResult.release();
    }

    changeCurrentState(HAConnectionState.SHUTDOWN);

    this.makeStop();

    readSocketService.makeStop();

    haService.removeConnection(DefaultHAConnection.this);

    SelectionKey sk = this.socketChannel.keyFor(this.selector);
    if (sk != null) {
        sk.cancel();
    }

    try {
        this.selector.close();
        this.socketChannel.close();
    } catch (IOException e) {
        DefaultHAConnection.log.error("", e);
    }

    DefaultHAConnection.log.info(this.getServiceName() + " service end");
}

6. GroupTransferService线程通知HA结果

        org.apache.rocketmq.store.ha.GroupTransferService该类负责将主从同步复制结束后,通知阻塞的消息发送者线程。同步主从Broker模式,即:消息刷磁盘后,继续等待新消息被传输到从Broker,等待传输结果,并通知消息发送线程

        1):待需要HA的消息集合

        org.apache.rocketmq.store.CommitLog#asyncPutMessage是消息生产者发送消息到Broker时执行存储消息,参考《RocketMQ5.0.0消息存储<二>_消息存储流程》,该方法会根据同步或异步模式(默认)来执行org.apache.rocketmq.store.CommitLog#handleDiskFlushAndHA方法(完成刷盘和HA复制),方法调用链如下。

        生产者把消息发送到Broker,完成commit操作(消息提交到文件内存映射中) ,随后根据同步/异步模式完成刷盘和HA。HA操作时,把消息提交请求添加到org.apache.rocketmq.store.ha.GroupTransferService.requestsWrite是主Broker待需要HA的的集合。以下是org.apache.rocketmq.store.CommitLog#handleHA的代码。  

private CompletableFuture<PutMessageStatus> handleHA(AppendMessageResult result, PutMessageResult putMessageResult,
                                                     int needAckNums) {
    if (needAckNums >= 0 && needAckNums <= 1) {
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }

    HAService haService = this.defaultMessageStore.getHaService();

    long nextOffset = result.getWroteOffset() + result.getWroteBytes();

    // Wait enough acks from different slaves
    GroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums);
    haService.putRequest(request);
    haService.getWaitNotifyObject().wakeupAll();
    return request.future();
}

        2):通知消息发送者线程

        HAService启动时,会启动GroupTransferService线程。GroupTransferService#run执行任务,如下代码所示。

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // 间隔10s
            this.waitForRunning(10);
            // 主从同步复制结束后,通知阻塞的消息发送者线程
            this.doWaitTransfer();
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

        其中执行waitForRunning()方法时,会去执行org.apache.rocketmq.store.ha.GroupTransferService#swapRequests方法,使得requestsWrite与requestsRead两个集合对调

  • private volatile List<CommitLog.GroupCommitRequest> requestsWrite:主Broker待需要HA的消息集合
  • private volatile List<CommitLog.GroupCommitRequest> requestsRead:主Broker正在执行的HA集合
private void swapRequests() {
    List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
}

        org.apache.rocketmq.store.ha.GroupTransferService#doWaitTransfer方法是主从同步复制结束后,通知阻塞的消息发送者线程,如下代码所示。

/**
 * 主从同步复制结束后,通知阻塞的消息发送者线程
 * step1:遍历消息提交请求(内存提交到Commitlog文件的内存映射)
 * step2:判断主从同步成功:从已经成功复制的最大偏移量 >= 消息生产者发送消息后返回下一条消息的偏移量
 */
private void doWaitTransfer() {
    // 加锁
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            // commit请求,即:内存提交到Commitlog文件的内存映射
            for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                boolean transferOK = false;

                long deadLine = req.getDeadLine(); // 是否超时
                final boolean allAckInSyncStateSet = req.getAckNums() == MixAll.ALL_ACK_IN_SYNC_STATE_SET;

                for (int i = 0; !transferOK && deadLine - System.nanoTime() > 0; i++) { // 是否超时
                    if (i > 0) {
                        // 等待1s
                        this.notifyTransferObject.waitForRunning(1000);
                    }

                    if (!allAckInSyncStateSet && req.getAckNums() <= 1) {
                        // 主从同步成功判断:从已经成功复制的最大偏移量 >= 消息生产者发送消息后返回下一条消息的偏移量
                        transferOK = haService.getPush2SlaveMaxOffset().get() >= req.getNextOffset();
                        continue;
                    }

                    if (allAckInSyncStateSet && this.haService instanceof AutoSwitchHAService) {
                        // In this mode, we must wait for all replicas that in InSyncStateSet.
                        final AutoSwitchHAService autoSwitchHAService = (AutoSwitchHAService) this.haService;
                        final Set<String> syncStateSet = autoSwitchHAService.getSyncStateSet();
                        if (syncStateSet.size() <= 1) {
                            // Only master
                            transferOK = true;
                            break;
                        }

                        // Include master
                        int ackNums = 1;
                        for (HAConnection conn : haService.getConnectionList()) {
                            final AutoSwitchHAConnection autoSwitchHAConnection = (AutoSwitchHAConnection) conn;
                            if (syncStateSet.contains(autoSwitchHAConnection.getSlaveAddress()) && autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) {
                                ackNums++;
                            }
                            if (ackNums >= syncStateSet.size()) {
                                transferOK = true;
                                break;
                            }
                        }
                    } else {
                        // Include master
                        int ackNums = 1;
                        for (HAConnection conn : haService.getConnectionList()) {
                            // TODO: We must ensure every HAConnection represents a different slave
                            // Solution: Consider assign a unique and fixed IP:ADDR for each different slave
                            if (conn.getSlaveAckOffset() >= req.getNextOffset()) {
                                ackNums++;
                            }
                            if (ackNums >= req.getAckNums()) {
                                transferOK = true;
                                break;
                            }
                        }
                    }
                }

                if (!transferOK) {
                    log.warn("transfer message to slave timeout, offset : {}, request acks: {}",
                        req.getNextOffset(), req.getAckNums());
                }

                // 从完成复制后,唤醒消息发送者线程
                req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
            }

            this.requestsRead.clear();
        }
    }
}

三、读写分离机制

        RocketMQ读写分离与其他中间件的实现方式完全不同,RocketMQ是消费者首先向主服务器发起拉取消息请求,然后主服务器返回一批消息,然后会根据主服务器负载压力与主从同步情况,向从服务器建议下次消息拉取是从主服务器还是从从服务器拉取

        RocketMQ根据MessageQueu查找Broker地址的唯一依据是brokerName。Broker组织中根据brokerName获取一组Broker服务器(M-S),它们的brokerName相同但brokerId不同,主服务器的brokerId为0,从服务器的brokerId大于0。 其方法是org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe

        详细消费拉取消息时,实现读写分离机制见后续章节,参考《RocketMQ5.0.0消息消费<一> _ PUSH模式的消息拉取》。

四、参考资料

【RocketMQ】学习RocketMQ必须要知道的主从同步原理_午睡的猫…的博客-CSDN博客_rocketmq主从同步原理

【RocketMQ】主从同步实现原理 - shanml - 博客园

RocketMQ5.0.0消息存储<二>_消息存储流程_爱我所爱0505的博客-CSDN博客_rocketmq 消息写入流程RocketMQ5.0.0消息存储<三>_消息转发与恢复机制_爱我所爱0505的博客-CSDN博客 

RocketMQ5.0.0消息存储<四>_刷盘机制_爱我所爱0505的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/366704.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【每日一题Day127】LC1238循环码排列 | 格雷码构造 位运算

格雷码 看到题目就想到了格雷码 然后就疯狂搜索格雷码 手动构造了一波格雷码 看了题解 emmm 有点亏 理论基础 n 位格雷码序列 是一个由 2n 个整数组成的序列&#xff0c;其中&#xff1a; 每个整数都在范围 [0, 2n - 1] 内&#xff08;含 0 和 2n - 1&#xff09;第一个整数是…

深度学习之“制作自定义数据”--torch.utils.data.DataLoader重写构造方法。

深度学习之“制作自定义数据”–torch.utils.data.DataLoader重写构造方法。 前言&#xff1a; ​ 本文讲述重写torch.utils.data.DataLoader类的构造方法&#xff0c;对自定义图片制作类似MNIST数据集格式&#xff08;image, label&#xff09;&#xff0c;用于自己的Pytorc…

大数据Hadoop教程-学习笔记04【数据仓库基础与Apache Hive入门】

视频教程&#xff1a;哔哩哔哩网站&#xff1a;黑马大数据Hadoop入门视频教程 总时长&#xff1a;14:22:04教程资源: https://pan.baidu.com/s/1WYgyI3KgbzKzFD639lA-_g 提取码: 6666【P001-P017】大数据Hadoop教程-学习笔记01【大数据导论与Linux基础】【17p】【P018-P037】大…

Spring boot开启定时任务的三种方式(内含源代码+sql文件)

Spring boot开启定时任务的三种方式&#xff08;内含源代码sql文件&#xff09; 源代码sql文件下载链接地址&#xff1a;https://download.csdn.net/download/weixin_46411355/87486580 目录Spring boot开启定时任务的三种方式&#xff08;内含源代码sql文件&#xff09;源代码…

【无人机】回波状态网络(ESN)在固定翼无人机非线性控制中的应用(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

前端常见手写面试题集锦

实现迭代器生成函数 我们说迭代器对象全凭迭代器生成函数帮我们生成。在ES6中&#xff0c;实现一个迭代器生成函数并不是什么难事儿&#xff0c;因为ES6早帮我们考虑好了全套的解决方案&#xff0c;内置了贴心的 生成器 &#xff08;Generator&#xff09;供我们使用&#xff…

java面试题-IO流

基础IO1.如何从数据传输方式理解IO流&#xff1f;IO流根据处理数据的类型可以分为字节流和字符流。字节流字节流以字节&#xff08;8位&#xff09;为单位读写数据。字节流主要用于读写二进制文件&#xff0c;如图片、音频、视频等。Java中的InputStream和OutputStream就是字节…

写论文不用构建语料库!只需要福昕PDF阅读器高级搜索

写论文不用构建语料库&#xff01;只需要福昕PDF阅读器高级搜索 文章目录写论文不用构建语料库&#xff01;只需要福昕PDF阅读器高级搜索前言&#xff1a;“福昕语料库”使用前的准备&#xff1a;调用“语料库”&#xff1a;前言&#xff1a; 最近论文阅读可以借助NewBing的总…

【算法与数据结构(C语言)】栈和队列

文章目录 目录 前言 一、栈 1.栈的概念及结构 2.栈的实现 入栈 出栈 获取栈顶元素 获取栈中有效元素个数 检测栈是否为空&#xff0c;如果为空返回非零结果&#xff0c;如果不为空返回0 销毁栈 二、队列 1.队列的概念及结构 2.队列的实现 初始化队列 队尾入队列 队头出队列 获…

报表开发难上手?这里有一份 Fastreport 最新中文用户指南,请查收

Fast Reports,Inc.成立于1998年&#xff0c;多年来一直致力于开发快速报表软件&#xff0c;包括应用程序、库和插件。FastReport的报表生成器&#xff08;VCL平台和.NET平台&#xff09;、跨平台的多语言脚本引擎FastScript、桌面OLAP FastCube&#xff0c;如今都受到世界各地开…

Typecho COS插件实现网站静态资源存储到COS,降低本地存储负载

Typecho 简介Typecho 是一个简单、强大的轻量级开源博客平台&#xff0c;用于建立个人独立博客。它具有高效的性能&#xff0c;支持多种文件格式&#xff0c;并具有对设备的响应式适配功能。Typecho 相对于其他 CMS 还有一些特殊优势&#xff1a;包括可扩展性、不同数据库之间的…

IDA 实战--(2)熟悉工具

布局介绍 软件启动后会 有几个选项&#xff0c;一般直接选择Go 即可 之后的工作台布局如下 开始分析 分析的第一个&#xff0c;将PE 文件拖入工作区 刚开始接触&#xff0c;我们先保持默认选项&#xff0c;其它选项后面会详细讲解&#xff0c;点击OK 后&#xff0c;等待分析…

软件项目管理知识回顾---软件项目质量和资源管理

软件项目质量和资源管理 5.0质量管理 5.1质量管理模型 1.模型 boehm模型&#xff1a;可移植性&#xff0c;可使用性&#xff0c;可维护性McCall模型ISO体系认证5.2质量成本 1.含义&#xff1a;由于产品第一次不正常运行而产生的附加费用 预防成本和缺陷成本5.3质量管理 1.过程 …

Python opencv进行矩形识别

Python opencv进行矩形识别 图像识别中,圆形和矩形识别是最常用的两种,上一篇讲解了圆形识别,本例讲解矩形识别,最后的结果是可以识别出圆心,4个顶点,如下图: 左边是原始图像,右边是识别结果,在我i5 10400的CPU上,执行时间不到8ms。 识别出结果后,计算任意3个顶点…

【自监督论文阅读笔记】Unsupervised Learning of Dense Visual Representations

Abstract 对比自监督学习已成为无监督视觉表示学习的一种有前途的方法。通常&#xff0c;这些方法学习全局&#xff08;图像级&#xff09;表示&#xff0c;这些表示对于同一图像的不同视图&#xff08;即数据增强的组合&#xff09;是不变的。然而&#xff0c;许多视觉理解任务…

PDF文件怎么转图片格式?转换有技巧

PDF文件有时为了更美观或者更直观的展现出效果&#xff0c;我们会把它转成图片格式&#xff0c;这样不论是归档总结还是存储起来都会更为高效。有没有合适的转换方法呢&#xff1f;这就来给你们罗列几种我个人用过体验还算不错的方式&#xff0c;大家可以拿来参考一下哈。1.用电…

vm 网络配置

点击NAT设置&#xff0c;配置本台虚拟机ip&#xff08;注意网关要在同一个网段&#xff09;&#xff0c;配置对应端口 然后添加映射端口&#xff1a; 然后选择网络适配器 选择vm8网卡 配置网卡静态ip #查看网卡 ip addr #修改网卡配置 cd /etc/sysconfig/network-scripts…

DevData Talks | 对谈谷歌云 DORA 布道师,像谷歌一样度量 DevOps 表现

本期 DevData Talks 我们请到来自 Google Cloud 谷歌云的 DORA 研究团队的嘉宾 Nathen Harvey与 Apache DevLake、CNCF DevStream 的海外社区负责人 Maxim 进行对谈。如果您关注 DevOps 的话&#xff0c;也许对这个团队有所耳闻。 DORA 的全称是 DevOps Research and Assessme…

mysql lesson1

常用命令 1:exit 退出mysql 2&#xff1a;uroot pENTER键&#xff0c;再输入密码&#xff0c;不被别人看见 3&#xff1a;完美卸载&#xff1a;双击安装包&#xff0c;手动删除program file中的mysql,手动删除Programedate里的mysql 4:use mysql 使用数据库 5&#xff1a;…

InstallAware Multi-Platform updated

InstallAware Multi-Platform updated 原生ARM&#xff1a;为您的内置设置、IDE和整个工具链添加了Apple macOS和Linux ARM构建。 本地化&#xff1a;引擎内多语言感知&#xff0c;可再分发工具&#xff0c;具有资产隔离功能&#xff0c;使您的IP保持安全。 模板&#xff1a;将…