sync.Mutex
- mutex简介
- mutex 方法
- 源码
- 标志位
- 获取锁
- Lock
- lockSlow
- Unlock
- 怎么 调度 goroutine
- runtime 方法
mutex简介
mutex 是 一种实现互斥的同步原语。(go-version 1.21) (还涉及到Go运行时的内部机制)
mutex 方法
- Lock() 方法用于获取锁,如果锁已被其他 goroutine 占用,则调用的 goroutine 会阻塞,直到锁可用。
- Unlock() 方法用于释放锁,调用该方法前必须确保当前 goroutine 持有锁。
- TryLock()方法尝试获取锁,返回是否成功。使用 TryLock 需要谨慎,因为它通常是对互斥锁的误用的迹象。
源码
标志位
mutexLocked:表示锁是否被持有。如果这个标志位被设置,说明锁已经被某个 goroutine 持有。
mutexWoken:表示是否有被唤醒的等待者。如果这个标志位被设置,说明在释放锁的时候有 goroutine 被唤醒。
mutexStarving:表示锁是否处于饥饿模式。在饥饿模式下,锁的所有权会直接从解锁的 goroutine 直接移交给等待队列中的第一个等待者
获取锁
Lock
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// 快速路径上, 直接拿到锁了, 一般是第一个协程的时候 ,
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 获取不到, 就去慢路径上了, 自旋等待、正常模式、饥饿模式 操作
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
lockSlow
func (m *Mutex) lockSlow() {
// 记录 等待开始的事件
var waitStartTime int64
// 标记是否是饥饿模式
starving := false
// 标志是否 已经唤醒
awoke := false
// 自旋迭代次数
iter := 0
// 获取当前互斥锁的状态
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
// 检查 当前状态的第0位(mutexLocked 互斥锁)是否为1, 第2位 (mutexStarving 互斥锁饥饿模式) 是否为0, 满足这俩个状态说明当前被锁住,并且不处于饥饿模式
// 并且 进行函数检查, runtime_canSpin(1、当前goroutine 是可运行状态, 2、当前goroutine不可抢占),来判断是否有资格进行自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 实现互斥锁的自旋等待机制
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
// !awoke 表示在此之前没有唤醒过其他等待的 Goroutine
// old&mutexWoken == 0:表示之前的状态中还没有被唤醒过的标记。
// old>>mutexWaiterShift != 0:表示有等待的 Goroutine
// atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) 尝试唤醒, 原子操作, 将mutexWoken位 置为1
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
// 唤醒位 置为true, 唤醒了, 当前goroutine 有争夺锁的机会了
awoke = true
}
// 通过自旋等待的方式竞争锁的所有权。
// 在获取锁失败的情况下, 不至于立即阻塞当前 goroutine,而是通过短暂的自旋等待,期望其他 goroutine 尽快释放锁,以便当前 goroutine 有机会获取到锁
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
//检查当前锁是否处于饥饿模式,如果不是,说明当前 goroutine 是第一个尝试获取锁的,将 mutexLocked 位置为1,表示锁被持有
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 检查当前锁是否已经被持有或处于饥饿模式。如果是,表示有其他 goroutine 持有锁或已经处于饥饿模式,将 mutexWaiterShift 位加到 new 中,表示有等待者。
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
// 这段代码用于处理在当前锁状态已经是 starving(饥饿)模式,并且当前锁是被锁住的情况下,将新状态 new 中的 mutexStarving 位设置为1,则将锁切换到饥饿模式
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 唤醒标志位
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
// 在之前没唤醒过了, 但是始终没拿到锁, 所以后面阶段还是需要被重新唤醒, 这个标志位需要被重置
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
//如果成功表示成功获取了锁。
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// old 状态位是0 , 之前没有锁, 说明成功获取了锁, 结束 退出
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// else 如果之前的状态已经锁定了, 后续处理逻辑,主要包括处理饥饿模式、等待队列等情况。
// If we were already waiting before, queue at the front of the queue.
// 当前goroutine在等待队列中排队, 是否使用LIFO方式排队
queueLifo := waitStartTime != 0
// 等待时间为0 , 刚开始排队, 记录下时间
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 等待获取锁, 其中 queueLifo 决定了是否使用 LIFO 排队,最后的参数 1 表示等待一个锁。
// 1、 当锁处于饥饿模式时,等待锁的 goroutine 将不再与新的 goroutine 竞争锁。而是直接将锁的所有权直接移交给队列中的第一个等待的 goroutine(等待队列的队首) runtime_SemacquireMutex(&m.sema, true, 1) LIFO
// 2、 正常模式 runtime_SemacquireMutex(&m.sema, false, 1)FIFO
// 通过runtime 包中, 等待队列
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 根据等待时间 判断是否处于饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
//如果等待时间超过了饥饿模式的阈值,且当前锁的状态是饥饿模式,就尝试退出饥饿模式,切换回正常模式。这里需要考虑到可能的竞态条件,因此使用 CAS(Compare-And-Swap)操作
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 计算状态变化值
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果不处于饥饿模式,或者当前 Goroutine 是队列中的第一个等待者,表示需要退出饥饿模式。
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
// 使用原子操作将互斥锁的状态更新为新值。
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
// 获取不到
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
Unlock
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
// 快速路径:尝试直接获取未锁定的互斥锁
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
func (m *Mutex) TryLock() bool {
old := m.state
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
// There may be a goroutine waiting for the mutex, but we are
// running now and can try to grab the mutex before that
// goroutine wakes up.
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return true
}
func (m *Mutex) unlockSlow(new int32) {
// 不能解锁 没有锁住的锁
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
// 正常模式
old := new
for {
// 如果没有等待者或者 goroutine 已经被唤醒或者获取了锁,就不需要唤醒任何人。
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
// 获取唤醒等待者的权利
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 释放一个信号量,唤醒一个等待者
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 饥饿模式 , 使用LIFO队列
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
runtime_Semrelease(&m.sema, true, 1)
}
}
怎么 调度 goroutine
对goroutine进行调度是通过runtime保重的调度器来实现的
Mutex 的状态: Mutex 的状态信息存储在 state 字段中,其中高位表示已经锁住的数量,低位表示其他信息(比如等待者的数量等)。
等待者队列: Go 的调度器管理着等待者的队列,当一个 goroutine 尝试获取锁但锁已经被其他 goroutine 占用时,它会被放入调度器的等待队列。
调度器的作用: 调度器会负责管理所有的 goroutine,包括它们的状态、调度和等待队列。当锁被释放时,调度器会决定哪个等待中的 goroutine 会被唤醒,然后有机会获取锁。
自旋和阻塞: 在尝试获取锁时,如果锁已经被其他 goroutine 占用,当前 goroutine 会通过自旋等待(短暂的忙等待)或者阻塞等待(让出 CPU 资源,等待调度器通知)。
总体来说,Mutex 的实现是基于 state 字段和调度器的协同工作。它通过调度器来管理等待者的队列,实现了一种高效的锁竞争和等待机制
runtime 方法
1、 runtime_canSpin(iter int):
作用:该函数用于检查当前 goroutine 是否可以进行自旋等待,以避免阻塞。
用法:在自旋等待的时候调用,避免过多的自旋。
2、runtime_doSpin():
作用:实现自旋等待的具体逻辑,包括执行一定的无用操作,使得当前 goroutine 让出 CPU 时间,增加其他 goroutine 获取锁的机会。
用法:在自旋等待的时候调用。
3、runtime_nanotime():
作用:获取当前时间(纳秒级别)。
用法:在等待过程中记录等待的起始时间,用于判断是否需要切换到饥饿模式。
4、runtime_SemacquireMutex(sema *uint32, lifo bool, skipframes int):
作用:在等待获取锁时使用,该函数封装了对信号量的获取操作,可以阻塞当前 goroutine。
参数:
sema:信号量指针,用于同步等待。
lifo:是否使用后进先出(LIFO)方式排队等待。
skipframes:用于在跟踪时跳过的帧数,以隐藏 runtime_SemacquireMutex 的调用。
5、runtime_Semrelease(sema *uint32, handoff bool, skipframes int):
作用:在释放锁时使用,该函数封装了对信号量的释放操作,用于唤醒等待者。
参数:
sema:信号量指针,用于同步等待。
handoff:是否切换到饥饿模式。
skipframes:用于在跟踪时跳过的帧数,以隐藏 runtime_Semrelease 的调用。
这些 runtime 包中的方法提供了底层的并发控制机制,支持互斥锁的实现。它们用于在不同的情况下实现自旋等待、唤醒等待者以及记录时间等操作。