什么是ForkJoin
ForkJoin在JDK1.7,并行执行任务!提高效率,大数据量!
大数据:Map Reduce(把大任务拆分为小任务)
ForkJoin特点:工作窃取
这个里面维护的都是双端队列
当第一行的任务执行完后,会去第二行窃取未执行的任务
缺点:可能会竞争同一个任务
ForkJoin操作
ForkJoinPool的方法
例:
import java.util.concurrent.RecursiveTask;
/**
* 计算求和任务
* 如何使用ForkJoin
* 1.通过ForkJoinPool通过它来执行
* 2.计算任务:ForkJoinPool.execute(ForkJoinTask<?> task)
* 3. 计算类要继承RecursiveTask
*/
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;
private Long end;
//临界值
private Long temp = 10000L;
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
//计算方法
@Override
protected Long compute() {
if ((end-start)<temp){
Long sum = 0L;
for (Long i = start; i <= end; i++) {
sum += i;
}
return sum;
}else { //分支合并计算ForkJoin 递归
//中间值
long midden = (start + end) / 2;
ForkJoinDemo task1 = new ForkJoinDemo(start, midden);
//拆分任务,把任务压入线程队列
task1.fork();
ForkJoinDemo task2 = new ForkJoinDemo(midden + 1, end);
//拆分任务,把任务压入线程队列
task2.fork();
return task1.join() + task2.join();
}
}
}
//基础递归方法
//public class ForkJoinDemo {
// public static void main(String[] args) {
// long sum = 0;
// for (long i = 1; i <= 10_0000_0000; i++) {
// sum += i;
// }
// System.out.println(sum);
// }
//}
测试:
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// test1();
// test2();
// test3();
}
public static void test1() {
Long sum = 0L;
long start = System.currentTimeMillis();
for (Long i = 1L; i <= 10_0000_0000; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("总和:" + sum + " 时间:" + (end - start));
}
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> forkJoinTask = new ForkJoinDemo(0L, 10_0000_0000L);
//提交任务
ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinTask);
Long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println("总和:" + sum + " 时间:" + (end - start));
}
public static void test3(){
long start = System.currentTimeMillis();
//Stream并行流 range -> () rangeClosed -> (]
long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0,Long::sum);
long end = System.currentTimeMillis();
System.out.println("总和:" + sum + " 时间:" + (end - start));
}
}
运行结果
test1:
test2:
test3: