十分钟,了解Kafka的Sender线程

news2024/9/17 8:29:58

〇、前言

在上两篇文章《连Producer端的主线程模块运行原理都不清楚,就敢说自己精通Kafka》和《一文了解Kafka的消息收集器RecordAccumulate》中,我们介绍了Main ThreadRecordAccumulate的工作原理,那么在本篇文章中,我们继续介绍第三部分内容:Sender线程

在介绍原理之前,大家再重温一下Producer端的整体架构,图示如下所示:

这个图看不懂没有关系,我们会在介绍Producer端原理时一一介绍每个部分的含义及其所复杂的功能。

一、Sender线程

除了我们前面曾经介绍过的Main Thread主线程之外,在KafkaProducer中还启动了一个Sender线程,那么,本节我们就来针对Sender线程进行解析,Send线程启动代码如下所示:

Sender线程负责从RecordAccumulate中获取缓存消息,在获取了以Map<TopicPartition,Deque<ProducerBatch>> 的对应关系存储的消息缓存之后,会通过主题信息分区信息创建TopicPartition实例对象tp,然后再以此为key,获取ProducerBatch的双向队列,如下所示:

然后,会进一步将映射中key的类型从TopicPartition转换为NodeId,即:Map<NodeId,Deque<ProducerBatch>> 的对应关系。这是由于当Producer端最终发送消息的时候,关注的是向哪个Broker节点发送消息,而并不是关心哪个主题分区,所以此处需要做一个从应用逻辑层面向网络I/O层面的转换。如下所示:

当最后要进行消息发送的时候,还要再次进行封装,封装出用于消息发送的ProduceRequest,此时的对应关系就变成了NodeId和ProduceRequest了,代码如下所示:

请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为 Map<NodeId,Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求。代码如下所示:

如果我们开启幂等配置,则会创建TransactionManager实例对象,如下所示:

当TransactionManager实例对象不为null的时候,Sender现成则会执行下图红框内的代码逻辑,那么在黄框中,我们会看到调用了maybeSendAndPollTransactionalRequest()方法,代码如下所示:

在选择目标节点的时候,如果coordinatorType为空,则会调用client的leastLoadeNode(...)方法,通过该方法可以获得所有Node中负载最小的那一个。那怎么来判断Node节点的负载呢? 在上面的内容中,我们其实提到了,发出去的消息也会保存到InFlightRequests中,它其实是一个缓存的作用,主要用来缓存已经发出去但是还没有接收到响应确认的消息请求。因此,我们可以通过它来判断那些Node节点未接到响应的请求数量最少,则就是负载最小的。通过这种方式,可以保证负载尽可能的平均,而不会造成某一个节点的重度阻塞从而影响整个消息的发送性能。代码如下所示:

除了上面之外,我们在元数据更新的时候,也是通过leastLoadeNode(...)方法来获得负载最低的节点的,那么,在Kafka中什么是元数据呢? 当我们发送消息的时候,消息发送到哪个分区,这个分区对应的Broker的地址和端口,已经这个是否配置了Kafka集群,集群中都包含哪些节点等等,都是保存在元数据信息中的。那么,在什么步骤触发了元数据更新呢? 我们可以把视野转向Sender的runOnce()方法上,在下图红框处,我们调用了client的poll方法,如下是该方法的源码:

此处的client对应的是NetworkClient的实例对象,在该类的poll(...)方法中,执行了更新元数据的逻辑,即下图红框所示:

maybeUpdate方法中,我们看到了熟悉的一段代码Node node = leastLoadedNode(now); 此处就是获得负载最低节点的地方。那么获得到了这个node之后,就可以调用maybeUpdate(now, node)来尝试更新元数据信息了:

maybeUpdate(now, node)方法中我们可以看到,更新元数据也是采用发送消息的方式,即:向这个负载最低的Node发送MetadataRequest请求来获取具体的元数据信息。在创建完MetadataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时的处理逻辑一样的:

今天的文章内容就这些了:

写作不易,笔者几个小时甚至数天完成的一篇文章,只愿换来您几秒钟的 点赞 & 分享 。

更多技术干货,欢迎大家关注公众号“爪哇缪斯” ~ \(^o^)/ ~ 「干货分享,每天更新」

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/988831.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python之Xlwings操作excel

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 一、xlwings简介二、安装与使用1.安装2.使用3.xlwings结构说明 二、xlwings对App常见的操作App基础操作工作簿的基础操作工作表的基础操作工作表其他操作 读取单元格…

Android 自定义View之圆形进度条

很多场景下都用到这种进度条&#xff0c;有的还带动画效果&#xff0c; 今天我也来写一个。 写之前先拆解下它的组成&#xff1a; 底层圆形上层弧形中间文字 那我们要做的就是&#xff1a; 绘制底层圆形&#xff1b;在同位置绘制上层弧形&#xff0c;但颜色不同&#xff…

【0908练习】shell脚本使用expr截取网址

题目&#xff1a; 终端输入网址&#xff0c;如&#xff1a;www.hqyj.com&#xff0c; 要求&#xff1a;截取网址每个部分&#xff0c;并放入数组中&#xff0c;不能使用cut&#xff0c;使用expr解决 #!/bin/bash read -p "请输入一个网址" net lenexpr length $net …

协程 VS 线程,Kotlin技术精讲

协程(coroutines)是一种并发设计模式&#xff0c;您可以在Android 平台上使用它来简化异步执行的代码。协程是在版本 1.3 中添加到 Kotlin 的&#xff0c;它基于来自其他语言的既定概念。 在 Android 上&#xff0c;协程有助于管理长时间运行的任务&#xff0c;如果管理不当&a…

无脑014——linux系统,制作coco(json)格式数据集,使用mmdetection训练自己的数据集

电脑&#xff0c;linux&#xff0c;RTX 3090 cuda 11.2 1.制作coco&#xff08;json&#xff09;格式数据集 这里我们使用的标注软件是&#xff1a;labelimg 选择voc格式进行标注&#xff0c;标注之后使用以下代码&#xff0c;把voc格式转换成coco格式&#xff0c;注意最后的路…

机房运维管理软件不知道用哪个好?

云顷网络还原系统V7.0是一款专业的机房运维管理产品&#xff0c;基于局域网络环境&#xff0c;针对中高端机房中电脑运维管理需求所设计开发的。网络还原系统软件通过全面的规划和设计&#xff0c;遵从机房部署、使用到维护阶段化使用方式&#xff0c;通过极速网络同传/增量对拷…

TypeScript的函数

ts与js函数区别 tsjs传参需要规定类型无类型箭头函数箭头函数ES6函数类型无函数类型必填和可选参数所有参数都是可选的能设置默认参数能设置默认参数剩余参数剩余参数 函数重载 函数重载 注释 TypeScript 允许您指定函数的输入和输出值的类型。 输入值注释 // 传参必须为字…

如何理解图神经网络的傅里叶变换和图卷积

图神经网络&#xff08;GNN&#xff09;代表了一类强大的深度神经网络架构。在一个日益互联的世界里&#xff0c;因为信息的联通性&#xff0c;大部分的信息可以被建模为图。例如&#xff0c;化合物中的原子是节点&#xff0c;它们之间的键是边。图神经网络的美妙之处在于它们能…

【设计模式】二、UML 类图概述

文章目录 常见含义含义依赖关系&#xff08;Dependence&#xff09;泛化关系&#xff08;Generalization&#xff09;实现关系&#xff08;Implementation&#xff09;关联关系&#xff08;Association&#xff09;聚合关系&#xff08;Aggregation&#xff09;组合关系&#x…

【赠书活动】AI 时代,程序员无需焦虑

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化【获取源码商业合作】 &#x1f449;荣__誉&#x1f448;&#xff1a;阿里云博客专家博主、5…

9.8day59

503. 下一个更大元素 II - 力扣&#xff08;LeetCode&#xff09; 知识点&#xff1a;单调栈 42. 接雨水 - 力扣&#xff08;LeetCode&#xff09;

初学python爬虫学习笔记——爬取网页中小说标题

初学python爬虫学习笔记——爬取网页中小说标题 一、要爬取的网站小说如下图 二、打开网页的“检查”&#xff0c;查看html页面 发现每个标题是列表下的一个个超链接&#xff0c;从183.html到869.html 可以使用for循环依次得到&#xff1a; x range(183,600) for i in x:pr…

NoSQL数据库入门

一、NoSQL数据库概述 NoSQL 是一种不同于关系数据库的数据库管理系统设计方式&#xff0c;是对非关系型数据库的统称&#xff0c;它所采用的数据模型并非传统关系数据库的关系模型&#xff0c;而是类似键/值、列族、文档等非关系模型。NoSQL 数据库没有固定的表结构&#xff0c…

W25Q16_Flash手册总结

文章目录 前言一、概述&特点1、概述W25Q16BV1、特点2、引脚说明3、内部结构示意图4、操作指令5、操作示例时序图1、写入启用指令&#xff1a;Write Enable&#xff08;06h&#xff09;2、读取状态寄存器指令&#xff1a;Read Status Register-1&#xff08;05h&#xff09;…

【PowerShell代码】清除掉文件中的非英文字母

如果你尝试从网上下载一些带有非ASCII的文件时候&#xff0c;你在这台机器上会发现没有问题&#xff0c;但是将文件传递到其他的地方或者其他电脑你会发现存在比较大的问题&#xff0c;我如何才能将这些文件中的非英文字母去掉呢&#xff1f; 如何才能将文件中的这些非英文字母…

无涯教程-JavaScript - IMLOG2函数

描述 IMLOG2函数以x yi或x yj文本格式返回复数的以2为底的对数。可以从自然对数计算复数的以2为底的对数,如下所示- $$\log_2(x yi)(log_2e)\ln(x yi)$$ 语法 IMLOG2 (inumber)争论 Argument描述Required/OptionalInumberA complex number for which you want the bas…

为什么零基础选择语言首选python

在众多编程语言中&#xff0c;似乎已经没有什么能够阻挡Python的步伐。本月Python又是第一名&#xff0c;市场份额达到了13.42%&#xff0c;在2023年&#xff0c;Python已经连续7个月蝉联榜首&#xff0c;遥遥领先于其他对手。 每个月榜单发布后&#xff0c;都有小伙伴会好奇&…

Blender中的高级边缘控制和纹理映射

推荐&#xff1a;使用 NSDT场景编辑器 快速搭建3D应用场景 步骤 1 首先&#xff0c;您需要创建一组无阴影材质&#xff0c;每种材质具有不同的颜色&#xff0c;确保您有足够的材质来覆盖模型&#xff0c;而不会有相同的颜色相互重叠。然后&#xff0c;切换到“着色”&#xff…

即拼七人拼团系统开发模式是怎么盈利赚钱的?

即拼七人拼团是市场上最近比较火爆的一款商业模式&#xff0c;它结合了二二复制和拼团两种模式玩法&#xff0c;不仅能让消费者从中获利&#xff0c;还能让平台快速获流裂变&#xff0c;对平台起盘初期和发展中期具有很强的推广能力。那么这个模式是怎么盈利赚钱的呢&#xff1…

使用内网负载机(Linux)执行Jmeter性能测试

一、背景 ​ 在我们工作中有时候会需要使用客户提供的内网负载机进行性能测试&#xff0c;一般在什么情况下我们需要要求客户提供内网负载机进行性能测试呢&#xff1f; 遇到公网环境下性能测试达到了带宽瓶颈。那么这时&#xff0c;我们就需要考虑在内网环境负载机下来执行我们…