分布式 - 消息队列Kafka:Kafka生产者发送消息的3种方式

news2025/1/9 2:00:58

文章目录

    • 1. Kafka 生产者
    • 2. kafaka 命令行操作
    • 3. Kafka 生产者发送消息流程
    • 4. Kafka 生产者发送消息的3种方式
      • 1. 发送即忘记
      • 2. 同步发送
      • 3. 异步发送
    • 5. Kafka 消息对象 ProducerRecord

1. Kafka 生产者

Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多个主题(topic)。生产者可以将消息发送到指定的主题,也可以根据分区策略将消息发送到多个分区中。生产者可以以异步或同步方式发送消息,并且可以配置消息的可靠性和持久性等属性。在 Kafka 中,生产者是消息的源头,它们将消息发送到 Kafka 集群中,供消费者消费。

2. kafaka 命令行操作

① 启动 Zookeeper 集群:

[root@master01 bin]# pwd
/root/ch/soft/zk/zk-01/bin
[root@master01 bin]# ./zkServer.sh start
[root@master01 bin]# pwd
/root/ch/soft/zk/zk-02/bin
[root@master01 bin]# ./zkServer.sh start
[root@master01 bin]# pwd
/root/ch/soft/zk/zk-03/bin
[root@master01 bin]# ./zkServer.sh start

② 启动 kafka 集群:

[root@master01 kafka01]# pwd
/root/ch/soft/kafka/kafka01
[root@master01 kafka01]# bin/kafka-server-start.sh config/server.properties
[root@master01 kafka02]# pwd
/root/ch/soft/kafka/kafka02
[root@master01 kafka02]# bin/kafka-server-start.sh config/server.properties
[root@master01 kafka03]# pwd
/root/ch/soft/kafka/kafka03
[root@master01 kafka03]# bin/kafka-server-start.sh config/server.properties

③ 创建主题 test:

[root@master01 kafka01]#  bin/kafka-topics.sh --zookeeper localhost:2183 --create --partitions 3 --replication-factor 2  --topic test
Created topic test.
[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --describe --topic test
Topic:test      PartitionCount:3        ReplicationFactor:2     Configs:
Topic: test     Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1
Topic: test     Partition: 1    Leader: 0       Replicas: 0,2   Isr: 0,2
Topic: test     Partition: 2    Leader: 1       Replicas: 1,0   Isr: 1,0

④ 生产者发送消息到主题test:

[root@master01 kafka01]# bin/kafka-console-producer.sh --broker-list 10.65.132.2:9093 --topic test
>hello
>你好,kafka!
>

⑤ 消费者消费主题test的消息:

[root@master01 kafka01]#  bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
hello
你好,kafka!

3. Kafka 生产者发送消息流程

在这里插入图片描述

① 首先要构造一个 ProducerRecord 对象,该对象可以声明主题Topic、分区Partition、键 Key以及值 Value,主题和值是必须要声明的,分区和键可以不用指定。

② 调用send() 方法进行消息发送。

③ 因为消息要到网络上进行传输,所以必须进行序列化,序列化器的作用就是把消息的 key 和value对象序列化成字节数组。

④ 接下来数据传到分区器,如果之间的 ProducerRecord 对象指定了分区,那么分区器将不再做任何事,直接把指定的分区返回;如果没有,那么分区器会根据 Key 来选择一个分区,选择好分区之后,生产者就知道该往哪个主题和分区发送记录了。

⑤ 接着这条记录会被添加到一个记录批次里面,这个批次里所有的消息会被发送到相同的主题和分区。会有一个独立的线程来把这些记录批次发送到相应的 Broker 上。

⑥ Broker 服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka ,就返回一个RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败, 就会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败, 就返回错误信息。

4. Kafka 生产者发送消息的3种方式

发送消息主要有三种模式:发后即忘记、同步及异步。在同步模式下,程序会一直等待某个操作完成后才会继续执行下一个操作,在异步模式下,程序可以同时执行多个操作,不会阻塞其他操作。

KafkaProducer 的 send() 方法用于向 Kafka 集群发送消息。该方法的语法如下:

public interface Producer<K, V> extends Closeable {
    Future<RecordMetadata> send(ProducerRecord<K, V> record);
    Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
}

其中,ProducerRecord<K, V> 表示要发送的消息记录,K 和 V 分别表示键和值的类型。send() 方法返回一个 Future 对象,表示异步发送消息的结果。

1. 发送即忘记

发送即忘记,生产者发送消息后不会等待服务器的响应,直接发送下一条消息。它只管往Kafka中发送消息而并不关心消息是否正确到达。在大多数情况下,这种发送方式没有什么问题,不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性也最差。

public class CustomProducer01 {
    private static final String brokerList "10.65.132.2:9093";
    private static final String topic = "test";

    public static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        // kafka生产者属性配置
        Properties properties = initConfig();
        // kafka生产者发送消息,默认是异步发送方式
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "你好,kafka!");
        try{
            // 发送消息
            kafkaProducer.send(producerRecord);
        }catch (Exception e){
            e.printStackTrace();
        }
        // 关闭资源
        kafkaProducer.close();
    }
}

cmd命令行窗口开启 kafka 消息者,观察消费者是否接收到消息:

[root@master01 kafka01]#  bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
你好,kafka!

2. 同步发送

send()方法本身就是异步的,send()方法返回的Future对象可以使调用方稍后获得发送的结果。在执行send()方法之后可以调用 get()方法来阻塞等待Kafka的响应,直到消息发送成功,或者发生异常。如果发生异常,那么就需要捕获异常并交由外层逻辑处理。

Future 接口源码:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Future接口是Java中用于表示异步计算结果的接口。它定义了一些方法,用于查询异步计算是否完成、获取计算结果等操作。

  • cancel方法用于取消异步计算;
  • isCancelled方法用于判断异步计算是否已经被取消;
  • isDone方法用于判断异步计算是否已经完成。
  • get方法用于获取异步计算的结果,如果计算还没有完成,则该方法会阻塞直到计算完成。如果计算被取消,则该方法会抛出CancellationException异常。如果计算抛出异常,则该方法会抛出ExecutionException异常。
  • get(long timeout, TimeUnit unit)方法与get方法类似,但是它会在指定的时间内等待计算完成,如果超时则会抛出TimeoutException异常。

Future 表示一个任务的生命周期,并提供了相应的方法来判断任务是否已经完成或取消,以及获取任务的结果和取消任务等。既然KafkaProducer.send()方法的返回值是一个Future类型的对象,那么完全可以用Java语言层面的技巧来丰富应用的实现,比如使用Future中的 get(long timeout,TimeUnit unit)方法实现可超时的阻塞。

public class CustomProducer01 {
    private static final String brokerList = "10.65.132.2:9093";
    private static final String topic = "test";

    public static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        // kafka生产者属性配置
        Properties properties = initConfig();
        // kafka生产者发送消息,默认是异步发送方式
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "你好,kafka,同步发送!");
        try{
            // 发送消息
            Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
            // 获取异步计算的结果,如果计算还没有完成,则该方法会阻塞直到计算完成
            RecordMetadata recordMetadata = future.get();
            System.out.println("metadata.topic() = " + recordMetadata.topic());
        }catch (Exception e){
            e.printStackTrace();
        }
        // 关闭资源
        kafkaProducer.close();
    }
}
[root@master01 kafka01]#  bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
你好,kafka!
你好,kafka,同步发送!

在RecordMetadata对象里包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。

3. 异步发送

生产者发送消息后不会等待服务器的响应,而是通过回调函数来处理服务器的响应。回调函数会在 producer 收到 ack 时调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

public class CustomProducer01 {
    private static final String brokerList = "10.65.132.2:9093";
    private static final String topic = "test";

    public static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        // kafka生产者属性配置
        Properties properties = initConfig();
        // kafka生产者发送消息,默认是异步发送方式
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "你好,kafka,异步发送带返回值!");
        try{
            // 发送消息
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // 说明消息发送成功
                    if(e==null){
                        System.out.println("metadata.topic() = " + recordMetadata.topic());
                        System.out.println("metadata.partition() = " + recordMetadata.partition());
                    }
                }
            });
        }catch (Exception e){
            e.printStackTrace();
        }
        // 关闭资源
        kafkaProducer.close();
    }
}
[root@master01 kafka01]#  bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
你好,kafka!
你好,kafka,同步发送!
你好,kafka,异步发送带回调函数!

Kafka生产者异步发送消息时,可以通过指定回调函数来处理发送结果。当消息发送完成后,回调函数会被调用,以通知应用程序消息发送的结果。具体来说,当生产者成功发送消息时,回调函数会被传递一个RecordMetadata对象,该对象包含了发送消息的相关信息,如消息所在的分区、消息在分区中的偏移量等。如果发送消息失败,则回调函数会被传递一个非空的Exception对象,以指示发送失败的原因。

需要注意的是,回调函数是在生产者的I/O线程中被调用的,因此应该尽量避免在回调函数中执行耗时的操作,以免影响生产者的性能。

5. Kafka 消息对象 ProducerRecord

① ProducerRecord 成员变量:

public class ProducerRecord<K, V> {
    // 消息要发送到的主题
    private final String topic;
    // 消息要发送到的分区号,如果为null,则由Kafka自动选择分区
    private final Integer partition;
    // 消息的键
    private final K key;
    // 消息的值
    private final V value;
    // 消息的时间戳,如果为null,则使用当前时间戳
    private final Long timestamp;
    // 消息的头部信息
    private final Headers headers;
    
    // .....
}
  • topic和partition字段分别代表消息要发往的主题和分区号。
  • key是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区。前面提及消息以主题为单位进行归类,而这个key可以让消息再进行二次归类,同一个key的消息会被划分到同一个分区中。
  • value是指消息体,一般不为空,如果为空则表示特定的消息。
  • timestamp是指消息的时间戳,它有CreateTime和LogAppendTime两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的时间。

② ProducerRecord 构造函数:

public class ProducerRecord<K, V> {

    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;

    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null.");
        if (timestamp != null && timestamp < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
        if (partition != null && partition < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = new RecordHeaders(headers);
    }

    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
        this(topic, partition, timestamp, key, value, null);
    }

    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
        this(topic, partition, null, key, value, headers);
    }

    public ProducerRecord(String topic, Integer partition, K key, V value) {
        this(topic, partition, null, key, value, null);
    }

    public ProducerRecord(String topic, K key, V value) {
        this(topic, null, null, key, value, null);
    }

    public ProducerRecord(String topic, V value) {
        this(topic, null, null, null, value, null);
    }
}

生产者发送消息的分区选择逻辑:

  • 若指定Partition ID,则消息发送至指定的Partition
  • 若未指定Partition ID,但指定了Key,则消息会按照 hasy(key) 发送至对应Partition
  • 若既未指定Partition ID也没指定Key,则消息会按照round-robin模式发送到每个Partition
  • 若同时指定了Partition ID和Key,则消息只会发送到指定的Partition (Key不起作用,代码逻辑决定)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/855903.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

什么是React?React与VU的优缺点有哪些?

什么是React&#xff1f;什么是VUE&#xff1f; 维基百科上的概念解释&#xff0c;Vue.js是一个用于创建用户界面的开源MVVM前端JavaScript框架&#xff0c;也是一个创建单页应用的Web应用框架。Vue.js由尤雨溪&#xff08;Evan You&#xff09;创建&#xff0c;由他和其他活跃…

【Go语言】Golang保姆级入门教程 Go初学者chapter3

Go语言 第三章 运算符 下划线“_”本身在Go中一个特殊的标识符&#xff0c;成为空标识符。可以代表任何其他的标识符&#xff0c;但是他对应的值就会被忽略 仅仅被作为站维度使用&#xff0c; 不能作为标识符使用 因为Go语言中没有private public 所以标记变量首字母大写代表其…

Pytorch量化之Post Train Static Quantization(训练后静态量化)

使用Pytorch训练出的模型权重为fp32&#xff0c;部署时&#xff0c;为了加快速度&#xff0c;一般会将模型量化至int8。与fp32相比&#xff0c;int8模型的大小为原来的1/4, 速度为2~4倍。 Pytorch支持三种量化方式&#xff1a; 动态量化&#xff08;Dynamic Quantization&…

最大异或对

如果你觉得这篇题解对你有用&#xff0c;可以点个赞或关注再走呗&#xff0c;谢谢你的关注~ 分析 最大异或对 (1)最大异或对是运用trie树存储十进制数对应的二进制数的每一位。 (2)再根据trie树的每一位进行搜索查找&#xff0c;严格满足不同的数异或为1&#xff0c;相同的异…

【业余小练习】交互式网格自定义增删改(进行中)

学习SQL和PLISQL数据类型的区别和应用场景 Oracle plsql 基础篇1 数据类型以及流程控制_bb_tarek的博客-CSDN博客https://blog.csdn.net/bb_tarek/article/details/17555713?ops_request_misc&request_id&biz_id102&utm_termplsql%E5%9F%BA%E6%9C%AC%E6%95%B0%E6…

Unlikely argument type for equals(): String seems to be unrelated to T

Unlikely argument type for equals(): String seems to be unrelated to Integer Unlikely argument type for equals(): String seems to be unrelated to Date 多余代码

java代码审计9之XXE

文章目录 1、简介2、 java XXE审计函数3、漏洞3.1、正常的业务3.2、有回显的情况3.3、无回显的情况3.4、修复 之前的文章&#xff0c; php代码审计9之XXE 1、简介 XXE&#xff08;XML外部实体注⼊&#xff0c;XML External Entity) &#xff0c;在应⽤程序解析XML输⼊时&…

【雕爷学编程】Arduino动手做(200)---WS2812B幻彩LED灯带4

37款传感器与模块的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&#x…

linux系统虚拟主机开启支持SourceGuardian(sg11)加密组件

注意&#xff1a;sg11我司只支持linux系统虚拟主机自主安装。支持php5.3及以上版本。 1、登陆主机控制面板&#xff0c;找到【远程文件下载】这个功能。 2、远程下载文件填写http://download.myhostadmin.net/vps/sg11_for_linux.zip 下载保存的路径填写/others/ 3、点击控制…

golang 自定义exporter - 端口连接数 portConnCount_exporter

需求&#xff1a; 1、计算当前6379 、3306 服务的连接数 2、可prometheus 语法查询 下面代码可直接使用&#xff1a; 注&#xff1a; 1、windows 与linux的区分 第38行代码 localAddr : fields[1] //windows为fields[1] &#xff0c; linux为fields[3] 2、如需求 增加/修改/删除…

PHP实现在线进制转换器,10进制,2、4、8、16、32进制转换

1.接口文档 2.laravel实现代码 /*** 进制转换计算器* return \Illuminate\Http\JsonResponse*/public function binaryConvertCal(){$ten $this->request(ten);$two $this->request(two);$four $this->request(four);$eight $this->request(eight);$sixteen …

JavaScript基础 第二天

1. 运算符 2. 语句 一.运算符 1.赋值运算符 2.一元运算符 3.比较运算符 4.逻辑运算符 5.运算符优先级 1.1 赋值运算符 概念&#xff1a;对变量进行赋值的运算符 赋值运算符&#xff1a; - * / % 1.2 一元运算符 可以根据表达式的个数&#xff0c;分为一…

数据结构【第4章】——栈与队列

队列是只允许在一端进行插入操作、而在另-端进行删除操作的线性表。 栈 栈与队列&#xff1a;栈是限定仅在表尾进行插入和删除操作的线性表。 我们把允许插入和删除的一端称为栈顶&#xff08;top&#xff09;&#xff0c;另一端称为栈底&#xff08;bottom&#xff09;&…

提升客户满意度的创意项目管理软件推荐!

发现功能强大的工作管理软件&#xff0c;让创意大放异彩。将您团队的愿景变成引人注目的项目。 一、交付总是令人印象深刻的工作 Zoho Projects的创意项目管理软件可帮助您和您的团队在一个地方监督多个项目。使用我们的内置管理工具和模板&#xff0c;花更少的时间在管理上&a…

postman如何添加token

参考博客&#xff1a;https://blog.csdn.net/Mrbignose/article/details/107237581 1.添加token&#xff1a; 2.设置token&#xff1a; 3.发送时携带token&#xff1a;

【JavaEE】懒人的福音-MyBatis框架—介绍、搭建环境以及初步感受

【JavaEE】MyBatis框架要点总结&#xff08;1&#xff09; 文章目录 【JavaEE】MyBatis框架要点总结&#xff08;1&#xff09;1. MyBatis是什么&#xff1f;2. 搭建MyBatis的开发环境2.0 MySQL建库建表2.1 新项目添加MyBatis框架2.2 设置MyBatis的配置2.2.1 设置数据库的连接信…

图像的平移变换之c++实现(qt + 不调包)

1.基本原理 设dx为水平偏移量&#xff0c;dy为垂直偏移量&#xff0c;则平移变换的坐标映射关系为下公式&#xff0c;图像平移一般有两种方式。 1.不改变图像大小的平移&#xff08;一旦平移&#xff0c;相应内容被截掉&#xff09; 1&#xff09;当dx > width、dx < -wi…

【云原生】Kubernetes节点亲和性分配 Pod

目录 1 给节点添加标签 2 根据选择节点标签指派 pod 到指定节点[nodeSelector] 3 根据节点名称指派 pod 到指定节点[nodeName] 4 根据 亲和性和反亲和性 指派 pod 到指定节点 5 节点亲和性权重 6 pod 间亲和性和反亲和性及权重 7 污点和容忍度 8 Pod 拓扑分布约束 官方…

flinksql sink to sr often fail because of nullpoint

flinksql or DS sink to starrocks often fail because of nullpoint flink sql 和 flink ds sink starrocks 经常报NullpointException重新编译代码 并上传到flink 集群 验证&#xff0c;有效 flink sql 和 flink ds sink starrocks 经常报NullpointException 使用flink-sta…

【EI复现】售电市场环境下电力用户选择售电公司行为研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…