【项目实战】Kafka 生产者写入分区的策略

news2025/1/16 0:49:35

👉博主介绍: 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO TOP红人

Java知识图谱点击链接:体系化学习Java(Java面试专题)

💕💕 感兴趣的同学可以收藏关注下不然下次找不到哟💕💕

✊✊ 感觉对你有帮助的朋友,可以给博主一个三连,非常感谢 🙏🙏🙏

在这里插入图片描述

文章目录

  • 1、生产者写入分区的策略有哪些?
  • 2、轮询分区策略
  • 3、随机分区策略
  • 4、按 key 分区策略
  • 5、自定义分区策略
  • 写在最后

1、生产者写入分区的策略有哪些?

生产者写入分区的策略主要有以下几种:

  1. 轮询分区策略:生产者可以使用轮询策略将消息依次写入每个分区,实现负载均衡。在每次发送消息时,生产者会按照轮询的方式选择下一个可用的分区,并将消息写入该分区。这样可以确保消息均匀地分布在各个分区中。

  2. 随机分区策略:Kafka生产者随机的将消息写入分区,有可能会造成消息的分布不均,所以这个策略基本上也很少用。

  3. 按 key 分区策略:Kafka生产者基于消息的键(key)进行哈希计算,然后将消息写入对应的分区。这种策略可以保证具有相同键的消息被写入到相同的分区,从而保证消息的顺序性。

  4. 自定义分区策略:Kafka生产者可以使用自定义分区策略来决定将消息写入哪个分区。

2、轮询分区策略

轮询分区的代码如下:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;

public class RoundRobinPartitioner implements Partitioner {
   
    private int currentPartition;
    
    @Override
    public void configure(Map<String, ?> configs) {
        // 初始化当前分区索引
        currentPartition = 0;
    }
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
         // 轮询选择下一个分区
        int selectedPartition = currentPartition;
        currentPartition = (currentPartition + 1) % numPartitions;
         return selectedPartition;
    }
    
    @Override
    public void close() {
        // 可选:清理资源
    }
}

partition 方法会使用一个变量 currentPartition 来记录当前选择的分区索引。每次调用 partition 方法时,会将 currentPartition 增加 1,并通过取模运算来确保选择的分区索引始终在分区数范围内。

要使用轮询分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.RoundRobinPartitioner");

3、随机分区策略

随机分区的代码如下:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
 import java.util.List;
import java.util.Map;
import java.util.Random;

public class RandomPartitioner implements Partitioner {
    
    private final Random random = new Random();
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return random.nextInt(numPartitions);
    }
    
    @Override
    public void close() {
   
    }
    
    @Override
    public void configure(Map<String, ?> configs) {
     
    }
}

partition 方法会随机选择一个分区返回。 random.nextInt(numPartitions) 方法会生成一个小于分区数的随机数,作为分区的索引。

要使用随机分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.RandomPartitioner");

4、按 key 分区策略

按 key 分区的代码如下:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
 import java.util.List;
import java.util.Map;

public class KeyPartitioner implements Partitioner {
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
         if (keyBytes == null) {
            // 如果 key 为 null,则使用轮询分区策略
            return Math.abs(key.hashCode()) % numPartitions;
        } else {
            // 使用 key 的哈希码来确定分区
            return Math.abs(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
    
    @Override
    public void close() {
        // 可选:清理资源
    }
    
    @Override
    public void configure(Map<String, ?> configs) {
        // 可选:配置方法
    }
}

partition 方法会检查 key 是否为 null。如果 key 为 null,就会使用轮询分区策略,通过计算 key 的哈希码并对分区数取模来确定分区。如果 key 不为 null,则使用 key 的字节数组的哈希码来确定分区。

要使用基于 key 的分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.KeyPartitioner");

5、自定义分区策略

自定义分区的代码如下:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;

public class CustomPartitioner implements Partitioner {
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
         // 自定义分区逻辑
        // 根据消息的 key 或 value 来选择分区
        // 这里以 key 的哈希值作为分区选择依据
        int partition = Math.abs(key.hashCode()) % numPartitions;
         return partition;
    }
    
    @Override
    public void close() {
        // 可选:清理资源
    }
    
    @Override
    public void configure(Map<String, ?> configs) {
        // 可选:配置分区器
    }
}

partition 方法根据消息的 key 或 value 来选择分区。这里使用 key 的哈希值进行取模运算,以确保选择的分区索引在分区数范围内。

要使用自定义分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.CustomPartitioner");

写在最后

通过y以上这些实现,生产者将根据自定义的分区策略来选择分区来发送消息。您可以根据自己的需求,实现不同的分区逻辑。

💕💕 本文由激流原创,原创不易,希望大家关注、点赞、收藏,给博主一点鼓励,感谢!!!
🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/858264.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

苏州OV泛域名RSA加密算法https

RSA加密算法是一种非对称加密算法&#xff0c;它被广泛应用于信息安全领域。与对称加密算法不同&#xff0c;RSA加密算法使用了两个密钥&#xff0c;一个公钥和一个私钥。公钥可以公开&#xff0c;任何人都可以使用它加密信息&#xff0c;但只有私钥的持有者才能解密信息。RSA加…

NGINX负载均衡及LVS-DR负载均衡集群

目录 LVS-DR原理搭建过程nginx 负载均衡 LVS-DR原理 原理&#xff1a; 1. 当用户向负载均衡调度器&#xff08;Director Server&#xff09;发起请求&#xff0c;调度器将请求发往至内核空间 2. PREROUTING链首先会接收到用户请求&#xff0c;判断目标IP确定是本机IP&#xff…

8路AD采集FMC子卡【产品资料】

FMC148是一款基于VITA57.4标准的JESD204B接口FMC子卡模块&#xff0c;该模块可以实现8路14-bit、500MSPS/1GSPS/1.25GSPS ADC采集功能。该板卡ADC器件采用ADI公司的AD9680芯片,全功率-3dB模拟输入带宽可达2GHz。该ADC与FPGA的主机接口通过16通道的高速串行GTX收发器进行互联。 …

《Java-SE-第三十三章》之函数式编程

前言 在你立足处深挖下去,就会有泉水涌出!别管蒙昧者们叫嚷:“下边永远是地狱!” 博客主页&#xff1a;KC老衲爱尼姑的博客主页 博主的github&#xff0c;平常所写代码皆在于此 共勉&#xff1a;talk is cheap, show me the code 作者是爪哇岛的新手&#xff0c;水平很有限&…

消息中间件 Asio (C++)

折腾了一上午&#xff0c;看到这个结果的时候泪目了兄弟闷&#xff0c;讲真。我的asio客户端成功收到服务端发来的消息了。虽然这确实是极其智障又简单的入门哈哈 下载独立版本 asio网络通信库新建cmake工程&#xff0c;CMakeLists.txt加载asioasio最简单的服务端和客户端代码…

iview+treeSelect组件,我是如何一步步手动实现全选功能的

如果我掏出下图&#xff0c;阁下除了私信我加入学习群&#xff0c;还能如何应对&#xff1f; 正文开始 前言一、历史问题二、通过监听select事件实现全选不靠谱&#xff01;&#xff01;&#xff01;三、 通过外部事件控制树选择组件四、render函数创建组件4.1 不得不说的h函数…

STM32 低功耗-待机模式

STM32 待机模式 文章目录 STM32 待机模式第1章 低功耗模式简介第2章 待机模式简介2.1 进入待机模式2.1 退出待机模式 第3章 待机模式代码部分总结 第1章 低功耗模式简介 在 STM32 的正常工作中&#xff0c;具有四种工作模式&#xff1a;运行、睡眠、停止和待机模式。 在系统或…

nuxt.js框架使用

1、这种框架只要页面有一个地方错&#xff0c;都会出现404或者吓人的报错界面。 如表单的prop属性&#xff0c;在data函数return对象里面该字段找不到或者不一致&#xff0c;就会报404。 2、使用字典&#xff0c;对字典进行翻译。 在plugins/methods.js文件里面&#xff0c;加…

APP专项测试知识点

APP的专项测试 测试要点&#xff1a; 功能测试、兼容性测试、安装、卸载、升级测试、交叉事件测试、PUSH测试、性能测试-使用solopi监控-仅适用于安卓手机&#xff08;CPU、内存、流量测试、电量测试、流畅度测试、启动测试&#xff09;、用户体验测试、稳定性测试 &#xf…

Java 11 新特性解读(1)

目录 前言 新增了一系列字符串处理方法 Optional 加强 局部变量类型推断升级 前言 北京时间2018年9月26日&#xff0c;Oracle官方宣布Java 11正式发布。这是Java大版本周期变化后的第一个长期支持版本&#xff0c;非常值得关注。从官网即可下载,最新发布的Java11将带来ZGC、…

[C++] 自定义的类如何使用“cout“和“cin“?(含日期类实现)

一、引言 在C中&#xff0c;“cin”和"cout"可以说是区别于C语言的一大亮点。 但是&#xff0c;它的自动识别类型&#xff0c;其本质不过是运算符重载。若真到了能够“自动识别”的那一天&#xff0c;人类大概也能进入新的纪元了罢。 对于我们自己写的类&#xff…

uni-app之app上传pdf类型文件

通过阅读官方文档发现&#xff0c;uni.chooseFile在app端不支持非媒体文件上传&#xff1b; 可以使用这个插件&#xff0c;验证过可以上传pdf&#xff1b;具体使用可以去看文档 插件地址 就是还是会出现相机&#xff0c;这个可能需要自己解决下 实现功能&#xff1a;上传只能上…

vscode ssh远程的config/配置文件无法保存解决

问题 之前已经有了一个config&#xff0c;我想更改连接的地址和用户名&#xff0c;但是无法保存&#xff0c;显示需要管理员权限&#xff0c;但以管理员启动vscode或者以管理员权限保存都不行 未能保存“config”: Command failed: “D:\vscode\Microsoft VS Code\bin\code.c…

ssm+vue基于java的少儿编程网上报名系统源码和论文PPT

ssmvue基于java的少儿编程网上报名系统源码和论文PPT006 开发工具&#xff1a;idea 数据库mysql5.7(mysql5.7最佳) 数据库链接工具&#xff1a;navcat,小海豚等 开发技术&#xff1a;java ssm tomcat8.5 摘 要 在国家重视教育影响下&#xff0c;教育部门的密确配合下&#…

如何将Linux上的cpolar内网穿透设置成 - > 开机自启动

如何将Linux上的cpolar内网穿透设置成 - > 开机自启动 文章目录 如何将Linux上的cpolar内网穿透设置成 - > 开机自启动前言一、进入命令行模式二、输入token码三、输入内网穿透命令 前言 我们将cpolar安装到了Ubuntu系统上&#xff0c;并通过web-UI界面对cpolar的功能有…

[YAPI]导出API文档

1.登录点击进去,点击项目2.点击接口,点击编辑,划到最下面,开启开放接口3.点击数据管理, 选择你要的数据导出格式,点击公开接口, 导出完别忘记关闭,防止别人导的时候将你开启的 也一并下载下来

API 测试 | 了解 API 接口概念|电商平台 API 接口测试指南

什么是 API&#xff1f; API 是一个缩写&#xff0c;它代表了一个 pplication P AGC 软件覆盖整个房间。API 是用于构建软件应用程序的一组例程&#xff0c;协议和工具。API 指定一个软件程序应如何与其他软件程序进行交互。 例行程序&#xff1a;执行特定任务的程序。例程也称…

springboot教务综合管理系统java学生教师班级课题jsp源代码mysql

本项目为前几天收费帮学妹做的一个项目&#xff0c;Java EE JSP项目&#xff0c;在工作环境中基本使用不到&#xff0c;但是很多学校把这个当作编程入门的项目来做&#xff0c;故分享出本项目供初学者参考。 一、项目描述 springboot教务综合管理系统 系统有1权限&#xff1a…

全球外贸b2b2c跨境电商购物网站开源搭建

要搭建一个全球外贸B2B2C跨境电商购物网站&#xff0c;需要采取以下步骤&#xff08;以下步骤不分先后&#xff09;&#xff1a; 设计系统架构首先需要设计系统的整体架构&#xff0c;确定系统的技术选型、功能模块和业务流程等。可以考虑使用分布式架构&#xff0c;将系统划分…

恒盛策略:沪指冲高回落跌0.26%,酿酒、汽车等板块走弱,燃气股拉升

10日早盘&#xff0c;两市股指盘中冲高回落&#xff0c;半日成交约4200亿元&#xff0c;北向资金净卖出超20亿元。 到午间收盘&#xff0c;沪指跌0.26%报3235.9点&#xff0c;深成指跌0.54%&#xff0c;创业板指跌0.28%&#xff1b;两市算计成交4202亿元&#xff0c;北向资金净…