文章目录
- 1、CompletionStage
- 2、创建CompletableFuture对象
- 3、CompletbaleFuture
- 4、函数式接口
- 5、chain链式调用
- 6、实例:电商网站比价
针对前面提到的Future接口的实现类FutureTask的缺点,考虑传入一个回调函数,当任务完成时,自动去调用,since Java8,有了Future接口的新实现CompleteableFuture,CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。
1、CompletionStage
CompletionStage是CompletableFuture实现的接口,代表异步计算过程中的某一个阶段,一个阶段完成后,可能触发另一个阶段。类比Linux的管道符。
stage.thenApply(x -> square(x))
.thenAccept(x -> System.out.println(x))
.thenRun(() -> System.out.println())
一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。
2、创建CompletableFuture对象
CompletableFuture是Future接口新的实现类,是对实现异步任务的的再一次扩展。创建CompletableFuture对象:
CompletbaleFuture future = new CompletableFuture();
可以看到这样创建的是一个未完成的CompletableFuture对象,即这个构造方法只是语法层面提供一下,真正创建CompletableFuture对象并不用它。正确用法为调用:
runAsync
supplyAsync
无返回值的,传参为:
- Runnable对象
- Runnable对象+线程池
有返回值的:
- Supplier(供给型函数式接口)
- Supplier+线程池
调用runAsync和supplyAsync这两个静态方法时,关于Executor参数:
- 没有指定Executor,使用默认的
ForkJoinPool.commonPool()
做为线程池来执行异步任务 - 指定了Executor,则使用指定的线程池
前面提到的Future(FutureTask)+ 线程池来提高执行效率,这里CompletableFuture也可以结合线程池。
public class CompletDemo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(completableFuture.get());
}
}
指定线程池后,线程名称发生变化:
public class CompletDemo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Executor pool = Executors.newFixedThreadPool(3);
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
},pool);
System.out.println(completableFuture.get());
}
}
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
return "9527";
});
System.out.println(completableFuture.get());
public class CompletDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Executor pool = Executors.newFixedThreadPool(3);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
return "9527";
},pool);
System.out.println(completableFuture.get());
}
}
3、CompletbaleFuture
从Java8开始引入CompletbaleFuture,它是Future功能的增强版,会较少阻塞和轮询,可以传入回调函数,当异步任务完成或者发生异常时,自动调用传入的回调方法。
上面创建CompletableFuture对象时,演示的CompletableFuture和FutureTask差不多,这里演示CompletableFuture的回调:
- whenComplete:传入一个BiConsumer,即异步任务完成后自己调用的逻辑
- exceptionally:传入一个Function,即异步任务发生异常时的逻辑
public class CompletDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "come in...");
//十以内的随机数
int result = ThreadLocalRandom.current().nextInt(10);
if(result > 5){
throw new RuntimeException("test");
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("----1s后该任务出结果:" + result);
return result;
},pool).whenComplete((v,e) -> { //v是上一步的计算结果,e为上一步发生了的异常
if (null == e){
System.out.println("计算完成,update:" + v);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());
return null;
});
} catch (Exception e) {
e.printStackTrace();
} finally {
pool.shutdown();
}
System.out.println(Thread.currentThread().getName() + "线程接着去忙其他任务了");
//使用默认线程池时,主线程结束会导致线程池立刻关闭,导致分支线程task没来得及输出
//可使用自定义线程池或者sleep,不要让主线程立刻结束
}
}
注意:
- supplyAsync或者runAsync方法不传pool时,则默认使用fork-join-pool线程池,此时主线程结束会导致线程池立刻关闭,可能会导致分支线程task没来得及输出
- 可使用自定义线程池或者让主线程sleep一会儿,总之不要让主线程立刻结束
别程序运行,分支任务结果没输出,一脸懵不知道啥原因。
生产随机数:
//十以内的随机数
int result = ThreadLocalRandom.current().nextInt(10);
get和join的区别:
二者作用基本一样,join在编译阶段不会报异常,不用throw或者try处理
4、函数式接口
以下是后面要用到的几个函数式接口(since Java8):
消费型函数式接口:
两个入参的消费型函数式接口:
生产型函数式接口:
总结:
5、chain链式调用
链式编程一直在用,这里记下@Accessors(之前有个@Builder)
@AllArgsConstructor
@NoArgsConstructor
@Data
@Accessors(chain = true)
public class Student {
private String uid;
private String name;
private String address;
}
加@Accessors注解后,再给对象赋值,就可以直接链式调用。
new Student().setUid("001").setName("code9527").setAddress("Tianjin");
6、实例:电商网站比价
需求分析:
对比同一款商品在各大电商平台的售价,返回一个List,元素格式:
- 《MySQL》 in JD price is 88.05
- 《Mysql》 in TianMao price is 90.43
- ...
定义个实体类:
@AllArgsConstructor
public class NetMall {
@Getter
private String netMallName; //电商平台名称
/**
* 模拟计算某平台某商品的价格
* @param productName 商品名称
* @return 商品价格
*/
public double calcPrice(String productName){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//自定义一个价格计算方式,模拟查到返回一个价格
return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
}
}
实现这个功能,可step by step,即一个一个电商平台去查:
public class CompletablePlatPrice {
static List<String> platList = Arrays.asList(
"JD",
"TaoBao",
"TianMao",
"PDD"
);
public static List<String> getPrice(String productName){
return platList.stream()
.map(t -> String.format(productName + " in %s price is %.2f", t, new NetMall(t).calcPrice(productName)))
.collect(Collectors.toList());
}
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
List<String> priceList = getPrice("mysql基础");
for (String price : priceList) {
System.out.println(price);
}
long endTime = System.currentTimeMillis();
System.out.println("--耗时--" + (endTime-startTime) + "ms");
}
}
String.format方法修改字符串格式,p1为pattern,p2为占位符对应的变量 %.2f即小数点后保留两位。这个功能,使用异步task来实现,即多个CompletableFuture查多个平台:
public static List<String> getPriceByCompletableFuture(String productName){
return platList.stream()
.map(t -> CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f", t, new NetMall(t).calcPrice(productName))))
.collect(Collectors.toList())
.stream()
.map(t -> t.join())
.collect(Collectors.toList());
}
此后,电商平台数量再增加,新实现的耗时也基本不变,这就是从功能到性能:
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
List<String> priceList = getPrice("mysql基础");
for (String price : priceList) {
System.out.println(price);
}
long endTime = System.currentTimeMillis();
System.out.println("--耗时--" + (endTime-startTime) + "ms");
long startTime2 = System.currentTimeMillis();
List<String> priceList2 = getPriceByCompletableFuture("mysql基础");
for (String price : priceList2) {
System.out.println(price);
}
long endTime2 = System.currentTimeMillis();
System.out.println("--耗时--" + (endTime2-startTime2) + "ms");
}
使用IDEA看下上面的过程,实际是把一个List<String>里的String转成了一个个CompletableFuture对象,得到List<CompletableFuture<Result>>,再二次stream处理,join或者get拿到异步计算的值。