RocketMQ系统性学习-RocketMQ高级特性之消息存储在Broker的文件布局

news2025/1/6 4:54:02

🌈🌈🌈🌈🌈🌈🌈🌈
【11来了】文章导读地址:点击查看文章导读!
🍁🍁🍁🍁🍁🍁🍁🍁

RocketMQ 高级特性

消息在 Broker 的文件布局

RocketMQ 的混合存储

在 RocketMQ 存储架构中,采用混合存储,其中有 3 个重要的存储文件:Commitlog、ConsumeQueue、IndexFile

  • Topic 的消息实体存储在 Commitlog 中,顺序进行写入
  • ConsumeQueue 可以看作是基于 Topic 的 Commitlog 的索引文件,在 ConsumeQueue 中记录了消息在 Commitlog 中的偏移量、消息大小的信息,用于进行消费
  • IndexFile 提供了可以通过 key 来查询消息的功能,key 是由 topic + msgId 组成的,可以很方便地根据 key 查询具体的消息

消费者去 Broker 中消费数据流程如下:

  1. 先读取 ConsumeQueue,拿到具体消息在 Commitlog 中的偏移量
  2. 通过偏移量在 Commitlog 读取具体 Topic 的信息

消费者去寻找 Commitlog 中的数据流程图如下:

在这里插入图片描述

那么先来看一下 Commitlog 文件在哪里进行写入

SendMessageProcessor # processRequest 作为入口,

经过层层调用 this.sendMessage() -> this.brokerController.getMessageStore().putMessage(msgInner) -> DefaultMessageStore # asyncPutMessage ,最终到达 asyncPutMessage() 方法中,在这里会进行消息的磁盘写的操作:

  1. 创建消息存储所对应的 ByteBuffer:putMessageThreadLocal.getEncoder().encode(msg)

    在这个方法中,会对 Commitlog 文件进行写入:

    在这里插入图片描述

    这里的 byteBuffer 也就是 Commitlog 文件的结构如下:

    在这里插入图片描述

  2. 将创建的 ByteBuffer 设置到 msg 中去: msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer())

  3. 开始向文件中追加消息: result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext)

appendMessage 方法中主要是写入消息之后,Commitlog 中一些数据会发生变化,因此需要进行修改,还是经过层层调用 appendMessage()-> appendMessagesInner()-> cb.doAppend(),最终到达 doAppend 方法,接下来看这个方法都做了些什么:

  1. 首先取出来在上边创建消息对应的 ByteBuffer:ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff()

  2. 接下来修改这个 ByteBuffer 中的一些数据:

    这个 ByteBuffer 在创建的时候已经将一些默认信息设置好了,这里只需要对写入消息后会变化的信息进行修改!

    • 先修改 QueueOffset (偏移量为 20 字节):preEncodeBuffer.putLong(20, queueOffset)
    • 再修改 PhysiclOffset (偏移量为 28 字节):preEncodeBuffer.putLong(28, fileFromOffset + byteBuffer.position())
    • 再修改 SysFlag、BornTimeStamp、BornHost 等等信息,都是通过偏移量在 ByteBuffer 中进行定位,再修改

那么通过上边就 完成了对 Commitlog 文件的追加操作 ,ReputMessageService 线程中的 run 方法,会每隔 1ms 就会去 Commitlog 中取出数据,写入到 ConsumeQueue 和 IndexFile 中

那么接下来寻找写 ConsumerQueue 的地方,也是通过调用链直接找到核心方法:

  • DefaultMessageStore # ReputMessageService # run
  • -> this.doReput()
  • -> DefaultMessageStore.this.doDispatch(dispatchRequest)
  • -> dispatcher.dispatch(req)
  • -> 这里进入到构建 ConsumeQueue 类的 dispatch 方法中:CommitLogDispatcherBuildConsumeQueue # dispatch()
  • -> DefaultMessageStore.this.putMessagePositionInfo(request)
  • -> this.consumeQueueStore.putMessagePositionInfoWrapper(dispatchRequest)
  • -> this.putMessagePositionInfoWrapper(cq, dispatchRequest)
  • -> consumeQueue.putMessagePositionInfoWrapper(request)
  • -> this.putMessagePositionInfo()

这个调用链比较长,如果不想一步一步点的话,直接找到 ConsumeQueue # this.putMessagePositionInfo() 这个方法即可,在这个方法中向 byteBufferIndex 中放了 3 个数据,就是 ConsumeQueue 的组成 = Offset + Size + TagsCode

在这里插入图片描述

那么 ConsumeQueue 的组成结构就如下所示,通过 ConsumeQueue 主要用于寻找 Topic 下的消息在 Commitlog 中的位置:

在这里插入图片描述

IndexFile 主要是通过 Key(Topic+msgId) 来寻找消息在 Commitlog 中的位置

接下来看一下 IndexFile 结构是怎样的,在上边寻找 ConsumeQueue 的调用链中,有一个 dispatcher.dispatch() 方法,这次我们进入到构建 IndexFile 的实现类的 dispatch 方法中,即:CommitLogDispatcherBuildIndex # dispatch(),那么接下来还是经过调用链到达核心方法:

  • CommitLogDispatcherBuildIndex # dispatch()
  • -> DefaultMessageStore.this.indexService.buildIndex(request)
  • -> indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()))
  • -> indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp())

那么核心方法就在 IndexFile # putKey() 中:

  1. 首先根据 key 计算出哈希值,key 也就是 Topic + 消息的 msgId

  2. 再通过哈希值对哈希槽的数量取模,计算出在哈希槽中的相对位置:slotPos = keyHash % this.hashSlotNum

  3. 计算 key 在 IndexFile 中的绝对位置,通过 哈希槽的位置 * 每个哈希槽的大小(4B) + IndexFile 头部的大小(40B)

    代码即:

    absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize

  4. 计算索引在 IndexFile 中的绝对位置,通过 absIndexPos = IndexFile 头部大小(40B) + 哈希槽位置 * 哈希槽大小(4B) + 消息的数量 * 消息索引的大小(20B)

    int absIndexPos =
        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
            + this.indexHeader.getIndexCount() * indexSize;
    
  5. 向 IndexFile 的第三部分(索引列表)中放入数据的索引,索引包含 4 部分,共 20B:keyHash、phyOffset、timeDiff、slotValue

    在这里插入图片描述

  6. 向 IndexFile 的第二部分(哈希槽)中放入数据

    在这里插入图片描述

IndexFile 的结构如下图所示:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1326827.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023.12.19 关于 Redis 通用全局命令

目录 引言 Redis 全局命令 SET & GET KEYS EXISTS DEL EXPIRE TTL TYPE redis 引入定时器高效处理过期 key 基于优先级队列方式 基于时间轮方式 引言 Redis 是根据键值对的方式存储数据的必须要进入 redis-cli 客户端程序 才能输入 redis 命令 Redis 全局命令 R…

手写单链表(指针)(next域)附图

目录 创建文件: 具体实现: 首先是头插。 注意:一定要注意:再定义tmp时,要给它赋一个初始值(推荐使用 new list_next) 接着是尾插: 随后是中间插: 然后是最简单的改值&#xf…

【Linux】权限篇(一)

权限篇目录 1. 前言2. shell3. 权限介绍3.1 什么是权限3.2 权限的本质3.3 Linux中的用户3.4 Linux中文件的权限 1. 前言 在之前的博客中已经学习了一些相关的操作,这次来分享的是与Linux的权限有关的一些笔记。 在正片开始之前,先来讲讲外壳(shell)。 …

实体店如何进行线上线下统一管理

随着互联网的普及和消费者行为的改变,实体店不再满足于单一的线下销售模式,开始探索线上线下的融合。本文将介绍如何通过搭建小程序和乔拓云平台,实现实体店的线上线下统一管理。 实体店可以通过微信小程序搭建自己的线上平台,实现…

【贪心算法】之 买股票的最佳时机(中等题)

1.买卖股票的最佳时机 把利润分解为每天为单位的维度,而不是从0天到第3天整体去考虑! 那么只收集正利润就是贪心所贪的地方! 局部最优:收集每天的正利润,全局最优:求得最大利润。 局部最优可以推出全局最…

一篇文章带你进阶CTF命令执行

以下的命令是为了方便以后做题时方便各位读者直接来这里复制使用,刚开始还请先看完这篇文章后才会懂得下面的命令 ?ceval($_GET[shy]);&shypassthru(cat flag.php); #逃逸过滤 ?cinclude%09$_GET[shy]?>&shyphp://filter/readconvert.base64-…

基于MLP完成CIFAR-10数据集和UCI wine数据集的分类

基于MLP完成CIFAR-10数据集和UCI wine数据集的分类,使用到了sklearn和tensorflow,并对图片分类进行了数据可视化展示 数据集介绍 UCI wine数据集: http://archive.ics.uci.edu/dataset/109/wine 这些数据是对意大利同一地区种植的葡萄酒进…

Navicat里放大、缩小字体的快捷方法

我是偶然误触键盘把字体缩小了,研究以后发现的这个快捷键,分享给大家。 方法:按住【CtrlShift】组合键,再拖动鼠标滚轮,就可以缩放字体了。 缩小效果: 放大效果:

在 TensorFlow 中启用 Eager Execution

TensorFlow 是一个端到端的开源机器学习平台,可以更轻松地构建和部署机器学习模型。TensorFlow 应用程序使用一种称为数据流图的结构。默认情况下,在 TensorFlow 1.0 版中,每个图形都必须在 TensorFlow 会话中运行,这只允许一次运…

C# Onnx Yolov8 Detect 物体检测 多张图片同时推理

目录 效果 模型信息 项目 代码 下载 C# Onnx Yolov8 Detect 物体检测 多张图片同时推理 效果 模型信息 Model Properties ------------------------- date:2023-12-18T11:47:29.332397 description:Ultralytics YOLOv8n-detect model trained on …

UE4 UE5 一直面向屏幕

一直面相屏幕,方法很简单 新建一个蓝图,如下添加组件: 蓝图如下: Rotation Actor :需要跟随镜头旋转的物体 Update:一个timeline(替代event tick 只是为了循环) Timeline&#xff…

变量覆盖漏洞 [BJDCTF2020]Mark loves cat 1

打开题目 我们拿dirsearch扫描一下看看 扫描得到 看见有git字眼&#xff0c;那我们就访问 用githack去扒一下源代码看看 可以看到确实有flag.php结合index.php存在 但是当我去翻源代码的时候却没有翻到 去网上找到了这道题目的源代码 <?phpinclude flag.php;$yds &qu…

Linux Centos 配置 Docker 国内镜像加速

在使用 Docker 进行容器化部署时&#xff0c;由于国外的 Docker 镜像源速度较慢&#xff0c;我们可以配置 Docker 使用国内的镜像加速器&#xff0c;以提高下载和部署的效率。本文将介绍如何在 CentOS 系统上配置 Docker 使用国内镜像加速。 步骤一&#xff1a;安装 Docker 首…

大数据机器学习 - 似然函数:概念、应用与代码实例

文章目录 大数据机器学习 - 似然函数&#xff1a;概念、应用与代码实例一、概要二、什么是似然函数数学定义似然与概率的区别重要性举例 三、似然函数与概率密度函数似然函数&#xff08;Likelihood Function&#xff09;定义例子 概率密度函数&#xff08;Probability Density…

伪协议和反序列化 [ZJCTF 2019]NiZhuanSiWei

打开题目 代码审计 第一层绕过 if(isset($text)&&(file_get_contents($text,r)"welcome to the zjctf")){ echo "<br><h1>".file_get_contents($text,r)."</h1></br>"; 要求我们get传参的text内容必须为w…

智能优化算法应用:基于未来搜索算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于未来搜索算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于未来搜索算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.未来搜索算法4.实验参数设定5.算法结果6.…

Chatgpt如何多人使用?如何防止封号?

时下火爆年轻人的AI技术当属于Chatgpt&#xff0c;但他是一把双刃剑&#xff0c;使用它给我们带来便利的同时&#xff0c;也可能会带来隐患&#xff0c;因此我们需要科学使用AI技术。 本文将针对备受关注的Chatgpt如何多人共享使用&#xff1f;如何防止封号&#xff0c;为你带…

playbook 模块

list together nested with_items Templates 模块 jinja模块架构&#xff0c;通过模板可以实现向模板文件传参&#xff08;python转义&#xff09;把占位符参数传到配置文件中去。 生产一个目标文本文件&#xff0c;传递变量到文本文件当中去。 实验&#xff1a; systemctl…

uniapp整合echarts(目前性能最优、渲染最快方案)

本文echarts示例如上图,可扫码体验渲染速度及loading效果,下文附带本小程序uniapp相关代码 实现代码 <template><view class="source

thinkphp的生命周期

1.入口文件 index.php 用户通过入口文件&#xff0c;发起服务请求&#xff0c;是整个应用的入口与七点 定义常量&#xff0c;加载引导文件&#xff0c;不要放任何业务处理代码 2.引导文件 start.php; 加载常量->加载环境变量->注册自动加载->注册错误与异常->加…