CompletableFuture异步多任务最佳实践

news2024/11/26 18:30:02

简介

CompletableFuture相比于Java8的并行流,对于处理并发的IO密集型任务有着得天独厚的优势:

  1. 在流式编程下,支持构建任务流时即可执行任务。
  2. CompletableFuture任务支持提交到自定义线程池,调优方便。

本文所有案例都会基于这样的一个需求,某网站有多个商家,用户会在不同的店铺查看同一件商品,只要用户在提供给商店对应的产品名称,商店就会返回对应产品的最终价格。
该需求有几个注意点:

  1. 每次到一家商店查询时,同一时间只能查询一个商品。
  2. 允许用户同一时间在系统里查询多个商店的一个商品。
  3. 查询的商品售价是需要耗时的,平均500ms-2500ms不等。
  4. 为了更直观了解代码性能,我们的例子用户每次会去家左右的店铺分别查询一件商品。

在这里插入图片描述

这里介绍一下,这些功能所要用到的类,首先是商家类,核心方法getPrice会根据用户传入的产品名称返回售价,代码执行时会休眠一段时间返回用户产品最终价格。

/**
 * 商店
 */
public class Shop {

    private final String name;
    private final Random random;

    public Shop(String name) {
        this.name = name;
        random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2));
    }

    /**
     * 传入产品名称返回对应价格和折扣代码
     * @param product 产品名称
     * @return
     */
    public String getPrice(String product) {
        double price = calculatePrice(product);
        Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
        return name + ":" + price + ":" + code;
    }


    

    /**
     * 获取商品价格
     * @param product
     * @return
     */
    public double calculatePrice(String product) {
        try {
            TimeUnit.MILLISECONDS.sleep(RandomUtil.randomInt(500,2500));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return (int)(random.nextDouble() * product.charAt(0) + product.charAt(1));
    }

    public String getName() {
        return name;
    }
}

同时我们也给出了18个商家的列表。

private final static List<Shop> shops = Arrays.asList(new Shop("Nike"),
            new Shop("Apple"),
            new Shop("Coca-Cola"),
            new Shop("Amazon"),
            new Shop("Samsung"),
            new Shop("McDonald's"),
            new Shop("Mercedes-Benz"),
            new Shop("Google"),
            new Shop("Louis Vuitton"),
            new Shop("Chanel"),
            new Shop("Gucci"),
            new Shop("Adidas"),
            new Shop("Pepsi"),
            new Shop("Ford"),
            new Shop("Microsoft"),
            new Shop("Rolex"),
            new Shop("Ferrari"),
            new Shop("IKEA")
    );

实现

顺序流

第一版我们先用顺序流实现改需求,代码很简单用stream遍历商家并调用getPrice获得结果,最终存到List中。

public static void main(String[] args) {
        long begin = System.currentTimeMillis();
        List<String> resultList = shops.stream()
                .map(shop -> shop.getName() + " price is " + shop.getPrice("Air Jordan"))
                .collect(Collectors.toList());
        long end = System.currentTimeMillis();
        System.out.println("执行结束,执行结果:" + JSONUtil.toJsonStr(resultList) + " 执行耗时:" + (end - begin)+"ms");
    }

最终输出结果如下,可以看到用了快30s。

执行结束,执行结果:["Nike price is Nike:112.0:GOLD","Apple price is Apple:135.0:DIAMOND","Coca-Cola price is Coca-Cola:111.0:DIAMOND","Amazon price is Amazon:155.0:GOLD","Samsung price is Samsung:165.0:SILVER","McDonald's price is McDonald's:164.0:PLATINUM","Mercedes-Benz price is Mercedes-Benz:111.0:DIAMOND","Google price is Google:121.0:NONE","Louis Vuitton price is Louis Vuitton:168.0:PLATINUM","Chanel price is Chanel:131.0:NONE","Gucci price is Gucci:132.0:SILVER","Adidas price is Adidas:133.0:PLATINUM","Pepsi price is Pepsi:158.0:SILVER","Ford price is Ford:113.0:PLATINUM","Microsoft price is Microsoft:156.0:GOLD","Rolex price is Rolex:126.0:GOLD","Ferrari price is Ferrari:163.0:DIAMOND","IKEA price is IKEA:150.0:NONE"] 执行耗时:27409ms

并行流

对此我们采用并行流尝试以多线程的形式执行任务,所以我们将stream改为parallelStream。

public static void main(String[] args) {
        long begin = System.currentTimeMillis();
        List<String> resultList = shops.parallelStream()
                .map(shop -> shop.getName() + " price is " + shop.getPrice("Air Jordan"))
                .collect(Collectors.toList());
        long end = System.currentTimeMillis();
        System.out.println("执行结束,执行结果:" + JSONUtil.toJsonStr(resultList) + " 执行耗时:" + ((end - begin)/1000)+"s");
    }

可以看到耗时仅仅5s,这里补充一下笔者的机器信息,笔者的CPU是6核的,所以并行流执行时会有6个线程在同时工作。

执行结束,执行结果:["Nike price is Nike:112.0:GOLD","Apple price is Apple:135.0:DIAMOND","Coca-Cola price is Coca-Cola:111.0:DIAMOND","Amazon price is Amazon:155.0:GOLD","Samsung price is Samsung:165.0:SILVER","McDonald's price is McDonald's:164.0:PLATINUM","Mercedes-Benz price is Mercedes-Benz:111.0:DIAMOND","Google price is Google:121.0:NONE","Louis Vuitton price is Louis Vuitton:168.0:PLATINUM","Chanel price is Chanel:131.0:NONE","Gucci price is Gucci:132.0:SILVER","Adidas price is Adidas:133.0:PLATINUM","Pepsi price is Pepsi:158.0:SILVER","Ford price is Ford:113.0:PLATINUM","Microsoft price is Microsoft:156.0:GOLD","Rolex price is Rolex:126.0:GOLD","Ferrari price is Ferrari:163.0:DIAMOND","IKEA price is IKEA:150.0:NONE"] 执行耗时:5077ms

使用CompletableFuture执行异步多查询任务

我们给出了CompletableFuture执行多IO查询任务的代码示例,可以看到代码的执行流程大致为:

  1. 遍历商家。
  2. 提交异步查询任务。
  3. 调用join(),注意这里的join和CompletableFuture的get方法作用是一样的,都是阻塞获取查询结果,唯一的区别就是join方法签名没有抛异常,所以无需try-catch处理。
 public static void main(String[] args) {
        long begin = System.currentTimeMillis();
        List<String> resultList = shops.stream()
                        .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is "
                                + shop.getPrice("Air Jordan")))//CompletableFuture提交价格查询任务
                        .map(CompletableFuture::join) //用join阻塞获取结果
                        .collect(Collectors.toList());//组成列表
        long end = System.currentTimeMillis();
        System.out.println("执行结束,执行结果:" + JSONUtil.toJsonStr(resultList) + " 执行耗时:" + (end - begin) + "ms");
    }

可以看到执行结果为31s,查询效率还不如顺序流。

执行结束,执行结果:["Nike price is Nike:112.0:GOLD","Apple price is Apple:135.0:DIAMOND","Coca-Cola price is Coca-Cola:111.0:DIAMOND","Amazon price is Amazon:155.0:GOLD","Samsung price is Samsung:165.0:SILVER","McDonald's price is McDonald's:164.0:PLATINUM","Mercedes-Benz price is Mercedes-Benz:111.0:DIAMOND","Google price is Google:121.0:NONE","Louis Vuitton price is Louis Vuitton:168.0:PLATINUM","Chanel price is Chanel:131.0:NONE","Gucci price is Gucci:132.0:SILVER","Adidas price is Adidas:133.0:PLATINUM","Pepsi price is Pepsi:158.0:SILVER","Ford price is Ford:113.0:PLATINUM","Microsoft price is Microsoft:156.0:GOLD","Rolex price is Rolex:126.0:GOLD","Ferrari price is Ferrari:163.0:DIAMOND","IKEA price is IKEA:150.0:NONE"] 执行耗时:31097ms

原因很简单,我们本次的流操作执行会构成下面这样一张图,可以看到查询价格的操作是有耗时的,随后我们又调用了join方法使得流的后续步骤被阻塞,最终CompletableFuture用的和顺序流一样。

在这里插入图片描述

分解流优化使用CompletableFuture

上述我们提到过,之所以慢是因为join阻塞了流的操作,所以提升效率的方式就是不要让join阻塞流的操作。所以笔者将流拆成了两个。
如下图,第一个流负责提交任务,即遍历每一个商家并将查询价格的任务提交出去,期间不阻塞,最终会生成一个CompletableFuture的List。

紧接着我们遍历上一个流生成的List<CompletableFuture>,调用join方法阻塞获取结果,因为上一个流操作提交任务时不阻塞,所以每个任务一提交时就可能已经在执行了,所以join方法获取结果的耗时也会相对短一些。

在这里插入图片描述

所以我们的代码最后改造成了这样:

public static void main(String[] args) {
        long begin = System.currentTimeMillis();
        List<CompletableFuture<String>> completableFutureList = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice("Air Jordan")))
                .collect(Collectors.toList());//CompletableFuture提交价格查询任务

        List<String> resultList = completableFutureList.stream()
                .map(CompletableFuture::join) //用join阻塞获取结果
                .collect(Collectors.toList());//组成列表

        long end = System.currentTimeMillis();
        System.out.println("执行结束,执行结果:" + JSONUtil.toJsonStr(resultList) + " 执行耗时:" + (end - begin) + "ms");
    }

执行结果如下,可以看到代码耗时差不多也是5s和并行流差不多,原因很简单,线程池默认用6个,对于IO密集型任务来说显然是不够的。

执行结束,执行结果:["Nike price is Nike:112.0:GOLD","Apple price is Apple:135.0:DIAMOND","Coca-Cola price is Coca-Cola:111.0:DIAMOND","Amazon price is Amazon:155.0:GOLD","Samsung price is Samsung:165.0:SILVER","McDonald's price is McDonald's:164.0:PLATINUM","Mercedes-Benz price is Mercedes-Benz:111.0:DIAMOND","Google price is Google:121.0:NONE","Louis Vuitton price is Louis Vuitton:168.0:PLATINUM","Chanel price is Chanel:131.0:NONE","Gucci price is Gucci:132.0:SILVER","Adidas price is Adidas:133.0:PLATINUM","Pepsi price is Pepsi:158.0:SILVER","Ford price is Ford:113.0:PLATINUM","Microsoft price is Microsoft:156.0:GOLD","Rolex price is Rolex:126.0:GOLD","Ferrari price is Ferrari:163.0:DIAMOND","IKEA price is IKEA:150.0:NONE"] 执行耗时:5633ms

CompletableFuture使用自定义线程池

《Java并发编程实战》一书中,Brian Goetz和合著者们为线程池大小的优化提供了不少中肯的建议。这非常重要,如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正如你的应用所面临的 情况,处理器的一些核可能就无法充分利用。BrianGoetz建议,线程池大小与处理器的利用率 之比可以使用下面的公式进行估算:
N(threads) = N(CPU)* U(CPU) * (1+ W/C)
其中: N(CPU)是处理器的核的数目,可以通过 Runtime.getRuntime().available Processors() 得到。U(CPU)是期望的 CPU利用率(该值应该介于 0和 1之间) W/C是等待时间与计算时间的比率。

我们的CPU核心数为6,我们希望的CPU利用率为1,而等待时间按照这种的计算应该是1250ms而计算时间可以忽略不计,所以W/C差不多可以换算为1000。
最终我们计算结果为:

N(threads) = N(CPU)* U(CPU) * (1+ W/C)
		   = 6*1*1000
		   =6000

很明显6000个线程非常不合理,所以我们使用了和商店数差不多的线程数,所以我们将线程设置为18(这也是个大概的数字,具体情况还需要经过压测进行增减)。

最终代码写成这样。

 private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20,
            20,
            1,
            TimeUnit.MINUTES,
            new ArrayBlockingQueue<>(100),
            new ThreadFactoryBuilder().setNamePrefix("threadPool-%d").build());


    public static void main(String[] args) {
        long begin = System.currentTimeMillis();
        List<CompletableFuture<String>> completableFutureList = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice("Air Jordan"), threadPool))
                .collect(Collectors.toList());//CompletableFuture提交价格查询任务

        List<String> resultList = completableFutureList.stream()
                .map(CompletableFuture::join) //用join阻塞获取结果
                .collect(Collectors.toList());//组成列表

        long end = System.currentTimeMillis();
        System.out.println("执行结束,执行结果:" + JSONUtil.toJsonStr(resultList) + " 执行耗时:" + (end - begin) + "ms");
        threadPool.shutdownNow();
    }

输出结果如下,可以看到查询效率有了质的飞跃。

执行结束,执行结果:["Nike price is Nike:112.0:GOLD","Apple price is Apple:135.0:DIAMOND","Coca-Cola price is Coca-Cola:111.0:DIAMOND","Amazon price is Amazon:155.0:GOLD","Samsung price is Samsung:165.0:SILVER","McDonald's price is McDonald's:164.0:PLATINUM","Mercedes-Benz price is Mercedes-Benz:111.0:DIAMOND","Google price is Google:121.0:NONE","Louis Vuitton price is Louis Vuitton:168.0:PLATINUM","Chanel price is Chanel:131.0:NONE","Gucci price is Gucci:132.0:SILVER","Adidas price is Adidas:133.0:PLATINUM","Pepsi price is Pepsi:158.0:SILVER","Ford price is Ford:113.0:PLATINUM","Microsoft price is Microsoft:156.0:GOLD","Rolex price is Rolex:126.0:GOLD","Ferrari price is Ferrari:163.0:DIAMOND","IKEA price is IKEA:150.0:NONE"] 执行耗时:1893ms

并行流一定比CompletableFuture烂吗?

如果是计算密集型的任务,使用stream是最佳姿势,因为密集型需要一直计算,加多少个线程都无济于事,使用stream简单使用了。
而对于io密集型的任务,例如上文这种大量查询都需要干等的任务,使用CompletableFuture是最佳姿势了,通过自定义线程创建比cpu核心数更多的线程来提高工作效率才是较好的解决方案

参考文献

Java 8实战:https://book.douban.com/subject/26772632/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1300988.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机网络:数据链路层之差错控制、奇偶校验码、CRC循环冗余码、海明码

带你度过期末难关 文章目录 一、差错控制 1、冗余编码2、编码VS编码二、检错编码 1、奇偶校验码2、CRC循环冗余码三、纠错编码————海明码 海明距离1、确定校验码位数r2、确定校验码和数据的位置3、求出校验码的值4、检错并纠错 纠错的方法一&#xff1a;纠错方法二&#x…

正则表达式(9):扩展正则表达式

正则表达式&#xff08;9&#xff09;&#xff1a;扩展正则表达式 小结 本博文转载自 前文中一直在说&#xff0c;在Linux中&#xff0c;正则表达式可以分为”基本正则表达式”和”扩展正则表达式”。 我们已经认识了”基本正则表达式”&#xff0c;现在&#xff0c;我们来认…

想学编程,但不知道从哪里学起,应该怎么办?

怎样学习任何一种编程语言 我将教你怎样学习任何一种你将来可能要学习的编程语言。本书的章节是基于我和很多程序员学习编程的经历组织的&#xff0c;下面是我通常遵循的流程。 1&#xff0e;找到关于这种编程语言的书或介绍性读物。 2&#xff0e;通读这本书&#xff0c;把…

基于深度学习的超分辨率图像技术一览

超分辨率(Super-Resolution)即通过硬件或软件的方法提高原有图像的分辨率&#xff0c;图像超分辨率是计算机视觉和图像处理领域一个非常重要的研究问题&#xff0c;在医疗图像分析、生物特征识别、视频监控与安全等实际场景中有着广泛的应用。 SR取得了显著进步。一般可以将现有…

30 张图解 HTTP 常见的面试题

前言 在面试过程中&#xff0c;HTTP 被提问的概率还是比较高的 我搜集了 5 大类 HTTP 面试常问的题目&#xff0c;同时这 5 大类题跟 HTTP 的发展和演变关联性是比较大的&#xff0c;通过问答 图解的形式由浅入深的方式帮助大家进一步的学习和理解 HTTP 协议。 HTTP 基本概…

持续集成交付CICD:使用Maven命令上传Nexus制品

目录 一、实验 1.使用Maven命令上传Nexus制品&#xff08;第一种方式&#xff09; 2.使用Maven命令上传Nexus制品&#xff08;第二种方式&#xff09; 一、实验 1.使用Maven命令上传Nexus制品&#xff08;第一种方式&#xff09; &#xff08;1&#xff09;指定一个 hoste…

Blender学习--制作带骨骼动画的机器人

1. 首先创建一个机器人模型 时间关系&#xff0c;这部分步骤有时间补充 2. 然后为机器人创建一副骨架 时间关系&#xff0c;这部分步骤有时间补充 3.骨骼绑定 切换到物体模式&#xff0c;选中机器人头部&#xff0c;Shift选中骨骼&#xff0c;切换到姿态模式&#xff0c;&am…

zcms企业官网建站系统源码搭建-支持页面自定义

1.支持mysql&#xff0c;sqlite&#xff0c;access三种数据库。 2.模板和标签与asp版的zzzcms通用。 3.asp版的zzzcms的access数据库可直接使用。 4.支持手机站。 &#xff08;增删改查不做描述&#xff09;&#xff1a; 网站信息 名称&#xff0c;logo&#xff0c;微信&…

记录一次云原生线上服务数据迁移全过程

文章目录 背景迁移方案调研迁移过程服务监控脚本定时任务暂停本地副本服务启动&#xff0c;在线服务下线MySQL 数据迁移Mongo 数据迁移切换新数据库 ip 本地服务启动数据库连接验证服务打包部署服务重启前端恢复正常监控脚本定时任务启动旧服务器器容器关闭 迁移总结 背景 校园…

pytorch一致数据增强

分割任务对 image 做&#xff08;某些&#xff09;transform 时&#xff0c;要对 label&#xff08;segmentation mask&#xff09;也做对应的 transform&#xff0c;如 Resize、RandomRotation 等。如果对 image、label 分别用 transform 处理一遍&#xff0c;则涉及随机操作的…

基于深度学习的yolov7植物病虫害识别及防治系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介简介YOLOv7 系统特性工作流程 二、功能三、系统四. 总结 一项目简介 # YOLOv7植物病虫害识别及防治系统介绍 简介 该系统基于深度学习技术&#xff0c;采…

【KCC@南京】KCC南京“数字经济-开源行”活动回顾录

11月26日&#xff0c;由KCC南京、中科南京软件研究所、傲空间、PowerData联合主办的 KCC南京“数字经济-开源行” 的活动已圆满结束。此次活动&#xff0c;3 场主题研讨&#xff0c;11 场分享&#xff0c;现场参会人数 60&#xff0c;线上直播观看 3000&#xff0c;各地小伙伴从…

抓取真实浏览器设备指纹fingerprint写入cookie方案

一个关于抓取真实浏览器设备指纹写入cookie方案&#xff0c;用户访问页面获取到用户设备生成指纹id&#xff0c;通过js把指纹存入cookie&#xff0c;然后用php进行获取cookie存的指纹值到后台。 用途&#xff1a;追踪用户设备&#xff0c;防恶意注册&#xff0c;防恶意采集 浏…

1827_ChibiOS中OSLIB的邮箱机制

全部学习汇总&#xff1a; GreyZhang/g_ChibiOS: I found a new RTOS called ChibiOS and it seems interesting! (github.com) 1. 邮箱其实是一个环形队列&#xff1b; 2. 使用场景上&#xff0c;邮箱主要是用来实现异步单向的一些消息或者数据处理的。在处理机制上&#xff…

C语言 预处理 + 条件编译宏 + 井号运算符

预处理阶段任务 预处理指令 条件编译宏 条件编译宏的作用在于根据编译时的条件进行代码的选择性编译&#xff0c;从而实现不同环境、不同配置或不同功能的编译版本。 这可以用于实现调试模式和发布模式的切换&#xff0c;平台适配&#xff0c;以及选择性地编译不同的功能模块等…

【Spring 基础】00 入门指南

【Spring 基础】00 入门指南 文章目录 【Spring 基础】00 入门指南1.简介2.概念1&#xff09;控制反转&#xff08;IoC&#xff09;2&#xff09;依赖注入&#xff08;DI&#xff09; 3.核心模块1&#xff09;Spring Core2&#xff09;Spring AOP3&#xff09;Spring MVC4&…

组件之间传值

目录 1&#xff1a;组件中的关系 2&#xff1a;父向子传值 3&#xff1a;子组件向父组件共享数据 4&#xff1a;兄弟组件数据共享 1&#xff1a;组件中的关系 在项目中使用到的组件关系最常用两种是&#xff0c;父子关系&#xff0c;兄弟关系 例如A组件使用B组件或者C组件…

大师学SwiftUI第18章Part2 - 存储图片和自定义相机

存储图片 在前面的示例中&#xff0c;我们在屏幕上展示了图片&#xff0c;但也可以将其存储到文件或数据库中。另外有时使用相机将照片存储到设备的相册薄里会很有用&#xff0c;这样可供其它应用访问。UIKit框架提供了如下两个保存图片和视频的函数。 UIImageWriteToSavedPh…

CCF刷题记录 -- 202305-2:矩阵运算 --python解法

2023.12.7 主要算法 矩阵置换矩阵相乘 满分注意点 运算顺序&#xff0c;利用了矩阵运算法则中的&#xff08;A*B&#xff09;*c A*(B*C) # 矩阵置换 def zhihuan(a):b[]for i in range(d):c []for j in range(n):c.append(a[j][i])b.append(c)return b# 矩阵相乘 def ju_zh…

C# WPF上位机开发(通讯协议的编写)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 作为上位机&#xff0c;它很重要的一个部分就是需要和外面的设备进行数据沟通的。很多时候&#xff0c;也就是在这个沟通的过程当中&#xff0c;上…