深入理解 go sync.Waitgroup

news2024/12/25 0:06:53

本文基于 Go 1.19。

go 里面的 WaitGroup 是非常常见的一种并发控制方式,它可以让我们的代码等待一组 goroutine 的结束。 比如在主协程中等待几个子协程去做一些耗时的操作,如发起几个 HTTP 请求,然后等待它们的结果。

WaitGroup 示例

下面的代码展示了一个 goroutine 等待另外 2 个 goroutine 结束的例子:

 

go

复制代码

func TestWaitgroup(t *testing.T) { var wg sync.WaitGroup // 计数器 +2 wg.Add(2) go func() { sendHttpRequest("https://baidu.com") // 计数器 -1 wg.Done() }() go func() { sendHttpRequest("https://baidu.com") // 计数器 -1 wg.Done() }() // 阻塞。计数器为 0 的时候,Wait 返回 wg.Wait() } // 发起 HTTP GET 请求 func sendHttpRequest(url string) (string, error) { method := "GET" client := &http.Client{} req, err := http.NewRequest(method, url, nil) if err != nil { return "", err } res, err := client.Do(req) if err != nil { return "", err } defer res.Body.Close() body, err := io.ReadAll(res.Body) if err != nil { return "", err } return string(body), err }

在这个例子中,我们做了如下事情:

  • 定义了一个 WaitGroup 对象 wg,调用 wg.Add(2) 将其计数器 +2
  • 启动两个新的 goroutine,在这两个 goroutine 中,使用 sendHttpRequest 函数发起了一个 HTTP 请求。
  • 在 HTTP 请求返回之后,调用 wg.Done 将计数器 -1
  • 在函数的最后,我们调用了 wg.Wait,这个方法会阻塞,直到 WaitGroup 的计数器的值为 0 才会解除阻塞状态。

WaitGroup 基本原理

WaitGroup 内部通过一个计数器来统计有多少协程被等待。这个计数器的值在我们启动 goroutine 之前先写入(使用 Add 方法), 然后在 goroutine 结束的时候,将这个计数器减 1(使用 Done 方法)。除此之外,在启动这些 goroutine 的协程中, 会调用 Wait 来进行等待,在 Wait 调用的地方会阻塞,直到 WaitGroup 内部的计数器减到 0。 也就实现了等待一组 goroutine 的目的

背景知识

在操作系统中,有多种实现进程/线程间同步的方式,如:test_and_setcompare_and_swap、互斥锁等。 除此之外,还有一种是信号量,它的功能类似于互斥锁,但是它能提供更为高级的方法,以便进程能够同步活动。

信号量

一个信号量(semaphore)S是一个整型变量,它除了初始化外只能通过两个标准的原子操作:wait() 和 signal() 来访问。 操作 wait() 最初称为 P(荷兰语 proberen,测试);操作 signal() 最初称为 V(荷兰语 verhogen,增加),可按如下来定义 wait()

PV 原语。

 

javascript

复制代码

wait(S) { while (S <= 0) ; // 忙等待 S--; }

可按如下来定义 signal()

 

javascript

复制代码

signal(S) { S++; }

在 wait() 和 signal() 操作中,信号量整数值的修改应不可分割地执行。也就是说,当一个进程修改信号量值时,没有其他进程能够同时修改同一信号量的值。

简单来说,信号量实现的功能是:

  • 当信号量>0 时,表示资源可用,则 wait 会对信号量执行减 1 操作。
  • 当信号量<=0 时,表示资源暂时不可用,获取信号量时,当前的进程/线程会阻塞,直到信号量为正时被唤醒。

WaitGroup 中的信号量

在 WaitGroup 中,使用了信号量来实现 goroutine 的阻塞以及唤醒:

  • 在调用 Wait 的地方,goroutine 会陷入阻塞,直到信号量大于等于 0 的时候解除阻塞状态,得以继续执行。
  • 在调用 Done 的时候,如果 WaitGroup 内的等待协程的计数器减到 0 的时候,信号量会进行递增,这样那些阻塞的协程会进行执行下去。

WaitGroup 数据结构

 

go

复制代码

type WaitGroup struct { noCopy noCopy // 高 32 位为计数器,低 32 位为等待者数量 state atomic.Uint64 sema uint32 }

noCopy

我们发现,WaitGroup 中有一个字段 noCopy,顾名思义,它的目的是防止复制。 这个字段在运行时是没有什么影响的,但是我们通过 go vet 可以发现我们对 WaitGroup 的复制。 为什么不能复制呢?因为一旦复制,WaitGroup 内的计数器就不再准确了,比如下面这个例子:

 

go

复制代码

func test(wg sync.WaitGroup) { wg.Done() } func TestWaitGroup(t *testing.T) { var wg sync.WaitGroup wg.Add(1) test(wg) wg.Wait() }

go 里面的函数参数传递是值传递。调用 test(wg) 的时候将 WaitGroup 复制了一份。

在这个例子中,程序会永远阻塞下去,因为 test 中调用 wg.Done() 的时候,只是将 WaitGroup 副本的计数器减去了 1, 而 TestWaitGroup 里面的 WaitGroup 的计数器并没有发生改变,因此 Wait 会永远阻塞。

我们如果需要将 WaitGroup 作为参数,请传递指针:

 

go

复制代码

func test(wg *sync.WaitGroup) { wg.Done() }

传递指针之后,我们在 test 中调用 wg.Done() 修改的就是 TestWaitGroup 里面同一个 WaitGroup。 从而,Wait 方法可以正常返回。

state

WaitGroup 里面的 state 是一个 64 位的 atomic.Uint64 类型,它的高 32 位用来保存 counter(也就是上面说的计数器),低 32 位用来保存 waiter(也就是阻塞在 Wait 上的 goroutine 数量。)

sema

WaitGroup 通过 sema 来记录信号量:

  • runtime_Semrelease 表示将信号量递增(对应信号量中的 signal 操作)
  • runtime_Semacquire 表示将信号量递减(对应信号量中的 wait 操作)

简单来说,在调用 runtime_Semacquire 的时候 goroutine 会阻塞,而调用 runtime_Semrelease 会唤醒阻塞在同一个信号量上的 goroutine。

WaitGroup 的三个基本操作

  • Add: 这会将 WaitGroup 里面的 counter 加上一个整数(也就是传递给 Add 的函数参数)。
  • Done: 这会将 WaitGroup 里面的 counter 减去 1。
  • Wait: 这会将 WaitGroup 里面的 waiter 加上 1,并且调用 Wait 的地方会阻塞。(有可能会有多个 goroutine 等待一个 WaitGroup

WaitGroup 的实现

Add 的实现

Add 做了下面两件事:

  1. 将 delta 加到 state 的高 32 位上
  2. 如果 counter 为 0 了,并且 waiter 大于 0,表示所有被等待的 goroutine 都完成了,而还有在等待的 goroutine,这会唤醒那些阻塞在 Wait 上的 goroutine。

源码实现:

 

go

复制代码

func (wg *WaitGroup) Add(delta int) { // wg.state 的计数器加上 delta //(加到 state 的高 32 上) state := wg.state.Add(uint64(delta) << 32) // 高 32 位加上 delta v := int32(state >> 32) // 高 32 位(counter) w := uint32(state) // 低 32 位(waiter) // 计数器不能为负数(加上 delta 之后不能为负数,最小只能到 0) if v < 0 { panic("sync: negative WaitGroup counter") } // 正常使用情况下,是先调用 Add 再调用 Wait 的,这种情况下,w 是 0,v > 0 if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // v > 0,计数器大于 0 // w == 0,没有在 Wait 的协程 // 说明还没有到唤醒 waiter 的时候 if v > 0 || w == 0 { return } // Add 负数的时候,v 会减去对应的数值,减到最后 v 是 0。 // 计数器是 0,并且有等待的协程,现在要唤醒这些协程。 // 存在等待的协程时,goroutine 已将计数器设置为0。 // 现在不可能同时出现状态突变: // - Add 不能与 Wait 同时发生, // - 如果看到计数器==0,则 Wait 不会增加等待的协程。 // 仍然要做一个廉价的健康检查,以检测 WaitGroup 的误用。 if wg.state.Load() != state { // 不能在 Add 的同时调用 Wait panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 将等待的协程数量设置为 0。 wg.state.Store(0) for ; w != 0; w-- { // signal,调用 Wait 的地方会解除阻塞 runtime_Semrelease(&wg.sema, false, 0) // goyield } }

Done 的实现

WaitGroup 里的 Done 其实只是对 Add 的调用,但是它的效果是,将计数器的值减去 1。 背后的含义是:一个被等待的协程执行完毕了

Wait 的实现

Wait 主要功能是阻塞当前的协程:

  1. Wait 会先判断计数器是否为 0,为 0 说明没有任何需要等待的协程,那么就可以直接返回了。
  2. 如果计数器还不是 0,说明有协程还没执行完,那么调用 Wait 的地方就需要被阻塞起来,等待所有的协程完成。

源码实现:

 

go

复制代码

func (wg *WaitGroup) Wait() { for { // 获取当前计数器 state := wg.state.Load() // 计数器 v := int32(state >> 32) // waiter 数量 w := uint32(state) // v 为 0,不需要等待,直接返回 if v == 0 { // 计数器是 0,不需要等待 return } // 增加 waiter 数量。 // 调用一次 Wait,waiter 数量会加 1。 if wg.state.CompareAndSwap(state, state+1) { // 这会阻塞,直到 sema (信号量)大于 0 runtime_Semacquire(&wg.sema) // goparkunlock // state 不等 0 // wait 还没有返回又继续使用了 WaitGroup if wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } // 解除阻塞状态了,可以返回了 return } // 状态没有修改成功(state 没有成功 +1),开始下一次尝试。 } }

总结

  • WaitGroup 使用了信号量来实现了并发资源控制,sema 字段表示信号量。
  • 使用 runtime_Semacquire 会使得 goroutine 阻塞直到计数器减少至 0,而使用 runtime_Semrelease 会使得信号量递增,这等于是通知之前阻塞在信号量上的协程,告诉它们可以继续执行了。
  • WaitGroup 作为参数传递的时候,需要传递指针作为参数,否则在被调用函数内对 Add 或者 Done 的调用,在 caller 里面调用的 Wait 会观测不到。
  • WaitGroup 使用一个 64 位的数来保存计数器(高 32 位)和 waiter(低 32 位,正在等待的协程的数量)。
  • WaitGroup 使用 Add 增加计数器,使用 Done 来将计数器减 1,使用 Wait 来等待 goroutine。Wait 会阻塞直到计数器减少到 0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/553586.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CH32V3xx USART 空闲中断+DMA接收

目录 1、CH32V3xx USART简介2、测试程序2.1 USART 初始化配置2.1 发送函数2.1 接收中断1、CH32V3xx USART简介 CH32V3xx系列MCU包含3个同步异步收发器(USART1、2、3)和5个通用异步收发器(UART4、5、6、7、8)。USART模块支持DMA功能,DMA可以实现快速连续收发。使用DMA发送时…

windows安装pcl

文章目录 vcpkg方式安装默认安装x86默认没安装cloud_viewer.h AllInOne安装pcl的官方例程 vcpkg方式安装 官方建议在windows下&#xff0c;采用vcpkg的方式来安装&#xff1a; 安装vcpkg的&#xff0c;可以看这里&#xff1a;【WIN安装vcpkg】 默认安装x86 要注意的是 vcpkg…

品牌线上推广:如何进行电商控价?

随着电商平台的迅速崛起&#xff0c;电商控价已经成为了一个十分重要的话题。所谓电商控价&#xff0c;是指在电子商务平台上&#xff0c;品牌方针对自己的产品进行价格的控制&#xff0c;以确保产品的售价不被平台和第三方商家恶意砍价而影响品牌形象和盈利。那么&#xff0c;…

iptables 防火墙(二)SNAT/DNAT

目录 一&#xff1a;SNAT原理与应用 1.SNAT介绍 2.SNAT 应用环境 3.SNAT原理 二&#xff1a;SNAT配置 第一步&#xff1a;设置各个端口的网卡 1.先准备客户机、web服务器、网关服务器 2.网关服务器设置 &#xff08;1&#xff09;添加网卡 &#xff08;2&#xff09;修…

10分钟!Python写一个角色扮演带上下文功能的chatgpt聊天机器人!上篇!

大家都在网页上玩ChatGPT玩的不亦乐乎&#xff0c;但是很多时候我们需要打造个人专属的GPT&#xff0c;比如我是律师&#xff0c;我是医生&#xff0c;我是营养师&#xff0c;我是财会&#xff0c;我是猎头&#xff0c;我需要专属的某个领域的GPT&#xff0c;其实也不难。 今天…

加密解密软件VMProtect教程(八)许可制度之管理许可证

VMProtect是新一代软件保护实用程序。VMProtect支持德尔菲、Borland C Builder、Visual C/C、Visual Basic&#xff08;本机&#xff09;、Virtual Pascal和XCode编译器。 同时&#xff0c;VMProtect有一个内置的反汇编程序&#xff0c;可以与Windows和Mac OS X可执行文件一起…

加密解密软件VMProtect入门使用教程(八)许可制度之许可系统功能

VMProtect是新一代软件保护实用程序。VMProtect支持德尔菲、Borland C Builder、Visual C/C、Visual Basic&#xff08;本机&#xff09;、Virtual Pascal和XCode编译器。 同时&#xff0c;VMProtect有一个内置的反汇编程序&#xff0c;可以与Windows和Mac OS X可执行文件一起…

‘Light轻食初试版’小程序制作到发布过程中遇到的问题

目录 前言技术栈开发经验布局字体问题图片问题协作开发 发布时遇到的问题接口问题分包、图片显示问题小程序与公众号关于测试版本 总结 前言 学完小程序方面的知识后&#xff0c;我花四天时间做了一个简单的微信小程序——轻食Light说。这个小程序目前叫作“Light轻食说初始版…

【R模型】R语言线性回归之简单线性回归模型 (一)

&#x1f482; 个人信息&#xff1a;酷在前行&#x1f44d; 版权: 博文由【酷在前行】原创、需要转载请联系博主&#x1f440; 如果博文对您有帮助&#xff0c;欢迎点赞、关注、收藏 订阅专栏&#x1f516; 本文收录于【R模型】&#xff0c;该专栏主要介绍R语言各类型机器学习…

ORA-01555-快照过旧问题处理

背景 通过监控发现ETL报错 如何处理 ORA-01555 是 Oracle 数据库的一个错误代码&#xff0c;表示出现了“快照太旧”的错误。这个错误通常是由于数据库中的回滚段不够大&#xff0c;导致无法满足当前事务的需&#xff0c;从而导致事务回滚失败。 具体来说&#xff0c;ORA-0…

如何知道调用电商API是否成功返回数据?查看错误码解释

在API调用过程中&#xff0c;系统可能会返回一些错误码。错误码能够帮助开发者快速准确地了解出现的异常情况。错误码的含义通常涉及到请求参数不合法、认证失败、服务器内部错误等各种问题&#xff0c;它们提供了有关API调用失败的信息和上下文&#xff0c;在错误排查和修复时…

【案例教程】环境影响与碳排放生命周期评估应用及案例分析实践技术

生命周期分析是一种分析工具&#xff0c;它可帮助人们进行有关如何改变产品或如何设计替代产品方面的环境决策&#xff0c;即由更清洁的工艺制造更清洁的产品。例如&#xff0c;生命周期分析的结果表明&#xff0c;某种产品能耗低&#xff0c;寿命长&#xff0c;不含有毒化学物…

Java实现识别发票信息

Java实现调用第三方接口识别发票信息 需求&#xff1a;对每个发票图片文件进行重命名&#xff0c;名称为发票号固定信息&#xff0c;主要处理增值税发票 这里需要用到第三方接口&#xff0c;OCR识别功能&#xff0c;这里我用的是百度云接口&#xff0c;所以你需要注册百度云账…

计算机网络:计网体系结构

计网体系结构 1. 基本概念1.0 计算机网络的发展1.0.1 第一阶段1.0.2 第二阶段1.0.3 第三阶段 1.1 计算机网络的概念1.2 计算机网络的功能1.3 计算机网络的组成1.4 计算机网络的分类1.5 标准化工作及相关组织1.6 相关性能指标1.6.1 速率1.6.2 带宽1.6.3 吞吐量1.6.4 时延1.6.5 时…

springboot+vue摄影跟拍预定管理系统(源码+文档)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的摄影跟拍预定管理系统。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 &#x1f495;&#x1f495;作者&#xff1…

SSM框架学习-Spring事务管理入门

文章目录 前言六、Spring事务1.Spring事务简介2.入门案例--模拟银行间转账业务3.开启Spring事务的一般步骤4.Spring事务角色5.spring事务属性--rollbackfor6.入门案例进阶--转账业务追加日志7. Spring事务属性--事务传播行为 总结 前言 为了巩固所学的知识&#xff0c;作者尝试…

项目管理:有效的沟通对项目的成功至关重要

为实施有效的沟通&#xff0c;需要建立沟通管理计划同时理解什么是沟通&#xff0c;沟通的对象是谁&#xff0c;沟通的目标是什么&#xff0c;难度在哪里&#xff0c;并选择合适的沟通方式。 项目沟通是确保项目团队的相关信息能及时、正确地产生、收集、发布、储存和最终处理…

77.建立一个Web应用程序的布局第一部分

本次我们需要设计的布局是这样样子&#xff0c;这个很想一个邮件系统的基本布局&#xff1b; ● 首先我们生成基础代码&#xff0c;基础代码很简单&#xff0c;不用过多解释 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-…

MySQL锁应用详解

文章目录 前言MySQL锁的详解1. 表级锁1.1 读锁&#xff08;共享锁&#xff09;对比查询操作更新操作获取写锁获取读锁 1.2 写锁&#xff08;排他锁&#xff09;对比获取写锁对表进行事务操作获取表的读锁对表进行查询操作 2. 行级锁2.1 共享锁2.2 排他锁 锁的应用场景1.1 并发读…

PDF怎么添加水印?简单途径说明

在工作中&#xff0c;我们经常需要对PDF文档进行保护&#xff0c;以确保其不被未经授权的人员查看或修改。其中一种常见的保护方式是在PDF文件中添加水印。水印不仅可以保护文件的安全性&#xff0c;还可以帮助识别文档的来源以及保护版权。在本文中&#xff0c;我们将介绍如何…