文章目录
- 一.线程池简介
- 1. 什么是线程池
- 2. 线程池的优点
- 3. 线程池中核心关系继承
- 4.对线程池的理解
- ①框架的两极调度模型
- ②核心线程和非核心线程的本质区别
- 二. 线程池核心概念
- 1. 线程池核心参数
- 2.两种常见的线程池
- ①newCachedThreadPool
- ②newFixedThreadPool
- ③newSingleThreadExcutor
- ④newScheduledThreadPool
- ⑤newWorkStealingPool(int parallelism) 【了解】
- 阿里巴巴手册不建议使用Excutors来创建线程池
- 3. 线程池的状态
- 三.源码解读
- ①execute()
- 在代码中进行两次isRunning检查,为什么要进行两次?
- 简述线程池工作流程
- ②addWorker(Runnable firstTask,boolean core)
- ③Worker类
- ④addWorkerFailed(w)
- ⑤runWorker
- ⑥getTask 线程获取任务
- ⑦shutdonw()
- ⑦shutdownNow()
- ⑧tryTerminate()
- ⑨awaitTermination()
一.线程池简介
1. 什么是线程池
线程是稀缺资源,如果在高并发的情况下被无限制地创建和销毁,不仅会消耗系统资源,还会降低系统的稳定性。所以线程池的出现就是为了解决这些问题的。线程池通过重用已经存在的线程资源,减少线程创建和销毁的次数,提高了性能
2. 线程池的优点
● 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
● 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
● 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
3. 线程池中核心关系继承
4.对线程池的理解
①框架的两极调度模型
在JVM线程模型中,java线程被一对一映射为本地操作系统线程。java线程启动时会创建一个本地操作系统线程。当线程终止时,这个操作系统线程也会被终止回收。操作系统会调度所有线程并将它们分配给可用的CPU
在上层,java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器将这些任务映射到固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。
应用程序通过executor框架控制上层的调度,下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。
②核心线程和非核心线程的本质区别
本质没有区别,通过看源码的话,其实就是所有在线程池中的线程并没有一个特定的标识去标识这个线程是核心线程还是非核心线程,而是通过workCount当前线程数去对比线程池的核心线程数来看是核心线程还是非核心线程。
就像一个常说的核心线程不会自己销毁而非核心线程会自动线程。在底层当前线程会根据线程数和线程池核心线程数做对比来看自己该执行核心线程的逻辑(去queue中take值)还是非核心线程的逻辑(去queue中poll值),所以说某一个线程在执行时是否是核心线程需要看当前线程数和核心线程数进行对比来看的,一个线程在生命周期中既有可能是核心线程也有可能是非核心线程。
二. 线程池核心概念
1. 线程池核心参数
- **corePoolSize 😗*核心线程数,核心线程会一直存活,即使没有任务需要执行(除非allowCoreThreadTimeOut参数设置为true,这样的话即使是核心线程也会被超时销毁)
- maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
- workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
介绍一下常见的阻塞队列:
-
LinkedBlockingQueue链式阻塞队列,底层数据结构是链表,默认大小是Integer.MAX_VALUE,也可以指定大小。
-
ArrayBlockingQueue数组阻塞队列,底层数据结构是数组,需要指定队列的大小。
-
SynchronousQueue同步队列,内部容量为0,每个put操作必须等待一个take操作,反之亦然。
SynchronousQueue 也是一个队列来的,但它的特别之处在于它内部没有容器,一个生产线程,当它生产产品(即put的时候),如果当前没有人想要消费产品(即当前没有线程执行take),此生产线程必须阻塞,等待一个消费线程调用take操作,take操作将会唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传递),这样的一个过程称为一次配对过程(当然也可以先take后put,原理是一样的)。
-
DelayQueue延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。
-
keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
-
unit : keepAliveTime 参数的时间单位。
-
threadFactory :executor 创建新线程的时候会用到。
-
handler :饱和策略。关于饱和策略下面单独介绍一下
①ThreadPoolExecutor.AbortPolicy :抛出 RejectedExecutionException来拒绝新任务的处理。
②ThreadPoolExecutor.CallerRunsPolicy :调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
③ThreadPoolExecutor.DiscardPolicy :不处理新任务,直接丢弃掉。
④ThreadPoolExecutor.DiscardOldestPolicy : 此策略将丢弃最早的未处理的任务请求。
2.两种常见的线程池
通过Excutors来创建线程池
①newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
运行流程:
-
提交任务进线程池。
-
因为corePoolSize为0的关系,不创建核心线程,线程池最大为Integer.MAX_VALUE。
-
尝试将任务添加到SynchronousQueue队列。
-
如果SynchronousQueue入列成功,等待被当前运行的线程空闲后拉取执行。如果当前没有空闲线程,那么就创建一个非核心线程,然后从SynchronousQueue拉取任务并在当前线程执行。
-
如果SynchronousQueue已有任务在等待,入列操作将会阻塞。
适用场景:
当需要执行很多短时间的任务时,CacheThreadPool的线程复用率比较高, 会显著的提高性能。而且线程60s后会回收,意味着即使没有任务进来,CacheThreadPool并不会占用很多资源。
②newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
核心线程数量和总线程数量相等,都是传入的参数nThreads,所以只能创建核心线程,不能创建非核心线程。因为LinkedBlockingQueue的默认大小是Integer.MAX_VALUE,故如果核心线程空闲,则交给核心线程处理;如果核心线程不空闲,则入列等待,直到核心线程空闲。
与前面的CachedThreadPool有什么区别?
- 因为 corePoolSize == maximumPoolSize ,所以FixedThreadPool只会创建核心线程。 而CachedThreadPool因为corePoolSize=0,所以只会创建非核心线程。
- 在 getTask() 方法,如果队列里没有任务可取,线程会一直阻塞在 LinkedBlockingQueue.take() ,线程不会被回收。 CachedThreadPool会在60s后收回。
- 由于线程不会被回收,会一直卡在阻塞,所以没有任务的情况下, FixedThreadPool占用资源更多。
- 都几乎不会触发拒绝策略,但是原理不同。FixedThreadPool是因为阻塞队列可以很大(最大为Integer最大值),故几乎不会触发拒绝策略;CachedThreadPool是因为线程池很大(最大为Integer最大值),几乎不会导致线程数量大于最大线程数,故几乎不会触发拒绝策略。
③newSingleThreadExcutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
有且仅有一个核心线程( corePoolSize == maximumPoolSize=1),使用了LinkedBlockingQueue(容量很大),所以,不会创建非核心线程。所有任务按照先来先执行的顺序执行。如果这个唯一的线程不空闲,那么新来的任务会存储在任务队列里等待执行。
④newScheduledThreadPool
创建一个定长线程池,支持定时周期性执行任务
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
能够进行定时的原因之一就是有一个延迟队列
newSingleThreadScheduledExecutor() 和 newScheduledThreadPool(int corePoolSize),创建的是个 ScheduledExecutorService,可以进行定时或周期性的工作调度,区别在于单一工作线程还是多个工作线程。
⑤newWorkStealingPool(int parallelism) 【了解】
这是一个经常被人忽略的线程池,Java 8 才加入这个创建方法,其内部会构建ForkJoinPool,利用Work-Stealing算法,并行地处理任务,不保证处理顺序。
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
阿里巴巴手册不建议使用Excutors来创建线程池
- FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
- CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM
3. 线程池的状态
ThreadPoolExecutor类中使用了一些final int常量变量来表示线程池的状态 ,分别为RUNNING、SHUTDOWN、STOP、TIDYING 、TERMINATED。
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
- 线程池创建后处于RUNNING状态。
- 调用shutdown()方法后处于SHUTDOWN状态,线程池不能接受新的任务,清除一些空闲worker,不会等待阻塞队列的任务完成。
- 调用shutdownNow()方法后处于STOP状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。此时,poolsize=0,阻塞队列的size也为0。
- 当所有的任务已终止**,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。接着会执行terminated()函数。**
在ThreadPoolExcutor中有一个控制状态的属性ctl,他是一个AtomicInteger类型的变量。
其高3位表示当前线程池的运行状态
低29位表示线程池中的工作线程数量
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
三.源码解读
以下所有方法都仅针对于jdk1.8
①execute()
public void execute(Runnable command) {
//如果任务为空,那直接报错
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果线程数小于核心线程数,就让核心线程来处理任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
//处理失败,继续向下执行
c = ctl.get();
}
//尝试将任务放进工作队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果isRunning返回false(状态检查),则remove这个任务,然后执行拒绝策略。
if (! isRunning(recheck) && remove(command))
reject(command);
//线程池处于running状态,但是没有线程,则创建线程。
//怎么会没有线程?
//因为线程池是允许设置核心线程允许超时的(allowCoreThreadTimeOut),所以老爷子在这里再次判断,很严谨,很细腻。如果没线程了,则addWorker(null, false);为什么是null?null在addWorker里有判断用处。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//工作队列已满,则尝试用非核心线程来处理任务
else if (!addWorker(command, false))
reject(command); //工作线程数已经超过了线程池最大线程数,调用拒绝策略
}
在代码中进行两次isRunning检查,为什么要进行两次?
在多线程的环境下,线程池的状态是时刻发生变化的。很有可能刚获取线程池状态后线程池状态就改变了。判断是否将command加入workqueue是线程池之前的状态。倘若没有二次检查,万一线程池处于非RUNNING状态(在多线程环境下很有可能发生),那么command永远不会执行。
简述线程池工作流程
其实从上面的源码部分我们就已经很容易看出来了
- 线程总数量 < corePoolSize,无论线程是否空闲,都会新建一个核心线程执行任务(让核心线程数量快速达到corePoolSize,在核心线程数量 < corePoolSize时)。注意,这一步需要获得全局锁。
- 线程总数量 >= corePoolSize时,新来的线程任务会进入任务队列中等待,然后空闲的核心线程会依次去缓存队列中取任务来执行(体现了线程复用)。这个阶段还会进行双重检查,避免因为线程池状态的变化或当前线程池无线程导致放入的任务一直无法被执行
- 当缓存队列满了,说明这个时候任务已经多到爆棚,需要一些“临时工”来执行这些任务了。于是会创建非核心线程去执行这个任务。注意,这一步需要获得全局锁。
- 缓存队列满了, 且总线程数达到了maximumPoolSize,则会采取上面提到的拒绝策略进行处理。
②addWorker(Runnable firstTask,boolean core)
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//见下文详解1
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//如果线程数超过了最大容量或者线程数超过了设置的线程数,则return,这个三目表达式秒啊,省了多少if else的代码
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS的方式将线程数+1,也就是ctl变量的低29位。
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)// 判断线程池状态是否发生变化,若没有变化则继续CAS。
continue retry; //如果CAS失败且状态发生拜年话,就进行自旋到retry开始,因为ctl一直在变化,我们必须保证ctl在正确的执行逻辑之内
}
}
//上半部分其实就是进行状态的判断和ctl的变更,是准备动作,下面的部分就是真正进行任务执行
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//将任务进行封装
w = new Worker(firstTask);
//添加一个新的还未开始的线程
final Thread t = w.thread;
if (t != null) {
//上锁的目的是保证workers.add(worker)方法在多线程操作时候是线程安全的
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
//再次检查当前线程池状态是否符合执行条件
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果线程已经被start过了,则抛出异常,不允许重复调用start
if (t.isAlive())
throw new IllegalThreadStateException();
// 添加任务到HashSet任务队列
workers.add(w);
int s = workers.size();
// 如果workers的长度(任务队列长度)大于阈值,则更新阈值。
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//以上:如果线程池的状态是RUNNING或者线程池状态是SHUTDOWN但是任务是null的话(execute第二步执行会是null),则添加任务到workers,且标记workerAdded = true;代表任务添加成功。最后finally里解锁
//真正的开启线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
详解1
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
结论:
- 如果线程池不是RUNNING状态且不是SHUTDOWN状态,则直接return false。外层的execute不做处理或执行拒绝策略。
- 如果线程池是SHUTDOWN状态,且此次有提交新的任务过来(最外层有取反操作),且任务队列是空,则return false,外层的execute不做处理或执行拒绝策略。
问题解答:
问题来了,什么叫此次有提交新的任务?
在execute方法里调用addWorker的地方有三个:
-
开启核心线程执行任务,addWorker(command, true);
-
核心线程满了,任务队列添加成功了,但是核心线程都超时了,导致线程池中线程数为0,addWorker(null, false);
-
核心线程满了,任务队列也满了。则开启非核心线程来执行任务,addWorker(command, false);
所以在execute中,第2步就叫没有提交新的任务过来,第1和3两步就叫有提交新任务过来,所以是在execute里控制的。
③Worker类
Worker主要用来封装线程和任务
// Worker类部分源码
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
//其余代码略...
}
Worker类实现了Runnable接口,所以Worker也是一个线程任务。在构造方法中,创建了一个线程,线程的任务就是自己。当任务执行时就会调用
④addWorkerFailed(w)
如果添加任务的流程中失败了或者添加成功了,但是执行任务的线程启动失败了,则走失败的策略。那失败的策略到底是啥呢?
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
很简单,就是要把任务移除(因为可能添加成功了,只是线程启动失败了,所以要remove掉),还需要将线程池中的任务数-1(ctl变量的低29位,CAS的方式进行减一)。
上锁的目的很清楚了吧?workers.remove(w)
是多线程并发执行的,所以需要上锁
⑤runWorker
runWork方法是在worker中的run方法调用的,即线程真正执行的时候调用的方法。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//哪儿来的解锁?请看详解1
w.unlock();
boolean completedAbruptly = true;
try {
//这里的循环体逻辑非常重要,请看 详情2
while (task != null || (task = getTask()) != null) {
w.lock();
/* 中断判断:
*
* 线程池状态是大于等于STOP的话(执行了shutdownNow),
* 并且你没有被中断过的话(!wt.isInterrupted()),则让线程中断,
* 也就是说,线程池状态都是大于等于STOP的了,那么设置中断标记位,告诉这个线程说:
* 你小子别找事啊,你赶紧给我回来,你被开除(中断)了。
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//在线程任务开始执行前做一些处理,可以自定义实现方法。模板方法。需要
//注意的地方是他只被try finally包起来了,没有catch,也就是说异常
//会被吞,即使报错,如果用户不做catch捕获的话,那么将不会影响线程下
//面的工作。这很关键,对下面的completedAbruptly有决定性的作用。
beforeExecute(wt, task);
Throwable thrown = null;
//开始执行任务,也就是你提交到线程池里的任务,且捕获异常。
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//在线程任务开始执行完后做一些处理,可以自定义实现方法。模板方法。需要
//注意的地方是他只被try finally包起来了(指的是第二层的try),没有catch,也就是说异常
//会被吞,即使报错,如果用户不做catch捕获的话,那么将不会影响线程下
//面的工作。这很关键,对下面的completedAbruptly有决定性的作用。
afterExecute(task, thrown);
}
} finally {
//首先可以发现也没有被catch捕获。其次就是一些辅助工作,比如task弄成null来辅助最外层的while循环,完成的任务数+1,解锁的工作。
//剩余没讲的代码还有最后一段,那就是:processWorkerExit,再说这个方法之前,必须看下前面提到两次的completedAbruptly是个什么鬼。
task = null;
w.completedTasks++;
w.unlock();
}
}
//很简单,如果任务正常得到执行,没有任何异常,他就是false,如果中途发生了异常,那就是true。具体含义是是否发生了中断,而且这个中断还是用户自己中断的,因为比如beforeExecute、afterExecute啥的都是重新才可能发生异常。这里就体现了两个可重写函数的作用,即用户设置completedAbruptly
completedAbruptly = false;
} finally {
//请看 详情3
processWorkerExit(w, completedAbruptly);
}
}
详解1
解锁?可有在哪儿加锁?
答案:这个与Woker类中的方法有关
// 哦豁,继承了AQS
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
Worker(Runnable firstTask) {
// 设置AQS是state为-1,主要目的是为了不让中断。
setState(-1);
// 设置任务
this.firstTask = firstTask;
// 创建线程
this.thread = getThreadFactory().newThread(this);
}
}
好了,我们知道构造器里面干了三件事,但是我们这里只关注第一件事,那就是setState(-1);,state是AQS的变量,-1啥意思?没啥意思,很简单,就是说我这个任务不可以被中断。那为啥要这么设置?废话,你都没开始执行呢,你只是new一个任务出来,线程都没启动,怎么可能允许中断呢?恍然大悟…!
那再看unlock干了啥?
public void unlock() {
release(1);
}
狗日的…!调用AQS的release方法,给state释放1,也就是说unlock后state变成了0,通俗点就是:我现在任务得到了执行,我要让他允许中断了,怎么允许?当然是state=0,这都是AQS的知识!
所以为啥Worker要继承AQS?因为他巧妙的运用了AQS的中断。
详情2
while循环拿任务
task一上来就是w.firstTask;,也就是说我们在addWorker方法里包装在Worker里的任务,第一次肯定不是null,所以会执行while循环体,执行完后再finally里给task弄成了null。所以这个条件仅在第一次执行的时候为true,因为每次finally都会把task弄成null。
task = getTask():从队列里取任务,取出来后赋值给task,如果队列里有任务就执行循环体,执行完成后会给任务从队列里remove掉,如果getTask获取不到任务则会阻塞,因为底层是BlockingQueue。
详情3
processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果任务执行过程中发生了报错,则CAS的方式把任务数-1。ctl的低29位。
if (completedAbruptly)
// CAS的方式将任务数-1。
decrementWorkerCount();
// 上锁 保证将任务移除队列的线程安全。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 将任务移除队列,因为任务已经执行完了嘛
workers.remove(w);
} finally {
// 解锁
mainLock.unlock();
}
// 钩子函数,在线程执行中因错误还调用
tryTerminate();
/*
* 下面这段代码的含义是如果线程池是RUNNING或者SHUTDOWN状态的话,
* 且任务顺利完成(completedAbruptly=false)的话,那么判断是否设置了允许核心线程超时
* 如果允许核心线程超时,且任务队列不等于空的话,那么开启一个线程来执行任务。
*
* 一言以蔽之:如果线程池是RUNNING或者SHUTDOWN状态的话,且任务队列不是空,那么至少保证线程池中有一个线程在执行任务
*/
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
上面将了这么多,这里总结一下runWorker
- 先unlock调用AQS的release方法,让任务可中断。(因为任务已经开始执行了,可以中断了)
- while循环拿任务,没任务就阻塞,采取的BlockingQueue的阻塞api
- 中断判断,线程池状态是大于等于STOP的话(执行了shutdownNow),就让线程中断
- 线程执行前会先执行beforeExecute,可重写
- 真正的任务执行
- 线程执行前会先执行afterExecute,可重写
- 执行完成后将任务从workers里remove掉
- 如果线程池是RUNNING或者SHUTDOWN状态的话,且任务队列不是空,那么至少保证线程池中有一个线程在执行任务
⑥getTask 线程获取任务
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//详解1
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//详情2
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//指定时间获取失败,r==null
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
详解1
timed
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
-
如果设置了核心线程允许超时的话,则timed=true,反之false。
-
如果线程池中活跃线程数大于核心线程数,则timed=true,反之false。
timed干啥用的?不是傻子的话都该看出来了,释放线程用的,也就是说非核心线程(大于核心线程数了)要被释放,允许核心线程超时的话也要被释放。
详情2
释放线程
Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();
如果timed是true,那么走workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)也就是设置获取任务的超时时间,到时间后还没获取到任务的话则会timeOut=true。
try {
//详情2
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//指定时间获取失败,r==null
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
timeOut=true的话,getTask有个判断会让其跳出循环,线程生命周期也自然而然的随之结束。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
反之如果timed是false的话,那么会执行workQueue.take();不带超时时间的,则一直阻塞等待有结果返回。
其实就是:从队列中获取任务,如果timed是true的话,则调用阻塞队列的poll方法阻塞一段时间获取任务,这段时间没任务的话,则超时设置timeOut=true,结束生命周期。否则调用take()方法一直阻塞等待任务到来,也就是核心线程为什么能一直存活的原因。
⑦shutdonw()
CAS将线程池状态设为SHUTDOWN->遍历空闲线程并进行中断->调用钩子方法->tryTerminate
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
//加锁:防止多个线程同时shutdown
mainLock.lock();
try {
checkShutdownAccess(); //JVM权限相关,可以不看
advanceRunState(SHUTDOWN); //详解1
interruptIdleWorkers(); //详解2
//钩子函数,默认是个空方法,模板方法,按需重写。就是在线程池shutdown后可以自 //定义一些操作。比如ScheduledThreadPoolExecutor是ThreadPoolExecutor的 //子类,他就重写了onShutdown方法来做一些自定义的事情
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate(); //请看 详情4
}
详解1
advanceRunState(SHUTDOWN) :就是通过CAS自旋的方式给线程池状态设置为SHUTDOWN。
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
很简单,就是比大小。在这里就是我判断我线程池目前的状态是不是大于等于传递进来的线程池状态(SHUTDOWN),如果当前线程池状态比SHUTDOWN还大,那就直接是true,没必要进行后面的CAS了,直接break就完事了。
详解2
interruptIdleWorkers()
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
/**
* true:只中断其中一个线程, false:for循环遍历全部线程,逐个中断,无一幸免。
* 可以看到if (onlyOne) break;,也就是说如果是true,第一轮for后就break了,只中断一个。
**/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
//防止并发
mainLock.lock();
try {
//很简单,就是遍历当前线程池的全部任务,然后获取任务的线程,逐个进行interrupt中断。
for (Worker w : workers) {
Thread t = w.thread;
//请看 详情3 重点!!
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
//最后就是中断和finally解锁啦。
//解锁还有个点需要注意:就是需要解worker的锁:
mainLock.unlock();
}
}
详情3
:if (!t.isInterrupted() && w.tryLock()) {…}
很巧妙,tryLock承上启下,巧妙的一逼!
首先判断是不是被中断过了,如果已经被中断了,则下一轮循环,如果没被中断且w.tryLock()成功,则进行中断。
w.tryLock()是何意?下面分析也表明Worker为啥要继承AQS了。
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
可以看到tryLock很简单,就干一件事,就是将state设置为1,如果设置失败,则return false,也就是不会走到if里面,不会进行中断线程。如果设置成功了则进行中断线程。这个tryLock是点睛之笔,用于判断此线程是不是空闲线程,如果是空闲线程则进行中断,因为线程池SHUTDOWN了嘛,空闲线程肯定要回收。
什么情况下tryLock会失败?
这要回溯到runWorker方法:
final void runWorker(Worker w) {
try {
while (task != null || (task = getTask()) != null) {
w.lock();
}
} finally {
w.unlock();
}
}
看到了没,我先getTask()阻塞式获取任务,如果没获取到,肯定是阻塞了,如果获取到了,则给这个任务上锁,上锁解锁都干嘛了?
public void lock() {
acquire(1);
}
public void unlock() {
release(1);
}
神他妈逻辑,就是给state+1,state-1的操作。大彻大悟,如果当前线程获取到了任务,则interruptIdleWorkers#tryLock肯定失败,因为获取到任务后已经抢占锁了,代表当前worker到线程是活跃线程,不是空闲线程,不可被中断。如果没获取任务,阻塞在getTask那里了,那肯定是没上锁的,那tryLock肯定会返回true,代表这个线程数空闲线程,可以被中断。中断线程后,此线程在getTask会立即不在阻塞,发生InterruptedException,也就是可以终止那些正在执行workQueue.take()方法的工作线程.
这个方法就是:中断空闲的线程,很合理,因为都SHUTDOWN了,不接收新任务了,空闲的线程没啥用了。**但是怎么确定当前线程是不是空闲线程的,这就很巧妙,巧妙的运用了AQS的state状态位。在runWorker里承上启下。只是中断了空闲线程,任务队列是饱满状态,线程忙不过来的话就不会中断任何线程, 会等执行完workers等任务。**这就是SHUTDOWN。
⑦shutdownNow()
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
//防止并发问题
mainLock.lock();
try {
//可忽略
checkShutdownAccess();
//设置状态位为stop
advanceRunState(STOP);
//请看 详解1
interruptWorkers();
//请看 详解2
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
详解1
interruptWorkers();
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 循环全部任务,逐个中断。
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
// 只要线程状态是大于等于0的(也就是说调用了线程的start方法,因为new Worker的时候state=-1),
// 且线程没有被中断,那就中断,
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
不再像shutdown那么友好,只中断空闲线程,shutdownNow的interruptWorkers会中断全部线程。大概原理就是:循环遍历线程池中的全部任务,如果这些任务的线程状态是大于等于0的(也就是说调用了线程的start方法,因为new Worker的时候state=-1),且线程没有被中断,那就中断,中断后getTask会抛出一个中断异常,顺带可以停止那些正在执行workQueue.take()方法的工作线程,就是这么简单粗暴!
详解2
drainQueue()
private List<Runnable> drainQueue() {
// 当前线程池的任务队列
BlockingQueue<Runnable> q = workQueue;
// 最终返回的结果
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
// 将任务队列中的每个元素都放到ArrayList<Runnable>里面,每放成功一个就从q中移除一个
q.drainTo(taskList);
// 如果执行完drainTo后,q还不是空的,这是啥情况?
// 1. 上面报错了,按道理来讲报错后就跳出方法了,所以不是此种可能。
// 2. 为了延迟队列来的,延迟队列没有放进去ArrayList<Runnable>后将任务从老队列移除的操作,所以延迟队列的话就手动for
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
// 返回最终的任务List
return taskList;
}
看注释就行了,唯一需要注意两点:
1.直接返回BlockingQueue不行嘛?为啥还要转成ArrayList返回?
shutdownNow后肯定要清空任务队列的,shutdown不需要清空是因为他都会执行完。所以边清空边放到一个List里,统一返回类型。
2.都drainTo了,为啥还要再次判空,手动remove/add?
因为线程池的任务队列是用户自定义传参的,队列不同,drainTo的实现方案不同,如果是延迟队列的话是不具备删除功能的,所以手动remove/add。
小总结:
和shutdown很相似,区别在于:
- shutdownNow没有onShutdown钩子函数,我个人认为是因为shutdownNow代表很紧急,我把未完成的任务都给你,紧急关闭就行了,不支持钩子。而shutdown比较优雅,不紧不慢的,支持钩子自定义一些东西。
- shutdownNow中断全部线程,shutdown只中断空闲线程,忙着的线程会等处理完任务在中断。
- shutdown没有返回值,shutdownNow会把当前任务队列里的任务转成ArrayList返回回去。
- shutdown支持onShutdown钩子函数,shutdownNow不支持。
- shutdown给线程池状态设置为SHUTDOWN,shutdownNow给线程池状态设置为STOP。
⑧tryTerminate()
这个方法就干了一件事:设置线程池状态为TERMINATED状态且唤醒调用 awaitTermination() 方法的线程。
final void tryTerminate() {
//自旋
for (;;) {
//获取最新ctl值
int c = ctl.get();
//条件一:isRunning(c) 成立,直接返回就行,线程池很正常!
//条件二:runStateAtLeast(c, TIDYING) 说明 已经有其它线程 在执行 TIDYING -> TERMINATED状态了,当前线程直接回去。
//条件三:(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())
//SHUTDOWN特殊情况,如果是这种情况,直接回去。得等队列中的任务处理完毕后,再转化状态。
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//条件成立:当前线程池中的线程数量 > 0
if (workerCountOf(c) != 0) {
// 中断一个空闲线程,注意是一个,不是全部,因为ONL_YONE是true,其实就是通过中断信号,唤醒阻塞的线程(getTask()阻塞的)
//为什么只中断其中一个空闲线程而不是全部呢?
//原因就是因为tryTerminate方法不是只有在shutdown的时候才会调用,而是在运行完任务后
//processWorkerExit里面也会调用,所以每次执行完任务都会调用,所以每次都中断其中一个线程。
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
//获取线程池全局锁
mainLock.lock();
try {
//设置线程池状态为TIDYING状态。
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//调用钩子方法,默认空方法,需要自己重写。
terminated();
} finally {
//设置线程池状态为TERMINATED状态。
ctl.set(ctlOf(TERMINATED, 0));
//唤醒调用 awaitTermination() 方法的线程。
termination.signalAll();
}
return;
}
} finally {
//释放线程池全局锁。
mainLock.unlock();
}
}
}
⑨awaitTermination()
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
// 如果当前线程池状态大于等于TERMINATED了,也就是说已经被terminated了,则直接返回true
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
// 如果达到超时时间,已经超时,则返回false
if (nanos <= 0)
return false;
// 重置距离超时时间的剩余时长
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
在这里就是如果当前线程池状态大于等于TERMINATED了,也就是说已经被terminated了,则直接返回true。
如果达到超时时间,已超时,则返回false,否则就等待,重置距离超时时间的剩余时长。同时awaitNanos也会被tryTermination唤醒。
参考:
拆解ThreadPoolExecuto
java并发编程
线程池原理