Java8 CompletableFuture异步编程-进阶篇

news2025/2/27 20:08:56

🏷️个人主页:牵着猫散步的鼠鼠 

🏷️系列专栏:Java全栈-专栏

🏷️个人学习笔记,若有缺误,欢迎评论区指正 

前言

我们在前面文章讲解了CompletableFuture这个异步编程类的基本用法,这节我们继续学习CompletableFuture相关的进阶知识,上文入口:Java8 CompletableFuture异步编程-入门篇-CSDN博客

1、异步任务的交互

异步任务交互指 将异步任务获取结果的速度相比较,按一定的规则( 先到先用 )进行下一步处理。

1.1 applyToEither

applyToEither() 把两个异步任务做比较,异步任务先到结果的,就对先到的结果进行下一步的操作。

CompletableFuture<R> applyToEither(CompletableFuture<T> other, Function<T,R> func)

演示案例:使用最先完成的异步任务的结果

public class ApplyToEitherDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 开启异步任务1
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            int x = new Random().nextInt(3);
            CommonUtils.sleepSecond(x);
            CommonUtils.printThreadLog("任务1耗时:" + x + "秒");
            return x;
        });
​
        // 开启异步任务2
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            int y = new Random().nextInt(3);
            CommonUtils.sleepSecond(y);
            CommonUtils.printThreadLog("任务2耗时:" + y + "秒");
            return y;
        });
​
        // 哪些异步任务的结果先到达,就使用哪个异步任务的结果
        CompletableFuture<Integer> future = future1.applyToEither(future2, (result -> {
            CommonUtils.printThreadLog("最先到达的结果:" + result);
            return result;
        }));
​
        // 主线程休眠4秒,等待所有异步任务完成
        CommonUtils.sleepSecond(4);
        Integer ret = future.get();
        CommonUtils.printThreadLog("ret = " + ret);
    }
}
​

速记心法:任务1、任务2就像两辆公交,哪路公交先到,就乘坐(使用)哪路公交。

以下是applyToEither 和其对应的异步回调版本

CompletableFuture<R> applyToEither(CompletableFuture<T> other, Function<T,R> func)
CompletableFuture<R> applyToEitherAsync(CompletableFuture<T> other, Function<T,R> func)
CompletableFuture<R> applyToEitherAsync(CompletableFuture<T> other, Function<T,R> func,Executor executor)

1.2 acceptEither

acceptEither() 把两个异步任务做比较,异步任务先到结果的,就对先到的结果进行下一步操作 ( 消费使用 )。

CompletableFuture<Void> acceptEither(CompletableFuture<T> other, Consumer<T> action)
CompletableFuture<Void> acceptEitherAsync(CompletableFuture<T> other, Consumer<T> action)  
CompletableFuture<Void> acceptEitherAsync(CompletableFuture<T> other, Consumer<T> action,Executor executor)

演示案例:使用最先完成的异步任务的结果

public class AcceptEitherDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 异步任务交互
        CommonUtils.printThreadLog("main start");
        // 开启异步任务1
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            int x = new Random().nextInt(3);
            CommonUtils.sleepSecond(x);
            CommonUtils.printThreadLog("任务1耗时:" + x + "秒");
            return x;
        });
​
        // 开启异步任务2
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            int y = new Random().nextInt(3);
            CommonUtils.sleepSecond(y);
            CommonUtils.printThreadLog("任务2耗时:" + y + "秒");
            return y;
        });
​
        // 哪些异步任务的结果先到达,就使用哪个异步任务的结果
        future1.acceptEither(future2,result -> {
            CommonUtils.printThreadLog("最先到达的结果:" + result);
        });
​
        // 主线程休眠4秒,等待所有异步任务完成
        CommonUtils.sleepSecond(4);
        CommonUtils.printThreadLog("main end");
    }
}

1.3 runAfterEither

如果不关心最先到达的结果,只想在有一个异步任务先完成时得到完成的通知,可以使用 runAfterEither() ,以下是它的相关方法:

CompletableFuture<Void> runAfterEither(CompletableFuture<T> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<T> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<T> other, Runnable action, Executor executor)

提示

异步任务交互的三个方法和之前学习的异步的回调方法 thenApply、thenAccept、thenRun 有异曲同工之妙。

2、get() 和 join() 区别

get() 和 join() 都是CompletableFuture提供的以阻塞方式获取结果的方法。

那么该如何选用呢?请看如下案例:

public class GetOrJoinDemo {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "hello";
        });
​
        String ret = null;
        // 抛出检查时异常,必须处理
        try {
            ret = future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("ret = " + ret);
​
        // 抛出运行时异常,可以不处理
        ret = future.join();
        System.out.println("ret = " + ret);
    }
}

使用时,我们发现,get() 抛出检查时异常 ,需要程序必须处理;而join() 方法抛出运行时异常,程序可以不处理。所以,join() 更适合用在流式编程中。

3、ParallelStream VS CompletableFuture

CompletableFuture 虽然提高了任务并行处理的能力,如果它和 Stream API 结合使用,能否进一步多个任务的并行处理能力呢?

同时,对于 Stream API 本身就提供了并行流ParallelStream,它们有什么不同呢?

我们将通过一个耗时的任务来体现它们的不同,更重要地是,我们能进一步加强 CompletableFuture 和 Stream API 的结合使用,同时搞清楚CompletableFuture 在流式操作的优势

需求:创建10个MyTask耗时的任务,统计它们执行完的总耗时

定义一个MyTask类,来模拟耗时的长任务

public class MyTask {
    private int duration;
​
    public MyTask(int duration) {
        this.duration = duration;
    }
​
    // 模拟耗时的长任务
    public int doWork() {
        CommonUtils.printThreadLog("doWork");
        CommonUtils.sleepSecond(duration);
        return duration;
    }
}

同时,我们创建10个任务,每个持续1秒。

IntStream intStream = IntStream.range(0, 10);
List<MyTask> tasks = intStream.mapToObj(item -> {
    return new MyTask(1);
}).collect(Collectors.toList());

3.1 并行流的局限

我们先使用串行执行,让所有的任务都在主线程 main 中执行。

public class SequenceDemo {
    public static void main(String[] args) {
        // 方案一:在主线程中使用串行执行
        // step 1: 创建10个MyTask对象,每个任务持续1s,存入list集合便于启动Stream操作
        IntStream intStream = IntStream.range(0, 10);
        List<MyTask> tasks = intStream.mapToObj(item -> {
            return new MyTask(1);
        }).collect(Collectors.toList());

        // step 2: 执行tasks集合中的每个任务,统计总耗时
        long start = System.currentTimeMillis();
        List<Integer> result = tasks.stream().map(myTask -> {
            return myTask.doWork();
        }).collect(Collectors.toList());
        long end = System.currentTimeMillis();
        double costTime = (end - start) / 1000.0;

        System.out.printf("processed %d tasks cost %.2f second",tasks.size(),costTime);
    }
}

它花费了10秒, 因为每个任务在主线程一个接一个的执行。

因为涉及 Stream API,而且存在耗时的长任务,所以,我们可以使用 parallelStream()

public class ParallelDemo {
    public static void main(String[] args) {
        // 方案二:使用并行流
        // step 1: 创建10个MyTask对象,每个任务持续1s,存入List集合
        IntStream intStream = IntStream.range(0, 10);
        List<MyTask> tasks = intStream.mapToObj(item -> {
            return new MyTask(1);
        }).collect(Collectors.toList());

        // step 2: 执行10个MyTask,统计总耗时
        long start = System.currentTimeMillis();
        List<Integer> results = tasks.parallelStream().map(myTask -> {
            return myTask.doWork();
        }).collect(Collectors.toList());
        long end = System.currentTimeMillis();

        double costTime = (end - start) / 1000.0;
        System.out.printf("processed %d tasks %.2f second",tasks.size(),costTime);
    }
}

它花费了2秒多,因为此次并行执行使用了8个线程 (7个是ForkJoinPool线程池中的, 一个是 main 线程),需要注意是:运行结果由自己电脑CPU的核数决定。

3.2 CompletableFuture 在流式操作的优势

让我们看看使用CompletableFuture是否执行的更有效率

public class CompletableFutureDemo {
    public static void main(String[] args) {
        // 需求:创建10MyTask耗时的任务,统计它们执行完的总耗时
        // 方案三:使用CompletableFuture
        // step 1: 创建10个MyTask对象,每个任务持续1s,存入List集合
        IntStream intStream = IntStream.range(0, 10);
        List<MyTask> tasks = intStream.mapToObj(item -> {
            return new MyTask(1);
        }).collect(Collectors.toList());

        // step 2: 根据MyTask对象构建10个耗时的异步任务
        long start = System.currentTimeMillis();
        List<CompletableFuture<Integer>> futures = tasks.stream().map(myTask -> {
            return CompletableFuture.supplyAsync(() -> {
                return myTask.doWork();
            });
        }).collect(Collectors.toList());

        // step 3: 当所有任务完成时,获取每个异步任务的执行结果,存入List集合中
        List<Integer> results = futures.stream().map(future -> {
            return future.join();
        }).collect(Collectors.toList());
        long end = System.currentTimeMillis();

        double costTime = (end - start) / 1000.0;
        System.out.printf("processed %d tasks cost %.2f second",tasks.size(),costTime);
    }
}

运行发现,两者使用的时间大致一样。能否进一步优化呢?

CompletableFutures 比 ParallelStream 优点之一是你可以指定Executor去处理任务。你能选择更合适数量的线程。我们可以选择大于Runtime.getRuntime().availableProcessors() 数量的线程,如下所示:

public class CompletableFutureDemo2 {
    public static void main(String[] args) {
        // 需求:创建10MyTask耗时的任务,统计它们执行完的总耗时
        // 方案三:使用CompletableFuture
        // step 1: 创建10个MyTask对象,每个任务持续1s,存入List集合
        IntStream intStream = IntStream.range(0, 10);
        List<MyTask> tasks = intStream.mapToObj(item -> {
            return new MyTask(1);
        }).collect(Collectors.toList());

        // 准备线程池
        final int N_CPU = Runtime.getRuntime().availableProcessors();
        // 设置线程池的数量最少是10个,最大是16个
        ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), N_CPU * 2));

        // step 2: 根据MyTask对象构建10个耗时的异步任务
        long start = System.currentTimeMillis();
        List<CompletableFuture<Integer>> futures = tasks.stream().map(myTask -> {
            return CompletableFuture.supplyAsync(() -> {
                return myTask.doWork();
            },executor);
        }).collect(Collectors.toList());

        // step 3: 当所有任务完成时,获取每个异步任务的执行结果,存入List集合中
        List<Integer> results = futures.stream().map(future -> {
            return future.join();
        }).collect(Collectors.toList());
        long end = System.currentTimeMillis();

        double costTime = (end - start) / 1000.0;
        System.out.printf("processed %d tasks cost %.2f second",tasks.size(),costTime);
        // 关闭线程池
        executor.shutdown();
    }
}

测试代码时,电脑配置是4核8线程,而我们创建的线程池中线程数最少也是10个,所以,每个线程负责一个任务( 耗时1s ),总体来说,处理10个任务总共需要约1秒。

3.3 合理配置线程池中的线程数

正如我们看到的,CompletableFuture 可以更好地控制线程池中线程的数量,而 ParallelStream 不能

问题1:如何选用 CompletableFuture 和 ParallelStream ?

如果你的任务是IO密集型的,你应该使用CompletableFuture;

如果你的任务是CPU密集型的,使用比处理器更多的线程是没有意义的,所以选择ParallelStream ,因为它不需要创建线程池,更容易使用。

问题2:IO密集型任务和CPU密集型任务的区别?

CPU密集型也叫计算密集型,此时,系统运行时大部分的状况是CPU占用率近乎100%,I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU 使用率很高。比如说要计算1+2+3+…+ 10万亿、天文计算、圆周率后几十位等, 都是属于CPU密集型程序。

CPU密集型任务的特点:大量计算,CPU占用率一般都很高,I/O时间很短

IO密集型指大部分的状况是CPU在等I/O (硬盘/内存) 的读写操作,但CPU的使用率不高。

简单的说,就是需要大量的输入输出,例如读写文件、传输文件、网络请求。

IO密集型任务的特点:大量网络请求,文件操作,CPU运算少,很多时候CPU在等待资源才能进一步操作。

问题3:既然要控制线程池中线程的数量,多少合适呢?

如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 Ncpu+1

如果是IO密集型任务,参考值可以设置为 2 * Ncpu,其中Ncpu 表示 核心数。

总结

通过这两篇文章的讲解,我们基本学习了CompletableFuture这个异步编程类的基础用法和相关进阶玩法,不过总体上还是偏理论,我后续可以可能会开一篇新的专栏,专门讲解和分享Java高并发相关的代码片段,都是比较实用,请多多支持吧~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1504548.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【操作系统概念】第11章:文件系统实现

文章目录 0.前言11.1 文件系统结构11.2 文件系统实现11.2.1 虚拟文件系统 11.3 分配方法11.3.1 连续分配11.3.2 链接分配11.3. 3 索引分配 11.5 空闲空间管理11.5.1 位图/位向量11.5.2 链表11.5.3 组 0.前言 正如第10章所述&#xff0c;文件系统提供了机制&#xff0c;以在线存…

【数据分享】2000-2022年全国1km分辨率的逐年PM2.5栅格数据(免费获取)

PM2.5作为最主要的空气质量指标&#xff0c;在我们日常研究中非常常用&#xff01;之前我们给大家分享了2013-2022年全国范围逐日的PM2.5栅格数据&#xff08;可查看之前的文章获悉详情&#xff09;&#xff01; 本次我们给大家带来的是2000-2022年全国范围的逐年的PM2.5栅格数…

树莓派4B Ubuntu20.04 Python3.9安装ROS踩坑记录

问题描述 在使用sudo apt-get update命令更新时发现无法引入apt-pkg,使用python3 -c "import apt_pkg"发现无法引入&#xff0c;应该是因为&#xff1a;20.04的系统默认python是3.8&#xff0c;但是我换成了3.9所以没有编译文件&#xff0c;于是使用sudo update-alte…

K8S - 在任意node里执行kubectl 命令

当我们初步安装玩k8s &#xff08;master 带 2 nodes&#xff09; 时 正常来讲kubectl 只能在master node 里运行 当我们尝试在某个 node 节点来执行时&#xff0c; 通常会遇到下面错误 看起来像是访问某个服务器的8080 端口失败了。 原因 原因很简单 , 因为k8s的各个组建&…

思科网络中如何配置标准ACL协议

一、什么是标准ACL协议&#xff1f;有什么作用及配置方法&#xff1f; &#xff08;1&#xff09;标准ACL&#xff08;Access Control List&#xff09;协议是一种用于控制网络设备上数据流进出的协议。标准ACL基于源IP地址来过滤数据流&#xff0c;可以允许或拒绝特定IP地址范…

微信私信短剧机器人源码

本源码仅提供参考&#xff0c;有能力的继续开发 接口为api调用 云端同步 https://ys.110t.cn/api/ajax.php?actyingshilist 影视搜索 https://ys.110t.cn/api/ajax.php?actsearch&name剧名 每日更新 https://ys.110t.cn/api/ajax.php?actDaily 反馈接口 https://ys.11…

机器学习-pytorch1(持续更新)

上一节我们学习了机器学习的线性模型和非线性模型的机器学习基础知识&#xff0c;这一节主要将公式变为代码。 代码编写网站&#xff1a;https://colab.research.google.com/drive 学习课程链接&#xff1a;ML 2022 Spring 1、Load Data&#xff08;读取数据&#xff09; 这…

Chain of Verification(验证链、CoVe)—理解与实现

原文地址&#xff1a;Chain of Verification (CoVe) — Understanding & Implementation 2023 年 10 月 9 日 GitHub 存储库 介绍 在处理大型语言模型&#xff08;LLM&#xff09;时&#xff0c;一个重大挑战&#xff0c;特别是在事实问答中&#xff0c;是幻觉问题。当答案…

排序算法全景:从基础到高级的Java实现

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…

【红外与可见光融合:条件学习:实例归一化(IN)】

Infrared and visible image fusion based on a two-stage class conditioned auto-encoder network &#xff08;基于两级类条件自编码器网络的红外与可见光图像融合&#xff09; 现有的基于自动编码器的红外和可见光图像融合方法通常利用共享编码器从不同模态中提取特征&am…

Java17 --- springCloud之LoadBalancer

目录 一、LoadBalancer实现负载均衡 1.1、创建两个相同的微服务 1.2、在客户端80引入loadBalancer的pom 1.3、80服务controller层&#xff1a; 一、LoadBalancer实现负载均衡 1.1、创建两个相同的微服务 1.2、在客户端80引入loadBalancer的pom <!--loadbalancer-->&…

ARTS Week 20

Algorithm 本周的算法题为 1222. 可以攻击国王的皇后 在一个 下标从 0 开始 的 8 x 8 棋盘上&#xff0c;可能有多个黑皇后和一个白国王。 给你一个二维整数数组 queens&#xff0c;其中 queens[i] [xQueeni, yQueeni] 表示第 i 个黑皇后在棋盘上的位置。还给你一个长度为 2 的…

js【详解】async await

为什么要使用 async await async await 实现了使用同步的语法实现异步&#xff0c;不再需要借助回调函数&#xff0c;让代码更加易于理解和维护。 (async function () {// await 必须放在 async 函数中try {// 加载第一张图片const img1 await loadImg1()// 加载第二张图片co…

Linux网络套接字之UDP网络程序

(&#xff61;&#xff65;∀&#xff65;)&#xff89;&#xff9e;嗨&#xff01;你好这里是ky233的主页&#xff1a;这里是ky233的主页&#xff0c;欢迎光临~https://blog.csdn.net/ky233?typeblog 点个关注不迷路⌯▾⌯ 实现一个简单的对话发消息的功能&#xff01; 目录…

力扣---腐烂的橘子

题目&#xff1a; bfs思路&#xff1a; 感觉bfs还是很容易想到的&#xff0c;首先定义一个双端队列&#xff08;队列也是可以的~&#xff09;&#xff0c;如果值为2&#xff0c;则入队列&#xff0c;我这里将队列中的元素定义为pair<int,int>。第一个int记录在数组中的位…

毅速3D打印随形透气钢:模具困气排气革新之选

在注塑生产过程中&#xff0c;模具内的气体若无法有效排出&#xff0c;往往会引发困气现象&#xff0c;导致产品表面出现气泡、烧焦等瑕疵。这些瑕疵不仅影响产品的美观度&#xff0c;更可能对其性能造成严重影响&#xff0c;甚至导致产品报废&#xff0c;从而增加生产成本。 传…

【C语言】linux内核tcp_write_xmit和tcp_write_queue_purge

tcp_write_xmit 一、讲解 这个函数 tcp_write_xmit 是Linux内核TCP协议栈中的一部分&#xff0c;其基本作用是发送数据包到网络。这个函数会根据不同情况推进发送队列的头部&#xff0c;确保只要远程窗口有空间&#xff0c;就可以发送数据。 下面是对该函数的一些主要逻辑的中…

C语言--函数指针变量和函数指针数组的区别(详解)

函数指针变量 函数指针变量的作用 函数指针变量是指向函数的指针&#xff0c;它可以用来存储函数的地址&#xff0c;并且可以通过该指针调用相应的函数。函数指针变量的作用主要有以下几个方面&#xff1a; 回调函数&#xff1a;函数指针变量可以作为参数传递给其他函数&…

基于pytorch的视觉变换器-Vision Transformer(ViT)的介绍与应用

近年来&#xff0c;计算机视觉领域因变换器模型的出现而发生了革命性变化。最初为自然语言处理任务设计的变换器&#xff0c;在捕捉视觉数据的空间依赖性方面也显示出了惊人的能力。视觉变换器&#xff08;Vision Transformer&#xff0c;简称ViT&#xff09;就是这种变革的一个…

链表基础知识详解

链表基础知识详解 一、链表是什么&#xff1f;1.链表的定义2.链表的组成3.链表的优缺点4.链表的特点 二、链表的基本操作1.链表的建立2.链表的删除3.链表的查找4.链表函数 一、链表是什么&#xff1f; 1.链表的定义 链表是一种物理存储单元上非连续、非顺序的存储结构&#xf…