[RocketMQ] Broker CommitLogDispatcher 异步构建ConsumeQueue和IndexFile源码解析 (十四)

news2025/1/16 15:55:46
  1. CommitLogDispatcherBuildConsumeQueue: 异步构建ConsumerQueue。
  2. CommitLogDispatcherBuildIndex: 异步构建IndexFile。

    文章目录

        • 1.CommitLogDispatcherBuildConsumeQueue构建ConsumeQueue
          • 1.1 putMessagePositionInfo写入消息位置信息
          • 1.2 findConsumeQueue查找ConsumeQueue
            • 1.2.1 创建ConsumeQueue
          • 1.3 putMessagePositionInfoWrapper追加消息索引
            • 1.3.1 putMessagePositionInfo写入消息位置信息
            • 1.3.2 MappedFile#appendMessage追加消息
        • 2.CommitLogDispatcherBuildIndex构建IndexFile
          • 2.1 buildIndex构建Index索引
            • 2.1.1 retryGetAndCreateIndexFile获取IndexFile
            • 2.1.2 getAndCreateLastIndexFile获取最新索引文件
            • 2.1.3 创建IndexFile
            • 2.1.4 buildKey构建Key
            • 2.1.5 putKey构建Index索引
        • 3.总结

1.CommitLogDispatcherBuildConsumeQueue构建ConsumeQueue

CommitLogDispatcherBuildConsumeQueue用于接收分发请求并构建ConsumeQueue。

对于非事务消息或者是事务commit消息, 调用DefaultMessageStore#putMessagePositionInfo方法写入消息位置信息到consumeQueue, 如果是事务prepared消息和事务rollback消息, 则不出理。

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
    /**
     * DefaultMessageStore的方法
     *
     * @param request 分派消息请求
     */
    @Override
    public void dispatch(DispatchRequest request) {
        //从该消息的消息系统flag中获取事务状态
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            //如果不是事务消息或者是事务commit消息,则进行处理
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                //写入消息位置信息到consumeQueue
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            //如果是事务prepared消息或者是事务rollback消息,则不进行处理
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}

1.1 putMessagePositionInfo写入消息位置信息

  1. 首先调用findConsumeQueue方法根据topic和队列id确定需要写入的ConsumeQueue。
  2. 然后调用ConsumeQueue#putMessagePositionInfoWrapper方法将消息信息追加到ConsumeQueue索引文件中。
/**
 * DefaultMessageStore的方法
 * 写入消息位置信息
 *
 * @param dispatchRequest 分派消息请求
 */
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    /*
     * 根据topic和队列id确定ConsumeQueue
     */
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    /*
     * 将消息信息追加到ConsumeQueue索引文件中
     */
    cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest));
}

1.2 findConsumeQueue查找ConsumeQueue

根据topic和队列id确定需要写入的ConsumeQueue, 查找的目标是consumeQueueTable缓存集合。ConsumerQueue文件是延迟加载的。需要到该ConsumeQueue的时候才会新建。

/**
 * DefaultMessageStore
 * <p>
 * 根据topic和队列id查找ConsumeQueue
 */
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    //从consumeQueueTable中获取该topic所有的队列
    ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
    //如果没有保存该topic的喜喜,那么存入一个空的map
    if (null == map) {
        ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
        if (oldMap != null) {
            map = oldMap;
        } else {
            map = newMap;
        }
    }
    // 从map中根据queueId 获取对应的 消费队列
    ConsumeQueue logic = map.get(queueId);
    //如果ConsumeQueue为null,那么新建,所以说ConsumeQueue是延迟创建的
    if (null == logic) {
        //新建ConsumeQueue
        ConsumeQueue newLogic = new ConsumeQueue(
                topic,
                queueId,
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                //单个文件大小,默认为可存储30W数据的大小,每条数据20Byte
                this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
                this);
        //存入map中,如果已存在则取旧的
        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
        if (oldLogic != null) {
            logic = oldLogic;
        } else {
            // light message queue(LMQ)
            if (MixAll.isLmq(topic)) {
                lmqConsumeQueueNum.getAndIncrement();
            }
            logic = newLogic;
        }
    }

    return logic;
}
1.2.1 创建ConsumeQueue

创建ConsumerQueue, 初始化各种属性, 会初始化20个字节的堆外内存, 用于临时存储单个索引, 可以重复使用。

public ConsumeQueue(
        final String topic,
        final int queueId,
        final String storePath,
        final int mappedFileSize,
        final DefaultMessageStore defaultMessageStore) {
    //各种属性
    this.storePath = storePath;
    //单个文件大小,默认为可存储30W数据的大小,每条数据20Byte
    this.mappedFileSize = mappedFileSize;
    this.defaultMessageStore = defaultMessageStore;

    this.topic = topic;
    this.queueId = queueId;
    //queue的路径 $HOME/store/consumequeue/{topic}/{queueId}/{fileName}
    String queueDir = this.storePath
            + File.separator + topic
            + File.separator + queueId;
    //创建mappedFileQueue,内部保存在该queueId下面的所有的consumeQueue文件集合mappedFiles相当于一个文件夹
    this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
    //分配20个字节的堆外内存,用于临时存储单个索引,这段内存可循环使用
    this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
    //是否启用消息队列的扩展存储,默认false
    if (defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {
        this.consumeQueueExt = new ConsumeQueueExt(
                topic,
                queueId,
                StorePathConfigHelper.getStorePathConsumeQueueExt(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()),
                defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
                defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
        );
    }
}

ConsumeQueue文件可以看成是基于topic的commitlog索引文件, ConsumeQueue文件的组织方式: topic/queue/file三层组织结构, $HOME/store/consumequeue/{topic}/{queueId}/{fileName}。

在这里插入图片描述

1.3 putMessagePositionInfoWrapper追加消息索引

构建消息索引信息并且存入找到的ConsumeQueue文件中。支持重试, 最多30次。

/**
 * ConsumeQueue的方法
 * <p>
 * 将消息信息追加到ConsumeQueue索引文件中
 */
public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) {
    //最大重试次数30
    final int maxRetries = 30;
    //检查ConsumeQueue文件是否可写
    boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
    //如果文件可写,并且重试次数小于30次,那么写入ConsumeQueue索引
    for (int i = 0; i < maxRetries && canWrite; i++) {
        //获取tagCode
        long tagsCode = request.getTagsCode();
        //如果支持扩展信息写入,默认false
        if (isExtWriteEnable()) {
            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
            cqExtUnit.setFilterBitMap(request.getBitMap());
            cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
            cqExtUnit.setTagsCode(request.getTagsCode());

            long extAddr = this.consumeQueueExt.put(cqExtUnit);
            if (isExtAddr(extAddr)) {
                tagsCode = extAddr;
            } else {
                log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                        topic, queueId, request.getCommitLogOffset());
            }
        }
        /*
         * 写入消息位置信息到ConsumeQueue中
         */
        boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
        if (result) {
            if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
                    this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
                //修改StoreCheckpoint中的physicMsgTimestamp:最新commitlog文件的刷盘时间戳,单位毫秒
                this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
            }
            this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
            if (multiQueue) {
                multiDispatchLmqQueue(request, maxRetries);
            }
            return;
        } else {
            // XXX: warn and notify me
            log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                    + " failed, retry " + i + " times");

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log.warn("", e);
            }
        }
    }

    // XXX: warn and notify me
    log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
    this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
1.3.1 putMessagePositionInfo写入消息位置信息

将消息位置信息写入到ConsumeQueue文件中:

  1. 如果消息偏移量+消息大小小于等于ConsumeQueue已处理的最大物理偏移量, 说明该消息已经被写过了, 返回true。
  2. 将消息信息offset、size、tagsCode按照顺序存入临时缓冲区byteBufferIndex中。
  3. 调用getLastMappedFile方法, 根据偏移量获取将要写入的最新ConsumeQueue文件的MappedFile。
  4. 进行一系列校验, 例如是否需要重设索引信息, 是否存在写入错误等等。
  5. 更新消息最大物理偏移量maxPhysicOffset = 消息在CommitLog中的物理偏移量 + 消息的大小。
  6. 调用MappedFile#appendMessage方法将临时缓冲区中的索引信息追加到mappedFile的mappedByteBuffer中, 并且更新wrotePosition的位置信息。

8B的offset+4B的size+8BtagsCode, offset: 消息在CommitLog中的物理偏移量。size: 消息大小。tagsCode: 延迟消息就是消息投递时间, 其他消息就是消息的tags的hashCode。

/**
 * 写入消息位置信息到ConsumeQueue中
 *
 * @param offset   消息在CommitLog中的物理偏移量
 * @param size     消息大小
 * @param tagsCode 消息tagsCode,延迟消息就是消息投递时间,其他消息就是消息的tags的hashCode
 * @param cqOffset 消息在消息消费队列的偏移量
 */
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
                                       final long cqOffset) {
    //如果消息偏移量+消息大小 小于等于ConsumeQueue已处理的最大物理偏移量
    //说明该消息已经被写过了,直接返回true
    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }
    /*
     * 将消息信息offset、size、tagsCode按照顺序存入临时缓冲区byteBufferIndex中
     */
    //position指针移到缓冲区头部
    this.byteBufferIndex.flip();
    //缓冲区的限制20B
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    //存入8个字节长度的offset,消息在CommitLog中的物理偏移量
    this.byteBufferIndex.putLong(offset);
    //存入4个字节长度的size,消息大小
    this.byteBufferIndex.putInt(size);
    //存入8个字节长度的tagsCode,延迟消息就是消息投递时间,其他消息就是消息的tags的hashCode
    this.byteBufferIndex.putLong(tagsCode);
    //已存在索引数据的最大预计偏移量
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
    /*
     * 根据偏移量获取将要写入的最新ConsumeQueue文件的MappedFile,可能会新建ConsumeQueue文件
     */
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {
        //如果mappedFile是第一个创建的消费队列,并且消息在消费队列的偏移量不为0,并且消费队列写入指针为0
        //那么表示消费索引数据错误,需要重设索引信息
        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            //设置最小偏移量为预计偏移量
            this.minLogicOffset = expectLogicOffset;
            //设置刷盘最新位置,提交的最新位置
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            //对该ConsumeQueue文件expectLogicOffset之前的位置填充前导0
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                    + mappedFile.getWrotePosition());
        }
        //如果消息在消费队列的偏移量不为0,即此前有数据
        if (cqOffset != 0) {
            //获取当前ConsumeQueue文件最新已写入物理偏移量
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
            //最新已写入物理偏移量大于预期偏移量,那么表示重复构建消费队列
            if (expectLogicOffset < currentLogicOffset) {
                log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                return true;
            }
            //如果不相等,表示存在写入错误,正常情况下,两个值应该相等,因为一个索引条目固定大小20B
            if (expectLogicOffset != currentLogicOffset) {
                LOG_ERROR.warn(
                        "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset,
                        currentLogicOffset,
                        this.topic,
                        this.queueId,
                        expectLogicOffset - currentLogicOffset
                );
            }
        }
        //更新消息最大物理偏移量 = 消息在CommitLog中的物理偏移量 + 消息的大小
        this.maxPhysicOffset = offset + size;
        /*
         * 将临时缓冲区中的索引信息追加到mappedFile的mappedByteBuffer中,并且更新wrotePosition的位置信息,到此构建ComsumeQueue完毕
         */
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}
1.3.2 MappedFile#appendMessage追加消息

该方法用于将数据追加到MappedFile, 追加到对应的mappedByteBuffer中, 基于mmap技术仅仅是将数据写入pageCache中, 没有立即刷盘, 依靠操作系统判断刷盘, 保证写入的高可用。

/**
 * MappedFile的方法
 * <p>
 * 追加消息
 *
 * @param data 追加的数据
 */
public boolean appendMessage(final byte[] data) {
    //获取写入位置
    int currentPos = this.wrotePosition.get();
    //如果当前位置加上消息大小小于等于文件大小,那么将消息写入mappedByteBuffer
    if ((currentPos + data.length) <= this.fileSize) {
        try {
            //消息写入mappedByteBuffer即可,并没有执行刷盘
            ByteBuffer buf = this.mappedByteBuffer.slice();
            buf.position(currentPos);
            buf.put(data);
        } catch (Throwable e) {
            log.error("Error occurred when append message to mappedFile.", e);
        }
        //更新写入位置
        this.wrotePosition.addAndGet(data.length);
        return true;
    }

    return false;
}

2.CommitLogDispatcherBuildIndex构建IndexFile

接收分发请求并构建IndexFile。判断是否支持消息Index, 调用IndexService#buildIndex方法构建, 不存在则不构建, Index文件是否存在都不影响RocketMQ的正常运行, 提高根据keys或者时间范围查询消息的效率。

/**
 * DefaultMessageStore的方法
 * 写入消息位置信息到IndexFile
 *
 * @param request 分派消息请求
 */
@Override
public void dispatch(DispatchRequest request) {
    //是否支持IndexFile,默认true
    if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
        //构建Index
        DefaultMessageStore.this.indexService.buildIndex(request);
    }
}

2.1 buildIndex构建Index索引

  1. 通过retryGetAndCreateIndexFile方法获取或创建最新索引文件IndexFile, 最多重试3次。
  2. 判断当前消息在commitlog中的偏移量小于该文件的结束索引在commitlog中的偏移量, 表示已为该消息构建Index索引, 直接返回。如果该消息是事务回滚消息, 同样直接返回, 不创建索引。
  3. 获取客户端生成的uniqId, 也叫msgId, 代表客户端生成的唯一一条消息, 如果uniqId不为null的话, 调用putKey()为uniqId创建索引。
  4. 获取客户端传递的keys, 如果keys不是空, 那么调用putKey方法为keys中的每一个key构建索引。
/**
 * IndexService的方法
 * <p>
 * 构建Index索引
 */
public void buildIndex(DispatchRequest req) {
    /*
     * 获取或创建最新索引文件,支持重试最多3次
     */
    IndexFile indexFile = retryGetAndCreateIndexFile();
    if (indexFile != null) {
        //获取结束物理索引
        long endPhyOffset = indexFile.getEndPhyOffset();
        DispatchRequest msg = req;
        //获取topic和keys
        String topic = msg.getTopic();
        String keys = msg.getKeys();
        //如果消息在commitlog中的偏移量小于该文件的结束索引在commitlog中的偏移量,那么表示已为该消息之后的消息构建Index索引
        //此时直接返回,不需要创建索引
        if (msg.getCommitLogOffset() < endPhyOffset) {
            return;
        }
        //获取该消息的事务类型
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                break;
            //如果是事务回滚消息,则直接返回,不需要创建索引
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                return;
        }
        //获取客户端生成的uniqId,也被称为msgId,从逻辑上代表客户端生成的唯一一条消息
        //如果uniqId不为null,那么为uniqId构建索引
        if (req.getUniqKey() != null) {
            indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
            if (indexFile == null) {
                log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                return;
            }
        }
        //获取客户端传递的keys
        //如果keys不为空,那么为keys中的每一个key构建索引
        if (keys != null && keys.length() > 0) {
            //按照空格拆分key
            String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
            //为keys中的每一个key构建索引
            for (int i = 0; i < keyset.length; i++) {
                String key = keyset[i];
                if (key.length() > 0) {
                    indexFile = putKey(indexFile, msg, buildKey(topic, key));
                    if (indexFile == null) {
                        log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                        return;
                    }
                }
            }
        }
    } else {
        log.error("build index error, stop building index");
    }
}
2.1.1 retryGetAndCreateIndexFile获取IndexFile

该方法用于获取或创建索引文件, 支持重试, 最多循环3次, 循环中调用getAndCreateLastIndexFile方法获取最新索引文件, 如果文件写满了或者没有文件, 则自动创建文件。

/**
 * IndexService的方法
 * <p>
 * 获取或创建索引文件,支持重试
 */
public IndexFile retryGetAndCreateIndexFile() {
    IndexFile indexFile = null;
    //循环尝试,尝试创建索引文件的最大次数为3
    for (int times = 0; null == indexFile && times < MAX_TRY_IDX_CREATE; times++) {
        //获取最新的索引文件,如果文件写满了或者还没有文件则会自动创建新的索引文件
        indexFile = this.getAndCreateLastIndexFile();
        //如果获取的indexFile不为null,那么退出循环
        if (null != indexFile)
            break;

        try {
            log.info("Tried to create index file " + times + " times");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        }
    }
    //标记indexFile异常
    if (null == indexFile) {
        this.defaultMessageStore.getAccessRights().makeIndexFileError();
        log.error("Mark index file cannot build flag");
    }

    return indexFile;
}
2.1.2 getAndCreateLastIndexFile获取最新索引文件

获取最新IndexFile, 如果文件写满了或者还没有文件则会自动创建新的索引文件。

  1. 获取读锁。
    1. 如果indexFileList不为空, 重试获取最后的indexFile, 否则创建一个新的。
    2. 如果最后一个indexFile没写满, 赋值给indexFile。
    3. 如果最后一个IndexFile写满了, 创建新文件, 获取目前最后一个文件的endPhyOffset, endTimestamp等信息。
  2. 如果上一步没有获取到indexFile, 创建新的IndexFile。
    1. 获取完整文件名 $HOME/store/index${fileName}。fileName是以创建时的时间戳命名的。
    2. 调用IndexFile的构造器创建新的IndexFile。
    3. 获取写锁, 将新建的IndexFile加入indexFileList, 释放写锁。
    4. 创建新文件后, 尝试将上一个文件刷盘。
  3. 返回获取的indexFile。
/**
 * IndexService的方法
 * <p>
 * 获取最新的索引文件,如果文件写满了或者还没有文件则会自动创建新的索引文件
 */
public IndexFile getAndCreateLastIndexFile() {
    IndexFile indexFile = null;
    IndexFile prevIndexFile = null;
    long lastUpdateEndPhyOffset = 0;
    long lastUpdateIndexTimestamp = 0;

    /*
     * 尝试获取最新IndexFile
     */
    {
        //尝试获取读锁
        this.readWriteLock.readLock().lock();
        //如果indexFileList不为空
        if (!this.indexFileList.isEmpty()) {
            //尝试获取最后一个IndexFile
            IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
            if (!tmp.isWriteFull()) {
                //如果最后一个IndexFile没写满,则赋值给indexFile
                indexFile = tmp;
            } else {
                //如果最后一个IndexFile写满了,则创建新文件
                //获取目前最后一个文件的endPhyOffset
                lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
                //获取目前最后一个文件的endTimestamp
                lastUpdateIndexTimestamp = tmp.getEndTimestamp();
                //赋值给prevIndexFile
                prevIndexFile = tmp;
            }
        }

        this.readWriteLock.readLock().unlock();
    }
    /*
     * 尝试创建一个新的IndexFile
     */
    if (indexFile == null) {
        try {
            //获取完整文件名$HOME/store/index${fileName},fileName是以创建时的时间戳命名的,精确到毫秒
            String fileName =
                    this.storePath + File.separator
                            + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
            //创建IndexFile
            indexFile =
                    new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
                            lastUpdateIndexTimestamp);
            //获取写锁
            this.readWriteLock.writeLock().lock();
            //加入到indexFileList集合中
            this.indexFileList.add(indexFile);
        } catch (Exception e) {
            log.error("getLastIndexFile exception ", e);
        } finally {
            //释放写锁
            this.readWriteLock.writeLock().unlock();
        }
        /*
         * 创建了新的文件之后,尝试将上一个文件刷盘
         */
        if (indexFile != null) {
            final IndexFile flushThisFile = prevIndexFile;
            /*
             * 新开一个线程,异步的对上一个IndexFile文件刷盘
             */
            Thread flushThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    IndexService.this.flush(flushThisFile);
                }
            }, "FlushIndexFileThread");

            flushThread.setDaemon(true);
            flushThread.start();
        }
    }

    return indexFile;
}
2.1.3 创建IndexFile

第一次构建Index或者之前的IndexFile写满了的时候, 创建新的IndexFile。IndexFile文件大小约为
40B 头数据indexHeader + 500w * 4B hashslot + 2000w * 20B index = 420000040B: 400M大小。

/**
 * 创建IndexFile
 *
 * @param fileName     文件名
 * @param hashSlotNum  哈希槽数量,默认5000000
 * @param indexNum     索引数量默认,默认5000000 * 4
 * @param endPhyOffset 上一个文件的endPhyOffset
 * @param endTimestamp 上一个文件的endTimestamp
 * @throws IOException
 */
public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,
                 final long endPhyOffset, final long endTimestamp) throws IOException {
    //文件大小,默认约400M左右
    //40B 头数据 + 500w * 4B hashslot + 2000w * 20B index
    int fileTotalSize =
            IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
    //构建mappedFile
    this.mappedFile = new MappedFile(fileName, fileTotalSize);
    this.fileChannel = this.mappedFile.getFileChannel();
    this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();
    this.hashSlotNum = hashSlotNum;
    this.indexNum = indexNum;
    //生成DirectByteBuffer,对该buffer写操作会被反映到文件里面
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    //获取indexHeader
    this.indexHeader = new IndexHeader(byteBuffer);
    //设置新文件的起始物理索引和结束物理索引都为上一个文件的结束物理索引
    if (endPhyOffset > 0) {
        this.indexHeader.setBeginPhyOffset(endPhyOffset);
        this.indexHeader.setEndPhyOffset(endPhyOffset);
    }
    //设置新文件的起始时间戳和结束时间戳都为上一个文件的结束时间戳
    if (endTimestamp > 0) {
        this.indexHeader.setBeginTimestamp(endTimestamp);
        this.indexHeader.setEndTimestamp(endTimestamp);
    }
}
2.1.4 buildKey构建Key

构建Index索引的key, RocketMQ将会为uniqId和keys中的每个key构建索引。

UniqKey将会转换为topic#UniqKey, 而keys则会先通过空格拆分, 然后将每个key转换为topic#key。

/**
 * IndexService的方法
 * 构建key
 */
private String buildKey(final String topic, final String key) {
    //拼接
    return topic + "#" + key;
}

2.1.5 putKey构建Index索引

IndexFile文件大约400M, 一个IndexFile文件可以保存2000W个索引, IndexFile底层是HashMap结构, 故RocketMQ的索引文件底层实现为hash索引。

putKey方法循环调用indexFile#putKey方法构建Index索引, 每次构建失败都将调用retryGetAndCreateIndexFile方法尝试获取或创建最新索引文件然后再尝试构建。

  1. 判断如果当前文件的index索引数量小于2000w, 则表明当前文件还可以继续构建索引。
  2. 计算Key的哈希值keyHash, 通过 哈希值keyHash & hash槽数量hashSlotNum获取key对应的hash槽的下标slotPos, 计算该消息的绝对hash槽偏移量 absSlotPos = 40B + slotPos * 4B。
  3. 计算当前消息在commitlog中的消息存储时间与该Index文件起始时间差timeDiff, 计算该消息的索引存放位置的绝对偏移量absIndexPos = 40B + 500w * 4B + indexCount * 20B。
  4. 在absIndexPos位置顺序存放Index索引数据, 大小为20B, 存入4B的当前消息的Key的哈希值, 存入8B的当前消息在commitlog中的物理偏移量, 存入4B的当前消息在commitlog中的消息存储时间与该Index文件起始时间差, 存入4B的slotValue, 可更新当前hash槽的值为最新的IndexFile的索引条目计数的编号。
  5. 在absSlotPos位置更新当前hash槽的值为最新的IndexFile的索引条目计数的编号, 为当前索引存入的编号。
  6. 判断如果索引数量小于等于1, 说明时该文件第一次存入索引, 初始化beginPhyOffset和beginTimestamp。
  7. 判断如果slotValue为0, 则为一个新槽, hashSlotCount + 1。
  8. 索引条目计数indexCount自增1, 设置新的endPhyOffset和endTimestamp。
/**
 * IndexService的方法
 * <p>
 * 构建Index索引
 *
 * @param indexFile indexFile
 * @param msg       消息
 * @param idxKey    key
 */
private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
    //循环尝试构建Index索引
    for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
        log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");
        //构建失败,则尝试获取或创建最新索引文件,支持重试
        indexFile = retryGetAndCreateIndexFile();
        if (null == indexFile) {
            return null;
        }
        //再次尝试构建Index索引
        ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
    }

    return indexFile;
}


/**
 * IndexFile的方法
 * <p>
 * 构建Index索引
 *
 * @param key            key
 * @param phyOffset      当前消息在commitlog中的物理偏移量
 * @param storeTimestamp 当前消息在commitlog中的消息存储时间
 * @return
 */
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    //如果当前文件的index索引数量小于2000w,则表明当前文件还可以继续构建索引
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        //计算Key的哈希值
        int keyHash = indexKeyHashMethod(key);
        //通过 哈希值 & hash槽数量 的方式获取当前key对应的hash槽下标位置,hashSlotNum默认为5000w
        int slotPos = keyHash % this.hashSlotNum;
        //计算该消息的绝对hash槽偏移量 absSlotPos = 40B + slotPos * 4B
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        FileLock fileLock = null;

        try {

            // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
            // false);
            //获取当前hash槽的值,一个hash槽大小为4B
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            //如果值不为0说明这个hash key已经存在,即存在hash冲突
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = invalidIndex;
            }
            //当前消息在commitlog中的消息存储时间与该Index文件起始时间差
            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

            timeDiff = timeDiff / 1000;

            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }
            //获取该消息的索引存放位置的绝对偏移量 absIndexPos = 40B + 500w * 4B + indexCount * 20B
            int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                            + this.indexHeader.getIndexCount() * indexSize;
            //存入4B的当前消息的Key的哈希值
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            //存入8B的当前消息在commitlog中的物理偏移量
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            //存入4B的当前消息在commitlog中的消息存储时间与该Index文件起始时间差
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            //存入4B的slotValue,即前面读出来的 slotValue,可能是0,也可能不是0,而是上一个发生hash冲突的索引条目的编号
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
            //更新当前hash槽的值为最新的IndexFile的索引条目计数的编号,也就是当前索引存入的编号
            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
            /*
             * 从存入的数据可以看出来:
             * IndexFile采用用slotValue字段将所有冲突的索引用链表的方式串起来了,而哈希槽SlotTable并不保存真正的索引数据,
             * 而是保存每个槽位对应的单向链表的头,即可以看作是头插法插入数据
             */

            //如果索引数量小于等于1,说明时该文件第一次存入索引,那么初始化beginPhyOffset和beginTimestamp
            if (this.indexHeader.getIndexCount() <= 1) {
                this.indexHeader.setBeginPhyOffset(phyOffset);
                this.indexHeader.setBeginTimestamp(storeTimestamp);
            }
            //如果slotValue为0,那么表示采用了一个新的哈希槽,此时hashSlotCount自增1
            if (invalidIndex == slotValue) {
                this.indexHeader.incHashSlotCount();
            }
            //因为存入了新的索引,那么索引条目计数indexCount自增1
            this.indexHeader.incIndexCount();
            //设置endPhyOffset和endTimestamp
            this.indexHeader.setEndPhyOffset(phyOffset);
            this.indexHeader.setEndTimestamp(storeTimestamp);

            return true;
        } catch (Exception e) {
            log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
        } finally {
            if (fileLock != null) {
                try {
                    fileLock.release();
                } catch (IOException e) {
                    log.error("Failed to release the lock", e);
                }
            }
        }
    } else {
        log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
    }

    return false;
}

3.总结

在这里插入图片描述

在这里插入图片描述

IndexFile的构成包括40B的Header头信息, 4 x 500wB的Slot信息, 20 x 2000wB的Index信息

  1. Header: java为IndexHeader, 8B的的beginTimestamp, 8B的endTimestamp, 8B的beginPhyOffset, 8B的endPhyOffset, 4B的hashSlotCount 哈希槽计数, 4B的indexCount 索引条目计数。

  2. slot Table并不保存真正的索引数据, 存储的是每个槽位对应的单向链表的头。即最新消息的索引条目计数的编号indexCount。

  3. 索引信息: 4B的Key Hash, Key的哈希值。8B的CommitLog Offset, 当前消息在commitlog中的物理偏移量。4B的Timestamp, 当前消息在commitlog中的消息存储时间与该Index文件起始时间差。4B的NextIndex offset, 链表的下一个索引的Index位置。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/769298.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Windows Docker部署springboot微服务

Windows Docker部署springboot微服务 前言 偶然的机会让我能够实操在Windows server 2019版本的服务器上进行springCloud服务的部署 过程中深刻的体会到了为什么Docker要推荐使用Linux系列的系统进行操作 遇到的问题 springboot镜像打包微服务启动后nacos连不上使用的基础…

zabbix企业级监控(监控win10主机)---接小白到大神之路运维第63天

第三阶段基础 zabbix企业级监控监控win10主机 目录 Wind10配置&#xff1a; Web图形操作&#xff1a; Wind10配置&#xff1a; 服务器相关信息&#xff1a; 关闭防火墙、IP地址为192.168.59.128 1.首先在C盘根目录下创建zabbix的文件夹 2.将需要的文件拖到该文件夹内&…

CRM排名前三的的系统有哪些特点?

crm经过多年的发展&#xff0c;不仅可以管理好客户关系还是企业重要的战略武器。让企业的销售、市场营销和客服服务部门建立密切联系&#xff0c;在crm一个平台上处理商机&#xff0c;简化业务流程&#xff0c;为组织降本增效。国内crm系统排名哪些技术商更靠前&#xff1f; 1…

CISCN2023国赛复现

[CISCN 2023 初赛]被加密的生产流量 下载附件打开 他的题目叫modbus modbus是一种协议 在这些流量里都找不到有用信息&#xff0c;但是发现了有tcp的追踪流 打开看看 发现两个一组的这几行数字组合像是base编码 MMYMMX3GNEYMOXZRGAYDA 放到base家族解一下密 最后在base32解…

详解应用层的HTTP协议与HTTPS协议

文章目录 前言HTTP协议1. 理解应用层协议1. 什么是HTTP协议&#xff1f;2. HTTP协议工作流程3. HTTP报文格式3.1 HTTP请求方法3.2 HTTP请求报头3.3 HTTP请求正文3.4 HTTP响应的状态码3.4 HTTP响应的报头3.5 HTTP响应的正文3.6 HTTP请求的URL 4. POST请求与GET请求5. HTTP协议实…

结构型模式 - 装饰者模式

概述 我们先来看一个快餐店的例子。 快餐店有炒面、炒饭这些快餐&#xff0c;可以额外附加鸡蛋、火腿、培根这些配菜&#xff0c;当然加配菜需要额外加钱&#xff0c;每个配菜的价钱通常不太一样&#xff0c;那么计算总价就会显得比较麻烦。 使用继承的方式存在的问题&#x…

5G工业路由器实现AGV远程控制,智联物联无线物联网方案

随着AGV在制造业应用逐渐广泛&#xff0c;在生产车间传统的布线网络下&#xff0c;存在着接口不足、网络不稳定、数据丢失、故障异常的情况&#xff0c;技术人员无法及时观察AGV的数据情况&#xff0c;导致AGV出错率高&#xff0c;维护成本高等问题。 传统的AGV通信方式一般是…

【Python基础函数笔记】获取当前时间并写入日志

1.获取当前时间 import os from datetime import datetime import pytzdef get_cur_time():# 获取当前时间return datetime.strftime(datetime.now(pytz.timezone(Asia/Singapore)), %Y-%m-%d_%H-%M-%S)# 基础目录 basedir a logdir os.path.join(basedir, logs, str(args.n…

S3C2440的串口通信(UART)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、UART二、2440的uart资源2.1.uart配置流程2.2 RS2322.3 RS232接口 三. UART编程实现&#xff08;无缓存&#xff09;四. UART编程实现&#xff08;中断&…

Python多线程 threading 和多进程 multiprocessing

1. 并发 vs 并行 线程是程序执行的最小单位&#xff0c;一个进程可以由一个或多个线程组成&#xff0c;各个线程之间也是交叉执行。 并发&#xff0c;相当于单核CPU&#xff0c;宏观同时执行&#xff0c;微观高速切换 交替执行。多线程、高并发这些词语更多地出现在服务端程序…

机器学习(十七):实操_在Sklearn中的实现CART树的基本流程

全文共8000余字&#xff0c;预计阅读时间约16~27分钟 | 满满干货(附代码)&#xff0c;建议收藏&#xff01; 代码下载点这里 一、介绍 CART&#xff08;Classification and Regression Trees&#xff09;即分类回归树&#xff0c;是一种重要的机器学习算法&#xff0c;既可以…

3.8 Bootstrap 面包屑导航(Breadcrumbs)

文章目录 Bootstrap 面包屑导航&#xff08;Breadcrumbs&#xff09; Bootstrap 面包屑导航&#xff08;Breadcrumbs&#xff09; 面包屑导航&#xff08;Breadcrumbs&#xff09;是一种基于网站层次信息的显示方式。以博客为例&#xff0c;面包屑导航可以显示发布日期、类别或…

解决win10系统中ping localhost被解析为 ::1的问题

目录 问题描述 问题分析 解决方案 一、修改host文件 二、修改注册表 三、修改IPv6的优先级 问题描述 本机为win10系统&#xff0c;在命令行窗口ping localhost时&#xff0c;本机IP127.0.0.1被解析为了 ::1的问题 1、在命令行窗口 ping 127.0.0.1 2、在命令行窗口 ping…

Linux常用命令——ed命令

在线Linux命令查询工具 ed 单行纯文本编辑器 补充说明 ed命令是单行纯文本编辑器&#xff0c;它有命令模式&#xff08;command mode&#xff09;和输入模式&#xff08;input mode&#xff09;两种工作模式。ed命令支持多个内置命令&#xff0c;常见内置命令如下&#xff…

leetcode 59.螺旋矩阵

记录一下&#xff0c;觉得倒水思想来做 总体看起来还是比较清晰的。 class Solution { public:vector<vector<int>> generateMatrix(int n) {int a[4][2] {{0,1}, {1,0}, {0,-1},{-1,0}};int direction0; //方向int num0;int S n*n;int x 0;int y 0;vector<…

解析基因影响:孟德尔随机化的创新思维

一、引言 在当今的遗传学和生物学研究中&#xff0c;我们对基因对个体特征和性状的影响的理解变得更加深入。然而&#xff0c;基因影响的复杂性和多样性给我们带来了巨大的挑战。为了更好地揭示基因影响的本质和机制&#xff0c;我们需要采用创新的研究思维和方法。 本文的目的…

听GPT 讲K8s源代码--pkg(四)

/pkg/controlplane、/pkg/credentialprovider、/pkg/kubeapiserver是Kubernetes中的三个核心包&#xff0c;它们分别实现了不同的功能。 /pkg/controlplane包 /pkg/controlplane是Kubernetes的一个包&#xff0c;它包含了控制平面组件的实现&#xff0c;例如API Server、Contro…

妙记多 Mojidoc 模版投稿活动招募

妙记多 Mojidoc 开始征集模板啦! 快来投稿吧&#xff01;&#x1f389;&#x1f389;&#x1f389; 优秀模板将被选录进官方模板中心&#xff0c;让你的灵感和创意被更多人看见&#xff01;选录后&#xff0c;你可直接解锁「高级体验官」称号&#xff0c;并有机会获得妙记多 M…

IDELAYG/ODELAY/IDELAYCTRL

如下是7系列FPGA HP Bank I/O 资源&#xff1a; 其中ILOGIC是由许多的数据选择器和一个IDDR触发器构成。 在HP BANK中&#xff0c;ILOGIC被称为ILOGICE2&#xff0c;在HR BANK中&#xff0c;ILOGIC被称为ILOGICE3 IDELAY 简单介绍 输入信号延迟模块。每个I/O模块都包含了一…

内存分区,编译链接,ARCMRC,消息传递消息转发,对象的底层

文章目录 前言内存分区栈区堆区全局区文字常量区程序代码区运行之前运行之后 编译&#xff0c;链接编译的过程链接 ARC&#xff0c;MRC在编译期干了什么 对象的底层消息传递&#xff0c;消息转发消息转发消息传递IMP指针IMP与SEL的区别与联系 前言 对第一周学习内容做个概括 提…