由于我想在项目中实现基于 Spring kafka 动态连接 Kafka 服务,指定监听 Topic 并控制消费程序的启动和停止这样一个功能,所以就大概的了解了一下 Spring Kafka 的几个重要的类的概念,内容如下:
ConsumerFactory
- 作用:负责创建 Kafka 消费者的实例。
ConsumerFactory
是一个简单的工厂类,用于封装消费者的配置(如bootstrap servers, key deserializer, value deserializer等)并生成Consumer
实例。 - 用法:通常在Spring配置类中定义,并通过依赖注入提供给
KafkaListenerContainerFactory
。
- 作用:负责创建 Kafka 消费者的实例。
ConcurrentKafkaListenerContainerFactory
- 作用:这个工厂类用于创建
ConcurrentMessageListenerContainer
实例,该容器管理多个Kafka MessageListenerContainer
来提供并发消息消费。 - 特点:可以设置并发消费的数量,即同时运行的
MessageListenerContainer
的数量。
支持消息过滤、错误处理和事务管理。 - 用法:在Spring配置类中定义,并设置其
ConsumerFactory
和其他相关配置。然后,可以通过@KafkaListener
注解直接使用,Spring会自动使用这个工厂来创建监听器。
- 作用:这个工厂类用于创建
KafkaListenerEndpointRegistry
- 作用:这是一个管理类,用于管理应用中所有由
@KafkaListener
注解创建的消息监听器容器。 - 特点:提供了启动和停止监听器的方法,可以在运行时控制监听器。
可以用来查询当前所有注册的监听器的状态。 - 用法:通常自动配置,可以通过自动注入到任何Spring管理的Bean中,用于运行时管理监听器。
- 作用:这是一个管理类,用于管理应用中所有由
KafkaTemplate
- 作用:这是一个高级抽象,用于生产消息到Kafka主题。
- 特点:提供同步和异步发送消息的方法。
支持事务消息发送。 - 用法:定义在Spring配置类中,注入生产者工厂
ProducerFactory
,并用于应用中的消息发送。
@KafkaListener
作用:注解用于标记方法以作为Kafka消息的监听器,这些方法会自动被Spring容器管理,并在有新消息时触发。
特点:
可以指定主题、分区和消费组。
支持并发消费。
用法:放在组件的方法上,方法参数可以灵活地映射消息的key、value、headers等。
从上面的内容可以看到,KafkaListenerEndpointRegistry
这个类是管理消息监听容器的,并提供了启动和停止监听器的方法,于是我就想创建这个类来完成我的需求功能。当我直接写如下内容时:
@Component
public class KafkaConfig {
@Autowired
private KafkaListenerEndpointRegistry registry;
@PostConstruct
public void init() {
System.out.println(registry);
}
}
IDEA提示了 Could not autowire. No beans of ‘KafkaListenerEndpointRegistry’ type found. 但是我启动 SpringBoot 项目却没有报错 :
我在我的项目中是没有加 @EnableKafka
这样的注解的,代码如下:
@SpringBootApplication
public class SpringKafkaExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SpringKafkaExampleApplication.class, args);
}
}
引入的依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
于是我就比较好奇,项目启动的时候是在什么地方声明了 KafkaListenerEndpointRegistry
这个 bean 的。
KafkaListenerEndpointRegistry 隐式注册分析
SpringBoot 对于 kafka 有如下的自动配置:
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {
private final KafkaProperties properties;
private final RecordMessageConverter recordMessageConverter;
private final RecordFilterStrategy<Object, Object> recordFilterStrategy;
private final BatchMessageConverter batchMessageConverter;
private final KafkaTemplate<Object, Object> kafkaTemplate;
private final KafkaAwareTransactionManager<Object, Object> transactionManager;
private final ConsumerAwareRebalanceListener rebalanceListener;
private final CommonErrorHandler commonErrorHandler;
private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
private final RecordInterceptor<Object, Object> recordInterceptor;
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> recordMessageConverter,
ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,
ObjectProvider<BatchMessageConverter> batchMessageConverter,
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener,
ObjectProvider<CommonErrorHandler> commonErrorHandler,
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
this.properties = properties;
this.recordMessageConverter = recordMessageConverter.getIfUnique();
this.recordFilterStrategy = recordFilterStrategy.getIfUnique();
this.batchMessageConverter = batchMessageConverter
.getIfUnique(() -> new BatchMessagingMessageConverter(this.recordMessageConverter));
this.kafkaTemplate = kafkaTemplate.getIfUnique();
this.transactionManager = kafkaTransactionManager.getIfUnique();
this.rebalanceListener = rebalanceListener.getIfUnique();
this.commonErrorHandler = commonErrorHandler.getIfUnique();
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
this.recordInterceptor = recordInterceptor.getIfUnique();
}
@Bean
@ConditionalOnMissingBean
ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
configurer.setKafkaProperties(this.properties);
configurer.setBatchMessageConverter(this.batchMessageConverter);
configurer.setRecordMessageConverter(this.recordMessageConverter);
configurer.setRecordFilterStrategy(this.recordFilterStrategy);
configurer.setReplyTemplate(this.kafkaTemplate);
configurer.setTransactionManager(this.transactionManager);
configurer.setRebalanceListener(this.rebalanceListener);
configurer.setCommonErrorHandler(this.commonErrorHandler);
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
configurer.setRecordInterceptor(this.recordInterceptor);
return configurer;
}
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory
.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
return factory;
}
@Configuration(proxyBeanMethods = false)
@EnableKafka
@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
static class EnableKafkaConfiguration {
}
}
可以看到这个配置类里面有一个静态的内部类 EnableKafkaConfiguration
该类上声明了 @EnableKafka
注解,也就是说内部静态类EnableKafkaConfiguration
使用了@EnableKafka
注解,并且通过@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
确保如果Spring上下文中缺少相应的Bean,则自动激活@EnableKafka
功能。这意味着,即便你没有在你的应用配置中显式添加@EnableKafka
,这个内部类也可以根据条件自动注册所需的Bean,从而启用Kafka的支持。
@EnableKafka
定义如下:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaListenerConfigurationSelector.class)
public @interface EnableKafka {
}
这个注解的使用导致了KafkaListenerConfigurationSelector
的激活,其源码如下:
@Order
public class KafkaListenerConfigurationSelector implements DeferredImportSelector {
@Override
public String[] selectImports(AnnotationMetadata importingClassMetadata) {
return new String[] { KafkaBootstrapConfiguration.class.getName() };
}
}
上面的代码中 DeferredImportSelector
是Spring框架中一个特殊的接口,它继承自ImportSelector
。它主要用于处理配置类的导入,允许更细致地控制配置类的加载顺序。这个接口特别适用于那些依赖于由Spring容器中其他Bean或配置动态决定的配置。
KafkaListenerConfigurationSelector
这个类实现了DeferredImportSelector
并通过selectImports
方法返回了一个配置类名称的数组。这个方法指定了当Spring处理到这个选择器时,它应该导入KafkaBootstrapConfiguration
类。
KafkaBootstrapConfiguration
内容如下:
public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
if (!registry.containsBeanDefinition(
KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));
}
if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
new RootBeanDefinition(KafkaListenerEndpointRegistry.class));
}
}
}
KafkaBootstrapConfiguration
是一个实现了ImportBeanDefinitionRegistrar
接口的类,主要用于程序化地注册Bean定义到Spring的ApplicationContext
中。通过实现ImportBeanDefinitionRegistrar
接口,这个类可以在Spring的配置阶段动态地添加Bean定义。
在这个特定的实现中,KafkaBootstrapConfiguration
检查特定的Kafka相关Bean(如KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME和KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)是否已经注册。如果这些Bean尚未注册,它会使用RootBeanDefinition
手动注册这些Bean到Spring容器中。
RootBeanDefinition 的功能
RootBeanDefinition
是Spring框架中用于定义Bean的一个核心类。它是BeanDefinition
接口的一个直接实现,提供了一种配置Spring管理的Bean的方式,包括Bean的类类型、生命周期回调、依赖信息等。
- Bean配置的详细定义:
RootBeanDefinition
允许开发者详细定义Bean的创建细节,如构造函数参数、属性值、初始化方法、销毁方法等。 - 高级功能:它还支持更复杂的配置,如懒加载、自动装配模式、作用域和其他高级特性。
- 程序化Bean注册:通过使用RootBeanDefinition,开发者可以在运行时动态地注册Bean,这对于条件配置或需要响应不同配置环境的高级用途尤为重要。
在KafkaBootstrapConfiguration
类中,使用RootBeanDefinition
来创建和注册KafkaListenerAnnotationBeanPostProcessor
和KafkaListenerEndpointRegistry
类的实例,这些是设置和管理Kafka消息监听器所必需的。
之后在AbstractBeanFactory
会根据 beanName 获取到了 RootBeanDefinition
如下图所示:
然后在如下所示的位置:
程序创建了 beanName 为 org.springframework.kafka.config.internalKafkaListenerEndpointRegistry
的实例,具体创建实例的位置如下:
从调试中可以看到此处实例化了 KafkaListenerEndpointRegistry
。
所以当我们 springboot 项目引入了
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
依赖后,即使我们不显示的声明 @EnableKafka
程序也会进行初始化相应的配置。
总结
当Spring Boot项目中引入Spring Kafka依赖后,即使我们没有显式声明@EnableKafka
,系统仍会自动进行相应的配置。因此,在项目中尝试注入KafkaListenerEndpointRegistry
时,尽管IDE可能会提示“Could not autowire. No beans of ‘KafkaListenerEndpointRegistry’ type found.”,项目依然能够正常启动。这是因为KafkaListenerEndpointRegistry
在Spring Kafka的自动配置过程中已被隐式注册。