AQS在 ReentrantLock 的使用方式(非公平锁)
我们之前学习过 ReentrantLock 非公平锁与公平锁的区别在于,非公平锁不会强行按照任务等待队列去等待任务,而是在获取锁的时候先去尝试使用 CAS 改变一下 State,如果改变成功直接返回加锁成功不用排队,如果改变失败则进入等待队列。
我们简单看一下非公平锁
的源码。
如何寻找非公平锁的加锁实现呢?我们回顾一下上一节课,加锁解锁其实是由 AQS 的实现来做的,而 ReentrantLock 中对于 AQS 的实现是 Sync 内部类,Sync 实现了两种加锁方式:非公平锁和公平锁,结构关系如下:
通过这个结果关系,我们能够知道,非公平锁的加锁逻辑在java.util.concurrent.locks.ReentrantLock.NonfairSync#lock
。
final void lock() {、
//尝试使用CAS修改state的值,修改成功后就加锁成功
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//开始加锁
acquire(1);
}
从源码中可以看到,非公平锁一进来就会直接尝试获取一次锁,不会进行太多的判断,这也符合非公平锁的定义,使用 CAS 修改如果成功了,就加锁成功,否则会执行 acquire 的加锁逻辑。
我们在上节课中分析,acquire 方法最终会调用到本身实现的 tryAcquire
:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
进入到 nonfairTryAcquire 的逻辑:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//直接尝试CAS加锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//可重入锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
在这里可以看到,它的加锁逻辑与公平锁很相似,但是与公平锁不同的是:
- 公平锁当发现 state = 0 也就是没有任务占有锁的情况下,会判断队列中是存在等待任务,如果存在就会加锁失败,然后执行入队操作。
- 而非公平锁发现 state = 0 也就是没有任务占有锁的情况下,会直接进行 CAS 加锁,只要 CAS 加锁成功了,就会直接返回加锁成功而不会进行入队操作。
我们从源码中就能够直接看出来所谓的公平锁和非公平锁的实现方式的区别。非公平锁的解锁方式与公平锁的解放方式一致,不做重复介绍。
我们使用一个流程图来彻底解释非公平锁的实现逻辑:
二、AQS在CountDownLatch的应用
与 ReentrantLock 相同的是,我们同样可以在 CountDownLatch 中寻找到 AQS 的实现类 Sync。没错,CountDownLatch 的实现也是基于 AQS 来做的。
在学习其实现原理前,先回顾一下 CountDownLatch 的使用:
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(() ->{
try {
System.out.println("线程:" +Thread.currentThread().getName() + "开始执行。");
Thread.sleep((long) (Math.random() + 10000));
System.out.println("线程:" +Thread.currentThread().getName() + "执行完成.");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
}).start();
}
System.out.println("开始等待10个线程都完成任务...");
countDownLatch.await();
System.out.println("线程全部执行完毕");
}
}
可以看到,在初始化 CountDownLatch 的时候,我们传递了 10,然后开启了 10 个线程执行任务,每一个线程执行完毕之后都会调用 countDownLatch.countDown();
来进行递减操作。我们在主线程调用 countDownLatch.await();
来等待 CountDownLatch 变为 0 后,它会解除阻塞继续向下执行!
所以,我们分析 CountDownLatch 对于 AQS 的使用应该从以下几个方面进行:
- CountDownLatch 初始化的时候,传递的 10 是什么意思?
- await 方法做了什么?
- countDown 方法做了什么?
我们按照问题的顺序逐个分析。
1. 初始化的时候做了什么?
进入到初始化的源码中:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
从源码中看到,初始化的时候,它将我们传递的数量,传递到了 Sync,上文我们了解到,Sync 是 AQS 的实现子类,所以我们从源码层面上证明了 CountDownLatch 的实现一定与 AQS 有关。
我们进入到 Sync 中查看它是如何运用这个 count 的:
Sync(int count) {
setState(count);
}
可以看到,Sync 只做了一件事,就是将 count 保存到了 AQS 的 state 中,比如我们传递的参数是 10,那么此时 AQS 中 state 的值也是 10。
2. await 方法做了什么?
了解了初始化方法之后,我们知道了此时 state 的值为 10,那么我们进入到 await 中,查看它是如何来进行阻塞线程的:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
可以看到,阻塞也是借助 AQS 来做的,我们继续:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//尝试获取锁
if (tryAcquireShared(arg) < 0)
//获取失败则对任务进行入队和阻塞
doAcquireSharedInterruptibly(arg);
}
我们先分析 tryAcquireShared:
java.util.concurrent.CountDownLatch.Sync#tryAcquireShared
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
注意,此时 state 的数量为 10,所以这里应该返回的是 -1。我们继续看 doAcquireSharedInterruptibly :
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//创建一个节点 将节点加入到等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取当前节点的前置节点
final Node p = node.predecessor();
//如果前置节点是头节点
if (p == head) {
//再次尝试判断state的值是否为0
int r = tryAcquireShared(arg);
if (r >= 0) {
//如果state的数量为0 则r = 1, 开始讲当前节点设置为头节点 并清理废弃节点
setHeadAndPropagate(node, r);
//清理已经执行完的节点
p.next = null;
failed = false;
//返回尝试成功
return;
}
}
//阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
我们分析上述源码,当 state 的值不为 0 的时候,证明 CountDown 还没有释放完毕,此时应该阻塞,先将当前节点加入到等待队列,然后同 ReentrantLock 一样,在阻塞之前也会先判断自己是不是 head 的下一个节点,如果是的话会再次尝试判断一下 state 是不是等于 0 了,如果此时等于 0 了,就不用阻塞了,可以直接返回。
此时如果 state 依旧不为 0,则开始与 ReentrantLock 一样调用 park 进行阻塞等待唤醒。
事实上,await 阻塞的逻辑十分简单。我们总结来说,就是当程序调用 await 方法的时候,会判断 state 的值是不是 0,如果不是 0 就阻塞,是 0 就直接返回。
我们使用一张图来解释一下这个流程:
至于为什么会存在一个队列,我们之前在介绍 CountDownLatch 的时候介绍了一种“多等一”的场景(开发人员等待产品经理 PRD 的场景),每一个线程都会调用 await 等待,这里多个等待的任务就会进入到队列中。
3. countDown 方法做了什么?
直接进入到源码:
public void countDown() {
sync.releaseShared(1);
}
CountDownLatch 的 countDown 也是基于 AQS 来做的,我们进入到 AQS 的实现类java.util.concurrent.CountDownLatch.Sync#releaseShared
中:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
我们这里分为两步源码进行解读了。
先行分析 tryReleaseShared(注意,arg 的值默认为 1)java.util.concurrent.CountDownLatch.Sync#tryReleaseShared
:
protected boolean tryReleaseShared(int releases) {
for (;;) {
//获取当前的state的值
int c = getState();
//如果此时state的值为0,证明CountDownLatch已经被释放了,所以也没必要解锁释放队列中的任务了,直接返回false
if (c == 0)
return false;
//将当前的state的值减1
int nextc = c-1;
//cas将当前减1的值替换到state中,如果替换失败,因为本逻辑是一个死循环,所以替换失败会重新再来一遍逻辑
if (compareAndSetState(c, nextc))
//当state - 1 = 0的时候,证明需要唤醒等大队列中的任务了,所以返回true,否则不需要唤醒任务,返回false
return nextc == 0;
}
}
tryReleaseShared
的逻辑也是比较简单的,主要就是针对于 state 进行 -1 操作,当减 1 完成后,如果 state 的值等于 0,证明 CountDownLatch
的计数已经完成了,需要将此时 await 阻塞在队列中的任务唤醒,于是当 tryReleaseShared
返回 true 之后,doReleaseShared 将唤醒任务:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//唤醒head节点的下一个节点的阻塞
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
doReleaseShared 方法会获取当前等待队列中的头节点,然后调用 unparkSuccessor 方法将 head 节点的下一节点解除阻塞,进而完成对于 await 的唤醒。
我们使用一张图来简单解释一下 countDown 方法:
总结来说,countDown 方法主要就是对 AQS 中 State 的值进行 -1 操作,当 State 的值为 0 的时候,就开始唤醒等待队列中的任务。
三、AQS在ReentrantReadWriteLock的应用
一、前期准备
刚刚我们在上面说过,ReentrantReadWriteLock 使用一个 state 维护了两个状态。在学习之前,我们需要先去学习如何使用一个 32 位的 int 值来表示两种状态,主要是对于 Java 位运算的知识点。
我们前几节课了解到,AQS 中 state 主要是为了记录加锁的次数或者计数次数,但是在 ReentrantReadWriteLock 中存在读锁(共享锁)和写锁(独占锁)两种,那么此时只有一个 state 肯定是无法满足的,因为 state 是一个 int 值,我们知道 int 在 Java 占 32 位字节,所以我们考虑将 32 位分为高 16 位和低 16 位,如下图所示:
我们可以使用高 16 位存储共享锁,低 16 位存储独占锁,这 32 位最终表示为 10 进制就是一个数字,比如上图表示的就是 0。
那么我们如何计算来单独获取到共享锁或者独占锁的数值呢?
1. 获取独占锁的数量级
在 Java 中存在一种位运算 &
运算,这种运算模式下,它将两个操作数的每个对应位进行与运算,如果两个对应位都是 1,则结果位为 1,否则为 0,运算顺序如下图所示:
上图就是 & 运算的过程,只有全为 1,最终结果才为 1,如果我们使用低 16 位存储独占锁,那么独占锁的最大数量是低 16 位全部为 1 也就是 65535。
我们假设使用一个低 16 位全部都是 1 的二进制与 state 作 & 运算,那么我们是不是就能够得到低 16 位的数值呢?
可能没有了解过位运算的同学会懵,没关系,我们用图来表示:
也就是说,只要通过低 16 位全是 1 的值与 state 进行 & 运算,我们就能得到低 16 位的值,当然因为 16 位数据最大只能表示 65535,所以如果 state 的值大于 65535,那么计算就会出问题。所以,我们需要规定,独占锁的数量必须小于等于 65535!
我们看一下,ReentrantReadWriteLock 计算独占锁的规则,先不要管代码在哪里,后面会统一说明:
static final int SHARED_SHIFT = 16;
//得出的结果就是65535 低16位全为1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int exclusiveCount(int c) {
return c & EXCLUSIVE_MASK;
}
我们简单说明一下 EXCLUSIVE_MASK 的运算方式,<<
这个符号就是左移运算符,目的是将一个数据向左边移动 16 位,空缺位置补 0,然后再进行 -1 操作。
注意,因为这个不是本节课的重点,这里的位运算只是为了让新手更加容易理解,事实上进行 -1 操作的时候,二进制运算中会转换为(xxx + -1)来进行运算,这涉及到了补码的知识点。如果大家想学习,可以在网上查找资料学习,这里不做太多的讲解。
在上述的代码中,c 就是 state,而 EXCLUSIVE_MASK 就是基准值(低 16 位全为 1 的数据),通过 state 与基准值进行位运算,我们就能够得到低 16 位的值,也就是独占锁的数量值。
2. 获取共享锁的数量级
我们上文说到 state 采用了低 16 位来存储了独占锁,所以从 0 ~ 65535 的数据位置已经被独占锁全部占用,所以高 16 位存储共享锁就需要从 65536 来计算,也就是在二进制中至少需要第 17 位也就是高 16 位的最后一位存在数据才能存储共享锁的数量级。
如图所示,我们分别表示共享锁的数量为 1、2、3:
从上图可以看到,如果我们想要获取高 16 存储的实际的数值,需要将低 16 位的干扰排除掉,然后计算就能得到十进制的数据,所以我们需要用到另外一个计算符 >>>
,这个符号在 Java 中可以将二进制数据无符号右移,缺少的位置补充为 0,如图所示:
我们通过无符号右移 16 位,就能够完全抛弃掉低 16 位对我们运算的影响,从而达到计算共享锁数量级的目的。
如果我们想要对高 16 位进行 +1 操作呢?此时因为我们操作的是高 16 位,所以直接对 state + 1 是不行的,每一次对高 16 位 +1 都需要在实际的十进制中增加 65536 的数值。
依旧是使用一张图来证明演算过程,我们现在对高 16 位进行 +1 操作:
至此,我们学习了,如果想要计算高 16 位的值,需要将数据无符号右移 16 位来得到共享锁的数量级;如果想对当前共享锁的数量级数量 +1 ,则需要对 state + 65536
。
我们来看下 ReentrantReadWriteLock 对这个的应用。
获取共享锁的数量级:
static final int SHARED_SHIFT = 16;
static int sharedCount(int c) {
return c >>> SHARED_SHIFT;
}
可以看到,这里计算共享锁数量级方法的过程是计算高 16 位,计算过程与我们上文分析的一致。
对共享锁的数量级 +1 操作:
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
compareAndSetState(c, c + SHARED_UNIT)
这里可以看到,对于共享锁的数量级的修改是直接使用 CAS 修改为当前的 state + 65536,计算过程与我们上文说的一致。
二、源代码分析
至此,我们已经学习了在 ReentrantReadWriteLock 中如何通过一个 state 来维护两个锁的数量值,我们来分别分析一下共享锁和独占锁的加解锁的过程。
这里还是以公平锁为例进行分析,非公平锁你在闲暇之余可以根据这几章的所学来分析一下。
1. 读锁的加解锁
(1)加锁
直接进入到 java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquireShared
,源码的寻找过程跟前几章的寻找过程一致:
protected final int tryAcquireShared(int unused) {
//获取当前的线程
Thread current = Thread.currentThread();
//获取当前的state的值
int c = getState();
//判断是否存在独占锁
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//获取共享锁
int r = sharedCount(c);
//是否需要排队等动作 共享锁的数量是否小于65535 cas修改高16位是否成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//首次加锁
if (r == 0) {
//记录首次的线程 (优化)
firstReader = current;
//首次线程的加锁数量级 (优化)
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//如果还是首个线程 则直接累加,此时还没有竞争呢
firstReaderHoldCount++;
} else {
//存在线程竞争 则使用ThreadLocal缓存一个HoldCounter加锁的计数器
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
//计数器累加操作
rh.count++;
}
return 1;
}
//当上面共享锁加锁失败,则开始循环迭代加锁
return fullTryAcquireShared(current);
}
有了前面的知识其实很好理解这段代码,我们在学习 ReentrantReadWriteLock 的时候学习过它的特性,读写锁存在互斥性,即读锁和写锁不能同时共存,所以首先需要根据位运算来计算低 16 位是否存在写锁,如果存在直接加锁失败,如果不存在则继续。
在确认此时没有写锁后,根据位运算计算高 16 位的读锁的数量,当判断不需要排队和读锁的数量不超过 65535 并且 CAS 修改成功的情况下,开始执行读锁的加锁逻辑,主要有以下几个步骤:
- 如果读锁数量级为 0,则证明只有一个线程在获取锁,此时不存在锁竞争问题,所以直接通过维护 firstReader、firstReaderHoldCount 这两个变量来记录持有锁的线程和读锁的数量级,而不是使用 ThreadLocal 来记录。这样可以提高效率,因为 ThreadLocal 效率虽然高,但是依旧会损失一部分性能。
- 当读锁数量级不为 0,则证明此时可能不止一个线程在持有读锁,于是先判断是否是可重入锁的情况,如果是可重入锁,证明此时还不存在锁竞争问题,所以依旧使用 firstReaderHoldCount 来记录加锁次数,以保证重入锁的功能正确性。
- 当读锁数量级不为 0 且不是重入锁的情况下,证明此时一定存在锁竞争问题,于是开始对每一个线程使用 ThreadLocal 来单独记录一个加锁计数器,从而保证某一个线程重入锁的获取次数的正常计算。
fullTryAcquireShared 方法主要是进行兜底的操作,当发现线程需要排队、加锁次数超过 65535、CAS 加锁失败的时候进行兜底,内部工作机制如下:
- 当发现线程需要排队的时候直接返回 -1,进行入队操作。
- 当发现加锁次数超过 65535 时,抛出异常
new Error("Maximum lock count exceeded")
。 - 当 CAS 加锁失败的时候,使用死循环来不断地尝试修改 CAS 直至成功为止(自旋锁)。
我们使用一个流程图来描述整个过程:
总结来说,共享锁的加锁逻辑就是先判断是不是存在写锁,存在写锁就直接加锁失败入队,不存在就加锁成功并修改 state 的高 16 位数据,并在每一个线程维护一个计数器,来计算每一个线程加锁的次数。
(2)解锁
共享锁的解锁比较简单,解锁过程简单来说无非就是将累加器中的累加次数 -1,同时将 state 中的高 16 位 -1(state - 65536),然后再通知等待队列中的任务进行解除阻塞。
源码位置:java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
这里我们还是分两部分进行解析 tryReleaseShared 和 doReleaseShared。
tryReleaseShared
java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryReleaseShared
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//如果当前解锁线程是首个加锁的线程,证明没有竞争直接修改局部变量即可
if (firstReader == current) {
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
//存在竞争 直接修改ThreadLocal中的累加器
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
//使用CAS修改数据 CAS修改失败则开始自旋修改,直至修改成功
if (compareAndSetState(c, nextc))
//重入锁的情况下,加锁次数减少一次不会为0
return nextc == 0;
}
}
正如我们前面分析的,解锁逻辑总共就做了两件事:
- 修改累加器,如果累加器为 1,则直接删除,证明该线程不再持有共享锁。
- 使用自旋的方式 CAS 修改 state 的值。
我们在这一步也就基本上了解了,有一部分同学在学习加锁的时候疑惑的 HoldCounter 的意义,它就是为了记录某一个线程的加锁次数,以方便在后续解锁的时候,释放重入锁进行计数。HoldCounter 与 state 不同的是,HoldCounter 记录的是单个线程持有读锁的次数,而 state 记录的是整个锁的状态,包括共享锁的持有次数和排他锁的持有信息。
doReleaseShared
在上一步释放锁成功后,会尝试将等待队列中的任务进行唤醒,具体方法如下:
java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared
private void doReleaseShared() {
for (;;) {
//获取头节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
//初始化状态waitStatus为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 将head节点的下一个节点解除阻塞
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
这一块代码主要的作用就是将 head 节点后面的节点调用 unPark 方法进行解除阻塞,以达到让等待队列中的任务能够继续执行的目的。
这里我们将共享锁(读锁)的加解锁做了一个详细的介绍,事实上,它的加解锁方式与 ReadWriteLock 的根本区别就是对于共享锁的加锁,在没有写锁的情况下,会直接加锁成功,同时在 state 中维护读锁的加锁数据。存在写锁,则直接加锁失败入队。
2. 写锁的加解锁
我们在上文详细学习了读锁的加解锁操作,下面我们开始正式分析写锁的加解锁的操作。
(1)加锁
代码位置在:java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里可以看到,它的加锁方式与 ReentrantLock 一样,我们只需要重点关注 tryAcquire 方法在 ReentrantReadWriteLock 中的实现就行了,其他方法在前面分析过,不再重复讲解。
先思考一下,如果是你,你该对写锁如何加锁?
首先,它是一个独占锁,所以我们需要先判断 state 的低 16 位是不是已经存在独占锁了,如果已经存在独占锁了,那么我们就需要判断是不是重入锁!如果 state 中已经存在独占锁了,而且也不是重入锁,那么直接加锁失败,将任务放到任务队列中就可以了。
其次,如果没有独占锁,因为 16 位字节的限制,所以独占锁的限制不能超过 65535 个,如果超过了就直接报错。
然后在既没有独占锁占用又没有超过最大值的情况下,我们直接就 state + 1,然后返回加锁成功就可以了。
那么,我们一起来看一下源码中是如何做的。
源码位置:java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquire
。
protected final boolean tryAcquire(int acquires) {
//获取当前的线程
Thread current = Thread.currentThread();
//获取当前State的值
int c = getState();
//计算低16位的值
int w = exclusiveCount(c);
//如果 state!=0 证明state存在读锁或者写锁
if (c != 0) {
//如果w = 0 证明State中存在读锁 不存在写锁,读写不能共存,所以直接加锁失败
//如果w != 0 这个名State中存在写锁,此时需要判断是不是重入锁,不是重入锁也直接失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//如果没有读锁,那么判断写锁数量是否超过最大值,超过直接报错
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//经过上面的判断,这里就符合w!=0 而且符合重入锁的规则,直接向上累加加锁次数就可以了
setState(c + acquires);
return true;
}
//如果state == 0 证明即不存在写锁也不存在读锁,此时需要判断等待队列中是否存在任务,不存在而且CAS能够修改成功就返回加锁成功,否则加锁失败
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
从代码注释中可以看到,代码中的设计原理与我们之前设想的基本一致。还是老规矩,我们使用一张流程图来说明问题:
(2)解锁
了解了写锁的加锁步骤之后,解锁步骤能猜出来:
- 将 state - 1;
- 判断当前 state 的写锁数量,如果为 0 的话证明重入锁释放完毕,直接将加锁线程置空,并解锁成功。
猜测完毕后,我们来看实际的源码。
源码位置:java.util.concurrent.locks.AbstractQueuedSynchronizer#release
。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
同样的,代码我们都分析过,我们这里只重点分析 tryRelease。
源码位置:java.util.concurrent.locks.AbstractQueuedSynchronizer#tryRelease
。
protected final boolean tryRelease(int releases) {
//判断解锁线程是不是加锁线程 不是就直接报错
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//state - 1
int nextc = getState() - releases;
//判断重入锁是否解锁成功
boolean free = exclusiveCount(nextc) == 0;
if (free)
//加锁线程置空
setExclusiveOwnerThread(null);
//cas设置state
setState(nextc);
return free;
}
可以看到,与我们的猜测基本一致,比较简单,这里不再画图说明。