jdk线程池ThreadPoolExecutor优雅停止原理解析(自己动手实现线程池)(二)

news2025/1/11 19:49:19

ThreadPoolExecutor优雅停止源码分析(自己动手实现线程池v2版本)

ThreadPoolExecutor为了实现优雅停止功能,为线程池设置了一个状态属性,其共有5种情况。
在第一篇博客中曾介绍过,AtomicInteger类型的变量ctl同时维护了两个业务属性当前活跃工作线程个数与线程池状态,其中ctl的高3位用于存放线程池状态。

线程池工作状态介绍

线程池工作状态是单调推进的,即从运行时->停止中->完全停止。共有以下五种情况

1. RUNNING

RUNNING状态,代表着线程池处于正常运行(运行时)。RUNNING状态的线程池能正常的接收并处理提交的任务
ThreadPoolExecutor初始化时对ctl赋予的默认属性便是RUNNING(private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));)
RUNNING状态下线程池正常工作的原理已经在第一篇博客中详细的介绍过了,这里不再赘述。

2. SHUTDOWN

SHUTDOWN状态,代表线程池处于停止对外服务的状态(停止中)。不再接收新提交的任务,但依然会将workQueue工作队列中积压的任务逐步处理完。
用户可以通过调用shutdown方法令线程池由RUNNING状态进入SHUTDOWN状态,shutdown方法会在下文详细展开分析。

3. STOP

STOP状态,代表线程池处于停止状态。不再接受新提交的任务(停止中),同时也不再处理workQueue工作队列中积压的任务,当前还在处理任务的工作线程将收到interrupt中断通知
用户可以通过调用shutdownNow方法令线程池由RUNNING或者SHUTDOWN状态进入STOP状态,shutdownNow方法会在下文详细展开分析。

4. TIDYING

TIDYING状态,代表着线程池即将完全终止,正在做最后的收尾工作(停止中)。
在线程池中所有的工作线程都已经完全退出,且工作队列中的任务已经被清空时会由SHUTDOWN或STOP状态进入TIDYING状态。

5. TERMINATED

TERMINATED状态,代表着线程池完全的关闭(完全停止)。

public class MyThreadPoolExecutorV2 implements MyThreadPoolExecutor {
    /**
     * 当前线程池中存在的worker线程数量 + 状态的一个聚合(通过一个原子int进行cas,来避免对两个业务属性字段加锁来保证一致性)
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;

    /**
     * 32位的有符号整数,有3位是用来存放线程池状态的,所以用来维护当前工作线程个数的部分就只能用29位了
     * 被占去的3位中,有1位原来的符号位,2位是原来的数值位
     * */
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;

    /**
     * 线程池状态poolStatus常量(状态值只会由小到大,单调递增)
     * 线程池状态迁移图:
     *         ↗ SHUTDOWN ↘
     * RUNNING       ↓       TIDYING → TERMINATED
     *         ↘   STOP   ↗
     * 1 RUNNING状态,代表着线程池处于正常运行的状态。能正常的接收并处理提交的任务
     * 线程池对象初始化时,状态为RUNNING
     * 对应逻辑:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
     *
     * 2 SHUTDOWN状态,代表线程池处于停止对外服务的状态。不再接收新提交的任务,但依然会将workQueue工作队列中积压的任务处理完
     * 调用了shutdown方法时,状态由RUNNING -> SHUTDOWN
     * 对应逻辑:shutdown方法中的advanceRunState(SHUTDOWN);
     *
     * 3 STOP状态,代表线程池处于停止状态。不再接受新提交的任务,同时也不再处理workQueue工作队列中积压的任务,当前还在处理任务的工作线程将收到interrupt中断通知
     * 之前未调用shutdown方法,直接调用了shutdownNow方法,状态由RUNNING -> STOP
     * 之前先调用了shutdown方法,后调用了shutdownNow方法,状态由SHUTDOWN -> STOP
     * 对应逻辑:shutdownNow方法中的advanceRunState(STOP);
     *
     * 4 TIDYING状态,代表着线程池即将完全终止,正在做最后的收尾工作
     * 当前线程池状态为SHUTDOWN,任务被消费完工作队列workQueue为空,且工作线程全部退出完成工作线程集合workers为空时,tryTerminate方法中将状态由SHUTDOWN->TIDYING
     * 当前线程池状态为STOP,工作线程全部退出完成工作线程集合workers为空时,tryTerminate方法中将状态由STOP->TIDYING
     * 对应逻辑:tryTerminate方法中的ctl.compareAndSet(c, ctlOf(TIDYING, 0)
     *
     * 5 TERMINATED状态,代表着线程池完全的关闭。之前线程池已经处于TIDYING状态,且调用的钩子函数terminated已返回
     * 当前线程池状态为TIDYING,调用的钩子函数terminated已返回
     * 对应逻辑:tryTerminate方法中的ctl.set(ctlOf(TERMINATED, 0));
     * */

    //  11100000 00000000 00000000 00000000
    private static final int RUNNING = -1 << COUNT_BITS;
    //  00000000 00000000 00000000 00000000
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    //  00100000 00000000 00000000 00000000
    private static final int STOP = 1 << COUNT_BITS;
    //  01000000 00000000 00000000 00000000
    private static final int TIDYING = 2 << COUNT_BITS;
    //  01100000 00000000 00000000 00000000
    private static final int TERMINATED = 3 << COUNT_BITS;

    private static int runStateOf(int c) {
        return c & ~CAPACITY;
    }
    
    private static int ctlOf(int rs, int wc) {
        return rs | wc;
    }
    
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * 推进线程池工作状态
     * */
    private void advanceRunState(int targetState) {
        for(;;){
            // 获得当前的线程池状态
            int currentCtl = this.ctl.get();

            // 1 (runState >= targetState)如果当前线程池状态不比传入的targetState小
            // 代表当前状态已经比参数要制定的更加快(或者至少已经处于对应阶段了),则无需更新poolStatus的状态(或语句中第一个条件为false,直接break了)
            // 2  (this.ctl.compareAndSet),cas的将runState更新为targetState
            // 如果返回true则说明cas更新成功直接break结束(或语句中第一个条件为false,第二个条件为true)
            // 如果返回false说明cas争抢失败,再次进入while循环重试(或语句中第一个和第二个条件都是false,不break而是继续执行循环重试)
            if (runStateAtLeast(currentCtl, targetState) ||
                    this.ctl.compareAndSet(
                            currentCtl,
                            ctlOf(targetState, workerCountOf(currentCtl)
                            ))) {
                break;
            }
        }
    }
}    
  • 因为线程池状态不是单独存放,而是放在ctl这一32位数据的高3位的,读写都比较麻烦,因此提供了runStateOf和ctlOf等辅助方法(位运算)来简化操作。
  • 线程池的状态是单调递进的,由于巧妙的将状态靠前的值设置的更小,因此通过直接比较状态的值来判断当前线程池状态是否推进到了指定的状态(runStateLessThan、runStateAtLeast、isRunning、advanceRunState)。

jdk线程池ThreadPoolExecutor优雅停止具体实现原理

线程池的优雅停止一般要能做到以下几点:

  1. 线程池在中止后不能再受理新的任务
  2. 线程池中止的过程中,已经提交的现存任务不能丢失(等待剩余任务执行完再关闭或者能够把剩余的任务吐出来还给用户)
  3. 线程池最终关闭前,确保创建的所有工作线程都已退出,不会出现资源的泄露

下面我们从源码层面解析ThreadPoolExecutor,看看其是如何实现上述这三点的.

如何中止线程池

ThreadPoolExecutor线程池提供了shutdown和shutdownNow这两个public方法给使用者用于发出线程池的停止指令。

shutdown方法

shutdown方法用于关闭线程池,并令线程池从RUNNING状态转变位SHUTDOWN状态。位于SHUTDOWN状态的线程池,不再接收新任务,但已提交的任务会全部被执行完。

    /**
     * 关闭线程池(不再接收新任务,但已提交的任务会全部被执行)
     * 但不会等待任务彻底的执行完成(awaitTermination)
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;

        // shutdown操作中涉及大量的资源访问和更新,直接通过互斥锁防并发
        mainLock.lock();
        try {
            // 用于shutdown/shutdownNow时的安全访问权限
            checkShutdownAccess();
            // 将线程池状态从RUNNING推进到SHUTDOWN
            advanceRunState(SHUTDOWN);
            // shutdown不会立即停止所有线程,而仅仅先中断idle状态的多余线程进行回收,还在执行任务的线程就慢慢等其执行完
            interruptIdleWorkers();
            // 单独为ScheduledThreadPoolExecutor开的一个钩子函数(hook for ScheduledThreadPoolExecutor)
            onShutdown();
        } finally {
            mainLock.unlock();
        }

        // 尝试终止线程池
        tryTerminate();
    }

    /**
     * 用于shutdown/shutdownNow时的安全访问权限
     * 检查当前调用者是否有权限去通过interrupt方法去中断对应工作线程
     * */
    private void checkShutdownAccess() {
        // 判断jvm启动时是否设置了安全管理器SecurityManager
        SecurityManager security = System.getSecurityManager();
        // 如果没有设置,直接返回无事发生

        if (security != null) {
            // 设置了权限管理器,验证当前调用者是否有modifyThread的权限
            // 如果没有,checkPermission会抛出SecurityException异常
            security.checkPermission(shutdownPerm);
    
            // 通过上述校验,检查工作线程是否能够被调用者访问
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (MyWorker w : workers) {
                    // 检查每一个工作线程中的thread对象是否有权限被调用者访问
                    security.checkAccess(w.thread);
                }
            } finally {
                mainLock.unlock();
            }
        }
    }

    /**
     * 中断所有处于idle状态的线程
     * */
    private void interruptIdleWorkers() {
        // 默认打断所有idle状态的工作线程
        interruptIdleWorkers(false);
    }

    private static final boolean ONLY_ONE = true;

    /**
     * 中断处于idle状态的线程
     * @param onlyOne 如果为ture,至多只中断一个工作线程(可能一个都不中断)
     *                如果为false,中断workers内注册的所有工作线程
     * */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (MyWorker w : workers) {
                Thread t = w.thread;
                // 1. t.isInterrupted(),说明当前线程存在中断信号,之前已经被中断了,无需再次中断
                // 2. w.tryLock(), runWorker方法中如果工作线程获取到任务开始工作,会先进行Lock加锁
                // 则这里的tryLock会加锁失败,返回false。 而返回true的话,就说明当前工作线程是一个idle线程,需要被中断
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        // tryLock成功时,会将内部state的值设置为1,通过unlock恢复到未加锁的状态
                        w.unlock();
                    }
                }
                if (onlyOne) {
                    // 参数onlyOne为true,至多只中断一个工作线程
                    // 即使上面的t.interrupt()没有执行,也在这里跳出循环
                    break;
                }
            }
        } finally {
            mainLock.unlock();
        }
    }
    
    /**
     * 单独为jdk的ScheduledThreadPoolExecutor开的一个钩子函数
     * 由ScheduledThreadPoolExecutor继承ThreadExecutor时重写(包级别访问权限)
     * */
    void onShutdown() {}
  1. shutdown方法在入口处使用mainLock加锁后,通过checkShutdownAccess检查当前是否有权限访问工作线程(前提是设置了SecurityManager),如果无权限则会抛出SecurityException异常。
  2. 通过advanceRunState方法将线程池状态推进到SHUTDOWN。
  3. 通过interruptIdleWorkers使用中断指令(Thread.interrupt)唤醒所有处于idle状态的工作线程(存在idle状态的工作线程代表着当前工作队列是空的)。
    idle的工作线程在被唤醒后从getTask方法中退出(getTask中对应的退出逻辑在下文中展开),进而退出runWorker方法,最终系统回收掉工作线程占用的各种资源(第一篇博客中runWorker的解析中提到过)。
  4. 调用包级别修饰的钩子函数onShutdown。这一方法是作者专门为同为java.util.concurrent包下的ScheduledThreadPoolExecutor提供的拓展,不在本篇博客中展开。
  5. 前面提到SHUTDOWN状态的线程池在工作线程都全部退出且工作队列为空时会转变为TIDYING状态,因此通过调用tryTerminate方法尝试终止线程池(当前不一定会满足条件,比如调用了shutdown但工作队列还有很多任务等待执行)。
    tryTerminate方法中细节比较多,下文中再展开分析。

shutdownNow方法

shutdownNow方法同样用于关闭线程池,但比shutdown方法更加激进。shutdownNow方法令线程池从RUNNING状态转变为STOP状态,不再接收新任务,而工作队列中未完成的任务会以列表的形式返回给shutdownNow的调用者。

  • shutdown方法在调用后,虽然不再接受新任务,但会等待工作队列中的队列被慢慢消费掉;而shutdownNow并不会等待,而是将当前工作队列中的所有未被捞取执行的剩余任务全部返回给shutdownNow的调用者,并对所有的工作线程(包括非idle的线程)发出中断通知。
  • 这样做的好处是线程池可以更快的进入终止态,而不必等剩余的任务都完成,都返回给用户后也不会丢任务。
    /**
     * 立即关闭线程池(不再接收新任务,工作队列中未完成的任务会以列表的形式返回)
     * @return 当前工作队列中未完成的任务
     * */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;

        final ReentrantLock mainLock = this.mainLock;

        // shutdown操作中涉及大量的资源访问和更新,直接通过互斥锁防并发
        mainLock.lock();
        try {
            // 用于shutdown/shutdownNow时的安全访问权限
            checkShutdownAccess();
            // 将线程池状态从RUNNING推进到STOP
            advanceRunState(STOP);
            interruptWorkers();

            // 将工作队列中未完成的任务提取出来(会清空线程池的workQueue)
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }

        // 尝试终止线程池
        tryTerminate();
        return tasks;
    }

   /**
    * shutdownNow方法内,立即终止线程池时该方法被调用
    * 中断通知所有已经启动的工作线程(比如等待在工作队列上的idle工作线程,或者run方法内部await、sleep等,令其抛出中断异常快速结束)
    * */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (MyWorker w : workers) {
              // 遍历所有的worker线程,已启动的工作线程全部调用Thread.interrupt方法,发出中断信号
              w.interruptIfStarted();
            }
        } finally {
            mainLock.unlock();
        }
    }

   /**
    * 将工作队列中的任务全部转移出来
    * 用于shutdownNow紧急关闭线程池时将未完成的任务返回给调用者,避免任务丢失
    * */
   private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> queue = this.workQueue;
        ArrayList<Runnable> taskList = new ArrayList<>();
        queue.drainTo(taskList);
        // 通常情况下,普通的阻塞队列的drainTo方法可以一次性的把所有元素都转移到taskList中
        // 但jdk的DelayedQueue或者一些自定义的阻塞队列,drainTo方法无法转移所有的元素
        // (比如DelayedQueue的drainTo方法只能转移已经不需要延迟的元素,即getDelay()<=0)
        if (!queue.isEmpty()) {
           // 所以在这里打一个补丁逻辑:如果drainTo方法执行后工作队列依然不为空,则通过更基础的remove方法把队列中剩余元素一个一个的循环放到taskList中
           for (Runnable r : queue.toArray(new Runnable[0])) {
              if (queue.remove(r)) {
                taskList.add(r);
              }
           }
        }
        
        return taskList;
   }    
  1. shutdownNow方法在入口处使用mainLock加锁后,与shutdown方法一样也通过checkShutdownAccess检查当前是否有权限访问工作线程(前提是设置了SecurityManager),如果无权限则会抛出SecurityException异常。
  2. 通过advanceRunState方法将线程池状态推进到STOP。
  3. 通过interruptWorkers使用中断指令(Thread.interrupt)唤醒所有工作线程(区别于shutdown中的interruptIdleWorkers)。区别在于除了idle的工作线程,所有正在执行任务的工作线程也会收到中断通知,期望其能尽快退出任务的执行。
  4. 通过drainQueue方法将当前工作线程中剩余的所有任务以List的形式统一返回给调用者。
  5. 通过调用tryTerminate方法尝试终止线程池。

如何保证线程池在中止后不能再受理新的任务?

在execute方法作为入口,提交任务的逻辑中,v2版本相比v1版本新增了一些基于线程池状态的校验(和jdk的实现保持一致了)。

execute方法中的校验

  • 首先在execute方法中,向工作队列加入新任务前(workQueue.offer)对当前线程池的状态做了一个校验(isRunning(currentCtl))。希望非RUNNING状态的线程池不向工作队列中添加新任务
    但在做该检查时可能与shutdown/shutdownNow内推进线程池状态的逻辑并发执行,所以在工作队列成功加入任务后还需要再检查一次线程池状态,如果此时已经不是RUNNING状态则需要通过remove方法将刚入队的任务从队列中移除,并调用reject方法(拒绝策略)

addWorker方法中的校验

  • 在addWorker方法的入口处(retry:第一层循环通过(runState >= SHUTDOWN && !(runState == SHUTDOWN && firstTask == null && !workQueue.isEmpty())))逻辑,
    保证了不是RUNNING状态的线程池(runState >= SHUTDOWN),无法创建新的工作线程(addWorker返回false)。
    但有一种特殊情况:即SHUTDOWN状态下(runState == SHUTDOWN),工作队列不为空(!workQueue.isEmpty()),且不是第一次提交任务时创建新工作线程(firstTask == null),
    依然允许创建新的工作线程,因为即使在SHUTDOWN状态下,某一存活的工作线程发生中断异常时,会调用processWorkerExit方法,在销毁原有工作线程后依然需要调用addWorker重新创建一个新的(firstTask == null)

execute与shutdown/shutdownNow并发时的处理

execute提交任务时addWorker方法和shutdown/shutdownNow方法是可能并发执行的,但addWorker中有多处地方都对线程池的状态进行了检查,尽最大的可能避免线程池停止时继续创建新的工作线程。

  1. retry循环中,compareAndIncrementWorkerCount方法会cas的更新状态(此前获取到的ctl状态必然是RUNNING,否则走不到这里),cas成功则会跳出retry:循环( break retry;)。
    而cas失败可能有两种情况:
    如果是workerCount发生了并发的变化,则在内层的for (;;)循环中进行重试即可
    如果线程池由于收到终止指令而推进了状态,则随后的if (runStateOf(currentCtl) != runState)将会为true,跳出到外层的循环重试(continue retry)
  2. 在new Worker(firstTask)后,使用mainLock获取锁后再一次检查线程池状态(if (runState < SHUTDOWN ||(runState == SHUTDOWN && firstTask == null)))。
    由于shutdown、shutdownNow也是通过mainLock加锁后才推进的线程池状态,因此这里获取到的状态是准确的。
    如果校验失败(if结果为false),则workers中不会加入新创建的工作线程,临时变量workerAdded=false,则工作线程不会启动(t.start())。临时变量workerStarted也为false,最后会调用addWorkerFailed将新创建的工作线程回收掉(回滚)

基于execute方法和addWorker方法中关于各项关于线程池停止状态校验,最大程度的避免了线程池在停止过程中新任务的提交和可能的新工作线程的创建。使得execute方法在线程池接收到停止指令后(>=SHUTDOWN),最终都会去执行reject拒绝策略逻辑。

/**
     * 提交任务,并执行
     * */
    @Override
    public void execute(Runnable command) {
        if (command == null){
            throw new NullPointerException("command参数不能为空");
        }

        int currentCtl = this.ctl.get();
        if (workerCountOf(currentCtl) < this.corePoolSize) {
            // 如果当前存在的worker线程数量低于指定的核心线程数量,则创建新的核心线程
            boolean addCoreWorkerSuccess = addWorker(command,true);
            if(addCoreWorkerSuccess){
                // addWorker添加成功,直接返回即可
                return;
            }

            // addWorker失败了
            // 失败的原因主要有以下几个:
            // 1 线程池的状态出现了变化,比如调用了shutdown/shutdownNow方法,不再是RUNNING状态,停止接受新的任务
            // 2 多个线程并发的execute提交任务,导致cas失败,重试后发现当前线程的个数已经超过了限制
            // 3 小概率是ThreadFactory线程工厂没有正确的返回一个Thread

            // 获取最新的ctl状态
            currentCtl = this.ctl.get();
        }

        // 走到这里有两种情况
        // 1 因为核心线程超过限制(workerCountOf(currentCtl) < corePoolSize == false),需要尝试尝试将任务放入阻塞队列
        // 2 addWorker返回false,创建核心工作线程失败

        // 判断当前线程池状态是否为running
        // 如果是running状态,则进一步执行任务入队操作
        if(isRunning(currentCtl) && this.workQueue.offer(command)){
            // 线程池是running状态,且workQueue.offer入队成功

            int recheck = this.ctl.get();
            // 重新检查状态,避免在上面入队的过程中线程池并发的关闭了
            // 如果是isRunning=false,则进一步需要通过remove操作将刚才入队的任务删除,进行回滚
            if (!isRunning(recheck) && remove(command)) {
                // 线程池关闭了,执行reject操作
                reject(command);
            } else if(workerCountOf(currentCtl) == 0){
                // 在corePoolSize为0的情况下,当前不存在存活的核心线程
                // 一个任务在入队之后,如果当前线程池中一个线程都没有,则需要兜底的创建一个非核心线程来处理入队的任务
                // 因此firstTask为null,目的是先让任务先入队后创建线程去拉取任务并执行
                addWorker(null,false);
            }else{
                // 加入队列成功,且当前存在worker线程,成功返回
                return;
            }
        }else{
            // 阻塞队列已满,尝试创建一个新的非核心线程处理
            boolean addNonCoreWorkerSuccess = addWorker(command,false);
            if(!addNonCoreWorkerSuccess){
                // 创建非核心线程失败,执行拒绝策略(失败的原因和前面创建核心线程addWorker的原因类似)
                reject(command);
            }else{
                // 创建非核心线程成功,成功返回
                return;
            }
        }
    }
/**
     * 向线程池中加入worker
     * */
    private boolean addWorker(Runnable firstTask, boolean core) {
        // retry标识外层循环
        retry:
        for (;;) {
            int currentCtl = ctl.get();
            int runState = runStateOf(currentCtl);

            // Check if queue empty only if necessary.
            // 线程池终止时需要返回false,避免新的worker被创建
            // 1 先判断runState >= SHUTDOWN
            // 2 runState >= SHUTDOWN时,意味着不再允许创建新的工作线程,但有一种情况例外
            // 即SHUTDOWN状态下(runState == SHUTDOWN),工作队列不为空(!workQueue.isEmpty()),还需要继续执行
            // 比如在当前存活的线程发生中断异常时,会调用processWorkerExit方法,在销毁原有工作线程后调用addWorker重新创建一个新的(firstTask == null)
            if (runState >= SHUTDOWN && !(runState == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
                // 线程池已经是关闭状态了,不再允许创建新的工作线程,返回false
                return false;
            }

            // 用于cas更新workerCount的内层循环(注意这里面与jdk的写法不同,改写成了逻辑一致但更可读的形式)
            for (;;) {
                // 判断当前worker数量是否超过了限制
                int workerCount = workerCountOf(currentCtl);
                if (workerCount >= CAPACITY) {
                    // 当前worker数量超过了设计上允许的最大限制
                    return false;
                }
                if (core) {
                    // 创建的是核心线程,判断当前线程数是否已经超过了指定的核心线程数
                    if (workerCount >= this.corePoolSize) {
                        // 超过了核心线程数,创建核心worker线程失败
                        return false;
                    }
                } else {
                    // 创建的是非核心线程,判断当前线程数是否已经超过了指定的最大线程数
                    if (workerCount >= this.maximumPoolSize) {
                        // 超过了最大线程数,创建非核心worker线程失败
                        return false;
                    }
                }

                // cas更新workerCount的值
                boolean casSuccess = compareAndIncrementWorkerCount(currentCtl);
                if (casSuccess) {
                    // cas成功,跳出外层循环
                    break retry;
                }

                // 重新检查一下当前线程池的状态与之前是否一致
                currentCtl = ctl.get();  // Re-read ctl
                if (runStateOf(currentCtl) != runState) {
                    // 从外层循环开始continue(因为说明在这期间 线程池的工作状态出现了变化,需要重新判断)
                    continue retry;
                }

                // compareAndIncrementWorkerCount方法cas争抢失败,重新执行内层循环
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;

        MyWorker newWorker = null;
        try {
            // 创建一个新的worker
            newWorker = new MyWorker(firstTask);
            final Thread myWorkerThread = newWorker.thread;
            if (myWorkerThread != null) {
                // MyWorker初始化时内部线程创建成功

                // 加锁,防止并发更新
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();

                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int runState = runStateOf(ctl.get());

                    // 重新检查线程池运行状态,满足以下两个条件的任意一个才创建新Worker
                    // 1 runState < SHUTDOWN
                    // 说明线程池处于RUNNING状态正常运行,可以创建新的工作线程
                    // 2 runState == SHUTDOWN && firstTask == null
                    // 说明线程池调用了shutdown,但工作队列不为空,依然需要新的Worker。
                    // firstTask == null标识着其不是因为外部提交新任务而创建新Worker,而是在消费SHUTDOWN前已提交的任务
                    if (runState < SHUTDOWN ||
                            (runState == SHUTDOWN && firstTask == null)) {
                        if (myWorkerThread.isAlive()) {
                            // 预检查线程的状态,刚初始化的worker线程必须是未唤醒的状态
                            throw new IllegalThreadStateException();
                        }

                        // 加入worker集合
                        this.workers.add(newWorker);

                        int workerSize = workers.size();
                        if (workerSize > largestPoolSize) {
                            // 如果当前worker个数超过了之前记录的最大存活线程数,将其更新
                            largestPoolSize = workerSize;
                        }

                        // 创建成功
                        workerAdded = true;
                    }
                } finally {
                    // 无论是否发生异常,都先将主控锁解锁
                    mainLock.unlock();
                }

                if (workerAdded) {
                    // 加入成功,启动worker线程
                    myWorkerThread.start();
                    // 标识为worker线程启动成功,并作为返回值返回
                    workerStarted = true;
                }
            }
        }finally {
            if (!workerStarted) {
                addWorkerFailed(newWorker);
            }
        }

        return workerStarted;
    }
    public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        // 当一个任务从工作队列中被成功移除,可能此时工作队列为空。尝试判断是否满足线程池中止条件
        tryTerminate();
        return removed;
    }

如何保证中止过程中不丢失任务?

  1. 通过shutdown关闭线程池时,SHUTDOWN状态的线程池会等待所有剩余的任务执行完毕后再进入TIDYING状态。
  2. 通过shutdownNow关闭线程池时,以返回值的形式将剩余的任务吐出来还给用户

中止前已提交的任务不会丢失;而中止后线程池也不会再接收新的任务(走拒绝策略)。这两点共同保证了提交的任务不会丢失。

如何保证线程池最终关闭前,所有工作线程都已退出?

线程池在收到中止命令进入SHUTDOWN或者STOP状态时,会一直等到工作队列为空且所有工作线程都中止退出后才会推进到TIDYING阶段。
上面描述的条件是一个复合的条件,其只有在“收到停止指令(进入SHUTDOWN或者STOP状态)”、"工作队列中任务被移除或消费(工作队列为空)"或是“工作线程退出(所有工作线程都中止退出)”这三类事件发生时才有可能满足。
而判断是否满足条件并推进到TIDYING状态的关键就在tryTerminate方法中。tryTerminate顾名思义便是用于尝试终止线程池的,当上述任意事件触发时便判断是否满足终止条件,如果满足则将线程池推进到TIDYING阶段。
因此在ThreadPoolExecutor中tryTerminate一共在6个地方被调用,分别是shutdown、shutdownNow、remove、purge、addWorkerFailed和processWorkerExit方法。

  • shutdown、shutdownNow方法触发收到停止指令的事件
  • remove、purge方法触发工作队列中任务被移除的事件
  • addWorkerFailed、processWorkerExit方法触发工作线程退出的事件

tryTerminate源码分析

   /**
     * 尝试判断是否满足线程池中止条件,如果满足条件,将其推进到最后的TERMINATED状态
     * 注意:必须在任何可能触发线程池中止的场景下调用(例如工作线程退出,或者SHUTDOWN状态下队列工作队列为空等)
     * */
    final void tryTerminate() {
        for (;;) {
            int currentCtl = this.ctl.get();
            if (isRunning(currentCtl)
                    || runStateAtLeast(currentCtl, TIDYING)
                    || (runStateOf(currentCtl) == SHUTDOWN && !workQueue.isEmpty())) {
                // 1 isRunning(currentCtl)为true,说明线程池还在运行中,不满足中止条件
                // 2 当前线程池状态已经大于等于TIDYING了,说明之前别的线程可能已经执行过tryTerminate,且通过了这个if校验,不用重复执行了
                // 3 当前线程池是SHUTDOWN状态,但工作队列中还有任务没处理完,也不满足中止条件
                // 以上三个条件任意一个满足即直接提前return返回
                return;
            }

            // 有两种场景会走到这里
            // 1 执行了shutdown方法(runState状态为SHUTDOWN),且当前工作线程已经空了
            // 2 执行了shutdownNow方法(runState状态为STOP)
            // 这个时候需要令所有的工作线程都主动的退出来回收资源
            if (workerCountOf(currentCtl) != 0) {
                // 如果当前工作线程个数不为0,说明还有别的工作线程在工作中。
                // 通过interruptIdleWorkers(true),打断其中的一个idle线程,尝试令其也执行runWorker中的processWorkerExit逻辑,并执行tryTerminate
                // 被中断的那个工作线程也会执行同样的逻辑(getTask方法返回->processWorkerExit->tryTerminate)
                // 这样可以一个接着一个的不断打断每一个工作线程,令其逐步的退出(比起一次性的通知所有的idle工作线程,这样相对平滑很多)
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            // 线程池状态runState为SHUTDOWN或者STOP,且存活的工作线程个数已经为0了
            // 虽然前面的interruptIdleWorkers是一个一个中断idle线程的,但实际上有的工作线程是因为别的原因退出的(恰好workerCountOf为0了)
            // 所以这里是可能存在并发的,因此通过mainLock加锁防止并发,避免重复的terminated方法调用和termination.signalAll方法调用
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // cas的设置ctl的值为TIDYING+工作线程个数0(防止与别的地方ctl并发更新)
                if (ctl.compareAndSet(currentCtl, ctlOf(TIDYING, 0))) {
                    try {
                        // cas成功,调用terminated钩子函数
                        terminated();
                    } finally {
                        // 无论terminated钩子函数是否出现异常
                        // cas的设置ctl的值为TERMINATED最终态+工作线程个数0(防止与别的地方ctl并发更新)
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 通知使用awaitTermination方法等待线程池关闭的其它线程(通过termination.await等待)
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }

            // 如果上述对ctl变量的cas操作失败了,则进行重试,再来一次循环
            // else retry on failed CAS
        }
    }

如何保证工作线程一定能成功退出?

从上面tryTerminate方法的实现中可以看到,线程池必须等到所有工作线程都全部退出(workerCount为0),工作线程占用的全部资源都回收后才会推进到终止态。
那么之前启动的工作线程一定能通过processWorkerExit退出并销毁吗?答案是不一定,这主要取决于用户是否正确的编写了令工作线程安全退出的任务逻辑。
因为只有能退出任务执行逻辑(runWorker方法中的task.run())的工作线程才有机会执行processWorkerExit,无法从任务中跳出(正常退出or抛异常)的工作线程将永远无法退出,导致线程池也永远无法推进到终态。

下面分情况讨论:

  • 任务中的逻辑是一定会执行完正常结束的(没有无限循环也没有令线程陷入阻塞态的操作)。那么这是没问题的
   ()->{
        // 会正常结束的
        System.out.println("hello world!");
   };
  • 任务中存在需要无限循环的逻辑。那么最好在循环条件内监听一个volatile的变量,当需要线程池停止时,修改这个变量,从而令任务从无限循环中正常退出。
    ()->{
        // 无限循环
        while(true){
            System.out.println("hello world!");
        }
    };
    ()->{
        // 无限循环时监听一个变量
        while(!isStop) {
            System.out.println("hello world!");
        }
    };
  • 任务中存在Condition.await等会阻塞当前线程,令其无法自然退出的逻辑。
    tryTerminate中停止工作线程时会调用Worker类的interruptIfStarted方法发出中断指令(Thread.interrupt方法),如果被阻塞的方法是响应中断的,那么业务代码中不能无脑吞掉InterruptedException,而要能感知到中断异常,在确实要关闭线程池时令任务退出(向上抛异常或正常退出)。
    而如果是不响应中断的阻塞方法(如ReentrantLock.lock),则需要用户自己保证这些方法最终能够被唤醒,否则工作线程将无法正常退出而阻止线程池进入终止状态。
    ()->{
            try {
                new ReentrantLock().newCondition().await();
            } catch (InterruptedException e) {
                // doSomething处理一些逻辑后。。。
                // 向上抛出异常
                throw new XXXException(e);
            }
        }
    ()->{
        try {
            new ReentrantLock().newCondition().await();
        } catch (InterruptedException e) {

        }
        // doSomething处理一些逻辑后。。。正常退出
    }

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/22616.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot SpringBoot 开发实用篇 5 整合第三方技术 5.8 变更缓存供应商 memcached

SpringBoot 【黑马程序员SpringBoot2全套视频教程&#xff0c;springboot零基础到项目实战&#xff08;spring boot2完整版&#xff09;】 SpringBoot 开发实用篇 文章目录SpringBootSpringBoot 开发实用篇5 整合第三方技术5.8 变更缓存供应商 memcached5.8.1 memcached 缓存…

如何在 Navicat 16 中仅备份数据库结构 | 数据传输

尽管有少数据库管理员&#xff08;DBA&#xff09;不相信执行定期数据库备份是有用的&#xff0c;但对于如何最好地执行此操作有很多意见。无论你采用哪种方法&#xff0c;都有很多充分的理由保留数据库模式的副本。当发生数据丢失时&#xff0c;你可以从模式中还原数据库结构&…

[附源码]SSM计算机毕业设计二手车交易系统JAVA

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

Java中的final和常量以及枚举

Java中的final和常量以及枚举final的作用常量常量概述和基本作用常量做信息标志和分类枚举枚举的概念定义枚举类的格式枚举的特征final的作用 1.final关键字是最终的意思&#xff0c;可以修饰&#xff08;类、方法、变量&#xff09; 2.修饰类&#xff1a;表明该类是最终类&am…

数据结构之:链表

链表初体验之单链表 线性表 线性表"线性存储结构" —— 一根线能串起来的数组 存储到物理空间之中 数据需要有相同的数据类型 元素之间的关系 需要是“一对一” 两种存储方式“顺序” 和“链式”链表介绍 分为有头节点的链表和没有头节点的链表。 插入的时候&#xf…

化合物在高通量筛选中的作用

在 1985 年之前&#xff0c;先导物的筛选主要是通过人工进行的&#xff0c;每周处理的样本数量不过几百个&#xff0c;组合化学的出现使得科学家们获取化合物的方式发生了显著变化&#xff0c;他们可以在短时间内合成大量化合物。更重要的是&#xff0c;随着分子生物学和功能基…

【构建ML驱动的应用程序】第 1 章 :从产品目标到 ML 框架

&#x1f50e;大家好&#xff0c;我是Sonhhxg_柒&#xff0c;希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流&#x1f50e; &#x1f4dd;个人主页&#xff0d;Sonhhxg_柒的博客_CSDN博客 &#x1f4c3; &#x1f381;欢迎各位→点赞…

SpringBoot框架详细教学保姆级说明

目录 文章目录1.1 简介1.2 特性1.3 四大核心2 springboot入门案例2.1 SpringBoot 项目开发步骤2.2 创建一个 Spring MVC 的 Spring BootController2.3 分析2.4 核心配置文件格式2.5 Spring Boot 前端使用 JSP3 SpringBoot框架Web开发3.1 Spring Boot 集成 MyBatis3.2 DAO 的其它…

D. Make It Round(math)

Problem - D - Codeforces 在Berlandia发生了通货膨胀&#xff0c;所以商店需要改变商品的价格。 商品n的当前价格已经给出。允许将该商品的价格提高k倍&#xff0c;1≤k≤m&#xff0c;k为整数。输出商品的最圆的可能的新价格。也就是在最后有最大数量的零的那个。 例如&…

开发工程师的面经

目录1. static关键字2. 多态是什么&#xff1f;3. ArrayList和LinkList的区别区别ArrayList的扩容机制4. Java是编译型还是解释型&#xff1f;5. 什么是编译&#xff1f;什么是解释&#xff1f;6. String str“abc” 和 String str new String(“abc”)的区别&#xff1f;7. i…

C\C++刷题ADY3

题目来源&#xff1a;力扣 1.第一题 203. 移除链表元素 - 力扣&#xff08;LeetCode&#xff09; 思路分析:&#xff08;不带哨兵位的头节点&#xff09; 每次去分析一个节点&#xff0c; 如果节点不是存的是6&#xff0c;就拿节点来尾插 如果节点存的不是6&#xff0c;就把节…

计算机毕业设计ssm+vue基本微信小程序的“香草屋”饮料奶茶点单小程序

项目介绍 随着社会的发展,互联网的迅速发展,5G时代的到来,智能手机的普及,人们的生活方式更加智能一体化,衣食住行也越来越简单快捷,人们的生活也更加趋向于智能化,一台智能手机可以解决生活中的各种难题。为了使人们的生活更加方便,于是各种生活小程序普遍产生, 随着微信小程序…

记一次 .NET 某自动化采集软件 崩溃分析

一&#xff1a;背景 1.讲故事 前段时间有位朋友找到我&#xff0c;说他的程序在客户的机器上跑着跑着会出现偶发卡死&#xff0c;然后就崩掉了&#xff0c;但在本地怎么也没复现&#xff0c;dump也抓到了&#xff0c;让我帮忙看下到底怎么回事&#xff0c;其实崩溃类的dump也…

java项目-第155期ssm班级同学录网站-java毕业设计_计算机毕业设计

java项目-第155期ssm班级同学录网站-java毕业设计_计算机毕业设计 【源码请到资源专栏下载】 今天分享的项目是《ssm班级同学录网站》 该项目分为2个角色&#xff0c;管理员、用户。 用户可以浏览前台,包含功能有&#xff1a; 首页、公告信息、校友风采、论坛信息&#xff0c;用…

vue项目中使用 NProgress 进度加载插件

场景&#xff1a;每次进入页面时&#xff0c;页面顶部都有一个加载条 下来说下这个效果怎么实现的 第一步&#xff1a;下载 NProgress 插件 npm install --save nprogress 第二步&#xff1a;导入 nprogress 并配置 切记&#xff1a;NP 都必须是大写 我是在路由页面中导入的 im…

搜索引擎项目开发过程以及重难点整理

目录认识搜索引擎搜索的核心思路倒排索引介绍项目简介模块管理构建索引模块数据库设计正排索引倒排索引程序入口类Indexer类扫描文档子模块FileScanner类构建文档子模块构建标题parseTitle方法构建urlparseUrl方法构建内容parseContent方法构建正排索引子模块正排索引实体类map…

Nginz静态资源缓存

缓存 缓存&#xff08;cache&#xff09;&#xff0c;原始意义是指访问速度比一般随机存取存储器&#xff08;RAM&#xff09;快的一种高速存储器&#xff0c;通常它不像系统主存那样使用DRAM技术&#xff0c;而使用昂贵但较快速的SRAM技术。缓存的设置是所有现代计算机系统发…

【前端-TypeScript】TypeScript学习思维导图-一图看完《TypeScript编程》

目录前言文章已收录至https://lichong.work&#xff0c;转载请注明原文链接。 ps&#xff1a;欢迎关注公众号“Fun肆编程”或添加我的私人微信交流经验&#x1f91d; 前言 现在&#xff0c;TypeScript 正在逐渐成为与前端框架以及 ES6 语法同一地位的基础工具&#xff0c;更多…

计算机毕业设计node+vue基于微信小程序的西餐外卖系统 uniapp 小程序

项目介绍 随着信息技术和网络技术的飞速发展,人类已进入全新信息化时代,传统管理技术已无法高效,便捷地管理信息。为了迎合时代需求,优化管理效率,各种各样的管理系统应运而生,各行各业相继进入信息管理时代,西餐外卖系统就是信息时代变革中的产物之一。 任何系统都要遵循系统…

力扣每日一题:808. 分汤 【dp动态规划】

有 A 和 B 两种类型 的汤。一开始每种类型的汤有 n 毫升。有四种分配操作&#xff1a; 提供 100ml 的 汤A 和 0ml 的 汤B 。提供 75ml 的 汤A 和 25ml 的 汤B 。提供 50ml 的 汤A 和 50ml 的 汤B 。提供 25ml 的 汤A 和 75ml 的 汤B 。 当我们把汤分配给某人之后&#xff0c;汤…