前言
翻阅 Java 线程池的源码,可以看到用到了大量的位运算操作,本文来分析下这些位运算是如何计算的,以及最后算出的结果是什么。
正文
阅读之前,必须熟悉一下内容
- & 与运算
- | 或运算
- ~ 取反
- << 左移
- 负数的二进制表示方式(先取对应 正数 的二进制,然后对每一位 取反,最后加 1)
- java 中 位运算的优先级(取反 优先级大于 与运算)
ThreadPoolExecutor 源码
ThreadPoolExecutor 用一个 int 类型来表示当前线程池的 运行状态 和 线程有效数量。
但是不同平台的 int 类型的范围不一样,我们先假定用 32 位的二进制表示 int 类型:(以下内容都是基于这个条件)
-
高 3 位表示 线程池运行状态
-
低 29 位表示 线程有效数量
// 高 3 位 表示线程池状态,低 29 位表示线程个数 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 线程个数的表示 位数(上面也提到了不同平台 int 类型范围不一样) // 不管 int 是多少位, 反正高三位 就是表示线程状态,剩余的位数表示线程数量 private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程最大数量 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 高三位表示 线程池状态 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // 获取线程池状态 private static int runStateOf(int c) { return c & ~CAPACITY; } // 获取线程池有效线程数量 private static int workerCountOf(int c) { return c & CAPACITY; } // 获取上面提到的 32 位 int 类型的数值 private static int ctlOf(int rs, int wc) { return rs | wc; }
位运算详解
线程池的几个属性
-
COUNT_BITS = Integer.SIZE - 3;
COUNT_BITS 表示用多少二进制位表示 线程数量, 这里假定 Integer.SIZE = 32 (注意: 这里不是 Integer.MAX_VALUE),所以最终结果 COUNT_BITS= 29
-
CAPACITY = (1 << COUNT_BITS) - 1
CAPACITY 表示当前线程池中的最大线程数量,这个位运算可以拆解成 2 步:
CAPACITY = 1 << 29
得到 1 000…0000(29 个 0)CAPACITY = CAPACITY-1
得到 1111…111 (29 个 1)
-
RUNNING = -1 << COUNT_BITS
RUNNING 表示线程池处于 运行状态,COUNT_BITS 上面说到是 29,因此这个位运算就表示 -1 左移 29 位。
-1 如果用 2 进制表示
-
获取 -1 的正数,也就是 1 的二进制: 0000000…00000 1 (前面 31 位 0)
-
对上一步进行取反, 1111111111…1111 0 (前面 31 位 1)
-
对上一步 +1 操作, 111111111…1111 (32 位 1)
因此 - 1 左移 29 位, 就得到了 111 0000…00000 ( 29个 0) 。 高三位 111 表示 RUNNING 状态
-
-
SHUTDOWN = 0 << COUNT_BITS
如此类推, 高三位为 000
-
STOP = 1 << COUNT_BITS
高三位为 001
-
TIDYING = 2 << COUNT_BITS
高三位为 010
-
TERMINATED = 3 << COUNT_BITS
高三位为 011
线程池的几个方法
参数 c 就是一开始提到了 32 位整数
1. runStateOf(int c)
private static int runStateOf(int c)
{
return c & ~CAPACITY;
}
-
~ CAPACITY
CAPACITY 从上面可以得知为 1111…111 (29 bit),取反后就是 111 0000000(29个 0)
-
c & 上面的结果
就可以获取到 高三位,而后 29 位 全部为 0(1 & 0 或者 0 & 0都为 0)
2. workerCountOf(int c)
private static int workerCountOf(int c) {
return c & CAPACITY;
}
CAPACITY 为 000 111111(29个1)
因此 c & CAPACITY, 就可以获取变量 c 的低 29 位的值,高三位 与运算结果 为 0
3. ctlOf(int rs, int wc)
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
rs 为 线程状态, wc 表示 线程数量, 或运算的结果就是,就相当于把 rs 的前三位,和 wc 的后 29 位,像字符串一样拼接 到了一起。
总结
上面提到的那些位运算操作是研究 ThreadPoolExecutor
源码的基础,如果为了省事,也完全不用研究这么透彻, 记住文中开头的源码注释内容即可。
比如 CAPACITY
表示的最到线程数量就是 29 bit 1。
Java并发源码学习:线程池源码解析
ThreadPoolExecutor概述
线程池解决的优点
- 当执行大量异步任务时线程池能够提供较好的性能,因为线程池中的线程是可复用的,不需要每次执行异步任务时都创建和销毁线程。
- 提供资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等等。
线程池处理流程
ThreadPoolExecutor执行execute时,流程如下:
- 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务,这里需要加全局锁。
- 如果运行的线程数>=corePoolSize,则将任务加入BlockingQueue。
- 如果此时BlockingQueue已满,则创建新的线程来处理任务,这里也需要加全局锁。
- 如果创建新线程将使当前运行的线程超出maximumPoolSize,则按照拒绝策略拒绝任务。
当然啦,这篇文章意在从源码角度学习线程池这些核心步骤的具体实现啦,线程池概念性的东西,可以参考一些其他的博客
创建线程池
创建线程池有几种方法,一种是使用Executors工具类快速创建内置的几种线程池,也可以自定义。
一、通过Executor框架的工具类Executors可以创建三种类型的ThreadPoolExecutor。
二、使用ThreadPoolExecutor的各种构造方法。
《阿里巴巴 Java 开发手册》中:强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
Executors 返回线程池对象的弊端如下: FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致 OOM。 CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
本篇的重点就是这个ThreadPoolExecutor。
重要常量及字段
public class ThreadPoolExecutor extends AbstractExecutorService {
// 原子的Integer变量ctl,用于记录线程池状态【高3位】和线程池中线程个数【低29位】,这里假设Integer是32位的
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 其实并不是每个平台的Integer二进制都是32位的,实际上是,二进制位-3代表线程个数
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程最大个数【约5亿】 低COUNT_BITS都是1 000 11111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 111 00000000000000000000000000000 高3位是 111
private static final int RUNNING = -1 << COUNT_BITS;
// 000 00000000000000000000000000000 高3位是 000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 00000000000000000000000000000 高3位是 001
private static final int STOP = 1 << COUNT_BITS;
// 010 00000000000000000000000000000 高3位是 110
private static final int TIDYING = 2 << COUNT_BITS;
// 011 00000000000000000000000000000 高3位是 011
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 获取高3位的运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取低29位的线程个数
private static int workerCountOf(int c) { return c & CAPACITY; }
// 通过RunState和WorkCount计算ctl的值
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 线程池状态变换是单调递增的
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 只有RUNNING 是小于SHUTDOWN的
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// ...
// 阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 独占锁 同步保证
private final ReentrantLock mainLock = new ReentrantLock();
// 存放 线程池中的工作线程
private final HashSet<Worker> workers = new HashSet<Worker>();
// 条件队列,线程调用awaitTermination时存放阻塞的线程
private final Condition termination = mainLock.newCondition();
// ...
// 继承AQS和Runnable,任务线程
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{ /*.. */}
}
- ThreadPoolExecutor通过AtomicInteger类的变量ctl记录线程池状态和线程池中线程个数,这里以Integer为32为例。
- 高3位表示线程池的状态,低29位表示线程个数,分别通过runStateOf和workerCountOf计算。
线程池的五种状态及转换
- 线程池的状态有五种,他们提供了线程池生命周期的控制:RUNNING:能够接收新任务,并且处理阻塞队列里的任务。SHUTDOWN:拒绝新任务,但会处理阻塞队列里的任务。STOP:拒绝新任务,并且抛弃阻塞队列里的任务,同时会中断正在处理的任务。TIDYING:所有任务都执行完后当前线程池workerCount为0,将调用terminated()这个钩子方法。TERMINATED:终止状态。terminated方法调用完成。
- 线程池的状态是有规律的,保证单调递增,但是不一定每个状态都会经历,比如有以下几种转换:RUNNING -> SHUTDOWN:可能是显式调用了shutdown()方法,也可能在finalize()里隐式调用。RUNNING或SHUTDOWN -> STOP:调用了shutdownNow()方法。SHUTDOWN -> TIDYING:队列和线程池都为空的时候。STOP -> TIDYING:线程池为空的时候。TIDYING -> TERMINATED:钩子方法terminated()调用完成的时候。
由于awaitTermination()方法而阻塞在条件队列中的线程将会在线程池TERMINATED的时候返回。
ThreadPoolExecutor构造参数及参数意义
ThreadPoolExecutor方法的构造参数有很多,我们看看最长的那个就可以了:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize:核心线程数定义了最小可以同时运行的线程数量。
- maximumPoolSize:当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。【如果使用的无界队列,这个参数就没啥效果】
- workQueue: 阻塞队列,当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到核心线程数的话,新任务就会被存放在队列中。
- keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁。
- unit:keepAliveTime 的时间单位。
- threadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,默认使用Executors的静态内部类DefaultThreadFactory。
- handler:饱和策略,当前同时运行的线程数量达到最大线程数量【maximumPoolSize】并且队列也已经被放满时,执行饱和策略。
Work类
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** 具体执行任务的线程 */
final Thread thread;
/** 执行的第一个任务 */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** 线程启动时,执行runWorker方法 */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 不可重入的,state = 1表示已获取
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// state = 0 表示锁未被获取
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 如果线程启动,则中断线程 state只有初始化的时候才是-1,其他的时间都是满足>=0的
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}</code></pre></div><p data-pid="nsgjAKiG">Worker继承了AQS和Runnable接口,是具体承载任务的对象。</p><p data-pid="OVHJ7K_t">基于AQS,Worker实现了不可重入的独占锁,state == 0 表示锁未被获取,state == 1表示锁已经被获取, state == -1为初始状态。</p><p data-pid="ObjKiAjx">firstTask记录该工作线程执行的第一个任务,thread是执行任务的线程。</p><p data-pid="6QuyA7bs">interruptIfStarted()方法会在shutdownNow中调用,意在中断Worker线程,state初始化为-1,是不满足getState条件的。</p><h2><b>void execute(Runnable command)</b></h2><p data-pid="C26e2efA">execute方法就是向线程池提交一个command任务进行执行。</p><div class="highlight"><pre><code class="language-text">public void execute(Runnable command) {
// 提交任务为null, 抛出空指针异常
if (command == null)
throw new NullPointerException();
// 获取当前ctl的值 : 线程池状态 + 线程个数
int c = ctl.get();
// 如果当前线程池中线程个数小于核心线程数corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 通过addWorker新建一个线程,然后,启动该线程从而执行任务
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果线程池处于RUNNING状态,则添加任务到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
// double-check
int recheck = ctl.get();
// 如果线程池不是处于RUNNING, 则从队列中移除任务
if (! isRunning(recheck) && remove(command))
// 并执行拒绝策略
reject(command);
// 如果当前线程个数为0, 则添加一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果队列满,则新增线程,新增失败则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}</code></pre></div><ol><li data-pid="Ey989A1v">如果线程池当前线程数小于corePoolSize,则调用addWorker创建新线程执行任务,成功则直接返回。</li><li data-pid="btICQed3">如果线程池处于RUNNING状态,则添加任务到阻塞队列,如果添加成功,进行double-check,检测出当前不是RUNNING,则进行移除操作,并执行拒绝策略。否则添加一个线程,确保有线程可以执行。</li><li data-pid="VuyCIMtF">如果线程池不是处于RUNNING或加入阻塞队列失败,并采取拒绝策略。</li></ol><h2><b>boolean addWorker(firstTask, core)</b></h2><div class="highlight"><pre><code class="language-text">private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 检测队列是否只在必要时为空
// 等价为:下面几种情况返回false
/* if (rs >= SHUTDOWN && rs 为STOP TIDYING TERMINATED时返回false
(rs != SHUTDOWN || rs不为SHUTDOWN
firstTask != null || rs为SHUTDOWN 但 已经有了第一个任务
workQueue.isEmpty())) rs为SHUTDOWN 并且任务队列为空
*/
if (rs >= SHUTDOWN && //
! (rs == SHUTDOWN && //
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 循环, 通过CAS操作来增加线程个数
for (;;) {
int wc = workerCountOf(c);
// 线程个数如果超过限制,返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS增加线程个数,操作成功跳出循环break
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS失败,检测线程状态是否发生了变化,如果发生变化,则跳到retry外层循环重新尝试
// 否则在内层循环重新CAS
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 走到这代表CAS操作已经成功
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 独占锁保证同步
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 重新检查线程池状态,以避免在获取锁前调用了shutdown接口
int rs = runStateOf(ctl.get());
// 1. 线程池处于RUNNING
// 2. 线程池处于SHUTDOWN 并且firstTask为null
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果t已经启动
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加任务
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加任务成功后, 执行任务
if (workerAdded) {
t.start(); // 执行
workerStarted = true;
}
}
} finally {
// 任务未执行成功
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}</code></pre></div><p data-pid="qrY4fQEU">主要分为两步:</p><ol><li data-pid="YD-a8xWw">双重循环通过CAS操作增加线程数。</li><li data-pid="qANlJdmf">使用全局的独占锁来控制:将并发安全的任务添加到works里,并启动。</li></ol><h2><b>final void runWorker(Worker w)</b></h2><p data-pid="KL9ghh1E">用户线程提交任务到线程池后,由Worker执行,通过while循环不断地从工作队列里获取任务执行。</p><div class="highlight"><pre><code class="language-text">private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// Worker启动执行runWorker
public void run() {
runWorker(this);
}
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // state设置为0, 允许中断
boolean completedAbruptly = true;
try {
// 如果task不为null 或者 task为null 但是 getTask从任务队列获取的任务不为null
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果线程池当前STOP,则确保线程是中断状态
// 如果不是STOP,确保线程没有被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 任务执行之前的hook方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // 执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 任务执行之后的hook方法
afterExecute(task, thrown);
}
} finally {
task = null;
// 统计当前的Worker完成的任务数量
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 清理工作
processWorkerExit(w, completedAbruptly);
}
}</code></pre></div><h2><b>Runnable getTask()</b></h2><div class="highlight"><pre><code class="language-text">private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
// 循环
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 1. 线程池状态 >= SHUTDOWN && 工作队列为空
// 2. 线程池状态 >= STOP
// 两种情况,都直接数量 -1 , 返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 工作线程的数量
int wc = workerCountOf(c);
// 需否需要超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//1. 工作线程的数量超过了maximumPoolSize 或者 需要超时控制,且poll出为null,就是没拿到
//2. 工作线程数量 > 1 或者 工作队列为空
// 两者都满足, 则数量 -1 , 返回null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 从工作队列里取出任务
Runnable r = timed ?
// keepAliveTime时间内还没有获取到任务, 继续循环
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}</code></pre></div><h2><b>void processWorkerExit(w, completedAbruptly)</b></h2><div class="highlight"><pre><code class="language-text">private void processWorkerExit(Worker w, boolean completedAbruptly) {
// completedAbruptly 为true表示用户线程运行异常,需要wc - 1
// 否则是不需要处理的,在getTask中已经处理过了
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
// 统计线程池完成的任务个数, 从workers中移除当前worker
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
//如果当前线程池状态为SHUTDOWN且工作队列为空,
//或者STOP状态但线程池里没有活动线程,则设置线程池状态为TERMINATED。
tryTerminate();
int c = ctl.get();
// 如果线程池为 RUNNING 或SHUTDOWN 表示,tryTerminate()没有成功
// 判断是否需要新增一个线程,如果workerCountOf(c) < min 新增一个线程
if (runStateLessThan(c, STOP)) {
// 表示正常退出
if (!completedAbruptly) {
// min 默认是corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 如果workerCountOf(c) < min 新增一个线程
addWorker(null, false);
}
}</code></pre></div><h2><b>void shutdown()</b></h2><blockquote data-pid="vR8SlkiH">SHUTDOWN : 拒绝新任务但是处理阻塞队列里的任务。</blockquote><p data-pid="VJyNxtUc">调用该方法之后,线程池不再接收新任务,但是工作队列里的任务还需要处理。</p><div class="highlight"><pre><code class="language-text">public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查权限,判断当前调用shutdown的线程是否拥有关闭线程的权限
checkShutdownAccess();
// 设置线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);
// 设置中断标志
interruptIdleWorkers();
// 钩子方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试设置线程池状态为TERMINATED
tryTerminate();
}</code></pre></div><h2><b>void advanceRunState(int targetState)</b></h2><div class="highlight"><pre><code class="language-text">// 设置线程池状态为SHUTDOWN
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
// 当前的状态已经是SHUTDOWN了就直接break返回,如果不是就CAS设置一下
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}</code></pre></div><h2><b>void interruptIdleWorkers()</b></h2><div class="highlight"><pre><code class="language-text">private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
// onlyOne如果不传,默认为false
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历所有的Worker
for (Worker w : workers) {
Thread t = w.thread;
// 如果工作线程没有被中断 且 获取Worker的锁成功,则设置中断标志
// 这里:获取锁成功代表,设置的是没有在执行任务的线程,因为
// 正在执行任务的线程是已经获取了锁的,你tryLock不会成功的
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 只用设置一个
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}</code></pre></div><h2><b>final void tryTerminate()</b></h2><p data-pid="qnKcxDux">如果当前线程池状态为SHUTDOWN且工作队列为空,或者STOP状态但线程池里没有活动线程,则设置线程池状态为TERMINATED。</p><div class="highlight"><pre><code class="language-text">final void tryTerminate() {
// 循环
for (;;) {
int c = ctl.get();
// 如果RUNNING TIDYING TERMINATED
// 如果SHUTDOWN 且任务队列不为空,还需要处理queue里的任务
// 就不需要下面的操作了, 直接返回好了
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// STOP 但 线程池里还有活动线程
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// CAS设置rs为TIDYING,且wc为0
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 钩子方法
terminated();
} finally {
// terminated() 完成之后, 就设置为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 激活所有因为await等待的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}</code></pre></div><h2><b>List(Runnable) shutdownNow()</b></h2><blockquote data-pid="_hm4NcgF">STOP:拒绝新任务并且抛弃任务队列里的任务,同时会中断正在处理的任务。</blockquote><p data-pid="hSo27djh">调用该方法后,将线程池状态设置为STOP,拒绝新任务并且抛弃任务队列里的任务,同时会中断正在处理的任务,返回队列里被丢弃的任务列表。</p><div class="highlight"><pre><code class="language-text">public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查权限
checkShutdownAccess();
// 设置为STOP
advanceRunState(STOP);
// 设置中断标志
interruptWorkers();
// 将队列任务移到tasks中
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}</code></pre></div><h2><b>void interruptWorkers()</b></h2><div class="highlight"><pre><code class="language-text">private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
// 如果线程启动,则中断线程【正在执行 + 空闲的所有线程都会被中断】
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}</code></pre></div><h2><b>boolean awaitTermination(timeout, unit)</b></h2><p data-pid="1mterNar">当该方法被调用时,当前线程会被阻塞,直到超时时间到了,返回false。或者线程池状态为TERMINATED时,返回true。</p><div class="highlight"><pre><code class="language-text">public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
// 线程池状态为TERMINATED 返回true
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
// 超时了, 返回false
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
文章概述
我们阅读ThreadPoolExecutor源码时在开篇就会发现很多位运算代码:
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
}
不难发现线程状态都用位运算表示,但是为什么要这样做呢?为什么不定义为直观的数字呢?下面我们进行分析。虽然代码量不多,但是想要理解线程池就必须要理解为什么使用位运算。
ThreadPoolExecutor在设计时就是用一个int数值表示了两个业务含义:线程池状态和线程数量。其中高3位表示线程池状态,低29位表示线程数量,这个设计思想体现在以下三句代码:
代码1表示用一个int保存线程池信息,代码2表示一共有(32-3)=29位可以表示线程数量,代码3表示理论上最大线程数量为536870911,这个理论值足以支撑线程池使用。
1 线程池状态
我们明白了线程池上述设计思想,下面就来分析线程池状态值:
我们知道COUNT_BITS=29则上述代码等价于:
现在我们算一算这些状态等于多少,在这里我们从后往前算,因为RUNNING状态是负数左移运算,计算步骤稍微多一些。
(1) TERMINATED = 3 << 29
(2) TIDYING = 2 << 29
(3) STOP = 1 << 29
(4) SHUTDOWN = 0 << 29
(5) RUNNING = -1 << 29
2 位运算应用
runStateOf方法用来获取线程池状态信息,workerCountOf方法用来获取线程池线程数,ctlOf方法用来设置当前线程池状态和线程数量信息,我们分别进行计算。
(1) CAPACITY = (1 << 29) - 1
(2) ~CAPACITY
(3) runStateOf
现在一个线程池状态是RUNNING并且线程数量等于3用二进制表示如下:
执行runStateOf方法就可以得到线程池状态:
(4) workerCountOf
现在一个线程池状态是RUNNING并且线程数量等于4用二进制表示如下:
执行workerCountOf方法就可以得到线程数量:
(5) ctlOf
现在我们要设置一个状态是RUNNING且线程数量等于4的线程池ctl值:
11100000 00000000 00000000 00000000
|
00000000 00000000 00000000 00000100
11100000 00000000 00000000 00000100
这个方法真正体现了高3位表示线程池状态,低29位表示线程数量这个设计思想优点,原本需要两步设置动作现在只需要一步,从而实现了操作原子性,这样就可以满足线程池的很多CAS操作,例如线程池在调用addWorker新增工作线程数时会调用compareAndIncrementWorkerCount方法增加线程数量。
但是假设同一时刻shutdownNow方法导致线程池状态发生改变,那么新增工作线程数方法就不会调用成功,需要继续执行自旋进行尝试,这体现了线程状态和线程数量维护的原子性。
3 位图法应用
3.1 需求背景
我们看看位运算怎样应用在实际开发场景。假设在系统中用户一共有三种角色:普通用户、管理员、超级管理员,现在需要设计一张用户角色表记录这类信息。我们不难设计出如下方案:
id | name | super | admin | normal |
---|---|---|---|---|
101 | 用户一 | 1 | 0 | 0 |
102 | 用户二 | 0 | 1 | 0 |
103 | 用户三 | 0 | 0 | 1 |
104 | 用户四 | 1 | 1 | 1 |
我们使用1表示是,0表示否,那么观察上表不难得出,用户一有用超级管理员角色,用户二具有管理员角色,用户三具有普通用户角色,用户四同时具有三种角色。如果此时新增加一种角色呢?那么新增一个字段即可。
3.2 发现问题
按照上述一个字段表示一种角色进行表设计功能上是没有问题的,优点是容易理解结构清晰,但是我们想一想有没有什么问题?笔者遇到过如下问题:在复杂业务环境一份数据可能会使用在不同的场景,例如上述数据存储在MySQL数据库,这一份数据还会被用在如下场景:
检索数据需要同步一份到ES
业务方使用此表通过Flink计算业务指标
业务方订阅此表Binlog消息进行业务处理
如果表结构发生变化,数据源之间就要重新进行对接,业务方也要进行代码修改,这样开发成本比较非常高。有没有办法避免此类问题?
3.3 解决方案
我们可以使用位图法,这样同一个字段可以表示多个业务含义。首先设计如下数据表,userFlag字段暂时不填。
id | name | user_flag |
---|---|---|
101 | 用户一 | 暂时不填 |
102 | 用户二 | 暂时不填 |
103 | 用户三 | 暂时不填 |
104 | 用户四 | 暂时不填 |
我们设计位图每一个bit表示一种角色:
我们使用位图法表示如下数据表:
id | name | super | admin | normal |
---|---|---|---|---|
101 | 用户一 | 1 | 0 | 0 |
102 | 用户二 | 0 | 1 | 0 |
103 | 用户三 | 0 | 0 | 1 |
104 | 用户四 | 1 | 1 | 1 |
用户一位图如下十进制数值等于4:
![在这里插入图片描述](https://img-blog.csdnimg.cn/5ab2c90f31b6415b8692dd26566555d6.jpeg)用户二位图如下十进制数值等于2:
![在这里插入图片描述](https://img-blog.csdnimg.cn/fa057db2bcb246d4a0331273911f21b0.png)用户三位图如下十进制数值等于1:
![在这里插入图片描述](https://img-blog.csdnimg.cn/9e4f64c040c345e2a0a4a733b76a4144.png)用户四位图如下十进制数值等于7:
![在这里插入图片描述](https://img-blog.csdnimg.cn/279b1a9cfbe54060b039d687fb07be71.png)现在我们可以填写数据表第三列:
id | name | user_flag |
---|---|---|
101 | 用户一 | 4 |
102 | 用户二 | 2 |
103 | 用户三 | 1 |
104 | 用户四 | 7 |
3.4 代码实例
(1) 枚举定义
定义枚举时不要直接定义为1、2、4这类数字,而是采用位移方式进行定义,这样使用者可以明白设计者的意图。
}
假设用户已经具有普通用户角色,我们需要为其增加管理员角色,这就是新增角色,与之对应还有删除角色和查询角色,这些操作需要用到为位运算,详见代码注释。
}
(2) 数据查询
假设在运营后台查询界面中,需要查询具有普通用户角色的用户数据,可以使用如下SQL语句:
我们也可以使用如下MyBatis语句:
<select id=“selectByUserIdAndRole” resultMap=“BaseResultMap” parameterType=“java.util.Map”>
select * from user_role
where id = #{userId} and user_flag & #{userFlag} = #{userFlag}
</select>
4 文章总结
本文首先分析了位运算在Java线程池源码的应用,然后我们又介绍了位图法,这样一个字段就可以表示多个含义,从而减少了字段冗余,节省了对接和开发的成本。当然位图法也有缺点,例如数据库字段含义不直观需要进行转义,增加了代码理解成本,大家可以根据需求场景选择使用,希望本文对大家有所帮助。