尺有所短,寸有所长;不忘初心,方得始终。
请关注公众号:星河之码
在JDK提供的几种线程池技术,除了JDK8新加的newWorkStealingPool之外,其余的几种线程池都是通过ThreadPoolExecutor 来实现线程池技术,也即是线程池的工作原理机其实主要就是在说ThreadPoolExecutor的工作原理。这篇文章就来分析分析ThreadPoolExecutor的源码,看看它是怎么实现线程池的。
前面的两篇文章已经介绍了线程池的基本原理,本文主要通过源码的方式,在它们的基础上分析线程池的实现过程,有兴趣的可以去看看这两篇文章
- 《并发基础(三):线程》
- 《并发基础(四):线程池》
理解上述两篇文章的内容后,我们可以总结出线程池的实现其实就是围绕两点展开:
- 判断线程池是否存活
- 接受任务时,是否需要创建线程执行任务
这两点说白了也就是线程池的【状态】和【工作线程数量】,本文主要围绕这两点结合ThreadPoolExecutor源码中一下几个方法展开分析。
一、线程池的执行流程
既然线程池是ThreadPoolExecutor实现的,那线程池的执行流程也就是ThreadPoolExecutor的执行流程
ThreadPoolExecutor的执行流程在《并发基础(四):线程池》中已经提到过,这里就直接搬过来回顾一下。
当提交一个新的任务到线程池时,线程池的运行机制如下
-
当【工作线程数量 < 核心线程数】,则创建一个核心线程执行任务
只要工作线程数量 < 核心线程数,即使有空闲的线程也会创建一个新的
-
当【工作线程数量 >= 核心线程数,并且队列没满】,则会将task加入到workQueue队列中等待执行
-
当【corePoolSize <= 工作线程数量 < maximumPoolSize,且阻塞队列已满】,则创建一个非核心线程来执行任务。
-
当【工作线程数量 ==maximumPoolSize,且工作队列已满 】,则采取饱和策略处理task
二、ctl 属性解析
前面说到线程池的执行主要依靠【状态】和【可运行的线程数量】这两点,正常来说我们定义两个变量记录这这两个值就可以,但是变量多了,维护性就差,所以线程池的作者Doug Lea只用了一个变量(ctl)来记录这两个变量。
-
ctl是一个整型变量,将一个整型变量按二进制位分成两部分,分别记录线程池的生命周期状态和当前工作的线程数两个信息。
-
ctl原子整型的,对它进行的操作具有原子性,也就是修改线程池状态和线程数具有原子性,即只用一次 cas 原子操作就可以进行赋值更新两个信息
-
在 ctl 中用二进制的高3位表示线程状态,低29位表示线程个数
如上图,在ThreadPoolExecutor的源码定义了一个ctl变量,下面具体看看是如何实现一个变量存储两个值的。
2.1 ctl的线程池的状态
在ThreadPoolExecutor的源码中定义了线程池的五种运行状态,分别是:Running、ShutDown、Stop、Tidying、Terminated。
/**
* COUTING_BITS表示使用多少位来存储线程数量,Integer.SIZE=32表示Integer的长度,32-3=29,低 29 位代表线程个数
*/
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 线程池默认容量,因为使用了29位来存储,故容量为2^29 - 1
*/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS; // 运行中
private static final int SHUTDOWN = 0 << COUNT_BITS; // 已关闭
private static final int STOP = 1 << COUNT_BITS; // 已停止
private static final int TIDYING = 2 << COUNT_BITS; // 清洁中
private static final int TERMINATED = 3 << COUNT_BITS; // 已终止
ctl中通过高3位二进制表示线程池的5种状态,通过以上源码可分析具体实现,以线程池RUNNING状态为例:
private static final int RUNNING = -1 << COUNT_BITS;
private static final int COUNT_BITS = Integer.SIZE - 3;
计算RUNNING的值我们通过下面四个步骤来分析
-
COUNT_BITS 常量的值为 Integer.SIZE - 3,而Integer.SIZE为32,即COUNT_BITS的值为29, RUNNING = -1 << COUNT_BITS等价与 RUNNING = -1 << 29
-
-1的二进制为1111 1111 1111 1111 1111 1111 1111 1111
-1的二进制是1111 1111 ,在计算机中,负数采用补码的形式储存。补码后-1的二进制为:1111 1111 1111 1111 1111 1111 1111 1111
-
<< 为左移运算符 RUNNING = -1 << 29
-1 的通过二进制1111 1111 1111 1111 1111 1111 1111 1111左移29位后为: 11100000 00000000 00000000 00000000
-
左移后取其3个高位。即 111,也就是说:
private static final int RUNNING = -1 << COUNT_BITS = 111; // 运行中
通过以上分析就得到了RUNNING的高位为111,同理可以计算出其他四个状态的3个高位分别如下:
//高3位:111,接受新任务并处理排队任务
private static final int RUNNING = -1 << COUNT_BITS;
// 高3位:000,不接受新任务,但处理排队任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 高3位:001,不接受新任务,不处理排队任务,并中断正在进行的任务
private static final int STOP = 1 << COUNT_BITS;
// 高3位:010,所有任务都已终止,workerCount 为零,转换到状态 TIDYING 的线程将运行 terminate() 钩子方法,terminate()是个空方法,需要自己实现逻辑
private static final int TIDYING = 2 << COUNT_BITS;
// 高3位:011,terminate() 已执行完
private static final int TERMINATED = 3 << COUNT_BITS;
拓展一个小问题:为啥 COUNT_BITS的值是 Integer.SIZE - 3,而不是减4?
这是因为线程池有5个状态,而5的二进制是101,需要3位来表示。
2.2 ctl的线程数量
我们在知道有几种线程池的最大线程数是Integer.MAX_VALUE,实际上并不会达到这值,在上图的ThreadPoolExecutor源码定义了一个常量
// 线程池允许最大线程个数 2^29 - 1(即低 29 位全为 1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
CAPACITY是表示线程的最大线程数,一般我们在创建线程的时候会定义(或者默认)最大线程数
-
(1 << COUNT_BITS) - 即 (1 << 29) - 1
1的二进制为0001,补码之后为0001 0000 0000 0000 0000 0000 0000 0000
-
1<<29 表示向左进29位
0001 0000 0000 0000 0000 0000 0000 0000 向左进29位后00100000 00000000 00000000 00000000
-
(1 << 29) - 1 也就变成了00100000 00000000 00000000 00000000 -1
00100000 00000000 00000000 00000000转换为十进制是2^29 ,因此(1 << 29) - 1 = 2^29 - 1
-
2^29 - 1 转换为二进制就是00011111 11111111 11111111 11111111
通过以上4个步骤就计算出了在CTL中存放线程数的低29位(最大值)。接下来再看看ctl的打包与拆包
2.3 &运算 与 | 运算
在计算分析这两个方法之前先复习一下&运算与 | 运算,后面需要用。
-
&运算(按位与)
只有对应的两个二进位均为1时,结果位才为1 ,否则为0。&运算的数需要以二进制补码的形式出现。
例如:9&5, 00001001 (9的二进制补码) & 00000101 (5的二进制补码) = 00000001 (1的二进制补码),即 :9&5=1
-
| 运算(按位或)
只要对应的两个二进位有一个为1时,结果位就为1,两个二进位都为0时才为0。| 运算的数需要以二进制补码的形式出现。
例如:5 | 3 , 00000101 (5的二进制补码) | 00000011 (3的二进制补码) = 00000111(7的二进制补码)即 :5 | 3=7
2.4 打包函数
在下图的ThreadPoolExecutor源码中对CTL进行了初始化。
- 通过以上源码可知,源码中在初始化ctl的对象,通过构造方法传入了一个初始值
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
public AtomicInteger(int initialValue) {
value = initialValue;
}
-
初始值是通过ctlOf方法计算得出,ctl也叫打包函数,即将线程状态和线程数打包
rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,根据线程池状态和工作线程数量进行或运算合并它们返回
由ctl的初始化new AtomicInteger(ctlOf(RUNNING, 0))可知,状态 是rs为RUNNING 线程数wc为 0
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
-
以线程状态为RUNNING,线程数最大为例,计算ctl
//高3位:111,最大线程数的低29位:00011111 11111111 11111111 11111111
而111 补码后为 11100000 00000000 00000000 00000000
ctl = rs | wc = 11100000 00000000 00000000 00000000 | 00011111 11111111 11111111 11111111 = 11111111 11111111 11111111 11111111。
2.5 ctl 的最大值
通过以上的分析,我们就知道了在线程数最大的时候,线程的5个状态分别对应的ctl值:
-
线程状态 RUNNING时 , ctl =11111111 11111111 11111111 11111111
//高3位:111,最大线程数的低29位:00011111 11111111 11111111 11111111
-
线程状态 SHUTDOWN时, ctl =000 11111 11111111 11111111 11111111
//高3位:000,最大线程数的低29位:00011111 11111111 11111111 11111111
-
线程状态 STOP时, ctl =010 11111 11111111 11111111 11111111
//高3位:010,最大线程数的低29位:00011111 11111111 11111111 11111111
-
线程状态 TIDYING时, ctl =010 11111 11111111 11111111 11111111
//高3位:010,最大线程数的低29位:00011111 11111111 11111111 11111111
-
线程状态 TERMINATED时, ctl =011 11111 11111111 11111111 11111111
//高3位:011,最大线程数的低29位:00011111 11111111 11111111 11111111
2.6 拆包函数
前面通过打包函数ctlOf,知道了如何将线程状态和线程数合并打包成一个ctL变量,保证了两个属性的原子性,那么当我们需要用到线程状态或者线程个数的时候应该怎么获取呢?这个时候就需要拆包函数了
继续看ThreadPoolExecutor源码,发现在ctlof上面还有两个方法runStateOf与workerCountOf,这个两个方法就是拆包的方法
其实通过注释Packing and unpacking ctl 也可以看出 这三个方法是打包与拆包
仔细看发现这两个方法其实差别不大,就是一个【~】的区别。
在上面已经分析了CAPACITY的值实际上为/线程池允许最大线程个数 2^29 - 1,二进制为:00011111 11111111 11111111 11111111。下面以这个值来分析这两个方法。
2.6.1 计算工作线程的数量
传入参数为32位的ctl
// c & CAPACITY 的结果是 c的低29位
private static int workerCountOf(int c) { return c & CAPACITY; }
可以看到线程数:workerCount = c & CAPACITY ,以ctl = 111 11111 11111111 11111111 11111111为例,计算线程池的线程数:
workerCount = c & CAPACITY = 11111111 11111111 11111111 11111111 & 0001 1111 11111111 11111111 11111111 = 00011111 11111111 11111111 11111111 。即线程数为00011111 11111111 11111111 11111111 = 2^29 - 1。
ctl =111 00011111 11111111 11111111 11111111时,线程状态为: RUNNING
高3位:111,最大线程数的低29位:00011111 11111111 11111111 11111111为例,
2.6.2 计算线程池的运行状态
传入参数为32位的ctl
// c & ~CAPACITY 的结果是 c的高3位
private static int runStateOf(int c) { return c & ~CAPACITY; }
在计算运行状态runState的时候,可以看到在CAPACITY前面加了一个【】,符号代表【按位取反】。
还是以ctl = 111 11111 11111111 11111111 11111111为例,计算线程池的运行状态可以分为以下两步:
ctl =111 00011111 11111111 11111111 11111111时,线程状态为: RUNNING
高3位:111,最大线程数的低29位:00011111 11111111 11111111 11111111为例,
-
先计算 ~CAPACITY ,即将CAPACITY 按位取反
~CAPACITY = ~ 00011111 11111111 11111111 11111111 = 11100000 00000000 00000000 00000000
-
在计算c & ~CAPACITY
runState = c & ~CAPACITY = 111 11111 11111111 11111111 11111111 &11100000 00000000 00000000 00000000 = 11100000 00000000 00000000 00000000
因此线程状态为高3位的111,即RUNNING 状态。
三、execute
通过上述的的分析,我们知道了线程池的状态和可运行的线程数的存储与计算,接下来就来看看线程池是如何执行,又是如何使用ctl的,线程池可以通过execute 与submit两个方法执行,而submit方法也是调用的execute,因此我们主要来看看execute的执行过程。
先看看execute 的源码
通过源码中的注释的描述告诉我们execute方法分 3 个步骤执行:
- 如果活跃线程少于corePoolSize,则尝试启动一个新线程执行任务。 并且通过调用addWorker以原子方式检查 runState 和 workerCount
- 如果一个任务可以成功添加到阻塞队列(活跃线程不少于corePoolSize)后,还需要检查一下线程池状态,因为可能在入队后,线程池关闭。
- 如果不能加入队列(队列已满),则尝试添加一个新的线程(非核心线程)执行任务,如果创建失败,则说明线程池已经关闭或饱和所以拒绝任务。
接下来围绕3点,具体看看execute方法的实现
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取ctl的值
int c = ctl.get();
/**
* 第一步:
* 调用workerCountOf 计算活跃的工作线程数
* 判断工作线程数 是否小于 核心线程数
*/
if (workerCountOf(c) < corePoolSize) {
// 工作线程数 小于 核心线程数,则通过addWorker方法创建核心线程,true代表核心线程
if (addWorker(command, true))
// 此时线程被创建,但是需要等调用start();线程被启动了,才算addWorker成功
return;
//创建新的线程后,ctl被修改,这里重新获取ctl属性值,因为当有多个线程同时调用addWorker,只有一个线程会成功创建新的线程后修改ctl,此时其他的线程需要重新获取ctl属性值
c = ctl.get();
}
/**
* 第二步:
* 工作线程数 不小于 核心线程数,且当线程池处于RUNNING状态时
* 尝试将任务加入到队列中
*
* isRunning(c) 判断线程池状态,创建线程失败可能造成线程池停止,所以需要再次判断线程池运行状态
*/
if (isRunning(c) && workQueue.offer(command)) {
// 添加任务到队列成功后,再次获取最新ctl属性值
int recheck = ctl.get();
// 加入队列后这里又重新调用了isRunning(recheck) ,主要是防止任务加入队列后,线程池状态改变了
//如果线程池不是RUNNING状态时,就需要移除已加入队列的任务(线程池不是RUNNING状态,比如调用了 shutdown 方法等,无法执行)
if (! isRunning(recheck) && remove(command))
// 如果线程池不是RUNNING状态,并且移除任务成功,则执行拒绝策略
reject(command);
// 走到这里说明,线程池不是处于RUNNING状态
// 调用workerCountOf(recheck) 获取活跃的工作线程,判断是否为0,如果为0,则创建非核心线程将队列中任务执行完
else if (workerCountOf(recheck) == 0)
// 如果工作线程为0,则创建一个空的任务,通过addWorker方法的t.start启动线程,然后通过runWorker方法执行task.run(),直到队列中所有的任务都执行完成
addWorker(null, false);
}
/**
* 第三步:
* 如果队列满了,则创建非核心线程,false代表非核心线程
*/
else if (!addWorker(command, false))
// 核心线程数 + 非核心线程数 超过了最大线程数,则执行拒绝策略
reject(command);
}
通过队execute的整体理解,这个方法主要就是围绕上面三点实现的,在其中有好几处都用到了addWorker方法来创建线程,接下来分析一下addWorker这个方法,不过在这之前,先来看看什么是Worker
四、Worker
4.1 什么是Worker
线程池将【创建出来的线程】都封装为 Worker,以Worker的形式维护线程运行任务的中断控制状态, 线程池模型的本质是生产者-消费者模型,生产者不断地往 workQueue 中丢 task, workQueue 负责源源不断地输送着任务,而 worker(线程) 不断地从workQueue获取任务来执行。
4.2 Worker有什么作用
既然线程池创建线程后在workQueue中消费task,那为什么要将线程封装成一个worker呢,直接使用线程不行吗啊,这么做有啥好处呢
带着上面这个问题,来看看源码中线程被封装成worker做了什么
Worker是一个内部对象,是ThreadPoolExecutor的一个内部类
通过源码可以看出,其实Worker继承了AbstractQueuedSynchronizer,并且实现了Runnable接口
-
继承AbstractQueuedSynchronizer
AbstractQueuedSynchronizer 也就是我们常说的AQS,继承AQS,主要处理线程中断相关操作
实际上通过用 AQS 实现了一个独占锁,在执行worker的时候会先上锁,当程序执行shutdown,setCorePoolSize,setMaximumPoolSize等方法时会尝试中断线程,在中断方法(interruptIdleWorkers) 中会先尝试获取 worker 的锁,如果不成功,说明 worker 在运行中,此时会先让 worker 执行完任务再关闭 worker 的线程。
-
实现了Runnable
实现Runnable,说明worker 是一个 Runnable 任务,它主要存储需要执行的任务
接下来分析一下Worker整体源码的实现,Worker类的代码很少,只有50多行。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** 执行工作的线程对象 ,通过工厂获取,如果工厂失败,则为空 */
final Thread thread;
/** Worker要执行的第一个任务 可能为空。 */
Runnable firstTask;
/** 线程任务计数器 */
volatile long completedTasks;
/**
* 构造方法,通过有参构造创建Worker
* 使用 ThreadFactory工厂 给第一个任务和线程创建。
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 为了在线程真正开始运行任务之前禁止中断,所以将锁定状态初始化为负值,并在启动时将其清除(在 runWorker 中)
/**
* 在执行runWorker禁止中断
* 防止线程真正开始运行任务之前中断,将锁定状态初始化为负值,并在启动时将其清除(在 runWorker 中)
*/
setState(-1);
/**第一次new Worker时,会将有参构造的任务赋值给firstTask*/
this.firstTask = firstTask;
/** 使用工厂对象创建线程, 并把worker本身传进去 */
this.thread = getThreadFactory().newThread(this);
}
/** 当调用t.start()时,会执行run方法,这个方法将执行线程进一步委托给runWorker方法 */
public void run() {
/** 这是Worker的核心工作方法,需要传入Worker对象 */
runWorker(this);
}
/** 这是一个AQS的锁定方法,这个方法通过AQS实现了不可重入锁
* 值为 0 代表解锁状态。
* 值为 1 代表锁定状态。
* 中断线程不会立即让线程停止,只是将Thread的中断标记设置为true,等待线程执行完正在执行的任务后中断
*/
protected boolean isHeldExclusively() {
return getState() != 0;
}
/** 获取锁的方法,调用这个方法 尝试获取锁 */
protected boolean tryAcquire(int unused) {
/**
* 从这里可以看出来,这里用了CAS的实现
* 当获取锁后 cas 设置 state 不会成功,
* 因为它是用0替换1,而在上面的构造方法中把State设置成了 -1,所以在调用runWorker之前不可能获取到锁
* */
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 释放锁的方法,调用这个方法 释放独占锁
* 在runWorker中,调用Worker.unlock(); -> release(1); -> tryRelease(1)
* 最终调到这里 将State值设置为0 表示当前线程允许被中断
* */
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
/** 将State值设置为0 ,而0代表解锁状态。*/
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 中断线程,这个方法会被 shutdowNow 调用,
// 从中可以看出 shutdownNow 要中断线程不需要获取锁,也就是说如果线程正在运行,照样会给你中断掉,所以一般来说我们不用 shutdowNow 来中断线程,太粗暴了,中断时线程很可能在执行任务,影响任务执行
/**
* 中断线程的方法
* 这个方法跟interruptIdleWorkers不同,
* interruptIfStarted:可以看出它的实现没有获取锁,也就是说这个方法要中断线程不需要获取锁,即使线程正在运行,也会中断掉
* interruptIdleWorkers:中会调用 mainLock.lock();先去获取锁,然后中断
* interruptIfStarted方法会被 shutdowNow 调用,这也就解释了为啥shutdowNow是直接停止线程
* interruptIfStarted会被被 shutdow 调用,这也就解释了shutdow为啥要等线程执行完成后才停止线程
*
* 所以一般来说不用 shutdowNow 来中断线程,当中断时线程时有执行任务,就会影响任务执行
*/
void interruptIfStarted() {
Thread t;
/** 虽然这个方法中断线程没有获取锁,但是它要求线程的锁定状态state >= 0
* 所以这又一次说明了构造方法中state设置为 -1 ,不执行中断的原理了
*/
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
通过上述队Worker的源码分析,我们可以总结一下这个worker的作用
- Worker 主要维护线程运行任务的状态,防止在执行任务时被中断。
- 在构造方法中将状态设置为-1,可以防止线程在执行之前被中断
- Worker继承AQS,通过独占锁的方式,管理线程的执行与中断。
所以总的来说:将线程封装为 worker 主要是为了更好地管理线程的中断,实现线程执行与关闭优雅控制。
五、addWorker
5.1 addWorker 介绍
了解了worker实现机制和作用,再来看addWorker就容易很多了,在前面介绍execute的时候,说到好几个地方都用到了addWorker方法,接下来就来看看方法到底做了什么,先看看源码中的定义:
首先通过源码中的注释看看作者是怎么介绍这个方法的,这个注释主要对addWorker放的用途以及两个参数做了说明
方法说明(源码注释的翻译)
- 检查是当前线程池状态和给定界限(核心线程数或最大值线程数),确定是否可以添加一个新的Worker工作线程。
- 如果可以创建Worker工作线程。则先让工作线程数自增,并且如果可能,创建并启动一个新工作线程,将 firstTask 作为其第一个任务运行。
- 如果线程池已停止或有符合关闭条件,则此方法返回 false。
- 如果线程工厂在询问时未能创建线程,它也会返回 false。
- 如果线程创建失败,要么是由于线程工厂返回 null,要么是由于异常(通常是 Thread.start() 中的 OutOfMemoryError),这种情况会直接回滚。
参数说明(源码注释的翻译)
- firstTask :新线程应该首先运行的任务(如果没有,则为 null)
- core :如果为 true,则使用 corePoolSize 创建核心线程,否则使用 maximumPoolSize创建非核心线程。
了解了这个方法作用与参数的含义接下来看这个方法具体的实现逻辑
5.2 addWorker 源码分析
addWorker方法看名字就是添加worker,在ThreadPoolExecutor中定义了一个变量用来存放worker
这个变量在源码中也有注释说明:包含池中所有工作线程的集合。 仅在持有 mainLock 时访问。说明操作这个map要加锁,具体在哪里加的锁,来看看addWorker的具体实现
/**
* 这个方法主要分为两段来理解,其实源码中这段也比较清晰,
* 第一段被for循环的括号包裹,
* 第二段被 try - catch - finally 的括号包裹
*/
private boolean addWorker(Runnable firstTask, boolean core) {
/** 一个标记,跳出循环的时候用的 */
retry:
/** 第一段 */
for (;;) {
/** 这个在前文已经介绍了 获取ctl的值,ctl 记录了线程池 状态和活跃线程数量 */
int c = ctl.get();
/** 通过runStateOf方法解析ctl 获取线程池状态 */
int rs = runStateOf(c);
/**
* 判断线程状态是否>= SHUTDOWN,即其他四个状态: SHUTDOWN,STOP,TIDYING,TERMINATED
* 这四个状态中只有【当状态为SHUTDOWN,任务为空,且且队列非空时】才会去创建一个线程去处理队列中的任务,主要是为了加速处理完 workQueue 中的任务
* (即代码中 ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) 满足时,
* 可以看到这里取反,所以不会返回false,而是继续往下执行)
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
/** 通过workerCountOf方法解析ctl 获取线程池的活跃线程数量 */
int wc = workerCountOf(c);
/**
* 判断线程数是否超过最大线程数,通过core来确定是核心线程还是非核心线程
* CAPACITY 时线程池支持的最大线程数2^29 - 1
* 如果超过,则说明不能新建线程,直接返回false
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
/** 走到这说明线程没有超过阈值,则通过CAS对ctl中的工作线程数量+1*/
if (compareAndIncrementWorkerCount(c))
// 如果ctl中的工作线程数量+1成功,直接通过retry跳出双层循环
break retry;
/** 由于并发时,只有一个线程(主线程)能创建线程(线程池中的子线程),让失败的主线程重新获取ctl值 */
c = ctl.get();
/**
* 如果ctl中的工作线程数量+1 失败,并且线程池状态改变,执行跳转到外层循环
* 这也是在并发时,当第一次获取线程失败,说明已经有一个线程已经修改了ctl,就需要重新获取线程池状态,重新开始外层for循环
*/
if (runStateOf(c) != rs)
/** 跳转到外层循环继续执行*/
continue retry;
// else CAS failed due to workerCount change; retry inner loop
/** 没有进 if (runStateOf(c) != rs)中,说明是因为 CAS 增加线程数量失败所致,继续执行 retry 的内层循环*/
}
}
/** 第二段 */
boolean workerStarted = false; // 工作线程是否启动的标志
boolean workerAdded = false; // 工作线程是否添加的标志
Worker w = null; // Worker 就是工作线程
/**
* 前面已经校验了 线程池的状态和线程数量,
* 程序执行到这里,说明满足增加 worker 的条件了,所以这里开始创建 worker,准备添加进线程池中执行任务
*/
try {
/**传入一个 Thread线程属性,通过Worker的构造方法,调用工厂方法将Thread线程封装为一个worker对象*/
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
/**
* 加锁执行,这里就用到了前面说的加锁变量,因为要把worker加入到workers中,而workers 是 HashSet,不是线程安全的,所以要加锁
* 为什么加锁前面也说过:防止在创建线程的过程中再次执行shutdown、shutdownNow方法
* */
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/** 持有锁时的双重检查。防止ThreadFactory工厂创建失败,或者其他线程修改线程池的状态*/
int rs = runStateOf(ctl.get());
/**
* 如果满足以下两个条件之一 就可以添加 worker
* 1.线程池状态小于 SHUTDOWN,也就是RUNNING状态,
* 2.状态为 SHUTDOWN 但 firstTask == null(代表不接收新的任务,只创建线程处理 workQueue 中的任务)
* 二者满足其一即可
* */
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
/** 预先检查 t 是否可启动*/
if (t.isAlive()) // precheck that t is startable
/** 如果线程已启动,说明在启动线程t之前已存在t,就抛异常,因为此时只是创建了一个worker,还没有调用启动方法 */
throw new IllegalThreadStateException();
/** 将工作线程worker 加入集合workers ,此时是持有mainLock的 */
workers.add(w);
int s = workers.size();
/** 获取集合中工作线程的数量,与历史最大的工作线程数时比较*/
if (s > largestPoolSize)
/**
* 如果当前的工作线程数 大于 历史最大的工作线程数时,则更新历史最大工作线程数
* 记录最大的线程池大小主要用来作监控用
*/
largestPoolSize = s;
/** try 前面定义的标志 工作线程添加成功的标志 */
workerAdded = true;
}
} finally {
/** 释放前面获取的锁 */
mainLock.unlock();
}
if (workerAdded) {
/**
* 说明往 workers 中添加 worker 成功,此时启动线程
* Worker内的线程被启动了,才算addWorker成功
*
* 这个方法很重要
* 调用t.start()方法后 Worker内的线程被启动,会执行会执行Worker内的run方法,
* 在分析Worker的源码时,我们知道run方法会执行runWorker方法
* 即:public void run() { runWorker(this); }
* 最终执行线程
*/
t.start();
/** try 前面定义的标志 线程启动成功的标志*/
workerStarted = true;
}
}
} finally {
if (! workerStarted) {
/**
* 通过线程启动成功的标志判断 如果走到这里,说明线程启动失败
* 执行 addWorkerFailed 方法
* 启动失败将 worker 从 workers 中移除,减少线程数,并尝试着关闭线程池这样的操作
* */
addWorkerFailed(w);
}
}
return workerStarted;
}
分析完addWorker方法发现,其实这个方法还是很容易理解的,作者在写这个方法的时候,甚至从代码的格式上就给人一种分段的感觉,主要分为两段:
- 第一段被for循环的括号包裹,主要作用是外层for循环判断线程状态,内层for循环判断工作线程数量
- 第二段被 try - catch - finally 的括号包裹,主要作用是处理启动工作线程的流程
在上面的分析中,addWorker方法的最后,当线程启动失败,会调用一个addWorkerFailed方法,如下
通过这方法的注释可以知道这个方法主要就是为了在线程启动失败的时候将 worker 从 workers 中移除,减少线程数,并尝试着关闭线程池这样的操作的。
/**
* 线程启动失败后的回滚操作
* 将 worker 从 workers 中移除,减少线程数,并尝试着关闭线程池这样的操作的
* */
private void addWorkerFailed(Worker w) {
/** 前面说到操作Workers 要加锁,这里将 worker 从 workers 中移除 同样也要加锁 */
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
/** 将addWorker方法加入到集合中的线程移除 */
workers.remove(w);
/** 通过CAS的,将addWorker方法增加的工作线程数减去从ctl中减去 */
decrementWorkerCount();
/**线程启动失败 可能是线程状态变了,如果线程状态变是为SHUTDOWN、STOP状态,就尝试将线程状态转为TIDING状态*/
tryTerminate();
} finally {
/** 释放锁*/
mainLock.unlock();
}
}
六、runWorker
在分析addworker的时候,说到当启动线程调用start()方法的时候会触发worker内部的run方法,而run方法又会委托runWorker方法来执行线程,如下
为啥调用start()就会调用run方法?在前面分析Worker的时候,构造方法在创建 Worker 时,将 Worker 自己传给了此线程,所以启动线程后,会调用 Worker 的 run 方法。
6.1 runWorker 介绍
接下来分析一下runWorker方法的具体实现,还是一样的先看看源码中解释看看正方方法是做什么的
主要工作人员运行循环。反复从队列中获取任务并执行它们,同时处理许多问题:
-
我们可能从一个初始任务开始,在这种情况下我们不需要获取第一个任务。否则,只要 pool 正在运行,我们就从 getTask 获取任务。如果它返回 null ,则工作人员会因池状态或配置参数的更改而退出。其他退出是由外部代码中的异常抛出导致的,在这种情况下,completedAbruptly 成立,这通常会导致 processWorkerExit 替换该线程。
-
在运行任何任务之前,获取锁以防止在任务执行过程中发生其他池中断,然后我们确保除非池停止,否则该线程没有设置其中断。
-
每个任务运行之前都会调用 beforeExecute,这可能会抛出异常,在这种情况下,我们会导致线程死掉(用 completedAbruptly true 中断循环)而不处理任务。
-
假设 beforeExecute 正常完成,我们运行任务,收集它抛出的任何异常发送给 afterExecute。我们分别处理 RuntimeException、Error(规范保证我们捕获)和任意 Throwables。因为我们不能在 Runnable.run 中重新抛出 Throwables,所以我们在退出时将它们包装在 Errors 中(到线程的 UncaughtExceptionHandler)。任何抛出的异常也会保守地导致线程死亡。
-
task.run完成后,我们调用afterExecute,也可能会抛出异常,也会导致线程死掉。异常机制的最终效果是 afterExecute 和线程的 UncaughtExceptionHandler 具有我们可以提供的关于用户代码遇到的任何问题的准确信息。
6.2 runWorker 源码分析
runWorker方法主要运行工作线程。执行传入的Worker和从队列中获取tasks:
final void runWorker(Worker w) {
/** 获取当前线程*/
Thread wt = Thread.currentThread();
/** 获取工作线程的任务 */
Runnable task = w.firstTask;
/** 获取任务后 将工作线程的任务置空 */
w.firstTask = null;
/**
* 将工作线程解锁 ,执行过程中是可以被中断的(shutdowNow是直接停止线程)
* w.unlock(); -> release(1); -> tryRelease(1),最终将state置为0,表示当前线程允许被中断
*/
w.unlock(); // allow interrupts
/** 用于判断工作线程异常而死亡的标志*/
boolean completedAbruptly = true;
try {
/**
* 当创建线程时传入了任务,执行execute、submit时,传入的任务直接被处理
* 当没有传入任务,或者传入的任务被执行完成后,会通过getTask()从队列中过去任务执行
* 所以这里的while判断 只要传入的任务或者队列中的任务有一个不为空,就会被执行
*
* 从这里也可以看出来:
* 提交任务优先级:核心线程 > 任务队列 > 非核心线程
* 执行任务优先级:核心线程 > 非核心线程 > 任务队列,(因为首先判断的是task != null,task包含核心线程和非核心线程中的任务)
*/
while (task != null || (task = getTask()) != null) {
/**
* 当工作线程获取到任务之后, 会对这个Worker加锁,
* 可以看到这里加锁是直接w.lock(); 它跟addWorker中的mainLock.lock()不同
* mainLock.lock()是通过AQS实现的不可重入锁,mainLock.lock(); -> acquire(1); -> tryAcquire(1),
* 最终将state置为1,表示在SHUTDOWN状态下,当前线程不允许被中断
* */
w.lock();
/**
* 在运行线程之前先判断线程的状态 是否被中断
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
/** 将中断标记置为true */
wt.interrupt();
try {
/** 这是钩子函数,里面是空实现,需要自己实现,可以在执行之前,子类可实现此钩子方法 */
beforeExecute(wt, task);
Throwable thrown = null;
try {
/** task是封装到Worker对象中的Runnable任务 */
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
/** 这是钩子函数,里面是空实现,需要自己实现,可以在执行之后,子类可实现此钩子方法 */
afterExecute(task, thrown);
}
} finally {
/** 任务执行完后,将task重置为null */
task = null;
/** 任务执行成功的个数+1 */
w.completedTasks++;
/** 释放锁 将state置为0 */
w.unlock();
}
}
/** 用于判断工作线程异常而死亡的标志 上面抛异常后,不会执行这里 */
completedAbruptly = false;
} finally {
/** processWorkerExit对worker进行回收 */
processWorkerExit(w, completedAbruptly);
}
}
runWorker中提供了两个钩子方法,可以对线程在执行之前和之后对线程做一些监控,类似切面的前置和后置通知。
七、getTask
7.1 getTask 介绍
在runWorker方法的理解中,工作线程被封装成Worker后执行任务,而task是创建线程时传入或者时从队列中获取的,如下:
通这里可以看出,Worker是通过getTask方法从workQueue中获取task的。
老规矩,先看注释怎么说,这个方法没有参数,返回值为task。
- 根据配置执行阻塞或定时等待任务,(即Worker获取任务,也会判断task 是否有超时)
- 当工作线程由于以下几种原因之一必须退出时,则返回 null:
- 工作线程超过 maximumPoolSize(可能在task到队列后调用了 setMaximumPoolSize)。
- 线程池停止。
- 线程池关闭,队列为空。
- 这个worker超时等待一个任务,超时worker在定时等待前后都会被终止(即allowCoreThreadTimeOut || workerCount > corePoolSize),如果队列非空,这个worker 不是池中的最后一个线程。
7.2 getTask 源码介绍
大致了解了getTask方法的作用后,基于以上几点来看看源码的具体实现。getTask方法的实现也比较清晰,其实主要就三点
- 判断线程池状态
- 判断工作线程数量
- 从工作队列中获取任务
下面类分析一下这三点是怎么实现的
/**
* 从工作队列中获取任务
*/
private Runnable getTask() {
/** 标识:是否需要通过队列中的poll方法获取任务,poll方法可以设置超时时间,超时后将清除工作线程 */
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
/** 获取ctl的值 */
int c = ctl.get();
/** 通过ctl值获取线程池状态 */
int rs = runStateOf(c);
/**
* rs >= SHUTDOWN 表示检查线程池状态是否为SHUTDOWN、STOP
* rs >= STOP满足时,线程池不会接受新任务,也不会处理队列的认为,并且需要从活跃线程集合中移除当前线程
* workQueue.isEmpty() 时,说明没有任务了,此时也要将线程回收
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
/**通过CAS让ctl中的工作线程数自减*/
decrementWorkerCount();
/**
* 这里返回null,就会回到runWorker方法的while (task != null || (task = getTask()) != null)这一步
* 所以runWorker方法会跳出循环,最终执行最下面finally中的 processWorkerExit方法将对worker进行回收,
* 即将工作线程从集合Workers中移除
* */
return null;
}
/** 通过ctl值获取线程池活跃的线程数 */
int wc = workerCountOf(c);
/**
* 这例有两点:
* 第一:
* allowCoreThreadTimeOut 是一个变量 定义如下:private volatile boolean allowCoreThreadTimeOut;
* allowCoreThreadTimeOut = false(默认值) 表示核心线程即使在空闲时也保持活动状态
* allowCoreThreadTimeOut = true 表示核心线程使用 keepAliveTime 来超时等待工作
*(以上针对 allowCoreThreadTimeOut 属性的描述 在其定义的地方有注释说明)
*
* 第二:
* wc > corePoolSize 判断当前工作线程数是否大于核心线程数,目的是为了判断当前线程是核心线程还是非核心线程
*
* 这俩个判断的结果住就是用来 判断在超时时间keepAliveTime到了之后,闲置的线程要不要被回收
* */
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* wc > maximumPoolSize :判断工作线程数是否大于最大线程数
* timed && timedOut : timed在上面已经解释了,
* timed与timedOut主要就是为了判断当前线程在超时时间到了,还没有获取到任务,处于空闲阶段,需要尝试移除当前线程
* wc > 1 || workQueue.isEmpty() : 表示工作线程数超过1,或者工作队列为空,则可以移除当前线程
* 当wc > 1成立时,即使移除了当前线程,还有其他线程可以继续执行队列中任务
* 当wc > 1不成立时,再判断workQueue.isEmpty(),工作队列为空,移除当前线程也不会影响
* */
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
/**通过CAS将工作线程数-1*/
if (compareAndDecrementWorkerCount(c))
// 返回到while (task != null || (task = getTask()) != null)这一步,不满足该条件,退出while循环
// 执行processWorkerExit(w, completedAbruptly);将工作线程从集合中移除
/**
* CAS执行成功返回null:
*
* 跟上面的返回null一样
* 这里返回null,就会回到runWorker方法的while (task != null || (task = getTask()) != null)这一步
* 所以runWorker方法会跳出循环,最终执行最下面finally中的 processWorkerExit方法将对worker进行回收,
* 即将工作线程从集合Workers中移除
* */
return null;
/** CAS执行失败 可能是其他线程修改了CTl的值, 则continue,在for循环中重试 */
continue;
}
/** 上面判断了线程状态 线程数,走到这里,说明是可以执行获取任务了 */
try {
/**
* timed在上面已经解释了
* timed = true :通过poll带超时的方法获取任务,超过存活时间keepAliveTime,则继续进入for循环,进入上面的if条件中去移除当前线程
* timed = false : 通过阻塞式take从队列获取任务,获取不到任务会在这里阻塞住,不再往下执行,除非有异常
* */
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
/** 到这里 说明获取到了任务,将任务返回,去runWorker中执行*/
return r;
/**
* 走到这里说明 没有获取到任务,而workQueue.take();没有获取到会阻塞,
* 所以走到这里只会是workQueue.poll时,超时了还没有获取任务,将线程的超时标记 timedOut 置为 true,
* 下次循环时,在判断(timed && timedOut)的时候就会满足条件并移除当前线程
* */
timedOut = true;
} catch (InterruptedException retry) {
/**
* 走到这里 可能是workQueue.take()的阻塞被打断了,或是其他原因,
* 将超时标记 timedOut 置为 false,再次循环重试
* */
timedOut = false;
}
}
}
八、processWorkerExit
8.1 processWorkerExit是什么
到这里线程的封装,状态的转变,任务的执行,就都已经清晰了,还剩下最后一点,线程的清理,前面说了很多情况会移除线程,那么它究竟是怎么移除的呢?
在runWorker中最后执行了一个finally,其中就调用了一个清理回收Worker的方法processWorkerExit
通过源码的介绍,先看看这个这个方法的作用
- 为即将消亡的worker进行清理和记录。仅从工作线程调用。 除非设置了 completedAbruptly,否则假定 workerCount 已经被调整以考虑退出。
- 此方法从worker集合中删除工作线程,并且如果由于task异常而退出 或少于正在运行的核心线程数量 或 队列非空但没有worker,则可能终止线程池或替换worker。
8.2 processWorkerExit 源码解析
带着上面官方针对这个方法给的两个描述,可以发现processWorkerExit也一样主要就做了两件事
- 从工作线程集合workers中移除当前wor ker
- 清理队列中的剩余任务,保证剩余任务有线程能够执行
带着这两点来看看源码的具体实现
/**
* 为即将消亡的worker进行清理和记录
* @param w : 当前线程
* @param completedAbruptly 用于判断工作线程异常而死亡的标志 由runWorker传入
*/
private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
/**
* 一般来说这个判断不会进去,因为runWorker默认的true,但是要runWorker正常执行就会被修改为false
*
* 如果这个值到这里还是为true,那说明runWorker抛异常了,而一般来说这种情况大多是
* beforeExecute(wt, task) 或 afterExecute(task, null)这两个钩子函数抛出了异常
*/
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
/** runWorker 执行异常 将ctl中的工作线程数自减 */
decrementWorkerCount();
/** 获取一个 可重入锁 */
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/**
* completedTaskCount : 是ThreadPoolExecutor一个变量,记录线程池已完成任务的计数器。
* 仅在工作线程终止时更新。 只能在 mainLock 下访问。
* completedTaskCount += w.completedTasks: 将当前线程已完成的任务数同步到线程池中记录的完成任务数
*/
completedTaskCount += w.completedTasks;
/** 从workers集合中移除该 worker */
workers.remove(w);
} finally {
/** 释放锁 */
mainLock.unlock();
}
/**
* 尝试终止线程池
* 移除Worker后,需要重新判断线程池的状态是否满足终止
* */
tryTerminate();
/** 获取ctl的值 */
int c = ctl.get();
/**判断线程池的状态是否< STOP(RUNNING 或 SHUTDOWN)*/
if (runStateLessThan(c, STOP)) {
/**completedAbruptly为false,表示是正常状态下移除了当前线程*/
if (!completedAbruptly) {
/** 通过allowCoreThreadTimeOut 是否允许核心线程数超时的设置获取核心线程数*/
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
/** 当核心线程数为0的时候,判断队列是否为空,如果为空就讲吧核心数设置为1,这样可以保证队列的任务有线程在执行 */
if (min == 0 && ! workQueue.isEmpty())
min = 1;
/**
* 通过workerCountOf获取线程池活跃线程数
* 判断是否大于1,如果大于1,则说明有线程在处理队列任务,直接返回即可
* */
if (workerCountOf(c) >= min)
return;
}
/**
* 走到这里只有两种情况,
* completedAbruptly 为true:在上面已经说了,只有热runWorker非正常执行才会返回true,此时
* 或者
* workerCountOf(c) >= min返回false:说明队列还有任务,但是没有线程在执行了
*
* 以上两种情况都需要创建一个线程来执行队列中的task
* addWorker(null, false)
* null:表示这个线程自身没有任务,专门用来处理队列中的task
* false: 表示创建一个非核心线程来执行队列的task
* */
addWorker(null, false);
}
}
九、ThreadPoolExecutor总结
到这里我们发现线程池的执行核心其实就是ThreadPoolExecutor的以下方法的实现:
结合这几个方法的实现以及在文章开头描述的线程池在接收新task时的步骤,总结一下线程池的执行原理如下:
结合上述原理图,在回过头来看各个方法的执行机制,进一步理解整个ThreadPoolExecutor的执行过程就清晰很多了,结合源码的调用链以及线程池的原理,整体的执行流程如下:
通过上述的5个方法以及一个Worker对象,ThreadPoolExecutor就实现了线程池的核心,其设计的优秀之处总结下来有以下几点:
- 通过一个整型变量ctl来保存线程池状态和工作线程数,保证这两个变量的原子性
- 将工作线程封装为一个worker,通过用 AQS 实现了一个独占锁,维护线程运行任务的中断控制状态,并且将初始值state设置为-1,保证线程运行后才能中断
- 使用多态的方式重写run方法,实现线程池的不同任务的执行
- 在执行任务时,可以重新beforeExecute和afterExecute两个钩子方法,实现对线程执行的监控(类似切面)
- 使用阻塞队列的方式进行线程阻塞,保证队列为空时线程在能够被回收
十、扩展
本文主要是对线程池的实现原理,以及ThreadPoolExecutor中线程池源码中线程运行机制进行了一个详细的解析,实际上ThreadPoolExecutor源码中还有很多方法的实现值得我们去研究,比如:
-
线程池的关闭
shutdownNow(),shutdown()
这两个方法可以关闭线程池
-
核心线程池的预热
prestartAllCoreThreads()
我们知道核心线程被创建出来后,执行完任务被线程池回收,不会销毁,
为了进一步提高线程池的执行效率,我们可以提前执行 prestartAllCoreThreads() 方法,一次性创建 corePoolSize 个线程,只要线程池接收到任务,直接在池中获取就可以,不需要创建。
通过阅读ThreadPoolExecutor源码的源码,可以发现作者对每个方法的注释都写的很清晰,阅读起来事半功倍,这也是值得我们学习的。