ThreadPoolExecutor源码解析
多线程场景下,手动创建线程有许多缺点:
- 频繁创建、销毁线程会消耗大量 CPU 资源,
- 销毁线程后需要被回收,对 GC 垃圾回收也有一定的压力
使用线程池有许多好处:
- 降低 CPU 资源消耗。通过复用线程,减少创建、销毁线程造成的消耗
- 提高响应速度。由于有复用的线程,工作队列中的任务可以直接被空闲线程获取并执行,不需要等待线程创建。
- 提高管理性。使用线程池统一分配管理,避免无限制创建线程消耗系统资源,降低系统稳定性。同时线程池提供了许多钩子函数,可以批量管理线程池中的线程。
ThreadPoolExecutor使用方法
public class ThreadPoolDemo1 {
public static void main(String[] args) {
//实例化自定义参数的线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
3,
10,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(20),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r){
@Override
public void run() {
super.run();
}
};
//t.setName();
return t;
}
},
new ThreadPoolExecutor.AbortPolicy());
//执行任务
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("say hello");
}
});
//柔和地关闭线程池
threadPoolExecutor.shutdown();
}
}
线程池状态
- RUNNING:接收新任务,并且处理阻塞队列中的任务
- SHUTDOWN:拒绝新任务,但是处理阻塞队列里的任务
- STOP:拒绝新任务,并且抛弃阻塞队列中的任务,同时中断正在处理的任务。
- TIDING:所有任务都执行完(包含阻塞队列里面的任务),当前线程池活动线程数为 0 ,即将执行
terminated()
方法 - TERMINATED:终止状态,
terminated()
方法调用完成以后的状态。
线程池状态数据结构
线程池状态由原子类型 AtomicInteger 来记录,value为32位的int类型数据,其高 3 位用于记录线程池的运行状态,低29位用于记录线程池当前活动线程数。
在ThreadPoolExecutor中,其数据结构设计如下:
//原子类型的int类型,其中的value为int类型,32位数据
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//COUNT_BITS=32-3=29,即活动线程数的位数
private static final int COUNT_BITS = Integer.SIZE - 3;
//最大活动线程数(低29位全为1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 运行状态存储在高3位, 活动线程数存储在低29位
//-1二进制为 1111...111
//1 二进制为 0000...001
//2 二进制为 0000...010
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
//获取运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取活动线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
//根据 运行状态 与 活动线程数 ,生成ctl这个int类型的数据
private static int ctlOf(int rs, int wc) { return rs | wc; }
ThreadPoolExecutor()构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
构造方法为线程池设置了许多核心参数:
参数名 | 说明 |
---|---|
corePoolSize | 核心线程数 |
maximumPoolSize | 总线程数=核心线程数+非核心线程数 |
keepAliveTime | 非核心线程最长空闲时间(如果超过空闲时间还没有任务,非核心线程就会被销毁) |
unit | keepAliveTime的时间单位,可以是 TimeUnit.SECONDS |
workQueue | 工作队列,存放Runnable,即线程任务。 |
threadFactory | 线程的构造工厂,用于自定义新建线程,如给线程添加名字 |
handler | 拒绝策略,如果在如下情况,新来的任务就会进入拒绝策略。 1. 工作线程全都被占用,而且任务队列以及被存满。 2. 当前线程不接受新的任务,如SHUTDOWN、STOP状态 |
如果线程池主要用来做计算逻辑判断等 CPU 密集型任务,我们需要尽量减少 CPU 上下文切换,设置核心线程数大小为:Ncpu + 1。
如果主要任务是大量 I/O 操作的 I/O 密集型任务,对CPU消耗较小,我们可以让核心线程数大一些,设置核心线程数大小为: 2 * Ncpu
execute(Runnable) 将新任务交给线程池
线程池任务提交的主要流程如下;
在 execute() 方法中,对应上图,主要做了这几件事:
- 如果当前活动线程数没有超过核心线程数,创建核心线程执行传入的任务。
- 核心线程都被占用,如果队列还没满,将任务放到队列中。
- 如果队列已经满了,如果没超过最大线程数,可以创建非核心线程执行传入的任务。
- 都不满足,或者线程池状态变为非运行态,将会执行拒绝策略来拒绝任务。
- 特别的,如果线程池变为非运行态,且为 SHUTDOWN,为了让线程池成功进入 TIDYING 状态,需要将任务队列中的任务清空,创建一个空任务非核心线程,用于获取并处理任务队列中的任务。
在创建非核心线程之前,任务队列中等待的任务将被空闲下来的核心线程获取并执行。
创建非核心线程之后,任务队列中等待的任务不仅可以被核心线程获取,也可以被存活的非核心线程获取并执行。
非核心线程有存活时间,由参数 keepAliveTime 设定,如果空闲时,持续 keepAliveTime 时间无法获取到任务,该非核心线程将会被销毁。
public void execute(Runnable command) {
//如果传入的任务是空,抛异常
if (command == null)
throw new NullPointerException();
//获取当前线程池状态
int c = ctl.get();
//1. 如果活动线程数<核心线程数
if (workerCountOf(c) < corePoolSize) {
//创建核心线程来处理传入的任务
if (addWorker(command, true))
return;
//如果创建失败,说明有其他线程介入
//再次获取最新线程池状态
c = ctl.get();
}
//2. 如果当前线程池是运行态,并且成功将任务放入工作队列
if (isRunning(c) && workQueue.offer(command)) {
//添加任务队列成功
//再次检查最新线程状态
int recheck = ctl.get();
//如果当我们将任务放入队列后出现了:
//1. 线程池不再是运行态 --> 将任务从队列中剔除,并执行拒绝策略(非运行状态,拒绝接受任务)
//2. 活动线程数为0 --> 创建非核心线程,处理的任务由非核心线程自主去工作队列中获取
if (! isRunning(recheck) && remove(command))
//线程池状态不正确,执行拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0)
//情况1:RUNNING状态,但设置核心线程数为0,此时任务放入了工作队列,但没人处理
//情况2:SHUTDOWN状态,但remove任务失败。在此之前工作线程处理完所有任务后全部停止工作,为了保证队列为空,需要再创建一个空任务非核心线程,处理这个队列中的剩余任务
addWorker(null, false);
}
//3. 如果线程池不是运行态,或者是运行态,但任务队列已满,尝试添加非核心线程处理当前任务
else if (!addWorker(command, false))
//添加非核心线程失败,执行拒绝策略
reject(command);
}
addWorker(Runnable, boolean) 添加工作线程
addWorker()的任务是添加工作线程,它主要做了以下几件事:
- 先在表示线城池状态的 ctl 变量中,增加工作线程数
- 检查线程池状态是否合法
- 自旋CAS增加工作线程数——CAS方法提供了原子性的 检查+更新
- 然后真正地创建线程,并加入到工作线程集合 workers 中
- 新建Worker
- 获取锁——非CAS方法 检查+更新 需要加锁使之为原子性
- 检查线程池状态是否合法
- 将 Worker 加入 workers
- 释放锁
- 开启线程
mainLock 逻辑上用于 workers 资源的上锁
private boolean addWorker(Runnable firstTask, boolean core) {
//----------------增加工作线程数--------------------
retry:
for (;;) {
//创建新的工作线程,需要 CAS自旋 更新工作线程数
int c = ctl.get();
int rs = runStateOf(c);
//如果出现以下情况,将不添加工作线程
//1. 线程池非运行态
//2. 在1的条件下,线程池为STOP状态(既不是运行态也不是SHUTDOWN态),且传入了任务
//3. 在2的条件下,工作队列不为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//如果线程池状态为运行态,进入该循环,自旋cas尝试增加工作线程数
int wc = workerCountOf(c);
//创建失败条件:
//1. 如果当前活动线程数已经到达了最大线程数
//2. 如果请求创建核心线程,但核心线程数已经满了
//3. 如果请求非核心线程数,但总线线程数已经满了
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//如果还可以创建,先自旋cas尝试增加工作线程数
if (compareAndIncrementWorkerCount(c))
//有别的线程介入,cas失败,自旋再cas。
//注意,由于别的线程cas失败,所以需要重新判断线程状态ctl
break retry;
c = ctl.get(); // Re-read ctl
//活动线程数加1完成后,观察最新线程池运行状态
if (runStateOf(c) != rs)
//如果线程池运行状态变化(变成非运行态)重新进入retry循环查看条件
continue retry;
}
}
//----------------真正创建工作线程--------------------
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新的Worker,Worker封装了线程、任务
//初始Worker的state是-1
w = new Worker(firstTask);
final Thread t = w.thread;
//健壮性判断,如果线程不为空
if (t != null) {
//多线程环境下 workers是非线程安全集合,它的增删操作需要加锁
//需要通过上锁,将 检查+更新 包装为原子性
//mainLock 逻辑上用于 workers 资源的上锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查当前线程池状态
int rs = runStateOf(ctl.get());
//1. 如果是运行态
//2. SHUTDOWN态且传入任务为空,这个情况创建线程的目的是处理工作队列中剩余任务。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//健壮性判断,该线程不应该为运行态,因为他是刚创建的。
if (t.isAlive())
throw new IllegalThreadStateException();
//在临界区内操作临界资源
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
//标记添加工作线程成功
workerAdded = true;
}
} finally {
//释放锁
mainLock.unlock();
}
if (workerAdded) {
//添加工作线程成功后,启动线程
t.start();
//线程开启标记
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//线程开启失败,自旋cas将之前增加的工作线程数记录-1
//同时将该启动线程失败的worker从set中移除
addWorkerFailed(w);
}
return workerStarted;
}
Worker对象
Worker对象封装了线程、线程任务、线程在线程池中的状态
它构造方法如下:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
注意到Worker类实现了Runnable接口,构造方法中新建线程时,给线程赋予的Runnable对象不是firstTask,而是worker实例。
未来执行线程,将会进入到worker的run()方法,随后进入runWorker()方法。
runWorker(Worker) 线程的执行逻辑
回顾一下普通启动线程的方式,只传入一个实际任务,它们执行任务完毕就会结束线程。在多线程环境下,如果频繁创建、销毁线程,是很消耗CPU资源的。而线程池为了复用线程,在 ThreadPoolExecutor 中,将核心线程与非核心线程都设计为了持续运行的可复用的线程。
核心线程需要一直保持运行状态,不断阻塞处理工作队列中的任务;非核心线程在空闲时也会处理工作队列中的任务,只有在空闲时间超时后才会结束线程。
让线程保持运行状态、获取工作队列中任务的一系列动作,需要将工作队列与线程联系起来,这由 ThreadPoolExecutor 设计的 runWorker(Worker w) 完成。
线程池创建的线程由 t.start() 启动后,run() 方法将会被回调,Worker对象实例化线程时,传入了实现了 Runnable 接口的 Worker 实例。所以 t.run() 方法将会进入 Worker类 的 run() 方法。其中就只做了一件事:执行由上述由 ThreadPoolExecutor 实现的 runWorker() 方法。
**runWorker() 方法为了将工作队列与线程联系起来,让线程有任务可做,**做了几件事:
- 如果有初始任务,则执行初始任务
- 如果没有,或者初始任务已经执行完了,就去工作队列中获取任务
- 线程是临界区资源,先上锁,再执行任务
需要注意的是,runWorker 在循环中,在执行任务前后添加了钩子函数 beforeExecute() 与 afterExecute() ,它们抛出的异常如果方法内没有处理,将会导致 runWorker 跳出循环,进入线程终止处理。
如果我们希望线程可以中断,我们可以在钩子函数中实现对中断的响应。
除了响应中断,我们还可以做很多操作,例如让线程暂停/回复 处理任务,这个代码在文末提供。
final void runWorker(Worker w) {
//获取当前线程环境
Thread wt = Thread.currentThread();
//取出Worker封装的初始任务
Runnable task = w.firstTask;
//将Worker封装的初始任务清空
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//循环获取可执行的任务,进入循环的条件:
//1. 如果worker中有初始任务
//2. 获取到工作队列中的任务
while (task != null || (task = getTask()) != null) {
//线程是临界区资源,处理任务的过程需要将线程上锁
w.lock();
//1.如果线程为STOP状态,或者TIDYING、TERMINATED状态,那么就将线程中断(将中断标记设为true)
//2.如果线程中断标记为真,且状态此时变为上述状态,那么就将线程中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//如果线程没被中断,为正常运行态
//执行任务前的钩子函数,注意,钩子函数抛出的异常是没人管的,会直接进入到finally代码块
beforeExecute(wt, task);
Throwable thrown = null;
try {
//在当前线程环境执行任务的run()方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
//处理完任务,将task置空,等待GC
task = null;
//累计完成任务数+1
w.completedTasks++;
//处理任务完成,将线程资源释放
w.unlock();
}
}
//如果执行到这里之前有人抛出异常,且无人捕获,则不执行这句,直接进入finally
//(工作线程 task.run() 即使抛了异常,也被捕获了。只有可能在 getTask,w.lock(), 钩子函数 等无人捕获异常的地方抛出异常,才有可能跳过这句话的执行,直接进入finally)
//如果执行到这里都没人抛出异常,或者抛出的异常全都被捕获过,则执行这一句
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
有的朋友就会疑问,非核心线程是有 keepAliveTime 的,但是在 runWorker() 中并没有看到相关的结束线程的判断。不用急,我们看到 getTask() 方法中,它处理了非核心线程的存活时间。
getTask() 从工作队列中获取线程任务
getTask() 运行在当前线程的环境,当前线程需要从工作队列中获取任务。如果超时了,或者线程需要结束的情况,将会返回null。如果正常,将会把取到的任务返回,并由当前线程执行 r.run()。getTask()主要做了下面几件事:
- 获取线程最新状态
- 根据线程池状态,判断当前线程是否要继续运行
- 如果线程为运行态
- 或者是SHUTDOWN状态,但工作队列不为空,就继续运行
- 如果线程超时,或者线程数过多,且满足下列条件,就可以退出当前线程
- 如果有其他线程(工作队列中的任务可以被别的线程处理)
- 如果工作队列为空(不需要线程处理任务)
- 线程正常运行,则 限时 或 无限时 从工作队列中获取任务。
- 只有限时获取任务失败,才会第二次进入循环。
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
//获取线程池最新状态
int c = ctl.get();
int rs = runStateOf(c);
//如果线程非运行态
//如果
//1. 是STOP、TIDYING、TERMINATED状态
//2. 是SHUTDOWN状态,且工作队列为空
//则自旋cas减小工作线程数,并return null,让所在线程获取任务失败,退出获取任务的循环,从而结束线程。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//如果线程为运行态
//或者是SHUTDOWN状态,但工作队列不为空
//就需要将任务取出处理
int wc = workerCountOf(c);
//allowCoreThreadTimeOut 一般都是 false
//判断当前线程是否有限时,是需要判断当前线程是否为核心线程的,这里用了很聪明的方法:判断工作线程数是否大于核心线程数
//如果工作线程数小于核心线程数,说明当前线程一定是核心线程,那么不限时
//反之,就直接认定进入到当前判断的是非核心线程,让它限时。
//注意,这里没有上锁
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果工作线程超过了最大值,或者当前线程超时了,那么当前线程应当被废弃
//额外条件:且此时 有别的工作线程(有别人处理任务) 或者 工作队列为空(没有任务需要处理)
//那么当前线程就确定可以废弃,return null,结束当前线程。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//CAS减小工作线程数,不取出任务
if (compareAndDecrementWorkerCount(c))
return null;
//自旋
continue;
}
//如果当前线程可用
try {
//如果限时,就通过限时方式poll()获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//如果不限时,直接阻塞获取任务
workQueue.take();
if (r != null)
//如果拿到了任务,直接返回执行
return r;
//进到这里说明超时了还没拿到任务
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
processWorkerExit() 结束线程
如果线程获取工作队列任务失败,或者被判定为当前线程应当结束,将会退出 runWorker() 的 while 循环,并进入 processWorkerExit() 处理线程结束的工作:
- 如果当前线程是因为异常退出的,需要将线程池工作线程数更新-1
- 获取锁,逻辑上对 workers 临界区变量上锁
- 将当前线程从 workers 中取出,并释放锁
- 释放线程后,尝试终止线程池
- 根据情况,看看是否需要补一个工作线程
//移除当前工作线程
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果执行了 completedAbruptly=false 语句,说明当前线程异常退出了。需要将线程池工作线程数更新-1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
//有线程退出的时候,将要操作 workers 这个非线程安全集合,它是临界区资源
//对临界区资源的 检查+修改,需要原子性操作,需要上锁。对 mainLock 上锁。
// mainLock 逻辑上用于 workers 资源的上锁, 这里顺便同时也对 completedTaskCount 这个互斥资源进行逻辑上的上锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//工作记录,统计线程池总完成任务数
completedTaskCount += w.completedTasks;
//将该线程从 workers 集合中取出
workers.remove(w);
} finally {
//释放对 workers 资源的 mainLock 锁
mainLock.unlock();
}
//释放线程之后,尝试终止线程池
//终止条件:SHUTDOWN状态且工作线程数为0且工作队列为空 || STOP状态且工作线程数为0
tryTerminate();
//获取当前线程池状态
int c = ctl.get();
//如果线程池状态为 RUNNING 或者 SHUTDOWN
if (runStateLessThan(c, STOP)) {
//如果是正常状态溢出当前线程,进入if方法块
if (!completedAbruptly) {
// 核心线程数最小值允许多少?
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//工作队列不为空,设置工作线程的最小值为1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果工作线程数大于等于工作线程最小值,说明还有工作线程在线程池中,那就return,什么也不干。
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//1. 如果是不正常的方式移除了当前工作线程,再补一个工作线程!
//2. 线程池工作队列不为空,并且没有工作线程,再添加一个工作线程。
addWorker(null, false);
}
}
tryTerminate() 尝试终止线程
如果当前状态为 SHUTDOWN 或者 STOP ,而且满足变为 TIDYING 状态的条件,就CAS一次尝试将状态变为 TIDYING 。如果变为 TIDYING 成功,将继续变为 TERMINATED。在真正变为 TERMINATED 之前,会给一个钩子函数 terminated() 由外界/子类 实现,处理变为 TERMINATED 状态之前的最后操作。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//如果是运行态,就不终止线程
if (isRunning(c) ||
//如果已经是TIDYING状态了,就不作处理
runStateAtLeast(c, TIDYING) ||
//如果是SHUTDOWN,但是工作队列还有任务,不作处理( SHUTDOWN需要到工作队列为空的时候,才可以变成TIDYING)
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//如果是SHUTDOWN或者STOP状态,但是工作线程还没清空
if (workerCountOf(c) != 0) { // Eligible to terminate
//中断一个当前运行的线程
interruptIdleWorkers(ONLY_ONE);
return;
}
//如果可以变为TIDYING状态,上锁,因为terminated中的操作可能会是临界区资源
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//自定义处理线程池终止前的最后操作
terminated();
} finally {
//最终设置为TERMINATED线程池状态
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
shutdown() 将线程池变为 SHUTDOWN 状态
之所以在 mainLock.lock() 内还要对 ctl 变量进行自旋更新,是因为还有其他线程在没持有锁的时候,也对 ctl 进行自旋更新。例如:addWorker()中对工作线程数进行更改就没有上锁。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//判断能否SHUTDOWN
checkShutdownAccess();
//自旋将线程池运行状态改为 SHUTDOWN
advanceRunState(SHUTDOWN);
//中断所有线程
interruptIdleWorkers();
//线程池生命周期钩子函数
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//变为SHUTDOWN的时候,尝试变为TIDYING
tryTerminate();
}
shutdownNow() 将线程池变为 STOP 状态
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查能否SHUTDOWN
checkShutdownAccess();
//将状态改为STOP
advanceRunState(STOP);
//中断所有工作线程,中断响应可在runWorker的钩子函数中自定义处理(可以通过抛出异常来终止线程运行)
interruptWorkers();
//将工作队列中的任务全都溢出,并且添加到 tasks中。可以交给拒绝策略,或者外部处理这些还没完成的任务。
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//变为 STOP 状态后,尝试变为 TIDYING 状态
tryTerminate();
return tasks;
}
文中多次提到了钩子函数,其中最常用的是在 runWorker() 中,在线程执行任务前后添加的 beforeExecute() 和 afterExecute() 两个钩子函数。除了响应中断,我们还可以做很多操作,例如让线程暂停/回复 处理任务:
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor(...) { super(...); }
protected void beforeExecute(Thread t, Runnable r) {
//确保继承的方法多次重写的调用链正常,要调用super.beforeExecute()
super.beforeExecute(t, r);
//获取暂停锁
pauseLock.lock();
try {
//条件变量,只要不满足,就阻塞等待
while (isPaused) unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}}
其中使用了条件变量,使用方法见 ReentrantLock 的条件变量文章。