文章目录
- 从源码剖析Java线程池的工作机制
- 一、序言
- 二、基础概念
- 1、线程调度模型
- 2、线程池创建方式
- (1)Executors工具类
- (2)ThreadPoolExecutor构造方法
- 2.1 核心参数
- 2.3 拒绝策略实现
- 三、源码剖析
- 1、状态控制变量ctl
- 2、线程执行execute方法
- 3、新增线程addWorker方法
- 4、工作线程Worker内部类
- 5、启动线程runWorker方法
- 6、获取阻塞队列getTask方法
- 7、退出线程processWorkerExit方法
- 8、线程池关闭
- (1)shutdown方法
- (2)shutdownNow方法
- 四、流程图
- 五、后记
从源码剖析Java线程池的工作机制
一、序言
在多核处理器环境下,多线程与并发编程已经成为提升程序响应速度和吞吐量的关键手段,对于Java工程师而言,深入理解多线程与并发编程内部机制,是构建高性能、高可用系统的基石。
本文小豪将带大家深入解读Java线程池ThreadPoolExecutor
的工作原理,从源码分析其任务处理流程,掌握线程池内部机制,帮助我们更好地应用Java线程池。
文章最后附有流程图,进一步帮我们梳理业务逻辑
二、基础概念
在上一篇【从JDK源码探究Java线程与操作系统的交互】一文中,介绍到Java中线程的底层源码实现,而我们在开发中,通常不太会直接创建线程去使用,大多数仍是通过采用线程池来统一创建、管理线程。
线程池是一种用于管理和复用线程的机制,通过预创建一定数量的线程,来减少线程创建和销毁的开销,提高程序的性能和响应速度。
1、线程调度模型
在Java中,线程池的实现建立在Executor
框架之上的,JDK为我们提供了一系列Executor
框架的接口、实现类等,便于我们创建使用线程池。
同时,在上一篇我们了解到Java线程对应着操作系统内核级线程,且为一对一映射关系,由操作系统调度所有线程并分配给CPU去执行,这里我们可以认为,Java线程由两层线程调度模型组成:
- 上层:Executor框架调度Java线程
- 下层:操作系统调度内核级线程
这里的Executor
框架是java.util.concurrent
包的一部分,它是实现线程池的顶层接口,Executor
接口只有一个方法execute(Runnable command)
,用于执行一个Runnable
任务,其主要目的就是提供一个提交任务的机制,不需要我们直接管理线程的创建和生命周期。
Executor
接口下层实现类结构关系如下:
2、线程池创建方式
基于Executor
框架,我们一般创建线程池常用两种方式:通过Executors工具类或ThreadPoolExecutor构造方法。
(1)Executors工具类
Executors
是JDK提供的创建线程池工具类,利于简化线程池的创建,内部提供了大量静态方法,这里列举几个常见的,源码如下:
public class Executors {
// 创建一个固定大小的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// 创建一个单线程的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO)执行
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
// 创建一个可缓存的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// 创建一个可以执行延迟任务的线程池,支持定时及周期性任务执行
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}
但现在大家早已不用Executors
工具类来创建线程池了,太死板而且有内存溢出OOM风险,相信没有小伙伴还不知道这一点,具体原因为何大家可阅读阿里的Java开发手册。
内存溢出及内存泄漏小豪在之前有写过一篇【深入JVM:线上内存泄漏问题诊断与处理】,里面详细介绍了内存泄漏的检测方法和解决策略。
(2)ThreadPoolExecutor构造方法
根据Executor
接口的类结构关系,以及Executors
工具类中静态方法的源码,不难发现底层都是通过ThreadPoolExecutor
对象的去实现的。
一般开发中,也基本都是通过实例化ThreadPoolExecutor
类的方式,通过其构造方法设置线程池参数,自定义创建线程池,这种方法相对来说更灵活,也规避资源耗尽的风险。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 参数校验
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
// 入参赋值给成员变量
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
2.1 核心参数
ThreadPoolExecutor
构造方法一共有7个参数,分别是:
-
① 核心线程数(corePoolSize)
线程池中的核心线程数量,用于定义最少的线程个数,即使空闲默认也不会被销毁(除非设置
allowCoreThreadTimeOut
属性,允许核心线程超时销毁)。创建线程池后,线程池中默认也没有核心线程的,等提交任务之后才会优先创建核心线程,另外,也可以设置创建线程池之后立即初始化核心线程
-
② 最大线程数(maximumPoolSize)
线程池中的允许的最大线程数量,当核心线程全部在繁忙且任务队列存满之后,线程池会临时追加非核心线程,直到总线程数达到该值上限
-
③ 线程空闲等待时间(keepAliveTime)
线程的空闲等待时间,当线程空闲时间超过配置的等待时间,线程就会被销毁,默认销毁的是非核心线程,如果设置核心线程也可被销毁,则核心线程空闲时间超过等待时间也会被销毁
-
④ 空闲等待时间单位(unit)
线程空闲等待时间的单位,秒、分、小时等
-
⑤ 任务队列(workQueue)
存放待执行的任务的队列(采用阻塞队列),当核心线程均繁忙时,新提交的任务会存放到该任务队列中,等待被线程执行
-
⑥ 线程工厂(threadFactory)
线程工厂,用来创建线程,通常用它来自定义线程名称
-
⑦ 拒绝策略(handler)
线程池中的拒绝(饱和)策略,当任务太多无法处理时(任务队列
workQueue
已满,正在执行的线程数达到maximumPoolSize
),采用哪种拒绝策略去处理新任务。
2.3 拒绝策略实现
ThreadPoolExecutor
构造方法中的拒绝策略参数,我们需要额外注意一下。
JDK提供了四种内置的拒绝策略:
-
AbortPolicy:丢弃任务并抛出
RejectedExecutionException
异常,线程池默认的拒绝策略public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } // 直接抛出异常 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
-
DiscardPolicy:直接丢弃任务
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } // 什么都没有处理 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
-
DiscardOldestPolicy:丢弃阻塞队列中最靠前的任务
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 弹出最先进入阻塞队列中的任务(即时间最长的) e.getQueue().poll(); // 执行当前新的任务 e.execute(r); } } }
-
CallerRunsPolicy:由提交任务的线程去执行当前任务
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 由调度线程执行该任务 r.run(); } } }
同时也支持我们定制化拒绝策略,实现RejectedExecutionHandler
接口,自定义拒绝策略:
public class CustomRejectedExecution implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 自定义拒绝策略
// 存放至缓存...
}
}
自定义拒绝策略一般使用较多,当任务量较大时,我们可以先将拒绝的任务放入缓存或数据库,待压力较小时再依次消费处理。
三、源码剖析
熟悉过线程池的基础概念之后,我们开始分析ThreadPoolExecutor
线程池处理任务流程的源码实现。
在上文我们了解到,顶层Executor
接口的execute()
方法用来执行Runnable
任务,在ThreadPoolExecutor
中,也是通过execute()
方法提交任务并执行,因此,我们从execute()
方法作为入口,分析其源码。
在此之前,还有一个概念需要了解,一个成员变量ctl
1、状态控制变量ctl
在ThreadPoolExecutor
中,将线程池状态和工作线程数采用一个原子整数成员变量ctl
进行存储:
public class ThreadPoolExecutor extends AbstractExecutorService {
// (重要)线程池状态控制变量
// 两个含义:高3位代表当前线程池状态、低29位代表当前工作线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 1字节等于8位,int类型是占用4字节,COUNT_BITS = 32 - 4 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池容量,1左移29位再减1,CAPACITY约等于5亿多
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池状态
// RUNNING状态,可以处理新的任务和阻塞队列中的任务
// 位图111...
private static final int RUNNING = -1 << COUNT_BITS;
// SHUTDOWN状态,不再处理新任务,但会处理阻塞队列中的任务
// 位图000...
private static final int SHUTDOWN = 0 << COUNT_BITS;
// STOP状态,不再处理新任务,且不处理阻塞队列中的任务
// 位图001...
private static final int STOP = 1 << COUNT_BITS;
// TIDYING状态,过度状态,所有任务都已经执行完毕,即将关闭线程池
// 位图010...
private static final int TIDYING = 2 << COUNT_BITS;
// TERMINATED状态,线程终止
// 位图011...
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取线程状态,根据&运算特点,始终获取ctl高3位,即当前线程池状态
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
// 计算工作线程数,根据&运算特点,始终获取ctl低29位,即当前工作线程数
private static int workerCountOf(int c) {
return c & CAPACITY;
}
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
// 其它代码省略......
}
这里的状态控制变量ctl
设计的十分巧妙,通过一个AtomicInteger
类型存储了两个变量值,AtomicInteger
类型占用4字节32位,操作时都是基于位运算,高3位代表当前线程池状态、低29位代表当前工作线程数。
同时这里也确保了共享变量的线程安全,利用AtomicInteger
的原子操作,确保线程池状态和工作线程数始终是保持统一的。
不懂位运算以及这里为何可以通过
runStateOf()
、workerCountOf()
方法获取到线程状态和工作线程数的小伙伴,可以自己写个Demo验证一下这个结论,本篇着重分析线程池工作流程的源码,这里不做过多赘述。小豪读到这里时也不得不感叹Java并发编程大师Doug Lea确实流弊!
2、线程执行execute方法
通过状态控制变量ctl
,我们得到两个方法:
- runStateOf(int c):获取当前线程状态
- workerCountOf(int c):获取当前工作线程数
接着我们正式进入ThreadPoolExecutor
的execute
方法,源码如下:
public void execute(Runnable command) {
// 参数验证,判断任务是否为空,为空则抛出空指针异常
if (command == null)
throw new NullPointerException();
// 获取状态控制变量clt的值,用于后续获取线程池状态和工作线程数量
int c = ctl.get();
// 阶段一:启动核心线程 -----------------------------------------
// 判断当前工作线程数小于核心线程数corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 创建一个核心线程,并把任务添加到该线程中运行
// addWorker()方法:参数二为true,代表添加核心线程;参数二为false,代表添加非核心线程
if (addWorker(command, true))
return;
// 若添加失败,则重新获取控制变量clt的值;
c = ctl.get();
}
// 走到这里,说明workerCountOf(c) >= corePoolSize,核心线程数已满
// 阶段二:核心线程数已满,任务添加至阻塞队列 -----------------------
// 判断当前线程池是否为运行状态 且 新任务添加到阻塞队列成功(即阻塞队列未满)
if (isRunning(c) && workQueue.offer(command)) {
// 重新获取ctl的值
int recheck = ctl.get();
// 再次检验线程池是否是运行状态
// 如果不是运行状态,由于已经把任务添加到workQueue阻塞队列中了,此时需移除该任务
if (!isRunning(recheck) && remove(command))
// 线程池不是运行状态 且 移除任务成功,触发拒绝策略
reject(command);
// 线程池是运行状态
// 获取线程池中的有效线程数,如果是0,则新建一个非核心线程(确保线程池在RUNNING状态下至少有一个线程执行任务)
else if (workerCountOf(recheck) == 0)
// addWorker()方法:参数一为null,代表在线程池中新创建一个线程,但没有传入任务
// 任务已经被添加到workQueue阻塞队列中,新建的非核心线程会从workQueue中获取任务来执行
addWorker(null, false);
}
// 走到这里,说明两种情况:
// 1.线程池已经不是RUNNING状态
// 2.线程池是RUNNING状态,但workQueue阻塞队列已满(核心线程数在之前也以及满了)
// 阶段三:阻塞队列已满,启动非核心线程 -----------------------------
// 再次调用addWorker()方法,参数二为false,代表创建一个非核心线程
else if (!addWorker(command, false))
// 创建非核心线程失败,执行拒绝策略(工作线程数已达到maximumPoolSize最大线程数的限制)
reject(command);
}
在源码中,execute
方法大致包含三个阶段:
- 阶段一:当前工作线程数小于核心线程数,创建核心线程
- 阶段二:核心线程数已满(当前工作线程数大于等于核心线程数),任务添加至阻塞队列
- 阶段三:阻塞队列已满(且核心线程数已满),创建非核心线程,若非核心线程已满,则执行拒绝策略
execute
方法的整体逻辑比较清晰,其中多次出现addWorker()
方法,我们继续往下走。
3、新增线程addWorker方法
addWorker()
方法主要用于新增工作线程,其主要参数有两个:
- 参数一:Runnable firstTask 任务实例
- 不为null:创建线程并启动传入的任务实例
- 为null:创建线程,但不启动任务,后续从阻塞队列中获取任务
- 参数二:boolean core 是否核心线程
- true:创建核心线程
- false:创建非核心线程
具体源码如下:
private boolean addWorker(Runnable firstTask, boolean core) {
// 标志位
retry:
// 外层死循环
for (; ; ) {
// 获取状态控制变量clt的值
int c = ctl.get();
// 获取当前线程池状态
int rs = runStateOf(c);
// 判断是否需要新创建工作线程
// 如果当前线程池非运行状态(rs >= SHUTDOWN,代表状态为SHUTDOWN、STOP、TIDYING、TERMINATED其中之一)
// 并且
// 当前线程池状态>SHUTDOWN 或 新任务不为空 或 阻塞队列为空
// 均不需要创建新的工作线程,直接返回false
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
// !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())
// 这段代码逻辑有点绕,其实就是线程池为SHUTDOWN时,会停止接收新任务,但还是会处理完阻塞队列中的任务,如过是这种情况,这里就不会返回false
// 内层死循环
// 采用CAS,将工作线程个数+1
for (; ; ) {
// 获取当前线程池工作线程数
int wc = workerCountOf(c);
// 判断当前工作线程数是否符合要求
// 当前工作线程数是否>=线程池容量 或 根据参数二core判断当前工作线程数是否>=核心线程数/最大线程数
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试将线程池工作线程数量+1
if (compareAndIncrementWorkerCount(c))
// 成功则退回标志位,即结束外层循环
break retry;
// workerCount线程数量添加失败,重新获取控制变量ctl的值
c = ctl.get();
// 如果当前线程池的运行状态不等于上面rs,代表在此期间线程池运行状态已经发生改变
if (runStateOf(c) != rs)
//返回外层for循环继续执行
continue retry;
// 若仍未添加数量成功,则CAS自旋重新尝试添加
}
}
// 创建两个状态标记
// 标记线程是否启动
boolean workerStarted = false;
// 标记线程是否添加
boolean workerAdded = false;
Worker w = null;
try {
// 将firstTask当前任务封装为Worker对象
w = new Worker(firstTask);
// 拿到worker对象创建的线程
final Thread t = w.thread;
if (t != null) {
// 加锁ReentrantLock,确保线程安全
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 获取当前线程池状态
int rs = runStateOf(ctl.get());
// 校验当前线程池状态
// rs < SHUTDOWN 代表当前线程是否为RUNNING运行状态
// 或
// 线程池状态为SHUTDOWN状态 且 firstTask新任务为null(线程池状态SHUTDOWN时,仍会执行workQueue中的任务)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 判断当前线程已经还存活
if (t.isAlive())
throw new IllegalThreadStateException();
// 添加到工作线程集合workers中
// workers是一个HashSet集合,线程不安全,因此上面需要加锁
workers.add(w);
int s = workers.size();
// 统计largestPoolSize线程池中的最大线程数量
if (s > largestPoolSize)
// 更新最大值
largestPoolSize = s;
// worker线程添加成功,标记线程已添加
workerAdded = true;
}
} finally {
// 解锁
mainLock.unlock();
}
// 判断线程是否添加成功
if (workerAdded) {
// 成功则启动线程
t.start();
// 标记worker线程已启动
workerStarted = true;
}
}
} finally {
// 判断线程是否未启动成功
if (!workerStarted)
// 在workers集合中移除该worker
addWorkerFailed(w);
}
// 返回worker线程是否启动成功
return workerStarted;
}
观察源码发现,addWorker()
方法主要分为两步:
-
校验线程池状态和工作线程数
- 首先判断当前线程池状态是否非运行状态,并且当前线程池状态不为
SHUTDOWN
或新任务不为空或阻塞队列为空,这两种情况均不需要创建新的工作线程,直接返回false
- 紧接着判断当前工作线程数是否符合要求,是否超过线程池容量或核心/最大线程数(根据参数二
core
判断),超过则直接返回false
- 否则通过
CAS
不断尝试将工作线程数量+1
,成功则跳出循环,执行下一步
- 首先判断当前线程池状态是否非运行状态,并且当前线程池状态不为
-
创建worker线程对象并启动线程
-
创建
worker
对象,封装firstTask
任务,采用独占锁将worker
工作线程对象放入工作队列workers
集合,同时更新最大线程数的值,最后启动线程。// 将firstTask当前任务封装为Worker对象 w = new Worker(firstTask)
-
若线程启动失败则调用
addWorkerFailed()
方法在workers
集合中移除该worker
对象
-
4、工作线程Worker内部类
在ThreadPoolExecutor
中,线程统一被封装为Worker
内部类,即具体的工作线程类,Worker
继承自AQS
并实现了Runnable
接口:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
// 当前的线程实例
final Thread thread;
// 当前的任务实例
Runnable firstTask;
// 记录线程完成的任务数
volatile long completedTasks;
// 构造方法,入参为firstTask任务实例
Worker(Runnable firstTask) {
// (重要)设置worker线程状态为-1,表示禁止线程中断
setState(-1);
// 赋值成员变量,保存firstTask任务实例
this.firstTask = firstTask;
// 通过线程工厂创建新线程(传入当前对象),真正的线程
this.thread = getThreadFactory().newThread(this);
}
public void run() {
// (重要)线程启动时执行的任务
runWorker(this);
}
// 是否持有独占锁
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 尝试获取独占锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 尝试释放独占锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() {acquire(1);}
public boolean tryLock() {return tryAcquire(1);}
public void unlock() {release(1);}
public boolean isLocked() {return isHeldExclusively();}
// 线程启动后,进行线程中断时执行方法
void interruptIfStarted() {
Thread t;
// worker中状态 >= 0 且 线程不为空 且 当前线程未中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
// 中断线程
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker
类的构造方法首先会通过setState(-1)
设置线程状态state
为-1
,当state
为-1
时,代表线程此时不能被中断,这里采用的是**AQS
的独占锁模式**,只有当state
不为-1
(独占锁被释放),线程才被允许中断,具体中断逻辑在interruptIfStarted()
方法。
在创建完worker
工作线程对象后,会启动该工作线程,调用woker
的run()
方法,run()
方法内部又调用了ThreadPoolExecutor
类的runWorker()
方法。
5、启动线程runWorker方法
runWorker()
方法会执行具体的任务,源码如下:
final void runWorker(Worker w) {
// 获取当前线程
Thread wt = Thread.currentThread();
// 获取worker构造方法传入的任务实例
Runnable task = w.firstTask;
w.firstTask = null;
// 解锁,设置worker状态state为0,表示允许线程中断
w.unlock();
// 标识线程退出原因,true代表线程异常退出,false代表线程正常结束
boolean completedAbruptly = true;
try {
// 判断当前任务是否不为空 或 从阻塞队列获取成功
// getTask()方法从阻塞队列获取一个任务
while (task != null || (task = getTask()) != null) {
// 加锁处理并发问题,防止shutdown()时终止正在执行的worker
w.lock();
// 判断如果线程池状态 >= STOP 且线程未被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 中断线程
wt.interrupt();
try {
// 可自定义实现的扩展点,执行前的扩展方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务,对应任务的run()方法
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
// 扩展点,执行后的扩展方法
afterExecute(task, thrown);
}
} finally {
// 任务对象置空
task = null;
// 完成的工作任务数+1
w.completedTasks++;
// 解锁
w.unlock();
}
}
// 标识线程为正常结束退出
completedAbruptly = false;
} finally {
// 处理worker线程退出
processWorkerExit(w, completedAbruptly);
}
}
其内部流程主要如下:
- 首先解锁
worker
,允许线程中断 - 接着通过
while
循环判断当前任务是否为空,若为空则调用getTask()
方法从阻塞队列中获取待处理的任务 - 获取到任务后工作线程直接加锁,同时根据线程池状态判断是否中断当前线程
- 执行对象任务的
run()
方法,在任务执行前后均提供扩展点可自定义扩展 - 重复第
2
步的逻辑,直到阻塞队列中的任务全部执行完毕,最后调用processWorkerExit()
方法处理worker
线程退出
这里我们也发现,线程池中的线程复用实际上就是通过runWorker()
方法中while
循环实现的,worker
工作线程不断的从阻塞队列中获取新的任务,直接调用任务的run()方法,避免去创建新线程,这一过程实现了线程复用。
6、获取阻塞队列getTask方法
getTask()
方法用来获取阻塞队列中的任务,源码如下:
private Runnable getTask() {
// 标记上一次从阻塞队列获取任务是否超时
boolean timedOut = false;
// 死循环
for (; ; ) {
// 获取ctl值
int c = ctl.get();
// 获取当前线程池状态
int rs = runStateOf(c);
// 判断是否需获取任务
// 若线程池状态 >= STOP 或 (线程池状态为SHUTDOWN 且 阻塞队列为空),则不需要再处理任务
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 工作线程数-1
decrementWorkerCount();
// 直接返回
return null;
}
// 获取当前工作线程数
int wc = workerCountOf(c);
// timed标记用于判断是否需进行超时控制
// allowCoreThreadTimeOut是否运行核心线程超时销毁(默认为false,核心线程不允许超时销毁)
// wc > corePoolSize当前工作线程数大于核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 工作线程数 > maximumPoolSize
// 或
// timed && timedOut,timedOut首次进入默认为false
if ((wc > maximumPoolSize || (timed && timedOut))
// 工作线程数大于1
// 或
// 阻塞队列为空
&& (wc > 1 || workQueue.isEmpty())) {
// 采用CAS,工作线程数-1,并且返回null
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 执行到这,说明前面线程池状态和工作线程数校验已通过,开始从阻塞队列获取任务
try {
// timed为true代表核心线程允许超时销毁 或 当前工作线程数大于核心线程数(存在非核心线程)
// timed = true:超时等待获取任务poll(),超时未获取到则返回null
// timed = false:阻塞获取任务take(),一直等着直到获取到任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 判断是否获取到任务
if (r != null)
// 获取到直接返回
return r;
// 未获取到,代表等待超时也没有拿到任务,设置timedOut值为true
timedOut = true;
} catch (InterruptedException retry) {
// 即使异常也循环重试
timedOut = false;
}
}
}
大致过程其实就是从workQueue
阻塞队列中获取任务,在获取之前也会校验线程池状态和工作线程数是否合规,最后根据条件选择超时等待获取还是阻塞获取。
7、退出线程processWorkerExit方法
runWorker()
方法执行结束之后在finally
代码块中会调用processWorkerExit()
方法,主要用来销毁空闲线程,具体源码如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// completedAbruptly:true代表线程异常退出,false代表线程正常结束
if (completedAbruptly)
// 异常退出,工作线程数-1
decrementWorkerCount();
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 统计当前worker完成的任务数,累加到线程池总完成任务数
completedTaskCount += w.completedTasks;
// 从线程池HastSet集合中移除当前工作线程
workers.remove(w);
} finally {
// 解锁
mainLock.unlock();
}
// 判断是否终止线程池
tryTerminate();
// 获取ctl值
int c = ctl.get();
// 当前线程池状态 < STOP,即RUNNING或SHUTDOWN状态时
if (runStateLessThan(c, STOP)) {
// 判断线程是否为正常结束
if (!completedAbruptly) {
// 正常结束退出
// allowCoreThreadTimeOut是否运行核心线程超时销毁
// 若允许,min最小线程数为0,不允许则为corePoolSize核心线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 判断workQueue阻塞队列是否不为空
if (min == 0 && !workQueue.isEmpty())
// 确保至少有一个worker线程
min = 1;
//判断工作线程数 >= min最小线程数
if (workerCountOf(c) >= min)
// 符合条件直接返回
return;
}
// 若线程为异常退出或不符合条件(工作线程数 < min最小线程数)
// 创建一个非核心线程执行任务
addWorker(null, false);
}
}
源码中,会将worker工作线程从线程池HastSet集合中移除,最后也会确保当前工作线程数不小于最小线程数,通过processWorkerExit()
方法,控制了线程池中线程数量的大小变化。
8、线程池关闭
线程池关闭有两种方法,分别是shutdown()
和shutdownNow()
(1)shutdown方法
shutdown()
方法首先会将线程池状态设置为SHUTDOWN
,不再接收新的任务。
内部正在执行的任务和阻塞队列中的任务都会继续执行,待均执行完毕之后会关闭线程池。
具体源码如下:
public void shutdown() {
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 安全检查
checkShutdownAccess();
// 设置线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断线程池中空闲的线程
interruptIdleWorkers();
onShutdown();
} finally {
mainLock.unlock();
}
// 判断是否终止线程池
tryTerminate();
}
(2)shutdownNow方法
shutdownNow()
方法则是将线程池状态设置为STOP
,不再接收新的任务。
但所有线程都会被中断(已经拿到任务正在执行的不会被中断),且阻塞队列中的任务会被丢弃,不再执行了,并且返回未执行的任务队列。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 设置线程池状态为STOP
advanceRunState(STOP);
// 中断线程池中所有线程
interruptWorkers();
// 清空阻塞队列,赋值给tasks返回
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 判断是否终止线程池
tryTerminate();
// 返回阻塞队列中未被执行的任务
return tasks;
}
四、流程图
五、后记
本文着重带大家分析了ThreadPoolExecutor
线程池处理任务流程的源码实现,过程中额外介绍了线程池的创建方式以及核心参数,详细通过本文,大家更深入的理解了线程池的原理和工作机制。
但在我们实际开发中,使用线程池主要关注线程池大小的设置,合理配置线程池大小能够更高的提升程序性能和稳定性,国内通用的共识即根据CPU密集型和IO密集型来设定线程数,这里面其实是基于任务的阻塞系数来商榷增减线程数,一般阻塞时间占比越高,设置越多线程,CPU计算时间所占越高,设置越少线程。
网上有很多配置线程池大小的公式,但这些公式无法适用于所有业务场景,我们只能根据实际场景,通过压测及性能监控等手段,不断的调整线程池大小,确定出合适的线程数量,实现最高的任务处理效率。
下一篇,小豪将会继续更新Java多线程与并发编程相关内容,创作不易,如果大家觉得内容对你有收获,不妨考虑关注关注小豪~