Thread类的run方法返回值类型是void,因此我们无法直接通过Thread类获取线程执行结果。如果要获取线程执行结果就需要使用FutureTask。用法如下:
class CallableImpl implements Callable{
@Override
public Object call() throws Exception {
//do somethings
//return result;
}
}
FutureTask futureTask = new FutureTask(new CallableImpl());
new Thread(futureTask).start();
try {
//获取线程执行结果
Object result = futureTask.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
在实例化FutureTask时构造函数传入了实现Callable接口的实例。而在实例化Thread类时,构造函数传入FutureTask实例。因此,我们可以猜测线程在执行run方法时必定会调用call方法,并且保存call方法返回的结果。
总览
通过类图,可以看到FutureTask主要实现了Runnable和Future。实现Runnable的run方法作为线程的执行体。正因为实现了Runnable,我们才可以使用FutureTask来创建线程。Future接口定义了如下几个方法
public interface Future<V> {
/**
*取消线程执行的任务
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
*判断任务是否被取消
*如果任务在正常完成前因调用cancel方法而被取消,返回true
*/
boolean isCancelled();
/**
* 判断任务是否完成,如果任务已经完成,返回true
* 完成可能是由于正常终止、异常或取消——在这些情况下,此方法都将返回true。
*/
boolean isDone();
/**
* 获取任务执行的结果,调用该方法时,如果任务还没有执行完成,将会阻塞当前线程
* 直到任务完成或者被中断
*/
V get() throws InterruptedException, ExecutionException;
/**
* 获取任务执行的结果,调用该方法时,如果任务还没有执行完成,将会阻塞当前线程
* 直到任务完成或者被中断或者超时
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
可以看出FutureTask具备获取线程任务执行结果、取消线程任务的能力。
成员变量
public class FutureTask<V> implements RunnableFuture<V> {
//state标识任务的运行状态
private volatile int state;
//新建状态,这是任务的最初状态
private static final int NEW = 0;
//正在完成,任务执行体已经完成,正在保存执行结果
private static final int COMPLETING = 1;
//任务正常完成
private static final int NORMAL = 2;
//任务执行过程中发生异常
private static final int EXCEPTIONAL = 3;
//任务被取消
private static final int CANCELLED = 4;
//正在中断运行任务的线程
private static final int INTERRUPTING = 5;
//任务被中断
private static final int INTERRUPTED = 6;
//任务的执行体,任务完成后,将会设置成null
private Callable<V> callable;
//任务执行体的返回结果
private Object outcome;
//运行callable的线程
private volatile Thread runner;
//等待任务执行结果的线程队列
private volatile WaitNode waiters;
static final class WaitNode {
//当前节点代表的线程
volatile Thread thread;
//下一个节点
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
}
成员变量的含义只有在分析具体的方法代码和作者的注释时才能知晓。接下来具体分析FutureTask是如何实现保存任务执行结果和获取结果的。
源码分析
在分析FutureTask源码前,需要对其中使用到jdk的方法做个简单的介绍。其中Unsafe类提供的cas操作的相关方法。
public final native boolean compareAndSwapObject(Object obj,
long offset, Object expect, Object update);
- obj :要修改字段的对象;
- offset :要修改的字段在对象内的偏移量;
- expect : 字段的期望值;
- update :如果该字段的值等于字段的期望值,用于更新字段的新值;
LockSupport的park和unpark提供了阻塞和解除阻塞线程的有效方法,park会使当前线程阻塞,unpark可以唤醒指定的线程。
public static void park() {
UNSAFE.park(false, 0L);
}
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
构造函数
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
接收callable实例并赋值给成员变量callable,将任务状态初始化为NEW。
run方法
public void run() {
//先检查任务的状态,如果任务状态是NEW。利用cas操作设置runner为当前执行任务的线程
//这里是为了确保在多线程的情况下任务执行和结果设置的安全性及一致性
//比如下面的代码,会导致一个任务在多个线程中运行。
// FutureTask futureTask = new FutureTask(task);
// futureTask.run();
// Thread thread = new Thread(futureTask);
// Thread thread1 = new Thread(futureTask);
// thread.start();
// thread1.start();
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//调用callable方法,执行真正的任务逻辑
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//执行异常处理,将任务状态修改为EXCEPTIONAL
setException(ex);
}
if (ran)
//任务执行体运行成功,保存任务结果
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
//handlePossibleCancellationInterrupt需要结合cancel方法分析
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
//在outcome还没有保存返回结果前,先将任务状态设置为COMPLETING(正在完成)
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//保存任务运行结果
outcome = v;
//将任务状态设置为正常完成
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//结束任务:唤醒所有因调用get方法而阻塞的线程,并清空等待队列
finishCompletion();
}
}
protected void setException(Throwable t) {
//在outcome还没有保存返回结果前,先将任务状态设置为COMPLETING(正在完成)
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//将任务的结果设置为异常信息
outcome = t;
//将任务状态设置为EXCEPTIONAL(异常中断)
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
//结束任务:唤醒所有因调用get方法而阻塞的线程,并清空等待队列
finishCompletion();
}
}
/**
*唤醒所有因调用get方法而阻塞的线程,并清空等待队列
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
//先将成员变量waiters设置为null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
//从头开始遍历等待队列,唤醒其每个节点对应的线程
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
//获取下一个节点
WaitNode next = q.next;
if (next == null)
break;
//当前节点next指向null,当前节点从等待队列中断开,之后被GC回收
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
从run方法中,FutureTask的生命周期和线程的生命周期有一定的关联:
当FutureTask的state为NEW时,执行任务的线程可能处于New状态、Runable状态(线程在操作系统中被创建,处于等待CPU时间或运行中)、Blocked状态(线程在等待锁)。
当程序调用call方法后,在将call的执行结果保存到FutureTask的成员变量outcome前,会将FutureTask设置为COMPLETING。此时FutureTask的COMPLETING 对应线程的Runable状态。
如果程序调用call发生异常,FutureTask最终被设置为EXCEPTIONAL,正常执行则被设置为NORMAL,此时线程即将进入Terminated状态。
get方法
/**
*无限时长的等待获取执行结果
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
//COMPLETING代表任务正在保存执行结果。<=这个状态,说明任务执行还没有保存执行结果
//则会调用awaitDone方法等待执行结果。
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
/**
*有限时长的等待获取执行结果
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
/**
*等待任务完成,在被中断或超时时终止
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
//当前节点是否在等待队列中
boolean queued = false;
for (;;) {
//检查当前获取结果的线程是否被中断,如果被中断,从等待队列中移除,并抛出中断异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
//1、创建一个新的节点
q = new WaitNode();
else if (!queued)
//如果新的节点没有在排队,将这个节点加入到队列的头部
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
//判断是否超时
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
//阻塞当前线程
LockSupport.parkNanos(this, nanos);
}
else
阻塞当前线程
LockSupport.park(this);
}
}
/**
*遍历等待队列移除节点
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
//首先将节点thread设置为null,防止节点被意外地再次使用或唤醒
//同时thread =null的节点是作为需要被移除节点的标记
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
//声明pre q s 三个WaitNode变量
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
//如果节点的thread !=null说明当前节点不需要被移除,遍历下一个
if (q.thread != null)
pred = q;
else if (pred != null) {
//上一个节点pred != null,说明当前节点不是队列的第一个节点
//则将pred.next指向当前节点的下一个节点s,即跳过了当前节点
pred.next = s;
if (pred.thread == null) // check for race
//队列在遍历过程中发生了变化,从头开始遍历
continue retry;
}
//如果当前节点是头节点,将头节点设置为当前节点的下一个节点
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
//队列在遍历过程中发生了变化,从头开始遍历
continue retry;
}
//完成对等待队列的遍历,成功移除了节点(无论是通过更新队列头部还是通过跳过内部节点)
//退出
break;
}
}
}
cancel方法
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
cancel返回值是boolean类型,任务取消成功,返回true,任务取消失败返回false;参数mayInterruptIfRunning表示是否尝试取消正在执行中的任务。
1、如果任务状态不是NEW状态,直接返回false,通过对run方法的分析,可以知道当FutureTask状态不为NEW时,任务已经被取消或者已经执行了call方法,无法取消任务。
2、任务状态是NEW
2.1 参数mayInterruptIfRunning为false,设置任务状态为CANCELLED,从run方法中可以得到,正在执行的任务不会被取消,还未开始的任务会被取消。
2.2 参数mayInterruptIfRunning为true,尝试调用正在执行任务的线程的interrupt()方法(在一个线程内部存在着名为interrupt flag的标识,如果一个线程被interrupt,那么它的flag将被设置,但是如果当前线程正在执行可中断方法被阻塞时,如Object的wait方法,Thread的sleep、join方法等,调用interrupt方法将其中断,反而会导致flag被清除)
再回过头看看run方法最后的handlePossibleCancellationInterrupt
public void run() {
......
finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
//handlePossibleCancellationInterrupt需要结合cancel方法分析
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
这个方法到底有什么作用呢,作者通过源代码注释告诉我们这里的自旋目的是如果其他线程调用了cancel(true)方法,确保中断只能传递给当前执行任务的线程runner,并且state在runner线程执行期间最终能够被设置为INTERRUPTED。当线程调用cancel(true)方法方法时,先将任务状态设置为INTERRUPTING,再执行运行任务线程的中断方法,最后将任务状态设置为INTERRUPTED。执行任务的线程检测到有其他线程正在中断任务时,会等待完成中断操作后再退出。
通过对源码的阅读,我们大致了解到了:任务是如何执行并且保存执行结果,完成任务后,如何唤醒等待获取执行结果的线程。在获取执行结果时,如果任务还未完成,如何进入等待队列,如果等待超时或者被中断,如何从等待队列中移除。