Spring Kafka—— KafkaListenerEndpointRegistry 隐式注册分析

news2025/1/11 23:36:48

由于我想在项目中实现基于 Spring kafka 动态连接 Kafka 服务,指定监听 Topic 并控制消费程序的启动和停止这样一个功能,所以就大概的了解了一下 Spring Kafka 的几个重要的类的概念,内容如下:

  1. ConsumerFactory
    • 作用:负责创建 Kafka 消费者的实例。ConsumerFactory 是一个简单的工厂类,用于封装消费者的配置(如bootstrap servers, key deserializer, value deserializer等)并生成Consumer实例。
    • 用法:通常在Spring配置类中定义,并通过依赖注入提供给KafkaListenerContainerFactory
  2. ConcurrentKafkaListenerContainerFactory
    • 作用:这个工厂类用于创建 ConcurrentMessageListenerContainer 实例,该容器管理多个Kafka MessageListenerContainer来提供并发消息消费。
    • 特点:可以设置并发消费的数量,即同时运行的MessageListenerContainer的数量。
      支持消息过滤、错误处理和事务管理。
    • 用法:在Spring配置类中定义,并设置其ConsumerFactory和其他相关配置。然后,可以通过@KafkaListener注解直接使用,Spring会自动使用这个工厂来创建监听器。
  3. KafkaListenerEndpointRegistry
    • 作用:这是一个管理类,用于管理应用中所有由@KafkaListener注解创建的消息监听器容器。
    • 特点:提供了启动和停止监听器的方法,可以在运行时控制监听器。
      可以用来查询当前所有注册的监听器的状态。
    • 用法:通常自动配置,可以通过自动注入到任何Spring管理的Bean中,用于运行时管理监听器。
  4. KafkaTemplate
    • 作用:这是一个高级抽象,用于生产消息到Kafka主题。
    • 特点:提供同步和异步发送消息的方法。
      支持事务消息发送。
    • 用法:定义在Spring配置类中,注入生产者工厂ProducerFactory,并用于应用中的消息发送。
  5. @KafkaListener
    作用:注解用于标记方法以作为Kafka消息的监听器,这些方法会自动被Spring容器管理,并在有新消息时触发。
    特点:
    可以指定主题、分区和消费组。
    支持并发消费。
    用法:放在组件的方法上,方法参数可以灵活地映射消息的key、value、headers等。

从上面的内容可以看到,KafkaListenerEndpointRegistry 这个类是管理消息监听容器的,并提供了启动和停止监听器的方法,于是我就想创建这个类来完成我的需求功能。当我直接写如下内容时:

@Component
public class KafkaConfig {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @PostConstruct
    public void init() {
        System.out.println(registry);
    }
}

IDEA提示了 Could not autowire. No beans of ‘KafkaListenerEndpointRegistry’ type found. 但是我启动 SpringBoot 项目却没有报错 :
在这里插入图片描述
我在我的项目中是没有加 @EnableKafka 这样的注解的,代码如下:

@SpringBootApplication
public class SpringKafkaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaExampleApplication.class, args);
    }
}

引入的依赖:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

于是我就比较好奇,项目启动的时候是在什么地方声明了 KafkaListenerEndpointRegistry 这个 bean 的。

KafkaListenerEndpointRegistry 隐式注册分析

SpringBoot 对于 kafka 有如下的自动配置:

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {

	private final KafkaProperties properties;

	private final RecordMessageConverter recordMessageConverter;

	private final RecordFilterStrategy<Object, Object> recordFilterStrategy;

	private final BatchMessageConverter batchMessageConverter;

	private final KafkaTemplate<Object, Object> kafkaTemplate;

	private final KafkaAwareTransactionManager<Object, Object> transactionManager;

	private final ConsumerAwareRebalanceListener rebalanceListener;

	private final CommonErrorHandler commonErrorHandler;

	private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;

	private final RecordInterceptor<Object, Object> recordInterceptor;

	KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
			ObjectProvider<RecordMessageConverter> recordMessageConverter,
			ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,
			ObjectProvider<BatchMessageConverter> batchMessageConverter,
			ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
			ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
			ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener,
			ObjectProvider<CommonErrorHandler> commonErrorHandler,
			ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
			ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
		this.properties = properties;
		this.recordMessageConverter = recordMessageConverter.getIfUnique();
		this.recordFilterStrategy = recordFilterStrategy.getIfUnique();
		this.batchMessageConverter = batchMessageConverter
			.getIfUnique(() -> new BatchMessagingMessageConverter(this.recordMessageConverter));
		this.kafkaTemplate = kafkaTemplate.getIfUnique();
		this.transactionManager = kafkaTransactionManager.getIfUnique();
		this.rebalanceListener = rebalanceListener.getIfUnique();
		this.commonErrorHandler = commonErrorHandler.getIfUnique();
		this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
		this.recordInterceptor = recordInterceptor.getIfUnique();
	}

	@Bean
	@ConditionalOnMissingBean
	ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
		ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
		configurer.setKafkaProperties(this.properties);
		configurer.setBatchMessageConverter(this.batchMessageConverter);
		configurer.setRecordMessageConverter(this.recordMessageConverter);
		configurer.setRecordFilterStrategy(this.recordFilterStrategy);
		configurer.setReplyTemplate(this.kafkaTemplate);
		configurer.setTransactionManager(this.transactionManager);
		configurer.setRebalanceListener(this.rebalanceListener);
		configurer.setCommonErrorHandler(this.commonErrorHandler);
		configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
		configurer.setRecordInterceptor(this.recordInterceptor);
		return configurer;
	}

	@Bean
	@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
	ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
			ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
			ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
		ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
		configurer.configure(factory, kafkaConsumerFactory
			.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
		return factory;
	}

	@Configuration(proxyBeanMethods = false)
	@EnableKafka
	@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
	static class EnableKafkaConfiguration {

	}
}

可以看到这个配置类里面有一个静态的内部类 EnableKafkaConfiguration 该类上声明了 @EnableKafka 注解,也就是说内部静态类EnableKafkaConfiguration使用了@EnableKafka注解,并且通过@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)确保如果Spring上下文中缺少相应的Bean,则自动激活@EnableKafka功能。这意味着,即便你没有在你的应用配置中显式添加@EnableKafka,这个内部类也可以根据条件自动注册所需的Bean,从而启用Kafka的支持。

@EnableKafka 定义如下:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaListenerConfigurationSelector.class)
public @interface EnableKafka {
}

这个注解的使用导致了KafkaListenerConfigurationSelector的激活,其源码如下:

@Order
public class KafkaListenerConfigurationSelector implements DeferredImportSelector {

	@Override
	public String[] selectImports(AnnotationMetadata importingClassMetadata) {
		return new String[] { KafkaBootstrapConfiguration.class.getName() };
	}

}

上面的代码中 DeferredImportSelector是Spring框架中一个特殊的接口,它继承自ImportSelector。它主要用于处理配置类的导入,允许更细致地控制配置类的加载顺序。这个接口特别适用于那些依赖于由Spring容器中其他Bean或配置动态决定的配置。
KafkaListenerConfigurationSelector 这个类实现了DeferredImportSelector并通过selectImports方法返回了一个配置类名称的数组。这个方法指定了当Spring处理到这个选择器时,它应该导入KafkaBootstrapConfiguration类。

KafkaBootstrapConfiguration 内容如下:

public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {

	@Override
	public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
		if (!registry.containsBeanDefinition(
				KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {

			registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
					new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));
		}

		if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
			registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
					new RootBeanDefinition(KafkaListenerEndpointRegistry.class));
		}
	}

}

KafkaBootstrapConfiguration 是一个实现了ImportBeanDefinitionRegistrar接口的类,主要用于程序化地注册Bean定义到Spring的ApplicationContext中。通过实现ImportBeanDefinitionRegistrar接口,这个类可以在Spring的配置阶段动态地添加Bean定义。

在这个特定的实现中,KafkaBootstrapConfiguration检查特定的Kafka相关Bean(如KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME和KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)是否已经注册。如果这些Bean尚未注册,它会使用RootBeanDefinition手动注册这些Bean到Spring容器中。

RootBeanDefinition 的功能

RootBeanDefinition是Spring框架中用于定义Bean的一个核心类。它是BeanDefinition接口的一个直接实现,提供了一种配置Spring管理的Bean的方式,包括Bean的类类型、生命周期回调、依赖信息等。

  • Bean配置的详细定义:RootBeanDefinition允许开发者详细定义Bean的创建细节,如构造函数参数、属性值、初始化方法、销毁方法等。
  • 高级功能:它还支持更复杂的配置,如懒加载、自动装配模式、作用域和其他高级特性。
  • 程序化Bean注册:通过使用RootBeanDefinition,开发者可以在运行时动态地注册Bean,这对于条件配置或需要响应不同配置环境的高级用途尤为重要。

KafkaBootstrapConfiguration类中,使用RootBeanDefinition来创建和注册KafkaListenerAnnotationBeanPostProcessorKafkaListenerEndpointRegistry类的实例,这些是设置和管理Kafka消息监听器所必需的。

之后在AbstractBeanFactory会根据 beanName 获取到了 RootBeanDefinition 如下图所示:
在这里插入图片描述
然后在如下所示的位置:
在这里插入图片描述
程序创建了 beanName 为 org.springframework.kafka.config.internalKafkaListenerEndpointRegistry 的实例,具体创建实例的位置如下:
在这里插入图片描述
从调试中可以看到此处实例化了 KafkaListenerEndpointRegistry
所以当我们 springboot 项目引入了

 <dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
 </dependency>

依赖后,即使我们不显示的声明 @EnableKafka 程序也会进行初始化相应的配置。

总结

当Spring Boot项目中引入Spring Kafka依赖后,即使我们没有显式声明@EnableKafka,系统仍会自动进行相应的配置。因此,在项目中尝试注入KafkaListenerEndpointRegistry时,尽管IDE可能会提示“Could not autowire. No beans of ‘KafkaListenerEndpointRegistry’ type found.”,项目依然能够正常启动。这是因为KafkaListenerEndpointRegistry在Spring Kafka的自动配置过程中已被隐式注册。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1616477.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

探索数学语言模型的前沿进展——人工智能在数学教育和研究中的应用

数学一直被认为是科学的基石&#xff0c;对于推动技术进步和解决现实世界问题具有重要意义。然而&#xff0c;传统的数学问题解决方式正面临着数字化转型的挑战。MLMs的出现&#xff0c;预示着数学学习和研究方式的一次革命。 MLMs&#xff0c;包括预训练语言模型&#xff08;…

STM32F1串口

文章目录 1 数据通信的基础概念1.11.21.31.41.5 2 串口(RS-232&#xff09;2.12.22.32.42.5 3 STM32的USART3.13.23.33.53.9 USART寄存器介绍 4 HAL库外设初始化MSP回调机制5 HAL库中断回调机制6 USART/UART异步通信配置步骤 &#xff08;包括HAL库相关函数&#xff09;6.16.26…

SDN基础知识

&#x1f308;个人主页&#xff1a;小新_- &#x1f388;个人座右铭&#xff1a;“成功者不是从不失败的人&#xff0c;而是从不放弃的人&#xff01;”&#x1f388; &#x1f381;欢迎各位→点赞&#x1f44d; 收藏⭐️ 留言&#x1f4dd; &#x1f3c6;所属专栏&#xff1…

SQLite的DBSTAT 虚拟表(三十六)

返回&#xff1a;SQLite—系列文章目录 上一篇:SQLite运行时可加载扩展(三十五&#xff09; 下一篇&#xff1a;SQLite—系列文章目录 1. 概述 DBSTAT 虚拟表是一个只读的同名虚拟表&#xff0c;返回 有关用于存储内容的磁盘空间量的信息 的 SQLite 数据库。 示例用例…

【数据结构(邓俊辉)学习笔记】绪论03——递归分析

文章目录 意图目标1. 线性递归数组求和线性递归减而治之 2. 递归分析递归跟踪递推方程典型递推方程 3. 递归模式多递归基多向递归 4. 递归消除空间成本尾递归及其消除 5. 二分递归分而治之数组求和 6 . 效率7. 算法设计优化总结前n项计算算法 意图 数据结构中经常用到递归&…

VScode配置MySQL

1、进入官网&#xff0c;下载MySQL 地址&#xff1a;dev.mysql.com/downloads/mysql/ ZIP方式下载&#xff0c;选择本地的路径进行解压。 2、配置环境变量 形如下方的路径&#xff1a; D:\software\Mysql\mysql-8.3.0-winx64\bin 即是解压位置后文件夹下的bin文件路径 3、初…

在 VSCode 中运行 C#

文章目录 1.为何选择VSCode而不是VS2.操作步骤2.1 安装.NET2.2 安装扩展插件2.2.1 C#2.2.2 Code Runner 3.新建工程HelloCsharp 1.为何选择VSCode而不是VS VS实在是太“重”了&#xff0c;如果只是写一些简单控制台程序进行调试&#xff0c;则完全没必要 2.操作步骤 2.1 安装…

【前端】vue3树形组件使用

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、树形组件简介二、树形组件使用三、总结 前言 随着开发语言及人工智能工具的普及&#xff0c;使得越来越多的人学习使用vue前端工具&#xff0c;本文主要是…

第十、十一章 折线图 + 地图 + 柱状图的绘制

第十章 折线图的绘制 官网&#xff1a;pyecharts - A Python Echarts Plotting Library built with love. 画廊官网&#xff1a;Document 懒人工具&#xff1a;懒人工具-手机APP工具下载-手机软件下载大全 - 173软件站 (ab173.com) 导学 json 定义 &#xff08;1&#xff…

Flask 数据库前后端交互案例-1

Flask 数据库前后端交互案例 目录结构templates目录base.htmlheader.htmlleft.html首页职员管理页面添加员工界面员工编辑页面员工详情界面 后台main.pyapp.pymodels.pyviews.py 数据库数据position.sqlperson.sqlpermission.sqldepartment.sql 目录结构 静态文件链接&#xff…

工装行业项目管理系统哪家好?找企智汇工程项目管理系统!

在工装行业&#xff0c;项目管理是至关重要的一环。好的项目管理系统能够提高工装企业的效率、降低成本、提升客户满意度。在这个竞争激烈的市场中&#xff0c;选择一款好的项目管理系统&#xff0c;对于企业的发展至关重要。 今天&#xff0c;我向大家介绍的是企智汇工程项目…

uniapp自定义顶部导航栏

首先uniapp获取设备信息&#xff1a;uni.getSystemInfo或uni.getSystemInfoSync&#xff0c;可用于设置顶部安全区 留一个设备安全区的位置哦 然后在pages.json文件里配置自定义导航栏 {"pages": [ //pages数组中第一项表示应用启动页&#xff0c;参考&#xff1a…

如何使用 ArcGIS Pro 快速为黑白地图配色

对于某些拍摄时间比较久远的地图&#xff0c;限于当时的技术水平只有黑白的地图&#xff0c;针对这种情况&#xff0c;我们可以通过现在的地图为该地图进行配色&#xff0c;这里为大家讲解一下操作方法&#xff0c;希望能对你有所帮助。 数据来源 教程所使用的数据是从水经微…

windows SDK编程 --- 消息(3)

前置知识 一、消息的分类 1. 鼠标消息 处理与鼠标交互相关的事件&#xff0c;比如移动、点击和滚动等。例如&#xff1a; WM_MOUSEMOVE: 当鼠标在窗口客户区内移动时发送。WM_LBUTTONDOWN: 当用户按下鼠标左键时发送。WM_LBUTTONUP: 当用户释放鼠标左键时发送。WM_RBUTTOND…

Cisco NX-OS Software Release 10.4(3)F - 网络操作系统软件

Cisco NX-OS Software Release 10.4(3)F - 网络操作系统软件 NX-OS 网络操作系统 请访问原文链接&#xff1a;Cisco NX-OS Software Release 10.4(3)F - 网络操作系统软件&#xff0c;查看最新版。原创作品&#xff0c;转载请保留出处。 作者主页&#xff1a;sysin.org Cisc…

K8s: Ingress对象, 创建Ingress控制器, 创建Ingress资源并暴露服务

Ingress对象 1 &#xff09;概述 Ingress 是对集群中服务的外部访问进行管理的 API 对象&#xff0c;典型的访问方式是 HTTPIngress-nginx 本质是网关&#xff0c;当你请求 abc.com/service/a, Ingress 就把对应的地址转发给你&#xff0c;底层运行了一个 nginx但 K8s 为什么不…

通义灵码牵手阿里云函数计算 FC ,打造智能编码新体验

通义灵码自成功入职阿里云后&#xff0c;其智能编程助手的角色除了服务于阿里云内部几万开发者&#xff0c;如今进一步服务函数计算 FC 产品开发者。近日&#xff0c;通义灵码正式进驻函数计算 FC WebIDE&#xff0c;让使用函数计算产品的开发者在其熟悉的云端集成开发环境中&a…

yolov5 的几个问题,讲的比较清楚

yolov5, 几个问题 【BCELoss】pytorch中的BCELoss理解 三个损失函数原理讲解 https://zhuanlan.zhihu.com/p/458597638 yolov5源码解析–输出 YOLOv5系列(十) 解析损失部分loss(详尽) 1、输入数据是 xywh, 针对原图的, 然后,变成 0-1, x/原图w, y/原图h, w/原图w, h/原图h,…

【Java网络编程】TCP通信(Socket 与 ServerSocket)和UDP通信的三种数据传输方式

目录 1、TCP通信 1.1、Socket 和 ServerSocket 1.3、TCP通信示例 2、UDP的三种通信&#xff08;数据传输&#xff09;方式 1、TCP通信 TCP通信协议是一种可靠的网络协议&#xff0c;它在通信的两端各建立一个Socket对象 通信之前要保证连接已经建立&#xff08;注意TCP是一…

从win10升级到win11后,安全中心没有病毒防护的解决办法

从win10升级到win11后&#xff0c;安全中心没有病毒防护的解决办法 问题就是Win11的安全中心打开没有病毒和威胁防护选项&#xff08;不装其它第三方防病毒软件的情况下&#xff09;。 这可能是因为注册表出了问题。 具体操作如下&#xff1a; 点击Windows左下角搜索栏&…