0.前言
结构化并发功能是在JDK19中的JEP 428开始孵化,然后在JDK21中的JEP 453出第一版预览版,至今在JDK22中的JEP 462出第二版预览版。结构化并发和虚拟线程、作用域值等特性是在OpenJDK Loom项目中进行开发维护等。
1.什么是非结构化并发?
“非结构化并发”意味着我们的任务在错综复杂的线程网络中运行,其开始和结束在代码中很难看到。通常不存在干净的错误处理,当控制结构执行结束时,通常会出现孤立线程。
1.1.以电商开发票场景为例(不考虑支付相关、物流相关环节)
package org.sc.entity;
/**
* 用户实体
*/
public record Customer() {}
package org.sc.entity;
/**
* 订单实体
*/
public record Order() {}
package org.sc.entity;
/**
* 发票实体
*/
public record Invoice() {
public static Invoice generate(Order order, Customer customer, InvoiceTemplate template) {
return new Invoice();
}
}
package org.sc.entity;
/**
* 发票模版实体
*/
public record InvoiceTemplate() {}
package org.sc.service;
import org.sc.config.Configuration;
import org.sc.entity.Customer;
import org.sc.log.Log;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
/**
* 用户服务
*/
public class CustomerService {
/**
* 获取用户信息
* @param customerId
* @return
* @throws InterruptedException
*/
public Customer getCustomer(int customerId) throws InterruptedException {
Log.log("获取用户信息...");
try {
long minSleepTime = Configuration.PRESENTATION_MODE ? 300 : 500;
long maxSleepTime = Configuration.PRESENTATION_MODE ? 400 : 1000;
Thread.sleep(ThreadLocalRandom.current().nextLong(minSleepTime, maxSleepTime));
} catch (InterruptedException e) {
e.printStackTrace();
Log.log("获取用户信息过程中出现错误!");
throw e;
}
if (!Configuration.PRESENTATION_MODE) {
if (ThreadLocalRandom.current().nextDouble() < 0.2) {
Log.log("获取用户信息过程中出现错误!");
throw new RuntimeException("获取用户信息过程中出现错误!");
}
}
Log.log("获取用户信息完成.");
return new Customer();
}
/**
* 异步获取用户信息
* @param customerId
* @return
*/
public CompletableFuture<Customer> getCustomerAsync(int customerId) {
return CompletableFuture.supplyAsync(
() -> {
try {
return getCustomer(customerId);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
}
package org.sc.service;
import org.sc.config.Configuration;
import org.sc.entity.Order;
import org.sc.log.Log;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
/**
* 订单服务
*/
public class OrderService {
public Order getOrder(int orderId) throws InterruptedException {
Log.log("获取订单信息...");
try {
long minSleepTime = Configuration.PRESENTATION_MODE ? 900 : 500;
long maxSleepTime = 1000;
Thread.sleep(ThreadLocalRandom.current().nextLong(minSleepTime, maxSleepTime));
} catch (InterruptedException e) {
e.printStackTrace();
Log.log("获取订单信息失败!");
throw e;
}
if (!Configuration.PRESENTATION_MODE) {
if (ThreadLocalRandom.current().nextDouble() < 0.2) {
Log.log("获取订单信息失败!");
throw new RuntimeException("获取订单信息失败!");
}
}
Log.log("获取订单信息完成.");
return new Order();
}
public CompletableFuture<Order> getOrderAsync(int orderId) {
return CompletableFuture.supplyAsync(
() -> {
try {
return getOrder(orderId);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
}
package org.sc.service;
import org.sc.config.Configuration;
import org.sc.entity.InvoiceTemplate;
import org.sc.log.Log;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
/**
* 发票模版服务
*/
public class InvoiceTemplateService {
/**
* 获取发票模版
* @param language
* @return
* @throws InterruptedException
*/
public InvoiceTemplate getTemplate(String language) throws InterruptedException {
Log.log("获取发票模版信息.");
try {
long minSleepTime = Configuration.PRESENTATION_MODE ? 600 : 500;
long maxSleepTime = Configuration.PRESENTATION_MODE ? 700 : 1000;
Thread.sleep(ThreadLocalRandom.current().nextLong(minSleepTime, maxSleepTime));
} catch (InterruptedException e) {
e.printStackTrace();
Log.log("获取发票模版信息错误!");
throw e;
}
double errorProbability = Configuration.PRESENTATION_MODE ? 0.5 : 0.2;
if (ThreadLocalRandom.current().nextDouble() < errorProbability) {
Log.log("获取发票模版错误!");
throw new RuntimeException("获取发票模版错误!");
}
Log.log("获取发票模版完成.");
return new InvoiceTemplate();
}
/**
* 异步获取发票模版
* @param language
* @return
*/
public CompletableFuture<InvoiceTemplate> getTemplateAsync(String language) {
return CompletableFuture.supplyAsync(
() -> {
try {
return getTemplate(language);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
}
package org.sc.log;
/**
* 日志
*/
public final class Log {
private Log() {}
public static void log(String description) {
System.out.printf("[%s] %s%n", Thread.currentThread(), description);
}
}
package org.sc.config;
/**
* 配置
*/
public class Configuration {
// 在演示模式下,服务的结果是可预测的
public static final boolean PRESENTATION_MODE = true;
}
package org.sc.threadpool;
import org.sc.entity.Customer;
import org.sc.entity.Invoice;
import org.sc.entity.InvoiceTemplate;
import org.sc.entity.Order;
import org.sc.log.Log;
import org.sc.service.CustomerService;
import org.sc.service.InvoiceTemplateService;
import org.sc.service.OrderService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* 发票生成器
*/
public class InvoiceGenerator {
private final OrderService orderService;
private final CustomerService customerService;
private final InvoiceTemplateService invoiceTemplateService;
private final ExecutorService executor = Executors.newCachedThreadPool();
public InvoiceGenerator(
OrderService orderService,
CustomerService customerService,
InvoiceTemplateService invoiceTemplateService) {
this.orderService = orderService;
this.customerService = customerService;
this.invoiceTemplateService = invoiceTemplateService;
}
/**
* 创建发票
* @param orderId
* @param customerId
* @param language
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
Invoice createInvoice(int orderId, int customerId, String language)
throws InterruptedException, ExecutionException {
Log.log("开始提交任务");
Log.log("1.获取订单信息...");
Future<Order> orderFuture = executor.submit(() -> orderService.getOrder(orderId));
Log.log("2.获取用户信息...");
Future<Customer> customerFuture =
executor.submit(() -> customerService.getCustomer(customerId));
Log.log("3.获取发票模版信息...");
Future<InvoiceTemplate> invoiceTemplateFuture =
executor.submit(() -> invoiceTemplateService.getTemplate(language));
Log.log("4.获取订单信息中...");
Order order = orderFuture.get();
Log.log("5.获取用户信息中...");
Customer customer = customerFuture.get();
Log.log("6.获取发票模版信息中...");
InvoiceTemplate invoiceTemplate = invoiceTemplateFuture.get();
Log.log("发票创建完成.");
return Invoice.generate(order, customer, invoiceTemplate);
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
InvoiceGenerator invoiceGenerator =
new InvoiceGenerator(
new OrderService(), new CustomerService(), new InvoiceTemplateService());
invoiceGenerator.createInvoice(10012, 61157, "en");
}
}
javac --enable-preview --source 22 -d target/classes src/main/java/org/sc/config/*.java src/main/java/org/sc/entity/*.java src/main/java/org/sc/log/*.java src/main/java/org/sc/service/*.java src/test/java/org/sc/threadpool/*.java
java -cp target/classes org.sc.threadpool/InvoiceGenerator
2.什么是结构化并发?
结构化并发的特点是并发子任务的起点和终点在代码中清晰可见。子任务中的错误将传播到父范围。这使得代码更易于阅读和维护,并确保所有启动的线程在作用域末尾完成。
3.结构化任务范围
3.1.以电商开发票场景为例(不考虑支付相关、物流相关环节)
package org.sc.structuredtaskscope;
import org.sc.entity.Customer;
import org.sc.entity.Invoice;
import org.sc.entity.InvoiceTemplate;
import org.sc.entity.Order;
import org.sc.log.Log;
import org.sc.service.CustomerService;
import org.sc.service.InvoiceTemplateService;
import org.sc.service.OrderService;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
public class InvoiceGenerator {
private final OrderService orderService;
private final CustomerService customerService;
private final InvoiceTemplateService invoiceTemplateService;
public InvoiceGenerator(
OrderService orderService,
CustomerService customerService,
InvoiceTemplateService invoiceTemplateService) {
this.orderService = orderService;
this.customerService = customerService;
this.invoiceTemplateService = invoiceTemplateService;
}
Invoice createInvoice(int orderId, int customerId, String language) throws InterruptedException {
try (var scope = new StructuredTaskScope<>()) {
Log.log("开始提交任务");
Subtask<Order> orderSubtask = scope.fork(() -> orderService.getOrder(orderId));
Subtask<Customer> customerSubtask = scope.fork(() -> customerService.getCustomer(customerId));
Subtask<InvoiceTemplate> invoiceTemplateSubtask =
scope.fork(() -> invoiceTemplateService.getTemplate(language));
Log.log("等待所有任务完成");
scope.join();
Log.log("检索结果");
Order order = orderSubtask.get();
Customer customer = customerSubtask.get();
InvoiceTemplate template = invoiceTemplateSubtask.get();
Log.log("发票创建完成.");
return Invoice.generate(order, customer, template);
}
}
public static void main(String[] args) throws InterruptedException {
InvoiceGenerator invoiceGenerator =
new InvoiceGenerator(
new OrderService(), new CustomerService(), new InvoiceTemplateService());
invoiceGenerator.createInvoice(10012, 61157, "en");
}
}
javac --enable-preview --source 22 -Xlint:preview -d target/classes src/main/java/org/sc/config/*.java src/main/java/org/sc/entity/*.java src/main/java/org/sc/log/*.java src/main/java/org/sc/service/*.java src/test/java/org/sc/structuredtaskscope/*.java
java -cp target/classes --enable-preview org.sc.structuredtaskscope/InvoiceGenerator
4.结构化任务范围策略
在多线程并发场景下,如果任何一个线程发生异常该怎么处理呢?这里就会涉及结构化任务范围的处理策略。
4.1.失败时关闭策略
使用“失败时关闭”策略,我们可以指定一个任务发生异常将导致所有其他任务终止。
package org.sc.structuredtaskscope.policy;
import org.sc.entity.Customer;
import org.sc.entity.Invoice;
import org.sc.entity.InvoiceTemplate;
import org.sc.entity.Order;
import org.sc.log.Log;
import org.sc.service.CustomerService;
import org.sc.service.InvoiceTemplateService;
import org.sc.service.OrderService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
public class InvoiceGenerator {
private final OrderService orderService;
private final CustomerService customerService;
private final InvoiceTemplateService invoiceTemplateService;
public InvoiceGenerator(
OrderService orderService,
CustomerService customerService,
InvoiceTemplateService invoiceTemplateService) {
this.orderService = orderService;
this.customerService = customerService;
this.invoiceTemplateService = invoiceTemplateService;
}
Invoice createInvoice(int orderId, int customerId, String language)
throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Log.log("开始提交任务");
StructuredTaskScope.Subtask<Order> orderSubtask = scope.fork(() -> orderService.getOrder(orderId));
StructuredTaskScope.Subtask<Customer> customerSubtask = scope.fork(() -> customerService.getCustomer(customerId));
StructuredTaskScope.Subtask<InvoiceTemplate> invoiceTemplateSubtask =
scope.fork(() -> invoiceTemplateService.getTemplate(language));
Log.log("等待所有任务完成");
scope.join();
scope.throwIfFailed();
Log.log("检索结果");
Order order = orderSubtask.get();
Customer customer = customerSubtask.get();
InvoiceTemplate template = invoiceTemplateSubtask.get();
Log.log("发票创建完成.");
return Invoice.generate(order, customer, template);
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
InvoiceGenerator invoiceGenerator =
new InvoiceGenerator(
new OrderService(), new CustomerService(), new InvoiceTemplateService());
invoiceGenerator.createInvoice(10012, 61157, "en");
}
}
javac --enable-preview --source 22 -Xlint:preview -d target/classes src/main/java/org/sc/config/*.java src/main/java/org/sc/entity/*.java src/main/java/org/sc/log/*.java src/main/java/org/sc/service/*.java src/test/java/org/sc/structuredtaskscope/policy/*.java
java -cp target/classes --enable-preview org.sc.structuredtaskscope.policy/InvoiceGenerator
4.2.成功时关闭策略
只要一个子任务成功,作用域就会终止。其他子任务将终止。
package org.sc.entity;
/**
* 地址
*/
public record Address(
String addressLine1,
String addressLine2,
String city,
String state,
String postalCode,
String countryCode) {}
package org.sc.entity;
/**
* 地址验证结果
*/
public record AddressVerificationResponse(String value) {}
package org.sc.service;
import org.sc.config.Configuration;
import org.sc.entity.Address;
import org.sc.entity.AddressVerificationResponse;
import org.sc.log.Log;
import java.util.concurrent.ThreadLocalRandom;
/**
* 地址验证服务
*/
public class AddressVerificationService {
public AddressVerificationResponse verifyViaServiceA(Address address)
throws InterruptedException {
return verifyViaService(address, "A");
}
public AddressVerificationResponse verifyViaServiceB(Address address)
throws InterruptedException {
return verifyViaService(address, "B");
}
public AddressVerificationResponse verifyViaServiceC(Address address)
throws InterruptedException {
return verifyViaService(address, "C");
}
private AddressVerificationResponse verifyViaService(Address address, String service)
throws InterruptedException {
Log.log("通过服务" + service+" 去验证地址。");
try {
// In presentation mode, we want the services to return in a predictable order
long minSleepTime =
Configuration.PRESENTATION_MODE
? (switch (service) {
case "A" -> 900;
case "B" -> 300;
case "C" -> 600;
default -> throw new IllegalStateException();
})
: 500;
long maxSleepTime = Configuration.PRESENTATION_MODE ? minSleepTime + 100 : 1000;
Thread.sleep(ThreadLocalRandom.current().nextLong(minSleepTime, maxSleepTime));
} catch (InterruptedException e) {
e.printStackTrace();
Log.log("通过服务" + service + " 去验证地址错误!");
throw e;
}
// 在演示模式下,
// a) 所有的地址都不可达,会得到一个错误
// b) 所有地址都可达到,会得到第一个响应都地址,会取消其它两个
// c) 第一不可达,会得到第二个可达都响应,取消第三个
double errorProbability =
Configuration.PRESENTATION_MODE ? (service.equals("B") ? 0.5 : 1.0) : 0.75;
if (ThreadLocalRandom.current().nextDouble() < errorProbability) {
Log.log("通过服务" + service+ " 去验证地址错误!");
throw new RuntimeException("通过服务" + service+ " 去验证地址错误!");
}
Log.log("通过服务" + service+ " 完成了地址验证.");
return new AddressVerificationResponse("来自服务" + service+" 验证响应.");
}
}
package org.sc.structuredtaskscope.policy;
import org.sc.entity.Address;
import org.sc.entity.AddressVerificationResponse;
import org.sc.log.Log;
import org.sc.service.AddressVerificationService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
public class ShutdownOnSuccess {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ShutdownOnSuccess addressVerification =
new ShutdownOnSuccess(new AddressVerificationService());
AddressVerificationResponse response =
addressVerification.verifyAddress(
new Address("四川省成都市天府新区华阳富强街66号1层", null, "成都", "CD", "610213", "ZH"));
Log.log("结果: " + response);
}
private final AddressVerificationService verificationService;
public ShutdownOnSuccessInvoiceGenerator(AddressVerificationService verificationService) {
this.verificationService = verificationService;
}
AddressVerificationResponse verifyAddress(Address address)
throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<AddressVerificationResponse>()) {
Log.log("开始提交任务");
scope.fork(() -> verificationService.verifyViaServiceA(address));
scope.fork(() -> verificationService.verifyViaServiceB(address));
scope.fork(() -> verificationService.verifyViaServiceC(address));
Log.log("等待所有任务完成");
scope.join();
Log.log("检索结果");
return scope.result();
}
}
}
javac --enable-preview --source 22 -Xlint:preview -d target/classes src/main/java/org/sc/config/*.java src/main/java/org/sc/entity/*.java src/main/java/org/sc/log/*.java src/main/java/org/sc/service/*.java src/test/java/org/sc/structuredtaskscope/policy/*.java
java -cp target/classes --enable-preview org.sc.structuredtaskscope.policy/ShutdownOnSuccess
4.3.自定义策略
假设场景:从众多服务提供商中,将最终可以成功提供服务的供应商中选择服务最好的供应商。
package org.sc.customize;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.StructuredTaskScope;
import java.util.function.Supplier;
/**
* 自定义策略
* @param <T>
*/
public class BestResultScope<T> extends StructuredTaskScope<T> {
private final Comparator<T> comparator;
private T bestResult;
private final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<>());
public BestResultScope(Comparator<T> comparator) {
this.comparator = comparator;
}
@Override
protected void handleComplete(Subtask<? extends T> subtask) {
switch (subtask.state()) {
case UNAVAILABLE -> {
// Ignore
}
case SUCCESS -> {
T result = subtask.get();
synchronized (this) {
if (bestResult == null || comparator.compare(result, bestResult) > 0) {
bestResult = result;
}
}
}
case FAILED -> exceptions.add(subtask.exception());
}
}
public <X extends Throwable> T resultOrElseThrow(Supplier<? extends X> exceptionSupplier)
throws X {
ensureOwnerAndJoined();
if (bestResult != null) {
return bestResult;
} else {
X exception = exceptionSupplier.get();
exceptions.forEach(exception::addSuppressed);
throw exception;
}
}
}
package org.sc.entity;
/**
* 服务提供商信息
*/
public record SupplierDeliveryTime(String supplier, int deliveryTimeHours) {}
package org.sc.exception;
public class SupplierDeliveryTimeCheckException extends Exception {}
package org.sc.service;
import org.sc.entity.SupplierDeliveryTime;
import org.sc.log.Log;
import java.util.concurrent.ThreadLocalRandom;
/**
* 服务提供商服务处理时间服务
*/
public class SupplierDeliveryTimeService {
private final boolean failAll;
public SupplierDeliveryTimeService(boolean failAll) {
this.failAll = failAll;
}
public SupplierDeliveryTime getDeliveryTime(String productId, String supplier)
throws InterruptedException {
Log.log("服务提供商" + supplier+"的处理时间.");
try {
Thread.sleep(ThreadLocalRandom.current().nextLong(250, 1000));
} catch (InterruptedException e) {
e.printStackTrace();
Log.log("服务提供商" + supplier + " 处理错误!");
throw e;
}
// 40% failure probability --> 2 out of 5 requests should fail
if (failAll || ThreadLocalRandom.current().nextDouble() < 0.4) {
Log.log("服务提供商" + supplier+ " 处理错误!");
throw new RuntimeException("服务提供商" + supplier+ " 处理错误!");
}
int deliveryTimeHours = ThreadLocalRandom.current().nextInt(1, 7 * 24);
Log.log(
"服务提供商处理时间 %s: %d hours"
.formatted(supplier, deliveryTimeHours));
return new SupplierDeliveryTime(supplier, deliveryTimeHours);
}
}
package org.sc.structuredtaskscope.policy;
import org.sc.customize.BestResultScope;
import org.sc.entity.SupplierDeliveryTime;
import org.sc.exception.SupplierDeliveryTimeCheckException;
import org.sc.log.Log;
import org.sc.service.SupplierDeliveryTimeService;
import java.util.Comparator;
import java.util.List;
public class Customize {
private static final boolean FAIL_ALL = false;
public static void main(String[] args)
throws SupplierDeliveryTimeCheckException, InterruptedException {
Customize supplierDeliveryTimeCheck =
new Customize(
new SupplierDeliveryTimeService(FAIL_ALL));
SupplierDeliveryTime response =
supplierDeliveryTimeCheck.getSupplierDeliveryTime(
"兔子", List.of("A", "B", "C", "D", "E"));
Log.log("响应: " + response);
}
private final SupplierDeliveryTimeService service;
public Customize(SupplierDeliveryTimeService service) {
this.service = service;
}
SupplierDeliveryTime getSupplierDeliveryTime(String productId, List<String> supplierIds)
throws SupplierDeliveryTimeCheckException, InterruptedException {
try (var scope =
new BestResultScope<>(
Comparator.comparing(SupplierDeliveryTime::deliveryTimeHours).reversed())) {
for (String supplierId : supplierIds) {
scope.fork(() -> service.getDeliveryTime(productId, supplierId));
}
scope.join();
return scope.resultOrElseThrow(SupplierDeliveryTimeCheckException::new);
}
}
}
javac --enable-preview --source 22 -Xlint:preview -d target/classes src/main/java/org/sc/config/*.java src/main/java/org/sc/entity/*.java src/main/java/org/sc/log/*.java src/main/java/org/sc/service/*.java src/main/java/org/sc/customize/*.java src/main/java/org/sc/exception/*.java src/test/java/org/sc/structuredtaskscope/policy/*.java
java -cp target/classes --enable-preview org.sc.structuredtaskscope.policy/Customize
5.嵌套结构化任务范围
场景:不仅想一次查询一种产品的供应商,还想查询多种产品的供应商。
package org.sc.structuredtaskscope.policy;
import org.sc.customize.BestResultScope;
import org.sc.entity.SupplierDeliveryTime;
import org.sc.exception.SupplierDeliveryTimeCheckException;
import org.sc.log.Log;
import org.sc.service.SupplierDeliveryTimeService;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.StructuredTaskScope.Subtask.State;
public class NestedStructuredTaskScope {
private static final boolean FAIL_ALL = false;
private final SupplierDeliveryTimeService service;
public NestedStructuredTaskScope(SupplierDeliveryTimeService service) {
this.service = service;
}
List<SupplierDeliveryTime> getSupplierDeliveryTimes(
List<String> productIds, List<String> supplierIds) throws InterruptedException {
try (var scope = new StructuredTaskScope<SupplierDeliveryTime>()) {
List<Subtask<SupplierDeliveryTime>> subtasks =
productIds.stream()
.map(productId -> scope.fork(() -> getSupplierDeliveryTime(productId, supplierIds)))
.toList();
scope.join();
return subtasks.stream()
.filter(subtask -> subtask.state() == State.SUCCESS)
.map(Subtask::get)
.toList();
}
}
SupplierDeliveryTime getSupplierDeliveryTime(String productId, List<String> supplierIds)
throws SupplierDeliveryTimeCheckException, InterruptedException {
try (var scope =
new BestResultScope<>(
Comparator.comparing(SupplierDeliveryTime::deliveryTimeHours).reversed())) {
for (String supplierId : supplierIds) {
scope.fork(() -> service.getDeliveryTime(productId, supplierId));
}
scope.join();
return scope.resultOrElseThrow(SupplierDeliveryTimeCheckException::new);
}
}
public static void main(String[] args)
throws SupplierDeliveryTimeCheckException, InterruptedException {
NestedStructuredTaskScope supplierDeliveryTimeCheck =
new NestedStructuredTaskScope(
new SupplierDeliveryTimeService(FAIL_ALL));
List<SupplierDeliveryTime> responses =
supplierDeliveryTimeCheck.getSupplierDeliveryTimes(
List.of("兔子", "猫", "狗狗"), List.of("A", "B", "C", "D", "E"));
Log.log("响应: " + responses);
}
}
javac --enable-preview --source 22 -Xlint:preview -d target/classes src/main/java/org/sc/config/*.java src/main/java/org/sc/entity/*.java src/main/java/org/sc/log/*.java src/main/java/org/sc/service/*.java src/main/java/org/sc/customize/*.java src/main/java/org/sc/exception/*.java src/test/java/org/sc/structuredtaskscope/policy/*.java
java -cp target/classes --enable-preview org.sc.structuredtaskscope.policy/NestedStructuredTaskScope
6.结构化并发的优点
- 任务和子任务在代码中形成一个独立的单元——ExecutorService在更高范围内不存在。线程不是来自线程池;相反,每个子任务都在一个新的虚拟线程中执行;
- try-with-resources 块跨越的范围导致所有线程的明确起点和终点;
- 在范围结束时,所有线程都已完成;
- 子任务内的错误会被干净地传播到父范围;
- 根据策略,如果某个子任务成功或出现错误,则剩余的子任务将被中止;
- 当调用线程被取消时,子任务也会被取消;
- 调用线程和子任务执行线程之间的调用层次结构在线程转储中可见;
- 此外,StructuredTaskScope还有助于调试:如果我们以新的 JSON 格式创建线程转储(jcmd Thread.dump_to_file -format=json ),那么它将反映父线程和子线程之间的调用层次结构。