前言
本篇博客是《一文吃透Java线程池》系列博客的下半部分。
上半部分链接:一文吃透Java线程池——基础篇
实现机制(源码解析)
根据前面的学习,我们知道,线程池是如下的运作机制
解析:
- 一开始,核心线程数以内的任务。直接分配线程,然后线程执行任务。
- 后来任务量越来越多,超出了核心线程处理能力,就被排到了任务队列里
- 当当前任务量再大,线程池就会临时新增线程来处理,直到达到最大线程数。
- 当当前任务量超出队列承载能力和最大线程数限制,任务就会被拒绝。
- 给一开始的任务分配了线程。这些线程处理完他们各自的任务,并不会止步于此。(主次关系开始反转:线程一开始是随着任务生成的,但生成之后,任务结束了,但线程落地生根。完成了第一个任务,就开始找第二个任务,第三个任务。。。。)
- 线程找第二个,第三个任务。去哪找的呢?就是第二句中说到的BlockingQueue。
- 图中最右边的“线程回收”,指的就是一开始说的“临时新增”的线程。核心线程是不会被回收的,他们会一直尝试从BlockingQueue获取任务,否则就阻塞着。(BlockingQueue.take()的阻塞特性)
下面我看一下ThreadPoolExecutor中,几个关键的源码逻辑
线程池状态/worker数量记录:ctl
这是ThreadPoolExecutor中的一个成员变量。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- 这是一个原子整数,可以保证多线程操作的原子性。
- int有32位。这个ctl被拆成了两部分:3 + 29。 高3位存储的是线程池状态(runState),低29位存的是有效线程数(也叫WorkerCount。注意:这个值特别容易把人带沟里。这并不是前面我说的”当前任务量”,我说的当前任务量,指的是正在运行的加上队列等待的。而这个单指正在运行的,和线程数量一一对应,所以叫“有效线程数”)。
这种存储方式在ReentrantReadWriteLock中的state也是这么处理的(它是对半分:16+16)。因为这两个数的操作都是原子同步的,但这两个数之间却未必同步。存在一起,免去了这样的麻烦。 - 因为拆成了“二进制”,所以围绕着这个数的修改和查看就少不了一些位运算。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;// 11100000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS;// 00000000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS;// 00100000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS;// 01000000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS;// 01100000 00000000 00000000 00000000
private static int runStateOf(int c) { return c & ~CAPACITY; }//获取线程池状态
private static int workerCountOf(int c) { return c & CAPACITY; }//获取Worker数量
private static int ctlOf(int rs, int wc) { return rs | wc; }//把runStateOf和workerCount合成完整的ctl
解析:
- Integer.SIZE - 3即 32-3=29
- CAPACITY为:00011111111111111111111111111111
- '~'是一个位运算符。用于取反操作。结果为
11100000000000000000000000000000 - 00011111111111111111111111111111和11100000000000000000000000000000是两个位运算“工具数”,
可以对c进行"与运算"。分别来求 低位的WorkerCount和高位的runState - c就是ctl,并不是一个普通的int值(所以不要当成1,2,3…来看)
- ctlOf中的’|'是一个位运算符(或运算:0|1–>1, 1|1 -> 1, 0|0 -> 0)
‘|‘和’&‘是位运算符,’||‘和’&&‘是逻辑运算符。
true和false在使用’|‘和’&‘运算时,相当于1和0。从而让’|‘和’&‘表现出可以进行逻辑运算的现象。
因此,初学时可能会产生的印象:’|’ '&'和 ‘||’ '&&'都是逻辑运算符。前者的运算逻辑是运算符前后都要运算,后者可能只需要判断一个就能得出结果(这个结论是没错的,但理解的不够深刻)。
前面有可能会有疑问的二进制解释:
任务队列:workQueue
这是ThreadPoolExecutor中的一个成员变量。
private final BlockingQueue<Runnable> workQueue;
就是前面一直说的,当任务量超过核心线程数,就会去任务队列里等着被处理。
任务的封装类:Worker
前面总在说Worker。它就是ThreadPoolExecutor的一个内部类。
一开始当任务(Runnable)被提交进来,就会被封装成Worker。Worker里还有一个线程。
可以简单理解为:Worker = Runnable + Thread
这两个东西都是Worker内的两个成员变量,如下图Worker的内部结构:
这里的任务之所以被叫做firstTask。是因为thread处理完它,就会抛弃它(事实上是抛弃它所属的整个Worker),去处理下一个任务。因为前面打比喻时,就说过,Worker相当于给医生提供的门诊。医生有几个,门诊就有几个,是不会随便增加的。
所以:Thread和Worker都是随着Runnable的提交,而“被动”产生的。但并不会随着Runnable的运行结束而结束。而是反转为主人,开始主动为其他任务服务。(这是源码中反直觉的一个地方)
先不要关心Worker还实现了Runnable。这只是一个”编程技巧“,并非主要逻辑,不要被迷惑了。我们后面会讲为什么还要实现Runnable。
提交任务:executorService.execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 1. 核心线程数内:直接执行
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 超出核心线程:进队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 进队列失败 && 超出核心最大线程:拒绝
else if (!addWorker(command, false))
reject(command);
}
这段代码和上面那张原理图很容易对应起来,提交任务之后的三条路,正好对应上面三段代码。
这里着重强调几个方法:
- addWorker : 封装Worker并执行(后面会详细解析)
- workQueue.offer:进队列(和add的区别是:如果添加失败,只是返回false,而add则会报错)
- int recheck = ctl.get(); 这是在尝试进核心线程失败后,再次获取ctl,以防止在这短暂的时间内,线程状态和任务数量已经被其他线程给改了。
- reject:这个很好理解,就是执行拒绝策略。
addWorker:封装并启动
private boolean addWorker(Runnable firstTask, boolean core) {
//1. 修改ctl
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))// <-------------CAS
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 2. 封装并启动Worker
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);// <-------------封装Worker
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();// <-------------这里开始启动线程,执行任务了
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
这段代码很长,但逻辑并不复杂,只有两段:1. 修改ctl 2.封装Worker并启动。
修改ctl占了一半的代码。原因是修改ctl是一个需要考虑多线程并发问题的事情,但这里又不想加锁 影响性能,所以弄了两层for循环+CAS,也就是无锁自旋的方式来修改ctl。
封装Worker的过程用了锁来保证线程安全。
在看到 t.start();
,你是否会疑惑:线程启动,和任务有什么关系?他俩不是并列关系吗?
要说清楚这个问题,就得先看看Worker是怎么创建的,下面是Worker构造方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask; //赋值任务
this.thread = getThreadFactory().newThread(this);//赋值线程
}
第二句赋值任务,第三句赋值线程。这都很符合我们的预期(给两个关键的成员变量赋值)。
而线程是从getThreadFactory()来的,也没什么好说的,就是一开始传入的线程工厂。
关键是:newThread(this)。把当前Worker对象作为任务,塞给了这个线程。这就是为什么Worker要实现Runnable。
Worker既然实现了Runnable,就需要一个run方法给Thread去执行。说到这里,是不是感觉就能串起来了: 在这个run方法里只要去调用我们提交的任务的run方法。下面展示了:Worker,Runnable,Thread之间更隐秘的关系(别被他们的位置所属关系迷惑了,这里我把他们三个拆成三个独立的个体)
线程调用Worker的run方法,Worker的run方法里又调用Runnable(用户提交的任务)的run方法。Worker的run相当于一个中转站。
runWorker
上图中,Worker里的run方法,并不是简简单单直接调用了一下下面Runnable的run方法。而是“做了一点文章”。
原因很好理解:如果只是简单调用,那么不就意味着:任务提交,核心线程数内的任务 分配线程,执行完,然后就没然后了。线程变成了“一次性”线程。任务队列里还有一堆嗷嗷待哺的任务没人管了。
所以Worker里的run方法调用了一个叫runWorker的“中转方法”,看一下这个方法内容:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {// <-----------------不断去获取新任务
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//钩子方法1:执行任务前
Throwable thrown = null;
try {
task.run();// <-------------------执行我们提交的Runnable的run方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//钩子方法2:执行任务后
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);// <----------------没任务了,清理线程
}
}
- 只要能通过getTask()获取到新任务,这个循环就会一直继续。
- 获取不到新任务,才会跳出死循环,清理线程。
- 这两个钩子方法(beforeExecute和afterExecute)AOP的作用对象是工作线程,而不是线程池。
getTask():获取任务
为什么核心线程可以永远不会被清除呢,看一下获取任务的方法就知道了:
可以看到在从任务队列里取任务,有两种结果:
- 在指定时间内拿不到,就放弃(poll)。然后继续走下去,后面就会走出死循环,最终清理掉线程。
- 拿不到就一直阻塞(take)。线程会一直存在。
一般情况都是第二种情况。
会产生第一种情况有两种,任意一个条件满足就可以:
- 给线程池配置了allowCoreThreadTimeOut参数为true(默认是false)
executorService.allowCoreThreadTimeOut(true);
这个参数的意思是:是否允许核心线程超时(被清理)
【这个参数可以让核心线程“不那么核心”,也会随时被清理。之所以前面不说这个参数,我是希望逐步加深,而不要一开始搞得太复杂】。 - 当前的Worker数量超过了最大核心线程数。也就是“最大线程数很大,但队列却很短,不够排队,就直接运行,导致临时新增的线程”(医院排队用的椅子不够用,所以去其他医院借调医生)。
到这里,你可以先停一下。再仔细看看那个’wc’变量,稍微分析一下,看看逻辑是否说的通。判断一下自己的理解是否有偏差。
线程回收
通过前面的讲解,我们知道了什么样的线程会被清理。
有人或许会提到一个词叫“线程回收”。从字面意思来看,“回收”指的就是“回收再利用”。但看过源码我们就知道,并不存在一个“回收再利用”的机制。其实是“同一个线程不断索取新任务”的机制。
我们看看线程是如何清理的,也就是前面runWorker方法里,最后finally里的方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
...
workers.remove(w);
...
}
我把其他代码都删了,只留了一句关键代码。
这个workers,是ThreadPoolExecutor里的一个成员属性HashSet,存储当前正在执行的Worker。
private final HashSet workers = new HashSet();
这里就是把当前这个Worker,从这个Set中删掉。
所以,准确的说是:清理Worker,顺带着把线程也清理了。
因为当前这线程,只有当前这个Worker引用。一旦Worker被GC,那么线程也变的无依无靠,然后也被GC。
为什么Worker要继承AQS
关于AQS的详细解释,请看我的另一篇博客Java锁深入理解2——ReentrantLock
Worker类实现Runnable的原因,前面已经说过了。那它为啥要继承AQS呢。
我在网上查了好多文章。大家抄来抄去,也没一个人说清楚,如下是一个还不错的说法:
Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。
- lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
- 如果正在执行任务,则不应该中断线程。
- 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
- 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。
对于这段话。实现不可重入的特性反应线程现在的状态,我不太理解。【有理解的,可以评论区留言】
但下面那段话,我大致是明白了,可以给大家通俗直观的讲一下。
锁要起作用,至少要两个线程出现并发。
这里出现并发的线程只有两个:主线程 和 某一个工作线程。
而发生的场景,就是对线程池使用shutdown()的时候。
shutdown()
这是线程池的一个停止方法,目的是“优雅的把线程池”停下来(前面我们很多demo也都用了)
下面是代码
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();// <-------------中断空闲线程
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
由于前面我们也说了,线程和Worker一一对应,所以中断Worker,也就是为了中断线程。下面是具体的代码
基本逻辑就是:遍历所有Worker,然后给他们的线程发出中断信号。
但是呢,有条件。第一个条件没啥,就是已经中断的就不再中断。关键是第二个条件,继续往下看tryLock的代码
public boolean tryLock() { return tryAcquire(1); }//试图抢锁
----
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {// <-----只有当前state是0,才更新为1,否则返回false
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
也就是说Worker把自己包装成了一个独占锁。
主线程在试图给Worker发中断消息前,会先抢一下这个锁。抢成功了,才给这个Worker发中断,否就跳过这个Worker。
好,至此主线程这边就讲完了。
然后看看另一边,可能会受影响的工作线程那头的情况(也就是Worker):
这段代码的大致逻辑:
- 如果能通过getTask()获取到任务,就一直循环。
- 在循环体内会通过Worker的锁,开始lock(把state改为1),结尾unlock(把state改为0),围成一个“安全区”。
所以,对那头的主线程来说:只有Worker线程处在绿色框里的getTask()时,才会发中断。而当Worker线程在红框里时,就不会发(注意:是直接跳过不发,而不是等着工作线程跑到绿色框内再发)。
从业务角度讲,红框内属于正在执行程序。绿框里属于空闲或者获取下一个任务的间隙(也算是短暂的空闲阶段)
所以:Worker通过继承AQS,把自己包装成一个独占锁。来实现只中断空闲线程的需求。
“中断空闲线程”方法(interruptIdleWorkers),并不只有shutdown()会调用,当修改最大核心线程数是也会调用。
为什么不使用同一个锁?
Worker之间执行应该是互不干扰的。用同一个锁,线程池就退化成“所有任务一个个顺序执行”的无意义框架了。
虽然上面说我不太理解Worker的可重入效果。但我也有自己的想法,只是没有找到佐证我想法的地方。
我觉得这里的“非重入效果”,大白话应该是:防止重复提交中断。可以这么推理:非重入。指的就是自己反复拿锁。
这里只有两个线程:主线程,工作线程。谁可能重复拿锁呢?
工作线程应该不可能,人家就是老老实实的一遍遍循环“lock-unlock”,这是一个自动的过程。
那么可能重复拿锁的就是主线程了,比如用户连着写了两个shutdown()。那么就会在短时间内,出现两次w.tryLock()。如果前一次还没执行完,后面个就会tryLock()失败,返回false,跳过这个Worker。如果Worker很少,执行的很快,大概率是第二遍的所有Worker都会跳过通知中断。就实现了“防止重复提交中断”的效果。
怎么实现的非重入锁效果?(其实关于锁介绍,我之前"锁"的文章都讲过,这里再着重重复一遍)
其实非重入是最简单的,反而是要实现重入锁,反而需要增加判断逻辑。
关键就是tryAcquire方法逻辑(为什么和tryLock一样?因为tryLock调用的就是tryAcquire),就是尝试获取锁的逻辑:
非重入:一视同仁,别说旁人了,连自己都不能重复获取。
可重入:需要对当前线程进行判断,看看当前线程是不是获取锁的线程,如果是,说明是当前线程自己又来了。如下所示
getTask()的出口
前面我知道,想要从runWorker方法里的死循环里出来,唯一的办法就是getTask()获取任务返回null。所以我们看一下getTask()如何返回null
getTask()只有两个出口条件(这里的“出口”指的是跳出外面的runWorker循环,而不仅仅是跳出getTask()的循环),就是:
rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())
和
(wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())
第一个条件语句的意思是:如果状态已经被修改成SHUTDOWN并且任务队列为空。或者状态为STOP(使用shutdownNow后的状态)【stop状态值比shutdown的大。所以满足rs>stop 就一定满足 rs>shutdown】。
所以这里也解释了shutdown()和shutdownNow()的业务逻辑的区别。
第二个条件语句比较复杂,大致意思是:
- 如果确认已经没任务可处理了,那些增加的临时线程,就可以出去了(被清理)
- 因为改配置(核心线程数,最大线程数),从而需要删减掉的线程,在确保还有其他线程可以干活的时候,也可以出去了
- 就是处理“临时工” 和 “公司因为架构调整而裁员”(走之前都确保完成了手头工作,并完成了交接),具体怎么动态改配置,见基础篇的最后[如何配置线程池]
shutdown()发出的中断信号是如何起作用的:
如果此时线程阻塞在workQueue.take();
因为收到中断信号,阻塞就会解除,循环继续。然后在第一个出口出去(SHUTDOWN状态 并且 任务队列为空)
shutdown()和shutdownNow()的区别
通过前面分析,我们得知两者最大的区别是:shutdown()需要等队列空了之后才会销毁线程。而shutdownNow()只需要等当前线程执行的任务结束,就会销毁线程。
那就产生一个问题:为什么shutdown()只对空闲线程发中断消息?
其实我们看runwork和getTask()的逻辑可知,就算shutdown()像shutdownNow()那样给所有状态的线程都发中断消息,也不影响结果(反正出口 只在getTask里)。那他们中断的条件 做出区别,意义是什么呢?
答:我们要考虑工作线程里的任务内容。
假如用户写的任务里也写了一个阻塞语句(比如最简单的Thread.sleep)。此时,两者的差别就出来了:
shutdownNow()因为无视运行中的用户任务,直接发送中断消息。比如此时用户任务里刚好执行到了sleep,正在阻塞。那么线程收到中断消息之后,sleep将会响应这个中断消息,从而结束阻塞,然后抛出一个异常。虽然用户捕获了这个异常(中断异常都是检查异常,需要强制捕获),但还是起到了一个作用:加速用户任务的结束。虽然这样引起了用户任务的报错,一定程度上确实影响了任务的正常执行,但也好过使用Thread.stop(被明令禁止使用的终止线程的粗暴方式,因为可能会引起意想不到的结果)。
但是,用户的任务里未必有阻塞语句。即便有,也不一定刚好赶上,阻塞的时候收到中断消息(也许刚好阻塞结束了,此时收到了中断消息。那么中断消息就对用户任务不起任何作用)。
所以shutdownNow()的作用是通过中断消息让用户任务尽快结束,但是否真的起作用,全凭用户任务是否理睬这个中断消息。如果用户任务无视这个中断消息,那就没什么特殊的效果。
所以我们更进一步引申:添加到线程池里的任务。在将要执行长时间操作的之前,最好手动刻意判断一下当前线程中断状态。如果收到中断消息,就可以根据实际业务情况。如果能停,就优雅的提前结束任务,也算是帮助shutdownNow()加快停机的速度。毕竟线程池执行shutdownNow()并不常见(相比较而言,shutdown()还是更优雅,更常用)。如果出现了,那么执行人必然是希望线程都能尽快的停下来的。
【关于中断,详细请看Java锁深入理解4——ReentrantLock VS synchronized中的中断部分】
线程池状态变化
- 对用户来说一般只关心正常运行的RUNNING状态,和操作shutdown()之后变成SHUTDOWN状态,操作shutdownNow()变成STOP状态。
- 操作shutdown()或shutdownNow()之后,用户一般就不用管了,都会自动走到最终的TERMINATED(shutdown和shutdownNow方法都会通过调用tryTerminate()来完成后续收尾工作)。
- TIDYING(字面意思是整理收拾残局)是一个中间状态,感知最不明显。
- terminated()就是基础篇中讲到的ThreadPoolExecutor为子类提供的三个钩子方法之一,也是唯一的一个线程池本身状态变化的钩子方法,表示:线程池关闭后,你要做点什么。
两种关闭前的钩子方法
在ThreadPoolExecutor中还发现有一个finalize()方法。
/**
* Invokes {@code shutdown} when this executor is no longer
* referenced and it has no threads.
*/
protected void finalize() {
SecurityManager sm = System.getSecurityManager();
if (sm == null || acc == null) {
shutdown();
} else {
PrivilegedAction<Void> pa = () -> { shutdown(); return null; };
AccessController.doPrivileged(pa, acc);
}
}
表示:在线程池中没有线程了,并且线程池本身这个引用也没人引用了(可能只是一个局部变量)。即将被GC处理掉前调用这个方法。
在初学Java时,会经常提到这个方法,是Object类为我们提供的钩子方法。在这个方法里,调用了一下shutdown()方法。
这让我想起了另外一个关闭前的钩子方法Runtime.getRuntime().addShutdownHook(线程)
。
它的意思JVM关闭前会调用这线程(这里的只能传入一个线程,而不是方法)。在SpringBoot里,创建完Tomcat对象,就把关闭Tomcat的代码写在了这个线程里。
所以,为了优雅而安全的关闭资源,我们可以重写finalize()方法或者使用Runtime.getRuntime().addShutdownHook(线程)
来完成一些收尾操作。
submit + Future + Callable实现异步返回值
这里讲一下线程池实现异步返回运行结果的实现机制
- 表面上(左侧),把Callable任务传入ThreadPoolExecutor的submit方法,返回Future。在Future里可以异步get出任务的返回值。
- 实际内部机制(右侧):以FutureTask为核心。
- FutureTask干了如下事情:
- 把Callable封装进来
- 实现Runnable接口,所以有一个run方法。
- 在run方法里调用Callable的call方法。
- 把call方法的返回值封装在自己的另一个成员属性outcome里(等人来取)。
- 最后把FutureTask作为一个任务(Runnable)传入线程池的execute方法。
- 线程(Thread)执行FutureTask中的run方法,然后前面的步骤就走通了。。。
- submit的返回值Future.get(),获取到的就是FutureTask中的outcome(call方法的返回值)
小结
- 线程池的实现机制基本都在ThreadPoolExecutor类中
- Worker是ThreadPoolExecutor中的内部类。
- 任务队列(workQueue)和ctl是ThreadPoolExecutor的成员变量。
- ctl记录有效线程数和线程池状态
- Worker封装了线程(Thread)和任务(Runnable)
- 线程和Worker是公用的,一般都是固定数量(和核心线程数一致)
- 初始任务(Runnable)的提交,伴随着Worker的创建和线程的创建。之后线程和Worker很快固定下来,不断从任务队列中获取并处理新的任务
- 有效线程数达到核心线程数,新提交进来的任务都是进入任务队列,等待被线程处理。
- 核心线程获取不到任务就一直阻塞着。临时新增的线程获取不到任务,等待一段时间确认没活干了,就会被清理掉。
- Worker通过继承AQS把自己包装成一个不可重入的独占锁。并在运行过程中,用这个锁来保证线程安全(不被中断信号干扰)。同时避免了中断信号的重复发送。
- shutdown()和shutdownNow()都是通过给线程发中断信号,来中断阻塞。但shutdown()只会让空闲的线程中断。总之,就是为了让线程池“优雅”的关闭。
- 为了优雅的关闭,我们除了设置一些主动操作,还可以设置一些兜底的被动关闭方式:重写finalize()或者使用
Runtime.getRuntime().addShutdownHook(线程)
。 - 本质上,线程池只有一个接收任务的方法:execute(Runnable)。任务也只有Runnable。只不过在FutureTask的作用下,实现了第二个接受任务的方式submit。而且这个方法还有返回值,只不过接受的任务也变成了Callable。
总结
- 为了合理利用线程资源,JDK参考“池化思想”,引入线程池来统一管理和使用线程。
- 主要涉及到三个相关概念:线程池,任务,返回值(Future)
- 线程池是一个生产者-消费者模型。通过阻塞队列来让少量的线程处理很多任务(类似于医院里,几个医生为很多患者服务的场景)
- 配置合理的线程池关键参数(核心线程数,最大线程数,队列长度),在实际生产中并不容易。可以通过线程池提供的getxxx来监控,然后通过setxxx来动态配置参数。
- 在线程池的实现中,“解决多线程并发问题”和“优雅的关闭线程池”是两个重点考量问题。主要应用了锁和CAS(无锁)来实现。【关于锁,可以参考另一篇博客Java锁深入理解】
参考
JDK1.8源码
Java线程池
JAVA Future类详解
Java线程池实现原理及其在美团业务中的实践
jdk线程池工作原理解析
线程的高效利用