【kafka】十四、kafka生产者API

news2025/1/24 11:42:28

kafka Producer API

1.消息发送流程

kafka的producer发送消息采用的是异步发送的方式。在消息的发送过程中,涉及到了两个线程–main线程和sender线程,以及一个线程共享变量–RecordAccumulator。main线程将消息发送给RecordAccumulator,sender线程不断从RecordAccumulator中拉取消息发送到kafka broker。

image-20220221220852427

相关参数:

batch.size:只有数据积累到batch.size之后,sender才会发送数据

linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms之后就会发送数据

2.异步发送API

依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

服务类

KafkaProducer:创建一个生产者对象,用来发送数据

ProducerConfig:获取kafka需要的一系列配置参数

ProducerRecord:每条数据都要封装成一个ProducerRecord对象

2.1 不调用回到函数
public class MyProducer {

    public static void main(String[] args) throws InterruptedException {
        //1.创建kafka生产者的配置信息
        Properties properties = new Properties();
        //2.指定连接的kafka集群
        properties.put("bootstrap.servers", "hll1:9092");
        //3.ack机制
        properties.put("acks", "all");
        //4.重试次数
        properties.put("retries", 3);
        //5.批次大小,16384=16k
        properties.put("batch.size", 16384);
        //6.等待时间,时间到了之后会发送数据
        properties.put("linger.ms", 1);
        //7.RecordAccumulator缓冲区大小,33554432=32M
        properties.put("buffer.memory", 33554432);
        //8.key value的序列化类
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //9.创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        //10.发送数据
        //topic信息写在ProducerRecord
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("bigdata", "hll---" + i));
        }
        //11.关闭连接,如果不关闭连接,消费者不会接收到消息
        producer.close();
        //或者可以暂停线程,达到"linger.ms"的配置要求也可以完成消费
        //Thread.sleep(1000);
    }
}

启动一个消费者后再运行代码,可以看到成功消费通过代码生成的消息

image-20220226225832308

2.2 调用回调函数
public class CallbackProducer {

    public static void main(String[] args) {
        //创建kafka配置信息
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hll1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");


        //创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        //发送数据
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("bigdata", "hll::" + i),
                    (metadata, exception) -> {
                        if (exception == null) {
                            System.out.println(metadata.partition() + "::" + metadata.offset());
                        }
                    });
        }

        //关闭资源
        kafkaProducer.close();
    }
}

image-20220228232238995

从打印的结果来看,消息被平均的分配到了两个分区(当前测试的主题只有两个分区)

3.自定义分区

public class MyPartitioner implements Partitioner {

    /**
     * 分区选取
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //默认分区获取方法
        //new DefaultPartitioner().partition()

        //可以实现自己分区策略,返回的需要是可用的分区的
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
public class PartitionerProducer {

    public static void main(String[] args) {
        //kafka配置文件
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hll1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        //自定义分区加载器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.hll.partitioner.MyPartitioner");

        //创建kafka生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        //发送数据
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("bigdata", "hll::" + i),
                    (metadata, exception) -> {
                        //如果exception为null,说明消息发送成功没有异常
                        if (exception == null) {
                            System.out.println(metadata.partition() + "==" + metadata.offset());
                        } else {
                            exception.printStackTrace();
                        }
                    });
        }

        //关闭连接
        producer.close();
    }
}

image-20220228233550702

4.同步发送API

同步发送的意思是,一条消息发送之后,会阻塞当前线程,直到返回ack.

由于send方法返回的是一个Future对象,根据Future对象的特点,我们也可以实现同步发送的效果,只需要调用Future对象的get()方法即可。

//调用get,阻塞线程,同步发送
producer.send(new ProducerRecord<String, String>("bigdata", "hll---" + i)).get();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/40147.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

单向环形链表介绍以及约瑟夫问题分析

❤️一名热爱Java的大一学生&#xff0c;希望与各位大佬共同学习进步❤️ &#x1f9d1;个人主页&#xff1a;周小末天天开心 各位大佬的点赞&#x1f44d; 收藏⭐ 关注✅&#xff0c;是本人学习的最大动力 感谢&#xff01; &#x1f4d5;该篇文章收录专栏—数据结构 目录 单…

不敲代码就能搭建个人博客?快解析内网穿透来助力

记得很多年前看到一句话&#xff0c;“博客是一个人的狂欢”。无论是享受搭建的过程&#xff0c;还是享受创作的乐趣&#xff0c;更多时候博客是在取悦自己。那么&#xff0c;在2022年的今天&#xff0c;搭建个人博客还有意义吗&#xff1f;答案是肯定的&#xff0c;当我们在搜…

Day4: 应用篇-1

应用篇-1 环境安装 应用开发交叉编译环境&#xff0c; 【正点原子】I.MX6U嵌入式Linux驱动开发指南V1.7.pdf 章节4.3.1 在 Ubuntu 中创建目录&#xff1a;/usr/local/arm&#xff0c;命令如下&#xff1a; sudo mkdir /usr/local/arm令将交叉编译器复制到/usr/local/arm 中…

Arduino开发实例-DIY分贝测量仪

DIY分贝测量仪 1、应用介绍 分贝计,它通常用于测量声音的强度和水平。 声音响度是用分贝来衡量的。 从飞机到人类耳语的不同发声介质都有一定的声音响度,以分贝表示。 声波是具有来回运动的纵波,给出高音或低音,如图所示: 声音的响度取决于频率或波长或传播所需的时间。…

APK构建过程-命令行编译

官方对APK构建过程的介绍 官方 - 构建流程介绍 典型 Android 应用模块的构建流程&#xff0c;按照以下常规步骤执行&#xff1a; 编译器将您的源代码转换成 DEX 文件&#xff08;Dalvik 可执行文件&#xff0c;其中包括在 Android 设备上运行的字节码&#xff09;&#xff0c;…

【强化学习论文合集】专栏介绍(订阅前必读)

导读&#xff1a;什么是强化学习&#xff1f; 强化学习&#xff08;Reinforcement Learning, RL&#xff09;&#xff0c;又称再励学习、评价学习或增强学习&#xff0c;是机器学习的范式和方法论之一&#xff0c;用于描述和解决智能体&#xff08;agent&#xff09;在与环境的…

AtCoder Beginner Contest 263 G.Erasing Prime Pairs(二分图最大匹配-网络流)

题目 黑板上有n(n<100)个不同的数&#xff0c;第i个数ai(1<ai<1e7)出现了bi(1<1e9)次&#xff0c; 你每次可以选择当前黑板上存在的两个数x、y&#xff0c;满足xy是质数&#xff0c;擦掉这两个数&#xff0c; 求可以擦掉的最大次数 思路来源 AtCoder Beginner…

[LeetCode周赛复盘] 第 321 场周赛20221127

[LeetCode周赛复盘] 第 321 场周赛20221127 一、本周周赛总结二、 [Easy] 6245. 找出中枢整数1. 题目描述2. 思路分析3. 代码实现三、[Medium]6246. 追加字符以获得子序列1. 题目描述2. 思路分析3. 代码实现四、[Medium] 6247. 从链表中移除节点1. 题目描述2. 思路分析3. 代码实…

Docker-compose详解和LNMP搭建实战

目录 一、Docker-compose简介 1.前言 2.概述 二、Docker-compose安装 三、YAML文件格式及编写注意事项 1.简介 2.使用方法 四、Docker Compose 常用命令 五、Docker Compose 配置常用字段 六、Docker-compose搭建LNMP实战 一、Docker-compose简介 1.前言 我们知道使…

深度学习与总结JVM专辑(三):垃圾回收器—G1(图文+代码)

垃圾收集器G1前言概述停顿时间模型内存布局传统内存布局过时了G1实现的几个关键细节问题铺垫知识&#xff1a;跨代引用铺垫知识&#xff1a;记忆集&#xff0c;卡表&#xff0c;卡页铺垫知识&#xff1a;写屏障插眼往下看G1内存模型分区Region卡片Card堆Heap分代模型分代垃圾收…

网站分享:7个非常好用的电子书网站

❤️作者主页&#xff1a;IT技术分享社区 ❤️作者简介&#xff1a;大家好,我是IT技术分享社区的博主&#xff0c;从事C#、Java开发九年&#xff0c;对数据库、 C#、 Java、前端、运维、电脑技巧等经验丰富。 ❤️个人荣誉&#xff1a; 数据库领域优质创作者&#x1f3c6;&…

Steam下载MOD至本地文件夹

Steam下载MOD至本地文件夹1 所需app和web2 具体步骤2.1 安装SteamCMD2.2 登录SteamCMD2.3 打开网页端的Steam并搜索你想要的MOD2.4 点击你需要的MOD&#xff0c;并复制链接2.5 将链接放入 https://steamworkshopdownloader.io/2.6 把下载代码放入SteamCMD由于各种原因&#xff…

OpenVINO--初步学习笔记

英特尔发布的针对AI工作负载的一款部署神器当模型训练结束后&#xff0c;上线部署时&#xff0c;就会遇到各种问题&#xff0c;比如&#xff0c;模型性能是否满足线上要求&#xff0c;模型如何嵌入到原有工程系统&#xff0c;推理线程的并发路数是否满足&#xff0c;这些问题决…

java项目_第164期ssm定西扶贫惠农推介系统-_java毕业设计_计算机毕业设计

java项目_第164期ssm定西扶贫惠农推介系统-_java毕业设计_计算机毕业设计 【源码请到资源专栏下载】 今天分享的项目是《ssm定西扶贫惠农推介系统》 该项目分为2个角色&#xff0c;管理员和用户。 用户可以浏览前台,包含功能有&#xff1a; 首页、扶贫计划、惠农福利、优秀农民…

【C++】vector的介绍和使用

​&#x1f320; 作者&#xff1a;阿亮joy. &#x1f386;专栏&#xff1a;《吃透西嘎嘎》 &#x1f387; 座右铭&#xff1a;每个优秀的人都有一段沉默的时光&#xff0c;那段时光是付出了很多努力却得不到结果的日子&#xff0c;我们把它叫做扎根 目录&#x1f449;vector 的…

【课设/毕业设计】电力系统潮流计算(Matlab代码实现)

&#x1f468;‍&#x1f393;个人主页&#xff1a;研学社的博客 &#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜…

相控阵天线(九):平面阵列天线综合(不可分离型切比雪夫分布、圆口径泰勒综合、可分离型分布)

目录简介不可分离型分布不可分离型切比雪夫圆口径泰勒综合可分离型分布可分离切比雪夫综合可分离泰勒综合简介 按行、列排列的可分离型矩形平面阵&#xff0c;其阵因子是两个正交排列的直线阵阵因子的乘积。可分离的平面阵方向图在两个主面内是满足预期副瓣电平的&#xff0c;…

SpringMVC学习笔记(一)

目录 一、什么是SpringMVC ? 二、Spring MVC项目的连接(用户 和 程序 的 映射) 三、获取参数的功能的实现 传递较少数量的参数&#xff1a; 通过对象传递大量参数&#xff1a; 获取表单参数&#xff1a; 获取Json对象 上传文件&#xff1a; &#x1f514;一点补充 一、…

家用 NAS 服务器搭建 | 前篇

1、前言 最近一段时间都在折腾家用 NAS 服务器&#xff0c;NAS 系统从最开始选择安装开源的 OMV&#xff08;OpenMediaVault&#xff09;、万由U-NAS&#xff0c;最终决定使用黑群晖。硬件也是一步步从旧笔记本、拆旧笔记本改nas样式、最终也是到万由410机箱。 家庭nas服务器可…

unity rtsp 视频渲染(一)

unity unity 可以说是一个不错的工具&#xff0c;建立三维的场景非常方便&#xff0c;下面我们建立一个三维的场景&#xff0c;并且在三维的场景中和场景外分别建立系统去播放视频。所谓场景内就是在三维中播放视频&#xff0c;场景外就是在三维场景前表面的二维平面中播放视频…