线程池的意义
在讲解线程池之前,有些读者可能存在这样的疑惑:为什么需要线程池,线程池有什么优越性?
关于这个问题,主要从两个角度来进行解答:
减少开销
在大部分JVM上,用户线程与操作系统内核线程是1:1的关系,也就是说每次创建回收线程都要进行内核调用,开销较大。那么有了线程池,就可以重复使用线程资源,大幅降低创建和回收的频率。此外,也能一定程度上避免有人在写BUG时,大量创建线程导致资源耗尽。
便于管理
线程池可以帮你维护线程ID、线程状态等信息,也可以帮你统计任务执行状态等信息。
理解了线程池的意义,那么本文的主角便是JUC提供的线程池组件:ThreadPoolExecutor。
请注意,有人会将JUC中的ThreadPoolExecutor与Spring Framework中的ThreadPoolTaskExecutor混淆。这是两个不同的组件,ThreadPoolTaskExecutor可以理解为对ThreadPoolExecutor做的一层封装,主要就是为了支持线程池的Bean化,将其交给Spring Context来管理,防止滥用线程池。而内部的核心逻辑还是由ThreadPoolExecutor处理。关于这一点,简单了解即可。
首先尝试用一句话对ThreadPoolExecutor进行概括:
从宏观上看,开发者将任务提交给ThreadPoolExecutor,ThreadPoolExecutor分配工作线程
(Worker)来执行任务,任务完成后,工作线程回到ThreadPoolExecutor,等待后续任务。
根据这端描述,产生了三个比较值得探究的问题:
1. ThreadPoolExecutor自身有哪些状态,如何维护这些状态?
2. ThreadPoolExecutor如何维护内部的工作线程?
3. ThreadPoolExecutor处理任务的整体逻辑是什么样的?
源码之前,了无秘密。在读源码的过程中,脑海中带着这三个问题。
继承关系
ThreadPoolExecutor继承了AbstractExecutorService, AbstractExecutorService实现了
ExecutorService接口。ExecutorService接口继承了Executor接口,整体呈现出了这样的关系:
ThreadPoolExecutor→AbstractExecutorService→ExecutorService→Executor
从上往下来看:
public inter face Executor {
void execute (Runnable command);
}
Executor接口中只声明了一个execute方法,用于执行提交的任务。
ExecutorService扩展了Executor的语义,增加了多种多样的操作。
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awai tTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny (Collection<? extends Callable<T>> tasks)
throws Inter ruptedException,ExecutionException;
<T> T invokeAny (Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
而AbstractExecutorService则是对ExecutorService中声明的方法进行默认实现,方便子类进行调用。比如ThreadPoolExecutor就直接使用了AbstractExecutorService的submit方法。
AbstractExecutorService也是一个比较核心的类,但它不是本文的重点,所以不会详细讲解。
静态变量
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
这几个变量很关键,在注释中也已经有了比较详细的解释,英文还不错的同学不妨通读一遍。我这里就以更直白的的方式加以介绍,顺便帮你温习一些计算机基础知识。
先看下半部分的五个变量,从命名上可以判定这些值代表了ThreadPoolExecutor的状态。
这些状态值涉及到了二进制移位操作,我们知道int类型在Java中的二进制表示是以补码存储的,关于原码反码补码的基础知识这里不展开解释。那么-1的二进制表示是32个1的序列,COUNT_ BITS值是常数,为32-3=29。因此RUNNING的二进制表示是高三位为111,低29位都为0的序列。
我们用同样的方式表示出其余四个状态:
RUNNING: 11100000 00000000 00000000 00000000
SHUTDOWN: 00000000 00000000 00000000 00000000
STOP: 00100000 00000000 00000000 00000000
TIDYING: 01000000 00000000 00000000 00000000
TERMINATED: 01100000 00000000 00000000 00000000
不难发现,这五个状态可以理解为目前只用到了高三位,这是因为ThreadPoolExecutor只用一个int变量来同时保存线程池状态以及工作线程数这两个信息,线程状态使用高三位,工作线程数使用低29位。CAPACITY这个变量就表示为工作线程的最大数量。
TERMINATED: 00011111 11111111 11111111 11111111
这种将两种状态存储在一个二进制序列中的做法,在业务代码中相对比较少见,在底层源码中很常见。比如ReentrantReadWriteLock中,用一个int来组合表示读锁和写锁的个数,比如在ZooKeeper中,用一个long来组合表示epoch和事务个数。
这几种状态的含义是:
RUNNING:接受新任务,也能处理阻塞队列里的任务。
SHUTDOWN:不接受新任务,但是处理阻塞队列里的任务。
STOP:不接受新任务,不处理阻塞队列里的任务,中断处理过程中的任务。
TIDYING: 当所有的任务都执行完了,当前线程池已经没有工作线程,这时线程池将会转换为TIDYING状态,并且将要调用terminated方法。
TERMINATED: terminated方法调用完成。
这几个状态之间的变化如图所示:
属性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));
//相关方法
private static int ctl0f(int rs,int wc) { return rs | wc; }
首先着重介绍的是AtomicInteger类型的属性ctl。
ctl就是上文所说的,组合了线程池状态以及池中工作线程数两个信息的变量。它初始化时调用了ctlOf方法,可以看到ctlOf只是一个或操作。这就说明,线程池在初始化时,状态被标记为RUNNING,工作线程数为0。
读到这里,有一些读者可能会存在疑惑:为啥非要用一个int值来组合表示两种状态?用两个值表示,清清楚楚不行吗?
可以,当然可以。但使用一个变量的好处是:如果需要对两个状态值进行同步修改,直接通过位操作就可以了,省去了加锁操作。因为在操作系统级别,对int的修改本身就是原子的。顺便提一下,像64位的double、long。在32位操作系统上对它们的操作不是原子的,可能出现半读半写问题。
再来看看其他属性,属性往往会透露出这个类是如何组织的。
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet <Worker> workers = new HashSet<Worker>();
private final Condition termination = mainLock.newCondition();
private int largestPoolSize;
private long completedTaskCount;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTime0ut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
一个个属性来看。每个属性其实在源码里都有注释,有兴趣的读者不妨去通读一遍,我这边来简单讲解一下。
workQueue
类型是BlockingQueue接口,用来存储积压任务的阻塞队列,具体实现类可以由用户自己定义。细节上有一个疑惑点: BlockingQueue的泛型是Runnable,难道Callable类型的任务就无法进入该阻塞队列吗?这是因为父类AbstractExecutorService将会把Callable对象转化为Runable的子类FutureTask,所以阻塞队列的泛型是Runnable没问题。
mainLock
ReentrantLock类型,可以料想对线程池的一些操作需要状态同步,所以需要用到锁。具体在哪里用到下文再看。
workers
HashSet类型,泛型是内部类Worker。Worker这个内部类可以视为对工作线程以及一些状态的封装,workers是用来存储所有Worker的集合。
termination
由mainLock创建的Condition,看变量名应该是用于terminal调用时的线程同步。
largestPoolSize
线程池中最多有过多少个活跃线程。
completedTaskCount
线程池总共处理了多少任务。
threadFactory
类型为ThreadFactory接口,用户可以自定义创建工作线程的工厂。
handler
拒绝策略,当workQueue满载时将会触发。
keepAliveTime
工作线程空闲时则保持存活的时间。
allowCoreThreadTimeOut
布尔类型,是否需要保持核心线程始终处于存活。
corePoolSize
核心线程数。可以看作稳定的工作线程数量,当阻塞队列还未满载时,线程池将保持核心线程数。
maximumPoolSize
最大线程数。可以看作弹性的工作线程数量,当阻塞队列满载时,线程池将会在核心线程数的基础上创建新线程来处理任务,直到最大线程数。
其中拒绝策略,核心线程数,最大线程数这三个变量比较重要,在下文中也会重点讲到。
内部类
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker (Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker (this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0,1)) {
setExclusiveOwner Thread (Thread. currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0) ;
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t. interrupt();
} catch (SecurityException ignore) {
}
}
}
}
继承关系
继承AQS:说明Worker内部存在同步需求
实现Runnable:Worker本身就是一个异步的任务调度者
当某个Worker从workQueue中获取一个任务后,便持有锁,直到将任务在当前线程内执行完后,再释放锁,获取新的任务。有的读者可能会有疑惑:这里根本不用锁呀,是不是有点多次一举。事实上,这里加锁的作用是表示Worker是否处于工作中,不接收中断信号。
方法
三个属性就不用进一步介绍了,注释写得很明确。在构造函数中,将AQS的state初始化为-1,之所以不初始化为0,是为了在初始化期间不接受中断信号,直到runWorker方法开始运行,即工作线程真的开始处理任务,state将 会被修改为0,此时相当于锁被释放的状态,可以接受中断信号。这部分逻辑可以从interruptlfStarted方法中理解。
run
来看Worker中最主要的run方法,其实也就是runWorker方法,该方法当Worker启动时便会调用。runWorker的主要逻辑在第8行开始的while循环。
final void runWorker (Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null | (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread. interrupted() &&
runStateAtLeast(ctl.get(),STOP))) &&
!wt. isInterrupted())
wt.interrupt();
try{
beforeExecute(wt,task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(W,completedAbruptly) ;
}
}
若当前线程从workQueue中获取任务,首先加锁,这里加锁不是为了防止并发,而是标记当前工作线程正在执行任务,不接收中断信号。
接下来14-18行用于判断当前线程池状态以及线程中断状态,大致意思就是若当前线程池状态>=stop(非running、shutdown状态) ,则对当前线程调用中断(当线程正在运行时,对线程调用中断只是标记将要中断的状态,线程不会立即中断)
19-37行是对任务本身的处理,有两个细节:
调用了task的run方法而不是start方法,表示依然在当前线程中处理,而非新启线程在task.run()方法的前后,有beforeExecute和afterExecute这两个方法,相当于线程池给每个任务都进行了切面。
33-37行中,完成任务数++,并释放锁,没什么问题。
若while判断条件中的getTask方法返回了null,那么将会跳出循环,调用finally块中的
processWorkerExit方法来对Worker进行回收。也就是说getTask方法如果返回null,那将会触发回收当前worker的行为。
这里需要了解的一点是:当workQueue中不存在排队任务时,不一定会直接返回null,更有以下两种情况:
工作线程阻塞等待,结合之前的信息,如果工作线程被标记中断,并且进入阻塞状态的话,那么将会触发中断,代表该worker需要被回收,此时getTask将会返回null。
线程池当前状态需要回收worker,此时getTask将会返回null。
getTask
带着这个预期,我们点开getTask方法来看。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for(;;) {
int C = ctl.get();
int rs = runState0f(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOPII workQueue. isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCount0f(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedout))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecr ementWorkerCount(c))
return null;
continue ;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
workQueue.take();
if (r != null)
return r;
timed0ut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
第2行
timedOut这个bool值变量用于记录上一次从阻塞队列里poll任务是否超时,接下来第4行开始就是一个循环。
5-12行
获取当前线程池的状态,若线程池的状态已经是STOP、TIDYING、TERMINATED或者是SHUTDOWN,且工作队列为空,那么返回null,代表当前worker可以回收,符合我们的预期。
14-24行
接下来,wc就是当前worker数量,allowCoreThreadTimeOut这个变量我们最开始说过,代表是否需要保持核心线程空闲时始终存活,默认是false。那么这个逻辑就是,如果当前工作线程数超过了最大线程数,或达到了核心线程数的回收条件,且池中还有其他工作线程在工作或workQueue时,则开始尝试回收当前worker。符合我们的预期。
26-35行
如果不满足上述两种回收条件,那么就开始从阻塞队列里获取任务,不同的是,poll操作在队列为空的时候,将直接返回null,而take操作将会等待,直到队列中有任务可以被取出。
这里有个细节: pol的超时时间keepAliveTime,即为我们最开始介绍的空闲线程的回收时间。也就是说,既然队列里无任务需要处理,那么也就代表该线程空闲,可以尝试进行新一轮的回收判断。
getTask这个方法我认为就是线程池动态维护工作线程的核心。设计比较巧,当我们在业务中自己处理一些复杂的生产消费问题时,可以借鉴这种思路。
方法
execute
看完了内部类,接下来便是ThreadPoolExecutor最重要的execute方法。
public void execute (Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCount0f(c) < corePoolSize) {
if (addWorker(command, true))
return;
C = ctl.get();
}
if (isRunning(c) & workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove (command))
reject (command);
else if (workerCountOf(recheck) == 0)
addWorker(null,false);
}
else if (!addWorker(command, false))
reject(command) ;
}
我们来看execute方法。注释告诉我们当你向线程池提交一个任务,线程池可能会创建一个线程来处理,也可能使用已有的线程处理。当然,不一定会立即处理,因为可能会被队列缓存起来。如果队列也已经满了,那么将会触发拒绝机制。
当向线程池提交一个任务,如果当前线程数小于核心线程数,那么就新增worker。由于上层可能存在并发提交任务的情况,那么这里很可能会由于核心线程数的限制而导致新增失败。当新增工作线程失败,则进入下面的流程。向阻塞队列offer一个任务,如果阻塞队列已满,那么继续尝试创建worker,不过此时创建的不是核心线程,而是奔着最大线程数去的,如果已经达到了最大线程数,那么触发拒绝策略。
而如果成功提交到了阻塞队列,这时再判断线程池的状态,如果处于非RUNNING状态,那么尝试移除任务,如果成功移除该任务,就触发拒绝策略。如果移除失败了,那就给这个任务一个被执行的机会,尝试新增worker去消费阻塞队列里的任务。
reject方法会调用具体的拒绝策略,ThreadPoolExecutor提供了默认的四种拒绝策略,当然也可以自定义拒绝策略,这些都是可以在ThreadPoolExecutor的构造方法中指定的。
这段逻辑我相信很多博客都已经讲过了,很多人面试时也被问到过。我们今天呢要关注更深入一点的东西。可以猜想,在addWorker的时候,一定会有并发问题需要处理,我们来看看是如何处理的。
addWorker
private boolean addWorker (Runnable firstTask, boolean core) {
retry:
for (;;) {
int C = ctl.get();
int rs = runState0f(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCount0f(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
C = ctl.get(); // Re-read ctl
if (runState0f(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
一开始一个大的循环我们暂时先不看,这段逻辑主要就是对当前线程池的一系列状态进行判断,判断当前时刻下是否还需要创建worker。当这个循环执行完并且没有退出,准确地来说也就是这里CAS操作成功,说明才真的有机会创建新的worker。
首先new一个Worker,Worker的构造器我们之前讲过了其中的一个细节。下面例行公事判断一下worker内的Thread对象是否为null,我觉得这里可能仅仅是谨慎而已,因为worker是刚构造出来的,其中的属性thread应该是不可能为null。
下面就要获取独占锁,之前也说到了,这里很可能出现多线程创建worker的情况,为了防止突破核心线程数和最大线程数的限制,这里必然要加锁。下面是对线程池状态的判断, 若处于这两种情况:
如果线程池处于非RUNNING的状态
线程池状态为SHUTDOWN且当前worker是为了用于消费阻塞队列里缓存的任务
便向worker set中新增该worker,此时释放锁。然后工作开始执行,如果t.start出现异常,将会进入finally块进行清理操作。
这里细心的同学会发现一个严重的问题,那就是若当前线程释放锁,其他线程获取锁进行worker的插入时,居然没有对当前线程数的判断,那岂不是很可能会导致实际worker数超过预设的最大线程数?
我们再来重新翻看上面的逻辑。
事实上,上面两段嵌套的循环已经进行了限制,当且仅当只有CAS成功的线程才能跳出这个循环,也就是无论什么时刻,都只能有一个线程执行下面创建worker的逻辑,而其他线程都将自旋等待,这就已经保证了线程安全。
既然创建worker的操作已经是线程安全的,为什么这里还要获取mainLock呢?这是为了同步对workers这个hash set的操作。因为hash set本身不是线程安全的,当这里正在向workers中add worker,其他地方很可能正在对workers进行remove操作。事实上workers这个属性上方的注释也已经说明了,当操作该属性时,需要获取mainLock。
因此既保证了新建worker操作是同步的,也保证了对workers执行add操作的线程安全。