五、Producer源码解读

news2024/10/7 2:18:30

Producer源码解读

在 Kafka 中, 我们把产生消息的一方称为 Producer 即 生产者, 它是 Kafka 的核心组件之一, 也是消息的来源所在。它的主要功能是将客户端的请求打包封装发送到 kafka 集群的某个 Topic 的某个分区上。那么这些生产者产生的消息是怎么传到 Kafka 服务端的呢?

Producer之整体流程

我们回顾一下之前我们讲过Kafka一条消息发送和消费的流程

image.png

但是站在源码的核心角度,我们可以把Producer分成以下几个核心部分:

1、Producer之初始化

2、Producer之发送流程

3、Producer之缓冲区

4、Producer之参数与调优

Producer源码解读

从生产流程可以知道,Producer里面的核心有序列化器,分区器,还有缓冲,所以初始化的流程肯定是围绕这几个核心来处理。

image.png

KafkaProducer之初始化

image.png

image.png

因为源码中有非常多的一些额外处理,所以我们解读源码没必要每行都读,只需要根据我们之前梳理的主流程找到核心代码进行解读就可以,这也是推荐大家去初次解读源码的最优方式。

1)、设置分区器

设置分区器(partitioner),分区器是支持自定义的

image.png

2)、设置重试时间

设置重试时间(retryBackoffMs)默认100ms

如果发送消息到broker时抛出异常,且是允许重试的异常,那么就会最大重试retries参数指定的次数,同时retryBackoffMs是重试的间隔。

image.png

3)、设置序列化器

设置序列化器(Serializer)

image.png

4)、设置拦截器

设置拦截器(interceptors),关于拦截器,这个后面会有讲解和介绍。

image.png

5)、设置缓冲区

image.png

在之前,还有一些参数的设置。

image.png

1、设置最大的消息为多大(maxRequestSize), 默认最大1M, 生产环境可以提高到10M

2、设置缓存大小(totalMemorySize) 默认是32M

3、设置压缩格式(compressionType)

4、初始化RecordAccumulator也就是缓冲区指定为32M

6)、设置消息累加器

因为生产者是通过缓冲的方式发送,发送的条件之前的课程讲过,所以这里需要一个消息累加器配合才能完成消息的发送。

image.png

5、初始化集群元数据(metadata),刚开始空的

image.png

6)、创建Sender线程

image.png

这里还初始化了一个重要的管理网路的组件 NetworkClient

image.png

KafkaThread将Sender设置为守护线程并启动

image.png

拦截器使用及介绍

这里讲一讲拦截器的使用和基本作用,拦截器一般用得不多,所以这里只是讲一讲案例,不推荐生产中使用。

想要实现拦截器,我们需要先实现ProducerInterceptor接口即可,然后在生产者中设置进去即可。

image.png

image.png

1、想要把发送的数据都带上时间戳image.png

2、实现统计发送消息的成功次数和失败次数

onAcknowledgement(RecordMetadata, Exception)里面,根据消息发送后返回的异常信息来判断是否发送成功。一般异常如果为空就说明发送成功了,反之就说明发送失败了。

然后定义两个变量,并根据Exception的值分别累加就可以统计到了

最后在close方法里输出两个变量的值,这样当producer发送数据结束并close后,会自动调用拦截器的close方法来输出咱们想要统计的成功和失败次数

image.png

image.png

不过这里要注意一个点:

onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很复杂的逻辑,否则会拖慢producer的消息发送效率。

3、拦截链路

image.png

拦截器链里的拦截器是按照顺序组成的,因此我们要注意前后拦截器对彼此的影响,比如这里拦截器1的onsend方法不能返回null,不然拦截器2的onsend就丢失了信息,会发生异常。

Producer之发送流程

Producer之发送流程

Kafka Producer 发送消息流程如下:

1)、执行拦截器逻辑

执行拦截器逻辑,预处理消息, 封装 Producer Record

image.png

2)、集群元数据

从 Kafka Broker 集群获取集群元数据metadata

image.png

3)、序列化

调用Serializer.serialize()方法进行消息的key/value序列化

image.png

4)、分区

调用partition()选择合适的分区策略,给消息体 Producer Record 分配要发送的 topic 分区号

image.png

5)、消息累加进缓存

将消息缓存到RecordAccumulator 收集器中, 最后判断是否要发送。

image.png

7)、消息发送

前面我们也知道真正的消息发送是Sender线程来做,并且这里还要结合缓冲区来处理。后面会对这个进行详细的讲解,这里我们只需要知道发送的条件:

批次发送的条件为:缓冲区数据大小达到 batch.size 或者 linger.ms 达到上限,哪个先达到就算哪个

Producer之缓冲区

Kafka生产者的缓冲区,也就是内存池,可以将其类比为连接池(DB, Redis),主要是避免不必要的创建连接的开销, 这样内存池可以对 RecordBatch 做到反复利用, 防止引起Full GC问题。那我们看看 Kafka 内存池是怎么设计的。

核心就是这段代码:

image.png

image.png

   Kafka 内存设计有两部分,下面的粉色的是可用的内存(未分配的内存,初始的时候是 32M),上面紫色的是已经被分配了的内存,每个小 Batch 是 16K,然后这一个个的 Batch 就可以被反复利用,不需要每次都申请内存,  两部分加起来是 32M。

申请内存的过程

从 Producer 发送流程的第6步中可以看到会把消息放入 accumulator中, 即调用 accumulator.append() 追加, 然后把消息封装成一个个Batch 进行发送, 然后去申请内存(free.allocate())

image.png

image.png

(1)如果申请的内存大小超过了整个缓存池的大小,则抛异常出来

image.png

(2)对整个方法加锁:

this.lock.lock();

(3)如果申请的大小是每个 recordBatch 的大小(16K),并且已分配内存不为空,则直接取出来一个返回。

if (size == poolableSize && !this.free.isEmpty())
    return this.free.pollFirst();

image.png

(4)如果整个内存池大小比要申请的内存大小大 (this.availableMemory + freeListSize >= size),则直接从可用内存(即上图粉色的区域)申请一块内存。并且可用内存要去掉申请的那一块内存。

image.png

Sender线程

image.png

Producer之参数调优

     我们知道在 Kafka 实际使用中,Producer 端既要保证吞吐量,又要确保无消息丢失,一些核心参数的配置就显得至关重要。接下来我们就来看看生产端都有哪些重要的参数,及调优建议。

acks

参数说明:对于 Kafka Producer 来说是一个非常重要的参数,它表示指定分区中成功写入消息的副本数量,是 Kafka 生产端消息的持久性的保证, 详细可以查看

max.request.size

参数说明:这个参数对于 Kafka Producer 也比较重要, 表示生产端能够发送的最大消息大小,默认值为1048576(1M)

  调优建议:这个配置对于生产环境来说有点小, **为了避免因消息过大导致发送失败,生产环境建议适当调大,比如可以调到10485760(10M)** 。

retries

参数说明:表示生产端消息发送失败时的重试次数,默认值为0,即不重试。 这个参数一般是为了解决因系统瞬时故障导致的消息发送失败,比如网络抖动、Leader 选举及重选举,其中瞬时的 Leader 重选举是比较常见的。因此这个参数的设置对于 Kafka Producer 就显得非常重要

 调优建议:这里建议设置为一个大于0的值,比如3次。

retry.backoff.ms

参数说明:**设定两次重试之间的时间间隔,避免无效的频繁重试,默认值为100, ****主要跟 retries 配合使用, **在配置 retries 和 retry.backoff.ms 之前,最好先估算一下可能的异常恢复时间,需要设定总的重试时间要大于异常恢复时间,避免生产者过早的放弃重试。

connections.max.idele.ms

参数说明:主要用来判断多久之后关闭空闲的链接,默认值540000(ms)即9分钟。

compression.type

参数说明: 该参数表示生产端是否要对消息进行压缩,默认值为不压缩(none)。 压缩可以显著减少网络IO传输、磁盘IO以及磁盘空间,从而提升整体吞吐量,但也是以牺牲CPU开销为代价的。

 调优建议:出于提升吞吐量的考虑,建议在生产端对消息进行压缩。**对于Kafka来说,综合考虑吞吐量与压缩比,建议选择lz4压缩。如果追求最高的压缩比则推荐zstd压缩。**

buffer.memory

参数说明: 该参数表示生产端消息缓冲池或缓冲区的大小,默认值为即33554432(32M) 。这个参数基本可以认为是 Producer 程序所使用的内存大小。

调优建议:通常我们应尽量保证生产端整体吞吐量,建议适当调大该参数,也意味着生产客户端会占用更多的内存。

batch.size

参数说明: 该参数表示发送到缓冲区中的消息会被封装成一个一个的Batch,分批次的发送到 Broker 端,默认值为16KB。 因此减小 batch 大小有利于降低消息延时,增加 batch 大小有利于提升吞吐量。

 调优建议:通常合理调大该参数值,能够显著提升生产端吞吐量,比如可以调整到32KB,调大也意味着消息会有相对较大的延时。

linger.ms

参数说明: 该参数表示用来控制 Batch 最大的空闲时间,超过该时间的 Batch 也会自动被发送到 Broker 端。 实际情况中, 这是吞吐量与延时之间的权衡。默认值为0,表示消息需要被立即发送,无需关系 batch 是否被填满。

  调优建议:通常为了减少请求次数、提升整体吞吐量,建议设置一个大于0的值,比如设置为100,此时会在负载低的情况下带来100ms的延时。  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/665850.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

二维码带参数跳转小程序页面,小程序获取参数。

功能介绍 普通链接二维码,是指开发者使用工具对网页链接进行编码后生成的二维码。 线下商户可不需更换线下二维码,在小程序后台完成配置后,即可在用户扫描普通链接二维码时打开小程序,使用小程序的功能。 对于普通链接二维码&a…

5.4.2 网络地址转换NAT

5.4.2 网络地址转换NAT 我们知道为了缓解IPv4地址紧缺的问题,相继出现了一系列缓解地址耗尽的解决方案,比如通过子网划分(5.2.8 子网编址)实现网络地址在多个物理网络之间的复用,通过无分类编址(5.2.9 无分…

纳斯达克大屏宣传品牌的价值何在?媒介易解析背后的优势!

在当今竞争激烈的商业环境中,企业需要不断寻找创新的方式来宣传和推广品牌。而纳斯达克大屏作为全球最具规模和影响力的数字广告媒体之一,其庞大的电子屏幕成为企业宣传品牌和增加曝光度的理想平台。为什么企业选择在纳斯达克大屏宣传品牌?一…

SpringBoot中接收POST参数的几种方式

今天在做一个vue前后端分离项目的过程中,踩了一个坑,记录一下 前端如下: 用户名字段:username 密码字段:password 提交后,发现后端怎么也收不到参数,总结如下: 常见的接收post参…

【云计算 | Azure】微软 Azure 基础解析(九)Azure 标识、身份管理、Azure AD 的功能与用途

本系列博文还在更新中,收录在专栏:「Azure探秘:构建云计算世界」 专栏中。 本系列文章列表如下: 【Azure】微软 Azure 基础解析(三)云计算运营中的 CapEx 与 OpEx,如何区分 CapEx 与 OpEx 【A…

12. WebGPU 矩阵数学

在最近的 3 篇文章中,介绍了如何平移、旋转和缩放顶点位置。平移、旋转和缩放都被认为是一种变换。这些变换中的每一个都需要对着色器进行修改,并且 3 个转换中的每一个都依赖于顺序。 在之前的示例中,先缩放,然后旋转&#xff0…

第十章 STM32+ESP8266接入机智云 实现小型IOT智能家居项目

前言 最近有不少小伙伴私信留言,想要我推出一章能够通过APP进行远程控制并获取传感器信息的实验教程。说实话在嵌入式毕设里边,这算是中等偏上水平的了。刚好我也有兴趣写写。全篇4700多字,我写的很详细,按着文章一步一步操作即可…

arm64架构的linux中断分析(二)

文章目录 3. GICv3中断控制器3.1 GICv3中断控制器设备树3.2 GICv3中断控制器驱动 3. GICv3中断控制器 gic在soc中的位置如下: GICv3(Generic Interrupt Controller Version 3)是一种基于ARM Cortex-A架构的中断控制器,它提供了…

联邦元学习综述

联邦元学习综述 张传尧1,2, 司世景1, 王健宗1,肖京1 1 平安科技(深圳)有限公司,广东 深圳 518063 2 中国科学技术大学,安徽 合肥 230026 摘要:随着移动设备的普及,海量的数据在不断产生。数据隐…

express的使用(六) 中间件的理解

原文链接 express的使用(六) 中间件的理解") 不要脸的求关注,希望能让大家批评我的不足点,方便学习,一键三连最好不过了~另外,乌鸦玩心之钢是真的爽! 看前提示 本篇主要讲的是关于express中间件的一些基础概念…

商品支付金额篡改测试-业务安全测试实操(16)

商品支付金额篡改测试,商品订购数量篡改测试 商品支付金额篡改测试 测试原理和方法 电商类网站在业务流程整个环节,需要对业务数据的完整性和一致性进行保护,特别是确保在用户客户端与服务、业务系统接口之间的数据传输的一致性,通常在订购类交易流程中,容易出现服务器端未…

如何在纺织服装行业运用IPD?

纺织服装行业是我国的传统支柱产业,对促进国民经济发展、解决就业、增加国民收入、促进社会和谐发展等方面具有十分重要的意义。纺织服装行业属于劳动密集型产业,产业链上下游关联度较大。产业链上游原材料主要包括棉花、麻、蚕茧丝等天然纤维以及人造纤…

艾默生CE4001S2T2B4控制模块

​ 艾默生CE4001S2T2B4控制模块 艾默生CE4001S2T2B4控制模块 集散控制系统简称dcs,也可直译为“分散控制系统”或“分布式计算机控制系统”。它采用控制分散、操作和管理集中的基本设计思想,采用多层分级、合作自治的结构形式。其主要特征是它的集中管理…

spring boot 校运会赛事管理系统-计算机毕设 附源码87890

spring boot校运会赛事管理系统 摘 要 科技进步的飞速发展引起人们日常生活的巨大变化,电子信息技术的飞速发展使得电子信息技术的各个领域的应用水平得到普及和应用。信息时代的到来已成为不可阻挡的时尚潮流,人类发展的历史正进入一个新时代。在现实运…

TC8:ICMPv4_TYPE_18-22

ICMPv4_TYPE_18: Send ICMP Destination Unreachable for unknown protocol 目的 主机收到IP Header中协议字段不支持的IP数据包时,回复ICMP目的不可达报文(未知协议) 协议不可达报文的Type为3,Code为2 测试步骤 Tester:发送一条IP报文,其中协议字段值为无效值DUT:发送…

MySQL数据库基础 14

第十四章 视图 1. 常见的数据库对象2. 视图概述2.1 为什么使用视图?2.2 视图的理解 3. 创建视图3.1 创建单表视图3.2 创建多表联合视图3.3 基于视图创建视图 4. 查看视图5. 更新视图的数据5.1 一般情况5.2 不可更新的视图 6. 修改、删除视图6.1 修改视图6.2 删除视图…

软考高级系统架构设计师(三) 基础知识之操作系统2(分页/分段/段页存储)

目录 存储管理 页式存储 段式存储 段页式存储 存储管理 存储管理的主要目的:解决多个用户共同使用主存的问题(怎么分配内存??) 主要包括分区存储管理、分页存储管理、分段存储器管理、段页式存储管理以及虚拟存储…

eNSP中对NAT的配置(网络地址转换)

地址转换是把局域网的私有地址转换为公有地址,如果想要上外网,而公有地址是有限的,则需要将私有地址转换成公有地址,用端口号进行区分。 一.基本原理 NAT是改变IP报文中的源或目的地址的一种处理方式;让局域网用户…

清微智能TX5368A与飞桨完成Ⅱ级兼容性测试,助力全行业智能化升级

近日,清微智能的高性能视觉芯片TX5368A与飞桨完成Ⅱ级兼容性测试(基于Paddle2ONNX工具)。测试结果显示,双方兼容性表现良好,整体运行稳定。这是清微智能加入“硬件生态共创计划”后取得的又一阶段性成果。 产品兼容性证…

差分信号隔离放大变送模块光电转换器0-10mV/0-20mV/0-±10mV/0-±20mV转0-5V/0-10V/4-20mA

概述: DIN11 IPO 压力应变桥信号处理系列隔离放大器是一种将差分输入信号隔离放大、转换成按比例输出的直流信号导轨安装变送模块。产品广泛应用在电力、远程监控、仪器仪表、医疗设备、工业自控等行业。此系列模块内部嵌入了一个高效微功率的电源,向输…