Kafka原生API使用Java代码-生产者-分区策略-默认分区策略轮询分区策略

news2024/11/23 22:33:09

文章目录

  • 1、代码演示
  • 1.1、pom.xml
  • 1.2、KafkaProducerPartitioningStrategy.java
    • 1.2.1、ProducerConfig.LINGER_MS_CONFIG取 0 值得情况,不轮询
    • 1.2.2、ProducerConfig.LINGER_MS_CONFIG取 0 值得情况,轮询
    • 1.2.3、ProducerConfig.LINGER_MS_CONFIG取 1000 值得情况,轮询
    • 1.2.4、ProducerConfig.LINGER_MS_CONFIG取 1000 值得情况,不轮询
  • 2、分区策略
    • 2.1、linger.ms参数的含义
    • 2.2、linger milliseconds
    • 2.3、linger.ms配置参数的理解

1、代码演示

1.1、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Generated by https://start.springboot.io -->
    <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
    <groupId>com.atguigu.kafka</groupId>
    <artifactId>kafka-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-producer</name>
    <description>kafka-producer</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

1.2、KafkaProducerPartitioningStrategy.java

1.2.1、ProducerConfig.LINGER_MS_CONFIG取 0 值得情况,不轮询

不等待,不轮询,默认分区策略

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.UUID;
public class KafkaProducerPartitioningStrategy {

    public static void main(String[] args) {
        // 初始化Kafka生产者配置
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口
        // 确认消息写入策略,"acks"设置为all表示所有副本都确认消息接收后才会响应
        props.put("acks", "all");
        // 消息发送失败时的重试次数,设置为0表示不重试
        props.put("retries", 0);
        // 发送缓冲区等待时间,等待1秒后,发送
        //props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定键的序列化器
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器
        // 分区器选择RoundRobinPartitioner,实现消息在主题分区间的轮询分配
        //props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"  );
        // 创建Kafka生产者实例
        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 1000; i++) {
            producer.send(new ProducerRecord<String, String>("my_topic3", "88"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("recordMetadata.partition() = " + recordMetadata.partition());
                    }
                }
            });

        }
        producer.close();
    }
}

在这里插入图片描述

1.2.2、ProducerConfig.LINGER_MS_CONFIG取 0 值得情况,轮询

不等待,立即发送,轮询策略

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.UUID;
public class KafkaProducerPartitioningStrategy {

    public static void main(String[] args) {
        // 初始化Kafka生产者配置
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口
        // 确认消息写入策略,"acks"设置为all表示所有副本都确认消息接收后才会响应
        props.put("acks", "all");
        // 消息发送失败时的重试次数,设置为0表示不重试
        props.put("retries", 0);
        // 发送缓冲区等待时间,设置为0表示不等待,立即发送
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);   
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定键的序列化器
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器
        // 分区器选择RoundRobinPartitioner,实现消息在主题分区间的轮询分配
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"  );
        // 创建Kafka生产者实例
        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 1000; i++) {
            producer.send(new ProducerRecord<String, String>("my_topic3", "88"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("recordMetadata.partition() = " + recordMetadata.partition());
                    }
                }
            });

        }
        producer.close();
    }
}

在这里插入图片描述

1.2.3、ProducerConfig.LINGER_MS_CONFIG取 1000 值得情况,轮询

等待1秒后发送,轮询策略

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.UUID;
public class KafkaProducerPartitioningStrategy {

    public static void main(String[] args) {
        // 初始化Kafka生产者配置
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口
        // 确认消息写入策略,"acks"设置为all表示所有副本都确认消息接收后才会响应
        props.put("acks", "all");
        // 消息发送失败时的重试次数,设置为0表示不重试
        props.put("retries", 0);
        // 发送缓冲区等待时间,等待1秒后,发送
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定键的序列化器
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器
        // 分区器选择RoundRobinPartitioner,实现消息在主题分区间的轮询分配
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"  );
        // 创建Kafka生产者实例
        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 1000; i++) {
            producer.send(new ProducerRecord<String, String>("my_topic3", "88"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("recordMetadata.partition() = " + recordMetadata.partition());
                    }
                }
            });

        }
        producer.close();
    }
}

在这里插入图片描述

1.2.4、ProducerConfig.LINGER_MS_CONFIG取 1000 值得情况,不轮询

等待1秒后发送,不轮询

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.UUID;
public class KafkaProducerPartitioningStrategy {

    public static void main(String[] args) {
        // 初始化Kafka生产者配置
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口
        // 确认消息写入策略,"acks"设置为all表示所有副本都确认消息接收后才会响应
        props.put("acks", "all");
        // 消息发送失败时的重试次数,设置为0表示不重试
        props.put("retries", 0);
        // 发送缓冲区等待时间,等待1秒后,发送
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定键的序列化器
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器
        // 分区器选择RoundRobinPartitioner,实现消息在主题分区间的轮询分配
        //props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"  );
        // 创建Kafka生产者实例
        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 1000; i++) {
            producer.send(new ProducerRecord<String, String>("my_topic3", "88"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("recordMetadata.partition() = " + recordMetadata.partition());
                    }
                }
            });

        }
        producer.close();
    }
}

在这里插入图片描述

2、分区策略

kafka的生产者分区策略

  1. 默认分区策略:减少重新建立分区连接的性能损耗 开发使用最多的分区方式,采用黏性分区,默认向第一次连接上的主题分区发送消息,直到消息累积到 batch.size大小(16kb)
  2. 轮询分区策略:每个分区接收一次消息(linger.ms决定生产者一次批量发送多少条消息 到一个分区中),开发中一定不会用轮询分区策略,顶多自定义,因为轮询性能太差,频繁跟不同的分区建立连接,大数据会用轮询策略

2.1、linger.ms参数的含义

在Kafka的生产者(Producer)配置中,props.put("linger.ms", 1); 这行代码是用于设置生产者的linger.ms参数的。

linger.ms参数的含义是:生产者会在发送消息之前等待更多消息被发送到同一个分区(partition)的额外时间(以毫秒为单位)。这样做的目的是为了提高吞吐量,因为将多个消息批量发送到同一个分区可以减少网络传输的开销和服务器端的I/O开销。

具体来说,当你设置了linger.ms参数(比如设置为1毫秒),Kafka生产者会尝试在发送消息之前等待1毫秒,看看是否还有其他的消息要发送到同一个分区。如果有,这些消息将会被合并成一个批次(batch)一起发送。

注意,设置linger.ms参数可能会增加消息的延迟,因为生产者会等待指定的时间以合并更多的消息。所以,这个参数需要在吞吐量和延迟之间进行权衡。

这里是一个简化的示例,展示如何在使用Java Kafka生产者时设置linger.ms参数:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 设置linger.ms
        props.put("linger.ms", 1);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 发送消息...

        producer.close();
    }
}

在这个示例中,我们创建了一个KafkaProducer对象,并设置了包括linger.ms在内的多个配置参数。然后,你可以使用这个生产者对象来发送消息到Kafka集群。

2.2、linger milliseconds

linger.ms 的英文全称就是 “linger milliseconds”,其中 “linger” 是指延迟或等待,“milliseconds” 是毫秒的意思。

在 Kafka 的 Producer 配置中,linger.ms 参数用于控制 Producer 在发送消息之前等待更多消息到达相同分区(partition)的时间,以便可以将这些消息一起发送,从而提高吞吐量。默认情况下,linger.ms 的值为 0,这意味着 Producer 收到消息后会立即发送,不进行任何延迟。

linger.ms 参数与 batch.size 参数一起使用时,可以实现更复杂的消息发送策略。batch.size 参数定义了单个批次(batch)中允许的最大消息字节数。当 Producer 收到消息时,它会尝试将消息添加到当前批次中。如果linger.ms 大于 0,并且当前批次中的消息数量尚未达到 batch.size 的限制,那么 Producer 会等待 linger.ms 指定的时间,看看是否还有更多的消息要发送到相同的分区。如果有,这些消息将被添加到当前批次中;如果没有,那么在当前时间到达后,Producer 将发送当前批次中的所有消息。

需要注意的是,linger.ms 参数的值应该根据具体的业务场景和性能需求进行调整。较小的值可以提高消息的实时性,但可能会降低吞吐量;较大的值可以提高吞吐量,但可能会增加消息的延迟。因此,在实际应用中需要根据实际情况进行权衡和选择。

2.3、linger.ms配置参数的理解

在Kafka中,linger.ms是一个配置参数,用于控制生产者(producer)在发送消息到broker之前的等待时间,以便将更多的消息累积到同一批次中,从而提高吞吐量。linger.ms的取值可以是任何非负整数,表示毫秒数。

以下是关于linger.ms的一些关键点:

  • 如果linger.ms设置为0,生产者会立即发送消息到broker,不会等待其他消息来累积到同一批次。
  • 如果linger.ms设置为大于0的值,生产者会等待该指定的毫秒数,或者直到达到batch.size(批次大小)的限制,然后将累积的消息作为一个批次发送到broker。
  • 增大linger.ms的值可能会提高吞吐量,因为可以累积更多的消息到同一批次中,减少网络传输的次数。但是,这也会增加消息的延迟。
  • linger.ms的取值可以根据具体的应用场景和需求进行调整。在需要低延迟的场景中,可以将linger.ms设置为较小的值;在可以容忍一定延迟的场景中,可以尝试增大linger.ms的值以提高吞吐量。

综上所述,linger.ms的取值并没有固定的几个选项,而是可以根据实际需求设置为任何非负整数。在配置Kafka生产者时,需要根据具体的业务场景和需求来选择合适的linger.ms值。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1714707.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

英语学习笔记29——Come in, Amy!

Come in, Amy! 进来&#xff0c;艾米&#xff01; shut v. 关严 区别&#xff1a;shut the door 把门关紧 口语&#xff1a;Shut up! 闭嘴&#xff01;    态度强硬&#xff0c;不礼貌 例句&#xff1a;请不要把门关严。    Don’t shut the door, please. bedroom n. …

Android Studio无法改变Button背景颜色解决办法

大家好&#xff0c;我是咕噜铁蛋&#xff01;今天我来和大家探讨一个在Android开发中常见但可能让初学者感到困惑的问题——如何在Android Studio中改变Button的背景颜色。这个问题看似简单&#xff0c;但实际操作中可能会遇到一些意想不到的挑战。接下来&#xff0c;我将从多个…

论文笔记:Vision GNN: An Image is Worth Graph of Nodes

neurips 2022 首次将图神经网络用于视觉任务&#xff0c;同时能取得很好的效果 1 方法 2 架构 在计算机视觉领域&#xff0c;常用的 transformer 通常是 isotropic 的架构&#xff08;如 ViT&#xff09;&#xff0c;而 CNN 更喜欢使用 pyramid 架构&#xff08;如 ResNet&am…

Day 6:2981. 找出出现至少三次的最长特殊子字符串 I

Leetcode 2981. 找出出现至少三次的最长特殊子字符串 I 给你一个仅由小写英文字母组成的字符串 s 。 如果一个字符串仅由单一字符组成&#xff0c;那么它被称为 特殊 字符串。例如&#xff0c;字符串 “abc” 不是特殊字符串&#xff0c;而字符串 “ddd”、“zz” 和 “f” 是特…

计算字符串的长度

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 由于不同的字符所占字节数不同&#xff0c;所以要计算字符串的长度&#xff0c;需要先了解各字符所占的字节数。在Python中&#xff0c;数字、英文、…

万亿应急国债项目之通信指挥类应急装备多链路聚合通信设备在应急行业中的重要作用

万亿应急国债项目的推出&#xff0c;无疑是我国在应急领域的一次重大举措。在这一宏大蓝图中&#xff0c;通信指挥类应急装备的多链路聚合通信设备显得尤为重要&#xff0c;其在应急行业中所发挥的作用&#xff0c;堪称不可或缺的关键一环。 通信指挥是应急响应中的核心环节&a…

【开源项目】Excel数据表自动生成工具v1.0版

一、介绍 Excel数据表自动生成工具是Go语言编写的一款小型工具软件&#xff0c;用于将特定的Excel表格内容导出为多种编程语言的代码或可以直接读取的数据内容。 开源Github地址&#xff1a;https://github.com/SkyCreator/goproj 二、版本v1.0功能概览 1.编程语言支持 目前…

ch2应用层--计算机网络期末复习

2.1应用层协议原理 网络应用程序位于应用层 开发网络应用程序: 写出能够在不同的端系统上通过网络彼此通信的程序 2.1.1网络应用程序体系结构分类: 客户机/服务器结构 服务器: 总是打开(always-on)具有固定的、众所周知的IP地址 主机群集常被用于创建强大的虚拟服务器 客…

【漏洞复现】大华智能物联综合管理平台 fastjson远程代码执行漏洞

0x01 产品简介 大华ICC智能物联综合管理平台对技术组件进行模块化和松耦合&#xff0c;将解决方案分层分级&#xff0c;提高面向智慧物联的数据接入与生态合作能力。 0x02 漏洞概述 由于大华智能物联综合管理平台使用了存在漏洞的Fastson组件,未经身份验让的攻击者可利用 /e…

探索Codigger文件管理器(File Explorer)的创新与实用性

在数字时代&#xff0c;文件资源管理器作为桌面环境中不可或缺的一部分&#xff0c;承担着管理文件和文件夹的重要职责。Codigger文件管理器&#xff08;File Explorer&#xff09;以其独特的创新和实用性&#xff0c;为用户提供了高效、便捷的文件管理体验。 Codigger文件管理…

AVL树的模拟实现

我们上期提到了二叉搜索树&#xff0c;只是简单的讲了一下原理&#xff0c;那么今天我们就讲一下AVL树。 目录 AVL树的概念AVL树的实现AVL树的架构insert插入引用pair对象引进parent指针仅插入数据调节平衡因子情况1&#xff1a;插入在父亲的右边&#xff0c;父亲的平衡因子后…

一种最大重叠离散小波包特征提取和支持向量机的ECG心电信号分类方法(MATLAB 2018)

目前小波分析算法常采用Mallat快速算法。该算法由与滤波器卷积、隔点采样和隔点插零等三个环节组成。由于实际使用的滤波器并不具有理想频域特性&#xff0c;使得在标准二进小波算法中存在着频率混叠和小波系数失真等缺点&#xff0c;在标准二进小波包算法中还存在频带错乱现象…

docker制作高版本jdk17镜像踩坑

1、创建目录并下载jdk上传到服务器中 从jdk官网下载jdk17镜像&#xff0c;提示&#xff1a;下载到本地用xftp上传到服务器&#xff08;速度会快点&#xff09; jdk官网&#xff1a;https://www.oracle.com/java/technologies/downloads/#graalvmjava21 创建目录&#xff0c;将…

MATLAB分类与判别模型算法:基于Fisher算法的分类程序【含Matlab源码 MX_002期】

算法思路介绍&#xff1a; 费舍尔线性判别分析&#xff08;Fishers Linear Discriminant Analysis&#xff0c;简称 LDA&#xff09;&#xff0c;用于将两个类别的数据点进行二分类。以下是代码的整体思路&#xff1a; 生成数据&#xff1a; 使用 randn 函数生成随机数&#x…

P10-P11【重载,模板,泛化和特化】【分配器的实现】

三类模板&#xff08;类模板&#xff09;&#xff08;函数模板&#xff09;&#xff08;成员函数模板&#xff09; 特化 偏特化&#xff1a;模板参数个数/模板范围 定义的分配器 以上分配器的性能和内存管理有很大不足&#xff08;在分配内存时&#xff0c;会产生很大的内存开…

UE5中绘制饼状图

饼状图 使用UE绘制前提完整的创建过程123456678 附录代码.h代码.c代码 使用UE绘制前提 EPIC Game使用的版本是Unreal Engine 5.0.3。 没有使用其他额外的插件&#xff0c;使用的是C和Ui共同绘制。 C编译器使用的是VS2019。 完整的创建过程 1 首先在UE中随意一种项目的白色。…

博物馆三维实景vr展示

VR技术应用到地产行业的优势不言而喻&#xff0c;随着购房政策的进一步放宽&#xff0c;购房刚需者借助VR商铺样板间展示系统看房&#xff0c;远比之前跑楼盘更便捷高效。那么VR商铺全景展示具体有哪些好处呢? VR技术与商铺的结合&#xff0c;为客户带来了前所未有的购房体验。…

python基础知识总结(第一节)

一、python简介&#xff1a; Python是一种解释型&#xff0c;面向对象的高级语言。 Pyhton的语法和动态类型&#xff0c;以及解释性语言的本质&#xff0c;使它一跃成为多数平台上写脚本和快速开发应用的编程语言。 python语言百度百科介绍 二、Python基础语法&#xff1a;…

国产操作系统上apt命令详解 _ 统信 _ 麒麟 _ 中科方德

原文链接&#xff1a;国产操作系统上apt命令详解 | 统信 | 麒麟 | 中科方德 Hello&#xff0c;大家好啊&#xff01;今天给大家带来一篇在国产操作系统上使用apt命令的详解文章。apt&#xff08;Advanced Package Tool&#xff09;是Debian及其衍生发行版&#xff08;如统信UOS…

宁盾身份域管与天翼云电脑完成兼容互认证

近日&#xff0c;宁盾身份域管与天翼云电脑完成兼容互认证&#xff01;这是继中兴、深信服、升腾威讯云桌面/云电脑后&#xff0c;宁盾对接的第4个国产云桌面品牌。企业在引入国产云桌面时&#xff0c;同时会考虑微软AD目录的替代方案。宁盾国产化身份域管对接天翼云电脑从终端…