深入理解Kafka Producer内部机制

news2025/1/11 23:57:15

总的来说,Kafka Producer是将数据发送到kafka集群的客户端。其组成部分如下图所示:

基本组件:

  • Producer Metadata——管理生产者所需的元数据:集群中的主题和分区、充当分区领导者的代理节点等。
  • Partitioner——计算给定记录的分区。
  • Serializers——记录键和值序列化器。 序列化程序将对象转换为字节数组。
  • Producer Interceptors——可能改变记录的拦截器。
  • Record Accumulator——累积记录并按主题分区将它们分组为批次。
  • Transaction manager——管理事务并维护必要的状态以确保幂等生产。
  • Sender——向 Kafka 集群发送数据的后台线程。

配置Kafka Producer

kafka producer有三个必须指定的参数:

  • bootstrap.servers — 主机/端口对列表,用于建立与 Kafka 集群的初始连接。 格式:“host1:port1,host2:port2,…”
  • key.serializer — 代表实现 org.apache.kafka.common.serialization.Serializer 接口的key序列化器的完全限定类名。
  • value.serializer — 代表实现 org.apache.kafka.common.serialization.Serializer 接口的value序列化器的完全限定类名。

数据发送到kafka流程

Kafka Producer异步发送消息,返回一个Future,代表发送结果。 此外,用户可以选择提供在 Kafka broker确认记录时调用的回调。 虽然它看起来很简单,但在背后却发生了一些事情。

  1. 生产者将消息传递给已配置的拦截器列表。 例如,拦截器可能会改变消息并返回更新版本。
  2. 序列化程序将记录键和值转换为字节数组
  3. 如果没有特别指定,则使用默认或配置的分区程序计算主题分区。
  4. 记录累加器使用配置的压缩算法将消息附加到生产者批次。

此时,消息还在内存中,并没有发送给Kafka broker。 Record Accumulator 按主题和分区对内存中的消息进行分组。

Sender线程将具有相同broker的多个批次作为领导者分组到请求中并发送它们。 此时,消息被发送到Kafka。

发送时间

Kafka Producer 提供配置参数来控制在各个阶段花费的时间:

  • max.block.ms — 等待元数据获取和缓冲区分配的时间
  • linger.ms — 等待发送其他记录的时间
  • retry.backoff.ms——重试失败请求前等待的时间
  • request.timeout.ms — 等待 Kafka broker 响应的时间
  • delivery.timeout.ms — 后来引入,是 KIP-91 的一部分,用于为用户提供有保证的超时上限,而无需调整生产者组件内部结构

数据持久性

用户可以通过 acks 配置参数控制写入 Kafka 的消息的持久性。 允许的值为:

  • 0, producer 不会等待 broker 确认
  • 1,一个生产者只会等待分区领导者写消息,而不需要等待所有的追随者
  • all,生产者将等待所有同步副本确认消息。 这是以延迟成本为代价的,代表了最强大的可用保证。

使用 acks=all 时,对于同步副本有一些细微差别需要澄清。 在 Kafka 方面,两个设置和当前状态会影响行为:

  • Topic replication factor
  • min.insync.replicas 设置
  • 当前同步副本的数量,包括领导者本身。

min.insync.replicas 指定 acks=all 请求的同步副本的最小阈值。 如果不能满足这个要求,Broker 会拒绝 producer 的请求,甚至不尝试写等待确认。 下表说明了可能的情况。

在瞬时故障期间,同步副本数可能低于副本总数,但只要它大于或等于 min.insync.replicas——带有 acks=all 的请求就会成功。

用户可以通过重新发送失败的请求来减轻暂时性故障并增加持久性。 这可以通过重试(默认 MAX_INT)和 delivery.timeout.ms(默认 120000)设置来实现。 重试可能会导致消息重复并改变消息的顺序。 这些副作用可以通过设置 enable.idempotence=true 来减轻,但它是以降低吞吐量为代价的。

分区

主题中的消息被组织成分区。 用户可以通过消息键或可插入的 ProducerPartitioner 实现来控制分区分配。 Partitioner 可以使用 partitioner.class 配置来设置,它应该是一个实现 org.apache.kafka.clients.producer.Partitioner 接口的完全限定类名。

Kafka 提供了三种开箱即用的实现:DefaultPartitioner、RoundRobinPartitioner 和 UniformStickyPartitioner。

DefaultPartitioner——如果消息键为空——使用当前分区,并在下一批中更改。 对于非空键,它使用以下公式计算:murmur2hash(key) % total nr of topic partitions。

RoundRobinPartitioner — 忽略消息键,以循环方式在所有活动分区之间平均分配消息。 如果分区有一个指定的代理作为领导者,则分区被认为是活跃的。

UniformStickyPartitioner——忽略消息键,使用当前分区,并在下一批更改分区。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/76394.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Pygame入门-游戏代码结构及背景轮播、声音音效及图像动态效果

一、游戏代码结构 代码结构是代码的组织方式,也是游戏编程的思考框架。pygame官方文档中给出了以下建议,将游戏代码结构分为以下六个部分。 1导入游戏模块。 2资源处理类;定义一些类来处理最基本的资源,包括加载图像和声音,连接和…

C#(csharp)这门语言的优势在哪?

确实有不少人因为unity入坑C#,unity刚开始使用的语言也有很多,比如Unity(一种Java的扩展)或者Boo(一种受Python启发的语言),但C#坚持至今甚至成为unity使用主流,C#当然有它的优点。 …

实测 ChatGPT 编程效果被其发现,这波我先站队 Stack Overflow

本文对 ChatGPT 解答编程问题的情况进行了测试。测试了不同难度的问题,并对 ChatGPT 的回答结果进行了鉴别。测试结果表明,ChatGPT 在解答简单的编程问题时表现较为出色,但在解决复杂的问题时则不太理想。因此,也总结出了如何更好…

Vulnhub靶机:SICKOS_ 1.1

目录介绍信息收集主机信息探测主机信息探测网站探测Getshell敏感信息收集SUDO提权第2种通关思路nikto扫描站点验证破壳漏洞破壳漏洞利用计划任务提权修复文件内容错乱 & 提权总结介绍 系列:SickOs(此系列共2台) 发布日期:201…

springboot+netty实现站内消息通知(完整代码)

代码用到的组件介绍 ChannelInitializer 见名知意,就是channel 初始化器,当每个客户端创建连接时这里面的代码都会执行一遍。由于,连接建立之后,这个channel就会常驻内存,所以这里就有个值得思考的问题: …

【vue核心】1.vue简介

1. 官网 英文官网: https://vuejs.org/ 中文官网: https://cn.vuejs.org/ 2. 介绍与描述 动态构建用户界面的渐进式 JavaScript 框架 作者: 尤雨溪 3. Vue 的特点 遵循 MVVM 模式 编码简洁, 体积小, 运行效率高, 适合移动/PC 端开发 它本身只关注UI, 也可以引入其它第三…

新冠疫苗预约小程序设计与实现的源码+文档

摘 要 网络的广泛应用给生活带来了十分的便利。所以把新冠疫苗预约管理与现在网络相结合,利用java技术建设新冠疫苗预约小程序,实现新冠疫苗预约的信息化。则对于进一步提高新冠疫苗预约管理发展,丰富新冠疫苗预约管理经验能起到不少的促进…

数据库挖矿系列-优化器设计探索穿越之旅

作者:王晨 阿里云数据库产品团队 前言 引用来自百度百科的话:在数据库技术发展历史上,1970 年是发生伟大转折的一年,因为这一年的6月,IBM的圣约瑟研究实验室的高级研究员Edgar Frank Codd在Communications of ACM 上…

微信中使用ChatGPT

ChatGPT 微信 Bot1. Ubuntu2. 卸载旧版本3. apt 安装4. 添加软件源的GPG密钥5. 添加docker源到sources.list6. 安装 docker7. 启动 docker8. 建立docker用户组9. 测试10. wechat-chatgpt 搭建11. 获取 会话令牌12. 运行13. 使用微信小号扫码登录14. 重新登录14.1 停止运行容器1…

【Python项目】Python实现点选验证码识别, 模拟B站登陆 | 附源码 学习资料

前言 halo,包子们下午好 今天小编带大家是想b站模拟登陆,Python实现验证码识别 废话不多说,直接开整 相关文件 关注小编,私信小编领取哟! 当然别忘了一件三连哟~~ 公众号:Python日志 源码点击蓝色字体领…

Docker Desktop下部署springboot项目

一、前言 本文是基于windows10版本下的docker desktop来演示的,所以你需要自行安装docker desktop,可以是windows,也可以是mac,根据自己的电脑进行选择即可。 二、创建springboot项目 创建一个springboot web项目,这个比较简单&#xff0c…

Vector-常用CAN工具 - CANoe入门到精通_04

前面已经介绍了Network Node节点的创建和配置,我想大家如果仔细研究下这块基本没什么问题,但是针对相应的CAPL编程该如何去做呢?今天这篇文章就是我们专门介绍在Network Node节点中常用的一些操作函数和使用技巧。 五、 Network Node相关CAPL…

如何在 Canvas 上实现图形拾取?

大家好,我是前端西瓜哥,今天来和大家说说 canvas 怎么做图形拾取。 图形拾取,指的是用户通过鼠标或手指在图形界面上能选中图形的能力。图形拾取技术是之后的高亮图形、拖拽图形、点击触发事件的基础。 canvas 作为一个过于朴实无华的绘制工…

【软件测试】老板:你测试,我放心。测试人的成功就是不做测试?

目录:导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜)前言 测试没价值&#xf…

[附源码]计算机毕业设计的党务管理系统Springboot程序

项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: SSM mybatis Maven Vue 等等组成,B/S模式 M…

Arco Pro最佳实践,路由与菜单

Arco Pro最佳实践,路由与菜单1.路由2.菜单3.测试1.路由 路由通常都和菜单绑定在一起,为了减少维护的量,Arco直接通过路由表生成了菜单。 首先,需要先了解一下路由表的配置 现在我们来解析一下仪表盘的路由代码(dash…

Ranger集成Solr

前言 对已经在正常使用的Ranger开启Solr存储审计日志。 可以手动安装或者使用ranger admin自带的solr安装程序来安装。当然官网也说了,用户可以选择手动安装然后集成,只要你足够勇敢 :) 我们这里选择使用Ranger自带的安装程序来…

深度学习之:强化学习 Reinforcement Learning

文章目录认识强化学习Sparse RewardSupervised Learning v.s. RLRL 玩游戏Policy-based & Value-basedPolicy-based训练模型的三步骤定义目标函数衡量目标函数的好坏RL 的目标函数的好坏(reward 总和的期望)如何求得 Rθˉ\bar{R_{\theta}}Rθ​ˉ​…

win10系统+3060显卡驱动+cuda11.5+cudnn8.3安装

显卡驱动和一些cuda库安装教程 目的 本教程为了让大家能更好的了解和能更快的对显卡进行环境配置。 需注意,本教程的配置仅仅针对显卡NVIDA RTX 3060。 其他显卡对应的配置的流程雷同,仅仅是环境版本的不同。 显卡需要牢固的插装在PCI/PCI-E&#xff0…

如何发现循环中的规律?动作分解

第五章循环结构程序设计 计算机最擅长的就是重复 重复再重复 循环 就是重复 使用循环结构的条件:2个: 1 需要三个或以上的 同样的操作 多个 三就是多,事不过三,三人成虎,三人行必有我师焉 也就是多个操作 2. 必…