ThreadPoolExecutor 线程池源码学习
1.阅读源码
1.ThreadPoolExecutor.execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// ctl 高三位记录线程状态。低29位记录线程池中线程数
int c = ctl.get();
//位运算获取工作线程数 如果少于核心线程数,继续创建核心线程
if (workerCountOf(c) < corePoolSize) {
//当前线程池添加线程任务 两个参数的意义放后面在看
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果当前线程池状态为 RUNNING 状态,将当前线程任务添加到等待队列中。调用的offer方法,队满返回false
if (isRunning(c) && workQueue.offer(command)) {
//再次获取ctl
int recheck = ctl.get();
//如果当前线程池状态已经发送变更 不在是 RUNNING 状态 ,则调用remove方法删除刚刚添加到队列中的线程任务
if (! isRunning(recheck) && remove(command))
//调用拒绝策略
reject(command);
//如果当前活跃线程数等于0
else if (workerCountOf(recheck) == 0)
//当前线程池添加线程任务
addWorker(null, false);
}
//添加非核心线程
else if (!addWorker(command, false))
reject(command);
}
2. ThreadPoolExecutor.addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
//标记外部循环
retry:
for (;;) {
int c = ctl.get();
//获取线程池状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//线程池状态检查
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//内部循环
for (;;) {
//获取活跃线程数
int wc = workerCountOf(c);
//如果大于最大上限 或 大于 核心线程数
if (wc >= CAPACITY ||
// core -> addworker方法的第二个参数
wc >= (core ? corePoolSize : maximumPoolSize))
// 添加失败返回
return false;
//将当前线程池活跃线程数加一
if (compareAndIncrementWorkerCount(c))
//跳出外部循环
break retry;
//如果活跃线程数修改失败 再次检查线程池状态
c = ctl.get(); // Re-read ctl
//发生变化
if (runStateOf(c) != rs)
//结束本次外部循环,进入下一次循环,直到修改成功或返回false
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//结束循环到此
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建新的线程任务
w = new Worker(firstTask);
//从worker中获取当前线程
final Thread t = w.thread;
//如果不为null
if (t != null) {
//上锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//获取线程池状态
int rs = runStateOf(ctl.get());
//如果线程池状态是Running 或 (SHUTDOWN,但是任务为null)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//当前线程是活跃的(新建线程不会被启动)
if (t.isAlive()) // precheck that t is startable
//抛出异常
throw new IllegalThreadStateException();
//添加到workers容器中
workers.add(w);
//获取当前线程池中所有的worker数量
int s = workers.size();
//记录历史最大线程数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
//解锁
mainLock.unlock();
}
if (workerAdded) {
//启动当前线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
3.Worker.run
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
//当前线程
Thread wt = Thread.currentThread();
//线程任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果当前worker包装的task不为null,执行当前task 如果为null,则调用getTask()从阻塞队列中获取线程任务
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//线程任务执行的前置方法 默认空方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
//真正执行线程任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//线程任务的后置处理方法
processWorkerExit(w, completedAbruptly);
}
}
4.ThreadPoolExecutor.getTask
private Runnable getTask() {
//是否超时
boolean timedOut = false;
for (;;) {
int c = ctl.get();
//当前线程池状态
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//当前线程池中活跃的线程数量
int wc = workerCountOf(c);
// allowCoreThreadTimeOut 是否需要进行超时控制 默认为false
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//compareAndDecrementWorkerCount 是将活跃线程数减一
//满足以下情况将执行 :
// 1.活跃线程大于最大线程数 或者 上次从队列中获取超时
// 2.活跃线程大于1,但等待队列是空的
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//将活跃线程数减一
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//上次从等待队列中获取是否超时? poll 队列为空时超时等待 : take 队列为空时一直等待
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
//获取成功返回线程任务
return r;
//获取失败标记超时
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
5.ThreadPoolExecutor.processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
//从Set集合中移除已执行run方法的 worker,让jvm进行回收
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
//是否抛出异常
if (!completedAbruptly) {
//线程替换逻辑判断
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//woker异常补偿
addWorker(null, false);
}
}
2. 源码分析
1.核心线程和非核心线程的区别
//添加核心线程任务 command 为线程任务
addWorker(command, true)
//添加非核心线程任务 command 为线程任务
addWorker(command, false)
创建时机
//线程池中活跃线程数少于核心线程数则调用addWorker(command, true)创建核心线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
....
}
if (isRunning(c) && workQueue.offer(command)) {
// workqueue调用offer方法返回false则会调用addWorker(command, false),尝试创建非核心线程
}else if (!addWorker(command, false))
}
消亡时机 java.util.concurrent.ThreadPoolExecutor#getTask
for (;;) {
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
当创建的是核心线程时,如果没有设置allowCoreThreadTimeOut的值true(默认值为false),则将调用queue.take方法,一直阻塞,直到队列中有任务。如果是临时线程,则调用queue.poll方法,在规定时间内未从队列中获取到线程任务,在将活跃线程数量减去一后直接返回null。只要不返回null,将会在java.util.concurrent.ThreadPoolExecutor#runWorker 一直循环获取任务并执行,而一旦超时返回null,将跳出runWorker方法,调用processWorkerExit 对临时线程所属的worker进行回收。
runWorker:
while (task != null || (task = getTask()) != null){
getTask:
for (;;) {
}
}
2.线程任务抛出异常怎么办
阅读源码部分我们看的是调用线程的execute方法,java.util.concurrent.ThreadPoolExecutor#runWorker 抛出异常后,completedAbruptly为true,java.util.concurrent.ThreadPoolExecutor#processWorkerExit
boolean completedAbruptly = true;
try {
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
try {
completedTaskCount += w.completedTasks;
//移除老的worker
workers.remove(w);
} finally {
mainLock.unlock();
}
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//添加一个新的worker。注意参数一task为null,将直接去队列中获取线程任务
addWorker(null, false);
}
代码演示
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
10,
10,3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100),
new DefaultThreadFactory("main"),
new ThreadPoolExecutor.DiscardPolicy()
);
for (int i = 0; i < 2; i++) {
threadPoolExecutor.execute(()->{
System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());
throw new RuntimeException();
});
}
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
for (int i1 = 0; i1 < 2; i1++) {
threadPoolExecutor.execute(()-> {
System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());
});
}
输出结果. 3,4线程由于1,2抛出异常,已经被创建
Thread.currentThread().getName() = main-1-2
Thread.currentThread().getName() = main-1-1
Exception in thread "main-1-1" Exception in thread "main-1-2" java.lang.RuntimeException
at org.example.Main.lambda$main$0(Main.java:36)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:750)
java.lang.RuntimeException
at org.example.Main.lambda$main$0(Main.java:36)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:750)
Thread.currentThread().getName() = main-1-5
Thread.currentThread().getName() = main-1-6
所以当使用executor执行线程任务时,要避免抛出异常,对可能出现的异常try{}catch{}包裹处理