源码角度看待线程池的执行流程

news2025/1/13 13:56:26

文章目录

  • 前言
  • 一、线程池的相关接口和实现类
    • 1.Executor接口
    • 2.ExecutorService接口
    • 3.AbstractExecutorService接口
    • 4.ThreadPoolExecutor 实现类
  • 二、ThreadPoolExecutor源码解析
    • 1.Worker内部类
    • 2.execute()方法
    • 3.addWorker()方法
  • 总结


前言

线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待空闲状态。如果有新的线程任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,线程池会创建一个新线程进行处理或者放入队列(工作队列)中等待。
线程池在多线程编程中扮演着重要角色,它能够管理和复用线程,提高并发执行效率。

在之前的学习中,我们知道了线程池的基本流程如下,而今天我们则用这个流程配合着源码的来重新分析线程池的执行流程。
在这里插入图片描述


一、线程池的相关接口和实现类

1.Executor接口

public interface Executor {
    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

Executor接口作为线程池技术中的顶层接口,它的作用是用来定义线程池中,用于提交并执行线程任务的核心方法:exuecte()方法。未来线程池中所有的线程任务,都将由exuecte()方法来执行。

2.ExecutorService接口

public interface ExecutorService extends Executor {
	//.....
}

ExecutorService接口继承了Executor接口,扩展了awaitTermination()、submit()、shutdown()等专门用于管理线程任务的方法。

3.AbstractExecutorService接口

public abstract class AbstractExecutorService implements ExecutorService {
	//....
}

ExecutorService接口的抽象实现类AbstractExecutorService,为不同的线程池实现类,提供submit()、invokeAll()等部分方法的公共实现。但是由于在不同线程池中的核心方法exuecte()执行策略不同,所以在AbstractExecutorService并未提供该方法的具体实现。

4.ThreadPoolExecutor 实现类

public class ThreadPoolExecutor extends AbstractExecutorService {
	//...
}

ThreadPoolExecutor实现类是AbstractExecutorService接口的的两个重要实现类之一,(ForkJoinPool是另一个),也是要掌握的关于线程池的重点区域;
ThreadPoolExecutor线程池通过Woker工作线程、BlockingQueue阻塞工作队列 以及 拒绝策略实现了一个标准的线程池;


二、ThreadPoolExecutor源码解析

在对源码进行解析之前,我们先看看官方给我们关于ThreadPoolExecutor的解析是什么:

The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
* …

这句话什么意思呢?
主池控制状态ctl是一个原子整数封装?

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

再结合ctl的实例化,我们这个发现这个ctl是一个具有原子性的整数,再往后就是告诉我们这个具有原子性的整数是由两个概念组成的:workerCount工作线程数和runState运行状态。
他是一个32位的整数,具体表示形式位:
在这里插入图片描述
所以说,它的作用就是通过位运算来存储线程池的状态和活动线程数信息

1.Worker内部类

每个Woker类的对象,都代表线程池中的一个工作线程。
Worker类是ThreadPoolExecutor类中定义的一个私有内部类,保存了每个Worker工作线程要执行的Runnable线程任务和Thread线程对象。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {	
        private static final long serialVersionUID = 6138294804551838833L;

        //用于存储当前工作线程的引用。
        final Thread thread;
		//用于存储该工作线程要执行的第一个任务。
        Runnable firstTask;
        //用于记录该工作线程已经完成的任务数量
        volatile long completedTasks;

       //初始化工作线程的状态,设置第一个任务并创建一个线程对象
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        //Runnable 接口的 run 方法,调用了 runWorker(this),将工作线程自身作为参数传递给 runWorker方法
        public void run() {
            runWorker(this);
        }
		//因为继承了AbstractQueuedSynchronizer 类,一下方法都是基于线程安全的方法
		//.....       
    }

当ThreadPoolExecutor线程池,通过exeute()方法执行1个线程任务时,会调用addWorker()方法创建一个Woker工作线程对象。并且,创建好的Worker工作线程对象,会被添加到一个HashSet workders工作线程集合,统一由线程池进行管理。
当Worker工作线程,在第一次执行完成线程任务后,这个Worker工作线程并不会销毁,而是会以循环的方式,通过线程池的getTask()方法,获取阻塞工作队列中新的Runnable线程任务,并通过当前Worker工作线程中所绑定Thread线程,完成新线程任务的执行,从而实现了线程池的中Thread线程的重复使用

2.execute()方法

ThreadPoolExecutor线程池中,会通过execute(Runnable command)方法执行Runnable类型的线程任务。
在分析execute()方法之前,也来看看官方是怎么解释这个方法的:

    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */

简单来说就是将execute()方法分成三个步骤:
1.如果,工作线程的数量小于核心线程数,则通过addWorker()方法,创建新的Worker工作线程,并添加至workers工作线程集合;
2.如果一个任务可以成功排队(工作线程的数量大于核心线程数),并且线程池处于RUNNING状态,那么,线程池会将Runnable类型的线程任务,缓存至workQueue阻塞工作队列,等待某个空闲工作线程获取并执行该任务;
3.如果我们不能排队任务,那么我们尝试添加一个新的线程。如果它失败了,我们知道我们被关闭或饱和了,因此拒绝该任务。

源码:

/* 
   *@param 要提交给线程池的任务
*/
public void execute(Runnable command) {
		//如果传入的任务为空,抛出空指针异常。
        if (command == null)
            throw new NullPointerException();
        //获取当前线程池的状态和活动线程数。
        int c = ctl.get();
        //如果当前线程数小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
        	//如果可以通过addWorker方法创建一个新的工作线程来执行任务
        	//因为当前线程数小于核心线程数,所以第二个参数穿入true代表创建的是核心线程
            if (addWorker(command, true))
                return;//创建成功,直接返回。
            //创建失败,获取更新后的线程池状态
            c = ctl.get();
        }
        //如果线程池状态是运行中且工作队列能够接受新任务
        if (isRunning(c) && workQueue.offer(command)) {
        	//任务进入工作队列
        	//重新获取线程池的状态和工作线程数
            int recheck = ctl.get();
            //如果线程池不是运行状态,则删除任务
            if (! isRunning(recheck) && remove(command))//执行拒绝策略
                reject(command);
            // 如果工作线程数等于零,通过addWorker()方法检查线程池状态和工作队列
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果无法将任务添加到队列中,也无法创建新的工作线程,那么拒绝任务的执行
        else if (!addWorker(command, false))
            reject(command);
    }

3.addWorker()方法

在分析execute()方法中,发现execute()方法多次调用了addWorker()创建一个工作线程,用于执行当前线程任务。
addWorker()可以分为两个执行部分,检查线程池的状态和工作线程数量和创建并执行工作线程。
第1部分:检查线程池的状态和工作线程数量

//参数:1.传入的任务,2.是否创建核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
	// 循环检查线程池的状态,直到符合创建工作线程的条件,通过retry标签break退出
        retry:
        for (;;) {
        	//获取线程池运行状态
            int c = ctl.get();
            int rs = runStateOf(c);
            //如果线程池处于开始关闭的状态(获取线程任务为空,同时工作队列不等于空)
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                return false;
            //检查工作线程数量
            for (;;) {
            	//获取当前工作线程数
                int wc = workerCountOf(c);
                //如果工作线程数量如果超出线程池的最大容量或者核心线程数(最大线程数)
                //三元运算符表示的是当前要的是核心线程还是非核心线程
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;//不再创建新的线程
                //通过ctl对象,将当前工作线程数量+1,并通过retry标签break退出外层循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //再次获取线程池状态,检查是否发生变化
                c = ctl.get();  
                if (runStateOf(c) != rs)
                    continue retry;
                //...
            }
        }
    }

第二部分:创建并执行线程工程

		//....
		//用于判断工作线程是否启动和保存
		boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
        	//创建新工作线程,并通过线程工厂创建Thread线程
            w = new Worker(firstTask);
            //获取新工作线程的Thread线程对象,用于启动真正的线程
            final Thread t = w.thread;   
            if (t != null) {
            	//获取线程池的reentrantLock主锁,保证线程安全
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //检查线程池运行状态
                    int rs = runStateOf(ctl.get());
                    //如果线程池状态小于关闭状态或者线程池状态为关闭且没有初始任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //如果工作线程已经在运行(存活),抛出非法线程状态异常。
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //保存工作线程
                        workers.add(w);
                        //记录线程池的最大工作线程数
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //正式启动线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
        	//如果工作线程没有成功启动,则调用添加失败的方法
            if (! workerStarted)
                addWorkerFailed(w);
        }
        //返回线程启动状态
        return workerStarted;

总结

execute()方法是ThreadPoolExecutor线程池执行的开始,它完整实现了Executor接口定义execute()方法,这个方法作用是执行一个Runnable类型的线程任务。整体的执行流程是:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/968797.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RT-Thread 内核移植

内核移植 内核移植就是将RTT内核在不同的芯片架构、不同的板卡上运行起来&#xff0c;能够具备线程管理和调度&#xff0c;内存管理&#xff0c;线程间同步等功能。 移植可分为CPU架构移植和BSP&#xff08;Board support package&#xff0c;板级支持包&#xff09;移植两部…

1783_CMD启动MATLAB同时执行一个脚本

全部学习汇总&#xff1a; GitHub - GreyZhang/g_matlab: MATLAB once used to be my daily tool. After many years when I go back and read my old learning notes I felt maybe I still need it in the future. So, start this repo to keep some of my old learning notes…

【数据结构】树和二叉树的概念及结构(一)

目录 一&#xff0c;树的概念及结构 1&#xff0c;树的定义 2&#xff0c;树结点的分类及关系 3&#xff0c;树的表示 二&#xff0c;二叉树的概念及结构 1&#xff0c;二叉树的定义 2&#xff0c;特殊的二叉树 3&#xff0c;二叉树的性质 4&#xff0c;二叉树的存储结构 1&…

Unity中Shader 纹理属性 Tilling(缩放度) 和 Offset(偏移度)

文章目录 前言一、Tilling(缩放度)&#xff0c;个人理解有点像减小周期函数的周期的效果&#xff08;在单位空间内&#xff0c;容得下重复的函数图像的多少&#xff09;二、Offset&#xff08;偏移度&#xff09;&#xff0c;个人理解是函数的平移三、在Shader中使用 Tilling 和…

如何批量查询所有德邦快递的物流信息

当我们需要查询多个德邦快递的物流信息时&#xff0c;我们可以使用固乔快递查询助手来批量查询。以下是具体的操作步骤&#xff1a; 1. 在浏览器中搜索并下载【固乔快递查询助手】软件。这款软件支持多种快递公司&#xff0c;包括德邦快递&#xff0c;而且可以批量查询物流信息…

洞发现-APP应用之漏洞探针利用修复(44)

主要分为三个部分&#xff0c;第一部分抓包是很重要的&#xff0c;第二部分是协议&#xff0c;第三部分是逆向&#xff08;讲的不会太多&#xff0c;介绍根据使用不介绍原理&#xff09;&#xff0c; 关于反编译&#xff0c;app就分为安卓和苹果系统&#xff0c;苹果系统的源码…

基于STM32的简易示波器设计

疫情期间闲来无事&#xff0c;正好学习STM32F407&#xff0c;因此设计、制作了简易示波器&#xff0c;以助学习。长话短说方案如下&#xff1a; &#xff08;1&#xff09;单片机&#xff0c;选择STM32F407VET6&#xff0c;采用SWD方式仿真及程序烧写。五路独立按键和两个LED指…

[国产MCU]-W801开发实例-用户报文协议(UDP)数据接收和发送

用户报文协议(UDP)数据接收和发送 文章目录 用户报文协议(UDP)数据接收和发送1、UDP简单介绍2、W801的UDP创建逻辑2.1 UDP使用步骤2.2 代码实现1、UDP简单介绍 用户数据报协议 (UDP) 是一种跨互联网使用的通信协议,用于对时间敏感的传输,例如视频播放或 DNS查找。它通过在数…

OTFS-ISAC通信最新进展

测试场景 Tx DD域帧结构导频区域 Rx DD域帧导频区域 原始星座图 信道估计及数据检测 经过MP算法后的星座图 误码率曲线

串行协议——USB驱动[基础]

多年前的学习记录&#xff0c;整理整理。 一、USB协议基础 二、Linux内核USB驱动源码分析 USB中不同类型设备使用的 设备描述符(设备类\设备子类\设备协议) 配置不同,典型的以下几种:1)HID设备: Human Input Device人工输入设备, 如鼠标\键盘\游戏手柄等.2)CDC设备: Communi…

GB28181学习(二)——注册与注销

概念 使用REGISTER方法进行注册和注销&#xff1b;注册和注销应进行认证&#xff0c;认证方式应支持数字摘要认证方式&#xff0c;高安全级别的宜支持数字证书认证&#xff1b;注册成后&#xff0c;SIP代理在注册过期时间到来之前&#xff0c;应向注册服务器进行刷新注册&…

core dump管理在linux中的前世今生

目录 一、什么是core dump&#xff1f; 二、coredump是怎么来的&#xff1f; 三、怎么限制coredump文件的产生&#xff1f; ulimit 半永久限制 永久限制 四、从源码分析如何对coredump文件的名字和路径管理 命名 管理 一些问题的答案 1、为什么新的ubuntu不能产生c…

ApplicationRunner、InitializingBean、@PostConstruct 执行顺序

概述 开发中可能会有这样的场景&#xff0c;需要在容器启动的时候执行一些内容。比如读取配置文件&#xff0c;数据库连接之类的。SpringBoot给我们提供了两个接口来帮助我们实现这种需求。两个启动加载接口分别是&#xff1a;CommandLineRunner和ApplicationRunner。Spring 提…

【高阶数据结构】红黑树 {概念及性质;红黑树节点的定义;红黑树插入操作详细解释;红黑树的验证}

红黑树 一、红黑树的概念 红黑树&#xff08;Red Black Tree&#xff09; 是一种自平衡二叉查找树&#xff0c;在每个结点上增加一个存储位表示结点的颜色&#xff0c;可以是Red或Black。 通过对任何一条从根到叶子的路径上各个结点着色方式的限制&#xff0c;红黑树确保没有…

【多线程案例】生产者消费者模型(堵塞队列)

文章目录 1. 什么是堵塞队列&#xff1f;2. 堵塞队列的方法3. 生产者消费者模型4. 自己实现堵塞队列 1. 什么是堵塞队列&#xff1f; 堵塞队列也是队列&#xff0c;故遵循先进先出的原则。但堵塞队列是一种线程安全的数据结构&#xff0c;可以避免线程安全问题&#xff0c;当队…

数学建模--时间序列预测模型的七种经典算法的Python实现

目录 1.开篇版权提示 2.时间序列介绍 3.项目数据处理 4.项目数据划分可视化 5.时间预测序列经典算法1&#xff1a;朴素法 6.时间预测序列经典算法2&#xff1a; 简单平均法 7.时间预测序列经典算法3&#xff1a;移动平均法 8.时间预测序列经典算法4&#xff1a;简单指…

pytest自动化测试两种执行环境切换的解决方案

目录 一、痛点分析 方法一&#xff1a;Hook方法pytest_addoption注册命令行参数 1、Hook方法注解 2、使用方法 方法二&#xff1a;使用插件pytest-base-url进行命令行传参 一、痛点分析 在实际企业的项目中&#xff0c;自动化测试的代码往往需要在不同的环境中进行切换&am…

windows-nessus安装

1、下载 路径&#xff1a;Download Tenable Nessus | Tenable 2、获取active code 路径&#xff1a;Tenable Nessus Essentials Vulnerability Scanner | Tenable 3、安装 challenge code:上图马赛克位置 active code:获取active code第二张图片的马赛克位置 4、激活 5、安装…

Docker从认识到实践再到底层原理(三)|Docker在Centos7环境下的安装和配置

前言 那么这里博主先安利一些干货满满的专栏了&#xff01; 首先是博主的高质量博客的汇总&#xff0c;这个专栏里面的博客&#xff0c;都是博主最最用心写的一部分&#xff0c;干货满满&#xff0c;希望对大家有帮助。 高质量博客汇总 然后就是博主最近最花时间的一个专栏…

企业架构LNMP学习笔记10

1、Nginx版本&#xff0c;在实际的业务场景中&#xff0c;需要使用软件新版本的功能、特性。就需要对原有软件进行升级或重装系统。 Nginx的版本需要升级迭代。那么如何进行升级呢&#xff1f;线上服务器如何升级&#xff0c;我们选择稳定版本。 从nginx的1.14版本升级到ngin…