Java并发工具-4-并发框架(ExecutorForkJoin)

news2024/11/23 22:30:22

一 Executor 并发框架介绍

1 整体结构介绍

executor [ɪɡˈzekjətə(r)] 执行者 execute [ˈeksɪkjuːt] 执行

从 JDK 1.5 开始,java 中将工作单元和执行机制做了分离,于是 Executor 并行框架出现。

什么是工作单元(或称为任务)呢?其实就是我们需要运行的一段逻辑代码。不管什么逻辑的工作单元,最终都需要通过线程运行。

Executor 并行框架对工作单元、以及工作单元的执行做了高度抽象,形成了一整套完整模型。这个模型包括 3 大部分:

  1. 对工作单元的抽象,即任务。
  2. 任务的执行机制,即如何组织任务的提交、如何管理提交的任务、如何组织多个线程执行。
  3. 对任务执行结果的抽象,即如何跟踪任务执行状态,如何获取任务执行结果。

2 核心接口和实现类

整个 Executor 框架的核心接口和实现类型如下:

  1. 工作单元:Runnable,Callable
  2. 工作单元执行:Executor,ExecutorService
  3. 工作单元执行结果:Future,FutureTask

Executor 框架核心接口的使用逻辑如下图:

在这里插入图片描述

2.1 Runnable & Callable

当不需要关注任务执行结果时,使用 Runnable 很合适,反之使用 Callable。代码举例:

Runnable task = new Runnable() {
	public void run() {
		// 任何想要执行的逻辑
	}
}

Callable<String> task = new Callable<String>() {
    public String call() throws Exception {
        // 任何想要执行的逻辑
        return "任务执行后任何想返回的信息";
    }
};

2.2 Executor & ExecutorService

Executor 接口定义了一个用于执行任务的 execute 方法。ExecutorService 是 Executor 的子接口,其职责是对一堆用于执行任务的线程做管理,即定义了线程池的基本操作接口,有很多具体的实现子类,其核心操作有:

ExecutorService的创建

1.newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
2.newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
3.newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
4.newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

ExecutorService的使用

  1. execute (Runnable):提交 Runnable 任务。
  2. submit (Callable 或 Runnable):提交 Callable 或 Runnable 任务,并返回代表此任务的 Future 对象。
  3. invokeAny(…)方法接收的是一个Callable的集合,执行这个方法不会返回Future,但是会返回所有Callable任务中其中一个任务的执行结果。这个方法也无法保证返回的是哪个任务的执行结果,反正是其中的某一个。
  4. invokeAll(…)与 invokeAny(…)类似也是接收一个Callable集合,但是前者执行之后会返回一个Future的List,其中对应着每个Callable任务执行后的Future对象。
  5. shutdown ():关闭新的外部任务提交。
  6. shutdownNow ():尝试中断正在执行的所有任务,清空并返回待执行的任务列表。
  7. isTerminated ():测试是否所有任务都执行完毕了。
  8. isShutdown ():测试是否该 ExecutorService 已被关闭。

2.3 Future & FutureTask

Future 接口定义了对任务执行结果的取消、状态查询、结果获取方法。FutureTask 是 Future 的唯一实现类,其职责是提供方便地构建带有返回结果的任务。Future 接口的核心操作有:

  1. cancel (boolean):尝试取消已经提交的任务。
  2. isCancelled ():查询任务是否在完成之前被取消了。
  3. isDone ():查询任务是否已经完成。
  4. get ():获取异步任务的执行结果(如果任务没执行完将等待)。

二 Executor 应用示例

1 案例描述

我们可以通过手工创建线程做逻辑单元的执行,但是当存在大量的需要执行的逻辑单元也是这样处理,就会出现很多麻烦的事情,且效率非常低下。手工创建线程并做线程管理,需要我们实现很多与业务无关的控制代码,另外手工不停的创建线程并做线程销毁,会浪费很多系统资源。

我们在实际项目中,常常通过使用 java 提供好的非常好用的线程框架 Executor 进行任务执行操作。

有这样一个场景:需要对某个目录下的所有文件(成百上千)进行加密并用文件的 MD5 串修改文件名称。

在开始动手实现之前,我们先做一个简单的分析。在这个案例中,我们将 “对文件进行加密、生成 MD5 串、修改文件名称” 作为待执行任务的内容。所有文件形成的列表就是我们待处理的数据范围。为了校验整个处理过程是否有文件遗漏,我们最终需要核对处理结果。为了方便演示,下面编码中部分数据采用了模拟的方式生成。

2 编码实现

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorTest {

    // 模拟待处理的文件列表
    private static int fileListSize = new Random().nextInt(6);
    private static String[] fileList = new String[fileListSize];
    static {
        for(int i=0; i<fileListSize; i++) {
            fileList[i] = "fileName" + i;
        }
    }

    // 主线程
    public static void main(String[] args) throws Exception {
        // 创建用于处理任务的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        // 任务提交,每一个任务处理一个文件
        List<FileDealTask> tasks = new ArrayList<>();
        for(int i=0; i<fileListSize; i++) {
            tasks.add(new FileDealTask(0, fileListSize, fileList[i]));
        }
        // 等待异步处理结果返回
        List<Future<Integer>> results = executorService.invokeAll(tasks);
        // 获取任务执行结果
        Integer total = 0;
        for (Future<Integer> result : results) {
            total = total + result.get();
        }
        System.out.println("预备处理的文件个数" + fileListSize + ",总共处理的文件个数:" + total);
        // 关闭线程池
        executorService.shutdown();
    }
}

上面代码注释已经很清楚了,我们观察下面的代码,看看任务代码

import java.util.Random;
import java.util.concurrent.Callable;

public class FileDealTask implements Callable<Integer> {

    private String fileName;

    public FileDealTask(int first, int last, String fileName) {
        this.fileName = fileName;
    }

    @Override
    public Integer call() throws Exception {
        try {
            Thread.sleep(new Random().nextInt(2000));
            System.out.println(Thread.currentThread().getName() + ":文件" + fileName + "已处理完毕");
        } catch (Exception e) {
            return 0;
        }
        return 1;
    }
}

上面代码逻辑中有随机内容,每次运行结果会有差异,运行上面的代码,我们观察运行结果:

pool-1-thread-2:文件fileName1已处理完毕
pool-1-thread-3:文件fileName2已处理完毕
pool-1-thread-1:文件fileName0已处理完毕
预备处理的文件个数3,总共处理的文件个数:3

3 注意事项

  1. Executors 是 Executor 框架体系中的一个独立的工具类,用于快速创建各类线程池,在实际应用中,如果需要对线程池的各类参数做更多的自定义,可以参考此类的实现。
  2. 做好评估权衡,当需要处理的数据量不是特别大时,没有必要使用 Executor。其底层使用多线程的方式处理任务,涉及到线程上下文的切换,当数据量不大的时候使用串行会比使用多线程快。
  3. 在使用时,如果主线程不关心子任务的执行结果,请使用 Runnable 接口封装任务的执行逻辑。

三 ForkJoin 并发框架

1 分治法

基本思想

把一个规模大的问题划分为规模较小的子问题,然后分而治之,最后合并子问题的解得到原问题的解。

步骤

①分割原问题;

②求解子问题;

③合并子问题的解为原问题的解。

我们可以使用如下伪代码来表示这个步骤。

if(任务很小){
    直接计算得到结果
}else{
    分拆成N个子任务
    调用子任务的fork()进行计算
    调用子任务的join()合并计算结果
}

在分治法中,子问题一般是相互独立的,因此,经常通过递归调用算法来求解子问题。

2 ForkJoin框架概述

Java 1.7 引入了一种新的并发框架—— Fork/Join Framework,主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数。

ForkJoin框架的本质是一个用于并行执行任务的框架, 能够把一个大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务的计算结果。在Java中,ForkJoin框架与ThreadPool共存,并不是要替换ThreadPool

其实,在Java 8中引入的并行流计算,内部就是采用的ForkJoinPool来实现的。例如,下面使用并行流实现打印数组元组的程序。

public class SumArray {
    public static void main(String[] args){
        List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9);
        numberList.parallelStream().forEach(System.out::println);
    }
}

这段代码的背后就使用到了ForkJoinPool。

说到这里,可能有读者会问:可以使用线程池的ThreadPoolExecutor来实现啊?为什么要使用ForkJoinPool啊?ForkJoinPool是个什么鬼啊?! 接下来,我们就来回答这个问题。

3 ForkJoin框架原理

ForkJoin框架是从jdk1.7中引入的新特性,它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入指定的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。

ForkJoinPool主要使用**分治法(Divide-and-Conquer Algorithm)**来解决问题。典型的应用比如快速排序算法。这里的要点在于,ForkJoinPool能够使用相对较少的线程来处理大量的任务。

比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。

比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概200万+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。

所以当使用ThreadPoolExecutor时,使用分治法会存在问题,因为ThreadPoolExecutor中的线程无法向任务队列中再添加一个任务并在等待该任务完成之后再继续执行。而使用ForkJoinPool就能够解决这个问题,它就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。

那么使用ThreadPoolExecutor或者ForkJoinPool,性能上会有什么差异呢?

首先,使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,很显然这是不可行的,也是很不合理的!!

4 ForkJoin框架的实现

ForkJoin框架中一些重要的类如下所示。

在这里插入图片描述

ForkJoinPool 框架中涉及的主要类如下所示。

4.1 ForkJoinPool类

实现了ForkJoin框架中的线程池,由类图可以看出,ForkJoinPool类实现了线程池的Executor接口。

我们也可以从下图中看出ForkJoinPool的类图关系。

在这里插入图片描述

其中,可以使用Executors.newWorkStealPool()方法创建ForkJoinPool。

ForkJoinPool中提供了如下提交任务的方法。
public void execute(ForkJoinTask<?> task)
public void execute(Runnable task)
public <T> T invoke(ForkJoinTask<T> task)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
public <T> ForkJoinTask<T> submit(Callable<T> task)
public <T> ForkJoinTask<T> submit(Runnable task, T result)
public ForkJoinTask<?> submit(Runnable task)

4.2 ForkJoinWorkerThread类

实现ForkJoin框架中的线程。

4.3 ForkJoinTask<V>类

ForkJoinTask封装了数据及其相应的计算,并且支持细粒度的数据并行。ForkJoinTask比线程要轻量,ForkJoinPool中少量工作线程能够运行大量的ForkJoinTask。

ForkJoinTask类中主要包括两个方法fork()和join(),分别实现任务的分拆与合并。

fork()方法类似于Thread.start(),但是它并不立即执行任务,而是将任务放入工作队列中。跟Thread.join()方法不同,ForkJoinTask的join()方法并不简单的阻塞线程,而是利用工作线程运行其他任务,当一个工作线程中调用join(),它将处理其他任务,直到注意到目标子任务已经完成。

我们可以使用下图来表示这个过程。

在这里插入图片描述

ForkJoinTask有3个子类:

在这里插入图片描述

  • RecursiveAction:无返回值的任务。
  • RecursiveTask:有返回值的任务。
  • CountedCompleter:完成任务后将触发其他任务。

4.4 RecursiveTask<V> 类

有返回结果的ForkJoinTask实现Callable。

4.5 RecursiveAction类

无返回结果的ForkJoinTask实现Runnable。

4.6 CountedCompleter<T> 类

在任务完成执行后会触发执行一个自定义的钩子函数。

5 ForkJoin示例程序

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
    public static final int threshold = 2;
    private int start;
    private int end;

    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        //如果任务足够小就计算任务
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任务大于阈值,就分裂成两个子任务计算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

            // 执行子任务
            leftTask.fork();
            rightTask.fork();

            // 等待任务执行结束合并其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // 合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        //生成一个计算任务,计算1+2+3+4+...+100
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

        //执行一个任务
        Future<Integer> result = forkjoinPool.submit(task);

        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}
16:39:08.917 [main] INFO ForkJoinTaskExample - result:5050

四 补充 - 创建线程池的方法 - ThreadPoolExecutor和Executors

1 线程池介绍

什么是线程池?

线程池就是一个可以复用线程的技术。

不使用线程池的问题:

如果用户每发起一个请求,后台就创建一个新线程来处理,下次新任务来了又要创建新线程,而创建新线程的开销是很大的,这样会严重影响系统的性能。

线程池工作原理:

例如线程池中最多可以允许创建三个工作线程, 也叫核心线程, 前面三个任务来的时候会给前面三个任务单独创建三个线程; 但是后面任务再来的时候, 因为创建的工作线程已达到最大数, 那么后面的任务就会进入任务队列中排队等待; 等前面的任务执行完成, 有空闲的线程的时候使用空闲的线程依次执行任务队列中的任务

2 实现线程池的方式

谁代表线程池?

JDK 5.0起提供了代表线程池的接口:ExecutorService

如何得到线程池对象?

  • 方式一:使用ExecutorService的实现类ThreadPoolExecutor自创建一个线程池对象
  • 方式二:使用Executors(线程池的工具类)调用方法返回不同特点的线程池对象

方式一: 实现类ThreadPoolExecutor

ThreadPoolExecutor构造器的参数
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 

参数介绍:

参数一:指定线程池的线程数量(核心线程): corePoolSize ----> 不能小于0
参数二:指定线程池可支持的最大线程数: maximumPoolSize ----> 最大数量 >= 核心线程数量
参数三:指定临时线程的最大存活时间: keepAliveTime ----> 不能小于0
参数四:指定存活时间的单位(秒、分、时、天): unit ----> 时间单位
参数五:指定任务队列: workQueue ----> 不能为null
参数六:指定用哪个线程工厂创建线程: threadFactory ----> 不能为null
参数七:指定线程忙,任务满的时候,新任务拒绝策略: handler ----> 不能为null

新任务拒绝策略:

策略详解
ThreadPoolExecutor.AbortPolicy丢弃任务并抛出RejectedExecutionException异常。是默认的策略
ThreadPoolExecutor.DiscardPolicy丢弃任务,但是不抛出异常 这是不推荐的做法
ThreadPoolExecutor.DiscardOldestPolicy抛弃队列中等待最久的任务 然后把当前任务加入队列中
ThreadPoolExecutor.CallerRunsPolicy由主线程负责调用任务的run()方法从而绕过线程池直接执行

ThreadPoolExecutor创建线程池对象示例:

ExecutorService pools = new ThreadPoolExecutor(3, 
                                               5, 
                                               8, 
                                               TimeUnit.SECONDS, 
                                               new ArrayBlockingQueue<>(6), 
                                               Executors.defaultThreadFactory(), 
                                               new ThreadPoolExecutor.AbortPolicy());

思考:

临时线程什么时候创建啊?

  • 新任务提交时发现核心线程都在忙,任务队列也满了,并且还可以创建临时线程,此时才会创建临时线程。

什么时候会开始拒绝任务?

  • 核心线程和临时线程都在忙,任务队列也满了,新的任务过来的时候才会开始任务拒绝。
线程池处理Runnable任务

ExecutorService的常用方法:

方法名称说明
execute(Runnable command)执行任务/命令,没有返回值,一般用来执行 Runnable 任务
shutdown()等任务执行完毕后关闭线程池(一般不会关闭线程池)
shutdownNow()立刻关闭,停止正在执行的任务,并返回队列中未执行的任务(一般不会关闭线程池)

演示代码:

创建一个Runnable任务线程类

public class MyRunnable implements Runnable{
    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            System.out.println(Thread.currentThread().getName() + "线程执行输出: " + i);
        }
        // 为了测试, 我们让每个线程睡眠3秒
        try {
            System.out.println(Thread.currentThread().getName() + "线程进入休眠");
            Thread.sleep(3000);
            System.out.println(Thread.currentThread().getName() + "线程执行完成");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在主类中, 创建一个线程池对象并创建Runnable任务交给线程池处理

  • 如果只有三个任务, 那么会被三个核心线程同时执行
public static void main(String[] args) {
    // 1. 创建一个线程池对象 核心线程为3, 最大线程数5, 临时线程存活6秒, 任务队列最大为5
    ExecutorService es = new ThreadPoolExecutor(3, 5, 6,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    // 2. Runnable任务交给任务线程池处理
    Runnable target = new MyRunnable();
    // 三个Runnable任务会被核心线程执行
    es.execute(target);
    es.execute(target);
    es.execute(target);
}
  • 打印结果如下
pool-1-thread-1线程执行输出: 0
pool-1-thread-2线程执行输出: 0
pool-1-thread-3线程执行输出: 0
pool-1-thread-2线程执行输出: 1
pool-1-thread-2线程执行输出: 2
pool-1-thread-2线程执行输出: 3
pool-1-thread-1线程执行输出: 1
pool-1-thread-2线程执行输出: 4
pool-1-thread-3线程执行输出: 1
pool-1-thread-3线程执行输出: 2
pool-1-thread-3线程执行输出: 3
pool-1-thread-1线程执行输出: 2
pool-1-thread-3线程执行输出: 4
pool-1-thread-1线程执行输出: 3
pool-1-thread-1线程执行输出: 4
pool-1-thread-2线程进入休眠
pool-1-thread-3线程进入休眠
pool-1-thread-1线程进入休眠
pool-1-thread-3线程执行完成
pool-1-thread-1线程执行完成
pool-1-thread-2线程执行完成
  • 如果核心线程全部被占用, 那么后面的任务会进入任务队列中排队等待有空闲的核心线程; 由于我们设置的任务队列数是5, 所以进入任务队列的任务数量小于等于5时, 不会创建临时线程
public static void main(String[] args) {
    // 1. 创建一个线程池对象 核心线程为3, 最大线程数5, 临时线程存活6秒, 任务队列最大为5
    ExecutorService es = new ThreadPoolExecutor(3, 5, 6,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    // 2. Runnable任务交给任务线程池处理
    Runnable target = new MyRunnable();
    // 三个Runnable任务会被核心线程执行
    es.execute(target);
    es.execute(target);
    es.execute(target);

    // 三个核心线程被占满, 会进入任务队列排队等待有空闲的核心线程
    es.execute(target);
    es.execute(target);
    es.execute(target);
    es.execute(target);
    es.execute(target);
}
  • 由于核心线程都在忙, 任务队列也满了, 这时候我们再继续添加任务时, 就会创建临时线程; 因为我们设置的核心线程数是3个, 最大线程数是5个, 所以临时线程最多只会创建两个
public static void main(String[] args) {
    // 1. 创建一个线程池对象 核心线程为3, 最大线程数5, 临时线程存活6秒, 任务队列最大为5
    ExecutorService es = new ThreadPoolExecutor(3, 5, 6,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    // 2. Runnable任务交给任务线程池处理
    Runnable target = new MyRunnable();
    // 三个Runnable任务会被核心线程执行
    es.execute(target);
    es.execute(target);
    es.execute(target);

    // 三个核心线程被占满, 会进入任务队列排队等待有空闲的核心线程(我们设置的任务队列最大允许排队5个任务)
    es.execute(target);
    es.execute(target);
    es.execute(target);
    es.execute(target);
    es.execute(target);

    // 三个核心线程都在忙, 任务队列满, 创建临时线程
    es.execute(target);
    es.execute(target);
}

  • 核心线程都忙, 任务队列满, 临时线程忙, 此时再继续添加任务就会触发拒绝任务策略
public static void main(String[] args) {
    // 1. 创建一个线程池对象 核心线程为3, 最大线程数5, 临时线程存活6秒, 任务队列最大为5
    ExecutorService es = new ThreadPoolExecutor(3, 5, 6,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    // 2. Runnable任务交给任务线程池处理
    Runnable target = new MyRunnable();
    // 三个Runnable任务会被核心线程执行
    es.execute(target);
    es.execute(target);
    es.execute(target);

    // 三个核心线程被占满, 会进入任务队列排队等待有空闲的核心线程(我们设置的任务队列最大允许排队5个任务)
    es.execute(target);
    es.execute(target);
    es.execute(target);
    es.execute(target);
    es.execute(target);

    // 三个核心线程都在忙, 任务队列满, 创建临时线程
    es.execute(target);
    es.execute(target);

    // 后续任务拒绝任务会被拒绝
    es.execute(target);
}
线程池处理Callable任务

ExecutorService的常用方法:

方法名称说明
submit(Callable<T> task)执行Callable任务,返回未来任务对象(FutureTask)获取线程结果
shutdown()等任务执行完毕后关闭线程池
shutdownNow()立刻关闭,停止正在执行的任务,并返回队列中未执行的任务

注意: submit方法返回的是Future对象, Future对象是FutureTask对象继承的父类

演示代码:

定义一个Callable任务类, 用于计算1到n的和返回

import java.util.concurrent.Callable;

public class MyCallable implements Callable<String> {
    private int n;
    public MyCallable(int n) {
        this.n = n;
    }

    @Override
    public String call() throws Exception {
        int sum = 0;
        for (int i = 1; i <= n ; i++) {
            sum += i;
        }
        // 为了测试, 让每一个线程任务睡眠3秒
        System.out.println(Thread.currentThread().getName() + "线程进入睡眠");
        Thread.sleep(3000);
        System.out.println(Thread.currentThread().getName() + "线程执行完成");
        return Thread.currentThread().getName() + "执行1-" + n + "结果是: " + sum;
    }
}

线程池处理Callable任务时的细节和处理Callable的一样, 这里不再一一赘述, 代码如下

public static void main(String[] args) {
    // 1. 创建一个线程池对象 核心线程为5, 最大线程数5, 临时线程存活6秒, 任务队列最大为5
    ExecutorService pool = new ThreadPoolExecutor(3, 5, 6,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    // 2. 创建Callable任务, 调用submit方法, 交给任务线程池处理, 返回未来线程对象
    // 三个Runnable任务会被核心线程执行
    Future<String> f1 = pool.submit(new MyCallable(10));
    Future<String> f2 = pool.submit(new MyCallable(20));
    Future<String> f3 = pool.submit(new MyCallable(30));

    // 三个核心线程被占满, 会进入任务队列排队等待有空闲的核心线程(我们设置的任务队列最大允许排队5个任务)
    Future<String> f4 = pool.submit(new MyCallable(30));
    Future<String> f5 = pool.submit(new MyCallable(40));
    Future<String> f6 = pool.submit(new MyCallable(50));
    Future<String> f7 = pool.submit(new MyCallable(40));
    Future<String> f8 = pool.submit(new MyCallable(50));

    // 三个核心线程都在忙, 任务队列满, 创建临时线程
    Future<String> f9 = pool.submit(new MyCallable(80));
    Future<String> f10 = pool.submit(new MyCallable(60));

    // 后续任务拒绝任务会被拒绝
    Future<String> f11 = pool.submit(new MyCallable(30));

    // 3. 互获取未来线程的返回结果
    try { // 正常的结果
        System.out.println(f1.get());
        System.out.println(f2.get());
        System.out.println(f3.get());
        System.out.println(f4.get());
        System.out.println(f5.get());
        System.out.println(f6.get());
        System.out.println(f7.get());
        System.out.println(f8.get());
        System.out.println(f9.get());
        System.out.println(f10.get());
        System.out.println(f11.get());
    } catch (Exception e) { // 异常的结果
        e.printStackTrace();
    }
}

方式二: Executors工具类创建线程池

文本一二个章节已阐述

Executors使用可能存在的陷阱

大型并发系统环境中使用Executors如果不注意可能会出现系统风险。

方法名称存在问题
ExecutorService newCachedThreadPool()创建的线程数量最大上限是无穷大(Integer.MAX_VALUE), 线程数可能会随着任务1:1增长,也可能出现OOM内存溢出错误 ( java.lang.OutOfMemoryError )
ExecutorService newFixedThreadPool(int nThreads)允许创建的线程是固定的, 但是线程允许请求的任务队列长度是无穷大的(Integer.MAX_VALUE),可能出现OOM内存溢出错误(java.lang.OutOfMemoryError )
ScheduledExecutorService newScheduledThreadPool(int corePoolSize)创建的线程数量最大上限是无穷大(Integer.MAX_VALUE), 线程数可能会随着任务1:1增长,也可能出现OOM内存溢出错误 ( java.lang.OutOfMemoryError )
ExecutorService newSingleThreadExecutor()允许创建的线程是固定的, 允许请求的任务队列长度是无穷大的(Integer.MAX_VALUE),可能出现OOM内存溢出错误(java.lang.OutOfMemoryError )

阿里巴巴开发手册中明确说明, 不允许Executors创建线程池

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1185889.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Leetcode 第 369 场周赛题解

Leetcode 第 369 场周赛题解 Leetcode 第 369 场周赛题解题目1&#xff1a;2917. 找出数组中的 K-or 值思路代码复杂度分析 题目2&#xff1a;2918. 数组的最小相等和思路代码复杂度分析 题目3&#xff1a;2919. 使数组变美的最小增量运算数思路代码复杂度分析 题目4&#xff1…

合并两个有序链表OJ

合并两个有序链表OJ 文章目录 合并两个有序链表OJ一、题目及要求二、思路分析三、代码实现 一、题目及要求 二、思路分析 其次&#xff0c;题目里说了新链表是通过拼接原来的结点形成的&#xff0c;所以说我们不需要开辟新的空间。 三、代码实现 if (list1 NULL) {return li…

在字节4年,一个27岁女软件测试工程师的心路历程

个人经验分享 简单的先说一下&#xff0c;坐标深圳&#xff0c;18届本科毕业&#xff0c;算上在字节的面试&#xff0c;一共有面试了5家公司&#xff08;不想请假&#xff0c;所以只是每个晚上去其他公司面试&#xff0c;面试的公司就比较少&#xff09; 其中面试成功的有3家&…

vue基础知识十八:说说你对keep-alive的理解是什么?

一、Keep-alive 是什么 keep-alive是vue中的内置组件&#xff0c;能在组件切换过程中将状态保留在内存中&#xff0c;防止重复渲染DOM keep-alive 包裹动态组件时&#xff0c;会缓存不活动的组件实例&#xff0c;而不是销毁它们 keep-alive可以设置以下props属性&#xff1a…

js各种简单事件处理(整理)

**## 获取当天昨天日期** // 当天日期 const today new Date();// 格式化当天日期为 YYYY-MM-DD 格式 const formattedToday today.toISOString().slice(0, 10);// 昨天日期 const yesterday new Date(); yesterday.setDate(yesterday.getDate() - 1);// 格式化昨天日期为 Y…

2023年11月在线IDE流行度最新排名

点击查看最新在线IDE流行度最新排名&#xff08;每月更新&#xff09; 2023年11月在线IDE流行度最新排名 TOP 在线IDE排名是通过分析在线ide名称在谷歌上被搜索的频率而创建的 在线IDE被搜索的次数越多&#xff0c;人们就会认为它越受欢迎。原始数据来自谷歌Trends 如果您相…

广和通5G模组FM650助力阿里云打造无影魔方Pro

随着云基础设施的完善及云电脑体验的不断优化&#xff0c;越来越多的个人和企业选择无影云电脑进行办公。基于云原生的云网端技术架构&#xff0c;无影云电脑相比传统PC&#xff0c;具有弹性、安全、保障个人数据等产品优势。 10月31日&#xff0c;阿里云在杭州云栖大会上宣布…

易货:一种古老而新颖的交易方式

在当今快速发展的经济环境中&#xff0c;易货模式正逐渐引起人们的关注。这种古老而新颖的交易方式&#xff0c;不仅为企业提供了新的商业机会&#xff0c;还为消费者带来了更多的选择。本文将详细介绍易货模式的概念、优势以及如何实现易货交易&#xff0c;并探讨这种模式未来…

精解括号匹配问题与极致栈设计:揭开最大栈和最小栈的奥秘

目录 括号匹配问题最小栈最大栈 最大栈和最小栈是极致栈的两个重要变种。最大栈用于存储当前匹配的最大值&#xff0c;而最小栈用于存储当前匹配的最小值。 括号匹配问题 这个问题我们来看力扣20题的描述&#xff1a; 给定一个只包括 ‘(’&#xff0c;‘)’&#xff0c;‘{’…

如何修改文件的修改日期?

如何修改文件的修改日期&#xff1f;文件的修改日期指的是文件最近一次被修改的日期和时间。当文件内容被修改、编辑或更新时&#xff0c;系统会自动更新文件的修改日期。这个日期记录了文件内容的实际修改时间&#xff0c;可以帮助用户了解文件的更新情况以及文件版本的管理。…

[云原生案例2.2 ] Kubernetes的部署安装 【单master集群架构 ---- (二进制安装部署)】网络插件部分

文章目录 1. Kubernetes的网络类别2. Kubernetes的接口类型3. CNI网络插件 ---- Flannel的介绍及部署3.1 简介3.2 flannel的三种模式3.3 flannel的UDP模式工作原理3.4 flannel的VXLAN模式工作原理3.5 Flannel CNI 网络插件部署3.5.1 上传flannel镜像文件和插件包到node节点3.5.…

2023年度API安全状况详解

随着云计算和移动应用的快速发展&#xff0c;API&#xff08;应用程序接口&#xff09;已成为不可或缺的技术组成部分。然而&#xff0c;API的广泛使用也带来了安全风险。本文将探讨2023年的API安全状况&#xff0c;并介绍了一些应对这些安全挑战的最佳实践。 引言 随着全球互联…

改进YOLOv5:结合ICCV2023|动态蛇形卷积,构建不规则目标识别网络

🔥🔥🔥 提升多尺度、不规则目标检测,创新提升 🔥🔥🔥 🔥🔥🔥 捕捉图像特征和处理复杂图像特征 🔥🔥🔥 👉👉👉: 本专栏包含大量的新设计的创新想法,包含详细的代码和说明,具备有效的创新组合,可以有效应用到改进创新当中 👉👉👉: �…

安装ubuntu-20.04.6-desktop版本、根据ISO文件制作U盘启动盘

前言 本文简述&#xff0c;安装Ubuntu20.04.6的过程&#xff0c;包括制作U盘启动盘、安装。 下载Ubuntu镜像 去官网下载桌面版ubuntu-20.04.6镜像&#xff0c;下载完后文件名是ubuntu-20.04.6-desktop-amd64.iso&#xff0c;这里有个问题amd64.iso能安装在intel处理器的电脑…

计算机网络学习笔记(五):运输层(待更新)

5.1 概述 5.1.1 TCP协议的应用场景 TCP为应用层协议提供可靠传输&#xff0c;发送端按顺序发送&#xff0c;接收端按顺序接收&#xff0c;其间发送丢包、乱序&#xff0c;TCP负责重传和排序。下面是TCP的应用场景。 多次交互&#xff1a;客户端程序和服务端程序需要多次交互才…

ai实景直播矩阵式引流---技术开发搭建(剪辑、矩阵、直播)

目前我们的短视频矩阵剪辑分发系统更新&#xff1a; 无人直播更新&#xff1a; 1、新增文案引流&#xff1a;已接入混元数据大模型&#xff0c;千帆数据大模型&#xff0c;星火数据大模型&#xff0c;盘古数据大模型&#xff0c;通义数据大模型&#xff0c;ChatGPT数据大模型…

muduo源码剖析之TimerQueue类

简介 TimerQueue ​ 通过timerfd实现的定时器功能&#xff0c;为EventLoop扩展了一系列runAt&#xff0c;runEvery&#xff0c;runEvery等函数TimerQueue中通过std::set维护所有的Timer&#xff0c;也可以使用优先队列实现 muduo的TimerQueue是基于timerfd_create实现&#…

Jupyter Notebook 内核似乎挂掉了,它很快将自动重启

报错原因&#xff1a; OMP: Error #15: Initializing libiomp5md.dll, but found libiomp5md.dll already initialized. OMP: Hint This means that multiple copies of the OpenMP runtime have been linked into the program. That is dangerous, since it can degrade perfo…

AD教程 (十)Value值的核对

AD教程 &#xff08;十&#xff09;Value值的核对 填写器件位号 直接根据原理图的原始编号进行更改 通过位号编辑器快速更改 点击工具&#xff0c;选择标注&#xff0c;选择原理图标注&#xff0c;进入位号编辑器 可以在位号编辑器中 设置处理顺序&#xff0c;从上往下还是从…

推荐一款网络拓扑自动扫描工具

topology-scanner Topology-Scanner是WeOps团队免费开放的一个网络拓扑自动扫描模块&#xff0c;可以自动发现网络设备的类型、网络设备之间的互联 资源下载地址&#xff1a;https://download.csdn.net/download/XMWS_IT/88510697 或 加 薇|信|号吗&#xff1a;xmws-IT 免…