Sprint Boot教程之五十八:动态启动/停止 Kafka 监听器

news2025/1/14 11:53:17

Spring Boot – 动态启动/停止 Kafka 监听器

当 Spring Boot 应用程序启动时,Kafka Listener 的默认行为是开始监听某个主题。但是,有些情况下我们不想在应用程序启动后立即启动它。

动态启动或停止 Kafka Listener,我们需要三种主要方法,即在需要处理 Kafka 消息时启动/停止、使用@KafkaListener注释、使用 kafkaListenerEndpointRegistry

在本文中,我们将介绍如何动态启动或停止 Kafka 监听器。

启动/停止 Kafka 监听器的不同方法

方法一

  • 当需要处理 Kafka 消息时,启动一个应用程序。
  • 处理成功后停止应用程序。

方法 2:在注册 Kafka Listener 时,我们可以设置以下 id 属性。

@KafkaListener(id = "id-1", groupId = "group-1", topics = "Message-topic", 
                   containerFactory = "messageListenerFactory", autoStartup = "false")
    public void consumeMessage(Message message)

方法 3:自动连接KafkaListenerEndpointRegistry bean 来控制 Kafka Listener 的启动或停止。

@Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

开始:

 public boolean startListener(String listenerId) {
        MessageListenerContainer listenerContainer = 
          kafkaListenerEndpointRegistry.getListenerContainer(listenerId);
        assert listenerContainer != null : false;
        listenerContainer.start();

停止:

public boolean stopListener(String listenerId) {
        MessageListenerContainer listenerContainer = 
          kafkaListenerEndpointRegistry.getListenerContainer(listenerId);
        assert listenerContainer != null : false;
        listenerContainer.stop();
        logger.info("{} Kafka Listener Stopped.", listenerId);

下面我们将以上述句法方法为例进行实现。

启动或停止特定 Kafka Listener的实现

创建一个类,其对象将被 Kafka 侦听器使用。

文件:Message.java

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {
    private String message;
}

配置 Kafka Listener 将使用的消费者。

文件:KakfaConsumerConfig.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    private String kafkaUrl = "localhost:9092";

    @Bean
    public ConsumerFactory<String, Message> messageConsumerFactory() {
        JsonDeserializer<Message> deserializer = 
             new JsonDeserializer<>(Message.class, false);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
                   StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
                   JsonDeserializer.class);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), 
                                                 deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Message> 
      messageListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Message> containerFactory = 
          new ConcurrentKafkaListenerContainerFactory();
        containerFactory.setConsumerFactory(messageConsumerFactory());
        return containerFactory;
    }
}

创建一个具有必要参数的 Kafka 监听器。

  • id:此侦听器的容器唯一标识符。如果未指定,则使用自动生成的 ID。
  • groupId:仅为该监听器使用该值覆盖消费者工厂的 group.id 属性。
  • 主题:此侦听器的主题。条目可以是“主题名称”、“属性占位符键”或“表达式”。主题名称必须从表达式解析。这使用组管理,Kafka 将为组成员分配分区。
  • containerFactory:KafkaListenerContainerFactory的 bean 名称,将用于创建为该端点提供服务的消息侦听器容器。
  • autoStartup:设置为 true 或 false 以覆盖容器工厂的默认设置。默认情况下,该值设置为 true,因此,它将在我们的应用程序启动时立即开始使用消息。

文件:KafkaMessageListener.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

@Configuration
public class KafkaMessageListener {

    Logger logger = LoggerFactory.getLogger(KafkaMessageListener.class);

    @KafkaListener(id = "id-1", groupId = "group-1", topics = "Message-topic", 
                   containerFactory = "messageListenerFactory", autoStartup = "false")
    public void consumeMessage(Message message) {
        logger.info("Message received : -> {}", message);
    }
}

KafkaListenerEndpointRegistry 类可用于通过 listenerId 获取 Kafka 侦听器容器。这里我们使用了@KafkaListener注释来将 bean 方法声明为 Kafka 侦听器容器的侦听器。现在可以使用此容器启动或停止 Kafka 侦听器。

文件:KafkaListenerAutomation.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class KafkaListenerAutomation {

    private final Logger logger = LoggerFactory.getLogger(KafkaListenerAutomation.class);

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    public boolean startListener(String listenerId) {
        MessageListenerContainer listenerContainer = 
           kafkaListenerEndpointRegistry.getListenerContainer(listenerId);
        assert listenerContainer != null : false;
        listenerContainer.start();
        logger.info("{} Kafka Listener Started", listenerId);
        return true;
    }

    public boolean stopListener(String listenerId) {
        MessageListenerContainer listenerContainer = 
          kafkaListenerEndpointRegistry.getListenerContainer(listenerId);
        assert listenerContainer != null : false;
        listenerContainer.stop();
        logger.info("{} Kafka Listener Stopped.", listenerId);
        return true;
    }
}

使用 API 端点,我们可以通过提供 listenerID 来启动或停止特定的 Kafka 监听器。

文件:StartOrStopListenerController.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class StartOrStopListenerController {

    @Autowired
    KafkaListenerAutomation kafkaListenerAutomation;

    @GetMapping("/start")
    public void start(@RequestParam("id") String listenerId) {
        kafkaListenerAutomation.startListener(listenerId);
    }

    @GetMapping("/stop")
    public void stop(@RequestParam("id") String listenerId) {
        kafkaListenerAutomation.stopListener(listenerId);
    }
}

输出:

1.Kafka Listener启动:

2.Kafka Listener 收到消息:

3. Kafka Listener 停止:

最后

理想情况下,应用程序应在需要处理 Kafka 消息时启动,并在该过程完成后立即停止。限制 Kafka 侦听器以有效利用它是一种很好的做法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2276461.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《JavaWeb开发-javascript基础》

文章目录 《JavaWeb开发-javascript基础》1.javascript 引入方式2.JS-基础语法-书写语法2.1 书写语法2.2 输出语句 3.JS-基础语法-变量4.JS-基础语法-数据类型&运算符4.1 数据类型4.2 运算符4.3 数据类型转换 5. JS-函数6. JS-对象-Array数组7. JS-对象-String字符串8. JS-…

1.组件的三大组成部分注意点(结构/样式/逻辑)scoped解决样式冲突/data是一个函数2.组件通信组件通信语法父传子子传父

学习目标 1.组件的三大组成部分注意点&#xff08;结构/样式/逻辑&#xff09; scoped解决样式冲突/data是一个函数 2.组件通信 组件通信语法 父传子 子传父 非父子通信&#xff08;扩展&#xff09; 3.综合案例&#xff1a;小黑记事本&#xff08;组件版&#xff09; …

mapbox基础,expressions表达式汇总

👨‍⚕️ 主页: gis分享者 👨‍⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍⚕️ 收录于专栏:mapbox 从入门到精通 文章目录 一、🍀前言二、🍀Expressions简介2.1 expressions 操作符2.1.1 Data expressions2.1.2 Camera expressions2.2 Expressi…

一文清晰梳理Mysql 数据库

现在处于大四上学期的阶段&#xff0c;在大四下学期即将要进行毕业设计&#xff0c;所以在毕业设计开始之前呢&#xff0c;先将Mysql 数据库有关知识进行了一个梳理&#xff0c;以防选题需要使用到数据库。 1&#xff09;什么是数据库&#xff1f; 简单理解数据库&#xff0c…

基于大语言模型的组合优化

摘要&#xff1a;组合优化&#xff08;Combinatorial Optimization, CO&#xff09;对于提高工程应用的效率和性能至关重要。随着问题规模的增大和依赖关系的复杂化&#xff0c;找到最优解变得极具挑战性。在处理现实世界的工程问题时&#xff0c;基于纯数学推理的算法存在局限…

安装conda 环境

conda create -n my_unet5 python3.8 conda activate my_unet5

容器技术全面攻略:Docker的硬核玩法

文章背景 想象一下&#xff0c;一个项目终于要上线了&#xff0c;结果因为环境配置不一致&#xff0c;测试服务器一切正常&#xff0c;生产环境却宕机了。这是开发者噩梦的开始&#xff0c;也是Docker救世主角色的登场&#xff01;Docker的出现颠覆了传统环境配置的方式&#…

LabVIEW部署Web服务

目录 LabVIEW部署Web服务1、创建项目2、创建Web服务3、新建WebVI3.1、使用GET方法3.2、使用POST方法 4、 部署和对应URL4.1、应用程序&#xff1a;80804.2、本地调试&#xff1a;80094.3、NI Web服务器&#xff1a;9090(禁用) 5、测试5.1、测试GET方法5.2、测试POST方法 6、实际…

STM32 : 波特率发生器

波特率发生器 1. 发送器和接收器的波特率 波特率寄存器 (BRR): 在串行通信中&#xff0c;发送器和接收器的波特率是由波特率寄存器&#xff08;BRR&#xff09;中的一个值 DIV 来确定的。 2. 计算公式 计算公式: 详细解释 1. 波特率寄存器 (BRR) BRR: 波特率寄存器是一…

Excel数据叠加生成新DataFrame:操作指南与案例

目录 一、准备工作 二、读取Excel文件 三、数据叠加 四、处理重复数据&#xff08;可选&#xff09; 五、保存新DataFrame到Excel文件 六、案例演示 七、注意事项 八、总结 在日常数据处理工作中&#xff0c;我们经常需要将不同Excel文档中的数据整合到一个新的DataFra…

基于微信小程序的汽车销售系统的设计与实现springboot+论文源码调试讲解

第4章 系统设计 一个成功设计的系统在内容上必定是丰富的&#xff0c;在系统外观或系统功能上必定是对用户友好的。所以为了提升系统的价值&#xff0c;吸引更多的访问者访问系统&#xff0c;以及让来访用户可以花费更多时间停留在系统上&#xff0c;则表明该系统设计得比较专…

C#调用OpenCvSharp实现图像的开运算和闭运算

对图像同时进行腐蚀和膨胀操作&#xff0c;顺序不同则效果也不同。先腐蚀后膨胀为开运算&#xff0c;能够消除小斑点和细小的突出物、平滑图像以及改善边缘&#xff1b;先膨胀后腐蚀为闭运算&#xff0c;能够去除噪点、填补图像孔洞、连接邻近物体和平滑物体边界。   OpenCvS…

从 SQL 语句到数据库操作

1. SQL 语句分类 数据定义语言 DDL &#xff1a; 用于定义或修改数据库中的结构&#xff0c;如&#xff1a;创建、修改、删除数据库对象。create、drop alter 数据操作语言 DML &#xff1a; 用于添加、删除、更新数据库中的数据。select、insert alter、drop 数据控制语言 D…

django在线考试系统

Django在线考试系统是一种基于Django框架开发的在线考试平台&#xff0c;它提供了完整的在线考试解决方案。 一、系统概述 Django在线考试系统旨在为用户提供便捷、高效的在线考试环境&#xff0c;满足教育机构、企业、个人等不同场景下的考试需求。通过该系统&#xff0c;用…

【Spring Boot 应用开发】-04-01 自动配置-数据源-连接池

资源关闭 还记得上一节中的这段代码么&#xff1f; try {if (resultSet ! null) resultSet.close();if (preparedStatement ! null) preparedStatement.close();if (connection ! null) connection.close(); } catch (SQLException e) {e.printStackTrace(); }这是我们在查询…

AngularJs指令中出错:Error: $compile:nonassign Non-Assignable Expression

Expression {resumeId: item.resumeId} used with directive rwdsDelete is non-assignable! 在AngularJS中&#xff0c;$compile服务用于将指令编译成HTML。当你在模板中使用了一个表达式&#xff0c;但这个表达式不是一个左值&#xff08;即不能被赋值的表达式&#xff09;时…

Docker 的安装和基本使用[SpringBoot之Docker实战系列] - 第535篇

历史文章&#xff08;文章累计530&#xff09; 《国内最全的Spring Boot系列之一》 《国内最全的Spring Boot系列之二》 《国内最全的Spring Boot系列之三》 《国内最全的Spring Boot系列之四》 《国内最全的Spring Boot系列之五》 《国内最全的Spring Boot系列之六》 《…

聚铭网络当选中关村华安关键信息基础设施安全保护联盟理事单位

近日&#xff0c;在北京隆重举行的中关村华安关键信息基础设施安全保护联盟&#xff08;以下简称“联盟”&#xff09;第一届第四次会员大会上&#xff0c;聚铭网络凭借其在网络安全领域的卓越贡献和创新实力&#xff0c;成功当选为联盟的理事单位。此次大会吸引了来自政府机关…

CES 2025|全面拥抱端侧AI,美格智能在CES发布系列创新成果

要点&#xff1a; ▶ 在AI机器人领域&#xff0c;以高算力AI模组助力发布“通天晓”人形机器人和2款全新微小型AI机器人 ▶ 在AI硬件领域&#xff0c;发布消费级AI智能体产品——AIMO&#xff0c;引领个人专属的大模型时代 ▶ 在5G通信领域&#xff0c;发布全新5GWiFi-7 CPE…

VScode 配置 C语言环境

遇到的问题集合 mingw官方下载网站&#xff08;https://sourceforge.net/projects/mingw-w64/files/&#xff09;更新之后&#xff0c;与网上大多数教程上写的界面不同了。 网上大多数教程让下载这个&#xff1a; 但是现在找不到这个文件。 写hello.c文件时&#xff0c;报错&…