ThreadPoolExecutor
的源码解析
线程池的核心属性
ctl:当前的
ctl
就是一个int
类型的数值,内部是基于AtomicInteger
套了一层,进行运算时,是原子性的
ctl
表示的线程池的两种核心状态:
- 线程池的状态:
ctl
的高3位标识线程池的状态- 工作线程的状态:
ctl
的低29位,表示工作线程的个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
COUNT_BITS: 声明了一个常量,
COUNT_BITS = 29
Integer.SIZE
:在获取Integer
的bit的个数
private static final int COUNT_BITS = Integer.SIZE - 3;
CAPACITY:就是当前工作线程能记录工作线程的最大个数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
线程池五种状态的表示
1.RUNNING:
只有此状态表示线程池没有问题**,可以正常接受任务处理**
111**:高三位(代表running状态),
running
表示可以处理任务,或者阻塞队列的任务
private static final int RUNNING = -1 << COUNT_BITS;
2.SHUTDOWN:
000:代表
shutdown
的状态,不会接受新任务,正在处理的任务正常进行,阻塞队列的任务也会完成
private static final int SHUTDOWN = 0 << COUNT_BITS;
3.STOP:
001:代表我们的stop状态,不会接受新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管
private static final int STOP = 1 << COUNT_BITS;
4.TIDYING:
010:代表TIDYING状态,这个状态是
shutdown
或者stop
的状态转换过来的,代表当前线程马上关闭,就是一个过渡状态
private static final int TIDYING = 2 << COUNT_BITS;
5.TERMINATED
011:代表TERMINATED状态,这个状态是TIDYING状态转换过来的转换过来只需要执行一个terminated方法
private static final int TERMINATED = 3 << COUNT_BITS;
线程池的核心属性整体源码解析代码
//当前的ctl就是一个int类型的数值,内部是基于AtomicInteger套了一层,进行运算时,是原子性的
//ctl表示的线程池的两种核心状态:
//线程池的状态: ctl的高3位标识线程池的状态
//工作线程的状态:ctl的低29位,表示工作线程的个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//声明了一个常量, COUNT_BITS = 29
//Integer.SIZE:在获取Integer的bit的个数
private static final int COUNT_BITS = Integer.SIZE - 3;
//CAPACITY就是当前工作线程能记录工作线程的最大个数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池状态的表示:
//当前的五个状态:只有running状态表示线程池没有问题,可以正常接受任务处理
///111:高三位(代表running状态),running表示可以处理任务,或者阻塞队列的任务
private static final int RUNNING = -1 << COUNT_BITS;
///000:代表shutdown的状态,不会接受新任务,正在处理的任务正常进行,阻塞队列的任务也会完成
private static final int SHUTDOWN = 0 << COUNT_BITS;
//001:代表我们的stop状态,不会接受新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管
private static final int STOP = 1 << COUNT_BITS;
//010:代表TIDYING状态,这个状态是shutdown或者stop的状态转换过来的,代表当前线程马上关闭,就是一个过渡状态
private static final int TIDYING = 2 << COUNT_BITS;
//011:代表TERMINATED状态,这个状态是TIDYING状态转换过来的转换过来只需要执行一个terminated方法
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl:在使用下面的三个值时,需要传进来ctl
//拿到高三位的值
//基于&运算的特点,保证只会拿到ctl的高三位值
private static int runStateOf(int c) { return c & ~CAPACITY; }
//基于&运算的特点,保证只会拿到ctl的低29位的值
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池的状态转换图
ThreadPoolExecutor的有参构造
//无论调用哪个有参构造,都会执行当前的有参构造
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//健壮性校验
//核心线程个数允许为0,最大线程数必须大于0,最大线程数要大于等于核心线程数
//非核心线程的最大空闲时间,可以等于0
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
//不满足要求就抛出参数异常
throw new IllegalArgumentException();
//阻塞队列,线程工厂,拒绝策略都不允许为null,为null就抛就抛空指针异常
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
//此行为系统资源访问策略,和线程池核心业务关系不大
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
//各种赋值,在JUC包下,几乎所有涉及到线程挂起的操作,单位都是用纳秒
//有参构造的值,都赋值给我们成员变量
//doug lea习惯就是将成员变量作为局部变量单独操作
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor的execute
方法
execute
是提交任务到线程池的核心方法,很重要
这里我强烈大家跟着敲一遍注释,绝对收获受益匪浅
线程池的执行流程其实就是再说execute方法内部作了那些判断
源码解析:
execute()
:是提交任务到线程池的核心方法
参数:command
就是提交过来的任务
public void execute(Runnable command) {
提交的任务不能为null,否则抛空指针异常
if (command == null) throw new NullPointerException();
ctl.get()
:获取核心属性ctl
,用于后面的判断
int c = ctl.get();
解释: 如果工作线程个数小于核心线程数,满足要求,添加核心工作线程
workerCountOf(c)
:查询工作线程个数
corePoolSize
:核心线程数
addworker
(任务,是核心线程吗): 是否添加工作线程
if (workerCountOf(c) < corePoolSize) {
//addworker(任务,是核心线程吗)
//addworker返回true,代表添加工作线程成功
//addworker返回false,代表添加工作线程失败
//addWorker会给予线程池状态以及工作线程个数做判断,查看能否添加工作线程
if (addWorker(command, true))
//工作线程构建出来了,任务也交给command去处理了
return;
//说明线程池状态或者是工作线程个数发生了变化,导致添加失败,重新获取一次ctl
c = ctl.get();
}
添加核心线程失败,走下面的步骤(上述的步骤都失败,走下面的)
isRunning(c)
:判断线程池状态是否是running
workQueue.offer(command)
:基于阻塞队列的offer
方法,将任务添加到阻塞队列在此阻塞队列添加收否成成功又有两种情况:
- 成功添加
- 重新获取ctl
- 判断线程池的状态是否为
running
:
- 否:任务从阻塞队列移除,并直接执行拒绝策略
- 是: 继续下一步
- 查看工作线程数是否是0个
- 是:添加一个非核心线程处理
- 否: 结束本次流程
//添加核心线程失败,走下面的步骤
//判断线程池状态是否是running,如果是,正常基于阻塞队列的offer方法,将任务添加到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
//如果任务添加到阻塞队列成功,直接走if内部
//如果任务在扔到阻塞队列之前线程池的状态改变了,
//重新获取ctl
int recheck = ctl.get();
//如果线程池的状态不是running,将任务从阻塞队列移除,并直接执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//阻塞队列有我刚刚放进去的任务
//查看工作线程数是否是0个
//如果工作线程为0个,需要添加一个非核心工作线程去处理阻塞队列中的任务
//发生这种情况由两种:
//1.构建线程池时,核心线程为0个
//2.即便有核心线程,也可以设置核心线程允许超时,设置allowsCoreThreadTimeOut等于true,表示核心线程可以超时
else if (workerCountOf(recheck) == 0)
//为了避免阻塞队列的任务饥饿,添加一个非核心线程处理
addWorker(null, false);
}
上述条件都不符合,直接直接执行决绝策略,结束
//任务添加到阻塞队列失败
//构建一个非核心工作线程
//如果添加非核心线程成功,直接结束
else if (!addWorker(command, false))
//添加 失败,执行拒绝策略
reject(command);
完整源码解析
//提交任务到线程池的核心方法
//command就是提交过来的任务
public void execute(Runnable command) {
//提交的任务不能为null
if (command == null)
throw new NullPointerException();
//获取核心属性ctl,用于后面的判断
int c = ctl.get();
//如果工作线程个数小于核心线程数
//满足要求,添加核心工作线程
if (workerCountOf(c) < corePoolSize) {
//addworker(任务,是核心线程吗)
//addworker返回true,代表添加工作线程成功
//addworker返回false,代表添加工作线程失败
//addWorker会给予线程池状态以及工作线程个数做判断,查看能否添加工作线程
if (addWorker(command, true))
//工作线程构建出来了,任务也交给command去处理了
return;
//说明线程池状态或者是工作线程个数发生了变化,导致添加失败,重新获取一次ctl
c = ctl.get();
}
//添加核心线程失败,走下面的步骤
//判断线程池状态是否是running,如果是,正常基于阻塞队列的offer方法,将任务添加到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
//如果任务添加到阻塞队列成功,直接走if内部
//如果任务在扔到阻塞队列之前线程池的状态改变了,
//重新获取ctl
int recheck = ctl.get();
//如果线程池的状态不是running,将任务从阻塞队列移除,并直接执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//阻塞队列有我刚刚放进去的任务
//查看工作线程数是否是0个
//如果工作线程为0个,需要添加一个非核心工作线程去处理阻塞队列中的任务
//发生这种情况由两种:
//1.构建线程池时,核心线程为0个
//2.即便有核心线程,也可以设置核心线程允许超时,设置allowsCoreThreadTimeOut等于true,表示核心线程可以超时
else if (workerCountOf(recheck) == 0)
//为了避免阻塞队列的任务饥饿,添加一个非核心线程处理
addWorker(null, false);
}
//任务添加到阻塞队列失败
//构建一个非核心工作线程
//如果添加非核心线程成功,直接结束
else if (!addWorker(command, false))
//添加 失败,执行拒绝策略
reject(command);
}
ThreadPoolExecutor的execute方法解析完整流程图
注意,到这里并没有完结撒花哈!~
正在更新中,但是上面的execute()
执行流程图我强烈推荐大家跟着画一遍,很有收获的!!!
后面更新对addworker()
方法源码的剖析