目录
- 1.什么是 Future 模式?Java 中是如何实现的?
- 2.Callable、Future 与 FutureTask 分别是什么?
- 2.1.Callable 接口
- 2.2.Future 接口
- 2.3.FutureTask 类
- 3.CompletableFuture 类有什么用?
1.什么是 Future 模式?Java 中是如何实现的?
(1)Future 模式是一种并发编程模式,它允许异步执行代码并在未来获取其结果。在 Future 模式中,调用线程可以提交一个任务给另一个线程或线程池,并立即返回一个 Future 对象作为任务的代理。Future 对象表示了尚未完成的任务,并允许调用线程在未来的某个时刻获取任务的结果。Future 模式通常用于处理长时间运行的任务,例如网络请求或耗时的计算。通过使用 Future 模式,调用线程可以避免阻塞并继续执行其他任务,同时仍然能够获得任务的结果。
(2)在 Java 中,Future 模式是通过 Future 接口
来实现的。Java 还提供了 CompletableFuture 类
,它是 Future 接口的实现,并提供了更丰富的功能,例如异步回调和异常处理。Java 设计到的相关接口和类如下图所示:
2.Callable、Future 与 FutureTask 分别是什么?
通常来说,我们使用 Runnable 和 Thread 来创建一个新的线程。但是它们有一个弊端,就是 run() 是没有返回值的。而有时候我们希望开启一个线程去执行一个任务,并且这个任务执行完成后有一个返回值。JDK 提供了 Callable 接口与 Future 类为我们解决这个问题,这也是所谓的“异步”模型。
2.1.Callable 接口
(1)Callable 与 Runnable 类似,同样是只有⼀个抽象方法的函数式接看。不同的是, Callable 提供的方法是有返回值的,而且支持泛型。Callable 接口的特点如下:
- 为了实现 Runnable,需要实现不返回任何内容的 run() 方法,而对于Callable,需要实现在完成时返回结果的 call() 方法;
- call() 方法可以引发异常,而 run() 则不能;
- 为实现 Callable 而必须重写 call() 方法;
- 不能直接替换 runnable,因为 Thread 类的构造方法根本没有 Callable;
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
class MyThread1 implements Runnable {
@Override
public void run() {
//无返回值
}
}
class MyThread2 implements Callable {
@Override
public Object call() throws Exception {
return 1;
}
}
(2)那⼀般是怎么使用 Callable 的呢? Callable⼀般配合线程池工具 ExecutorService 来使用。这里只介绍 ExecutorService 可以使用 submit 方法来让⼀个 Callable 接口执行。它会返回⼀个 Future ,我们后续的程序可以通过这个 Future 的 get 方法得到结果。这里可以看⼀个简单的使用案例:
import java.util.concurrent.*;
class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
// 模拟计算需要⼀秒
Thread.sleep(1000);
return 2;
}
public static void main(String args[]) throws ExecutionException, InterruptedException {
// 使⽤
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
// ExecutorService.submit() 方法返回的其实就是 Future 的实现类 FutureTask
Future<Integer> result = executor.submit(task);
//注意调⽤ get ⽅法会阻塞当前线程,直到得到结果,所以实际编码中建议使⽤可以设置超时时间的重载 get ⽅法
System.out.println(result.get());
}
}
输出结果:
2
2.2.Future 接口
(1)在 Java 中,Future 类是一个泛型接口,位于 java.util.concurrent 包下,其包含的方法如下:
package java.util.concurrent;
// V 表示任务返回值的类型
public interface Future<V> {
//成功取消任务返回 true,否则返回 false
boolean cancel(boolean mayInterruptIfRunning);
//判断任务是否被取消
boolean isCancelled();
//判断任务是否已经执行完成
boolean isDone();
//获取任务执行结果
V get() throws InterruptedException, ExecutionException;
//指定时间内没有返回计算结果就抛出 TimeOutException 异常
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
简单理解 Future:现在有一个任务,提交给了 Future 来处理。任务执行期间我自己可以去做任何想做的事情。并且,在这期间我还可以取消任务以及获取任务的执行状态。一段时间之后,我就可以 Future 那里直接取出任务执行结果。
(2)cancel 方法是试图取消⼀个线程的执行。 注意是试图取消,并不⼀定能取消成功。因为任务可能已完成、已取消、或者⼀些其它因素不能取消,存在取消失败的可能。boolean 类型的返回值是“是否取消成功”的意思。参数 paramBoolean 表示是否采用中断的方式取消线程执行。 所以有时候为了让任务有能够取消的功能,就使用 Callable 来代替 Runnable 。 如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。
2.3.FutureTask 类
(1)上面介绍了 Future 接口。这个接口有⼀个实现类叫 FutureTask 。 FutureTask 是实现的 RunnableFuture 接口的,而 RunnableFuture 接口同时继承了 Runnable 接口和 Future 接口:
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
(2)那 FutureTask 类有什么用?前面说到了 Future 只是⼀个接口,而它里面的 cancel、get、isDone 等方法要自己实现起来都是非常复杂的。所以 JDK 提供了⼀个 FutureTask 类来供我们使用。FutureTask 有两个构造函数,可传入 Callable 或者 Runnable 对象。实际上,传入 Runnable 对象也会在方法内部转换为 Callable 对象:
public class FutureTask<V> implements RunnableFuture<V> {
//...
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
// 通过适配器 RunnableAdapter 来将 Runnable 对象 runnable 转换成 Callable 对象
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
}
FutureTask 相当于对 Callable 进行了封装,管理着任务执行的情况,存储了 Callable 的 call 方法的任务执行结果。
(3)示例代码如下:
import java.util.concurrent.*;
//自定义 Callable,与上面⼀样
class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
//模拟计算需要⼀秒
Thread.sleep(1000);
return 2;
}
public static void main(String args[]) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
FutureTask<Integer> futureTask = new FutureTask<>(new Task());
executor.submit(futureTask);
System.out.println(futureTask.get());
}
}
使用上与第⼀个 Demo 有⼀点小的区别:
- 此处调用 submit 方法是没有返回值的,因为这里实际上是调用的 submit(Runnable task) 方法,而上面的 Demo,调用的是 submit(Callable task) 方法。
- 这里是使用 FutureTask 的 get 方法来获取返回值,而上面的 Demo 是通过 submit 方法返回的 Future 去取值。 在很多高并发的环境下,有可能 Callable 和 FutureTask 会创建多次。FutureTask 能够在高并发环境下确保任务只执行⼀次。这块可以查看 FutureTask 源码。
(4)核心原理
- 在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给 Future 对象在后台完成;
- 当主线程将来需要时,就可以通过 Future 对象获得后台作业的计算结果或者执行状态;
- 一般 FutureTask 多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果;
- 仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get() 方法,一旦计算完成,就不能再重新开始或取消计算;
- get() 方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常;
- get() 只计算一次,因此 get() 方法放到最后。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
//比较Runnable 和 Callable 这两个接口
class MyThread1 implements Runnable {
@Override
public void run() {
//无返回值
}
}
class MyThread2 implements Callable {
@Override
public Object call() throws Exception {
return 1;
}
}
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//使用 Runnable 创建线程
new Thread(new MyThread1(), "AA").start();
/*
使用Callable创建线程
不能像上面那样直接创建 new Thread(new MyThread2(), "BB").start();
*/
//FutureTask
FutureTask<Integer> futureTask1 = new FutureTask<>(new MyThread2());
//使用 lambda 表达式进行简化
FutureTask<Integer> futureTask2 = new FutureTask<>(()->{
System.out.println(Thread.currentThread().getName() + " enters the callable .");
return 1;
});
//创建一个线程
new Thread(futureTask2, "Luck").start();
while (!futureTask2.isDone()) {
System.out.println("wait...");
}
//调用 FutureTask 的get()
System.out.println(futureTask2.get());
//只进行一次计算
System.out.println(futureTask2.get());
System.out.println(Thread.currentThread().getName() + " is over !");
}
}
(5)FutureTask 的几种状态
/**
* state 可能的状态转变路径如下:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
state 表示任务的运行状态,初始状态为 NEW。运行状态只会在 set、setException、cancel 方法中终止。COMPLETING、INTERRUPTING 是任务完成后的瞬时状态。
3.CompletableFuture 类有什么用?
(1)Future 在实际使用过程中存在一些局限性,例如不支持异步任务的编排组合、获取计算结果的 get() 方法为阻塞调用等。Java 8 才被引入 CompletableFuture 类可以解决 Future 的这些缺陷。CompletableFuture 除了提供了更为好用和强大的 Future 特性之外,还提供了函数式编程、异步任务编排组合(可以将多个异步任务串联起来,组成一个完整的链式调用)等能力。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
//...
}
(2)可以看到,CompletableFuture 同时实现了 Future 接口
和 CompletionStage 接口
。其中,CompletionStage 接口描述了一个异步计算的阶段。很多计算可以分成多个阶段或步骤,此时可以通过它将所有步骤组合起来,形成异步计算的流水线。CompletionStage 接口中的方法比较多,CompletableFuture 的函数式能力就是这个接口赋予的。从这个接口的方法参数可以发现其大量使用了 Java 8 引入的函数式编程。