Day843.CompletableFuture -Java 并发编程实战

news2025/1/11 14:08:58

CompletableFuture

Hi,我是阿昌,今天学习记录的是关于CompletableFuture的内容。

前面不止一次提到,用多线程优化性能,其实不过就是将串行操作变成并行操作。

如果仔细观察,还会发现在串行转换成并行的过程中,一定会涉及到异步化,例如下面的示例代码,现在是串行的,为了提升性能,得把它们并行化,那具体实施起来该怎么做呢?

//以下两个方法都是耗时操作
doBizA();
doBizB();

还是挺简单的,就像下面代码中这样,创建两个子线程去执行就可以了。

会发现下面的并行方案,主线程无需等待 doBizA() 和 doBizB() 的执行结果,也就是说 doBizA() 和 doBizB() 两个操作已经被异步化了。

new Thread(()->doBizA())
  .start();
new Thread(()->doBizB())
  .start();  

异步化,是并行方案得以实施的基础,更深入地讲其实就是:

利用多线程优化性能这个核心方案得以实施的基础。看到这里,相信应该就能理解异步编程最近几年为什么会大火了,因为优化性能是互联网大厂的一个核心需求啊。

Java 在 1.8 版本提供了 CompletableFuture 来支持异步编程,CompletableFuture 有可能是见过的最复杂的工具类了,不过功能也着实让人感到震撼。


一、CompletableFuture 的核心优势

用 CompletableFuture 重新实现前面曾提及的烧水泡茶程序

首先还是需要先完成分工方案,在下面的程序中,分了 3 个任务:

  • 任务 1 负责洗水壶、烧开水
  • 任务 2 负责洗茶壶、洗茶杯和拿茶叶
  • 任务 3 负责泡茶。

其中任务 3 要等待任务 1 和任务 2 都完成后才能开始。

这个分工如下图所示。

在这里插入图片描述
下面是代码实现,先略过 runAsync()、supplyAsync()、thenCombine() 这些不太熟悉的方法,从大局上看,会发现:

  1. 无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
  2. 语义更清晰,例如 f3 = f1.thenCombine(f2, ()->{}) 能够清晰地表述“任务 3 要等待任务 1 和任务 2 都完成后才能开始”;
  3. 代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。
//任务1:洗水壶->烧开水
CompletableFuture<Void> f1 = 
  CompletableFuture.runAsync(()->{
  System.out.println("T1:洗水壶...");
  sleep(1, TimeUnit.SECONDS);

  System.out.println("T1:烧开水...");
  sleep(15, TimeUnit.SECONDS);
});
//任务2:洗茶壶->洗茶杯->拿茶叶
CompletableFuture<String> f2 = 
  CompletableFuture.supplyAsync(()->{
  System.out.println("T2:洗茶壶...");
  sleep(1, TimeUnit.SECONDS);

  System.out.println("T2:洗茶杯...");
  sleep(2, TimeUnit.SECONDS);

  System.out.println("T2:拿茶叶...");
  sleep(1, TimeUnit.SECONDS);
  return "龙井";
});
//任务3:任务1和任务2完成后执行:泡茶
CompletableFuture<String> f3 = 
  f1.thenCombine(f2, (__, tf)->{
    System.out.println("T1:拿到茶叶:" + tf);
    System.out.println("T1:泡茶...");
    return "上茶:" + tf;
  });
//等待任务3执行结果
System.out.println(f3.join());

void sleep(int t, TimeUnit u) {
  try {
    u.sleep(t);
  }catch(InterruptedException e){}
}
// 一次执行结果:
T1:洗水壶...
T2:洗茶壶...
T1:烧开水...
T2:洗茶杯...
T2:拿茶叶...
T1:拿到茶叶:龙井
T1:泡茶...
上茶:龙井

领略 CompletableFuture 异步编程的优势之后,下面详细介绍 CompletableFuture 的使用,首先是如何创建 CompletableFuture 对象。


二、创建 CompletableFuture 对象

创建 CompletableFuture 对象主要靠下面代码中展示的这 4 个静态方法,先看前两个。

在烧水泡茶的例子中,已经使用了runAsync(Runnable runnable)supplyAsync(Supplier supplier),它们之间的区别是:

Runnable 接口的 run() 方法没有返回值,而 Supplier 接口的 get() 方法是有返回值的。

前两个方法和后两个方法的区别在于:后两个方法可以指定线程池参数。

默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。

如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。

所以,强烈建议要根据不同的业务类型创建不同的线程池,以避免互相干扰。


//使用默认线程池
static CompletableFuture<Void> 
  runAsync(Runnable runnable)
static <U> CompletableFuture<U> 
  supplyAsync(Supplier<U> supplier)
//可以指定线程池  
static CompletableFuture<Void> 
  runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> 
  supplyAsync(Supplier<U> supplier, Executor executor)  

创建完 CompletableFuture 对象之后,会自动地异步执行 runnable.run() 方法或者 supplier.get() 方法,对于一个异步操作,需要关注两个问题:

  • 一个是异步操作什么时候结束
  • 另一个是如何获取异步操作的执行结果

因为 CompletableFuture 类实现了 Future 接口,所以这两个问题你都可以通过 Future 接口来解决。

另外,CompletableFuture 类还实现了 CompletionStage 接口,这个接口内容实在是太丰富了,在 1.8 版本里有 40 个方法,这些方法该如何理解呢?


三、如何理解 CompletionStage 接口

可以站在分工的角度类比一下工作流。

任务是有时序关系的,比如有

  • 串行关系
  • 并行关系
  • 汇聚关系等。

这样说可能有点抽象,这里还举前面烧水泡茶的例子,其中洗水壶和烧开水就是串行关系,洗水壶、烧开水和洗茶壶、洗茶杯这两组任务之间就是并行关系,而烧开水、拿茶叶和泡茶就是汇聚关系。
串行关系
并行关系

汇聚关系

CompletionStage 接口可以清晰地描述任务之间的这种时序关系,例如前面提到的 f3 = f1.thenCombine(f2, ()->{}) 描述的就是一种汇聚关系。

烧水泡茶程序中的汇聚关系是一种 AND 聚合关系,这里的 AND 指的是所有依赖的任务(烧开水和拿茶叶)都完成后才开始执行当前任务(泡茶)。

既然有 AND 聚合关系,那就一定还有 OR 聚合关系,所谓 OR 指的是依赖的任务只要有一个完成就可以执行当前任务。

在编程领域,还有一个绕不过去的山头,那就是异常处理,CompletionStage 接口也可以方便地描述异常处理。

CompletionStage 接口如何描述串行关系、AND 聚合关系、OR 聚合关系以及异常处理。

1、描述串行关系

CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。

thenApply 系列函数里参数 fn 的类型是接口 Function,这个接口里与 CompletionStage 相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值,所以 thenApply 系列方法返回的是CompletionStage。

而 thenAccept 系列方法里参数 consumer 的类型是接口Consumer,这个接口里与 CompletionStage 相关的方法是 void accept(T t),这个方法虽然支持参数,但却不支持回值,所以 thenAccept 系列方法返回的是CompletionStage。

thenRun 系列方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持返回值,所以 thenRun 系列方法返回的也是CompletionStage。

这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。

其中,需要你注意的是 thenCompose 系列方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的。


CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);

通过下面的示例代码,可以看一下 thenApply() 方法是如何使用的。

首先通过 supplyAsync() 启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。

不过,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。


CompletableFuture<String> f0 = 
  CompletableFuture.supplyAsync(
    () -> "Hello World")      //①
  .thenApply(s -> s + " QQ")  //②
  .thenApply(String::toUpperCase);//③

System.out.println(f0.join());
//输出结果
HELLO WORLD QQ

2、描述 AND 汇聚关系

CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。

它们的使用你可以参考上面烧水泡茶的实现程序,这里就不赘述了。


CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);

3、描述 OR 汇聚关系

CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。


CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);

下面的示例代码展示了如何使用 applyToEither() 方法来描述一个 OR 汇聚关系。


CompletableFuture<String> f1 = 
  CompletableFuture.supplyAsync(()->{
    int t = getRandom(5, 10);
    sleep(t, TimeUnit.SECONDS);
    return String.valueOf(t);
});

CompletableFuture<String> f2 = 
  CompletableFuture.supplyAsync(()->{
    int t = getRandom(5, 10);
    sleep(t, TimeUnit.SECONDS);
    return String.valueOf(t);
});

CompletableFuture<String> f3 = 
  f1.applyToEither(f2,s -> s);

System.out.println(f3.join());

4、异常处理

虽然上面提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常,例如下面的代码,执行 7/0 就会出现除零错误这个运行时异常。

非异步编程里面,可以使用 try{}catch{}来捕获并处理异常,那在异步编程里面,异常该如何处理呢?


CompletableFuture<Integer> 
  f0 = CompletableFuture.
    .supplyAsync(()->(7/0))
    .thenApply(r->r*10);
System.out.println(f0.join());

CompletionStage 接口给我们提供的方案非常简单,比 try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。


CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);

下面的示例代码展示了如何使用 exceptionally() 方法来处理异常,exceptionally() 的使用非常类似于 try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。

既然有 try{}catch{},那就一定还有 try{}finally{},whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。

whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。


CompletableFuture<Integer> 
  f0 = CompletableFuture
    .supplyAsync(()->(7/0))
    .thenApply(r->r*10)
    .exceptionally(e->0);
System.out.println(f0.join());

四、总结

曾经一提到异步编程,脑海里都会随之浮现回调函数,例如在 JavaScript 里面异步问题基本上都是靠回调函数来解决的,回调函数在处理异常以及复杂的异步任务关系时往往力不从心,对此业界还发明了个名词:回调地狱(Callback Hell)。

应该说在前些年,异步编程还是声名狼藉的。不过最近几年,伴随着ReactiveX的发展(Java 语言的实现版本是 RxJava),回调地狱已经被完美解决了,异步编程已经慢慢开始成熟,Java 语言也开始官方支持异步编程:
在 1.8 版本提供了 CompletableFuture,在 Java 9 版本则提供了更加完备的 Flow API,异步编程目前已经完全工业化。因此,学好异步编程还是很有必要的。

CompletableFuture 已经能够满足简单的异步编程需求,如果你对异步编程感兴趣,可以重点关注 RxJava 这个项目,利用 RxJava,即便在 Java 1.6 版本也能享受异步编程的乐趣。


创建采购订单的时候,需要校验一些规则,例如最大金额是和采购员级别相关的。
有利用 CompletableFuture 实现了这个校验的功能,逻辑很简单,首先是从数据库中把相关规则查出来,然后执行规则校验。
觉得实现是否有问题呢?

//采购订单
PurchersOrder po;
CompletableFuture<Boolean> cf = 
  CompletableFuture.supplyAsync(()->{
    //在数据库中查询规则
    return findRuleByJdbc();
  }).thenApply(r -> {
    //规则校验
    return check(po, r);
});
Boolean isOk = cf.join();

首先对于io操作,需要考虑新开线程池 然后对于方法的执行,考虑捕捉异常

1.没有进行异常处理,
2.要指定专门的线程池做数据库查询
3.如果检查和查询都比较耗时,那么应该像之前的对账系统一样,采用生产者和消费者模式,让上一次的检查和下一次的查询并行起来。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/127458.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

堆树和堆排序

一、堆树 1、定义 堆树的定义如下&#xff1a; &#xff08;1&#xff09;堆树是一颗完全二叉树。 &#xff08;2&#xff09;堆树的每一个结点值都大于等于或者小于等于其左右子结点的值。 &#xff08;3&#xff09;堆树中每个结点的子树都是堆树。为什么是大于等于或者小于…

一口气讲透Redis分布式缓存、秒杀 + 思维导图

一、分布式缓存 1、单点Redis的问题 1、数据丢失问题 Redis数据持久化。 2、并发能力问题 大家主从集群&#xff0c;实现读写分离。 3、故障恢复问题 利用Redis哨兵&#xff0c;实现健康检测和自动恢复。 4、存储能力问题 搭建分片集群&#xff0c;利用插槽机制实现动…

【Linux 常用监控指标总结】

1. Linux运维基础采集项 做运维&#xff0c;不怕出问题&#xff0c;怕的是出了问题&#xff0c;抓不到现场&#xff0c;两眼摸黑。所以&#xff0c;依靠强大的监控系统&#xff0c;收集尽可能多的指标&#xff0c;意义重大。但哪些指标才是有意义的呢&#xff0c;本着从实践中…

【JavaScript】定时器详解

文章目录【JavaScript】定时器详解一. 定时器分类二. 定时器的使用三. 案例&#xff1a;实现抽奖效果样式一样式二【JavaScript】定时器详解 一. 定时器分类 延迟定时器 setTimeout(function(){}, 毫秒数)作用&#xff1a;隔一段时间之后执行 间隔定时器 setInterval(functio…

【UE4 第一人称射击游戏】18-添加角色换弹时的动作

上一篇&#xff1a; 【UE4 第一人称射击游戏】17-重写换弹逻辑 本篇效果&#xff1a; 步骤&#xff1a; 1.打开“SWAT_AnimBP”&#xff0c;在动画图表中&#xff0c;添加一个名为“Reloading”的状态 完善过渡规则的连线 双击打开“Reloading”&#xff0c;添加换弹动画 2…

Spring Security 401 问题解决

背景 &#xff1a; 微服务接口调用的时候报错&#xff0c;原来有一个rest服务用的不多&#xff0c;平时用的都是一些基础的服务&#xff0c;然后客户需要我们开放一个外部接口给他们&#xff0c;然后我寻思着就在这里面写接口。然后调用的时候就报如下的错。 后面跟踪源码发现 …

window上完全卸载oracle

Window上彻底卸载oracle 关闭oracle服务-----开始-----&#xff1e;设置-----&#xff1e;控制面板-----&#xff1e;管理工具-----&#xff1e;服务----->停止所有Oracle服务。 卸载软件------开始------所有程序------Oracle - OraDb11g_home1------Oracle 安装产品-----…

传统CV算法——边缘检测

文章目录传统CV算法-边缘检测第一章 概述1. **边缘检测概述**1.1 **认识边缘**1.2 **边缘检测的概念**1.3 **边缘检测的基本方法**1.4 **边缘检测算子的概念**1.5 **常见的边缘检测算子**2. **用梯度算子实现边缘检测的原理**2.1 **认识梯度算子**2.2 **梯度的衡量**2.3 **使用…

3D打印机的调平问题

快打完第五批料了&#xff0c;也算是有一些仅限于PLA以及PLA&#xff0b;耗材心得 3D打印机调平的简易方式有哪些&#xff1f; 在3D打印机中&#xff0c;打印平台作为模型的承载平台&#xff0c;如果有偏差&#xff0c;那么在后期的打印中&#xff0c;必然会导致细节的出现差…

[从零开始]用python制作识图翻译器·一

AlsoEasy-RecognitionTranslator前言需求分析应用场景需求提取需求补充竞品分析QQ识图百度翻译UU翻译器小结功能实现前言 这是我的毕设作品。当时玩了《人狼村之谜》觉得很惊艳&#xff0c;所以想玩一下该社的别的作品&#xff1a;《D.M.L.C.》。但是苦于没有熟肉&#xff0c;自…

vue2中使用VantUI

Vant 是一个轻量、可靠的移动端组件库 目前 Vant 官方提供了 Vue 2 版本、Vue 3 版本和微信小程序版本&#xff0c;并由社区团队维护 React 版本和支付宝小程序版本。 Vant 2 - Mobile UI Components built on Vue 第一步&#xff1a;安装该组件库 在现有项目中使用 Vant 时&a…

Azure 深入浅出[3]: 如何在MS Visio里面画专业的Azure技术架构图?

1.前言 笔者最近在研究Azure&#xff0c;需要画Azure的技术架构图&#xff1b;画Azure架构图的方法很多&#xff0c;有在线的工具&#xff0c;有基于Azure的PPT模板&#xff0c;同时也有笔者这样选择用最传统的微软的工具&#xff1a;MS Visio来绘图。那么在MS Visio里面如何绘…

设计模式六大原则

设计模式六大原则 1.单⼀职责( ⼀个类和⽅法只做⼀件事 ) 不遵守单一职责原则 模拟不同用户观看视频&#xff0c;先一把梭哈&#xff0c;所有用户观看视频的服务全部都写道一块 public class ApiTest {public static void main(String[] args) {VideoUserService service …

干货丨FPGA零基础学习,入门必看!

看到不少同学后台进行提问&#xff1a;FPGA如何入门&#xff1f;怎么学习&#xff1f;其实对于新人来说&#xff0c;FPGA的学习需要了解的东西还是非常多&#xff0c;下面IC修真院就带大家一起来了解一下吧。 FPGA简介 FPGA普遍用于实现数字电路模块&#xff0c;用户可对FPGA…

Codeforces Round #666 (Div. 1) A. Multiples of Length

Problem - A - Codeforces翻译&#xff1a; 您将得到一个由&#x1d45b;个整数组成的数组&#x1d44e;。 你想要让&#x1d44e;的所有元素都等于零&#xff0c;只需执行以下操作三次: 选择一个段&#xff0c;对于这个段中的每一个数字&#xff0c;我们可以给它加上&#…

后疫情时代语音机器人和大数据的发展前景

语音机器人可以通过自动化流程和提供快速、准确的信息来帮助企业降低成本、增加效率。具体来说&#xff0c;企业可以使用语音机器人来完成以下任务&#xff1a; 客户服务&#xff1a;语音机器人可以自动处理客户查询和请求&#xff0c;从而节省人力成本。数据录入&#xff1a;…

Mask R-CNN论文讲解

目录&#xff1a;Mask R-CNN论文理解一、摘要二、介绍三、Mask R-CNN四、RoIAlign五、Network Architecture六、训练一、摘要 论文提出了一个概念上简单、灵活和通用的对象实例分割框架。有效地检测图像中的对象&#xff0c;同时为每个实例生成高质量的分割掩码。 方法被称为…

Python量化交易06——Fama-French三因子模型(Rmt,SMB,HML)

参考书目:深入浅出Python量化交易实战 本次带来的是著名的获得了诺贝尔奖的三因子模型。 因子模型介绍 Fama和French从可以解释股票收益率的众多因素中提取出了三个重要的影响因子&#xff0c;即市场风险溢酬因子、市值因子和账面市值比因子B/M Ratio&#xff0c;仿照CAPM模型…

测试用例能带来什么

通过测试用例&#xff0c;我们都能获得些什么呢? 1、测试团队的质量判断。例如&#xff0c;测试用例的覆盖率。我们只需要去把所有的valid的功能bug去做一个分析&#xff0c;用所有在测试用例覆盖范围之外的bug数/总bug数&#xff0c;就可以作为测试用例覆盖率使用。一个良好的…

【css】深入解析CSS (4)网格布局

设置为display: grid的元素成为一个网格容器&#xff08;grid container&#xff09;。它的子元素则变成网格元素&#xff08;grid items&#xff09;。 1.网格的组成部分&#xff1a; grid-template-columns和grid-template-rows定义了网格轨道 grid-template-columns:1fr 1f…