JDK新特性之结构化并发及演示代码示例

news2025/1/20 14:58:10

0.前言

结构化并发功能是在JDK19中的JEP 428开始孵化,然后在JDK21中的JEP 453出第一版预览版,至今在JDK22中的JEP 462出第二版预览版。结构化并发和虚拟线程、作用域值等特性是在OpenJDK Loom项目中进行开发维护等。

1.什么是非结构化并发?

“非结构化并发”意味着我们的任务在错综复杂的线程网络中运行,其开始和结束在代码中很难看到。通常不存在干净的错误处理,当控制结构执行结束时,通常会出现孤立线程。
非结构化并发

1.1.以电商开发票场景为例(不考虑支付相关、物流相关环节)

以电商开发票场景为例(不考虑支付相关、物流相关环节)
源码目录

package org.sc.entity;

/**
 * 用户实体
 */
public record Customer() {}
package org.sc.entity;

/**
 * 订单实体
 */
public record Order() {}
package org.sc.entity;

/**
 * 发票实体
 */
public record Invoice() {
  public static Invoice generate(Order order, Customer customer, InvoiceTemplate template) {
    return new Invoice();
  }
}
package org.sc.entity;

/**
 * 发票模版实体
 */
public record InvoiceTemplate() {}
package org.sc.service;


import org.sc.config.Configuration;
import org.sc.entity.Customer;
import org.sc.log.Log;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;

/**
 * 用户服务
 */
public class CustomerService {

  /**
   * 获取用户信息
   * @param customerId
   * @return
   * @throws InterruptedException
   */
  public Customer getCustomer(int customerId) throws InterruptedException {
    Log.log("获取用户信息...");
    try {
      long minSleepTime = Configuration.PRESENTATION_MODE ? 300 : 500;
      long maxSleepTime = Configuration.PRESENTATION_MODE ? 400 : 1000;
      Thread.sleep(ThreadLocalRandom.current().nextLong(minSleepTime, maxSleepTime));
    } catch (InterruptedException e) {
      e.printStackTrace();
      Log.log("获取用户信息过程中出现错误!");
      throw e;
    }

    if (!Configuration.PRESENTATION_MODE) {
      if (ThreadLocalRandom.current().nextDouble() < 0.2) {
        Log.log("获取用户信息过程中出现错误!");
        throw new RuntimeException("获取用户信息过程中出现错误!");
      }
    }

    Log.log("获取用户信息完成.");
    return new Customer();
  }

  /**
   * 异步获取用户信息
   * @param customerId
   * @return
   */
  public CompletableFuture<Customer> getCustomerAsync(int customerId) {
    return CompletableFuture.supplyAsync(
        () -> {
          try {
            return getCustomer(customerId);
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
        });
  }
}

package org.sc.service;


import org.sc.config.Configuration;
import org.sc.entity.Order;
import org.sc.log.Log;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;

/**
 * 订单服务
 */
public class OrderService {

  public Order getOrder(int orderId) throws InterruptedException {
    Log.log("获取订单信息...");
    try {
      long minSleepTime = Configuration.PRESENTATION_MODE ? 900 : 500;
      long maxSleepTime = 1000;
      Thread.sleep(ThreadLocalRandom.current().nextLong(minSleepTime, maxSleepTime));
    } catch (InterruptedException e) {
      e.printStackTrace();
      Log.log("获取订单信息失败!");
      throw e;
    }

    if (!Configuration.PRESENTATION_MODE) {
      if (ThreadLocalRandom.current().nextDouble() < 0.2) {
        Log.log("获取订单信息失败!");
        throw new RuntimeException("获取订单信息失败!");
      }
    }

    Log.log("获取订单信息完成.");
    return new Order();
  }

  public CompletableFuture<Order> getOrderAsync(int orderId) {
    return CompletableFuture.supplyAsync(
        () -> {
          try {
            return getOrder(orderId);
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
        });
  }
}

package org.sc.service;

import org.sc.config.Configuration;
import org.sc.entity.InvoiceTemplate;
import org.sc.log.Log;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;

/**
 * 发票模版服务
 */
public class InvoiceTemplateService {

  /**
   * 获取发票模版
   * @param language
   * @return
   * @throws InterruptedException
   */
  public InvoiceTemplate getTemplate(String language) throws InterruptedException {
    Log.log("获取发票模版信息.");
    try {
      long minSleepTime = Configuration.PRESENTATION_MODE ? 600 : 500;
      long maxSleepTime = Configuration.PRESENTATION_MODE ? 700 : 1000;
      Thread.sleep(ThreadLocalRandom.current().nextLong(minSleepTime, maxSleepTime));
    } catch (InterruptedException e) {
      e.printStackTrace();
      Log.log("获取发票模版信息错误!");
      throw e;
    }

    double errorProbability = Configuration.PRESENTATION_MODE ? 0.5 : 0.2;
    if (ThreadLocalRandom.current().nextDouble() < errorProbability) {
      Log.log("获取发票模版错误!");
      throw new RuntimeException("获取发票模版错误!");
    }

    Log.log("获取发票模版完成.");
    return new InvoiceTemplate();
  }

  /**
   * 异步获取发票模版
   * @param language
   * @return
   */
  public CompletableFuture<InvoiceTemplate> getTemplateAsync(String language) {
    return CompletableFuture.supplyAsync(
        () -> {
          try {
            return getTemplate(language);
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
        });
  }
}

package org.sc.log;

/**
 * 日志
 */
public final class Log {

  private Log() {}

  public static void log(String description) {
    System.out.printf("[%s] %s%n", Thread.currentThread(), description);
  }
}
package org.sc.config;

/**
 * 配置
 */
public class Configuration {

  // 在演示模式下,服务的结果是可预测的
  public static final boolean PRESENTATION_MODE = true;
}
package org.sc.threadpool;

import org.sc.entity.Customer;
import org.sc.entity.Invoice;
import org.sc.entity.InvoiceTemplate;
import org.sc.entity.Order;
import org.sc.log.Log;
import org.sc.service.CustomerService;
import org.sc.service.InvoiceTemplateService;
import org.sc.service.OrderService;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 发票生成器
 */
public class InvoiceGenerator {

  private final OrderService orderService;
  private final CustomerService customerService;
  private final InvoiceTemplateService invoiceTemplateService;

  private final ExecutorService executor = Executors.newCachedThreadPool();

  public InvoiceGenerator(
      OrderService orderService,
      CustomerService customerService,
      InvoiceTemplateService invoiceTemplateService) {
    this.orderService = orderService;
    this.customerService = customerService;
    this.invoiceTemplateService = invoiceTemplateService;
  }

  /**
   * 创建发票
   * @param orderId
   * @param customerId
   * @param language
   * @return
   * @throws InterruptedException
   * @throws ExecutionException
   */
  Invoice createInvoice(int orderId, int customerId, String language)
      throws InterruptedException, ExecutionException {
    Log.log("开始提交任务");
    Log.log("1.获取订单信息...");
    Future<Order> orderFuture = executor.submit(() -> orderService.getOrder(orderId));
    Log.log("2.获取用户信息...");
    Future<Customer> customerFuture =
        executor.submit(() -> customerService.getCustomer(customerId));
    Log.log("3.获取发票模版信息...");
    Future<InvoiceTemplate> invoiceTemplateFuture =
        executor.submit(() -> invoiceTemplateService.getTemplate(language));
    Log.log("4.获取订单信息中...");
    Order order = orderFuture.get();
    Log.log("5.获取用户信息中...");
    Customer customer = customerFuture.get();
    Log.log("6.获取发票模版信息中...");
    InvoiceTemplate invoiceTemplate = invoiceTemplateFuture.get();

    Log.log("发票创建完成.");
    return Invoice.generate(order, customer, invoiceTemplate);
  }

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    InvoiceGenerator invoiceGenerator =
            new InvoiceGenerator(
                    new OrderService(), new CustomerService(), new InvoiceTemplateService());
    invoiceGenerator.createInvoice(10012, 61157, "en");
  }
}
javac --enable-preview --source 22 -d target/classes src/main/java/org/sc/config/*.java src/main/java/org/sc/entity/*.java src/main/java/org/sc/log/*.java src/main/java/org/sc/service/*.java src/test/java/org/sc/threadpool/*.java
java -cp target/classes org.sc.threadpool/InvoiceGenerator

在这里插入图片描述

2.什么是结构化并发?

结构化并发的特点是并发子任务的起点和终点在代码中清晰可见。子任务中的错误将传播到父范围。这使得代码更易于阅读和维护,并确保所有启动的线程在作用域末尾完成。
结构化并发

3.结构化任务范围

3.1.以电商开发票场景为例(不考虑支付相关、物流相关环节)

以电商开发票场景为例(不考虑支付相关、物流相关环节
源代码

package org.sc.structuredtaskscope;



import org.sc.entity.Customer;
import org.sc.entity.Invoice;
import org.sc.entity.InvoiceTemplate;
import org.sc.entity.Order;
import org.sc.log.Log;
import org.sc.service.CustomerService;
import org.sc.service.InvoiceTemplateService;
import org.sc.service.OrderService;

import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;


public class InvoiceGenerator {

  private final OrderService orderService;
  private final CustomerService customerService;
  private final InvoiceTemplateService invoiceTemplateService;

  public InvoiceGenerator(
      OrderService orderService,
      CustomerService customerService,
      InvoiceTemplateService invoiceTemplateService) {
    this.orderService = orderService;
    this.customerService = customerService;
    this.invoiceTemplateService = invoiceTemplateService;
  }

  Invoice createInvoice(int orderId, int customerId, String language) throws InterruptedException {
    try (var scope = new StructuredTaskScope<>()) {
      Log.log("开始提交任务");

      Subtask<Order> orderSubtask = scope.fork(() -> orderService.getOrder(orderId));
      Subtask<Customer> customerSubtask = scope.fork(() -> customerService.getCustomer(customerId));
      Subtask<InvoiceTemplate> invoiceTemplateSubtask =
          scope.fork(() -> invoiceTemplateService.getTemplate(language));

      Log.log("等待所有任务完成");
      scope.join();

      Log.log("检索结果");
      Order order = orderSubtask.get();
      Customer customer = customerSubtask.get();
      InvoiceTemplate template = invoiceTemplateSubtask.get();

      Log.log("发票创建完成.");
      return Invoice.generate(order, customer, template);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    InvoiceGenerator invoiceGenerator =
            new InvoiceGenerator(
                    new OrderService(), new CustomerService(), new InvoiceTemplateService());
    invoiceGenerator.createInvoice(10012, 61157, "en");
  }
}
javac --enable-preview --source 22 -Xlint:preview -d target/classes  src/main/java/org/sc/config/*.java src/main/java/org/sc/entity/*.java src/main/java/org/sc/log/*.java src/main/java/org/sc/service/*.java src/test/java/org/sc/structuredtaskscope/*.java
java -cp target/classes --enable-preview org.sc.structuredtaskscope/InvoiceGenerator

在这里插入图片描述

4.结构化任务范围策略

在多线程并发场景下,如果任何一个线程发生异常该怎么处理呢?这里就会涉及结构化任务范围的处理策略。

4.1.失败时关闭策略

使用“失败时关闭”策略,我们可以指定一个任务发生异常将导致所有其他任务终止。
失败时关闭策略

package org.sc.structuredtaskscope.policy;

import org.sc.entity.Customer;
import org.sc.entity.Invoice;
import org.sc.entity.InvoiceTemplate;
import org.sc.entity.Order;
import org.sc.log.Log;
import org.sc.service.CustomerService;
import org.sc.service.InvoiceTemplateService;
import org.sc.service.OrderService;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;


public class InvoiceGenerator {

  private final OrderService orderService;
  private final CustomerService customerService;
  private final InvoiceTemplateService invoiceTemplateService;

  public InvoiceGenerator(
      OrderService orderService,
      CustomerService customerService,
      InvoiceTemplateService invoiceTemplateService) {
    this.orderService = orderService;
    this.customerService = customerService;
    this.invoiceTemplateService = invoiceTemplateService;
  }

  Invoice createInvoice(int orderId, int customerId, String language)
      throws InterruptedException, ExecutionException {
      try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
      Log.log("开始提交任务");

      StructuredTaskScope.Subtask<Order> orderSubtask = scope.fork(() -> orderService.getOrder(orderId));
      StructuredTaskScope.Subtask<Customer> customerSubtask = scope.fork(() -> customerService.getCustomer(customerId));
      StructuredTaskScope.Subtask<InvoiceTemplate> invoiceTemplateSubtask =
          scope.fork(() -> invoiceTemplateService.getTemplate(language));

      Log.log("等待所有任务完成");
      scope.join();
      scope.throwIfFailed();

      Log.log("检索结果");
      Order order = orderSubtask.get();
      Customer customer = customerSubtask.get();
      InvoiceTemplate template = invoiceTemplateSubtask.get();

      Log.log("发票创建完成.");
      return Invoice.generate(order, customer, template);
    }
  }

  public static void main(String[] args) throws InterruptedException, ExecutionException {
      InvoiceGenerator invoiceGenerator =
              new InvoiceGenerator(
                      new OrderService(), new CustomerService(), new InvoiceTemplateService());
      invoiceGenerator.createInvoice(10012, 61157, "en");
    }

}
javac --enable-preview --source 22 -Xlint:preview -d target/classes  src/main/java/org/sc/config/*.java src/main/java/org/sc/entity/*.java src/main/java/org/sc/log/*.java src/main/java/org/sc/service/*.java src/test/java/org/sc/structuredtaskscope/policy/*.java
java -cp target/classes --enable-preview org.sc.structuredtaskscope.policy/InvoiceGenerator

在这里插入图片描述

4.2.成功时关闭策略

只要一个子任务成功,作用域就会终止。其他子任务将终止。
成功时关闭策略

package org.sc.entity;

/**
 * 地址
 */
public record Address(
    String addressLine1,
    String addressLine2,
    String city,
    String state,
    String postalCode,
    String countryCode) {}

package org.sc.entity;

/**
 * 地址验证结果
 */
public record AddressVerificationResponse(String value) {}

package org.sc.service;



import org.sc.config.Configuration;
import org.sc.entity.Address;
import org.sc.entity.AddressVerificationResponse;
import org.sc.log.Log;

import java.util.concurrent.ThreadLocalRandom;

/**
 * 地址验证服务
 */
public class AddressVerificationService {

  public AddressVerificationResponse verifyViaServiceA(Address address)
      throws InterruptedException {
    return verifyViaService(address, "A");
  }

  public AddressVerificationResponse verifyViaServiceB(Address address)
      throws InterruptedException {
    return verifyViaService(address, "B");
  }

  public AddressVerificationResponse verifyViaServiceC(Address address)
      throws InterruptedException {
    return verifyViaService(address, "C");
  }

  private AddressVerificationResponse verifyViaService(Address address, String service)
      throws InterruptedException {
    Log.log("通过服务" + service+" 去验证地址。");
    try {
      // In presentation mode, we want the services to return in a predictable order
      long minSleepTime =
          Configuration.PRESENTATION_MODE
              ? (switch (service) {
                case "A" -> 900;
                case "B" -> 300;
                case "C" -> 600;
                default -> throw new IllegalStateException();
              })
              : 500;
      long maxSleepTime = Configuration.PRESENTATION_MODE ? minSleepTime + 100 : 1000;
      Thread.sleep(ThreadLocalRandom.current().nextLong(minSleepTime, maxSleepTime));
    } catch (InterruptedException e) {
      e.printStackTrace();
      Log.log("通过服务" + service + " 去验证地址错误!");
      throw e;
    }

    // 在演示模式下,
    // a) 所有的地址都不可达,会得到一个错误
    // b) 所有地址都可达到,会得到第一个响应都地址,会取消其它两个
    // c) 第一不可达,会得到第二个可达都响应,取消第三个
    double errorProbability =
        Configuration.PRESENTATION_MODE ? (service.equals("B") ? 0.5 : 1.0) : 0.75;
    if (ThreadLocalRandom.current().nextDouble() < errorProbability) {
      Log.log("通过服务" + service+ " 去验证地址错误!");
      throw new RuntimeException("通过服务" + service+ " 去验证地址错误!");
    }

    Log.log("通过服务" + service+ " 完成了地址验证.");
    return new AddressVerificationResponse("来自服务" + service+" 验证响应.");
  }
}
package org.sc.structuredtaskscope.policy;

import org.sc.entity.Address;
import org.sc.entity.AddressVerificationResponse;
import org.sc.log.Log;
import org.sc.service.AddressVerificationService;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;

public class ShutdownOnSuccess {

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    ShutdownOnSuccess addressVerification =
        new ShutdownOnSuccess(new AddressVerificationService());
    AddressVerificationResponse response =
        addressVerification.verifyAddress(
            new Address("四川省成都市天府新区华阳富强街66号1层", null, "成都", "CD", "610213", "ZH"));
    Log.log("结果: " + response);
  }

  private final AddressVerificationService verificationService;

  public ShutdownOnSuccessInvoiceGenerator(AddressVerificationService verificationService) {
    this.verificationService = verificationService;
  }

  AddressVerificationResponse verifyAddress(Address address)
      throws InterruptedException, ExecutionException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<AddressVerificationResponse>()) {
      Log.log("开始提交任务");

      scope.fork(() -> verificationService.verifyViaServiceA(address));
      scope.fork(() -> verificationService.verifyViaServiceB(address));
      scope.fork(() -> verificationService.verifyViaServiceC(address));

      Log.log("等待所有任务完成");

      scope.join();

      Log.log("检索结果");

      return scope.result();
    }
  }
}
javac --enable-preview --source 22 -Xlint:preview -d target/classes  src/main/java/org/sc/config/*.java src/main/java/org/sc/entity/*.java src/main/java/org/sc/log/*.java src/main/java/org/sc/service/*.java src/test/java/org/sc/structuredtaskscope/policy/*.java
java -cp target/classes --enable-preview org.sc.structuredtaskscope.policy/ShutdownOnSuccess

在这里插入图片描述

4.3.自定义策略

假设场景:从众多服务提供商中,将最终可以成功提供服务的供应商中选择服务最好的供应商。
在这里插入图片描述

package org.sc.customize;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.StructuredTaskScope;
import java.util.function.Supplier;

/**
 * 自定义策略
 * @param <T>
 */
public class BestResultScope<T> extends StructuredTaskScope<T> {

  private final Comparator<T> comparator;

  private T bestResult;
  private final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<>());

  public BestResultScope(Comparator<T> comparator) {
    this.comparator = comparator;
  }

  @Override
  protected void handleComplete(Subtask<? extends T> subtask) {
    switch (subtask.state()) {
      case UNAVAILABLE -> {
        // Ignore
      }
      case SUCCESS -> {
        T result = subtask.get();
        synchronized (this) {
          if (bestResult == null || comparator.compare(result, bestResult) > 0) {
            bestResult = result;
          }
        }
      }
      case FAILED -> exceptions.add(subtask.exception());
    }
  }

  public <X extends Throwable> T resultOrElseThrow(Supplier<? extends X> exceptionSupplier)
      throws X {
    ensureOwnerAndJoined();
    if (bestResult != null) {
      return bestResult;
    } else {
      X exception = exceptionSupplier.get();
      exceptions.forEach(exception::addSuppressed);
      throw exception;
    }
  }
}
package org.sc.entity;

/**
 * 服务提供商信息
 */
public record SupplierDeliveryTime(String supplier, int deliveryTimeHours) {}

package org.sc.exception;

public class SupplierDeliveryTimeCheckException extends Exception {}

package org.sc.service;


import org.sc.entity.SupplierDeliveryTime;
import org.sc.log.Log;

import java.util.concurrent.ThreadLocalRandom;

/**
 * 服务提供商服务处理时间服务
 */
public class SupplierDeliveryTimeService {

  private final boolean failAll;

  public SupplierDeliveryTimeService(boolean failAll) {
    this.failAll = failAll;
  }

  public SupplierDeliveryTime getDeliveryTime(String productId, String supplier)
      throws InterruptedException {
    Log.log("服务提供商" + supplier+"的处理时间.");

    try {
      Thread.sleep(ThreadLocalRandom.current().nextLong(250, 1000));
    } catch (InterruptedException e) {
      e.printStackTrace();
      Log.log("服务提供商" + supplier + " 处理错误!");
      throw e;
    }

    // 40% failure probability --> 2 out of 5 requests should fail
    if (failAll || ThreadLocalRandom.current().nextDouble() < 0.4) {
      Log.log("服务提供商" + supplier+ " 处理错误!");
      throw new RuntimeException("服务提供商" + supplier+ " 处理错误!");
    }

    int deliveryTimeHours = ThreadLocalRandom.current().nextInt(1, 7 * 24);

    Log.log(
        "服务提供商处理时间 %s: %d hours"
            .formatted(supplier, deliveryTimeHours));

    return new SupplierDeliveryTime(supplier, deliveryTimeHours);
  }
}

package org.sc.structuredtaskscope.policy;

import org.sc.customize.BestResultScope;
import org.sc.entity.SupplierDeliveryTime;
import org.sc.exception.SupplierDeliveryTimeCheckException;
import org.sc.log.Log;
import org.sc.service.SupplierDeliveryTimeService;

import java.util.Comparator;
import java.util.List;


public class Customize {

  private static final boolean FAIL_ALL = false;

  public static void main(String[] args)
      throws SupplierDeliveryTimeCheckException, InterruptedException {
    Customize supplierDeliveryTimeCheck =
        new Customize(
            new SupplierDeliveryTimeService(FAIL_ALL));
    SupplierDeliveryTime response =
        supplierDeliveryTimeCheck.getSupplierDeliveryTime(
            "兔子", List.of("A", "B", "C", "D", "E"));
    Log.log("响应: " + response);
  }

  private final SupplierDeliveryTimeService service;

  public Customize(SupplierDeliveryTimeService service) {
    this.service = service;
  }

  SupplierDeliveryTime getSupplierDeliveryTime(String productId, List<String> supplierIds)
      throws SupplierDeliveryTimeCheckException, InterruptedException {
    try (var scope =
        new BestResultScope<>(
            Comparator.comparing(SupplierDeliveryTime::deliveryTimeHours).reversed())) {
      for (String supplierId : supplierIds) {
        scope.fork(() -> service.getDeliveryTime(productId, supplierId));
      }

      scope.join();
      return scope.resultOrElseThrow(SupplierDeliveryTimeCheckException::new);
    }
  }
}

javac --enable-preview --source 22 -Xlint:preview -d target/classes  src/main/java/org/sc/config/*.java src/main/java/org/sc/entity/*.java src/main/java/org/sc/log/*.java src/main/java/org/sc/service/*.java src/main/java/org/sc/customize/*.java src/main/java/org/sc/exception/*.java src/test/java/org/sc/structuredtaskscope/policy/*.java
java -cp target/classes --enable-preview org.sc.structuredtaskscope.policy/Customize

在这里插入图片描述

5.嵌套结构化任务范围

场景:不仅想一次查询一种产品的供应商,还想查询多种产品的供应商。

package org.sc.structuredtaskscope.policy;

import org.sc.customize.BestResultScope;
import org.sc.entity.SupplierDeliveryTime;
import org.sc.exception.SupplierDeliveryTimeCheckException;
import org.sc.log.Log;
import org.sc.service.SupplierDeliveryTimeService;

import java.util.Comparator;
import java.util.List;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.StructuredTaskScope.Subtask.State;


public class NestedStructuredTaskScope {

  private static final boolean FAIL_ALL = false;

  private final SupplierDeliveryTimeService service;

  public NestedStructuredTaskScope(SupplierDeliveryTimeService service) {
    this.service = service;
  }

  List<SupplierDeliveryTime> getSupplierDeliveryTimes(
      List<String> productIds, List<String> supplierIds) throws InterruptedException {
    try (var scope = new StructuredTaskScope<SupplierDeliveryTime>()) {
      List<Subtask<SupplierDeliveryTime>> subtasks =
          productIds.stream()
              .map(productId -> scope.fork(() -> getSupplierDeliveryTime(productId, supplierIds)))
              .toList();

      scope.join();

      return subtasks.stream()
          .filter(subtask -> subtask.state() == State.SUCCESS)
          .map(Subtask::get)
          .toList();
    }
  }

  SupplierDeliveryTime getSupplierDeliveryTime(String productId, List<String> supplierIds)
      throws SupplierDeliveryTimeCheckException, InterruptedException {
    try (var scope =
        new BestResultScope<>(
            Comparator.comparing(SupplierDeliveryTime::deliveryTimeHours).reversed())) {
      for (String supplierId : supplierIds) {
        scope.fork(() -> service.getDeliveryTime(productId, supplierId));
      }

      scope.join();
      return scope.resultOrElseThrow(SupplierDeliveryTimeCheckException::new);
    }
  }

  public static void main(String[] args)
          throws SupplierDeliveryTimeCheckException, InterruptedException {
    NestedStructuredTaskScope supplierDeliveryTimeCheck =
            new NestedStructuredTaskScope(
                    new SupplierDeliveryTimeService(FAIL_ALL));
    List<SupplierDeliveryTime> responses =
            supplierDeliveryTimeCheck.getSupplierDeliveryTimes(
                    List.of("兔子", "猫", "狗狗"), List.of("A", "B", "C", "D", "E"));
    Log.log("响应: " + responses);
  }
}
javac --enable-preview --source 22 -Xlint:preview -d target/classes  src/main/java/org/sc/config/*.java src/main/java/org/sc/entity/*.java src/main/java/org/sc/log/*.java src/main/java/org/sc/service/*.java src/main/java/org/sc/customize/*.java src/main/java/org/sc/exception/*.java src/test/java/org/sc/structuredtaskscope/policy/*.java
java -cp target/classes --enable-preview org.sc.structuredtaskscope.policy/NestedStructuredTaskScope

在这里插入图片描述

6.结构化并发的优点

  • 任务和子任务在代码中形成一个独立的单元——ExecutorService在更高范围内不存在。线程不是来自线程池;相反,每个子任务都在一个新的虚拟线程中执行;
  • try-with-resources 块跨越的范围导致所有线程的明确起点和终点;
  • 在范围结束时,所有线程都已完成;
  • 子任务内的错误会被干净地传播到父范围;
  • 根据策略,如果某个子任务成功或出现错误,则剩余的子任务将被中止;
  • 当调用线程被取消时,子任务也会被取消;
  • 调用线程和子任务执行线程之间的调用层次结构在线程转储中可见;
  • 此外,StructuredTaskScope还有助于调试:如果我们以新的 JSON 格式创建线程转储(jcmd Thread.dump_to_file -format=json ),那么它将反映父线程和子线程之间的调用层次结构。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1533454.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于SpringBoot的高校办公室行政事务管理系统

采用技术 基于SpringBoot的高校办公室行政事务管理系统的设计与实现~ 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SpringBootMyBatis 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 页面展示效果 功能清单 教师信息管理 办公室管理 办公物资管…

Ant Design Vue和VUE3下的upload组件使用以及文件预览

Ant Design Vue和VUE3下的upload组件使用以及文件预览 用到技术&#xff1a;Ant Design Vue、VUE3、xlsx的文件预览功能&#xff08;也可预览txt&#xff0c;csv&#xff09; 一、多文件上传 需求 可以多文件上传文件先上传到本地&#xff0c;点击开始上传再通过后端接口继续…

QT信号和槽机制connect用法

信号与槽机制是绝对不可或缺且常用的&#xff0c;其中的参数一般都会比较简单&#xff0c;bool、int、QString之类的&#xff0c;但当我们想要传递相对比较复杂的参数&#xff0c;例如QVector<int>、QList<QString>&#xff0c;以及一些我们自定义的结构体时&#…

【timm笔记1】

1. 安装timm pip install timm2. 打印模型 import timm# 获取并打印所有可用的预训练模型名称 available_models = timm.list_models() # 打印出所有的模型 print(available_models)# 打印所有包含"resnet"字符的模型名称 resnet_models = timm.list_models(*resne…

C# 数组(Array)

C# 数组&#xff08;Array&#xff09; 初始化数组 声明一个数组不会在内存中初始化数组。当初始化数组变量时&#xff0c;您可以赋值给数组。 数组是一个引用类型&#xff0c;所以您需要使用 new 关键字来创建数组的实例。 例如&#xff1a; double[] b new double[10];…

排序算法:归并排序(递归)

文章目录 一、归并排序的思路二、代码编写 先赞后看&#xff0c;养成习惯&#xff01;&#xff01;&#xff01;^ _ ^<3 ❤️ ❤️ ❤️ 码字不易&#xff0c;大家的支持就是我坚持下去的动力。点赞后不要忘了关注我哦&#xff01; 所属专栏:排序算法 一、归并排序的思路 单…

C++基础基础基础杂谈,面试理解,看看就行,有不对欢迎指出

20240321 面试 今天面了一家公司&#xff0c;遇到几个之前没有遇到的问题&#xff0c;回答得不是特别好&#xff0c;对此进行归纳整理&#xff0c;顺道吐槽一下&#xff0c;最近西安的公司是真的一言难尽啊&#xff0c;有好的公司推荐下小弟我。 联系方式 &#xff11;&#…

LeetCode-60题:排列序列解法二(原创)

【题目描述】 给出集合 [1,2,3,...,n]&#xff0c;其所有元素共有 n! 种排列。按大小顺序列出所有排列情况&#xff0c;并一一标记&#xff0c;当 n 3 时, 所有排列如下&#xff1a;"123" 、"132" 、"213" 、"231"、"312"、…

element-ui实现证件照上传预览下载组件封装

element-ui实现证件照上传预览下载组件封装 效果&#xff1a; 参数说明 我只写了两个参数&#xff0c;后续有需求再对其组件进行丰富~ 参数说明fileListProp用来存储上传后后端返回的图片UR了uploadUrl图片上传反悔的URL后端接口地址 父组件调用&#xff1a; <au-upload…

OSPF-1类Router LSA学习

前面我们又复习了一遍OSPF概述&#xff0c;在OSPF建立关系后有几种交互报文&#xff0c;通过LSU类型报文包含LSA信息实现路由信息传递&#xff0c;常见了1、2、3、4、5、7类LSA&#xff0c;分别对应不同功能使用。这里先看下1类LSA-Router LSA。 一、LSA概述 LSA&#xff0c;全…

vscode中断无法识别npm的命令解决方案

在cmd中可以正常执行npm -v等指令,但是在vs code终端中,无法执行npm -v,node -v等指令 出现报错&#xff1a; 解决办法&#xff1a; 方法一&#xff1a;【右键单击Vscode】以【管理员身份运行】&#xff0c;【重启Vscode】&#xff08;这种办法每次打开都用管理员身份比较麻烦…

【实例】React 组件传值方法: Props、回调函数、Context、路由传参

React原始传值方法 借用状态管理库的传值见下篇文&#xff1a;MobX 和 Redux 【实例】React 状态管理库 MobX Redux 入门及对比 文章目录 React原始传值方法父组件->子组件 props子组件->父组件 回调函数Context路由传值 父组件->子组件 props 父组件可以通过属性&a…

字符串筛选排序 - 华为OD统一考试(C卷)

OD统一考试(C卷) 分值: 100分 题解: Java / Python / C++ 题目描述 输入一个由n个大小写字母组成的字符串, 按照 ASCII 码值从小到大的排序规则,查找字符串中第 k 个最小ASCII 码值的字母(k>=1) , 输出该字母所在字符串的位置索引(字符串的第一个字符位置索引为0) 。…

电源配小了,是不是容易烧?是的!

电源小的话会不会容易烧毁&#xff1f; 是的。 功率电压*电流。 随着功率增大&#xff0c;电压不变&#xff0c;电流增大&#xff0c;发热量增大&#xff0c;可能会烧毁。 今天给大家推荐一款650w的电脑电源&#xff0c;不过在推荐之前&#xff0c;首先要确认自己的电脑功耗…

OpenCV4.9.0在Android 开发简介

查看&#xff1a;OpenCV系列文章目录&#xff08;持续更新中......&#xff09; 上一篇&#xff1a;使用 Clojure 进行 OpenCV 开发简介 下一篇&#xff1a;暂无 引言&#xff1a; OpenCV是一个跨平台计算机视觉库&#xff0c;广泛用于图像处理、计算机视觉和机器学习等领域…

【翻译】Attentive Pooling Networks

摘要&#xff1a; 在这项工作中&#xff0c;我们提出了注意力池化&#xff08;AP&#xff09;&#xff0c;一种用于判别模型训练的双向注意力机制。在使用神经网络进行成对排序或分类的背景下&#xff0c;AP使得池化层能够意识到当前的输入对&#xff0c;使得两个输入项的信息…

AI系统性学习03—ChatGPT开发教程

文章目录 1、OpenAI关键概念⭐️2、OpenAI SDK介绍3、OpenAI API KEY&API 认证3.1 REST API安全认证 4、OpenAI模型⭐️4.1 模型分类4.2 GPT44.3 GPT-3.54.4 Embeddings 5、OpenAI快速入门6、Function calling(函数调用)⭐️⭐️⭐️6.1 应用场景6.2 支持function calling的…

稀碎从零算法笔记Day23-LeetCode:相同的树

题型&#xff1a;二叉树的遍历、链表 链接&#xff1a;100. 相同的树 - 力扣&#xff08;LeetCode&#xff09; 来源&#xff1a;LeetCode 题目描述 给你两棵二叉树的根节点 p 和 q &#xff0c;编写一个函数来检验这两棵树是否相同。 如果两个树在结构上相同&#xff0c;…

ERP系统在企业管理中的重要作用

效率和盈利能力是企业在商业领域取得成功的两大关键要素。企业希望以尽可能高效的方式执行必要的、有利可图的业务流程&#xff0c;但又需要在保持最低运营成本的同时最大化企业的底线利润。要实现这种高效和高盈利的水平&#xff0c;企业需要扩展其业务流程管理策略&#xff0…

python网络爬虫实战教学——urllib的使用(3)

文章目录 专栏导读1、urlsplit2、urlunsplit3、urljoin4、urlencode 专栏导读 ✍ 作者简介&#xff1a;i阿极&#xff0c;CSDN 数据分析领域优质创作者&#xff0c;专注于分享python数据分析领域知识。 ✍ 本文录入于《python网络爬虫实战教学》&#xff0c;本专栏针对大学生、…