Java Stream
1. 问题引入
学习了尚硅谷的JUC,周阳老师讲的商城比较价格的案例,自己模拟了一个多线程的demo, 多线程处理任务并且汇总结果,出现了疑问,实例代码放在下面,读者有兴趣的话可ctrl+cv玩一玩
如下是自定义的任务类
public class Task implements Callable<String> {
private String taskName;
private Date taskTime;
// 构造函数,toString那些略去
//这里是因为前面还弄了线程池处理任务并且汇总结果的,所以实现了Callable接口,
//这个可以略过不看,因为疑问跟这个无关
@Override
public String call() throws Exception {
try {
System.out.println("正在处理任务" + this.getTaskName());
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
return this.getTaskName();
}
}
}
Main方法
package com.manytask02;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
// 主方法
public class Main {
public static void main(String[] args) {
// 1.顺序处理任务
//testOrder();
//2.CompletableFuture
try {
testCompletable();
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 顺序处理任务方法
public static void testOrder() {
List<Task> tasks = new ArrayList<>();
// 创建一系列任务 ( 5个任务)
for ( int i = 0; i < 5; ++i ) {
tasks.add(new Task("任务" + i, new Date()));
}
// 一 顺序处理
System.out.println("顺序处理任务开始了~~~~~~~");
long start = System.currentTimeMillis();
List<String> strings = takcleByOrder(tasks);
long end = System.currentTimeMillis();
System.out.println("顺序处理任务耗时 " + (end-start));
//返回处理结果
strings.forEach(System.out::println);
}
public static List<String> takcleByOrder(List<Task> tasks) {
List<String> ans = new ArrayList<>();
for (Task task : tasks) {
ans.add(takcleTask(task));
}
return ans;
}
//========================================================================
// CompletableFuture工具
public static void testCompletable() throws ExecutionException, InterruptedException {
List<Task> tasks = new ArrayList<>();
// 创建一系列任务 ( 5个任务)
for ( int i = 0; i < 5; ++i ) {
tasks.add(new Task("任务" + i, new Date()));
}
// 一 异步处理
System.out.println("异步处理任务开始了~~~~~~~");
long start = System.currentTimeMillis();
List<String> strings = takcleByAsync02(tasks);
long end = System.currentTimeMillis();
System.out.println("异步处理任务耗时 " + (end-start));
//返回处理结果
strings.forEach(System.out::println);
}
private static List<String> takcleByAsync02(List<Task> tasks) {
return tasks.stream()
.map(task -> CompletableFuture.supplyAsync(()->takcleTask(task)))
.collect(Collectors.toList())
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
// return tasks.stream()
// .map(task -> CompletableFuture.supplyAsync(()->takcleTask(task)))
// .map(CompletableFuture::join)
// .collect(Collectors.toList());
}
}
疑问出来了,如下图
上图中两个红框框中就出现了疑问,第一个红框框中划线的部分,既然前后结果类型一样,为什么不能写成下面的两个map那种形式呢??
从上面主方法(main)代码可以看出,是在比较顺序处理任务和异步处理任务所花费的时间。可是事实却是:写成下面的形式,就不是异步处理任务了,耗时还是5秒。为什么呢,难道是两个map连着一起写有问题吗?
猜想:两个map连着一起写的话,第一个map里面执行了,会紧接着执行另一个map里面的操作,并不是等一个map将流中的所有元素映射完了才执行第二个map里面的映射,由于CompletableFuture的join方法会阻塞,每当第一个map处理任务时,紧接着就执行join了,造成了阻塞,这就相当于是在顺序执行任务了。
public class Mapmap {
public static void main(String[] args) {
List<Integer> list = Arrays.asList(1, 3, 5, 7, 9);
List<Integer> integers = list.stream()
.map(i -> {
i *= 2;
System.out.println(i);
return i;
})
.map(j -> {
j *= 2;
System.out.println(j);
return j;
})
.collect(Collectors.toList());
}
}
测试一下,如果是像猜想里面的那样的话,输出内容就肯定是2,4,6,12,10,20,14,28,18,36; 如果不是的话那么就是2,6,10,14,18,4,12,20,28,36
印证了猜想。具体什么原因我也看不懂源码