SpringBoot的任务执行器
Spring Boot通过auto-configuration机制自动创建了任务执行器Task Execution,因此在SpringBoot项目中,你不需要任何配置、也不需要自己创建Task Execution就可以直接使用它。
Spring Boot通过auto-configuration机制创建的任务执行器有以下作用:
- asynchronous task execution (@EnableAsync):通过@EnableAsync以及@Async注解使用任务执行器。
- Spring for GraphQL’s asynchronous handling of Callable return values from controller methods(这个没用过)。
- Spring MVC’s asynchronous request processing:Spring MVC的异步请求处理。
- Spring WebFlux’s blocking execution support
除以上官网提到的,你还可以:手动使用任务执行器执行异步任务。
SpringBoot通过auto-configuration机制帮你创建了任务执行器TaskExecution,至于怎么通过TaskExecution、执行什么异步任务当然是你自己的事情了。
今天研究两部分内容:
- SpringBoot的auto-configuration机制创建任务执行器的过程。
- 通过任务执行器执行任务,主要是上述提到的官网内容第1项@EnableAsync以及@Async注解的原理及使用。
Spring Boot创建任务执行器
SpringBoot官网说的很明确,TaskExecutor通过SpringBoot的auto-configuration技术创建。我们知道SpringBoot的auto-configuration技术(详情请参考:SpringBoot 自动配置@EnableAutoConfiguration)是通过META-INF/spring.factories文件指定自动配置内容的,我们打开spring.factories文件找一下taskExecutor的相关内容,在EnableAutoConfiguration项下果然发现了TaskExecutionAutoConfiguration:
TaskExecutionAutoConfiguration在org.springframework.boot.autoconfigure.task包下,代码不算长,比较简单:
@ConditionalOnClass(ThreadPoolTaskExecutor.class)
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(TaskExecutionProperties.class)
public class TaskExecutionAutoConfiguration {
/**
* Bean name of the application {@link TaskExecutor}.
*/
public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";
@Bean
@ConditionalOnMissingBean
public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties,
ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
ObjectProvider<TaskDecorator> taskDecorator) {
TaskExecutionProperties.Pool pool = properties.getPool();
TaskExecutorBuilder builder = new TaskExecutorBuilder();
builder = builder.queueCapacity(pool.getQueueCapacity());
builder = builder.corePoolSize(pool.getCoreSize());
builder = builder.maxPoolSize(pool.getMaxSize());
builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
builder = builder.keepAlive(pool.getKeepAlive());
Shutdown shutdown = properties.getShutdown();
builder = builder.awaitTermination(shutdown.isAwaitTermination());
builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
builder = builder.taskDecorator(taskDecorator.getIfUnique());
return builder;
}
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
return builder.build();
}
}
它当然是一个Configuration类,在SpringBoot启动的过程@Bean注解指定的方法会被加载到Spring IoC容器中。其次,通过@EnableConfigurationProperties指定了配置类TaskExecutionProperties。
被加载到Ioc容器中的有两个对象:一个是通过taskExecutorBuilder方法加载的TaskExecutorBuilder,另外一个是通过applicationTaskExecutor方法加载的ThreadPoolTaskExecutor。
分别看一下。
TaskExecutorBuilder的创建
TaskExecutor构建器,在applicationTaskExecutor方法中负责构建TaskExecutor。
taskExecutorBuilder方法会接收一个参数TaskExecutionProperties ,用来指定TaskExecutor的各属性,比如queueCapacity、coreSize、keepAlive等等线程池相关参数,线程池相关内容请参考:线程池 - ThreadPoolExecutor源码分析。
简单看一眼TaskExecutionProperties类:
@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {
private final Pool pool = new Pool();
private final Shutdown shutdown = new Shutdown();
/**
* Prefix to use for the names of newly created threads.
*/
private String threadNamePrefix = "task-";
public Pool getPool() {
return this.pool;
}
public Shutdown getShutdown() {
return this.shutdown;
}
public String getThreadNamePrefix() {
return this.threadNamePrefix;
}
public void setThreadNamePrefix(String threadNamePrefix) {
this.threadNamePrefix = threadNamePrefix;
}
public static class Pool {
/**
* Queue capacity. An unbounded capacity does not increase the pool and therefore
* ignores the "max-size" property.
*/
private int queueCapacity = Integer.MAX_VALUE;
/**
* Core number of threads.
*/
private int coreSize = 8;
/**
* Maximum allowed number of threads. If tasks are filling up the queue, the pool
* can expand up to that size to accommodate the load. Ignored if the queue is
* unbounded.
*/
private int maxSize = Integer.MAX_VALUE;
/**
* Whether core threads are allowed to time out. This enables dynamic growing and
* shrinking of the pool.
*/
private boolean allowCoreThreadTimeout = true;
/**
* Time limit for which threads may remain idle before being terminated.
*/
private Duration keepAlive = Duration.ofSeconds(60);
public int getQueueCapacity() {
return this.queueCapacity;
}
public void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}
public int getCoreSize() {
return this.coreSize;
}
public void setCoreSize(int coreSize) {
this.coreSize = coreSize;
}
public int getMaxSize() {
return this.maxSize;
}
public void setMaxSize(int maxSize) {
this.maxSize = maxSize;
}
public boolean isAllowCoreThreadTimeout() {
return this.allowCoreThreadTimeout;
}
public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
this.allowCoreThreadTimeout = allowCoreThreadTimeout;
}
public Duration getKeepAlive() {
return this.keepAlive;
}
public void setKeepAlive(Duration keepAlive) {
this.keepAlive = keepAlive;
}
}
正是@EnableConfigurationProperties以及@ConfigurationProperties注解决定了我们可以在配置文件(比如application.yml)中指定TaskExecutionProperties中的这些有关线程池的参数。
接收到这些配置参数之后,使用配置参数创建TaskExecutorBuilder,交给Spring Ioc容器。
ThreadPoolTaskExecutor 的创建
applicationTaskExecutor方法通过上面创建出来的TaskExecutorBuilder的build方法创建。
TaskExecutorBuilderd的build方法:
public ThreadPoolTaskExecutor build() {
return configure(new ThreadPoolTaskExecutor());
}
new了一个ThreadPoolTaskExecutor对象,调用configure方法:
public <T extends ThreadPoolTaskExecutor> T configure(T taskExecutor) {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this.queueCapacity).to(taskExecutor::setQueueCapacity);
map.from(this.corePoolSize).to(taskExecutor::setCorePoolSize);
map.from(this.maxPoolSize).to(taskExecutor::setMaxPoolSize);
map.from(this.keepAlive).asInt(Duration::getSeconds).to(taskExecutor::setKeepAliveSeconds);
map.from(this.allowCoreThreadTimeOut).to(taskExecutor::setAllowCoreThreadTimeOut);
map.from(this.awaitTermination).to(taskExecutor::setWaitForTasksToCompleteOnShutdown);
map.from(this.awaitTerminationPeriod).as(Duration::toMillis).to(taskExecutor::setAwaitTerminationMillis);
map.from(this.threadNamePrefix).whenHasText().to(taskExecutor::setThreadNamePrefix);
map.from(this.taskDecorator).to(taskExecutor::setTaskDecorator);
if (!CollectionUtils.isEmpty(this.customizers)) {
this.customizers.forEach((customizer) -> customizer.customize(taskExecutor));
}
return taskExecutor;
}
将配置文件传递过来的参数传递给创建出来的ThreadPoolTaskExecutor对象并返回。
不配置的情况下,线程池默认参数在TaskExecutionProperties中指定:
ThreadPoolTaskExecutor创建完成!
TaskExecutor的使用
既然Spring Boot已经帮助我们完成了TaskExecutor的创建并注入了Spring Ioc容器中,接下来我们就看一下该怎么使用它。
首先要尝试的是“手动使用”,不使用Spring的注解、而是想办法在代码中直接从Spring容器中获取到TaskExecutor之后调用他的execute方法。
首先创建一个Spring Boot项目,不需要什么特殊功能,pom文件也很简单,引入spring-web即可:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.5</version>
<!-- <version>3.1.4</version>-->
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>springbootstart</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springbootstart</name>
<description>springbootstart</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
然后,创建一个userService:
@Service
@Slf4j
public class UserService {
@Autowired
private TaskExecutor taskExecutor;
public void test2(){
taskExecutor.execute(()->{
log.info("this is userservice test2 start...");
try{
Thread.sleep(10000);
}catch (Exception e){
}
log.info("This is userService' test2 end...");
});
}
}
userService非常简单,比较重要的是:
@Autowired
private TaskExecutor taskExecutor;
这行代码通过@Autowired自动装配一个TaskExecutor 对象,因为我们从前面对Spring Boot代码的分析,Spring Boot应该是在启动的过程中已经通过auto-configuration机制自动创建并注入了TaskExecutor,所以按道理我们是可以通过自动装配的方式在userService中应用它的。
然后写一个test2方法,log看一下装配进来的taskExecutor到底是个啥对象,再调用taskExecutor的execute的方法模拟异步执行任务,执行前后打印log。
然后,写controller:
@RestController
@RequestMapping("/hello")
@Slf4j
public class HelloWorldController {
public HelloWorldController(){
}
@Autowired
private UserService userService;
@GetMapping ("/test2")
public String test2(){
userService.test2();
log.info("after userservice test2 ...");
return "hello";
}
}
OK,代码准备好了,启动应用,测试。通过应用的端口号8002可以正常访问:
而且,结果可以立即返回,前台并不需要等待userService的test2方法中睡眠的10秒钟,说明睡眠的线程一定是通过taskExecutor调用起来的异步线程,taskExecutor一定是生效了。
后台log也说明确实如此:
前面的log是前台调用接口、tomcat的线程nio-8002-exec-1打印的,之后taskExecutor启动了新线程task-1,后面的两行日志是线程task-1打印的。
@Async注解
自己写代码使用taskExecutor线程池启动新线程执行任务这种方式虽然行得通,但是太low太繁琐了,既然使用了Spring框架,我们当然不需要这么麻烦。Spring给我们提供了@Async注解。
@Async注解可以用在方法上,也可以用在类上,不管用在方法上、还是用在类上,都要求当前类必须是受Spring管理的bean,因为@Async注解是通过Spring的BeanPostProcessor机制生效的。
我们改造UserService类,再编写一个test方法:
@Async
public void test(){
log.info("This is userService' test start...");
try {
Thread.sleep(10000);
}catch (Exception e){
}
log.info("This is userService' test end...");
return;
}
代码逻辑也非常简单,和test2方法一样,睡眠10秒后才返回结果。
重新启动应用后测试,发现@Async没有生效!
不生效的原因是缺少@EnableAsync注解,在启动类增加@EnableAsync注解后重新测试,发现@Async生效了,测试结果和test2的一样,所以也就不贴图了。
接下来的任务是,研究@EnableAsync注解的作用,为什么没有@EnableAsync注解的情况下,@Async注解不能生效。
@EnableAsync注解的底层原理
关于Spring的@Enablexxx注解,我们前面的文章分析过,基本就是通过@Configuration+@Import注解的联合使用达到注入指定对象到Spring IoC容器中。
先看@EnableAsync源码:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
通过@Import注解引入AsyncConfigurationSelector类,继续跟踪AsyncConfigurationSelector代码:
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
/**
* Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration}
* for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()},
* respectively.
*/
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
扩展了AdviceModeImportSelector类,而AdviceModeImportSelector类实现了ImportSelector接口,而ImportSelector接口这种方式最终是通过他的方法selectImports来实现注入的(这部分可以参考 SpringBoot 自动配置@EnableAutoConfiguration)。
selectImports方法根据adviceMode(默认是PROXY)会引入ProxyAsyncConfiguration类:
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
ProxyAsyncConfiguration 是一个配置类,会通过@Bean注解注入一个叫AsyncAnnotationBeanPostProcessor 的BeanPostProcessor。从类名称我们就可以猜测到@Async注解就是通过这个后置处理器进行处理的。
接下来的代码跟踪还是稍稍有点复杂的。
首先,AsyncAnnotationBeanPostProcessor 通过父类AbstractBeanFactoryAwareAdvisingPostProcessor实现了BeanFactoryAware接口,所以我们知道他的setBeanFactory方法在Spring的Bean创建过程中会被回调:
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
setBeanFactory方法中会创建一个advisor类AsyncAnnotationAdvisor,从名字中我们又可以猜测到,@Async注解最终应该会通过AOP技术实现。
继续跟踪AsyncAnnotationAdvisor源码,构造器:
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
调用buildAdvice和buildPointcut,创建切面和切点:
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
构造切面的方法会创建一个AnnotationAsyncExecutionInterceptor 类,回忆一下AOP相关知识,我们知道Pointcut满足的情况下会调用切面类的invoke方法。
构造Pointcut的方法源码我们就不再跟踪了,可以猜测到他的匹配逻辑应该是检查当前方法(或者当前类)是否有@Async注解。
接下来我们就继续跟踪AnnotationAsyncExecutionInterceptor 类。
AnnotationAsyncExecutionInterceptor继承自父类AsyncExecutionInterceptor,invoke方法在他父类AsyncExecutionInterceptor中。
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
invoke方法就是实现异步调用的地方!
首先会通过determineAsyncExecutor方法获取taskExecutor,这也是我们关心的地方,不过我们先放放,先看一下拿到TaskExecutor之后的处理逻辑。
代码并不复杂,lamda方式创建一个callable任务,通过invocation.proceed()执行原方法。
通过doSubmit方法、使用TaskExecutor启动新的线程调用task任务、完成对原方法的执行!
主要代码跟踪完毕。
最后,再来看一下determineAsyncExecutor方法:
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}
总体的逻辑就是,通过beanFactory从Spring Ioc容器中获取TaskExecutor,首先判断是否有QualifiedExecutor,有的话通过findQualifiedExecutor方法从容器中获取QualifiedExecutor,没有的话通过this.defaultExecutor.get()获取。
this.defaultExecutor.get()的业务逻辑需要基于接口AsyncConfigurer来解释:
public interface AsyncConfigurer {
/**
* The {@link Executor} instance to be used when processing async
* method invocations.
*/
@Nullable
default Executor getAsyncExecutor() {
return null;
}
/**
* The {@link AsyncUncaughtExceptionHandler} instance to be used
* when an exception is thrown during an asynchronous method execution
* with {@code void} return type.
*/
@Nullable
default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
AsyncConfigurer 接口有两个方法,一个用来获取Executor,一个用来获取AsyncUncaughtExceptionHandler。
this.defaultExecutor.get()的业务逻辑大概可以概括为:如果应用实现了AsyncConfigurer接口,则通过该接口获取Executor,否则,如果没有提供AsyncConfigurer的实现类,则向Spring Ioc容器获取默认的TaskExecutor:
OK,Thanks a lot!