Kafka分区策略实现

news2025/2/3 20:29:57

引言

Kafka 的分区策略决定了生产者发送的消息会被分配到哪个分区中,合理的分区策略有助于实现负载均衡、提高消息处理效率以及满足特定的业务需求。

轮询策略(默认)

  • 轮询策略是 Kafka 默认的分区策略(当消息没有指定键时)。生产者会按照顺序依次将消息发送到各个分区中,确保每个分区都能均匀地接收到消息,从而实现负载均衡。简单高效,能使各个分区的消息量相对均衡,充分利用每个分区的存储和处理能力。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;
    
    public class RoundRobinProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);
                producer.send(record);
            }
            producer.close();
        }
    }

    随机策略

  • 随机策略会随机地将消息分配到一个分区中。这种策略在某些情况下可以实现一定程度的负载均衡,但由于是随机分配,可能会导致分区之间的消息分布不够均匀。可以通过自定义分区器来实现随机策略。
  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    
    public class RandomPartitioner implements Partitioner {
        private final Random random = new Random();
    
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            return random.nextInt(partitions.size());
        }
    
        @Override
        public void close() {}
    
        @Override
        public void configure(Map<String, ?> configs) {}
    }
    
    // 使用随机分区器的生产者示例
    public class RandomProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("partitioner.class", "RandomPartitioner");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);
                producer.send(record);
            }
            producer.close();
        }
    }

    按键哈希策略

  • 当消息指定了键时,Kafka 会根据键的哈希值将消息分配到特定的分区中。相同键的消息会被分配到同一个分区,这有助于保证具有相同业务逻辑的消息顺序性。可以保证消息的局部有序性,例如在处理用户相关的消息时,将同一个用户的消息发送到同一个分区,方便后续的处理和分析。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;
    
    public class KeyBasedProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "user-" + (i % 2), "message-" + i);
                producer.send(record);
            }
            producer.close();
        }
    }

    自定义分区策略(实现接口)

  • 当上述默认策略无法满足业务需求时,可以自定义分区策略。通过实现org.apache.kafka.clients.producer.Partitioner接口,重写partition方法来实现自定义的分区逻辑。例如,根据消息的某些特定字段(如时间、地理位置等)来进行分区,以满足特定的业务需求。

  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;
    
    public class CustomPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            // 自定义分区逻辑,这里简单示例根据消息值的长度分区
            String message = (String) value;
            return message.length() % partitions.size();
        }
    
        @Override
        public void close() {}
    
        @Override
        public void configure(Map<String, ?> configs) {}
    }
    
    // 使用自定义分区器的生产者示例
    public class CustomProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("partitioner.class", "CustomPartitioner");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);
                producer.send(record);
            }
            producer.close();
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2291432.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在无sudo权限Linux上安装 Ollama 并使用 DeepSeek-R1 模型

本教程将指导你如何在 Linux 系统上安装 Ollama&#xff08;一个本地运行大型语言模型的工具&#xff09;&#xff0c;并加载 DeepSeek-R1 模型。DeepSeek-R1 是一个高性能的开源语言模型&#xff0c;适用于多种自然语言处理任务。 DeepSeek-R1 简介 DeepSeek-R1 是 DeepSeek …

蓝桥杯思维训练营(一)

文章目录 题目总览题目详解翻之一起做很甜的梦 蓝桥杯的前几题用到的算法较少&#xff0c;大部分考察的都是思维能力&#xff0c;方法比较巧妙&#xff0c;所以我们要积累对应的题目&#xff0c;多训练 题目总览 翻之 一起做很甜的梦 题目详解 翻之 思维分析&#xff1a;一开…

纯后训练做出benchmark超过DeepseekV3的模型?

论文地址 https://arxiv.org/pdf/2411.15124 模型是AI2的&#xff0c;他们家也是玩开源的 先看benchmark&#xff0c;几乎是纯用llama3 405B后训练去硬刚出一个gpt4o等级的LLamA405 我们先看之前的机遇Lllama3.1 405B进行全量微调的模型 Hermes 3&#xff0c;看着还没缘模型…

OpenAI深夜反击:o3-mini免费上线,能否撼动DeepSeek的地位?

还在为寻找合适的 AI 模型而烦恼吗&#xff1f;chatTools 平台为您精选 o1、GPT4o、Claude、Gemini 等顶尖 AI 模型&#xff0c;满足您不同的 AI 应用需求。立即体验强大的 AI 能力&#xff01; 深夜反击&#xff0c;OpenAI祭出o3-mini 在DeepSeek异军突起&#xff0c;搅动AI行…

【Linux-网络】初识计算机网络 Socket套接字 TCP/UDP协议(包含Socket编程实战)

&#x1f3ac; 个人主页&#xff1a;谁在夜里看海. &#x1f4d6; 个人专栏&#xff1a;《C系列》《Linux系列》《算法系列》 ⛰️ 道阻且长&#xff0c;行则将至 目录 &#x1f4da;一、初识计算机网络 &#x1f4d6; 背景 &#x1f4d6; 网络协议 &#x1f516;OSI七层…

使用ollama在本地部署一个deepseek大模型

文章目录 为什么选择本地化部署需要用到什么作者使用的什么环境如何根据自己的电脑或服务器配置选择自己能部署的大模型 一、Ollama1、下载Ollama2、安装Ollama 二、DeepSeek R11、下载DeepSeek R12、安装DeepSeek R1 三、ChatBox AI1、下载ChatBox AI2、安装ChatBox AI3、连接…

10 Flink CDC

10 Flink CDC 1. CDC是什么2. CDC 的种类3. 传统CDC与Flink CDC对比4. Flink-CDC 案例5. Flink SQL 方式的案例 1. CDC是什么 CDC 是 Change Data Capture&#xff08;变更数据获取&#xff09;的简称。核心思想是&#xff0c;监测并捕获数据库的变动&#xff08;包括数据或数…

【含文档+PPT+源码】基于微信小程序连锁药店商城

项目介绍 本课程演示的是一款基于微信小程序连锁药店商城&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的 Java 学习者。 1.包含&#xff1a;项目源码、项目文档、数据库脚本、软件工具等所有资料 2.带你从零开始部署运行本套系统 3.该项目附带的…

[免费]微信小程序智能商城系统(uniapp+Springboot后端+vue管理端)【论文+源码+SQL脚本】

大家好&#xff0c;我是java1234_小锋老师&#xff0c;看到一个不错的微信小程序智能商城系统(uniappSpringboot后端vue管理端)&#xff0c;分享下哈。 项目视频演示 【免费】微信小程序智能商城系统(uniappSpringboot后端vue管理端) Java毕业设计_哔哩哔哩_bilibili 项目介绍…

2025年02月02日Github流行趋势

项目名称&#xff1a;oumi 项目地址url&#xff1a;https://github.com/oumi-ai/oumi 项目语言&#xff1a;Python 历史star数&#xff1a;1416 今日star数&#xff1a;205 项目维护者&#xff1a;xrdaukar, oelachqar, taenin, wizeng23, kaisopos 项目简介&#xff1a;构建最…

vue入门到实战 三

目录 3.1 v-bind 3.1.1 v-bind指令用法 ​编辑3.1.2 使用v-bind绑定class 3.1.3 使用v-bind绑定style 3.2.1 v-if指令 3.2.1 v-if指令 3.2.2 v-show指令 ​3.3 列表渲染指令v-for 3.3.1 基本用法 3.3.2 数组更新 3.3.3 过滤与排序 3.4 事件处理 3.4.1 使用v-on指令…

实验六 项目二 简易信号发生器的设计与实现 (HEU)

声明&#xff1a;代码部分使用了AI工具 实验六 综合考核 Quartus 18.0 FPGA 5CSXFC6D6F31C6N 1. 实验项目 要求利用硬件描述语言Verilog&#xff08;或VHDL&#xff09;、图形描述方式、IP核&#xff0c;结合数字系统设计方法&#xff0c;在Quartus开发环境下&#xff…

java SSM框架 商城系统源码(含数据库脚本)

商城购物功能&#xff0c;项目代码&#xff0c;mysql脚本&#xff0c;html等静态资源在压缩包里面 注册界面 登陆界面 商城首页 文件列表 shop/.classpath , 1768 shop/.project , 1440 shop/.settings/.jsdtscope , 639 shop/.settings/org.eclipse.core.resources.prefs , …

Unet 改进:在encoder和decoder间加入TransformerBlock

目录 1. TransformerBlock 2. Unet 改进 3. 完整代码 Tips:融入模块后的网络经过测试,可以直接使用,设置好输入和输出的图片维度即可 1. TransformerBlock TransformerBlock是Transformer模型架构的基本组件,广泛应用于机器翻译、文本摘要和情感分析等自然语言处理任务…

【Linux系统】信号:认识信号 与 信号的产生

信号快速认识 1、生活角度的信号 异步&#xff1a;你是老师正在上课&#xff0c;突然有个电话过来资料到了&#xff0c;你安排小明过去取资料&#xff0c;然后继续上课&#xff0c;则小明取资料这个过程就是异步的 同步&#xff1a;小明取快递&#xff0c;你停下等待小明回来再…

一、html笔记

(一)前端概述 1、定义 前端是Web应用程序的前台部分,运行在PC端、移动端等浏览器上,展现给用户浏览的网页。通过HTML、CSS、JavaScript等技术实现,是用户能够直接看到和操作的界面部分。上网就是下载html文档,浏览器是一个解释器,运行从服务器下载的html文件,解析html、…

PyQt5超详细教程终篇

PyQt5超详细教程 前言 接&#xff1a; [【Python篇】PyQt5 超详细教程——由入门到精通&#xff08;序篇&#xff09;](【Python篇】PyQt5 超详细教程——由入门到精通&#xff08;序篇&#xff09;-CSDN博客) 建议把代码复制到pycahrm等IDE上面看实际效果&#xff0c;方便理…

洛谷 P8724 [蓝桥杯 2020 省 AB3] 限高杆

洛谷题目传送门 题目描述 某市有 n 个路口&#xff0c;有 m 段道路连接这些路口&#xff0c;组成了该市的公路系统。其中一段道路两端一定连接两个不同的路口。道路中间不会穿过路口。 由于各种原因&#xff0c;在一部分道路的中间设置了一些限高杆&#xff0c;有限高杆的路…

虚幻UE5手机安卓Android Studio开发设置2025

一、下载Android Studio历史版本 步骤1&#xff1a;虚幻4.27、5.0、5.1、5.2官方要求Andrd Studio 4.0版本&#xff1b; 5.3、5.4、5.5官方要求的版本为Android Studio Flamingo | 2022.2.1 Patch 2 May 24, 2023 虚幻官网查看对应Andrd Studiob下载版本&#xff1a; https:/…

JavaWeb入门-请求响应(Day3)

(一)请求响应概述 请求(HttpServletRequest):获取请求数据 响应(HttpServletResponse):设置响应数据 BS架构:Browser/Server,浏览器/服务器架构模式。客户端只需要浏览器就可访问,应用程序的逻辑和数据都存储在服务端(维护方便,响应速度一般) CS架构:Client/ser…