Netty—Future&Promise
在异步处理时,经常用到这两个接口
首先要说明 netty 中的 Future 与 jdk 中的 Future 同名,但是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展。
- jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果;
- netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束;
- netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。
功能/名称 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | - | - |
isCanceled | 任务是否取消 | - | - |
isDone | 任务是否完成,不能区分成功失败 | - | - |
get | 获取任务结果,阻塞等待 | - | - |
getNow | - | 获取任务结果,非阻塞,还未产生结果时返回 null | - |
await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 | - |
sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
isSuccess | - | 判断任务是否成功 | - |
isCancellable | - | 是否可以取消执行 | |
cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
addLinstener | - | 添加回调,异步接收结果 | - |
removeListener | - | 删除回调,异步接收结果 | |
setSuccess | - | - | 设置成功结果 |
setFailure | - | - | 设置失败结果 |
一、JDK原生 Future
关于 java.util.concurrent包下的Future 接口,我想大家应该都很熟悉,用得最多的就是在使用 Java 的线程池 ThreadPoolExecutor 的时候了。在 submit 一个任务到线程池中的时候,返回的就是一个 Future 实例,通过它来获取提交的任务的执行状态和最终的执行结果,我们最常用它的 isDone()
和 get()
方法。
// 尝试取消执行此任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否在正常执行完成之前取消
boolean isCancelled();
// 任务是否完成,完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法都将返回true
boolean isDone();
// 阻塞获取执行结果
V get() throws InterruptedException, ExecutionException;
// 阻塞获取执行结果,指定超时时间
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
接下来,演示一下使用jdk原生Future获取执行结果~
@Slf4j
public class JdkFutureTest01 {
public static void main(String[] args) {
// 线程池
ExecutorService service = newFixedThreadPool(2);
// 提交任务
Future<Object> future = service.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
log.info("执行计算");
Thread.sleep(1000);
return 50;
}
});
try {
System.out.println(future.get());
service.shutdownNow();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
二、Netty包下的 Future
原生的Future功能比较有限,Netty扩展了Future并增加了以下方法:
// 判断任务是否成功
boolean isSuccess();
// 判断是否可以取消执行
boolean isCancellable();
// 获取失败的信息
Throwable cause();
// 添加回调方法,异步接收结果
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
// 添加多个回调方法
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 删除回调方法,异步接收结果
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// 删除多个回调方法
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 等待任务结束,如果任务失败,抛出异常
Future<V> sync() throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
Future<V> syncUninterruptibly();
// 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断
Future<V> await() throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
Future<V> awaitUninterruptibly();
// 等待该future在指定的时间限制内完成。
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
// 等待该future在指定的时间限制内完成。
boolean await(long timeoutMillis) throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
// 同上,区别是不可中断阻塞等待过程
boolean awaitUninterruptibly(long timeoutMillis);
// 获取任务结果,非阻塞,还未产生结果时返回 null
V getNow();
通过以上扩展的方法我们可以发现,Netty的Future增加了 sync()
和 await()
方法用于阻塞等待,还提供了 addListener()
方法用于添加回调方法,异步接收结果。
sync()
方法内部会先调用await()
方法,等待await()
方法返回后,会检查该任务是否失败,如果失败则将失败的异常抛出来。即使用await()
方法等待任务结束,如果任务失败,不会抛异常,而是需要通过 isSuccess 判断。然而sync()
方法是直接抛出异常!@Override public Promise<V> sync() throws InterruptedException { await(); rethrowIfFailed(); return this; } private void rethrowIfFailed() { Throwable cause = cause(); if (cause == null) { return; } PlatformDependent.throwException(cause); }
接下来,演示一下使用Netty包下的Future获取执行结果~
@Slf4j
public class NettyFutureTest01 {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
EventLoop eventLoop = eventLoopGroup.next();
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.info("执行计算");
Thread.sleep(1000);
return 66;
}
});
// 阻塞等待
future.sync();
log.info("收到结果{}", future.getNow());
eventLoopGroup.shutdownGracefully();
}
}
又或者使用 addListener()
方法用于添加回调方法,异步接收结果。
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
log.info("收到结果{}", future.getNow());
eventLoopGroup.shutdownGracefully();
}
});
三、Promise
Future支持阻塞等待、添加回调方法、判断执行状态等,而Promise主要是支持状态设置相关方法。当底层I/O操作通过Promise改变执行状态,我们可以通过同步等待的Future立即得到结果。
// 设置成功结果并回调
Promise<V> setSuccess(V result);
// 同上,区别是是否报错
boolean trySuccess(V result);
// 设置失败异常并回调
Promise<V> setFailure(Throwable cause);
// 同上,区别是是否报错
boolean tryFailure(Throwable cause);
// 设置为不可取消状态
boolean setUncancellable();
可见,Promise作为一个特殊的Future,只是增加了一些状态设置方法。所以它常用于传入I/O业务代码中,用于I/O结束后设置成功(或失败)状态,并回调方法。以下是DefaultPromise
的继承关系:
设置promise的状态其实就是原子地修改result字段为传入的执行结果。值得注意的是,result字段带有volatile关键字来确保多线程之间的可见性。另外,设置完毕状态后,会尝试唤醒所有在阻塞等待该promise返回结果的线程。
// result 字段的原子更新器 private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result"); // 缓存执行结果的字段 private volatile Object result; // Promise所在的线程 private final EventExecutor executor; // 一个或多个回调方法 private Object listeners; // 阻塞线程数量计数器 private short waiters; @Override public Promise<V> setSuccess(V result) { if (setSuccess0(result)) { return this; } throw new IllegalStateException("complete already: " + this); } private boolean setSuccess0(V result) { return setValue0(result == null ? SUCCESS : result); } private boolean setValue0(Object objResult) { // 原子修改result字段为 objResult if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { if (checkNotifyWaiters()) { notifyListeners(); } return true; } return false; } private synchronized boolean checkNotifyWaiters() { if (waiters > 0) { // 唤醒其他等待线程 notifyAll(); } return listeners != null; }
1、使用Promise同步获取结果
@Slf4j
public class PromiseDemo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DefaultEventLoop eventLoop = new DefaultEventLoop();
Promise<Integer> promise = new DefaultPromise<>(eventLoop);
eventLoop.execute(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("set success");
promise.setSuccess(10);
});
log.info("start...");
log.info("promise.getNow():{}" , promise.getNow());
log.info("promise.get():{}" , promise.get());
}
}
2、使用Promise异步获取结果
@Slf4j
public class PromiseDemo03 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DefaultEventLoop eventLoop = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
eventLoop.execute(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
RuntimeException exception = new RuntimeException("error....hh");
log.debug("set failure,e: {}", exception.getMessage());
promise.setFailure(exception);
});
log.info("start");
log.info("promise.getNow():{}" , promise.getNow());
log.info("promise.get():{}" , promise.get());
}
}
.3、使用Promise同步获取异常 - sync & get
Slf4j
public class PromiseDemo03 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DefaultEventLoop eventLoop = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
eventLoop.execute(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
RuntimeException exception = new RuntimeException("error....hh");
log.debug("set failure,e: {}", exception.getMessage());
promise.setFailure(exception);
});
log.info("start");
log.info("promise.getNow():{}" , promise.getNow());
log.info("promise.get():{}" , promise.get());
}
}
4、使用Promise同步获取异常 - await
@Slf4j
public class PromiseDemo04 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
DefaultEventLoop eventLoop = new DefaultEventLoop();
Promise<Integer> promise = new DefaultPromise<>(eventLoop);
eventLoop.execute(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
RuntimeException exception = new RuntimeException("error....hh");
log.info("set failure,e: {}", exception.getMessage());
promise.setFailure(exception);
});
log.info("start");
log.info("promise.getNow():{}" , promise.getNow());
promise.await();
if (promise.isSuccess()) {
log.info("{}", promise.getNow());
} else {
log.error("{}", promise.cause().toString());
}
}
}
5、使用Promise异步获取异常
@Slf4j
public class PromiseDemo05 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
DefaultEventLoop eventLoop = new DefaultEventLoop();
Promise<Integer> promise = new DefaultPromise<>(eventLoop);
promise.addListener(future -> {
if (promise.isSuccess()) {
log.info("{}", promise.getNow());
} else {
log.error("{}", promise.cause().toString());
}
});
eventLoop.execute(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
RuntimeException exception = new RuntimeException("error....hh");
log.info("set failure,e: {}", exception.getMessage());
promise.setFailure(exception);
});
log.info("start");
log.info("promise.getNow():{}" , promise.getNow());
}
}