Java ForkJoin 简介和应用

news2025/1/2 4:19:22

Java 并行框架 Fork Join

  • 一.Fork Join 简介
    • 1.框架说明
    • 2.任务说明
  • 二.应用示例
    • 1.RecursiveTask
      • 分组示例
      • 分组求和
    • 2.RecursiveAction
    • 3.CountedCompleter
  • 三.ForkJoin 实践
    • 代码
    • 测试
      • 1.测试用 Excel 文件
      • 2.读取结果

一.Fork Join 简介

1.框架说明

ForkJoinPool 继承自 AbstractExecutorService , AbstractExecutorService 实现了 ExecutorService 接口
ForkJoin 是 Java 自带的一个并行框架,关于并行和并发的差异则看机器是否为多核配置
ForkJoinPool 之于 ThreadPoolExecutor 的差异即封装了一个双端工作队列,用于缓存父子任务,同时引入工
作窃取算法,如果某个队列任务全部处理完成,则该任务队列的线程从其他队列头取任务进行处理,充分利
用子线程资源

2.任务说明

ForkJoin 基于分治思想将大的工作任务,分解为小的处理
任务抽象类 ForkJoinTask , 预置了三个默认实现 
RecursiveTask、RecursiveAction、CountedCompleter

二.应用示例

1.RecursiveTask

分组示例


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;

/**
 * @author 
 * @date 2023-06-08 19:29
 * @since 1.8
 */
public class ForkRecursiveTask<T> extends RecursiveTask<List<T>> {

    /**
     * 任务拆分参数
     */
    private int left;
    private int mid;
    private int right;
    private int batch;

    /**
     * 原始数据
     */
    private List<T> list;

    public ForkRecursiveTask(int batch, List<T> list){
        this.batch = batch;
        this.list = list;
    }

    @Override
    public List<T> compute() {

        left = 0;
        right = list.size();

        //判断是否开始处理
        if (right > batch){
            //ArrayList 二分拆分
            mid = (left + right)/2;
            List<T> tempLeft = list.subList(left,mid);
            List<T> tempRight = list.subList(mid,right);
            //创建子任务
            ForkRecursiveTask<T> l = new ForkRecursiveTask(batch,tempLeft);
            ForkRecursiveTask<T> r = new ForkRecursiveTask(batch,tempRight);
            //调用子任务,调用 5,6,7,8,9 (调用右侧,执行左侧,将右侧结果拼接到左侧)
            r.fork();
            //递归执行,执行 0,1,2,3,4
            List<T> tempL = l.compute();
            //阻塞取子任务结果
            List<T> tempR = r.join();
            //聚合:将右侧合并到左侧后面
            //tempR.addAll(tempL);
            tempL.addAll(tempR);
            return tempL;
        } else {
            return handler(list);
        }
    }

    /**
     * 处理方法
     * @param temp
     * @return
     */
    private List<T> handler(List<T> temp){
        return new ArrayList<>(temp);
    }
}

测试类


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

/**
 * @author
 * @date 2023-06-08 19:59
 * @since 1.8
 */
public class ForkJoin {


    public static void main(String[] args) {

        ForkJoinPool pool = new ForkJoinPool(3);

        List<Task> list = new ArrayList<>(10);
        for (int i=0;i<10;i++){
            list.add(new Task(i));
        }

        ForkRecursiveTask<Task> taskTaskFork = new ForkRecursiveTask<>(3,list);
        System.out.println(pool.invoke(taskTaskFork));

    }
}

分组求和

分组求和,打印分组过程

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;

/**
 * @author 
 * @date 2023-06-09 20:59
 * @since 1.8
 */
public class ForkRecursiveTaskM extends RecursiveTask<Integer> {

    private int batch;
    private int step = 2;
    private List<Integer> list;

    public ForkRecursiveTaskM(int batch,int step, List<Integer> list){
        this.batch = batch;
        this.step = step;
        this.list = list;
    }

    @Override
    protected Integer compute() {
        int size = list.size();
        if (size > batch){
            //拆分任务
            int length = size/step;
            List<List<Integer>> temp = split(list,batch,step);
            List<ForkRecursiveTaskM> tasks = new ArrayList<>(temp.size());
            for (List<Integer> d :temp){
                tasks.add(new ForkRecursiveTaskM(batch,step,d));
            }

            //调用子任务
            for (ForkRecursiveTaskM t:tasks){
                t.fork();
            }
            //取结果
            int result = 0;
            for (ForkRecursiveTaskM t:tasks){
                result += t.join();
            }

            return result;
        } else {
            int result=0;
            for (Integer i:list){
                result += i;
            }
            System.out.println(list + "---" + result);
            return result;
        }
    }

    public <T> List<List<T>> split(List<T> list,int batch,int step){

        int size = list.size();
        int real = size%batch > 0 ? size/batch + 1 : size/batch;
        step = Math.min(real,step);
        int length = Math.max(size / step,batch);
        List<List<T>> temp = new ArrayList<>(step);
        for (int i = 0;i < step;i++){
            int start = i * length;
            int end = (i + 1) * length;
            if ((i + 1) == step){
                end = size;
            }
            temp.add(list.subList(start,end));
        }

        return temp;
    }
}

测试类


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

/**
 * @author
 * @date 2023-06-08 19:59
 * @since 1.8
 */
public class ForkJoin {

    public static void main(String[] args) {

        ForkJoinPool pool = new ForkJoinPool(3);

        List<Task> list = new ArrayList<>(10);
        for (int i=0;i<10;i++){
            list.add(new Task(i));
        }
        
        List<Integer> sums = new ArrayList<>(10);
        for (int i=0;i<10;i++){
            sums.add(i);
        }
        ForkRecursiveTaskM taskTaskFork = new ForkRecursiveTaskM(3,3,sums);
        System.out.println(pool.invoke(taskTaskFork));
    }
}

2.RecursiveAction

集合分组,并打印分组过程

import java.util.List;
import java.util.concurrent.RecursiveAction;

/**
 * @author
 * @date 2023-06-14 20:09
 * @since 1.8
 */
public class ForkRecursiveAction extends RecursiveAction {

    /**
     * 任务拆分参数
     */
    private int left;
    private int mid;
    private int right;
    private int batch;

    /**
     * 原始数据
     */
    private List<Task> list;

    public ForkRecursiveAction(int batch, List<Task> list){
        this.batch = batch;
        this.list = list;
    }

    @Override
    protected void compute() {

        left = 0;
        right = list.size();

        //判断是否开始处理
        if (right > batch){
            //ArrayList 二分拆分
            mid = (left + right)/2;
            List<Task> tempLeft = list.subList(left,mid);
            List<Task> tempRight = list.subList(mid,right);
            //创建子任务
            ForkRecursiveAction l = new ForkRecursiveAction(batch,tempLeft);
            ForkRecursiveAction r = new ForkRecursiveAction(batch,tempRight);
            //调用子任务
            l.compute();
            r.compute();
        } else {
            System.out.println(list);
        }
    }
}

测试类


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

/**
 * @author
 * @date 2023-06-08 19:59
 * @since 1.8
 */
public class ForkJoin {


    public static void main(String[] args) {

        ForkJoinPool pool = new ForkJoinPool(3);

        List<Task> list = new ArrayList<>(10);
        for (int i=0;i<10;i++){
            list.add(new Task(i));
        }

        ForkRecursiveAction taskTaskFork = new ForkRecursiveAction(3,list);
        pool.invoke(taskTaskFork);
    }
}

3.CountedCompleter

集合拆分与合并

import java.math.BigInteger;
import java.util.List;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.atomic.AtomicReference;

/**
 * @author
 * @date 2023-06-08 21:16
 * @since 1.8
 */
public class ForkCountedCompleter<T> extends CountedCompleter<List<T>> {


    private int left;
    private int right;
    private int mid;
    private int batch;
    private List<T> list;

    private AtomicReference<List<T>> temp;

    public ForkCountedCompleter(CountedCompleter<BigInteger> parent, int batch, List<T> list, AtomicReference<List<T>> temp) {
        super(parent);
        this.batch = batch;
        this.list = list;
        this.temp = temp;
    }

    /**
     * 执行
     */
    @Override
    public void compute () {
        left = 0;
        right = list.size() ;
        if (right > batch){
            mid = (left + right)/2;
            //ArrayList 二分拆分
            List<T> tempLeft = list.subList(left,mid);
            List<T> tempRight = list.subList(mid,right);
            //创建子任务
            ForkCountedCompleter taskLeft = new ForkCountedCompleter(this,batch,tempLeft,temp);
            ForkCountedCompleter taskRight = new ForkCountedCompleter(this,batch,tempRight,temp);
            taskLeft.fork();
            taskRight.fork();
            //计数
            addToPendingCount(2);
        } else {
            //获取子任务结果
            temp.get().addAll(list);
        }
        //判断任务是否完成
        propagateCompletion();
    }
}

测试类


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;

/**
 * @author
 * @date 2023-06-08 19:59
 * @since 1.8
 */
public class ForkJoin {


    public static void main(String[] args) {

        ForkJoinPool pool = new ForkJoinPool(3);

        List<Task> list = new ArrayList<>(10);
        for (int i=0;i<10;i++){
            list.add(new Task(i));
        }

        AtomicReference<List<Task>> temp = new AtomicReference<>(new ArrayList<>());
        ForkCountedCompleter<Task> taskTaskFork = new ForkCountedCompleter<>(null,3,list,temp);
        pool.invoke(taskTaskFork);
        System.out.println(temp);

    }
}

三.ForkJoin 实践

代码

利用 ForkJoin 实现 Excel 数据导入数据并行处理

Excel 解析依赖包

<dependency>
	<groupId>com.monitorjbl</groupId>
    <artifactId>xlsx-streamer</artifactId>
    <version>2.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Excel ForkJoin 处理类


import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;

/**
 * @author
 * @date 2023-06-22 13:11
 * @since 1.8
 */
@Slf4j
public class ExcelFork extends RecursiveTask<List<User>> {

    Logger logger = LoggerFactory.getLogger(ExcelFork.class);

    private int start;
    private int end;
    private int batch;
    private Sheet sheet;

    /**
     * 构造任务参数
     * @param start
     * @param end
     * @param sheet
     */
    public ExcelFork(int start, int end,int batch, Sheet sheet) {
        this.start = start;
        this.end = end;
        this.batch = batch;
        this.sheet = sheet;
    }

    /**
     *
     * @return
     */
    @Override
    protected List<User> compute() {

        //数据异常
        if (start > end ) {
            return new ArrayList<>(0);
        }
        //最小任务取数据
        if (end - start <= batch) {
            return minimumTask(sheet, start, end);
        } else {
            /**
             * 二分拆分,直到每块数据量满足设置的阈值
             * 如果是读取 Excel 一般从第 1 行开始 除 2 取整左侧可能较小
             * 例:0,1,2,3,4,5 0 为表头 2个一组则数据行如下:1,2;3,4,5 在分 3,4;5 最终就是 3 个子任务
             */
            int mid = (start + end) / 2;
            /**
             * 分别创建子任务
             */
            ExcelFork rightTask = new ExcelFork(start, mid,batch, sheet);
            ExcelFork leftTask = new ExcelFork(mid + 1, end,batch, sheet);
            //写法一
            rightTask.fork();
            List<User> leftList =  leftTask.compute();
            List<User> rightList = rightTask.join();
            //将左边和右边的数据合并
            leftList.addAll(rightList);
            return leftList;
        }
    }

    /**
     * 最小任务单元
     * @param sheet
     * @param start
     * @param end
     * @return
     */
    private List<User> minimumTask(Sheet sheet, int start, int end) {
        List<User> mapList = new ArrayList<>(batch);
        User user;
        Row row;
        for (int i = start; i <= end; i++) {
            try {
                row = sheet.getRow(i);
                String code = String.valueOf(row.getCell(0).getNumericCellValue());
                String name = row.getCell(1).getStringCellValue();
                user = new User(code,name,0);
                mapList.add(user);
            } catch (Exception e) {
                logger.info("Exception:", e);
            }
        }
        return mapList;
    }
}

人员类


/**
 * @author
 * @date 2023-06-22 13:01
 * @since 1.8
 */
public class User {

    private String code;
    private String name;
    private int age;

    public User(int age,String name){
        this.age = age;
        this.name = name;
    }

    public User(String code,String name,int age){
        this.code = code;
        this.name = name;
        this.age = age;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString(){
        return "{\"Code\":\"" + code + "\",\"Name\":\"" + name + "\",\"age\":" + age + "}";
    }
}

测试类


import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;

import java.io.*;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

/**
 * @author 
 * @date 2023-06-22 13:22
 * @since 1.8
 */
public class Test {

    public static void main(String[] args) throws IOException {

        File file = new File("C:\\Users\\Desktop\\Test ForkJoin.xlsx");
        InputStream inputStream = new FileInputStream(file);

        Workbook workbook = new XSSFWorkbook(inputStream);

        ForkJoinPool forkJoinPool = new ForkJoinPool(4);;
        Sheet sheet = workbook.getSheetAt(0);
        //开启任务
        ExcelFork joinTask = new ExcelFork(1, sheet.getLastRowNum(),2000, sheet);

        List<User> result = forkJoinPool.invoke(joinTask);

        System.out.println(result);
    }
}

测试

1.测试用 Excel 文件

在这里插入图片描述

2.读取结果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/674171.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Java-SpringBoot+Vue+MySql】Day4-VUE框架使用

一、VUE入门 1、环境准备 2、预备知识 3、实战演练 vue官网 Vue.js - 渐进式 JavaScript 框架 | Vue.js 基础语法&#xff0c;vue2和vue3区别不大&#xff0c;但是后面路由会有很大区别。 前期基础语法&#xff0c;我们通过链接的方式使用vue&#xff0c;后面会用npm进行安装…

Transformer-XL模型简单介绍

目录 一、前言 二、整体概要 三、细节描述 3.1 状态复用的块级别循环 3.2 相对位置编码 四、论文链接 一、前言 以自注意力机制为核心的 Transformer 模型是各种预训练语言模型中的主要组成部分。自注意力机制能够构建序列中各个元素之间的上下文关联程度&#xff0c;挖掘…

java 2023秒杀项目 day(1) 面经

java 2023杀项目 day(1) 面经 一、秒杀项目1.1 如何设计秒杀系统1.2 数据库 二、业务2.1 登录2.2.1 密码加密 2.2.2 密码参数校验2.2.3 分布式session2.2.3.1 解决方案 2.2.4 参数解析器 2.3 异常处理2.3.1 ControllerAdvicerExceptionHandler 2.4 秒杀2.4.1 逻辑2.4.1 秒杀前判…

图像处理——以支票识别为例

用到环境 1、pycharm community edition 2022.3.2 2、Python 3.10 后续应该会在资源上传项目&#xff0c;需要的话可以私信我。 流程 图1 扩展实验“金额识别”流程图 正文 导入 cv2、pytesseract、re 和 locale 模块。 使用 cv2.imread() 函数加载名为 cheque.jpg 的支票图像…

ROS:结构

目录 前言一、设计者二、维护者三、系统架构四、ROS自身结构 前言 从不同的角度&#xff0c;对ROS架构的描述也是不同的&#xff0c;一般我们可以从设计者、维护者、系统结构与自身结构4个角度来描述ROS结构: 一、设计者 ROS设计者将ROS描述为“ROS Plumbing Tools Capab…

数仓工具Hive 概述

Hive Hive简介Hive架构HiveSQL语法不同之处建表语句查询语句 Hive查看执行计划Hive文件格式 Hive简介 Hive是由Facebook开源&#xff0c;基于Hadoop的一个数据仓库工具&#xff0c;可以将结构化的数据文件映射为一张表&#xff0c;并提供类SQL查询功能。 通过Hive可以将mapred…

数字IC前端学习笔记:仲裁轮询(三)

相关文章 数字IC前端学习笔记&#xff1a;LSFR&#xff08;线性反馈移位寄存器&#xff09; 数字IC前端学习笔记&#xff1a;跨时钟域信号同步 数字IC前端学习笔记&#xff1a;信号同步和边沿检测 数字IC前端学习笔记&#xff1a;锁存器Latch的综合 数字IC前端学习笔记&am…

mitmproxy抓包原理

文章目录 mitmproxy原理详解1 mitmproxy 基本原理2 作为中间代理获取HTTP请求信息2.1 应对显式HTTP请求2.2 应对隐式HTTP请求 3 作为中间代理获取HTTPS请求信息3.1 显式HTTPS请求1) 获取远程主机名2) 处理主题备用名称SAN3) 处理服务器名称指示SNI4) 显式HTTPS请求信息获取整个…

基于深度学习的高精度烟雾检测识别系统(PyTorch+Pyside6+YOLOv5模型)

摘要&#xff1a;基于深度学习的高精度烟雾检测识别系统可用于日常生活中或野外来检测与定位烟雾目标&#xff0c;利用深度学习算法可实现图片、视频、摄像头等方式的烟雾目标检测识别&#xff0c;另外支持结果可视化与图片或视频检测结果的导出。本系统采用YOLOv5目标检测模型…

为什么数据库字段建议设置为NOT NULL?

目录 一、性能 二、开发的友好性 三、聚合函数不准确 四、null与其它值计算 五、distinct、group by、order by的问题 六、索引问题 七、其它问题 一、性能 如果查询中包含可为null的列&#xff0c;对MYSQL来说更难优化&#xff0c;因为可为null的列使得索引、索引统计…

NUCLEO-F411RE RT-Thread 体验 (9) - GCC环境 PWM的驱动移植以及简单使用

NUCLEO-F411RE RT-Thread 体验 (9) - GCC环境 PWM的驱动移植以及简单使用 驱动移植 驱动位于drv_pwm.c文件中&#xff0c;components层代码位于rt_drv_pwm.c中。 修改Makefile文件 修改配置文件rtconfig.h LED2链接PA5&#xff0c;而TIM2_CHANNEL1可从PA5输出PWM&#xff0…

ubuntu命令

查看当前用户 whoami 进入root权限 sudo su 修改用户名密码 sudo passwd username #修改用户密码 //username是用户设置用户名&#xff0c;记得替换 sudo passwd root #修改root密码 https://blog.csdn.net/m0_54647521/article/details/127521032 重启…

图形编辑器开发:以光标位置缩放画布

大家好&#xff0c;我是前端西瓜哥。 画布缩放是图形设计工具中很重要的基础能力。 通过它&#xff0c;我们可以像举着一台摄影机&#xff0c;在图形所在的世界到处游逛&#xff0c;透过镜头&#xff0c;可以只看自己想看的图形&#xff1b;可以拉近摄影机&#xff0c;看到图…

SpringBoot 如何使用 @ControllerAdvice 注解进行全局异常处理

SpringBoot 如何使用 ControllerAdvice 注解进行全局异常处理 在 Web 开发中&#xff0c;异常处理是非常重要的一环。在 SpringBoot 框架中&#xff0c;我们通常使用 ExceptionHandler 注解来处理 Controller 层的异常。但是&#xff0c;如果想要处理全局异常&#xff0c;我们…

【MySql】MySql事务常见操作

文章目录 准备工作事务常见操作方式总结 准备工作 将mysql的默认隔离级别设置成读未提交 set global transaction isolation level read uncommitted;注意&#xff1a;设置完毕之后&#xff0c;需要重启终端&#xff0c;进行查看 select tx_isolation 创建测试表 mysql>…

HTML5 的新特性

html基础知识html基础知识_上半场结束&#xff0c;中场已休息&#xff0c;下半场ing的博客-CSDN博客html5的新特性HTML5 的新特性_上半场结束&#xff0c;中场已休息&#xff0c;下半场ing的博客-CSDN博客 目录 1.0 HTML5 的新特性 1.1 HTML5 新增的语义化标签 1.2 HTML5 新增的…

LabVIEW何得知是谁在连接远程前面板

LabVIEW何得知是谁在连接远程前面板 想要知道连接远程前面板的用户的身份。如何来得知用户的身份和他们连接远程前面板的时间&#xff1f; 解答: 可以使用Remote Panel: Connections To Clients属性或者Remote Panel Client Connections方法来得知连接远程面板用户的身份。Re…

TypeScript ~ TS 掌握编译文件配置项 ⑤

作者 : SYFStrive 博客首页 : HomePage &#x1f4dc;&#xff1a; TypeScript ~ TS &#x1f4cc;&#xff1a;个人社区&#xff08;欢迎大佬们加入&#xff09; &#x1f449;&#xff1a;社区链接&#x1f517; &#x1f4cc;&#xff1a;觉得文章不错可以点点关注 &…

【2023,学点儿新Java-20】流程控制语句关键字及其介绍:while、do、break、continue、return

前情回顾&#xff1a; 【2023&#xff0c;学点儿新Java-19】Java特殊空类型关键字 | Java流程控制语句关键字 | switch-case 选择结构的执行过程&注意点 | 详解&#xff1a;for循环的普通和增强版本【2023&#xff0c;学点儿新Java-18】Java关键字汇总说明 |附&#xff1a…

chatgpt赋能python:Python新手常见的报错提示及解决方法

Python新手常见的报错提示及解决方法 Python是一种非常流行的编程语言。对于新手来说&#xff0c;Python在学习过程中可能会遇到许多报错提示&#xff0c;这些提示可能会让人感到很困惑。本文将介绍Python新手常见的报错提示&#xff0c;并提供解决方法。 IndentationError: …