java中CompletableFuture异步编程详解以及实践案例

news2024/11/18 1:38:19

文章目录

    • 一、CompletableFuture的使用
      • 1、 创建CompletableFuture的方式
      • 2、 获得异步执行结果
      • 3、 对执行结果进行处理
      • 4、对执行结果进行消费
      • 5、异常处理
      • 6、 两组任务按顺序执行
      • 7、 两组任务谁快用谁
      • 8、 两组任务完成后合并
      • 9、 多任务组合
    • 二、一个使用CompletableFuture异步编排的例子

Future接口用于 代表异步计算的结果,通过Future接口提供的方法可以查看异步计算是否执行完成,或者等待执行结果并获取执行结果,同时还可以取消执行

列举**Future**接口的方法:

  • get()获取任务执行结果,如果任务还没完成则会阻塞等待直到任务执行完成。如果任务被取消则会抛出CancellationException异常,如果任务执行过程发生异常则会抛出ExecutionException异常,如果阻塞等待过程中被中断则会抛出InterruptedException异常。
  • get(long timeout,Timeunit unit)带超时时间的**get()**方法,如果**阻塞等待过程中超时**则会抛出TimeoutException异常。
  • cancel()用于取消异步任务的执行。如果异步任务已经完成或者已经被取消,或者由于某些原因不能取消,则会返回false。如果任务还没有被执行,则会返回true并且异步任务不会被执行。如果任务已经开始执行了但是还没有执行完成,若mayInterruptIfRunning为true,则会立即中断执行任务的线程并返回true,若mayInterruptIfRunning为false,则会返回true且不会中断任务执行线程。
  • isCanceled()判断任务是否被取消,如果任务在结束(正常执行结束或者执行异常结束)前被取消则返回true,否则返回false。
  • isDone()判断任务是否已经完成,如果完成则返回true,否则返回false。需要注意的是:任务执行过程中发生异常、任务被取消也属于任务已完成,也会返回true。

使用**Future接口和Callable**接口实现异步执行:

public static void main(String[] args) {
  // 快速创建线程池
  ExecutorService executorService = Executors.newFixedThreadPool(4);
  // 获取商品基本信息(可以使用Lambda表达式简化Callable接口,这里为了便于观察不使用)
  Future<String> future1 = executorService.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {
      return "获取到商品基本信息";
    }
  });
  // 获取商品图片信息
  Future<String> future2 = executorService.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {
      return "获取商品图片信息";
    }
  });
  // 获取商品促销信息
  Future<String> future3 = executorService.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {
      return "获取商品促销信息";
    }
  });
  // 获取商品各种类基本信息
  Future<String> future4 = executorService.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {
      return "获取商品各种类基本信息";
    }
  });
        // 获取结果
  try {
    System.out.println(future1.get());
    System.out.println(future2.get());
    System.out.println(future3.get());
    System.out.println(future4.get());
  } catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
  }finally {
    executorService.shutdown();
  }
}

简述一下Future接口的弊端:

  • 不支持手动完成
    • 当提交了一个任务,但是执行太慢了,通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成。
  • 不支持进一步的非阻塞调用
    • 通过Future的get()方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为 Future 不支持回调函数,所以无法实现这个功能。
  • 不支持链式调用
    • 对于Future的执行结果,想继续传到下一个Future处理使用,从而形成一个链式的pipline调用,这在 Future中无法实现。
  • 不支持多个 Future 合并
    • 比如有10个Future并行执行,想在所有的Future运行完毕之后,执行某些函数,是无法通过Future实现的。
  • 不支持异常处理
    • Future的API没有任何的异常处理的api,所以在异步运行时,如果出了异常问题不好定位。

使用Future接口可以通过get()阻塞式获取结果或者通过轮询+isDone()**非阻塞式获取结果,但是前一种方法会阻塞,后一种会耗费CPU资源**,所以JDK的Future接口实现异步执行对获取结果不太友好,所以在JDK8时推出了CompletableFuture实现异步编排
`

一、CompletableFuture的使用

JDK8中新增加了一个包含50个方法左右的类CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

CompletableFuture类实现了Future接口和CompletionStage接口,即除了可以使用Future接口的所有方法之外,CompletionStage<T>接口提供了更多方法来更好的实现异步编排,并且大量的使用了JDK8引入的函数式编程概念。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kskk1GQQ-1688700220533)(image/image_4lCef11cwd.png)]

`

1、 创建CompletableFuture的方式

  1. 使用**new**关键字创建
// 无返回结果
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// 已知返回结果
CompletableFuture<String> completableFuture = new CompletableFuture<>("result");
// 已知返回结果(底层其实也是带参数的构造器赋值)
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("result");

创建一个返回结果类型为String的CompletableFuture,可以使用Future接口的get()方法获取该值(同样也会阻塞)。

可以使用无参构造器返回一个没有结果的CompletableFuture,也可以通过构造器的传参CompletableFuture设置好返回结果,或者使用CompletableFuture.completedFuture(U value)构造一个已知结果的CompletableFuture。

  1. 使用CompletableFuture类的静态工厂方法(常用)

runAsync()** 无返回值**​

// 使用默认线程池
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 使用自定义线程池(推荐)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) 

runAsync()方法的参数是Runnable接口,这是一个函数式接口,不允许返回值。当需要异步操作且不关心返回结果的时候可以使用**runAsync()**方法。

// 例子
public static void main(String[] args) {
    // 快速创建线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        // 通过Lambda表达式实现Runnable接口
        CompletableFuture.runAsync(()-> System.out.println("获取商品基本信息成功"), executor).get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}

supplyAsync()** 有返回值**​

// 使用默认线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 使用自定义线程池(推荐)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

supplyAsync()方法的参数是Supplier<U>供给型接口(无参有返回值),这也是一个函数式接口,**U是返回结果值的类型当需要异步操作且关心返回结果的时候,可以使用supplyAsync()**方法。

// 例子
public static void main(String[] args) {
  // 快速创建线程池
  ExecutorService executor = Executors.newFixedThreadPool(4);
  try {
    // 通过Lambda表达式实现执行内容,并返回结果通过CompletableFuture接收
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
      System.out.println("获取商品信息成功");
      return "信息";
    }, executor);
    // 输出结果
    System.out.println(completableFuture.get());
  } catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
  }finally {
    executor.shutdown();
  }
}  

在没有指定第二个参数(即没有指定线程池)时,CompletableFuture直接使用默认的ForkJoinPool.commonPool()作为它的线程池执行异步代码。

在实际生产中会使用自定义的线程池来执行异步代码
`

2、 获得异步执行结果

  1. get()** 阻塞式获取执行结果**​
public T get() throws InterruptedException, ExecutionException

该方法调用后如果任务还没完成则会阻塞等待直到任务执行完成。如果任务执行过程发生异常则会抛出ExecutionException异常,如果阻塞等待过程中被中断则会抛出InterruptedException异常。

  1. get(long timeout, TimeUnit unit)** 带超时的阻塞式获取执行结果**​
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException

该方法调用后如果如果任务还没完成则会阻塞等待直到任务执行完成或者超出timeout时间,如果阻塞等待过程中超时则会抛出TimeoutException异常。

  1. getNow(T valueIfAbsent)** 立刻获取执行结果**​
public T getNow(T valueIfAbsent)

该方法调用后,会立刻获取结果不会阻塞等待。如果任务完成则直接返回执行完成后的结果,如果任务没有完成,则返回调用方法时传入的参数valueIfAbsent值。

  1. join()** 不抛异常的阻塞时获取执行结果**​
public T join()

该方法和get()方法作用一样,只是不会抛出异常

  1. complete(T value)** 主动触发计算,返回异步是否执行完毕**​
public boolean complete(T value)

该方法调用后,会主动触发计算结果,如果此时异步执行并没有完成(此时boolean值返回true),则通过get()拿到的数据会是complete()设置的参数value值,如果此时异步执行已经完成(此时boolean值返回false),则通过get()拿到的就是执行完成的结果。

// 例子
public static void main(String[] args) {
    // 快速创建线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        // 通过Lambda表达式实现执行内容,并返回结果通过CompletableFuture接收
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            // 休眠2秒,使得异步执行变慢,会导致主动触发计算先执行,此时返回的get就是555
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            return 666;
        }, executor);
        // 主动触发计算,判断异步执行是否完成
        System.out.println(completableFuture.complete(555));
        // 输出结果
        System.out.println(completableFuture.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}

/**
输出结果:
    true
    555
**/

`

3、 对执行结果进行处理

whenComplete** 等待前面任务执行完再执行当前处理**​

public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action)

会在之前任务执行完成后继续执行whenComplete里的内容(whenComplete传入的action只是对之前任务的结果进行处理),即使用该方法可以避免前面说到的Future接口的问题,不再需要通过阻塞或者轮询的方式去获取结果,而是通过调用该方法等任务执行完毕自动调用。

该方法的参数为BiConsumer<? super T, ? super Throwable> action消费者接口,可以接收两个参数,一个是任务执行完的结果,一个是执行任务时的异常

// 例子
public static void main(String[] args) {
    // 快速创建线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture.supplyAsync(() -> 666, executor)
                .whenComplete((res, ex) -> System.out.println("任务执行完毕,结果为" + res + " 异常为" + ex)
                );
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}

/**
输出结果:
    任务执行完毕,结果为666 异常为null
**/

除了上述的方法外,还有一些类似的方法如**XXXAsync()或者是XXXAsync(XX,Executor executor)**,对于这些方法,这里统一说明,后续文章中将不会再列举

public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action)

public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action, Executor executor)

XXXAsync():表示上一个任务执行完成后,不会再使用之前任务中的线程,而是重新使用从默认线程(ForkJoinPool 线程池)中重新获取新的线程执行当前任务

XXXAsync(XX,Executor executor):表示不会沿用之前任务的线程,而是使用自己第二个参数指定的线程池重新获取线程执行当前任务
`

4、对执行结果进行消费

  1. thenRun** 前面任务执行完后执行当前任务,不关心前面任务的结果,也没返回值**​
public CompletableFuture<Void> thenRun(Runnable action)

CompletableFuture.supplyAsync(actionA).thenRun(actionB)像这样链式调用该方法表示:执行任务A完成后接着执行任务B,但是任务B不需要A的结果,并且执行完任务B也不会返回结果

thenRun(Runnable action)的参数为Runnable接口即没有传入参数

// 例子
public static void main(String[] args) {
  // 快速创建线程池
  ExecutorService executor = Executors.newFixedThreadPool(4);
  try {
    CompletableFuture.supplyAsync(() -> 666, executor)
                    .thenRun(() -> System.out.println("我都没有参数怎么拿到之前的结果,我也没有返回值。")
                );
  } catch (Exception e) {
    e.printStackTrace();
  }finally {
    executor.shutdown();
  }
}

/**
输出结果:
    我都没有参数怎么拿到之前的结果,我也没有返回值。
**/
  1. thenAccept** 前面任务执行完后执行当前任务,消费前面的结果,没有返回值**​
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)

CompletableFuture.supplyAsync(actionA).thenRun(actionB)像这样链式调用该方法表示:执行任务A完成后接着执行任务B,而且任务B需要A的结果,但是执行完任务B不会返回结果

thenAccept(Consumer<? super T> action)的参数为消费者接口,即可以传入一个参数,该参数为上一个任务的执行结果。

// 例子
public static void main(String[] args) {
    // 快速创建线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture.supplyAsync(() -> 666, executor)
                .thenAccept((res) -> System.out.println("我能拿到上一个的结果" + res + ",但是我没法传出去。")
                );
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}

/**
输出结果:
    我能拿到上一个的结果666,但是我没法传出去。
**/
  1. thenApply** 前面任务执行完后执行当前任务,消费前面的结果,具有返回值**​
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)

CompletableFuture.supplyAsync(actionA).thenRun(actionB)像这样链式调用该方法表示:执行任务A完成后接着执行任务B,而且任务B需要A的结果,并且执行完任务B需要有返回结果

thenApply(Function<? super T,? extends U> fn)的参数为函数式接口,即可以传入一个参数类型为T,该参数是上一个任务的执行结果,并且函数式接口需要有返回值,类型为U。

// 例子
public static void main(String[] args) {
    // 快速创建线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture.supplyAsync(() -> 666, executor)
                .thenApply((res) -> {
                        System.out.println("我能拿到上一个的结果" + res + "并且我要将结果传出去");
                        return res;
                    }
                ).whenComplete((res, ex) -> System.out.println("结果" + res));
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}
/**
输出结果:
    我能拿到上一个的结果666并且我要将结果传出去
    结果666
**/

参数不同:whenComplete方法需要传入一个BiConsumer参数,接收当前任务的执行结果和异常信息;而thenAccept方法需要传入一个Consumer参数,仅接收当前任务的执行结果。

返回值不同:whenComplete方法没有返回值,仅用于处理任务执行完毕后的回调操作;while thenAccept方法返回一个CompletableFuture对象,可以继续进行链式操作。

用途不同:whenComplete方法通常用于记录日志、清理资源、发送消息等无需返回结果的操作;而thenAccept方法通常用于接收并处理任务的执行结果,进一步进行数据处理或输出。
`

5、异常处理

  1. exceptionally** 异常捕获,只消费前面任务中出现的异常信息,具有返回值**​
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

可以通过链式调用该方法来获取异常信息,并且具有返回值。如果某一个任务出现异常被exceptionally捕获到则剩余的任务将不会再执行。类似于Java异常处理的catch。

exceptionally(Function<Throwable, ? extends T> fn)的参数是函数式接口,具有一个参数以及返回值,该参数为前面任务的异常信息。

// 例子
public static void main(String[] args) {
    // 快速创建线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture.supplyAsync(() -> {
                    if (Math.random() < 0.5) throw new RuntimeException("error");
                    return 666;
                }, executor)
                .thenApply((res) -> {
                    System.out.println("不出现异常,结果为" + res);
                    return res;
                }).exceptionally((ex) -> {
                    ex.printStackTrace();
                    return -1;
                });
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}

/**
输出结果:
// 这是不出现异常的情况
不出现异常,结果为666

// 这是出现异常的情况
java.util.concurrent.CompletionException: java.lang.RuntimeException: error
        at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
        at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: error
        at com.xqsr.review.thread.ThreadTest.lambda$main$0(ThreadTest.java:15)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        ... 3 more
**/
  1. handle** 异常处理,消费前面的结果及异常信息,具有返回值,不会中断后续任务**​
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)

可以通过链式调用该方法可以跟thenApply()一样可以消费前面任务的结果并完成自己任务内容,并且具有返回值。不同之处在于出现异常也可以接着往下执行,根据异常参数做进一步处理。

handle(BiFunction<? super T, Throwable, ? extends U> fn)的参数是消费者接口,一个参数是任务执行结果,一个是异常信息,并且具有返回值。

// 例子
public static void main(String[] args) {
    // 快速创建线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture.supplyAsync(() -> 666, executor)
                .thenApply((res) -> {
                    if (Math.random() < 0.5) throw new RuntimeException("error");
                    return res;
                }).handle((res, ex) -> {
                    System.out.println("结果" + res + "(null表示之前出现异常导致结果无法传过来)");
                    return res == null ? -1 : res;
                }).thenApply((res) -> {
                    System.out.println("结果为" + res + "(-1表示之前出现异常,经过handler使得结果处理成-1)");
                    return res;
                }).exceptionally((ex) -> {
                    ex.printStackTrace();
                    return -1;
                });
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}

/**
输出结果:
// 这是不出现异常的情况
结果666(null表示之前出现异常导致结果无法传过来)
结果为666(-1表示之前出现异常,经过handler使得结果处理成-1)

// 这是出现异常的情况
结果null(null表示之前出现异常导致结果无法传过来)
结果为-1(-1表示之前出现异常,经过handler使得结果处理成-1)
**/

可以看到通过handle类似于Java异常处理的finally,出现异常并不会像使用exceptionally那样中断后续的任务,而是继续执行,可以通过handle为之前出现异常无法获得的结果重新赋值(根据业务需求设置安全值之类的)。
`

6、 两组任务按顺序执行

thenCompose** 实现两组任务按前后顺序执行**​

public <U> CompletableFuture<U> thenCompose(
    Function<? super T, ? extends CompletionStage<U>> fn)

A.thenCompose(B)相当于任务A要排在任务B前面,即顺序的执行任务A、任务B。该方法的参数是函数式接口,函数式接口的参数是调用者的执行结果,返回值是另一个任务B。

public static void main(String[] args) {
    // 快速创建线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务A先执行结果为666");
            return 666;
        }, executor);

        actionA.thenCompose((res) ->  CompletableFuture.supplyAsync(() -> {
            System.out.println("任务B后执行结果加上333");
            return 333 + res;
        })).whenComplete((res, ex) -> System.out.println(res));
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}

/**
输出结果:
    任务A先执行结果为666
    任务B后执行结果加上333
    999
**/

`

7、 两组任务谁快用谁

applyToEither** 比较两组任务执行速度,谁快消费谁的执行结果**​

public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn)

该方法用于比较两组任务的执行速度,谁先执行完就用谁的执行结果

传入参数说明:第一个参数传入的是另一个任务的执行内容,第二个参数传入的是最终这两个任务谁快返回谁的结果,并通过当前函数式接口进行接收和处理(使用函数式接口,有参且有返回值)。

// 例子
public static void main(String[] args) {
    // 快速创建线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("任务A等待久一点,执行结果为555");
            return 555;
        }, executor);

        actionA.applyToEither(CompletableFuture.supplyAsync(() -> {
            System.out.println("任务B很快,执行结果为666");
            return 666;
        }), (res) -> {
            System.out.println("最终使用的执行结果为" + res);
            return res;
        });
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}
/**
输出结果:
    任务B很快,执行结果为666
    最终使用的执行结果为666
    任务A等待久一点,执行结果为555
**/

除了applyToEither对任务最终结果进行获取并消费,并且具有返回值的方法外,还有两个类似的方法。

// 这个方法效果和上面的一样,比谁快拿谁的结果,不同的是这个方法只消费不具有返回值
public CompletableFuture<Void> acceptEither(
        CompletionStage<? extends T> other, Consumer<? super T> action)
// 这个方法效果和上面的一样,比谁快拿谁的结果,不同的是这个方法不消费结果也不具有返回值
public CompletableFuture<Void> runAfterEither(
        CompletionStage<?> other, Runnable action)

`

8、 两组任务完成后合并

thenCombine** 等待两组任务执行完毕后,合并两组任务的执行结果**​

 public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn)

该方法用于两组任务都完成后,将两组任务的执行结果一起交给当前方法的BiFunction处理。先完成的任务会等待后者任务完成。

传入参数说明:第一个参数传入的是另一个任务的执行内容,第二个参数传入的是带两个参数的函数式接口(第一个参数是任务1的执行结果,第二个参数是任务2的执行结果,具有返回值)。

// 例子
public static void main(String[] args) {
  // 快速创建线程池
  ExecutorService executor = Executors.newFixedThreadPool(4);
  try {
    CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {
      try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
      System.out.println("任务A等待久一点,执行结果为333");
      return 333;
    }, executor);

    CompletableFuture<Integer> actionB = CompletableFuture.supplyAsync(() -> {
      System.out.println("任务B很快,执行结果为666");
      return 666;
    }, executor);

    actionA.thenCombine(actionB, (res1, res2) -> {
      System.out.println("最终使用的执行结果为" + (res1 + res2));
      return res1 + res2;
    });
  } catch (Exception e) {
    e.printStackTrace();
  }finally {
    executor.shutdown();
  }
}

/**
输出结果:
    任务B很快,执行结果为666
    任务A等待久一点,执行结果为333
    最终使用的执行结果为999
**/

除了thenCombine对任务最终结果进行获取并消费,并且具有返回值的方法外,还有两个类似的方法。

// 这个方法效果和上面的一样,获取合并结果,不同的是这个方法只消费不具有返回值
public <U> CompletableFuture<Void> thenAcceptBoth(
  CompletionStage<? extends U> other,
  BiConsumer<? super T, ? super U> action)
// 这个方法效果和上面的一样,获取合并结果,不同的是这个方法不消费结果也不具有返回值
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                                Runnable action)

`

9、 多任务组合

  1. allOf** 实现并行地执行多个任务,等待所有任务执行完成(无需考虑执行顺序)** ​
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

该方法可以实现并行地执行多个任务,适用于多个任务没有依赖关系,可以互相独立执行的,传入参数为多个任务,没有返回值。

allOf()方法会等待所有的任务执行完毕再返回,可以通过get()阻塞确保所有任务执行完毕

// 例子
public static void main(String[] args) {
    // 快速创建线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture<Void> actionA = CompletableFuture.runAsync(() -> {
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("任务A等待2秒后执行完毕");
        }, executor);

        CompletableFuture<Void> actionB = CompletableFuture.runAsync(() -> {
            System.out.println("任务B很快执行完毕");
        }, executor);

        CompletableFuture<Void> actionC = CompletableFuture.runAsync(() -> {
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("任务C等待1秒后执行完毕");
        }, executor);

        CompletableFuture<Void> actionD = CompletableFuture.runAsync(() -> {
            try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("任务D等待5秒后执行完毕");
        }, executor);

        CompletableFuture.allOf(actionA, actionB, actionC, actionD).get();
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}

/**
输出结果:
    任务B很快执行完毕
    任务C等待1秒后执行完毕
    任务A等待2秒后执行完毕
    任务D等待5秒后执行完毕
**/
  1. anyOf** 实现并行地执行多个任务,只要有个一个完成的便会返回执行结果**​
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

该方法可以实现并行地执行多个任务,传入参数为多个任务,具有返回值。该方法不会等待所有任务执行完成后再返回结果,而是当有一个任务完成时,便会返回那个任务的执行结果

// 例子
public static void main(String[] args) {
    // 快速创建线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("任务A等待2秒后执行完毕");
            return 555;
        }, executor);

        CompletableFuture<Integer> actionB = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务B很快执行完毕");
            return 666;
        }, executor);

        CompletableFuture<Integer> actionC = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("任务C等待1秒后执行完毕");
            return 999;
        }, executor);

        CompletableFuture<Integer> actionD = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("任务D等待5秒后执行完毕");
            return 888;
        }, executor);

        System.out.println("最先执行完的返回结果为" + CompletableFuture.anyOf(actionA, actionB, actionC, actionD).get());
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}

/**
输出结果:
    任务B很快执行完毕
    最先执行完的返回结果为666
    任务C等待1秒后执行完毕
    任务A等待2秒后执行完毕
    任务D等待5秒后执行完毕
**/

`

二、一个使用CompletableFuture异步编排的例子

不需要关心例子中的业务内容,使用时按照自己业务的需求,对不同的需求调用不同API即可。

编写任务时主要关心以下几点:

① 是否需要消费之前任务的结果

② 是否需要返回结果给其他任务消费

③ 是否要求顺序执行(是否允许并行,有没有前置要求)

/**
 * 该方法用于获取单个商品的所有信息
 * 1. 商品的基本信息
 * 2. 商品的图片信息
 * 3. 商品的销售属性组合
 * 4. 商品的各种分类基本信息
 * 5. 商品的促销信息
 */
@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
  // 创建商品Vo通过各个任务去完善Vo的信息
  SkuItemVo skuItemVo = new SkuItemVo();
  
  // 获取商品基本信息 查询到后设置进Vo中,返回基本信息给后续任务消费 (使用自定义的线程池进行异步)
  CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
    SkuInfoEntity info = this.getById(skuId);
    skuItemVo.setInfo(info);
    return info;
  }, executor);

  // 获取商品的图片信息 获取后设置进Vo中,此处不需要消费图片信息,也不需要返回结果。所以使用runAsync即可
  CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
    List<SkuImagesEntity> imagesEntities = skuImagesService.getImagesBySkuId(skuId);
    skuItemVo.setImages(imagesEntities);
  }, executor);

  // 获取商品销售属性 因为要利用之前查询到的基本信息,但后续任务不需要消费销售属性(不需要返回结果),所以使用thenAcceptAsync消费之前的基本信息,不返回销售信息。
  CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> {
    List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId());
    skuItemVo.setSaleAttr(saleAttrVos);
  }, executor);

  // 获取商品各分类基本信息,同样要消费之前的基本信息,但无需返回,所以使用thenAcceptAsync即可
  CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
    SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
    skuItemVo.setDesc(spuInfoDescEntity);
  }, executor);

  // 获取商品的促销信息 这个也不需要消费之前任务的结果,也不需要返回结果。所以直接使用runAsync即可
  CompletableFuture<Void> seckillFuture = CompletableFuture.runAsync(() -> {
    R skuSeckilInfo = seckillFeignService.getSkuSeckilInfo(skuId);
    if (skuSeckilInfo.getCode() == 0) {
      SeckillSkuVo seckilInfoData = skuSeckilInfo.getData("data", new TypeReference<SeckillSkuVo>() {
      });
      skuItemVo.setSeckillSkuVo(seckilInfoData);

      if (seckilInfoData != null) {
        long currentTime = System.currentTimeMillis();
        if (currentTime > seckilInfoData.getEndTime()) {
          skuItemVo.setSeckillSkuVo(null);
        }
      }
    }
  }, executor);

  // 使用allOf()组合所有任务,并且使用get()阻塞,等待所有任务完成。
        // 补充:infoFuture不能放入allOf中,因为allOf是并行无序执行(需要多个条件是无依赖性的)的,当上面任务中有需要消费infoFuture的结果,所以需要先执行infoFuture。
  CompletableFuture.allOf(imageFuture,saleAttrFuture,descFuture,seckillFuture).get();
  
  // 最后返回商品Vo
  return skuItemVo;
}

手动事务的方式进行回滚,可以做一个List存储每个任务的事务,然后通过exceptionally对每个任务出错时进行异常处理抛出异常,最后在allof方法处通过try-catch的方式进行每个事务的回滚。具体代码如下图(A任务执行修改操作,B任务故意报错,已经测试过了,可以做到事务回滚)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0UMtEEmu-1688700220534)(image/image_VT6WYOovi3.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NlbuqvYM-1688700220534)(image/image_1FXC9NphNP.png)]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/727140.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【算法】区间合并类题目总结

文章目录 重叠区间&#xff1a;452. 用最少数量的箭引爆气球解法1——左边界排序解法2——右边界排序 无重叠区间&#xff1a;435. 无重叠区间解法1——左边界排序解法2——右边界排序 合并区间&#xff1a;56. 合并区间左边界排序这题为什么不能按照右边界排序&#xff1f;其实…

【数据结构与算法】图课后习题

题目 下面一共有七道有关图的课后习题&#xff0c;全部都是思路画图题并不是算法设计题故在此就一起列举出来了~ 1. 已知如下图所示的有向图&#xff0c;请回答下面几个问题 每个顶点的入/出度&#xff1b;邻接矩阵&#xff1b;邻接表&#xff1b;逆邻接表&#xff1b;强连通…

Hugging Face应用——图像识别

利用人工智能解决音频、视觉和语言问题。音频分类、图像分类、物体检测、问答、总结、文本分类、翻译等均有大量模型进行参考。 Eg1: 图像识别 图像分类是为整个图像分配标签或类别的任务。每张图像预计只有一个类别。图像分类模型将图像作为输入并返回有关图像所属类别的预测…

OPPO手机上怎么设置阴历或阳历生日提醒?

有不少手机用户现在使用的都是OPPO这个品牌的手机&#xff0c;并且绝大多数用户都表示OPPO手机是比较好用的&#xff0c;不过也有一部分用户在使用手机的过程中遇到了一些问题&#xff0c;例如不知道在OPPO手机上怎么设置阴历或阳历生日提醒&#xff0c;这应该怎么办呢&#xf…

基于matlab开发和评估停车场场景中的视觉定位算法(附源码)

一、前言 本示例展示了如何使用虚幻引擎模拟环境中的合成图像数据开发视觉定位系统。 获取基本事实以评估定位算法在不同条件下的性能是一项具有挑战性的任务。与使用高精度惯性导航系统或差分GPS等更昂贵的方法相比&#xff0c;不同场景下的虚拟仿真是一种经济高效的方法来获…

数字化时代,到底如何认识商业智能BI?

数字化时代&#xff0c;商业智能BI对于企业的落地应用有着巨大价值&#xff0c;逐渐成为了现代企业信息化、数字化转型中的基础建设。 我曾经看到有人在讨论过商业智能BI的部署对于企业是否有实际意义&#xff0c;现在市场的数据已经证明商业智能BI在商业世界中&#xff0c;在…

使用Docker安装RabbitMQ并实现入门案例“Hello World”

RabbitMQ官方文档&#xff1a;RabbitMQ Tutorials — RabbitMQ 一、RabbitMQ安装&#xff08;Linux下&#xff09; 你可以选择原始的方式安装配置&#xff0c;也可以使用docker进行安装&#xff0c;方便快捷&#xff01; 1. 安装docker 没有docker的先安装一下docker&#x…

谷歌和edge浏览器升级到94及以上版本后反复提示安装pageoffice客户端

原因&#xff1a;Chrome开发团队以网络安全为由&#xff0c;强推ssl证书&#xff0c;希望所有部署在公网的网站&#xff0c;全部改用https访问&#xff0c;所以最新的谷歌和edge升级到94版本后对公网上的http请求下的非同域的http请求进行了拦截&#xff0c;于是就出现了目前遇…

一分钟告诉你国内和国外的ai绘画软件哪个好

前几天&#xff0c;我在一次聚会上偶然听到朋友们谈论起创作ai绘画的问题&#xff0c;大家都很热衷于用国内的ai绘画软件来生成自己喜欢的艺术作品&#xff0c;但又不知道国内和国外的ai绘画软件哪个好。正当我们陷入无尽的思考中时&#xff0c;其中一位朋友突然站出来说&#…

【计算机网络】1.5——计算机网络的体系结构

计算机网络的体系结构 概述 计算机网络的体系结构是计算机网络及其构建所应完成功能的精确定义 考题 不属于网络体系结构所描述的内容的是 A、网络的层次 B、每层使用的协议 C、协议的内部实现细节 D、每层必须完成的功能 这些功能的「实现细节」&#xff0c;是遵守这种体系…

SPEC CPU 2017 Ubuntu 20.04 LTS cpu2017-1_0_5.iso 安装、测试 单核成绩 笔记

环境 $ gcc -v Using built-in specs. COLLECT_GCCgcc COLLECT_LTO_WRAPPER/usr/lib/gcc/x86_64-linux-gnu/11/lto-wrapper OFFLOAD_TARGET_NAMESnvptx-none:amdgcn-amdhsa OFFLOAD_TARGET_DEFAULT1 Target: x86_64-linux-gnu Configured with: ../src/configure -v --with-pk…

vue3中的computed和watch

一、computed 1. vue2和vue3中计算属性用法对比 Vue2中的计算属性 Vue2中的计算属性是通过在Vue实例的computed选项中定义函数来创建的。计算属性会根据依赖的响应式属性进行缓存&#xff0c;只有当依赖的属性发生变化时&#xff0c;计算属性才会重新求值。 举个例子&#x…

【环境配置】Conda报错 requests.exceptions.HTTPError

问题&#xff1a; conda 创建新的虚拟环境时报错 Collecting package metadata (current_repodata.json): done Solving environment: done# >>>>>>>>>>>>>>>>>>>>>> ERROR REPORT <<<<<<…

OpenCVForUnity(二)基本图像容器Mat

这里写目录标题 前言Mat指针引用说明存储的方式如何创建一个Mat对像 前言 今天继续学习OpenCV的基本单位Mat. 学计算机的同学都知道在计算机中,你所看到的一切其都是数据的呈现.期最底层的本质皆是0和1的构成的.当然图片,视频等等也不例外.我们用相机,扫描仪核磁共振成像等方式…

OpenAI深夜放大招,GPT4 API全面开放并弃用一系列旧模型

GPT-4 API 现已向所有付费 OpenAI API 客户开放。GPT-3.5 Turbo、DALLE 和 Whisper API 现已普遍可用&#xff0c;我们宣布了一些旧型号的弃用计划&#xff0c;这些型号将于 2024 年初退役。 ✅ GPT4 API面向付费用户开放&#xff0c;不需要再额外申请,并且具有8K上下文&#…

bash文件输入到txt文件中

bash test_bct.sh >> test.txt结果如下

WeeChat 4.0.0 正式发布

导读WeeChat (Wee Enhanced Environment for Chat) 是一款自由开源的轻量级 IRC 客户端&#xff0c;具有高度的可定制特性&#xff0c;并且可以通过脚本进行扩展。 WeeChat 支持大多数的平台和操作系统&#xff0c;例如 Linux、BSD、macOS、Debian GNU/Hurd、HP-UX、Solaris、…

全国产化适配低代码平台,政企数字化的不二选择

编者按&#xff1a;在国家政策及战略方向的指导下&#xff0c;信创产业已成为奠定中国未来发展的重要数字基础&#xff0c;而国产化则可以解决核心技术关键被“卡脖子”的问题。另一方面&#xff0c;低代码平台能够为企业加速交付业务应用&#xff0c;降低运营成本&#xff0c;…

插入排序(思路+代码)

变量&#xff1a; index &#xff1a;代表待插入数的前一个数的下标&#xff0c;依次往回找&#xff0c;找到找到结果。 indexvalue&#xff1a;代表待插入元素的值&#xff0c;找到位置之后往index1的位置插入元素 代码&#xff1a; import java.util.Arrays;public class …

【库表操作】

一、数据库Market中创建表customers 1、创建数据库 #创建数据库 mysql> create database Market; mysql> use Market;2、创建数据表 #创建数据表 mysql> create table customers(-> c_num int(11) primary key auto_increment,-> c_name varchar(50),-> c_…