ThreadPoolExecutor 应用 & 源码解析
文章目录
- ThreadPoolExecutor 应用 & 源码解析
- 一、线程池相关介绍
- 1.1 为什么有了JDK提供的现有的创建线程池的方法(Executors类中的方法),然而还需要自定义线程池
- ThreadPoolExecutor 提供的七个核心参数大致了解
- JDK提供的几种拒绝策略
- 二、ThreadPoolExecutor的应用
- 三、ThreadPoolExecutor源码解析
- 3.1 线程池的核心属性
- 3.2 线程池的有参构造
- 3.3 execute 方法
- 3.3.1 execute方法本体
- 3.3.2 execute 方法的执行流程图
- 3.3.3 addWorker方法
- 3.3.3.1 addWorkerFailed , 启动工作线程失败执行方法
- 3.4 Worker工作线程类
- 3.4.1 runWorker
- 3.4.2 getTask()
- 3.5 关闭线程池方法
- 3.5.1 shutdownNow()
- advanceRunState
- interruptWorkers
- 3.5.2 shutdown();
- interruptIdleWorkers()
- 3.5.3 tryTerminate
- 四、线程池整体流程图
一、线程池相关介绍
1.1 为什么有了JDK提供的现有的创建线程池的方法(Executors类中的方法),然而还需要自定义线程池
前面演示的Executors中的构建线程池的方式(newXXX一类的方法),大多数都是基于ThreadPoolExecutor new来创建出来的
ThreadPollExecutor的构造器中 一共提供了七个参数 , 每个参数都是非常核心的参数 ,
在线程池去执行任务时每个参数都有对任务决定性的作用
如果直接使用JDK提供的Executors中的方法来创建线程池,其中可以自定义设置的核心参数只有两个,这样的话会导致线程池的控制粒度很粗。在阿里规范中也推荐自己去自定义线程池,手动new线程池 指定参数
自定义线程池,可以细粒度的控制线程池,针对一些参数的设置可以在后期有效的帮助我们排查问题(ThreadFactory)
ThreadPoolExecutor 提供的七个核心参数大致了解
public ThreadPoolExecutor(
// 1. 核心工作线程个数(当前任务执行结束后,这个线程不会被销毁 , 结束后这个线程会执行take方法死等未来要处理的任务)
int corePoolSize,
// 2. 最大工作线程个数(池中一共可以有多少个工作线程=核心数+非核心数) , 非核心工作线程执行完任务后会执行 poll(time,unit) 方法过段时间自动销毁
// 例如 corePollSize = 2,maximumPoolSize=5,那么就表示线程池中最多有5个工作线程存在 其中2个是核心线程 , 剩下3个是非核心线程
int maximumPoolSize,
// 3. 非核心工作线程,在阻塞队列中等待的时间 keepAliveTime
long keepAliveTime,
// 4. 上边keepAliveTime时间的单位
TimeUnit unit,
// 5. 在没有核心线程处理任务时,会把任务扔到此阻塞队列中等待非核心工作线程处理,具体选择哪种阻塞队列需要根据业务来选择
BlockingQueue<Runnable> workQueue,
// 6. 创建线程的工厂 , 一般用来指定线程池中线程的名称 (只用来构建Thread对象 而不是核心线程Worker对象)
ThreadFactory threadFactory,
// 7. 当线程池无法处理加入的任务时,执行拒绝策略
RejectedExecutionHandler handler){
}
JDK提供的几种拒绝策略
- AbortPolicy: 这个拒绝策略会在线程池无法处理任务时,
直接抛出一个异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
- CallerRunsPolicy: 这个拒绝策略会在线程池无法处理任务时,
会将当前提交过来的任务让调用线程池的线程去处理该任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
// 让提交给线程池任务的线程去处理该任务 , 这种则是同步的方式
r.run();
}
}
- DiscardPolicy: 这个拒绝策略会在线程池无法处理任务时,
会直接丢弃掉这个任务(不做任何处理)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 空空如也、不做任何处理
}
- DiscardOldestPolicy: 这个拒绝策略会在线程池无法处理任务时,
会将当前阻塞队列中 最早加入的任务丢弃掉然后把本次提交的任务放进去
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
// 将阻塞队列中最早加入的任务移除
e.getQueue().poll();
// 再次将本次提交的任务提交到线程池中
e.execute(r);
}
}
- 自定义线程池的拒绝策略
-
根据自己的业务,可以将任务存储到数据库中,也可以做其他操作
-
二、ThreadPoolExecutor的应用
public class TestThreadPollExecutor {
private static final AtomicInteger counter = new AtomicInteger(1);
public static void main(String[] args) {
// 1. 构建自定义线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2,
5,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread returnThread = new Thread(r);
// 这里可以设置线程的一些属性
returnThread.setName("Test-ThreadPoolExecutor-" + counter.getAndIncrement());
return returnThread;
}
},
new MyRejectExecution()
);
// ===============
// 2. 让线程池处理任务
// ===============
// 2.1 没有返回结果的任务
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "-没有返回结果的处理方式-Runnable");
});
// 2.2 有返回结果的任务
Future<String> handleResultFuture = threadPool.submit(() -> {
System.out.println(Thread.currentThread().getName() + "-有返回结果的处理方式-Callable");
return "处理的返回结果!";
});
try {
System.out.println(Thread.currentThread().getName() + "-拿到返回结果=>" + handleResultFuture.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// 3. 局部变量的线程池 用完之后需要 shutdown
threadPool.shutdown();
/*
输出结果:
Test-ThreadPoolExecutor-1-没有返回结果的处理方式-Runnable
Test-ThreadPoolExecutor-2-有返回结果的处理方式-Callable
main-拿到返回结果=>处理的返回结果!
*/
}
/**
* 自定义拒绝策略
*/
private static class MyRejectExecution implements RejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("自定义拒绝策略,根据自己的业务逻辑来实现具体的拒绝策略功能!");
}
}
}
三、ThreadPoolExecutor源码解析
3.1 线程池的核心属性
/**
* 1. ctl 这个就是线程池最核心的属性,其实就是一个int类型的数值,只是套了一层AtomicInteger,在进行运算时是原子性的,下方所有的内容都是在对ctl这个属性做操作
* 2. ctl 表示的是线程池的两个核心状态: <1> 线程池的状态 <2> 工作线程的个数(不区分核心数和非核心数)
* <1.> 线程池状态: ctl代表的int类型值的高3位 , 表示当前线程池的状态
* <2.> 工作线程数量: ctl的低29位表示,工作线程个数
* 因此需要进行位运算来操作这两个状态,下方就提供了一些位运算的属性和计算这两种状态的位运算方法
* ctl的默认值通过 ctlOf(RUNNING,0)返回的结果作为默认值 , 即 (111 | 00000000 00000000 00000000 00000000) ==> 11100000 00000000 00000000 00000000
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 常量COUNT_BITS 一般表示的值就是 29 为了更方便的操作 int 的低29位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 00000000 00000000 00000000 00000001 => 1
// 00100000 00000000 00000000 00000000 => 1 << 29
// 00011111 11111111 11111111 11111111 => 1 << 29 - 1
// CAPACITY 表示当前线程池中 可以记录到的工作线程的做大值
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// =============== 线程池状态相关属性 ==================
// 可以看到线程池的状态有5种状态 , 当且仅当线程池状态为-1时 表示当前线程池没有问题 可以处理任务 其他都是有问题的状态
// -1二进制表示 : 11111111 11111111 11111111 11111111
// <1> RUNNING(-1): 高3位是 111
// 此时可以处理任务,也可以操作阻塞队列种的任务
private static final int RUNNING = -1 << COUNT_BITS;
// <2> SHUTDOWN(0): 高3位是 000
// **此时不会接收新的任务,但是正在处理的任务和阻塞队列中的任务都会执行完毕**
private static final int SHUTDOWN = 0 << COUNT_BITS;
// <3> STOP(1) : 高3位是 001
// **此时不接收新的任务,而且会调用正在处理的任务线程的中断方法,阻塞队列种的任务一个不管**
// 中断方法:(前面直到中断并不会直接结束线程而是由线程自己决定什么时候中断,只是告诉线程一声(改变中断标志位))
private static final int STOP = 1 << COUNT_BITS;
// <4> TIDYING(2) : 高3位是 010
// 这个状态是从SHUTDOWN或者STOP状态转变过来的一个过度状态 , 表示当前线程池即将要关闭了,但是还未关闭
private static final int TIDYING = 2 << COUNT_BITS;
// <5> TERMINATED(3) : 高3位是 011
// 线程池关闭的真正的状态 , 这个状态是由 TIDYING 转换过来的,转换过来之后为了执行一个钩子方法terminated()
private static final int TERMINATED = 3 << COUNT_BITS;
// 根据传入的ctl值 拿到ctl高3位的值 - 也即当前运行状态
// ~CAPACITY ==> 11100000 00000000 00000000 00000000
// ctl与此值执行&运算会拿到当前ctl所代表的线程池的运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 同理 拿到当前ctl低29位的值 代表当前线程池中的工作线程个数
private static int workerCountOf(int c) { return c & CAPACITY; }
// 传入运行状态rs , 和工作线程个数ws , 让这两个值组成一个新的ctl值
// 两个值经过 | 运算会把他们的二进制位拼起来 组成一个新的ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }
- 线程池状态及其转换流程图
- 线程池状态及其转换流程图
3.2 线程池的有参构造
/**
* 无论调用ThreadPollExecutor的哪个有参构造 最终都会调用这个构造器
*
* 重点: 核心线程数是允许为0的
*
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// corePoolSize可以 == 0
// 最大工作线程数 必须 > 0
// 最大工作线程数 必须 >= 核心线程数
// 非核心线程的空闲时间 必须 >= 0
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
// 校验阻塞独立额、线程工厂、拒绝策略 均不允许为null
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
// 系统资源访问相关内容,和线程池核心业务无关
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
// 各种赋值 , 赋值给成员变量
// 后边会大量出现 Doug Lea的编码习惯 将这些成员变量作为局部变量进行操作
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
// 转换为纳秒,为了更精确
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
3.3 execute 方法
1.execute方法是提交任务到线程池的核心方法,很重要
2.线程池的执行流程就是execute方法 内部做了哪些判断
3.3.1 execute方法本体
// command 就是提交过来的任务
public void execute(Runnable command) {
// 提交的任务不能为null
if (command == null)
throw new NullPointerException();
// 拿到当前线程池 ctl属性 , 为了后续判断线程池状态判断
int c = ctl.get();
/**
* workerCountOf(c) : 拿到当前线程池的工作线程个数
* 线程池的核心线程是懒加载的 , 只有用到了才会创建
* 拿到工作线程个数 < 核心线程数 表示线程池还可以添加核心线程 那么就添加核心线程来处理此任务
*/
if (workerCountOf(c) < corePoolSize) {
// addWorker(任务,是否为核心线程)
// addWorker 返回true 表示添加工作线程成功 , 返回false 则添加工作线程失败
// 添加失败的原因比如 addWorker 执行时线程池执行了 shutdownNow()方法 中断所有线程此时就不能再添加工作线程了返回false ||| 或者并发判断工作线程数量时产生并发问题
// addWorker 会基于线程池的状态、以及工作线程个数来判断能否添加工作线程
if (addWorker(command, true))
// 工作线程构建出来、并且任务也交给工作线程处理了 本次提交任务直接返回true
return;
// ================ 到这里说明添加工作线程失败 (线程池状态或者工作线程个数发生了变化导致添加失败) ===============
// 此时ctl已经由于并发原因被改变了 所以重新获取ctl
c = ctl.get();
}
// ====== 不能添加核心线程 或者 添加核心线程失败了 继续走下边的逻辑 ============
/**
* isRunning(c) : 判断当前线程池的状态是不是RUNNING状态(前边讲过线程池只有是RUNNING状态时才可以添加任务)
* 如果线程池状态是RUNNING状态:则可以添加当前任务到阻塞队列中 , 否则继续执行下边else的逻辑
*/
if (isRunning(c) && workQueue.offer(command)) {
// 添加任务到阻塞队列中成功则走if里边的代码
// 由于这个if逻辑并没有加锁,所以ctl可能会出现并发修改问题 因此 当任务添加到阻塞队列之后,重新检查线程池的状态是否发生了改变
// 重新获取ctl
int recheck = ctl.get();
/**
如果线程池的状态不是 RUNNING 状态了 , 那么就将本次添加的任务从阻塞队列中移除
*/
if (!isRunning(recheck) && remove(command))
// 执行拒绝策略
reject(command);
// =========== 上边的if判断没有执行 程序运行到这里说明 此时阻塞队列中已经存在了本次添加的任务command =================
// 如果工作线程是0个,那么就需要添加一个工作线程去处理这个任务
// 发生这种情况有两种情况:
// 1.构建线程池时 核心线程数为0
// 2.设置核心线程允许超时allowCoreThreadTimeOut属性设置为true
else if (workerCountOf(recheck) == 0)
// 添加一个 不需要任务且是非核心的工作线程 , 目的是处理在阻塞队列中无法被处理的任务
addWorker(null, false);
}
// 线程池的状态不是RUNNING状态 或者 状态是RUNNING但是将任务添加到阻塞队列中 失败了走这个逻辑
/**
* 到这里 可能由于调用了shutdown、或者shutdownNow方法改变了线程池状态,或者是阻塞队列满了无法添加此任务
* 如果是shutdown状态
*/
// 添加非工作线程成功 直接结束本次execute方法
else if (!addWorker(command, false))
// 添加失败,执行拒绝策略
reject(command);
}
3.3.2 execute 方法的执行流程图
- 1.找的图片
- 2.画的流程图 执行流程图
3.3.3 addWorker方法
/**
* firstTask: 就是传入过来待执行的任务
* core: true则添加的是核心线程,false则添加非核心线程
* addWorker方法 可以分为两个模块去看
* 第一个模块就是对线程池状态、工作线程数量做校验模块
* 第二个模块就是添加工作线程模块
*
*/
private boolean addWorker(Runnable firstTask, boolean core) {
// ================================================================================
// ==================== addWorker第一块 对线程池的状态信息做校验 =======================
// ===============================================================================
// 为了从内层for循环,跳出外层for循环 使用标签语法
retry:
// =======================================================
// **外层在校验线程池的状态信息 , 内层for循环在校验线程池工作线程数**
// =======================================================
for (;;) {
// 拿到ctl、根据ctl线程池状态rs
int c = ctl.get();
int rs = runStateOf(c);
// rs >= SHUTDOWN 说明线程池状态不是RUNNING,即当前线程池的状态不是正常的 , 当前不能添加新的任务
// 如果线程池状态为SHUTDOWN,并且阻塞队列中有任务,那么就需要添加一个工作线程去处理阻塞队列中的任务 ( 满足前边的addWorker(null,false) )
if (rs >= SHUTDOWN &&
// 到这里状态已经不是RUNNING状态了,只有SHUTDOWN状态才可以继续处理阻塞队列中的任务
// 如果三个条件有任意一个不满足返回false,配合! 代表不需要添加工作线程的
// 1.不是SHUTDOWN状态(而是STOP状态),2. 任务不是空的,3. 阻塞队列是空的 |||| 满足这三个条件 则不添加工作线程
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
// 由于线程池状态的原因 本次添加工作线程失败
return false;
// =======================================================
// ** 内层循环做工作线程数量的判断操作**
// =======================================================
for (;;) {
// 根据ctl获取当前 工作线程个数 wc
int wc = workerCountOf(c);
// 如果工作线程数量 >= 最大容量
if (wc >= CAPACITY ||
// 基于core判断能否添加 核心线程或非核心线程
// 如果要添加核心线程 工作线程 >= 核心线程数量 则不可以再添加核心线程 直接返回false
// 如果要添加非核心线程那么 当前工作线程数 >= 设置的最大工作线程数 那么也不可以添加 直接返回false
wc >= (core ? corePoolSize : maximumPoolSize))
// 由于工作线程个数的问题 本次添加工作线程失败
return false;
// ===== 上边数量校验通过 证明本次addWorker是可以添加工作线程 =====
// CAS自增ctl , 然后break跳出第一模块校验参数模块
if (compareAndIncrementWorkerCount(c))
// 跳出两层for循环 进入addWorker第二模块(添加工作线程模块)
break retry;
// ============== 到这说明 CAS自增ctl属性失败 ctl发生了并发现象 =============
// 再次拿到ctl , 此时拿到其他线程更改过后的ctl
c = ctl.get();
// 根据新的ctl获取线程池状态 如果发现状态和进入此方法时状态不同 那么就重新执行 第一模块校验参数模块
// 如果状态没变 那么重新执行内层for循环即可 重新基于CAS自增ctl
if (runStateOf(c) != rs)
// 重新执行第一模块校验模块
continue retry;
}
}
// ================================================================================
// ======================= addWorker第二块,添加工作线程、启动工作线程 ====================
// ================================================================================
// 工作线程启动成功标志位 默认false未启动
boolean workerStarted = false;
// 工作线程添加成功标志位 默认false未添加
boolean workerAdded = false;
// 工作线程对象,默认为null
Worker w = null;
try {
// 创建工作线程对象并且把任务传递进去 , 如果传过来的firstTask不是空的 那么就会先处理带过来的任务 , 如果没有那么才回去阻塞队列中查找任务
w = new Worker(firstTask);
/**
* Worker()构造器内部 this.thread = getThreadFactory().newThread(this);
* 获取工作线程对象里边由ThreadFactory创建出来的 Thread对象
*/
final Thread t = w.thread;
// 这里Thread如果是null,那么可能是 new Worker() 时内部使用提供的ThreadFactory对象创建出来的Thread是null
if (t != null) {
// 加锁 , 因为下方会操作成员变量可能会出现线程安全问题
// 下方会操作成员变量 workers、largestPoolSize
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 再次获取当前线程池状态 rs
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN : 说明线程池只能是RUNNING状态 , 状态正常执行if逻辑
// rs == SHUTDOWN && firstTask == null , 只能是前边的 addWorker(null,false) 用来添加一个非核心工作线程来处理阻塞队列中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 到这里边说明可以添加工作线程
// 如果线程已经启动过了 那么抛出异常
// 这里只可能是ThreadFactory里边生成Thread是已经启动了的线程
if (t.isAlive())
throw new IllegalThreadStateException();
// private final HashSet<Worker> workers = new HashSet<Worker>();
// 校验线程没有问题 , 把当前工作线程添加到 workes 工作线程集合中
workers.add(w);
// 获取 工作线程数量
int s = workers.size();
// private int largestPoolSize; 表示线程池最大线程个数记录
// 当前工作线程个数 > 原来的最大线程个数
if (s > largestPoolSize)
// 替换最大线程个数
largestPoolSize = s;
// 添加工作线程成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加成功 启动工作线程
if (workerAdded) {
// 启动工作线程
t.start();
// 启动成功标志
workerStarted = true;
}
}
} finally {
// 如果工作线程启动失败 , 那么这个工作线程也就没有意义了 执行addWorkerFailed方法
if (! workerStarted)
addWorkerFailed(w);
}
// 返回工作线程是否启动成功
return workerStarted;
}
3.3.3.1 addWorkerFailed , 启动工作线程失败执行方法
// 启动工作线程失败 做补偿操作
private void addWorkerFailed(Worker w) {
// 需要操作workes 所以需要加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
// 将这个工作线程从 workes移除
workers.remove(w);
// 将 ctl -1 , 代表去掉了一个工作线程个数 (addWorker对ctl+1操作是在 第一模块校验时已经+1了)
decrementWorkerCount();
// 工作线程启动失败 可能是因为状态改变了 判断是不是可以走 TIDYING -> TERMINATED 结束线程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
3.4 Worker工作线程类
/**
* 1. Worker继承了AQS,可以实现一些锁的机制,这里的目的就是为了控制工作线程的中断
* 2. Worker实现了Runnable,内部的Thread对象在执行start时,会执行Worker中的run方法逻辑
*
*/
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
// ====================================================================
// ================================ Worker 管理任务模块 =================
// ====================================================================
// 由线程工厂构建出来的线程对象
final Thread thread;
// 当前Worker要执行的任务
Runnable firstTask;
// 记录当前工作线程处理了多少个任务
volatile long completedTasks;
// 工作线程只有一个创建方式 就是此有参构造器
Worker(Runnable firstTask) {
// 将state值设置为 -1 , 表示当前Worker不允许被执行中断方法
setState(-1);
// 赋值任务
this.firstTask = firstTask;
// 利用线程工厂创建Thread对象
// 看到传入的Runnable是当前Worker对象 , 所以Worker里边的thread线程启动后实际执行的是 Worker的run方法逻辑
this.thread = getThreadFactory().newThread(this);
}
// 上边可知 Worker里边的thread启动后 实际执行的就是这个run方法的逻辑 , 执行的就是runWorker()方法
@Override
public void run() {
runWorker(this);
}
// ====================================================================
// =========================== Worker 管理线程中断模块(AQS) ==============
// ====================================================================
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
/**
* 这个方法是中断工作线程时,执行的方法
*/
void interruptIfStarted() {
Thread t;
/**
* getState() >= 0 , 上边新建Worker时state设置为-1 因此-1表示当前Worker不允许执行中断操作
* thread 不为空 , 并且 t 未中断 则可以执行中断线程
*
*/
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
// state不是-1 , t不为空,t未中断 , 则可以执行中断线程
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
3.4.1 runWorker
// 能够运行这个方法的唯一方式就是: Worker的run方法 , 因此运行这个方法的一定是线程池的工作线程
final void runWorker(Worker w) {
// 拿到当前线程 wt
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
// 已经拿到了task 将工作线程中存储任务的字段设置为null
w.firstTask = null;
// 将Worker中的 state设置为0 , 代表当前工作线程给可以被中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
/**
* 这个while循环 就是要让这个工作线程一直处于工作状态 拿到任务->处理任务 , 当结束了while循环则考虑干掉当前工作线程了
*
* 如果传入的工作线程Worker 带的有任务 那么就执行带过来的任务
* 如果传入的工作线程Worker 没有带任务 那么就去阻塞队列中拿取任务
*
*/
while (task != null || (task = getTask()) != null) {
// 执行当前Worker的lock方法, 表示当前工作线程已经开始工作了 不允许中断当前线程
// 使用shutdown()关闭线程池时 会检查Worker是否正在运行 正在运行的工作线程则不会被中断
w.lock();
// 比较ctl>=STOP,如果满足这个条件 则说明线程池已经到了STOP甚至已经终结了
// 1. 线程池到了STOP状态,并且当前线程还未中断 , 则需要中断当前线程(if里的逻辑)
// 2. 线程池状态不是STOP,线程已经中断并且再次查看线程池状态变为了STOP 那么就再次中断线程
// 下面的判断其实就是表达了,如果线程池是 >= STOP状态 那么就必须确保这个线程是处于中断状态的
if (
(runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&&
!wt.isInterrupted()){
wt.interrupt();
}
try {
// 前置钩子函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 后置勾子函数
afterExecute(task, thrown);
}
} finally {
// 任务执行完成 , 丢掉任务
task = null;
// 当前工作线程处理任务数量++操作
w.completedTasks++;
// 执行unlock 将state改为0 , shutdown方法才可以中断这个工作线程
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 结束掉当前工作线程
processWorkerExit(w, completedAbruptly);
}
}
- processWorkerExit 结束工作线程方法
/**
* @param w , 当前要执行结束的工作线程对象
* @param completedAbruptly 突然完成标志位,如果runWorker中判定是突然完成那么就会传入true
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果是突然完成的,表示是异常结束工作线程
if (completedAbruptly)
// 减少工作线程数量
decrementWorkerCount();
// 下方需要操作共享变量 所以拿到锁并加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将当前工作线程的 完成任务数量合并到线程池的完成任务数
completedTaskCount += w.completedTasks;
// 将当前工作线程从workes中移除掉
workers.remove(w);
} finally {
mainLock.unlock();
}
// 工作线程结束 查看是否是线程池状态改变了
tryTerminate();
// 拿到ctl
int c = ctl.get();
// 这个判断表示 如果当前线程池状态 < STOP , 即只能为 RUNNING、SHUTDOWN状态时
if (runStateLessThan(c, STOP)) {
// 不是突然完成的(正常结束工作线程) 可以进入这个if
if (!completedAbruptly) {
// 拿到当前线程池的 可能的最小核心线程数量 , 如果允许核心线程超时 那么最小的核心线程就是0 否则永远为corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果可能的核心线程数最小为0个 并且队列中不是空的
if (min == 0 && ! workQueue.isEmpty())
// 设置最小核心线程为1个 用来处理阻塞队列中的任务
min = 1;
// 比较 工作线程数量 >= 当前可能的最小核心线程数量 说明有线程处理任务 正常return
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 异常结束工作线程,为了避免出现问题 添加一个空任务的非核心线程来填补上刚刚异常结束的工作线程
addWorker(null, false);
}
}
3.4.2 getTask()
/**
* 这个方法就是从线程池的阻塞队列中获取要执行的任务
*
* 第一块就是 检查线程池状态和工作线程数量的校验 , 如果不满足直接返回null则拿取任务失败上层runWorker方法的while结束
* 第一块校验通过了 才能执行第二块 从阻塞队列中拿取任务
*/
private Runnable getTask() {
// 默认为false,默认没有超时
boolean timedOut = false;
for (;;) {
// 拿到ctl 、根据ctl获取线程池状态
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池状态是STOP,那么不需要再处理阻塞队列中的任务了,直接返回null
// 如果线程池状态是SHUTDOWN,并且阻塞队列是空的 那么也不需要再处理阻塞队列任务了
if (rs >= SHUTDOWN &&
(rs >= STOP || workQueue.isEmpty())) {
// 扣减工作线程个数
decrementWorkerCount();
return null;
}
// 根据ctl拿到工作线程个数
int wc = workerCountOf(c);
// 1. 核心线程允许超时 timed为true
// 2. 当前工作线程数已经大于了核心线程数 timed为true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (
// 工作线程>最大线程数(一般不会满足 看为false即可)
// 工作线程<=核心线程 那么一定是false
// 第一次进入getTask方法的for循环一定不会走这个if里边,第二次就有可能了
(wc > maximumPoolSize || (timed && timedOut))
&&
// 在满足上边条件的前提下 , 当前执行的线程是非核心线程并且timeOut超时标记位是true
// 确保工作线程除了自己还有至少一个 , 或者阻塞队列是空的 , 那么就可以尝试减少工作线程数量 返回null由上层方法结束掉当前工作线程了
(wc > 1 || workQueue.isEmpty())) {
// CAS减少工作线程数
if (compareAndDecrementWorkerCount(c))
return null;
// 若CAS失败再次执行for循环
continue;
}
// 工作线程从阻塞队列中拿任务
try {
// 是核心线程timed是false , 非核心是true
Runnable r = timed ?
// 非核心,等待一会 keepAliveTime
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 核心线程,使用take方法死等
workQueue.take();
// 从阻塞队列中拿到任务不为null,拿到任务返回任务然后去执行即可
if (r != null)
return r;
// 到这,没拿到任务 timeOut设置为true , 所以再次经过for循环就可以退出这个方法返回null结束当前线程了
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
3.5 关闭线程池方法
3.5.1 shutdownNow()
- 该方法会将线程池状态从 RUNNING -> STOP , 立即中断所有的工作线程 , 并且不会处理阻塞队列中的任务
// shutdownNow 不会处理阻塞队列的任务所以会将任务返回给客户端
public List<Runnable> shutdownNow() {
// 返回的任务
List<Runnable> tasks;
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 将线程池状态修改为 STOP
advanceRunState(STOP);
// shutdownNow不管工作线程是否再执行任务都会执行 中断工作线程
interruptWorkers();
// 将阻塞队列中的任务放到tasks中
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
advanceRunState
private void advanceRunState(int targetState) {
// 死循环必然会修改状态
for (;;) {
int c = ctl.get();
// 如果当前线程池状态已经 >= 将要修改的状态 则不需要修改了
if (runStateAtLeast(c, targetState) ||
// 将targetState重新和工作线程数量组成新的ctl 并将ctl修改为这个新组装的值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
interruptWorkers
// shutdownNow方法 立即执行中断工作线程操作
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 执行所有Worker的interruptIfStarted方法
// 所以shutdownNow会立即中断正在运行的工作线程
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
3.5.2 shutdown();
- 该方法会将线程池状态从 RUNNING -> SHUTDOWN , 不会立即中断正在干活的工作线程 , 并且会等待阻塞队列中的任务处理完成后再关闭线程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 将线程池状态 变为 SHUTDOWN状态
advanceRunState(SHUTDOWN);
// 中断处于空闲状态的工作线程
interruptIdleWorkers();
// 勾子函数 自己实现
onShutdown();
} finally {
mainLock.unlock();
}
// 尝试结束线程池
tryTerminate();
}
interruptIdleWorkers()
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
// 中断空闲的工作线程
private void interruptIdleWorkers(boolean onlyOne) {
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 线程目前没有中断 , 那么就去获取Worker的锁资源 都满足的话才能够执行线程中断操作
// 工作线程在执行任务时会先把当前工作线程的 state变为1 所以这里tryLock正在运行的工作线程是失败的(所以这里只会中断空闲的工作线程)
if (!t.isInterrupted() && w.tryLock()) {
// 进到这里 说明当前工作线程是空闲线程
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
3.5.3 tryTerminate
// 查看当前线程池是否可以变为TERMINATED状态
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 如果是RUNNING状态 直接return
// 如果现在的状态 >= TIDYING 说明即将变为TERMINATED状态 那么也可以return结束方法
// 如果是SHUTDOWN状态并且阻塞队列还有任务,那么也不可以变为TERMINATED状态 return结束
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果还有工作线程
if (workerCountOf(c) != 0) {
// 中断空闲的工作线程 只中断一个
interruptIdleWorkers(ONLY_ONE);
// 本次尝试结束 , 等下次工作线程为0个之后 再进来尝试改变状态
return;
}
// 加锁 , 可以执行Condition的释放操作
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将线程池状态修改为 TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 执行勾子函数
// 可以在线程池结束时做一些额外操作 自己实现
terminated();
} finally {
// 修改状态为TERMINATED状态
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒正在等待线程池结束的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
四、线程池整体流程图
线程池整体流程