Java多线程(四)——ThreadPoolExecutor源码解析

news2025/1/13 13:58:58

ThreadPoolExecutor源码解析

多线程场景下,手动创建线程有许多缺点:

  • 频繁创建、销毁线程会消耗大量 CPU 资源,
  • 销毁线程后需要被回收,对 GC 垃圾回收也有一定的压力

使用线程池有许多好处:

  • 降低 CPU 资源消耗。通过复用线程,减少创建、销毁线程造成的消耗
  • 提高响应速度。由于有复用的线程,工作队列中的任务可以直接被空闲线程获取并执行,不需要等待线程创建。
  • 提高管理性。使用线程池统一分配管理,避免无限制创建线程消耗系统资源,降低系统稳定性。同时线程池提供了许多钩子函数,可以批量管理线程池中的线程。

ThreadPoolExecutor使用方法

public class ThreadPoolDemo1 {
    public static void main(String[] args) {
        //实例化自定义参数的线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
                3,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(20),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r){
                            @Override
                            public void run() {
                                super.run();
                            }
                        };
                        //t.setName();
                        return t;
                    }
                },
                new ThreadPoolExecutor.AbortPolicy());
		//执行任务
        threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("say hello");
            }
        });
        //柔和地关闭线程池
        threadPoolExecutor.shutdown();
    }
}

线程池状态

  • RUNNING:接收新任务,并且处理阻塞队列中的任务
  • SHUTDOWN:拒绝新任务,但是处理阻塞队列里的任务
  • STOP:拒绝新任务,并且抛弃阻塞队列中的任务,同时中断正在处理的任务
  • TIDING:所有任务都执行完(包含阻塞队列里面的任务),当前线程池活动线程数为 0 ,即将执行 terminated() 方法
  • TERMINATED:终止状态, terminated()方法调用完成以后的状态。

线程池状态数据结构

状态设计

线程池状态由原子类型 AtomicInteger 来记录,value为32位的int类型数据,其高 3 位用于记录线程池的运行状态,低29位用于记录线程池当前活动线程数。

在ThreadPoolExecutor中,其数据结构设计如下:

//原子类型的int类型,其中的value为int类型,32位数据
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//COUNT_BITS=32-3=29,即活动线程数的位数
private static final int COUNT_BITS = Integer.SIZE - 3;
//最大活动线程数(低29位全为1)
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 运行状态存储在高3位, 活动线程数存储在低29位
//-1二进制为 1111...111
//1 二进制为 0000...001
//2 二进制为 0000...010
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
//获取运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//获取活动线程数
private static int workerCountOf(int c)  { return c & CAPACITY; }
//根据 运行状态 与 活动线程数 ,生成ctl这个int类型的数据
private static int ctlOf(int rs, int wc) { return rs | wc; }

ThreadPoolExecutor()构造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        ...
}

构造方法为线程池设置了许多核心参数:

参数名说明
corePoolSize核心线程数
maximumPoolSize总线程数=核心线程数+非核心线程数
keepAliveTime非核心线程最长空闲时间(如果超过空闲时间还没有任务,非核心线程就会被销毁)
unitkeepAliveTime的时间单位,可以是 TimeUnit.SECONDS
workQueue工作队列,存放Runnable,即线程任务。
threadFactory线程的构造工厂,用于自定义新建线程,如给线程添加名字
handler拒绝策略,如果在如下情况,新来的任务就会进入拒绝策略。
1. 工作线程全都被占用,而且任务队列以及被存满。
2. 当前线程不接受新的任务,如SHUTDOWN、STOP状态

如果线程池主要用来做计算逻辑判断等 CPU 密集型任务,我们需要尽量减少 CPU 上下文切换,设置核心线程数大小为:Ncpu + 1。

如果主要任务是大量 I/O 操作的 I/O 密集型任务,对CPU消耗较小,我们可以让核心线程数大一些,设置核心线程数大小为: 2 * Ncpu

execute(Runnable) 将新任务交给线程池

线程池任务提交的主要流程如下;

线程池任务提交的主要流程

在 execute() 方法中,对应上图,主要做了这几件事:

  1. 如果当前活动线程数没有超过核心线程数,创建核心线程执行传入的任务。
  2. 核心线程都被占用,如果队列还没满,将任务放到队列中。
  3. 如果队列已经满了,如果没超过最大线程数,可以创建非核心线程执行传入的任务。
  4. 都不满足,或者线程池状态变为非运行态,将会执行拒绝策略来拒绝任务。
  5. 特别的,如果线程池变为非运行态,且为 SHUTDOWN,为了让线程池成功进入 TIDYING 状态,需要将任务队列中的任务清空,创建一个空任务非核心线程,用于获取并处理任务队列中的任务。

在创建非核心线程之前,任务队列中等待的任务将被空闲下来的核心线程获取并执行。

创建非核心线程之后,任务队列中等待的任务不仅可以被核心线程获取,也可以被存活的非核心线程获取并执行。

非核心线程有存活时间,由参数 keepAliveTime 设定,如果空闲时,持续 keepAliveTime 时间无法获取到任务,该非核心线程将会被销毁。

public void execute(Runnable command) {
    //如果传入的任务是空,抛异常
    if (command == null)
        throw new NullPointerException();
    //获取当前线程池状态
    int c = ctl.get();
    //1. 如果活动线程数<核心线程数
    if (workerCountOf(c) < corePoolSize) {
        //创建核心线程来处理传入的任务
        if (addWorker(command, true))
            return;
        //如果创建失败,说明有其他线程介入
        //再次获取最新线程池状态
        c = ctl.get();
    }
    //2. 如果当前线程池是运行态,并且成功将任务放入工作队列
    if (isRunning(c) && workQueue.offer(command)) {
        //添加任务队列成功
        //再次检查最新线程状态
        int recheck = ctl.get();
        //如果当我们将任务放入队列后出现了:
        //1. 线程池不再是运行态 --> 将任务从队列中剔除,并执行拒绝策略(非运行状态,拒绝接受任务)
        //2. 活动线程数为0 --> 创建非核心线程,处理的任务由非核心线程自主去工作队列中获取
        if (! isRunning(recheck) && remove(command))
            //线程池状态不正确,执行拒绝策略
            reject(command);
        else if (workerCountOf(recheck) == 0)
            //情况1:RUNNING状态,但设置核心线程数为0,此时任务放入了工作队列,但没人处理
            //情况2:SHUTDOWN状态,但remove任务失败。在此之前工作线程处理完所有任务后全部停止工作,为了保证队列为空,需要再创建一个空任务非核心线程,处理这个队列中的剩余任务
            addWorker(null, false);
    }
    //3. 如果线程池不是运行态,或者是运行态,但任务队列已满,尝试添加非核心线程处理当前任务
    else if (!addWorker(command, false))
        //添加非核心线程失败,执行拒绝策略
        reject(command);
}

addWorker(Runnable, boolean) 添加工作线程

addWorker()的任务是添加工作线程,它主要做了以下几件事:

  1. 先在表示线城池状态的 ctl 变量中,增加工作线程数
    1. 检查线程池状态是否合法
    2. 自旋CAS增加工作线程数——CAS方法提供了原子性的 检查+更新
  2. 然后真正地创建线程,并加入到工作线程集合 workers 中
    1. 新建Worker
    2. 获取锁——非CAS方法 检查+更新 需要加锁使之为原子性
    3. 检查线程池状态是否合法
    4. 将 Worker 加入 workers
    5. 释放锁
    6. 开启线程

mainLock 逻辑上用于 workers 资源的上锁

private boolean addWorker(Runnable firstTask, boolean core) {
    //----------------增加工作线程数--------------------
    retry:
    for (;;) {
        //创建新的工作线程,需要 CAS自旋 更新工作线程数
        int c = ctl.get();
        int rs = runStateOf(c);
		//如果出现以下情况,将不添加工作线程
        //1. 线程池非运行态
        //2. 在1的条件下,线程池为STOP状态(既不是运行态也不是SHUTDOWN态),且传入了任务
        //3. 在2的条件下,工作队列不为空
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            //如果线程池状态为运行态,进入该循环,自旋cas尝试增加工作线程数
            int wc = workerCountOf(c);
            //创建失败条件:
            //1. 如果当前活动线程数已经到达了最大线程数
            //2. 如果请求创建核心线程,但核心线程数已经满了
            //3. 如果请求非核心线程数,但总线线程数已经满了
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //如果还可以创建,先自旋cas尝试增加工作线程数
            if (compareAndIncrementWorkerCount(c))
                //有别的线程介入,cas失败,自旋再cas。
                //注意,由于别的线程cas失败,所以需要重新判断线程状态ctl
                break retry;
            c = ctl.get();  // Re-read ctl
            //活动线程数加1完成后,观察最新线程池运行状态
            if (runStateOf(c) != rs)
                //如果线程池运行状态变化(变成非运行态)重新进入retry循环查看条件
                continue retry;
            
        }
    }
    
    //----------------真正创建工作线程--------------------

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //新的Worker,Worker封装了线程、任务
        //初始Worker的state是-1
        w = new Worker(firstTask);
        final Thread t = w.thread;
        //健壮性判断,如果线程不为空
        if (t != null) {
            //多线程环境下 workers是非线程安全集合,它的增删操作需要加锁
            //需要通过上锁,将 检查+更新 包装为原子性
            //mainLock 逻辑上用于 workers 资源的上锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //检查当前线程池状态
                int rs = runStateOf(ctl.get());
				//1. 如果是运行态
                //2. SHUTDOWN态且传入任务为空,这个情况创建线程的目的是处理工作队列中剩余任务。
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //健壮性判断,该线程不应该为运行态,因为他是刚创建的。
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    //在临界区内操作临界资源
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //标记添加工作线程成功
                    workerAdded = true;
                }
            } finally {
                //释放锁
                mainLock.unlock();
            }
            if (workerAdded) {
                //添加工作线程成功后,启动线程
                t.start();
                //线程开启标记
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            //线程开启失败,自旋cas将之前增加的工作线程数记录-1
            //同时将该启动线程失败的worker从set中移除
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker对象

Worker对象封装了线程、线程任务、线程在线程池中的状态

它构造方法如下:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable{
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    public void run() {
        runWorker(this);
    }
}
    

注意到Worker类实现了Runnable接口,构造方法中新建线程时,给线程赋予的Runnable对象不是firstTask,而是worker实例。

未来执行线程,将会进入到worker的run()方法,随后进入runWorker()方法。

runWorker(Worker) 线程的执行逻辑

回顾一下普通启动线程的方式,只传入一个实际任务,它们执行任务完毕就会结束线程。在多线程环境下,如果频繁创建、销毁线程,是很消耗CPU资源的。而线程池为了复用线程,在 ThreadPoolExecutor 中,将核心线程与非核心线程都设计为了持续运行的可复用的线程。

核心线程需要一直保持运行状态,不断阻塞处理工作队列中的任务;非核心线程在空闲时也会处理工作队列中的任务,只有在空闲时间超时后才会结束线程。

让线程保持运行状态、获取工作队列中任务的一系列动作,需要将工作队列与线程联系起来,这由 ThreadPoolExecutor 设计的 runWorker(Worker w) 完成。

线程池创建的线程由 t.start() 启动后,run() 方法将会被回调,Worker对象实例化线程时,传入了实现了 Runnable 接口的 Worker 实例。所以 t.run() 方法将会进入 Worker类 的 run() 方法。其中就只做了一件事:执行由上述由 ThreadPoolExecutor 实现的 runWorker() 方法。

**runWorker() 方法为了将工作队列与线程联系起来,让线程有任务可做,**做了几件事:

  1. 如果有初始任务,则执行初始任务
  2. 如果没有,或者初始任务已经执行完了,就去工作队列中获取任务
  3. 线程是临界区资源,先上锁,再执行任务

需要注意的是,runWorker 在循环中,在执行任务前后添加了钩子函数 beforeExecute() 与 afterExecute() ,它们抛出的异常如果方法内没有处理,将会导致 runWorker 跳出循环,进入线程终止处理。

如果我们希望线程可以中断,我们可以在钩子函数中实现对中断的响应。

除了响应中断,我们还可以做很多操作,例如让线程暂停/回复 处理任务,这个代码在文末提供。

final void runWorker(Worker w) {
    //获取当前线程环境
    Thread wt = Thread.currentThread();
    //取出Worker封装的初始任务
    Runnable task = w.firstTask;
    //将Worker封装的初始任务清空
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //循环获取可执行的任务,进入循环的条件:
        //1. 如果worker中有初始任务
        //2. 获取到工作队列中的任务
        while (task != null || (task = getTask()) != null) {
            //线程是临界区资源,处理任务的过程需要将线程上锁
            w.lock();
            //1.如果线程为STOP状态,或者TIDYING、TERMINATED状态,那么就将线程中断(将中断标记设为true)
            //2.如果线程中断标记为真,且状态此时变为上述状态,那么就将线程中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //如果线程没被中断,为正常运行态
                //执行任务前的钩子函数,注意,钩子函数抛出的异常是没人管的,会直接进入到finally代码块
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //在当前线程环境执行任务的run()方法
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                //处理完任务,将task置空,等待GC
                task = null;
                //累计完成任务数+1
                w.completedTasks++;
                //处理任务完成,将线程资源释放
                w.unlock();
            }
        }
        //如果执行到这里之前有人抛出异常,且无人捕获,则不执行这句,直接进入finally
        //(工作线程 task.run() 即使抛了异常,也被捕获了。只有可能在 getTask,w.lock(), 钩子函数 等无人捕获异常的地方抛出异常,才有可能跳过这句话的执行,直接进入finally)
        //如果执行到这里都没人抛出异常,或者抛出的异常全都被捕获过,则执行这一句
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

有的朋友就会疑问,非核心线程是有 keepAliveTime 的,但是在 runWorker() 中并没有看到相关的结束线程的判断。不用急,我们看到 getTask() 方法中,它处理了非核心线程的存活时间。

getTask() 从工作队列中获取线程任务

getTask() 运行在当前线程的环境,当前线程需要从工作队列中获取任务。如果超时了,或者线程需要结束的情况,将会返回null。如果正常,将会把取到的任务返回,并由当前线程执行 r.run()。getTask()主要做了下面几件事:

  1. 获取线程最新状态
  2. 根据线程池状态,判断当前线程是否要继续运行
    1. 如果线程为运行态
    2. 或者是SHUTDOWN状态,但工作队列不为空,就继续运行
  3. 如果线程超时,或者线程数过多,且满足下列条件,就可以退出当前线程
    1. 如果有其他线程(工作队列中的任务可以被别的线程处理)
    2. 如果工作队列为空(不需要线程处理任务)
  4. 线程正常运行,则 限时 或 无限时 从工作队列中获取任务。
  5. 只有限时获取任务失败,才会第二次进入循环。
private Runnable getTask() {
    boolean timedOut = false; 

    for (;;) {
        //获取线程池最新状态
        int c = ctl.get();
        int rs = runStateOf(c);

        //如果线程非运行态
        //如果
        //1. 是STOP、TIDYING、TERMINATED状态
        //2. 是SHUTDOWN状态,且工作队列为空
        //则自旋cas减小工作线程数,并return null,让所在线程获取任务失败,退出获取任务的循环,从而结束线程。
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
		//如果线程为运行态
        //或者是SHUTDOWN状态,但工作队列不为空
        //就需要将任务取出处理
        int wc = workerCountOf(c);
        
		//allowCoreThreadTimeOut 一般都是 false
        //判断当前线程是否有限时,是需要判断当前线程是否为核心线程的,这里用了很聪明的方法:判断工作线程数是否大于核心线程数
        //如果工作线程数小于核心线程数,说明当前线程一定是核心线程,那么不限时
        //反之,就直接认定进入到当前判断的是非核心线程,让它限时。
        //注意,这里没有上锁
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
		//如果工作线程超过了最大值,或者当前线程超时了,那么当前线程应当被废弃
        //额外条件:且此时 有别的工作线程(有别人处理任务) 或者 工作队列为空(没有任务需要处理)
        //那么当前线程就确定可以废弃,return null,结束当前线程。
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            //CAS减小工作线程数,不取出任务
            if (compareAndDecrementWorkerCount(c))
                return null;
            //自旋
            continue;
        }
        //如果当前线程可用
        try {
            //如果限时,就通过限时方式poll()获取任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            //如果不限时,直接阻塞获取任务
            workQueue.take();
            if (r != null)
                //如果拿到了任务,直接返回执行
                return r;
            //进到这里说明超时了还没拿到任务
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

processWorkerExit() 结束线程

如果线程获取工作队列任务失败,或者被判定为当前线程应当结束,将会退出 runWorker() 的 while 循环,并进入 processWorkerExit() 处理线程结束的工作:

  1. 如果当前线程是因为异常退出的,需要将线程池工作线程数更新-1
  2. 获取锁,逻辑上对 workers 临界区变量上锁
  3. 将当前线程从 workers 中取出,并释放锁
  4. 释放线程后,尝试终止线程池
  5. 根据情况,看看是否需要补一个工作线程
//移除当前工作线程
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //如果执行了 completedAbruptly=false 语句,说明当前线程异常退出了。需要将线程池工作线程数更新-1
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    //有线程退出的时候,将要操作 workers 这个非线程安全集合,它是临界区资源
    //对临界区资源的 检查+修改,需要原子性操作,需要上锁。对 mainLock 上锁。 
    // mainLock 逻辑上用于 workers 资源的上锁, 这里顺便同时也对 completedTaskCount 这个互斥资源进行逻辑上的上锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //工作记录,统计线程池总完成任务数
        completedTaskCount += w.completedTasks;
        //将该线程从 workers 集合中取出
        workers.remove(w);
    } finally {
        //释放对 workers 资源的 mainLock 锁
        mainLock.unlock();
    }

    //释放线程之后,尝试终止线程池
    //终止条件:SHUTDOWN状态且工作线程数为0且工作队列为空 || STOP状态且工作线程数为0
    tryTerminate();

    //获取当前线程池状态
    int c = ctl.get();
    //如果线程池状态为 RUNNING 或者 SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        //如果是正常状态溢出当前线程,进入if方法块
        if (!completedAbruptly) {
            // 核心线程数最小值允许多少?
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            //工作队列不为空,设置工作线程的最小值为1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            //如果工作线程数大于等于工作线程最小值,说明还有工作线程在线程池中,那就return,什么也不干。
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        //1. 如果是不正常的方式移除了当前工作线程,再补一个工作线程!
        //2. 线程池工作队列不为空,并且没有工作线程,再添加一个工作线程。
        addWorker(null, false);
    }
}

tryTerminate() 尝试终止线程

如果当前状态为 SHUTDOWN 或者 STOP ,而且满足变为 TIDYING 状态的条件,就CAS一次尝试将状态变为 TIDYING 。如果变为 TIDYING 成功,将继续变为 TERMINATED。在真正变为 TERMINATED 之前,会给一个钩子函数 terminated() 由外界/子类 实现,处理变为 TERMINATED 状态之前的最后操作。

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        //如果是运行态,就不终止线程
        if (isRunning(c) ||
            //如果已经是TIDYING状态了,就不作处理
            runStateAtLeast(c, TIDYING) ||
            //如果是SHUTDOWN,但是工作队列还有任务,不作处理( SHUTDOWN需要到工作队列为空的时候,才可以变成TIDYING)
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        //如果是SHUTDOWN或者STOP状态,但是工作线程还没清空
        if (workerCountOf(c) != 0) { // Eligible to terminate
            //中断一个当前运行的线程
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        //如果可以变为TIDYING状态,上锁,因为terminated中的操作可能会是临界区资源
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    //自定义处理线程池终止前的最后操作
                    terminated();
                } finally {
                    //最终设置为TERMINATED线程池状态
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

shutdown() 将线程池变为 SHUTDOWN 状态

之所以在 mainLock.lock() 内还要对 ctl 变量进行自旋更新,是因为还有其他线程在没持有锁的时候,也对 ctl 进行自旋更新。例如:addWorker()中对工作线程数进行更改就没有上锁。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //判断能否SHUTDOWN
        checkShutdownAccess();
        //自旋将线程池运行状态改为 SHUTDOWN
        advanceRunState(SHUTDOWN);
        //中断所有线程
        interruptIdleWorkers();
        //线程池生命周期钩子函数
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    //变为SHUTDOWN的时候,尝试变为TIDYING
    tryTerminate();
}

shutdownNow() 将线程池变为 STOP 状态

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //检查能否SHUTDOWN
        checkShutdownAccess();
        //将状态改为STOP
        advanceRunState(STOP);
        //中断所有工作线程,中断响应可在runWorker的钩子函数中自定义处理(可以通过抛出异常来终止线程运行)
        interruptWorkers();
        //将工作队列中的任务全都溢出,并且添加到 tasks中。可以交给拒绝策略,或者外部处理这些还没完成的任务。
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    //变为 STOP 状态后,尝试变为 TIDYING 状态
    tryTerminate();
    return tasks;
}

文中多次提到了钩子函数,其中最常用的是在 runWorker() 中,在线程执行任务前后添加的 beforeExecute() 和 afterExecute() 两个钩子函数。除了响应中断,我们还可以做很多操作,例如让线程暂停/回复 处理任务:

class PausableThreadPoolExecutor extends ThreadPoolExecutor {
    private boolean isPaused;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();

    public PausableThreadPoolExecutor(...) { super(...); }

    protected void beforeExecute(Thread t, Runnable r) {
        //确保继承的方法多次重写的调用链正常,要调用super.beforeExecute()
        super.beforeExecute(t, r);
        //获取暂停锁
        pauseLock.lock();
        try {
            //条件变量,只要不满足,就阻塞等待
            while (isPaused) unpaused.await();
        } catch (InterruptedException ie) {
            t.interrupt();
        } finally {
            pauseLock.unlock();
        }
    }

    public void pause() {
        pauseLock.lock();
        try {
            isPaused = true;
        } finally {
            pauseLock.unlock();
        }
    }

    public void resume() {
        pauseLock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        } finally {
            pauseLock.unlock();
        }
    }
}}

其中使用了条件变量,使用方法见 ReentrantLock 的条件变量文章。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/178893.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux运维之解决服务器挖矿木马问题

文章目录1 挖矿木马1.1 定义1.2 挖矿特征1.3 解决挖矿木马1.3.1 阻断异常网络通信&#xff08;非必需&#xff09;1.3.2 清除定时任务1.3.3 清除启动项1.3.4 清除SSH公钥1.3.5 清除木马进程1.4 其他常见问题1.4.1 清除木马后又100%1.4.2 CPU占用100%却看不到进程1 挖矿木马 1.…

Python OS 文件目录方法 os.walk()

Python OS 文件/目录方法 os.walk() 概述 os.walk() 方法用于通过在目录树中游走输出在目录中的文件名&#xff0c;向上或者向下。 os.walk() 方法是一个简单易用的文件、目录遍历器&#xff0c;可以帮助我们高效的处理文件、目录方面的事情。 在Unix&#xff0c;Windows中…

BFS(三)腐烂的橘子(感染问题)

994. 腐烂的橘子 在给定的 m x n 网格 grid 中&#xff0c;每个单元格可以有以下三个值之一&#xff1a; 值 0 代表空单元格&#xff1b; 值 1 代表新鲜橘子&#xff1b; 值 2 代表腐烂的橘子。 每分钟&#xff0c;腐烂的橘子 周围 4 个方向上相邻 的…

领导看到我自用的IDEA插件,也回去悄悄安装了...

现在哪有程序员离得开 Idea 啊&#xff01;没有 Idea 的程序员那还有灵魂吗&#xff1f;那没有&#xff01;既然我们都用 Idea&#xff0c;如何提高 Idea 的开发效率&#xff0c;在开发工具上&#xff0c;我们就卷掉其他小伙伴呢&#xff01;今天鸡翅老哥就来给大家介绍几款我一…

函数的认识

文章目录 函数是什么库函数 自定义函数 函数参数 函数调用 函数的嵌套调用和链式访问 函数的声明和定义 函数递归一、函数是什么 维基百科中对函数的定义&#xff1a;子程序在计算机科学中&#xff0c;子程序&#xff08;英语&#xff1a;Subroutine, procedure, functio…

SpringBoot+Vue项目课程作业管理系统

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7/8.0 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.3.9 浏…

NodeJS Web 框架 Express 之路由

NodeJS Web 框架 Express 之路由参考描述路由路由匹配规则顺序匹配模块化创建使用前缀参考 项目描述哔哩哔哩黑马程序员搜索引擎Bing 描述 项目描述Edge109.0.1518.61 (正式版本) (64 位)NodeJSv18.13.0nodemon2.0.20Express4.18.2 路由 在 Web 中&#xff0c;路由可以理解为…

体验 micronaut 微服务框架

体验 micronaut 微服务框架谁在使用 MICRONAUT主要特点代码示例展示几点特性原生云原生安装 Micronaut 命令行工具创建一个 MICRONAUT 应用程序MICRONAUT是基于 JVM 的现代全栈框架&#xff0c;用于构建模块化、易于测试的微服务和无服务器应用程序。 谁在使用 MICRONAUT 主要…

数学建模——评价算法

层次分析法&#xff08;AHP&#xff09; 步骤 1.建立层次结构模型&#xff1b; 2.构造判断(成对比较)矩阵&#xff1b; 3.层次单排序及其一致性检验&#xff1b; 4.层次总排序及其一致性检验&#xff1b; 建立层次结构模型 将决策的目标、考虑的因素(决策准则)和决策对象按…

【HBase入门】5. 常用 Shell 操作(2)

前言 我们可以以shell的方式来维护和管理HBase。例如&#xff1a;执行建表语句、执行增删改查操作等等。 导入测试数据集 需求 在资料的 数据集/ ORDER_INFO.txt 中&#xff0c;有一份这样的HBase数据集&#xff0c;我们需要将这些指令放到HBase中执行&#xff0c;将数据导入…

申请Moonbeam Accelerator孵化计划申请答题指导

Moonbeam Accelerator是一个为期10 周的孵化计划&#xff0c;由Moonbeam基金会、Arrington Capital和Rokk3r共同推出&#xff0c;旨在帮助初创团队提高技术、业务、营销、金融和融资技能&#xff0c;助力您的Web3创业之梦。 申请孵化计划有任何限制吗&#xff1f;没有&#xff…

BFS(二)二叉树层序遍历(I、II)、二叉树锯齿形层序遍历、N叉树层序遍历

目录 102. 二叉树的层序遍历 107. 二叉树的层序遍历 II 103. 二叉树的锯齿形层序遍历 429. N 叉树的层序遍历 102. 二叉树的层序遍历 给你二叉树的根节点 root &#xff0c;返回其节点值的 层序遍历 。 &#xff08;即逐层地&#xff0c;从左到右访问所有节点&#xff09;。…

VUE3/TS/TSX入门手册指北

VUE3入门手册vue3入门首先 查看 官方文档&#xff1a;https://cn.vuejs.org/guide/quick-start.html如果有vue2基础&#xff0c;速成课程&#xff1a;https://www.zhoulujun.co/learning-vue3/component.html&#xff08;官方文档 还是建议 翻一遍&#xff09;VUE3深入首先看源…

STM32+python产生三角波

目录任务目标实现方法python制作数表由于项目需要&#xff0c;需要产生一个三角波&#xff0c;需要覆盖4000个点的一个数组&#xff0c;这样的数组点数太多了&#xff0c;肯定不能自己一个一个手写了。最简单的一个方法是在嵌入式程序中用C写一个函数&#xff0c;对一个数组&am…

基于蜣螂优化的Elman神经网络数据预测-附代码

基于蜣螂算法优化的Elman神经网络数据预测 - 附代码 文章目录基于蜣螂算法优化的Elman神经网络数据预测 - 附代码1.Elman 神经网络结构2.Elman 神经用络学习过程3.电力负荷预测概述3.1 模型建立4.基于蜣螂优化的Elman网络5.测试结果6.参考文献7.Matlab代码摘要&#xff1a;针对…

LwIP系列--内存管理(内存池)详解

一、目的在《LwIP系列--内存管理&#xff08;堆内存&#xff09;详解》中我们详细介绍了LwIP中内存堆的实现原理&#xff0c;本篇我们介绍LwIP中内存池的实现细节。在LwIP源码中为了满足特定内存分配的需要以及优化内存占用制定了各种尺寸大小的内存池&#xff08;每种内存池管…

Golang应用执行Shell命令实战教程

本文学习如何在Golang程序中执行Shell命令&#xff08;如&#xff0c;ls&#xff0c;mkdir或grep&#xff09;&#xff0c;如何通过stdin和stdout传入I/O给正在运行的命令&#xff0c;同时管理长时间运行的命令。为了更好的理解&#xff0c;针对不同场景由浅入深提供几个示例进…

77、TensoRF: Tensorial Radiance Fields

简介 主页&#xff1a;https://apchenstu.github.io/TensoRF/ 总体而言&#xff0c;该文章主要内容于DVGO类似 将场景的亮度场建模为4D张量&#xff0c;它表示一个具有每体素多通道特征的3D体素网格&#xff0c;中心思想是将4D场景张量分解为多个紧凑低秩张量分量&#xff0c…

06 | 要找工作了,应该如何准备?

前言 前言&#xff1a;找工作更像相亲&#xff0c;总有一款适合自己。简历就像一份广告&#xff0c;对方要什么你写什么&#xff0c;而不是你有什么。 文章目录前言一、找工作的流程二、做法1. 分析职位描述&#xff08;JD&#xff09;1&#xff09;组成2&#xff09;做法一、找…

【数据结构】7.2 线性表的查找

7.2.1 顺序查找&#xff08;线性查找&#xff09; 应用范围&#xff1a; 顺序表或线性链表表示的静态查找表。表内元素之间可以无序。 数据元素类型定义&#xff1a; 数据表可能有多个数据域的值&#xff0c;比如成绩表中有姓名、成绩、总分等。所以用结构类型来表示要存储…