分布式 - 消息队列Kafka:Kafka生产者发送消息流程和3种方式

news2025/1/8 7:01:10

文章目录

    • 1. Kafka 生产者
    • 2. kafaka 命令行操作
    • 3. Kafka 生产者发送消息流程
    • 4. Kafka 生产者发送消息的3种方式
      • 1. 发送即忘记
      • 2. 同步发送
      • 3. 异步发送
    • 5. Kafka 消息对象 ProducerRecord

1. Kafka 生产者

Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多个主题(topic)。生产者可以将消息发送到指定的主题,也可以根据分区策略将消息发送到多个分区中。生产者可以以异步或同步方式发送消息,并且可以配置消息的可靠性和持久性等属性。在 Kafka 中,生产者是消息的源头,它们将消息发送到 Kafka 集群中,供消费者消费。

2. kafaka 命令行操作

① 启动 Zookeeper 集群:

[root@master01 bin]# pwd
/root/ch/soft/zk/zk-01/bin
[root@master01 bin]# ./zkServer.sh start
[root@master01 bin]# pwd
/root/ch/soft/zk/zk-02/bin
[root@master01 bin]# ./zkServer.sh start
[root@master01 bin]# pwd
/root/ch/soft/zk/zk-03/bin
[root@master01 bin]# ./zkServer.sh start

② 启动 kafka 集群:

[root@master01 kafka01]# pwd
/root/ch/soft/kafka/kafka01
[root@master01 kafka01]# bin/kafka-server-start.sh config/server.properties
[root@master01 kafka02]# pwd
/root/ch/soft/kafka/kafka02
[root@master01 kafka02]# bin/kafka-server-start.sh config/server.properties
[root@master01 kafka03]# pwd
/root/ch/soft/kafka/kafka03
[root@master01 kafka03]# bin/kafka-server-start.sh config/server.properties

③ 创建主题 test:

[root@master01 kafka01]#  bin/kafka-topics.sh --zookeeper localhost:2183 --create --partitions 3 --replication-factor 2  --topic test
Created topic test.
[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --describe --topic test
Topic:test      PartitionCount:3        ReplicationFactor:2     Configs:
Topic: test     Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1
Topic: test     Partition: 1    Leader: 0       Replicas: 0,2   Isr: 0,2
Topic: test     Partition: 2    Leader: 1       Replicas: 1,0   Isr: 1,0

④ 生产者发送消息到主题test:

[root@master01 kafka01]# bin/kafka-console-producer.sh --broker-list 10.65.132.2:9093 --topic test
>hello
>你好,kafka!
>

⑤ 消费者消费主题test的消息:

[root@master01 kafka01]#  bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
hello
你好,kafka!

3. Kafka 生产者发送消息流程

在这里插入图片描述

① 首先要构造一个 ProducerRecord 对象,该对象可以声明主题Topic、分区Partition、键 Key以及值 Value,主题和值是必须要声明的,分区和键可以不用指定。

② 调用send() 方法进行消息发送。

③ 因为消息要到网络上进行传输,所以必须进行序列化,序列化器的作用就是把消息的 key 和value对象序列化成字节数组。

④ 接下来数据传到分区器,如果之间的 ProducerRecord 对象指定了分区,那么分区器将不再做任何事,直接把指定的分区返回;如果没有,那么分区器会根据 Key 来选择一个分区,选择好分区之后,生产者就知道该往哪个主题和分区发送记录了。

⑤ 接着这条记录会被添加到一个记录批次里面,这个批次里所有的消息会被发送到相同的主题和分区。会有一个独立的线程来把这些记录批次发送到相应的 Broker 上。

⑥ Broker 服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka ,就返回一个RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败, 就会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败, 就返回错误信息。

4. Kafka 生产者发送消息的3种方式

发送消息主要有三种模式:发后即忘记、同步及异步。在同步模式下,程序会一直等待某个操作完成后才会继续执行下一个操作,在异步模式下,程序可以同时执行多个操作,不会阻塞其他操作。

KafkaProducer 的 send() 方法用于向 Kafka 集群发送消息。该方法的语法如下:

public interface Producer<K, V> extends Closeable {
    Future<RecordMetadata> send(ProducerRecord<K, V> record);
    Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
}

其中,ProducerRecord<K, V> 表示要发送的消息记录,K 和 V 分别表示键和值的类型。send() 方法返回一个 Future 对象,表示异步发送消息的结果。

1. 发送即忘记

发送即忘记,生产者发送消息后不会等待服务器的响应,直接发送下一条消息。它只管往Kafka中发送消息而并不关心消息是否正确到达。在大多数情况下,这种发送方式没有什么问题,不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性也最差。

public class CustomProducer01 {
    private static final String brokerList "10.65.132.2:9093";
    private static final String topic = "test";

    public static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        // kafka生产者属性配置
        Properties properties = initConfig();
        // kafka生产者发送消息,默认是异步发送方式
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "你好,kafka!");
        try{
            // 发送消息
            kafkaProducer.send(producerRecord);
        }catch (Exception e){
            e.printStackTrace();
        }
        // 关闭资源
        kafkaProducer.close();
    }
}

cmd命令行窗口开启 kafka 消息者,观察消费者是否接收到消息:

[root@master01 kafka01]#  bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
你好,kafka!

2. 同步发送

send()方法本身就是异步的,send()方法返回的Future对象可以使调用方稍后获得发送的结果。在执行send()方法之后可以调用 get()方法来阻塞等待Kafka的响应,直到消息发送成功,或者发生异常。如果发生异常,那么就需要捕获异常并交由外层逻辑处理。

Future 接口源码:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Future接口是Java中用于表示异步计算结果的接口。它定义了一些方法,用于查询异步计算是否完成、获取计算结果等操作。

  • cancel方法用于取消异步计算;
  • isCancelled方法用于判断异步计算是否已经被取消;
  • isDone方法用于判断异步计算是否已经完成。
  • get方法用于获取异步计算的结果,如果计算还没有完成,则该方法会阻塞直到计算完成。如果计算被取消,则该方法会抛出CancellationException异常。如果计算抛出异常,则该方法会抛出ExecutionException异常。
  • get(long timeout, TimeUnit unit)方法与get方法类似,但是它会在指定的时间内等待计算完成,如果超时则会抛出TimeoutException异常。

Future 表示一个任务的生命周期,并提供了相应的方法来判断任务是否已经完成或取消,以及获取任务的结果和取消任务等。既然KafkaProducer.send()方法的返回值是一个Future类型的对象,那么完全可以用Java语言层面的技巧来丰富应用的实现,比如使用Future中的 get(long timeout,TimeUnit unit)方法实现可超时的阻塞。

public class CustomProducer01 {
    private static final String brokerList = "10.65.132.2:9093";
    private static final String topic = "test";

    public static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        // kafka生产者属性配置
        Properties properties = initConfig();
        // kafka生产者发送消息,默认是异步发送方式
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "你好,kafka,同步发送!");
        try{
            // 发送消息
            Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
            // 获取异步计算的结果,如果计算还没有完成,则该方法会阻塞直到计算完成
            RecordMetadata recordMetadata = future.get();
            System.out.println("metadata.topic() = " + recordMetadata.topic());
        }catch (Exception e){
            e.printStackTrace();
        }
        // 关闭资源
        kafkaProducer.close();
    }
}
[root@master01 kafka01]#  bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
你好,kafka!
你好,kafka,同步发送!

在RecordMetadata对象里包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。

3. 异步发送

生产者发送消息后不会等待服务器的响应,而是通过回调函数来处理服务器的响应。回调函数会在 producer 收到 ack 时调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

public class CustomProducer01 {
    private static final String brokerList = "10.65.132.2:9093";
    private static final String topic = "test";

    public static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        // kafka生产者属性配置
        Properties properties = initConfig();
        // kafka生产者发送消息,默认是异步发送方式
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "你好,kafka,异步发送带返回值!");
        try{
            // 发送消息
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // 说明消息发送成功
                    if(e==null){
                        System.out.println("metadata.topic() = " + recordMetadata.topic());
                        System.out.println("metadata.partition() = " + recordMetadata.partition());
                    }
                }
            });
        }catch (Exception e){
            e.printStackTrace();
        }
        // 关闭资源
        kafkaProducer.close();
    }
}
[root@master01 kafka01]#  bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
你好,kafka!
你好,kafka,同步发送!
你好,kafka,异步发送带回调函数!

Kafka生产者异步发送消息时,可以通过指定回调函数来处理发送结果。当消息发送完成后,回调函数会被调用,以通知应用程序消息发送的结果。具体来说,当生产者成功发送消息时,回调函数会被传递一个RecordMetadata对象,该对象包含了发送消息的相关信息,如消息所在的分区、消息在分区中的偏移量等。如果发送消息失败,则回调函数会被传递一个非空的Exception对象,以指示发送失败的原因。

需要注意的是,回调函数是在生产者的I/O线程中被调用的,因此应该尽量避免在回调函数中执行耗时的操作,以免影响生产者的性能。

5. Kafka 消息对象 ProducerRecord

① ProducerRecord 成员变量:

public class ProducerRecord<K, V> {
    // 消息要发送到的主题
    private final String topic;
    // 消息要发送到的分区号,如果为null,则由Kafka自动选择分区
    private final Integer partition;
    // 消息的键
    private final K key;
    // 消息的值
    private final V value;
    // 消息的时间戳,如果为null,则使用当前时间戳
    private final Long timestamp;
    // 消息的头部信息
    private final Headers headers;
    
    // .....
}
  • topic和partition字段分别代表消息要发往的主题和分区号。
  • key是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区。前面提及消息以主题为单位进行归类,而这个key可以让消息再进行二次归类,同一个key的消息会被划分到同一个分区中。
  • value是指消息体,一般不为空,如果为空则表示特定的消息。
  • timestamp是指消息的时间戳,它有CreateTime和LogAppendTime两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的时间。

② ProducerRecord 构造函数:

public class ProducerRecord<K, V> {

    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;

    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null.");
        if (timestamp != null && timestamp < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
        if (partition != null && partition < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = new RecordHeaders(headers);
    }

    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
        this(topic, partition, timestamp, key, value, null);
    }

    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
        this(topic, partition, null, key, value, headers);
    }

    public ProducerRecord(String topic, Integer partition, K key, V value) {
        this(topic, partition, null, key, value, null);
    }

    public ProducerRecord(String topic, K key, V value) {
        this(topic, null, null, key, value, null);
    }

    public ProducerRecord(String topic, V value) {
        this(topic, null, null, null, value, null);
    }
}

生产者发送消息的分区选择逻辑:

  • 若指定Partition ID,则消息发送至指定的Partition
  • 若未指定Partition ID,但指定了Key,则消息会按照 hasy(key) 发送至对应Partition
  • 若既未指定Partition ID也没指定Key,则消息会按照round-robin模式发送到每个Partition
  • 若同时指定了Partition ID和Key,则消息只会发送到指定的Partition (Key不起作用,代码逻辑决定)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/853000.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

wsl(在windows中使用呢linux系统)适用于windows的linux子系统

步骤可参考微软官方文档https://learn.microsoft.com/zh-cn/windows/wsl/install-manual#step-4—download-the-linux-kernel-update-package 在这里主要列举一些需要注意的点 wsl2的要求 一定要检查下windows版本&#xff0c;版本不对的先升级版本不然无法使用wsl2 wsl支持…

P4381 [IOI2008] Island (求基环树直径)

也许更好的阅读体验 D e s c r i p t i o n \mathcal{Description} Description 给一个基环树森林&#xff0c;求每棵树的直径的和&#xff0c;基环树的直径定义为&#xff0c;从一个点出发只能走到没走过的点&#xff08;即一个环不能把所有边都选&#xff09;&#xff0c;所经…

史上最细,自动化测试-logging日志采集详细实战(二)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、测试场景 给登…

固态硬盘数据恢复方法有哪些?三种恢复方法助您解忧

近年来固态硬盘比较流行&#xff0c;因为工作的需要我也在使用固态硬盘&#xff0c;它真的给我带来了很多的方便。但是最近&#xff0c;我固态硬盘里的文件有些不知道怎么就丢失了&#xff0c;这给我带来了很大的困扰。有什么方法可以找回来吗&#xff1f; 固态硬盘&#xff08…

Netty客户端同步获取结果

上次服务间通信是异步的&#xff0c;现在想实现客户端同步拿到服务端响应结果。实现如下&#xff1a; 在NettyClientHandler类中增加一个结果缓存器 Map<Long,Protocol<ResponseMsg>> resultMap new ConcurrentHashMap<>();修改方法 Override protected vo…

【文献阅读笔记】深度异常检测模型

文章目录 导读相关关键词及其英文描述记录深度异常检测模型Supervised deep anomaly detection 有监督深度异常检测Semi-Supervised deep anomaly detection 半监督深度异常检测Hybrid deep anomaly detection 混合深度异常检测One-class neural network for anomaly detection…

VR全景的盈利模式你知道吗?VR全景能用在哪些领域?

引言&#xff1a; 随着科技的迅猛发展&#xff0c;虚拟现实技术已经逐渐走进我们的生活。这项令人惊叹的技术让我们能够穿越时间与空间的限制&#xff0c;重新定义人们与世界互动的方式。 一&#xff0e;什么是VR全景&#xff1f; VR全景&#xff0c;是一种通过虚拟现实技术&…

万应低代码受邀参加上海电信“大干一场 科创沙龙”活动

7月28日&#xff0c;由上海市宝山区大场镇政府指导、中国电信上海北区局主办的“大干一场 科创沙龙”系列第九期沙龙活动顺利举办。大场镇“数字化转型”领导小组办公室&#xff08;以下简称“数字办”&#xff09;邀请了来自镇域内外的数十家科创服务企业。万应低代码作为天翼…

D5渲染器有多强大?给你10个选择它的理由

使用此分步指南&#xff0c;可以轻松快速地使用 D5 Render 创建专业视觉效果。使用 D5 Render 创建的视觉效果能够快速有效地传达信息。例如&#xff0c;简单的图形或图表可用于比较不同的数据集或显示变量之间的关系。使用 D5 Render&#xff0c;您可以创建既美观又信息丰富的…

公检系统创新:利用校对软件优化法律文书流程

公检系统可以通过利用校对软件来优化法律文书的流程&#xff0c;从而提高效率和准确性。以下是在创新方面利用校对软件的一些方法&#xff1a; 1.自动校对和修正&#xff1a;校对软件可以与公检系统集成&#xff0c;自动检测文书中的拼写、语法和标点符号错误&#xff0c;并提供…

msvcp120.dll丢失的4种解决方法,教你修复msvcp120.dll文件

当你在使用计算机时&#xff0c;经常会遇到各种各样的错误&#xff0c;最近遇到的应该就是dll文件缺失的错误吧&#xff0c;今天主要来跟大家讲解一下msvcp120.dll这个文件&#xff0c;教大家msvcp120.dll丢失的4种解决方法&#xff0c;好了&#xff0c;废话不多说&#xff0c;…

北京多铁克FPGA笔试题目

1、使用D触发器来实现二分频 2、序列检测器&#xff0c;检测101&#xff0c;输出1&#xff0c;其余情况输出0 module Detect_101(input clk,input rst_n,input data, //输入的序列output reg flag_101 //检测到101序列的输出标志 );parameter S0 2d0;S1 2d1;S2 2d2;S4 …

从金蝶云星空到金蝶云星空通过接口配置打通数据

从金蝶云星空到金蝶云星空通过接口配置打通数据 对接系统金蝶云星空 金蝶K/3Cloud结合当今先进管理理论和数十万家国内客户最佳应用实践&#xff0c;面向事业部制、多地点、多工厂等运营协同与管控型企业及集团公司&#xff0c;提供一个通用的ERP服务平台。K/3Cloud支持的协同应…

【LeetCode】数据结构题解(12)[用栈实现队列]

用栈实现队列 &#x1f609; 1.题目来源&#x1f440;2.题目描述&#x1f914;3.解题思路&#x1f973;4.代码展示 所属专栏&#xff1a;玩转数据结构题型❤️ &#x1f680; >博主首页&#xff1a;初阳785❤️ &#x1f680; >代码托管&#xff1a;chuyang785❤️ &…

flask使用cookie (设置cookie与查看cookie内容)

1.flask包cookie的使用 设置cookie app.route(/set_cookie) def set_cookie():resp make_response(Setting cookie)resp.set_cookie(username, John)return resp查看cookie: app.route(/get_cookie) def get_cookie():username request.cookies.get(username)return Welco…

gazebo 导入从blender导出的dae等文件

背景&#xff1a; gazebo 模型库里的模型在我需要完成的任务中不够用&#xff0c;还是得从 solidworks、3DMax, blender这种建模软件里面在手动画一些&#xff0c;或者去他们的库里面在挖一挖。 目录 1 blender 1-1 blender 相关links 1-2 install 2 gazebo导入模型 2-1 g…

使用imu_tools对imu_raw进行滤波处理

文章目录 1 前言2 安装3 查找自己的IMU话题4 imu_tools滤波 1 前言 imu_filter_madgwick&#xff1a;一种滤波器&#xff0c;可将来自常规IMU设备的角速度&#xff0c;加速度和磁力计读数&#xff08;可选&#xff09;融合到一个方向中。基于工作&#xff1a;http://www.x-io.…

电力系统电流三段式保护MATLAB仿真模型

整体模型如下&#xff1a; Matlab/Simulink搭建的电力系统电流保护模型采用辐射型单电源供电的运行方式 Ⅰ段保护的搭建 Ⅰ段保护为瞬时速断保护&#xff0c;根据Ⅰ段整定原则确定整定值。线路发生短路故障时&#xff0c;短路电流急剧增大&#xff1b;超过设置的整定值时&…

lokibot样本分析

火绒剑行为监控 行为监控 1.主程序在temp文件夹下释放frhdgr.exe 2.并创建进程 参数为 C:\Users\xxx\AppData\Local\Temp\frhdgr.exe C:\Users\xxx \AppData\Local\Temp\vxogkynyop 3.主进程退出 4.frhdgr.exe自我删除 并释放C:\Users\xxx\AppData\Roaming\F503CB\B28854…

【ARM Cache 系列文章 8 -- ARM DynamIQ 技术介绍

文章目录 DynamIQ 技术背景DynamIQ技术详解DynamIQ 与 big.LITTLEDynamIQ cluster 分类硬件支持 DynamIQ为什么适合人工智能&#xff1f; DynamIQ 技术背景 2017年3月21日下午&#xff0c;ARM在北京金隅喜来登酒店召开发布会&#xff0c;正式发布了全新的有针对人工智能及机器…