简介
AQS(AbstractQueuedSynchronizer)是JUC包中的核心抽象类,许多并发工具的实现,包括ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore 等都是基于此实现。
AQS,主要提供了表示同步状态的 state 变量,以及抢不到锁时的线程进入的 等待队列 及其排队逻辑。队列使用内部类 Node 进行实现。Node中包含了实现双向队列的必要属性:prev 指向前驱节点、next 指向后继节点,thread 属性指向当前等待节点对应的线程,waitStatus 表示当前线程节点是否正常(节点有可能被取消或其他状态)
AQS 继承自另一个抽象类:AbstractOwnableSynchronizer,AbstractOwnableSynchronizer比较简单,内部维护了 exclusiveOwnerThread 变量表征当前持有锁(独占锁)的线程,对应的getter、setter方法因为只有在获取到锁后会被调用,因此是线程安全的,不需要不同访问。
下面以 ReentrantLock 的实现为例,介绍对应的AQS的继承类与它的配合逻辑。
ReentrantLock
ReentrantLock 内部定义了 AQS 的继承类 Sync,NonfairSync、FairSync 分别对应了非公平锁、公平锁的实现。我们常用的ReentrantLock类的 lock、unlock 方法会委托给 NonfairSync 或 FairSync 的 lock、release 方法实现。
lock 实现
非公平锁 NonfairSync,直接对state变量执行 CAS 原子性操作,如果成功的话,则设置持有锁的线程为当前线程,否则执行 acquire 方法。而 公平锁 FairSync 则是直接执行 acquire 方法。
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
}
NonfairSync#lock
acquire 方法在 AQS 中 进行定义,我们提到 AQS 实现定义了state 变量,但具体使用,以及是否成功获得锁,则是由子类进行定义,因此在 acquire 内部执行了 子类的 tryAcquire 方法,来让子类决定是否成功获得锁,如果成功则直接退出,否则将当前线程加入到线程等待队列中。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我们先来看非公平锁的 tryAcquire–nonfairTryAcquire 实现:首先获得state的实时值,如果为 0 表示没有线程占用,立即去抢,抢成功直接返回true,进而退出 acquire。如果当前线程就是持有锁的线程,则对state变量进行累加赋值,同样返回true,退出acquire。否则返回false,表示有其他线程已经持有锁。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
FairSync#lock
公平锁的实现:拿到state值后,如果为0,没事没有线程占用锁,然后先判断排队队列中是否有线程等待,如果有,则返回flse,将自己添加到队列尾部(AQS的acquire中实现)。如果队列中没有等待线程,再去执行 CAS state 的动作。后续的逻辑与 非公平锁 一致了。
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
一句话总结,公平与非公平锁的区别在于:线程抢锁之前,是否要关注锁的等待队列中有无节点,非公平锁是不看,来了直接抢,公平锁是要看,人家先来的要先执行,我到后面去等着执行。实践证明,非公平锁的效率更高,是因为有可能持有锁的线程很快会释放锁,这样非公平锁直接获取,减少了去内核中阻塞的线程数量,提高了执行的效率。
AbstractQueuedSynchronizer#acquire
lock方法实现的核心逻辑还是在 AQS 的 acquire 中定义,前面我们说过了线程是否成功获得锁的方法 tryAcquire 在子类中的定义,我们再回过来看如果没有成功获得锁,tryAcquire 返回false的情况。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter
首先会执行 addWaiter 方法,将当前线程添加到等待队列中。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
主要逻辑如下
- 创建当前线程对应的Node节点
- 如果队里的 tail 指针不为空,则将新建的node设置为新的tail,然后设置原来的 tail 的 next 指针指向当前线程的node。
- 如果tail为空,则进入的enq方法中执行。
这里我们可以看到,在set Tail 的时候,也是用的CAS操作,也就是说,在进入队列的时候,也是有并发情况的。我们可以画出添加新节点之前队列的示意图。
在执行 compareAndSetTail 之前,优先将新节点的prev进行了设置,这一步是对线程结构没有任何影响的,然后在 compareAndSetTail 之后设置了 原来的 tail 指向自己,完成整个步骤。
再来看 enq 的逻辑:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- 忽略for循环,tail不为空的情况下与外层的 addWaiter 方法执行逻辑是一致的,这里for循环保证了 compareAndSetTail 一次不成功的情况下继续执行。
- 如果tail为空,说明队列还没有被初始化,此时优先初始化 head 指针,然后将 head 赋值给 tail,再下一轮循环中将入参的 node 添加到 tail 后面。
由此可见 addWaiter 在调用 enq 之前,优先判断 tail 不为空,仅执行一次 compareAndSetTail 操作,是一步优化过程,完全可以忽略,直接进入到enq,因为enq中有更完备的逻辑,外层的代码仅作为优化存在。外层执行一次 compareAndSetTail 失败,还是会进入到 enq中的。并且当队列为空时,也必定会进入到enq中,但大多数情况下可能并没有入队时的竞争,因此这里值得此优化。
acquireQueued
线程进入等待队列后,执行到 acquireQueued 方法。按逻辑推理,进入到等待队列的线程,应该去阻塞了吧,这应该是 acquireQueued 的主要目的,但看一下实现逻辑其实并没有这么简单。
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- for循环的作用是,在线程被唤醒后,仍然是在这个方法for循环内部,并且因为非公平锁的存在,所以该方法内部调用 tryAcquire 并不一定能够执行成功,因此这里必须有for循环才可。
- 获取到当前线程的前驱节点,如果为head(head为当前持有锁正在执行的线程节点),则去尝试获取锁,因为下一个执行的就是自己,如果成功获取,不必再去阻塞了,直接执行即可。
- 如果前驱节点不是 head,则执行 shouldParkAfterFailedAcquire 判断并更新前驱节点的状态,如果返回true,表示当前线程节点的前驱节点是正常的,自己应该去执行 parkAndCheckInterrupt 并阻塞,否则需要重复循环并再次执行 shouldParkAfterFailedAcquire。
shouldParkAfterFailedAcquire 简单来说是想检查一遍前面的线程节点,如果有取消状态的,则直接忽略掉,修改新节点的前驱节点为正常状态的临近节点。开头我们提过,node对象中包含有 waitStatus 属性,这里正是检查了该属性的值,如果为 CANCELLED( = 1) 状态,则直接从链表中删除。方法的返回结果表征当前node节点是否应该去阻塞。具体代码如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire 返回true,则执行 parkAndCheckInterrupt 线程进入阻塞状态了。在被唤醒之时,则会重新进行循环,去执行 if (p == head && tryAcquire(arg)) 的逻辑。
总结
本文解析了 ReentrantLock 的公平锁、非公平锁的实现源码,分析了非公平与公平锁的区别,并认识了AQS的实现。AQS 的 acquire 正是加锁逻辑的模板方法模式。父类中定义算法的整体逻辑,内部调用子类的局部实现,完成整个的逻辑。
同时我们也认识了state的定义与使用,AQS 只声明了改变量,但没有具体定义要怎么使用,具体使用同样交给子类,我们也看到了 ReentrantLock 中的使用 大于0 表示持有锁,等于0表示没有线程持有锁,进一步,我们也可以认识重入锁的原理,即持有锁的线程去访问另一个同样要求获取同一个锁的代码片段的时候,在持有锁的基础上,继续累加 state 实现。