【JUC系列-15】深入理解CompletableFuture的基本使用

news2024/11/19 5:24:17

JUC系列整体栏目


内容链接地址
【一】深入理解JMM内存模型的底层实现原理https://zhenghuisheng.blog.csdn.net/article/details/132400429
【二】深入理解CAS底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/132478786
【三】熟练掌握Atomic原子系列基本使用https://blog.csdn.net/zhenghuishengq/article/details/132543379
【四】精通Synchronized底层的实现原理https://blog.csdn.net/zhenghuishengq/article/details/132740980
【五】通过源码分析AQS和ReentrantLock的底层原理https://blog.csdn.net/zhenghuishengq/article/details/132857564
【六】深入理解Semaphore底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/132908068
【七】深入理解CountDownLatch底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/133343440
【八】深入理解CyclicBarrier底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/133378623
【九】深入理解ReentrantReadWriteLock 读写锁的底层实现https://blog.csdn.net/zhenghuishengq/article/details/133629550
【十】深入理解ArrayBlockingQueue的基本使用和底层实现https://blog.csdn.net/zhenghuishengq/article/details/133692023
【十一】深入理解LinkedBlockingQueue的基本使用和底层实现https://blog.csdn.net/zhenghuishengq/article/details/133723652
【十二】深入理解PriorityQueue的基本使用和底层实现https://blog.csdn.net/zhenghuishengq/article/details/133788655
【十三】深入理解DelayQueue的基本使用和底层实现https://blog.csdn.net/zhenghuishengq/article/details/133820599
【十四】深入理解线程池的基本使用和底层源码https://blog.csdn.net/zhenghuishengq/article/details/133850545
【十五】深入理解CompletableFuture的基本使用https://blog.csdn.net/zhenghuishengq/article/details/133957222

深入理解线程池的基本使用和底层源码

  • 一,深入理解CompletableFuture的基本使用
    • 1,Callable的基本使用
    • 2,Future
    • 3,CompletableFuture
      • 3.1,创建CompletableFuture异步操作四种方式
      • 3.2,get和join获取值
      • 3.3,处理结果whenCompleteAsync
        • 3.3.1,没有异常的情况
        • 3.3.2,有异常时
      • 3.4,多任务链路中的结果处理
        • 3.4.1,thenApply
        • 3.4.2,thenCombine
        • 3.4.2,thenAccept
        • 3.4.3,runAfterEither
        • 3.4.4,anyOf

一,深入理解CompletableFuture的基本使用

在上一篇文章中讲了线程池,线程任务类是通过实现Runnable实现的,但是Runnable接口会有缺点,一个是不能直接在提交任务之后有返回值,另一个是不能在run方法上面抛异常,因此为了解决这两个问题,jdk中引入了一个新的接口 Callable

1,Callable的基本使用

如直接先定义一个线程任务类Task,实现Callable方法

/**
 * @Author: zhenghuisheng
 * @Date: 2023/10/19 1:02
 */
public class Task implements Callable {
    @Override
    public Object call() throws Exception {
        return 1;
    }
}

随后创建一个Demo类,用于测试。在jdk中,new Thread的参数只能是Runnable类或者其具体的实现类,因此先将Callable类的具体实现作为参数,加入FutureTask中,而FutureTask是一个Runnable的具体的实现类

/**
 * @Author: zhenghuisheng
 * @Date: 2023/10/19 1:04
 */
public class FutureTaskDemo {
    public static void main(String[] args) throws Exception {
        //创建一个线程
        Task task = new Task();
        //将线程作为FutureTask的参数
        FutureTask futureTask = new FutureTask(task);
        new Thread(futureTask).start();
        //将结果返回
        System.out.println(futureTask.get());
    }
}

在这里插入图片描述

这样就成功的将Callable引入进来,成为一个创建线程的一种方式。并且通过这种方式可以将需要的返回值直接获取。

2,Future

上面这张图可以看出这个FutureTask也是Future的一个实现类,接下来查看这个接口中的抽象方法

Future f = new FutureTask(task);

在这个接口中,主要有两个方法,一个是任务执行是否完成,一个是获取任务完成的结果值。get方法在获取到结果之前,内部会进行阻塞

public interface Future<V> {
    boolean isDone();	//是否已经执行完成
    V get();			//获取执行完的结果
}

如下面这段代码,总共就四个步骤,当线程任务完成之后,会将结果填充到这个FutureTask中,随后通过这个实例获取结果即可。

Task task = new Task();					//构建一个线程任务,实现了callable接口
FutureTask f = new FutureTask(task);	//将task类作为参数添加到FutureTask中
threadPool.execute(f);					//加入到线程池
System.out.println(f.get());			//获取结果

除了上面最重要的两个方法之外,Future接口中还有下面这些方法

boolean cancel(boolean mayInterruptIfRunning);	//取消线程任务
boolean isCancelled();							//判断是否已取消
V get(long timeout, TimeUnit unit)				//超时机制获取

3,CompletableFuture

Future可以通过多个异步任务来解决多个同步任务的效率问题,但是其本身也存在着一些缺陷,如无法进行任务与任务之间的链式调用、无法组合多个任务、以及无法在任务处理时做异常处理。为了解决这个问题,因此在juc包中,又引入了一个新的任务类 CompletableFuture

先查看这个CompletableFuture实现类,该类是Future的一个具体实现类,同时还实现了这个 CompletionStage 接口,也就是说该类要全部实现这两个接口中的全部方法,那么该类的方法相必是特别多的,因此该类的功能也非常的强大

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {}

在这个类中,如果没有自定义线程池,则采用的是 ForkJoinPool 线程池,专门处理cpu密集型任务的线程池。

private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

本文主要讲解的是这个类的使用,因此主要是对内部的一些方法,结合一定的场景来举例。在举例之前,先熟悉一下这些方法中的参数的某些含义。在了解这些规律之后,接下来结合场景对部分api进行讲解

CompletionStage<? extends T> other		//表示要创建一个任务
Consumer<? super T> action				//表示消费一个任务
Function<? super T,? extends V> fn		//表示可以携带上一个任务的返回值到先一个任务
Executor executor						//默认的forkjoin或者自定义实现的线程池

3.1,创建CompletableFuture异步操作四种方式

主要分为线程时runnable的实现类和callable的实现类,以及是否自定义线程池等。通过runAsync的方法是没有返回值的,通过supplyAsync的方法是有返回值的,但是在使用get方法时,会进行阻塞。如果没有自定义的实现线程池,则会使用默认的forkjoinpool线程池。

static ThreadPoolExecutor threadPool = ThreadPoolUtil.getThreadPool();
public static void main(String[] args) throws Exception {
	//没有返回值,参数为runnable,线程池为forkjoin
	CompletableFuture future1 = CompletableFuture.runAsync(() -> System.out.println("run1"));
	//没有返回值,参数为runnable,线程池为自定义线程池
	CompletableFuture future2 = CompletableFuture.runAsync(() -> System.out.println("run2"),threadPool);
	//有返回值,参数为callable,线程池为forkjoin
	CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {return 0;});
	//有返回值,参数为callable,线程池为自定义线程池
	CompletableFuture future4 = CompletableFuture.supplyAsync(() -> {return 0;},threadPool);

3.2,get和join获取值

先看这个get方法,如下面这段代码

CompletableFuture future = CompletableFuture.supplyAsync(() -> {return 0;});
future.get();

由于是线程池,内部肯定要执行对应的run方法,因此定位到这个 AsyncSupply 类,对应的run方法如下。可以发现在执行这个run方法时,会对这个方法进行回调操作

public void run() {
    CompletableFuture<T> d; Supplier<T> f;
    if ((d = dep) != null && (f = fn) != null) {
        dep = null; fn = null;
        if (d.result == null) {
            try {
                d.completeValue(f.get());	//设置值
            } catch (Throwable ex) {
                d.completeThrowable(ex);	//抛异常
            }
        }
        d.postComplete();		//接口回调
    }
}

回调的具体实现如下,通过cas的方式对这个 CompletableFuture 类中的result值赋值,随后就可以直接通过get的方式进行获取的操作。

final boolean completeValue(T t) {
    //通过cas对result赋值
    return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                       (t == null) ? NIL : t);
}

除了get方法能获取到值之外,还能通过join的方式获取值

CompletableFuture future = CompletableFuture.supplyAsync(() -> {return 0;});
future.join();

join方法的具体实现如下,就是没拿到结果就一直阻塞,拿到才能返回。和get最大的区别就是get使用get方法时,需要手动的抛出异常,而join不需要开发者强制抛出或者捕获异常

public T join() {
    Object r;
    //拿到结果就返回,没拿到结果就一直阻塞
    return reportJoin((r = result) == null ? waitingGet(false) : r);
}

3.3,处理结果whenCompleteAsync

3.3.1,没有异常的情况

当结果获取成功之后,如对某个值的计算,对整体流程都执行成功时,可以使用以下方法,参数同样也是区分了是否需要返回值,是否自定义线程池等

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwa
ble> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super T
hrowable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super T
hrowable> action, Executor executor)

举个例子,如当对某个值计算后,成功拿到结果时

public static void main(String[] args) throws Exception {
    //创建异步对象
    CompletableFuture future = CompletableFuture.supplyAsync(() -> {
        int number = 0;
        for (int i = 0; i < 100; i++) {
            number = number + i;
        }
        return number;
    });
    //处理上面的结果
    future.whenCompleteAsync(new BiConsumer<Integer,Throwable>() {
        @Override
        public void accept(Integer data, Throwable throwable) {
            System.out.println(data);
        }
    });
}
3.3.2,有异常时

当结果可能会出现异常时,那么就需要使用到这个 exceptionally 方法

public CompletableFuture<T> exceptionally(Function<Throwable,? extends T>
fn)

接下来对这个方法的使用举例,就是自定义一个简单的异常,随后通过创建的future对象调用获取结果

public static void main(String[] args) throws Exception {
    CompletableFuture future = CompletableFuture.supplyAsync(() -> {
        int number = 10 / 0;
        return number;
    });
    //处理上面的结果
    future.exceptionally(new Function<Throwable, String>() {
        @Override
        public String apply(Throwable throwable) {
            System.out.println("异常信息为:" + throwable.getMessage());
            return throwable.getMessage();
        }
    });
}

没有异常时是需要消费者继续处理消费的,因此参数是一个 BiConsumer 类,而有异常时不需要消费者处理,因此只需创建一个Function处理异常即可。

3.4,多任务链路中的结果处理

3.4.1,thenApply

如果在一个需要多个异步任务的调用链路中,比如B需要A的执行结果,c需要b的执行结果,一直下去,那么就需要使用这个thenApply 了,当然这个方法也区分是否有返回值,是否定义线程池等方法

CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    int number = 0;
    for (int i = 0; i < 100; i++) {
        number = number + i;
    }
    System.out.println(number);
    return number;
}).thenApplyAsync(data -> {		//链路调用1
    return data + 999;
}).thenApplyAsync(data -> {		//链路调用2
    return data + 888;
});
3.4.2,thenCombine

如果需要结合两个任务的计算,那么可以考虑使用这种thenCombine,比如一个任务算当月的总收入,一个任务算当月的总支出

CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    int income = 1000 * 30;
    System.out.println("总收入为" + income);
    return income;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
    int expend = 0;
    for (int i = 1; i <= 30; i++) {
        expend = expend + i + 500;
    }
    System.out.println("总支出为:" + expend);
    return expend;
}),(income,expend)->{
    return income - expend;
});
//获取结果
System.out.println(future.get());
3.4.2,thenAccept

当存在链路调用中,只需关注自身任务的求值,而不需要求总值时,可以直接通过这个thenAccept。如计算一年中走的步数,参数是一个Consumer消费者,会将结果消费,因此在后续的get中,获取到的值为null。

CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    int runData = 0;
    for (int i = 0; i < 30; i++) {
        runData = runData + 10000 + i;
    }
    System.out.println("第一个月的总步数为:" + runData);
    return runData;
}).thenAccept(runData ->{
    for (int i = 0; i < 30; i++) {
        runData = runData + 10000 + i;
    }
    System.out.println("前两个月的总步数为:" + runData);
});
System.out.println(future.get());
3.4.3,runAfterEither

如在重试接口中,无论同时发送多少次请求,只要有一个请求成功,就可以不管后续的发出的请求的执行结果

public static void main(String[] args) throws Exception {
    CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1;
    });
    CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 2;
    });
    future1.runAfterEither(future2, new Runnable() {
        @Override
        public void run() {
            System.out.println("已经有一个任务执行完成");
        }
    }).join();
}

runAfterBoth这个使用和上面的一样,但是得同时满足两个请求

3.4.4,anyOf

原理和上面的一样,就是在多任务中,只要满足一个就可以将对应的请求的返回值返回。而对应的allOf就是可以将所有任务的返回值返回

总的来说可以分为下面这图所示

img

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1117193.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023 10月8日 至 10 月16日学习总结

1.做的题目 [RootersCTF2019]I_&#xff1c;3_Flask_双层小牛堡的博客-CSDN博客 [NCTF2019]SQLi regexp 盲注-CSDN博客 [网鼎杯 2018]Comment git泄露 / 恢复 二次注入 .DS_Store bash_history文件查看-CSDN博客 PHP LFI 利用临时文件Getshell_双层小牛堡的博客-CSDN博客 …

《动手学深度学习 Pytorch版》 9.7 序列到序列学习(seq2seq)

循环神经网络编码器使用长度可变的序列作为输入&#xff0c;将其编码到循环神经网络编码器固定形状的隐状态中。 为了连续生成输出序列的词元&#xff0c;独立的循环神经网络解码器是基于输入序列的编码信息和输出序列已经看见的或者生成的词元来预测下一个词元。 要点&#x…

关于使用 vxe-table 时设置了 show-overflow tooltip 不展示的问题(Dialog 组件和 table 同时使用)

众所周知&#xff0c;vxe-table 是可以支撑万级数据渲染的表格组件&#xff0c;本质上还是用了虚拟滚动的实现。之前一直知道vxe-table, 但是基本没有机会用的上这个组件&#xff0c;最近在开发埋点数据的统计&#xff0c;后端一次性返回了上千条数据&#xff0c;elementui 的 …

【Shell】环境变量 自定义变量 特殊变量

Shell变量&#xff1a;环境变量 目标 1、理解什么是系统环境变量&#xff1f; 2、掌握常用的系统环境变量都有哪些&#xff1f; Shell变量的介绍 变量用于存储管理临时的数据, 这些数据都是在运行内存中的. 变量类型 系统环境变量 自定义变量 特殊符号变量 系统环境变…

golang查看CPU使用率与内存及源码中的//go:指令

golang查看CPU使用率与内存 1 psutil 1.1 概念与应用场景 psutil是业内一个用于监控OS资源的软件包&#xff0c;目前已经多种语言&#xff0c;包括但不限于Python、Go。 gopsutil是 Python 工具库psutil 的 Golang 移植版&#xff0c;可以帮助我们方便地获取各种系统和硬件信…

基于Java的实验室设备管理系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09; 代码参考数据库参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…

如何在电脑上设置新的蓝牙耳机

本文介绍如何将蓝牙耳机连接到Windows或Mac电脑。 如何在Windows上设置新的蓝牙耳机 蓝牙耳机的设置过程因平台而异&#xff0c;但以下是Windows 11的步骤&#xff1a; 1、选择“开始”&#xff0c;然后在搜索框中输入蓝牙&#xff0c;以显示蓝牙和其他设备。 2、选择添加设…

【LeetCode:2316. 统计无向图中无法互相到达点对数 | BFS + 乘法原理】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

施密特正交化

相信大家在平时的期末考试中一定少不了对某某向量组执行标准正交化类型的题目。今天我们从这个题目入手&#xff0c;说明这个如何执行施密特正交化&#xff0c;以及为什么要进行正交化。 一、例子 例子&#xff1a;设 a 1 [ 1 2 − 1 ] a_1\begin{bmatrix}1\\2\\-1\end{bmat…

阿里妈妈Union Lab全量公测,你会用吗?

Union Lab是一种智能化的选品推荐、推广内容创作工具&#xff0c;它内置了大语言模型&#xff08;LLMs&#xff09;&#xff0c;使得选品、推广更加智能和简单。通过联盟底层货品服务&#xff0c;Union Lab可以实现智能化货品发现、分析、选品、投放、创意生产&#xff0c;基于…

嵌入式mqtt总线架构方案mosquitto+paho

一 mqtt通信模型 MQTT 协议提供一对多的消息发布&#xff0c;可以降低应用程序的耦合性&#xff0c;用户只需要编写极少量的应用代码就能完成一对多的消息发布与订阅&#xff0c;该协议是基于<客户端-服务器>模型&#xff0c;在协议中主要有三种身份&#xff1a;发布者&…

力扣第37题 解数独 c++ 难~ 回溯

题目 37. 解数独 困难 相关标签 数组 哈希表 回溯 矩阵 编写一个程序&#xff0c;通过填充空格来解决数独问题。 数独的解法需 遵循如下规则&#xff1a; 数字 1-9 在每一行只能出现一次。数字 1-9 在每一列只能出现一次。数字 1-9 在每一个以粗实线分隔的 3x3 宫…

基于Java的文物管理系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09; 代码参考数据库参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…

GRASP 、SOLID 与 GoF 设计模式

一、GRASP GRASP&#xff1a;通用职责分配软件设计模式(General Responsibility Assignment Software Patterns)&#xff0c;其主要思想是基于单一职责设计软件对象。 思考软件对象设计以及大型构件的流行方式是&#xff0c;考虑其职责、角色和协作。这是被称为职责驱动设计&a…

C++:无法查找或打开 PDB 文件?? 如何解决呢?以及产生原因是什么?

C:无法查找或打开 PDB 文件?? 如何解决呢&#xff1f;以及产生原因是什么&#xff1f; 前言解决办法原因 前言 最近博主在写C时&#xff0c;明明代码都正确&#xff0c;但编译失败。查看原因后发现显示&#xff1a;无法查找或打开 PDB 文件。&#xff08;先介绍解决办法&…

【高等数学】微分中值定理

文章目录 1、极值2、费马引理3、罗尔定理4、拉格朗日中值定理4.1用拉格朗日定理证明基本结论 5、柯西中值定理6、微分中值定理的意义7、三大中值定理的意义 1、极值 若 ∃ δ > 0 ∃δ>0 ∃δ>0&#xff0c;使得 ∀ x ∈ U ( x 0 , δ ) ∀x\in U(x_0,δ) ∀x∈U(x0…

DDD与微服务的千丝万缕

一、软件设计发展过程二、什么是DDD&#xff1f;2.1 战略设计2.2 战术设计2.3 名词扫盲1. 领域和子域2. 核心域、通用域和支撑域3. 通用语言4. 限界上下文5. 实体和值对象6. 聚合和聚合根 2.4 事件风暴2.5 领域事件 三、DDD与微服务3.1 DDD与微服务的关系3.2 基于DDD进行微服务…

2023年中国煤气节能器产量及市场规模分析[图]

煤气节能器行业是指生产和销售用于煤气燃烧系统中的节能器的行业。煤气节能器是一种能够有效提高煤气燃烧效率、降低燃烧过程中的热损失的装置&#xff0c;广泛应用于工业生产、民用燃烧等领域。煤气节能器行业的发展与煤气燃烧技术的发展密切相关&#xff0c;随着煤气燃烧技术…

操作系统期末复习题版详解(含解析)

操作系统期末复习 第一章 操作系统引论 一、Os具有哪几大特征?它们之间有何关系? 【参考答案】OS具有并发、共享、虚拟和异步这4个基本特征。它们之间的关系包含以下几个方面。 并发和共享是OS最基本的特征。为了提高计算机资源的利用率&#xff0c;OS必然要采用多道程序设…

Excel 5s内导入20w条简单数据(ExecutorType.BATCH)Mybatis批处理的应用

文章目录 Excel 5s内导入20w条数据1. 生成20w条数据1.1 使用Excel 宏生成20w条数据1.2 生成成功 2. ExecutorType&#xff1a;批量操作执行器类型2.1 ExecutorType.SIMPLE2.2 ExecutorType.BATCH2.3 ExecutorType.REUSE 3. 20w条数据直接插入数据库3.1 使用ExecutorType.SIMPLE…