Kafka3.0.0版本——消费者(Sticky分区分配策略以及再平衡)

news2025/1/23 4:41:37

目录

    • 一、Sticky分区分配策略原理
    • 二、Sticky分区分配策略 示例需求
    • 三、Sticky分区分配策略代码案例
      • 3.1、创建带有7个分区的sevenTopic主题
      • 3.2、创建三个消费者 组成 消费者组
      • 3.3、创建生产者
      • 3.4、测试
      • 3.5、Sticky分区分配策略代码案例说明
    • 四、Sticky分区分配再平衡案例
      • 4.1、停止某一个消费者后,(45s 以内)重新发送消息示例
      • 4.2、停止某一个消费者后,(45s 以后)重新发送消息示例
      • 4.3、Sticky分区分配再平衡案例说明

一、Sticky分区分配策略原理

  • 粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
  • 粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化

二、Sticky分区分配策略 示例需求

  • 设置主题为 sevenTopic,7个分区;准备 3 个消费者,采用粘性分区策略,并进行消费,观察
    消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。

三、Sticky分区分配策略代码案例

3.1、创建带有7个分区的sevenTopic主题

  • 在 Kafka 集群控制台,创建带有7个分区的sevenTopic主题

    bin/kafka-topics.sh --bootstrap-server 192.168.136.27:9092 --create --partitions 7 --replication-factor 1 --topic sevenTopic
    

    在这里插入图片描述

3.2、创建三个消费者 组成 消费者组

  • 复制 CustomConsumer1类,创建 CustomConsumer2和CustomConsumer3。这样可以由三个消费者组成消费者组,组名都为“test2”,设置分区分配策略为 Sticky。

    package com.xz.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomConsumer1 {
    
        public static void main(String[] args) {
    
            // 0 配置
            Properties properties = new Properties();
    
            // 连接 bootstrap.servers
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            // 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 配置消费者组id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test2");
            // 设置分区分配策略
            properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
    
            // 1 创建一个消费者  "", "hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 2 订阅主题 sevenTopic
            ArrayList<String> topics = new ArrayList<>();
            topics.add("sevenTopic");
            kafkaConsumer.subscribe(topics);
    
            // 3 消费数据
            while (true){
    
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }
    }
    

3.3、创建生产者

  • 创建CustomProducer生产者。

    package com.xz.kafka.producer;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;
    
    public class CustomProducerCallback {
    
        public static void main(String[] args) throws InterruptedException {
    
            //1、创建 kafka 生产者的配置对象
            Properties properties = new Properties();
    
            //2、给 kafka 配置对象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            //3、指定对应的key和value的序列化类型 key.serializer value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    
            //4、创建 kafka 生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //5、调用 send 方法,发送消息
            for (int i = 0; i < 200; i++) {
                kafkaProducer.send(new ProducerRecord<>("sevenTopic", "hello kafka" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null){
                            System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                        }
                    }
                });
                Thread.sleep(2);
            }
    
            // 3 关闭资源
            kafkaProducer.close();
        }
    }
    

3.4、测试

  • 首先,在 IDEA中分别启动消费者1、消费者2和消费者3代码
    在这里插入图片描述

  • 然后,在 IDEA中分别启动生产者代码
    在这里插入图片描述

  • 在 IDEA 控制台观察消费者1、消费者2和消费者3控制台接收到的数据,如下图所示:
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

3.5、Sticky分区分配策略代码案例说明

  • 由上述测试输出结果截图可知: 消费者1消费0、2、4分区的数据;消费者2消费1、3分区的数据;消费者3消费5、6分区的数据。
  • 说明:Kafka 采用修改后的Sticky分区分配策略。

四、Sticky分区分配再平衡案例

4.1、停止某一个消费者后,(45s 以内)重新发送消息示例

  • 由下图控制台输出可知:2号消费者 消费到 2号分区数据。
    在这里插入图片描述

  • 由下图控制台输出可知:3号消费者 消费到 0、4号分区数据。
    在这里插入图片描述

4.2、停止某一个消费者后,(45s 以后)重新发送消息示例

  • 由下图控制台输出可知:2号消费者 消费到 1、2、3号分区数据。
    在这里插入图片描述

  • 由下图控制台输出可知:3号消费者 消费到 0、4、5、6号分区数据。
    在这里插入图片描述

4.3、Sticky分区分配再平衡案例说明

  • 1 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别由 2号消费者或者 3号消费者消费。

  • 1号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

  • 消费者 1 已经被踢出消费者组,所以重新按照粘性方式分配。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/996382.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器人抓取检测技术的研究现状

1.分析法 图 1 为分析法在进行抓取检测时所采用的 一般策略[3] [3]Sahbani A, El-Khoury S, Bidaud P. An overview of 3D object grasp synthesis algorithms[J]. Robotics and Autonomous Systems, 2012, 60(3): 326-336. 首先,基于环境限制和机械手与物体模 型进行抓取检…

算法宝典——Java版本(持续更新)

目录 一、链表的算法题&#xff08;目前9道&#xff09; 1. 移除链表元素&#xff08;思路&#xff1a;前后指针&#xff09; 2. 反转一个单链表 &#xff08;思路&#xff1a;头插法&#xff09; 3. 链表的中间结点&#xff08;思路&#xff1a;快慢指针&#xff09; 4. 链…

衷心祝福“好教师”节日快乐

在2023年教师节的今天&#xff0c;本“人民体验官”推广人民日报官方微博文化产品《张桂梅说教师这个职业有苦有幸福》。 图&#xff1a;来源“人民体验官”推广平台 人民微博简述道&#xff1a;“今天&#xff0c;张桂梅祝全国老师们节日快乐。” 张桂梅说&#xff1a;“教师…

图像的几何变换(缩放、平移、旋转)

图像的几何变换 学习目标 掌握图像的缩放、平移、旋转等了解数字图像的仿射变换和透射变换 1 图像的缩放 缩放是对图像的大小进行调整&#xff0c;即 使图像放大或缩小 cv2.resize(src,dsize,fx0,fy0,interpolationcv2.INTER_LINEAR) 参数&#xff1a; src :输入图像dsize…

机构企业学员培训知识付费小程序开源版开发

机构企业学员培训知识付费小程序开源版开发 用户注册与登录&#xff1a;提供用户注册和登录功能&#xff0c;以便用户能够访问和使用小程序。个人信息管理&#xff1a;允许用户管理个人资料&#xff0c;包括修改个人信息和上传个人头像。课程浏览&#xff1a;提供课程列表&…

[技术讨论]讨论问题的两个基本原则——17年前的文字仍然有效

前两天又有人找我讨论问题&#xff0c;而且是他自己的项目&#xff0c;内容与我没有任何关系&#xff0c;他说的&#xff0c;却是讨论。 其实就是想来做咨询&#xff0c;又不想付费。 今天看到了十七年前写的这篇文字&#xff0c;就重发一下了&#xff0c;当然有少量文字修订&a…

【Java并发】聊聊ReentrantReadWriteLock锁降级和StampedLock邮戳锁

面试题 1.你说你用过读写锁&#xff0c;锁饥饿问题是什么&#xff1f; 2.有没有比读写锁更快的锁&#xff1f; 3.StampedLock知道吗?(邮戳锁/票据锁) 4.ReentrantReadWriteLock有锁降级机制策略你知道吗&#xff1f; 在并发编程领域&#xff0c;有多线程进行提升整体性能&…

c++ - 抽象类 和 多态当中一些问题

抽象类 纯虚函数 在虚函数的后面写上 0 &#xff0c;则这个函数为纯虚函数。 class A { public:virtual void func() 0; }; 纯虚函数不需要写函数的定义&#xff0c;他有类似声明一样的结构。 抽象类概念 我们把具有纯虚函数的类&#xff0c;叫做抽象类。 所谓抽象就是&a…

124个Python案例,完整源代码!

大家好&#xff0c;我是涛哥。 很多小伙伴为了掌握爬虫这门技术&#xff0c;投入了大量的时间和精力。他们在深夜里独自码字&#xff0c;他们在周末的时候熟读代码&#xff0c; 但独自学习&#xff0c;没有朋友的陪伴和指导&#xff0c;学习的过程就像是在一条无尽的道路上徘…

【多线程】wait 、notify 和 notifyAll 讲解

wait 、notify 和 notifyAll 讲解 一. wait二. wait 和 sleep 的对比三. notify四. notifyAll五. notify 与 notifyAll 的原理 由于线程之间是抢占式执行的, 因此线程之间执行的先后顺序难以预知. 但是实际开发中有时候我们希望合理的协调多个线程之间的执行先后顺序. 完成这个…

军队状态出现的六种结果,是将帅的过失

军队状态会出现六种坏结果&#xff0c;是将帅的过失 【安志强趣讲《孙子兵法》第35讲】 【原文】 故兵有走者&#xff0c;有弛者&#xff0c;有陷者&#xff0c;有崩者&#xff0c;有乱者&#xff0c;有北者。凡此六者&#xff0c;非天之灾&#xff0c;将之过也。 【趣讲白话】…

【代码分析】初学解惑C++:函数适配器

文章目录 前置知识 运算符的重载“&#xff08;&#xff09;”一、函数适配器是什么&#xff1f;由遇到的问题引出适配器模式类模式对象模式例1例2例3例4二、实现函数适配器1.定义函数2.定义函数适配器3.使用函数适配器 三、带模板的函数适配器1、自定义unary_function2、改写带…

alibaba按关键字搜索商品 API

为了进行电商平台 的API开发&#xff0c;首先我们需要做下面几件事情。 1&#xff09;开发者注册一个账号 2&#xff09;然后为每个alibaba应用注册一个应用程序键&#xff08;App Key) 。 3&#xff09;下载alibaba API的SDK并掌握基本的API基础知识和调用 4&#xff09;利…

vue3中,调接口,渲染数据

1. 封装接口文档 // src/apis/xxx.js中 // 1. 导入 封装的axios实例 import request from /utils/http // 2. 封装接口 --获取轮播图数据 export const getBannerAPI (params {})>{// 传默认参数->&#xff08;传参 默认参数&#xff09;const { distributionSite…

Reids的安装使用

Windows 版本的 Redis 是 Microsoft 的开源部门提供的 Redis. 这个版本的 Redis 适合开发人员学习使用&#xff0c;生产环境中使用 Linux 系统上的 Redis, 这里讲解了这两种的安装和下载。按照你们需要的liunx 或window步骤来 就可以了&#xff08;也可以留言&#xff0c;后面看…

c++ explicit关键作用

explicit 概念引入1.explicit 介绍1.1 显示调用和隐式调用1.2 explicit意义 概念引入 构造函数不仅可以构造并初始化对象&#xff0c;对于具有单个参数或者除第一个参数无默认值其余均有默认值的构造函数&#xff0c;还具有类型转换作用。 而explicit关键字&#xff0c;恰恰可…

useGetState自定义hooks解决useState 异步回调获取不到最新值

setState 的两种传参方式 1、直接传入新值 setState(options); const [state, setState] useState(0); setState(state 1); 2、传入回调函数 setState(callBack); const [state, setState] useState(0); setState((prevState) > prevState 1); // prevState 是改变之…

BUUCTF reverse2 1

使用die查看文件信息&#xff0c;发现是ELF64位程序&#xff0c; 也就是说这是linux上的运行程序 再linux上运行 使用IDA64打开文件 F5 反编译 可以看到这里和flag进行对比 点击flag 点击这个7Bh&#xff0c;然后按r flag出来了 {hacking_for_fun}加上flag头提交 flag{h…

Python入门教程35:使用email模块发送HTML和图片邮件

smtplib模块实现邮件的发送功能&#xff0c;模拟一个stmp客户端&#xff0c;通过与smtp服务器交互来实现邮件发送的功能&#xff0c;可以理解成Foxmail的发邮件功能&#xff0c;在使用之前我们需要准备smtp服务器主机地址、邮箱账号以及密码信息。 #我的Python教程 #官方微信公…

用Canape配置VX1000的工程,在DA中绘制各个传感器目标的方法

参考本文档可帮助读者,快速安装VX1000软件,根据自己的需求,实现传感器目标在canape中DA的绘制。 介绍 Driver assistance (DA)系统是通过各种传感器(如视频、雷达、激光雷达等)获取有关车辆环境的信息。根据传感器对物体检测的结果(例如与前方车辆的距离)对驾驶员发…