Kafka中时间轮算法的使用

news2024/11/24 2:10:52

简介: Kafka的心跳处理机制竟然用到了时间轮算法?

Broker端与客户端的心跳在Kafka中非常的重要,因为一旦在一个心跳过期周期内(默认10s),Broker端的消费组组协调器(GroupCoordinator)会把消费者从消费组中移除,从而触发重平衡。在2.4.x以下其版本中,消费组一旦进入重平衡状态,该消费组内所有消费者全部暂停消费,直到重平衡完成。

本文将来探讨Kafka的心跳机制的具体实现。本文的组织结构如下:

  • 源码解读Kafka心跳机制
  • Kafka心跳架构设计亮点(时间轮调度算法实现原理图)

温馨提示:如果大家对源码阅读不感兴趣,可以直接跳到本文的第二部分,用流程图、数据结构图阐述心跳的实现机制。

1、源码分析Kafka心跳机制


在介绍源码分析之前介绍笔直的一条源码分析经验:找准入口,了解调用链路。故笔者会先寻找归纳出Kafka心跳处理的所有入口。

1.1Kafka心跳入口总结


Kafka心跳包的处理流程如下图所示:

图的右边是kafka心跳在服务端的核心处理流程,而左边主要展示kafka中所有的心跳请求,根据上图得知Kafka触发心跳处理的主要请求分别如下:

  1. KafkaConsume主动发送心跳包 消费者会以3s的频率向服务端发送心跳包,服务端对应的入口为 KafkaApis的handleHeartbeatRequest方法。
  2. 消费者加入消费组 在消费端重平衡过程中,客户端主动向其组协调器发起Join_Group(加入消费组)时,组协调器会认为收到一个有效的心跳包,服务端对应的处理入口:KafkaApis的handleJoinGroup方法。
  3. 消费者获取队列负载结果 在重平衡的第二个阶段,消费组的Leader在计算出分区负载结果后会发给组协调器,消费组中的其他成员需要发生Sync_Group请求获取负载结果,组协调器同样认为收到了一个有效的心跳包。服务端对应的处理入口:KafkaApis的handleSyncGroupRequest。
  4. 消费者提交位点 消费者组协调器收到消费者提交位点请求,同样可以认定消费者是存活的。位点提交的处理入口:KafkaApis的handlerCommitOffsets方法。
  5. __consumers_offsets主题的ISR的Leader发生变化
    如果__consumers_offsets主题中的各个分区Leader发生变化,与特定分区的组协调器需要重新选举,与此组协调器相关的消费者将触发重平衡。

上述任何一种请求,都能表明消费端是存活的,故能有效阻止服务端将客户端端心跳设置为过期,进入下一个心跳检测周期。

上述各个入口,特别是__consumers_offsets的ISR对消费组的影响,后续会专门展开研究,现在我们将重心转移到服务端是如何处理一个心跳包的。

1.2 源码分析Kafka心跳处理机制


从上面的流程图可以得出,Kafka收到一个心跳包后的处理入口为GroupCoordinator的completeAndScheduleNextExpiration方法,核心代码如下图所示:

在介绍该方法之前首先介绍一个该方法的入参含义:

  • GroupMetadata group 消费组的元信息。
  • MemberMetadata member 消费者的元信息。
  • long timeoutMs 心跳超时时间,默认为10s,这个参数是由消费端的session.timeout.ms参数设置,默认为10s。

Step1:为消费组设置唯一标识:groupId + "-" + memberId构成。

Step2:将hearbeatSatisfied设置为true,表示该消费者收到一个有效的心跳包。

Step3:收到一个有效的心跳包,通知定时调度器停止本次的心跳过期检测。

Step4:构建一个DelayedHearbeat,进入下一个心跳检测周期。

接下来将分别对Step3、Step4展开详细介绍。

1.2.1 心跳检测正常处理逻辑


在收到一个心跳包时,尝试将本次检测设置成功,具体的实现由DelayedOperation的checkAndComplete方法,代码如下:

Kafka使用一个数据结构来存储需要跟踪的所有消费者,在这里成为Watch机制。

实现要点:根据key获取WatchList,然后从获取的WatchList中内部的ConcurrentMap中再按照Key获取对应与当前消费者对应的Watch。

  • 如果没有找到对应消费者的Watch,则直接返回,无需检测,说明已经成功检测。
  • 如果找到了对应消费者的Watch,则执行被watch的tryCompleteWatched方法。

Watch的数据结构如下:

接下来重点关注Watches的tryCompleteWatched方法,该方法的详细调用代码如下图所示:

这边先重点介绍一下组协调器判断一次成功的心跳检测的三个标准中满足一个即可(GroupCoordinator的tryCompleteHeartbeat方法):

  • 如果消费组的状态处于Dead
  • 如果消费组的状态为Pending(消费组在重平衡中)
  • hearbeatSatisfied为true,即收到了一个有效的心跳包。

上述代码的实现比较简单,这里就不一一罗列,其核心关键点如下:

  • 删除对应的Watch,表示一次心跳检测成功。
  • Watchs中存储的对象是DelayedOperation(Kafka延迟类型的父类)的子类,在心跳检测中具体为DelayedHeartbeat。
  • 最终执行DelayedOperation的是TimeTask的cancel方法(取消延迟任务),就是从延迟调度中移除自己,表示没有超时,结束本轮的超时检测,具体的存储结构,将在下文详介绍如果开启新一轮心跳检测时再详细讲解。

为了方便大家阅读源码,其主要的调用时序图如下:

1.2.2 开启下一轮心跳检测


1.2.2.1将延迟任务放入时间轮

在接受到一个新的心跳包首先用于清除上一轮设置的延迟任务,然后需要开启一个新的延迟任务,接下来我们将来具体看看Kafka如何开启新一轮心跳检测机制,**其本质上是Kafka的延迟(定时)实现原理。**代码入口如下图所示:

开启下一轮调度时首先将Member的heartbeatSatisfied设置为false。

其核心思想是创建一个心跳延迟任务DelayedHeartbeat,并对其检测是否完成或者添加Watch,启动心跳延迟或者等待下一个心跳包的到来。

其实看到这里,我们应该能得到一个关于Kafka心跳检测机制的实现思路:

  • 开启一个延迟任务,延迟检查时间为心跳过期时间,一旦延迟任务执行,则意外着心跳超时。
  • 当收到一个心跳包时,需要取消上一次设置的延迟任务。
  • 使用循环使用延迟任务,从而实现类似定时任务的效果。

接下来我们详细探讨一下DelayedOperationPurgatory的tryCompleteElseWatch方法,其代码如下图所示:

Step1:尝试调用DelayedHeartbeat的tryComplete方法,判断是否可以判断完成,这里主要是消费组是否为重平衡或者状态为Dead,如果上述情况不满足,则会返回false,因为在发起下一轮心跳包时已将heartbeatSatisfied设置为false。

Step2:为该消费者添加到Watch中,表示kafka需要跟踪该消费者的心跳。

Step3:再次调用maybeTryComplete方法,再尝试判断是否该心跳检测完成。

Step4:如果没有完成,则该任务延迟任务(DelayedHeartbeat)添加到定时调度中。

接下来将进入到Kafka心跳的核心机制,即延迟任务的实现机制

每一个待执行的延迟任务被封装在TimeTaskEntry中,这个一个典型的双链表,数据结构说明说明如下:

并持有一个关键字段:该定时任务的过期时间,等于系统当前时间+过期时间,在心跳检测场景中默认为10s。

继续跟踪SystemTimer的addTimerTaskEntry,其代码如下:

addTimerTaskEntry的核心实现如下:

  • 尝试将延迟任务添加到时间轮,如果已经过期,则提交到线程池,触发心跳过期的逻辑,提交到线程后,DelayedOperation的run方法会被调用,最终onExpiration方法被调用。

接下来重点谈一下往时间轮中添加任务的具体实现,核心代码见下图所示:

核心实现要点:

Step1:如果任务已经被取消或者已过期,返回false。如果返回false,则会触发定时任务过期。

Step2根据过期时间,放入到时间轮中指定的位置,时间轮的数据结构如下:

每一个格代表一个时间间隔,例如200ms,当前指针指向的格子,代表该格子中的所有任务过期,例如现在要要插入一个700ms过期,从当前指针的下一格开始算起,放入第4格中。

另外时间轮的总格子有限,则该时间轮能计算的最大时间是有限的,例如一个8格的时间轮,每一格代表200ms,则如果要在2s后过期,显然这个时间轮无法存储,通常的解决方案是采用多级时间轮,另外一级的时间轮,其时间精度会更粗。

结合上述关于时间轮的原理,再去看上述代码,就显得容易看懂了。

Step3:就是处理第一级时间轮无法满足过期时间,则放入到第二级时间轮中。

1.2.2.2 驱动时间轮

基于时间轮算法,除了数据按找时间轮到方向、触发时间存储在合适的刻度量,还需要驱动时间轮指针。Kafka中的驱动时间轮入口为:

具体实现代码如下:

具体就是将指针处的所有任务全部拉取出来,执行addTimeTaskEntry,其中过期的任务将提交到线程池触发延迟任务的执行。

上述代码看起来比较简单,就不一一介绍,为了方便大家读懂上面的代码,我们只需要了解一下kafka采用时间轮的实际存储数据结构,即能很容易理解上述代码:

其核心特点:环形队列就是一个数组,每一个元素在Kafka中对应一个桶,每一个桶存储一个TimerTaskList(链表),每次指针指向的TimerTaskList,将该链表中的元素代表的任务全部执行。

2、图解Kafka心跳架构设计


读起源码来说或许比较枯燥,接下来给出Kafka心跳处理的图解,重点是阐述Kafka时间轮算法的核心数据结构。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/531751.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

U盘数据丢失怎么恢复?优盘数据恢复,看这4个方法!

案例:U盘数据丢失怎么恢复? 【我的U盘里真的存了很多重要的视频和图片,但是前段时间U盘好像中病毒了,导致我很多的图片都丢失了!大家有什么好方法可以帮我恢复U盘中的重要数据吗?真的超级感谢!…

ZED使用指南(五)

六、其他 1、相机 (1)选择视频模式 左右视频帧同步,以并排格式作为单个未压缩视频帧流式传输。 在ZED Explorer或者使用API可以改变视频的分辨率和帧率。 (2)选择输出视图 ZED能以不同的格式输出图像,…

基于Java+SpringBoot+Mybaties+Layui 小区物业管理系统设计与实现

一.项目介绍 小区物业管理系统分为两类,一类是业主、一类是管理员 业主的功能有:小区首页、房屋购买、车位购买、公告通知、出入登记、投诉服务、报修服务、关于我们 管理员的功能有:楼宇管理、住房管理、车位管理、物业管理、收费项目管理、…

yolov8seg模型转onnx转ncnn

yolov8是yolo的最新版本,可做图像分类,目标检测,实例分割,姿态估计。 主页地址 这里测试一个分割模型。 模型如下 选yolov8n-seg模型,转成onnx,再转ncnn测试。 yolov8s-seg的ncnn版可以直接用这个 如果用…

Linux代码性能分析工具

一、gperftools 1、gperftools安装 docker下需要安装gperftools、ghostscript、graphviz gperftools: # 从github下载gperftools源码并解压 wget https://github.com/gperftools/gperftools/releases/download/gperftools-2.9.1/gperftools-2.9.1.tar.gz tar -x…

(1分钟速览)图像金字塔对比--光流法and fast角点

图像金字塔对比--光流法and fast角点 ​ fast角点以计算快为出名,但是其不具有尺度不变性和旋转不变性。针对尺度不变性,其说的是对于远处和近处相同的一个物体,可能近处能够检测出来有角点,但是放远了以后就不一定能检测出来角点…

良好的水生态环境对人居生活的帮助

水是生命之源,良好的水源生态,对于人们的生活健康提供很大的作用,在农村污水处理中,利用污水处理设备进行水源净化排放是很常用的手段。 良好的水环境对人居生活有很多的帮助,主要包括以下几个方面: 1.提供…

类和对象 - 中(C++)

目录 类的6个默认成员函数 一、构造函数 概念 特性 二、析构函数 特性 三、拷贝构造函数 概念 特性 四、赋值运算符重载 运算符重载 前置和后置重载​​​​​​​ 赋值运算符重载 五、& 取地址操作符重载 六、const & 取地址操作符重载 认识const成员 const & 运…

第七章 TensorFlow实现卷积神经网络--代码调试

注:实验工具为jupyter,该python环境为3.7并安装了1.14.0版本的tensorflow ,这是本人基于最新版的anaconda下新建的环境,至于在头歌环境平台及其他平台并未验证,而我们需要安装和更新包也需要在自己新建的环境命令行下才有效。 //这…

CloudCompare二次开发之如何通过PCL进行点云采样?

文章目录 0.引言1.CloudCompare界面设计采样(sample)按钮2.RandomSample随机下采样3.VoxelGrid体素下采样4.UniformSampling均匀采样5. MovingLeastSquares增采样6.SamplingSurfaceNormal非均匀体素采样 0.引言 因笔者课题涉及点云处理,需要通过PCL进行点云数据一系…

MySQL之数据目录

前言 本文章收录在MySQL性能优化原理实战专栏,点击此处查看更多优质内容。 本文摘录自 ▪ 小孩子4919《MySQL是怎样运行的:从根儿上理解MySQL》 我们知道像InnoDB、MyISAM这样的存储引擎都是把表存储在磁盘上的,而操作系统用来管理磁盘的那…

C. Classy Numbers(dfs构造 + 组合数学)

Problem - C - Codeforces 让我们称某个正整数为“优美的”,如果它的十进制表示中不超过3个数字不为零。例如,数字4、200000、10203是优美的,而数字4231、102306、7277420000则不是。 给定一个区间[L;R],请计算在此区间内有多少个…

Linux awk流编辑器

awk流编辑器 工作原理 逐行读取文本,默认以空格或tab键为分隔符进行分隔,将分隔所得的各个字段保存到内建变量中,并按模式或者条件执行编辑命令。 sed命令常用于一整行的处理,而awk比较倾向于将一行分成多个“字段”然后再进行处理…

pix2pix

Image-to-Image Translation Using Conditional Adversarial Networks 1: pix2pix也是CGAN的一种,pix2pix可以学习输入到输出的映射,同时也学习了损害函数去训练这个映射。这是一个大一统的方法去实现从标签合成图像,从边界图重建…

【FMC139】多通道采集--基于 VITA57.1 标准的4 路500MSPS/1GSPS 14 位AD 采集子卡模块(AD9680/HMC7044)

板卡概述 FMC139 是一款基于VITA57.1 标准规范的JESD204B 接口FMC 子卡模块,该模块可以实现4 路14-bit、500MSPS/1GSPSADC采集功能。该板卡ADC 器件采用ADI 公司的AD9680 芯片,全功率-3dB 模拟输入带宽可达2GHz。该ADC 与FPGA 的主机接口通过8通道的高速串行GTX 收…

AutoSAR PNC和ComM

文章目录 PNC和ComMPNC管理NM PDU结构及PNC信息位置如何理解节点关联PNCPNC状态管理 ComM 通道状态管理 PNC和ComM PNC 和 ComM层的Channel不是一个概念,ComM的Channel对应具体的物理总线数。 在ComM模块中,一个Channel可以对应一个PNC,也可…

AIGC产业研究报告2023——语言生成篇

易观:今年以来,随着人工智能技术不断实现突破迭代,生成式AI的话题多次成为热门,而人工智能内容生成(AIGC)的产业发展、市场反应与相应监管要求也受到了广泛关注。为了更好地探寻其在各行业落地应用的可行性…

java字类与继承

文章目录 一、Java子类与父类二、Java子类的继承性三、Java子类与对象四、Java成员变量的隐藏和方法重写五、Java super关键字六、Java final关键字七、Java对象的上转型对象八、Java继承与多态九、Java abstract类和abstract方法总结 一、Java子类与父类 继承就是一种由已有的…

电脑无法安装软件?不用慌,这样做可以快速解决!

案例:为什么我的电脑不能下载软件? 【在学习的过程中,需要下载一些软件工具。按照老师给的软件步骤,电脑还是无法安装软件,有小伙伴知道怎么回事吗?】 在使用电脑的过程中,很多小伙伴都会遇到…

【C++】unordered_map和unordered_set的模拟实现

一、改造HashTable 实现了哈希表&#xff08;开散列&#xff09;&#xff0c;再将其封装为unordered_map和unordered_set。 HashTable的改造与RBTree的改造大致相同&#xff1a; 改造节点 template<class T> struct HashNode {//std::pair<K, V> _kv;//HashNod…