深入理解 Golang: 锁

news2024/10/7 19:24:03

本文通过对 Go 中源码层面的加锁、解锁实现细则来介绍锁的操作,包括 Mutex 互斥锁、RWMutex 读写锁,以及它们底层依赖的 sema 信号锁。

atomic 原子操作

正常情况下,多个协程同时操作 num 时,不能保证 num 值得最终一致性,根本原因是对 num 的操作不是原子性的,即在 内存读取->操作->写回内存 的过程中,多个协程同时在读取内存,同时写回内存。

var wg = sync.WaitGroup{}

func add(num *int32) {
 *num++
 defer wg.Done()
}

func main() {
 num := int32(0)
 wg.Add(1000)
 for i := 0; i < 1000; i++ {
  go add(&num)
 }

 wg.Wait()
 fmt.Println(num) // 996
}

使用原子 atomic 包即可保证 内存读取->操作->写回内存 的原子性:

func add(num *int32) {
 atomic.AddInt32(num, 1)
}

AddInt32 方法是用汇编实现的,实现原子操作的核心是使用 CPU 级别的内存锁 LOCK

func AddInt32(addr *int32, delta int32) (new int32)
//%GOROOT%/src/runtime/internal/atomic/atomic_amd64.s
TEXT ·Xaddint32(SB), NOSPLIT, $0-20
 JMP ·Xadd(SB)

TEXT ·Xadd(SB), NOSPLIT, $0-20
 MOVQ ptr+0(FP), BX
 MOVL delta+8(FP), AX
 MOVL AX, CX

 // 上锁
 LOCK
 XADDL AX, 0(BX)
 ADDL CX, AX
 MOVL AX, ret+16(FP)
 RET

不足点:atomic 包只能用于简单变量的简单操作

sema 锁

使用 Mutex 互斥锁,RWMutex 读写锁时,其内部会用到 sema 锁,另外一般都将其作为专用的等待队列。

  • sema 锁是信号量/信号锁。
  • 核心是一个 uint32 值,含义是同时可并发的数量。
  • 每一个 sema 锁都对应一个 SemaRoot 结构体,其中有一个平衡二叉树用于协程队列:
type semaRoot struct {
 lock  mutex
 // 平衡二叉树根节点,用于存放等待的协程
 treap *sudog 
 // 正在等待的协程数
 nwait atomic.Uint32
}

sema 操作
获取 sema 锁,其实就是将一个 uint32 的值减一 atomic.Cas(addr, v, v-1),如果这个操作成功,便获取到锁。

// %GOROOT%/src/runtime/sema.go
func semacquire(addr *uint32) {
 semacquire1(addr, false, 0, 0, waitReasonSemacquire)
}

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
 gp := getg()
 if gp != gp.m.curg {
  throw("semacquire not on the G stack")
 }

 // Easy case.
 if cansemacquire(addr) {
  return
 }

 // ...
}

// uint32 需要大于 0 才能获取锁。
func cansemacquire(addr *uint32) bool {
 for {
  v := atomic.Load(addr)
  if v == 0 {
   return false
  }
  if atomic.Cas(addr, v, v-1) {
   return true
  }
 }
}

释放 sema 锁,将 unit32 加一 atomic.Xadd(addr, 1),如果这个操作成功,便获释放锁。

func semrelease(addr *uint32) {
 semrelease1(addr, false, 0)
}

func semrelease1(addr *uint32, handoff bool, skipframes int) {
 root := semtable.rootFor(addr)
 atomic.Xadd(addr, 1)

 // Easy case: no waiters?
 // This check must happen after the xadd, to avoid a missed wakeup
 // (see loop in semacquire).
 if root.nwait.Load() == 0 {
  return
 }
    // ...
}

获取锁的时候,如果 uint32 值一开始就为 0,或减到了 0,则协程休眠: goparkunlock(),进入堆树等待:

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
 
 // ...

 for {
  lockWithRank(&root.lock, lockRankRoot)
  // Add ourselves to nwait to disable "easy case" in semrelease.
  root.nwait.Add(1)
  // Check cansemacquire to avoid missed wakeup.
  if cansemacquire(addr) {
   root.nwait.Add(-1)
   unlock(&root.lock)
   break
  }


  // Any semrelease after the cansemacquire knows we're waiting
  // (we set nwait above), so go to sleep.
  root.queue(addr, s, lifo)

  // gopark 让当前协程休眠
  goparkunlock(&root.lock, reason, traceEvGoBlockSync, 4+skipframes)
  if s.ticket != 0 || cansemacquire(addr) {
   break
  }
 }
 // ...
}

当别的协程释放锁时,从堆树中取出一个等待的协程唤醒: root.dequeue(addr)

func semrelease1(addr *uint32, handoff bool, skipframes int) {
 // ...
 s, t0 := root.dequeue(addr)
 if s != nil {
  root.nwait.Add(-1)
 }
 // ...
}

一般将 sema 的 uint32 值置为 0,当作休眠队列

Mutex 互斥锁

Mutex 结构体如下所示:

type Mutex struct {
 state int32
 sema  uint32
}

Mutex 的 sema 默认置0,当作等待队列。假设当前有两个协程 g 同时竞争资源,并进行加锁操作:
在这里插入图片描述
state 标志位的初始状态值:

  1. WaiterShift:0
  2. Starving:0
  3. Woken:0
  4. Locked:0

没有饥饿的正常模式加锁
其中一个协程加锁成功:sync.Mutex.Lock(),Locked 状态变为 1:
在这里插入图片描述
此时另一个协程进行多次的自旋操作,尝试能不能上锁,自旋多次失败后便会休眠自己,去获取 sema;此时 sema 为 0,将 g 记录在平衡树中,WaiterShift 值置为 1:
在这里插入图片描述
同理,一个新来的协程一开始也要去获取锁,但当前锁为 Locked 状态,则进行自旋操作,多次失败后去获取 sema;此时 sema 为 0,将 g 记录在平衡树中,WaiterShift 值置为 2:
在这里插入图片描述

sync.Mutex.Lock() 具体内部实现如下:

func (m *Mutex) Lock() {
 // 通过 CAS 上锁
 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  if race.Enabled {
   race.Acquire(unsafe.Pointer(m))
  }
  return
 }
 
 // 上锁失败后执行
 m.lockSlow()
}

尝试上锁失败后进行自旋操作:

func (m *Mutex) lockSlow() {
 // ...
 for {
    // 被锁且没处于饥饿模式,并记录自旋 Spin 次数
  if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
   if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    awoke = true
   }
   // 自旋
   runtime_doSpin()
   iter++
   old = m.state
   continue
  }
  new := old
  // 没有饥饿尝试加锁
  if old&mutexStarving == 0 {
   new |= mutexLocked
  }
  // 超过自旋次数限制
  if old&(mutexLocked|mutexStarving) != 0 {
    // 如果被锁或饥饿,则等待数量加 1
   new += 1 << mutexWaiterShift
  }
  if starving && old&mutexLocked != 0 {
   new |= mutexStarving
  }
  if atomic.CompareAndSwapInt32(&m.state, old, new) {
   if old&(mutexLocked|mutexStarving) == 0 {
    // 加锁成功后退出
    break // locked the mutex with CAS
   }
   queueLifo := waitStartTime != 0
   if waitStartTime == 0 {
    waitStartTime = runtime_nanotime()
   }

   // 获取 sema 锁,由于 sema 为 0,加入等待队列(树)
   runtime_SemacquireMutex(&m.sema, queueLifo, 1)
   // ...
}

没有饥饿的正常模式解锁

sync.Mutex.Unlock() 解锁代码如下:

func (m *Mutex) Unlock() {
 if race.Enabled {
  _ = m.state
  race.Release(unsafe.Pointer(m))
 }

 // sema 加 1,释放锁并唤醒队列里的一个协程
 new := atomic.AddInt32(&m.state, -mutexLocked)
 if new != 0 {
  m.unlockSlow(new)
 }
}

Mutex 饥饿模式
产生饥饿的情况:等待队列中的协程多次被唤醒后去竞争锁,但没有获取到锁,就会造成饥饿,具体来说:

  • 当前协程等待锁的时间超过 10ms,则切换到饥饿模式:starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
  • 饥饿模式中,新来的协程不自旋,而是直接 sema 休眠。
  • 饥饿模式中,唤醒的协程直接获取锁
  • 没有协程在等待队列中时,退出饥饿模式。
// func (m *Mutex) lockSlow()

// 判断当前模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
   if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
   throw("sync: inconsistent mutex state")
   }
   delta := int32(mutexLocked - 1<<mutexWaiterShift)
   if !starving || old>>mutexWaiterShift == 1 {
   delta -= mutexStarving
   }

   // 直接加锁
   atomic.AddInt32(&m.state, delta)
   break
}
awoke = true
iter = 0

RWMutex 读写锁

  • 加上读锁时,多个协程能同时进行读操作,但不能进行写操作。
  • 读锁里没有协程队列时,才可加写互斥锁,加上写锁,每次只允许一个协程进入写操作。

读写锁结构体如下:

type RWMutex struct {
 // 互斥锁作为写锁
 w           Mutex 
 // 作为写协程队列,等待读锁释放
 writerSem   uint32
 // 作为读协程队列。等待写锁释放
 readerSem   uint32
 // 正值:正在读的协程个数;负值:加了写锁
 readerCount atomic.Int32
 // 写锁生效前还需等待释放读锁的读协程个数
 readerWait  atomic.Int32
}

const rwmutexMaxReaders = 1 << 3

写锁

加写锁步骤:

  1. 先加 Mutex 写锁,如已经存在写锁,则进入等待。
  2. readerCount 值变为负值:readerWait - rwmutexMaxReaders,阻塞读锁获取。
  3. 计算需要等待多少个读协程释放。
  4. 如果需要等待读协程释放,则陷入 writerSem

加写锁代码如下:

// %GOROOT%src/sync/rwmutex.go
func (rw *RWMutex) Lock() {
 // ...
 // 1. 加锁
 rw.w.Lock()
 // 2. 将 `readerCount` 值变为负值:`readerWait - rwmutexMaxReaders`,阻塞读锁获取。
 r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
 // r 表示读协程的个数
 if r != 0 && rw.readerWait.Add(r) != 0 {
  // 3. 如果需要等待读协程释放,则陷入 `writerSem` 休眠。
  runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
 }
 // ...
}

解写锁过程:

func (rw *RWMutex) Unlock() {
 // ...
 // 1. 复原 readerCount 的值
 r := rw.readerCount.Add(rwmutexMaxReaders)
 if r >= rwmutexMaxReaders {
  race.Enable()
  fatal("sync: Unlock of unlocked RWMutex")
 }
 // 2. 释放加写锁期间后续进入的被阻塞读协程
 for i := 0; i < int(r); i++ {
  runtime_Semrelease(&rw.readerSem, false, 0)
 }
 // 3. 释放写锁
 rw.w.Unlock()
 if race.Enabled {
  race.Enable()
 }
}

读锁

加读锁步骤:

func (rw *RWMutex) RLock() {
 // ...
 // 1. readerCount 加 1,如果 readCounter 是正数,表示加锁成功
 if rw.readerCount.Add(1) < 0 {
  // 2. 如果 readerCount 为负数,表示有写锁存在,陷入 readerSem 排队
  runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
 }
 // ...
}

解读锁步骤:

func (rw *RWMutex) RUnlock() {
 // ...

 // 1. 直接 readerCount 减一
 if r := rw.readerCount.Add(-1); r < 0 {
  // 如果 readerCount 小于 0,表示前面有写协程在等待
  rw.rUnlockSlow(r)
 }
 // ...
}

func (rw *RWMutex) rUnlockSlow(r int32) {
 if r+1 == 0 || r+1 == -rwmutexMaxReaders {
  race.Enable()
  fatal("sync: RUnlock of unlocked RWMutex")
 }
 // 2. 每个读协程释放锁,将 readerWait 减 1
 if rw.readerWait.Add(-1) == 0 {
  // 3. 最后一个读协程将 readerWait 减到 0 时,去释放一个写协程
  runtime_Semrelease(&rw.writerSem, false, 1)
 }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/704052.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

锈湖新作地铁繁花试玩版正式上线啦

地铁繁花是锈湖厂商新作点击式解谜冒险解谜游戏&#xff0c;英文名称为“Underground Blossom”&#xff0c;在游戏中你将深入锈湖的地下&#xff0c;扮演并追溯Laura Vanderboom的人生和记忆吧&#xff01;从一个车站到另一个车站&#xff0c;每个地铁站都象征着劳拉的一段过去…

语音芯片WT2003H-B003,集成压力传感与语音提示的按摩器创新方案

​在如今追求健康、舒适生活方式的时代&#xff0c;压力传感技术与语音提示功能的结合正引领着按摩器行业的创新浪潮。WT2003H-B003语音芯片IC作为一款独具价值的语音芯片&#xff0c;以其集成了先进的压力传感算法和语音提示功能&#xff0c;为按摩器压感方案带来了全新的体验…

Selenium修改HTTP请求头三种方式

目录 什么是HTTP请求头 需要更改HTTP请求请求头 Selenium修改请求头 Java HTTP请求框架 代码实战 使用反向代理 使用 Firefox 扩展 下载火狐浏览器扩展 加载火狐扩展 设置扩展首选项 设置所需的功能 完整自动化用例 总结&#xff1a; 什么是HTTP请求头 HTTP请求头…

科普 | 什么是5G消息平台功能完备性认证,怎么才能获得5G消息平台功能完备性证书

5G消息平台功能完备性测试是由中国信息通信研究院同中国通信企业协会在5G消息工作组共同发起&#xff0c;旨在提升CSP的5G消息平台质量&#xff0c;促进5G消息业务发展。 测试针对5G消息平台的Chatbot下行消息交互、Chatbot接收消息、消息平台业务配置管理、消息平台业务统计管…

智能故障诊断的深度学习模型复杂度指标计算(MACs、Params)

引言: 对于智能故障诊断任务而言,受限于现场工业设备设施的算力,模型在轻量化上具有典型需求。因此,在保证模型精准性的同时尽量降低模型的复杂度是必要的,本博客对模型的复杂度概念进行了剖析,并在pytorch框架下对相关热门轻量级模型的复杂度评估进行了分析。 深度学习…

容智信息荣获2023第三届中国RPA+AI开发者大赛多项大奖

近日&#xff0c;历时数月的「2023第三届中国RPAAI开发者大赛」在苏州圆满收官。本次大赛由RPA中国联合全球人工智能产品应用博览会主办&#xff0c;容智信息作为顶级联合主办单位&#xff0c;主旨挖掘人才&#xff0c;促进RPA和AI技术在社会各领域的融合性应用。 这次大赛的主…

计算机网络————应用层

文章目录 概述域名系统DNS域名结构域名服务器解析过程常见的DNS记录DNS报文格式基础结构部分问题部分资源记录(RR, Resource Record)部分 万维网WWWURLHTTPHTTP发展HTTP报文结构请求报文响应报文 cookie 内容分发网络CDN 概述 应用层的具体内容就是规定应用进程在通信时所遵循的…

JS中常用内置对象

真正原创的东西很少&#xff0c;能抄明白就很不容易了 文章目录 数组常用方法❗push 数据增加到尾部并返回unshift 数据增加到头部并返回pop 删除最后一个数据并返回shift 删除第一个数据并返回sort 数组排序reverse 数组逆序concat 合并多个数组的数据并返回join 数据连接成字…

SpringBoot Thymeleaf企业级真实应用:使用Flying Saucer结合iText5将HTML界面数据转换为PDF输出(四) 表格中断问题

接上一篇 SpringBoot Thymeleaf企业级真实应用&#xff1a;使用Flying Saucer结合iText5将HTML界面数据转换为PDF输出(三) 给pdf加水印、页眉页脚、页眉logo 设置表格的css样式 table {/*分页时表格换行, 可不用, 使用表格行换行即可*//*page-break-before: always;*/border-…

QT简易加法计算器项目实现

完整代码见GitHub&#xff1a;点击进入 在该项目中&#xff0c;使用了三个文件&#xff0c;分别是CalculatorDialog.h, CalculatorDialog.cpp, main.cpp CalculatorDialog.h&#xff1a;在该头文件里定义了一些成员变量和槽函数&#xff0c;用于实现计算器基本功能。Calculator…

Springboot的自动装配解读

目录 1.Springboot的自动装配 1.1 组件装配 1.1.1 组件 1.2 Spring Framework 的模块装配 1.2.1 Import注解 1.2.2 BeanDefinition 1.3 Spring Framework 的条件装配 1.3.1 Profile 1.3.2 Conditional 1.3.3 MetaData元数据接口&#xff08;补充&#xff09; Annot…

4、离线数仓数据同步策略(全量表数据同步、增量表数据同步、首日同步、采集通道脚本)

1、离线数仓同步数据 1.1 用户行为数据同步 1.1.1 数据通道 用户行为数据由Flume从Kafka直接同步到HDFS&#xff0c;由于离线数仓采用Hive的分区表按天统计&#xff0c;所以目标路径要包含一层日期。具体数据流向如下图所示。 1.1.2 日志消费Flume配置概述 按照规划&…

【选择排序】手撕八大排序之直接选择排序和堆排序

目录 一.选择排序 1.直接选择排序 2.堆排序 一.选择排序 1.直接选择排序 选择排序&#xff08;Selection Sort&#xff09;是一种简单直观的排序算法。它的基本思想是每次遍历找到最小&#xff08;或最大&#xff09;的元素&#xff0c;然后将其放置在已排序序列的末尾。在…

实操接口自动化测试项目之项分层设计

本文以笔者当前使用的自动化测试项目为例&#xff0c;浅谈分层设计的思路&#xff0c;不涉及到具体的代码细节和某个框架的实现原理&#xff0c;重点关注在分层前后的使用对比&#xff0c;可能会以一些伪代码为例来说明举例。 接口测试三要素&#xff1a; 参数构造发起请求&a…

JS 1.如何实现继承 2.原型和原型链

1_使用class实现继承 /** 继承 */ class Person { constructor(name) { this.name name;}drink() { console.log(喝水)} }class Student extends Person{ constructor(name, score) { // new Personsuper(name);this.score score;}introduce() { console.log(我是${this.nam…

EasyCVR播放设备录像出现部分视频不能播放的原因排查与解决

EasyCVR视频融合平台基于云边端协同架构&#xff0c;具有强大的数据接入、处理及分发能力。平台支持多协议接入&#xff0c;包括&#xff1a;国标GB28181、RTMP、RTSP/Onvif、海康Ehome、海康SDK、大华SDK、宇视SDK等&#xff0c;对外可分发多格式视频流&#xff0c;包括RTSP、…

栈和队列(二) 队列的实现,用栈实现队列,用队列实现栈,设计循环队列

文章目录 队列的实现用队列实现栈用栈实现队列设计循环队列 队列的实现 这里的队列我们使用链式队列&#xff0c;好处就是可以很方便的取出队头的元素。 使用顺序队列取出队头元素所花费的时间复杂度为O&#xff08;N&#xff09;&#xff0c;把后面的元素向前移动一个下标所花…

CentOS Linux的最佳替代方案(二)_AlmaLinux OS 8.6基础安装教程

文章目录 CentOS Linux的最佳替代方案&#xff08;二&#xff09;_AlmaLinux OS 8.6基础安装教程一 AlmaLinux介绍和发展历史二 AlmaLinux基础安装2.1 下载地址2.2 安装过程 三 AlmaLinux使用3.1 关闭selinux/firewalld3.2 替换默认源3.3 安装一些必要工具 CentOS Linux的最佳替…

瓶盖扫码回收APP系统 废旧物品创造价值收益

资源回收再利用是近些年国家大力倡导的&#xff0c;人们也在积极践行&#xff0c;从垃圾回收、废旧衣物回收、烟盒回收等等.....今天小白要带大家了解的是瓶盖回收APP软件开发的相关事项。瓶盖回收APP是本着资源回收的初衷&#xff0c;可以时间废旧瓶盖的多次利用&#xff0c;减…

使用Xshell服务器跑程序,用pycharm连接服务器远程开发

目标&#xff1a; 1.使用Xshell在服务器上创建自己项目需要的虚拟环境 2.用pycharm实现远程服务器的连接&#xff08;这样就可以在本地debug或者写代码&#xff0c;然后再用xshell在服务器上跑&#xff09; 一、使用Xshell在服务器上创建自己项目需要的虚拟环境 1.打开Xshe…