Java 并行框架 Fork Join
- 一.Fork Join 简介
- 1.框架说明
- 2.任务说明
- 二.应用示例
- 1.RecursiveTask
- 分组示例
- 分组求和
- 2.RecursiveAction
- 3.CountedCompleter
- 三.ForkJoin 实践
- 代码
- 测试
- 1.测试用 Excel 文件
- 2.读取结果
一.Fork Join 简介
1.框架说明
ForkJoinPool 继承自 AbstractExecutorService , AbstractExecutorService 实现了 ExecutorService 接口
ForkJoin 是 Java 自带(JDK 7 引入)的并行框架;任务是真正并行执行还是仅并发交替执行,取决于机器是否为多核以及线程池配置的并行度
ForkJoinPool 与 ThreadPoolExecutor 的主要差异:ThreadPoolExecutor 的线程共享同一个任务队列,而 ForkJoinPool 为每个工作线程维护一个双端队列(deque),用于存放 fork 出的父子任务;同时引入工作窃取(work-stealing)算法:线程优先处理自己队列中的任务,当自己的队列为空时,会从其他线程队列的另一端窃取任务进行处理,从而充分利用所有工作线程
2.任务说明
ForkJoin 基于分治思想:将大任务递归拆分(fork)为小任务并行处理,再汇总(join)各子任务的结果
任务抽象类为 ForkJoinTask,JDK 预置了三个抽象子类:
RecursiveTask(有返回值)、RecursiveAction(无返回值)、CountedCompleter(基于挂起计数触发完成,JDK 8 引入)
二.应用示例
1.RecursiveTask
分组示例
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;
/**
 * Recursively halves a list and re-assembles it, demonstrating {@link RecursiveTask}.
 * Each leaf (at most {@code batch} elements) is copied into a new {@link ArrayList};
 * partial results are concatenated so the returned list keeps the original order.
 *
 * @param <T> element type
 * @author
 * @date 2023-06-08 19:29
 * @since 1.8
 */
public class ForkRecursiveTask<T> extends RecursiveTask<List<T>> {

    /** Maximum number of elements a task handles without splitting further. */
    private final int batch;

    /** Source data (possibly a {@code subList} view of the parent task's list). */
    private final List<T> list;

    /**
     * @param batch leaf size threshold; must be at least 1, otherwise a one-element
     *              list would be split into an empty and a one-element half forever
     * @param list  data to process; must not be {@code null}
     * @throws IllegalArgumentException if {@code batch < 1}
     * @throws NullPointerException     if {@code list} is {@code null}
     */
    public ForkRecursiveTask(int batch, List<T> list) {
        if (batch < 1) {
            throw new IllegalArgumentException("batch must be >= 1, got " + batch);
        }
        if (list == null) {
            throw new NullPointerException("list");
        }
        this.batch = batch;
        this.list = list;
    }

    /**
     * Splits the list in two while it is larger than {@code batch}, otherwise copies it.
     *
     * @return a new mutable list containing the elements of {@code list} in order
     */
    @Override
    public List<T> compute() {
        int size = list.size();
        if (size <= batch) {
            return handler(list);
        }
        // Split indices are locals: they describe this invocation only.
        int mid = size >>> 1;
        ForkRecursiveTask<T> left = new ForkRecursiveTask<>(batch, list.subList(0, mid));
        ForkRecursiveTask<T> right = new ForkRecursiveTask<>(batch, list.subList(mid, size));
        // Fork the right half so an idle worker can steal it; process the left half here.
        right.fork();
        List<T> result = left.compute();
        // Block until the right half is done, then append it after the left half.
        result.addAll(right.join());
        return result;
    }

    /**
     * Leaf processing: copies the chunk into an independent list.
     *
     * @param temp chunk to process
     * @return a new {@link ArrayList} with the same elements
     */
    private List<T> handler(List<T> temp) {
        return new ArrayList<>(temp);
    }
}
测试类
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
/**
 * Demo entry point: builds ten {@code Task} objects, regroups them with
 * {@link ForkRecursiveTask} (leaf size 3) on a three-thread pool and prints the result.
 *
 * @author
 * @date 2023-06-08 19:59
 * @since 1.8
 */
public class ForkJoin {
    public static void main(String[] args) {
        List<Task> tasks = new ArrayList<>(10);
        for (int id = 0; id < 10; id++) {
            tasks.add(new Task(id));
        }
        ForkJoinPool pool = new ForkJoinPool(3);
        List<Task> merged = pool.invoke(new ForkRecursiveTask<>(3, tasks));
        System.out.println(merged);
    }
}
分组求和
分组求和,打印分组过程
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;
/**
 * Sums a list of integers with {@link RecursiveTask}, splitting each level into up to
 * {@code step} chunks and printing every leaf chunk together with its partial sum.
 *
 * @author
 * @date 2023-06-09 20:59
 * @since 1.8
 */
public class ForkRecursiveTaskM extends RecursiveTask<Integer> {

    /** Maximum number of elements summed directly by a leaf task. */
    private final int batch;

    /** Number of chunks each non-leaf level tries to produce (always at least 2). */
    private final int step;

    /** Numbers to sum (possibly a {@code subList} view of the parent's list). */
    private final List<Integer> list;

    /**
     * @param batch leaf size threshold; must be at least 1
     * @param step  fan-out per level; values below 2 are raised to 2, because a single
     *              chunk would equal the whole list and the task would recurse forever
     * @param list  numbers to sum; must not be {@code null} (nor contain {@code null})
     * @throws IllegalArgumentException if {@code batch < 1}
     * @throws NullPointerException     if {@code list} is {@code null}
     */
    public ForkRecursiveTaskM(int batch, int step, List<Integer> list) {
        if (batch < 1) {
            throw new IllegalArgumentException("batch must be >= 1, got " + batch);
        }
        if (list == null) {
            throw new NullPointerException("list");
        }
        this.batch = batch;
        this.step = Math.max(2, step);
        this.list = list;
    }

    /**
     * @return the sum of all elements of {@code list}
     */
    @Override
    protected Integer compute() {
        if (list.size() > batch) {
            // Split into chunks and create one subtask per chunk.
            List<ForkRecursiveTaskM> tasks = new ArrayList<>();
            for (List<Integer> chunk : split(list, batch, step)) {
                tasks.add(new ForkRecursiveTaskM(batch, step, chunk));
            }
            // invokeAll forks the subtasks and returns once all of them have completed.
            invokeAll(tasks);
            int result = 0;
            for (ForkRecursiveTaskM t : tasks) {
                result += t.join();
            }
            return result;
        }
        int result = 0;
        for (Integer i : list) {
            result += i;
        }
        System.out.println(list + "---" + result);
        return result;
    }

    /**
     * Splits {@code list} into at most {@code step} consecutive {@code subList} views.
     * Each chunk except the last has {@code max(size / step, batch)} elements; the last
     * chunk takes the remainder, so it may be larger than the others.
     *
     * @param list  list to split
     * @param batch minimum chunk length; must be at least 1
     * @param step  desired number of chunks (capped at {@code ceil(size / batch)})
     * @param <T>   element type
     * @return the chunks, in order (one empty chunk for an empty list)
     * @throws IllegalArgumentException if {@code batch < 1}
     */
    public <T> List<List<T>> split(List<T> list, int batch, int step) {
        if (batch < 1) {
            throw new IllegalArgumentException("batch must be >= 1, got " + batch);
        }
        int size = list.size();
        // ceil(size / batch): never produce more chunks than needed
        int real = size % batch > 0 ? size / batch + 1 : size / batch;
        // At least one chunk, so that size / step below is always defined.
        step = Math.max(1, Math.min(real, step));
        int length = Math.max(size / step, batch);
        List<List<T>> temp = new ArrayList<>(step);
        for (int i = 0; i < step; i++) {
            int start = i * length;
            int end = (i + 1 == step) ? size : (i + 1) * length;
            temp.add(list.subList(start, end));
        }
        return temp;
    }
}
测试类
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
/**
 * Demo entry point: sums the integers 0..9 with {@link ForkRecursiveTaskM}
 * (leaf size 3, fan-out 3) on a three-thread pool and prints the total.
 *
 * @author
 * @date 2023-06-08 19:59
 * @since 1.8
 */
public class ForkJoin {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(3);
        List<Integer> sums = new ArrayList<>(10);
        for (int i = 0; i < 10; i++) {
            sums.add(i);
        }
        ForkRecursiveTaskM taskTaskFork = new ForkRecursiveTaskM(3, 3, sums);
        System.out.println(pool.invoke(taskTaskFork));
        // Release the worker threads once the single demo task is done.
        pool.shutdown();
    }
}
2.RecursiveAction
集合分组,并打印分组过程
import java.util.List;
import java.util.concurrent.RecursiveAction;
/**
 * Recursively halves a list and prints every leaf group (at most {@code batch} elements),
 * demonstrating {@link RecursiveAction}, a task without a result. Groups are printed by
 * whichever pool thread finishes them, so the print order is not deterministic.
 *
 * @author
 * @date 2023-06-14 20:09
 * @since 1.8
 */
public class ForkRecursiveAction extends RecursiveAction {

    /** Maximum number of elements printed by one leaf task. */
    private final int batch;

    /** Data to group (possibly a {@code subList} view of the parent task's list). */
    private final List<?> list;

    /**
     * @param batch leaf size threshold; must be at least 1, otherwise a one-element
     *              list would be split forever
     * @param list  data to group, of any element type; must not be {@code null}
     * @throws IllegalArgumentException if {@code batch < 1}
     * @throws NullPointerException     if {@code list} is {@code null}
     */
    public ForkRecursiveAction(int batch, List<?> list) {
        if (batch < 1) {
            throw new IllegalArgumentException("batch must be >= 1, got " + batch);
        }
        if (list == null) {
            throw new NullPointerException("list");
        }
        this.batch = batch;
        this.list = list;
    }

    @Override
    protected void compute() {
        int size = list.size();
        if (size <= batch) {
            System.out.println(list);
            return;
        }
        int mid = size >>> 1;
        // invokeAll schedules both halves on the pool so an idle worker can steal one;
        // calling compute() on each half directly would run them sequentially here.
        invokeAll(new ForkRecursiveAction(batch, list.subList(0, mid)),
                new ForkRecursiveAction(batch, list.subList(mid, size)));
    }
}
测试类
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
/**
 * Demo entry point: builds ten {@code Task} objects and prints them in groups of at
 * most three using {@link ForkRecursiveAction} on a three-thread pool.
 *
 * @author
 * @date 2023-06-08 19:59
 * @since 1.8
 */
public class ForkJoin {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(3);
        List<Task> tasks = new ArrayList<>(10);
        int id = 0;
        while (id < 10) {
            tasks.add(new Task(id));
            id++;
        }
        pool.invoke(new ForkRecursiveAction(3, tasks));
    }
}
3.CountedCompleter
集合拆分与合并
import java.math.BigInteger;
import java.util.List;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Recursively halves a list with {@link CountedCompleter} and copies every leaf chunk
 * into the shared output list held by {@code temp}. Leaves append in completion order,
 * so the order of the collected elements is not deterministic.
 *
 * @param <T> element type
 * @author
 * @date 2023-06-08 21:16
 * @since 1.8
 */
public class ForkCountedCompleter<T> extends CountedCompleter<List<T>> {

    /** Maximum number of elements a leaf task copies without splitting further. */
    private final int batch;

    /** Data to copy (possibly a {@code subList} view of the parent task's list). */
    private final List<T> list;

    /** Holder of the shared output list; every leaf appends into {@code temp.get()}. */
    private final AtomicReference<List<T>> temp;

    /**
     * @param parent completer notified when this task completes, or {@code null} for the root
     * @param batch  leaf size threshold; must be at least 1
     * @param list   data to copy; must not be {@code null}
     * @param temp   holder of the output list; appends lock the list itself, so a plain
     *               {@code ArrayList} is safe as long as nobody else mutates it meanwhile
     * @throws IllegalArgumentException if {@code batch < 1}
     * @throws NullPointerException     if {@code list} or {@code temp} is {@code null}
     */
    public ForkCountedCompleter(CountedCompleter<?> parent, int batch, List<T> list, AtomicReference<List<T>> temp) {
        super(parent);
        if (batch < 1) {
            throw new IllegalArgumentException("batch must be >= 1, got " + batch);
        }
        if (list == null) {
            throw new NullPointerException("list");
        }
        if (temp == null) {
            throw new NullPointerException("temp");
        }
        this.batch = batch;
        this.list = list;
        this.temp = temp;
    }

    /**
     * Splits into two child tasks while the chunk is larger than {@code batch}; otherwise
     * appends the chunk to the shared output list.
     */
    @Override
    public void compute() {
        int size = list.size();
        if (size > batch) {
            int mid = size >>> 1;
            // Register both children BEFORE forking them: a child finishing before the
            // count was raised would otherwise complete this task (and the root) too early.
            addToPendingCount(2);
            new ForkCountedCompleter<>(this, batch, list.subList(0, mid), temp).fork();
            new ForkCountedCompleter<>(this, batch, list.subList(mid, size), temp).fork();
        } else {
            List<T> sink = temp.get();
            // AtomicReference guards only the reference, not the list contents; leaves
            // run concurrently, so their appends must be serialized.
            synchronized (sink) {
                sink.addAll(list);
            }
        }
        // Decrements the pending count if non-zero, otherwise completes this task and
        // continues with the parent.
        propagateCompletion();
    }

    /**
     * @return the shared output list, so {@code ForkJoinPool.invoke(task)} yields the
     *         collected elements instead of {@code null}
     */
    @Override
    public List<T> getRawResult() {
        return temp.get();
    }
}
测试类
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Demo entry point: collects ten {@code Task} objects through
 * {@link ForkCountedCompleter} (leaf size 3) on a three-thread pool and prints the
 * result holder.
 *
 * @author
 * @date 2023-06-08 19:59
 * @since 1.8
 */
public class ForkJoin {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(3);
        List<Task> source = new ArrayList<>(10);
        for (int n = 0; n < 10; n++) {
            source.add(new Task(n));
        }
        AtomicReference<List<Task>> collected = new AtomicReference<>(new ArrayList<>());
        ForkCountedCompleter<Task> root = new ForkCountedCompleter<>(null, 3, source, collected);
        pool.invoke(root);
        System.out.println(collected);
    }
}
三.ForkJoin 实践
代码
利用 ForkJoin 并行读取 Excel 数据(用于数据导入场景)
Excel 解析依赖包(xlsx-streamer 会传递引入 Apache POI;由于下面排除了 slf4j-api,示例代码中用到的 SLF4J 以及 Lombok 需另行引入)
<dependency>
<groupId>com.monitorjbl</groupId>
<artifactId>xlsx-streamer</artifactId>
<version>2.2.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
Excel ForkJoin 处理类
import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;
/**
 * Reads rows {@code start..end} (both inclusive) of a sheet into {@link User} objects,
 * halving the row range until each leaf task covers at most {@code batch} rows.
 * The returned list keeps the sheet's row order.
 *
 * NOTE(review): leaf tasks read the same {@link Sheet} from several pool threads;
 * Apache POI is not documented as thread-safe, so this relies on read-only access
 * being safe for the workbook implementation in use — confirm.
 *
 * @author
 * @date 2023-06-22 13:11
 * @since 1.8
 */
public class ExcelFork extends RecursiveTask<List<User>> {

    private static final Logger logger = LoggerFactory.getLogger(ExcelFork.class);

    /** First row index to read (inclusive). */
    private final int start;
    /** Last row index to read (inclusive). */
    private final int end;
    /** Maximum number of rows read by one leaf task. */
    private final int batch;
    private final Sheet sheet;

    /**
     * Creates a task for rows {@code start..end}, both inclusive.
     *
     * @param start first row index to read (e.g. 1 to skip a header in row 0)
     * @param end   last row index to read (e.g. {@code sheet.getLastRowNum()})
     * @param batch maximum number of rows per leaf task; must be at least 1
     * @param sheet sheet to read
     * @throws IllegalArgumentException if {@code batch < 1}
     */
    public ExcelFork(int start, int end, int batch, Sheet sheet) {
        if (batch < 1) {
            throw new IllegalArgumentException("batch must be >= 1, got " + batch);
        }
        this.start = start;
        this.end = end;
        this.batch = batch;
        this.sheet = sheet;
    }

    /**
     * @return users read from rows {@code start..end}, in row order
     */
    @Override
    protected List<User> compute() {
        // Empty range, e.g. a sheet that only contains the header row
        if (start > end) {
            return new ArrayList<>(0);
        }
        // The range is inclusive, so it holds end - start + 1 rows.
        if (end - start + 1 <= batch) {
            return minimumTask(sheet, start, end);
        }
        /*
         * Binary split into [start, mid] and [mid + 1, end]. Here end > start, so both
         * halves are non-empty and strictly smaller, and the recursion terminates.
         * Example with rows 1..5 and batch 2: [1,3] and [4,5]; [1,3] splits again into
         * [1,2] and [3,3], giving three leaf tasks.
         */
        int mid = start + (end - start) / 2;
        ExcelFork leftTask = new ExcelFork(start, mid, batch, sheet);
        ExcelFork rightTask = new ExcelFork(mid + 1, end, batch, sheet);
        // Fork the upper half so another worker can steal it; read the lower half here.
        rightTask.fork();
        List<User> leftList = leftTask.compute();
        List<User> rightList = rightTask.join();
        // Lower rows first, then upper rows: preserves the sheet's row order.
        leftList.addAll(rightList);
        return leftList;
    }

    /**
     * Leaf task: reads rows {@code start..end} (inclusive). Column 0 is read as a numeric
     * code and column 1 as the name. Undefined rows are skipped; rows that fail to parse
     * are logged and skipped (best effort, so one bad row does not abort the import).
     *
     * @param sheet sheet to read
     * @param start first row index (inclusive)
     * @param end   last row index (inclusive)
     * @return users read from the range, in row order
     */
    private List<User> minimumTask(Sheet sheet, int start, int end) {
        List<User> users = new ArrayList<>(end - start + 1);
        for (int i = start; i <= end; i++) {
            Row row = sheet.getRow(i);
            if (row == null) {
                // Sheet.getRow returns null for rows that are not defined (blank rows).
                continue;
            }
            try {
                // NOTE(review): getNumericCellValue returns a double, so a code 1001 is
                // rendered as "1001.0" — confirm this is the intended format.
                String code = String.valueOf(row.getCell(0).getNumericCellValue());
                String name = row.getCell(1).getStringCellValue();
                users.add(new User(code, name, 0));
            } catch (Exception e) {
                logger.warn("Skipping unreadable row {}", i, e);
            }
        }
        return users;
    }
}
人员类
/**
 * A person read from the Excel sheet: a code, a name and an age.
 *
 * @author
 * @date 2023-06-22 13:01
 * @since 1.8
 */
public class User {
    private String code;
    private String name;
    private int age;

    /**
     * Creates a user without a code; {@code code} stays {@code null}.
     *
     * @param age  age of the user
     * @param name name of the user
     */
    public User(int age, String name) {
        this(null, name, age);
    }

    /**
     * Creates a fully populated user.
     *
     * @param code identifier of the user
     * @param name name of the user
     * @param age  age of the user
     */
    public User(String code, String name, int age) {
        this.code = code;
        this.name = name;
        this.age = age;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    /**
     * Renders the user as a JSON-like string, e.g.
     * {@code {"Code":"A1","Name":"Bob","age":3}}. Values are inserted verbatim
     * (not escaped); a {@code null} code or name is rendered as {@code null}.
     */
    @Override
    public String toString() {
        return new StringBuilder(64)
                .append("{\"Code\":\"").append(code)
                .append("\",\"Name\":\"").append(name)
                .append("\",\"age\":").append(age)
                .append('}')
                .toString();
    }
}
测试类
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import java.io.*;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
/**
 * Demo entry point: reads the first sheet of an .xlsx file in parallel with
 * {@link ExcelFork}, starting at row 1 (row 0 is the header), and prints the users.
 * The file path may be given as the first program argument; otherwise the
 * hard-coded default path is used.
 *
 * @author
 * @date 2023-06-22 13:22
 * @since 1.8
 */
public class Test {
    public static void main(String[] args) throws IOException {
        File file = new File(args.length > 0 ? args[0] : "C:\\Users\\Desktop\\Test ForkJoin.xlsx");
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        // Close both the stream and the workbook even if reading fails.
        try (InputStream inputStream = new FileInputStream(file);
             Workbook workbook = new XSSFWorkbook(inputStream)) {
            Sheet sheet = workbook.getSheetAt(0);
            // Rows 1..lastRowNum, at most 2000 rows per leaf task
            ExcelFork joinTask = new ExcelFork(1, sheet.getLastRowNum(), 2000, sheet);
            List<User> result = forkJoinPool.invoke(joinTask);
            System.out.println(result);
        } finally {
            forkJoinPool.shutdown();
        }
    }
}