文章目录
- 一、遇到问题
- 二、达成目的
- 三、开始调研
- 1、使用@Async获取线程池流程
- 2、查看中间件线程池工具类
- 3、观察AsyncConfigurer接口
- 4、查看TaskExecutorAdapter源码
- 四、复盘总结
一、遇到问题
自己负责的项目想通过引入一个中间件,达到在业务场景维度的全链路日志监控。当接入相关中间件后,发现如果在业务中开启了多线程,链路日志就会缺失。当然,该中间件官方给出了解决办法,那就是使用中间件提供的工具类对线程池包装一下(TrackTraceHelper.wrappedExecutorService(executorService))就可以解决。当然这个问题确实解决了,但我们还存在以下几个问题:
问题一:项目原本使用的线程池是Spring的ThreadPoolTaskExecutor,在其setTaskDecorator方法中有进行corp租户信息的传递。功能是否还生效?
问题二:项目有使用@Aync注解开启异步任务。异步注解能否获取到包装后的线程池?
二、达成目的
查看该中间件的源码,发现corp租户信息上下的传递在其底层已经实现,所以这算不是一个问题。在使用@Aync开启异步任务的时候,我们是可以指定线程池的名称的,所以问题二也不是问题。
虽然问题解决了,但是每次使用异步注解都需要去显式的指定名称,感觉这种做法并不友好。有没有一种办法能够在开发者无感知的情况下,实现@Async开启异步线程是、ThreadPoolTaskExecutor线程池的统一,并且这个线程池还是被中间件工具类包装后的,达到既要又要。
三、开始调研
查看Spring源码中异步任务相关的源码,从@EnableAsync注解往下点就完事
1、使用@Async获取线程池流程
根据源码可得出,使用@Async后,异步任务使用线程池流程为:
- 我们给显式给@Async指定了线程池
- 首先判定有没有实现AsyncConfigurer接口的线程池;
- 其次判定有没有TaskExecutor(比如Spring中的ThreadPoolTaskExecutor)类型的线程池Bean;
- 再然后就是判定有没有Executor类型的线程池Bean;
- 最后,如果都没有,那就使用兜底逻辑,Spring自己创建一个SimpleAsyncTaskExecutor线程池。
在我们已知的情况下,方案1(不友好)和5(无法被中间件工具类包装)肯定也不行,所以只剩下方案2、3、4
2、查看中间件线程池工具类
进入中间件线程池工具类源码,我们不难发现中间件包装的线程池类型,是一个ExecutorService,并且内部有直接调用submit方法。所以方案4,使用Executor类型的线程池肯定会报错。方案3中的TaskExecutor是隶属于Spring的线程池体系,它和Java中的线程池相交的点,只有顶层接口Executor,所以方案3也不行。只剩方案2了。
3、观察AsyncConfigurer接口
我发现getAsyncExecutor方法返回的线程池是一个Executor类型的,这必然也不满足我们的要求,因为我们想要的类型是ExecutorService,所以方案2也不行。
可我还想试试看,万一还有希望呢?既然这里需要返回的类型是Executor,我要是返回一个它的子类ExecutorService,编译肯定是没问题的,并且理论上子类比父类更强大,用起来也不会有问题。但是为了保险,还是继续跟下源码。
4、查看TaskExecutorAdapter源码
继续向下跟Spring异步任务线程池调用的源码,这里不得不称赞Spring源码的兼容性
最终我们会进入到TaskExecutorAdapter内部,这里以submit方法为例:
使用线程池调用submit方法时:
- 判断当前线程池是否为ExecutorService类型,如果是则直接强转后调用submit方法
- 如果是其它类型,则将任务封装为一个FutureTask,然后调用Executor的execute方法,最后将返回值get出来返回
即,方案2的做法是可行的。最终线程池的配置代码如下所示:
@Configuration
@ConfigurationProperties(prefix = "spring.task.execution")
public class SpringTaskConfig implements AsyncConfigurer {
private final TaskExecutionProperties.Pool pool = new TaskExecutionProperties.Pool();
/**
* 使用中间件包装后的线程池
*/
@Bean
@Override
public ExecutorService getAsyncExecutor() {
ExecutorService executorService = new ThreadPoolExecutor(
pool.getCoreSize(),
pool.getMaxSize(),
pool.getKeepAlive().getSeconds(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(pool.getQueueCapacity()),
new ThreadPoolExecutor.CallerRunsPolicy());
return TrackTraceHelper.wrappedExecutorService(executorService);
}
}
四、复盘总结
回头再理一下方案2为什么能成功。
其实Spring自始自终都没有使用Java中ExecutorService这个线程池,而是使用Executor线程池,并且自定义了一个TaskExecutor线程池来替代ExecutorService。AbstractExecutorService中相关的功能也都分别转交给了TaskExecutor的各个子类。
而今天我们的做法,相当于将ExecutorService以Executor子类,但是非TaskExecutor子类的身份进行传递,原本这种做法是有问题,巧在Spring在真正执行的时候,有一个兼容的逻辑判定,而正是这个兼容逻辑的存在完成了我们的目的。
至此,我们就可以在开发成员无感知的情况下,实现线程池的统一了。