文章目录
- 前言
- 1. 并发(Concurrent) 和 并行(Parallel)
- 1.1 并发的来源
- 1.2 并发技术解决了什么问题
- 2. 并行的来源
- 2.1 并行解决了什么问题
- 3. CompletableFuture 简介
- 4. CompletableFuture 简单应用
- 5. CompletableFuture 工厂方法的应用
- 6. CompletableFuture join() 方法
- 7. 使用 ParallelStream 还是 CompletableFuture
- 8. 使用 CompletableFuture 编排异步任务
- 9. CompletableFuture 响应 completion 事件
- 10. CompletableFuture 异常处理
前言
看 《Java8 实战》后,觉得自己对多线程应用还是停留在 JUC 工具类的使用上,忽略了 CompletableFuture 这么强大的工具。本文主要内容
- 复习并行、并发的概念。
- 多线程的编程模型
- CompletableFuture 让多线程编程更加清爽
- 有时间的话,补充 CompletableFuture 的内部原理
1. 并发(Concurrent) 和 并行(Parallel)
Concurrent 和 Parallel 作为形容词,并列到一起。对应Java 的类名/方法名 也有所体现:
- ConcurrentHashMap
- parallelStream()
1.1 并发的来源
在单核CPU的时代,根本不可能真正同时运行一个以上的线程(进程是线程的容器,Linux是把时间片分给线程)。
假设有网易音乐、Chrome浏览器这两个应用需要同时运行,操作系统会 轮流 把这两个应用的任务放到同一个线程上执行。
宏观上看,CPU把时间片分给了不同应用,不同应用持有单个线程某一段时间的运行权力。这就是并发技术。
1.2 并发技术解决了什么问题
在 web 技术中,同一时刻请求的接收能力提高了,具体的:
如果有耗时较长的数据库查询、外部资源请求,一个线程不具有并发能力则耗时操作会一直阻塞后面的请求。
2. 并行的来源
多核CPU的出现
2.1 并行解决了什么问题
除了压榨硬件资源从而提高响应速度外,还尽可能减少任务之间的并发度。因为一个CPU核心管一个任务的情况下,任务之间是隔离的,也就是线程安全的。
3. CompletableFuture 简介
这个类是 Java 8 引入的,用于解决 Futrue
异步编程的局限性:
- Futrue 任务之间的依赖关系很难表达
- 等待Futrue集合中的所有任务都完成
- 应对Future的完成事件
“可以说 CompletableFuture 和 Future 的关系就跟 Stream 和 Collections的关系一样”
4. CompletableFuture 简单应用
- 定义一个异步任务
public Future<Double> getPriceAsync(String product) {
// 用于接收异步任务的响应
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
// 异步任务
new Thread( () -> {
try {
double price = calculatePrice(product);
// 异步任务完成后通知(带上返回值)
futurePrice.complete(price);
} catch (Exception ex) {
// 异步任务有异常,也会通知调用方
futurePrice.completeExceptionally(ex);
}
}
).start();
return futurePrice;
}
- 调用异步任务
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
doSomething();
try {
double pricie = futurePrice.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
5. CompletableFuture 工厂方法的应用
- getPriceAsync 可以用已有的api改写为:
// 同样获得了异步处理、异常处理的能力
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
6. CompletableFuture join() 方法
- 书中用了两个Stream,因为Stream有延时特性,写在一起的话第一个任务提交后,会被立即join();
- 立即join的副作用就是,主线程会阻塞等待第一个任务完成后才继续后面操作
- 进而所有线程都变成了顺序执行
- 所以需要拆成两个Stream
// 获取并行运算的任务列表
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product))
.collect(toList()); // import 了 Collectors.toList()
// 汇总并行运算的计算结果
List<String> result = priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
7. 使用 ParallelStream 还是 CompletableFuture
- 计算密集型使用 parallelStream() , 其默认的最大并行数就是 CPU核心数,不用额外维护其他参数
- IO密集或者等待时间不稳定的,使用 CompletableFuture
8. 使用 CompletableFuture 编排异步任务
- 有依赖关系
List<CompletableFuture<String>> priceFutures =
shops.stream()
// 获取价格 (异步)
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product))
// 解析报价
.map(future -> future.thenApply(Quota::parse))
// 为计算折扣价构造 future (异步) 【该异步任务需要等待报告被解析出来】
.map(future -> future.thenCompose(quota ->
CompletableFuture.supplyAsync(() ->
Discount.applyDiscount(quote),
executor
)
)
.collect(toList());
getPrice 和 applyDiscount 都是非阻塞调用,会比阻塞调用快一点
- 无依赖关系
Future<Double> futurePriceInUSD =
shops.stream()
// 获取价格 (异步)
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product))
// 获取汇率
.thenCombine(
CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)),
// 两个异步任务整合, 哪个值先获取到无所谓
(price, rate) -> price * rate
);
9. CompletableFuture 响应 completion 事件
CompletableFuture[] futures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product))
.map(future -> future.thenApply(Quota::parse))
.map(future -> future.thenCompose(quota ->
CompletableFuture.supplyAsync(() ->
Discount.applyDiscount(quote),
executor
)
)
// 【定义事件完成后做什么事】
.map(f -> thenAccept(System.out.println))
.toArray(size -> new CompletableFuture[size]);
// 等待所有子线程执行完成
CompletableFuture.allOf(futures).join();
10. CompletableFuture 异常处理
- 引用最早的一个代码
public Future<Double> getPriceAsync(String product) {
// 用于接收异步任务的响应
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
// 异步任务
new Thread( () -> {
try {
double price = calculatePrice(product);
// 异步任务完成后通知(带上返回值)
futurePrice.complete(price);
} catch (Exception ex) {
// 异步任务有异常,也会通知调用方
futurePrice.completeExceptionally(ex);
}
}
).start();
return futurePrice;
}
如果 calculatePrice 抛出异常,即 futurePrice.completeExceptionally(ex) 后,futurePrice 的调用端也会抛出运行时异常。这个异常处理也会封装在 CompletableFuture.supplyAsync(() -> calculatePrice(product)); 的api中
- exceptionally
参考这篇文章