【源码解析】实现异步功能的注解 @Async 的源码解析

news2024/11/15 12:26:37

使用方式

  1. 启动类上添加注解@EnableAsync()
  2. 在方法或者类上添加@Async

源码解析

初始化配置

@EnableAsync注入了AsyncConfigurationSelector

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

	Class<? extends Annotation> annotation() default Annotation.class;


	boolean proxyTargetClass() default false;

	AdviceMode mode() default AdviceMode.PROXY;

	int order() default Ordered.LOWEST_PRECEDENCE;

}

AsyncConfigurationSelector注入了ProxyAsyncConfiguration

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

	private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
			"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

	@Override
	@Nullable
	public String[] selectImports(AdviceMode adviceMode) {
		switch (adviceMode) {
			case PROXY:
				return new String[] {ProxyAsyncConfiguration.class.getName()};
			case ASPECTJ:
				return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
			default:
				return null;
		}
	}

}

ProxyAsyncConfiguration该类初始化了AsyncAnnotationBeanPostProcessor,并注入到Spring容器中。

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
		bpp.configure(this.executor, this.exceptionHandler);
		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
			bpp.setAsyncAnnotationType(customAsyncAnnotation);
		}
		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
		return bpp;
	}

}

AsyncAnnotationBeanPostProcessor继承了AbstractBeanFactoryAwareAdvisingPostProcessorAbstractBeanFactoryAwareAdvisingPostProcessor实现了BeanFactoryAware,所以系统启动会执行AsyncAnnotationBeanPostProcessor#setBeanFactory

	@Override
	public void setBeanFactory(BeanFactory beanFactory) {
		super.setBeanFactory(beanFactory);

		AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
		if (this.asyncAnnotationType != null) {
			advisor.setAsyncAnnotationType(this.asyncAnnotationType);
		}
		advisor.setBeanFactory(beanFactory);
		this.advisor = advisor;
	}

AsyncAnnotationAdvisor构造方法中,初始化了advicepointcutAsyncAnnotationAdvisor#buildAdvice拦截类指定了AnnotationAsyncExecutionInterceptorAsyncAnnotationAdvisor#buildPointcut指定了可以根据方法和类上的注解进行拦截。

	public AsyncAnnotationAdvisor(
			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

		Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
		asyncAnnotationTypes.add(Async.class);
		try {
			asyncAnnotationTypes.add((Class<? extends Annotation>)
					ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
		}
		catch (ClassNotFoundException ex) {
			// If EJB 3.1 API not present, simply ignore.
		}
		this.advice = buildAdvice(executor, exceptionHandler);
		this.pointcut = buildPointcut(asyncAnnotationTypes);
	}

	protected Advice buildAdvice(
			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

		AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
		interceptor.configure(executor, exceptionHandler);
		return interceptor;
	}

	protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
		ComposablePointcut result = null;
		for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
			Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
			Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
			if (result == null) {
				result = new ComposablePointcut(cpc);
			}
			else {
				result.union(cpc);
			}
			result = result.union(mpc);
		}
		return (result != null ? result : Pointcut.TRUE);
	}

拦截处理

AnnotationAsyncExecutionInterceptor继承了AsyncExecutionInterceptor,真正的拦截处理在AsyncExecutionInterceptor#invoke内执行。核心逻辑就是获取指定的线程池,在线程池中执行任务。

	@Override
	@Nullable
	public Object invoke(final MethodInvocation invocation) throws Throwable {
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
		Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
		final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

		AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
		if (executor == null) {
			throw new IllegalStateException(
					"No executor specified and no default executor set on AsyncExecutionInterceptor either");
		}

		Callable<Object> task = () -> {
			try {
				Object result = invocation.proceed();
				if (result instanceof Future) {
					return ((Future<?>) result).get();
				}
			}
			catch (ExecutionException ex) {
				handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
			}
			catch (Throwable ex) {
				handleError(ex, userDeclaredMethod, invocation.getArguments());
			}
			return null;
		};

		return doSubmit(task, executor, invocation.getMethod().getReturnType());
	}

	@Nullable
	protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
		if (CompletableFuture.class.isAssignableFrom(returnType)) {
			return CompletableFuture.supplyAsync(() -> {
				try {
					return task.call();
				}
				catch (Throwable ex) {
					throw new CompletionException(ex);
				}
			}, executor);
		}
		else if (ListenableFuture.class.isAssignableFrom(returnType)) {
			return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
		}
		else if (Future.class.isAssignableFrom(returnType)) {
			return executor.submit(task);
		}
		else {
			executor.submit(task);
			return null;
		}
	}

获取线程池

AsyncExecutionAspectSupport#determineAsyncExecutor获取线程池,如果指定了线程池,获取指定的。如果没有指定,获取默认的线程池。

	@Nullable
	protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
		AsyncTaskExecutor executor = this.executors.get(method);
		if (executor == null) {
			Executor targetExecutor;
			String qualifier = getExecutorQualifier(method);
			if (StringUtils.hasLength(qualifier)) {
				targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
			}
			else {
				targetExecutor = this.defaultExecutor.get();
			}
			if (targetExecutor == null) {
				return null;
			}
			executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
					(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
			this.executors.put(method, executor);
		}
		return executor;
	}

AsyncExecutionAspectSupport#getDefaultExecutor,从Spring容器中获取。

	public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {
		this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
		this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);
	}

	@Nullable
	protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
		if (beanFactory != null) {
			try {
				// Search for TaskExecutor bean... not plain Executor since that would
				// match with ScheduledExecutorService as well, which is unusable for
				// our purposes here. TaskExecutor is more clearly designed for it.
				return beanFactory.getBean(TaskExecutor.class);
			}
        }
    }

TaskExecutionAutoConfiguration中,通过生成ThreadPoolTaskExecutor的Bean,来提供默认的Executor。

    @Bean
	@ConditionalOnMissingBean
	public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties,
			ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
			ObjectProvider<TaskDecorator> taskDecorator) {
		TaskExecutionProperties.Pool pool = properties.getPool();
		TaskExecutorBuilder builder = new TaskExecutorBuilder();
		builder = builder.queueCapacity(pool.getQueueCapacity());
		builder = builder.corePoolSize(pool.getCoreSize());
		builder = builder.maxPoolSize(pool.getMaxSize());
		builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
		builder = builder.keepAlive(pool.getKeepAlive());
		Shutdown shutdown = properties.getShutdown();
		builder = builder.awaitTermination(shutdown.isAwaitTermination());
		builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
		builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
		builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
		builder = builder.taskDecorator(taskDecorator.getIfUnique());
		return builder;
	}

	@Lazy
    @Bean(
        name = {"applicationTaskExecutor", "taskExecutor"}
    )
    @ConditionalOnMissingBean({Executor.class})
    public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
        return builder.build();
    }

如果没有进行对应的配置,线程池阻塞队列的大小是Integer.MAX_VALUE

@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {

	private final Pool pool = new Pool();

	private final Shutdown shutdown = new Shutdown();

	/**
	 * Prefix to use for the names of newly created threads.
	 */
	private String threadNamePrefix = "task-";

	public static class Pool {

		/**
		 * Queue capacity. An unbounded capacity does not increase the pool and therefore
		 * ignores the "max-size" property.
		 */
		private int queueCapacity = Integer.MAX_VALUE;

		/**
		 * Core number of threads.
		 */
		private int coreSize = 8;

		/**
		 * Maximum allowed number of threads. If tasks are filling up the queue, the pool
		 * can expand up to that size to accommodate the load. Ignored if the queue is
		 * unbounded.
		 */
		private int maxSize = Integer.MAX_VALUE;

		/**
		 * Whether core threads are allowed to time out. This enables dynamic growing and
		 * shrinking of the pool.
		 */
		private boolean allowCoreThreadTimeout = true;

		/**
		 * Time limit for which threads may remain idle before being terminated.
		 */
		private Duration keepAlive = Duration.ofSeconds(60);
    }
}

由于默认的线程池队列大小是Integer.MAX_VALUE,当有很多线程进来,而系统没有及时处理掉,就会将队列塞满,从而系统会出现内存溢出。所以使用@Async需要指定自己设定的线程池。

在这里插入图片描述

标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/498714.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【郭东白架构课 模块二:创造价值】23|节点四:架构规划之统一语义

你好&#xff0c;我是郭东白。从这节课开始&#xff0c;我们就进入到架构活动的第四个环节——架构规划。这个环节比较复杂&#xff0c;可以分为四个部分&#xff1a;统一语义、需求确认、边界划分和规划确认。这节课我们先来讲统一语义。 架构师的工作日常就是跟不同的角色沟…

Mysql索引(4):索引语法

1 创建索引 CREATE [ UNIQUE | FULLTEXT ] INDEX index_name ON table_name (index_col_name,... ) ; 2 查看索引 SHOW INDEX FROM table_name; 3 删除索引 DROP INDEX index_name ON table_name; 4 案例演示 先来创建一张表 tb_user&#xff0c;并且查询测试数据。 creat…

从零开始学习Linux运维,成为IT领域翘楚(九)

文章目录 &#x1f525;Linux系统服务&#x1f525;Linux系统定时任务 &#x1f525;Linux系统服务 Service命令 服务(service) 本质就是进程&#xff0c;但是是运行在后台的&#xff0c;通常都会监听某个端口&#xff0c;等待其它程序的请求&#xff0c;比如(mysql , sshd 防…

Vue加SpringBoot实现项目前后端分离

首先需要搭建一个Vue的脚手架项目&#xff08;已经放在gitee里面了&#xff0c;下面是gitee网址&#xff0c;可以直接拉&#xff09; 那么接下来就是实现前后端分离的步骤 首先我们需要有一个登录页面 登录的点击事件利用axios提交到后台去&#xff0c;代码放在后面&#xff08…

【C++修炼之路:二叉搜索树】

目录&#xff1a; 二叉搜索树的概念构建一颗二叉树二叉树的查找二插树的插入 二叉树的删除删除右子树的最小节点 写一个中序来走这个二叉搜索树递归版删除&#xff08;recursion&#xff09;递归版插入&#xff08;recursion&#xff09;递归版查找&#xff08;recursion&#…

基于AT89C51单片机的电子密码锁设计与仿真

点击链接获取Keil源码与Project Backups仿真图&#xff1a; https://download.csdn.net/download/qq_64505944/87760996?spm1001.2014.3001.5503 源码获取 主要内容&#xff1a; &#xff08;1&#xff09;本设计为了防止密码被窃取要求在输入密码时在LCD屏幕上显示*号。 &a…

类和对象中(1)

文章目录 一、类的6个默认成员函数二、构造函数1、概念2、构造函数只初始化自定义类型3、对于不会初始化内置类型的补丁4、构造函数优点 三、析构函数1、概念2、什么时候需要自己写析构函数 &#xff1f;3、构造和析构顺序差异 四、拷贝构造函数1、概念2、拷贝构造下传值会无限…

MySQL环境搭建——“MySQL数据库”

各位CSDN的uu们你们好呀&#xff0c;小雅兰又来啦&#xff0c;好久没有更文啦&#xff0c;今天继续&#xff01;&#xff01;&#xff01;今天小雅兰的内容是MySQL环境搭建&#xff0c;下面&#xff0c;让我们进入MySQL数据库的世界吧 MySQL的卸载 MySQL的下载、安装、配置 M…

ubuntu18.04下pass-through直通realteck PCI设备到qemu-kvm虚拟机实践

设备直通是一种虚拟化资源分配方式&#xff0c;通过将物理设备直通给虚拟机环境&#xff0c;达到虚拟机可以直接访问物理设备的目的&#xff0c;直通功能对设备的要求不高&#xff0c;不需要设备支持PF/VF&#xff0c;18年后的普通家用PC的PCI设备都支持设备直通模式&#xff0…

【Java】Java对象的比较

Java对象的比较 PriorityQueue中插入对象元素的比较基本数据类型的比较对象的比较重写基类的equals方法基于Comparble接口类的比较基于比较器进行比较 PriorityQueue中插入对象 优先级队列在插入元素时有个要求&#xff1a;插入的元素不能是null或者元素之间必须要能够进行比较…

Redis持久化之AOF日志高频问题

1、如何采用AOF日志避免宕机丢失数据&#xff1f; Redis 的持久化主要有两大机制&#xff0c;即 AOF&#xff08;Append Only File&#xff09;日志和 RDB 快照。 MySQL数据库的写前日志&#xff08;Write Ahead Log, WAL&#xff09;&#xff0c;在实际写数据前&#xff0c;…

PWLCM分段线性混沌映射

混沌映射是生成混沌序列的一种方法,常见的混沌映射方式有 Logistic映射、Tent映射、Lorenz映射,而PWLCM&#xff08;Piecewise Linear Chaotic Map&#xff0c;分段线性混沌映射&#xff09;作为混沌映射的典型代表&#xff0c;数学形式简单&#xff0c;具有遍历性和随机性。其…

智能优化算法:基于减法平均的优化算法-附代码

智能优化算法&#xff1a;基于减法平均的优化算法 文章目录 智能优化算法&#xff1a;基于减法平均的优化算法1.基于减法平均优化算法1.1 初始化1.2 SABO的数学建模 2.实验结果3.参考文献4.Matlab 摘要&#xff1a;基于减法平均的优化算法&#xff08;Subtraction-Average-Base…

[数据结构] 二叉搜索树的详解实现

文章目录 概念实现架构BSTreeNodea&#xff08;节点&#xff09;BSTree框架 增删查 -- 循环写法insert&#xff08;尾插&#xff09;inOrder&#xff08;遍历&#xff09;Find&#xff08;查找&#xff09;Erase&#xff08;删除&#xff09;默认成员函数构造拷贝构造析构函数赋…

哈夫曼编码文件压缩和解压

哈夫曼编码&文件压缩和解压 文章目录 哈夫曼编码&文件压缩和解压哈夫曼编码基本介绍原理解析代码实现 文件的压缩文件的解压完整代码 哈夫曼编码 基本介绍 赫夫曼编码也翻译为 哈夫曼编码(Huffman Coding)&#xff0c;又称霍夫曼编码&#xff0c;是一种编码方式, 属于…

实现c++轻量级别websocket协议客户端

1 websocket 轻量客户端 因以前发过这个代码&#xff0c;但是一直没有整理&#xff0c;这次整理了一下&#xff0c;持续修改&#xff0c;主要是要使用在arm的linux上&#xff0c;发送接收的数据压缩成图片发送出去。 要达到轻量websocket 使用&#xff0c;必须要达到几个方面…

MySQL:数学函数和字符串函数

目录 前言&#xff1a; 数学函数&#xff1a; 求绝对值&#xff1a; 求PI&#xff1a; 求平方根&#xff1a; 求余数&#xff1a; 取整&#xff1a; 随机数&#xff1a; 四舍五入&#xff1a; 只舍不入&#xff1a; 返回参数符号&#xff1a; 幂运算&#xff1a; …

Illustrator如何编辑图形对象之实例演示?

文章目录 0.引言1.绘制海浪插画2.绘制时尚波浪发型3.绘制一条鲸鱼 0.引言 因科研等多场景需要进行绘图处理&#xff0c;笔者对Illustrator进行了学习&#xff0c;本文通过《Illustrator CC2018基础与实战》及其配套素材结合网上相关资料进行学习笔记总结&#xff0c;本文对图形…

快速上手Pytorch实现BERT,以及BERT后接CNN/LSTM

快速上手Pytorch实现BERT&#xff0c;以及BERT后接CNN/LSTM 本项目采用HuggingFace提供的工具实现BERT模型案例&#xff0c;并在BERT后接CNN、LSTM等 HuggingFace官网 一、实现BERT&#xff08;后接线性层&#xff09; 1.引用案例源码&#xff1a; from transformers impo…

开关电源基础01:电源变换器基础(2)

说在开头&#xff1a;关于德布罗意的电子波&#xff08;3&#xff09; 1923年&#xff0c;德布罗意在求出他的相波之前&#xff0c;康普顿刚好用光子说解释了康普顿效应&#xff08;记性好的胖友们应该还记得&#xff1a;散射波的波长变长问题&#xff09;&#xff0c;从而带领…