一、前言
创建一个线程可以通过继承Thread类或实现Runnable接口来实现,这两种方式创建的线程在运行结束后会被虚拟机回收并销毁。若线程数量过多,频繁的创建和销毁线程会浪费资源,降低效率。而线程池的引入就很好解决了上述问题,线程池可以更好的创建、维护、管理线程的生命周期,做到复用,提高资源的使用效率。也避免了开发人员滥用new关键字创建线程的不规范行为。
说明:阿里开发手册中明确指出,在实际生产中,线程资源必须通过线程池提供,不允许在应用中显式的创建线程。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
通过分析ThreadPoolExecutor线程池的工作原理、核心参数以及执行过程来深入了解线程池相关工作原理。
二、ThreadPoolExecutor
ThreadPoolExecutor是java中实现线程池的核心类,主要用于管理和异步执行任务。从以下几个方面分析工作原理
1、生命周期
线程存在生命周期,同样线程池也有生命周期。生命周期中存在5个状态。
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
线程池的开启到关闭过程就是这5个状态流转过程,状态之间转换流转如下图。
5个状态描述说明如下
状态 | 含义 |
RUNNING | 运行状态,该状态下线程池可以接受新的任务,也可以处理阻塞队列中的任务 执行 shutdown 方法可进入 SHUTDOWN 状态 执行 shutdownNow 方法可进入 STOP 状态 |
SHUTDOWN | 待关闭状态,不再接受新的任务,继续处理阻塞队列中的任务 当阻塞队列中的任务为空,并且工作线程数为0时,进入 TIDYING 状态 |
STOP | 停止状态,不接收新任务,也不处理阻塞队列中的任务,并且会尝试结束执行中的任务 当工作线程数为0时,进入 TIDYING 状态 |
TIDYING | 整理状态,此时任务都已经执行完毕,并且也没有工作线程 执行 terminated 方法后进入 TERMINATED 状态 |
TERMINATED | 终止状态,此时线程池完全终止了,并完成了所有资源的释放 |
线程池的重点之一就是控制线程资源合理高效的使用,所以必须控制工作线程的个数,所以需要保存当前线程池中工作线程的个数。ThreadPoolExecutor中使用一个AtomicInteger类型的ctl属性存储了线程池的状态和工作线程个数。
ctl的高3位用来表示线程池的状态(runState),低29位用来表示工作线程的个数(workerCnt)。因为线程池有5个状态,2位只能表示4个状态,所以用3位来表示5个状态。
2、创建线程池
1. 构造方法
通过ThreadPoolExecutor类的构造方法,来创建一个线程池,其中构造方法上有7大核心参数,通过7大核心参数的配置来定制化线程池。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
7大核心参数含义:
corePoolSize:核心线程数
maximumPoolSize:最大线程数
keepAliveTime:非核心线程空闲时间
unit:keepAliveTime的时间单位
workQueue:工作阻塞队列
ThreadFactory:线程工厂提供创建线程的方式,默认使用Executors.defaultThreadFactory()创建。
handler:拒绝策略
2. 核心参数详解
ThreadPoolExecutor的构造方法中包含了7大核心参数,通过配置核心参数可以定制化线程池。接下来分别介绍参数
corePoolSize 和 maximumPoolSize
通过corePoolSize和maximumPoolSize在构造方法中设置线程池边界,来调整线程池中的工作线程数量。工作线程个数自动调整有两个场景:
a. 工作线程个数小于corePoolSize
当线程池提交一个新任务,且工作线程个数小于corePoolSize时,即使其他工作线程空闲,也会创建一个新线程来执行任务
b. 线程数量介于corePoolSize和maximumPoolSize之间
当运行的工作线程数量大于corePoolSize但小于maximumPoolSize,且阻塞队列已满时 才会创建非核心工作线程。
默认情况下,即使是核心线程也是在新任务到达时开始创建和启动的。若使用非空队列创建线程池,可通过重写prestartCoreThread或prestartAllCoreThreads方法动态覆盖,进行线程预启动。
keepAliveTime
keepAliveTime参数用来设置工作线程空闲时间。非核心线程通过poll方法获取任务,若获取不到,那么线程会keepAliveTime长的时间进行重复获取。当执行时间大于keepAliveTime时,线程就会采用CAS(比较-替换)随机方式进行线程销毁流程。
workQueue
workQueue参数用来指定存放提交任务的队列,任务BlockingQueue都可以用来传输和保存提交的任务。队列大小与线程数量之间存在关系:
a. 若线程数小于corePoolSize,对于提交的新任务会创建一个新的线程处理,不会把任务放入到队列。
b. 若线程数介于corePoolSize和maximumPoolSize之间,新提交的任务会被放到阻塞队列。
c. 若线程池处于饱和状态,即无法创建新线程且阻塞队列已满,那么新提交的任务交给拒绝策略处理。
线程池常见的阻塞队列一般有SynchronousQueue(同步队列)、LinkedBlockingQueue(无界阻塞队列)、ArrayLiBlockQueue(有界阻塞对列)、DelayedWorkQueue(延迟阻塞队列)。
- SynchronousQueue不是真正意义上的队列,它没有容量也不存储任务,只是维护一组线程,等待着任务加入和移除队列,相当于直接交接任务给具体制定的队列,在CacheThreadPool线程池中使用。
- LinkedBlockingQueue是采用链表实现的无界队列。若不预定义LinkedBlockingQueue队列容量,当所有核心线程都在执行时,就会无限添加任务到队列中,可能导致OOM,且这种场景下maximumPoolSize的值对线程数无影响。在SingleThreadExecutor和FixedThreadPool线程池中使用。
- ArrayBlockingQueue是通过数组实现的有界队列。有界队列和有限的maximumPoolSize一起使用有助于防止资源耗尽。使用ArrayBlockingQueue可以根据应用场景,可以预先估计线程数和队列容量,互相权衡队列大小和线程数:
- 大队列和小线程数:减少线程数,可以最大限度的减少CPU使用率、操作系统资源和上下文切换开销,可能会导致吞吐量降低
- 小队列和大线程数:较大的线程,若任务提交速度快,会在短时间提升CPU使用率,提高系统吞吐量。若任务经常阻塞(如IO阻塞),会使得CPU切换更加频繁,可能会有更大的调度开销,这也会降低吞吐量
- DelayedWorkQueue是采用数组的完全二叉树实现小根堆特性的延迟队列。在堆中每个父节点的值都不大于子节点的值,因此堆顶元素(数组第一个元素)总是当前队列中延时最小的任务。队列中元素是ScheduledFutureTask对象,对象封装了任务的执行逻辑以及延迟time信息。在ScheduledThreadPool线程池中使用。
threadFactory
该参数提供了线程池中线程创建方式,这里使用了工厂模式ThreadFactory创建新线程,默认情况下使用Executors.defaultThreadFactory创建,它创建的线程都在同一个ThreadGroup,且具有想同的NORM_PRIORITY优先级和非守护进程状态。也可以自定义ThreadFactory修改线程的名称、线程组、优先级以及守护程序状态等。
handler
若线程池处于饱和状态没有足够的线程数或队列空间来处理提交的任务,或者线程池已处于关闭状态但还在处理进行中的任务,那么新提交的任务就会由拒绝策略处理。
出现以上任何情况,execute方法都会调用RejectExecutionHandler.rejectExecution()方法进行拒绝策略处理。线程池提供了四种预定义的拒绝处理策略
ThreadPoolExecutor.AbortPolicy(默认策略):
行为:当工作队列已满且无法再添加新任务时,直接抛出RejectExecutionException异常。
场景:适用于哪些不能容忍任务被丢弃或延迟执行的场景。因为会立即通知调用者任务被拒绝,从而可以采取相应的处理措施。如订单处理
ThreadPoolExecutor.DisCardPolicy:
行为:当工作队列已满且无法再添加新任务时,直接丢弃新任务,不做任务处理。
场景:适用于哪些任务可以被安全忽略的场景,或任务执行与否对系统整体影响不大的情况。如日志收集
ThreadPoolExecutor.DiscardOldestPolicy:
行为:当工作队列已满且无法再添加新任务时,丢弃队列中最早的任务(即等待时间最长的任务),然后尝试重新提交当前任务。
场景:适用于哪些任务可以相互替代,或较早的任务执行结果对当前系统状态影响不大的场景。如消息队列处理系统,若消息队列已满且新消息已来,可以采用该策略
ThreadPoolExecutor.CallerRunsPolicy:
行为:当工作队列已满且无法再添加新任务时,由提交任务的线程来执行该任务,即任务在调用线程(提交任务的线程)的上下文中执行。
场景:适用于需保证任务不被丢弃,且任务执行时间相对较短的场景。如在线视频处理
除了上述四种提供的拒绝策略外,还可通过实现RejectedExecutionHandler接口来自定义拒绝策略。
3、工作流程
了解了线程池中生命周期和核心参数,接下来了解下线程池整体工作流程,如图:
上图是一张线程池工作的精简图,线程池工作流程主要包含提交任务、创建工作线程并启动、获取任务并执行、销毁工作线程几部分。
1. 提交任务
当线程池通过execute提交任务时,线程池有三种处理情况,分别是创建工作线程执行该任务、将任务添加到阻塞队列、拒绝该任务。提交任务过程可以拆分一下几步:
a. 线程数小于corePoolSize时,通过addWorker创建新的核心线程处理该任务
b. 线程数等于corePoolSize且非空闲时,将任务添加到阻塞队列中。
c. 若添加成功,需二次验证线程池状态,若为非RUNNING状态,则需将该任务从队列中移除,然后拒绝该任务。 若为RUNNING状态且当前工作线程数为0,则需主动创建一个空任务的非核心线程来执行队列中该任务。
d. 若添加失败,则队列已满,创建新的临时线程(非核心线程)执行该任务。
e. 若创建临时线程(非核心线程)失败,则说明工作线程等于maximumPoolSize,只能拒绝该任务。
ThreadPoolExecutor.execute源码解析
//ThreadPoolExecutor.execute()方法执行任务
public void execute(Runnable command){
if(command == null){
throw new NullPointerExecption();
}
int c = ctl.get();
//获取ctl低29位中的工作线程数与核心线程数比较,若小于核心线程数 直接创建worker对象执行任务
if(workerCount(c) < corePoolSize){
if(addWorker(command, true))
return;
c = ctl.get();
}
//当线程池为RUNNING状态且worker数量超过核心线程数,任务直接放入到阻塞队列中
if(isRunning(c) && workerQueue.offer(command)){
int recheck = ctl.get();
//线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()拒绝操作。
//这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化。
if(!isRunning(recheck) && remove(command)){
拒绝处理任务
reject(command);
}else if(workerCountof(recheck) == 0){
//当工作线程个数为0时,创建一个空任务的非核心工作线程执行队列中任务
//这儿为什么需要判断0值,主要是在线程池构造方法中,核心线程数允许为0
addWorker(null, false);
}
}
// 如果线程池不是运行状态,或者任务进入队列失败,则尝试创建worker执行任务。
// 这儿有3点需要注意:
// 1. 线程池不是运行状态时,addWorker内部会判断线程池状态
// 2. addWorker第2个参数表示是否创建核心线程
// 3. addWorker返回false,则说明任务执行失败,需要执行reject拒绝操作
else if(!addWorker(command, false))
reject(command);
}
2. 创建工作线程并启动
ThreadPoolExecutor线程池核心任务单元是Worker内部类实现的。创建工作线程首先就是创建Worker对象,Worker对象是通过核心addWorker方法创建工作线程,在通过Worker对象中的Thread对象启动线程。addWorker方法包含两部分操作。
a. 校验线程池状态以及工作线程个数
i. 先判断线程池状态
ii. 在判断工作线程个数
iii. 通过CAS原子性对ctl属性低29位的值+1
b. 添加工作线程并启动工作线程
i. 所有校验通过后,先new Worker对象,再将worker对象放大HashSet集合中
ii. 然后拿到worker对象中的Thread对象执行.start方法启动工作线程,通过runWorker执行任务
addWorker方法执行源码解析
private boolean addWorker(runnable firstTask, boolean core){
//外层for循环是在校验线程池状态
//内层for循环是在校验工作线程个数
//retry是给外层for添加一个标记,是为了方便在内层for循环跳出外层for循环
retry:
for(; ;){
//获取存储线程池状态的ctl属性
int c = ctl.get();
//拿到ctl的高3位的值 即线程池状态
int rs = runStateOf(c);
//========================线程池状态判断=======================//
//如果线程池状态是SHUTDOWN;并且此时阻塞队列中有任务 核心线程数为0,添加一个工作线程(非核心线程)去处理阻塞队列的任务
//判断线程池状态是否大于等于SHUTDOWN,若满足可能为SHUTDOWN、STOP、TIDING、TERMINATED,说明线程池状态不是RUNNING(才能处理任务)
if(rs >= SHUTDOWN &&
//若三个条件都满足,就代表是要添加非核心线程去处理阻塞队列中的任务
//若三个条件有一个不满足,返回false 配合!,表示不需要添加工作线程
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())){
//不需要添加工作线程
return false;
}
for(; ;){
//========================工作线程个数判断=======================//
//获取ctl低29位的值,代表当前工作线程的个数
int wc = workerCountOf(c);
//若工作线程个数大于第29位最大值,不可以再添加工作线程,返回false
if(wc >= CAPACITY ||
//基于core来判断添加的是否为核心工作线程
//若是核心线程:基于corePoolSize来判断
//若是非核心线程:基于maximumPoolSize来判断
wc >= (core ? corePoolSize : maximumPoolSize))
//代表不能添加,工作线程个数不满足要求
return false;
//若可以添加,直接对ctl的第29位采用CAS方式 直接+1。因为添加工作线程可能出现并发情况 使用CAS保证操作的原子性
if(compareAndIncrementWorkerCount(c))
//CAS成功后,直接退出外层for循环,代表可以执行添加工作线程的操作了
break retry;
//若CAS操作失败,重新获取一次ctl的值
c = ctl.get();
//判断重新获取的ctl中,线程池的状态跟之前的是否有区别
//若状态不一致,说明有变化,重新去判断线程池的状态
if(runStateOf(c) != rs)
//跳出一次外出for循环
continue retry;
}
}
//========================添加工作线程以及启动工作线程=======================//
//声明三个变量
//工作线程启动了没 默认false
boolean workerStarted = false;
//工作线程添加了没 默认false
boolean workerAdded = false;
//工作线程 默认null
Worker w = null;
try{
//构建工作线程,并将任务传递进去
w = new Worker(firstTask);
//获取Worker中的Thread对象
final Thread t = w.thread;
//判断Thread对象是否为null,在new Worker时 内部会通过给予的ThreadFactory去构建Thread对象交给Worker
//一般若为null,代表ThreadFactory有问题
if(t != null){
//加锁,保证使用workers(HashSet集合 线程不安全)成员变量以及对largestPoolSize赋值时 保证线程安全
final ReentrantLock mainLock = this.mainLock;
//加锁原因是HashSet线程不安全 加锁
main.lock();
try{
//再次获取线程池状态
int rs = runStateOf(ctl.get());
//再次判断线程池状态
//若状态满足 rs < SHUTDOWN,说明线程池状态为RUNNING,状态正常可以添加工作线程 执行if代码块
//若状态为SHUTDOWN,且firstTask任务为null,添加非核心工作线程处理阻塞队列中的任务
if(rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)){
//到这可以添加工作线程了
//校验ThreadFactory创建线程后 线程是否已自动启动,若已启动,抛异常
if(t.isAlive())
throw new IllegalThreadStateExecption();
//private final HashSet<Worker> workers = new HashSet<Worker>();
//将new好的worker添加到HashSet中
workers.add(w);
//获取HashSet的size,拿到工作线程的个数
int s = workers.size();
//largestPoolSize-表示历史最大线程数记录
//若当前最大工作线程个数 大于历史最大线程个数记录,就赋值
if(s > largestPoolSize)
largestPoolSize = s;
//添加工作线程成功
workerAdded = true;
}
}finally{
mainLock.unLock();
}
//工作线程添加成功后 启动工作线程
if(workerAdded){
//直接启动Worker中的线程
t.start();
//启动工作线程成功
workerStarted = true;
}
}
}finally{
//做补偿的操作,若工作线程启动失败,将这个添加失败的工作线程处理掉
if(!workerStarted)
addWorkerFailed(w);
}
//返回工作线程是否启动成功
return workerStarted;
}
3. 获取任务并执行
通过执行runWorker方法来获取任务并执行,runWorker方法执行主要有以下几步
a. 获取当前线程以及worker中封装的任务
b. 判断当前任务是否为null,若不为null,直接执行。若为null,重阻塞队列中获取任务
c. 判断线程池状态以及当前线程中断标记位,若线程池状态为STOP或当前线程中断标记位为false,则中端当前线程
d. 执行任务
runWorker方法源码解析
final void runWorker(Worker w) {
//拿到当前工作线程(w对应的线程)
Thread wt = Thread.currentThread();
//拿到Worker对象中封装的的任务
Runnable task = w.firstTask;
//将worker的firstTask归位
w.firstTask = null;
// 将Worker的state状态归为0,代表可以被中断
w.unlock(); // allow interrupts
// 任务执行时,勾子函数中是否出现异常的标识,默认为true-出现异常
boolean completedAbruptly = true;
try {
//获取任务的第一个方式,就是执行execute、submit时,转入的任务直接处理
//获取任务的第二个方式,从工作队列中获取任务执行。
while (task != null || (task = getTask()) != null) {
// 加锁,在SHUTDOWN状态下,当前线程不允许被中断
// 并且Worker内部的锁,并不是可重入锁,因为中断时,也需要对worker进行lock,不能获取就代表当前线程正在执行任务
w.lock();
//如果线程池状态变为了STOP状态,必须将当前线程中断
// 第一个判断:判断当前线程池状态是否为STOP
// 第二个判断:查看线程中断标记位并归位,若为false 说明不是STOP,若变为true,需要再次查看是否是否是并发操作导致线程池为STOP
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
// 查看当前线程中断标记是否为false,若为false,就执行wt.interrupt();
!wt.isInterrupted())
wt.interrupt();
try {
// 执行任务的勾子函数,前置增强(不是动态代理),可以重写这个方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务的勾子函数,后置增强(不是动态代理),可以重写这个方法
afterExecute(task, thrown);
}
} finally {
// 将task置为null
task = null;
// 将执行成功的任务个数+1
w.completedTasks++;
// 将线程的state状态标记位设置为0,表示可以通过SHUTDOWN中断当前线程
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 自旋操作被退出,说明线程池正在结束
processWorkerExit(w, completedAbruptly);
}
}
三、总结
Executor框架主要由三部分组成,任务
,任务的执行者
,执行结果
,ThreadPoolExecutor和ScheduledThreadPoolExecutor的设计思想也是将这三个关键要素进行了解耦,将任务的提交和执行分离。线程池是一种基于池化技术的线程管理工具,能够降低资源消耗、提高响应速度、提高线程的可管理性。