学习笔记:Java并发编程(补)CompletableFuture

news2024/11/24 16:06:17

  • 学习视频https://www.bilibili.com/video/BV1ar4y1x727
  • 参考书籍《实战 JAVA 高并发程序设计》 葛一鸣

系列目录


  • 学习笔记:Java 并发编程①_基础知识入门
  • 学习笔记:Java 并发编程②_共享模型之管程
  • 学习笔记:Java 并发编程③_共享模型之内存
  • 学习笔记:Java 并发编程④_共享模型之无锁
  • 学习笔记:Java 并发编程⑤_共享模型之不可变
  • 学习笔记:Java 并发编程⑥_共享模型之并发工具_线程池
  • 学习笔记:Java 并发编程⑥_共享模型之并发工具_JUC
  • 学习笔记Java并发编程CompletableFuture
  • 学习笔记Java并发编程ThreadLocal

前言


系列文章目录中的前七篇文章的相关学习视频都是:黑马程序员深入学习 Java 并发编程JUC 并发编程全套教程。然而这个时长为 32h 的视频中并没有 CompletableFuture 的相关知识,这个知识点还是蛮重要的。故找了相关的视频来学习这个重要的知识点,写在了系列文章目录中的第八篇博客里。第八篇的相关学习视频是 尚硅谷 JUC 并发编程对标阿里 P6-P7


  • 若文章内容或图片失效请留言反馈
  • 部分素材来自网络若不小心影响到您的利益请联系博主删除
  • 写这篇博客旨在制作笔记方便个人在线阅览巩固知识无他用

1.Future


1.1.Future 概述


Future 的核心思想是异步调用

Future 接口(FutureTask实现类)会用异步编程的方式去执行一些方法。
如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务是否执行完毕等。

当我们需要调用一个函数的时候,如果这个函数执行的过程非常的漫长,那我们就需要等待。
但我们不一定急着要该函数的结果。所以我们可以让被调者立即返回,让它在后台慢慢地处理这个请求。
对于调用者来说,可以先处理其他的任务,在真正需要数据的场合,再去尝试获得需要的数据。
在整个的调用过程中,不存在无谓的等待,充分利用了所有的时间片段,从而提高了系统的响应速度。

一句话:Future 接口可以为主线程开一个分支任务,专门为主线程处理耗时费力的复杂业务。


FutureJava 5 新加的一个接口,它提供了一种 异步并行计算的功能
如果主线程需要执行一个很耗时的计算任务,我们就可以通过 future 把这个任务放到异步编程中执行。
主线程继续处理其他任务或者先行结束后,再通过 Future 来获取计算结果。


1.2.Future 的简单使用


  • Callable 接口
  • Future 接口 和 FutureTask 实现类

在这里插入图片描述
java/util/concurrent/FutureTask.java

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

下面是 Future 的一些简单功能

public boolean isCancelled() // 取消任务

public boolean isDone() // 是否已经取消

public boolean cancel(boolean mayInterruptIfRunning) // 是否已经完成

public V get() throws InterruptedException, ExecutionException // 取得返回对象

public V get(long timeout, TimeUnit unit) // 取得返回对象,可以设置超时时间

示例代码

@Slf4j(topic = "c.FutureDemo_1")
public class FutureDemo_1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask_1 = new FutureTask<>(new MyThread_1(), "Hello MyThread_1");
        FutureTask<String> futureTask_2 = new FutureTask<>(new MyThread_2());

        Thread t1 = new Thread(futureTask_1);
        t1.setName("t1");
        Thread t2 = new Thread(futureTask_2, "t2");
        t1.start();
        t2.start();

        log.info(futureTask_1.get());
        log.info(futureTask_2.get());
    }
}

@Slf4j(topic = "c.MyThread_1")
class MyThread_1 implements Runnable {
    @Override
    public void run() {
        log.info("--- Come In MyThread_1.run() ---");
    }
}

@Slf4j(topic = "c.MyThread_2")
class MyThread_2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        log.info("--- Come In MyThread_2.call() ---");
        return "Hello MyThread_2";
    }
}

输出结果

22:03:06.526 [t1] INFO c.MyThread_1 - --- Come In MyThread_1.run() ---
22:03:06.526 [t2] INFO c.MyThread_2 - --- Come In MyThread_2.call() ---
22:03:06.538 [main] INFO c.FutureDemo_1 - Hello MyThread_1
22:03:06.538 [main] INFO c.FutureDemo_1 - Hello MyThread_2

1.3.Future 的优缺点


  • 优点Future + 线程池 异步 多线程任务 配合,可以显著提高程序的执行效率
  • 缺点Future 对结果的获取不是很友好,只能通过阻塞或者是轮询的方式得到任务的结果
    • get() 阻塞:一旦调用 get() 方法求结果,在 futureTask 没有计算完成的情况下,容易导致线程阻塞
    • isDone() 轮询
      如果想要异步获取结果,一般都是会以轮询的方式去获取结果,尽量避免阻塞。
      轮询的方式会耗费无谓的 CPU 资源,而且也不见得可以及时地得到计算结果

1.3.1.Future + 线程池


示例代码

@Slf4j(topic = "FutureThreadPoolDemo")
public class FutureThreadPoolDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();

        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        FutureTask<String> futureTask_1 = new FutureTask<>(() -> {
            TimeUnit.MILLISECONDS.sleep(500);
            return "Task_1 over";
        });
        threadPool.submit(futureTask_1);

        FutureTask<String> futureTask_2 = new FutureTask<>(() -> {
            TimeUnit.MILLISECONDS.sleep(500);
            return "Task_2 over";
        });
        threadPool.submit(futureTask_2);

        TimeUnit.MILLISECONDS.sleep(300);
        log.info("{}", futureTask_1.get());// get() 方法会阻塞线程
        log.info("{}", futureTask_2.get());

        long endTime = System.currentTimeMillis();
        log.info("costTime:{} 毫秒", (endTime - startTime));
        threadPool.shutdown();

        m1();
    }

    private static void m1() throws InterruptedException {
        long startTime = System.currentTimeMillis();

        TimeUnit.MILLISECONDS.sleep(500);
        TimeUnit.MILLISECONDS.sleep(500);
        TimeUnit.MILLISECONDS.sleep(300);

        long endTime = System.currentTimeMillis();
        log.info("m1_costTime:{} 毫秒", (endTime - startTime));
    }
}

输出

22:45:13.678 [main] INFO FutureThreadPoolDemo - Task_1 over
22:45:13.692 [main] INFO FutureThreadPoolDemo - Task_2 over
22:45:13.692 [main] INFO FutureThreadPoolDemo - costTime:567 毫秒
22:45:15.018 [main] INFO FutureThreadPoolDemo - m1_costTime:1326 毫秒

1.3.2.get() 阻塞


示例代码

@Slf4j(topic = "c.FutureApiDemo")
public class FutureApiDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> future = new FutureTask<String>(() -> {
            log.info("{} Come in", Thread.currentThread().getName());
            TimeUnit.SECONDS.sleep(5);
            return "TASK OVER !!!";
        });

        Thread t1 = new Thread(future, "t1");
        t1.start();

        log.info("{} 忙其他任务了", Thread.currentThread().getName());
        log.info("future.get():{}", future.get());
    }
}

输出结果

23:15:20.551 [t1] INFO c.FutureApiDemo - t1 Come in
23:15:20.551 [main] INFO c.FutureApiDemo - main 忙其他任务了
23:15:25.565 [main] INFO c.FutureApiDemo - future.get()TASK OVER !!!

上面这种情况是没有什么问题的,但如果我们调换一下 future.get() 的位置呢?

// 将 future.get() 置于前方
log.info("future.get():{}", future.get());
log.info("{} 忙其他任务了", Thread.currentThread().getName());

输出结果(显然,主线程被 future.get() 阻塞住了,只有在 future 的任务解决了之后,主线程才继续运行)

23:16:20.399 [t1] INFO c.FutureApiDemo - t1 Come in
23:16:25.422 [main] INFO c.FutureApiDemo - future.get()TASK OVER !!!
23:16:25.422 [main] INFO c.FutureApiDemo - main 忙其他任务了

更改 future.get() 方法中的参数(不用 public V get() 了,改用 public V get(long timeout, TimeUnit unit) 了)

log.info("future.get():{}", future.get(3, TimeUnit.SECONDS));
log.info("{} 忙其他任务了", Thread.currentThread().getName());

输出结果(直接抛出异常了,不管 future 之外的线程了,主线程的那一行代码完全没有执行)

23:22:57.287 [t1] INFO c.FutureApiDemo - t1 Come in
Exception in thread "main" java.util.concurrent.TimeoutException
	at java.util.concurrent.FutureTask.get(FutureTask.java:205)
	at org.example.replenish.future.FutureApiDemo.main(FutureApiDemo.java:24)

当然了,如果使用的是 try-catch 的话,主线程倒还是可以在等待时间结束后继续运行的

try {
    log.info("future.get():{}", future.get(3, TimeUnit.SECONDS));
} catch (TimeoutException e) {
    log.info("e.printStackTrace()");
    e.printStackTrace();
}

log.info("{} 忙其他任务了", Thread.currentThread().getName());

输出结果

23:38:26.841 [t1] INFO c.FutureApiDemo - t1 Come in
23:38:29.843 [main] INFO c.FutureApiDemo - e.printStackTrace()
23:38:29.843 [main] INFO c.FutureApiDemo - main 忙其他任务了
java.util.concurrent.TimeoutException
	at java.util.concurrent.FutureTask.get(FutureTask.java:205)
	at org.example.replenish.future.FutureApiDemo.main(FutureApiDemo.java:26)

1.3.3.isDone() 轮询


鉴于上面的异常情况,还是不要调用 public V get(long timeout, TimeUnit unit) 为好,建议使用下面的姿势

while (true) {
    if (future.isDone()) {
        log.info("future.get():{}", future.get());
        break;
    } else {
        try {
            TimeUnit.MILLISECONDS.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("正在处理中...莫急");
        System.out.println("... ...");
    }
}
log.info("{} 忙其他任务了", Thread.currentThread().getName());

输出结果(控制台输出的信息,一切都是岁月静好)

23:46:11.225 [t1] INFO c.FutureApiDemo - t1 Come in
23:46:16.232 [main] INFO c.FutureApiDemo - 正在处理中...莫急
... ...
23:46:21.247 [main] INFO c.FutureApiDemo - 正在处理中...莫急
... ...
23:46:21.247 [main] INFO c.FutureApiDemo - future.get()TASK OVER !!!
23:46:21.247 [main] INFO c.FutureApiDemo - main 忙其他任务了

但是,CPU 资源还是浪费了


1.4.Future 异步优化的思路


想完成一些复杂的任务

  • 对于简单的业务场景,使用 Future 是没有任务问题的
  • 回调通知
    应用对应着 Future 的完成时间,任务完成了可以告诉我,此即回调通知
    通过轮询的方式去判断任务是否完成,这样做是很占 CPU 资源的,而且代码也不优雅
    我希望的是,任务完成的时候,再去主动通知我(任务已经完成了)
  • 创建异步任务Future + 线程池
  • 多个任务前后依赖可以组合处理
    想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值。
    将两个或多个异步计算合成依噶异步计算,这几个异步计算相互独立,同时后面这个又依赖前一个处理的结果。
  • 选择最快的计算结果:当 Future 集合中某个任务最快结束时,返回结果(也就是最快的那个)
  • … …

Future 提供的 API 的功能并不能完全满足我们的需求,处理起来也不够优雅

此时还是需要 CompletableFuture 来处理这些需求(以声明式的方式)

在这里插入图片描述


2.CompletableFuture


2.1.简单介绍


2.1.1.为什么会出现?


get() 方法在 Future 计算完成之前会一直处在阻塞状态下,isDone() 方法容易耗费 CPU 资源

对于真正的异步处理,我们希望是可以通过传入回调函数,在 Future 结束的时候自动调用该回调函数,这样我们就可以不用等待结果。

阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的 CPU 资源,故 JDK 8 中出现了 CompletableFuture 这样的设计。

CompletableFuture 提供了一种类似观察者模式类似的机制,可以让任务执行完成后通知监听的一方。(完成好了,就通知我)


2.1.2.CompletionStage 接口


CompletionStage 接口有 40 多种方法,是为了函数式编程中的流式调用准备的。

  • CompletionStage 代表的是异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另一个阶段。
    有些类似 Linux 系统的管道分隔符传参数
  • 一个阶段的计算执行可以是 FunctionConusmer 或者 Runnable
    比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

2.1.3.ComletableFuture 类


CompletableFutureJava 8 新增的一个超大型工具类。

  • Java 8 中,CompletableFuture 提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性;
    并且它还提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法
  • 它可能代表一个明确完成的 Future,也有可能代表一个完成阶段(CompletionStage
    它支持在计算完成以后触发一些函数或执行某些动作
  • 它实现了 Future 接口和 CompletionStage 接口。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

在这里插入图片描述


优点

  • 异步任务结束时,会自动回调某个对象的方法
  • 主线程设置好回调以后,不再关心异步任务的执行,异步任务之间可以按照顺序来执行
  • 异步任务出错的时候,会自动回调某个对象的方法

2.2.核心的四个静态方法


CompletableFuture 中有四个核心的工厂方法


runAsync() 方法用于没有返回值的场景。比如,仅仅是简单地执行某一个异步动作。

public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
    return asyncRunStage(screenExecutor(executor), runnable);
}

supplyAsync() 方法用于那些需要有返回值的场景。比如计算某个数据等。

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}

参考书籍《实战 JAVA 高并发程序设计》 葛一鸣

在上面的两对方法中,都有一个方法可以接收一个 Executor 参数。我们可以借此让 Supplier<U> 或者 Runnable 在指定的线程池中工作;如果不指定 Executor,则在默认的系统公共的 ForkJoinPool.common 线程池中执行。

补充知识:在 Java 8 中,新增了 ForkJoinPool.commonPool() 方法。它可以获得一个公共的 ForkJoin 线程池。这个公共线程池中的所有线程都是 Daemon 线程。这意味着如果主线程退出,这些线程无论是否执行完毕,都会退出系统。


下面的示例代码是在验证上面的说法:在上面的四个静态方法中

  • 如果是没有指定 Executor 的方法,会使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码;
  • 如果是指定了线程池的方法,则使用指定的线程池运行

@Slf4j(topic = "c.CompletableFutureDemo_1")
public class CompletableFutureDemo_1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        method_1();
    }
	
	// ... ...
}
  • 调用 method_1() 方法(未指定线程池)
private static void method_1() throws InterruptedException, ExecutionException {
    CompletableFuture<Void> voidCF = CompletableFuture.runAsync(() -> {
        log.info("{}", Thread.currentThread().getName());

        // 线程暂停 1 秒
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
    });

    log.info("{}", voidCF.get());
}
  • 调用 method_2() 方法(指定线程池)
private static void method_2() throws InterruptedException, ExecutionException {
    ExecutorService threadPool = Executors.newFixedThreadPool(3);

    CompletableFuture<Void> voidCF = CompletableFuture.runAsync(() -> {
        log.info("{}", Thread.currentThread().getName());

        // 线程暂停 1 秒
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace();}
    }, threadPool);

    log.info("{}", voidCF.get());

    threadPool.shutdown();
}
  • 调用 method_3() 方法(未指定线程池)
private static void method_3() throws InterruptedException, ExecutionException {
    CompletableFuture<String> rCF = CompletableFuture.supplyAsync(() -> {
                log.info("{}", Thread.currentThread().getName());
                // 线程暂停 1 秒
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "Hello SupplyAsync";
            }
    );

    log.info("{}", rCF.get());
}
  • 调用 method_4() 方法(指定线程池)
private static void method_4() throws InterruptedException, ExecutionException {
    ExecutorService threadPool = Executors.newFixedThreadPool(3);

    CompletableFuture<String> rCF = CompletableFuture.supplyAsync(() -> {
                log.info("{}", Thread.currentThread().getName());
                // 线程暂停 1 秒
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "Hello SupplyAsync";
            }, threadPool
    );

    log.info("{}", rCF.get());
    threadPool.shutdown();
}
  • 调用 method_1() 方法(未指定线程池)控制台输出
10:48:14.519 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureDemo_1 - ForkJoinPool.commonPool-worker-1
10:48:15.535 [main] INFO c.CompletableFutureDemo_1 - null
  • 调用 method_2() 方法(指定线程池)控制台输出
10:51:03.789 [pool-1-thread-1] INFO c.CompletableFutureDemo_1 - pool-1-thread-1
10:51:04.813 [main] INFO c.CompletableFutureDemo_1 - null
  • 调用 method_3() 方法(未指定线程池)控制台输出
11:04:24.533 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureDemo_1 - ForkJoinPool.commonPool-worker-1
11:04:25.555 [main] INFO c.CompletableFutureDemo_1 - Hello SupplyAsync
  • 调用 method_4() 方法(指定线程池)控制台输出
11:05:23.226 [pool-1-thread-1] INFO c.CompletableFutureDemo_1 - pool-1-thread-1
11:05:24.244 [main] INFO c.CompletableFutureDemo_1 - Hello SupplyAsync

2.3.通用异步编程


Java 8 开始引入了 CompletableFuture,它是 Future 的扩展功能增强版,可以减少 Future 中的阻塞和轮询问题。

它可以传入回调对象,当异步任务完成或发生异常的时候,自动调用回调对象的回调方法。


@Slf4j(topic = "c.CompletaleFutureDemo_2")
public class CompletableFutureDemo_2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        test_1();
    }

	// ... ...
}

熟悉的配方,熟悉的味道

private static void test_1() throws InterruptedException, ExecutionException {
    CompletableFuture<Integer> rCF = CompletableFuture.supplyAsync(() -> {
        log.info("---【{} Come In】---", Thread.currentThread().getName());

        int result = ThreadLocalRandom.current().nextInt(10);

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        log.info("---【1 秒后出结果:{}】---", result);
        return result;
    });

    log.info("---【{} 线程先去忙其他任务了】---", Thread.currentThread().getName());
    log.info("---【result:{}】---", rCF.get());
}

Future 可以做到的事情,CompletableFuture 都可以做到。

11:30:11.460 [main] INFO c.CompletaleFutureDemo_2 - ---【main 线程先去忙其他任务了】---
11:30:11.460 [ForkJoinPool.commonPool-worker-1] INFO c.CompletaleFutureDemo_2 - ---ForkJoinPool.commonPool-worker-1 Come In---
11:30:12.487 [ForkJoinPool.commonPool-worker-1] INFO c.CompletaleFutureDemo_2 - ---1 秒后出结果:9---
11:30:12.487 [main] INFO c.CompletaleFutureDemo_2 - ---【result:9---

CompletableFuture 可以传入回调对象,当异步任务完成或发生异常的时候,自动调用回调对象的回调方法。

private static void test_2() throws InterruptedException, ExecutionException {
    ExecutorService threadPool = Executors.newFixedThreadPool(3);

    try {
        CompletableFuture.supplyAsync(() -> {
            log.info("【{} Come In】", Thread.currentThread().getName());
            int result = ThreadLocalRandom.current().nextInt(10);
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            log.info("【1 秒后出结果:{}】", result);
            if (result > 5) { int i = 10 / 0; }
            return result;
        }, threadPool).whenComplete((result, exception) -> {
            log.info("[进入 whenComplete 方法]");
            if (exception == null) { log.info("【计算完成,更新系统。Update result:{}】", result); }
            log.info("[退出 whenComplete 方法]");
        }).exceptionally(exception -> {
            // exception.printStackTrace();
            System.out.println("【异常情况 Cause:" + exception.getCause() + "】");
            log.info("【异常情况 Message:{}】", exception.getMessage());
            return null;
        });
    } catch (Exception e) {
        // e.printStackTrace();
    } finally {
        threadPool.shutdown();
    }

    // 【注意】
    //  在这个程序里,CompletableFuture 默认使用的 [ForkJoinPool] 就是守护线程,[main] 就是用户线程
    //  如果主线程立刻结束的话,CompletableFuture 默认使用的线程池会立刻关闭
    // (守护线程要守护的对象不存在了,程序就应该结束)
    // 	我们一般推荐 [自定义线程池 + CompletableFuture] 配合使用
    log.info("【{} 线程先去忙其他任务了】", Thread.currentThread().getName());
}

控制台输出(没有出现异常的情况,走 whenComplete() 方法)

12:28:54.135 [main] INFO c.CompletaleFutureDemo_2 - 【main 线程先去忙其他任务了】
12:28:54.135 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 - 【pool-1-thread-1 Come In12:28:55.151 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 -1 秒后出结果:512:28:55.151 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 - [进入 whenComplete 方法]
12:28:55.151 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 - 【计算完成,更新系统。Update result:512:28:55.151 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 - [退出 whenComplete 方法]

控制台输出(出现了异常的情况,依然会进入 whenComplete() 方法,之后会走 exceptionally() 方法)

注意:就算是出现了异常,也还是会进入 whenComplete() 方法,故我们还需要在 whenComplete() 加判断处理

12:29:58.062 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 - 【pool-1-thread-1 Come In12:29:58.062 [main] INFO c.CompletaleFutureDemo_2 - 【main 线程先去忙其他任务了】
12:29:59.079 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 -1 秒后出结果:812:29:59.079 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 - [进入 whenComplete 方法]
12:29:59.079 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 - [退出 whenComplete 方法]
【异常情况 Causejava.lang.ArithmeticException: / by zero】
12:29:59.080 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 - 【异常情况 Messagejava.lang.ArithmeticException: / by zero】

2.4.函数式接口简单复习


  • Runnable 接口无参数,无返回值
@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

  • Function<T, R> 接收一个参数,且有返回值
@FunctionalInterface
public interface Function<T, R> {
    R apply(T t);
}


  • Consumer 接收一个参数,没有返回值
  • BiConsumer<T, U> 接收两个参数,没有返回值(Bi 是英文单词的词根,代表两个的意思)
@FunctionalInterface
public interface Consumer<T> {
    void accept(T t);
}
@FunctionalInterface
public interface BiConsumer<T, U> {
	accept(T t, U u);
}

  • Supplier 接口没有参数,有一个返回值
@FunctionalInterface
public interface Supplier<T> {
    T get();
}

2.5.案例:电商比价


案例说明:电商比价需求

模拟如下情况:

  • 需求
    • 同一款产品,同时搜索出同款产品在各大电商的售价
    • 同一款产品,同时搜索出本产品在某一个电商平台下,各个入驻门店的售价是多少
  • 输出
    • 输出的结果,希望是同款产品的在不同地方的价格清单列表,返回的是一个 List
      《MySQL》 in jd price is 88.05
      《MySQL》 pdd price is 86.11
      《MySQL》 in taobao price is 90.43
  • 解决方案:比对同一个商品在各个平台上的价格,要求活动一个清单列表
    • step by step,按部就班,查完京东查淘宝,查完淘宝查天猫 … …
    • all in,多线程异步任务同时查询

NetMall.java

public class NetMall {
    private String netMallName;

    public String getNetMallName() {
        return netMallName;
    }

    public NetMall(String netMallName) {
        this.netMallName = netMallName;
    }

    public Double calcPrice(String productName) {
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        
        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    }
}

CompletableFutureMallDemo.java

@Slf4j(topic = "c.CompletableFutureMallDemo")
public class CompletableFutureMallDemo {
    static List<NetMall> list = Arrays.asList(
            new NetMall("jd"),
            new NetMall("dangdang"),
            new NetMall("taobao"));

    /**
     * Step By Step
     *
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPrice_1(List<NetMall> list, String productName) {
        // 输出格式:MySQL in taobao price is 90.43
        List<String> collectList = list.stream()
                .map(netMall -> String.format(
                        productName + " in %s  price is %.2f",
                        netMall.getNetMallName(),
                        netMall.calcPrice(productName)))
                .collect(Collectors.toList());
        return collectList;
    }

    /**
     * All In
     *
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPrice_2(List<NetMall> list, String productName) {
        // 其实可以一步链式到位的,但这里为了便于理解,还是中间截断了一下,设了俩变量
        List<CompletableFuture<String>> futureList = list.stream()
                .map(netMall -> CompletableFuture.supplyAsync(
                        () -> String.format(productName + " in %s price is %.2f",
                                netMall.getNetMallName(),
                                netMall.calcPrice(productName))
                )).collect(Collectors.toList());

        List<String> collectList = futureList.stream()
                .map(s -> s.join())
                .collect(Collectors.toList());

        return collectList;
    }

    public static void main(String[] args) {
        long startTime_1 = System.currentTimeMillis();
        List<String> lists_1 = getPrice_1(list, "MySQL");
        for (String element : lists_1) {
            System.out.println(element);
        }
        long endTime_1 = System.currentTimeMillis();
        log.info("【costTime_1:{} 毫秒】", (endTime_1 - startTime_1));

        System.out.println("---------------------------------------------");

        long startTime_2 = System.currentTimeMillis();
        List<String> lists_2 = getPrice_2(list, "MySQL");
        for (String element: lists_2){
            System.out.println(element);
        }
        long endTime_2 = System.currentTimeMillis();
        log.info("【costTime_2:{} 毫秒】", (endTime_2 - startTime_2));
    }
}

控制台输出

MySQL in jd  price is 77.53
MySQL in dangdang  price is 77.19
MySQL in taobao  price is 77.53
17:27:21.710 [main] INFO c.CompletableFutureMallDemo - 【costTime_1:3068 毫秒】
---------------------------------------------
MySQL in jd price is 78.91
MySQL in dangdang price is 77.80
MySQL in taobao price is 77.56
17:27:22.735 [main] INFO c.CompletableFutureMallDemo - 【costTime_2:1014 毫秒】

2.6.常用方法


2.6.1.获得结果和触发计算


获取结果

  • public T get():一旦调用 get() 方法求结果,在 futureTask 没有计算完成的情况下,容易导致线程阻塞
  • public T get(long timeout, TimeUnit unit):超过了设置的时间,则不再等候,会发生阻塞
  • public T join():和 get() 方法的区别不大,只是其在编译期也不会报错,而 get() 方法必须抛异常
  • public T getNow(T valueIfAbsent):在没有计算完成的情况下,会返回一个替代的结果
    特点是立即获取结果不阻塞:计算完成,则返回计算完成后的结果;计算没有完成,则会返回设定的 ValueIfAbsent

主动触发计算

  • public boolean complete(T value):判断是否是 get() 方法发生打断操作,并立即返回括号里面的值
    显然这个方法也是立即获取结果不阻塞
    计算完成,get() 方法没有发生打断操作,返回 false ,返回计算完成后的结果;
    计算没有完成,get() 方法发生打断操作,返回 true ,返回设定的 value

@Slf4j(topic = "c.CompletableFutureAPIDemo")
public class CompletableFutureAPIDemo {
    // public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
    public static void main(String[] args) {
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            log.info("【进入 CompletableFuture.supplyAsync()】");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "ABC";
        });

        // log.info("【{}】", cf.get());
        // log.info("【{}】", cf.get(2L, TimeUnit.SECONDS));
        // log.info("【{}】", cf.join());

        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // log.info("【{}】", cf.getNow("xxx"));

        log.info("[{}]\t[{}]", cf.complete("completeValue"), cf.join());
    }
}

控制台输出结果

19:43:53.219 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo - 【进入 CompletableFuture.supplyAsync()19:43:56.223 [main] INFO c.CompletableFutureAPIDemo - [false]	[ABC]

2.6.2.对计算结果进行处理


  • public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
    计算结果存在依赖关系,这两个线程串行化
    由于存在依赖关系(当前步骤出错,则不走下一步),当前步骤有异常会叫停。
  • public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
    计算结果存在依赖关系,这两个线程串行化
    这个方法和上面方法的区别就是,纵然异常出现了,它也可以继续往下一步走,可以根据带的异常参数进一步处理

exceptionallly() 类似于 try-catch

whenComplete()handle() 类似于 try-finally


@Slf4j(topic = "c.CompletableFutureAPIDemo_2")
public class CompletableFutureAPIDemo_2 {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        // test_1_1(threadPool);
        // test_2(threadPool);
    }
}
private static void test_1_1(ExecutorService threadPool) {
    CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
        log.info("【111】");
        return 1;
    }, threadPool).thenApply(f -> {
        int i = 1 / 0; //故意设置异常
        log.info("【222】");
        return f + 2;
    }).thenApply(f -> {
        log.info("【333】");
        return f + 3;
    }).whenComplete((v, e) -> {
        log.info("---[进入 whenComplete() 方法]---");
        if (e == null) log.info("【计算结果:{}】", v);
        log.info("---[即将退出 whenComplete() 方法]---");
    }).exceptionally(e -> {
        // e.printStackTrace();
        log.info("【{}】", e.getMessage());
        return null;
    });

    log.info("-----【({})线程去忙其它事情了】-----", Thread.currentThread().getName());

    threadPool.shutdown();
}
private static void test_2(ExecutorService threadPool) {
    CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
        log.info("【111】");
        return 1;
    }, threadPool).handle((f, e) -> {
        int i = 1 / 0;// 此处故意设置异常
        log.info("【222】");
        return f + 2;
    }).handle((f, e) -> {
        log.info("【333】");
        return f + 3;
    }).whenComplete((v, e) -> {
        log.info("---[进入 whenComplete() 方法]---");
        if (e == null) log.info("【计算结果:{}】", v);
        log.info("---[即将退出 whenComplete() 方法]---");
    }).exceptionally(e -> {
        // e.printStackTrace();
        log.info("【{}】", e.getMessage());
        return null;
    });

    log.info("-----【({})线程去忙其它事情了】-----", Thread.currentThread().getName());

    threadPool.shutdown();
}

控制台正常输出信息

20:31:23.578 [main] INFO c.CompletableFutureAPIDemo_2 - -----【(main)线程去忙其它事情了】-----
20:31:24.581 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 -11120:31:24.581 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 -22220:31:24.581 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 -33320:31:24.581 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - ---[进入 whenComplete() 方法]---
20:31:24.581 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - 【计算结果:620:31:24.581 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - ---[即将退出 whenComplete() 方法]---

控制台输出信息(打印 【222】 的操作部分有异常)使用 thenApply()

20:30:53.304 [main] INFO c.CompletableFutureAPIDemo_2 - -----【(main)线程去忙其它事情了】-----
20:30:54.302 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 -11120:30:54.302 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - ---[进入 whenComplete() 方法]---
20:30:54.302 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - ---[即将退出 whenComplete() 方法]---
20:30:54.302 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 -java.lang.ArithmeticException: / by zero】

控制台输出信息(打印 【222】 的操作部分有异常)使用 handle()

20:31:48.706 [main] INFO c.CompletableFutureAPIDemo_2 - -----【(main)线程去忙其它事情了】-----
20:31:49.711 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 -11120:31:49.711 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 -33320:31:49.711 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - ---[进入 whenComplete() 方法]---
20:31:49.711 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - ---[即将退出 whenComplete() 方法]---
20:31:49.711 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 -java.lang.NullPointerException

这里我们再看一下 thenApplyAsync() 方法

java/util/concurrent/CompletableFuture.java

public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn, Executor executor) {
    return uniApplyStage(screenExecutor(executor), fn);
}

实际操作

private static void test_1_2(ExecutorService threadPool) {
    CompletableFuture.supplyAsync(() -> {
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        log.info("【111】");
        return 1;
    }, threadPool).thenApplyAsync(f -> {
        log.info("【222】");
        return f + 2;
    }, threadPool).thenApplyAsync(f -> {
        log.info("【333】");
        return f + 3;
    }).whenComplete((v, e) -> {
        log.info("---[进入 whenComplete() 方法]---");
        if (e == null) log.info("【计算结果:{}】", v);
        log.info("---[即将退出 whenComplete() 方法]---");
    }).exceptionally(e -> {
        // e.printStackTrace();
        log.info("【{}】", e.getMessage());
        return null;
    });

    log.info("-----【({})线程去忙其它事情了】-----", Thread.currentThread().getName());

    try { TimeUnit.SECONDS.sleep(6); } catch (InterruptedException e) { e.printStackTrace(); }
    
    threadPool.shutdown();
}

控制信息输出

21:35:21.902 [main] INFO c.CompletableFutureAPIDemo_2 - -----【(main)线程去忙其它事情了】-----
21:35:22.902 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 -11121:35:22.902 [pool-1-thread-2] INFO c.CompletableFutureAPIDemo_2 -22221:35:22.903 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo_2 -33321:35:22.903 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo_2 - ---[进入 whenComplete() 方法]---
21:35:22.903 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo_2 - 【计算结果:621:35:22.903 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo_2 - ---[即将退出 whenComplete() 方法]---

如果你 threadPool.shutdown(); 关闭的时间过早的话,也会抛出异常

java.util.concurrent.CompletionException: java.util.concurrent.RejectedExecutionException: 
	Task java.util.concurrent.CompletableFuture$UniApply@514e386b rejected from 
	java.util.concurrent.ThreadPoolExecutor@5f89dc32
	[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
... ...

Async 的意思是 “异步的”。

CompletableFuture 中,带了 Async 的方法都是在多个线程之间做串行化的处理。

thenApplyAsync:在前一个线程执行完成后,会开始执行,获取前一个线程的返回结果,同时也会返回结果信息


2.6.3.对计算结果进行消费


  • public CompletableFuture<Void> thenRun(Runnable action)
    任务 A 执行完执行 B,并且 B 不需要 A 的结果
  • public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
    任务 A 执行完成执行 BB 需要 A 的结果,但是任务 B 无返回值
  • public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
    任务 A 执行完成执行 BB 需要 A 的结果,同时任务 B 有返回值

public class CompletableFutureAPIDemo_3 {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            return "整一个值 A";
        }).thenApply(
                f -> { return f + "\t再整一个值 C"; }
        ).thenAccept(
                f -> System.out.println("thenAccept:" + f)
        ).thenRun(() -> {
            System.out.println("thenRun():无输入值,亦无返回值");
        });
        System.out.println("===========================================");
        System.out.println(CompletableFuture.supplyAsync(() -> "Result").thenRun(() -> {}).join());
    }
}

控制台输出信息

thenAccept:整一个值 A	再整一个值 C
thenRun():无输入值,亦无返回值
===========================================
null

2.6.4.其他:线程池运行选择


thenRunthenRunAsync 为例,分析他们的区别

(这里的代码比较简单,故省略。下面只贴上控制台的输出结果)


  • supplyAsync(supplier) + thenRun(runnable)
  • supplyAsync(suppler) + thenRunAsync(runnable)
10:00:07.860 [main] INFO c.TestA - 【main 线程正在摸鱼中...10:00:07.893 [ForkJoinPool.commonPool-worker-1] INFO c.TestA - 一号任务:ForkJoinPool.commonPool-worker-1
10:00:07.925 [ForkJoinPool.commonPool-worker-1] INFO c.TestA - 二号任务:ForkJoinPool.commonPool-worker-1
10:00:07.956 [ForkJoinPool.commonPool-worker-1] INFO c.TestA - 三号任务:ForkJoinPool.commonPool-worker-1
10:00:07.988 [ForkJoinPool.commonPool-worker-1] INFO c.TestA - 四号任务:ForkJoinPool.commonPool-worker-1
10:00:07.988 [main] INFO c.TestA - 【cf.get:null

  • supplyAsync(suppler + threadPool) + thenRun(runnable)
10:02:00.847 [main] INFO c.TestA - 【main 线程正在摸鱼中...10:02:00.868 [pool-1-thread-1] INFO c.TestA - 一号任务:pool-1-thread-1
10:02:00.898 [pool-1-thread-1] INFO c.TestA - 二号任务:pool-1-thread-1
10:02:00.930 [pool-1-thread-1] INFO c.TestA - 三号任务:pool-1-thread-1
10:02:00.960 [pool-1-thread-1] INFO c.TestA - 四号任务:pool-1-thread-1
10:02:00.960 [main] INFO c.TestA - 【cf.get:null

  • supplyAsync(suppler + threadPool) + thenRunAsync(runnable)
10:07:34.691 [main] INFO c.TestA - 【main 线程正在摸鱼中...10:07:34.713 [pool-1-thread-1] INFO c.TestA - 一号任务:pool-1-thread-1
10:07:34.744 [ForkJoinPool.commonPool-worker-1] INFO c.TestA - 二号任务:ForkJoinPool.commonPool-worker-1
10:07:34.776 [ForkJoinPool.commonPool-worker-1] INFO c.TestA - 三号任务:ForkJoinPool.commonPool-worker-1
10:07:34.808 [ForkJoinPool.commonPool-worker-1] INFO c.TestA - 四号任务:ForkJoinPool.commonPool-worker-1
10:07:34.808 [main] INFO c.TestA - 【cf.get:null

  • supplyAsync(suppler + threadPool) + thenRunAsync(runnable + threadPool)
10:12:27.048 [main] INFO c.TestA - 【main 线程正在摸鱼中...10:12:27.068 [pool-1-thread-1] INFO c.TestA - 一号任务:pool-1-thread-1
10:12:27.100 [pool-1-thread-2] INFO c.TestA - 二号任务:pool-1-thread-2
10:12:27.130 [pool-1-thread-3] INFO c.TestA - 三号任务:pool-1-thread-3
10:12:27.161 [pool-1-thread-4] INFO c.TestA - 四号任务:pool-1-thread-4
10:12:27.161 [main] INFO c.TestA - 【cf.get:null

  • supplyAsync(suppler + threadPool) + thenRunAsync(runnable) + thenRunAsync(runnable + threadPool) + thenRunAsync(runnable)
10:15:02.708 [main] INFO c.TestA - 【main 线程正在摸鱼中...10:15:02.741 [pool-1-thread-1] INFO c.TestA - 一号任务:pool-1-thread-1
10:15:02.772 [ForkJoinPool.commonPool-worker-1] INFO c.TestA - 二号任务:ForkJoinPool.commonPool-worker-1
10:15:02.804 [pool-1-thread-2] INFO c.TestA - 三号任务:pool-1-thread-2
10:15:02.835 [ForkJoinPool.commonPool-worker-1] INFO c.TestA - 四号任务:ForkJoinPool.commonPool-worker-1
10:15:02.835 [main] INFO c.TestA - 【cf.get:null

分析

  • 没有传入自定义的线程池的时候,使用默认线程池 ForkJoinPool
  • 执行第一个任务的时候,传入了一个自定义的线程池:
    • 调用 thenRun() 方法执行第二个任务的时候,第二个任务和第一个任务都是共用一个线程池
    • 调用 thenRunAsync() 方法执行第二个任务的时候,第一个任务是要的是自己传入的线程池,但第二个任务仍然使用 ForkJoinPool

注意:有时候系统处理的速度太快,则直接使用 main 线程处理

  • supplyAsync(suppler + threadPool) + thenRun(runnable)
  • supplyAsync(suppler + threadPool) + thenRunAsync(runnable)
【一号任务:pool-1-thread-1】
【二号任务:main】
【三号任务:main】
【四号任务:main】
10:38:02.131 [main] INFO c.TestA - 【main 线程正在摸鱼中...10:38:02.143 [main] INFO c.TestA - 【cf.get:null

这里再贴一下相关的源码

java/util/concurrent/CompletableFuture.java

public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return uniRunStage(asyncPool, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) {
    return uniRunStage(screenExecutor(executor), action);
}
private static final boolean useCommonPool =
    (ForkJoinPool.getCommonPoolParallelism() > 1);
private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
static Executor screenExecutor(Executor e) {
    if (!useCommonPool && e == ForkJoinPool.commonPool())
        return asyncPool;
    if (e == null) throw new NullPointerException();
    return e;
}

简言之:在一般情况下,如果不给 thenRunAsync() 方法指定线程池的话,其就会使用默认的线程池

有一种特殊情况,那就是系统处理的速度太快,来不及使用线程池,就直接使用 main 线程处理完了


2.6.5.对计算速度进行选用


  • public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
    选择最快的策略(谁快用谁)

示例代码

@Slf4j(topic = "c.CompletableFutureAPIDemo_4")
public class CompletableFutureAPIDemo_4 {
    public static void main(String[] args) {
        CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> {
            log.info("【A come in】");
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
            return "[This is cfA]";
        });

        CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> {
            log.info("【B come in】");
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            return "[This is cfB]";
        });

        CompletableFuture<String> result = cfA.applyToEither(cfB, f -> { return "【" + f + " is Winer" + "】"; });
        
        log.info(result.join());
    }
}

控制台输出信息

11:19:01.620 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo_4 -A come in】
11:19:01.620 [ForkJoinPool.commonPool-worker-2] INFO c.CompletableFutureAPIDemo_4 -B come in】
11:19:02.634 [main] INFO c.CompletableFutureAPIDemo_4 -[This is cfB] is Winer

2.6.6.对计算结果进行合并


  • public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
  • 两个 CompletionStage 任务都完成后,最终能把两个任务的结果一起交给 thenCombine 来处理。
  • 先完成的策略先等着,等待其它分支任务完成,最后一起合并。

示例代码

@Slf4j(topic = "c.CompletableFutureAPIDemo_5")
public class CompletableFutureAPIDemo_5 {
    public static void main(String[] args) {
        CompletableFuture<Integer> cf_1 = CompletableFuture.supplyAsync(() -> {
            log.info("【[cf_1] | 开始启动】");
            try { TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) { e.printStackTrace(); }
            return 10;
        });

        CompletableFuture<Integer> cf_2 = CompletableFuture.supplyAsync(() -> {
            log.info("【[cf_2] | 开始启动】");
            try { TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) { e.printStackTrace(); }
            return 20;
        });

        CompletableFuture<Integer> combineResult = cf_1.thenCombine(cf_2, (x, y) -> {
            log.info("【俩结果开始合并】");
            return x + y;
        });

        log.info("【CombineResult:{}】", combineResult.join());
    }
}

控制台输出信息

11:39:27.550 [ForkJoinPool.commonPool-worker-2] INFO c.CompletableFutureAPIDemo_5 -[cf_2] | 开始启动】
11:39:27.550 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo_5 -[cf_1] | 开始启动】
11:39:29.571 [ForkJoinPool.commonPool-worker-2] INFO c.CompletableFutureAPIDemo_5 - 【俩结果开始合并】
11:39:29.571 [main] INFO c.CompletableFutureAPIDemo_5 -CombineResult30

示例代码

@Slf4j(topic = "c.CompletableFutureAPIDemo_5Plus")
public class CompletableFutureAPIDemo_5Plus {
    public static void main(String[] args) {
        CompletableFuture<Integer> combineResult = CompletableFuture.supplyAsync(() -> {
            log.info("【come in 1】");
            return 10;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            log.info("【come in 2】");
            return 20;
        }), (x, y) -> {
            log.info("【come in 3】");
            return x + y;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            log.info("【come in 4】");
            return 30;
        }), (a, b) -> {
            log.info("【come in 5】");
            return a + b;
        });

        log.info("【[{}]线程在摸鱼】", Thread.currentThread().getName());
        log.info("【CombineResult:{}】", combineResult.join());
    }
}

输出结果

11:50:02.395 [ForkJoinPool.commonPool-worker-3] INFO c.CompletableFutureAPIDemo_5Plus - 【come in 411:50:02.395 [ForkJoinPool.commonPool-worker-2] INFO c.CompletableFutureAPIDemo_5Plus - 【come in 211:50:02.396 [main] INFO c.CompletableFutureAPIDemo_5Plus -[main]线程在摸鱼】
11:50:02.395 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo_5Plus - 【come in 111:50:02.406 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo_5Plus - 【come in 311:50:02.406 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo_5Plus - 【come in 511:50:02.406 [main] INFO c.CompletableFutureAPIDemo_5Plus -CombineResult60

2.6.7.并行运行多任务


有这样一种情况:并行运行多个互不相关的任务,最后统计整理它们处理的计算结果。

  • public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
    方法会等到所有的 CompletableFuture 都运行完成之后再返回
  • public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
    只要有一个 CompletableFuture 执行完成,它就会返回

示例代码

@Slf4j(topic = "c.Demo_6")
public class CompletableFutureAPIDemo_6 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.MILLISECONDS.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); }
            log.info("执行一号任务");
            return "111";
        });

        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }
            log.info("执行二号任务");
            return "222";
        });

        CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
            log.info("执行三号任务");
            return "333";
        });

        // allOfTest(cf1, cf2, cf3);
        // anyOfTest(cf1, cf2, cf3);

        log.info("溜啦溜啦");
    }

    private static void anyOfTest(CompletableFuture<String> cf1, CompletableFuture<String> cf2, CompletableFuture<String> cf3) {
        CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(cf1, cf2, cf3);
        log.info("【[anyFuture.join]:{}]】", anyFuture.join());
        log.info("【完成了一个任务了】");
        log.info("【[anyFuture]:{}】", anyFuture);
    }

    private static void allOfTest(CompletableFuture<String> cf1, CompletableFuture<String> cf2, CompletableFuture<String> cf3) {
        CompletableFuture<Void> allFuture = CompletableFuture.allOf(cf1, cf2, cf3);
        log.info("【[allFuture.join]:{}】",allFuture.join());
        log.info("【所有任务都已经完成啦】");
        log.info("【[allFuture]:{}】", allFuture);
    }
}

输出结果-1CompletableFuture.allOf()

13:16:01.084 [ForkJoinPool.commonPool-worker-3] INFO c.Demo_6 - 执行三号任务
13:16:01.097 [ForkJoinPool.commonPool-worker-2] INFO c.Demo_6 - 执行二号任务
13:16:01.112 [ForkJoinPool.commonPool-worker-1] INFO c.Demo_6 - 执行一号任务
13:16:01.112 [main] INFO c.Demo_6 -[allFuture.join]null13:16:01.113 [main] INFO c.Demo_6 - 【所有任务都已经完成啦】
13:16:01.114 [main] INFO c.Demo_6 -[allFuture]java.util.concurrent.CompletableFuture@66cd51c3[Completed normally]13:16:01.114 [main] INFO c.Demo_6 - 溜啦溜啦

输出结果-2CompletableFuture.anyOf()

13:14:00.829 [ForkJoinPool.commonPool-worker-3] INFO c.Demo_6 - 执行三号任务
13:14:00.829 [ForkJoinPool.commonPool-worker-2] INFO c.Demo_6 - 执行二号任务
13:14:00.842 [ForkJoinPool.commonPool-worker-1] INFO c.Demo_6 - 执行一号任务
13:14:00.841 [main] INFO c.Demo_6 -[anyFuture.join]333]13:14:00.843 [main] INFO c.Demo_6 - 【完成了一个任务了】
13:14:00.844 [main] INFO c.Demo_6 -[anyFuture]java.util.concurrent.CompletableFuture@66cd51c3[Completed normally]13:14:00.844 [main] INFO c.Demo_6 - 溜啦溜啦

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/392586.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

win10开机黑屏只有鼠标怎么办?这里有4个妙招

真实案例&#xff1a;电脑开机黑屏&#xff0c;只出现鼠标箭头光标怎么办&#xff1f; “早上打开电脑&#xff0c;发现开不了机&#xff0c;屏幕上只有一个鼠标光标&#xff01;百度搜索了很长时间&#xff0c;但所有的方法都没有奏效。求教各位大神&#xff0c;有什么好方法…

中电金信源启小程序开发平台 赋能金融+业务生态共享共建

导语&#xff1a;源启小程序开发平台立足于“为金融业定制”&#xff0c;从小程序全生命周期的角度出发&#xff0c;助力银行、互联网金融、保险、证券客户实现一站式小程序开发、发布、运营与营销。企业可以通过源启小程序开发平台&#xff0c;低成本高效率开发一款定制化小程…

The 19th Zhejiang Provincial Collegiate Programming Contest vp

和队友冲了这场&#xff0c;极限6题&#xff0c;重罚时铁首怎么说&#xff0c;前面的A题我贡献了太多的罚时&#xff0c;然后我的G题最短路调了一万年&#xff0c;因为太久没写了&#xff0c;甚至把队列打成了优先队列&#xff0c;没把head数组清空完全&#xff0c;都是我的锅呜…

搭载英伟达Jetson Orin的Allspark 2全新亮相,算力高达100TOPS!

Allspark 2 系列AI边缘计算机 Allspark 2经过设计优化的铝合金外壳&#xff0c;内置静音涡轮风扇&#xff0c;散热优秀。尺寸102.5X62.5X31mm&#xff0c;整机重量188g。 相比Allspark 1&#xff0c;2代整机轻了25克&#xff0c;更加轻薄。 在机身更加轻薄的情况下&#xff0c…

1497. 树的遍历

文章目录1.二叉树的遍历2.二叉树的构造3.例题3.1不使用BFS3.2使用BFS二叉树的构造&#xff1a;没有中序遍历则无法唯一构造1.二叉树的遍历 2.二叉树的构造 3.例题 一个二叉树&#xff0c;树中每个节点的权值互不相同。 现在给出它的后序遍历和中序遍历&#xff0c;请你输出它…

蓝桥杯训练day2

day21.二分(1)789. 数的范围(2)四平方和&#xff08;1&#xff09;哈希表做法&#xff08;2&#xff09;二分做法(3)1227. 分巧克力&#xff08;4&#xff09;113. 特殊排序(5)1460. 我在哪&#xff1f;2.双指针(1)1238. 日志统计(2)1240. 完全二叉树的权值&#xff08;3&#…

koa-vue的分页实现

1.引言 最近确实体会到了前端找工作的难处&#xff0c;不过大家还是要稳住心态&#xff0c;毕竟有一些前端大神说的有道理&#xff0c;前端发展了近20年&#xff0c;诞生了很多leader级别的大神&#xff0c;这些大神可能都没有合适的坑位&#xff0c;我们新手入坑自然难一些&am…

HD-G2L-IOT V2.0核心板MPU压力测试

1. 测试对象HD-G2L-IOT基于HD-G2L-CORE V2.0工业级核心板设计&#xff0c;双路千兆网口、双路CAN-bus、2路RS-232、2路RS-485、DSI、LCD、4G/5G、WiFi、CSI摄像头接口等&#xff0c;接口丰富&#xff0c;适用于工业现场应用需求&#xff0c;亦方便用户评估核心板及CPU的性能。H…

PMP高分上岸人士的备考心得,分享考试中你还不知道的小秘密

上岸其实也不是什么特别难的事情&#xff0c;考试一共就180道选择题&#xff0c;题目只要答对60.57%就可以通过考试&#xff0c;高分通过没在怕的&#xff0c;加油备考呀朋友们&#xff01; 这里也提一嘴&#xff0c;大家备考的时候比较顾虑的一个问题就是考试究竟要不要报班…

js循环判断的方法

js循环判断的方法if语句if else语句if else if else if......三元表达式switchswitch语句和if语句的区别for循环while循环do while循环for inforEachfor of性能问题if语句 条件满足就执行&#xff0c;不满足就不执行 if(条件){语句}if else语句 条件满足&#xff0c;执行语句…

认识3D旋转变换矩阵

前文输出了cesium的Rotation变量&#xff0c;一个矩阵&#xff1b;把这矩阵写下来看下&#xff1b; 0.99939 -0.034899 0 0 0.034899 0.99939 0 0 0 0 1 0 0 0 0 1 看一下3D数学的相关描述&#xff1b;…

周赛335(模拟、质因子分解、分组背包)

题解&#xff1a;0x3f https://leetcode.cn/problems/number-of-ways-to-earn-points/solution/fen-zu-bei-bao-pythonjavacgo-by-endlessc-ludl/ 文章目录周赛335[6307. 递枕头](https://leetcode.cn/problems/pass-the-pillow/)模拟[6308. 二叉树中的第 K 大层和](https://le…

扬帆优配|本周限售股解禁规模不足300亿元,这8家公司解禁压力大

本周限售股解禁规模环比大降至300亿元之下。 Wind数据显示&#xff0c;除掉新上市公司&#xff0c;本周A股商场共有36家公司限售股解禁&#xff0c;解禁数量27.69亿股&#xff0c;以最新收盘价核算&#xff08;下同&#xff09;&#xff0c;解禁市值268.81亿元。 解禁市值超越…

【第二章 @RequestMapping注解(value,method,params属性),springMVC支持ant风格的路径,支持路径中的占位符】

第二章 RequestMapping注解&#xff08;value&#xff0c;method&#xff0c;params属性&#xff09;&#xff0c;springMVC支持ant风格的路径&#xff0c;支持路径中的占位符 1. RequestMapping注解&#xff1a; &#xff08;1&#xff09; RequestMapping注解的作用就是将请…

数据结构与算法之桶排序

目录桶排序概念代码实现时间复杂度桶排序概念 桶排序 &#xff08;Bucket sort&#xff09;或所谓的箱排序&#xff0c;是一个排序算法&#xff0c;工作的原理是将数组分到有限数量的桶里。每个桶再个别排序&#xff08;有可能再使用别的排序算法或是以递归方式继续使用桶排序…

如何根据实际需求选择合适的三维实景建模方式?

随着实景三维中国建设的推进&#xff0c;对三维实景建模的数字化需求大幅增加。由于三维实景建模具有采集速度快、计算精度高等建模优势&#xff0c;引起了各个行业的高度关注。三维实景建模是一种应用数码相机或者激光扫描仪对现有场景进行多角度环视拍摄&#xff0c;然后利用…

进制转换(详解二进制、八进制、十进制、十六进制的相互转换)

目录二进制运算规则十进制的转换二进制数、八进制数、十六进制数的相互转换&#x1f458;什么是数制 用进位的原则进行计数称为进位计数制&#xff0c;简称数制。进位计数制的特点是表示数值大小的数码与它所处的位置有关&#xff0c;每种数制都包含两个基本的要素&#xff1a;…

WPF WrapPanel、UniformGrid、DockPanel介绍

WPF WrapPanel、UniformGrid、DockPanel介绍 WrapPanel WrapPanel , 具有在有限的容器范围内, 可以自动换行, 或者换列处理。具体则取决于WrapPanel的排列方式 (Orientation)。 Orientation"Horizontal"时各控件从左至右罗列&#xff0c;当面板长度不够时&#xff…

JMM JVM 垃圾回收

目录 一、JMM内存模型 1、定义 2、JMM的三大特性(可见性原子性有序性) 2.1 可见性 2.2 原子性 2.3 有序性 3、JMM中的8种原子操作 二、JVM 1、JVM体系结构 2、JVM参数调优 2.1 三大参数类型 2.2 九个调优参数 三、垃圾回收器 1、4种GC算法(引用计数 /复制拷贝/标…

深度学习pytorch实战三:VGG16图像分类篇自建数据集图像分类三类

1.自建数据集与划分训练集与测试集 2.模型相关知识 3.model.py——定义AlexNet网络模型 4.train.py——加载数据集并训练&#xff0c;训练集计算损失值loss&#xff0c;测试集计算accuracy&#xff0c;保存训练好的网络参数 5.predict.py——利用训练好的网络参数后&#xff0c…