线程池源码解析 3.excute() 方法

news2025/1/16 14:45:53

线程池源码解析—excute()方法

execute()

  • execute 方法是线程池的核心方法,所有的方法,包括包装的 FutureTask,都是调用这个方法。

大致流程

这里只是总结了一遍大致的流程,一些细节问题见下面的流程图或者参考源码。

  • 当提交任务时,首先判断当前线程池内的线程数是否达到了核心线程,没有达到核心线程数就开线程去执行任务,如果达到了核心线程数,就尝试将任务加入阻塞队列中。
  • 如果说队列也满了,就尝试继续开线程执行任务,如果此时线程池中的存活线程已经等于了 maximumPoolSize,那么直接走拒绝策略。没有到达最大线程,则开启线程执行任务。

流程图

execute

源码解析

   /*
    * command可以是普通的Runnable实现类,也可以是FutureTask(本质也是一个Runnable)
    */   
   public void execute(Runnable command) {
        // command不能为null
        if (command == null)
            throw new NullPointerException(); 
       	/* 
       	 * 获取内部的ctl的值赋值给c
       	 * 高3位表示线程池状态,低29位表示当前线程池的线程数量
       	 */
        int c = ctl.get();
        /*
         * workerCountOf(c)就是获取ctl的低29位,即当前线程池中存活的线程数量
         * 条件成立:表示当前线程数量小于核心线程数量,此次提交任务,直接创建一个新的worker(线程),对应线程池中多了一个新的线程
         */
        if (workerCountOf(c) < corePoolSize) {
            /*
             * addWorker(Runnable firstTask, boolean core) 为创建worker并执行任务的过程,会创建worker对象,并且将command作为firstTask
             * @param firstTask 表示当前的任务
             * @param core 表示此次创建的线程是否算到核心线程数的头上
             *        core == true 表示采用核心线程数量限制  false表示采用 maximumPoolSize
             */
            if (addWorker(command, true))
                // 创建成功后,直接返回。addWorker方法内部会启动新创建的worker,执行firstTask
                return;
            /*
             * 执行到这条语句,说明addWorker()一定失败了...
             * 有几种可能? 
             *   1.存在并发现象,execute方法是可能有多个线程同时调用的,当workerCountOf(c) < corePoolSize条件成立时,其他线程可能也成立了,
             *     并且向线程池中创建了worker,如果此时线程池中的线程数已经到达了上限等,那么就会失败。
             *   2.当前线程池状态发生了改变
             *     2.1 如果当前线程池状态是非RUNNING,addWorker(firstTask != null, true|false)一定会失败
             *     2.2 SHUTDOWN状态下,也有可能创建成功,前提是firstTask == null 并且当前队列不为空(特殊情况)
             */ 
            // 重新获取一下ctl的值
            c = ctl.get();
        }
        /*
         * 执行到这里有几种情况?
         *   1.当前线程数量已经达到了corePoolSize
         *   2.addWorker()失败
         */
       	/*
       	 * 条件成立:表示当前线程池处于running状态,则尝试将任务放到阻塞队列中
       	 */
        if (isRunning(c) && workQueue.offer(command)) {
            // 执行到这里,说明当前任务已经入队
            // 再次获取一下ctl的值
            int recheck = ctl.get();
            /*
            * 根据ctl的值 判断当前线程池处于非RUNNING状态,则尝试从队列中移除任务
            * 条件1:! isRunning(recheck) 成立:说明你提交到队列之后,线程池状态被外部线程给修改 比如:shutdown() shutdownNow()
            *        这种情况 需要把刚刚提交的任务删除掉。
            * 条件2:remove(command) 移除当前任务,有可能成功,也有可能失败
            *        成功:提交之后,线程池中的线程还未消费(处理)
            *        失败:提交之后,在shutdown() shutdownNow()之前,就被线程池中的线程 给处理了。
            */
            if (!isRunning(recheck) && remove(command))
                // 任务移除成功,走拒绝策略
                reject(command); 
            /*
             * 有几种情况会到这里?
             *   1.当前线程池是running状态(这个概率最大)
             *   2.线程池状态是非running状态 但是remove提交的任务失败
             *
             * 这里是一个担保机制,担保线程池是running状态,
             * 但是如果线程池中存活的线程是0的话,就会很尴尬,这里就调用addWorker()开一个线程去执行任务
             * 保证线程池在running状态下,最起码得有一个线程在工作。
             */
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        /*
         * 执行到这里有几种情况?
         *   1.当前线程池是非running状态
         *   2.任务入队失败
         * 
         * 1.如果线程池处于非running状态,调用addWorker()一定会失败,走拒绝策略
         * 2.如果说队列满了,这个时候如果当前线程数量没有达到maximumPoolSize,会创建新的worker直接执行任务(救急线程)
         *   如果达到了最大线程数,addWorker()就会失败,走拒绝策略
         */ 
        else if (!addWorker(command, false))
            // 走拒绝策略
            reject(command);
    }

addWorker()

总体流程分析

先判断当前线程池状态是否允许继续添加任务,允许添加的话,CAS操作ctl的值尝试+1,CAS 成功然后传入任务通过 new Worker() 的方式构造一个 Worker,然后加锁尝试将 worker 对象放入线程池,如果加入线程池成功,释放锁后就调用 start() 执行任务。如果任务最终没有被执行,就需要执行后续的清理逻辑。

源码解析

   /*
    * @param firstTask可以为null,表示启动worker之后,worker自动从队列中获取任务,如果不是null,worker优先执行firstTask(内部任务)
    * @param core 采用的线程数限制,如果为true,采用核心线程数限制,false采用maximumPoolSize线程数限制 (传入的参数一般都是false)
    * @return boolean 表示任务是否已经启动
    * 返回值总结:true表示创建worker成功且线程启动成功,加入到set集合中
    *            false: ① 线程池状态大于SHUTDOWN时,一定会失败
    *                    ② 当前状态 = SHUTDOWN 但是队列中已经没有任务了 或 当前状态是SHUTDOWN且队列未空,但是firstTask不为null
    *                    ③ 当前线程池已经达到了指定指标(core或max)
    *                    ④ ThreadFactory创建的线程是null
    */
   private boolean addWorker(Runnable firstTask, boolean core) {   
        // 自旋 判断当前线程池状态是否允许创建线程
        retry: for (;;) {
            // 获取当前的ctl值保存到c中
            int c = ctl.get();
            // rs:当前线程池的运行状态
            int rs = runStateOf(c);
            /*
             * 这个判断主要是为了判断当前线程池是SHUTDOWN状态,但是队列里还有任务尚未处理完,这个时候是允许添加worker(线程)的,
             * 但是不允许再次提交task
             */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null  &&
                   ! workQueue.isEmpty()))
                return false;
			
            // 上面的代码,就是判断当前线程池的状态是否允许继续添加线程
            
            // 内部自旋 获取创建线程令牌的过程
            for (;;) {
                // 获取当前线程池中的线程数量
                int wc = workerCountOf(c);
                /*
                 * 条件1:wc >= CAPACITY 永远不成立,因为CAPACITY是一个5亿多大的数字
                 * 条件2:根据传来的core参数判断当前线程数是否大于等于核心线程数或者是最大线程数,大于等于的话直接return false
                 */
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    // 执行到这里,说明当前无法添加线程了,已经达到指定限制了
                    return false;
                /*
                 * 尝试使用CAS方式更新ctl,将线程数量+1,成功,相当于申请到了一块令牌
                 * 成功:说明更新成功,结束外层自旋
                 * 失败:说明有竞争,其他线程已经修改了ctl的值 或者外部线程可能调用了
                 * shutdown或者shutdownNow(),导致线程池状态发生变化,CAS也会失败
                 */ 
                if (compareAndIncrementWorkerCount(c))
                    // 走到这,一定是CAS成功,申请到令牌了。结束外层自旋,继续向下执行外部代码(创建Worker)
                    break retry;
                // CAS失败,获取最新的ctl值
                c = ctl.get();  
                // 判断当前线程池状态是否发生过变化(如果外部在这之前调用过shutdown/shutdownNow会导致状态变化),如果变化了,继续自旋
                if (runStateOf(c) != rs)
					// 状态发生变化后,直接返回到外层循环,外层循环负责判断当前线程池状态 和 是否允许创建线程
                    continue retry;
            }
        }
        // 表示创建的worker是否已经启动
        boolean workerStarted = false;
        // 表示创建的worker是否添加到了workers(HashSet)中
        boolean workerAdded = false;
        // w表示后面创建worker的一个引用
        Worker w = null;
        try {
            // 创建Worker,执行完后,线程“应该”已经准备好了
            w = new Worker(firstTask); 
            // 将新创建的worker节点的线程 赋值给t
            final Thread t = w.thread;
            /*
             * 为什么要做 t != null这个判断?
             * 为了防止ThreadFactory实现类有bug,因为ThreadFactory是一个接口
             */
            if (t != null) {
                // 将全局锁的引用保存到mainLock中
                final ReentrantLock mainLock = this.mainLock;
                /*
                 * 尝试加锁,可能会阻塞,直到获取成功为止,同一时刻操纵线程池内部的相关操作,都必须持锁
                 */
                mainLock.lock();  
         		// 只要走到了这里,说明当前线程已经加锁成功,其他线程是无法修改当前线程池状态的
                try {
                    // 获取线程池运行状态保存到rs中
                    int rs = runStateOf(ctl.get());
                    /*
                     * 条件1:判断当前线程池的状态是否正常(RUNNING)
                     * 条件2:判断当前线程池为SHUTDOWN状态并且firstTask为null,其实就是判断是否是SHUTDOWN状态下的特殊情况(队列里还有任务)
                     */
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 这里防止程序员自定义ThreadFactory实现类时,创建线程返回给外部之前,将线程start了..
                        if (t.isAlive()) 
                            // 抛出异常
                            throw new IllegalThreadStateException();
                        // 将创建的worker添加到线程集合(就是线程池)中
                        workers.add(w);
                        // 获取最新线程池线程的数量
                        int s = workers.size(); 
                        // 条件成立,更新largestPoolSize 
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // 更新标志位,表示线程已经追加到线程池中了
                        workerAdded = true;
                    }
                } finally {
                    // 释放全局锁
                    mainLock.unlock();
                }
                /*
                 * 条件成立:workerAdded为true,表示添加worker成功
                 * 条件不成立:表示线程池在lock之前,状态发生了变化,导致添加线程失败
                 */
                if (workerAdded) {
                    // 成功后,将线程启动
                    t.start();
                    // 启动标记设置为true
                    workerStarted = true;
                }
            }
        } finally {
            // 判断启动标记
            if (!workerStarted)
				// worker启动失败,需要做清理工作
                addWorkerFailed(w);
        }
        // 返回新创建的线程是否启动 true|false
        return workerStarted;
    }

addWorkerFaild()

    /*
     * 在addWorker()方法的内部自旋中,我们使用 compareAndIncrementWorkerCount(c) 将线程数+1了,
     * 但执行到这个方法说明我们创建线程失败了,所以需要进行清理工作,将ctl的值-1
     */
	private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        // 加锁
        mainLock.lock();
        try {
            if (w != null)
                // 从workers中移除失败的worker
                workers.remove(w);
            // 使用CAS + 自旋将ctl的值-1
            decrementWorkerCount();
            // 见后续SHUTDOWN()
            tryTerminate();
        } finally {
            // 释放锁
            mainLock.unlock();
        }
    }

参考

  • 视频参考
    • b站_小刘讲源码付费课
  • 文章参考
    • shstart7_线程池源码解析3.execute()方法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/12380.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【毕业设计】大数据电商销售预测分析 - python 数据分析

文章目录1 前言2 开始分析2.1 数据特征2.2 各项投入与销售额之间的关系2.3 建立销售额的预测模型3 最后1 前言 &#x1f525; Hi&#xff0c;大家好&#xff0c;这里是丹成学长的毕设系列文章&#xff01; &#x1f525; 对毕设有任何疑问都可以问学长哦! 这两年开始&#x…

Java8新特性 CompletableFuture

Java8新特性 CompletableFuture 什么是CompletableFuture&#xff1f; CompletableFuture类的设计灵感来自于 Google Guava 的 ListenableFuture 类&#xff0c;它实现了 Future 和 CompletionStage 接口并且新增了许多方法&#xff0c;它支持 lambda表达式&#xff0c;通过回…

【IDEA插件】这5款IDEA插件,堪称代码BUG检查神器!

随着业务的发展&#xff0c;系统会越来越庞大&#xff0c;原本简单稳定的功能&#xff0c;可能在不断迭代后复杂度上升&#xff0c;潜在的风险也随之暴露&#xff0c;导致最终服务不稳定&#xff0c;造成业务价值的损失。而为了减少这种情况&#xff0c;其中一种比较好的方式就…

5.盒子阴影(重点)

提示&#xff1a;css3中新增了盒子阴影&#xff0c;我们可以使用box-shadow属性为盒子添加阴影。 1、语法&#xff1a; div{ box-shadow:"h-shadow"或者“v-shadow” } 解释&#xff1a; h-shadow 必须&#xff0c;水平阴影位置&#xff0c;允许负值。 v-shado…

UE4 回合游戏项目 18- 退出战斗

在上一篇&#xff08;UE4 回合游戏项目 17- 进入指定区域触发战斗事件&#xff09;基础上完成击败敌人从而退出战斗的功能。 效果&#xff1a; 步骤&#xff1a; 1.打开“battleScenario”蓝图&#xff0c;添加一个自定义事件&#xff0c;命名为“离开战斗” ​ 2.删除所有…

[附源码]Python计算机毕业设计_社区无接触快递栈

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

手撕二叉搜索树

目录 一、概念 二、常见操作 2.1 查找操作 2.2 插入操作 2.3 删除操作 三、模型应用 3.1 K模型 3.2 KV模型 3.3 代码完整实现 四、 性能分析 一、概念 二叉搜索树(BST,Binary Search Tree),也称二叉排序树或二叉查找树 它或者是一棵空树&#xff0c;或者是具有以下…

Spring整合Mybatis和Junit小案例(9)

Spring整合Mybatis和Junit环境准备步骤1&#xff1a;准备数据库步骤2&#xff1a;创建项目导入jar包步骤3&#xff1a;根据数据库的表创建模型类步骤4&#xff1a;创建Dao接口步骤5&#xff1a;创建Service接口和实现类步骤6&#xff1a;添加jdbc.properties文件步骤7&#xff…

5种常用格式的数据输出,手把手教你用Pandas实现

导读:任何原始格式的数据载入DataFrame后,都可以使用类似DataFrame.to_csv()的方法输出到相应格式的文件或者目标系统里。本文将介绍一些常用的数据输出目标格式。 01 CSV DataFrame.to_csv方法可以将DataFrame导出为CSV格式的文件,需要传入一个CSV文件名。 df.to_csv(done.…

在 SPRING Boot JPA 中调用带有本机查询中的参数的存储过程

配置pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.…

惊了!10万字的Spark全文!

Hello&#xff0c;大家好&#xff0c;这里是857技术社区&#xff0c;我是社区创始人之一&#xff0c;以后会持续给大家更新大数据各组件的合集内容&#xff0c;路过给个关注吧!!! 今天给大家分享一篇小白易读懂的 Spark万字概念长文&#xff0c;本篇文章追求的是力求精简、通俗…

Linux(基于Centos7)(一)

文章目录一、任务介绍二、基本操作命令三、目录操作命令四、文件操作命令五、查看系统信息六、其他常用命令一、任务介绍 Linux服务器配置与管理&#xff08;基于Centos7.2&#xff09;任务目标&#xff08;一&#xff09; 实施该工单的任务目标如下&#xff1a; 知识目标 1、…

RNA剪接增强免疫检查点抑制疗效

什么是 RNA 剪接&#xff1f;真核生物基因包含一系列外显子和内含子&#xff0c;内含子必须在转录过程中被移除以便成熟的 mRNA 被翻译成蛋白质&#xff0c;RNA 剪接则是这一过程中至关重要的一步。RNA 剪接包含两类剪接事件。组成型剪接 (constitutive splicing): RNA 剪接的一…

【蓝桥杯Web】第十四届蓝桥杯(Web 应用开发)模拟赛 1 期-职业院校组 | 精品题解

&#x1f9d1;‍&#x1f4bc; 个人简介&#xff1a;一个不甘平庸的平凡人&#x1f36c; &#x1f5a5;️ Nodejs专栏&#xff1a;Node.js从入门到精通 &#x1f5a5;️ TS知识总结&#xff1a;十万字TS知识点总结 &#x1f449; 你的一键三连是我更新的最大动力❤️&#xff0…

企业级Java EE架构设计精深实践

内容简介 本书全面、深入介绍了企业级Java EE设计的相关内容&#xff0c;内容涵盖了Java EE架构设计的常见问题。 本书每一章讲解一个Java EE领域的具体问题&#xff0c;采用问题背景、需求分析、解决思路、架构设计、实践示例和章节总结的顺序组织内容&#xff0c;旨在通过分…

生成树(STP)

1.详细说明STP的工作原理 在二层交换网络中&#xff0c;逻辑的阻塞部分的接口&#xff0c;实现从跟交换机到所有节点唯一的路径称为最佳路径&#xff0c;生成一个没有环路的拓扑。当最佳路径出现故障时&#xff0c;个别被阻塞的接口将打开&#xff0c;形成备份链路。 2. STP的…

Redis的发布和订阅

Redis的发布和订阅 什么是发布和订阅 redis发布订阅&#xff08;pub/sub&#xff09;是一种消息通信模式&#xff1a;发布者&#xff08;pub&#xff09;发布消息&#xff0c;订阅者&#xff08;sub&#xff09;接收消息。 redis客户端可以订阅任意数量的频道。 redis的发布…

vue3【计算属性与监听-详】

一、计算属性--简写形式 需求&#xff1a;通过计算属性&#xff0c;计算一个人的全名。 <template><h1>基本信息</h1>姓&#xff1a;<input type"text" v-model"personInfo.firstName"><hr>名&#xff1a;<input type&…

综合实验高级网络—— 配置三层 热备等网络技术

个人简介&#xff1a;云计算网络运维专业人员&#xff0c;了解运维知识&#xff0c;掌握TCP/IP协议&#xff0c;每天分享网络运维知识与技能。个人爱好: 编程&#xff0c;打篮球&#xff0c;计算机知识个人名言&#xff1a;海不辞水&#xff0c;故能成其大&#xff1b;山不辞石…

结合邻域连接法的蚁群优化(NACO)求解TSP问题(Matlab代码实现)

&#x1f468;‍&#x1f393;个人主页&#xff1a;研学社的博客 &#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜…