线程池ThreadPoolExecutor源码解析

news2025/1/11 21:49:42

参考视频

首先回顾一下创建线程等的三种方式

第一个是直接继承Thread类,重写run方法,这个其实内部也是继承了Runnable接口重写run方法。

比如:

public class MyThread extends Thread{
    @Override
    public void run() {
        System.out.println("这是我自己的线程");
    }

    public static void main(String[] args) {
        new MyThread().start();
    }
}

Thread类也是继承Runable接口,内部也是重写了run方法,

public
class Thread implements Runnable 

但是内部的run方法就是直接调用target的run,这个target也就是一个Runable接口的实现类。
@Override
    public void run() {
        if (target != null) {
            target.run();
        }
    }

第二种就是直接继承Runnable接口重写Run方法了。
第三种就是重写Callable接口然后用FutureTask进行包装,最后利用Thread进行启动。这种方式创建的线程可以抛出异常并且是有返回值的。

public class MyCallable implements Callable {
    @Override
    public Object call() throws Exception {
        System.out.println("这是call方法,我自己写的");
        return "call function";
    }

    public static void main(String[] args) {
        MyCallable myCallable = new MyCallable();

        try {
            FutureTask futureTask = new FutureTask<>(myCallable);

            new Thread(futureTask).start();
//futureTask.get()这个方法会阻塞当前线程的执行。
            System.out.println(futureTask.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

为什么会出现线程池?

试想一下,如果我们一个比价大型的项目,,每次需要用到多线程都new一个线程,那么这就会导致线程的创建数量我们不好管理和控制,另外创建和销毁线程的开销还是比较大的,甚至有的时候都已经超过了我们这个线程本身执行这个任务所需要的资源耗,这就是非常不合理的了。也有可能会存在线程切换之间的开销过大,资源浪费严重,因此线程池就应运而生了。

我们先看一下这个ThreadPoolExecutor
在这里插入图片描述

这个线程的构造方法中可以传递7个参数,具体的作用和意义这里就不再过多赘述了。这里主要讲解具体这些参数的功能是怎么样实现的?比如为什么线程能够复用,为什么能够保活?

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {

我们在介绍这个类的时候首先先来看一下这个类中的一些属性,避免直接看源码啥也看不懂。

这个ctl是整个线程池的状态控制参数,用这个32位的整数来表示两个状态

  • 第一个就是线程池的运行状态,用最高的三位来表示。
  • 第二个是线程池中的工作线程的数量,用低的29位来表示。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
	//Integer.SIZE - 3 = 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //线程池定义的最大工作线程的数量2^29-1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    // 下面是线程池的运行状态,保存在ctl的高三位。
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
	//ctlOf其实就是按位异或
	private static int ctlOf(int rs, int wc) { return rs | wc; }

这里顺便介绍一些-1的二进制,我们可以用下面的代码查看-1的二进制

//11111111111111111111111111111111
原码取反变反码
反码加1变成补码
-1的原码:1000 0001
反码:1111 1110
补码:1111 1111-1使用的时候为1111 1111
System.out.println(Integer.toBinaryString(-1));

在这个线程池的类中还有一个属性,这个属性如果为false(默认值),则核心线程即使在空闲时也保持活动状态。如果为true,则核心线程使用keepAliveTime超时等待工作。也就是核心线程也会在超时时间过后被销毁。

private volatile boolean allowCoreThreadTimeOut;

下面我们再介绍几个方法

这个方法是用来计算核心线程数的,调用的时候传递的是ctl的值,有没有发现这里用按位与运算就可以实现,所以上面定义工作线程的最大数量的时候是不是很巧妙?这样运算好像效率更高。

private static int workerCountOf(int c)  { return c & CAPACITY; }

上面这个方法再下面这个方法中被调用,当然,下面这个方法也是线程池中的方法。
源码中的注释:启动核心线程,使其空闲等待工作。这将覆盖仅在执行新任务时启动核心线程的默认策略。
如果所有核心线程都已启动,则此方法将返回false。
返回值:如果启动了线程,则为true

public boolean prestartCoreThread() {
        return workerCountOf(ctl.get()) < corePoolSize &&
            addWorker(null, true);
    }

动态设置核心线程数。
这里

public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        //这里如果发现设置的核心线程数比原来的核心线程数小就会
        //调用interruptIdleWorkers打断闲置的线程
        if (workerCountOf(ctl.get()) > corePoolSize)
            interruptIdleWorkers();
            //如果这里发现要设置的线程数大于已经存在的核心线程数,那么就会在阻塞队列不为空的情况下
            //不断低循环创建核心线程,直到队列为空或者核心线程数达到要求。
        else if (delta > 0) {
            // We don't really know how many new threads are "needed".
            // As a heuristic, prestart enough new workers (up to new
            // core size) to handle the current number of tasks in
            // queue, but stop if queue becomes empty while doing so.
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }

interruptIdleWorkers内补调用了下面的代码,这里面有个可重入锁,这是线程池类中的一个属性,源码中是这样解释的:
锁定对工人集合和相关簿记的访问。虽然我们可以使用某种类型的并发集合,但事实证明,使用锁通常更可取。其中一个原因是,它序列化了interruptIdleWorkers,这避免了不必要的中断风暴,尤其是在关机期间。否则,退出线程将同时中断那些尚未中断的线程。它还简化了largestPoolSize等的一些相关统计记账。我们还保留mainLock on shutdown和shutdown Now,以确保工人设置稳定,同时分别检查中断和实际中断的权限。

这代码里面有个Worker ,其实就是对工作线程的一个包装,感兴趣的话可以看一下源码。

private final ReentrantLock mainLock = new ReentrantLock();

private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        // 这里首先获取到锁然后对工人workers进行遍历,将所有的工作线程的打断状态设置位true
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

下面这个是提供了两个钩子函数,可以在线程池执行某一个线程的前后进行相关的操作

在给定线程中执行给定Runnable之前调用的方法。此方法由将执行任务r的线程t调用,可用于重新初始化
ThreadLocals或执行日志记录。
这个实现什么都不做,但可以在子类中定制。注意:为了正确嵌套多个重写,子类通常应该在这个方法的
末尾调用super.beforeExecute。
参数:
t–将运行任务的线程r–将执行的任务

protected void beforeExecute(Thread t, Runnable r) { }

protected void afterExecute(Runnable r, Throwable t) { }

这两个方法会在Worker 类的run方法中被调用

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable

public void run() {
            runWorker(this);
        }

我们可以看一下这个方法,我们可以发现这个线程里面不断地调用阻塞队列里面的线程,然后调用它的run方法执行任务,所以实际上没有创建新的线程,你可能比较好奇为这个执行run方法的线程是哪里来的。其实就是addWorker方法创建出来的核心线程来执行的,那为什么核心线程不会start结束呢?为什么核心线程能够复用呢?

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

我们看一下addWorker这个方法,这个方法你会发现会调用了Thread 类的start方法,这里你可能还不明白为什么么,我们看一下前一句的构造方法,w = new Worker(firstTask);

  • Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }

可以看到这个构造方法赋值给Worker类的thread的成员变量就是Worker类自己,因为Worker类实现了Runnable接口,所以这里下面调用的start方法就是调用了上run方法里面的runWorker方法。

private boolean addWorker(Runnable firstTask, boolean core) {
       /******//
       /* 此处省略创建核心线程之前的判断 */
       /*****/

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

这里又回到了上面的runWorker方法,为什么不会执行完呢,如果while (task != null || (task = getTask()) != null)这个判断为false了核心线程不就没了吗?所以复用的原理就出来了,我们看一下这个getTask方法,从阻塞队列里面获取任务。这个getTask会不断循环获取,除非当前线程大于核心线程数才会返回null不然就会循环判断。

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/167797.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

论文分享-《基于数据驱动多输出 ARMAX 建模的高炉十字测温中心温度》

1.简介 最近在学习研究NARMAX&#xff0c;故也分享下自己看的一篇论文。 2018 年 3 月 的《基于数据驱动多输出 ARMAX 建模的高炉十字测温中心温度》。主要是采用NARMAX模型进行预测&#xff0c;多输入多输出&#xff0c;有5个输出&#xff0c;预测中心五个点位的温度。下面讲…

计算机 - - - 局域网共享文件夹,局域网传输文件(待完善)

win10局域网共享文件夹 A电脑: 共享文件夹的电脑 B电脑: 访问共享文件夹的电脑 操作完成后, B电脑可以下载A电脑中的文件, B电脑可以修改删除, B电脑可以上传B电脑的文件到A电脑. A电脑 找到要共享的文件夹, 例如我要共享文档(E:), 我要把文档(E:)中的所有文件都让B电脑访问…

Python - 数据容器str(字符串)

目录 字符串的定义 字符串的常用操作 查找特定字符串的下标索引值 index 字符串的替换 replace 字符串的分割 split 字符串的规整操作 strip 统计字符串中某字符串的出现次数 count 统计字符串的长度 len 字符串切片 [起始下标:结束下标:步长] 字符串的定义 和其它容器…

银行案例分析:识别个人贷款客户画像,实现精准营销与风险防范

作为商业银行最主要的业务活动&#xff0c;也是收益最大的活动&#xff0c;贷款于银行的重要性不言而喻。又由于个人贷款是银行贷款不可或缺的一部分&#xff0c;那么了解个人贷款客户画像就有助于银行对客户进行精准销售和风险识别。 # 选手介绍 #张昊泽&#xff1a;亚利桑那州…

Pycharm入门搭建Django 项目

一、环境准备 1、pycharm版本 2、python版本 二、创建项目 击左上角的 File -> New Project 点击Create创建完成之后页面等待下载环境 查看Django的版本 python -m django --version 启动项目 python manage.py runserver 三、后记 在启动 Django 项目的时候我发现控制台…

【PaaS】分享一家最近发现的宝藏Paas厂家

目录 一、结识独自开 二、独自开的介绍 三、独自开的需求 四、独自开注册流程 五、神仙公司独自开 一、结识独自开 算是机缘巧合&#xff0c;我被C站白佬拉入了他的聊天群&#xff0c;群内均是来自于CSDN的不同领域的优质作者&#xff0c;其中不乏相关领域工作多年的老工程…

第一层:封装

文章目录前言类和对象封装class权限publicprotectedprivatestruct和class的区别封装的好处封装的用法学完封装&#xff0c;突破第一层&#x1f389;welcome&#x1f389; ✒️博主介绍&#xff1a;一名大一的智能制造专业学生&#xff0c;在学习C/C的路上会越走越远&#xff0c…

【电子学会】2022年12月图形化四级 -- 金牌百分比

金牌百分比 计算金牌榜前十的国家获得的金牌总数占金牌总数的百分比。金牌榜前十的国家获得的金牌总数占金牌总数的百分比等于(金牌榜前十国家的金牌总数本届冬奥会金牌总数)100,并将这个数向下取整。 1. 准备工作 (1)保留舞台默认背景及角色小猫; (2)建立变量“金牌…

YOLOALL 一文了解YOLO各版本答案

来源&#xff1a;投稿 作者&#xff1a;ΔU 编辑&#xff1a;学姐 YoloAll项目简介 相信了解YOLO的小伙伴们一定都有这样的困扰&#xff0c;目前YOLO各个版本数量非常多&#xff0c;不知道在实际场景中应该选择哪个YOLO版本。甚至有时为了比较两个不同版本的YOLO的效果&#x…

python标准库xmlrpc 之RPC远程方法调用

&#x1f496;&#x1f496;&#x1f496;养成每日阅读好习惯, 每天进步, 超越昨天的自己&#x1f496;&#x1f496;&#x1f496; 愿景&#xff1a;输出体系化编程知识与技巧&#xff0c;助力软件行业发展与从业者学习减负&#xff0c;让编程产生更大价值。 &#x1f50e;&am…

【linux】之网络安全

防火墙作用 在计算机领域,防火墙是用于保护信息安全的设备,其会依照用户定义的规则,允许或限制数据的传输。 用于保护内网安全的一种设备依据规则进行防护用户定义规则允许或拒绝外部用户访问防火墙分类 逻辑上划分,防火墙可以大体分为主机防火墙和网络防火墙 主机防火墙:…

服务注册与发现:Nacos Discovery

目录 一、概述 二、Nacos discovery——服务的注册与发现 1. 版本关系 2. 下载安装 &#xff08;1&#xff09;下载 &#xff08;2&#xff09;启动 &#xff08;3&#xff09;浏览器访问 三、Nacos服务注册与发现实战 1. 构建Spring Cloud Alibaba工程 &#xff08;1&…

Vite性能优化之分包策略

为什么需要分包策略&#xff1f; 浏览器的缓存策略 浏览器在请求静态资源时&#xff0c;只要静态资源的名称不变&#xff0c;它就不会重新请求 xxx.js资源。 使用Vite打包后的js文件是带有哈希值的&#xff0c;只要我们的代码内容有一点点变化&#xff0c;那么文件的hash值都…

ConfigMap

目录 文章目录目录本节实战前言1、创建:warning: yaml里易混淆的点&#xff08;1&#xff09;|用法&#xff08;2&#xff09;|和|-用法&#xff08;3&#xff09;>用法1.通过资源清单文件方法创建ConfigMap2.通过from-file关键字创建ConfigMap&#xff08;1&#xff09;fro…

二十九、Kubernetes中CronJob(CJ)详解

1、概述 在kubernetes中&#xff0c;有很多类型的pod控制器&#xff0c;每种都有自己的适合的场景&#xff0c;常见的有下面这些&#xff1a; ReplicationController&#xff1a;比较原始的pod控制器&#xff0c;已经被废弃&#xff0c;由ReplicaSet替代 ReplicaSet&#xff…

yolov5训练自己的数据集,OpenCV DNN推理

学更好的别人&#xff0c; 做更好的自己。 ——《微卡智享》 本文长度为4238字&#xff0c;预计阅读9分钟 前言 上一篇《OpenCV--自学笔记》搭建好了yolov5的环境&#xff0c;作为目标检测在应用中&#xff0c;最重要的还是训练自己的数字集并推理&#xff0c;所以这一篇就专门…

『Kubernetes』在Linux中快速安装K8S集群

&#x1f4e3;读完这篇文章里你能收获到 K8S安装全过程博主自己实操笔记带你跳过所有坑感谢点赞收藏&#xff0c;避免下次找不到~ 文章目录一、基本环境配置1. 关闭selinux2. 关闭swap分区或禁用swap文件3. 修改网卡配置4. 关闭防火墙5. 设置机器HostName6. 更新系统时间二、k…

Vue 2.x源码学习:数据响应式改造

众所周知&#xff0c;Vue是以数据驱动视图展示的&#xff0c;即Vue会监听数据的变化&#xff0c;从而自动重新渲染页面的结构。 Vue主要通过三步走来实现这个功能&#xff1a; 第一步是对数据进行响应式改造&#xff0c;即对数据的读写操作进行劫持&#xff1b; 第二步是对模…

字符数组编程题(C语言)

文章目录1、用字符%c的形式给一个字符数组赋初值&#xff0c;然后以字符串%s的形式输出2、用字符getchar&#xff08;&#xff09;的形式给一个字符数组赋初值&#xff0c;然后以字符串puts&#xff09;的形式输出4、从键盘上输入一串字符gets&#xff0c;然后输出%s5、从键盘输…

Mysql导出100万条数据,9种导出方法优缺点和速度、文件大小测试

这里写目录标题1.DBase文件2.文本文件3. CSV文件4.HTML文件5.Excel数据表低版本6.Excel文件2007年以后版本7.SQL脚本文件8.XML文件9.JSON文件总结这一次我主要就是想针对mysql导出的速度和文件大小进行优缺点测试&#xff0c;这次主要就是用上之前生成的天气表这是表里面的数据…