Springboot RabbitMq源码解析之RabbitListener注解 (四)

news2024/11/16 9:53:53

文章目录

      • 1.RabbitListener注解介绍
      • 2.EnableRabbit和RabbitBootstrapConfiguration
      • 3.RabbitListenerAnnotationBeanPostProcessor
      • 4.对RabbitListener注解的解析
      • 5.RabbitListenerEndpointRegistrar

1.RabbitListener注解介绍

RabbitListenerSpringboot RabbitMq中经常用到的一个注解,将被RabbitListener注解的类和方法封装成MessageListener注入MessageListenerContainer

  • 当RabbitListener注解在方法上时,对应的方式就是Rabbit消息的监听器
  • 当RabbitListener注解在类上时,和RabbitHandle注解配合使用,可以实现不同类型的消息的分发,类中被RabbitHandle注解的方法就是Rabbit消息的监听器

2.EnableRabbit和RabbitBootstrapConfiguration

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

通过自动配置类RabbitAutoConfiguration将EnableRabbit引入,而EnableRabbit又通过import注解引入了配置类RabbitBootstrapConfiguration

public class RabbitBootstrapConfiguration implements ImportBeanDefinitionRegistrar {

	@Override
	public void registerBeanDefinitions(@Nullable AnnotationMetadata importingClassMetadata,
			BeanDefinitionRegistry registry) {

		if (!registry.containsBeanDefinition(
				RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {

			registry.registerBeanDefinition(RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
					new RootBeanDefinition(RabbitListenerAnnotationBeanPostProcessor.class));
		}

		if (!registry.containsBeanDefinition(RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
			registry.registerBeanDefinition(RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
					new RootBeanDefinition(RabbitListenerEndpointRegistry.class));
		}
	}

}

容器Ioc中注入RabbitListenerAnnotationBeanPostProcessor和RabbitListenerEndpointRegistry

3.RabbitListenerAnnotationBeanPostProcessor

在这里插入图片描述

RabbitListenerAnnotationBeanPostProcessor类实现了BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware, SmartInitializingSingleton接口,Ordered表示处理顺序,BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware主要用于获取对应的BeanFactory,BeanClassLoader, Environment属性,我们主要关注从SmartInitializingSingletonBeanPostProcessor继承的方法

在这里插入图片描述

public void afterSingletonsInstantiated() {
	this.registrar.setBeanFactory(this.beanFactory);

	if (this.beanFactory instanceof ListableBeanFactory) {
		Map<String, RabbitListenerConfigurer> instances =
				((ListableBeanFactory) this.beanFactory).getBeansOfType(RabbitListenerConfigurer.class);
		for (RabbitListenerConfigurer configurer : instances.values()) {
			configurer.configureRabbitListeners(this.registrar);
		}
	}

	if (this.registrar.getEndpointRegistry() == null) {
		if (this.endpointRegistry == null) {
			Assert.state(this.beanFactory != null,
					"BeanFactory must be set to find endpoint registry by bean name");
			this.endpointRegistry = this.beanFactory.getBean(
					RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
					RabbitListenerEndpointRegistry.class);
		}
		this.registrar.setEndpointRegistry(this.endpointRegistry);
	}

	if (this.containerFactoryBeanName != null) {
		this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
	}

	// Set the custom handler method factory once resolved by the configurer
	MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
	if (handlerMethodFactory != null) {
		this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
	}

	// Actually register all listeners
	this.registrar.afterPropertiesSet();

	// clear the cache - prototype beans will be re-cached.
	this.typeCache.clear();
}

初始化工作,主要是基于自定义配置RabbitListenerConfigurer进行RabbitListenerAnnotationBeanPostProcessor(尤其是registrar元素)的初始化

在这里插入图片描述

  • postProcessBeforeInitialization
  • postProcessAfterInitialization

在这里插入图片描述

在这里插入图片描述

	@Override
	public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
		Class<?> targetClass = AopUtils.getTargetClass(bean);
		final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
		for (ListenerMethod lm : metadata.listenerMethods) {
			for (RabbitListener rabbitListener : lm.annotations) {
				processAmqpListener(rabbitListener, lm.method, bean, beanName);
			}
		}
		if (metadata.handlerMethods.length > 0) {
			processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
		}
		return bean;
	}

对RabbitListener注解查找和解析

  • RabbitListenerAnnotationBeanPostProcessor#buildMetadata
  • RabbitListenerAnnotationBeanPostProcessor#processAmqpListener
  • RabbitListenerAnnotationBeanPostProcessor#processMultiMethodListeners

4.对RabbitListener注解的解析

RabbitListenerAnnotationBeanPostProcessor#buildMetadata

private TypeMetadata buildMetadata(Class<?> targetClass) {
	Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
	final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
	final List<ListenerMethod> methods = new ArrayList<>();
	final List<Method> multiMethods = new ArrayList<>();
	ReflectionUtils.doWithMethods(targetClass, method -> {
		Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
		if (listenerAnnotations.size() > 0) {
			methods.add(new ListenerMethod(method,
					listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
		}
		if (hasClassLevelListeners) {
			RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
			if (rabbitHandler != null) {
				multiMethods.add(method);
			}
		}
	}, ReflectionUtils.USER_DECLARED_METHODS);
	if (methods.isEmpty() && multiMethods.isEmpty()) {
		return TypeMetadata.EMPTY;
	}
	return new TypeMetadata(
			methods.toArray(new ListenerMethod[methods.size()]),
			multiMethods.toArray(new Method[multiMethods.size()]),
			classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
}

RabbitListenerAnnotationBeanPostProcessor就是针对每一个bean类进行解析,针对类上的RabbitListener注解、方法上的RabbitHandle注解和方法上的RabbitListener注解解析后封装到TypeMetadata类中

通过RabbitListenerAnotationBeanPostProcessor#buildMetadata查找并封装成TypeMetadata分别交给processAmqpListenerprocessMultiMethodListeners进行解析

protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
	Method methodToUse = checkProxy(method, bean);
	MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
	endpoint.setMethod(methodToUse);
	endpoint.setBeanFactory(this.beanFactory);
	endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
	String errorHandlerBeanName = resolveExpressionAsString(rabbitListener.errorHandler(), "errorHandler");
	if (StringUtils.hasText(errorHandlerBeanName)) {
		endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));
	}
	processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}

private void processMultiMethodListeners(RabbitListener[] classLevelListeners, Method[] multiMethods,
		Object bean, String beanName) {
	List<Method> checkedMethods = new ArrayList<Method>();
	for (Method method : multiMethods) {
		checkedMethods.add(checkProxy(method, bean));
	}
	for (RabbitListener classLevelListener : classLevelListeners) {
		MultiMethodRabbitListenerEndpoint endpoint = new MultiMethodRabbitListenerEndpoint(checkedMethods, bean);
		endpoint.setBeanFactory(this.beanFactory);
		processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
	}
}

RabbitListenerAnnotationBeanPostProcessor#processAmqpListener针对被RabbitListener注解的方法进行解析,
RabbitListenerAnnotationBeanPostProcessot#processMultiMethodListeners针对RabbitListener注解的类中被RabbitHandle注解的方法进行解析

新建MultiMethodRabbitListenerEndpoint对象,针对两种方式的差异进行部分属性的初始化后交给RabbitListenerAnnotationBeanPostProcessor进行后续处理processListener

protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
		Object adminTarget, String beanName) {
	endpoint.setBean(bean);
	endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
	endpoint.setId(getEndpointId(rabbitListener));
	endpoint.setQueueNames(resolveQueues(rabbitListener));
	endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
	String group = rabbitListener.group();
	if (StringUtils.hasText(group)) {
		Object resolvedGroup = resolveExpression(group);
		if (resolvedGroup instanceof String) {
			endpoint.setGroup((String) resolvedGroup);
		}
	}
	String autoStartup = rabbitListener.autoStartup();
	if (StringUtils.hasText(autoStartup)) {
		endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));
	}

	endpoint.setExclusive(rabbitListener.exclusive());
	String priority = resolve(rabbitListener.priority());
	if (StringUtils.hasText(priority)) {
		try {
			endpoint.setPriority(Integer.valueOf(priority));
		}
		catch (NumberFormatException ex) {
			throw new BeanInitializationException("Invalid priority value for " +
					rabbitListener + " (must be an integer)", ex);
		}
	}

	String rabbitAdmin = resolve(rabbitListener.admin());
	if (StringUtils.hasText(rabbitAdmin)) {
		Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
		try {
			endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
		}
		catch (NoSuchBeanDefinitionException ex) {
			throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
					adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" +
					rabbitAdmin + "' was found in the application context", ex);
		}
	}


	RabbitListenerContainerFactory<?> factory = null;
	String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
	if (StringUtils.hasText(containerFactoryBeanName)) {
		Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
		try {
			factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
		}
		catch (NoSuchBeanDefinitionException ex) {
			throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
					adminTarget + "] for bean " + beanName + ", no " + RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
					containerFactoryBeanName + "' was found in the application context", ex);
		}
	}

	this.registrar.registerEndpoint(endpoint, factory);
}

根据RabbitListener注解的属性进行MethodRabbitListenerEndpoint 的属性设置和校验,最后通过RabbitListenerEndpointRegistrar#registerEndpoint方法将MethodRabbitListenerEndpoint 注入容器RabbitListenerContainerFactory

5.RabbitListenerEndpointRegistrar

在这里插入图片描述

@Override
public void afterPropertiesSet() {
	registerAllEndpoints();
}

protected void registerAllEndpoints() {
	synchronized (this.endpointDescriptors) {
		for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
			this.endpointRegistry.registerListenerContainer(
					descriptor.endpoint, resolveContainerFactory(descriptor));
		}
		this.startImmediately = true;  // trigger immediate startup
	}
}

RabbitListenerEndpointRegistrar#registerEndpoint

public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
	Assert.notNull(endpoint, "Endpoint must be set");
	Assert.hasText(endpoint.getId(), "Endpoint id must be set");
	// Factory may be null, we defer the resolution right before actually creating the container
	AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
	synchronized (this.endpointDescriptors) {
		if (this.startImmediately) { // Register and start immediately
			this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
					resolveContainerFactory(descriptor), true);
		}
		else {
			this.endpointDescriptors.add(descriptor);
		}
	}
}

RabbitListenerEndpointRegistry#registerListenerContainer进行注册监听器的容器

RabbitListenerEndpointRegistry#registerListenerContainer

public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
                                      boolean startImmediately) {
	Assert.notNull(endpoint, "Endpoint must not be null");
	Assert.notNull(factory, "Factory must not be null");

	String id = endpoint.getId();
	Assert.hasText(id, "Endpoint id must not be empty");
	synchronized (this.listenerContainers) {
		Assert.state(!this.listenerContainers.containsKey(id),
				"Another endpoint is already registered with id '" + id + "'");
		MessageListenerContainer container = createListenerContainer(endpoint, factory);
		this.listenerContainers.put(id, container);
		if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
			List<MessageListenerContainer> containerGroup;
			if (this.applicationContext.containsBean(endpoint.getGroup())) {
				containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
			}
			else {
				containerGroup = new ArrayList<MessageListenerContainer>();
				this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
			}
			containerGroup.add(container);
		}
		if (startImmediately) {
			startIfNecessary(container);
		}
	}
}

基于RabbitListenerEndpoint根据监听器的容器工厂类生成一个监听器的容器,并且整个注册过程是同步的,同时最多只能有一个endpoint在注册

RabbitListenerEndpointRegistry#start

@Override
public void start() {
	for (MessageListenerContainer listenerContainer : getListenerContainers()) {
		startIfNecessary(listenerContainer);
	}
}
private void startIfNecessary(MessageListenerContainer listenerContainer) {
	if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
		listenerContainer.start();
	}
}

调用MessageListenerContainer#start方法, 监听器的启动。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/61236.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

D-023 DVI硬件电路设计

DVI硬件电路设计1 简介1.1 连接器1.2 接口信号定义1.3 DVI的分类1.4 DVI规格2 硬件设计实战3 硬件设计要点3.1 注意事项3.2 补充说明3.3 VGA 和 DVI 优缺点1 简介 DVI(Digital Visual Interface)是一种数字视频接口&#xff0c;它是基于TMDS (Transition Minimized Differenti…

MFC列表控件的用法(基于对话框的编程)

目录 一、List Control列表控件属性 1.List Control 2.View属性 二、OnInitDialog初始化列表 1.创建List Control的变量 2.找OnInitDialog ​3. InsertColumn插入表头 4. InsertColumn设置对齐方式和列宽 5. 设置List的正文内容 ​6.循环结构创建列表 7.设置列表整行…

Windows内核--子系统(3.5)

到底什么是子系统? 子系统是用户层概念。在Windows内核之上&#xff0c;如果想要执行类UNIX应用程序&#xff0c;就是POSIX子系统&#xff0c;如果要类似OS/2环境&#xff0c;就是OS/2子系统。 如何能模拟出不同子系统呢? 一般需要子系统用户态应用程序和相关DLL支援。 对于W…

腾讯云服务器mysql安装

1.选择mysql版本 2.安装mysql源 sudo wget https://repo.mysql.com//mysql80-community-release-el7-1.noarch.rpm 3.下载mysql.rpm源 wget https://repo.mysql.com//mysql80-community-release-el7-1.noarch.rpm 4.安装下载好的rpm包 sudo rpm -ivh mysql80-community-rele…

PCB入门介绍与电阻电容电感类元件的创建

摘自凡亿教育 目录 一、PCB入门介绍 二、电阻电容电感类元件的创建 1.绘制电阻的原理图库 2.绘制电容的原理图库 3.绘制电感的原理图 一、PCB入门介绍 1.EDA工具 Cadence Allegro :IC-芯片设计 Mentor PADS:做消费类电子产品、手机、机顶盒、平板电脑 Altium Designer…

多线程初阶(二)

目录 前言&#xff1a; synchronized 解析 可重入和不可重入问题 解析 Java中线程安全类 死锁问题 解析 解决死锁问题 解析 内存可见性 解析 volatile关键字 解析 wait&#xff0c;notify 解析 小结&#xff1a; 前言&#xff1a; 针对上篇文章讲到的线程安全…

VSCode\\VS2017下CPP环境的配置

VSCode下C环境配置一些问题VSCode下配置C环境&#xff1a;VSCode与boost总结&#xff1a;坑位待填&#xff1a;VSCode中3个json文件的作用&#xff1a;环境配置出现的问题以及解决VS2017 配置 C环境VS配置boost库包含项目中的自定义的.hpp文件&#xff0c;.h文件VSCode下配置C环…

公众号网课题库接口

公众号网课题库接口 本平台优点&#xff1a; 多题库查题、独立后台、响应速度快、全网平台可查、功能最全&#xff01; 1.想要给自己的公众号获得查题接口&#xff0c;只需要两步&#xff01; 2.题库&#xff1a; 题库&#xff1a;题库后台&#xff08;点击跳转&#xff09;…

4.验证面试高频问题整理(附答案)

目录 Q76.package如何使用 Q77.如何在子类中调用父类中的方法 Q78.bit[7:0]和byte有什么区别 Q79.类中的方法和类外的方法有什么区别 Q80.如何将类中的方法定义在类外 Q81.modport的用途是什么 Q82.struct和union的异同 Q83.$rose和posedge区别 Q84.如何在fork...join结…

[附源码]Python计算机毕业设计Django人事管理系统

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

js对象易混淆知识

js对象易混淆知识 __proto__ vs prototype __proto__和constructor属性是对象所独有的。 __proto__属性的作用就是当访问一个对象的属性时&#xff0c;如果该对象内部不存在这个属性&#xff0c;那么就会去它的__proto__属性所指向的那个对象&#xff08;父对象&#xff09;…

三菱FX5U PLSV指令-可变速度输出

三菱FX5U PLSV指令-可变速度输出,程序如下 该指令用于输出带旋转方向输出的变速脉冲。只支持CPU模块 *1 只能使用Y。 *2 输出模式为CW/CCW时&#xff0c;请指定CCW轴。使用Y时&#xff0c;只能指定本轴的SIGN输出或通用输出。 *3 不能使用T、ST、C 以上是指定FX3操作数得情况…

JVM总结全

虚拟机 HotSpot 默认虚拟机 JRockit HotSpot融合了JRockit jdk8初步融合完成 没有解释器&#xff0c;只有编译器 IBM J9 结构图 类加载子系统Q 1.类加载器 ​ 启动类加载器&#xff08;引导类加载器&#xff09;Bootstrap ClassLoader ​ 加载java 核心类库&#xff0c;…

QT + FFmpeg 5.x + x264 + x265 + SDL2 音视频播放器

QT FFmpeg 5.x x264 x265 SDL2 音视频播放器 使用了QT的QML设计界面,人机交互; 使用了FFmpeg 5.x x264 x265 SDL2 完成了音视频的解析到播放; 阅读了ffplay的源码,用到了ffplay的核心思想. 想熟悉ffmpeg和ffplay的朋友,都可以参考学习. 代码自取: https://github.c…

秒杀实现技巧

需求分析 “秒杀”这个词在电商行业中出现的频率较高&#xff0c;如京东或者淘宝平台的各种“秒杀”活动&#xff0c;最典型的就是“双11抢购”。 “秒杀”是指在有限的时间内对有限的商品数量进行抢购的一种行为&#xff0c;这是商家以“低价量少”的商品来获取用户的一种营…

Golang原理分析:切片(slice)原理及扩容机制

《Go语言精进之路》切片相关章节学习笔记及实验。 1.切片原理 说切片之前&#xff0c;先看看Go语言数组。Go数组是一个固定长度的、容纳同构类型元素的连续序列&#xff0c;因此Go数组类型具有两个属性&#xff1a;长度及类型&#xff1a; var a [1]int var b [2]byte var c …

【Web安全】文件上传漏洞

目录 1. 文件上传漏洞概述 1.1 FCKEditor文件上传漏洞 1.2 绕过文件上传检查功能 2. 功能还是漏洞 2.1 Apache文件解析 2.2 IIS文件解析 2.3 PHP CGI路径解析 2.4 利用上传文件钓鱼 3. 设计安全的文件上传功能 1. 文件上传漏洞概述 文件上传漏洞是指用户上传了一个…

R语言学习笔记——入门篇:第四章-基本数据管理

# R语言R语言学习笔记——入门篇&#xff1a;第四章-基本数据管理 文章目录一、示例二、创建新变量三、变量的重编码四、变量的重命名4.1、交互式编辑器4.2、函数编程五、缺失值5.1、重编码某些值为缺失值5.2、在分析中排除缺失值六、日期值6.1、将日期值转换回字符型变量6.2、…

使用 Learner Lab - 在 Lambda 建立 Pillow 层,进行 S3 的图片镜相操作

使用 Learner Lab - 在 Lambda 建立 Pillow 层&#xff0c;进行 S3 的图片镜相操作 AWS Academy Learner Lab 是提供一个帐号让学生可以自行使用 AWS 的服务&#xff0c;让学生可以在 100 USD的金额下&#xff0c;自行练习所要使用的 AWS 服务&#xff0c;如何进入 Learner La…

[论文阅读] 颜色迁移-Automated Colour Grading

[论文阅读] 颜色迁移-Automated Colour Grading 文章: Automated colour grading using colour distribution transfer, [paper], [matlab代码], [python代码] 1-算法原理 本文算法分为2个大步骤, 首先使用IDT(Iterative Distribution Transfer)方法得到初步的结果, 这个结果…