目录
Sync.RWMutex 背景与机制
接口简单介绍
sync.RWMutex 数据结构
读锁流程
RLock
RUnlock
RWMutex.rUnlockSlow
写锁流程
Lock
Unlock
Sync.RWMutex 背景与机制
从逻辑上,可以把 RWMutex 理解为一把读锁加一把写锁;
写锁具有严格的排他性,当其被占用,其他试图取写锁或者读锁的 goroutine 均阻塞;
读锁具有有限的共享性,当其被占用,试图取写锁的 goroutine 会阻塞,试图取读锁的 goroutine 可与当前 goroutine 共享读锁;
RWMutex 适用于读多写少的场景
最理想化的情况,当所有操作均使用读锁,则可实现去无化;
最悲观的情况,倘若所有操作均使用写锁,则 RWMutex 退化为普通的 Mutex.
接口简单介绍
var mtu sync.RWMutex
mtu.RLock() //读锁 加锁
mtu.RUnlock() //读锁 解锁
mtu.Lock() //写锁 加锁
mtu.Unlock() //写锁 解锁
sync.RWMutex 数据结构
const rwmutexMaxReaders = 1 << 30
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
• rwmutexMaxReaders:共享读锁的 goroutine 数量上限,值为 2^29;
• w:RWMutex 内置的一把普通互斥锁 sync.Mutex;
• writerSem:关联写锁阻塞队列的信号量;
• readerSem:关联读锁阻塞队列的信号量;
• readerCount:正常情况下等于介入读锁流程的 goroutine 数量;当 goroutine 接入写锁流程时,该值为实际介入读锁流程的 goroutine 数量减 rwmutexMaxReaders.
• readerWait:记录在当前 goroutine 获取写锁前,还需要等待多少个 goroutine 释放读锁.
读锁流程
RLock
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
1,基于原子操作,将 RWMutex 的 readCount 变量加一,表示占用或等待读锁的 goroutine 数加一
2,倘若 RWMutex.readCount 的新值仍小于 0,说明有 goroutine 未释放写锁
写锁加锁时 readCount 减去了 特殊值 (const rwmutexMaxReaders = 1 << 30)
因此将当前 goroutine 添加到读锁的阻塞队列中并阻塞挂起.
RUnlock
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
rw.rUnlockSlow(r)
}
}
1,基于原子操作,将 RWMutex 的 readCount 变量加一,表示占用或等待读锁的 goroutine 数减一
2,倘若 RWMutex.readCount 的新值小于 0,说明有 goroutine 在等待获取写锁,
则走入 RWMutex.rUnlockSlow(唤醒阻塞等待的写锁) 的流程中.
RWMutex.rUnlockSlow
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
fatal("sync: RUnlock of unlocked RWMutex")
}
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
1,对 RWMutex.readerCount 进行校验,倘若发现当前协程此前未抢占过读锁(并为对读锁+1),或
者介入读锁流程的goroutine 数量达到上限(r+1复原后等于特殊值,则唤醒前已经有写锁)抛出fatal
2.基于原子操作,对 RWMutex.readerWait 进行减一操作,倘若其新值为 0,说明当前 goroutine 是最后一个介入读锁流程的协程,因此需要唤醒一个等待写锁的阻塞队列的 goroutine.
写锁流程
Lock
func (rw *RWMutex) Lock() {
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
-
对 RWMutex 内置的互斥锁进行加锁操作;
-
基于原子操作,对 RWMutex.readerCount(标识作用)进行减少-rwmutexMaxReaders 的操作;
-
倘若此时存在未释放读锁的 gouroutine则基于原子操作在 RWMutex.readerWait 的基础上加上介入读锁流程的 goroutine 数量(负责更新),并将当前 goroutine 添加到写锁的阻塞队列中挂起.
Unlock
func (rw *RWMutex) Unlock() {
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
fatal("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
}
1,基于原子操作,将 RWMutex.readerCount 的值加上 rwmutexMaxReaders;(复原)
2,倘若发现 RWMutex.readerCount 的新值大于 rwmutexMaxReaders,则说明
要么当前 RWMutex 未上过写锁,要么介入读锁流程的 goroutine 数量已经超限,因此直接抛出 fatal
3,因此唤醒读锁阻塞队列中的所有 goroutine;
(可见,读锁比写锁先被唤醒,竞争读锁的 goroutine 更具备优势)
4,解开 RWMutex 内置的互斥锁.