Kafka - 图解生产者消息发送流程

news2025/1/12 18:10:43

文章目录

  • 发送原理
    • 1. 主线程 (main thread):
    • 2. Sender 线程:
    • 3. RecordAccumulator:
  • 发送原理小结
  • 重要参数

在这里插入图片描述


发送原理

在这里插入图片描述


Kafka的Producer发送消息采用的是异步发送的方式。

在消息发送的过程中,涉及到了两个线程:main线程和Sender线程,以及一个线程共享变量:RecordAccumulator

  • ①main线程中创建了一个双端队列RecordAccumulator,将消息发送给RecordAccumulator。

  • ②Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。


Kafka的Producer发送消息采用了异步发送的方式,这个过程确实涉及到多个线程以及共享变量。下面详细展开说明这个过程:

1. 主线程 (main thread):

主线程是生产者应用的线程,它负责创建消息并将这些消息发送给Kafka Producer API。主要的操作包括:

  • 创建消息:主线程创建消息,将它们封装成ProducerRecord对象。ProducerRecord通常包括消息的主题(topic)、分区(partition)、键(key)和值(value)等信息。

  • 发送消息到RecordAccumulator:主线程将创建的消息发送到一个双端队列(deque)叫做RecordAccumulator。这个队列用于缓冲消息,允许Producer线程将消息异步发送到Kafka集群,而不需要等待每条消息都被立刻发送。

2. Sender 线程:

Sender线程是Kafka Producer内部的一个后台线程,它负责从RecordAccumulator中拉取消息并发送到Kafka broker。Sender线程的主要工作如下:

  • 从RecordAccumulator拉取消息:Sender线程定期轮询(poll)RecordAccumulator,检查是否有新消息需要发送。这个轮询是异步的,因此主线程不需要等待消息被发送。

  • 构建请求:当Sender线程发现有消息需要发送,它会构建一个或多个ProducerRequest,每个请求包含多个消息,以便进行有效的批量发送。

  • 发送消息到Kafka broker:Sender线程将构建的请求发送到Kafka broker,等待来自broker的响应。一旦消息被成功接收并记录在Kafka broker中,Sender线程会通知RecordAccumulator,以便它可以更新消息的状态。

3. RecordAccumulator:

RecordAccumulator是Producer内部的一个共享变量,用于暂存即将发送到Kafka broker的消息。主要功能包括:

  • 暂存消息:主线程将消息发送到RecordAccumulator中,使其在等待Sender线程处理。

  • 管理消息的状态:RecordAccumulator跟踪每条消息的发送状态,以确保消息被成功发送到Kafka broker。一旦消息被成功写入到Kafka broker的日志中,RecordAccumulator会将消息的状态标记为已发送。

  • 负责消息批量化:RecordAccumulator也有助于消息的批量发送,以减少网络开销和提高性能。

发送原理小结

总结一下,Kafka的Producer采用异步发送消息的方式,

  • 主线程负责创建和发送消息到RecordAccumulator,

  • 而Sender线程负责从RecordAccumulator中拉取消息并将其发送到Kafka broker。

  • RecordAccumulator充当缓冲区,用于管理消息的状态以及批量发送,以提高性能和降低延迟。

这个架构充分利用了多线程和异步操作,使得Producer能够高效地发送消息到Kafka集群。


重要参数

参数名称描述
bootstrap.servers生产者连接集群所需的broker地址清单。可以设置1个或者多个,中间用逗号隔开。生产者从给定的broker里查找到其他broker信息。
key.serializer, value.serializer指定发送消息的key和value的序列化类型。要写全类名。(反射获取)
buffer.memoryRecordAccumulator缓冲区总大小,默认32m。
batch.size缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。
acks0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader数据落盘后应答.
-1(all):生产者发送过来的数据,Leader和isr队列里面的所有节点数据都落盘后应答。默认值是-1
max.in.flight.requests.per.connection允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。
Retries当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms两次重试之间的时间间隔,默认是100ms。
enable.idempotence是否开启幂等性,默认true,开启幂等性。
compression.type生产者发送的所有数据的压缩方式。默认是none,不压缩。支持压缩类型:none、gzip、snappy、lz4和zstd。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1137346.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue3 联合搜索

划到下方可以直接观看完整代码 目录 前言 用法 1. 数据绑定和事件处理 2. 列表渲染和条件过滤 解析 完整代码 前言 在现代的前端开发中,Vue.js 是一个非常流行的 JavaScript 框架,用于构建用户界面。Vue 提供了一种简洁的方式来构建交互式和动态的网…

顺序表的定义与实现(数据结构与算法)

一、顺序表的定义 1. 顺序表的定义 #define MaxSize 10 //定义最大长度 typedef struct{ ElemType data[MaxSize]; //用静态的“数组”存放数据元素int length; //顺序表的当前长度 …

【MATLAB源码-第58期】基于蛇优化算法(SO)和粒子群优化算法(PSO)的栅格地图路径规划最短路径和适应度曲线对比。

操作环境: MATLAB 2022a 1、算法描述 粒子群算法 (Particle Swarm Optimization, PSO) 1. 算法概述 粒子群算法是一种基于群体智能的优化算法,模拟鸟群觅食的行为。算法中的每个粒子代表问题的一个可能解,并且具有位置和速度两个属性。粒…

【计算机网络】什么是HTTPS?HTTPS为什么是安全的?

【面试经典题】 前言: HTTP最初的设计就是用于数据的共享和传输,并没有考虑到数据的安全性,如窃听风险,篡改风险和冒充风险。HTTPS是在 HTTP 的基础上引入了一个加密层。HTTPS通过数据加密,数据完整性检验和身份认证…

ES性能优化最佳实践- 检索性能提升30倍!

Elasticsearch是被广泛使用的搜索引擎技术,它的应用领域远不止搜索引擎,还包括日志分析、实时数据监控、内容推荐、电子商务平台、企业级搜索解决方案以及许多其他领域。其强大的全文搜索、实时索引、分布式性能和丰富的插件生态系统使其成为了许多不同行…

C++求欧拉角(eigen库中暴露的一些问题)

不同顺序欧拉角转旋转矩阵对照公式 eigen库求欧拉角公式 分别试验eigen库自带的matrix.eulerAngles()函数,与根据上述公式推导的两种方法求欧拉角 eigen库求得欧拉角的范围一定是 x − > r o l l x->roll x−>roll方向在 [ 0 , π ] [0,π] [0,π]之间&am…

argparse模块介绍

argparse是一个Python模块:命令行选项、参数和子命令解析器。argparse 模块可以让人轻松编写用户友好的命令行接口。程序定义了所需的参数,而 argparse 将找出如何从 sys.argv (命令行)中解析这些参数。argparse 模块还会自动生成…

实时数仓-Hologres介绍与架构

本文是向大家介绍Hologres是一款实时HSAP产品,隶属阿里自研大数据品牌MaxCompute,兼容 PostgreSQL 生态、支持MaxCompute数据直接查询,支持实时写入实时查询,实时离线联邦分析,低成本、高时效、快速构筑企业实时数据仓…

笔记本电脑识别不了刻录机,由于设备驱动程序的前一个实例仍在内存中,windows 无法加载这个硬件的设备驱动程序。 (代码 38)

目录 1 问题2 解决 1 问题 笔记本电脑识别不了刻录机,由于设备驱动程序的前一个实例仍在内存中,windows 无法加载这个硬件的设备驱动程序。 (代码 38) 笔记本电脑插入刻录机,一直识别不了; 右边点击属性 这里展示 由于设备驱动…

powerdesigner逆向mysql与pg生成er图

一、逆向mysql数据库 官网下载mysql的ODBC(开放数据库互连) 选择自己对应版本,我的power designer是32位的,需要选择32的ODBC进行下载,不然power designer会监测不到。 双击exe文件,安装ODBC 这个比较简单,就不阐述了…

nu1l-死亡ping命令攻略

book-nu1l docker 虚拟化技术 把传统的虚拟机堪称容器 依赖镜像启动容器,镜像可以理解为模板克隆的虚拟机,删除容器,对镜像没有影响 镜像在云端 本地要使用某个镜像,根据地址,直接从云端拉取 基本命令 # 查看系统…

day14_集合

今日内容 零、 复习昨日 一、集合框架体系 二、Collection 三、泛型 四、迭代 五、List(ArrayList、LinkedList) 零、 复习 throw和throws什么区别 throwthrows位置方法里面方法签名上怎么写throw 异常对象throws异常类名(多个)作用真正抛出异常对象声明抛出的异常类型 运行时…

【开发篇】一、处理函数:定时器与定时服务

文章目录 1、基本处理函数2、定时器和定时服务3、KeyedProcessFunction下演示定时器4、process重获取当前watermark 前面API篇完结,对数据的转换、聚合、窗口等,都是基于DataStream的,称DataStreamAPI,如图: 在Flink…

【Linux08-进程信号】信号的一生……

今天,带来Linux下进程信号的讲解。文中不足错漏之处望请斧正! 是什么 生活中的信号 例子: 红绿灯来电铃声老妈倒数321叫我起床外卖小哥叫我下楼拿外卖 理解: 过程:收到信号 → 分析信号 → 产生信号对应的行为信号不一定会被立即处理&…

Flutter——最详细(Scaffold)使用教程

Scaffold简介 相当于界面的主体(类似于安卓最外层PhoneWindow),组件的展示都必须依附于它。 使用场景: 每一个界面都是脚手架,通过它来进行架构实现,优美的布局效果。 属性作用appBar顶部的标题栏body显示整…

Qwt QwtPlotMarker标记类详解

1.概述 QwtPlotMarker类是Qwt绘图库中用于在图表上绘制标记的类。标记可以是垂直或水平线、直线、文本或箭头等。它可用于标记某个特定的位置、绘制参考线或注释信息。 以下是类继承关系图: 2.常用方法 设置标记的坐标。传入x和y坐标值,标记将被放置在…

红黑树--讲解以及详细实现过程

目录 红黑树理解红黑树概念红黑树性质 红黑树实现红黑树图解基础结构实现插入实现 红黑树理解 红黑树概念 红黑树,是一种二叉搜索树,但在每个结点上增加一个存储位表示结点的颜色,可以是Red或Black。 通过对任何一条从根到叶子的路径上各个…

六零导航页SQL注入漏洞复现(CVE-2023-45951)

0x01 产品简介 LyLme Spage(六零导航页)是中国六零(LyLme)开源的一个导航页面。致力于简洁高效无广告的上网导航和搜索入口,支持后台添加链接、自定义搜索引擎,沉淀最具价值链接,全站无商业推广…

前端(二十五)——前端实现 OCR 图文识别的详细步骤与示例代码

😁博主:小猫娃来啦 😁文章核心:前端实现 OCR 图文识别的详细步骤与示例代码 文章目录 简介确定使用的 OCR API创建前端界面添加图像上传功能发送识别请求和处理识别结果完善代码添加注释结论附录 简介 在现代应用程序中&#xff…

如何选择向量数据库|Weaviate Cloud v.s. Zilliz Cloud

随着以 Milvus 为代表的向量数据库在 AI 产业界越来越受欢迎,传统数据库和检索系统也开始在快速集成专门的向量检索插件方面展开角逐。 例如 Weaviate 推出开源向量数据库,凭借其易用、开发者友好、上手快速、API 文档齐全等特点脱颖而出。同样&#xff…