Kafka原生API使用Java代码-生产者-异步发送消息回调

news2024/11/15 9:01:00

文章目录

  • 1、异步发送消息&回调
    • 1.1、pom.xml
    • 1.2、KafkaProducer1.java

1、异步发送消息&回调

回调就是接收kafka的响应

1.1、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Generated by https://start.springboot.io -->
    <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
    <groupId>com.atguigu.kafka</groupId>
    <artifactId>kafka-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-producer</name>
    <description>kafka-producer</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

1.2、KafkaProducer1.java

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducer1 {

    /**
     * 主函数用于演示如何向Kafka的特定主题发送消息。
     *
     * @param args 命令行参数(未使用)
     */
    public static void main(String[] args) {
        // 初始化Kafka生产者配置
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口
        props.put("acks", "all"); // 确认消息写入策略
        props.put("retries", 0); // 消息发送失败时的重试次数
        props.put("linger.ms", 1); // 发送缓冲区等待时间
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定键的序列化器
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器

        // 创建Kafka生产者实例
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 发送消息到主题"my_topic3"
        // 异步发送消息:不接收kafka的响应
        //producer.send(new ProducerRecord<String, String>("my_topic3",  "hello,1,2,3"));

        // 注释掉的循环代码块展示了如何批量发送消息
        //for (int i = 0; i < 100; i++)
        //    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));


        producer.send(new ProducerRecord<String, String>("my_topic3",  "hello_3"),new Callback() {

            //消息发送成功,kafka broker ack 以后回调
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                //exception:如果有异常代表消息未能正常发送到kafka,没有异常代表消息发送成功:
                //此时kafka的消息不一定持久化成功(需要kafka生产者加配置)
                //RecordMetadata代表发送成功的消息的元数据
                System.out.println("recordMetadata.topic() = " + recordMetadata.topic());
                System.out.println("recordMetadata.partition() = " + recordMetadata.partition());
                System.out.println("msg offset = " + recordMetadata.offset());
                System.out.println("recordMetadata.timestamp() = " + recordMetadata.timestamp());

            }
        });
        // 关闭生产者实例
        producer.close();
    }
}
recordMetadata.topic() = my_topic3
recordMetadata.partition() = 1
msg offset = 1
recordMetadata.timestamp() = 1716976982243

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1714356.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

创建QT项目后只有一个pro文件(已解决) QT Creator

1. 问题回顾 工程仅有xx.pro文件 工程文件夹里面却有文件 这种情况并非创建工程失败&#xff0c;而是没有配置好文件 界面左下角 2. 解决方法 点击项目模式 配置不了 有点生气。。卸载了 重新安装&#xff0c;这次选择了多个kit勾选 回到项目模式&#xff0c;点进去 发现这…

短视频矩阵系统源码---开发BS架构B/S(Browser/Server Architecture)架构

短视频矩阵系统源头开发------- 第一款叫做筷子科技&#xff0c;这个筷子科技剪辑和发布都是没有问题的&#xff0c;但是前一段时间他的剪辑发个公告&#xff0c;每个账号只能发两条&#xff0c;另外它的唯一缺点就是它成本比较高的&#xff0c;入门门槛应该在12800左右&#…

视频汇聚EasyCVR视频监控平台GA/T 1400协议特点及应用领域解析

GA/T 1400协议&#xff0c;也被称为视图库标准&#xff0c;全称为《公安视频图像信息应用系统》。这一标准在公安系统中具有举足轻重的地位&#xff0c;它详细规定了公安视频图像信息应用系统的设计原则、系统结构、视频图像信息对象、统一标识编码、系统功能、系统性能、接口协…

大规模服务治理中etcd的实践与深度应用

导读&#xff1a;随着企业对于服务治理的日益重视&#xff0c;特别是在云原生和微服务架构的广泛应用下&#xff0c;百度小程序团队基于大模型服务治理的实战经验&#xff0c;结合分布式开源KV产品etcd&#xff0c;分享了其核心技术Raft与boltdb的实现原理&#xff0c;并深入剖…

文件上传漏洞简介

目录 漏洞原理 漏洞危害 利用场景 检测方法 防御方法 绕过手段 前端JS绕过 构造可解析后缀 修改Content-Type&#xff08;MIME&#xff09; 大小写绕过 文件头绕过 图片马 截断与特殊文件名 其他绕过 尝试绕过的步骤 漏洞原理 原理 攻击者构造恶意文件进行上传…

RTPS协议之Behavior Module

目录 交互要求基本要求RTPS Writer 行为RTPS Reader行为 RTPS协议的实现与Reader匹配的Writer的行为涉及到的类型RTPS Writer实现RTPS WriterRTPS StatelessWriterRTPS ReaderLocatorRTPS StatefulWriterRTPS ReaderProxyRTPS ChangeForReader RTPS StatelessWriter BehaviorBe…

keil4和5版本代码编译错误问题

需求: 在工作中, 遇到了keil4工程的老代码, 需要烧录到板子中. 问题: 电脑中只有keil5软件, 使用keil5软件打开, 编译后报了一堆错, 还是官方库文件的错误, 这就是版本不兼容了. 解决方法: 下载keil4软件, 不要和keil5放到一起. 进行如下操作. 0. 根据如下链接来下载keil4.7…

拥塞控制的微观行为与力学解释

本文以 tcptrace 图为基&#xff0c;描述传输的微观行为&#xff0c;并给出一个初中几何描述的压水井模型。 统计复用网络的拥塞控制&#xff0c;宏观看 inflight&#xff0c;微观看 pacing rate&#xff0c;宏观大方向不对&#xff0c;微观再正确也不行。 而网络的统计动力学…

推荐丨一键申请SSL证书,让网站实现HTTPS访问!

申请HTTPS证书可以简化为以下几个直接步骤&#xff0c;以便您能快速理解和操作&#xff1a; 1. 确定证书类型&#xff1a; - 单域名证书&#xff1a;适用于一个特定域名。 - 通配符证书&#xff1a;适用于同一主域名下的所有子域名。 - 多域名证书&#xff1a;覆盖多个不同的域…

polarctf靶场[reverse]shell、PE结构、拼接

[reverse]shell 考点&#xff1a;脱壳 将所解压的文件拖入DIE有无有壳&#xff0c;文件类型 发现有UPX壳&#xff0c;是32位的文件&#xff0c;先脱壳 用FFI工具脱壳 将脱壳后的程序用32位IDA进行反汇编 点开_main_0函数进行查看 看到flag&#xff0c;&#xff08;F5&#…

【PostgreSQL17新特性之-事务级别超时参数transaction_timeout】

PostgreSQL数据库里有多个和会话相关的参数&#xff0c;PostgreSQL17-beta1版本新增了一个transaction_timeout参数&#xff0c;来限制事务的持续时间。 当前的一些和会话相关的超时参数如下 -----------------------------------------------------------------------------…

【学习Day2】计算机基础

✍&#x1f3fb;记录学习过程中的输出&#xff0c;坚持每天学习一点点~ ❤️希望能给大家提供帮助~欢迎点赞&#x1f44d;&#x1f3fb;收藏⭐评论✍&#x1f3fb;指点&#x1f64f; 1.4 校验码 奇偶校验 ● 奇偶校验码的编码方法是&#xff1a; 由若干位有效信息的头部或者…

PowerPivot-跨表取值

在PowerPivot中&#xff0c;跨表取值通常涉及创建关系和使用DAX&#xff08;数据分析表达式&#xff09;函数。 以下是一些基本步骤和常用的DAX函数&#xff0c;帮助你在PowerPivot中实现跨表取值&#xff1a; 步骤1&#xff1a;创建关系 加载数据&#xff1a;确保你已将需要…

FreeRTOS【9】计数信号量使用

1.开发背景 FreeRTOS 基于上一篇了解了二值信号量后&#xff0c;如果需要设计需要累计信号量的次数就不行了&#xff0c;所以有了计数信号量&#xff0c;可以设置计数的最大值。同样&#xff0c;可以理解二值信号量是计数信号量的一种特例&#xff0c;即二值信号量是计数信号量…

利用QtScrcpy与Power Automate 实现微信群批量自动清理

Power Automate 一、Power Automate window系统自带软件 在开始使用Power Automate之前&#xff0c;需要熟悉它的基本概念和功能。Power Automate的核心概念包括触发器、操作和连接器。触发器是指触发自动化流程的事件&#xff0c;操作是指在自动化流程中执行的操作&#xff0…

数据结构(十)图

文章目录 图的简介图的定义图的结构图的分类无向图有向图带权图&#xff08;Wighted Graph&#xff09; 图的存储邻接矩阵&#xff08;Adjacency Matrix&#xff09;邻接表代码实现 图的遍历深度优先搜索&#xff08;DFS&#xff0c;Depth Fisrt Search&#xff09;遍历抖索过程…

ai写作助手有哪些,5款强大的ai写作工具为你所用

在科技日新月异的时代&#xff0c;人工智能已经悄然走进我们的生活&#xff0c;为我们带来了诸多便利。其中&#xff0c;AI写作助手作为一种创新的工具&#xff0c;正在改变着我们的写作方式。它们不仅能够提供创意灵感&#xff0c;还能帮助我们提高写作效率&#xff0c;让文字…

在VSCode 中增加文件与文件夹的可辨识度

今天重新打开VSCode&#xff0c;打算新建一个项目做测试&#xff0c;看到VSCode中的文件与文件夹很不容易辨认&#xff0c;有时候容易导致一些误操作&#xff0c;需要做一些配置来改变。 效果图&#xff1a; 只需要做简单的2步就可以了。 1、安装插件 ⑴ 打开VSCode的扩展搜索并…

由浅入深二叉树刷题指南与讲解

目录 前言二叉树OJ基础题目1. 单值二叉树2. 检查两棵树是否相同3. 二叉树的前中后序遍历4. 另一棵树的子树5. 对称二叉树6. 二叉树的构建以及遍历 二叉树其他方法的实现1. 二叉树的销毁2. 层序遍历3. 判断二叉树是否是完全二叉树 二叉树的其他性质总结 前言 上一篇我们已经了解…

整理三维空间内4点的209个结构

4点的209个结构按照旋转对称的关系可分成73组 如1&#xff0c;72&#xff0c;177为一组, z y x z y x 1 72 177 1 2 10 93 4 * 4 74 39 2 * 3 73 179 5 * 5 76 178 3 * 6 75 133 6 7 77 180 7 8 8 89 34 9 11 95 35 * 35 91 …