并发包工具之 批量处理任务 CompletionService(异步)、CompletableFuture(回调)

news2025/1/23 4:06:16

文章目录

      • 一、处理异步任务并获取返回值——CompletionService
      • 二、线程池
      • 三、Callable 与 Future
      • 四、通过回调方式处理可组合编排任务——CompletableFuture

一、处理异步任务并获取返回值——CompletionService

特点描述:
        对于比较复杂的计算,把任务进行提交,并发执行,哪个任务先执行完,get()方法就会获取到相应的任务结果。

范式:
1、
        假设有一组针对某个问题的任务solvers(需要实现Callable接口,任务的具体逻辑就在其call方法里),每个任务都返回一个类型为Result的值,并且想要并发地运行它们,处理每个返回一个非空值的结果,在某些方法使用:

void solve(Executor e,
            Collection<Callable<Result>> solvers)
     throws InterruptedException, ExecutionException {
     CompletionService<Result> ecs
         = new ExecutorCompletionService<Result>(e);
     for (Callable<Result> s : solvers)
         ecs.submit(s);
     int n = solvers.size();
     for (int i = 0; i < n; ++i) {
         Result r = ecs.take().get();
         if (r != null)
             use(r);
     }
 }

2、
        假设想使用任务集的第一个非空结果,忽略任何遇到异常的任务,并在第一个任务准备好时取消所有其他任务(比如:多仓库文件/镜像下载,从最近的服务中心下载后,终止其他下载过程)

void solve(Executor e,
            Collection<Callable<Result>> solvers)
     throws InterruptedException {
     CompletionService<Result> ecs
         = new ExecutorCompletionService<Result>(e);
     int n = solvers.size();
     List<Future<Result>> futures
         = new ArrayList<Future<Result>>(n);
     Result result = null;
     try {
         for (Callable<Result> s : solvers)
             futures.add(ecs.submit(s));
         for (int i = 0; i < n; ++i) {
             try {
                 Result r = ecs.take().get();
                 if (r != null) {
                     result = r;
                     break;
                 }
             } catch (ExecutionException ignore) {}
         }
     }
     finally {
         for (Future<Result> f : futures)
               // 注意这里的参数给的是 true,详解同样在前序 Future 源码分析文章中
             f.cancel(true);
     }

     if (result != null)
         use(result);
 }

总得来说分两步:
1、提交异步任务 submit方法(submit最终会委托给内部的 executor 去执行任务)
2、从队列中拿取并移除元素 take(如果队列为空,那么调用 take() 方法的线程会被阻塞)/poll(…不会被阻塞,返回null)/poll带超时参数(获取并移除阻塞队列中的第一个元素,如果超时时间到而队列还是空,该方法返回null) 方法

实现原理:
        将异步任务的生产、任务完成结果的消费进行解耦,类似mq,哪个任务先执行完,就把结果放到队列中。

唯一实现类:
        ExecutorCompletionService;阻塞队列默认是 LinkedBlockingQueue

二、线程池

为什么要用线程池
        ∵ 手动创建线程的缺点:
1、不受控,系统资源有限,每个人如果都创建的话,标准不一样,线程疯狂抢占资源.,混乱…
2、开销大,创建一个线程需要调用操作系统内核API,然后操作系统要为线程分配一系列资源,创建个线程啥也不干大概需要1M左右大小。

        线程池可以统一管理、控制最大并发数并实现拒绝策略、隔离线程环境;当执行大量异步任务时,线程池里的线程能复用,不用频繁创建和销毁,能够提供好的性能。
       
Java并发包里的线程池——ThreadPoolExecutor; (接口是ExecutorService)
Spring对线程池的封装——ThreadPoolTaskExecutor
       
关于线程池核心线程数的设置:
        CPU是时间片轮转机制来让线程占用的,也就是说程序表面上是同时进行的,实际上是切换执行的,CPU每个时刻只能由一个线程占用,比如 4核CPU,只能同时跑4个线程。
       
对于CPU密集型程序(如运算、逻辑判断等,I/O操作可以在短时间完成,但CPU运算比较多)
——最佳线程数量=CPU核数+1,这个1可以理解为替补,如果某个线程因为发生错误或其他原因暂停了,这个线程可以继续工作。
       
对于I/O密集型(如涉及网络、磁盘、内存等)
——最佳线程数=CPU核心数 * (1/CPU利用率)=CPU核心数 * (1 + (I/O耗时/CPU耗时)),如果几乎都是I/O耗时,可取2N+1(1为替补)
       
(p.s.线程数不是越多越好,线程上下文切换开销不小)
       

三、Callable 与 Future

        Runnable接口的方法没有返回值;Callable 是泛型接口,可以返回指定类型的结果。
        当提交一个Callable 任务后,会同时获得一个Future对象,然后,在主线程某个时刻调用Future对象的get() 方法,就可以获得异步执行的结果。在调用get()时,如果异步任务已经完成,就直接获得结果。如果异步任务还没有完成,那么get()会阻塞,直到任务完成后才返回结果。
       

四、通过回调方式处理可组合编排任务——CompletableFuture

特点描述:
        CompletableFuture是由 Java 8 引入的,在 Java 8之 前一般通过 Future 实现异步,CompletableFuture 对 Future 进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,比如步骤1、2、3存在依赖关系,支持对步骤进一步的编排,降低依赖之间的阻塞。

使用:
在这里插入图片描述

        如上图所示,这里描绘的是一个业务接口的流程,其中包括 CF1\CF2\CF3\CF4\CF5 共5个步骤,并描绘了这些步骤之间的依赖关系,每个步骤可以是一次 RPC 调用、一次数据库操作或者是一次本地方法调用等,在使用 CompletableFuture 进行异步化编程时,图中的每个步骤都会产生一个 CompletableFuture 对象,最终结果也会用一个 CompletableFuture 来进行表示。(只看第一层的话 好像跟 CompletionService 效果差不多… 都可以异步执行批量任务并拿到结果…)

1、零依赖,CompletableFuture 的创建
比如图中所示的 CF1、CF2,可以有以下方式:

     // 1、使用 runAsync 或 supplyAsync 发起异步调用
        // 线程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        CompletableFuture<String> CF1 = CompletableFuture.supplyAsync(() -> {
            return "CF1 result";
        }, executorService);

        // 2、CompletableFuture.completedFuture() 直接创建一个已完成状态的 CompletableFuture<
        CompletableFuture<String> CF2 = CompletableFuture.completedFuture("CF2 result");

        // 3、先初始化一个未完成的 CompletableFuture
        CompletableFuture<String> CF3 = new CompletableFuture<>();
        // 然后通过complete()、completeExceptionally(),完成该CompletableFuture
        CF3.complete("CF3result");

2、一元依赖,依赖一个 CompletableFuture
比如图中所示的 CF3、CF5,可以用 thenApply、thenAccept、thenCompose 等方法来实现:

// result为CF1的结果
    CompletableFuture<String> CF3=CF1.thenApply(result->{
        return "CF3result";
    });

3、二元依赖:依赖两个 CompletableFuture
比如图中所示的 CF4,这种二元依赖可以通过 thenCombine 等回调来实现:

   // result1、result2分别为CF1、CF2的结果
        CompletableFuture<String> CF4 = CF1.thenCombine(CF2, (result1, result2) -> {
            return "CF4result";
        });

4、多元依赖,依赖多个 CompletableFuture
比如图中所示的 CF6,依赖于三个步骤CF3、CF4、CF5,这种多元依赖可以通过 allOf 或 anyOf 方法来实现,区别是当需要多个依赖全部完成时使用allOf,当多个依赖中的任意一个完成即可时使用anyOf:

    CompletableFuture<Void> CF6 = CompletableFuture.allOf(CF3, CF4, CF5);
        CompletableFuture<String> result = CF6.thenApply(v ->
        {
            // 这里的 join是完成任务后用来获取结果的,并不会阻塞
            // 因为传给 thenApply 的函数都是在 CF3、CF4、CF5 全都完成时才会执行
            String result3 = CF3.join();
            String result4 = CF4.join();
            String result5 = CF5.join();
            // 根据 result3、result4、result5组装最终 result
            return result3 + result4 + result5;
        });

        如果只用一层的话,异步执行批量任务并拿到总的结果,参考api里 allOf:
在这里插入图片描述

代码示例:

 // 任务入参集合
            ArrayList<String> paramList = new ArrayList<>();
            // 用于汇总所有结果
            ArrayList<String> resultList = new ArrayList<>();
            CompletableFuture.allOf(
                    paramList.stream().map(string ->
                            CompletableFuture.supplyAsync(() ->
                                            // 这里返回了本身,实际上也可以是具体的方法
                                            string,
                                    asyncServiceExecutor)
                                    // thenApply是对结果做简单映射,类似于Stream.map,list->list就是原样往下传递,这里不使用thenApply也行
                                    .thenApply(list -> list)
                                    .whenComplete((result, e) -> {
                                        // 对异常结果的处理
                                        if (e != null) System.out.println("exception");
                                        // 汇总结果
                                        resultList.add(result);
                                    })
                    ).toArray(CompletableFuture[]::new)
			// 完成后返回结果值,如果异常完成则抛出(未经检查)异常,相当于一个等待任务完成的动作
            ).join(); 

参考文档:
https://dayarch.top/p/how-many-threads-should-be-created.html
https://segmentfault.com/a/1190000023129592?utm_source=sf-similar-article
https://segmentfault.com/a/1190000023587881
https://tech.meituan.com/2022/05/12/principles-and-practices-of-completablefuture.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/342418.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

dfs(九)字符串的全排列

字符串的排列_牛客题霸_牛客网【牛客题霸】收集各企业高频校招笔面试题目&#xff0c;配有官方题解&#xff0c;在线进行百度阿里腾讯网易等互联网名企笔试面试模拟考试练习,和牛人一起讨论经典试题,全面提升你的技术能力https://www.nowcoder.com/practice/fe6b651b66ae47d7ac…

Windows 安装Tomcat

版本:tomcat8.5jdk-8u231一.解压JDK安装包 更换JDK安装路径二.解压安装Tomcat 选择jdk安装路径更换tomcat安装路径三.设置环境变量 1.“环境变量”界面中系统变量点击”新建“&#xff0c;创建CATALINA_HOMEC:\RESSET\tomcat&#xff08;Tomcat服务器的根目录&#xff09;2.创建…

电脑录屏win+g没反应怎么办?打开这2个开关,就能解决

有不少的小伙伴在使用电脑自带的录屏软件的时候&#xff0c;发现录屏快捷键wing没反应了。电脑录屏wing没反应怎么办&#xff1f;解决办法很简单&#xff0c;只需要打开2个开关&#xff0c;就能够快速解决&#xff0c;一起来看看吧&#xff01; 一、电脑录屏wing没反应怎么办&a…

响应式布局之viewport-超级简单

之前文章CSS布局之详解_故里2130的博客-CSDN博客 上面的文章可以实现响应式布局&#xff0c;根据浏览器的大小变化而变化&#xff0c;但是相对于viewport来说&#xff0c;之前的还是有点复杂&#xff0c;而使用viewport更加的简单。 当我们使用amfe-flexible的时候&#xff0…

记录ideal中使用springboot遇到的问题

持续记录&#xff0c;避免反复查找资料 选择Maven构建项目 创建springboot项目时&#xff0c;【Project Metadata页】的Type选项默认是Gradle&#xff0c;如果要使用Maven需要修改选项&#xff0c;如下图 mysql依赖包直接在pom.xml中添加 创建时在对话框中勾选【Mysql Driver】…

两个月,测试转岗产品经理,我是怎么规划的?

​本期同学依旧来自深圳 测试到产品转变&#xff0c;用了两个月 本周&#xff0c;为大家介绍M同学的佛系转岗经历 学员档 学员档案 原岗位&#xff1a;测试 转岗级别&#xff1a;中级产品经理 转岗特点&#xff1a; 1.未接触产品工作 2.对岗位地点要求严格 先看结果 …

浅显易懂的说清楚小游戏与H5游戏的技术区别

从“跳一跳”到“羊了个羊”微信小游戏上线4年时间&#xff0c;除了涌现出不少火爆全网的小游戏之外&#xff0c;也有类似于“动物餐厅”、“口袋奇兵”等游戏得以在此孵化繁荣&#xff0c;凭借着微信强大的社交属性小游戏成为游戏厂商在桌面端、App 端、H5 端之外争夺的另一个…

40个改变你编程技能的小技巧!

40个改变编程技能的小技巧 1、将大块代码分解成小函数 2、今日事今日毕&#xff0c;如果没毕&#xff0c;就留到明天。 如果下班之前还没有解决的问题&#xff0c;那么你需要做的&#xff0c;就是关闭电脑&#xff0c;把它留到明天。 中途不要再想着问题了&#xff01; 3、…

【LeetCode】不同的二叉搜索树 [M](卡特兰数)

96. 不同的二叉搜索树 - 力扣&#xff08;LeetCode&#xff09; 一、题目 给你一个整数 n &#xff0c;求恰由 n 个节点组成且节点值从 1 到 n 互不相同的 二叉搜索树 有多少种&#xff1f;返回满足题意的二叉搜索树的种数。 示例 1&#xff1a; 输入&#xff1a;n 3 输出&a…

学会使用LoadRunner录制脚本

1.LoadRunner安装 https://blog.csdn.net/weixin_48584088/article/details/129012469 2.Loadrunner的基本概念 LoadRunner是一种适用于许多软件体系架构的自动负载测试工具&#xff0c;从用户关注的响应时间、吞吐量&#xff0c; 并发用户和性能计数器等方面来衡量系统的性…

Linux数据流重定向

数据流重定向就是将某个命令执行后应该要出现在屏幕上的数据&#xff0c;给它传输到其他地方去 如果我们要执行一个命令&#xff0c;它通常是这样的&#xff1a; 标准输入、输出、错误stdin标准输入0stdout标准输出1stderr标准错误输出2标准输入&#xff08;stdin&#xff09;…

ESP-IDF:使用多任务测试互斥体mutex

代码&#xff1a; #include <stdio.h> #include “sdkconfig.h” #include “freertos/FreeRTOS.h” #include “freertos/task.h” #include “freertos/semphr.h” #include “esp_system.h” #include “esp_spi_flash.h” /互斥体测试/ SemaphoreHandle_t xmutex; …

【html弹框拖拽和div拖拽功能】原生html页面引入vue语法后通过自定义指令简单实现div和弹框拖拽功能

前言 这是html版本的。只是引用了vue的语法。 这是很多公司会出现的一种情况&#xff0c;就是原生的页面&#xff0c;引入vue的语法开发 这就导致有些vue上很简单的功能。放到这里需要转换一下 以前写过一个vue版本的帖子&#xff0c;现在再加一个html版本的。 另一个vue版本…

CentOS8基础篇5:用户账号与用户组的创建

一、用户与用户组概念 Linux是一个多用户、多任务的服务器操作系统&#xff0c;多用户多任务指可以在系统上建立多个用户&#xff0c;而多个用户可以在同一时间内登录同一个系统执行各自不同的任务&#xff0c;而互不影响。 Linux用户是根据角色定义的&#xff0c;具体分为三…

JVM - 内存分配

目录 JVM的简化架构和运行时数据区 JVM的简化架构 运行时数据区 PC寄存器 Java栈 Java堆 方法区 运行时常量池 本地方法栈 栈、堆、方法区交互关系 Java堆内存模型和分配 Java堆内存概述 Java堆的结构 对象的内存布局 对象的访问定位 Trace跟踪和Java堆的参数配…

【漏洞真实影响分析】Apache Kafka Connect 模块JNDI注入(CVE-2023-25194)

系列简介&#xff1a; 漏洞真实影响分析是墨菲安全实验室针对热点漏洞的分析系列文章&#xff0c;帮助企业开发者和安全从业者理清漏洞影响面、梳理真实影响场景&#xff0c;提升安全应急响应和漏洞治理工作效率。 漏洞概述 Apache Kafka Connect服务在2.3.0 至 3.3.2 版本中&…

sql server安装并SSMS连接

博主简介&#xff1a;原互联网大厂tencent员工&#xff0c;网安巨头Venustech员工&#xff0c;阿里云开发社区专家博主&#xff0c;微信公众号java基础笔记优质创作者&#xff0c;csdn优质创作博主&#xff0c;创业者&#xff0c;知识共享者,欢迎关注&#xff0c;点赞&#xff…

vue3使用vis绘制甘特图制作timeline可拖动时间轴,时间轴中文化

本文写作顺序&#xff1a;效果展示——子组件封装——父组件传值 仅js的原始图&#xff08;回归初始化&#xff09;、撤销&#xff08;上一步&#xff09;功能实现&#xff0c;样式需要自己调整 目录前言&#xff1a;参考文档文章一、实现效果&#xff1a;二、安装插件及依赖&a…

0代码实现接口自动化测试 —— RF框架实践

robotframework是一款关键字自动化测试框架&#xff0c;可能做各种类型的自动化测试。本文介绍通过 robotframework 来实现接口测试。 01、安装接口请求的第三方库 pip install robotframework-requests 在python安装目录的Lib\site-packages可以看到 02、接口关键字基础 ro…

【操作系统】操作系统IO技术底层机制和ZeroCopy

1.DMA技术详解 &#xff08;1&#xff09;应用程序 从 磁盘读写数据 的时序图&#xff08;未用DMA技术前&#xff09; &#xff08;2&#xff09;什么是DMA 技术 (Direct Memory Access&#xff09; 直接内存访问&#xff0c;直接内存访问是计算机科学中的一种内存访问技术。…