Kakfa - Producer机制原理与调优

news2025/1/10 23:26:05

Producer是Kakfa模型中生产者组件,也就是Kafka架构中数据的生产来源,虽然其整体是比较简单的组件,但依然有很多细节需要细品一番。比如Kafka的Producer实现原理是什么,怎么发送的消息?IO通讯模型是什么?在实际工作中,怎么调优来实现高效性?

简单的生产者程序:

一、客户端初始化  KafkaProducer

new KafkaProducer() 是Producer初始化过程,比如Interceptor、Serializer、Partitioner、RecordAccumulator等。当我们使用KafkaProducer发送消息的时候,消息会经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner),最后会暂存到消息收集器(RecordAccumulator)中,最终读取按批次发送。

以下跟踪比较核心的机制流程:

1、 初始化RecordAccumulator记录累加器

简单介绍:RecordAccumulator可以理解为Producer发送数据缓冲区,Producer数据发送时并不会直接连接Broker后,一条一条的发送,而是会将数据(Record)放入RecordAccumulator中按批次发送。

2、初始化Sender的Iothread,在Producer在初始化过程中,会额外的创建一个ioThread。

二、Send方法

到此位置Kafka只是做了一些初始化的工作,没有与kafka集群建立连接,更没有相关元数据信息。那继续看send中的doSend方法。

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;

        try {
            
            try {
               // waitOnMetadata更新元数据
                clusterAndWaitTime = this.waitOnMetadata(record.topic(),record.partition(), this.maxBlockTimeMs);
            } catch (KafkaException var19) {
             
            Cluster cluster = clusterAndWaitTime.cluster;
            ........
            byte[] serializedKey;
            try {
            // 序列化
             serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException var18) {
            ........

            byte[] serializedValue;
            try {
                serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } 
            // 获取元数据中partition信息
            int partition = this.partition(record, serializedKey, serializedValue, cluster);
            // ..........
            // 数据append到accumulator中
            RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);
           ..........

            return result.future;

1、WaitonMetadat元数据更新

方法内部会优先判断当前Cluster是否存在元数据partitions,如果不存在意味着还没有建立连接获取元数据,此时它会wakeup唤醒sender线程。

注意:此时Cluster并不是完全时空的,它已经有指定的Node列表信息。

在早期版本的时候元数据是存储在zookeeper中的, 元数据指的是集群中分区信息、节点信息、以及节点、主题、分区的映射关系等。在生产者启动的时候没有元数据的支撑,是无法进行数据的发送的,等于瞎子。但是zookeeper存储元数据,在并发场景下会对zookeeper产生网卡压力,那就意味着要保障Kakfa可靠性的前提就要保障zookeeper的可靠性。

所以在1.0版本之后,Kafka将元数据维护在了Broker节点中。Producer可以通过Borker获取元数据,减少对zookeeper的依赖。只有一些核心的内容交给zookeeper做分布式协调。

2、Sender线程run方法

Sender线程中run方法,一个while(runing),这是一中Loop过程一种常见的响应式编程方式,比如Redis服务中也是一种EventLoop事件轮询过程。

其内部核心方法NetWorkClient.poll实现了客户端连接、数据发送、事件处理工作。

metadataUpdater.maybeUpdate方法在第一次被执行时,因为没有元数据节点信息,会执行this.maybeUpdate(now, node)方法,方法内部实现了initiateConnect方法用于客户端建立连接,其底层就是使用的Java Nio的Selector多路复用器。

建立连接之后,nioSelector.select()等待事件响应。

之后触发handleCompletedReceives处理器进行元数据同步过程。

注意: 在完成元数据更新以后,metadata.update会调用 this.notifyAll(),唤醒阻塞的main线程,进行数据发送工作。

到此为止主线程waitOnMetadata方法完成元数据的更新。

之后main就开始处理Serializer序列化,获取partition元数据信息,以及数据发送工作。

3、RecordAccumulator 记录累加器

生产者在发送数据时,并不是建立连接后每消息发送的,而是会将消息按批次发送。RecordAccumulator 对象中batches会为每一个TopicPartition维护一个双端队列。用于缓存record数据。

ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches

TopicPartition 主题分区:缓冲区按照主题分为不同的双端队列

Deque<ProducerBatch> 双端队列,ProducerBatch:一批次的数据(多个数据,默认容量16k)

结构如下:

生产者在往batches中添加数据时,使用了Sychronized,所以Producer在多线程场景下是线程安全的。

为什么要有RecordAccumulator ?

RecordAccumulator的主要作用是暂存Main Thread发送过来的消息,然后Sender Thread就可以从RecordAccumulator中批量的获取到消息,减少单个消息获取的请求次数,减少网卡IO压力,提升性能效率。

相关参数配置以及调优点:

1、RecordAccumulator buffer.memory默认大小32mb

指每一个new KafkaProducer中RecordAccumulator的batches所有承载的最大buffer.memory=32mb。

设置方式:properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 3210241024);

如果RecordAccumulator 缓存空间满时,会进行阻塞,等待数据被消费,如果指定时间内消息没有发送除去,即仍然是满状态,则抛出异常,默认60s。

设置方式:properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60*1000);

优化点: 根据业务需求,如果TopicPartition较多,而数据量很大时,这是及时单个TopicPartition中batche很少,可能总的容量也超过32mb,这时可以扩大buffer_memery大小。

2、Kafka中单个batche大小默认为16k。

指每个batche大小为16k。batche用于存储数据Record。

如果record < 16k,则batche可以存储多个数据,此时batche空间是会被重复利用的。

如果record > 16k,则当前record会额外申请存储空间,使用完后销毁。

优化点:batche大小需要根据业务评估,不要有过多大record存在,确保每一个batche可以容纳record,尽量减少内存空间的频繁申请和销毁,以及内存碎片化。

 3、同步阻塞和非阻塞的选择

RecordAccumulator用于支持分批次发送数据。在KafkaProducer中send方法是异步接口,通过 send.get()方法可以使其阻塞,等待数据返回。

实现同步的发送数据,需要等待kafka接收了record后响应,producer才会进行下一个record发送。此时虽然会有更高一致性,但RecordAccumulator就失去了意义

非阻塞send情况下,当生产和消费端IO不对称时,可以通过LINGRE_MS_CONFG 30 来要求sender线程每次拉取RecordAccumulator中数据时等待一段时间再拉取,尽量确保按批次拉取,减少更多的网络IO。

设置方式:properties.put(ProducerConfig.LINGRE_MS_C0NFG , 0);

继续内容分析

到此当Main线程将数据append到RecordAccumulator容器后,其核心的工作就结束了,此时它也会调用sender.wakeup,告知已经有数据需要处理了,并确保sender线程不会select阻塞住。

Sender线程是一个Loop过程,在发送数据过程中,会从RecordAccumulator中拉取批次数据进行打包发送,并不是一个个batche发送。默认封装的包大小为1mb。

设置方式:pp.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG , String.valueOf(1 * 1024 * 1024))

Sender线程在真正发送数据前,还额外存储了Request数据到InFilgntRequest(飞行中的包),InFilgntRequest 默认大小为5,意思是指生产者向kafka发送5个包request后,都没有回应时,则停止发送变成阻塞状态。

这种设计在同步发送过程没有作用的,因为同步过程是每请求返回的。

SEND_BUFFER_CONFIG 发送缓冲区配置、RECEIVE_BUFFER_CONFIG 接收缓冲区配置,这两个就是IO层的缓冲区配置了,不同的操作系统可能不一样。设置成-1,代表默认使用系统分配大小。

pp.setProperty(ProducerConfig.SEND_BUFFER_CONFIG , String.valueOf(32 * 1024));
pp.setProperty(ProducerConfig.RECEIVE_BUFFER_CONFIG , String.valueOf(32 * 1024));

查看内核默认配置:

到此Producer整个数据发送流程机制就清楚了,Ack的设定涉及到Broker数据同步和Consumer消费状态,这块单独再进行分析。

总结一下:

1、Producer的实现是由Main线程和Sender线程组合完成的。

Main线程核心完成了数据的输入、Producer初始化和数据append到RecordAccumulator工作,具体的元数据的更新、数据发送等IO操作都是都Sender线程完成。

Sender线程工作模式是中间件中比较常见的响应式编程模式。其在Loop过程中进行客户端连接、元数据更新、数据打包发送等工作。

2、Kakfa中IO操作封装了Java中Nio的实现(Selector),底层是多路复用器的实现,而不是netty。

3、Producer发送数据过程并不是简单的一条一条数据发送,其内部封装RecordAccumulator、Batche、Request包,可以实现按批次发送数据,减少IO次数。同时结合FilghtRequest飞行中请求大小限制,确保kafka未正常响应时,抛出异常防止数据丢失。在开发过程中,可以通过调整参数,来达到优化目的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1017399.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

对Docker的认识和总结

Docker简介 Docker 是一个开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可移植的镜像中&#xff0c;然后发布到任何流行的 Linux或Windows操作系统的机器上&#xff0c;也可以实现虚拟化。容器是完全使用沙箱机制&#xff0c;相互之间不会有任何接…

数据结构入门 — 二叉树的概念、性质及结构

本文属于数据结构专栏文章&#xff0c;适合数据结构入门者学习&#xff0c;涵盖数据结构基础的知识和内容体系&#xff0c;文章在介绍数据结构时会配合上动图演示&#xff0c;方便初学者在学习数据结构时理解和学习&#xff0c;了解数据结构系列专栏点击下方链接。 博客主页&am…

学习记忆——英语——字母编码

字母编码表 A&#xff1a;苹果 &#xff1b; B&#xff1a;一支笔或者小男孩boy &#xff1b; C&#xff1a;月亮或者镰刀 &#xff1b; D&#xff1a;笛子或者弟弟或者狗dog &#xff1b; E&#xff1a;大白鹅 &#xff1b; F&#xff1a;斧头 &#xff1b; G&#xff1a;鸽子…

Python:安装Flask web框架hello world示例

安装easy_install pip install distribute 安装pip easy_install pip 安装 virtualenv pip install virtualenv 激活Flask pip install Flask 创建web页面demo.py from flask import Flask app Flask(__name__)app.route(/) def hello_world():return Hello World! 2023if _…

Spring注解家族介绍: @RequestMapping

前言&#xff1a; 今天我们来介绍RequestMapping这个注解&#xff0c;这个注解的内容相对来讲比较少&#xff0c;篇幅会比较短。 目录 前言&#xff1a; RequestMapping 应用场景&#xff1a; 总结&#xff1a; RequestMapping RequestMapping 是一个用于映射 HTTP 请求…

[Linux打怪升级之路]-缓冲区

前言 作者&#xff1a;小蜗牛向前冲 名言&#xff1a;我可以接受失败&#xff0c;但我不能接受放弃 如果觉的博主的文章还不错的话&#xff0c;还请点赞&#xff0c;收藏&#xff0c;关注&#x1f440;支持博主。如果发现有问题的地方欢迎❀大家在评论区指正 本期学习目标&…

SpringCloud Ribbon--负载均衡 原理及应用实例

&#x1f600;前言 本篇博文是关于SpringCloud Ribbon的基本介绍&#xff0c;希望你能够喜欢 &#x1f3e0;个人主页&#xff1a;晨犀主页 &#x1f9d1;个人简介&#xff1a;大家好&#xff0c;我是晨犀&#xff0c;希望我的文章可以帮助到大家&#xff0c;您的满意是我的动力…

深入理解线程安全

引言&#xff1a; 在多线程编程中&#xff0c;线程安全是一个至关重要的概念。线程安全可能到导致数据不一致&#xff0c;应用程序崩溃和其他不可预测的后果。本文将深入探讨线程安全问题的根本原因&#xff0c;并通过Java代码示例演示如何解决这些问题。 线程安全的根本原因 …

element plus Infinite Scroll 无限滚动

欢迎关注我的公众号&#xff1a;夜说猫&#xff0c;让一个贫穷的程序员不靠打代码也能吃饭~ element plus官网中&#xff0c;Infinite Scroll示例使用的是数字&#xff0c;在实际项目运用中&#xff0c;我们更多的是使用json数组进行渲染&#xff0c;所以我们改写v-infinite-sc…

Visual Studio2019报错

1- Visual Studio2019报错 错误 MSB8036 找不到 Windows SDK 版本 10.0.19041.0的解决方法 小伙伴们在更新到Visual Studio2019后编译项目时可能遇到过这个错误&#xff1a;“ 错误 MSB8036 找不到 Windows SDK 版本 10.0.19041.0的解决方法”&#xff0c;但是我们明明安装了该…

网络安全攻防对抗之隐藏通信隧道技术整理

完成内网信息收集工作后&#xff0c;渗透测试人员需要判断流量是否出得去、进得来。隐藏通信隧道技术常用于在访问受限的网络环境中追踪数据流向和在非受信任的网络中实现安全的数据传输。 一、隐藏通信隧道基础知识 &#xff08;一&#xff09;隐藏通信隧道概述 一般的网络通…

Python图像融合处理和 ROI 区域绘制基础

文章目录 一、图像融合二、图像 ROI 区域定位三、图像属性3.1 shape3.2 size3.3 dtype四、图像通道分离及合并4.1、split()函数4.2 merge()函数五、图像类型转换一、图像融合 图像融合通常是指多张图像的信息进行融合,从而获得信息更丰富的结果,能够帮助人们观察或计算机处理…

微服务保护-隔离

个人名片&#xff1a; 博主&#xff1a;酒徒ᝰ. 个人简介&#xff1a;沉醉在酒中&#xff0c;借着一股酒劲&#xff0c;去拼搏一个未来。 本篇励志&#xff1a;三人行&#xff0c;必有我师焉。 本项目基于B站黑马程序员Java《SpringCloud微服务技术栈》&#xff0c;SpringCloud…

SOAP WebService 发布服务成功,但是访问404

原因 我这里是出在路由问题&#xff0c;因为一般我们都会配置WebServiceConfig&#xff0c;WebServiceConfig里又会定义ServletRegistrationBean&#xff0c;用于将一个Servlet注册到Web应用程序中&#xff0c;这里会配置上路径&#xff0c;如下&#xff1a; 但是项目有可能在…

再战SDRAM与资料整理。

总之只要阅读操作手册&#xff0c;按照时序来&#xff0c;完全不难&#xff01; 器件记录&#xff1a; 小梅哥AC620上SDRAM&#xff1a;M12L2561616A-6TG2T 其的存储空间为16M*16256MB&#xff0c;第二行的数字则与其速度等级有关&#xff1b;其分为&#xff1a; 4bank*16bit…

NLP(6)--Diffusion Model

目录 一、Flow-Based General Model 1、概述 2、函数映射关系 3、Coupling Layer 4、Glow 二、Diffusion Model 1、概述 2、前向过程 3、反向过程 4、训练获得噪声估计模型 5、生成图片 三、马尔科夫链 一、Flow-Based General Model 1、概述 Flow-Based General…

C 通过宏定义重定义malloc - free,预防内存泄露

系列文章目录 C模版基础 文章目录 目录 代码地址 相关说明 使用案例 代码地址 GitHub - CHENLitterWhite/CPPWheel: CPP自封装的库 /* * 作者: 干饭小白 * 时间: 2023-09-25 16:00:00:00 * * 说明: * 只能检测 malloc 和 free,无法检测 new delete */ #pra…

[Linux入门]---Linux指令②

文章目录 Linux系统常用指令1.man指令2.echo3.cp指令&#xff08;重要&#xff09;4.mv指令&#xff08;重要&#xff09;&#xff1a;5.alias指令6.cat指令7.more指令8.less指令&#xff08;重要&#xff09;9.head指令10.tail指令11.时间相关的指令1.在显示方面2.在设定时间方…

Redis环境配置

【Redis解压即可】链接&#xff1a;https://pan.baidu.com/s/1y4xVLF8-8PI8qrczbxde9w?pwd0122 提取码&#xff1a;0122 【Redis桌面工具】 链接&#xff1a;https://pan.baidu.com/s/1IlsUy9sMfh95dQPeeM_1Qg?pwd0122 提取码&#xff1a;0122 Redis安装步骤 1.先打开Redis…

OpenAI开发系列(二):大语言模型发展史及Transformer架构详解

全文共1.8w余字&#xff0c;预计阅读时间约60分钟 | 满满干货&#xff0c;建议收藏&#xff01; 一、介绍 在2020年秋季&#xff0c;GPT-3因其在社交媒体上病毒式的传播而引发了广泛关注。这款拥有超过1.75亿参数和每秒运行成本达到100万美元的大型语言模型&#xff08;Large …