Kafka消费者组重平衡(二)

news2024/10/6 23:19:34

文章目录

    • 概要
    • 重平衡通知机制
    • 消费组组状态
    • 消费端重平衡流程
    • Broker端重平衡流程

概要

上一篇Kafka消费者组重平衡主要介绍了重平衡相关的概念,本篇主要梳理重平衡发生的流程。

为了更好地观察,数据准备如下:
kafka版本:kafka_2.13-3.2.1
控制台创建topic (2个分区1个副本):
bin/kafka-topics.sh --create --bootstrap-server node1:9092 --replication-factor 1 --partitions 2 --topic test-rebalance

本地启动两个SpringBoot项目实例,代码如下

@KafkaListener(topics = "test-rebalance", groupId = "test-group")
public void rebalanceConsumer(ConsumerRecord<String, String> recordInfo) {
    int partition = recordInfo.partition();
    System.out.println("partition:" + partition + " value:" + recordInfo.value());
}

重平衡通知机制

Kafka Java 消费者需要定期地发送心跳请求(Heartbeat Request)到 Broker 端的协调者,以表明它还存活着。在 Kafka 0.10.1.0 版本之前,发送心跳请求是在消费者主线程完成的,也就是调用 KafkaConsumer.poll 方法的那个线程。
这样的设计存在弊端,一旦消息处理消耗了过长的时间,心跳请求将无法及时发到协调者那里,导致协调者“错误地”认为该消费者已“死”。自 0.10.1.0 版本开始,Kafka引入了一个单独的心跳线程来专门执行心跳请求发送,避免了这个问题。

重平衡的通知机制正是通过心跳线程来完成的。当协调者决定开启新一轮重平衡后,它会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中,发还给消费者实例。当消费者实例发现心跳响应中包含了“REBALANCE_IN_PROGRESS”,就能立马知道重平衡又开始了,这就是重平衡的通知机制。

消费组组状态

Kafka 为消费者组定义了 5 种状态,它们分别是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。那么,这 5 种状态的含义如下:

状态含义
Empty组内没有任何成员
Dead组内没有任何成员,但组的元数据已经在协调者端删除
PreparingRebalance消费组组准备开启重平衡,此时所有成员都要重新请求加入消费者组
CompletingRebalance消费者组下所有成员已经加入,各个成员正在等待分配方案
Stable消费者组的稳定状态,该状态表明重平衡已经完成,组内各成员都能够正常消费数据了

以下是各个状态的流转:

在这里插入图片描述

一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于PreparingRebalance 状态等待成员加入,之后变更到CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡。当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。

创建一个topic并逐步启动两个消费者实例:
服务端日志:

INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group test-group in Empty state. Created a new member id consumer-1-a6c11c9f-ec26-4e66-adeb-832f699f1247 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Preparing to rebalance group test-group in state PreparingRebalance with old generation 3 (__consumer_offsets-12) (reason: Adding new member consumer-1-a6c11c9f-ec26-4e66-adeb-832f699f1247 with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Stabilized group test-group generation 4 (__consumer_offsets-12) with 1 members (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Assignment received from leader consumer-1-a6c11c9f-ec26-4e66-adeb-832f699f1247 for group test-group for generation 4. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

客户端一启动日志

o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Successfully joined group with generation 4
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting newly assigned partitions: test-rebalance-1, test-rebalance-0
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting offset for partition test-rebalance-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=node1:9092 (id: 0 rack: null), epoch=0}}
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting offset for partition test-rebalance-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=node1:9092 (id: 0 rack: null), epoch=0}}
o.s.k.l.KafkaMessageListenerContainer    : test-group: partitions assigned: [test-rebalance-1, test-rebalance-0]

有新成员加入后broker日志

INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group test-group in Stable state. Created a new member id consumer-1-97eba1d4-7f5c-4d78-b979-ba6ab9c82395 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Preparing to rebalance group test-group in state PreparingRebalance with old generation 4 (__consumer_offsets-12) (reason: Adding new member consumer-1-97eba1d4-7f5c-4d78-b979-ba6ab9c82395 with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Stabilized group test-group generation 5 (__consumer_offsets-12) with 2 members (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Assignment received from leader consumer-1-a6c11c9f-ec26-4e66-adeb-832f699f1247 for group test-group for generation 5. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

接下来启动客户端二

客户端一日志

 o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Successfully joined group with generation 5
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting newly assigned partitions: test-rebalance-0
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting offset for partition test-rebalance-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=node1:9092 (id: 0 rack: null), epoch=0}}
o.s.k.l.KafkaMessageListenerContainer    : test-group: partitions assigned: [test-rebalance-0]

客户端二日志


o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Attempt to heartbeat failed since group is rebalancing
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Revoking previously assigned partitions [test-rebalance-1, test-rebalance-0]
o.s.k.l.KafkaMessageListenerContainer    : test-group: partitions revoked: [test-rebalance-1, test-rebalance-0]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Successfully joined group with generation 5
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting newly assigned partitions: test-rebalance-1
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting offset for partition test-rebalance-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, o.s.k.l.KafkaMessageListenerContainer    : test-group: partitions assigned: [test-rebalance-1]

当所有成员都退出组后,消费者组状态变更为 Empty。Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。因此,如果你的消费者组停掉了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。

消费端重平衡流程

重平衡的完整流程需要消费者端和协调者组件(什么是协调者)共同参与才能完成。下面先梳理消费者端的重平衡流程。主要分为3各阶段。

第一阶段:确定组协调器
关于如何确定组协调器,参考Kafka消费者重平衡(一)

第二阶段:JoinGroup

在此阶段的消费者会向Group Coordinator 发送 JoinGroupRequest 请求,并处理响应。然后GroupCoordinator 从一个consumer group中选择第一个(通常情况下)加入group的consumer作为leader(消费组协调器),把consumer group情况发送给这个leader,接着这个leader会负责制定分区方案。

在这一步中,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。值得注意的是,其他成员也会向协调者发送 SyncGroup 请求,只不过请求体中并没有实际的内容。这一步的主要目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了。

在这里插入图片描述

第三阶段:SyncGroup

待领导者制定好分配方案后,重平衡流程进入到 SyncGroup 请求阶段

在这里插入图片描述
SyncGroup 请求的主要目的,就是让协调者把领导者制定的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作。

Broker端重平衡流程

下面只要从几个常见的场景梳理Broker端重平衡的流程。

新成员加入

当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。
具体流程如下:

在这里插入图片描述

组成员主动离组

消费者实例所在线程或进程调用 close() 方法时,就会主动通知协调者要退出组。以下时具体的流程

在这里插入图片描述

组成员崩溃离组

崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms 控制。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1011615.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

9.12 C++作业

实现一个图形类&#xff08;Shape&#xff09;&#xff0c;包含受保护成员属性&#xff1a;周长、面积&#xff0c; 公共成员函数&#xff1a;特殊成员函数书写 定义一个圆形类&#xff08;Circle&#xff09;&#xff0c;继承自图形类&#xff0c;包含私有属性&#xff1a;半…

模拟信号电压或电流信号转变频器频率传感器信号隔离变送器0-5V/0-10V/0-20mA/4-20mA转0-5KHz/0-10KHz/1-5KHz

主要特性: 精度等级&#xff1a;0.1 级、0.2 级。产品出厂前已检验校正&#xff0c;用户可以直接使用输 入 &#xff1a;0-5V/0-10V/1-5V,0-10mA/0-20mA/4-20mA 等输出信号&#xff1a;0-5KHz/0-10KHz/1-5KHz 等标准信号辅助电源&#xff1a;5V、9V、12V、15V 或 24V 直流单电…

OpenCV(四十三):Shi-Tomas角点检测

1.Shi-Tomas角点检测原理 Shi-Tomasi&#xff08;也称为Good Features to Track&#xff09;角点检测算法是一种改进的角点检测方法&#xff0c;它基于Harris角点检测算法&#xff0c;并针对一些不足进行了改进。 与Harris角点检测不同&#xff0c;Shi-Tomasi使用了更简化的角点…

PDF怎么合并?这几个方法收藏起来吧

PDF文件是一种非常常见的文档格式&#xff0c;它具有跨平台、易于阅读和打印等优点&#xff0c;因此在生活和工作中得到了广泛的应用。当我们需要将多个PDF文件合并成一个文件时&#xff0c;我们可以采用以下几种方法。 方法一&#xff1a;使用PDF转换工具 我们在电脑上打开迅…

Spring Boot 中的 @CacheEvict 注解使用

Spring Boot 中的 CacheEvict 注解 在 Spring Boot 中&#xff0c;缓存是提高应用性能的重要手段。为了更好地管理缓存&#xff0c;Spring Boot 提供了一系列的缓存注解&#xff0c;其中 CacheEvict 注解用于清空缓存。 本文将介绍 CacheEvict 注解的含义、原理以及如何使用。…

华为云云耀云服务器L实例评测-基于华为云服务器的测试及简单配置

引言 云计算已经成为现代企业和个人的重要组成部分。在云计算市场上&#xff0c;华为云一直以来都以其出色的性能和服务质量而闻名。周末的时候&#xff0c;利用华为云云耀云服务器搭建了一个基于hexo的个人博客&#xff0c;我用的是2核2G的3M带宽的配置&#xff0c;访问起来挺…

自动化搭建(Jenkins_Docker)1

简介 目前为了搭建Android自动化构建&#xff0c; 包含自动打包、代码审查工具以及自动化测试的串联。如下图&#xff1a; 我拿到的是一个2T的一个服务器&#xff0c;需要在上面搭建整个环境&#xff0c; 整体分解如下&#xff1a; Java安装Jenkins安装和配置Gerrit 和 rep…

GIS前端—地图标注

GIS前端—地图标注 地图标注原理图片标注文本标注矢量图形标注 地图标注原理 地图标注是将空间位置信息点与地图关联&#xff0c;通过图标、窗口等形式把点相关的信息展现在地图上。地图标注是WebGIS应用的核心功能之一&#xff0c;在大众应用中十分常见。基于地图标注可以为用…

使用代码产生标准的软件架构图之C4

在软件开发的流程中&#xff0c; 软件架构图是重要的软件文档&#xff0c;软件架构图包含有多个层级&#xff0c;最常见的&#xff0c;有软件的整体架构和组件、类等图。 整体架构可能使用PPT或者一些绘图工具Visio来绘制组件、类等图有UML的标准&#xff0c; 也可以使用Visio…

【Android知识笔记】进程通信(二)

一、Binder对象是如何跨进程传递的 binder传递有哪些方式?binder在传递过程中是怎么存储的?binder对象序列化和反序列化过程?binder对象传递过程中驱动层做了什么?总结 Binder 对象的跨进程传递主要靠 Parcel 的两个关键方法 writeStrongBinder() 和

【数据结构】—堆排序以及TOP-K问题究极详解(含C语言实现)

食用指南&#xff1a;本文在有C基础的情况下食用更佳 &#x1f525;这就不得不推荐此专栏了&#xff1a;C语言 ♈️今日夜电波&#xff1a;ルミネセンス—今泉愛夏 1:01 ━━━━━━️&#x1f49f;──────── 5:05 …

[刷题记录]牛客面试笔刷TOP101(一)

牛客笔试算法必刷TOP101系列,每日更新中~(主要是记录自己的刷题,所以描述的可能不是很清楚 但如果刚好能帮助到你就更好了) 后续后头复习的时候,记得是看正解啊,别对着错的例子傻傻看了... 目录 1.合并有序链表2023.9.3 2.链表是否有环2023.9.4 3.判断链表中环的入口点 …

学Python的漫画漫步进阶 -- 第三步

学Python的漫画漫步进阶 -- 第三步 三、数字类型的数据3.1 Python中的数据类型3.2 整数类型3.3 浮点类型3.4 复数类型3.5 布尔类型3.6 数字类型的相互转换3.6.1 隐式类型的转换3.6.2 显式类型的转换 3.7 练一练3.8 数字类型的总结全部16步完成后 &#xff0c;后续就是介绍项目实…

走进甄云,探寻SRM独角兽成功背后的故事

随着科技的快速发展和全球商业环境的不断变化&#xff0c;中国企业对灵活性、创新性、全球化和效率的需求是迫切的&#xff0c;数字化转型已经成为企业生存和发展的关键因素&#xff0c;对企业具有重要意义&#xff0c;是组织生存和发展的必然趋势。数字化转型涉及整个组织、多…

PMP-项目规划过程组的重要性

一、什么是项目规划过程组 规划过程组包括明确项目全部范围、定义和优化目标&#xff0c;并为实现目标制定行动方案的一组过程。规划过程组中的过程制定项目管理计划的组成部分&#xff0c;以及用于执行项目的项目文件。取决于项目本身的性质&#xff0c;可能需要通过多轮反馈来…

片上网络(1)概述

前言 NoC&#xff1a;On-Chip Networks&#xff0c;片上网络。 由于多核乃至众核时代的到来&#xff0c;用于连接它们的可扩展、低延迟、大带宽的通信结构变得至关重要。 在核心较少时&#xff0c;总线Bus和矩阵/交叉开关Crossbar是主要的互联结构。总线可以提供较低的传输延迟…

云原生Kubernetes:pod基础与配置

目录 一、理论 1.pod 2.pod容器分类 3.镜像拉取策略 4.pod 的重启策略 二、实验 1.Pod容器的分类 2.镜像拉取策略 三、问题 1.apiVersion 报错 2.pod v1版本资源未注册 3.格式错误 4.取行显示指定pod信息 四、总结 一、理论 1.pod (1) 概念 Pod是kubernetes中…

pgzrun 拼图游戏制作过程详解(4,5)

4. 将小拼图位置随机打乱 建立swap_Square(i&#xff0c;j)坐标互换函数 将Gird[i]和Gird[j] 中的小拼图信息进行互换 def swap_Square(i,j): # 两个拼图的位置互换temp_posGird[i].posGird[i].posGird[j].posGird[j].postemp_pos 导入随机数模块 import random 随机抽取…

ruoyi-nbcio移植过程中的一些问题记录

1、打包去掉测试出现 Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.22.2:test 错误 在pom.xml里增加下面 去掉测试 <!--添加配置跳过测试--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId…

2023.05.27系统分析师考试案例分析及解析

案例分析真题1 阅读以下关于软件系统分析与建模的叙述&#xff0c;在纸上回答问题1至3. 说明: 某软件公司拟开发一套汽车租赁系统&#xff0c;科学安全和方便的管理租赁公司的各项业务&#xff0c;提高公司效率&#xff0c;提升利率。注册用户在使用系统镜像车辆预约时需执行…