kafka入门,生产者自定义分区(六)

news2024/11/18 13:57:16

1、实现Partitioner接口

package com.longer.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * 实现接口Partitioner
 * 实现3个方法:partition,close,configure
 * 编写partition方法,返回分区号
 */
public class MyPartitioner implements Partitioner {

    /** 返回信息对应的分区
     * @param topic 主题
     * @param key 消息的 key
     * @param keyBytes 消息的 key 序列化后的字节数组
     * @param value 消息的 value
     * @param valueBytes 消息的 value 序列化后的字节数组
     * @param cluster 集群元数据可以查看分区信息
     * @return
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //获取消息
        String msgValue = value.toString();
        //创建partition
        int partition;
        //判断消息求模
        return Integer.valueOf(msgValue) % 3;
    }

    //关闭资源
    @Override
    public void close() {

    }

    //p配置方法
    @Override
    public void configure(Map<String, ?> map) {

    }
}

2、使用自定义分区器

主要代码properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.longer.producer.MyPartitioner");

package com.longer.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Map;
import java.util.Properties;

public class CustomProducerCallbackPartitions3 {
    public static void main(String[] args) {
        //1、创建kafka生产者得配置对象
        Properties properties=new Properties();
        //2、给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //3、key value 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
       //添加自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.longer.producer.MyPartitioner");
        //4、创建kafka生产者对象
        KafkaProducer<String,String> producer=new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 5; i++) {
            //指定数据发送到1号分区,key为空(IDEA中,ctrl+p查看参数)
            producer.send(new ProducerRecord<>("first",    String.valueOf(i)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if(e==null){
                        System.out.println(String.format("主题:%s,分区:%s",metadata.topic(),metadata.partition()));
                        return;
                    }
                    e.printStackTrace();
                }
            });
        }
        //关闭资源
        producer.close();
    }
}

效果

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/694804.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

EasyCVR级联后上级在线,请求播放显示端口不可达是什么原因?

EasyCVR可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及支持厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安防视频监控的能力&#xff0c;比如&#xff1a;视…

TestNG单元测试报错Software caused connection abort: socket write error

在用TestNG进行单元测试时&#xff0c;总出现如下错误&#xff1a;Software caused connection abort: socket write error 解决方法&#xff1a; 报错前我使用的eclipse testng插件版本为6.11&#xff0c;对插件版本降为6.8后此问题解决。&#xff08;先卸载已装testng插件&…

通过精益价值流探索研发效能提升实践

研发效能八大损耗 采用精益价值流进行分析&#xff0c;研发过程存在以下典型的八大损耗 01 缺陷修复 因上游质量问题后移而引发的工作返工&#xff0c;一般会占用新功能的时间投入&#xff0c;如果经常反复回流&#xff0c;将严重影响团队的需求交付吞吐量 02 工作等待 上游…

Postman接口测试工具使用教程【基础版】

postmanpostman是一款支持http协议的接口调试与测试工具&#xff0c;其主要特点就是功能强大&#xff0c;使用简单且易用性好 。无论是开发人员进行接口调试&#xff0c;还是测试人员做接口测试&#xff0c;postman都是我们的首选工具之一 。那么接下来就介绍下postman到底有哪…

chatgpt赋能python:Python调用宏实现SEO优化的方法

Python调用宏实现SEO优化的方法 什么是Python调用宏&#xff1f; 在Python语言中&#xff0c;宏是一种可以被调用的代码模板&#xff0c;可以在程序运行时被动态地解析和执行。Python的宏通常用于代码重用和快速开发&#xff0c;可以大大提高代码的可维护性和开发效率。 Pyt…

青大数据结构【2018】【综合应用】

关键字&#xff1a; 二叉排序树、先序中序排列、平均查找长度ASL、快速排序、堆排序 &#xff08;3&#xff09; 采用堆排序&#xff1b; 因为快速排序在基本有序&#xff08;逆序&#xff09;的情况下&#xff0c;达到最坏的时间复杂度O(n2)。

【im推送苹果推】日历推送筛选与 APNs 的通讯协定应基于您的需要、技能栈和开辟环境

筛选与 APNs 的通讯协定应基于您的需要、技能栈和开辟环境。如下是一些发起来帮忙您做出选择&#xff1a; HTTP/2 协议合用情况&#xff1a; 如果您的技术栈支撑 HTTP/2&#xff0c;且您的开发环境能够大要轻松地集成和利用 HTTP/2 库或框架&#xff0c;那么选择 HTTP/2 是一…

48. 旋转图像----从一道题中深入理解数组=

48. 旋转图像 原题链接&#xff1a;完成情况&#xff1a;开始分析&#xff1a;&#xff08;1&#xff09;&#xff08;2&#xff09;&#xff08;3&#xff09; 解释&#xff1a;如果是一维数组如果是二维数组数组传递的底层原理 原题链接&#xff1a; 48. 旋转图像 https://…

【产品设计】电商类产品搜索功能如何优化?

搜索功能对于电商类产品而言尤为重要&#xff0c;当用户带着明确的目的去搜索自己需要的产品时&#xff0c;却没有得到他想要的结果&#xff0c;这在很大程度上直接影响了用户对于产品的体验。本文作者结合自己的经验&#xff0c;来探讨关于电商类产品搜索功能应如何优化。 在讲…

kettle之数据库连接-Generic database连接hive(CDH版)

版本&#xff1a;kettle7.1、hive-common-1.1.0-cdh5.5.0 目录 1、创建连接 2、org/apache/thrift/TException 3、org.apache.hadoop.conf.Configuration 1、创建连接 当我们想通过jdbc方式连接hive时&#xff0c;可以配置一个通用的一般数据连接Generic database&#xff…

理解GPT-4:人工智能的全新里程碑及其国内使用

在人工智能领域&#xff0c;每一代的进步都是突破性的。近年来&#xff0c;这个领域的发展尤为引人注目&#xff0c;尤其是在语言处理和生成方面。OpenAI的GPT系列模型就是最好的例证。在GPT-3取得巨大成功后&#xff0c;我们迎来了更强大的GPT-4。 GPT-4: AI的新里程碑 GPT-4…

【2022吴恩达机器学习课程视频翻译笔记】2.2监督学习-part-2

B站上面那个翻译我有点看不懂&#xff0c;打算自己啃英文翻译了&#xff08;有自己意译的部分&#xff09;&#xff0c;然后懒得做字幕&#xff0c;就丢在博客上面了&#xff0c;2.2之前的章节结合那个机翻字幕能看懂 2.2监督学习-part-2 So supervised learning algorithms …

【qiankun】前端微服务架构踩坑记录

目录 前言 1.Cannot GET /cooperation/board 场景&#xff1a; 分析 解决 2.Invalid options in vue.config.js:"css.requireModuleExtension" is not allowed 原因 解决 3.less版本升级导致除法写法未转换 原因 解决 4.主子应用样式隔离 场景 解决 5…

HOT23-反转链表

leetcode原题链接&#xff1a;反转链表 题目描述 给你单链表的头节点 head &#xff0c;请你反转链表&#xff0c;并返回反转后的链表。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5] 输出&#xff1a;[5,4,3,2,1]示例 2&#xff1a; 输入&#xff1a;head [1,2] 输…

Spring Boot 中的缓存注解

Spring Boot 中的缓存注解 在 Spring Boot 中&#xff0c;缓存是一个非常重要的话题。当我们需要频繁读取一些数据时&#xff0c;为了提高性能&#xff0c;可以将这些数据缓存起来&#xff0c;避免每次都从数据库中读取。为了实现缓存&#xff0c;Spring Boot 提供了一些缓存注…

电脑大文件夹怎么加密?大文件夹方法介绍

当我们把电脑中的各种数据分类存放在文件夹中时&#xff0c;可以方便我们使用和管理。但这些文件夹也会变得十分庞大&#xff0c;如果想要加密它们&#xff0c;就需要使用更快速、安全的方法。下面我们来了解一下电脑大文件夹的加密方法。 文件夹加密超级大师 文件夹加密超级大…

享元模式的学习与使用

1、享元模式的学习 当我们需要创建大量相似的对象时&#xff0c;享元模式&#xff08;Flyweight Pattern&#xff09;可以提供一种有效的解决方案。享元模式旨在通过共享对象来最小化内存使用和提高性能。它将对象分为可共享的内部状态&#xff08;Intrinsic State&#xff09;…

websocket前端的连接与接收数据

什么是websocket 1, WebSocket是一种用于在客户端和服务器之间进行全双工通信的网络协议。它使得在单个TCP连接上可以进行双向通信&#xff0c;允许服务器主动地发送数据给客户端&#xff0c;同时客户端也可以向服务器发送数据。与传统的HTTP请求-响应模型不同&#xff0c;Web…

百度排名代发收录怎么上百度

百度排名代发收录怎么上百度&#xff0c;如何提高百度排名&#xff0c;网站提高排名最新手册&#xff01;#seo 今天来点不一样的&#xff0c;就是讲一下百度竞价里边最基础也是最容易踩的一个坑&#xff0c;就是我发现很多人很喜欢把自己推广的产品或者说业务直接作为关键词上…

OpenAI 发布的新语音系统Whisper能力到底有多强?

OpenAI 最近发布了一个名为Whisper 的自动语音识别系统&#xff0c;声称其在英语语音识别方面已经接近人类水平的鲁棒性和准确性。这个系统使用了68万小时多任务监督数据来进行训练&#xff0c;并且在处理口音、背景噪音和技术语言等复杂场景时表现出了很好的鲁棒性。那么&…