Kafka入门,分区的分配再平衡(二十)

news2025/1/11 9:50:12

分区的分配以及再平衡

在这里插入图片描述

1、kafka有四种主流的分区策略:Range,RoundRobin,Sticky,CooperativeSticky。可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Ranage+CooperativeSticky。Kafka可以同事使用多个分区分配策略。

参数描述
heartbeat.interval.msKafka消费者和coordinator之间的心跳时间,默认3s。该条目的值必须小于session.timeout.ms,也不应该高于session.timeoyt.ms的1/3
session.timeout.msKafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者执行再平衡
max.poll.interval.ms消费者处理消息的最大时长,默认是5分钟,超过该值被移除,消费者执行再平衡
partition.assignment.strategy消费者分区分配策略,默认策略是Range+CooperativeStickt。Kafka可以同事使用多个分区分配策略。可以选择策略包括:Range,RoundRobin,sticky,CooperativeSticky

Range以及再平衡

在这里插入图片描述

Range分区策略原理
Range是对每个topic而言
首先对同一个topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
加入现在有7个分区,3个消费者,排序后的分区将会是0,1,2,3,4,5,6消费者排序完之后将会是C0,C1,C2
通过partitions数/consumer数来决定每个消费者应该消费几个分区,如果除不尽,那么前面几个消费者将会多消费一个分区。
例如。7/3=2余1,除不尽,那么消费者C0便会多消费者1个分区。8/3=2余2,除不尽,那么C0和C1分别多消费一个。
注意:如果只是针对一个topic而言,C0消费者多一个分区影响不是很大,但是如果有N个topic,那么针对每个topic,消费者C0都将多消费一个分区,topic越多,C0消费的分区会比其他消费者明显多消费N个分区
容易尝试数据倾斜
测试代码

package com.longer.range;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * 测试指定分区(partition)
 */
public class Producer {
    public static void main(String[] args) throws InterruptedException {
        //1、创建kafka生产者得配置对象
        Properties properties=new Properties();
        //2、给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //3、key value 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //4、创建kafka生产者对象
        KafkaProducer<String,String> producer=new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 500; i++) {
            //指定数据发送到1号分区,key为空(IDEA中,ctrl+p查看参数)
            producer.send(new ProducerRecord<>("two", "longer " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if(e==null){
                        System.out.println(String.format("主题:%s,分区:%s",metadata.topic(),metadata.partition()));
                        return;
                    }
                    e.printStackTrace();
                }

            });
            Thread.sleep(1000);
        }
        //关闭资源
        producer.close();
    }
}

package com.longer.range;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer1 {
    public static void main(String[] args) {
        //创建消费者的配置对象
        Properties properties=new Properties();
        //2、给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //配置序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注册要消费的主题
        ArrayList<String> topics=new ArrayList<>();
        topics.add("two");
        kafkaConsumer.subscribe(topics);
        while (true){
            //设置1s中消费一批数据
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消费到的数据
            for(ConsumerRecord<String,String> record:consumerRecords){
                System.out.println(record);
            }
        }
    }
}

用一个消费者每一秒发送一条信息,三个消费者接收。观察打印情况。再停止其中一个消费者,再观察情况。
(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
1 号消费者:消费到 3、4 号分区数据。
2 号消费者:消费到 5、6 号分区数据。
0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需
要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
(2)再次重新发送消息观看结果(45s 以后)。
1 号消费者:消费到 0、1、2、3 号分区数据。
2 号消费者:消费到 4、5、6 号分区数据。
说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。

RoundRobin 以及再平衡

在这里插入图片描述
RoundRobin针对集群所有Topic而言
RoundRobin沦陷分区策略,是把所有的partition和所有的consumer都列出来,然后按照hashcode而进行排序,最后通过沦陷算法来分配partition给各个消费者。
修改分区策略

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");

测试代码

public class CustomConsumer1 {
    public static void main(String[] args) {
        //创建消费者的配置对象
        Properties properties=new Properties();
        //2、给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //配置序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //修改分区策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
        //创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注册要消费的主题
        ArrayList<String> topics=new ArrayList<>();
        topics.add("two");
        kafkaConsumer.subscribe(topics);
        while (true){
            //设置1s中消费一批数据
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消费到的数据
            for(ConsumerRecord<String,String> record:consumerRecords){
                System.out.println(record);
            }
        }
    }
}

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
1 号消费者:消费到 2、5 号分区数据
2 号消费者:消费到 4、1 号分区数据
0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、6 和 3 号分区数据,
分别由 1 号消费者或者 2 号消费者消费。
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需
要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
(2)再次重新发送消息观看结果(45s 以后)。
1 号消费者:消费到 0、2、4、6 号分区数据
2 号消费者:消费到 1、3、5 号分区数据
说明:消费者 0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

Sticky 以及再平衡

粘性分区定义:可以理解为分配的结果带有”粘性的“,即再执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配变动,可以节省大量的开销。
粘性分区时Kafka从0.11.x版本开始引入这种分配策略,首先会尽量保持原有分配的分区不变化
测试代码

package com.longer.sticky;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer1 {
    public static void main(String[] args) {
        //创建消费者的配置对象
        Properties properties=new Properties();
        //2、给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //配置序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //修改分区策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
        //创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注册要消费的主题
        ArrayList<String> topics=new ArrayList<>();
        topics.add("two");
        kafkaConsumer.subscribe(topics);
        while (true){
            //设置1s中消费一批数据
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消费到的数据
            for(ConsumerRecord<String,String> record:consumerRecords){
                System.out.println(record);
            }
        }
    }
}

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
1 号消费者:消费到 2、5、3 号分区数据。
2 号消费者:消费到 4、6 号分区数据。
0 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别
由 1 号消费者或者 2 号消费者消费。
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需
要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
(2)再次重新发送消息观看结果(45s 以后)。
1 号消费者:消费到 2、3、5 号分区数据。
2 号消费者:消费到 0、1、4、6 号分区数据。
说明:消费者 0 已经被踢出消费者组,所以重新按照粘性方式分配

总结

range会造成数据倾斜,RoundRobin不会造成,但是分区调整不会考虑最小变动。sticky,尽量少的调整分配变动,可以节省大量的开销。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/725272.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【组合数学 or 枚举】逆序对

C-逆序对_Wannafly挑战赛6 (nowcoder.com) 题意&#xff1a; 思路&#xff1a; 组合数学&#xff0c;无非两种做法&#xff0c;一种是计数DP&#xff0c;另一种是组合数 DP显然不可能&#xff0c;那就组合数 考虑组合数的时候可以把这道题变成一个枚举题 我们去枚举位&…

Android shader编译原理

作者&#xff1a;tmaczhang 1. 什么是着色器编译卡顿&#xff1f; 着色器是在 GPU&#xff08;图形处理单元&#xff09;上运行的代码。当 Flutter 渲染的 Skia 图形后端首次看到新的绘制命令序列时&#xff0c;它有时会生成和编译一个自定义的 GPU 着色器用于该命令序列。使得…

JAVA对象转xml(支持递归生成复杂数据类型)

前言 调用一些soap协议的项目你或许使用的到&#xff0c;也许我是在造轮子&#xff0c;但是我没在网上找到合适的轮子&#xff0c;就根据现有的项目自己造了一个&#xff0c;废话不说&#xff0c;说思路 使用反射获取对象的属性&#xff0c;根据属性的类型做出相应的处理&…

计算机体系结构基础知识介绍之缓存性能的十大进阶优化之编译器控制的预取和利用HBM扩展内存层次(七)

优化九&#xff1a;编译器控制的预取以减少丢失惩罚或丢失率 硬件预取的替代方案是编译器在处理器需要数据之前插入预取指令来请求数据。 预取有两种类型&#xff1a; ■ 寄存器预取将值加载到寄存器中。 ■ 高速缓存预取仅将数据加载到高速缓存。 这两种类型都可以分为有错…

k8s对象操作的了解

一&#xff1a;什么是Kubernetes对象 Kubernetes对象指的是Kubernetes系统的持久化实体&#xff0c;所有这些对象合起来&#xff0c;代表了你集群的实际情况。常规的应用里&#xff0c;我们把应用程序的数据存储在数据库中&#xff0c;Kubernetes将其数据以Kubernetes对象的形…

unity+pico neo3入门教程

安装unity&#xff0c;教程如下&#xff1a;unity2021安装教程 安装pico的SDK:: https://developer-cn.pico-interactive.com/ 有入门教程&#xff1a;导入 SDK - PICO 开发者平台 注册后组织&#xff0c;创建应用learntest&#xff0c;如下 下载SDK。下载最新版&#xff…

旧固态硬盘复制到新固态硬盘多出一个分区怎么办?

朋友电脑只有512G而且只有一个硬盘口&#xff0c;然后就买了一款1T硬盘&#xff0c;去店里回来之后发现多出一个分区&#xff0c;无法直接在系统里合并 这时候我们就需要下载第三方软件&#xff0c;删除多余分区&#xff0c;扩展C盘 软件下载链接&#xff1a; DiskGenius.rar…

Stable Diffusion 用插件管理NNN个模型

当初步涉足 Stable Diffusion&#xff0c;可能会被各种新概念和模型搞得头大。好比我们作为新晋的魔法师&#xff0c;需要理解如何巧妙使用各种法师装备——也就是这些模型&#xff0c;以更好地应对问题&#xff0c;发挥出最大效果。 要了解一个被称为 safetensors 的概念。这…

TRACE请求造成XSS

漏洞描述 远端WWW服务支持TRACE请求。RFC 2616介绍了TRACE请求&#xff0c;该请求典型地用于测试HTTP协议实现。 漏洞危害 攻击者利用TRACE请求&#xff0c;结合其它浏览器端漏洞&#xff0c;有可能进行跨站脚本攻击&#xff0c;获取敏感信息&#xff0c;比如cookie中的认证信…

如何用思维导图规划旅行

又到了一年一度的旅游旺季&#xff0c;旅游是一件令人愉悦又放松的事情。合理的规划旅游不仅可以保证旅行的顺利进行。还可以帮助我们提前了解旅游地的信息&#xff0c;比如当地的民俗、文化、天气等等。特别的正值旅游旺季的时候&#xff0c;一份合理的规划&#xff0c;可以帮…

【IMX6ULL - LOGO替换】uboot阶段替换打印的开发板Logo教程

替换Board: I.MX6U VSTC,将其显示为Board: I.MX6U ALIENTEK (1)修改 uboot/board/freescale/mx6ullevk 路径下的 mx6ullevk.c 文件内容: int checkboard(void) {if (is_mx6ull_9x9_evk())puts(

ARM_串口解析器

include/serial.h #ifndef __UART4_H__ #define __UART4_H__#include "stm32mp1xx_rcc.h" #include "stm32mp1xx_gpio.h" #include "stm32mp1xx_uart.h"//引脚封装 #define GPIO_PIN_0 0 #define GPIO_PIN_1 1 #define GPIO_PIN_2 2 #define GP…

当你知道前后端分离与不分离的6个特点,你就不该再当点工了!

前后端不分离 在早期&#xff0c;Web 应用开发主要采用前后端不分离的方式&#xff0c;它是以后端直接渲染模板完成响应为主的一种开发模式。以前后端不分离方式开发的 Web 应用的架构图如下&#xff1a; 浏览器向服务器发起请求&#xff0c;服务器接收到请求后去数据库中获取…

大数据之数据采集项目延伸——sqoop

承接上篇文章 大数据之数据采集项目总结——hadoop&#xff0c;hive&#xff0c;openresty&#xff0c;frcp&#xff0c;nginx&#xff0c;flume https://blog.csdn.net/qq_43759478/article/details/131520375?spm1001.2014.3001.5501 在上个阶段&#xff1a;完成了数据收集&…

查找满足条件的文件

linux 系统下查询当前多个子目录下满足条件的文件 find ./ -mindepth 2 -name *.png

attention中为啥multi-head输出结果进行concat,得到x,x还要乘上一个WO矩阵?

刚刚在敲vit模型代码&#xff0c;突然一个疑问&#xff0c;就是multi-head输出结果进行concat&#xff0c;得到x&#xff0c;x的维度是预期维度&#xff0c;然后再乘以一个WO矩阵&#xff0c;为啥要乘上一个WO矩阵&#xff0c;x的维度已经是预期的了&#xff1f;&#xff1f;&a…

C#基础学习_类的方法

C#基础学习_类的方法 概念:描述对象的动态特征 类型:实例方法、静态方法等 方法的定义: 访问修饰符(默认为private) 返回值类型 方法名(类型 参数1,类型 参数2,...) {//这里编写方法的主体(功能实现的具体过程)return 返回值; //若没有返回值,则不需要写该语句 }

2023年第四届“华数杯”全国大学生数学建模竞赛(附历年赛题和论文)

目录 华数杯简介大赛资料获取方式 华数杯简介 国赛前的预热”华数杯“第四届正在报名中&#xff0c;看到咨询我们的同学不少&#xff0c;挺多同学都非常感兴趣&#xff0c;但是又不清楚比赛的相关情况&#xff0c;这里将会给同学们一一答疑。 比赛难度&#xff1a;难度适中&am…

【面试常见】JS继承与原型、原型链

前后端面试题库 &#xff08;面试必备&#xff09; 推荐&#xff1a;★★★★★ 地址&#xff1a;前端面试题库 web前端面试题库 VS java后端面试题库大全 在 JavaScript 中&#xff0c;继承是实现代码复用和构建对象关系的重要概念。本文将讨论原型链继承、构造函数继承以及…

QML Canvas 制作动画

作者: 一去、二三里 个人微信号: iwaleon 微信公众号: 高效程序员 终于要介绍动画了,这意味着我们快要把 Canvas 学完了,所以是时候庆祝一下了… 要在 Canvas 上实现动画,需要间隔一定的时间重绘动画的下一帧,而且频率要足够快,这样才能在图像切换时看起来像动画一样。…