详解Kafka分区机制原理|Kafka 系列 二

news2024/11/25 12:22:25

Kafka 系列第二篇,详解分区机制原理。为了不错过更新,请大家将本号“设为星标”。

点击上方“后端开发技术”,选择“设为星标” ,优质资源及时送达

上一篇文章介绍了 Kafka 的基本概念和术语,里面有个概念是 分区(Partition)。

kafka 将 一个Topic 中的消息分成多份,分别存储在不同的 Broker 里,这每一段消息被 kafka 称为分区,其中每条消息只会保存在一个分区中。

如果不太理解请回顾上一篇:

5b2ad62e786f5c79be317c3bd9772c58.jpeg

开始学习 Kafka,一文掌握基本概念|Kafka 系列 一

 

为什么有分区?

为什么要有分区呢?

Kafka 的分区机制的本质就是将一个大的 Topic 进行拆分,将一组很大的队列拆分成了多组队列。这样做有以下几个好处:

  1. 因为一个 Topic 中的消息可能非常多,多到一台Broker存不下,因此需要拆分成多段存储在不同的机器里,实现负载均衡。

  2. 拆分成多个队列,可以在多个生产者和消费者的情况下发挥多机性能,可以分流和并行处理消息,从而提高读写性能,提升系统的吞吐力。

  3. 有利于系统扩缩容,提高系统的可扩展性。不同分区在不同的broker上,可以通过增加新机器提高吞吐,并且增加新机器的时候可以通过调整分区的分布来调配负载。

0de38ad040a27e5ddabd8449fac6d55e.png

但是分区数不是越多越好,需要根据系统具体情况来设置。比如3个Broker就应该至少有3个分区,如果broker性能之间有差异,可以调大分区数进行调配。也可以通过broker的倍数来设置分区数,并且进行性能压测,测试集群的吞吐量。

分区数过多会带来资源管理上的消耗,清除日志时间变长,集群broker故障后分区leader重选时间变长,客户端消费端线程数需求增加,甚至导致连接所需的socket消耗增加。

分区策略

分区策略就是决定生产者将会把消息发送到具体哪个分区的算法,分区策略由 Partitioner 接口实现。

自定义分区策略

用于分区的 partition 方法定义如下:

/**
     * Compute the partition for the given record.
     *
     * @param topic topic名 The topic name
     * @param key 用于分区的key The key to partition on (or null if no key)
     * @param keyBytes 用于分区的序列号key The serialized key to partition on( or null if no key)
     * @param value 用于分区的值 The value to partition on or null
     * @param valueBytes 用于分区的序列号值 The serialized value to partition on or null
     * @param cluster 当前集群元数据 The current cluster metadata
     */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

可以看出,这里提供了 Topic 和一些跟消息有关的key参数,cluster 是集群信息,包含Kafka 当前的Node 数据以及Topic、partition数据等。有了这些数据,具体拿到一条消息该发往哪个分区,我们就可以根据已有信息制定自己的分区策略。

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

我们实现了自定义的 Partition 类之后,就可以设置 partitioner.class 为目标策略类,Producer 就会按照我们的自定义策略来对消息进行分区。

默认分区策略

Kafka 提供了默认分区策略 DefaultPartitioner,策略内容如下:

  1. 如果在消息中指定了分区,优先使用指定的分区。

  2. 如果没有指定分区,但存在分区键,则根据序列化key使用murmur2哈希算法对分区数取模。

  3. 如果没有指定分区或分区键,则会使用粘性分区策略。(关于粘性分区策略后面讲解)

在实际生产中,我们一般都默认使用此策略,无需修改。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                         int numPartitions) {
    if (keyBytes == null) {
        return stickyPartitionCache.partition(topic, cluster);
    }
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

注意,这里指的分区键是序列化后的key,也就是变量 keyBytes,其他key、value、valueBytes 并没用到。

byte[] keyBytes = keySerializer.serialize(topic, record.headers(), record.key());
default byte[] serialize(String topic, Headers headers, T data) {
  // data 变量
    return serialize(topic, data);
}

看到 key 等序列化方法我们可以明白,key 的序列号值只受到 record.key() 的影响,所以同样的key会被固定分配到同样的partition中。(注意这里的key是指用于分区的key,而不是topic)

e16317cdfef524b9779fd048bbc2e591.png

粘性分区策略

实现类为 UniformStickyPartitioner ,他与默认分区策略的区别是:

  • DefaultPartitionerd 默认分区策略:如果有分区键的话,会按照分区键来决定分区,这个时候并不会使用粘性分区策略。

  • UniformStickyPartitioner粘性分区策略:无论有没有分区键,都用粘性分区来分配。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    return stickyPartitionCache.partition(topic, cluster);
}

什么是粘性分区策略?

我们需要知道,在Producer在发送消息的时候,会将消息放到一个ProducerBatch中, 然后多条消息批量发送。这样可以减少网络请求次数,提高消息的发送效率。

所以批量发送消息有两个条件:

  1. 一个batch满了,与 batch.size有关,一般大小是16k。

  2. linger.ms时间到了。

满足任意一个条件,都会触发sender线程的发送。如果生产的消息较少,batch没有满,就必须等到等待时间到了,这就导致了较长的延迟。

因为ProducerBatch是多个,为了让消息尽可能快的发送,就需要让其中一个ProducerBatch先变满。

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

注意:一个分区对应一个双端队列Deque<ProducerBatch>>

粘性分区策略就是在相同的分区中,优先填满一个ProducerBatch,发送,再去填充另一个ProducerBatch。参见下图,第一个分区会被优先塞满并发送。

fb4f66d75ce57e9231f4ea8ed6c56c88.png

在一个 ProducerBatch 发送结束,选择新分区的时候,是随机选择的,之后便会继续优先填满新的分区。

  • 可用分区<1 ,所有分区中随机选择。

  • 可用分区=1,选择这个分区。

  • 可用分区>1,所有可用分区中随机选择。

public int nextPartition(String topic, Cluster cluster, int prevPartition) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        Integer oldPart = indexCache.get(topic);
        Integer newPart = oldPart;
        // Check that the current sticky partition for the topic is either not set or that the partition that 
        // triggered the new batch matches the sticky partition that needs to be changed.
        if (oldPart == null || oldPart == prevPartition) {
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() < 1) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = random % partitions.size();
            } else if (availablePartitions.size() == 1) {
                newPart = availablePartitions.get(0).partition();
            } else {
                while (newPart == null || newPart.equals(oldPart)) {
                    int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                }
            }
            // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
            if (oldPart == null) {
                indexCache.putIfAbsent(topic, newPart);
            } else {
                indexCache.replace(topic, prevPartition, newPart);
            }
            return indexCache.get(topic);
        }
        return indexCache.get(topic);
    }

轮询分区策略

Kafka 中提供了轮训策略的实现 RoundRobinPartitioner。当用户希望将写操作均匀地分发到所有分区时,可以使用此分区策略。

举例,有三个分区,针对于同一个producer,第一条消息发送到partition1,第二条消息发送到partition2,第三条发送到partition3,以此类推。

7372d4c65e8afd01c61335a18caddd6b.png
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    // 分区数
    int numPartitions = partitions.size();
    // 下一个自增值
    int nextValue = nextValue(topic);
    // 获取此主题的可用分区列表
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (!availablePartitions.isEmpty()) {
        // topic可用分区不为空,取余
        int part = Utils.toPositive(nextValue) % availablePartitions.size();
        return availablePartitions.get(part).partition();
    } else {
        // 没有可用的分区,给出一个不可用的分区
        // no partitions are available, give a non-available partition
        return Utils.toPositive(nextValue) % numPartitions;
    }
}

hash 键的值并不会影响到数据的分布,这应该是数据均匀度最好的策略,可以保证消息最大程度的平均分配到所有分区。

除了官方提供的策略,我们还可以实现自己的分区策略,比如随机策略,实现起来也很简单;比如按照业务键去分区的策略;比如按照ip分区的策略等。

最后,欢迎大家提问和交流。

加入讨论群是升职加薪第一步!

回复:加群

9cc714a9272b10f02503d017f1df6ee5.jpeg

点赞是一种美德,如对您有帮助,欢迎评论和分享,感谢阅读!

实战总结|记一次消息队列堆积的问题排查

2023-07-18

445cd86f0993eb1268b6ff21b1866d61.jpeg

从二叉查找树到B*树,一文搞懂搜索树的演进!|原创

2023-05-23

69792976b8a1929fd83b907b1d130d17.jpeg

CAP、BASE理论真的很重要!|分布式事务系列(一)

2023-05-06

7a8d17512f5d9cd2d3cde5fa8560fac0.jpeg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/845095.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

音视频技术开发周刊 | 305

每周一期&#xff0c;纵览音视频技术领域的干货。 新闻投稿&#xff1a;contributelivevideostack.com。 大神回归学界&#xff1a;何恺明宣布加入 MIT 「作为一位 FAIR 研究科学家&#xff0c;我将于 2024 年加入麻省理工学院&#xff08;MIT&#xff09;电气工程与计算机科学…

LeetCode 130. 被围绕的区域

题目链接&#xff1a;130. 被围绕的区域 题目描述 给你一个 m x n 的矩阵 board &#xff0c;由若干字符 ‘X’ 和 ‘O’ &#xff0c;找到所有被 ‘X’ 围绕的区域&#xff0c;并将这些区域里所有的 ‘O’ 用 ‘X’ 填充。 示例1&#xff1a; 输入&#xff1a;board [[“…

论文阅读 RRNet: A Hybrid Detector for Object Detection in Drone-captured Images

文章目录 RRNet: A Hybrid Detector for Object Detection in Drone-captured ImagesAbstract1. Introduction2. Related work3. AdaResampling4. Re-Regression Net4.1. Coarse detector4.2. Re-Regression 5. Experiments5.1. Data augmentation5.2. Network details5.3. Tra…

线性代数(二) 矩阵及其运算

前言 行列式det(A) 其实表示的只是一个值 ∣ a b c d ∣ a d − b c \begin{vmatrix} a & b\\ c & d\end{vmatrix} ad -bc ​ac​bd​ ​ad−bc&#xff0c;其基本变化是基于这个值是不变。而矩阵表示的是一个数表。 定义 矩阵与线性变换的关系 即得 ( a 11 a 12…

(el-switch)操作:Element-plus 中 Switch 将默认值修改为 “true“ 与 “false“(字符串)来控制开关

Ⅰ、Element-plus 提供的 Switch 开关组件与想要目标情况的对比&#xff1a; 1、Element-plus 提供 Switch 组件情况&#xff1a; 其一、Element-ui 自提供的 Switch 代码情况为(示例的代码)&#xff1a; // Element-plus 自提供的代码&#xff1a; // 此时是使用了 ts 语言环…

英语使用场景口语

HOTEL ENGLISH hotel motel inn b&b Process 1.booking a room can i reserve a room? reservation do you have and singles? double room standard room deluxe room presidential suite do you have a pick-up service? 2.checking in where is the recept…

C++物理引擎Box2D的下载,编译,VS2013配置环境

文章目录 网站和下载地址编译工具:编译box2dhelloworld测试网站和下载地址 https://box2d.org/ 下载地址 https://hub.nuaa.cf/erincatto/box2d/tags 编译工具: 1.VS2013 2.cmake 下载地址 https://cmake.org/ 编译box2d 下载box2d源码2.4.0,解压。在box2d-2.4.0目录下…

手机便签内容不见了怎么恢复正常?

在日常生活和工作中&#xff0c;很多人都需要随手记录事情&#xff0c;例如家庭琐事、孩子相关的事情、指定时间需要完成的工作任务、会议安排等。当我们需要随时随地记录事情的时候&#xff0c;手机便签应用就是非常不多的选择&#xff0c;我们直接打开手机上的便签APP就可以新…

浏览器无法连接网络问题

问题描述 电脑其他程序都能正常联网&#xff0c;但是所有的浏览器都无法联网&#xff0c;同时外部网站都能ping通 问题诊断 查看电脑Internet连接的问题报告显示&#xff1a;该设备或资源(Web 代理)未设置为接受端口"7890"上的连接。 解决方案 经过检查发现不是IP地址…

QT自带PDF库的使用

QT自带PDF库可以方便的打开PDF文件&#xff0c;并将文件解析为QImage&#xff0c;相比网上提供的开源库&#xff0c;QT自带PDF库使用更方便&#xff0c;也更加可靠&#xff0c;然而&#xff0c;QT自带PDF库的使用却不同于其他通用库的使用&#xff0c;具备一定的技巧。 1. 安装…

Namecheap 便宜域名注册使用,直接购买

FREENOM免费域名不能注册了&#xff0c;现在只能自己动手注册便宜的域名&#xff0c;前面我们也记录了不能注册FREENOM免费域名不能注册怎么办&#xff0c;不能注册FREENOM免费域名&#xff0c;怎么办&#xff0c;这里是解决方案&#xff01; 注册6元域名。 现在我们又多了一个…

机器学习深度学习——文本预处理

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位即将上大四&#xff0c;正专攻机器学习的保研er &#x1f30c;上期文章&#xff1a;机器学习&&深度学习——序列模型&#xff08;NLP启动&#xff01;&#xff09; &#x1f4da;订阅专栏&#xff1a;机器学习&am…

Blazor前后端框架Known-V1.2.11

V1.2.11 Known是基于C#和Blazor开发的前后端分离快速开发框架&#xff0c;开箱即用&#xff0c;跨平台&#xff0c;一处代码&#xff0c;多处运行。 Gitee&#xff1a; https://gitee.com/known/KnownGithub&#xff1a;https://github.com/known/Known 概述 基于C#和Blazo…

运维作业—5

一.基于 CentOS 7 构建 LVS-DR 群集 1.配置LVS 2.第一台real server&#xff08;192.168.100.139:80&#xff09; 手工在RS端绑定VIP 手工在RS端抑制ARP响应 3.第二台real server&#xff08;192.168.100.140:80&#xff09; 安装arptables并启动 使用arptables实现抑制 测试…

网卡内部的 DMA

前言 MCU、SOC 内部通常带有 DMA 控制器&#xff0c;要想使用 DMA 通常需要如下操作 选择通道配置传输方向&#xff08;内存到外设、内存到内存、外设到内存&#xff09;设置源地址、目的地址&#xff08;内存地址、外设地址&#xff09;设置源地址、目的地址是否自增设置位宽…

【Spring Cloud 】基于微服务架构的智慧工地监管平台源码带APP

智慧工地监管平台是一种利用物联网、云计算、大数据等技术手段实现工地信息化管理的解决方案。它通过数据采集、分析和应用&#xff0c;在实时监控、风险预警、资源调度等方面为工地管理者提供了全方位的支持&#xff0c;提高了工地管理的效率和质量。智慧监管平台还基于“云端…

Detector定位算法在FPGA中的实现——section1 原理推导

关于算法在FPGA中的实现&#xff0c;本次利用业余的时间推出一个系列章节&#xff0c;专门记录从算法的推导、Matlab的实现、FPGA的移植开发与仿真做一次完整的FPGA算法开发&#xff0c;在此做一下相关的记录和总结&#xff0c;做到温故知新。 这里以Detector在Global Coordina…

C++11 新特性 ---- 模板的优化

C11 模板机制:① 函数模板② 类模板模板的使用&#xff1a;① 范围&#xff1a;模板的声明或定义只能在全局或类范围进行&#xff0c;不可以在局部范围&#xff08;如函数&#xff09;② 目的&#xff1a;为了能够编写与类型无关的代码函数模板&#xff1a;- 格式&#xff1a;t…

Glass指纹识别工具,多线程Web指纹识别工具-Chunsou

Glass指纹识别工具&#xff0c;多线程Web指纹识别工具-Chunsou。 Glass指纹识别工具 Glass一款针对资产列表的快速指纹识别工具&#xff0c;通过调用Fofa/ZoomEye/Shodan/360等api接口快速查询资产信息并识别重点资产的指纹&#xff0c;也可针对IP/IP段或资产列表进行快速的指…

chrome插件开发实例05-拦截页面请求

目录 功能 演示 源代码下载 manifest.json devtools.html devtools.js background.js 功能 拦截任意打开页面的请求信息&#xff0c;包括&#xff1a;URL&#xff0c;参数&#xff0c;请求方式method, 返回status&#xff0c;返回体大小&#xff0c;返回原始内容 演示 源…