AutoMQ 如何实现没有写性能劣化的极致冷读效率

news2025/2/11 17:48:34

前言

追赶读(Catch-up Read,冷读)是消息和流系统常见和重要的场景。

  • 削峰填谷:对于消息来说,消息通常用作业务间的解耦和削峰填谷。削峰填谷要求消息队列能将上游发送的数据堆积住,让下游在容量范围内消费,这时候下游追赶读的数据都是不在内存中的冷数据。

  • 批处理场景:对于流来说,周期性的批处理任务需要从几个小时甚至一天前的数据开始扫描计算。

  • 故障恢复:消费者宕机故障若干小时后恢复重新上线;消费者逻辑问题,修复后,回溯消费历史数据。

追赶读主要关注两点:

  • 追赶读的速度:追赶读速度越快。业务就能更快从故障中恢复,降低故障影响时间。批处理任务就能更快产出分析结果,产出报表和决策。

  • 读写的隔离性:追赶读需要尽量不影响消息发送的速率和延时。

Apache Kafka 一直以来都以极致的吞吐能力受到广大开发者和使用者的喜爱。AutoMQ[1] 在保证与 Apache Kafka 100% 兼容并且提供极致弹性和降本能力的基础上,不仅做到了相比Kafka更加极致的吞吐能力,同时还解决了Kafka冷读时,写吞吐性能劣化的问题。接下来,本文将从追赶读的实现来说明 AutoMQ 如何做到单机 1K 分区并发追尾读达到 1GB/s 的极致吞吐能力,并且在追赶读过程中避免发送流量的性能劣化。

追赶读实现

架构概览

AutoMQ 针对流顺序连续读取的特征参考 Linux 的 PageCache 设计了 BlockCache 层。BlockCache 会对上层屏蔽与对象存储交互的细节,上层只需要发起指定位点的读取请求,BlockCache 会进行读取请求合并、数据预读、数据缓存和缓存驱逐,以达到最佳的追赶读吞吐、缓存利用率和 API 调用成本。

那为什么叫 BlockCache 不叫 PageCache 或者 RecordCache?

要回答这个问题,首先需要介绍 AutoMQ 在对象存储上一个对象的存储格式,一个对象由三大部分组成:

  • Data Block:存储一个 Stream 连续的 Records 数据段,一个对象中可以有多个不同 Stream 的 Data Block。

  • Index Block:存储 Data Block 的索引信息 {streamId, startOffset, endOffset, recordCount, blockPosition, blockSize},每次从对象中读取数据,首先是需要从 Index Block 二分查找定位到对应的 Data Block 索引,然后再去执行真正的数据块读取。

  • Footer:存储格式版本和 Index Block 位置等信息。

<beginning>
[data block 1]
[data block 2]
...
[data block N]
[index block]
[Footer]
<end>

AutoMQ 从对象存储读取和缓存都是以 Data Block 为最小维度,因此追赶读的缓存称作 BlockCache。

BlockCache 架构如下图,主要由 4 部分组成:

  • KRaft Metadata:存储 Stream 的 Offset 段到对象的关系。

  • StreamReader:读取窗口,每个消费者消费每个分区都会有自己独立的读取窗口。窗口内主要维护还未完成读取的 Data Block 的索引信息,并且在适当的时候触发预读加速。

  • DataBlockCache:Data Block 数据缓存,通过堆外内存缓存从对象存储读取的数据块,采用关注度和 LRU(Least Recently Used)机制来进行缓存管理。

  • ObjectStorage:对象存储的 API 抽象层,抹平不同云对象存储的差异,并提供读取合并加速。

在这里插入图片描述

BlockCache 发起一次追赶读,各个组件的交互流程简单描述如下:

  1. 首先根据读取的 {streamId, startOffset} 定位到 StreamReader;

  2. 然后 StreamReader 会向 KRaft Metadata 请求 {startOffset, endOffset} 下负责的对象的元信息;

  3. StreamReader 根据对象元信息读取对象的 IndexBlock,并二分查找出对应的 DataBlock 索引(若内存中已经有索引信息则跳过步骤2 / 3);

  4. StreamReader 向 DataBlockCache 请求 DataBlock;

  5. DataBlockCache 向 ObjectStorage 发送对象的 #rangeRead 请求(若已经缓存则直接返回);

  6. ObjectStorage 读取对应的数据段返回给上层。

基础概念和流程介绍完成,再来剖析一下 “AutoMQ 如何做到单机 1K 分区并发追尾读达到 1GB/s”。

1K 分区并发追尾读

AutoMQ 实现单机 1K 分区并发追尾读的关键是控制每个 Stream 读取的缓存空间占用。避免总缓存诉求超过缓存空间上限,不同 Stream 的缓存互相驱逐导致从对象存储读取的网络带宽和 API 成本浪费。

AutoMQ 可以将每个 Stream 的读取缓存空间占用控制在 2MB 以下,意味着只需要 2GB 的 BlockCache 就能支撑 1K 分区的并发追尾读。

前面提到 BlockCache 的最小缓存粒度是对象的 DataBlock。DataBlock 默认大小为 512KB(软限制),因此 Stream 读取缓存空间占用为 512KB * N(缓存的 DataBlock 个数)。那么减少空间占用的目标,就是去尽可能减少 N 的值,而 N 值的大小主要由缓存驱逐策略决定。

在通用的缓存中通常采用 Least Recently Used 来作为缓存驱逐策略,但实测下来这种策略对顺序读取的流场景并不是特别适配,仍旧会出现较多的误驱逐问题。举个例子,假设有 2 个分区在并发追尾读,2 个分区的读取速率分别是 10MB/s 和 1MB/s,1MB/s 分区的 DataBlock 访问和更新频率比 10MB/s 分区低,那么很有可能由于 LRU,1MB/s 分区缓存的 DataBlock 还未被读完,就被 10MB/s 分区新加载的 DataBlock 所驱除。

为了解决这个问题,AutoMQ 在 LRU 的基础上新增基于关注度(Watch)驱逐策略。读取窗口(StreamReader)内正在读取或者将来准备要读取的 DataBlock,读取窗口会给该 DataBlock 标记关注度 + 1,当读取窗口将这个 DataBlock 读取完成后会释放 DataBlock 的关注度 -1。BlockCache 会优先采用基于关注度的驱逐策略,当 DataBlock 的关注度减为 0 时,即使 BlockCache 还有缓存空间,该 DataBlock 的缓存也会被立即驱逐。

在这里插入图片描述

通过关注度驱逐策略,在不考虑预读的场景,Stream 的每个读取窗口至多占用 512KB * 3 = 1.5MB(Kafka 的默认 max.partition.fetch.bytes 为 1MB,读取的位点如果在 DataBlock 中间,则至多读取 3 个 DataBlock)。同样在 2 分区 10MB/s 和 1MB/s 并发读取的场景,AutoMQ 的追尾读缓存占用会稳定在 4MB,并且 2 个读取窗口会互相隔离,不会出现缓存互相驱逐的情况。

1GB/s 读取吞吐

追赶读的分区并发能力决定了 Kafka 能支撑多少业务同时追赶读。读取吞吐决定了业务决策的效率。AutoMQ 提供单机 1GB/s 追赶读吞吐主要由两点决定:对象存储和预读。

对象存储,虽然对象存储的操作耗时通常是百毫秒级别的,但是只要使用侧只提供充足的并发,即使不添加任何读写的优化,在对象存储后端庞大的资源池下,可以轻松提供 GB/s 的读写吞吐。以 S3 举例,假设 4MB 读取需要花费 100ms,那么只需要 25 个并发就可以达到 1 GB/s 的读取速度。

预读,Kafka 的追赶读消费宏观上看读取数据 -> 处理数据 -> 读取数据的循环,如果是直接透传请求到对象存储,那么对象存储的高延迟,会让读取的并发无法被充分利用,最终导致读取吞吐不理想。因此 AutoMQ 通过缓存预读来减少追赶读 Fetch 请求的处理耗时,尽量使得后续的追赶读请求均可被预读窗口所覆盖,以提高读取吞吐。

细心的读者这时候会有疑问了:AutoMQ 的缓存预读策略会不会导致 Stream 读取窗口占用过大,以至于出现 10MB 和 1MB 并发读取的互相驱逐现象么?

AutoMQ 为了避免这种情况的出现采取了以下预读策略:

  • 预读大小初始为 512KB,只有在读取窗口内上层读取出现 Cache Miss 时才会触发预读窗口大小的增加。未出现 Cache Miss,则说明当前预读的速度能满足追赶读的需求。

  • 读取窗口中的预读窗口最大不会超过 32MB。

  • 只有在 BlockCache 还有空余空间的时候才会发起预读,避免了内存紧张的情况下仍旧发起预读导致误驱逐。

读写隔离

AutoMQ 在支持追尾读高并发和高吞吐的同时,通过读写隔离确保了发送流量不受影响。如下图所示,AutoMQ 的读写隔离主要由两部分保障:

  • 读写链路隔离:写入链路,Producer 发送的消息存储到 EBS WAL 后就会响应给客户端成功;追赶读链路,追赶读的数据来自于 S3,因此也不会争抢 EBS WAL 的磁盘带宽和 IOPS。

  • 网络优先级限流:AutoMQ 可以设置整体的网络流入流出限制,并且 Producer 流量优先级高于追赶读 Consumer 的流量优先级,因此不会出现追赶读流量占满网络带宽从而影响发送的情况。

在这里插入图片描述

压测

环境准备

  • 服务端:阿里云 ecs.g8i.4xlarge,16C64G,数据盘 PL1 300GB

  • 压力机:阿里云 ecs.g8i.4xlarge,16C64G

AutoMQ 启动命令:堆内 32G,堆外 24G,BlockCache 14G,带宽限制 2GB/s。

# AutoMQ Version >= 1.2 
KAFKA_S3_ACCESS_KEY=xxxx KAFKA_S3_SECRET_KEY=xxxx KAFKA_HEAP_OPTS="-Xmx32g -Xms32g -XX:MaxDirectMemorySize=24G" ./bin/kafka-server-start.sh -daemon config/kraft/server.properties \
--override node.id=0 \
--override cluster.id=M_automq-catchup_____w \
--override controller.quorum.voters=0@${ip}:9093 \
--override advertised.listener=${ip}:9092 \
--override s3.data.buckets='0@s3://xxx_bucket?region=oss-cn-hangzhou&endpoint=https://oss-cn-hangzhou-internal.aliyuncs.com' \
--override s3.wal.path='0@file:///dev/nvme1n1?capacity=21474836480&iodepth=32&iops=4000' \
--override s3.telemetry.metrics.exporter.uri='otlp://?endpoint=http://xxxx&protocol=grpc' \
--override s3.stream.allocator.policy=POOLED_DIRECT \
--override s3.wal.cache.size=6442450944 \
--override s3.wal.upload.threshold=1572864000 \
--override s3.block.cache.size=12884901888 \
--override s3.network.baseline.bandwidth=2147483648 \
--override s3.stream.object.split.size=1048576

压测脚本:创建 50 个 Topic,每个 Topic 20 个分区,总共 1000 个分区,以 200MB/s 持续写入2 小时,然后从头开始消费,并且消费过程中仍旧保持 200MB/s 的写入流量。

KAFKA_HEAP_OPTS="-Xmx32g -Xms32g" nohup ./bin/automq-perf-test.sh --bootstrap-server ${bootstrapServer}:9092 \
--producer-configs batch.size=0 \
--consumer-configs fetch.max.wait.ms=1000 \
--topics 50 \
--partitions-per-topic 20 \
--producers-per-topic 2 \
--groups-per-topic 1 \
--consumers-per-group 4 \
--record-size 65536 \
--send-rate 3200 \
--backlog-duration 7200  \
--group-start-delay 0 \
--warmup-duration 1 \
--reset &

压测结果

  • 1000 个分区 2 个小时总共生产 1.37 TB 的数据;

  • 追赶读消费峰值 1.6GB/s,每个 Topic 均保持 32MB/s 的消费速度,总共耗时 18 分钟消费完 1.37 TB 的积压数据;

  • 追赶读期间发送流量仍旧稳定在 200MB/s,发送耗时 P99 从 5ms 上涨到 10ms,平均耗时仍旧维持在 2ms 以下;

在这里插入图片描述
在这里插入图片描述

参考资料

[1] AutoMQ: https://www.automq.com

[2] AutoMQ vs. Apache Kafka Benchmark: https://docs.automq.com/automq/benchmarks/benchmark-automq-vs-apache-kafka#catch-up-read

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2296442.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Rabbitmq篇】高级特性----TTL,死信队列,延迟队列

目录 一.TTL ???1.设置消息的TTL 2.设置队列的TTL 3.俩者区别? 二.死信队列 定义&#xff1a; 消息成为死信的原因&#xff1a; 1.消息被拒绝&#xff08;basic.reject 或 basic.nack&#xff09; 2.消息过期&#xff08;TTL&#xff09; 3.队列达到最大长度? …

【Java】多线程和高并发编程(三):锁(中)深入ReentrantLock

文章目录 3、深入ReentrantLock3.1 ReentrantLock和synchronized的区别3.2 AQS概述3.3 加锁流程源码剖析3.3.1 加锁流程概述3.3.2 三种加锁源码分析3.3.2.1 lock方法3.3.2.2 tryLock方法3.3.2.3 lockInterruptibly方法 3.4 释放锁流程源码剖析3.4.1 释放锁流程概述3.4.2 释放锁…

电路笔记(元器件):AD 5263数字电位计(暂记)

AD5263 是四通道、15 V、256位数字电位计&#xff0c;可通过SPI/I2C配置具体电平值。 配置模式&#xff1a; W引脚作为电位器的抽头&#xff0c;可在A-B之间调整任意位置的电阻值。也可将W与A(或B)引脚短接&#xff0c;A-W间的电阻总是0欧姆&#xff0c;通过数字接口调整电位器…

webpack【初体验】使用 webpack 打包一个程序

打包前 共 3 个文件 dist\index.html <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Webpack 示例&…

VMware安装CentOS 7(全网超详细图文保姆版教程)

文章目录 一、下载及安装 VMware1.1 VMware下载1.2 CentOS下载 二、搭建虚拟机环境2.1 创建新虚拟机2.2 选择自定义2.3 选择虚拟机硬件兼容性2.4 选择稍后安装操作系统2.5 选择Linux系统 版本选择 centos 7 64位2.6 设备你虚拟机的名字和保存位置&#xff08;保存位置建议在编辑…

mysql BUG 导致 show processlist 有大量的show slave stauts 处于init状态

一、详细报错信息&#xff1a; 1、执行show slave status\G 卡住 && stop slave也卡住 2、show processlist 发现 Waiting for commit lock NULL 锁 3、错误日志报错主备同步用户认证失败 二、报错原因&#xff08;分析过程&#xff09;&#xff1a; 1、排查备库日志…

机器学习在癌症分子亚型分类中的应用

学习笔记&#xff1a;机器学习在癌症分子亚型分类中的应用——Cancer Cell 研究解析 1. 文章基本信息 标题&#xff1a;Classification of non-TCGA cancer samples to TCGA molecular subtypes using machine learning发表期刊&#xff1a;Cancer Cell发表时间&#xff1a;20…

从MySQL优化到脑力健康:技术人与效率的双重提升

文章目录 零&#xff1a;前言一&#xff1a;MySQL性能优化的核心知识点1. 索引优化的最佳实践实战案例&#xff1a; 2. 高并发事务的处理机制实战案例&#xff1a; 3. 查询性能调优实战案例&#xff1a; 4. 缓存与连接池的优化实战案例&#xff1a; 二&#xff1a;技术工作者的…

Qt:项目文件解析

目录 QWidget基础项目文件解析 .pro文件解析 widget.h文件解析 widget.cpp文件解析 widget.ui文件解析 main.cpp文件解析 认识对象模型 窗口坐标系 QWidget基础项目文件解析 .pro文件解析 工程新建好之后&#xff0c;在工程目录列表中有⼀个后缀为 ".pro" …

react使用if判断

1、第一种 function Dade(req:any){console.log(req)if(req.data.id 1){return <span>66666</span>}return <span style{{color:"red"}}>8888</span>}2、使用 {win.map((req,index) > ( <> <Dade data{req}/>{req.id 1 ?…

conda 修复 libstdc++.so.6: version `GLIBCXX_3.4.30‘ not found 简便方法

ImportError: /data/home/hum/anaconda3/envs/ipc/bin/../lib/libstdc.so.6: version GLIBCXX_3.4.30 not found (required by /home/hum/anaconda3/envs/ipc/lib/python3.11/site-packages/paddle/base/libpaddle.so) 1. 检查版本 strings /data/home/hum/anaconda3/envs/ipc/…

python学opencv|读取图像(六十)先后使用cv2.erode()函数和cv2.dilate()函数实现图像处理

【1】引言 前序学习进程中&#xff0c;先后了解了使用cv2.erode()函数和cv2.dilate()函数实现图像腐蚀和膨胀处理的效果&#xff0c;相关文章链接为&#xff1a; python学opencv|读取图像&#xff08;五十八&#xff09;使用cv2.erode()函数实现图像腐蚀处理-CSDN博客 pytho…

Flink 内存模型各部分大小计算公式

Flink 的运行平台 如果 Flink 是运行在 yarn 或者 standalone 模式的话&#xff0c;其实都是运行在 JVM 的基础上的&#xff0c;所以首先 Flink 组件运行所需要给 JVM 本身要耗费的内存大小。无论是 JobManager 或者 TaskManager &#xff0c;他们 JVM 内存的大小都是一样的&a…

Qt修仙之路2-1 仿QQ登入 法宝初成

widget.cpp #include "widget.h" #include<QDebug> //实现槽函数 void Widget::login1() {QString userusername_input->text();QString passpassword_input->text();//如果不勾选无法登入if(!check->isChecked()){qDebug()<<"xxx"&…

从家庭IP到全球网络资源的无缝连接:Cliproxy的专业解决方案

数字化时代&#xff0c;家庭IP作为个人或家庭接入互联网的门户&#xff0c;其重要性日益凸显。然而&#xff0c;要实现从家庭IP到全球网络资源的无缝连接&#xff0c;并享受高效、安全、稳定的网络访问体验&#xff0c;往往需要借助专业的代理服务。Cliproxy&#xff0c;作为业…

【Java】多线程和高并发编程(四):阻塞队列(上)基础概念、ArrayBlockingQueue

文章目录 四、阻塞队列1、基础概念1.1 生产者消费者概念1.2 JUC阻塞队列的存取方法 2、ArrayBlockingQueue2.1 ArrayBlockingQueue的基本使用2.2 生产者方法实现原理2.2.1 ArrayBlockingQueue的常见属性2.2.2 add方法实现2.2.3 offer方法实现2.2.4 offer(time,unit)方法2.2.5 p…

TCP/IP 协议图解 | TCP 协议详解 | IP 协议详解

注&#xff1a;本文为 “TCP/IP 协议” 相关文章合辑。 未整理去重。 TCP/IP 协议图解 退休的汤姆 于 2021-07-01 16:14:25 发布 TCP/IP 协议简介 TCP/IP 协议包含了一系列的协议&#xff0c;也叫 TCP/IP 协议族&#xff08;TCP/IP Protocol Suite&#xff0c;或 TCP/IP Pr…

阿里云百炼初探DeepSeek模型调用

阿里云百炼初探DeepSeek模型调用 阿里云百炼为什么选择百炼开始使用百炼方式一&#xff1a;文本对话方式二&#xff1a;文本调试方式三&#xff1a;API调用 DeepSeek调用1、搜索模型2、查看API调用3、开始调用安装依赖查看API Key运行以下代码 4、流式输出 总结 阿里云百炼 阿…

蓝桥杯备赛——“双指针”“三指针”解决vector相关问题

一、寄包柜 相关代码&#xff1a; #include <iostream> #include <vector> using namespace std; const int N 1e5 10; int n, q; vector<int> a[N]; // 创建 N 个柜⼦ int main() {cin >> n >> q;while(q--){int op, i, j, k;cin >> …

【Java 面试 八股文】Redis篇

Redis 1. 什么是缓存穿透&#xff1f;怎么解决&#xff1f;2. 你能介绍一下布隆过滤器吗&#xff1f;3. 什么是缓存击穿&#xff1f;怎么解决&#xff1f;4. 什么是缓存雪崩&#xff1f;怎么解决&#xff1f;5. redis做为缓存&#xff0c;mysql的数据如何与redis进行同步呢&…