Producer源码解读

news2024/11/18 17:24:36

Producer源码解读

在 Kafka 中, 我们把产生消息的一方称为 Producer 即 生产者, 它是 Kafka 的核心组件之一, 也是消息的来源所在。它的主要功能是将客户端的请求打包封装发送到 kafka 集群的某个 Topic 的某个分区上。那么这些生产者产生的消息是怎么传到 Kafka 服务端的呢?

Producer之整体流程

image.png

但是站在源码的核心角度,我们可以把Producer分成以下几个核心部分:

1、Producer之初始化

2、Producer之发送流程

3、Producer之缓冲区

4、Producer之参数与调优

Producer源码解读

从生产流程可以知道,Producer里面的核心有序列化器,分区器,还有缓冲,所以初始化的流程肯定是围绕这几个核心来处理。

image.png

KafkaProducer之初始化

image.png

image.png

1)、设置分区器

设置分区器(partitioner),分区器是支持自定义的

image.png

2)、设置每次重试间隔时间

设置每次重试间隔时间(retryBackoffMs)默认100ms

如果发送消息到broker时抛出异常,且是允许重试的异常,那么就会最大重试retries参数指定的次数,同时retryBackoffMs是重试的间隔。

image.png

3)、设置序列化器

设置序列化器(Serializer)

image.png

4)、设置拦截器

设置拦截器(interceptors),关于拦截器,这个后面会有讲解和介绍。

image.png

5)、设置缓冲区

image.png

在之前,还有一些参数的设置。

image.png

1、设置最大的消息为多大(maxRequestSize), 默认最大1M, 生产环境可以提高到10M

2、设置缓存大小(totalMemorySize) 默认是32M

3、设置压缩格式(compressionType)

4、初始化RecordAccumulator也就是缓冲区指定为32M

6)、设置消息累加器

因为生产者是通过缓冲的方式发送,发送的条件之前的课程讲过,所以这里需要一个消息累加器配合才能完成消息的发送。

image.png

5、初始化集群元数据(metadata),刚开始空的

image.png

6)、创建Sender线程

image.png

这里还初始化了一个重要的管理网路的组件 NetworkClient

image.png

KafkaThread将Sender设置为守护线程并启动

image.png

拦截器使用及介绍

这里讲一讲拦截器的使用和基本作用,拦截器一般用得不多,所以这里只是讲一讲案例,不推荐生产中使用。

想要实现拦截器,我们需要先实现ProducerInterceptor接口即可,然后在生产者中设置进去即可。

image.png

image.png

1、想要把发送的数据都带上时间戳image.png

2、实现统计发送消息的成功次数和失败次数

onAcknowledgement(RecordMetadata, Exception)里面,根据消息发送后返回的异常信息来判断是否发送成功。一般异常如果为空就说明发送成功了,反之就说明发送失败了。

然后定义两个变量,并根据Exception的值分别累加就可以统计到了

最后在close方法里输出两个变量的值,这样当producer发送数据结束并close后,会自动调用拦截器的close方法来输出咱们想要统计的成功和失败次数

image.png

image.png

不过这里要注意一个点:

onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很复杂的逻辑,否则会拖慢producer的消息发送效率。

3、拦截链路

   // 设置属性
        Properties properties = new Properties();
        // 指定连接的kafka服务器的地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        // 设置String的序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //设置自定义拦截器
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SelfInterceptor.class);

        //设置拦截链路(设置多个 SelfInterceptor  先执行   再执行SelfInterceptor2)
        ArrayList<String> interceptors = new ArrayList<>();
        interceptors.add("com.llp.interceptor.SelfInterceptor");//注意:这里是拦截器的全类名
        interceptors.add("com.llp.interceptor.SelfInterceptor2"); //这里假设有SelfInterceptor2
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

拦截器链里的拦截器是按照顺序组成的,因此我们要注意前后拦截器对彼此的影响,比如这里拦截器1的onsend方法不能返回null,不然拦截器2的onsend就丢失了信息,会发生异常。

Producer之发送流程

Producer之发送流程

Kafka Producer 发送消息流程如下:

1)、执行拦截器逻辑

执行拦截器逻辑,预处理消息, 封装 Producer Record

image.png

2)、集群元数据

从 Kafka Broker 集群获取集群元数据metadata

image.png

3)、序列化

调用Serializer.serialize()方法进行消息的key/value序列化

image.png

4)、分区

调用partition()选择合适的分区策略,给消息体 Producer Record 分配要发送的 topic 分区号

image.png

5)、消息累加进缓存

将消息缓存到RecordAccumulator 收集器中, 最后判断是否要发送。

image.png

7)、消息发送

前面我们也知道真正的消息发送是Sender线程来做,并且这里还要结合缓冲区来处理。

批次发送的条件为:缓冲区数据大小达到 batch.size 或者 linger.ms 达到上限,哪个先达到就算哪个

Producer之缓冲区

Kafka生产者的缓冲区,也就是内存池,可以将其类比为连接池(DB, Redis),主要是避免不必要的创建连接的开销, 这样内存池可以对 RecordBatch 做到反复利用, 防止引起Full GC问题。那我们看看 Kafka 内存池是怎么设计的。

核心就是这段代码:

image.png

image.png

   Kafka 内存设计有两部分,下面的粉色的是可用的内存(未分配的内存,初始的时候是 32M),上面紫色的是已经被分配了的内存,每个小 Batch 是 16K,然后这一个个的 Batch 就可以被反复利用,不需要每次都申请内存,  两部分加起来是 32M。
申请内存的过程

从 Producer 发送流程的第6步中可以看到会把消息放入 accumulator中, 即调用 accumulator.append() 追加, 然后把消息封装成一个个Batch 进行发送, 然后去申请内存(free.allocate())

image.png

image.png

(1)如果申请的内存大小超过了整个缓存池的大小,则抛异常出来

image.png

(2)对整个方法加锁:

this.lock.lock();

(3)如果申请的大小是每个 recordBatch 的大小(16K),并且已分配内存不为空,则直接取出来一个返回。

if (size == poolableSize && !this.free.isEmpty())
    return this.free.pollFirst();

image.png

(4)如果整个内存池大小比要申请的内存大小大 (this.availableMemory + freeListSize >= size),则直接从可用内存(即上图粉色的区域)申请一块内存。并且可用内存要去掉申请的那一块内存。

image.png

Sender线程

image.png

Producer之参数调优

Kafka 实际使用中,Producer 端既要保证吞吐量,又要确保无消息丢失,一些核心参数的配置就显得至关重要。接下来我们就来看看生产端都有哪些重要的参数,及调优建议。

acks

参数说明:对于 Kafka Producer 来说是一个非常重要的参数,它表示指定分区中成功写入消息的副本数量,是 Kafka 生产端消息的持久性的保证, 详细可以查看

发送确认机制 3 种不同的确认模式。

acks=0 意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入Kafka 。
acks=1 意味若首领在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。
acks=all 意味着首领在返回确认或错误响应之前,会等待(min.insync.replicas)同步副本都收到悄息。

max.request.size

参数说明:这个参数对于 Kafka Producer 也比较重要, 表示生产端能够发送的最大消息大小,默认值为1048576(1M)

  调优建议:这个配置对于生产环境来说有点小, **为了避免因消息过大导致发送失败,生产环境建议适当调大,比如可以调到10485760(10M)** 。

retries

参数说明:表示生产端消息发送失败时的重试次数,默认值为0,即不重试。 这个参数一般是为了解决因系统瞬时故障导致的消息发送失败,比如网络抖动、Leader 选举及重选举,其中瞬时的 Leader 重选举是比较常见的。因此这个参数的设置对于 Kafka Producer 就显得非常重要

 调优建议:这里建议设置为一个大于0的值,比如3次。

retry.backoff.ms

参数说明:**设定两次重试之间的时间间隔,避免无效的频繁重试,默认值为100, 主要跟 retries 配合使用, **在配置 retries 和 retry.backoff.ms 之前,最好先估算一下可能的异常恢复时间,需要设定总的重试时间要大于异常恢复时间,避免生产者过早的放弃重试。

connections.max.idele.ms

参数说明:主要用来判断多久之后关闭空闲的链接,默认值540000(ms)即9分钟。

compression.type

参数说明: 该参数表示生产端是否要对消息进行压缩,默认值为不压缩(none)。 压缩可以显著减少网络IO传输、磁盘IO以及磁盘空间,从而提升整体吞吐量,但也是以牺牲CPU开销为代价的。

 调优建议:出于提升吞吐量的考虑,建议在生产端对消息进行压缩。**对于Kafka来说,综合考虑吞吐量与压缩比,建议选择lz4压缩。如果追求最高的压缩比则推荐zstd压缩。**

buffer.memory

参数说明: 该参数表示生产端消息缓冲池或缓冲区的大小,默认值为即33554432(32M) 。这个参数基本可以认为是 Producer 程序所使用的内存大小。

调优建议:通常我们应尽量保证生产端整体吞吐量,建议适当调大该参数,也意味着生产客户端会占用更多的内存。

batch.size

参数说明: 该参数表示发送到缓冲区中的消息会被封装成一个一个的Batch,分批次的发送到 Broker 端,默认值为16KB。 因此减小 batch 大小有利于降低消息延时,增加 batch 大小有利于提升吞吐量。

 调优建议:通常合理调大该参数值,能够显著提升生产端吞吐量,比如可以调整到32KB,调大也意味着消息会有相对较大的延时。

linger.ms

参数说明: 该参数表示用来控制 Batch 最大的空闲时间,超过该时间的 Batch 也会自动被发送到 Broker 端。 实际情况中, 这是吞吐量与延时之间的权衡。默认值为0,表示消息需要被立即发送,无需关系 batch 是否被填满。

  调优建议:通常为了减少请求次数、提升整体吞吐量,建议设置一个大于0的值,比如设置为100,此时会在负载低的情况下带来100ms的延时。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1395648.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux系统三剑客之grep和正则表达式的介绍(一)

1.正则表达式 目录 1.正则表达式 1.什么是正则表达式 &#xff1f; 2.正则表达式的使用场景 3.正则表达式字符表示 4.它们之间的区别 2.grep命令 作用&#xff1a; 语法&#xff1a; 说明&#xff1a; 选项&#xff1a;options 重点 实例 3.后面的下次再更新。 …

hanlp,pkuseg,jieba,cutword分词实践

总结&#xff1a;只有jieba,cutword,baidu lac成功将色盲色弱成功分对,这两个库字典应该是最全的 hanlp[持续更新中] https://github.com/hankcs/HanLP/blob/doc-zh/plugins/hanlp_demo/hanlp_demo/zh/tok_stl.ipynb import hanlp # hanlp.pretrained.tok.ALL # 语种见名称最…

统计学-R语言-6.3

文章目录 前言总体方差的区间估计总体方差的区间估计(一个总体方差的估计)总体方差的区间估计(两个总体方差比的估计) 总结 前言 本篇文章是最后一个介绍参数估计的章节。 总体方差的区间估计 研究一个总体时&#xff0c;推断总体方差 使用的统计量为样本方差 。研究两个总体…

MacOS受欢迎的数据库开发工具 Navicat Premium 15 中文版

Navicat Premium 15 Mac是一款数据库管理工具&#xff0c;提供了一个全面的解决方案&#xff0c;用于连接、管理和维护各种数据库系统。以下是Navicat Premium 15 Mac的一些主要功能和特点&#xff1a; 软件下载&#xff1a;Navicat Premium 15 中文版下载 多平台支持&#xff…

代码、课程、教学的一些思考-2024

1 代码、算法、艺术品 1.1 代码 最典型的C代码示例。 以下是一个简单的C代码示例&#xff0c;它打印出“Hello, World!”&#xff1a; #include <iostream> int main() { std::cout << "Hello, World!"; return 0; } 这段代码定义了一个程序&a…

2024年美国各州即将生效的新隐私保护法(上)

2024年美国各州即将生效的新隐私保护法&#xff08;上&#xff09; 文章目录 2024年美国各州即将生效的新隐私保护法&#xff08;上&#xff09;前言一、2023年隐私保护法开始生效的五个州二、2023年通过了新的隐私保护法的八个州三、2024年确定截止的州及法律法规&#xff08;…

Python GUI 新手入门教程:轻松构建图形用户界面

Python 凭借其简单性和多功能性&#xff0c;已经成为最流行的编程语言之一。被广泛应用于从 web 开发到数据科学的各个领域。 在本教程中&#xff0c;我们将探索用于创建图形用户界面&#xff08;GUIs&#xff09;的 Python 内置库&#xff1a; Tkinter&#xff1a;无论你是初…

深入学习卷积神经网络(CNN)的原理知识

在深度学习领域中&#xff0c;已经经过验证的成熟算法&#xff0c;目前主要有深度卷积网络&#xff08;DNN&#xff09;和递归网络&#xff08;RNN&#xff09;&#xff0c;在图像识别&#xff0c;视频识别&#xff0c;语音识别领域取得了巨大的成功&#xff0c;正是由于这些成…

gitlab 命令执行漏洞(CVE-2022-2992)

1.漏洞影响版本 GitLab CE/EE 中的一个漏洞影响从 11.10 开始到 15.1.6 之前的所有版本、从 15.2 开始到 15.2.4 之前的所有版本、从 15.3 开始到 15.3.2 之前的所有版本。允许经过身份验证的用户通过从 GitHub API 端点导入实现远程代码执行。 查看 gitlab 版本。(登录后才能…

keil logic analyzer使用

DARMSTM.DLL--- -pSTM32F103VC ---- TRAMSTM.DLL ------ -pSTM32F103VC 然后点击DEBUG按钮 用上面的名称USART1_SR 点击STUP 不同的引脚&#xff0c;用不同的名称&#xff0c;通过放大缩小来查看波形。当前串口用的是USART1_SR&#xff0c;只能用这个名称&#xff0c;…

海思刷机注意事项

目录 为什么写这个文档海思SOC刷机原理1.串口缓冲区关闭2.IP地址自动更改导致无法烧录3.烧录完成后找不到根文件系统4.由于电源设计不合理导致无法烧录 为什么写这个文档 海思SOC刷机时,偶然会遇到奇奇怪怪的问题,会刷机不上.现在总结一下. 海思SOC刷机原理 如果没有任何程序…

VUE--组件通信(非父子)

一、非父子通信 --- event bus 事件总线 作用&#xff1a;非父子组件之间进行简易的消息传递 步骤&#xff1a; 1、创建一个都能访问到的事件总线&#xff08;空vue实例&#xff09;--- utils/EventBus.js import Vue from vue export default new Vue({}) 2、 接收方&…

day5:IO多路复用

思维导图 TCP并发服务器的IO复用poll函数实现 #include <head.h> #define SER_PORT 8888 #define SER_IP "192.168.232.133" int main(int argc, const char *argv[]) {int sfdsocket(AF_INET,SOCK_STREAM,0);if(sfd-1){perror("sfd error:");retur…

【正点原子STM32】Cortex-M系列介绍(ARM、Cortex、DMIPS/MHz和CoreMark/MHz*)

一、ARM公司 二、Cortex内核分类及特征 三、Cortex-M3/4/7介绍 四、总结 ARM官网 ARM开发者官网 CoreMark分数 一、ARM公司 ARM架构特点 ARM&#xff08;Advanced RISC Machine&#xff09;架构是一种RISC&#xff08;Reduced Instruction Set Computing&#xff09;架构&…

【征服redis2】redis的事务介绍

目录 1.redis事务介绍 2 事务出错的处理 1.redis事务介绍 在前面我们介绍了redis的几种典型数据结构和应用&#xff0c;本文我们来看一下redis的事务问题。事务也是数据库的重要主题&#xff0c;熟悉关系型数据库的读者应该对事务比较了解&#xff0c;简单地说&#xff0c;事…

第四期——kali

文章目录 12.4kali安装内网主机发现探测 12.5nmapnccdnzenmapdocker启动redisredis命令vulhub启动redispython爆破redis密码密码攻击——九头蛇hydra密码攻击——美杜莎medusa 12.6rsyncgobyMongoDBmongodb命令hydra爆破python脚本爆破 12.7Elasticsearchpython连接esMetasploi…

系分备考计算机网络传输介质、通信方式和交换方式

文章目录 1、概述2、传输介质3、网络通信4、网络交换5、总结 1、概述 计算机网路是系统分析师考试的常考知识点&#xff0c;本篇主要记录了知识点&#xff1a;网络传输介质、网络通信和数据交换方式等。 2、传输介质 网络的传输最常见的就是网线&#xff0c;也就是双绞线&…

蓝天采集器,功能逆天的网站数据抓取神器,轻松助你成为采集达人,附带搭建配置文档

源码介绍 蓝天采集器是一款专为web服务器打造的数据采集神器。与市面上常见的桌面端采集工具&#xff08;如火车头等&#xff09;相比&#xff0c;蓝天采集器在易用性、上手成本和灵活性方面更胜一筹。它部署简便&#xff0c;无需复杂的设置&#xff0c;即可迅速融入您的web服…

ONLYOFFICE服务器无法连接,请联系管理员问题解决

1、现象 部署好了nextcloud和onlyoffice后&#xff0c;新建文本文档报错ONLYOFFICE服务器无法连接&#xff0c;请联系管理员。 用快捷键“F12”进入控制台&#xff0c;点开错误提示栏&#xff0c;找到有“api.js“文件&#xff0c;“https://ONLYOFFICED的地址/web-apps/apps/…

史上最详细的JAVA学生信息管理系统(MySQL实现)

一、项目介绍 为了巩固Java的学习写了一个基于MVC设计模式的学生管理系统。 简单介绍一下MVC设计模式&#xff1a; 1、M也就是Model 模型层&#xff08;也叫数据层&#xff09;主要是通过这个类来进行数据的操作。 2、V是Views 视图层&#xff0c;主要就是来显示页面信息。 3、…