【Java多线程】CompletableFuture 异步多线程

news2025/1/22 16:44:51

1. 回顾 Future

一些业务场景我们需要使用多线程异步执行任务,加快任务执行速度。

JDK5新增了Future接口,用于描述一个异步计算的结果。

虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,我们必须使用Future.get()的方式阻塞调用线程,或者使用轮询方式判断 Future.isDone 任务是否结束,再获取结果。

这两种处理方式都不是很优雅,相关代码如下:

@Test  
public void testFuture() throws ExecutionException, InterruptedException {  
    ExecutorService executorService = Executors.newFixedThreadPool(5);  
    Future<String> future = executorService.submit(() -> {  
        Thread.sleep(2000);  
        return "hello";  
    });  
    System.out.println(future.get());  
    System.out.println("end");  
}  

与此同时,Future无法解决多个异步任务需要相互依赖的场景,简单点说就是,主线程需要等待子线程任务执行完毕之后在进行执行,这个时候你可能想到了CountDownLatch,没错确实可以解决,代码如下。

这里定义两个Future,第一个通过用户id获取用户信息,第二个通过商品id获取商品信息。

@Test  
 public void testCountDownLatch() throws InterruptedException, ExecutionException {  
     ExecutorService executorService = Executors.newFixedThreadPool(5);  
     CountDownLatch downLatch = new CountDownLatch(2);  
     long startTime = System.currentTimeMillis();  
     Future<String> userFuture = executorService.submit(() -> {  
         //模拟查询商品耗时500毫秒  
         Thread.sleep(500);  
         downLatch.countDown();  
         return "用户A";  
     });  

     Future<String> goodsFuture = executorService.submit(() -> {  
         //模拟查询商品耗时500毫秒  
         Thread.sleep(400);  
         downLatch.countDown();  
         return "商品A";  
     });  

     downLatch.await();  
     //模拟主程序耗时时间  
     Thread.sleep(600);  
     System.out.println("获取用户信息:" + userFuture.get());  
     System.out.println("获取商品信息:" + goodsFuture.get());  
     System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");  

 }  

「运行结果」

获取用户信息:用户A
获取商品信息:商品A
总共用时1110ms
从运行结果可以看出结果都已经获取,而且如果我们不用异步操作,执行时间应该是:500+400+600 = 1500,用异步操作后实际只用1110。

但是Java8以后这就不在认为是一种优雅的解决方式,接下来了解下CompletableFuture的使用。

2. CompletableFuture

JDK8之后,提供了CompletableFuture实现异步线程。不推荐再使用Future

通过CompletableFuture实现上面示例

@Test  
public void testCompletableInfo() throws InterruptedException, ExecutionException {  
    long startTime = System.currentTimeMillis();  

      //调用用户服务获取用户基本信息  
      CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() ->  
              //模拟查询商品耗时500毫秒  
      {  
          try {  
              Thread.sleep(500);  
          } catch (InterruptedException e) {  
              e.printStackTrace();  
          }  
          return "用户A";  
      });  

      //调用商品服务获取商品基本信息  
      CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() ->  
              //模拟查询商品耗时500毫秒  
      {  
          try {  
              Thread.sleep(400);  
          } catch (InterruptedException e) {  
              e.printStackTrace();  
          }  
          return "商品A";  
      });  

      System.out.println("获取用户信息:" + userFuture.get());  
      System.out.println("获取商品信息:" + goodsFuture.get());  

      //模拟主程序耗时时间  
      Thread.sleep(600);  
      System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");  
}  

[运行结果]

获取用户信息:用户A
获取商品信息:商品A
总共用时1112ms

通过CompletableFuture可以很轻松的实现CountDownLatch的功能
当然,CompletableFuture还有其他许多新功能:比如可以实现:任务1执行完了再执行任务2,甚至任务1执行的结果,作为任务2的入参数等等强大功能,下面就来学学CompletableFuture的API。

CompletableFuture创建方式

1、常用的4种创建方式

CompletableFuture源码中有四个静态方法用来执行异步任务

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}  
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}  
public static CompletableFuture<Void> runAsync(Runnable runnable){..}  
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}  

一般我们用上面的静态方法来创建CompletableFuture
区别:
supplyAsync执行任务,支持返回值。
runAsync执行任务,没有返回值。
supplyAsync方法

//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务  
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)  
//自定义线程,根据supplier构建执行任务  
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)  
「runAsync方法」

//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务  
public static CompletableFuture<Void> runAsync(Runnable runnable)   
//自定义线程,根据runnable构建执行任务  
public static CompletableFuture<Void> runAsync(Runnable runnable,  Executor executor)  

2、结果获取的4种方式

对于结果的获取CompltableFuture类提供了四种方式

//方式一  
public T get()  
//方式二  
public T get(long timeout, TimeUnit unit)  
//方式三  
public T getNow(T valueIfAbsent)  
//方式四  
public T join()  

说明:

get()get(long timeout, TimeUnit unit) => 在Future中就已经提供了,后者提供超时处理,如果在指定时间内未获取结果将抛出超时异常
getNow() => 立即获取结果不阻塞,结果计算已完成将返回结果或计算过程中的异常,如果未计算完成将返回设定的valueIfAbsent值
join() => 方法里不会抛出异常
示例:

@Test  
public void testCompletableGet() throws InterruptedException, ExecutionException {  

    CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {  
        try {  
            Thread.sleep(1000);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
        return "商品A";  
    });  

    // getNow方法测试   
    System.out.println(cp1.getNow("商品B"));  

    //join方法测试   
    CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync((() -> 1 / 0));  
    System.out.println(cp2.join());  
   System.out.println("-----------------------------------------------------");  
    //get方法测试  
    CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync((() -> 1 / 0));  
    System.out.println(cp3.get());  
}  

「运行结果」

第一个执行结果为 「商品B」,因为要先睡上1秒结果不能立即获取
join方法获取结果方法里不会抛异常,但是执行结果会抛异常,抛出的异常为CompletionException
get方法获取结果方法里将抛出异常,执行结果抛出的异常为ExecutionException

3. CountDownLatch

CountDownLatch可以使一个获多个线程等待其他线程各自执行完毕后再执行。

CountDownLatch 定义了一个计数器,和一个阻塞队列, 当计数器的值递减为0之前,阻塞队列里面的线程处于挂起状态,当计数器递减到0时会唤醒阻塞队列所有线程,这里的计数器是一个标志,可以表示一个任务一个线程,也可以表示一个倒计时器,CountDownLatch可以解决那些一个或者多个线程在执行之前必须依赖于某些必要的前提业务先执行的场景。

CountDownLatch 常用方法说明

CountDownLatch(int count); //构造方法,创建一个值为count 的计数器。

await();//阻塞当前线程,将当前线程加入阻塞队列。

await(long timeout, TimeUnit unit);//在timeout的时间之内阻塞当前线程,时间一过则当前线程可以执行,

countDown();//对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程。

举例:用CountDownLatch 来优化我们的报表统计

运营系统有统计报表、业务为统计每日的用户新增数量、订单数量、商品的总销量、总销售额…等多项指标统一展示出来,因为数据量比较大,统计指标涉及到的业务范围也比较多,所以这个统计报表的页面一直加载很慢,所以需要对统计报表这块性能需进行优化。

问题分析:
统计报表页面涉及到的统计指标数据比较多,每个指标需要单独的去查询统计数据库数据,单个指标只要几秒钟,但是页面的指标有10多个,所以整体下来页面渲染需要将近一分钟。

解决方案:
任务时间长是因为统计指标多,而且指标是串行的方式去进行统计的,我们只需要考虑把这些指标从串行化的执行方式改成并行的执行方式,那么整个页面的时间的渲染时间就会大大的缩短, 如何让多个线程同步的执行任务,我们这里考虑使用多线程,每个查询任务单独创建一个线程去执行,这样每个统计指标就可以并行的处理了。

要求:
因为主线程需要每个线程的统计结果进行聚合,然后返回给前端渲染,所以这里需要提供一种机制让主线程等所有的子线程都执行完之后再对每个线程统计的指标进行聚合。 这里我们使用CountDownLatch 来完成此功能。

模拟代码

1、分别统计4个指标用户新增数量、订单数量、商品的总销量、总销售额;
2、假设每个指标执行时间为3秒。如果是串行化的统计方式那么总执行时间会为12秒。
3、我们这里使用多线程并行,开启4个子线程分别进行统计
4、主线程等待4个子线程都执行完毕之后,返回结果给前端。


    //用于聚合所有的统计指标
    private static Map map=new HashMap();
    //创建计数器,这里需要统计4个指标
    private static CountDownLatch countDownLatch=new CountDownLatch(4);public static void main(String[] args) {
        //记录开始时间
        long startTime=System.currentTimeMillis();Thread countUserThread=new Thread(new Runnable() {
            public void run() {
                try {
                    System.out.println("正在统计新增用户数量");
                    Thread.sleep(3000);//任务执行需要3秒
                    map.put("userNumber",1);//保存结果值
                    //countDownLatch.countDown();//标记已经完成一个任务
                    System.out.println("统计新增用户数量完毕");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally{
                	countDownLatch.countDown();//标记已经完成一个任务
                }}
        });
        Thread countOrderThread=new Thread(new Runnable() {
            public void run() {
                try {
                    System.out.println("正在统计订单数量");
                    Thread.sleep(3000);//任务执行需要3秒
                    map.put("countOrder",2);//保存结果值
                    System.out.println("统计订单数量完毕");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                	countDownLatch.countDown();//标记已经完成一个任务
                }
            }
        });Thread countGoodsThread=new Thread(new Runnable() {
            public void run() {
                try {
                    System.out.println("正在商品销量");
                    Thread.sleep(3000);//任务执行需要3秒
                    map.put("countGoods",3);//保存结果值
                    System.out.println("统计商品销量完毕");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                	countDownLatch.countDown();//标记已经完成一个任务
               	}
            }
        });Thread countmoneyThread=new Thread(new Runnable() {
            public void run() {
                try {
                    System.out.println("正在总销售额");
                    Thread.sleep(3000);//任务执行需要3秒
                    map.put("countmoney",4);//保存结果值
                    System.out.println("统计销售额完毕");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                	countDownLatch.countDown();//标记已经完成一个任务
               	}
            }
        });
        
        //启动子线程执行任务
        countUserThread.start();
        countGoodsThread.start();
        countOrderThread.start();
        countmoneyThread.start();try {
            //主线程等待所有统计指标执行完毕
            countDownLatch.await();
            long endTime=System.currentTimeMillis();//记录结束时间
            System.out.println("------统计指标全部完成--------");
            System.out.println("统计结果为:"+map.toString());
            System.out.println("任务总执行时间为"+(endTime-startTime)/1000+"秒");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

注意:countDownLatch.countDown();放在finally中,防止线程异常把机器卡死

「运行结果」
在这里插入图片描述

参考链接
https://zhuanlan.zhihu.com/p/647743286
https://zhuanlan.zhihu.com/p/95835099?utm_id=0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/866464.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

docker安装Nacos的《小白专用》详细教程

1.CentOS安装docker 安装docker yum -y install docker 设置开机自启 systemctl enable docker 启动docker systemctl start docker 查看docker当前的版本 docker version做到这里呢基本上你的docker就安装了一大部分了&#xff0c;当然也有那些无法安装的人&#xff0c;那我建…

prometheus监控k8s服务并告警到钉钉

一、监控k8s集群 要监控k8s集群需要使用到以下服务用于收集监控的资源信息&#xff0c;node_exporter用于监控k8s集群节点的资源信息&#xff0c;kube-state-metrics用于监控k8s集群的deployment、statefulset、daemonset、pod等的状态&#xff0c;cadvisor用于监控k8s集群的p…

爆肝整理,Python自动化测试-Pytest参数化实战封装,一篇打通...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 参数化&#xff1…

Gradio:交互式Python数据应用程序的新前沿

一、说明 什么是Gradio以及如何使用Gradio在Python中创建DataApp或Web界面&#xff1f;使用 Gradio 将您的 Python 数据科学项目转换为交互式应用程序。 摄影&#xff1a;Elijah Merrell on Unsplash Gradio是一个Python库&#xff0c;允许我们快速为机器学习模型创建可定制的接…

工程英语翻译怎样做效果比较好

我们知道&#xff0c;高质量的工程翻译可以有效指导工程项目操作的执行&#xff0c;但市场上专业的工程英语翻译人才严重不足。那么&#xff0c;工程英语翻译难吗&#xff0c;怎样翻译工程英语比较好&#xff1f; 业内人士指出&#xff0c; 工程翻译具有用词专业、涉及领域广、…

Python(八十一)字符串的常用操作——字符串判断的相关方法

❤️ 专栏简介&#xff1a;本专栏记录了我个人从零开始学习Python编程的过程。在这个专栏中&#xff0c;我将分享我在学习Python的过程中的学习笔记、学习路线以及各个知识点。 ☀️ 专栏适用人群 &#xff1a;本专栏适用于希望学习Python编程的初学者和有一定编程基础的人。无…

Python3 安装、环境变量配置、PyCharm新建Python项目

一、安装包下载 Pyhton官网下载>>最新稳定版的安装包&#xff1a; 找到合适的版本进行下载&#xff1a; 如果下载较慢&#xff0c;此处提供一个3.10.11的稳定版本的安装包&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/16GnWjkGFuSfWfaI9UVX8qA?pwd4u5o 提取…

案例13 Spring MVC参数传递案例

基于Spring MVC实现HttpServletRequest、基本数据类型、Java Bean、数组、List、Map、JSON方式的参数传递。 1. 创建项目 选择Maven快速构建web项目&#xff0c;项目名称为case13-springmvc02。 2. 配置Maven依赖 <?xml version"1.0" encoding"UTF-8&quo…

【JavaScript】怎么测试方法的兼容性

利用网站测试方法的兼容性 打开网站&#xff1a;https://caniuse.com在里面输入要检测的方法&#xff0c;红色代表不支持&#xff0c;绿色代码支持。

Linux:Shell编程之正则表达式

目录 绪论 1、正则表达式 1.1 通配符 1.2 正则表达式分类 1.3 基本正则 1.4 正则表达式中表示次数的表达式 1.5 位置锚定 1.5.1 词首锚定和词尾锚定 1.6 分组&#xff08;&#xff09; 1.7 逻辑或 1.8 扩展正则 绪论 正则表达式&#xff1a;有一类特殊字符以及文本…

10-1_Qt 5.9 C++开发指南_Data Visualization实现数据三维显示

Data Visualization 是 Qt 提供的用于数据三维显示的模块。在 Qt 5.7 以前只有商业版才有此模块&#xff0c;而从Qt5.7 开始此模块在社区版本里也可以免费使用了。Data Visualization 用于数据的三维显示&#xff0c;包括三维柱状图、三维空间散点、三维曲面等。Data Visualiza…

【教程】初识云函数,实现无需服务器的项目上云!

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhang.cn] 你是否也在忧愁&#xff0c;想把自己的项目放在云上跑&#xff0c;但又不想花大价钱购买云服务器&#xff1f; 云函数介绍 云函数(Serverless Cloud Function&#xff0c;SCF)的简单理解就是&#xff1a;可以部署…

第四十八周周报

学习目标&#xff1a; 修改ViTGAN 学习内容&#xff1a; 位置编码和多尺度 学习时间&#xff1a; 8.5-8。12 学习产出&#xff1a; 这两周主要工作在修改ViTGAN的结构和代码&#xff0c;将相对位置编码加入ViTGAN并将生成器变为多尺度&#xff0c;由于匹配维度很困难&am…

国产航顺HK32F030M: 内部参考电压

HK32F030MF4P6 用户手册 内部参考电压 adc.c #include "bsp_adc.h"/*** brief ADC GPIO 初始化* param 无* retval 无*/ static void ADCx_GPIO_Config(void) {GPIO_InitTypeDef GPIO_InitStructure;// 打开 ADC IO端口时钟ADC_GPIO_AHBxClock_FUN ( ADC_GPIO_C…

无涯教程-Perl - msgctl函数

描述 该函数使用参数ID,CMD和ARG调用系统函数msgctrl()。您可能需要包括IPC::SysV包以获得正确的常量。 语法 以下是此函数的简单语法- msgctl ID, CMD, ARG返回值 该函数返回0,但如果系统函数成功返回0和1,则返回true。 Perl 中的 msgctl函数 - 无涯教程网无涯教程网提供…

TCP协议的报头格式和滑动窗口

文章目录 TCP报头格式端口号序号和确认序号确认应答&#xff08;ACK&#xff09;机制超时重传机制 首部长度窗口大小报文类型URGACKSYNPSHFINRST 滑动窗口滑动窗口的大小怎么设定怎么变化滑动窗口变化问题 TCP报头格式 端口号 两个端口号比较好理解&#xff0c;通过端口号来找…

3.UE基本操作及数字人工程模块组成(UE数字人系统教程)

1.Fay-UE5数字人工程导入 2.UE数字人语音交互 3.UE基本操作及数字人工程模块组成&#xff08;UE数字人系统教程&#xff09; 一、ue5基本操作 1、项目文件管理 2、关卡素材编辑 在关卡上&#xff1a;w、s、a、d移动&#xff0c;鼠标右键拖动换视角。 二、数字人工程模…

Downie 4 for Mac

Downie是一款Mac平台上非常实用的视频下载工具。它支持下载各种视频网站上的视频&#xff0c;并且具有快速、稳定、易于使用的特点。 Downie具有快速、稳定的下载速度&#xff0c;可以帮助用户轻松地下载高清视频和音频文件。 该软件还提供了简洁、易于使用的界面&#xff0c;…

Java并发编程(三)线程同步 上[synchronized/volatile]

概念 当使用多个线程来访问同一个数据时,将会导致数据不准确,相互之间产生冲突,非常容易出现线程安全问题,比如多个线程都在操作同一数据,都打算修改商品库存,这样就会导致数据不一致的问题。 所以我们通过线程同步机制来保证线程安全,加入同步锁以避免在该线程没有完成操作之前…

C++文件类(整理自C语言中文网-全)

C文件类&#xff08;文件流类&#xff09;及用法详解 《C输入输出流》一章中讲过&#xff0c;重定向后的 cin 和 cout 可分别用于读取文件中的数据和向文件中写入数据。除此之外&#xff0c;C 标准库中还专门提供了 3 个类用于实现文件操作&#xff0c;它们统称为文件流类&…