RocketMQ 多级存储设计与实现

news2024/9/25 21:31:12

作者:张森泽

随着 RocketMQ 5.1.0 的正式发布,多级存储作为 RocketMQ 一个新的独立模块到达了 Technical Preview 里程碑:允许用户将消息从本地磁盘卸载到其他更便宜的存储介质,可以用较低的成本延长消息保留时间。本文详细介绍 RocketMQ 多级存储设计与实现。

设计总览

RocketMQ 多级存储旨在不影响热数据读写的前提下将数据卸载到其他存储介质中,适用于两种场景:

  1. 冷热数据分离:RocketMQ 新近产生的消息会缓存在 page cache 中,我们称之为热数据;当缓存超过了内存的容量就会有热数据被换出成为冷数据。如果有少许消费者尝试消费冷数据就会从硬盘中重新加载冷数据到 page cache,这会导致读写 IO 竞争并挤压 page cache 的空间。而将冷数据的读取链路切换为多级存储就可以避免这个问题;
  2. 延长消息保留时间:将消息卸载到更大更便宜的存储介质中,可以用较低的成本实现更长的消息保存时间。同时多级存储支持为 topic 指定不同的消息保留时间,可以根据业务需要灵活配置消息 TTL。

RocketMQ 多级存储对比 Kafka 和 Pulsar 的实现最大的不同是我们使用准实时的方式上传消息,而不是等一个 CommitLog 写满后再上传,主要基于以下几点考虑:

  1. 均摊成本:RocketMQ 多级存储需要将全局 CommitLog 转换为 topic 维度并重新构建消息索引,一次性处理整个 CommitLog 文件会带来性能毛刺;
  2. 对小规格实例更友好:小规格实例往往配置较小的内存,这意味着热数据会更快换出成为冷数据,等待 CommitLog 写满再上传本身就有冷读风险。采取准实时上传的方式既能规避消息上传时的冷读风险,又能尽快使得冷数据可以从多级存储读取。

Quick Start

多级存储在设计上希望降低用户心智负担:用户无需变更客户端就能实现无感切换冷热数据读写链路,通过简单的修改服务端配置即可具备多级存储的能力,只需以下两步:

  1. 修改 Broker 配置,指定使用 org.apache.rocketmq.tieredstore.TieredMessageStore 作为 messageStorePlugIn
  2. 配置你想使用的储存介质,以卸载消息到其他硬盘为例:配置 tieredBackendServiceProvider 为 org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment,同时指定新储存的文件路径:tieredStoreFilepath

可选项:支持修改 tieredMetadataServiceProvider 切换元数据存储的实现,默认是基于 json 的文件存储

更多使用说明和配置项可以在 GitHub 上查看多级存储的 README [ 1]

技术架构

在这里插入图片描述

architecture

接入层:TieredMessageStore/TieredDispatcher/TieredMessageFetcher

接入层实现 MessageStore 中的部分读写接口,并为他们增加了异步语意。TieredDispatcher 和 TieredMessageFetcher 分别实现了多级存储的上传/下载逻辑,相比于底层接口这里做了较多的性能优化:包括使用独立的线程池,避免慢 IO 阻塞访问热数据;使用预读缓存优化性能等。

容器层:TieredCommitLog/TieredConsumeQueue/TieredIndexFile/TieredFileQueue

容器层实现了和 DefaultMessageStore 类似的逻辑文件抽象,同样将文件划分为 CommitLog、ConsumeQueue、IndexFile,并且每种逻辑文件类型都通过 FileQueue 持有底层物理文件的引用。有所不同的是多级存储的 CommitLog 改为 queue 维度。

驱动层:TieredFileSegment

驱动层负责维护逻辑文件到物理文件的映射,通过实现 TieredStoreProvider 对接底层文件系统读写接口(Posix、S3、OSS、MinIO 等)。目前提供了 PosixFileSegment 的实现,可以将数据转移到其他硬盘或通过 fuse 挂载的对象存储上。

消息上传

RocketMQ 多级存储的消息上传是由 dispatch 机制触发的:初始化多级存储时会将 TieredDispatcher 注册为 CommitLog 的 dispacher。这样每当有消息发送到 Broker 会调用 TieredDispatcher 进行消息分发,TieredDispatcher 将该消息写入到 upload buffer 后立即返回成功。整个 dispatch 流程中不会有任何阻塞逻辑,确保不会影响本地 ConsumeQueue 的构建。

在这里插入图片描述

TieredDispatcher

TieredDispatcher 写入 upload buffer 的内容仅为消息的引用,不会将消息的 body 读入内存。因为多级储存以 queue 维度构建 CommitLog,此时需要重新生成 commitLog offset 字段。

在这里插入图片描述

upload buffer

触发 upload buffer 上传时读取到每条消息的 commitLog offset 字段时采用拼接的方式将新的 offset 嵌入到原消息中。

上传进度控制

每个队列都会有两个关键位点控制上传进度:

  1. dispatch offset:已经写入缓存但是未上传的消息位点
  2. commit offset:已上传的消息位点

在这里插入图片描述

upload progress

类比消费者,dispatch offset 相当于拉取消息的位点,commit offset 相当于确认消费的位点。commit offset 到 dispatch offset 之间的部分相当于已拉取未消费的消息。

消息读取

TieredMessageStore 实现了 MessageStore 中的消息读取相关接口,通过请求中的逻辑位点(queue offset)判断是否从多级存储中读取消息,根据配置(tieredStorageLevel)有四种策略:

  • DISABLE:禁止从多级存储中读取消息;
  • NOT_IN_DISK:不在 DefaultMessageStore 中的消息从多级存储中读取;
  • NOT_IN_MEM:不在 page cache 中的消息即冷数据从多级存储读取;
  • FORCE:强制所有消息从多级存储中读取,目前仅供测试使用。
/**
  * Asynchronous get message
  * @see #getMessage(String, String, int, long, int, MessageFilter) 
  getMessage
  *
  * @param group Consumer group that launches this query.
  * @param topic Topic to query.
  * @param queueId Queue ID to query.
  * @param offset Logical offset to start from.
  * @param maxMsgNums Maximum count of messages to query.
  * @param messageFilter Message filter used to screen desired 
  messages.
  * @return Matched messages.
  */
CompletableFuture<GetMessageResult> getMessageAsync(final String group, final String topic, final int queueId,
    final long offset, final int maxMsgNums, final MessageFilter 
messageFilter);

需要从多级存储中读取的消息会交由 TieredMessageFetcher 处理:首先校验参数是否合法,然后按照逻辑位点(queue offset)发起拉取请求。TieredConsumeQueue/TieredCommitLog 将逻辑位点换算为对应文件的物理位点从 TieredFileSegment 读取消息。

// TieredMessageFetcher#getMessageAsync similar with 
TieredMessageStore#getMessageAsync
public CompletableFuture<GetMessageResult> getMessageAsync(String 
group, String topic, int queueId,
        long queueOffset, int maxMsgNums, final MessageFilter 
messageFilter)

TieredFileSegment 维护每个储存在文件系统中的物理文件位点,并通过为不同存储介质实现的接口从中读取所需的数据。

/**
  * Get data from backend file system
  *
  * @param position the index from where the file will be read
  * @param length the data size will be read
  * @return data to be read
  */
CompletableFuture<ByteBuffer> read0(long position, int length);

预读缓存

TieredMessageFetcher 读取消息时会预读一部分消息供下次使用,这些消息暂存在预读缓存中。

protected final Cache<MessageCacheKey /* topic, queue id and queue 
offset */,
SelectMappedBufferResultWrapper /* message data */> readAheadCache;

预读缓存的设计参考了 TCP Tahoe 拥塞控制算法,每次预读的消息量类似拥塞窗口采用加法增、乘法减的机制控制:

  • 加法增:从最小窗口开始,每次增加等同于客户端 batchSize 的消息量。
  • 乘法减:当缓存的消息超过了缓存过期时间仍未被全部拉取,在清理缓存的同时会将下次预读消息量减半。

预读缓存支持在读取消息量较大时分片并发请求,以取得更大带宽和更小的延迟。

某个 topic 消息的预读缓存由消费这个 topic 的所有 group 共享,缓存失效策略为:

  1. 所有订阅这个 topic 的 group 都访问了缓存
  2. 到达缓存过期时间

故障恢复

上文中我们介绍上传进度由 commit offset 和 dispatch offset 控制。多级存储会为每个 topic、queue、fileSegment 创建元数据并持久化这两种位点。当 Broker 重启后会从元数据中恢复,继续从 commit offset 开始上传消息,之前缓存的消息会重新上传并不会丢失。

在这里插入图片描述

开发计划

面向云原生的存储系统要最大化利用云上存储的价值,而对象存储正是云计算红利的体现。RocketMQ 多级存储希望一方面利用对象存储低成本的优势延长消息存储时间、拓展数据的价值;另一方面利用其共享存储的特性在多副本架构中兼得成本和数据可靠性,以及未来向 Serverless 架构演进。

tag 过滤

多级存储拉取消息时没有计算消息的 tag 是否匹配,tag 过滤交给客户端处理。这样会带来额外的网络开销,计划后续在服务端增加 tag 过滤能力。

广播消费以及多个消费进度不同的消费者

预读缓存失效需要所有订阅这个 topic 的 group 都访问了缓存,这在多个 group 消费进度不一致的情况下很难触发,导致无用的消息在缓存中堆积。

需要计算出每个 group 的消费 qps 来估算某个 group 能否在缓存失效前用上缓存的消息。如果缓存的消息预期在失效前都不会被再次访问,那么它应该被立即过期。相应的对于广播消费,消息的过期策略应被优化为所有 Client 都读取这条消息后才失效。

和高可用架构的融合

目前主要面临以下三个问题:

  1. 元数据同步:如何可靠的在多个节点间同步元数据,slave 晋升时如何校准和补全缺失的元数据;
  2. 禁止上传超过 confirm offset 的消息:为了避免消息回退,上传的最大 offset 不能超过 confirm offset;
  3. slave 晋升时快速启动多级存储:只有 master 节点具有写权限,在 slave 节点晋升后需要快速拉起多级存储断点续传。

相关链接:

[1] README

https://github.com/apache/rocketmq/blob/develop/tieredstore/README.md

点击此处查看消息队列 RocketMQ 产品详情

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/439272.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

记录贴:EasyPoi word导出问题一览

项目场景&#xff1a; EasyPoi word导出 问题描述1 easypoi 模板导出 我直接在map的value输入空格或"",出来的是{{,两个左花括号,咋解决 解决方案&#xff1a; exportMap.put("key", "\u00A0"); //空格前端效果&#xff1a; 其他无效解决方案…

Redis安装配置操作记录

Redis 官网&#xff1a;https://redis.io/ 中文文档&#xff1a;https://www.redis.com.cn/documentation.html 在线命令参考&#xff1a;http://doc.redisfans.com 一&#xff0c;Redis下载安装与配置 下载网站&#xff0c;可下载安装包然后安装或可使用brew来安装Redis&#…

LeetCode——前K个高频单词

692. 前K个高频单词 给定一个单词列表 words 和一个整数 k &#xff0c;返回前 k 个出现次数最多的单词。 返回的答案应该按单词出现频率由高到低排序。如果不同的单词有相同出现频率&#xff0c; 按字典顺序 排序。 示例 1&#xff1a; 输入: words [“i”, “love”, “le…

太阳能电池测试解决方案NS-9001

前言 太阳能行业的快速发展提高了对太阳能电池测试和精确测量解决方案要求&#xff0c;伴随着太阳能电池尺寸大小质量的提升&#xff0c;充电电池测试必须更多的电流和更高输出功率水准&#xff0c;这就更加需要灵活多变的测试方案支持。 现阶段&#xff0c;太阳能电池测试 解…

Java学习星球,Java学习路线

目录 一、Java学习路线二、学习计划三、为何会有Java学习星球&#xff1f;四、加入星球后&#xff0c;你可以得到什么&#xff1f;五、如何加入Java学习星球&#xff1f;六、打卡挑战 大家好&#xff0c;我是哪吒&#xff0c;一个靠着热情攀登至C站巅峰的中年男子&#xff0c;C…

【历史上的今天】3 月 20 日:cURL 二十五周年;Docker 发布;思科收购 Linksys

整理 | 王启隆 透过「历史上的今天」&#xff0c;从过去看未来&#xff0c;从现在亦可以改变未来。 今天是 2023 年 3 月 20 日&#xff0c;在 1999 年的今天&#xff0c;人类首次成功乘热气球环球飞行。在 24 年的今天&#xff0c;瑞士人皮尔卡、英国人琼斯经过近 20 天的飞行…

JavaWeb—HTTP协议

目录 1.HTTP协议 1.1 HTTP-概述 1.1.1 介绍 2.1.2 特点 1.2 HTTP-请求协议 HTTP-请求数据格式 GET请求和POST请求的区别&#xff1a; 1.3 HTTP-响应协议 1.3.1 格式介绍 1.3.2 响应状态码 常见响应状态码 1.4 HTTP-协议解析 1.HTTP协议 1.1 HTTP-概述 1.1.1 介绍 HTT…

ModuleNotFoundError: No module named ‘cuda‘、‘tensorrt‘

1、 ModuleNotFoundError: No module named ‘cuda’ python -m pip install --upgrade pip pip install cuda-python2、 ModuleNotFoundError: No module named ‘tensorrt’ 2.1 依赖库 先安装两个TensorRT的依赖库 python -m pip install --upgrade pip pip install nvi…

openstack compute schedulers

https://docs.openstack.org/nova/latest/admin/scheduling.html 在默认的配置中&#xff0c;调度器将考虑如下的几个方面&#xff1a; 请求的是Availability Zonenova-compute服务在目标节点上是启用的满足实例类型的extra specs&#xff08;ComputeCapabilityesFilter&#…

Spring Boot Web

一. 概述 下面我们将进入 SpringBoot 基础阶段的学习。 在没有正式的学习 SpringBoot 之前&#xff0c;我们要先来了解下什么是 Spring 。 我们可以打开 Spring 的官网 ( https://spring.io ) &#xff0c;去看一下 Spring 的简介&#xff1a; Spring makes Java simple 。…

UUID无处不在,你就是唯一(2023.4.16)

六种语言生成UUID 2023.4.16 引言1、UUID简介2、UUID格式和编码3、UUID各历史版本4、UUID代码具体调用实现4.1 C# 生成UUID4.2 Java 生成UUID4.3 Python 生成UUID4.4 C 生成UUID4.5 C 生成UUID4.6 JavaScript 生成UUID&#xff08;较为实用&#xff09;4.6.1 控制台运行&#x…

测绘与设计之间的鸿沟:坐标系,教你如何将CAD与测绘数据准确叠加

一、背景 2008年&#xff0c;我国推出了2000国家大地坐标系&#xff08;以下简称国家2000坐标系&#xff09;&#xff0c;截至2022年&#xff0c;国家2000坐标系在自然资源领域已经取得了较高的普及率&#xff0c;但在工程建设领域的普及率依旧比较低&#xff0c;很多工程项目…

23种设计模式(9)——适配器模式

目录 一、基本介绍 二、demo 2.1、类适配器模式 类适配器模式注意事项和细节 2.2、对象适配器模式 对象适配器模式注意事项和细节 2.3、接口适配器模式 接口适配器模式介绍 三、适配器模式在框架中的应用 3.1在 SpringMVC 框架应用 3.2、spring AOP中的适配器模式 一、…

如何制作实时库存报表

草料二维码暂不支持自动计算功能&#xff0c;无法看到实时的库存数量。但可以使用外部数据分析工具&#xff0c;如百度Sugar&#xff0c;连接草料二维码官方数据库&#xff0c;即可自由实现各类计算&#xff0c;包括实时库存。 一、案例效果 输入物料名称&#xff0c;即可快速…

oracle学习之rownum和rowid

rownum先百度一波https://www.cnblogs.com/xfeiyun/p/16355165.html rownum是oracle特有的一个关键字。 对于基表&#xff0c;在insert记录时&#xff0c;oracle就按照insert的顺序&#xff0c;将rownum分配给每一行记录&#xff0c;因此在select一个基表的时候&#xff0c;r…

Java基础(八)异常处理

1. 异常概述 1.1 什么是生活的异常 男主角小明每天开车上班&#xff0c;正常车程1小时。但是&#xff0c;不出意外的话&#xff0c;可能会出现意外。 出现意外&#xff0c;即为异常情况。我们会做相应的处理。如果不处理&#xff0c;到不了公司。处理完了&#xff0c;就可以…

Ubuntu下打开QtCreator,环境变量(PATH、LD_LIBRARY_PATH等)与预期不一致的问题

现象展示 在Ubuntu中&#xff0c;安装好Qt之后&#xff0c;可以在系统桌面的左下角找到启动图标 但是&#xff0c;这种方式启动的QtCreator所读取到的环境变量和我们从命令行读取到的不一致&#xff1a; 可以看到&#xff0c;明显少了这个&#xff1a;/opt/ros/humble/bin 因…

Docker实战笔记3-仓库

转载请标明出处&#xff1a;http://blog.csdn.net/zhaoyanjun6/article/details/130260521 本文出自【赵彦军的博客】 文章目录 官方仓库 Docker Hub注册登录查看镜像搜索镜像推送镜像自动构建 网易镜像 官方仓库 Docker Hub https://hub.docker.com/ 目前 Docker 官方维护了…

C++入门之auto关键字内联函数

文章目录 前言一、auto关键字1.auto简介2.auto的使用细则&#xff08;1&#xff09;auto与指针和引用结合起来使用&#xff08;2&#xff09;在同一行定义多个变量&#xff08;3&#xff09;auto不能推导的场景 3.基于范围的for循环(C11)&#xff08;1&#xff09;遍历&#xf…

聚观早报|五一首日高铁跨省游热度暴涨;大语言模型规模接近极限

今日要闻&#xff1a;马斯克已创立新人工智能公司X.AI&#xff1b;五一假期首日高铁跨省游热度暴涨&#xff1b;大语言模型规模已接近极限&#xff1b;贾跃亭发文谈FF首台量产车下线&#xff1b;斑马智行与智己汽车进一步合作 马斯克已创立新人工智能公司X.AI 据外媒报道&…