文章目录
- 同步队列
- 同步队列结点 Node
- 同步队列状态 state
- 获取互斥锁
- acquire 方法
- tryAcquire 方法获取互斥锁
- addWaiter 方法
- enq() 入队
- acquireQueued()
- setHead 方法设置头节点
- shouldParkAfterFailedAcquire()
- parkAndCheckInterrupt()
- cancelAcquire 发生异常,取消线程获取锁的行为
- unparkSuccessor 方法唤醒后驱结点
- acquireInterruptibly 响应中断
- doAcquireInterruptibly
- 获取共享锁
- acquireShared 方法
- tryAcquireShared 方法由子类实现
- doAcquireShared 方法
- setHeadAndPropagate 方法唤醒其他读线程
- 释放锁
- fullyRelease
- release 释放互斥锁
- releaseShared 释放共享锁
- tryReleaseShared 由子类实现
- doReleaseShared
- 总结
- 条件队列
- ConditionObject
- 等待条件满足
- awaitUninterruptibly
- await()
- addConditionWaiter 将线程放入等待队列
- unlinkCancelledWaiters
- checkInterruptWhileWaiting
- transferAfterCancelledWait
- reportInterruptAfterWait
- 唤醒等待线程
- signal
- doSignal
- signalAll
- doSignalAll
- transferForSignal
- 总结
AQS 是 JAVA 中管程模型的实现,JUC 包中的很多工具类都是基于 AQS 实现的。所以笔者将本篇文章作为《JDK8 JUC 源码全解》专题的开篇文章。
同步队列
同步队列结点 Node
同步队列的底层是双向链表,链表中的结点就是 Node 类。
// AQS 中的队列是通过双向链表实现的,Node 就是链表中的结点。
static final class Node {
static final Node SHARED = new Node(); // 共享锁模式
static final Node EXCLUSIVE = null; // 互斥锁模式
// 采用 volatile 修饰的变量
private transient volatile Node head; // 双向链表头结点
private transient volatile Node tail; // 双向链表尾结点
volatile int waitStatus; // Node 结点的等待状态
volatile Node prev; // 前继结点
volatile Node next; // 后继结点
volatile Thread thread; // Node 中封装的线程
// Node 结点的等待状态枚举
static final int CANCELLED = 1; // 取消状态
static final int SIGNAL = -1; // 后继结点已挂起,需要它来唤醒
static final int CONDITION = -2; // 结点在条件队列中
static final int PROPAGATE = -3; // 共享锁模式下,传播唤醒的行为
Node nextWaiter; // 特指条件队列中的 next
}
同步队列状态 state
表明当前同步队列当前的状态。
[[ReentrantLock]] 中 state 表示互斥锁是否被线程持有,即 0 没有线程持有,1 已经被线程持有。
[[Semaphore]] 中 state 变量来实现信号量计数。
[[ReentrantReadWriteLock]] 中 state 的高 16 位用于表示持有读锁的线程的数量,低 16 位用于表示持有写锁的线程的重入次数。
private volatile int state;
protected final int getState() {
return state;
}
// 注:此方法不具备线程安全性,需要调用方自行保证上下文线程安全
protected final void setState(int newState) {
state = newState;
}
// 线程安全的方式修改 state 变量
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
获取互斥锁
try 前缀的方法都是由子类实现的,AQS 中只是定义了方法,比如 tryAcquire
,tryRelease
。
do 前缀的方法,都是 AQS 实现的去同步队列排队,比如 doAcquireShared
。
acquire 方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // Node.EXCLUSIVE表示互斥锁
selfInterrupt(); // 如果在等待过程中发生中断,传递中断标志位
}
acquire 方法依赖于 tryAcquire 方法的返回值,tryAcquire 方法的具体逻辑由 AQS 子类实现。
- 若 tryAcquire 方法返回值为 false,则先将当前线程封装为 Node 对象,插入同步队列中。
- 然后判断当前线程是否应该阻塞等待,通过死循环保证当前线程阻塞等待或者发生异常被取消或者直接获得锁。
- 如果在等待过程中发生中断,则传递中断标志位
tryAcquire 方法获取互斥锁
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
tryAcquire
具体实现由子类决定。[[模版方法模式]]
addWaiter 方法
将当前线程封装为 Node 结点,插入到 AQS 双向链表的末尾。
// 没竞争到锁资源的线程封装成 Node 插入双向链表排队
private Node addWaiter(Node mode) {
// 将当前线程封装为 Node 结点
Node node = new Node(Thread.currentThread(), mode);
// 保存 tail 快照
Node pred = tail;
if (pred != null) {
// 分为三步将新结点挂到链表末端
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 若 CAS 失败或者 pred 为 null,则以死循环的方式,保证当前线程挂到双向链表末尾
enq(node);
return node;
}
enq() 入队
enq:
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
addWaiter:
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
为什么 enq 方法和 addWaiter 方法里都有 CAS 替换尾指针来原子性插入 Node 结点这段代码,这是代码冗余么?为什么要这么设计?
因为在竞争激烈的条件下,可以直接通过 CAS 来操作插入,而不需要先判断后插入。从代码层面上来说只是多了一个条件判断。但是如果在 CPU 指令流水线层面上减少判断,则是一个重要的优化。
private Node enq(final Node node) {
for (;;) { // 存在多线程竞争,所以这里需要死循环。
// 拿到尾节点
Node t = tail;
// 如果尾节点为空,则构建一个伪头节点作为 head 和 tail。
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued()
通过死循环来让当前结点获取锁或者阻塞。返回值表示等待过程中是否发生了中断(true 表示等待过程中发生了中断)。
// acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 前继结点 p
final Node p = node.predecessor();
// 若前继结点是伪头结点,则尝试获取锁
if (p == head && tryAcquire(arg)) {
// 获取锁资源成功,更新头结点
setHead(node);
// 释放 p 结点指向当前结点的引用,帮助 GC
p.next = null; // help GC
failed = false;
return interrupted; // 传递中断标志位
}
// 当前线程必须将前继结点的 ws 设置为 -1 后,才能将自身挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally { // tryAcquire(arg) 抛异常的情况下会进这里
if (failed)
cancelAcquire(node);
}
}
setHead 方法设置头节点
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
shouldParkAfterFailedAcquire()
如果结点的状态是 SIGNAL(-1),那么它释放锁以后必须唤醒后继结点。shouldParkAfterFailedAcquire()
就是要确保前继结点的 ws 是 SIGNAL 后,再调用 parkAndCheckInterrupt()
将自身挂起。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前继结点的 ws
int ws = pred.waitStatus;
// 若 ws 为 Node.SIGNAL,说明前继结点阻塞后会唤醒当前线程,当前线程可以放心阻塞
if (ws == Node.SIGNAL)
return true;
// ws 大于 0 的状态只有取消状态 CANCELLED(1)
if (ws > 0) {
// 往前找,直到找到一个结点的状态不等于 1 的结点,作为当前结点的上一个结点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // ws 为 0 或 PROPAGATE 的情况
// 尝试将前继结点的 ws 修改为 SIGNAL(这里没用死循环,不一定成功)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false; // 返回 false,重新进入外层死循环
}
跳过 ws 是 CANCELLED 的结点:
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
- 图中红色边框的结点由于没有 GC ROOT 引用,会被垃圾回收器回收
parkAndCheckInterrupt()
将线程设置为可中断的等待并返回线程是否在等待过程中发生中断。如果通过 LockSupport.unpark
方法唤醒线程,那么返回 false。如果通过 Thread.interrupt
方法唤醒线程,那么返回 true。
private final boolean parkAndCheckInterrupt() {
// 阻塞线程
LockSupport.park(this);
// 可以确认当前挂起的线程是被中断唤醒的还是被正常唤醒的
// 被中断唤醒返回 true,正常唤醒返回 false
return Thread.interrupted();
}
注:Thread.interrupted();
会清除中断标志位
// The interrupted status of the thread is cleared by this method.
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
cancelAcquire 发生异常,取消线程获取锁的行为
Cancels an ongoing attempt to acquire.
node.thread = null;
- Skip cancelled predecessors
node.waitStatus = Node.CANCELLED;
- 将当前结点脱离同步队列,这一步分为 3 种情况:
- 情况 1:要取消的结点就是 tail 结点
- 情况 2:要取消的结点是伪头结点
- 情况 3:要取消的结点不是伪头结点也不是 tail 结点
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
// Skip cancelled predecessors
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
// 若当前 node 是尾结点,由于是多线程竞争尾指针,必须通过 CAS 操作修改尾指针为前一个结点
if (node == tail && compareAndSetTail(node, pred)) {
// 若 CAS 修改尾指针成功后,将前继结点的 next 设置为 null
compareAndSetNext(pred, predNext, null);
} else {
// 当前结点不是尾结点或者 compareAndSetTail 失败
int ws;
// 当前结点不是伪头结点的后继结点
// 如果 pred 结点的状态是 SIGNAL,则表明 pred 需要唤醒后一个结点。尝试设置 pred 结点的 next 指针为需要他来唤醒的结点。
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
// 存在一个状态 ws == Node.SIGNAL 但是 pred.thread == null
// 因为 cancelAcquire 是先执行 node.thread = null; 后执行 node.waitStatus = Node.CANCELLED;
// 所以这里需要校验 pred.thread != null
&& pred.thread != null) {
Node next = node.next;
// node 的 next 不为 null,且不是取消结点,则将 pred 的 next 指向当前结点的 next
// 这里只是试一下,没有用死循环,因为查找有效节点是从 tail 往前找,这里失败也没事
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// pred == head,当前结点是伪头结点的后继结点,则唤醒后继结点
// 进入条件判断if (p == head && tryAcquire(arg))尝试竞争锁
unparkSuccessor(node);
}
// 当前结点的 next 指向当前结点
node.next = node; // help GC
}
}
unparkSuccessor 方法唤醒后驱结点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) { // 后继结点为 null 或者是 CANCELLED 状态
s = null;
// 从 tail 往前找,找到一个离 node 最近的未取消结点
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread); // 唤醒
}
acquireInterruptibly 响应中断
响应中断的方法,如果在获取锁过程中发生了线程被中断,则会抛出中断异常。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); // 若线程被中断,抛出中断异常
if (!tryAcquire(arg)) // 调用子类获取锁的方法
doAcquireInterruptibly(arg); // 执行可中断的获取锁流程
}
doAcquireInterruptibly
doAcquireInterruptibly 方法和 acquire 方法逻辑大致相同,区别在于 doAcquireInterruptibly 会响应中断即如果线程被中断,则不会继续获取锁,直接抛出中断异常。acquire 方法则不会响应中断标志位,仅仅是把中断标识位传递出去。
// 拿不到锁资源就一直等,直到锁资源释放后被唤醒或者被中断唤醒
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // 响应中断,与 acquire 主要区别
}
} finally {
if (failed)
cancelAcquire(node);
}
获取共享锁
学习共享锁时,我们需要特别关注读写锁是如果解决写线程饥饿(写线程抢不到锁)问题。
acquireShared 方法
acquireShared 和 acquire 主要区别是,当前线程获取锁后是否会唤醒后继结点。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared 方法由子类实现
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
Semaphore、CountDownLatch、ReentrantReadWriteLock 中都有 tryAcquireShared 方法的实现。
doAcquireShared 方法
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 若当前结点的前继结点是头节点,则调用子类实现的 tryAcquireShared 获取锁
if (p == head) {
// 若成功获取锁,则设置当前结点为头节点,并唤醒同样等待在 SHARED 模式下的线程
// ReentrantReadWriteLock 的实现中返回值为 1 表示获取到读锁,-1 表示未获取到
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt(); // 若被中断过,传递中断标志位
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate 方法唤醒其他读线程
Sets head of queue, and checks if [[successor]] may be waiting in shared mode, if so propagating if either propagate > 0 or PROPAGATE status was set.
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 保存旧的 head 结点
setHead(node); // 当前结点设置为头结点
// 老爷子的习惯,在使用变量之前都会做非空校验,虽然这里 h 和 head 必不为 null,但是还是校验了 h == null 和 (h = head) == null
// 下述条件判断可简化为 if(propagate > 0 || h.waitStatus < 0 || head.waitStatus < 0)
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared()) // 此处 s 也不可能为 null,所以可简写为 if(s.isShared())。下一个结点是读线程才唤醒,写线程就不唤醒了
doReleaseShared();
}
}
[[ReentrantReadWriteLock]] 的实现中,tryAcquireShared 获取到读锁时返回值为 1,即 propagate 为 1,propagate > 0 恒成立,setHeadAndPropagate 可简写为:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
Node s = node.next;
if (s.isShared())
doReleaseShared();
}
释放锁
fullyRelease
Invokes release with current state value; returns saved state. Cancels node and throws exception on failure.
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 若是重入锁,则 savedState 是重入次数
int savedState = getState();
if (release(savedState)) { // 释放锁并唤醒后继结点
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException(); // 若线程未获取到锁就执行该方法,则抛出异常
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED; // 若失败则将结点状态更新为 CANCELLED(-1)
}
}
release 释放互斥锁
release 方法存在伪唤醒问题,即唤醒的线程又继续阻塞。
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 头结点也即当前结点
Node h = head;
// 若当前结点的 waitStatus 状态不等于 0(等于 -1)
// 说明当前结点需要唤醒后面的结点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
releaseShared 释放共享锁
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared 由子类实现
- [[CountDownLatch#tryReleaseShared]]
- [[ReentrantReadWriteLock#tryReleaseShared]]
- [[Semaphore#tryReleaseShared]]
doReleaseShared
多个线程同时释放读锁时,第一个线程将头结点的状态从 SIGNAL 修改为 0。第二个线程进来发现 ws == 0 了,于是就将 ws 修改为 -3,表明此时有多个线程同时释放了读锁,被唤醒的线程需要将唤醒传播下去。
private void doReleaseShared() {
for (;;) { // 允许多个持有读锁的线程同时释放锁
Node h = head;
if (h != null && h != tail) { // 校验是否存在等待读锁的线程
int ws = h.waitStatus;
// 若当前头结点的状态为 SIGNAL,则表明后面有结点需要它来唤醒。CAS 将其修改为 0,并唤醒后驱结点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h); // 唤醒后继结点
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 传播唤醒
continue;
}
if (h == head) // 若头结点发生变化,必须重新进入死循环校验条件
break;
}
}
总结
条件队列
在生产者消费者模型中,消费者消费时,如果队列为空,则需要等待生产者生产后,唤醒消费者。生产者生产时,如果队列满了,则需要等待消费者消费后,由消费者唤醒生产者。可以参考笔者的 ArrayBlockingQueue 源码分析。
显然只有同步队列是满足不了我们的需求,我们还需要 2 个来队列来保存等待条件满足的生产者的消费者即条件队列。
想象一下,如果没有条件队列,队列为空时,消费者获取锁成功,发现队列为空,于是释放锁,又重新竞争锁资源,导致生产者一直没有获取到锁进行生产。这时极大的性能浪费。
ConditionObject
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter; // 第一个等待结点
private transient Node lastWaiter; // 最后一个等待结点
private static final int REINTERRUPT = 1; // 重复中断状态位
private static final int THROW_IE = -1; // 发生异常状态位
}
等待条件满足
先将线程结点放入等待队列中,然后释放同步队列的锁,等待条件达成、中断、超时。当线程等待完成后,将重新调用 acquireQueued 方法获取锁。
awaitUninterruptibly
Implements uninterruptible condition wait.
- Save lock state returned by AbstractQueuedSynchronizer.getState().
- Invoke AbstractQueuedSynchronizer.release(int) with saved state as argument, throwing IllegalMonitorStateException if it fails.
- Block until signalled.
- Reacquire by invoking specialized version of AbstractQueuedSynchronizer.acquire(int) with saved state as argument.
public final void awaitUninterruptibly() {
// 加入同步队列
Node node = addConditionWaiter();
// 释放锁
int savedState = fullyRelease(node);
boolean interrupted = false;
// 若线程不在同步队列中,则阻塞
while (!isOnSyncQueue(node)) {
LockSupport.park(this); // 阻塞
if (Thread.interrupted())
interrupted = true; // 中断唤醒
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt(); // 传递中断标志位
}
await()
await 和 awaitUninterruptibly 的区别在于,await 是会响应中断的。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); // 响应中断
// 将线程添加到条件队列中
Node node = addConditionWaiter();
int savedState = fullyRelease(node); // 释放锁
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // while(不在同步队列中)
LockSupport.park(this); // 挂起当前线程
// 有两种情况会导致线程被唤醒:1. 中断唤醒 2. singal 将其移入同步队列后唤醒
// 被唤醒后需要校验等待过程中是否发生中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break; // 若等待过程中发生中断,跳出 while 循环
}
// 被唤醒后,重新获取锁,此时线程已在同步队列中
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0) // 处理中断
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter 将线程放入等待队列
Adds a new waiter to wait queue.
private Node addConditionWaiter() {
Node t = lastWaiter;
// 若最后一个等待结点状态不是 CONDITION,则执行 unlinkCancelledWaiters 方法去掉所有非 Node.CONDITION 状态的变量
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter; // 获取最新的 lastWaiter
}
// 新建 Node 结点,插入等待队列
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
unlinkCancelledWaiters
Unlinks cancelled waiter nodes from condition queue. Called only while holding lock. This is called when cancellation occurred during condition wait, and upon insertion of a new waiter when lastWaiter is seen to have been cancelled. This method is needed to avoid garbage retention in the absence of signals. So even though it may require a full traversal, it comes into play only when timeouts or cancellations occur in the absence of signals. It traverses all nodes rather than stopping at a particular target to unlink all pointers to garbage nodes without requiring many re-traversals during cancellation storms.
总结一下就是线程被 transfer 到同步队列以后,可能还在条件队列中,此时需要将他们 unlink 掉。
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
checkInterruptWhileWaiting
Checks for interrupt, returning THROW_IE if interrupted before signalled, REINTERRUPT if after signalled, or 0 if not interrupted.
检查在等待过程中是否发生了中断:
- 调用 signal 方法之前被中断唤醒,则返回 THROW_IE
- 调用 sgnal 方法之后被中断唤醒,则返回 REINTERRUPT
- 未被中断,则返回 0
// 传递中断状态位
private static final int REINTERRUPT = 1;
// 发生异常状态位
private static final int THROW_IE = -1;
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ? // 检查中断标志位
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
transferAfterCancelledWait
- Transfers node, if necessary, to sync queue after a cancelled wait.
- Returns true if thread was cancelled before being signalled.
final boolean transferAfterCancelledWait(Node node) {
// 当前线程是通过中断方式唤醒,需要抛出中断异常
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node); // transfer
return true; // 抛出中断异常
}
//
// 其他线程 signal 当前线程时,被中断了。
while (!isOnSyncQueue(node))
Thread.yield();
return false; // REINTERRUPT
}
reportInterruptAfterWait
Throws InterruptedException, reinterrupts current thread, or does nothing, depending on mode.
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
唤醒等待线程
signal
Moves the longest-waiting thread, if one exists, from the wait queue for this condition to the wait queue for the owning lock.
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter; // 先进先出
if (first != null)
doSignal(first); // 唤醒等待队列中的头节点
}
doSignal
Removes and transfers nodes until hit non-cancelled one or null. Split out from signal in part to encourage compilers to inline the case of no waiters.
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
signalAll
相当于 notifyAll
public final void signalAll() {
if (!isHeldExclusively()) // 必须持有锁
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
doSignalAll
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first); // 移动到同步队列
first = next; // 更新 first 的引用
} while (first != null);
}
transferForSignal
- Transfers a node from a condition queue onto sync queue.
- Returns true if successful.
final boolean transferForSignal(Node node) {
// If cannot change waitStatus, the node has been cancelled.
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 将 node 放入同步队列,并返回前继结点 p
Node p = enq(node);
int ws = p.waitStatus;
// 若 p 的状态大于 0,那么就只有 CANCELLED 状态了,说明前继结点并不能唤醒 node,那么直接调用 unpark 唤醒 node
// 若 p 不是 CANCELLED 状态,那么尝试将其修改为 -1,后续由 p 来唤醒 node,若 CAS 失败,则直接唤醒 node
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 线程协作
LockSupport.unpark(node.thread);
return true;
}