Java的Future机制详解

news2024/12/28 23:08:06

Java的Future机制详解

  • 一、为什么出现Future机制
  • 二、Future的相关类图
    • 2.1 Future 接口
    • 2.2 FutureTask 类
  • 三、FutureTask的使用方法
  • 四、FutureTask源码分析
    • 4.1 state字段
    • 4.2 其他变量
    • 4.4 构造函数
    • 4.5 run方法及其他

一、为什么出现Future机制

常见的两种创建线程的方式。一种是直接继承Thread,另外一种就是实现Runnable接口。

这两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。

从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

Future模式的核心思想是能够让主线程将原来需要同步等待的这段时间用来做其他的事情。(因为可以异步获得执行结果,所以不用一直同步等待去获得执行结果)

在这里插入图片描述

上图简单描述了不使用Future和使用Future的区别,不使用Future模式,主线程在invoke完一些耗时逻辑之后需要等待,这个耗时逻辑在实际应用中可能是一次RPC调用,可能是一个本地IO操作等。B图表达的是使用Future模式之后,我们主线程在invoke之后可以立即返回,去做其他的事情,回头再来看看刚才提交的invoke有没有结果。

二、Future的相关类图

2.1 Future 接口

首先,我们需要清楚,Futrue是个接口。Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

在这里插入图片描述

接口定义行为,我们通过上图可以看到实现Future接口的子类会具有哪些行为:

  • 我们可以取消这个执行逻辑,如果这个逻辑已经正在执行,提供可选的参数来控制是否取消已经正在执行的逻辑。
  • 我们可以判断执行逻辑是否已经被取消。
  • 我们可以判断执行逻辑是否已经执行完成。
  • 我们可以获取执行逻辑的执行结果。
  • 我们可以允许在一定时间内去等待获取执行结果,如果超过这个时间,抛TimeoutException。

2.2 FutureTask 类

类图如下:

在这里插入图片描述

FutureTask是Future的具体实现。FutureTask实现了RunnableFuture接口。RunnableFuture接口又同时继承了Runnable 和 Future 接口。所以FutureTask既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

三、FutureTask的使用方法

举个例子,假设我们要执行一个算法,算法需要两个输入 input1 和 input2, 但是input1和input2需要经过一个非常耗时的运算才能得出。由于算法必须要两个输入都存在,才能给出输出,所以我们必须等待两个输入的产生。接下来就模仿一下这个过程。

package src;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskTest {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        long starttime = System.currentTimeMillis();

        //input2生成, 需要耗费3秒
        FutureTask<Integer> input2_futuretask = new FutureTask<>(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                Thread.sleep(3000);
                return 5;
            }
        });

        new Thread(input2_futuretask).start();

        //input1生成,需要耗费2秒
        FutureTask<Integer> input1_futuretask = new FutureTask<>(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                Thread.sleep(2000);
                return 3;
            }
        });
        new Thread(input1_futuretask).start();

        Integer integer2 = input2_futuretask.get();
        Integer integer1 = input1_futuretask.get();
        System.out.println(algorithm(integer1, integer2));
        long endtime = System.currentTimeMillis();
        System.out.println("用时:" + String.valueOf(endtime - starttime));
    }

    //这是我们要执行的算法
    public static int algorithm(int input, int input2) {
        return input + input2;
    }
}

输出结果:

在这里插入图片描述

我们可以看到用时3001毫秒,与最费时的input2生成时间差不多。
注意,我们在程序中生成input1时,也让线程休眠了2秒,但是结果不是3+2。说明FutureTask是被异步执行了。

四、FutureTask源码分析

4.1 state字段

volatile修饰的state字段;表示FutureTask当前所处的状态。可能的转换过程见注释。

	/**
	* Possible state transitions:
	* NEW -> COMPLETING -> NORMAL(创建到正常运行结束的状态变化轨迹)
	* NEW -> COMPLETING -> EXCEPTIONAL(创建到异常运行结束的状态变化轨迹)
	* NEW -> CANCELLED  (创建到取消的状态变化轨迹)
	* NEW -> INTERRUPTING -> INTERRUPTED(创建到中断结束的状态变化轨迹)
	*/
	private volatile int state;
	// NEW 新建状态,表示这个 FutureTask还没有开始运行
	private static final int NEW          = 0;
	// COMPLETING 完成状态, 表示 FutureTask 任务已经计算完毕了
	// 但是还有一些后续操作,例如唤醒等待线程操作,还没有完成。
	private static final int COMPLETING   = 1;
	// FutureTask 任务完结,正常完成,没有发生异常
	private static final int NORMAL       = 2;
	// FutureTask 任务完结,因为发生异常。
	private static final int EXCEPTIONAL  = 3;
	// FutureTask 任务完结,因为取消任务
	private static final int CANCELLED    = 4;
	// FutureTask 任务完结,也是取消任务,不过发起了中断运行任务线程的中断请求
	private static final int INTERRUPTING = 5;
	// FutureTask 任务完结,也是取消任务,已经完成了中断运行任务线程的中断请求
	private static final int INTERRUPTED  = 6;

4.2 其他变量

    /** 任务 */
    private Callable<V> callable;
    /** 储存结果*/
    private Object outcome; // non-volatile, protected by state reads/writes
    /** 执行任务的线程*/
    private volatile Thread runner;
    /** get方法阻塞的线程队列 */
    private volatile WaitNode waiters;

    //FutureTask的内部类,get方法的等待队列
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

4.3 CAS工具初始化

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

这段代码是为了后面使用CAS而准备的。可以这么理解:

一个java对象可以看成是一段内存,各个字段都得按照一定的顺序放在这段内存里,同时考虑到对齐要求,可能这些字段不是连续放置的,用这个UNSAFE.objectFieldOffset()方法能准确地告诉你某个字段相对于对象的起始内存地址的字节偏移量,因为是相对偏移量,所以它其实跟某个具体对象又没什么太大关系,跟class的定义和虚拟机的内存模型的实现细节更相关。

4.4 构造函数

FutureTask有两个构造函数:

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

这两个构造函数区别在于,如果使用第一个构造函数最后获取线程实行结果就是callable的执行的返回结果;而如果使用第二个构造函数那么最后获取线程实行结果就是参数中的result,接下来让我们看一下FutureTask的run方法。

同时两个构造函数都把当前状态设置为NEW。

4.5 run方法及其他

构造完FutureTask后,会把它当做线程的参数传进去,然后线程就会运行它的run方法。所以我们先来看一下run方法:

public void run() {
        //如果状态不是new,或者runner旧值不为null(已经启动过了),就结束
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable; // 这里的callable是从构造方法里面传人的
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call(); //执行任务,并将结果保存在result字段里。
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex); // 保存call方法抛出的异常
                }
                if (ran)
                    set(result); // 保存call方法的执行结果
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

其中,catch语句中的setException(ex)如下:

//发生异常时设置state和outcome
protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); 
            finishCompletion();// 唤醒get()方法阻塞的线程
        }
    }

而正常完成时,set(result);方法如下:

//正常完成时,设置state和outcome
protected void set(V v) {
//正常完成时,NEW->COMPLETING->NORMAL
 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
     outcome = v;
     UNSAFE.putOrderedInt(this, stateOffset, NORMAL); 
            finishCompletion(); // 唤醒get方法阻塞的线程
        }
    }

这两个set方法中,都是用到了finishCompletion()去唤醒get方法阻塞的线程。下面来看看这个方法:

//移除并唤醒所有等待的线程,调用done,并清空callable
private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t); //唤醒线程
                    }
                    //接下来的这几句代码是将当前节点剥离出队列,然后将q指向下一个等待节点。被剥离的节点由于thread和next都为null,所以会被GC回收。
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done(); //这个是空的方法,子类可以覆盖,实现回调的功能。
        callable = null;        // to reduce footprint
    }

好,到这里我们把运行以及设置结果的流程分析完了。那接下来看一下怎么获得执行结果把。也就是get方法。

get方法有两个,一个是有超时时间设置,另一个没有超时时间设置。

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // get(timeout, unit) 也很简单, 主要还是在 awaitDone里面
        if(unit == null){
            throw new NullPointerException();
        }
        int s = state;
        // 判断state状态是否 <= Completing, 调用awaitDone进行自旋等待
        if(s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING){
            throw new TimeoutException();
        }
        // 根据state的值进行返回结果或抛出异常
        return report(s);
    }

两个get方法都用到了awaitDone()。这个方法的作用是: 等待任务执行完成、被中断或超时。看一下源码:

    //等待完成,可能是是中断、异常、正常完成,timed:true,考虑等待时长,false:不考虑等待时长
    private int awaitDone(boolean timed, long nanos) throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L; //如果设置了超时时间
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
         /**
         *  有优先级顺序
         *  1、如果线程已中断,则直接将当前节点q从waiters中移出
         *  2、如果state已经是最终状态了,则直接返回state
         *  3、如果state是中间状态(COMPLETING),意味很快将变更过成最终状态,让出cpu时间片即可
         *  4、如果发现尚未有节点,则创建节点
         *  5、如果当前节点尚未入队,则将当前节点放到waiters中的首节点,并替换旧的waiters
         *  6、线程被阻塞指定时间后再唤醒
         *  7、线程一直被阻塞直到被其他线程唤醒
         *
         */
            if (Thread.interrupted()) {// 1
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {// 2
                if (q != null)
                    q.thread = null;
                return s; 
            }
            else if (s == COMPLETING) // 3
                Thread.yield();
            else if (q == null) // 4
                q = new WaitNode();
            else if (!queued) // 5
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
            else if (timed) {// 6
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q); //从waiters中移出节点q
                    return state; 
                }
                LockSupport.parkNanos(this, nanos); 
            }
            else // 7
                LockSupport.park(this);
        }
    }

接下来看下removeWaiter()移除等待节点的源码:

    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null; // 将移除的节点的thread=null, 为移除做标示
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    //通过 thread 判断当前 q 是否是需要移除的 q节点,因为我们刚才标示过了
                    if (q.thread != null) 
                        pred = q; //当不是我们要移除的节点,就往下走
                    else if (pred != null) {
                        //当p.thread==null时,到这里。下面这句话,相当于把q从队列移除。
                        pred.next = s;
                        //pred.thread == null 这种情况是在多线程进行并发 removeWaiter 时产生的
                        //此时正好移除节点 node 和 pred, 所以loop跳到retry, 从新进行这个过程。想象一下,如果在并发的情况下,其他线程把pred的线程置为空了。那说明这个链表不应该包含pred了。所以我们需要跳到retry从新开始。
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    //到这步说明p.thread==null 并且 pred==null。说明node是头结点。
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }

最后在get方法中调用report(s),根据状态s的不同进行返回结果或抛出异常。

    private V report(int s) throws ExecutionException {
        Object x = outcome;  //之前我们set的时候,已经设置过这个值了。所以直接用。
        if (s == NORMAL)
            return (V)x;  //正常执行结束,返回结果
        if (s >= CANCELLED)
            throw new CancellationException(); //被取消或中断了,就抛异常。
        throw new ExecutionException((Throwable)x);
    }

以上就是FutureTask的源码分析。

最后总结一下:

FutureTask既可以当做Runnable也可以当做Future。线程通过执行FutureTask的run方法,将正常运行的结果放入FutureTask类的result变量中。使用get方法可以阻塞直到获得结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1612864.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

高架学习笔记之软件架构风格

目录 零、什么是软件架构风格 一、常见的软件架构风格 二、数据流风格 2.1. 批处理风格 2.2. 管道-过滤器风格 三、调用/返回风格 3.1. 主/子程序风格 3.2. 面向对象风格 3.3. 层次型风格 3.4. 客户端/服务器风格 3.4.1. 两层C/S体系结构 3.4.2. 三层C/S体系结构 …

MBD_入门篇_20_Simulink子系统

20.Simulink子系统 20.1 概述 Simulink的子系统&#xff0c;相当于代码的function函数&#xff0c;但是模型的子系统又不完全等效于代码的函数。虚拟子系统并不会生成函数&#xff0c;而是以代码块的形式放在相应的调用位置上。模型层面我们使用子系统去做模块化的设计&#xf…

Mini-Gemini: 探索多模态视觉语言模型的新境界

一、背景 在数字化时代&#xff0c;人工智能的发展正以前所未有的速度推进。特别是在多模态学习领域&#xff0c;结合视觉和语言的能力已成为研究的热点。最近&#xff0c;一篇名为“Mini-Gemini: Mining the Potential of Multi-modality Vision Language Models”的文章在arX…

[已解决]react打包部署

react打包部署 问题 npm install 命令无反应 思路 换成 yarn install 安装完hadoop的环境后&#xff0c;使用node的yarn会报错&#xff1a; 我们在cmd使用where yarn&#xff0c;如下&#xff1a; 看你想保留哪一个&#xff0c;我平时node用的多&#xff0c;就把hadoop的y…

飞书API(5):查看多维表 28 种数据类型的数据结构

一、引入 前面我们用于测试的数据集其实都是比较常用的数据&#xff0c;比如说文本、数字、单选等&#xff0c;但飞书多维表并不仅仅只有这些数据&#xff0c;截止发文&#xff0c;飞书多维表应用上支持28种数据类型&#xff0c;在数据层面飞书官方只提供了23种数据类型&#…

Cadence软件安装

Cadence软件 iscape 用于安装cadence家的安装软件 解压缩安装包tar -xvf IScape04.23.tar.gz运行bash IScape/iscape/bin/iscape.sh 设置默认安装路径(可选)IC618 这里使用的是IC618.320版本作为示例,其他版本安装过程差不多 安装 首先安装终端模拟器,不然安装之后会失败…

【前端】校园二手书交易系统javascript+css+html (源码)【独一无二】

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化【获取源码商业合作】 &#x1f449;荣__誉&#x1f448;&#xff1a;阿里云博客专家博主、5…

Vnode是如何产生的?

源码 流程图 源码解读 Vue.js2.0中有两种生成方式&#xff1a;第一种是直接在Vue对象的option中添加render字段&#xff1b;第二种是像Vue.js 1.x版本那样写一个模板或者指定一个el根元素&#xff0c;它会首先转换成模板&#xff0c;经过HTMI语法解析器生成一个 ast 抽象语法树…

JAVAEE——IP协议

文章目录 IP协议IP协议报头格式IP协议报头的各个区段四位版本四位首部长度八位服务类型16位总长度16位标识&#xff0c;3位标志&#xff0c;13位片偏移八位生存时间八位协议 地址管理IP地址解决提议1&#xff1a;动态分配Ip地址解决提议2&#xff1a;NAT机制 IP协议 IP协议报头…

【新手入门必看】从零开始学指针

我使用VS CODEMSYS2的编译环境进行学习&#xff0c;想使用VS CODE进行C/C代码编写的小伙伴参考这篇文章进行环境配置VS Code 配置 C/C 编程运行环境&#xff08;保姆级教程&#xff09; 一、指针的引入 指针地址 #include <stdio.h>int main() {int a 10;printf(&quo…

编写函数fun,函数的功能是:根据以下公式计算s,计算结果作为函数值返回;n通过形参传入。

本文收录于专栏:算法之翼 https://blog.csdn.net/weixin_52908342/category_10943144.html 订阅后本专栏全部文章可见。 本文含有题目的题干、解题思路、解题思路、解题代码、代码解析。本文分别包含C语言、C++、Java、Python四种语言的解法完整代码和详细的解析。 题干 编写…

Java:二叉树(1)

从现在开始&#xff0c;我们进入二叉树的学习&#xff0c;二叉树是数据结构的重点部分&#xff0c;在了解这个结构之前&#xff0c;我们先来了解一下什么是树型结构吧&#xff01; 一、树型结构 1、树型结构简介 树是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>…

深度学习系列64:数字人openHeygen详解

1. 主流程分析 从inference.py函数进入&#xff0c;主要流程包括&#xff1a; 1&#xff09; 使用cv2获取视频中所有帧的列表&#xff0c;如下&#xff1a; 2&#xff09;定义Croper。核心代码为69行&#xff1a;full_frames_RGB, crop, quad croper.crop(full_frames_RGB)。…

openobserve-filebeat配置

优势 rustgolang开发的日志工具组合&#xff0c;自带日志数据存储&#xff0c;简化部署和管理。日志数据可配置保留x天。从日志文件中采集&#xff0c;做到非侵入式日志集中管理。 可从日志内容中提取信息进行报警等二次开发。 下载 openobserve-v0.10.1-windows-amd64 fil…

VL02N交货单清除字段:VLSTK(分配状态)

VL02N交货单清除字段&#xff1a;VLSTK(分配状态) 通过查找增强对应的BADI&#xff1a;LE_SHP_DELIVERY_PROC 修改方法&#xff1a;IF_EX_LE_SHP_DELIVERY_PROC~CHANGE_DELIVERY_HEADER&#xff0c;代码如下&#xff1a;

JSS作业

JSS作业&#xff1a; 1: <script>var cnt parseInt(window.prompt("请输入打印的行数&#xff1a;"));for (var i 1; i < cnt; i){for (var j 1; j < i; j){document.write("*")}document.write("<br>")} </script>…

XTuner 微调 LLM:1.8B、多模态、Agent

两种微调范式&#xff1a;增量预训练和指令微调 在大语言模型下游应用中&#xff0c;主要有两种微调范式&#xff1a;增量预训练和指令微调。增量预训练旨在让模型学习特定领域的常识&#xff0c;而不需要有监督标注的数据&#xff1b;指令微调则是通过高质量的对话数据训练模型…

C语言中, 文件包含处理,#include< > 与 #include ““的区别

文件包含处理 指一个源文件可以将另外一个文件的全部内容包含进来 &#xff23;语言提供了#include命令用来实现文件包含的操作 #include< > 与 #include ""的区别 <> 表示系统直接按系统指定的目录检索 "" 表示系统先在 "" 指定…

vulfocus靶场couchdb 权限绕过 (CVE-2017-12635)

Apache CouchDB是一个开源数据库&#xff0c;专注于易用性和成为"完全拥抱web的数据库"。它是一个使用JSON作为存储格式&#xff0c;JavaScript作为查询语言&#xff0c;MapReduce和HTTP作为API的NoSQL数据库。应用广泛&#xff0c;如BBC用在其动态内容展示平台&…

动态内存管理 柔性数组

文章目录 动态内存函数 malloc freecallocrealloc 重新开辟空间realloc 也可以第一个参数为NULL&#xff0c;则是直接开辟内存&#xff0c;类似于malloc用法 常见的动态内存错误对空指针进行解引用操作对开辟的内存越界访问对非动态开辟的内存使用free释放使用free释放动态开辟…