Kafka 源码分析——Producer

news2025/2/28 6:21:29

文章目录

  • 前言
  • Producer 整体流程
  • Producer 初始化
  • Producer 发送流程
    • 执行拦截器逻辑
    • 获取集群元数据
    • 序列化
    • 选择分区
    • 消息累加进缓存
    • 消息发送
  • Producer缓冲区
  • Producer 参数调优

前言

在 Kafka 中, 把产生消息的一方称为 Producer 即 生产者,它是 Kafka 的核心组件之一, 也是消息的来源所在。它的主要功能是将客户端的请求打包封装发送到 kafka 集群的某个 Topic 的某个分区上。那么这些生产者产生的消息是怎么传到 Kafka 服务端的呢?

Producer 整体流程

Kafka一条消息发送和消费的流程

image.png

站在源码的核心角度,可以把Producer分成以下几个核心部分:

  1. Producer初始化
  2. Producer发送流程
  3. Producer缓冲区
  4. Producer参数与调优

Producer 初始化

image.png

因为源码中有非常多的一些额外处理,所以解读源码没必要每行都读,只需要根据梳理的主流程找到核心代码进行解读就可以。

设置分区器(partitioner),分区器是支持自定义的

image.png

设置重试时间

重试时间(retryBackoffMs)默认100ms。如果发送消息到broker时抛出异常,且是允许重试的异常,那么就会最大重试retries参数指定的次数,同时retryBackoffMs是重试的间隔。

image.png

设置序列化器

image.png

设置拦截器(interceptors)

image.png

拦截器一般用得不多,可以为消息统一添加字段或统计发送失败成功次数,这些逻辑会拖慢producer的消息发送效率,不推荐生产中使用。

想要实现拦截器,我们需要先实现ProducerInterceptor接口即可,然后在生产者中设置进去即可。

image.png

image.png

image.png
上图的一些设置

  1. 设置最大的消息为多大(maxRequestSize), 默认最大1M, 生产环境可以提高到10M

  2. 设置缓存大小(totalMemorySize) 默认是32M

  3. 设置压缩格式(compressionType)

  4. 初始化RecordAccumulator也就是缓冲区指定为32M

设置缓冲区

image.png

设置消息累加器

因为生产者是通过缓冲的方式发送,所以需要一个消息累加器配合才能完成消息的发送。

image.png

初始化集群元数据(metadata)

image.png

创建Sender线程

image.png

这里还初始化了一个重要的管理网路的组件 NetworkClient

image.png

KafkaThread将Sender设置为守护线程并启动

image.png

Producer 发送流程

执行拦截器逻辑

执行拦截器逻辑,预处理消息,封装 Producer Record

image.png

获取集群元数据

从 Kafka Broker 集群获取集群元数据metadata

image.png

序列化

调用Serializer.serialize()方法进行消息的key/value序列化

image.png

选择分区

调用partition()选择合适的分区策略,给消息体 Producer Record 分配要发送的 topic 分区号

image.png

消息累加进缓存

将消息缓存到RecordAccumulator 收集器中, 最后判断是否要发送。

image.png

消息发送

真正的消息发送是Sender线程来做,并且还要结合缓冲区来处理。这里我们只需要知道发送的条件:缓冲区数据大小达到 batch.size 或者 linger.ms 达到上限。

Producer缓冲区

Kafka生产者的缓冲区,也就是内存池,可以将其类比为连接池(DB, Redis),主要是避免不必要的创建连接的开销,。这样内存池可以对 RecordBatch 做到反复利用,防止引起Full GC问题。

核心就是这段代码:

image.png

image.png

Kafka 内存设计有两部分,可用的内存(未分配的内存,初始的时候是 32M)和已经被分配了的内存,每个小 Batch 是 16K,然后这一个个的 Batch 就可以被反复利用,不需要每次都申请内存, 两部分加起来是 32M。

申请内存的过程

发送流程中会把消息放入 accumulator中,即调用 accumulator.append() 追加, 然后把消息封装成一个个Batch 进行发送,然后去申请内存(free.allocate())。

image.png

  1. 如果申请的内存大小超过了整个缓存池的大小,则抛异常出来。
  2. 如果申请的大小是每个 recordBatch 的大小(16K),并且已分配内存不为空,则直接取出来一个返回。
  3. 如果整个内存池大小比要申请的内存大小大 (this.availableMemory + freeListSize >= size),则直接从可用内存申请一块内存。

Producer 参数调优

在 Kafka 实际使用中,Producer 端既要保证吞吐量,又要确保无消息丢失,一些核心参数的配置就显得至关重要。

acks

参数说明:对于 Kafka Producer 来说是一个非常重要的参数,它表示指定分区中成功写入消息的副本数量,是 Kafka 生产端消息的持久性的保证。

max.request.size

参数说明:这个参数对于 Kafka Producer 也比较重要, 表示生产端能够发送的最大消息大小,默认值为1048576(1M)

调优建议:这个配置对于生产环境来说有点小, 为了避免因消息过大导致发送失败,生产环境建议适当调大,比如可以调到10485760(10M)

retries

参数说明:表示生产端消息发送失败时的重试次数,默认值为0,即不重试。 这个参数一般是为了解决因系统瞬时故障导致的消息发送失败,比如网络抖动、Leader 选举及重选举,其中瞬时的 Leader 重选举是比较常见的。因此这个参数的设置对于 Kafka Producer 就显得非常重要

调优建议:这里建议设置为一个大于0的值,比如3次。

retry.backoff.ms

参数说明:**设定两次重试之间的时间间隔,避免无效的频繁重试,默认值为100, **主要跟 retries 配合使用, 在配置 retries 和 retry.backoff.ms 之前,最好先估算一下可能的异常恢复时间,需要设定总的重试时间要大于异常恢复时间,避免生产者过早的放弃重试。

connections.max.idele.ms

参数说明:主要用来判断多久之后关闭空闲的链接,默认值540000(ms)即9分钟。

compression.type

参数说明: 该参数表示生产端是否要对消息进行压缩,默认值为不压缩(none)。 压缩可以显著减少网络IO传输、磁盘IO以及磁盘空间,从而提升整体吞吐量,但也是以牺牲CPU开销为代价的。

调优建议:出于提升吞吐量的考虑,建议在生产端对消息进行压缩。对于Kafka来说,综合考虑吞吐量与压缩比,建议选择lz4压缩。如果追求最高的压缩比则推荐zstd压缩。

buffer.memory

参数说明: 该参数表示生产端消息缓冲池或缓冲区的大小,默认值为即33554432(32M) 。这个参数基本可以认为是 Producer 程序所使用的内存大小。

调优建议:通常我们应尽量保证生产端整体吞吐量,建议适当调大该参数,也意味着生产客户端会占用更多的内存。

batch.size

参数说明: 该参数表示发送到缓冲区中的消息会被封装成一个一个的Batch,分批次的发送到 Broker 端,默认值为16KB。 因此减小 batch 大小有利于降低消息延时,增加 batch 大小有利于提升吞吐量。

调优建议:通常合理调大该参数值,能够显著提升生产端吞吐量,比如可以调整到32KB,调大也意味着消息会有相对较大的延时。

linger.ms

参数说明: 该参数表示用来控制 Batch 最大的空闲时间,超过该时间的 Batch 也会自动被发送到 Broker 端。 实际情况中, 这是吞吐量与延时之间的权衡。默认值为0,表示消息需要被立即发送,无需关系 batch 是否被填满。

调优建议:通常为了减少请求次数、提升整体吞吐量,建议设置一个大于0的值,比如设置为100,此时会在负载低的情况下带来100ms的延时。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1026980.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

uniapp 微信小程序之隐私协议开发

uniapp 微信小程序之隐私协议开发 官网通知:https://developers.weixin.qq.com/miniprogram/dev/framework/user-privacy/PrivacyAuthorize.html 1、配置 __usePrivacyCheck__: true;位置 manifest.json : "mp-weixin":{"__usePrivacyCh…

Vue基础语法【下】

目录 一、事件处理器 1.事件修饰符 .stop .prevent .capture .self .once 2.按键修饰符 .enter .tab .delete .esc .space .up .down .left .right .ctrl、.alt、.shift、.meta 二、表单赋值与取值 三、自定义组件 1.组件介绍 2.局部组件 3.全局组件 4.组件通…

idea如何关闭项目文件显示的浏览器图标

这里写自定义目录标题 1.idea经常项目文件右上角弹出图标2.setting中Tools 取消勾选浏览器 1.idea经常项目文件右上角弹出图标 2.setting中Tools 取消勾选浏览器

vue的模板语法(下篇)

目录 一.事件处理 二.表单的综合案例 三.组件通信⭐⭐ 3.1 自定义组件 3.2 组件通信之父传子 3.3组件通信之子传父 一.事件处理 Vue通过由点(.)表示的指令后缀来调用修饰符&#xff0c; .stop .prevent .capture .self .once 如下&#xff1a; 阻止单击事件冒泡 <a v-on…

Mybatis学习笔记11 缓存相关

Mybatis学习笔记10 高级映射及延迟加载_biubiubiu0706的博客-CSDN博客 缓存:cache 缓存的作用:通过减少IO的方式,来提高程序的执行效率 Mybatis的缓存:将select语句的查询结果放到缓存(内存)当中,下一次还是这条select语句的话,直接从缓存中取,不再查数据库.一方面是减少了I…

基于Android+OpenCV+CNN+Keras的智能手语数字实时翻译——深度学习算法应用(含Python、ipynb工程源码)+数据集(三)

目录 前言总体设计系统整体结构图系统流程图 运行环境模块实现1. 数据预处理2. 数据增强3. 模型构建4. 模型训练及保存1&#xff09;模型训练2&#xff09;模型保存 5. 模型评估 相关其它博客工程源代码下载其它资料下载 前言 本项目依赖于Keras深度学习模型&#xff0c;旨在对…

JavaWeb学习总结(在IntelliJ IDEA中配置使用Tomcat)

1、配置 ​​​​​​​ 在 Libray 中选 Java 选项&#xff08;也就是安装Tomcat的路径&#xff09; 如果运行时端口被占用可以修改端口 例如&#xff1a;原8080&#xff0c;可改为8081&#xff08;也可修改其他&#xff09; 2、使用Serlvet package com.company;import java…

将近 5 万字讲解 Python Django 框架详细知识点(更新中)

Django 框架基本概述 Django 是一个开源的 Web 应用后端框架&#xff0c;由 Python 编写。它采用了 MVC 的软件设计模式&#xff0c;即模型&#xff08;Model&#xff09;、视图&#xff08;View&#xff09;和控制器&#xff08;Controller&#xff09;。在 Django 框架中&am…

Oracle查询固定时间间隔

获取每一天 SELECT (trunc(to_date(2023-01-01,YYYY-MM-DD), dd) LEVEL -1) as DATA_TIME FROM dual CONNECT BY LEVEL < 3;解释&#xff1a; 这个 SQL 查询语句的目的是生成一个包含三个日期的结果集。查询的结果是从当前日期开始的三个连续日期。让我解释一下查询的各个…

ClickHouse与Elasticsearch比较总结

目录 背景 分布式架构 存储架构 写入链路设计 Elasticsearch 再谈Schemaless 查询架构 计算引擎 数据扫描 再谈高并发 性能测试 日志分析场景 access_log&#xff08;数据量197921836&#xff09; trace_log&#xff08;数据量569816761&#xff09; 官方Ontime测…

爬虫入门基础与Selenium反爬虫策略

目录 一、爬虫入门基础 1、什么是爬虫&#xff1f; 2、爬虫的分类 3、爬虫的基本流程 二、Selenium简介 1、Selenium是什么&#xff1f; 2、Selenium的用途 三、应对反爬虫的Selenium策略 1、使用代理IP 2、模拟用户行为 3、设置合理的请求间隔时间 4、随机化请求参…

社区活跃开发者 Aaron 加入 sCrypt

Aaron&#xff08;周全&#xff09;是资深的 BSV 开发者&#xff0c;前 nChain BSV 基础架构团队成员&#xff0c;也是比特币协会在中国任命的首位技术推广专家。作为 BSV 社区的活跃成员&#xff0c;他多次作为演讲者参与区块链技术会议&#xff0c;开发了 Webot 应用、Witnes…

【完美解决】GitHub连接超时问题 Recv failure: Connection was reset

问题&#xff1a; 已经开了梯子但是在Idea中使用git&#xff08;GitHub&#xff09;还是连接超时Recv failure: Connection was reset。此时需要让git走代理。 解决方案&#xff1a; 1.对右下角网络点击右键 -> 打开网络和Internet设置 2.代理 -> 查看到地址和端口号…

智能生活从这里开始:数字孪生驱动的社区

数字孪生技术&#xff0c;这个近年来备受瞩目的名词&#xff0c;正迅速渗透到社区发展领域&#xff0c;改变着我们居住的方式、管理的方式以及与周围环境互动的方式。它不仅仅是一种概念&#xff0c;更是一种变革&#xff0c;下面我们将探讨数字孪生技术如何推动社区智能化发展…

淘宝分布式文件存储系统( 二 ) -TFS

淘宝分布式文件存储系统( 二 ) ->>TFS 目录 : 大文件存储结构哈希链表的结构文件映射原理及对应的API文件映射头文件的定义 大文件存储结构 : 采用块(block)文件的形式对数据进行存储 , 分成索引块,主块 , 扩展块 。所有的小文件都是存放到主块中的 &#xff0c;扩展块…

湖南湘潭家具3D轮廓扫描测量家居三维数字化外观逆向设计-CASAIM中科广电

随着科技的不断进步&#xff0c;CASAIM三维扫描技术在各个行业中得到了广泛应用&#xff0c;家具行业也不例外。传统的家具设计和展示方式已经无法满足现代消费者的个性化、多元化需求&#xff0c;而三维扫描技术的出现为家具行业带来了新的机遇和可能性。 家具表面有雕刻图案…

Selenium和Requests搭配使用

Selenium和Requests搭配使用 前要1. CDP2. 通过requests控制浏览器2. 1 代码一2. 2 代码2 3. 通过selenium获取cookie, requests携带cookie请求 前要 之前有提过, 用selenium控制本地浏览器, 提高拟人化,但是效率比较低,今天说一种selenium和requests搭配使用的方法 注意: 一定…

企业该如何选择数字化转型工具?_光点科技

随着科技的不断进步和数字化的浪潮席卷全球&#xff0c;企业数字化转型已经成为了保持竞争力和持续增长的关键因素之一。无论企业规模大小&#xff0c;数字化转型都可以提高效率、降低成本、改善客户体验&#xff0c;从而实现更好的业务结果。然而&#xff0c;要成功进行数字化…

Unity云原生分布式运行时

// 元宇宙时代的来临对实时3D引擎提出了诸多要求&#xff0c;Unity作为游戏行业应用最广泛的3D实时内容创作引擎&#xff0c;为应对这些新挑战&#xff0c;提出了Unity云原生分布式运行时的解决方案。LiveVideoStack 2023上海站邀请到Unity中国的解决方案工程师舒润萱&#x…

iPhone辐射超标,发布三年突然禁售了

昨晚 iPhone 15 预售大家抢到了吗&#xff1f; 虽然13日发布会后大家的反应十分冷静&#xff0c;但身体还是很诚实&#xff0c;官网都排到6-7周以后了... 在大伙都争着第一波尝鲜的时候&#xff0c;有一个地方正准备禁售 iPhone 。 不用想肯定是欧盟某个国家啦&#xff0c;这…