Kafka生产者——消息发送流程,同步、异步发送API

news2025/1/18 17:07:59

生产者消息发送流程

发送原理

Kafka的Producer发送消息采用的是异步发送的方式。
在消息发送的过程中,涉及到了两个线程:main线程和Sender线程,以及一个线程共享变量:RecordAccumulator。
①main线程中创建了一个双端队列RecordAccumulator,将消息发送给RecordAccumulator。
②Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
在这里插入图片描述

  • batch.size: 只有数据积累到batch.size之后,sender才会发送数据。默认16k
  • linger.ms: 如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。
    0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader收到数据后应答。-1 ( al1) :生产者发送过来的数据,Leader和和ISR队列里面的所有节点收齐数据后应答。-1和al1等价。

生产者重要参数列表

  • bootstrap.servers: 生产者连接集群所需的broker地址清单。可以设置1个或者多个,中间用逗号隔开。生产者从给定的broker里查找到其他broker信息。
  • key.serializer、 value.serializer: 指定发送消息的key和value的序列化类型。要写全类名。(反射获取)
  • buffer.memory: RecordAccumulator缓冲区总大小,默认32m。
  • batch.size: 缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
  • linger.ms: 如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。
  • acks:
    0:生产者发送过来的数据,不需要等数据落盘应答。
    1:生产者发送过来的数据,Leader数据落盘后应答。
    -1(all):生产者发送过来的数据,Leader和isr队列里面的所有节点数据都落盘后应答。默认值是-1
  • max.in.flight.requests.per.connection: 允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。
  • Retries(重试): 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
  • retry.backoff.ms: 两次重试之间的时间间隔,默认是100ms。
  • enable.idempotence: 是否开启幂等性,默认true,开启幂等性。
  • compression.type 生产者发送的所有数据的压缩方式。默认是none,不压缩。 支持压缩类型:none、gzip、snappy、lz4和zstd。

异步发送API

普通异步发送

  1. 需求:创建Kafka生产者,采用异步的方式发送到Kafka broker

  2. 异步发送流程如下:
    在这里插入图片描述
    batch.size: 只有数据积累到batch.size之后,sender才会发送数据。默认16k
    linger.ms: 如果数据迟迟未达到batch.size,sender等待lingerms设置的时间到了之后就会发送数据。单位ms,默认值是Oms,表示没有延迟。

  3. 代码编写
    1)创建工程kafka-demo
    2)导入依赖

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
</dependencies>

3)创建包名:com.taohua.kafka.producer
4)编写代码:不带回调函数的API

package com.taohua.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) throws InterruptedException {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息
        properties.put("bootstrap.servers","hadoop102:9092");

        // key,value序列化
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));
        }

        // 5. 关闭资源
        kafkaProducer.close();
    }
} 

5)测试:
在hadoop102上开启kafka消费者

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

在IDEA中执行上述代码,观察hadoop102消费者输出

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
kafka0
kafka1
kafka2
kafka3
……

带回调函数的异步发送

  1. 回调函数callback()会在producer收到ack时调用,为异步调用。
    该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。
    ·如果Exception为null,说明消息发送成功,
    ·如果Exception不为null,说明消息发送失败。
  2. 带回调函数的异步调用发送流程
    在这里插入图片描述
    batch.size: 只有数据积累到batch.size之后,sender才会发送数据。默认16k
    linger.ns: 如果数据迟迟未达到batch.size,sender等待linger:ms设置的时间到了之后就会发送数据。单位ms,默认值是Oms,表示没有延迟。
  3. 编写代码:带回调函数的生产者
package com.taohua.kafka.producer;

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class CustomProducerCallback {
    public static void main(String[] args) throws InterruptedException {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息
        properties.put("bootstrap.servers", "hadoop102:9092");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key,value序列化(必须)
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            // 添加回调
            kafkaProducer.send(new ProducerRecord<>("first", "kafka" + i), new Callback() {
                // 该方法在Producer收到ack时调用,为异步调用
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) 
                        // 没有异常,输出信息到控制台
                        System.out.println("主题"+recordMetadata.topic() +", 分区:"+recordMetadata.partition()+", 偏移量:"+recordMetadata.offset());
                }
            });
        }

        // 5. 关闭资源
        kafkaProducer.close();
    }
}
  1. 测试
    1)在hadoop102上开启kafka消费者
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

2)在IDEA中执行代码,观察hadoop102消费者输出

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
kafka0
kafka1
kafka2
……

3)在IDEA控制台观察回调信息

主题first, 分区:0, 偏移量:10
主题first, 分区:0, 偏移量:11
主题first, 分区:0, 偏移量:12
主题first, 分区:0, 偏移量:13
主题first, 分区:0, 偏移量:14
主题first, 分区:0, 偏移量:15
主题first, 分区:0, 偏移量:16
主题first, 分区:0, 偏移量:17
主题first, 分区:0, 偏移量:18
主题first, 分区:0, 偏移量:19
……

同步发送API

  1. 同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
    由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可。
  2. 同步发送流程示意图如下:
    在这里插入图片描述
    batch.size: 只有数据积累到batch.size之后,sender才会发送数据。默认16k
    linger.ns: 如果数据迟迟未达到batch.size,sender等待linger:ms设置的时间到了之后就会发送数据。单位ms,默认值是Oms,表示没有延迟。
  3. 编写代码:同步发送消息的生产者
package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ConsumerProducerSync {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息
        //properties.put("bootstrap.servers","hadoop102:9092");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        // key,value序列化(必须)
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {

            // 同步发送
            kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();
        }

        // 5. 关闭资源
        kafkaProducer.close();
    }
}

  1. 测试

1)在hadoop102上开启kafka消费者

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

2)在IDEA中执行代码,观察hadoop102消费者的消费情况

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
kafka0
kafka1
kafka2
……

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/155528.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring Boot 创建和使用

Spring Boot 创建和使用一、什么是 Spring Boot二、Spring Boot 优点三、Spring Boot 项目创建3.1 使用 Idea 创建验证3.2 网页版创建四、项目目录介绍五、约定大于配置 (重要)5.1 启动类5.2 自定义类在目录中的位置一、什么是 Spring Boot Spring 的诞⽣是为了简化 Java 程序…

《架构300讲》学习笔记(51-100)

前言 内容来自B站IT老齐架构300讲内容。 053动静分离 静态数据&#xff1a;无个性化的数据&#xff0c;静态文件&#xff0c;低频变动的数据。 动态数据&#xff1a;个性化推荐&#xff0c;高频写。 有效的区分页面中的动静数据是优化的关键前提。 页面伪静态化技术&#x…

【Leetcode】308. 二维区域和检索 - 可变

一、题目 1、题目描述 给你一个 2D 矩阵 matrix&#xff0c;请计算出从左上角 (row1, col1) 到右下角 (row2, col2) 组成的矩形中所有元素的和。 实现 NumMatrix 类&#xff1a; NumMatrix(int[][] matrix) 用整数矩阵 matrix 初始化对象。void update(int row, int col, i…

OpenCv相机标定——圆形标定板标定

提取角点时与黑白棋盘格差别主要在于寻找角点的函数&#xff0c;只需将第一章内第二段代码 ret, corners1 cv.findChessboardCorners(img_gray, (w, h)) # 寻找内角点改为 ret, corners1 cv.findCirclesGrid(img_gray, (w, h)) # 寻找内角点&#xff0c;更详细的内容参考第一…

盘点| 能够实现小程序开发提效的框架/工具有这些

近年来&#xff0c;为了研发效率的提升&#xff0c;技术高频革新&#xff0c;开发者们纷纷表示&#xff1a;“好是好&#xff0c;就是快学不动了&#xff01;”。开发者们在不断学习新语言、框架、工具等内容的同时&#xff0c;也在担心所学是否真正有用。而小程序其实能够帮助…

9、Javaweb_http响应概念Response+验证码案例ServletContext+文件下载

HTTP协议&#xff1a; 1. 请求消息&#xff1a;客户端发送给服务器端的数据 * 数据格式&#xff1a; 1. 请求行 2. 请求头 3. 请求空行 4. 请求体 2. 响应消息&#xff1a;服务器端发送给客户端的数据 * 数据格式&#xff1a; …

Nginx 常用配置、操作详解

学习每个技术都要有目标&#xff0c;比如说要源码精通gRPC实现原理&#xff0c;要熟练应用Prometheus、Gin&#xff0c;以及Nginx&#xff0c;Nginx个人定位目标是不需要深入了解技术原理、更不要阅读源码&#xff0c;只需要在自己使用的时候能通过本文章快速检索就够了。 在看…

Graphing calculator PRO

Graphing calculator PRO计算器是一个专业的计算器&#xff0c;它也是编译的&#xff0c;也是学生和学生需要的工具。该程序旨在取代大型和昂贵的图形计算。此外&#xff0c;它在手机或广告牌显示屏上以更高的质量显示计算&#xff0c;这使其更易于理解。Mathlab提供的计算器是…

【云原生进阶之容器】第三章List-Watch机制3.1节-- List-Watch机制剖析

1 list-watch机制 1.1 list-watch介绍 Kubernetes 是通过 List-Watch 的机制进行每个组件的协作,保持数据同步的,每个组件之间的设计实现了解耦。 用户是通过 kubectl 根据配置文件,向 APIServer 发送命令,在 Node 节点上面建立 Pod 和 Container。 APIServer 经过 API 调…

回收租赁商城系统功能拆解06讲-商品评价

回收租赁系统适用于物品回收、物品租赁、二手买卖交易等三大场景。 可以快速帮助企业搭建类似闲鱼回收/爱回收/爱租机/人人租等回收租赁商城。 回收租赁系统支持智能评估回收价格&#xff0c;后台调整最终回收价&#xff0c;用户同意回收后系统即刻放款&#xff0c;用户微信零…

Ubuntu20.04 (ROS noetic) 运行 Vins-Fusion

参考博客&#xff1a;Ubuntu20.04 运行 Vins-Fusion&#xff0c;问题没有完全解决&#xff0c;所以自己写了一篇Vins-Fusion 开源地址&#xff1a;https://github.com/HKUST-Aerial-Robotics/VINS-Fusion由于仅支持ROS kinetic 和 melodic&#xff0c;所以在Ubuntu20.04对应的R…

【Nginx】Nginx的安装

1. 基于apt源安装 1.1 安装1.2 测试安装是否成功1.3 卸载 1. 停止nginx服务2. 删除nginx&#xff0c;-purge包括配置文件3. 移除全部不使用的软件包4. 罗列出与nginx相关的软件并删除5. 查看nginx正在运行的进程&#xff0c;如果有就kill掉 2. 通过源码包编译安装 1. 安装各种…

Java 开源开发平台 O2OA V7.3 发布,新增带权限的全文检索等重要功能

O2OA 自产品发布以来&#xff0c;我们收到了很多伙伴对产品的宝贵建议和意见&#xff0c;在 2022 年的最后一个版本里&#xff0c;我们为伙伴们又提供了新的能力&#xff0c;v7.3 版本正式发布&#xff0c;对平台做了更多的优化。一、平台架构新增带权限的全文检索协同办公领域…

MATLAB-拉格朗日插值运算

在结点上给出结点基函数&#xff0c;接着做该基函数的线性组合&#xff0c;组合的系数为结点的函数值,这种插值多项式称为拉格朗日插值公式。通俗地说&#xff0c;就是通过平面上的两个点确定一条直线。该插值方法是一种较为基础的方法&#xff0c;同时该方法也较容易理解与实现…

Go语言结构

Go语言结构 知识主要参考菜鸟教程。 简单实例 Go语言的基础组成有以下几个部分&#xff1a; 包声明引入包函数变量语句 & 发表达式注释 package mainimport "fmt"func main() {/*这是一行注释*/fmt.Println("hello,world") }上述程序各个部分组成&am…

北大硕士LeetCode算法专题课-字符串相关问题

算法面试相关专题&#xff1a; 北大硕士LeetCode算法专题课-数组相关问题_骨灰级收藏家的博客-CSDN博客 北大硕士LeetCode算法专题课---算法复杂度介绍_骨灰级收藏家的博客-CSDN博客 北大硕士LeetCode算法专题课-基础算法之排序_骨灰级收藏家的博客-CSDN博客 反转字符串(Lee…

springcloud3 Nacos的服务搭建和生产消费案例

一 nacos 1.1 nacos概念 Nacos是服务注册发现中心配置中心的组合。比eurka实现的功能更加强大。 nacos默认均有负载均衡的功能&#xff0c;集成了netflix的ribbon代码包。 1.2 nacos与其他进行对别 1.3 nacos的配置 1.4 namespace和group和dataid之间的关系 二 nacos的安…

2023年网络工程师必备10大软件,最新安装包分享

常听人说&#xff1a;拳头再硬&#xff0c;也比不上锤子&#xff01;同样的&#xff0c;作为一个网络工程师只有满腹的技术而不会使用对应的软件工具&#xff0c;是完全不行的。那作为一个2023年的网络工程师必备的软件有哪些呢&#xff1f;以下10大网工必备软件都已整理好安装…

smsalarm怎么读取intouch系统平台的点

有两种方式&#xff0c;分别是DDE和OPC方式 DDE方式 在SMC里面添加SIDIR驱动&#xff0c;连接到1200PLC 在IDE中创建对象DT01并绑定到驱动上 可看到已经可以读取到值了 打开smsalarm 8.26 创建一个DDE连接 创建一个DDE逻辑组 创建一个tag. 名称可以填任意字符串&#xff0c;…

SpringBoot在使用测试的时候是否需要@RunWith?

我们在使用SpringBoot进行测试的时候一般是需要加两个注解&#xff1a; SpringBootTest 目的是加载ApplicationContext&#xff0c;启动spring容器。 RunWith 是一个测试启动器&#xff0c;可以加载SpringBoot测试注解让测试在Spring容器环境下执行。如测试类中无此注解&#…