【Java8新特性--->异步处理】CompletableFuture

news2025/1/8 18:53:15

一、引入
假设一个商品详情页需要以下操作:

查询展示商品的基本信息耗时:0.5s
查询展示商品的销售信息耗时:0.7s
查询展示商品的图片信息耗时:1s
查询展示商品销售属性耗时:0.3s
查询展示商品规格属性耗时:1.5s
查询展示商品详情信息耗时:1s
即使每个查询时间耗时不多,但是加起来却需要很长耗时。为了减少线性执行造成耗时的累积,这就需要引入异步处理做优化。

二、Future介绍
Future是Java 5添加的类,用来描述一个异步计算的结果。

优点:

可以使用 isDone 方法检查计算是否完成。
使用 get 阻塞住调用线程,直到计算完成返回结果。

可以使用 cancel 方法停止任务的执行。
缺点:

对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。
阻塞的方式与我们想及时得到计算结果的期望相违背。
轮询的方式会消耗大量CPU资源,并且不能及时得到计算结果。

功能:

boolean cancel(boolean mayInterruptIfRunning);

1、尝试取消执行此任务。如果任务已经完成、已被取消或由于其他原因无法取消,则此尝试将失败。如果成功,并且调用 cancel
时此任务尚未启动,则此任务永远不会运行。
2、 如果任务已经开始,则mayInterruptIfRunning
参数确定是否应中断执行此任务的线程以尝试停止任务。 参数mayInterruptIfRunning 为true,表示执行此任务的线程应该被中断;否则,允许进行中的任务完成。 3、此方法返回后,后续调用isDone 将始终返回true。
4、如果此方法返回true,则对isCancelled 的后续调用将始终返回true。

boolean isCancelled();

如果此任务在正常完成之前被取消,则返回true,否则返回false。

boolean isDone();

如果此任务完成,则返回 true。任务完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法将返回 true。

V get() throws InterruptedException, ExecutionException;

阻塞直至任务完成,然后检索其结果。 throws CancellationException:如果计算被取消 throws ExecutionException:如果计算抛出异常 throws InterruptedException:如果当前线程在等待时被中断

V get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException;
如有必要,最多等待计算完成的给定时间,然后检索其结果(如果可用)。
参数timeout:等待的最长时间 参数unit:超时参数的时间单位
throws CancellationException:如果计算被取消 throws
ExecutionException:如果计算抛出异常 throws InterruptedException:如果当前线程在等待时被中断
throws TimeoutException:如果等待超时

三、CompletableFuture
1、介绍
在Java 8中, 新增加了一个包含50个方法左右的类:CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过 get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。

2、创建异步对象(runAsync、supplyAsync)
CompletableFuture 提供了四个静态方法来创建一个异步操作。
(带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。)

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

方法分为两类:

runAsync 没有返回结果
supplyAsync 有返回结果

测试代码:

public class CompletableFutureDemo {

   // corePoolSize:线程池的核心线程数量 线程池创建出来后就会 new Thread() 5个
   // maximumPoolSize:最大的线程数量,线程池支持的最大的线程数
   // keepAliveTime:存活时间,当线程数大于核心线程,空闲的线程的存活时间 50-5=45
   // unit:存活时间的单位
   // BlockingQueue<Runnable> workQueue:阻塞队列 当线程数超过了核心线程数据,那么新的请求到来的时候会加入到阻塞的队列中
   // new LinkedBlockingQueue<>() 默认队列的长度是 Integer.MAX 那这个就太大了,所以我们需要指定队列的长度
   // threadFactory:创建线程的工厂对象
   // RejectedExecutionHandler handler:当线程数大于最大线程数的时候会执行的淘汰策略
   private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
           5,
           50,
           10,
           TimeUnit.SECONDS,
           new LinkedBlockingDeque(1000),
           Executors.defaultThreadFactory(),
           new ThreadPoolExecutor.AbortPolicy()
   );

   public static void main(String[] args) throws ExecutionException, InterruptedException {
       System.out.println("main方法开始了…………");
       CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
           System.out.println("线程开始了...");
           System.out.println("当前线程---->" + Thread.currentThread().getName());
           int i = 100 / 50;
           System.out.println("线程结束了...");
       }, executor);
       System.out.println("main方法结束了…………");

       System.out.println("-----------------------------");
       CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
           System.out.println("线程开始了...");
           System.out.println("当前线程---->" + Thread.currentThread().getName());
           int i = 100 / 50;
           System.out.println("线程结束了...");
           return i;
       }, executor);
       System.out.println("integerCompletableFuture=" + integerCompletableFuture.get());
   }
}

测试结果:
在这里插入图片描述

3、whenCompleteAsync、exceptionally和handleAsync
当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor);

public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) ;
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) ;
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) ;

3.1、whenCompleteAsync
可以获取异步任务的返回值和抛出的异常信息,但是不能修改返回结果。

测试代码:

public class CompletableFutureDemo1 {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            50,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main方法开始了…………");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 2;
            System.out.println("线程结束了...");
            return i;
        }, executor).whenCompleteAsync((res, e) -> { // 不能修改返回值
            System.out.println("res= " + res);
            System.out.println("e=" + e);
        }, executor);
        System.out.println("main方法结束了…………");
        System.out.println("future=" + future.get());
    }
}

测试结果:

在这里插入图片描述

3.2、exceptionally
当异步任务跑出了异常后会触发的方法,如果没有抛出异常该方法不会执行

测试代码:

public class CompletableFutureDemo1 {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            50,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main方法开始了…………");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 0;
            System.out.println("线程结束了...");
            return i;
        }, executor).whenCompleteAsync((res, e) -> { // 不能修改返回值
            System.out.println("res= " + res);
            System.out.println("e=" + e);
        }, executor).exceptionally(e -> {
            System.out.println("exceptionally执行了,e = " + e);
            return 100;
        });
        System.out.println("main方法结束了…………");
        System.out.println("future=" + future.get());
    }
}

测试结果: 主动触发算术异常

在这里插入图片描述

测试结果: 将int i = 100 / 0;改为int i = 100 / 5;

在这里插入图片描述

3.2.1、拓展—>利用exceptionally达到显式地捕获相关异常的效果
示例:

@Test
public void test() {
    CompletableFuture<Integer> future = createNewFile();
    try {
        Integer flag = future.get();
        if (flag == 0) {
            System.out.println("创建成功!!!");
        } else if (flag == 1) {
            System.out.println("捕获到NullPointerException,创建失败!!!");
        } else if (flag == 2) {
            System.out.println("捕获到IOException,创建失败!!!");
        } else {
            System.out.println("捕获到其他异常,创建失败!!!");
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}

private CompletableFuture<Integer> createNewFile() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        File file = null;
        try {
            file.createNewFile();
        } catch (IOException e) {
            // 对于编译时异常,在CompletableFuture中不能直接向外抛出,
            throw new RuntimeException(e.getMessage());
        } catch (NullPointerException e) {
            throw e;
        }
        return 0;
    }).exceptionally(th -> {
        // 走到这里,说明创建失败
        if (th.getCause() instanceof NullPointerException) {
            // 显式地修改返回值
            return 1;
        } else if (th.getCause() instanceof RuntimeException) {
            return 2;
        }
        return 3;
    });
    return future;
}

效果:
在这里插入图片描述

3.2.2、拓展—>completeExceptionally​
如果完成动作过程中抛出异常,将导致对get()和相关方法的调用引发给定的异常。

简而言之,future.completeExceptionally​(e)是在完成过程中主动设置异常信息。(如果future未完成,抛出主动设置的异常信息。反之,则返回完成后的结果。)

案例:

 @Test
 public void test03() throws InterruptedException {
     CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
         System.out.println("future start……");
         try {
             Thread.sleep(2 * 1000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         int i = 100 / 20;
         System.out.println("future end……");
         return i;
     });
     Thread.sleep(3 * 1000);
     future.completeExceptionally(new RuntimeException("未完成时主动抛出异常!!!"));
     try {
         Integer res = future.get();
         System.out.println("res = " + res);
     } catch (ExecutionException e) {
         // 可捕获主动抛出的异常
         System.out.println("捕获主动抛出的异常ExecutionException");
         e.printStackTrace();
     }
 }

效果 (future已完成计算):

在这里插入图片描述

如果将主线程Thread.sleep(3 * 1000);改为Thread.sleep(1 * 1000);
结果如下 (future未完成计算):
在这里插入图片描述

3.2.3、拓展—>obtrudeException
强制导致方法get()和相关方法的后续调用抛出给定的异常,无论是否已完成。

3.3、handleAsync
可以获取异步任务的返回值和抛出的异常信息,而且可以显示地修改返回的结果

测试代码:

public class CompletableFutureDemo1 {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            50,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main方法开始了…………");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 5;
            System.out.println("线程结束了...");
            return i;
        }, executor).handle((res, e) -> {
            System.out.println("res = " + res);
            System.out.println("e = " + e);
            return 200;
        });
        System.out.println("main方法结束了…………");
        System.out.println("future=" + future.get());
    }
}

测试效果:

在这里插入图片描述

4、线程串行方法
thenRunAsync 方法:只要之前的执行完成就执行 thenRun的后续操作。(无接受参数,无返回)
thenAcceptAsync 方法:消费者模式,接受上一个任务处理的结果,并消费处理,无返回结果
thenApplyAsync 方法:当一个线程依赖另一个线程,获取上一个任务的返回结果,并返回当前任务的结果。
(带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。)

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

4.1、thenRunAsync 实现代码

public class CompletableFutureDemo2 {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            50,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main方法开始了…………");
        CompletableFuture.runAsync(() -> {
            System.out.println("线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 5;
            System.out.println("线程结束了...");
        }, executor).thenRunAsync(()->{
            System.out.println("thenRunAsync我进行操作了……………………");
        });
    }
    
}

4.2、thenRunAsync 实现效果
在这里插入图片描述

4.3、thenAcceptAsync 实现代码

public class CompletableFutureDemo2 {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            50,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );
    
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main方法开始了…………");
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 5;
            System.out.println("线程结束了...");
            return i;
        }, executor).thenAcceptAsync(res -> {
            System.out.println("thenAcceptAsync-------->" + res);
        });
    }
    
}

4.4、thenAcceptAsync 实现效果
在这里插入图片描述

4.5、thenApplyAsync 实现代码

public class CompletableFutureDemo2 {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            50,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main方法开始了…………");

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 5;
            System.out.println("线程结束了...");
            return i;
        }, executor).thenApplyAsync(res -> {
            System.out.println("thenAcceptAsync-------->" + res);
            return 200;
        });

        System.out.println("future--------->" + future.get());
    }
    
}

4.6、thenApplyAsync 实现效果
在这里插入图片描述

5、等待两个任务执行完成后才会触发
runAfterBothAsync 方法:不可以获取前面两线程的返回结果,本身也没有返回结果。
thenAcceptBothAsync 方法:可以获取前面两线程的返回结果,本身没有返回结果。
thenCombineAsync 方法:可以获取前面两线程的返回结果,本身也有返回结果
(带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。)

5.1、runAfterBothAsync 实现代码

public class CompletableFutureDemo3 {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            50,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main方法开始了…………");

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future1线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 5;
            System.out.println("future1线程结束了...");
            return i;
        }, executor);

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future2线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 20;
//            try {
//                Thread.sleep(3000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
            System.out.println("future2线程结束了...");
            return i;
        }, executor);
        
        future1.runAfterBothAsync(future2,()->{
            System.out.println("任务3执行了");
        },executor);
    }
    
}

5.2、runAfterBothAsync 实现效果
如果放开Thread.sleep(3000);,那么 runAfterBothAsync 中的代码会等3s后(也就是等待 future1和 future2都执行完)才执行。
在这里插入图片描述

5.3、thenAcceptBothAsync 实现代码

public class CompletableFutureDemo3 {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            50,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main方法开始了…………");

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future1线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 5;
            System.out.println("future1线程结束了...");
            return i;
        }, executor);

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future2线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 20;
//            try {
//                Thread.sleep(3000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
            System.out.println("future2线程结束了...");
            return i;
        }, executor);
        
        future1.thenAcceptBothAsync(future2, (res1, res2) -> {
            System.out.println("thenAcceptBothAsync开始了");
            System.out.println("res1 = " + res1);
            System.out.println("res2 = " + res2);
            System.out.println("thenAcceptBothAsync结束了");
        },executor);
    }
    
}

5.4、thenAcceptBothAsync 实现效果
在这里插入图片描述

5.5、thenCombineAsync 实现代码

public class CompletableFutureDemo3 {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            50,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main方法开始了…………");

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future1线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 5;
            System.out.println("future1线程结束了...");
            return i;
        }, executor);

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future2线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 20;
//            try {
//                Thread.sleep(3000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
            System.out.println("future2线程结束了...");
            return i;
        }, executor);

        CompletableFuture<Integer> future3 = future1.thenCombineAsync(future2, (res1, res2) -> {
            System.out.println("future3开始了");
            return res1 + res2;
        }, executor);
        System.out.println("future3.get()=" + future3.get());
    }
    
}

5.6、thenCombineAsync 实现效果
在这里插入图片描述

6、两个任务完成一个就会触发
两个任务只要有一个完成就会触发。
(对于acceptEitherAsync、applyToEitherAsync可接受前面两任务返回结果来说,如果任务有返回值,哪个任务先执行完先获取其结果作为参数)

runAfterEitherAsync 方法:
不可以获取前面两线程的返回结果,本身也没有返回结果。
acceptEitherAsync方法:
可以获取前面两线程的返回结果,本身也没有返回结果。 applyToEitherAsync方法:
可以获取前面两线程的返回结果,本身有返回结果。 (带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。)

6.1、runAfterEitherAsync 实现代码

public class CompletableFutureDemo4 {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            50,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main方法开始了…………");

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future1线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 5;
//            try {
//                Thread.sleep(3000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
            System.out.println("future1线程结束了...");
            return i;
        }, executor);

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future2线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 20;
//            try {
//                Thread.sleep(3000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
            System.out.println("future2线程结束了...");
            return i;
        }, executor);
        
        future1.runAfterEitherAsync(future2,()->{
            System.out.println("runAfterEitherAsync任务执行了");
        },executor);
        
    }
    
}

6.2、runAfterEitherAsync 实现效果
在这里插入图片描述

6.3、acceptEitherAsync 实现代码

public class CompletableFutureDemo4 {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            50,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main方法开始了…………");

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future1线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 5;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("future1线程结束了...");
            return i;
        }, executor);

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future2线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 20;
//            try {
//                Thread.sleep(3000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
            System.out.println("future2线程结束了...");
            return i;
        }, executor);
        
        future1.acceptEitherAsync(future2, (res) -> {
            System.out.println("acceptEitherAsync开始了");
            System.out.println("res = " + res);
        },executor);
        
    }
    
}

6.4、acceptEitherAsync 实现效果
在这里插入图片描述

6.5、applyToEitherAsync 实现代码

public class CompletableFutureDemo4 {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            50,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );
    
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main方法开始了…………");

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future1线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 5;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("future1线程结束了...");
            return i;
        }, executor);

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future2线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 20;
//            try {
//                Thread.sleep(3000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
            System.out.println("future2线程结束了...");
            return i;
        }, executor);
        
        CompletableFuture<Integer> future3 = future1.applyToEitherAsync(future2, (res) -> {
            System.out.println("future3开始了");
            System.out.println("res = " + res);
            return res;
        }, executor);
        System.out.println("future3.get()=" + future3.get());
    }
    
}

6.6、applyToEitherAsync 实现效果
在这里插入图片描述

7、多任务组合
anyOf 方法:只要有一个任务完成。
allOf 方法:等待所有任务完成。

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

7.1、anyOf 实现代码
只要有一个任务完成就会触发。

public class CompletableFutureDemo5 {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            50,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    /**
     * anyOf:只要有一个线程完成,那么就不阻塞
     * allOf:所有线程都完成,在 get方法阻塞直至所有线程都完成
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main方法开始了…………");

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future1线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 5;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("future1线程结束了...");
            return i;
        }, executor);

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future2线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 20;
            System.out.println("future2线程结束了...");
            return i;
        }, executor);

        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2);
        anyOf.get();
        System.out.println("主任务完成anyOf:" + anyOf.get());
    }

}

7.2、anyOf 实现效果

在这里插入图片描述

7.3、allOf 实现代码
阻塞等待所有任务完成才会触发。

public class CompletableFutureDemo5 {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            50,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    /**
     * anyOf:只要有一个线程完成,那么就不阻塞
     * allOf:所有线程都完成,在 get方法阻塞直至所有线程都完成
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main方法开始了…………");

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future1线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 5;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("future1线程结束了...");
            return i;
        }, executor);

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future2线程开始了...");
            System.out.println("当前线程---->" + Thread.currentThread().getName());
            int i = 100 / 20;
            System.out.println("future2线程结束了...");
            return i;
        }, executor);
        
        CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2);
        allOf.get(); // 阻塞在这个位置,等待所有线程的完成
        System.out.println("主任务完成allOf:" + future1.get() + "," + future2.get());
    }

}

7.4、allOf 实现效果
在这里插入图片描述

8、细节
8.1、thenApply与thenCompose区别

public <U> CompletableFuture<U> thenApply(
       Function<? super T,? extends U> fn) {
       return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenCompose(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(null, fn);
}

thenApply:返回的是泛型中的类型转化为返回值类型的CompletableFuture对象。
thenCompose:返回的是一个扁平化的CompletableFuture对象。
(用来连接两个CompletableFuture,是生成一个新的CompletableFuture。特别像stream().flatMap扁平化处理)
区别:当返回是CompletableFuture的话,thenApply是嵌套,而thenCompose扁平化。

public class CompletableFutureTest {

    public static void main(String[] args) {
        CompletableFuture<Integer> apply = CompletableFuture.supplyAsync(() -> {
            int i;
            try {
                i = 100 / 2;
            } catch (Exception e) {
                throw new RuntimeException("算术异常!!!");
            }
            return i;
        });
        // thenApply返回的如果是CompletableFuture,就会嵌套起来
        CompletableFuture<? extends CompletableFuture<?>> apply1 = apply.thenApply(res -> {
            System.out.println("res1 = " + res);
            System.out.println("apply1当前还在执行中……");
            return CompletableFuture.completedFuture(res);
        });
        CompletableFuture<? extends CompletableFuture<?>> apply2 = apply.thenApply(res -> {
            System.out.println("res2 = " + res);
            System.out.println("apply2当前还在执行中……");
            return CompletableFuture.completedFuture(res);
        });
        // thenCompose不同,返回的扁平化之后的一维CompletableFuture
        CompletableFuture<?> compose = apply.thenCompose(res -> {
            System.out.println("res3 = " + res);
            System.out.println("compose当前还在执行中……");
            return CompletableFuture.completedFuture(res);
        });

        System.out.println(apply1 == apply2);
        try {
            apply.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

测试效果:
在这里插入图片描述

总结
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/571507.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JVM--解析运行期优化与JIT编译器

本篇博客&#xff0c;我们来谈一谈JVM&#xff08;HotSpot&#xff09;为了提高Java程序的运行效率&#xff0c;都实现了哪些激动人心的技术&#xff5e; 1 JIT编译器的引入 首先我们这篇文章中所说的编译器都是指JVM的组成部分之一---即时编译器&#xff08;JIT&#xff09;…

《三》Git 中的本地仓库

初始化本地 Git 仓库&#xff1a; 通过 git init 初始化&#xff0c;可以把当前目录变成了 Git 管理的本地仓库。目前仅仅是做了一个初始化仓库的操作&#xff0c;项目里的文件还没有被跟踪。 在当前目录下会出现一个名为 .git 的目录&#xff0c;这些文件是 Git 仓库的核心。…

每天一个面试题之==和equals的区别是什么?

&#xff1d;&#xff1d;和equals的区别是什么&#xff1f; ""是一个关系运算符&#xff0c;关系运算符可以用来进行数据和数据之间的比较&#xff0c;而在java中数据类型大致可以分为两大类分别是基本数据类型和引用数据类型。 基本数据类型包含 byte&#xff0c…

React学习笔记八-受控与非受控组件

此文章是本人在学习React的时候&#xff0c;写下的学习笔记&#xff0c;在此纪录和分享。此为第八篇&#xff0c;主要介绍非受控组件与受控组件。 目录 1.非受控组件 1.1表单提交案例 1.2案例的总结 2.受控组件 2.1受控组件案例 2.1受控案例总结 1.非受控组件 1.1表单提…

基于LLMs的多模态大模型(Flamingo, BLIP-2,KOSMOS-1,ScienceQA)

前一篇博客已经整理了不训练视觉模型的文章们&#xff1a; 基于LLMs的多模态大模型&#xff08;Visual ChatGPT&#xff0c;PICa&#xff0c;MM-REACT&#xff0c;MAGIC&#xff09; 本篇文章将介绍一些需要训练视觉编码器来适配多模态大模型的工作们&#xff0c;这也是目前最…

QT桌面项目(状态栏和导航栏设置)

文章目录 前言一、状态栏二、导航栏三、同时添加状态栏和导航栏总结 前言 为了和我们这个项目做的更加真实&#xff0c;这里为我们的项目添加上状态栏和导航栏让他变成更加接近手机的桌面效果。 一、状态栏 这个状态栏就是显示时间和wifi状态&#xff0c;电池电量的&#xf…

9秒被骗132万元,AI换脸骗术,如何防范?

5月22日&#xff0c;安徽安庆的何先生接到熟人视频电话&#xff0c;让他帮忙转一笔账&#xff0c;但在9秒之后&#xff0c;对方却以“在开会”为由&#xff0c;迅速挂断了电话&#xff0c;还称“微信和电话不能说&#xff0c;加一下QQ”。“因为打了视频电话&#xff0c;又是熟…

数据结构学习记录——如何建立图(邻接矩阵、邻接表-图节点的结构、创建并初始化、插入变、完整图的建立)

目录 邻接矩阵 图节点的结构 创建并初始化 插入边 完整的图的建立 邻接表 图节点的结构 创建并初始化 插入边 完整的图的建立 邻接矩阵 图节点的结构 #include <stdio.h> #include <stdlib.h>#define MaxVertexNum 100 // 最大顶点数typedef int Wei…

Maven介绍与安装和配置

目录 Maven 简介 约定优于配置 Maven 特点 Maven 安装与配置 Maven 下载 配置 Maven 环境变量 Maven 简介 Maven 是一款基于 Java 平台的项目管理和整合工具&#xff0c;它将项目的开发和管理过程抽象成一个项目对象模型&#xff08;POM&#xff09;。开发人员只需要做一…

C语言结构体

C语言结构体 前言1. 结构体的声明1.1 结构体的基础知识1.2 结构体声明1.3 结构体成员的类型1.4 结构体变量的定义和初始化 2. 结构体成员的访问2.1 结构体变量访问成员2.2 结构体指针访问指针变量的成员 3. 结构体传参4. 结尾 前言 C语言结构体是一种自定义数据类型&#xff0…

vite-plugin-pwa配置详解

vite-plugin-pwa配置详解 前提&#xff1a;前端域名和后端服务域名相同时&#xff0c;用window.open新开页面下载或者导出文件&#xff0c;项目中导出和下载功能失效&#xff0c;原因是&#xff0c;域名相同走缓存 实现service worker离线缓存以前需要自己编写sw.js文件内容&…

基于SpringBoot+Vue的闲一品交易平台设计与实现

博主介绍&#xff1a; 大家好&#xff0c;我是一名在Java圈混迹十余年的程序员&#xff0c;精通Java编程语言&#xff0c;同时也熟练掌握微信小程序、Python和Android等技术&#xff0c;能够为大家提供全方位的技术支持和交流。 我擅长在JavaWeb、SSH、SSM、SpringBoot等框架下…

MT4电脑版交易软件使用技巧有哪些?

MT4交易软件作为连接券商平台与投资者之间的纽带&#xff0c;不仅是外汇金融机构的首选交易平台&#xff0c;也因其显著的优势成为了外汇投资者进行网上交易的重要平台。而MT4交易软件又分为电脑版和手机版&#xff0c;因为大多数投资者进行外汇投资时使用的是MT4电脑版软件&am…

Gradio的web界面演示与交互机器学习模型,接口自动刷新或连续刷新数据流《5》

通过在接口中设置liveTrue&#xff0c;可以使接口自动刷新。现在&#xff0c;一旦用户输入发生变化&#xff0c;界面就会重新计算。依然使用计算器的示例&#xff1a; 实时接口 import gradio as grdef calculator(num1, operation, num2):if operation "add":ret…

浏览器原理+跨域+解决方案

原网址&#xff1a;浏览器部分笔记_浏览器不同窗口cookie共享吗_JackieChan_的博客-CSDN博客 一、浏览器存储对象 1.cookie cookie是一种纯文本文件&#xff0c;大小只有4kb&#xff0c;每次发送非跨域html请求时都会自动携带。特性如下&#xff1a; cookie一旦创建&#xff…

华为开源自研AI框架昇思MindSpore应用案例:Pix2Pix实现图像转换

目录 一、环境准备1.进入ModelArts官网2.使用CodeLab体验Notebook实例 在实际应用场景中&#xff0c;由于训练数据集不足&#xff0c;所以很少有人会从头开始训练整个网络。普遍的做法是&#xff0c;在一个非常大的基础数据集上训练得到一个预训练模型&#xff0c;然后使用该模…

Java程序设计入门教程--主函数

情形 在Java中&#xff0c;主函数就是主方法&#xff0c;即main()方法。它是Java应用程序的入口方法&#xff0c;也就是说&#xff0c;程序在运行的时候&#xff0c;第一个执行的方法就是main()方法&#xff0c;这个方法和其他的方法有很大的不同&#xff0c;比如方法的名字必…

Python100天:01.初识python

❝ 本教程计划通过100天的时间&#xff0c;每天分享一篇关于python的知识点&#xff0c;与大家一起学习python这门编程语言。 ❞ Python 对初学者来说是一门很棒的语言&#xff1a; 容易学 有一个积极的支持社区 在网络开发、游戏、数据科学方面提供多种机会。 Python的应用领域…

PMP课堂模拟题目及解析(第13期)

121. 项目经理、团队成员以及若干干系人共同参与一次风险研讨会。已经根据风险管理计划生成并提供一份风险报告。若要为各个项目风险进行优先级排序&#xff0c;现在必须执行哪一项分析&#xff1f; A. 定量风险分析 B. 根本原因分析 C. 偏差分析 D. 定性风险分析 122. …

Yarn资源调度详解

第1章 Yarn资源调度器 思考&#xff1a; 1&#xff09;如何管理集群资源&#xff1f; 2&#xff09;如何给任务合理分配资源&#xff1f; Yarn是一个资源调度平台&#xff0c;负责为运算程序提供服务器运算资源&#xff0c;相当于一个分布式的操作系统平台&#xff0c;而MapRe…