ThreadPoolExecutor 理解

news2025/1/15 20:01:39
  参数的理解:
核心和最大池大小  Core and maximum pool sizes
ThreadPoolExecutor将根据corePoolSize 和 maximumPoolSize 设置的边界自动调整线程池大小,在方法execute(Runnable)中提交新任务时,如果运行的线程数少于corePoolSize, 即使其他工作线程处于空闲状态,也会创建一个新线程来处理请求。 如果运行的线程数少于最大池大小,并且在队列已满时 则将创建一个新线程处理请求。通过将corePoolSize和maximumPoolSize设置为相同值,可以创建一个固定大小的线程池。 通过将 maximumPoolSize 设置为基本上无界的值,如Integer.MAX_VALUE 就是允许线程池容纳任意数量的并发任务。核心和最大池大小可以在构建时设置,也可以使用setCorePoolSize 和 setMaximumPoolSize 动态更改。

  // 对于 corePoolSize (核心线程数 来说)

  if (池中已经创建的线程 <  corePoolSize){

                   立即创建一个线程来处理请求。

    } 

 // 对于 maximumPoolSize (最大池中线程数大小 来说)

 if ( (运行的线程数 <  maximumPoolSize )  &&  workQueue.isEmpty()){

                  立即创建一个线程来处理请求。

   }

corePoolSize:池中要保留的线程数,即使它们处于空闲状态。
maximumPoolSize:池中允许的最大线程数
keepAliveTime:当线程数大于核心时,这是多余空闲线程在终止前等待新任务的最长时间。
unit:keepAliveTime参数的时间单位
workQueue:在执行任务之前用于保存任务的队列。此队列将仅保存execute方法提交的Runnable任务。
threadFactory:执行器创建新的线程时使用的工厂
拒绝策略:当执行因达到线程边界 和 队列容量时 使用的策略,如果任务阻塞队列已满且线程池中的线程数等于maximumPoolSize,说明线程池此时处于饱和状态,应该执行一种拒绝策略来处理新提交的任务。

 4种拒绝策略:

  1.默认的拒绝策略  new ThreadPoolExecutor.AbortPolicy();

      直接抛出异常:抛出RejectedExecutionException

  2.由提交任务的线程处理该任务: new ThreadPoolExecutor.CallerRunsPolicy();

     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

          if (!e.isShutdown()) {

                r.run();

          }

      }

 3.不执行任务,也不抛出异常,直接丢弃:new ThreadPoolExecutor.DiscardPolicy()

    方法里面什么也没有实现:

     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }

 4.丢弃队列中最老的任务,然后尝试再次提交任务:

     new ThreadPoolExecutor.DiscardOldestPolicy()

     方法的实现:

      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

              if (!e.isShutdown()) {

                    e.getQueue().poll();

                     e.execute(r);

               }

        }

 5. 自定义拒绝策略 实现 RejectedExecutionHandler 接口中的 rejectedExecution 方法

   void rejectedExecution(Runnable r, ThreadPoolExecutor executor); 

execute方法的流程:
public static void main(String[] args) {
        // 创建一个固定大小的线程池
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, // 核心线程数
                2, // 最大线程数
                0L, // 存活时间
                TimeUnit.MILLISECONDS,
                workQueue,
                new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
        );

        // 提交任务
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                System.out.println("Task executed on thread: " + Thread.currentThread().getName());
            });
        }
        // 关闭线程池
        executor.shutdown();

    }
用的是默认的线程工厂
Executors.defaultThreadFactory()

线程池的控制状态ctl是一个原子整数,它包含了两个概念字段 workerCount:表示线程的有效数量
runState:表示是否正在运行、关闭等
private final AtomicInteger  ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
//workerCount 线程的数量(2^29)-1(约5亿)个线程
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; 

COUNT_MASK 二进制是:00011111111111111111111111111111
COUNT_MASK 值是:536870911


// runState 存储在高位
private static final int RUNNING    = -1 << COUNT_BITS;
    -1 的二进制数:11111111111111111111111111111111
左移动29的二进制数:11100000000000000000000000000000 
RUNNIN 的   值是: -536870912  
中间记录一下负数 左移动的 插曲:   
负数:取最高位有值的数,中间有间隔的数 减去间隔的数字。取反
例如:-10
源码:0000 1010
反码:1111 0101
补码:1111 0110
左移动4位 -10 int 类型的二进制:1111 1111 1111 1111 1111 1111 0110 0000
取最高位的值是:从右边数第9位是:256 减去间隔的值右边数6位和7位,他们的值是 64+32=96
 256 - 96 = 160
private static final int SHUTDOWN   =  0 << COUNT_BITS;
     0 的二进制数:0
左移动29的二进制数:0
SHUTDOWN 的 值是:0
private static final int STOP       =  1 << COUNT_BITS;
     1 的二进制数:00000000000000000000000000000001
左移动29的二进制数:00100000000000000000000000000000
STOP     的 值是:536870912
private static final int TIDYING    =  2 << COUNT_BITS;
     2 的二进制数:00000000000000000000000000000010
左移动29的二进制数:01000000000000000000000000000000
TIDYING  的 值是:1073741824
private static final int TERMINATED =  3 << COUNT_BITS;
     3 的二进制数:00000000000000000000000000000011
左移动29的二进制数:01100000000000000000000000000000
TERMINATED的值是:

进入 execute 方法:

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /**
         * 分为3个步骤:
         *
         *    1.如果运行的线程数少于corePoolSize,请尝试使用给定的任务作为第一个任务启动一个新线程。
         *    对addWorker的调用会自动检查runState和workerCount,
         *    从而通过返回false来防止在不应该添加线程的情况下添加错误警报
         *
         *    2.如果一个任务可以成功排队,那么我们仍然需要仔细检查是否应该添加一个线程(因为自上次检查以来,现有的线程已经死亡),
         *    或者池是否在进入此方法后关闭。因此,我们重新检查状态,并在必要时回滚查询,如果没有,则停止或启动新线程。
         *
         *     3.如果我们无法对任务进行排队,那么我们会尝试添加一个新线程。
         *       如果它失败了,我们知道我们被关闭或饱和了,因此拒绝了这项任务。
         *
         */
        /**
         * 第一次进来
         *    ctl.get() 获取的值是 -536870912
         *    初始化的时候 RUNNING 是 :11100000000000000000000000000000
         *    private final AtomicInteger  ctl = new AtomicInteger(ctlOf(RUNNING, 0));
         *    经过 | 后 结果不变
         *    private static int ctlOf(int rs, int wc) { return rs | wc; }
         * 第二次进来
         *   ctl.get() 获取的值是 -536870911  (在第一次的时候已经 进行了 + 1)
         */
        int c = ctl.get();
        /**
         * 第一次进来
         *    private static int workerCountOf(int c)  { return c & COUNT_MASK; }
         *    c 传过来的值是 -536870912 & COUNT_MASK
         *    COUNT_MASK 的值是:536870911
         *    c的二进制数:  11100000000000000000000000000000
         *               &
         *    COUNT_MASK: 00011111111111111111111111111111
         *    是 0   corePoolSize 是 1  结果是 true
         * 第二次进来
         *    private static int workerCountOf(int c)  { return c & COUNT_MASK; }
         *    c 传过来的值是 -536870911 & COUNT_MASK
         *    COUNT_MASK 的值是:536870911
         *    c的二进制数:  11100000000000000000000000000001
         *               &
         *    COUNT_MASK: 00011111111111111111111111111111
         *    是 1   corePoolSize 是 1  结果是 false
         */
        if (workerCountOf(c) < corePoolSize) {

            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 因为我核心数的大小设置的是 1 ,所以第二次就进到这里
        // private static boolean isRunning(int c) { return c < SHUTDOWN; }
        // c 是 -536870911  <  SHUTDOWN 是 0 结果是true
        // 然后判断队列是否可以插入数据  workQueue.offer(command) 没有给队列设置大小,可以插入进来
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 上面已经验证, 这里取反返回false
            if (! isRunning(recheck) && remove(command))
                reject(command);
                // private static int workerCountOf(int c)  { return c & COUNT_MASK; }
                // COUNT_MASK的二进制是:    11111111111111111111111111111
                //  c的二进制是:         11100000000000000000000000000001
                //  返回 1 结果为 false   里面没有处理任务 直接返回了 , 全部添加到了队列
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //将 队列设置为1 后 直接走到拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }
addWorker(Runnable firstTask, boolean core)方法:
/**
     * 检查是否可以根据当前线程池状态和给定的绑定(核心或最大值)添加新的worker。
     * 如果可以,则创建并启动一个新worker,将firstTask作为其第一个任务运行。
     * 如果线程池已停止或符合关闭条件,则此方法返回false。如果线程工厂在被询问时不能创建线程,它也会返回false。
     * 如果线程创建失败,无论是由于线程工厂返回null,还是由于异常(通常是thread.start()中的OutOfMemoryError),
     * 我们都会彻底回滚。
     * @param firstTask 线程
     * @param core true 当前的线程数 < core线程数
     * @return
     */

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // c 是  -536870912
        for (int c = ctl.get();;) {
            //  运行状态的检查
            //  private static boolean runStateAtLeast(int c, int s) { return c >= s; }
            //
            //  private static final int SHUTDOWN   =  0 << COUNT_BITS;
            //  private static final int STOP       =  1 << COUNT_BITS;
            //  SHUTDOWN 是 0  STOP 是 536870912
            //  -536870912 >= 0 && -536870912 > 536870912 返回 false
            if (runStateAtLeast(c, SHUTDOWN)
                    && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) {
                // private static int workerCountOf(int c)  { return c & COUNT_MASK; }
                // c 的 二进制是: 11100000000000000000000000000000
                // COUNT_MASK是: 00011111111111111111111111111111
                // 没有线程在工作 返回 0
                // 0 >= 1
                if (workerCountOf(c)
                        >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        ThreadPoolExecutor.Worker w = null;
        try {
            //利用线程的工厂创建线程
            w = new ThreadPoolExecutor.Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 上锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 拿到锁后重新检查。
                    // 如果ThreadFactory失败或在获得锁之前关闭,则退出。
                    // 前边已经原子 进行了 加 1  获取的值是:-536870911
                    int c = ctl.get();

                    // private static boolean isRunning(int c) { return c < SHUTDOWN;  }
                    //  SHUTDOWN 是 0
                    //  结果返回true
                    if (isRunning(c) ||
                            (runStateLessThan(c, STOP) && firstTask == null)) {
                        // 新创建的线程 没有执行之前的状态都是 new
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        workerAdded = true;
                        int s = workers.size();
                        // 跟踪已达到的最大池大小。仅在mainLock下访问
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

辅助方法:

// 原子修改 这个 线程池状态 工作线程 +1
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    //创建一个线程
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

根据拒绝的策略,最后由main方法 执行拒绝后的任务 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1996421.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

http参数污染利用php小特性绕过贷齐乐waf

分析源码 GET/POST/REQUEST/COOKIE都会经过这个替换str_ace(array(&, ", <, >,(,)), array(&, ", <, >,&#xff08;,&#xff09;), $string) GET/POST/REQUEST三个变量&#xff0c;都会经过这个正则&#xff1a;select\|insert\|update\|delet…

【Linux】系列入门摘抄笔记-2-语法格式与内置、外部、帮助命令

语法格式与命令 1、命令的语法格式 command [选项] [参数] &#xff08;1&#xff09;command : 称为命令&#xff0c;是必须的&#xff0c;要执行的操作。 &#xff08;2&#xff09;选项&#xff1a;对命令的功能进行微调&#xff0c;决定这个命令将如何执行&#xff0c;同…

xshell连接云服务器 出现“所选的用户密钥未在远程主机上注册”的解决办法

欢迎来到雲闪世界。最近刚刚开启了谷歌云&#xff0c;有空了准备开台小鸡折腾下onedrive索引工具pyone。但是开了台小鸡之后&#xff0c;高兴的使用xshell生成密钥&#xff0c;把公钥复制到谷歌云控制台。然后新建主机&#xff0c;输入ip一套操作行云流水&#xff0c;可是出了点…

HarmonyOS Developer之实现点赞效果

待实现效果 点赞按钮通过一个div组件关联click事件实现。div组件包含一个image组件和一个text组件&#xff1a; image组件用于显示未点赞和点赞的效果。click事件函数会交替更新点赞和未点赞图片的路径。text组件用于显示点赞数&#xff0c;点赞数会在click事件的函数中同步更…

解锁多场景,EasyCVR视频汇聚网关赋能业务数字化转型

在信息化高速发展的今天&#xff0c;视频监控系统已成为各行各业不可或缺的一部分。从公共安全到企业管理&#xff0c;从智慧城市建设到个人生活安全&#xff0c;视频监控的覆盖范围日益广泛。而视频汇聚网关&#xff0c;作为视频监控系统中的核心设备&#xff0c;扮演着承上启…

程序员学长 | 超强!六大优化算法全总结

本文来源公众号“程序员学长”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;超强&#xff01;六大优化算法全总结 今天我们将详细讨论一下用于训练神经网络&#xff08;深度学习模型&#xff09;时使用的一些常见优化技术&#…

FlexBV电路查看软件

FlexBV - Macbook, iPhone, PC/Laptop & Electronics BoardViewer with PDF Cross Referencing 免费。 支持tvw&#xff0c;cad格式。 支持Windows,Linux,Mac。 而且我发现cad格式是文本的&#xff01;意味着可以自由编辑&#xff01;

springboot窝窝酒店管理系统-计算机毕业设计源码91798

摘 要 随着时代的进步与发展&#xff0c;互联网技术的应用也变得日益广泛。窝窝酒店管理系统在当今社会体系中扮演了一个非常重要的角色&#xff0c;它能大大地提高效率并减少了资源上的浪费。本文首先介绍了窝窝酒店管理系统的优势以及重要性&#xff1b;然后描述了这个系统的…

学习鸿蒙-构建私有仓储

1.选择 鸿蒙提供ohpm-repo工具用于构建本地私有仓储 ohpm-repo下载 2.环境配置 安装node&#xff0c;ohpm-repo 支持 node.js 18.x 及以上版本 node最新版本下载 3.配置文件及运行 1.解压 ohpm-repo 私仓工具包 2.进入 ohpm-repo 解压目录的 conf 目录内&#xff0c;打开 c…

PyTorch深度学习框架

最近放假在超星总部河北燕郊园区实习&#xff0c;本来是搞前后端开发岗位的&#xff0c;然后带我的副总老大哥比较关照我&#xff0c;了解我的情况后得知我大三选的方向是大数据&#xff0c;于是建议我学学python、Hadoop&#xff0c;Hadoop我看了一下内容比较多&#xff0c;而…

从概念到落地:全面解析DApp项目开发的核心要素与未来趋势

随着区块链技术的迅猛发展&#xff0c;去中心化应用程序&#xff08;DApp&#xff09;逐渐成为Web3时代的重要组成部分。DApp通过智能合约和分布式账本技术&#xff0c;提供了无需信任中介的解决方案&#xff0c;这种去中心化的特性使其在金融、游戏、社交等多个领域得到了广泛…

金融行业如何高效管理新媒体矩阵

金融行业作为经济体系的重要一环&#xff0c;受到社会多方关注和监管。 前有“985大一投行实习日常”的短视频引发大众热议&#xff0c;后有某机构女员工自爆事件牵扯出多家金融机构&#xff0c;将金融行业一度推到了舆论的风口浪尖。 这两件事的接连出现&#xff0c;也把金融新…

飞天发布时刻:大数据AI平台产品升级发布

7月24日&#xff0c;阿里云飞天发布时刻产品发布会围绕阿里云大数据AI平台的新能力和新产品进行详细介绍。人工智能平台PAI、云原生大数据计算服务MaxCompute、开源大数据平台E-MapReduce、实时数仓Hologres、阿里云Elasticsearch、向量检索Milvus等产品均带来了相关发布的深度…

C++必修:STL之forward_list与list的使用

✨✨ 欢迎大家来到贝蒂大讲堂✨✨ &#x1f388;&#x1f388;养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 所属专栏&#xff1a;C学习 贝蒂的主页&#xff1a;Betty’s blog 1. forward_list与list forward_list 是 C 11 引入的一种容器&#xff0c;它是一…

LQR横向控制及融合PID纵向控制C++实现

目录 简介一、现代控制理论1.1 经典控制理论和现代控制理论的区别1.2 全状态反馈控制系统 二、LQR控制器2.1 连续时间2.1.1 Q、R矩阵的选取2.1.2 推导过程2.1.3 连续时间下的LQR算法步骤 2.2 离散时间2.2.1 连续LQR和离散LQR的区别2.2.2离散时间下的LQR算法步骤 三、LQR实现自动…

AI大模型之旅--安装向量库milvus

milvus&#xff0c;向量索引库 1.milvus部署 milvus的官方文档中看到最新版本的部署方式Install Milvus Standalone with Docker Compose curl -sfL https://raw.githubusercontent.com/milvus-io/milvus/master/scripts/standalone_embed.sh -o standalone_embed.sh &#xf…

stm32f103c8t6与TB6612FNG解耦测试

stm32f103c8t6与TB6612FNG解耦测试 本文操作方式: 忽略底层,只做上层, 所以前面全部照搬步骤,重在调试 文章目录 stm32f103c8t6与TB6612FNG解耦测试本文操作方式:创建基本工程(1)跳转此链接,创建(2)创建电机驱动文件夹(3)PWM原理(4)电机转动控制 oled调试和key调试(5)OLED转速…

C++:奇异递归模板模式(CRTP模式)

奇异递归模板模式 文章目录 奇异递归模板模式理论说明CRTP模式的功能静态多态强制静态接口编译时多态优化解释 理论说明 奇异递归模板模式&#xff08;Curiously Recurring Template Pattern, CRTP&#xff09; 是一种设计模式&#xff0c;其原理很简单&#xff1a; 继承者将自…

工业三防平板赋能自动化产线打造工厂智慧管理

随着工业4.0时代的到来&#xff0c;智能制造成为了众多企业转型升级的必然选择。而MES系统作为智能制造的核心环节&#xff0c;能够有效地整合生产数据&#xff0c;提升生产效率&#xff0c;并实现工厂运营的数字化管理。然而&#xff0c;传统的MES系统大多依赖于PC端操作&…

关于vs调试的一些基本技巧方法,建议新手学习

文章目录 1.Debug 和 Release2.VS的调试快捷键3.对程序的监视和内存观察3.1监视3.2内存 4.编程常见错误归类4.1编译型错误4.2链接型错误4.3运行时错误 1.Debug 和 Release 在我们使用的编译器 vs 中&#xff0c;这个位置有两个选项&#xff0c;分别为Debug和Release&#xff0c…