Kafka Producer发送消息流程之分区器和数据收集器

news2024/9/21 8:04:31

文章目录

  • 1. Partitioner分区器
  • 2. 自定义分区器
  • 3. RecordAccumulator数据收集器

1. Partitioner分区器

在这里插入图片描述

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java,中doSend方法,记录了生产者将消息发送的流程,其中有一步就是计算当前消息应该发送往对应Topic哪一个分区,

int partition = partition(record, serializedKey, serializedValue, cluster);

private final Partitioner partitioner;

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        //当record的分区已存在,则直接返回,这对应了创建Record时可以手动传入partition参数
        if (record.partition() != null)
            return record.partition();

        // 如果存在partitioner分区器,则使用Partitioner.partition方法计算分区数据
        if (partitioner != null) {
            int customPartition = partitioner.partition(
                record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
            if (customPartition < 0) {
                throw new IllegalArgumentException(String.format(
                    "The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
            }
            return customPartition;
        }


        // 如果没有分区器的情况
        if (serializedKey != null && !partitionerIgnoreKeys) {
            // hash the keyBytes to choose a partition
            return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
        } else {
            return RecordMetadata.UNKNOWN_PARTITION;
        }
    }


// 利用键的哈希值来选择分区
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
    }

2. 自定义分区器

新建类实现Partitioner接口,key是字符串数字,奇数送到分区0,偶数送到分区1 。

public class MyKafkaPartitioner implements Partitioner {
    @Override
    public int partition(String s, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        // Ensure the key is a non-null string
        if (key == null || !(key instanceof String)) {
            throw new IllegalArgumentException("Key must be a non-null String");
        }

        // Parse the key as an integer
        int keyInt;
        try {
            keyInt = Integer.parseInt((String) key);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Key must be a numeric string", e);
        }

        // Determine the partition based on the key's odd/even nature
        if (keyInt % 2 == 0) {
            return 1; // Even keys go to partition 2
        } else {
            return 0; // Odd keys go to partition 0
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

新建一个存在多分区的Topic。

在这里插入图片描述

public class KafkaProducerPartitionorTest {
    public static void main(String[] args) throws InterruptedException {
        //创建producer
        HashMap<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //指定拦截器
        config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptorTest.class.getName());
        //指定分区器
        config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyKafkaPartitioner.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

        for (int i = 0; i < 10; i++) {
            //创建record
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                    "test1",
                    "key"+i,
                    "我是你爹"+i
            );
            //发送record
            producer.send(record);
            Thread.sleep(500);
        }

        //关闭producer
        producer.close();
    }
}

配置好PARTITIONER_CLASS_CONFIG后发送消息。
在这里插入图片描述
在这里插入图片描述

可以分区器成功起作用了。

3. RecordAccumulator数据收集器

通过数据校验后,数据从分区器来到数据收集器

数据收集器的工作机制

  1. 队列缓存RecordAccumulator为每个分区维护一个队列。默认情况下,每个队列的批次大小(buffer size)是16KB,这个大小可以通过配置参数batch.size来调整。

  2. 缓冲区管理

    • 每个分区都有一个或多个批次,每个批次包含多条消息。
    • 当一个批次填满(即达到batch.size),或者达到发送条件(如linger.ms时间窗口,即发送消息前等待的时间)时,批次会被标记为可发送状态,并被传递给Sender线程。
  3. 满批次处理

    • 当某个分区的队列中的某个批次大小超过了16KB(默认值)或满足linger.ms的时间条件,RecordAccumulator会将该批次加入到一个待发送的队列中。
    • Sender线程会从待发送队列中获取这些满批次并将其发送到Kafka集群。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1931326.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Mongodb数组字段索引之多键索引

学习mongodb&#xff0c;体会mongodb的每一个使用细节&#xff0c;欢迎阅读威赞的文章。这是威赞发布的第92篇mongodb技术文章&#xff0c;欢迎浏览本专栏威赞发布的其他文章。如果您认为我的文章对您有帮助或者解决您的问题&#xff0c;欢迎在文章下面点个赞&#xff0c;或者关…

跨境电商小白0-1教程,跨境电商新手开店教程

跨境电商新纪元&#xff0c;新手开店秘籍大公开&#xff01;&#x1f680; 还在为跨境电商的浩瀚海洋感到迷茫&#xff1f;别怕&#xff0c;从0到1的开店之旅&#xff0c;我们为你精心铺设了每一步&#xff01;&#x1f463; 无论你是完全的新手跨境小白&#xff0c;还是对未来…

【雷丰阳-谷粒商城 】【分布式高级篇-微服务架构篇】【23】【订单服务】

持续学习&持续更新中… 守破离 【雷丰阳-谷粒商城 】【分布式高级篇-微服务架构篇】【23】【订单服务】 订单中心订单信息用户信息订单基础信息商品信息优惠信息支付信息物流信息 订单状态订单流程订单创建与支付逆向流程 订单确认页Feign远程调用丢失请求头问题Feign异步…

Qt第十一章 其他控件

其他控件 文章目录 其他控件按钮组项目小部件输入控件显示控件容器 按钮组 命令链接按钮 对话框按钮盒子 添加基础按钮 改变排列方向 项目小部件 列表控件List Widget 也可以通过代码添加 // 添加ui->listWidget->addItem("你好啊");ui->listWidge…

数据链路层重点协议

目录 一、以太网 二、MTU 1、MTU对IP协议的影响 2、MTU对UDP的影响 3、MTU对TCP协议的影响 三、ARP协议 1、作用&#xff1a;建立主机IP地址和MAC地址的映射关系 2、工作流程 一、以太网 以太网不是一种具体的网络&#xff0c;而是一种技术标准。既包含了数据链路层的…

数据库第9

安装redis&#xff0c;启动客户端、验证 C:\Windows\System32>redis-cli string类型数据的命令操作&#xff1a; 设置键值 set k1 12 读取键值 get k1 ​ 数值类型自增1 incr k1 数值类型自减1 decr k1 查看值的长度 STRLEN k1 list类型数据的命令操作&#xff1a; &#x…

[MySQL][内置函数][日期函数][字符串函数][数学函数]详细讲解

目录 1.日期函数1.基础语法2.示例13.示例2 2.字符串函数1.基础语法2.示例 3.数学函数1.基础语法2.示例 4.其他函数 1.日期函数 1.基础语法 日期时间在MYSQL中是区分开的 日期&#xff1a;年月日时间&#xff1a;时分秒 获得年月日select current_date();----------------| cur…

Open3D 最小二乘法拟合点云平面

目录 一、概述 1.1最小二乘法原理 1.2实现步骤 1.3应用场景 二、代码实现 2.1关键函数 2.2完整代码 三、实现效果 3.1原始点云 3.2matplotlib可视化 3.3平面拟合方程 前期试读&#xff0c;后续会将博客加入该专栏&#xff0c;欢迎订阅 Open3D点云算法与点云深度学习…

opencv学习:图像视频的读取截取部分图像数据颜色通道提取合并颜色通道边界填充数值计算图像融合

一、计算机眼中的图像 1.图像操作 构成像素点的数字在0~255之间 RGB叫做图像的颜色通道 h500&#xff0c;w500 2.灰度图像 3. 彩色图像 4.图像的读取 5.视频的读取 cv2.VideoCapture()--在OpenCV中&#xff0c;可以使用VideoCapture来读取视频文件&#xff0c;或是摄像头数…

前缀和算法——部分OJ题详解

&#xff08;文章的题目解释可能存在一些问题&#xff0c;欢迎各位小伙伴私信或评论指点&#xff08;双手合十&#xff09;&#xff09; 关于前缀和算法 前缀和算法解决的是“快速得出一个连续区间的和”&#xff0c;以前求区间和的时间复杂度是O(N)&#xff0c;使用前缀和可…

关于springboot的@DS(““)多数据源的注解无法生效的原因

对于com.baomidou.dynamic.datasource.annotation的DS注解&#xff0c;但凡有一个AOP的修改都会影响到多数据源无法生效的问题&#xff0c;本次我是添加了方法上添加了Transactional&#xff0c;例如下图&#xff1a; 在方法上写了这个注解&#xff0c;会影响到DS("db2&qu…

MODEL4高性价比工业级HMI芯片在喷码机解决方案中的应用

一、概述 随着工业自动化与智能化的发展&#xff0c;喷码机作为标识设备在各行各业中扮演着至关重要的角色。为满足市场对于高效、精准、灵活喷码的需求&#xff0c;我们推出了基于MODEL4工业级HMI芯片的喷码机解决方案。 该方案集成了高性能国产嵌入式64位RISC-V内核芯片组&…

<数据集>铁轨缺陷检测数据集<目标检测>

数据集格式&#xff1a;VOCYOLO格式 图片数量&#xff1a;844张 标注数量(xml文件个数)&#xff1a;844 标注数量(txt文件个数)&#xff1a;844 标注类别数&#xff1a;3 标注类别名称&#xff1a;[Spalling, Squat, Wheel Burn] 序号类别名称图片数框数1Spalling3315522…

集线器、交换机、路由器的区别,冲突域、广播域

冲突域 定义&#xff1a;同一时间内只能有一台设备发送信息的范围。 分层&#xff1a;基于OSI模型的第一层物理层。 广播域 定义&#xff1a;如果某个站点发出一个广播信号&#xff0c;所有能接受到这个信号的设备的范围称为一个广播域。 分层&#xff1a;基于OSI模型的第二…

绿色水利,智慧未来:数字孪生技术在智慧水库建设中的应用,助力实现水资源的可持续利用与环境保护的双赢

本文关键词&#xff1a;智慧水利、智慧水利工程、智慧水利发展前景、智慧水利技术、智慧水利信息化系统、智慧水利解决方案、数字水利和智慧水利、数字水利工程、数字水利建设、数字水利概念、人水和协、智慧水库、智慧水库管理平台、智慧水库建设方案、智慧水库解决方案、智慧…

【Python】open()函数的全面解析:如何读取和写入文件

文章目录 1. 基本用法&#xff1a;打开文件2. 不同模式的使用3. 文件读取方法3.1 readline()方法3.2 readlines()方法 4. 上下文管理器5. 错误处理6. 小结 在编程过程中&#xff0c;文件操作是一个非常常见的任务&#xff0c;而Python的open()函数是进行文件操作的基础。通过op…

Sparse4D-v3:稀疏感知的性能优化及端到端拓展

极致的感知性能与极简的感知pipeline一直是牵引我们持续向前的目标。为了实现该目标&#xff0c;打造一个性能优异的端到端感知模型是重中之重&#xff0c;充分发挥深度神经网络数据闭环的作用&#xff0c;才能打破当前感知系统的性能上限&#xff0c;解决更多的corner case&am…

分布式 I/O 系统Modbus TCP 耦合器BL200

BL200 耦合器是一个数据采集和控制系统&#xff0c;基于强大的 32 位微处理器设计&#xff0c;采用 Linux 操作系统&#xff0c;可以快速接入现场 PLC、SCADA 以及 ERP 系统&#xff0c; 内置逻辑控制、边缘计算应用&#xff0c;支持标准 Modbus TCP 服务器通讯&#xff0c;以太…

Ubuntu Desktop Docker 配置代理

Ubuntu Desktop Docker 配置代理 主要解决 docker pull 拉取不了镜像问题. Docker Desktop 配置代理 这个比较简单, 直接在 Docker Desktop 里设置 Proxies, 示例如下: http://127.0.0.1:7890 Docker Engine 配置代理 1.Docker Engine 使用下面配置文件即可, root 用户可…

Java面试八股之简述单例redis并发承载能力

简述单例redis并发承载能力 单例Redis实例的并发承载上限受到多种因素的影响&#xff0c;包括但不限于硬件性能、网络条件、数据集大小、操作类型以及Redis自身的配置。以下是几个关键因素的详细说明&#xff1a; 硬件性能&#xff1a; CPU&#xff1a;Redis主要依赖于CPU的…