Kafka分区策略

news2024/7/4 5:42:11

默认分区器DefaultPartitioner

(1)指明partition的情况下,直
接将指明的值作为partition值;

(2)没有指明partition值但有key的情况下,将key的hash值与topic的
partition数进行取余得到partition值;

例如:key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那
么key1 对应的value1写入1号分区,key2对应的value2写入0号分区。

(3)既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直
使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。

例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进
行使用(如果还是0会继续随机)。

自定义分区器

    1. 实现接口 Partitioner
    1. 实现 3 个方法:partition,close,configure
    1. 编写 partition 方法,返回分区号
      在这里插入图片描述
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * 1. 实现接口 Partitioner
 * 2. 实现 3 个方法:partition,close,configure
 * 3. 编写 partition 方法,返回分区号
 */


public class MyPartitioner implements Partitioner {

    /**
     * 返回信息对应的分区
     *
     * @param topic      主题
     * @param key        消息的 key
     * @param keyBytes   消息的 key 序列化后的字节数组
     * @param value      消息的 value
     * @param valueBytes 消息的 value 序列化后的字节数组
     * @param cluster    集群元数据可以查看分区信息
     * @return
     */

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        //获取消息
        String msgValue = value.toString();

        //创建partition
        int partition;

        //判断消息是否包含silence
        if (msgValue.contains("silence")) {
            partition = 0;
        } else {
            partition = 1;
        }

        //返回分区号
        return partition;
    }

    //关闭资源
    @Override
    public void close() {

    }

    //配置方法
    @Override
    public void configure(Map<String, ?> configs) {

    }
}

使用分区器的方法,在生产者的配置中添加分区器参数
在这里插入图片描述

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallbackPartitions {
    public static void main(String[] args) {

        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();

        // 2. 给 kafka 配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop100:9092");

        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 添加自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.itsp.kafka.producer.MyPartitioner");

        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 4. 调用 send方法,发送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "call of silence" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {

                    if (e == null) {
                        System.out.println("主题 : " + metadata.topic() + "->" + "分区" + metadata.partition());
                    } else {
                        e.printStackTrace();
                    }
                }
            });
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

测试

开启 Kafka 消费者。

在这里插入图片描述

启动生产者,在 IDEA 控制台观察回调信息。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/14629.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

代谢组学——最接近生物表型的组学

■ 什么是代谢组学 在基于基因组-转录组-蛋白质组-代谢组的系统生物学框架内&#xff0c;代谢组学 (metabolomics/metabonomics) 处于最下游&#xff0c;最接近生物表型&#xff0c;主要通过考察生物体系在某一特定时期内受到刺激或扰动前后所有小分子代谢物 (分子量小于 1500…

信创国产化大背景下,应用性能体验如何保障?

信创产业是拉动中国经济增长不可或缺的重要抓手。从2020年我国迈入信创发展元年&#xff0c;到2022年信创开始向行业“深水区”迈进&#xff0c;信创产业得到了国家相关政策的大力支持。今年9月底国家下发79号文&#xff0c;全面指导国资信创产业的发展和进度&#xff0c;明确要…

bootstrap导航窗格响应式二级菜单

这次碰到的需求是响应式二级导航窗格&#xff0c;默认的导航窗格只有点击下拉框的二级窗格&#xff0c;会有如下问题&#xff1a;一级菜单无法添加超链接&#xff0c;二级菜单展示要多点一下。 实现目标&#xff1a; 1.滑动到指定区域&#xff0c;展示二级菜单。 2.一级菜单和…

Vue3 - 响应式工具函数(使用教程)

前言 您需要对 ref()、reactive() 有所了解&#xff0c;否则要先学习这些。 Vue3 为响应式提供了一些工具函数&#xff0c;辅助开发&#xff1a; API说明isRef()检查某个值是否为 ref。isProxy()检查一个对象是否是由 reactive()、readonly()、shallowReactive() 或 shallowRe…

前端国际化如何对中文——>英文自动化翻译小demo

非专业的国际化语言。 需求是把zh.js文件中的对象的值转换为en.js&#xff08;也就是实现中英文翻译&#xff09; 结果&#xff1a; 话不多说&#xff0c;上技巧&#xff01; 首先找个免费翻译的API接口&#xff0c;我找的百度翻译的API接口。百度翻译开放平台看百度翻译技术…

仅此一招,再无消息乱序的烦恼

1. 概览 RocketMQ 早已提供了一组最佳实践&#xff0c;但工作在一线的伙伴却很少知道&#xff0c;项目中的各种随性代码经常导致消息错乱问题&#xff0c;严重影响业务的准确性。为了保障最佳实践的落地&#xff0c;降低一线伙伴的使用成本&#xff0c;统一 MQ 使用规范&#…

AF488 NHS,AF488 活性酯,Alexa Fluor488 NHS,水溶性小分子绿色荧光标记染料

AF488 NHS通过引入两个磺酸根离子&#xff0c;AF488的水溶性大大增强&#xff0c;荧光强度增加&#xff0c;pH稳定性&#xff0c;光稳定性也提高&#xff0c;但是它的激发和发射谱图基本保持不变。不像荧光素类染料&#xff0c;AF488的荧光在较宽的pH范围内(4 – 10)保持不变。…

ATF源码篇(八):docs文件夹-Components组件(7)固件配置框架

7、固件配置框架 fconf/索引 本文档概述了固件配置框架 7.1 固件配置框架是什么&#xff1f; 1 介绍 固件配置框架&#xff08;|FCONF|&#xff09;是平台特定数据的抽象层&#xff0c;允许查询“属性”并检索值&#xff0c;而请求实体不知道使用什么后备存储来保存数据。 …

Java接口(Interface)

文章目录接口语法注意事项和细节实现接口VS.继承类接口的多态特性小练习usb插槽就是现实中的接口。 你可以把手机,相机,u盘都插在usb插槽上,而不用担心那个插槽是专门插哪个的,原因是做usb插槽的厂家和做各种设备的厂家都遵守了统一的规定包括尺寸&#xff0c;排线等等。 首先创…

ISP-Gamma

参考:https://blog.csdn.net/lxy201700/article/details/24929013 http://www.cambridgeincolour.com/tutorials/gamma-correction.htm 1. 什么是Gamma Gamma是一种指数曲线&#xff0c;显示器用这个指数曲线来调整真实输出到显示屏幕上的颜色值&#xff0c;以此更好的适应人…

卷?这份Java后端架构指南首次公开就摘星百万,肝完直接60K+

最近和各位小伙伴儿私下聊的比较多&#xff0c;各个阶段的朋友都有&#xff1b;因为大环境的内卷&#xff0c;导致大家在求学、求职、提升自己的各个方面都多多少少有些迷茫焦虑&#xff1b; 这些其实是一个非常普遍且正常的现象&#xff0c;会焦虑的人&#xff0c;往往都是对…

大学生简单个人静态HTML网页设计作品 HTML+CSS制作我的家乡杭州 DIV布局个人介绍网页模板代码 DW学生个人网站制作成品下载 HTML5期末大作业

常见网页设计作业题材有 个人、 美食、 公司、 学校、 旅游、 电商、 宠物、 电器、 茶叶、 家居、 酒店、 舞蹈、 动漫、 服装、 体育、 化妆品、 物流、 环保、 书籍、 婚纱、 游戏、 节日、 戒烟、 电影、 摄影、 文化、 家乡、 鲜花、 礼品、 汽车、 其他等网页设计题目, A…

蓝牙学习一(简介)

1.简介 蓝牙分为经典蓝牙&#xff08;BT-Bluetooth&#xff09;和低功耗蓝牙&#xff08;BLE-Bluetooth Low Energy&#xff09;&#xff0c;本次主要学习BLE。 BLE分了很多个版本&#xff0c;现在用的比较多的就是4.2和5.X。那4.2到5.0之间有哪些升级呢&#xff1f;首先&#…

多肽标签X-press,DLYDDDDK

X-press Tag Peptide 是一种N-端前导肽&#xff0c;Anti-Xpress 抗体能够识别 Xpress 表位&#xff0c;因此&#xff0c;该多肽可用来纯化 X-press Tag 融合蛋白。X-press Tag Peptide is a tag peptide used for protein purification. X-press Tag is also an N-terminal lea…

【附源码】计算机毕业设计JAVA商院足球赛事管理

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; Springboot mybatis Maven Vue 等等组成&#xff0c;B/…

CAPL语言编译的那些事

CAPL是类似于C语言的面向过程语言,这是众所周知的。C或C++代码在执行前需要编译成机器语言,也就是二进制语言,如此能够更快速运行。CAPL程序也是一样的,需要编译后执行 在CAPL Browser编辑器下,Home -> Compile/Compile All,Compile编译当前打开的CAPL文件,Compile A…

基于51单片机的可调节占空比四种三种波形发生器proteus仿真

简介&#xff1a; 该系统显示器为LCD1602&#xff0c;可实时显示波形的参数情况可显示四种波形&#xff0c;分别是正弦波 三角波方波以及锯齿波该系统可以通过按键调节波形的占空比波形输出通过仿真软件的示波器可以查看得到波形发生器的核心芯片是利用DAC0832产生运放LM324经…

Jenkins部署的Windows爬虫机如何配置

文章目录一 安装软件1. Python爬虫必备安装包2. Visual Studio Code3. Git3.1. 备选 - OneDrive4. Java5. 向日葵二 配置Chrome1. 查看Chrome版本2. 下载ChromeDriver3. 解压放入Python的Scripts文件夹有时候, 一台Windows只是用来部署一些任务, 例如爬虫任务. 这个时候需要简单…

【人见人爱报错系列】GIt常见问题解决大全

前言 在使用的github\gitlab各种hub的过程中&#xff0c;会遇到各种各样的小问题&#xff0c;这些会给程序员们带来五光十色的烦恼&#xff0c;本文总结使用git的各种问题并持续更新。 一、Git用户名邮箱设置 使用git过程中&#xff0c;会切换不同项目但是发现提交人都是一样…

M1 芯片 MacBook 结合 MAMP 集成环境配置 PHP 环境变量

MacOS Catalina 版本之后 shell 改为使用 zsh 。 可以使用 echo $SHELL 命令查看。 配置文件分为系统级&#xff08;所有用户生效&#xff09;和用户级&#xff08;当前登录用户生效&#xff09;&#xff0c;可以自行了解&#xff0c;一般不经常切换用户的话&#xff0c;用户…