CompletableFuture的allOf一定不要乱用!血泪史复盘

news2025/1/20 14:55:47

文章目录

  • 1. 到底遇到了什么问题?
  • 2. CountDownLatch搞起?
  • 3. allOf里面的坑
  • 4. 优化建议:

1. 到底遇到了什么问题?

最近看到组里面的同学遇到了这样的业务场景:

主线程需要异步并发调用多个接口,并且主线程需要等待线程全部执行完成之后,返回执行结果,业务流程如下:

这里面有一个注意点:一旦异步线程有一个失败,我主线程就不等待了,这种需求很常见,比如多线程拼装对象数据等等。那我们如何解决呢?

一定要坚持看完!细节往往决定成败,写代码也一样!!收获多多
在这里插入图片描述

2. CountDownLatch搞起?

此时有可能有可能第一时间会想到CountDownLatch,但是CountDownLatch本质内部是采用计数器,主线程一直调用await()阻塞等待,需要等待所有子线程执行完之后(不管成功或者失败)主线程才会往下执行,且主线程和子线程无法传递异常,并且需要注意异常的抛出。

这块可以参考之前写的CountDownLatch文章:

关于CountDownLatch的底层源码和闭坑指南,只看这一篇就够了!!

这里面有常见的坑一栏,可以重点看下,保你必有收获,所以CountDownLatch并不是最优方案。

这块很多同学会选用CompletableFuture来实现,因为CompletableFuture底层有丰富的任务编排和链式调用,并且此时肯定会有很多同学说CompletableFuture.allOf可以轻松实现。

此时如果你不自己去试试的话,八股文一背,此时你就被带偏了!!我们可以去看一下,在接下来会给出一个allOf的使用场景,把这个问题重现一下:

3. allOf里面的坑

场景:当有一批任务交给线程池执行,我们需要获取所有线程的返回结果。

首先定义一个并发执行器类:

public class CompletableFutureEngine {

    private final static ExecutorService executorService = Executors.newFixedThreadPool(4);

    /**
     * 创建并行任务并执行
     *
     * @param list            数据源
     * @param function        API调用逻辑
     * @param exceptionHandle 异常处理逻辑
     * @return 处理结果列表
     */
    public static <S, T> List<T> parallelFutureJoin(Collection<S> list, Function<S, T> function, Consumer<Throwable> exceptionHandle) {

        List<CompletableFuture<T>> completableFutures = list.stream()
                .map(s -> CompletableFuture.supplyAsync(() -> function.apply(s)))
                .collect(Collectors.toList());

        List<T> results = new ArrayList<>();
        try {
            CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join();
            for (CompletableFuture<T> completableFuture : completableFutures) {
                results.add(completableFuture.get());
            }
        } catch (Exception e) {
            if (e instanceof CompletionException) {
                if (e.getCause() != null) {
                    exceptionHandle.accept(e.getCause());
                }
            }
        }
        return results;
    }

}

调用方法:

public class EngineDemo {

    private static void sleep(long sleepTime) {
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void currentDate(String str) {
        // 创建一个Date对象,它包含了当前时间
        Date now = new Date();
        // 创建一个SimpleDateFormat对象,用于指定输出格式
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // 使用format方法将Date对象格式化为字符串
        String currentTime = dateFormat.format(now);
        // 打印当前时间
        System.out.println(str + "是: " + currentTime);
    }

    public static void main(String[] args) {
        currentDate("执行前");
        List<Integer> numList = CompletableFutureEngine.parallelFutureJoin(Arrays.asList(1, 3, 5),
            num -> {
                sleep(num * 1000);
                if (num == 1) {
                    throw new BusinessException("心别太大");
                }
                return num;
            }
            , e -> {
                if (e instanceof BusinessException) {
                    System.out.println("BusinessException =" + e.getMessage());
                } else {
                    System.out.println("Exception entrance");
                }
            }
        );
        System.out.println(numList);
        currentDate("执行后");

    }
}

此时,你会发现,程序的执行时间为5秒钟,感觉似乎和想的不一样,因为在代码中,是根据num决定的休眠时间,因此在第一印象中,应该是第一秒执行完就会抛出异常。尴尬了,现在从日志看程序的执行时间是5秒,那我们看下原因:走起,那我们debug下代码:

你会发现当程序抛出异常的时候,发现传进来的三个CompletableFuture,不管是成功还是失败都执行完了。

这也就说明了当抛出异常后,allOf并不会及时感知异常,而是等所有任务都执行完之后才往下继续运行,那此处会有两种情况:

  1. 使用完allOf之后,还要去做流程编排,不去直接get的话,这种方式没有问题。
  2. 需要汇聚所有的子线程执行结果返回给主线程,这种allOf可能效率会低,因为需要等待所有的子线程执行完才会去返回最终的结果,那如果遇到主要一个子线程失败,执行就失败,就会导致执行时间短的失败了,但是有个任务执行时间很长,返回时间也会变长,从而导致主线程等待的时间变长.

那针对于第二种情况应该如何优化呢!!!继续往下看!!!

4. 优化建议:

可以用CompletableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList()) 来代替 allOf。这个方法只要有一个子线程出现异常,主线程就会感知到异常,不用等待其他线程执行完。

优化后的并发执行器如下:

public class CompletableFutureEngine2 {

    /**
     * 创建并行任务并执行
     *
     * @param list            数据源
     * @param function        API调用逻辑
     * @return 处理结果列表
     */
    public static <S, T> List<T> parallelFutureJoin(Collection<S> list, Function<S, T> function, Consumer<Throwable> exceptionHandle) {

        List<CompletableFuture<T>> completableFutures = list.stream()
                .map(s -> CompletableFuture.supplyAsync(() -> function.apply(s)))
                .collect(Collectors.toList());

        List<T> results = null;
        try {
            results = completableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } catch (Exception e) {
            if (e instanceof CompletionException) {
                if (e.getCause() != null) {
                    exceptionHandle.accept(e.getCause());
                }
            }
        }
        return results;
    }
}

调用demo如下:

public class EngineDemo {

    private static void sleep(long sleepTime) {
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void currentDate(String str) {
        // 创建一个Date对象,它包含了当前时间
        Date now = new Date();
        // 创建一个SimpleDateFormat对象,用于指定输出格式
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // 使用format方法将Date对象格式化为字符串
        String currentTime = dateFormat.format(now);
        // 打印当前时间
        System.out.println(str + "是: " + currentTime);
    }

    public static void main(String[] args) {
        currentDate("执行前");
        List<Integer> numList = CompletableFutureEngine2.parallelFutureJoin(Arrays.asList(1, 3, 5),
            num -> {
                sleep(num * 1000);
                if (num == 1) {
                    throw new BusinessException("心别太大");
                }
                return num;
            }
            , e -> {
                if (e instanceof BusinessException) {
                    System.out.println("BusinessException =" + e.getMessage());
                } else {
                    System.out.println("Exception entrance");
                }
            }
        );
        System.out.println(numList);
        currentDate("执行后");

    }
}

运行完之后,我们再去看下执行日志

你会发现,因为子线程的睡眠时间为传进来的num值,当num=1时,触发告警,因此主线程在等待一秒中就会感知到异常,郑如日志打印的时间间隔为1秒钟。

此时再去想想:那此时抛出异常时,num为3,5执行状态时怎么样的,再去debug一下触发异常这块代码:

你会发现当num=1抛出异常的时候,其他两个线程的执行状态还是未完成,这就是和allOf这个方法最大的区别。

是否感觉只有自己试验过才会有更大的收获,今天就先分享到这,后面干货多多!!

感觉对您有所启发的话,记得帮忙点赞,收藏加关注,并且分享给需要的小伙伴!!加油!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2153703.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大模型终极指南:零基础到精通,一文搞定!

随着 ChatGPT 的到来&#xff0c;大模型[1]&#xff08;Large Language Model&#xff0c;简称 LLM&#xff09;成了新时代的 buzzword&#xff0c;各种 GPT 产品百花齐放。 大多数人直接用现有产品就可以了&#xff0c;但对于喜欢刨根问底的程序员来说&#xff0c;能够在本地…

python-SZ斐波那契数列/更相减损数

一&#xff1a;SZ斐波那契数列题目描述 你应该很熟悉斐波那契数列&#xff0c;不是吗&#xff1f;现在小理不知在哪里搞了个山寨版斐波拉契数列&#xff0c;如下公式&#xff1a; F(n) { $\ \ \ \ \ \ \ \ \ \ \ \ $ a,( n1) $\ \ \ \ \ \ \ \ \ \ \ \ $ b,( n2) $\ \ \ \ \ \ …

回归预测 | Matlab实现ReliefF-XGBoost多变量回归预测

回归预测 | Matlab实现ReliefF-XGBoost多变量回归预测 目录 回归预测 | Matlab实现ReliefF-XGBoost多变量回归预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.ReliefF-xgboost回归预测代码&#xff0c;对序列数据预测性能相对较高。首先通过ReleifF对输入特征计算权…

Spring中的Web Service消费者集成(应该被淘汰的技术)

问题 最近需要对接一个老系统的web service接口&#xff0c;我已经有7&#xff0c;8年没有遇到过这种接口了。 思路 先用Spring空项目中的jaxws-maven-plugin插件生成一波web service客户端&#xff0c;然后&#xff0c;集成到现有的SpringBoot3项目中使用就可以了。 生成w…

人才有约,职为你:颐年集团携手粤荣学校共绘养老行业的美好未来

摘要&#xff1a;广州市白云区粤荣职业培训学校成功举办“人才有约&#xff0c;职为你”颐年集团养老机构线下招聘会 2024年9月19日下午2点30分&#xff0c;广州市白云区金骊城二楼热闹非凡。粤荣职业培训学校携手颐年集团&#xff0c;共同举办了主题为“人才有约&#xff0c;…

做短剧申请微信小程序备案整体的操作流程!

做国内短剧对接微信小程序&#xff0c;小程序备案是必不可少的&#xff0c;需要准备哪些资料&#xff0c;以及需要注意的事项&#xff0c;所需材料全部整理出来了&#xff0c;小程序从注册到类目和备案分为五个步骤来讲解&#xff0c;下面就由我来向大家介绍所有的操作流程。 …

【RabbitMQ】消息分发、事务

消息分发 概念 RabbitMQ队列拥有多个消费者时&#xff0c;队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅该队列订阅列表里的一个消费者。这种方式非常适合扩展&#xff0c;如果现在负载加重&#xff0c;那么只需要创建更多的消费者来消费处理消息即可。 默…

docker desktop windows stop

服务docker改为启动 cmd下查看docker版本 {"builder": {"gc": {"defaultKeepStorage": "20GB","enabled": true}},"experimental": false,"registry-mirrors": ["https://hub.atomgit.com/"]…

详解c++:new和delete

文章目录 前言一、new和mallocnew的用法&#xff08;爽点&#xff09;自动构造 delete和freedelete的用法&#xff08;爽点&#xff09; 提醒 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 在C中&#xff0c;new 和 delete 是两个非常重要的操作符&am…

Python面相对象案例熟悉对MySQL的操作(案例一、学生管理系统,案例二、模拟注册与用户登录)有源码

Python面相对象案例熟悉对MySQL的操作 案例一&#xff0c;学生管理系统 对数据表的要求&#xff1a; 在mysql中创建数据库gamedb,创建用户表userinfo,字段如下&#xff1a; 用户编号uid(int,主键&#xff0c;自动增长) 用户姓名uname(varchar(20),非空) 用户昵称nickname(…

【Delphi】Delphi 中的 LiveBindings 使用场景与概念

LiveBindings 是 Delphi 提供的一种数据绑定机制&#xff0c;用于将 UI 控件与数据源&#xff08;如数据库字段、对象属性等&#xff09;进行动态连接。LiveBindings 允许开发人员通过可视化的方式绑定数据&#xff0c;省去了大量的手动编写代码&#xff0c;使 UI 更新和数据同…

RAG+Agent人工智能平台:RAGflow实现GraphRA知识库问答,打造极致多模态问答与AI编排流体验

1.RAGflow简介 全面优化的 RAG 工作流可以支持从个人应用乃至超大型企业的各类生态系统。大语言模型 LLM 以及向量模型均支持配置。基于多路召回、融合重排序。提供易用的 API&#xff0c;可以轻松集成到各类企业系统。支持丰富的文件类型&#xff0c;包括 Word 文档、PPT、exc…

『玉竹』基于Laravel 开发的博客、微博客系统和Android App

基于 Laravel 和 Filament 开发, 使用 Filament 开发管理后台&#xff0c;前端比较简洁。 博客大家都清楚是什么东西&#xff0c;微博客类似于微博之类的吧&#xff0c;有时候想要写的东西可能只有几句话&#xff0c;想要起个标题都不好起。 为了是微博客功能更好用&#xff0c…

Navicat导入Sql文件至Mysql数据库,事务失效

Mysql 版本&#xff1a;8.0.39 Navicat 版本&#xff1a;17.x、16.x 结论&#xff1a; Navicat 导入sql文件&#xff0c;事务不会生效&#xff0c;无论怎么设置 mysql.exe 导入sql文件&#xff0c;事务生效 测试 准备一张表 name约束不能为空&#xff0c;用于测试事务失败…

Qemu开发ARM篇-2、uboot交叉编译

文章目录 1、交叉编译工具安装2、uboot交叉编译3、FAQ 在继上一篇 Qemu开发ARM篇-1、环境搭建篇中&#xff0c;我们搭建安装了qemu虚拟机&#xff0c;在本节中&#xff0c;我们将演示如何安装交叉编译工具并交叉编译 uboot,在下一节中&#xff0c;我们将演示如何使用 qemu运…

如何快速找回Finalshell中VPS的SSH密码

买了vps亦或者重装了系统&#xff0c;就会更新SSH的连接密码&#xff0c;如果忘记保存或者遗忘&#xff0c;在邮箱里也找不到&#xff0c;再重装系统会非常麻烦。这时就需要在Finalshell中找回SHH的密码了。方法如下&#xff1a; 第一步&#xff1a;无认哪一种方法&#xff0c…

嵌入式入门小工程

此代码基于s3c2440 1.点灯 //led.c void init_led(void) {unsigned int t;t GPBCON;t & ~((3 << 10) | (3 << 12) | (3 << 14) | (3 << 16));t | (1 << 10) | (1 << 12) | (1 << 14) | (1 << 16);GPBCON t; }void le…

window下idea中scala的配置

目录 Scala安装步骤&#xff1a; 1.下载scala安装包 2.配置环境变量&#xff1a; 3.检查scala是否安装成功&#xff1a; 4.idea安装scala插件 5.导入scala-sdk 6.新建scala文件 Scala安装步骤&#xff1a; 1.下载scala安装包 访问Scala官网&#xff1a;https://www.sca…

MySQL高阶1907-按分类统计薪水

目录 题目 准备数据 分析数据 总结 题目 结果表 必须 包含所有三个类别。 如果某个类别中没有帐户&#xff0c;则报告 0 。 按 任意顺序 返回结果表。 查询每个工资类别的银行账户数量。 工资类别如下&#xff1a; "Low Salary"&#xff1a;所有工资 严格低于…

YOLOv8改进 | 特征融合篇,YOLOv8添加iAFF(多尺度通道注意力模块),并与C2f结构融合,提升小目标检测能力

摘要 特征融合,即来自不同层或分支的特征的组合,是现代网络架构中无处不在的一部分。虽然它通常通过简单的操作(如求和或拼接)来实现,但这种方式可能并不是最佳选择。在这项工作中,提出了一种统一且通用的方案,即注意力特征融合(Attentional Feature Fusion),适用于…