@KafkaListener 详解及消息消费启停控制

news2024/9/21 0:36:49

参考:Kafka参数

一、@KafkaListener注解

@KafkaListener(id = "11111", groupId = "demo-group",topics = Constants.TOPIC)
    public void listen(String msgData) {
    LOGGER.info("收到消息" + msgData);
}  
 
@KafkaListener(id = "22222", groupId = "demo-group", clientIdPrefix = "prefix",
                                 topics = Constants.TOPIC)
    public void listen2(String msgData) {
    LOGGER.info("收到消息" + msgData);
}
 
@KafkaListener(id = "3333", groupId = "demo-group2", topics = Constants.TOPIC)
    public void listen3(String msgData) {
    LOGGER.info("收到消息" + msgData);
}
 
@KafkaListener(id = "4444", groupId = "demo-group2", topics = Constants.TOPIC)
    public void listen4(String msgData) {
    LOGGER.info("收到消息" + msgData);
}

(1) id: 默认是每个Listener实例的重要标识。

对于整个日志的排查起着至关重要的作用。如果不指定groupId,那么id将直接作为groupId。可以使用另外一个属性 idIsGroup=false关闭,默认是true。

(2) goupId: 每个消费者所属的组。

每个消费者都有自己所属的组。一个组中可以有多个消费者。

一个Topic的分区只能被同一个组下的某个消费者消费。从日志上来看,侧面也反映的消费模式是 Subscribed 订阅模式,不是手动的assign模式。

[Consumer clientId=consumer-1, groupId=demo-group2] Subscribed to topic(s): COLA
[Consumer clientId=consumer-2, groupId=demo-group] Subscribed to topic(s): COLA
[Consumer clientId=consumer-3, groupId=demo-group2] Subscribed to topic(s): COLA
[Consumer clientId=prefix-0, groupId=demo-group] Subscribed to topic(s): COLA

(3) clientIdPrefix: 消费者clientId前缀

@KafkaListener(id = "22222", groupId = "demo-group", clientIdPrefix = "prefix", topics = Constants.TOPIC)
public void listen2(String msgData) {
    LOGGER.info("收到消息" + msgData);
}

如下图,共有4个消费者。有个消费者配置了clientIdPrefix属性为"prefix",所以该消费者的clientId以指定的"prefix"开头。如果没有配置,该实例的clientId默认为"consumer"。同时,每个实例的clientId后面挂了一个数字,来标示它在整个kafka集群中的消费者编号,编号从0开始。这里配置了4个消费者,所以消费者实例编号有0、 1、 2、 3。

(4) autoStartup

public @interface KafkaListener ...
    /**
     * Set to true or false, to override the default setting in the container factory. May
     * be a property placeholder or SpEL expression that evaluates to a {@link Boolean} or
     * a {@link String}, in which case the {@link Boolean#parseBoolean(String)} is used to
     * obtain the value.
     * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
     * @return true to auto start, false to not auto start.
     * @since 2.2
     */
    String autoStartup() default "";

是否自动启动,如果是 false,默认不生效,需要手动唤醒。

看源代码上作者给的的注释:该注解指定的值优先级比工厂里指定的高。

另外可以使用 ${} 占位符的形式,支持配置。

application.yaml:
listener:
  auto:
    startup: true  
 
java :
    @KafkaListener(... containerFactory = "batchContainerFactory",
      autoStartup = "${listener.auto.startup}")
    public void listen4(List<ConsumerRecord> list, Acknowledgment acknowledgment)...
注:每个消费者实例对象内部持有两个属性。
boolean running
boolean paused
有几个改变状态的方法:
调用start()方法后,running转为true
调用stop()方法后,running转为false
调用pause()方法后,paused转为true
调用resume()方法后,paused转为false

只有running=true 、 paused=false 的消费者实例才能正常消费数据。
注解上的autoStartup改变的是running属性。
    @KafkaListener(id = "11111", groupId = "demo-group", 
                    topics = Constants.TOPIC, autoStartup = "false")
    public void listen(String msgData) throws InterruptedException {
        LOGGER.info("收到消息" + msgData);
        Thread.sleep(1000);
    }

二、Kafka Listener任务暂停及恢复

2.1 唤醒消费者实例, 示例代码:

    @Autowired
    private KafkaListenerEndpointRegistry registry;
 
    // 获取到id="11111" 的消费实例对象
    MessageListenerContainer listenerContainer = 
                        this.registry.getListenerContainer("11111");
 
    listenerContainer.pause();  //paused ==> true
  // listenerContainer.stop(); //running==> false

2.2 暂停消费者实例, 示例代码:

    @Autowired
    private KafkaListenerEndpointRegistry registry;
 
    // 获取到id="11111" 的消费实例对象
    MessageListenerContainer listenerContainer = 
                        this.registry.getListenerContainer("11111");
 
    listenerContainer.pause();  //paused ==> true
    // listenerContainer.stop(); //running==> false

2.3 定时任务自动启动

    @Autowired
    private KafkaListenerEndpointRegistry registry;
 
    // 定时器,每天凌晨0点开启监听
    @Scheduled(cron = "0 0 0 * * ?")
    public void startListener() {
        log.info("开启监听");
        // 判断监听容器是否启动,未启动则将其启动
        if (!registry.getListenerContainer("11111").isRunning()) {
            registry.getListenerContainer("11111").start();
        }
        registry.getListenerContainer("11111").resume();
    }
 
    // 定时器,每天早上10点关闭监听
    @Scheduled(cron = "0 0 10 * * ?")
    public void shutDownListener() {
        log.info("关闭监听");
        registry.getListenerContainer("11111").pause();
    }

三、@KafkaListener注解方法参数汇总

@KafkaListener注解能够使用到如下8种方法上面。至于监听单条数据的前4种方法,与批量监听多条数据的后4种方法,主要依据kafka的具体配置。

    @KafkaListener(....)
    public void listen1(String data) 
 
    @KafkaListener(....)
    public void listen2(ConsumerRecord<K,V> data) 
 
    @KafkaListener(....)
    public void listen3(ConsumerRecord<K,V> data, Acknowledgment acknowledgment) 
 
    @KafkaListener(....)
    public void listen4(ConsumerRecord<K,V> data,
                        Acknowledgment acknowledgment, Consumer<K,V> consumer)
 
    @KafkaListener(....)
    public void listen5(List<String> data) 
 
    @KafkaListener(....)
    public void listen6(List<ConsumerRecord<K,V>> data) 
 
    @KafkaListener(....)
    public void listen7(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment) 
 
    @KafkaListener(....)
    public void listen8(List<ConsumerRecord<K,V>> data, 
                        Acknowledgment acknowledgment, Consumer<K,V> consumer)

四、KafkaListenerContainerFactory配置

在application.yaml中配置的kafka参数,以spring.kafka开头的参数族,全部用于kafka默认对象的创建。

4.1 kafka参数默认封装对象

所有kafka参数默认封装到对象:KafkaProperties对象中,可使用@Autowired自动注入。

    @Autowired
    private KafkaProperties properties;

4.2 @KakfkaListener注解标记监听实例对象

如不特殊指定,默认使用在yaml中的所有spring.kafka.consumer与spring.kafka.listener下的参数。

监听器实例对象自动绑定到上述配置文件,是由于它默认使用的"containerFactory" 是名为"kafkaListenerContainerFactory"的bean。

源码注释如下,如果不特殊指定,则默认的容器工厂将会被使用。

package org.springframework.kafka.annotation;
 
public @interface KafkaListener ...
    /**
     * The bean name of the {@link 
            org.springframework.kafka.config.KafkaListenerContainerFactory}
     * to use to create the message listener container 
               responsible to serve this endpoint.
     * <p>If not specified, the default container factory is used, if any.
     * @return the container factory bean name.
     */
    String containerFactory() default "";
默认的容器工厂代码如下,均为Springboot与Kafka框架提供的类。
这两个bean将spring.kafka.listener与spring.kafka.consumer下的参数全部组装到名为"kafkaListenerContainerFactory"这个bean中。该bean供@KafkaListener标记的监听实例使用。
因此可以得出结论:
如果不想使用默认的"kafkaListenerContainerFactory"容器工厂,则必须手动创建一个"ConcurrentKafkaListenerContainerFactory"类的实例,并且其bean name 不能叫"kafkaListenerContainerFactory"(不然与默认的工厂实例重名了),然后把该对象加入spring容器中。当在使用@KafkaListener标注的监听实例对象时,手动指定该注解"containerFactory"属性为刚才自定义的容器工厂实例bean name。
package org.springframework.boot.autoconfigure.kafka;
 
class KafkaAnnotationDrivenConfiguration {
 
    @Bean
    @ConditionalOnMissingBean
    ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer = 
                            new ConcurrentKafkaListenerContainerFactoryConfigurer();
        configurer.setKafkaProperties(this.properties);
        MessageConverter messageConverterToUse = 
                     (this.properties.getListener().getType().equals(Type.BATCH))
                                ? this.batchMessageConverter : this.messageConverter;
        configurer.setMessageConverter(messageConverterToUse);
        configurer.setReplyTemplate(this.kafkaTemplate);
        configurer.setTransactionManager(this.transactionManager);
        configurer.setRebalanceListener(this.rebalanceListener);
        configurer.setErrorHandler(this.errorHandler);
        configurer.setBatchErrorHandler(this.batchErrorHandler);
        configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
        configurer.setRecordInterceptor(this.recordInterceptor);
        return configurer;
    }
 
    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = 
                                    new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory
                .getIfAvailable(() -> 
           new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
        return factory;
    }

4.3 自定义容器工厂实例代码示例:

    @Autowired
    private KafkaProperties properties;
    
    @Bean("batchContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> listenerContainer() {
        ConcurrentKafkaListenerContainerFactory<?, ?> container =
                new ConcurrentKafkaListenerContainerFactory<>();
 
        Map<String, Object> stringObjectMap = this.properties.buildConsumerProperties();
        stringObjectMap.put("enable.auto.commit", false);
        container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(stringObjectMap));
        // 没有topic是否禁止系统启动
        container.setMissingTopicsFatal(true);
        // 并发
        container.setConcurrency(1);
        // 批量接收
        container.setBatchListener(true);
        // 如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法。
        container.getContainerProperties().setPollTimeout(5000);
        // 设置提交偏移量的方式, MANUAL_IMMEDIATE 表示消费一条提交一次;MANUAL表示批量提交一次
        container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // 设置kafka 异常重试次数 第一个参数等待重试时间,第二个参数数提交次数,这里设置不重试,默认重试10次 抛出异常后调用
        // factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 0L)));
        return container;
    }
  
    @KafkaListener(id = "4444", groupId = "demo-group2", topics = Constants.TOPIC, containerFactory = "batchContainerFactory")
    public void listen4(List<ConsumerRecord> list, Acknowledgment acknowledgment) {
        LOGGER.info("4444收到消息" + list.size());
        acknowledgment.acknowledge();
    }

4.4 示例二

@Bean
public ConsumerFactory<String, ListingMessage> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100000);
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 240000);
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
        new JsonDeserializer<>(ListingMessage.class));
}

@Bean(KAFKA_LISTENER_CONTAINER_FACTORY) 
public concurrentKafkaListenerContainerFactory<String, ListingMessage> listingKafkaListenerContainerFactory(
    ConsumerFactory<String, ListingMessage> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, ListingMessage> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(listingConsumerFactory);
    factory.setConcurrency(1);
    factory.setAutoStartup(false);
    factory.setBatchListener(true);
    return factory;
}

五、吞吐量

如下,这里我只列出了影响本例的几条参数。

spring:
  kafka:
    consumer:
      enable-auto-commit: true
      # max-poll-records: 20
 
    listener:
      ack-mode: batch
      type: batch
      concurrency: 5

如果设置spring.kafka.listener.concurrency为5,共两个消费者,Topic名为"COLA",共8个分区。代码如下。

    @KafkaListener(id = "4444", groupId = "demo-group2", topics = "COLA")
    public void listen4(List<String> msgData) {
        LOGGER.info("收到消息" + msgData);
    }
 
    @KafkaListener(id = "5555", groupId = "demo-group2", topics = "COLA")
    public void listen5(List<String> msgData) {
        LOGGER.info("收到消息" + msgData);
    }
 
    @Bean
    public NewTopic newTopic() {
        return new NewTopic(Constants.TOPIC, 8, (short) 1);
    }
系统每个消费者都创建了5个线程,共10个线程。换句话说,每个消费者实例(@KafkaListener标记的方法)同时都会有5个线程在跑。每个线程接收的分区都不一样。
另外,这两个消费者属于同一个组,Topic只有8个分区,2个消费者共10个线程,一个线程消费一个分区,所以必然有两个线程最后属于空闲状态。
从实际结果上来看(下面的日志),没想到系统为id="4444"的消费者实际只分配到了3个分区,有两个线程处于空闲状态。id="5555"的消费者达到了预期,共消费了5个分区,分配到了5个线程!
[4444-2-C-1]: demo-group2: partitions assigned: []
[4444-3-C-1]: demo-group2: partitions assigned: []
[4444-4-C-1]: demo-group2: partitions assigned: [COLA-1]
[4444-1-C-1]: demo-group2: partitions assigned: [COLA-7]
[5555-2-C-1]: demo-group2: partitions assigned: [COLA-3]
[5555-4-C-1]: demo-group2: partitions assigned: [COLA-5]
[5555-3-C-1]: demo-group2: partitions assigned: [COLA-4]
[4444-0-C-1]: demo-group2: partitions assigned: [COLA-6]
[5555-0-C-1]: demo-group2: partitions assigned: [COLA-0]
[5555-1-C-1]: demo-group2: partitions assigned: [COLA-2]

六、 结论:

  1. concurrency值对应@KafkaListener的消费者实例线程数目,如果concurrency数量大于partition数量,多出的部分分配不到partition,会被闲置。

  1. 设置的并发量不能大于partition的数量,如果需要提高吞吐量,可以通过增加partition的数量达到快速提升吞吐量的效果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/359861.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

React系列之Redux

1 Redux概述 Redux 是 JavaScript 状态容器&#xff0c;提供可预测化的状态管理。Redux中文文档 Redux 和react没有必然关系&#xff0c;redux可以应用于各种框架&#xff0c;包括jquery&#xff0c;甚至js都可以使用redux&#xff0c;只不过redux和react更加搭配。redux也推…

javaee之git

一张图说明git 分支之间的操作 这个 框里面的linux命令都可以用 操作开始&#xff1a; 在master分支里面创建了一个hello.txt&#xff0c;并且放入了一些数据进去 这个去查一下日志 问题&#xff1a;当你放入了暂存区&#xff0c;你去查看日志会报错 一个分支这个指针head永…

Django框架之Django使用自带模板

Django使用自带模板 1 配置 在工程中创建模板目录templates。 在settings.py配置文件中修改TEMPLATES配置项的DIRS值&#xff1a; TEMPLATES [{BACKEND: django.template.backends.django.DjangoTemplates,DIRS: [os.path.join(BASE_DIR, templates)], # 此处修改APP_DIR…

vue-router 的基本用法

vue-router 的基本用法 1.什么是 vue-router vue-router 是 vue.js 官方给出的路由解决方案。它只能结合 vue 项目进行使用&#xff0c;能够轻松的管理 SPA 项目中组件的切换。 vue-router 的官方文档地址&#xff1a;https://router.vuejs.org/zh/ 2.vue-router 安装和配置的…

GIT分支管理策略

git基本操作git操作的前提条件:本地windows安装git学习idea中的插件使用idea的git基本操作:远程仓库remote更新fetch:git fetch拉取pull: git pull上传push: git push合并merge: git merge 合并分支本地提交commit:git commit分支branch: git branch 查看分支或者 切换分支上述…

SpringBoot整合Junit

创建项目 idea创建空项目Empty Project。项目中创建模块&#xff0c;选择SpringBoot Initialize快速构建SpringBoot项目。 依赖这里什么也不用选择。 pom文件中默认有两个依赖: spring-boot-starter springboot如果不导入任何依赖&#xff0c;默认的一个基础依赖。spring-…

5.3 线程安全问题解决方案

文章目录1.概述2.同步和异步3.synchronized同步关键字3.1 写法3.2 前提3.3 特点4.练习-改造售票案例-继承Thread4.1 代码实现4.2 注意事项5.练习-改造售票案例-实现Runnable接口5.1 代码实现5.2 注意事项6.练习-改造售票案例-使用线程池6.1 代码实现6.2 代码分析7.线程锁7.1 悲…

七、确保web安全的HTTPS

HTTPS 1、HTTP 的缺点 HTTP的主要缺点&#xff1a; 通信使用明文&#xff08;不加密&#xff09;&#xff0c;内容可能会被窃听 HTTP 本身不具备加密的功能&#xff0c;因此无法做到对通信整体&#xff08;使用 HTTP 协议通信的请求和响应的内容&#xff09;进行加密。所以按…

actipro-winforms-controls-23.1.0 Crack

actipro-winforms一组用于构建漂亮的 Windows 窗体桌面应用程序的 UI 控件&#xff0c;用于构建 IDE 的高级停靠窗口、MDI、属性网格、树控件和文件夹/文件浏览器&#xff0c;用于常见数据类型、自动完成、屏蔽编辑和代码编辑的强大编辑器&#xff0c;功能区、图表、微型图表、…

Centos7 安装 MySQL 8.0.31详细教程(亲测无障碍必成功)

操作之前&#xff0c;首先检查防火墙是否关闭&#xff08;直接设置永久关闭&#xff09; 查看防火墙状态&#xff1a;firewall-cmd --state 禁止firewall开机启动 永久生效&#xff1a;systemctl disable firewalld.service 重启电脑&#xff1a;reboot 1. 在FinallShell上传或…

Julia 教程

Julia 是一个开源的编程语言&#xff0c;采用 MIT 许可证&#xff0c;每个人都可以免费使用。 Julia 是一个面向科学计算的高性能动态高级程序设计语言。 Julia 最初是为了满足高性能数值分析和计算科学的需要而设计的&#xff0c;不需要解释器&#xff0c;速度快。 Julia 于…

筛选效率直接起飞,复杂场景秒变简单丨三叠云

表单 路径 表单设计 >> 高级筛选 功能简介 筛选条件优化升级&#xff0c;支持多种混合条件筛选。 功能描述&#xff1a; 本次更新支持2个层级的条件&#xff0c;系统处理数据时&#xff0c;将会先根据第二个层级的条件关系找出数据、继而再根据第一层级即分组之间的…

解决CondaUpgradeError网上的方法都不奏效(回退版本、upgrade/update都不行)的问题和CondaValueError

问题描述 Executing transaction: failed ERROR conda.core.link:_execute(502): An error occurred while installing package ‘conda-forge::certifi-2022.9.24-pyhd8ed1ab_0’. CondaUpgradeError: This environment has previously been operated on by a conda version…

Java 基础——File 类与 I/O 流

目录1.java.io.File 类的使用1.1.概述1.2.构造器1.3.常用方法1.3.1.获取文件和目录基本信息1.3.2.列出目录的下一级1.3.3.File类的重命名功能1.3.4.判断功能的方法1.3.5.创建、删除功能2.I/O 流原理及流的分类2.1.I/O 原理2.2.流的分类2.3.流的 API3.节点流之一&#xff1a;Fil…

项目经理PMO分别是什么?

1. PMO是什么&#xff1f;&#xff08;1&#xff09;定义PMO项目经理&#xff08;Project Management Office Manager&#xff09;&#xff0c;也称为项目管理办公室经理、项目管理中心或者项目管理部。一般来说&#xff0c;PMO就是负责公司项目管理政策、标准的制定&#xff0…

C/C++每日一练(20230220)

目录 1. 利用字母组成图形 2. 子集 II 3. 路径总和 II 附录 深度优先搜索算法 广度优先搜索算法 1. 利用字母组成图形 利用字母可以组成一些美丽的图形&#xff0c;下面给出了一个例子&#xff1a; ABCDEFG BABCDEF CBABCDE DCBABCD EDCBABC 这是一个5行7列的图形&…

ROS2入门-话题-服务-接口

ROS2入门-话题-服务-接口 本文学习的是《动手学ROS2》 报错放在另一个文章中。 文章目录ROS2入门-话题-服务-接口Linux常用命令sudochomd 修改文件权限安装软件apt安装软件dpkg安装deb包打开终端VS code关机/重启静态链接库/动态链接库Cmake设置treeROS节点功能包创建功能包列…

数据分析,如何看待我国1400万人忍受极端通勤,单程通勤超60分钟!

女生极限通勤每天来回 6.5 小时&#xff0c;上海某位女士公司离家单程约100公里左右&#xff0c;单程通勤需要3小15分&#xff0c;来回通勤时间为6.5小时。如此长的通勤时间却不是个例&#xff0c;全国有超1400万人正在忍受单程超过60分钟的极端通勤&#xff0c;如何看待我国 1…

将默认安装的 WSL2 迁移至指定目录

将默认安装的 WSL2 迁移至指定目录WSL2 默认安装在 C 盘下&#xff0c;系统盘空间有限&#xff0c;推荐更改安装目录。 1. 默认安装的 WSL2 目录 C:\Users\cheng\AppData\Local\Packages\CanonicalGroupLimited.Ubuntu20.04onWindows_79rhkp1fndgsc\LocalState\ext4.vhdx 2. …

使用Swiper插件实现视频轮播,怎么实现切换自动播放视频?

一、需求分析 这两天讨论了一个项目需求&#xff0c;刚开始是希望&#xff1a;轮播图中嵌入视频&#xff0c;轮播到视频自动播放&#xff0c;播放完毕切换下一张轮播&#xff0c;手动切换时暂停播放视频。后面因为自动播放没有声音&#xff0c;便暂时放弃了这个想法&#xff0…