FutureTask源码分析

news2025/1/22 9:05:49

        Thread类的run方法返回值类型是void,因此我们无法直接通过Thread类获取线程执行结果。如果要获取线程执行结果就需要使用FutureTask。用法如下:

class CallableImpl implements Callable{

    @Override
    public Object call() throws Exception {
        //do somethings
        //return result;
    }
}
FutureTask futureTask = new FutureTask(new CallableImpl());
new Thread(futureTask).start();
try {
       //获取线程执行结果
       Object result = futureTask.get();
     } catch (InterruptedException e) {
            e.printStackTrace();
     } catch (ExecutionException e) {
            e.printStackTrace();
    }

        在实例化FutureTask时构造函数传入了实现Callable接口的实例。而在实例化Thread类时,构造函数传入FutureTask实例。因此,我们可以猜测线程在执行run方法时必定会调用call方法,并且保存call方法返回的结果。

总览

    通过类图,可以看到FutureTask主要实现了Runnable和Future。实现Runnable的run方法作为线程的执行体。正因为实现了Runnable,我们才可以使用FutureTask来创建线程。Future接口定义了如下几个方法
public interface Future<V> {
/**
 *取消线程执行的任务
 */
boolean cancel(boolean mayInterruptIfRunning);
/**
 *判断任务是否被取消
 *如果任务在正常完成前因调用cancel方法而被取消,返回true
 */
boolean isCancelled();
/**
 * 判断任务是否完成,如果任务已经完成,返回true
 * 完成可能是由于正常终止、异常或取消——在这些情况下,此方法都将返回true。
 */
boolean isDone();
/**
 * 获取任务执行的结果,调用该方法时,如果任务还没有执行完成,将会阻塞当前线程
 * 直到任务完成或者被中断
 */
V get() throws InterruptedException, ExecutionException;
/**
 * 获取任务执行的结果,调用该方法时,如果任务还没有执行完成,将会阻塞当前线程
 * 直到任务完成或者被中断或者超时
 */
V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

      可以看出FutureTask具备获取线程任务执行结果、取消线程任务的能力。

成员变量

public class FutureTask<V> implements RunnableFuture<V> {
    //state标识任务的运行状态
    private volatile int state;
    //新建状态,这是任务的最初状态
    private static final int NEW          = 0;
    //正在完成,任务执行体已经完成,正在保存执行结果
    private static final int COMPLETING   = 1;
    //任务正常完成
    private static final int NORMAL       = 2;
    //任务执行过程中发生异常
    private static final int EXCEPTIONAL  = 3;
    //任务被取消
    private static final int CANCELLED    = 4;
    //正在中断运行任务的线程
    private static final int INTERRUPTING = 5;
    //任务被中断
    private static final int INTERRUPTED  = 6;
    
    //任务的执行体,任务完成后,将会设置成null
    private Callable<V> callable;
    //任务执行体的返回结果
    private Object outcome; 
    //运行callable的线程
    private volatile Thread runner;
    //等待任务执行结果的线程队列
    private volatile WaitNode waiters;

    static final class WaitNode {
        //当前节点代表的线程
        volatile Thread thread;
        //下一个节点
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

}

         成员变量的含义只有在分析具体的方法代码和作者的注释时才能知晓。接下来具体分析FutureTask是如何实现保存任务执行结果和获取结果的。

源码分析

        在分析FutureTask源码前,需要对其中使用到jdk的方法做个简单的介绍。其中Unsafe类提供的cas操作的相关方法。

public final native boolean compareAndSwapObject(Object obj, 
long offset, Object expect, Object update);
  • obj :要修改字段的对象;
  • offset :要修改的字段在对象内的偏移量;
  • expect : 字段的期望值;
  • update :如果该字段的值等于字段的期望值,用于更新字段的新值;

   LockSupport的park和unpark提供了阻塞和解除阻塞线程的有效方法,park会使当前线程阻塞,unpark可以唤醒指定的线程。

public static void park() {
    UNSAFE.park(false, 0L);
}
public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

构造函数

public FutureTask(Callable<V> callable) {
    if (callable == null)       
        throw new NullPointerException();    
    this.callable = callable;    
    this.state = NEW;       
}

接收callable实例并赋值给成员变量callable,将任务状态初始化为NEW。

run方法

public void run() {
    //先检查任务的状态,如果任务状态是NEW。利用cas操作设置runner为当前执行任务的线程
    //这里是为了确保在多线程的情况下任务执行和结果设置的安全性及一致性
    //比如下面的代码,会导致一个任务在多个线程中运行。
    //  FutureTask futureTask = new FutureTask(task);
    //  futureTask.run();
    //  Thread thread = new Thread(futureTask);
    //  Thread thread1 = new Thread(futureTask);
    //  thread.start();
    //  thread1.start();
    if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                    null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //调用callable方法,执行真正的任务逻辑
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                //执行异常处理,将任务状态修改为EXCEPTIONAL
                setException(ex);
            }
            if (ran)
                //任务执行体运行成功,保存任务结果
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        //handlePossibleCancellationInterrupt需要结合cancel方法分析
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
    
protected void set(V v) {
    //在outcome还没有保存返回结果前,先将任务状态设置为COMPLETING(正在完成)
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        //保存任务运行结果
        outcome = v;
        //将任务状态设置为正常完成
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        //结束任务:唤醒所有因调用get方法而阻塞的线程,并清空等待队列
        finishCompletion();
    }
}

protected void setException(Throwable t) {
    //在outcome还没有保存返回结果前,先将任务状态设置为COMPLETING(正在完成)
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        //将任务的结果设置为异常信息
        outcome = t;
        //将任务状态设置为EXCEPTIONAL(异常中断)
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); 
        //结束任务:唤醒所有因调用get方法而阻塞的线程,并清空等待队列
        finishCompletion();
    }
}

/**
 *唤醒所有因调用get方法而阻塞的线程,并清空等待队列
 */
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        //先将成员变量waiters设置为null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            //从头开始遍历等待队列,唤醒其每个节点对应的线程
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                //获取下一个节点
                WaitNode next = q.next;
                if (next == null)
                    break;
                //当前节点next指向null,当前节点从等待队列中断开,之后被GC回收
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

从run方法中,FutureTask的生命周期和线程的生命周期有一定的关联:

        当FutureTask的state为NEW时,执行任务的线程可能处于New状态、Runable状态(线程在操作系统中被创建,处于等待CPU时间或运行中)、Blocked状态(线程在等待锁)。

        当程序调用call方法后,在将call的执行结果保存到FutureTask的成员变量outcome前,会将FutureTask设置为COMPLETING。此时FutureTask的COMPLETING 对应线程的Runable状态。

        如果程序调用call发生异常,FutureTask最终被设置为EXCEPTIONAL,正常执行则被设置为NORMAL,此时线程即将进入Terminated状态。

get方法

/**
 *无限时长的等待获取执行结果
 */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    //COMPLETING代表任务正在保存执行结果。<=这个状态,说明任务执行还没有保存执行结果
    //则会调用awaitDone方法等待执行结果。
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
/**
 *有限时长的等待获取执行结果
 */
public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

/**
 *等待任务完成,在被中断或超时时终止
 */
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    //当前节点是否在等待队列中
    boolean queued = false;
    for (;;) {
        //检查当前获取结果的线程是否被中断,如果被中断,从等待队列中移除,并抛出中断异常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            //1、创建一个新的节点
            q = new WaitNode();
        else if (!queued)
            //如果新的节点没有在排队,将这个节点加入到队列的头部
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                    q.next = waiters, q);
        else if (timed) {
            //判断是否超时
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            //阻塞当前线程
            LockSupport.parkNanos(this, nanos);
        }
        else
            阻塞当前线程
            LockSupport.park(this);
    }
}
/**
*遍历等待队列移除节点
*/
private void removeWaiter(WaitNode node) {
    if (node != null) {
        //首先将节点thread设置为null,防止节点被意外地再次使用或唤醒
        //同时thread =null的节点是作为需要被移除节点的标记
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            //声明pre q s 三个WaitNode变量
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                //如果节点的thread !=null说明当前节点不需要被移除,遍历下一个
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    //上一个节点pred != null,说明当前节点不是队列的第一个节点
                    //则将pred.next指向当前节点的下一个节点s,即跳过了当前节点
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        //队列在遍历过程中发生了变化,从头开始遍历
                        continue retry;
                }
                //如果当前节点是头节点,将头节点设置为当前节点的下一个节点
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                        q, s))
                    //队列在遍历过程中发生了变化,从头开始遍历
                    continue retry;
            }
            //完成对等待队列的遍历,成功移除了节点(无论是通过更新队列头部还是通过跳过内部节点)
            //退出
            break;
        }
    }
}

cancel方法

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

cancel返回值是boolean类型,任务取消成功,返回true,任务取消失败返回false;参数mayInterruptIfRunning表示是否尝试取消正在执行中的任务。

1、如果任务状态不是NEW状态,直接返回false,通过对run方法的分析,可以知道当FutureTask状态不为NEW时,任务已经被取消或者已经执行了call方法,无法取消任务。

2、任务状态是NEW

        2.1 参数mayInterruptIfRunning为false,设置任务状态为CANCELLED,从run方法中可以得到,正在执行的任务不会被取消,还未开始的任务会被取消。

        2.2 参数mayInterruptIfRunning为true,尝试调用正在执行任务的线程的interrupt()方法(在一个线程内部存在着名为interrupt flag的标识,如果一个线程被interrupt,那么它的flag将被设置,但是如果当前线程正在执行可中断方法被阻塞时,如Object的wait方法,Thread的sleep、join方法等,调用interrupt方法将其中断,反而会导致flag被清除)

再回过头看看run方法最后的handlePossibleCancellationInterrupt

public void run() {
    ......
    finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        //handlePossibleCancellationInterrupt需要结合cancel方法分析
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}


private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        // assert state == INTERRUPTED;

        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
    }

        这个方法到底有什么作用呢,作者通过源代码注释告诉我们这里的自旋目的是如果其他线程调用了cancel(true)方法,确保中断只能传递给当前执行任务的线程runner,并且state在runner线程执行期间最终能够被设置为INTERRUPTED。当线程调用cancel(true)方法方法时,先将任务状态设置为INTERRUPTING,再执行运行任务线程的中断方法,最后将任务状态设置为INTERRUPTED。执行任务的线程检测到有其他线程正在中断任务时,会等待完成中断操作后再退出。

        通过对源码的阅读,我们大致了解到了:任务是如何执行并且保存执行结果,完成任务后,如何唤醒等待获取执行结果的线程。在获取执行结果时,如果任务还未完成,如何进入等待队列,如果等待超时或者被中断,如何从等待队列中移除。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2157753.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

信息安全工程师(12)网络攻击概述

前言 网络攻击&#xff08;Cyber Attacks&#xff0c;也称赛博攻击&#xff09;是指针对计算机信息系统、基础设施、计算机网络或个人计算机设备的任何类型的进攻动作。这些攻击旨在破坏、揭露、修改、使软件或服务失去功能&#xff0c;或在未经授权的情况下偷取或访问计算机数…

超详细超实用!!!AI编程之cursor编写一个官网(二)

云风网 云风笔记 云风知识库 一、新建html文件 选中添加index.html,输入编写官网要求&#xff0c;自动生成代码&#xff0c;先来个简单的。 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"…

WPS2019 数据透视表多列数据如何显示同一行

在excel表格中&#xff0c;只有行筛选&#xff0c;没有列筛选功能&#xff0c;当我们需要只选取某些列的数据时&#xff0c;使用数据透视表是个可行的方法&#xff0c;但默认生成的数据透视表可观性较差。要如何才能使得数据透视表格式与原来数据格式一样美观易看呢&#xff1f…

Leetcode990.等式方程的可满足性

题目 原题链接 等式方程的可满足性 思路 定义一个长度为26&#xff08;变量为小写字母&#xff09;的数组充当并查集&#xff0c;并将数组中的元素初始化为 -1判断“”并合并元素&#xff0c;将相等的放在一个集合中判断“!”&#xff1b;不等的如果在一个集合中&#xff0c;则…

【Linux】指令和权限的这些细节,你确定都清楚吗?

&#x1f680;个人主页&#xff1a;奋斗的小羊 &#x1f680;所属专栏&#xff1a;Linux 很荣幸您能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎 ~ 目录 前言&#x1f4a5;一、Linux基本指令&#x1f4a5;1.1 mv 指令&#x1f4a5;1.2 cat 指令&#x1f4a5;…

webLogic反序列化漏洞CVE-2017-3506

1.环境搭建 cd vulhub-master/weblogic/weak_password docker-compose up -d 2.判断wls-wsat组件是否存在 拼接/wls-wsat/CoordinatorPortType 查看页面是否有回显 有回显说明存在组件 3.在当前页面抓包 反弹shell 添加请求包内容 <soapenv:Envelope xmlns:soapenv&q…

hCaptcha 图像识别 API 对接说明

hCaptcha 图像识别 API 对接说明 本文将介绍一种 hCaptcha 图像识别 API 对接说明&#xff0c;它可以通过用户输入识别的内容和 hCaptcha验证码图像&#xff0c;最后返回需要点击的小图像的坐标&#xff0c;完成验证。 接下来介绍下 hCaptcha 图像识别 API 的对接说明。 注册…

线程的状态及join()插队方法

一、线程的状态 线程整个生命周期中有6种状态&#xff0c;分别为 NEW 新建状态 、RUNNABLE 可运行状态、TERMINATED 终止状态、TIMED_WAITING计时等待状态、WAITING 等待状态、BLOCKED 阻塞状态 线程各个状态之间的转换&#xff1a; 在 JAVA 程序中&#xff0c;一个线程对象通过…

一文搞懂offset、client、scroll系列及案例

目录 一、offset 1-1、offset系列属性 1-2、offset与style区别 1-3、案例 1-3-1、计算鼠标在盒子内的坐标 1-3-2、拖动模态框 二、client 2-1、client系列属性 三、scroll 3-1、scroll系列属性 3-2、案例 3-2-1、滚动页面一定距离后固定侧边栏 一、offset offset是…

pg入门3—详解tablespaces—下

pg默认的tablespace的location为空&#xff0c;那么如果表设置了默认的tablespace&#xff0c;数据实际上是存哪个目录的呢? 在 PostgreSQL 中&#xff0c;如果你创建了一个表并且没有显式指定表空间&#xff08;tablespace&#xff09;&#xff0c;或者表空间的 location 为…

数据库数据恢复—SQL Server附加数据库出现“错误823”怎么恢复数据?

SQL Server数据库故障&#xff1a; SQL Server附加数据库出现错误823&#xff0c;附加数据库失败。数据库没有备份&#xff0c;无法通过备份恢复数据库。 SQL Server数据库出现823错误的可能原因有&#xff1a;数据库物理页面损坏、数据库物理页面校验值损坏导致无法识别该页面…

【靶点Talk】免疫检查点争夺战:TIGIT能否超越PD-1?

曾经的TIGIT靶点顶着“下一个PD-1”的名号横空出世&#xff0c;三年的“征程”中TIGIT走过一次又一次的失败&#xff0c;然而面对质疑和压力仍有一批公司选择前行。今天给大家分享TIGIT靶点的相关内容&#xff0c;更多靶点科普视频请关注义翘神州B站和知乎官方账号。 TIGIT的“…

C#和数据库高级:虚方法

文章目录 一、抽象方法和抽象类中的思考1.1、回顾抽象方法的特点1.2、针对抽象方法问题的引出 二、虚方法的使用步骤2.1、虚方法重写方法的调用2.2、系统自带的虚方法2.3、重写Equals方法2.4、虚方法和抽象方法的比较 三、虚方法和抽象方法的联系3.1、ToString()方法的应用 一、…

2024/9/23 leetcode 25题 k个一组翻转链表

目录 25.k个一组翻转链表 题目描述 题目链接 解题思路与代码 25.k个一组翻转链表 题目描述 给你链表的头节点 head &#xff0c;每 k 个节点一组进行翻转&#xff0c;请你返回修改后的链表。 k 是一个正整数&#xff0c;它的值小于或等于链表的长度。如果节点总数不是 k 的…

Gartner:中国企业利用GenAI提高生产力的三大策略

作者&#xff1a;Gartner高级首席分析师 雷丝、Gartner 研究总监 闫斌、Gartner高级研究总监 张桐 随着生成式人工智能&#xff08;GenAI&#xff09;风靡全球&#xff0c;大多数企业都希望利用人工智能&#xff08;AI&#xff09;技术进行创新&#xff0c;以收获更多的业务成果…

JS 历史简介

目录 1. JS 历史简介 2. JS 技术特征 1. JS 历史简介 举例&#xff1a;在提交用户的注册信息的时候&#xff0c;为避免注册出现错误后重新填写信息&#xff0c;可以在写完一栏信息后进行校验&#xff0c;并提示是否出现错误&#xff0c;这样会大大提高用户提交的成功率&…

PCL 随机下采样

目录 一、概述 1.1原理 1.2实现步骤 1.3应用场景 二、代码实现 2.1关键函数 2.2完整代码 三、实现效果 PCL点云算法汇总及实战案例汇总的目录地址链接&#xff1a; PCL点云算法与项目实战案例汇总&#xff08;长期更新&#xff09; 一、概述 随机下采样 是一种常用的点…

大模型LLM对话模拟器Dialogue Simulator Visualization可视化工具

伴随着生成式人工智能技术发展&#xff0c;进2年涌现出大语言模型LLM/Agent系统/AI推理等众多方向的技术项目和论文。其中对话系统&#xff0c;智能体交互是用户通过UX界面和AI系统进行交互&#xff0c;这种交互有时候也是多模态&#xff08;用户输入文字/语音/图像&#xff09…

众数信科 AI智能体智慧办公解决方案——众数办公AI

智慧办公解决方案 众数办公AI 智能问答 拥有广域知识库 结合AI交互能力 通过智能人机、多轮对话即可完成 信息检索、知识提炼、内容衍生等问答操作 为用户提供多样化的问答服务 PPT创作 支持用户明确计划创作内容后进行需求提交 即刻智能生成符合需求的文章 支持上百种…

【软件工程】验证软件需求

一、验证需求正确性的四个方面 二、验证软件需求的方法 三、用于需求分析的软件工具 小结 例题 判断题