Kafka-消费者-KafkaConsumer分析-Heartbeat

news2025/1/13 10:05:23

在前面分析Rebalance操作的原理时介绍到,消费者定期向服务端的GroupCoordinator发送HeartbeatRequest来确定彼此在线。

下面就来详细分析KafkaConsumer中Heartbeat的相关实现。

首先了解一下心跳请求和响应的格式。HeartbeatRequest的消息体格式比较简单,依次包含group_id(String)、group_generation_id(int)、member_id(String)三个字段。HeartbeatResponse消息体只包含一个short类型的error_code。

HeartbeatTask是一个实现DelayedTask接口的定时任务,负责定时发送HeartbeatRequest并处理其响应,此逻辑在其run方法中实现,下面就来分析HeartbeatTask.run()方法的具体流程,如图所示。

在这里插入图片描述

  1. 首先检查是否需要发送HeartbeatRequest,条件有多个:
  • GroupCoordinator已确定且已连接;
  • 不处于正在等待Partition分配结果的状态;
  • 之前的HeartbeatRequest请求正常收到响应且没有过期。
    如果不符合条件,则不再执行HeartbeatTask,等待后续调用reset方法重启HeartbeatTask任务。
  1. 调用Heartbeat.sessionTimeoutExpired方法,检测HeartbeatResponse是否超时。若超时,则认为GroupCoordinator宕机,调用coordinatorDead方法清空其unsent集合中对应的请求队列并将这些请求标记为异常后结束,将coordinator字段设置为null,表示将重新选择GroupCoordinator。同时还会停止HeartbeatTask的执行。

  2. 检测HeartbeatTask是否到期,如果不到期则更新其到期时间,将HeartbeatTask对象重新添加到DelayedTaskQueue中,等待其到期后执行;如果已到期则继续后面的步骤,发送HeartbeatRequest请求。

  3. 更新最近一次发送HeartbeatRequest请求的时间,将requestinFlight设置为true,表示有未响应的HeartbeatRequest请求,防止重复发送。

  4. 创建HeartbeatRequest请求,并调用ConsumerNetworkClient.send方法,将请求放入unsent集合中缓存并返回RequestFuture。在后面的ConsumerNetworkClient.poll()操作中会将其发送给GroupCoordinator。

  5. 在RequestFuture对象上添加RequestFutureListener。

下面介绍一下HeartbeatResponse相关的处理。首先需要注意上面介绍的sendHeartbeatRequest()方法,它使用HeartbeatCompletionHandler将client.send方法返回的RequestFuture适配成RequestFuture后返回。:

在这里插入图片描述
CoordinatorResponseHandler是一个抽象类,其中有pasre和handle()两个抽象方法,parse()方法对ClientResponse进行解析,得到指定类型的响应;handle()方法对解析后的响应进行处理。

CoordinatorResponseHandler实现了RequestFuture抽象类的onSuccess方法和onFailure方法。

处理HeartbeatResponse的相关处理流程如图所示。

在这里插入图片描述
RequestFuture和RequestFutureListener只是为了实现适配器的功能,并没有实际处理逻辑。

当ClientResponse传递到HeartbeatCompletionHandler处时,会通过parse方法解析成HeartbeatResponse,然后进入handle方法处理。

在HeartbeatCompletionHandler.handle方法中,判断HeartbeatResponse中是否包含错误码,如果不包含,则调用RequestFuture的complete(null)方法,将HeartbeatResponse成功的事件传播下去;

反之,针对不同类型错误码分类处理,并调用raise()方法设置对应异常。

例如,错误码是ILLEGAL_GENERATION,表示HeartbeatRequest中携带的generationld过期,GroupCoordinator已经开始新的一轮Rebalance操作,则将rejoinNeeded设置为true,这会重新发送JoinGroupRequest请求尝试加入Consumer Group,也会导致HeartbeatTask任务停止。

如果错误码是UNKNOWN_MEMBER_ID,表示GroupCoordinator识别不了此Consumer,则清空memberld,尝试重新加入Consumer Group。
在这里插入图片描述
HeartbeatCompletionHandler.handle()方法中会调用RequestFuture的complete方法或raise方法,这两个方法中没有处理逻辑,但是会触发其上的RequestFutureListener(在HeartbeatTaskrun)方法的步骤6中注册),此监听器会将requestlnFlight设置为false,表示所有HeartbeatRequest都已经完成,并将HeartbeatTask重新放入定时任务队列,等待下一次到期执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1401389.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CTFhub-bak文件

CTFhub-Web-信息泄露-备份文件下载-bak文件 题目信息 解题过程 看到提示说和index.php有关,在url后面加index.php.bak,跳转到http://challenge-7a4da2076cfabae6.sandbox.ctfhub.com:10800/index.php.bak网址,即: 跳转到下载页…

安装ddddocr中遇到的问题

1、需要先安装: pip3 install pyinstaller --no-use-pep517 pip install scikit-build pip install setuptools pip install pyinstaller pip install pillow 重要是的是保证一个python 环境,多个python环境会导致各种问题。并且保证python>3.8…

万辰集团十年经营首度亏损,泡沫式增长是喜是忧?

2023年,线下消费回暖复苏,零售领域更是繁花似锦。以量贩零食为代表的新业态蓬勃发展,引得众多资本和企业纷至沓来。 这一背景下,作为新兴量贩零食的典型代表之一,福建万辰生物科技集团股份有限公司(以下简…

JavaEE中的监听器的作用和工作原理

在JavaEE(Java Platform, Enterprise Edition)中,监听器(Listener)是一种重要的组件,用于监听和响应Web应用程序中的事件。监听器的作用是在特定的事件发生时执行一些自定义的逻辑。常见的监听器包括Servle…

vue3中Fragment特性的一个bug,需要留意的注意事项

vue3中的Fragment 模版碎片特性是什么&#xff0c;简单的理解就是template模板代码不在像vue2中那样必须在根节点在包裹一层节点了。 vue2写法 <template><div><h1>标题</h1><p>正文内容</p></div> </template>vue3写法 &l…

医学图像的数据增强技术 --- 切割-拼接数据增强(CS-DA)

医学图像的新型数据增强技术 CS-DA 核心思想自然图像和医学图像之间的关键差异CS-DA 步骤确定增强后的数据数量 代码复现 CS-DA 核心思想 论文链接&#xff1a;https://arxiv.org/ftp/arxiv/papers/2210/2210.09099.pdf 大多数用于医学分割的数据增强技术最初是在自然图像上开…

滑动窗口求连续数列最大值(固定窗长和非固定窗长)

非固定窗长 思路&#xff1a; sum 0&#xff0c;max num[0], 依次遍历一个一个往前加&#xff0c;sum sum num[i], 如果sum[i] 小&#xff0c;则替换sum 如果sum > max&#xff0c; 则max sum; int maxSubArray(int* nums, int numsSize){int max nums[0];int sum …

圆的参数方程是如何推导的?

圆的参数方程是如何推导的? 1. 圆的三种参数表示2. 三角函数万能公式3. 回到圆的参数方程1. 圆的三种参数表示 已知圆的第一种参数方程为: x 2 + y 2 = r x^2+y^2=r x2+y2=r   圆的图像如下: 通过上图,不难理解,圆的参数方程还可以用三角函数表示,也就是第二种参数表…

从零开始配置pwn环境:sublime配置并解决pwn脚本报错问题

1.sublime安装 Download - Sublime Text ──(holyeyes㉿kali2023)-[~] └─$ sudo dpkg -i sublime-text_build-4169_amd64.deb [sudo] password for holyeyes: Selecting previously unselected package sublime-text. (Reading database ... 409163 files and directori…

大创项目推荐 深度学习驾驶行为状态检测系统(疲劳 抽烟 喝水 玩手机) - opencv python

文章目录 1 前言1 课题背景2 相关技术2.1 Dlib人脸识别库2.2 疲劳检测算法2.3 YOLOV5算法 3 效果展示3.1 眨眼3.2 打哈欠3.3 使用手机检测3.4 抽烟检测3.5 喝水检测 4 最后 1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 基于深度学习的驾…

(超详细)9-YOLOV5改进-添加EffectiveSEModule注意力机制

1、在yolov5/models下面新建一个EffectiveSEModule.py文件&#xff0c;在里面放入下面的代码 代码如下&#xff1a; import torch from torch import nn as nn from timm.models.layers.create_act import create_act_layerclass EffectiveSEModule(nn.Module):def __init__…

使用DALL-E 3模型模拟AI女友的一天 |【人人都是算法专家】

Rocky Ding 公众号&#xff1a;WeThinkIn 知乎&#xff1a;Rocky Ding 写在前面 【人人都是算法专家】栏目专注于分享AI行业中业务/竞赛/研究/产品维度的思考与感悟。欢迎大家一起交流学习&#x1f4aa; 大家好&#xff0c;我是Rocky。 我们都知道DALL-E 3是和Stable Diffusio…

Leetcode的AC指南 —— 栈与队列:232.用栈实现队列

摘要&#xff1a; **Leetcode的AC指南 —— 栈与队列&#xff1a;232.用栈实现队列 **。题目介绍&#xff1a;请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作&#xff08;push、pop、peek、empty&#xff09;&#xff1a; 实现 MyQueue 类&#xff1a;…

【Linux系统编程二十九】基于信号量的环形队列生产消费模型

【Linux系统编程二十九】基于信号量的环形队列生产消费模型 一.信号量1.P操作2.V操作 二.环形队列三.单生产单消费场景1.信号量维持生产消费之间互斥同步 四.多生产多消费场景1.加锁维持生产生产&#xff0c;消费消费互斥 五.总结 一.信号量 当共享资源被当成整体使用时&#…

Java-NIO篇章(4)——Reactor反应器模式

前面已经讲过了Java-NIO中的三大核心组件Selector、Channel、Buffer&#xff0c;现在组件我们回了&#xff0c;但是如何实现一个超级高并发的socket网络通信程序呢&#xff1f;假设&#xff0c;我们只有一台内存为32G的Intel-i710八核的机器&#xff0c;如何实现同时2万个客户端…

PID笔记

Improving the Beginner’s PID 参考资料 Improving the Beginner’s PID – Introduction The Beginner’s PID 以下是每个人第一次学习的PID方程&#xff1a; 这导致几乎每个人都编写了以下PID控制器&#xff1a; /*working variables*/ unsigned long lastTime; double…

Mongo集群入门

一、前言 MongoDB 有三种集群架构模式&#xff0c;分别为主从复制&#xff08;Master-Slaver&#xff09;、副本集&#xff08;Replica Set&#xff09;和分片&#xff08;Sharding&#xff09;模式。 Master-Slaver 是一种主从复制的模式&#xff0c;目前已经不推荐使用。 Re…

vue.js js 雪花算法ID生成 vue.js之snowFlake算法

随着前端业务越来越复杂&#xff0c;自定义表单数据量比较大&#xff0c;每条数据的id生成则至关重要。想到前期IOS中实现的雪花算法ID&#xff0c;照着其实现JS版本&#xff0c;供大家学习参考。 一、库的建立引入 在你项目中创建一个snowFlake.js的文件&#xff1a;拷贝以下…

《Windows核心编程》若干知识点应用实战分享

目录 1、进程的虚拟内存分区与小于0x10000的小地址内存区 1.1、进程的虚拟内存分区 1.2、小于0x10000的小地址内存区 2、保存线程上下文的CONTEXT结构体 3、从汇编代码角度去理解多线程运行过程的典型实例 4、调用TerminateThread强制结束线程会导致线程中的资源没有释放…

大数据关联规则挖掘:Apriori算法的深度探讨

文章目录 大数据关联规则挖掘&#xff1a;Apriori算法的深度探讨一、简介什么是关联规则挖掘&#xff1f;什么是频繁项集&#xff1f;什么是支持度与置信度&#xff1f;Apriori算法的重要性应用场景 二、理论基础项和项集支持度&#xff08;Support&#xff09;置信度&#xff…