mutex锁相关
mutex源码分析
Locker接口:
type Locker interface {
Lock()
Unlock()
}
Mutex 就实现了这个接口,Lock请求锁,Unlock释放锁
type Mutex struct {
state int32 //锁状态,保护四部分含义
sema uint32 //信号量,用于阻塞等待或者唤醒
}
-
Locked:表示该 mutex 是否被锁定,0 表示没有,1 表示处于锁定状态;
-
Woken:表示是否有协程被唤醒,0 表示没有,1 表示有协程处于唤醒状态,并且在加锁过程中;
-
Starving:Go1.9 版本之后引入,表示 mutex 是否处于饥饿状态,0 表示没有,1 表示有协程处于饥饿状态;
-
Waiter: 等待锁的协程数量。
方法解析
const (
// mutex is locked ,在低位,值 1
mutexLocked = 1 << iota
//标识有协程被唤醒,处于 state 中的第二个 bit 位,值 2
mutexWoken
//标识 mutex 处于饥饿模式,处于 state 中的第三个 bit 位,值 4
mutexStarving
// 值 3,state 值通过右移三位可以得到 waiter 的数量
// 同理,state += 1 << mutexWaiterShift,可以累加 waiter 的数量
mutexWaiterShift = iota
// 标识协程处于饥饿状态的最长阻塞时间,当前被设置为 1ms
starvationThresholdNs = 1e6
)
Lock
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex. //运气好,直接加锁成功
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
//内联,加锁失败,就得去自旋竞争或者饥饿模式下竞争
m.lockSlow()
}
func (m *Mutex) lockSlow() {
var waitStartTime int64
// 标识是否处于饥饿模式
starving := false
// 唤醒标记
awoke := false
// 自旋次数
iter := 0
old := m.state
for {
// 非饥饿模式下,开启自旋操作
// 从 runtime_canSpin(iter) 的实现中(runtime/proc.sync_runtime_canSpin)可以知道,
// 如果 iter 的值大于 4,将返回 false
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 如果没有其他 waiter 被唤醒,那么将当前协程置为唤醒状态,同时 CAS 更新 mutex 的 Woken 位
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 开启自旋
runtime_doSpin()
iter++
// 重新检查 state 的值
old = m.state
continue
}
new := old
// 非饥饿状态
if old&mutexStarving == 0 {
// 当前协程可以直接加锁
new |= mutexLocked
}
// mutex 已经被锁住或者处于饥饿模式
// 那么当前协程不能获取到锁,将会进入等待状态
if old&(mutexLocked|mutexStarving) != 0 {
// waiter 数量加 1,当前协程处于等待状态
new += 1 << mutexWaiterShift
}
// 当前协程处于饥饿状态并且 mutex 依然被锁住,那么设置 mutex 为饥饿模式
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 清除唤醒标记
// &^ 与非操作,mutexWoken: 10 -> 01
// 此操作之后,new 的 Locked 位值是 1,如果能够成功写入到 m.state 字段,那么当前协程获取锁成功
new &^= mutexWoken
}
// CAS 设置新状态成功
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 旧的锁状态已经被释放并且处于非饥饿状态
// 这个时候当前协程正常请求到了锁,就可以直接返回了
if old&(mutexLocked|mutexStarving) == 0 {
break
}
// 处理当前协程的饥饿状态
// 如果之前已经处于等待状态了(已经在队列里面),那么将其加入到队列头部,从而可以被高优唤醒
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
// 阻塞开始时间
waitStartTime = runtime_nanotime()
}
// P 操作,阻塞等待
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 唤醒之后,如果当前协程等待超过 1ms,那么标识当前协程处于饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// mutex 已经处于饥饿模式
if old&mutexStarving != 0 {
// 1. 如果当前协程被唤醒但是 mutex 还是处于锁住状态
// 那么 mutex 处于非法状态
//
// 2. 或者如果此时 waiter 数量是 0,并且 mutex 未被锁住
// 代表当前协程没有在 waiters 中,但是却想要获取到锁,那么 mutex 状态非法
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// delta 代表加锁并且将 waiter 数量减 1 两步操作
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 非饥饿状态 或者 当前只剩下一个 waiter 了(就是当前协程本身)
if !starving || old>>mutexWaiterShift == 1 {
// 那么 mutex 退出饥饿模式
delta -= mutexStarving
}
// 设置新的状态
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
解锁操作会根据 Mutex.state 的状态来判断需不需要去唤醒其他等待中的协程。
func (m *Mutex) unlockSlow(new int32) {
// new - state 字段原子减 1 之后的值,如果之前是处于加锁状态,那么此时 new 的末位应该是 0
// 此时 new+mutexLocked 正常情况下会将 new 末位变成 1
// 那么如果和 mutexLocked 做与运算之后的结果是 0,代表 new 值非法,解锁了一个未加锁的 mutex
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 如果不是处于饥饿状态
if new&mutexStarving == 0 {
old := new
for {
// old>>mutexWaiterShift == 0 代表没有等待加锁的协程了,自然不需要执行唤醒操作
// old&mutexLocked != 0 代表已经有协程加锁成功,此时没有必要再唤醒一个协程(因为它不可能加锁成功)
// old&mutexWoken != 0 代表已经有协程被唤醒并且在加锁过程中,此时不需要再执行唤醒操作了
// old&mutexStarving != 0 代表已经进入了饥饿状态,
// 以上四种情况,皆不需要执行唤醒操作
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 唤醒一个等待中的协程,将 state woken 位置为 1
// old - 1<<mutexWaiterShift waiter 数量减 1
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 饥饿模式
// 将 mutex 的拥有权转移给下一个 waiter,并且交出 CPU 时间片,从而能够让下一个 waiter 立刻开始执行
runtime_Semrelease(&m.sema, true, 1)
}
}
UnLock
// 解锁操作
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// mutexLocked 位设置为 0,解锁
new := atomic.AddInt32(&m.state, -mutexLocked)
// 如果此时 state 值不是 0,代表其他位不是 0(或者出现异常使用导致 mutexLocked 位也不是 0)
// 此时需要进一步做一些其他操作,比如唤醒等待中的协程等
if new != 0 {
m.unlockSlow(new)
}
}
mutex两种运行模式
饥饿模式是对公平性和性能的一种平衡,它避免了某些 goroutine 长时间的等待锁。在饥饿模式下,优先对待的是那些一直在等待的 waiter。
mutex normal 正常模式
默认情况下,Mutex的模式为normal。
该模式下,协程如果加锁不成功不会立即转入阻塞排队,而是判断是否满足自旋的条件,如果满足则会启动自旋过程,尝试抢锁。
正常模式 高吞吐量
自旋
自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。
在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序,所以 Goroutine 进入自旋的条件非常苛刻:
- 互斥锁只有在普通模式才能进入自旋;
- runtime.sync_runtime_canSpin 需要返回 true:
运行在多 CPU 的机器上 - 当前 Goroutine 为了获取该锁进入自旋的次数小于四次;
- 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/
mutex starvation 饥饿模式
自旋过程中能抢到锁,一定意味着同一时刻有协程释放了锁,我们知道释放锁时如果发现有阻塞等待的协程,还会释放一个信号量来唤醒一个等待协程,被唤醒的协程得到CPU后开始运行,此时发现锁已被抢占了,自己只好再次阻塞,不过阻塞前会判断自上次阻塞到本次阻塞经过了多长时间,如果超过1ms的话,会将Mutex标记为"饥饿"模式,然后再阻塞。
处于饥饿模式下,不会启动自旋过程,也即一旦有协程释放了锁,那么一定会唤醒协程,被唤醒的协程将会成功获取锁,同时也会把等待计数减1。
在饥饿模式下,Mutex 的拥有者将直接把锁交给队列最前面的 waiter。新来的 goroutine 不会尝试获取锁,即使看起来锁没有被持有,它也不会去抢,也不会 spin(自旋),它会乖乖地加入到等待队列的尾部。
如果拥有 Mutex 的 waiter 发现下面两种情况的其中之一,它就会把这个 Mutex 转换成正常模式:
- 此 waiter 已经是队列中的最后一个 waiter 了,没有其它的等待锁的 goroutine 了;
- 此 waiter 的等待时间小于 1 毫秒(ms)。
锁的底层实现类型
锁内存总线,针对内存的读写操作,在总线上控制,限制程序的内存访问
锁缓存行,同一个缓存行的内容读写操作,CPU内部的高速缓存保证一致性
锁,作用在一个对象或者变量上。现代CPU会优先在高速缓存查找,如果存在这个对象、变量的缓存行数据,会使用锁缓存行的方式。否则,才使用锁总线的方式。
RWMutex
RWMutex的实现比Mutex要简单。
RWMutex的结构中有两个信号量,分别对应于读者和写者,
另外两个整数记录读者和写者的数量。
RWMutex的加解锁过程都是两步,首先原子操作修改读者或写者计数,然后直接获取信号量,与Mutex相比省去了fastpath和自旋锁过程。由此可见,当多读少写的场景中,使用RWMutex的效率应当会高于Mutex,因为省去了大量的忙等待过程。
更细力度的锁,在读读操作非互斥,
正常业务下读多写少,用mutex互斥的话,并发性能不高,
RWMutex读不会产生互斥,并发性能会好一些
项目中读写锁会多一些,有一些加载到内存的一些共享数据
自旋的协程时间过长会空耗cpu资源, cpu nginx_cpu_pause函数会调用汇编中的pause指令, 防止cpu过度消费cpu资源
其他共享内存线程安全的方式
官方不太推荐使用锁,更多的是通过channel做数据交换
思考
如何设计一个并发更高的锁?
在Go语言中,使用切片来设计并发更高效的锁是一种常见的做法,通常被称为"分段锁"或"分片锁"。
这种技术可以在一定程度上减小锁的粒度,从而提高并发性能。
package main
import (
"fmt"
"sync"
"hash/fnv"
)
const numSegments = 16
type ConcurrentMap struct {
segments []sync.Mutex
data map[interface{}]interface{}
}
func NewConcurrentMap() *ConcurrentMap {
segments := make([]sync.Mutex, numSegments)
data := make(map[interface{}]interface{})
return &ConcurrentMap{segments: segments, data: data}
}
func (cm *ConcurrentMap) getSegment(key interface{}) *sync.Mutex {
hash := hashFunction(key) % numSegments
return &cm.segments[hash]
}
func (cm *ConcurrentMap) Get(key interface{}) interface{} {
segment := cm.getSegment(key)
segment.Lock()
defer segment.Unlock()
return cm.data[key]
}
func (cm *ConcurrentMap) Set(key, value interface{}) {
segment := cm.getSegment(key)
segment.Lock()
defer segment.Unlock()
cm.data[key] = value
}
// 假设的哈希函数,仅用于示例目的
func hashFunction(key interface{}) int {
h := fnv.New32a()
// 将键的字节表示写入哈希函数
_, _ = h.Write([]byte(fmt.Sprintf("%v", key)))
return int(h.Sum32())
}
func main() {
concurrentMap := NewConcurrentMap()
var wg sync.WaitGroup
numItems := 1000
for i := 0; i < numItems; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
key := fmt.Sprintf("key%d", index)
concurrentMap.Set(key, index)
}(i)
}
wg.Wait()
// 输出结果
for i := 0; i < numItems; i++ {
key := fmt.Sprintf("key%d", i)
fmt.Printf("%s: %v\n", key, concurrentMap.Get(key))
}
}