【Kafka】Kafka再平衡机制及相关参数

news2024/11/25 2:44:42

背景

Kafka作为一款基于发布订阅模式的消息队列,生产者将消息发送到Kafka集群(Brokers)中,消费者(Consumer Group )拉取消息进行消费,实现了异步机制。Kafka中,消费者通常以消费者组的方式进行消费,消费特点为:

  • 每个分区只能被一个消费组中的一个消费者所消费。
  • 消费组中一个消费者可以消费多个分区。
  • 多个消费组,每个消费组都可以消费topic中的所有数据,且消费位移之间互不影响。

一、Kafka的再平衡机制

在Kafka中,如果消费者数量、分区数变更或者消费者订阅的topic发生变化,也就需要再进行消费者消费分区的重新分配,这也就是所谓的再平衡。

1.1 再平衡定义

再平衡是指的是Consumer Group 下的 Consumer 所订阅的Topic发生变化时 发生的一种分区重分配机制。

也就是说,再平衡也就是一种协议,它规定了如何让消费组下的所有消费者来分配 Topic 中的每一个分区。

举个栗子:一个 Topic 有 100 个分区,一个消费者组内有有 20 个消费者,在协调者的控制下让消费者组内的每一个消费者分配到 5 个分区,这个分区分配的过程就是再平衡。

1.2 再平衡触发条件

一般来说,触发Kafka再平衡的条件一般是以下三种:

  • 主题分区发生改变,Kafka 目前只支持分区增加,当出现分区数增加的时候就会触发再平衡。

  • Consumer Group 中Consumer 个数发生变化(新增或者减少),导致其所消费的分区需要分配到组内其他的Consumer 上。这里的减少有很大可能是被动的,就是某个消费者出现崩溃掉线了。

  • Consumer 所订阅的Topic发生了新增分区的行为(Kafka目前只支持新增分区),那么新增的分区就会分配给当前的Consumer ,此时就会触发再平衡。

  • Consumer 订阅的topic发生变化,比如订阅的Topic采用的是正则表达式的形式。如 test-* 此时如果有新建了一个topic test-user,那么这个Topic的所有分区也是会自动分配给当前的Consumer 的,此时就会发生再平衡。

简洁一点,触发再平衡的条件就是:

  • Consumer Group 成员数变更。
  • Consumer Group 订阅的主题的分区数发生变更。
  • Consumer Group 的订阅主题数发生变更。

再平衡有什么危害呢,首先我们要知道,再平衡的过程中,消费者是无法从 Kafka集群中消费消息的,这对 Kafka的 系统吞吐量(TPS)影响极大,而如果 Kafka 集群内节点较多,那么再平衡可能比较耗时。数分钟到数小时都有可能,而这段时间,Kafka 是处于不可用状态。所以在实际环境中,应该尽量避免。

1.3 再平衡通知机制

那么发生再平衡的时候Kafka集群是如何通知到消费者的呢,答案就是通过消费者与Kafka集群之间的心跳机制。Kafka 消费者需要定期地发送心跳请求(Heartbeat Request) 到 Broker 端的协调者(Coordinator ),以证明消费者还活着。
   在 Kafka 0.10.0.1版本之前,发送心跳请求是在消费者主线程中完成的。这样就有很多问题,最大的问题在于,消息处理逻辑也是在这个线程中完后的。因此,一旦消息处理消耗很长的时间,心跳请求将无法及时发送到协调者那里,使协调者误以为该消费者死掉。
   Kafka 0.10.0.1 版本之后,Kafka 就提供了一个专门的线程去发送心跳,避免了这个问题。

再平衡就是通过这个心跳线程去通知其他消费者触发再平衡机制。当协调者开启新一轮的再平衡之后,它会将"REBALANCE_IN_PROGREESS"封装到心跳线程的响应信息中,返回给消费者实例,当消费者收到响应信息中含有 “REBALANCE_IN_PROGREESS” 信息,就知道再平衡开始了,这就是再平衡的通知机制。

二、再平衡流程

从再平衡的定义和触发再平衡条件中我们可以看出,再平衡主要是由Kafka集群和Kafka消费端一起完成的,更精确的来说,是Kafka的Broker端的Coordinator 和Consumer端一起完成的。

2.1 消费端再平衡流程

在消费者端,再平衡主要分为两个步骤:

  • 重新加入消费者组中。
  • 等待领导消费者(Leader Consumer) 分配方案。

这两个步骤分别对应两类特定的请求:JoinGroup 请求和 SyncGroup 请求。

当组内成员加入组时,消费者会向协调者发送 JoinGroup 请求,在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的 JoinGroup 请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。

通常情况下,第一个发送 JoinGroup 请求的成员会自动成为领导者领导消费者的任务是收集所有成员的订阅信息,然后根据这些信息,指定具体的分区消费分配方案。

选出领导者之后,协调者会把消费者的订阅信息封装在 JoinGroup 请求的响应中,然后发送给领导者,由领导者统一做出分区消费分配方案后,在进行下一步,发送 SyncGroup 请求。
在这里插入图片描述
在这一步中,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。当然,其他成员也会向协调者发送 SyncGroup 请求,只不过请求体中并没有实际的内容。这一步主要目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式发给所有成员,这样组内的成员都知道自己该消费哪些分区的数据了。
在这里插入图片描述

因此,JoinGroup 请求的主要作用是将组成员的订阅信息发送给领导者消费者,待领导者制定好分配方案后,再平衡流程进入到 SyncGroup 请求阶段。而SyncGroup 请求的主要目的就是让协调者把领导者的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入 Stable 状态,即开始正常的消费工作。

2.2 Broker端的再平衡流程

Broker端的再平衡主要是Coordinator 处理再平衡的流程。从触发再平衡的条件来看,与Coordinator 相关的主要是新成员加入消费者组、消费者组成员主动离开、消费者组成员崩溃离组、组成员提交位移。

再平衡一旦开启,Broker 端的协调者组件就要开始忙了,主要涉及到控制消费者组的状态流转。当前 Kafka 设计了一套消费者组状态机(State Machine),来帮助协调者完成整个再平衡过程。严格来说,这套状态机属于非常底层的设计,Kafka 官网并没有提及过。目前,Kafka 为消费者组定义了 5 中状态,分别是:EmptyDeadPreparingRebalanceCompletingReblanceStable。每种状态对应的含义如下:

状态含义
Empty组内没有任何成员,但消费者可能存在已提交的位移数据,而且这些位移尚未过期。
Dead同样是在组内没有成员,但组内元数据信息已经在协调者端被移除。协调者组件保存着当前向它注册过的所有组信息,所谓的元数据信息类似于这个注册信息。
PreparingRebalance消费者组开启再平衡,此时所有成员都要重新加入消费者组。
CompletingRebalance消费者组小所有成员已经加入,各个成员已在等待分配方案。该状态在老一点版本中称为AwaitingSync,它和CompletingReblance是等价的。
Stable消费者组的稳定状态。该状态表名再平衡已经完成,组内各成员能够正常消费数据了。

一个消费者组最开始是 Empty状态,当再平衡开启后,它会被置为 PreparingRebalance 状态等待成员加入,之后变更为 CompletingReblance 状态等待分配方案,最后流转为 Stable,完成再平衡过程。
在这里插入图片描述

2.2.1 新成员加入消费者组

新成员加入消费者组导致触发再平衡主要指的当消费者组处于 Stable 状态后,有新成员加入。如果对全新启动一个消费者组,Kafka 是有一些自己的优化,流程会有些许的不同。我们这里要讨论的是,消费者组稳定之后有新成员加入的情形。

当协调者收到新的 JoinkGroup 请求后,它会通过心跳响应的方式通知组内现有的所有成员,强制它们开启新一轮的再平衡。具体的过程和之前的客户端再平衡流程是一样的。现在,用一张时序图说明协调者一端是如何处理新成员入组的。

在这里插入图片描述

2.2.2 消费者组成员主动离组

消费者实例所在线程或进程调用 Close() 方法主动通知协调者它要退出。这个场景就涉及到第三类请求:LeaveGroup 请求。协调者收到 LeaveGroup 请求后,依然会以心跳响应的方式通知其他成员。
在这里插入图片描述

2.2.3 消费者组成员崩溃离组

崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为主动发起的离组,协调者能马上感知并处理。但是崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms控制的。也就是说,Kafka 一般不会超过 session.timeout.ms 就能感知到这个崩溃。处理崩溃离组的流程如下:
在这里插入图片描述

2.2.4 协调者对组内成员移交位移处理

正常情况下,每个组成员都会定期汇报位移给协调者。当再平衡开启时,协调者会给予成员一端缓冲时间,要求每个成员必须在这段时间内快速上报自己的位移信息,然后再开启正常的 JoinGroup/SyncGroup请求发送。
在这里插入图片描述

三、再平衡相关参数

  • session.timeout.ms:该参数是 Coordinator 检测消费者失败的时间,即在这段时间内客户端是否跟 Coordinator 保持心跳,如果该参数设置较小,可以更早发现消费者崩溃的信息,从而更快的开启再平衡,避免消费滞后,但同时这也会频繁的再平衡,需要根据实际业务来衡量。
  • max.poll.interval.ms:该参数表示消费者处理消息逻辑的最大时间,对于某些业务来说,处理消息可能需要很长时间,比如需要 1 分钟,那么该参数就需要设置成大于 1 分钟的值,否则就会被 Coordinator 剔除消息组然后再平衡。
  • heartbeat.interval.ms:该参数是消费端与Coordinator的心跳时间,该参数跟 session.timeout.ms 紧密相关,前面也说过,只要在 session.timeout.ms时间内与 Coordinator 保持心跳,就不会被 Coordinator 剔除,那么心跳间隔时间就是 session.timeout.ms,因此,该参数值必须小于 session.timeout.ms,以保持 session.timeout.ms时间内有心跳。每个consumer 都会根据 heartbeat.interval.ms参数指定的时间周期性地向Coordinator 发送 hearbeat,Coordinator 会给各个consumer响应,若发生了 rebalance,各个consumer收到的响应中会包含 REBALANCE_IN_PROGRESS标识,这样各个consumer就知道已经发生了rebalance,同时Coordinator 也知道了各个consumer的存活情况。

3.1 heartbeat.interval.ms 与 session.timeout.ms 的对比

session.timeout.ms是指:Coordinator检测consumer发生崩溃所需的时间。一个consumer group里面的某个consumer挂掉了,最长需要 session.timeout.ms 秒检测出来。

举个栗子:session.timeout.ms=10heartbeat.interval.ms=3

session.timeout.ms指定了一个阈值—10秒,在这个阈值内如果Coordinator未收到Consumer的任何消息,那Coordinator就认为Consumer挂了。而heartbeat.interval.ms主要是告诉Consumer要每3秒给Coordinator发一个心跳包,heartbeat.interval.ms越小,发的心跳包越多,它是会影响发TCP包的数量的。

如果Coordinator在一个heartbeat.interval.ms周期内未收到Consumer的心跳,就把该Consumer移出group,这有点说不过去。就好像Consumer犯了一个小错,就一棍子把它打死了。事实上,有可能网络延时,有可能Consumer出现了一次长时间GC,影响了心跳包的到达,说不定下一个heartbeat就正常了。

因此,heartbeat.interval.ms肯定是要小于session.timeout.ms的,如果Consumer Group发生了rebalance,通过心跳包里面的REBALANCE_IN_PROGRESS,Consumer就能及时知道发生了rebalance,从而更新Consumer可消费的分区。而如果超过了session.timeout.ms,Coordinator都认为Consumer挂了,那也当然不用把 rebalance信息告诉该Consumer了。

3.2 session.timeout.ms 和 max.poll.interval.ms

在kafka0.10.1之后的版本中,将session.timeout.msmax.poll.interval.ms 解耦了。

也就是说:创建Kafka消费者实例后,消费者不停地执行consumer.poll拉取消息这个过程中,其实背后是有2个线程的,即一个Kafka Consumer实例包含2个线程:一个是heartbeat 线程,另一个是processing线程

processing线程可理解为调用consumer.poll方法执行消息处理逻辑的线程,而heartbeat线程是一个后台线程,对程序员是"隐藏不见"的。如果消息处理逻辑很复杂,比如说需要处理5min,那么 max.poll.interval.ms可设置成比5min大一点的值。而heartbeat 线程则和上面提到的参数 heartbeat.interval.ms有关,heartbeat线程 每隔heartbeat.interval.ms向Coordinator发送一个心跳包,证明自己还活着。只要 heartbeat线程 在 session.timeout.ms时间内向 Coordinator发送过心跳包,那么Coordinator就认为当前的Kafka Consumer是活着的。

在kafka0.10.1之前,发送心跳包和消息处理逻辑这2个过程是耦合在一起的。

如果一条消息处理时长要5min,而session.timeout.ms=3000ms,那么等 Kafka Consumer处理完消息,Coordinator早就将Consumer 移出group了,因为只有一个线程,在消息处理过程中就无法向Coordinator发送心跳包,超过3000ms未发送心跳包,Coordinator就将该Consumer移出group了。

而将二者分开,一个processing线程负责执行消息处理逻辑,一个heartbeat线程负责发送心跳包,那么,就算一条消息需要处理5min,只要heartbeat线程在session.timeout.ms向Coordinator发送了心跳包,那Consumer可以继续处理消息,而不用担心被移出group了。另一个好处是:如果Consumer出了问题,那么在session.timeout.ms内就能检测出来,而不用等到max.poll.interval.ms 时长后才能检测出来。

TODO:后续根据sarama源码来看Kafka的再平衡过程。

参考

1、kafka 中参数:session.timeout.ms 和 heartbeat.interval.ms的区别
2、sarama 源码解析–Kafka的重平衡
3、kafka学习(五):消费者分区策略(再平衡机制)
4、Kafka【再平衡】

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/994842.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Python从入门到进阶】35、selenium基本语法学习

接上篇《34、selenium基本概念及安装流程》 上一篇我们介绍了selenium技术的基础概念以及安装和调用的流程,本篇我们来学习selenium的基本语法,包括元素定位以及访问元素信息的操作。 一、元素定位 Selenium元素定位是指通过特定的方法在网页中准确定位…

SpringMvc--文件上传下载

一.什么是SpringMvc文件上传下载 二.文件上传 编写hpjyController类 编写upload.jsp 建立一个储存图片的文件夹 ​编辑 编写PageController来处理页面跳转 编写工具类PropertiesUtil 编写resource.properties类 编写list.jsp 测试结果 三.文件下载 编写hpjyControll…

Redis集群3.2.11离线安装详细版本(使用Ruby)

1.安装软件准备 1.Redis版本下载 Index of /releases/http://download.redis.io/releases/ 1.2gcc环境准备 GCC(GNU Compiler Collection,GNU编译器套件)是一套用于编译程序代码的开源编译器工具集。它的主要用途是将高级编程语言(如C、C++、Fortran等)编写的源代码转换…

MyBatis之分页查询:MyBatis PageHelper

MyBatis之分页查询:MyBatis PageHelper 简介 MyBatis,作为目前流行的ORM框架,大大方便了日常开发。而对于分页查询,虽然可以通过SQL的limit语句实现,但是比较繁琐。而MyBatis PageHelper的出现,则解决了这…

如何在postman中实现自动化测试?

这里简单演示在postman中怎样实现自动化测试(不涉及到用户登录的token认证) 导入测试用例文件,测试web接口 postman使用流程:创建collection文件夹,在该文件夹中创建post,get请求;其中传入的参…

Keil MDK-ARM 软件的部分常用快捷键如下

F7:编译。F8: 下载。F9:添加/取消断点。Ctrl F5:调试。Tab:将选中的内容整体右移。Shift Tab:将选中的内容整体左移。Home:将光标移至行首。End:将光标移至行末。Ctrl >:光标…

【SpringMVC】注解、参数传递、返回值和页面跳转的关键步骤

目录 引言 一、常用注解 1.1.RequestMapping 1.2.RequestParam 1.3.RequestBody 1.4.RequestHeader 1.5.PathVariable 二、参数传递 2.1.基础类型String 2.2.复杂类型 2.3.RequestParam 2.4.PathVariable 2.5.RequestBody 2.6.RequestHeader 三、返回值 3.1.vo…

大数据-玩转数据-Flink状态编程(中)

一、键控状态 键控状态是根据输入数据流中定义的键(key)来维护和访问的。 Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理…

Jmeter压测监控体系搭建Docker+Influxdb+Grafana

章节目录: 一、背景介绍1.1 概述1.2 拓扑图 二、云服务器设置三、Docker3.1 概述3.2 搭建流程3.3 安装验证3.4 配置docker镜像加速3.5 取消sudo运行(可选操作) 四、InfluxDB4.1 镜像拉取4.2 运行数据库4.3 创建存储 jmeter 数据的库 五、Grafana5.1 镜像拉取5.2 关联…

Day_13 > 指针进阶(2)

目录 1.函数指针数组 2.指向函数指针数组的指针 3.回调函数 qsort()函数 代码示例 void* 4.结束 今天我们在进阶指针的基础上,学习进阶指针的第二部分 1.函数指针数组 首先我们回顾一下指针数组 char* arr[5]://字符指针数组 - 数组 - 存放的是字符指针 in…

mysql的索引结构

索引概述 索引( index )是帮助 MySQL 高效获取数据的数据结构 ( 有序 ) 。在数据之外,数据库系统还维护着满足特定查找算法的数据结构,这些数据结构以某种方式引用(指向)数据, 这样就可以在这些…

Spring与OAuth2:实现第三方认证和授权的最佳实践

🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🦄 博客首页——🐅🐾猫头虎的博客🎐 🐳 《面试题大全专栏》 🦕 文章图文…

python基本类型

数值类型 整型 int_val 1145143 print(int_val)python中的整型是大数类型。 一些其他函数 val 30 vlen val.bit_length() # 转换为二进制的长度 v_8 oct(val) print(v_8) # 将十进制转为八进制 v_16 hex(val) # 将十进制转为十六进制 v_2 bin(val) # 将十进制转为二进…

二、环境配置,项目运行 —— TinyWebServer

环境配置,项目运行 —— TinyWebServer 一、前言 上一期已经介绍过这个项目的基本结构,不懂得可以点开主页查找。 写代码前。一般的步骤就是,先把别人的代码下载下来运行。一、一方面看看最终效果是否是自己想要的,二、掌握项目…

redis分布式锁详解

一、基本分布式锁实现 1、案例(扣减库存) RequestMapping("reduceStock")public String reduceStock() {String lockKey "lock:product_101";String clientId UUID.randomUUID().toString();// 过期时间要和设置key成为一条命令…

linux下shell脚本实现wordpress搭建

wordpress_auto_install.sh #!/bin/bashuser$(whoami)function wordpress_auto_install () { if [ $user "root" ];thenecho "前提:调整系统配置,如关闭selinux、firewall等!"sed -i s/SELINUXenforcing/SELINUXdis…

光线投射之伪3d

光线投射是一种在 2D 地图中创建 3D 透视的渲染技术。当计算机速度较慢时,不可能实时运行真正的 3D 引擎,光线投射是第一个解决方案。光线投射可以非常快,因为只需对屏幕的每条垂直线进行计算。 光线投射的基本思想如下:地图是一…

rtthread下基于spi device架构MCP25625驱动

1.CAN驱动架构 由于采用了RTT的spi device架构,不能再随心所遇的编写CAN驱动 了,之前内核虽然采用了RTT内核,但是驱动并没有严格严格按RTT推荐的架构来做,这次不同了,上次是因为4个MCP25625挂在了4路独立的SPI总线上&…

【图论】Floyd

算法提高课笔记) 文章目录 例题牛的旅行题意思路代码 排序题意思路代码 观光之旅题意思路代码 例题 牛的旅行 原题链接 农民John的农场里有很多牧区,有的路径连接一些特定的牧区。 一片所有连通的牧区称为一个牧场。 但是就目前而言,你…

程序依赖相关知识点(PDG,SDG)

什么叫可达性 变量v的定义d:对变量v的赋值语句称为变量v的定义 变量v的使用:在某个表达式中引用变量v的值 当变量v被再次赋值时,上一次赋值对变量v的定义d就被kill掉了 如果定义d到点p之间存在一条路径,且在路径中定义d没有被…