CompletableFuture异步编排

news2025/1/9 15:07:22

CompletableFuture异步编排

  • 1、CompletableFuture异步编排
    • 1.1 为什么需要异步编排
    • 1.2 CompletableFuture介绍
    • 1.3 创建异步对象
    • 1.4 线程串行化与并行化方法
    • 1.5 多任务组合
    • 1.6 优化商品详情页(业务代码)
      • 1.6.1 未优化之前的代码
      • 1.6.2 使用CompletableFuture异步编排
      • 1.6.3 测试功能是否正常

1、CompletableFuture异步编排

1.1 为什么需要异步编排

  问题:查询商品详情页的逻辑非常复杂,数据的获取都需要远程调用,必然需要花费更多的时间。

目前我业务中商品详情页包含如下7个方法:

获取sku的基本详情和图片列表

获取实时价格

获取三级分类

获取销售属性和选中状态

获取商品切换数据

获取海报信息

获取平台信息

  上面查询过程都是用OpenFeign服务调用实现的,假设每个远程调用需要1s时间,那么全部执行完需要7s,这对用户来说是难以接受的。

  那如果有多个线程同时执行这7步操作呢,时间是不是就更短了。


1.2 CompletableFuture介绍

  FutureJava 5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行。

  在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

  CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。

  CompletableFutureFutureTask同属于Future接口的实现类,都可以获取线程的执行结果。

image-20230419214958726

1.3 创建异步对象

  CompletableFuture 提供了四个静态方法来创建一个异步操作。

image-20230419215150849

  没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。

  • runAsync方法不支持返回值。

  • supplyAsync可以支持返回值。

  whenComplete可以处理正常或异常的计算结果,exceptionally处理异常情况。BiConsumer<? super T,? super Throwable>可以定义处理业务


whenCompletewhenCompleteAsync 的区别:

  whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。

  whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

  方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)


代码演示:

public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //创建一个没有返回值的异步对象
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("没有返回值结果");
        });
        System.out.println(future.get());

        //创建一个有返回值的异步对象
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                int a=1/0;
                return 404;
            }
        }).whenComplete(new BiConsumer<Integer, Throwable>() {
            /**
             *whenComplete 和异步对象使用用一个线程
             * @param integer   异步对象执行后的返回值结果
             * @param throwable 异常对象
             */
            @Override
            public void accept(Integer integer, Throwable throwable) {
                System.out.println("whenComplete:"+integer);
                System.out.println("whenComplete:"+throwable);
            }
        }).exceptionally(new Function<Throwable, Integer>() {
            /**
             * 只处理异常的回调
             * @param throwable
             * @return
             */
            @Override
            public Integer apply(Throwable throwable) {
                return null;
            }
        }).whenCompleteAsync(new BiConsumer<Integer, Throwable>() {
            /**
             * whenCompleteAsync跟异步对象有可能不适用同一个线程,由线程池重新分配
             * @param integer
             * @param throwable
             */
            @Override
            public void accept(Integer integer, Throwable throwable) {

            }
        });
    }

}

image-20230419220803592

1.4 线程串行化与并行化方法

  thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。

image-20230419225245838

  thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

在这里插入图片描述

  thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作

在这里插入图片描述

  带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。

Function<? super T,? extends U> 
T:上一个任务返回结果的类型 
U:当前任务的返回值类型

  代码演示:

public class CompletableFutureDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(
                        50,
                        500,
                        30,
                        TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(10000)
                );

        //创建一个异步任务对象A
        CompletableFuture<Object> futureA = CompletableFuture.supplyAsync(new Supplier<Object>() {
            @Override
            public Object get() {
                return "404";
            }
        },threadPoolExecutor);
        //创建一个B
        futureA.thenAcceptAsync(new Consumer<Object>() {
            @SneakyThrows
            @Override
            public void accept(Object o) {
                    Thread.sleep(500);
                    System.out.println("我是B");
            }
        },threadPoolExecutor);
        //创建一个C
        futureA.thenAcceptAsync(new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                System.out.println("我是C");
            }
        },threadPoolExecutor);
    }
}

image-20230419221321514

  这里是测试看是否是并行化,我们让B休眠一会,可以看到先输出C再输出B,说明是并行化。

  因为如果是串行化的化,那么即使B休眠一会,那么C也会一直等着,输出顺序为B、C

1.5 多任务组合

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

  allOf:等待所有任务完成。

  anyOf:只要有一个任务完成。

1.6 优化商品详情页(业务代码)

1.6.1 未优化之前的代码

@Service
@SuppressWarnings("all")
public class ItemServiceImpl implements ItemService {
    
    @Autowired
    private ProductFeignClient productFeignClient;

    //获取商品详情数据
    @Override
    public HashMap<String, Object> getItem(Long skuId) {
        HashMap<String, Object> resultMap=new HashMap<>();

        //获取sku的基本详情和图片列表
        SkuInfo skuInfo = productFeignClient.getSkuInfo(skuId);
        //获取实时价格
        BigDecimal skuPrice = productFeignClient.getSkuPrice(skuId);

        //判断
        if(skuInfo!=null){
            //获取三级分类
            BaseCategoryView categoryView = productFeignClient.getCategoryView(skuInfo.getCategory3Id());
            //获取销售属性和选中状态
            List<SpuSaleAttr> spuSaleAttrListCheckBySku = productFeignClient.getSpuSaleAttrListCheckBySku(skuId, skuInfo.getSpuId());
            //获取商品切换数据
            Map skuValueIdsMap = productFeignClient.getSkuValueIdsMap(skuInfo.getSpuId());
            //获取海报信息
            List<SpuPoster> spuPosterBySpuId = productFeignClient.findSpuPosterBySpuId(skuInfo.getSpuId());

            resultMap.put("categoryView",categoryView);
            resultMap.put("spuSaleAttrList",spuSaleAttrListCheckBySku);
            resultMap.put("valuesSkuJson", JSON.toJSONString(skuValueIdsMap));
            resultMap.put("spuPosterList",spuPosterBySpuId);
        }
        //获取平台信息
        List<BaseAttrInfo> attrList = productFeignClient.getAttrList(skuId);
        //处理数据符合要求 List  Obj  key attrName value attrValue
        List<Map<String, String>> spuAttrList = attrList.stream().map(baseAttrInfo -> {
            Map<String, String> map = new HashMap<>();
            map.put("attrName", baseAttrInfo.getAttrName());
            map.put("attrValue", baseAttrInfo.getAttrValueList().get(0).getValueName());
            return map;
        }).collect(Collectors.toList());

        //存储数据
        resultMap.put("skuInfo",skuInfo);
        resultMap.put("price",skuPrice);
        resultMap.put("skuAttrList",spuAttrList);
        return resultMap;
    }
}

1.6.2 使用CompletableFuture异步编排

配置线程池:

@Configuration
public class ThreadPoolConfig {
    /**
     * 核心线程数
     * 最大线程数
     * 空闲存活时间
     * 时间单位
     * 阻塞队列
     * 默认:
     *  线程工厂
     *  拒绝策略
     * @return
     */
    @Bean
    public ThreadPoolExecutor threadPoolExecutor(){

        return new ThreadPoolExecutor(
                50,
                500,
                30,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10000)
        );
    }
}

实现类改造:

@Service
@SuppressWarnings("all")
public class ItemServiceImpl implements ItemService {

    @Autowired
    private ProductFeignClient productFeignClient;

    @Autowired
    private ThreadPoolExecutor executor;

    //获取商品详情数据
    @Override
    public HashMap<String, Object> getItem(Long skuId) {
        HashMap<String, Object> resultMap=new HashMap<>();

        CompletableFuture<SkuInfo> skuInfoCompletableFuture = CompletableFuture.supplyAsync(new Supplier<SkuInfo>() {
            @Override
            public SkuInfo get() {
                //获取sku的基本详情和图片列表
                SkuInfo skuInfo = productFeignClient.getSkuInfo(skuId);
                resultMap.put("skuInfo", skuInfo);
                return skuInfo;
            }
        }, executor);

        CompletableFuture<Void> skuPriceCompletableFuture = CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                //获取实时价格
                BigDecimal skuPrice = productFeignClient.getSkuPrice(skuId);
                resultMap.put("price", skuPrice);
            }
        }, executor);

        //判断
        CompletableFuture<Void> categoryViewCompletableFuture = skuInfoCompletableFuture.thenAcceptAsync(new Consumer<SkuInfo>() {
            @Override
            public void accept(SkuInfo skuInfo) {
                //获取三级分类
                BaseCategoryView categoryView = productFeignClient.getCategoryView(skuInfo.getCategory3Id());
                resultMap.put("categoryView",categoryView);
            }
        }, executor);

        CompletableFuture<Void> spuSaleAttrListCheckBySkuCompletableFuture = skuInfoCompletableFuture.thenAcceptAsync(new Consumer<SkuInfo>() {
            @Override
            public void accept(SkuInfo skuInfo) {
                //获取销售属性和选中状态
                List<SpuSaleAttr> spuSaleAttrListCheckBySku = productFeignClient.getSpuSaleAttrListCheckBySku(skuId, skuInfo.getSpuId());
                resultMap.put("spuSaleAttrList",spuSaleAttrListCheckBySku);
            }
        }, executor);
        CompletableFuture<Void> skuValueIdsMapCompletableFuture = skuInfoCompletableFuture.thenAcceptAsync(new Consumer<SkuInfo>() {
            @Override
            public void accept(SkuInfo skuInfo) {
                //获取商品切换数据
                Map skuValueIdsMap = productFeignClient.getSkuValueIdsMap(skuInfo.getSpuId());
                resultMap.put("valuesSkuJson", JSON.toJSONString(skuValueIdsMap));
            }
        }, executor);

        CompletableFuture<Void> findSpuPosterBySpuIdCompletableFuture = skuInfoCompletableFuture.thenAcceptAsync(new Consumer<SkuInfo>() {
            @Override
            public void accept(SkuInfo skuInfo) {
                //获取海报信息
                List<SpuPoster> spuPosterBySpuId = productFeignClient.findSpuPosterBySpuId(skuInfo.getSpuId());
                resultMap.put("spuPosterList",spuPosterBySpuId);
            }
        }, executor);


        CompletableFuture<Void> attrListCompletableFuture = CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                //获取平台信息
                List<BaseAttrInfo> attrList = productFeignClient.getAttrList(skuId);
                //处理数据符合要求 List  Obj  key attrName value attrValue
                List<Map<String, String>> spuAttrList = attrList.stream().map(baseAttrInfo -> {
                    Map<String, String> map = new HashMap<>();
                    map.put("attrName", baseAttrInfo.getAttrName());
                    map.put("attrValue", baseAttrInfo.getAttrValueList().get(0).getValueName());
                    return map;
                }).collect(Collectors.toList());

                //存储数据
                resultMap.put("skuAttrList", spuAttrList);
            }
        }, executor);

        //多任务组合 -- 所有的异步任务执行完成才是完成
        CompletableFuture.allOf(
                skuInfoCompletableFuture,
                skuPriceCompletableFuture,
                categoryViewCompletableFuture,
                spuSaleAttrListCheckBySkuCompletableFuture,
                skuValueIdsMapCompletableFuture,
                findSpuPosterBySpuIdCompletableFuture,
                attrListCompletableFuture
        ).join();
        return resultMap;
    }
}

  根据是否有返回值决定调用哪个API,然后看有没有依赖关系,有好几个都依赖SkuInfo,所以要用skuInfoCompletableFuture去创建。

   我们需要等待每个任务执行完毕之后在返回,所以最后使用allOf方法进行多任务组合。

1.6.3 测试功能是否正常

  这种异步效果其实在高并发下环境下测比较好,我们这里验证功能是否正常就行。

  访问商品详情页:

image-20230419224246988

  查看Redis中的数据

image-20230419224307865

  可以看到,有6个key被缓存,由于我们的价格是实时价格,所以一直查的是数据库,千万别用缓存。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/436780.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux 下 REST 客户端的新选择:Insomnia 3.0

正在为 Linux 桌面端找一个免费的 REST 客户端&#xff1f; 别睡不着觉了&#xff01;试试 Insomnia。 这个应用是跨平台的&#xff0c;可以工作在 Linux、macOS、Windows。开发者 Gregory Schier 告诉我们他创造这个应用是为了“帮助开发者处理和 REST API 的通信”。他还说&a…

如何在Java中创建临时文件?

在Java程序中&#xff0c;有时需要创建临时文件来暂存数据或者执行某些操作。Java提供了许多方式来创建临时文件。在本教程中&#xff0c;我们将介绍如何使用Java标准库来创建临时文件。 一、使用File.createTempFile()方法 Java标准库中的File类提供了createTempFile()方法来…

设计模式--单例模式

介绍 所谓类的单例模式 就是采取一定的方法保证在整个软件系统中对某个类只能存在一个对象实例,并且该类只提供一个取得其对象实例的方法(静态方法) 比如 Hibemate的SessionFactory 它充当数据存储源的代理 并负责创建Session对象 SessionFactory并不是轻量级的 一般情况下 一个…

Java中的Map(三种双列集合万字详解)

点击可查看单列集合Set万字详解&#xff1a;其中还包含哈希解读和底层分析。 文章目录 前言一、Map1.Map集合常用的API代码演示&#xff1a;1.Map集合的基本功能2.Map集合的获取功能3.Map的getOrDefault()方法 2.Map集合的三种遍历1.键找值、值找键2.键值对3.Lambda表达式 二、…

【C++11】晦涩难懂语法系列:可变参数模板

目录 可变参数模板 1.1 概念 1.2 可变参数模板定义 1.3 参数包的展开方式 1.3.1 递归展开参数包 1.3.2 逗号表达式展开参数包 1.4 STL的emplace系列函数 可变参数模板 1.1 概念 在C语言阶段&#xff0c;我们已经接触过可变参数&#xff0c;比如scand、printf等等 这里…

9.2 回归分析

学习目标&#xff1a; 回归分析是一种广泛应用于数据分析和预测的统计方法&#xff0c;可以用来探索自变量与因变量之间的关系并进行预测。我学习回归分析&#xff0c;我会采取以下步骤&#xff1a; 学习基本概念&#xff1a;回归分析中的基本概念包括自变量和因变量、回归系数…

运放专题:运放输入端交直流混合信号隔直放大

运放输入不隔直放大 运放输入端不隔直&#xff0c;那么经过运放放大后&#xff0c;交流成分放大了&#xff0c;直流成分也被放大了。看下面的仿真&#xff1a; 交流信号为&#xff1a;振幅3V, 频率5K的正弦波&#xff0c;直流偏置为1V 可以看到&#xff0c;交流信号被放大的…

【Linux】匿名管道代码实现-mypipe

文章目录 管道介绍什么是管道&#xff1a;管道的原理管道的特点 具体代码详写创建初始文件makefile编写定义任务列表-task.hpp分阶段代码编写总代码展示: ctrlProcess.cc 编写头文件包含(如有不会,自己查谷歌)定义全局变量以解耦main,函数框架EndPoint定义creatProcess 创建管道…

Apollo配置中心使用篇

Apollo配置中心使用篇 常见配置中心对比Apollo核心概念Apollo核心特性Apollo架构设计各模块介绍服务端设计客户端设计Apollo与Spring集成的底层原理 Apollo安装安装apollo-portalconfig service和admin service部署多网卡问题解决修改Portal环境配置调整ApolloPortal配置 Apoll…

【产品设计】用户操作日志

日志记录了代码的执行过程&#xff0c;根据目的不同&#xff0c;可以分为系统日志和操作日志。 一、什么是日志 日志记录了代码的执行过程。根据目的不同&#xff0c;可分为系统日志和操作日志。 1&#xff09;系统日志 记录系统中硬件、软件和系统问题的信息&#xff0c;同…

C#基础学习--枚举器和迭代器

目录 枚举器和可枚举类型 IEnumerator 接口 IEnumerable 接口 实现 IEnumerable 和 IEnumerator的示例 泛型枚举接口 迭代器 迭代器块 使用迭代器来创建枚举器 使用迭代器来创建可枚举类型 常见迭代器模式 产生多个可枚举类型 将迭代器作为属性 迭代器实质 枚举器和可…

【分享】比ChatGPT还厉害?可以自主解决复杂任务的Auto-GPT迅速走红(内含体验地址)

哈喽&#xff0c;大家好&#xff0c;我是木易巷~ 最近木易巷在了解Auto GPT&#xff0c;今天给大家分享一下~ 自主解决复杂任务的Auto-GPT 什么是Auto-GPT&#xff1f; Auto-GPT 是一款开源 Python 应用程序&#xff0c;由开发者用户 Significant Gravitas 于 2023 年 3 月 30…

钉钉接入“通义千问”大模型,输入斜杠“/”唤起智能服务

4月18日&#xff0c;钉钉总裁叶军在2023春季钉峰会上宣布&#xff0c;钉钉正式接入阿里巴巴“通义千问”大模型&#xff0c;输入“&#xff0f;”在钉钉即可唤起 10 余项 AI 能力&#xff0c;叶军现场演示了群聊、文档、视频会议及应用开发四个场景。 现场展示中&#xff0c;只…

C++:智能指针(auto_ptr/unique_ptr/shared_ptr/weak_ptr)

为什么需要智能指针&#xff1f; C没有垃圾回收机制。 #include<iostream> using namespace std;int div() {int a, b;cin >> a >> b;if (b 0)throw invalid_argument("除0错误");return a / b; }void Func() {// 1、如果p1这里new 抛异常会如何…

网络原理数据链路层

嘿嘿,又见面了,今天为大家带来数据链路层的相关知识.这个层面的知识离咱们程序员太遥远了,我们简单介绍一下就行 1.以太网 2.认识Mac地址 3.区分Mac地址和IP地址 4.MTU 5.DNS 1.以太网 以太网是数据链路层和物理层的使用的网络,物理层用的不咋多,我们就先不讲了,直接看数…

论文阅读 - Segment Anything

文章目录 0 前言1 预备知识1.1 深度学习训练框架1.2 语义分割训练框架 2 SAM的任务3 SAM的模型3.1 模型整体结构3.2 Image encoder3.3 Prompt encoder3.4 Mask decoder3.5 训练细节 4 SAM的数据4.1 模型辅助的手动标注阶段4.2 半自动阶段4.3 全自动阶段 5 SAM的应用5.1 拿来主义…

什么是感知机——图文并茂,由浅入深

什么是感知机——图文并茂&#xff0c;由浅入深 文章目录 什么是感知机——图文并茂&#xff0c;由浅入深引言感知机的引入宝宝版青年版老夫聊发少年狂版激活函数 感知机的应用与门或门 感知机与深度学习感知机与神经网络感知机和深度学习什么关系呢&#xff1f; 引言 生活中常…

【4月比赛合集】19场可报名的「创新应用」和「程序设计」大奖赛,任君挑选!

CompHub 实时聚合多平台的数据类(Kaggle、天池…)和OJ类(Leetcode、牛客…&#xff09;比赛。本账号同时会推送最新的比赛消息&#xff0c;欢迎关注&#xff01; 更多比赛信息见 CompHub主页 或 点击文末阅读原文 以下信息仅供参考&#xff0c;以比赛官网为准 目录 创新应用赛&…

【SpringBoot】一:SpringBoot的基础(上)

文章目录 1. 脚手架创建项目1.1使用Spring Initializr1.2 IDEA中使用脚手架创建项目 2. 代码结构2.1 单一结构2.2 多模块2.3 包和主类2.4 pom文件2.4.1 父项目2.4.2 启动器2.4.3 不使用父项目 3. 运行SpringBoot项目 1. 脚手架创建项目 脚手架辅助创建程序的工具&#xff0c;S…

《Java8实战》第12章 新的日期和时间 API

原来的Java的时间类Date、java.util.Calendar类都不太好&#xff0c;以语言无关方式格式化和解析日期或时间的 DateFormat 方法也有线程安全的问题 12.1 LocalDate、LocalTime、LocalDateTime、Instant、Duration 以及 Period 12.1.1 使用 LocalDate 和 LocalTime LocalDate…