Kafka Producer - 分区机制实战

news2025/1/20 14:49:10

Kafka Producer - 分区机制实战

上一篇介绍了kafka Producer 生产者发送数据的程序代码,以及对生产者分区机制的相关介绍,今天继续深入的了解下分区机制的原理、测试验证、自定义分区。

在学习之前先在本地机器搭建一个单机版的双节点集群环境,方便后面做测试,另外本机使用的软件版本信息如下:

  • JDK17
  • kafka_2.13-3.3.1

搭建集群

修改配置

# 1. 在kafka根目录 创建cluster目录
mkdir cluster
# 2. 复制配置文件 模板
cp config/server.properties cluster/server_n1.properties
cp config/server.properties cluster/server_n2.properties
cp config/server.properties cluster/server_n3.properties
# 修改 server_n1.properties 并保存
vim cluster/server_n1.properties
# 更改相关内容如下
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/node1/kafka-logs 
# 修改 server_n2.properties 并保存
vim cluster/server_n2.properties
# 更改相关内容如下
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/node2/kafka-logs 
# 修改 server_n3.properties 并保存
vim cluster/server_n3.properties
# 更改相关内容如下
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/node3/kafka-logs 

启动集群

# 1. 启动zk
./bin/zookeeper-server-start.sh config/zookeeper.properties
# 2. 启动Node1
./bin/kafka-server-start.sh cluster/server_n1.properties
# 3. 启动Node2 
./bin/kafka-server-start.sh cluster/server_n2.properties
# 3. 启动Node3
./bin/kafka-server-start.sh cluster/server_n3.properties

创建topic

# 1. 创建一个分区为3的topic
./bin/kafka-topics.sh --create --topic topic_t3 --bootstrap-server localhost:9092 --partitions 3

# 2. 创建完成后,查看主题信息
./bin/kafka-topics.sh --describe --topic topic_t3 --bootstrap-server localhost:9092

在这里插入图片描述

测试代码

public class SimpleProducer {

    public static void main(String[] args) throws Exception{
        String topicName = "topic_t3";
        Properties props = new Properties();
        //指定kafka 服务器连接地址
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        // 发送失败 重试次数
        props.put("retries", 0);
        // 消息发送延迟时间 默认为0 表示消息立即发送,单位为毫秒
        props.put("linger.ms", 0);
        // 序列化方式
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "message : " + i);
                Future<RecordMetadata> send = producer.send(record);
                RecordMetadata metadata = send.get();
                System.out.println(String.format("sent record(key=%s value=%s) 分区数: %d, 偏移量: %d, 时间戳: %d",
                        record.key(), record.value(),
                        metadata.partition(),metadata.offset(), metadata.timestamp()
                ));
        }
        System.out.println("Message sent successfully");
        producer.close();
    }
}

无分区无Key

ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "message : " + i);

注意上面代码,没有指定分区,Record对象也没有指定key值, Kakfa为了内部的性能考虑,会选取其中一个节点进行发送(避免多节点发送数据造成性能损耗),该机制被称为黏性分区

在这里插入图片描述

无分区指定Key

// ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "message : " + i);
// 替换成以下代码
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, String.valueOf(i) ,"message : " + i);

重新执行,测试结果

在这里插入图片描述

当key不为空时,Kafka默认使用key的hash值,来计算待发送的分区值,核心代码如下

org.apache.kafka.clients.producer.internals.BuiltInPartitioner#partitionForKey

/*
     * Default hashing function to choose a partition from the serialized key bytes
     */
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
  return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}

Round-Robin 机制

// 指定轮训分区算法,将数据均匀的打散在不同的节点
props.put("partitioner.class","org.apache.kafka.clients.producer.RoundRobinPartitioner");

在这里插入图片描述

测试结果如上,整体的效果图如下

在这里插入图片描述

相关配置参数

Kafka 默认给生产者提供了许多的参数,进行分区策略的配置

  • partitioner.class - 默认值为null, 显示指定分区的策略,可以是自定义分区机制
  • partitioner.ignore.keys - 默认为false, 设置为true时,生产者不会使用键来计算分区,注意:如使用自定义分区,则此设置无效
  • partitioner.adaptive.partitioning.enable - 默认为true, 当设置为“true”时,生产者将会想Broker性能好的服务发送更多的消息,。如果为“false”,生产者将尝试统一分发消息。注意:如果使用自定义分区器,则此设置无效
  • partitioner.availability.timeout.ms - 默认值为0,如果Broker 在partitioner.availability.timeout时间内无法处理请求,将会视为该分区无效不可用。如果值为0,则禁用此逻辑。注意:如果使用自定义分区器或分区er.adaptive.partitioning,则此设置无效。enable设置为“false”

自定义分区

程序代码

开发者可以选择实现 org.apache.kafka.clients.producer.Partitioner 接口来自定义分区机制,满足特殊的业务场景需求,接下来利用随机算法实现一个自定义分区的功能。该实现么有任何实际的作用,仅仅只是作为学习使用

public class RandomPartitioner implements Partitioner {
    private Random random = new Random();

    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return random.nextInt(numPartitions);
    }

    public void close() {}
}

测试验证

// 指定自定义的分区策略实现类
props.put("partitioner.class","org.kafka.example.RandomPartitioner");

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/105975.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

80.【Spring5】

Spring《解耦》(一)、Spring 简介1.历史:2.Spring 目的3.Spring 引入4.优点5.Spring 七大模块组成:6.扩展&#xff08;约定大于配置&#xff09;(二)、IOC理论推导(Inversion of Contro)1.以前的三层分级2.现在对三层架构的更新3.什么是IOC(三)、HelloSpring1.怎么使用Spring?…

技术分享 | 缓存穿透 - Redis Module 之布隆过滤器

作者&#xff1a;贲绍华 爱可生研发中心工程师&#xff0c;负责项目的需求与维护工作。其他身份&#xff1a;柯基铲屎官。 本文来源&#xff1a;原创投稿 *爱可生开源社区出品&#xff0c;原创内容未经授权不得随意使用&#xff0c;转载请联系小编并注明来源。 一、场景案例 假…

设计模式-抽象工厂模式

1、什么是抽象工厂模式 抽象工厂&#xff08;AbstractFactory&#xff09;模式的定义&#xff1a;是一种为访问类提供一个创建一组相关或相互依赖对象的接口&#xff0c;且访问类无须指定所要产品的具体类就能得到同族的不同等级的产品的模式结构。抽象工厂模式是工厂方法模式的…

Tiny ImageNet 数据集分享

ImageNet官网上的数据集&#xff0c;动辄就100G&#xff0c;真的是太大了。 有需要Tiny Image Net 数据集的小伙伴可以点击这个下载链接&#xff1a; http://cs231n.stanford.edu/tiny-imagenet-200.zip数据集简介&#xff1a; Tiny ImageNet Challenge 来源于斯坦福 CS231N …

uwb无线定位系统的原理和介绍

uwb无线定位系统是在 uwb平台上部署的定位基站&#xff0c;通过发射无线信号&#xff0c;将 uwb定位系统部署在需要安装的位置&#xff0c;同时结合定位基站所支持工作环境条件&#xff08;如&#xff1a;温度、湿度、光照等&#xff09;和定位算法&#xff0c;实现在不同的地理…

使用elesticsearch-7.10.0版本连接elasticsearch-head

背景&#xff1a; 由于esasticsearch-5.5.1中没有登录&#xff0c;登出的安全校验&#xff0c;在安全测评时&#xff0c;经常被检查到高危漏洞&#xff0c;因此项目经常要升级到es7版本。 问题一&#xff1a;jdk版本不满足要求&#xff0c;提示如下 future versions of Elasti…

Js实现轮盘抽奖功能,一招帮你解决选择困难症

不知道今天自己该吃什么&#xff0c;一招帮你解决选择困难症。 通过htmlcssjs实现一个轮盘抽奖功能。我们可以将平时吃的饭菜输入到代码中&#xff0c;每到纠结的时候只需点开抽一次就可以了。 实现步骤 html代码&#xff1a; 整体实现的结构是一个大的圆形&#xff0c;分成…

热门项目披露:成都双流板桥轨道城市发展有限公司100%股权转让

热门项目披露&#xff1a;成都双流板桥轨道城市发展有限公司100%股权转让&#xff1b;该项目由 西南联合产权交易所 发布&#xff0c;于2022年12月9日被塔米狗平台收录。 项目方 成都双流板桥轨道城市发展有限公司&#xff0c; 成立于 2021年9月7日 &#xff0c; 注册资金 100…

域控制器交付量「翻番」,汽车中间件赛道竞争升级

作为软件定义汽车的关键环节&#xff0c;智能汽车中间件赛道&#xff0c;正在成为兵家必争之地。 从传统IT架构的角度看&#xff0c;中间件位于上层应用和底层操作系统之间&#xff1b;除了基础的通信交互外&#xff0c;中间件还承载着屏蔽底层复杂性的功能&#xff0c;向下适配…

005:UITableView

介绍&#xff1a; 提示&#xff1a;数据量大、样式较为统一、分组的需要以及滚动的需求。 图示&#xff1a; UITableViewDataSource&#xff1a; 提示UITableView作为视图&#xff0c;只负责展示&#xff0c;协助管理&#xff0c;不管数据需要开发者为UITableView提供展示需…

Framework底层原理——Binder调用流程分析

binder是一个非常好的跨进程通信工具&#xff0c;Android对其进行了各种封装&#xff0c;虽然我们用起来简单&#xff0c;但是理解起来却比较困难。 1.自己设计一个跨进程通信机制 在理解binder之前呢&#xff0c;首先我们想一下&#xff0c;如果我们自己设计一个跨进程通信的…

简单Thinkphp5.1如何使用Topsdk\Topapi

一淘模板&#xff08;56admin.cn&#xff09;给大家介绍tp5.1相关知识&#xff0c;其中主要记录tp5.1是怎么使用Topsdk\Topapi&#xff08;对接淘宝客开放平台&#xff09;&#xff0c;希望对需要的朋友有所帮助&#xff01; 1、公司有一项目需要对接淘宝开放平台 先去申请帐号…

tensorrt debug问题汇总

目录 1. Dynamic dimensions required for input: input, but no shapes were provided. Automatically overriding 2. sampleMNIST.obj : error LNK2019: 无法解析的外部符号 cudaStreamCreate 3. Assertion failed: (smVersion &#xff1c; SM_VERSION_A100) &&…

条码管理系统,助力企业打造轻量级数字化车间

在原辅材料供应、生产管理、仓储物流、市场营销等相关业务环节中&#xff0c;采取适当的软硬件技术手段&#xff0c;实时记录产品信息。通过查询可以随时跟踪产品的生产状态、仓储状态和流向&#xff0c;达到可追溯管理的目的。随着制造企业对精细化管理要求的提高&#xff0c;…

【QT开发笔记-基础篇】| 第五章 绘图QPainter | 5.7 画笔设置

本节对应的视频讲解&#xff1a;B_站_视_频 https://www.bilibili.com/video/BV16W4y1g7dM 经过前面几节课的讲解&#xff0c;学会了绘制点、线、多段线、多边形、矩形、圆角矩形 到这里就可以学习画笔和画刷的设置了&#xff0c;本节先讲解画笔的设置 Qt 中画笔的类是 QPen…

正则表达式验证合集

1.定义封装的公共js 在src下定义一个util文件夹&#xff0c;并且定义个validate.js(当然你想取什么名字就什么名字哈哈哈哈) 2.上代码 //邮箱 /*** 邮箱* param {*} s*/ export function isEmail(s) {return /^([a-zA-Z0-9_-])([a-zA-Z0-9_-])((.[a-zA-Z0-9_-]{2,3}){1,2}…

基于FPGA的 矩阵键盘按键识别 【原理+源码】

目录 引言 原理阐述 实现方法 源码分享 板级调试演示 引言 最近了解了矩阵键盘扫描的原理&#xff0c;动手实现了一下&#xff0c;在这里做一个简单的总结。 原理阐述 矩阵键盘典型电路&#xff1a; FPGA的应用电路&#xff1a; 其中&#xff0c;行信号为FPGA输入信号&a…

企业从哪里开始构建弹性 IT 基础架构

混合工作模式扩大了工作范围&#xff0c;增加了 IT 团队的负担&#xff0c;因为他们需要在面对增加的攻击面时保持弹性。入侵企业的 IT 基础架构只需要一个受损的身份。 什么是企业标识&#xff1f; 这些是用户名、密码、网络、端点、应用程序等&#xff0c;充当业务敏感信息…

CheatEngine教程-官方9关

文章目录第一步&#xff1a;环境准备&#xff0c;下载并安装CE第二关&#xff1a;精确扫描数值第三关&#xff1a;未知数值扫描第四关&#xff1a;浮点数的扫描第五关&#xff1a;代码替换功能第六关&#xff1a;关于指针第七关&#xff1a;简单代码注入第八关&#xff1a;查找…

力扣(LeetCode)173. 二叉搜索树迭代器(C++)

设计 根据二叉树的中序遍历的迭代解法&#xff0c;稍改代码&#xff0c;就是本题的解法。 初始化 : 传入了根结点&#xff0c;根据迭代思路&#xff0c;将结点的左链依次入栈。 nextnextnext : 栈顶结点就是所求。根据迭代思路&#xff0c;当前结点要变成栈顶结点的右儿子。由…