Kafka3.0.0版本——消费者(Range分区分配策略以及再平衡)

news2025/1/13 10:02:55

目录

    • 一、Range分区分配策略原理
      • 1.1、Range分区分配策略原理的示例一
      • 1.2、Range分区分配策略原理的示例二
      • 1.3、Range分区分配策略原理的示例注意事项
    • 二、Range 分区分配策略代码案例
      • 2.1、创建带有4个分区的fiveTopic主题
      • 2.2、创建三个消费者 组成 消费者组
      • 2.3、创建生产者
      • 2.4、测试
      • 2.5、Range 分区分配策略代码案例说明
    • 三、Range 分区分配再平衡案例
      • 3.1、停止某一个消费者后,(45s 以内)重新发送消息示例
      • 3.2、停止某一个消费者后,(45s 以后)重新发送消息示例
      • 3.3、Range 分区分配再平衡案例说明

一、Range分区分配策略原理

  • Range 是对每个 topic 而言的。首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。

1.1、Range分区分配策略原理的示例一

假如现在有 4 个分区,3 个消费者,排序后的分区将会是0,1,2,3;消费者排序完之后将会是C0,C1,C2。

  • 通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。 如果除不尽,那么前面几个消费者将会多消费 1 个分区。
  • 例如:4/3 = 1 余 1 ,除不尽,那么消费者C0便会多消费1个分区。
    在这里插入图片描述

1.2、Range分区分配策略原理的示例二

假如现在有 5 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4;消费者排序完之后将会是C0,C1,C2。

  • 通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。 如果除不尽,那么前面几个消费者将会多消费 1 个分区。
  • 例如:5/3 = 1 余 2 ,除不尽,那么消费者么C0和C1分别多消费一个分区。
    在这里插入图片描述

1.3、Range分区分配策略原理的示例注意事项

  • 如果只是针对 1 个 topic 而言,C0消费者多消费1个分区影响不是很大。但是如果有N多个topic,那么针对每个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。 容易产生数据倾斜!

二、Range 分区分配策略代码案例

2.1、创建带有4个分区的fiveTopic主题

  • 在 Kafka 集群控制台,创建带有4个分区的fiveTopic主题

    bin/kafka-topics.sh --bootstrap-server 192.168.136.27:9092 --create --partitions 4 --replication-factor 1 --topic fiveTopic
    

    在这里插入图片描述

2.2、创建三个消费者 组成 消费者组

  • 复制 CustomConsumer1类,创建 CustomConsumer2和CustomConsumer3。这样可以由三个消费者组成消费者组,组名都为“test”。

    package com.xz.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomConsumer1 {
    
        public static void main(String[] args) {
    
            // 0 配置
            Properties properties = new Properties();
    
            // 连接 bootstrap.servers
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            // 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 配置消费者组id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    
            // 1 创建一个消费者  "", "hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 2 订阅主题 first
            ArrayList<String> topics = new ArrayList<>();
            topics.add("fiveTopic");
            kafkaConsumer.subscribe(topics);
    
            // 3 消费数据
            while (true){
    
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }
    }
    

2.3、创建生产者

  • 创建CustomProducer生产者。

    package com.xz.kafka.producer;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;
    
    public class CustomProducerCallback {
    
        public static void main(String[] args) throws InterruptedException {
    
            //1、创建 kafka 生产者的配置对象
            Properties properties = new Properties();
    
            //2、给 kafka 配置对象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            //3、指定对应的key和value的序列化类型 key.serializer value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    
            //4、创建 kafka 生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //5、调用 send 方法,发送消息
            for (int i = 0; i < 200; i++) {
                kafkaProducer.send(new ProducerRecord<>("fiveTopic", "hello kafka" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null){
                            System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                        }
                    }
                });
                Thread.sleep(2);
            }
    
            // 3 关闭资源
            kafkaProducer.close();
        }
    }
    

2.4、测试

  • 首先,在 IDEA中分别启动消费者1、消费者2和消费者3代码
    在这里插入图片描述

  • 然后,在 IDEA中分别启动生产者代码
    在这里插入图片描述

  • 在 IDEA 控制台观察消费者1、消费者2和消费者3控制台接收到的数据,如下图所示:

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

2.5、Range 分区分配策略代码案例说明

  • 由上述测试输出结果截图可知: 消费者1消费2分区的数据;消费者2消费0和3分区的数据;消费者3消费2分区的数据。
  • 说明:Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策略。

三、Range 分区分配再平衡案例

3.1、停止某一个消费者后,(45s 以内)重新发送消息示例

  • 由下图控制台输出可知:2号消费者 消费到 0、3号分区数据。在这里插入图片描述
  • 由下图控制台输出可知:3号消费者 消费到 1号分区数据。
    在这里插入图片描述

3.2、停止某一个消费者后,(45s 以后)重新发送消息示例

  • 由下图控制台输出可知:2号消费者 消费到 0、3号分区数据。
    在这里插入图片描述
  • 由下图控制台输出可知:3号消费者 消费到 1、2号分区数据。
    在这里插入图片描述

3.3、Range 分区分配再平衡案例说明

  • 1号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
  • 消费者1 已经被踢出消费者组,所以重新按照 range 方式分配。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/992906.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

学会用命令行创建uni-app项目并用vscode开放项目

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;请留下您的足迹&#xff09; 目录 创建 uni-app 项目 命令行创建 uni-app 项目 编译和运行 uni-app 项目&#xff1a; 用 VS Code 开发 uni…

深入浅出学Verilog--基础语法

1、简介 Verilog的语法和C语言非常类似&#xff0c;相对来说还是非常好学的。和C语言一样&#xff0c;Verilog语句也是由一连串的令牌&#xff08;Token&#xff09;组成。1个令牌必须由1个或1个以上的字符&#xff08;character&#xff09;组成&#xff0c;令牌可以是&#x…

前端通过第三插件uuid 生成一个 uuid

有时候 后端会让我们自己生成一个uuid 我们没必要自己去写 直接用第三方插件就好了 先终端执行 npm install uuid这样 我们第三方插件就进来了 然后 引入一定要根据环境来 //TS环境引入 import { v4 as uuidv4 } from uuid; //js环境引入 const { v4: uuidv4 } require(uui…

Ubuntu 20.04出现蓝牙无法打开的问题(已解决)

安装Ubuntu20.04后&#xff0c;蓝牙无法打开&#xff0c;按钮开启后蓝牙仍处于关闭状态 解决方法&#xff08;四种方式&#xff09; 1.卸载并重新加载btusb内核模块&#xff08;支持蓝牙设备的内核模块&#xff09; sudo rmmod btusb sleep 1 sudo modprobe btusb2、安装蓝牙工…

OpenRoads Designer道路边坡渐变过渡之二点特征名称覆盖

点特征名称覆盖在模板内进行设置&#xff0c;因此使用点特征名称覆盖实现边坡外口连续适用于使用一个模板创建的一段道路&#xff08;廊道&#xff09;模型内出现的边坡开口线间断的情况&#xff0c;对于使用多个模板创建的一系列首尾相连的道路模型&#xff0c;在相邻模型间出…

性能测试中TPS上不去的几种原因

中TPS一直上不去&#xff0c;是什么原因&#xff1f; 这篇文章&#xff0c;就具体说说在实际压力测试中&#xff0c;为什么有时候TPS上不去的原因。 先来解释下什么叫TPS&#xff1a; TPS&#xff08;Transaction Per Second&#xff09;&#xff1a;每秒事务数&#xff0c;…

腾讯云学生专属便宜云服务器如何购买?

随着云计算技术的快速发展&#xff0c;越来越多的学生开始关注和使用云服务器。腾讯云作为国内知名的云计算服务提供商&#xff0c;推出了一系列针对学生的优惠活动&#xff0c;让更多学生能够享受到云服务器的便利和优势。本文将详细介绍如何购买腾讯云学生专属的便宜云服务器…

java 集成免费虹软人脸识别 SDK,实现人脸识别认证功能

系列文章目录 引入本地 jar 包教程&#xff1a; https://blog.csdn.net/demo_yo/article/details/132495029 文章目录 系列文章目录前言一、SDK 下载二、SDK 集成1、jar 依赖包2、dll 链接文件 三、API 集成1、yaml 配置2、Properties 配置类3、Factory 工厂类4、Service 服务…

类加载流程

文档链接&#xff1a; https://www.processon.com/view/link/64fc101a00a5c32bca7fe12e 访问密码&#xff1a;e3x8

亚马逊测评工作室怎么赚钱?

测评工作室的盈利方式主要来自于以下几种&#xff1a; 佣金&#xff1a;市场价基本上做免评单一单30、留评50单&#xff0c;会根据市场价实施波动&#xff0c;如果一个人今天做30单的留评单就是30单*501500元汇率差&#xff1a;即使用实时汇率进行结算时的差额&#xff0c;假设…

性能测试系列专题集合

下方查看历史精选文章 重磅发布 - 自动化框架基础指南pdfv1.1大数据测试过程、策略及挑战 测试框架原理&#xff0c;构建成功的基石 在自动化测试工作之前&#xff0c;你应该知道的10条建议 在自动化测试中&#xff0c;重要的不是工具 从终端用户感受来体验性能指标度量如何建立…

3000字详解!什么是护网行动?什么是红蓝对抗?

一、什么是护网行动&#xff1f; 护网行动是以公安部牵头的&#xff0c;用以评估企事业单位的网络安全的活动。 具体实践中。公安部会组织攻防两方&#xff0c;进攻方会在一个月内对防守方发动网络攻击&#xff0c;检测出防守方&#xff08;企事业单位&#xff09;存在的安全…

04_瑞萨GUI(LVGL)移植实战教程之驱动LCD屏(SPI)

本系列教程配套出有视频教程&#xff0c;观看地址&#xff1a;https://www.bilibili.com/video/BV1gV4y1e7Sg 4. 驱动LCD屏(SPI) 本次实验我们在上一次实验的基础上驱动 LCD屏(SPI)。 上次实验我们已经能驱动触摸屏(I2C)并打印触摸点坐标&#xff0c;这次实验我们的目标是点…

无涯教程-JavaScript - IMSINH函数

描述 MSINH函数以x yi或x yj文本格式返回复数的双曲正弦值。复数的双曲正弦通过以下公式计算- $$\sinh(x yi) \sinh(x)\cos(y)-\cosh(x)\sin(y)i $$ 语法 IMSINH (inumber)争论 Argument描述Required/OptionalInumberA complex number for which you want the hyperbol…

渗透测试流程是什么?7个步骤给你讲清楚!

在学习渗透测试之初&#xff0c;有必要先系统了解一下它的流程&#xff0c;静下心来阅读一下&#xff0c;树立一个全局观&#xff0c;一步一步去建设并完善自己的专业领域&#xff0c;最终实现从懵逼到牛逼的华丽转变。渗透测试是通过模拟恶意黑客的攻击方法&#xff0c;同时也…

python实现读取并显示图片的两种方法

前言 嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 在 python 中除了用 opencv&#xff0c;也可以用 matplotlib 和 PIL 这两个库操作图片。 本人偏爱 matpoltlib&#xff0c;因为它的语法更像 matlab。 &#x1f447; &#x1f447; &#x1f447; 更多精彩机密、教程&…

智能语音血压计:NV040DS芯片呵护您的健康

随着科技的发展。血压计已告别传统的水银血压计&#xff0c;迈向电子血压计时代。电子血压计往往体积小。携带方便。智能血压计能自动检测人体的血压值&#xff0c;并给予语音提示与科学指导、帮助人们更好地了解自己的身体状况。 一、产品介绍 深耕语音芯片的九芯电子科技带…

SpringBoot+MP操作DM8

1. 达梦数据库介绍 达梦数据库管理系统是达梦公司推出的具有完全自主知识产权的国产高性能数据库管理系统&#xff0c;简称DM。当前最新版本是8.0版本&#xff0c;简称DM8。&#xff08;同时也是一款RDBMS&#xff0c;关系型数据库管理系统&#xff0c;和oracle比较像&#xff…

CLIP | 打破文本图像次元壁

1 贡献 CLIP是文字图片的多模态工作 CLIP的迁移效果非常好。不同风格数据集的ZeroShot推理能力超强 在分类 &#xff0c;物体检测和分割&#xff0c;视频检索都很多视觉下游任务都可以用CLIP取得有监督学习的效果 采用利用自然语音信号的监督信号来进行训练 提出了高质量的…

抖店开通后,新手必须要知道的几个做店技巧,建议认真看完

我是王路飞。 抖店的运营&#xff0c;无非就是围绕【产品】【流量】展开的。 你要是能把这两个点给搞明白&#xff0c;新店快速出单、真是爆单就不再是问题了。 今天就给你们说一下&#xff0c;抖店开通后&#xff0c;作为一个新手商家&#xff0c;你必须要知道的几个做店技…