AbstractQueuedSynchronizer 源码解析
文章目录
- AbstractQueuedSynchronizer 源码解析
- 一、CAS
- 二、字段分析
- 三、内部类 Node
- 1、CLH 队列
- 2、源码分析
- 四、内部类 ConditionObject
- 1、字段分析
- 2、方法分析
- 1、await
- 2、signal
- 五、方法分析
- 1、独占式下的 AQS
- 1、acquire 独占式获取资源
- 2、release 独占式释放资源
- 2、共享式下的 AQS
- 1、acquireShared 共享式获取资源
- 2、releaseShared 共享式释放资源
- 3、cancelAcquire 方法
- 4、setHeadAndPropagate 方法
- AbstractQueuedSynchronizer :
抽象同步队列器框架。
- 前面的文章介绍完了集合框架(JCF) 的一些常用类的源码。后面准备总结 JUC 的一些常用类。而 JUC 的基石就是大名鼎鼎的 AQS了。所以这篇文章先详细介绍下 AQS,以便后面介绍 JUC 其他类。
一、CAS
-
在 AQS 里大量用到了 CAS 方式的方法,所以先总结下 CAS。
-
CAS 是一种乐观锁机制,在保证线程安全性的前提下,通过不加锁去完成某项操作,冲突失败了可以重试。
-
CAS的原理:CAS算法有三个操作数,
通过内存中的值(V)、预期原始值(A)、修改后的新值 (B)。
- 如果内存中的值和预期原始值相等, 就将修改后的新值保存到内存中。
- 如果内存中的值和预期原始值不相等,说明共享数据已经被修改,放弃已经所做的操作,然后重新执行刚才的操作,直到重试成功。
- JVM中CAS是通过UnSafe类来调用操作系统底层的CAS指令实现。
- java.util.concurrent.atomic包下的原子类都使用了CAS算法。而java.util.concurrent中的大多数类的实现都直接或间接的使用了这些原子类。
-
CAS 缺点:
ABA问题:
- 如果一个线程t1正修改共享变量的值A,但还没修改,此时另一个线程t2获取到CPU时间片,将共享变量的值A修改为B,然后又修改为A,此时线程t1检查发现共享变量的值没有发生变化,但是实际上却变化了。
- 解决办法:
使用版本号
,在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A-B-A 就会变成1A-2B-3A。从Java1.5开始JUC包里提供了一个类AtomicStampedReference来解决ABA问题。
- 循环时间长开销会比较大:
自旋重试时间,会给CPU带来非常大的执行开销。
- 只能保证一个共享变量的原子操作,不能保证同时对多个变量的原子性操作。
- 解决办法:从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。
-
CAS使用注意事项:
- CAS需要和volatile配合使用:CAS只能保证变量的原子性,不能保证变量的内存可见性。CAS获取共享变量的值时,需要
和volatile配合使用,来保证共享变量的可见性。
- CAS适用于并发量不高、多核CPU的情况:CPU多核情况下可以同时执行,如果不合适就失败。而
并发量过高,会导致自旋重试耗费大量的CPU资源。
- CAS需要和volatile配合使用:CAS只能保证变量的原子性,不能保证变量的内存可见性。CAS获取共享变量的值时,需要
二、字段分析
-
private volatile int state:
- 用来管理同步状态的变量。使用了 volatile 修饰,保证了可见性,高并发场景下,当 state 被修改时,别的线程也可看到被修改后 state 的值。线程同步的关键是对 state 的操作,可以说获取、释放资源是否成功都是有 state 决定的,
- 具体的含义,其实在不同的子类有不同的意义。
ReetrantLock:
state = 0:
当前共享资源没有被加锁。state = 1:
当前共享资源已被加锁。state > 1:
共享资源被同一个线程多次加锁。
ReentrantReadWriteLock:
- 高 16 位代表读锁状态。
- 低 16 位代表写锁状态。
Semaphore:
- 表示可用的信号的个数。
CountDownLatch:
- 表示计数器的值。
- AQS 提供了三个操作该变量的方法,并且都是使用 final 修饰,子类无法重写。
protected final int getState():
获取 state 值。protected final void setState(int newState):
修改 state 值。protected final boolean compareAndSetState(int expect, int update):
使用 CAS 的方式修改 state 的值为 update。
-
private transient Thread exclusiveOwnerThread:
该自来继承自父类 AbstractOwnableSynchronizer,当前获得锁的线程。 -
private transient volatile Node head:
指向 由双向链表构成的同步队列的 头结点。 -
private transient volatile Node tail:
指向 由双向链表构成的同步对的 尾结点。
三、内部类 Node
-
在AQS中, 使用 Node 节点构成两种队列。
同步队列(CLH):Node 节点构成的双向链表。
等待队列(又称条件队列):Node 节点构成的单向链表。
-
什么叫同步队列与等待队列:
同步队列:排队取锁的线程所在的队列
等待队列:调用 await 方法后,线程会从同步队列转移到等待队列。
1、CLH 队列
CLH:(Craig,Landin and Hagersten)是三个人,共同发明了一个可扩展、高性能、公平且基于自旋锁的链表;链表中的每个线程只在本地自旋前一个节点的状态,即该节点(线程)不断自旋获取前一个节点的状态;每个节点都有一个状态(要么自旋,要么释放锁)。
-
在AQS中,用到的数据结构是 CLH 的变体:
+-------+ prev +-------+ +-------+ head | A | <----- | B | <---- | C | tail +-------+ +-------+ +-------+
-
上图是AQS中
CLH的变体结构
,该结构是:- 一个 FIFO(first-in-first-out)队列;
- 新的等待获取锁的线程先加入队尾(tail);
- 如果队列是空,则第一个新加入的节点立即获得锁;
- 新加入的线程本地自旋前一个节点的状态(如 C 不断自旋获取 B 的状态);
(当A释放锁时,B成为第一个节点),头节点并不能保证能够获得锁,只是有优先权,如果获取失败,则重新变为等待状态;
而 AQS 中的同步队列就是 CLH 队列的变体,CLH 队列也是公平队列,所以AQS 的同步队列也是公平队列。那么既然是公平队列,上面说的头结点不能保证能获取到锁,不就冲突了吗,公平的话按理说你作为头结点一定可以获取到锁的。这是为什么呢?这就与AQS的实现方式有关了。
- 如果是公平模式:新来竞争锁的线程直接加入到同步对队列队尾。
- 如果是非公平模式:新来竞争锁的线程先尝试竞争一次
,如果竞争成功,则直接成为 first 节点,插入在 head 与 first 节点之间,而原 first 节点排在新的 first 节点之后进入等待状态并自旋。
说说AQS 中的同步队列的入队和出队的一些情况(注意:这些情况是理想状态下,即没有高并发的场景,目的是先了解下过程,在后续的源码分析的时候会详细介绍各种情况
)。
-
Node 节点就是用来包装一个个的线程的。一个线程未获取到锁,就会被分装成 Node 节点被添加到同步队列中。
-
这些因为没有获取到锁而阻塞的这些线程,会被分装成一个一个的节点,这些节点会连接成一个双向链表,该双向链表就是同步队列。
-
线程获取锁入队的流程讨论如下:后面也会根据源码一步一步分析,先了解下过程。
-
第一个线程 thread-0 来获取锁,这时的 state = 0,将 state 改为 1, thread-1 会获取到锁。并且一直持有锁。
-
第二个线程 thread-1 使用 CAS 方式 compareAndSet(0,1) 来修改 state 的值, 由于线程 thread-0 还未释放锁,所以 thread-1 会获取失败,AQS 就会将 现成 thread-1 分装成的节点添加到 同步队列中。注:第一次添加节点到同步队列中,会创建两个节点,一个空节点和thread-1 分装成的节点。至于原因后面会解释
-
第三个线程 thread- 2 也来了,重复第二个线程的步骤,也会加入到同步队列。
-
以此类推后面的线程都是如此。
-
-
上面讨论的入队的情况,现在在讨论下出队的情况。
-
第一个线程 thread-0 将 state=1 改为0,然后释放掉锁。
-
第二个线程 thread-0 释放锁之后,第二个现成 thread-1 被唤醒,被唤醒并不意味值就获取到了锁,首先通过 CAS 方式的将 state 的值从 0 改为1,然后获取到了锁,继续运行,运行完之后释放锁继续唤醒第三个线程 thread-2。以此类推。
-
-
同步队列也叫 CLH 队列,内部维护的 FIFO(先进先出)双端双向队列,当一个现成竞争资源失败,就会将等待资源的线程封装成一个Node 节点,通过 CAS 原子操作插入队列尾部,最终不同的 Node 节点连接组成一个 CLH 队列,所以 AQS 通过 CLH 队列来管理竞争资源的线程。
-
CLH 队列的优点:
- 先进先出保证公平性。
- 是非阻塞的队列,通过自旋锁和 CAS 保证节点插入和移除的原子性,实现无锁插入。
- 采用了自旋锁的思想,所以 CLH 也是一种基于链表的可扩展、高性能、公平的自旋锁。
-
CLH 队列的缺点:
- 在自旋过程中,线程会一直占用CPU,导致CPU资源的浪费,尤其是当等待时间较长时,自旋会增加系统负载。即 CPU 空转。
- 自旋期间线程无法进行其他有意义的工作。
2、源码分析
volatile int waitStatus
:四个状态:其实还有在 Node 初始化时 状态 为 0。 新节点入队时的默认状态。- static final int
CANCELLED = 1
:当前等待的线程,因为什么原因取消了。 - static final int
SIGNAL = -1
:代表下一个节点(下一个线程)需要被唤醒。后驱节点在等待当前节点唤醒,后驱节点入队时,会将前驱节点的状态更新为 signal。 - static final int
CONDITION = -2
:表示节点在条件队列上,当其他线程调用了 Condition 的 sign()方法后,CONDITION 状态的节点将从等待队列转移到等待队列中,等待获取资源。 - static final int
PROPAGATE = -3
:共享锁下的节点状态,前驱节点不仅会唤醒后驱节点,同时也可能会唤醒后驱的后驱节点。
- static final int
- 这两个字段代表了
AQS 的两种模式
,共享锁
(比如 ReentrantReadWriteLock) 和独占锁
(比如 ReentrantLock)。- static final Node SHARED = new Node():共享锁。`
static final Node EXCLUSIVE = null:独占锁。
volatile Thread thread
:包装到 Node 节点的线程。Node nextWaiter
:有两个作用- 在同步队列中,当前的节点想要获取的是排他锁(独占锁),还是共享锁。
- 在条件队列中:指向下一个节点。所以
AQS 的条件队列,时使用 nextWaiter 来构成单向链表,不是 next。
static final class Node {
//共享模式
static final Node SHARED = new Node();
//独占模式(排他模式)
static final Node EXCLUSIVE = null;
//线程取消后的waitStatus值
static final int CANCELLED = 1;
//后驱节点入队时,会将前面的节点更新为 signal,表示新入队的节点处于等待状态,且需要被前驱结点唤醒。前驱节点被取消或中断或运行完,
//会唤醒后面节点。
static final int SIGNAL = -1;
//处于等待队列的节点 waitStatus 值,同步队列不会出现该值。
//表示当前节点在等待队列中,当其他线程执行 condition.signal,
//等待队列的节点会转移到同步队列,等到获取资源
static final int CONDITION = -2;
//当处于共享模式下,waitStatus 才会赋值该值。
//前驱节点不仅会唤醒后驱节点,同时也可能会唤醒后驱的后驱节点
static final int PROPAGATE = -3;
//现成分装的 Node 节点的状态,为上面四个值 外加新建的默认状态0
volatile int waitStatus;
//作为同步队列 的前驱节点
volatile Node prev;
//作为同步队列 的后驱节点。注意:条件队列不使用该字段构成单向链表,而是使用 nextWaiter
volatile Node next;
//封装到 Node节点的引用
volatile Thread thread;
//上面也解释过,Node 节点既可以作为同步节点使用,也可以作为等待队列使用
//1:当Node作为同步队列使用时:nextWaiter 有两个值:
// EXCLUSIVE:代表共享模式
// SHARED : 代表排他模式
//2:当Node作为条件队列使用时:保存的是后继节点。所以 AQS 的条件队列,时使用 waitStatus 来构成单向链表,不是 next。
Node nextWaiter;
//同步队列中,判断是否为共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
//获取前驱节点,没有则抛空指针
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
//无参构造,注意 waitStatus 的值为0
Node() {
}
//如果是同步队列,则可指定共性模式
//如果是条件队列,则可指定后驱节点
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
//指定节点状态值
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
四、内部类 ConditionObject
-
我们知道 Synchronize 来实现锁,并且配合 Object 的 wait 和 notify、notifyAll 方法来实现获取到锁的线程的等待 与 通知。
-
同样的我们也可以使用 Lock 来实现锁,使用 Condition 提供的 wait 和 signal、signalAll 方法来实现获取到锁的线程的等待 与通知。
-
不同与 Synchronized ,一个 AQS 可以对应多个条件变量,而 Synchronized 只有一个。
-
Object 与 ConditionObject 的对比:
对比项 Object Monitor Methods Condition 前置条件 获取对象锁 先调用 Lock.lock()获取锁,再调用 Lock.newCondition()获取 Condition 对象 调用方式 直接调用,如 object.await() 直接调用,如 condition.await() 等待队列个数 一个 多个 当前现成释放锁并进入等待状态 awit() await() 当前线程释放锁并进入等待状态,在等待状态中不响应中断 不支持 awaitUninterruptible(),对中断不敏感 当前线程释放锁并进入超时等待状态 支持 支持 当前线程释放锁并计入等待状态到将来的某个时间 不支持 awaitUntil(Date deadline) :到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false。 唤醒等待队列中的一个线程 支持 支持 唤醒等待队列中的全部线程 支持 支持 -
每一个 ConditionObject 对象都包含一个条件队列,同样是一个先进先出的队列(FIFO队列,有单向链表构成)。队列中每个节点都封装着一个线程。如果线程调用了 condition.await()方法,那么线程就会释放锁,同时封装成 Node 节点加入到条件队列进入等待状态。所以AQS 可以有多个条件队列和一个等待队列
AQS、同步队列、条件队列三者关系图如下:
1、字段分析
-
private transient Node firstWaiter:
指向同步队列的头结点。 -
private transient Node lastWaiter:
指向同步队列的尾结点。private transient Node firstWaiter; private transient Node lastWaiter;
2、方法分析
1、await
- 调用了 await 方法的线程,该线程所分装的节点会从 条件队列 移动到 同步队列。
public final void await() throws InterruptedException {
//判断是否中断过,如果中断过在调用 await 抛出异常。
if (Thread.interrupted())
throw new InterruptedException();
//将新节点添加到同步队列中,并返回该节点
Node node = addConditionWaiter();
//释放当前线程占用的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//判断node 是否在同步队列中
//true:在同步队列中
//false:不在同步队列中
//为何要判断是否在同步队列中等?
//因为 fullyRelease 方法释放当前线程占用的资源,并且唤醒后继节点,如果后继节点被唤醒且获取到资源了,那么会设置新的头节点,
//当前节点就会从同步队列中移除,
//但如果后继节点虽被唤醒了但是获取资源失败了,则继续沉睡,那么也就不会更新head,当前节点也就不会从同步队列中移除,这种情况通过
//当前节点node是否还在同步队列中来判断
//综上,只有当前节点释放掉了资源且同步队列的第二个节点(head的next节点)成功被唤醒并获取到资源,才会进入循环体。
while (!isOnSyncQueue(node)) {
//阻塞当前线程
LockSupport.park(this);
//执行到这里两种可能
//1.调用了signal
//2.调用了 interrupt 方法
//正因为有着两种可能,所以 checkInterruptWhileWaiting 检查是通过何种方法被换新的
//interruptMode = 0:没有中断过,正常唤醒
//interruptMode = 1:在调用了 signal之后调用了中断方法
//interruptMode = -1:在调用了 signal 之前调用了中断方法
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//acquireQueued(node, savedState) : 当前节点被唤醒,但还未获取到资源,所以执行acquireQueued进行判断,
//前驱是头结点,且tryAcquire成功获取到资源返回true
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
//是调用了signal之后调用了中断方法
interruptMode = REINTERRUPT;
//如果条件队列中,当前节点(第一个节点)还有下一个节点,则断开当前执行下一个节点
if (node.nextWaiter != null) // clean up if cancelled
//断开等待队列的第一个节点
unlinkCancelledWaiters();
//不等于0,说明调用了中断方法
if (interruptMode != 0)
//interruptMode = 1:在调用了 signal之后调用了中断方法,抛出 InterruptedException 异常
//interruptMode = -1:在调用了 signal 之前调用了中断方法,调用selfInterrupt打上中断标志
reportInterruptAfterWait(interruptMode);
}
//将当前线程分装成Node节点,添加到条件队列中
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
//正常情况下同步的节点状态都是 CONDITION,不是说明发生了异常,该if就是判断最后一个节点是否发生了异常
if (t != null && t.waitStatus != Node.CONDITION) {
//将最后一个发生了异常的节点从同步队列中踢出掉
unlinkCancelledWaiters();
t = lastWaiter;
}
//将当前线程分装成节点,加入到等待队列的队尾
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
//释放资源,并且尝试唤醒同步队列的第二个节点(head的下一个节点)
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
//发生异常了,设置为取消状态,配合上面 对 if (t != null && t.waitStatus != Node.CONDITION) 的判断可知,每次新的节点
//添加到条件队列队尾之前,会先检查队尾的节点是否发生异常,异常则剔除
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
//判断node节点是否还在同步队列中
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
//遍历的方法来判断node是否还在同步队列中
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
//判断线程发生过中断
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
//判断发生了何种中断,是signal之前调用的中断,还是signal之后调用的中断
//判断的一句是什么呢?
//singal 方法,会将条件队列的第一个节点添加到同步队列中,所以可以判断是否在同步队列中
//如果添加到同步队列中,节点的 waitStatus 一定不等于 CONDITION,所以通过CAS 的方式设置
//节点的 waitStatus 为0,成功则没有添加到同步队列,返回true,说明是signal之前发生中断的
//否则返回false就是在signal 之后发生中断的
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
//既然CAS不成功,说明中断发生在signal之后,但我还是得 while (!isOnSyncQueue(node))再判断一下,前面已经说了,signal发生时,
//不是立刻就到达同步队列的,如果同步队列没有,Thread.yield()这里可以理解为先让一下步,缓一缓,等节点到了同步队列,我再返回。
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
2、signal
- 将条件队列的第一个节点,加入到同步队列的队尾,并返回前驱节点,如果前驱节点waitStatus = SIGNAL,则唤醒当前节点,唤醒后会继续执行 await 循环体代码。
public final void signal() {
//判断当前线程是否独占资源,即只有线程拿到了锁,才可以调用 signal
//否则报错
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//获取条件队列的第一个节点
Node first = firstWaiter;
//如果不为空,则唤醒它
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
//将条件队列的第一个节点移除,并获取下一个节点,如果获取到的下一个节点 == null ,说明 条件队列空了
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//transferForSignal:如果first 节点被取消了,返回false,否则返回true。
//该判断作用是,如果first 节点取消了,那么继续尝试唤醒条件队列的下一个节点,直到有一个是非取消节点
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
//false:node是取消状态的节点
//true:node不是取消状态的节点
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//说明不等于 CONDITION,在条件队列中,不等于 CONDITION 那一定是取消状态
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//将node节点加入到同步队列的队尾,并返回前驱节点
Node p = enq(node);
//获取前驱节点的waitStatus
int ws = p.waitStatus;
//如果p节点的waitStatus为CANCELLED(ws>0) 或 使用CAS将p节点的waitStatus修改成SIGNAL失败,
//则代表p节点无法来唤醒node节点,因此直接调用LockSupport.unpark方法唤醒node节点。
//否则需要前驱结点去唤醒
//被唤醒后的线程,将从await()方法的while循环中退出(isOnSyncQueue()方法返回true,节点已经在同步队列中),
//进而调用AQS的acquireQueued()方法接入到获取同步状态的竞争中。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
五、方法分析
AQS 采用了模板方法设计模式,提供了两类模板,一类是独占式模板,另一类是共享形模式,对应的模板函数如下
独占式
:- acquire获取资源
- release释放资源
共享式
:- acquireShared获取资源
- releaseShared释放资源
接下来我们根据不同模式下的AQS 进行方法分析。但无论是哪种模式,在进行状态 state 操作都是一样的。
-
getState()
:返回同步状态。 -
setState(int newState)
:设置同步状态。 -
compareAndSetState(int expect, int update)
:使用C A S设置同步状态。 -
isHeldExclusively()
:该线程是否正在独占资源。只有用到Condition才需要去实现它;//获取 state 值,不可被重写 protected final int getState() { return state; } //设置 state 值, 不可被重写 protected final void setState(int newState) { state = newState; } //使用 CAS 的方式设置 state 的值为 update protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } //判断当前线程是否正在独占资源。只有用到Condition才需要去实现它;由子类实现 protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
1、独占式下的 AQS
- arg:表示 state 状态。
tryAcquire(int arg)
:独占式获取资源,子类实现。acquire(int arg)
:独占式获取资源模板。tryRelease(int arg)
:独占式释放资源,子类实现。release(int arg)
:独占式释放资源模板。
1、acquire 独占式获取资源
- 整个过程忽略中断影响。
下面是 acquire方法 的流程图,后面会分析涉及到的每一个方法。
public final void acquire(int arg) {
//1.tryAcquire:尝试获取资源,获取失败继续执行后面
//2.addWaiter:将当前线程分装成 Node 节点,且为独占模式,加入到同步队列队尾。
//3.acquireQueued:自旋获取资源
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//如何独占式获取资源由子类实现,实现的区别在于公平锁与非公平锁
//公平锁:不会尝试获取资源
//非公平锁:会尝试获取资源
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//将 node 节点添加到同步队列的队尾
private Node addWaiter(Node mode) {
//将当前线程分装成 Node 节点,且为独占模式。接下来尝试将该节点加入到同步队列的队尾
Node node = new Node(Thread.currentThread(), mode);
//获取同步队列尾部节点
Node pred = tail;
//判断尾部节点是否为null
if (pred != null) {
//如果尾部节点不为空,即同步队列不为空
//将新节点的的 prev 连接上尾部节点 => prev <- node
node.prev = pred;
//使用CAS 的方式更新 记录尾部节点的变量 tail 为 新节点node
if (compareAndSetTail(pred, node)) {
//如果tail 更新成功了,则将 旧的尾部节点prev 的 next 指向 新节点node => prev -> <- node
pred.next = node;
//添加到同步队列成功了,直接返回新节点node即可。
return node;
}
}
//导致列说明有两种可能,且新节点node还未添加到同步队列中
//1:同步队列为空,当前节点是第一个添加到同步队列的节点
//2:同步队列不为空,在更新 tail 时,由于并发的存在,更新失败,此时的关系为 prev <- node,
//但其实这个可以忽略,enq 还是设置一遍,因为还有情况1呢
//执行该方法将 node 添加到 同步队列中
enq(node);
//添加完成后返回新节点node
return node;
}
//使用死循环将node节点加入到同步队列中
//就像上上面说的有两种情况
private Node enq(final Node node) {
//死循环
for (;;) {
//获取尾部节点给t
Node t = tail;
//如果t为空,说明同步队列为空,是第一次添加于元素
if (t == null) { // Must initialize
//使用 cas 的方式设置头结点,如果失败,说明并发情况下,有别的节点设置为头结点了
//没关系,会继续for循环,走else 逻辑
//注意,并不是将 新节点设置为头节点,而是一个新建的空节点(哨兵节点)设置为头结点,
//设置完后继续for循环 走else 逻辑
if (compareAndSetHead(new Node()))
tail = head;
} else {
//到这里说明同步队列中有其他节点(无论是不是只有一个空节点)
//将 新节点的 prev指向尾结点 =》 t <- node
node.prev = t;
//是用 cas 的方式将 尾结点tail 设置为 新节点node
//如果失败,说明并发场景下,有别的现成执行该代码成功了,
//没关系,当前线程继续执行for循环
if (compareAndSetTail(t, node)) {
//设置成功,关系为 =》 t -> <- node
t.next = node;
//插入到同步队列结束,新节点node
return t;
}
}
}
}
//到这里说明,新节点node已经成功添加到同步队列了
//node:新添加的节点。
//arg:表示 state 值。
//该方法作用是:自旋阻塞等待获取资源
final boolean acquireQueued(final Node node, int arg) {
//记录是否发生异常,如果获取资源过程中发生异常,则需要将 node 节点取消掉,即从同步队列中移除 node
boolean failed = true;
try {
//记录是否发生中断
boolean interrupted = false;
//死循环获取资源
//如果node节点的前驱结点是head,node节点尝试获取资源,如果获取到了退出循环,否则进入等待
for (;;) {
//获取当前节点的前驱节点
final Node p = node.predecessor();
//如果前驱节点是头结点,且当前线程获取到了资源
if (p == head && tryAcquire(arg)) {
//因为当前节点已经抢到资源了,直接将当前节点设置为头结点
setHead(node);
//将前驱节点从同步队列中移除
p.next = null; // help GC
//执行成功了,finally 无需执行取消逻辑了
failed = false;
//返回中断状态
return interrupted;
}
//执行到这,说明有2种情况
//1:前驱节点不是头结点
//2:前驱节点是头结点,但是当前节点获取资源失败
//shouldParkAfterFailedAcquire:判断 node 的前驱节点的waitStaus是否为 SIGNAL
//parkAndCheckInterrupt:使用LockSupport类的静态方法park挂起当前线程,直到被唤醒,
//唤醒后检查当前线程是否被中断,返回该线程中断状态并重置中断状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//判断是否发生异常,异常则删除新节点node
if (failed)
cancelAcquire(node);
}
}
//判断 node 的前驱节点的waitStaus是否为 SIGNAL
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
//查找非取消状态的前驱节点,并将途中找到的取消状态的节点给断开,这些断开的节点会被gc
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0)
pred.next = node;
} else {
//前驱节点 waitStatus <= 0 设为 SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//当前线程进行休眠并打上中断标记
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
2、release 独占式释放资源
- release 释放资源逻辑比较简单。唤醒 CLH 队列的第二个线程(head 的 下一个节点),源码如下:
//独占式释放资源
public final boolean release(int arg) {
//释放资源,由子类实现
if (tryRelease(arg)) {
//释放资源成功
//获取头节点
Node h = head;
//头结点不为空且头结点状态 != 0
//头结点状态不为0,说明存在阻塞的节点需要被唤醒,正如前面介绍的 waitStatus = -1,表示后继节点需要被被唤醒
//那么 waitStatus > 0 呢?
//头结点的诞生有三种情况:
// 1.第一个线程node尝试获取资源时,会创建空的头结点,waitStatus = 0,然后 node获取资源成功,设置新的 node 为head 节点,
//这时候 head.waitStatus = 0,但是没有可唤醒的后继节点,执行unparkSuccessor,执行unparkSuccessor,
//if (s != null) 是fasle,不会走唤醒代码。
// 2.第一个线程node尝试获取资源时,会创建空的头结点,waitStatus = 0,然后 node获取资源失败,
//会执行 shouldParkAfterFailedAcquire 将node 的前驱节点(这里是head)设为 -1,表示head的后继节点需要被唤醒。
// 3.不是第一个线程添加进来,和上面情况一样的,如果阻塞,会修改前驱节点的 waitStatus = -1。
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
//释放资源失败
return false;
}
//唤醒 node 的第一个非取消状态的后继节点
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
//获取node的waitStatus
int ws = node.waitStatus;
//代码很简单,ws < 0,就修改成0
//那么为什么需要将 node 的 waitStatus 修改为 0 呢?
//有这样的一个场景,比如是第一个线程node进行阻塞,会将前驱节点 的 ws 的 waitStatus 改为 -1,但是 node 被唤醒后继续执行时发生了异常
//会执行 cancelAcquire 方法,将node取消掉,但是 head 节点的 waitStatus 还是等于-1,但其实没有后续节点可唤醒了,会卡在
//release 方法的 h.waitStatus != 0 这里。
//所以这里修改为0后,后驱在释放资源并没有必要在执行 unparkSuccessor 方法了,卡在了
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//获取第一个非取消状态的后继节点s
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒节点s所在的线程,被唤醒后,会继续执行acquireQueued方法尝试获取资源。
if (s != null)
LockSupport.unpark(s.thread);
}
release 流程图如下:
2、共享式下的 AQS
- arg:表示 state 状态。
tryAcquireShared(int arg)
:共享式获取资源,返回值大于等于0则表示获取成功,否则获取失败,子类实现。acquireShared(int arg)
:共享形获取资源模板。tryReleaseShared(int arg)
:共享式释放资源,子类实现。releaseShared(int arg)
:共享式释放资源模板。
1、acquireShared 共享式获取资源
- 线程获取共享资源,如果获取到资源,线程直接返回,否则进入CLH队列,直到获取到资源为止,且整个过程忽略中断的影响。
//获取共享锁,arg 为 waitStatus
public final void acquireShared(int arg) {
//获取共享资源失败,则将当前线程分装成Node节点加入到同步队列中,tryAcquireShared 由子类实现
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
//和独占锁 acquireQueued 几乎一样,这里标注下不同点
private void doAcquireShared(int arg) {
//参数不同,独占锁传入的是 Node.EXCLUSIVE
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//成功获取资源后,继续尝试唤醒后继共享节点,该方法后面会单独领出来解析
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2、releaseShared 共享式释放资源
- 和独占式释放资源一样,也是唤醒CHL队列的第二个线程节点(首节点的下个节点)。
//共享式释放资源
public final boolean releaseShared(int arg) {
//释放资源,由子类实现
if (tryReleaseShared(arg)) {
//资源释放后,从同步队列中踢出释放资源的节点,并且唤醒后继节点,该方法在后面的setHeadAndPropagate方法中会详细介绍。
doReleaseShared();
return true;
}
return false;
}
3、cancelAcquire 方法
该方法的作用为:
- 处理当前取消节点的状态为 CANCELLED,并将封装的线程 thread 变量设为null。
- 从当前节点出发,
将第一个非取消前置节点与第一个非取消后置节点连接取来
,中间所有的取消节点全部剔除。 - 如果前置节点释放了锁,那么
当前节点要去唤醒第一个非取消的后置接点。
该方法源码如下:
private void cancelAcquire(Node node) {
//检查空的情况
if (node == null)
return;
//第一部分代码 ------------------------------------------------------------------
//作用1:将封装的线程 thread 变量设为null
node.thread = null;
//作用2:找到第一个非取消前置节点,并将当前节点 node 的 prev 指向该节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//记录 pred 的前置节点
Node predNext = pred.next;
//作用1:处理当前取消节点的状态
node.waitStatus = Node.CANCELLED;
//第二部分代码 ------------------------------------------------------------------
//作用2:因为node就是尾结点了,所以没有后继节点,将 prev的next设为null,
//并且更新尾结点变量 tail 为 pred即可
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
//pred 是头结点 && pred 的 waitStatus <= 0 并设为 SIGNAL
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
//到这里说明 pred 不是head,且 pred 的 waitStatus = -1
//后继节点
Node next = node.next;
//如果后继节点部位空,且不是取消状态,则将 pred 的 next 指向 节点next
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//到这里说明 pred 是头结点
//将node节点的后继节点唤醒
unparkSuccessor(node);
}
//将node节点的next指向自己,方便gc回收
node.next = node; // help GC
}
}
接下来我们来分析下 cancelAcquire 方法执行过程中 所有的情况
,注意区分下源码中我标注的第一分部代码和第二部分代码。
首先我们知道调用 cancelAcquire 方法传入的节点 node 是需要被取消的代码。
情况1:节点 node 是尾结点且pred 不头结点,假设找到的prev = N3,那么刚进入该方法时的同步队列如下图:
- 第一步执行第一部分代码后:
- 将 node 节点的 thread 置为null。
- 将 node 节点的 prev 指向第一个非取消的前驱节点。
- 将 node 的 waitStatus 状态置为 1。结果如下:
- 开始执行第二部分代码:
- 因为 node 是尾结点,先设置新的尾结点为 pred。
- 在将 prev 的 next 置为 null。
- 最后 N1 和 N2 会被 gc 回收。结果如下图:
情况2:节点 node 是尾结点且pred 是头结点,那么刚进入该方法时的同步队列如下图:
- 第一步执行第一部分代码后:
- 将 node 节点的 thread 置为null。
- 将 node 节点的 prev 指向第一个非取消的前驱节点。
- 将 node 的 waitStatus 状态置为 1。结果如下:
- 开始执行第二部分代码:
- 因为 node 是尾结点,先设置新的尾结点为 pred。
- 在将 prev 的 next 置为 null。
- 最后 N1、N2、N3、N4 会被 gc 回收。结果如下图:
可以发现情况1和情况2可以归纳一种情况,即 node 是尾结点
情况3:节点 node 不是尾结点,pred 不是头结点 且 node 的后继节点不是取消状态的节点,假设 node 为 N2, pred 是 N4,那么刚进入该方法时的同步队列如下图:
- 第一步执行第一部分代码后:
- 将 node 节点的 thread 置为null。
- 将 node 节点的 prev 指向第一个非取消的前驱节点。
- 将 node 的 waitStatus 状态置为 1。结果如下:
- 开始执行第二部分代码:因为 node 不是尾结点,pred 不是 head,且 pred 的 waitStatus = -1;则:
Node next = node.next:获取到 node 的后继节点(即 N1)给 next 变量
next != null && next.waitStatus <= 0 成立,将 pred 的 next 指向 next 节点。
node.next = node:将 node 的 next 指向它自己。
- 最后结果如下图:发现 node节点(N2)执行完之后,会将 前面的所有的取消状态的节点都变成可回收的状况(图中是N3,但如果N3 与 pred 之间还有取消状态节点,也是一样的情况)。
- 那么就有一个疑问,由于 N1 的 prev 指向 N2,导致 N2 无法被 GC,那么
为何不断开 N1 指向 N2 的 prev 呢
,这样就可以回收 N2 ??- 原因很简单,如果并发场景下,此时发生对 N1 的 cancelAcquire 操作,prev 不断开可以任然遍历到前面的节点。
- 那 N2 作为取消的节点,也是需要被回收的,那么
什么时候剔除掉 N2
呢?- 当 N2 的prev 指向的前驱节点,即 pred 成功获取到同步状态且释放同步状态,并唤醒 pred 的后继节点(N1)时完成的。是在 acquireQueued 方法中完成的。源码如下:
情况4:节点 node 不是尾结点,pred 不是头结点 且 node 的后继节点是取消状态的节点,假设 node 为 N2, pred 是 N4,那么刚进入该方法时的同步队列如下图:
- 第一步执行第一部分代码后:
- 将 node 节点的 thread 置为null。
- 将 node 节点的 prev 指向第一个非取消的前驱节点。
- 将 node 的 waitStatus 状态置为 1。结果如下:
- 开始执行第二部分代码:因为 node 不是尾结点,pred 不是 head,且 pred 的 waitStatus = 1;则:
-
unparkSuccessor(node):唤醒 node 节点的第一个非取消状态的后继节点。
:为什么说 第一个飞取消状态的后继节点节点呢?我们看下面的源码: -
node.next = node:将 node 的 next 指向它自己。
-
- 由于传入的node是取消节点,waitStatus = 1,所以 可走到 for 循环,我们发现:它是从 尾部tail 向前遍历的,中止条件是
t != null && t != node
,那么为何要从后开始遍历呢?以及为什么 要判空呢? - 正如英文注释给出原因:
需要唤醒的线程保存在后继节点中,通常情况下后继节点就是下一个节点。但是如果后继节点被取消或者为空,就需要从尾部开始向后遍历,找到实际的未取消的后继节点。
- 综上:第2部分执行完成之后结果如下:那么当 pred 被唤醒时,pred 以及紧接着的取消状态的后继节点会被gc。
情况5:节点 node 不是尾结点,pred 是头结点 且 node 的后继节点不是取消状态的节点,假设 node 为 N2, pred 是 N5,那么刚进入该方法时的同步队列如下图:
- 第一步执行第一部分代码后:
- 将 node 节点的 thread 置为null。
- 将 node 节点的 prev 指向第一个非取消的前驱节点。
- 将 node 的 waitStatus 状态置为 1。结果如下:
- 开始执行第二部分代码:由于:pred = head,所以:
- unparkSuccessor(node) : 直接执行唤醒操作。
node.next = node:将 node 的 next 指向它自己。
情况6:节点 node 不是尾结点,pred 是头结点 且 node 的后继节点是取消状态的节点,假设 node 为 N2, pred 是 N5,该情况和 情况5 是一样的,第二部分直接执行唤醒操作。
至此,cancelAcquire 方法所有情况分析完了。
4、setHeadAndPropagate 方法
-
该方法作用:唤醒后继节点,体现出了共享锁的传播性,什么叫传播性呢?
- 在互斥锁中,线程释放资源后,线程所在的节点会唤醒后后继节点便结束了。但是在 共享锁种,唤醒后继节点后,获继续尝试后继节点的后继节点,这样循环往复,直到没有节点可唤醒了则退出,这便是传播性。
-
源码如下:源码并复杂,复杂的很难想象到有些地方这样写是为了什么。
private void setHeadAndPropagate(Node node, int propagate) {
//和互斥锁一样,跟新当前节点为新的头结点
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
//propagate > 0 很容易理解,因为是共享锁, > 0 说明现成还可获取临界资源,直接调用 doReleaseShared 方法
//尝试唤醒后继及诶单
//留个疑问:为什么还需要 h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 这样的判断呢
//是为了解决什么场景可能带来的问题
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
//唤醒后继节点
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
//进入死循环
for (;;) {
//获取当前同步队列的头结点
Node h = head;
//如果头结点不为空,且不为尾结点,说明除了空的头结点,还有其他节点
if (h != null && h != tail) {
//获取头结点的状态
int ws = h.waitStatus;
//满足条件则唤醒后继节点,这个很好理解,和互斥锁一样
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//留个疑问,这里的判断以及为什么要设置为 PROPAGATE 呢?是为了解决什么场景下可能产生的问题呢?
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//结合 unparkSuccessor可知,如果线程被唤醒后,会继续执行 doAcquireShared 循环,并设置新的头结点,
//那么这里就不会成立,继续循环执行,
//霍如没有节点可唤醒,这里就成立,则退出循环
if (h == head) // loop if head changed
break;
}
}
疑问一:为什么要有 propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 这样的判断?
疑问二:ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE) 为什么要将头结点 waitStatus 设置为 PROPAGATE?
- 这两个问题可以一起回答:为了避免某些情况下线程无法被唤醒。在Java的官网关于 Bug 记录中,有场景的描述,代码如下:
public class TestSemaphore {
// 这里将信号量设置成了0
private static Semaphore sem = new Semaphore(0);
private static class Thread1 extends Thread {
@Override
public void run() {
// 获取锁
sem.acquireUninterruptibly();
}
}
private static class Thread2 extends Thread {
@Override
public void run() {
// 释放锁
sem.release();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10000000; i++) {
Thread t1 = new Thread1();
Thread t2 = new Thread1();
Thread t3 = new Thread2();
Thread t4 = new Thread2();
t1.start();
t2.start();
t3.start();
t4.start();
t1.join();
t2.join();
t3.join();
t4.join();
System.out.println(i);
}
}
}
- 如果没有 将头结点 waitStatus 设置为 PROPAGATE,那么过程如下:
- 时刻1:由于 Semaphore 信号量为0,t1.start(),t2.start() 执行后,进入同步队列阻塞等待唤醒。效果如下
- 时刻:2:线程 t3 执行 releaseShared 方法释放资源后 调用 doReleaseShared方法的 compareAndSetWaitStatus(h, Node.SIGNAL, 0) 将 head 的 waitStatus 从 -1 改为 0。注:这时候还未唤醒 t1。效果如下:
- 时刻3:紧接着执行 unparkSuccessor(h),将 t1 唤醒,唤醒后执行 tryAcquireShared(arg) 获的 propagate = 0 ,并且开始执行 setHeadAndPropagate 方法,但是还未执行 setHead 方法去更新head。效果不变:
- 时刻4:线程 t4 执行 releaseShared 方法释放资源后 调用 doReleaseShared方法 尝试唤醒head 的后继节点,由于 head 和 时刻 3 的head 是同一个,所以 ws = 0,无法唤醒后继节点。效果不变:
- 时刻5:时刻2唤醒线程 t1后继续执行,更新 head 为 t1,由于 时刻2 调用 setHeadAndPropagate 方法传入的参数 propagate = 0,导致也无法唤醒后继节点,所以导致 现成 t2 无法被唤醒的情况。效果如下:
如果 将头结点 waitStatus 设置为 PROPAGATE,那么过程如下:
-
时刻4:线程 t4 执行 releaseShared 方法释放资源后 调用 doReleaseShared方法,由于 head 和 时刻 3 的head 是同一个,所以 ws = 0,然后执行 (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE) 将 head 设置为 PROPAGATE。
-
时刻5:唤醒线程 t1后继续执行,更新 head 为 t1,由于 时刻2 调用 setHeadAndPropagate 方法传入的参数 propagate = 0,但是时刻4 将 head 的waitStatus 设置为 PROPAGATE,h.waitStatus < 0 成立,所以继续唤醒 t2。
-
那么为什么时刻4 不设置 -1 呢,正如 PROPAGATE = -3 的含义,共享锁下的节点状态,前驱节点不仅会唤醒后驱节点,同时也可能会唤醒后驱的后驱节点。所以 t1 被唤醒后,由于 head 的 waitStatus = -3,所以 t1 继续承担了唤醒 t2 的职责。
-
总综上,head 在并发的过程中是可能被别的线程修改的,所以需要 h == null 和 (h = head) == null 的判断,同时为了防止节点都能被唤醒,所以 head.waitStatus = 0 的情况下 设置 head.waitStatus = PROPAGATE,并且 在 setHeadAndPropagate 方法里通过 h.waitStatus < 0 的判断避免节点出现无法唤醒的情况,不得不说,设计真的非常巧妙。
参考资料:
- 深入理解AbstractQueuedSynchronizer只需15张图
- AQS源码分析
- 从PROPAGATE和setHeadAndPropagate()分析共享锁的传播性
- PROPAGATE的作用
- CAS 介绍
- 书籍 方腾飞:《Java并发编程的艺术》
画图工具:processOn