CompletableFuture异步回调

news2024/11/17 1:52:15

CompletableFuture异步回调

    • CompletableFuture
      • Future模式
      • CompletableFuture详解
        • 1.CompletableFuture的UML类关系
        • 2.CompletionStage接口
        • 3.使用runAsync和supplyAcync创建子任务
        • 4.设置子任务回调钩子
        • 5.调用handle()方法统一处理异常和结果
        • 6.线程池的使用
      • 异步任务的串行执行
        • thenApply()方法
        • thenRun()方法
        • thenAccept()方法
        • thenCompose()方法
        • 四个任务串行方法的区别
      • 异步任务的合并执行
        • thenCombine()方法
        • runAfterBoth()方法
        • thenAcceptBoth()方法
        • allOf()等待所有的任务结束
      • 泡茶喝实例

CompletableFuture

Future模式

  Future模式是高并发设计与开发过程中常见的设计模式,它的核心思想是异步调用。对于Future模式来说,它不是立即返回我们所需要的数据,但是它会返回一个契约(或异步任务),将来我们可以凭借这个契约(或异步任务)获取需要的结果。

  在进行传统的RPC(远程调用)时,同步调用RPC是一段耗时的过程。当客户端发出RPC请求后,服务端完成请求处理需要很长的一段时间才会返回,这个过程中客户端一直在等待,直到数据返回后,再进行其他任务的处理。现有一个Client同步对三个Server分别进行一次RPC调用,具体如下图:

image-20231004124526298

  假设一次远程调用的时间为500毫秒,则一个Client同步对三个Server分别进行一次RPC调用的总时间需要耗费1500毫秒。可以使用Future模式对其进行改造,将同步的RPC调用改为异步并发的RPC调用,一个Client异步并发对三个Server分别进行一次RPC调用,如下图:

image-20231004124912843

  假设一次远程调用的时间为500毫秒,则一个Client异步并发对三个Server分别进行一次RPC调用的总时间只要耗费500毫秒。使用Future模式异步并发地进行RPC调用,客户端在得到一个RPC地返回结果前并不急于获取该结果,而是充分利用等待时间去执行其他地耗时操作(如其他RPC调用),这就是Future模式地核心所在。

  Java的Future实现类并没有支持异步回调,仍然需要主动获取耗时任务的结果,而Java8的CompletableFuture组件实现了异步回调模式。

  在Java中,Future只是一个泛型接口,位于java.util.concurrent包下,其中定义了5个方法,主要包括如下几个功能:

  • 取消异步执行中的任务
  • 判断任务是否被取消
  • 判断异步任务是否执行完成
  • 获取异步任务完成后的执行结果
public interface Future<V> {
  //取消异步执行
  boolean cancel(boolean mayInterruptIfRunning);
  //获取异步任务的取消状态
  boolean isCancelled();
  //判断异步任务是否执行完成
  boolean isDone();
  //获取异步任务完成后的执行结果
  V get() throws InterruptedException, ExecutionException;
  //设置时限,获取异步任务完成后的执行结果
  V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

  总体来说,Future是一个对异步任务进行交互、操作的接口。但是Future仅仅是一个接口,通过它没有办法直接对异步任务操作,JDK提供了一个默认的实现类–FutureTask。

CompletableFuture详解

  CompletableFuture是JDK1.8引入的实现类,该类实现了FutureCompletionStage两个接口。该类的实例作为一个异步任务,可以在自己异步执行完成之后触发一些其他的异步任务,从而达到异步回调的效果。

1.CompletableFuture的UML类关系

image-20231004180648069

  这里的CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会进入另一个阶段。一个阶段可以理解为一个子任务,每一个子任务会包装一个Java函数式接口实例,表示该子任务所要执行的操作。

2.CompletionStage接口

  CompletionStage代表某个同步或异步计算的一个阶段,或者一系列异步任务中的一个子任务(或者阶段性任务)。

  每个CompletionStage子任务所包装的可以是一个FunctionConsumer或者Runnable函数式接口实例。这三个常用的函数式接口的特点如下:

  (1)Function

  Function接口的特点是:有输入、有输出。包装了Function实例的CompletionStage子任务需要一个输入参数,并会产生一个输出结果到下一步。

  (2)Runnable

  Runnable接口的特点是:无输入、无输出。包装了Runnable实例的CompletionStage子任务既不需要任何输入参数,又不会产生任何输出。

  (3)Consumer

  Consumer接口的特点是:有输入、无输出。包装了Consumer实例的CompletinStage子任务需要一个输入参数,但不会产生任何输出。

  多个CompletionStage构成了一条任务流水线,一个环节执行完成了可以将结果移交给下一个环节(子任务)。多个CompletionStage子任务之间可以使用链式调用,下面是一个例子:

oneStage.thenApply(x->square(x))
  								.thenAccept(y->System.out.println(y))
  								.thenRun(()->System.out.println())

对上例的解释如下:

oneStage是一个CompletionStage子任务,这是一个前提。

x->square(x)是一个Function类型的Lambda表达式,被thenApply()方法包装成了一个CompletionStage子任务,该子任务需要接收一个参数x,然后输出一个结果----x的平方值。

y->System.out.println(y)是一个Consumer类型的Lambda表达式,被thenAccept()方法包装成了一个CompletionStage子任务,该子任务需要消耗上一个子任务的输出值,但是此子任务并没有输出。

()->System.out.println()是一个Runnable类型的Lambda表达式,被thenRun()方法包装成了一个CompletionStage子任务,既不消耗上一个子任务的输出,又不产生结果。

  CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另一个阶段。虽然一个子任务可以触发其他子任务,但是并不能保证后续子任务的执行顺序。

3.使用runAsync和supplyAcync创建子任务

  CompletionStage子任务的创建是通过CompletebleFuture完成的。CompletableFuture类提供了非常强大的Future的扩展功能来帮助我们简化异步编程的复杂性,提供了函数式编程的能力来帮我们通过回调的方式处理计算结果,也提供了转换和组合CompletionStage()的方法。

  CompletableFuture定义了一组方法用于创建CompletionStage子任务(或者阶段性任务),基础的方法如下:

  	//子任务包装一个Supplier实例,并调用ForkJoinPool.commonPool()线程来执行
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
   //子任务包装一个Supplier实例,并使用指定的executor线程池来执行
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }
		//子任务包装一个Runnable实例,并调用ForkJoinPool.commonPool()线程来执行
    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }
		//子任务包装一个Runnable实例,并使用指定的executor线程池来执行
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }

  在使用CompletableFuture创建CompletionStage子任务时,如果没有指定Executor线程池,在默认情况下CompletionStage会使用公共的ForkJoinPool线程池。

  两个创建CompletionStage子任务的示例如下:

public class CompletableFutureDemo {
    //创建一个无消耗值(无输入值)、无返回值的异步子任务
    public static void runAsyncDemo() throws Exception {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000);
                System.out.println("run end...");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        //等待异步任务执行完成,限时等待2秒
        future.get(2, TimeUnit.SECONDS);
    }

    //创建一个无消耗值(无输入值)、有返回值的异步子任务
    public static void supplyAsyncDemo() throws Exception {
        CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
            long start = System.currentTimeMillis();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("run end...");
            return System.currentTimeMillis() - start;
        });
        //等待异步任务执行完成,限时等待2秒
        long time = future.get(2, TimeUnit.SECONDS);
        System.out.println("异步执行耗时(秒):" + time / 1000);
    }

    public static void main(String[] args) throws Exception {
        runAsyncDemo();
        supplyAsyncDemo();
    }
}
4.设置子任务回调钩子

  可以为CompletionStage子任务设置特定的回调钩子,当计算结果完成或者抛出异常的时候,执行这些特定的回调钩子。

  设置子任务回调钩子的主要函数如下:

//设置子任务完成时的回调钩子
public CompletableFuture<T> whenComplete(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(null, action);
}
//设置子任务完成时的回调钩子,可能不在同一线程执行
public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(asyncPool, action);
}
//设置子任务完成时的回调钩子,提交给线程池executor执行
public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action, Executor executor) {
    return uniWhenCompleteStage(screenExecutor(executor), action);
}
//设置异常处理的回调钩子
public CompletableFuture<T> exceptionally(
        Function<Throwable, ? extends T> fn) {
        return uniExceptionallyStage(fn);
}

下面是一个CompletionStage子任务设置完成钩子和异常钩子的简单示例:

public class CompletableFutureDemo1 {
    public static void whenCompleteDemo() throws Exception {
        //创建异步任务
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                //模拟执行一秒
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName()+":抛出异常");
            throw new RuntimeException(Thread.currentThread().getName()+":发生异常");
        });
        //设置异步任务执行完成后的回调钩子
        future.whenComplete(new BiConsumer<Void, Throwable>() {
            @Override
            public void accept(Void unused, Throwable throwable) {
                System.out.println(Thread.currentThread().getName()+":执行完成!");
            }
        });

        //设置异步任务发生异常后的回调钩子
        future.exceptionally(new Function<Throwable, Void>() {
            @Override
            public Void apply(Throwable throwable) {
                System.out.println(Thread.currentThread().getName()+":执行失败!" + throwable.getMessage());
                return null;
            }
        });
        //获取异步任务的结果
        future.get();
    }

    public static void main(String[] args) throws Exception {
        whenCompleteDemo();
    }
}

image-20231004193257898

  调用cancel()方法取消CompletableFuture时,任务被视为异常完成,completeExceptionally()方法所设置的异常回调钩子也会被执行。

  如果没有设置异常回调钩子,发生内部异常时会有两种情况发生:

  (1)在调用get()和get(long,TimeUnit)方法启动任务时,如果遇到内部异常,get()方法就会抛出ExecutionException(执行异常)。

  (2)在调用join()和getNow(T)启动任务时(大多数情况下都是如此),如果遇到内部异常,join()和getNow(T)方法就会抛出CompletionException。

5.调用handle()方法统一处理异常和结果

  除了分别通过whenComplete、exceptionally设置完成钩子、异常钩子之外,还可以调用handle()方法统一处理结果和异常。

  handle方法有三个重载版本如下:

//在执行任务的同一个线程中处理异常和结果
public <U> CompletableFuture<U> handle(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(null, fn);
}
//可能不在执行任务的同一个线程中处理异常和结果
public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(asyncPool, fn);
}
//在指定线程池executor中处理异常和结果
public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
    return uniHandleStage(screenExecutor(executor), fn);
}

  handle()方法的示例代码如下:

public class CompletableDemo2 {
    public static void handleDemo() throws Exception {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                //模拟执行1秒
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName()+":抛出异常");
            throw new RuntimeException(Thread.currentThread().getName()+":发生异常");
        });

        //统一处理异常和结果
        future.handle(new BiFunction<Void, Throwable, Void>() {
            @Override
            public Void apply(Void unused, Throwable throwable) {
                if (throwable == null) {
                    System.out.println(Thread.currentThread().getName()+":没有发生异常!");
                } else {
                    System.out.println(Thread.currentThread().getName()+":sorry,发生了异常!");
                }
                return null;
            }
        });
        future.get();
    }
    public static void main(String[] args) throws Exception {
        handleDemo();
    }
}

image-20231004193956636

6.线程池的使用

  默认情况下,通过静态方法runAsync()、supplyAsync()创建的CompletableFuture任务会使用公共的ForkJoinPool线程池,默认的线程数是CPU的核数。

  问题是,如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的IO操作,就会导致线程池中的所有线程都阻塞在IO操作上,造成线程饥饿,进而影响整个系统的性能。所以,强烈建议根据不同的业务类型创建不同的线程池,以避免相互干扰

public class CompletableDemo3 {
    public static void threadPoolDemo() throws Exception {
        //混合线程池
        ExecutorService pool = Executors.newFixedThreadPool(10);
        CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName()+":run begin...");
            long start = System.currentTimeMillis();
            try {
                Thread.sleep(1000);//模拟执行1秒
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName()+":run end...");
            return System.currentTimeMillis() - start;
        }, pool);
        //等待异步任务执行完成,限时等待2秒
        Long time = future.get(2, TimeUnit.SECONDS);
        System.out.println(Thread.currentThread().getName()+":异步执行耗时(秒):" + time / 1000);
    }
    public static void main(String[] args) throws Exception {
        threadPoolDemo();
    }
}

image-20231004194419567

异步任务的串行执行

  如果两个异步任务需要串行(一个任务依赖另一个任务)执行,可以通过CompletionStage接口的thenApply()thenAccept()thenRun()thenCompose()四个方法来实现。

thenApply()方法
//后一个任务与前一个任务在同一个线程中执行
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) 
//后一个任务与前一个任务不在同一个线程中执行
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) 
//后一个任务在指定的executor线程池中执行
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

   thenApply的三个重载版本有一个共同的参数fn,该参数表示要串行执行的第二个异步任务,它的类型为Function。fn的类型声明涉及两个泛型参数,具体如下:

  • 泛型参数T:上一个任务所返回结果的类型。
  • 泛型参数U:当前任务的返回值类型。

调用thenApply分两步计算(10+10)*2:

//调用thenApply分两步计算(10+10)*2
public class ThenApplyDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
            @Override
            public Long get() {
                long firstStep = 10L + 10L;
                System.out.println(Thread.currentThread().getName()+":firstStep out is " + firstStep);
                return firstStep;
            }
        }).thenApplyAsync(new Function<Long, Long>() {
            @Override
            public Long apply(Long firstStepOut) {
                long secondStep = firstStepOut * 2;
                System.out.println(Thread.currentThread().getName()+":secondStep out is " + secondStep);
                return secondStep;
            }
        });

        Long result = future.get();
        System.out.println(Thread.currentThread().getName()+":out is " + result);
    }
}

image-20231004195110503

thenRun()方法

  thenRun()方法与thenApply()方法不同的是,不关心任务的处理结果。只要前一个任务执行完成,就开始执行后一个串行任务。

//后一个任务与前一个任务在同一个线程中执行
public CompletableFuture<Void> thenRun(Runnable action)
//后一个任务与前一个任务不在同一个线程中执行
public CompletableFuture<Void> thenRunAsync(Runnable action)
//后一个任务在指定的executor线程池中执行
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor) 

  从方法的声明可以看出,thenRun()方法同thenApply()方法类似,不同的是前一个任务处理完成后,thenRun()并不会把计算的结果传给后一个任务,而且后一个任务也没有结果输出。

thenRun系列方法中的action参数是Runnable类型的,所以thenRun()既不能接收参数也不支持返回值。

thenAccept()方法

  调用thenAccept()方法时后一个任务可以接收(或消费)前一个任务的处理结果,但是后一个任务没有结果输出。

//后一个任务与前一个任务在同一个线程中执行
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
//后一个任务与前一个任务不在同一个线程中执行
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) 
//后一个任务在指定的executor线程池中执行
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
thenCompose()方法

  thenCompose方法在功能上与thenApply()、thenAccept()、thenRun()一样,可以对两个任务进行串行的调度操作,第一个任务操作完成时,将它的结果作为参数传递给第二个任务。

public <U> CompletableFuture<U> thenCompose(
    Function<? super T, ? extends CompletionStage<U>> fn) 
public <U> CompletableFuture<U> thenComposeAsync(
    Function<? super T, ? extends CompletionStage<U>> fn) 

public <U> CompletableFuture<U> thenComposeAsync(
    Function<? super T, ? extends CompletionStage<U>> fn,
    Executor executor) 

  thenCompose()方法要求第二个任务的返回值是一个CompletionStage异步实例。因此,可以调用CompletableFuture.supplyAsync()方法将第二个任务所要调用的普通异步方法包装成一个CompletionStage实例。

  这里使用thenCompose()分两步计算(10+10)*2

public class ThenComposeDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
            @Override
            public Long get() {
                long firstStep = 10 + 10;
                System.out.println(Thread.currentThread().getName()+":first out is " + firstStep);
                return firstStep;
            }
        }).thenCompose(new Function<Long, CompletionStage<Long>>() {
            @Override
            public CompletionStage<Long> apply(Long firstStepOut) {
                //将第二个任务所要调用的普通异步方法包装成一个CompletionState异步实例
                return CompletableFuture.supplyAsync(new Supplier<Long>() {
                    //两个任务所要调用的普通异步方法
                    @Override
                    public Long get() {
                        long secondStep = firstStepOut * 2;
                        System.out.println(Thread.currentThread().getName()+":second Step out is " + secondStep);
                        return secondStep;
                    }
                });
            }
        });
        Long result = future.get();
        System.out.println(Thread.currentThread().getName()+":out is " + result);
    }
}

image-20231004200557549

四个任务串行方法的区别

  thenApply()、thenRun()、thenAccept()这三个方法的不同之处主要在于其核心参数fn、action、consumer的类型不同,分别为Function<T,R>、Runnable、Consumer<? super T>类型。

  但是,thenCompose()方法与thenApply()方法有本质的不同:

  • thenCompose()的返回值是一个新的CompletionStage实例,可以持续用来进行下一轮CompletionStage任务的调度。
  • thenApply()的返回值就是第二个任务的普通异步方法的执行结果,它的返回类型与第二不执行的普通异步方法的返回类型相同,通过thenApply()所返回的值不能进行下一轮CompletionStage链式(或者流式)调用。

异步任务的合并执行

  如果某个任务同时依赖另外两个异步任务的执行结果,就需要对另外两个异步任务进行合并。以泡茶为例,“泡茶喝”任务需要对“烧水”任务与“清洗”任务进行合并。

thenCombine()方法

  thenCombine()会在两个CompletionStage任务都执行完成后,把两个任务的结果一起交给thenCombine()来处理。

//合并代表第二步任务(参数other)的CompletionStage实例,返回第三步任务的CompletionStage
public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn)
//不一定在同一个线程中执行第三步任务的CompletionStage实例
public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn)
//第三步任务的CompletionStage实例在指定的executor线程池中执行
public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn, Executor executor)

  thenCombine()方法的调用者为第一步的CompletionStage实例,该方法的第一个参数为第二步的CompletionStage实例,该方法的返回值为第三步的CompletioStage实例。在逻辑上,thenCombine()方法的功能是将第一步、第二步的结果合并到第三步上。

  thenCombine()系列方法有两个核心参数:

  • other参数:表示待合并的第二部任务的CompletionStage实例。
  • fn参数:表示第一个任务和第二个任务执行完成后,第三步需要执行的逻辑。

  fn参数的类型为BiFunction<? super T,? super U,? extends V>,该类型的声明涉及三个泛型参数:

  • T:表示第一个任务所返回结果的类型
  • U:表示第二个任务所返回结果的类型
  • V:表示第三个任务所返回结果的类型

  调用thenCombine分三步计算(10+10)*(10+10):

//使用thenCombine()计算(10+10)*(10+10)
public class ThenCombineDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                Integer firstStep = 10 + 10;
                System.out.println(Thread.currentThread().getName()+":firstStep out is " + firstStep);
                return firstStep;
            }
        });
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                Integer secondStep = 10 + 10;
                System.out.println(Thread.currentThread().getName()+":secondStep out is " + secondStep);
                return secondStep;
            }
        });
        CompletableFuture<Integer> future3 = future1.thenCombine(future2, new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer step1Out, Integer step2Out) {
                return step1Out * step2Out;
            }
        });
        Integer result = future3.get();
        System.out.println(Thread.currentThread().getName()+":out is " + result);
    }
}

image-20231004201828595

runAfterBoth()方法

  funAfterBoth()方法不关心每一步任务的输入参数和处理结果。

//合并第二步任务的CompletionStage实例,返回第三步任务的CompletionStage
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                            Runnable action)
//不一定在同一个线程中执行第三步任务的CompletionStage实例
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action)
//第三步任务的CompletionStage实例在指定的executor线程池中执行
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action,
                                                 Executor executor) 
thenAcceptBoth()方法

  该方法是对runAfterBoth()方法和thenCombine()方法的特点进行了折中,调用thenAcceptBoth()方法,第三个任务可以接收其合并过来的第一个任务、第二个任务的处理结果,但是第三个任务(合并任务)却不能返回结果。

public <U> CompletableFuture<Void> thenAcceptBoth(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action, Executor executor)
allOf()等待所有的任务结束

  CompletionStage接口的allOf()会等待所有的任务结束,以合并所有的任务。thenCombine()只能合并两个任务,如果需要合并多个异步任务,那么可以调用allOf()

//allOF()会等待所有的任务结束,以合并所有的任务
public class AllOfDemo {
    public static void main(String[] args) {
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName()+":模拟异步任务1"));
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName()+":模拟异步任务2"));
        CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName()+":模拟异步任务3"));
        CompletableFuture<Void> future4 = CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName()+":模拟异步任务4"));

        CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2, future3, future4);
        all.join();
        System.out.println(Thread.currentThread().getName()+":所有异步任务都已经执行完毕");
    }
}

image-20231004202627949

CompletableFuture中的方法太多了,这里只介绍常用的

泡茶喝实例

  为了异步执行整个排查流程,分别设计三个线程:泡茶线程(MainThread,主线程)、烧水线程(HotWaterThread)和清洗线程(WashThread)。

  • 泡茶线程的工作是:启动清洗线程、启动烧水线程,等清洗、烧水的工作完成后,泡茶喝;

  • 清洗线程的工作是:洗茶壶、洗茶杯;

  • 烧水线程的工作是:洗好水壶,灌上凉水,放在火上,一直等水烧开。

  我们使用CompletableFuture实现整个泡茶喝程序。我们分3个任务:

  任务1负责洗水壶、烧开水

  任务2负责洗茶壶、洗茶杯和拿茶叶

  任务3负责泡茶

public class DrinkTea {
    public static final int SLEEP_GAP = 3000;    //等待3秒

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //任务1:洗水壶->烧开水
        CompletableFuture<Boolean> hotJob = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + ":洗好水壶");
            System.out.println(Thread.currentThread().getName() + ":烧开水");
            //线程睡眠一段时间,代表烧水中
            try {
                Thread.sleep(SLEEP_GAP);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName() + ":水开了");
            return true;
        });
        //任务2:洗茶壶->洗茶杯->拿茶叶
        CompletableFuture<Boolean> washJob = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + ":洗茶壶");
            System.out.println(Thread.currentThread().getName() + ":洗茶杯");
            System.out.println(Thread.currentThread().getName() + ":拿茶叶");
            //线程睡眠一段时间,代表清洗中
            try {
                Thread.sleep(SLEEP_GAP);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName() + ":洗完了");
            return true;
        });
        //任务3:任务1和任务2完成后执行泡茶
        CompletableFuture<String> drinkJob = hotJob.thenCombine(washJob, new BiFunction<Boolean, Boolean, String>() {
            @Override
            public String apply(Boolean hotOK, Boolean washOK) {
                if (hotOK && washOK) {
                    System.out.println(Thread.currentThread().getName() + ":泡茶喝,茶喝完");
                    return "茶喝完了";
                }
                return "没有喝到茶";
            }
        });
        //等待任务3执行结果
        System.out.println(Thread.currentThread().getName() + ":" + drinkJob.get());

    }
}

image-20231004203800659

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1060317.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【做题笔记】多项式/FFT/NTT

HDU1402 - A * B Problem Plus 题目链接 大数乘法是多项式的基础应用&#xff0c;其原理是将多项式 f ( x ) a 0 a 1 x a 2 x 2 a 3 x 3 ⋯ a n x n f(x)a_0a_1xa_2x^2a_3x^3\cdotsa_nx^n f(x)a0​a1​xa2​x2a3​x3⋯an​xn中的 x 10 x10 x10&#xff0c;然后让大数的…

20230922 比赛总结

反思 A 考场降智&#xff0c;没想到拆分成 2 α 5 β x 2^{\alpha}5^{\beta}x 2α5βx 的形式&#xff0c;一直在卡精度&#xff08;thx anti) B 又是随机题&#xff0c;又是 b l bl bl 题&#xff0c;差点又被区分了 C 我是 s b sb sb&#xff0c;排序顺序有点小问题…

concrt140.dll丢失怎么恢复?教你三种最简单的解决方法

今天早上&#xff0c;当我打开电脑时&#xff0c;突然看到一个提示窗口&#xff0c;显示找不到 concrt140.dll 文件。我一下子懵了&#xff0c;不知道这是怎么回事&#xff0c;也不知道如何解决这个问题。于是&#xff0c;我开始了寻找答案的旅程。首先&#xff0c;我了解到 co…

深入理解浏览器渲染原理

文章目录 浏览器是如何渲染页面的渲染流程解析HTML&#xff08;构建DOM树&#xff09;解析过程中遇到JS代码 样式计算1. 解析CSS代码2. 转换样式表中的属性值&#xff0c;使其标准化3. 计算DOM树中每个节点的具体样式CSS继承规则CSS层叠规则 布局分层分层update layer tree 绘制…

博物馆藏品管理系统-美术馆藏品管理系统

一、项目背景 文物作为前史留存下来最为珍贵的遗物&#xff0c;具有非常高的科学价值和艺术价值&#xff0c;博物馆的存在便是为了保存这些珍贵的前史文化遗产&#xff0c;所以对博物馆的建造必定要重视品质问题。对博物馆的库存办理工作也必定要注意细节&#xff0c;不能出一…

【LeetCode热题100】--20.有效的括号

20.有效的括号 使用栈&#xff1a; class Solution {public boolean isValid(String s) {Stack<Character> stack new Stack<>();int num s.length();for(int i 0;i<num;i){char c s.charAt(i);if(c(||c[||c{){stack.push(c);}else if(stack.isEmpty() ||c…

矩阵求导中的分子布局和分母布局

1.求偏导的自变量的符号区别 使用标量、向量和矩阵总共有九种可能性。请注意&#xff0c;当我们考虑每个自变量和因变量中更多数量的分量时&#xff0c;我们可能会留下非常多的可能性。下表收集了最能以矩阵形式最整齐地组织的六种导数。 在这里&#xff0c;我们使用了最一般…

AI配套的技术: 矢量数据库的概念

一、说明 随着人工智能的快速采用和围绕大型语言模型发生的创新&#xff0c;我们需要在所有这些的中心&#xff0c;能够获取大量数据&#xff0c;将其上下文化&#xff0c;处理它&#xff0c;并使其能够有意义地搜索。 为原生整合生成式 AI 功能而构建的生成式 AI 流程和应用程…

Java+Redis:布隆过滤器,打造高效数据过滤神器!

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是尘缘&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f449;点击这里&#xff0c;就可以查看我的主页啦&#xff01;&#x1f447;&#x…

基于蝴蝶优化的BP神经网络(分类应用) - 附代码

基于蝴蝶优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码 文章目录 基于蝴蝶优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码1.鸢尾花iris数据介绍2.数据集整理3.蝴蝶优化BP神经网络3.1 BP神经网络参数设置3.2 蝴蝶算法应用 4.测试结果&#xff1a;5.M…

IIC学习笔记(参考小梅哥教程)

IIC: inter-integrated circuit bus ,即 集成电路总线&#xff0c;串行通信&#xff0c;多主从架构&#xff0c;半双工&#xff08;对讲机&#xff09;&#xff0c;小数据量场合&#xff0c;短距离传输。 速率&#xff1a;100kb/s 、 400kb/s 、 3.4Mkb/s 传输单位&#xff1…

《C和指针》笔记31:多维数组的数组名、指向多维数组的指针、作为函数参数的多维数组

文章目录 1. 指向多维数组的数组名2. 指向多维数组的指针3. 作为函数参数的多维数组 1. 指向多维数组的数组名 我们知道一维数组名的值是一个指针常量&#xff0c;它的类型是“指向元素类型的指针”&#xff0c;它指向数组的第1个元素。那么多维数组的数组名代表什么呢&#x…

【C++】你看懂C++的类和对象了么

目录 类 默认成员函数 构造函数 析构函数 拷贝构造函数 赋值运算符重载 运算符重载 赋值运算符重载 前置和后置重载 const成员 取地址及const取地址操作符重载 再谈构造函数 构造函数体赋值 初始化列表 explicit关键字 static成员 友元 友元函数 友元类 内…

Springboot+vue的时间管理系统(有报告)。Javaee项目,springboot vue前后端分离项目。

演示视频&#xff1a; Springbootvue的时间管理系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot vue前后端分离项目。 项目介绍&#xff1a; 本文设计了一个基于Springbootvue的前后端分离的时间管理系统&#xff0c;采用M&#xff08;model&#xff0…

计算机毕设 大数据工作岗位数据分析与可视化 - python flask

文章目录 0 前言1 课题背景2 实现效果3 项目实现3.1 概括 3.2 Flask实现3.3 HTML页面交互及Jinja2 4 **完整代码**5 最后 0 前言 &#x1f525; 这两年开始毕业设计和毕业答辩的要求和难度不断提升&#xff0c;传统的毕设题目缺少创新和亮点&#xff0c;往往达不到毕业答辩的要…

使用VSCode插件开发Hyperledger Fabric智能合约(链码)

背景 开发Fabric链码对于开发者而言步骤繁琐&#xff1a;需要部署节点、安装链码、重启网络等操作。当前VSCode中的插件“Hyperledger Fabric Debugger”可以帮助我们迅速开发智能合约。 使用步骤 安装插件 在VSCode中安装Hyperledger Fabric Debugger插件 打开要开发链码的…

【LeetCode热题100】--35.搜索插入位置

35.搜索插入位置 使用二分查找&#xff1a; class Solution {public int searchInsert(int[] nums, int target) {int low 0,high nums.length -1;while(low < high){//注意每次循环完都要计算midint mid (low high)/2;if(nums[mid] target){return mid;}if(nums[mid]…

SE、CBAM、ECA 、CA注意力机制

文章目录 1. SE (Squeeze-and-Excitation)2. CBAM (Convolutional Block Attention Module)3. ECA (Efficient Channel Attention)4. CA (Coordinate Attention) 1. SE (Squeeze-and-Excitation) SENet是通道注意力机制的典型实现 对于SENet而言&#xff0c;其重点是获得输入进…

螺杆支撑座有哪些品牌?

螺杆支撑座是机械设备中重要的支撑部件&#xff0c;用于固定和支撑螺杆&#xff0c;以确保机械设备的稳定性和精度。以下是一些生产螺杆支撑座的品牌以及它们的特点&#xff1a; 1、NSK&#xff1a;提供各种高质量的轴承和机械部件&#xff0c;他们的螺杆支撑座采用先进的制造技…

2023.9.26 IO 文件操作详解

目录 文件 文件路径 文件类型 Java 文件操作 文件系统操作 文件内容操作 字节流 InputStream OutputStream 字符流 Reader Writer 补充 close 的必要性 Scanner 的基本了解 文件 当前指硬盘上的文件和文件夹相对于 变量 在内存中&#xff0c;文件 则是在硬盘上 …