图解java.util.concurrent并发包源码系列——深入理解ReentrantReadWriteLock读写锁,看完可以吊打面试官
- ReentrantReadWriteLock的作用
- ReentrantReadWriteLock的原理
- ReentrantReadWriteLock源码解析
- 构造方法
- 获取写锁和读锁对象
- 计算读锁被持有数和写锁被持有数的位移运算
- 获取锁和释放锁的大体流程
- tryAcquireShared
- fullTryAcquireShared
- tryReleaseShared
- tryAcquire
- tryRelease
- 总结
往期文章:
- 人人都能看懂的图解java.util.concurrent并发包源码系列 ThreadPoolExecutor线程池
- 图解java.util.concurrent并发包源码系列,原子类、CAS、AtomicLong、AtomicStampedReference一套带走
- 图解java.util.concurrent并发包源码系列——LongAdder
- 图解java.util.concurrent并发包源码系列——深入理解AQS,看完可以吊打面试官
- 图解java.util.concurrent并发包源码系列——深入理解ReentrantLock,看完可以吊打面试官
上一篇文章,介绍了ReentrantLock的作用和源码,了解到ReentrantLock是如何通过AQS去实现它的可重入锁的功能的。这次我们再了解另一个并发工具类ReentrantReadWriteLock读写锁,我们从ReentrantReadWriteLock的作用到源码,了解如何通过AQS实现更复杂的并发工具类。因为ReentrantReadWriteLock使用到AQS实现自身的功能,所以需要对AQS有一定的了解,本篇文章同样不会对AQS做过多的介绍,可以通过往期的文章(图解java.util.concurrent并发包源码系列——深入理解AQS,看完可以吊打面试官)了解AQS的作用和原理。
ReentrantReadWriteLock的作用
ReentrantReadWriteLock是读写锁,读写锁的作用就是可以实现读读并发,读写互斥。如果我们只是想对临界资源进行读操作,但是不做修改,那么我么可以获取读锁,读锁不会阻塞其他线程获取读锁,只会阻塞其他线程获取写锁,用于保证获取读锁期间临界资源不会被修改。如果我们对临界资源进行写操作,那么我们可以获取写锁,写锁会阻塞其他的线程获取写锁和读锁。
在读多写少的场景下,读写锁的并发度是比互斥锁(ReentrantLock)的并发度高的,互斥锁无论做的是读操作还是写操作,获取到锁后都会阻塞其他线程。
ReentrantReadWriteLock里面包含了读锁ReadLock和写锁WriteLock两把锁。当我们要加读锁时,我们可以通过ReentrantReadWriteLock获取到ReadLock,然后调用ReadLock的lock方法,就可以加读锁,调用ReadLock的unlock方法就可以释放读锁。当我们要加写锁时,我们可以通过ReentrantReadWriteLock获取到WriteLock,然后调用WriteLock的lock方法,就可以加写锁,调用WriteLock的unlock方法,就可以释放写锁。
ReentrantReadWriteLock的原理
既然只有读锁和读锁是可以并发的,读锁和写锁,写锁和写锁互斥,那读写和写锁间就要互相感知,也就是要知道对方是否被某些线程持有着,比如获取读锁时要判断写锁是否被其他线程持有,如果写锁被被其他线程持有,那么读锁就不能加锁成功。
但是读锁和写锁时两个锁,如何做到互相感知呢?那就是使用共享内存变量,而这个变量不是别的,正是AQS的state变量。
ReentrantReadWriteLock内部也是有一个Sync的内部类的,这个内部类也继承了AQS,在创建ReentrantReadWriteLock时,同时创建Sync、ReaderLock和WriteLock,并且把Sync作为ReaderLock和WriteLock的构造参数传递给ReaderLock和WriteLock,ReaderLock和WriteLock可以通过Sync获取AQS中的state变量,得知对方是否有被持有。
那一个变量,如何同时供ReaderLock读锁和WriteLock写锁使用呢?state要同时记录读锁的被持有数和写锁的被持有数,因此要使用位运算。ReentrantReadWriteLock把state切成两半,高16位用于记录读锁的被持有数,低16位用于记录写锁的被持有数。那么ReaderLock和WriteLock就可以通过对state变量做位运算,得知对方是否有被持有。
ReentrantReadWriteLock源码解析
构造方法
下面开始阅读ReentrantReadWriteLock的源码,首先是ReentrantReadWriteLock的构造方法。
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
ReentrantReadWriteLock的构造方法接收一个boolean类型的参数fair,如果fair是true,表示ReentrantReadWriteLock是公平锁形式的读写锁,那么内部的Sync的类型使用FairSync,如果fair是false,表示ReentrantReadWriteLock是非公平锁形式的读写锁,那么内部的Sync的类型使用NonfairSync。
ReentrantReadWriteLock的构造方法接下来会创建ReadLock对象和WriteLock对象,并以自身作为构造参数。
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
然后ReadLock和ReadLock的构造方法都会保存ReentrantReadWriteLock的Sync对象到自身内部,与我们上面说的一致。
获取写锁和读锁对象
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
调用ReentrantReadWriteLock的writeLock方法,可以获取写锁;调用ReentrantReadWriteLock的readLock方法,可以获取读锁。
计算读锁被持有数和写锁被持有数的位移运算
而计算读锁被持有数和写锁被持有数的方法,则是在Sync类内部,ReadLock和WriteLock可以通过Sync提供的方法判断对方的被持有数,从而判断自己是否可以尝试加锁。
abstract static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 通过位移运算获取读锁被持有数(state无符号右移16位)
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 通过位移运算获取写锁被持有数(state与写锁掩码按位与运算,写锁掩码高16位为0,低16位为1)
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
// 。。。。。。下面代码省略
}
可以看到Sync的sharedCount方法用于获取读锁被持有数,exclusiveCount方法用于获取写锁被持有数,这两个方法的int类型参数c都是state。
sharedCount方法将state进行无符号右移16位(SHARED_SHIFT值固定为16),右移16位后,正好把原先低16位的写锁被持有数抹去,然后高16位的读锁被持有数移到了低16位,那么此时sharedCount方法的返回值就是读锁被持有数。
exclusiveCount则是通过state和一个掩码EXCLUSIVE_MASK做按位与运算,把高16位的读锁被持有数抹去,EXCLUSIVE_MASK的高16位全是0,低16位全是1,那么state和EXCLUSIVE_MASK做按位与运算之后,高16位正好都会变为0,那么此时exclusiveCount方法的返回值就是写锁被持有数。
获取锁和释放锁的大体流程
下面看一看获取锁和释放锁的大体流程,先不看细节,对整体流程有个印象。
ReadLock的lock方法,是直接调用sync的acquireShared方法,acquireShared方法是AQS提供的一个模板方法,以共享模式获取锁。
public void lock() {
sync.acquireShared(1);
}
ReadLock的unlock方法,直接调用sync的releaseShared方法,releaseShared方法是AQS提供的一个模板方法,用于共享模式获取锁情况下释放锁。
public void unlock() {
sync.releaseShared(1);
}
WriteLock的lock方法,直接调用sync的acquire方法,acquire方法是AQS提供的一个模板方法,以独占模式获取锁。
public void lock() {
sync.acquire(1);
}
WriteLock的unlock方法,直接调用sync的release方法,release方法是AQS提供的一个模板方法,用于独占模式获取锁情况下释放锁。
public void unlock() {
sync.release(1);
}
acquireShared方法、releaseShared方法、acquire方法、release方法,这些方法都是AQS内部提供的模板方法。acquireShared方法会调用tryAcquireShared方法,releaseShared方法会调用tryReleaseShared方法,acquire方法会调用tryAcquire方法,release方法会调用tryRelease方法,这些方法都要继承AQS的子类去实现,而在ReentrantReadWriteLock中,这些方法都在Sync类的内部。
abstract static class Sync extends AbstractQueuedSynchronizer {
// 。。。。。。省略上面代码
abstract boolean readerShouldBlock();
abstract boolean writerShouldBlock();
protected final boolean tryRelease(int releases) {...}
protected final boolean tryAcquire(int acquires) {...}
protected final boolean tryReleaseShared(int unused) {...}
protected final int tryAcquireShared(int unused) {...}
// 。。。。。。省略下面代码
}
那么既然tryAcquireShared、tryReleaseShared、tryAcquire、tryRelease都在Sync内部,那Sync类的子类FairSync和NonfairSync内部有啥呢?
static final class NonfairSync extends Sync {
final boolean writerShouldBlock() {...}
final boolean readerShouldBlock() {...}
}
static final class FairSync extends Sync {
final boolean writerShouldBlock() {...}
final boolean readerShouldBlock() {...}
}
ReentrantReadWriteLock的FairSync和NonfairSync内部相对于ReentrantLock的FairSync和NonfairSync就简单很多了,就是实现了Sync定义的两个抽象方法writerShouldBlock方法和readerShouldBlock方法。writerShouldBlock方法会在Sync的tryAcquire方法内部被调用,而readerShouldBlock方法会在Sync的tryAcquireShared方法内部被调用。
那么大体逻辑就是这样:
那么下面我们就着重对这几个方法进行分析。
tryAcquireShared
当我们调用ReadLock的lock方法获取读锁时,ReadLock的lock方法直接调用AQS的acquireShared方法,acquireShared的acquire方法会调用子类实现的tryAcquireShared方法,然后就会进入到Sync的tryAcquireShared方法内部。
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 1.如果当前有线程获取了写锁,并且获取写锁的线程不是当前线程,那么当前线程此次尝试获取锁失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
// readerShouldBlock():
// 如果是非公平锁,检查队列中的第一个节点是否是等待获取写锁的线程,如果是,那么返回true,当前线程次数尝试获取读锁失败;
// 如果是公平锁,看队列中是否有节点在排队,如果有,那么当前线程不能获取读锁(除非当前线程也在队列中,并且刚好轮到它)。
// r < MAX_COUNT:防止写锁超过最大的获取数
// compareAndSetState(c, c + SHARED_UNIT):上面两个条件都通过,那么尝试CAS更新state,因为读锁的持有数记录在state的高16位,所以这里不是+1,而是+SHARED_UNIT(一个读锁获取单位)
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 下面是读锁获取成功后,修改读锁的持有数
if (r == 0) {
// 当前没有线程获取读锁,当前线程是第一个获取读锁的线程,记录到firstReader,然后当前线程的读锁持有数记录到firstReaderHoldCount
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 当前线程是第一个获取读锁的线程,当前线程的读锁持有数记录到firstReaderHoldCount
firstReaderHoldCount++;
} else {
// 当前线程不是第一个获取读锁的线程,读锁持有数记录到一个ThreadLocal变量中欧冠
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// CAS失败,那么调用fullTryAcquireShared方法进行自旋
return fullTryAcquireShared(current);
}
先上一张图,描绘一下上面tryAcquireShared方法的代码流程:
tryAcquireShared方法的第一步,就是先看一下当前有没有线程获取了写锁,并且获取写锁的线程不是当前线程。如果是的话,那么本次尝试获取读锁就立刻返回失败。
如果当前没有线程获取写锁呢?那么可以继续往下走,调用readerShouldBlock()方法判断当前线程获取读锁是否应该被阻塞,我们进去看看readerShouldBlock()方法的逻辑。
可以看到readerShouldBlock()方法有两个版本的实现,因为tryAcquireShared方法是在Sync类内部,所以此时readerShouldBlock()方法会调用到Sync子类的实现,FairSync代表公平锁,NonfairSync代表非公平锁。
NonfairSync#readerShouldBlock:
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
// 判断队列中的第一个节点的线程,是否是非共享模式,也就是等待获取写锁的
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
NonfairSync的readerShouldBlock方法,就是判断队列中的第一个节点的线程,是否在等待获取写锁的,如果是的话,那么不能让当前线程获取读锁,为什么呢?因为如果这里不拦住,就会造成写饥饿。想象一下,此时源源不断的有线程过来获取读锁,那么是不是这个要获取写锁的线程,永远都无法获取到?
FairSync#readerShouldBlock:
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
FairSync的readerShouldBlock就相对简单了,就是调用hasQueuedPredecessors()方法进行判断,这个方法我们上一篇文章已经分析过,就是判断队列中是否有等待获取锁的线程。如果队列中有等待获取锁的线程,那么当前线程也不能获取读锁,要去排队。除非当前线程已经在排队,并且刚好轮到它了,那么当前线程可以尝试获取读锁。
回到tryAcquireShared方法,接下来是判断读锁当前被持有的次数,是否还没满。如果还没满,那么当前线程可以尝试获取读锁;如果满了,那么当前线程也是不能获取读锁的。因为state是一个int类型的变量,并且要同时记录读锁的被持有数和写锁的被持有数,只有16位记录读锁被持有的次数,超了就记录不下了。
上面两个条件都通过的话,那么就可以调用compareAndSetState(c, c + SHARED_UNIT)方法,尝试CAS修改state变量。c就是state变量,尝试修改为“c + SHARED_UNIT”而不是“c + 1”,因为state用高16位记录读锁被持有的次数,所有不能+1。
如果CAS修改成功了,那么就表示获取读锁成功了。在方法返回前,要更新当前线程持有读锁的次数,如果它是第一个获取读锁的线程,那么可以直接记录到firstReaderHoldCount这个成员变量中,并且用firstReader记录当前线程是第一个获取读锁的线程。如果当前线程不是第一个获取读锁的线程,那么就要记录到ThreadLocal中。
如果CAS失败,或者前两个条件就没满足,那么就调用fullTryAcquireShared(current)方法,在里面进行自旋重试。
fullTryAcquireShared
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) { // 自旋
int c = getState();
// 如果有别的线程获取了写锁,获取读锁失败
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// 再次调用readerShouldBlock()方法判断是否要阻塞当前线程获取读锁
} else if (readerShouldBlock()) {
if (firstReader == current) {
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
// 判断读锁被持有数是否要超了
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 尝试CAS修改state
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 获取读锁成功,更新当前线程读锁持有数
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh;
}
return 1;
}
}
}
可以看到就是一个自旋,自旋里面的逻辑跟外面的tryAcquireShared方法是非常相似的。也是判断如果有别的线程获取了写锁,那么当前线程获取读锁失败;如果没有线程持有写锁,那么调用readerShouldBlock()方法判断是否要阻塞当前线程获取读锁,如果readerShouldBlock()方法返回true,那么当前线程本轮循环不能获取读锁;如果readerShouldBlock()方法返回false,那么判断读锁的被持有数是否已经满了,如果满了,就抛一个异常;这些条件都通过了,才尝试使用CAS的方式更新state变量,如果更新成功了,就修改当前线程的读锁持有数,返回获取读锁成功;如果前面的这些条件没有满足,或者CAS更新失败,那么本轮循环获取读锁失败,进入下一轮循环。
tryAcquireShared方法分析完毕。
tryReleaseShared
当我们调用ReadLock的unlock释放读锁时,会调用AQS的releaseShared方法,AQS的releaseShared方法会调用Sync的tryReleaseShared方法。
Sync#tryReleaseShared:
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 1.更新当前线程的读锁持有数。
if (firstReader == current) {
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// 2.死循环进行CAS更新state变量,直到更新成功为止。如果state等于0,会返回true表示锁全部释放完毕,如果队列中有等待获取写锁的,可以尝试获取写锁了;如果state不等于0,那么还有线程没有释放锁。
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
Sync的tryReleaseShared方法相对简单,就是两大步。第一步是更新当前线程的读锁持有数,如果当前线程是第一个获取读锁的线程,那么就更新firstReaderHoldCount变量,firstReaderHoldCount为0的话,会把firstReader (第一个获取读锁的线程)置空;如果当前线程不是第一个获取读锁的线程,那么就更新ThreadLocal。第二步是自旋CAS更新state变量,更新成功后如果state等于0了,会返回true,表示锁完全释放(此时没有线程持有锁),如果有线程想获取写锁的话,可以尝试获取写锁;更新成功后如果state不等于0,那么返回false,表示还有线程持有锁。
tryReleaseShared方法分析完毕。
tryAcquire
当我们通过ReentrantReadWriteLock的writeLock()方法获取了WriteLock对象,可以调用WriteLock对象的lock方法获取读锁,lock方法会调用AQS的acquire方法,然后acquire方法会调用Sync对象的tryAcquire方法尝试获取写锁。
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
// 获取AQS中的state变量
int c = getState();
// 获取写锁被持有数
int w = exclusiveCount(c);
// state不为0,表示有线程持有锁
if (c != 0) {
// w == 0,表示有线程持有读锁,那么当前线程不能获取写锁
// current != getExclusiveOwnerThread(),有线程获取写锁,但不是当前线程,那么当前线程也是不能获取写锁
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 写锁被持有数要超了,那么抛出异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 到这里,表示当前线程是持有了写锁的,现在只是重入,所以不需要CAS
setState(c + acquires);
return true;
}
// writerShouldBlock()判断是否要阻塞当前线程获取写锁
// compareAndSetState(c, c + acquires) CAS尝试更新state,更新成功表示获取写锁成功
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 设置当前线程为独占锁的线程
setExclusiveOwnerThread(current);
return true;
}
首先是判断state是否不等于0,如果state不等于0,那么表示当前已经有线程获取了锁,如果有线程获取了读锁,或者有别的线程获取了写锁,那么当前线程获取写锁失败,因为写锁与写锁之间、写锁与读锁之间都是互斥的。如果state不等于0,但是是当前线程已经持有了写锁,那么当前线程是可以继续获取写锁的,也就是锁重入,而且修改state变量只需要调用setState方法即可,不需要CAS,因为当前线程已经持有了写锁,其他线程是无法修改state的,只有当前线程可以修改state。
如果state等于0,那么当前没有线程持有锁,然后就调用writerShouldBlock()方法判断当前线程是否需要被阻塞,如果不需要阻塞当前线程获取写锁的话,那么当前线程可以尝试CAS修改state变量,如果修改state变量成功,代表当前线程成功获取了写锁,那么修改当前线程为独占锁的线程。
我们再看看writerShouldBlock()方法的逻辑。
NonfairSync#writerShouldBlock
final boolean writerShouldBlock() {
return false;
}
在非公平锁的情况下,只要当前没有线程已经持有读锁,那么永远不会阻塞写锁的获取。这是防止写饥饿的一种处理,只要当前没有线程持有读锁,当前线程就可以尝试获取写锁。如果不这样处理的话,有可能会有大量的线程抢先获取了读锁,那么当前线程就有可能迟迟获取不了写锁。
FairSync#writerShouldBlock
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
FairSync的writerShouldBlock方法和readerShouldBlock方法一样的逻辑,都是调用hasQueuedPredecessors()判断队列中是否有线程在排队,如果有的话,那么当前线程也只能去排队。
tryAcquire方法的整体逻辑:
tryAcquire方法分析完毕。
tryRelease
当我们调用WriteLock对象的unlock方法释放读锁时,unlock方法会调用AQS的release方法,然后release方法会调用Sync的tryRelease方法。
protected final boolean tryRelease(int releases) {
// 当前线程不是独占锁的线程,表示当前线程没有获取写锁,是不需要释放写锁的,抛异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 更新后的state
int nextc = getState() - releases;
// 更新后的state中写锁持有数为0,表示当前线程彻底释放了写锁,那么free为true
boolean free = exclusiveCount(nextc) == 0;
// 当前线程彻底释放了写锁,设置当前占有锁的线程为null
if (free)
setExclusiveOwnerThread(null);
// state更新之前,state中的写锁持有数是不为0的,其他线程是进不来的,所以这里直接setState即可,不需要CAS
setState(nextc);
return free;
}
首先判断当前线程是否是独占锁的线程,如果不是的话,表示当前线程是没有获取写锁的,那么就不需要释放写锁,因此会抛出一个异常。
然后判断当前线程在本次释放写锁之后,写锁是否被完全释放,也就是所有重入的锁是否都释放回去了,如果是,那么修改当前独占锁的线程为null。
最后使用AQS提供的setState方法修改state变量。这里不需要CAS,是因为即便是setState修改后state为0,在setState之前,state中的写锁持有数都是不为0的,那么其他线程是进不来的,因此还是只有当前线程可以修改state。
tryRelease方法分析完毕。
总结
整个ReentrantReadWriteLock的获取锁和释放锁的大体流程,到这里就分析完毕了,最后上一张大图:
简单概括的话,其实就是在获取读锁的时候,判断是否有其他线程获取写锁,如果有,那么就是冲突的,否则就可以获取读锁。而获取写锁的时候,就是判断一下是否有其他线程获取了读锁或者写锁,如果有,那么就是冲突,否则就可以获取写锁。如果可以获取写锁或者读锁,那么就会更新state变量。而释放锁的时候,就是反向更新state。