本文从ThreadPoolExecutor源码来理解线程池原理。
ThreadPoolExecutor使用了AQS、位操作、CAS操作等。在看这篇文章之前,需要具备以下知识:
多线程与高并发(6)——CAS详解(包含ABA问题)
多线程与高并发(7)——从ReentrantLock到AQS源码(两万字大章,一篇理解AQS)
多线程与高并发(13)——Java常见并发容器总结
多线程与高并发(15)——线程池详解(非源码层面)
这里我们就不再赘述ThreadPoolExecutor的使用以及入参参数的用法了,直接上源码,JDK8。
源码分析
一、主要属性
这里没列举状态,只列举了主要的属性,代码如下 :
//存放状态和线程数的控制变量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//BlockingQueue是一个阻塞队列,这里用于存储任务
private final BlockingQueue<Runnable> workQueue;
//全局锁,ReentrantLock是可重入锁,用于访问工作线程Worker集合和进行数据统计记录时候的加锁操作
private final ReentrantLock mainLock = new ReentrantLock();
//工作线程集合,只有在持有全局锁mainLock的前提下才能访问此集合
private final HashSet<Worker> workers = new HashSet<Worker>();
//awaitTermination等待条件变量
private final Condition termination = mainLock.newCondition();
//峰值线程数(全局锁下)
private int largestPoolSize;
//已经成功执行的任务数
private long completedTaskCount;
//线程工厂
private volatile ThreadFactory threadFactory;
//拒绝策略
private volatile RejectedExecutionHandler handler;
/**
* 当线程池中的线程数量大于核心线程数的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,
* 而是会等待,直到等待的时间超过了这个时间才会被回收销毁;
*/
private volatile long keepAliveTime;
/**
* 是否允许核心线程超时,如果为true则keepAliveTime对核心线程也生效
*/
private volatile boolean allowCoreThreadTimeOut;
//核心线程数
private volatile int corePoolSize;
//最大线程数
private volatile int maximumPoolSize;
//默认拒绝策略是AbortPolicy,抛异常
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
构造函数我们就不在这一一列举了,上篇文章已经总结。
二、线程池状态
线程的生命周期及状态,可参考:多线程与高并发(1)——线程的基本知识(实现,常用方法,状态)。
而在线程池中,其线程状态的控制主要由AtomicInteger修饰的ctl变量决定。
其中c通常代表ctl,s代表状态常量,wc就是表示worker count、rs就是表示running status。
代码如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//COUNT_BITS的值是32-3=29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 工作线程上限数量掩码
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取工作线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
//通过运行状态和工作线程数计算ctl的值,或运算
private static int ctlOf(int rs, int wc) { return rs | wc; }
// ctl和状态常量比较,判断是否小于
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
//ctl和状态常量比较,判断是否大于或等于
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// ctl和状态常量SHUTDOWN比较,判断是否处于RUNNING状态,因为只有RUNNING是小于SHUTDOWN值的
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 通过CAS操作使线程数+1
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* 通过CAS操作使线程数-1
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
总结上面代码之前,可以先了解位运算:位运算详解。
一个bit是一个0或1,中文叫做一个二进制位。一个byte是8个bit,中文名称叫一个字节。
一个Integer的大小是4byte(4个字节),Integer.SIZE是32bit,也就是说COUNT_BITS的值是29。
AtomicInteger ctl用于存放状态和线程数,所以其长度是32位。
其中,低29位用于存放工作线程数,而高3位用于存放线程池状态,所以线程池的状态最多只能有2^3种。
工作线程上限数量为2^29 - 1,超过5亿,这个数量在短时间内不用考虑会超限。
工作线程上限数量掩码为CAPACITY ,1左移29位再减1,其二进制值为:
同上,我们计算各状态的值:
同过上面各个状态的值,我们再来分析以下代码:
//1、RUNNING的二进制位是111-00000000000000000000000000000
//2、通过ctl(111-00000000000000000000000000000,0)计算
//3、计算结果为111-00000000000000000000000000000,默认状态为RUNNING
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 获取运行状态
//1、先取反CAPACITY,得到111-00000000000000000000000000000
//2、再按位取与,其中c是ctl.get()得到的值,ctl位图特点是:xxx-yyyyyyyyyyyyyyyyyyyyyyyyyyyyyy
//3、获取高三位的值
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取工作线程数
// 直接与CAPACITY做按位与的操作,得到后29位
private static int workerCountOf(int c) { return c & CAPACITY; }
//通过运行状态和工作线程数计算ctl的值,或运算
private static int ctlOf(int rs, int wc) { return rs | wc; }
总结一下,就是通过计算,获取到运行状态、工作线程数和ctl。
=线程池的状态跃迁图如下:
三、execute方法
execute方法是ThreadPoolExecutor异步执行任务的方法,用来提交一个任务到线程池中去。
其中,ExecutorService接口中提供了submit()方法,最终也是调用的execute()方法。
execute()方法代码如下:
public void execute(Runnable command) {
//判断任务对象非空
if (command == null)
throw new NullPointerException();
//获取ctl的值,上面我们知道默认为running
int c = ctl.get();
//1、获取工作线程数,判断是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//1.1 如果小于核心线程数,则创建新的核心线程,并执行任务,如果成功则返回,addwoker()后面写
if (addWorker(command, true))
return;
//1.2 创建新的核心线程失败,获取到最新的ctl值
c = ctl.get();
}
//2、当前执行的工作线程数量大于等于corePoolSize
//2.1 判断当前状态是否为running,尝试用workQueue.offer非阻塞方法向任务队列放入任务(放入任务失败返回false)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//2.2二次检查当前线程池的状态,因为在这个执行期间线程池状态可能会发生变化
// 如果线程池二次检查状态是非运行中状态,则从任务队列移除当前的任务,调用拒绝策略处理
if (! isRunning(recheck) && remove(command))
reject(command);
//2.3 如果当前线程池线程数量为0,则创建一个非核心线程,但是传入的任务为null
//非核心线程不会马上运行,而是等待获取任务队列的任务去执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3、最后一步,核心线程已被用满,线程池不是running,线程池可能是RUNNING状态同时任务队列已经满了
//这个时候,就执行以下
//3.1如果向任务队列投放任务失败,则会尝试创建非核心线程传入任务执行
//如果创建非核心线程执行失败,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
其流程图如下:
简便点的,直接从网上找来的,如下:
按上述步骤详细画下,如下:
如果一个任务成功加入任务队列,我们依然需要二次检查是否需要添加一个工作线程(因为所有存活的工作线程有可能在最后一次检查之后已经终结)或者执行当前方法的时候线程池是否已经shutdown了。
所以我们需要二次检查线程池的状态,必须时把任务从任务队列中移除,并且在没有可用的工作线程的前提下新建一个工作线程。
四、addWorker方法
addWorker主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。
代码如下:
//firstTask表示执行的任务,core表示是否核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//死循环
for (;;) {
//获取线程池状态
int c = ctl.get();
int rs = runStateOf(c);
// 1、线程池非运行状态
// 2、线程池SHUTDOWN状态、任务为空、任务队列不为空三者的反面判断——————很难理解
//这个判断的边界是线程池状态在非运行状态下,不会再接受新的任务,
//在此前提下如果状态已经不是SHUTDOWN、传入任务不为空、任务队列为空(已经没有积压任务)都不需要添加新的线程
//总结下,就是说,非运行状态,且SHUTDOWN下传入了新任务,且任务队列已经空了,不创建新线程了
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//又是一个死循环
for (;;) {
//循环中,每次重新获取工作线程数wc
int wc = workerCountOf(c);
//1、线程数超纲了
if (wc >= CAPACITY ||
//2、core为true,核心线程数,为false,最大线程数,反正做个线程数的判断,超出就返回false
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//通过CAS更新工作线程数wc+1,break最外层的循环,true则新增成功
if (compareAndIncrementWorkerCount(c))
break retry;
//走到这里说明了通过CAS更新工作线程数wc失败,这个时候需要重新判断线程池的状态是否由RUNNING已经变为SHUTDOWN
c = ctl.get(); // Re-read ctl
//如果线程的状态改变了就再次执行上述操作,到最外层循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
//CAS失败了,线程池状态依然是RUNNING,有可能是并发更新导致的失败,则在内层循环重试即可
}
}
//标记工作线程是否启动成功
boolean workerStarted = false;
//标记工作线程是否创建成功
boolean workerAdded = false;
Worker w = null;
try {
//创建Worker实例,Worker构造里面会通过线程工厂创建新的Thread对象
w = new Worker(firstTask);
//这里直接获取Thread
final Thread t = w.thread;
if (t != null) {
//加全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//获取线程池状态
int rs = runStateOf(ctl.get());
//1、状态是运行中
//2、状态是SHUTDOWN 但是任务为空
//以上两点需要添加工作线程
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//线程是否存活,不存活抛异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//工作线程集合中添加线程
workers.add(w);
int s = workers.size();
//尝试更新历史峰值工作线程数,也就是线程池峰值容量
if (s > largestPoolSize)
largestPoolSize = s;
//工作线程创建成功
workerAdded = true;
}
} finally {
//释放锁
mainLock.unlock();
}
if (workerAdded) {
//启动线程
t.start();
//线程启动成功
workerStarted = true;
}
}
} finally {
//线程启动失败,需要从工作线程中移除对应的Worker
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
扩展:
retry:就是一个标记,标记对一个循环方法的操作(continue和break)处理点,功能类似于goto,所以retry一般都是伴随着for循环出现,retry:标记的下一行就是for循环,在for循环里面调用continue(或者break)再紧接着retry标记时,就表示从这个地方开始执行continue(或者break)操作。
总结一下:
1、死循环中,非运行状态,且SHUTDOWN下传入了新任务,且任务队列已经空了,不创建新线程。
2、死循环中,一系列判断后,通过CAS更新工作线程数wc+1,并break结束循环。
3、加全局锁,创建线程并启动线程。只有启动成功线程,才算添加好woker对象了,否则只是一个无用的临时对象。
添加worker失败的代码如下:
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
//移除woker
workers.remove(w);
//wc数量减1
decrementWorkerCount();
//根据状态尝试终结线程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
五、工作线程内部类Worker
1、Worker构造函数
线程池中的每一个具体的工作线程被包装为内部类Worker实例,Worker继承AQS,实现了Runnable接口,如下:
其代码如下:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//线程实例,如果创建线程失败则为null
final Thread thread;
//任务实例
Runnable firstTask;
//记录每个线程完成的任务总数
volatile long completedTasks;
/**
* 构造函数
*/
Worker(Runnable firstTask) {
//AQS里的锁状态,禁止线程中断,直到runWorker()方法执行
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//通过ThreadFactory创建线程实例,其中Worker实例自身作为Runnable用于创建新的线程实例
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
//外部的runWorker()方法,是线程池的方法
runWorker(this);
}
// 是否持有锁,state值为1的时候表示持有锁,state值为0的时候表示已经释放锁
protected boolean isHeldExclusively() {
return getState() != 0;
}
//获取锁(资源),成功当前线程为独占线程
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//释放锁(资源),释放独占线程
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
//启动后进行线程中断,只有中断标志位为false才会中断
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
其中,最值得注意的是,
this.thread = getThreadFactory().newThread(this);
通过ThreadFactory创建的Thread实例同时传入Worker实例,因为Worker本身实现了Runnable,所以可以作为任务提交到线程中执行。
只要thread 执行了start()方法,就能够执行Worker中的run()方法。
Worker继承自AQS,其中也用到了模板方法模式,重写了获取资源和释放资源的方法。
2、runWorker()方法
final void runWorker(Worker w) {
//获取当前线程
Thread wt = Thread.currentThread();
//获取Worker中持有的初始化时传入的任务对象
Runnable task = w.firstTask;
//初始化时传入的任务对象设置为空
w.firstTask = null;
//由于Worker初始化时AQS中state设置为-1,这里要先做一次解锁把state更新为0,允许线程中断
w.unlock(); // allow interrupts
//线程是否因为用户异常终结,默认是true
boolean completedAbruptly = true;
try {
// 初始化任务对象不为null,或者从任务队列获取任务不为空
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果线程池正在停止,那么要确保当前工作线程是中断状态
// 否则,要保证当前线程不是中断状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//线程执行前的钩子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
//任务执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//线程执行后的钩子方法
afterExecute(task, thrown);
}
} finally {
//清空task临时变量,不然while会死循环执行同一个task
task = null;
//已完成任务+1
w.completedTasks++;
//释放资源
w.unlock();
}
}
//getTask()为null,正常结束
completedAbruptly = false;
} finally {
//处理线程退出,completedAbruptly为true说明由于用户异常导致线程非正常退出
processWorkerExit(w, completedAbruptly);
}
}
流程图如下:
3、getTask()方法
getTask()方法是工作线程在while死循环中获取任务队列中的任务对象的方法,代码如下:
private Runnable getTask() {
//记录last上一次从队列中拉取是否超时
boolean timedOut = false; // Did the last poll() time out?
//死循环
for (;;) {
//当前状态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//线程池至少为SHUTDOWN ,且线程池正在STOP或者队列为空,工作线程减1
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//线程池还处于RUNNING状态,重新获取一次工作线程数
int wc = workerCountOf(c);
// Are workers subject to culling?
//allowCoreThreadTimeOut 是否允许核心线程超时,如果为true则keepAliveTime对核心线程也生效,默认为false
//工作线程是否大于核心线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//1、工作线程大于最大线程,或者超时过
//2、工作线程不止一条,或者队列为空
//以上连个条件同事满足,CAS把线程数减去1失败会进入下一轮循环做重试
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果timed为true,通过poll()方法做超时拉取,keepAliveTime时间内没有等待到有效的任务,则返回null
// 如果timed为false,通过take()做阻塞拉取,会阻塞到有下一个有效的任务时候再返回(一般不会是null)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//非null时候才返回
if (r != null)
return r;
//说明上一次从任务队列中获取到的任务为null,一般是workQueue.poll()方法超时返回null
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
第一处直接返回null,且线程数-1的场景:
1、线程池状态为SHUTDOWN,一般是调用了shutdown()方法,并且任务队列为空。
2、线程池状态为STOP。
第二处直接返回null且线程数-1的场景:
1、工作线程大于最大线程,说明了通过setMaximumPoolSize()方法减少了线程池容量。
2、允许线程超时同时上一轮通过poll()方法从任务队列中拉取任务为null。
3、工作线程不止一条,或者队列为空。
在execute()方法中,当线程池总数已经超过了corePoolSize并且还小于maximumPoolSize时,当任务队列已经满了的时候,会通过addWorker(task,false)添加非核心线程。而这里的逻辑恰好类似于addWorker(task,false)的反向操作,用于减少非核心线程,使得工作线程总数趋向于corePoolSize。
如果对于非核心线程,上一轮循环获取任务对象为null,这一轮循环很容易满足timed && timedOut为true,这个时候getTask()返回null会导致Worker#runWorker()方法跳出死循环,之后执行processWorkerExit()方法处理后续工作,而该非核心线程对应的Worker则变成“游离对象”,等待被JVM回收。
当allowCoreThreadTimeOut设置为true的时候,这里分析的非核心线程的生命周期终结逻辑同时会适用于核心线程。
本文参考:https://www.throwx.cn/2020/08/23/java-concurrency-thread-pool-executor/
六、shutdown()方法
1、shutdown()方法
shutdown()是线程池关闭的方法,代码如下:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
//加锁
mainLock.lock();
try {
//安全策略
checkShutdownAccess();
//设置为SHUTDOWN状态
advanceRunState(SHUTDOWN);
//中断空闲的工作线程
interruptIdleWorkers();
//钩子方法,ScheduledThreadPoolExecutor用过
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//使状态更变为TIDYING,执行钩子方法terminated()后,最终状态更新为TERMINATED
tryTerminate();
}
2、shutdownNow()方法
我上篇文章写的:
shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。(新任务停止,已存在的要执行完)
shutdownNow():关闭线程池,线程的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。(停止所有任务并返回等待值)
代码如下:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//安全策略
checkShutdownAccess();
//设置为STOP状态
advanceRunState(STOP);
//中断所有的工作线程
interruptWorkers();
//清空工作队列并且取出所有的未执行的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
3、tryTerminate()
线程池在调用shutdown终结之后,会调用tryTerminate方法,将线程池状态改为TIDYING,调用完钩子方法terminated()后,状态会变成TERMINATED。
代码如下:
final void tryTerminate() {
//死循环
for (;;) {
int c = ctl.get();
//1、线程池是running
if (isRunning(c) ||
//2、或者只是是TIDYING状态,表示即将结束或者已经结束
runStateAtLeast(c, TIDYING) ||
//3、或者是SHUTDOWN状态且任务队列为空
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
//结束循环,直接返回,因为线程池还要运行或者已经结束
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
//工作线程数不为0,则中断工作线程集合中的第一个空闲的工作线程
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//CAS设置线程池状态为TIDYING,如果设置成功则执行钩子方法terminated()
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//线程池状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒阻塞在termination条件的所有线程,这个变量的await()方法在awaitTermination()中调用
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
shutdown()方法中会通过interruptIdleWorkers()中断所有的空闲线程,这个时候有可能有非空闲的线程在执行某个任务,执行任务完毕之后,如果它刚好是核心线程,就会在下一轮循环阻塞在任务队列的take()方法,如果不做额外的干预,它甚至会在线程池关闭之后永久阻塞在任务队列的take()方法中。为了避免这种情况,每个工作线程退出的时候都会尝试中断工作线程集合中的某一个空闲的线程,确保所有空闲的线程都能够正常退出。
interruptIdleWorkers()方法中会对每一个工作线程先进行tryLock()判断,只有返回true才有可能进行线程中断。我们知道runWorker()方法中,工作线程在每次从任务队列中获取到非null的任务之后,会先进行加锁Worker#lock()操作,这样就能避免线程在执行任务的过程中被中断,保证被中断的一定是空闲的工作线程。
总结
ThreadPoolExecutor源码的重点主要是提交执行任务、任务队列、线程池关闭这三块,其余的暂时就不做分析,本文主要参考来源:硬核干货:4W字从源码上分析JUC线程池ThreadPoolExecutor的实现原理,感谢大佬留下的康庄大道,想了解的更多更深的可以参考此文章。