Kafka - 14 Kafka消费者 | 分区的分配策略及再平衡 | Range | RoundRobin | Sticky | CooperativeSticky

news2025/1/12 11:59:00

文章目录

    • 1. 分区的分配以及再平衡
    • 2. Range 分区分配以及再平衡
    • 3. RoundRobin 分区分配以及再平衡
    • 4. Sticky 分区分配以及再平衡

1. 分区的分配以及再平衡

一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据。

Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range +CooperativeSticky。Kafka可以同时使用多个分区分配策略。

在这里插入图片描述

在这里插入图片描述

2. Range 分区分配以及再平衡

Range 是对每个 topic 而言的。首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。

假如现在有 7 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。例如,7/3 = 2 余 1 ,除不尽,那么 消费者 C0 便会多消费 1 个分区。 8/3=2余2,除不尽,那么C0和C1分别多消费一个。

通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者将会多消费 1 个分区。

在这里插入图片描述

注意:如果只是针对 1 个 topic 而言,C0消费者多消费1个分区影响不是很大。但是如果有 N 多个 topic,那么针对每个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。容易产生数据倾斜。

需求:设置主题为 hh,7 个分区;准备 3 个消费者,采用Range 分区分配策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。

Range 分区分配策略案例:

① 修改主题 hh为 7 个分区

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 3 --replication-factor 3 --topic hh

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic hh
Topic:hh    PartitionCount:3    ReplicationFactor:3     Configs:segment.bytes=1073741824
Topic: hh       Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 2,1,0
Topic: hh       Partition: 1    Leader: 2       Replicas: 2,0,3 Isr: 3,0,2
Topic: hh       Partition: 2    Leader: 3       Replicas: 3,2,1 Isr: 3,1,2

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --alter --topic hh --partitions 7

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic hh
Topic:hh    PartitionCount:7     ReplicationFactor:3     Configs:segment.bytes=1073741824
Topic: hh       Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 2,0,1
Topic: hh       Partition: 1    Leader: 2       Replicas: 2,0,3 Isr: 2,0,3
Topic: hh       Partition: 2    Leader: 3       Replicas: 3,2,1 Isr: 2,1,3
Topic: hh       Partition: 3    Leader: 3       Replicas: 3,0,1 Isr: 3,0,1
Topic: hh       Partition: 4    Leader: 0       Replicas: 0,2,3 Isr: 0,2,3
Topic: hh       Partition: 5    Leader: 1       Replicas: 1,3,0 Isr: 1,3,0
Topic: hh       Partition: 6    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
[root@hadoop101 kafka_2.12-2.2.1]#

② 创建3个消费者,消费者组名相同,这样3个消费者属于同一个组

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 创建消费者组,组名任意起名都可以
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");

        // 创建消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        // 订阅主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("hh");
        consumer.subscribe(topics);

        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                "分区:"+consumerRecord.partition()+"数据:"+consumerRecord.value()
            }
        }
    }
}

③ 创建生产者用来向主题hh发送消息,随机发送到不同的分区

public class CustomProducerCallbackPartitions {
    public static void main(String[] args) throws InterruptedException {
        // kafka生产者属性配置
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // kafka生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for(int i=0;i<50;i++){
            kafkaProducer.send(new ProducerRecord<>("hh" ,"hello,kafka"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                    if(exception==null){
                        // 消息发送成功
                        System.out.println("主题"+recordMetadata.topic()+",发往的分区:"+recordMetadata.partition());
                    }else{
                        // 消息发送失败
                        exception.printStackTrace();
                    }
                }
            });
            Thread.sleep(2);
        }
        // 关闭资源
        kafkaProducer.close();
    }
}

④ 测试:先启动3个消费者,然后启动生产者发送消息

Consumer1消费者消费 0、1、2 号分区的数据

在这里插入图片描述

Consumer消费者消费 3、4 号分区的数据

在这里插入图片描述

Consumer2消费者消费 5、6分区的数据

在这里插入图片描述

Range 分区分配再平衡案例:

① 停止掉 Consumer 消费者,快速重新发送消息观看结果(45s 以内,越快越好):Consumer 消费者的任务会整体被分配到 Consumer1 消费者或者 Consumer2 消费者。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

说明:Consumer 消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

② 再次重新发送消息观看结果(45s 以后):

在这里插入图片描述

说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。

3. RoundRobin 分区分配以及再平衡

RoundRobin 针对集群中所有Topic而言,RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。

在这里插入图片描述

需求:设置主题为 hh,7 个分区;准备 3 个消费者,采用RoundRobin 分区分配策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。

RoundRobin 分区分配策略案例:

① 依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代码中修改分区分配策略为 RoundRobin,同时修改消费者组名为 test-group

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置分区分配策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
        // 创建消费者组,组名任意起名都可以
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-group");

        // 创建消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        // 订阅主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("hh");
        consumer.subscribe(topics);

        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("分区:"+consumerRecord.partition()+"数据:"+consumerRecord.value());
            }
        }
    }
}

② 启动生产者发送消息测试

在这里插入图片描述

RoundRobin 分区分配再平衡案例:

① 停止掉Consumer消费者,快速重新发送消息观看结果(45s 以内,越快越好):

在这里插入图片描述

Consumer消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 2号和5号分区数据,分别由Consumer1消费者和 Consumer2消费者消费。

② 再次重新发送消息观看结果(45s 以后):

在这里插入图片描述

说明:消费者 Consumer 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

4. Sticky 分区分配以及再平衡

粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。

粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。

需求:设置主题为 hh,7 个分区;准备 3 个消费者,采用粘性分区策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。

① 修改3 个消费者分区分配策略为粘性,重启 3 个消费者,如果出现报错,全部停止等会再重启,或者修改为全新的消费者组。

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置分区分配策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
        // 创建消费者组,组名任意起名都可以
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");

        // 创建消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        // 订阅主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("hh");
        consumer.subscribe(topics);

        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("分区:"+consumerRecord.partition()+"数据:"+consumerRecord.value());
            }
        }
    }
}

② 启动生产者发送消息测试:可以看到会尽量保持分区的个数近似划分分区。

在这里插入图片描述

Sticky 分区分配再平衡案例:

① 停止掉 Consumer 消费者,快速重新发送消息观看结果(45s 以内,越快越好):Consumer消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别由 Consumer1 消费者或者 Consumer2 消费者消费。

在这里插入图片描述

② 再次重新发送消息观看结果(45s 以后):消费者 Consumer 已经被踢出消费者组,所以重新按照粘性方式分配。

在这里插入图片描述

消费者 Consumer 已经被踢出消费者组,所以重新按照粘性方式分配。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/60287.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Python自然语言处理】规则分词中正向、反向、双向最大匹配法的讲解及实战(超详细 附源码)

需要源码和字典集请点赞关注收藏后评论区留言私信~~~ 一、规则分词 规则分词核心内容是建立人工专家词典库&#xff0c;通过将语句切分出的单词串与专家词典库中的所有词语进行逐一匹配&#xff0c;匹配成功则进行对象词语切分&#xff0c;否则通过增加或者减少一个字继续比较…

文件或者文件夹的忽略

文件或者文件夹的忽略 编辑项目的时候&#xff0c;将一些临时文件或者插件可以忽略上传到项目库中去。 追踪中的文件&#xff0c;不能被忽略。 首先的创建.gitignore文件&#xff0c;并且该文件需要放到项目的根目录下 接着&#xff0c;打开.gitignore文件 windows中 open d…

软件测试——分类

测试分类 一、按照测试对象划分 1、界面 界面测试UI测试 &#xff08;1&#xff09;测试软件界面元素完整性&#xff0c;正确性&#xff0c;一致性 &#xff08;2&#xff09;软件界面排版布局合理、字体、颜色 &#xff08;3&#xff09;测试界面的自适应性&#xff0c;界面…

1549_AURIX_TC275_SCU系统中的CCU模块

全部学习汇总&#xff1a; GreyZhang/g_TC275: happy hacking for TC275! (github.com) 从这一份笔记开始看一下SCU系统&#xff0c;其实这个是一个功能组合&#xff0c;其中的一个小模块又叫做SCU。因此&#xff0c;在名称上可能会有一点点绕。近段时间看相关的资料比较多&…

数据结构与算法—数组栈和链表栈

数据结构与算法—数组栈和链表栈 &#x1f308;一览众山小数据结构与算法—数组栈和链表栈栈介绍栈图解栈实现数组实现栈实现思路实现代码单链表实现栈实现思路(图解)实现代码栈总结栈力扣栈介绍 栈,存储货物或供旅客住宿的地方,可引申为仓库、中转站&#xff0c;所以引入到计算…

Java—泛型、内部类、多继承

文章目录泛型1.泛型是什么&#xff0c;使用泛型的好处2.泛型中的限定通配符和非限定通配符3.泛型擦除内部类多继承多继承使用&#xff1a;———————————————————————————泛型 1.泛型是什么&#xff0c;使用泛型的好处 ​ 泛型就是把类型参数化&…

骰子游戏-第11届蓝桥杯Scratch选拔赛真题精选

[导读]&#xff1a;超平老师计划推出Scratch蓝桥杯真题解析100讲&#xff0c;这是超平老师解读Scratch蓝桥真题系列的第94讲。 蓝桥杯选拔赛每一届都要举行4~5次&#xff0c;和省赛、国赛相比&#xff0c;题目要简单不少&#xff0c;再加上篇幅有限&#xff0c;因此我精挑细选…

Python源码剖析1-整数对象PyIntObject

1、PyIntObject 对象 [intobject.h] typedef struct {PyObject_HEADlong ob_ival; } PyIntObjectPyIntObject是一个不可变&#xff08;immutable&#xff09;对象。Python内部也大量的使用整数对象&#xff0c;我们在自己的代码中也会有大量的创建销毁整型对象的操作&#xff…

霍夫曼树:霍夫曼编码(Huffman Tree:Huffman Coding)

预计阅读时间&#xff1a;10分钟 一、简介 霍夫曼树常处理符号编写工作。根据整组数据中符号出现的频率高低&#xff0c;决定如何给符号编码。如果符号出现的频率越高&#xff0c;则给符号的码越短&#xff0c;相反符号的号码越长。 相关术语 路径&#xff1a;从书中一个节点…

Docker安装可视化管理器Portainer

Docker安装可视化管理器Portainer Portainer 提供状态显示面板、应用模板快速部署、容器镜像网络数据卷的基本操作&#xff08;包括上传下载镜像&#xff0c;创建容器等操作&#xff09;、事件日志显示、容器控制台操作、Swarm 集群和服务等集中管理和操作、登录用户管理和控制…

Linux/Windows Redis的下载与安装

Redis简介 参考视频教程: https://www.bilibili.com/video/BV13a411q753?p143 Redis下载与安装 Windows版 下载地址: https://github.com/microsoftarchive/redis/releases Linux版下载地址: https://download.redis.io/releases/ 1. Window版本 1.1 redis下载 官网下载…

用ACL实现防火墙功能

目录 实验目的&#xff1a; 实验所需软硬件 实验步骤&#xff1a; 1、按以下拓扑接好线路。 2、配置好设备的IP地址和静态路由&#xff0c;使得所有设备可以互通。&#xff08;配置截图&#xff09; PC2 PC0 Router0 Router1​编辑 Server 3、测试各PC/服务器互联状…

基于KubeSphere图形编辑面板构建微服务项目的DevOps 系统

文章目录相关文章部署过程准备工作创建 DevOps 项目创建凭证创建流水线编辑流水线JAVA后端微服务拉取源码构建源码构建镜像推送镜像部署项目VUE前端拉取源码构建源码构建镜像推送镜像部署项目运行流水线查看流水线详情完整流水线脚本微服务后端VUE前端参考相关文章 kubernetes…

Grafana+Prometheus打造运维监控系统(一)-安装篇

1. Prometheus、Grafana介绍 Prometheus是一个开源的系统监控和报警系统&#xff0c;Grafana 是一个开源的监控数据分析和可视化套件&#xff0c;利用GrafanaPrometheus组合&#xff0c;打造运维日常的各种指标监控以及数据可视化。 2. Prometheus 2.1 下载 访问&#xff1…

专利-分析方法总结

目录 一、专利分析的意义 二、专利分析的方法&#xff1a; 2.1、行业专利信息分析 2.1.1、专利技术发展趋势分析 2.1.2、专利区域分布分析 2.1.3、专利相关人分析 2.1.4、专利技术主题分析 2.1.5、技术发展路线分析 2.1.6、专利技术功效分析 2.1.7、专利运营分析 3.…

node环境的搭建

一、node的安装&#xff08;可以去文末直接安装nvm管理器&#xff0c;就不用配置了&#xff09; 1 下载 | Node.js,也可以下载以往版本,window是以msi结尾的文件 2 安装,直接一直安装就行,如果有之前安装的版本,先进行卸载,然后再进行安装 3 安装完成后查看版本号 node -vnp…

Linux下文件目录权限管理chmod, chown, chgrp,umask命令使用总结

在Linux系统下常用的文件目录权限管理命令有chmod, chown, chgrp,umask&#xff0c;一直以来都在用&#xff0c;但是没有太注意它们的区别&#xff0c;今天就在这篇文章做个总结。 目录 1. chmod 2. chown 3. chgrp 4. umask 1. chmod 作用&#xff1a;修改某个目录或文件…

C语言实现学生管理系统(顺序表版)

前言 设计知识 使用语言&#xff1a;C语言 数据结构类型&#xff1a;顺序表 内容导图 效果展示 内容目录前言设计知识内容导图效果展示静态管理系统菜单的实现选择功能实现静态开辟空间实现增删功能增加功能实现删除功能实现实现查找功能实现修改功能实现排序功能动态管理系…

rollup打包工具快速入门

0.开始 教学视频出处 https://www.bilibili.com/video/BV1w84y1z77V?p3&spm_id_frompageDriver&vd_source0f7f337dd5a99bb975b88a48ae1b3711 日期&#xff1a;2022/12/3 rollup目前版本&#xff1a; "rollup": "^3.5.1"1.rollup概述 官网 http…

N32G45之串口+DMA数据收发

N32G45之串口DMA数据收发 1.串口简介   通用同步异步收发器(USART)提供了一种灵活的方法与使用工业标准NRZ异步串行数据格式的外部设备之间进行全双工数据交换。 USART利用分数波特率发生器提供宽范围的波特率选择。它支持同步单向通信和半双工单线通信&#xff0c;也支持LI…