JUC并发编程与源码分析笔记03-CompletableFuture

news2024/11/26 10:18:50

Future接口理论知识复习

Future接口(FutureTask实现类)定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
找到java.util.concurrent.Future,看到里面定义的方法,这些方法就是我们需要关注的方法。
Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。

Future接口常用实现类FutureTask异步任务

Future接口能干什么

Future是Java5新加的一个接口,它提供了一个异步并行计算的功能,如果主线程需要执行一个很耗时的计算任务,我们就可以通过Future把这个任务放到异步线程中执行,主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。
目的:异步多线程任务执行且返回有结果,三个特点:多线程、有返回、异步任务。

本源的Future接口相关架构

在这里插入图片描述
可以看到FutureTask实现了Runnable、Future接口,而且它的构造参数还支持传入Callable,所以FutureTask现在就具有多线程(Runnable)、有返回(Callable)、异步任务(Future)这3个特点了。

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<>(new MyThread());// 将一个耗时的操作封装到FutureTask里
        new Thread(futureTask, "threadName").start();// 启动一个子线程执行FutureTask
        System.out.println(futureTask.get());// 获取FutureTask的返回值
    }
}

class MyThread implements Callable<String> {
    @Override
    public String call() {
        System.out.println("MyThread.call");
        return "Hello Callable";
    }
}

Future编码实战和优缺点分析

优点

Future结合线程池,可以显著提高程序的执行效率。

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class FutureThreadPoolDemo {
    /**
     * 有3个任务,分别耗时500ms,300ms,300ms
     * fun1():3个任务由主线程依次执行
     * fun2():将3个任务放到线程池执行
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        fun1();
        fun2();
    }

    private static void fun2() throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
        ExecutorService executorService = Executors.newFixedThreadPool(3);// 创建一个线程池
        FutureTask<String> futureTask1 = new FutureTask<>(() -> {// 任务1
            Thread.sleep(500);
            return "task1 over";
        });
        executorService.submit(futureTask1);
        FutureTask<String> futureTask2 = new FutureTask<>(() -> {// 任务2
            Thread.sleep(300);
            return "task2 over";
        });
        executorService.submit(futureTask2);
        FutureTask<String> futureTask3 = new FutureTask<>(() -> {// 任务3
            Thread.sleep(300);
            return "task3 over";
        });
        executorService.submit(futureTask3);
        // 这里获取返回值,主线程会阻塞等待直到拿到返回值,如果把获取结果注释掉,fun2()的执行时间会很短,因为主线程执行完毕,但是子线程依旧在跑
        System.out.println(futureTask1.get());
        System.out.println(futureTask2.get());
        System.out.println(futureTask3.get());
        long endTime = System.currentTimeMillis();
        System.out.println("fun()2总耗时:" + (endTime - startTime));
        executorService.shutdown();// 关闭线程池
    }

    private static void fun1() throws InterruptedException {
        long startTime = System.currentTimeMillis();
        Thread.sleep(500);// 任务1
        Thread.sleep(300);// 任务2
        Thread.sleep(300);// 任务3
        long endTime = System.currentTimeMillis();
        System.out.println("fun1()总耗时:" + (endTime - startTime));
    }
}

缺点

get()阻塞

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> futureTask = new FutureTask<>(()->{
            Thread.sleep(5000);
           return "task over";
        });
        new Thread(futureTask).start();
        System.out.println("主线程正在运行");
//        System.out.println(futureTask.get());// 阻塞,直到获取到返回结果
        System.out.println(futureTask.get(2000, TimeUnit.MILLISECONDS));// 只等待2000ms,超时未返回直接抛异常
    }
}

get()方法会有阻塞问题,导致主线程无法继续执行,另外提供了一个带参的get()方法,超时自动放弃等待。

isDone()轮询

有时候,在get()阻塞期间,我们希望看到进度或者提示信息,而不是一味地等待,可以将get()改成轮询,在轮询方法里通过isDone()判断任务是否执行完毕,只是多了一些提示信息。

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeoutException;

public class FutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            Thread.sleep(5000);
            return "task over";
        });
        new Thread(futureTask).start();
        System.out.println("主线程正在运行");
//        System.out.println(futureTask.get());// 阻塞,直到获取到返回结果
//        System.out.println(futureTask.get(2000, TimeUnit.MILLISECONDS));// 只等待2000ms,超时未返回直接抛异常
        while (true) {
            if (futureTask.isDone()) {
                System.out.println(futureTask.get());
                break;
            } else {
                Thread.sleep(1000);
                System.out.println("正在处理中,请稍等……");
            }
        }
    }
}

这样的缺点就是:频繁的调用isDone()方法,对CPU来说是浪费资源。

结论

Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式获取任务结果。

想完成一些复杂的任务

对于简单的业务场景,Future是完全可以胜任的。

回调通知

对于Future的完成时间,我们希望,完成之后可以通知主线程。

创建异步任务

Future结合线程池。

多个任务前后依赖可以组合处理(水煮鱼)

希望将多个异步任务的结果组合起来,后一个异步任务的计算需要前一个任务的值。
希望将多个异步计算合成成一个异步计算,这几个异步计算相互独立,同时,后一个的计算依赖前一个的计算结果。

对计算速度选最快

当Future集合中有多个任务的时候,处理最快的一个完成,返回第一个处理的结果。

CompletableFuture对Future的改进

CompletableFuture为什么会出现

Future中get()方法和isDone()方法都存在问题,对于真正的异步处理,我们希望可以通过回调函数,在Future结束之后,自动调用回调函数,这样就不用等待返回结果了。
阻塞的方式和异步编程的设计理念相违背,轮询会额外耗费CPU资源,因此JDK8设计出了CompletableFuture,它提供了一个类似观察者模式的机制,任务完成之后,通知监听的一方。

CompletableFuture和CompletionStage源码分别介绍

类架构说明

在这里插入图片描述

接口CompletionStage

  • CompletionStage代表异步计算过程中的某个阶段,一个阶段完成后可能会触发另一个阶段,有些类似Linux系统管道分隔传参数
  • 一个阶段的执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.println(x)).thenRun(() -> System.out.println())
  • 一个阶段的执行可能被单个阶段的完成触发,有可能是由多个阶段一起触发

类CompletableFuture

  • 在Java8中,CompletableFuture提供了非常强大的Future扩展功能,可以帮助我们简化异步编程复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,提供了转换和组合CompletableFuture的方法
  • 它可能代表一个明确完成的Future,也可能带一个完成阶段(CompletionStage),它支持在计算完成后触发一些函数或者执行某些动作
  • 它实现了Future和CompletionStage接口

核心的四个静态方法,创建一个异步任务

runAsync:无返回值

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

supplyAsync:有返回值

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

上述Executor参数说明

没有指定线程池executor的时候,使用的是默认的ForkJoinPool.commonPool(),如果指定了线程池,则使用自定义的或指定的线程池。

Code

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        fun1();
        System.out.println("----------------------------------------");
        fun2();
        System.out.println("----------------------------------------");
        fun3();
        System.out.println("----------------------------------------");
        fun4();
    }

    private static void fun1() throws ExecutionException, InterruptedException {
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(completableFuture.get());
    }

    private static void fun2() throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, executorService);
        System.out.println(completableFuture.get());
        executorService.shutdown();
    }

    private static void fun3() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "CompletableFutureBuildDemo.fun3";
        });
        System.out.println(completableFuture.get());
    }

    private static void fun4() throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "CompletableFutureBuildDemo.fun4";
        }, executorService);
        System.out.println(completableFuture.get());
        executorService.shutdown();
    }
}

通用演示,减少阻塞和轮询

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

public class CompletableFutureUserDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture.supplyAsync(() -> {
                    System.out.println(Thread.currentThread().getName());
                    int result = ThreadLocalRandom.current().nextInt();
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return result;
                }, executorService)
                .whenComplete((v, e) -> {
                    if (e == null) {
                        System.out.println("计算完成,计算结果:" + v);
                    }
                })
                .exceptionally((e) -> {
                    e.printStackTrace();
                    System.out.println("出现异常:" + e.getCause() + "\t" + e.getMessage());
                    return null;
                });
        System.out.println(Thread.currentThread().getName() + "正在运行");
        executorService.shutdown();
    }
}

注意这里的一个坑,如果使用的是默认的线程池,主线程执行完毕后,CompletableFuture使用的默认线程池会立刻关闭,就会导致whenComplete方法不能被执行到,所以这里,还是推荐使用自定义线程池。
如果在supplyAsync方法中,出现了异常,也会走whenComplte方法,而且也走execptionally方法。

CompletableFuture的优点

  • 异步任务结束时,自动调用某个对象的方法
  • 主线程设置好回调后,不需要再关心异步任务的执行,异步任务之间可以顺序执行
  • 异步任务出错时,会自动回调某个对象的方法

案例精讲-从电商网站的比价需求说开去

函数式编程已经主流

Lambda表达式、Stream流式调用、Chain链式调用、Java8函数式编程。
函数式编程:

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

@FunctionalInterface
public interface Function<T, R> {
    R apply(T t);
}

@FunctionalInterface
public interface Consumer<T> {
    void accept(T t);
}

@FunctionalInterface
public interface BiConsumer<T, U> {
    void accept(T t, U u);
}

@FunctionalInterface
public interface Supplier<T> {
    T get();
}
函数式接口名称方法名称参数个数返回值
Runnablerun0
Functionapply1
Consumeraccept1
BiConsumeraccept2
Supplierget0

还有一个常用的函数式接口,这里视频并没有提及,它是Predicate

先说说join和get对比

CompletableFuture的get()方法和join()方法相比,作用是一样的,区别是get()在编译阶段,会抛出checkedException,而join()不会。
这里还学到一个Lombok的新知识:在类上添加这个注解:@Accessors(chain = true)// 开启链式写法,可以开启对象的链式写法,比如student.setId(1).setName("xxx").setMajor("yyy");,把原来竖着写的set方法,扭转成横着写了,算是个语法糖吧。

大厂业务需求说明

需求说明:
同一款产品,同时搜索出同款产品在各大电商平台的售价
同一款产品,同时搜索出本产品在同一电商平台下,各个入驻卖家售价
输出返回:
希望查询结果是这款产品在不同地方的价格清单列表,返回一个List<String>
解决方案:
一步一步的查询,最后汇总,效率上会慢
多线程异步任务同时查询 ,返回结果汇总,效率上很高

一波流Java8函数式编程带走-比价案例实战Case

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

public class CompletableFutureMallDmo {
    static List<Mall> mallList = new ArrayList() {{
        add(new Mall("jd", "mysql"));
        add(new Mall("tb", "mysql"));
        add(new Mall("dd", "mysql"));
    }};

    public static void main(String[] args) {
        long begin1 = System.currentTimeMillis();
        List<String> list1 = fun1(mallList, "mysql");
        list1.forEach(System.out::println);
        long end1 = System.currentTimeMillis();
        System.out.println("fun1用时:" + (end1 - begin1));
        System.out.println("---------------------------------------------");
        long begin2 = System.currentTimeMillis();
        List<String> list2 = fun2(mallList, "mysql");
        list2.forEach(System.out::println);
        long end2 = System.currentTimeMillis();
        System.out.println("fun2用时:" + (end2 - begin2));
    }

    private static List<String> fun1(List<Mall> mallList, String productName) {
        return mallList.stream().map(mall -> String.format("%s in %s price is %s", productName, mall.getMallName(), mall.calculatePrice())).collect(Collectors.toList());
    }

    private static List<String> fun2(List<Mall> mallList, String productName) {
        return mallList.stream().map(mall -> CompletableFuture.supplyAsync(() -> String.format("%s in %s price is %s", productName, mall.getMallName(), mall.calculatePrice())))
                .collect(Collectors.toList()).stream()// 这里collect一下,是为了让前一个stream流完成,这样stream流里的线程就可以开始运算,如果没有这一行,就起不到并行作用,因为stream有懒惰的特性,只有执行终端操作时候,才会真正执行运算
                .map(CompletableFuture::join).collect(Collectors.toList());
    }
}

@AllArgsConstructor
@NoArgsConstructor
@Data
@Accessors(chain = true)
class Mall {
    private String mallName;
    private String productName;

    public Double calculatePrice() {
        try {
            Thread.sleep(1000);// 模拟查询时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return ThreadLocalRandom.current().nextDouble();
    }
}

CompletableFuture常用方法

获取结果和触发计算

获取结果

public T get()
public T get(long timeout, TimeUnit unit)
public T join()
public T getNow(T valueIfAbsent):如果在get的时候,还没有返回结果,就将valueIfAbsent的值作为返回值返回

主动触发计算

public boolean complete(T value):方法首先判断进程有没有执行完,如果没有执行完,进行打断,并将value赋值为线程的返回值(通过get()join()获取),如果执行完了,就不需要打断,线程的返回值就是线程里正常的返回值。
关于complete()方法的返回值,有点不好理解,如果线程是被complete()方法触发结束的,返回true,如果线程在执行complete()方法的时候,已经结束,返回false。

public static void main(String[] args) throws InterruptedException {
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello";
    });
    Thread.sleep(2000);
    System.out.println(completableFuture.complete("default") + "\t" + completableFuture.join());
}

修改主线程里的sleep为1000,子线程里的sleep为2000,查看结果对比。

对计算结果进行处理

计算结果存在依赖关系,这两个线程串行化。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureAPIDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第一步");
            return 1;
        }, executorService).thenApply(v -> {
            System.out.println("第二步");
            int a = 1 / 0;
            return v + 1;
        }).thenApply(v -> {
            System.out.println("第三步");
            return v + 1;
        }).whenComplete((v, e) -> {
            if (e == null) {
                System.out.println("最终结果为:" + v);
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            System.out.println(e.getMessage());
            return null;
        });
        executorService.shutdown();
        System.out.println(completableFuture.join());
    }
}

thenApply换成handle,对比区别。可以看到,使用handle后第二步报错的情况下,第三步依旧执行了,而且会带着异常参数,可以根据异常参数做一些判断处理,使用thenApply的话,如果线程内部报错,后续的thenApply就不会执行了。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureAPIDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第一步");
            return 1;
        }, executorService).handle((v, e) -> {
            System.out.println("第二步");
            int a = 1 / 0;
            return v + 1;
        }).handle((v, e) -> {
            System.out.println("第三步");
            return v + 1;
        }).whenComplete((v, e) -> {
            if (e == null) {
                System.out.println("最终结果为:" + v);
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            System.out.println(e.getMessage());
            return null;
        });
        executorService.shutdown();
        System.out.println(completableFuture.join());
    }
}

对计算结果进行消费

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
把线程的运算结果消费掉,没有返回值。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureAPIDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        }, executorService).thenAccept(System.out::println);
        executorService.shutdown();
    }
}

任务之间的执行顺序对比:
public CompletableFuture<Void> thenRun(Runnable action):任务A执行完执行任务B,任务B不需要任务A的结果,也无返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action):任务A执行完执行任务B,任务B需要任务A的结果,但是任务B无返回值
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn):任务A执行完执行任务B,任务B需要任务A的结果,但是任务B有返回值
CompletableFuture和线程池的说明:
如果不指定线程池,默认使用ForkJoinPool,如果指定了线程池,使用指定的线程池,在调用then*()方法的时候,还有一个then*Async()方法,如果使用了then*Async()方法,这个方法内的任务和这个方法后的任务都会使用ForkJoinPool来执行,除非你在then*Async()方法里又传了自定义线程池。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureAPIDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "";
        }, executorService).thenRun(() -> {
            System.out.println("任务2" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).thenRun(() -> {
            System.out.println("任务3" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        executorService.shutdown();
    }
}

如果线程内执行的太快,Thread.currentThread().getName()的结果可能是main线程,比方说,把Thread.sleep()去掉,就可以复现。
查看thenRunAsync()方法,里面有一个asyncPool变量,查看它的赋值过程,判断useCommonPool,如果为真,使用ForkJoinPool,如果为假,新建一个线程池。useCommonPool的值又取决于ForkJoinPool.getCommonPoolParallelism() > 1,通过调试发现ForkJoinPool.getCommonPoolParallelism()的值是7,所以默认情况下,使用的是ForkJoinPool。

对计算速度进行选用

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
应用场景:一个方法需要实现多个查询服务,多个服务之间相互独立,只要有一个能返回结果,自动放弃等待其他未执行完的查询。
又找了一个例子:我计划从A到B去,有1路车和2路车,都可以到达,而且它们路线一致,我坐哪个呢?哪个车先来坐哪个呗,这里比较的是等待时间,可以把等待时间抽象成程序处理时间,哪个快走哪个,而且放弃其他所有的,也是这个道理。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureAPIDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "result1";
        }, executorService);
        CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "result2";
        }, executorService);
        CompletableFuture<String> completableFuture = completableFuture1.applyToEither(completableFuture2, v -> v + " is winner");
        System.out.println(completableFuture.join());
        executorService.shutdown();
    }
}

对计算结果进行合并

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
两个CompletionStage任务都完成后,将两个任务的结果一起提交给thenCombine来处理,先完成的任务需要等待另一个任务完成。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureAPIDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 10;
        }, executorService);
        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 20;
        }, executorService);
        CompletableFuture<Integer> completableFuture = completableFuture1.thenCombine(completableFuture2, (v1, v2) -> v1 * v2);
        System.out.println(completableFuture.join());
        executorService.shutdown();
    }
}

看到这里,发现弹幕有人提到public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)老师没讲,我就自己搜索了下。
allOf接收若干个CompletableFuture,当所有的CompletableFuture都完成后,才会执行返回CompletableFuture。
anyOf接收若干个CompletableFuture,当任意一个任务执行完成,就返回CompletableFuture。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/42642.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Node.js 入门教程 22 将所有 Node.js 依赖包更新到最新版本

Node.js 入门教程 Node.js官方入门教程 Node.js中文网 本文仅用于学习记录&#xff0c;不存在任何商业用途&#xff0c;如侵删 文章目录Node.js 入门教程22 将所有 Node.js 依赖包更新到最新版本22 将所有 Node.js 依赖包更新到最新版本 当使用 npm install <packagename&g…

pytorch 手写数字识别1

目录 概述 加载图片 绘图部分 backward 前言&#xff1a; 这里以一个手写数字识别的例子,简单了解一下pytorch 实现神经网络的过程. 本章重点讲一下加载数据过程 参考&#xff1a; 课时9 手写数字识别初体验-1_哔哩哔哩_bilibili Pytorch中的backward函数 - …

为了让线上代码可追溯, 我开发了这个vite插件

人生的第一个vite插件 前言 想在控制台输出一下前端代码的一些构建信息&#xff0c; 比如打包时间、打包的人, 代码分支、commit是那个&#xff0c;方便在控制台追溯。 背景 遇到的问题 1、场景一 前端多人协同开发的情况下&#xff0c;比方测试站&#xff0c; 你发的代码…

Java 反射系列 —— 学习笔记

Java 反射系列 1. 类成员 为了更好的描述&#xff0c;我们做个约定个通配符 XXXX&#xff0c; 如果是成员变量就代表 Field&#xff0c;如果是类方法就代表 Method&#xff0c;如果是构造器就代表 Constructor。 1.1 获取方法 那么怎么获取到这三类成员呢&#xff1f; 获…

逆势涨薪3k!新媒体运营毅然转行测试,我的入行秘籍是什么?

不尝试永远都不会成功&#xff0c;勇敢的尝试是成功的一半。 大学毕业做运营&#xff0c;业务难精进&#xff0c;薪资难提升 “你大学专业是商务英语&#xff0c;为什么毕业后会选择做新媒体运营呢&#xff1f;” 其实我当时没有想那么多的&#xff0c;商务英语的就业方向一个…

苹果电容笔值得买吗?2022最新电容笔推荐

如今&#xff0c;许多人都喜欢用IPAD来学习记录&#xff0c;或是安静地作画。很多ipad的用户&#xff0c;都很重视它的实用性&#xff0c;因为他们发现&#xff0c;如果有一款功能不错的电容笔来搭配ipad&#xff0c;那么ipad的实用性就会得到极大的提高。事实上&#xff0c;如…

开发 Chrome 扩展程序的利弊

作为一名软件开发人员,您总是希望从事能够提高您的技术技能并赚钱的项目。有什么比开发现金流 chrome 扩展程序更好的方法呢? 在本文中,我将从软件开发人员的角度概述开发 chrome 扩展程序的一些优点和缺点。 开发 Chrome 扩展程序的好处 Chrome 扩展程序是软件开发人员接…

基于遗传算法与神经网络的测井预测(Matlab代码实现)

&#x1f468;‍&#x1f393;个人主页&#xff1a;研学社的博客 &#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜…

【序列召回推荐】(task4)多兴趣召回MIND模型

note Hinton在2011年提出的capsule network&#xff0c;通过EM期望值最大化算法&#xff0c;用动态路由代替反向传播进行更新参数&#xff0c;学习不同capsule之间的连接权重&#xff0c;实现比CNN更优秀的空间关系建模效果&#xff08;CNN可能对同一个图像的旋转版本识别错误…

Java笔记(十四)

文献种类&#xff1a;专题技术总结文献 开发工具与关键技术&#xff1a; IntelliJ IDEA、Java 语言 作者&#xff1a; 方建恒 年级&#xff1a; 2020 撰写时间&#xff1a; 2022 年 11 月 28 日 Java笔记(十四) 今天我给大家继续分享一下我的Java笔记&#xff0c; 我们继续来…

终于读完了阿里云p9专家分享云原生Kubernetes全栈架构师实战文档

都说程序员工资高、待遇好&#xff0c; 2022 金九银十到了&#xff0c;你的小目标是 30K、40K&#xff0c;还是 16薪的 20K&#xff1f;作为一名 Java 开发工程师&#xff0c;当能力可以满足公司业务需求时&#xff0c;拿到超预期的 Offer 并不算难。然而&#xff0c;提升 Java…

Linux便捷操作

1. Tab 这是你不能没有的 Linux 快捷键。它将节省你 Linux 命令行中的大量时间。 只需要输入一个命令&#xff0c;文件名&#xff0c;目录名甚至是命令选项的开头&#xff0c;并敲击 tab 键。它将自动完成你输入的内容&#xff0c;或为你显示全部可能的结果。 如果你只记一个…

中国住宅设施杂志中国住宅设施杂志社中国住宅设施编辑部2022年第9期目录

景观园林《中国住宅设施》投稿&#xff1a;cnqikantg126.com 市政园林景观工程施工项目管理的基本方法与措施 蒋伟;刘巍;张辉; 1-3 低成本风景园林设计与相关问题分析 魏小静; 4-6 城市文化公园景观设计探究——以临夏河州牡丹文化公园为例 姜丽; 7-9 建筑设计 …

一文读懂:低代码和无代码的演进历程、应用范围

低代码和无代码技术的演进发展 整个软件开发的演进路径大致可以分为四个阶段&#xff1a;第一代程序设计语言&#xff1b;第二代是汇编语言&#xff1b;第三代是现在常见的高级语言&#xff0c;比如 Python、Java 等&#xff1b;第四代就是低代码和无代码技术。低代码、无代码…

Pro_11丨跟踪+目标出场自适应切换

量化策略开发&#xff0c;高质量社群&#xff0c;交易思路分享等相关内容 『正文』 ˇ 大家好&#xff0c;今天我们分享第11期策略——跟踪目标出场自适应切换策略。本期策略是2022年度倒数第2期策略&#xff0c;2023年度松鼠俱乐部内容会更加丰富&#xff0c;12月出预告敬请…

【Vagrant】使用 Vagrant 快速创建多台 centos7 虚拟机

问题场景&#xff1a; 最近在学习数据库的主从复制&#xff0c;因此需要安装两个虚拟机&#xff0c;一个放主数据库&#xff0c;一个放从数据库&#xff08;不会用 Docker ,咱就多搭几个虚拟机吧 &#xff09;&#xff0c;因此记录使用 Vagrant 快速搭建两个 CentOS 7 的教程&a…

Python海龟turtle基础知识大全与画图集合

Turtle图形库 Turtle 库是 Python 内置的图形化模块&#xff0c;属于标准库之一&#xff0c;位于 Python 安装目录的 lib 文件夹下&#xff0c;常用函数有以下几种&#xff1a; 一.Turtle绘图的基础知识 画布是turtle用于绘图区域&#xff0c;我们可以设置它的大小和初始位置。…

使用支持向量机的基于异常的入侵检测系统

使用支持向量机的基于异常的入侵检测系统使用支持向量机的基于异常的入侵检测系统学习目标&#xff1a;学习内容&#xff1a;1.⼀种智能⼊侵检测系统第⼀阶段第⼆阶段&#xff1a;分类总结2.使用支持向量机的基于异常的入侵检测系统1.预处理入侵数据集2.基于信息增益的特征排名…

c++ CJsonObject 读写json

CJsonObject简介 CJsonObject是Bwar基于cJSON全新开发一个C版的JSON库&#xff0c;CJsonObject的最大优势是简单、轻量、跨平台&#xff0c;开发效率极高&#xff0c;尤其对多层嵌套json的读取和生成、修改极为方便。CJsonObject比cJSON简单易用得多&#xff0c;且只要不是有意…

Linux —— 文件操作

目录 1.内核提供的文件系统调用 1.1open和close 1.2write和read 2.文件描述 2.1文件描述符 2.2文件描述符分配规则 3.重定向 3.1最“挫”的重定向 3.2使用系统调用 3.3重定向原理 3.4让我们的"shell"支持重定向操作 4.一切皆文件 1.内核提供的文件系统调用…