1 AQS 简介
在同步组件的实现中, AQS 是核心部分, 同步组件的实现者通过使用 AQS 提供的模板方法实现同步组件语义。
AQS 则实现了对同步状态的管理, 以及对阻塞线程进行排队, 等待通知等一些底层的实现处理。
AQS 的核心也包括了这些方面: 同步队列, 独占式锁的获取和释放, 共享锁的获取和释放以及可中断锁, 超时等待锁获取这些特性的实现,
而这些实际上则是 AQS 提供出来的模板方法, 归纳整理如下:
在 Java 并发编程领域中, AbstractQueuedSynchronizer (AQS) 是一项功能强大且设计精巧的工具。
它为开发人员提供了一种高效的同步机制, 用于安全地控制多线程环境下的资源访问和状态管理。
其本身的设计很简单, 内部维护 1 个 int 的状态和 1 个链表
- 一个线程过来获取锁 (本质就是通过 cas 修改 int 的状态), 获取锁成功 (int 状态修改成功), 线程继续执行
- 一个线程过来获取锁, 获取锁失败, 则将线程封装为链表的一个节点, 放入链表中, 然后挂起
- 获取锁的线程执行完逻辑, 释放锁, 就唤醒链表的头节点, 重新尝试获取锁, 获取成功, 从链表移除, 执行逻辑 (这个过程可能有从外部来的线程进行竞争)
上面是 AQS 非公平锁的大体过程, AQS 本身还提供了公平锁的实现, 为了实现这些锁的逻辑,
AQS 本身还需要支持 同步队列, 独占式锁的获取和释放, 共享锁的获取和释放以及可中断锁, 超时等待锁获取等功能。
而这些功能本身复杂度高同时还是高频的逻辑, 所以 AQS 本身借助了模板方法的设计模式, 将常用的逻辑封装起来, 然后让子类去实现自己锁获取释放的逻辑。
大体的逻辑如下:
public abstract class AbstractQueuedSynchronizer {
public void lock() {
// 1. 尝试获取锁
// 由子类决定当前线程是否获取锁成功
if (tryAcquire()) {
// 获取成功, 直接返回
return;
}
// 2. 获取锁失败, 将线程封装为节点, 放入队列, 然后挂起
// 这些逻辑由 AQS 内部进行实现
addNodeToQueueAndPark();
}
// 由子类进行实现
protected abstract boolean tryAcquire();
}
而这些实际上则是 AQS 提供出来的模板方法, 归纳整理如下:
独占式锁相关的方法
// 独占式获取同步状态, 如果获取失败则插入同步队列进行等待
void acquire(int arg);
// 与 acquire 方法相同, 但在同步队列中进行等待的时候可以检测中断
void acquireInterruptibly(int arg);
// 在 acquireInterruptibly 基础上增加了超时等待功能, 在超时时间内没有获得同步状态返回 false
boolean tryAcquireNanos(int arg, long nanosTimeout);
// 释放同步状态, 该方法会唤醒在同步队列中的下一个节点
boolean release(int arg);
共享式锁相关的方法
// 共享式获取同步状态, 与独占式的区别在于同一时刻有多个线程获取同步状态
void acquireShared(int arg);
// 在 acquireShared 方法基础上增加了能响应中断的功能
void acquireSharedInterruptibly(int arg);
// 在 acquireSharedInterruptibly 基础上增加了超时等待的功能
boolean tryAcquireSharedNanos(int arg, long nanosTimeout);
// 共享式释放同步状态
boolean releaseShared(int arg);
本身了解这些模板方法的逻辑, 就能够很好的理解 AQS 的设计思想, 以及后续的同步组件的实现。
2 AQS 同步队列
AQS 内部核心的 2 个变量, 1 个 int 的状态值, 1 个同步队列。
int 的状态值本身没有多大的问题, 但是链表本身有一点设计, 所以这里对 AQS 的链表做个简单的介绍, 便于后面 AQS 的理解。
在 AQS 有一个静态内部类 Node (只列举了部分重要的属性)
static final class Node {
/******************** 属性 **************************/
// 节点状态
volatile int waitStatus;
// 当前节点的前驱节点
volatile Node prev;
// 当前节点的后驱节点
volatile Node next;
// 加入同步队列的线程引用
volatile Thread thread;
// 等待队列中的下一个节点
Node nextWaiter;
/******************** 节点模式 **************************/
// 标识节点为独占模式
static final Node SHARED = new Node();
// 标识节点为独占模式
static final Node EXCLUSIVE = null;
/******************** 节点状态 **************************/
// 节点从同步队列中取消
int CANCELLED = 1;
// 等待唤醒的状态
int SIGNAL = -1;
// 当前节点进入等待队列中
int CONDITION = -2;
// 在共享锁的释放中, 会从头节点向后逐个唤醒状态为 signal 的节点的线程, 直到遇到第一个状态为 0 的, 停下来, 会将其从 0 设置为 -3
// 表示下一次共享式同步状态获取将会无条件传播下去
int PROPAGATE = -3;
// 初始状态
int INITIAL = 0;
}
从上面的节点的属性可以知道每个节点有前驱节点 prev 和后驱节点 next, 所以可以知道同步队列的真实实现是一个双向链表。
另外 AQS 自身的属性中有两个重要的成员变量:
public abstract class AbstractQueuedSynchronizer {
// 同步队列的头节点
private transient volatile Node head;
// 同步队列的尾节点
private transient volatile Node tail;
}
结合 2 个属性, 可以得出 AQS 中维护的同步队列的结构如下:
同时, 我们也可以大概分析出节点加入同步队列的过程:
// 1. 将线程封装为节点
// 2. 将节点设置到双写链表的尾部
// 3. 修改 AQS 的 tail 指向新的节点
退出链表的逆推就行了, 这里就不再赘述了。
3 AQS 中的独占锁实现
3.1 独占锁的获取 - acquire 方法
public final void acquire(int arg) {
// 调用需要子类实现的 tryAcquire() 方法, 尝试获取锁
// 1. 获取锁成功了, 方法结束
// 2. 获取锁失败, 将当前线程封装为 Node 节点, 放到等待队列中, 等待唤醒
// 3. acquireQueued 方法返回 true 表示当前线程需要中断了, 设置线程的中断标识为 true
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 设置当前的线程的中断标识为 true
selfInterrupt();
}
3.1.1 acquire 中的入队操作 - addWaiter 方法
// 当前使用的为 OpenJdk 11 版本, 可能会有出入
// 入参的 mode 为 Node.EXCLUSIVE 或者 Node.SHARED, 表示当前节点的模式为独占模式或者共享模式
private Node addWaiter(Node mode){
// 1 将当前线程封装成一个 Node 节点, 这个节点的下一个等待的节点的模式, 既 Node.EXCLUSIVE 或 Node.SHARED
// 通过这个下一个节点的模式可以间接等待当前节点模式
Node node = new Node(Thread.currentThread(), mode);
// 死循环
for (;;){
// 取到当前链表的尾节点
Node oldTail = tail;
// 2 当前尾节点是否为 null
if (oldTail != null){
// 2.2 设置新的节点的前驱节点为当前链表的尾节点
node.setPrevRelaxed(oldTail);
// 通过 CAS 把当前节点设置为尾节点
if (compareAndSetTail(oldTail, node)){
// 旧的尾节点的下一个节点为当前的新节点
oldTail.next = node;
return node;
}
} else{
// 2.1 当前同步队列尾节点为 null, 说明当前线程是第一个加入同步队列进行等待的线程, 初始化同步队列
// 同步队列这时候不为空了, 又执行一次循环
initializeSyncQueue();
}
}
}
private final void initializeSyncQueue() {
Node h;
// 创建出一个空的 Node 节点, 通过 CAS 操作尝试将其变为头节点, 再将尾节点的指针指向新创建的节点
if (HEAD.compareAndSet(this, null, (h = new Node())))
tail = h;
}
分析可以看上面的注释。
程序的逻辑主要分为两个部分:
- 当前同步队列的尾节点为 null, 调用方法 initializeSyncQueue(), 初始出一个头部没有任何信息的链表, 然后回来, 重写回到循环, 再次尝试把当前节点放到链表的尾部
- 当前队列的尾节点不为 null, 则采用尾插入 (compareAndSetTail() 方法) 的方式入队
3.1.2 acquire 中的在等待队列唤醒 - acquireQueued 方法
获取独占式锁失败的线程会包装成 Node, 然后插入等待同步队列。
在同步队列中的节点 (线程) 会做什么事情来保证自己能够有机会获得独占式锁了?
带着这样的问题我们就来看看 acquireQueued() 方法, 从方法名就可以很清楚, 这个方法的作用就是排队获取锁的过程, 源码如下:
final boolean acquireQueued(final Node node, int arg) {
// 是否需要通知当前线程中断
boolean interrupted = false;
try {
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 2 前驱节点是头节点并且成功获取同步状态, 即可以获得独占式锁
// 在上面创建 addWaiter 方法可以知道, 同步队列为空, 会创建一个默认值的头节点 head, 再把新节点放到这个头节点前面
// 如果一个节点的前驱节点为头节点, 就可以判断出这个节点为链表中真正数据的第一个节点
if (p == head && tryAcquire(arg)) {
// 当前节点设置为 头节点
// 设置头节点 = node
// 设置 node.thread = null
// 设置 node.prev = null
// 这时候头节点的状态为 signal (-1)
setHead(node);
p.next = null;
return interrupted;
}
// 3 获取锁失败, 线程进入等待状态等待获取独占式锁
// shouldParkAfterFailedAcquire 主要是判断当前的节点里面的线程是否可以挂起,
// 返回 true 的条件: node 的前驱节点的状态为 signal (等待唤醒的状态), 前驱在等待唤醒, 那么这个节点先挂起
// parkAndCheckInterrupt 这时会挂起线程, 阻塞住, 直到被唤醒获取中断
if (shouldParkAfterFailedAcquire(p, node))
// | 或运算, 只要有一个真, 就是真
// interrupted 默认为 false, parkAndCheckInterrupt() 返回了 true, 那么 interrupted 就会为 true
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
// 上面的逻辑出现了异常了, 正常的情况就是线程的中断标识为 true, 但是挂起了, 或者挂起中, 被中断了
// 取消获取锁
cancelAcquire(node);
// 需要设置中断标识,
if (interrupted)
selfInterrupt();
throw t;
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前驱节点的状态为 signal
// signal 表示等待唤醒的状态, 安全的, 当前线程可以挂起
if (ws == Node.SIGNAL)
return true;
// > 0, 状态为取消状态
if (ws > 0) {
// 从当前节点一直往前找到第一个状态不为 CANCELLED (1) 的节点,
// 也就是找到链表中前面中最接近当前节点, 同时状态不为 CANCELLED (1), 将当前节点放到这个节点的后面, 中间的节点舍弃掉
// 效果: 从当前节点到第一个不为 CANCELLED 状态的节点之间所有的 CANCELLED 状态的节点都被删除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将前驱节点设置为 SIGNAL 状态, 表示节点里面的线程等待唤醒
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
// 返回 false, 表示当前的线程还不能挂起, 再走一遍循环
return false;
}
private final boolean parkAndCheckInterrupt() {
// 使当前线程挂起, 直到被唤醒
LockSupport.park(this);
// 返回当前线程的中断标识
return Thread.interrupted();
}
到这里就应该清楚了, acquireQueued() 在自旋过程中主要完成了两件事情:
1 如果当前节点的前驱节点是头节点, 并且再次尝试, 能够获取到同步状态的话 (即获取到锁), 直接返回, 让线程能哥继续执行, 否则进入下一步
2 获取锁失败的话, 会根据前驱节点的状态进行处理 (如下)2.1 前驱节点的状态为 CANCELLED, 从当前节点一直往前找到第一个不是取消状态的节点, 将当前节点放到其后面, 重新执行 acquireQueued 方法的逻辑
2.2 前驱节点不是 SIGNAL 和 CANCELLED, 将前驱节点设置为 SIGNAL 状态, 重新执行 acquireQueued 方法的逻辑
2.3 前驱节点为 SIGNAL 状态, 把当前线程挂起来。等待被唤醒
到这里可以看出独占锁的特点
- 线程进来, 就直接尝试获取同步状态, 获取成功, 直接返回
- 获取失败, 就将线程封装为节点, 放入等待链表, 然后挂起
3.1.3 acquire 中等待队列唤醒异常 - cancelAcquire 方法
在上面的 acquireQueued 方法中, 线程的中断标识为 true, 尝试挂起会失败, 这时候会让这个线程取消获取锁的逻辑
private void cancelAcquire(Node node) {
// 节点为 null, 直接结束
if (node == null)
return;
// 设置节点的线程为 null
node.thread = null;
Node pred = node.prev;
// 从当前的节点往前找到第一个状态为取消状态 (1) 的节点, 也就是当前链表中最后一个状态为取消状态的节点
while (pred.waitStatus > 0)
// 设置当前节点的前缀节点为这个取消状态节点的前驱节点
node.prev = pred = pred.prev;
// 这里的 predNext 就是当前链表中最后一个状态为取消状态的节点, 为下面的 cas 使用
Node predNext = pred.next;
// 当前节点的状态设置为取消状态(1)
node.waitStatus = Node.CANCELLED;
// 当前节点就是为节点, 通过 cas 将当前链表的尾节点从当前节点设置为找到的节点
if (node == tail && compareAndSetTail(node, pred)) {
// 设置找到的节点的下一个节点从 predNext 设置为 null
pred.compareAndSetNext(predNext, null);
} else {
int ws;
// 找到的节点不是头节点, 同时节点的线程不为空
// 加上 节点的状态为 signal 或者 不是取消状态下, 能设置为 signal 状态
// 后面的判断最少为了确保找到的节点为 signal 状态
if (pred != head && pred.thread != null && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL)))) {
// 当前节点的下一个节点
Node next = node.next;
// 下一个节点不为空, 同时状态不是取消状态, 将找到的节点的下一个节点设置为当前节点的下一个节点
if (next != null && next.waitStatus <= 0)
pred.compareAndSetNext(predNext, next);
} else {
// 找到的节点为头节点
// 找到的节点的线程为空
// 找到的节点的状态为取消状态
// 都会执行到这个方法, 唤醒这个节点后面的第一个状态小于等于 0 的线程
unparkSuccessor(node);
}
// 协助 gc
node.next = node;
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 当前的节点状态为不是初始状态或者取消状态, 设置为默认值 0, 初始状态
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
// 下一个节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前找到, 找到第一个状态不为取消的节点和初始状态的节点
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
// 找到了进行唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
取消获取锁的过程看起来很绕, 实际整理起来很简单
- 清除当前节点和它前面的到第一个非取消状态的节点之间所有取消状态的节点
- 如果找到的节点为头节点 (注意了头节点为没有任何信息的节点), 尝试从当前节点往后找到第一个不为取消状态的节点, 唤醒它
3.2 独占锁的释放 - release 方法
独占锁的释放就相对来说比较容易理解了, 废话不多说先来看下源码:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 头节点存在, 同时状态不为 0 (初始状态)
// 判断 != 0 的作用下面分析
if (h != null && h.waitStatus != 0)
// 唤醒头节点的下一个节点
unparkSuccessor(h);
return true;
}
return false;
}
首先获取头节点的后驱节点, 后驱节点存在并且状态不为取消状态, 唤醒这个线程。
如果不存在后驱节点或者后驱节点为取消状态, 会尝试从尾节点往前找到第一个状态不为取消状态和初始状态的节点, 同时这个节点不是当前的节点, 找到了会唤醒这个节点对应的线程。
- 假设现在有一个锁, 线程 A 通过 acquire 获取到了锁, 经过上面的上面的代码, 可以知道, 这时没有同步队列还没创建
- 线程 B 这时候通过 acquire 尝试获取锁失败了, 会创建出一个链表, 把自己封装为节点 B 放到链表的后面
- acquireQueued 方法中的死循环会一直判断到当前的节点的前驱节点为头节点, 会不断重试获取锁, 而不会挂起
- 这时候线程 A 要释放锁了, 不需要唤醒头节点的下一个节点, 在第三步中会自己唤醒
- 在线程 A 释放锁之前, 又要线程 C 尝试获取锁, 失败了, 拼接到节点 B 的后面, 节点 C, 这时候会被挂起
- 第三步中, 线程 B 获取锁成立, 会将 B 节点设置为头节点, 清空里面的前驱节点, 线程信息等, 保留下了状态 signal (-1)
- 后面线程 B 释放锁, 状态不为 0 了, 就能进入唤醒 C 的过程
- C 唤醒后, 重新执行 acquireQueued 的方法, 这是 C 的前置节点为原本的节点 B, 将自己的节点 C 设置为头节点, 这时候的链表只有一个原本节点 C 的节点了
所以最终的独占锁的处理如下:
- 线程获取锁失败, 线程被封装成 Node 进行入队操作, 核心方法在于 addWaiter(), 同时 addWaiter() 会在队列为 null 的时候进行初始化。同时通过不断的 CAS 操作将节点存到当前队列的尾部
- 线程获取锁是一个自旋的过程, 当且仅当当前节点的前驱节点是头节点并且成功获得同步状态时, 节点出队即该节点引用的线程获得锁, 否则, 当不满足条件时就会调用 LookSupport.park() 方法使得线程阻塞
- 释放锁的时候会唤醒后继节点
总体来说:
在获取同步状态时, AQS 维护一个同步队列, 获取同步状态失败的线程会加入到链表中进行挂起, 从链表移除 (或唤醒) 的条件是前驱节点是头节点并且成功获得了同步状态。在释放同步状态时, 同步器会调用 unparkSuccessor() 方法唤醒后驱节点
3.3 可中断式独占锁的获取 - acquireInterruptibly 方法
我们知道 lock 相较于 synchronized 有一些更方便的特性, 比如能响应中断以及超时等待等特性, 现在我们依旧采用通过学习源码的方式来看看能够响应中断是怎么实现的。
可响应中断式锁可调用方法 lock.lockInterruptibly()。
而该方法其底层会调用 AQS 的 acquireInterruptibly 方法, 源码为:
public final void acquireInterruptibly(int arg) throws InterruptedException {
// 线程的中断标识为 true, 直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取锁失败
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg) throws InterruptedException {
// 将节点存入到 同步等待链表
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// help GC
setHead(node);
p.next = null;
return;
}
// shouldParkAfterFailedAcquire 判断当前线程是否可以挂起
// parkAndCheckInterrupt 挂起当前线程, 唤醒后, 判断线程的中断标识是否为 true, 这里为 true, 就会直接抛出异常, 结束死循环, 进入 catch 里面的逻辑
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
// 取消获取锁
cancelAcquire(node);
throw t;
}
}
与 acquire 方法逻辑几乎一致, 唯一的区别是当 parkAndCheckInterrupt 返回 true, 即线程阻塞时该线程被中断, 代码抛出被中断异常。
3.4 带超时等待时间的独占锁的获取 - tryAcquireNanos 方法
通过调用 lock.tryLock(timeout,TimeUnit) 方式达到超时等待获取锁的效果, 该方法会在三种情况下才会返回:
- 在超时时间内, 当前线程成功获取了锁
- 当前线程在超时时间内被中断
- 超时时间结束, 仍未获得锁返回 false
该方法会调用 AQS 的方法 tryAcquireNanos(), 源码为
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
// 线程的中断标识为 true
if (Thread.interrupted())
throw new InterruptedException();
// 先尝试获取锁, 获取锁成功, 直接返回
// 获取锁失败, 调用实现超时等待的方法
return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
// 等待的时间小于 0, 直接返回
if (nanosTimeout <= 0L)
return false;
// 得到最终结束等待的时间点
final long deadline = System.nanoTime() + nanosTimeout;
// 把当前节点加入到等待链表
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
// 前驱节点为头结点, 同时获取锁成功, 将当前节点置为头结点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
return true;
}
// 1 计算超时时间
nanosTimeout = deadline - System.nanoTime();
// 2 判断是否到了结束的时间点
if (nanosTimeout <= 0L) {
// 将当前节点从队列里面删除
cancelAcquire(node);
return false;
}
// 3
// 判断可以挂起线程, 同时设置的超时时间 > SPIN_FOR_TIMEOUT_THRESHOLD = 1000L, 即超时时间大于 1 秒
// 带超时时间的挂起线程
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
// 线程的中断标识为 true
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
// 取消获取锁
cancelAcquire(node);
throw t;
}
}
程序逻辑同独占锁可响应中断式获取基本一致, 唯一的不同在于获取锁失败后, 对超时时间的处理上。
先计算出按照现在时间和超时时间计算出理论上的截止时间 deadline, 然后 deadline - System.nanoTime() 计算出来就是一个负数, 自然而然会在第 2 步中的 if 判断之间返回 false。
如果还没有超时即第 2 步中的 if 判断为 true 时就会继续执行第 3 步。
4 AQS 中的共享锁实现
4.1 共享锁的获取 - acquireShared 方法
public final void acquireShared(int arg) {
// 调用子类重写的获取共享锁方法
// 返回了大于 0 的值, 表示获取锁
// 共享锁的 tryAcquireShared 的返回值, 代表了锁当前有多少个持有者
// 0 表示无锁状态, 返回 1 表示有 1 个持有者, 返回 2 表示锁已经有 2 个持有者
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
// 把节点加入等待链表中
final Node node = addWaiter(Node.SHARED);
boolean interrupted = false;
try {
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 前驱节点为头节点
if (p == head) {
// 获取锁
int r = tryAcquireShared(arg);
// 获取锁成功
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
return;
}
}
// 判断是否可以挂起线程
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
} finally {
if (interrupted)
selfInterrupt();
}
}
共享锁的获取逻辑和独占式锁的获取差不多, 这里的线程退出死循环的条件: 当前节点的前驱节点是头节点并且 tryAcquireShared(arg) 返回值大于等于 0 即能成功获得同步状态。
和独占锁的获取不同的点在于
- 独占锁的获取成功, 只会把自己的节点移除
- 共享锁的获取成功, 则复杂了很多, 除了唤醒自己, 还需要把其他共享的节点也唤醒
4.1.1 acquireShard 中在等待代理中唤醒后的行为 - setHeadAndPropagate 方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 将当前节点设置为头节点, 清空线程信息
setHead(node);
// 持有共享锁的线程数大于 0
// 头节点为 null
// 头节点的状态为不是取消状态
// 新的头节点为 null
// 新的头节点的状态不是取消状态
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 下一个节点为 null 或者为共享节点
if (s == null || s.isShared())
// 尝试是否共享锁
doReleaseShared();
}
}
private void doReleaseShared() {
// 从当前的头节点开始, 向后处理, 把后面所有的状态为 signal 的节点唤醒,
// 直到遇到第一个节点状态不为 SIGNAL 的, 停止, 同时把这个节点的状态设置为 PROPAGATE
for (;;) {
// 获取头节点
Node h = head;
// 头节点不为 null 同时 头节点不等于尾节点
if (h != null && h != tail) {
// 获取头节点的状态
int ws = h.waitStatus;
// 头节点的状态等于 signal
if (ws == Node.SIGNAL) {
// 通过 cas 将头节点从 signal 设置为 0
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
// 设置失败了, 重新开始循环
continue;
// 获取后驱节点
unparkSuccessor(h);
// 状态为 0, 则通过 cas 将其从 0 设置为 -3, 设置失败了, 则继续回到头部,
} else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
大体的逻辑如下:
- 把当前的节点设置为头节点
- 如果头节点的下一个节点为共享节点, 向后处理, 把后面所有的状态为 signal 的节点唤醒, 直到遇到第一个节点状态为 0 的, 停止, 同时把这个节点的状态设置为 PROPAGATE
4.2 共享锁的释放 - releaseShared 方法
public final boolean releaseShared(int arg) {
// 尝试释放锁
if (tryReleaseShared(arg)) {
// 从当前的头节点开始, 向后处理, 把后面所有的状态为 signal 的节点唤醒, 直到遇到第一个节点状态为 0 的, 停止, 同时把这个节点的状态设置为 PROPAGATE
doReleaseShared();
return true;
}
return false;
}
4.3 共享锁的其他方法
- 可中断式的共享锁获取 acquireSharedInterruptibly
- 带超时等待时间的共享锁获取 tryAcquireSharedNanos
其实现和独占式锁可中断获取锁以及超时等待的实现几乎一致, 具体的就不再说了
5 参考
深入理解AbstractQueuedSynchronizer(AQS)