CompletableFuture-详解使用及源码解析

news2024/11/13 15:13:55

背景

上一篇文章我们看了FutureTask,分析了他的问题,异步编程并不方便。

  • 问题1: FutureTask获取执行结果前,主线程需要通过get()方法一直阻塞等待子线程执行完成call方法,才可以拿到返回结果
  • 问题2:如果不通过get挂起线程,通过while循环,不停的判断任务的状态是否结束,结束后,再拿结果。如果任务长时间没有执行完毕,CPU会一直调度查看任务状态的方法,浪费CPU资源。

CompletableFuture在一定程度上提高了各种异步非阻塞的方案,并且响应式变成,代码编写效果上,效率更高。

1、相关API

1.1 等在前置结果再执行当前任务的API

  • supplyAsync(Supplier supplier) 异步执行任务,有返回结果

  • runAsync(Runnable runnable) 异步执行任务,无返回结果

    在不指定线程池的前提下,两个异步任务都交给ForkJoinPool去执行,而ForkJoinPool内部是守护线程,守护线程是主线程结束后就结束了。

  • thenApply(Function fn) 等待前一个任务结束,拿到前一个方法的结果并处理然后返回结果。使用和前一个任务想同的线程

    thenApplyAsync(Function fn) 和上面的一样,但是使用不同的线程执行

    thenApplyAsync(Function fn, Executor executor) 和上面一样,这里需要自定义线程池

    CompletableFuture中的大部分方法都有三个重载,(不带Async、带Async、带Async和线程池)

  • thenAccept(Consumer action) 等待前一个任务结束,拿到前一个方法的结果并处理,没有返回值

  • thenRun(Runnable action) 等待前一个任务结束,再处理,不需要前面的结果,没有返回值

1.2 等待多个并行任务完成后,并拿到结果再执行当前任务API

  • thenCombine(CompletionStage other, BiFunction fn) 等待前两个并行任务完成后,拿到结果再执行当前任务,并返回结果

    thenCombineAsync(CompletionStage other, BiFunction fn) 等待前两个并行任务完成后,拿到结果,并在新线程执行当前任务,并返回结果

    thenCombineAsync(CompletionStage other, BiFunction fn, Executor executor) 等待前两个并行任务完成后,拿到结果,并在新线程执行当前任务(自定义线程池),并返回结果

  • thenAcceptBoth(CompletionStage other, BiConsumer action) 等待前两个并行任务结束,拿到结果并处理,没有返回值

    thenAcceptBothAsync(CompletionStage other, BiConsumer action)

    thenAcceptBothAsync(CompletionStage other, BiConsumer action, Executor executor)

  • runAfterBoth(CompletionStage other, Runnable action) 让两个并行任务结束,再执行,不需要前面的结果,没有返回值

    runAfterBothAsync

1.3 两个任务一起执行,有一个任务返回结果后,拿到结果就可以处理API

  • applyToEither(CompletionStage other, Function fn) 等待前两个并行任务执行,任意一个有返回结果,拿到结果执行,并返回结果

    Async方法同样如上

  • acceptEither(CompletionStage other, Consumer action) 等待前两个并行任务执行,任意一个有返回结果,拿到结果执行,无返回

  • runAfterEither(CompletionStage other, Runnable action) 等待前两个并行任务执行结束,再执行,无返回

1.4 等前置任务执行完成,再处理,后续返回结果为CompletableStage

  • thenCompose(Function> fn) 等待前置任务执行完成,拿到结果并执行,返回CompletableStage

    thenApply(Function fn) 用Apply执行就够了,等同于Apply

1.5 异常和其他处理

  • exceptionally(Function fn) 异常处理
  • whenComplete(BiConsumer action) 可以拿到上一个任务的返回结果和异常,当前处理不会返回结果
  • handle(BiFunction fn) 可以拿到上一个任务的返回结果和异常,同时当前处理可以返回结果

1.6 总结

上面的是常用API,CompletableFuture这么多API很难记,但是有规律可循:

  • Async结尾的是异步执行的API,通常有带线程池和不带线程池的版本
  • run开头的是无参方法,没有返回值
  • supply开头的是有参方法,有返回值
  • 以Accept开头或者结尾的方法,有参数,没有返回值
  • 以Apply开头或者结尾的方法,有参数,有返回值
  • 带有either后缀的方法,表示谁先完成就消费谁

2、如何使用?

基于上面API做应用案例

2.1 一个简单案例

小连要回家干饭, 小严做饭, 小连看电视, 等小严做完, 小连干饭。

我们把能同时执行的并发执行,必须分开的顺序执行,设计如下:

/** 
  * 小连要回家干饭, 小严做饭, 小连看电视, 等小严做完, 小连干饭 
  * 
  * main线程 小连 
  * 异步线程 小严 
  */ 
public static void main(String[] args) throws ExecutionException, InterruptedException { 
    print("小连回家干饭"); 
    CompletableFuture<String> future= CompletableFuture.supplyAsync(()->{ 
        print("小严做饭"); 
        sleep(2);
        print("小严做完"); 
        return "锅包肉"; 
     }); 
     print("小连看电视"); 
     print("小连干饭,"+future.join()); 
}

输出结果:
可以看到默认线程池是ForkJoinPool

main: 小连回家干饭 
main: 小连看电视 
ForkJoinPool.commonPool-worker-1: 小严做饭 
ForkJoinPool.commonPool-worker-1: 小严做完 
main: 小连干饭,锅包肉

2.2 稍微复杂的案例

小连要回家干饭, 小严炒菜, 小李焖饭, 小连看电视, 等小严小李做完, 小陈端菜和饭给小连, 小连干饭。

我们把能同时执行的并发执行,必须分开的顺序执行,设计如下:

/** 
  * 小连要回家干饭, 小严炒菜, 小李焖饭, 小连看电视, 等小严小李做完, 小陈端菜和饭给小连, 小连干饭   *
  * main线程 小连
  * 异步线程 小严, 小李,小陈
  *
  */ 
public static void main(String[] args) throws ExecutionException, InterruptedException {
      print("小连回家干饭"); 
      CompletableFuture<String> future= CompletableFuture.supplyAsync(()->{ 
          print("小严做菜"); 
          sleep(2); 
          print("小严做完"); 
          return "锅包肉"; 
      }, executor).thenCombineAsync(CompletableFuture.supplyAsync(()->{ 
          print("小李焖饭"); 
          sleep(3); 
          print("饭好了"); 
          return "大米饭"; 
      }, executor), (r1, r2)->{ 
                          print("饭菜好了,小李端菜"); 
                          sleep(1); 
                          return r1+", "+r2; 
                      }, executor); 
      print("小连看电视"); 
      print("小连干饭,"+future.join()); 
      executor.shutdown();
}

执行结果:
这里指定了线程池

main: 小连回家干饭 
pool-1-thread-1: 小严做菜 
pool-1-thread-2: 小李焖饭 
main: 小连看电视 
pool-1-thread-1: 小严做完 
pool-1-thread-2: 饭好了 
pool-1-thread-3: 饭菜好了,小李端菜 
main: 小连干饭,锅包肉, 大米饭

3、源码分析

3.1 runAsync源码

3.1.1 从runAsync进入,来到asyncRunStage方法
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
    if (f == null) throw new NullPointerException();
    //声明当前任务的CompletableFuture对象
    //任务执行和后续任务的触发是两个操作,这里的d是为了触发后续任务的执行
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    //将任务和CompletableFuture封装到一起,作为Async对象
    //将Async交给线程池执行
    e.execute(new AsyncRun(d, f));
    return d;
}
3.1.2 进入new AsyncRun

封装任务和CompletableFuture,作为Async对象,将Async交给线程池执行

static final class AsyncRun extends ForkJoinTask<Void>
    implements Runnable, AsynchronousCompletionTask {
    CompletableFuture<Void> dep; Runnable fn;
    //存储CompletableFuture以及当前任务
    AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
        this.dep = dep; this.fn = fn;
    }

    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) {}
    public final boolean exec() { run(); return false; }

    public void run() {
        CompletableFuture<Void> d; Runnable f;
        //将成员变量做临时存储
        if ((d = dep) != null && (f = fn) != null) {
            // help gc
            dep = null; fn = null;
            //当前任务是否已经有返回结果
            if (d.result == null) {
                try {
                    //线程池异步执行任务
                    f.run();
                    //当前Runnable是没有返回结果的,所以直接封装一个null值
                    d.completeNull();
                } catch (Throwable ex) {
                    d.completeThrowable(ex);
                }
            }
            //执行后续任务
            d.postComplete();
        }
    }
}

3.1.3 completeXXX系列

completeXXX系列方法都差不多,如下,还有completeThrowable等等

//不需要返回值的时候封装null
final boolean completeNull() {
    //CAS设置result值
    return UNSAFE.compareAndSwapObject(this, RESULT, null, NIL);
}
//需要返回值的时候,封装结果
final boolean completeValue(T t) {
    return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                       (t == null) ? NIL : t);
}

3.1.4 postComplete后续任务的触发方式

当前任务执行完毕后,触发的后续处理。即触发后续任务执行。

final void postComplete() {
    /*
     * On each step, variable f holds current dependents to pop
     * and run.  It is extended along only one path at a time,
     * pushing others to avoid unbounded recursion.
     */
     //h 是栈顶
    CompletableFuture<?> f = this; Completion h;
    //f.stack存储后续任务的栈
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        //栈结构中有后续需要处理的任务,进入while循环,没循环一次,h指针会后移。
        //casStack将h后移,栈顶出栈
        if (f.casStack(h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                h.next = null;    // detach
            }
            //执行栈顶任务
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

3.2 thenRun源码

后续任务触发的方式有两种:

  • 一种是基于前置任务执行完毕,执行postComplete方法触发
  • 另一种是后续任务在压栈之前和之后,会尝试执行后续任务,只要前置任务执行结束快,后续任务就可以直接执行,不需要前置任务触发
3.2.1 thenRun系列

几个方法如下,都是走的uniRunStage,同步执行的线程池参数是null。

public CompletableFuture<Void> thenRun(Runnable action) {
    return uniRunStage(null, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return uniRunStage(asyncPool, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action,
                                            Executor executor) {
    return uniRunStage(screenExecutor(executor), action);
}
3.2.2 从thenRun进入uniRunStage
//e 线程池执行器,如果是Async异步调用,会传递线程池
private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    //如果传递了线程池,代表异步执行,直接走if代码块执行
    //如果没有传递线程池,先执行d.uniRun同步执行,如果d.uniRun返回false,继续向下
    if (e != null || !d.uniRun(this, f, null)) {
        //如果前置任务没有执行完成,那就压栈
        //将线程池、后续任务、前置任务,封装成c
        UniRun<T> c = new UniRun<T>(e, d, this, f);
        //将封装好的c压栈
        //不确保一定压到栈中
        //在这个位置,可能出现前置任务已经执行完毕,导致无法压到栈中
        push(c);
        //尝试执行后续任务
        c.tryFire(SYNC);
    }
    return d;
}

final void push(UniCompletion<?,?> c) {
    if (c != null) {
        //result是前置任务的结果
        //只有前置任务还没有执行完成,才能将封装好的UniRun对象压栈
        while (result == null && !tryPushStack(c))
            lazySetNext(c, null); // clear on failure
    }
}
3.2.3 uniRun方法,参数c==null表示同步执行,否则异步执行(进入claim执行)
//尝试执行后续任务,a是前置任务, f:后续任务
final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {
    Object r; Throwable x;
    //如果前置任务没有执行完,直接走后面的后续任务
    //只看第二个判断,如果前置任务没有执行完成,直接返回false
    if (a == null || (r = a.result) == null || f == null)
        return false;
    //说明前置任务已经完成,要执行后续任务,但是要先判断后续任务执行了么
    if (result == null) {
        //后续任务还没有执行
        //如果前置任务异常结束,那么后续任务不需要执行了
        if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
            completeThrowable(x, r);
        else
        //前置任务正常结束,尝试执行后继任务
            try {
                //c==null,同步执行,否则同步执行c.claim                                     
                if (c != null && !c.claim())
                    //异步执行完毕
                    return false;
                //同步执行
                f.run();
                completeNull(); //已分析,见3.1.3
            } catch (Throwable ex) {
                completeThrowable(ex);//已分析,见3.1.3
            }
    }
    return true;
}


abstract static class UniCompletion<T,V> extends Completion {
    //异步执行任务
    final boolean claim() {
        Executor e = executor;
        //判断当前任务标记,是否执行
        if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
            if (e == null)
            //线程池为null,代表同步执行,直接返回false
                return true;
            //异步执行,使用线程池执行即可
            executor = null; // disable
            e.execute(this);
        }
        return false;
    }
}
3.2.4 UniRun类(和uniRun方法没关系)

这个类将线程池、后续任务、前置任务、后续具体任务(Runable实现)封装,此类继承了UniCompletion

//后续任务执行,以及将前置任务封装成UniRun对象
static final class UniRun<T> extends UniCompletion<T,Void> {
    Runnable fn;
    UniRun(Executor executor, CompletableFuture<Void> dep,
           CompletableFuture<T> src, Runnable fn) {
        super(executor, dep, src); this.fn = fn;
    }
    //dep 后续任务
    //src 前置任务
    //fn 后续具体任务(Runable实现)
    final CompletableFuture<Void> tryFire(int mode) {
        //d 后续任务,  a 前置任务
        CompletableFuture<Void> d; CompletableFuture<T> a;
        if ((d = dep) == null ||
            //mode>0是同步
            !d.uniRun(a = src, fn, mode > 0 ? null : this))
            return null;
        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}

3.3 整个执行流程

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2152089.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

电竞显示器哪个牌子好

电竞显示器哪个好&#xff1f;你想成为电竞选手吗&#xff1f;显示器很关键&#xff0c;下面我就列举7款市面流行的电竞显示器给大家看看&#xff0c;总有一款适合你。 1.电竞显示器哪个好 - 蚂蚁电竞 ANT255VF电竞显示器 一、产品概述 蚂蚁电竞 ANT255VF电竞显示器是一款专为…

2024/9/21 leetcode 21.合并两个有序链表 2.两数相加

目录 21.合并两个有序链表 题目描述 题目链接 解题思路与代码 2.两数相加 题目描述 题目链接 解题思路与代码 --------------------------------------------------------------------------- 21.合并两个有序链表 题目描述 将两个升序链表合并为一个新的 升序 链表并返…

ChatCADChatCAD+:Towards a Universal and Reliable Interactive CAD using LLMs

ChatCAD&#xff08;论文链接&#xff1a;[2302.07257] ChatCAD: Interactive Computer-Aided Diagnosis on Medical Image using Large Language Models (arxiv.org)&#xff09; 网络流程图&#xff1a; 辅助阅读&#xff1a; 基于大型语言模型的医学图像交互式计算机辅助诊…

【运维自动化-作业平台】如何使用全局变量之字符串类型?

使用变量是脚本很常见的处理场景&#xff0c;作业平台中主要有全局变量和魔法变量两类&#xff0c;全局变量又区分了字符串、命名空间、主机列表、密文、数组5种类型。字符串类型变量 最简单、使用频率最高的全局变量类型&#xff0c;可以跨主机、跨步骤使用。目前在作业平台中…

uniApp微信小程序扫描普通二维码跳转到小程序指定页面操作方法

这篇文章主要给大家介绍了关于微信小程序扫描普通二维码跳转到小程序指定页面操作的相关资料,需要的朋友可以参考下 1、首先我们需要在微信公众平台的开发管理——>开发设置&#xff0c;找到&#xff08;扫普通链接二维码打开小程序&#xff09;&#xff0c;点击添加,根据提…

vue3-05-Element-plus中表单校验:校验对象中的对象的属性,校验对象中的数组中的对象的属性,校验嵌套对象

目录 一、校验对象中的普通属性二、校验对象中对象的属性三、校验对象中的数组中的对象的属性 这两天写vue3项目&#xff0c;用了element-plus库&#xff0c;到了表单规则验证的环节&#xff0c;我发现我只会校验对象中的普通属性&#xff0c;如果校验嵌套对象&#xff0c;我就…

ML 系列:多元线性回归 (MLR)(04)

图 1.多元线性回归与简单线性回归 一、说明 线性回归从一维推广到多维&#xff0c;这与单变量线性回归有很多不同&#xff0c;情况更加复杂&#xff0c;而在梯度优化也需要改成向量梯度&#xff0c;同时&#xff0c;数据预处理也成了必要步骤。 二、综述 多元线性回归是简单线性…

C++:分苹果【排列组合】

描述 把M个同样的苹果放到N个同样的盘子里&#xff0c;允许有的盘子空着不放&#xff0c;问共有多少种不同的分法&#xff1f;&#xff08;用K表示&#xff09;&#xff0c;5&#xff0c;1&#xff0c;1和1&#xff0c;5&#xff0c;1是同一种分法。 输入描述 两个整数M和N&…

C语言 | Leetcode C语言题解之第420题强密码检验器

题目&#xff1a; 题解&#xff1a; #define MAX(a, b) ((a) > (b) ? (a) : (b)) #define MIN(a, b) ((a) < (b) ? (a) : (b))int strongPasswordChecker(char * password) {int n strlen(password);bool has_lower false, has_upper false, has_digit false;for …

YOLOv9改进系列,YOLOv9主干网络替换为RepViT (CVPR 2024,清华提出,独家首发),助力涨点

摘要 轻量级视觉变换器(ViTs)在资源受限的移动设备上表现出优越的性能和较低的延迟,相比之下轻量级卷积神经网络(CNNs)稍显逊色。研究人员发现了许多轻量级 ViTs 和轻量级 CNNs 之间的结构联系。然而,它们在块结构、宏观和微观设计上的显著架构差异尚未得到充分研究。在…

【重磅发布】大模型在金融领域的价值、治理和生态进阶之路白皮书

引言 金融行业天然具备数据和信息密集型的特点,在数字化成熟度方面处于领先地位。此外,金融行业的数字化投入持续稳步增长,汇集了大量具备数字化技能的人才。这些优势使得金融行业在AI技术的应用和创新方面具备独特的条件,能够在推动技术革新和提升行业效率方面起到示范作…

NLP(二)-文本表示

One-hot One-hot&#xff08;独热&#xff09;编码是一种最简单的文本表示方式。如果有一个大小为V的词表&#xff0c;对于第i个词$w_i$&#xff0c;可以用一个长度为V的向量来表示&#xff0c;其中第i个元素为1&#xff0c;其它为0.例如&#xff1a; 减肥&#xff1a;[1, 0,…

59.【C语言】内存函数(memmove函数)

目录 2.memove函数 *简单使用 部分翻译 *模拟实现 方案1 方案2 1.有重叠 dest在src左侧 dest在src右侧 2.无重叠 代码 2.memove函数 *简单使用 memove:memory move cplusplus的介绍 点我跳转 对比第59篇的memcpy函数 对比memmcpy函数的介绍如下区别: 部分翻译 m…

【Verilog学习日常】—牛客网刷题—Verilog快速入门—VL59

根据RTL图编写Verilog程序 描述 根据以下RTL图&#xff0c;使用 Verilog HDL语言编写代码&#xff0c;实现相同的功能&#xff0c;并编写testbench验证功能。 输入描述&#xff1a; clk&#xff1a;系统时钟信号 rst_n&#xff1a;复位信号&#xff0c;低电平有效 data_in…

js 获取树节点上某节点的最底层叶子节点数据

效果图 数据为某一个节点对象 递归代码 function getLeafNodes(node) {if (!node.children || node.children.length 0) {// 如果是叶子节点&#xff0c;返回它的数据return [node.data];}// 如果节点有子节点&#xff0c;递归获取所有叶子节点的数据return node.children.…

基于误差状态的卡尔曼滤波

基于误差状态的卡尔曼滤波ESKF 注意这里的观测方程&#xff0c;是IMU的误差状态和激光定位的差值得到的。

JavaWeb---三层架构

文章目录 1. 为什么需要分层&#xff1f;2.软件设计中的分层模式3.分层4.三层架构&#xff1a;显示层、业务逻辑层、数据访问层3. 案例&#xff1a;利用三层架构原理实现编写web程序的流程 摘自&#xff1a;https://blog.csdn.net/qq_64001795/article/details/124112824 1. 为…

Qt日志输出及QsLog日志库

目录 Qt日志输出及QsLog日志库日志输出格式化日志普通格式化条件格式化环境变量设置格式化日志输出位置日志输出对象信息禁用输出 QsLog日志库使用方法1. 将QsLog目录添加到项目中2. 配置CMakeLists.txt文件3. 配置.pro文件4. 日志记录器的配置5. 运行程序6. 启用行号和文件名C…

Why is OpenAI image generation Api returning 400 bad request in Unity?

题意&#xff1a;为什么 OpenAI 图像生成 API 在 Unity 中返回 400 Bad Request 错误&#xff1f; 问题背景&#xff1a; Im testing out dynamically generating images using OpenAI API in Unity. Amusingly, I actually generated most of this code from chatGPT. 我正在…

idea中.git文件夹存在但是没有git功能列表

1.问题&#xff1a; 该项目中已经将.git文件夹置入了&#xff0c;但是idea中却没有git相关的功能列表&#xff0c;如图&#xff1a; 2.解决办法&#xff1a; 在【文件】-【设置】-【版本控制】-【目录映射】中添加目录映射应用就好了 &#xff08;【File】 -> 【S…