Kafka生产者使用案例

news2025/1/22 13:11:28

1.生产者发送消息的过程

首先介绍一下 Kafka 生产者发送消息的过程:

1)Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。

2) 接下来,数据被传给分区器。如果之前已经在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情。如果没有指定分区 ,那么分区器会根据 ProducerRecord 对象的键来选择一个分区,紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。

3) 服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,如果达到指定的重试次数后还没有成功,则直接抛出异常,不再重试。

2.创建生产者

2.1 项目依赖

本项目采用 Maven 构建,想要调用 Kafka 生产者 API,需要导入 `kafka-clients` 依赖,如下:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>

2.2 创建生产者

创建 Kafka 生产者时,以下三个属性是必须指定的:

bootstrap.servers:指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错;

key.serializer:指定键的序列化器;

value.serializer:指定值的序列化器。

创建的示例代码如下:

public class SimpleProducer {

    public static void main(String[] args) {

        String topicName = "Hello-Kafka";

        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop001:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /*创建生产者*/
        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "hello" + i, 
                                                                         "world" + i);
            /* 发送消息*/
            producer.send(record);
        }
        /*关闭生产者*/
        producer.close();
    }
}

2.3 测试

2.3.1启动Kakfa

Kafka 的运行依赖于 zookeeper,需要预先启动,可以启动 Kafka 内置的 zookeeper,也可以启动自己安装的:

zookeeper启动命令
bin/zkServer.sh start

# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties

启动单节点 kafka 用于测试:

bin/kafka-server-start.sh config/server.properties

2.3.2 创建topic

# 创建用于测试主题
bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                     --replication-factor 1 --partitions 1 \
                     --topic Hello-Kafka

# 查看所有主题
 bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092

2.3.3 启动消费者

 启动一个控制台消费者用于观察写入情况,启动命令如下:

bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic Hello-Kafka --from-beginning

2.3.4 运行项目

此时可以看到消费者控制台,输出如下,这里 `kafka-console-consumer` 只会打印出值信息,不会打印出键信息。

2.4 可能出现的问题

在这里可能出现的一个问题是:生产者程序在启动后,一直处于等待状态。这通常出现在你使用默认配置启动 Kafka 的情况下,此时需要对 `server.properties` 文件中的 `listeners` 配置进行更改:

hadoop001 为我启动kafka服务的主机名,你可以换成自己的主机名或者ip地址
listeners=PLAINTEXT://hadoop001:9092

3.发送消息

上面的示例程序调用了 `send` 方法发送消息后没有做任何操作,在这种情况下,我们没有办法知道消息发送的结果。想要知道消息发送的结果,可以使用同步发送或者异步发送来实现。

3.1 同步发送

在调用 `send` 方法后可以接着调用 `get()` 方法,`send` 方法的返回值是一个 Future<RecordMetadata>对象,RecordMetadata 里面包含了发送消息的主题、分区、偏移量等信息。改写后的代码如下:

for (int i = 0; i < 10; i++) {
    try {
        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
        /*同步发送消息*/
        RecordMetadata metadata = producer.send(record).get();
        System.out.printf("topic=%s, partition=%d, offset=%s \n",
                metadata.topic(), metadata.partition(), metadata.offset());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

此时得到的输出如下:偏移量和调用次数有关,所有记录都分配到了 0 分区,这是因为在创建 `Hello-Kafka` 主题时候,使用 `--partitions` 指定其分区数为 1,即只有一个分区。

topic=Hello-Kafka, partition=0, offset=40 
topic=Hello-Kafka, partition=0, offset=41 
topic=Hello-Kafka, partition=0, offset=42 
topic=Hello-Kafka, partition=0, offset=43 
topic=Hello-Kafka, partition=0, offset=44 
topic=Hello-Kafka, partition=0, offset=45 
topic=Hello-Kafka, partition=0, offset=46 
topic=Hello-Kafka, partition=0, offset=47 
topic=Hello-Kafka, partition=0, offset=48 
topic=Hello-Kafka, partition=0, offset=49 

3.2 异步发送

通常我们并不关心发送成功的情况,更多关注的是失败的情况,因此 Kafka 提供了异步发送和回调函数。 代码如下:

for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
    /*异步发送消息,并监听回调*/
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.out.println("进行异常处理");
            } else {
                System.out.printf("topic=%s, partition=%d, offset=%s \n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        }
    });
}

4.自定义分区器

Kafka 有着默认的分区机制:

如果键值为 null, 则使用轮询 (Round Robin) 算法将消息均衡地分布到各个分区上;

如果键值不为 null,那么 Kafka 会使用内置的散列算法对键进行散列,然后分布到各个分区上。

某些情况下,你可能有着自己的分区需求,这时候可以采用自定义分区器实现。这里给出一个自定义分区器的示例:

4.1 自定义分区器

/**
 * 自定义分区器
 */
public class CustomPartitioner implements Partitioner {

    private int passLine;

    @Override
    public void configure(Map<String, ?> configs) {
        /*从生产者配置中获取分数线*/
        passLine = (Integer) configs.get("pass.line");
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, 
                         byte[] valueBytes, Cluster cluster) {
        /*key 值为分数,当分数大于分数线时候,分配到 1 分区,否则分配到 0 分区*/
        return (Integer) key >= passLine ? 1 : 0;
    }

    @Override
    public void close() {
        System.out.println("分区器关闭");
    }
}

需要在创建生产者时指定分区器,和分区器所需要的配置参数:

public class ProducerWithPartitioner {

    public static void main(String[] args) {

        String topicName = "Kafka-Partitioner-Test";

        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop001:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        /*传递自定义分区器*/
        props.put("partitioner.class", "com.heibaiying.producers.partitioners.CustomPartitioner");
        /*传递分区器所需的参数*/
        props.put("pass.line", 6);

        Producer<Integer, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i <= 10; i++) {
            String score = "score:" + i;
            ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, i, score);
            /*异步发送消息*/
            producer.send(record, (metadata, exception) ->
                    System.out.printf("%s, partition=%d, \n", score, metadata.partition()));
        }

        producer.close();
    }
}

4.2 测试

需要创建一个至少有两个分区的主题:

 bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                     --replication-factor 1 --partitions 2 \
                     --topic Kafka-Partitioner-Test

此时输入如下,可以看到分数大于等于 6 分的都被分到 1 分区,而小于 6 分的都被分到了 0 分区。

score:6, partition=1, 
score:7, partition=1, 
score:8, partition=1, 
score:9, partition=1, 
score:10, partition=1, 
score:0, partition=0, 
score:1, partition=0, 
score:2, partition=0, 
score:3, partition=0, 
score:4, partition=0, 
score:5, partition=0, 
分区器关闭

5.生产者其他属性

上面生产者的创建都仅指定了服务地址,键序列化器、值序列化器,实际上 Kafka 的生产者还有很多可配置属性,如下:

1. acks

acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的:

acks=0: 消息发送出去就认为已经成功了,不会等待任何来自服务器的响应;

acks=1: 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应;

acks=all:只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。

2. buffer.memory

设置生产者内存缓冲区的大小。

3. compression.type

默认情况下,发送的消息不会被压缩。如果想要进行压缩,可以配置此参数,可选值有 snappy,gzip,lz4。

4. retries

发生错误后,消息重发的次数。如果达到设定值,生产者就会放弃重试并返回错误。

5. batch.size

当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。

6. linger.ms

该参数制定了生产者在发送批次之前等待更多消息加入批次的时间。

7. clent.id

客户端 id,服务器用来识别消息的来源。

8. max.in.flight.requests.per.connection

指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量,把它设置为 1 可以保证消息是按照发送的顺序写入服务器,即使发生了重试。

9. timeout.ms, request.timeout.ms & metadata.fetch.timeout.ms

- timeout.ms 指定了 borker 等待同步副本返回消息的确认时间;

- request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间;

- metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如分区首领是谁)时等待服务器返回响应的时间。

10. max.block.ms

指定了在调用 `send()` 方法或使用 `partitionsFor()` 方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。

11. max.request.size

该参数用于控制生产者发送的请求大小。它可以指发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1000K ,那么可以发送的单个最大消息为 1000K ,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1K。 

12. receive.buffer.bytes & send.buffer.byte

这两个参数分别指定 TCP socket 接收和发送数据包缓冲区的大小,-1 代表使用操作系统的默认值。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1084832.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

FPGA面试题(7)

一.解释一下SPI的四种模式 01时钟极性CPOL空闲状态为低电平空闲状态为高电平时钟相位CPHA在第一个跳变沿采样在第二个跳变沿采样 模式CPOLCPHA描述模式000sclk上升沿采样&#xff0c;sclk下降沿发送模式101sclk上升沿发送&#xff0c;sclk下降沿采样模式210sclk上升沿发送&…

解决nav2_bringup tb3_simulation_launch.py 无法启动Gazebo的问题

方法 1 断网再开gazebo. 评价: 方便且有效, 但来回联网很麻烦 参考: https://blog.csdn.net/James___H/article/details/116906217 方法 2 断网能打开是因为gazebo软件开启时会自动从网络下载模型&#xff0c;下载过程必然漫长, 另外你懂的, 网络问题嘛, vpn也解决不了的话…

jmeter压测记录、使用方法

jmeter压测记录、使用方法 1、非gui方式执行压测命令2、压测命令输出解读 1、非gui方式执行压测命令 sh jmeter.sh -n -t test.jmx -l result.jtl2、压测命令输出解读 Active: 10 Started: 10 Finished: 0 Active: 10 表示一共10个活动&#xff08;正在进行的压测线程&#…

基于Cl2/BCl3电感偶联等离子体的氮化镓干蚀特性

引言 氮化镓(GaN)具有六方纤锌矿结构&#xff0c;直接带隙约为3.4eV&#xff0c;目前已成为实现蓝光发光二极管(led)的主导材料。由于GaN的高化学稳定性&#xff0c;在室温下用湿法化学蚀刻来蚀刻或图案化GaN是非常困难的。与湿法蚀刻技术相比&#xff0c;干法蚀刻技术可以提供…

前端axios下载导出文件工具封装

使用示例&#xff1a; import { fileDownload } from /utils/fileDownloadfileDownload({ url: process.env.VUE_APP_BASE_URL /statistic/pageList/export, method: post, data: data })工具类&#xff1a; import store from ../store/index import {getAccessToken } fro…

【Lombok的Bug记录】前端传的有值,但是到后端就全为空了

项目场景&#xff1a; 项目背景&#xff1a;使用Data注解标注类 问题描述 前端传的有值&#xff0c;但是到后端就全为空了 原因分析&#xff1a; AName和aName生成的set方法名是一样的&#xff0c;所以换名字就行了&#xff01; 解决方案&#xff1a; 属性不要写成xXxx的形式…

为什么手机会莫名多出许多软件?

许多手机用户都曾遭遇过这样的问题&#xff0c;他们在使用手机的过程中&#xff0c;突然发现手机屏幕上出现了一些未知的软件。这些软件并非他们主动下载的&#xff0c;但它们却显现在屏幕上。这些软件从何而来&#xff1f; 其实&#xff0c;这些软件往往是在浏览网页、阅读小…

spring 注入 当有两个参数的时候 接上面

新加一个int 型的 age 记得写getset方法和构造方法 &#xff08;&#xff08;&#xff08;&#xff08;&#xff08;&#xff08;&#xff08; 构造方法的作用——无论是有参构造还是无参构造&#xff0c;他的作用都是为了方便为对象的属性初始化值 构造方法是一种特殊的方…

UnrealEngine iOS 打包 —— 签名证书(cer、p12)生成

官方文档 docs.unrealengine.com/5.3/zh-CN/setting-up-ios-tvos-and-ipados-provisioning-profiles-and-signing-certificates-for-unreal-engine-projects 打开 ProjectSettings -> Platforms -> iOS 可以看到签名证书配置 需要拓展名为 .cer 和 .p12 的一对证书和密钥…

虹科方案 | AR助力仓储物流突破困境:规模化运营与成本节约

文章来源&#xff1a;虹科数字化AR 点击阅读原文&#xff1a;https://mp.weixin.qq.com/s/xis_I5orLb6RjgSokEhEOA 虹科方案一览 HongKe DigitalizationAR 当今的客户体验要求企业在人员、流程和产品之间实现全面的连接。为了提升整个组织的效率并提高盈利能力&#xff0c;物流…

Vscode 插件-代码敲出不同的特效

为了让写代码的时候增加一点趣味性&#xff0c;vscode有个插件&#xff0c;可以增加烟花特效&#xff0c;还挺好玩的。 一.在应用商店下载这个插件 二. 在设置里 添加配置文件 settings.json //是否开启"powermode.enabled": true,//效果样式 “水花-particles”,…

虹科方案 | 虹科ATTO加速虚拟存储管理

虹科方案 | 虹科ATTO加速虚拟存储管理 文章来源&#xff1a;虹科网络安全 点此阅读原文&#xff1a;https://mp.weixin.qq.com/s/SYruurSQSodUvyhZBr-BMQ 1 方案背景 企业越来越多地转向服务器虚拟化&#xff0c;以有效利用硬件资源、降低运营成本&#xff0c;并为维护和灾难恢…

WPF中的多重绑定

MultiBinding 将会给后端传回一个数组, 其顺序为绑定的顺序. 例如: <DataGridMargin"10"AutoGenerateColumns"False"ItemsSource"{Binding Stu}"><DataGrid.Columns><DataGridTextColumn Binding"{Binding Id}" Header…

求二叉树叶子节点的个数——递归

节点时NULL——》返回0 节点是叶子——》返回1 节点不是空也不是叶子&#xff1a;递归 代码&#xff1a; int BinaryTreeLeafSize(BTNode* root) {if (root NULL){return 0;}if (root->left NULL && root->right NULL){return 1;} return BinaryTreeLeafSiz…

如何有效改进erp管理系统?erp管理系统改进建议方向

前言&#xff1a; 说到erp&#xff0c;全称是企业资源计划&#xff0c;这可是企业管理的大杀器&#xff0c;也是现在企业管理的必备神器。它的出身可以追溯到上世纪90年代&#xff0c;那时候的企业管理可是个大难题&#xff0c;各种资源调配不灵光&#xff0c;企业主们急需一种…

快速构建代理应对

今天我要和大家分享一个解决反爬策略升级问题的方法&#xff0c;那就是快速构建代理池。如果您是一位爬虫开发人员&#xff0c;一定深知反爬策略的烦恼。但是&#xff0c;通过构建代理池&#xff0c;您可以轻松地应对反爬策略的升级&#xff0c;让您的爬虫持续高效运行。接下来…

vite vite.config.js中的配置

vite打包依赖于 rollup和esbuild rollup中文文档 esbulid中文文档 基本配置 import { defineConfig, loadEnv } from "vite"; import vue from "vitejs/plugin-vue"; import path from "path";import Components from "unplugin-vue-com…

pycharm的debug,你知道每个按钮对应哪个功能吗?

本文讲解pycharm的debug 1. debug的汇总图2. 第一个图标&#xff08;Step Over&#xff09;3. 第二个图标&#xff08;Step into&#xff09;4. 第三个图标&#xff08;Step Into My Code&#xff09;5. 第四个图标&#xff08;Step Out&#xff09;6. 第五个图标&#xff08;R…

02 stm32-hal库 timer 基本定时器设定

1.配置始终时钟参数 >2. 初始化 MX_TIM3_Init();/* USER CODE BEGIN 2 */HAL_TIM_Base_Start_IT(&htim3);> 3.增加回调函数 4 中断服务函数 void TIM3_IRQHandler(void) {/* USER CODE BEGIN TIM3_IRQn 0 *//* USER CODE END TIM3_IRQn 0 */HAL_TIM_IRQHandler(&…

KITTI数据集中的二进制激光雷达数据(.bin文件)转换为点云数据(.pcd文件)(C++代码)

目录 main.cpp CMakeLists.txt main.cpp #include <pcl/io/pcd_io.h> #include <pcl/point_types.h> #include <fstream> #include <iostream> #include <vector>int main() {// Define file pathsstd::string input_filename "/home/f…