文章目录
- 支持异步线程自动传递上下文(例如当前请求)的工具类(支持自定义上下文传递逻辑,支持拦截所有异步操作)
- 使用示范
- ContextSupportedAsyncUtil .java
- 自动拦截所有异步线程池操作
- ContextSupportedExecutorAspect.java
- 自定义上下文注入逻辑示范
支持异步线程自动传递上下文(例如当前请求)的工具类(支持自定义上下文传递逻辑,支持拦截所有异步操作)
当我们使用异步线程去执行一些耗时操作的时候,这些异步操作中可能需要获取当前请求等上下文信息
若未做传递逻辑默认是获取不到的,因此写了一个自动传递的工具和切面,可使用工具手动调用时会自动拷贝上下文信息,加载切面后会拦截所有异步操作自动拷贝上下文信息。
同时,上下文信息的加载拷贝移除逻辑也可实现接口自定义,自行扩展。
和阿里巴巴TransmittableThreadLocal(TTL)类似,多支持了可以自定义上下文传递逻辑,你可以认为是阿里TTL手写版本知乎介绍阿里TTL原理
使用示范
异步执行lambda表达式中的代码,代码中获取当前请求的URL并打印
若未使用该工具,是无法获取到当前请求的。
ContextSupportedAsyncUtil.execute(()->{System.out.println("我在异步线程中获取到的当前请求URL是"+((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest().getRequestURI());});
ContextSupportedAsyncUtil .java
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
/**
* @author humorchen
* date: 2024/7/30
* description: 支持上下文自动传递的异步工具
* 默认支持自动传递 HttpServletRequest 到异步线程中
* 其他上下文可自行创建类实现 AsyncContextInjector 接口,并调用ContextSupportedAsyncUtil.registerContextInjector 将其注册上去。
* 可参考 SpringWebRequestContextInjector。class
**/
@Slf4j
public class ContextSupportedAsyncUtil {
private static final int CORE_SIZE = 8;
private static final int MAX_SIZE = 32;
private static final int QUEUE_SIZE = 1024;
private static final int KEEP_ALIVE = 5;
private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.MINUTES;
private static final ArrayBlockingQueue<Runnable> QUEUE = new ArrayBlockingQueue<>(QUEUE_SIZE);
private static final AtomicInteger THREAD_NUM = new AtomicInteger(0);
private static final RejectedExecutionHandler REJECT_POLICY = new ThreadPoolExecutor.CallerRunsPolicy();
private static final List<Class<? extends AsyncContextInjector>> ASYNC_CONTEXT_INJECTOR_CLS_LIST = new ArrayList<>();
private static ContextSupportedThreadPoolExecutor EXECUTOR = new ContextSupportedThreadPoolExecutor(CORE_SIZE, MAX_SIZE, KEEP_ALIVE, KEEP_ALIVE_UNIT, QUEUE, (r) -> new Thread(r, "AsyncUtil-thread-" + THREAD_NUM.incrementAndGet()), REJECT_POLICY);
static {
// 默认支持spring mvc 的RequestHolder自动传递到异步线程中
ASYNC_CONTEXT_INJECTOR_CLS_LIST.add(SpringWebRequestContextInjector.class);
}
/**
* 支持自定义上下文传递的executor
*/
public static class ContextSupportedThreadPoolExecutor extends ThreadPoolExecutor {
public ContextSupportedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public ContextSupportedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public ContextSupportedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public ContextSupportedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
/**
* @param command the task to execute
*/
@Override
public void execute(@NonNull Runnable command) {
super.execute(command instanceof AsyncRunnable ? command : getAsyncRunnable(command));
}
}
/**
* 异步上下文注入器接口
* 将A线程的上下文注入到执行Runnable的B线程,并在执行后清除
*/
public interface AsyncContextInjector {
/**
* 初始化
* 读取A线程的上下文并保存到当前对象
*/
void init();
/**
* 注入上下文信息
* 注入init阶段存储的上下文到B线程的上下文中
*/
void inject();
/**
* 移除上下文信息
* 清理B线程刚注入的上下文
*/
void remove();
}
/**
* 支持传递ServletRequestAttributes对象用于获取当前请求HttpServletRequest
*/
public static class SpringWebRequestContextInjector implements AsyncContextInjector {
private ServletRequestAttributes requestAttributes;
/**
* 初始化
* 读取A线程的上下文并保存到当前对象
*/
@Override
public void init() {
requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
}
/**
* 注入上下文信息
* 注入init阶段存储的上下文到B线程的上下文中
*/
@Override
public void inject() {
RequestContextHolder.setRequestAttributes(requestAttributes);
}
/**
* 移除上下文信息
* 清理B线程刚注入的上下文
*/
@Override
public void remove() {
RequestContextHolder.resetRequestAttributes();
}
}
/**
* 抽象的注入器,将自定义的每个注入器执行
*/
public static class AbstractAsyncContextInjector {
private final List<AsyncContextInjector> list = new ArrayList<>(ASYNC_CONTEXT_INJECTOR_CLS_LIST.size());
public void init() {
// 初始化每个异步上下文注入器
for (Class<? extends AsyncContextInjector> aClass : ASYNC_CONTEXT_INJECTOR_CLS_LIST) {
try {
AsyncContextInjector asyncContextInjector = aClass.newInstance();
asyncContextInjector.init();
list.add(asyncContextInjector);
} catch (Exception e) {
log.error("AbstractAsyncContextInjector init error", e);
}
}
}
public void inject() {
for (AsyncContextInjector asyncContextInjector : list) {
try {
asyncContextInjector.inject();
} catch (Exception e) {
log.error("AbstractAsyncContextInjector inject error", e);
}
}
}
public void remove() {
for (AsyncContextInjector asyncContextInjector : list) {
try {
asyncContextInjector.remove();
} catch (Exception e) {
log.error("AbstractAsyncContextInjector remove error", e);
}
}
}
}
/**
* 支持异步线程传递自定义上下文时使用的Runnable包装类
*/
public static class AsyncRunnable extends AbstractAsyncContextInjector implements Runnable {
private final Runnable runnable;
public AsyncRunnable(Runnable runnable) {
// 初始化保存A线程上下文信息
init();
this.runnable = runnable;
}
@Override
public void run() {
if (runnable == null) {
return;
}
try {
// 将保存的A线程的上下文信息恢复到B线程
inject();
runnable.run();
} catch (Exception e) {
log.error("【ContextSupportedAsyncUtil】 run error {}", e.getMessage());
throw e;
} finally {
// 清理B线程的上下文
remove();
}
}
}
/**
* 异步执行任务
*
* @param runnable
*/
public static void execute(Runnable runnable) {
EXECUTOR.execute(runnable);
}
/**
* 异步执行任务
*
* @param runnable
*/
public static void submit(Runnable runnable) {
execute(runnable);
}
/**
* 执行异步任务并获取返回值
*
* @param supplier
* @param <T>
* @return
*/
public static <T> CompletableFuture<T> execute(Supplier<T> supplier) {
return CompletableFuture.supplyAsync(supplier, EXECUTOR);
}
/**
* 获取线程池
*
* @return
*/
public static ThreadPoolExecutor getExecutor() {
return EXECUTOR;
}
/**
* 设置本工具使用的线程池
*
* @param executor
*/
public static void setExecutor(ContextSupportedThreadPoolExecutor executor) {
if (executor == null) {
throw new NullPointerException("executor 不得为空");
}
if (executor.isShutdown() || executor.isTerminated() || executor.isTerminating()) {
throw new IllegalStateException("executor 状态不得为shutdown");
}
ContextSupportedThreadPoolExecutor old = EXECUTOR;
EXECUTOR = executor;
if (old != null) {
old.shutdown();
}
}
/**
* 获取支持上下文注入的Runnable
*
* @param runnable
* @return 包装后的runnable
*/
public static AsyncRunnable getAsyncRunnable(Runnable runnable) {
return new AsyncRunnable(runnable);
}
/**
* 注册注入器
*
* @param cls
*/
public static void registerContextInjector(Class<? extends AsyncContextInjector> cls) {
if (cls != null && !ASYNC_CONTEXT_INJECTOR_CLS_LIST.contains(cls)) {
ASYNC_CONTEXT_INJECTOR_CLS_LIST.add(cls);
}
}
}
自动拦截所有异步线程池操作
ContextSupportedExecutorAspect.java
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
/**
* @author humorchen
* date: 2024/8/7
* description: 支持上下文的executor
**/
@Component
@Aspect
@Slf4j
public class ContextSupportedExecutorAspect {
@Around("execution(* java.util.concurrent.Executor.execute(java.lang.Runnable))")
public Object contextSupportedExecutor(ProceedingJoinPoint joinPoint) throws Throwable {
Object[] args = joinPoint.getArgs();
if (args != null && args.length > 0) {
Object arg = args[0];
if (arg instanceof Runnable && !(arg instanceof ContextSupportedAsyncUtil.AsyncRunnable)) {
args[0] = ContextSupportedAsyncUtil.getAsyncRunnable((Runnable) arg);
}
}
return joinPoint.proceed(args);
}
}
自定义上下文注入逻辑示范
SpringWebRequestContextInjector.java
异步线程上下文自动注入当前请求(默认提供)
/**
* 支持传递ServletRequestAttributes对象用于获取当前请求HttpServletRequest
*/
public static class SpringWebRequestContextInjector implements AsyncContextInjector {
private ServletRequestAttributes requestAttributes;
/**
* 初始化
* 读取A线程的上下文并保存到当前对象
*/
@Override
public void init() {
requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
}
/**
* 注入上下文信息
* 注入init阶段存储的上下文到B线程的上下文中
*/
@Override
public void inject() {
RequestContextHolder.setRequestAttributes(requestAttributes);
}
/**
* 移除上下文信息
* 清理B线程刚注入的上下文
*/
@Override
public void remove() {
RequestContextHolder.resetRequestAttributes();
}
}