Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现

news2024/11/15 19:59:47

文章目录

  • 概述
  • 思路
  • Code
  • 扩展
    • KafkaListenerEndpointRegistry

在这里插入图片描述


概述

在实际应用中,往往需要根据业务需求动态开启/关闭Kafka消费者监听。例如,在某些时间段内,可能需要暂停对某个Topic的消费,或者在某些条件下才开启对某个Topic的消费。

在Spring Boot中,要实现动态的控制或关闭消费以及动态开启或关闭监听,可以使用Spring Kafka提供的一些功能。


思路

首先,需要配置Kafka消费者的相关属性。在Spring Boot中,可以通过在application.properties或application.yml文件中添加相应的配置来实现。

以下是一个示例配置:

spring.kafka.consumer.bootstrap-servers=<Kafka服务器地址>
spring.kafka.consumer.group-id=<消费者组ID>

接下来,可以创建一个Kafka消费者,使用@KafkaListener注解来指定要监听的Kafka主题,并编写相应的消息处理方法。例如:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "<Kafka主题>")
    public void receive(String message) {
        // 处理接收到的消息
    }
}

现在,你可以使用以下两种方法来控制或关闭消费以及动态开启或关闭监听:

方法1:使用@KafkaListener注解的autoStartup属性

@KafkaListener注解具有一个名为autoStartup的属性,可以用于控制是否自动启动消费者。默认情况下,它的值为true,表示自动启动。如果将其设置为false,则消费者将不会自动启动。

@KafkaListener(topics = "<Kafka主题>", autoStartup = "false")
public void receive(String message) {
    // 处理接收到的消息
}

要在运行时动态启动消费者,你可以通过KafkaListenerEndpointRegistry bean来手动启动:

@Autowired
private KafkaListenerEndpointRegistry endpointRegistry;

// 启动消费者
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").start();

同样,你也可以使用stop()方法来停止消费者:

// 停止消费者
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").stop();

方法2:使用KafkaListenerEndpointRegistry bean的pause()resume()方法

KafkaListenerEndpointRegistry bean提供了pause()resume()方法,用于暂停和恢复消费者的监听。

@Autowired
private KafkaListenerEndpointRegistry endpointRegistry;

// 暂停消费者监听
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").pause();

// 恢复消费者监听
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").resume();

使用这些方法,可以在运行时动态地控制或关闭消费,以及动态地开启或关闭监听。


Code


import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * @author artisan
 */
@Slf4j
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServer;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    @Value("${spring.kafka.consumer.group-id}")
    private String group_id;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;

    @Value("${spring.kafka.consumer.max-poll-interval-ms}")
    private String maxPollIntervalMs;

    @Value("${spring.kafka.listener.concurrency}")
    private Integer concurrency;

    private final String consumerInterceptor = "net.zf.module.system.kafka.interceptor.FailureRateInterceptor";


    /**
     * 消费者配置信息
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(32);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,maxPollIntervalMs);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,group_id);
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,consumerInterceptor );
        return props;
    }


    /**
     * 消费者批量工厂
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setBatchListener(true);
        factory.setConcurrency(concurrency);
        return factory;
    }




    /**
     * 异常处理器
     *
     * @return
     */
    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() {
        return (message, exception, consumer) -> {
//            log.error("消息{} , 异常原因{}", message, exception.getMessage());
            log.error("consumerAwareListenerErrorHandler called");

            return null;
        };
    }

}


使用

   @KafkaListener(topicPattern = KafkaTopicConstant.ATTACK_MESSAGE + ".*",
            containerFactory = "batchFactory",
            errorHandler = "consumerAwareListenerErrorHandler",
            id = "attackConsumer")
    public void processMessage(List<String> records, Acknowledgment ack)  {
        log.info("AttackKafkaConsumer 当前线程 {} , 本次拉取的数据总量:{} ", Thread.currentThread().getId(), records.size());
        try {
            List<AttackMessage> attackMessages = new ArrayList();
            records.stream().forEach(record -> {
                messageExecutorFactory.process(KafkaTopicConstant.ATTACK_MESSAGE).execute(record, attackMessages);
            });
            if (!attackMessages.isEmpty()) {
                attackMessageESService.addDocuments(attackMessages, false);
            }
        } finally {
            ack.acknowledge();
        }
    }

在这段代码中,@KafkaListener注解表示这是一个Kafka消费者,

  • topicPattern参数指定了该消费者要监听的主题的模式,即以 KafkaTopicConstant.ATTACK_MESSAGE开头的所有主题。
  • containerFactory参数指定了用于创建Kafka监听器容器的工厂类别名。
  • errorHandler参数指定了用于处理监听器抛出异常的错误处理器。id参数指定了该消费者的ID。

在该消费者的方法中,当有消息到达时,records参数将包含一组消息记录,ack参数用于手动确认已经消费了这些消息。

在方法中,首先记录了当前线程ID和拉取的数据总量。将消息记录逐一处理,并将处理结果存储在一个名为attackMessages的列表中。如果列表不为空,则将其添加到ES搜索引擎中。

最后,手动确认已经消费了这些消息。


【控制】


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

 
@Slf4j
@RestController
public class KafkaConsumerController {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    /**
     * 开启监听
     */
    @GetMapping("/start")
    public void start() {
        // 判断监听容器是否启动,未启动则将其启动
        if (!registry.getListenerContainer("attackConsumer").isRunning()) {
            log.info("start  ");

            registry.getListenerContainer("attackConsumer").start();
        }
        // 将其恢复
        registry.getListenerContainer("attackConsumer").resume();

        log.info("resume over ");
    }

    /**
     * 关闭监听
     */
    @GetMapping("/pause")
    public void pause() {
        // 暂停监听
        registry.getListenerContainer("attackConsumer").pause();

        log.info("pause");
    }
}
    

扩展

KafkaListenerEndpointRegistry

KafkaListenerEndpointRegistry是 Spring Kafka 提供的一个组件,用于管理 Kafka 消费者监听器的注册和启动。它是一个接口,提供了管理 Kafka 监听器容器的方法,如注册和启动监听器容器,以及暂停和恢复监听器容器等。

在 Spring Boot 应用程序中使用 @KafkaListener 注解时,Spring Kafka 会自动创建一个 KafkaListenerEndpointRegistry 实例,并使用它来管理所有的 Kafka 监听器容器。 它是 Spring Kafka 中的一个核心组件,用于实现 Kafka 消费者的监听和控制。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/594194.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

毫米波雷达信号处理中的通道间相干与非相干积累问题

说明 相干和非相干积累是雷达信号处理中的常用方法&#xff0c;这两个概念一般是用在多脉冲积累这个问题上&#xff1a;积累可以提高信号的SNR&#xff0c;从而提高检出概率。不过本文内容与脉冲积累无关&#xff0c;本文讨论的话题是将这两个概念(non-coherent combination、c…

HEVC变换编码介绍

介绍 ● 图像变换编码是指将以空间域中像素形式描述的图像转化至变换域&#xff0c;以变换系数的形式加以表示&#xff1b; ● 图像都含有较多平坦区域和内容变化缓慢的区域&#xff0c;适当的变换可使图像能量在空间域的分散分布转为在变换域的相对集中分布&#xff0c;实现…

推出“百亿生态”,拼多多“极限左移”

文 | 螳螂观察 作者 | 陈小江 任何一个成功穿越周期的企业&#xff0c;都有一个共同的特点——在发展顺利的时候&#xff0c;主动跳出“舒适圈”&#xff0c;进而跳出一般企业“起始、成长、成熟、衰退”的发展周期&#xff0c;为企业发展画出漂亮的S型增长曲线。 日前&…

DC1通关

环境自己百度装好。 我的一台kali&#xff0c;一台DC都是52网段 1.nmap 扫一扫52网段 确定是143&#xff0c;然后针对143进行扫描 80开放&#xff0c;进去。 老熟人了 Drupal&#xff0c;直接msf开打 试了几个&#xff0c;use2直接getshell了 看看权限 尝试SUID提权 进入sh…

零基础如何入门网络安全?

要学习网络安全&#xff0c;其实自学是有局限的&#xff0c;没有设备和网络环境&#xff0c;除了web渗透&#xff0c;其他很多方面&#xff0c;只能在培训班实战环境里才合适&#xff0c;比如路由交换技术、安全设备、学会怎么架构和配置一个企业网络安全只架构。 还要学习系统…

chatgpt赋能Python-python中导入numpy

介绍 在Python编程领域中&#xff0c;NumPy是一个非常常用的库&#xff0c;它提供了高性能的多维数组对象和许多用于操作这些数组的函数。在科学计算、数据分析和机器学习等领域中&#xff0c;NumPy是必不可少的工具。 本文将详细介绍如何在Python中导入NumPy库&#xff0c;并…

opencv初步了解

https://www.bilibili.com/video/BV1PV411774y?p2&vd_sourcee7d12c9f66ab8294c87125a95510dac9 这里写目录标题 下载计算机眼中的图像视频gray cv2.cvColor(frame, cv.COLOR_BGR2GRAY) ROI边界填充数值计算图像融合 下载 pip install input cv2 cv2.__version__下载一些…

io之io模型

写在前面 本文一起看下常见的io模型。 1&#xff1a;基础知识 同步异步&#xff0c;阻塞阻塞&#xff0c;区别如下&#xff1a; 同步异步:描述的通信模式&#xff0c;即结果如果是主动的获取则是同步&#xff0c;处理结果是被动的接收则是异步 阻塞非阻塞&#xff1a;描述的…

网页提交文件无法打开问题解决办法(以学习通为例)

时长会碰到这样的情况&#xff0c;日常实训课在机房写实训作业时&#xff0c;将未完成的作业先暂存先在学习通里&#xff0c;但后续在登陆学习通时发现未提交的附件打不开了&#xff0c;经过翻阅之前web的相关资料&#xff0c;总结出了这样的解决办法&#xff0c;供各位参考。 …

时间基础概念及Linux中的时间函数

时间基础概念及Linux中的时间函数 时间相关概念GMT 时间UTC 时间时区 Time Zone夏令时 DST本地时间 localtime Linux 系统中的时间时钟基础概念系统节拍数 jiffiesLinux系统查看时间及配置时区获取时间函数获取 当前时间 time()获取 当前时间&#xff08;微秒&#xff09; gett…

Qt Quick系列(5)—键盘输入

&#x1f680;作者&#xff1a;CAccept &#x1f382;专栏&#xff1a;Qt Quick 文章目录 前言代码示例单一按键组合按键 前言 本篇将介绍如何处理Qt Quick中的键盘输入。键盘输入在现代应用程序中扮演着重要角色&#xff0c;无论是快捷键还是文本输入都离不开它。通过本篇教…

(2021,FastGAN)用于高保真 few-shot 图像合成的更快、更稳定的 GAN 训练

Towards faster and stabilized gan training for high-fidelity few-shot image synthesis 公众号&#xff1a;EDPJ 目录 0. 摘要 1. 简介 2. 相关工作 3. 方法 3.1 跳跃层通道激励 3.2 自监督判别器 4. 实验 4.1 图像合成性能 4.2 更多分析与应用 5. 结论 参…

JL-8D/4X1 定时限电流继电器 用于输变电线路中 JOSEF约瑟

名称:定时限电流继电器型号:JL-8D/4X1触点容量250V5A功率消耗&#xff1c;5W返回系数0.90.97整定范围0.039.9A;0.130A辅助电源24220VDC/AC 系列型号&#xff1a; JL-8D/3X1定时限电流继电器&#xff1b;JL-8D/3X2定时限电流继电器&#xff1b; JL-8D/4X1定时限电流继电器&am…

【发电厂 HDLN-1-2Z 不带辅助电源电流继电器 导轨安装 JOSEF约瑟】

品牌&#xff1a;上海约瑟&#xff0c;名称&#xff1a;不带辅助电源电流继电器&#xff0c;型号&#xff1a;HDLN-1-2Z-2&#xff0c;触点容量&#xff1a;250V5A&#xff0c;整定范围&#xff1a;2-99.9A;特点&#xff1a;精度高&#xff0c;整定范围宽&#xff0c;动作快&am…

看完这篇 教你玩转渗透测试靶机vulnhub—DarkHole2

Vulnhub靶机DarkHole渗透测试详解 Vulnhub靶机介绍&#xff1a;Vulnhub靶机下载&#xff1a;Vulnhub靶机安装&#xff1a;Vulnhub靶机漏洞详解&#xff1a;①&#xff1a;信息收集&#xff1a;②&#xff1a;Git信息泄露&#xff1a;③&#xff1a;SQL注入&#xff1a;④&#…

APP出海的现状与挑战​

随着国内移动互联网市场的饱和&#xff0c;越来越多的国内APP开发者和企业将目光投向了海外市场&#xff0c;寻求新的增长机会。根据艾瑞咨询的数据&#xff0c;2020年上半年&#xff0c;全球APP下载量达到715亿次&#xff0c;用户总支出达到501亿美元&#xff0c;其中中国出海…

Pytorch基本概念和使用方法

目录 1 Adam及优化器optimizer&#xff08;Adam、SGD等&#xff09;是如何选用的&#xff1f; 1&#xff09;Momentum 2&#xff09;RMSProp 3&#xff09;Adam 2 Pytorch的使用以及Pytorch在以后学习工作中的应用场景。 1&#xff09;Pytorch的使用 2&#xff09;应用场…

vue methods 互相调用的方法

methods是一个内置的函数&#xff0c;主要用于两个组件之间的数据传递&#xff0c;也就是调用方法。下面给大家介绍一个在 vue中互相调用的方法&#xff0c;在使用过程中可以参考一下。 methods实现了两个组件之间数据的传递&#xff0c;我们先来看一下 Methods是如何实现数据传…

统计软件与数据分析Lesson17----利用pytorch构建LSTM预测股票收益率详细教程

利用pytorch构建LSTM预测股票收益率详细教程 1. 整体实现思路2.代码编写2.1 step1:导入所需的库2.2 step2: 读取数据、构建训练样本2.3 step3: 定义部分辅助函数2.4 step4:LSTM模型构建2.5 step5:模型训练2.6 step6:模型预测和评估 3. 小结 1. 整体实现思路 step1:导入所需的库…

对抗样本攻击

目录 一、对抗样本攻击的基本原理 1.1 什么是对抗样本攻击和对抗样本 1.2 对抗样本攻击的基本思路 1.3 对抗样本攻击的分类 1.3.1 按攻击效果分类 1.3.2 按攻击者能力分类 1.3.3 按攻击环境分类 1.4 对抗扰动的衡量 二、对抗样本攻击方法 一、对抗样本攻击的基本原理 …