Producer

news2025/1/11 15:08:04

Producer开发样例

版本说明

新客在这里插入图片描述
户端, 从Kafka 0.9.x 开始, client基于Java语言实现。同时提供C/C++, Python等其他客户端实现。

开发步骤

  1. 配置客户端参数以及创建客户端实例;
  2. 构建待发送消息;
  3. 发送消息;
  4. 关闭生产者实例;

代码示例

public class KafkaProducer {
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.server", "localhost:9092");
        // key.serializer
        // value.serializer
        // client.id xxx
        return props;
    }
    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Hello, world");
        try {
            producer.send(record);
        } catch(Exception e) {

        }
    }
}

Producer参数配置

配置项目

部分配置项在后续文章中介绍

配置项意义
bootstrap.serversbroker列表(至少2个, client会从中得到所有)
key.serializer序列化key使用
value.serializer序列化value使用
client.id默认为"“,不设置会创建为"producer-1”,"producer-2"等
partitioner.class为消息分配分区使用
interceptor.classes执行消息拦截逻辑

小技巧

基本原则: 将拼写配置转换为代码编译, 借助代码编译器的校验能力来辅助检查

  1. 配置项拼写错误通过引用静态变量解决;
  2. key.serializer填写应该为全限定类名, 容易拼写错误, 可以基于Serializer.class.getName()来解决;
  3. KafkaProducer是ThreadSafe;

消息发送

消息构造

由于使用Kafka发送消息是一个非常频繁的动作, 因此ProduceRecord的构造也非常频繁。构造ProducerRecord对象, 必选属性key,value, 其他均为可选属性。

class ProduceRecord {
    String topic;
    Integer partition;
    Headers headers; // 增加应用相关信息
    K key; // 相同key被发送到同一个partition, 支持消息压缩
    V value;
    Long timestamp; // CreateTime 创建时间; LogAppendTime 追加到日志文件的时间;
}

发送方式

Kafka Producer本身支持3种模式, 同步, 异步和发后即忘, 并且Kafka Producer在实现上做到了三种模式的统一。
Producer#send声明如下:

Future<ReocrdMetadata> send(ProducerRecord<> record);

具体具体使用哪种模式, 取决于我们对返回值Future的处理。

模式实现Future处理
同步发送线程,Future#get获得结果
异步非发送线程单独处理
发后即忘不处理

关于异步模式, 实际中更多基于callback处理, 即调用send(record, callback)方法比较多, 避免应用侧对Future的管理。Producer内部可以保证对callback调用的顺序是分区有序。

class Callback{
    public void onComplete(RecordMetadata meta, Exception e) {}
}

异常处理

发送异常一般由2种, 可重试异常(多由于集群处于一种状态迁移过程中, 比如Leader选举过程, partition rebalance过程)和不可重试异常(不满足硬性约束, 比如RecordTooLarge)。对于可重试异常, 如果配置了retries参数, KafkaProducer内部会自动重试给定次数, 依然不成功则抛出异常。

|发送模式| 结果 | 可靠性 vs 性能 |
|—|—|—|—|
|同步| 成功或异常 | 可靠性最好但牺牲性能 |
|异步| 成功或异常 | 兼顾可靠性和性能 |
|发送即忘| 不确定 | 性能最好牺牲可靠性 |

资源释放

直接通过close()或者close(long timeout, TimeUnit timeUnit)方法完成。后者支持等待一定时间, 建议基于后者来完成, 实际应用中的关闭是个复杂的过程, 也是会受到协作应用影响的过程, 但好在最终由操作系统兜底完成资源释放。底线是避免应用侧产生错误数据, 因此如何关闭是个case by case的选择。

serializer

作用

发送侧: 将待发送的对象转换为byte[], 在网络上传输。
接受侧: 将接收到的byte[]转换为Java对象, 在应用中处理。

常见类型

Byte、Short、Long、Float、Double、String对应的Serializer。当然也可以自己实现Serializer。

约束

发送侧和接收侧应该使用兼容的Serialzer, 否则无法进行消息解码, 因此建议使用通用serilizer。

partitioner

作用

给待发送的消息分配消息分区。如果ProducerRecord中的partition字段已设置, 则Partitioner不起作用, 否则将由Partitioner决定消息分区。

默认与自定义

Kafka默认的Partioner是DefaultPartioner。我们可以基于Partioner接口进行自定义, 自定义Partitioner可以通过partitioner.class来显示指定。

使用案例

大型电商存在多个仓库, 使用仓库名称或者ID作为key, 灵活记录商品/发单信息。

Producer Interceptor

声明与作用

ProducerInterceptor声明

ProducerRecord<K,V> onSend(ProducerRecord<K,V> record);
void onAcknowledgement(RecordMetadata metadata, Exception e);
void close();

KafkaProducer在消息序列化和分区前调用onSend, 在有发送结果后调用onAcknowledgement,该方法提前于Callback执行。
自定义实现后需要在配置项interceptor.classes中声明。
拦截器可以按顺序形成拦截器链, Kafka的拦截器链会从上一个执行成功的上下文继续执行, 如果拦截器出现异常, 可能产生副作用。

使用场景

  1. 类型于Java Web开发中的Filter, 增加一些通用的规则性逻辑, 比如增加统一前缀。

整体流程

在这里插入图片描述

消息发送过程涉及两个关键线程main和sender。Main Thread中, 应用侧完成消息放入RecordAccumulator中。sender则轮询RecordAccumulator, 完成消息发送。

其中RecordAccumulator, 按照partion完成消息合并, 将消息发送单位从逐条发送, 转变为按批发送, 从而提高消息发送效率。

Sender则将每个partion的消息转换为面向每个Node的请求, 毕竟partion是个逻辑概念, node才是物理存在的。

在整个发送过程中, producer需要知道cluster对应的metadata, 例如node/partion对应关系等, 从而及时调整目标Node。这里也涉及metadata更新等问题。这里仅做简要说明, 后续文章中做进一步阐述。

总结

本文介绍了Kafka Producer发送数据中涉及的线程和各自的职责,重点介绍了与应用直接相关的Interceptor, Serializer和Partitioner。希望能帮助你初步认识Kafka, 感谢你的阅读。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1198853.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

什么是数据库事务、事务的ACID、怎么设置/禁止自动提交?

数据库事务及ACID 数据库事务是指作为单个逻辑工作单元执行的一组操作。这组操作要么全部成功地执行&#xff0c;要么全部不执行&#xff0c;不允许出现部分执行的情况。数据库事务通常需要满足ACID属性&#xff0c;即原子性&#xff08;Atomicity&#xff09;、一致性&#x…

建行驻江门市分行纪检组以政治谈话压责任促发展

开展政治谈话&#xff0c;是加强“一把手”和领导班子监督、严肃党内政治生活、加强对党员领导干部日常教育管理的有效手段。 为督促“一把手”和领导班子成员依法依规履行职责、行使权力&#xff0c;推动党中央重大决策部署以及建设银行总行、广东省分行党委的决策部署在本单…

SpringBoot学习(黑马程序员day12)

1jwt令牌 JWT的组成&#xff1a; &#xff08;JWT令牌由三个部分组成&#xff0c;三个部分之间使用英文的点来分割&#xff09; 第一部分&#xff1a;Header(头&#xff09;&#xff0c; 记录令牌类型、签名算法等。 例如&#xff1a; {"alg":"HS256",&qu…

Linux各种版本安装详细步骤和root密码破解

文章目录 VMware新建虚拟机硬件设置设置虚拟网络挂载ISO文件 root密码破解 VMware新建虚拟机 硬件设置 设置虚拟网络 编辑>虚拟网络编辑器>VMnet8(NAT模式) 挂载ISO文件 加电>开启次虚拟机 第二项可以检查挂载上来的iso文件是否完整没有破坏 磁盘分区 选自定义分…

Linux应用开发基础知识——字符文字编码(五)

前言&#xff1a; TXT 文件中保存的是字符的核心&#xff1a;它的编码值。而 Notepad 上显示时&#xff0c; 这些字符对应什么样的形状态&#xff0c;这是由字符文件决定的。编码值&#xff0c;字体是两个不一样的东西&#xff0c;比如 A 的编码值是 0x41&#xff0c;但是在屏幕…

用excel计算一个矩阵的转置矩阵

假设我们的原矩阵是一个3*3的矩阵&#xff1a; 125346789 现在求它的转置矩阵&#xff1a; 鼠标点到一个空白的地方&#xff0c;用来存放结果&#xff1a; 插入-》函数&#xff1a; 选择TRANSPOSE&#xff0c;这个就是求转置矩阵的函数&#xff1a; 点击“继续”&#xff1a…

SparkSQL之Rule体系

在Unresolved LogicalPlan逻辑算子树的操作&#xff08;如绑定、解析、优化等&#xff09;中&#xff0c;主要方法都是基于规则&#xff08;Rule&#xff09;的&#xff0c;通过Scala语言模式匹配机制&#xff08;Pattern-match&#xff09;进行树结构的转换或节点改写。Rule是…

TCP协议(建议收藏)

1. TCP特点 有连接&#xff1a;需要双方建立连接才能通信&#xff0c;在socket编程中服务端new ServerSocket(port)需要绑定端口&#xff0c;在客服端new Socket(serverIp, serverPort)与服务端建立连接可靠传输&#xff1a;确认应答机制&#xff0c;超时重传机制面向字节流&a…

Python开源项目PGDiff——人脸重建(Face Restoration),模糊清晰、划痕修复及黑白上色的实践

python ansconda 等的下载、安装等请参阅&#xff1a; Python开源项目CodeFormer——人脸重建&#xff08;Face Restoration&#xff09;&#xff0c;模糊清晰、划痕修复及黑白上色的实践https://blog.csdn.net/beijinghorn/article/details/134334021 友情提示&#xff1a; …

vue 使用js new Map()优化多个if else 执行方法

前言 在实际开发中根据业务需求我们经常要判断情况&#xff0c;一个if 我们科技直接使用ES6就可以解决 经常会出现根据不同的条件执行不同的方法&#xff0c;这是就会有多个if else 看起不太美观也费劲 js new map &#xff08;&#xff09;就可以解决这个问题&#xff0c;它…

建行广东省江门市分行走进农村地区开展反假货币宣传

人民对美好生活的向往&#xff0c;涉及方方面面&#xff0c;小至“钱袋子”安全。建行广东省江门市分行落实当地监管部门部署&#xff0c;积极扛起维护国家金融安全的重要政治责任&#xff0c;深入农村地区开展反假货币宣传工作&#xff0c;助力构建农村反假货币工作长效机制。…

notes_质谱蛋白组学数据分析基础知识

目录 1. 蛋白组学方法学1.1 液相-质谱法1) 基本原理2) bottom-up策略的基本流程 1.2 PEA/Olink 2. 质谱数据分析2.1 原始数据格式2.2 分析过程1&#xff09;鉴定2&#xff09;定量3&#xff09;预处理 2.3 下游分析 参考附录 1. 蛋白组学方法学 目前常见的蛋白组学方法学如下图…

【C/C++笔试练习】内联函数、函数重载、调用构造函数的次数、赋值运算符重载、静态成员函数、析构函数、模板定义、最近公共祖先、求最大连续bit数

文章目录 C/C笔试练习选择部分&#xff08;1&#xff09;内联函数&#xff08;2&#xff09;函数重载&#xff08;3&#xff09;调用构造函数的次数&#xff08;4&#xff09;赋值运算符重载&#xff08;5&#xff09;静态成员函数&#xff08;6&#xff09;调用构造函数的次数…

一句话讲明白buck和boost电源电路

大部分教程就是垃圾 虽然buck和boost结构上很像&#xff0c;但是是两个原理完全不一样的东西 BUCK&#xff08;降压&#xff09;电源 buck就是把方波&#xff0c;用LC滤波器后&#xff0c;变成正弦波 滤波&#xff1a;就是让电压缓慢增加&#xff0c;缓慢减少。&#xff08…

【SoC基础】DMA的工作原理

&#x1f4e2;&#xff1a;如果你也对机器人、人工智能感兴趣&#xff0c;看来我们志同道合✨ &#x1f4e2;&#xff1a;不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 &#x1f4e2;&#xff1a;文章若有幸对你有帮助&#xff0c;可点赞 &#x1f44d;…

19.删除链表的倒数第N个结点(LeetCode)

想法一 先用tail指针找尾&#xff0c;计算出节点个数&#xff0c;再根据倒数第N个指定删除 想法二 根据进阶的要求&#xff0c;只能遍历一遍链表&#xff0c;那刚刚想法一就做不到 首先&#xff0c;我们要在一遍内找到倒数第N个节点&#xff0c;所以我们设置slow和fast两个指…

02. Python基础数据类型

1、前言 前面我们介绍了认识了Python以及Python的基础环境搭建&#xff0c;今天我们介绍下Python的一些基础语法。 2、Python基础 2.1、输入输出 2.1.1、输出 print() 用于输出指定的文字&#xff0c;括号中的为输出的字符串。print()也可以同时接收多个字符串&#xff0c;…

ESP32网络开发实例-将数据保存到InfluxDB时序数据库

将数据保存到InfluxDB时序数据库 文章目录 将数据保存到InfluxDB时序数据库1、InfluxDB介绍与安装3、软件准备4、硬件准备5、代码实现6、InfluxDB数据可视化在本文中,将介绍 InfluxDB 以及如何将其与 ESP32 开发板一起使用。 我们将向展示如何创建数据库桶并将 ESP32 数据发送…

Jupyter notebook 无法链接内核、运行代码

问题来源 今天想在 vscode 上使用 Jupyter notebook 跑 Python 代码&#xff0c;但无法使用&#xff0c;提示要升级内核。 Running cells with base requires the ipykernel package to be installed or requires an update. 其实这个问题存在好一段时间了&#xff0c;不过之前…

【教3妹学编程-算法题】Range 模块

3妹&#xff1a;哈哈哈哈哈哈哈哈 2哥 : 3妹看什么呢&#xff0c;笑的这么开森 3妹&#xff1a;2哥你快来看啊&#xff0c;成都欢乐谷的NPC模仿“唐僧”&#xff0c; 太搞笑了。 2哥 : 哦这个我也看到了&#xff0c;真的是唯妙唯肖&#xff0c;不能说像&#xff0c;只能说一模一…