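Future interface: theory
The Future interface (implemented by FutureTask) defines methods for working with an asynchronous task: retrieving its result, cancelling it, checking whether it was cancelled, and checking whether it has finished.
Method diagram:
Class diagram:
Code example: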
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(new MyThread2());
Thread t1 = new Thread(futureTask, "t1");
t1.start();
System.out.println(futureTask.get());
}
}
// Runnable: runs on another thread, no return value
class MyThread implements Runnable{
@Override
public void run() {
}
}
// Callable: has a return value and may throw a checked exception
class MyThread2 implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("----- come in call()");
return "hello Callable";
}
}
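The example above only exercises get(). The other operations listed for the Future interface (cancel, isCancelled, isDone) can be sketched as follows. The class and method names here are hypothetical and are not part of the original notes.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, used only to illustrate cancel / isCancelled / isDone.
class FutureCancelSketch {

    // Starts a slow task, cancels it, and reports what the Future says afterwards.
    static String cancelSlowTask() {
        FutureTask<String> task = new FutureTask<>(() -> {
            TimeUnit.SECONDS.sleep(10); // simulated slow work, interrupted by cancel(true)
            return "finished";
        });
        Thread worker = new Thread(task, "worker");
        worker.start();

        boolean cancelled = task.cancel(true); // true: interrupt the thread running the task

        String outcome;
        try {
            worker.join();
            outcome = task.get(); // a cancelled task throws instead of returning a value
        } catch (CancellationException e) {
            outcome = "CancellationException";
        } catch (InterruptedException | ExecutionException e) {
            outcome = "unexpected: " + e;
        }
        return cancelled + "," + task.isCancelled() + "," + task.isDone() + "," + outcome;
    }

    public static void main(String[] args) {
        System.out.println(cancelSlowTask());
    }
}
```

Note that a cancelled task still counts as done: isDone() reports completion of any kind, whether normal, exceptional, or cancelled.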
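Advantages:
Combining Future with a thread pool lets independent tasks run concurrently, which can noticeably reduce total execution time.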
import java.util.concurrent.*;
public class FutureThreadPoolDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
long startTime = System.currentTimeMillis();
FutureTask<String> futureTask1 = new FutureTask<String>(()-> {
try{
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task1 over";
});
threadPool.submit(futureTask1);
FutureTask<String> futureTask2 = new FutureTask<String>(()-> {
try{
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task2 over";
});
threadPool.submit(futureTask2);
try{
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(futureTask1.get());
System.out.println(futureTask2.get());
// Take the end timestamp only after both results are available, so the wait is included
long endTime = System.currentTimeMillis();
System.out.println("---costTime: " + (endTime - startTime) + " ms");
System.out.println(Thread.currentThread().getName() + "\t ----end");
threadPool.shutdown();
}
public static void m1(String[] args) {
long startTime = System.currentTimeMillis();
try{
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
try{
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
try{
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println("---costTime: " + (endTime - startTime) + " ms");
System.out.println(Thread.currentThread().getName() + "\t ----end");
}
}
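Drawbacks:
- get(): blocks the calling thread until the result is ready, so it is usually best called as late as possible in the program
- isDone() polling: polling is the usual way to fetch the result without blocking on get(), but it burns CPU cycles on useless checks and still may not pick up the result promptly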
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
public class FutureApiDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask1 = new FutureTask<String>(()-> {
try{
System.out.println(Thread.currentThread().getName() + "\t ---- come in");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task1 over";
});
Thread t1 = new Thread(futureTask1, "t1");
t1.start();
// System.out.println(futureTask1.get()); // get() does not return until the task has finished; it blocks easily, so it is usually placed at the end of the program
// System.out.println(Thread.currentThread().getName() + "\t -- busy with other work");
while(true) {
if (futureTask1.isDone()) {
System.out.println(futureTask1.get());
break;
}else {
try{
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Still processing...");
}
}
}
}
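A middle ground between blocking indefinitely on get() and polling isDone() is the timed overload get(timeout, unit), which is part of the Future interface. The sketch below is a minimal illustration; the class name, method name, and the chosen durations are assumptions made for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: bounds the wait instead of blocking forever.
class FutureTimeoutSketch {

    // Waits at most timeoutMillis for a task that needs taskMillis to finish.
    static String waitAtMost(long taskMillis, long timeoutMillis) {
        FutureTask<String> task = new FutureTask<>(() -> {
            TimeUnit.MILLISECONDS.sleep(taskMillis);
            return "task over";
        });
        Thread worker = new Thread(task, "t1");
        worker.setDaemon(true); // an abandoned task must not keep the JVM alive
        worker.start();
        try {
            return task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            task.cancel(true); // give up on the task rather than wait any longer
            return "timed out";
        } catch (InterruptedException | ExecutionException e) {
            return "failed: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(waitAtMost(50, 5000));  // task finishes well within the limit
        System.out.println(waitAtMost(5000, 100)); // limit expires first
    }
}
```

The caller still blocks, but only for a bounded time, and no CPU is spent spinning in a loop.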
CompletableFuture
CompletableFuture provides a mechanism similar to the observer pattern: when a task completes, it can notify the listening party.
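Class diagram:
CompletionStage:
- A CompletionStage represents one stage of an asynchronous computation; when a stage completes, it may trigger another stage
- A stage may be triggered by the completion of a single stage, or by several stages together
CompletableFuture:
- Introduced in Java 8, CompletableFuture is a powerful extension of Future. It simplifies asynchronous programming, supports a functional style, lets results be processed through callbacks, and offers methods for transforming and combining CompletableFuture instances
- It may represent an explicitly completed Future, or it may represent a completion stage (CompletionStage); it supports triggering functions or actions once the computation completes
- It implements both the Future and CompletionStage interfaces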
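Four commonly used static methods:
- Overloads without an Executor parameter run the asynchronous code on the default pool, ForkJoinPool.commonPool()
- Overloads with an Executor parameter run the asynchronous code on the thread pool you pass in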
Type | Method |
runAsync (no return value) | public static CompletableFuture<Void> runAsync(Runnable runnable) |
runAsync (no return value) | public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) |
supplyAsync (returns a value) | public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) |
supplyAsync (returns a value) | public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) |
Example code:
import java.util.concurrent.*;
public class CompletableFutureBuildDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
// CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()-> {
// System.out.println(Thread.currentThread().getName());
// try{
// TimeUnit.SECONDS.sleep(1);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }, threadPool);
// System.out.println(completableFuture.get());
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()-> {
System.out.println(Thread.currentThread().getName());
try{
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello supplyAsync";
}, threadPool);
System.out.println(completableFuture.get());
threadPool.shutdown();
}
}
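To see the difference between the overloads with and without an Executor, one can print which thread actually runs the task. The sketch below uses hypothetical class and method names. On a typical multi-core machine the default case reports a daemon worker of ForkJoinPool.commonPool(), but the exact thread name depends on the JVM and the available parallelism, so no output is shown for it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: reports which thread runs a supplyAsync task.
class DefaultExecutorSketch {

    // Describes the thread that executes this method.
    static String describeCurrentThread() {
        Thread t = Thread.currentThread();
        return t.getName() + " (daemon=" + t.isDaemon() + ")";
    }

    // No Executor argument: the task runs on the default asynchronous executor.
    static String runOnDefault() {
        return CompletableFuture
                .supplyAsync(DefaultExecutorSketch::describeCurrentThread)
                .join();
    }

    // Explicit Executor: the task runs on a thread owned by that pool.
    static String runOnPool() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            return CompletableFuture
                    .supplyAsync(DefaultExecutorSketch::describeCurrentThread, pool)
                    .join();
        } finally {
            pool.shutdown(); // pool threads are non-daemon, so release them explicitly
        }
    }

    public static void main(String[] args) {
        System.out.println("default: " + runOnDefault());
        System.out.println("custom:  " + runOnPool());
    }
}
```

The daemon flag matters in practice: a program that relies on daemon worker threads can exit before the task finishes unless the main thread waits for the result.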
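Reducing blocking and polling:
A callback can be registered on the future; when the asynchronous task completes or throws, the callback is invoked automatically.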
import java.util.concurrent.*;
public class CompletableFutureUseDemo {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "----come in");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(result > 5) {
int i = 10/0;
}
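System.out.println("---- result after 1 second: " + result);
return result;
}, threadPool).whenComplete((v, e) -> {
// e == null means the task completed normally
if(e == null) {
System.out.println("---- computation finished, update system value: " + v);
}
}).exceptionally(e-> {
e.printStackTrace();
System.out.println("Exception: " + e.getCause() + "\t" + e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName() + " goes off to do other work first");
// Shut the pool down so its non-daemon threads do not keep the JVM alive
threadPool.shutdown();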
}
public static void future2(String[] args) {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "----come in");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("---- result after 1 second: " + result);
return result;
}).whenComplete((v, e) -> {
// e == null means the task completed normally
if(e == null) {
System.out.println("---- computation finished, update system value: " + v);
}
}).exceptionally(e-> {
e.printStackTrace();
System.out.println("Exception: " + e.getCause() + "\t" + e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName() + " goes off to do other work first");
// The default ForkJoinPool uses daemon threads, so keep main alive until the task has finished
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void future1() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "----come in");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("---- result after 1 second: " + result);
return result;
});
System.out.println(Thread.currentThread().getName() + " goes off to do other work first");
System.out.println(completableFuture.get());
}
}
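Advantages of CompletableFuture:
- When an asynchronous task finishes, a callback method is invoked automatically
- Once the main thread has registered its callbacks, it no longer needs to watch the task, and asynchronous tasks can be chained to run one after another
- When an asynchronous task fails, a callback method is invoked automatically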
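The three points above can be sketched with a deterministic task whose outcome is chosen by a flag instead of a random number. The class name, the method name, and the fallback value -1 are assumptions made for this illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class: shows the success and failure callbacks side by side.
class CallbackSketch {

    // Runs a task that either succeeds or fails, with both callbacks attached.
    static int compute(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("simulated failure");
            }
            return 42;
        }).whenComplete((value, error) -> {
            // Invoked in both cases; error is null when the task succeeded.
            if (error == null) {
                System.out.println("completed normally with " + value);
            } else {
                System.out.println("failed with " + error);
            }
        }).exceptionally(error -> {
            // Invoked only on failure; supplies a replacement result.
            return -1;
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(compute(false)); // 42
        System.out.println(compute(true));  // -1
    }
}
```

The caller never inspects the task while it runs; it only declares what should happen on success and on failure.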
Use case:
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class CompletableFutureMallDemo {
static List<NetMall> list = Arrays.asList(
new NetMall("jd")
, new NetMall("dangdang")
, new NetMall("taobao"));
public static List<String> getPrice(List<NetMall> list, String productName) {
return list.stream()
.map(it-> String.format("%s in %s price is %.2f", productName, it.getNetMallName(), it.calcPrice(productName)))
.collect(Collectors.toList());
}
public static List<String> getPriceByCompletableFuture(List<NetMall> list, String productName) {
return list.stream()
.map(it-> CompletableFuture.supplyAsync(()-> String.format("%s in %s price is %.2f", productName, it.getNetMallName(), it.calcPrice(productName))))
.collect(Collectors.toList())
.stream()
.map(s -> s.join()).collect(Collectors.toList());
}
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
List<String> list1 = getPrice(list, "mysql");
for (String element : list1) {
System.out.println(element);
}
long endTime = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime - startTime) + " ms");
long startTime2 = System.currentTimeMillis();
List<String> list2 = getPriceByCompletableFuture(list, "mysql");
for (String element : list2) {
System.out.println(element);
}
long endTime2 = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime2 - startTime2) + " ms");
}
}
class NetMall{
private String netMallName;
public String getNetMallName() {
return netMallName;
}
public NetMall(String netMallName) {
this.netMallName = netMallName;
}
public double calcPrice(String productName) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
}
}
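In getPriceByCompletableFuture, the intermediate collect(...) is what makes the lookups concurrent: Java streams are lazy, so mapping to a future and joining it in the same pipeline would wait for each future before the next one is even created. The sketch below makes the two phases explicit and uses CompletableFuture.allOf as an alternative way to wait for everything. The fetchPrice helper and the class name are hypothetical stand-ins, not part of the original demo.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical demo class: start all lookups first, then wait for all of them.
class AllOfSketch {

    // Hypothetical stand-in for a slow remote price lookup.
    static String fetchPrice(String mall, String product) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return product + " in " + mall;
    }

    static List<String> fetchAll(List<String> malls, String product) {
        // Phase 1: start every lookup; nothing is joined yet, so they overlap in time.
        List<CompletableFuture<String>> futures = malls.stream()
                .map(mall -> CompletableFuture.supplyAsync(() -> fetchPrice(mall, product)))
                .collect(Collectors.toList());

        // Phase 2: wait until all of them are done, then read the results in list order.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        fetchAll(Arrays.asList("jd", "dangdang", "taobao"), "mysql")
                .forEach(System.out::println);
    }
}
```

After allOf has completed, the individual join() calls return immediately, so the result order follows the input list rather than the completion order.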
Common methods
Obtaining the result and triggering completion:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class CompletableFutureAPIDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()-> {
System.out.println(Thread.currentThread().getName());
try{
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello supplyAsync";
});
// System.out.println(completableFuture.get());
// System.out.println(completableFuture.get(2, TimeUnit.SECONDS));
// System.out.println(completableFuture.join());
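// getNow: returns the result if the task has already completed, otherwise the given default value
// System.out.println(completableFuture.getNow("xxx"));
// complete: if the task has not finished yet, completes it with the given value and returns true, so join() then returns that value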
System.out.println(completableFuture.complete("completeValue") + "\t" + completableFuture.join());
}
}
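The demo above depends on timing, because the background task may or may not have finished when each method is called. A deterministic way to observe getNow, the timed get, and complete is to use a future that nothing completes until we do so by hand. This is a minimal sketch with a hypothetical class name and arbitrary sample values.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: walks through the result-retrieval methods in a fixed order.
class ResultApiSketch {

    static String walkThrough() {
        // A future that nobody completes yet, so every step below is deterministic.
        CompletableFuture<String> future = new CompletableFuture<>();
        StringBuilder log = new StringBuilder();

        // getNow never blocks: it returns the fallback while the future is incomplete.
        log.append(future.getNow("fallback")).append(';');

        // The timed get gives up with TimeoutException instead of blocking forever.
        try {
            future.get(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            log.append("timeout;");
        } catch (InterruptedException | ExecutionException e) {
            log.append("unexpected;");
        }

        // complete returns true only for the call that actually completes the future.
        log.append(future.complete("manual")).append(';');
        log.append(future.complete("ignored")).append(';');

        // join and getNow now both see the value set by the first complete call.
        log.append(future.join()).append(';').append(future.getNow("fallback"));
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(walkThrough()); // fallback;timeout;true;false;manual;manual
    }
}
```

Unlike get(), join() declares no checked exceptions; a failure surfaces as an unchecked CompletionException.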
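Processing the computed result:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class CompletableFutureAPI2Demo {
public static void main(String[] args) {
// Run B after A; B does not need A's result
System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenRun(()-> {}).join());
// Run B after A; B needs A's result but returns nothing
System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenAccept(r-> System.out.println(r)).join());
// Run B after A; B needs A's result and returns a value of its own
System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenApply(r-> r+ "resultB").join());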
}
// thenAccept: consumes the result and returns nothing
public static void thenAccept(String[] args) {
CompletableFuture.supplyAsync(()-> {
return 1;
}).thenApply(f-> {
return f + 2;
}).thenApply(f-> {
return f + 3;
}).thenAccept((v)->{
System.out.println("Consumed: " + v);
});
}
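// handle: receives both the result and the exception, so later stages still run after a failure (f is null when the previous stage failed)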
public static void handle() {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
CompletableFuture.supplyAsync(()-> {
System.out.println(Thread.currentThread().getName());
int i = 10/0;
try{
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(1111);
return 1;
},threadPool).handle((f,e)-> {
System.out.println(2222);
return f + 2;
}).handle((f,e)-> {
System.out.println(3333);
return f + 3;
}).whenComplete((v, e)->{
if (e == null) {
System.out.println("---- result: " + v);
}
}).exceptionally(e-> {
e.printStackTrace();
System.out.println(e.getMessage());
return null;
});
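System.out.println(Thread.currentThread().getName() + "---- main thread goes off to do other work first");
// Shut the pool down so its non-daemon threads do not keep the JVM alive
threadPool.shutdown();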
}
// thenApply: if a stage throws, the following thenApply stages are skipped
public static void thenApply() {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
CompletableFuture.supplyAsync(()-> {
System.out.println(Thread.currentThread().getName());
try{
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(1111);
return 1;
},threadPool).thenApply(f-> {
System.out.println(2222);
return f + 2;
}).thenApply(f-> {
System.out.println(3333);
return f + 3;
}).whenComplete((v, e)->{
if (e == null) {
System.out.println("---- result: " + v);
}
}).exceptionally(e-> {
e.printStackTrace();
System.out.println(e.getMessage());
return null;
});
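System.out.println(Thread.currentThread().getName() + "---- main thread goes off to do other work first");
// Shut the pool down so its non-daemon threads do not keep the JVM alive
threadPool.shutdown();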
}
}
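The difference between thenApply and handle on failure can be checked without sleeps or console output. In this sketch the first stage always fails with a division by zero; a flag records whether the thenApply function ran, and handle turns the failure into a fallback value. The class name, helper names, and the fallback -1 are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: contrasts thenApply and handle when the first stage fails.
class ThenApplyVsHandleSketch {

    static int divide(int a, int b) {
        return a / b; // throws ArithmeticException when b == 0
    }

    // thenApply: the function is skipped when the previous stage failed.
    static boolean thenApplyRan() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<Integer> future = CompletableFuture
                .supplyAsync(() -> divide(10, 0))
                .thenApply(v -> {
                    ran.set(true);
                    return v + 2;
                });
        try {
            future.join();
        } catch (CompletionException e) {
            // Expected: join rethrows the failure wrapped in CompletionException.
        }
        return ran.get();
    }

    // handle: the function always runs and can replace the failure with a value.
    static int handleRecovers() {
        return CompletableFuture
                .supplyAsync(() -> divide(10, 0))
                .handle((v, e) -> e == null ? v + 2 : -1)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(thenApplyRan());   // false
        System.out.println(handleRecovers()); // -1
    }
}
```

Checking the exception argument before touching the value avoids a NullPointerException inside handle, since the value is null whenever the previous stage failed.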
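CompletableFuture and thread pools:
- If no custom thread pool is passed in, the default pool ForkJoinPool.commonPool() is used
- If a custom thread pool is passed in for the first task, a second task attached with thenRun shares that pool with the first task, whereas a second task attached with thenRunAsync (without an Executor argument) runs on the default ForkJoinPool while the first task still uses the pool you passed in
- If the previous stage has already completed by the time the next stage is attached, a non-async stage may simply run on the calling thread, for example main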
import java.util.concurrent.*;
public class CompletableFutureWithThreadPoolDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Task 1\t" + Thread.currentThread().getName());
try{ TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }
return "hello";
}, threadPool).thenRun(() -> {
try{ TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("Task 2\t" + Thread.currentThread().getName());
}).thenRunAsync(() -> {
try{ TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("Task 3\t" + Thread.currentThread().getName());
}).thenRun(() -> {
try{ TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("Task 4\t" + Thread.currentThread().getName());
});
System.out.println(completableFuture.get(2, TimeUnit.SECONDS));
// Shut the pool down so its non-daemon threads do not keep the JVM alive
threadPool.shutdown();
}
}
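If a later stage should stay on the custom pool rather than fall back to the default one, the Async variants also accept an Executor as a second argument. The sketch below records the thread name of each stage; the thread-naming scheme "custom-pool-N" and the class name are hypothetical choices made so that the pool's threads are easy to recognize.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: keeps an async stage on the caller-supplied pool.
class AsyncStageExecutorSketch {

    // Returns the names of the threads that ran stage 1 and stage 2.
    static List<String> threadsUsed() {
        AtomicInteger counter = new AtomicInteger();
        // Hypothetical naming scheme so that pool threads are easy to spot.
        ExecutorService pool = Executors.newFixedThreadPool(2,
                r -> new Thread(r, "custom-pool-" + counter.incrementAndGet()));
        List<String> names = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture.supplyAsync(() -> {
                names.add(Thread.currentThread().getName());
                return "hello";
            }, pool).thenRunAsync(() -> {
                // The Executor argument pins this stage to the same custom pool.
                names.add(Thread.currentThread().getName());
            }, pool).join();
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(threadsUsed());
    }
}
```

Both recorded names belong to the custom pool, although the two stages are not guaranteed to run on the same thread within it.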
Picking the faster of two results:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class CompletableFutureFastDemo {
public static void main(String[] args) {
CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
System.out.println("A come in");
try{ TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
return "playA";
});
CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
System.out.println("B come in");
try{ TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
return "playB";
});
CompletableFuture<String> result = playA.applyToEither(playB, f -> {
return f + " is winner";
});
System.out.println(Thread.currentThread().getName() + "\t----" + result.join());
}
}
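The behavior of applyToEither can also be observed without sleeps, by pairing an already completed future with one that is never completed. The result then comes from the completed side regardless of which future the method is called on. The class and method names below are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class: applyToEither with one finished and one pending future.
class EitherSketch {

    // Applies the function to whichever of the two futures completes first.
    static String firstOf(CompletableFuture<String> a, CompletableFuture<String> b) {
        return a.applyToEither(b, winner -> winner + " is winner").join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> done = CompletableFuture.completedFuture("playA");
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here

        System.out.println(firstOf(done, pending)); // playA is winner
        System.out.println(firstOf(pending, done)); // playA is winner
    }
}
```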
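Combining two results:
- Once both CompletionStage tasks have completed, their results are handed together to the thenCombine function
- Whichever task finishes first waits for the other branch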
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class CompletableFutureCombineDemo {
public static void main(String[] args) {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\n" + "started");
try{ TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
return 10;
});
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\n" + "started");
try{ TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
return 20;
});
CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
System.out.println("------ merging the two results");
return x + y;
});
System.out.println(Thread.currentThread().getName() + "\t----" + result.join());
}
}