Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡)

news2024/10/9 6:29:57

目录

    • 一、RoundRobin 分区分配策略原理
    • 二、RoundRobin分区分配策略代码案例
      • 2.1、创建带有7个分区的sixTopic主题
      • 2.3、创建三个消费者 组成 消费者组
      • 2.3、创建生产者
      • 2.4、测试
      • 2.5、RoundRobin分区分配策略代码案例说明
    • 三、RoundRobin 分区分配再平衡案例
      • 3.1、停止某一个消费者后,(45s 以内)重新发送消息示例
      • 3.2、停止某一个消费者后,(45s 以后)重新发送消息示例
      • 3.3、RoundRobin 分区分配再平衡案例说明

一、RoundRobin 分区分配策略原理

  • RoundRobin 针对集群中所有Topic而言。
  • RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。
    在这里插入图片描述

二、RoundRobin分区分配策略代码案例

2.1、创建带有7个分区的sixTopic主题

  • 在 Kafka 集群控制台,创建带有7个分区的sixTopic主题

    bin/kafka-topics.sh --bootstrap-server 192.168.136.27:9092 --create --partitions 7 --replication-factor 1 --topic sixTopic
    

    在这里插入图片描述

2.3、创建三个消费者 组成 消费者组

  • 复制 CustomConsumer1类,创建 CustomConsumer2和CustomConsumer3。这样可以由三个消费者组成消费者组,组名都为“test1”,设置分区分配策略为 RoundRobin。

    package com.xz.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomConsumer1 {
    
        public static void main(String[] args) {
    
            // 0 配置
            Properties properties = new Properties();
    
            // 连接 bootstrap.servers
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            // 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 配置消费者组id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
            // 设置分区分配策略
            properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
    
            // 1 创建一个消费者  "", "hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 2 订阅主题 sixTopic
            ArrayList<String> topics = new ArrayList<>();
            topics.add("sixTopic");
            kafkaConsumer.subscribe(topics);
    
            // 3 消费数据
            while (true){
    
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }
    }
    

2.3、创建生产者

  • 创建CustomProducer生产者。

    package com.xz.kafka.producer;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;
    
    public class CustomProducerCallback {
    
        public static void main(String[] args) throws InterruptedException {
    
            //1、创建 kafka 生产者的配置对象
            Properties properties = new Properties();
    
            //2、给 kafka 配置对象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            //3、指定对应的key和value的序列化类型 key.serializer value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    
            //4、创建 kafka 生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //5、调用 send 方法,发送消息
            for (int i = 0; i < 200; i++) {
                kafkaProducer.send(new ProducerRecord<>("sixTopic", "hello kafka" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null){
                            System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                        }
                    }
                });
                Thread.sleep(2);
            }
    
            // 3 关闭资源
            kafkaProducer.close();
        }
    }
    
    

2.4、测试

  • 首先,在 IDEA中分别启动消费者1、消费者2和消费者3代码
    在这里插入图片描述

  • 然后,在 IDEA中分别启动生产者代码
    在这里插入图片描述

  • 在 IDEA 控制台观察消费者1、消费者2和消费者3控制台接收到的数据,如下图所示:

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

2.5、RoundRobin分区分配策略代码案例说明

  • 由上述测试输出结果截图可知: 消费者1消费1、4分区的数据;消费者2消费2和5分区的数据;消费者3消费0、3、6分区的数据。
  • 说明:Kafka 采用修改后的RoundRobin分区分配策略。

三、RoundRobin 分区分配再平衡案例

3.1、停止某一个消费者后,(45s 以内)重新发送消息示例

  • 由下图控制台输出可知:2号消费者 消费到 2、5号分区数据。
    在这里插入图片描述
  • 由下图控制台输出可知:3号消费者 消费到 0、3、6号分区数据。
    在这里插入图片描述

3.2、停止某一个消费者后,(45s 以后)重新发送消息示例

  • 由下图控制台输出可知:2号消费者 消费到 1、3、5号分区数据。
    在这里插入图片描述

  • 由下图控制台输出可知:3号消费者 消费到 0、2、4、6号分区数据。
    在这里插入图片描述

3.3、RoundRobin 分区分配再平衡案例说明

  • 1号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
  • 消费者 1 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/994340.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL--MySQL表的增删改查(进阶)

check 聚合查找 count sum average max min 我们这里先构造出多张表 查询lisi同学的成绩 来自student和来自score c 增加名字这一条件 查询所有同学的总成绩以及个人信息 来自score和来自student 查询所有同学的各科成绩以及个人信息 来自student&#xff0c;course和…

数据分享|SAS数据挖掘EM贷款违约预测分析:逐步Logistic逻辑回归、决策树、随机森林...

全文链接&#xff1a;http://tecdat.cn/?p31745 近几年来&#xff0c;各家商业银行陆续推出多种贷款业务&#xff0c;如何识别贷款违约因素已经成为各家商业银行健康有序发展贷款业务的关键&#xff08;点击文末“阅读原文”获取完整数据&#xff09;。 相关视频 在贷款违约预…

Python开源项目周排行 2023年第33周

#2023年第33周2023年9月9日1feapder款上手简单&#xff0c;功能强大的 Python 爬虫框架&#xff0c;内置 AirSpider、Spider、TaskSpider、BatchSpider 四种爬虫解决不同场景的需求。命名源于 fast-easy-air-pro-spider 缩写。 支持断点续爬、监控报警、浏览器渲染、海量数据去…

【洛谷 P1105】平台 题解(结构体+暴力枚举)

平台 题目描述 空间中有一些平台。给出每个平台的位置&#xff0c;请你计算从每一个平台的边缘落下之后会落到哪一个平台上。注意&#xff0c;如果某两个平台的某个两边缘横坐标相同&#xff0c;物体从上面那个平台落下之后将不会落在下面那个平台上。平台可能会重叠。 如果…

使用GPU虚拟化技术搭建支持3D设计的职校学生机房(云教室)

背景 学校为职业学校&#xff0c;计算机教室需要进行Maya、Adobe Illustrator、Adobe Dreamweaver、Adobe PhotoShop等软件的教学。每个教室为35用户。资源需求为4核、8G内存、80G硬盘。 基于桌面虚拟化VDI技术的机房在成本、可管理性方面&#xff0c;相对于传统胖终端的机房…

004微信小程序云开发API数据库-插入记录-删除记录-更新记录

文章目录 1.微信小程序云开发API数据库-插入记录案例代码 2.微信小程序云开发API数据库-删除记录案例代码 3.微信小程序云开发API数据库-更新记录案例代码 1.微信小程序云开发API数据库-插入记录 微信小程序云开发API数据库是一个方便快捷的数据库解决方案&#xff0c;可以让开…

lv4 嵌入式开发-1 Linux文件IO

目录 1 文件的概念和类型 2 如何理解标准IO 3 流(FILE)的含义 3.1 流 3.2 文本流和二进制流 3.3 流的缓冲类型 4 小结 5 缓存区实验 1 文件的概念和类型 概念&#xff1a;一组相关数据的有序集合 文件类型&#xff1a; 常规文件 r 目录文件 d 字符设备文件 …

电力社区电力故障,潜在风险如何避免?

在现代社会中&#xff0c;电力已经成为我们日常生活不可或缺的一部分。它驱动着我们的家庭设备、照明系统、电子设备和许多其他关键基础设施。然而&#xff0c;电力的可靠性和安全性对于确保我们的住宅社区运行顺畅至关重要。 在这个背景下&#xff0c;配电柜监控成为了一个至关…

Python项目打包与部署(三):打包与部署的实际操作流程

其它章节 Python项目打包与部署(一)&#xff1a;模块与包的概念与关系Python项目打包与部署(二): init.py的作用及内容各类Python项目的项目结构及代码组织最佳实践 在实际项目开发过程中&#xff0c;标准化的项目打包与部署流程&#xff0c;对于开源项目&#xff0c;可以帮助你…

[移动通讯]【Carrier Aggregation-4】【LTE-2】

前言&#xff1a; 参考&#xff1a; 4G/LTE - LTE Advanced 参考&#xff1a; 《Carrier Aggregation Explained In 101 Seconds》 Qualcomm 《Carrier aggregation (CA) in LTE-Advanced by TELCOMA Global》 《Carrier Aggregation _CA_Part1》 《Carrier Aggregation _CA_Pa…

手动开发-简单的Spring基于XML配置的程序--源码解析

手动开发-简单的Spring基于XML配置的程序 文章目录 手动开发-简单的Spring基于XML配置的程序思路分析完整代码&&#xff1a; 本文带着大家写一个简单的Spring容器&#xff0c;通过读取beans.xml配置文件&#xff0c;获取第一个JavaBean&#xff1a;Monster的对象&#xff0…

光源控制器光源亮度调节操作说明

光源控制器光源亮度调节操作说明 光源亮度的调节在许多应用中都扮演着至关重要的角色&#xff0c;不仅影响图像质量&#xff0c;还能改善工作环境。下面是关于光源控制器光源亮度调节的详细操作步骤&#xff0c;帮助您轻松实现亮度的合适调整。 步骤一&#xff1a;登录系统 …

MySQL——常见问题

NULL和空值的区别 1、空值不占空间&#xff0c;NULL值占空间。当字段不为NULL时&#xff0c;也可以插入空值。 2、当使用 IS NOT NULL 或者 IS NULL 时&#xff0c;只能查出字段中没有不为NULL的或者为 NULL 的&#xff0c;不能查出空值。 3、判断NULL 用IS NULL 或者 is no…

Bootstrap与响应式图片设计相关的类

01-图像随父元素的同步缩放 可以利用类 .img-fluid 实现图像随父元素同步缩放。 示例代码如下&#xff1a; <!DOCTYPE html> <html> <head><meta charset"UTF-8"><title>图像的同步缩放</title><meta name"viewport&…

C/C++标准输入输出函数最全解析(含C/C++的输出控制符)

C/C标准输入输出函数最全解析&#xff08;含C/C的输出控制符&#xff09; 一、标准输入流1、C 标准输入1.1 标准输入流及对缓冲区的理解1.2 scanf()1.2.1 scanf()简介1.2.2 ANSI C中scanf()的转换说明1.2. 3 scanf()转换说明中的修饰符 1.3 gets() - 不建议1.4 fgets()1.5 fget…

【笔记】大模型时代下做科研的四个思路 - 论文精读·52

视频地址&#xff1a;大模型时代下做科研的四个思路 相关大模型 CV: ViT(22B) , ViT-G(2B) from google 多模态&#xff1a;ViT-E(4B) from google NLP&#xff1a;LLaMA(70B,130B,330B,651B) from Meta 提问&#xff1a;在模型越来越大的时代背景下&#xff0c;如何利用有限…

【接口测试】微信小程序接口自动化的实现方案

背景 前几天看到有人问微信小程序怎么进行接口自动化&#xff0c;所以想把自己如何进行微信小程序接口自动化的方法分享下。 https://developers.weixin.qq.com/miniprogram/dev/framework/open-ability/login.html这是微信小程序的登陆流程&#xff0c;小程序登陆需要在小程序…

un-app-手机号授权登录-授权框弹不出情况

前言 手机号授权是获取用户信息api停用之后&#xff0c;经常使用的api。但是此api也是有很多坑 手机号授权会出现调用不起来的情况&#xff0c;这是因为小程序后台没有进行微信认证导致的 手机号授权调用不起来-没有微信认证 来到小程序后台-设置-基本设置-下拉找到微信认证…

【ModelSim】查看波形图(Wave)和数据流图(DataFlow),以4-bit计数器为例

▚ 00 预备条件 &#x1f4e2; 本项目包括两个Verilog文件&#xff1a;测试文件counter.v和激励文件testbench.v&#xff1b;可从此处点击HERE获取。 &#x1f3ae; Windows10系统 &#x1f52e; ModelSim SE-64 2020.4 ▚ 01 启动ModelSim 有两种方式可以打开ModelSim&a…

vue下载与部分指令详解

目录 vue 下载地址 前端框架 MVC与MVVM框架 Vue使用 Vue.js指令 ​编辑 v-if v-else v-show v-on v-model&#xff08;表单绑定&#xff09; v-bind v-for v-text v-model 指令扩展 vue 下载地址 官方入门&#xff1a;https://cn.vuejs.org/ API 文档&#…