本文中会涉及到一些前面 ReentrantLock 中学到的内容,先去阅读一下我关于独占锁 ReentrantLock 的源码解析阅读起来会更加清晰。
初探 JUC 并发编程:独占锁 ReentrantLock 底层源码解析
6.4)读写锁 ReentrantReadWriteLock 原理
前面提到的 ReentrantLock 是独占锁,某个时间只有一个线程可以获取这个锁,而实际情况中会出现读多写少的情况,ReentrantLock 无法满足这个需求,所以就有了读写锁 ReentrantReadWriteLock。这个锁采用了读写分离的策略,允许多个线程同时获取读锁。
6.4.1)类图结构
ReentrantReadWriteLock 的类图结构如图所示,类中维护了一个 ReadLock 和 WriteLock,它们依赖 Sync 实现功能,而 Sync 继承自 AQS,也提供了公平和非公平的实现。
下面来看一下 Sync 中的属性和常用方法:
因为读写锁中维护了读锁和写锁两个状态,但是 AQS 只提供了一个 state;读写锁中巧妙的使用了 state 的高 16 位表示读状态,也就是获取到读锁的次数,使用低十六位表示写的次数。
static final int SHARED_SHIFT = 16;
// 读锁状态单位值 65536
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 读锁的状态单位值 65536
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 写锁掩码,15 个 1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** 返回读锁线程数 */
static int sharedCount(int c) {
// c 一般为 state 的值,将值右移 16 为
return c >>> SHARED_SHIFT;
}
/** 返回写锁的重入次数 */
static int exclusiveCount(int c) {
// 将值与写锁掩码做与操作
return c & EXCLUSIVE_MASK;
}
// 第一个获取到读锁的线程
private transient Thread firstReader = null;
// 第一个获取到读锁的线程的可重入次数
private transient int firstReaderHoldCount;
// 记录最后一个获取到读锁的可重入次数
private transient HoldCounter cachedHoldCounter;
static final class HoldCounter {
int count = 0;
// 使用 id 而不是引用来避免垃圾保留
final long tid = getThreadId(Thread.currentThread());
}
其中 readHolds
是一个 ThreadLocal 变量,存放第一个获取到读线程之外的其他线程读锁的可重入次数,ThreadLocalHoldCounter 继承自 ThreadLocal。
firstReader: 这是一个线程引用,用来记录第一个获得读锁的线程。当锁从无读线程持有(即读锁计数器shareCount为0)变为有读线程持有(即读锁计数器shareCount至少为1)时,这个变量会记录下那个“第一个”读线程。
这样做主要是为了优化后续的读锁获取操作,因为一旦有线程成为了firstReader
,它在再次尝试获取读锁时,可以更快地进行,因为它不需要像其他线程那样去更新或检查线程局部的HoldCounter
对象。如果这个线程释放了它的所有读锁,导致读锁计数器回到0,那么firstReader
会被设置为null
。
而 cachedHoldCounter 是存储最后一个获取到锁的线程的 id 和 count,是为了减少在常见情况下(即最近释放锁的线程通常是最近获取锁的线程)的线程本地存储(ThreadLocal)查找开销。
private transient ThreadLocalHoldCounter readHolds;
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
6.4.2)写锁的获取与释放
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
通过上面的代码获取写锁,写锁和上面的 ReentrantLock 锁都是独占可重入锁,所以方法都差不多,调用 lock 方法,可以获取锁:
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
// 调用 sync 中重写的 tryAcquire 方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
其中调用了 WriteLock 中重写的 tryAcquire 方法:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c); // 获取写锁的重入次数
// 1)读锁或者写锁被占有
if (c != 0) {
// 1)写锁的重入次数为 0,也就是被读锁占有的情况,如果读锁占有,则 w 不为 0
// 2)当前线程不是持有写锁的线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 1)越界的情况
// 2)如果能走到这里,说明锁被当前线程持有
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 重入次数加一
setState(c + acquires);
return true;
}
// 这个 writerShouldBlock() 方法是 ReentrantReadWriteLock 中的一个抽象方法,
// 用于确定当前线程在尝试获取写锁时是否应该被阻塞,
// 具体是因为什么原因阻塞取决于锁的实现和其策略,如果是非公平锁不需要阻塞
// 非公平锁有阻塞相关的逻辑
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
上面的方法是写锁中实现的 tryAcquire 方法,方法的执行流程是这样的:首先回去判断状态值是否不等于 0,如果不等于零则说明读锁或者写锁被占有(读锁和写锁不能同时起作用),然后去判断写锁的重入次数是否为 0,如果为 0 则说明当前锁是读锁,无法获取写锁;如果当前锁是写锁的话,去判断锁是否被线程持有,如果被持有,对重入次数做一个自增;如果当前锁没有被占有,则将修改低 16 位的 state 来表明当前锁是写锁状态,且写锁被占有。
和 ReentrantLock 相同,方法中也提供了
lockInterruptibly()
方法、tryLock()
方法、tryLock(long timeout, TimeUnit unit)
作用和 ReentarntLock 完全相同,这里不赘述了。
写锁的释放方法是委托给 Sync 类来做的:
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
下面来看核心代码 tryRelease 的实现:
protected final boolean tryRelease(int releases) {
// 1)锁未被当前线程持有
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases; // 下次修改的值
boolean free = exclusiveCount(nextc) == 0; // 如果为 0 则完全释放锁
if (free)
setExclusiveOwnerThread(null); // 清除持有锁的线程
setState(nextc);
return free; // 锁是否被线程持有
}
6.4.3)读锁的获取与释放
如果当前没有其他线程持有写锁,则当前线程可以获取读锁,AQS 的状态值 state 的高 16 位会增加 1,如果有线程持有写锁的话,获取读锁的线程会被阻塞。
先来看读锁的 lock 方法,同样是委托给 sync 进行的:
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
接下来看一下在读锁中实现的核心代码, tryAcquireShared()
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 1)写锁被占有
// 2)写锁不被当前线程持有
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取高 16 位的内容
int r = sharedCount(c);
// 判断获取读锁的时候是否需要被阻塞
// 1)本类中的逻辑为判断 AQS 队列中的第一个元素是否在获取写锁
// 2)共享锁的获取次数没有达到上限
// 3)当前线程修改 sharedCount 成功
// 多个线程调用该方法的时候只要一个线程会成功(因为进行 CAS 操作),
// 未成功的线程会进入 fullTryAcquireShared 方法
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 1)没有线程获取到读锁
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 1)当前线程是第一个获取到读锁的线程
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter; // 最后一个获取到读锁的线程
// 1)最后一个获取到读锁的线程为 null
// 2)最后一个获取到锁的线程不是当前线程
if (rh == null || rh.tid != getThreadId(current))
// 将 cachedHoldCounter 设置为当前线程
cachedHoldCounter = rh = readHolds.get();
// 1)最后一个获取到锁的线程为当前线程
else if (rh.count == 0)
// 确保 readHolds 被初始化
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
上面的代码中首先检查是否有其他线程获取到了写锁,如果有则直接返回 -1,之后会将当前线程放到 AQS 阻塞队列。如果当前获取读锁的线程持有写锁,则可以直接获取读锁,但注意释放锁的时候将两个锁都释放掉。
本类中的 readerShouldBlock()
方法是这样的:
// 避免重复获取读锁导致写锁无法被获取的情况
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
// 当前 AQS 队列中收个节点请求的是写锁
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
当多次获取读锁可能会导致写锁持续被阻塞,所以当发现 AQS 队列中首个节点请求的是写锁的时候,获取读锁的线程暂时阻塞给写锁让步。
因为多个线程只有一个会获取写锁,剩余的情况在 tryAcquireShared()
中并没有被处理
- 有线程获取写锁的时候,被阻塞
- CAS 操作失败
这时候就调用 fullTryAcquireShared,这个方法会循环自旋的获取读锁:
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
// 1)写锁被占有
if (exclusiveCount(c) != 0) {
// 1)写锁不被当前线程持有
if (getExclusiveOwnerThread() != current)
return -1;
// 1)当前线程应该被阻塞
} else if (readerShouldBlock()) {
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
// 1)最后一个获取读锁的线程为空
// 或者
// 2)最后一个获取到锁的线程未被设置为本线程
if (rh == null || rh.tid != getThreadId(current)) {
// 判断当前线程是否获取过锁
rh = readHolds.get();
if (rh.count == 0)
// 未获取过的话,清除 readHolds
readHolds.remove();
}
}
// 当前线程被阻塞了
if (rh.count == 0)
return -1;
}
}
// 执行到这里说明写锁没有被占有,且当前线程没有被阻塞,可以尝试获取锁
if (sharedCount(c) == MAX_COUNT)
// 越界的情况
throw new Error("Maximum lock count exceeded");
// 使用 CAS 操作修改 state,给获取读锁的线程数加一
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 如果当前锁没有被线程占用
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 第一个持有锁的线程为当前线程
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
// cachedHoldCounter 为空,或不为当前线程
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get(); // 设置 rh
else if (rh.count == 0)
readHolds.set(rh);
rh.count++; // 自增
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
上面的方法中,先去判断写锁有没有被占有,如果被占有则直接返回 -1。
然后去判断当前线程是否应该被阻塞,也就是 AQS 队列的队头是不是请求的写锁,然后去判断最后一个获取到锁的线程是不是本线程,如果不是的话,检查线程中的 readHolds 是否为 0(如果为 0 则说明没有获取到锁,如果获取到了锁这里应该置为 1),因为 get 方法会向线程的 ThreadLocal 中添加对象,所以在确定它没有得到锁之后清楚 ThreadLocal 中的内容。
如果上面的代码均通过,说明写锁没有被占有,且当前线程没有被阻塞,可以尝试获取锁,其中获取锁的方法和上面相同。
同样的,读锁中也存在 tryLock 等方法,这里不做过多赘述。
然后来看释放锁的方法,这里的释放锁也是委托给 Sync 类进行的:
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
其中核心方法是 Sync 的实现类中实现的 tryReleaseShared 方法:
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 1)当前线程是第一个获取到读锁的线程
if (firstReader == current) {
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
// 1)当前线程不是最后一个获取到锁的线程
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
// 检查重入次数
int count = rh.count;
// 1)锁已经释放完成,可以清除了
if (count <= 1) {
readHolds.remove();
// 如果是 0 表示未获取到锁
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// 减少一次重入次数
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
方法中先对线程是否为 firstReader 或者 cachedHoldCounter 做了判断,对其进行特殊的处理,然后检查重入的次数,如果次数小于等于一,则本次释放就将线程持有的读锁全部释放完成,此时删除线程 ThreadLocal 中的内容;最后循环减少可重入次数。