深度解析RocketMq源码-IndexFile

news2025/1/11 9:06:49

1.绪论

在工作中,我们经常需要根据msgKey查询到某条日志。但是,通过前面对commitLog分析,producer将消息推送到broker过后,其实broker是直接消息到达broker的先后顺序写入到commitLog中的。我们如果想根据msgKey检索一条消息无疑大海捞针,所以们需要像数集一样建立一个目录,我们其实可以想到的是构建一个Map,key存储msgKey,value存储msg在commitLog中的物理偏移量。而这个目录其实就是indexFile。

2.indexFile的组成和原理

indexFile主要由两部分组成,分别是indexFile文件头和index的文件内容。

2.1 indexFile文件头 - IndexHeader 

indexHeader占据40个字节,其中最重要的是他记录了整个索引文件的最开始插入的索引的时间和最后一条数据插入的时间,主要是为了支持根据时间进行范围搜索。以及第一条和最后一条日志的索引位置。还有一个就是已经插入了多少条索引IndexCount。

public class IndexHeader {
//index文件头占4个字节
    public static final int INDEX_HEADER_SIZE = 40;
    private static int beginTimestampIndex = 0;
    private static int endTimestampIndex = 8;
    private static int beginPhyoffsetIndex = 16;
    private static int endPhyoffsetIndex = 24;
    private static int hashSlotcountIndex = 32;
    private static int indexCountIndex = 36;
    private final ByteBuffer byteBuffer;
    //开始的时间戳
    private final AtomicLong beginTimestamp = new AtomicLong(0);
    //结束时间戳
    private final AtomicLong endTimestamp = new AtomicLong(0);
    //开始的物理偏移量
    private final AtomicLong beginPhyOffset = new AtomicLong(0);
    //结束的物理偏移量
    private final AtomicLong endPhyOffset = new AtomicLong(0);
    //hash槽的数量
    private final AtomicInteger hashSlotCount = new AtomicInteger(0);
    //index的数量
    private final AtomicInteger indexCount = new AtomicInteger(1);
}

2.2 indexFile的组成

idnexFile的内容包括:

1. 40个字节的indexFile头

2. 4* 500w个字节hash槽,每个槽记录的其实是:根据key取hash值%槽数在当前hash槽的索引的序号(也即当前有多少条索引)

3. 20*2000w个自己的索引数,每条索引20个字节,包含4个字节索引key的hash值+8个字节的物理偏移量+4个字节的当前索引的插入时间距离该索引文件第一条索引的插入时间的差值+4个字节的上一个在当前hash槽的索引的序号。

我们可以画图来描述一下:

e5b98bd4baf04c1c9393127f0a93ebca.png

可以看出idnexFile是采用链地址法解决hash冲突的,每个索引存储有上一条拥有相同hash值索引的index值,相当于通过链表将这些hash冲突的索引串起来。

public class IndexFile {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    //一个hash槽的大小为4个字节
    private static int hashSlotSize = 4;
    //一条索引的大小为20字节
    private static int indexSize = 20;
    private static int invalidIndex = 0;
    //hash槽的数量
    private final int hashSlotNum;
    //index的总数量
    private final int indexNum;
    //index也是存储在mappedFile中的
    private final MappedFile mappedFile;
    private final MappedByteBuffer mappedByteBuffer;
    //index文件的头
    private final IndexHeader indexHeader;

    public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,
        final long endPhyOffset, final long endTimestamp) throws IOException {
        //文件总大小 = 头部所占40个字节 + hash槽数量(默认为500w) * 4个字节 + index数量 * 20个字节
        int fileTotalSize =
            IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
        //新建mappedFile
        this.mappedFile = new MappedFile(fileName, fileTotalSize);
        //获取到与文件建立映射关系的buffer
        this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();
        //hash槽数量
        this.hashSlotNum = hashSlotNum;
        //索引文件的数量
        this.indexNum = indexNum;

        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        //index文件的头部
        this.indexHeader = new IndexHeader(byteBuffer);

        if (endPhyOffset > 0) {
            //够级整个索引文件的开始的物理偏移量和结束的偏移量
            this.indexHeader.setBeginPhyOffset(endPhyOffset);
            this.indexHeader.setEndPhyOffset(endPhyOffset);
        }

        if (endTimestamp > 0) {
            //够级整个索引文件的开始时间戳和结束时间戳
            this.indexHeader.setBeginTimestamp(endTimestamp);
            this.indexHeader.setEndTimestamp(endTimestamp);
        }
    }
}

3.向indexFile插入一条索引数据

主要的步骤如下:

1.获取msgKey的hash值;

2.通过hash值对总的hash槽数取模得到对应第几个槽;

3.40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址;

4.40个字节的索引头大小+hash槽总数*4个字节+现在存储了多少条索引*20个字节得到最新一条数据写入的物理偏移量;

5.分别写入索引内容:hash值,commitLog的物理偏移量,距离第一条索引的时间戳+上一条指向同一个hash槽的索引的序号(也即当前hash槽中存储的值);

6.将最新一条的索引序号写入到hash槽中。

    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            //1.获取msgKey的hash值
            int keyHash = indexKeyHashMethod(key);
            //2.通过hash值对总的hash槽数取模得到对应第几个槽
            int slotPos = keyHash % this.hashSlotNum;
            //3.40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            try {
                //获取到上一个hash槽的所指向的索引序号
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }
                //获取当前索引与第一条索引的差值
                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

                timeDiff = timeDiff / 1000;

                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }
                //40个字节的索引头大小+hash槽总数*4个字节+现在存储了多少条索引*20个字节得到最新一条数据写入的物理偏移量
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;
                //分别写入索引内容:hash值,commitLog的物理偏移量,距离第一条索引的时间戳+上一条指向同一个hash槽的索引的序号
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
                //将最新一条的索引序号写入到hash槽中
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
                //更新idnex中的最后一条索引的时间戳和物理偏移量
                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }
                
                if (invalidIndex == slotValue) {
                    this.indexHeader.incHashSlotCount();
                }   
                //增加indexheader索引序号
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);

                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }

        return false;
    }

4.从indexFile中读取一条索引数据

1.获取索引key的hash值;

2.hash值对槽总数取模获得第几个槽;

3.40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址;

4.从槽中读取到该槽所指向的最新的一条索引序号;

5.40个字节的索引头大小+hash槽总数*4个字节+hash槽中存储的索引序号*20个字节得到最新一条数据写入的物理偏移量;

6.如果hash值相等,并且时间匹配,证明匹配到数据,跳出循环;

7.如果不匹配,便根据链表寻找到拥有相同hash值并且时间匹配的日志;

    public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
                                final long begin, final long end) {
        if (this.mappedFile.hold()) {
            //获取索引key的hash值
            int keyHash = indexKeyHashMethod(key);
            //hash值对槽总数取模获得第几个槽
            int slotPos = keyHash % this.hashSlotNum;
            //40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            try {
                //从槽中读取到该槽所指向的最新的一条索引序号
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                    || this.indexHeader.getIndexCount() <= 1) {
                } else {
                    for (int nextIndexToRead = slotValue; ; ) {
                        if (phyOffsets.size() >= maxNum) {
                            break;
                        }
                       // 40个字节的索引头大小+hash槽总数*4个字节+hash槽中存储的索引序号*20个字节得到最新一条数据写入的物理偏移量
                        int absIndexPos =
                            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                                + nextIndexToRead * indexSize;
                        //获取索引的hash值
                        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                        //获取到该索引的物理偏移量
                        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
                        //获取到时间戳差值
                        long timeDiff = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                        //获取到拥有相同槽数的上一条索引序号
                        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

                        if (timeDiff < 0) {
                            break;
                        }

                        timeDiff *= 1000L;

                        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
                        //如果hash值相等,并且时间匹配,证明匹配到数据,跳出循环
                        if (keyHash == keyHashRead && timeMatched) {
                            phyOffsets.add(phyOffsetRead);
                        }
                        //如果上一条索引非法,证明已经到达链表头部,跳出循环,证明该条索引就是需要寻找的索引
                        if (prevIndexRead <= invalidIndex
                            || prevIndexRead > this.indexHeader.getIndexCount()
                            || prevIndexRead == nextIndexToRead || timeRead < begin) {
                            break;
                        }

                        nextIndexToRead = prevIndexRead;
                    }
                }
            } catch (Exception e) {
                log.error("selectPhyOffset exception ", e);
            } finally {
                this.mappedFile.release();
            }
        }
    }

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1872132.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Embedding 、词嵌入、向量模型说的是一回事么?AI是如何理解世界?AI人不能不看的Embedding白话科普!

在AI理解世界的过程中&#xff0c;向量模型扮演着一个至关重要的角色&#xff0c;甚至可以说它是AI大模型用以构建和理解复杂数据的基础&#xff0c;也是对不同形态数据的一种标准化的“浓缩”。它能够将语言、图像、声音等多样化的信息&#xff0c;转化为一种通用的、数学化的…

知乎正通过乱码来干扰必应/谷歌等爬虫,从而限制中文数据集被用于AI训练

有用户反馈称使用微软必应搜索和谷歌搜索发现存在不少知乎乱码内容&#xff0c;即搜索结果里知乎内容的标题和正文内容都可能是乱码的&#xff0c;但抓取的正文前面一些段落内容可以正常查看。考虑到此前知乎已经屏蔽除百度和搜狗以外的所有搜索引擎爬虫 (蜘蛛 / 机器人)&#…

《数字图像处理与机器视觉》案例二(基于边缘检测和数学形态学焊缝图像处理)

一、前言 焊缝是评价焊接质量的重要标志&#xff0c;人工检测方法存在检测标准不统一&#xff0c;检测精度低&#xff0c;焊缝视觉检测技术作为一种重要的质量检测方法&#xff0c;正逐渐在各行各业中崭露头角。把焊缝准确的从焊接工件中准确分割出来是焊缝评价的关键一步&…

使用模板方法设计模式封装 socket 套接字并实现Tcp服务器和客户端 简单工厂模式设计

文章目录 使用模板方法设计模式封装套接字使用封装后的套接字实现Tcp服务器和客户端实现Tcp服务器实现Tcp客户端 工厂模式 使用模板方法设计模式封装套接字 可以使用模块方法设计模式来设计套接字 socket 的封装 模板方法&#xff08;Template Method&#xff09;设计模式是一…

百度ueditor如何修改图片的保存位置

背景 编辑器的保存图片是设置有默认规则的&#xff0c;但是服务器上一般会把图片路径设置为软连接&#xff0c;所以我就需要更改编辑器保存图片的路径&#xff0c;要不然&#xff0c;每次有新的部署&#xff0c;上一次上传的图片就会失效。先来看看编辑器默认的保存路径吧&…

目标检测算法之RT-DETR

RT-DETR算法理解 BackgroundModel ArchitectureEfficient Hybrid EncoderUncertainty-minimal Query Selection 总结 Background Real-time Detection Transformer&#xff08;RT-DETR&#xff09;是一个基于tranformer的实时推理目标检测模型。RT-DETR是2023年百度发布的一个…

七天速通javaSE:第五天 数组进阶

文章目录 前言一、二维数组二、Arrays类1.toString打印数组内各元素1.1 示例1.2 自己实现内部逻辑 2. sort升序排列3. fill数组填充&#xff08;重新赋值&#xff09;4.equals比较数组元素是否相等 三、冒泡排序 前言 本文将学习二维数组、arrays类以及冒泡排序 一、二维数组 …

重生奇迹MU新手攻略:如何一步步往大佬发展

装备强化攻略&#xff1a; 提纯装备&#xff1a;通过提纯装备可以提升基础属性&#xff0c;选择合适的装备进行提纯可以获得更好的效果。 镶嵌宝石&#xff1a;使用宝石进行装备镶嵌可以增加装备的属性&#xff0c;根据需要选择适合的宝石进行镶嵌。 洗练装备&#xff1a;通…

基于盲信号处理的声音分离——最大化信噪比的ICA算法

基于最大化信噪比的ICA算法是一种较新模式的ICA算法&#xff0c;在该算法中利用输出信号的信噪比建立信噪比函数作为该算法的代价函数。 在上式中&#xff0c;用S表示原信号&#xff0c;Y表示输出信号。由于原信号S并不知道&#xff0c;因此采用估计信号Y的滑动平均 来代替&…

激励视频广告的eCPM更高,每天的展示频次有限制吗?

在APP发展初期&#xff0c;由于DUA量级有限&#xff0c;所需的广告资源比较少&#xff0c;往往接入1-2家广告平台就能满足APP用户每日需要的广告展示量。而随着APP用户规模的扩大、广告场景的不断丰富&#xff0c;开发者要提升APP整体广告变现收益&#xff0c;一是可以尽可能多…

PLC数据采集案例

--------天津三石峰科技案例分享 项目介绍 项目背景 本项目为天津某钢铁集团下数字化改造项目&#xff0c;主要解决天津大型钢厂加氢站数字化改造过程中遇到的数据采集需求。项目难点PLC已经在运行了&#xff0c;需要采集里面数据&#xff0c;不修改程序&#xff0c;不影响P…

3D立体卡片动效(附源码)

3D立体卡片动效 欢迎关注&#xff1a;xssy5431 小拾岁月参考链接&#xff1a;https://mp.weixin.qq.com/s/9xEjPAA38pRiIampxjXNKQ 效果展示 思路分析 需求含有立体这种关键词&#xff0c;我们第一反应是采用动画中的平移、倾斜等实现。如果是立体&#xff0c;必然产生阴影&…

浅谈制造业EHS管理需要关注的重点

在快速发展的制造业中&#xff0c;EHS&#xff08;环境、健康、安全&#xff09;管理体系如同一道坚实的屏障&#xff0c;守护着企业的绿色与安全。那么&#xff0c;这个管理体系到底包含哪些内容呢&#xff1f;接下来&#xff0c;让我们一同探寻其奥秘。 一、EHS管理体系的丰富…

你的钱花得值不值?简谈FMEA培训的投资与回报

在探讨 FMEA&#xff08;失效模式及影响分析&#xff09;培训是否值得投资时&#xff0c;需要综合考虑多个方面。 从投资的角度来看&#xff0c;FMEA 培训通常需要一定的费用支出&#xff0c;包括培训课程的费用、培训期间员工的时间成本以及可能涉及的培训材料和设备成本。 然…

利用MMDetection将单阶段检测器作为Faster R-CNN的RPN

将单阶段检测器作为RPN 一、在 Faster R-CNN 中使用 FCOSHead 作为 RPNHead与原始配置的对比结果Neck (FPN)RPN HeadROI Head学习率 使用单阶段检测器作为RPN的优势1. 速度提升2. 准确性3. 简化架构4. 灵活性 二、评估候选区域三、用预先训练的 FCOS 训练定制的 Faster R-CNN 本…

Excel单元格输入逐字动态提示可选输入效果制作

Excel单元格输入逐字动态提示可选输入效果制作。INDEX函数整理动态列表&#xff0c;再配合IF函数干净界面&#xff0c;“数据验证”完成点选。 (笔记模板由python脚本于2024年06月27日 22:26:14创建&#xff0c;本篇笔记适合喜欢用Excel处理数据的coder翻阅) 【学习的细节是欢悦…

【数据集划分——针对于原先图片已经整理好类别】训练集|验证集|测试集

目标&#xff1a;用split-folders进行数据集划分 学习资源&#xff1a;https://www.youtube.com/watch?vC6wbr1jJvVs 努力的小巴掌 记录计算机视觉学习道路上的所思所得。 现在已经有了数据集&#xff0c;并且&#xff0c;注意&#xff0c;是已经划分好类别的&#xff01; …

基于ARM的通用的Qt移植思路

文章目录 实验环境介绍一、确认Qt版本二、确认交叉编译工具链三、配置Qt3.1、修改qmake.conf3.2、创建autoConfig.sh配置文件 四、编译安装Qt五、移植Qt安装目录六、配置Qt creator6.1、配置qmake6.2、配置GCC编译器6.3、配置G编译器6.4、配置编译器套件6.5、创建应用 七、总结…

MySQL 主从复制集群高可用

在实际的生产环境中&#xff0c;如果对数据库的读和写都在同一个数据库服务器中操作&#xff0c;无论是在安全性、高可用性还是高并发等各个方面都是完全不能满足实际需求的。因此&#xff0c;一般来说 都是通过主从复制&#xff08;Master-Slave&#xff09;来同步数据&#x…

微信小程序毕业设计-线上教育商城系统项目开发实战(附源码+论文)

大家好&#xff01;我是程序猿老A&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。 &#x1f49e;当前专栏&#xff1a;微信小程序毕业设计 精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f380; Python毕业设计…