【Kafka】Kafka Producer 分区-05

news2024/11/28 10:56:55

【Kafka】Kafka Producer 分区-05

  • 1. 分区的好处
  • 2. 分区策略
    • 2.1 默认的分区器 DefaultPartitioner
  • 3. 自定义分区器

1. 分区的好处

(1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
(2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

在这里插入图片描述

see 2.4.1http://t.csdnimg.cn/m1O9u

2. 分区策略

2.1 默认的分区器 DefaultPartitioner

/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a 
partition based on a hash of the key
* <li>If no partition or key is present choose the sticky 
partition that changes when the batch is full.
* 
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {
 … …
}

策略如下:

  1. 如果记录中指定了分区,使用指定的分区。
    这意味着生产者在发送消息时明确指定了一个分区号,Kafka将直接使用这个分区。
  2. 如果没有指定分区但存在键,根据键的哈希值选择分区。
    如果消息中没有明确指定分区,但是提供了一个键,Kafka会使用键的哈希值来计算应该使用哪个分区。这可以保证具有相同键的消息被发送到相同的分区,从而保证消息的有序性。
  3. 如果既没有指定分区也没有键,选择一个“sticky partition”(粘性分区),在批次满时更换分区。
    如果消息既没有指定分区,也没有键,Kafka将使用“sticky partition”策略。这种策略会在消息批次满时更换分区,以便于提高效率和性能。

这个注释是对DefaultPartitioner类的说明,该类实现了Partitioner接口,用于定义消息分区的策略。DefaultPartitioner类的主要功能就是根据上述规则决定消息发送到哪个分区。以下是DefaultPartitioner类的示例实现结构:

public class DefaultPartitioner implements Partitioner {
    // 初始化方法
    @Override
    public void configure(Map<String, ?> configs) {
        // 配置代码
    }

    // 计算分区的方法
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 分区计算逻辑
    }

    // 清理资源的方法
    @Override
    public void close() {
        // 资源清理代码
    }
}

该实现中包括了三个主要的方法:

  • configure(Map<String, ?> configs):用于初始化和配置分区器。
  • partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster):用于根据给定的主题、键和值来计算应该使用哪个分区。
  • close():用于在分区器关闭时清理资源。

在这里插入图片描述

案例一: 将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中。

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
public class CustomProducerCallbackPartitions {
    public static void main(String[] args) {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();

		// 2. 给 kafka 配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102
:9092 ");
        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        KafkaProducer<String, String> kafkaProducer = new
                KafkaProducer<>(properties);
        for (int i = 0; i < 5; i++) {
            // 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)
            kafkaProducer.send(new ProducerRecord<>("first",
                    1, "", "atguigu " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata,
                                         Exception e) {
                    if (e == null) {
                        System.out.println(" 主题: " +
                                metadata.topic() + "->" + "分区:" + metadata.partition()
                        );
                    } else {
                        e.printStackTrace();
                    }
                }
            });
        }
        kafkaProducer.close();
    }
}

案例二: 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。

public class CustomProducerCallback {
    public static void main(String[] args) {
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        KafkaProducer<String, String> kafkaProducer = new
                KafkaProducer<>(properties);
        for (int i = 0; i < 5; i++) {
            // 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,分别发往 1、2、0
            kafkaProducer.send(new ProducerRecord<>("first",
                    "a", "atguigu " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata,
                                         Exception e) {
                    if (e == null) {
                        System.out.println(" 主题: " +
                                metadata.topic() + "->" + "分区:" + metadata.partition()
                        );
                    } else {
                        e.printStackTrace();
                    }
                }
            });
        }
        kafkaProducer.close();
    }
}

3. 自定义分区器

如果研发人员可以根据企业需求,自己重新实现分区器

  1. 需求
    例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区。

  2. 实现步骤
    定义类实现 Partitioner 接口
    重写 partition()方法

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * 1. 实现接口 Partitioner
 * 2. 实现 3 个方法:partition,close,configure
 * 3. 编写 partition 方法,返回分区号
 */
public class MyPartitioner implements Partitioner {
    /**
     * 返回信息对应的分区
     *
     * @param topic 主题
     * @param key 消息的 key
     * @param keyBytes 消息的 key 序列化后的字节数组
     * @param value 消息的 value
     * @param valueBytes 消息的 value 序列化后的字节数组
     * @param cluster 集群元数据可以查看分区信息
     * @return
     */
    @Override
    public int partition(String topic, Object key, byte[]
            keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取消息
        String msgValue = value.toString();
        // 创建 partition
        int partition;
        // 判断消息是否包含 atguigu
        if (msgValue.contains("atguigu")) {
            partition = 0;
        } else {
            partition = 1;
        }
        // 返回分区号
        return partition;
    }

    // 关闭资源
    @Override
    public void close() {
    }

    // 配置方法
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

使用分区器的方法,在生产者的配置中添加分区器参数。

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class CustomProducerCallbackPartitions {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        // 添加自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atgui
                gu.kafka.producer.MyPartitioner");
                KafkaProducer < String, String > kafkaProducer = new
                        KafkaProducer<>(properties);
        for (int i = 0; i < 5; i++) {

            kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata,
                                         Exception e) {
                    if (e == null) {
                        System.out.println(" 主题: " +
                                metadata.topic() + "->" + "分区:" + metadata.partition()
                        );
                    } else {
                        e.printStackTrace();
                    }
                }
            });
        }
        kafkaProducer.close();
    }
}

测试
①在 hadoop102 上开启 Kafka 消费者

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 控制台观察回调信息

主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1825394.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

sslyze一键检查服务器检查服务器的 SSL/TLS 安全性(KALI工具系列二十五)

目录 1、KALI LINUX 简介 2、sslyze工具简介 3、信息收集 3.1 目标主机IP&#xff08;服务器&#xff09; 3.2 KALI的IP 4、操作示例 4.1 扫描主机和端口 4.2 批量扫描 4.3 插件扫描 4.4 输出结果 5、总结 1、KALI LINUX 简介 Kali Linux 是一个功能强大、多才多艺…

【会议征稿,ACM出版】2024年云计算与大数据国际学术会议(ICCBD 2024,7月26-28)

2024年云计算与大数据国际学术会议(ICCBD 2024)将于2024年7月26-28日在中国大理召开。ICCBD 2024将围绕“云计算与大数据”的最新研究领域, 旨在为从事研究的专家、学者、工程师和技术人员提供一个国际平台&#xff0c;分享科研成果和尖端技术&#xff0c;了解学术发展趋势&…

潮玩宇宙大逃杀APP系统开发成品案例分享指南

这是一款多人游戏&#xff0c;玩家需要选择一个房间躲避杀手。满足人数后&#xff0c;杀手会随机挑选一个房间杀掉里面所有的参与者&#xff0c;其他房间的幸存者将平均瓜分被杀房间的元宝。玩家在选中房间后&#xff0c;倒计时结束前可以自由切换不同房间。 软件项目开发成品…

B3981 [信息与未来 2024] 图灵完备

题目描述 &#xff08;你不需要看懂这张图片&#xff1b;但如果你看懂了&#xff0c;会觉得它很有趣。&#xff09; JavaScript 是一种功能强大且灵活的编程语言&#xff0c;也是现代 Web 开发的三大支柱之一 (另外两个是 HTML 和 CSS)。灵活的 JavaScript 包含“自动类型转换…

python-jupyter notebook安装教程

&#x1f308;所属专栏&#xff1a;【python】✨作者主页&#xff1a; Mr.Zwq✔️个人简介&#xff1a;一个正在努力学技术的Python领域创作者&#xff0c;擅长爬虫&#xff0c;逆向&#xff0c;全栈方向&#xff0c;专注基础和实战分享&#xff0c;欢迎咨询&#xff01; 您的…

【ARM 安全系列介绍 3.7 -- SM4 对称加密算】

请阅读【嵌入式开发学习必备专栏 Cache | MMU | AMBA BUS | CoreSight | Trace32 | CoreLink | GCC | CSH | Armv8/v9 系统异常分析】 文章目录 SM4 加密算法简介SM4 工作模式算法步骤加密举例注意事项 Principle of SM4 encryption algorithm SM4 加密算法简介 SM4是一种分组…

填表统计预约打卡表单系统(FastAdmin+ThinkPHP+UniApp)

填表统计预约打卡表单系统&#xff1a;一键搞定你的预约与打卡需求​ 填表统计预约打卡表单系统是一款基于FastAdminThinkPHPUniApp开发的一款集信息填表、预约报名&#xff0c;签到打卡、活动通知、报名投票、班级统计等功能的自定义表单统计小程序。 &#x1f4dd; 一、引言…

Docker镜像技术剖析

目录 1、概述1.1 什么是镜像&#xff1f;1.2 联合文件系统UnionFS1.3 bootfs和rootfs1.4 镜像结构1.5 镜像的主要技术特点1.5.1 镜像分层技术1.5.2 写时复制(copy-on-write)策略1.5.3 内容寻址存储(content-addressable storage)机制1.5.4 联合挂载(union mount)技术 2.机制原理…

以keepalived为例说明程序不能正常被gdb调试的原因

现象 通过gdb att $keepalived_pid发起对当前运行keepalived的调试&#xff1b; 在放行keepalived继续执行后&#xff0c;想通过CtrlC按键中断执行&#xff0c;观察下被调试程序的当前内部状态&#xff0c; 但是&#xff0c;在终端输入CtrlC后&#xff0c;导致keepalived被调…

靠这10个神级搜书网站,实现你电子书自由(含有声书资源)!

2024搜书利器大盘点&#xff0c;让你轻松找到心仪的电子书&#xff0c;你想要的都有&#xff01;竟然还有有声书&#xff01;速度收藏&#xff0c;这一次&#xff0c;让你实现电子书自由&#xff01; 阿星今天又来给你们送大礼了&#xff01;这次不是别的&#xff0c;是搜书网…

亚马逊测评自养号误区解析

大家都知道亚马逊的评价对产品listing曝光和流量是有很大影响&#xff0c;但是亚马逊的评价又不是那么容易获取的&#xff0c;再加上亚马逊平台风控的不断严苛&#xff0c;所以卖家们想尽办法打造爆款listing是每个亚马逊卖家共同的目标&#xff0c;尤其是当旺季到来时&#xf…

SSM名城养老院管理系统-计算机毕业设计源码03948

目 录 摘要 1 绪论 1.1选题的意义 1.2研究现状 1.3Vue.js 主要功能 1.4ssm框架介绍 2 1.5论文结构与章节安排 3 2 名城养老院管理系统分析 4 2.1 可行性分析 4 2.2 系统流程分析 4 2.2.1数据增加流程 5 2.3.2数据修改流程 5 2.3.3数据删除流程 5 2.3 系统功能分析 5 2.3.…

FreeRTOS移植:STM32L476 nucleo-L476RG 开发板《03》

系列文章 FreeRTOS移植&#xff1a;STM32L476 nucleo-L476RG 开发板《01》 FreeRTOS移植&#xff1a;STM32L476 nucleo-L476RG 开发板《02》 说明 在上篇 FreeRTOS移植&#xff1a;STM32L476 nucleo-L476RG 开发板《02》 开始移植适配 FreeRTOS&#xff0c;FreeRTOS 移植适配…

python-02

面向对象 Python中把具有相同属性和方法的对象归为一个类。 class ClassName: 语句 class Myclass: # 定义类Myclassdef pp(self): # 定义方法pp()print("这是产品说明书")myclass Myclass() # 实例化类Myclass myclass.pp() # 调用Myclass中的方法pp()打印…

深入浅出 Babel:现代 JavaScript 的编译器

在现代前端开发中&#xff0c;JavaScript 的版本更新速度非常快&#xff0c;新的语法和特性层出不穷。然而&#xff0c;旧版本的浏览器并不总是支持这些新特性。为了确保代码的兼容性和稳定性&#xff0c;我们需要一个工具来将现代 JavaScript 代码转换为旧版本的代码。Babel 就…

【Windchill监听器、队列、排程】

目录 Windchill监听器 监听器的概念 监听器的监听器实现原理 监听器的客制化 Windchill队列、排程 队列、排程的概念 Windchill常见出厂队列 自定义队列 Windchill 11新增功能 Windchill监听器 监听器的概念 监听器&#xff0c;字面上的理解就是监听观察某个事件&…

二进制中的相反数

相反数的本质 相反数的本质是两数相加等于 0&#xff0c;1 加上 1 的相反数-1 永远等于 0。 二进制中取相反数的公式 对于二进制运算来说减法是通过加上一个负数实现的&#xff0c;所以想要达成两数相加等于 0 的情况一定是通过溢出来实现。两数相加等于 0 可以带入为 1111…

部署LVS-DR群集...

目录 最后一台主机&#xff08;第四台&#xff09; 本地yum源安装httpd&#xff08;非必做&#xff09; 继续开始从最后一台主机开始&#xff08;第四台&#xff09; 转第二台主机 转第三台主机 回第二台 上传 转第三台主机 上传 回第二台 转第三台 转第一台主机…

一文看懂:数据湖、数据仓库、数据中台,浅显直白!

许多初入数据分析和数据可视化行业的人&#xff0c;对一些概念的认知往往很模糊&#xff0c;贝格前端工场截借此机会给大家讲解一下数据湖、数据仓库和数据中台的概念&#xff0c;力求浅显易懂。 一、什么是数据湖 数据湖是一种用于存储大量原始数据的存储系统&#xff0c;它…

拓扑排序、关键路径(AOV、AOE网)

拓扑排序&#xff08;AOV网&#xff09; 相关知识 在现代化管理中&#xff0c;人们常用有向图来描述和分析一项工程的计划和实施过程&#xff0c;一个工程常被分为多个小的子工程&#xff0c;这些子工程被称为活动&#xff08;Activity)。 在有向图中若以顶点表示活动&#xff…