【并发】Java并发线程池底层原理详解与源码分析(下)

news2024/11/30 14:47:37

【并发】Java并发线程池底层原理详解与源码分析(下)

前情回顾

上篇文章地址

【并发】Java并发线程池底层原理详解与源码分析(上)_面向鸿蒙编程的博客-CSDN博客线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。Executors 返回的线程池对象的弊端:1) FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。2.CachedThreadPool:允许的创建线程数量为 MAX_VALUE,可能会创建大量建大量的线程,从而导致 OOMhttps://blog.csdn.net/weixin_43715214/article/details/128045130

遗留问题解析

手动实现线程池代码

public class ThreadPoolDemo {
    public static void main(String[] args) {
        // 自定义线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10));
        for (int i = 1; i <= 100; i++) {
            threadPoolExecutor.execute(new MyTask(i));
        }
    }
}
 
class MyTask implements Runnable {
    int i = 0;
 
    public MyTask(int i) {
        this.i = i;
    }
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "程序员做第" + i + "个项目");
        try {
            Thread.sleep(3000L);//业务逻辑
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果

我们在上一个章节中提到了手动实现线程池,但是它的结果似乎有点匪夷所思! 

为什么11~20会在最后面?

为什么运行会抛异常?

为什么打印日志只有前30个?

!!!先说结论!!!

在案例代码中,有100个任务,1个任务的处理时间大致需要3秒,10个核心线程,10个临时线程,阻塞队列的容量是10。

这里只会打印会前30个任务(10+10+10=30),由于在3s核心线程临时线程都在忙碌中,队列也,按照ThreadPoolExecutor默认的策略会抛出异常

按照线程池的工作顺序,会先分配10个核心线程(1~10),再装满队列(11~20),最后分配临时线程(21~30);执行逻辑是核心线程临时线程会先把“手头上”的任务处理完,才会去处理队列里的任务,这就是队列里的任务(11~20)最后打印的原因!!!

ThreadPoolExecutor 源码分析

接下来,我将会结合线程池ThreadPoolExecutor源码来解析这个问题!!! 

线程池源码结构

 顶层接口是Executor,它只有一个方法execute()ExecutorService接口继承了Executor接口,它主要是提供了submit()

【面试题】 execute()方法与submit()方法有什么区别???

submit底层还是调用execute

(1)关于返回值的问题

  • submit:有返回值,返回值(包括异常)被封装于FutureTask对象。适用于有返回结果的任务
  • execute:void类型的函数,没有返回值,适用于没有返回的任务

(2)关于异常处理的问题

  • submit:submit的时候并不会抛出异常(此时线程可能处于就绪状态)。只有在get操作的时候会抛出。因为get操作会阻塞等待线程的执行完毕。
  • execute在执行的时候会直接抛出。可以通过实现 UncaughtExceptionHandler接口来完成异常的捕获。

ThreadPoolExecutor 变量详解 

// 记录线程池的状态信息,高三位是状态信息,其余位表示工作的worker数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 表示worker数量
private static final int COUNT_BITS = Integer.SIZE - 3;

// worker容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// 可以接受新的任务,也可以处理队列中的任务
private static final int RUNNING    = -1 << COUNT_BITS;
// 不接受新的任务,但是可以处理队列里的任务
private static final int SHUTDOWN   =  0 << COUNT_BITS;

// 后面3个都不行!!!
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

ThreadPoolExecutor 之 execute()方法详解 

public void execute(Runnable command) {
    // 要提交的任务不能是null,否则就抛异常
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    
    /*
     * 使用核心线程数的限制去开worker来执行这个任务 
     * addWorker会失败,可能是因为线程池状态或者worker的数量引起addWorker失败(后面分析addWorker方法的时候再说!)
     */
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;

        // 如果addWorker失败,则ctl要重新获取。
        // 因为不管是状态变,还是worker数量变,ctl都已经变了,你需要重新获取最新值。
        c = ctl.get();
    }

    /*
     * 在当前worker数量大于等于corePoolSize或者上面的addWorker失败之后才会走到这里
     * 经过上面分析可能有两种可能 1、线程的状态发生改变  2、当前worker数量不小于核心线程数
     * (1)查看一下当前线程的状态是否是running状态
     * (2)在满足(1)下会尝试往工作队列里面添加这个任务,但是有可能失败(工作队列可能满了)
     */ 
    if (isRunning(c) && workQueue.offer(command)) {

        // 走到这里说明offer之前是running状态 放入工作队列成功了 
        // 需要重新获取当前状态 因为有可能放进去之后线程状态变了!!!
        int recheck = ctl.get();

        /*
         * 如果offer之后线程池不是running了 需要尝试remove刚才的任务
         * 不是running的状态下,remove也有可能失败,他可能被执行了
         */
        if (! isRunning(recheck) && remove(command))
            // 如果remove成功了需要拒绝这个任务
            reject(command);

        /*
        * 走到这里,一定是offer成功了 
        * 这个判断是为了防止没有worker,但是队列里面有任务,没人执行
        * 可能是工作一段时间后worker的数量为0 和 allowCoreThreadTimeOut()这个方法有关系
        * 上一个if判断的!isRuning 是true remove失败的时候 有可能 workerCountOf(recheck) == 0 为true
        * 这个时候线程池肯定是不让你再添加线程的
        */
        else if (workerCountOf(recheck) == 0)
            /* 
             * 如果出现线程池是running worker是0,队列有任务,需要添加一个worker执行这些任务
             * 如果出现线程池不是running,但是remove失败,worker是0,线程池是不允许添加worker的这个逻辑在addworker方法里面!
             */
            addWorker(null, false);
    }
    /*
     * 如果用核心线程数限制开worker执行任务失败
     * 或者 线程池状态不是running
     * 或者 工作队列已经满了
     * 使用最大线程数限制开worker执行任务
     */
    else if (!addWorker(command, false))
        // 失败的原因有 
        // 1、worker达到非核心线程数 
        // 2、线程池的状态变了不是running了 则拒绝这个任务
        reject(command);
}

ThreadPoolExecutor 之 runWorker()方法详解 

final void runWorker(Worker w) {
    // 获取当前线程对象的引用
    Thread wt = Thread.currentThread();
    // 获取worker的firstTask
    Runnable task = w.firstTask;
    // 获取完之后把worker的firstTask置为null 防止下次获取到
    w.firstTask = null;
    // 初始化worker的state = 0, exclusiveOwnerThread = null 解锁
    w.unlock(); 
    // 如果发生异常 当前线程突然退出 该值为true 
    boolean completedAbruptly = true;
    try {
        // 如果firstTask获取getTask能获取到任务 则进行内层逻辑, 如果getTask返回null则循环退出了就要
        while (task != null || (task = getTask()) != null) {
            /*
            * worker设置独占锁
            * shutdown 时会判断当前worker的状态,根据独占锁的状态来判断worker是否在处理任务是否工作
            */
            w.lock();
            /*
            * 3个判断
            * 1、runStateAtLeast(ctl.get(), STOP)为真说明当前状态大于等于STOP 此时需要给他一个中断信号
            * 2、wt.isInterrupted()查看当前是否设置中断状态如果为false则说明为设置中断状态
            * 3、Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) 获取当前中断状态且清除中断状态 
            *    这个判断为真的话说明当前被设置了中断状态(有可能是线程池执行的业务代码设置的,然后重置了)且当前状态变成了大于等于STOP的状态了
            * 
            * 整个判断为真的两种情况
            * 1、如果当前线程大于等于STOP 且未设置中断状态 整个判断为true 第一个runStateAtLeast(ctl.get(), STOP)为true !wt.isInterrupted()为true
            * 2、第一次判断的时候不大于STOP 且当前设置了中断状态(Thread.interrupted()把中断状态又刷新了) 且设置完了之后线程池状态大于等于STOP了
            *    Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) 为true !wt.isInterrupted()为true
            *
            * 结合if判断里面的代码来看 
            * 也就是说如果线程池状态大于等于STOP则设置当前线程的中断状态
            * 如果线程池状态小于STOP则清除中断状态
            */
            if ((runStateAtLeast(ctl.get(), STOP) || 
                        (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) 
                    && !wt.isInterrupted())
                // 设置中断状态
                wt.interrupt();
            try {
                // 钩子方法留给子类实现
                beforeExecute(wt, task);
                try {
                    // task可能是FutureTask或者普通Runnable/Callable接口实现类
                    task.run();
                    // 钩子方法 留给子类实现
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                // 局部task设置为null
                task = null;
                // 完成数量加1
                w.completedTasks++;
                // 使用unlock 释放独占锁
                w.unlock();
            }
        }
        // getTask的返回为null 会走到这行 表示这次不是异常退出
        completedAbruptly = false;
    } finally {
        /*
        * 执行线程退出逻辑
        * 如果completedAbruptly是true说明是task.run()方法有异常 先catch后又抛了出来 在执行完了w.unlock();走到了这里
        * 如果是false则说明是拿不到任务走到了这里
        */
        processWorkerExit(w, completedAbruptly);
    }
}

ThreadPoolExecutor 之 addWorker()方法详解 

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 每次for循环都需要获取最新的ctl值
    for (int c = ctl.get();;) {
        /*
        * 这个地方的判断为true时可以分成三种情况
        * 1、线程池的状态是大于等于 STOP的 这个判断为true
        * 2、线程池状态是SHUTDOWN 但是 firstTask 是null 对应execute 的 else if (!addWorker(command, false)) 状态大于等于SHUTDOWN时不接受任务
        * 3、线程池状态是SHUTDOWN 工作队列已经空了 对应execute的 在状态大于等于SHUTDOWN时用addWorker(null, false) 执行队列里面的任务
        */
        if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty()) )
            return false;

        for (;;) {
            // 如果core是true使用的核心线程数配置 否则使用maximumPoolSize
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            // 使用CAS的方法给ctl的worker的数量加1 成功则跳出最外层循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // CAS不成功则重新获取ctl的值 因为不成功ctl的值一定变了 CAS嘛 
            c = ctl.get();
            // 如果不成功的原因是状态变了 就重新进行外层循环
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
        }
    }

    // 能走到这里一定是说明CAS成功了 那么就可以进行创建worker执行任务了
    // woker是否执行了start
    boolean workerStarted = false;
    // worker 是否添加到workers成功
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 构建一个worker thread的target是worker所以调用t.start会执行worker的run方法,最后调用到runWorker方法
        w = new Worker(firstTask);
        // worker 的thread是 线程工厂的newThread方法创建的
        final Thread t = w.thread;
        if (t != null) {
            // 需要操作workers 加锁执行
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int c = ctl.get();

                // 如果线程池状态是running 或者 是shutdown但是firstTask是null(kanexecute方法 防止没有worker执行队列里面的任务)
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    // 如果线程的状态不是NEW说明线程不是经过线程池开启的 抛异常
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    // 把worker添加到workers里面
                    workers.add(w);
                    workerAdded = true;
                    // 更新当前最大线程数量 maximumPoolSize 和 corePoolSize可以在线程池创建之后动态修改的
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                /*
                * 如果添加成功 就启动这个线程
                * 上面线程池状态判断没通过 或者 线程的状态不是NEW 就不会执行 workerAdded = true;
                * 线程就不会启动 workerStarted 就是false
                */
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // 如果没有执行过t.start() 就要把这个woker从workers里面剔除并且 ctl里面worker数量减一
            addWorkerFailed(w);
    }
    return workerStarted;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/41119.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

浅析数据迁移工具Sqoop

title: Sqoop系列 第一章 Sqoop理论 1.1 概述 sqoop 是 apache 旗下一款“Hadoop 和关系数据库服务器之间传送数据”的工具。 导入&#xff08;Import&#xff09;数据&#xff1a;MySQL&#xff0c;Oracle 导入数据到 Hadoop 的 HDFS、HIVE、HBASE 等数据存储系统 导出&…

八、Gateway

文章目录一、Gateway网关1.网关的作用二、配置网关1.创建gateway模块2.引入依赖3.编写application.yml4.启动gateway模块&#xff0c;查看是否能访问user-service服务三、路由断言工厂Route Predicate Factory四、GatewayFilter&#xff08;过滤器&#xff09;1.添加过滤器方式…

IDEA2022用maven创建的Servlet项目

因为博主太菜太笨&#xff0c;总是记不住大佬教的步骤&#xff0c;写一篇博客记录一下。 有什么不对&#xff0c;或者疑惑可以请假这位大佬&#xff08;没错就是那个被我问烦的大佬&#xff09;ljj大佬 第一步&#xff1a;新建maven 新建Project 选择webapp 初始界面需要等待…

S32K144之ADC

一&#xff0c;S32K144的ADC介绍 1&#xff0c;ADC模块特性 S32K14x和S32K14xW包含两个12位ADC模块&#xff0c;ADC0和ADC1。 S32K11x包含一个12位的ADC模块&#xff0c;ADC0。 不同封装&#xff0c;ADC0和ADC1所包含的通道数不一样&#xff0c;LQFP100来说ADC0和ADC1分别有16…

Spark - RDD 算子介绍及使用 Scala、Java、Python 三种语言演示

一、RDD 的起源 在 RDD 出现之前, 当时 MapReduce 是比较主流的, 而 MapReduce 如何执行流程如下&#xff1a; 多个 MapReduce 任务之间只能通过磁盘来进行传递数据&#xff0c;很明显的效率低下&#xff0c;再来看 RDD 的处理方式&#xff1a; 整个过程是共享内存的, 而不需…

利用pycharm调试ssh远程程序,并实时同步文件

或许你的服务器由于设置问题&#xff0c;不能通过Vscode进行远程调试python程序&#xff0c;那么本篇文章提供了利用pycharm远程调试程序的方法&#xff0c;且使用的编译器可以是服务器中的虚拟环境的编译器&#xff0c;可以实时同步本地与服务器的文件内容。希望对你能够有所帮…

【Oracle系列1】Oracle 的connect权限和create session的区别

【Oracle系列1】Oracle 的connect权限和create session的区别 背景 oracle数据库新建用户之后是无法登录的&#xff0c;需要赋予connect角色或者create session权限。 注意&#xff1a; connect是角色不是权限&#xff0c;create session是权限不是角色。角色是权限的集合。…

c++中的结构体

结构体&#xff1a;属于用户自定义的数据类型&#xff0c;允许用户存储不同的数据类型 语法&#xff1a;struct 结构体名 {结构体成员列表}&#xff1b;通过结构体创建变量的三种方式&#xff1a;1、struct 结构体名 变量名2、struct 结构体名 变量名{成员1值&#xff0c;成员…

第一个Shader程序

shader 很复杂&#xff0c;我学习的过程中也确实感受到了&#xff0c;需要会数学、图形学、编程语法等等知识。不如让我们直接看看 Shader 到底是什么&#xff1f;直接应用起来。或许没有那么复杂。 1、在场景中新建一个正方体&#xff0c;如下图 2、在 project 面板下新建一…

超级棒,使用 LIME 和 SHAP 可轻松解释机器学习模型的预测

在本文中&#xff0c;我将介绍两个可以帮助了解模型的决策过程的模型 LIME 和 SHAP。 作为数据科学家或机器学习从业者&#xff0c;将可解释性集成到机器学习模型中可以帮助决策者和其他利益相关者有更多的可见性并可以让他们理解模型输出决策的解释。 文章目录技术提升模型SH…

day02 redis

day02 Redis 第一章 Redis持久化机制 Redis的高性能是由于其将所有数据都存储在了内存中&#xff0c;为了使Redis在重启之后仍能保证数据不丢失&#xff0c;需要将数据从内存中同步到硬盘(文件)中&#xff0c;这一过程就是持久化。Redis 提供了一系列不同的持久化选项&#x…

MyBatis框架入门(含实例)

目录 1.MyBatis简介 2.ORM框架 3.数据持久化 4.Mybatis入门实战案例 4.1 下载mybatis的jar包 4.2 将jar包导入工程中 4.3 配置Mybatis的核心配置文件 4.3.1 MyBatis核心文件模板(mybatis-config) 4.3.2 mybatis-config模板的设置 4.4 创建User 实体类 4.5定义DAO层M…

11.25学到的东西==命令行

创建文件&#xff0c;可以直接选择文件之后再加上.py import argparse# 单个参数 # 创建解析器 # ArgumentParser 对象包含将命令行解析成 Python 数据类型所需的全部信息。 parser argparse.ArgumentParser() # 单独的参数 square 之后这个help就是提示的信息 # 显示给定数字…

【药材识别】基于matlab GUI SVM色差色温判断药材炮制程度系统【含Matlab源码 2241期】

⛄一、SVM色差色温判断药材炮制程度系统简介 本课题来源于"十二五"国家科技支撑计划项目(2012BAI29B11).颜色是中药质量标准中性状评价极为重要的内容,但传统的中药颜色检测大多依靠人的感官评估,人对颜色的辨别是一个非常复杂的过程,受到光学,视觉生理,视觉心理等诸…

JDBC操作数据库实现增、删、查、改

0.JDBC概念 实际开发中,手动的输入SQL语句是少之又少,大多数情况下是通过编译代码进行来控制自动执行. 具体操作如下: 上述展示有一个【自己写的Mysql客户端】&#xff0c;这种操作是非常容易的&#xff0c;因为各种数据库本身就提供一系列的API&#xff0c;可以让用户很方便…

wordpress 安装主题显示要配置FTP的解决办法

目录 问题复现 1、在安装插件的时候会弹出一个窗口 2、输入相关信息后显示失败 问题解决方法 1、查看wordpress文件权限 2、修改wordpress文件权限 3、插件安装完后&#xff0c;将权限改回 场景&#xff1a;基于Linux 的 wordpress 安装主题显示要配置FTP 安装插件或者主…

RegExp 对象

文章目录RegExp 对象创建RegExp对象正则表达式语法RegExp 对象方法支持正则表达式的 String 对象的方法RegExp.prototype[search]()replace() 方法match()常用正则表达式RegExp 对象 RegExp对象表示正则表达式&#xff0c;是由普通字符和特殊字符(也叫元字符或限定符)组成的文…

基于节点分层的配网潮流前推回代方法matlab程序(IEEE33节点潮流计算)

基于节点分层的配网潮流前推回代方法matlab程序&#xff08;IEEE33节点潮流计算&#xff09; 摘要&#xff1a;结合配电网特有的辐射状特点&#xff0c;提出了一种新的基于节点分层的配网潮流前推回代方法。该方法利用配网支路及其节点参数所形成的节点-节点关联矩阵推导出节点…

MiniAlphaGo黑白棋 蒙特卡洛搜索

做个笔记。 一、蒙特卡洛在黑白棋的应用 输入&#xff1a;棋盘&#x1d44f;&#x1d45c;&#x1d44e;&#x1d45f;&#x1d451;、当前执子方&#x1d450;&#x1d45c;&#x1d459;&#x1d45c;&#x1d45f;、搜索时间&#x1d461;&#x1d456;&#x1d45a;&#x…

小米平板5ProWIFI(elish)刷ArrowOS

文章目录警告下载奇兔刷机系统本体及Recovery清除数据刷入AospRec开始刷入警告完成设置输入法变砖头了qwq又是警告芝士截图Root方法结尾警告 此文章只针对 小米平板5Pro Wifi版本&#xff08;elish&#xff09; 由于条件限制&#xff0c;本文大部分无配图 请务必仔细认真阅读此…