RocketMQ源码阅读-七-高可用

news2024/9/29 7:31:21

RocketMQ源码阅读-七-高可用

  • 概述
  • NameServer高可用
    • Broker注册到NameServer
    • Producer、Consumer 访问 Namesrv
  • Broker高可用
    • Broker主从配置
    • Master、Slave通信组件
    • Master与Slave的通信协议
    • Slave节点逻辑
    • Master节点逻辑
    • Master_SYNC模式
    • Producer发消息
    • Consumer消费消息
  • 总结

本篇分析RocketMQ如何实现高可用。

概述

Rocket的高可用包括NameServer和Broker的高可用,以及Producer和Consumer与他们的通信。
image.png

NameServer高可用

NameServer是一个轻量级的注册中心,提供命名服务。可以启动多个NameServer实现高可用。
RocketMQ的多个NameServer之间,没有任何关系,没有主从之分,不进行通信与数据同步。

Broker注册到NameServer

Broker循环注册多个NameServer。具体代码为BrokerOuterAPI#registerBrokerAll:

/**
 * 注册到多个 Namesrv
 *
 * @param clusterName 集群名
 * @param brokerAddr broker地址
 * @param brokerName brokerName
 * @param brokerId brokerId
 * @param haServerAddr 高可用服务地址。用于broker master节点给 slave节点同步数据
 * @param topicConfigWrapper topic配置信息
 * @param filterServerList filtersrv数组
 * @param oneway 是否oneway通信方式
 * @param timeoutMills 请求超时时间
 * @return 注册结果
 */
public RegisterBrokerResult registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills) {
    RegisterBrokerResult registerBrokerResult = null;

    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null) {
        for (String namesrvAddr : nameServerAddressList) { 
            try {
                // 循环注册多个 Namesrv
                RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
                                                                  haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
                if (result != null) {
                    registerBrokerResult = result;
                }

                log.info("register broker to name server {} OK", namesrvAddr);
            } catch (Exception e) {
                log.warn("registerBroker Exception, {}", namesrvAddr, e);
            }
        }
    }

    return registerBrokerResult;
}

Producer、Consumer 访问 Namesrv

Producer和Consumer会从NameServer列表中选一个可用的进行通信。具体代码为NettyRemotingClient#getAndCreateNameserverChannel:

private Channel getAndCreateNameserverChannel() throws InterruptedException {
    // 返回已选择、可连接Namesrv
    String addr = this.namesrvAddrChoosed.get();
    if (addr != null) {
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }
    }
    //
    final List<String> addrList = this.namesrvAddrList.get();
    if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
        try {
            // 返回已选择、可连接的Namesrv
            addr = this.namesrvAddrChoosed.get();
            if (addr != null) {
                ChannelWrapper cw = this.channelTables.get(addr);
                if (cw != null && cw.isOK()) {
                    return cw.getChannel();
                }
            }
            // 从【Namesrv列表】中选择一个连接的返回
            if (addrList != null && !addrList.isEmpty()) {
                for (int i = 0; i < addrList.size(); i++) {
                    int index = this.namesrvIndex.incrementAndGet();
                    index = Math.abs(index);
                    index = index % addrList.size();
                    String newAddr = addrList.get(index);

                    this.namesrvAddrChoosed.set(newAddr);
                    Channel channelNew = this.createChannel(newAddr);
                    if (channelNew != null)
                        return channelNew;
                }
            }
        } catch (Exception e) {
            log.error("getAndCreateNameserverChannel: create name server channel exception", e);
        } finally {
            this.lockNamesrvChannel.unlock();
        }
    } else {
        log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
    }

    return null;
}

Broker高可用

Broker的高可用,需要通过分组+集群的方式。
Broker分组包括 1 个 Master节点 和 N 个 slave节点。其中Master节点提供读写服务,Slvae节点提供只读服务。
多个Broker分组组成一个集群。
对个Broker分组之间没有数据通信。

Broker主从配置

  • 每个分组,Master节点 不断发送新的 CommitLog 给 Slave节点。 Slave节点 不断上报本地的 CommitLog 已经同步到的位置给 Master节点。
  • Broker分组 与 Broker分组 之间没有任何关系,不进行通信与数据同步。
  • Slave 节点会从 Master 节点拉取消费进度、Topic 配置等等。

集群内,Master节点 有两种类型:

  • Master_SYNC:在 Producer 发送消息时,等待 Slave节点 存储完毕后再返回发送结果。
  • Master_ASYNC:在 Producer 发送消息时,不需等待 Slave节点 存储完毕,直接返回发送结果。

Master、Slave通信组件

Master和Slave节点通过如下组件进行通信:
image.png

  • Master节点
    • AcceptSocketService:接收 Slave节点 连接
    • HAConnection:
      • ReadSocketService:读来自 Slave节点 的数据
      • WriteSocketService:写到往 Slave节点 的数据
  • Slave节点
    • HAClient:对 Master节点 连接、读写数据

Master与Slave的通信协议

待补充,// todo

Slave节点逻辑

Slave的主要逻辑是:

  1. 接收从 Master 传输的 CommitLog 数据
  2. 上传 Master 自己本地的 CommitLog 已经同步物理位置

Slave的主要逻辑都在类HAClient.java中:
image.png
HAClient实现了Runable接口,实现了run方法,run方法源码为:

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            if (this.connectMaster()) {
                // 若到满足上报间隔,上报到Master进度
                if (this.isTimeToReportOffset()) {
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    if (!result) {
                        this.closeMaster();
                    }
                }

                this.selector.select(1000);

                // 处理读取事件
                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                }

                // 若进度有变化,上报到Master进度
                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }

                // Master过久未返回数据,关闭连接
                long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaHousekeepingInterval()) {
                    log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                             + "] expired, " + interval);
                    this.closeMaster();
                    log.warn("HAClient, master not response some time, so close connection");
                }
            } else {
                this.waitForRunning(1000 * 5);
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.waitForRunning(1000 * 5);
        }
    }

    log.info(this.getServiceName() + " service end");
}

从源码可以看出,Slave先连接Master,连接成功后:

  1. 上报自己本地的 CommitLog 已经同步物理位置
  2. 处理读取事件,也即处理Master同步给自己的CommitLog信息

第一步上报自己本地的 CommitLog 已经同步物理位置调用的方法是HAClient#reportSlaveMaxOffset:

private final ByteBuffer reportOffset = ByteBuffer.allocate(8);

/**
 * 上报进度
 *
 * @param maxOffset 进度
 * @return 是否上报成功
 */
private boolean reportSlaveMaxOffset(final long maxOffset) {
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    this.reportOffset.putLong(maxOffset);
    this.reportOffset.position(0);
    this.reportOffset.limit(8);

    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName()
                      + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }

    return !this.reportOffset.hasRemaining();
}

做的操作就是更新缓冲区的数据,并刷新缓冲区。

第二步处理读取事件调用的方法为HAClient#processReadEvent:

private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    while (this.byteBufferRead.hasRemaining()) {
        try {
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();

                readSizeZeroTimes = 0;
                // 处理读取事件
                boolean result = this.dispatchReadRequest();
                if (!result) {
                    log.error("HAClient, dispatchReadRequest error");
                    return false;
                }
            } else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                // TODO ERROR
                log.info("HAClient, processReadEvent read socket < 0");
                return false;
            }
        } catch (IOException e) {
            log.info("HAClient, processReadEvent read socket exception", e);
            return false;
        }
    }

    return true;
}

此方法是判断缓冲区是否有新数据,有的话出发读取函数HAClient#dispatchReadRequest:

/**
 * 读取Master传输的CommitLog数据,并返回是否异常
 * 如果读取到数据,写入CommitLog
 * 异常原因:
 *   1. Master传输来的数据offset 不等于 Slave的CommitLog数据最大offset
 *   2. 上报到Master进度失败
 *
 * @return 是否异常
 */
private boolean dispatchReadRequest() {
    final int msgHeaderSize = 8 + 4; // phyoffset + size
    int readSocketPos = this.byteBufferRead.position();

    while (true) {
        // 读取到请求
        int diff = this.byteBufferRead.position() - this.dispatchPostion;
        if (diff >= msgHeaderSize) {
            // 读取masterPhyOffset、bodySize。使用dispatchPostion的原因是:处理数据“粘包”导致数据读取不完整。
            long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
            int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);
            // 校验 Master传输来的数据offset 是否和 Slave的CommitLog数据最大offset 是否相同。
            long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
            if (slavePhyOffset != 0) {
                if (slavePhyOffset != masterPhyOffset) {
                    log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                              + slavePhyOffset + " MASTER: " + masterPhyOffset);
                    return false;
                }
            }
            // 读取到消息
            if (diff >= (msgHeaderSize + bodySize)) {
                // 写入CommitLog
                byte[] bodyData = new byte[bodySize];
                this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
                this.byteBufferRead.get(bodyData);
                HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
                // 设置处理到的位置
                this.byteBufferRead.position(readSocketPos);
                this.dispatchPostion += msgHeaderSize + bodySize;
                // 上报到Master进度
                if (!reportSlaveMaxOffsetPlus()) {
                    return false;
                }
                // 继续循环
                continue;
            }
        }

        // 空间写满,重新分配空间
        if (!this.byteBufferRead.hasRemaining()) {
            this.reallocateByteBuffer();
        }

        break;
    }

    return true;
}

Master节点逻辑

回顾下Master节点组件的功能:

  • Master节点
    • AcceptSocketService:接收 Slave节点 连接
    • HAConnection:
      • ReadSocketService:读来自 Slave节点 的数据
      • WriteSocketService:写到往 Slave节点 的数据

先来看ReadSocketService:
image.png
ReadSocketService实现了Runable接口,实现了run方法:

@Override
public void run() {
    HAConnection.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
            boolean ok = this.processReadEvent();
            if (!ok) {
                HAConnection.log.error("processReadEvent error");
                break;
            }

            // 过长时间无心跳,断开连接
            long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
            if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                break;
            }
        } catch (Exception e) {
            HAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }

此方法调用ReadSocketService#processReadEvent方法:

private boolean processReadEvent() {
    int readSizeZeroTimes = 0;

    // 清空byteBufferRead
    if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip();
        this.processPostion = 0;
    }

    while (this.byteBufferRead.hasRemaining()) {
        try {
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;

                // 设置最后读取时间
                this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();

                if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
                    // 读取Slave 请求来的CommitLog的最大位置
                    int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                    long readOffset = this.byteBufferRead.getLong(pos - 8);
                    this.processPostion = pos;

                    // 设置Slave CommitLog的最大位置
                    HAConnection.this.slaveAckOffset = readOffset;

                    // 设置Slave 第一次请求的位置
                    if (HAConnection.this.slaveRequestOffset < 0) {
                        HAConnection.this.slaveRequestOffset = readOffset;
                        log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                    }

                    // 通知目前Slave进度。主要用于Master节点为同步类型的。
                    HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                }
            } else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                return false;
            }
        } catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }

    return true;
}
}

主要逻辑就是读取Slave 请求来的CommitLog的最大位置,并设置Slave CommitLog的最大位置。

下面看WriteSocketService的逻辑:
image.png
同样是实现了Runable接口,实现了run方法,源码:

@Override
public void run() {
    HAConnection.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.selector.select(1000);

            // 未获得Slave读取进度请求,sleep等待。
            if (-1 == HAConnection.this.slaveRequestOffset) {
                Thread.sleep(10);
                continue;
            }

            // 计算初始化nextTransferFromWhere
            if (-1 == this.nextTransferFromWhere) {
                if (0 == HAConnection.this.slaveRequestOffset) {
                    long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                    masterOffset = masterOffset - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMapedFileSizeCommitLog());
                    if (masterOffset < 0) {
                        masterOffset = 0;
                    }

                    this.nextTransferFromWhere = masterOffset;
                } else {
                    this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                }

                log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                         + "], and slave request " + HAConnection.this.slaveRequestOffset);
            }

            if (this.lastWriteOver) {
                long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) { // 心跳

                    // Build Header
                    this.byteBufferHeader.position(0);
                    this.byteBufferHeader.limit(headerSize);
                    this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                    this.byteBufferHeader.putInt(0);
                    this.byteBufferHeader.flip();

                    this.lastWriteOver = this.transferData();
                    if (!this.lastWriteOver)
                        continue;
                }
            } else { // 未传输完成,继续传输
                this.lastWriteOver = this.transferData();
                if (!this.lastWriteOver)
                    continue;
            }

            // 选择新的CommitLog内容进行传输
            SelectMappedBufferResult selectResult =
            HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
            if (selectResult != null) {
                int size = selectResult.getSize();
                if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                    size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                }

                long thisOffset = this.nextTransferFromWhere;
                this.nextTransferFromWhere += size;

                selectResult.getByteBuffer().limit(size);
                this.selectMappedBufferResult = selectResult;

                // Build Header
                this.byteBufferHeader.position(0);
                this.byteBufferHeader.limit(headerSize);
                this.byteBufferHeader.putLong(thisOffset);
                this.byteBufferHeader.putInt(size);
                this.byteBufferHeader.flip();

                this.lastWriteOver = this.transferData();
            } else { // 没新的消息,挂起等待
                HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
            }
        } catch (Exception e) {

            HAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }

    // 断开连接 & 暂停写线程 & 暂停读线程 & 释放CommitLog
    if (this.selectMappedBufferResult != null) {
        this.selectMappedBufferResult.release();
    }

    this.makeStop();

    readSocketService.makeStop();

    haService.removeConnection(HAConnection.this);

    SelectionKey sk = this.socketChannel.keyFor(this.selector);
    if (sk != null) {
        sk.cancel();
    }

    try {
        this.selector.close();
        this.socketChannel.close();
    } catch (IOException e) {
        HAConnection.log.error("", e);
    }

    HAConnection.log.info(this.getServiceName() + " service end");
}

传输数据的方法为WriteSocketService#transferData:

/**
 * 传输数据
 */
private boolean transferData() throws Exception {
    int writeSizeZeroTimes = 0;
    // Write Header
    while (this.byteBufferHeader.hasRemaining()) {
        int writeSize = this.socketChannel.write(this.byteBufferHeader);
        if (writeSize > 0) {
            writeSizeZeroTimes = 0;
            this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
        } else if (writeSize == 0) {
            if (++writeSizeZeroTimes >= 3) {
                break;
            }
        } else {
            throw new Exception("ha master write header error < 0");
        }
    }

    if (null == this.selectMappedBufferResult) {
        return !this.byteBufferHeader.hasRemaining();
    }

    writeSizeZeroTimes = 0;

    // Write Body
    if (!this.byteBufferHeader.hasRemaining()) {
        while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
            int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
            if (writeSize > 0) {
                writeSizeZeroTimes = 0;
                this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
            } else if (writeSize == 0) {
                if (++writeSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                throw new Exception("ha master write body error < 0");
            }
        }
    }

    boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();

    if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
        this.selectMappedBufferResult.release();
        this.selectMappedBufferResult = null;
    }

    return result;
}

Master_SYNC模式

此模式下,在Producer发消息时,Master节点会等待Slave节点存储完毕再返回给Producer发送结果,源码位置在CommitLog#putMessage方法:

/**
 * 添加消息,返回消息结果
 *
 * @param msg 消息
 * @return 结果
 */
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // ...省略部分代码

    // Synchronous write double 如果是同步Master,同步到从节点
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        if (msg.isWaitStoreMsgOK()) {
            // Determine whether to wait
            if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                if (null == request) {
                    request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                }
                service.putRequest(request);

                // 唤醒WriteSocketService
                service.getWaitNotifyObject().wakeupAll();

                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
                        + msg.getTags() + " client address: " + msg.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
    }

    return putMessageResult;
}

上述方法第22行唤醒 WriteSocketService

  • WriteSocketService 挂起等待新消息结束,Master 传输 Slave 新的 CommitLog 数据
  • Slave 收到数据后,立即上报最新的 CommitLog 同步进度到 Master。ReadSocketService 唤醒第 24 行:request#waitForFlush(…)

Producer发消息

Producer 发消息时,从 Broker集群 的所有队列中进行选择,相关代码在DefaultMQProducerImpl#sendDefaultImpl中:

/**
 * 发送消息。
 * 1. 获取消息路由信息
 * 2. 选择要发送到的消息队列
 * 3. 执行消息发送核心方法
 * 4. 对发送结果进行封装返回
 *
 * @param msg 消息
 * @param communicationMode 通信模式
 * @param sendCallback 发送回调
 * @param timeout 发送消息请求超时时间
 * @return 发送结果
 * @throws MQClientException 当Client发生异常
 * @throws RemotingException 当请求发生异常
 * @throws MQBrokerException 当Broker发生异常
 * @throws InterruptedException 当线程被打断
 */
private SendResult sendDefaultImpl(//
    Message msg, //
    final CommunicationMode communicationMode, //
    final SendCallback sendCallback, //
    final long timeout//
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // ...省略部分代码
    // 获取 Topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null; // 最后选择消息要发送到的队列
        Exception exception = null;
        SendResult sendResult = null; // 最后一次发送结果
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 同步多次调用
        int times = 0; // 第几次发送
        String[] brokersSent = new String[timesTotal]; // 存储每次发送消息选择的broker名
        // 循环调用发送消息,直到成功
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            @SuppressWarnings("SpellCheckingInspection")
            MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 选择消息要发送到的队列
            if (tmpmq != null) {
                mq = tmpmq;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    // 调用发送消息核心方法
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                    endTimestamp = System.currentTimeMillis();
                    // 更新Broker可用性信息
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { // 同步发送成功但存储有问题时 && 配置存储异常时重新发送开关 时,进行重试
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) { // 打印异常,更新Broker可用性信息,更新继续循环
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) { // 打印异常,更新Broker可用性信息,继续循环
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) { // 打印异常,更新Broker可用性信息,部分情况下的异常,直接返回,结束循环
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    switch (e.getResponseCode()) {
                        // 如下异常continue,进行发送消息重试
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                        // 如果有发送结果,进行返回,否则,抛出异常;
                        default:
                            if (sendResult != null) {
                                return sendResult;
                            }
                            throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }
        // 返回发送结果
        if (sendResult != null) {
            return sendResult;
        }
        // 根据不同情况,抛出不同的异常
        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(), Arrays.toString(brokersSent)) + FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
        MQClientException mqClientException = new MQClientException(info, exception);
        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }
        throw mqClientException;
    }
    // Namesrv找不到异常
    List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
    if (null == nsList || nsList.isEmpty()) {
        throw new MQClientException(
            "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
    }
    // 消息路由找不到异常
    throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

Consumer消费消息

Consumer 消费消息时,也从 Broker集群 的所有队列中进行选择。

总结

HA集群.png
RocketMQ实现高可用的架构如上图。

  1. Broker通过一个Master和0-N个Slave实现一个分组
  2. 多个Broker分组,行程一个集群,每个Broker分组之间互相没有联系,不进行通信
  3. pro

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1410384.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何配置Tomcat服务环境并实现无公网ip访问本地站点

文章目录 前言1.本地Tomcat网页搭建1.1 Tomcat安装1.2 配置环境变量1.3 环境配置1.4 Tomcat运行测试1.5 Cpolar安装和注册 2.本地网页发布2.1.Cpolar云端设置2.2 Cpolar本地设置 3.公网访问测试4.结语 前言 Tomcat作为一个轻量级的服务器&#xff0c;不仅名字很有趣&#xff0…

前出深入-机器学习

文章目录 一、K近邻算法1.1 先画一个散列图1.2 使用K最近算法建模拟合数据1.3 进行预测1.4 K最近邻算法处理多元分类问题1.5 K最近邻算法用于回归分析1.6 K最近邻算法项目实战-酒的分类1.6.1 对数据进行分析1.6.2 生成训练数据集和测试数据集1.6.3 使用K最近邻算法对数据进行建…

python3去除图片中的文字水印

声明&#xff1a;本文为python技术分享&#xff0c;仅供学习使用。 请勿用于商业用途&#xff01;&#xff01;&#xff01; 请勿用于商业用途&#xff01;&#xff01;&#xff01; 请勿用于商业用途&#xff01;&#xff01;&#xff01; 以下为代码&#xff1a; import …

LeetCode、875. 爱吃香蕉的珂珂【中等,最小速度二分】

文章目录 前言LeetCode、875. 爱吃香蕉的珂珂【中等&#xff0c;最小速度二分】题目及分类思路分析及代码实现代码优化 资料获取 前言 博主介绍&#xff1a;✌目前全网粉丝2W&#xff0c;csdn博客专家、Java领域优质创作者&#xff0c;博客之星、阿里云平台优质作者、专注于Ja…

多维时序 | Matlab实现EVO-TCN-Multihead-Attention能量谷算法优化时间卷积网络结合多头注意力机制多变量时间序列预测

多维时序 | Matlab实现EVO-TCN-Multihead-Attention能量谷算法优化时间卷积网络结合多头注意力机制多变量时间序列预测 目录 多维时序 | Matlab实现EVO-TCN-Multihead-Attention能量谷算法优化时间卷积网络结合多头注意力机制多变量时间序列预测效果一览基本介绍程序设计参考资…

部署TOMCAT详解

目录 一、Tomcat概述 1.1Tomcat简介 1.2、Tomcat历史 1.3Tomcat官网 二、部署单实例Tomcat 1.下载Tomcat包 2. 解压Tomcat包 3.配置环境变量 4.刷新环境变量 5.查看tomcat是否安装成功 6.启动Tomcat 三、Tomcat目录介绍 1、tomcat主目录介绍 2.webapps目录介绍 3…

Unity——八叉树的原理与实现

八叉树原理 八叉树&#xff08;Octree&#xff09;是一种用于在三维空间中进行空间分割的数据结构。它将三维空间递归地划分为八个子空间&#xff0c;每个子空间对应于一个八叉树节点。这种分割方式可以有效地组织和管理场景中的对象&#xff0c;提高检索效率&#xff0c;特别…

会计等式与会计事项

目录 一. 会计等式二. 会计事项 \quad 一. 会计等式 \quad 最后利润是归所有者权益所有, 就回到了原有等式 \quad \quad \quad 二. 会计事项 \quad 会计事项: 引起会计要素增减变化的经济业务。 会计六要素: 资产 负债 所有者权益 收入 费用 利润 任何会计事项都不会破坏会计…

C++ 数论相关题目(欧拉函数、筛法求欧拉函数)

1、欧拉函数 给定 n 个正整数 ai &#xff0c;请你求出每个数的欧拉函数。 欧拉函数的定义 1∼N 中与 N 互质的数的个数被称为欧拉函数&#xff0c;记为 ϕ(N) 。 若在算数基本定理中&#xff0c;Npa11pa22…pamm &#xff0c;则&#xff1a; ϕ(N) Np1−1p1p2−1p2…pm−1p…

人工智能时代:让AIGC成为你的外部智慧源(文末送书)

&#x1f308;个人主页&#xff1a;聆风吟 &#x1f525;系列专栏&#xff1a;数据结构、网络奇遇记 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 &#x1f4cb;前言一. 什么是AIGC?二. AIGC如何运作&#xff1f;2.1 步骤一&#xff1a;收集数据2.…

day32WEB 攻防-通用漏洞文件上传二次渲染.htaccess变异免杀

本章节知识点&#xff1a; 1 、文件上传 - 二次渲染 2 、文件上传 - 简单免杀变异 3 、文件上传 -.htaccess 妙用 4 、文件上传 -PHP 语言特性 前置知识&#xff1a; 后门代码需要用特定格式后缀解析&#xff0c;不能以图片后缀解析脚本后门代码 ( 解析漏洞除外 ) 如&…

Aleo测试网回顾-测试网期间共释放了多少积分

上一篇我们整理了Aleo的详细项目介绍&#xff0c;Aleo项目详细介绍-一个兼顾隐私和可编程性的隐私公链-CSDN博客 接下来&#xff0c;让我们盘点下测试网期间的积分释放情况&#xff0c;测试网期间的奖励积分也将是Aleo主网上线后的抛压来源。测试网期间共计释放了4000万的积分…

2024转行程序员的请注意:均月薪在40-70k

前言 2023年&#xff0c;对大多数行业来说都是不太好过的一年。 对程序员来说也是如此&#xff0c;很多粉丝朋友都在说android工作特别难找&#xff0c;一个岗位都是几千份简历…大家心里都是特别的焦虑&#xff0c;本以为2024年就业情况会有好转&#xff0c;但实际上并非如此…

解决Windows系统本地端口被占用的问题

一、解决Windows系统本地端口被占用的问题&#xff0c;首先我们要在虚拟机上人为的占用本地端口 二、占用端口方法&#xff1a;以管理员身份运行cmd;输入net stop http;如果提示是否真的需要停止这些服务,则选择“Y”;完成后输入:sc config http startdisabled 弹出上图内容则成…

UE4运用C++和框架开发坦克大战教程笔记(十六)(第49~50集)

UE4运用C和框架开发坦克大战教程笔记&#xff08;十六&#xff09;&#xff08;第49~50集&#xff09; 49. 创建多个资源对象补全调用链并测试生成多个同种类名资源对象实现创建多个同资源名的对象实例 50. 资源加载系统测试补全调用链并测试生成多个同名资源对象测试生成 Widg…

【mongoDB】集合的创建和删除

目录 1.集合的创建 2. 查看所有集合 3.删除集合 1.集合的创建 格式&#xff1a; db.createCollection ( name ) 例如创建一个名为 bbb 的集合 还可以通过传递一个选项对象来指定集合的属性&#xff0c;例如最大文档的大小&#xff0c;索引选项等 例如 这样创建了一个名为 cc…

[极客大挑战 2019]BabySQL1

发现union select被过滤了&#xff0c;双写绕过 or、from被过滤 where被过滤 在b4bysql中找到flag

【pdf技巧】pdf无法编辑的原因是什么?如何编辑pdf?

打开PDF文件之后发现没有办法编辑PDF文件&#xff0c;都有哪些原因呢&#xff1f; 首先我们可以考虑一下&#xff0c;PDF文件中的内容是否是图片&#xff0c;如果确认是图片文件&#xff0c;那么我们想要编辑&#xff0c;就可以先使用PDF编辑器中的OCR扫描功能&#xff0c;将图…

【c语言】三子棋

前言&#xff1a; 三子棋是一种民间传统游戏&#xff0c;又叫九宫棋、圈圈叉叉棋、一条龙、井字棋等。游戏规则是双方对战&#xff0c;双方依次在9宫格棋盘上摆放棋子&#xff0c;率先将自己的三个棋子走成一条线就视为胜利。但因棋盘太小&#xff0c;三子棋在很多时候会出现和…

springboot实现aop

目录 AOP(术语)引入依赖实现步骤测试验证感谢阅读 AOP(术语) 连接点 类里面哪些方法可以增强&#xff0c;这些点被称为连接点 切入点 实际被真正增强的方法 通知&#xff08;增强&#xff09; 实际增强的逻辑部分称为通知&#xff08;增强&#xff09; 通知&#xff08;增强&…