JUC高级: CompletableFuture
1. 线程基础知识复习
1.1 JUC四大口诀
-
高内聚低耦合前提下,封装思想
线程—>操作---->资源类
-
判断、干活、通知
-
防止虚假唤醒,wait方法要注意使用while判断
-
注意标志位flag,可能是volatile的
1.2 为什么多线程及其重要?
摩尔定律:
它是由英特尔创始人之一Gordon Moore(戈登·摩尔)提出来的。其内容为:
当价格不变时,集成电路上可容纳的元器件的数目约每隔18-24个月便会增加一倍,性能也将提升一倍。
换言之,每一美元所能买到的电脑性能,将每隔18-24个月翻一倍以上。这一定律揭示了信息技术进步的速度。
-
硬件方面
- 摩尔定律失效
- 可是从2003年开始CPU主频已经不再翻倍,而是采用多核而不是更快的主频。
- 在主频不再提高且核数在不断增加的情况下,要想让程序更快就要用到并行或并发编程。
- 摩尔定律失效
-
软件方面
- 充分利用多核处理器
- 提高程序性能,高并发系统
- 提高程序吞吐量,异步+回调等生产需求
-
弊端及问题
- 线程安全问题
- i++
- 集合类安全问题
- 线程锁问题
- 线程性能问题
- 线程安全问题
1.3 start一个线程原理
OpenJDK官网网址
OpenJDK8源码下载地址
-
java 线程理解及openjdk中的实现
-
源码中我们调用一个线程的start方法实质上是调用start0方法
-
而start0是native方法
-
Java语言本身底层就是C++语言
-
java线程是通过start的方法启动执行的,主要内容在native方法start0中,
Openjdk的写JNI一般是一一对应的,Thread.java对应的就是Thread.c
-
C++底层源码解读
-
openjdk8\jdk\src\share\native\java\lang
thread.c
-
start0其实就是JVM_StartThread。此时查看源代码可以看到在jvm.h中找到了声明,jvm.cpp中的
JVM_StartThread
实现。
-
-
openjdk8\hotspot\src\share\vm\prims
jvm.cpp
- JVM_StartThread方法本质上就是调用了JVM中的start方法
-
openjdk8\hotspot\src\share\vm\runtime
thread.cpp
-
JVM中的start方法本质上就是C++调用操作系统的创建一个线程
-
-
总结: java中调用start本质上就是调用的start0本地方法,而本地方法是C++通过调用操作系统创建线程
1.4 Java多线程相关概念
-
进程
是程序的⼀次执⾏,是系统进⾏资源分配和调度的独⽴单位,每⼀个进程都有它⾃⼰的内存空间和系统资源
-
线程
线程(英语:thread)是操作系统能够进行运算调度的最小单位。 它被包含在进程之中,是进程中的实际运作单位。
-
管程
Monitor(监视器),也就是我们平时所说的锁
Monitor其实是一种同步机制,他的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码。
1.5 用户线程和守护线程
Java线程分为用户线程和守护线程,线程的daemon属性为true表示是守护线程,false表示是用户线程
- 守护线程
- 是一种特殊的线程,在后台默默地完成一些系统性的服务,比如垃圾回收线程
- 用户线程
- 是系统的工作线程,它会完成这个程序需要完成的业务操作
注意事项:
设置守护线程,需要在start()方法之前进行
当程序中所有用户线程执行完毕之后,不管守护线程是否结束,系统都会自动退出
- 如果用户线程全部结束了,意味着程序需要完成的业务操作已经结束了,系统可以退出
了。所以当系统只剩下守护进程的时候,java虚拟机会自动退出
示例代码
public class DaemonDemo
{
public static void main(String[] args)
{
Thread t1 = new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"\t 开始运行,"+(Thread.currentThread().isDaemon() ? "守护线程":"用户线程"));
while (true) {
}
}, "t1");
//线程的daemon属性为true表示是守护线程,false表示是用户线程
t1.setDaemon(true);
t1.start();
//3秒钟后主线程再运行
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("----------main线程运行完毕");
}
}
2. CompleteableFuture进化历史
2.0 并发最主要三要素多线程
,异步任务
,返回值
2.1 Future为什么出现
2.1.1 Future 接口
Future接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
Future 接口内部方法:
2.1.2 Callable接口
Callable接口中定义了需要有返回的任务需要实现的方法。
Future出现的原因:
目的:让主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务.
Future可以开启异步任务而callable接口可以开启新线程并且拿到返回值,
两者结合即可达到目的
2.2 Future接口常用实现类FutureTask异步任务
2.2.1 FutureTask相关架构
2.2.2 FutureTask对Callable的特殊支持
从2.2.1FutureTask的架构图中可以看出FutureTask是对Runnable支持的,但是使用Runnable创建的任务是不会有返回值的.但是我们打开FutureTask的源码发现FutureTask的构造方法是对Callable接口进行支持(有返回值、可抛出异常),那么我们返回值
的目的也就达到了
2.2.3 FutureTask基础使用(创建线程)
package site.zhourui.juc;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class FutureTaskDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask futureTask = new FutureTask<>(new MyThread());
Thread t1 = new Thread(futureTask, "t1");
t1.start();
System.out.println(futureTask.get());
}
}
class MyThread implements Callable {
@Override
public Object call() throws Exception {
System.out.println("进入callable子线程");
return "hello callable";
}
}
执行结果:
2.2.4 FutureTask结合线程池
package site.zhourui.juc;
import java.util.concurrent.*;
public class FutureTaskDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
long s = System.currentTimeMillis();
FutureTask<String> task1 = new FutureTask<>(() -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
return "task1 over";
});
FutureTask<String> task2 = new FutureTask<>(() -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
return "task2 over";
});
FutureTask<String> task3 = new FutureTask<>(() -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
return "task3 over";
});
fixedThreadPool.submit(task1);
fixedThreadPool.submit(task2);
fixedThreadPool.submit(task3);
System.out.println(task1.get());
System.out.println(task2.get());
System.out.println(task3.get());
long e = System.currentTimeMillis();
System.out.println("执行时间:"+(e-s));
fixedThreadPool.shutdown();
}
}
执行结果:
结论:future+线程池异步多线程任务配合,能显著提高程序的执行效率。
2.2.5 FutureTask 的缺点(为什么会使用CompleteableFuture)
2.2.5.1 get方法的阻塞
正常情况下:我们的get方法放在主线程执行之后是没有任何问题的
package site.zhourui.juc;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureAPIDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
FutureTask<String> futureTask = new FutureTask<>(() ->{
System.out.println(Thread.currentThread().getName()+"\t...come in");
TimeUnit.SECONDS.sleep(5);
return "task over";
});
Thread t1 = new Thread(futureTask,"t1");
t1.start();
System.out.println(Thread.currentThread().getName()+"/t...忙其他任务了");
System.out.println(futureTask.get()); //git方法等待
}
}
get方法过时不候.,超过等待时间没有拿到结果直接抛出异常
优点:假如我不愿意等待很长时间,我希望过时不候,可以自动离开
缺点:如果写多了抛出大量异常不优雅,但也可以用
可以通过捕获异常的方式做其他业务处理
package site.zhourui.juc;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureAPIDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
FutureTask<String> futureTask = new FutureTask<>(() ->{
System.out.println(Thread.currentThread().getName()+"\t...come in");
TimeUnit.SECONDS.sleep(5);
return "task over";
});
Thread t1 = new Thread(futureTask,"t1");
t1.start();
System.out.println(Thread.currentThread().getName()+"/t...忙其他任务了");
// System.out.println(futureTask.get()); //get方法等待
System.out.println(futureTask.get(3,TimeUnit.SECONDS));//get方法过时不候
}
/**
*1 get容易导致阻塞,一般建议放在程序后面,一旦调用不见不散,非要等到结果才会离开,不管你是否计算完成,容易程序堵塞。
*2 假如我不愿意等待很长时间,我希望过时不候,可以自动离开.
*/
}
当我们在主线程做其他事情之前调用get方法,那么主线程会被阻塞,主线程会一直等到子线程任务执行完毕,get方法拿到返回值为止,那么这样和单线程没有任何区别甚至更慢
综上注意事项:
一旦调用get()方法,不管是否计算完成,都会导致程序阻塞,所以get()方法的位置一般放在程序最后
2.2.5.2 isDone()轮询
缺点:轮询的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果.
如果想要异步获取结果,通常都会以轮询的方式去获取结果尽量不要阻塞
利用
if(futureTask.isDone())
的方式使得FutureTask在结束之后才get(),但是也会消耗cpu通过sleep降低查询的频率,减少cpu的消耗
package site.zhourui.juc;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureAPIDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
FutureTask<String> futureTask = new FutureTask<>(() ->{
System.out.println(Thread.currentThread().getName()+"\t...come in");
TimeUnit.SECONDS.sleep(5);
return "task over";
});
Thread t1 = new Thread(futureTask,"t1");
t1.start();
System.out.println(Thread.currentThread().getName()+"/t...忙其他任务了");
while(true){
if(futureTask.isDone()){
System.out.println(futureTask.get());
break;
}else{
//暂停毫秒
try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("正在处理中,不要再催了,越催越慢 ,再催熄火");
}
}
}
}
2.2.6 FutureTask总结
- Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。
- 对于简单的业务场景使用Future完全可以.但最好使用轮询方式
Future优化思路(完成一些复杂的任务)
回调通知
- Future任务完成了可以告诉我们,也就是我们的
回调通知
创建异步任务 :Future+线程池配合
多个任务前后依赖可以组合处理
- 想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值
- 将两个或多个异步计算合成一个异步计算,这几个异步计算相互独立,同时后面这个又依赖前一个处理的结果
对计算速度选最快
- 当Future集合中某个任务最快结束时,返回结果,返回第一名处理结果
更完备的任务控制
仅仅靠Future的这些方法是无法完成复杂操作的
2.3 CompletableFuture闪亮登场
get()
方法在Future计算完成之前会一直处在阻塞状态下,isDone()
方法容易耗费CPU资源.- 对于真正的异步处理我们希望是可以通过传入回调函数,在Futrue结束时自动调用该回调函数,这样,我们就不用等待结果.
- 阻塞的方式和异步编程的涉及理念相违背,而轮询的方式会耗费无谓的CPU资源,因此,JDK8设计出
CompletableFuture
.CompletableFuture
提供了一种观察者模式类的机制,可以让任务执行完成后通知监听的一方.- 在Java 8中,
CompletableFuture
提供了非常强大的Future的扩展功能, 可以帮助我们简化异步编程的复杂性, 并且提供了函数式编程的能力, 可以通过回调的方式处理计算结果, 也提供了转换和组合CompletableFuture
的方法- 它可能代表一个明确完成的Future, 也有可能代表一个完成阶段(
CompletionStage
) , 它支持在计算完成以后触发一些函数或执行某些动作。
结构图:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {}
2.3.1 CompletionStage
- CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段.
- 一个阶段的计算执行可以是一个Function,Consumer或者Runnable,比如:stage.thenApply(x - >square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
- 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发.
- 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数.
2.3.2 CompletableFuture核心的四个静态方法
以上方法中Executor executor参数说明:
- 带Executor的方法,直接使用默认的
ForkJoinPool.commonPool()
作为它的线程池执行异步代码.- 带Executor的方法,使用我们自定义的线程池
为什么使用静态方法来创建CompletableFuture,而不使用new CompletableFuture的方式呢?
官方提供了构造方法,但是在API中说明是不完备的,这个构造方法只是从语法上合规
2.3.2.1 runAsync无返回值
带Executor executor参数的都是可以使用自定义线程池的
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
代码实例:
package site.zhourui.juc;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.*;
public class CompletableFutureBuildDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> completableFuture= CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName());
//停顿几秒线程
try {
SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(completableFuture.get());
}
}
执行结果:
2.3.2.2 supplyAsync有返回值
带Executor executor参数的都是可以使用自定义线程池的
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
代码实例:
package site.zhourui.juc;
import java.util.concurrent.*;
public class CompletableFutureBuildDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "helllo supplyasync";
});
System.out.println(completableFuture.get());
}
}
执行结果:
2.3.3 CompletableFuture通用演示
注意:默认线程池ForkJoin会在主线程执行完成时关闭,如果有任务正在使用该线程池那么可能不会出结果
CompletableFuture完成与Future相同的功能
package site.zhourui.juc;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class CompletableFutureUseDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
future1();
}
private static void future1() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "...come in");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("1秒钟后出结果:" + result);
return result;
});
System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务!");
System.out.println(completableFuture.get());
}
}
执行结果:和FutureTask一致
CompletableFuture通用演示
package site.zhourui.juc;
import java.util.concurrent.*;
public class CompletableFutureUseDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//自定义线程池
ExecutorService threadPool = Executors.newFixedThreadPool(3);
try {
CompletableFuture.supplyAsync(() ->{
System.out.println(Thread.currentThread().getName()+"...come in");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("1秒钟后出结果:"+result);
if(result > 5){
int i = 10/0; //制造异常
}
return result;
},threadPool).whenComplete((v,e) ->{ //v表示result,e表示异常,CompletableFuture通过whenComplete来减少阻塞和轮询(自动回调)
if(e == null){//判断有没有异常
System.out.println("计算完成,更新系统update value:"+v);
}
}).exceptionally(e ->{
e.printStackTrace();
System.out.println("异常情况:"+e.getCause()+"\t"+e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName()+"线程先去忙其它任务");
}catch (Exception e){
e.printStackTrace();
}finally {//关闭线程池
threadPool.shutdown();
}
}
}
正常执行结果:
异常执行结果:
CompletableFuture优点总结:
- 异步任务结束时,会
自动回调
某个对象的方法;- 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行
- 异步任务出错时,会自动回调某个对象的方法
2.3.4 函数式接口串讲
函数式接口定义:
任何接口,如果只包含唯一一个抽象方法,那么它就是一个函数式接口.对于函数式接口,我们可以通过lambda表达式来创建该接口的对象.
更准确的来说接口定义时带有@FunctionalInterface注解的都是函数式接口
2.3.4.1 常见的函数式接口
函数式接口名称 | 方法名称 | 参数 | 返回值 |
---|---|---|---|
Runnable | run | 无参数 | 无返回值 |
Function | apply | 1个参数 | 有返回值 |
Consume | accept | 1个参数 | 无返回值 |
Supplier | get | 没有参数 | 有返回值 |
Biconsumer | accept | 2个参数 | 无返回值 |
Biconsumer(Bi是英语词根代表两个的意思,我们要传入两个参数,在上面的案例中是v和e)
2.3.4.2 链式调用写法
@Accessors(chain = true)
开启链式编程,需要lombok
public class Chain {
public static void main(String[] args) {
//-------------------老式写法------------
// Student student = new Student();
// student.setId(1);
// student.setMajor("cs");
// student.setName("小卡");
new Student().setId(1).setName("大卡").setMajor("cs");
}
}
@NoArgsConstructor
@AllArgsConstructor
@Data
@Accessors(chain = true)//开启链式编程
class Student{
private int id;
private String name;
private String major;
}
2.3.4.3 join和get对比
功能几乎一样,区别在于编码时是否需要抛出异常
get()
方法会在编译期间会做异常的检查,因此需要抛出异常或者做异常处理join()
方法不会在编译期间会做异常的检查
public class Chain {
public static void main(String[] args) throws ExecutionException, InterruptedException {//抛出异常
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
return "hello 12345";
});
System.out.println(completableFuture.get());
}
}
public class Chain {
public static void main(String[] args) {//抛出异常
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
return "hello 12345";
});
System.out.println(completableFuture.join());
}
}
2.4 CompletableFuture电商比价案例
实战精讲-比价网站case:
1 需求说明
1.1 同一款产品,同时搜索出同款产品在各大电商平台的售价;
1.2 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少
2 输出返回:
出来结果希望是同款产品的在不同地方的价格清单列表, 返回一个List<String>
《mysql》in jd price is 88.05
《mysql》in dang dang price is 86.11
《mysql》in tao bao price is 90.43
3 解决方案,比对同一个商品在各个平台上的价格,要求获得一个清单列表
1 stepbystep , 按部就班, 查完京东查淘宝, 查完淘宝查天猫......
2 all in ,万箭齐发,一口气多线程异步任务同时查询。。。
package site.zhourui.juc;
import lombok.Getter;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
class CompletableFutureNetMallDemo
{
static List<NetMall> list = Arrays.asList(
new NetMall("jd"),
new NetMall("pdd"),
new NetMall("taobao"),
new NetMall("dangdangwang"),
new NetMall("tmall")
);
//同步 ,step by step
/**
* List<NetMall> ----> List<String>
* @param list
* @param productName
* @return
*/
public static List<String> getPriceByStep(List<NetMall> list,String productName)
{
return list
.stream().
map(netMall -> String.format(productName + " in %s price is %.2f", netMall.getMallName(), netMall.calcPrice(productName)))
.collect(Collectors.toList());
}
//异步 ,多箭齐发
/**
* List<NetMall> ---->List<CompletableFuture<String>> ---> List<String>
* @param list
* @param productName
* @return
*/
public static List<String> getPriceByASync(List<NetMall> list,String productName)
{
return list
.stream()
.map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + " is %s price is %.2f", netMall.getMallName(), netMall.calcPrice(productName))))
.collect(Collectors.toList())
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
public static void main(String[] args)
{
long startTime = System.currentTimeMillis();
List<String> list1 = getPriceByStep(list, "mysql");
for (String element : list1) {
System.out.println(element);
}
long endTime = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime - startTime) +" 毫秒");
System.out.println();
long startTime2 = System.currentTimeMillis();
List<String> list2 = getPriceByASync(list, "mysql");
for (String element : list2) {
System.out.println(element);
}
long endTime2 = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime2 - startTime2) +" 毫秒");
}
}
class NetMall
{
@Getter
private String mallName;
public NetMall(String mallName)
{
this.mallName = mallName;
}
public double calcPrice(String productName)
{
//检索需要1秒钟
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
}
}
执行结果:
3. CompletableFuture常用方法
3.1 获得结果和触发计算
3.1.1 getNow
get之前介绍过了,这里介绍getNow
getNow
相当于备用方案如果此时异步任务还没执行完成就使用getNow设置的默认值,如果完成了就是用任务返回值
package site.zhourui.juc.cf;
import java.util.concurrent.*;
public class CompletableFutureAPIDemo
{
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException
{
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
return 1;
},threadPoolExecutor);
//System.out.println(future.get());
//System.out.println(future.get(2L,TimeUnit.SECONDS));
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(future.getNow(9999));
threadPoolExecutor.shutdown();
}
}
3.1.2 complete主动触发计算
当调用CompletableFuture.get()被阻塞的时候,complete方法就是结束阻塞并get()获取设置的complete里面的值.
package site.zhourui.juc.cf;
import java.util.concurrent.*;
public class CompletableFutureAPIDemo
{
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException
{
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
return 1;
},threadPoolExecutor);
//System.out.println(future.get());
//System.out.println(future.get(2L,TimeUnit.SECONDS));
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
// System.out.println(future.getNow(9999));
System.out.println(future.complete(-44)+"\t"+future.get());
threadPoolExecutor.shutdown();
}
}
3.2 对计算结果进行处理
总结:
thenApply 和handle都是将线程船型化
但是thenApply 报异常后,之后的线程无法继续执行
handle 报异常后,之后的线程可以继续执行
thenApply 带一个参数即返回值,handle带两个参数多带一个异常
3.2.1 thenApply
出错了不会继续执行
package site.zhourui.juc.cf;
import java.util.concurrent.*;
public class CompletableFutureBuildDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException
{
//当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("111");
return 1024;
}).thenApply(f -> {
System.out.println("222");
return f + 1;
}).thenApply(f -> {
int age = 10/0; // 异常情况:那步出错就停在那步。
System.out.println("333");
return f + 1;
}).whenCompleteAsync((v,e) -> {
System.out.println("*****v: "+v);
}).exceptionally(e -> {
e.printStackTrace();
return null;
});
System.out.println("-----主线程结束,END");
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
3.2.2 handle
出错了还是会继续执行
package site.zhourui.juc.cf;
import java.util.concurrent.*;
public class CompletableFutureBuildDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException
{
//当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,
// 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理
CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("111");
return 1024;
}).handle((f,e) -> {
int age = 10/0;
System.out.println("222");
return f + 1;
}).handle((f,e) -> {
System.out.println("333");
return f + 1;
}).whenCompleteAsync((v,e) -> {
System.out.println("*****v: "+v);
}).exceptionally(e -> {
e.printStackTrace();
return null;
});
System.out.println("-----主线程结束,END");
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
3.3 对计算结果进行消费
接收任务的处理结果,并消费处理,无返回结果
3.3.1 thenAccept
package site.zhourui.juc.cf;
import java.util.concurrent.*;
public class CompletableFutureBuildDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException
{
CompletableFuture.supplyAsync(() -> {
return 1;
}).thenApply(f -> {
return f + 2;
}).thenApply(f -> {
return f + 3;
}).thenApply(f -> {
return f + 4;
}).thenAccept(r -> System.out.println(r));
}
}
3.3.2 Code之任务之间的顺序执行
thenRun
相当于不需要结果也不会返回值
3.4 对计算速度进行选用
谁快用谁
3.4.1 applyToEither
package site.zhourui.juc.cf;
import java.util.concurrent.*;
public class CompletableFutureBuildDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException
{
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
return 10;
});
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
return 20;
});
CompletableFuture<Integer> thenCombineResult = completableFuture1.applyToEither(completableFuture2,f -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
return f + 1;
});
System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.get());
}
}
3.5 对计算结果进行合并
- 两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine 来处理
- 先完成的先等着,等待其它分支任务
3.5.1 thenCombine
package site.zhourui.juc.cf;
import java.util.concurrent.*;
public class CompletableFutureBuildDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException
{
CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");
return 10;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");
return 20;
}), (x,y) -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");
return x + y;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");
return 30;
}),(a,b) -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");
return a + b;
});
System.out.println("-----主线程结束,END");
System.out.println(thenCombineResult.get());
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
}
}