kafka消费端之消费者协调器和组协调器

news2025/3/20 7:05:23

文章目录

  • 概述
  • 回顾历史
    • 老版本获取消费者变更
    • 老版本存在的问题
  • 消费者协调器和组协调器
    • 新版如何解决老版本问题
    • 再均衡过程
      • **第一阶段CFIND COORDINATOR**
      • **第二阶段(JOINGROUP)**
        • 选举消费组的lcader
        • 选举分区分配策略
      • 第三阶段(SYNC GROUP)
        • 消费组元数据信息
      • 第四阶段(HEARTBEAT)
  • 消费者协调器和控制器关系
    • 分区分配与消费者组管理
    • 选举与状态同步
    • 心跳检测与故障处理

概述

上一章我们讲解kafka消费端的分区分配策略时留下了两个问题,这章我们通过消费者协调器和组协调器继续详细回答那两个问题。其中我们会回归历史说明消费者协调器和组协调器出现的原因,最后会说下消费者协调器和控制器在工作上有何关联。

回顾历史

老版本获取消费者变更

消费者协调器和组协调器的概念是针对新版的消费者客户端而言的,Kafka建立之初并没有它们。旧版的消费者客户端是使用ZooKeeper的监听器(Watcher)来实现这些功能的。每个消费组王()在ZooKeeper中都维护了一个/consumers//ids路径,在此路径下使用临时节点记录录属于此消费组的消费者的唯一标识CconsumerIdString )consumerIdString由消费者启动时创建。消费者的唯一标识由aconsumer.id+主机名+时间截+UUID的部分信息构成,其中consumerid是旧版消费者客户端中的配置,相当于新版客户端中的client.id。

每个broker、主题和分区在ZooKeeper中也都对应一个路径:/brokers/ids/记录了 host、port及分配在此broker上的主题分区表;/brokers/topics/记录了每个分区的leader副本、ISR集合等信息。/brokers/topics//partitions//state记录了当前leader副本、leaderepoch等信息。如下图:
在这里插入图片描述

每个消费者在启动时都会在/consumers//ids和/brokers/ids路径上注册一个监听器。当/consumers//ids路径下的子节点发生变化时,表示消费组中的消费者发生了变化;当/brokers/ids路径下的子节点发生变化时,表示broker出现了增减。这样通过ZooKeeper所提供的Watcher,每个消费者就可以监听消费组和Kafka集群的状态了。

老版本存在的问题

这种方式下每个消费者对ZooKeeper E的相关路径分别进行监听,当触发再均衡操作时,一个消费组下的所有消费者会同时进行再均衡操作,而消费者之间并不知道彼此操作的结果,这样可能导致Kafka工作在一个不正确的状态。与此同时,这种严重依赖于ZooKeeper集群的做法还有两个比较严重的问题。

  • (1)羊群效应(HerdEffect):所谓的羊群效应是指ZooKeeper中一个被监听的节点变化大量的Watcher通知被发送到客户端,导致在通知期间的其他操作延迟,也有可能发生类似死锁的情况。
  • (2)脑裂问题(SplitBrain):消费者进行再均衡操作时每个消费者都与ZooKeeper进行通信以判断消费者或broker变化的情况,由于ZooKeeper本身的特性,可能导致在同一时刻各个消费者获取的状态不一致,这样会导致异常问题发生。

消费者协调器和组协调器

新版如何解决老版本问题

新版的消费者客户端对此进行了重新设计,将全部消费组分成多个子集,每个消费组的子集在服务端对应一个GroupCoordinator对其进行管理,GroupCoordinator是Kafka服务端中用于管理消费组的组件。而消费者客户端中的ConsumerCoordinator组件负责与GroupCoordinator进行交互,也就是发送心跳,加入group请求、提交位移等请求。

再均衡过程

ConsumerCoordinatorr与GroupCoordinator之间最重要的职责就是负责执行消费者再均衡的操作,包括前面提及的分区分配的工作也是在再均衡期间完成的。就目前而言,一共有如下几种情形会触发再均衡的操作:

  • 有新的消费者加入消费组。
  • 有消费者容机下线。消费者并不一定需要真正下线,例如遇到长时间的GC、网络延退导致消费者长时间未向GroupCoordinator发送心跳等情况时,GroupCoordinator会认为消费者已经下线。
  • 有消费者主动退出消费组(发送LeaveGroupRequest请求)。比如客户端调用了
    unsubscribleO方法取消对某些主题的订阅。
  • 消费组所对应的GroupCoorinator节点发生了变更。
  • 消费组内所订阅的任一主题或者主题的分区数量发生变化。

当有消费者加入消费组时,消费者、消费组及组协调器之间会经历一下几个阶段。

第一阶段CFIND COORDINATOR

消费者需要确定它所属的消费组对应的GroupCoordinator所在的broker,并创建与该broker相互通信的网络连接。如果消费者已经保存了与消费组对应的(GroupCoordinator节点的信息,并且与它之间的网络连接是正常的,那么就可以进入第二阶段。否则,就需要向集群中的某个节点发送FindCoordinatorRequest请求来查找对应的GroupCoordinator,这里的“某个节点”并非是集群中的任意节点,而是负载最小的节点。

Kafka 在收到 FindCoordinatorRequest请求之后,会根据coordinator_key(也就是groupId)查找对应的GroupCoordinator节点,如果找到对应的GroupCoordinator则会返回其相对应的nodeid、host和port信息。具体查找GroupCoordinator的方式是先根据消费组groupId的哈希值计算_consumer_offsets中的分区编号。找到对应的_consumer_offsets中的分区之后,再寻找此分区leader副本所在的broker节点,该broker节点即为这个groupId所对应的GroupCoordinator节点。消费者groupId最终的分区分配方案及组内消费者所提交的消费位移信息都会发送给此分区leader副本所在的broker节点,让此broker节点既扮演GroupCoordinator的角色,又扮演保存分区分配方案和组内消费者位移的角色,这样可以省去很多不必要的中间轮转所带来的开销。

第二阶段(JOINGROUP)

在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。

JoinGroupRequest中的group_protocols域为数组类型,其中可以囊括多个分区分配策略,这个主要取决于消费者客户端参数partition.assignment.strategy的配置。如果配置了多种策略,那么JoinGroupRequest中就会包含多个protocol name和protocol metadata。

如果是原有的消费者重新加入消费组,那么在真正发送JoinGroupRequest请求之前还要执行一些准备工作:

  • (1)如果消费端参数enable.auto.commit设置为tue(默认值也为tue),即开启自动提交位移功能,那么在请求加入消费组之前需要向GroupCoordinator提交消费位移。这个过程是阻塞执行的,要么成功提交消费位移,要么超时。

  • (2)如果消费者添加了自定义的再均衡监听器(ConsumerRebalanceListener),那么此时会调用onPartitionsRevokedO方法在重新加入消费组之前实施自定义的规则逻辑,比如清除一些状态,或者提交消费位移等。

  • (3)因为是重新加入消费组,之前与GroupCoordinator节点之间的心跳检测也就不需要了,所以在成功地重新加入消费组之前需要禁止心跳检测的运作。

消费者在发送JoinGroupRequest请求之后会阻塞等待Kafka服务端的响应。服务端在收到JoinGroupRequest请求后会交由GroupCoordinator来进行处理。GroupCoordinator首先会对JoinGroupRequest请求做合法性校验,比如group_id是否为空、当前broker节点是否是请求的消费者组所对应的组协调器、rebalance_timeout的值是否在合理的范围之内。如果消费者是第一次请求加入消费组,那么JoinGroupRequest请求中的memberid值为null,即没有它自身的唯一标志,此时组协调器负责为此消费者生成一个memberid。这个生成的算法很
简单,具体如以下伪代码所示。

String memberId = clientId +"-"+UUID.randomuuID().toString()

其中clientld为消费者客户端的clientd,对应请求头中的clientid。由此可见消费者的memberid由clientId和UUID用“”字符拼接而成。

选举消费组的lcader

GroupCoordinator需要为消费组内的消费者选举出一个消费组的leader,这个选举的算法也很简单,分两种情况分析。如果消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader。如果某一时刻leader消费者由于某些原因退出了消费组,那么会重新选举一个新的leader,这个重新选举leader的过程又更“随意”了。在GroupCoordinator中消费者的信息是以HashMap的形式存储的,其中key为消费者的memberid,value是消费者相关的元数据信息。leaderId表示leader
消费者的memberid,它的取值为HashMap中的第一个键值对的key,这种选举的方式基本上和随机无异。总体上来说,消费组的leader选举过程是很随意的。

选举分区分配策略

每个消费者都可以设置自己的分区分配策略,对消费组而言需要从各个消费者呈报上来的各个分配策略中选举一个彼此都“信服”的策略来进行整体上的分区分配。这个分区分配的选举并非由leader消费者决定,而是根据消费组内的各个消费者投票来决定的。这里所说的“根据组内的各个消费者投票来决定”不是指GroupCoordinator还要再与各个消费者进行进一步交互,而是根据各个消费者呈报的分配策略来实施。最终选举的分配策略基本上可以看作被各个
消费者支持的最多的策略,具体的选举过程如下:
(1)收集各个消费者支持的所有分配策略,组成候选集candidates。
(2)4每个消费者从候选集candidates中找出第一个自身支持的策略,为这个策略投上一票。
(3)计算候选集中各个策略的选票数,选票数最多的策略即为当前消费组的分配策略。

如果有消费者并不支持选出的分配策略,那么就会报出异常IllegalArgumentException:Memberdoesnotsupportprotocol。所以请不要为同一个消费组的不同消费者设置不同的分配策略,以防出现问题。需要注意的是,这里所说的“消费者所支持的分配策略”是指partition.assignment.strategy参数配置的策略,如果这个参数值只配置了RangeAssignor,那么这个消费者客户端只支持RangeAssignor分配策略,而不是消费者客户端代码中实现的3种分配策略及可能的自定义分配策略。

在此之后,Kafka服务端就要发送JoinGroupResponse响应给各个消费者,leader消费者和其他普通消费者收到的响应内容并不相同。leader消费者会收到最终的分配策略以及消费者成员信息,而普通消费者只能收到最终的分配策略。由此可见,Kafka把分区分配的具体分配交还给客户端,自身并不参与具体的分配细节,这样即使以后分区分配的策略发生了变更,也只需要重启消费端的应用即可,而不需要重启服务端。该过程可见如下图:

在这里插入图片描述

在这里插入图片描述

第三阶段(SYNC GROUP)

leader消费者根据在第二阶段中选举出来的分区分配策略来实施具体的分区分配,在此之后需要将分配的方案同步给各个消费者,此时leader消费者并不是直接和其余的普通消费者同步分配方案,而是通过GroupCoordinator这个“中间人”来负责转发同步分配方案的。在第三阶段,也就是同步阶段,各个消费者会向GroupCoordinator发送SyncGroupRequest请求来同步分配方案,如下图所示:
在这里插入图片描述

服务端在收到消费者发送的SyncGroupRequest请求之后会交由GroupCoordinator来负责具体的逻辑处理。GroupCoordinator同样会先对SyncGroupRequest请求做合法性校验,在此之后会将从leader消费者发送过来的分配方案提取出来,连同整个消费组的元数据信息一起存入Kafka的consumer_offsets主题中,最后发送响应给各个消费者以提供给各个消费者各自所属的分配方案。

消费者在获得消费分区后会连接broker进行消费,并定期发送心跳给消费者协调器表明自己活着。

消费组元数据信息

我们知道消费者客户端提交的消费位移会保存在Kafka的consumer_offsets主题中,这里也一样,只不过保存的是消费组的元数据信息(GroupMetadata)。具体来说,每个消费组的元数据信息都是一条消息,不过这类消息并不依赖于具体版本的消息格式,因为它只定义了消息中的key和value字段的具体内容,所以消费组元数据信息的保存可以做到与具体的消息格式无关。

第四阶段(HEARTBEAT)

进入这个阶段之后,消费组中的所有消费者就会处于正常工作状态。在正式消费之前,消费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交到了GroupCoordinator,并且GroupCoordinator将其保存到了Kafka内部的consumeroffsets主题中,此时消费者可以通过OffsetFetchRequest请求获取上次提交的消费位移并从此处继续消费。

消费者通过向GroupCoordinator发送心跳来维持它们与消费组的从属关系,以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区中的消息。心跳线程是一个独立的线程,可以在轮询消息的空档发送心跳。如果消费者停止发送心跳的时间足够长,则整个会话就被判定为过期,GroupCoordinator也会认为这个消费者已经死亡,就会触发一次再均衡行为。消费者的心跳间隔时间由参数heartbeat.interval.ms指定,默认值为3000,即3秒,这个参数必须比session.timeout.ms参数设定的值要小,一般情况下heartbeat.interval.ms的配置值不能超过session.timeout.ms配置值的1/3。这个参数可以调整得更低,以控制正常重新平衡的预期时间。

如果一个消费者发生崩溃,并停止读取消息,那么GroupCoordinator会等待一小段时间,确认这个消费者死亡之后才会触发再均衡。在这一小段时间内,死掉的消费者并不会读取分区里的消息。这个一小段时间由session.timeout.ms参数控制,该参数的配置值必须在broker端参数group.min.session.timeout.ms(默认值为6000,即6秒)和group.max.session.timeout.ms(默认值为300000,即5分钟)允许的范围内。

还有一个参数max.po11.interval.ms,它用来指定使用消费者组管理时poll0方法调用之间的最大延退,也就是消费者在获取更多消息之前可以空闲的时间量的上限。如果此超时时间期满之前polio没有调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。

除了被动退出消费组,还可以使用LeaveGroupRequest请求主动退出消费组。

消费者协调器和控制器关系

在Kafka中,控制器(Controller)与组协调器(Group Coordinator)在工作上存在一定的交互,具体体现在以下几个方面:

分区分配与消费者组管理

  • 控制器负责分区管理:控制器负责管理Kafka集群中的分区状态,如分区的创建、删除以及副本的分配等。当一个新的分区被创建时,控制器会决定该分区的副本分布在哪些Broker上。

  • 组协调器依赖分区信息:组协调器在进行消费者组的分区分配时,需要依赖控制器所管理的分区元数据信息。例如,组协调器要根据分区的数量、副本分布以及消费者组内消费者的数量和位置等信息,来为消费者分配合适的分区,以实现负载均衡和高效的数据消费。

选举与状态同步

  • 控制器主导Broker选举:在Kafka集群中,当Broker出现故障或新的Broker加入时,控制器会负责选举新的Leader Broker以及进行相关的状态变更。

  • 组协调器获取选举结果:组协调器需要与控制器进行交互,以获取最新的Broker选举结果和集群状态信息。这有助于组协调器了解哪些Broker是活跃的,哪些是不可用的,从而更好地管理消费者组的状态,确保消费者能够正确地连接到合适的Broker进行数据消费。

心跳检测与故障处理

  • 组协调器检测消费者心跳:组协调器通过心跳机制来检测消费者的存活状态。如果消费者长时间没有发送心跳,组协调器会认为消费者可能出现了故障,并进行相应的处理,如重新分配分区。

  • 控制器协助故障判断:在这个过程中,组协调器可能会与控制器进行交互,以获取更全面的集群状态信息,来确定消费者的故障是否是由于Broker故障等原因引起的。控制器可以提供关于Broker状态、分区状态等方面的信息,帮助组协调器更准确地判断故障情况,并采取合适的措施,如触发重新平衡操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2296251.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

IDEA升级出现问题Failed to prepare an update Temp directory inside installation

IDEA升级出现问题"Failed to prepare an update Temp directory inside installation…" 问题来源: 之前修改了IDEA的默认配置文件路径,然后升级新版本时就无法升级,提示"Failed to prepare an update Temp directory insid…

十款开源的论坛建站工具

以下是十款开源的论坛建站工具,它们各具特色,能够满足不同用户的需求: Discuz!(Crossday Discuz! Board) 特点:基础架构采用web编程组合PHPMySQL,用户可以在不需要任何编程的基础上,…

vue学习6

1. 智慧商城 1. 路由设计配置 单个页面&#xff0c;独立展示的&#xff0c;是一级路由 2.二级路由配置 规则&组件配置导航链接配置路由出口 <template><div id"app"><!--二级路由出口--><router-view></router-view><van-…

线程池以及日志、线程总结

一、线程池以及日志 1、基础线程池写法 主线程在main函数中构建一个线程池&#xff0c;初始化(Init)后开始工作(Start) 此时线程池中每个线程都已经工作起来了&#xff0c;只是任务队列中任务为空&#xff0c;所有线程处于休眠状态(通过线程同步中的条件变量实现&#xff0c…

Vue 响应式渲染 - 过滤应用

Vue 渐进式JavaScript 框架 基于Vue2的学习笔记 - Vue响应式渲染综合 - 过滤应用 目录 过滤应用 引入vue Vue设置 设置页面元素 模糊查询过滤实现 函数表达式实现 总结 过滤应用 综合响应式渲染做一个输入框&#xff0c;用来实现&#xff1b;搜索输入框关键词符合列表。…

【ThreeJS Basics 1-3】Hello ThreeJS,实现第一个场景

文章目录 环境创建一个项目安装依赖基础 Web 页面概念解释编写代码运行项目 环境 我的环境是 node version 22 创建一个项目 首先&#xff0c;新建一个空的文件夹&#xff0c;然后 npm init -y , 此时会快速生成好默认的 package.json 安装依赖 在新建的项目下用 npm 安装依…

深入理解动态代理

为什么需要动态代理 对于代码的增强逻辑我们是清楚具体实现的,一种方式是增强逻辑作为委托类,被其他业务类调用, 这样会有很多重复代码,而且,当需要根据动态参数来决定增强逻辑时,重复代码会更多,逻辑会更不清晰 二,也是动态代理产生的原始需求,解决类爆照问题, 所以…

Cherry Studio之DeepSeek联网/本地,建属于自己的AI助理!

上一篇文章&#xff0c;讲了DeepSeek-R1部署到本地的方法。这一篇文章&#xff0c;我们让DeepSeek再一次升级&#xff0c;通过图形化界面来交互&#xff0c;从而变成我们的AI助理&#xff0c;让DeepSeek R1发挥最大实力&#xff01; 首选需要借助硅基流动的API接口&#xff0c…

IGBT的两级关断

IGBT&#xff08;绝缘栅双极型晶体管&#xff09;的两级关断&#xff08;Two-stage turn-off&#xff09;是一种优化关断过程的方法&#xff0c;主要用于减少关断时的电压过冲和dv/dt&#xff08;电压变化率&#xff09;过高的问题&#xff0c;特别是在大功率应用中&#xff08…

【STM32】ADC

本次实现的是ADC实现数字信号与模拟信号的转化&#xff0c;数字信号时不连续的&#xff0c;模拟信号是连续的。 1.ADC转化的原理 模拟-数字转换技术使用的是逐次逼近法&#xff0c;使用二分比较的方法来确定电压值 当单片机对应的参考电压为3.3v时&#xff0c;0~ 3.3v(模拟信号…

0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享一篇文章&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495; 目录 0基础…

Chapter3:结构化程序设计

参考书籍&#xff1a;《C#边做边学》&#xff1b; 3.结构化程序设计 3.1 结构化程序设计的3种基本结构 顺序结构&#xff1a;先执行 A {\rm A} A语句&#xff0c;再执行 B {\rm B} B语句&#xff0c;两者是顺序执行的关系&#xff1b; 选择结构&#xff1a;根据所定选择条件为…

白话文实战Nacos(保姆级教程)

前言 上一篇博客 我们创建好了微服务项目,本篇博客来体验一下Nacos作为注册中心和配置中心的功能。 注册中心 如果我们启动了一个Nacos注册中心,那么微服务比如订单服务,启动后就可以连上注册中心把自己注册上去,这过程就是服务注册。每个微服务,比如商品服务都应该注册…

智能理解 PPT 内容,快速生成讲解视频

当我们想根据一版 PPT 制作出相对应的解锁视频时&#xff0c;从撰写解锁词&#xff0c;录制音频到剪辑视频&#xff0c;每一个环节都需要投入大量的时间和精力&#xff0c;本方案将依托于阿里云函数计算 FC 和百炼模型服务&#xff0c;实现从 PPT 到视频的全自动转换&#xff0…

IEC61850标准下的数据和数据模型服务的详细介绍

目录 一、摘要 二、概述 三、详细介绍 1、读服务器目录(GetServerDirectory) 2、读逻辑设备目录(GetLogicalDeviceDirectory) 3、读逻辑节点目录(GetLogicalNodeDirectory) 4、读全部数据值(GetAllDataValues) 5、读数据值(GetDataValues) 6、设置数据值(SetDataValues…

R语言LCMM多维度潜在类别模型流行病学研究:LCA、MM方法分析纵向数据

全文代码数据&#xff1a;https://tecdat.cn/?p39710 在数据分析领域&#xff0c;当我们面对一组数据时&#xff0c;通常会有已知的分组情况&#xff0c;比如不同的治疗组、性别组或种族组等&#xff08;点击文末“阅读原文”获取完整代码数据&#xff09;。 然而&#xff0c;…

5. 【.NET 8 实战--孢子记账--从单体到微服务--转向微服务】--微服务基础工具与技术--Nacos

一、什么是Nacos Nacos 是阿里巴巴开源的一款云原生应用基础设施&#xff0c;它旨在简化微服务架构中服务治理和配置管理的复杂性。通过 Nacos&#xff0c;服务在启动时可以自动注册&#xff0c;而其他服务则可以通过名称来查找并访问这些注册好的实例。同时&#xff0c;Nacos…

VUE项目中实现权限控制,菜单权限,按钮权限,接口权限,路由权限,操作权限,数据权限实现

VUE项目中实现权限控制&#xff0c;菜单权限&#xff0c;按钮权限&#xff0c;接口权限&#xff0c;路由权限&#xff0c;操作权限&#xff0c;数据权限实现 权限系统分类&#xff08;RBAC&#xff09;引言菜单权限按钮权限接口权限路由权限 菜单权限方案方案一&#xff1a;菜单…

【网络安全】服务器安装Docker及拉取镜像教程

文章目录 1. 安装 Docker2. 拉取镜像3. 运行 Ubuntu 容器4. 执行相关操作5. 退出并停止容器1. 安装 Docker # 更新软件包索引 sudo apt update# 安装必要的依赖 sudo apt install -y ca-certificates curl gnupg

elementplus 使用日期时间选择器,设置可选范围为前后大于2年且只能选择历史时间不能大于当前时间点

需求&#xff1a;时间选择器可选的时间范围进行限制&#xff0c;-2年<a<2年且a<new Date().getTime()核心&#xff1a;这里需要注意plus版没有picker-options换成disabled-date属性了&#xff0c;使用了visible-change和calendar-change属性逻辑&#xff1a;另设一个参…