文章目录
- 一、AQS 介绍
- 二、通过ReentrantLock分析AQS的实现
- 2.1、获取锁流程
- 2.2、获取锁源码分析
- 2.2.1、acquire
- 2.2.2、tryAcquire
- 2.2.3、addWaiter
- 2.2.4、acquireQueued
- 2.2.5、shouldParkAfterFailedAcquire
- 2.3、解锁源码分析
- 2.3.1、unlock
- 2.3.2、release
- 2.3.3、tryRelease
- 2.3.4、unparkSuccessor
- 三、公平锁&非公平锁
一、AQS 介绍
AQS(AbstractQueuedSynchronizer)就是一个抽象类,其主要维护一个资源同步状态变量的值(state)和一个存放排队线程的CLH双向队列,同时线程阻塞等待以及被唤醒时锁分配的机制。具体结构如下:
AQS使用一个volatile的int类型的成员变量来表示同步状态,通过内置的先进先出的CLH队列来完成获取资源线程的排队工作,将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对State值的修改。
CLH锁其实就是一种是基于逻辑队列非线程饥饿的一种自旋公平锁,由于是 Craig、Landin 和
Hagersten三位大佬的发明,因此命名为CLH锁。
AQS有一个内部类Node,Node节点对每一个等待获取资源的线程的封装,其包含了需要同步的线程本身及其等待状态,如是否被阻塞、是否等待唤醒、是否已经被取消等。
这里可以直观地看出该node节点会存储当前被阻塞的请求资源线程,变量waitStatus则表示当前Node节点的等待状态,共有5种取值如下:
- CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
- SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
- CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,- CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
- PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
- 0:新结点入队时的默认状态。
AQS的关键方法有以下几个:
- acquire():用于线程获取同步器的状态,如果无法获取,则线程会进入阻塞状态等待其他线程释放同步器。
- release():用于线程释放同步器的状态,并唤醒等待队列中的其他线程。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。•
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余可用资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点则返回true,否则返回false。
二、通过ReentrantLock分析AQS的实现
ReentrantLock是JUC并发包中一个重要常用的同步器,其底层就是依赖AQS实现的。下面我们将详细看下ReentrantLock的实现。
2.1、获取锁流程
ReentrantLock抢占锁交互图:
整体流程如下
2.2、获取锁源码分析
下面我们分别看下这些方法的源码:
打开lock.lock()这时候跳转到ReentrantLock的内部类NonfairSync(NonfairSync继承自AQS)的lock()方法:
2.2.1、acquire
acquire()方法是AQS中的方法,这里就是线程加锁占位资源不成功,把线程放到CLH队列等待通知唤醒的核心入口处
核心流程步骤如下:
- tryAcquire()尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而CLH队列中可能还有别的线程在等待);
- addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
- acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
2.2.2、tryAcquire
FairSync.tryAcquire的实现如下:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
这段代码是 AbstractQueuedSynchronizer (AQS) 中的 nonfairTryAcquire 方法的实现。该方法用于非公平地尝试获取同步器的状态。
- 首先,获取当前线程并获取当前的状态值 c。
- 如果当前状态值 c 为 0,表示同步器当前没有被其他线程占用。在这种情况下,直接使用 compareAndSetState 方法将状态从 0 修改为 acquires(期望值为 0,更新值为 acquires)。如果修改成功,表示当前线程成功获取了同步器,并将当前线程设置为独占式锁的拥有者,然后返回 true。
- 如果当前状态值 c 不为 0,表示同步器已经被其他线程占用。此时需要判断当前线程是否为同步器的独占式锁拥有者,如果是,则将当前状态值加上 acquires(增加锁的持有次数),并返回 true。
- 如果以上条件都不满足,则表示当前线程无法获取同步器的状态,返回 false。
2.2.3、addWaiter
此方法用于将当前线程加入到等待队列CLH的队尾,并返回当前线程所在的结点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
- 首先,创建一个新的节点 Node,该节点包含当前线程和指定的等待模式 mode。
- 尝试使用快速路径 (fast path) 将新节点添加到队列中。首先获取当前队列的尾节点 pred,如果尾节点不为 null,则将新节点的 prev 指向 pred,并尝试使用 compareAndSetTail 方法将尾节点更新为新节点。如果更新成功,则将原尾节点 pred 的 next 指向新节点,并返回新节点。
- 如果快速路径失败,即尾节点为 null 或者 compareAndSetTail 失败,表示有其他线程在并发修改队列。这时候需要使用完整的入队操作 (enq) 将新节点添加到队列中。
- 在 enq 方法中,首先通过自旋操作将新节点添加到队列的尾部。
- 返回新节点。
该方法的作用是将当前线程作为等待线程添加到同步队列中。它采用了一种乐观的策略,首先尝试使用快速路径将新节点添加到队列尾部,如果失败则使用完整的入队操作。通过这种方式,可以在大多数情况下避免竞争和线程阻塞,提高并发性能。
2.2.4、acquireQueued
该方法用于在同步队列中获取锁,当无法直接获取锁时,将当前线程加入到等待队列中并进行自旋等待。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 首先,设置一个标志位 failed 为 true,用于记录获取锁的过程中是否出现异常。
- 在 try-catch-finally 块中进行循环,直到成功获取到锁或被中断。
- 在循环中,首先获取当前节点的前驱节点 p,判断当前节点的前驱节点是否为头节点且尝试获取锁成功(通过调用 tryAcquire 方法)。如果是,则将当前节点设置为新的头节点,断开原头节点和当前节点的连接,将前驱节点的 next 指针置为 null,最后将标志位 failed 设为 false,并返回中断状态 interrupted。
- 如果前驱节点 p 不满足条件,调用 shouldParkAfterFailedAcquire 方法判断是否需要阻塞当前线程,并调用 parkAndCheckInterrupt 方法进行线程阻塞,并检查线程是否被中断。如果被中断,则将中断状态 interrupted 设置为 true。
- 循环回到第2步,继续尝试获取锁或阻塞。
- 如果循环结束时仍然无法获取锁,即获取锁的过程中出现异常,则调用 cancelAcquire 方法取消当前节点的获取操作。
2.2.5、shouldParkAfterFailedAcquire
该方法用于判断在获取锁失败后,当前线程是否应该进行阻塞:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- 首先,获取前驱节点 pred 的等待状态 waitStatus。
- 如果等待状态 waitStatus 等于 Node.SIGNAL,表示前驱节点已经设置了状态要求释放锁来发出信号,因此当前节点可以安全地进行阻塞,返回 true。
- 如果等待状态 waitStatus 大于 0,表示前驱节点已被取消。循环遍历跳过前驱节点及其之前的已被取消的节点,直到找到一个等待状态小于等于 0 的前驱节点 pred。然后更新当前节点的 prev 指针为找到的前驱节点 pred,将前驱节点 pred 的 next 指针指向当前节点 node。
- 如果等待状态 waitStatus 为 0 或 Node.PROPAGATE,则表明需要一个信号来唤醒当前节点,但不立即进行阻塞。调用 compareAndSetWaitStatus 方法将前驱节点 pred 的等待状态修改为 Node.SIGNAL,以表示需要一个信号。
- 返回 false,表示当前线程不需要进行阻塞。
2.3、解锁源码分析
2.3.1、unlock
NonfairSync(NonfairSync继承自AQS)的unlock()方法:
public void unlock() {
sync.release(1);
}
2.3.2、release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
2.3.3、tryRelease
该方法用于释放锁资源。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
-
首先,获取当前锁的状态值 c = getState() - releases,表示要释放锁后的新状态。
-
判断当前线程是否是独占锁的拥有者,如果不是则抛出 IllegalMonitorStateException 异常。
-
根据新状态 c 进行处理:
- 如果新状态 c 等于 0,表示锁已完全释放。将 exclusiveOwnerThread 设置为 null,表示当前没有拥有者,同时将 free 标志设置为 true。
- 否则,更新锁的状态为新状态 c。
-
返回 free 标志,表示锁是否已完全释放。
2.3.4、unparkSuccessor
如果上面释放资源成功返回true,那么就会执行unparkSuccessor()唤醒等待队列里的下一个线程
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
- 首先,判断节点的等待状态 ws,如果 ws 小于 0,则尝试将其置为 0。这是为了预先清除可能需要信号的状态。即使此操作失败或等待线程更改了状态,也不会影响后续的操作。
- 获取节点的后继节点 s,通常后继节点就是当前节点的下一个节点。但如果后继节点被取消或为空,就从尾部向前遍历找到实际未被取消的后继节点。这是为了找到真正需要唤醒的节点。
- 如果找到了需要唤醒的后继节点 s,则调用 LockSupport.unpark(s.thread) 方法唤醒该节点对应的线程。
三、公平锁&非公平锁
ReentrantLock分为公平锁和非公平锁,默认构造方法是非公平锁,如果我们需要构建公平锁,只需要传参true即可:
Lock lock = new ReentrantLock(true);
其加锁解锁流程几乎跟上面非公平锁一样,一些细节有些不同:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
这里lock()没有和非公平锁的一样先使用CAS尝试加锁占有资源,是直接调用acquire(1)进行入队操作。加锁这里多了hasQueuedPredecessors的逻辑
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}