一文搞定高并发编程:CompletableFuture的supplyAsync与runAsync

news2024/12/27 12:21:02

CompletableFuture是Java 8中引入的一个类,用于简化异步编程和并发操作。它提供了一种方便的方式来处理异步任务的结果,以及将多个异步任务组合在一起执行。CompletableFuture支持链式操作,使得异步编程更加直观和灵活。在这里插入图片描述
在引入CompletableFuture之前,Java已经有了Future接口来表示异步计算的结果,但是它的功能相对有限,无法轻松实现复杂的异步操作链。CompletableFuture通过提供更丰富的方法和操作,使得异步编程变得更加便捷。

CompletableFuture实现了Future接口, CompletionStage接口,成为JDK8多任务协同场景下一个有效利器。

提交有返回值的异步任务

package com.neo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class DemoCompletableFuture {
    private static  Logger logger = LoggerFactory.getLogger(DemoCompletableFuture.class);
    public static void main(String[] args) throws Exception {

        //提交一个CompletableFuture任务
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            long start = System.currentTimeMillis();
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (Exception e) {
                e.printStackTrace();
            }
            long end = System.currentTimeMillis();
            logger.info("@@ 打印执行耗时:" +(end - start)  + " ms");
            return 1;
        });

        logger.info("CompletableFuture.supplyAsync 开始" );
        //通过get方法阻塞获取任务执行结果
        logger.info("CompletableFuture.supplyAsync 执行结果: {}", task.get());
        logger.info("CompletableFuture.supplyAsync 结束");
    }

}

输出结果如下,可以看出CompletableFuture的get方法会阻塞主线程工作,直到得到返回值为止。

13:39:32.976 [main] INFO com.neo.DemoCompletableFuture - CompletableFuture.supplyAsync 开始
13:39:37.985 [ForkJoinPool.commonPool-worker-9] INFO com.neo.DemoCompletableFuture - @@ 打印执行耗时:5011 ms
13:39:37.986 [main] INFO com.neo.DemoCompletableFuture - CompletableFuture.supplyAsync 执行结果: 1
13:39:37.990 [main] INFO com.neo.DemoCompletableFuture - CompletableFuture.supplyAsync 结束

对此我们来看看get方法是如何做到阻塞主线程并等待异步线程任务执行完成的。

从下面这段源码我们可以看到get方法的执行步骤:

/**
 * Waits if necessary for this future to complete, and then
 * returns its result.
 *
 * @return the result value
 * @throws CancellationException if this future was cancelled
 * @throws ExecutionException if this future completed exceptionally
 * @throws InterruptedException if the current thread was interrupted
 * while waiting
 */
public T get() throws InterruptedException, ExecutionException {
    Object r;
    return reportGet((r = result) == null ? waitingGet(true) : r);
}

reportGet函数分析

/**
 * Reports result using Future.get conventions.
 */
private static <T> T reportGet(Object r)
    throws InterruptedException, ExecutionException {
    if (r == null) // by convention below, null means interrupted
        throw new InterruptedException();
    if (r instanceof AltResult) {
        Throwable x, cause;
        if ((x = ((AltResult)r).ex) == null)
            return null;
        if (x instanceof CancellationException)
            throw (CancellationException)x;
        if ((x instanceof CompletionException) &&
            (cause = x.getCause()) != null)
            x = cause;
        throw new ExecutionException(x);
    }
    @SuppressWarnings("unchecked") T t = (T) r;
    return t;
}

这是CompletableFuture类中的一个私有静态方法reportGet,用于报告异步任务执行的结果,遵循Future.get的约定。让我们逐步分析这个方法:

参数类型

private static <T> T reportGet(Object r)    
throws InterruptedException, ExecutionException 

这是一个泛型方法,接收一个Object类型的参数r,表示异步任务的结果。

判断结果是否为null

if (r == null)     
throw new InterruptedException(); 

如果结果r为null,按照惯例表示任务被中断,此时抛出InterruptedException。

处理AltResult

if (r instanceof AltResult) {     
// ...
} 

如果结果是AltResult类型,说明异步任务执行过程中发生了异常。进入AltResult的处理逻辑。

获取异常信息并抛出相应异常

Throwable x, cause;
if ((x = ((AltResult)r).ex) == null)
    return null;
if (x instanceof CancellationException)
    throw (CancellationException)x;
if ((x instanceof CompletionException) &&
    (cause = x.getCause()) != null)
    x = cause;
throw new ExecutionException(x);

如果AltResult中的异常ex为null,说明异步任务被取消,返回null。

如果异常是CancellationException,抛出CancellationException。

如果异常是CompletionException,获取它的原因(cause),如果有原因就将异常替换为原因,最终抛出ExecutionException。

类型转换并返回结果

@SuppressWarnings("unchecked") T t = (T) r;
return t; 

最后,将r强制类型转换为泛型类型T,然后返回。

这个方法主要负责处理异步任务执行结果中可能涉及的异常情况,并根据Future.get的约定进行适当的处理。

waitingGet函数分析

/**
 * Returns raw result after waiting, or null if interruptible and
 * interrupted.
 */
private Object waitingGet(boolean interruptible) {
    Signaller q = null;
    boolean queued = false;
    int spins = -1;
    Object r;
    while ((r = result) == null) {
        if (spins < 0)
            spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                1 << 8 : 0; // Use brief spin-wait on multiprocessors
        else if (spins > 0) {
            if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                --spins;
        }
        else if (q == null)
            q = new Signaller(interruptible, 0L, 0L);
        else if (!queued)
            queued = tryPushStack(q);
        else if (interruptible && q.interruptControl < 0) {
            q.thread = null;
            cleanStack();
            return null;
        }
        else if (q.thread != null && result == null) {
            try {
                ForkJoinPool.managedBlock(q);
            } catch (InterruptedException ie) {
                q.interruptControl = -1;
            }
        }
    }
    if (q != null) {
        q.thread = null;
        if (q.interruptControl < 0) {
            if (interruptible)
                r = null; // report interruption
            else
                Thread.currentThread().interrupt();
        }
    }
    postComplete();
    return r;
}

这是CompletableFuture类中的一个私有方法waitingGet,用于在异步任务完成前等待其结果。让我们逐步分析这个方法:

初始化变量

Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;

这里初始化了一些变量,包括一个Signaller对象q,一个表示是否已经将任务推入栈的标志queued,一个用于自旋等待的计数spins,以及用于存储异步任务结果的变量r

自旋等待任务完成

while ((r = result) == null) {
    // 自旋等待任务完成
}

在这个循环中,不断检查result是否为null,如果为null,说明任务还未完成,就继续等待。

自旋等待策略

if (spins < 0)
    spins = (Runtime.getRuntime().availableProcessors() > 1) ?
        1 << 8 : 0; // Use brief spin-wait on multiprocessors
else if (spins > 0) {
    if (ThreadLocalRandom.nextSecondarySeed() >= 0)
        --spins;
}
  • 如果spins为负值,根据当前系统的处理器数量决定是否使用自旋等待。如果有多个处理器,使用brief spin-wait。
  • 如果spins大于0,且随机数为正,则减少spins,继续自旋等待。

创建和推送Signaller对象

else if (q == null)
    q = new Signaller(interruptible, 0L, 0L);
else if (!queued)
    queued = tryPushStack(q);

  • 如果q为null,创建一个Signaller对象。Signaller是用于协调等待的辅助类。
  • 如果q已创建但未推送到栈中,尝试推送到栈中。

处理中断

else if (interruptible && q.interruptControl < 0) {
    q.thread = null;
    cleanStack();
    return null;
}
    • 如果支持中断且q.interruptControl小于0,表示中断发生,清理相关状态并返回null。

使用ManagedBlocker进行等待

 else if (q.thread != null && result == null) {
    try {
        ForkJoinPool.managedBlock(q);
    } catch (InterruptedException ie) {
        q.interruptControl = -1;
    }
}
  • 如果q.thread不为null,且任务未完成,使用ForkJoinPool.managedBlock进行等待。这是一种协作式的等待方式。

处理中断和结果

if (q != null) {
    q.thread = null;
    if (q.interruptControl < 0) {
        if (interruptible)
            r = null; // report interruption
        else
            Thread.currentThread().interrupt();
    }
}
  • 清理Signaller对象的状态。
  • 如果支持中断,根据中断控制状态设置返回值r为null或者中断当前线程。

完成异步任务后的处理

postComplete();

最后,调用postComplete方法,该方法用于处理异步任务完成后的一些后续操作。

返回结果

return r;

返回异步任务的结果。

这个方法主要负责等待异步任务的完成,使用了一些自旋等待、协作式等待和中断处理的策略,确保在任务完成后能够正确返回结果。

提交无返回值的异步任务

通过runAsync提交一个无返回值的异步任务,这里我们为了实现任务执行完成再关闭主线程用了个get阻塞等待任务完成。

package com.neo;
/**
 * @Author zhangt
 * @create 2023/11/10
 */

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * @author zhangt
 * @date 2023年11月10日
 */
public class DemoCompletableFuture2 {


    private static  Logger logger = LoggerFactory.getLogger(DemoCompletableFuture2.class);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> supplyAsync = CompletableFuture.runAsync(() -> {
            long start = System.currentTimeMillis();
            logger.info(Thread.currentThread().getName() + "开始执行时间:" + start);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            logger.info(Thread.currentThread().getName() + "结束总执行时间:" + (System.currentTimeMillis() - start));
        });
        logger.info("CompletableFuture.supplyAsync 主线程开始运行" );
        //get阻塞主线程等待任务结束
        logger.info("get阻塞主线程等待任务结束 :" + supplyAsync.get());
        logger.info("CompletableFuture.supplyAsync 主线程运行结束");
    }

}

返回结果

15:29:59.922 [ForkJoinPool.commonPool-worker-9] INFO com.neo.DemoCompletableFuture2 - ForkJoinPool.commonPool-worker-9开始执行时间:1699860599920
15:29:59.922 [main] INFO com.neo.DemoCompletableFuture2 - CompletableFuture.supplyAsync 主线程开始运行
15:30:00.935 [ForkJoinPool.commonPool-worker-9] INFO com.neo.DemoCompletableFuture2 - ForkJoinPool.commonPool-worker-9结束总执行时间:1015
15:30:00.935 [main] INFO com.neo.DemoCompletableFuture2 - get阻塞主线程等待任务结束 :null
15:30:00.935 [main] INFO com.neo.DemoCompletableFuture2 - CompletableFuture.supplyAsync 主线程运行结束

区别

CompletableFuture.supplyAsync和CompletableFuture.runAsync都是用于创建异步任务的方法,但它们在任务的类型和返回值处理上有一些区别。

CompletableFuture.supplyAsync

任务类型: 用于执行有返回值的异步任务。任务由Supplier提供,不接收任何参数,返回一个结果。

方法签名

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

示例

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
      // 有返回值的异步任务
      return "Hello, CompletableFuture!";
});

CompletableFuture.runAsync

任务类型: 用于执行没有返回值的异步任务。任务由Runnable提供,不返回任何结果。

方法签名

public static CompletableFuture<Void> runAsync(Runnable runnable)

示例

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
     // 没有返回值的异步任务
     System.out.println("Running async task");
});

区别总结

CompletableFuture.supplyAsync用于执行有返回值的异步任务,接收一个Supplier,返回一个CompletableFuture对象,可获取异步任务的结果。

CompletableFuture.runAsync用于执行没有返回值的异步任务,接收一个Runnable,返回一个CompletableFuture对象,表示异步任务执行完毕。

这两个方法都允许通过传递Executor来指定异步任务的执行线程。例如,可以使用

CompletableFuture.supplyAsync(supplier, executor)

CompletableFuture.runAsync(runnable, executor)

来指定特定的线程池。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2124129.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python(TensorFlow和PyTorch)及C++注意力网络导图

&#x1f3af;要点 谱图神经网络计算注意力分数对比图神经网络、卷积网络和图注意力网络药物靶标建模学习和预测相互作用腹侧和背侧皮质下结构手写字体字符序列文本识别组织病理学图像分析长短期记忆财务模式预测相关性生物医学图像特征学习和迭代纠正 Python注意力机制 对…

Java面试篇基础部分-Java 实现的I/O方式

Java I/O 在整个的java.io包中提供了5个重要的I/O类和1个接口类。5个类分别是File、OutputStream、InputStream、Writer、Reader ,1个接口是指Serializable序列化接口。具体的使用方式可以查看JDK的参考文档。 Java NIO 实现 Java NIO的实现内容主要有如下的三个核心内容 Sel…

使用Vue3.5的onWatcherCleanup封装自动cancel的fetch函数

前言 在欧阳的上一篇 这应该是全网最详细的Vue3.5版本解读文章中有不少同学对Vue3.5新增的onWatcherCleanup有点疑惑&#xff0c;这个新增的API好像和watch API回调的第三个参数onCleanup功能好像重复了。今天这篇文章来讲讲新增的onWatcherCleanup函数的使用场景&#xff1a;…

《 C++ 容器全景指南:五 》深入探索 C++ 标准库中的 stack 与 queue 容器适配器

1、引言 1.1、容器适配器的概念与应用 容器适配器&#xff08;Container Adapters&#xff09;是 C 标准库提供的一种特殊容器&#xff0c;它不是一种独立的容器&#xff0c;而是对其他标准容器的封装&#xff0c;用来实现特定的数据结构如栈&#xff08;stack&#xff09;和…

【信创】麒麟KOS上安装使用网络抓包工具Wireshark

原文链接&#xff1a;【信创】麒麟KOS上安装使用网络抓包工具Wireshark Hello&#xff0c;大家好啊&#xff01;今天给大家带来一篇关于如何在麒麟桌面操作系统上安装和使用Wireshark的文章。Wireshark是一款强大的网络协议分析工具&#xff0c;广泛应用于网络故障排查、网络流…

Makefile学习总结

Makefile学习总结 目录 Makefile学习总结1. Makefile介绍2. Makefile规则3. Makefile文件里的赋值方法4. Makefile常用函数4.1 字符串替换和分析函数4.2 文件名函数4.3 其他函数 5. Makefile使用示例6、多级目录通用Makefile Demo6.1 一般通用Makefile的设计思想6.2 Demo分析 参…

DAY73

作业 pro文件&#xff1a; QT texttospeech 头文件&#xff1a; #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QPushButton> //按钮类 #include <QLabel> //标签类 #include <QLineEdit> //行编译器类 #include…

【delphi】判断多显示器下,程序在那个显示器中

在 Delphi 中&#xff0c;如果你的电脑连接了多个显示器&#xff0c;可以通过以下步骤判断某个程序在哪个显示器上运行。 方法概述&#xff1a; 获取程序窗口的位置&#xff08;例如窗体的 Left、Top 坐标&#xff09;。使用 Screen.MonitorFromWindow 函数来确定该窗口所属的…

Hibernate QueryPlanCache 查询计划缓存引发的内存溢出

目录 1.排查方式2.结论3.解决办法 前言&#xff1a;在生产环境中有一个后端程序多次报oom然后导致程序中断。 1.排查方式 通过下载后端程序产生的oom文件&#xff0c;将oom文件导入MemoryAnalyzer程序分析程序堆内存使用情况。 1、将oom文件导入MemoryAnalyzer后可以看到概览信…

在银河麒麟服务器操作系统中设置SSH登录限制

在银河麒麟服务器操作系统中设置SSH登录限制 1、引言2、 步骤一&#xff1a;检查MaxStartups选项3、步骤二&#xff1a;修改MaxStartups选项4、步骤三&#xff1a;重启SSH服务 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 1、引言 在服务…

flask旧衣物捐赠系统—计算机毕业设计源码26577

摘要 科技进步的飞速发展引起人们日常生活的巨大变化&#xff0c;电子信息技术的飞速发展使得电子信息技术的各个领域的应用水平得到普及和应用。信息时代的到来已成为不可阻挡的时尚潮流&#xff0c;人类发展的历史正进入一个新时代。在现实运用中&#xff0c;应用软件的工作规…

【干货分享】Ftrans安全数据交换系统 搭建跨网数据传输通道

安全数据交换系统是一种专门设计用于在不同的网络、系统或组织之间安全地传输数据的软件或硬件解决方案。这种系统通常包含多种安全特性&#xff0c;以确保数据在传输过程中的保密性、完整性和可用性。 安全数据交换系统可以解决哪些问题&#xff1f; 安全数据交换系统主要解…

神经网络卷积层和最大池化

文章目录 一、卷积层原理二、相关函数的概念三、卷积层的应用四、最大池化原理五、最大池化案例 一、卷积层原理 ./ 当前目录&#xff1b;…/ 上级目录 父类&#xff08;也称为基类或超类&#xff09;是指在类继承体系中被其他类继承的类。也就是被其他子类进行调用的类 当In_…

「豆包 Marscode 体验官」AI 加持的云端 IDE——三种方法高效开发前后端聊天交互功能

以下是「豆包 MarsCode 体验官」优秀文章&#xff0c;作者努力的小雨。 豆包 MarsCode 豆包MarsCode 编程助手支持的 IDE: 支持 Visual Studio Code 1.67.0 及以上版本&#xff0c;以及 JetBrains 系列 IDE&#xff0c;如 IntelliJ IDEA、Pycharm 等&#xff0c;版本要求为 22…

016.PL-SQL编程—过程

我 的 个 人 主 页&#xff1a;&#x1f449;&#x1f449; 失心疯的个人主页 &#x1f448;&#x1f448; 入 门 教 程 推 荐 &#xff1a;&#x1f449;&#x1f449; Python零基础入门教程合集 &#x1f448;&#x1f448; 虚 拟 环 境 搭 建 &#xff1a;&#x1f449;&…

服务器测试之GPU基础汇总

GPU基础汇总 1.GPU简介 1.1.什么是GPU GPU英文全称Graphic Processing Unit&#xff0c;中文翻译为“图形处理器”。一个专门的图形核心处理器。GPU是显示卡的“大脑”&#xff0c;决定了该显卡的档次和大部分性能&#xff0c;同时也是2D显示卡和3D显示卡的区别依据。可以形…

Flask 第三课 -- 第一个应用

上一章节我们已经成功安装了 Flask&#xff0c;接下来我们可以创建一个简单的 Flask 应用。 首先&#xff0c;创建一个名为 app.py 的文件&#xff0c;并添加以下内容&#xff1a; from flask import Flaskapp Flask(__name__)app.route(/) def hello_world():return Hello,…

网络拓扑结构介绍

这张图展示了一个复杂的网络拓扑结构&#xff0c;它包括了多个运营商的接入、负载均衡、安全防护以及数据处理等多个关键环节。整个网络通过精心设计的架构和高效的节点连接&#xff0c;实现了数据的快速传输和安全处理。 一、各个模块介绍 运营商接入&#xff1a; 移动、电信…

论文速读|形机器人的高速和抗冲击远程操作

论文地址&#xff1a;https://arxiv.org/pdf/2409.04639 本文提出了一种综合解决方案&#xff0c;用于远程控制类人机器人&#xff0c;实现了高速度和冲击抵抗的操作。通过结合无校准的运动捕捉和重定标、低延迟全身运动流式传输工具箱和高带宽的摆线驱动器&#xff0c;显著提高…

【Python报错已解决】ValueError: All arrays must be of the same length

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 文章目录 前言一、问题描述1.1 报错示例1.2 报错分析1.3 解决思路 二、解决方法2.1 方法一&#xff1a;调整数组长度2.2 步骤二…