在 Java 中实现 Kafka Producer 的单例模式

news2025/1/13 2:50:27

在这里插入图片描述
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。
在这里插入图片描述

  • 推荐:「stormsha的主页」👈,「stormsha的知识库」👈持续学习,不断总结,共同进步,为了踏实,做好当下事儿~

  • 专栏导航

    • Python系列: Python面试题合集,剑指大厂
    • Git系列: Git操作技巧
    • GO系列: 记录博主学习GO语言的笔记,该笔记专栏尽量写的试用所有入门GO语言的初学者
    • 数据库系列: 详细总结了常用数据库 mysql 技术点,以及工作中遇到的 mysql 问题等
    • 运维系列: 总结好用的命令,高效开发
    • 算法与数据结构系列: 总结数据结构和算法,不同类型针对性训练,提升编程思维

    非常期待和您一起在这个小小的网络世界里共同探索、学习和成长。💝💝💝 ✨✨ 欢迎订阅本专栏 ✨✨

    💖The Start💖点点关注,收藏不迷路💖

    📒文章目录

      • 一、前言
      • 二、Kafka Producer 的基本配置
      • 三、实现 Kafka Producer 的单例模式
      • 四、使用 Kafka Producer 发送消息
      • 五、总结


一、前言

在分布式系统中,Apache Kafka 是一个非常受欢迎的消息中间件。它提供了高吞吐量、低延迟的消息传递机制,非常适合处理实时数据流。本文将介绍如何在 Java 中使用 Kafka Producer 并实现单例模式,以确保资源的有效管理。

Kafka 是一个分布式流处理平台,它的核心功能包括发布和订阅记录流、存储流记录、以及处理流记录。为了充分利用 Kafka 的功能,一个高效的 Kafka 生产者(Producer)是必要的。在生产环境中,使用单例模式可以确保 Kafka Producer 资源的唯一性和线程安全性。

二、Kafka Producer 的基本配置

在开始之前,我们需要引入一些必要的依赖。假设我们使用 Maven 项目,pom.xml 文件中需要添加以下依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>

三、实现 Kafka Producer 的单例模式

下面是完整的 QuoteKafkaProducer 类,包含了 Kafka Producer 的配置、消息发送和关闭方法。

package com.stormsha.util;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class QuoteKafkaProducer {

    private static final Logger logger = LoggerFactory.getLogger(QuoteKafkaProducer.class);

    private static String bootstrapServers;  // Kafka 服务器的地址
    private static KafkaProducer<String, String> producer;  // Kafka Producer 实例
    private static final Object lock = new Object();  // 锁对象,用于线程安全的单例模式

    /**
     * 获取 Kafka Producer 单例实例
     *
     * @param servers Kafka 服务器地址
     * @return KafkaProducer 实例
     */
    public static KafkaProducer<String, String> getInstance(String servers) {
        if (isLocalEnvironment()) {  // 本地启动不需要实例化kafka
            return producer;
        }
        if (producer == null) {  // 双重检查锁定机制,确保单例实例的唯一性和线程安全
            synchronized (lock) {
                if (producer == null) {
                    bootstrapServers = servers;  // 设置 Kafka 服务器地址
                    // 配置 Kafka Producer 属性
                    Properties props = new Properties();
                    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);  // 设置 Kafka 服务器地址
                    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  // 设置键的序列化器
                    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  // 设置值的序列化器

                    // 创建 Kafka Producer 实例
                    producer = new KafkaProducer<>(props);
                    
                    // JVM 关闭时,确保 Kafka producer 被关闭
                    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                        logger.info("关闭 Kafka Producer");
                        close();
                    }));
                }
            }
        }
        return producer;
    }

    /**
     * 发送 Kafka 消息
     *
     * @param topic 目标主题
     * @param key   消息键
     * @param value 消息值
     */
    public static void sendMessage(String topic, String key, String value) {
        if (isLocalEnvironment()) {  // 本地启动不需要实例化kafka
            return;
        }

        // 获取 Kafka Producer 单例实例
        KafkaProducer<String, String> producer = QuoteKafkaProducer.getInstance(bootstrapServers);

        // 创建一条消息,包含topic、key 和 value
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

        // 异步发送消息
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                // 发送失败的处理
                logger.error("消息发送失败", exception);
            } else {
                // 发送成功的处理
                logger.info("消息发送成功: 主题: {}, 分区: {}, 偏移量: {}", metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
    }

    /**
     * 关闭 Kafka Producer 实例
     */
    public static void close() {
        if (producer != null) {
            synchronized (lock) {
                if (producer != null) {
                    producer.close();  // 关闭 Kafka Producer 实例
                    producer = null;  // 清空 Kafka Producer 实例
                }
            }
        }
    }

    /**
     * 判断当前程序是否在本地环境启动
     *
     * @return true 如果在本地环境启动,否则返回 false
     */
    private static boolean isLocalEnvironment() {
        // 获取环境变量
        String env = System.getenv().getOrDefault("ENV", "dev");
        // 返回是否为本地环境标志
        return "local".equals(env);
    }
}

四、使用 Kafka Producer 发送消息

以下是如何使用 QuoteKafkaProducer 类发送 Kafka 消息的示例:

// 使用示例
String servers = "127.0.0.1:9092,127.0.0.1:9092";
QuoteKafkaProducer.getInstance(servers);
QuoteKafkaProducer.sendMessage("topicId", "dataKey", "发送的内容");

五、总结

通过实现 Kafka Producer 的单例模式,我们可以确保 Kafka Producer 在整个应用程序中是唯一的,并且在多线程环境下是安全的。同时,通过引入日志记录,我们可以更好地监控消息的发送状态和处理潜在的异常。这种模式不仅提高了资源的利用效率,还简化了资源管理,特别是在处理高并发和大规模数据流的应用中。

希望本文对你在实际项目中使用 Kafka Producer 提供一些帮助。如果有任何问题或建议,欢迎在评论区讨论。


🔥🔥🔥道阻且长,行则将至,让我们一起加油吧!🌙🌙🌙

💖The End💖点点关注,收藏不迷路💖

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2137525.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

解决 Kylin OS 提示软件包 powerconnect 需要重新安装,但是我无法找到相应的安装文件

解决 Kylin OS 提示软件包 powerconnect 需要重新安装&#xff0c;但是我无法找到相应的安装文件 1、问题现象2、解决办法 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 1、问题现象 sudo apt-get install -y seahorse生物识别认证 按Q或…

Android 11 FileProvider的使用和限制

概述&#xff1a; 从Android 7开始&#xff0c;将不允许在app之间&#xff0c;使用file uri,即file://的方式&#xff0c;传递一个file&#xff0c;否则会抛出异常&#xff1a;FileUriExposedException &#xff0c;其解决方案&#xff0c;就是使用FileProvider&#xff0c;用c…

【Rust练习】14.流程控制

练习题来自&#xff1a;https://practice-zh.course.rs/flow-control.html 1 // 填空 fn main() {let n 5;if n < 0 {println!("{} is negative", n);} __ n > 0 {println!("{} is positive", n);} __ {println!("{} is zero", n);} } …

初识Linux · 进程(3)

目录 前言&#xff1a; 进程的创建 前言&#xff1a; 继上文介绍了着重介绍了进程的内部属性&#xff0c;以及在操作系统层面进程如何被组织起来的&#xff0c;如何调用系统接口&#xff0c;有关task_struct&#xff0c;进程的部分理解等&#xff0c;今天&#xff0c;我们就…

医疗报销|基于springBoot的医疗报销系统设计与实现(附项目源码+论文+数据库)

私信或留言即免费送开题报告和任务书&#xff08;可指定任意题目&#xff09; 目录 一、摘要 二、相关技术 三、系统设计 四、数据库设计 五、核心代码 六、论文参考 七、源码获取 一、摘要 传统信息的管理大部分依赖于管理人员的手工登记与管理&#x…

知识竞赛活动舞台搭建要多少钱

每次举办活动&#xff0c;舞台搭建总是让人头疼的一部分&#xff0c;尤其是费用问题。今天就来揭开活动舞台搭建费用的神秘面纱。 活动舞台搭建的费用主要包括舞台结构、设备、音响、灯光、舞美装饰等各方面的成本。具体来说&#xff1a; 1.舞台结构&#xff1a;包括舞台平台…

vscode 中 python 代码跳转不生效 ctrl加单击不跳转

目录 网友的解决方法&#xff1a; 我的解决方法 vscode 中 python 代码跳转不生效 ctrl加单击不跳转 网友的解决方法&#xff1a; vscode 中 python 代码跳转不生效_vscode python 代码无法跳转-CSDN博客 解决方法 后来发现vs code初次远程连接服务器时&#xff0c;需要…

带你0到1之QT编程:十一、掌握Containers容器艺术,一网打尽开发利器

此为QT编程的第十一谈&#xff01;关注我&#xff0c;带你快速学习QT编程的学习路线&#xff01; 每一篇的技术点都是很很重要&#xff01;很重要&#xff01;很重要&#xff01;但不冗余&#xff01; 我们通常采取总-分-总和生活化的讲解方式来阐述一个知识点&#xff01; …

C++类与对象(二)

目录 1.类的6个默认成员函数 2..构造函数 2.1概念 2.2 特征 3.析构函数 3.1 概念 3.2 特性 4.拷贝构造函数 4.1 概念 4.2 特征 5.赋值运算符重载函数 5.1 运算符重载&#xff08;是否重载这个运算符是看这个运算符对这个类是否有意义&#xff09; 5.2 赋值运算符重…

Vue3 + Echarts 实现中国地图

基本概念 echarts是一个基于JavaScript的开源可视化库&#xff0c;用于创建和展示各种交互式图表和图形。它可以用于数据分析、数据可视化、数据探索和数据报告等方面。我们一般使用echarts来实现数据可视化&#xff0c;本文我们使用vue3 echars来实现中国地图。 准备echarts…

Node.js 多版本安装与切换指南

一.使用nvm的方法 1. 卸载nodejs 如果你的电脑有安装nodejs&#xff0c;需要先卸载掉&#xff1b;若没有请直接下一步。 2. 前往官网下载nvm nvm&#xff1a;一个nodejs版本管理工具&#xff01; 官网地址&#xff1a;nvm文档手册 - nvm是一个nodejs版本管理工具 - nvm中文…

智能数据体系,新突破?

智能数据体系&#xff0c;新突破&#xff1f; 前言智能数据体系 前言 我们正处于一个数智融合的新时代&#xff0c;数据的价值和作用日益凸显。如何更好地理解和利用数据&#xff0c;构建先进的智能数据体系&#xff0c;成为了摆在我们面前的重要课题。 在这个背景下&#xf…

归并排序(Merge Sort)

什么是归并排序 归并排序&#xff08;Merge Sort&#xff09;是一种经典的排序算法&#xff0c;它采用分治法&#xff08;Divide and Conquer&#xff09;策略&#xff0c;将一个大数组分为两个小数组&#xff0c;分别进行排序&#xff0c;然后将这两个已排序的小数组合并成一个…

Cortex-M3架构学习:异常

异常类型 Cortex-M3 在内核水平上搭载了一个异常响应系统&#xff0c;支持为数众多的系统异常和外部中断。其 中&#xff0c;编号为 1&#xff0d;15 的对应系统异常&#xff0c;大于等于 16 的则全是外部中断。 Cortex-M3支持的中断源数目为 240 个&#xff0c;做成芯片后&…

docker进入容器运行命令详细讲解

​ 大家好&#xff0c;我是程序员小羊&#xff01; 前言&#xff1a; 在 Docker 中&#xff0c;进入容器并运行命令是常见的操作&#xff0c;尤其是当你想要调试、检查日志或手动运行某些程序时。Docker 提供了几种方式来进入容器和执行命令。 前提条件 确保你的 Docker 容器…

C++基础面试题 | 什么是C++中的虚继承?

文章目录 回答重点菱形继承问题虚继承解决菱形继承问题虚继承的二义性解决 虚继承总结拓展知识&#xff1a;virtual关键字的用法1. 虚函数 (Virtual Function)2. 纯虚函数 (Pure Virtual Function)3. 虚析构函数 (Virtual Destructor)4. 虚继承 (Virtual Inheritance)5. 虚函数…

一篇文章带你入门机器学习 Part1 -->Machine Learning from Scratch

学习网站&#xff1a;Machine Learning from Scratch Machine Learning from Scratch (Part1神经网络&#xff09; 神经网络——Neural Networks神经网络是如何工作的&#xff1f;训练神经网络 神经网络——Neural Networks 在人工神经网络的背景下&#xff1a;一个神经元是一…

046全排列

题意 给定一个不含重复数字的数组 nums &#xff0c;返回其 所有可能的全排列 。你可以 按任意顺序 返回答案。 提示&#xff1a; 1 < nums.length < 6 -10 < nums[i] < 10 nums 中的所有整数 互不相同 难度 中等 示例 示例 1&#xff1a; 输入&#xff1…

uniapp+若依 开发租房小程序源码分享

1、使用Uniapp开发的前台&#xff0c;基于 Vue.js 开发所有前端应用的框架&#xff0c;开发者编写一套代码&#xff0c;可发布到iOS、Android、Web&#xff08;响应式&#xff09;、以及各种小程序 2、基于SpringBoot的权限管理系统&#xff0c;易读易懂、界面简洁美观。 核心…

WordBN字远笔记!更新1.2.2版本|Markdown编辑器新增高亮功能,界面新增深色模式

WordBN字远笔记1.2.2版本更新描述 WordBN字远笔记在1.2.2版本中进行了多项重要的更新与改进&#xff0c;旨在提升用户的编辑体验和视觉舒适度。 以下是本次更新的两大亮点&#xff1a;Markdown编辑器新增高亮功能以及界面新增深色模式。 1. Markdown编辑器新增高亮功能 在1…