JUC(十)-线程池-ThreadPoolExecutor分析

news2025/1/15 13:04:14

ThreadPoolExecutor 应用 & 源码解析

文章目录

  • ThreadPoolExecutor 应用 & 源码解析
    • 一、线程池相关介绍
      • 1.1 为什么有了JDK提供的现有的创建线程池的方法(Executors类中的方法),然而还需要自定义线程池
        • ThreadPoolExecutor 提供的七个核心参数大致了解
        • JDK提供的几种拒绝策略
    • 二、ThreadPoolExecutor的应用
    • 三、ThreadPoolExecutor源码解析
      • 3.1 线程池的核心属性
      • 3.2 线程池的有参构造
      • 3.3 execute 方法
        • 3.3.1 execute方法本体
      • 3.3.2 execute 方法的执行流程图
        • 3.3.3 addWorker方法
          • 3.3.3.1 addWorkerFailed , 启动工作线程失败执行方法
      • 3.4 Worker工作线程类
        • 3.4.1 runWorker
        • 3.4.2 getTask()
      • 3.5 关闭线程池方法
        • 3.5.1 shutdownNow()
          • advanceRunState
          • interruptWorkers
        • 3.5.2 shutdown();
          • interruptIdleWorkers()
        • 3.5.3 tryTerminate
    • 四、线程池整体流程图

一、线程池相关介绍

1.1 为什么有了JDK提供的现有的创建线程池的方法(Executors类中的方法),然而还需要自定义线程池

前面演示的Executors中的构建线程池的方式(newXXX一类的方法),大多数都是基于ThreadPoolExecutor new来创建出来的

ThreadPollExecutor的构造器中 一共提供了七个参数 , 每个参数都是非常核心的参数 , 在线程池去执行任务时每个参数都有对任务决定性的作用
如果直接使用JDK提供的Executors中的方法来创建线程池,其中可以自定义设置的核心参数只有两个,这样的话会导致线程池的控制粒度很粗。在阿里规范中也推荐自己去自定义线程池,手动new线程池 指定参数
自定义线程池,可以细粒度的控制线程池,针对一些参数的设置可以在后期有效的帮助我们排查问题(ThreadFactory)

ThreadPoolExecutor 提供的七个核心参数大致了解

public ThreadPoolExecutor(
        // 1. 核心工作线程个数(当前任务执行结束后,这个线程不会被销毁 , 结束后这个线程会执行take方法死等未来要处理的任务)
        int corePoolSize,
        // 2. 最大工作线程个数(池中一共可以有多少个工作线程=核心数+非核心数) , 非核心工作线程执行完任务后会执行 poll(time,unit) 方法过段时间自动销毁
        // 例如 corePollSize = 2,maximumPoolSize=5,那么就表示线程池中最多有5个工作线程存在 其中2个是核心线程 , 剩下3个是非核心线程
        int maximumPoolSize,
        // 3. 非核心工作线程,在阻塞队列中等待的时间 keepAliveTime
        long keepAliveTime,
        // 4. 上边keepAliveTime时间的单位
        TimeUnit unit,
        // 5. 在没有核心线程处理任务时,会把任务扔到此阻塞队列中等待非核心工作线程处理,具体选择哪种阻塞队列需要根据业务来选择
        BlockingQueue<Runnable> workQueue,
        // 6. 创建线程的工厂 , 一般用来指定线程池中线程的名称 (只用来构建Thread对象 而不是核心线程Worker对象)
        ThreadFactory threadFactory,
        // 7. 当线程池无法处理加入的任务时,执行拒绝策略
        RejectedExecutionHandler handler){
        
    
}

JDK提供的几种拒绝策略

  • AbortPolicy: 这个拒绝策略会在线程池无法处理任务时,直接抛出一个异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}
  • CallerRunsPolicy: 这个拒绝策略会在线程池无法处理任务时, 会将当前提交过来的任务让调用线程池的线程去处理该任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        // 让提交给线程池任务的线程去处理该任务 , 这种则是同步的方式
        r.run();
    }
}
  • DiscardPolicy: 这个拒绝策略会在线程池无法处理任务时,会直接丢弃掉这个任务(不做任何处理)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    // 空空如也、不做任何处理
}
  • DiscardOldestPolicy: 这个拒绝策略会在线程池无法处理任务时,会将当前阻塞队列中 最早加入的任务丢弃掉然后把本次提交的任务放进去
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        // 将阻塞队列中最早加入的任务移除
        e.getQueue().poll();
        // 再次将本次提交的任务提交到线程池中
        e.execute(r);
    }
}
  • 自定义线程池的拒绝策略
    • 根据自己的业务,可以将任务存储到数据库中,也可以做其他操作

二、ThreadPoolExecutor的应用

public class TestThreadPollExecutor {

    private static final AtomicInteger counter = new AtomicInteger(1);

    public static void main(String[] args) {
        // 1. 构建自定义线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2,
                5,
                10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread returnThread = new Thread(r);
                        // 这里可以设置线程的一些属性
                        returnThread.setName("Test-ThreadPoolExecutor-" + counter.getAndIncrement());
                        return returnThread;
                    }
                },
                new MyRejectExecution()
        );

        // ===============
        // 2. 让线程池处理任务
        // ===============

        // 2.1 没有返回结果的任务
        threadPool.execute(() -> {
            System.out.println(Thread.currentThread().getName() + "-没有返回结果的处理方式-Runnable");
        });

        // 2.2 有返回结果的任务
        Future<String> handleResultFuture = threadPool.submit(() -> {
            System.out.println(Thread.currentThread().getName() + "-有返回结果的处理方式-Callable");
            return "处理的返回结果!";
        });
        try {
            System.out.println(Thread.currentThread().getName() + "-拿到返回结果=>" + handleResultFuture.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        // 3. 局部变量的线程池 用完之后需要 shutdown
        threadPool.shutdown();


        /*
            输出结果:

            Test-ThreadPoolExecutor-1-没有返回结果的处理方式-Runnable
            Test-ThreadPoolExecutor-2-有返回结果的处理方式-Callable
            main-拿到返回结果=>处理的返回结果!
            
         */
    }

    /**
     * 自定义拒绝策略
     */
    private static class MyRejectExecution implements RejectedExecutionHandler{
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("自定义拒绝策略,根据自己的业务逻辑来实现具体的拒绝策略功能!");
        }
    }

}

三、ThreadPoolExecutor源码解析

3.1 线程池的核心属性

/** 
 *  1. ctl 这个就是线程池最核心的属性,其实就是一个int类型的数值,只是套了一层AtomicInteger,在进行运算时是原子性的,下方所有的内容都是在对ctl这个属性做操作
 *  2. ctl 表示的是线程池的两个核心状态: <1> 线程池的状态  <2> 工作线程的个数(不区分核心数和非核心数)
 *     <1.> 线程池状态:   ctl代表的int类型值的高3位 , 表示当前线程池的状态
 *     <2.> 工作线程数量:  ctl的低29位表示,工作线程个数
 *     因此需要进行位运算来操作这两个状态,下方就提供了一些位运算的属性和计算这两种状态的位运算方法
 *  ctl的默认值通过 ctlOf(RUNNING,0)返回的结果作为默认值 , 即 (111 | 00000000 00000000 00000000 00000000) ==> 11100000 00000000 00000000 00000000
 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 常量COUNT_BITS 一般表示的值就是 29 为了更方便的操作 int 的低29位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 00000000 00000000 00000000 00000001 => 1
// 00100000 00000000 00000000 00000000 => 1 << 29
// 00011111 11111111 11111111 11111111 => 1 << 29 - 1
// CAPACITY 表示当前线程池中 可以记录到的工作线程的做大值
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// =============== 线程池状态相关属性 ==================

// 可以看到线程池的状态有5种状态 , 当且仅当线程池状态为-1时 表示当前线程池没有问题 可以处理任务 其他都是有问题的状态

// -1二进制表示 : 11111111 11111111 11111111 11111111
// <1> RUNNING(-1): 高3位是 111
// 此时可以处理任务,也可以操作阻塞队列种的任务
private static final int RUNNING    = -1 << COUNT_BITS;
// <2> SHUTDOWN(0): 高3位是 000
// **此时不会接收新的任务,但是正在处理的任务和阻塞队列中的任务都会执行完毕**
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// <3> STOP(1) : 高3位是 001
// **此时不接收新的任务,而且会调用正在处理的任务线程的中断方法,阻塞队列种的任务一个不管** 
// 中断方法:(前面直到中断并不会直接结束线程而是由线程自己决定什么时候中断,只是告诉线程一声(改变中断标志位))
private static final int STOP       =  1 << COUNT_BITS;
// <4> TIDYING(2) : 高3位是 010
// 这个状态是从SHUTDOWN或者STOP状态转变过来的一个过度状态 , 表示当前线程池即将要关闭了,但是还未关闭
private static final int TIDYING    =  2 << COUNT_BITS;
// <5> TERMINATED(3) : 高3位是 011
// 线程池关闭的真正的状态 , 这个状态是由 TIDYING 转换过来的,转换过来之后为了执行一个钩子方法terminated()
private static final int TERMINATED =  3 << COUNT_BITS;

// 根据传入的ctl值 拿到ctl高3位的值 - 也即当前运行状态
// ~CAPACITY ==> 11100000 00000000 00000000 00000000
// ctl与此值执行&运算会拿到当前ctl所代表的线程池的运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 同理 拿到当前ctl低29位的值 代表当前线程池中的工作线程个数
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 传入运行状态rs , 和工作线程个数ws , 让这两个值组成一个新的ctl值 
// 两个值经过 | 运算会把他们的二进制位拼起来 组成一个新的ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }
  • 线程池状态及其转换流程图
    • 线程池状态及其转换流程图
    • 老郑内容

3.2 线程池的有参构造

/**
 * 无论调用ThreadPollExecutor的哪个有参构造 最终都会调用这个构造器
 * 
 * 重点: 核心线程数是允许为0的
 * 
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    
    // corePoolSize可以 == 0
    // 最大工作线程数 必须 >  0
    // 最大工作线程数 必须 >= 核心线程数
    // 非核心线程的空闲时间 必须 >= 0
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    // 校验阻塞独立额、线程工厂、拒绝策略 均不允许为null
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    // 系统资源访问相关内容,和线程池核心业务无关
    this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
    // 各种赋值 , 赋值给成员变量
    // 后边会大量出现 Doug Lea的编码习惯 将这些成员变量作为局部变量进行操作
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    // 转换为纳秒,为了更精确
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

3.3 execute 方法

1.execute方法是提交任务到线程池的核心方法,很重要
2.线程池的执行流程就是execute方法 内部做了哪些判断

3.3.1 execute方法本体

// command 就是提交过来的任务
public void execute(Runnable command) {
    // 提交的任务不能为null
    if (command == null)
        throw new NullPointerException();
    // 拿到当前线程池 ctl属性 , 为了后续判断线程池状态判断
    int c = ctl.get();
    /**
     *  workerCountOf(c) : 拿到当前线程池的工作线程个数
     *  线程池的核心线程是懒加载的 , 只有用到了才会创建
     *  拿到工作线程个数 < 核心线程数 表示线程池还可以添加核心线程 那么就添加核心线程来处理此任务
     */
    if (workerCountOf(c) < corePoolSize) {
        // addWorker(任务,是否为核心线程)
        // addWorker 返回true 表示添加工作线程成功 , 返回false 则添加工作线程失败
        // 添加失败的原因比如 addWorker 执行时线程池执行了 shutdownNow()方法 中断所有线程此时就不能再添加工作线程了返回false ||| 或者并发判断工作线程数量时产生并发问题
        // addWorker 会基于线程池的状态、以及工作线程个数来判断能否添加工作线程
        if (addWorker(command, true))
            // 工作线程构建出来、并且任务也交给工作线程处理了 本次提交任务直接返回true
            return;
        // ================ 到这里说明添加工作线程失败 (线程池状态或者工作线程个数发生了变化导致添加失败) ===============
        // 此时ctl已经由于并发原因被改变了 所以重新获取ctl
        c = ctl.get();
    }
    
    // ====== 不能添加核心线程 或者 添加核心线程失败了 继续走下边的逻辑 ============
    /**
     * isRunning(c) : 判断当前线程池的状态是不是RUNNING状态(前边讲过线程池只有是RUNNING状态时才可以添加任务)
     *                如果线程池状态是RUNNING状态:则可以添加当前任务到阻塞队列中 , 否则继续执行下边else的逻辑
     */
    if (isRunning(c) && workQueue.offer(command)) {
        // 添加任务到阻塞队列中成功则走if里边的代码
        
        // 由于这个if逻辑并没有加锁,所以ctl可能会出现并发修改问题 因此 当任务添加到阻塞队列之后,重新检查线程池的状态是否发生了改变
        // 重新获取ctl
        int recheck = ctl.get();
        /**
            如果线程池的状态不是 RUNNING 状态了 , 那么就将本次添加的任务从阻塞队列中移除
         */
        if (!isRunning(recheck) && remove(command))
            // 执行拒绝策略
            reject(command);
        
        // =========== 上边的if判断没有执行 程序运行到这里说明 此时阻塞队列中已经存在了本次添加的任务command =================
        
        // 如果工作线程是0个,那么就需要添加一个工作线程去处理这个任务
        // 发生这种情况有两种情况: 
        // 1.构建线程池时 核心线程数为0  
        // 2.设置核心线程允许超时allowCoreThreadTimeOut属性设置为true
        else if (workerCountOf(recheck) == 0)
            // 添加一个 不需要任务且是非核心的工作线程 , 目的是处理在阻塞队列中无法被处理的任务
            addWorker(null, false);
    }
    
    // 线程池的状态不是RUNNING状态 或者 状态是RUNNING但是将任务添加到阻塞队列中 失败了走这个逻辑

   /**
    * 到这里 可能由于调用了shutdown、或者shutdownNow方法改变了线程池状态,或者是阻塞队列满了无法添加此任务
    * 如果是shutdown状态
    */
    // 添加非工作线程成功 直接结束本次execute方法
    else if (!addWorker(command, false))
        // 添加失败,执行拒绝策略
        reject(command);
}

3.3.2 execute 方法的执行流程图

  • 1.找的图片 image.png
  • 2.画的流程图 执行流程图

3.3.3 addWorker方法

/**
 * firstTask: 就是传入过来待执行的任务
 * core: true则添加的是核心线程,false则添加非核心线程
 * addWorker方法 可以分为两个模块去看 
 *          第一个模块就是对线程池状态、工作线程数量做校验模块
 *          第二个模块就是添加工作线程模块
 * 
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    // ================================================================================
    // ==================== addWorker第一块 对线程池的状态信息做校验 =======================
    // ===============================================================================
        
    // 为了从内层for循环,跳出外层for循环 使用标签语法
    retry:
    // =======================================================
    // **外层在校验线程池的状态信息 , 内层for循环在校验线程池工作线程数**
    // =======================================================
    for (;;) {
        // 拿到ctl、根据ctl线程池状态rs
        int c = ctl.get();
        int rs = runStateOf(c);

        // rs >= SHUTDOWN 说明线程池状态不是RUNNING,即当前线程池的状态不是正常的 , 当前不能添加新的任务
        // 如果线程池状态为SHUTDOWN,并且阻塞队列中有任务,那么就需要添加一个工作线程去处理阻塞队列中的任务 ( 满足前边的addWorker(null,false) )
        if (rs >= SHUTDOWN &&
            // 到这里状态已经不是RUNNING状态了,只有SHUTDOWN状态才可以继续处理阻塞队列中的任务
            // 如果三个条件有任意一个不满足返回false,配合! 代表不需要添加工作线程的
            // 1.不是SHUTDOWN状态(而是STOP状态),2. 任务不是空的,3. 阻塞队列是空的 |||| 满足这三个条件 则不添加工作线程
            ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            // 由于线程池状态的原因 本次添加工作线程失败
            return false;
        
        // =======================================================
        // ** 内层循环做工作线程数量的判断操作**
        // =======================================================

        for (;;) {
            // 根据ctl获取当前 工作线程个数 wc
            int wc = workerCountOf(c);
            // 如果工作线程数量 >= 最大容量 
            if (wc >= CAPACITY ||
                // 基于core判断能否添加 核心线程或非核心线程
                // 如果要添加核心线程 工作线程 >= 核心线程数量 则不可以再添加核心线程 直接返回false
                // 如果要添加非核心线程那么 当前工作线程数 >= 设置的最大工作线程数 那么也不可以添加 直接返回false
                wc >= (core ? corePoolSize : maximumPoolSize))
                // 由于工作线程个数的问题 本次添加工作线程失败
                return false;
            
            // ===== 上边数量校验通过 证明本次addWorker是可以添加工作线程 =====
        
            // CAS自增ctl , 然后break跳出第一模块校验参数模块
            if (compareAndIncrementWorkerCount(c))
                // 跳出两层for循环 进入addWorker第二模块(添加工作线程模块)
                break retry;
            
            // ============== 到这说明 CAS自增ctl属性失败 ctl发生了并发现象 =============
            // 再次拿到ctl , 此时拿到其他线程更改过后的ctl
            c = ctl.get();
            // 根据新的ctl获取线程池状态 如果发现状态和进入此方法时状态不同 那么就重新执行 第一模块校验参数模块
            // 如果状态没变 那么重新执行内层for循环即可 重新基于CAS自增ctl
            if (runStateOf(c) != rs)
                // 重新执行第一模块校验模块
                continue retry;
        }
    }
    
    // ================================================================================
    // ======================= addWorker第二块,添加工作线程、启动工作线程 ====================
    // ================================================================================
    
    // 工作线程启动成功标志位 默认false未启动
    boolean workerStarted = false;
    // 工作线程添加成功标志位 默认false未添加
    boolean workerAdded = false;
    // 工作线程对象,默认为null
    Worker w = null;
    try {
        // 创建工作线程对象并且把任务传递进去 , 如果传过来的firstTask不是空的 那么就会先处理带过来的任务 , 如果没有那么才回去阻塞队列中查找任务
        w = new Worker(firstTask);
        /**
         * Worker()构造器内部  this.thread = getThreadFactory().newThread(this);
         * 获取工作线程对象里边由ThreadFactory创建出来的 Thread对象
         */
        final Thread t = w.thread;
        // 这里Thread如果是null,那么可能是 new Worker() 时内部使用提供的ThreadFactory对象创建出来的Thread是null

        if (t != null) {
            // 加锁 , 因为下方会操作成员变量可能会出现线程安全问题
            // 下方会操作成员变量 workers、largestPoolSize
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 再次获取当前线程池状态 rs
                int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN : 说明线程池只能是RUNNING状态 , 状态正常执行if逻辑
                // rs == SHUTDOWN && firstTask == null , 只能是前边的 addWorker(null,false) 用来添加一个非核心工作线程来处理阻塞队列中的任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 到这里边说明可以添加工作线程
                    
                    // 如果线程已经启动过了 那么抛出异常
                    // 这里只可能是ThreadFactory里边生成Thread是已经启动了的线程
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // private final HashSet<Worker> workers = new HashSet<Worker>();
                    // 校验线程没有问题 , 把当前工作线程添加到 workes 工作线程集合中
                    workers.add(w);
                    // 获取 工作线程数量
                    int s = workers.size();
                    // private int largestPoolSize; 表示线程池最大线程个数记录
                    // 当前工作线程个数 > 原来的最大线程个数
                    if (s > largestPoolSize)
                        // 替换最大线程个数
                        largestPoolSize = s;
                    // 添加工作线程成功
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 添加成功 启动工作线程
            if (workerAdded) {
                // 启动工作线程
                t.start();
                // 启动成功标志
                workerStarted = true;
            }
        }
    } finally {
        // 如果工作线程启动失败 , 那么这个工作线程也就没有意义了 执行addWorkerFailed方法
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 返回工作线程是否启动成功
    return workerStarted;
}
3.3.3.1 addWorkerFailed , 启动工作线程失败执行方法
// 启动工作线程失败 做补偿操作
private void addWorkerFailed(Worker w) {
    // 需要操作workes 所以需要加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            // 将这个工作线程从 workes移除
            workers.remove(w);
        // 将 ctl -1 , 代表去掉了一个工作线程个数 (addWorker对ctl+1操作是在 第一模块校验时已经+1了)
        decrementWorkerCount();
        // 工作线程启动失败 可能是因为状态改变了 判断是不是可以走 TIDYING -> TERMINATED 结束线程池
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

3.4 Worker工作线程类

/**
 * 1. Worker继承了AQS,可以实现一些锁的机制,这里的目的就是为了控制工作线程的中断
 * 2. Worker实现了Runnable,内部的Thread对象在执行start时,会执行Worker中的run方法逻辑
 * 
 */
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
    
    // ====================================================================
    // ================================ Worker 管理任务模块 =================
    // ====================================================================
    
    // 由线程工厂构建出来的线程对象
    final Thread thread;
    // 当前Worker要执行的任务
    Runnable firstTask;
    // 记录当前工作线程处理了多少个任务
    volatile long completedTasks;
    
    // 工作线程只有一个创建方式 就是此有参构造器
    Worker(Runnable firstTask) {
        // 将state值设置为 -1 , 表示当前Worker不允许被执行中断方法
        setState(-1);
        // 赋值任务
        this.firstTask = firstTask;
        // 利用线程工厂创建Thread对象
        // 看到传入的Runnable是当前Worker对象 , 所以Worker里边的thread线程启动后实际执行的是 Worker的run方法逻辑
        this.thread = getThreadFactory().newThread(this);
    }
    
    // 上边可知 Worker里边的thread启动后 实际执行的就是这个run方法的逻辑 , 执行的就是runWorker()方法
    @Override
    public void run() {
        runWorker(this);
    }


    // ====================================================================
    // =========================== Worker 管理线程中断模块(AQS) ==============
    // ====================================================================
    
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    /**
     *  这个方法是中断工作线程时,执行的方法
     */
    void interruptIfStarted() {
        Thread t;
        /**
         * getState() >= 0 , 上边新建Worker时state设置为-1 因此-1表示当前Worker不允许执行中断操作
         * thread 不为空 , 并且 t 未中断 则可以执行中断线程
         * 
         */
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                // state不是-1 , t不为空,t未中断 , 则可以执行中断线程
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

3.4.1 runWorker

// 能够运行这个方法的唯一方式就是: Worker的run方法 , 因此运行这个方法的一定是线程池的工作线程
final void runWorker(Worker w) {
    // 拿到当前线程 wt
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    // 已经拿到了task 将工作线程中存储任务的字段设置为null
    w.firstTask = null;
    // 将Worker中的 state设置为0 , 代表当前工作线程给可以被中断
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        /** 
         * 这个while循环 就是要让这个工作线程一直处于工作状态 拿到任务->处理任务 , 当结束了while循环则考虑干掉当前工作线程了
         *  
         * 如果传入的工作线程Worker 带的有任务 那么就执行带过来的任务 
         * 如果传入的工作线程Worker 没有带任务 那么就去阻塞队列中拿取任务
         *
         */
        while (task != null || (task = getTask()) != null) {
            // 执行当前Worker的lock方法, 表示当前工作线程已经开始工作了 不允许中断当前线程
            // 使用shutdown()关闭线程池时 会检查Worker是否正在运行 正在运行的工作线程则不会被中断
            w.lock();
            
            // 比较ctl>=STOP,如果满足这个条件 则说明线程池已经到了STOP甚至已经终结了
            // 1. 线程池到了STOP状态,并且当前线程还未中断 , 则需要中断当前线程(if里的逻辑)
            // 2. 线程池状态不是STOP,线程已经中断并且再次查看线程池状态变为了STOP 那么就再次中断线程

            // 下面的判断其实就是表达了,如果线程池是 >= STOP状态 那么就必须确保这个线程是处于中断状态的
            if (
                (runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) 
                &&
                !wt.isInterrupted()){
                    wt.interrupt();
                }
            try {
                // 前置钩子函数
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 后置勾子函数
                    afterExecute(task, thrown);
                }
            } finally {
                // 任务执行完成 , 丢掉任务
                task = null;
                // 当前工作线程处理任务数量++操作
                w.completedTasks++;
                // 执行unlock 将state改为0 , shutdown方法才可以中断这个工作线程
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 结束掉当前工作线程
        processWorkerExit(w, completedAbruptly);
    }
}
  • processWorkerExit 结束工作线程方法
/**
 * @param w , 当前要执行结束的工作线程对象
 * @param completedAbruptly 突然完成标志位,如果runWorker中判定是突然完成那么就会传入true
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果是突然完成的,表示是异常结束工作线程
    if (completedAbruptly)
        // 减少工作线程数量
        decrementWorkerCount();
    
    // 下方需要操作共享变量 所以拿到锁并加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 将当前工作线程的 完成任务数量合并到线程池的完成任务数
        completedTaskCount += w.completedTasks;
        // 将当前工作线程从workes中移除掉
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 工作线程结束 查看是否是线程池状态改变了
    tryTerminate();

    // 拿到ctl
    int c = ctl.get();
    // 这个判断表示 如果当前线程池状态 < STOP , 即只能为 RUNNING、SHUTDOWN状态时
    if (runStateLessThan(c, STOP)) {
        // 不是突然完成的(正常结束工作线程) 可以进入这个if
        if (!completedAbruptly) {
            // 拿到当前线程池的 可能的最小核心线程数量 , 如果允许核心线程超时 那么最小的核心线程就是0 否则永远为corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 如果可能的核心线程数最小为0个 并且队列中不是空的
            if (min == 0 && ! workQueue.isEmpty())
                // 设置最小核心线程为1个 用来处理阻塞队列中的任务
                min = 1;
            // 比较 工作线程数量 >= 当前可能的最小核心线程数量 说明有线程处理任务 正常return
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 异常结束工作线程,为了避免出现问题 添加一个空任务的非核心线程来填补上刚刚异常结束的工作线程
        addWorker(null, false);
    }
}

3.4.2 getTask()

/**
 * 这个方法就是从线程池的阻塞队列中获取要执行的任务 
 * 
 *    第一块就是 检查线程池状态和工作线程数量的校验 , 如果不满足直接返回null则拿取任务失败上层runWorker方法的while结束
 *    第一块校验通过了 才能执行第二块 从阻塞队列中拿取任务
 */
private Runnable getTask() {
    // 默认为false,默认没有超时
    boolean timedOut = false;

    for (;;) {
        // 拿到ctl 、根据ctl获取线程池状态
        int c = ctl.get();
        int rs = runStateOf(c);
        
        // 如果线程池状态是STOP,那么不需要再处理阻塞队列中的任务了,直接返回null
        // 如果线程池状态是SHUTDOWN,并且阻塞队列是空的 那么也不需要再处理阻塞队列任务了
        if (rs >= SHUTDOWN && 
            (rs >= STOP || workQueue.isEmpty())) {
            // 扣减工作线程个数
            decrementWorkerCount();
            return null;
        }
        // 根据ctl拿到工作线程个数
        int wc = workerCountOf(c);
        // 1. 核心线程允许超时 timed为true
        // 2. 当前工作线程数已经大于了核心线程数 timed为true
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        if (
            // 工作线程>最大线程数(一般不会满足 看为false即可)
            // 工作线程<=核心线程 那么一定是false
            // 第一次进入getTask方法的for循环一定不会走这个if里边,第二次就有可能了
            (wc > maximumPoolSize || (timed && timedOut))
            && 
            // 在满足上边条件的前提下 , 当前执行的线程是非核心线程并且timeOut超时标记位是true
            // 确保工作线程除了自己还有至少一个 , 或者阻塞队列是空的 , 那么就可以尝试减少工作线程数量 返回null由上层方法结束掉当前工作线程了
            (wc > 1 || workQueue.isEmpty())) {
            // CAS减少工作线程数
            if (compareAndDecrementWorkerCount(c))
                return null;
            // 若CAS失败再次执行for循环
            continue;
        }
        
        // 工作线程从阻塞队列中拿任务
        
        try {
            // 是核心线程timed是false , 非核心是true
            Runnable r = timed ?
                // 非核心,等待一会 keepAliveTime
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                // 核心线程,使用take方法死等
                workQueue.take();
            // 从阻塞队列中拿到任务不为null,拿到任务返回任务然后去执行即可
            if (r != null)
                return r;
            // 到这,没拿到任务 timeOut设置为true , 所以再次经过for循环就可以退出这个方法返回null结束当前线程了
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

3.5 关闭线程池方法

3.5.1 shutdownNow()

  • 该方法会将线程池状态从 RUNNING -> STOP , 立即中断所有的工作线程 , 并且不会处理阻塞队列中的任务
// shutdownNow 不会处理阻塞队列的任务所以会将任务返回给客户端
public List<Runnable> shutdownNow() {
    // 返回的任务
    List<Runnable> tasks;
    // 加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 将线程池状态修改为 STOP 
        advanceRunState(STOP);
        // shutdownNow不管工作线程是否再执行任务都会执行 中断工作线程
        interruptWorkers();
        // 将阻塞队列中的任务放到tasks中
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
advanceRunState
private void advanceRunState(int targetState) {
    // 死循环必然会修改状态
    for (;;) {
        int c = ctl.get();
        // 如果当前线程池状态已经 >= 将要修改的状态 则不需要修改了
        if (runStateAtLeast(c, targetState) ||
            // 将targetState重新和工作线程数量组成新的ctl 并将ctl修改为这个新组装的值
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}
interruptWorkers
// shutdownNow方法 立即执行中断工作线程操作
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 执行所有Worker的interruptIfStarted方法
        // 所以shutdownNow会立即中断正在运行的工作线程
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

3.5.2 shutdown();

  • 该方法会将线程池状态从 RUNNING -> SHUTDOWN , 不会立即中断正在干活的工作线程 , 并且会等待阻塞队列中的任务处理完成后再关闭线程池
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 将线程池状态 变为 SHUTDOWN状态
        advanceRunState(SHUTDOWN);
        // 中断处于空闲状态的工作线程
        interruptIdleWorkers();
        // 勾子函数 自己实现
        onShutdown();
    } finally {
        mainLock.unlock();
    }
    // 尝试结束线程池
    tryTerminate();
}
interruptIdleWorkers()
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}
// 中断空闲的工作线程
private void interruptIdleWorkers(boolean onlyOne) {
    // 加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // 线程目前没有中断 , 那么就去获取Worker的锁资源 都满足的话才能够执行线程中断操作
            // 工作线程在执行任务时会先把当前工作线程的 state变为1 所以这里tryLock正在运行的工作线程是失败的(所以这里只会中断空闲的工作线程)
            if (!t.isInterrupted() && w.tryLock()) {
                // 进到这里 说明当前工作线程是空闲线程
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

3.5.3 tryTerminate

// 查看当前线程池是否可以变为TERMINATED状态
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // 如果是RUNNING状态 直接return
        // 如果现在的状态 >= TIDYING 说明即将变为TERMINATED状态 那么也可以return结束方法
        // 如果是SHUTDOWN状态并且阻塞队列还有任务,那么也不可以变为TERMINATED状态 return结束
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        
        // 如果还有工作线程 
        if (workerCountOf(c) != 0) {
            // 中断空闲的工作线程 只中断一个
            interruptIdleWorkers(ONLY_ONE);
            // 本次尝试结束 , 等下次工作线程为0个之后 再进来尝试改变状态
            return;
        }

        // 加锁 , 可以执行Condition的释放操作
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 将线程池状态修改为 TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 执行勾子函数
                    // 可以在线程池结束时做一些额外操作 自己实现
                    terminated();
                } finally {
                    // 修改状态为TERMINATED状态
                    ctl.set(ctlOf(TERMINATED, 0));
                    // 唤醒正在等待线程池结束的线程
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
    }
}

四、线程池整体流程图

线程池整体流程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/131060.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一辆适合长途出行的电动跑车 奥迪RS e-tron GT正式上市

作为奥迪品牌电动化发展的先锋力作&#xff0c;奥迪RS e-tron GT不止是前瞻科技的呈现&#xff0c;在e-tron纯电技术的加持下&#xff0c;更传递着RS的情怀&#xff0c;承载着人们对GT豪华休旅生活的向往。 2022年12月30日&#xff0c;伴随着Audi Channel第九期直播节目盛大开播…

MySQL存储引擎介绍以及InnoDB引擎结构理解

目录存储引擎概述各个存储引擎介绍InnoDBMySIAMMemeory其他引擎引擎有关的SQL语句InnoDB引擎逻辑存储结构架构内存部分磁盘部分后台线程InnoDB三大特性存储引擎概述 数据引擎是与数据真正存储的磁盘文件打交道的&#xff0c;它的上层&#xff08;服务层&#xff09;将处理好的…

我的Python学习笔记:私有变量

一、私有变量的定义 在Python中&#xff0c;有以下几种方式来定义变量&#xff1a; xx&#xff1a;公有变量_xx&#xff1a;单前置下划线&#xff0c;私有化属性或方法&#xff0c;类对象和子类可以访问&#xff0c;from somemodule import *禁止导入__xx&#xff1a;双前置下…

掌握Python中列表生成式的五个原因

1. 引言 在Python中我们往往使用列表生成式来代替for循环&#xff0c;本文通过引入实际例子&#xff0c;来阐述这背后的原因。 闲话少说&#xff0c;我们直接开始吧&#xff01; 2. 简洁性 列表生成式允许我们在一行代码中创建一个列表并对其元素执行相应的操作&#xff0…

(十五)大白话我们每一行的实际数据在磁盘上是如何存储的?

文章目录 1、前情回顾2、真实数据是如何存储的?3、隐藏字段4、初步的把磁盘上的数据和内存里的数据给关联起来1、前情回顾 之前我们已经给大家讲过了,一行数据在磁盘文件里存储的时候,包括如下几部分: 首先会包含自己的变长字段的长度列表然后是NULL值列表接着是数据头然后…

图的概念及存储结构

文章目录图的概念图(graph)有向图(directed graph)无向图(undirected graph)加权图(weighted graph)无向完全图(undirected complete graph)有向完全图(directed complete graph)子图(subgraph)稀疏图与稠密图度路径与回路连通图与连通分量强连通图与强连通分量生成树图的存储结…

STM32H750自制开发板调试经验

​本篇只是一个记录&#xff0c;没啥可看的。 STM32H750硬件相关 STM32H750可以通过USB-OTG下载程序&#xff0c;也可以使用SWD进行调试&#xff0c;所以设计板子得时候将PA13和PA12预留出来即可&#xff0c;后续也可以用作usb虚拟串口&#xff08;CDC&#xff09;功能或者模拟…

stm32f407VET6 系统学习 day08 利用adc 模数转换 监控光敏电阻。

1. ADC 的知识 1.基本概念 &#xff1a; Analog-to-Digital Converter的缩写。指模/数转换器或者模拟/数字转换器。是指将连续变量的模拟信号转换为离散的数字信号的器件 。典型的模拟数字转换器将模拟信号转换为表示一定比例电压值的数字信号。 2.STM32F4x ADC特点 1. 可配…

git操作

删除暂存区文件&#xff1a; git rm --cached 完整文件名 git rm --cached xxx.txt这个删&#xff0c;只是把暂存区里的文件删了&#xff0c;工作区里面的没有删 把本地文件添加到暂存区 git add完整文件名 例如&#xff1a;git add xxx.txt git add xxx.txt此时xxx.txt已经…

Linux 权限理解和学习

✨个人主页&#xff1a; Yohifo &#x1f389;所属专栏&#xff1a; Linux学习之旅 &#x1f38a;每篇一句&#xff1a; 图片来源 &#x1f383;操作环境&#xff1a; CentOS 7.6 阿里云远程服务器 Don’t argue with the people of strong determination, because they may ch…

AtCoder Beginner Contest 283 Ex. Popcount Sum(类欧经典问题:数x在二进制表示下第k位的值)

题目 t(t<1e5)组样例&#xff0c;每组样例给定n,m,r(1<m<n<1e9,0<r<m) 求[1,n]这n个数中&#xff0c;所有满足i%mr的数i的二进制的1的个数之和 即&#xff1a;&#xff0c; 其中&#xff0c;__builtin_popcount(i)统计的是i的二进制表示中&#xff0c;1的…

Web APIs

文章目录一. Web API介绍1. Web APIs 和 JS 基础关联性1.1 JS 的组成1.2 JS 基础阶段以及 Web APIs 阶段2. API的概念[3.Web API的概念](https://developer.mozilla.org/zh-CN/docs/Web/API)4. API 和 Web API 总结二. DOM 介绍1. DOM 简介1.1 什么是 DOM1.2 DOM 树2. 获取元素…

Linux-6 三剑客命令

Linux-6 三剑客命令 awk&#xff08;取列&#xff09; 将系统的IP地址打印出来 [rootdestiny ~]# yum install net-tools -y #分析&#xff1a;#1.肯定是需要拿到IP地址&#xff0c;仅看某一个特定的网卡&#xff1b;ifconfig#2.先想办法过滤出数据的那一行&#xff1b; ###行#…

5)Django Admin管理工具,Form组件,Auth

目录 一 Django Admin管理工具 激活管理工具 使用管理工具 复杂模型 自定义表单 内联(Inline)显示 列表页的显示 二 django Form组件 局部钩子和全局钩子 三 Django 用户认证&#xff08;Auth&#xff09;组件 一 Django Admin管理工具 Django 提供了基于 web 的管理…

年终报告撰写小技巧,你学会了吗?

年年岁岁花相似&#xff0c;岁岁年年人不同。 临近年底&#xff0c;又到了一年一度的年终报告时段了。同事间见面最让人头疼的问候&#xff0c;莫过于&#xff0c;“你的年终报告写了吗&#xff1f;” 有的人东拼西凑、应付了事&#xff0c;汇报内容乏善可陈&#xff0c;领导…

美美的圣诞树画出来-CoCube

2022年圣诞节到来啦&#xff0c;很高兴这次我们又能一起度过~ CSDN诚邀各位技术er分享关于圣诞节的各种技术创意&#xff0c;展现你与众不同的精彩&#xff01;参与本次投稿即可获得【话题达人】勋章【圣诞快乐】定制勋章&#xff08;1年1次&#xff0c;错过要等下一年喔&#…

尚医通-上传医院接口实现(十八)

目录&#xff1a; &#xff08;1&#xff09;上传医院接口-基础类的创建 &#xff08;2&#xff09;数据接口-上传医院接口-初步实现 &#xff08;3&#xff09;上传医院接口-最终实现 &#xff08;1&#xff09;上传医院接口-基础类的创建 复制相关的工具类&#xff1a;这…

Redis Windows版安装和使用

下载地址&#xff0c;亲已测试可放心使用 https://github.com/tporadowski/redis/releases Redis安装和基本使用&#xff08;windows版&#xff09; 1.Redis简介 完全开源免费的高性能的key-value的数据库 支持数据的持久化&#xff0c;可以将内存中的数据保存在磁盘中&…

【函数】一篇文章带你看懂控制流、递归、高阶函数

目录 控制流 条件语句 迭代语句 示例&#xff1a;质因数分解 递归 示例&#xff1a;阶乘 示例&#xff1a;斐波那契数列 示例&#xff1a;判断奇偶数 高阶函数 lambda 表达式 设计函数 示例&#xff1a;累加计算 示例&#xff1a;柯里化 Lab 1: Functions, Control …

个人能用的短信平台有哪些?看这一篇就够了

对于程序员个人来说&#xff0c;在做开发或者是接项目的时候&#xff0c;常常会用到发送短信功能模块&#xff0c;而自己写这个模块会要相当多的精力和时间&#xff0c;去找短信平台来解决问题&#xff0c;已经成了不少程序员的共识。 但市面上的短信平台确实很杂&#xff0c;鱼…