深度解析CompletableFuture:Java 异步世界的奇迹

news2025/1/23 9:27:01

目录

概述

介绍

上文我们可知:CompletableFuture 是 Java 8 引入用于支持异步编程和非阻塞操作的类。对于没有使用过CompletableFuture通过它这么长的名字就感觉到一头雾水,那么现在我们来一起解读一下它的名字。

  • Completable:可完成
  • Future:未来/将来

这两个单词体现了它设计的目的:提供一种可完成的异步计算。

身世

接下来我将详细介绍CompletableFuture的实现。

Future接口

CompletableFuture实现自JDK 5出现的Future接口,该接口属于java.util.concurrent包,这个包提供了用于并发编程的一些基础设施,其中就包括 Future 接口。Future接口的目的是表示异步计算的结果,它允许你提交一个任务给一个 Executor(执行器),并在稍后获取任务的结果。尽管 Future 提供了一种机制来检查任务是否完成、等待任务完成,并获取其结果,但它的设计也有一些局限性,比如无法取消任务、无法组合多个任务的结果等。

Future接口为CompletableFuture提供了以下功能:

  1. 异步任务的提交:通过Future的接口,可以提交异步任务,并在稍后获取任务的结果,这是 Future 接口最基本的功能之一。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
  1. 检查任务完成状态: 使用 isDone 方法可以检查任务是否已经完成。
boolean isDone = future.isDone();
  1. 等待任 务完成: 通过get方法,阻塞当前线程,直到异步任务完成并获取其结果。
System.out.println("main Thread");
//开启异步线程
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
//阻塞异步线程执行完成
String result = future.get();
  1. 取消任务: 通过 cancel 方法,你可以尝试取消异步任务的执行。这是 Future 接口的一项功能,但在实际使用中,由于限制和不确定性,这个方法并不总是能够成功取消任务。
boolean canceled = future.cancel(true);

CompletionStage接口

CompletableFuture同时也实现自CompletionStage接口,CompletionStage 接口是 Java 8 中引入的,在CompletableFuture中用于表示一个步骤,这个步骤可能是由另外一个CompletionStage触发的,随当前步骤的完成,可以触发其他CompletionStage的执行。CompletableFuture 类实现了 CompletionStage 接口,因此继承了这些功能。以下是 CompletionStageCompletableFuture 提供的一些关键功能:

  1. 链式操作:CompletionStage 定义了一系列方法,如 thenApply, thenAccept, thenRun,允许你在一个异步操作完成后,基于其结果进行进一步的操作。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Integer> lengthFuture = future.thenApply(String::length);
  1. 组合多个阶段CompletionStage 提供了 thenCombine, thenCompose, thenAcceptBoth 等方法,用于组合多个阶段的结果,形成新的 CompletionStage
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
  1. 异常处理CompletionStage 提供了一系列处理异常的方法,如 exceptionally, handle,用于在异步计算过程中处理异常情况。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 抛出异常
    throw new RuntimeException("Some error");
});

CompletableFuture<String> resultFuture = future.exceptionally(ex -> "Handled Exception: " + ex.getMessage());
  1. 顺序执行thenApply, thenAccept, thenRun 等方法可以用于在上一个阶段完成后执行下一个阶段,形成顺序执行的链式操作。

图片来源于美团技术

CompletableFuture原理与实践-外卖商家端API的异步化


CompletableFuture-tryFire

tryFire 方法是 CompletableFuture 内部的一个关键方法,用于尝试触发异步操作链中的下一个阶段。这个方法的主要作用是在合适的时机执行异步操作链中的后续阶段,将计算结果传递给下一个阶段。

为什么先介绍这个方法呢?因为这个方法的大部分API都是基于该方法的基础上实现的。

abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
    volatile Completion next;

    final void tryFire(int mode) {
        // ... (其他逻辑)

        // 触发下一个阶段
        Completion n;
        if ((n = next) != null)
            n.tryFire(SYNC);
    }

    // ... (其他方法)
}
  1. 触发方式( mode ):
    • tryFire 方法接收一个 mode 参数,表示触发的方式。常见的触发方式包括同步触发(SYNC)、异步触发(ASYNC)以及嵌套触发(NESTED)。
  1. 触发下一个阶段:
    • tryFire 方法中,通过 next 字段获取下一个阶段的引用,然后调用下一个阶段的 tryFire 方法,将当前阶段的计算结果传递给下一个阶段。
  1. 递归触发:
    • tryFire 方法可能会递归调用下一个阶段的 tryFire 方法,以确保整个异步操作链中的阶段能够依次触发。这个递归调用保证了异步操作链的串联执行。
  1. 触发逻辑的条件判断:
    • tryFire 方法中通常还包含一些条件判断,用于确定是否应该触发后续的操作。例如,可能会检查当前阶段的状态,如果满足触发条件,则继续触发。

总体而言,tryFire 方法是 CompletableFuture 异步操作链中触发后续阶段的核心方法。通过递归调用,它实现了异步操作链的顺序执行,确保了各个阶段按照期望的顺序执行,并将计算结果传递给下一个阶段。

CompletableFuture结构

字段和常量定义

字段定义
  • result:存储异步计算的结果
  • stack:存储观察者链
  • NEXT:异步调用链中观察者链的管理
常量定义
// Modes for Completion.tryFire. Signedness matters.
static final int SYNC   =  0;
static final int ASYNC  =  1;
static final int NESTED = -1;

这三个变量用于Completion类中tryFire方法的标志,表示不同的触发模式。

  • SYNC:表示同步触发(默认触发方式),即当前计算完成后直接执行后续的操作。适用于当前计算的结果已经准备好并且可以直接进行下一步操作的情况。
  • AYSNC:表示异步触发,当前计算完成后将后续的操作提交到异步线程池中执行。即当前计算完成后将后续的操作提交到异步线程池中执行。适用于需要在不同线程上执行后续操作的情况。
  • NESTED:嵌套触发,通常表示当前阶段的触发是由另一个阶段触发的,因此无需再次触发后续操作。在某些情况下,可能会避免重复触发。

内部类定义

CompletableFuture 类包含多个内部类,这些内部类用于为CompletableFuture提供不同的API而设计的,用于异步编程中的不同阶段和操作。

常用内部类列举:

  1. UniCompletionBiCompletion
    • UniCompletionBiCompletion 是用于表示异步操作链中的单一阶段和二元阶段的基础抽象类。它们提供了一些通用的方法和字段,用于处理阶段之间的关系,尤其是观察者链的构建和触发。
  1. UniApplyUniAcceptUniRun
    • UniApplyUniAcceptUniRunUniCompletion 的具体子类,分别用于表示异步操作链中的 thenApplythenAcceptthenRun 阶段。它们实现了具体的 tryFire 方法,用于触发阶段的执行。
  1. BiApplyBiAcceptBiRun
    • BiApplyBiAcceptBiRunBiCompletion 的具体子类,分别用于表示异步操作链中的 thenCombinethenAcceptBothrunAfterBoth 阶段。它们同样实现了具体的 tryFire 方法。
  1. OrApplyOrAcceptOrRun
    • OrApplyOrAcceptOrRunBiCompletion 的另一组具体子类,用于表示异步操作链中的 applyToEitheracceptEitherrunAfterEither 阶段。同样,它们实现了具体的 tryFire 方法。
  1. Async
    • AsyncCompletableFuture 内部用于表示异步操作的标志类,用于表示某个阶段需要异步执行。例如,在调用 supplyAsyncrunAsync 等方法时,会生成一个带有 Async 标志的阶段。

异步编程模型

状态转换

volatile Object result;       // Either the result or boxed AltResult
volatile Completion stack;    // Top of Treiber stack of dependent actions

CompletableFuture中定义了两个属性:result、stack,result用于表示执行的结果或异常,stack用于表示执行完当前任务后触发的其他步骤。

CompletableFuture中包含两个字段:resultstack。result用于存储当前CF的结果,stack(Completion)表示当前CF完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作可以有多个(表示有多个依赖它的CF),以栈(Treiber stack)的形式存储,stack表示栈顶元素。

图10 CF基本结构

这种方式类似“观察者模式”,依赖动作(Dependency Action)都封装在一个单独Completion子类中。下面是Completion类关系结构图。CompletableFuture中的每个方法都对应了图中的一个Completion的子类,Completion本身是观察者的基类。

  • UniCompletion继承了Completion,是一元依赖的基类,例如thenApply的实现类UniApply就继承自UniCompletion。
  • BiCompletion继承了UniCompletion,是二元依赖的基类,同时也是多元依赖的基类。例如thenCombine的实现类BiRelay就继承自BiCompletion。

引用自美团技术。

CompletableFuture 中,Completion 对象表示当前的异步操作,它是被观察者。stack 中存储的是后续的步骤对象,这些对象充当观察者的角色。当当前的异步操作执行完成后,会通知 stack 中的观察者获取执行结果。

这种设计允许异步操作的串联,每个步骤都对应一个 Completion 对象,形成了观察者链。当一个异步操作完成时,它会逐一触发 stack 中的观察者对象执行相应的回调函数,实现了链式的异步操作。这个机制是 CompletableFuture 强大异步编程模型的核心之一。

为印证以上结论,我们来看个例子,追踪下源码:

例子:

CompletableFuture<String> originalFuture = CompletableFuture.supplyAsync(() -> "Hello");

//thenAccept方法构造Completable
CompletableFuture<Void> thenAcceptFuture = originalFuture.thenAccept(result -> {
    System.out.println("Result: " + result);
});

以JDK 11为例

源码:

CompletableFuturethenAccept方法中直接调用了uniAcceptStage方法,该方法入参是线程池对象和JDK 8出现的函数式接口Consumer,即上文中的result -> {System.out.println("Result: " + result);}),这段代码的作用是获取到上一阶段的计算结果后,将计算结果传递给消费者操作f,在thenAccept方法中将f转换成一个新的CompletableFuture,将uniAccept推入观察者链中,来表示一个新的thenAccept阶段。

private CompletableFuture<Void> uniAcceptStage(Executor e,
                                               Consumer<? super T> f) {
    if (f == null) throw new NullPointerException();
    Object r;
    if ((r = result) != null)
        return uniAcceptNow(r, e, f);
    CompletableFuture<Void> d = newIncompleteFuture();
    unipush(new UniAccept<T>(e, d, this, f));
    return d;
}

以下代码是将给定的Completion对象推入观察者链:

/**
 * Pushes the given completion unless it completes while trying.
 * Caller should first check that result is null.
 */
final void unipush(Completion c) {
    if (c != null) {
        //尝试将Completion对象c推入观察者链,如果返回false,
        //说明推入的过程中观察者链发生了变化,可能有其他线程正在修改观察者链,
        //这种情况下,通过循环尝试
        while (!tryPushStack(c)) {
            //result对象不为空,表示当前CompletableFuture对象已完成,计算结果已存在
            if (result != null) {
                NEXT.set(c, null);
                break;
            }
        }
        if (result != null)
            c.tryFire(SYNC);
    }
}


/** Returns true if successfully pushed c onto stack. */
final boolean tryPushStack(Completion c) {
    Completion h = stack;
    NEXT.set(c, h);         // CAS piggyback
    return STACK.compareAndSet(this, h, c);
}

前提:判断观察者链是否被其他线程修改是通过被保持线程可见性的类、关键字修饰的。JDK 8使用的是volatile关键字实现简单的变量的原子性和线程可见性。在JDK 11中的CompletableFuture使用的是VarHandle类型定义。

// VarHandle mechanics
private static final VarHandle RESULT;
private static final VarHandle STACK;
private static final VarHandle NEXT;

CompletableFuture线程池

CompletableFuture 类在执行异步操作时,默认使用 ForkJoinPool.commonPool() 作为线程池。这是一个共享的线程池,通常是一个守护线程池,适用于执行异步任务。该线程池的特性包括自动管理线程数量、支持工作窃取(work-stealing)等。

如果你想要使用自定义的线程池,可以通过传递 Executor 对象作为参数来创建 CompletableFuture 实例。

public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return uniRunStage(defaultExecutor(), action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action,
                                            Executor executor) {
    return uniRunStage(screenExecutor(executor), action);
}

默认线程池

Executor executor = Executors.newFixedThreadPool(10); // 创建一个固定大小为10的线程池
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // 异步任务的执行逻辑
}, executor);

以上是使用默认线程池的相关代码逻辑,我们来看一下源码:

public Executor defaultExecutor() {
    return ASYNC_POOL;
}
    /**
     * Default executor -- ForkJoinPool.commonPool() unless it cannot
     * support parallelism.
     */
    private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

ASYNC_POOL中使用了默认的ForkJoinPool去开启一个线程池。

自定义线程池

CompletableFuture中提供了使用自定义线程池的方法,方法中需要传入一个线程池的接口对象,那么我们就可以传入任何一个实现自Executor接口的线程池。

以下是基于Spring Framework中的线程池实现的异步操作:

@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(30);
        executor.setThreadNamePrefix("Async-");
        executor.initialize();
        return executor;
    }
}
@Service
public class MyAsyncService {

    @Autowired
    private TaskExecutor taskExecutor;

    @Async
    public CompletableFuture<String> performAsyncTask() {
        // 异步任务的执行逻辑
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Async Task Completed";
        }, taskExecutor);
    }
}

并发控制

CompletableFuture 默认使用共享线程池: ForkJoinPool.commonPool() 作为线程池,通过工作窃取算法提高了任务的并行度,同时使用VarHandlevolatile来保证线程间的可见性和原子操作,以上保证了线程安全和高可用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1199732.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Redis】Redis-Key的使用

上一篇&#xff1a; redis-server和redis-cli https://blog.csdn.net/m0_67930426/article/details/134361885?spm1001.2014.3001.5501 官网 命令 |雷迪斯 (redis.io) 设置key set name xxxxx 查看key keys * 再设置一个key并且查看 这里查看了两个key&#xff08;name a…

Git之分支与版本

&#x1f3ac; 艳艳耶✌️&#xff1a;个人主页 &#x1f525; 个人专栏 &#xff1a;《Spring与Mybatis集成整合》《Vue.js使用》 ⛺️ 越努力 &#xff0c;越幸运。 1.开发测试上线git的使用 1.1. 环境讲述 当软件从开发到正式环境部署的过程中&#xff0c;不同环境的作用…

《015.SpringBoot+vue之音乐网》【前后端分离】

《015.SpringBootvue之音乐网》【前后端分离】 项目简介 [1]本系统涉及到的技术主要如下&#xff1a; 推荐环境配置&#xff1a;DEA jdk1.8 Maven MySQL 前后端分离; 后台&#xff1a;SpringBootMybatisMySQL; 前台&#xff1a;Vue3.0 TypeScript Vue-Router Vuex Axios …

Globalsign证书

GlobalSign是全球可信的数字证书提供商之一&#xff0c;提供广泛的证书服务&#xff0c;包括SSL证书、代码签名证书、电子邮件证书等&#xff0c;帮助保护企业和个人的网络安全。本文将详细介绍GlobalSign证书的特点和优势&#xff0c;帮助您更好地了解这一重要的数字证书提供商…

高防CDN:护航网络安全的卓越之选

在当今数字化时代&#xff0c;网络攻击与日俱增&#xff0c;为了确保网站和应用程序的稳定运行&#xff0c;高防CDN&#xff08;高防御内容分发网络&#xff09;应运而生。选择高防CDN的理由不仅源于其强大的防护性能&#xff0c;还体现了其与硬件防火墙异曲同工的奥妙。 选择高…

MYSQL字符串函数详解和实战(字符串函数大全,内含示例)

MySQL提供了许多字符串函数&#xff0c;用于处理和操作字符串数据。以下是一些常用的MYSQL字符串函数。 建议收藏以备后续用到查阅参考。 目录 一、CONCAT 拼接字符串 二、CONCAT_WS 拼接字符串 三、SUBSTR 取子字符串 四、SUBSTRING 取子字符串 五、SUBSTRING_INDEX 取子…

Git的原理与使用(一)

目录 Git初始 Git安装 Git基本操作 创建git本地仓库 配置git 工作区,暂存区,版本库 添加文件,提交文件 查看.git文件 修改文件 版本回退 小结 Git初始 git是一个非常强大的版本控制工具.可以快速的将我们的文档和代码等进行版本管理. 下面这个实例看理解下为什么需…

eocc1_Findings_candlestick_ohlc_volume_

An Unusually Tall Candle Often Has a Minor High or Minor Low Occurring within One Day of It异常高的蜡烛通常会在一天内出现小幅高点或小幅低点 I looked at tens of thousands of candles to prove this, and the study details are on my web site, ThePatternSite.com…

软件工程——名词解释

适用多种类型的软件工程教材&#xff0c;有关名词释义的总结较为齐全~ 目录 1. 软件 2. 软件危机 3. 软件工程 4. 软件生存周期 5. 软件复用 6. 质量 7. 质量策划 8. 质量改进 9. 质量控制 10. 质量保证 11. 软件质量 12. 正式技术复审 13. ISO 14. ISO9000 15.…

SpringBoot系列-2 自动装配

背景&#xff1a; Spring提供了IOC机制&#xff0c;基于此我们可以通过XML或者注解配置&#xff0c;将三方件注册到IOC中。问题是每个三方件都需要经过手动导入依赖、配置属性、注册IOC&#xff0c;比较繁琐。 基于"约定优于配置"原则的自动装配机制为该问题提供了一…

macOS使用conda初体会

最近在扫盲测序的一些知识 其中需要安装一些软件进行练习&#xff0c;如质控的fastqc&#xff0c;然后需要用conda来配置环境变量和安装软件。记录一下方便后续查阅学习 1.安装miniconda 由于我的电脑之前已经安装了brew&#xff0c;所以我就直接用brew安装了 brew install …

【yolov5】onnx的INT8量化engine

GitHub上有大佬写好代码&#xff0c;理论上直接克隆仓库里下来使用 git clone https://github.com/Wulingtian/yolov5_tensorrt_int8_tools.git 然后在yolov5_tensorrt_int8_tools的convert_trt_quant.py 修改如下参数 BATCH_SIZE 模型量化一次输入多少张图片 BATCH 模型量化…

Technology Strategy Patterns 学习笔记8- Communicating the Strategy-Decks(ppt模板)

1 Ghost Deck/Blank Deck 1.1 It’s a special way of making an initial deck that has a certain purpose 1.2 you’re making sure you have figured out what all the important shots are before incurring the major expense of shooting them 1.3 需要从技术、战略、产…

2023 年最新企业微信官方会话机器人开发详细教程(更新中)

目标是开发一个简易机器人&#xff0c;能接收消息并作出回复。 获取企业 ID 企业信息页面链接地址&#xff1a;https://work.weixin.qq.com/wework_admin/frame#profile 自建企业微信机器人 配置机器人应用详情 功能配置 接收消息服务器配置 配置消息服务器配置 配置环境变量…

[01]汇川IMC30G-E系列运动控制卡应用笔记

简介 IMC30G-E系列产品是汇川技术自主研制的高性能EtherCAT网络型运动控制器&#xff08;卡&#xff09;&#xff0c;同时兼容脉冲轴的控制&#xff1b;IMC30G-E支持点位/JOG、插补、多轴同步、高速位置比较输出、PWM等全面的运动控制功能&#xff0c;具备高同步控制精度。 开发…

OpenWRT浅尝 / 基于RAVPower-WD009便携路由文件宝的旁路网关配置

目录 前言需求分析手头的设备家庭网络拓扑图旁路网关配置OpenWRT固件选择OpenWRT固件刷入旁路网关配置流程 旁路网关的使用前置工作日常存储/关键备份内网穿透24小时待命下载器 前言 近期由于个人需求&#xff0c;需要一台OpenWRT设备实现一些功能。所以本文主要还是为了自己后…

k8s-实验部署 1

1、k8s集群部署 更改所有主机名称和解析 开启四台实验主机&#xff0c;k8s1 仓库&#xff1b;k8s2 集群控制节点&#xff1b; k8s3 和k8s4集群工作节点&#xff1b; 集群环境初始化 使用k8s1作为仓库&#xff0c;将所有的镜像都保存在本地&#xff0c;不要将集群从外部走 仓库…

金和OA jc6 任意文件上传漏洞复现

0x01 产品简介 金和OA协同办公管理系统软件&#xff08;简称金和OA&#xff09;&#xff0c;本着简单、适用、高效的原则&#xff0c;贴合企事业单位的实际需求&#xff0c;实行通用化、标准化、智能化、人性化的产品设计&#xff0c;充分体现企事业单位规范管理、提高办公效率…

学习率范围测试(LR Finder)脚本

简介 深度学习中的学习率是模型训练中至关重要的超参数之一。合适的学习率可以加速模型的收敛&#xff0c;提高训练效率&#xff0c;而不恰当的学习率可能导致训练过慢或者无法收敛。为了找到合适的学习率&#xff0c;LR Finder成为了一种强大的工具。 学习率范围测试&#x…

Django的ORM操作

文章目录 1.ORM操作1.1 表结构1.1.1 常见字段和参数1.1.2 表关系 2.ORM2.1 基本操作2.2 连接数据库2.3 基础增删改查2.3.1 增加2.3.2 查找2.3.4 删除2.3.4 修改 1.ORM操作 orm&#xff0c;关系对象映射&#xff0c;本质翻译的。 1.1 表结构 实现&#xff1a;创建表、修改表、…