go 源码解读 - sync.WaitGroup

news2024/11/15 15:42:49

go version 1.19.7

在 Go 语言中,sync.WaitGroup 是一个并发编程的同步工具,用于等待一组 Goroutine 执行完毕。

当需要等待多个 Goroutine 完成任务后才能执行下一步操作时,我们可以使用 sync.WaitGroup 实现协程间的同步。它提供了 Add()、Done() 和 Wait() 三个方法,分别用于计数、计数减一和等待协程执行完毕。

具体来说:

Add(delta int) 方法可以用于增加 WaitGroup 的计数器。每次调用 Add(),计数器就会增加 delta。如果
delta 为负数,则会减少计数器。如果计数器变为0,则表示所有协程执行完毕。

Done() 方法用于减少 WaitGroup 的计数器,相当于 Add(-1),表示一个协程已经执行完毕。

Wait() 方法用于等待所有协程执行完毕。它会阻塞调用它的协程,直到所有协程执行完毕并调用 Done() 方法,或者等待超时。

WaitGroup 的使用示例如下:

var wg sync.WaitGroup

func main() {
    wg.Add(2) // 增加计数器
    go func() {
        // 协程1 执行任务
        wg.Done() // 完成任务,减少计数器
    }()
    go func() {
        // 协程2 执行任务
        wg.Done() // 完成任务,减少计数器
    }()
    wg.Wait() // 等待所有协程执行完毕
}

TIP

信号量
一个信号量(semaphore)S是一个整型变量,它除了初始化外只能通过两个标准的原子操作:wait() 和 signal() 来访问
当信号量>0 时,表示资源可用,则 wait 会对信号量执行减 1 操作。
当信号量<=0 时,表示资源暂时不可用,获取信号量时,当前的进程/线程会阻塞,直到信号量为正时被唤醒。

定义wait()

wait(S) {
    while (S <= 0)
        ; // 忙等待
    S--;
}

定义signal()

signal(S) {
    S++;
}

结构

type WaitGroup struct {
	// 告诉编辑器 waitgroup 对象不可复制,  (sync.pool 也有这个) 
	noCopy noCopy
	// 高32位是计数器(counters)  低32位是等待者数量(waiters)
	// 这边计数器 其实是Add(int) 的数量的总和, 例如 Add(1) 后 再Add(1)  计数器的就是1 +1 =2
	//  等待数量就是现在有多少goroutine 在执行wait() 等待被释放
	state1 uint64 
	//  这个是信号量
	// runtime_Semrelease 表示将信号量递增(对应信号量中的 signal 操作)
	// runtime_Semacquire 表示将信号量递减(对应信号量中的 wait 操作)
	state2 uint32
}

这边 state1 和state2 都是64位, 如果是32位的 需要重新处理获取这俩个值, 如下


func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	// &wg.state1获取wg.state1变量的地址;
    // unsafe.Pointer(&wg.state1)将变量的地址转换为指针类型unsafe.Pointer;
    // uintptr(unsafe.Pointer(&wg.state1))将指针类型unsafe.Pointer转换为uintptr类型,即将指针类型转换为无符号整数类型,以便进行位运算;
    //%8对uintptr类型的地址值进行取模运算,判断地址是否是8的倍数。  
    // 即判断wg.state1 变量的地址 是否对齐了8字节边界, 对齐了说明是64位的编译器
	if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		// 64位系统 原样返回即可
		return &wg.state1, &wg.state2
	} else {
		// 32位系统需要重新获取 counter、waiter、sema
		// &wg.state1获取wg.state1变量的地址;
        // unsafe.Pointer(&wg.state1)将变量的地址转换为指针类型unsafe.Pointer;
        // (*[3]uint32)将指针类型unsafe.Pointer转换为指向一个包含3个uint32类型的数组指针类型*[3]uint32。这里使用(*[3]uint32)是因为我们需要处理的是一个长度为3的数组,而不是单独的uint32值;
         // 即 &wg.state1的地址被转换为类型为*[3]uint32的指针。
		state := (*[3]uint32)(unsafe.Pointer(&wg.state1))
		// 然后 截一下, 数组里第一个是信号量,  
		// 第二第三个 是counter 和 waiter ,把俩个 uint32 转成一个Uint64
		return (*uint64)(unsafe.Pointer(&state[1])), &state[0]
	}
}

在这里插入图片描述

TIP 内存对齐, 不深入

64位对齐是指将数据结构中的成员按照64位对齐方式进行排列的过程。在64位体系结构中,CPU访问内存的最小单位为8字节(64位),也就是说,如果访问的数据不是8字节的整数倍,那么CPU需要进行多次访问才能读取或写入完整的数据,这会导致额外的CPU开销和内存访问延迟。因此,在设计数据结构时,为了避免CPU访问内存的额外开销,需要对数据结构进行64位对齐。
具体来说,64位对齐的原则是,结构体中的每个成员的偏移量必须是8的倍数。如果成员的大小不是8的倍数,那么需要在成员之间填充空白,使得下一个成员的偏移量是8的倍数。

Add

在这里插入图片描述

func (wg *WaitGroup) Add(delta int) {
	// statep 高32位是计数器(counters)  低32位是等待者数量(waiters)
	// semap 是信号量
	// 获取 counters/waiters , 信号量
	statep, semap := wg.state()
	// 判断当前程序是否开启了竞态条件检查, 默认是false  , 不深入, 后面相关代码忽略不显示
	if race.Enabled {
		_ = *statep 
		if delta < 0 {
			race.ReleaseMerge(unsafe.Pointer(wg))
		}
		race.Disable()
		defer race.Enable()
	}
	// 将int 的delta 变成 Uint64 再左移32位, 与statep 进行累加
	// 相当于把 delta  与 state 的高32位计数器 值进行相加
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	// 高32位, 计数器的值
	v := int32(state >> 32)
	// 低32位, 等待者的值
	w := uint32(state)
	// 计数器小于0 , panic
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	//  当 Wait 和 Add 并发执行时,会有概率触发下面的 panic
	// 还有一种, wait  在add 在之前操作的时候, 也会触发panic
	// 需要注意, 不要在被调用的goroutine内部调用Add , 应该在外面调用
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// v > 0: 说明还有待完成的任务数,此时不应该唤醒等待协程
    // w = 0: 说明没有协程在等待
    // 此时可以直接退出
	if v > 0 || w == 0 {
		return
	}
	// 存在等待的协程时,goroutine 已将计数器设置为0。
   // 现在不可能同时出现状态突变:
    // - Add 不能与 Wait 同时发生,
   // - 如果看到计数器==0,则 Wait 不会增加等待的协程。
   // 仍然要做一个廉价的健康检查,以检测 WaitGroup 的误用。
 
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// 此时v = 0 所有任务都完成了

	// 将statep 置成0, 将 waiters 设成0 
	*statep = 0
	// 唤醒等待者
	// 可能有一堆等待者,    (一堆协程等一个协程干完活, 一堆协程等一堆协程干完活)
	for ; w != 0; w-- {
	// signal,调用 Wait 的地方会解除阻塞
	// semrelease 原子增加 semap 的值, 并通知阻塞在semacquire中正在等待的goroutine 
	// 第二、第三个参数 不深入
		runtime_Semrelease(semap, false, 0)
	}
}

Done

// 其实只是对 Add 的调用,但是它的效果是,将计数器的值减去 1 , 一个等待的协程 执行完成了

func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

Wait

在这里插入图片描述

// 注意点
// 1、 wait 只能在主流程中才能阻塞
// 2、 wait 过程中 不能add  会panic
func (wg *WaitGroup) Wait() {
	// 获取计数器和信号量
	statep, semap := wg.state()
	// 这边 我思考了一下, Q1 : 为啥不把 获取计数器和信号量放进循环里面
	// 看注意点2, wait 过程中 不允许add , 这样每次都一样(注意这边的一样是指地址一样, 为什么强调地址, 看下面 Q2)不用重复获取了
	for {
		// 又卡了, Q2 :既然都一样了, 为啥 counter,waiter 需要重新解析啊
		// add 操作会控制 信号量变化
		// wait 操作会控制state 变化
		// atomic.LoadUint64(statep) 原子加载, 保证statep已经写完了
		// 那 statep 哪边写呢, 在后面
		//  atomic.CompareAndSwapUint64(statep, state, state+1) , 对state 进行自增加操作
		// 看下 参数 func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) 
		// 在这个地址上(statep) 把旧值(state)改成新值(state+1)
		// 地址statep 没变, 存的值变了, 那这边肯定要重新解析, cool 
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32)
		w := uint32(state)
		// 计数器是0 了, 不需要等待, 返回
		if v == 0 {
			return
		}
		// 这边 上面Q2 的时候一起分析了
		// state + 1 , 其实是对waiters 加1 (加了低32位上), 等待者加1
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			 // 这会阻塞,直到 sema (信号量)大于 0
			runtime_Semacquire(semap)
			// 防止并发
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			// 解除阻塞状态
			return
		}
	}
}

可继续深入学习的点
1、信号量
2、内存对齐
3、原子操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/506185.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

测试:概念篇

目录 简单介绍测试 我们先简单的介绍一下测试工程师 简单来看看测试和开发的区别 测试的基本概念 什么是需求 BUG 的概念 测试用例 什么是测试用例&#xff1f; 为什么有测试用例 测试周期 开发模型 瀑布模型&#xff1a; 螺旋模型&#xff1a; 敏捷软件开发 V …

PostgreSQL 查找重复数据(二)

创建表和测试数据&#xff1a; -- DROP TABLE IF EXISTS people; CREATE TABLE people (id integer GENERATED ALWAYS AS IDENTITY PRIMARY KEY,name varchar(50) NOT NULL,email varchar(100) NOT NULL );INSERT INTO people(name, email) VALUES (张三, zhangsantest.com),(李…

操作系统考试复习-—第四章 分段式 段页式存储方式

分段从存储管理方式&#xff1a;一方面是通常的程序都可以分为若干段&#xff0c;另一方面是实现和满足信息共享&#xff0c;信息保护&#xff0c;动态链接以及信息的动态增长等需要。也都是以段为基本单位实现的。所以说&#xff0c;分段存储管理方式更符合用户和程序员多方面…

JWT认证

一、什么是JWT 官网地址: https://jwt.io/introduction/ jsonwebtoken&#xff08;JWT&#xff09;是一个开放标准&#xff08;rfc7519&#xff09;&#xff0c;它定义了一种紧凑的、自包含的方式&#xff0c;用于在各方之间以JSON对象安全地传输信息。此信息可以验证和信任&…

华为nqa实验拓扑案例

bqa是一种实时的网络性能探测和统计技术&#xff0c;可以对响应时间、网络抖动、丢包率等网络信息进行统计。如图1所示&#xff0c;接口备份与NQA联动功能配置相对简单&#xff0c;只需在本端RouterA上配置NQA测试例&#xff0c;并在RouterA的备份接口上配置接口备份与NQA联动&…

自定义组件中如何注入Spring底层的组件

1.概述 自定义的组件要想使用Spring容器底层的一些组件&#xff0c;比如ApplicationContext&#xff08;IOC容器&#xff09;、底层的BeanFactory等等&#xff0c;那么只需要让自定义组件实现XxxAware接口即可。此时&#xff0c;Spring在创建对象的时候&#xff0c;会调用XxxA…

搞懂 API,API 常见技术使用场景分享

API&#xff08;应用程序编程接口&#xff09;是一种允许软件应用程序之间相互交互和通信的技术。以下是API常用的使用场景&#xff1a; 应用程序开发 API通常被用于网站或应用程序的开发中&#xff0c;以便在不同平台、语言及数据库之间获取数据或进行消息传递。例如&#xff…

探索数字化转型新道路!流辰信息微服务与您一起创未来!

科技在进步&#xff0c;社会在发展&#xff0c;办公自动化也在高速发展中。数字化转型是当下企业获得长久发展的趋势之一&#xff0c;在信息瞬间万变的社会中&#xff0c;谁掌握了核心技术&#xff0c;谁能与时代同步&#xff0c;谁就能开启新的康庄大道&#xff0c;谁就能在转…

VS2017配置Qt——超详细步骤教学(看完不会算你狠)

一、环境要求 visual studio 2017 vsaddin Qt14.1 mysql 注意mysql环境与msvc2017编译器环境保持一致。 mysql32位 配 msvc2017 32位 或 mysql64位 配 msvc2017 64位 注意&#xff1a;环境不一致会导致软件运行错误&#xff0c;为了避免这些错误&#xff0c;要将…

第1章计算机系统漫游之 “源代码的编译与执行” 及 “操作系统管理硬件”

文章目录 1、信息就是位上下文2、程序被其他程序翻译成不同的格式3、了解编译系统如何工作的益处4、处理器读并解释储存在存储器中的指令4.1 系统的硬件组成4.2 执行 hello 程序 5、高速缓存6、形成层次结构的存储设备7、操作系统管理硬件7.1 进程7.2 线程7.3 虚拟存储器7.4 文…

docker容器内使用cat命令修改文件

有时候docker容器内部没装vi 或vim命令&#xff0c;无法使用vi来修改文件 可以使用cat命令来查看文件 cat 主要功能一次显示整个文件:cat filename 从键盘创建一个文件:cat > filename 只能创建新文件,不能编辑已有文件 将几个文件合并为一个文件:cat file1 file2 > fi…

最新黄金市场价格分析之干掉调整浪

等待的过程无疑是最令人心烦的。各位朋友应该试过&#xff0c;等待自己的朋友、亲人&#xff0c;等等结果&#xff0c;等待成绩公布等等。但是等待是我们干任何事都必不可少的过程&#xff0c;是我们缓冲、蓄力的阶段。最新黄金市场价格分析中的等待&#xff0c;体现在调整浪的…

Python心经(3)

这一节总结点demo和常用知识点 目录 有关字符串格式化打印的 lambda匿名函数&#xff0c;&#xff0c;将匿名函数作为参数传入 文件读写 生成器 python的装饰器 简单的网站代码&#xff1a; 有关三元运算 推导式&#xff1a; 新浪面试题&#xff1a; 有关面向对象里…

SpringBoot项目中一些常用的,工具类

推荐多使用这个&#xff1a; Hutool参考文档Hutool&#xff0c;Java工具集https://hutool.cn/docs/#/core/%E9%9B%86%E5%90%88%E7%B1%BB/%E9%9B%86%E5%90%88%E5%B7%A5%E5%85%B7-CollUtil?id%e4%bb%8b%e7%bb%8d 1&#xff1a;断言 断言是一个逻辑判断&#xff0c;用于检查不应…

芯片封装技术(三)

Interposer 是一种用于连接芯片的中间层技术&#xff0c;它的基底通常是一块硅基底&#xff0c;而硅基底也是 Substrate 的一种。因此&#xff0c;Interposer 与 Substrate 有一定的关系。对于RDL Interposer来说&#xff0c;Si Interposer的信号布线密度进一步提高&#xff0c…

[Linux] Linux文件系统

&#x1f941;作者&#xff1a; 华丞臧. &#x1f4d5;​​​​专栏&#xff1a;【LINUX】 各位读者老爷如果觉得博主写的不错&#xff0c;请诸位多多支持(点赞收藏关注)。如果有错误的地方&#xff0c;欢迎在评论区指出。 文章目录 一、Linux文件系统1.1 磁盘1.2 inode1.3 软硬…

Philosophy of life: growing flowers in your heart

Growing flowers in your heart An aged man lived in a nice cottage with a large garden in a town in England. He is seen busy looking after his flowers all time. 第一部分介绍的是: 有一个老人在英格兰的镇上有一个带大花园的屋子&#xff0c;他一直在忙着照顾他的花…

机器学习实战:带你进入AI世界!

机器学习是人工智能领域的一个重要分支&#xff0c;可以帮助我们从大量数据中发现规律&#xff0c;进行预测和分类等任务。然而&#xff0c;想要真正掌握机器学习算法&#xff0c;并将其应用到实际问题中&#xff0c;还需要进行大量的实战练习。 本文将介绍几个常见的机器学习实…

seurat -- 细胞注释部分

文章目录 brief寻找差异基因部分注释细胞部分详细参数 brief 细胞注释大概分为两步&#xff1a;差异基因 --> marker genes —> map reference 差异基因可以是表达量上存在差异也可以是表达细胞占比上存在差异&#xff0c;通常二者兼顾考虑。 marker genes 个人理解为…

蓝牙网状网络的基本原理及应用开发

借助蓝牙 5 的网状网络功能&#xff0c;开发人员可以增强无线连接系统&#xff08;如物联网设备&#xff09;的通信范围和网络可用性。但是&#xff0c;网状网络的低功耗无线硬件设计与网状网络软件开发之间存在着复杂的层次&#xff0c;这可能会使开发人员迅速陷入混乱并危及项…