Kafka之Producer源码

news2025/1/13 17:28:18

Producer源码解读

在 Kafka 中, 我们把产生消息的一方称为 Producer 即 生产者, 它是 Kafka 的核心组件之一, 也是消息的来源所在。它的主要功能是将客户端的请求打包封装发送到 kafka 集群的某个 Topic 的某个分区上。那么这些生产者产生的消息是怎么传到 Kafka 服务端的呢?

Producer之整体流程

我们回顾一下之前我们讲过Kafka一条消息发送和消费的流程

image.png

但是站在源码的核心角度,我们可以把Producer分成以下几个核心部分:

1、Producer之初始化

2、Producer之发送流程

3、Producer之缓冲区

4、Producer之参数与调优

Producer源码解读

从生产流程可以知道,Producer里面的核心有序列化器,分区器,还有缓冲,所以初始化的流程肯定是围绕这几个核心来处理。

image.png

KafkaProducer之初始化

image.png

image.png

因为源码中有非常多的一些额外处理,所以我们解读源码没必要每行都读,只需要根据我们之前梳理的主流程找到核心代码进行解读就可以,这也是推荐大家去初次解读源码的最优方式。

1)、设置分区器

设置分区器(partitioner),分区器是支持自定义的

image.png

2)、设置重试时间

设置重试时间(retryBackoffMs)默认100ms

如果发送消息到broker时抛出异常,且是允许重试的异常,那么就会最大重试retries参数指定的次数,同时retryBackoffMs是重试的间隔。

image.png

3)、设置序列化器

设置序列化器(Serializer)

image.png

4)、设置拦截器

设置拦截器(interceptors),关于拦截器,这个后面会有讲解和介绍。

image.png

5)、设置缓冲区

image.png

在之前,还有一些参数的设置。

image.png

1、设置最大的消息为多大(maxRequestSize), 默认最大1M, 生产环境可以提高到10M

2、设置缓存大小(totalMemorySize) 默认是32M

3、设置压缩格式(compressionType)

4、初始化RecordAccumulator也就是缓冲区指定为32M

6)、设置消息累加器

因为生产者是通过缓冲的方式发送,发送的条件之前的课程讲过,所以这里需要一个消息累加器配合才能完成消息的发送。

image.png

5、初始化集群元数据(metadata),刚开始空的

image.png

6)、创建Sender线程

image.png

这里还初始化了一个重要的管理网路的组件 NetworkClient

image.png

KafkaThread将Sender设置为守护线程并启动

image.png

拦截器使用及介绍

这里讲一讲拦截器的使用和基本作用,拦截器一般用得不多,所以这里只是讲一讲案例,不推荐生产中使用。

想要实现拦截器,我们需要先实现ProducerInterceptor接口即可,然后在生产者中设置进去即可。

image.png

image.png

1、想要把发送的数据都带上时间戳image.png

2、实现统计发送消息的成功次数和失败次数

onAcknowledgement(RecordMetadata, Exception)里面,根据消息发送后返回的异常信息来判断是否发送成功。一般异常如果为空就说明发送成功了,反之就说明发送失败了。

然后定义两个变量,并根据Exception的值分别累加就可以统计到了

最后在close方法里输出两个变量的值,这样当producer发送数据结束并close后,会自动调用拦截器的close方法来输出咱们想要统计的成功和失败次数

image.png

image.png

不过这里要注意一个点:

onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很复杂的逻辑,否则会拖慢producer的消息发送效率。

3、拦截链路

image.png

拦截器链里的拦截器是按照顺序组成的,因此我们要注意前后拦截器对彼此的影响,比如这里拦截器1的onsend方法不能返回null,不然拦截器2的onsend就丢失了信息,会发生异常。

Producer之发送流程

Producer之发送流程

Kafka Producer 发送消息流程如下:

1)、执行拦截器逻辑

执行拦截器逻辑,预处理消息, 封装 Producer Record

image.png

2)、集群元数据

从 Kafka Broker 集群获取集群元数据metadata

image.png

3)、序列化

调用Serializer.serialize()方法进行消息的key/value序列化

image.png

4)、分区

调用partition()选择合适的分区策略,给消息体 Producer Record 分配要发送的 topic 分区号

image.png

5)、消息累加进缓存

将消息缓存到RecordAccumulator 收集器中, 最后判断是否要发送。

image.png

7)、消息发送

前面我们也知道真正的消息发送是Sender线程来做,并且这里还要结合缓冲区来处理。后面会对这个进行详细的讲解,这里我们只需要知道发送的条件:

批次发送的条件为:缓冲区数据大小达到 batch.size 或者 linger.ms 达到上限,哪个先达到就算哪个

Producer之缓冲区

Kafka生产者的缓冲区,也就是内存池,可以将其类比为连接池(DB, Redis),主要是避免不必要的创建连接的开销, 这样内存池可以对 RecordBatch 做到反复利用, 防止引起Full GC问题。那我们看看 Kafka 内存池是怎么设计的。

核心就是这段代码:

image.png

image.png

   Kafka 内存设计有两部分,下面的粉色的是可用的内存(未分配的内存,初始的时候是 32M),上面紫色的是已经被分配了的内存,每个小 Batch 是 16K,然后这一个个的 Batch 就可以被反复利用,不需要每次都申请内存,  两部分加起来是 32M。
申请内存的过程

从 Producer 发送流程的第6步中可以看到会把消息放入 accumulator中, 即调用 accumulator.append() 追加, 然后把消息封装成一个个Batch 进行发送, 然后去申请内存(free.allocate())

image.png

image.png

(1)如果申请的内存大小超过了整个缓存池的大小,则抛异常出来

image.png

(2)对整个方法加锁:

this.lock.lock();

(3)如果申请的大小是每个 recordBatch 的大小(16K),并且已分配内存不为空,则直接取出来一个返回。

if (size == poolableSize && !this.free.isEmpty())
    return this.free.pollFirst();

image.png

(4)如果整个内存池大小比要申请的内存大小大 (this.availableMemory + freeListSize >= size),则直接从可用内存(即上图粉色的区域)申请一块内存。并且可用内存要去掉申请的那一块内存。

image.png

Sender线程

image.png

Producer之参数调优

     我们知道在 Kafka 实际使用中,Producer 端既要保证吞吐量,又要确保无消息丢失,一些核心参数的配置就显得至关重要。接下来我们就来看看生产端都有哪些重要的参数,及调优建议。

acks

参数说明:对于 Kafka Producer 来说是一个非常重要的参数,它表示指定分区中成功写入消息的副本数量,是 Kafka 生产端消息的持久性的保证, 详细可以查看

max.request.size

参数说明:这个参数对于 Kafka Producer 也比较重要, 表示生产端能够发送的最大消息大小,默认值为1048576(1M)

  调优建议:这个配置对于生产环境来说有点小, **为了避免因消息过大导致发送失败,生产环境建议适当调大,比如可以调到10485760(10M)** 。

retries

参数说明:表示生产端消息发送失败时的重试次数,默认值为0,即不重试。 这个参数一般是为了解决因系统瞬时故障导致的消息发送失败,比如网络抖动、Leader 选举及重选举,其中瞬时的 Leader 重选举是比较常见的。因此这个参数的设置对于 Kafka Producer 就显得非常重要

 调优建议:这里建议设置为一个大于0的值,比如3次。

retry.backoff.ms

参数说明:**设定两次重试之间的时间间隔,避免无效的频繁重试,默认值为100, ****主要跟 retries 配合使用, **在配置 retries 和 retry.backoff.ms 之前,最好先估算一下可能的异常恢复时间,需要设定总的重试时间要大于异常恢复时间,避免生产者过早的放弃重试。

connections.max.idele.ms

参数说明:主要用来判断多久之后关闭空闲的链接,默认值540000(ms)即9分钟。

compression.type

参数说明: 该参数表示生产端是否要对消息进行压缩,默认值为不压缩(none)。 压缩可以显著减少网络IO传输、磁盘IO以及磁盘空间,从而提升整体吞吐量,但也是以牺牲CPU开销为代价的。

 调优建议:出于提升吞吐量的考虑,建议在生产端对消息进行压缩。**对于Kafka来说,综合考虑吞吐量与压缩比,建议选择lz4压缩。如果追求最高的压缩比则推荐zstd压缩。**

buffer.memory

参数说明: 该参数表示生产端消息缓冲池或缓冲区的大小,默认值为即33554432(32M) 。这个参数基本可以认为是 Producer 程序所使用的内存大小。

调优建议:通常我们应尽量保证生产端整体吞吐量,建议适当调大该参数,也意味着生产客户端会占用更多的内存。

batch.size

参数说明: 该参数表示发送到缓冲区中的消息会被封装成一个一个的Batch,分批次的发送到 Broker 端,默认值为16KB。 因此减小 batch 大小有利于降低消息延时,增加 batch 大小有利于提升吞吐量。

 调优建议:通常合理调大该参数值,能够显著提升生产端吞吐量,比如可以调整到32KB,调大也意味着消息会有相对较大的延时。

linger.ms

参数说明: 该参数表示用来控制 Batch 最大的空闲时间,超过该时间的 Batch 也会自动被发送到 Broker 端。 实际情况中, 这是吞吐量与延时之间的权衡。默认值为0,表示消息需要被立即发送,无需关系 batch 是否被填满。

  调优建议:通常为了减少请求次数、提升整体吞吐量,建议设置一个大于0的值,比如设置为100,此时会在负载低的情况下带来100ms的延时。  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1470971.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

unity发布webGL压缩方式的gzip,使用nginx作为web服务器时的配置文件

unity发布webGL压缩方式的gzip,使用nginx作为web服务器时的配置文件 Unity版本是:2021.3 nginx的版本是:nginx-1.25.4 Unity发布webgl时的测试 设置压缩方式是gzip nginx配置文件 worker_processes 1;events {worker_connections 102…

SpringBoot实现热插拔AOP

热插拔AOP执行核心逻辑 Advice:“通知”,表示 Aspect 在特定的 Join point 采取的操作。包括 “around”, “before” and “after 等 Advice,大体上分为了三类:BeforeAdvice、MethodInterceptor、AfterAdviceAdvisor&#xff1a…

STM32存储左右互搏 QSPI总线FATS文件读写FLASH W25QXX

STM32存储左右互搏 QSPI总线FATS文件读写FLASH W25QXX FLASH是常用的一种非易失存储单元,W25QXX系列Flash有不同容量的型号,如W25Q64的容量为64Mbit,也就是8MByte。这里介绍STM32CUBEIDE开发平台HAL库Quad SPI总线实现FATS文件操作W25Q各型号…

智能SQL生成:后端技术与LLM的完美结合

文章目录 引言一、什么是大模型二、为什么选择LLM三、开发技术说明四、系统架构说明五、编码实战1. Maven2. 讯飞大模型配置类3. LLM相关的封装4. 编写LLM的service5. 编写controller6. 运行测试 六、总结 引言 本篇文章主要是关于实现一个类似Chat2DB的根据自然语言生成SQL的…

SpringMVC 学习(四)之获取请求参数

目录 1 通过 HttpServletRequest 获取请求参数 2 通过控制器方法的形参获取请求参数 3 通过 POJO 获取请求参数&#xff08;重点&#xff09; 1 通过 HttpServletRequest 获取请求参数 public String handler1(HttpServletRequest request) <form action"${pageCont…

微信小程序02: 使用微信快速验证组件code获取手机号

全文目录,一步到位 1.前言简介1.1 专栏传送门1.1.1 上文小总结1.1.2 上文传送门 2. 微信小程序获取手机号2.1 业务场景(使用与充值)2.2 准备工作2.3 具体代码使用与注释如下2.3.1 代码解释(一)[无需复制]2.3.2 代码解释(二)[无需复制] 2.4 最后一步 获取手机号信息2.4.1 两行代…

电商评价分析:NLP信息抽取技术在用户评论中的应用与挖掘

一、引言 在2019年&#xff0c;电子商务的蓬勃发展不仅推动了消费市场的增长&#xff0c;也带来了海量的用户评价数据。这些数据&#xff0c;作为消费者对商品和服务直接反馈的载体&#xff0c;蕴含着巨大的价值。然而&#xff0c;由于其非结构化的特性&#xff0c;这些文本信息…

YOLOv8改进 | Conv篇 | 全新的SOATA轻量化下采样操作ADown(参数量下降百分之二十,附手撕结构图)

一、本文介绍 本文给大家带来的改进机制是利用2024/02/21号最新发布的YOLOv9其中提出的ADown模块来改进我们的Conv模块,其中YOLOv9针对于这个模块并没有介绍,只是在其项目文件中用到了,我将其整理出来用于我们的YOLOv8的项目,经过实验我发现该卷积模块(作为下采样模块)…

半导体物理基础-笔记(续)

源内容参考&#xff1a;https://www.bilibili.com/video/BV11U4y1k7zn/?spm_id_from333.337.search-card.all.click&vd_source61654d4a6e8d7941436149dd99026962 掺杂半导体的费米能级与温度及杂质浓度的关系图 在温度一定的条件下&#xff0c;施主杂质浓度越高&#xff0…

压力测试工具Jmeter的下载与使用

1、进入官网下载Jmeter https://jmeter.apache.org/ 国内镜像&#xff08;下载的慢的话可以用国内镜像下载&#xff09; https://mirrors.cloud.tencent.com/apache/jmeter/binaries/ 2、跳转到下载页面 3、根据不同系统下载相应版本的Jmeter压缩包&#xff0c;Linux系统下载…

Repeater:创建大量类似项

Repeater 类型用于创建大量类似项。与其它视图类型一样&#xff0c;Repeater有一个model和一个delegate。 首次创建Repeater时&#xff0c;会创建其所有delegate项。若存在大量delegate项&#xff0c;并且并非所有项都必须同时可见&#xff0c;则可能会降低效率。 有2种方式可…

2024年大路灯无广测评推荐:书客、柏曼、霍尼韦尔大路灯哪个品牌更好?

临近开学&#xff0c;护眼大路灯哪个品牌好的话题度在不断提高&#xff01; 有人说大路灯是“智商税”&#xff0c;但也有人说“学生党福音”、“照明神器”&#xff0c;吸引了大量人群的关注。在没用过大路灯之前我也很担心在担心是否是智商税的问题&#xff0c;直到我自己入…

BUU [CISCN2019 华东南赛区]Web4

BUU [CISCN2019 华东南赛区]Web4 题目描述&#xff1a;Click to launch instance. 开题&#xff1a; 点击链接&#xff0c;有点像SSRF 使用local_file://协议读到本地文件&#xff0c;无法使用file://协议读取&#xff0c;有过滤。 local_file://协议&#xff1a; local_file…

stable-diffusion-webui+sadTalker开启GFPGAN as Face enhancer

接上一篇&#xff1a;在autodl搭建stable-diffusion-webuisadTalker-CSDN博客 要开启sadTalker gfpgan as face enhancer&#xff0c; 需要将 1. stable-diffusion-webui/extensions/SadTalker/gfpgan/weights 目录下的文件拷贝到 :~/autodl-tmp/models/GFPGAN/目录下 2.将G…

探索创意的无尽宇宙——Photoshop 2020,你的视觉魔法棒

在数字艺术的广阔天地中&#xff0c;Photoshop 2020无疑是一颗璀璨的明星。这款由Adobe公司精心打造的图像处理软件&#xff0c;自推出以来&#xff0c;便以其强大的功能和卓越的性能&#xff0c;赢得了全球数百万设计师、摄影师和爱好者的青睐。无论是Mac还是Windows系统&…

冯诺依曼体系结构 与 操作系统

一、冯诺依曼体系结构 深入理解冯诺依曼体系结构 计算机的出现就是为了解决实际问题, 所以把问题交给计算机&#xff0c;计算机经过处理&#xff0c;得到一个结果反馈给我们&#xff0c;所以这中间就必然涉及到了输入设备&#xff0c;中央处理器(包括运算器和控制器)和输出设备…

Find My小风扇|苹果Find My技术与小风扇结合,智能防丢,全球定位

电风扇在我们的日常生活中也是经常会使用到的家电产品&#xff0c;尤其是在炎炎的夏日&#xff0c;风扇能给我们吹来清凉的凉风&#xff0c;如今随身携带的小风扇成为人们出门的必备物品&#xff0c;由于体积小方便经常会被人遗忘在某个地方导致丢失。 在智能化加持下&#x…

C#,数组数据波形排序(Sort in Wave Form)的朴素算法与源代码

1 波形排序 所谓“波形排序”就是一大一小。 将n个身高互不相同的人排成一行 ,对于每个人 ,要求他要么比相邻的人均高 ,要么比相邻的人均矮 ,问共有多少种排法 ,这一问题称为波形排列问题。 2 源程序 using System; using System.Collections; using System.Collections.Gen…

《Docker 简易速速上手小册》第9章 Docker 与持续集成(2024 最新版)

文章目录 9.1 持续集成的基本概念9.1.1 重点基础知识9.1.2 重点案例&#xff1a;Python Web 应用的 CI 流程9.1.3 拓展案例 1&#xff1a;Python 数据分析项目的 CI9.1.4 拓展案例 2&#xff1a;Python 微服务的 CI/CD 9.2 Docker 在 CI/CD 中的应用9.2.1 重点基础知识9.2.2 重…

MCU多核异构通信原理

摘要&#xff1a; 本文结合瑞萨RZ/G2L 多核处理器&#xff0c;给大家讲述一下多核异构设计及通信的原理。 随着电子技术的不断发展&#xff0c;以及市场需求的日益增长&#xff0c;嵌入式系统不仅要求执行复杂的控制任务&#xff0c;还需要实时地采集和处理数据。 为了满足这…