Kafka 之生产者(Producer)

news2025/1/24 4:51:55

目录

一. 前言

二. 生产消息

三. 幂等和事务

四. send() 发送消息

五. 原理解析


一. 前言

    Kafka生产者是一个应用程序,它负责向 Kafka 主题发送消息。这些消息可以用于多种目的,如记录用户活动、收集物联网设备数据、保存日志消息或缓存即将写入数据库的数据。

二. 生产消息

    生产者是线程安全的,在线程之间共享单个生产者实例,通常单例比多个实例要快。

    一个简单的例子,使用 Producer 发送一个有序的 key/value(键值对),放到 Java 的 main() 方法里就能直接运行(支持的版本 >= 0.9):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

producer.close();

    生产者的缓冲空间池保留尚未发送到服务器的消息,后台 I/O 线程负责将这些消息转换成请求发送到集群。如果使用后不关闭生产者,则会丢失这些消息。

    send() 方法是异步的,添加消息到缓冲区等待发送,并立即返回。生产者将单个的消息批量在一起发送来提高效率。

    ack 是判别请求是否为完整的条件(就是判断是不是成功发送了)。我们指定了 all 将会阻塞消息,这种设置性能最低,但是是最可靠的。

    retries,如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性。

    producer(生产者)缓存每个分区未发送的消息。缓存的大小是通过 batch.size 配置指定的。值较大的话将会产生更大的批次。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。

    默认缓冲可立即发送,即便缓冲空间还没有满。但是,如果你想减少请求的数量,可以设置linger.ms 大于0。这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批次中。这类似于 TCP 的算法,例如上面的代码段,可能100条消息在一个请求发送,因为我们设置了 linger(逗留)时间为1毫秒,然后,如果我们没有填满缓冲区,这个设置将增加1毫秒的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使是 linger.ms=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的、更有效的请求。

    buffer.memory 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过 max.block.ms设定,之后它将抛出一个 TimeoutException。

    key.serializervalue.serializer 示例,将用户提供的 key 和 value 对象 ProducerRecord 转换成字节,你可以使用附带的 ByteArraySerializaer 或 StringSerializer 处理简单的 String 或 Byte 类型。

三. 幂等和事务

    从 Kafka 0.11 开始,KafkaProducer 又支持两种模式:幂等生产者事务生产者。幂等生产者加强了 Kafka 的交付语义,从至少一次交付到精确一次交付。特别是生产者的重试将不再引入重复。事务性生产者允许应用程序原子地将消息发送到多个分区(和主题)。

    要启用幂等(idempotence),必须将 enable.idempotence 配置设置为 true。如果设置,则retries(重试)配置将默认为 Integer.MAX_VALUE,acks 配置将默认为 all。API 没有变化,所以无需修改现有应用程序即可利用此功能。

    此外,如果 send(ProducerRecord) 即使在无限次重试的情况下也会返回错误(例如消息在发送前在缓冲区中过期),那么建议关闭生产者,并检查最后产生的消息的内容,以确保它不重复。最后,生产者只能保证单个会话内发送的消息的幂等性

    要使用事务生产者和 attendant API,必须设置 transactional.id。如果设置了 transactional.id,幂等性会和幂等所依赖的生产者配置一起自动启用。此外,应该对包含在事务中的 Topic 进行耐久性配置。特别是,replication.factor 应该至少是3,而且这些 Topic 的 min.insync.replicas 应该设置为2。最后,为了实现从端到端的事务性保证,消费者也必须配置为只读取已提交的消息。

    transactional.id 的目的是实现单个生产者实例的多个会话之间的事务恢复。它通常是由分区、有状态的应用程序中的分片标识符派生的。因此,它对于在分区应用程序中运行的每个生产者实例来说应该是唯一的。

    所有新的事务性 API 都是阻塞的,并且会在失败时抛出异常。下面的例子说明了新的 API 是如何使用的。它与上面的例子类似,只是所有100条消息都是一个事务的一部分。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

producer.initTransactions();

try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // We can't recover from these exceptions, so our only option is to close the producer and exit.
    producer.close();
} catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again.
    producer.abortTransaction();
}
producer.close();

    如示例所示,每个生产者只能有一个未完成的事务。在 beginTransaction()commitTransaction() 调用之间发送的所有消息都将是单个事务的一部分。当指定 transactional.id时,生产者发送的所有消息都必须是事务的一部分。

    事务生产者使用异常来传递错误状态。特别是,不需要为 producer.send() 指定回调,也不需要在返回的 Future 上调用 get()。如果任何 producer.send() 或事务性调用在事务过程中遇到不可恢复的错误,就会抛出 KafkaException。

    该客户端可以与 0.10.0 或更高版本的 Broker 进行通信。旧的或较新的 Broker 可能不支持某些客户端功能。例如,事务性 API 需要 0.11.0 或更新版本的 Broker。当调用在运行的 Broker 版本中不可用的 API 时,您将收到 UnsupportedVersionException。

四. send() 发送消息

public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)

异步发送一条消息到 Topic,并调用 callback(当发送已确认)。语法说明如下: 

参数:

  • record:发送的记录(消息)。
  • callback:用户提供的 callback,服务器调用这个 callback 来应答结果(null 表示没有callback)。

声明的异常:

  • InterruptException:如果线程阻塞中断。
  • SerializationException:如果 key 或 value 不是给定有效配置的 serializers。
  • TimeoutException:如果获取元数据或消息分配内存花费的时间超过 max.block.ms。
  • KafkaException:Kafka 有关的错误(不属于公共 API 的异常)。

    send() 是异步的,并且一旦消息被保存在等待发送的消息缓存中,此方法就立即返回。这样并行发送多条消息而不阻塞去等待每一条消息的响应。

    发送的结果是一个 RecordMetadata,它指定了消息发送的分区,分配的 offset 和消息的时间戳。如果 Topic 使用的是 CreateTime,则使用用户提供的时间戳或发送的时间(如果用户没有指定指定消息的时间戳)如果 Topic 使用的是 LogAppendTime,则追加消息时,时间戳是 Broker 的本地时间。

    由于 send() 调用是异步的,它将为此消息的 RecordMetadata 返回一个 Future。如果 future 调用 get(),则将阻塞,直到相关请求完成并返回该消息的 metadata,或抛出发送异常。

如果要模拟一个简单的阻塞调用,你可以调用 get() 方法:

byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record).get();

完全无阻塞的话,可以利用回调参数提供的请求完成时将调用的回调通知:

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null)
                           e.printStackTrace();
                       System.out.println("The offset of the record we just sent is: " + metadata.offset());
                   }
               });

发送到同一个分区的消息回调保证按一定的顺序执行,也就是说,在下面的例子中 callback1 保证执行在 callback2 之前:

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);

注意:callback 一般在生产者的 I/O 线程中执行,所以是相当快的,否则将延迟其他的线程的消息发送。如果你需要执行阻塞或计算昂贵(消耗)的回调,建议在 callback 主体中使用自己的Executor 来并行处理。

五. 原理解析

由上图可以看出:KafkaProducer 有两个基本线程: 

1. 主线程:负责消息创建、拦截器、序列化器、分区器等操作,并将消息追加到消息收集器RecoderAccumulator 中;

  • 消息收集器 RecoderAccumulator 为每个分区都维护了一个 Deque<ProducerBatch> 类型的双端队列。
  • ProducerBatch 可以理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低网络影响。
  • 由于生产者客户端使用 java.io.ByteBuffer 在发送消息之前进行消息保存,并维护了一个 BufferPool 实现 ByteBuffer 的复用。该缓存池只针对特定大小(batch.size 指定)的 ByteBuffer 进行管理,对于消息过大的缓存,不能做到重复利用。
  • 每次追加一条 ProducerRecord 消息,会寻找/新建对应的双端队列,从其尾部获取一个ProducerBatch,判断当前消息的大小是否可以写入该批次中。若可以写入则写入;若不可以写入,则新建一个 ProducerBatch,判断该消息大小是否超过客户端参数配置 batch.size 的值,不超过,则以 batch.size 建立新的 ProducerBatch,这样方便进行缓存重复利用;若超过,则以计算的消息大小建立对应的 ProducerBatch ,缺点就是该内存不能被复用了。

2. Sender线程:

  • 该线程从消息收集器获取缓存的消息,将其处理为 <Node, List<ProducerBatch> 的式,Node 表示集群的 Broker 节点。
  • 进一步将 <Node, List<ProducerBatch> 转化为 <Node, Request> 形式,此时才可以向服务端发送数据。
  • 在发送之前,Sender 线程将消息以 Map<NodeId, Deque<Request>> 的形式保存到InFlightRequests 中进行缓存,可以通过其获取 leastLoadedNode,即当前 Node 中负载压力最小的一个,以实现消息的尽快发出。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1446294.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《乱弹篇(十四)香火旺》

连日来&#xff0c;“大年初一烧香祈福&#xff0c;北京雍和宫人山人海”这一词条登上社交网站热搜&#xff0c;对这一现象的描述多为“初一凌晨 民众在雍和宫前排大队”&#xff0c;“大年初一&#xff0c;雍和宫内人山人海&#xff0c;烟雾缭绕”&#xff0c;“雍和宫迎来6万…

Asp .Net Core 系列:Asp .Net Core 集成 NLog

简介 NLog是一个基于.NET平台编写的日志记录类库&#xff0c;它可以在应用程序中添加跟踪调试代码&#xff0c;以便在开发、测试和生产环境中对程序进行监控和故障排除。NLog具有简单、灵活和易于配置的特点&#xff0c;支持在任何一种.NET语言中输出带有上下文的调试诊断信息…

三、案例 - MySQL数据迁移至ClickHouse

MySQL数据迁移至ClickHouse 一、生成测试数据表和数据1.在MySQL创建数据表和数据2.在ClickHouse创建数据表 二、生成模板文件1.模板文件内容2.模板文件参数详解2.1 全局设置2.2 数据读取&#xff08;Reader&#xff09;2.3 数据写入&#xff08;Writer&#xff09;2.4 性能设置…

ctfshow-文件上传(web151-web161)

目录 web151 web152 web153 web154 web155 web156 web157 web158 web159 web160 web161 web151 提示前台验证不可靠 那限制条件估计就是在前端设置的 上传php小马后 弹出了窗口说不支持的格式 查看源码 这一条很关键 这种不懂直接ai搜 意思就是限制了上传类型 允许…

计算机组成原理 1 概论

主要内容 介绍运算器、控制器、存储器结构、工作原理、设计方法及互连构成整机的技术。 主要内容&#xff1a; ◼ 数值表示与运算方法 ◼ 运算器的功能、组成和基本运行原理 ◼ 存储器及层次存储系统 ◼ 指令系统 ◼ CPU功能、组成和运行原理 ◼ 流水线 ◼ 系统总线 ◼ 输入输出…

UE5 播放本地MP3、MP4

1.创建一个媒体播放器 2.如创建视频&#xff0c;勾选。 它会多一个媒体纹理给你 3.1 设置音频 在一个actor上添加“媒体音频组件” “音频媒体播放器”赋值给它 3.2播放音频 添加一个音频媒体播放器变量&#xff0c; 赋值 地址使用绝对地址 4.1设置视频 UI上创建一个imag…

快速学习Spring

Spring 简介 Spring 是一个开源的轻量级、非侵入式的 JavaEE 框架&#xff0c;它为企业级 Java 应用提供了全面的基础设施支持。Spring 的设计目标是简化企业应用的开发&#xff0c;并解决 Java 开发中常见的复杂性和低效率问题。 Spring常用依赖 <dependencies><!-…

Linux:信号的处理

文章目录 信号处理 本篇总结的是关于信号的处理 信号处理 在之前有这样的观点&#xff1a;信号在合适的时候被处理好&#xff0c;当进程收到信号后&#xff0c;当前进程可能在做优先级更高的事&#xff0c;所以它来不及处理这个信号&#xff0c;那么就会把这个信号暂时保存起…

spring aop @annotation的用法

直接看原文: spring aop annotation的用法-CSDN博客 -------------------------------------------------------------------------------------------------------------------------------- annotation用在定义连接点时&#xff0c;对连接点进行限制。比如我们想对标注了…

双非本科准备秋招(18.2)—— 图解Monitor

对象头 普通对象&#xff1a; 数组对象&#xff1a; java中对象存储结构分为对象头&#xff08;Header&#xff09;、实例数据&#xff08;Instance Date&#xff09;和对齐填充&#xff08;Padding&#xff09;。 对象头存储着Mark Word和Klass Word&#xff0c;通过Klass Wo…

【MySQL】操作库 —— 库的操作 -- 详解

一、增删数据库 1、创建数据库 create database db_name; 本质就是在 /var/lib/mysql 创建一个目录。 说明&#xff1a; 大写的表示关键字。[ ] 是可选项。CHARACTER SET&#xff1a;指定数据库采用的字符集。COLLATE&#xff1a;指定数据库字符集的校验规则。 2、数据库删除…

背包问题(理论)

对于面试的话&#xff0c;掌握01背包、完全背包&#xff0c;就够用了&#xff0c;最多可以再来一个多重背包。 至于背包九讲其他背包&#xff0c;面试几乎不会问&#xff0c;都是竞赛级别的了&#xff0c;leetcode上连多重背包的题目都没有&#xff0c;所以题库也告诉我们&…

NodeJS安装(windows)

NodeJS安装&#xff08;windows&#xff09; 1、官网地址 NodeJS官网地址&#xff1a;https://nodejs.org/en 2、安装 3、验证NodeJS环境变量 cmd后&#xff0c;运行&#xff1a;node -v 4、配置npm的全局安装路径&#xff08;需要管理员身份运行&#xff09; npm conf…

【后端高频面试题--设计模式上篇】

&#x1f680; 作者 &#xff1a;“码上有前” &#x1f680; 文章简介 &#xff1a;后端高频面试题 &#x1f680; 欢迎小伙伴们 点赞&#x1f44d;、收藏⭐、留言&#x1f4ac; 什么是设计模式&#xff1f;怎么理解设计模式&#xff1f; 设计模式是在软件设计中&#xff0c…

三.AV Foundation 视频播放 - 播放控制

引言 前面的博客我们已经实现了视频的播放功能&#xff0c;但是作为一个完整的视频播放器仅仅有播放功能是不够的&#xff0c;暂停&#xff0c;快进&#xff0c;播放进度条&#xff0c;显示播放时间&#xff0c;显示视频标题和字幕都是必不可少的功能。 本篇博客我们就对视频…

Spring Cloud Gateway 网关路由

一、路由断言 路由断言就是判断路由转发的规则 二、路由过滤器 1. 路由过滤器可以实现对网关请求的处理&#xff0c;可以使用 Gateway 提供的&#xff0c;也可以自定义过滤器 2. 路由过滤器 GatewayFilter&#xff08;默认不生效&#xff0c;只有配置到路由后才会生效&#x…

【原创 附源码】Flutter安卓及iOS海外登录--Apple登录最详细流程

最近接触了几个海外登录的平台&#xff0c;踩了很多坑&#xff0c;也总结了很多东西&#xff0c;决定记录下来给路过的兄弟坐个参考&#xff0c;也留着以后留着回顾。更新时间为2024年2月12日&#xff0c;后续集成方式可能会有变动&#xff0c;所以目前的集成流程仅供参考&…

【GameFramework框架内置模块】1、全局配置(Config)

推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享简书地址 大家好&#xff0c;我是佛系工程师☆恬静的小魔龙☆&#xff0c;不定时更新Unity开发技巧&#xff0c;觉得有用记得一键三连哦。 一、前言 【GameFramework框架】系列教程目录&#xff1a; https://blog.csdn.net/q7…

AtCoder Beginner Contest 340(A-G)

A - Arithmetic Progression (atcoder.jp) 1.思路&#xff1a;循环输出即可 2.代码&#xff1a; #include <bits/stdc.h> #define rep(i,z,n) for(int i z;i < n; i) #define per(i,n,z) for(int i n;i > z; i--) #define PII pair<int,int> #define fi f…

【51单片机】串口(江科大)

8.1串口通信 1.串口介绍 2.硬件电路 3.电平标准 电平标准是数据1和数据0的表达方式,是传输线缆中人为规定的电压与数据的对应关系,串口常用的电平标准有如下三种: 电平标准是数据1和数据O的表达方式,是传输线缆中人为规定的电 压与数据的对应关系,串口常用的电平标准有如下…