CompletableFuture的基本使用和原理

news2024/9/30 13:20:49

CompletableFuture

CompletableFuture是对Future的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富的扩展,完美弥补了Future的局限性,同时CompletableFuture实现了对任务编排的能力。借助这项能力,可以轻松地组织不同任务的运行顺序、规则以及方式。

CompletableFuture实现了Future和CompletionStage两个接口在这里插入图片描述

  • 通过Future同步等待执行结果
  • CompletionStage,增强异步回调的功能。

将CompletableFuture当作简单的Future来使用

可以用一个无参数构造函数创建这个类的实例来表示Future的结果,将它分发给使用者,并在将来的某个时候使用complete方法完成它。使用者可以使用get方法阻塞当前线程,直到获取返回结果。

public Future<String> calculateAsync() throws InterruptedException {
    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.complete("Hello");
        return null;
    });

    return completableFuture;
}

CompletableFuture构建方法

构建一个CompletableFuture有以下四种办法

  • supplyAsync(runnable) 异步执行一个任务,提供返回值
  • supplyAsync(runnable,Executor executor) 提供返回值
  • runAsync(runnable,Executor executor) -> 通过自定义线程池异步执行一个任务,没有返回值
  • runAsync(runnable) -> 异步执行一个任务, 默认用ForkJoinPool.commonPool(), 没有返回值

注意在没有返回值的情形下,CompletableFuture也还是提供了get方法来阻塞获取执行结果,只是最后返回的结果为null

CompletionStage

CompletionStage定义了很多方法,大致可以分为以下几类

纯消费类型的方法

纯消费类型的方法,指依赖上一个异步任务的结果作为当前函数的参数进行下一步计算,它的特点是不返回新的计算值,这类的方法都包含 Accept 这个关键字
在CompletionStage中包含9个Accept关键字的方法,这9个方法又可以分为三类:

  • 依赖单个CompletionStage任务完成,
  • 依赖两个CompletionStage任务都完成
  • 依赖两个CompletionStage中的任何一个完成
//当前线程同步执行
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
//使用ForkJoinPool.commonPool线程池执行action
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
//使用自定义线程池执行action
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T>
action,Executor executor);
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U>
other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<?
extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<?
extends U> other,BiConsumer<? super T, ? super U> action,Executor executor);
public CompletionStage<Void> acceptEither(CompletionStage<? extends T>
other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T>
other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T>
other,Consumer<? super T> action,Executor executor);

有返回值类型的方法

有返回值类型的方法,就是用上一个异步任务的执行结果进行下一步计算,并且会产生一个新的有返回值的CompletionStage对象。

在CompletionStage中,定义了9个带有返回结果的方法,也可以根据依赖几个CompletionStage任务的完成来分成三类

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U>
fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U>
fn,Executor executor);
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U>
other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends
U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends
U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T>
other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends
T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends
T> other,Function<? super T, U> fn,Executor executor);

不消费也不返回的方法

该方法的执行,带run关键字,下一步的执行不依赖上一步的执行结果,也不返回结果,只是有执行的先后顺序

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor
executor);
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable
action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?>
other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?>
other,Runnable action,Executor executor);
public CompletionStage<Void> runAfterEither(CompletionStage<?>
other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?>
other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?>
other,Runnable action,Executor executor);

多任务组合

public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends
CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends
CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends
CompletionStage<U>> fn,Executor executor)

并行执行

  • allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture
  • anyOf():当任何一个给定的CompletablFuture完成时,返回一个新的CompletableFuture

结果/异常处理

  • whenComplete
    whenComplete表示当任务执行完成后,会触发的方法,它的特点是,不论前置的
    CompletionStage任务是正常执行结束还是出现异常,都能够触发特定的 action 方法

  • handle
    handle表示前置任务执行完成后,不管前置任务执行状态是正常还是异常,都会执行handle中的
    fn 函数,它和whenComplete的作用几乎一致,不同点在于,handle是一个有返回值类型的方
    法。

  • exceptionally
    exceptionally接受一个 fn 函数,当上一个CompletionStage出现异常时,会把该异常作为参数传
    递到 fn 函数

       CompletableFuture.runAsync(()-> {
//            int i=1/0;
            System.out.println("执行某些操作");
        }).whenComplete((r, e) -> {
            if (e != null) {
                System.out.println("执行过程出现异常...");
            } else {
                System.out.println("任务执行完成");
            }
        });
    }

thenCompose和thenApply的异同

thenApply和thenCompose都是对一个CompletableFuture返回的结果进行后续操作,返回一个新的CompletableFuture。

对于thenApplyfn函数是一个对一个已完成的stage或者说CompletableFuture的返回值进行计算、操作;
对于thenComposefn函数是对另一个CompletableFuture进行计算、操作

        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> 100).thenApply(num -> num + " to String");
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> 100).thenCompose(num -> CompletableFuture.supplyAsync(() -> num + " to String"));

        System.out.println(f1.join()); // 100 to String
        System.out.println(f2.join()); // 100 to String

上面thenApplythenCompose都是将一个CompletableFuture<Integer>转换为CompletableFuture<String>。不同的是,thenApply中的传入函数的返回值是String,而thenCompose的传入函数的返回值是CompletableFuture<String>。就好像stream中学到的mapflatMap。回想我们做过的二维数组转一维数组,使用stream().flatMap映射时,我们是把流中的每个数据(数组)又展开为了流。

CompletableFuture原理介绍

以下述代码为例,简单了解下CompletableFuture的实现原理

 public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello";
        }).thenAccept(e -> {
            System.out.println("执行结果为" + e);
        });
        f.get();
    }

先看一下CompletableFuture里定义了哪些重要的变量

 //CompletableFuture的结果值或者是一个异常的包装对象AltResult
 volatile Object result;       
 // 依赖操作栈的栈顶
 volatile Completion stack;    // Top of Treiber stack of dependent actions

然后看下我的例子里调用的supplyAsync方法

supplyAsync

会将我们的Supplier参数封装成AsyncSupply对象,然后交给线程池执行,
AsyncSupply有两个参数,一个是源码里创建的CompletableFuture对象,一个是用户定义的Supplier参数

 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        //asyncPool是一个全局的ForkJoinPool.commonPool线程池
        return asyncSupplyStage(asyncPool, supplier);
    }
    
  static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                     Supplier<U> f) {
        if (f == null) throw new NullPointerException();
        //创建一个新的CompletableFuture并返回(1)
        CompletableFuture<U> d = new CompletableFuture<U>();
        e.execute(new AsyncSupply<U>(d, f));
        return d;
    }
 public void run() {
            CompletableFuture<T> d; Supplier<T> f;
            //如果dep和fn不为空
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                //如果CompletableFuture的result为空(表示当前任务还没执行完),则等待直接完成后执行postComplete
                if (d.result == null) {
                    try {
                        //通过get()方法获取返回结果并设置给result
                        d.completeValue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                //在执行完自己的方法获取到返回值之后,会执行所有依赖此任务的其他任务,这些任务存储在一个无锁并发栈里
                d.postComplete();
            }
        }

thenAccept

我们先来看下thenAccept的实现

 private CompletableFuture<Void> uniAcceptStage(Executor e,
                                                   Consumer<? super T> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();      
        //这里的this就是前面supplyAsync方法里创建的CompletableFuture
        //如果为异步任务,则将任务压栈后直接返回,因为源任务结束后会触发异步线程执行对应逻辑
        //如果为同步任务(e==null)会调用d.uniAccept方法 这个方法的逻辑:如果源任务完成,则直接调用f并返回true,否则进入下面的if代码块
        if (e != null || !d.uniAccept(this, f, null)) {
            //封装一个UniAccept对象,并压入到栈中
            UniAccept<T> c = new UniAccept<T>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }
    
    /** Pushes the given completion (if it exists) unless done. */
    final void push(UniCompletion<?,?> c) {
        if (c != null) {
            while (result == null && !tryPushStack(c))
                lazySetNext(c, null); // clear on failure
        }
    }
    
 final CompletableFuture<Void> tryFire(int mode) {
    CompletableFuture<Void> d; CompletableFuture<T> a;
    if ((d = dep) == null ||
        //如果是异步调用(mode>0),传入null。否则传入this
        !d.uniAccept(a = src, fn, mode > 0 ? null : this))
        return null;
    dep = null; src = null; fn = null;
    return d.postFire(a, mode);
}

 final <S> boolean uniAccept(CompletableFuture<S> a,
                                Consumer<? super S> f, UniAccept<S> c) {
        Object r; Throwable x;
        //判断当前CompletableFuture是否已完成,如果没有完成则返回false
        if (a == null || (r = a.result) == null || f == null)
            return false;
        tryComplete: if (result == null) {
            //判断任务执行结果是否为异常类型
            if (r instanceof AltResult) {
                if ((x = ((AltResult)r).ex) != null) {
                    completeThrowable(x, r);
                    break tryComplete;
                }
                r = null;
            }
            try {
                //判断当前任务是否可以执行(d.uniAccept(this, f, null)传入的c为null)
                if (c != null && !c.claim())
                    return false;
                @SuppressWarnings("unchecked") S s = (S) r;
                //获取CompletableFuture执行的任务结果并执行consumer
                f.accept(s);
                completeNull();
            } catch (Throwable ex) {
                completeThrowable(ex);
            }
        }
        return true;
    }

postComplete

再回过头看下在一个任务执行完成后调用的postComplete 方法

 /**
     * Pops and tries to trigger all reachable dependents.  Call only
     * when known to be done.
     */
    final void postComplete() {
        //无锁并发栈,(Completion有一个next指针), 保存的是依赖当前的CompletableFuture的一串任务
        CompletableFuture<?> f = this; Completion h;
        //判断stack是否为空
        while ((h = f.stack) != null ||
               (f != this && (h = (f = this).stack) != null)) {
            CompletableFuture<?> d; Completion t;
            //非空则通过CAS出栈
            if (f.casStack(h, t = h.next)) {
                if (t != null) {
                    //如果f不是this,将刚出栈的h压入this的栈顶
                    if (f != this) {
                        //通过CAS入栈
                        pushStack(h);
                        continue;
                    }
                     // 如果是当前CompletableFuture, 解除头节点与栈的联系, help GC
                    h.next = null;   
                }
                f = (d = h.tryFire(NESTED)) == null ? this : d;
            }
        }
    }
    final void pushStack(Completion c) {
        do {} while (!tryPushStack(c));
    }

CompletableFuture实现链式调用的核心原理就是通过一个无锁并发栈(Treiber Stack)来存储任务。

依赖任务执行的时候先判断源任务是否完成,如果完成,直接在对应线程执行以来任务(如果是同步,则在当前线程处理,否则在异步线程处理)
如果任务没有完成,直接返回,因为等任务完成之后会通过postComplete去触发调用依赖任务。

借用下在别人的博客看到的原理图:

public static void main(String[] args) {
    CompletableFuture<String> baseFuture = CompletableFuture.completedFuture("Base Future");
    log.info(baseFuture.thenApply((r) -> r + " Then Apply").join());
    baseFuture.thenAccept((r) -> log.info(r)).thenAccept((Void) -> log.info("Void"));
}

在这里插入图片描述

实战例子: 烧水泡茶
CompletableFuture使用详解

全网最详细CompletableFuture使用教程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/437966.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何将matlab的m文件转换成python文件

因为matlab的内存实在太大了&#xff0c;所以我只在实验室电脑安装了matlab&#xff0c;自己电脑没有安装&#xff0c;现在跑实验需要把matlab文件转成python文件。在网上找到可以使用smop小工具。 我是在本地的anaconda转换的。先创建一个新环境到指定路径 conda create --pr…

HttpWebRequest类

HttpWebRequest类与HttpRequest类的区别。 HttpRequest类的对象用于服务器端&#xff0c;获取客户端传来的请求的信息&#xff0c;包括HTTP报文传送过来的所有信息。而HttpWebRequest用于客户端&#xff0c;拼接请求的HTTP报文并发送等。 HttpWebRequest这个类非常强大&#…

Spring MVC 接收 json 和返回 json (14)

目录 总入口 测试case 源码分析 1. 针对RequestBody的参数解析 2. 针对 ResponseBody 的返回值处理 总入口 通过上一篇Spring MVC 参数解析&#xff08;13&#xff09;_chen_yao_kerr的博客-CSDN博客的说明&#xff0c;相信大家对Sping MVC的参数解析有了一定的了解&…

2.微服务项目实战---环境搭建,实现电商中商品、订单、用户

使用的电商项目中的商品、订单、用户为案例进行讲解。 2.1 案例准备 2.1.1 技术选型 maven &#xff1a; 3.3.9 数据库&#xff1a; MySQL 5.7 持久层 : SpingData Jpa 其他 : SpringCloud Alibaba 技术栈 2.1.2 模块设计 springcloud-alibaba 父工程 shop-common …

【观察】构建“零信任”架构,筑起制造业安全“护城河”

中国是全球制造业大国&#xff0c;过去40年&#xff0c;中国制造业规模增长了18倍&#xff0c;其附加值达到2.2万亿美元&#xff0c;制造业在中国GDP比重高达40%&#xff0c;其之于中国经济的重要性可见一斑。 与此同时&#xff0c;中国制造业在高速发展的同时&#xff0c;也普…

使用全球融合CDN的10大优势

根据预估&#xff0c;今年的全球内容交付网络&#xff08;CDN&#xff09;市场预计将达到424亿美元。而由于移动应用程序的激增和人工智能尤其是ChatGPT等相关领域的快速发展将进一步带来CDN市场的快速增长&#xff0c;可以说全球CDN的黄金时代才刚开始。 融合CDN和多CDN战略是…

32道子网划分练习题详细解析含答案

目录 1 子网划分概念&#xff1a; 2 划分方法&#xff1a; 子网划分方法&#xff1a;段&#xff0c;块&#xff0c;数的计算三步。 段就是确定ip地址段中既有网络地址&#xff0c;又有主机地址的那一段是四段中的那一段&#xff1f; 块就确定上一步中确定的那一段中的主机…

企业云成本优化:减少企业云支出的终极指南

向云的转移使企业的技术领导者能够实现基础设施的现代化&#xff0c;并提高应用程序的可用性、可扩展性和性能。然而优化云成本对很多以互联网业务为主体的公司都是一项挑战&#xff0c;因为需要执行可持续的云成本管理战略。随着世界经济近年来走向低迷&#xff0c;尤其是互联…

【Linux网络服务】DNS域名解析服务服务

一、BIND域名服务基础 服务背景 1在日常生活中人们习惯使用域名访问服务器&#xff0c;但机器向互相只认IP地址&#xff0c;域名与IP地址之间是多对一的关系&#xff0c;一个IP址不一定只对应一个域名&#xff0c;且一个完成域名只可以对应一个IP地址&#xff0c;它们之间转换…

[ARM+Linux] 基于wiringPi库的串口通信

wiringOP-master/examples/serialTest.c中&#xff0c;wiringPi库中自带的串口程序&#xff1a; #include <stdio.h> #include <string.h> #include <errno.h>#include <wiringPi.h> #include <wiringSerial.h>int main () {int fd ;int count …

JavaSE-part1

文章目录 Day01 面向对象特性1.java继承注意点2.多态2.1多态概述2.2多态中成员的特点:star::star:2.3多态的转型:star::star: 3.Super4.方法重写:star::star:5.Object类:star::star: Day02 面向对象特性1.代码块:star:(主要是初始化变量&#xff0c;先于构造器)2.单例设计模式:…

服务器初始化

Linux基础系类 提示&#xff1a;个人学习总结&#xff0c;仅供参考。 一、Linux系统部署 二、服务器初始化 提示&#xff1a;文档陆续更新整理 服务器初始化 Linux基础系类简介一、配置IP地址二、配置YUM源&#xff08;yum本地源和yum网络源&#xff09;1.简介2.准备工作3.配置…

数据结构与算法——深度寻路算法

&#x1f4d6;作者介绍&#xff1a;22级树莓人&#xff08;计算机专业&#xff09;&#xff0c;热爱编程&#xff1c;目前在c&#xff0b;&#xff0b;阶段&#xff0c;因为最近参加新星计划算法赛道(白佬)&#xff0c;所以加快了脚步&#xff0c;果然急迫感会增加动力>——…

SQL Server的行级安全性

行级安全性 一、前言二、描述三、权限四、安全说明&#xff1a;侧信道攻击五、跨功能兼容性六、示例 一、前言 行级别安全性使您能够使用组成员身份或执行上下文来控制对数据库表中行的访问。 行级别安全性 &#xff08;RLS&#xff09; 简化了应用程序中的安全性设计和编码。…

MyBatis(一)

一、简介 1.1 什么是MyBatis MyBatis是一个持久层框架&#xff0c;既然和持久层有关那就可以简单理解成和数据库有关&#xff0c;既然是框架那么就肯定是为了简化数据库有关的操作。由于传统的JDBC代码处理数据库有关的代码太复杂&#xff0c;所以出现了MyBatis来快速处理数据…

RK3588调试CAN驱动记录

背景 汽车芯片公司&#xff0c;IP领导随机分配&#xff0c;主要任务是各种IP的硅前验证&#xff0c;包括uboot命令行和Linux kernel验证。工作两年半没什么外设经验也没做过CAN总线(前两年在一家芯片公司做各种加解密IP的开发)&#xff0c;一个人的摸索过程可以说是充满了坎坷…

花有约,春不迟|弘博创新2023塘朗山到梅林水库穿越活动

花有约,春不迟|弘博创新2023塘朗山到梅林水库穿越活动 花开有约&#xff0c;春日不迟 4月16日&#xff0c;正值春暖花开的季节&#xff0c;周末闲暇无事&#xff0c;弘博创新的朋友们相聚一起&#xff0c;从塘朗山龙珠门到梅林水库&#xff0c;体验一场感受大自然&#xff0c;开…

dsl语法

查询 1.查询所有&#xff08;默认有分页查询&#xff09; #查询所有 GET /hotel/_search {"query": {"match_all": {}} } 2.match查询&#xff08;条件查询&#xff09;-----包含四川和外滩的信息&#xff0c;信息匹配度越高越靠前&#xff0c;两者存在一…

知识库管理系统对于企业有哪些作用及优势?

知识库管理系统是一种通过集成多种技术手段&#xff0c;将企业内部知识进行收集、整理、存储、分析和共享的信息管理系统。知识库管理系统可以帮助企业管理和利用企业内部的知识&#xff0c;提高企业的创新能力和竞争力。 知识库管理系统的作用 1、促进企业内部知识的流通和共…

AutoGPT 安装指南,使用避坑要点

最近&#xff0c; AIGC 中最火的可能就当属于 AutoGPT 了吧&#xff0c;首先简单介绍一下AutoGPT 背景 AutoGPT 是基于 ChatGPT API 接口开发&#xff0c;项目首推 GPT-4 模型&#xff0c;但 OpenAI 账号 API 只有 gpt-3.5-turo 权限同样也可以使用。 项目在 github 上获取的…