Kafka-生产者

news2025/1/10 10:26:49

Kafka在实际应用中,经常被用作高性能、可扩展的消息中间件。

Kafka自定义了一套网络协议,只要遵守这套协议的格式,就可以向Kafka发送消息,也可以从Kafka中拉取消息。

在实践生产过程中,一套API封装良好、灵活易用的客户端可以避免开发人员重复劳动,提高开发效率,也可以提高程序的健壮性和可靠性。

Kafka提供了Java版本的生产者的实现——KafkaProducer,使用KafkaProducer的API可以轻松实现同步/异步发送消息、批量发送、超时重发等复杂的功能,在业务模块向Kafka写入消息时,KafkaProducer就显得必不可少。

现在,Kafka的爱好者已经使用多种语言(诸如C++、Java、Python、Go等)实现了Kafka的客户端。

如果读者使用其他语言,可以到Kafka官方网站的wiki(https://cwiki.apache.org/confluence/display/KAFKA/Clients)查找相关资料。

在Kafka core模块的kafka.producer包中,新版本的生产者客户端实现KafkaProducer(Java实现)在Kafka clients模块的org.apache.kafka.clients.producer包中。

KafkaProducer分析

在图中简略描述了KafkaProducer发送消息的整个流程。

在这里插入图片描述
下面简述图中每个步骤的操作:

  1. Producerlnterceptors对消息进行拦截。
  2. Serializer对消息的key和value进行序列化。
  3. Partitioner为消息选择合适的Partition。
  4. RecordAccumulator收集消息,实现批量发送。
  5. Sender从RecordAccumulator获取消息。
  6. 构造ClientRequest。
  7. 将ClientRequest交给NetworkClient,准备发送。
  8. NetworkClient将请求放入KafkaChannel的缓存。
  9. 执行网络I/O,发送请求。
  10. 收到响应,调用ClientRequest的回调函数。
  11. 调用RecordBatch的回调函数,最终调用每个消息上注册的回调函数。

消息发送的过程中,涉及两个线程协同工作。主线程首先将业务数据封装成ProducerRecord对象,之后调用send方法将消息放入RecordAccumulator(消息收集器,也可以理解为主线程与Sender线程之间的缓冲区)中暂存。

Sender线程负责将消息信息构成请求,并最终执行网络IVO的线程,它从RecordAccumulator中取出消息并批量发送出去。

需要注意的是,KafkaProducer是线程安全的,多个线程间可以共享使用同一个KafkaProducer对象。

KafkaProducer实现了Producer接口,在Producer接口中定义KafkaProducer对外提供的API,分为四类方法。

  • send方法:发送消息,实际是将消息放入RecordAccumulator暂存,等待发送。
  • flush方法:刷新操作,等待RecordAccumulator中所有消息发送完成,在刷新完成之前会阻塞调用的线程。
  • partitionsFor方法:在KafkaProducer中维护了一个Metadata对象用于存储Kafka集群的元数据,Metadata中的元数据会定期更新。partitionsFor方法负责从Metadata中获取指定Topic中的分区信息。
  • close方法:关闭此Producer对象,主要操作是设置close标志,等待RecordAccumulator中的消息清空,关闭Sender线程。
    还有一个metrics方法,用于记录统计信息,与消息发送的流程无关,我们不做详细分析。
    了解了Producer接口的功能之后,我们下面就来分析KafkaProducer的具体实现。

首先,介绍KafkaProducer中比较重要的字段,在后面分析过程中,会逐个进行分析,如图所示。

在这里插入图片描述

  • PRODUCER_CLIENT_ID_SEQUENCE:clientld的生成器,如果没有明确指定client的Id,则使用字段生成一个ID。
  • clientld:此生产者的唯一标识。
  • partitioner:分区选择器,根据一定的策略,将消息路由到合适的分区。
  • maxRequestSize:消息的最大长度,这个长度包含了消息头、序列化后的key和序列化后的value的长度。
  • totalMemorySize:发送单个消息的缓冲区大小。
  • accumulator:RecordAccumulator,用于收集并缓存消息,等待Sender线程发送。
  • sender:发送消息的Sender任务,实现了Runnable接口,在ioThread线程中执行。
  • ioThread:执行Sender任务发送消息的线程,称为“Sender线程”。
  • compressionType:压缩算法,可选项有none、gzip、snappy、lz4。这是针对RecordAccumulator中多条消息进行的压缩,所以消息越多,压缩效果越好。
  • keySerializer:key的序列化器。
  • valueSerializer:value的序列化器。
  • Metadata metadata:整个Kafka集群的元数据。
  • maxBlockTimeMs:等待更新Kafka集群元数据的最大时长。
  • requestTimeoutMs:消息的超时时间,也就是从消息发送到收到ACK响应的最长时长。
  • interceptors:Producerlnterceptor集合,Producerlnterceptor可以在消息发送之前对其进行拦截或修改;也可以先于用户的Callback,对ACK响应进行预处理。
  • producerConfig:配置对象,使用反射初始化KafkaProducer配置的相对对象。

KafkaProducer构造完成之后,我们来关注KafkaProducer的send方法。图展示了整个send方法的调用流程。

在这里插入图片描述

Producerlnterceptors&Producerlnterceptor

Producerlnterceptors是一个Producerlnterceptor集合,其onSend方法、onAcknowledgement方法、onSendEror方法,实际上是循环调用其封装的Producerlnterceptor集合的对应方法。

Producerlnterceptor对象可以在消息发送之前对其进行拦截或修改,也可以先于用户的Callback,对ACK响应进行预处理。

如果熟悉Java Web开发,可以将其与Filter的功能做类比。

如果要使用自定义Producerlnterceptor类,只要实现Producerlnterceptor接口,创建其对象并添加到Producerlnterceptors中即可。

Producerlnterceptors与ProducerInterceptor之间的关系如图所示。

在这里插入图片描述

Kafka集群元数据

每个Topic中有多个分区,这些分区的Leader副本可以分配在集群中不同的Broker上。

我们站在生产者的角度来看,分区的数量以及Leader副本的分布是动态变化的。

通过简单的示例说明这种动态变化:在运行过程中,Leader副本随时都有可能出现故障进而导致Leader副本的重新选举,新的Leader副本会在其他Broker上继续对外提供服务。

当需要提高某Topic的并行处理消息的能力时,我们可以通过增加其分区的数量来实现。

当然,还有别的方式导致这种动态变化,例如,手动触发“优先副本”选举等。

我们创建的ProducerRecord中只指定了Topic的名称,并未明确指定分区编号。

KafkaProducer要将此消息追加到指定Topic的某个分区的Leader副本中,首先需要知道Topic的分区数量,经过路由后确定目标分区,之后KafkaProducer需要知道目标分区的Leader副本所在服务器的地址、端口等信息,才能建立连接,将消息发送到Kafka中。

因此,在KafkaProducer中维护了Kafka集群的元数据,这些元数据记录了:某个Topic中有哪几个分区,每个分区的Leader副本分配哪个节点上,Follower副本分配哪些节点上,哪些副本在ISR集合中以及这些节点的网络地址、端口。

在KafkaProducer中,使用Node、TopicPartition、PartitionInfo这三个类封装了Kafka集群的相关元数据,其主要字段如图所示。

在这里插入图片描述

  • Node表示集群中的一个节点,Node记录这个节点的host、ip、port等信息。
  • TopicPartition表示某Topic的一个分区,其中的topic字段是Topic的名称,partition字段则此分区在Topic中的分区编号(ID)。
  • PartitionInfo表示一个分区的详细信息。其中topic字段和partition字段的含义与TopicPartition中的相同,除此之外,leader字段记录了Leader副本所在节点的id,replica字段记录了全部副本所在的节点信息,inSyncReplicas字段记录了ISR集合中所有副本所在的节点信息。

通过这三个类的组合,我们可以完整表示出KafkaProducer需要的集群元数据。

这些元数据保存在了Cluster这个类中,并按照不同的映射方式进行存放,方便查询。Cluster类的核心字段如图所示。

在这里插入图片描述

  • nodes:Kafka集群中节点信息列表。
  • nodesById:Brokerld与Node节点之间对应关系,方便按照Brokerld进行索引。
  • partitionsBy TopicPartition:记录了TopicPartition与PartitionInfo的映射关系。
  • partitionsByTopic:记录了Topic名称和Partitionlnfo的映射关系,可以按照Topic名称查询其中全部分区的详细信息。
  • availablePartitionsByTopic:Topic与Partitionlnfo的映射关系,这里的List中存放的分区必须是有Leader副本的Partition,而partitionsByTopic中记录的分区则不一定有Leader副本,因为某些中间状态,例如Leader副本宕机而触发的选举过程中,分区不一定有Leader副本。
  • partitionsByNode:记录了Node与PartitionInfo的映射关系,可以按照节点Id查询其上分布的全部分区的详细信息。

Metadata中封装了Cluster对象,并保存Cluster数据的最后更新时间、版本号(version)、是否需要更新等待信息,如图所示。

在这里插入图片描述

  • topics:记录了当前已知的所有topic,在cluster字段中记录了Topic最新的元数据。
  • version:表示Kafka集群元数据的版本号。Kafka集群元数据每更新成功一次,version字段的值增1。通过新旧版本号的比较,判断集群元数据是否更新完成。
  • metadataExpireMs:每隔多久,更新一次。默认是300×1000,也就是5分种。
  • refreshBackoffMs:两次发出更新Cluster保存的元数据信息的最小时间差,默认为100ms。这是为了防止更新操作过于频繁而造成网络阻塞和增加服务端压力。在Kafka中与重试操作有关的操作中,都有这种“退避(backoff)时间”设计的身影。
  • lastRefreshMs:记录上一次更新元数据的时间戳(也包含更新失败的情况)。
  • lastSuccessfulRefreshMs:上一次成功更新的时间戳。如果每次都成功,则lastSuccessfulRefreshMs、lastRefreshMs相等。 否则,lastRefreshMs>lastSuccessulRefreshMs。
  • cluster:记录Kafka集群的元数据。
  • needUpdate:标识是否强制更新Cluster,这是触发Sender线程更新集群元数据的条件之一。
  • listeners:监听Metadata更新的监听器集合。自定义Metadata监听实现Metadata.Listener.onMetadataUpdate方法即可,在更新Metadata中的cluster字段之前,会通知listener集合中全部Listener对象。
  • needMetadataForAllTopics:是否需要更新全部Topic的元数据,一般情况下,KafkaProducer只维护它用到的Topic的元数据,是集群中全部Topic的子集。

Metadata的方法比较简单,主要是操纵上面的几个字段,这里着重介绍主线程用到的requestUpdate方法和awaitUpdate方法。

requestUpdate()方法将needUpdate字段修改为true,这样当Sender线程运行时会更新Metadata记录的集群元数据,然后返回version字段的值。

awaitUpdate方法主要是通过version版本号来判断元数据是否更新完成,更新未完成则阻塞等待。

在这里插入图片描述

下面回到KafkaProducer.waitOnMetadata方法的分析,它负责触发Kafka集群元数据的更新,并阻塞主线程等待更新完毕。它的主要步骤是:

  1. 检测Metadata中是否包含指定Topic的元数据,若不包含,则将Topic添加到topics集合中,下次更新时会从服务端获取指定Topic的元数据。
  2. 尝试获取Topic中分区的详细信息,失败后会调用requestUpdate)方法设置Metadata.needUpdate字段,并得到当前元数据版本号。
  3. 唤醒Sender线程,由Sender线程更新Metadata中保存的Kafka集群元数据。
  4. 主线程调用awaitUpdate()方法,等待Sender线程完成更新。
  5. 从Metadata中获取指定Topic分区的详细信息(即PartitionInfo集合)。若失败,则回到步骤2继续尝试,若等待时间超时,则抛出异常。

waitOnMetadata()方法的具体实现如下:

在这里插入图片描述

Serializer&Deserializer

客户端发送的消息的key和value都是byte数组,Serializer和Deserializer接口提供了将Java对象序列化(反序列化)为byte数组的功能。在KafkaProducer中,根据配置文件,使用合适的Serializer。

图展示了Serializer和Deserializer接口以及它们的实现类。

在这里插入图片描述
Kafka已经为我们提供了Java基本类型的Serializer实现和Deserializer实现,我们也可以对Java复杂类型的自定义Serializer和Deserializer实现,只要实现Serializer或Deserializer接口即可。

下面简单介绍Serializer,Deserializer是其逆操作。

在Serializer接口中,configure()方法是在执行序列化操作之前的配置,例如,在StringSerializer.configure()方法中会选择合适的编码类型(encoding),默认是UTF-8;IntegerSerializer.configure()方法则是空实现。

serializer方法是真正进行序列化的地方,将传入的Java对象序列化为byte[]。

close方法是在其后的关闭方法,多为空实现。

Partitioner

KafkaProducer.send()方法的下一步操作是选择消息的分区。

在有的应用场景中,由业务逻辑控制每个消息追加到合适的分区中,而有时候业务逻辑并不关心分区的选择。

在KafkaProducer.partition方法中,优先根据ProducerRecord中partition字段指定的序号选择分区,如果ProducerRecord.partition字段没有明确指定分区编号,则通过Partitioner.partition()方法选择Partition。

在这里插入图片描述

Kafka提供了Partitioner接口的一个默认实现——DefaultPartitioner,继承结构如图(左)所示,可以看到上面介绍的ProducerInterceptor接口也继承了Configurable接口。

在创建KafkaProducer时传人的key/value配置项会保存到AbstractConfig的originals字段中,如图(右)所示。AbstractConfig的核心方法是getConfiguredInstance方法,其主要功能是通过反射机制实例化originals字段中指定的类。在前面分析KafkaProducer的构造函数时,也看到过此方法的调用。

DefaultPartitioner.partition方法负责在ProduceRecord中没有明确指定分区编号的时候,为其选择合适的分区:如果消息没有key,会根据counter与Partition个数取模来确定分区编号,count不断递增,确保消息不会都发到同一个Partition里;如果消息有key的话,则对key进行hash(使用的是murmur2这种高效率低碰撞的Hash算法),然后与分区数量取模,来确定key所在的分区达到负载均衡。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1389941.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C语言——详解字符函数和字符串数组(上)

目录 一、strlen的使用和模拟实现 1.strlen()函数的介绍 2.strlen()函数的具体使用 3.strlen函数的注意事项 4.strlen函数的模拟实现 二、strcpy的使用和模拟实现 1.strcpy()函数的介绍 2.strcpy()函数的具体使用 3.strcpy()函数的注意事项 4.strcpy函数的模拟实现 …

js逆向第19例:猿人学第17题天杀的Http2.0

文章目录 一、前言二、定位关键参数三、代码实现四、参考文献一、前言 任务十七:抓取这5页的数字,计算加和并提交结果 题目已经给出来标准答案,而且此题设置为“非常简单”其关键就是HTTP/2.0请求,打开控制台查看请求接口数据如下: 二、定位关键参数 可以看到控制台显示…

计算机毕业设计-----SSH学生请假管理系统

项目介绍 本项目分为管理员、教师、学生三种角色; 教师角色包含以下功能: 教师角色登录,请假查看,审批学生请假,学生管理等功能。 管理员角色包含以下功能: 班级管理,班主任管理,公告管理,留言板管理,学生管理等功能。 学生角色包含以下…

MySQL 中有关 NULL 的三个坑

mysql sum 函数、count 函数,以及 NULL 值条件可能踩的坑。 SELECT SUM(score) FROM person ; nullSELECT COUNT(score) FROM person; 0select id from person where scoreNULL; null 显然,这三条 SQL 语句的执行结果和我们的期望不同&#xf…

一篇文章带你搞懂---全排序

顾得泉:个人主页 个人专栏:《Linux操作系统》 《C/C》 《LeedCode刷题》 键盘敲烂,年薪百万! 全排序(Permutation)是指将一组元素按照一定的顺序进行排列的过程。在计算机科学中,全排序是一…

Centos系统安全设置

1 设置密码复杂度,帐号密码有效期3个月 密码复杂度要求:最小长度8位,至少2位大写字母,1位小写字母,4位数字,1位特殊字符 1)执行备份: #cp -p /etc/login.defs /etc/login.defs_bak…

zotero使用gpt

zotero使用gpt 下载 zotero下载:https://www.zotero.org/download/ 插件下载:https://github.com/MuiseDestiny/zotero-gpt?tabreadme-ov-file 插件安装 zotero中选择 工具->添加组件 选择右上角的齿轮,选择Install add-on from fil…

c++继承和派生(1)

目录 1.含义:从不同层面看的,同一种意思 2. 继承的使用场景 1. 共同之处 2. 迭代更新 3. 代码重构 3. 继承的基本语法 4. 继承了什么 1. 查看占用内存大小 2. 使用vs自带的功能查看类的内存分布 布局: 3. 结果 5. pr…

软件测试|使用Python提取出语句中的人名

简介 在自然语言处理(NLP)中,提取文本中的人名是一项常见的任务。Python作为一种流行的编程语言,拥有强大的NLP库和工具,使我们能够轻松地进行这项任务。在本文中,我们将使用Python示例来演示如何提取文本…

Linux第23步_安装windows下的STM32CubeProgrammer软件和安装 DFU驱动程序

STM32CubeProgrammer软件是通过USB3.0接口烧写系统软件。 STM32CubeProgrammer需要java运行环境,因此需要安装JAVA软件。 Java 运行环境版本必须是 V1.7 及以上,一定要用 64位的,和win系统一致,否则使用 STM32CubeProgrammer烧…

深入解析JavaScript中的函数绑定和命名函数表达式

🧑‍🎓 个人主页:《爱蹦跶的大A阿》 🔥当前正在更新专栏:《VUE》 、《JavaScript保姆级教程》、《krpano》、《krpano中文文档》 ​ ​ ✨ 前言 函数是JavaScript中最重要的组成部分之一。但是函数的this绑定和作用…

Power Query 中常用的数据清洗、转换操作

(一)数据筛选、保留、删除、去重 1、数据筛选 操作步骤 选中列——点击列名右侧箭头——进行筛选 可选操作 勾选文本筛选器:等于/不等于、开头是/开头不是、结尾是/结尾不是、包含/不包含数字筛选器:等于/不等于、大于/大于等于…

harbor https

harbor https部署 准备docker-compose安装https 证书harbor安装访问harbor推镜像到harbor 准备 192.168.112.99,harbor,centos7 192.168.112.3,测试机,centos7 docker版本:docker-ce 20.10.16(部署参考&a…

主流浏览器设置代理IP之搜狗浏览器

给浏览器设置代理IP是目前代理IP的主流使用场景之一,接下来小编就手把手教你如何对搜狗浏览器进行代理IP设置 注:本次使用IP来源于携趣代理平台 搜狗浏览器内设置IP代理 1、首先需要进入浏览器【设置】 2.点击【代理设置】选择【代理服务器设置】然后进…

ITE IT6801FNBX HDMI接收器 芯片

一、物料概述 IT6801FN是一款单端口HDMI接收器,可在HDMI1.4和MHL2.1双模式下工作,完全兼容MHL2.1、HDMI 1.4a、HDMI 1.4a3D和HDCP1.4,还可向后兼容DVI 1.0规格。IT6801FN具有深彩色功能(高达36位),可确保接…

腾讯开源AI工具PhotoMaker,无需训练模型就能保持脸部一致随意换装。

腾讯开源AI工具PhotoMaker,无需训练模型就能保持脸部一致随意换装。 最近,AI文本到图像生成领域取得了显著进展,然而,现有的个性化生成方法已经无法同时满足用户的需求,这次腾讯发布了PhotoMaker框架,可以在…

IaC基础设施即代码:Terraform 通过后端使用 alicloud的OSS 实现资源管理

目录 一、实验 1.环境 2.Windows创建Terraform后端项目 3.Windows实例化Terraform后端项目 3.Windows给Terraform项目添加alicloud阿里云OSS (实现代码与资源分离) 4.Windows给Terraform项目添加封装的模块 5.Terraform通过后端使用 alicloud阿里…

价值7500的在线授权网站源码支持IP+域名+双向授权全开源

PHP授权验证更新系统完整版,一键更新系统,一键卡密生成自助授权功能,域名ip双重验证功能等等 修复盗版检测,确保实时查看盗版 修复在线加密系统,一键加密 授权系统几乎所有的程序都能整合使用,包括您的app和计算机程序…

el-table右固定最后一列显示不全或者是倒数第二列无边框线

问题图片: 解决方式1: >>>.el-table__row td:not(.is-hidden):last-child { border-left:1px solid #EBEEF5; } >>>.el-table__header th:not(.is-hidden):last-child{ border-left:1px solid #EBEEF5; } >>>.el-table__head…

文件压缩完还是过大怎么办?这些参数你设置了吗?

不同的压缩算法对不同类型的文件可能有不同的效果。尝试使用不同的压缩算法、压缩模式、调整压缩工具的参数或者设置输出的文件格式等方法解决问题。下面以嗨格式压缩大师作为操作示范。 方法一:对文件进行分类压缩 将文件按照视频、图片、pdf、word、PPT分类压缩…