【Java】CompletableFuture 并发顺序调度

news2025/1/11 21:09:03

前言

Java CompletableFuture 提供了一种异步编程的方式,可以在一个线程中执行长时间的任务,而不会堵塞主线程。

和Future相比,CompletableFuture不仅实现了Future接口,也实现了 CompletionStage接口。Future接口不用多说,CompletionStage接口将多个CompletionStage执行顺序依赖给抽象了出来。

有了CompletableFuture接口,就能将多个异步事件的结果进行执行顺序编排。

使用

可数操作

一般使用 CompletableFuture的场景是有一个 a 操作,一个 b操作,还有一个 c 操作依赖 a、b两个操作的返回结果。可以直接使用 allOf()接受一长串的入参,也可以使用thenCombine()针对两个操作的特定情况。


    public static void main(String[] argv) {
        CompletableFuture<String> c1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(second * 20);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "1";
        });
        CompletableFuture<String> c2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(second * 20);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "2";
        });
        CompletableFuture c9 = CompletableFuture.allOf(c1, c2);
        c9.thenApply(v -> {
            try {
                c1.get();
                c2.get();
                System.out.println("Everything is all right");
            } catch(Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("Something error");
            }
            return v;
        });
        c9.join();
    }

可变操作

当想要处理的 CompletableFuture 是可变的,比如说根据数据库查出的数据每个都需要执行一个 CompletableFuture 操作,也就是 n 个 CompletableFuture。

                CompletableFuture<Void> allFuture = CompletableFuture
                .allOf(completableFutureList.toArray(new CompletableFuture[0]));
                CompletableFuture<List<T>> result = allFuture.thenApply(v ->
                        completableFutureList.stream()
                                .map(CompletableFuture::join)
                                .filter(Objects::nonNull)
                                .collect(Collectors.toList()));
                List<T> tList = result.get(50, TimeUnit.SECONDS);

源码实现

CompletableFuture 类成员变量

CompletableFuture中有一个 volatile 关键词修饰的成员变量,resultCompletableFuture.get()函数中的返回的就是这个变量。它会先检查result变量是否为null,不为null则直接返回,为null则会根据是否可中断进行一个while循环等。

在这里插入图片描述

根据使用get() 或者 get(long timeout, TimeUnit unit) 函数的不同,最终等待result结果的函数也不同。get(long timeout, TimeUnit unit)函数会是用 timedGet(long nanos) 函数进行等待。


    /**
     * Waits if necessary for this future to complete, and then
     * returns its result.
     *
     * @return the result value
     * @throws CancellationException if this future was cancelled
     * @throws ExecutionException if this future completed exceptionally
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    public T get() throws InterruptedException, ExecutionException {
        Object r;
        return reportGet((r = result) == null ? waitingGet(true) : r);
    }

除了代表结果的 result 之外,还有一个 Completion 类 的变量 stack。从断点执行和代码的注解上看,这个stack代表者从属当前CompletableFuture的操作。当前CompletableFuture操作执行完毕后(result里有结果),会引动其他Completion进行处理。


    /* 
     * A CompletableFuture may have dependent completion actions,
     * collected in a linked stack. It atomically completes by CASing
     * a result field, and then pops off and runs those actions. This
     * applies across normal vs exceptional outcomes, sync vs async
     * actions, binary triggers, and various forms of completions.
     *
    /*	
    

可以通过截图看出 在Idea 内存中有和没有 Completion stack的CompletableFuture相比,有比没有多了 1 dependents的标记

Completion stack里没有东西的CompletableFutureCompletion stack里有东西的CompletableFuture
在这里插入图片描述在这里插入图片描述

CompletableFuture 多个操作组织结构

CompletableFuture类能够通过 CompletableFuture.allOf()或者 CompletableFuture.anyOf()将多个CompletableFuture 对象组合在一起,等到满足条件时,再触发之后操作的执行。

allOf方法为例,CompletableFuture.allOf(CompletableFuture<?>... cfs) 方法会整合作为入参的所有CompletableFuture,等到他们呢所有的都完成之后,才返回结果。


    /* ------------- Arbitrary-arity constructions -------------- */

    /**
     * Returns a new CompletableFuture that is completed when all of
     * the given CompletableFutures complete.  If any of the given
     * CompletableFutures complete exceptionally, then the returned
     * CompletableFuture also does so, with a CompletionException
     * holding this exception as its cause.  Otherwise, the results,
     * if any, of the given CompletableFutures are not reflected in
     * the returned CompletableFuture, but may be obtained by
     * inspecting them individually. If no CompletableFutures are
     * provided, returns a CompletableFuture completed with the value
     * {@code null}.
     *
     * <p>Among the applications of this method is to await completion
     * of a set of independent CompletableFutures before continuing a
     * program, as in: {@code CompletableFuture.allOf(c1, c2,
     * c3).join();}.
     *
     * @param cfs the CompletableFutures
     * @return a new CompletableFuture that is completed when all of the
     * given CompletableFutures complete
     * @throws NullPointerException if the array or any of its elements are
     * {@code null}
     */
    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }

    /** Recursively constructs a tree of completions. */
    static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
                                           int lo, int hi) {
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (lo > hi) // empty
            d.result = NIL;
        else {
            CompletableFuture<?> a, b;
            int mid = (lo + hi) >>> 1;
            if ((a = (lo == mid ? cfs[lo] :
                      andTree(cfs, lo, mid))) == null ||
                (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                      andTree(cfs, mid+1, hi)))  == null)
                throw new NullPointerException();
            if (!d.biRelay(a, b)) {
                BiRelay<?,?> c = new BiRelay<>(d, a, b);
                a.bipush(b, c);
                c.tryFire(SYNC);
            }
        }
        return d;
    }

    /** Pushes completion to this and b unless both done. */
    final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {
        if (c != null) {
            Object r;
            while ((r = result) == null && !tryPushStack(c))
                lazySetNext(c, null); // clear on failure
            if (b != null && b != this && b.result == null) {
                Completion q = (r != null) ? c : new CoCompletion(c);
                while (b.result == null && !b.tryPushStack(q))
                    lazySetNext(q, null); // clear on failure
            }
        }
    }   

    /** Returns true if successfully pushed c onto stack. */
    final boolean tryPushStack(Completion c) {
        Completion h = stack;
        lazySetNext(c, h);
        return UNSAFE.compareAndSwapObject(this, STACK, h, c);
    }   
	
	boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
        Object r, s; Throwable x;
        if (a == null || (r = a.result) == null ||
            b == null || (s = b.result) == null)
            return false;
        if (result == null) {
            if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
                completeThrowable(x, r);
            else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null)
                completeThrowable(x, s);
            else
                completeNull();
        }
        return true;
    }

从源码上看是,是将整个CompletableFuture数组通过andTree()方法划分成了一颗二叉树,这个二叉树的叶子节点是传入的CompletableFuture对象,非叶子节点代表了它的子节点CompletableFuture的完成情况。

然后检测根节点的CompletableFuture的两个子节点是否完成。

在这里插入图片描述

cfs1、cfs2、cfs3、cfs4 是allOf的入参,四个CompletableFuture对象。

代码中通过a.bipush(b, c) 将 a、b串在一起。因为涉及到UNSAFE方法,不知道方法具体执行了什么操作。所以只能通过IDEA里内存里实际的值,去由结果推过程。

a.bipush(b,c) 前,内存各个变量实际值。

在这里插入图片描述

a.bipush(b,c) 后,内存各个变量实际值。

在这里插入图片描述

tryPushStack(Completion c) 方法前
在这里插入图片描述

tryPushStack(Completion c) 方法后 可以看到内存中 变量b 对应的内存地址为 75bd9247的 stack被赋值了成为了Completion c。

tryFire(int mode)方法执行前

在这里插入图片描述

可以看到 cfs 除了 cfs1 之外,其他的 cfs 中的 stack都被赋值了。通过观察IDEA中内存中对象实际值,可以发现stack中 的 src 是 自己的树上的兄弟节点, snd 是自己。

在这里插入图片描述

CompletableFuture 多个操作执行顺序控制

CompletableFuture 一个节点要开始执行的前提是他的子节点全部执行完毕之后,才能触发自己节点上的操作。

当调用CompletableFuture 异步执行方法 supplyAsync 会传递一个 Supplier 对象作为入参。这个Supplier 会被封装成为 一个Runnable 子类 AsyncSupply 对象,作为其抽象方法 run 中 执行的一部分。


        CompletableFuture<String> c2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(second * 20);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "2";
        });

---------------------------------------------------------------------------------
    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the {@link ForkJoinPool#commonPool()} with
     * the value obtained by calling the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param <U> the function's return type
     * @return the new CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

    static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                     Supplier<U> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<U> d = new CompletableFuture<U>();
        e.execute(new AsyncSupply<U>(d, f));
        return d;
    }

-----------------------------------------------------------------------------------
        public void run() {
           // fn 就是 CompletableFuture.supplyAsync 传入的 Supplier 
            CompletableFuture<T> d; Supplier<T> f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                if (d.result == null) {
                    try {
						// 将 Supplier 处理结果赋值给 CompletableFuture 的 result
                        d.completeValue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                } 
				// Pops and tries to trigger all reachable dependents. Call only when known to be done.
                d.postComplete();
            }
        }

从源码中可以看到,当执行了CompletableFuture.supplyAsync()他的通知机制封装在实现Runnable抽象方法run里。当你传入的Supplier 有结果返回之后,会调用 CompletableFuture 中的 postComplete() 方法,通知 stack中其他可达的 从属 Completion,让他们各自完成自己的 action。


    /**
     * Pops and tries to trigger all reachable dependents.  Call only
     * when known to be done.
     */
    final void postComplete() {
        /*
         * On each step, variable f holds current dependents to pop
         * and run.  It is extended along only one path at a time,
         * pushing others to avoid unbounded recursion.
         */
        CompletableFuture<?> f = this; Completion h;
        while ((h = f.stack) != null ||
               (f != this && (h = (f = this).stack) != null)) {
            CompletableFuture<?> d; Completion t;
            if (f.casStack(h, t = h.next)) {
                if (t != null) {
                    if (f != this) {
                        pushStack(h);
                        continue;
                    }
                    h.next = null;    // detach
                }
				// 将 下一个需要执行的 Completion 弹出来后 执行 tryFire
                f = (d = h.tryFire(NESTED)) == null ? this : d;
            }
        }
    }

    static final class UniApply<T,V> extends UniCompletion<T,V> {
        Function<? super T,? extends V> fn;
        UniApply(Executor executor, CompletableFuture<V> dep,
                 CompletableFuture<T> src,
                 Function<? super T,? extends V> fn) {
            super(executor, dep, src); this.fn = fn;
        }
        final CompletableFuture<V> tryFire(int mode) {
            CompletableFuture<V> d; CompletableFuture<T> a;
            if ((d = dep) == null ||
			    // uniApply 对封装的 Supplier 进行执行
                !d.uniApply(a = src, fn, mode > 0 ? null : this))
                return null;
            dep = null; src = null; fn = null;
            return d.postFire(a, mode);
        }
    }  

    final <S> boolean uniApply(CompletableFuture<S> a,
                               Function<? super S,? extends T> f,
                               UniApply<S,T> c) {
        Object r; Throwable x;
        if (a == null || (r = a.result) == null || f == null)
            return false;
        tryComplete: if (result == null) {
            if (r instanceof AltResult) {
                if ((x = ((AltResult)r).ex) != null) {
                    completeThrowable(x, r);
                    break tryComplete;
                }
                r = null;
            }
            try {
                if (c != null && !c.claim())
                    return false;
                @SuppressWarnings("unchecked") S s = (S) r;
				// 这里实际执行 CompletableFuture 的 Supplier
                completeValue(f.apply(s));
            } catch (Throwable ex) {
                completeThrowable(ex);
            }
        }
        return true;
    }

从Idea里的 栈帧中可以看出来,是由 CompletableFuture 1 执行完后的 postComplete 引发了接下来的CompletableFuture

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/384938.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

react renderProps学习记录

react renderProps学习记录1.引入2.改一下呢3.再改一下呢4.总结一下如何向组件内部动态传入带内容的结构(标签)?children propsrender props1.引入 上代码&#xff1a; import React, { Component } from react import ./index.css export default class Parent extends Com…

【云原生】K8S调度约束

一、调度约束 Kubernetes 是通过 List-Watch&#xff08;监控&#xff09; 的机制进行每个组件的协作&#xff0c;保持数据同步的&#xff0c;每个组件之间的设计实现了解耦。 用户是通过 kubectl 根据配置文件&#xff0c;向 APIServer 发送命令&#xff0c;在 Node 节点上面…

ANR系列(二)——ANR监听方案之SyncBarrier

前言 在项目中经常遇到了手机假死问题&#xff0c;无规律的偶现问题&#xff0c;大量频繁随机操作后&#xff0c;便会出现假死&#xff0c;整个应用无法操作&#xff0c;不会响应事件&#xff0c;会发生各种奇怪的ANR&#xff0c;且trace不固定。而SyncBarrier是其中的罪魁祸首…

RabbitMq(具体怎么用,看这一篇即可)

RabbitMq汇总1.RabbitMq的传统实现方式2.SpringAMQP简化RabbitMq开发2.1 基本消息队列&#xff08;BasicQueue&#xff09;2.2 工作消息队列&#xff08;WorkQueue&#xff09;2.3 发布订阅 -- 广播&#xff08;Fanout&#xff09;2.4 发布订阅 -- 路由&#xff08;Direct&…

2024级浙江大学MBA提面申请流程参考

近年来浙大MBA项目的招生一直都有提前批面试的环节&#xff0c;而且每年在申请政策方面也会做出一些微调&#xff0c;但大的方面不会做调整&#xff0c;2024年MBA提面申请即将开始&#xff0c;对此杭州达立易考教育结合2023年的情况为大家梳理出来基本的申请流程和批次参考&…

【Spring Boot】Spring Boot以Repository方式整合Redis

1 简介 Redis是高性能的NoSQL数据库&#xff0c;经常作为缓存流行于各大互联网架构中。本文将介绍如何在Springboot中整合Spring Data Redis&#xff0c;使用Repository的方式操作。 代码结构如下&#xff1a; 2 整合过程 2.1 安装Redis数据库 为了节省时间&#xff0c;就直…

为什么很多计算机专业大学生毕业后还会参加培训?

基于IT互联网行业越来越卷的现状&#xff0c;就算是科班出身&#xff0c;很多也是达不到用人单位的要求。面对这样的现实情况&#xff0c;有的同学会选择继续深造&#xff0c;比如考个研&#xff0c;去年考研人数457万人次&#xff0c;可见越来越的同学是倾向考研提升学历来达到…

管道。环境变量和常用命令

文章目录管道与文件重定向的区别举例环境变量使用自己编写的程序像命令符一样常用命令grepagwctree . -acutsortmorehistory管道 管道类似于重定向&#xff0c;但是又不太一样&#xff0c;管道可以连接好几个&#xff0c;把第一个的输出当成第二个的输入&#xff0c;第二个的输…

实践Spring5 响应式编程框架WebFlux

WebFlux 以 Reactor 库为基础, 基于异步和事件驱动&#xff0c;可以让我们在不扩充硬件资源的前提下&#xff0c;提升系统的吞吐量和伸缩性。一、什么是 Spring WebFlux了解 WebFlux ,首先了解下什么是 Reactive Streams。Reactive Streams 是 JVM 中面向流的库标准和规范&…

论文推荐:ScoreGrad,基于能量模型的时间序列预测

能量模型&#xff08;Energy-based model&#xff09;是一种以自监督方式执行的生成式模型&#xff0c;近年来受到了很多关注。本文将介绍ScoreGrad&#xff1a;基于连续能量生成模型的多变量概率时间序列预测。如果你对时间序列预测感兴趣&#xff0c;推荐继续阅读本文。 为什…

Qt实现系统桌面目录下文件搜索的GUI:功能一:文件查找与现实

⭐️我叫恒心&#xff0c;一名喜欢书写博客的研究生在读生。 原创不易~转载麻烦注明出处&#xff0c;并告知作者&#xff0c;谢谢&#xff01;&#xff01;&#xff01; 这是一篇近期会不断更新的博客欧~~~ 有什么问题的小伙伴 欢迎留言提问欧。 功能点一&#xff1a;文件查找与…

《MongoDB入门教程》第27篇 创建索引

本文将会介绍 MongoDB 中的索引概念&#xff0c;以及如何利用 createIndex() 方法创建索引。 索引简介 假设存在一本包含介绍各种电影的图书。 如果想要查找一部名为“Pirates of Silicon Valley”的电影&#xff0c;我们需要翻阅每一页&#xff0c;直到发现该电影的介绍为止…

从ChatGPT的技术发展角度解析未来智能化的发展方向

ChatGPT 由人工智能研究实验室 OpenAI 于 2022年11 月 30 日推出。在推出时就带来不小的震动&#xff0c;但真正点燃全民热潮的是&#xff0c;应该是从今年的二月初算起&#xff0c;是全网全平台涵盖各行业领域的舆论盛况。 本文就以ChatGPT为切入点&#xff0c;从技术发展角度…

MQTT协议分析

目录 一、前言 二、MQTT协议概述 概念 基本原理 MQTT协议的结构 MQTT的QoS机制 QoS 0&#xff1a;最多一次传输 QoS 1&#xff1a;至少一次传输 QoS 2&#xff1a;恰好一次传输 三、MQTT的应用场景 四、MQTT的优点和缺点 五、MQTT协议的实现 六、实战体验MQTT …

自己动手写编译器:DFA状态机最小化算法

上一节我们完成了从NFA到DFA的状态机转换&#xff0c;有个问题是状态机并非处于最有状态&#xff1a; 在上图的状态机中&#xff0c;状态6和7其实可以合成一个状态点&#xff0c;本节我们看看如何将这类节点进行合并&#xff0c;使得状态机处于最精简状态(状态4也是终结点&…

KDZD土壤电阻率测试仪

一、简介 KDZD土壤电阻率测试仪专为现场测量接地电阻、土壤电阻率、接地电压、交流电压而精心设计制造的&#xff0c;采用新数字及微处理技术&#xff0c;精密4线法、3线法和简易2线法测量接地电阻&#xff0c;导入FFT(快速傅立叶变换)技术、AFC(自动频率控制)技术&#xff0c;…

基于树莓派4B设计的音视频播放器(从0开始)

一、前言 【1】功能总结 选择树莓派设计一款家庭影院系统,可以播放本地视频、网络视频直播、游戏直播、娱乐直播、本地音乐、网络音乐,当做FM网络收音机。 软件采用Qt设计、播放器引擎采用ffmpeg。 当前的硬件选择的是树莓派4B,烧写官方系统,完成最终的开发。 本篇文章主…

Qt QtCreator 安卓开发环境搭建

踩坑 我的qt是使用在线安装工具安装的&#xff0c;Qt版本使用的是5.15.2&#xff0c;QtCreator版本9.0.2 在网上很多教程都是如下步骤 1.安装qt 2.安装jdk 3.安装android-sdk 4.安装android-ndk 5.配置android设置 例如&#xff1a; https://blog.csdn.net/weixin_51363326/…

【p2p】专利:P2p网络中数据传输的方法、电子设备、装置、网络架构

基于混合CDN的低延时直播P2P技术实践 huya 大佬们的讲座。 而且发表了很多专利: 2018年的,点击直接阅读。 本文是学习笔记 本申请公开了P2P网络中数据传输的方法、电子设备、装置、网络架构,该方法包括步骤:接收服务器发送的数据包,所述数据包由共享资源拆分而成,并由服务…

金山轻维表项目进展自动通知

项目经理作为项目全局把控者&#xff0c;经常要和时间“赛跑”。需要实时了解到目前进展如何&#xff0c;跟进人是那些&#xff1f;哪些事项还未完成&#xff1f;项目整体会不会逾期&#xff1f;特别是在一些大型公司中&#xff0c;优秀的项目经理已经学会使用金山轻维表做项目…