Kafka 为什么这么快?

news2024/11/15 16:40:49

Kafka 是一款性能非常优秀的消息队列,每秒处理的消息体量可以达到千万级别。今天来聊一聊 Kafka 高性能背后的技术原理。

1 批量发送

Kafka 收发消息都是批量进行处理的。我们看一下 Kafka 生产者发送消息的代码:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
 TopicPartition tp = null;
 try {
  //省略前面代码
  Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
  //把消息追加到之前缓存的这一批消息上
  RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
    serializedValue, headers, interceptCallback, remainingWaitMs);
  //积累到设置的缓存大小,则发送出去
  if (result.batchIsFull || result.newBatchCreated) {
   log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
   this.sender.wakeup();
  }
  return result.future;
  // handling exceptions and record the errors;
  // for API exceptions return them in the future,
  // for other exceptions throw directly
 } catch /**省略 catch 代码*/
}

从代码中可以看到,生产者调用 doSend 方法后,并不会直接把消息发送出去,而是把消息缓存起来,缓存消息量达到配置的批量大小后,才会发送出去。

注意:从上面 accumulator.append 代码可以看到,一批消息属于同一个 topic 下面的同一个 partition。

Broker 收到消息后,并不会把批量消息解析成单条消息后落盘,而是作为批量消息进行落盘,同时也会把批量消息直接同步给其他副本。

消费者拉取消息,也不会按照单条进行拉取,而是按照批量进行拉取,拉取到一批消息后,再解析成单条消息进行消费。

使用批量收发消息,减轻了客户端和 Broker 的交互次数,提升了 Broker 处理能力。

2 消息压缩

如果消息体比较大,Kafka 消息吞吐量要达到千万级别,网卡支持的网络传输带宽会是一个瓶颈。Kafka 的解决方案是消息压缩。发送消息时,如果增加参数 compression.type,就可以开启消息压缩:

public static void main(String[] args) {
 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    //开启消息压缩
 props.put("compression.type", "gzip");
 Producer<String, String> producer = new KafkaProducer<>(props);

 ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "key1", "value1");

 producer.send(record, new Callback() {
  @Override
  public void onCompletion(RecordMetadata metadata, Exception exception) {
   if (exception != null) {
    logger.error("sending message error: ", e);
   } else {
    logger.info("sending message successful, Offset: ", metadata.offset());
   }
  }
 });

 producer.close();
}

如果 compression.type 的值设置为 none,则不开启压缩。那消息是在什么时候进行压缩呢?前面提到过,生产者缓存一批消息后才会发送,在发送这批消息之前就会进行压缩,代码如下:

public RecordAppendResult append(TopicPartition tp,
         long timestamp,
         byte[] key,
         byte[] value,
         Header[] headers,
         Callback callback,
         long maxTimeToBlock) throws InterruptedException {
 // ...
 try {
  // ...
  buffer = free.allocate(size, maxTimeToBlock);
  synchronized (dq) {
   //...
   RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
   if (appendResult != null) {
    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
    return appendResult;
   }
            //这批消息缓存已满,这里进行压缩
   MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
   ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
   FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

   dq.addLast(batch);
   incomplete.add(batch);

   // Don't deallocate this buffer in the finally block as it's being used in the record batch
   buffer = null;

   return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
  }
 } finally {
  if (buffer != null)
   free.deallocate(buffer);
  appendsInProgress.decrementAndGet();
 }
}

上面的 recordsBuilder 方法最终调用了下面 MemoryRecordsBuilder 的构造方法。

public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
       byte magic,
       CompressionType compressionType,
       TimestampType timestampType,
       long baseOffset,
       long logAppendTime,
       long producerId,
       short producerEpoch,
       int baseSequence,
       boolean isTransactional,
       boolean isControlBatch,
       int partitionLeaderEpoch,
       int writeLimit) {
 //省略其他代码
 this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
}

上面的 wrapForOutput 方法会根据配置的压缩算法进行压缩或者选择不压缩。目前 Kafka 支持的压缩算法包括:gzip、snappy、lz4,从 2.1.0 版本开始,Kafka 支持 Zstandard 算法。

在 Broker 端,会解压 header 做一些校验,但不会解压消息体。消息体的解压是在消费端,消费者拉取到一批消息后,首先会进行解压,然后进行消息处理。

因为压缩和解压都是耗费 CPU 的操作,所以在开启消息压缩时,也要考虑生产者和消费者的 CPU 资源情况。

有了消息批量收集和压缩,kafka 生产者发送消息的过程如下图:

图片

3 磁盘顺序读写

顺序读写省去了寻址的时间,只要一次寻址,就可以连续读写。

在固态硬盘上,顺序读写的性能是随机读写的好几倍。而在机械硬盘上,寻址时需要移动磁头,这个机械运动会花费很多时间,因此机械硬盘的顺序读写性能是随机读写的几十倍。

Kafka 的 Broker 在写消息数据时,首先为每个 Partition 创建一个文件,然后把数据顺序地追加到该文件对应的磁盘空间中,如果这个文件写满了,就再创建一个新文件继续追加写。这样大大减少了寻址时间,提高了读写性能。

4 PageCache

在 Linux 系统中,所有文件 IO 操作都要通过 PageCache,PageCache 是磁盘文件在内存中建立的缓存。当应用程序读写文件时,并不会直接读写磁盘上的文件,而是操作 PageCache。

图片

应用程序写文件时,都先会把数据写入 PageCache,然后操作系统定期地将 PageCache 的数据写到磁盘上。如下图:

图片

而应用程序在读取文件数据时,首先会判断数据是否在 PageCache 中,如果在则直接读取,如果不在,则读取磁盘,并且将数据缓存到 PageCache。

图片

Kafka 充分利用了 PageCache 的优势,当生产者生产消息的速率和消费者消费消息的速率差不多时,Kafka 基本可以不用落盘就能完成消息的传输。

5 零拷贝

Kafka Broker 将消息发送给消费端时,即使命中了 PageCache,也需要将 PageCache 中的数据先复制到应用程序的内存空间,然后从应用程序的内存空间复制到 Socket 缓存区,将数据发送出去。如下图:

图片

Kafka 采用了零拷贝技术把数据直接从 PageCache 复制到 Socket 缓冲区中,这样数据不用复制到用户态的内存空间,同时 DMA 控制器直接完成数据复制,不需要 CPU 参与。如下图:

图片

Java 零拷贝技术采用 FileChannel.transferTo() 方法,底层调用了 sendfile 方法。

6 mmap

Kafka 的日志文件分为数据文件(.log)和索引文件(.index),Kafka 为了提高索引文件的读取性能,对索引文件采用了 mmap 内存映射,将索引文件映射到进程的内存空间,这样读取索引文件就不需要从磁盘进行读取。如下图:

图片

7 总结

本文介绍了 Kafka 实现高性能用到的关键技术,这些技术可以为我们学习和工作提供参考。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2154656.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据结构C语言】【入门】【首次万字详细解析】入门阶段数据结构可能用到的C语言知识,一章让你看懂数据结构!!!!!!!

前言&#xff1a;欢迎各位光临本博客&#xff0c;这里小编带你直接手撕入门阶段的数据结构的C语言知识&#xff0c;让你不再看见数据结构就走不动道。文章并不复杂&#xff0c;愿诸君耐其心性&#xff0c;忘却杂尘&#xff0c;道有所长&#xff01;&#xff01;&#xff01;&am…

论文阅读 - MDFEND: Multi-domain Fake News Detection

https://arxiv.org/pdf/2201.00987 目录 ABSTRACT INTRODUCTION 2 RELATED WORK 3 WEIBO21: A NEW DATASET FOR MFND 3.1 Data Collection 3.2 Domain Annotation 4 MDFEND: MULTI-DOMAIN FAKE NEWS DETECTION MODEL 4.1 Representation Extraction 4.2 Domain Gate 4.…

【Stm32】从零建立一个工程

这里我们创建“STM32F103”系列的文件&#xff0c;基于“固件库” 1.固件库获取 https://www.st.com.cn/zh/embedded-software/stm32-standard-peripheral-libraries.html 2.使用Keil创建.uvprojx文件 前提是已经下载好了“芯片对应的固件” 3.复制底层驱动代码 将固件库下的…

【图表如何自动排序】

前提&#xff1a;有这样一个表&#xff1a;第1列为姓名&#xff08;用字母A~I代替&#xff09;&#xff0c;第2列假设为销量 用到的函数&#xff1a; Sort(自动排序的区域&#xff0c;以哪一列为基础来排序&#xff0c;降序or升序&#xff09; Choosecols(选择的区域&#xf…

Spring在不同类型之间也能相互拷贝?

场景还原 日常开发中&#xff0c;我们会定义非常多的实体&#xff0c;例如VO、DTO等&#xff0c;在涉及实体类的相互转换时&#xff0c;常使用Spring提供的BeanUtils.copyProperties&#xff0c;该类虽好&#xff0c;可不能贪用。 这不在使用过程中就遇到一个大坑&#xff0c…

二十、功率放大电路

功率放大电路 1、乙类功率放大器的工作过程以及交越失真; 2、复合三极管的复合规则 3、甲乙类功率放大器的工作原理、自举过程

智谱AI:CogVideoX-2b——视频生成模型的得力工具

智谱AI&#xff1a;CogVideoX-2b——视频生成模型的得力工具 文章目录 CogVideoX 简介——它是什么&#xff1f;CogVideoX 具体部署与实践指南一、创建丹摩实例二、配置环境和依赖三、上传模型与配置文件四、开始运行五、Web UI 演示 CogVideoX 简介——它是什么&#xff1f; …

电线覆盖物检测数据集 气球风筝鸟巢 1300张 voc yol

电线覆盖物检测数据集 气球风筝鸟巢 1300张 voc yol 电线覆盖物检测数据集 数据集描述 该数据集是一个专门用于检测电线及其周围环境中的异物的数据集&#xff0c;旨在帮助研究人员和开发者训练和评估基于深度学习的目标检测模型。数据集涵盖了五种常见的电线覆盖物类型&…

基于 Qwen2.5-Coder 模型和 CrewAI 多智能体框架,实现智能编程系统的实战教程

9 月 19 日&#xff0c;阿里开源了 Qwen2.5 系列大模型全家桶&#xff1a;除常规的语言模型 Qwen2.5 之外&#xff0c;还发布了专门针对编程的Qwen2.5-Coder模型和数学的 Qwen2.5-Math 模型&#xff0c;并且针对每个模型都提供了不同规模参数版本&#xff0c;包括&#xff1a; …

yolov8模型在手部关键点检测识别中的应用【代码+数据集+python环境+GUI系统】

yolov8模型在手部关键点检测识别中的应用【代码数据集python环境GUI系统】 背景意义 在手势识别、虚拟现实&#xff08;VR&#xff09;、增强现实&#xff08;AR&#xff09;等领域&#xff0c;手部关键点检测为用户提供了更加自然、直观的交互方式。通过检测手部关键点&#…

通信工程学习:什么是VLAN虚拟局域网

VLAN&#xff1a;虚拟局域网 VLAN&#xff08;Virtual Local Area Network&#xff0c;虚拟局域网&#xff09;是一种将物理局域网在逻辑上划分成多个广播域的通信技术。以下是关于VLAN的详细解释&#xff1a; 一、VLAN虚拟局域网的定义与概述 VLAN通过逻辑方式将网络中的设备…

【Proteus仿真】基于51单片机的L298N电机电速调节

目录 一、主要功能 二、硬件资源 三、程序编程 四、实现现象 一、主要功能 基于51单片机&#xff0c;L298N电机驱动连接电机&#xff0c;采用调节PWM占空比来控制电机速度转动。 仿真图&#xff1a; 编辑 二、硬件资源 基于KEIL5编写C代码&#xff0c;PROTEUS8.15进行…

系统架构笔记-2-计算机系统基础知识

知识要点-2.6计算机语言 UML 对系统架构的定义是系统的组织结构&#xff0c;包括系统分解的组成部分以及它们的关联性、交互机制和指导原则等&#xff0c;提供系统设计的信息。 具体有以下 5 个系统视图&#xff1a; 1. 逻辑视图&#xff1a;也称为设计视图&#xff0c;表示…

3.《DevOps》系列K8S部署CICD流水线之部署MetalLB负载均衡器和Helm部署Ingress-Nginx

架构 服务器IP服务名称硬件配置192.168.1.100k8s-master8核、16G、120G192.168.1.101k8s-node18核、16G、120G192.168.1.102k8s-node28核、16G、120G192.168.1.103nfs2核、4G、500G操作系统:Rocky9.3 后续通过K8S部署GitLab、Harbor、Jenkins 为什么使用MetalLB 当使用云平…

【BEV 视图变换】Ray-based(2): 代码复现+画图解释 基于深度估计、bev_pool

paper&#xff1a;Lift, Splat, Shoot: Encoding Images from Arbitrary Camera Rigs by Implicitly Unprojecting to 3D code&#xff1a;https://github.com/nv-tlabs/lift-splat-shoot 一、完整复现代码(可一键运行)和效果图 import torch import torch.nn as nn import mat…

Springboot3 + MyBatis-Plus + MySql + Uniapp 商品加入购物车功能实现(最新教程附源码)

Springboot3 MyBatis-Plus MySql Uniapp 商品加入购物车功能实现&#xff08;针对上一篇sku&#xff09; 1、效果展示2、后端代码2.1 model2.2 mapper server serverImpl 参照上一篇自动生成2.3 controller 3、前端代码3.1 index.js3.2 shop-info.vue3.3 ShopBottomButton.v…

掌上高考爬虫逆向分析

目标网站 aHR0cHM6Ly93d3cuZ2Fva2FvLmNuL3NjaG9vbC9zZWFyY2g/cmVjb21zY2hwcm9wPSVFNSU4QyVCQiVFOCU4RCVBRg 一、抓包分析 二、逆向分析 搜索定位加密参数 本地生成代码 var CryptoJS require(crypto-js) var crypto require(crypto);f "D23ABC#56"function v(t…

机器学习之实战篇——图像压缩(K-means聚类算法)

机器学习之实战篇——图像压缩(K-means聚类算法&#xff09; 0. 文章传送1.实验任务2.实验思想3.实验过程 0. 文章传送 机器学习之监督学习&#xff08;一&#xff09;线性回归、多项式回归、算法优化[巨详细笔记] 机器学习之监督学习&#xff08;二&#xff09;二元逻辑回归 …

Unity自我实现响应式属性

其实只是写着玩,响应式编程建议使用UniRx插件(一套成熟的响应式编程解决方案),我写的主要是借鉴一下这个思想,实现的也不够优雅,不过逻辑也算严密可以正常使用.你可以查看我写的理解响应式属性的思想. 借鉴UniRx的ReactiveProperty类,且UniRx不仅有响应式属性. using System; …

光伏板缺陷红外检测数据集

光伏板缺陷红外检测数据集 包含以下4个数据文件&#xff1a; /train&#xff1a;训练集 /valid&#xff1a;验证集 /test&#xff1a;测试集 README.txt&#xff1a;数据说明 【数据说明】检测目标以Pascal VOC格式进行标注&#xff0c;对每个图像进行以下预处理&#xff0c;统…