异步回调
所谓异步回调,本质上就是多线程中线程的通信,如今很多业务系统中,某个业务或者功能调用多个外部接口,通常这种调用就是异步的调用。如何得到这些异步调用的结果自然也就很重要了。
Callable、Future、FutureTask
public class test implements Callable<Boolean>{
public static void main(String[] args) {
test a=new test();
FutureTask futureTask=new FutureTask<>(a);
new Thread(futureTask).start();
Object su=null;
try {
su=futureTask.get();
}catch (Exception e){
e.printStackTrace();
}
System.out.println(su);
}
@Override
public Boolean call() throws Exception {
return null;
}
}
FutureTask和Callable都是泛型类,泛型参数表示返回结果的类型。通过FutureTask获取异步线程的执行结果,但是其调用get()方法获取异步结果时,主线程也会被阻塞。属于异步阻塞模式。异步阻塞模式属于主动模式的异步调用,异步回调属于被动模式的异步调用。Java中回调模式的标准实现类为CompletableFuture。由于此类出现时间比较晚,期间Guava和Netty等都提出了自己的异步回调模式API来使用。这里主要介绍CompletableFuture,其他的有时间后面再学习。
CompletableFuture
CompletableFuture实现Future和CompletionStage两个接口。此类的实例作为一个异步任务,可以在自己异步执行完成之后触发一些其他的异步任务,从而达到异步回调的效果。主要方法如下所示:
runAsync和supplyAsync创建子任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
可以看出runAsync没有返回值,supplyAsync有返回值,此处用supplyAsync举例:
ExecutorService executor= Executors.newFixedThreadPool(10);
CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{
return "你好,周先生";
},executor);
System.out.println(completableFuture.get());//输出你好,周先生
executor.shutdown();
上例中的线程池可以自己构造,如若不指定使用CompletableFuture中默认的线程池ForkJoinPool。
handle()方法统一处理异常和结果
//在执行任务的同一个线程中处理异常和结果
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
//可能不在执行任务的同一个线程中处理异常和结果
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(asyncPool, fn);
}
//在指定线程池executor中处理异常和结果
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
return uniHandleStage(screenExecutor(executor), fn);
}
案例:
CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{
throw new RuntimeException("你好");
});
completableFuture.handle(new BiFunction<String,Throwable,String>(){
@Override
public String apply(String s, Throwable throwable) {
if(throwable==null){
System.out.println("mei");;
}else {
System.out.println("出错了");
}
return "ok";
}
});
异步任务的串行执行
主要方法为以下几种:thenApply()、thenAccept()、thenRun()和 thenCompose()。
thenApply()
此方法实现异步任务的串行化执行,前一个任务结果作为下一个任务的入参。
后一个任务与前一个任务在同一个线程中执行
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
//后一个任务与前一个任务不在同一个线程中执行
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
//后一个任务在指定的executor线程池中执行
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
其中泛型参数T:上一个任务所返回结果的类型。泛型参数U:当前任务的返回类型。
案例:
ExecutorService executor= Executors.newFixedThreadPool(10);
CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getId());//12
return "你好,周先生";
},executor).thenApplyAsync(new Function<String,String>() {
@Override
public String apply(String s) {
System.out.println(Thread.currentThread().getId());//13
return "你好,毛先生";
}
});
System.out.println(completableFuture.get());//输出你好,毛先生
executor.shutdown();
thenRun()
此方法不关心任务的处理结果。只要前一个任务执行完成,就开始执行后一个串行任务。而且没有返回值。
//后一个任务与前一个任务在同一个线程中执行
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
//后一个任务与前一个任务可以不在同一个线程中执行
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
//后一个任务在executor线程池中执行
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
thenAccept()
使用此方法时一个任务可以接收(或消费)前一个任务的处理结果,但是后一个任务没有结果输出。
//后一个任务与前一个任务在同一个线程中执行
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, other, action);
}
//后一个任务与前一个任务不在同一个线程中执行
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(asyncPool, other, action);
}
//后一个任务在指定的executor线程池中执行
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor) {
return biAcceptStage(screenExecutor(executor), other, action);
}
thenCompose()
对两个任务进行串行的调度操作,第一个任务操作完成时,将其结果作为参数传递给第二个任务。
//后一个任务与前一个任务在同一个线程中执行
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(null, fn);
}
//后一个任务与前一个任务不在同一个线程中执行
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(asyncPool, fn);
}
//后一个任务在指定的executor线程池中执行
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor) {
return uniComposeStage(screenExecutor(executor), fn);
}
thenCompose()方法第二个任务的返回值是一个CompletionStage异步实例。
ExecutorService executor= Executors.newFixedThreadPool(10);
CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getId());//12
return "你好,周先生";
},executor).thenComposeAsync(new Function<String,CompletableFuture<String>>(){
@Override
public CompletableFuture<String> apply(String s) {
return CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getId());//12
return "你好,毛先生";
});
}
});
System.out.println(completableFuture.get());//输出你好,毛先生
executor.shutdown();
异步任务的合并执行
主要实现为以下几个方法:thenCombine()、runAfterBoth()、
thenAcceptBoth()。
thenCombine()
thenCombine()会在两个CompletionStage任务都执行完成后,一块来处理两个任务的执行结果。如果要合并多个任务,可以使用allOf()。
//合并第二步任务的CompletionStage实例,返回第三步任务的CompletionStage
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
//不一定在同一个线程中执行第三步任务的CompletionStage实例
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(asyncPool, other, fn);
}
//第三步任务的CompletionStage实例在指定的executor线程池中执行
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}
其中泛型参数T:表示第一个任务所返回结果的类型。泛型参数U:表示第二个任务所返回结果的类型。泛型参数V:表示第三个任务所返回结果的类型。
案例:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getId());//12
return "你好,周先生";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getId());//12
return "你好,毛先生";
});
CompletableFuture<String> future3 = future1.thenCombine(future2, new BiFunction<String, String, String>(){
@Override
public String apply(String s, String s2) {
return s+"-----"+s2;
}
});
String s = future3.get();
System.out.println(s);//你好,周先生-----你好,毛先生
而runAfterBoth()方法不关注每一步任务的输入参数和输出参数,thenAcceptBoth()中第三个任务接收第一个和第二个任务的结果,但是不返回结果。
异步任务的选择执行
若异步任务的选择执行不是按照某种条件进行选择的,而按照执行速度进行选择的:前面两并行任务,谁的结果返回速度快,其结果将作为第三步任务的输入。对两个异步任务的选择可以通过CompletionStage接口的applyToEither()、acceptEither()等方法来实现。
applyToEither()
//和其他任务进行速度比较,最快返回的结果用于执行fn回调函数
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(null, other, fn);
}
//和其他任务进行速度比较,最快返回的结果用于执行fn回调函数,不一定在同一个线程中执行fn回调函数
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(asyncPool, other, fn);
}
//和其他任务进行速度比较,最快返回的结果用于执行fn回调函数,在指定线程池执行fn回调函数
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor) {
return orApplyStage(screenExecutor(executor), other, fn);
}
案例:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getId());//12
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "你好,周先生";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getId());//12
return "你好,毛先生";
});
CompletableFuture<String> future3 = future1.applyToEither(future2, new Function<String, String>(){
@Override
public String apply(String s) {
return s;
}
});
String s = future3.get();
System.out.println(s);//你好,毛先生