(十五)ForkJoin框架

news2025/1/11 5:52:29

ForkJoinPool

ForkJoinPool是一种“分治算法”的多线程并行计算框架,自Java7引入。它将一个大的任务分为若干个子任务,这些子任务分别计算,然后合并出最终结果。
ForkJoinPool比普通的线程池可以更好地实现计算的负载均衡,提高资源利用率。

创建ForkJoinPool

构造方法

共有三个public的构造方法,最多的有4个参数,分别是 并行度、工作线程工厂,线程未捕获异常的处理器、工作队列模式(FIFO或LIFO,默认是LIFO)、工作线程名称前缀。一般在使用无参或一个参数的构造方法即可,(或者使用commonPool),如果需要定制线程继承ForkJoinWorkerThread,则使用4个参数的构造方法。
//ForkJoinPool.commonPool();
public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
            defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism) {
    this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    this(checkParallelism(parallelism),
            checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
            "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}

private ForkJoinPool(int parallelism,
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler,
                     int mode, String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

RecursiveAction和RecursiveTask

这两个类都是ForkJoinTask子类,用于实现子任务的逻辑。区别是 前者没有返回值,后者有返回值。使用时,针对不同类型的任务,可以分别继承这两个类,实现其compute方法。

使用

案例1(RecursiveAction):快速排序

基本思想:
1、利用数组的某个元素(一般取第一个元素)把数组划分成两半,左边子数组里面的元素小于等于该元素,右边子数组里面的元素大于等于该元素。
2、对左右的2个子数组分别排序。
将数组划分为两部分后,对子数组分别排序是独立的子问题,这个过程可以递归分解子问题,所以可以利用多个线程分别为子数组排序。
package com.example.demo;

import org.junit.Test;

import java.security.SecureRandom;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.StringJoiner;
import java.util.concurrent.*;
import java.util.stream.Stream;

public class ForkJoinTest {
    ForkJoinPool pool = ForkJoinPool.commonPool();
    SecureRandom random = new SecureRandom();

    @Test
    public void testSort() throws ExecutionException, InterruptedException {
        StringJoiner before = new StringJoiner(",","[","]");
        //20个数的数组
        int[] arr = getRandomIntArray(20);
        Arrays.stream(arr).mapToObj(String::valueOf).forEach(before::add);
        System.out.println(before.toString());
        Instant start = Instant.now();
        ForkJoinTask<Void> task = pool.submit(new QuickSortTask(arr));
        //阻塞直到完成排序
        task.get();
        long i = Duration.between(start,Instant.now()).get(ChronoUnit.NANOS);
        System.out.println("排序时间:" + i + "纳秒");
        StringJoiner after = new StringJoiner(",","[","]");
        Arrays.stream(arr).mapToObj(String::valueOf).forEach(after::add);
        System.out.println(after.toString());
    }

    private int[] getRandomIntArray(int count) {
        int bound = count * 10;
        int[] array = new int[count];
        for (int i = 0; i < count; i++){
            array[i] = random.nextInt(bound);
        }
        return array;
    }

    public static class QuickSortTask extends RecursiveAction {

        private int start;
        private int end;
        private int[] array;

        public QuickSortTask(int[] array){
            this.array = array;
            this.start = 0;
            this.end = array.length-1;
        }

        public QuickSortTask(int[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {
            int mid = part(array, start, end);
            //当左边还有元素时
            if (mid != start) {
                QuickSortTask task1 = new QuickSortTask(array, start, mid - 1);
                task1.fork();
                task1.join();
            }
            //当右边还有元素时
            if (mid != end) {
                QuickSortTask task2 = new QuickSortTask(array, mid + 1, end);
                task2.fork();
                task2.join();
            }
        }

        /**
         * <p>返回基准值的下标,基准值左的元素都小于等于基准值,基准值右的元素大于等于基准值</p>
         * @param array
         * @param start
         * @param end
         * @return
         */
        private int part(int[] array, int start, int end) {
            int i = start, j = end;
            //基准值的下标
            int base = start;
            //左右扫描相遇时结束
            while (i < j) {
                //从右向左扫描,如果当前值比基准值小,则置换,已经置换过的元素不再扫描(j的右边)
                while (i < j && array[j] >= array[base]) {
                    j--;
                }
                if (i < j) {
                    swap(array, j, base);
                    base = j;
                }
                //从左向右扫描,如果当前值比基准值大,则置换,已经置换过的元素不再扫描(i的左边)
                while (i < j && array[i] <= array[base]) {
                    i++;
                }
                if (i < j) {
                    swap(array,i, base);
                    base = i;
                }
            }
            return base;
        }

        private void swap(int[] array, int x, int y) {
            if (x != y) {
                int temp = array[x];
                array[x] = array[y];
                array[y] = temp;
            }
        }
    }
}

案例2(RecursiveTask):求和

例如从1加到100,如果不用高斯的方法,可以用程序实现累加,将数拆分成小组,每个小组互相独立,每个小组组内分别相加,最后把组的结果相加,这个过程可以使用ForkJoin。
RecursiveTask<T>
package com.example.demo;

import org.junit.Test;

import java.security.SecureRandom;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.StringJoiner;
import java.util.concurrent.*;
import java.util.stream.Stream;

public class ForkJoinTest {
    ForkJoinPool pool = ForkJoinPool.commonPool();

    @Test
    public void testSum() throws ExecutionException, InterruptedException {
        ForkJoinTask<Integer> submit = pool.submit(new SumTask(1, 100));
        System.out.println(submit.get());
    }

    public static class SumTask extends RecursiveTask<Integer>{
        private int startNum;
        private int endNum;
        //决定当子任务处理的元素个数小于此值时不再切分任务,直接进行计算
        private static final int THRESHOLD = 10;

        public SumTask(int startNum, int endNum){
            this.startNum = startNum;
            this.endNum = endNum;
        }

        @Override
        protected Integer compute() {
            int sum = 0;
            if (endNum - startNum + 1 < THRESHOLD){
                for (int i = startNum; i <= endNum; i++){
                    sum += i;
                }
                return sum;
            }
            int mid = split(startNum, endNum);
            SumTask task1 = new SumTask(startNum, mid);
            SumTask task2 = new SumTask(mid + 1, endNum);
            ForkJoinTask<Integer> fork1 = task1.fork();
            ForkJoinTask<Integer> fork2 = task2.fork();
            sum = fork1.join() + fork2.join();
            return sum;
        }

        private int split(int startNum, int endNum) {
            return (startNum + endNum)/2;
        }
    }
}

关闭

和ThreadPoolExecutor一样,ForkJoinPool使用完也要关闭,依然是使用shutdown和shutdownNow方法,shutdown只拒绝新提交的任务;shutdownNow会取消现有的全局队列和局部队列中的任务,同时唤醒所有空闲的线程,让这些线程自动退出。
public void shutdown();
public List<Runnable> shutdownNow();
ForkJoinPool pl=new ForkJoinPool();
try {
    boolean flag;
    do {
        flag = pl.awaitTermination(500,TimeUnit.MILLISECONDS);
    } while (!flag);
} catch (Exception e){
    e.printStackTrace();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/177473.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

安装MikTeX-latex

安装MikTeX-latex一、报错信息二、重新安装三、编译MDPI Template一、报错信息 由于之前使用的是basic-miktex-2.9.7269-x64.exe这个版本&#xff0c;当安装完成后&#xff0c;在更新package时遇到了以下错误&#xff1a; MikTeX update error 于是&#xff0c;通过搜索&…

冯·诺依曼、哈佛、改进型哈佛体系结构解析

在如今的CPU中&#xff0c;由于Catch的存在&#xff0c;这些概念已经被模糊了。个人认为去区分他们并没有什么意义&#xff0c;仅作为知识点。 哈佛结构设计复杂&#xff0c;但效率高。冯诺依曼结构则比较简单&#xff0c;但也比较慢。CPU厂商为了提高处理速度&#xff0c;在C…

2023 年程序员的热门开发项目:掌握最新技术的教程和工具的完整列表

欢迎阅读我们关于“2023 年程序员的热门开发项目”的博文&#xff01;作为一名开发人员&#xff0c;了解最新的技术和工具对于在就业市场上保持竞争力至关重要。在这篇文章中&#xff0c;我们编制了一份 2023 年最热门开发项目的完整列表&#xff0c;以及掌握每个项目的教程和资…

ChatGPT付费版来啦,好用的AI生成产品还能免费使用吗?AIGC工具箱

​最新消息&#xff0c;chatGPT推出了付费版&#xff01;每月&#xff04;42美元&#xff0c;不限流使用&#xff0c;你会付费使用吗&#xff1f;&#x1f9f0;AIGC工具箱下面推荐几款AI 生成产品&#xff01;你觉得哪个更好用呢&#xff1f;AI 的出现&#xff0c;颠覆了内容生…

自己动手写一个操作系统——我们能做什么,我们需要做什么

文章目录计算机启动流程第一条指令BIOSMBRloaderkernel总结计算机启动流程 第一条指令 在开机的一瞬间&#xff0c;也就是上电的一瞬间&#xff0c;CPU 的 CS:IP 寄存器被硬件强制初始化为 0xF000:0xFFF0。 CS:IP 就是 PC 指针&#xff0c;也就是 CPU 下一条要执行的指令的地址…

Elasticsearch7.8.0版本入门—— Elasticsearch7.8.0映射操作

目录一、映射的概述二、创建映射的示例2.1、首先&#xff0c;创建索引2.2、然后&#xff0c;再创建好的索引基础上&#xff0c;创建映射2.3、映射属性说明2.4、查看创建的映射2.5、最后&#xff0c;创建文档2.6、根据文档中name属性条件查询文档 理解映射示例2.7、根据文档中se…

HDM KVM维护

前言 服务器遇到个问题&#xff0c;无法启动&#xff0c;下面简单记录一下解决程 方法 进入维护界面&#xff1a; 尝试 H5 KVM&#xff0c;发现H5 kvm挂载镜像速度较慢 使用 KVM.jnlp&#xff0c;需配置 java 环境&#xff0c;安装好java 环境已经配置java 环境变量后&…

Linux常见命令 18 - 用户管理命令 useradd, passwd, who, w

目录 1. 添加用户命令 useradd 2. 设置用户密码 passwd 3. 查看用户登录信息 who 4. 查看用户登录详细信息 w 1. 添加用户命令 useradd 语法&#xff1a;useradd [用户名] 2. 设置用户密码 passwd 语法&#xff1a;passwd [用户名] 注意&#xff1a;每个用户只能用passwd更改自…

作为项目经理,如何做好项目进度管理

一、项目进度管理需要做什么&#xff1f; 项目进度管理分9步&#xff1a;其中前⑥条属于规划过程组的工作内容&#xff0c;第⑦条属于监控过程组的工作内容。 ①规划进度管理&#xff1a;在文档内计划如何做好进度管理 ②定义活动&#xff1a;识别和记录项目中的活动 ③排列活动…

数据库系统概念 | 第六章:形式化关系查询语言 | 含带答案习题

文章目录&#x1f4da;关系代数&#x1f407;基本运算&#x1f955;选择运算&#x1f955;投影运算&#x1f955;关系运算的组合&#x1f955;集合并运算&#x1f955;集合差运算&#x1f955;集合交运算&#x1f955;笛卡尔积运算&#x1f955;更名运算&#x1f955;一道综合例…

量子力学奇妙之旅-微扰论和变分法

专栏目录: 高质量文章导航 一.基本概念 前置: 厄密算符和简并: 两大重要结论: 厄米算符的本征值一定是实数 厄米算符不同本征值的本征态一定正交 证明: 我们 λ<

Day02函数和条件表达

0. 格式化字符串 格式 化字符串 print(1) print(1,2,3,4)a 1 b 2.1123 c hello s a %d b %f c %s % (a,b,c)s -- worldprint(s)s fa {a} b {b} c {c} print(s)s a {0:5d} b {1:.2f} c {0}.format(a,b,c) print(s)1. 函数概述 print() input() type() int…

C++菱形继承以及解决方法--虚继承 虚基表

目录菱形继承形成原因出现二义性变量的内存布局应对方案虚继承 vitrual解决二义性变量内存布局--虚基表感悟关于代码复用等的另一种关系-组合菱形继承形成原因 多继承&#xff0c;呈菱形状 菱形继承代码: class A { public:A() {}int _a ; }; class B :public A { public…

NAT技术原理、使用场景

随着Internet的发展和网络应用的增多&#xff0c;有限的IPv4公有地址已经成为制约网络发展的瓶颈。为解决这个问题&#xff0c;NAT&#xff08;Network Address Translation&#xff0c;网络地址转换&#xff09;技术应需而生。 NAT技术主要用于实现内部网络的主机访问外部网络…

JDK8 新特性之新增的Optional类

目录 一&#xff1a;以前对null的处理方式 二&#xff1a;Optional类介绍 三&#xff1a;Optional的基本使用 Optional的高级使用 小结 一&#xff1a;以前对null的处理方式 Test public void test01() { String userName "凤姐"; // String userName null; …

十八、Gtk4-Stateful action

有些动作action有状态。状态的典型值是布尔值或字符串。但是&#xff0c;如果你愿意&#xff0c;也可以使用其他类型的状态。 具有状态的动作称为有状态的。 Stateful action without a paramete 有些菜单被称为切换菜单。例如&#xff0c;全屏菜单有一个状态&#xff0c;它…

在甲骨文云容器实例(Container Instances)上部署edge

甲骨文云推出了容器实例&#xff0c;这是一项无服务器计算服务&#xff0c;可以即时运行容器&#xff0c;而无需管理任何服务器。 今天我们尝试一下通过容器实例部署edge。 Step1. 创建容器实例 在甲骨文容器实例页面&#xff0c;单击"创建容器实例"&#xff0c; …

审批工作流—ccflow

审批工作流—ccflow目录概述需求&#xff1a;设计思路实现思路分析1.java 代码分析参考资料和推荐阅读Survive by day and develop by night. talk for import biz , show your perfect code,full busy&#xff0c;skip hardness,make a better result,wait for change,challen…

LeetCode[547]省份数量

难度&#xff1a;中等题目&#xff1a;有 n个城市&#xff0c;其中一些彼此相连&#xff0c;另一些没有相连。如果城市 a与城市 b直接相连&#xff0c;且城市 b与城市 c直接相连&#xff0c;那么城市 a与城市 c间接相连。省份 是一组直接或间接相连的城市&#xff0c;组内不含其…

Leetcode:93. 复原 IP 地址(C++)

目录 问题描述&#xff1a; 实现代码与解析&#xff1a; 回溯&#xff1a; 原理思路&#xff1a; 问题描述&#xff1a; 有效 IP 地址 正好由四个整数&#xff08;每个整数位于 0 到 255 之间组成&#xff0c;且不能含有前导 0&#xff09;&#xff0c;整数之间用 . 分隔。…