1. parallelStream是什么:
java 8
引入了并行流的概念来进行并行处理,而并行流(Parallel Stream)利用所有可用CPU内核的优势,并行处理任务。其原理(Parallel Stream)是可以把大任务分成多个小任务执行, 最后再把执行结果进行合并, ForkJoinPool用工作窃取算法实现。
2.Java8的paralleStream是线程安全吗
一个简单例子,循环1000000次,往list中插入数据,最后看list的长度。
public class TestParallel {
public static void main(String[] args) {
StopWatch stopWatch = new StopWatch();
List<Integer> list = new ArrayList<>();
stopWatch.start("TaskOneName");
IntStream.range(0, 1000000).forEach(list::add);
stopWatch.stop();
System.out.println("list size:" + list.size());
System.out.println("当前任务名称:" + stopWatch.currentTaskName() +" 执行时间:" + stopWatch.getTotalTimeMillis());
}
}
执行时间以及list 中的size见下图
使用并发流
public class TestParallel {
public static void main(String[] args) {
StopWatch stopWatch = new StopWatch();
List<Integer> list = new ArrayList<>();
stopWatch.start("TaskOneName");
IntStream.range(0, 10000).parallel().forEach(list::add);
stopWatch.stop();
System.out.println("list size:" + list.size());
System.out.println("当前任务名称:" + stopWatch.currentTaskName() +" 执行时间:" + stopWatch.getTotalTimeMillis());
}
}
执行情况一:
执行情况二:
从上图可看出,list的add操作并非我们想要的结果。
3.如果非要用并行流怎么办
加锁吧。
public class TestParallel {
public static void main(String[] args) {
StopWatch stopWatch = new StopWatch();
List<Integer> list = new ArrayList<>();
stopWatch.start("TaskOneName");
Lock lock = new ReentrantLock();
IntStream.range(0, 10000).parallel().forEach(adata->{
lock.lock();
try {
list.add(adata);
}finally {
lock.unlock();
}
});
stopWatch.stop();
System.out.println("list size:" + list.size());
System.out.println("当前任务名称:" + stopWatch.currentTaskName() +" 执行时间:" + stopWatch.getTotalTimeMillis());
}
}
执行结果:
4.使用线程安全的ArrayList:
使用线程安全的ArrayList: CopyOnWriteArrayList