Kafka生产者概述

news2024/12/26 22:02:41

【Kafka】Kafka生产者概述

文章目录

  • 【Kafka】Kafka生产者概述
    • 1. 生产者
      • 1.1 生产者消息发送流程
        • 1.1.1 发送原理
        • 1.1.2 生产者重要参数列表
      • 1.2 异步发送 API
        • 1.2.1 普通异步发送
        • 1.2.2 带回调函数的异步发送

1. 生产者

1.1 生产者消息发送流程

1.1.1 发送原理

在消息发送过程中,涉及到了两个线程—— main 线程和 Sender 线程。在 main 线程 中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator, Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。

image-20230706214345470


1.1.2 生产者重要参数列表

参数名称描述
bootstrap.servers生产者连接集群所需的 broker 地 址 清 单 。 例 如 node1:9092,node2:9092,node3:9092,可以 设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker 里查找到其他 broker 信息。
key.serializer 和 value.serializer指定发送消息的 key 和 value 的序列化类型。一定要写全类名。
buffer.memoryRecordAccumulator 缓冲区总大小,默认 32m
batch.size缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。
acks0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader 收到数据后应答。
-1(all):生产者发送过来的数据,Leader+和 isr 队列 里面的所有节点收齐数据后应答。
默认值是-1,-1 和 all 是等价的。
max.in.flight.requests.per.connection允许最多没有返回 ack 的次数,默认为 5,开启幂等性 要保证该值是 1-5 的数字。
retries当消息发送出现错误的时候,系统会重发消息。retries 表示重试次数。默认是 int 最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1 否则在重试此失败消息的时候,其他的消息可能发送 成功了。
retry.backoff.ms两次重试之间的时间间隔,默认是 100ms
enable.idempotence是否开启幂等性,默认 true,开启幂等性。
compression.type生产者发送的所有数据的压缩方式。默认是 none,也 就是不压缩。
支持压缩类型:none、gzip、snappy、lz4 和 zstd。

1.2 异步发送 API

1.2.1 普通异步发送

**需求:**创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker

创建一个maven功能,导入如下依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>

编写一个不带回调函数的API代码:

public class CustomProducer {
    public static void main(String[] args) {
        //0.配置
        Properties properties = new Properties();
        //连接集群 bootstrap.server
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.66:9092,192.168.101.67:9092,192.168.101.68:9092");
        //指定对应的 key 和 value 的序列化类型(key.serializer,)
        //properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //1.创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        //2.发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "hello,world" + i));
        }

        //3.关闭资源
        kafkaProducer.close();
    }
}

在node1节点中开启kafka消费者 bin/kafka-console-consumer.sh -- bootstrap-server node1:9092 --from-beginning --topic first ,控制台收到如下消息:

image-20230706224656919


1.2.2 带回调函数的异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试

public class CustomProducerCallback {
    public static void main(String[] args) {
        //0.配置
        Properties properties = new Properties();
        //连接集群 bootstrap.server
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.66:9092,192.168.101.67:9092,192.168.101.68:9092");
        //指定对应的 key 和 value 的序列化类型(key.serializer,)
        //properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //1.创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        //2.发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "hello,world" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主题:" + recordMetadata.topic() + "\t分区" + recordMetadata.partition());
                    }
                }
            });
        }

        //3.关闭资源
        kafkaProducer.close();
    }
}

运行程序之后,node1节点收到消息,idea控制台输出如下消息:

image-20230706225153138


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/725927.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

企业数字化转型成功的标准是什么?

​近几年来&#xff0c;数字化转型一直是企业管理者的热议话题&#xff0c;那么&#xff0c;到底该以什么标准来衡量转型成效&#xff0c;又如何向管理者交出一份满意的答卷呢&#xff1f; 为了确保转型效果与目标的一致性&#xff0c;在规划数字化转型之初&#xff0c;选择正…

JVM — JDK11垃圾回收器 ZGC

1. ZGC介绍 ZGC&#xff08;The Z Garbage Collector&#xff09;是 JDK 11 中推出的一款低延迟垃圾回收器&#xff0c;为实现以下几个目标而诞生的垃圾回收器&#xff0c;停顿时间不超过 10ms&#xff0c;停顿时间不会因堆变大而变长&#xff0c;支持 8MB~4TB 级别的堆&#…

【专题速递】MD-VQA、AB实验、音视频质量建设以及在手机上的应用

// 怎样才能更好地进行QoE优化&#xff1f;音视频技术在用户侧的挑战又是什么&#xff1f;7月29日LiveVideoStackCon上海站QoE与数据驱动专场&#xff0c;为您解答。 QoE与数据驱动 在音视频应用里&#xff0c;获得了大量的用户上报数据&#xff0c;包括但不限于音视频质量数…

ASM汇编语言环境安装

以前是学习过8位单片机的&#xff0c;忘记的差不多了。现在需要使用64位的汇编语言&#xff0c;准备重新学习。 64位的编程环境使用ebe&#xff0c;sf上有的下载&#xff1a; 这个软件不错&#xff0c;可以调试64位的汇编语言&#xff0c;寄存器也可以实时查看。 32位的编程开…

pcl经典算法60例——(1)打开并显示点云,窗口PCLVisualizer嵌入MFC的picture control

一、搭建MFC框架 1、环境说明 本教程为vs2022&#xff0c;pcl1.12.1版本&#xff0c;其他版本自己进行适当修改&#xff0c;仅供参考。 2、方法步骤 (1)新建项目&#xff0c;选择“基于对话框”&#xff0c;然后点击“下一步” 二、配置pcl环境 关于配置环境&#xff0c;网…

centos安装常见软件

安装git # 方式一&#xff1a;yum install git -y# 方式二&#xff1a;&#xff08;开发会用的软件&#xff09;yum -y groupinstall "Development tools"# 执行下面这条yum install openssl-devel bzip2-devel expat-devel gdbm-devel readline-devel sqlite-devel…

点云实战及Python路径实验

点云实战 文章目录 点云实战python 有关路径实验下一级目录上一级目录 学习PointNet论文&#xff08;https://arxiv.org/abs/1612.00593&#xff09;并实践 python .\show_seg.py --dataset ../data/shapenet --model .\seg\seg_model_Chair_2.pth python 有关路径实验 下一级…

kotlin Flow系列之 - 冷流SafefFlow源码解析之 - Safe在那里?

本文涉及源码基于kotlinx-coroutines-core-jvm:1.7.1 kotlin 协成系列文章: 你真的了解kotlin中协程的suspendCoroutine原理吗? Kotlin Channel系列&#xff08;一&#xff09;之读懂Channel每一行源码 kotlin Flow系列之-SharedFlow源码解析 kotlin Flow系列之-StateFlow源码…

【java】隐藏手机号中间四位

String phone "12334543437";phone phone.replaceAll("(\\d{3})\\d{4}(\\d{4})", "$1****$2");System.out.println(phone);

Mycat【什么是Mycat、Mycat与其他中间件区别、Mycat应用场景、核心概念详解、Mycat原理】(一)-全面详解(学习总结---从入门到深化)

目录 Mycat概述_什么是Mycat Mycat概述_Mycat与其他中间件区别 Mycat概述_Mycat应用场景 Mycat概念_核心概念详解 Mycat概述_Mycat原理 Mycat部署安装_MySQL主从复制概述 Mycat概述_什么是Mycat 什么是Mycat Mycat是数据库中间件&#xff0c;所谓中间件数据库中间件是连…

CentOS详细安装教程

文章目录 前言一、下载所需版本的 CentOS二、创建虚拟机三、安装 CentOS 前言 本文在虚拟机上安装 CentOS Linux release 7.6.1810 版本的操作系统&#xff0c;仅作为安装记录。 一、下载所需版本的 CentOS 1、进入 CentOS 官网&#xff1a;https://www.centos.org/download/…

React类组件

1. React组件 将页面按照界面功能进行拆分&#xff0c;每一块界面都拥有自己的独立逻辑&#xff0c;这样可以提高项目代码的可维护性。其中React组件分为两种&#xff0c;一种是类式组件&#xff0c;一种是函数式组件。这里我们将的是比较常用的类式组件&#xff0c;但是在后续…

《安全软件开发框架(SSDF) 1.1:降低软件漏洞风险的建议》解读(六)

安全软件开发框架SSDF是由美国国家标准与技术研究院发布的关于安全软件开发的一组实践&#xff0c;帮助开发组织减少发布的软件中的漏洞数量&#xff0c;减少利用未检测到或未解决的漏洞的潜在影响&#xff0c;从根本上解决漏洞防止再次发生。本文根据《Secure Software Develo…

dedecms后台数据库备份迁移流程

dedecms网站正常使用需要两部分,网站文件和数据库.两者缺一不可. dedecms上传网站文件后,还要导入数据库,如果您只有网站后台备份,没有其他格式sql备份文件,请按照下面流程重装dedecms,并操作恢复数据库 . 需要选确定/wwwroot/data/backupdata/下是否有对应备份 如不存在备份…

机器学习面试题 - 模型评估2

目录标题 8、为什么在一些场景中要使用余弦相似度而不是欧氏距离&#xff1f;9、余弦距离是否是一个严格定义的距离?10、在对模型进行过充分的离线评估之后&#xff0c;为什么还要进行在线A/B测试&#xff1f;11、如何进行线上A/B测试&#xff1f;12、如何划分实验组和对照组1…

ChatGPT落地场景探索-数据库与大模型

目录 openGauss介绍 openGauss介绍 数据库与大模型 openGauss介绍 大模型与数据库 大模型为数据库带来的机遇 大模型解决数据库问题的挑战 数据库为大模型带来的价值 大模型大模型的发展趋势 趋势产品&#xff1a;Chat2DB 简介 特性 生产应用&#xff1a;基…

运输层:TCP超时重传时间的选择

1.运输层&#xff1a;TCP超时重传时间的选择 笔记来源&#xff1a; 湖科大教书匠&#xff1a;TCP超时重传时间的选择 声明&#xff1a;该学习笔记来自湖科大教书匠&#xff0c;笔记仅做学习参考 若将超时重传时间RTO < \lt < 报文段0的往返时间RTT0&#xff0c;则会出现…

ArcGIS、ENVI、InVEST、FRAGSTATS等多技术融合提升环境、生态、水文、农业、大气等领域数据分析

查看原文>>> ArcGIS、ENVI、InVEST、FRAGSTATS等多技术融合提升环境、生态、水文、农业、大气等领域数据分析 【内容简述】&#xff1a; 注&#xff1a;请提前自行配置上课环境 【其它相关推荐】&#xff1a; GEE遥感云大数据林业应用典型案例实践及GPT模型应用 基…

【Ubuntu】apt 更换阿里源

Ubuntu apt 更换阿里源 1.Ubuntu apt 更换阿里源1.1 找到阿里官方相关镜像1.2 找到apt的源管理文件1.3 使用命令替换其源地址1.4 刷新源信息1.5 Failed to fetch http://xxxxxxx Temporary failure resolving ‘mirrors.aliyun.com‘ 1.Ubuntu apt 更换阿里源 1.1 找到阿里官方…

Java 设计模式 随笔1 监听器/观察者

0. 不由自主&#xff0c;恍恍惚惚&#xff0c;又走回头路&#xff0c;再看一眼有过的幸福… 太棒了流沙!!! 0.1 引用 https://blog.csdn.net/majunzhu/article/details/100869562 ApplicationEvent事件机制源码分析 单机环境下优雅地使用事件驱动进行代码解耦 1. JDK 1.1 …