Kafka生产者的粘性分区算法

news2025/1/22 17:59:42

分区算法分类

kafka在生产者投递消息时,会根据是否有key采取不用策略来获取分区。

存在key时会根据key计算一个hash值,然后采用hash%分区数的方式获取对应的分区。

而不存在key时采用随机算法选取分区,然后将所有的消息封装到这个batch上直到达到限定数量,然后才发送出去。

如下图,6条消息采用key可能分三次发送到三个不同的分区,需要3次网络请求。如果没有key将封住成一个批次发送。这样一次网路请求就可以发送多条消息,大大提高了效率。

源码分析

producer根据keyBytes是否有值采用不同的分区策略。有key的计算hash % numPartitions得到分区。

 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                         int numPartitions) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        }
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

 并且kafka在这里做了缓存,如果第一次获取到了粘性分区后面会缓存起来。 

 public int partition(String topic, Cluster cluster) {
        Integer part = indexCache.get(topic);
        if (part == null) {
            return nextPartition(topic, cluster, -1);
        }
        return part;
    }

没有key的采用stickyPartitionCache的策略,这里是分区算法的主要代码。获取所有的availablePartitions,然后如果availablePartitions大于1,获取一个随机数random,然后通过random % availablePartitions.size()的方式获取分区。

      List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() < 1) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = random % partitions.size();
            } else if (availablePartitions.size() == 1) {
                newPart = availablePartitions.get(0).partition();
            } else {
                while (newPart == null || newPart.equals(oldPart)) {
                    int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                }
            }

abortForNewBatch表示需要发送到新的批次,然后调用onNewBatch获取新的分区。

      if (result.abortForNewBatch) {
                int prevPartition = partition;
                partitioner.onNewBatch(record.topic(), cluster, prevPartition);
                partition = partition(record, serializedKey, serializedValue, cluster);
                tp = new TopicPartition(record.topic(), partition);

                ...



public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }

在下一个批次发送时会检测是否和上一个分区相同,如果相同将会缓存一个新的分区。

        // Check that the current sticky partition for the topic is either not set or that the partition that 
        // triggered the new batch matches the sticky partition that needs to be changed.
        if (oldPart == null || oldPart == prevPartition) {

总结

为了提升kafka发送消息的速率,在对消息顺序没有特殊的要求情况下,应该尽量避免设置消息的key,这样可以提交发送消息的吞吐量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/390159.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023/3/5 Vue学习笔记 - 生命周期函数探究-2

1 beforeCreated 在组件实例初始化完成之后立即调用。会在实例初始化完成、props 解析之后、data() 和 computed 等选项处理之前立即调用。 组件的组件实例初始化动作&#xff1a;初始化一个空的Vue实例对象&#xff0c;此时&#xff0c;这个对象身上只有一个默认的声明周期函…

Eureka注册中心快速入门

一、提供者与消费者**服务提供者&#xff1a;**一次业务中&#xff0c;被其他微服务调用的服务。&#xff08;提供接口给其他微服务&#xff09;**服务消费者&#xff1a;**一次业务中&#xff0c;调用其他微服务的服务。&#xff08;调用其它微服务提供的接口&#xff09;比如…

如何分辨on-policy和off-policy

on-policy的定义&#xff1a;behavior policy和target-policy相同的是on-policy&#xff0c;不同的是off-policy。 behavior policy&#xff1a;采样数据的策略&#xff0c;影响的是采样出来s,a的分布。 target policy&#xff1a;就是被不断迭代修改的策略。 如果是基于深度…

JavaSE学习笔记总结day18(完结!!!)

今日内容 零、 复习昨日 一、作业 二、进程与线程 三、创建线程 四、线程的API 五、线程状态 六、线程同步 零、 复习昨日 晨考 一、作业 见答案 二、进程与线程[了解] 一个进程就是一个应用程序,进程包含线程 一个进程至少包含一个线程,大部分都是有多条线程在执行任务(多线…

Win系统蓝牙设备频繁卡顿/断连 - 解决方案

Win系统蓝牙设备频繁卡顿/断连 - 解决方案前言常见网卡Intel无线网卡&#xff08;推荐&#xff09;Realtek无线网卡总结查看本机网卡解决方案更新驱动更换网卡&#xff08;推荐&#xff09;前言 无线网卡有2个模块&#xff0c;一个是WiFi&#xff0c;一个是蓝牙&#xff0c;因…

Kubernetes之存储管理(下)

动态卷供应 上篇文章讲述的持久性存储&#xff0c;是先创建pv&#xff0c;然后才能创建pvc。如果不同的命名空间里同时要创建不同的pvc&#xff0c;那么就需要提前创建好pv&#xff0c;这样才能为pvc提供存储。但是这种方式太过繁琐&#xff0c;可以使用storageClass&#xff…

yolov5算法,训练模型,模型检测

嘟嘟嘟嘟&#xff01;工作需要&#xff0c;所以学习了下yolov5算法。是干什么的呢&#xff1f; 通俗来说&#xff0c;可以将它看做是一个小孩儿&#xff0c;通过成年人&#xff08;开发人员&#xff09;提供的大量图片的学习&#xff0c;让自己知道我看到的哪些场景需要提醒给成…

MySQL底层存储B-Tree和B+Tree原理分析

1.B-Tree的原理分析 &#xff08;1&#xff09;什么是B-Tree B-树&#xff0c;全称是 Balanced Tree&#xff0c;是一种多路平衡查找树。 一个节点包括多个key (数量看业务)&#xff0c;具有M阶的B树&#xff0c;每个节点最多有M-1个Key。 节点的key元素个数就是指这个节点能…

Andorid:关于Binder几个面试问题

1.简单介绍下binderbinder是一种进程间通讯的机制进程间通讯需要了解用户空间和内核空间每个进程拥有自己的独立虚拟机&#xff0c;系统为他们分配的地址空间都是互相隔离的。如两个进程需要进行通讯&#xff0c;则需要使用到内核空间做载体&#xff0c;内核空间是所有进程共享…

FL2440(S3C2440A 芯片) 开发板开发笔记

FL2440(S3C2440A 芯片) 开发板开发笔记 开发板的拨码开关指南&#xff1a; FL2440 改 vnfg 飞凌嵌入式 www. witech. com. cn 09. 8. 22 1 开发板使用手册 version4. 0 FL2440 保定飞凌嵌入式技术有限公司 网站&#xff1a;http: //www. witech. com. cn http: //www. he…

动态规划之买卖股票问题

&#x1f308;&#x1f308;&#x1f604;&#x1f604; 欢迎来到茶色岛独家岛屿&#xff0c;本期将为大家揭晓动态规划之买卖股票问题 &#xff0c;做好准备了么&#xff0c;那么开始吧。 &#x1f332;&#x1f332;&#x1f434;&#x1f434; 动态规划算法本质上就是穷举…

synchronized底层

Monitor概念一、Java对象头二、Monitor2.1、Monitor—工作原理2.2、Monitor工作原理—字节码角度2.2、synchronized进阶原理&#xff08;优化&#xff09;2.3、synchronized优化原理——轻量级锁2.4、synchronized优化原理——锁膨胀2.5、synchronized优化原理——自旋优化2.6、…

VUE3-Cesium(加载GeoJSON数据)

目录 一、准备工作 1、新建vue项目 解决报错&#xff1a;使用nvm后找不到vue -V找不到版本 2、安装Cesium插件 3、安装 Element Plus、unplugin-vue-components 和 unplugin-auto-import 4、按需自动导入element-plus 测试按需自动导入element-plus是否配置成功 二、项…

2023年软考中级电子商务设计师考什么?

首先&#xff0c;电子商务设计师属于软考中级&#xff0c;因此难度也不是特别大。但并不是说就完全没有难度&#xff0c;难度还是有的&#xff0c;像上午题一般把基本知识点掌握了&#xff0c;是没什么问题的&#xff0c;重点就在于下午题会比较难。 接下来我们来剖析一下考试…

408考研计算机之计算机组成与设计——知识点及其做题经验篇目1:RAM与ROM

目录 一、RAM 1、特点 2、分类 ①SRAM ②DRAM 二、ROM 1、特点 2、分类 可能是小编的学习风格和大家的不一样&#xff0c;小编喜欢由难到易的学习风格&#xff0c;所以在408计算机考研的四门课中&#xff0c;先选择了最难的计算机组成与设计进行学习。近…

【C++】初识类和对象

&#x1f3d6;️作者&#xff1a;malloc不出对象 ⛺专栏&#xff1a;C的学习之路 &#x1f466;个人简介&#xff1a;一名双非本科院校大二在读的科班编程菜鸟&#xff0c;努力编程只为赶上各位大佬的步伐&#x1f648;&#x1f648; 目录前言一、面向过程和面向对象初步认识二…

Spring核心模块—— BeanFactoryPostProcessorBeanPostProcessor(后处理器)

后置处理器前言Spring的后处理器BeanFactoryPostProcessor&#xff08;工厂后处理器&#xff09;执行节点作用基本信息经典场景子接口——BeanDefinitiRegistryPostProcessor基本介绍用途具体原理例子——注册BeanDefinition使用Spring的BeanFactoryPostProcessor扩展点完成自定…

Linux安装minio单机版

说明&#xff1a;因为前面记录了一下minio的使用&#xff0c;这里说一下minio的安装&#xff0c;只是单机版哦 环境准备&#xff1a;Linux系统 说明图&#xff1a; 1.创建文件夹命令 我的是安装在/usr/local 文件夹下面的创建文件夹命令 #进入目标文件夹 cd /usr/local#创建…

5 GateWay断言Predicate

每一个Predicate的使用&#xff0c;可以理解为&#xff1a;当满足条件后才会进行转发&#xff0c;如果十多个&#xff0c;那就是满足所有条件才会转发 断言种类 After&#xff1a;匹配在指定日期时间之后发生的请求。Before&#xff1a;匹配在指定日期之前发生的请求。Betwee…

常见摄像头接口USB、DVP、MIPI接口的对比

常见摄像头接口DVP、MIPI、USB的比较 引言 摄像头传感器已经广泛用于嵌入式设备了&#xff0c;现在的手机很多都支持多个摄像头。 在物联网领域&#xff0c;摄像头传感器也越来越被广泛使用。今天就来简单聊一聊几种常见的摄像头接口。 传感器与主控设备进行通信&#xff0…