Kafka(二)Producer第一篇

news2025/1/14 18:44:24

一,Client开发


生产逻辑需要具备以下几个 步骤:
(1)配置生产者客户端参数及创建相应的生产者实例。
(2)构建待发送的消息。
(3)发送消息。
(4)关闭生产者实例。
public class Producer {
    private static final String BROKER_LIST = "localhost:9092";
    private static final String TOPIC = "TOPIC-A";
    public static Properties initConfig() {
        Properties properties = new Properties();
        // 以下3个必须
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
       
        // 客户端ID
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "eris-kafka-producer");
        // 重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // 成功收到消息分区副本数,默认为1,即leader副本收到就返回成功。注意是字符串!
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        // 生产者客户端能发送的消息的最大值,单位为B,默认1M
        properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,  1048576);
        // Producer等待请求响应的最长时间,单位ms,默认值为30000
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,  30000);
        // 生产者发送ProducerBatch之前等待更多ProducerRecord加入的时间。默认为0,ProducerBatch被填满时发出
        properties.put(ProducerConfig.LINGER_MS_CONFIG,  0);
        return properties;
    }

    public static ProducerRecord<String, String> initMessage() {
        return new ProducerRecord<>(TOPIC, "hi eris");
    }

    public static void main(String[] args) {
        Properties properties = initConfig();
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        ProducerRecord record = initMessage();
        kafkaProducer.send(record);
        kafkaProducer.close();
    }
}
注意:KafkaProducer,它是线程安全的,我们可以在多线程的环境中复用它。
消费者客户端KafkaConsumer是非线程安全的。
1,发送消息的构造
消息对象是ProducerRecord,业务信息是value字段。
  • key可以让消息再二次归类,同一个key的消息会被划分到同一个分区中;
  • topic属性和value属性是必填项,其余属性是选填项;
2,主要Producer参数
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG并非需要所有的broker地址,生产者会从给定的broker里查找到其他broker的信息,一般至少要2个。
3, 消息的发送
3-1,消息发送的3种模式:发后即忘、同步及异步
A,发后即忘:
kafkaProducer.send(record);
B,同步:
kafkaProducer.send(record).get();
C,异步:
  • 不推荐手动用future实现业务异步,诸多消息对应的Future对象的处理会引起逻辑的混乱。
  • 对于同一个分区,如果消息record1于record2之前先发送,那么KafkaProducer可以保证对应的callback1在callback2之前调用,也就是说 ,回调函数的调用也可以保证分区有序。
3-2,KafkaProducer中一般会发生两种类型的异常:可重试的异常和不可重试的异常。
对于可重试异常,如果配置了 retries 参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常;不可重试异常会直接抛出异常。
3-3,close方法会回收资源,并且阻塞等待之前所有的发送请求完成后再关闭 KafkaProducer,可带timeout。
3-4,消息在通过send发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)才能被真正地发往 broker。
4,序列化
生产者需要用序列化器(Serializer) 把对象转换成字节数组才能通过网络发送给Kafka。
如果 Kafka 客户端提供的几种序列化器都无法满足需求,则可以选择使用如Avro、JSON、Thrift、ProtoBuf和Protostuff等通用的序列化工具来实现,或者使用 自定义的序列化器来实现
5,分区器
如果ProducerRecord中指定了partition字段,那么就不需要分区器的作用。
反之若没有指定partition字段,那么就需要依赖分区器, 根据key这个字段来计算partition的值。
  • 如果key不为null,计算得到的分区会是所有分区中的任意一个;
  • 如果key为null,计算得到的分区号仅为可用分区中的任意一个;
一旦主题中增加了分区,那么就难以保证key与分区之间的映射关系了。
可以自定义分区器。
6,拦截器
自定义拦截器需要实现org.apache.kafka.clients.producer.ProducerInterceptor接口,接口中包含3个方法:
  • onSend:将消息序列化和计算分区之前调用;
  • onAcknowledgement:消息被应答(ack)之前或消息发送失败时调用,优先于用户设定的 Callback之前执行;
  • interceptor.classes配置拦截器链(逗号分隔),拦截链会按照参数配置的顺序一一执行。如果某个拦截器执行失败,下一个拦截器会接 着上一个执行成功的拦截器继续执行。

二,重要参数


1.acks
用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。
  • ack=1(默认),leader写入(落盘)即成功;
  • ack=0,发送后就认为成功,不需要等待任何服务端的响应;
  • ack=-1或者ack=“all”,需要ISR中的所有副本都成功写入;
2,retries
生产者重试的次数,默认值为0,即不重试。
保证消息顺序
max.in.flight.requests.per.connection
该参数指定了生产者在收到服务器响应之前可以发送多少个消息(即最多缓存的请求数)。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
如果把 retries 设为非零整数,同时把 max.in.flight.requests.per.connection 设为比 1 大的数,那么,如果第一个批次消息写入失败,而第二个批次写入成功,broker 会重试写入第一个批次。如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了
一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把 retries 设为 0。可以把 max.in.flight.requests.per.connection 设为 1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给 broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。
3,linger.ms
这个参数用来指定生产者发送ProducerBatch之前等待更多ProducerRecord加入ProducerBatch的时间,默认值为0,即生产者会在 ProducerBatch 被填满时发送出去。
其他:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1906275.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

材料科学SCI期刊,IF=6+,2个月录用,审稿速度非常快

一、期刊名称 Journal of Materials Research and Technology 二、期刊简介概况 期刊类型&#xff1a;SCI 学科领域&#xff1a;材料科学 影响因子&#xff1a;6.2 中科院分区&#xff1a;2区 三、期刊简介 《材料研究与技术杂志》为发表与材料加工、性能和性能相关的理论…

蚓链实践告诉你“企业确保达成数字化营销效果的方法”

在如今这个数字化盛行的时代&#xff0c;企业想在激烈的市场竞争里崭露头角&#xff0c;确保数字营销效果那可是至关重要&#xff01;今天就来给大家聊聊实现这一目标的基本条件&#xff0c;来自蚓链数字化营销系统的广大用户体验总结。 一、精准的目标定位 企业一定要清楚地知…

Java 操作 Redis客户端

目录 1.渐进式遍历 2.Java 操作 Redis 客户端 2.1 引入依赖 2.2 配置端口转发 2.3 连接Redis Server 3.基础操作 3.1 set 和 get 3.2 exists 和 del 3.3 keys 3.4 expire 和 ttl 3.5 type 4.字符串操作 4.1 mget 和 mset 4.2 append 4.3 getrange 和 setrange 4.4 incr 和 d…

如何大幅减少 Vue.js 中的包大小和加载时间,提升用户体验!

大家好,我是CodeQi! 一位热衷于技术分享的码仔。 你知道吗,根据Google 的一项研究,如果网站加载时间超过 3 秒,53% 的移动用户会离开该网站? 性能优化是一个经常讨论的话题,但很多开发人员并不关心提高应用的速度。 在前端开发中,优化包大小和加载时间对于提升用户体…

数据结构--二叉树相关题2(OJ)

1.比较对称二叉树&#xff08;镜像二叉树&#xff09; 二叉树相关题1中第二题的变形题。先去看1哦&#xff01; 左子树和右子树比较 bool _isSymmetric(struct TreeNode* p, struct TreeNode* q) {if (p NULL && q NULL)return true;//如果两个都为空则是相等的if …

【Arduino】XIAOFEIYU(TM)实验ESP32使用霍尔传感器(图文)

霍尔传感器是一种可以测量磁力变化的传感器&#xff0c;今天XIAOFEIYU就来测试一下ESP32使用霍尔传感器。 霍尔传感器&#xff1a;正负极加一个数据接口。 将传感器与ESP32进行电路连接&#xff1a; 编写程序&#xff1a; #define SIGNAL_PIN 33int value 0; // 存储传感…

【Spring Boot】关系映射开发(二):一对多映射

《JPA 从入门到精通》系列包含以下文章&#xff1a; Java 持久层 API&#xff1a;JPA认识 JPA 的接口JPA 的查询方式基于 JPA 开发的文章管理系统&#xff08;CRUD&#xff09;关系映射开发&#xff08;一&#xff09;&#xff1a;一对一映射关系映射开发&#xff08;二&#…

如何在Spring Boot中实现分布式任务调度?

文章目录 引言一、分布式任务调度的基本原理二、Spring Boot与分布式任务调度1. 使用Quartz实现分布式任务调度2. 使用Elastic-Job实现分布式任务调度 三、常见问题与解决方案结论 &#x1f389;欢迎来到SpringBoot框架学习专栏~ ☆* o(≧▽≦)o *☆嗨~我是IT陈寒&#x1f379;…

食品行业制氮机的应用范围解析

在食品行业中&#xff0c;保障食品的品质和安全性是每一个企业所追求的核心目标。制氮机作为一种重要的辅助设备&#xff0c;其在食品行业中的作用不容忽视。 一、保障食品质量与安全性 制氮机通过物理方法从空气中分离出高纯度氮气&#xff0c;为食品提供了一个无氧环境。这一…

C++模板元编程(二)——完美转发

完美转发指的是函数模板可以将自己的参数“完美”地转发给内部调用的其它函数。所谓完美&#xff0c;即不仅能准确地转发参数的值&#xff0c;还能保证被转发参数的左、右值属性不变。 文章目录 场景旧的方法新的方法内部实现参考文献 场景 思考下面的代码&#xff1a; templ…

哈喽GPT-4o,程序员如何通过GPT-4o提高工作效率

目录 一、编写代码Prompt&#xff1a;请用Java语言编写一个二分查找的样例 二、修正代码错误、代码优化Prompt&#xff1a;我们上传一张华为OD算法题的题目描述&#xff0c;再给它我的Java解题代码&#xff0c;问问它有什么问题&#xff1f; 三、解读代码功能、代码翻译Prompt&…

Qt 网络编程 网络信息获取操作

学习目标&#xff1a;网络信息获取操作 前置环境 运行环境:qt creator 4.12 学习内容 一、Qt 网络编程基础 Qt 直接提供了网络编程模块,包括基于 TCP/IP 的客户端和服务器相关类,如 QTcpSocket/QTcpServer 和 QUdpSocket,以及实现 HTTP、FTP 等协议的高级类,如 QNetworkRe…

flask缓存、信号的使用

【 一 】flask-ache ​ 它为 Flask 应用程序提供了缓存支持。缓存是 Web 应用程序中非常常见的做法&#xff0c;用于存储频繁访问但不太可能经常更改的数据&#xff0c;以减少对数据库或其他慢速存储系统的访问&#xff0c;从而提高应用程序的性能和响应速度。 ​ Flask-Cach…

程序员必知的 89 个操作系统核心概念

1. 操作系统&#xff08;Operating System&#xff0c;OS&#xff09;&#xff1a;是管理计算机硬件与软件资源的系统软件&#xff0c;同时也是计算机系统的内核与基石。操作系统需要处理管理与配置内存、决定系统资源供需的优先次序、控制输入与输出设备、操作网络与管理文件系…

Stable Diffusion 【模型推荐】没有最强,只有更强!高清画质!电影光效版SD1.5人像摄影大模型《他和她 2》

今天带来了一款SD1.5大模型——《他和她 2》电影光效版SD1.5人像摄影大模型。该模型经过家叔马丁Mr_M大佬的优化升级后&#xff0c;把SD1.5的影像光效推上了全新的高度&#xff01;根据大佬的描述&#xff0c;该模型具有更强大的细节表现&#xff0c;更细腻的表面肌理&#xff…

揭秘SmartEDA:电路仿真软件如何贯穿课前课中课后,助力电子学习新纪元!

在电子设计与自动化的学习道路上&#xff0c;一款强大的电路仿真软件往往能为学生们带来事半功倍的效果。今天&#xff0c;我们就来深入探讨一下SmartEDA这款电路仿真软件在课前、课中、课后的全方位应用&#xff0c;看看它如何助力我们的电子学习步入新纪元&#xff01; 1、课…

水果商城系统 SpringBoot+Vue

1、技术栈 技术栈&#xff1a;SpringBootVueMybatis等使用环境&#xff1a;Windows10 谷歌浏览器开发环境&#xff1a;jdk1.8 Maven mysql Idea 数据库仅供学习参考 【已经答辩过的毕业设计】 项目源码地址 2、功能划分 3、效果演示

下载安装JavaFX及解决报错:缺少 JavaFX 运行时组件, 需要使用该组件来运行此应用程序|Eclipse

目录 1.下载并解压 2.Eclipse配置 3.报错问题 解决方法1&#xff1a;将javaSE更改到9以下 解决方法2&#xff1a; 使用module-info.java配置解决 1.下载并解压 JavaFX下载地址&#xff1a;JavaFX - Gluon 选择合适自己电脑配置的sdk版本下载 打不开网页的参考这个博客&…

泛微开发修炼之旅--35关于基于页面扩展和自定义按钮实现与后端交互调用的方法

文章链接&#xff1a;35关于基于页面扩展和自定义按钮实现与后端交互调用的方法

【手写数据库内核组件】0201 哈希表hashtable的实战演练,多种非加密算法,hash桶的冲突处理,查找插入删除操作的代码实现

hash表原理与实战 ​专栏内容&#xff1a; postgresql使用入门基础手写数据库toadb并发编程 个人主页&#xff1a;我的主页 管理社区&#xff1a;开源数据库 座右铭&#xff1a;天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物. 文章目录 hash表…