6.RocketMQ之文件查询索引文件IndexFile

news2025/1/16 16:13:00

根据消息ID来查找消息,consumequeue 中没有存储消息ID,如果不采取其他措施,又得遍历 commitlog文件了,indexFile就是为了解决这个问题的文件。

在这里插入图片描述
如图所示:

  1. 一个indexFile对应一个40个字节的IndexHead。
  2. (40,2000 0000]区间代表存放4个字节的index条目数。
  3. (2000 0000,4 0000 0000]区间存放的是20个字节长度的具体的Index条目数据。
  4. 一个IndexFile文件大小约为400M左右。

1.IndexFile

为了便于分析将DirectByteBuffer等同于HeapByteBuffer解析。

/**
*
* @param fileName
* @param hashSlotNum 500 0000
* @param indexNum 4 * 500 0000
* @param endPhyOffset
* @param endTimestamp
* @throws IOException
*/
public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,final long endPhyOffset, final 
		long endTimestamp) throws IOException {
    // todo fileTotalSize = 40个字节的文件头 + 4字节共500w个的槽表 + 20字节2000w记录索引链表
    this.fileTotalSize = IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
    // 内存映射缓冲区大小约为 400M
    this.mappedFile = new DefaultMappedFile(fileName, fileTotalSize);
    this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();
    this.hashSlotNum = hashSlotNum;
    this.indexNum = indexNum;

    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    this.indexHeader = new IndexHeader(byteBuffer);

    if (endPhyOffset > 0) {
        this.indexHeader.setBeginPhyOffset(endPhyOffset);
        this.indexHeader.setEndPhyOffset(endPhyOffset);
    }

    if (endTimestamp > 0) {
        this.indexHeader.setBeginTimestamp(endTimestamp);
        this.indexHeader.setEndTimestamp(endTimestamp);
    }
}
 /**
 * 冲突生成的链条:
 * absSlotPos:没有冲突该值为0。
 * 如果存在多个冲突是如何处理的呢?
 * 假设MsgKey分别为1、2、3,其对应的indexCount分别为10、20、30。并且这三个MsgKey对应的keyHash一样即出现冲突。
 * 1、MsgKey = 1,即对应的(absIndexPos + 4 + 8 + 4)为0,absIndexPos为10。
 * 2、MsgKey = 2,即对应的(absIndexPos + 4 + 8 + 4)为10,absIndexPos为20。
 * 3、MsgKey = 3,即对应的(absIndexPos + 4 + 8 + 4)为20,absIndexPos为30。
 * 注意发生冲突并没有丢弃旧MsgKey,也没有发生覆盖。
 * @param key
 * @param phyOffset
 * @param storeTimestamp
 * @return
 */
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        int keyHash = indexKeyHashMethod(key);
        // 根据MsgKey的hash值 计算槽位置逻辑值
        int slotPos = keyHash % this.hashSlotNum;
        // 计算槽位置的绝对值。
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        try {
            // 如果不存在hash冲突则slotValue始终为0。否则存在hash冲突,并且 slotValue 表示 与当前MsgKey冲突的旧Msg
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = invalidIndex;
            }

            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

            timeDiff = timeDiff / 1000;

            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }

            int indexCount = this.indexHeader.getIndexCount();
            // 计算index条目的绝对位置
            int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + indexCount * indexSize;
            // MsgKey的hash值
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            //MsgKey的物理偏移量
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            //MsgKey的存储时间
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            //MsgKey的槽值通常为0。如果存在hash冲突则表示与当前MsgKey冲突的旧MsgKey的偏移量。
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

            // 槽位置绝对值absSlotPos对应值的实际意义为 当前indexFile现有的条目数。其实也是定位MsgKey的偏移量
            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

            if (this.indexHeader.getIndexCount() <= 1) {
                this.indexHeader.setBeginPhyOffset(phyOffset);
                this.indexHeader.setBeginTimestamp(storeTimestamp);
            }

            if (invalidIndex == slotValue) {
                this.indexHeader.incHashSlotCount();
            }
            this.indexHeader.incIndexCount();
            this.indexHeader.setEndPhyOffset(phyOffset);
            this.indexHeader.setEndTimestamp(storeTimestamp);
            return true;
        }
    } 
    return false;
}
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
                            final long begin, final long end) {
    if (this.mappedFile.hold()) {
        int keyHash = indexKeyHashMethod(key);
        int slotPos = keyHash % this.hashSlotNum;
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        try {
            // 获取当前MsgKey的偏移量,或者当前MsgKey添加时IndexFile中现有的条目数
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() || 
            									this.indexHeader.getIndexCount() <= 1) {
            } else {// 存在hash冲突
                for (int nextIndexToRead = slotValue; ; ) {
                    if (phyOffsets.size() >= maxNum) {
                        break;
                    }
                    // 获取当前MsgKey对应的其实position
                    int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + nextIndexToRead 	
                    		* indexSize;
                    int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                    long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
                    long timeDiff = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                    // 获取到与当前MsgKey冲突的最近旧MsgKey的偏移量
                    int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

                    if (timeDiff < 0) {
                        break;
                    }

                    timeDiff *= 1000L;

                    // 再根据时间维度从众多冲突的key中确定需要返回的key
                    long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                    boolean timeMatched = timeRead >= begin && timeRead <= end;

                    if (keyHash == keyHashRead && timeMatched) {
                        phyOffsets.add(phyOffsetRead);
                    }

                    if (prevIndexRead <= invalidIndex || prevIndexRead > this.indexHeader.getIndexCount() || 
                    									prevIndexRead == nextIndexToRead || timeRead < begin) {
                        break;
                    }

                    nextIndexToRead = prevIndexRead;
                }
            }
        } catch (Exception e) {
            log.error("selectPhyOffset exception ", e);
        } finally {
            this.mappedFile.release();
        }
    }
}

疑问:处理冲突的时候涉及时间维度,为啥现实通过msgKey查询具体消息的时候没有时间维度的选择呢?是不是会把对应冲突的消息都给返回呢?

2.IndexHeader

IndexHeader共占用40个字节。其中包括Long类型的 起始【beginTimestampIndex】 & 结束【endTimestampIndex】时间以及起始【beginPhyoffsetIndex】 & 结束【endPhyoffsetIndex】消息物理偏移量4个指标,int类型的槽表个数【hashSlotcountIndex】以及索引个数【indexCountIndex】2个指标。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/907787.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【深入解析:数据结构栈的魅力与应用】

本章重点 栈的概念及结构 栈的实现方式 数组实现栈接口 栈面试题目 概念选择题 一、栈的概念及结构 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作的一端 称为栈顶&#xff0c;另一端称为栈底。栈中的数…

行政区划表设计和多级查询

简介 行政区划的表为一个多层级结构&#xff0c;设计大同小异&#xff0c;大致结构如下所示 其中&#xff0c;code为区划编号&#xff08;主键&#xff09;&#xff0c;parent_code为父区划编号&#xff0c;ancestors为祖区划编号&#xff0c;查询也主要围绕前两个展开。 查询…

【后端】Core框架版本和发布时间以及.net 6.0启动文件的结构

2023年&#xff0c;第35周&#xff0c;第1篇文章。给自己一个目标&#xff0c;然后坚持总会有收货&#xff0c;不信你试试&#xff01; .NET Core 是一个跨平台的开源框架&#xff0c;用于构建现代化的应用程序。它在不同版本中有一些重要的区别和发布时间 目录 一、Core版本和…

新研究:Gartner 公有云成本管理框架

2023年6月28日&#xff0c;Gartner 出版了名为《Beyond FinOps: the Gartner Framework for Public Cloud Financial Management》的公有云成本管理框架&#xff0c;旨在帮助企业/组织应对公有云支出的挑战&#xff0c;同时抓住新机遇&#xff0c;推动更有效的 IT 使用。新框架…

【HCIP】05.OSPF邻居与邻接关系

报文 OSPF头部 报文头部中影响邻居关系建立的字段是Router ID、Area ID、AuType HELLO报文 报文中影响建立邻居关系到是 hello time时间&#xff0c;DR优先级等 DD报文 序列号&#xff0c;LSA头部摘要信息I位&#xff1a;第一个报文&#xff0c;用于主从选举M位&#xff1a;…

跨境电商企业新宠Telegram使用指南,这个功能你不能忽视

Telegram是一款消息传递应用&#xff0c;以其隐私和安全性以及作为移动应用的易用性而闻名。截至 2023年 6月&#xff0c;它拥有超过7亿月活跃用户&#xff0c;是全球下载量排名前五的应用之一。随着其受欢迎程度的不断提高&#xff0c;该渠道正在全球企业中得到认可。让我们来…

Apache Doris 入门教程34:Join 优化

Bucket Shuffle Join Bucket Shuffle Join 是在 Doris 0.14 版本中正式加入的新功能。旨在为某些 Join 查询提供本地性优化&#xff0c;来减少数据在节点间的传输耗时&#xff0c;来加速查询。 它的设计、实现和效果可以参阅 上面的图片展示了Bucket Shuffle Join的工作原理…

ES 索引重命名--Reindex(一)

ES reindex脚本流程&#xff0c;下图为整体流程&#xff1a; 步骤&#xff08;1&#xff09;&#xff1a;每次写入把之前的索引删除再重新创建索引&#xff0c;然后判断索引是否创建成功&#xff0c;由于创建成功返回结果是json&#xff0c;因此用Json Input插件去解析json获得…

(三)行为模式:4、迭代器模式(Iterator Pattern)(C++示例)

1、迭代器模式&#xff08;Iterator Pattern&#xff09;含义 迭代器模式&#xff08;Iterator&#xff09;&#xff0c;提供一种方法顺序访问一个聚合对象中各个元素&#xff0c;而不暴露该对象的内部表示。【DP】 通过使用迭代器模式&#xff0c;可以将遍历算法与集合对象解耦…

pytorch内存泄漏

问题描述&#xff1a; 内存泄漏积累过多最终会导致内存溢出&#xff0c;当内存占用过大&#xff0c;进程会被killed掉。 解决过程&#xff1a; 在代码的运行阶段输出内存占用量&#xff0c;观察在哪一块存在内存剧烈增加或者显存异常变化的情况。但是在这个过程中要分级确认…

IT运维:使用数据分析平台监控 Nginx 服务

Nginx 是一种流行的 HTTP 服务器和反向代理服务器。作为 HTTP 服务器&#xff0c;Nginx 可以非常有效和可靠地提供静态内容&#xff1b;作为反向代理&#xff0c;它可以用作多个后端服务器或其他应用程序&#xff08;例如缓存和负载平衡&#xff09;的单个访问入口。 Nginx 同时…

3分钟教你如何选防水劳保鞋

防水劳保鞋是防止水渗透鞋子造成湿漉漉感觉的安全防护鞋&#xff0c;主要用于地面积水或有溅水的作业场景。前面的文章中与大家简单介绍了防水劳保鞋&#xff0c;其实选购防水劳保鞋也是有窍门的。一双质量好的防水劳保鞋可以为工人的工作效率添砖加瓦&#xff0c;反之&#xf…

【2021年遇到最头疼的Bug】【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析总结

背景介绍 公司最近年底要对系统做一次大的体检&#xff0c;所以是不测不知道&#xff0c;一测吓一跳啊&#xff0c;出现了很多问题&#xff0c;其中最恶心的问题要数我们的ROCKETMQ消息队列的问题了&#xff0c;大家都知道消息队列是作为流量削峰的主要手段&#xff0c;负责系…

C++基础Ⅰ编译、链接

目录儿 1 C是如何工作的1.1 预处理语句1.2 include1.3 main()1.4 编译单独编译项目编译 1.5 链接 2 定义和调用函数3 编译器如何工作3.1 编译3.1.1 引入头文件系统头文件自定义头文件 3.1.2 自定义类型3.1.3 条件判断拓展: 汇编 3.2 链接3.2.1 起始函数3.2.2 被调用的函数 3.3 …

C++新经典09--函数新特性、inline内联函数与const详解

函数回顾与后置返回类型 函数定义中如果有形参则形参应该有名字&#xff0c;而不光是只有类型&#xff0c;但是如果并不想使用这个形参&#xff0c;换句话说这个形参并不在这个函数中使用&#xff0c;则不给形参名也可以&#xff0c;但在调用这个函数的时候&#xff0c;该位置…

什么情况下,亚马逊账户会被判滥用?

如果说有对亚马逊跨境电商有所了解的朋友就会知道&#xff0c;现在亚马逊跨境电商的规则是十分严格的&#xff0c;亚马逊开店变得越来越困难&#xff0c;尤其是要想成功的把一个亚马逊店铺给开好。 这几年不少有一些违规的亚马逊卖家都被系统检测到了&#xff0c;如果说被系统…

投资者的秘密武器,代理IP在金融决策中的驱动作用

在如今数据为王的时代&#xff0c;无论从事哪个行业&#xff0c;都需要使用数据分析来指导自己的决策&#xff0c;而这些数据又是从哪里来的呢&#xff1f;很多人都知道&#xff0c;数据采集可以帮助我们将分散在互联网各个网站上的大量数据集中起来。对于金融行业来说&#xf…

重磅丨无人机新规出台,这些红线不能踩!

近年来&#xff0c;随着无人机研发技术逐渐成熟&#xff0c;无人机在各个领域得到了广泛应用&#xff0c;包括VR全景航拍、乡村农业、城市管理、环境监测等领域&#xff0c;其应用场景及使用方式都还在迅速拓展&#xff0c;无人机行业受到社会广泛关注。 但在实践中&#xff0c…

SOLIDWORKS焊件是什么?

SOLIDWORKS是一款广泛应用于机械设计领域的三维计算机辅助设计软件。SOLIDWORKS提供了强大的焊件功能&#xff0c;可以帮助工程师们以更高的效率设计焊接件。本文将介绍SOLIDWORKS焊件的概念、特点以及使用方法&#xff0c;以期帮助读者更好地理解和应用这一关键技术。 SOLIDWO…

国际刑警组织逮捕 14 名涉嫌盗窃 4000 万美元的网络罪犯

Bleeping Computer 网站披露&#xff0c;4 月份&#xff0c;国际刑警组织发动了一起为期四个月&#xff0c;横跨 25 个非洲国家的执法行动 “Africa Cyber Surge II”&#xff0c;共逮捕 14 名网络犯罪嫌疑人&#xff0c;摧毁 20000 多个从事勒索、网络钓鱼、BEC 和在线诈骗的犯…