JUC-线程Callable使用与FutureTask源码阅读
Callable简单使用
带返回值的线程(实现implements Callable<返回值类型>),使用示例
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
/**
- 实现Callable<T> 接口
- 含返回值且可抛出异常的线程创建启动方式
- @author fatah
*/
public class Demo5 implements Callable<String>{
public String call() throws Exception {
System.out.println("正在执行新建线程任务");
Thread.sleep(2000);
return "新建线程睡了2s后返回执行结果";
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
Demo5 d = new Demo5();
/* call()只是线程任务,对线程任务进行封装
class FutureTask<V> implements RunnableFuture<V>
interface RunnableFuture<V> extends Runnable, Future<V>
*/
FutureTask<String> task = new FutureTask<>(d);
Thread t = new Thread(task);
t.start();
System.out.println("提前完成任务...");
//获取任务执行后返回的结果
String result = task.get();
System.out.println("线程执行结果为"+result);
}
}
FutureTask面向对象方式学习
为了定义这样一个事物“有返回结果”,暂且称之为RunnableFuture。它集合了Runnable和Future两种事物
(其中Future接口 表示了一个任务的生命周期,是一个可取消的异步运算,可以把它看作是一个异步操作的结果的占位符,它将在未来的某个时刻完成,并提供对其结果的访问。在并发包中许多异步任务类都继承自Future)
其中
-
Future接口主要使用get()方法获取线程任务返回值
-
Runnable接口的run()方法是执行线程任务主体
public interface Runnable {
public abstract void run();
}
public interface Future<V> {
//取消任务
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
//任务是否完成
boolean isDone();
// 获取任务完成情况
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
结合两个接口创造出RunnableFuture接口
FutureTask构造方法
当使用new FutureTask()构造对象时,需要Callable接口类。如果是Runnable接口,使用适配器转换为Callable类型对象。*
- FutureTask(Callable callable)
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
- FutureTask(Runnable runnable, V result)
此处使用了适配器模式,通过工具方法Executors.callable方法把Runnable转化为Callable调用
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
Executors.java中
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
/**
* 运行给定任务并返回给定结果的可调用对象
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
其中callable是FutureTask内部私有变量
private Callable<V> callable;
Run方法-RunnableFuture接口实现的
Callable c = callable;
result = c.call();
1.此处调用函数式接口Callable业务**call()**方法
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); (1.重点!!!)
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);(2.重点!!!)
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
运行任务,如果任务状态为NEW状态,则利用CAS修改为当前线程。
2.业务任务执行完毕调用set(result)方法设置执行结果。set(result)源码如下:
其中outcome为FutureTask私有变量
private Object outcome;
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
Get()方法-Future接口实现
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
说明:FutureTask 通过get()方法获取任务执行结果。如果任务处于未完成的状态(state <= COMPLETING
),就调用awaitDone方法(后面单独讲解)等待任务完成。任务完成后,通过report方法获取执行结果或抛出执行期间的异常。report源码如下。
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
可以看到使用get()方法会调用awaitDone等待线程完成,同时响应线程“取消”等操作。
可以看到report中 return (V)x; 即run方法中存储线程任务执行返回结果outcome