Java Kafka生产者实现

news2024/9/21 16:34:48

在这里插入图片描述
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。
在这里插入图片描述

  • 推荐:「stormsha的主页」👈,「stormsha的知识库」👈持续学习,不断总结,共同进步,为了踏实,做好当下事儿~

  • 专栏导航

    • Python系列: Python面试题合集,剑指大厂
    • Git系列: Git操作技巧
    • GO系列: 记录博主学习GO语言的笔记,该笔记专栏尽量写的试用所有入门GO语言的初学者
    • 数据库系列: 详细总结了常用数据库 mysql 技术点,以及工作中遇到的 mysql 问题等
    • 运维系列: 总结好用的命令,高效开发
    • 算法与数据结构系列: 总结数据结构和算法,不同类型针对性训练,提升编程思维

    非常期待和您一起在这个小小的网络世界里共同探索、学习和成长。💝💝💝 ✨✨ 欢迎订阅本专栏 ✨✨

    💖The Start💖点点关注,收藏不迷路💖

    📒文章目录


下面是一个可以连接多个节点的Kafka生产者类,并且在其它文件中调用生产者发送消息的示例代码。代码包含了Kafka连接失败和发送消息失败的异常处理。

首先,确保你已经导入了Kafka的依赖。如果你使用的是Maven,可以在pom.xml文件中添加以下依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>

接下来是Kafka生产者类的实现:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerExample {
    private KafkaProducer<String, String> producer;

    public KafkaProducerExample(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        try {
            producer = new KafkaProducer<>(props);
        } catch (Exception e) {
            System.err.println("Failed to create Kafka producer: " + e.getMessage());
            throw new RuntimeException(e);
        }
    }

    public void sendMessage(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        try {
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("Failed to send message: " + exception.getMessage());
                    } else {
                        System.out.println("Message sent successfully to topic " + metadata.topic() +
                                " partition " + metadata.partition() + " with offset " + metadata.offset());
                    }
                }
            });
        } catch (Exception e) {
            System.err.println("Failed to send message: " + e.getMessage());
        }
    }

    public void close() {
        producer.close();
    }
}

然后是在其它文件中调用生产者发送消息的示例代码:

public class KafkaProducerDemo {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092,localhost:9093,localhost:9094";
        KafkaProducerExample producerExample = new KafkaProducerExample(bootstrapServers);
        
        try {
            producerExample.sendMessage("test-topic", "key1", "value1");
            producerExample.sendMessage("test-topic", "key2", "value2");
        } catch (Exception e) {
            System.err.println("Exception occurred while sending messages: " + e.getMessage());
        } finally {
            producerExample.close();
        }
    }
}

在上面的代码中,我们创建了一个KafkaProducerExample类,该类的构造函数接受一个包含多个节点的Kafka集群地址字符串。sendMessage方法用于发送消息,并处理可能的异常。如果Kafka连接失败,或者消息发送失败,都会打印错误信息。

KafkaProducerDemo类中,我们实例化了KafkaProducerExample,并调用了sendMessage方法发送消息,最后关闭了生产者实例。这样可以确保资源被正确释放。

你可以根据需要修改主题名、消息内容以及Kafka集群的地址。希望这些代码能帮助你实现功能。


🔥🔥🔥道阻且长,行则将至,让我们一起加油吧!🌙🌙🌙

💖The End💖点点关注,收藏不迷路💖

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2107456.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mysql开启远程访问

个人建议mysql可以用宝塔自动下载安装。 远程访问&#xff0c; 1.关闭防火墙&#xff0c;确保ip能ping通 2.ping端口确定数据库能ping通 3.本地先连上去命令行修改远程访问权限。 mysql -u root -p use mysql; select user,host from user; select host from user where u…

IP地址怎样实现https访问

IP地址实现HTTPS访问的过程涉及一系列步骤&#xff0c;主要是为了确保网站的安全性和可信度。以下是实现IP地址HTTPS访问的关键步骤&#xff1a; 一、确认公网IP地址与权限 公网IP地址&#xff1a;确保你拥有一个公网IP地址&#xff0c;因为只有公网IP才能从互联网直接被客户…

【匈牙利汽车产业考察,开启新机遇】

匈牙利汽车工业发展历史悠久&#xff0c;拥有发达的基础设施和成熟的产业基础&#xff0c;全球20大汽车制造厂商中&#xff0c;有超过14家在匈牙利建立整车制造工厂和汽车零部件生产基地&#xff0c;比亚迪、宁德时代、欣旺达、蔚来等企业纷纷入驻。匈牙利位于东西方交汇处&…

K8s的福音:《Kubernetes企业级云原生运维实战》导读

京东购书点击↓↓↓&#xff1a; 《Kubernetes企业级云原生运维实战&#xff08;云计算前沿实战丛书&#xff09;》(李振良)【摘要 书评 试读】- 京东图书 在当今互联网时代&#xff0c;Kubernetes已经成为新一代的基础设施标准&#xff0c;如何设计一个高效、稳定、安全的Kube…

WGCLOUD的下发指令多长时间执行完成

15秒左右 WGCLOUD在v3.5.4版本&#xff0c;对下发指令做了优化&#xff0c;最快10s执行完成

C++引用简介

引用的基本使用&#xff1a; 作用&#xff1a; 给变量起别名 语法&#xff1a; 数据类型 &别名 原名 int main() {int a 10;int &b a;cout << "a " << a << endl;cout << "b " << b << endl; //都打印…

C++入门基础知识51——【关于C++数字】之C++随机数

成长路上不孤单&#x1f60a;【14后&#xff0c;C爱好者&#xff0c;持续分享所学&#xff0c;如有需要欢迎收藏转发&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#xff01;&#xff01;&#xff01;&#xff01;&#xff…

ArcGIS之建模处理栅格数据以表格显示分区统计(以夜间灯光数据为例)

当需要计算一个shp数据中多个面中的栅格数据值是&#xff0c;可以通过模型构建器进行批量处理&#xff0c;也就是统计多个面中的栅格数据值。但在处理过程中可能会遇见不同的错误&#xff0c;本文会介绍ERROR000883的解决办法。 数据准备&#xff1a;一个shp数据&#xff08;例…

如何在极狐GitLab中添加 SSH Key?

本文分享如何生成 SSH Key 并添加到极狐GitLab 中&#xff0c;然后用 SSH Key 进行代码拉取。 极狐GitLab 是 GitLab 在中国的发行版&#xff0c;可以私有化部署&#xff0c;对中文的支持非常友好&#xff0c;是专为中国程序员和企业推出的企业级一体化 DevOps 平台&#xff0…

c++ string中append/push_back/insert的区别以及erase/pop_back的区别

一.append/push_back/insert的区别 append是在末尾追加字符或字符串 如上的几种用法。注意第二个&#xff0c;它的第三个参数不是结束位置&#xff0c;而是要追加的长度&#xff1b;要追加的字符串可以用string对象表示&#xff0c;也可以用字符指针表示&#xff1b;也可以用于…

TMGM:美国贸易逆差扩大将对第三季度GDP增长产生压力

七月份&#xff0c;美国的进口量激增&#xff0c;导致国际贸易逆差扩大至一年半以来的最大位置&#xff0c;并使净出口再次从第三季度实际GDP增长中扣除超过半个百分点。 七月份&#xff0c;美国的国际贸易逆差扩大了58亿美元&#xff0c;至788亿美元&#xff08;图表&#xf…

『功能项目』DOTween动态文字【26】

打开上一篇25协程生成怪物模型的项目&#xff0c; 本章要做的事情是用DOTween插件做一个动态文字效果 首先在资源商店中免费下载一个DOTween插件 新建脚本&#xff1a;DowteenFlicker.cs 编写脚本&#xff1a; using DG.Tweening; using UnityEngine; using UnityEngine.UI;pu…

如何在算家云搭建Qwen2(智能对话)

一、Qwen2简介 Qwen2 是由阿里云通义千问团队研发的新一代大型语言模型系列&#xff0c;它在多个方面实现了技术的飞跃和性能的显著提升。以下是对 Qwen2 的详细介绍&#xff1a; GitHub - QwenLM/Qwen2: Qwen2 is the large language model series developed by Qwen team, …

从0到1深入理解vite

一、什么是构建工具 ts:如果遇到ts文件&#xff0c;我们需要使用tsc把ts转换为jsreact/vue &#xff1a; 安装react-compiler、vue-conplier 将我们写的jsx或者vue文件转换成render函数less/sass/postcss/somponent-style:我们又需要less-loader、sass-loader等一系列编译工具…

锡废水的废水处理回收

锡废水的废水处理回收是一个综合性的环保过程&#xff0c;旨在从含有锡的废水中提取并回收锡资源&#xff0c;同时减少废水对环境的污染。以下是对锡废水处理回收的详细分析&#xff1a; 一、处理回收意义 锡是一种重要的金属资源&#xff0c;广泛应用于电子、化工、建筑等多个…

HTB-Funnel(ssh端口转发与Hydra爆破)

前言 各位师傅大家好&#xff0c;我是qmx_07,今天给大家讲解Funnel靶机 渗透过程 信息搜集 服务器开放了21FTP端口&#xff0c;22SSH端口&#xff0c;通过sC脚本检测&#xff0c;发现存在匿名登录 FTP匿名登录 发现两个文件尝试下载:password_policy.pdf welcome_28112022…

数据漂移分类——稚嫩版

概念漂移虚拟概念漂移 虚拟概念漂移中分类边界不变的原因是&#xff0c;把分类边界归为网站特征所有&#xff0c;理解为是有网站流量特有的特征构建出来的&#xff0c;但是实际上并不是&#xff0c;网络等因素导致的流量变化也是最终分类边界形成的影响之一&#xff0c;所以我认…

Spring Boot:医护人员排班系统开发的技术选择

5系统详细实现 5.1 医护类型管理 医护人员排班系统的系统管理员可以对医护类型添加修改删除以及查询操作。具体界面的展示如图5.1所示。 图5.1 医护类型管理界面 5.2 排班类型管理 管理员可以对排班类型进行添加修改删除操作。具体界面如图5.2所示。 图5.2 排班类型界面 5.…

Xinstall助力应用运营,注册参数获取更高效!

随着移动互联网的迅猛发展&#xff0c;应用推广已成为各大企业营销的重要一环。然而&#xff0c;在应用推广过程中&#xff0c;如何准确获取用户的注册参数&#xff0c;一直是困扰推广者的难题。今天&#xff0c;我们就来聊聊Xinstall这一神器&#xff0c;看看它如何助力应用轻…

低代码平台中的统一认证与单点登录(SSO):实现简化与安全的用户管理

引言 在现代应用开发中&#xff0c;用户管理是一个关键环节。随着应用数量的增加&#xff0c;传统的用户认证和管理方式逐渐显得繁琐且不够高效。低代码平台的出现为解决这一问题提供了新的思路。本文将探讨低代码平台中如何实现统一认证与单点登录&#xff08;SSO&#xff09…