TraceId在线程池及@Async异步线程中如何传递

news2024/11/15 17:57:49

何时使用线程池

提起线程池相信大家都不陌生,什么情况下会考虑使用线程池呢?我总结了一下大概是这么几种情况

第一种情况:程序中有任务需要异步执行。这样不可避免的要分配线程去执行,如果这个异步任务执行的频次很低,那就不用做额外处理,每次新建线程也是可以的;如果这个异步任务执行的很频繁,那么每次都新建线程就会有问题,问题一是线程的频繁创建与销毁会浪费系统资源,问题二是对系统资源的消耗不可控,无限制的创建线程会导致服务崩溃。

第二种情况:异步执行任务可以提高程序运行效率。主逻辑执行到某一阶段后,有几段程序可以并行执行,这几段程序结束后再将结果汇总到一起,继续执行主逻辑,类似于总分总的步骤。那么这几个可以并行执行的逻辑,就可以分几个线程去异步执行,这样就节省了主线程的运行时间。这种情况下,因为都考虑使用异步执行来提升效率了,那这个业务必然也是个比较关键且执行频繁的业务,自然也得使用线程池将线程资源控制在可控范围内。

使用线程池是管理线程的一个很好实践,它可以将线程的数量控制在一个合理的范围内,不至于耗尽系统资源;它还可以复用线程,避免线程的重复创建与销毁;在这个池子满足不了突发流量时也可以设定兜底策略。

池化思想

其实线程池就是池化思想的一个落地场景,其他的像数据库连接池,Redis连接池,对象池等都是其落地场景。

池化思想是一种优化资源管理的技术,它主要用于解决频繁创建和销毁资源带来的性能开销和系统负担问题。池化技术通过重用已有的资源实例,而不是每次都创建新的资源,从而提高系统的效率和稳定性。总结下来就是两个优点

  1. 性能提升。通过复用资源,减少了创建和销毁资源的时间消耗,减少系统资源浪费,提高系统性能。
  2. 资源控制。池化机制能够控制资源的使用上限,防止系统因为资源过度分配而崩溃。

如何跟踪处理线程池执行任务时产生的异常

接下来回到本章的主线上,如果我们在项目中使用了线程池,那么该怎么跟踪和处理线程池执行任务时产生的异常呢?

线程提交任务的两种方式及其抛异常的策略

在提出解决方案前,先来重申下线程提交任务的两种方式,及其抛异常的策略

1. execute() 方式提交任务,这种方式主要用于不需要接收返回结果的情况(Runnable 接口),当某个任务发生异常后,线程池会将这个异常打印出来,日志记录中也能观测到这个异常

举例说明

private void methodExecute() {
    // 创建一个容量为1的线程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
    // 循环提交任务,在i=3时提交的任务会抛出异常
    for (int i = 0; i < 10; i++) {
        int finalI = i;
        threadPoolExecutor.execute(() -> {
            if( finalI == 3) {
                int val = 10/0;
            }
            System.out.println("任务执行完成 编号-" + finalI+", 执行线程:"+Thread.currentThread().getName());
        });
        // 为了更好的观测现象这里将每次提交任务的间隔设为1秒
        ThreadUtil.sleep(1,TimeUnit.SECONDS);
    }

    ThreadUtil.sleep(10,TimeUnit.SECONDS);
    System.out.println("main - finish");
}

执行日志

任务执行完成 编号-0, 执行线程:pool-1-thread-1
任务执行完成 编号-1, 执行线程:pool-1-thread-1
任务执行完成 编号-2, 执行线程:pool-1-thread-1
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
	at com.hml.threadtest.dealexception.DealException.lambda$methodExecute$0(DealException.java:47)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
任务执行完成 编号-4, 执行线程:pool-1-thread-2
任务执行完成 编号-5, 执行线程:pool-1-thread-2
任务执行完成 编号-6, 执行线程:pool-1-thread-2
任务执行完成 编号-7, 执行线程:pool-1-thread-2
任务执行完成 编号-8, 执行线程:pool-1-thread-2
任务执行完成 编号-9, 执行线程:pool-1-thread-2
main - finish

为什么异常日志会被打印出来呢?这是因为异常的任务会走到这个方法里 java.lang.ThreadGroup#uncaughtException

另外,可以看到,线程池中当某个任务执行发生异常且未被捕获时,该任务的执行线程也会被销毁,线程池会再创建一个新的线程来补充到核心线程中。这样的话,在极端情况下,如果提交到该线程池的任务都会抛出异常的话,那么线程池的线程就会频繁的销毁重建,失去了池化的意义。

2.submit() 方式提交任务,使用这个方式多是在需要接收返回结果的情况下(Callable 接口),当然 Runnable 接口定义的线程 submit() 方法也能接。当某个任务发生异常后,线程池不再打印异常,而是将异常包装起来,包装成 ExecutionException ,只有在 futrue.get() 方法调用时才会触发此异常。

举例说明

private void methodSubmit02() {
    // 创建一个线程数为1的线程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

    // 循环执行任务,i=3时会提交一个有异常的任务
    List<Future<Integer>> futureList = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        int finalI = i;
        Future<Integer> future = threadPoolExecutor.submit(() -> {
            if (finalI == 3) {
                int val = 10 / 0;
            }
            System.out.println("任务执行完成 编号-" + finalI+", 执行线程:"+Thread.currentThread().getName());
            return finalI;
        });
        futureList.add(future);
    }

    // 等待5秒,等上面10个任务都处理完
    ThreadUtil.sleep(5,TimeUnit.SECONDS);
    System.out.println("开始获取结果");
    for (int i = 0; i < futureList.size(); i++) {
        try {
            Integer result = futureList.get(i).get();
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("编号"+i+"发生异常:"+ExceptionUtil.stacktraceToString(e));
        }
    }

    System.out.println("main - finish");
}

执行日志

任务执行完成 编号-0, 执行线程:pool-1-thread-1
任务执行完成 编号-1, 执行线程:pool-1-thread-1
任务执行完成 编号-2, 执行线程:pool-1-thread-1
任务执行完成 编号-4, 执行线程:pool-1-thread-1
任务执行完成 编号-5, 执行线程:pool-1-thread-1
任务执行完成 编号-6, 执行线程:pool-1-thread-1
任务执行完成 编号-7, 执行线程:pool-1-thread-1
任务执行完成 编号-8, 执行线程:pool-1-thread-1
任务执行完成 编号-9, 执行线程:pool-1-thread-1
开始获取结果
编号3发生异常:java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at com.hml.threadtest.dealexception.DealException.methodSubmit02(DealException.java:107)
	at com.hml.threadtest.dealexception.DealException.main(DealException.java:32)
Caused by: java.lang.ArithmeticException: / by zero
	at com.hml.threadtest.dealexception.DealException.lambda$methodSubmit02$2(DealException.java:95)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

main - finish

通过日志可以看出来,通过 submit 方法提交的任务在发生异常后,线程池会把异常吃掉,执行线程也不会被销毁,单从日志看不到任何异常,但是当调用 future.get() 方法时,异常会抛出来。

通过上面的两个案例,可以看到,excute 与 submit 两种方式提交的任务,线程池对它们产生异常的处理方式是不一样的,那怎么能将异常处理的方式统一起来呢?很简单粗暴的方式就是在每个任务的最外层加上 try catch 自行编码处理异常。

当然你可能说 还有 UncaughtExceptionHandler 和 afterExecute 的方式来处理异常,但是 UncaughtExceptionHandler 只能处理 excute 方式的产生的异常,afterExecute 方式倒是可以编码处理两种异常,但是在配合MDC使用时会存在 traceId 混乱的情况,不知道是不是我的程序问题。这里我采用的方式是自定义线程池+包装任务。

日志跟踪

当项目出现线上问题的时候,我们通常的做法就是

  1. 先定位异常日志
  2. 然后根据traceId跟踪执行链路判断问题原因

在使用SpringBoot的前提下,通常我们跟踪一个程序执行链路的方法就是使用MDC在执行线程中加入traceId, 然后在日志中打印出 traceId 。这种方式在大多数情况下没有问题,但是遇到异步逻辑时就会遇到 traceId传递不到异步线程中的情况。 网上的解决方法也有很多,大概有以下这么几种

1. 自定义 MDCAdapter,将 copyOnThreadLocal 这个属性 由 ThreadLocal 换为 TransmittableThreadLocal(阿里提供的包),并要配置MDC的监听器使这个 MDCAdapter 生效。 在创建线程池时要用 TransmittableThreadLocal 包提供的 TtlExecutors 包一下。

2. 使用MDC自带的方法,MDC.getCopyOfContextMap() 复制主线程的 MDC上下文,MDC.setContextMap() 将复制的主线程上下文赋值到到异步线程相关的MDC中。

这里我采用第二种方法,它不用引入额外的包。

自定义线程池+包装任务

采用第二种方法,在每次向线程池提交任务时都需要将 MDC的上下文复制一下并赋值给线程池中的执行线程,这里我们可以将这个逻辑提炼出来,封装一下,我采用的是自定义线程池的方法。

首先配置一下日志格式,加上traceId,简单点的话,可以直接在配置文件中配置

# 控制台日志输出格式
logging.pattern.console=%d{yyyy-MM-dd HH:mm:ss} [%thread] [traceId=%X{traceId}] %-5level %logger{36} - %msg%n

# 文件日志输出格式
logging.pattern.file=%d{yyyy-MM-dd HH:mm:ss} [%thread] [traceId=%X{traceId}] %-5level %logger{36} - %msg%n

然后自定义一个类 CustomThreadPoolExecutor 继承了 ThreadPoolTaskExecutor。重写了execute 和 submit 方法,在执行父类的原逻辑前,自己包装了一下提交的任务(wrap 方法),加入了复制 MDC上下文的逻辑。

这里简单说下 ThreadPoolTaskExecutor ,这是Spring 提供的一个创建线程池的类,内部封装了 JUC包里ThreadPoolExecutor ,用起来的感觉和 ThreadPoolExecutor 差不多,但是它有一个独特的方法就是  submitListenable ,返回类型是 ListenableFuture ,它是 Future 的扩展,可以注册回调函数来监听任务的执行结果。

比如,可以这样用

ListenableFuture<Integer> listenableFuture = taskExecutor.submitListenable(() -> {
    Thread.sleep(1000);
    return 42;
});

listenableFuture.addCallback(result -> {
    System.out.println("Task completed successfully with result: " + result);
}, ex -> {
    System.err.println("Task failed with exception: " + ex.getMessage());
});

这种不用阻塞等待返回结果的用法可能在一些场景下会有用。

我这里自定义线程池是继承了 ThreadPoolTaskExecutor,如果继承JUC里的ThreadPoolExecutor也是没有任何问题的。

回到我们的 CustomThreadPoolExecutor 类上,下面是代码

import cn.hutool.core.exceptions.ExceptionUtil;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;


@Slf4j
public class CustomThreadPoolExecutor extends ThreadPoolTaskExecutor {


    @Override
    public void execute(Runnable task) {
        super.execute(wrap(task));
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        super.execute(wrap(task), startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(wrap(task));
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(wrap(task));
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        return super.submitListenable(wrap(task));
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        return super.submitListenable(wrap(task));
    }

    @Override
    protected void cancelRemainingTask(Runnable task) {
        super.cancelRemainingTask(task);
    }

    private Runnable wrap(Runnable runnable) {
        Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();
        return () -> {
            MDC.setContextMap(copyOfContextMap);
            try {
                runnable.run();
            } catch (Exception e) {
                log.error("have error: " + ExceptionUtil.stacktraceToString(e));
            } finally {
                MDC.clear();
            }
        };
    }

    private <T> Callable<T> wrap(Callable<T> callable) {
        Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();
        return () -> {
            MDC.setContextMap(copyOfContextMap);
            try {
                return callable.call();
            } catch (Exception e) {
                log.error("have error: " + ExceptionUtil.stacktraceToString(e));
                // 后续的 future.get() 我们仍然期望能获取到异常,所以这里将异常抛出,并记录下这个异常,方便traceId跟踪日志
                throw e;
            } finally {
                MDC.clear();
            }
        };
    }


}

定义好了这个线程池类,接下来项目中直接用这个类来创建线程池就好,其他的逻辑不需要变动。

来看两个代码示例

第一个示例,使用 excute 方式提交任务
@SpringBootTest
@Slf4j
public class ThreadTests {
    @BeforeEach
    void initCustomThreadPool() {
        // 初始化线程池
        customThreadPoolExecutor = new CustomThreadPoolExecutor();
        customThreadPoolExecutor.setCorePoolSize(3);
        customThreadPoolExecutor.setMaxPoolSize(3);
        customThreadPoolExecutor.setKeepAliveSeconds(60);
        customThreadPoolExecutor.setQueueCapacity(100);
        customThreadPoolExecutor.setAllowCoreThreadTimeOut(false);
        customThreadPoolExecutor.setThreadNamePrefix("custom-thread-");
        customThreadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        customThreadPoolExecutor.setWaitForTasksToCompleteOnShutdown(true);
        customThreadPoolExecutor.initialize();
    }

    @Test
    void testCustomSpringThreadPollExecute() {

        // 创建 n 个任务,每个任务都有自己的traceId
        for (int i = 0; i < 5; i++) {
            int finalI = i;
            String uuid = UUID.randomUUID().toString(true);
            MDC.put("traceId", uuid);
            log.info("准备执行任务,编号-" + finalI);

            customThreadPoolExecutor.execute(() -> {
                if (finalI == 3) {
                    int val = 10 / 0;
                }
                log.info("任务执行完成 编号-" + finalI);
            });

            MDC.clear();
        }

        // 阻塞主线程,不让它结束
        try {
            System.in.read();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

    }

}

执行日志

2024-09-23 18:08:02 [main] [traceId=e2decaf36cf44541993b0fb32504e296] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-0
2024-09-23 18:08:02 [main] [traceId=ddb2895605254f20b66a57ed4ec6655b] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-1
2024-09-23 18:08:02 [main] [traceId=180d676f893f42409ed0b452e3afdbfa] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-2
2024-09-23 18:08:02 [custom-thread-1] [traceId=e2decaf36cf44541993b0fb32504e296] INFO  c.h.t.ThreadTestApplicationTests - 任务执行完成 编号-0
2024-09-23 18:08:02 [main] [traceId=80f97f6cfee049deba6548c6d40fe755] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-3
2024-09-23 18:08:02 [main] [traceId=c13df8bf1405416199664beaadfd9446] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-4
2024-09-23 18:08:02 [custom-thread-2] [traceId=ddb2895605254f20b66a57ed4ec6655b] INFO  c.h.t.ThreadTestApplicationTests - 任务执行完成 编号-1
2024-09-23 18:08:02 [custom-thread-3] [traceId=180d676f893f42409ed0b452e3afdbfa] INFO  c.h.t.ThreadTestApplicationTests - 任务执行完成 编号-2
2024-09-23 18:08:02 [custom-thread-2] [traceId=c13df8bf1405416199664beaadfd9446] INFO  c.h.t.ThreadTestApplicationTests - 任务执行完成 编号-4
2024-09-23 18:08:02 [custom-thread-1] [traceId=80f97f6cfee049deba6548c6d40fe755] ERROR c.h.t.c.CustomThreadPoolExecutor - have error: java.lang.ArithmeticException: / by zero
	at com.hml.threadtest.ThreadTestApplicationTests.lambda$testCustomSpringThreadPollExecute$2(ThreadTestApplicationTests.java:177)
	at com.hml.threadtest.customthreadpool.CustomThreadPoolExecutor.lambda$wrap$0(CustomThreadPoolExecutor.java:62)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

可以看到线程池内部的线程也能正确打印出执行任务的traceId了,并且异常的任务也被捕获到了,日志中也带着traceId, 这下就很容易根据 traceId 跟踪问题了。

第二个示例,使用 submit 方式提交任务
@Test
void testCustomSpringThreadPollSubmitResult() {

    List<Future<Integer>> futureList = new ArrayList<>();

    // 创建 n 个任务,每个任务都有自己的traceId
    for (int i = 0; i < 5; i++) {
        int finalI = i;
        String uuid = UUID.randomUUID().toString(true);
        MDC.put("traceId", uuid);
        log.info("准备执行任务,编号-" + finalI);

        Future<Integer> future = customThreadPoolExecutor.submit(() -> {
            if (finalI == 3) {
                int val = 10 / 0;
            }
            log.info("任务执行完成 编号-" + finalI);
            return finalI;
        });
        futureList.add(future);

        MDC.clear();
    }

    // 等任务执行完,方便看日志
    ThreadUtil.sleep(5,TimeUnit.SECONDS);
    for (Future<Integer> future : futureList) {
        try {
            Integer result = future.get();
            log.info("receive result -> " + result);
        } catch (Exception e) {
            log.error("future.get() get error -> " + ExceptionUtil.stacktraceToString(e));
        }
    }

}

执行日志

2024-09-23 18:16:27 [main] [traceId=157907dd4bb1413f98b7781e29b12d9b] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-0
2024-09-23 18:16:27 [custom-thread-1] [traceId=157907dd4bb1413f98b7781e29b12d9b] INFO  c.h.t.ThreadTestApplicationTests - 任务执行完成 编号-0
2024-09-23 18:16:27 [main] [traceId=bc732a731db148f6b65e2759c42bc828] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-1
2024-09-23 18:16:27 [main] [traceId=72c57baf07a24a1083f8fbbbf282814a] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-2
2024-09-23 18:16:27 [custom-thread-2] [traceId=bc732a731db148f6b65e2759c42bc828] INFO  c.h.t.ThreadTestApplicationTests - 任务执行完成 编号-1
2024-09-23 18:16:27 [custom-thread-3] [traceId=72c57baf07a24a1083f8fbbbf282814a] INFO  c.h.t.ThreadTestApplicationTests - 任务执行完成 编号-2
2024-09-23 18:16:27 [main] [traceId=436fa1b93aa84fd996ffc41e461144a3] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-3
2024-09-23 18:16:27 [main] [traceId=0bf0e109e35a4fa887522737e0377051] INFO  c.h.t.ThreadTestApplicationTests - 准备执行任务,编号-4
2024-09-23 18:16:27 [custom-thread-2] [traceId=0bf0e109e35a4fa887522737e0377051] INFO  c.h.t.ThreadTestApplicationTests - 任务执行完成 编号-4
2024-09-23 18:16:27 [custom-thread-1] [traceId=436fa1b93aa84fd996ffc41e461144a3] ERROR c.h.t.c.CustomThreadPoolExecutor - have error: java.lang.ArithmeticException: / by zero
	at com.hml.threadtest.ThreadTestApplicationTests.lambda$testCustomSpringThreadPollSubmitResult$4(ThreadTestApplicationTests.java:237)
	at com.hml.threadtest.customthreadpool.CustomThreadPoolExecutor.lambda$wrap$1(CustomThreadPoolExecutor.java:76)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

2024-09-23 18:16:32 [main] [traceId=] INFO  c.h.t.ThreadTestApplicationTests - receive result -> 0
2024-09-23 18:16:32 [main] [traceId=] INFO  c.h.t.ThreadTestApplicationTests - receive result -> 1
2024-09-23 18:16:32 [main] [traceId=] INFO  c.h.t.ThreadTestApplicationTests - receive result -> 2
2024-09-23 18:16:32 [main] [traceId=] ERROR c.h.t.ThreadTestApplicationTests - future.get() get error -> java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at com.hml.threadtest.ThreadTestApplicationTests.testCustomSpringThreadPollSubmitResult(ThreadTestApplicationTests.java:251)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(
2024-09-23 18:16:32 [main] [traceId=] INFO  c.h.t.ThreadTestApplicationTests - receive result -> 4

可以看到 submit 提交的任务,如果发生异常,也会将异常日志打印出来了,并且还附带了 traceId,由于 CustomThreadPoolExecutor 中并没有吃掉这个异常,仍然抛了出来,所以 futrue.get() 方法仍然能获取到异常,以便主线程中进行进一步的处理。

看,在不引入额外jar包的情况下,我们仅仅通过自定义了一个线程池类,就将 traceId 在线程池中的传递问题解决了。

traceId在@Async异步线程中的传递

@Async 是 Spring 的一个注解,如果将它标注在一个方法上,代表这个方法会异步执行。默认情况下 @Async 搭配的异步执行器是 SimpleAsyncTaskExecutor,它持有一个 ThreadPoolTaskExecutor 线程池对象, 该线程池的配置信息在TaskExecutionProperties类中, 如下图所示(Pool是TaskExecutionProperties的内部类),核心线程数是8,最大线程数是int 最大值。

如果你想用该默认线程池,但是想调整配置的话,可以在项目的配置文件中设置,如下所示

spring.task.execution.pool.core-size=8
spring.task.execution.pool.max-size=10
spring.task.execution.pool.keep-alive=60s
spring.task.execution.pool.allow-core-thread-timeout=false
spring.task.execution.pool.queue-capacity=100

这是最普通的 ThreadPoolTaskExecutor 线程池,traceId 是传递不进来的,要想使 @Async 也能传递 traceId, 那就得用我们自定义的线程池 CustomThreadPoolExecutor, 下面给出配置类来替换线程池

新建配置类AsyncThreadPollConfig,实现AsyncConfigurer接口,重写getAsyncExecutor() 方法来替换 @Async 默认对应的线程池,另外也提供了一个自定义的可选备用线程池 customThreadPoll。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class AsyncThreadPollConfig implements AsyncConfigurer {


    @Bean
    public CustomThreadPoolExecutor defaultThreadPoll(){
        CustomThreadPoolExecutor customThreadPoolExecutor = new CustomThreadPoolExecutor();
        customThreadPoolExecutor.setCorePoolSize(10);
        customThreadPoolExecutor.setMaxPoolSize(15);
        customThreadPoolExecutor.setKeepAliveSeconds(60);
        customThreadPoolExecutor.setQueueCapacity(100);
        customThreadPoolExecutor.setAllowCoreThreadTimeOut(false);
        customThreadPoolExecutor.setThreadNamePrefix("default-async-custom-thread-");
        customThreadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        customThreadPoolExecutor.setWaitForTasksToCompleteOnShutdown(true);
        customThreadPoolExecutor.initialize();
        return customThreadPoolExecutor;
    }

    @Bean
    public CustomThreadPoolExecutor customThreadPoll(){
        CustomThreadPoolExecutor customThreadPoolExecutor = new CustomThreadPoolExecutor();
        customThreadPoolExecutor.setCorePoolSize(20);
        customThreadPoolExecutor.setMaxPoolSize(40);
        customThreadPoolExecutor.setKeepAliveSeconds(60);
        customThreadPoolExecutor.setQueueCapacity(100);
        customThreadPoolExecutor.setAllowCoreThreadTimeOut(false);
        customThreadPoolExecutor.setThreadNamePrefix("custom-async-custom-thread-");
        customThreadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        customThreadPoolExecutor.setWaitForTasksToCompleteOnShutdown(true);
        customThreadPoolExecutor.initialize();
        return customThreadPoolExecutor;
    }

    /**
     *  @Async 注解默认使用的线程池
     */
    @Override
    public Executor getAsyncExecutor() {
        return defaultThreadPoll();
    }
}

来个测试案例,看看这个配置类生效没

定义一个service类,包含一个异步方法,这里我们使用 @Async默认的线程池

@Service
@Slf4j
public class AsyncService {

    @Async
    public void exe(Integer num) {
        log.info("get num -> "+num);
    }
}

测试方法

@SpringBootTest
@Slf4j
public class ThreadTest {

    @Autowired
    private AsyncService asyncService;

    @Test
    void testAsync() {

        for (int i = 0; i < 3; i++) {
            MDC.put(traceId, UUID.randomUUID().toString(true));
            log.info("start exe num -> " + i);
            asyncService.exe(i);
            MDC.clear();
        }

        try {
            System.in.read();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        
    }
}

执行结果

2024-09-24 10:59:52 [main] [traceId=ac6897c2d6fe4fcba381b7abb482eaeb] INFO  c.h.t.ThreadTestApplicationTests - start exe num -> 0
2024-09-24 10:59:52 [main] [traceId=cb072d9147604481b70d629e164181da] INFO  c.h.t.ThreadTestApplicationTests - start exe num -> 1
2024-09-24 10:59:52 [main] [traceId=0bc2ce62854b47689f59f6c77f8e465c] INFO  c.h.t.ThreadTestApplicationTests - start exe num -> 2
2024-09-24 10:59:52 [default-async-custom-thread-1] [traceId=ac6897c2d6fe4fcba381b7abb482eaeb] INFO  com.hml.threadtest.AsyncService - get num -> 0
2024-09-24 10:59:52 [default-async-custom-thread-3] [traceId=0bc2ce62854b47689f59f6c77f8e465c] INFO  com.hml.threadtest.AsyncService - get num -> 2
2024-09-24 10:59:52 [default-async-custom-thread-2] [traceId=cb072d9147604481b70d629e164181da] INFO  com.hml.threadtest.AsyncService - get num -> 1

可以看到我们配置的默认线程池生效了,traceId 也正常传递了。

再来个测试案例,这次我们指定一下 @Async 要使用的线程池,给 @Async 加上 value 值即可,customThreadPoll 就是我们上面在 AsyncThreadPollConfig 配置类中提前定义好的 备选线程池。

@Service
@Slf4j
public class AsyncService {

    @Async("customThreadPoll")
    public void exe(Integer num) {
        log.info("get num -> "+num);
    }

}

testAsync测试方法不用动,再次执行以下,执行结果如下

2024-09-24 11:58:01 [main] [traceId=1e78ddaa36164e0ca0a92ffd803deea6] INFO  c.h.t.ThreadTestApplicationTests - start exe num -> 0
2024-09-24 11:58:01 [main] [traceId=4e75eb99fd174562bfe6ea4ec0e2d754] INFO  c.h.t.ThreadTestApplicationTests - start exe num -> 1
2024-09-24 11:58:01 [main] [traceId=24a3b77c2ae346d18b5bbb91b97919cf] INFO  c.h.t.ThreadTestApplicationTests - start exe num -> 2
2024-09-24 11:58:01 [custom-async-custom-thread-1] [traceId=1e78ddaa36164e0ca0a92ffd803deea6] INFO  com.hml.threadtest.AsyncService - get num -> 0
2024-09-24 11:58:01 [custom-async-custom-thread-3] [traceId=24a3b77c2ae346d18b5bbb91b97919cf] INFO  com.hml.threadtest.AsyncService - get num -> 2
2024-09-24 11:58:01 [custom-async-custom-thread-2] [traceId=4e75eb99fd174562bfe6ea4ec0e2d754] INFO  com.hml.threadtest.AsyncService - get num -> 1

可以看到,@Async 对应的线程池切换成功。

TransmittableThreadLocal

抛开日志不谈,如果我们有其他需求需要在 ThreadLocal 中存放一些内容,并且有可能会往子线程或者线程池中传递下去的,那么采用 TransmittableThreadLocal 是一个不错的选择,TransmittableThreadLocal 类继承了 InheritableThreadLocal ,它继承了 InheritableThreadLocal 可以往子线程中传递的特性,同时也扩展了往线程池中传递这个特性,不过线程池需要用 TtlExecutors 修饰一下。

下面举例说明一下,创建一个测试方法,测试子线程传递与线程池传递两种情况

@Test
void testThread() throws IOException {
    // 创建一个TransmittableThreadLocal
    TransmittableThreadLocal<Integer> threadLocal = new TransmittableThreadLocal<>();

    // 1.现在测试 TransmittableThreadLocal 在子线程中传递的情况
    threadLocal.set(456123);
    log.info("main thread threadLocal value -> {}", threadLocal.get());
    new Thread(() -> {
        log.info("son thread threadLocal value -> {}", threadLocal.get());
    }).start();
    threadLocal.remove();

    // 2.现在测试 TransmittableThreadLocal 在线程池中传递的情况
    // 创建一个容量为1的线程池
    ExecutorService threadPoolExecutor = new ThreadPoolExecutor(1,
            1,
            60,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(100));

    // 用TtlExecutors修饰线程池
    threadPoolExecutor = TtlExecutors.getTtlExecutorService(threadPoolExecutor);

    // 创建10个任务,让线程池去执行,看看该线程池中的这个线程是否能正常切换 threadLocal 上下文
    for (int i = 0; i < 10; i++) {
        threadLocal.set(i);
        threadPoolExecutor.execute(() -> {
            log.info(" thread pool threadLocal value -> {} ", threadLocal.get());
            threadLocal.remove();
        });
        threadLocal.remove();
    }

    System.in.read();
}

执行日志

2024-09-24 16:44:28 [main] [traceId=] INFO  c.h.t.ThreadTestApplicationTests - main thread threadLocal value -> 456123
2024-09-24 16:44:28 [Thread-3] [traceId=] INFO  c.h.t.ThreadTestApplicationTests - son thread threadLocal value -> 456123
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 0 
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 1 
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 2 
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 3 
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 4 
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 5 
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 6 
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 7 
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 8 
2024-09-24 16:44:28 [pool-1-thread-1] [traceId=] INFO  c.h.t.ThreadTestApplicationTests -  thread pool threadLocal value -> 9 

可以看到,两种情况都OK。

文末提醒一下大家,无论是直接使用 ThreadLocal ,还是使用它的衍生产品如 MDC,TransmittableThreadLocal 等,在使用结束后都记得清理一下,如 MDC.clear();  threadLocal.remove(); 等方法, 否则的话很容易会出现内存泄漏的情况。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2161373.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

模拟实现 string 类的一些常用函数

目录 构造函数 析构函数 拷贝构造 赋值重载 迭代器( begin() 和 end() ) 运算符重载流插入( operator << ( ) ) size() capacity() 运算符重载operator[ ] clear() reserve ( ) push_back ( ) append ( ) 运算符重载 operator ( ) insert ( ) erase ( )…

IO相关流

IO流 一、C语言的输入与输出1、介绍2、输入输出缓冲区&#xff08;1&#xff09;介绍&#xff08;2&#xff09;示意图 二、流1、介绍2、主要特点 三、CIO流1、介绍2、示意图 四、iostream1、介绍2、基本概念3、注意 五、类型转换1、operator bool&#xff08;1&#xff09;介绍…

计算机毕业设计非遗项目网站 登录注册搜索 评论留言资讯 前后台管理/springboot/javaWEB/J2EE/MYSQL数据库/vue前后分离小程序

遗项目网站需求&#xff0c;以下是一个基于Spring Boot、Java Web、J2EE技术栈&#xff0c;使用MySQL数据库&#xff0c;并结合Vue实现前后端分离的简要设计方案&#xff1a; 系统功能概述 ‌用户登录与注册‌&#xff1a;实现用户的注册、登录功能&#xff0c;确保用户信息的…

【Python】PyCharm: 强大的 Python 开发环境

⭕️宇宙起点 &#x1f4e2; 引言&#x1f3ac; 什么是 PyCharm&#xff1f;&#x1f528; PyCharm 的核心特性1. 智能代码编辑2. 调试和测试3. 项目和代码结构导航4. 集成 AI 助手5. 远程开发6. 集成数据库7. 科学工具8. 版本控制集成9. Web 开发 &#x1f4e6; 安装 PyCharm&…

【NLP】daydayup 词向量训练模型word2vec

词嵌入算法 word2vec word2vec是一种高效训练词向量的模型&#xff0c;基本出发点是上下文相似的两个词。它们的词向量也应该相似。一般分为CBOW&#xff08;Continuous Bag-of-Words&#xff09;与 Skip-Gram CBOW 词袋模型&#xff0c;使用中心词周围的词来预测中心词&…

《微信小程序实战(4) · 地图导航功能》

&#x1f4e2; 大家好&#xff0c;我是 【战神刘玉栋】&#xff0c;有10多年的研发经验&#xff0c;致力于前后端技术栈的知识沉淀和传播。 &#x1f497; &#x1f33b; CSDN入驻不久&#xff0c;希望大家多多支持&#xff0c;后续会继续提升文章质量&#xff0c;绝不滥竽充数…

seL4 Mapping(三)

官网链接: Mapping Mapping 这节课程主要是介绍seL4的虚存管理。 虚存 Virtual memory 除了用于操作硬件分页结构的内核原语之外&#xff0c;seL4不提供虚拟内存管理。用户必须为创建中间级分页结构&#xff0c;映射页面以及取消映射页面提供服务。 用户可以随意的定义他们…

Python图表显示添加中文

import re import numpy as np import matplotlib.pyplot as plt from matplotlib.font_manager import FontProperties# 动态加载字体文件 font_path /usr/local/sunlogin/res/font/wqy-zenhei.ttc # 替换为实际字体路径 my_font FontProperties(fnamefont_path)# 定义日志…

Go语言基础学习01-Liunx下Go开发环境配置;源码组织方式;go build/install/get详解

目录 Linux环境下配置安装VScode并配置Go语言开发环境Go语言源码的组织方式Go语言源码安装后的结果Go程序构建和安装的过程go build扩展go get 命令详解 之前学习过Go语言&#xff0c;学习的时候没有记录笔记&#xff0c;最近找了个极客时间的Go语言36讲&#xff0c;打算时间学…

影响RPA流程稳定运行的若干因素|实在RPA研究

RPA发展现状 当前&#xff0c;中国正处于实现高质量发展、数字化转型升级的关键时期。RPA作为数字化转型的一项重要工具&#xff0c;已经开始在许多领域发挥积极作用。 RPA&#xff08;Robotic Process Automation 机器人流程自动化&#xff09;是一种通过软件机器人自动执行…

stm32 keil有一些别人的工程在你这打开为什么会乱码?如何解决的

因为别人编辑代码使用的编辑器和你的不一样&#xff0c;要更正可以调一下自己的翻译器编码格式 也可以直接换掉文件的格式&#xff0c; 用记事本打开文件&#xff0c;然后点会另存为&#xff0c;下面有个编码格式选择&#xff0c;换成你自己的就行

Neko一个在Docker环境下的虚拟浏览器

Neko是一个在 Docker 中运行并使用 WebRTC 技术的自托管虚拟浏览器。Neko 是一个强大的工具&#xff0c;可让您在虚拟环境中运行功能齐全的浏览器&#xff0c;使您能够从任何地方安全、私密地访问互联网。使用 Neko&#xff0c;您可以像在常规浏览器上一样浏览 Web、运行应用程…

Python接口测试实践—参数化测试、数据驱动测试和断言的使用

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 在Python接口测试实践中&#xff0c;参数化测试、数据驱动测试和断言是常用的技术手段。 参数化测试 参数化测试是指将测试用例中的某些部分&#xff08;如输入数…

蓝桥杯算法之暴力

暴力 1.十进制数转换成罗马数字 2.判断给出的罗马数字是否正确 小知识 %&#xff08;模除&#xff09;&#xff1a; % 符号用作模除&#xff08;或取模&#xff09;运算符。模除运算是一种数学运算&#xff0c;它返回两个数相除的余数。 具体来说&#xff0c;如果 a 和 b 是…

初识 C++ ( 1 )

引言&#xff1a;大家都说c是c的升级语言。我不懂这句话的含义后来看过解释才懂。 一、面向过程语言和面向对象语言 我们都知道C语言是面向过程语言&#xff0c;而C是面向对象语言&#xff0c;说C和C的区别&#xff0c;也就是在比较面向过程和面向对象的区别。 1.面向过程和面向…

自然语言处理实战项目:从理论到实现

一、引言 自然语言处理&#xff08;NLP&#xff09;是计算机科学、人工智能和语言学交叉的领域&#xff0c;旨在让计算机能够理解、处理和生成人类语言。随着互联网的飞速发展&#xff0c;大量的文本数据被产生&#xff0c;这为自然语言处理技术的发展提供了丰富的素材&#xf…

【动态规划】(五)动态规划——子序列问题

动态规划——子序列问题 子序列问题☆ 最长递增子序列&#xff08;离散&#xff09;最长连续递增序列&#xff08;连续&#xff09;最大子序和&#xff08;连续&#xff09;最长重复子数组&#xff08;连续&#xff09;☆ 最长公共子序列&#xff08;离散-编辑距离过渡&#xf…

【驱动】修改USB转串口设备的属性,如:Serial

1、查看串口信息 在Windows上,设备管理窗口中查看设备号 2、修改串口号工具 例如使用:CH34xSerCfg.exe 使用步骤:恢复默认值 - -> 修改 Serial String(或者Product String等属性)–> 写入配置 3、查看设备节点 在linux上使用lsub查看新增的设备信息,如下这个…

python多线程开发的具体示例

用一个具体的示例&#xff0c;展示如何使用 ThreadPoolExecutor 和 asyncio 来并行运行多个任务&#xff0c;并输出结果。 代码&#xff1a; import asyncio import time from concurrent.futures import ThreadPoolExecutorclass WorkJob:def __init__(self, job_id):self.j…

报表做着太费劲?为你介绍四款好用的免费报表工具

1. 山海鲸可视化 介绍&#xff1a; 山海鲸可视化是一款免费的国产可视化报表软件&#xff0c;与许多其他宣传免费的软件不同&#xff0c;山海鲸的报表功能完全免费并且没有任何限制&#xff0c;就连网站管理后台这个功能也是免费的。同时山海鲸可视化还提供了种类丰富的可视化…