Kafka生产者消息异步发送并返回发送信息api编写教程

news2024/11/9 3:54:50

1.引入依赖(pox.xml文件)

<dependencies>

        <dependency>

            <groupId>org.apache.kafka</groupId>

            <artifactId>kafka-clients</artifactId>

            <version>3.6.2</version>

        </dependency>

</dependencies>

2.创建java类

3.配置运行属性

//连接的服务器

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");

//指定对应的key和value的序列化类型

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

//关联自定义分区器

//properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.ljr.kafka.producer.MyPartitioner");

4.创建生产者对象

键入new KafkaProducer<>(),光标置于括号内CTRL+P可以显示需要对象为properties;

键入new Properties().var 回车,键入new KafkaProducer<>(properties).var 回车,选择变量名

5.发送消息并返回发送结果

键入KafkaProducer.send(),提示需要对象ProducerRecord;键入topic名(order)和要发送的信息(“0000”+i),new Callback()回车会弹出需要重写的抽象类,补全返回条件、需要返回的信息即可实现抽象类;

e == null 表示消息全部发送完毕;

6.关闭资源

KafkaProducer.close();

7.运行查看结果

运行:

可以看到有返回信息;

另开窗口查看发送结果

kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092 --topic order

信息发送成功;

8.完整代码

package com.ljr.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallback {
    public static void main(String[] args) {

        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
/关联自定义分区器
//		properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.ljr.kafka.producer.MyPartitioner");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        for(int i =0; i < 3; i++){
            kafkaProducer.send(new ProducerRecord<>("customers", "LiSi" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("topic:" + recordMetadata.topic() + " partition:" + recordMetadata.partition());
                    }
                }
            });
        }
        kafkaProducer.close();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1794035.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

0基础学习区块链技术——推演猜想

大纲 去中心预防篡改付出代价方便存储 在《0基础学习区块链技术——入门》一文中&#xff0c;我们结合可视化工具&#xff0c;直观地感受了下区块的结构&#xff0c;以及链式的前后关系。 本文我们将抛弃之前的知识&#xff0c;从0开始思考和推演&#xff0c;区块链技术可能是如…

【必会面试题】快照读的原理

目录 前言知识点一个例子 前言 快照读&#xff08;Snapshot Read&#xff09;是数据库管理系统中一种特殊的读取机制&#xff0c;主要用于实现多版本并发控制&#xff08;MVCC, Multi-Version Concurrency Control&#xff09;策略&#xff0c;尤其是在MySQL的InnoDB存储引擎中…

hadoop疑难问题解决_NoClassDefFoundError: org/apache/hadoop/fs/adl/AdlFileSystem

1、问题描述 impala执行查询&#xff1a;select * from stmta_raw limit 10; 报错信息如下&#xff1a; Query: select * from sfmta_raw limit 10 Query submitted at: 2018-04-11 14:46:29 (Coordinator: http://mrj001:25000) ERROR: AnalysisException: Failed to load …

关于一些优化的知识

1、凸优化&#xff1a;一定可找到全局最优解 非凸优化&#xff1a;一般是局部最优解 2、无约束优化问题求解方法&#xff1a;梯度下降、拟牛顿法、高斯牛顿法、LM算法 3、 解释&#xff0c;就是右边的式子对应于就是当前这个xk这个点基础上朝着pk取走步长为a得到了对应的值&…

什么是Vector Database(向量数据库)?

什么是Vector Database(向量数据库)&#xff1f; 向量数据库是向量嵌入的有组织的集合&#xff0c;可以随时创建、读取、更新和删除。向量嵌入将文本或图像等数据块表示为数值。 文章目录 什么是Vector Database(向量数据库)&#xff1f;什么是嵌入模型(Embedding Model)&…

618精选好物盘点!五款必买力作合集,省钱又实用!

随着夏日的脚步越来越近&#xff0c;618购物节也如期而至&#xff0c;成为消费者们期待的年度购物盛宴。在这个全民狂欢的日子里&#xff0c;各种精选好物层出不穷&#xff0c;无论是追求高品质生活的你&#xff0c;还是希望提升日常效率的上班族&#xff0c;都能在这一天找到心…

【微服务】使用kubekey部署k8s多节点及kubesphere

kubesphere官方部署文档 https://github.com/kubesphere/kubesphere/blob/master/README_zh.md kubuctl命令文档 https://kubernetes.io/zh-cn/docs/reference/kubectl/ k8s资源类型 https://kubernetes.io/zh-cn/docs/reference/kubectl/#%E8%B5%84%E6%BA%90%E7%B1%BB%E5%9E…

QT_UI设计

mainwindow.h #ifndef MAINWINDOW_H #define MAINWINDOW_H#include <QMainWindow>QT_BEGIN_NAMESPACE //命名空间 namespace Ui { class MainWindow; } //ui_MainWindow文件里定义的类&#xff0c;外部声明 QT_END_NAMESPACEclass MainWindow : public QMainWindow {Q_O…

读书笔记-《软件定义安全》之一:SDN和NFV:下一代网络的变革

第1章 SDN和NFV&#xff1a;下一代网络的变革 1.什么是SDN和NFV 1.1 SDN/NFV的体系结构 SDN SDN的体系结构可以分为3层&#xff1a; 基础设施层由经过资源抽象的网络设备组成&#xff0c;仅实现网络转发等数据平面的功能&#xff0c;不包含或仅包含有限的控制平面的功能。…

debian支持中文

1. 安装依赖 apt-get install locales2. 设置字符集 dpkg-reconfigure locales 选择en_us.UF-8&#xff0c;其余全部取消 3.设置生效&#xff0c;无需重启系统 source /etc/default/locale 4. 查看是否生效 locale

高效内容分发:海外短剧推广平台的流媒体传输技术挑战与解决

随着海外短剧市场的蓬勃发展&#xff0c;如何高效地将短剧内容分发给全球观众成为了推广平台必须面对的一大挑战。在这一过程中&#xff0c;流媒体传输技术起着至关重要的作用。然而&#xff0c;由于网络环境的复杂性和多样性&#xff0c;流媒体传输面临着带宽限制、延迟等诸多…

(学习笔记)数据基建-元数据管理

&#xff08;学习笔记&#xff09;数据基建-元数据管理 什么是元数据元数据该如何管理工具化规范化 数据血缘 什么是元数据 简单来说就是描述数据的数据&#xff0c;更直白来说就是描述表名、表制作者、表字段、表生命周期、表存粗等信息的数据 元数据该如何管理 工具化 …

【备战蓝桥杯】蓝桥杯省一笔记:算法模板笔记(Java)

蓝桥杯 0、快读快写模板1、回文判定2、前缀和3、差分4、二分查找5、快速幂6、判断素数7、gcd&lcm8、进制转换9、位运算10、字符串常用API11、n的所有质因子12、n的质因子个数13、n的约数个数14、n阶乘的约数个数15、n的约数和16、阶乘 & 双阶乘17、自定义升序降序18、动…

Docker安装、使用,容器化部署springboot项目

一、使用官方安装脚本自动安装 安装命令如下&#xff1a; curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun 也可以使用国内 daocloud 一键安装命令&#xff1a; curl -sSL https://get.daocloud.io/docker | sh 二、Docker离线安装 1. 下载安装包 可…

【android】设置背景图片

改变值&#xff0c;可显示zai在 在theves下面的两个value都要增加名字代码 <item name"windowActionBar">false</item><item name"android:windowNoTitle">true</item><item name"android:windowFullscreen">tru…

一、【源码】创建简单的映射器代理工厂

源码地址&#xff1a;https://github.com/mybatis/mybatis-3/ 仓库地址&#xff1a;https://gitcode.net/qq_42665745/mybatis/-/tree/01-xxxDao-proxy 创建简单的映射器代理工厂 执行xxxDao.method()时都做了些什么&#xff1f; 原理是&#xff1a;首先定义Dao接口&#xff…

docker 停止重启容器命令start/stop/restart详解(容器生命周期管理教程-2)

Docker 提供了多个命令来管理容器的生命周期&#xff0c; 其中start、stop 和 restart。这些命令允许用户控制容器的运行状态。 1. docker start 命令格式&#xff1a; docker start [OPTIONS] CONTAINER [CONTAINER...]功能&#xff1a; 启动一个或多个已经停止的 Docker …

【Qt系列教程】一、认识Qt、安装Qt、运行Hello Qt

文章目录 1.1 Qt 简介1.2 Qt 的安装1.3 编写 Hello World 1.1 Qt 简介 Qt&#xff08;官网&#xff1a;https://www.qt.io&#xff09;于1995年5月首次公开发布&#xff0c;是一个跨平台的应用程序开发框架&#xff0c;也是最主流的 C 开发框架&#xff1b; Qt 具有其他编程…

[XYCTF新生赛]-Reverse:ez_rand解析(爆破时间戳,汇编结合反汇编)

无壳 查看ida 这里是利用time64获取种子&#xff0c;但是time64不是标准的函数&#xff0c;这里是伪随机数&#xff0c;简单地来说就是它不是通过时间来确定种子&#xff0c;所以我们没办法在脚本里直接调用它得到种子&#xff0c;那就意味着我们不知道种子是多少&#xff0c;…

一个好用的对外开放端口工具 Ngrok

工作中我们经常需要在测试或者开发阶段给客户快速展示程序&#xff0c;需要运维打开端口、部署等一系列操作&#xff0c;成本较高。如果能够直将本地开发环境发布给客户直接进行体验、需求确认&#xff0c;就会方便很多&#xff0c;本文将介绍一个小工具可以快速对外打开端口。…