6.RocketMQ之消费索引文件ConsumeQueue

news2025/1/23 22:37:23

功能:作为CommitLog文件的索引文件。
在这里插入图片描述
本文着重分析为consumequeue/topic/queueId目录下的索引文件。

1.ConsumeQueueStore

public class ConsumeQueueStore {

	protected final ConcurrentMap<String>, ConcurrentMap<Integer>, ConsumeQueueInterface>> consumeQueueTable;
	
	public boolean load() {
	    String storePathRootDir = this.messageStoreConfig.getStorePathRootDir();
	    String storePathConsumeQueue = getStorePathConsumeQueue(storePathRootDir);
	    boolean cqLoadResult = loadConsumeQueues(storePathConsumeQueue, CQType.SimpleCQ);
	    String storePathBatchConsumeQueue = getStorePathBatchConsumeQueue(storePathRootDir);
	    boolean bcqLoadResult = loadConsumeQueues(storePathBatchConsumeQueue, CQType.BatchCQ);
	    return cqLoadResult && bcqLoadResult;
	}
	
	//Broker启动后加载本地的consumequeue文件
	private boolean loadConsumeQueues(String storePath, CQType cqType) {
        File dirLogic = new File(storePath);
        File[] fileTopicList = dirLogic.listFiles();
        if (fileTopicList != null) {
            for (File fileTopic : fileTopicList) {
                String topic = fileTopic.getName();
                File[] fileQueueIdList = fileTopic.listFiles();
                if (fileQueueIdList != null) {
                    for (File fileQueueId : fileQueueIdList) {
                        int queueId = Integer.parseInt(fileQueueId.getName());;
                        queueTypeShouldBe(topic, cqType);
                        //选择 ConsumeQueue or BatchConsumeQueue 本文以 ConsumeQueue 作为分析案例
                        ConsumeQueueInterface logic = createConsumeQueueByType(cqType, topic, queueId, storePath);
                        this.putConsumeQueue(topic, queueId, logic);
                        if (!this.load(logic)) {
                            return false;
                        }
                    }
                }
            }
        }
        return true;
    }
	
	private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueueInterface consumeQueue) {
        ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface> map = this.consumeQueueTable.get(topic);
        if (null == map) {
            map = new ConcurrentHashMap<>();
            map.put(queueId, consumeQueue);
            this.consumeQueueTable.put(topic, map);
        } else {
            map.put(queueId, consumeQueue);
        }
    }
	
	public boolean load(ConsumeQueueInterface consumeQueue) {
		// 通过 topic & queueId 从consumeQueueTable 获取到 对应的FileQueueLifeCycle 即ConsumeQueue
        FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
        return fileQueueLifeCycle.load();
    }
}

1.1.ConsumeQueue

public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
	
	private final MappedFileQueue mappedFileQueue;
	
	@Override
	public boolean load() {
	    boolean result = this.mappedFileQueue.load();
	    return result;
	}
}

1.2.MappedFileQueue

mappedFileQueue.load核心功能就是加载consumequeue/topic/queueId目录下的消费索引本地文件。区别CommitLog加载的是/commitlog目录下真正的用户数据。
ConsumeQueue & CommitLog 均持有属性类MappedFileQueue【mmap零拷贝之内存映射的磁盘文件】。

DefaultMessageStore#ReputMessageService

CommitLog & ConsumerQueue 目录下的所有问题在Broker端启动的时候默认都会加载到内存中建立与磁盘之间的映射关系。但是在CommitLog不断增加数据过程中,ConsumerQueue是如何确认每条消息的索引文件呢?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/905963.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

NetSuite OIDC、SAML SSO 演示

NetSuite的SSO的策略近些年处于演进过程&#xff0c;所以原来的Inbound SSO和Outbound SSO已经退出历史舞台。前者已经废止&#xff0c;后者在24年底废止。目前的SSO策略是&#xff1a; 第三方的身份认证服务商NetSuite as OIDC Provider 前者的含义是&#xff0c;把认证服务…

数据结构 - 基本概念和术语

基础概念之间的关系大致如下&#xff1a; 一、数据、数据元素、数据项和数据对象 数据 > 数据对象 > 数据元素 > 数据项 类比数据库&#xff0c;这四个概念代表的含义如下所示&#xff1a; 数据&#xff1a;整个数据库的所有数据数据对象&#xff1a;这个数据库的…

Shell脚本五:函数和数组

文章目录 1.函数1.1Shell函数的概念1.2函数的好处1.2函数的组成1.3函数的结构1.4查看函数列表1.5删除函数1.6函数的返回值1.6.1使用原则1.6.2示例 1.7函数的作用范围1.8函数递归1.8.1示例 2.数组2.1什么是数组2.2数组的作用2.3数组名和索引2.4定义数组的方式2.5普通数组和关联数…

深入理解分布式架构,构建高效可靠系统的关键

深入探讨分布式架构的核心概念、优势、挑战以及构建过程中的关键考虑因素。 引言什么是分布式架构&#xff1f;分布式架构的重要性 分布式系统的核心概念节点和通信数据分区与复制一致性与一致性模型负载均衡与容错性 常见的分布式架构模式客户端-服务器架构微服务架构事件驱动…

对Lua的理解

在redis和nginx中都潜入了Lua环境用于快速上手开发。但如何理解Lua以及Lua与宿主环境的交互是需要掌握的。 首先是Lua本身&#xff0c;打开5.1的lua版本开始编译后最后生成一个lua的可执行文件&#xff0c;这其实就是一个包含了Lua虚拟机的终端.。所以其实在不管redis也好nginx…

2023/8/20周报

目录 摘要 论文阅读 1、标题和现存问题 2、准备知识 3、模型结构 4、实验准备 5、实验结果 深度学习 1、构建图数据 2、GCN模型 3、当前实验结果 总结 摘要 本周在论文阅读上&#xff0c;阅读了一篇时空图卷积网络:交通预测的深度学习框架的论文。文章的时空图卷积…

NOIP2014普及组,提高组 比例简化 飞扬的小鸟 答案

比例简化 说明 在社交媒体上&#xff0c;经常会看到针对某一个观点同意与否的民意调查以及结果。例如&#xff0c;对某一观点表示支持的有1498 人&#xff0c;反对的有 902人&#xff0c;那么赞同与反对的比例可以简单的记为1498:902。 不过&#xff0c;如果把调查结果就以这种…

Leetcode-每日一题【剑指 Offer 33. 二叉搜索树的后序遍历序列】

题目 输入一个整数数组&#xff0c;判断该数组是不是某二叉搜索树的后序遍历结果。如果是则返回 true&#xff0c;否则返回 false。假设输入的数组的任意两个数字都互不相同。 参考以下这颗二叉搜索树&#xff1a; 5 / \ 2 6 / \ 1 3 示例 1&#xff1a; 输入: […

第4天----找出第一个只出现一次的字符(桶计数法/4种思路讲解)

题目描述 给定一个只包含小写字母的字符串&#xff0c;请你找到第一个仅出现一次的字符。如果没有&#xff0c;输出 no。 输入格式 一个字符串&#xff0c;长度小于 1100。 输出格式 输出第一个仅出现一次的字符&#xff0c;若没有则输出 no。 输入输出样例 输入 #1复制 abc 输…

鸿蒙/Android上最大的毒瘤:快应用服务

鸿蒙/Android上最大的毒瘤&#xff1a;快应用服务 2023.3.22版权声明&#xff1a;本文为博主chszs的原创文章&#xff0c;未经博主允许不得转载。 1、什么是快应用&#xff1f; “快应用” 是安卓厂&#xff08;华&#xff0c;米&#xff0c;O、V、魅族、努、联、加&#xf…

消息中间件的选择:RabbitMQ是一个明智的选择

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; MQ&#xff08;Message Queue&#xff09; MQ&#xff08;消息队列&#xff09;是一种用于在应用程序之间进行异步通信的技术&#xff1b;允许应用程序通过发送和接收…

[虚幻引擎] DTGlobalVariable 插件说明,蓝图全局变量访问,设置, Get, Set。

本插件可以在蓝图或者UMG中直接访问指定的全局变量值&#xff0c;方便编写。 支持Bool&#xff0c;Byte&#xff0c;Int&#xff0c;Int64&#xff0c;Float&#xff0c;Name&#xff0c;String&#xff0c;Text&#xff0c;Vector&#xff0c;Rotator&#xff0c;Transform&am…

【Linux网络】网络编程套接字 -- 基于socket实现一个简单UDP网络程序

认识端口号网络字节序处理字节序函数 htonl、htons、ntohl、ntohs socketsocket编程接口sockaddr结构结尾实现UDP程序的socket接口使用解析socket处理 IP 地址的函数初始化sockaddr_inbindrecvfromsendto 实现一个简单的UDP网络程序封装服务器相关代码封装客户端相关代码实验结…

TE-L-Tyrosine (FET-precursor),合成蛋白质的必需成分之一,L-Tyrosine

试剂简介&#xff1a;TE-L-Tyrosine (FET-precursor)&#xff0c;L-Tyrosine是一种氨基酸&#xff0c;它是蛋白质合成的必需成分之一。除了在蛋白质合成中的重要作用外&#xff0c;L-Tyrosine还具有多种生理功能。它是肾上腺素、去甲肾上腺素、甲状腺激素等重要激素的前体物质&…

短视频矩阵系统源码|开发者步骤

一、为了开发和部署短视频矩阵系统&#xff0c;首先需要进行以下步骤&#xff1a; 1. 系统设计与开发&#xff1a;根据需求&#xff0c;进行系统架构设计&#xff0c;并选择合适的技术栈进行开发。这可能涉及到前端开发、后端开发、数据库设计等工作。 2. 实现核心功能&#…

AIGC与软件测试的融合

一、ChatGPT与AIGC 生成式人工智能——AIGC&#xff08;Artificial Intelligence Generated Content&#xff09;&#xff0c;是指基于生成对抗网络、大型预训练模型等人工智能的技术方法&#xff0c;通过已有数据的学习和识别&#xff0c;以适当的泛化能力生成相关内容的技术。…

MySQL运行时的可观测性

1.说在前面的话 2.安装employees测试库 3.观测SQL运行状态 3.1 观测SQL运行时的内存消耗3.2 观测SQL运行时的其他开销3.3 观测SQL运行进度 感知SQL运行时的状态 1. 说在前面的话 在MySQL里&#xff0c;一条SQL运行时产生多少磁盘I/O&#xff0c;占用多少内存&#xff0c;是否…

Ctfshow web入门 命令执行RCE篇 web29-web77 与 web118-web124 详细题解 全

Ctfshow 命令执行 web29 pregmatch是正则匹配函数&#xff0c;匹配是否包含flag&#xff0c;if(!preg_match("/flag/i", $c))&#xff0c;/i忽略大小写 可以利用system来间接执行系统命令 flag采用f*绕过&#xff0c;或者mv fl?g.php 1.txt修改文件名&#xff0c…

一文彻底理解时间复杂度和空间复杂度(附实例)

目录 1 PNP&#xff1f;2 时间复杂度2.1 常数阶复杂度2.2 对数阶复杂度2.3 线性阶复杂度2.4 平方阶复杂度2.5 指数阶复杂度2.6 总结 3 空间复杂度 1 PNP&#xff1f; P类问题(Polynomial)指在多项式时间内能求解的问题&#xff1b;NP类问题(Non-Deterministic Polynomial)指在…

数据中心UPS监控,不服不行!

UPS作为关键的电力保障设备&#xff0c;它在电力中断或波动的情况下&#xff0c;为电子设备提供稳定的备用电源&#xff0c;以防止数据丢失、设备损坏或生产中断。 通过远程监控、电池健康检测、负载管理、警报通知等功能&#xff0c;UPS监控确保了系统的高效运行和可靠性。同时…