文章目录
- 3.1 加锁
- 3.1.1 读锁加锁
- 3.1.1.1 tryAcquireShared()
- 3.1.1.2 readerShouldBlock()
- 3.1.1.3 fullTryAcquireShared()
- 3.1.1.4 doAcquireShared()
- 3.1.2 写锁加锁
- 3.1.2.1 tryAcquire()
- 3.1.2.2 acquireQueued()
- 3.2 加锁示意图
- 3.2.1 先写锁在读锁
- 3.2.2 先读锁在写锁
- 后记
3.1 加锁
3.1.1 读锁加锁
当前读写锁未加锁,开始加读锁,看下读锁加锁过程。源代码如下:
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
执行流程如下:
- 读锁加锁,调用sync.acquireShared()加读锁
- acquireShared()方法,执行tryAcquireShared()尝试加读锁,返回负数表示失败;返回整数成功,方法结束。
- 这里不会有返回0的情况
- 加锁失败,执行doAcquireShared()执行读锁阻塞流程,这个下面讲解。
- 下面我们详细分析下读锁加锁在tryAcquireShared()方法中的主要逻辑。
3.1.1.1 tryAcquireShared()
源代码如下:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
执行流程如下:unused值为1
-
获取当前线程,获取锁状态(计数)
-
第一步判断如果当前已经加写锁且锁持有线程非当前线程,返回-1
- 当前加写锁且锁持有线程是当前线程的情况就是当前线程以获取读锁,现在想获取读锁即锁降级情况,后面关于读写锁特性分析方法,详细讲解。
-
获取读锁计数,具体计算方式,前面有提到
-
第二步判断
-
读锁是否应该被阻塞
-
满足1,读锁计数是否小于最大容量
-
满足1,2,通过cas方式对状态计数+1
-
-
如果第二步执行成功,对线程读锁进行计数
- 如果是读锁计数为0,即第一次加读锁
- firstReader单独记录当前线程,firstReaderHoldCount对当前线程计数1
- 如果当前线程是第一个加读锁的线程,表示该线程读锁重入,计数+1
- 否则获取缓存的计数器,判断如果为null
- 通过readHolds获取当前线程的计算器
- 不为null,判断计数如果=0,通过readHolds设置计算器
- 计算器计数+1
- 返回1,获取锁成功
- 如果是读锁计数为0,即第一次加读锁
-
如果第二步执行失败,执行fullTryAcquireShared()方法
相关说明:
- 对第一个加读锁的线程有单独的线程变量记录,计数器。其他后续加读锁的线程,通过readHolds设置或获取计数器。
- readHolds继承ThreadLocal,多线程环境下不会有安全问题。ThreadLocal相关知识,可以查看之前的讲解或者查阅相关文档。
- 第二步的3个判断条件,只有上一个满足的情况下才会继续向下判断。
- 这里看出加读锁是可以共享的,通过线程计数器记录同一线程读锁锁重入,锁计数和线程计数器后面解锁时会用到。
3.1.1.2 readerShouldBlock()
此处是读写锁非公平锁中的方法,看下源代码:
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
判断如果锁竞争队列第二个结点是获取写锁的线程结点,返回true,其他情况返回false。目的防止一直执行获取读锁的线程执行,而获取写锁的线程一直阻塞,下面具体分析。
3.1.1.3 fullTryAcquireShared()
读锁尝试加锁第二步没执行成功,会执行该方法,源代码如下:
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
对应之前的3个判断条件失败的情况,做了相应的处理,执行流程如下:
- 设置计算器变量rh
- for循环没有结束条件
- 获取锁状态
- 判断是否是加的写锁
- 是,判断锁持有线程不是当前线程
- 返回-1,失败
- 否的话死锁了
- 是,判断锁持有线程不是当前线程
- 否则判断是不是readerShouldBlock(),
- 是,说明此时锁竞争队列第二个结点为获取写锁的线程结点
- 判断如果是第一个加写锁的线程再次获取读锁,这里没必要
- 否则判断rh为空
- 获取缓存的计数器
- 判断rh为空或者计数器记录线程非当前线程
- 获取当前线程的计数器
- 如果计数为0直接移除,释放资源
- 判断rh计数为0,返回-1
- 是,说明此时锁竞争队列第二个结点为获取写锁的线程结点
- 判断是不是由于读锁计数达到容量上限,是的话直接抛错误
- 非上述情况,就是之前cas锁计数+1失败,再次尝试cas锁计数+1
- 成功执行同3.1.1.1中一样的操作,不在重复。
说明:
- 进入readerShouldBlock()为true的语句块的话,如果是新的获取读锁的线程,会返回-1,执行阻塞流程,目的就是我们在3.1.1.2 中说的,不在重复;如果不是新的获取读锁的线程,那就说明是正在执行已经获取了读锁的线程,这种情况下我认为没有必要,会判断执行计数器清理工作。至于什么环境下会出现后一种情况,暂时不知,遇到的话在记录。
- 我们这个方法执行是在3.1.1.1方法中调用,如果当前加写锁情况下,又获取读锁,锁持有线程不是当前线程,直接返回-1没问题。如果锁持有线程是当前线程,表示同一线程在获取了写锁的情况下,又获取读锁。
- 第一种情况,3.1.1.1中第二步的执行失败,进入3.1.1.3执行,会死锁
- 3个判断条件,第二个判断容量的一般成功,第三个cas,既然持有锁也成功。失败的情况只能是第一个判断条件失败。
- 所以虽然读写锁支持锁降级,一般不建议使用。
- 第一种情况,3.1.1.1中第二步的执行失败,进入3.1.1.3执行,会死锁
3.1.1.4 doAcquireShared()
尝试加锁失败后,执行获取读锁线程的阻塞流程,源代码如下:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
有没有很熟悉的赶脚😄,对的之前我们讲解过ReentrantLock的加锁流程方法(传送门==加锁和解锁-ReentrantLock详解-AQS-并发编程(Java)==)主要流程一样,下面我们吧不一样的地方讲解下:
- addWaiter(Node.SHARED)这里结点类型为共享结点,默认null即独占结点
- 共享结点:即获取读锁的结点,在唤醒的时候,会把连续的共享结点唤醒,想象一下撸串的感觉🤤
- 独占结点:获取写锁的结点,唤醒的时候只唤醒当前结点,且会把所持有线程设置为结点线程。
- tryAcquireShared()这里尝试获取读锁,具体逻辑3.1.1.1中有讲解。
- setHeadAndPropagate(),这里会执行唤醒共享结点的操作,具体我们放在解锁中讲解。
3.1.2 写锁加锁
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
依然很眼熟,下面讲解了和ReentrantLock不一样的tryAcquire()和acquireQueued()方法,其他参考ReentrantLock加锁流程。
3.1.2.1 tryAcquire()
源代码如下:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
写锁尝试加锁流程如下:
- 获取锁状态,获取写锁计数
- 第一步判断锁计数不等0,说明已经加锁
- 如果写锁计数为0或者锁持有线程非当前线程,返回false,失败
- 写锁计数为0,说明加的是读锁,读写互斥
- 判断如果写锁计数+1超过读锁容量,抛错误
- 锁计数+1,返回true,加写锁成功
- 这里没有通过cas设置原因,程序执行到这儿,一定是当前线程持有锁
- 如果写锁计数为0或者锁持有线程非当前线程,返回false,失败
- 锁计数为0 表示没有加锁
- 执行第二步的判断,2个条件
- 第一个条件判断获取写锁线程是否应该被阻塞
- 公平锁实现:判断如果锁竞争队列有前驱结点,返回true;否则返回false。非公平实现:直接放回false
- 第二个判断条件,cas直接设置锁状态
- 执行成功,返回false,尝试加锁失败
- 第一个条件判断获取写锁线程是否应该被阻塞
- 继续执行,说明cas尝试设置状态成功
- 设置锁持有线程为当前线程,返回true,成功加写锁
说明:
- 关于读写锁公平和非公平实现,只在writerShouldBlock()和readerShouldBlock()这2个方法有区别,其他都相同,具体逻辑,在后面关于读写锁的特性在讲解。
3.1.2.2 acquireQueued()
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// tryAcquire不同于ReentrantLock
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
不说了,这里不同的是tryAcquire()这里是尝试获取读锁,其他参考ReentrantLock。
因为之前学习AQS之一的ReentrantLock,这里相对来说容易理解,就没有在画流程图,简单过了下加锁流程。
3.2 加锁示意图
3.2.1 先写锁在读锁
以先加写锁为例,thread-0获取写锁,示意图如下3.2-1所示:
然后thread-1,thread-2尝试获取读锁,thread-3尝试获取写锁,示意图如下3.2-3所示:
- 其中结点的nextWaiter值为Shared为共享结点,即获取读锁的线程结点;值为null为独占结点,即为获取写锁的线程结点。
3.2.2 先读锁在写锁
thread-0,thread-1尝试获取读锁,thread-2尝试获取写锁,thread-3,thread-4尝试获取读锁,示意图如下3.2.2-1所示:
后记
如有问题,欢迎交流讨论。
❓QQ:806797785
⭐️源代码仓库地址:https://gitee.com/gaogzhen/concurrent
参考:
[1]黑马程序员.黑马程序员深入学习Java并发编程,JUC并发编程全套教程[CP/OL].2020-01-18/2022-12-12.p253~p255.