RocketMQ5.0.0消息存储<三>_消息转发与恢复机制

news2024/11/15 5:34:45

目录

一、消息转发

1. ReputMessageService线程初始化

2. 消息转发更新ConsumeQueue

3. 消息转发更新IndexFile

二、恢复机制

1. Broker加载存储文件

2. Broker正常退出的文件恢复

3. Broker异常退出的文件恢复

三、参考资料


一、消息转发

        消息消费队列文件、索引文件都是基于CommitLog文件构建的,当消息Commit操作(将MappedFile.writeBuffer数据提交到该FileChannel文件通道内,即:提交到文件内存映射)时,ConsumeQueue、IndexFile需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟。

        RocketMQ通过开启一个线程ReputMessageServcie来实时转发CommitLog文件更新事件,通过各自的任务处理器根据转发的消息及时更新ConsumeQueue、IndexFile文件。如下所示是的UML图。org.apache.rocketmq.store.CommitLogDispatcher是转发的顶层接口。需要注意的是消息转发只是将消息写入ConsumeQueue、IndexFile文件内存映射中,不是写入磁盘

1. ReputMessageService线程初始化

        Broker启动时会启动ReputMessageService线程,并初始化关键参数reputFromOffset(开始转发消息的物理偏移量)。如果允许重复转发,reputFromOffset设置为CommitLog的提交指针;如果不允许重复转发,reputFromOffset设置为Commitlog的内存中最大偏移量。如下代码所示,org.apache.rocketmq.store.DefaultMessageStore#start。

/**
 * broker启动时,消息存储线程
 * BrokerController#startBasicService()
 * @throws Exception
 */
@Override
public void start() throws Exception {
    ......

    /*
        启动ReputMessageService服务线程
        重要参数reputFromOffset:从哪个物理偏移量转发消息给ConsumeQueue、IndexFile
                            允许重复转发:提交指针
                          不允许重复转发:最大偏移量
        启动后,调用{@link ReputMessageService}  的run()
     */
    // 允许重复转发:提交指针
    if (this.getMessageStoreConfig().isDuplicationEnable()) {
        this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
    }
    // 不允许重复转发:最大偏移量
    else {
        // It is [recover]'s responsibility to fully dispatch the commit log data before the max offset of commit log.
        this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
    }
    // DefaultMessageStore.ReputMessageService.run()
    this.reputMessageService.start();

    ......
}

        下图所示是org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#run方法的调用链。其核心方法是org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput,代码如下。注意转发消息DefaultMessageStore#doDispatch(DispatchRequest)方法并没有返回结果,而是通过重复写入次数控制,详细见下小节

/**
 * 消息提交到Commitlog时消息转发,构建ConsumeQueue、index文件服务 的核心方法
 * step1:消息转发偏移量 > 最小偏移量时,则最小偏移量赋值给消息转发偏移量
 * step2:消息转发偏移量 <= 最小偏移量时,获取转发偏移量开始的全部有效数据
 * step3:循环转发每条消息,获取每条消息的转发请求对象{@link DispatchRequest}
 * step4:消息解析成功后,转发消息{@link DefaultMessageStore#doDispatch(DispatchRequest)}
 *        注意:转发消息时遍历LinkedList<CommitLogDispatcher> dispatcherList,集合中有消息队列、索引文件执行转发的实现类
 * step5:更新消息转发偏移量
 */
private void doReput() {
    // 消息转发偏移量 > 最小偏移量时,则最小偏移量赋值给消息转发偏移量
    if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
        LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
        this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    }
    // 消息转发偏移量 <= 最小偏移量时,则循环转发每条消息
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

        // 获取从消息转发偏移量开始的全部有效数据
        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
        if (result != null) {
            try {
                this.reputFromOffset = result.getStartOffset();

                // 循环转发每条消息
                for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
                    // 获取每条消息
                    DispatchRequest dispatchRequest =
                            DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false, false);
                    // 消息大小
                    int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                    // 读取的消息是否超出提交偏移量
                    if (reputFromOffset + size > DefaultMessageStore.this.getConfirmOffset()) {
                        doNext = false;
                        break;
                    }

                    // 解析消息成功
                    if (dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            // 消息转发
                            DefaultMessageStore.this.doDispatch(dispatchRequest);

                            // 开启长轮询时,唤醒PullRequestHoldService线程,执行被挂起的拉取消息请求
                            if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
                                    && DefaultMessageStore.this.messageArrivingListener != null) {
                                // 唤醒被挂起的拉取消息请求,再次拉取消息
                                DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                // 多个消息队列,唤醒被挂起的拉取消息请求,再次拉取消息
                                notifyMessageArrive4MultiQueue(dispatchRequest);
                            }

                            this.reputFromOffset += size;
                            readSize += size;
                            if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
                                    DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
                                DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                        .add(dispatchRequest.getMsgSize());
                            }
                        } else if (size == 0) {
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            readSize = result.getSize();
                        }
                    }
                    // 解析消息失败
                    else if (!dispatchRequest.isSuccess()) {

                        if (size > 0) {
                            LOGGER.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                            this.reputFromOffset += size;
                        } else {
                            doNext = false;
                            // If user open the dledger pattern or the broker is master node,
                            // it will not ignore the exception and fix the reputFromOffset variable
                            if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                    DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                LOGGER.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                        this.reputFromOffset);
                                this.reputFromOffset += result.getSize() - readSize;
                            }
                        }
                    }
                }
            } finally {
                result.release();
            }
        } else {
            doNext = false;
        }
    }
}

2. 消息转发更新ConsumeQueue

        org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue是转发任务执行更新到消费队列实现类,CommitLogDispatcherBuildConsumeQueue#dispatch的执行转发消息方法,如下所示是调用链。

        org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfoWrapper是执行转发核心方法。其中:写入消费队列最大重试次数maxRetries,默认30次,超出该值则打印异常日志;写入消费队列,只是写入到消费队列文件内存映射,并没有执行刷盘

/**
 * 转发消息到ConsumeQueue的核心方法
 * step1:写入消费队列最大重试次数maxRetries,默认30次
 * step2:判定消费队列是否允许写入
 * step3:写入消费队列,只是写入到消费队列文件内存映射,不是写入磁盘
 * step4:超出maxRetries,打印异常日志
 */
@Override
public void putMessagePositionInfoWrapper(DispatchRequest request) {
    // 写入消费队列最大重试次数
    final int maxRetries = 30;
    // 是否能写
    boolean canWrite = this.messageStore.getRunningFlags().isCQWriteable();
    for (int i = 0; i < maxRetries && canWrite; i++) {
        long tagsCode = request.getTagsCode();
        if (isExtWriteEnable()) {
            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
            cqExtUnit.setFilterBitMap(request.getBitMap());
            cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
            cqExtUnit.setTagsCode(request.getTagsCode());

            long extAddr = this.consumeQueueExt.put(cqExtUnit);
            if (isExtAddr(extAddr)) {
                tagsCode = extAddr;
            } else {
                log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                    topic, queueId, request.getCommitLogOffset());
            }
        }
        // 写入消费队列
        boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
            request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
        if (result) {
            if (this.messageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
                this.messageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
                this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
            }
            this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
            if (checkMultiDispatchQueue(request)) {
                multiDispatchLmqQueue(request, maxRetries);
            }
            return;
        } else {
            // XXX: warn and notify me
            log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                + " failed, retry " + i + " times");

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log.warn("", e);
            }
        }
    }

    // XXX: warn and notify me
    log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
    this.messageStore.getRunningFlags().makeLogicsQueueError();
}

        org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo是消息写入消费队列的文件内存映射的核心方法。代码如下所示。

/**
 * 写入消费队列
 * 注意:此次写入消息,只是追加到映射内存,固定刷盘方式:异步
 * @param offset 消息偏移量
 * @param size 消息大小
 * @param tagsCode 消息Tag的哈希码
 * @param cqOffset 消费队列的偏移量
 * @return
 */
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {

    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }

    // 组装消费队列条目:20字节 = 8字节的Commitlog offset + 4字节的消息大小 + 8字节的Tag的哈希码
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);

    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

    /*
     * 更新消费队列参数
     */
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {

        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            this.minLogicOffset = expectLogicOffset;
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                + mappedFile.getWrotePosition());
        }

        if (cqOffset != 0) {
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

            if (expectLogicOffset < currentLogicOffset) {
                log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                return true;
            }

            if (expectLogicOffset != currentLogicOffset) {
                LOG_ERROR.warn(
                    "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset,
                    currentLogicOffset,
                    this.topic,
                    this.queueId,
                    expectLogicOffset - currentLogicOffset
                );
            }
        }
        this.maxPhysicOffset = offset + size;

        // 追加消息到映射,并没有刷盘(固定刷盘方式:异步)
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}

3. 消息转发更新IndexFile

        org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildIndex是转发任务执行更新到索引文件实现类,CommitLogDispatcherBuildIndex#dispatch的执行转发消息方法,如下所示是调用链。

        org.apache.rocketmq.store.index.IndexService#buildIndex是构建索引的核心方法,如下代码所示。RocketMQ支持同一个消息建立多个索引,多个索引键空格分开,如:"key1 key2"

/**
 * 构建消息索引文件
 * 注意:索引追加到映射内存
 * step1:获取或创建索引文件
 * step2:获取索引文件最大偏移量
 * step3:判断消息的偏移量 < 索引文件最大偏移量,说明索引文件已构建(数据重复),则返回
 * step4:消息唯一键不为空,则首先添加索引(唯一键索引)
 * step5:消息key不为空,MQ支持同一消息多个索引,用空格隔开
 * step6:
 * @param req 消息信息
 */
public void buildIndex(DispatchRequest req) {
    // 获取或创建索引文件
    IndexFile indexFile = retryGetAndCreateIndexFile();
    if (indexFile != null) {
        // 索引文件最大偏移量
        long endPhyOffset = indexFile.getEndPhyOffset();
        DispatchRequest msg = req;
        String topic = msg.getTopic();
        String keys = msg.getKeys();
        // 消息的偏移量 < 索引文件最大偏移量,说明索引文件已构建(数据重复),则返回
        if (msg.getCommitLogOffset() < endPhyOffset) {
            return;
        }

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                break;
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                return;
        }

        // 消息唯一键不为空,则首先添加索引(唯一键索引)
        if (req.getUniqKey() != null) {
            // 写入索引文件缓冲
            indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey())); // buildKey():格式:主题#key
            if (indexFile == null) {
                LOGGER.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                return;
            }
        }

        // 消息key不为空,MQ支持同一消息多个索引,用空格隔开
        if (keys != null && keys.length() > 0) {
            String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
            for (int i = 0; i < keyset.length; i++) {
                String key = keyset[i];
                if (key.length() > 0) {
                    indexFile = putKey(indexFile, msg, buildKey(topic, key));
                    if (indexFile == null) {
                        LOGGER.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                        return;
                    }
                }
            }
        }
    } else {
        LOGGER.error("build index error, stop building index");
    }
}

        org.apache.rocketmq.store.index.IndexFile#putKey是索引写入IndexFile文件内存映射中的核心方法,如下代码所示。

/**
 * 写入IndexFile文件
 * 追加到映射内存
 * @param key 消息的key
 * @param phyOffset 消息的物理偏移量
 * @param storeTimestamp 消息的存储时间戳
 * @return true写入成功;false写入失败或已满
 */
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    // 当前条目数量是否大于允许最大条目数量
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        // key的hash值
        int keyHash = indexKeyHashMethod(key);
        // key的hash槽的下标
        int slotPos = keyHash % this.hashSlotNum;
        // hash槽的物理地址
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        try {

            /*
             * 相同hash,存储多个数据(槽中是否有数据)
             */
            // 获取hash槽存储的数据(最新条目的地址)
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = invalidIndex;
            }

            // 计算待存储条目 与 第一条消息时间戳 的差值,并转换成秒
            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

            timeDiff = timeDiff / 1000;

            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }

            // 存储位置absIndexPos = 头部40字节 + hash槽数量 * 单个槽的字节大小(4字节) + 当前存储条目数量 * 单个条目字节大小(20字节)
            int absIndexPos =
                IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                    + this.indexHeader.getIndexCount() * indexSize;
            // 组装条目 = hash码 + Commitlog偏移量 + 存储时间差值 + 当前hash槽的值(待存储条目的前一个条目的偏移量)
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
            // 替换hash槽的值(待存储条目的偏移量absIndexPos)
            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

            // 更新文件头信息
            if (this.indexHeader.getIndexCount() <= 1) {
                this.indexHeader.setBeginPhyOffset(phyOffset);
                this.indexHeader.setBeginTimestamp(storeTimestamp);
            }

            if (invalidIndex == slotValue) {
                this.indexHeader.incHashSlotCount();
            }
            this.indexHeader.incIndexCount();
            this.indexHeader.setEndPhyOffset(phyOffset);
            this.indexHeader.setEndTimestamp(storeTimestamp);

            return true;
        } catch (Exception e) {
            log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
        }
    } else {
        log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
            + "; index max num = " + this.indexNum);
    }

    // 当前条目数量 >= 允许最大数量,则为false
    return false;
}

二、恢复机制

        RocketMQ存储异步(默认)或同步提交到Commitlog文件内存映射,再异步(默认)或同步刷盘到磁盘。当消息写入Commitlog文件内存映射时,然后异步生成转发任务更新ConsumeQueue、Index 文件。如果消息成功存储到Commitlog文件中,转发任务未成功执行,此时Broker由于某个原因宕机,导致Commitlog、ConsumeQueue、IndexFile文件数据不一致。如果没有人工修复,则导致会有一部分消息即便在Commitlog文件中,但由于并没有转发到ConsumeQueue,这部分消息将永远不会被消费者消费。

        RocketMQ是如何使Commitlog、ConsumeQueue达到最终一致性的呢?RocketMQ的存储文件的加载流程中,判断上一次退出是否正常,实现机制:Broker启动时判断${ROCKETMQ_HOME}/store/abort文件是否存在,正常退出时JVM钩子函数删除abort文件,若存在abort文件,说明Broker是异常退出。根据Broker是否正常退出,来采取不同的恢复机制。

        文件恢复主要完成flushedPosition、committedWhere指针设置,消息消费队列最大偏移量加载到内存,并删除flushedPosition之后所有的文件。 如果Broker异常退出,在文件恢复过程中,RocketMQ会将最后一个有效文件中的所有消息重新转发到消息消费队列与索引文件,确保不丢失消息,但同时会带来消息重复的问题。纵观RocktMQ的整体设计思想,RocketMQ保证消息不丢失但不保证消息不会重复消费,故消息消费业务方需要实现消息消费的幕等设计

1. Broker加载存储文件

        org.apache.rocketmq.store.DefaultMessageStore#load是Broker启动时加载存储文件Commitlog、ConsumeQueue、IndexFile、Checkpoint文件的核心方法,其调用链、代码如下所示。

/**
 * broker控制器启动初始化时,加载消息存储:如Commitlog文件、ConsumerQueue文件、IndexFile文件等
 * step1:判断broker上次是否正常退出,是否存在${ROCKETMQ_HOME}/store/abort文件,存在:Broker退出异常
 * step2:加载Commitlog文件、ConsumerQueue文件、Checkpoint文件、IndexFile索引文件
 * step3:恢复文件,根据Broker是否退出异常,选择不同的恢复策略
 *        {@link DefaultMessageStore#recover(boolean)}
 */
@Override
public boolean load() {
    boolean result = true;

    try {
        // 判断broker上次是否正常退出
        boolean lastExitOK = !this.isTempFileExist();
        LOGGER.info("last shutdown {}, root dir: {}", lastExitOK ? "normally" : "abnormally", messageStoreConfig.getStorePathRootDir());

        // load Commit Log 加载消息存储文件
        result = result && this.commitLog.load();

        // load Consume Queue 加载消息消费队列文件
        result = result && this.consumeQueueStore.load();

        if (result) {
            // 加载检查点Checkpoint文件
            this.storeCheckpoint =
                    new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            // 获取检查点中的刷盘时间点
            this.masterFlushedOffset = this.storeCheckpoint.getMasterFlushedOffset();
            // 加载IndexFile索引文件
            this.indexService.load(lastExitOK);

            // 恢复文件
            this.recover(lastExitOK);

            LOGGER.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }

        long maxOffset = this.getMaxPhyOffset();
        this.setBrokerInitMaxOffset(maxOffset);
        LOGGER.info("load over, and the max phy offset = {}", maxOffset);
    } catch (Exception e) {
        LOGGER.error("load exception", e);
        result = false;
    }

    if (!result) {
        this.allocateMappedFileService.shutdown();
    }

    return result;
}

        org.apache.rocketmq.store.DefaultMessageStore#recover是ConsumeQueue、IndexFile文件根据Broker是否正常提出采用不同恢复策略的方法入口,如下代码所示。恢复ConsumeQueue文件后,保存每个消费队列的当前存储逻辑偏移量

/**
 * 恢复文件
 * @param lastExitOK broker上次是否正常退出,true正常退出;false异常退出
 */
private void recover(final boolean lastExitOK) {
    long recoverCqStart = System.currentTimeMillis();
    // 获取消息队列中的最大的消息物理偏移量
    long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
    long recoverCqEnd = System.currentTimeMillis();

    // 正常退出
    if (lastExitOK) {
        this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    }
    // 异常退出
    else {
        this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    }

    long recoverClogEnd = System.currentTimeMillis();
    // 恢复ConsumeQueue文件后,保存每个消费队列的当前存储逻辑偏移量
    this.recoverTopicQueueTable();
    long recoverOffsetEnd = System.currentTimeMillis();

    LOGGER.info("Recover end total:{} recoverCq:{} recoverClog:{} recoverOffset:{}",
            recoverOffsetEnd - recoverCqStart, recoverCqEnd - recoverCqStart, recoverClogEnd - recoverCqEnd, recoverOffsetEnd - recoverClogEnd);
}

2. Broker正常退出的文件恢复

        org.apache.rocketmq.store.CommitLog#recoverNormally是Broker正常退出(abort文件不存在)时的恢复核心方法,其调用链、代码如下所示。

        恢复是从ConsumeQueue文件集合的倒数第三个文件开始恢复,若没有3个则从第一个开始恢复。需要注意的是没有恢复IndexFile文件,原因是有异常的IndexFile在加载过程中被销毁,当恢复时再次转发消息来构建IndexFile,详细见org.apache.rocketmq.store.index.IndexService#load是IndexFile文件加载的核心处理方法,这里不作介绍。 

/**
 * Broker正常停止,文件恢复
 * step1:倒数第三个文件开始恢复,若没有则从第一个开始恢复
 * step2:遍历Commitlog文件,逐个取出每一条消息进行恢复(再次转发)
 * step3:更新MappedFileQueue的刷盘、提交指针 并移除processOffset之后的Commitlog文件
 * When the normal exit, data recovery, all memory data have been flush
 */
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
    // 文件恢复时,查找消息是否验证CRC,可配置
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    boolean checkDupInfo = this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // 倒数第三个文件开始恢复,若没有则从第一个开始恢复
        // Began to recover from the last third file
        int index = mappedFiles.size() - 3;
        if (index < 0) {
            index = 0;
        }

        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        // 已恢复的消息物理偏移量 = mappedFile.getFileFromOffset() + mappedFileOffset
        long processOffset = mappedFile.getFileFromOffset();
        // 当前已校验通过的offset
        long mappedFileOffset = 0; // 从0开始,说明该Commitlog文件的第一个消息开始
        long lastValidMsgPhyOffset = this.getConfirmOffset();
        // normal recover doesn't require dispatching
        boolean doDispatch = false; // 是否正常恢复
        // 遍历Commitlog文件,逐个取出每一条消息进行恢复
        while (true) {
            // 检查消息并返回消息大小
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
            int size = dispatchRequest.getMsgSize();
            // Normal data
            if (dispatchRequest.isSuccess() && size > 0) {
                lastValidMsgPhyOffset = processOffset + mappedFileOffset;
                mappedFileOffset += size;
                // 消息转发到ConsumeQueue文件、IndexFile文件
                this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, mappedFile, true, false);
            }
            // 消息验证通过,且消息大小为0,说明已到Commitlog文件末尾,读取下一个Commitlog文件
            // Come the end of the file, switch to the next file Since the
            // return 0 representatives met last hole,
            // this can not be included in truncate offset
            else if (dispatchRequest.isSuccess() && size == 0) {
                this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, mappedFile, true, true);
                // 读取下一文件
                index++;
                // 下一文件没有
                if (index >= mappedFiles.size()) {
                    // Current branch can not happen
                    log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
                    break;
                }
                // 若有下个Commitlog,重新设置变量
                else {
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                    log.info("recover next physics file, " + mappedFile.getFileName());
                }
            }
            // Intermediate file read error
            else if (!dispatchRequest.isSuccess()) {
                if (size > 0) {
                    log.warn("found a half message at {}, it will be truncated.", processOffset + mappedFileOffset);
                }
                log.info("recover physics file end, " + mappedFile.getFileName());
                break;
            }
        }

        /*
            更新MappedFileQueue的刷盘、提交指针
         */
        processOffset += mappedFileOffset;
        // Set a candidate confirm offset.
        // In most cases, this value will be overwritten by confirmLog.init.
        // It works if some confirmed messages are lost.
        this.setConfirmOffset(lastValidMsgPhyOffset);
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        // 移除processOffset之后的所有文件
        this.mappedFileQueue.truncateDirtyFiles(processOffset);

        // Clear ConsumeQueue redundant data
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }

    } else {
        // Commitlog case files are deleted
        log.warn("The commitlog files are deleted, and delete the consume queue files");
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}

3. Broker异常退出的文件恢复

        org.apache.rocketmq.store.CommitLog#recoverAbnormally是Broker异常退出(abort文件存在)时的恢复核心方法,代码如下所示。

        与Broker正常退出逻辑差不多,区别:恢复是从ConsumeQueue文件集合的最后一个文件往前遍历,找到文件第一条消息存储正常的文件。而判断Commitlog是否正常存储,判定条件:魔数是否正常(第一条消息存储时间戳 = 0时,说明该文件没有任何消息)。

/**
 * Broker异常停止,文件恢复(大致与正常恢复一样)
 */
@Deprecated
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
    // recover by the minimum time stamp
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    boolean checkDupInfo = this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // 最后一个文件往前走,找到第一个消息存储正常的文件
        // Looking beginning to recover from which file
        int index = mappedFiles.size() - 1;
        MappedFile mappedFile = null;
        for (; index >= 0; index--) {
            mappedFile = mappedFiles.get(index);
            // 判断Commitlog是否正常存储(判定条件:魔数是否正常);注意第一条消息存储时间戳 = 0时,说明该文件没有任何消息
            if (this.isMappedFileMatchedRecover(mappedFile)) {
                log.info("recover from this mapped file " + mappedFile.getFileName());
                break;
            }
        }

        if (index < 0) { // 没有,则获取第一个
            index = 0;
            mappedFile = mappedFiles.get(index);
        }

        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        long lastValidMsgPhyOffset = this.getConfirmOffset();
        // abnormal recover require dispatching
        boolean doDispatch = true;
        while (true) {
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
            int size = dispatchRequest.getMsgSize();

            if (dispatchRequest.isSuccess()) {
                // Normal data
                if (size > 0) {
                    lastValidMsgPhyOffset = processOffset + mappedFileOffset;
                    mappedFileOffset += size;

                    if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                        if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                            this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, mappedFile, true, false);
                        }
                    } else {
                        // 消息转发到ConsumeQueue文件、IndexFile文件
                        this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, mappedFile, true, false);
                    }
                }
                // Come the end of the file, switch to the next file
                // Since the return 0 representatives met last hole, this can
                // not be included in truncate offset
                else if (size == 0) {
                    this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, mappedFile, true, true);
                    index++;
                    if (index >= mappedFiles.size()) {
                        // The current branch under normal circumstances should
                        // not happen
                        log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
            } else {

                if (size > 0) {
                    log.warn("found a half message at {}, it will be truncated.", processOffset + mappedFileOffset);
                }

                log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
                break;
            }
        }

        processOffset += mappedFileOffset;
        // Set a candidate confirm offset.
        // In most cases, this value will be overwritten by confirmLog.init.
        // It works if some confirmed messages are lost.
        this.setConfirmOffset(lastValidMsgPhyOffset);
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        this.mappedFileQueue.truncateDirtyFiles(processOffset);

        // Clear ConsumeQueue redundant data
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    }
    // Commitlog case files are deleted
    else {
        log.warn("The commitlog files are deleted, and delete the consume queue files");
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}

三、参考资料

RocketMQ源码分析(十五)之文件恢复_jannals的博客-CSDN博客_rocketmq 文件恢复

RocketMQ5.0.0消息发送_爱我所爱0505的博客-CSDN博客

RocketMQ5.0.0消息存储<一>_存储文件及内存映射_爱我所爱0505的博客-CSDN博客

RocketMQ5.0.0消息存储<二>_消息存储流程_爱我所爱0505的博客-CSDN博客 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/340360.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

七大设计原则之单一职责原则应用

目录1 单一职责原则介绍2 单一职责原则应用1 单一职责原则介绍 单一职责&#xff08;Simple Responsibility Pinciple&#xff0c;SRP&#xff09;是指不要存在多于一个导致类变更的原因。假设我们有一个 Class 负责两个职责&#xff0c;一旦发生需求变更&#xff0c;修改其中…

线程池的简单实现:Java线程池初学者必读指南

"作为一名Java开发者&#xff0c;是否曾经遇到过多线程并发的问题&#xff1f;线程数量过多时&#xff0c;会导致资源浪费&#xff0c;应用性能下降&#xff0c;甚至发生线程死锁的情况。那么&#xff0c;有没有一种方法可以有效地管理线程&#xff0c;避免这些问题呢&…

Matlab傅里叶谱方法求解一维波动方程

傅里叶谱方法求解基本偏微分方程—一维波动方程 一维波动方程 对于一根两端固定、没有受到任何外力的弦, 若只研究其中的一段, 在不太长的时间 里, 固定端来不及对这段弦产生影响, 则可以认为固定端是不存在的, 弦的长度为无限大。 这种无界 (−∞<x<∞)(-\infty<x&…

震源机制(Focal Mechanisms)之沙滩球(Bench Ball)

沙滩球包含如下信息&#xff1a; a - 判断断层类型&#xff0c;可根据球的颜色快速判断 b - 判断断层的走向(strike)&#xff0c;倾角(dip) c - 确定滑移角/滑动角(rake) 走向 &#xff0c;倾角&#xff0c;滑移角 如不了解断层的定义&#xff0c;可以先阅读&#xff1a;震…

windows下qt设置网卡ip信息+简单案列(图形化界面设置网卡IP)。

windows设置网卡ip信息的方法 文章目录windows设置网卡ip信息的方法前言一、QProcess修改网卡ip信息1.1 代码实例二、system修改网卡ip信息2.1 代码实例三、qt修改网卡信息案例3.1 设计方法3.2 代码实例3.3 功能测试前言 方法1&#xff1a;QProcess修改网卡ip信息&#xff1b;…

四种方式的MySQL安装

mysql安装常见的方法有四种序号 安装方式 说明1 yum\rpm简单、快速&#xff0c;不能定制参数2二进制 解压&#xff0c;简单配置就可使用 免安装 mysql-a.b.c-linux2.x-x86_64.tar.gz3源码编译 可以定制参数&#xff0c;安装时间长 mysql-a.b.c.tar.gz4源码制成rpm包 把源码制…

Spring boot 实战指南(四):登录认证(OAuth、Cookie、Session、Token)、Spring Security

文章目录一、登录认证方式1.OAuth 认证颁发令牌的四种方式2.Cookie/Session 认证(1)Cookie(2)Session3.Token认证基于JWT的Token认证(spring security)二、Spring boot整合Spring Security(前后端分离)1.快速入门2.认证3.授权参考&#xff1a; 教程 登录认证简介 OAuth 2.0 的一…

Spring项目中用了这种解耦模式,老大对我刮目相看

前言不知道大家在项目中有没有遇到过这样的场景&#xff0c;根据传入的类型&#xff0c;调用接口不同的实现类或者说服务&#xff0c;比如根据文件的类型使用 CSV解析器或者JSON解析器&#xff0c;在调用的客户端一般都是用if else去做判断&#xff0c;比如类型等于JSON&#x…

Python Web开发:用Tornado框架制作一个简易的网站

前言 大家早好、午好、晚好吖 ❤ ~ 今天我们要用Python做Web开发&#xff0c;做一个简单的【表白墙】网站。 众所周知表白墙的功能普遍更多的是发布找人&#xff0c;失物招领&#xff0c;还是一个大家可以跟自己喜欢的人公开表白的平台 Tornado框架简单介绍 在Python当中&am…

SpringMVC笔记【JavaEE】

SpringMVC 1. SpringMVC概念 Spring MVC 是一个Web 框架。Spring MVC 是基于Servlet API构建的。 MVC是 Model&#xff08;模型&#xff09; View&#xff08;视图&#xff09; Controller&#xff08;控制器&#xff09; 的缩写&#xff0c;它是一种设计模式。 视图分为两种&…

2022年山东省中职组“网络安全”赛项比赛任务书正式赛题

2022年山东省中职组“网络安全”赛项 比赛任务书 一、竞赛时间 总计&#xff1a;360分钟 竞赛阶段竞赛阶段 任务阶段 竞赛任务 竞赛时间 分值 A模块 A-1 登录安全加固 180分钟 200分 A-2 Nginx安全策略 A-3 日志监控 A-4 中间件服务加固 A-5 本地安全策略…

html的表单标签(form)

目录标题1、表单标签主要有三大类&#xff1a;2、表单标签中常见的属性3、例子代码及结果4、注意&#xff1a;5、表单中特殊的属性表单标签可以用来数据交互&#xff0c;而前面学的六个标签只能发送不能接收。 表单标签的作用就是数据交互1、表单标签主要有三大类&#xff1a; …

搭建本地私有仓库

目录 一、搭建本地私有仓库 1、首先下载registry镜像 2、在daemon.json文件中添加私有镜像仓库地址 3、运行restart容器 4、为镜像打标签 5、上传到私有仓库 6、列出私有仓库的所有镜像 7、列出私有仓库的centos镜像有哪些tag 8、删除原有的centos的镜像&#xff0c;再测试…

双目测距------双目相机V1.0,将双目相机采集到任意一点的深度数据进行串口传输(带源码)

Depth2Uart 双目测距------双目相机V1.0&#xff0c;将双目相机采集到任意一点的深度数据进行串口传输 一、项目说明/Overview 所实现的功能&#xff1a;基于Intel Realsense官方提供的SDK&#xff0c;双目深度相机能获取到相机任何一个像素点距离前方障碍物的距离&#xff0…

电子学会2022年12月青少年软件编程(图形化)等级考试试卷(三级)答案解析

目录 一、单选题(共25题&#xff0c;共50分) 二、判断题(共10题&#xff0c;共20分) 三、编程题(共3题&#xff0c;共30分) 青少年软件编程&#xff08;图形化&#xff09;等级考试试卷&#xff08;三级&#xff09; 一、单选题(共25题&#xff0c;共50分) 1. 默认小猫角色…

FlinkCEP - Flink的复杂事件处理

版本说明 本文中以Flink 1.16.1 版本讲解说明 Note:Flink1.16.1版本相较于之前版本增强的within函数&#xff0c; 支持模式序列中相邻事件间的超时定义&#xff0c;以前版本只支持模式序列中第一个事件到最后一个事件之间的最大时间间隔。 快速开始 基于Kafka connecter 流…

C语言速成(有基础)

linux下的 是一种通用的、面向过程式的计算机编程语言 #include <stdio.h> //#include 预处理命令&#xff0c;用来引用头文件&#xff0c; stdio.h 头文件 int main() //开始 {/* 一个注释 */printf("Hello, World! \n");return 0; …

最强大的人工智能chatGPT不会还有人没用过吧,再不用就out了

&#x1f517; 运行环境&#xff1a;chatGPT &#x1f6a9; 撰写作者&#xff1a;左手の明天 &#x1f947; 精选专栏&#xff1a;《python》 &#x1f525; 推荐专栏&#xff1a;《算法研究》 #### 防伪水印——左手の明天 #### &#x1f497; 大家好&#x1f917;&#x1f9…

JUC并发编程 Ⅱ -- 共享模型之管程(上)

文章目录共享带来的问题临界区 Critical Section竞态条件 Race Conditionsynchronized 解决方案synchronized语法解决方案思考面向对象改进方法上的 synchronized线程八锁变量的线程安全分析成员变量和静态变量是否线程安全&#xff1f;局部变量是否线程安全&#xff1f;局部变…

搜索插入位置-力扣35-java

一、题目描述给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位置。请必须使用时间复杂度为 O(log n) 的算法。示例 1:输入: nums [1,3,5,6], target 5输出: 2示例 2:输…