Kafka 入门到起飞系列 - 生产者发送消息流程解析

news2025/1/9 15:26:51

在这里插入图片描述

  • 生产者通过producerRecord 对象封装消息主题、消息的value(内容)、timestamp(时间戳)等

  • 生产者通过send()方法发送消息,send()方法会经过如下几步
    1. 首先将消息交给拦截器(Interceptor)处理, 拦截器对生产者而言,对所有消息都是生效的,拦截器也支持链式编程(责任器链)的效果,拦截器一般将一些通用的功能加进来,通常在消息发送前,producer回调逻辑前对消息做一些定制化需求,消息头部添加消息的属性等
    2. 接下来交给序列化器(Serializer),Key的序列化器和value的序列化器,对消息的key和value进行序列化,序列化为字节数组,
    3. 然后将序列化的结果交给分区器(Partitioner),分区器有3种策略来计算消息应该属于哪个分区,

    • 在producerRecord中直接指定分区,分区器会直接将消息放到指定分区

    • 如果没有指定分区器,但是消息有key,分区器会根据消息的key计算hash值,根据主题分区数量取模,来决定将消息放到哪个分区

    • 如果没有指定分区、也没有指定key,分区器会以轮询(Round Robin)的方式给消息分配分区

      在这里插入图片描述

  • 消息经过以上拦截器->序列化器->分区器 进行加工后,会将消息放到RecordAccumulator缓冲区,对每个分区都会有一个单独的缓冲区,经过分区器计算出分区号之后,不同的消息就会分配给不同的缓冲区,缓冲区里面消息也是有序的,我们可以指定对缓冲区里的消息进行分批次,也可以指定缓冲区大小

  • 在这里插入图片描述

  • 当缓冲区中消息达到条件会按批次发送到broker对应分区上

  • broker将接收到的消息进行刷盘持久化

  • 一个消息发出去之后,服务器(broker)会返回给producer响应,producer再来判断消息是否发送成功,

  • broker返回元数据信息 - > 落盘成功 ->生产者继续发送后面消息

  • broker返回元数据信息 - >落盘失败 - 生产者设置了重试次数 -> producer 会将消息重新放入缓冲区进行排队,等待再次发送,当一个消息发送失败重试需要重发,消息是放到缓冲区队尾,

  • 生产者去缓冲区重试发送


生产者在重试消息时,消息的顺序就错了,那怎么保证消息的有序性呢?

在这里插入图片描述

针对这种情况,可以做一个配置,
参数:max.in.flight.requests.per.connection表示producer 在收到broker响应之前可以发送多少批消息,默认5,
设置此值是1,表示broker在响应之前producer不能再向同一个broker发送请求,就是我确认一批你再发下一批,这样可以保证消息有序性,对消息顺序要求不高情况可以不考虑


补充:

  • Producer 创建时,会创建一个Sender线程(IO线程)设置为守护线程

  • Producer 创建时,会创建缓冲区

  • Producer 生产消息,内部是一个异步流程,Sender线程不断轮询RecordAccumulator,满足条件后进行真正的网络IO发送消息

  • 在这里插入图片描述

  • RecordAccumulator(缓冲区) 对每一个分区都有一个缓冲区

    • 每个分区的缓冲区中消息也是有序的
    • 可以指定缓冲区中的消息按批次发送
      • 缓冲区大小达到batch.size,默认16KB
      • 在缓冲区等待时间 lingger.ms 达到上限
      • 以上两个条件满足一个即发送一批
    • 可以指定整个缓冲区的大小

批次的概念很好理解,缓冲区就像一辆公交车,有两种发车方式,一是人满了就发车,一是等5分钟就发车,不管是人满了还是到5分钟了,发车,go~
分批发送可以减少网络IO,节省带宽使用,减少网络传输的压力,提升吞吐量

  • 一个批次消息发送后,通过网络,发往Kafka指定分区,然后刷盘到broker
  • 如果Producer设置了retries参数值>0,那么允许消息发送失败进行重试,重试机制由客户端Producer内部实现
  • Broker端消息落盘成功,会返回元数据给生产者
    • 通过阻塞直接返回 (同步发送)
    • 通过回调函数返回(异步发送)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/775066.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java编程-IDEA中Java的main方法psvm、sout快捷键设置

目的 我打出psvm这四个字母时,可快速打出main方法 我打出syso,sout时,可快速打出System.out.println(); 步骤: 1、打开IDEA,点击文件,选择Editor中的 Live Templates选项,点击右侧边栏中的 号 2、选中…

教你用Python+selenium搭建自动化测试环境

今天给大家带来的是关于Python的相关知识,文章围绕着如何用Pythonselenium搭建自动化测试环境展开,文中有非常详细的介绍,需要的朋友可以参考下 一、环境搭建 1、安装pythonpycharm软件 。python安装网址官网:About Python™ | Python.org 根据自己的电脑系统选择…

设计模式 ~ 发布订阅模式

概念 用于实现对象之间的松耦合通信; 在该模式中,存在一个或多个发布者(Publishers)和一个或多个订阅者(Subscribers); 发布者负责发布消息,而订阅者负责订阅感兴趣的消息并在接收到…

Java中高级面试题,开发模拟练习

1.Redis为什么这么快? Redis是单线程的,避免了多线程的上下文切换和并发控制开销;Redis大部分操作时基于内存,读写数据不需要磁盘I/O,所以速度非常快;Redis采用了I/O多路复用机制,提高了网络I/O并发性;Redis提供高效的数据结构&…

数据科学团队的角色分工

描述数据科学团队中角色分工常用下列维度。进一步以数据可视化直观表达的能力雷达图: ML Ops - 机器学习运维 Data Pipelines - 数据流水线 Database - 数据库 Data Viz - 数据可视化 Storytelling - 数据讲故事 Business Insights - 业务洞察 Reporting - 报告 Experimentatio…

Apache和Nginx是什么?|Nginx和Reactor是什么?|网路IO的本质|阻塞队列|异步非阻塞IO

前言 那么这里博主先安利一些干货满满的专栏了! 首先是博主的高质量博客的汇总,这个专栏里面的博客,都是博主最最用心写的一部分,干货满满,希望对大家有帮助。 高质量干货博客汇总https://blog.csdn.net/yu_cblog/c…

pandas 笔记:melt函数

和pivot_index 相反,前者是长表转宽表,melt是宽表转长表 pandas 笔记:pivot_table 数据透视表_UQI-LIUWJ的博客-CSDN博客 1 基本使用方法 pandas.melt(frame, id_varsNone, value_varsNone, var_nameNone, value_namevalue, col_levelNone…

【Linux】Zookeeper集群 + Fafka集群

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 Zookeeper集群 Fafka集群 Zookeeper 概述Zookeeper 定义Zookeeper 工作机制Zookeeper 特点Zookeeper 数据结构Zookeeper 应用场景Zookeeper 选举机制 Kafka 概述为什么需要消…

⚡【C语言趣味教程】(3) 浮点类型:单精度浮点数 | 双精度浮点型 | IEEE754 标准 | 介绍雷神之锤 III 源码中的平方根倒数速算法 | 浮点数类型的表达方式

🔗 《C语言趣味教程》👈 猛戳订阅!!! ​—— 热门专栏《维生素C语言》的重制版 —— 💭 写在前面:这是一套 C 语言趣味教学专栏,目前正在火热连载中,欢迎猛戳订阅&#…

laravel 的SQL使用正则匹配

案例场景 精准正则匹配 查询结果 代码如下 $regexp ^ . $new_str . [^0-9];$info Test::query()->where(is_del, 0)->whereRaw("name REGEXP $regexp")->pluck(name, id)->toArray();字符 “^” 匹配以特定字符或者字符串开头的文本 name 字段值包含…

国产单片机(沁恒微WCH)CH32V307评估板初探

国产单片机(沁恒微WCH)CH32V307评估板初探 关于沁恒微:国产芯厂家、官网链接 公司简介 - 南京沁恒微电子股份有限公司 (wch.cn) 开发板资源: 评估板应用于 CH32V307 芯片的开发,IDE 使用 MounRiver 编译器,可选择使用板载或独…

Python自动化办公:docx篇

文章目录 简介官方demo读取并修改已存在的docx参考文献 202201笔记迁移 简介 python的docx包是可以用来自动化处理docx文件,可以从无到有生成一个docx文件,也可以对已有的docx文件做批量修改。(但印象里是只能操作.docx文件,如果…

【电路原理学习笔记】第5章:串联电路:5.2 串联电路的总电阻

第5章:串联电路 5.2 串联电路的总电阻 5.2.1 串联电阻相加 由于每个电阻对电流的阻力与其阻值成正比,因此,当电阻串联时,电阻值要相加串联电阻的数量越多,对电流的阻力就越大,也就意味着更大的电阻。因此…

收入下滑,亏损严重,面临法律诉讼的中驰车福申请纳斯达克IPO上市

来源:猛兽财经 作者:猛兽财经 猛兽财经获悉,来自北京的汽车产业供应链数字化服务商【中驰车福】(Autozi Internet Technology (Global) Ltd)近期已向美国证券交易委员会(SEC)提交招股书&#x…

新建Mybatis流程

删除src目录 pom文件夹下导入依赖 这样的话每次只用改父项目的内容,就不必每次都导包 1.修改这三个文件 2.mybatis-config.xml的配置文件有顺序的规定,properties需要写在最上面。 3.类型别名

Bridging the Gap Between Anchor-based and Anchor-free Detection via ATSS 论文学习

1. 解决了什么问题? Anchor-based 和 anchor-free 方法的本质差异其实是如何定义正负样本,如果训练过程中它们采用相同的正负样本定义,最终的表现是差不多的。也就是说,如何选取正负样本才是最重要的。 以单阶段 anchor-based 方…

C++初探

目录 经典开头 — C的历史 作用域运算符 using的用法 命名空间 - namespace 命名空间的基本使用 特殊的命名空间 - 无名命名空间 全部展开和部分展开 std — C所有的标准库都在std命名空间内 省缺值 - 默认参数 占位参数 内联函数 - inline 函数重载 函数重载的用…

MySQL八股学习过程2行的存储 from 小林coding

MySQL八股学习过程2行的存储 from 小林coding MySQL数据的存放MySQL表结构InnoDB行格式记录的额外信息记录的真实数据 MySQL数据的存放 下面的命令能够查询到MySQL数据库文件的存放位置 SHOW VARIABLES LIKE datadir;一张表的结构会保存在表同名.frm中,数据会保存在表同名.ib…

导轨式 称重传感器 压力应变桥信号处理 隔离变送器

主要特性 DIN11 IPO 压力应变桥信号处理系列隔离放大器是一种将差分输入信号隔离放大、转换成按比例输出的直流信号导轨安装变送模块。产品广泛应用在电力、远程监控、仪器仪表、医疗设备、工业自控等行业。此系列模块内部嵌入了一个高效微功率的电源,向输入端和输…

Kyuubi的介绍优势(官网链接)

官网链接:https://kyuubi.apache.org/ Apache Kyuubi™ 是一个分布式多租户网关,用于在数据仓库和 Lakehouse 上提供无服务器 SQL。 Kyuubi 在各种现代计算框架(例如 Apache Spark、 Flink、 Doris、 Hive和Trino等)之上构建分布…