0.前言
长期以来,虚拟线程是 Java 中最重要的创新之一。 它们是在 Project Loom 中开发的,自 Java 19 作为预览功能以来一直包含在 JDK 中,自 Java 21 作为最终版本 (JEP 444) 以来,它们已包含在 JDK 中。
1.虚拟线程的作用
任何曾经在高并发高可用场景下维护过后端应用程序的人都知道线程通常是瓶颈。 对于每个传入的请求,都需要一个线程来处理该请求。 一个 Java 线程对应一个操作系统线程,这些线程非常消耗资源:
操作系统线程为堆栈保留 1 MB 并预先提交 32 或 64 KB,具体取决于操作系统。
启动一个操作系统线程大约需要1ms。
上下文切换发生在内核空间中,并且非常消耗 CPU 资源。
起始数量不应超过几千; 否则,您将面临整个系统稳定性的风险。
然而,几千个并不总是足够的——特别是如果因为需要等待阻塞数据结构(例如队列、锁或数据库、微服务或云 API 等外部服务)而需要更长的时间来处理请求。
例如,如果一个请求需要两秒钟,并且我们将线程池限制为 1,000 个线程,那么每秒最多可以响应 500 个请求。 然而,CPU 远未得到充分利用,因为即使每个 CPU 核心有多个线程,它也会花费大部分时间等待外部服务的响应。
到目前为止,我们只能通过异步编程来克服这个问题——例如,使用 CompletableFuture 或 RxJava 和 Project Reactor 等反应式框架。
然而,任何必须维护如下代码的人都知道,反应式代码比顺序代码复杂很多倍——而且绝对没有乐趣。
public CompletionStage<Response> getProduct(String productId) {
return productService
.getProductAsync(productId)
.thenCompose(
product -> {
if (product.isEmpty()) {
return CompletableFuture.completedFuture(
Response.status(Status.NOT_FOUND).build());
}
return warehouseService
.isAvailableAsync(productId)
.thenCompose(
available ->
available
? CompletableFuture.completedFuture(0)
: supplierService.getDeliveryTimeAsync(
product.get().supplier(), productId))
.thenApply(
daysUntilShippable ->
Response.ok(
new ProductPageResponse(
product.get(), daysUntilShippable))
.build());
});
}
这段代码不仅难以阅读和维护,而且调试起来也极其困难。 例如,在这里设置断点是没有意义的,因为代码只定义了异步流程,但并不执行它。 稍后业务代码会在单独的线程池中执行。
此外,数据库驱动程序和其他外部服务的驱动程序也必须支持异步、非阻塞模型。
2.虚拟线程的定义
虚拟线程以一种再次允许我们编写易于阅读和维护的代码的方式解决了这个问题。 从 Java 代码的角度来看,虚拟线程感觉就像普通线程,但它们并未 1:1 映射到操作系统线程。
相反,有一个所谓的载体线程池,虚拟线程临时映射(“安装”)到该载体线程上。 一旦虚拟线程遇到阻塞操作,虚拟线程就会从载体线程中移除(“卸载”),并且载体线程可以执行另一个虚拟线程(新的或之前被阻塞的虚拟线程)。
下图描述了从虚拟线程到载体线程,进而到操作系统线程的这种 M:N 映射:
载体线程池是一个 ForkJoinPool,即每个线程都有自己的队列的池,如果自己的队列为空,则可以从其他线程的队列中“窃取”任务。 其大小默认设置为 Runtime.getRuntime().availableProcessors(),并且可以使用 VM 选项 jdk.virtualThreadScheduler.parallelism 进行调整。
随着时间的推移,三个任务的 CPU 活动,例如,每个任务执行3次代码,并阻塞 了2 次,可以映射到单个载体线程,如下所示:
因此,阻塞操作不再阻塞正在执行的载体线程,我们可以使用小型载体线程池同时处理大量请求。
然后我们可以像这样简单地实现上面的示例用例:
public ProductPageResponse getProduct(String productId) {
Product product = productService.getProduct(productId)
.orElseThrow(NotFoundException::new);
boolean available = warehouseService.isAvailable(productId);
int shipsInDays =
available ? 0 : supplierService.getDeliveryTime(product.supplier(), productId);
return new ProductPageResponse(product, shipsInDays);
}
该代码不仅更易于编写和阅读,而且像任何顺序代码一样可以通过传统方式进行调试。
如果您的代码已经像这样 - 即您从未切换到异步编程,那么我有个好消息:您可以继续使用虚拟线程不变的代码。
3.示例
我们还可以在没有后端框架的情况下演示虚拟线程的强大功能。 为此,我们模拟了与上述场景类似的场景:我们启动 1,000 个任务,每个任务等待一秒(以模拟对外部 API 的访问),然后返回结果(示例中为随机数)。
首先我们执行任务:
public class Task implements Callable<Integer> {
private final int number;
public Task(int number) {
this.number = number;
}
@Override
public Integer call() {
System.out.printf(
"Thread %s - Task %d waiting...%n", Thread.currentThread().getName(), number);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.printf(
"Thread %s - Task %d canceled.%n", Thread.currentThread().getName(), number);
return -1;
}
System.out.printf(
"Thread %s - Task %d finished.%n", Thread.currentThread().getName(), number);
return ThreadLocalRandom.current().nextInt(100);
}
}
现在我们测量由 100 个平台线程(这是非虚拟线程的称呼)组成的池处理所有 1,000 个任务所需的时间:
try (ExecutorService executor = Executors.newFixedThreadPool(100)) {
List<Task> tasks = new ArrayList<>();
for (int i = 0; i < 1_000; i++) {
tasks.add(new Task(i));
}
long time = System.currentTimeMillis();
List<Future<Integer>> futures = executor.invokeAll(tasks);
long sum = 0;
for (Future<Integer> future : futures) {
sum += future.get();
}
time = System.currentTimeMillis() - time;
System.out.println("sum = " + sum + "; time = " + time + " ms");
}
从 Java 19 开始,ExecutorService 是可自动关闭的,即它可以被 try-with-resources 块包围。 在该块的末尾,调用 ExecutorService.close(),这又调用 shutdown() 和awaitTermination() – 如果线程在awaitTermination() 期间被中断,可能还调用shutdownNow()。
该程序运行了 10 秒多一点。 这是意料之中的:
1,000 个任务除以 100 个线程 = 每个线程 10 个任务
每个平台线程必须顺序处理十个任务,每个任务持续约一秒。
接下来,我们使用虚拟线程测试整个过程。 因此,我们只需要,将
Executors.newFixedThreadPool(100)
替换为
Executors.newVirtualThreadPerTaskExecutor()
该执行器不使用线程池,而是为每个任务创建一个新的虚拟线程。
之后程序就不再需要10秒,而只需要1秒多一点。 它几乎不可能更快,因为每个任务都会等待一秒钟。
令人印象深刻:我们的小程序甚至可以在一秒多一点的时间内处理 10,000 个任务。
只有在执行 100,000 个任务时,吞吐量才会显着下降:我的笔记本电脑需要大约 4 秒才能完成此任务,这与线程池相比仍然快得惊人,线程池需要近 17 分钟才能完成此任务。
4.如何创建虚拟线程
我们已经了解了一种创建虚拟线程的方法:我们使用 Executors.newVirtualThreadPerTaskExecutor() 创建的执行程序服务为每个任务创建一个新的虚拟线程。
使用Thread.startVirtualThread()或Thread.ofVirtual().start(),我们还可以显式启动虚拟线程:
Thread.startVirtualThread(() -> {
// code to run in thread
});
Thread.ofVirtual().start(() -> {
// code to run in thread
});
在第二种变体中,Thread.ofVirtual() 返回一个 VirtualThreadBuilder,其 start() 方法启动一个虚拟线程。 另一种方法 Thread.ofPlatform() 返回一个 PlatformThreadBuilder,通过它我们可以启动一个平台线程。
两个构建器都实现了 Thread.Builder 接口。 这使我们能够编写灵活的代码,在运行时决定是否应该在虚拟线程或平台线程中运行:
Thread.Builder threadBuilder = createThreadBuilder();
threadBuilder.start(() -> {
// code to run in thread
});
顺便说一句,您可以使用 Thread.currentThread().isVirtual() 来了解代码是否在虚拟线程中运行。
5.如何衡量特定的系统环境可以运行多少个虚拟线程
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
public class HowManyVirtualThreadsDoingSomething {
// Activate GC logging: -Xlog:gc*
// jmap -histo <pid>
private static final int NUMBER_OF_VIRTUAL_THREADS = 1_000_000_000;
private static final int PRINT_STEP = Math.min(NUMBER_OF_VIRTUAL_THREADS / 10, 100_000);
public static void main(String[] args) throws InterruptedException {
AtomicLong runningThreadsCounter = new AtomicLong();
long startTime = System.currentTimeMillis();
for (int i = 1; i <= NUMBER_OF_VIRTUAL_THREADS; i++) {
Thread.ofVirtual()
.start(
() -> {
runningThreadsCounter.incrementAndGet();
HowManyThreadsHelper.doSomething();
});
if (i % PRINT_STEP == 0) {
long runningThreads = runningThreadsCounter.get();
long time = System.currentTimeMillis() - startTime;
System.out.printf(
"%,d virtual threads started, %,d virtual threads running after %,d ms%n",
i, runningThreads, time);
if (i - runningThreads > 200_000) {
HowManyThreadsHelper.waitForVirtualThreadsToCatchUp(i, runningThreadsCounter, startTime);
}
}
}
HowManyThreadsHelper.waitForVirtualThreadsToCatchUp(
NUMBER_OF_VIRTUAL_THREADS, runningThreadsCounter, startTime);
// Sleep, so we can look at the memory usage
Thread.sleep(Duration.ofHours(1));
}
}
通过上述程序可以大概估测出当前设置参数下启动的JVM可以同时运行多少个虚拟线程?应用程序启动越来越多的线程,并在这些线程中无限循环地执行 Thread.sleep() 操作,以模拟等待来自数据库或外部 API 的响应。 尝试使用 VM 选项 -Xmx 为程序提供尽可能多的堆内存。
在我的 64 GB 机器上,可以毫无问题地启动 20,000,000 个虚拟线程,只要有一点耐心,甚至可以启动 30,000,000 个。 从那时起,垃圾收集器尝试不间断地执行完整的GC - 因为一旦虚拟线程阻塞,虚拟线程的堆栈就会“停放在”堆上,即所谓的StackChunk对象中。 不久之后,应用程序因 OutOfMemoryError 终止。
使用 HowManyPlatformThreadsDoingSomething 类,您还可以测试系统支持多少个平台线程。 但请注意:大多数情况下,程序会在某个时刻以 OutOfMemoryError 结束(对我来说是 80,000 到 90,000 个线程),但它也可能导致计算机崩溃。
6.虚拟线程的使用
6.1.Jakarta EE
我们只需要添加一行,并带有注释@RunOnVirtualThread:
@GET
@Path("/product/{productId}")
public ProductPageResponse getProduct(@PathParam("productId") String productId) {
Product product = productService.getProduct(productId)
.orElseThrow(NotFoundException::new);
boolean available = warehouseService.isAvailable(productId);
int shipsInDays =
available ? 0 : supplierService.getDeliveryTime(product.supplier(), productId);
return new ProductPageResponse(product, shipsInDays);
}
@RunOnVirtualThread 在2024 年第一季度发布的Jakarta EE 11 中有定义。
6.2.Quarkus
自版本 2.10(即自 2022 年 6 月)起,Quarkus 已经支持 Jakarta EE 11 中定义的 @RunOnVirtualThread 注释。因此,在当前的 Quarkus 版本中,您已经可以使用上面显示的代码。
在此 GitHub 存储库中,您将找到一个示例 Quarkus 应用程序,其控制器如上所示 - 一个具有平台线程,一个具有虚拟线程,还有一个带有 CompletableFuture 的异步变体。 自述文件解释了如何启动应用程序以及如何调用三个控制器。
6.3.Spring
在 Spring 中,控制器看起来像这样:
@GetMapping("/stage1-seq/product/{productId}")
public ProductPageResponse getProduct(@PathVariable("productId") String productId) {
Product product =
productService
.getProduct(productId)
.orElseThrow(() -> new ResponseStatusException(NOT_FOUND));
boolean available = warehouseService.isAvailable(productId);
int shipsInDays =
available ? 0 : supplierService.getDeliveryTime(product.supplier(), productId);
return new ProductPageResponse(product, shipsInDays);
}
然而,要切换到虚拟线程,您需要做一些不同的事情。 根据 Spring 文档,您必须定义以下两个 bean:
@Bean(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME)
public AsyncTaskExecutor asyncTaskExecutor() {
return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor());
}
@Bean
public TomcatProtocolHandlerCustomizer<?> protocolHandlerVirtualThreadExecutorCustomizer() {
return protocolHandler -> {
protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
};
}
然而,这会导致所有控制器都在虚拟线程上运行,这对于大多数用例来说可能没问题,但对于 CPU 密集型任务则不然——这些任务应该始终在平台线程上运行。
7.虚拟线程的优势
7.1.建议使用虚拟线程的场景
如果您有许多任务需要并发处理,并且主要包含阻塞操作,则应该使用虚拟线程。
对于大多数服务器应用程序来说都是如此。 但是,如果您的服务器应用程序处理 CPU 密集型任务,则应该为它们使用系统线程。
7.2.优点
首先,它们价格便宜:
- 它们的创建速度比平台线程快得多:创建平台线程大约需要 1 毫秒,创建虚拟线程则需要不到 1 微秒。
- 它们需要更少的内存:平台线程为堆栈保留 1 MB 并预先提交 32 到 64 KB,具体取决于操作系统。 虚拟线程以大约 1 KB 开始。 但是,这只适用于平面调用堆栈。 半兆字节大小的调用堆栈在两个线程变体中都需要半兆字节。
- 阻塞虚拟线程的成本很低,因为阻塞的虚拟线程不会阻塞操作系统线程。 但是,它不是免费的,因为它的堆栈需要复制到堆。
- 上下文切换速度很快,因为它们是在用户空间而不是内核空间中执行的,并且 JVM 中进行了大量优化以使其更快。
其次,我们可以以熟悉的方式使用虚拟线程:
- 仅对 Thread 和 ExecutorService API 进行了极少的更改。
- 我们可以用传统的阻塞线程每个请求的风格编写代码,而不是编写带有回调的异步代码。
- 我们可以使用现有工具调试、观察和分析虚拟线程。
8.虚拟线程的劣势
8.1.不应该使用虚拟线程的场景
- 虚拟线程并不是更快的线程——它们在相同的时间内无法比平台线程执行更多的 CPU 指令。
- 它们不是抢占式的:当虚拟线程正在执行 CPU 密集型任务时,它不会从承载线程中卸载。 所以如果你有20个载体线程和20个占用CPU而不阻塞的虚拟线程,则不会执行其他虚拟线程。
- 它们不提供比平台线程更高级别的抽象。 您需要了解使用常规线程时也需要了解的所有微妙事项。 也就是说,如果虚拟线程访问共享数据,就必须处理可见性问题,必须同步原子操作等等。
8.2.不支持的阻塞操作
尽管JDK中绝大多数的阻塞操作都已经被重写以支持虚拟线程,但仍然有一些操作不会从载体线程中卸载虚拟线程:
文件 I/O – 这也将在不久的将来进行调整,对象.wait()在这两种情况下,阻塞的虚拟线程也会阻塞载体线程。 为了弥补这一点,这两个操作都会暂时增加承载线程的数量 - 最多 256 个线程,可以通过 VM 选项 jdk.virtualThreadScheduler.maxPoolSize 进行更改。
8.3. 固定
固定意味着通常从其载体线程卸载虚拟线程的阻塞操作不会这样做,因为虚拟线程已“固定”到其载体线程 - 这意味着不允许更改载体线程。 这种情况发生在两种情况下:
在同步块内,如果调用堆栈包含对本机代码的调用原因是在这两种情况下,指向堆栈上内存地址的指针都可以存在。 如果堆栈在卸载时停放在堆上并在安装时移回到堆栈上,则它可能最终位于不同的内存地址。 这将使这些指针无效。
使用 VM 选项 -Djdk.tracePinnedThread=full/short 当虚拟线程在固定时发生阻塞时,您可以获得完整/短堆栈跟踪。
您可以使用 ReentrantLock 替换阻塞操作周围的同步块。
8.4. 线程转储中没有锁
线程转储当前不包含有关虚拟线程持有的锁或阻塞虚拟线程的数据。 因此,它们不会显示虚拟线程之间或虚拟线程与平台线程之间的死锁。
9.带有虚拟线程的线程转储
通过 jcmd Thread.print 打印的传统线程转储不包含虚拟线程。 原因是此命令会停止虚拟机以创建正在运行的线程的快照。 这对于几百甚至几千个线程来说是可行的,但对于数百万个线程来说是不可行的。
因此,已经实现了线程转储的新变体,它不会停止VM(因此,线程转储本身可能不一致),但它包含虚拟线程作为回报。 可以使用以下两个命令之一创建这个新的线程转储:
jcmd <pid> Thread.dump_to_file -format=plain <文件>
jcmd <pid> Thread.dump_to_file -format=json <文件>
第一个命令生成与传统线程转储类似的线程转储,其中包含线程名称、ID 和堆栈跟踪。 第二个命令生成一个 JSON 格式的文件,其中还包含有关线程容器、父容器和所有者线程的信息。
10.注意事项
以下是有关使用和迁移到虚拟线程的一些技巧:
虚拟线程是新事物,与异步或反应式框架相比,我们还没有太多使用它们的经验。 因此,在将应用程序部署到生产环境之前,您应该使用虚拟线程对应用程序进行集中测试。
尽管许多有关虚拟线程的文章让我们相信这一点:它们本质上并不比平台线程使用更少的内存。 仅当调用堆栈很浅时才会出现这种情况。 对于深调用堆栈,两种类型的线程消耗相同的内存量。 所以这同样适用于这里:集中测试!
虚拟线程不需要池化。 池用于共享昂贵的资源。 另一方面,虚拟线程非常便宜,因此最好在需要时创建一个虚拟线程,并在不再需要时终止它。
如果需要限制对资源的访问,例如有多少线程可以同时访问数据库或 API,请使用信号量而不是线程池。
大部分虚拟线程代码是用 Java 编写的。 因此,您必须在运行性能测试之前预热 JVM,以便在测量开始之前编译和优化所有字节码。
11.总结
虚拟线程兑现了它们的承诺:它们允许我们编写可读且可维护的顺序代码,这些代码在等待锁、阻塞数据结构或来自文件系统或外部服务的响应时不会阻塞操作系统线程。
可以创建数百万个虚拟线程。
Spring和Quarkus等常见后端框架已经可以处理虚拟线程。 尽管如此,当您切换到虚拟线程时,您应该集中测试应用程序。 例如,请确保您不会对它们执行 CPU 密集型计算任务,它们不会被框架池化,并且它们中不会存储任何 ThreadLocal(另请参阅作用域值)。