多线程与高并发(16)——线程池原理(ThreadPoolExecutor源码)

news2025/1/16 21:11:38

本文从ThreadPoolExecutor源码来理解线程池原理。
ThreadPoolExecutor使用了AQS、位操作、CAS操作等。在看这篇文章之前,需要具备以下知识:
多线程与高并发(6)——CAS详解(包含ABA问题)
多线程与高并发(7)——从ReentrantLock到AQS源码(两万字大章,一篇理解AQS)
多线程与高并发(13)——Java常见并发容器总结
多线程与高并发(15)——线程池详解(非源码层面)
这里我们就不再赘述ThreadPoolExecutor的使用以及入参参数的用法了,直接上源码,JDK8。

源码分析

一、主要属性

这里没列举状态,只列举了主要的属性,代码如下 :

	//存放状态和线程数的控制变量
	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
	//BlockingQueue是一个阻塞队列,这里用于存储任务
    private final BlockingQueue<Runnable> workQueue;
	//全局锁,ReentrantLock是可重入锁,用于访问工作线程Worker集合和进行数据统计记录时候的加锁操作
    private final ReentrantLock mainLock = new ReentrantLock();

    //工作线程集合,只有在持有全局锁mainLock的前提下才能访问此集合
    private final HashSet<Worker> workers = new HashSet<Worker>();

    //awaitTermination等待条件变量
    private final Condition termination = mainLock.newCondition();

    //峰值线程数(全局锁下)
    private int largestPoolSize;

    //已经成功执行的任务数
    private long completedTaskCount;
	//线程工厂
    private volatile ThreadFactory threadFactory;
	//拒绝策略
    private volatile RejectedExecutionHandler handler;

    /**
     * 当线程池中的线程数量大于核心线程数的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,
     * 而是会等待,直到等待的时间超过了这个时间才会被回收销毁;
     */
    private volatile long keepAliveTime;

    /**
     * 是否允许核心线程超时,如果为true则keepAliveTime对核心线程也生效
     */
    private volatile boolean allowCoreThreadTimeOut;
	//核心线程数
    private volatile int corePoolSize;
    //最大线程数
    private volatile int maximumPoolSize;

    //默认拒绝策略是AbortPolicy,抛异常
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

构造函数我们就不在这一一列举了,上篇文章已经总结。

二、线程池状态

线程的生命周期及状态,可参考:多线程与高并发(1)——线程的基本知识(实现,常用方法,状态)。
而在线程池中,其线程状态的控制主要由AtomicInteger修饰的ctl变量决定。
其中c通常代表ctl,s代表状态常量,wc就是表示worker count、rs就是表示running status。
代码如下:

	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
	//COUNT_BITS的值是32-3=29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 工作线程上限数量掩码
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 获取运行状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 获取工作线程数
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //通过运行状态和工作线程数计算ctl的值,或运算
    private static int ctlOf(int rs, int wc) { return rs | wc; }

	// ctl和状态常量比较,判断是否小于
	private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
	//ctl和状态常量比较,判断是否大于或等于
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
	// ctl和状态常量SHUTDOWN比较,判断是否处于RUNNING状态,因为只有RUNNING是小于SHUTDOWN值的
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * 通过CAS操作使线程数+1
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * 通过CAS操作使线程数-1
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

总结上面代码之前,可以先了解位运算:位运算详解。
一个bit是一个0或1,中文叫做一个二进制位。一个byte是8个bit,中文名称叫一个字节。
一个Integer的大小是4byte(4个字节),Integer.SIZE是32bit,也就是说COUNT_BITS的值是29。

AtomicInteger ctl用于存放状态和线程数,所以其长度是32位。
其中,低29位用于存放工作线程数,而高3位用于存放线程池状态,所以线程池的状态最多只能有2^3种。
工作线程上限数量为2^29 - 1,超过5亿,这个数量在短时间内不用考虑会超限。

工作线程上限数量掩码为CAPACITY ,1左移29位再减1,其二进制值为:
在这里插入图片描述
同上,我们计算各状态的值:
在这里插入图片描述
同过上面各个状态的值,我们再来分析以下代码:

	//1、RUNNING的二进制位是111-00000000000000000000000000000
	//2、通过ctl(111-00000000000000000000000000000,0)计算
	//3、计算结果为111-00000000000000000000000000000,默认状态为RUNNING
	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 获取运行状态
    //1、先取反CAPACITY,得到111-00000000000000000000000000000
    //2、再按位取与,其中c是ctl.get()得到的值,ctl位图特点是:xxx-yyyyyyyyyyyyyyyyyyyyyyyyyyyyyy
    //3、获取高三位的值
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 获取工作线程数
    // 直接与CAPACITY做按位与的操作,得到后29位
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //通过运行状态和工作线程数计算ctl的值,或运算
    private static int ctlOf(int rs, int wc) { return rs | wc; }

总结一下,就是通过计算,获取到运行状态、工作线程数和ctl。
=线程池的状态跃迁图如下:
在这里插入图片描述

三、execute方法

execute方法是ThreadPoolExecutor异步执行任务的方法,用来提交一个任务到线程池中去。
其中,ExecutorService接口中提供了submit()方法,最终也是调用的execute()方法。
execute()方法代码如下:

public void execute(Runnable command) {
		//判断任务对象非空
        if (command == null)
            throw new NullPointerException();
       	//获取ctl的值,上面我们知道默认为running
        int c = ctl.get();
        //1、获取工作线程数,判断是否小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
        	//1.1 如果小于核心线程数,则创建新的核心线程,并执行任务,如果成功则返回,addwoker()后面写
            if (addWorker(command, true))
                return;
            //1.2 创建新的核心线程失败,获取到最新的ctl值
            c = ctl.get();
        }
		//2、当前执行的工作线程数量大于等于corePoolSize
		//2.1 判断当前状态是否为running,尝试用workQueue.offer非阻塞方法向任务队列放入任务(放入任务失败返回false)
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //2.2二次检查当前线程池的状态,因为在这个执行期间线程池状态可能会发生变化
            // 如果线程池二次检查状态是非运行中状态,则从任务队列移除当前的任务,调用拒绝策略处理
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //2.3 如果当前线程池线程数量为0,则创建一个非核心线程,但是传入的任务为null
            //非核心线程不会马上运行,而是等待获取任务队列的任务去执行
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //3、最后一步,核心线程已被用满,线程池不是running,线程池可能是RUNNING状态同时任务队列已经满了
        //这个时候,就执行以下
        //3.1如果向任务队列投放任务失败,则会尝试创建非核心线程传入任务执行
        //如果创建非核心线程执行失败,执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

其流程图如下:
简便点的,直接从网上找来的,如下:
在这里插入图片描述
按上述步骤详细画下,如下:
在这里插入图片描述

如果一个任务成功加入任务队列,我们依然需要二次检查是否需要添加一个工作线程(因为所有存活的工作线程有可能在最后一次检查之后已经终结)或者执行当前方法的时候线程池是否已经shutdown了。
所以我们需要二次检查线程池的状态,必须时把任务从任务队列中移除,并且在没有可用的工作线程的前提下新建一个工作线程。

四、addWorker方法

addWorker主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。
代码如下:

//firstTask表示执行的任务,core表示是否核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        //死循环
        for (;;) {
        	//获取线程池状态
            int c = ctl.get();
            int rs = runStateOf(c);

            // 1、线程池非运行状态
            // 2、线程池SHUTDOWN状态、任务为空、任务队列不为空三者的反面判断——————很难理解
            //这个判断的边界是线程池状态在非运行状态下,不会再接受新的任务,
            //在此前提下如果状态已经不是SHUTDOWN、传入任务不为空、任务队列为空(已经没有积压任务)都不需要添加新的线程
            //总结下,就是说,非运行状态,且SHUTDOWN下传入了新任务,且任务队列已经空了,不创建新线程了
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
			//又是一个死循环
            for (;;) {
            	//循环中,每次重新获取工作线程数wc
                int wc = workerCountOf(c);
                //1、线程数超纲了
                if (wc >= CAPACITY ||
                //2、core为true,核心线程数,为false,最大线程数,反正做个线程数的判断,超出就返回false
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //通过CAS更新工作线程数wc+1,break最外层的循环,true则新增成功
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //走到这里说明了通过CAS更新工作线程数wc失败,这个时候需要重新判断线程池的状态是否由RUNNING已经变为SHUTDOWN
                c = ctl.get();  // Re-read ctl
                //如果线程的状态改变了就再次执行上述操作,到最外层循环
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
                //CAS失败了,线程池状态依然是RUNNING,有可能是并发更新导致的失败,则在内层循环重试即可
            }
        }
		//标记工作线程是否启动成功
        boolean workerStarted = false;
        //标记工作线程是否创建成功
        boolean workerAdded = false;
        Worker w = null;
        try {
        	//创建Worker实例,Worker构造里面会通过线程工厂创建新的Thread对象
            w = new Worker(firstTask);
            //这里直接获取Thread
            final Thread t = w.thread;
            if (t != null) {
            	//加全局锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    //获取线程池状态
                    int rs = runStateOf(ctl.get());
					//1、状态是运行中
					//2、状态是SHUTDOWN 但是任务为空
					//以上两点需要添加工作线程
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //线程是否存活,不存活抛异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //工作线程集合中添加线程
                        workers.add(w);
                        int s = workers.size();
                        //尝试更新历史峰值工作线程数,也就是线程池峰值容量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //工作线程创建成功
                        workerAdded = true;
                    }
                } finally {
                	//释放锁
                    mainLock.unlock();
                }
                if (workerAdded) {
                	//启动线程
                    t.start();
                    //线程启动成功 
                    workerStarted = true;
                }
            }
        } finally {
        	//线程启动失败,需要从工作线程中移除对应的Worker
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

扩展:
retry:就是一个标记,标记对一个循环方法的操作(continue和break)处理点,功能类似于goto,所以retry一般都是伴随着for循环出现,retry:标记的下一行就是for循环,在for循环里面调用continue(或者break)再紧接着retry标记时,就表示从这个地方开始执行continue(或者break)操作
总结一下:
1、死循环中,非运行状态,且SHUTDOWN下传入了新任务,且任务队列已经空了,不创建新线程。
2、死循环中,一系列判断后,通过CAS更新工作线程数wc+1,并break结束循环。
3、加全局锁,创建线程并启动线程。只有启动成功线程,才算添加好woker对象了,否则只是一个无用的临时对象。
添加worker失败的代码如下:

 private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
            	//移除woker
                workers.remove(w);
            //wc数量减1
            decrementWorkerCount();
            //根据状态尝试终结线程池
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

五、工作线程内部类Worker

1、Worker构造函数

线程池中的每一个具体的工作线程被包装为内部类Worker实例,Worker继承AQS,实现了Runnable接口,如下:
在这里插入图片描述
其代码如下:

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
		//线程实例,如果创建线程失败则为null
        final Thread thread;
        //任务实例
        Runnable firstTask;
        //记录每个线程完成的任务总数
        volatile long completedTasks;

        /**
         * 构造函数
         */
        Worker(Runnable firstTask) {
        	//AQS里的锁状态,禁止线程中断,直到runWorker()方法执行
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //通过ThreadFactory创建线程实例,其中Worker实例自身作为Runnable用于创建新的线程实例
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
        	//外部的runWorker()方法,是线程池的方法
            runWorker(this);
        }

        // 是否持有锁,state值为1的时候表示持有锁,state值为0的时候表示已经释放锁
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
		//获取锁(资源),成功当前线程为独占线程
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
		//释放锁(资源),释放独占线程
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
		//启动后进行线程中断,只有中断标志位为false才会中断
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

其中,最值得注意的是,

this.thread = getThreadFactory().newThread(this);

通过ThreadFactory创建的Thread实例同时传入Worker实例,因为Worker本身实现了Runnable,所以可以作为任务提交到线程中执行。
只要thread 执行了start()方法,就能够执行Worker中的run()方法。
Worker继承自AQS,其中也用到了模板方法模式,重写了获取资源和释放资源的方法。

2、runWorker()方法

final void runWorker(Worker w) {
		//获取当前线程
        Thread wt = Thread.currentThread();
        //获取Worker中持有的初始化时传入的任务对象
        Runnable task = w.firstTask;
        //初始化时传入的任务对象设置为空
        w.firstTask = null;
        //由于Worker初始化时AQS中state设置为-1,这里要先做一次解锁把state更新为0,允许线程中断
        w.unlock(); // allow interrupts
        //线程是否因为用户异常终结,默认是true
        boolean completedAbruptly = true;
        try {
            // 初始化任务对象不为null,或者从任务队列获取任务不为空
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 如果线程池正在停止,那么要确保当前工作线程是中断状态
            	// 否则,要保证当前线程不是中断状态
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                	//线程执行前的钩子方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    	//任务执行
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                   		//线程执行后的钩子方法
                        afterExecute(task, thrown);
                    }
                } finally {
                	//清空task临时变量,不然while会死循环执行同一个task
                    task = null;
                    //已完成任务+1
                    w.completedTasks++;
                    //释放资源
                    w.unlock();
                }
            }
            //getTask()为null,正常结束
            completedAbruptly = false;
        } finally {
        	//处理线程退出,completedAbruptly为true说明由于用户异常导致线程非正常退出
            processWorkerExit(w, completedAbruptly);
        }
    }

流程图如下:
在这里插入图片描述

3、getTask()方法

getTask()方法是工作线程在while死循环中获取任务队列中的任务对象的方法,代码如下:

private Runnable getTask() {
		//记录last上一次从队列中拉取是否超时
        boolean timedOut = false; // Did the last poll() time out?
		//死循环
        for (;;) {
        	//当前状态
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            //线程池至少为SHUTDOWN ,且线程池正在STOP或者队列为空,工作线程减1
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
			//线程池还处于RUNNING状态,重新获取一次工作线程数
            int wc = workerCountOf(c);

            // Are workers subject to culling?
            //allowCoreThreadTimeOut 是否允许核心线程超时,如果为true则keepAliveTime对核心线程也生效,默认为false
            //工作线程是否大于核心线程
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
			//1、工作线程大于最大线程,或者超时过
			//2、工作线程不止一条,或者队列为空
			//以上连个条件同事满足,CAS把线程数减去1失败会进入下一轮循环做重试
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
	
            try {
            // 如果timed为true,通过poll()方法做超时拉取,keepAliveTime时间内没有等待到有效的任务,则返回null
            // 如果timed为false,通过take()做阻塞拉取,会阻塞到有下一个有效的任务时候再返回(一般不会是null)
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                //非null时候才返回
                if (r != null)
                    return r;
                //说明上一次从任务队列中获取到的任务为null,一般是workQueue.poll()方法超时返回null
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

第一处直接返回null,且线程数-1的场景:
1、线程池状态为SHUTDOWN,一般是调用了shutdown()方法,并且任务队列为空。
2、线程池状态为STOP。
第二处直接返回null且线程数-1的场景:
1、工作线程大于最大线程,说明了通过setMaximumPoolSize()方法减少了线程池容量。
2、允许线程超时同时上一轮通过poll()方法从任务队列中拉取任务为null。
3、工作线程不止一条,或者队列为空。

在execute()方法中,当线程池总数已经超过了corePoolSize并且还小于maximumPoolSize时,当任务队列已经满了的时候,会通过addWorker(task,false)添加非核心线程。而这里的逻辑恰好类似于addWorker(task,false)的反向操作,用于减少非核心线程,使得工作线程总数趋向于corePoolSize。
如果对于非核心线程,上一轮循环获取任务对象为null,这一轮循环很容易满足timed && timedOut为true,这个时候getTask()返回null会导致Worker#runWorker()方法跳出死循环,之后执行processWorkerExit()方法处理后续工作,而该非核心线程对应的Worker则变成“游离对象”,等待被JVM回收。
当allowCoreThreadTimeOut设置为true的时候,这里分析的非核心线程的生命周期终结逻辑同时会适用于核心线程。

本文参考:https://www.throwx.cn/2020/08/23/java-concurrency-thread-pool-executor/

六、shutdown()方法

1、shutdown()方法

shutdown()是线程池关闭的方法,代码如下:

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        //加锁
        mainLock.lock();
        try {
        	//安全策略
            checkShutdownAccess();
            //设置为SHUTDOWN状态
            advanceRunState(SHUTDOWN);
            //中断空闲的工作线程
            interruptIdleWorkers();
            //钩子方法,ScheduledThreadPoolExecutor用过
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        //使状态更变为TIDYING,执行钩子方法terminated()后,最终状态更新为TERMINATED
        tryTerminate();
    }

2、shutdownNow()方法

我上篇文章写的:
shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。(新任务停止,已存在的要执行完)
shutdownNow():关闭线程池,线程的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。(停止所有任务并返回等待值)
代码如下:

  public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
        	//安全策略
            checkShutdownAccess();
            //设置为STOP状态
            advanceRunState(STOP);
            //中断所有的工作线程
            interruptWorkers();
            //清空工作队列并且取出所有的未执行的任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

3、tryTerminate()

线程池在调用shutdown终结之后,会调用tryTerminate方法,将线程池状态改为TIDYING,调用完钩子方法terminated()后,状态会变成TERMINATED。
代码如下:

final void tryTerminate() {
		//死循环
        for (;;) {
            int c = ctl.get();
            //1、线程池是running
            if (isRunning(c) ||
            //2、或者只是是TIDYING状态,表示即将结束或者已经结束
                runStateAtLeast(c, TIDYING) ||
                //3、或者是SHUTDOWN状态且任务队列为空
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                //结束循环,直接返回,因为线程池还要运行或者已经结束
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
            	//工作线程数不为0,则中断工作线程集合中的第一个空闲的工作线程
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
            //CAS设置线程池状态为TIDYING,如果设置成功则执行钩子方法terminated()
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                    	//线程池状态为TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 唤醒阻塞在termination条件的所有线程,这个变量的await()方法在awaitTermination()中调用
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

shutdown()方法中会通过interruptIdleWorkers()中断所有的空闲线程,这个时候有可能有非空闲的线程在执行某个任务,执行任务完毕之后,如果它刚好是核心线程,就会在下一轮循环阻塞在任务队列的take()方法,如果不做额外的干预,它甚至会在线程池关闭之后永久阻塞在任务队列的take()方法中。为了避免这种情况,每个工作线程退出的时候都会尝试中断工作线程集合中的某一个空闲的线程,确保所有空闲的线程都能够正常退出。

interruptIdleWorkers()方法中会对每一个工作线程先进行tryLock()判断,只有返回true才有可能进行线程中断。我们知道runWorker()方法中,工作线程在每次从任务队列中获取到非null的任务之后,会先进行加锁Worker#lock()操作,这样就能避免线程在执行任务的过程中被中断,保证被中断的一定是空闲的工作线程。

总结

ThreadPoolExecutor源码的重点主要是提交执行任务、任务队列、线程池关闭这三块,其余的暂时就不做分析,本文主要参考来源:硬核干货:4W字从源码上分析JUC线程池ThreadPoolExecutor的实现原理,感谢大佬留下的康庄大道,想了解的更多更深的可以参考此文章。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/145093.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

腾讯三面:进程写文件过程中,进程崩溃了,文件数据会丢吗?

进程写文件&#xff08;使用缓冲 IO&#xff09;过程中&#xff0c;写一半的时候&#xff0c;进程发生了崩溃&#xff0c;会丢失数据吗&#xff1f; 答案&#xff0c;是不会的。 因为进程在执行 write &#xff08;使用缓冲 IO&#xff09;系统调用的时候&#xff0c;实际上是…

企业宣传片制作配音,我们该从哪里找?

优秀的品质的配音是制作优质企业视频必不可少的硬件条件。因此&#xff0c;许多公司视频配音或旁白声音是由专门从事配音行业的人员配音的。 首先是在宣传视频中配音的作用 1.宣传视频的配音为您建立企业形象 2.宣传视频的配音将为您打开市场 3.宣传视频的配音将使您的宣传…

深入理解Synchronized

Synchronized 底层原理 Synchronized的语义底层是通过一个 Monitor 的对象来完成&#xff0c;其实wait/notify等方法也依赖于 Monitor 对象&#xff0c;这就是为什么只有在同步的块中&#xff0c;拿到锁之后&#xff0c;才能调用wait/notify等方法&#xff0c;否则会抛出java.…

AI助力产品质量检验,基于YOLO实现瓷砖缺陷问题检测识别

在我之前的文章中也写过很多关于生产质检相关的实践文章&#xff0c;一直觉得这块是比较有意思的应用方向&#xff0c;做出来的模型能够以一种更加直观贴切的形式展现出来&#xff0c;瓷砖缺陷问题检测识别也是一个比较老的话题了&#xff0c;今天还是想拿出来具体实践做一下&a…

Golang.org/x库初探1——image库

Golang有一个很有意思的官方库&#xff0c;叫golang.org/x&#xff0c;x可能是extends&#xff0c;experimental&#xff0c;总之是一些在官方库中没有&#xff0c;但是又很有用的库。最近花点时间把这里有用的介绍一下。 Image库 提供更多的图像格式 golang.org/x/image库整…

Linux 网络驱动

1. linux 里面驱动三巨头&#xff1a;字符设备驱动、块设备驱动、网络设备驱动。2.嵌入式网络硬件分为两部分&#xff1a; MAC 和 PHY。如果一款芯片数据手册说自己支持网络&#xff0c;一般都是说的这款 SOC 内置 MAC&#xff0c; MAC 类似 I2C 控制器、SPI 控制器一样的外设。…

Java三大技术平台是什么?

为了使软件开发人员、服务提供商和设备生产商可以针对特定的市场进行开发&#xff0c;SUN公司将Java划分为三个技术平台&#xff0c;它们分别是 JavaSE、 JavaEE和 JavaME。Java SE( Java Platform Standard Edition)标准版&#xff0c;是为开发普通桌面和商务应用程序提供的解…

零宽断言正则表达式替换方案

一、背景 safari浏览器不支持零宽断言正则表达式 二、解决方案 使用其他正则替换零宽断言正则&#xff08;包含&#xff1a;(?<)正向肯定预查、(?<!)正向否定预查、(?)反向肯定预查、(?!)反向否定预查&#xff09; 三、涉及场景 1、仅校验&#xff0c;不取值 如表…

首汽约车驶向极速统一之路!出行平台如何基于StarRocks构建实时数仓?

作者&#xff1a;王满&#xff0c;高级数据架构工程师首汽约车&#xff08;以下简称 “首约”&#xff09;是首汽集团为响应交通运输部号召&#xff0c;积极拥抱互联网&#xff0c;推动传统出租车行业转型升级&#xff0c;加强建设交通强国而打造的网约车出行平台。 在用车服务…

KernelSU: 内核 ROOT 方案, KernelSU KernelSU KernelSU 新的隐藏root防止检测 封号方案

大约一年多以前&#xff0c;我在一篇讲Android 上 ROOT 的过去、现在和未来https://mp.weixin.qq.com/s?__bizMjM5Njg5ODU2NA&mid2257499009&idx1&sn3cfce1ea7deb6e0e4f2ac170cffd7cc1&scene21#wechat_redirect 的文章中提到&#xff1a; 我认为&#xff0c;随…

三菱FX5U 多个表格运行指令 DRVTBL

简述该指令可以用GX Works3预先在表格数据中设定的控制方式的动作&#xff0c;&#xff08;连续或步进&#xff09; 执行多行。 本文演示了步进执行多行。指令解释2.1梯形图中的指令第一个参数&#xff1a;输出脉冲的轴编号 &#xff0c;K1,K2,K3,K4... 第二个参数&#xff1a;…

ESP8266 Windows开发环境搭建(IDE1.5)好用不骗人

最近一个项目需要用ESP8266&#xff0c;找了很多文章进行环境搭建编译都很问题&#xff0c;不是make Menuconfig 不出来&#xff0c;就是编译报错&#xff0c;现总结如下。 我在自己电脑上没弄出来&#xff0c;就安装了一个虚拟机很干净的环境没有其它开发环境影响。 提前去官…

逆向入门|全国建筑市场监管公共服务平台JS逆向

看了志远的公开课&#xff0c;自己做一下练手。 全国建筑市场监管公共服务平台&#xff08;四库一平台&#xff09; 先点到 数据这里打开f12看一眼 第一个就是 https://jzsc.mohurd.gov.cn/api/webApi/dataservice/query/comp/list?pg1&pgsz15&total450 取这个地址…

线段树讲解

0、引入 假设给定一个长度为 1001 的数组&#xff0c;即下标 0 到 1000。 现在需要完成 3 个功能&#xff1a; add(1, 200, 6); //给下标 1 到 200 的每个数都加 6&#xff1b; update(7, 375, 4); //下标 7 到 375 的数全部修改为 4 query(3, 999); //下标 3 到 999 所有数…

深入理解如何利用PWM驱动舵机:ESP32驱动DS1115舵机

深入理解如何利用PWM驱动舵机&#xff1a;ESP32驱动DS1115舵机DS1115舵机技术规格举例说明之前做了一个项目&#xff0c;关于ESP32驱动DS1115舵机&#xff0c;但是在项目运行的过程中由于学艺不精&#xff0c;导致电机抽搐 &#x1f635;‍&#x1f4ab;&#xff0c;所以特意拜…

声纹识别可靠评测

分享嘉宾 | 李蓝天 文稿整理 | William 1 Introduction 声纹识别的发展&#xff0c;非常迅猛&#xff0c;在一些基准上取得了不错的效果&#xff0c;但如果将其部署到一个实际的应用系统里面&#xff0c; 从应用方的反馈来看&#xff0c;纹识别在很多场景里的鲁棒性并不理想。…

聚观早报 | 亚马逊将裁员17000人;苹果砍单MacBook等产品线架构

今日要闻&#xff1a;亚马逊将裁员17000人&#xff1b;苹果砍单MacBook等产品线&#xff1b;京东科技调整组织架构&#xff1b;小米x徕卡团队获技术大奖&#xff1b;必应搜索或将纳入ChatGPT亚马逊将裁员17000人 1 月 5 日消息&#xff0c;知情人士称&#xff0c;亚马逊新一轮裁…

正版授权|FastStone Capture 专业屏幕截图录屏工具软件 商业版,支持商业用途。

现在截图对每个人来说都是一个必不可少的功能。QQ软件截图、360游览器截图等都是相对简单快速的途径。但是如果你对截图有更多的要求&#xff0c;那么这里推荐一款截图软件&#xff0c;它就是FastStone Capture。这个对于商城老用户来说&#xff0c;几乎是接近人手一份。强大的…

【VUE3】保姆级基础讲解(六)Axios库

目录 Axios介绍与原生的差异 发送常见的请求和配置选项 1、发送request请求 baseURL &#xff1a; 2、发送get请求 3、发送post请求 axios.all Axios创建新的实例 请求和响应拦截 请求拦截 响应拦截 Axios介绍与原生的差异 Axios其实就是一个网络请求库 与原生的差异&…

勇夺中国市场豪华品牌第一名后,特斯拉S3XY全系售价调整

比你优秀的人比你更努力&#xff0c;用这句话形容特斯拉最贴切不过。 刚刚过去的2022年&#xff0c;特斯拉在海内外市场交出了亮眼答卷&#xff1a;全球共计交付产品超131万辆&#xff0c;同比增长40%&#xff1b;乘联会给出的数据显示&#xff0c;上海超级工厂全年交付71.1万辆…