AQS独占锁模式源码分析
- 1、tryAcquire()、acquire()方法
- 2、addWaiter()方法
- 3、acquireQueued()方法
- 4、shouldParkAfterFailedAcquire()方法
- 5、tryRelease()、release()方法
1、tryAcquire()、acquire()方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
若上层调用tryAcquire返回true,线程获得锁,此时可以对相应的共享资源进行操作,使用完之后再进行释放。如果调用tryAcquire返回false,且上层逻辑上不想等待锁,那么可以自己进行相应的处理(@override tryAcquire()方法);如果上层逻辑选择等待锁,那么可以直接调用acquire方法,acquire方法内部封装了复杂的排队处理逻辑,非常易用。
假如tryAcquire返回false,说明需要排队,那么就进而执行
acquireQueued(addwaiter(Node.EXCLUSIVE),arg),acquireQueued方法其中嵌套了addWaiter方法。
2、addWaiter()方法
将当前线程封装成Node加入等待队列的队尾。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 先尝试用最快的方式入队,如果失败再执行完整的入队方法
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
上述代码通过使用compareAndSetTail(Node expect,Node update)方法来确保节点能够被线程安全添加。
完整的入队方法 enq()
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在enq(final Node node)方法中,同步器通过 “死循环” 来保证节点的正确添加,在 “死循环” 中只有通过CAS将节点设置成尾节点之后,当前线程才能从该方法返回,否则,当前线程不断地尝试设置。可以看出,enq(final Node node)方法将并发添加节点的请求通过CAS变得 “串行化” 了。
节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说每个线程)都在自省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程中(并会阻塞节点的线程)
AQS在各个线程中维护了当前Node的waitStatus,根据不同的状态,程序来做出不同的操作。通过调用acquireQueued 方法,开始对Node的waitStatus进行跟踪维护。
3、acquireQueued()方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //获取当前节点的前置节点
if (p == head && tryAcquire(arg)) { //如果当前节点的前置节点为head,且当前节点成功地获得锁
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
基本可以将acquireQueued分成三部分:
▲第一个 if 判断条件:如果当前节点的前置节点为head,说明当前节点有权限去尝试拿锁。如果tryAcquire返回true,代表拿到了锁,那么顺理成章,函数返回。
▲第二个 if 判断条件:包含两个方法,看名字是首先判断当前线程是否需要挂起等待?如果需要,那么就挂起。并且判断外部是否调用线程中断;如果不需要,那么继续尝试拿锁(自旋)。
▲如果 try 块中抛出非预期异常,那么取消当前线程获取锁的行为。
那么接下来分别来看 shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()方法。
4、shouldParkAfterFailedAcquire()方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
若当前节点没有拿锁的权限或拿锁失败,那么将会进入shouldParkAfterFailedAcquire(Node pred, Node node)判断是否需要挂起,方法的参数是pred Node和当前Node的引用。
方法的流程其实注释已经写的很清楚了:
1、若pred的waitSatus为SIGNAL,说明前置节点也在等待拿锁,并且之后将会唤醒当前节点,所以当前线程可以挂起休息,返回true。
2、如果ws大于0,说明pred的waitSatus是CANCEL,所以可以将其从队列中删除。这里通过从后向前搜索,将pred指向搜索过程中第一个waitSatus为非CANCEL的节点。相当于链式地删除被CANCEL的节点。然后返回false,代表当前节点不需要挂起,因为pred指向了新的Node,需要重试外层的逻辑。
3、除此之外,pred的ws还有两种可能,0或PROPAGATE,有人可能会问,为什么不可能是CONDITION?因为waitStatus只有在其他条件模式下,才会被修改为CONDITION,这里不会出现,并且只有在共享模式下,才可能出现waitStatus为PROPAGATE,暂时也不用管。那么在独占模式下,ws在这里只会出现0的情况。0代表pred处于初始化默认状态,所以通过CAS将当前pred的waitStatus修改为SIGNAL,然后返回false,重试外层逻辑。
如果shouldParkAfterFailedAcquire返回false,那么再进行一轮重试;如果返回true,代表当前节点需要被挂起,则执行parkAndCheckInterrupt方法。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //线程将在这里被挂起(阻塞)
return Thread.interrupted();
}
当需要阻塞或唤醒一个线程的时候,都会使用LockSupport工具类来完成相应的工作。这里LockSupport.park(this)本质是通过Unsafe下的native方法调用操作系统原语来将当前线程挂起。此时当前Node中的线程将阻塞在此处,如果调用unpark(Thread thread)方法或者当前线程被中断,才能从park()方法返回。
通过对acquireQueued这个方法的分析,我们可以这么说,如果当前线程所在的节点处于头节点的后一个,那么它将会不断去尝试拿锁,直到获取成功。否则进行判断,是否需要挂起。这样就能保证head之后的一个节点在自旋CAS获取锁,其他线程都已经被挂起或正在被挂起。这样就能最大限度地避免无用的自旋消耗CPU。
当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态。通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)
5、tryRelease()、release()方法
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
该方法执行时,会唤醒头结点的后继节点线程,unparkSuccessor(Node node)方法使用LockSupport来唤醒处于等待状态的线程。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
获取head的waitStatus,如果不为0,那么将其置为0,表示锁已释放。接下来获取后续节点如果后续节点为null或者处于CANCELED状态,那么从后往前搜索,找到除了head外最靠前且非CANCELED状态的Node,对其进行唤醒,让它起来尝试拿锁。
被挂起的线程一旦被唤醒,那么它将会继续执行acquireQueued()这个方法进行自旋尝试获取锁。这样就形成了一个良好的闭环,拿锁、挂起、释放、唤醒都能够有条不紊的进行。