异步编程 - 06 基于JDK中的Future实现异步编程(中)_CompletableFuture源码解析

news2025/1/11 0:23:12

文章目录

  • CompletableFuture 类图结构
  • CompletionStage接口
  • 属性
    • result
    • stack
    • asyncPool
  • 方法
    • `CompletableFuture<Void>runAsync(Runnable runnable)`
    • `CompletableFuture<U> supplyAsync(Supplier<U>supplier)`
    • `CompletableFuture<U> supplyAsync(Supplier<U>supplier,Executor executor)`

在这里插入图片描述


CompletableFuture 类图结构

在这里插入图片描述

CompletionStage接口

CompletableFuture实现了CompletionStage接口 。

  • 1)一个CompletionStage代表着一个异步计算节点,当另外一个CompletionStage计算节点完成后,当前CompletionStage会执行或者计算一个值;一个节点在计算终止时完成,可能反过来触发其他依赖其结果的节点开始计算。

  • 2)一个节点(CompletionStage)的计算执行可以被表述为一个函数、消费者、可执行的Runable(例如使用apply、accept、run方法),

具体取决于这个节点是否需要参数或者产生结果。例如:

stage.thenApply(x -> square(x))//计算平方和
     .thenAccept(x -> System.out.print(x))//输出计算结果
     .thenRun(() -> System.out.println());//然后执行异步任务
  • 3)CompletionStage节点可以使用3种模式来执行:默认执行、默认异步执行(使用async后缀的方法)和用户自定义的线程执行器执行(通过传递一个Executor方式)。

  • 4)一个节点的执行可以通过一到两个节点的执行完成来触发。一个节点依赖的其他节点通常使用then前缀的方法来进行组织。

属性

result

volatile Object result;       // Either the result or boxed AltResult

result字段用来存放任务执行的结果,如果不为null,则标识任务已经执行完成。而计算任务本身也可能需要返回null值,所以使用AltResult(如下代码)来包装计算任务返回null的情况(ex等于null的时候),AltResult也被用来存放当任务执行出现异常时候的异常信息(ex不为null的时候):

static final class AltResult { // See above
    final Throwable ex;        // null only for NIL
    AltResult(Throwable x) { this.ex = x; }
}

stack

 volatile Completion stack;    // Top of Treiber stack of dependent actions

stack字段是当前任务执行完毕后要触发的一系列行为的入口,由于一个任务执行后可以触发多个行为,所以所有行为被组织成一个链表结构,并且使用Treiber stack实现了无锁基于CAS的链式栈,其中stack存放栈顶行为节点,stack是Completion类型的,定义如下所示。

 abstract static class Completion extends ForkJoinTask<Void>
    implements Runnable, AsynchronousCompletionTask {
    volatile Completion next;      // Treiber stack下一个节点
  ...
}

asyncPool

/**
 * Default executor -- ForkJoinPool.commonPool() unless it cannot
 * support parallelism.
 */
private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

asyncPool是用来执行异步任务的线程池,如果支持并发则默认为Fork-JoinPool.commonPool(),否则是ThreadPerTaskExecutor


方法

CompletableFuture<Void>runAsync(Runnable runnable)

该方法返回一个新的CompletableFuture对象,其结果值会在给定的runnable行为使用ForkJoinPool.commonPool()异步执行完毕后被设置为null,代码如下所示。

public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}

如上代码中,默认情况下asyncPool为ForkJoinPool.commonPool(),其中asyncRunStage代码如下所示。

static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
    //1.任务或者行为为null,则抛出NPE异常
    if (f == null) throw new NullPointerException();
    //2.创建一个future对象
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    //3.包装f和d为异步任务后,投递到线程池执行
    e.execute(new AsyncRun(d, f));
    //4.返回创建的future对象
    return d;
}
  • 代码1判断行为是否为null,如果是则抛出异常。

  • 代码2创建一个CompletableFuture对象。

  • 代码3首先创建一个AsyncRun任务,里面保存了创建的future对象和要执行的行为,然后投递到ForkJoinPool.commonPool()线程池执行。

  • 代码4直接返回创建的CompletableFuture对象。

可知runAsync方法会马上返回一个CompletableFuture对象,并且当前线程不会被阻塞;代码3投递AsyncRun任务到线程池后,线程池线程会执行其run方法。

下面我们看看在AsyncRun中是如何执行我们设置的行为,并把结果设置到创建的future对象中的。

static final class AsyncRun extends ForkJoinTask<Void>
       implements Runnable, AsynchronousCompletionTask {
        CompletableFuture<Void> dep; Runnable fn;
        //保存创建的future和要执行的行为
        AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
            this.dep = dep; this.fn = fn;
        }
       ...
        public void run() {
            CompletableFuture<Void> d; Runnable f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                //5.如果future的result等于null,说明任务还没完成
                if (d.result == null) {
                    try {
                        //5.1执行传递的行为
                        f.run();
                        //5.2设置future的结果为null
                        d.completeNull();
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                //6弹出当前future中依赖当前结果的行为并执行
                d.postComplete();
            }
        }
}
  • 这里代码5如果发现future的result不为null,说明当前future还没开始执行,则代码5.1执行我们传递的runnable方法,然后执行代码5.2将future对象的结果设置为null,这时候其他因调用future的get()方法而被阻塞的线程就会从get()处返回null。

  • 当代码6的future任务结束后,看看其stack栈里面是否有依赖其结果的行为,如果有则从栈中弹出来,并执行。

其实上面代码中的runAsync实现可以用我们自己编写的简单代码来模拟。

public static CompletableFuture runAsync(Runnable runnable) {
    CompletableFuture<String> future = new CompletableFuture<String>();

    // 2.开启线程计算任务结果,并设置
    POOL_EXECUTOR.execute(() -> {

        // 2.1模拟任务计算
        try {
            runnable.run();
            future.complete(null);

        } catch (Exception e) {
            future.completeExceptionally(e);
        }
    });

    return future;
}

CompletableFuture<U> supplyAsync(Supplier<U>supplier)

该方法返回一个新的CompletableFuture对象,其结果值为入参supplier行为执行的结果,代码如下所示。

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                             Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<U> d = new CompletableFuture<U>();
    e.execute(new AsyncSupply<U>(d, f));
    return d;
}

如上代码与runAsync类似,不同点在于,其提交到线程池的是AsyncSupply类型的任务,下面我们来看其代码。

static final class AsyncSupply<T> extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
    CompletableFuture<T> dep; Supplier<T> fn;
    AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
        this.dep = dep; this.fn = fn;
    }

   ...

    public void run() {
        CompletableFuture<T> d; Supplier<T> f;
        if ((d = dep) != null && (f = fn) != null) {
            dep = null; fn = null;
            //1.如果future的result等于null,说明任务还没完成
            if (d.result == null) {
                try {
                    //1.1 f.get()执行行为f的方法,并获取结果
                    //1.2 把f.get()执行结果设置到future对象
                    d.completeValue(f.get());
                } catch (Throwable ex) {
                    d.completeThrowable(ex);
                }
            }
            //2.弹出当前future中依赖当前结果的行为并执行

            d.postComplete();
        }
    }
}

如上代码与runAsync的不同点在于,这里的行为方法是Supplier,其get()方法有返回值,且返回值会被设置到future中,然后调用future的get()方法的线程就会获取到该值。


CompletableFuture<U> supplyAsync(Supplier<U>supplier,Executor executor)

该方法返回一个新的CompletableFuture对象,其结果值为入参supplier行为执行的结果,需要注意的是,supplier行为的执行不再是ForkJoinPool.commonPool(),而是业务自己传递的executor,其代码如下所示。

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }
static Executor screenExecutor(Executor e) {
    //如果使用commonpool并且传递的e本身就是commonpool
    if (!useCommonPool && e == ForkJoinPool.commonPool())
        return asyncPool;
    //如果传递的线程池为null,则抛出NPE异常
    if (e == null) throw new NullPointerException();

    //返回业务传递的线程池e
    return e;
}

如上代码通过使用screenExecutor方法来判断传入的线程池是否是一个可用的线程池,如果不是则抛出异常。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/984677.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【java】【SSM框架系列】【一】Spring

目录 一、简介 1.1 为什么学 1.2 学什么 1.3 怎么学 1.4 初识Spring 1.5 Spring发展史 1.6 Spring Framework系统架构图 1.7 Spring Framework学习线路 二、核心概念&#xff08;IoC/DI&#xff0c;IoC容器&#xff0c;Bean&#xff09; 2.1 概念 2.2 IoC入门案例 …

TVC广告片制作成本多少

电视是广告传播的主要媒介之一&#xff0c;具有广泛的受众群体和较高的覆盖率。通过在电视上播放广告片&#xff0c;企业可以将产品或者服务的信息传达给大量潜在客户&#xff0c;提高知名度和曝光度。接下来由深圳TVC广告片制作公司老友记小编从以下几个方面浅析制作一条TVC广…

SAP PI 配置SSL链接接口报错问题处理Peer certificate rejected by ChainVerifier

出现这种情况一般无非是没有正确导入证书或者证书过期的情况 第一种&#xff0c;如果没有导入证书的话&#xff0c;需要在NWA中的证书与验证-》CAs中导入管理员提供的证书&#xff0c;这里需要注意的是&#xff0c;需要导入完整的证书链。 第二种如果是证书过期的&#xff0c…

Java + Selenium + Appium自动化测试

一、启动测试机或者Android模拟器&#xff08;Genymotion俗称世界上最快的模拟器&#xff0c;可自行百度安装&#xff09; 二、启动Appium&#xff08;Appium环境安装可自行百度&#xff09; 三、安装应用到Genymotion上&#xff0c;如下图我安装一个计算机的小应用&#xff…

vit模型

AN IMAGE IS WORTH 16X16 WORDS:TRANSFORMERS FOR IMAGE RECOGNITION AT SCALE 1、问题2、模型结构 1、问题 在视觉方面&#xff0c;注意力要么与卷积网络结合使用&#xff0c;要么用于替代卷积网络的某些组件&#xff0c;同时保持其整体结构不变。 我们证明了这种对CNNs的依赖…

【C#】关于Array.Copy 和 GC

关于Array.Copy 和 GC //一个简单的 数组copy 什么情况下会触发GC呢[ReliabilityContract(Consistency.MayCorruptInstance, Cer.MayFail)]public static void Copy(Array sourceArray,long sourceIndex,Array destinationArray,long destinationIndex,long length);当源和目…

微服务·架构组件之网关- Spring Cloud Gateway

微服务架构组件之网关- Spring Cloud Gateway 引言 微服务架构已成为构建现代化应用程序的关键范式之一&#xff0c;它将应用程序拆分成多个小型、可独立部署的服务。Spring Cloud Gateway是Spring Cloud生态系统中的一个关键组件&#xff0c;用于构建和管理微服务架构中的网…

芯片方案应用于终端产品时需要哪些技术支持和保障?

在芯片方案应用于终端产品时&#xff0c;客户可能会遇到三大类问题&#xff1a;一是芯片本身的质量缺陷&#xff1b;二是芯片与终端系统软硬件联合调试及验证&#xff1b;三是终端生产。 接下来&#xff0c;小编简短介绍启英泰伦是如何全方位支持客户项目&#xff0c;保障客户…

mac m1 代码调用 Stable Diffusion

from diffusers import DiffusionPipeline import torchpipe DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5") pipe pipe.to("mps") pipe.enable_attention_slicing()prompt "便利店开业" _ pipe(prompt, num_inferen…

分享一个基于SpringBoot+Vue的房屋在线装修预约系统源码

&#x1f495;&#x1f495;作者&#xff1a;计算机源码社 &#x1f495;&#x1f495;个人简介&#xff1a;本人七年开发经验&#xff0c;擅长Java、Python、PHP、.NET、微信小程序、爬虫、大数据等&#xff0c;大家有这一块的问题可以一起交流&#xff01; &#x1f495;&…

python读取.txt文件中某些关键字后面的内容 并根据该数据画图

感谢一下悦姐帮忙 import re#先把文件读进来&#xff0c;用read读入的是字符串&#xff0c;readlines是list with open(resok.txt) as f:txt f.read()dataset r5low:.*|5mix:.*|5normal:.* para rMAE: (.{6})#意思是MAE&#xff1a; 后面的六个东西 row_data re.findall(d…

6个免费图片素材库,高清无水印、无版权

推荐6个免费高清图片素材库&#xff0c;商用也可以&#xff0c;无需担心版权问题&#xff0c;收藏走一波~ 1、菜鸟图库 https://www.sucai999.com/pic.html?vNTYwNDUx 网站主要为新手设计师提供免费素材&#xff0c;这些素材的质量都很高&#xff0c;类别也很多&#xff0c;…

每日一练 | 网络工程师软考真题Day31

阅读以下说明&#xff0c;答复以下【问题1】至【问题7】 【说明】 某网络拓扑结构如图3-1所示。网络A中的DNS_Server1和网络B中的DNS_Server2分别安装有Windows Server 2003并启用了DNS效劳。DNS_Server1中安装有IIS6.0&#xff0c;建立了一个域名为 abc 的Web站点。 图3-1 【…

pytorch再次学习

目录 数据可视化切换设备device定义类打印每层的参数大小自动微分计算梯度禁用梯度追踪优化模型参数 模型保存模型加载 数据可视化 import torch from torch.utils.data import Dataset from torchvision import datasets from torchvision.transforms import ToTensor import…

Nginx中实现自签名SSL证书生成与配置

文章目录 一.相关介绍1.生成步骤2.相关名词介绍 二.Nginx中实现自签名SSL证书生成与配置1.私钥生成2.公钥生成3.生成解密的私钥key4.签名生成证书5.配置证书并验证6.登录 一.相关介绍 1.生成步骤 &#xff08;1&#xff09;生成私钥&#xff08;Private Key&#xff09;&…

elementUI——el-table自带排序使用问题

问题 排序表格默认第一列按降序排&#xff08;状态1&#xff09;&#xff0c;当点击其他列后&#xff08;状态2&#xff09;&#xff0c;改变日期&#xff0c;触发表格数据更新&#xff0c;发现列的排序还点亮在之前的操作上&#xff0c;没有按照默认来&#xff08;回到状态1&a…

运筹系列85:求解大规模tsp问题的julia代码

1. 大规模tsp问题的挑战 数学模型和精确解法见《运筹系列65&#xff1a;TSP问题的精确求解法概述》和《运筹系列80:使用Julia精确求解tsp问题》&#xff1a; variable(m, x[1:n,1:n], Bin,Symmetric) # 0-1约束 objective(model, Min, sum(x.*distmat)/2) constraint(model, …

Linux——线程详解(一)

索引 初识线程1.inux下的线程2.再谈进程3.理解页表4. 再次理解虚拟到物理的转化 线程的控制1.线程的创建2.线程异常3.验证pthread_join 的第二个参数4.线程的退出方式5. 线程的公有和私有6.pthread_t 与线程独立栈7.线程的局部性存储8.线程分离 初识线程 1.inux下的线程 之前了…

通过RTSP协议接入RTSP流媒体服务器EasyNVR视频监控汇聚平台的设备显示离线是什么原因?

EasyNVR安防视频云服务是基于RTSP/Onvif协议接入的视频平台&#xff0c;可支持将接入的视频流进行全平台、全终端的分发&#xff0c;分发的视频流包括RTSP、RTMP、HTTP-FLV、WS-FLV、HLS、WebRTC等。平台丰富灵活的视频能力&#xff0c;可应用在智慧校园、智慧工厂、智慧水利等…

028:vue上传解析excel文件,列表中输出内容

第028个 查看专栏目录: VUE ------ element UI 专栏目标 在vue和element UI联合技术栈的操控下&#xff0c;本专栏提供行之有效的源代码示例和信息点介绍&#xff0c;做到灵活运用。 &#xff08;1&#xff09;提供vue2的一些基本操作&#xff1a;安装、引用&#xff0c;模板使…