从多线程设计模式到对 CompletableFuture 的应用

news2025/1/16 1:48:18

大家好,我是 方圆。最近在开发 延保服务 频道页时,为了提高查询效率,使用到了多线程技术。为了对多线程方案设计有更加充分的了解,在业余时间读完了《图解 Java 多线程设计模式》这本书,觉得收获良多。本篇文章将介绍其中提到的 Future 模式,以及在实际业务开发中对该模式的应用,而这些内容对于本书来说只是冰山一角,还是推荐大家有时间去阅读原书。

1. Future 模式:“先给您提货单”

我们先来看一个场景:假如我们去蛋糕店买蛋糕,下单后,店员会递给我们提货单并告知“请您傍晚来取蛋糕”。到了傍晚我们拿着提货单去取蛋糕,店员会先和我们说“您的蛋糕已经做好了”,然后将蛋糕拿给我们。

如果将下单蛋糕到取蛋糕的过程抽象成一个方法的话,那么意味着这个方法需要花很长的时间才能获取执行结果,与其一直等待结果,不如先拿着一张“提货单”,到我们需要取货的时候,再通过它去取,而获取“提货单”的过程是几乎不耗时的,而这个提货单对象就被称为 Future,后续便可以通过它来获取方法的返回值。用 Java 来表示这个过程的话,需要使用到 FutureTaskCallable 两个类,如下:

public class Example {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 预定蛋糕,并定义“提货单”
        System.out.println("我:预定蛋糕");
        FutureTask<String> future = new FutureTask<>(() -> {
            System.out.println("店员:请您傍晚来取蛋糕");
            Thread.sleep(2000);
            System.out.println("店员:您的蛋糕已经做好了");

            return "Holiland";
        });
        // 开始做蛋糕
        new Thread(future).start();

        // 去做其他事情
        Thread.sleep(1000);
        System.out.println("我:忙碌中...");
        // 取蛋糕
        System.out.println("我:取蛋糕 " + future.get());
    }
}

// 运行结果:
// 我:预定蛋糕
// 店员:请您傍晚来取蛋糕
// 我:忙碌中...
// 店员:您的蛋糕已经做好了
// 我:取蛋糕 Holiland

方法的调用者可以将任务交给其他线程去处理,无需阻塞等待方法的执行,这样调用者便可以继续执行其他任务,并能通过 Future 对象获取执行结果。

它的运行原理如下:创建 FutureTask 实例时,Callable 对象会被传递给构造函数,当线程调用 FutureTaskrun 方法时,Callable 对象的 call 方法也会被执行。调用 call 方法的线程会同步地获取结果,并通过 FutureTaskset 方法来记录结果对象,如果 call 方法执行期间发生了异常,则会调用 setException 方法记录异常。最后,通过调用 get 方法获取方法的结果,注意这里可能会抛出方法执行时产生的异常

    public void run() {
        // ...
        try {
            // “提货任务”
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 调用 callable 的 call 方法
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    // 捕获并设置异常
                    setException(ex);
                }
                if (ran)
                    // 为结果赋值
                    set(result);
            }
        } finally {
            // ...
        }
    }

    protected void set(V v) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            // 将结果赋值给 outcome 全局变量,供 get 时获取
            outcome = v;
            // 修改状态为 NORMAL
            STATE.setRelease(this, NORMAL); // final state
            finishCompletion();
        }
    }

    protected void setException(Throwable t) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            // 将异常赋值给 outcome 变量,供 get 时抛出
            outcome = t;
            // 修改状态为 EXCEPTIONAL
            STATE.setRelease(this, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // 未完成时阻塞等一等
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        // 正常结束的话能正常获取到结果
        if (s == NORMAL)
            return (V)x;
        // 否则会抛出异常,注意如果执行中出现异常,调用 get 时会被抛出
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

现在对 Future 模式 已经有了基本的了解:它通过 Future 接口来表示未来的结果,实现 调用者与执行者之间的解耦提高系统的吞吐量和响应速度,那在实践中对该模式是如何使用的呢?

2. 对 Future 模式的实践

因为 延保服务 频道页访问量大且对接口性能要求较高,单线程处理并不能满足性能要求,所以应用了 Future 模式 来提高查询效率,但是并没有借助上文所述的 FutureTask 来实现,而是使用了 CompletableFuture 工具类,它们的实现原理基本一致,但是后者提供的方法和对 链式编程 的支持使代码更加简洁,实现更加容易(相关 API 参考见文末)。

如下是使用 CompletableFuture 异步多线程查询订单列表的逻辑,根据配置的 pageNo 分多条线程查询各页的订单数据:

        List<OrderListInfo> result = new ArrayList<>();
        // 并发查询订单列表
        List<CompletableFuture<List<OrderListInfo>>> futureList = new ArrayList<>();
        try {
            // 配置需要查询的页数 pageNo,并发查询不同页码的订单
            for (int i = 1; i <= pageNo; i++) {
                int curPageNo = i;
                CompletableFuture<List<OrderListInfo>> future = CompletableFuture.supplyAsync(
                        () -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor);

                futureList.add(future);
            }
            // 等待所有线程处理完毕,并封装结果值
            for (CompletableFuture<List<OrderListInfo>> future : futureList) {
                result.addAll(future.get());
            }
        } catch (Exception e) {
            log.error("并发查询用户订单信息异常", e);
        }

这段代码中对异常的处理能进行优化:第 15 行代码,如果某条线程查询订单列表时发生异常,那么在调用 get 方法时会抛出该异常,被 catch 后返回空结果,即使有其他线程查询成功,这些订单结果值也会被忽略掉,可以针对这一点进行优化,如下:

        List<OrderListInfo> result = new ArrayList<>();
        // 并发查询订单列表
        List<CompletableFuture<List<OrderListInfo>>> futureList = new ArrayList<>();
        try {
            // 配置需要查询的页数 pageNo,并发查询不同页码的订单
            for (int i = 1; i <= pageNo; i++) {
                int curPageNo = i;
                CompletableFuture<List<OrderListInfo>> future = CompletableFuture
                        .supplyAsync(() -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor)
                // 添加异常处理
                .exceptionally(e -> {
                    log.error("查询用户订单信息异常", e);
                    return Collections.emptyList();
                });

                futureList.add(future);
            }
            // 等待所有线程处理完毕,并封装结果值
            for (CompletableFuture<List<OrderListInfo>> future : futureList) {
                result.addAll(future.get());
            }
        } catch (Exception e) {
            log.error("并发查询用户订单信息异常", e);
        }

优化后针对查询发生异常的任务打印异常日志,并返回空集合,这样即使单线程查询失败,也不会影响到其他线程查询成功的结果。

CompletableFuture 还提供了 allOf 方法,它返回的 CompletableFuture 对象在所有 CompletableFuture 执行完成时完成,相比于对每个任务都调用 get 阻塞等待任务完成的实现可读性更好,改造后代码如下:

        List<OrderListInfo> result = new ArrayList<>();
        // 并发查询订单列表
        CompletableFuture<List<OrderListInfo>>[] futures = new CompletableFuture[pageNo];
        // 配置需要查询的页数 pageNo,并发查询不同页码的订单
        for (int i = 1; i <= pageNo; i++) {
            int curPageNo = i;
            CompletableFuture<List<OrderListInfo>> future = CompletableFuture
                    .supplyAsync(() -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor)
                    // 添加异常处理
                    .exceptionally(e -> {
                        log.error("查询用户订单信息异常", e);
                        return Collections.emptyList();
                    });

            futures[i - 1] = future;
        }

        try {
            // 等待所有线程处理完毕
            CompletableFuture.allOf(futures).get();
            for (CompletableFuture<List<OrderListInfo>> future : futures) {
                List<OrderListInfo> orderInfoList = future.get();
                if (CollectionUtils.isEmpty(orderInfoList)) {
                    result.addAll(orderInfoList);
                }
            }
        } catch (Exception e) {
            log.error("处理用户订单结果信息异常", e);
        }

Tips: CompletableFuture 的设计初衷是支持异步编程,所以应尽量避免在CompletableFuture 链中使用 get()/join() 方法,因为这些方法会阻塞当前线程直到CompletableFuture 完成,应该在必须使用该结果值时才调用它们。

相关的模式:命令模式

命令模式能将操作的调用者和执行者解耦,它能很容易的与 Future 模式 结合,以查询订单的任务为例,我们可以将该任务封装为“命令”对象的形式,执行时为每个线程提交一个命令,实现解耦并提高扩展性。在命令模式中,命令对象需要 支持撤销和重做,那么这便在查询出现异常时,提供了补偿处理的可能,命令模式类图关系如下:

在这里插入图片描述

3.《图解Java多线程设计模式》书籍推荐

我觉得本书算得上是一本老书:05 年出版的基于 JDK1.5 的Java多线程书籍,相比于目前我们常用的 JDK1.8 和时髦的 JDK21,在读之前总会让人觉得有一种过时的感觉。但是当我读完时,发现其中的模式能对应上代码中的处理逻辑:对 CompletableFuture 的使用正对应了其中的 Future 模式(异步获取其他线程的执行结果)等等,所以我觉得模式的应用不会局限于技术的新老,它是在某种情况下,研发人员共识或通用的解决方案,在知晓某种模式,采用已有的技术实现它是容易的,而反过来在只掌握技术去探索模式是困难且没有方向的。

同时,我也在考虑一个问题:对于新人学习多线程技术来说,究竟适不适合直接从模式入门呢?因为我对设计模式有了比较多的实践经验,所以对“模式”相关的内容足够敏感,如果新人没有这些经验的话,这对他们来说会不会更像是一个个知识点的堆砌呢?好在的是,本书除了模式相关的内容,对基础知识也做足了铺垫,而且提出的关于多线程编程的思考点也是非常值得参考和学习的,以线程互斥和协同为例,书中谈到:在对线程进行互斥处理时需要考虑 “要保护的东西是什么”,这样便能够 清晰的确定锁的粒度;对于线程的协同,书中提到的是需要考虑 “放在中间的东西是什么”,直接的抛出这个观点是不容易理解的,“中间的东西”是在多线程的 生产者和消费者模式 中提出的,部分线程负责生产,生产完成后将对象放在“中间”,部分线程负责消费,消费时取的便是“中间”的对象,而合理规划这些中间的东西便能 消除生产者和消费者之间的速度差异,提高系统的吞吐量和响应速度。而再深入考虑这两个角度时,线程的互斥和协同其实是内外统一的:为了让线程协调运行,必须执行互斥处理,以防止共享的内容被破坏,而线程的互斥是为了线程的协调运行才进行的必要操作。


附:CompletableFuture 常用 API

使用 supplyAsync 方法异步执行任务,并返回 CompletableFuture 对象

如下代码所示,调用 CompletableFuture.supplyAsync 静态方法异步执行查询逻辑,并返回一个新的 CompletableFuture 对象

CompletableFuture<List<Object>> future = CompletableFuture.supplyAsync(() -> doQuery(), executor);
使用 join 方法阻塞获取完成结果

如下代码所示,在封装结果前,调用 join 方法阻塞等待获取结果

futureList.forEach(CompletableFuture::join);

它与 get 方法的主要区别在于,join 方法抛出的是未经检查的异常 CompletionException,并将原始异常作为其原因,这意味着我们可以不需要在方法签名中声明它或在调用 join 方法的地方进行异常处理,而 get 方法会抛出 InterruptedExceptionExecutionException 异常,我们必须对它进行处理,get 方法源码如下:

    public T get() throws InterruptedException, ExecutionException {
        Object r;
        if ((r = result) == null)
            r = waitingGet(true);
        return (T) reportGet(r);
    }
用 thenApply(Function) 和 thenAccept(Consumer) 等回调函数处理结果

如下是使用 thenApply() 方法对 CompletableFuture 的结果进行转换的操作:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
    .thenApply(greeting -> greeting + " World");
使用 exceptionally() 处理 CompletableFuture 中的异常

CompletableFuture 提供了exceptionally() 方法来处理异常,这是一个非常重要的步骤。如果在 CompletableFuture 的运行过程中抛出异常,那么这个异常会被传递到最终的结果中。如果没有适当的异常处理,那么在调用 get()join() 方法时可能会抛出异常。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (true) {
        throw new RuntimeException("Exception occurred");
    }
    return "Hello, World!";
}).exceptionally(e -> "An error occurred");
使用 allOf() 和 anyOf() 处理多个 CompletableFuture

如果有多个 CompletableFuture 需要处理,可以使用 CompletableFuture.allOf() 或者 CompletableFuture.anyOf()allOf() 在所有的 CompletableFuture 完成时完成,而 anyOf() 则会在任意一个 CompletableFuture 完成时完成。

complete()、completeExceptionally()、cancel() 方法

CompletableFuture 的运行是在调用了 complete()completeExceptionally()cancel() 等方法后才会被标记为完成。如果没有正确地完成 CompletableFuture,那么在调用 get() 方法时可能会永久阻塞。这三个方法在 Java 并发编程中有着重要的应用。以下是这三个方法的常见使用场景:

  1. complete(T value): 此方法用于显式地完成一个 CompletableFuture,并设置它的结果值。这在你需要在某个计算完成时,手动设置 CompletableFuture 的结果值的场景中非常有用。例如,你可能在一个异步操作完成时,需要设置 CompletableFuture 的结果值。
CompletableFuture<String> future = new CompletableFuture<>();
// Some asynchronous operation
future.complete("Operation Result");
  1. completeExceptionally(Throwable ex): 此方法用于显式地以异常完成一个 CompletableFuture。这在你需要在某个计算失败时,手动设置 CompletableFuture 的异常的场景中非常有用。例如,你可能在一个异步操作失败时,需要设置 CompletableFuture 的异常。
CompletableFuture<String> future = new CompletableFuture<>();
// Some asynchronous operation
future.completeExceptionally(new RuntimeException("Operation Failed"));
  1. cancel(boolean mayInterruptIfRunning): 此方法用于取消与 CompletableFuture 关联的计算。这在你需要取消一个长时间运行的或者不再需要的计算的场景中非常有用。例如,你可能在用户取消操作或者超时的情况下,需要取消 CompletableFuture 的计算。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // Long running operation
});
// Some condition
future.cancel(true);

这些方法都是线程安全的,可以从任何线程中调用。

使用 thenCompose() 处理嵌套的 CompletableFuture

如果在处理 CompletableFuture 的结果时又创建了新的CompletableFuture,那么就会产生嵌套的 CompletableFuture。这时可以使用 thenCompose() 方法来避免 CompletableFuture 的嵌套,如下代码所示:

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
使用 thenCombine() 处理两个 CompletableFuture 的结果
CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCombine(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> s1 + s2);

欢迎大家在京东APP内搜索 “京东延保” 跳转延保服务页~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1833120.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《C++ Primer》导学系列:第 4 章 - 表达式

4.1 基础 4.1.1 基本概念 组合运算符和运算对象 组合运算符是指将两个或多个操作数结合在一起进行运算的符号。在C中&#xff0c;常见的组合运算符包括算术运算符&#xff08;如, -, *, /, %&#xff09;、关系运算符&#xff08;如<, >, <, >, , !&#xff09;…

docker-compose jira、bugzilla、zentao

参见文章&#xff0c;这里是对之前的内容进行了改动&#xff0c;主要讲怎么将zentao容器融入到已有的docker-compose.yml中 一、zentao镜像 从官网上拉取&#xff1a;https://hub.docker.com/r/easysoft/zentao/tags 可以选择自己想要的版本&#xff0c;这里我选择的是开源版…

工厂物料管理系统(数据库课设)

1.课设要求描述 ●实现物料的分类管理; ●实现部门和员工信息管理; ●实现物料的入库和领用管理; ●实现物料的转仓管理; ●创建触发器&#xff0c;实现物料入库和领用时相应物料库存的自动更新; ●创建触发器&#xff0c;实现转仓时转入仓库物料增加、转出仓库物料减少…

线性卷积(相关)和圆周卷积(相关)以及FFT之间的关系(AEC举例)

时域自适应滤波算法中的线性卷积和线性相关运算量较大&#xff0c;导致计算复杂度升高&#xff0c;我们更愿意把这两个信号变换到频域&#xff0c;通过频域相乘的方式来取代时域复杂度相当高的卷积或相关运算。 预备知识&#xff1a;线性卷积&#xff08;相关&#xff09;和圆…

单一管理平台 - Enterprise Global Console

大约三年前&#xff0c;当我们向客户和社区推出控制台时&#xff0c;MinIO 的世界发生了变化。这是可访问性的巨大飞跃。可靠的 CLI 和 MC 命令很快让位于我们新的基于浏览器的 GUI 的速度和直观可用性。对于开发人员和企业 IT 管理员来说&#xff0c;这是一个游戏规则的改变者…

单片机建立自己的库文件(4)

文章目录 前言一、新建自己的外设文件夹1.新建外设文件夹&#xff0c;做项目好项目文件管理2.将之前写的.c .h 文件添加到文件夹中 二、在软件中添加项目 .c文件2.1 编译工程保证没问题2. 修改项目列表下的名称 三、在软件项目中添加 .h文件路径四、实际使用测试总结 前言 提示…

使用 C# 进行面向对象编程:第 10 部分

封装和抽象之间的区别 对于 OOP 初学者来说&#xff0c;封装和抽象之间存在非常基本的区别。他们可能会对此感到困惑。但如果你详细了解这两个主题&#xff0c;就会发现它们之间存在巨大差异。 抽象意味着向用户隐藏不必要的数据。用户只需要所需的功能或根据其需求的输出。例…

遵循法规,科学检测:可燃气体报警器多久检测一次?

在工业生产和日常生活中&#xff0c;可燃气体报警器作为一种重要的安全设备&#xff0c;能够实时监测并预警潜在的可燃气体泄漏风险&#xff0c;对于防范火灾和爆炸事故至关重要。 在这篇文章中&#xff0c;佰德将围绕可燃气体报警器的检测频率展开探讨&#xff0c;包括其功能…

基于WPF技术的换热站智能监控系统15--实时读取PLC数据

1、创建PLC实时数据 1、添加数据块 2、创建6个变量 用来表示水泵1和水泵2的参数&#xff0c;可以根据现场实际情况添加更多的变量参数 3、设置块属性并编译 4、下载该程序到PLC中 5、添加监控表 2、读取设备数据 S7协议下的tcp直接通讯&#xff0c;配置简单&#xff0c;一般P…

大模型泡沫退去,谁能活到下半场?

前言 从今年3月开始&#xff0c;国内企业纷纷下场大模型&#xff0c;铆足劲秀肌肉&#xff0c;如今转向垂直行业淘金&#xff0c;试图争霸行业大模型。我们的心态也逐渐从看乐子&#xff0c;到严肃讨论。 在人工智能的世界&#xff0c;我们经历了众多的概念游戏&#xff0c;在…

泛微开发修炼之旅--18泛微OA节点后操作代码自动退回流程的代码示例

文章链接&#xff1a;17泛微OA节点后操作代码自动退回流程的代码示例

短视频压缩与编码技术在短剧APP小程序开发中的应用

在短剧APP小程序开发中&#xff0c;短视频压缩与编码技术是实现高效视频处理的关键。本文将对这两项技术在短剧APP中的应用进行深入分析。 一、短视频压缩技术的重要性 节省存储空间&#xff1a;通过压缩技术&#xff0c;可以减小视频文件的大小&#xff0c;从而节省服务器和用…

【普中】基于51单片机的电子秒表数码管显示( proteus仿真+程序+设计报告+讲解视频)

这里写目录标题 设计资料内容清单&&下载链接资料下载链接&#xff1a;讲解视频&#xff1a;1.主要功能&#xff1a;2.仿真3. 程序代码4. 设计报告 【普中】基于51单片机的电子秒表数码管显示 ( proteus仿真程序设计报告讲解视频&#xff09; 仿真图proteus8.16(有低版…

8.12 矢量图层面要素单一符号使用一(简单填充)

文章目录 前言简单填充&#xff08;Simple fill&#xff09;QGis设置面符号为简单填充&#xff08;Simple fill&#xff09;二次开发代码实现简单填充&#xff08;Simple fill&#xff09; 总结 前言 本章介绍矢量图层线要素单一符号中简单填充&#xff08;Simple fill&#x…

玩转nRF52840-DK开发套件(1)

Nordic Semiconductor nRF52840开发套件 (nRF52840-DK) 是一款采用nRF52840多协议SoC&#xff08;片上系统&#xff09;的多功能单板开发工具&#xff0c;适用于蓝牙5.2/蓝牙低功耗、802.15.4 / Thread、ANT/ANT以及2.4GHz专有应用。nRF52840-DK与 Arduino Uno Revision 3 标准…

【html】用html5+css3+JavaScript制作一个计数器

目录 简介&#xff1a; 效果图&#xff1a; 源码&#xff1a; html: CSS: JS: 源码解析&#xff1a; 简介&#xff1a; 在日常生活当中很多事情都需要用到计数器特别是在体育运动当中&#xff0c;可以我们那么我们可不可以通过网页来制作一个计数器呢答案是肯定的我们需要利…

猫头虎推荐20个值得体验的通用大模型

猫头虎推荐20个值得体验的通用大模型 &#x1f680; 大家好&#xff0c;我是猫头虎&#xff0c;一名专注于科技领域的自媒体博主。今天是周一&#xff0c;新的开始&#xff0c;我们来深入探讨一下当前最值得体验的通用大模型。这些AI模型不仅功能强大&#xff0c;而且在各自领…

YonSuite银企直联:成长型企业数智转型的强力引擎

在当今数字化转型的浪潮中&#xff0c;成长型企业正面临着前所未有的发展机遇与挑战。在这场数字化转型的竞技场上&#xff0c;银企直联凭借其独特的优势&#xff0c;成为企业金融管理的重要利器&#xff0c;为企业带来前所未有的资金管理体验。用友YonSuite作为领先的数智化转…

rk3588 rkllm 安装部署

这是测试版本。 下载转换工具 $ git clone https://github.com/airockchip/rknn-llm.git安装转换环境 遵循此文档在PC Linux上安装Conda。 https://conda.io/projects/conda/en/stable/user-guide/install/linux.html 创建conda $ conda create -n RKLLM-Toolkit python3.…

RAG未来的出路

总有人喊RAG已死,至少看目前不现实。 持这个观点的人,大多是Long context派,老实说,这派人绝大多数不甚理解长上下文的技术实现点,就觉得反正context越长,越牛B,有点饭圈化 ,当然我并不否认长上下文对提升理解力的一些帮助,就是没大家想的那么牛B而已(说个数据,达到…