【并发】Java并发线程池底层原理详解与源码分析(下)
前情回顾
上篇文章地址
【并发】Java并发线程池底层原理详解与源码分析(上)_面向鸿蒙编程的博客-CSDN博客线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。Executors 返回的线程池对象的弊端:1) FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。2.CachedThreadPool:允许的创建线程数量为 MAX_VALUE,可能会创建大量建大量的线程,从而导致 OOMhttps://blog.csdn.net/weixin_43715214/article/details/128045130
遗留问题解析
手动实现线程池代码
public class ThreadPoolDemo {
public static void main(String[] args) {
// 自定义线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10));
for (int i = 1; i <= 100; i++) {
threadPoolExecutor.execute(new MyTask(i));
}
}
}
class MyTask implements Runnable {
int i = 0;
public MyTask(int i) {
this.i = i;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "程序员做第" + i + "个项目");
try {
Thread.sleep(3000L);//业务逻辑
} catch (Exception e) {
e.printStackTrace();
}
}
}
运行结果
我们在上一个章节中提到了手动实现线程池,但是它的结果似乎有点匪夷所思!
为什么11~20会在最后面?
为什么运行会抛异常?
为什么打印日志只有前30个?
!!!先说结论!!!
在案例代码中,有100个任务,1个任务的处理时间大致需要3秒,10个核心线程,10个临时线程,阻塞队列的容量是10。
这里只会打印会前30个任务(10+10+10=30),由于在3s内核心线程和临时线程都在忙碌中,队列也满了,按照ThreadPoolExecutor默认的策略会抛出异常!
按照线程池的工作顺序,会先分配10个核心线程(1~10),再装满队列(11~20),最后分配临时线程(21~30);执行逻辑是核心线程和临时线程会先把“手头上”的任务处理完,才会去处理队列里的任务,这就是队列里的任务(11~20)最后打印的原因!!!
ThreadPoolExecutor 源码分析
接下来,我将会结合线程池ThreadPoolExecutor的源码来解析这个问题!!!
线程池源码结构
顶层接口是Executor,它只有一个方法execute();ExecutorService接口继承了Executor接口,它主要是提供了submit()。
【面试题】 execute()方法与submit()方法有什么区别???
submit
的底层还是调用了execute
(1)关于返回值的问题
submit:
有返回值,返回值(包括异常)被封装于FutureTask
对象。适用于有返回结果的任务。execute:
void
类型的函数,没有返回值,适用于没有返回的任务。
(2)关于异常处理的问题
submit:
submit
的时候并不会抛出异常(此时线程可能处于就绪状态)。只有在get
操作的时候会抛出。因为get
操作会阻塞等待线程的执行完毕。execute
:在执行的时候会直接抛出。可以通过实现UncaughtExceptionHandler
接口来完成异常的捕获。
ThreadPoolExecutor 变量详解
// 记录线程池的状态信息,高三位是状态信息,其余位表示工作的worker数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 表示worker数量
private static final int COUNT_BITS = Integer.SIZE - 3;
// worker容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 可以接受新的任务,也可以处理队列中的任务
private static final int RUNNING = -1 << COUNT_BITS;
// 不接受新的任务,但是可以处理队列里的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 后面3个都不行!!!
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
ThreadPoolExecutor 之 execute()方法详解
public void execute(Runnable command) {
// 要提交的任务不能是null,否则就抛异常
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/*
* 使用核心线程数的限制去开worker来执行这个任务
* addWorker会失败,可能是因为线程池状态或者worker的数量引起addWorker失败(后面分析addWorker方法的时候再说!)
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 如果addWorker失败,则ctl要重新获取。
// 因为不管是状态变,还是worker数量变,ctl都已经变了,你需要重新获取最新值。
c = ctl.get();
}
/*
* 在当前worker数量大于等于corePoolSize或者上面的addWorker失败之后才会走到这里
* 经过上面分析可能有两种可能 1、线程的状态发生改变 2、当前worker数量不小于核心线程数
* (1)查看一下当前线程的状态是否是running状态
* (2)在满足(1)下会尝试往工作队列里面添加这个任务,但是有可能失败(工作队列可能满了)
*/
if (isRunning(c) && workQueue.offer(command)) {
// 走到这里说明offer之前是running状态 放入工作队列成功了
// 需要重新获取当前状态 因为有可能放进去之后线程状态变了!!!
int recheck = ctl.get();
/*
* 如果offer之后线程池不是running了 需要尝试remove刚才的任务
* 不是running的状态下,remove也有可能失败,他可能被执行了
*/
if (! isRunning(recheck) && remove(command))
// 如果remove成功了需要拒绝这个任务
reject(command);
/*
* 走到这里,一定是offer成功了
* 这个判断是为了防止没有worker,但是队列里面有任务,没人执行
* 可能是工作一段时间后worker的数量为0 和 allowCoreThreadTimeOut()这个方法有关系
* 上一个if判断的!isRuning 是true remove失败的时候 有可能 workerCountOf(recheck) == 0 为true
* 这个时候线程池肯定是不让你再添加线程的
*/
else if (workerCountOf(recheck) == 0)
/*
* 如果出现线程池是running worker是0,队列有任务,需要添加一个worker执行这些任务
* 如果出现线程池不是running,但是remove失败,worker是0,线程池是不允许添加worker的这个逻辑在addworker方法里面!
*/
addWorker(null, false);
}
/*
* 如果用核心线程数限制开worker执行任务失败
* 或者 线程池状态不是running
* 或者 工作队列已经满了
* 使用最大线程数限制开worker执行任务
*/
else if (!addWorker(command, false))
// 失败的原因有
// 1、worker达到非核心线程数
// 2、线程池的状态变了不是running了 则拒绝这个任务
reject(command);
}
ThreadPoolExecutor 之 runWorker()方法详解
final void runWorker(Worker w) {
// 获取当前线程对象的引用
Thread wt = Thread.currentThread();
// 获取worker的firstTask
Runnable task = w.firstTask;
// 获取完之后把worker的firstTask置为null 防止下次获取到
w.firstTask = null;
// 初始化worker的state = 0, exclusiveOwnerThread = null 解锁
w.unlock();
// 如果发生异常 当前线程突然退出 该值为true
boolean completedAbruptly = true;
try {
// 如果firstTask获取getTask能获取到任务 则进行内层逻辑, 如果getTask返回null则循环退出了就要
while (task != null || (task = getTask()) != null) {
/*
* worker设置独占锁
* shutdown 时会判断当前worker的状态,根据独占锁的状态来判断worker是否在处理任务是否工作
*/
w.lock();
/*
* 3个判断
* 1、runStateAtLeast(ctl.get(), STOP)为真说明当前状态大于等于STOP 此时需要给他一个中断信号
* 2、wt.isInterrupted()查看当前是否设置中断状态如果为false则说明为设置中断状态
* 3、Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) 获取当前中断状态且清除中断状态
* 这个判断为真的话说明当前被设置了中断状态(有可能是线程池执行的业务代码设置的,然后重置了)且当前状态变成了大于等于STOP的状态了
*
* 整个判断为真的两种情况
* 1、如果当前线程大于等于STOP 且未设置中断状态 整个判断为true 第一个runStateAtLeast(ctl.get(), STOP)为true !wt.isInterrupted()为true
* 2、第一次判断的时候不大于STOP 且当前设置了中断状态(Thread.interrupted()把中断状态又刷新了) 且设置完了之后线程池状态大于等于STOP了
* Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) 为true !wt.isInterrupted()为true
*
* 结合if判断里面的代码来看
* 也就是说如果线程池状态大于等于STOP则设置当前线程的中断状态
* 如果线程池状态小于STOP则清除中断状态
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
// 设置中断状态
wt.interrupt();
try {
// 钩子方法留给子类实现
beforeExecute(wt, task);
try {
// task可能是FutureTask或者普通Runnable/Callable接口实现类
task.run();
// 钩子方法 留给子类实现
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
// 局部task设置为null
task = null;
// 完成数量加1
w.completedTasks++;
// 使用unlock 释放独占锁
w.unlock();
}
}
// getTask的返回为null 会走到这行 表示这次不是异常退出
completedAbruptly = false;
} finally {
/*
* 执行线程退出逻辑
* 如果completedAbruptly是true说明是task.run()方法有异常 先catch后又抛了出来 在执行完了w.unlock();走到了这里
* 如果是false则说明是拿不到任务走到了这里
*/
processWorkerExit(w, completedAbruptly);
}
}
ThreadPoolExecutor 之 addWorker()方法详解
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 每次for循环都需要获取最新的ctl值
for (int c = ctl.get();;) {
/*
* 这个地方的判断为true时可以分成三种情况
* 1、线程池的状态是大于等于 STOP的 这个判断为true
* 2、线程池状态是SHUTDOWN 但是 firstTask 是null 对应execute 的 else if (!addWorker(command, false)) 状态大于等于SHUTDOWN时不接受任务
* 3、线程池状态是SHUTDOWN 工作队列已经空了 对应execute的 在状态大于等于SHUTDOWN时用addWorker(null, false) 执行队列里面的任务
*/
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty()) )
return false;
for (;;) {
// 如果core是true使用的核心线程数配置 否则使用maximumPoolSize
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 使用CAS的方法给ctl的worker的数量加1 成功则跳出最外层循环
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS不成功则重新获取ctl的值 因为不成功ctl的值一定变了 CAS嘛
c = ctl.get();
// 如果不成功的原因是状态变了 就重新进行外层循环
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}
// 能走到这里一定是说明CAS成功了 那么就可以进行创建worker执行任务了
// woker是否执行了start
boolean workerStarted = false;
// worker 是否添加到workers成功
boolean workerAdded = false;
Worker w = null;
try {
// 构建一个worker thread的target是worker所以调用t.start会执行worker的run方法,最后调用到runWorker方法
w = new Worker(firstTask);
// worker 的thread是 线程工厂的newThread方法创建的
final Thread t = w.thread;
if (t != null) {
// 需要操作workers 加锁执行
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = ctl.get();
// 如果线程池状态是running 或者 是shutdown但是firstTask是null(kanexecute方法 防止没有worker执行队列里面的任务)
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// 如果线程的状态不是NEW说明线程不是经过线程池开启的 抛异常
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
// 把worker添加到workers里面
workers.add(w);
workerAdded = true;
// 更新当前最大线程数量 maximumPoolSize 和 corePoolSize可以在线程池创建之后动态修改的
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
/*
* 如果添加成功 就启动这个线程
* 上面线程池状态判断没通过 或者 线程的状态不是NEW 就不会执行 workerAdded = true;
* 线程就不会启动 workerStarted 就是false
*/
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 如果没有执行过t.start() 就要把这个woker从workers里面剔除并且 ctl里面worker数量减一
addWorkerFailed(w);
}
return workerStarted;
}