本专栏学习内容又是来自尚硅谷周阳老师的视频
有兴趣的小伙伴可以点击视频地址观看
在学习CompletableFuture
之前,必须要先了解一下Future
Future
概念
Future
接口(FutureTask
实现类)定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
比如主线程在执行多个任务时,其中有一个任务耗时特别久,可以交给子线程执行,主线程继续做自己的事情,过一会再去获取子任务的执行结果。
FutureTask
FutureTask
是Future
的一个实现类
引出FutureTask
Future
提供了一种异步并行计算的功能,目的是:异步多线程任务执行且返回有结果,三个特点
- 多线程
- 有返回
- 异步任务
说句题外话,小黄一直不了解多线程和异步有什么区别,刚好问了ChatGpt
多线程和异步是两个相关但又不完全相同的概念。
多线程指的是在一个程序中同时运行多个线程,每个线程都是独立执行的。多线程可以使程序具有并发性,提高系统的吞吐量和响应速度。通过多线程,可以将耗时的操作和任务分配给不同的线程来执行,从而提高整体的工作效率。
异步指的是一种编程模型,其中任务的执行不需要等待前一个任务的完成。在异步编程中,可以执行一个任务,而不需要阻塞等待其完成,而是继续执行后续的任务。异步操作通常适用于涉及网络请求、文件读写、数据库查询等可能造成阻塞的操作。通过异步操作,可以充分利用等待时间来执行其他任务,提高程序的效率。
多线程和异步之间存在一定的关系。在实现异步操作时,通常会使用多线程来实现异步任务的执行。通过创建新的线程,可以在后台执行耗时的操作,而不会阻塞主线程的执行。
例如,对于一个网络请求,使用多线程可以在主线程发送请求后,继续执行其他任务,而不需要等待网络请求返回的数据。当网络请求完成后,在新的线程中处理响应数据,从而实现异步操作。
总结起来,多线程是一种实现并发性的机制,而异步是一种编程模型,在实现异步操作时通常会使用多线程来达到异步执行的效果。多线程可以提供资源的并行使用,而异步可以提高程序的运行效率和响应性。
来看一下创建线程的方式
实现Runnable
接口可以开启多线程
实现Callable
接口可以开启多线程并且有返回值
class MyThread1 implements Runnable {
@Override
public void run() {
}
}
class MyThread2 implements Callable<String> {
@Override
public String call() throws Exception {
return null;
}
}
但是Thread
的构造方法中只有参数为Runnable
的方法,无法满足我们的需求
继续往下看,发现RunnableFuture
接口继承了Runnable
以及Future
,那也就是说他具有多线程、异步两个特点
而FutureTask
实现了RunnableFuture
接口,并且他的构造方法中可以传入Callable
,那么他就同时具有了多线程、异步、有返回三大特点。
优点
Future
配合线程池异步多线程,能显著提高程序的效率
需求
主线程需要执行三个任务,且三个任务耗时分别是500毫秒、300毫秒、300毫秒,如果主线程自己执行的话,那程序至少需要花费11秒的时间,现在使用Future + 线程池
来优化
实现
public static void main(String[] args) throws InterruptedException {
//创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(3);
long startTime = new Date().getTime();
//创建任务
FutureTask<String> task1 = new FutureTask<>(() -> {
Thread.sleep(500);
return "task1 over";
});
threadPool.submit(task1,"t1");
FutureTask<String> task2 = new FutureTask<>(() -> {
Thread.sleep(300);
return "task2 over";
});
threadPool.submit(task2,"t2");
Thread.sleep(300);
System.out.println("task3 over");
long endTime = new Date().getTime();
System.out.println("花费" + (endTime - startTime) + "毫秒"); //花费338毫秒
}
缺点
get方法会阻塞
调用task1.get()
会使主线程阻塞,因为get()
他会一直等待子线程返回结果在继续运行。
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建任务
FutureTask<String> task1 = new FutureTask<>(() -> {
Thread.sleep(5000);
return "task1 over";
});
new Thread(task1,"t1").start();
System.out.println("t1线程结果:" + task1.get());
System.out.println("主线程执行完毕");
}
isDone方法轮询
对于上述代码,没有一个友好的提示,导致我们不知道程序为何阻塞,FutureTask
提供了isDone()
,调用该方法,结果为true
表示线程执行完毕。
但是这种方法的结果就是需要不停的轮询,大量的消耗了CPU
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建任务
FutureTask<String> task1 = new FutureTask<>(() -> {
Thread.sleep(5000);
return "task1 over";
});
new Thread(task1,"t1").start();
while (true) {
if (task1.isDone()) {
System.out.println("t1线程结果:" + task1.get());
break;
}else {
Thread.sleep(500);
System.out.println("请等待");
}
}
System.out.println("主线程执行完毕");
}
更复杂的任务
对于简单的任务使用Future
完全可以解决,下面有几个更为复杂的需求Future
不好解决了
-
多个任务前后可以组合处理
例如:子线程A计算返回的结果,在子线程B中需要用到
-
对计算速度选最快
例如:联机游戏,谁先到终点谁就赢了,那么当A到达终点时,B的线程也需要中断
对此,就引出了CompletableFuture
,这就有点像一个名场面东厂管不了的,我西厂来管,东厂管得了的,我西厂更要管
,也就是说Future
有的功能CompletableFuture
都有,Future
没有的功能CompletableFuture
也有,有点像plus版本。
CompletableFuture
之前介绍了Future
,发现他只能解决一些简单的逻辑,并且阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗无谓的CPU资源,所有CompletableFuture
应运而生。
概念
CompletableFuture
提供了一种类似于观察者模式的机制,可以让任务执行完后通知监听的一方。
他是JDK1.8新增的类,实现了Future
、CompletionStage
接口
Future
不用在介绍了,CompletionStage
提供了一种处理异步操作结果的机制,可以与回调函数一起使用,来处理 CompletableFuture 的计算结果。
创建方式
我们学习一个新的类的方式,第一步就是看他的构造函数,创建这个类,CompletableFuture
虽然有一个空参构造函数,但是官方并不推荐我们使用,一般我们通过4个静态方法来创建。
调用静态方法创建 | 返回值 |
---|---|
CompletableFuture runAsync(Runnable runnable) | 无 |
CompletableFuture runAsync(Runnable runnable,Executor executor) | 无 |
CompletableFuture supplyAsync(Supplier supplier) | 有 |
supplyAsync(Supplier supplier, Executor executor) | 有 |
代码实现
runAsync()
不带有线程池,默认使用ForkJoinPool
的线程
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println("进入子线程:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
System.out.println(completableFuture.get()); //null
System.out.println("主线程结束");
}
supplyAsync
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("进入子线程:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int result = ThreadLocalRandom.current().nextInt(10);
System.out.println("子线程执行结果:" + result);
return result;
});
System.out.println(completableFuture.get());
System.out.println("主线程结束");
}
带有线程池的创建就不举例了
通用异步编程
上面还是在演示Future
原有的功能,接下来学一下新的功能
通过whenComplete
来监听子进程执行完毕,来做一系列操作
通过exceptionally
来解决子进程出现异常的情况
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("进入子线程:" + Thread.currentThread().getName());
System.out.println(Thread.currentThread().isDaemon()); //守护线程
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int result = ThreadLocalRandom.current().nextInt(10);
System.out.println("子线程执行结果:" + result);
return result;
}).whenComplete((v,e) -> {
if (e == null) {
System.out.println("计算结果为:" + v);
}
}).exceptionally(e -> {
//处理异常
e.printStackTrace();
System.out.println("计算过程出现异常:" + e.getCause() + "\t" + e.getMessage());
return null;
});
System.out.println("主线程结束");
}
//输出
进入子线程:ForkJoinPool.commonPool-worker-9
主线程结束
但是发现控制台输出没有等待结果,主线程就直接结束了,这是因为默认情况下ForkJoinPool
里面是守护线程,解决方法有两种
- 在主线程结束前等待
- 使用自定义的线程池
修改代码
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(10);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("进入子线程:" + Thread.currentThread().getName());
System.out.println(Thread.currentThread().isDaemon()); //守护线程
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int result = ThreadLocalRandom.current().nextInt(10);
System.out.println("子线程执行结果:" + result);
return result;
},threadPool).whenComplete((v,e) -> {
if (e == null) {
System.out.println("计算结果为:" + v);
}
}).exceptionally(e -> {
//处理异常
e.printStackTrace();
System.out.println("计算过程出现异常:" + e.getCause() + "\t" + e.getMessage());
return null;
});
System.out.println("主线程结束");
threadPool.shutdown();
}
//输出
进入子线程:pool-1-thread-1
false
主线程结束
子线程执行结果:7
计算结果为:7
通过控制台输出可以看到,自定义线程池创建的是用户线程,所以即使是主线程执行完毕,程序还是要等待所有用户线程执行完毕才会结束。
链式语法
接下来会用到很多链式语法,这个在Java8很常见,其实就是在写法上更加简洁了
public class CompletableFutureDemo5 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Student student = new Student();
/*
以前的写法
student.setId(1);
student.setName("张三"));
*/
student.setId(1).setName("张三");
System.out.println(student);
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
class Student{
private Integer id;
private String name;
}
join方法
在介绍join()
之前,必须先介绍一下get()
get()
是获取异步计算结果,但是在编译期会需要抛出异常
join()
也是获取异步计算结果,但是不需要抛出异常
电商比价案例
需求:
需要查询《深入理解JVM虚拟机》这本书在各大电商平台销售的价格,显示结果如下
《深入理解JVM虚拟机》in jd price is 100
普通解决方案
使用同步方式,一步一步来,查一个保存一个价格
此方法的优点是简洁粗暴,缺点是非常的耗时
public class CompletableFutureDemo7 {
static List<NetMall> malls = Arrays.asList(
new NetMall("jd"),
new NetMall("tb"),
new NetMall("pdd")
);
public static List<String> step(List<NetMall> list ,String productName){
return list.stream()
.map(netMall -> String.format(productName + " in %s price is %.2f",
netMall.getNetMallName(),
netMall.calcPrice(productName)))
.collect(Collectors.toList());
}
public static void main(String[] args) {
long startTime = new Date().getTime();
List<String> list = step(malls, "MySQL");
long endTime = new Date().getTime();
System.out.println("耗时:" + (endTime - startTime));
for (String item : list) {
System.out.println(item);
}
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class NetMall{
private String netMallName;
public double calcPrice(String productName) {
//模拟请求过程耗时
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
}
}
//结果
耗时:3067
MySQL in jd price is 78.86
MySQL in tb price is 78.95
MySQL in pdd price is 78.98
异步解决方案
核心计算方法,使用异步的方式进行,这样大大的节约了时间
public static List<String> byCompletableFuture(List<NetMall> list ,String productName){
return list.stream()
.map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f",
netMall.getNetMallName(),
netMall.calcPrice(productName))))
.collect(Collectors.toList())
.stream()
.map(s -> s.join())
.collect(Collectors.toList());
}
//结果
耗时:1050
MySQL in jd price is 77.77
MySQL in tb price is 77.18
MySQL in pdd price is 77.32
计算方法
CompletableFuture
提供了非常多的计算方法
获取结果和触发计算
方法名 | 作用 |
---|---|
public T get() | 获取结果,会造成当前线程阻塞 |
public T get(long timeout, TimeUnit unit) | 获取结果,在指定的时间内获取不到,抛出异常 |
public T join() | 获取结果,跟get()用法一致,区别是编译器不需要抛异常 |
public T getNow(T valueIfAbsent) | 立刻获取结果,如果结果没出来,使用指定值代替结果 |
public boolean complete(T value) | 中断计算,计算过程被中断返回true,并且用指定值代替计算结果 |
public static void main(String[] args) throws InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "123";
});
TimeUnit.SECONDS.sleep(2);
System.out.println(completableFuture.complete("completeValue") + "\t " + completableFuture.join());//true completeValue
}
对计算结果进行处理
方法名 | 作用 |
---|---|
public CompletableFuture thenApply() | 获取计算结果,对其进行处理 |
public CompletableFuture handle() | 作用同thenApply,区别在于遇到异常不会组织下一步运行 |
thenApply()
public class CompletableFutureDemo9 {
public static void main(String[] args) throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(10);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("111");
return 1;
},threadPool).thenApply(f -> {
System.out.println("222");
return f + 1;
}).thenApply(f -> {
System.out.println("333");
return f + 2;
}).whenComplete((v,e) -> {
if (e == null) {
System.out.println("计算结果:" + v);
}
}).exceptionally(e -> {
e.printStackTrace();
return null;
});
System.out.println("主线程去忙其他的了 : " + Thread.currentThread().getName());
threadPool.shutdown();
}
}
可以看到程序在抛异常时,就停止了,不会继续往下执行
hanlde()
public static void main(String[] args) throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(10);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("111");
return 1;
}, threadPool).handle((f, e) -> {
int i = 1 / 0;
System.out.println("222");
return f + 1;
}).handle((f, e) -> {
System.out.println("333");
return f + 2;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("计算结果:" + v);
}
}).exceptionally(e -> {
e.printStackTrace();
return null;
});
System.out.println("主线程去忙其他的了 : " + Thread.currentThread().getName());
threadPool.shutdown();
}
通过输出可以看出在第一个handle
中出现异常,不继续往下执行该handle
的方法,但是不影响后续的hanlde
方法
对计算结果进行消费
消费,顾名思义就是把这条消息消费掉,后面的人就获取不到这条消息了。
方法 | 作用 |
---|---|
public CompletableFuture thenRun(Runnable action) | 任务A执行完执行B,并且B不需要A的结果 |
public CompletableFuture thenAccept(Consumer<? super T> action) | 任务A执行完执行B,B需要A的结果,但是任务B没有返回值 |
public CompletableFuture thenApply | 任务A执行完执行B,B需要A的结果,同事任务B有返回值 |
如下所示,thenAccept()
方法,在获取结果时为null
public static void main(String[] args) throws InterruptedException {
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join()); //null
System.out.println(CompletableFuture.supplyAsync(() -> "resultB").thenAccept(y -> {System.out.println(y);}).join()); //resultB null
System.out.println(CompletableFuture.supplyAsync(() -> "resultC").thenApply(y -> y + "resultD").join()); //resultCresultD
}
对运行线程池进行选择
不使用线程池,默认走的是ForkJoinPool
使用线程池,走的全都是自定义线程池
使用线程池,中间调用了thenRunAsync
方法,那么之后的方法都会使用ForkJoinPool
源码
thenRun()
和thenRunAsync()
区别在于一个传参使用了默认的线程池
对计算速度进行选用
调用applyToEither()
方法,他会将两个异步任务先完成的值返回
public class CompletableFutureDemo12 {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}
return "future1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {TimeUnit.SECONDS.sleep(4);} catch (InterruptedException e) {throw new RuntimeException(e);}
return "future2";
});
CompletableFuture<String> result = future1.applyToEither(future2, s -> s + " is win");
System.out.println(result.join()); //future1 is win
}
}
对计算结果进行合并
thenCombine()
可以将两个计算结果合并
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}
return 10;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
try {TimeUnit.SECONDS.sleep(4);} catch (InterruptedException e) {throw new RuntimeException(e);}
return 20;
});
CompletableFuture<Integer> result = future1.thenCombine(future2, (x,y) -> {
System.out.println("计算结果合并");
return x + y;
});
System.out.println(result.join()); //30
}