Kafka - 14 Kafka分区的分配策略及再平衡 | Range | RoundRobin | Sticky | CooperativeSticky

news2025/1/15 12:53:54

文章目录

    • 1. 分区的分配以及再平衡
    • 2. Range 分区分配以及再平衡
    • 3. RoundRobin 分区分配以及再平衡
    • 4. Sticky 分区分配以及再平衡

1. 分区的分配以及再平衡

一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据。

Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range +CooperativeSticky。Kafka可以同时使用多个分区分配策略。

在这里插入图片描述

在这里插入图片描述

2. Range 分区分配以及再平衡

Range 是对每个 topic 而言的。首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。

假如现在有 7 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。例如,7/3 = 2 余 1 ,除不尽,那么 消费者 C0 便会多消费 1 个分区。 8/3=2余2,除不尽,那么C0和C1分别多消费一个。

通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者将会多消费 1 个分区。

在这里插入图片描述

注意:如果只是针对 1 个 topic 而言,C0消费者多消费1个分区影响不是很大。但是如果有 N 多个 topic,那么针对每个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。容易产生数据倾斜。

需求:设置主题为 hh,7 个分区;准备 3 个消费者,采用Range 分区分配策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。

Range 分区分配策略案例:

① 修改主题 hh为 7 个分区

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 3 --replication-factor 3 --topic hh

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic hh
Topic:hh    PartitionCount:3    ReplicationFactor:3     Configs:segment.bytes=1073741824
Topic: hh       Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 2,1,0
Topic: hh       Partition: 1    Leader: 2       Replicas: 2,0,3 Isr: 3,0,2
Topic: hh       Partition: 2    Leader: 3       Replicas: 3,2,1 Isr: 3,1,2

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --alter --topic hh --partitions 7

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic hh
Topic:hh    PartitionCount:7     ReplicationFactor:3     Configs:segment.bytes=1073741824
Topic: hh       Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 2,0,1
Topic: hh       Partition: 1    Leader: 2       Replicas: 2,0,3 Isr: 2,0,3
Topic: hh       Partition: 2    Leader: 3       Replicas: 3,2,1 Isr: 2,1,3
Topic: hh       Partition: 3    Leader: 3       Replicas: 3,0,1 Isr: 3,0,1
Topic: hh       Partition: 4    Leader: 0       Replicas: 0,2,3 Isr: 0,2,3
Topic: hh       Partition: 5    Leader: 1       Replicas: 1,3,0 Isr: 1,3,0
Topic: hh       Partition: 6    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
[root@hadoop101 kafka_2.12-2.2.1]#

② 创建3个消费者,消费者组名相同,这样3个消费者属于同一个组

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 创建消费者组,组名任意起名都可以
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");

        // 创建消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        // 订阅主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("hh");
        consumer.subscribe(topics);

        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                "分区:"+consumerRecord.partition()+"数据:"+consumerRecord.value()
            }
        }
    }
}

③ 创建生产者用来向主题hh发送消息,随机发送到不同的分区

public class CustomProducerCallbackPartitions {
    public static void main(String[] args) throws InterruptedException {
        // kafka生产者属性配置
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // kafka生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for(int i=0;i<50;i++){
            kafkaProducer.send(new ProducerRecord<>("hh" ,"hello,kafka"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                    if(exception==null){
                        // 消息发送成功
                        System.out.println("主题"+recordMetadata.topic()+",发往的分区:"+recordMetadata.partition());
                    }else{
                        // 消息发送失败
                        exception.printStackTrace();
                    }
                }
            });
            Thread.sleep(2);
        }
        // 关闭资源
        kafkaProducer.close();
    }
}

④ 测试:先启动3个消费者,然后启动生产者发送消息

Consumer1消费者消费 0、1、2 号分区的数据

在这里插入图片描述

Consumer消费者消费 3、4 号分区的数据

在这里插入图片描述

Consumer2消费者消费 5、6分区的数据

在这里插入图片描述

Range 分区分配再平衡案例:

① 停止掉 Consumer 消费者,快速重新发送消息观看结果(45s 以内,越快越好):Consumer 消费者的任务会整体被分配到 Consumer1 消费者或者 Consumer2 消费者。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

说明:Consumer 消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

② 再次重新发送消息观看结果(45s 以后):

在这里插入图片描述

说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。

3. RoundRobin 分区分配以及再平衡

RoundRobin 针对集群中所有Topic而言,RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。

在这里插入图片描述

需求:设置主题为 hh,7 个分区;准备 3 个消费者,采用RoundRobin 分区分配策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。

RoundRobin 分区分配策略案例:

① 依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代码中修改分区分配策略为 RoundRobin,同时修改消费者组名为 test-group

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置分区分配策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
        // 创建消费者组,组名任意起名都可以
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-group");

        // 创建消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        // 订阅主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("hh");
        consumer.subscribe(topics);

        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("分区:"+consumerRecord.partition()+"数据:"+consumerRecord.value());
            }
        }
    }
}

② 启动生产者发送消息测试

在这里插入图片描述

RoundRobin 分区分配再平衡案例:

① 停止掉Consumer消费者,快速重新发送消息观看结果(45s 以内,越快越好):

在这里插入图片描述

Consumer消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 2号和5号分区数据,分别由Consumer1消费者和 Consumer2消费者消费。

② 再次重新发送消息观看结果(45s 以后):

在这里插入图片描述

说明:消费者 Consumer 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

4. Sticky 分区分配以及再平衡

粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。

粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。

需求:设置主题为 hh,7 个分区;准备 3 个消费者,采用粘性分区策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。

① 修改3 个消费者分区分配策略为粘性,重启 3 个消费者,如果出现报错,全部停止等会再重启,或者修改为全新的消费者组。

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置分区分配策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
        // 创建消费者组,组名任意起名都可以
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");

        // 创建消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        // 订阅主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("hh");
        consumer.subscribe(topics);

        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("分区:"+consumerRecord.partition()+"数据:"+consumerRecord.value());
            }
        }
    }
}

② 启动生产者发送消息测试:可以看到会尽量保持分区的个数近似划分分区。

在这里插入图片描述

Sticky 分区分配再平衡案例:

① 停止掉 Consumer 消费者,快速重新发送消息观看结果(45s 以内,越快越好):Consumer消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别由 Consumer1 消费者或者 Consumer2 消费者消费。

在这里插入图片描述

② 再次重新发送消息观看结果(45s 以后):消费者 Consumer 已经被踢出消费者组,所以重新按照粘性方式分配。

在这里插入图片描述

消费者 Consumer 已经被踢出消费者组,所以重新按照粘性方式分配。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/59857.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[Windows驱动开发] BlackBone介绍

▒ 目录 ▒&#x1f6eb; 导读需求开发环境1️⃣ 名词解释2️⃣ 介绍3️⃣ 库包含内容4️⃣ 编译问题总结常见问题fatal error LNK1104非类型模板参数中的 "auto" &#x1f4d6; 参考资料&#x1f6eb; 导读 需求 作为Windows开发人员&#xff0c;经常遇到枚举进程、…

dojo中的类

使用arcgis api for js 4.*进行地图的web前端开发&#xff0c;就不得不与dojo打交道。dojo是一个框架&#xff0c;自成体系&#xff0c;比如它对类的支持&#xff0c;有自己的一套。众所周知&#xff0c;js不是面向对象语言&#xff0c;没有类这一说&#xff0c;都是用函数来模…

人工智能轨道交通行业周刊-第25期(2022.11.28-12.4)

本期关键词&#xff1a;液体安检仪、智慧车站、大机作业、动车打温、实时人体姿态估计、图像压缩 1 整理涉及公众号名单 1.1 行业类 RT轨道交通中关村轨道交通产业服务平台人民铁道世界轨道交通资讯网铁路信号技术交流北京铁路轨道交通网上榜铁路视点ITS World轨道交通联盟V…

Java项目:SSM医院挂号预约管理系统

作者主页&#xff1a;源码空间站2022 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文末获取源码 项目介绍 本项目分为管理员与医生两种角色&#xff1b; 管理员角色包含以下功能&#xff1a; 管理员登录,添加科室,科室增删改查,医生管理,查看预约信息,…

Redis03:Redis基础知识以及数据类型

Redis基础知识以及数据类型基础知识Redis-key基本数据类型String(字符串)三种特殊的数据类型geospatialhyperloglogbitmaps基础知识 redis默认有16个数据库&#xff0c;默认使用的时第0个&#xff0c;可以使用select进行切换数据库 清除当前数据库 清除全部数据库的内容 Red…

Cmake升级与软链接

记录cmake升级版本的记录&#xff0c;主要参考文章&#xff1a;Unbuntu安装Ros后Cmake变成3.10_向日葵骑士Faraday的博客-CSDN博客 cmake升级、更新&#xff08;ubuntu18.04&#xff09;_Doctor_Wu_的博客-CSDN博客_cmake升级 设计安装ros后cmake版本出现问题&#xff0c;且卸…

[附源码]Python计算机毕业设计Django基于Java的员工管理系统

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

Windows 文件共享功能使用方法,局域网多台电脑之间传送文件

设想一下&#xff0c;家里或者公司有多台电脑&#xff0c;连接同一个Wifi&#xff0c;也就是处于同一个局域网中。 在不能使用微信、网盘的文件传输功能的情况下&#xff0c;这多台电脑之间&#xff0c;就只能用U盘传送数据吗&#xff1f; 不。Windows系统中已经提供了文件共享…

ZMQ之克隆模式的可靠性

克隆服务器的可靠性 克隆模型1至5相对比较简单&#xff0c;下面我们会探讨一个非常复杂的模型。可以发现&#xff0c;为了构建可靠的消息队列&#xff0c;我们需要花费非常多的精力。所以我们经常会问&#xff1a;有必要这么做吗&#xff1f;如果说你能够接受可靠性不够高的、或…

Docker三大核心概念(镜像、容器和仓库)与虚拟化

目录 1. Docker是什么 2. Docker与虚拟化 3. Docker虚拟化的好处 4. Docker核心概念 4.1.镜像 4.2.容器 4.3.仓库 5. CentOS7 安装docker(在线方式) 5.1.内核版本信息检查 5.2 卸载可能存在的旧版本 5.3 安装必要的系统工具 5.4 添加docker-ce安装源 5.5 更新yum缓存 5.…

web前端期末大作业:个人网站设计——响应式个人小站网站HTML+CSS+JavaScript

&#x1f389;精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业…

STC 51单片机54——气压水压计HX710B 串口显示均值滤波+滑窗滤波

//气压模块为红色模块&#xff0c;传感器型号未知&#xff0c;其信号放大器型号为HX710B // STC15W408AS 11.0592MHz 波特率9600&#xff0c;串口输出大气压强值 // STC15W408AS没有定时器1&#xff0c;所以用定时器2做波特率发生器 // 采用电脑USB供电会有很大的干扰&#xff…

Unity工具 - 工具聚合页(UEWindow)

随着项目工程的推进&#xff0c;开发者们会根据工作内容的需要在Unity内开发众多的工具。随着工具的增多&#xff0c;Unity 的Menu菜单也会逐渐臃肿&#xff0c;过于分散&#xff0c;工具代码也难以查找。在此问题的基础上&#xff0c;开发了工具聚合页(UEWindow) 这一功能来管…

R绘图案例|基于分面的面积图

简介 最近参加一个统计建模的比赛。模型建模后&#xff0c;需要展示不同模型的性能指标&#xff0c;数据如下所示&#xff1a; 其中&#xff0c;第 1 列是不同样本&#xff0c;共376条。第 2-4 列是随机森林得到的结果&#xff0c;第 5-7 列是XGBoost的结果。一共使用了三种评…

数字验证学习笔记——UVM学习3 核心基类

一、核心基类 UVM世界中的类最初都是从一个uvm_void根类&#xff08;root class&#xff09;继承来的&#xff0c;而实际上这个类并没有成员变量和方法。 uvm_void只是一个虚类&#xff08;virtual class&#xff09;&#xff0c;还在等待将来继承于它的子类去开垦。在继承与u…

适合新手的Pytorch的中文文档

&#x1f50e;大家好&#xff0c;我是Sonhhxg_柒&#xff0c;希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流&#x1f50e; &#x1f4dd;个人主页&#xff0d;Sonhhxg_柒的博客_CSDN博客 &#x1f4c3; &#x1f381;欢迎各位→点赞…

牛客练习赛106

牛客练习赛106 C D 脑筋急转弯的构造题 E 染色法判断二分图 结论&#xff0c;这个图是二分图说明不存在奇环 设左边是x&#xff0c;右边是y 则有xyn,xyn, xyn,且x∗y>边数n∗(n−1)/2−mx*y>边数n*(n-1)/2-m x∗y>边数n∗(n−1)/2−m 也就是说左式最大是n∗n/4(xn/…

【1774. 最接近目标价格的甜点成本】

来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 描述&#xff1a; 你打算做甜点&#xff0c;现在需要购买配料。目前共有 n 种冰激凌基料和 m 种配料可供选购。而制作甜点需要遵循以下几条规则&#xff1a; 必须选择 一种 冰激凌基料。可以添加 一种或多种 配料&…

[附源码]计算机毕业设计ssm校园一卡通服务平台

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

Apache+PHP8+MYSQL的配置(目前最新版本)

已有很多年没有WEB开发了&#xff0c;本机都没了测试的服务环境&#xff0c;前几天GO语言的一个测试用例需要用到WEB&#xff0c;于是快速搭建一个Apche环境&#xff0c;也顺便将PHP和MYSQL的环境也配置好&#xff0c;贴出来方便自己和他人&#xff0c;临时需要的时候就更快了&…