一、获得结果和触发计算
1.获取结果
(1)public T get()
public class CompletableFutureAPIDemo{
public static void main(String[] args) throws ExecutionException, InterruptedException{
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});
//get():阻塞1s后返回结果(必须得到结果)
System.out.println(completableFuture.get());
}
}
运行结果:
(2)public T get(long timeout,TimeUnit unit)
public class CompletableFutureAPIDemo{
public static void main(String[] args) throws ExecutionException, InterruptedException{
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});
//get(2L,TimeUnit.SECONDS):只等待2s,2s没有获取到结果抛出
System.out.println(completableFuture.get(2L,TimeUnit.SECONDS));
}
}
运行结果:
(3)public T join()
public class CompletableFutureAPIDemo{
public static void main(String[] args){
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});
//join():和get()一致,但是不抛出异常
System.out.println(completableFuture.join());
}
}
运行结果:
(4)public T getNow(T valueIfAbsent)
public class CompletableFutureAPIDemo{
public static void main(String[] args){
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});
///暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//getNow(T valueIfAbsent)
//没有计算完成的情况下,给我一个替代结果
//立即获取结果不阻塞,计算完返回结果,没计算完返回替代结果
//也不需要抛出异常
System.out.println(completableFuture.getNow("xyz"));
}
}
运行结果:
2.主动触发计算
(1)public boolean complete(T value)
public class CompletableFutureAPIDemo{
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});
//complete(T value):是否打断get方法立即返回括号值
//计算未执行完,打断成功,返回true,返回括号替代值
//计算执行完,打断失败,返回false,返回原计算结果
System.out.println("是否打断成功:" + completableFuture.complete("xyz"));
System.out.println("结果:" + completableFuture.get());
}
}
运行结果:
二、对计算结果进行处理
1.thenApply
- 计算结果存在依赖关系,使两个线程串行化
- 存在依赖关系(当前报错,不走下一步),当前步骤有异常的话就叫停
public class CompletableFutureAPI2Demo{
public static void main(String[] args){
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() ->{
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("111");
return 1;
},threadPool).thenApply(f -> {
int i=10/0;
System.out.println("222");
return f + 2;
}).thenApply(f -> {
System.out.println("333");
return f + 3;
}).whenComplete((v,e) -> {
if (e == null) {
System.out.println("----计算结果: "+v);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println(e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName()+"----主线程正在执行其他任务");
threadPool.shutdown();
}
}
结果:
2.handle
- 计算结果存在依赖关系,这两个线程串行化
- 有异常也可以往下一步走,根据带的异常参数可以进一步处理
public class CompletableFutureAPI2Demo{
public static void main(String[] args){
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() ->{
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("111");
return 1;
},threadPool).handle((f,e) -> {
int i=10/0;
System.out.println("222");
return f + 2;
}).handle((f,e) -> {
System.out.println("333");
return f + 3;
}).whenComplete((v,e) -> {
if (e == null) {
System.out.println("----计算结果: "+v);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println(e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName()+"----主线程正在执行其他任务");
threadPool.shutdown();
}
}
结果:
三、对计算结果进行消费
1.thenAccept
- 接受任务的处理结果,并消费处理,无返回结果
public class CompletableFutureApi2Demo {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
return 1;
}, threadPool).thenApply(f -> {
return f + 2;
}).thenApply(f -> {
return f + 2;
}).thenAccept(r -> {
System.out.println(r);//5
});
}
}
结果:
2.对比补充
- thenRun(Runnable runnable) :任务A执行完执行B,并且不需要A的结果
- thenAccept(Consumer action): 任务A执行完执行B,B需要A的结果,但是任务B没有返回值
- thenApply(Function fn): 任务A执行完执行B,B需要A的结果,同时任务B有返回值
public class CompletableFutureApi2Demo {
public static void main(String[] args) {
System.out.println(CompletableFuture.supplyAsync(() -> "result").thenRun(() -> {}).join());//null
System.out.println(CompletableFuture.supplyAsync(() -> "result").thenAccept(r -> System.out.println(r)).join());//result null
System.out.println(CompletableFuture.supplyAsync(() -> "result").thenApply(f -> f + 2).join());//result2
}
}
结果:
3.CompletableFuture和线程池说明
- 如果没有传入自定义线程池,都用默认线程池ForkJoinPool
- 传入一个线程池,如果你执行第一个任务时,传入了一个自定义线程池
-
- 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务时共用同一个线程池
- 调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自定义的线程池,第二个任务使用的是ForkJoin线程池
- 备注:可能是线程处理太快,系统优化切换原则, 直接使用main线程处理,thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,之间的区别同理。(方法名后面有无Async的区别是,有Async的方法在底层会调用uniRunStage(asyncPool,action);方法,将线程池更改为ForkJoinPool而不是自定义线程池)
四、对计算速度进行选用
1.applyToEither
- 用于在两个
CompletableFuture
中任何一个完成时,执行某个函数。它允许我们并行执行两个异步任务,并在任意一个任务完成后,立即对结果进行处理,而无需等待另一个任务完成。 (谁快用谁)
public class CompletableFutureApiDemo {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("A come in");
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "playA";
}, threadPool);
CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("B come in");
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "playB";
}, threadPool);
CompletableFuture<String> result = playA.applyToEither(playB, f -> {
return f + " is winner";
});
/**
* A come in
* B come in
* main-----------winner:playA is winner
*/
System.out.println(Thread.currentThread().getName() + "-----------winner:" + result.join());
}
}
结果:
五、对计算结果进行合并
1.thenCombine
- 用于将两个异步任务的结果组合在一起并执行某个操作。它会等待两个
CompletableFuture
都完成后,再将它们的结果一起传递给一个指定的函数进行处理。
public class CompletableFutureApi3Demo {
public static void main(String[] args) {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 启动");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 启动");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20;
});
CompletableFuture<Integer> finalResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
System.out.println("----------开始两个结果合并");
return x + y;
});
System.out.println(finalResult.join());
}
}
结果: