图解系列 图解Kafka之Producer

news2025/1/11 8:45:31

开局一张图,其他全靠吹

发送消息流程如下

1.初始化流程

  • 指定bootstrap.servers,地址的格式为 host:port。它会连接bootstrap.servers参数指定的所有Broker,Producer启动时会发起与这些Broker的连接。因此,如果你为这个参数指定了1000个Broker连接信息,那么很遗憾,你的Producer启动时会首先创建与这1000个Broker的TCP连接。

    • 在实际使用过程中,我并不建议把集群中所有的Broker信息都配置到bootstrap.servers中,通常你指定3~4台就足以了。因为Producer一旦连接到集群中的任一台Broker,就能拿到整个集群的Broker信息,故没必要为bootstrap.servers指定所有的Broker。
    • props.put("bootstrap.servers", "localhost:9092");
  • 指定Key和Value的序列化方式。

    •  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
       props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      
  • 指定acks配置,默认值是all(版本3.x)

    • props.put("acks", "all");
    • 设置为0,表示生产端发送消息后立即返回,不等待broker端的响应结果。通常此时生产端吞吐量最高,消息发送的可靠性最低。
    • 设置为1,表示leader副本成功写入PageCache就会响应Producer,而无需等待ISR(同步副本)集合中的其他副本写入成功。这种方案提供了适当的持久性,保证了一定的吞吐量。
    • 设置成all或-1,表示不仅要等leader副本成功写入,还要求ISR中的其他副本成功写入,才会响应Producer。这种方案提供了最高的持久性,但也提供了最差的吞吐量。
  • producer = new KafkaProducer<>(props);

    • 从配置中获取必要的参数,如transactionalIdclientId
    • 根据clientId创建日志记录上下文(LogContext),用于日志记录。
    • 配置度量(Metrics)相关信息,包括度量标签、度量配置、度量报告器等。
    • 创建度量上下文(MetricsContext)和度量实例(Metrics)。
    • 初始化分区器(Partitioner)。
    • 配置并初始化键(key)和值(value)的序列化器(Serializer)。
    • 配置并初始化拦截器(Interceptors)。
    • 配置集群资源监听器(ClusterResourceListeners)。
    • 设置最大请求大小(maxRequestSize)、内存大小(totalMemorySize)和压缩类型(compressionType)等参数。
    • 配置最大阻塞时间(maxBlockTimeMs)和交付超时时间(deliveryTimeoutMs)。
    • 初始化API版本(apiVersions)和事务管理器(transactionManager)。
    • 创建记录累加器(RecordAccumulator),用于累积记录以进行批量发送。
    • 解析并验证引导服务器地址(addresses)。
    • 如果提供了元数据(metadata),则使用提供的元数据,否则创建新的元数据实例,并通过引导服务器地址进行引导。
    • 初始化错误度量传感器(errors)。
    • 创建并启动IO线程(ioThread)来处理消息发送。
    • 注册应用程序信息,用于JMX度量和监控。
    • 如果在初始化过程中发生任何错误,将调用关闭方法以避免资源泄漏,并向上抛出Kafka异常。

2.发送消息流程

在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulatormain 线程将消息发送给 RecordAccumulatorSender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker

  • 构造消息记录ProducerRecord 对象,对象包含了四个属性:Topic,partition,key,value;topic 和 value 是必须的,key 和 partition 是可选的。
  • 同步获取Kafka集群信息(Cluster)。
    • 如果缓存有,并且分区没有超过指定分区范围则返回缓存
    • 否则触发更新,等待从broker获取新的元数据信息
    • 默认强制拉取时间是metadata.max.age.ms: 5分钟
  • 使用键序列化器(keySerializer)将消息的键序列化为字节数组,使用值序列化器(valueSerializer)将消息的值序列化为字节数组。
  • 计算数据发送到那个分区,如果指定了 key,那么相同 key 的消息会发往同一个分区,如果实现了自定义分区器,那么就会走自定义分区器进行分区路由。
    • 如果有Key值,则使用Key值的Hash值来分配分区 murmurhash(key) % 主题分区总数
    • 老版本:如果没有key值,则以Round-Robin的方式分配分区。
    • 新版本:如果没有key值,则以粘性分区的方式分配分区
  • 创建一个TopicPartition对象,表示要发送消息的主题和分区。
  • 判断消息的大小是否超过了我们设置的阈值。
  • 异步发送时,给每一条消息都绑定他的回调函数
  • 把消息放入记录累加器(accumulator)(32M的一个内存)*,*然后有accumulator把消息封装成为一个批次一个批次的去发送。
  • 如果批次满了或者新创建出来一个批次, 唤醒sender线程,他才是真正发送数据的线程,发送的时候并不是来一个消息就发送一个消息,这样的话吞吐量比较低,并且频繁的进行网络请求。消息是按照批次来发送的或者等待时间来发的的.

参考

  • https://www.clairvoyant.ai/blog/unleash-kafka-producers-architecture-and-internal-workings
  • 尚硅谷 Kafka

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/981474.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

点云从入门到精通技术详解100篇-伪雷达点云预测

前言 近年来,“自动驾驶”已经成为一个耳熟能详的词语,它是一种通过车载计 算实现无人驾驶的智能汽车系统。自动驾驶汽车依靠人工智能、视觉计算、视觉 传感器、控制设备和定位系统协同合作,让系统可以在无人主动操作的情况下, 自动安全地操作机动车辆。其中视觉传感器作…

【继RNN之后的一项技术】Transfomer 学习笔记

谷歌团队在17年的神作&#xff0c;论文17年6月发布 https://arxiv.org/abs/1706.03762 被NIPS2017收录&#xff0c;目前引用量已经逼近3w。 以下内容参考李沐老师的课程《动⼿学深度学习(Pytorch版)》 简介 注意力 自主性&#xff1a;有目的的搜索某样东西&#xff08;键&…

Python小知识 - 如何使用Python进行机器学习

如何使用Python进行机器学习 Python是一种解释型、面向对象、动态数据类型的高级程序设计语言。 机器学习是人工智能的一个分支&#xff0c;是让计算机自动“学习”。学习的过程是从经验E中获得知识K。经验E可以是一个数据集&#xff0c;比如一个图像数据集。知识K可以是计算机…

Python爬虫-爬取文档内容,如何去掉文档中的表格,并保存正文内容

前言 本文是该专栏的第58篇,后面会持续分享python爬虫干货知识,记得关注。 做过爬虫项目的同学,可能或多或少爬取过文档数据,比如说“政务网站,新闻网站,小说网站”等平台的文档数据。爬取文档数据,笔者这里就不过多详述,而本文,笔者将主要介绍在爬取文档数据的过程中…

六安RapidSSL泛域名https能保护几个域名

RapidSSL是Geotrust旗下的子品牌&#xff0c;Geotrust是国际知名的CA认证机构&#xff0c;每年都要通过WebTrust年检&#xff0c;而Geotrust旗下的https证书已经应用于市场多年&#xff0c;Geotrust的子品牌RapidSSL证书经营的DV基础型泛域名https证书也受到市场的欢迎。今天就…

MFC新建内部消息

提示&#xff1a;记录一下MFC新建内部消息的成功过程 文章目录 前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据总结 前言 先说一下基本情况&#xff0c;因为要在mapview上增加一个显示加载时间的功能。然后发现是要等加载完再显示时间&#xff0c;显示在主…

动手学深度学习d2l.Animator无法在PyCharm中显示动态图片的解决方案

from d2l import torch as d2l一、问题描述 运行d2l的训练函数&#xff0c;仅在控制台输出以下内容&#xff0c;无法显示动态图片&#xff08;训练监控&#xff09; <Figure size 350x250 with 1 Axes> <Figure size 350x250 with 1 Axes> <Figure size 350x2…

数据结构与算法之贪心动态规划

一&#xff1a;思考 1.某天早上公司领导找你解决一个问题&#xff0c;明天公司有N个同等级的会议需要使用同一个会议室&#xff0c;现在给你这个N个会议的开始和结束 时间&#xff0c;你怎么样安排才能使会议室最大利用&#xff1f;即安排最多场次的会议&#xff1f;电影的话 那…

Fiddler 系列教程(一)初识Fiddler,我们能用fiddler做什么?

Fiddler是最强大最好用的Web调试工具之一&#xff0c;它能记录所有客户端和服务器的http和https请求&#xff0c;允许你监视&#xff0c;设置断点&#xff0c;甚至修改输入输出数据. 使用Fiddler无论对开发还是测试来说&#xff0c;都有很大的帮助。 阅读目录 Fiddler的基本介…

23个react常见问题

1、setState 是异步还是同步&#xff1f; 合成事件中是异步 钩子函数中的是异步 原生事件中是同步 setTimeout中是同步 相关链接&#xff1a;你真的理解setState吗&#xff1f;&#xff1a; 2、聊聊 react16.4 的生命周期 图片 相关连接&#xff1a;React 生命周期 我对 Reac…

康耐视visionpro破解版满天飞,那么如何查询康耐视Visionpro加密狗支持哪些工具

目录 第一步骤&#xff0c;点击WinR&#xff0c;弹出命令符第二步骤&#xff1a;输入CMD&#xff0c;回车第三步骤&#xff1a;输入cogtool -p&#xff08;cogtool与-p之间有空格&#xff09;&#xff0c;输入完毕后&#xff0c;记得回车&#xff0c;稍等3秒钟不到。 第一步骤&…

[js] 图解 event.pageX event.clientX event.offsetX getBoundingClientRect

event.clientX、event.clientY 鼠标相对于浏览器窗口可视区域的X&#xff0c;Y坐标&#xff08;窗口坐标&#xff09;&#xff0c;可视区域不包括工具栏和滚动条。IE事件和标准事件都定义了这2个属性 event.pageX、event.pageY 类似于event.clientX、event.clientY&#xff0c;…

作为一名游戏开发工作者,分享一些工作经验

作为一名游戏开发工作者&#xff0c;有一些重要的心得和经验可以分享&#xff0c;这些经验可以帮助你更好地在游戏开发领域取得成功&#xff1a; 学习不断进步&#xff1a;游戏开发是一个不断演进的领域&#xff0c;你需要不断学习新的技术和工具。跟踪最新的游戏开发趋势&…

Excel VSTO开发2 -建立Excel VSTO项目

版权声明&#xff1a;本文为博主原创文章&#xff0c;转载请在显著位置标明本文出处以及作者网名&#xff0c;未经作者允许不得用于商业目的。 2 建立Excel VSTO项目 新建项目&#xff0c;选择Excel 2013和2016 VSTO外接程序。输入项目名称&#xff08;本示例的项目名称为&am…

python基于Django健身爱好者交流分享平台vue

而肌友网—健身交流平台能很好地解决这一问题&#xff0c;轻松应对健身交流&#xff0c;既能提高用户对健身交流的评价&#xff0c;又能加快健身交流平台的效率&#xff0c;取代人工管理是必然趋势。 本肌友网—健身交流平台以Django作为框架&#xff0c;B/S模式以及MySql作为后…

Android Studio开发入门教程:如何让开发的app国际化?

配置APP的语言环境&#xff08;文末有Android Studio以及雷电模拟器的压缩包&#xff09; 实验目的&#xff1a; 为了使我们基于android操作系统开发的APP能更好的国际化&#xff0c;面向不同的国家市场&#xff0c;我们需要做出相关的操作&#xff0c;使开发出来的APP会根据用…

解决husky在mac下不生效的问题

目录 一、问题 1.1 问题描述 二、解决 2.1 解决 一、问题 1.1 问题描述 本文主要解决的问题是&#xff0c;husky在windows上正常生肖&#xff0c;但放到mac下后不生效的问题&#xff01; 为了确保团队中提交代码的一致性&#xff0c;因此使用了 husky 作为提交的检测工具…

【第二章 数据的表示和运算】d1

【2.1】 各种码的比较&#xff0c;0的表示&#xff1a; 【2.2】 符号扩展&#xff1a; 溢出判别用异或&#xff08;不同01/10取1&#xff0c;相同00/11取0&#xff09;&#xff1a; 进位判别符号位Cs&#xff0c;最高位C1&#xff1a; 补码与-x的补码&#xff1a; /2&…

【Seata】03 - Seata AT 模式全局锁相关知识简单整理

文章目录 前言参考目录版本说明分析整理1、全局锁的引入说明2、全局锁相关源码整理2.1、流程简图2.2、事务分支注册前的 SQL 相关操作2.3、注册分支&#xff08;获取全局锁&#xff09; 前言 上一篇文章介绍了 AT 模式的调用流程&#xff0c;但是有个比较重要的概念没有提及到…

【已解决】pycharm 突然每次点击都开新页面,关不掉怎么办?

今天在 pycharm 中写代码&#xff0c;突然发现&#xff0c;新开的文件不再原来的页面上&#xff0c;而是新增了页面&#xff0c;导致整个屏幕全都是新开的页面&#xff0c;最难受的是&#xff0c;关不掉&#xff01; 无奈&#xff0c;我只能关闭 pycharm&#xff0c;重新双击…