RocketMQ5.0的Broker主从同步机制

news2024/11/18 5:26:41

RocketMQ5.0的Broker主从同步机制

一、主从同步工作原理

为了提高消息消费的高可用性,避免Broker发生单点故障引起存储在Broker上的消息无法及时消费, RocketMQ引入Broker主备机制,即:消息消费到达主服务器后需要将消息同步到消息从服务器,如果主服务器Broker宕机后,消息消费者可以从从服务器拉取消息。

下图所示是Broker的HA交互机制流程图及类图。主从同步模式分为:同步、异步。

  • step1:主服务器启动,并在特定端口上监听从服务器的连接;
  • step2:从服务器主动连接主服务器,主服务器接收客户端的连接,并建立相关TCP连接;
  • step3:从服务器主动向主服务器发送待拉取消息偏移量,主服务器解析请求并返回消息给从服务器;
  • step4:从服务器保存消息并继续发送新的消息同步请求。
    在这里插入图片描述
    在这里插入图片描述
    1. 主从配置

参考rocketmq-distribution项目的conf目录下有:2主2从异步HA配置(2m-2s-async)、2主2从同步HA配置(2m-2s-sync)。以下1主1从异步HA配置实例如下。

主Broker配置:

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 启动SQL过滤
enablePropertyFilter = true
 
namesrvAddr=192.168.1.55:9876;172.17.0.3:9876
brokerIP1=192.168.1.55

从Broker配置:

注意:brokerName与主机相同;brokerId > 0时,则为从,0时则为主;brokerRole角色为SLAVE(从),刷盘类型为ASYNC_FLUSH(异步刷盘)。

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
# 启动SQL过滤
enablePropertyFilter = true
 
namesrvAddr=192.168.1.55:9876;172.17.0.3:9876

2. 启动HA

org.apache.rocketmq.store.DefaultMessageStore#start是Broker启动方法,如下图所示是其调用链及相关HA部分代码。
在这里插入图片描述

/**
 * broker启动时,消息存储线程
 * BrokerController#startBasicService()
 * @throws Exception
 */
@Override
public void start() throws Exception {
    // 是否HA主从复制
    if (!messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
        this.haService.init(this);
    }
 
    ......
 
    if (this.haService != null) {
        this.haService.start();
    }
 
    ......
}

org.apache.rocketmq.store.ha.DefaultHAService#init是HAService初始化方法,如下代码所示。注意,从Broker的broker.conf配置的brokerRole为SLAVE,才能创建HAClient(从Broker注册到主Broker)。

@Override
public void init(final DefaultMessageStore defaultMessageStore) throws IOException {
    this.defaultMessageStore = defaultMessageStore;
    this.acceptSocketService = new DefaultAcceptSocketService(defaultMessageStore.getMessageStoreConfig());
    this.groupTransferService = new GroupTransferService(this, defaultMessageStore);
    if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
        this.haClient = new DefaultHAClient(this.defaultMessageStore);
    }
    this.haConnectionStateNotificationService = new HAConnectionStateNotificationService(this, defaultMessageStore);
}

org.apache.rocketmq.store.ha.DefaultHAService#start是HAService启动方法。注意:

  • org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService:主Broker接收从Broker的连接事件;
  • org.apache.rocketmq.store.ha.GroupTransferService:负责主Broker向从Broker发送同步数据;
  • org.apache.rocketmq.store.ha.HAClient:从Broker向主Broker发送连接事件;

Broker启动时,根据配置brokerRole配置(ASYNC_MASTER、SYNC_MASTER、SLAVE)判定Broker是主还是从。若是Slave角色,在broker配置文件中获取haMasterAddress,并更新至masterAddress;但是haMasterAddress配置为空,则启动成功,但是不会执行HA。

/**
 * 启动HAService
 * step1:{@link AcceptSocketService}接收从Broker的注册事件,方法是{@link AcceptSocketService#beginAccept()}
 * step2:启动{@link AcceptSocketService}线程,监听从Broker发送心跳
 * step3:同步数据{@link GroupTransferService}线程启动,主Broker向从Broker发送数据
 * step4:启动从Broker{@link HAClient}发送心跳到主Broker
 */
@Override
public void start() throws Exception {
    // 主接收从Broker的连接事件,SelectionKey.OP_ACCEPT(连接事件)
    this.acceptSocketService.beginAccept();
    // 启动主Broker线程
    this.acceptSocketService.start();
    // 主Broker同步数据线程启动
    this.groupTransferService.start();
    this.haConnectionStateNotificationService.start();
    // 启动从Broker{@link HAClient}发送心跳到主Broker
    if (haClient != null) {
        this.haClient.start();
    }
}

二、主从同步实现机制

1. 从Broker发送连接事件

org.apache.rocketmq.store.ha.DefaultHAClient是从Broker向主Broker的发送连接事件的核心类,是个线程。其主要属性如下代码所示。

// Socket读缓存区大小,4M
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
// 主Broker地址
private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
private final AtomicReference<String> masterAddress = new AtomicReference<>();
// 从Broker向主Broker发起HA的偏移量
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
// 网络传输通道
private SocketChannel socketChannel;
// 事件选择器
private Selector selector;
/**
 * 上次读取主Broker的时间戳
 * last time that slave reads date from master.
 */
private long lastReadTimestamp = System.currentTimeMillis();
/**
 * 上次写入主Broker的时间戳
 * last time that slave reports offset to master.
 */
private long lastWriteTimestamp = System.currentTimeMillis();
 
// 反馈HA的复制进度(从Broker的Commitlog文件的最大偏移量)
private long currentReportedOffset = 0;
// 本次处理读缓存区的指针
private int dispatchPosition = 0;
// 读缓存区
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
// 读缓存区备份,与byteBufferRead交换
private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private DefaultMessageStore defaultMessageStore;
// HA连接状态
private volatile HAConnectionState currentState = HAConnectionState.READY;
// 流监控
private FlowMonitor flowMonitor;

org.apache.rocketmq.store.ha.DefaultHAClient#run是HAClient启动执行任务,其调用链和代码如下。

  • DefaultHAClient#connectMaster():从Broker连接到主Broker。
  • DefaultHAClient#transferFromMaster():向主发送HA进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中。
    在这里插入图片描述
/**
 * 启动HAClient
 * {@link DefaultHAClient#connectMaster()}:从Broker连接到主Broker
 * {@link DefaultHAClient#transferFromMaster()}:向主发送HA进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
 
    this.flowMonitor.start();
 
    while (!this.isStopped()) {
        try {
            switch (this.currentState) {
                case SHUTDOWN:
                    return;
                case READY:
                    if (!this.connectMaster()) {
                        log.warn("HAClient connect to master {} failed", this.masterHaAddress.get());
                        this.waitForRunning(1000 * 5);
                    }
                    continue;
                case TRANSFER:
                    // 向主发送HA进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中
                    if (!transferFromMaster()) {
                        // 没有可拉取消息时,设置READY状态
                        closeMasterAndWait();
                        continue;
                    }
                    break;
                default:
                    this.waitForRunning(1000 * 2);
                    continue;
            }
            long interval = this.defaultMessageStore.now() - this.lastReadTimestamp;
            if (interval > this.defaultMessageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {
                log.warn("AutoRecoverHAClient, housekeeping, found this connection[" + this.masterHaAddress
                    + "] expired, " + interval);
                this.closeMaster();
                log.warn("AutoRecoverHAClient, master not response some time, so close connection");
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.closeMasterAndWait();
        }
    }
 
    log.info(this.getServiceName() + " service end");
}

注意,一旦HAClient线程启动后,在状态READY、TRANSFER来回变化,READY状态下:发送从Broker连接事件到主Broker,开启Socket连接;TRANSFER状态下:主从发送相关数据信息,如:从向主发送HA复制进度(currentReportedOffset,即:从Broker的Commitlog文件的最大偏移量);主向从发送同步消息。

org.apache.rocketmq.store.ha.DefaultHAClient#connectMaster是从Broker连接到主Broker的核心方法,其代码如下。

/**
 * 从Broker连接到主Broker
 * 注意:
 *  a. Broker启动时,若是Slave角色,从broker配置文件中获取haMasterAddress,并更新至masterAddress;
 *  b. 若是Slave角色,但是haMasterAddress配置为空,则启动成功,但是不会执行HA
 * @return true连接成功;false连接失败
 */
public boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        // 获取主Broker地址
        String addr = this.masterHaAddress.get();
        if (addr != null) {
            // 根据地址创建SocketAddress对象
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            // 获取SocketChannel
            this.socketChannel = RemotingUtil.connect(socketAddress);
            if (this.socketChannel != null) {
                // SocketChannel注册OP_READ(网络读事件)
                this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                log.info("HAClient connect to master {}", addr);
                this.changeCurrentState(HAConnectionState.TRANSFER);
            }
        }
 
        // 获取Commitlog最大偏移量(HA同步进度)
        this.currentReportedOffset = this.defaultMessageStore.getMaxPhyOffset();
 
        this.lastReadTimestamp = System.currentTimeMillis();
    }
 
    return this.socketChannel != null;
}

2. 主Broker接收连接事件

org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService是主Broker接收从Broker连接事件的实现类,是一个线程。其主要属性如下代码所示。

// 主Broker监听本地的Socket(本地IP + 端口号)
private final SocketAddress socketAddressListen;
// Socket通道,基于NIO
private ServerSocketChannel serverSocketChannel;
// 事件选择器,基于NIO
private Selector selector;

org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService#beginAccept方法定义了主Broker监听从Broker的连接事件。

/**
 * 启动监听从broker的连接
 * Starts listening to slave connections.
 *
 * @throws Exception If fails.
 */
public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true); // TCP可重复使用
    this.serverSocketChannel.socket().bind(this.socketAddressListen); // 绑定监听端口
    if (0 == messageStoreConfig.getHaListenPort()) {
        messageStoreConfig.setHaListenPort(this.serverSocketChannel.socket().getLocalPort());
        log.info("OS picked up {} to listen for HA", messageStoreConfig.getHaListenPort());
    }
    this.serverSocketChannel.configureBlocking(false); // 非阻塞模式
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); // 注册OP_ACCEPT(连接事件)
}

org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService#run监听到从Broker连接事件的任务处理,为每个连接事件创建org.apache.rocketmq.store.ha.HAConnection对象并启动(负责M-S的数据同步逻辑)。

/**
 * 标准的NIO连接处理方式
 * step1:选择器每1s处理一次连接就绪事件
 * step2:是否连接事件,若是,创建{@link SocketChannel}
 * step3:每一个连接创建{@link HAConnection}(负责M-S的数据同步逻辑)
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
 
    while (!this.isStopped()) {
        try {
            // 选择器每1s处理一次连接就绪事件
            this.selector.select(1000);
            Set<SelectionKey> selected = this.selector.selectedKeys();
 
            if (selected != null) {
                for (SelectionKey k : selected) {
                    // 是否连接事件
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                        // 若是连接事件时,创建SocketChannel
                        SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
 
                        if (sc != null) {
                            DefaultHAService.log.info("HAService receive new connection, "
                                + sc.socket().getRemoteSocketAddress());
                            try {
                                // 每一个连接创建HAConnection并启动(负责M-S的数据同步逻辑)
                                HAConnection conn = createConnection(sc);
                                conn.start();
                                DefaultHAService.this.addConnection(conn);
                            } catch (Exception e) {
                                log.error("new HAConnection exception", e);
                                sc.close();
                            }
                        }
                    } else {
                        log.warn("Unexpected ops in select " + k.readyOps());
                    }
                }
 
                selected.clear();
            }
        } catch (Exception e) {
            log.error(this.getServiceName() + " service has exception.", e);
        }
    }
 
    log.info(this.getServiceName() + " service end");
}

org.apache.rocketmq.store.ha.DefaultHAConnection创建并启动时,启动读、写线程服务。其关键属性如下代码所示。

  • private WriteSocketService writeSocketService:主Broker向从Broker写数据服务类
  • private ReadSocketService readSocketService:主Broker读取从Broker数据服务类
private final DefaultHAService haService;
private final SocketChannel socketChannel;
// HA客户端连接地址
private final String clientAddress;
// 主Broker向从Broker写数据服务类
private WriteSocketService writeSocketService;
// 主Broker读取从Broker数据服务类
private ReadSocketService readSocketService;
private volatile HAConnectionState currentState = HAConnectionState.TRANSFER;
// 从Broker请求拉取的偏移量
private volatile long slaveRequestOffset = -1;
// 从Broker反馈已完成的偏移量
private volatile long slaveAckOffset = -1;
private FlowMonitor flowMonitor;

3. 从Broker反馈复制进度

org.apache.rocketmq.store.ha.DefaultHAClient#transferFromMaster是从Broker与主Broker传输数据的核心方法,代码如下所示,该方法有两大功能:

  • 从Broker向主Broker:反馈HA复制进度,即:currentReportedOffset(从Broker的Commitlog文件的最大偏移量),方法org.apache.rocketmq.store.ha.DefaultHAClient#reportSlaveMaxOffset执行。
  • 从Broker接收主Broker:HA同步消息内容,方法org.apache.rocketmq.store.ha.DefaultHAClient#processReadEvent执行。
/**
 * 向主反馈HA复制进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中
 * {@link DefaultHAClient#reportSlaveMaxOffset(long)}:向主反馈HA复制进度,即:currentReportedOffset(从Broker的Commitlog文件的最大偏移量)
 * {@link DefaultHAClient#processReadEvent()}:处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中
 */
private boolean transferFromMaster() throws IOException {
    boolean result;
    // 判断是否需要向主Broker反馈当前待拉取偏移量
    if (this.isTimeToReportOffset()) {
        log.info("Slave report current offset {}", this.currentReportedOffset);
        // 向主Broker反馈拉取偏移量
        result = this.reportSlaveMaxOffset(this.currentReportedOffset);
        if (!result) {
            return false;
        }
    }
 
    this.selector.select(1000);
 
    // 处理主Broker发送过来的消息数据
    result = this.processReadEvent();
    if (!result) {
        return false;
    }
 
    return reportSlaveMaxOffsetPlus();
}

org.apache.rocketmq.store.ha.DefaultHAClient#reportSlaveMaxOffset向主Broker反馈HA复制进度,代码如下。

/**
 * 向主Broker反馈拉取偏移量
 * 注意:
 *      a. 向主Broker反馈拉取偏移量maxOffset: 对于Slave端:发送下次待拉取消息偏移量
 *                                        对于Master端:本次请求拉取的偏移量,也可以理解为同步ACK
 *      b. 手动切换ByteBuffer的写模式/读模式;
 *      c. 通过{@link Buffer#hasRemaining()}判断缓存内容是否完全写入SocketChannel(基于NIO模式的写范例)
 * @param maxOffset HA待拉取偏移量
 * @return ByteBuffer缓存的内容是否写完
 */
private boolean reportSlaveMaxOffset(final long maxOffset) {
    // 偏移量写入ByteBuffer
    this.reportOffset.position(0); // 写缓存位置
    this.reportOffset.limit(8); // 写缓存字节长度
    this.reportOffset.putLong(maxOffset); // 偏移量写入ByteBuffer
    // 将ByteBuffer的写模式 转为 读模式
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
 
    // 循环,并判定ByteBuffer是否完全写入SocketChannel
    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName()
                + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }
    lastWriteTimestamp = this.defaultMessageStore.getSystemClock().now();
    return !this.reportOffset.hasRemaining();
}

4. ReadSocketService线程读取从Broker复制进度
org.apache.rocketmq.store.ha.DefaultHAConnection.ReadSocketService#processReadEvent是主Broker读取从Broker拉取消息的请求,获取内容是HA复制进度。其代码如下,看出主Broker获取从Broker的HA复制进度后,赋值给DefaultHAConnection#slaveRequestOffset属性,后立即唤醒GroupTransferService线程,执行消息同步。

/**
 * 主Broker读取从Broker拉取消息的请求
 * step1:判定byteBufferRead是否有剩余空间,没有则{@link Buffer#flip()}
 * step2:用剩余空间,从SocketChannel读数据到缓存中;读取到的内容是从Broker拉取消息的偏移量
 * step3:通知等待同步HA复制结果的发送消息线程
 * @return
 */
private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
 
    /*
        byteBufferRead没有剩余空间时,则:position == limit == capacity
        调用flip()方法后,则:position == 0, limit == capacity,加上processPosition = 0,说明从头开始处理
     */
    if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip(); // ByteBuffer重置处理
        this.processPosition = 0;
    }
 
    // ByteBuffer有剩余空间,循环至byteBufferRead没有剩余空间
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // 从SocketChannel读数据到缓存中
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0; // 重置
                this.lastReadTimestamp = DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                // 读取内容长度 >= 8,说明收到从Broker的拉取请求(内容是offset)
                if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                    int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                    long readOffset = this.byteBufferRead.getLong(pos - 8);
                    this.processPosition = pos;
 
                    // 从Broker反馈已完成的偏移量
                    DefaultHAConnection.this.slaveAckOffset = readOffset;
                    // 更新从Broker请求拉取的偏移量
                    if (DefaultHAConnection.this.slaveRequestOffset < 0) {
                        DefaultHAConnection.this.slaveRequestOffset = readOffset;
                        log.info("slave[" + DefaultHAConnection.this.clientAddress + "] request offset " + readOffset);
                    }
 
                    // 通知等待同步HA复制结果的发送消息线程
                    DefaultHAConnection.this.haService.notifyTransferSome(DefaultHAConnection.this.slaveAckOffset);
                }
            } else if (readSize == 0) {
                // 连续读取字节数0,则终止本次读取处理
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.error("read socket[" + DefaultHAConnection.this.clientAddress + "] < 0");
                return false;
            }
        } catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }
 
    return true;
}

5. WriteSocketService传输同步消息
ReadSocketService线程读取从Broker发送的HA复制进度,由org.apache.rocketmq.store.ha.DefaultHAConnection.WriteSocketService根据DefaultHAConnection#slaveRequestOffset获取主Broker还没有同步的所有消息进行HA同步。其如下代码所示WriteSocketService#run方法,是同步消息核心逻辑。

/**
 * 传输消息内容到HA客户端
 * step1:slaveRequestOffset为-1时,说明主Broker没有收到从Broker的拉取请求,忽略本次写事件
 * step2:nextTransferFromWhere为-1时,说明初次传输,计算nextTransferFromWhere(待传输offset)
 * step3:判断上次是否传输完
 *        上次传输完:当前时间 与 上次传输时间的间隔 > 发送心跳包时间间隔,
 *                  发送心跳包长度: nextTransferFromWhere + size(消息长度0,避免长连接由空闲而关闭)
 *        上次传输没有完成:继续传输,忽略本次写事件
 * step4:根据从Broker待拉取消息offset查找之后的所有可读消息
 * step5:待同步消息总大小 > 一次传输的大小,默认32KB,则截取,此时一次传输不是完整的消息
 * step6:传输消息内容
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
 
    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
 
            // slaveRequestOffset为-1时,说明主Broker没有收到从Broker的拉取请求,忽略本次写事件
            if (-1 == DefaultHAConnection.this.slaveRequestOffset) {
                Thread.sleep(10);
                continue;
            }
 
            /*
                nextTransferFromWhere为-1时,说明初次传输
                初次传输时,计算nextTransferFromWhere(待传输offset)
             */
            // 初次传输
            if (-1 == this.nextTransferFromWhere) {
                // =0 时,从Commitlog文件的最大偏移量传输
                if (0 == DefaultHAConnection.this.slaveRequestOffset) {
                    long masterOffset = DefaultHAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                    masterOffset =
                        masterOffset
                            - (masterOffset % DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                            .getMappedFileSizeCommitLog());
 
                    if (masterOffset < 0) {
                        masterOffset = 0;
                    }
 
                    this.nextTransferFromWhere = masterOffset;
                }
                // !=0 时,从Broker请求的偏移量
                else {
                    this.nextTransferFromWhere = DefaultHAConnection.this.slaveRequestOffset;
                }
 
                log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + DefaultHAConnection.this.clientAddress
                    + "], and slave request " + DefaultHAConnection.this.slaveRequestOffset);
            }
 
            /*
                判断上次是否传输完
                上次传输完:当前时间 与 上次传输时间的间隔 > 发送心跳包时间间隔,
                         发送心跳包长度: nextTransferFromWhere + size(消息长度0,避免长连接由空闲而关闭)
                上次传输没有完成:继续传输,忽略本次写事件
             */
            // lastWriteOver为true,则上次传输完
            if (this.lastWriteOver) {
 
                // 当前时间 与 上次传输时间的间隔
                long interval =
                    DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
 
                // 时间间隔 > 发送心跳包时间间隔
                if (interval > DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaSendHeartbeatInterval()) {
 
                    // Build Header 发送心跳包长度: nextTransferFromWhere + size(消息长度0,避免长连接由空闲而关闭)
                    this.byteBufferHeader.position(0);
                    this.byteBufferHeader.limit(headerSize);
                    this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                    this.byteBufferHeader.putInt(0);
                    this.byteBufferHeader.flip();
 
                    // 传输数据
                    this.lastWriteOver = this.transferData();
                    if (!this.lastWriteOver)
                        continue;
                }
            }
            // lastWriteOver为false,则上次传输没有完成,则继续传输
            else {
                // 继续传输上次拉取请求,还未完成,则忽略本次写事件
                this.lastWriteOver = this.transferData();
                if (!this.lastWriteOver)
                    continue;
            }
 
            // 根据从Broker待拉取消息offset查找之后的所有可读消息
            SelectMappedBufferResult selectResult =
                DefaultHAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
            if (selectResult != null) {
                int size = selectResult.getSize();
                // 待同步消息总大小 > 一次传输的大小,默认32KB
                if (size > DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                    size = DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                }
 
                int canTransferMaxBytes = flowMonitor.canTransferMaxByteNum();
                if (size > canTransferMaxBytes) {
                    if (System.currentTimeMillis() - lastPrintTimestamp > 1000) {
                        log.warn("Trigger HA flow control, max transfer speed {}KB/s, current speed: {}KB/s",
                            String.format("%.2f", flowMonitor.maxTransferByteInSecond() / 1024.0),
                            String.format("%.2f", flowMonitor.getTransferredByteInSecond() / 1024.0));
                        lastPrintTimestamp = System.currentTimeMillis();
                    }
                    size = canTransferMaxBytes;
                }
 
                long thisOffset = this.nextTransferFromWhere;
                this.nextTransferFromWhere += size; // 下一次写入的offset
 
                selectResult.getByteBuffer().limit(size);
                this.selectMappedBufferResult = selectResult;
 
                // Build Header 传输size大小的消息内容(不一定是完整的消息)
                this.byteBufferHeader.position(0);
                this.byteBufferHeader.limit(headerSize);
                this.byteBufferHeader.putLong(thisOffset);
                this.byteBufferHeader.putInt(size);
                this.byteBufferHeader.flip();
 
                // 传输数据
                this.lastWriteOver = this.transferData();
            } else {
 
                DefaultHAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
            }
        } catch (Exception e) {
 
            DefaultHAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }
 
    DefaultHAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();
 
    if (this.selectMappedBufferResult != null) {
        this.selectMappedBufferResult.release();
    }
 
    changeCurrentState(HAConnectionState.SHUTDOWN);
 
    this.makeStop();
 
    readSocketService.makeStop();
 
    haService.removeConnection(DefaultHAConnection.this);
 
    SelectionKey sk = this.socketChannel.keyFor(this.selector);
    if (sk != null) {
        sk.cancel();
    }
 
    try {
        this.selector.close();
        this.socketChannel.close();
    } catch (IOException e) {
        DefaultHAConnection.log.error("", e);
    }
 
    DefaultHAConnection.log.info(this.getServiceName() + " service end");
}

6. GroupTransferService线程通知HA结果

org.apache.rocketmq.store.ha.GroupTransferService该类负责将主从同步复制结束后,通知阻塞的消息发送者线程。同步主从Broker模式,即:消息刷磁盘后,继续等待新消息被传输到从Broker,等待传输结果,并通知消息发送线程。

1):待需要HA的消息集合

org.apache.rocketmq.store.CommitLog#asyncPutMessage是消息生产者发送消息到Broker时执行存储消息,参考《RocketMQ5.0.0消息存储<二>_消息存储流程》,该方法会根据同步或异步模式(默认)来执行org.apache.rocketmq.store.CommitLog#handleDiskFlushAndHA方法(完成刷盘和HA复制),方法调用链如下。
在这里插入图片描述
生产者把消息发送到Broker,完成commit操作(消息提交到文件内存映射中) ,随后根据同步/异步模式完成刷盘和HA。HA操作时,把消息提交请求添加到org.apache.rocketmq.store.ha.GroupTransferService.requestsWrite是主Broker待需要HA的的集合。以下是org.apache.rocketmq.store.CommitLog#handleHA的代码。

private CompletableFuture<PutMessageStatus> handleHA(AppendMessageResult result, PutMessageResult putMessageResult,
                                                     int needAckNums) {
    if (needAckNums >= 0 && needAckNums <= 1) {
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
 
    HAService haService = this.defaultMessageStore.getHaService();
 
    long nextOffset = result.getWroteOffset() + result.getWroteBytes();
 
    // Wait enough acks from different slaves
    GroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums);
    haService.putRequest(request);
    haService.getWaitNotifyObject().wakeupAll();
    return request.future();
}

2):通知消息发送者线程

HAService启动时,会启动GroupTransferService线程。GroupTransferService#run执行任务,如下代码所示。

@Override
public void run() {
    log.info(this.getServiceName() + " service started");
 
    while (!this.isStopped()) {
        try {
            // 间隔10s
            this.waitForRunning(10);
            // 主从同步复制结束后,通知阻塞的消息发送者线程
            this.doWaitTransfer();
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
 
    log.info(this.getServiceName() + " service end");
}

其中执行waitForRunning()方法时,会去执行org.apache.rocketmq.store.ha.GroupTransferService#swapRequests方法,使得requestsWrite与requestsRead两个集合对调:

  • private volatile List<CommitLog.GroupCommitRequest>
    requestsWrite:主Broker待需要HA的消息集合
  • private volatile List<CommitLog.GroupCommitRequest>
    requestsRead:主Broker正在执行的HA集合
private void swapRequests() {
    List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
}

org.apache.rocketmq.store.ha.GroupTransferService#doWaitTransfer方法是主从同步复制结束后,通知阻塞的消息发送者线程,如下代码所示。

/**
 * 主从同步复制结束后,通知阻塞的消息发送者线程
 * step1:遍历消息提交请求(内存提交到Commitlog文件的内存映射)
 * step2:判断主从同步成功:从已经成功复制的最大偏移量 >= 消息生产者发送消息后返回下一条消息的偏移量
 */
private void doWaitTransfer() {
    // 加锁
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            // commit请求,即:内存提交到Commitlog文件的内存映射
            for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                boolean transferOK = false;
 
                long deadLine = req.getDeadLine(); // 是否超时
                final boolean allAckInSyncStateSet = req.getAckNums() == MixAll.ALL_ACK_IN_SYNC_STATE_SET;
 
                for (int i = 0; !transferOK && deadLine - System.nanoTime() > 0; i++) { // 是否超时
                    if (i > 0) {
                        // 等待1s
                        this.notifyTransferObject.waitForRunning(1000);
                    }
 
                    if (!allAckInSyncStateSet && req.getAckNums() <= 1) {
                        // 主从同步成功判断:从已经成功复制的最大偏移量 >= 消息生产者发送消息后返回下一条消息的偏移量
                        transferOK = haService.getPush2SlaveMaxOffset().get() >= req.getNextOffset();
                        continue;
                    }
 
                    if (allAckInSyncStateSet && this.haService instanceof AutoSwitchHAService) {
                        // In this mode, we must wait for all replicas that in InSyncStateSet.
                        final AutoSwitchHAService autoSwitchHAService = (AutoSwitchHAService) this.haService;
                        final Set<String> syncStateSet = autoSwitchHAService.getSyncStateSet();
                        if (syncStateSet.size() <= 1) {
                            // Only master
                            transferOK = true;
                            break;
                        }
 
                        // Include master
                        int ackNums = 1;
                        for (HAConnection conn : haService.getConnectionList()) {
                            final AutoSwitchHAConnection autoSwitchHAConnection = (AutoSwitchHAConnection) conn;
                            if (syncStateSet.contains(autoSwitchHAConnection.getSlaveAddress()) && autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) {
                                ackNums++;
                            }
                            if (ackNums >= syncStateSet.size()) {
                                transferOK = true;
                                break;
                            }
                        }
                    } else {
                        // Include master
                        int ackNums = 1;
                        for (HAConnection conn : haService.getConnectionList()) {
                            // TODO: We must ensure every HAConnection represents a different slave
                            // Solution: Consider assign a unique and fixed IP:ADDR for each different slave
                            if (conn.getSlaveAckOffset() >= req.getNextOffset()) {
                                ackNums++;
                            }
                            if (ackNums >= req.getAckNums()) {
                                transferOK = true;
                                break;
                            }
                        }
                    }
                }
 
                if (!transferOK) {
                    log.warn("transfer message to slave timeout, offset : {}, request acks: {}",
                        req.getNextOffset(), req.getAckNums());
                }
 
                // 从完成复制后,唤醒消息发送者线程
                req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
            }
 
            this.requestsRead.clear();
        }
    }
}

三、读写分离机制

RocketMQ读写分离与其他中间件的实现方式完全不同,RocketMQ是消费者首先向主服务器发起拉取消息请求,然后主服务器返回一批消息,然后会根据主服务器负载压力与主从同步情况,向从服务器建议下次消息拉取是从主服务器还是从从服务器拉取。

RocketMQ根据MessageQueu查找Broker地址的唯一依据是brokerName。Broker组织中根据brokerName获取一组Broker服务器(M-S),它们的brokerName相同但brokerId不同,主服务器的brokerId为0,从服务器的brokerId大于0。 其方法是org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe

详细消费拉取消息时,实现读写分离机制见后续章节,参考《RocketMQ5.0.0消息消费<一> _ PUSH模式的消息拉取》。

四、参考资料

  1. https://blog.csdn.net/sinat_14840559/article/details/115970738
  2. https://www.cnblogs.com/shanml/p/16950178.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/737078.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

15分钟真正搞懂什么是“交叉熵损失函数”

大家好啊&#xff0c;我是董董灿。 在你刚学习神经网络的时候&#xff0c;有没有被一个名字叫做“交叉熵”的概念绕的云里雾里&#xff0c;以至于现在看到这个概念&#xff0c;依然很懵。 今天就来看一下&#xff0c;这个所谓的“交叉熵”到底是什么&#xff0c;以及它在神经…

【项目级面试题】关于前端鉴权这块,能详细的说说吗?

如果有人问你&#xff1a;”你做了这么多年的开发&#xff0c;关于前端鉴权这块&#xff0c;能详细的说说吗&#xff1f;“。你该如何作答&#xff0c;脑海中有没有一个完整的思路。 像 Token、Cookie、Session、JWT、单点登录 这些概念&#xff0c;它们的作用、应用场景、如何…

【C#】并行编程实战:使用 PLINQ(3)

PLINQ 是语言集成查询&#xff08;Language Integrate Query , LINQ&#xff09;的并行实现&#xff08;P 表示并行&#xff09;。本章将继续介绍其编程的各个方面以及与之相关的一些优缺点。 本文的主要内容为 PLINQ 中的组合并行和顺序 LINQ 查询、取消 PLINQ 查询、使用 PLI…

Azure AD混合部署,实现在本地AD同步到AAD上面

一、前期准备 1、进入 Azure云 注册一个账号 云计算服务 | Microsoft Azure 2、进入 AAD 管理后台 Microsoft Azure 3、创建一个新的租户 4、添加自定义域名&#xff0c;这里要做下验证&#xff0c;所以你要有个线上的域名 5、创建一个全局管理员 6、登陆本地AD&#xff0c;创…

第133页的gtk+编程例子——计算器应用改写网上的例子

第133页的gtk编程例子——计算器应用改写网上的例子 来源&#xff1a;《GTK的计算器》 https://blog.csdn.net/zhouzhouzf/article/details/17097999 重点在于它的设计思路是比较巧妙的&#xff0c;能够处理多种情况&#xff0c;比较容易理解&#xff0c;也感到人类的思想是非…

Java进程ProcessBuilder类的介绍及使用,ProcessBuilder调用外部程序执行shell命令Linux命令

目录 ProcessBuilder类的介绍及使用 【前言】 【正文】 --构造方法-- --常用方法-- --使用技巧-- --调用本地Shell命令&#xff0c;实例-- 【总结】 【注意】 ProcessBuilder类的介绍及使用 【前言】 在做一个项目的时候需要用到运行时动态执行JAVA命令&#xff0c;一…

常用数据聚类算法总结记录与代码实现[K-means/层次聚类/DBSACN/高斯混合模型(GMM)/密度峰值聚类/均值漂移聚类/谱聚类等]

本文的主要目的是总结记录日常学习工作中常用到的一些数据聚类算法&#xff0c;对其原理简单总结记录&#xff0c;同时分析对应的优缺点&#xff0c;以后需要的时候可以直接翻看&#xff0c;避免每次都要查询浪费时间&#xff0c;欢迎补充。 聚类算法是一种无监督学习的方法&am…

chrales过期重装或使用途中的踩坑记录及使用方法

1、背景 我的是证书过期了&#xff0c;提示无网络。 1. 重新下载 安装(或者 不需要重新下载安装&#xff0c;用已有的就可以) 2. 重新进行配置 3. (关键)检查电脑上的 证书过期时间 4. (关键)检查手机上的证书过期时间 5. (手机配置好后&#xff0c;点开链接显示无网络&#…

Android:简单登录界面

一、前言&#xff1a;这个登陆界面我前前后后写了差不多一个星期&#xff0c;主要有密码登录、验证码登录、键盘自动隐藏、忘记密码、新建密码等功能&#xff0c;你们可以自己研究一下&#xff01; 二、上代码&#xff1a; 资源文件 1.在value包下面添加一个dimens.xml <…

对性能测试评估分析优化市场的反思

目录 前言&#xff1a; 性能市场的现状 性能测试人员的价值体现 年龄和加班 性能行业的出路 前言&#xff1a; 性能测试评估分析和优化是一个关键的环节&#xff0c;它可以帮助我们了解系统的性能瓶颈和潜在问题&#xff0c;并提出相应的优化方案。在市场竞争激烈的环境下…

Cesium 实战 - AGI_articulations 扩展:模型自定义关节动作

Cesium 实战 - AGI_articulations 扩展&#xff1a;模型自定义关节动作 简要概述两种方式实现模型组件动作模型添加关节&#xff08;articulations&#xff09;1.导入模型&#xff08;J15.glb&#xff09;2.查看模型内部组件信息&#xff08;名称&#xff09;4.将需要J15.glb复…

Docker常见命令(以备不时之需)

参考官网&#xff1a;https://docs.docker.com/engine/reference/commandline/cli/ 帮助启动类命令 启动docker&#xff1a; systemctl start docker 停止docker&#xff1a; systemctl stop docker 重启docker&#xff1a; systemctl restart docker 查看docker状态&…

架构训练营笔记系列:面向复杂度的设计

面向复杂度的设计 DDD 是可扩展架构的设计技巧&#xff0c;不是架构方法论。不关注高性能、高可靠。 架构本质&#xff1a;为了降低软件系统复杂度 怎么做架构设计 &#xff1f;思路是分析系统需求找到系统复杂的地方&#xff0c;然后设计方案。 复杂度相关有哪些&#xff1…

Unity的PostProcessing后处理使用介绍

大家好&#xff0c;我是阿赵。 上一篇文章说了Unity的PostProcessing后处理有bug并提供了解决办法&#xff0c;这里顺便介绍一下PostProcessing的用法。 一、安装 打开PackageManager&#xff0c;然后搜索Post&#xff0c;应该就能看到左边出现搜索结果&#xff0c;选择&…

一个人的面相能直接反映其个性与命运

中国传统文化&#xff0c;博大精深&#xff0c;面相学只是其中一种。 在古代&#xff0c;面相学却是一门非常实用的学科&#xff0c; 尤其是经过了一代代人的发展&#xff0c;面相学得到了完善之后&#xff0c;他的准确性往往会超过现代人的预料。相由心生&#xff0c;面相是对…

用户端App 测试方法与技术

目录&#xff1a; app测试体系app项目测试流程app结构讲解app测试设计思路app常见bug解析常用模拟器使用android开发者选项安装mumu模拟器adb命令介绍adb环境搭建与配置adb与设备交互adb安装卸载应用adb命令启动页面adb命令清缓存adb文件传输adb日志操作adb命令操作Android设备…

DRF+Vue.JS前后端分离项目实例(下) --- Vue.js 前端实现代码

本文上篇请 点击阅读 1. 需求说明 本文以学生信息查询功能为例&#xff0c;采用前后端分离架构&#xff0c;后端提供RESTFul 接口&#xff0c;前端代码用Vue.js Bottstrap实现。 1.1 本例要求提供如下查询功能&#xff1a; 列表查询、单条查询 添加学生信息 更改学生信息 删…

在线试用Stable Diffusion生成可爱的图片

文章目录 一、 Stable Diffusion 模型在线使用地址&#xff1a;二、模型相关版本和参数配置&#xff1a;三、图片生成提示词与反向提示词&#xff1a;提示词1提示词2提示词3提示词4提示词5 一、 Stable Diffusion 模型在线使用地址&#xff1a; https://inscode.csdn.net/insc…

python 读取npy文件

import numpy as np test np.load("I:/软件/mask.npy") print(test) 如下图所示&#xff1a;