6.RocketMQ之索引文件ConsumeQueue

news2025/1/23 21:30:01

在这里插入图片描述
本文着重分析为consumequeue/topic/queueId目录下的索引文件。

1.ConsumeQueueStore

public class ConsumeQueueStore {

	protected final ConcurrentMap<String>, ConcurrentMap<Integer>, ConsumeQueueInterface>> consumeQueueTable;
	
	public boolean load() {
	    String storePathRootDir = this.messageStoreConfig.getStorePathRootDir();
	    String storePathConsumeQueue = getStorePathConsumeQueue(storePathRootDir);
	    boolean cqLoadResult = loadConsumeQueues(storePathConsumeQueue, CQType.SimpleCQ);
	    String storePathBatchConsumeQueue = getStorePathBatchConsumeQueue(storePathRootDir);
	    boolean bcqLoadResult = loadConsumeQueues(storePathBatchConsumeQueue, CQType.BatchCQ);
	    return cqLoadResult && bcqLoadResult;
	}
	
	//Broker启动后加载本地的consumequeue文件
	private boolean loadConsumeQueues(String storePath, CQType cqType) {
        File dirLogic = new File(storePath);
        File[] fileTopicList = dirLogic.listFiles();
        if (fileTopicList != null) {
            for (File fileTopic : fileTopicList) {
                String topic = fileTopic.getName();
                File[] fileQueueIdList = fileTopic.listFiles();
                if (fileQueueIdList != null) {
                    for (File fileQueueId : fileQueueIdList) {
                        int queueId = Integer.parseInt(fileQueueId.getName());;
                        queueTypeShouldBe(topic, cqType);
                        //选择 ConsumeQueue or BatchConsumeQueue 本文以 ConsumeQueue 作为分析案例
                        ConsumeQueueInterface logic = createConsumeQueueByType(cqType, topic, queueId, storePath);
                        this.putConsumeQueue(topic, queueId, logic);
                        if (!this.load(logic)) {
                            return false;
                        }
                    }
                }
            }
        }
        return true;
    }
	
	private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueueInterface consumeQueue) {
        ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface> map = this.consumeQueueTable.get(topic);
        if (null == map) {
            map = new ConcurrentHashMap<>();
            map.put(queueId, consumeQueue);
            this.consumeQueueTable.put(topic, map);
        } else {
            map.put(queueId, consumeQueue);
        }
    }
	
	public boolean load(ConsumeQueueInterface consumeQueue) {
		// 通过 topic & queueId 从consumeQueueTable 获取到 对应的FileQueueLifeCycle 即ConsumeQueue
        FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
        return fileQueueLifeCycle.load();
    }
}

1.1.ConsumeQueue

public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
	
	private final MappedFileQueue mappedFileQueue;
	
	@Override
	public boolean load() {
	    boolean result = this.mappedFileQueue.load();
	    return result;
	}
}

1.2.MappedFileQueue

mappedFileQueue.load核心功能就是加载consumequeue/topic/queueId目录下的消费索引本地文件。区别CommitLog加载的是/commitlog目录下真正的用户数据。
ConsumeQueue & CommitLog 均持有属性类MappedFileQueue【mmap零拷贝之内存映射的磁盘文件】。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/882889.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

重新梳理DeepFaceLab(DeepFake)最近动态:简要且全面的信息

DeepFaceLab相关文章 一&#xff1a;《简单介绍DeepFaceLab&#xff08;DeepFake&#xff09;的使用以及容易被忽略的事项》 二&#xff1a;《继续聊聊DeepFaceLab&#xff08;DeepFake&#xff09;不断演进的2.0版本》 三&#xff1a;《如何翻译DeepFaceLab&#xff08;DeepF…

docker安装及优化详解

目录 一、部署20版的docker 1.1 安装依赖包 1.2 设置阿里云镜像源 1.3 安装docker-ce 社区版 1.4 关闭增强机制 1.5 开启服务 1.6 设置镜像加速 1.7 网络优化 二、linux 系统中的命令 记10条(cd ls pwd mv cp ) 2.1 查询docker 版本 2.2 搜索镜像 2.3 技能点 2.…

北美电商圈的黑马Shein(希音)产品权重打造,测评补单助销量提升

这两年北美的电商领域出现了一些备受关注的热门平台&#xff0c;其中Shein和TEMU无疑是其中的佼佼者。关于TEMU的测评之前有做过介绍&#xff0c;今天我们来探讨一下Shein是否也可以借助测评补单的方式来打造产品泉州提升销量。 首先不可否认只要是电商平台都可以通过测评补单…

数据结构--最短路径 Dijkstra算法

数据结构–最短路径 Dijkstra算法 Dijkstra算法 计算 b e g i n 点到各个点的最短路 \color{red}计算\ begin\ 点到各个点的最短路 计算 begin 点到各个点的最短路 如果是无向图&#xff0c;可以先把无向图转化成有向图 我们需要2个数组 final[] &#xff08;标记各顶点是否已…

FPGA:uart原理+tx发送模块

文章目录 一、串口通信二、UART通信三、tx发送模块 一、串口通信 处理器与外部设备通信的两种方式&#xff1a; 串行通信&#xff1a; 指数据的各个位使用多条数据线同时进行传输。 并行通信&#xff1a; 将数据分成一位一位的形式在一条数据线上逐个传输。 串行通信的通信方…

SpringBoot中properties、yml、yaml的优先级

原理 配置优先级低的会先加载然后会被配置优先级高的覆盖 验证 创建SpringBoot项目&#xff08;网址&#xff09; 在resource目录下创建application.properties、application.yml、application.yaml文件 运行 结论 优先级顺序&#xff1a; properties>yml>yaml

ARM处理器

1、RISC处理器&#xff1a; RISC (Reduced Instruction Set Computer) 微处理器是一种计算机微处理器架构&#xff0c;其设计原则是通过简化指令集来提高执行速度。 (1)、RISC处理器的设计理念&#xff1a; 简化指令集&#xff1a;RISC 微处理器的指令集非常精简&#xff0c…

匠心工艺-易天注胶设备新升级

随着科技的不断发展&#xff0c;注胶设备也在不断地升级换代。近期&#xff0c;易天光通信自主研发的注胶新设备投入DAC产线使用&#xff0c;新升级的注胶设备在原有的基础上&#xff0c;投入了更加先进的工艺技术&#xff0c;大幅度提升生产工作效率。 一、注胶设备的功能与性…

【UE】Web Browser内嵌网页在场景中的褪色问题

使用WebBrowser放置在场景中时&#xff0c;网页颜色会出现异常的褪色。 这是因为 Web 浏览器插件以 sRGB 格式输出其颜色数据&#xff0c;而 Widget/3D Widget 需要线性 RGB 格式的数据。 可以通过创建在 3D Widget 中使用的新材质&#xff08;而不是默认的 Widget3DPassthr…

Rust语法:所有权引用生命周期

文章目录 所有权垃圾回收管理内存手动管理内存Rust的所有权所有权转移函数所有权传递 引用与借用可变与不可变引用 生命周期悬垂引用函数生命周期声明结构体的生命周期声明Rust生命周期的自行推断生命周期约束静态生命周期 所有权 垃圾回收管理内存 Python&#xff0c;Java这…

远程仓库上创建一个新的分支 `b` 并将远程分支 `a` 的内容克隆到 `b` 分支上

一、需求&#xff1a; 要在远程仓库上创建一个新的分支 b 并将远程分支 a 的内容克隆到 b 分支上&#xff0c;你可以按照以下步骤进行操作&#xff1a; 二、解决方案&#xff1a; 1. 首先&#xff0c;使用 git clone 命令克隆远程仓库到本地。例如&#xff0c;要克隆一个名为…

Python数据分析实战-dataframe 某一列数据每个元素做处理并新增一列(附源码和实现效果)

实现功能 dataframe 某一列数据每个元素做处理并新增一列 实现代码 import pandas as pd# 创建示例数据 df pd.DataFrame({A: [1, 2, 3], B: [foo, bar, baz]}) # 对列 B 中的每个元素加上 processed_ 前缀&#xff0c;并将结果添加为新列 C df[C] df[B].apply(lambda x: p…

台湾shopee:虾皮电商平台选品方法与市场机遇

台湾Shopee虾皮电商平台为台湾本土卖家和消费者提供了一个线上交易平台。对于想要在台湾市场做虾皮电商的卖家来说&#xff0c;选择合适的产品是非常重要的。本文介绍一些做虾皮电商的选品方法和策略。 首先&#xff0c;了解市场需求是选品的基础。在进入台湾Shopee市场之前&a…

Linux6.39 Kubernetes Pod控制器

文章目录 计算机系统5G云计算第三章 LINUX Kubernetes Pod控制器一、Pod控制器及其功用二.pod控制器有多种类型1.ReplicaSet2.Deployment3.DaemonSet4.StatefulSet5.Cronjob 三、Pod与控制器之间的关系1.Deployment2.SatefulSet1&#xff09;为什么要有headless2&#xff09;为…

注解@Value获取配置文件内容 (demo)

1. 自定义配置文件内容 (application.yml) 2. 使用 Value("${xxx}") 注入属性 import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;Compone…

【声波】声波在硼酸、硫酸镁 (MgSO4) 和纯水中的吸收研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

棒球发展史·棒球1号位

棒球发展史 1. 棒球的起源 棒球的起源地棒球的起源地。棒球&#xff0c;也被称为垒球或棒球运动&#xff0c;起源于19世纪晚期的美国。当时在美国&#xff0c;体育运动已经有了较为完备的体制&#xff0c;也形成了多种不同的运动形式。然而&#xff0c;最受欢迎的体育运动主要…

主存储器结构

计算机存储器又称内存&#xff0c;是一种利用半导体技术做成的电子设备&#xff0c;用来存储数据。电子电路的数据是以二进制的方式存储&#xff0c;存储器的每一个存储单元称做记忆元。 存储器以二进制计算容量&#xff0c;基本单位是Byte&#xff1a; 1KiB1,024B 1MiB1,0…

反序列化与序列化过程分析

前言 在学习反序列化的漏洞时,大致都是了解了一些知识,比如序列化就是写入对象,反序列化就是读取文件恢复对象,在这个过程中会自动调用一些方法,readObject,writeObject,静态代码块等,但是从来没有了解过这个过程是怎么样的,一直很模糊,所以在这篇文章里面会记录整个学习过程,…

⛳ Docker - Centos 安装配置

目录 ⛳ Docker - Centos 安装配置&#x1f3ed; Docker 安装&#xff1a;&#x1f4e2; 一、安装依赖包&#x1f4ac; 二、添加 Docker 下载源地址&#x1f43e; 三、更新yum缓存&#x1f463; 四、安装Docker&#x1f4bb; 五、启动Docker&#x1f381; 六、查看Docker状态和…