Java并发工具合集JUC大爆发

news2025/1/24 8:37:09

1. CountDownLatch

CountDownLatch是一个同步计数器,初始化的时候 传入需要计数的线程等待数,可以是需要等待执行完成的线程数,或者大于 ,一般称为发令枪。\

​ countdownlatch 是一个同步类工具,不涉及锁定,当count的值为零时当前线程继续运行,不涉及同步,只涉及线程通信的时候,使用它较为合适

1.1 作用

用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用),是一组线程等待其他的线程完成工作以后在执行,相当于加强版join。

注意:这是一个一次性操作 - 计数无法重置。 如果你需要一个重置的版本计数,考虑使用CyclicBarrier。

1.2 举例

​ 我们去组团游玩一样,总共30个人,来的人要等待还没有到的人,一直等到第30个人到了,我们才开始出发,在等待过程中,其他人(线程)是等待状态不做任何事情的,一直等所有人(线程)到齐了(准备完成)才开始执行。

1.3 概念

  • countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。
  • 是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。

我们打开CountDownLatch的源代码分析,我们发现最重要的方法就是一下这两个方法:

//阻塞当前线程,等待其他线程执行完成,直到计数器计数值减到0。
public void await() throws InterruptedException;
//阻塞当前线程指定的时间,如果达到时间就放行,等待其他线程执行完成,直到计数器计数值减到0。
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
//负责计数器的减一。
public void countDown():
复制代码

1.4 应用场景

1.4.1 多线程压测

有时我们想同时启动多个线程,实现最大程度的并行性。

​ 例如,我们想测试一个单例类。如果我们创建一个初始计数为1的CountDownLatch,并让所有线程都在这个锁上等待,那么我们可以很轻松地完成测试。我们只需调用 一次countDown()方法就可以让所有的等待线程同时恢复执行。

1.4.2 等待其他线程

​ 例如应用程序启动类要确保在处理用户请求前,所有N个外部系统已经启动和运行了,例如处理excel中多个表单,如果一个一个出来很耗IO和性能,我们可以等100或者1000个线程都完成了表单的操作后一下子写进excel表单中。

注意:一个线程不一定只能做countDown一次,也可以countDown多次

1.5 示例

1.5.1 准备完成后执行

在实际项目中可能有些线程需要资源准备完成后才能进行执行,这个时候就可以使用countDownLatch

package chapter02.countdownlatch;

import java.util.Random;
import java.util.concurrent.*;

/**
 * countdownlatch 示例
 */
public class CountDownLatchTest {
    private static ExecutorService executorService = Executors.newFixedThreadPool(10);
    private static Random random = new Random();


    public static void execute(CountDownLatch countDownLatch) {
        //获取一个随机数
        long sleepTime = random.nextInt(10);
        //获取线程ID
        long threadId = Thread.currentThread().getId();
        System.out.println("线程ID" + threadId + ",开始执行--countDown");

        try {
            //睡眠随机秒
            Thread.sleep(sleepTime * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //计数器减1
        countDownLatch.countDown();
        System.out.println("线程ID" + threadId + ",准备任务完成耗时:" + sleepTime + "当前时间" + System.currentTimeMillis());
        try {
            //线程等待其他任务完成后唤醒
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("线程ID" + threadId + ",开始执行任务,当前时间:" + System.currentTimeMillis());
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            executorService.submit(() -> {
                execute(countDownLatch);
            });
        }
        //线程等待其他任务完成后唤醒
        countDownLatch.await();
        Thread.sleep(1000);
        executorService.shutdown();
        System.out.println("全部任务执行完成");
    }
}
复制代码

1.5.2 多线程压测

在实战项目中,我们除了使用 jemter 等工具进行压测外,还可以自己动手使用 CountDownLatch 类编写压测代码。

​ 可以说 jemter 的并发压测背后也是使用的 CountDownLatch,可见掌握 CountDownLatch 类的使用是有多么的重要, CountDownLatch是Java多线程同步器的四大金刚之一,CountDownLatch能够使一个线程等待其他线程完成各自的工作后再执行。

package chapter02.countdownlatch;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * countDownLatch 压测
 */
public class CountDownLatchPressure {

    /**
     * 压测业务代码
     */
    public void testLoad() {
        System.out.println("压测:" + Thread.currentThread().getId() + ":" + System.currentTimeMillis());
    }

    /**
     * 压测启动
     * 主线程负责压测线程准备工作
     * 压测线程准备完成后 调用 start.countDown(); 启动线程执行
     * @throws InterruptedException
     */
    private void latchTest() throws InterruptedException {
        //压测线程数
        int testThreads = 300;
        final CountDownLatch start = new CountDownLatch(1);
        final CountDownLatch end = new CountDownLatch(testThreads);
        //创建线程池
        ExecutorService exce = Executors.newFixedThreadPool(testThreads);
        //准备线程准备
        for (int i = 0; i < testThreads; i++) {
            //添加到线程池
            exce.submit(() -> {
                try {
                    //启动后等待 唤醒
                    start.await();
                    testLoad();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //压测完成
                    end.countDown();
                }
            });

        }

        //连接池线程初始化完成 开始压测
        start.countDown();
        //压测完成后结束
        end.await();
        //关闭线程池
        exce.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatchPressure countDownLatchPressure = new CountDownLatchPressure();
        //开始压测
        countDownLatchPressure.latchTest();
    }
}

复制代码

2. CyclicBarrier

2.1 简介

CyclicBarrier,是JDK1.5的java.util.concurrent(JUC)并发包中提供的一个并发工具类

C yclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集,当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置,如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。

2.2 举例

就像生活中我们会约朋友们到某个餐厅一起吃饭,有些朋友可能会早到,有些朋友可能会晚到,但是这个餐厅规定必须等到所有人到齐之后才会让我们进去。

​ 这里的朋友们就是各个线程,餐厅就是CyclicBarrier,感觉和 CountDownLatch是一样的,但是他们是有区别的,吃完饭之后可以选择去玩一会,去处理任务,然后等待第二次聚餐,重复循环。

2.3 功能

CyclicBarrierCountDownLatch是非常类似的,CyclicBarrier核心的概念是在于设置一个等待线程的数量边界,到达了此边界之后进行执行。

CyclicBarrier类是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点(Common Barrier Point)。

CyclicBarrier类是一种同步机制,它能够对处理一些算法的线程实现同。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这里,然后所有线程才可以继续做其他事情。

​ 通过调用CyclicBarrier对象的await()方法,两个线程可以实现互相等待,一旦N个线程在等待CyclicBarrier达成,所有线程将被释放掉去继续执行。

2.4 构造方法

我们可以看下 CyclicBarrier源码的构造方法

public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)
复制代码

2.4.1 参数介绍

  • parties : 是参与线程的个数 , 其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

  • barrierAction : 优先执行线程 ,用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景,一般用于数据的整理以及汇总,例如excel插入一样,等所有线程都插入完了,到达了屏障后,barrierAction线程开始进行保存操作,完成后,接下来由其他线程开始进行插入,然后到达屏障接着就是保存,不断循环。

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。

2.5 重要方法

我们上面介绍了构造方法,下面我们介绍下CyclicBarrier中重要的方法

//阻塞当前线程,等待其他线程执行完成。
public int await() throws InterruptedException, BrokenBarrierException
//阻塞当前线程指定的时间,如果达到时间就放行,等待其他线程执行完成,
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
复制代码
  • 线程调用 await() 表示自己已经到达栅栏
  • BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时

2.6 基本使用

一个线程组的线程需要等待所有线程完成任务后再继续执行下一次任务

package chapter02.cyclicbarrier;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierTest {

    private static Random random = new Random();

    /**
     * 执行任务
     *
     * @param barrier
     */
    public static void execute(CyclicBarrier barrier) {
        //获取一个随机数
        long sleepTime = random.nextInt(10);
        //获取线程id
        long threadId = Thread.currentThread().getId();
        try {
            //睡眠随机秒
            Thread.sleep(sleepTime * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("线程ID" + threadId + ",准备任务完成耗时:" + sleepTime + "当前时间" + System.currentTimeMillis());

        //线程等待其他任务完成后唤醒
        try {
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("线程ID" + threadId + ",开始执行任务,当前时间:" + System.currentTimeMillis());
    }


    public static void main(String[] args) {
        //初始化线程数量
        int threadNum = 5;
        //初始化一般的线程
        CyclicBarrier barrier = new CyclicBarrier(5, () -> System.out.println("整理任务开始..."));
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        for (int i = 0; i < threadNum; i++) {
            executor.submit(() -> {
                execute(barrier);
            });
        }
    }
}
复制代码

2.7 CyclicBarrier 与 CountDownLatch 区别

  • CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
  • CountDownLatch.await一般阻塞工作线程,所有的进行预备工作的线程执行countDown,而CyclicBarrier通过工作线程调用await从而自行阻塞,直到所有工作线程达到指定屏障,再大家一起往下走。
  • CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是一样的。
  • 在控制多个线程同时运行上,CountDownLatch可以不限线程数量,而CyclicBarrier是固定线程数。
  • CyclicBarrier还可以提供一个barrierAction,合并多线程计算结果。

3. Semaphore

3.1 简介

Semaphore也叫信号量,在JDK1.5被引入,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。

Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。

  • 访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。
  • 访问资源后,使用release释放许可。

Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。Semaphore是一种计数信号量,用于管理一组资源,内部是基于AQS的共享模式。它相当于给线程规定一个量从而控制允许活动的线程数。

​ 可以用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源

3.2 举例

​ 这里面令牌就像停车位一样,来了十辆车,停车位只有三个,只有三辆车能够进行,只有等其他车开走后,其他车才能开进去,和锁的不一样的地方是,锁一次只能进入一辆车,但是Semaphore允许一次进入很多车,这个令牌是可以调整的,随时可以增减令牌。

3.3 应用场景

Semaphore 是 synchronized 的加强版,作用是控制线程的并发数量。就这一点而言,单纯的synchronized 关键字是实现不了的。

​ Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用Semaphore来做流量控制

3.4 工作原理

以一个停车场是运作为例,为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。

​ 这时如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。

​ 这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。

​ 这个停车系统中,每辆车就好比一个线程,看门人就好比一个信号量,看门人限制了可以活动的线程,假如里面依然是三个车位,但是看门人改变了规则,要求每次只能停两辆车,那么一开始进入两辆车,后面得等到有车离开才能有车进入,但是得保证最多停两辆车。

​ 对于Semaphore类而言,就如同一个看门人,限制了可活动的线程数。

3.5 构造方法

创建具有给定许可数的计数信号量并设置为非公平信号量

查看Semaphore源码发现他有这两个构造方法

public Semaphore(int permits)
public Semaphore(int permits, boolean fair)
复制代码

3.5.1 参数介绍

  • permits 是设置同时允许通过的线程数

  • fair 等于true时,创建具有给定许可数的计数信号量并设置为公平信号量。

3.6 其他方法

Semaphore类里面还有一些重要的方法

//从此信号量获取一个许可前线程将一直阻塞。相当于一辆车占了一个车位
public void acquire() throws InterruptedException
    
//从此信号量获取给定数目许可,在提供这些许可前一直将线程阻塞。比如n=2,就相当于一辆车占了两个车位。
public void acquire(int permits) throws InterruptedException
    
//释放一个许可,将其返回给信号量。就如同车开走返回一个车位。    
public void release()   
    
//获取当前可用许可数    
public void release(int permits)   
    
 //获取当前可用许可数
public int availablePermits()
复制代码

3.7 示例代码

共有5个车位但是有100个线程进行占用,车停几秒后会离开,释放车位给其他线程。

package chapter02.semaphore;

import java.util.Random;
import java.util.concurrent.*;

public class SemaphoreTest {
    private static ExecutorService executorService = Executors.newCachedThreadPool();

    private static Random random = new Random();

    //阻塞队列
    private static BlockingQueue<String> parks = new LinkedBlockingQueue<>(5);


    public static void execute(Semaphore semaphore) {
        //获取一个随机数
        long sleepTime = random.nextInt(10);
        long threadId = Thread.currentThread().getId();
        String park = null;
        try {
            /**
             * 获取许可,首先判断semaphore内部的数字是否大于0,如果大于0,
             * 才能获得许可,然后将初始值5减去1,线程才会接着去执行;如果没有
             * 获得许可(原因是因为已经有5个线程获得到许可,semaphore内部的数字为0),
             * 线程会阻塞直到已经获得到许可的线程,调用release()方法,释放掉许可,
             * 也就是将semaphore内部的数字加1,该线程才有可能获得许可。
             */
            semaphore.acquire();
            /**
             *  对应的线程会到阻塞对,对应车辆去获取到车位,如果没有拿到一致阻塞,
             *  直到其他车辆归还车位。
             */
            park = parks.take();
            System.out.println("线程ID" + threadId + ",开始占用车位:" + park + ",当前剩余车位" + semaphore.availablePermits());

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            //睡眠随机秒
            Thread.sleep(sleepTime * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //归还车位
        parks.offer(park);
        System.out.println("线程ID" + threadId + ",开始归还车位:" + park + ",共占用" + sleepTime + "秒");
        //线程释放掉许可,通俗来将就是将semaphore内部的数字加1
        semaphore.release();
    }

    public static void main(String[] args) {
        //初始化线程数量
        int threadNum = 100;
        parks.offer("车位一");
        parks.offer("车位二");
        parks.offer("车位三");
        parks.offer("车位四");
        parks.offer("车位五");


        // 初始化5个许可证
        Semaphore semaphore = new Semaphore(5);
        //可以提前释放但是车位就会被多个线程同时占用
        //semaphore.release(5);
        for (int i = 0; i < threadNum; i++) {
            executorService.submit(() -> {
                execute(semaphore);
            });
        }
    }
}
复制代码

3.8 注意事项

即使创建信号量的时候,指定了信号量的大小 ,但是在通过 release()操作释放信号量仍然能释放超过配置的大小,也就有可能同时执行的线程数量比最开始设置的要大,没有任何线程获取信号量的时候,依然能够释放并且释放的有效。

​ 推荐的做法是一个线程先 acquire 然后 release,如果释放线程和获取线程不是同一个,那么最好保证这种对应关系。不要释放过多的许可证。

4. Fork/Join

4.1 简介

java下多线程的开发可以我们自己启用多线程,线程池,还可以使用forkjoin,forkjoin可以让我们不去了解诸如Thread,Runnable等相关的知识,只要遵循forkjoin的开发模式,就可以写出很好的多线程并发程序

​ Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

​ Fork/Join框架是一个实现了ExecutorService接口的多线程处理器。它可以把一个大的任务划分为若干个小的任务并发执行,充分利用可用的资源,进而提高应用的执行效率。

Fork/Join框架简化了并行程序的原因有 :

  • 它简化了线程的创建,在框架中线程是自动被创建和管理。
  • 它自动使用多个处理器,因此程序可以扩展到使用可用处理器。

4.2 举例

​ 就像我需要处理一百万行的excel,普通的处理是一个一个的excel进行处理,但是使用Fork/Join框架后的处理方式呢,加入我们定义100条数据为一个批次,那么Fork/Join就会拆分这个excel先到中间拆分成各有50万的数据,然后还比100大就继续拆分,不断的细分,最后分到了每一个线程分得到了100条然后才开始执行。

4.3 分而治之

“分而治之” 一直是一个有效的处理大量数据的方法。著名的 MapReduce 也是采取了分而治之的思想。

​ 简单来说,就是如果你要处理1000个数据,但是你并不具备处理1000个数据的能力,那么你可以只处理其中的10个,然后,分阶段处理100次,将100次的结果进行合成,那就是最终想要的对原始的1000个数据的处理结果。

​ 同时forkjoin在处理某一类问题时非常的有用,哪一类问题?分而治之的问题。十大计算机经典算法:快速排序、堆排序、归并排序、二分查找、线性查找、深度优先、广度优先、Dijkstra、动态规划、朴素贝叶斯分类,有几个属于分而治之?3个,快速排序、归并排序、二分查找,还有大数据中M/R都是。

4.3.1 分治法的设计思想

​ 将一个难以直接解决的大问题,分割成一些规模较小的相同问题,以便各个击破,分而治之。

4.3.2 分治策略

​ 对于一个规模为n的问题,若该问题可以容易地解决(比如说规模n较小)则直接解决,否则将其分解为k个规模较小的子问题,这些子问题互相独立且与原问题形式相同(子问题相互之间有联系就会变为动态规范算法),递归地解这些子问题,然后将各子问题的解合并得到原问题的解。这种算法设计策略叫做分治法。

4.4 Fork-Join原理

​ Fork/Join实现了ExecutorService,所以它的任务也需要放在线程池中执行。它的不同在于它使用了工作窃取算法,空闲的线程可以从满负荷的线程中窃取任务来帮忙执行。

​ 由于线程池中的每个线程都有一个队列,而且线程间互不影响,那么线程每次都从自己的任务队列的头部获取一个任务出来执行。如果某个时候一个线程的任务队列空了,而其余的线程任务队列中还有任务,那么这个线程就会从其他线程的任务队列中取一个任务出来帮忙执行。就像偷取了其他人的工作一样

4.4.1 任务分割和合并

Fork/Join框架的基本思想就是将一个大任务分解(Fork)成一系列子任务,子任务可以继续往下分解,当多个不同的子任务都执行完成后,可以将它们各自的结果合并(Join)成一个大结果,最终合并成大任务的结果

我们看下面这个图

​ 首先main Task 先fork成 0,1两个任务 接着,因为还是太大,继续fork成 0-0,0-1,1-0,1-1 然后进行计算计算完成后进行join操作,0-0,1-1 join到0, 1-0,1-1 join到1 然后 0和1继续join到mainTask,完成计算任务。

4.4.2 工作密取

即当前线程的Task已经全被执行完毕,则自动取到其他线程的Task池中取出Task继续执行即如果一个工作线程没有事情要做,它可以从其他仍然忙碌的线程窃取任务。

​ ForkJoinPool中维护着多个线程(一般为CPU核数)在不断地执行Task,每个线程除了执行自己职务内的Task之外,还会根据自己工作线程的闲置情况去获取其他繁忙的工作线程的Task,如此一来就能能够减少线程阻塞或是闲置的时间,提高CPU利用率。

4.5 相关子类

​ 我们已经很清楚 Fork/Join 框架的需求了,那么我们可以思考一下,如果让我们来设计一个 Fork/Join 框架,该如何设计?这个思考有助于你理解 Fork/Join 框架的设计。

​ 第一步分割任务。首先我们需要有一个 fork 类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。

​ 第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

Fork/Join 使用两个类来完成以上两件事情:

4.5.1 ForkJoinTask

​ 我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制,通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:

4.5.1.1 RecursiveAction

用于没有返回结果的任务

4.5.1.2 RecursiveTask

用于有返回结果的任务。

4.5.2 ForkJoinPool

​ ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务

4.6 Fork/Join使用

​ Task要通过ForkJoinPool来执行,使用submit 或 invoke 提交,两者的区别是:invoke是同步执行,调用之后需要等待任务完成,才能执行后面的代码;submit是异步执行,join()和get方法当任务完成的时候返回计算结果

​ 在我们自己实现的compute方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用invokeAll方法时,又会进入compute方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果。

4.6.1 任务的提交逻辑

fork/join其实大部分逻辑处理操作都集中在提交任务和处理任务这两块,了解任务的提交基本上后面就很容易理解了, fork/join提交任务主要分为两种:

4.6.1.1 第一次提交到forkJoinPool

//创建初始化任务
SubmitTask submitTask = new SubmitTask(start, end);
//将初始任务扔进连接池中执行
forkJoinPool.invoke(submitTask);
复制代码

4.6.1.2 任务切分之后的提交

//没有达到阈值 计算一个中间值
long mid = (start + end) / 2;
//拆分 左边的
SubmitTask left = new SubmitTask(start, mid);
//拆分右边的
SubmitTask right = new SubmitTask(mid + 1, end);
//添加到任务列表
invokeAll(left, right);
复制代码

4.6.1.3 合并任务

//合并结果并返回
return left.join() + right.join();
复制代码

4.6.1.4 代码案例

package chapter02.forkjoin;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/**
 * 计算 0-10000 阶乘
 */
public class SubmitTask extends RecursiveTask<Long> {
    /**
     * 起始值
     */
    private long start;
    /**
     * 结束值
     */
    private long end;
    /**
     * 阈值
     */
    private long threshold = 10L;

    public SubmitTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * 计算逻辑
     * 进行任务的拆分 以及 达到阈值的计算
     *
     * @return
     */
    @Override
    protected Long compute() {
        //校验是否达到了阈值
        if (isLessThanThreshold()) {
            //处理并返回结果
            return handle();
        } else {
            //没有达到阈值 计算一个中间值
            long mid = (start + end) / 2;
            //拆分 左边的
            SubmitTask left = new SubmitTask(start, mid);
            //拆分右边的
            SubmitTask right = new SubmitTask(mid + 1, end);
            //添加到任务列表
            invokeAll(left, right);
            //合并结果并返回
            return left.join() + right.join();
        }
    }

    /**
     * 处理的任务
     *
     * @return
     */
    public Long handle() {
        long sum = 0;
        for (long i = start; i <= end; i++) {
            sum += i;
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return sum;
    }

    /*是否达到了阈值*/
    private boolean isLessThanThreshold() {
        return end - start <= threshold;
    }

    /**
     * forkJoin 方式调用
     *
     * @param start
     * @param end
     */
    public static void forkJoinInvok(long start, long end) {
        long sum = 0;
        long currentTime = System.currentTimeMillis();
        //创建ForkJoinPool 连接池
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //创建初始化任务
        SubmitTask submitTask = new SubmitTask(start, end);
        //将初始任务扔进连接池中执行
        forkJoinPool.invoke(submitTask);

        //forkJoinPool.submit(submitTask);
        // System.out.println("异步方式,任务结束才会调用该方法,当前耗时"+(System.currentTimeMillis() - currentTime));
        //等待返回结果
        sum = submitTask.join();
        //forkjoin调用方式耗时
        System.out.println("forkJoin调用:result:" + sum);
        System.out.println("forkJoin调用耗时:" + (System.currentTimeMillis() - currentTime));
    }

    /**
     * 普通方式调用
     *
     * @param start
     * @param end
     */
    public static void normalInvok(long start, long end) {
        long sum = 0;
        long currentTime = System.currentTimeMillis();
        for (long i = start; i <= end; i++) {
            sum += i;
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //普通调动方式耗时
        System.out.println("普通调用:result:" + sum);
        System.out.println("普通调用耗时:" + (System.currentTimeMillis() - currentTime));
    }

    public static void main(String[] args) {
        //起始值的大小
        long start = 0;
        //结束值的大小
        long end = 10000;
        //forkJoin 调用
        forkJoinInvok(start, end);
        System.out.println("========================");
        //普通调用
        normalInvok(start, end);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/433851.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

我实现了一个乞丐版的评论功能

文章目录 设计评论功能0 设计初衷1 前端组建设计**设计原则****设计代码**组件核心代码**调用组建并给出mock数据****效果** 2 后端数据库设计3 后端接口设计4 前后端联调5 后端评论保存接口设计6 前端评论填写流程设计7 联调8 验证码美化 设计评论功能 0 设计初衷 经过长达八…

三百左右的蓝牙耳机哪个音质好?三百左右音质最好的蓝牙耳机推荐

在外出携带的数码产品中&#xff0c;蓝牙耳机的出现频率居高不下&#xff0c;一部手机&#xff0c;一副耳机已经成为不少人外出的标配。蓝牙耳机无外乎是用来听的&#xff0c;下面&#xff0c;我来给大家推荐几款三百左右音质好的蓝牙耳机&#xff0c;一起来看看吧。 一、南卡…

LabVIEW-字符串与路径控件

在前面板中字符串与路径控件位于下图所示位置&#xff1a; 字符串输入和显示功能&#xff0c;是用户最常用的基本操作功能单击字符串控件&#xff0c;鼠标右键&#xff0c;选择“属性”可以对字符串控件的外观进行设置。显示样式有四种方式&#xff0c;即正常、反斜杠符号、密码…

家用洗地机好用吗?好用的洗地机分享

洗地机是一种高效、节能、环保的清洁设备&#xff0c;广泛应用于各种场所的地面清洁工作。它不仅可以快速清洁地面&#xff0c;还可以有效去除污渍、油渍等难以清洁的污染物&#xff0c;让地面恢复光洁如新的状态。同时&#xff0c;洗地机还可以减少清洁人员的劳动强度&#xf…

研读Rust圣经解析——Rust learn-10(泛型,trait,生命周期)

研读Rust圣经解析——Rust learn-10&#xff08;泛型&#xff0c;trait&#xff0c;生命周期&#xff09; 泛型应用泛型方法泛型结构体枚举泛型方法定义中的泛型 trait定义一个trait默认trait方法实现为结构体实现trait调用trait中实现的方法将trait作为参数trait bound多实现入…

2023年6月CDGP数据治理专家认证考试火热报名中

DAMA认证为数据管理专业人士提供职业目标晋升规划&#xff0c;彰显了职业发展里程碑及发展阶梯定义&#xff0c;帮助数据管理从业人士获得企业数字化转型战略下的必备职业能力&#xff0c;促进开展工作实践应用及实际问题解决&#xff0c;形成企业所需的新数字经济下的核心职业…

数据结构—单链表

目录 1.前言 2.了解单链表 3.单链表代码实现 3.1 单链表结构体实现 3.2 创建节点 3.3 打印单链表 3.4 尾插 3.5 头插 3. 6 头删 3.7 尾删 3.8 查找 3.9 插入 3.9.1 在pos位置之前插入 3.9.2 在pos位置之后插入&#xff08;主要使用这种功能&#xff09;---不需要找…

家用洗地机到底好不好用?家用洗地机分享

在当今社会&#xff0c;人们越来越关注卫生和清洁&#xff0c;这也促进了家庭和工作场所对清洁设备的需求。洗地机就是其中之一&#xff0c;它的高效和便捷性为我们提供了清洁和保洁的重要帮助。使用洗地机不仅能够卫生地保持地面清洁&#xff0c;而且可以节省时间和人力成本。…

拼多多的天天618,如何掀开电商营销的“皇帝新衣”?

电商价格战如火如荼&#xff0c;拼多多也在2023年4月正式启动“数码家电消费季”百亿补贴。 首季将在百亿补贴的基础上加码10亿&#xff0c;对手机、平板等各种数码家电&#xff0c;提供全品类补贴&#xff0c;苹果、华为、小米、美的等国内外各大品牌均会参与。拼多多相关负责…

安装虚拟机VMshare

前言&#xff1a;虚拟机必须在开机的状态下&#xff0c;而且互相需ping通&#xff0c;mobax才可以连接成功 一、下载VMsharePro软件 1、双击 安装程序&#xff1b; 2、按照步骤 点击一个个的“下一步” 3、安装完成之后&#xff0c;会要求你 输入许可证&#xff0c;这个可以…

【Redis】Redis十大数据类型—字符串String

介绍 获取命令地址 英文&#xff1a;https://redis.io/commands/ 中文&#xff1a;http://www.redis.cn/commands.html 字符串(string) 字符串是一种最基本的Redis值类型。Redis字符串是二进制安全的&#xff0c;这意味着一个Redis字符串能包含任意类型的数据&#xff0c;例…

STM:基于Siamese编码器的时空混频器用于CT扫描肺结节生长趋势预测

文章目录 Siamese Encoder-based Spatial-Temporal Mixer for Growth Trend Prediction of Lung Nodules on CT Scans摘要方法Spatial-Temporal MixerTwo-Layer H-Loss 实验结果 Siamese Encoder-based Spatial-Temporal Mixer for Growth Trend Prediction of Lung Nodules on…

JavaScript的三座大山

前言&#xff1a;这个题目是抄的&#xff0c;看着很有意思&#xff0c;就拿过用了&#xff0c;毕竟CV是程序员的基本功底嘛&#xff0c;顺带把图也拿过来了 作用域和闭包 这个几乎是天天在用的东西&#xff0c;可能有些人甚至不知道这个概念&#xff0c;但是用到过这种方法去解…

Dubbo消费者调用流程分析

消费者在发起一次调用的时候时序图如下 由于Dubbo调用是基于动态代理的方式,所以请求先进入 InvokerInvocationHandler#invoke()方法,进而调用到MockClusterInvoker#invoke()方法。MockClusterInvoker#invoke()中判断是否需要开启 Mock,如果开启 Mock 调用 doMockInvoke 执行…

WebRTC系列-Qos系列之AEC-可配置参数

文章目录 1. 简介2. 源码中相关参数WebRTC的自适应回声消除(AEC)是一个广泛使用的技术,用于在音频通信中消除扬声器输出产生的回声。在WebRTC中,有三种AEC算法可供选择,分别是 AECM、 AEC和 AEC3。本文将介绍WebRTC AEC 3算法的原理和应用场景。 在上图中可以看出AEC算…

MiniGPT4,开源了。

大家好&#xff0c;我是 Jack。 一个月前&#xff0c;我发布过一篇文章&#xff0c;讲解了 GPT4 的发布会。 ChatGPT 的对话能力&#xff0c;想必大家也早已体验过了&#xff0c;无论是文本生成能力&#xff0c;还是写代码的能力&#xff0c;甚至是上下文的关联对话能力&#…

SpringBoot自定义登录、权限验证

1、首先最基础的User实体类&#xff0c;使用了lombok&#xff0c;所以省略了getter、setter方法 Data public class UserInfo implements Serializable {private Integer id;//用户名private String username;//密码不需要被序列化存入redisprivate transient String password…

vue3类型uniapp调用signalr

目录 背景 安装 renderjs 1选择一个tab页面承载renderjs代码 2编写业务逻辑代码 3编写renderjs代码 背景 后端使用.net6开发&#xff0c;长链接选择了微软的signalr而非原生的websocket 前端uniapp下vue3类型开发的app&#xff0c;需要通过长链接获取后端推送的消息 安…

通过对话了解cookie session与token的用途和区别

1 先来了解cookie与localstorage 1.1 http的无状态 用户: 我想看csdn我有多少粉丝了(http请求) 服务器:你是&#xff1f;请告诉我你的名字和密码&#xff0c;我确认你是谁 用户&#xff1a;发起登录请求 admin 123456 服务器&#xff1a;ok&#xff0c;登录成功 用户&…

分享几个国内免费的ChatGPT镜像网址(亲测有效)

最近由于ChatGPT的爆火也让很多小伙伴想去感受一下ChatGPT的魅力&#xff0c;那么今天就分享几个ChatGPT国内的镜像网址&#xff0c;大家可以直接使用&#xff01;记得点赞收藏一下呦&#xff01; 1、AQ Bot&#xff0c;网址&#xff1a;点我 https://su.askaiw.com/aq 缺点&…