Kafka-生产者基本使用

news2024/9/28 5:23:36

一、生产者原理

在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。
在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator, Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。

生产者原理.png

sender线程拉取数据参数:

  • batch.size:数据积累到batch.size之后,sender才会发送数据。默认16k
  • linger.ms: 如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间
    到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。

sender线程发送数据
以每个broker结点为key,value为要发送的消息请求,放在一个队列中。

selector
通过selector实现生产者和kafka集群的数据传输。

acks应答机制

  • 0:生产者发送过来的数据,不需要等数据落盘应答。
  • 1:生产者发送过来的数据,Leader收到数据后应答。
  • -1(all):生产者发送过来的数据,Leader和ISR队列 里面的所有节点收齐数据后应答。-1和all等价。

消息发送成功
sender线程中清理掉对应的成功请求,清理掉双端队列中分区的数据。
消息发送失败
再次发送请求,直到成功为止(默认重试次数为int 最大值)

在内存中,由分区器决定消息发送到哪个分区,一个分区对应一个DQueque;
内存大小默认32M,分区一批次的数据大小为16K;

二、生产者重要参数

参数名称描述
bootstrap.servers生产者连接集群所需的 broker地址清单。可以设置1个或者多个,中间用逗号隔开。这里并非需要所有的broker地址,因为生产者从给定的broker 里查找到其他 broker 信息。
key.serializer 和 value.serializer指定发送消息的 key 和 value 的序列化类型。一定要写全类名。
buffer.memoryRecordAccumulator 缓冲区总大小,默认 32M。
batch.size缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。
acks0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader 收到数据后应答。 -1(all):生产者发送过来的数据,Leader+和 isr 队列 里面的所有节点收齐数据后应答。默认值是-1, -1 和 all 是等价的。
max.in.flight.requests.per.connection允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。
retries当消息发送出现错误的时候,系统会重发消息。 retries表示重试次数。默认是 int 最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1 否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms两次重试之间的时间间隔,默认是100ms。
enable.idempotence是否开启幂等性,默认 true,开启幂等性。
compression.type生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。

三、生产者消息发送

1.异步发送

外部数据发送到队列中,不需要等待上一批处理完可以继续发送。
依赖:

<dependencies> 
  <dependency>
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>3.0.0</version>
  </dependency> 
</dependencies>

生产者:

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @Title: MyProducer.java
 * @Package kafka
 * @Description: 生产者
 * @Author: hongcaixia
 * @Date: 2023/1/20 21:24
 * @Version V1.0
 */
public class MyProducer {

    public static void main(String[] args) {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        // 连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092,192.168.0.2:9092");
        // 指定序列化类型 key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "record" + i));
        }

        // 5. 关闭资源
        kafkaProducer.close();
    }

}

2.带回调函数的异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是:

  • 元数据信息(RecordMetadata)(主题,分区等数据)
  • 异常信息(Exception)

如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

package kafka;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * @Title: MyProducer.java
 * @Package kafka
 * @Description: 生产者
 * @Author: hongcaixia
 * @Date: 2023/1/20 21:24
 * @Version V1.0
 */
public class MyCallBackProducer {

    public static void main(String[] args) {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        // 连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092,192.168.0.2:9092");
        // 指定序列化类型 key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "record" + i), (recordMetadata, exception) -> {
                if (exception == null) {
                    System.out.println("主题:" + recordMetadata.topic() + ";分区:" + recordMetadata.partition());
                }
            });
        }

        // 5. 关闭资源
        kafkaProducer.close();
    }
}

3.同步发送

队列中必须上一批数据发送完毕,才可以进行下一批数据处理,继续发送。

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @Title: MyProducer.java
 * @Package kafka
 * @Description: 生产者
 * @Author: hongcaixia
 * @Date: 2023/1/20 21:24
 * @Version V1.0
 */
public class MySyncProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        // 连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092,192.168.0.2:9092");
        // 指定序列化类型 key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "record" + i)).get();
        }

        // 5. 关闭资源
        kafkaProducer.close();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/175120.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Ae 案例:制作粒子空间穿梭动画

本文介绍使用 Ae 插件 Stardust 制作粒子空间穿梭动画的一般方法与步骤。示例视频1、新建合成。持续时间&#xff1a;10 秒。2、新建纯色图层&#xff0c;命名为“Stardust”&#xff0c;然后添加 Stardust 效果。3、再新建一个纯色图层&#xff0c;命名为“Mask”。使用矩形工…

php宝塔搭建部署实战易优宠物用品网站源码

大家好啊&#xff0c;我是测评君&#xff0c;欢迎来到web测评。 本期给大家带来一套php开发的易优宠物用品网站源码&#xff0c;感兴趣的朋友可以自行下载学习。 技术架构 PHP7.2 nginx mysql5.7 JS CSS HTMLcnetos7以上 宝塔面板 文字搭建教程 下载源码&#xff0c;宝…

SpringBoot集成Swagger,前后端接口文档解决方案

一个不断在迭代的项目&#xff0c;Controller层与POJO层肯定会是经常变动的&#xff0c;在目前前后端分离的大环境背景下有一份接口文档可以极大减少项目组成员之间的交流成本&#xff0c;也能支持自动化测试&#xff0c;但靠人工维护该文档总是不够稳妥&#xff0c;因此我们可…

23.1.21打卡 CF-1782D Many Perfect Squares

Problem - D - Codeforces 题外话: 痛苦的 C大模拟写不出D题数论我是真菜没想到, 泪目 -------------------------------------------------------------------------------------------------------------------------------- 先抛开这题, 我们先探究下平方数的规律 1 …

容器虚拟化技术Docker(三)DockerFile、Docker部署微服务、Docker-compose容器编排、Docker监控

容器虚拟化技术Docker&#xff08;三&#xff09;DockerFile、Docker部署微服务、Docker-compose容器编排、Docker监控 不熟悉的docker的可以参考&#xff1a; 容器虚拟化技术Docker&#xff08;一&#xff09;简介、安装、常见命令、数据卷、安装常规软件 容器虚拟化技术Do…

QSslSocket::supportsSsl()返回false问题解决

1.问题的提出今天研究Qt官方自带的有关QSslSocket类用法的例子。该例子存放在Qt安装目录下的Examples\Qt-XX.XX.XX\network\securesocketclient其中XX.XX.XX为Qt的版本号&#xff0c;如&#xff1a;5.14.1。在main函数QSslSocket::supportsSsl()返回false&#xff0c;如下&…

浅析RecyclerView预加载RV-Prefetch 机制

浅析RecyclerView预加载RV-Prefetch 机制 UI渲染基本流程&#xff08;UI-Thread,Render-Thread,SurfaceFlinger&#xff09;(硬件加速开启) 当系统V-Sync信号来临时&#xff0c;会唤醒主线程&#xff0c;回调编舞者Choreographer#FrameDisplayEventReceiver#onVsync()开始这一…

HPC Game小结

PART 1 - 基础知识 一、文件读取 a. 二进制文件 mmap https://stackoverflow.com/questions/44553907/mmap-sigbus-error-and-initializing-the-file fread fwrite //readFILE* fi;if(fi fopen("input.bin", "rb")){fread(&p, sizeof(int), 1, fi)…

JVM调优实战——jvm常用参数及方法

一、创建会内存溢出的程序 pom&#xff1a; <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/ma…

Q_DISABLE_COPY、Q_DISABLE_MOVE、Q_DISABLE_COPY_MOVE用法详解及总结

1.前言在编程中&#xff0c;会用到某些资源&#xff0c;这些资源有的在整个应用程序期间是唯一的&#xff1b;是不能通过拷贝、赋值的方法存在多份的&#xff0c;如STL的std::unique_ptr指针指向的资源。现实中这样的资源有&#xff1a;文件指针、串口句柄等。试想如果存在多个…

TVM: End-to-End Optimization Stack for Deep Learning论文阅读

摘要 很多目前最为流行的深度学习框架&#xff0c;如 TensorFlow、MXNet、Caffe 和 PyTorch&#xff0c;支持在有限类型的服务器级 GPU 设备上获得加速&#xff0c;这种支持依赖于高度特化、供应商特定的 GPU 库。然而&#xff0c;专用深度学习加速器的种类越来越多&#xff0…

数据库系统概念 | 第四章:中级SQL

文章目录&#x1f4da; 连接表达式&#x1f407; 自然连接&#x1f407; 连接条件&#x1f955;natural条件&#x1f955;using 条件&#x1f955;on 条件&#x1f407; 内连接和外连接&#x1f955; 内连接inner join&#x1f955; 外连接outer join&#x1f343; 左外连接lef…

Web 应用渗透测试 00 - 信息收集

背景 这个系列写 Web 应用渗透测试相关的内容。此篇从信息收集开始&#xff0c;看一下 Web 应用端有哪些方面的信息值得渗透测试者去收集&#xff0c;能对后续的行动产生积极的影响。 Web 应用渗透测试 - 信息收集 security.txt 这个文件包含了网站的漏洞披露的联系方式。如…

Java面试题每日10问(18)

Miscellaneous Interview Questions 1. What are the advantages and disadvantages of object cloning? Advantage of Object Cloning You don’t need to write lengthy and repetitive codes. Just use an abstract class with a 4- or 5-line long clone() method.It is t…

二叉树的迭代遍历

二叉树的迭代遍历 前序遍历 基本思路 基本思路其实很简单, 使用递归遍历的时候, 一直是系统帮我们把其他数据压栈, 举个例子 > ans [5,4,6,2,1,null,null] 前序遍历的序列是: [5,4,2,1,6] , 栈的出入顺序是, 先入, 后出, 假如我们想要一个元素先出, 就要让它后入栈 基…

STC12驱动MLX90614红外测温模块在LCD1602显示

文章目录1、基本简介2、通信方式3、参考STC12例程参考文献1、基本简介 2、通信方式 通过芯片手册我们可以了解到这个模块的输出有PWM和SMBus方式&#xff0c;PWM长期做嵌入式开发的已经很熟悉了&#xff0c;那么什么是SMBus呢&#xff1f; SMBus&#xff08;系统管理总线&…

swift(3)

目录 while循环&#xff0c;repeat while循环 String基本操作 Array数组 Set集合 while循环&#xff0c;repeat while循环 import UIKitvar a0 while(a<5){print(a) }简单的while循环&#xff0c;我这一个循环下去&#xff0c;我playground直接被强制退出。 import UIK…

go的基本数据类型转换

目录 1.(整形转化)基本语法 2.小知识 3.基本数据类型和string的转换 A.fmt.Sprintf("%参数", 表达式) B. 使用 strconv 包的函数 4.string和基本数据类型转换 Go在不同类型的变量之间赋值时需要显示转换&#xff0c;不能自动转换 1.(整形转化)基本语法 A.不考…

Elasticsearch7.8.0版本高级查询—— 模糊查询文档

目录一、初始化文档数据二、模糊查询文档2.1、概述2.2、示例12.3、示例2一、初始化文档数据 在 Postman 中&#xff0c;向 ES 服务器发 POST 请求 &#xff1a;http://localhost:9200/user/_doc/1&#xff0c;请求体内容为&#xff1a; { "name":"zhangsan"…

LeetCode 1824. 最少侧跳次数

【LetMeFly】1824.最少侧跳次数 力扣题目链接&#xff1a;https://leetcode.cn/problems/minimum-sideway-jumps/ 给你一个长度为 n 的 3 跑道道路 &#xff0c;它总共包含 n 1 个 点 &#xff0c;编号为 0 到 n 。一只青蛙从 0 号点第二条跑道 出发 &#xff0c;它想要跳到…