第二章CompletableFuture

news2025/2/10 22:27:35

文章目录

  • Future和Callable接口
    • FutureTask实现类
      • 为什么引出FutureTask
  • Future到CompletableFuture
    • Future优点
    • Future的缺点
      • get()阻塞
      • isDone()轮询
      • Future应用现状
  • CompletableFuture基本介绍
    • CompletionStage
    • 核心的四个静态方法(分为两组)
      • runAsync无返回值
      • supplyAsync有返回值
    • CompletableFuture使用演示
      • 基本功能
      • 减少阻塞和轮询whenComplete
    • CompletableFuture优点总结
    • join和get对比
  • 实战精讲-比价网站case
    • 需求
    • 基本框架搭建
    • 从功能到性能:利用CompletableFuture
  • CompletableFuture常用API
    • 获得结果和触发计算
      • 获取结果
      • 主动触发计算
    • 对计算结果进行处理
    • 对计算结果进行消费
    • CompleteFuture和线程池说明(非常重要)
    • 对计算速度进行选用
    • 对计算结果进行合并

Future和Callable接口

  • Future接口(FutureTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。(异步:可以被叫停,可以被取消)

image-20230604132734762

一句话:Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。

  • eg.比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,过了一会才去获取子任务的执行结果。老师在上课,但是口渴,于是让班长这个线程去买水,自己可以继续上课,当班长买回来了,老师喝完水就继续上课

有个目的:异步多线程任务执行且有返回结果,三个特点:多线程/有返回值/异步任务(班长作为老师去买水作为新启动的异步多线程任务且买到水有结果返回)

FutureTask实现类

从我们的需求来分析,我们的异步多线程任务执行且返回有结果,可以看出来三个特点

  • 多线程
  • 有返回值
  • 异步任务

为什么引出FutureTask

image-20230604133730785

  • 我们想开启一个线程,必须new一个Thread类的对象但是我们的Thread类只有接收Runnable对象的构造函数
  • 我们还需要满足异步任务的条件,所以必须满足Future接口
  • 我们还需要有返回值,还需要满足我们的Callable接口

image-20230604134653253

  • 我们看见FutureTask实现了Future接口,也实现了Runnable接口,满足了可以开启一个线程,也可以具有异步任务的功能,且还使用了适配器设计模式,适配了Callable对象,也就满足了对应需要有返回值的要求
  • ctrl+alt+u) IDEA能帮助我们生成类的继承关系图

使用

public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> stringFutureTask = new FutureTask<String>(new MyThread2());
        Thread thread = new Thread(stringFutureTask);
        thread.start();
        System.out.println(stringFutureTask.get());
    }
}
class MyThread2 implements Callable<String>{

    @Override
    public String call() throws Exception {
        System.out.println("-----come in call() ----异步执行");
        return "hello Callable 返回值";
    }
}
-----come in call() ----异步执行
hello Callable 返回值

Future到CompletableFuture

Future优点

  • future+线程池异步多线程任务配合,能显著提高程序的执行效率。

方案一,3个任务1个main线程处理,大概1130ms

    public static void main(String[] args) throws InterruptedException {
        //3个任务,目前只有一个线程main来处理
        long startTime = System.currentTimeMillis();
        //暂停毫秒
        TimeUnit.MILLISECONDS.sleep(500);
        TimeUnit.MILLISECONDS.sleep(300);
        TimeUnit.MILLISECONDS.sleep(300);
        long endTime= System.currentTimeMillis();
        System.out.println((endTime-startTime)+"毫秒");
        System.out.println(Thread.currentThread().getName()+"\t--------end");
    }
}

方案二,3个任务3个线程,利用线程池(假如每次new一个Thread,太浪费资源,会有GC这些工作),大概400毫秒

public class FutureDemo1 {
    public static void main(String[] args) throws InterruptedException {
        //3个任务,开启多个异步任务线程进行处理
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        long startTime = System.currentTimeMillis();
        FutureTask<String> futureTask1 = new FutureTask<>(() -> {
            TimeUnit.MILLISECONDS.sleep(500);
            return "task1 over";
        });
        executorService.submit(futureTask1);
        FutureTask<String> futureTask2 = new FutureTask<>(() -> {
            TimeUnit.MILLISECONDS.sleep(300);
            return "task2 over";
        });
        executorService.submit(futureTask2);
        TimeUnit.MILLISECONDS.sleep(300);
        long endTime = System.currentTimeMillis();
        System.out.println((endTime-startTime)+"毫秒");
        System.out.println(Thread.currentThread().getName()+"\t--------end");
    }
}
  • 为什么是400毫秒左右呢?我们其中一个任务都有500毫秒,其实这里的时间只是记录main主线程的执行时间,我们的main线程结束,不代表我们的其他线程结束,这也就是我们的异步的概念
    • 拿给老师买水的例子,可能班长去买水,但是下课了,老师停止讲课,但是我们的班长还在买水的路上

Future的缺点

get()阻塞

一旦调用get()方法,不管是否计算完成,都会导致阻塞(所以一般get方法放到最后)

public class FutureDemo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask1 = new FutureTask<String>(() -> {
            System.out.println(Thread.currentThread().getName() + "\t ------副线程come in");
            TimeUnit.SECONDS.sleep(3);
            return "task over";
        });
        Thread thread = new Thread(futureTask1, "t1");
        thread.start();
        //-----------------------------------------------------------注意顺序
        System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了");
        System.out.println(futureTask1.get());
        //----------------------------------------------------------注意顺序
    }
}
//t1	 ------副线程come in
//main	-------主线程忙其他任务了
//task over
public class FutureDemo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask1 = new FutureTask<String>(() -> {
            System.out.println(Thread.currentThread().getName() + "\t ------副线程come in");
            TimeUnit.SECONDS.sleep(3);
            return "task over";
        });
        Thread thread = new Thread(futureTask1, "t1");
        thread.start();
        //-----------------------------------------------------------注意顺序
        System.out.println(futureTask1.get());
        System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了");
        //----------------------------------------------------------注意顺序
    }
}
//t1	 ------副线程come in
//task over 
//-------------------5秒后才出现下面的结果-------------说明一旦调用get()方法直接去找副线程了,阻塞了主线程
//main	-------主线程忙其他任务了

isDone()轮询

利用if(futureTask.isDone())的方式使得他在结束之后才get(),但是也会消耗cpu

public class FutureDemo3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<String>(()->{
            System.out.println(Thread.currentThread().getName()+"\t ------副线程come in");
            TimeUnit.SECONDS.sleep(5);//暂停几秒
            return "task over";
        });
        Thread t1 = new Thread(futureTask,"t1");
        t1.start();
        System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了");
        //1-------  System.out.println(futureTask.get(3,TimeUnit.SECONDS));//只愿意等3秒,过了3秒直接抛出异常
        //2-------更健壮的方式-------轮询方法---等副线程拿到才去get()
        //但是也会消耗cpu资源
        while(true){
            if(futureTask.isDone()){
                System.out.println(futureTask.get());
                break;
            }else{
                //暂停毫秒
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("正在处理中------------正在处理中");
            }
        }
    }
}
//main  -------主线程忙其他任务了
//t1   ------副线程come in
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//task over

Future应用现状

对于简单的业务场景使用Future完全OK

回调通知

  • 前面的isDone()方法耗费cpu资源,一般应该还是利用回调函数,在Future结束时自动调用该回调函数。应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知

创建异步任务

  • Future+线程池配合

  • 多个任务前后依赖可以组合处理

    • 想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值,将两个或多个异步计算合成一个异步计算,这几个异步计算相互独立,同时后面这个又依赖前一个处理的结果

    • 比如买鱼-加料-烹饪

  • 对计算速度选最快完成的(并返回结果)

    • 当Future集合中某个任务最快结束时,返回结果,返回第一名处理结果。

CompletableFuture基本介绍

  • 它实现了Future和Completion Stage接口
    • 阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗无畏的CPU资源。因此,JDK8设计出CompletableFuture
    • CompletableFuture提供一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> 

image-20230605111923635

  • 在Java 8中, CompletableFuture提供了非常强大的Future的扩展功能, 可以帮助我们简化异步编程的复杂性, 并且提供了函数式编程的能力, 可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture的方法。

  • 它可能代表一个明确完成的Future, 也有可能代表一个完成阶段(Completion Stage) , 它支持在计算完成以后触发一些函数或执行某些动作。

  • 它实现了Future和CompletionStage接口

CompletionStage

  • Completion Stage代表异步计算过程中的某一个阶段, 一个阶段完成以后可能会触发另外一个阶段
  • 一个阶段的计算执行可以是一个Function, Consumer或者Runnable。比如:stage.then Apply(x->square(x) ) .then Accept(x->System.out.print(x) ) .then Run() ->System.out.print In() ),一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。

核心的四个静态方法(分为两组)

  • 利用核心的四个静态方法创建一个异步操作 | 不建议用new

  • 关键就是 |有没有返回值|是否用了线程池|

  • 参数说明:

    • 没有指定Executor的方法,直接使用默认的ForkJoinPool.commPool()作为它的线程池执行异步代码。

    • 如果指定线程池,则使用我们定义的或者特别指定的线程池执行异步代码。

runAsync无返回值

public class CompletableFutureDemo1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(voidCompletableFuture.get());
    }
}
//ForkJoinPool.commonPool-worker-9
//null

runAsync+线程池

public class CompletableFutureDemo1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    	//自定义
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },executorService);
        System.out.println(voidCompletableFuture.get());
    }
}
//pool-1-thread-1
//null

supplyAsync有返回值

public class CompletableFutureDemo1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "helllo supplyasync";
        });
        System.out.println(stringCompletableFuture.get());

    }
}
//ForkJoinPool.commonPool-worker-9
//helllo supplyasync

supplyAsync+线程池

public class CompletableFutureDemo1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "helllo supplyasync";
        },executorService);
        System.out.println(stringCompletableFuture.get());

    }
}
//pool-1-thread-1
//helllo supplyasync

CompletableFuture使用演示

基本功能

  • CompletableFuture可以完成Future的功能
public class CompletableFutureDemo3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "----副线程come in");
            int result = ThreadLocalRandom.current().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("1秒钟后出结果"+result);
            return result;
        });
        System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
        System.out.println(integerCompletableFuture.get());
    }
}
//main线程先去忙其他任务
//ForkJoinPool.commonPool-worker-9----副线程come in
//1秒钟后出结果5
//5

减少阻塞和轮询whenComplete

  • CompletableFuture通过whenComplete减少阻塞和轮询(自动回调)
public class CompletableFutureDemo3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> exceptionally = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "----副线程come in");
            int result = ThreadLocalRandom.current().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("1秒钟后出结果" + result);
            return result;
        }).whenComplete((v, e) -> {//没有异常,v是值,e是异常
            if (e == null) {
                System.out.println("------------------计算完成,更新系统updataValue" + v);
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            System.out.println("异常情况" + e.getCause() + "\t" + e.getMessage());
            return null;
        });
        //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
//ForkJoinPool.commonPool-worker-9----副线程come in
//main线程先去忙其他任务
//1秒钟后出结果7
//------------------计算完成,更新系统updataValue7

使用自定义线程池

public class CompletableFutureDemo3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture<Integer> exceptionally = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "----副线程come in");
            int result = ThreadLocalRandom.current().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("1秒钟后出结果" + result);
            return result;
        },executorService).whenComplete((v, e) -> {//没有异常,v是值,e是异常
            if (e == null) {
                System.out.println("------------------计算完成,更新系统updataValue" + v);
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            System.out.println("异常情况" + e.getCause() + "\t" + e.getMessage());
            return null;
        });
        //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
//        try {
//            TimeUnit.SECONDS.sleep(3);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
    }
  • 我们这里注意如果使用的默认的ForkJoinPool.commPool()线程池,我们需要让主线程存活一段时间,不然会导致线程池关闭,因为他是守护线程,main线程结束,那么就没有用户线程了

CompletableFuture优点总结

  • 异步任务结束时,会自动回调某个对象的方法;
  • 主线程设置好毁掉后,不再关心异步任务的执行,异步任务之间可以顺序执行
  • 异步任务出错时,会自动回调某个对象的方法。

join和get对比

  • 功能几乎一样,区别在于编码时是否需要抛出异常
    • get()方法需要抛出异常
    • join()方法不需要抛出异常

实战精讲-比价网站case

需求

1需求说明
1.1同一款产品,同时搜索出同款产品在各大电商平台的售价;
1.2同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少

2输出返回:
出来结果希望是同款产品的在不同地方的价格清单列表, 返回一个List
《mysql》in jd price is 88.05
《mysql》in dang dang price is 86.11
《mysql》in tao bao price is 90.43

3解决方案,比对同一个商品在各个平台上的价格,要求获得一个清单列表
1 step by step , 按部就班, 查完京东查淘宝, 查完淘宝查天猫…
2 all in ,万箭齐发,一口气多线程异步任务同时查询。。。

基本框架搭建

  • 相当于是一个一个按部就班
public class NetMall {
    @Getter
    private String netMallName;

    public NetMall(String netMallName) {
        this.netMallName = netMallName;
    }
    public double calcPrice(String productName){
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return ThreadLocalRandom.current().nextDouble()*20;//用这句话来模拟价格
    }
}
public class Case {
    static List<NetMall> list= Arrays.asList(
            new NetMall("jd"),
            new NetMall("dangdang"),
            new NetMall("taobao")
    );
    public static List<String> getPrice(List<NetMall> list,String productName){
        return list.stream()
                .map(netMall -> String.format(productName+" in %s price is %.2f",
                        netMall.getNetMallName(),netMall.calcPrice(productName)))
                .collect(Collectors.toList());
    }
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        List<String> list1 = getPrice(list, "mysql");
        for(String element:list1){
            System.out.println(element);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("---当前操作花费时间----costTime:"+(endTime-startTime)+"毫秒");
    }
}
mysql in jd price is 16.61
mysql in dangdang price is 18.52
mysql in taobao price is 17.34
---当前操作花费时间----costTime:3081毫秒

从功能到性能:利用CompletableFuture

  • 这里是利用异步线程,万箭齐发
  • 此处用了两步流式编程
  • 性能差距巨大
 //从功能到性能
    public static List<String> getPricesByCompletableFuture(List<NetMall> list,String productName){
        return list.stream().map(netMall -> CompletableFuture.supplyAsync(()-> String.format(productName+" in %s price is %.2f",
    														netMall.getNetMallName(),netMall.calcPrice(productName))))//Stream<CompletableFuture<String>>
                    .collect(Collectors.toList())//List<CompletablFuture<String>>
                    .stream()//Stream<CompletableFuture<String>
                    .map(s->s.join())//Stream<String>
                    .collect(Collectors.toList());
    }
mysql in jd price is 14.32
mysql in dangdang price is 2.48
mysql in taobao price is 2.24
---当前操作花费时间----costTime:1060毫秒

CompletableFuture常用API

获得结果和触发计算

获取结果

  • public T get() 不见不散,容易阻塞

  • public T get(long timeout,TimeUnit unit) 过时不候,超过时间会爆异常

  • public T join() 类似于get(),区别在于是否需要抛出异常

  • public T getNow(T valueIfAbsent)

    • 没有计算完成的情况下,给一个替代结果

    • 立即获取结果不阻塞

      • 计算完,返回计算完成后的结果

      • 没算完,返回设定的valueAbsent(直接返回了备胎值xxx)

主动触发计算

  • public boolean complete(T value) 是否立即打断get()方法返回括号值

    • (执行要2s,等待只有1s,所以还没执行完就被打断了。返回true表示打断了获取这个过程,直接返回了备胎值complete;如果没打断,返回false 和原来的abc)
public class CompletableFutureDemo4 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);//执行需要2秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "abc";
        });
        TimeUnit.SECONDS.sleep(1);
//        System.out.println(stringCompletableFuture.getNow("xxx"));//执2-等1 返回xxx
        System.out.println(stringCompletableFuture.complete("completeValue")+"\t"+stringCompletableFuture.get());//true completableFuture
    }
}

对计算结果进行处理

  • thenApply 计算结果存在在依赖关系,使得线程串行化。因为依赖关系,所以一旦有异常,直接叫停。
public class CompletableFutureDemo4 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
        CompletableFuture<Integer> exceptionally = CompletableFuture.supplyAsync(() -> {
            //暂停几秒线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("111");
            return 1024;
        }).thenApply(f -> {
           // int age = 10/0;//异常语句
            System.out.println("222");
            return f + 1;
        }).thenApply(f -> {
            System.out.println("333");
            return f + 1;
        }).whenComplete((v, e) -> {
            System.out.println("*****v: " + v);
        }).exceptionally(e -> {
            e.printStackTrace();
            return null;
        });
        System.out.println("-----主线程结束,END");

        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
    }
}    
//-----正常情况
//111
//222
//333
//----计算结果: 1026

//-----异常情况
//111
//异常.....  
  • handle 类似于thenApply,但是有异常的话仍然可以往下走一步。
public class CompletableFutureDemo2
{

    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        //当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,
        // 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理
        CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("111");
            return 1024;
        }).handle((f,e) -> {
            int age = 10/0;//异常语句
            System.out.println("222");
            return f + 1;
        }).handle((f,e) -> {
            System.out.println("333");
            return f + 1;
        }).whenCompleteAsync((v,e) -> {
            System.out.println("*****v: "+v);
        }).exceptionally(e -> {
            e.printStackTrace();
            return null;
        });

        System.out.println("-----主线程结束,END");

        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
    }
}
//-----异常情况
//111
//333
//异常,可以看到多走了一步333

对计算结果进行消费

  • 接收任务的处理结果,并消费处理,
  • 无返回结果|消费型函数式接口,之前的是Function
public static void main(String[] args) throws ExecutionException, InterruptedException
{
    CompletableFuture.supplyAsync(() -> {
        return 1;
    }).thenApply(f -> {
        return f + 2;
    }).thenApply(f -> {
        return f + 3;
    }).thenApply(f -> {
        return f + 4;
    }).thenAccept(r -> System.out.println(r));
}
//6
//消费一下,直接得到6

补充:Code之任务之间的顺序执行

  • thenRun

    • thenRun(Runnable runnable)

    • 任务A执行完执行B,并且B不需要A的结果

  • thenAccept

    • thenAccept(Consumer action)

    • 任务A执行完执行B,B需要A的结果,但是任务B无返回值

  • thenApply

    • thenApply(Function fn)

    • 任务A执行完执行B,B需要A的结果,同时任务B有返回值

System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
//null 

System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join());
//resultA打印出来的 null因为没有返回值

System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());
//resultAresultB 返回值

CompleteFuture和线程池说明(非常重要)

上面的几个方法都有普通版本和后面加Async的版本

以thenRun和thenRunAsync为例,有什么区别?

先看结论

  • 没有传入自定义线程池,都用默认线程池ForkJoinPool

  • 传入了一个自定义线程池如果你执行第一个任务的时候,传入了一个自定义线程池

    • 调用thenRun方法执行第二个任务的时候,则第二个任务和第一个任务是用同一个线程池

    • 调用thenRunAsync执行第二个任务的时候,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池也有可能处理太快,

  • 系统优化切换原则,直接使用main线程处理(把sleep去掉)

//2-1
public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
            return "abcd";
        },threadPool).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
        });
    }
}
//1号任务  pool-1-thread-1
//2号任务  pool-1-thread-1
//3号任务  pool-1-thread-1
//4号任务  pool-1-thread-1
//2-2
public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
            return "abcd";
        },threadPool).thenRunAsync(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
        });
    }
}
//1号任务  pool-1-thread-1
//2号任务  ForkJoinPool.commonPool-worker-9---这里另起炉灶重新调用了默认的ForkJoinPool
//3号任务  ForkJoinPool.commonPool-worker-9
//4号任务  ForkJoinPool.commonPool-worker-9

public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
//            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
            return "abcd";
        },threadPool).thenRun(()->{
           // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
          //  try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            //try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
        });
    }
}

//1号任务  1号任务  pool-1-thread-1
//2号任务  main
//3号任务  main
//4号任务  main

对计算速度进行选用

  • applyToEither方法,快的那个掌权
public class CompletableFutureDemo2 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> play1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            return "play1 ";
        });
    	  CompletableFuture<String> play2 = CompletableFuture.supplyAsync(() -> {
        	System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
        	try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        	return "play2";
   	  });
    	  CompletableFuture<String> thenCombineResult = play1.applyToEither(play2, f -> {//对计算速度进行选用
         	return f + " is winner";
   	  });
    	 System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.get());
}
}
//ForkJoinPool.commonPool-worker-9  ---come in 
//ForkJoinPool.commonPool-worker-2  ---come in 
//main  play2 is winner

对计算结果进行合并

thenCombine 合并

  • 两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCOmbine来处理

  • 先完成的先等着,等待其它分支任务

public class CompletableFutureDemo2
{
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            return 10;
        });

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            return 20;
        });

        CompletableFuture<Integer> thenCombineResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            return x + y;
        });
        
        System.out.println(thenCombineResult.get());
    }
}
//30

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/640340.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数字人解决方案——实时对话数字人源码与环境配置

前言 1.从技术角度来看&#xff0c;现在的数学人就是一个缝合怪&#xff0c;把各种技术点都整合在一起&#xff0c;用来实现直播、对话等数字人。技术流程大概如下图&#xff1a; 其实最重要的一环应该属于LLM(大型语言模型)&#xff0c;LLM相当于一个人的意识&#xff0c;如果…

外卖订单管理系统(Javaweb+Mysql)

程序源码 可以通过上方代码包.rar文件下载&#xff0c;也可以在下方链接下载 链接: https://pan.baidu.com/s/1OruBEcEK70DtUbvA8UIE-w?pwddkdg &#xff08;数据库sql文件在项目根目录下data -> sql&#xff09; 设计报告 【金山文档】 外卖订单管理系统设计报告 http…

编译原理期末速成–正规式、NFA转DFA、DFA的简化

编译原理期末速成–正规式、NFA转DFA、DFA的简化 文章目录 编译原理期末速成--正规式、NFA转DFA、DFA的简化什么是DFA、NFA&#xff1f;看个题消化一下步骤一&#xff1a;步骤二&#xff1a;步骤三&#xff1a;步骤四&#xff1a;步骤五&#xff1a;步骤六&#xff1a;步骤七&a…

POJ The Game

原题目&#xff1a;传送锚点 1.题目 The Game Description A game of Renju is played on a 19*19 board by two players. One player uses black stones and the other uses white stones. The game begins in an empty board and two players alternate in placing black …

面对工作中的失误:从错误中学习与成长

&#x1f604;作者简介&#xff1a; 小曾同学.com,一个致力于测试开发的博主⛽️&#xff0c;主要职责&#xff1a;测试开发、CI/CD 如果文章知识点有错误的地方&#xff0c;还请大家指正&#xff0c;让我们一起学习&#xff0c;一起进步。&#x1f60a; 座右铭&#xff1a;不想…

C99的一些新特性记录

固长类型头文件<stdint.h> 由于历史原因&#xff0c;C语言中实现的整型数只保证了在不同硬件体系中的最小长度&#xff0c;因此在使用时&#xff0c;需要根据代码实际运行的平台来确定类型的长度&#xff0c;这导致代码非常不方便移植。C99标准通过增加固长类型头文件引入…

【Unity3D】屏幕深度和法线纹理简介

1 前言 1&#xff09;深度纹理和法线纹理的含义 深度纹理本质是一张图片&#xff0c;图片中每个像素反应了屏幕中该像素位置对应的顶点 z 值相反数&#xff08;观察坐标系&#xff09;&#xff0c;之所以用 “反应了” 而不是 “等于”&#xff08;或 “对应” &#xff09;&am…

chatgpt赋能python:Python浮点型转换为整型的方法和应用场景

Python浮点型转换为整型的方法和应用场景 介绍 Python的浮点型和整型在数值计算中应用广泛。有时候我们需要将一个浮点数转换为整数&#xff0c;这时候就需要使用Python提供的一些函数来完成转换。本文将介绍Python浮点型转换为整型的方法和应用场景。 浮点型和整型的区别 …

初探MyBatis实现简单查询

一、创建数据库与表 1、创建数据库 在Navicat里创建MySQL数据库 - testdb&#xff0c;采用utf8mb4字符集 2、创建用户表 CREATE TABLE t_user (id int(11) NOT NULL AUTO_INCREMENT,name varchar(50) DEFAULT NULL,age int(11) DEFAULT NULL,address varchar(255) DEFAULT…

SpringBoot的日志文件

文章目录 前言日志怎么用自定义打印日志⽇志级别 - 了解⽇志持久化Lombok提供的方法 前言 上文讲述了 SpringBoot项目的构建 与配置文件的使用 ,下面来介绍 SpringBoot 的日志文件 , 日志在程序 中起到的作用是很大的 , 谁写的程序能不报错误呢, 日志就是一种让你快速找到错误…

Linux环境变量配合权限维持手法

前言&#xff1a; 权限维持的时候有其中有两种&#xff0c;一种是alias别名、第二种是prompt_command&#xff0c;这里我们可以将其添加到环境变量中&#xff0c;每次运行的时候都可以使用&#xff0c;从而达到权限控制的效果&#xff0c;而不是临时执行的效果。 环境变量&am…

harbor仓库的搭建

harbor仓库的搭建 前言一、准备二、registry私有仓库拉取registry镜像上传镜像下载镜像添加私有仓库解析配置使用非加密端口拉取镜像 三、仓库加密域名保持一致部署客户端证书&#xff0c;不然会报错验证仓库认证删除registry&#xff0c;重建登录仓库&#xff0c;不然无法上传…

[论文阅读笔记76]GPT Understands, Too(P-tuning)

1. 基本信息 题目论文作者与单位来源年份GPT Understands, Too清华大学 Citations, References 论文链接&#xff1a;https://arxiv.org/pdf/2103.10385.pdf 论文代码&#xff1a; 2. 要点 研究主题问题背景核心方法流程亮点数据集结论论文类型关键字微调大模型采用传统微…

css空间转换

目录 1. 3D移动 translate3d 1.1 三维坐标系 1.2 3D移动 translate3d 1.3 透视 perspective 1.4 translateZ 2. 3D旋转 rotate3d 2.1 左手法则-判断元素旋转方向的取值正负 3. 3D呈现 transform-style【***】 4. 3D缩放 transform:scale3d 1. 3D移动 translate3d …

nacos运行报错-jar: file does not existCan‘t retrieve image ID from build stream

一、问题 Deploying nacos Dockerfile: ruoyi-visual/ruoyi-nacos/Dockerfile… Building image… Preparing build context archive… [>]211/211 files DoneSending build context to Docker daemon… [>] 6.099MB DoneStep 1/8 : FROM openjdk:11---> 5505a9a39df…

chatgpt赋能python:用Python创建股票池

用Python创建股票池 介绍 如果你是一位投资者&#xff0c;你一定知道股票池是什么。它是一个包含一组股票的集合&#xff0c;使投资者能够跟踪和管理他们的投资组合。这些股票可以根据各种因素分类&#xff0c;例如行业&#xff0c;市值&#xff0c;收入增长等。 Python是一…

Oracle的学习心得和知识总结(二十六)|Oracle数据库Real Application Testing测试指南(数据库回放)

目录结构 注&#xff1a;提前言明 本文借鉴了以下博主、书籍或网站的内容&#xff0c;其列表如下&#xff1a; 1、参考书籍&#xff1a;《Oracle Database SQL Language Reference》 2、参考书籍&#xff1a;《PostgreSQL中文手册》 3、EDB Postgres Advanced Server User Gui…

学习HCIP的day.13

目录 IPV6 一、特征-升级点 二、IPV6地址 三、IPV6地址分类 1、单播地址 2、多播地址 四、协议 五、思科配置 1、接口配置IPV6的单播地址 2、IPV6的ACL 3、IPV6的单播路由协议 4、IPV4和IPV6共存 六、华为IPV6配置 1、静态 2、OSPF 3、BGP 4、IPV4和IPV6共存…

我的内网渗透-提权大法

拿到shell之后乱码解决 chcp 65001 #将编码设置为UTF-8的编码 出现这个提示就是切换成功&#xff0c;后面也是可以正常显示的 提权 方法一&#xff1a; 新版本的kali直接getsystem&#xff0c;可以提权成功&#xff08;有时候可以&#xff0c;有时候不可以&#xff09; mete…

chatgpt赋能python:Python循环3次的方法

Python循环3次的方法 循环是编程中经常用到的一个基本操作&#xff0c;可以让相同的代码运行多次。在Python中&#xff0c;循环也是极其重要的&#xff0c;其中最常用的是for和while循环。在本文中&#xff0c;我们将介绍如何使用Python循环3次。 使用for循环 使用for循环是…