本文通过对 Go 中源码层面的加锁、解锁实现细则来介绍锁的操作,包括 Mutex 互斥锁、RWMutex 读写锁,以及它们底层依赖的 sema 信号锁。
atomic 原子操作
正常情况下,多个协程同时操作 num 时,不能保证 num 值得最终一致性,根本原因是对 num 的操作不是原子性的,即在 内存读取
->操作
->写回内存
的过程中,多个协程同时在读取内存,同时写回内存。
var wg = sync.WaitGroup{}
func add(num *int32) {
*num++
defer wg.Done()
}
func main() {
num := int32(0)
wg.Add(1000)
for i := 0; i < 1000; i++ {
go add(&num)
}
wg.Wait()
fmt.Println(num) // 996
}
使用原子 atomic
包即可保证 内存读取
->操作
->写回内存
的原子性:
func add(num *int32) {
atomic.AddInt32(num, 1)
}
AddInt32
方法是用汇编实现的,实现原子操作的核心是使用 CPU 级别的内存锁 LOCK
func AddInt32(addr *int32, delta int32) (new int32)
//%GOROOT%/src/runtime/internal/atomic/atomic_amd64.s
TEXT ·Xaddint32(SB), NOSPLIT, $0-20
JMP ·Xadd(SB)
TEXT ·Xadd(SB), NOSPLIT, $0-20
MOVQ ptr+0(FP), BX
MOVL delta+8(FP), AX
MOVL AX, CX
// 上锁
LOCK
XADDL AX, 0(BX)
ADDL CX, AX
MOVL AX, ret+16(FP)
RET
不足点:atomic
包只能用于简单变量的简单操作
sema 锁
使用 Mutex
互斥锁,RWMutex
读写锁时,其内部会用到 sema 锁,另外一般都将其作为专用的等待队列。
- sema 锁是信号量/信号锁。
- 核心是一个 uint32 值,含义是同时可并发的数量。
- 每一个 sema 锁都对应一个 SemaRoot 结构体,其中有一个平衡二叉树用于协程队列:
type semaRoot struct {
lock mutex
// 平衡二叉树根节点,用于存放等待的协程
treap *sudog
// 正在等待的协程数
nwait atomic.Uint32
}
sema 操作
获取 sema 锁,其实就是将一个 uint32 的值减一 atomic.Cas(addr, v, v-1)
,如果这个操作成功,便获取到锁。
// %GOROOT%/src/runtime/sema.go
func semacquire(addr *uint32) {
semacquire1(addr, false, 0, 0, waitReasonSemacquire)
}
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}
// Easy case.
if cansemacquire(addr) {
return
}
// ...
}
// uint32 需要大于 0 才能获取锁。
func cansemacquire(addr *uint32) bool {
for {
v := atomic.Load(addr)
if v == 0 {
return false
}
if atomic.Cas(addr, v, v-1) {
return true
}
}
}
释放 sema 锁,将 unit32 加一 atomic.Xadd(addr, 1)
,如果这个操作成功,便获释放锁。
func semrelease(addr *uint32) {
semrelease1(addr, false, 0)
}
func semrelease1(addr *uint32, handoff bool, skipframes int) {
root := semtable.rootFor(addr)
atomic.Xadd(addr, 1)
// Easy case: no waiters?
// This check must happen after the xadd, to avoid a missed wakeup
// (see loop in semacquire).
if root.nwait.Load() == 0 {
return
}
// ...
}
获取锁的时候,如果 uint32 值一开始就为 0,或减到了 0,则协程休眠: goparkunlock()
,进入堆树等待:
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
// ...
for {
lockWithRank(&root.lock, lockRankRoot)
// Add ourselves to nwait to disable "easy case" in semrelease.
root.nwait.Add(1)
// Check cansemacquire to avoid missed wakeup.
if cansemacquire(addr) {
root.nwait.Add(-1)
unlock(&root.lock)
break
}
// Any semrelease after the cansemacquire knows we're waiting
// (we set nwait above), so go to sleep.
root.queue(addr, s, lifo)
// gopark 让当前协程休眠
goparkunlock(&root.lock, reason, traceEvGoBlockSync, 4+skipframes)
if s.ticket != 0 || cansemacquire(addr) {
break
}
}
// ...
}
当别的协程释放锁时,从堆树中取出一个等待的协程唤醒: root.dequeue(addr)
:
func semrelease1(addr *uint32, handoff bool, skipframes int) {
// ...
s, t0 := root.dequeue(addr)
if s != nil {
root.nwait.Add(-1)
}
// ...
}
一般将 sema 的 uint32 值置为 0,当作休眠队列
Mutex 互斥锁
Mutex 结构体如下所示:
type Mutex struct {
state int32
sema uint32
}
Mutex 的 sema 默认置0,当作等待队列。假设当前有两个协程 g
同时竞争资源,并进行加锁操作:
state 标志位的初始状态值:
- WaiterShift:0
- Starving:0
- Woken:0
- Locked:0
没有饥饿的正常模式加锁
其中一个协程加锁成功:sync.Mutex.Lock()
,Locked 状态变为 1:
此时另一个协程进行多次的自旋操作,尝试能不能上锁,自旋多次失败后便会休眠自己,去获取 sema;此时 sema 为 0,将 g
记录在平衡树中,WaiterShift 值置为 1:
同理,一个新来的协程一开始也要去获取锁,但当前锁为 Locked 状态,则进行自旋操作,多次失败后去获取 sema;此时 sema 为 0,将 g
记录在平衡树中,WaiterShift 值置为 2:
sync.Mutex.Lock()
具体内部实现如下:
func (m *Mutex) Lock() {
// 通过 CAS 上锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 上锁失败后执行
m.lockSlow()
}
尝试上锁失败后进行自旋操作:
func (m *Mutex) lockSlow() {
// ...
for {
// 被锁且没处于饥饿模式,并记录自旋 Spin 次数
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 自旋
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// 没有饥饿尝试加锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 超过自旋次数限制
if old&(mutexLocked|mutexStarving) != 0 {
// 如果被锁或饥饿,则等待数量加 1
new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
// 加锁成功后退出
break // locked the mutex with CAS
}
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 获取 sema 锁,由于 sema 为 0,加入等待队列(树)
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// ...
}
没有饥饿的正常模式解锁
sync.Mutex.Unlock()
解锁代码如下:
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// sema 加 1,释放锁并唤醒队列里的一个协程
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
Mutex 饥饿模式
产生饥饿的情况:等待队列中的协程多次被唤醒后去竞争锁,但没有获取到锁,就会造成饥饿,具体来说:
- 当前协程等待锁的时间超过 10ms,则切换到饥饿模式:
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
。 - 饥饿模式中,新来的协程不自旋,而是直接 sema 休眠。
- 饥饿模式中,
唤醒的协程直接获取锁
。 - 没有协程在等待队列中时,退出饥饿模式。
// func (m *Mutex) lockSlow()
// 判断当前模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
// 直接加锁
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
RWMutex 读写锁
- 加上读锁时,多个协程能同时进行读操作,但不能进行写操作。
- 读锁里没有协程队列时,才可加写互斥锁,加上写锁,每次只允许一个协程进入写操作。
读写锁结构体如下:
type RWMutex struct {
// 互斥锁作为写锁
w Mutex
// 作为写协程队列,等待读锁释放
writerSem uint32
// 作为读协程队列。等待写锁释放
readerSem uint32
// 正值:正在读的协程个数;负值:加了写锁
readerCount atomic.Int32
// 写锁生效前还需等待释放读锁的读协程个数
readerWait atomic.Int32
}
const rwmutexMaxReaders = 1 << 3
写锁
加写锁步骤:
- 先加 Mutex 写锁,如已经存在写锁,则进入等待。
- 将
readerCount
值变为负值:readerWait - rwmutexMaxReaders
,阻塞读锁获取。 - 计算需要等待多少个读协程释放。
- 如果需要等待读协程释放,则陷入
writerSem
。
加写锁代码如下:
// %GOROOT%src/sync/rwmutex.go
func (rw *RWMutex) Lock() {
// ...
// 1. 加锁
rw.w.Lock()
// 2. 将 `readerCount` 值变为负值:`readerWait - rwmutexMaxReaders`,阻塞读锁获取。
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
// r 表示读协程的个数
if r != 0 && rw.readerWait.Add(r) != 0 {
// 3. 如果需要等待读协程释放,则陷入 `writerSem` 休眠。
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
// ...
}
解写锁过程:
func (rw *RWMutex) Unlock() {
// ...
// 1. 复原 readerCount 的值
r := rw.readerCount.Add(rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
fatal("sync: Unlock of unlocked RWMutex")
}
// 2. 释放加写锁期间后续进入的被阻塞读协程
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 3. 释放写锁
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
读锁
加读锁步骤:
func (rw *RWMutex) RLock() {
// ...
// 1. readerCount 加 1,如果 readCounter 是正数,表示加锁成功
if rw.readerCount.Add(1) < 0 {
// 2. 如果 readerCount 为负数,表示有写锁存在,陷入 readerSem 排队
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
// ...
}
解读锁步骤:
func (rw *RWMutex) RUnlock() {
// ...
// 1. 直接 readerCount 减一
if r := rw.readerCount.Add(-1); r < 0 {
// 如果 readerCount 小于 0,表示前面有写协程在等待
rw.rUnlockSlow(r)
}
// ...
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
fatal("sync: RUnlock of unlocked RWMutex")
}
// 2. 每个读协程释放锁,将 readerWait 减 1
if rw.readerWait.Add(-1) == 0 {
// 3. 最后一个读协程将 readerWait 减到 0 时,去释放一个写协程
runtime_Semrelease(&rw.writerSem, false, 1)
}
}