[Java] 如何理解和设置ThreadPoolExecutor三大核心属性?什么情况下工作线程数会突破核心线程数?任务拒绝策略都有哪些?

news2024/9/25 11:21:19

文章目录

  • 前言
  • ThreadPoolExecutor类是什么?
  • ThreadPoolExecutor的三大核心属性
    • 1. 核心线程数(corePoolSize)属性
    • 2. 任务队列(workQueue)属性
    • 3. 最大线程数(maximumPoolSize)属性
    • 总结:ThreadPoolExecutor执行任务的流程
  • 任务拒绝策略
    • 自定义任务拒绝策略
  • 附录
    • 样例代码1:allowCoreThreadTimeOut导致应用退出
    • 样例代码2:扩展非核心线程逻辑导致的插队问题
  • 结语

前言


ThreadPoolExecutor类是JDK提供给开发者的一个比较常用的多线程任务执行器。也因为比较常用,所以笔者将用本文一文去结合源码梳理掌握ThreadPoolExecutor类的几个核心知识点,相信看过本文之后,你将会更好地掌握ThreadPoolExecutor类。

ThreadPoolExecutor类是什么?


首先要明确地是ThreadPoolExecutor不是线程池(ThreadPool),而是一个基于线程池的任务执行器服务(Executor Service)主要作用是帮助开发者去利用多个线程去异步地并行执行多个计算任务

虽然ThreadPoolExecutor在其名称中省略了Service,但ThreadPoolExecutor是AbstractExecutorService的子类,是个货真价实的ExecutorService。

ThreadPoolExecutor继承树

服务(Service)有诸多定义,要精准描述也比较难。这里的ExecutorService不同于我们Spring里面的Service层,而是更类似于我们操作系统或者说企业级系统层面的服务(进程),这类长时间运行地服务的特点就是有启动(start up)关闭(shut down) 的概念。而ThreadPoolExecutor也是如此,会在我们应用(进程)内部开启一个常驻服务用于接收计算任务(compute task)去执行(execute),当开发者想要退出应用时,要记得主动关闭已开启的ThreadPoolExecutor,否则你的应用不会主动退出,这是初学者使用ThreadPoolExecutor时比较容易遇到的一个问题。

你可以在下面的源码摘要里看到,ExecutorService接口所定义的shutdown相关的接口。

public class ThreadPoolExecutor extends AbstractExecutorService { /* 略 */ }
public abstract class AbstractExecutorService implements ExecutorService { /* 略 */ }
public interface ExecutorService extends Executor { 
	void shutdown();
	List<Runnable> shutdownNow();
	boolean isShutdown();
	boolean isTerminated();
	/* 略 */
}

ThreadPoolExecutor的三大核心属性


ThreadPoolExecutor作为一个工具类,其开发者为我们提供了诸多控制其行为的属性,本章会讲解其中最重要的三个核心属性,你通常会在构造器看到这些属性。

  1. 核心线程数(corePoolSize)
  2. 任务队列(workQueue)
  3. 最大线程数(maximumPoolSize)

1. 核心线程数(corePoolSize)属性

核心线程数(corePoolSize)这个属性是用于指示ThreadPoolExecutor设置常驻(核心)工作线程的个数。意味着任务数量在比较平稳情况下,最多有corePoolSize个线程用于执行任务。

在ThreadPoolExecutor创建之初,不会立即创建corePoolSize个线程当做核心工作线程。会在前corePoolSize个任务被要求执行时,一个一个被创建,直到创建满corePoolSize个核心工作线程。源码摘要如下:

ThreadPoolExecutor添加核心线程的源码摘要

默认设置下,核心工作线程一旦被创建,即使没有新任务执行也会一直存在。不过如果你通过allowCoreThreadTimeOut(boolean)方法,设置允许核心线程的超时回收特性的话,核心工作线程的数量在没有任务执行时会被逐步回收,如果没有其他运行中的线程保护,核心线程归零时会导致应用直接退出

ThreadPoolExecutor allowCoreThreadTimeOut(boolean) 方法
如果想尝试导致应用退出的,你可以在文末《样例代码1:allowCoreThreadTimeOut导致应用退出》章节看到相关样例。

2. 任务队列(workQueue)属性

任务队列(workQueue)属性,不难看出其是用于存放待处理(积压中)的任务的一个属性。也就是说假设我们有5个核心工作线程,那么同一时间只能并行处理5个任务,新到来的任务就需要被存储在某个地方管理起来,这个地方就是我们的任务队列(workQueue)属性了。

ThreadPoolExecutor workQueue 属性
根据ThreadPoolExecutor的定义呢,workQueue是一个BlockingQueue(阻塞队列),选阻塞队列呢,也是因为当各个工作线程去workQueue获取不到接下来要执行的任务时,能够方便地阻塞工作线程直到获取到新任务。

任务队列通常是有界的,这意味着 能够积压在任务队列里的任务数量是有上限的当积压任务达到上限,这意味着任务过于繁重,当前工作线程数量不足以支持消化如此数量的任务,因此需要寻求额外的线程资源去做计算。

此时ThreadPoolExecutor就会尝试去增加新的非核心工作线程(worker thread),此种情况下工作线程数就会突破核心线程数。与核心线程的增加一样,每一次积压任务达到上限值仅会触发一次新增工作线程的处理。源码摘要和解释如下:

  1. 任务积压达到上限 workQueue.offer(command)返回 false。
  2. 触发一次尝试增加工作线程的处理 addWork(command, false),如果失败返回false则会触发任务拒绝reject(command),任务拒绝这个我们放到后面讲。

ThreadPoolExecutor Execute摘要

3. 最大线程数(maximumPoolSize)属性

线程是计算机中珍贵的计算资源,不能无限制申请。ThreadPoolExecutor的开发者也为我们提供了相关的配置属性,那就是本章节要讲的最大线程数(maximumPoolSize)属性

这个属性用于指示ThreadPoolExecutor至多创建多少个线程用于执行任务。前面一章我们将任务队列属性时提到了任务过渡积压导致爆仓时,会触发 尝试新增工作线程的处理逻辑(上图的addWorker(command, false)),这个尝试会在工作线程数量达到上限(maximumPoolSize)时判定失败而返回false,即无法再新增非核心工作线程。源码摘要如下:

addWorker 方法摘要

总结:ThreadPoolExecutor执行任务的流程

  1. 多个线程可以向同一个ThreadPoolExecutor并发地提交新任务。
  2. 核心工作线程数小于corePoolSize时,ThreadPoolExecutor会新增一个核心工作线程直接执行被提交的任务,并退出。否则转到3。
  3. 任务队列workQueue还有剩余空间时,把被提交的任务压入任务队列,等待未来某一时刻被工作线程取走处理。并退出,否则转到4。
  4. 工作线程总数小于最大线程数(maximumPoolSize)时,新增一个非核心工作线程直接执行被提交的任务。并退出,否则转到5。
  5. 由于计算任务过于繁重,ThreadPoolExecutor会拒绝执行任务。被拒绝的任务何去何从(如何处理)是可以配置的。这个在后面章节任务拒绝策略会讲。

需要补充一点的是,相信看得仔细的读者会发现在 4的处理时,会直接新增一个非核心工作线程去执被提交的任务,这里会有个任务插队的问题,因为你的workQueue里面还是满任务的情况下就先处理了后提交的任务。你可以在文末《样例代码2:扩展非核心线程逻辑导致的插队问题》看到相关测试代码。

任务拒绝策略


对于ThreadPoolExecutor来说,任务过于繁重以至于无法处理时,会因任务无法及时被消化而积压起来,导致爆仓(工作线程全开的情况下workQueue爆满)。这个时候如何处理就成了问题。不过问题不大,ThreadPoolExecutor为我们提供了下图四种预制策略(RejectedExecutionHandler接口四种实现)去应对。

ThreadPoolExecutor RejectedExecutionHandler

主要内容参考下表:

拒绝策略说明
AbortPolicy通过抛出RejectedExecutionException这个RuntimeException子类的方式来告知调用者出错。
CallerRunsPolicy这个任务拒绝策略会当前调用者的线程上直接执行计算任务,直接 征用当前提交异步任务的生产者线程来充当worker线程,这会降低任务的生产效率缓解ThreadPoolExecutor所有worker线程的压力。
DiscardOldestPolicy这个策略会把workQueue中第一个任务给废弃掉,会一直重新尝试提交该任务提交直至成功。会保证最新的任务会进到任务队列,但不保证会被执行。
DiscardPolicy朴实无华的拒绝策略,啥也不干,丢弃这个任务,直接开摆,实现代码部分都是0行。

自定义任务拒绝策略

主章节中我们提到的四种拒绝策略都是 预制(Predefined) 的,源码中也很清晰地能看到Predefined RejectedExecutionHandlers这个注释:

Predefined RejectedExecutionHandlers

那么可以不可以自己实现一个RejectedExecutionHandler呢?答案自然是可以的。比如我们可以自定义一个带重试的任务拒绝策略。

public static class RetryThenDiscardPolicy implements RejectedExecutionHandler {
    
    final int retryCount;
    /** 
     * RejectedExecutionHandler代码的执行是在任务生产者线程。
     * 任务生产者线程可能是多线程的,所以需要用到线程安全的ConcurrentHashMap。
     */
    final ConcurrentHashMap<Runnable, Integer> map;
    
    public RetryThenDiscardPolicy(int retryCount) { 
        if (retryCount <= 1) throw new IllegalArgumentException("无意义的retryCount");
        this.retryCount = retryCount;
        map = new ConcurrentHashMap<>();
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        
        if (!map.containsKey(r)) {
            final Runnable wrapperR = RunnableWrapper.wrapRunnable(r, this); 
            map.put(wrapperR, retryCount);
            e.execute(wrapperR);
        } else if (map.get(r) > 0) {
            System.out.println("重试任务:" + r);
            map.computeIfPresent(r, (key, cnt) -> --cnt);         
            e.execute(r);
        } else {
            // 重试次数耗尽,丢弃。
            System.out.println("丢弃任务:" + r);
            map.remove(r);
            return;
        }
    }
}

public static class RunnableWrapper implements Runnable {

    final Runnable r;
    final RetryThenDiscardPolicy policy;

    RunnableWrapper(Runnable r, RetryThenDiscardPolicy policy) { 
        this.r = r; 
        this.policy = policy;
    }

    @Override
    public void run() {
        r.run();
        policy.map.remove(this);
    }
    
    public static RunnableWrapper wrapRunnable(Runnable r, RetryThenDiscardPolicy policy) {
        Objects.nonNull(r);
        Objects.nonNull(policy);
        final RunnableWrapper wrapper = new RunnableWrapper(r, policy);
        return wrapper;
    }
}

简单执行一下可以看到命令行的输出。

命令行输出

附录


样例代码1:allowCoreThreadTimeOut导致应用退出

/**
 * <p>Case - 核心线程归零时会导致应用直接退出</p>
 * <pre>
 *  不过如果你通过allowCoreThreadTimeOut(boolean)方法,设置允许超时回收的话,
 *  核心工作线程的数量在没有任务执行时逐步被回收,如果没有其他运行中的线程保护,核心线程归零时会导致应用直接退出。
 * </pre>
 */
private static void testSetAllowCoreThreadTimeOut2TrueCauseAppExit() {
    final ThreadPoolExecutor tpExecutor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, 
        new ArrayBlockingQueue<>(10_0000)); 
    tpExecutor.allowCoreThreadTimeOut(true);
    tpExecutor.execute(() -> { System.out.println(Thread.currentThread().getName()); });
}

样例代码2:扩展非核心线程逻辑导致的插队问题

/**
 * <p>Case - 扩展非核心线程逻辑导致的插队问题</p>
 * <pre>
 *  在workQueue满任务时,新提交的任务会直接交予新增非核心线程执行,导致插队。
 * </pre>
 */    
private static void testCutInLine() {
    final ThreadPoolExecutor tpExecutor = new ThreadPoolExecutor(
        5, 
        10, 
        10, TimeUnit.SECONDS, 
        new ArrayBlockingQueue<>(5)); 

    for (int i = 0; i < (5 + 5 + 1); i++) {
        final int idx = i;
        tpExecutor.submit(() -> {
            System.out.println("第" + idx + "个任务执行开始");
            try {
                Thread.sleep(1_000L);
            } catch (InterruptedException e) {
            }
            System.out.println("第" + idx + "个任务执行结束。");
        });
    }
}

结语


ThreadPoolExecutor是多线程编程中比较常用的一种工具类,熟练掌握其基本工作原理是非常重要的。希望通过本文你能了解到ThreadPoolExecutor是如何去把一个任务添加到其内部管理起来、同时在任务积压时又有哪几种基本的任务拒绝策略。

我是虎猫,希望本文对你有帮助。(=・ω・=)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/97778.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【开发百宝箱之猿如意使用指南】「工欲成其事,必先利其器」一文教你如何通过“猿如意”便捷的使用数据库管理工具DBeaver

开发百宝箱之猿如意使用指南欢迎您使用“猿如意”百宝箱大家科普一下什么是猿如意&#xff1f;赶快趁热下载个【猿如意】吧每个程序猿值得拥有的学习开发工作必备“良药”没有猿如意的“我们”&#xff08;猿如意帮我们解决了哪些问题&#xff1f;&#xff09;【如何快速下载自…

非零基础自学Golang 第12章 接口与类型 12.2 接口的创建与实现 12.2.1 接口创建

非零基础自学Golang 文章目录非零基础自学Golang第12章 接口与类型12.2 接口的创建与实现12.2.1 接口创建第12章 接口与类型 12.2 接口的创建与实现 Go语言接口是方法的集合&#xff0c;使用接口是实现模块化的重要方式。 下面将重点介绍如何创建和实现一个Go语言接口。 12…

Pytest用例执行的先后顺序

[内部资源] 想拿年薪30W的软件测试人员&#xff0c;这份资料必须领取~ 温馨提示 「本篇约1600字&#xff0c;看完需3-5分钟&#xff0c;学习学半小时&#xff0c;加油&#xff01;」 先看普通函数运行顺序 import pytestdef test_one():print("我是清安")def tes…

React学习27(react-redux多组件共享数据)

项目结构 准备工作 1&#xff09;定义一个person组件&#xff0c;和count组件通过redux共享数据 2&#xff09;为person组件编写&#xff1a;reducer &#xff0c;action和contant常量 3&#xff09;重点&#xff1a;Person的reducer和Count的reducer要用combineReducers进行…

深度学习-优化器

优化器 文章目录优化器1. 引言1. SGD1.1 vanilla SGD1.2 SGD with Momentum1.3 SGD with Nesterov Acceleration2. AdaGrad3. RMSProp4. AdaDelta5. Adam优化器选择出处1. 引言 优化算法可以分成一阶优化和二阶优化算法&#xff0c;其中一阶优化就是指的梯度算法及其变种&#…

Linux——vi,vim的使用

三种模式 正常模式 以vi或vim打开一个档案就直接进入一般模式了(这是默认的模式)。在这个模式中&#xff0c;你可以使用 【上下左右】 按键来移动光标&#xff0c;你可以使用 【删除字符】或 【删除整行】来处理档案内容&#xff0c;也可以使用 【复制&#xff0c;粘贴】来处…

3D激光里程计其三:点云畸变补偿

3D激光里程计其三&#xff1a;点云畸变补偿1. 产生原因2. 补偿方法Reference: 深蓝学院-多传感器融合 1. 产生原因 一帧点云&#xff1a;通常指雷达内部旋转一周扫描得到的点的集合。 优点&#xff1a;有足够数量的点云才能进行匹配&#xff0c;且一周正好是周围环境的完整采…

认识Java中的异常处理

文章目录Java异常处理异常体系的介绍常见运行时异常常见编译时异常Java异常处理 异常体系的介绍 什么事异常处理? 异常是程序在“编译”或者“执行”的过程中可能出现的问题&#xff0c;比如: 数组索引越界、空指针异常、 日期格式化异常&#xff0c;等… 注意&#xff1a;语…

常用损失函数

常见损失函数 文章目录常见损失函数引言回归1. 均方差2.平均绝对误差(MAE)3. 均方根误差(RMSE)4. 交叉熵分类二分类多分类引言 无论在机器学习还是深度领域中,损失函数都是一个非常重要的知识点。损失函数&#xff08;Loss Function&#xff09;是用来估量模型的预测值 f(x) 与…

Eslint 8.23 Flat Config 新配置迁移升级指南

前言 直入正题&#xff0c;eslint 目前为止的配置文件格式&#xff08; 如 .eslintrc.js &#xff09; 存在很多无法避免的历史问题&#xff1a; 配置格式混乱&#xff0c;层层不明确的继承&#xff0c;不易理解。 插件配置时不支持实体运用&#xff08;不能传 function / ob…

学完框架后的反思—为什么要使用框架?

学习前端也有一定的时间了,最近在网上看到了一个问题让我反思了很久——为什么要使用前端框架? 我发现自己当初学习框架时并没有想那么多,只是看中了其在业界企业的应用,大部分公司开发页面基本上都是使用框架进行开发的,而最为被大厂广泛接受的就是 React 框架,所以我当…

二棕榈酰磷酯酰乙醇胺-聚乙二醇-叠氮 DPPE-PEG-N3简介,可用于药物传递、基因转染和生物分子修饰。

二棕榈酰磷酯酰乙醇胺-聚乙二醇-叠氮 DPPE-PEG-N3 中文名称&#xff1a;二棕榈酰磷酯酰乙醇胺-聚乙二醇-叠氮基 英文名称&#xff1a;DPPE-PEG-N3 英文别名&#xff1a; 1,2-dipalmitoyl-sn-glycero-3-phosphoethanolamine-PEG-Azide 分子量&#xff08;PEG&#xff09;&a…

Linux——任务调度

at定时任务 基本介绍 at命令是一次性定时计划任务&#xff0c;at地守护线程atd会以后台模式运行&#xff0c;检查作业队列来运行默认情况下&#xff0c;atd守护进程没60秒检查作业队列&#xff0c;有作业时&#xff0c;会检查作业运行时间&#xff0c;如果时间于当前时间匹配…

Dubbo 1 分布式系统中的相关概念 1.2 集群和 分布式

Dubbo 【黑马程序员Dubbo快速入门&#xff0c;Java分布式框架dubbo教程】 1 分布式系统中的相关概念 文章目录Dubbo1 分布式系统中的相关概念1.2 集群和 分布式1.2.1 集群和分布式1.2.2 集群和分布式 【互联网 中】1.2 集群和 分布式 1.2.1 集群和分布式 集群&#xff1a;很…

使用synchornized和ReentrantLock来解决并发错误

文章目录什么是并发错误&#xff1f;并发错误是如何产生的&#xff1f;演示并发错误如何解决并发错误使用synchornized解决并发错误使用ReentrantLock解决并发错误什么是并发错误&#xff1f; 多个线程共享操作同一个对象的时候&#xff0c;线程体当中连续的多行操作未必能够连…

下个文档还要马内?还好我会Python,教大家来一手强制复制粘贴

前因后果 公司有人阳了&#xff0c;今天在家上班&#xff0c;突然小姨子就问我有没有baidu文库会员&#xff0c;想下载点东西&#xff0c;我心想这还要会员&#xff1f;用Python不是分分钟的事情&#xff01; 然后我非常自信的告诉她不用会员随便下载&#xff0c;结果她顺势想…

10两级电力市场环境下计及风险的省间交易商最优购电模型

参考文章&#xff1a; 两级电力市场环境下计及风险的省间交易商最优购电模型—郭立邦&#xff08;电网技术2019&#xff09; 主要内容&#xff1a; 为进一步推动电力市场建设&#xff0c;促进电力资源大范围优化配置&#xff0c;我国正逐步建成包含省间与省内电力交易的两级…

齿轮魔方、五阶齿轮魔方

齿轮魔方 1&#xff0c;魔方三要素 &#xff08;1&#xff09;组成部件 部件和三阶魔方完全对应&#xff0c;但每个棱块的朝向不止2种&#xff0c;而是有6种。 &#xff08;2&#xff09;可执行操作 只有3种操作&#xff0c;即上下层同时旋转180度、左右180度、前后180度。…

一文助你快速理解Cookie,Session,Token的区别

目录 一、Cookie简介 1.1.cookie定义 1.2.cookie鉴权原理 1.3.cookie的分类 二、Session简介 2.1.session的定义 2.2.session会话机制 2.3.Session 的缺点 三、cookie与session区别 3.1.存储位置 3.2.安全性 3.3.占用服务器资源 3.4.存储空间 3.5.存储类型 3.6.…

编程入门宝典,刚开始学习编程新手必看的5点建议!

编程就像围城&#xff0c;城里的人想出去&#xff0c;城外的人想进来。 对于零基础的小白&#xff0c;要杀入代码的战场需要准备好哪些东西呢?最帅的萌宝在此给大家分享5点建议。 1、选择编程语言 编程首要还是选择好适合自己的语言。 编程语言有&#xff1a;C/C、java、VB、P…