并发编程-线程池ForkJoinPool(二)

news2025/1/12 6:05:20

Fork/Join框架介绍

什么是Fork/Join

Fork/Join是一个是一个并行计算的框架,主要就是用来支持分治任务模型。

Fork 对应的是分治任务模型里的任务分解,Join 对应的是结果合并。

核心思想:将一个大任务分成许多小任务,然后并行执行这些小任务,最终将它们的结果合并成一个大的结果。

应用场景

1、递归分解型任务

这类任务通常可以将大任务分解成若干子任务,每个子任务可以独立执行,并且可以归并子任务得到有序的结果

举例:排序、归并、遍历

2、数组处理

处理大型数组时,可以将大数组分解成若干子数组,子数组并行处理,最后归并子数组的结果

举例:数组的排序、统计、查找

3、并行化算法

将问题分解成若干子问题,并行解决每个子问题,最后合并子问题得到最终解决方案

举例:并行化图像处理算法、并行化机器学习算法

4、大数据处理

将数据分成若干分片,并行处理每个分片,最后将处理后的分片合并成完整结果

举例:大型日志文件处理、大型数据库的查询

Fork/Join使用

Fork/Join框架的主要组成部分是ForkJoinPool、ForkJoinTask。ForkJoinPool是一个线程池,它用于管理ForkJoin任务的执行。ForkJoinTask是一个抽象类,用于表示可以被分割成更小部分的任务。

ForkJoinPool

ForkJoinPool是Fork/Join框架中的线程池类,它用于管理Fork/Join任务的线程。

方法:submit()、invoke()、shutdown()、awaitTermination()等

(提交任务、执行任务、关闭线程池、等待任务执行结果)

参数:线程池的大小、工作线程的优先级、任务队列的容量等,根据具体应用场景设置

构造器

ForkJoinPool中有四个核心参数,用于控制线程池的并行数、工作线程的创建、异常处理和模式指定等。

  • int parallelism:指定并行级别(parallelism level)。ForkJoinPool将根据这个设定,决定工作线程的数量。如果未设置的话,将使用Runtime.getRuntime().availableProcessors()来设置并行级别;

  • ForkJoinWorkerThreadFactory factory:ForkJoinPool在创建线程时,会通过factory来创建。注意,这里需要实现的是ForkJoinWorkerThreadFactory,而不是ThreadFactory。如果不指定factory,那么将由默认的DefaultForkJoinWorkerThreadFactory负责线程的创建工作;

  • UncaughtExceptionHandler handler:指定异常处理器,当任务在运行中出错时,将由设定的处理器处理;

  • boolean asyncMode:设置队列的工作模式。当asyncMode为true时,将使用先进先出队列,而为false时则使用后进先出的模式。默认是false,后进先出

// 获取处理器数量,注意这里是逻辑核
int processors = Runtime.getRuntime().availableProcessors();
// 构建forkjoin线程池
ForkJoinPool forkJoinPool = new ForkJoinPool(processors);

public ForkJoinPool(int parallelism) {
	this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}

任务提交方式(核心能力之一)

返回值方法
提交异步执行voidexecute(ForkJoinTask task) execute(Runnable task)
等待并获取结果Tinvoke(ForkJoinTask task)
提交执行获取Future结果ForkJoinTasksubmit(ForkJoinTask task) submit(Callable task) submit(Runnable task) submit(Runnable task, T result)

和普通线程池之间的区别

  • 工作窃取算法

ForkJoinPool采用工作窃取算法来提高线程的利用率,而普通线程池则采用任务队列来管理任务。

工作窃取:一个线程完成自己的任务后,可以从其它线程的队列中获取一个任务来执行,提高线程的利用率。

  • 任务的分解和合并

ForkJoinPool可以将一个大任务分解为多个小任务,并行地执行这些小任务,最终将它们的结果合并起来得到最终结果。而普通线程池只能按照提交的任务顺序一个一个地执行任务。

  • 工作线程的数量

ForkJoinPool会根据当前系统的CPU核心数来自动设置工作线程的数量,以最大限度地发挥CPU的性能优势。而普通线程池需要手动设置线程池的大小,如果设置不合理,可能会导致线程过多或过少,从而影响程序的性能。

  • 任务类型

ForkJoinPool适用于执行大规模任务并行化,而普通线程池适用于执行一些短小的任务,如处理请求等。

ForkJoinTask

ForkJoinTask是Fork/Join框架中的抽象类,它定义了执行任务的基本接口。用户可以通过继承ForkJoinTask类来实现自己的任务类,并重写其中的compute()方法来定义任务的执行逻辑。通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下三个子类:

  • RecursiveAction用于递归执行但不需要返回结果的任务。

  • RecursiveTask用于递归执行需要返回结果的任务。

  • CountedCompleter<T>:在任务完成执行后会触发执行一个自定义的钩子函数

调用方法

  • fork()——提交任务

fork()方法用于向当前任务所运行的线程池中提交任务。如果当前线程是ForkJoinWorkerThread类型,将会放入该线程的工作队列,否则放入common线程池的工作队列中。

  • join()——获取任务执行结果

join()方法用于获取任务的执行结果。调用join()时,将阻塞当前线程直到对应的子任务完成运行并返回结果。

计算斐波那契数列(处理递归任务)

斐波那契数列指的是这样一个数列:1,1,2,3,5,8,13,21,34,55,89... 这个数列从第3项 开始,每一项都等于前两项之和。

public class FibonacciDemo extends RecursiveTask<Integer>
{
    final int n;
    
    FibonacciDemo(int n)
    {
        this.n = n;
    }
    
    /**
     * 重写RecursiveTask的compute()方法
     */
    protected Integer compute()
    {
        if (n <= 1)
            return n;
        FibonacciDemo f1 = new FibonacciDemo(n - 1);
        // 提交任务
        f1.fork();
        FibonacciDemo f2 = new FibonacciDemo(n - 2);
        // 合并结果
        return f2.compute() + f1.join();
    }
    
    public static void main(String[] args)
    {
        // 构建forkjoin线程池
        ForkJoinPool pool = new ForkJoinPool();
        FibonacciDemo task = new FibonacciDemo(100000); // 参数大,抛出StackOverflowError
        // 提交任务并一直阻塞直到任务 执行完成返回合并结果。
        int result = pool.invoke(task);
        System.out.println(result);
    }
}

栈溢出如何解决
// 使用迭代方式,防止栈溢出
public class FibonacciDemo2
{
    public static void main(String[] args)
    {
        int n = 100000;
        long[] fib = new long[n + 1];
        fib[0] = 0;
        fib[1] = 1;
        for (int i = 2; i <= n; i++)
        {
            fib[i] = fib[i - 1] + fib[i - 2];
        }
        System.out.println(fib[n]);
    }
}

处理递归任务注意事项

在使用Fork/Join框架处理递归任务时,需要根据实际情况来评估递归深度和任务粒度,以避免任务调度和内存消耗的问题。如果递归深度较大,可以尝试采用其他方法来优化算法,如使用迭代方式替代递归,或者限制递归深度来减少任务数量,以避免Fork/Join框架的缺点。

递归深度较大时,子任务可能被调度到不同的线程执行,线程的创建、销毁、任务调度占用大量资源。另外递归调用方法时,创建大量方法栈帧,可能导致栈内存溢出StackOverflowError

处理阻塞任务

1、防止线程饥饿:当一个线程在执行一个阻塞型任务时,它将会一直等待任务完成,没有任务窃取的情况下可能会一直阻塞下去。为防止这种情况发生,应该避免在ForkJoinPool中提交大量的阻塞型任务。

2、使用特定的线程池:为了最大程度地利用ForkJoinPool的性能,可以使用专门的线程池来处理阻塞型任务,这些线程不会被ForkJoinPool的窃取机制所影响。例如,可以使用ThreadPoolExecutor来创建一个线程池,然后将这个线程池作为ForkJoinPool的执行器,这样就可以使用ThreadPoolExecutor来处理阻塞型任务,而使用ForkJoinPool来处理非阻塞型任务。

3、不要阻塞工作线程:如果在ForkJoinPool中使用阻塞型任务,那么需要确保这些任务不会阻塞工作线程,否则会导致整个线程池的性能下降。为了避免这种情况,可以将阻塞型任务提交到一个专门的线程池中,或者使用CompletableFuture等异步编程工具来处理阻塞型任务。

// 结合CompletableFuture使用示例
public class BlockingTaskDemo
{
    public static void main(String[] args)
    {
        // 构建一个forkjoin线程池
        ForkJoinPool pool = new ForkJoinPool();
        
        // 创建一个异步任务,并将其提交到ForkJoinPool中执行
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try
            {
                // 模拟一个耗时的任务
                TimeUnit.SECONDS.sleep(5);
                return "Hello, world!";
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
                return null;
            }
        }, pool);
        
        try
        {
            // 等待任务完成,并获取结果
            String result = future.get();
            
            System.out.println(result);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        catch (ExecutionException e)
        {
            e.printStackTrace();
        }
        finally
        {
            // 关闭ForkJoinPool,释放资源
            pool.shutdown();
        }
    }
}

ForkJoinPool工作原理

ForkJoinPool 内部有多个任务队列,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中。

另外,ForkJoinPool 支持一种叫 做“任务窃取”的机制,如果工作线程空闲了,那它可以“窃取”其他工作任务队列里的任务。充分利用CPU的性能。

工作任务队列是一个双端链表,窃取是从base端窃取,top端是正常执行取任务

工作线程ForkJoinWorkerThread

ForkJoinWorkerThread是ForkJoinPool中的一个专门用于执行任务的线程。

当一个ForkJoinWorkerThread被创建时,它会自动注册一个WorkQueue到ForkJoinPool中。这个WorkQueue是该线程专门用于存储自己的任务的队列,只能出现在WorkQueues[]的奇数位。在ForkJoinPool中,WorkQueues[]是一个数组,用于存储所有线程的WorkQueue。

工作队列WorkQueue

WorkQueue是一个双端队列,用于存储工作线程自己的任务。每个工作线程都会维护一个本地的WorkQueue,并且优先执行本地队列中的任务。当本地队列中的任务执行完毕后,工作线程会尝试从其他线程的WorkQueue中窃取任务。

注意:在ForkJoinPool中,只有WorkQueues[]奇数位的WorkQueue是属于ForkJoinWorkerThread线程的,因此只有这些WorkQueue才能被线程本身使用和窃取任务。偶数位的WorkQueue是用于外部线程提交任务的,而且是由多个线程共享的,因此它们不能被线程窃取任务。

工作窃取

工作窃取,就是允许空闲线程从繁忙线程的双端队列中窃取任务。默认情况下,工作线程从它自己的双端队列的头部获取任务。但是,当自己的任务为空时,线程会从其他繁忙线程双端队列的尾部中获取任务。这种方法,最大限度地减少了线程竞争任务的可能性。

ForkJoinPool的大部分操作都发生在工作窃取队列(work-stealing queues ) 中,该队列由内部类WorkQueue实现。它是Deques的特殊形式,但仅支持三种操作方式:push、pop和poll(也称为窃取)。在ForkJoinPool中,队列的读取有着严格的约束,push和pop仅能从其所属线程调用,而poll则可以从其他线程调用。

通过工作窃取,Fork/Join框架可以实现任务的自动负载均衡,以充分利用多核CPU的计算能力,同时也可以避免线程的饥饿和延迟问题

如果对 ForkJoinPool 详细的实现细节感兴趣,也可以参考Doug Lea 的论文

总结

Fork/Join是一种基于分治思想的模型,在并发处理计算型任务时有着显著的优势。

  • 任务切分:将大的任务分割成更小粒度的小任务,让更多的线程参与执行;

  • 任务窃取:通过任务窃取,充分地利用空闲线程,并减少竞争。

使用ForkJoinPool时,需要特别注意任务的类型是否为纯函数计算类型,也就是这些任务不应该关心状态或者外界的变化,这样才是最安全的做法。如果是阻塞类型任务,那么你需要谨慎评估技术方案。虽然ForkJoinPool也能处理阻塞类型任务,但可能会带来复杂的管理成本。

处理阻塞任务重点学习CompletableFuture,包含各种编排任务的方法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1115761.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【ROS 2 基础-常用工具】-7 Rviz仿真机器人

所有内容请查看&#xff1a;博客学习目录_Howe_xixi的博客-CSDN博客

学习记录683@类别不平衡问题解决的基本策略之再缩放的数学解释

什么是类别不平衡问题 分类学习方法都有一个共同的基本假设&#xff0c;即不同类别的训练样例数目相当。如果不同类别的训练样例数目稍有差别&#xff0c;通常影响不大&#xff0c;但若差别很大&#xff0c;则会对学习过程造成困扰。例如有998个反例&#xff0c;但正例只有2个…

LLDB 三种输出方式 对比及原理探索

前言 当我们的项目过大时,就会使我们项目的编译耗时过长,如何在项目运行时进项代码调试,熟练使用LLDB就可以解决这个难题,大幅度提高我们的开发效率。 什么是 LLDB? LLDB是英文Low Lever Debug的缩写,是XCode内置的为我们开发者提供的调试工具,它与LLVM编译器一起,存…

程序被加载到进程的哪个位置?

程序被加载器加载到内存后&#xff0c;通过/proc/$pid/maps文件&#xff0c;我们可以观测到程序被加载的内存位置。那么&#xff0c;通过打印进程内存的方式&#xff0c;让我们确认程序是不是真的加载到内存&#xff0c;以及加载到内存的程序和硬盘中的文件有没有区别。 编写测…

未来数字化转型发展的前景如何,企业又该怎么实现?

商业世界有一个认识&#xff0c;互联网只用看中国和美国&#xff0c;其他国家已经被远远甩在了后边&#xff0c;移动互联网的出现更是将互联网的跨地域、跨国、互联等属性发挥到了极致&#xff0c;让众多互联网巨头开启了争夺世界各国市场的脚步。 移动互联网的飞速发展以及物…

drawio模板以及示例

drawio都能做那些事情和模板示例 你可以使用drawio&#xff0c;并使用drawio提供的扩展模板库和大量的形状库&#xff0c;针对很多不同的工业领域创建不同类型的图表。 针对如下的内容中的所有的图&#xff0c;均可以下载源文件并导入到drawio中再次编辑&#xff08;供学习者…

pytorch教程

文章目录 1 pytorch的安装2 PyTorch基础知识2.1 张量简介2.2 初始化2.3 张量的属性2.4 ndarray与tensor互转2.5 索引、切片、变形、聚合、矩阵拼接、切割、转置 3 pytorch自动微分4 线性回归5 分类5.1 写法一5.2 写法二 1 pytorch的安装 pytorch官网 https://pytorch.org/get-…

Vue3.0里为什么要用 Proxy API 替代 defineProperty API ?

一、Object.defineProperty 定义&#xff1a;Object.defineProperty() 方法会直接在一个对象上定义一个新属性&#xff0c;或者修改一个对象的现有属性&#xff0c;并返回此对象 为什么能实现响应式 通过defineProperty 两个属性&#xff0c;get及set get 属性的 getter 函…

攻防世界web篇-backup

这是链接中的网页&#xff0c;只有一句话 试着使用.bak点缀看看是否有效 这里链接中加上index.php.bak让下在东西 是一个bak文件&#xff0c;将.bak文件改为.php文件试试 打开.php文件后就可以得到flag值

【proteus】8086仿真、汇编语言

1.创建好新项目 2.点击source code 弹出VSM 3. 4.注意两个都不勾选 可以看到schematic有原理图出现 5. 再次点击source code 6.project/project settings&#xff0c;取消勾选embed 7. add 8.输入文件名保存后&#xff1a; 注意&#xff1a;proteus不用写dos的相关语句 。

【Linux升级之路】8_Linux多线程

目录 一、【Linux初阶】多线程1 | 页表的索引作用&#xff0c;线程基础&#xff08;优缺点、异常、用途&#xff09;&#xff0c;线程VS进程&#xff0c;线程控制&#xff0c;C多线程引入二、【Linux初阶】多线程2 | 分离线程&#xff0c;线程库&#xff0c;线程互斥&#xff0…

GCC优化相关

文章目录 优化选项博文链接 单独设置某段代码优化等级博文链接 优化选项 -O/-O0:无优化(默认)-O1:使用能减少目标文件大小以及执行时间并且不会使编译时间明显增加的优化。该模式在编译大型程序的时候会花费更多的时间和内存。在-O1 下&#xff0c;编译会尝试减少代码体积和代码…

【试题030】C语言之关系表达式例题

1.关系表达式是用关系运算符将两个表达式连接起来 错误示例&#xff1a;a<bc &#xff08;不是关系运算符&#xff0c;是赋值运算符&#xff09; 2.题目&#xff1a;设int m160,m280,m3100;&#xff0c;表达式m3>m2>m1的值是 &#xff1f; 3.代码分析&#xff1a; …

VueRouter 源码解析

重要函数思维导图 路由注册 在开始之前&#xff0c;推荐大家 clone 一份源码对照着看。因为篇幅较长&#xff0c;函数间的跳转也很多。 使用路由之前&#xff0c;需要调用 Vue.use(VueRouter)&#xff0c;这是因为让插件可以使用 Vue export function initUse(Vue: GlobalAP…

【Java】Java 11 新特性概览

Java 11 新特性概览 1. Java 11 简介2. Java 11 新特性2.1 HTTP Client 标准化2.2 String 新增方法&#xff08;1&#xff09;str.isBlank() - 判断字符串是否为空&#xff08;2&#xff09;str.lines() - 返回由行终止符划分的字符串集合&#xff08;3&#xff09;str.repeat(…

【标准化封装 SOT系列 】 C SOT-26

C // SOT-26 pin 间距 0.95mm 名称pin 数厂家 body DE矩形 (mm)SOT-266DIODES – ZXTC20 — 3.01.6

CEC2013(MATLAB):白鲨优化算法(White Shark Optimizer,WSO)求解CEC2013(提供MATLAB代码及参考文献)

一、白鲨优化算法原理 白鲨优化算法&#xff08;White Shark Optimizer&#xff0c;WSO&#xff09;由Malik Braik等人于2022年提出&#xff0c;该算法受大白鲨导航和觅食时具有的非凡听觉和嗅觉启发。该算法思路新颖&#xff0c;策略高效。【精选】单目标应用&#xff1a;白鲨…

蓝桥杯 (年号字串 C++)

思路&#xff1a; 1、看成10进制转化成26进制 。 2、A表示1、B表示2。以此类推&#xff0c;Z表示26. 代码&#xff1a; #include <iostream> using namespace std; int main() {char str[10]; int sum 2019, n, i 0; while (sum > 0) {str[i] sum % 26 64;sum / …

超低延迟直播技术路线,h265的无奈选择

超低延迟&#xff0c;多窗显示&#xff0c;自适应编解码和渲染&#xff0c;高分辨低码率&#xff0c;还有微信小程序的标配&#xff0c;这些在现今的监控和直播中都成刚需了&#xff0c;中国的音视频技术人面临着困境&#xff0c;核心门户浏览器不掌握在自己手上&#xff0c;老…

宝塔部署nginx遇到的400错误和502错误

在部署express项目的过程中&#xff0c;由于我的代码有些变化&#xff0c;于是在宝塔面板上我又重新上传了一下我的项目&#xff0c;结果阴差阳错的被nginx反向代理配置不当引起的400错误request header or cokkie is too large和自己代码逻辑问题引起的502 bad gataway给绊倒了…