虚拟线程(Virtual Thread)也称协程或纤程,是一种轻量级的线程实现,与传统的线程以及操作系统级别的线程(也称为平台线程)相比,它的创建开销更小、资源利用率更高,是 Java 并发编程领域的一项重要创新。
PS:虚拟线程正式发布于 Java 长期支持版(Long Term Suort,LTS)Java 21(也就是 JDK 21)。
虚拟线程是一种在 Java 虚拟机(JVM)层面实现的逻辑线程,不直接和操作系统的物理线程一一对应,因此它可以减少上下文切换所带来的性能开销。
1 开启虚拟线程
spring boot 3中开启虚拟线程,可在yaml配置文件中添加如下配置:
spring:
threads:
virtual:
enabled: true
此时,所有请求tomcat都会使用使用虚拟线程来处理,在controller中打印当前线程:
VirtualThread[#69,tomcat-handler-0]/runnable@ForkJoinPool-1-worker-1
如果需要@Async
注解也支持虚拟线程,需要添加如下配置:
@EnableAsync注解,我这里放到了启动类上的
@Configuration
//保证是在开启虚拟线程的情况下,再启用这个配置
@ConditionalOnProperty(prefix = "spring.threads.virtual", name = "enabled", havingValue = "true")
public class ThreadVirtualConfig {
/**
* Async 注解支持
*
* @return
*/
@Bean
public AsyncTaskExecutor taskExecutor() {
//指定线程名称为virtual-async#,如果不需要指定,可直接 Executors.newVirtualThreadPerTaskExecutor()
return new TaskExecutorAdapter(Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("virtual-async#", 1).factory()));
}
/**
* 让tomcat使用虚拟线程来处理请求
* 如果配置 spring.threads.virtual.enabled = true 配置没生效的话,可开启如下bean配置
* 我这里spring boot 3.3.4 是生效的
*
* @return
*/
/*@Bean
public TomcatProtocolHandlerCustomizer<?> protocolHandlerCustomizer() {
return protocolHandler -> protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
}*/
}
此时在使用@Async
注解的方法中打印当前线程:
VirtualThread[#76,virtual-async#1]/runnable@ForkJoinPool-1-worker-3
虚拟线程创建有以下四种方式:
- Thread.startVirtualThread(Runnable task):创建虚拟线程,并直接启动执行任务
Thread.startVirtualThread(() -> { System.out.println("Do virtual thread."); });
- Thread.ofVirtual().unstarted(Runnable task):只创建虚拟线程,但不直接启动(创建之后通过 start 启动)
Thread vt = Thread.ofVirtual().unstarted(()->{ System.out.println("Do virtual thread."); }); vt.start();
- Thread.ofVirtual().factory():先创建虚拟线程工厂,然后再使用工厂创建虚拟线程,之后再调用 start() 方法进行执行
ThreadFactory tf = Thread.ofVirtual().factory(); Thread vt = tf.newThread(()->{ System.out.println("Do virtual thread."); }); vt.start();
- Executors.newVirtualThreadPerTaskExecutor():创建虚拟线程
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); executor.submit(()->{ System.out.println("Do virtual thread."); });
2 生成MDC traceId
分成两步走,主线程的通过拦截器设置,@Async创建的子线程通过创建时自动复制过去
2.1 主线程
- 拦截器 LogTraceIdInterceptor.java
public class LogTraceIdInterceptor implements HandlerInterceptor {
private static final String TRACE_ID = "TRACE_ID";
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
String tid = UUID.randomUUID().toString().replace("-", "");
if (StringUtils.hasText(request.getHeader(TRACE_ID))) {
tid = request.getHeader(TRACE_ID);
}
MDC.put(TRACE_ID, tid);
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
MDC.remove(TRACE_ID);
}
}
- 配置生效
@Configuration
public class WebConfigurerAdapter implements WebMvcConfigurer {
@Bean
public LogTraceIdInterceptor logTraceIdInterceptor() {
return new LogTraceIdInterceptor();
}
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(logTraceIdInterceptor());
}
}
2.2 子线程
- CustomVirtualThreadTaskExecutor.java
public class CustomVirtualThreadTaskExecutor extends TaskExecutorAdapter {
public CustomVirtualThreadTaskExecutor() {
super(Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("virtual-async#", 1).factory()));
}
@Override
public void execute(@NotNull Runnable task) {
super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public <T> Future<T> submit(@NotNull Callable<T> task) {
return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public Future<?> submit(@NotNull Runnable task) {
return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
}
- ThreadMdcUtil.java
public final class ThreadMdcUtil {
private static final String TRACE_ID = "TRACE_ID";
private ThreadMdcUtil(){}
/**
* 获取唯一性标识
*
* @return
*/
public static String generateTraceId() {
return UUID.randomUUID().toString();
}
public static void setTraceIdIfAbsent() {
if (MDC.get(TRACE_ID) == null) {
MDC.put(TRACE_ID, generateTraceId());
}
}
/**
* 用于父线程向线程池中提交任务时,将自身MDC中的数据复制给子线程
*
* @param callable
* @param context
* @param <T>
* @return
*/
public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
return () -> {
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
setTraceIdIfAbsent();
try {
return callable.call();
} finally {
MDC.clear();
}
};
}
/**
* 用于父线程向线程池中提交任务时,将自身MDC中的数据复制给子线程
*
* @param runnable
* @param context
* @return
*/
public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
return () -> {
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
setTraceIdIfAbsent();
try {
runnable.run();
} finally {
MDC.clear();
}
};
}
}
- 注册 Executor
将此前的ThreadVirtualConfig
配置调整为如下:
@Configuration
@ConditionalOnProperty(prefix = "spring.threads.virtual", name = "enabled", havingValue = "true")
public class ThreadVirtualConfig {
/**
* Async 注解支持
*
* @return
*/
@Bean
public Executor taskExecutor() {
return new CustomVirtualThreadTaskExecutor();
}
}
3 验证
日志打印配置输出
通过以上配置,验证如下: