RocketMQ 存储优化技术 解析——图解、源码级解析

news2024/12/28 21:03:08

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年1月13日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • RocketMQ里的存储优化
    • 内存预分配
    • `mlock`系统调用
    • 文件预热
  • 存储模型
  • 刷盘流程
    • 同步刷盘
    • 异步刷盘

RocketMQ里的存储优化

内存预分配

写入消息时,CommitLog会先从MappedFileQueue(队列)中获取一个MappedFileMappedFile对象的预分配过程如下图所示:

在这里插入图片描述

MappedFile的创建过程是将构建好的AllocateRequest请求添加至队列中,后台运行的AllocateMappedFileService服务线程根据队列里存在的请求,执行MappedFile映射文件的创建和预分配工作。

 static class AllocateRequest implements Comparable<AllocateMappedFileService.AllocateRequest> {
        private String filePath;
        private int fileSize;
        private CountDownLatch countDownLatch = new CountDownLatch(1);
        private volatile MappedFile mappedFile = null;

分配时有两种策略:

  1. 使用mmap的方式创建MappedFile实例
  2. TransientStorePool堆外内存池中获取相应的DirectByteBuffer来构建MappedFile实例

在创建并分配完成MappedFile实例后,系统还会同时将下一个MappedFile实例也预先创建出来并保存到请求队列里,下次请求时可以跳过创建MappedFile实例的时间,直接返回。


mlock系统调用

该系统调用的功能是可以将进程使用的部分或所有的地址空间锁定在物理内存里,防止其被交换到swap空间。

对于一款消息中间件来说,追求的一定是消息读写的低延时,因为内存页面调出调入的时间延迟可能太长或难以预知,所以就希望尽可能多地使用物理内存,以提高数据读写的效率。


文件预热

做预热主要是基于如下考虑:

  1. 仅仅分配内存并执行mlock系统调用之后并不会为程序完全锁定这些内存,其中的分页仍然可能是copy-on-write的,因此RocketMQ在创建MappedFile实例的时候,会先写入一些随机值到mmap映射出的内存空间里。

  1. 使用mmap进行内存映射之后,操作系统只是建立虚拟内存地址到物理地址的映射表,但没有加载任何文件至内存中。程序想要访问数据时,操作系统会检查该部分的分页是否已经在内存里,如果不在的话,则会发出一次缺页中断。RocketMQ在做mmap内存映射的同时进行madvise系统调用,目的是使操作系统做一次内存映射后对应的文件数据尽可能多地预加载进内存,从而实现预热。

x86 Linux中的一个标准页面大小为4KB,1G的commitLog需要发生256次缺页中断,才能将数据完全加载进物理内存中


存储模型

1. CommitLog

消息主体及元数据的存储主体,存储Producer端写入的消息主体内容。单个文件默认大小为1GB,文件名长度为20位

00000000000000000000表示第一个文件,起始偏移量为0,文件大小为1GB = 1073741824;00000000001073741824表示第二个文件,写入是顺序写

public class CommitLog {
    // Message's MAGIC CODE daa320a7
    public final static int MESSAGE_MAGIC_CODE = -626843481;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // End of file empty MAGIC CODE cbd43194
    protected final static int BLANK_MAGIC_CODE = -875286124;
    protected final MappedFileQueue mappedFileQueue;
    protected final DefaultMessageStore defaultMessageStore;
    private final FlushCommitLogService flushCommitLogService;

    //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
    private final FlushCommitLogService commitLogService;

    private final AppendMessageCallback appendMessageCallback;
    private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
    protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
    protected volatile long confirmOffset = -1L;

    private volatile long beginTimeInLock = 0;

    protected final PutMessageLock putMessageLock;
}

2. ConsumeQueue

消息消费的逻辑队列,其中包含了这个MessageQueueCommitLog中的起始物理位置偏移量offset、消息实体内容的大小和Message Tag的哈希值。从实际物理存储来说,ConsumeQueue对应每个TopicQueueId下面的所有文件,每个文件默认大小为6MB,差不多可以存储30万条消息,也是顺序写入。

public class ConsumeQueue {
    private static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqStore");
    public static final int CQ_STORE_UNIT_SIZE = 20;
    private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger("RocketmqStoreError");
    private final DefaultMessageStore defaultMessageStore;
    private final MappedFileQueue mappedFileQueue;
    private final String topic;
    private final int queueId;
    private final ByteBuffer byteBufferIndex;
    private final String storePath;
    private final int mappedFileSize;
    private long maxPhysicOffset = -1L;
    private volatile long minLogicOffset = 0L;
    private ConsumeQueueExt consumeQueueExt = null;
}

3. IndexFile

用于为生成的索引文件提供访问,通过消息Key值查询消息真正的实体内容。在实际的物理存储上,文件名是以创建的时间戳命名的,一个IndexFile可以保存2000万个索引。

public class IndexFile {
    private static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqStore");
    private static int hashSlotSize = 4;
    private static int indexSize = 20;
    private static int invalidIndex = 0;
    private final int hashSlotNum;
    private final int indexNum;
    private final MappedFile mappedFile;
    private final FileChannel fileChannel;
    private final MappedByteBuffer mappedByteBuffer;
    private final IndexHeader indexHeader;
}

4. MappedFileQueue

对连续物理存储的封装类,代码中可以通过消息存储的物理偏移量快速定位offset对应的MappedFile

public class MappedFileQueue {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);

    private static final int DELETE_FILES_BATCH_MAX = 10;

    private final String storePath;

    private final int mappedFileSize;

    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

    private final AllocateMappedFileService allocateMappedFileService;

    private long flushedWhere = 0;
    private long committedWhere = 0;

    private volatile long storeTimestamp = 0;
}

5. MappedFile

文件存储的直接内存映射封装类,通过该类的实例,可以把消息写入PageCache,或者将消息刷盘。

public class MappedFile extends ReferenceResource {
    public static final int OS_PAGE_SIZE = 1024 * 4;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);

    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    protected int fileSize;
    protected FileChannel fileChannel;
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     */
    protected ByteBuffer writeBuffer = null;
    protected TransientStorePool transientStorePool = null;
    private String fileName;
    private long fileFromOffset;
    private File file;
    private MappedByteBuffer mappedByteBuffer;
    private volatile long storeTimestamp = 0;
    private boolean firstCreateInQueue = false;
}

刷盘流程

同步刷盘

在这里插入图片描述
只有在消息成功写入磁盘之后,Broker才会给Producer返回一个ACK响应。RocketMQ中,主线程会首先创建一个刷盘请求实例GroupCommitRequest,并将其放到刷盘队列,之后使用同步刷盘线程GroupCommitService来执行刷盘动作。

GroupCommitService里使用到CountDownLatch来控制线程间同步

RocketMQ里还使用了两个队列分别负责读和写操作,实现读写分离,提高并发量

同步刷盘可以保障较好的一致性,一般适用于金融业务领域。


异步刷盘

在这里插入图片描述

只要消息写入到Pagecache之后就可以直接返回ACK给Producer,之后,后台异步线程负责将消息刷盘,此时主线程并不会阻塞,降低了读写延迟,从而达到提高MQ性能和吞吐量的目的。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/160852.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Django 后端没有接收到前端anxios的 post 内容

前端使用 vue 无论怎样 post 后端都说没有接收到值&#xff0c;&#xff08;后端接口正确&#xff09; 寻找原因&#xff1a; 1、前端查看自己的请求类型 Content-Type:application/json 我们的请求是这样的&#xff1a; axios({method:post,url:/video/upload,data:{"…

RedHat6配置本地yum源(最新超详细过程)

一、环境准备 挂载iso的镜像文件在CD/DVD驱动器上&#xff0c;需要确保&#xff0c;已连接已打开&#xff0c;且CD/DVD上的介质符合当前操作系统的版本。 挂载本地光盘到系统 在“编辑设置”——>“硬件”——>“CD/DVD驱动器”里指定操作系统的ISO镜像文件 光驱挂载…

web性能测试:Lighthouse测试实践

一工具简介 Lighthouse是Google开源的一个自动化工具&#xff0c;它可以搜集多个Web网页性能指标&#xff0c;分析Web应用的性能并生成报告&#xff0c;为开发人员进行性能优化提供了参考方向。1工作原理•Driver&#xff08;驱动&#xff09;—— 通过 Chrome Debugging Proto…

力扣sql基础篇(六)

力扣sql基础篇(六) 1 学生参加各科测试的次数 1.1 题目内容 1.1.1 基本题目信息 1.1.2 示例输入输出 a 输入示例 b 输出示例 1.2 示例sql语句 # 无论考没考试都要该科目这栏且无连接字段,就可以考虑笛卡尔积了 SELECT s.student_id,s.student_name,s.subject_name,IFNULL…

【软件STM32cubeIDE下STM32F4xx使用DMA+定时器推PWM+灯带WS2812-进阶-综合汇总(讲解移植相关)】

2TOC &#xff08;1&#xff09;前言 做灯带ws2812其实有一段时间了&#xff0c;中间遇到很多问题&#xff0c;从开始的学习&#xff0c;到后来慢慢熟悉&#xff0c;再到后来尝试点很多灯带&#xff0c;做过非常多的实验了&#xff0c;自己新建工程&#xff0c;几乎尝试过很多…

【Git】GitHub 操作

6、GitHub 操作 GitHub 网址&#xff1a;https://github.com/ Ps:全球最大同性交友网站&#xff0c;技术宅男的天堂&#xff0c;新世界的大门&#xff0c;你还在等什么? 账号姓名验证邮箱atguiguyueyue岳不群atguiguyueyuealiyun.comatguigulinghuchong令狐冲atguigulinghu…

72、PaletteNeRF: Palette-based Appearance Editing of Neural Radiance Fields

简介 官网:https://palettenerf.github.io/ 以(a)多视图照片为训练输入&#xff0c;重建NeRF并将其外观分解为一组(b)基于3D调色板的色基&#xff0c;实现了©直观和逼真的场景重新着色&#xff0c;在任意视图之间具有3D一致性&#xff0c;如(d)所示&#xff0c;该方法支…

人工智能与Python的渊源

人工智能起源与发展 文章目录人工智能起源与发展前言一、达特茅斯会议与人工智能起源二、人工智能发展的高峰与低谷Python与人工智能构建Python人工智能编程环境1.Python版本2.Anaconda编程环境数据处理常用算法2.1傅里叶变换2.1.1傅里叶分析的由来2.1.2傅里叶变换原理及应用傅…

MODBUS协议下,组态王与S7-1200能否建立无线通讯?

MODBUS协议下&#xff0c;想要组态王与S7-1200之间的无线通讯其实很容易。可采用了西门子PLC专用无线通讯终端DTD434MC&#xff0c;作为实现无线通讯的硬件设备&#xff0c;使用简单方便&#xff0c;不必深入理解 MODBUS 协议细节&#xff0c;无需更改网络参数直接替换有线连接…

波士顿房价预测—随机梯度下降法优化

根据我上一篇关于波士顿房价预测一文可以知道&#xff0c;如果使用梯度下降法&#xff0c;需要将所有的样本对梯度的贡献取平均&#xff0c;根据梯度更新参数。 但是面对海量样本的数据集&#xff0c;如果每次计算都使用全部的样本来计算损失函数和梯度&#xff0c;性能会很差&…

如何创建商用照明 App SDK 应用?

商用照明 App SDK 是专为照明行业的物联网应用提供的移动端开发工具。通过商用照明 SDK&#xff0c;大家可以形成完整的商用照明物联网控制系统&#xff0c;多协议兼容&#xff0c;完美满足绿色建筑的设备管理及能源管理要求。 什么是涂鸦商用照明&#xff1f; 涂鸦商用照明解…

python基础语法(1)

专栏&#xff1a;python 每日一句&#xff1a;人生&#xff0c;无非只有三天&#xff0c;昨天&#xff0c;今天&#xff0c;明天。昨天很长&#xff0c;说不清有多少天&#xff0c;但不管有多少天&#xff0c;不管是受到挫折&#xff0c;还是取得辉煌&#xff0c;都只能代表过去…

Feign

文章目录Http客户端Feign1、Feign替代RestTemplate1.1、RestTemplate方式调用存在的问题1.2、Feign介绍1.3、定义和使用Feign客户端2、Feign的自定义配置2.1、修改日志级别3、Feign的性能优化3.1、Feign的性能优化-连接池配置4、Feign的最佳实践4.1、方式一&#xff08;继承&am…

TypeScript类型 : any,unknown

1.any类 在某些情况下&#xff0c;我们确实无法确定一个变量的类型&#xff0c;并且可能它会发生一些变化&#xff0c;这个时候我们可以使用any类型&#xff08;类似 于Dart语言中的dynamic类型&#xff09;。 any类型有点像一种讨巧的TypeScript手段&#xff1a; 1.我们可以…

【23届秋招总结】本科小学弟成功签约滴滴后端开发offer

大家好&#xff01;我是路飞&#xff0c;最近工作太忙啦&#xff0c;断更很久&#xff0c;今天给大家分享一位本科23届小学弟的秋招历程&#xff5e; 在今年整体上就业困难&#xff0c;各大公司校招HC收缩的情况下&#xff0c;这位小学弟也历经坎坷成功拿到了滴滴后端开发岗位…

【安卓学习笔记】安卓的事件处理

安卓提供了两种方式的事件处理&#xff1a;基于回调的事件处理和基于监听的事件处理。 基于监听的事件处理 基于监听的事件处理一般包含三个要素&#xff0c;分别是&#xff1a; Event Source&#xff08;事件源&#xff09;&#xff1a;事件发生的场所&#xff0c;通常是各个…

c++11 标准模板(STL)(std::forward_list)(六)

定义于头文件 <forward_list> template< class T, class Allocator std::allocator<T> > class forward_list;(1)(C11 起)namespace pmr { template <class T> using forward_list std::forward_list<T, std::pmr::polymorphic_…

动态内存管理详解(malloc、calloc、realloc)

文章目录 一、什么是动态内存 二、为什么要存在动态内存分配 三、动态内存函数的介绍 3、1 malloc和free的介绍 3、2 calloc的介绍 3、3 reallco的介绍 四、常见的动态内存错误 4、1 对NULL指针的解引用操作 4、2 对动态开辟空间的越界访问 4、3 对非动态开辟内存使用free释放 …

rcu锁原理以及rcu example学习

rcu参考资料&#xff1a; https://airekans.github.io/c/2016/05/10/dive-into-liburcu https://lwn.net/Articles/262464/ https://cloud.tencent.com/developer/article/1684477 https://www.cnblogs.com/LoyenWang/p/12681494.html userspace rcu: https://github.com/urcu…

PHP 过滤器

PHP 过滤器用于验证和过滤来自非安全来源的数据&#xff0c;比如用户的输入。什么是 PHP 过滤器&#xff1f;PHP 过滤器用于验证和过滤来自非安全来源的数据。测试、验证和过滤用户输入或自定义数据是任何 Web 应用程序的重要组成部分。PHP 的过滤器扩展的设计目的是使数据过滤…