Kafka3.0.0版本——生产者自定义分区器

news2025/1/14 0:56:05

目录

    • 一、生产者自定义分区器代码示例
      • 1.1、自定义分区器类
      • 1.2、生产者发送消息代码(生产者的配置中添加分区器参数)
      • 1.3、测试

一、生产者自定义分区器代码示例

1.1、自定义分区器类

  • 代码

    package com.xz.kafka.producer;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import java.util.Map;
    /**
     * 1. 实现接口 Partitioner
     * 2. 实现 3 个方法:partition,close,configure
     * 3. 编写 partition 方法,返回分区号
     */
    public class MyPartitioner implements Partitioner {
        /**
        * 返回信息对应的分区
        * @param topic 主题
        * @param key 消息的 key
        * @param keyBytes 消息的 key 序列化后的字节数组
        * @param value 消息的 value
        * @param valueBytes 消息的 value 序列化后的字节数组
        * @param cluster 集群元数据可以查看分区信息
        * @return
        */
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            // 获取消息
            String msgValues = value.toString();
            int partition;
            // 判断消息是否包含 hello
            if (msgValues.contains("hello")){
                partition = 0;
            }else {
                partition = 1;
            }
            // 返回分区号
            return partition;
        }
    
        /**
         * 关闭资源
         */
        @Override
        public void close() {
    
        }
    
        /**
         * 配置方法
         */
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    
    

1.2、生产者发送消息代码(生产者的配置中添加分区器参数)

  • 代码

    package com.xz.kafka.producer;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    /**
     * @author: xz
     * @since: 2023/4/9 20:56
     * @description: 使用自定义的分区器方法,在生产者的配置中添加分区器参数。
     */
    public class CustomProducerMyPartitioner {
        public static void main(String[] args) throws InterruptedException {
    
            //1、创建 kafka 生产者的配置对象
            Properties properties = new Properties();
    
            //2、给 kafka 配置对象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            //3、指定对应的key和value的序列化类型 key.serializer value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
            
            //4、添加自定义分区器
            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class);
            
            //5、创建 kafka 生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //6、调用 send 方法,发送消息
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("news", "hello kafka" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null){
                            System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                        }
                    }
                });
                Thread.sleep(2);
            }
    
            //7、关闭资源
            kafkaProducer.close();
        }
    }
    

1.3、测试

  • 在 kafka集群上开启 Kafka 消费者

    [root@localhost kafka-3.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.136.28:9092 --topic news
    
  • 启动main方法,在 IDEA 控制台观察回调信息,发送消息内容包含hello,则发送到0号分区,如下图:
    在这里插入图片描述

  • 发送消息内容不包含hello,则发送到1号分区,如下图:

    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/415973.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Web API学习笔记1(DOM学习)

一、API 和 web API 1API API —— 应用程序编程接口&#xff0c;是给程序员提供的一种工具&#xff0c;以便能更轻松的实现想要完成的功能。可以比作为充电接口 2.Web API 是浏览器提供的一套操作浏览器功能和页面元素的API&#xff08;BOM和DOM&#xff09;&#xff0c;主…

HarmonyOS/OpenHarmony应用开发-ArkTS画布组件CanvasRenderingContext2D对象(十一)

measureText measureText(text: string): TextMetrics 该方法返回一个文本测算的对象&#xff0c;通过该对象可以获取指定文本的宽度值。 示例&#xff1a; // xxx.etsEntryComponentstruct MeasureText { private settings: RenderingContextSettings new RenderingConte…

机器学习 异常值检测与处理

文章目录一、异常值检测1.1 简单统计1.2 3σ原则检测1.3 箱线图检测1.4 DBScan密度聚类二、异常值处理异常值是指不属于某一特定群体的数据点。它是一个与其他数值大不相同的异常观测值&#xff0c;与良好构成的数据组相背离。在机器学习建模准备数据集时&#xff0c;检测出所有…

Day940.开发分支 -系统重构实战

开发分支 Hi&#xff0c;我是阿昌&#xff0c;今天学习记录的是关于开发分支的内容。 组件化&#xff0c;软件变得更加高内聚、低耦合&#xff0c;开发及维护的效率也更高了&#xff0c;但是组件化的架构又会引入新的复杂度。 举个例子&#xff0c;在重构前我们基于一个模块…

解决Failed to load ApplicationContext问题的思路

中文翻译&#xff1a; 加载ApplicationContext失败 第一步&#xff1a;首先检查测试类的注解 以及 依赖 SpringBootTest <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope…

【数据库复习】第三章关系数据库标准语言SQL 集合查询 2

用EXISTS/NOT EXISTS实现全称量词 SQL语言中没有全称量词" &#xff08;For all&#xff09; 可以把带有全称量词的谓词转换为等价的带有存在量词的谓词&#xff1a; 查询学生S没有选修的课程 查询选修了全部课程的学生姓名。 等价于&#xff1a;查询这样的学生&#xf…

基于灵动微SPIN系列开发的水泵方案介绍 以 MM32SPIN040C/MM32SPIN560C为主控

水泵是输送液体或使液体增压的机械。它将原动机的机械能或其他外部能量传送给液体&#xff0c;使液体能量增加&#xff0c;主要用来输送液体包括水、油、酸碱液、乳化液、悬乳液和液态金属等。 水泵以 MM32SPIN040C/MM32SPIN560C为主控。 水泵方案 MCU: MM32SPIN系列 1.输入…

redis主从复制详解

文章目录主从复制概述主从复制的作用主要包括&#xff1a;数据冗余故障恢复负载均衡高可用基石主从库之间采用的是读写分离的方式读操作写操作主从复制原理全量复制确立主从关系全量复制的三个阶段第一阶段是主从库间建立连接、协商同步的过程&#xff0c;主要是为全量复制做准…

业务逻辑复杂如何解决性能问题

0 前言 上节针对生成订单信息这个接口做了三个阶段的分析定位和优化动作&#xff0c;让TPS变得正常。不过&#xff0c;系统资源并没有完全用起来&#xff0c;这个接口显然还有优化空间。性能优化的过程中&#xff0c;要把资源都用起来。 在性能环境中做优化&#xff0c;把资源…

自动化测试框架之selenium

目录1 自动化测试1.1 单元测试1.2 接口测试1.3 UI测试1.3.1 UI自动化测试的优点&#xff1a;1.3.2 UI自动化测试的适用对象1.4 自动化测试流程2 selenium3 selenium IDE 录制脚本1 自动化测试 自动化测试指软件测试的自动化&#xff0c;在预设状态下运行应用程序或者系统&…

50 openEuler搭建PostgreSQL数据库服务器-配置环境

文章目录50 openEuler搭建PostgreSQL数据库服务器-配置环境50.1 关闭防火墙并取消开机自启动50.2 修改SELINUX为disabled50.3 创建组和用户50.4 创建数据盘50.4.1 方法一&#xff1a;在root权限下使用fdisk进行磁盘管理50.4.2 方法二&#xff1a;在root权限下使用LVM进行磁盘管…

图解HTTP阅读笔记:第5章 与HTTP协作的Web服务器

《图解HTTP》第五章读书笔记 图解HTTP第5章 与HTTP协作的Web服务器5.1 用单台虚拟主机实现多个域名5.2 通信数据转发程序&#xff1a;代理、网关、隧道5.2.1 代理5.2.2 网关5.2.3 隧道5.3 保存资源的缓存5.3.1 缓存的有限期限5.3.2 客户端的缓存第5章 与HTTP协作的Web服务器 5…

学习系统编程No.18【进程间通信之管道实战】

引言&#xff1a; 北京时间&#xff1a;2023/4/11/21:17&#xff0c;今天的文章更新啦&#xff01;但是还是没有上热榜&#xff0c;所以我们需要继续更文啦&#xff01;我相信下一篇博客肯定是可以上热榜的&#xff0c;加油&#xff01;并且今天晚上因为有一节体育课&#xff…

Linux下让进程不再被拉起

Linux下为了防止应用挂掉&#xff0c;我们会设置服务进程来拉起这些应用。但某些流氓软件也会利用该机制使得它们被杀掉后能再被拉起来。本文讲述让这些进程不再被拉起的方法。 比如&#xff0c;有名称为recordmain.bin的进程&#xff0c;使用kill -9 杀掉它后&#xff0c;过几…

NVIDIA jetson tensorrt加速yolov5摄像头检测

link 在使用摄像头直接检测目标时&#xff0c;检测的实时画面还是有点慢&#xff0c;下面是tensorrt加速过程记录。 一、设备 1、设备jetson agx xavier 2、jetpack4.6.1 3、tensorrt 8.2.1.8 4、conda虚拟环境 python3.6 二、虚拟环境搭建及依赖 1、参考此博客安装torch Nvidi…

做自动化测试时所谓的“难点”

这篇关于自动化测试的文章&#xff0c;可能和你看到的大多数自动化的文章有所不同。我不是一位专职的自动化测试工程师&#xff0c;没有开发过自动化的工具或者框架&#xff0c;用的自动化的工具也不多&#xff0c;也没有做过开发&#xff0c;所以我讲不出那些现在很多人很看重…

[C++]日期类计算器的模拟实现

目录 日期类计算器的模拟实现&#xff1a;&#xff1a; 1.获取某年某月的天数 2.构造函数 3.拷贝构造函数 4.赋值运算符重载 5.析构函数 6.日期天数 7.日期天数 8.日期-天数 9.日期-天数 10.前置的运算符重载 11.后置的运算符重载 12.前置--的运算符重载 13.后置--的运算符重载…

前后端交互系列之Axios详解(包括拦截器)

目录前言一&#xff0c;服务器的搭建二&#xff0c;Axios的基本使用2.1 Axios的介绍及页面配置2.2 如何安装2.3 Axios的前台代码2.4 Axios的基本使用2.5 axios请求响应结果的结构2.6 带参数的axios请求2.7 axios修改默认配置三&#xff0c;axios拦截器3.1 什么是拦截器3.2 拦截…

Go分布式爬虫笔记(二十)

文章目录20 调度引擎调度引擎目标通道函数选项模式函数式选项模式的好处通道底层原理无缓冲区的通道带缓冲区的通道Select 机制的底层原理思考题在我们的课程中&#xff0c;schedule 函数其实有一个 bug&#xff0c;您能看出来吗&#xff1f;你觉得可以用什么方式找出这样的 Bu…

OTA A/B 分区升级 update_engine简介

近期猛然发现公司的项目都已经换成了AB升级&#xff0c;AB升级之前一直有所了解&#xff0c;只是一直都没有去仔细查看过其具体升级流程&#xff0c;这两天抽空捋了捋&#xff0c;简单整理下。 AB升级&#xff08;谷歌官网叫法无缝更新&#xff09;是自android7.0开始新增的一…