1.前言
1.1 为什么要使用线程池?
线程池主要为了解决两个问题
- 一是当执行大量异步任务时,线程池能够提供较好的性能,避免了重复创建和销毁线程带来的开销
- 二是线程池提供了一种资源限制和管理的手段,比如限制线程个数,动态新增线程
1.2 类图
ThreadPoolExecutor
继承自AbstractExecutorService
,上图中的Executors
类提供了很多静态方法用于创建不同的线程池实例。
在ThreadPoolExecutor
中,成员变量ctl
是一个原子变量,用于记录线程池状态以及线程中的线程个数,类似ReentrantReadWriteLock
中,state
变量的高16位表示读状态,低16位表示写状态。
2.线程池常见成员变量
2.1 线程池状态相关成员变量
// 线程池用高3位表示线程池状态,低29位表示线程个数
// 这里的ctl变量就是保存线程池状态+线程池中线程个数的变量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程个数掩码位数,值为Integer的位数(32)-3
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池中工作线程的数目,注意要和largestPoolSize区别开
// 线程池最大线程数:00011111 11111111 11111111 11111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池状态常量,用于判断线程池的状态
// (高三位)11100000 00000000 00000000 00000000
private static final int RUNNING = -1 << COUNT_BITS;
// (高三位)00000000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// (高三位)00100000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS;
// (高三位)01000000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS;
// (高三位)01100000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS;
这里RUNNING
相对于其他变量稍微有点区别,因为RUNNING
的值是一个负数通过左移计算得来的。计算机内部是使用补码存放值的,-1的补码为11111111 11111111 11111111 11111111
,左移29位自然就是上面的值了。其他的正数进行左移自然就不用多说。
这里额外说一下线程中常用的用于获取线程状态及线程个数的方法,我们已经知道了ThreadPoolExecutor
用到了一个原子变量保存线程状态及线程个数,所以这几个方法都是对ctl
这个变量进行操作。
// 用于获取ctl变量中的运行状态:***00000 00000000 00000000 00000000
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 用于获取ctl变量中的线程个数:000***** ******** ******** ********
private static int workerCountOf(int c) { return c & CAPACITY; }
// 计算ctl变量的新值
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 表示线程池的运行状态是否小于s
private static boolean runStateLessThan(int c, int s) { return c < s; }
// 表示线程池的运行状态是否至少为s
private static boolean runStateAtLeast(int c, int s) { return c >= s; }
2.2 线程池其他常见成员变量
// 存储了线程池中所有的工作线程,用于快速查询线程池中是否存在某个线程。
// 当一个线程加入到线程池中时,会被 workers 集合持有。当线程被移除时,会从集合中删除。
private final HashSet<Worker> workers = new HashSet<Worker>();
// 工作线程的阻塞队列,用于存放等待执行的任务。
private final BlockingQueue<Runnable> workQueue;
// 线程池从开始到现在有过的最大线程数目
private int largestPoolSize;
// 线程池总完成任务量计数
private long completedTaskCount;
3.线程池状态
从上面代码我们可以得知,线程池的状态可以分为:
RUNNING
:接受新任务,并且处理阻塞队列中的任务SHUTDOWN
:拒绝新任务,但是处理阻塞队列中的任务STOP
:拒绝新任务,并且抛弃阻塞队列中的任务,同时会中断正在处理中的任务TIDYING
:所有的任务(包括线阻塞队列中的任务)都执行完后,当前线程池中活动线程数为0,将调用terminated
方法TERMINATED
:终止状态。terminated
方法调用完成以后的状态
线程池的状态转换
RUNNING -> SHUTDOWN
:显式调用shutdown()
方法,或者隐式调用了finalize()
方法里面的shutdown()
方法。RUNNING 或 SHUTDOWN-> STOP
:显式调用shutdownNow()
方法时。SHUTDOWN -> TIDYING
:当线程池和任务队列都为空时。STOP -> TIDYING
:当线程池为空时。TIDYING -> TERMINATED
:当 terminated() hook 方法执行完成时。
4.线程池构造方法中的参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {...}
corePoolSize
:线程池核心线程个数。workQueue
:用于保存等待执行的任务的阻塞队列,意味着N个核心线程正在执行任务。- 常见的阻塞队列有
- 基于数组的有界ArrayBlockingQueue
- 基于链表的无界 LinkedBlockingQueue
- 最多只有一个元素的同步队列 SynchronousQueue
- 优先级队列 PriorityBlockingQueue
- 常见的阻塞队列有
maximunPoolSize
:线程池最大线程数量。当核心线程都正在执行任务,且阻塞队列已满,那么就会创建新的线程,直到maximunPoolSize个。ThreadFactory
:创建线程的工厂。RejectedExecutionHandler
:饱和策略,当队列满并且线程个数达到 maximunPoolSize后采取的策略。- 常见的包和策略有
- AbortPolicy(抛出异常)
- CallerRunsPolicy(使用调用者所在 线程来运行任务)
- DiscardOldestPolicy(调用 poll 丢弃一个任务,执行当前任务)
- DiscardPolicy(默默丢弃 , 不抛出异常)
- 常见的包和策略有
keeyAliveTime
:存活时间。如果当前线程池中的线程数量比核心线程数量多,并且是闲置状态,则这些闲置的线程能存活的最大时间。TimeUnit
:存活时间的时间单位。
5.源码分析
execute
这个不必多说,是最常用的方法,就是向线程池提交任务。
public void execute(Runnable command) {
// 判断任务是否为null,是则抛出空指针异常
if (command == null)
throw new NullPointerException();
// 判断当前线程池中的线程数目是否超过核心线程数目
// 获取当前ctl的值
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 如果没有超过核心线程数目,那么就调用addWorker方法增加线程池中的线程,然后就此返回
if (addWorker(command, true)) // 这里第二个参数用于标识添加的线程是否是核心线程
return;
// 重新获取ctl的值,因为执行addWorker方法时,线程池的状态可能已经发生了变化
c = ctl.get();
}
// 上面的代码执行完,如果继续向下走,只有两种情况:
// (1)workerCountOf(c) = corePoolSize:这种情况会把任务放入阻塞队列
// (2)workerCountOf(c) > corePoolSize:这种情况会创建新线程
// 如果线程池处于运行状态,那么就添加任务到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
// 二次检查,重新获取ctl的值,原因同上
int recheck = ctl.get();
// 如果线程池已经不处于recheck状态,那么就尝试将刚刚添加的任务移除,同时执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 当前线程为空则添加一个新的线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 添加任务到阻塞队列失败,换句话说就是阻塞队列已满
else if (!addWorker(command, false))
// 执行拒绝策略
reject(command);
}
接下来我们将详细讲解public void execute(Runnable command)
方法中的addWorker
方法。
addWorker
该方法的主要作用是向线程池中添加工作线程。
private boolean addWorker(Runnable firstTask, boolean core) {
// 外层循环
retry:
for (;;) {
// 获取ctl变量的值
int c = ctl.get();
// 获得线程池状态
int rs = runStateOf(c);
// (1)
// 检查队列是否只在必要时为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 内层循环
// 循环CAS增加线程个数
for (;;) {
// 获取当前线程池中的线程数目
int wc = workerCountOf(c);
// 如果线程数目超限则直接返回false意味着添加工作线程失败
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// (2)
// 这里使用CAS增加线程个数,只有一个线程可以CAS成功
if (compareAndIncrementWorkerCount(c))
// CAS操作成功则跳出外循环,继续向下执行
break retry;
// CAS操作失败以后,判断线程池状态是否发生改变
// 如果变化则重新获取线程池状态值,进行下一轮的外层循环,如果未发生变化,那么就直接进行下一轮的内层循环(继续CAS增加线程数目)
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
/**--------------------------------------------------------------------------------------------**/
// 标识新增线程是否启动成功
boolean workerStarted = false;
// 标识新增线程是否添加成功
boolean workerAdded = false;
Worker w = null;
try {
// 创建worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加独占锁,为了实现workers同步,因为可能多个线程调用了execute方法
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 重新检查线程池状态,以免在获取锁前调用了shutdown接口
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 判断线程是否可以启动
if (t.isAlive())
throw new IllegalThreadStateException();
// 添加任务
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 将添加任务标识修改为成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加任务成功以后则启动任务,同时将启动任务标识修改为true
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果线程启动失败,则调用addWorkerFailed方法将worker回滚到创建时的创建状态
if (! workerStarted)
addWorkerFailed(w);
}
// 最后将worker启动标识返回
return workerStarted;
}
addWorker
方法有点长,但是大体可以分为两部分来看,根据代码中我标注的横线注释可以分为
- 双重循环通过CAS操作增加线程数(注意是线程数,不是线程!)
- 把并发安全的部分添加到
workers
里面,并启动任务执行
大体逻辑除了有点长,其实还是很好理解的,不过我在代码中标注的两个序号(1)(2)要稍微注意下。
(1)
下面的判断代码,其实等价于下面的代码
rs >= SHUTDOWN &&
(rs != SHUTDOWN ||
firstTask != null ||
workQueue.isEmpty())
也就意味着在(1)中上面的判断中有三种情况会返回true,也就是addWorker
方法失败,返回false
-
当前线程池状态为
STOP
、TIDYING
或TERMINATED
- 这三个状态都意味着线程池即将关闭,这个时候添加工作线程是不合理的。
- RUNNING(
11100000 00000000 00000000 00000000
)是负数,不满足条件
-
当前线程池状态为
SHUTDOWN
,并且firstTask不为null。- 如果线程池为
SHUTDOWN
状态,这个时候添加有任务的工作线程本身就是不合理的。如果添加一个不携带任务的工作线程,加快SHUTDOWN
转变为TIDYING
倒是合理。
- 如果线程池为
-
当前线程池状态为
SHUTDOWN
,并且任务队列为空- 这就意味着任务队列为空,此时线程池中可能可能还有正在执行任务的线程,当这几个正在执行的线程执行完毕,就会转变为
TIDYING
状态。没有多的任务要做却创建一个不携带任务的工作线程,有点浪费的意思了。
- 这就意味着任务队列为空,此时线程池中可能可能还有正在执行任务的线程,当这几个正在执行的线程执行完毕,就会转变为
以上内容皆为个人愚见,如果错误,请多多指教!
(2)
这个地方的代码是使用CAS操作增加线程数目,我们直到如果众多线程并发CAS修改一个变量,只能有一个变量修改成功同时返回false,其他的线程就都会CAS失败返回false。
- 如果CAS操作成功,也就意味着addWorker中上半部分的代码任务已经执行完毕,直接跳出外层循环。
- 但是如果CAS操作失败,就会判断当前线程池的状态是否改变
- 如果没有改变(当前线程增加线程数目的任务还没有完成)就会继续内层循环继续尝试增加线程数目
- 如果已经改变,那么就会进行下一轮的外层循环,重新获取线程池的状态变量然后重复上述逻辑直到CAS增加线程数目成功
runWorker
从类图我们可以得知,Worker
类实现了Runnable
接口,所以他一定实现了了run()
,而这个run()
方法正是调用runWorker方法。
在前面addWorker
方法中,我们已经看到了当Worker
成功添加进workers
集合后,会调用start()
方法,start
方法会开启一个线程,线程就绪以后会执行run
方法,这部分其实也就是接着上述addWorker
的w.start()
逻辑来的。所以我们接下来会详细讲runWorker
。
首先看一下Worker的构造方法:
Worker(Runnable firstTask) {
// 设置state为-1,在调用runWorker方法前禁止中断
setState(-1);
// 将任务传给Worker的成员变量firstTask
this.firstTask = firstTask;
// 创建一个线程
this.thread = getThreadFactory().newThread(this);
}
随后来看看runWorker方法到底做了什么?
final void runWorker(Worker w) {
// 获得当前工作线程
Thread wt = Thread.currentThread();
// 获得Worker中的任务
Runnable task = w.firstTask;
// 将Worker中的firstTask置为null
w.firstTask = null;
// 在我们创建Worker的时候构造方法中将state置为了-1,禁止被中断,这里将state置为0,意味着允许被中断
w.unlock();
// 标识线程是否是突然退出,如果在任务执行完毕以前(completedAbruptly为true),当前线程被中断,就会为true标识突然中断
boolean completedAbruptly = true;
try {
// 如果task不为null或者从任务队列获取的任务不为null,则进入循环
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果池停止,请确保线程中断;如果没有,请确保线程未中断。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行任务前干一些事情(未定义)
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 正式执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务后做一些事情(未定义)
afterExecute(task, thrown);
}
} finally {
// 任务完成后手动释放内存
task = null;
// 将Work的完成任务计数器+1
w.completedTasks++;
// 释放锁
w.unlock();
}
}
// 标识线程是否是突然退出
completedAbruptly = false;
} finally {
// 执行清理工作
// 第一个参数是工作线程对象,第二参数标识线程是否是被突然中断
processWorkerExit(w, completedAbruptly);
}
}
整体的代码不算长,逻辑也比较清晰,重要的是最后的processWorkerExit
方法需要看一下。
要注意的是runWorker
方法中有一个while
循环,这个循环只有在任务队列为空时才会停止,停止之后才会调用processWorkerExit
方法清除多余的线程(除核心线程外的线程)。
同时这里提一下ThreadPoolExecutor
是如何缓存复用线程的:
ThreadPoolExecutor
中维护了一个线程池的核心线程数和最大线程数,只要任务队列中还有未执行的任务,线程池就会维护至少核心线程数的线程在空闲状态,以便及时处理新来的任务。当任务执行完毕后,执行该任务的工作线程就会从
BlockingQueue
中获取下一个任务,如果队列中还有未执行的任务,则工作线程就会继续执行该任务,否则工作线程就会将自己从线程池的工作线程列表workers
中移除。
processWorkerExit
这个方法主要是为了清除多余的空闲线程。比如,我们上面已经说到的,while循环结束,也就是任务队列为空,那么除了核心线程之外的线程已经没有任务处理了,所以要清除掉。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果是被突然中断了,则不调整线程池中线程数目
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
// 统计整个线程池完成的任务个数,并从工作集中删除当前Worker
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 工作线程执行完任务后,及时将线程从线程池中移除,释放资源,避免浪费系统资源。
workers.remove(w);
} finally {
mainLock.unlock();
}
// 如果当前是SHUTDOWN状态并且工作队列为空,或者当前是STOP状态,当前线程池中没有活动线程。那么尝试设置线程池状态为TERMINATED
tryTerminate();
// 如果当前线程个数小于核心线程个数,则增加线程
int c = ctl.get();
// c < STOP : RUNNING
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
shutdown
调用shutdown方法以后,线程池就不会接受新的任务了,但是工作队列里面的任务还是要执行的。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 权限检查,查看当前线程是否有关闭/中断线程的权限,如果没有则抛出SecurityException或NullPointerException
checkShutdownAccess();
// 设置当前线程池状态为SHUTDOWN,如果已经是SHUTDOWN则直接返回
advanceRunState(SHUTDOWN);
// 设置中断标志
interruptIdleWorkers();
onShutdown();
} finally {
mainLock.unlock();
}
// 尝试将线程池状态修改为TERMINATED
tryTerminate();
}
其中前两个方法不太重要,我们看一下interruptIdleWorkers方法。
interruptIdleWorkers
该方法的主要目的就是将所有空闲线程的中断标识设置为true。
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历workers中的所有worker
for (Worker w : workers) {
Thread t = w.thread;
// 如果当前worker的线程没有被设置为中断,并且没有正在运行则将其设置为中断。
// 简单来说就是将所有空闲线程的中断标志设置为true
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
for (Worker w : workers) {
Thread t = w.thread;
// 如果当前worker的线程没有被设置为中断,并且没有正在运行则将其设置为中断。
// 简单来说就是将所有空闲线程的中断标志设置为true
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}