JUC高级一: CompletableFuture

news2025/1/10 2:14:42

JUC高级: CompletableFuture

1. 线程基础知识复习

1.1 JUC四大口诀

  • 高内聚低耦合前提下,封装思想

    线程—>操作---->资源类

  • 判断、干活、通知

  • 防止虚假唤醒,wait方法要注意使用while判断

  • 注意标志位flag,可能是volatile的

1.2 为什么多线程及其重要?

摩尔定律:

它是由英特尔创始人之一Gordon Moore(戈登·摩尔)提出来的。其内容为:
当价格不变时,集成电路上可容纳的元器件的数目约每隔18-24个月便会增加一倍,性能也将提升一倍。
换言之,每一美元所能买到的电脑性能,将每隔18-24个月翻一倍以上。这一定律揭示了信息技术进步的速度。

  1. 硬件方面

    • 摩尔定律失效
      • 可是从2003年开始CPU主频已经不再翻倍,而是采用多核而不是更快的主频。
      • 在主频不再提高且核数在不断增加的情况下,要想让程序更快就要用到并行或并发编程
  2. 软件方面

    • 充分利用多核处理器
    • 提高程序性能,高并发系统
    • 提高程序吞吐量,异步+回调等生产需求
  3. 弊端及问题

    • 线程安全问题
      • i++
      • 集合类安全问题
    • 线程锁问题
    • 线程性能问题

1.3 start一个线程原理

OpenJDK官网网址

OpenJDK8源码下载地址

  1. java 线程理解及openjdk中的实现

    • 源码中我们调用一个线程的start方法实质上是调用start0方法

      image-20230306212846195

    • 而start0是native方法

      image-20230306212948636

    • Java语言本身底层就是C++语言

java线程是通过start的方法启动执行的,主要内容在native方法start0中,
Openjdk的写JNI一般是一一对应的,Thread.java对应的就是Thread.c

  1. C++底层源码解读

    • openjdk8\jdk\src\share\native\java\lang thread.c

      • start0其实就是JVM_StartThread。此时查看源代码可以看到在jvm.h中找到了声明,jvm.cpp中的JVM_StartThread实现。

        image-20230306214112179

    • openjdk8\hotspot\src\share\vm\prims jvm.cpp

      • JVM_StartThread方法本质上就是调用了JVM中的start方法

      image-20230306214213594

      image-20230306214237433

    • openjdk8\hotspot\src\share\vm\runtime thread.cpp

      • JVM中的start方法本质上就是C++调用操作系统的创建一个线程

        image-20230306214636728

总结: java中调用start本质上就是调用的start0本地方法,而本地方法是C++通过调用操作系统创建线程

1.4 Java多线程相关概念

  1. 进程

    是程序的⼀次执⾏,是系统进⾏资源分配和调度的独⽴单位,每⼀个进程都有它⾃⼰的内存空间和系统资源

  2. 线程

    线程(英语:thread)是操作系统能够进行运算调度的最小单位。 它被包含在进程之中,是进程中的实际运作单位。

  3. 管程

    Monitor(监视器),也就是我们平时所说的锁

    Monitor其实是一种同步机制,他的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码。

1.5 用户线程和守护线程

Java线程分为用户线程和守护线程,线程的daemon属性为true表示是守护线程,false表示是用户线程

  • 守护线程
    • 是一种特殊的线程,在后台默默地完成一些系统性的服务,比如垃圾回收线程
  • 用户线程
    • 是系统的工作线程,它会完成这个程序需要完成的业务操作

注意事项:

  • 设置守护线程,需要在start()方法之前进行

  • 当程序中所有用户线程执行完毕之后,不管守护线程是否结束,系统都会自动退出

    • 如果用户线程全部结束了,意味着程序需要完成的业务操作已经结束了,系统可以退出
      了。所以当系统只剩下守护进程的时候,java虚拟机会自动退出

示例代码

public class DaemonDemo
{
public static void main(String[] args)
{
    Thread t1 = new Thread(() -> {
        System.out.println(Thread.currentThread().getName()+"\t 开始运行,"+(Thread.currentThread().isDaemon() ? "守护线程":"用户线程"));
        while (true) {

        }
    }, "t1");
    //线程的daemon属性为true表示是守护线程,false表示是用户线程
    t1.setDaemon(true);
    t1.start();
    //3秒钟后主线程再运行
    try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }

    System.out.println("----------main线程运行完毕");
}
}

2. CompleteableFuture进化历史

2.0 并发最主要三要素多线程,异步任务,返回值

2.1 Future为什么出现

2.1.1 Future 接口

Future接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。

Future 接口内部方法:

image-20230310130711756

2.1.2 Callable接口

Callable接口中定义了需要有返回的任务需要实现的方法。

image-20230310131211337

Future出现的原因:

目的:让主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务.

Future可以开启异步任务而callable接口可以开启新线程并且拿到返回值,

两者结合即可达到目的

2.2 Future接口常用实现类FutureTask异步任务

2.2.1 FutureTask相关架构

image-20230310132614187

2.2.2 FutureTask对Callable的特殊支持

从2.2.1FutureTask的架构图中可以看出FutureTask是对Runnable支持的,但是使用Runnable创建的任务是不会有返回值的.但是我们打开FutureTask的源码发现FutureTask的构造方法是对Callable接口进行支持(有返回值、可抛出异常),那么我们返回值的目的也就达到了

image-20230310134656602

2.2.3 FutureTask基础使用(创建线程)

package site.zhourui.juc;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask futureTask = new FutureTask<>(new MyThread());
        Thread t1 = new Thread(futureTask, "t1");
        t1.start();
        System.out.println(futureTask.get());
    }

}
class MyThread implements Callable {
    @Override
    public Object call() throws Exception {
        System.out.println("进入callable子线程");
        return "hello callable";
    }
}

执行结果:

image-20230310135819355

2.2.4 FutureTask结合线程池

package site.zhourui.juc;

import java.util.concurrent.*;

public class FutureTaskDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

        long s = System.currentTimeMillis();

        FutureTask<String> task1 = new FutureTask<>(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "task1 over";
        });
        FutureTask<String> task2 = new FutureTask<>(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "task2 over";
        });
        FutureTask<String> task3 = new FutureTask<>(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "task3 over";
        });
        fixedThreadPool.submit(task1);
        fixedThreadPool.submit(task2);
        fixedThreadPool.submit(task3);
        System.out.println(task1.get());
        System.out.println(task2.get());
        System.out.println(task3.get());
        long e = System.currentTimeMillis();
        System.out.println("执行时间:"+(e-s));
        fixedThreadPool.shutdown();
    }

}

执行结果:

image-20230311105224762

结论:future+线程池异步多线程任务配合,能显著提高程序的执行效率。

2.2.5 FutureTask 的缺点(为什么会使用CompleteableFuture)

2.2.5.1 get方法的阻塞

正常情况下:我们的get方法放在主线程执行之后是没有任何问题的

image-20230311105813084

package site.zhourui.juc;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> futureTask = new FutureTask<>(() ->{
            System.out.println(Thread.currentThread().getName()+"\t...come in");
            TimeUnit.SECONDS.sleep(5);
            return "task over";
        });
        Thread t1 = new Thread(futureTask,"t1");
        t1.start();
        System.out.println(Thread.currentThread().getName()+"/t...忙其他任务了");

        System.out.println(futureTask.get()); //git方法等待
    }
}

get方法过时不候.,超过等待时间没有拿到结果直接抛出异常

优点:假如我不愿意等待很长时间,我希望过时不候,可以自动离开

缺点:如果写多了抛出大量异常不优雅,但也可以用

可以通过捕获异常的方式做其他业务处理

image-20230311112555193

package site.zhourui.juc;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> futureTask = new FutureTask<>(() ->{
            System.out.println(Thread.currentThread().getName()+"\t...come in");
            TimeUnit.SECONDS.sleep(5);
            return "task over";
        });
        Thread t1 = new Thread(futureTask,"t1");
        t1.start();
        System.out.println(Thread.currentThread().getName()+"/t...忙其他任务了");
//        System.out.println(futureTask.get()); //get方法等待
        System.out.println(futureTask.get(3,TimeUnit.SECONDS));//get方法过时不候
    }
    /**
     *1 get容易导致阻塞,一般建议放在程序后面,一旦调用不见不散,非要等到结果才会离开,不管你是否计算完成,容易程序堵塞。
     *2 假如我不愿意等待很长时间,我希望过时不候,可以自动离开.
     */
}

当我们在主线程做其他事情之前调用get方法,那么主线程会被阻塞,主线程会一直等到子线程任务执行完毕,get方法拿到返回值为止,那么这样和单线程没有任何区别甚至更慢

综上注意事项:

一旦调用get()方法,不管是否计算完成,都会导致程序阻塞,所以get()方法的位置一般放在程序最后

image-20230311110743847

2.2.5.2 isDone()轮询

缺点:轮询的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果.

如果想要异步获取结果,通常都会以轮询的方式去获取结果尽量不要阻塞

利用if(futureTask.isDone())的方式使得FutureTask在结束之后才get(),但是也会消耗cpu

通过sleep降低查询的频率,减少cpu的消耗

image-20230311113812964

package site.zhourui.juc;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureAPIDemo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> futureTask = new FutureTask<>(() ->{
            System.out.println(Thread.currentThread().getName()+"\t...come in");
            TimeUnit.SECONDS.sleep(5);
            return "task over";
        });
        Thread t1 = new Thread(futureTask,"t1");
        t1.start();
        System.out.println(Thread.currentThread().getName()+"/t...忙其他任务了");
        while(true){
            if(futureTask.isDone()){
                System.out.println(futureTask.get());
                break;
            }else{
                //暂停毫秒
                try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
                System.out.println("正在处理中,不要再催了,越催越慢 ,再催熄火");
            }
        }
    }
}

2.2.6 FutureTask总结

  • Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。
  • 对于简单的业务场景使用Future完全可以.但最好使用轮询方式

Future优化思路(完成一些复杂的任务)

  • 回调通知

    • Future任务完成了可以告诉我们,也就是我们的回调通知
  • 创建异步任务 :Future+线程池配合

  • 多个任务前后依赖可以组合处理

    • 想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值
    • 将两个或多个异步计算合成一个异步计算,这几个异步计算相互独立,同时后面这个又依赖前一个处理的结果
  • 对计算速度选最快

    • 当Future集合中某个任务最快结束时,返回结果,返回第一名处理结果
  • 更完备的任务控制

    • 仅仅靠Future的这些方法是无法完成复杂操作的

      image-20230310130711756

2.3 CompletableFuture闪亮登场

  • get()方法在Future计算完成之前会一直处在阻塞状态下,isDone()方法容易耗费CPU资源.
  • 对于真正的异步处理我们希望是可以通过传入回调函数,在Futrue结束时自动调用该回调函数,这样,我们就不用等待结果.
  • 阻塞的方式和异步编程的涉及理念相违背,而轮询的方式会耗费无谓的CPU资源,因此,JDK8设计出CompletableFuture.
  • CompletableFuture提供了一种观察者模式类的机制,可以让任务执行完成后通知监听的一方.
  • 在Java 8中, CompletableFuture提供了非常强大的Future的扩展功能, 可以帮助我们简化异步编程的复杂性, 并且提供了函数式编程的能力, 可以通过回调的方式处理计算结果, 也提供了转换和组合CompletableFuture的方法
  • 它可能代表一个明确完成的Future, 也有可能代表一个完成阶段(CompletionStage) , 它支持在计算完成以后触发一些函数或执行某些动作。

结构图:

image-20230311120424758

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {}

2.3.1 CompletionStage

  • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段.
  • 一个阶段的计算执行可以是一个Function,Consumer或者Runnable,比如:stage.thenApply(x - >square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发.
  • 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数.

2.3.2 CompletableFuture核心的四个静态方法

image-20230311122230326

以上方法中Executor executor参数说明:

  • 带Executor的方法,直接使用默认的ForkJoinPool.commonPool()作为它的线程池执行异步代码.
  • 带Executor的方法,使用我们自定义的线程池

为什么使用静态方法来创建CompletableFuture,而不使用new CompletableFuture的方式呢?

官方提供了构造方法,但是在API中说明是不完备的,这个构造方法只是从语法上合规

image-20230311121532176

2.3.2.1 runAsync无返回值

带Executor executor参数的都是可以使用自定义线程池的

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)

代码实例:

package site.zhourui.juc;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.*;

public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> completableFuture= CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            //停顿几秒线程
            try {
                SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(completableFuture.get());
    }
}

执行结果:

image-20230311122717970

2.3.2.2 supplyAsync有返回值

带Executor executor参数的都是可以使用自定义线程池的

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

代码实例:

package site.zhourui.juc;

import java.util.concurrent.*;

public class CompletableFutureBuildDemo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池

        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "helllo supplyasync";
        });
        System.out.println(completableFuture.get());
    }
}

执行结果:

image-20230311123121519

2.3.3 CompletableFuture通用演示

注意:默认线程池ForkJoin会在主线程执行完成时关闭,如果有任务正在使用该线程池那么可能不会出结果

image-20230311124545311

CompletableFuture完成与Future相同的功能

package site.zhourui.juc;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        future1();
    }
    private static void future1() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "...come in");
            int result = ThreadLocalRandom.current().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("1秒钟后出结果:" + result);
            return result;
        });
        System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务!");
        System.out.println(completableFuture.get());
    }
}

执行结果:和FutureTask一致

image-20230311123842399

CompletableFuture通用演示

package site.zhourui.juc;

import java.util.concurrent.*;

public class CompletableFutureUseDemo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //自定义线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture.supplyAsync(() ->{
                System.out.println(Thread.currentThread().getName()+"...come in");
                int result = ThreadLocalRandom.current().nextInt(10);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("1秒钟后出结果:"+result);
                if(result > 5){
                    int i = 10/0; //制造异常
                }
                return result;
            },threadPool).whenComplete((v,e) ->{ //v表示result,e表示异常,CompletableFuture通过whenComplete来减少阻塞和轮询(自动回调)
                if(e == null){//判断有没有异常
                    System.out.println("计算完成,更新系统update value:"+v);
                }
            }).exceptionally(e ->{
                e.printStackTrace();
                System.out.println("异常情况:"+e.getCause()+"\t"+e.getMessage());
                return null;
            });
            System.out.println(Thread.currentThread().getName()+"线程先去忙其它任务");
        }catch (Exception e){
            e.printStackTrace();
        }finally {//关闭线程池
            threadPool.shutdown();
        }
    }
}

正常执行结果:

image-20230311131101181

异常执行结果:

image-20230311131157947

CompletableFuture优点总结:

  • 异步任务结束时,会自动回调某个对象的方法;
  • 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行
  • 异步任务出错时,会自动回调某个对象的方法

2.3.4 函数式接口串讲

函数式接口定义:

任何接口,如果只包含唯一一个抽象方法,那么它就是一个函数式接口.对于函数式接口,我们可以通过lambda表达式来创建该接口的对象.

更准确的来说接口定义时带有@FunctionalInterface注解的都是函数式接口

2.3.4.1 常见的函数式接口
函数式接口名称方法名称参数返回值
Runnablerun无参数无返回值
Functionapply1个参数有返回值
Consumeaccept1个参数无返回值
Supplierget没有参数有返回值
Biconsumeraccept2个参数无返回值
  • Biconsumer(Bi是英语词根代表两个的意思,我们要传入两个参数,在上面的案例中是v和e)
2.3.4.2 链式调用写法

@Accessors(chain = true)开启链式编程,需要lombok

public class Chain {
    public static void main(String[] args) {
        //-------------------老式写法------------
//        Student student = new Student();
//        student.setId(1);
//        student.setMajor("cs");
//        student.setName("小卡");
        new Student().setId(1).setName("大卡").setMajor("cs");
    } 
}

@NoArgsConstructor
@AllArgsConstructor
@Data
@Accessors(chain = true)//开启链式编程
class Student{
    private int id;
    private String name;
    private String major;
}
2.3.4.3 join和get对比

功能几乎一样,区别在于编码时是否需要抛出异常

  • get()方法会在编译期间会做异常的检查,因此需要抛出异常或者做异常处理
  • join()方法不会在编译期间会做异常的检查
public class Chain {
    public static void main(String[] args) throws ExecutionException, InterruptedException {//抛出异常
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            return "hello 12345";
        });
        System.out.println(completableFuture.get());
    }

}

public class Chain {
    public static void main(String[] args)  {//抛出异常
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            return "hello 12345";
        });
        System.out.println(completableFuture.join());
    }
}

2.4 CompletableFuture电商比价案例

实战精讲-比价网站case:
1 需求说明
1.1 同一款产品,同时搜索出同款产品在各大电商平台的售价;
1.2 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少

2 输出返回:
出来结果希望是同款产品的在不同地方的价格清单列表, 返回一个List<String>
《mysql》in jd price is 88.05
《mysql》in dang dang price is 86.11
《mysql》in tao bao price is 90.43

3 解决方案,比对同一个商品在各个平台上的价格,要求获得一个清单列表
1   stepbystep   , 按部就班, 查完京东查淘宝, 查完淘宝查天猫......
2   all in       ,万箭齐发,一口气多线程异步任务同时查询。。。
package site.zhourui.juc;



import lombok.Getter;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

class CompletableFutureNetMallDemo
{
    static List<NetMall> list = Arrays.asList(
            new NetMall("jd"),
            new NetMall("pdd"),
            new NetMall("taobao"),
            new NetMall("dangdangwang"),
            new NetMall("tmall")
    );

    //同步 ,step by step

    /**
     * List<NetMall>  ---->   List<String>
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPriceByStep(List<NetMall> list,String productName)
    {
        return list
                .stream().
                map(netMall -> String.format(productName + " in %s price is %.2f", netMall.getMallName(), netMall.calcPrice(productName)))
                .collect(Collectors.toList());
    }
    //异步 ,多箭齐发

    /**
     * List<NetMall>  ---->List<CompletableFuture<String>> --->   List<String>
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPriceByASync(List<NetMall> list,String productName)
    {
        return list
                .stream()
                .map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + " is %s price is %.2f", netMall.getMallName(), netMall.calcPrice(productName))))
                .collect(Collectors.toList())
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args)
    {
        long startTime = System.currentTimeMillis();
        List<String> list1 = getPriceByStep(list, "mysql");
        for (String element : list1) {
            System.out.println(element);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒");

        System.out.println();

        long startTime2 = System.currentTimeMillis();
        List<String> list2 = getPriceByASync(list, "mysql");
        for (String element : list2) {
            System.out.println(element);
        }
        long endTime2 = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime2 - startTime2) +" 毫秒");

    }
}

class NetMall
{
    @Getter
    private String mallName;
    
    public NetMall(String mallName)
    {
        this.mallName = mallName;
    }

    public double calcPrice(String productName)
    {
        //检索需要1秒钟
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    }
}

执行结果:

image-20230311134426286

3. CompletableFuture常用方法

image-20230311135102639

3.1 获得结果和触发计算

3.1.1 getNow

get之前介绍过了,这里介绍getNow

getNow相当于备用方案如果此时异步任务还没执行完成就使用getNow设置的默认值,如果完成了就是用任务返回值

package site.zhourui.juc.cf;


import java.util.concurrent.*;

public class CompletableFutureAPIDemo
{
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException
    {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            return 1;
        },threadPoolExecutor);

        //System.out.println(future.get());
        //System.out.println(future.get(2L,TimeUnit.SECONDS));
        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println(future.getNow(9999));
        threadPoolExecutor.shutdown();
    }
}


image-20230311140516943

3.1.2 complete主动触发计算

当调用CompletableFuture.get()被阻塞的时候,complete方法就是结束阻塞并get()获取设置的complete里面的值.

package site.zhourui.juc.cf;


import java.util.concurrent.*;

public class CompletableFutureAPIDemo
{
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException
    {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            return 1;
        },threadPoolExecutor);

        //System.out.println(future.get());
        //System.out.println(future.get(2L,TimeUnit.SECONDS));
        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
//        System.out.println(future.getNow(9999));

        System.out.println(future.complete(-44)+"\t"+future.get());
        threadPoolExecutor.shutdown();
    }
}

3.2 对计算结果进行处理

总结:

thenApply 和handle都是将线程船型化

但是thenApply 报异常后,之后的线程无法继续执行

handle 报异常后,之后的线程可以继续执行

thenApply 带一个参数即返回值,handle带两个参数多带一个异常

3.2.1 thenApply

出错了不会继续执行

package site.zhourui.juc.cf;

import java.util.concurrent.*;

public class CompletableFutureBuildDemo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        //当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
        CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("111");
            return 1024;
        }).thenApply(f -> {
            System.out.println("222");
            return f + 1;
        }).thenApply(f -> {
            int age = 10/0; // 异常情况:那步出错就停在那步。
            System.out.println("333");
            return f + 1;
        }).whenCompleteAsync((v,e) -> {
            System.out.println("*****v: "+v);
        }).exceptionally(e -> {
            e.printStackTrace();
            return null;
        });
        System.out.println("-----主线程结束,END");
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
    }
}

image-20230311142241342

3.2.2 handle

出错了还是会继续执行

package site.zhourui.juc.cf;

import java.util.concurrent.*;

public class CompletableFutureBuildDemo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        //当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,
        // 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理
        CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("111");
            return 1024;
        }).handle((f,e) -> {
            int age = 10/0;
            System.out.println("222");
            return f + 1;
        }).handle((f,e) -> {
            System.out.println("333");
            return f + 1;
        }).whenCompleteAsync((v,e) -> {
            System.out.println("*****v: "+v);
        }).exceptionally(e -> {
            e.printStackTrace();
            return null;
        });
        System.out.println("-----主线程结束,END");
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
    }
}

image-20230311142417454

3.3 对计算结果进行消费

接收任务的处理结果,并消费处理,无返回结果

3.3.1 thenAccept

package site.zhourui.juc.cf;

import java.util.concurrent.*;

public class CompletableFutureBuildDemo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        CompletableFuture.supplyAsync(() -> {
            return 1;
        }).thenApply(f -> {
            return f + 2;
        }).thenApply(f -> {
            return f + 3;
        }).thenApply(f -> {
            return f + 4;
        }).thenAccept(r -> System.out.println(r));
    }
}

image-20230311143143903

3.3.2 Code之任务之间的顺序执行

thenRun相当于不需要结果也不会返回值

image-20230311143655866

3.4 对计算速度进行选用

谁快用谁

3.4.1 applyToEither

package site.zhourui.juc.cf;

import java.util.concurrent.*;

public class CompletableFutureBuildDemo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            return 10;
        });

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            return 20;
        });

        CompletableFuture<Integer> thenCombineResult = completableFuture1.applyToEither(completableFuture2,f -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            return f + 1;
        });

        System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.get());
    }
}

image-20230311143945373

3.5 对计算结果进行合并

  • 两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine 来处理
  • 先完成的先等着,等待其它分支任务

3.5.1 thenCombine

package site.zhourui.juc.cf;

import java.util.concurrent.*;

public class CompletableFutureBuildDemo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");
            return 10;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");
            return 20;
        }), (x,y) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");
            return x + y;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");
            return 30;
        }),(a,b) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");
            return a + b;
        });
        System.out.println("-----主线程结束,END");
        System.out.println(thenCombineResult.get());

        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
    }
}

image-20230311144351014

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/403474.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Webpack打包———处理样式资源

基本使用 本质上&#xff0c;webpack 是一个用于现代 JavaScript 应用程序的 静态模块打包工具。当 webpack 处理应用程序时&#xff0c;它会在内部从一个或多个入口点构建一个 依赖图(dependency graph)&#xff0c;然后将你项目中所需的每一个模块组合成一个或多个 bundles&a…

2.JVM常识之 jvm常见配置参数

1.常见的配置参数说明 常见配置 -Xms3072M -Xmx3072M -Xss1M -Xmn2048M -XX:MetaspaceSize256M -XX:MaxMetaspaceSize256M -XX:SurvivorRatio8 **-Xss&#xff1a;**每个线程的栈内存大小 默认是1M 说明一个线程栈里能分配的栈帧越少&#xff0c;但是对JVM整体来说能开启…

文档团队怎样使用GIT做版本管理

有不少小型文档团队想转结构化写作和发布&#xff0c;但是因为有限的IT技能和IT资源而受阻。本文为这样的小型文档团队而准备&#xff0c;描述怎样使用Git做内容的版本管理。 - 1 - 为什么需要版本管理 当一个团队进行协同创作内容时&#xff0c;有以下需要&#xff1a; 在对…

【C++】图

本文包含了图的基本概念 1.相关概念 1.1 无/有向 无向图&#xff1a;每一个顶点之间的连线没有方向 有向图&#xff1a;连线有方向&#xff08;类似离散数学的二元关系 <A,B>代表从A到B的边&#xff0c;有方向&#xff09; <A,B>中A为始点&#xff0c;B为终点在…

JDBC的API详解

&#x1f34e;道阻且长&#xff0c;行则将至。&#x1f353; 目录 一、DriverManager 驱动管理类 1.注册驱动 2.获取数据库连接 二、Connection 数据库连接对象 1.获取执行对象 2.事务管理 三、Statement 1.执行DDL、DML语句 2.执行DQL语句 四、ResultSet 以JDBC快速…

【漏洞复现】Grafana任意文件读取(CVE-2021-43798)

docker环境搭建 #进入环境 cd vulhub/grafana/CVE-2021-43798#启动环境&#xff0c;这个过程可能会有点慢&#xff0c;保持网络通畅 docker-compose up -d#查看环境 docker-compose ps直接访问虚拟机 IP地址:3000 目录遍历原理 目录遍历原理&#xff1a;攻击者可以通过将包含…

CNCF x Alibaba云原生技术公开课 第七章 应用编排与管理:Job和DaemonSet

1、Job&#xff1a;管理任务的控制器 概念 首先 kubernetes 的 Job 是一个管理任务的控制器&#xff0c;它可以创建一个或多个 Pod 来指定 Pod 的数量&#xff0c;并可以监控它是否成功地运行或终止&#xff1b;可以根据 Pod 的状态来给 Job 设置重置的方式及重试的次数&…

【Qt网络编程】实现TCP协议通信

文章目录概要&#xff1a;本期主要讲解QT中对于TCP协议通信的实现。一、TCP协议二、Qt中TCP协议处理1.QTcpSocket2.QTcpServer三、Qt实现TCP通信1.客户端2.服务器端结尾概要&#xff1a;本期主要讲解QT中对于TCP协议通信的实现。 一、TCP协议 传输控制协议&#xff08;TCP&am…

有哪些值得推荐的办公软件下载网站

新买了电脑之后&#xff0c;我们需要安装一些常用的办公软件才能方便我们的办公使用。很多小白不知道在哪里下载办公软件比较好&#xff0c;下面小编就来为大家分享几个值得推荐的办公软件下载网站。 1.常用软件下载 对于常用软件如果我们通过百度搜索&#xff0c;能够辨别官方…

【2】Dijkstra与SPFA等常见最短路算法的分析与比较——Bellman-Ford与SPFA

合集目录&#xff1a; 前言 一、Dijkstra 二、Bellman-Ford与SPFA&#xff08;本文&#xff09; 三、Dijkstra与SPFA的比较 四、Floyd 五、启发式搜索 Bellman-Ford 1. 算法介绍 The algorithm was first proposed by Alfonso Shimbel (1955), but is instead named af…

哪个牌子的洗地机耐用?耐用的洗地机推荐

作为当下非常热销的洗地机&#xff0c;它不仅解放了双手&#xff0c;使用也非常的便捷。是生活品质提高的最好代表&#xff0c;但是面对市面上让人眼花缭乱的洗地机&#xff0c;挑选几个来回都决定不了到底入手哪个好&#xff01;为了能帮助大家选购到合适的洗地机&#xff0c;…

gcc 编译的过程

#include <stdio.h> #define PI 3.14 int main(int argc, char const *argv[]) { //打印IP的值printf("PI %lf\n", PI);return 0; }编译的过程&#xff1a;预处理、编译、汇编、链接1.预处理&#xff1a;宏替换、删除注释、头文件包含、条件编译 -E &#xf…

问心 | 再看token、session和cookie

什么是cookie HTTP Cookie&#xff08;也叫 Web Cookie或浏览器 Cookie&#xff09;是服务器发送到用户浏览器并保存在本地的一小块数据&#xff0c;它会在浏览器下次向同一服务器再发起请求时被携带并发送到服务器上。 什么是session Session 代表着服务器和客户端一次会话…

Paddle项目调试记录

PaddlePaddle是百度公司提出的深度学习框架。近年来深度学习在很多机器学习领域都有着非常出色的表现&#xff0c;在图像识别、语音识别、自然语言处理、机器人、网络广告投放、医学自动诊断和金融等领域有着广泛应用。面对繁多的应用场景&#xff0c;深度学习框架有助于建模者…

MyBatis-Plus(狂神)

一.特点 无侵入&#xff1a;只做增强不做改变&#xff0c;引入它不会对现有工程产生影响&#xff0c;如丝般顺滑损耗小&#xff1a;启动即会自动注入基本 CURD&#xff0c;性能基本无损耗&#xff0c;直接面向对象操作强大的 CRUD 操作&#xff1a;内置通用 Mapper、通用 Serv…

GDB调试快速入门

什么是GDB&#xff1a; GDB - - - (GNU symbolic debugger)是Linux平台下最常用的一款程序调试器。 自己的Linux是否安装GDB? 一般来说&#xff0c;使用Ubuntu的话&#xff0c;系统就会自带的有GDB调试器的 命令窗口输入如下命令可以查看是否安装了gdb&#xff1a; gdb -v …

制作一个简单的信用卡验证表

下载:https://download.csdn.net/download/mo3408/87559584 效果图: 您可以从文章顶部附近的下载按钮获取该项目的完整代码。这些文件的概述如下所示: 我们需要将两个 .css 文件和两个 .js 文件包含在我们的 HTML 中。所有其他资源,例如 Bootstrap 框架、jQuery 和 Web 字…

SecureCRT 安装并绑定ENSP设备终端

软件下载链接链接&#xff1a;https://pan.baidu.com/s/1WFxmQgaO9bIiUTwBLSR4OA?pwd2023 提取码&#xff1a;2023 CRT安装&#xff1a;软件可以从上面链接进行下载&#xff0c;下载完成后解压如下&#xff1a;首先双击运行scrt-x64.8.5.4 软件&#xff0c;进行安装点击NEXT选…

PMP项目管理项目资源管理

目录1 项目资源管理概述2 规划资源管理3 估算活动资源4 获取资源5 建设团队6 管理团队7 控制资源1 项目资源管理概述 项目资源管理包括识别、获取和管理所需资源以成功完成项目的各个过程&#xff0c;这些过程有助于确保项目经理和项目团队在正确的时间和地点使用正确的资源。…

Nacos未授权访问漏洞(CVE-2021-29441)

目录漏洞描述影响范围环境搭建漏洞复现声明&#xff1a;本文仅供学习参考&#xff0c;其中涉及的一切资源均来源于网络&#xff0c;请勿用于任何非法行为&#xff0c;否则您将自行承担相应后果&#xff0c;本人不承担任何法律及连带责任。加粗样式 漏洞描述 Nacos 是阿里巴巴…