大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器

news2025/1/10 10:44:49

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(正在更新…)

章节内容

上节我们完成了如下内容:
现实中业务中我们遇到了分区副本数量想要调整的问题,假设起初我们的分区副本数只有1,想要修改为2、3来保证当部分Kafka的Broker宕机时,仍然可以提供服务给我们,但是不可以用脚本直接修改,所以我们通过JSON+脚本的方式,来达到Kafka副本分区的调整。

  • 启动服务、创建主题、查看主题
  • 修改分区副本因子(不允许)、修改分区副本因子(成功)
  • 查看结果

在这里插入图片描述

分区分配策略

在Kafka中,每个Topic会包含多个分区,默认情况下一个分区只会被一个消费组下面的一个消费者消费,这里就产生了分区分配的问题。
Kafka中提供了多重分区分配算法(PartitionAssignor):

  • RangeAssignor
  • RoundRobinAssignor
  • StickAssignor

在这里插入图片描述

RangeAssignor

  • PartionAssignor 接口用于用户自定义分区分配算法,以实现Consumer之间的分区分配。
  • 消费组的成员定义他们感兴趣的Topic并将这种订阅关系传递给作为订阅组协调者的Broker,协调者选择其中一个消费者来执行这个消费组的分区,并将分配结果转发给消费组内所有的消费者。
  • Kafka默认采用的是RangeAssignor的分配算法。

在这里插入图片描述

  • RangeAssignor对每个Topic进行独立的分区分配,对于每一个Topic,首先对分区按照分区ID进行数值排序,然后订阅这个Topic的消费组的消费者再进行字典排序,之后尽量均衡的将分区分配给消费者,这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,有一些消费者就会多分配到一些分区。
  • RangeAssignor策略的原理是按照消费者总数和分区总数进行整除运行来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能的均匀的分配给所有的消费者。
  • 对于每一个Topic,RangerAssignor策略会将消费组内所有订阅这个Topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够分配均衡,那么字典序靠前的消费者会被多分配一个分区。

在这里插入图片描述

RoundRobinAssignor

  • RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进行排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进行排序分配的)。
  • 如果消费组内,消费订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果尽量均衡。如果订阅的Topic列表是不同的,那么分配结果不保证尽量均衡。

在这里插入图片描述

  • 对于 RangeAssignor,在订阅多个Topic的情况下,RoundRobinAssignor的方式能让消费者之间尽量均衡的分配到分区(分配到的分区的差值不会超过1,而RangeAssignor的分配策略可能随着订阅的Topic越来越多,差值越来越大)
  • 对于消费组内消费者订阅Topic不一致的情况:假设有两个消费者分别为C0和C1,有2个TopicT1、T2,分别有3个分区、2个分区,并且 C0 订阅了T1和T2,那么RoundRobinAssignor的分配结果如下:

在这里插入图片描述

StickyAssignor

尽管 RoundRobinAssignor 已经在 RangeAssignoror 上做了一些优化来更均衡的分配分区,但是在一些情况下依旧会产生严重的分配偏差,比如消费组中订阅的Topic列表不相同的情况下。
更核心的问题是无论是 RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有考虑上一次的分配结果。显然,在执行一次新的分配之前,如果能考虑到上一次的分配结果,尽量少的调整分区分配的变动,显然是能减少很多开销的。
Sticky是“粘性的”,可以理解为分配是带粘性的:

  • 分区的分配尽量的均衡
  • 每一次重分配的结果尽量与上一次分配结果保持一致

当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的,而第二个才是真正体现出 StickyAssignor 特性的。

假设当前有如下内容:

  • 3个Consumer C0、C1、C2
  • 4个Topic:T0、T1、T2、T3 每个Topic有2个分区
  • 所有Consumer都订阅了4个分区

在这里插入图片描述
如果 C1 宕机,此时 StickyAssignor 的结果:
在这里插入图片描述

自定义分区策略

基本概念

需要实现:org.apache.kafka.clients.consumer.internals.PartitionAssignor 接口
其中定义了两个内部类:

  • Subscription:用来表示消费者的订阅信息,类中有两个属性:topics、userData,分别表示消费者所订阅Topic列表和用户自定义信息。
  • PartitionAssignor接口通过subscription()方法来设置消费者自身相关的Subscription信息,注意此方法中只有一个参数Topics,与Subscription类中的topics相互呼应,但是并没有有关userData的参数体现。为了增强用户对分配结果的控制,可以在Subscription()方法内部添加一些影响分配的用户自定义信息赋予userData,比如:权重、IP地址、HOST或者机架
  • Assignment:用来表示分配信息的,类中有两个属性:partitions、userData,分别表示所分配到的分区集合和用户自定义的数据,可以通过PartitonAssignor接口中的onAssignment()方法是在每个消费者收到消费组Leader分配结果时的回调函数,例如在:StickyAssignor策略中就是通过这个方法保存当前的分配方案,以备下次消费组再平衡(Rebalance)时可以提供分配参考依据。

Kafka还提供了一个抽象类:org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以简化 PartitionAssignor 接口的实现,对 assign() 方法进行了实现,其中将Subscription的 userData信息去掉后,在进行分配。

代码实现

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;

import java.nio.ByteBuffer;
import java.util.*;

public class WeightedPartitionAssignor implements ConsumerPartitionAssignor {

    @Override
    public Subscription subscription(Set<String> topics) {
        // 在这里添加权重信息到 userData
        ByteBuffer buffer = ByteBuffer.allocate(4);
        buffer.putInt(getWeight());
        buffer.flip();
        return new Subscription(new ArrayList<>(topics), buffer);
    }

    @Override
    public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
        Map<String, Assignment> assignments = new HashMap<>();
        Map<TopicPartition, List<String>> partitionConsumers = new HashMap<>();

        // 遍历所有订阅的topics
        for (String topic : metadata.topics()) {
            List<TopicPartition> partitions = metadata.partitionsForTopic(topic);
            for (TopicPartition partition : partitions) {
                partitionConsumers.putIfAbsent(partition, new ArrayList<>());
            }
        }

        // 根据权重分配分区
        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            Subscription subscription = subscriptionEntry.getValue();
            int weight = subscription.userData().getInt();

            for (String topic : subscription.topics()) {
                List<TopicPartition> partitions = metadata.partitionsForTopic(topic);
                for (TopicPartition partition : partitions) {
                    List<String> consumers = partitionConsumers.get(partition);
                    for (int i = 0; i < weight; i++) {
                        consumers.add(consumerId);  // 权重高的消费者多次添加,增加选中的机会
                    }
                }
            }
        }

        // 随机分配分区给消费者
        Random random = new Random();
        for (Map.Entry<TopicPartition, List<String>> entry : partitionConsumers.entrySet()) {
            List<String> consumers = entry.getValue();
            String assignedConsumer = consumers.get(random.nextInt(consumers.size()));
            assignments.computeIfAbsent(assignedConsumer, k -> new Assignment(new ArrayList<>()))
                       .partitions().add(entry.getKey());
        }

        return assignments;
    }

    @Override
    public void onAssignment(Assignment assignment, Cluster metadata) {
        // 可以在这里处理分配后的逻辑,比如保存当前分配的快照
    }

    @Override
    public String name() {
        return "weighted";
    }

    private int getWeight() {
        // 获取权重,可以从配置文件或环境变量中获取
        return 10; // 默认权重为10
    }
}

注册使用

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, WeightedPartitionAssignor.class.getName());
// 配置其他消费者属性

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1", "topic2"));

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1987632.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

虚拟机Centos7 minimal版本安装docker

1、在 CentOS 7 上启用 EPEL 软件包存储库&#xff1b; &#xff08;删除epel软件包和其他操作可参考&#xff1a;如何在 CentOS 7 上使用 EPEL (linux-console.net)&#xff09; 1.1&#xff1a; 要安装epel前会报错&#xff0c;如下所示&#xff1a; 先参照这个链接安装&a…

【python】OpenCV—Image Super Resolution

文章目录 1、背景介绍2、准备工作3、EDSR4、ESPCN5、FSRCNN6、LapSRN7、汇总对比8、参考 1、背景介绍 图像超分&#xff0c;即图像超分辨率&#xff08;Image Super Resolution&#xff0c;简称SR&#xff09;&#xff0c;是指由一幅低分辨率图像或图像序列恢复出高分辨率图像…

HTML基础 - HTML5

目录 一. 简介 二. 新增元素 三. 拖放 地理定位 A、HTML5 拖放&#xff08;Drag and Drop&#xff09; B.HTML5 地理定位&#xff08;Geolocation&#xff09; 四. input 五. web存储 webSQL 六. 应用程序缓存 web workers 七. web socket 可以先看上篇HTML基础再来看…

RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别

文章目录 一、kafka和rabbitmq全面对比分析1.1 简介1.2 kafka和rabbitmq全面对比分析1.3 影响因素 二、RabbitMQ、Kafka主要区别2.1 详解/主要区别2.1.1 设计目标和适用场景2.1.2 架构模型方面2.1.3 吞吐量和性能2.1.4 消息存储和持久化2.1.5 消息传递保证2.1.6 集群负载均衡方…

理解二分搜索算法

一.介绍 在本文中&#xff0c;我们将了解二分搜索算法。二分搜索算法是一种在排序数组中查找特定元素的高效方法。它的工作原理是将搜索间隔反复分成两半&#xff0c;从而大大减少了找到所需元素所需的比较次数。该算法的时间复杂度为 O(log n)&#xff0c;因此对于大型数据集…

CLOS架构

CLOS Networking CLOS Networking 是指使用 Clos 网络拓扑结构&#xff08;Clos Network Topology&#xff09;进行网络设计的一种方法。该方法是由贝尔实验室的工程师 Charles Clos 在1950年代提出的&#xff0c;以解决电路交换网络的可扩展性和性能问题。随着现代计算和网络…

SpringBoot基础(一):快速入门

SpringBoot基础系列文章 SpringBoot基础(一)&#xff1a;快速入门 目录 一、SpringBoot简介二、快速入门三、SpringBoot核心组件1、parent1.1、spring-boot-starter-parent1.2、spring-boot-dependencies 2、starter2.1、spring-boot-starter-web2.2、spring-boot-starter2.3、…

YOLOv10改进 | 主干篇 | YOLOv10引入CVPR2023 顶会论文BiFormer用于主干修改

1. 使用之前用于注意力的BiFormer在这里用于主干修改。 YOLOv10改进 | 注意力篇 | YOLOv10引入BiFormer注意力机制 2. 核心代码 from collections import OrderedDict from functools import partial from typing import Optional, Union import torch import torch.nn as n…

C++:vector容器

概览 std::vector是C标准模板库(STL)中的一种动态数组容器。它提供了一种类似于数组的数据结构&#xff0c;但是具有动态大小和更安全的内存管理。 定义和基本特性 std::vector是C标准库中的一 个序列容器&#xff0c;它代表了能够动态改变大小的数组。与普通数组一样&#x…

酒店智能插座在酒店智慧化中的重要性

在当今数字化和智能化的时代&#xff0c;酒店行业也在不断追求创新和提升服务品质&#xff0c;以满足客人日益增长的需求。酒店智能插座作为酒店智慧化的重要组成部分&#xff0c;发挥着不可忽视的作用。 提升客人的便利性&#xff1a; 酒店智能插座能够为客人提供更加便捷的充…

使用 Java Swing 的 IMEI 验证器

一.介绍 本文档介绍如何使用 Java Swing 创建一个简单的 IMEI 验证器应用程序。 二.什么是 IMEI 号码 IMEI 代表国际移动设备识别码。IMEI 用于在移动设备连接到网络时对其进行识别。每个 GSM、CDMA 或卫星移动设备都有唯一的 IMEI 号码。此号码将印在设备电池组件内。用户可…

Flutter GPU 是什么?为什么它对 Flutter 有跨时代的意义?

Flutter 3.24 版本引入了 Flutter GPU 概念的新底层图形 API flutter_gpu &#xff0c;还有 flutter_scene 的 3D 渲染支持库&#xff0c;它们目前都是预览阶段&#xff0c;只能在 main channel 上体验&#xff0c;并且依赖 Impeller 的实现。 Flutter GPU 是 Flutter 内置的底…

Python3 第六十六课 -- CGI编程

目录 一. 什么是 CGI 二. 网页浏览 三. CGI 架构图 四. Web服务器支持及配置 五. 第一个CGI程序 5.1. HTTP 头部 5.2. CGI 环境变量 六. GET和POST方法 6.1. 使用GET方法传输数据 6.1.1. 简单的url实例&#xff1a;GET方法 6.1.2. 简单的表单实例&#xff1a;GET方法…

暑期数据结构 空间复杂度

3&#xff0e;空间复杂度 空间复杂度也是一个数学表达式&#xff0c;是对一个算法在运行过程中临时占用存储空间大小的量度。 空间复杂度不是程序占用了多少bytes的空间&#xff0c;因为这个也没太大意义&#xff0c;所以空间复杂度算的是变量的个数。空间复杂度计算规则基本跟…

SAM2:在图像和视频中分割任何内容

SAM 2: Segment Anything in Images and Videos 一、关键信息 1. SAM 2概述&#xff1a; SAM 2 是一种基础模型&#xff0c;设计用于在图像和视频中实现可提示的视觉分割。该模型采用变压器架构和流式内存进行实时视频处理。它在原始的Segment Anything Model&#xff08;SAM…

自用 K8S 资源对象清单 YAML 配置模板手册-1

Linux 常用资源对象清单配置速查手册-1 文章目录 1、Pod 容器集合2、Pod 的存储卷3、Pod 的容器探针4、ResourceQuota 全局资源配额管理5、PriorityClass 优先级类 管理多个资源对象清单文件常用方法&#xff1a; 使用 sed 流式编辑器批量修改脚本键值进行资源清单的创建&am…

【高中数学/函数/值域】求f(x)=(x^2+1)^0.5/(x-1) 的值域

【问题】 求f(x)(x^21)^0.5/(x-1) 的值域 【来源】 《高中数学解题思维策略》P3 例1-1 杨林军著 天津出版传媒集团出版 【解答】 表达式说明f(x)(x^21)^0.5/(x-1)f(x)((x^21)/(x-1)^2)^0.5准备采用配方法f(x)(12/(x-1)2/(x-1)^2)^0.5(1)式f(x)(2*(1/(x-1)1/2)^21/2)^0.5(2)…

Pytorch系列-张量的类型转换

&#x1f308;个人主页&#xff1a;羽晨同学 &#x1f4ab;个人格言:“成为自己未来的主人~” 张量转换为NumPy数组 使用Tensor.numpy()函数可以将张量转换为ndarray数组 # 1.将张量转换为numpy数组 data_tensortorch.tensor([2,3,4]) # 使用张量对象中的numpy函数进行转…

LiveNVR监控流媒体Onvif/RTSP常见问题-页面上传SSL证书配置开启 HTTPS 服务?什么时候必须要开启HTTPS服务?

LiveNVR常见问题-页面上传SSL证书配置开启 HTTPS 服务&#xff1f;什么时候必须要开启HTTPS服务&#xff1f; 1、配置开启HTTPS1.1、准备https证书1.2、配置HTTPS端口1.3、配置证书路径1.3、 页面上传SSL证书 2、验证HTTPS服务3、为什么要开启HTTPS4、RTSP/HLS/FLV/RTMP拉流Onv…

Vue3+TS+element plus实现一个简单列表页面

期望完成效果 1.创建一个api api内容&#xff1a; 根据接口&#xff1a; 修改 url 和 函数的参数 以及 params里的内容 import { request } from "/utils/service" /** 查 */ export function getDyLogDataApi(page: any, limit: any, campaign_id: any, adgroup_id…