消费者Rebalance机制

news2024/11/24 4:40:24

优质博文:IT-BLOG-CN

一、消费者Rebalance机制

Apache Kafka中,消费者组
Consumer Group会在以下几种情况下发生重新平衡Rebalance
【1】消费者加入或离开消费者组: 当一个新的消费者加入消费者组或一个现有的消费者离开消费者组时,Kafka会触发重新平衡,以重新分配分区给消费者。
【2】消费者崩溃或失去连接: 如果Kafka检测到某个消费者崩溃或失去连接(例如,由于网络问题或消费者进程被终止),它会触发重新平衡。
【3】主题的分区数量发生变化: 如果一个主题的分区数量增加或减少,Kafka会触发重新平衡,以确保新的分区被分配给消费者组中的消费者。
【4】消费者组协调器变更: 消费者组协调器是负责管理消费者组的一个Kafka Broker。如果消费者组协调器发生变更(例如,协调器所在的Broker崩溃),也会触发重新平衡。
【5】消费者组成员发送心跳失败: 消费者需要定期向消费者组协调器发送心跳heartbeat以表明它们仍然活跃。如果心跳失败,协调器会认为该消费者已经失去连接,从而触发重新平衡。

rebalance只针对subscribe这种不指定分区消费的情况,如果通过assign这种消费方式指定了分区,kafka不会进行rebanlance

Kafka在高峰期重平衡rebalancing会导致消费者组的停顿,影响系统的性能和稳定性。为了避免在高峰期发生重平衡,可以采取以下几种策略:
【1】优化分区分配策略: 使用RangeAssignorStickyAssignor等分区分配策略来减少重平衡的频率和影响。

RangeAssignorKafka默认的分区分配策略之一,它将分区按范围分配给消费者。

我们通过一个具体的例子来说明RangeAssignor如何分配分区。

假设我们有一个Kafka主题my-topic,它有6个分区P0, P1, P2, P3, P4, P5,并且我们有3个消费者C1, C2, C3在一个消费者组中。

初始分配:假设初始分配如下:

C1: P0, P1
C2: P2, P3
C3: P4, P5

消费者组成员变化:现在假设C2离开了消费者组,那么RangeAssignor会重新分配分区,以确保分区尽量按顺序和均匀地分配给剩余的消费者。新的分配可能如下:

C1: P0, P1, P2
C3: P3, P4, P5

在这个过程中,RangeAssignor将分区按顺序重新分配给剩余的消费者,确保每个消费者分配到的分区尽量连续。

新消费者加入:现在假设有一个新消费者C4加入了消费者组,RangeAssignor会再次按顺序和均匀地分配分区。新的分配可能如下:

C1: P0, P1
C3: P2, P3
C4: P4, P5

在这个过程中,RangeAssignor将分区重新分配,以确保每个消费者分配到的分区尽量连续和均匀。

通过这个例子,我们可以看到RangeAssignor的分配策略:
1、将分区按顺序分配给消费者。
2、当消费者组成员变化时,重新分配分区,以确保分区尽量按顺序和均匀地分配给所有消费者。
3、分区分配尽量保持连续性。
这种策略的好处是分区分配简单且稳定,减少了分区在消费者组成员变化时的重新分配范围,从而减少了重平衡的频率和影响。

以下是配置RangeAssignor的代码示例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

public class RangeAssignorExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // 设置分区分配策略为 RangeAssignor
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RangeAssignor");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 订阅主题
        consumer.subscribe(List.of("example-topic"));

        // 消费消息的逻辑
        // ...
    }
}

StickyAssignorKafka 2.4及以上版本引入的一种分区分配策略,它的目标是尽量保持分区分配的稳定性,减少重平衡的频率。

我们通过一个具体的例子来说明StickyAssignor如何分配分区。

假设我们有一个Kafka主题my-topic,它有6个分区P0, P1, P2, P3, P4, P5,并且我们有3个消费者C1, C2, C3在一个消费者组中。

初始分配:假设初始分配如下:

C1: P0, P1
C2: P2, P3
C3: P4, P5

消费者组成员变化:现在假设C2离开了消费者组,那么StickyAssignor会尽量保持现有的分区分配不变,并重新分配C2的分区。新的分配可能如下:

C1: P0, P1, P2
C3: P3, P4, P5

在这个过程中,StickyAssignor尽量保持C1C3的分区分配不变,只是将C2的分区重新分配给其他消费者。

新消费者加入:现在假设有一个新消费者C4加入了消费者组,StickyAssignor会尝试保持现有的分区分配不变,并将分区尽量均匀地分配给所有消费者。新的分配可能如下:

C1: P0, P1
C3: P4, P5
C4: P2, P3

在这个过程中,StickyAssignor保持了C1C3的分区不变,并将C2的分区重新分配给C4

通过这个例子,我们可以看到StickyAssignor的分配策略:
1、尽量保持现有的分区分配不变。
2、当消费者组成员变化时,尽量最小化分区在消费者之间的移动。
3、尽量保持分区分配的平衡性。
这种策略的好处是减少了重平衡带来的影响,提高了分区分配的稳定性,减少了因分区移动带来的数据重新加载和处理的开销。

以下是配置StickyAssignor的代码示例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

public class StickyAssignorExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // 设置分区分配策略为 StickyAssignor
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 订阅主题
        consumer.subscribe(List.of("example-topic"));

        // 消费消息的逻辑
        // ...
    }
}

或者在配置中进行指定

group.id=my-consumer-group
partition.assignment.strategy=org.apache.kafka.clients.consumer.StickyAssignor

【2】增加session.timeout.msheartbeat.interval.ms:增加session.timeout.msheartbeat.interval.ms的值,这样可以减少消费者因为心跳超时而被认为失效,从而触发重平衡。

1、session.timeout.ms是消费者与Kafka broker之间的会话超时时间。如果在这个时间内Kafka broker没有收到某个消费者的心跳,broker就会认为该消费者已经失效,并触发重平衡
2、heartbeat.interval.ms是消费者发送心跳给Kafka broker的时间间隔。心跳是消费者向broker表示自己仍然活跃的方式。

session.timeout.ms=30000
heartbeat.interval.ms=3000

3、heartbeat.interval.ms的值通常要远小于session.timeout.ms的值。这样可以确保在会话超时之前,消费者有多次机会发送心跳。一般建议session.timeout.ms至少是heartbeat.interval.ms10倍,以确保有足够的时间进行多次心跳尝试。

【3】合理配置消费者组:确保消费者组中的消费者数量稳定,避免频繁地增加或减少消费者。尽量在低峰期进行消费者的添加或移除操作。

【4】优化消费者性能:提高消费者的处理能力,确保消费者能够及时处理消息,避免因为处理延迟导致的重平衡。使用异步处理或批量处理来提高消费者的吞吐量。

【5】监控和报警:实时监控Kafka集群和消费者组的状态,设置报警机制,当检测到重平衡风险时,及时采取措施。

【6】使用静态成员Static MembershipKafka 2.3及以上版本支持静态成员功能,可以通过配置group.instance.id来减少重平衡的频率。

group.instance.idKafka 2.4.0引入的一个配置项,用于为每个消费者实例指定一个唯一的标识符。当消费者组中的消费者具有唯一的group.instance.id时,Kafka可以更智能地处理消费者组成员的变化,从而减少不必要的重平衡。

静态成员:通过配置group.instance.id,消费者实例变成了“静态成员”,即使它们暂时断开连接,Kafka也会保留它们的成员身份。这与传统的动态成员(没有group.instance.id)不同,动态成员在断开连接后会被移除,从而触发重平衡。

group.id=my-consumer-group
group.instance.id=consumer-instance-1

【7】调整rebalance.timeout.ms:增加rebalance.timeout.ms的值,确保消费者有足够的时间完成重平衡过程,避免因超时导致的频繁重平衡。

消费者Rebalance分区分配策略

主要包含四种relalance策略:RangeAssignor(范围分配策略),RoundRobinAssignor(轮询分配策略),StickyAssignor(粘性分配策略),CooperativeStickyAssignor(协作粘性分配策略),之前已经讲过两个,这里聊聊剩下的两个

RoundRobinAssignor(轮询分配策略)

RoundRobinAssignor采用轮询的方式将分区分配给消费者。它会将所有分区和消费者按照字典顺序排序,然后依次将每个分区分配给下一个消费者,直到所有分区都被分配完毕。

CooperativeStickyAssignor(协作粘性分配策略)

CooperativeStickyAssignorStickyAssignor的改进版本,它引入了协作重平衡的概念,使得重平衡过程更加平滑,减少了重平衡期间的停顿时间。

二、Rebalance 过程

第一阶段:选择"组协调器"
组协调器GroupCoordinator:每个consumer group都会选择一个broker作为自己的组协调器coordinator,负责监控这个消费组里的所有消费者的心跳,以及判断是否宕机,然后开启消费者rebalance

consumer group中的每个consumer启动时会向kafka集群中的某个节点发送FindCoordinatorRequest请求来查找对应的组协调器GroupCoordinator,并跟其建立网络连接。

组协调器选择方式:consumer消费的offset要提交到__consumer_offsets的哪个分区,这个分区leader对应的broker就是这个consumer groupcoordinator

第二阶段:加入消费组JOIN GROUP
在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。然后GroupCoordinator从一个consumer group中选择第一个加入groupconsumer作为leader(消费组协调器),把consumer group情况发送给这个leader,接着这个leader会负责制定分区方案。

第三阶段:SYNC GROUP
consumer leader通过给GroupCoordinator发送SyncGroupRequest,接着GroupCoordinator就把分区方案下发给各个consumer,他们会根据指定分区的leader broker进行网络连接以及消息消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2193892.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot工程中使用tcp协议

文章目录 一、概述二、实现思路三、代码结构四、代码放送五、运行界面六. 主要技术点 一、概述 在上文JAVA TCP协议初体验 中&#xff0c;我们使用java实现了tcp协议的一个雏形&#xff0c;实际中大部分项目都已采用springboot&#xff0c;那么&#xff0c;怎么在springboot中…

【机器学习】知识总结1(人工智能、机器学习、深度学习、贝叶斯、回归分析)

目录 一、机器学习、深度学习 1.人工智能 1.1人工智能概念 1.2人工智能的主要研究内容与应用领域 1.2.1主要研究内容&#xff1a; 1.2.2应用领域 2.机器学习 2.1机器学习的概念 2.2机器学习的基本思路 2.3机器学习的分类 3.深度学习 3.1深度学习的概念 3.2人工智能…

Cocos_鼠标滚轮放缩地图

文章目录 前言一、环境二、版本一_code2.分析类属性方法详细分析详细分析onLoad()onMouseWheel(event)详细分析 总结 前言 学习笔记&#xff0c;请多多斧正。 一、环境 通过精灵rect放置脚本实现鼠标滚轮放缩地图。 二、版本一_code import { _decorator, Component, Node }…

task【XTuner微调个人小助手认知】

1 微调前置基础 本节主要重点是带领大家实现个人小助手微调&#xff0c;如果想了解微调相关的基本概念&#xff0c;可以访问XTuner微调前置基础。 2 准备工作 环境安装&#xff1a;我们想要用简单易上手的微调工具包 XTuner 来对模型进行微调的话&#xff0c;第一步是安装 XTu…

Trie树之最大异或对问题

这是C算法基础-数据结构专栏的第二十八篇文章&#xff0c;专栏详情请见此处。 从这篇博客开始&#xff0c;文章将会于每周一更新&#xff0c;望周知&#xff01; 引入 上次&#xff0c;我们学习了Trie树之字符串统计问题&#xff0c;字符串统计问题中的Trie树节点存储的是字符…

面试官:如何实现分布式系统的限流?

限流的概念以及作用我前一篇文章已经做了介绍:并发限流算法的实践 目录 限流的几种算法 : 1、令牌桶算法 2、漏桶算法 3. 滑动时间窗口计数器算法 5. 全局限流 6. 客户端限流 7. API网关限流 8. 熔断与降级 本篇重点: 具体实现: 限流的几种算法 : 这里主要讲在分…

快速熟悉Nginx

一、Nginx是什么&#xff1f; ‌Nginx是一款高性能、轻量级的Web服务器和反向代理服务器。‌ ‌特点‌&#xff1a;Nginx采用事件驱动的异步非阻塞处理框架&#xff0c;内存占用少&#xff0c;并发能力强&#xff0c;资源消耗低。‌功能‌&#xff1a;Nginx主要用作静态文件服…

Arduino UNO R3自学笔记22 之 Arduino基础篇学习总结

注意&#xff1a;学习和写作过程中&#xff0c;部分资料搜集于互联网&#xff0c;如有侵权请联系删除。 前言&#xff1a;目前将Arduino的大多数基础内容学习了&#xff0c;做个总结。 1.编程语言 学习单片机&#xff0c;在面向单片机编程时&#xff0c;语言是最基础的&#…

给Linux操作系统命令取个别名

一个Linux终端命令的别名通常是其命令的缩写&#xff0c;用来减少键盘输入。命令格式为&#xff1a; alias &#xff3b;alias-name‘original-command’&#xff3d; 其中&#xff0c;alias-name是用户给命令取的别名&#xff08;新名&#xff09;&#xff0c;original-comm…

whisper 实现语音识别 ASR - python 实现

语音识别&#xff08;Speech Recognition&#xff09;&#xff0c;同时称为自动语音识别&#xff08;英语&#xff1a;Automatic Speech Recognition, ASR&#xff09;&#xff0c;将语音音频转换为文字的技术。 whisper是一个通用的语音识别模型&#xff0c;由OpenAI公司开发。…

浸没边界 直接强迫法 圆球绕流验证 阅读笔记

Combined multi-direct forcing and immersed boundary method for simulating flows with moving particles https://doi.org/10.1016/j.ijmultiphaseflow.2007.10.004 他的意思是&#xff0c;不止需要一次的直接强迫 直接强迫的次数与误差成低于二阶的关系 不知道是不是一阶…

输电线路悬垂线夹检测无人机航拍图像数据集,总共1600左右图片,悬垂线夹识别,标注为voc格式

输电线路悬垂线夹检测无人机航拍图像数据集&#xff0c;总共1600左右图片&#xff0c;悬垂线夹识别&#xff0c;标注为voc格式 输电线路悬垂线夹检测无人机航拍图像数据集介绍 数据集名称 输电线路悬垂线夹检测数据集 (Transmission Line Fittings Detection Dataset) 数据集…

sv标准研读第十二章-过程性编程语句

书接上回&#xff1a; sv标准研读第一章-综述 sv标准研读第二章-标准引用 sv标准研读第三章-设计和验证的building block sv标准研读第四章-时间调度机制 sv标准研读第五章-词法 sv标准研读第六章-数据类型 sv标准研读第七章-聚合数据类型 sv标准研读第八章-class sv标…

使用链地址法实现哈希表(哈希函数为除留余数法)

该代码实现了一个哈希表&#xff0c;使用拉链法&#xff08;链地址法&#xff09;来解决哈希冲突&#xff0c;核心思想是通过链表存储哈希冲突的数据。哈希表的大小被设置为 MAX_SIZE&#xff0c;其中哈希函数采用除留余数法。以下是代码的详细解释和总结&#xff1a; #includ…

C++关于链表基础知识

单链表 // 结点的定义 template <class T> struct Node { T data ; Node <T> *next; //指向下一个node 的类型与本node相同 } // 最后一个node指针指向Null 生成结点&#xff1a; Node <T> * p new Node < T>; 为结点赋值: p-> data …

LLM+知识图谱新工具! iText2KG:使用大型语言模型构建增量知识图谱

iText2KG是一个基于大型语言模型的增量知识图谱构建工具&#xff0c;通过从文本文档中提取实体和关系来逐步构建知识图谱。该工具具有零样本学习能力&#xff0c;能够在无需特定训练的情况下&#xff0c;在多个领域中进行知识提取。它包括文档提炼、实体提取和关系提取模块&…

BM1 反转链表

要求 代码 /*** struct ListNode {* int val;* struct ListNode *next;* };*/ /*** 代码中的类名、方法名、参数名已经指定&#xff0c;请勿修改&#xff0c;直接返回方法规定的值即可*** param head ListNode类* return ListNode类*/ struct ListNode* ReverseList(struct …

【LeetCode-热题100-128题】官方题解好像有误

最长连续序列 题目链接&#xff1a;https://leetcode.cn/problems/longest-consecutive-sequence/?envTypestudy-plan-v2&envIdtop-100-liked 给定一个未排序的整数数组 nums &#xff0c;找出数字连续的最长序列&#xff08;不要求序列元素在原数组中连续&#xff09;的…

Linux高阶——0928—Github本地仓库与云端仓库关联

1、安装代理软件 steam 选择Github和系统代理模式&#xff0c;一键加速即可 2、 安装Git 3、访问Github网站&#xff0c;创建新用户 4、Github探索 &#xff08;1&#xff09;Explore探索标签 &#xff08;2&#xff09;工程结构 用户名/仓库名 自述文件&#xff0c;用markdo…

【Llamaindex RAG实践】

基础任务 (完成此任务即完成闯关) 任务要求&#xff1a;基于 LlamaIndex 构建自己的 RAG 知识库&#xff0c;寻找一个问题 A 在使用 LlamaIndex 之前InternLM2-Chat-1.8B模型不会回答&#xff0c;借助 LlamaIndex 后 InternLM2-Chat-1.8B 模型具备回答 A 的能力&#xff0c;截…