make 和 new 的区别
make 和 new 都是用来分配内存
- make 只能对 slice map channel 进行初始化结构体实例。new 可以对任意类型进行初始化
- make 用于分配数据对象的具体实例,new 用于分配数据类型的默认值,并返回该数据的指针。
new 出来的 slice 、map、channel 是无法使用的,是一个 nil 值。这三个值是不会分配地址空间的,因为其默认值就是nil。而map 会返回非nil 值。
Slice
slice 切片,具体概念如下:
- data 存了什么元素
- len 存了多少个元素
- cap 总共可以存多少个元素
- cap 指的是底层数组的容量,也就是从slice 的 第一个元素开始,底层数组能够容纳的元素个数。
- slice 的 cap 决定了 slice 能扩展的空间。
- 当 len 超过了 cap,就需要扩容。
slice 的元素会存在于一段连续的内存中,底层实现是数组。如果初始化的 切片是 var ints []int
则data = nil、len、cap 均为0,没有底层数据结构。
而make 的方式初始化slice,会按照make的传参来初始化 slice 的底层数据机构。
new 方法创建的 slice 同样没有底层数组,只有通过 append 才能为其创建底层数组 并追加元素。
数组转切片
func main() {
arr := [10]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
var (
s1 []int = arr[1:5]
s2 []int = arr[7:]
)
fmt.Printf("%v,%v,%v\n", s1, len(s1), cap(s1)) //cap = 9
fmt.Printf("%v,%v,%v", s2, len(s2), cap(s2)) //cap = 3
}
上面这段代码就是利用 数组的底层逻辑来划分切片。
- 首先数组的底层是 0~9 一共 10 位。
- s1 利用 cap 从数组的 1位开始,1位对应的是数字1,那么len = 5-1 ,cap = 10 - 1
- s2 利用 cap 从数组的 7位开始,7位对应的数字7,那么 len = 10-7,cap = 10 - 7
重点注意:slice 的 cap 是一个非常关键的字段,cap 是从数组的第一个元素开始计算,而不是从最后一个元素开始计算。比如:[1:10] 和 [2:5],只会从 1 / 2 开始计算数组长度,而不会从 10 /5 计算。
浅拷贝 和 深拷贝
概念:
- 浅拷贝 只 复制对象的指针,两个对象会用同一块内存空间,修改其中一个会影响另一个。
- 深拷贝 是 复制对象的数据,而不复制对象的指针,两个对象拥有各自的内存空间,修改其中一个对象不会影响另一个。
通过实例来理解深拷贝和浅拷贝:
- 假设我们有一个结构体类型 Person,它有两个字段:Name 和 Age。我们可以用下面的代码来创建一个 Person 的实例:
p1 := Person{Name: "Alice", Age: 20}
- 如果我们想要复制 p1 的值,我们可以用下面的代码来实现深拷贝:
p2 := p1 // 深拷贝,p2 和 p1 拥有各自的内存空间
p2.Name = "Bob" // 修改 p2 的 Name 字段
fmt.Println(p1.Name) // Alice,p1 的 Name 字段不受影响
fmt.Println(p2.Name) // Bob,p2 的 Name 字段已经改变
- 但是,如果我们想要复制 p1 的指针,我们可以用下面的代码来实现浅拷贝:
p3 := &p1 // 浅拷贝,p3 和 p1 共享同一块内存空间
p3.Name = "Charlie" // 修改 p3 的 Name 字段
fmt.Println(p1.Name) // Charlie,p1 的 Name 字段也被修改了
fmt.Println(p3.Name) // Charlie,p3 的 Name 字段已经改变
注意:浅拷贝复制后,如果使用append 就会触发新的底层数组,也就变成了两个 slice。修改其中一个也就不会导致第二个一并修改了。
渐进式扩容 和 翻倍扩容
slice 默认扩容规则(翻倍预估),具体规则如下:
一般的默认规则是在当前基础上 * 2,如果 *2 之后的容量可以满足要求,则 = *2的容量。然后将原有数据复制到新的内存地址中。
这种翻倍扩容的方式,会重新分配底层数组,需要消耗一定的时间 和 内存开销。
渐进式扩容:
渐进式扩容无需进行翻倍,而是根据原来的 cap 和 len 来计算新的 cap。下面是示例代码:
func main() {
// 创建一个初始 cap 为 10 的 slice
s := make([]int, 0, 10)
// 打印初始的 len 和 cap
fmt.Printf("len: %d, cap: %d\n", len(s), cap(s))
// 循环添加元素
for i := 0; i < 100; i++ {
s = SliceAppend(s, i)
// 每添加 10 个元素,打印一次 len 和 cap
if i%10 == 0 {
fmt.Printf("len: %d, cap: %d\n", len(s), cap(s))
}
}
}
func SliceAppend(old []int, i int) []int {
var newCaps int
//判断当前的长度是否超过了容量
if len(old) > cap(old) {
//则需要计算新的容量,如果原来的容量 《 1024 则直接翻倍
if cap(old) < 1024 {
newCaps = cap(old) * 2
} else {
newCaps = cap(old) * 5 / 4
}
} else {
old = append(old, i)
return old
}
//创建一个新的 slice, 并将容量 = 新的容量
newSlice := make([]int, len(old), newCaps)
//复制原来的元素
copy(newSlice, old)
newSlice = append(newSlice, i)
return newSlice
}
这种渐进式扩容的方式,通过cap 和 len 来计算新的 cap,而不是简单的翻倍,避免系统过度分配内存,减少内存浪费和垃圾回收压力。
渐进式扩容可以提高内存利用率,减少内存碎片和重新分配的次数,因为它可以适应不同大小和增长的 slice,提高灵活性。不依赖于 append 函数的内部实现,更清晰的控制 slice 的 cap,自定义扩容策略。
Benchmark 测试代码如下:
package test
import (
"fmt"
"testing"
)
func growSlice(old []int, n int) []int {
if old == nil {
return nil
}
var newCaps int
if len(old) > cap(old) {
if cap(old) > 1024 {
newCaps = cap(old) * 2
} else {
newCaps = cap(old) * 5 / 4
}
} else {
old = append(old, n)
return old
}
newSlice := make([]int, len(old), newCaps)
copy(newSlice, old)
newSlice = append(newSlice, n)
return newSlice
}
func BenchmarkGrowSlice(b *testing.B) {
s1 := make([]int, 0, 10)
b.StartTimer()
for i := 0; i < b.N; i++ {
//使用渐进式添加
s1 = growSlice(s1, i)
}
b.StopTimer()
fmt.Printf("s1 len: %d, cap: %d\n", len(s1), cap(s1)) // 打印 s1 的 len 和 cap
}
func BenchmarkAppendSlice(b *testing.B) {
s1 := make([]int, 0, 10)
b.StartTimer()
for i := 0; i < b.N; i++ {
//使用渐进式添加
s1 = growSlice(s1, i)
}
b.StopTimer()
fmt.Printf("s2 len: %d, cap: %d\n", len(s1), cap(s1)) // 打印 s1 的 len 和 cap
}
Map
map 用来存储 key-value 集合的数据,每个key 只会出现一次。Map的主要实现有两种方式“哈希表(hash table)”和 “搜索树(search tree)”。golang 和 java 基于哈希表实现,c++的Map基于 平衡二叉树(红黑)实现。
map 的key 都允许哪些类型
map的 key-value 中,key 允许所有的可以比较的数据类型,如下:
- bool
- string
- int、float
- 所有的指针类型
- 数组类型,但是要求数组类型的元素,是可以比较的类型。
- 结构体类型,结构体中的字段都是可以比较的类型。
不建议使用:
slice、map、struct、channel 作为map的key,因为难以维护。如果非要使用,建议将这些类型的数据做成 md5的编码为key值。
map删除条目会触发资源回收吗
首先,map中删除一个key,在一定条件下会释放资源。要满足条件如下:
- 该map 的key没有任何引用关系,没有任何其他引用指向这个key。
- 该map 的key在垃圾回收检测时访问不到这个内存空间,就会被释放。
如果需要立即释放,要手动调用gc来回收资源。或者显示的清空map。
map 的某些key被删除后,map本身的底层数组并不会缩小,占有内存空间也不会减少。因为删除key-value只是从分配好的 哈希表中将这个值删除,并不会改变底层哈希表的容量大小。
虽然底层哈希表的数组容量不变,但是会有GC在满足条件情况下释放资源。
哈希表(散列表)
哈希表是由 数组 + 哈希桶链表组合而成,其中的几个概念是必须要提前了解的:
- 哈希函数
- 哈希冲突
- 哈希桶:哈希桶(又称:哈希槽)每个哈希桶都是一个链表,链表中记录着k-v的数据对象。
- 负载因子:负载因子是表示哈希表元素的填满程度。负载因子越大,元素越多,冲突概率就越高。负载因子是用来决定哈希表是否要进扩容的关键指标参数。
哈希函数
哈希函数又称为散列函数,用于将任意大小的数据映射到固定大小值得函数,常见得包括(MD5\SHA)等。一个优秀的 hash 函数应该包含以下特性:
- 均匀性:一个好的哈希函数应该在其输出范围内尽可能均匀地映射,也就是说,应以大致相同的概率生成输出范围内的每个哈希值。
- 效率高:哈希效率要高,即使很长的输入参数也能快速计算出哈希值。
- 可确定性:哈希过程必须是确定性的,这意味着对于给定的输入值,它必须始终生成相同的哈希值。
- 雪崩效应:微小的输入值变化也会让输出值发生巨大的变化。
- 不可逆:从哈希函数的输出值不可反向推导出原始的数据。
哈希冲突
哈希冲突是随着 kv数据增多,哈希函数的结果可能会出现冲突的情况,当一个哈希桶中出现两个相同的 k-v就会出现冲突。解决方法如下:
- 链式存储:
链式存储的思想就是将映射到一个哈希桶中的所有元素都使用链表串起来。如果出现冲突了,就将其追加到链表的后面,使用链表组织起来。
这样做在查询时需要遍历整个链表。如果头节点的key不相同,就要向链表的下级节点一致遍历。
- 开放寻址:
开放寻址是不断进行查找空余的哈希桶,如果最开始得到的哈希桶已经存在数据,则继续寻找下一个哈希桶,直到找到一个空闲的哈希桶存储key-value。
开放寻址法有多种不同的探测方式,包括(线性探测、二次探测、双重散列)等。但是其核心思想都是不断寻找下一个位置直到找到一个空槽。
Go Map - hmap 底层实现
golang 基于 哈希表来实现,采用链式存储方法解决哈希冲突问题。map 的底层结构是一个 hmap 的结构体,关键字段 buckets 是指向 bmap 数组的指针,以及其它字段,分别指向旧桶,扩容进度,溢出桶等信息。
// A header for a Go map.
type hmap struct {
count int // 代表哈希表中的元素个数,调用len(map)时,返回的就是该字段值。
flags uint8 // 状态标志,下文常量中会解释四种状态位含义。
B uint8 // buckets(桶)的对数log_2(哈希表元素数量最大可达到装载因子*2^B)
noverflow uint16 // 溢出桶的大概数量。
hash0 uint32 // 哈希种子。
buckets unsafe.Pointer // 指向buckets数组的指针,数组大小为2^B,如果元素个数为0,它为nil。
oldbuckets unsafe.Pointer // 如果发生扩容,oldbuckets是指向老的buckets数组的指针,老的buckets数组大小是新的buckets的1/2。非扩容状态下,它为nil。
nevacuate uintptr // 表示扩容进度,小于此地址的buckets代表已搬迁完成。
extra *mapextra // 这个字段是为了优化GC扫描而设计的。当key和value均不包含指针,并且都可以inline时使用。extra是指向mapextra类型的指针。
最重要的是 buckets 字段,指向哈希桶数组,数组的元素是 bmap。bmap 是一个哈希桶,哈希桶用来存储key-value 数据,在源代码阶段是如下:
// A bucket for a Go map.
type bmap struct {
tophash [bucketCnt]uint8 //存储每个key哈希值的 高8位,以方便查找
}
桶外确定数组位置:
hmap 中 buckets 数组长度的对数 是指桶数组的长度用来计算数组长度的,公式:2的B次方。例如 B = 3 ,那么桶数组的长度是 2^3 = 8。方便通过位与运算来计算桶的位置。
hash-key 值的低 B 位,是指将 hash 后的按照 二进制表示,取最低的 B位。如下所示:
如果 hash 值是 0x12345678,那么它的二进制表示是:
0001 0010 0011 0100 0101 0110 0111 1000
如果 B = 4,那么它的低 B 位就是:
1000
如果 B = 8,那么它的低 B 位就是:
0111 1000
如果 B = 12,那么它的低 B 位就是:
0101 0111 1000
hash 值得低B位,绝对了key-value 落入那个 buckets 数组的 bmap。
桶内寻找key-value位置:
tophash 是一个数组类型,数组长度是8。用来存储每个key哈希值的高8位,通过高8位来寻找哈希桶内位置,判断key-value是否冲突。每个哈希桶最多存储8个键值对。
- 为了让哈希桶存放数据更加的紧凑,8个key和8个value会排列在一起存储。
- tophash:根据哈希值的高8位,来确定存放在哈希桶的位置。
举例说明 哈希值 与 tophash 的关系:
假设一个桶中有4个key,它们的哈希值分别是:
0x12345678 0x87654321 0xABCDEF01 0x10FEDCBA
那么它们的高 8 位分别是:
0x12 0x87 0xAB 0x10
那么 tophash 数组就是:
[0x12, 0x87, 0xAB, 0x10, 0, 0, 0, 0]
注意:在tophash中未使用的位置会填充0,表示空闲。
当存储key-value时,在tophash 数组中寻找 key(哈希值) 是否在数组中存在相同的高8位,如果有则代表找到了对应的key,如果没有说明不在这个桶中。具体的操作还要继续判断。
判断 key-value 是否冲突
找到对应的key之后,还要进一步比较完整的哈希值 和 key是否相等。如果完整的hash值和key都相等,说明已经存在map中,直接进行覆盖更新替换value。
如果完整的哈希值和key并不相等,说明key 的高8位发生了哈希冲突,高8位虽然相同,但是实际上是不同的key。那么就需要在桶内寻找下一个空闲的位置,将 key-value 存储在空闲处。
如果桶内没有空闲的位置,就需要创建一个新的桶,用 overflow
指针指向这个新的桶,并在新的溢出桶中存储这个 key和value。
注意:
- overflow 溢出桶就是一个 bmap新的bmap ,内部结构完全一样。
- 一个bmap 可能会关联多个溢出桶,最终形成一个链表。
例如,假设有以下几个 key:
0x12345678 0x22345678 0x32345678 0x42345678 0x52345678 0x62345678 0x72345678 0x82345678
如果 B = 8,那么它们的低 B 位都是 0111 1000,也就是说它们都会被映射到同一个 bmap 中。但是一个 bmap 只能存储 8 个 key-value 对,所以当第九个 key 来的时候,就需要创建一个新的溢出桶,并将 overflow 指针指向它。如果还有第十个、第十一个、第十二个…这样的 key 来的时候,就需要不断地创建新的溢出桶,并将它们串成一个链表。
在编译阶段期间 bmap会动态添加字段,完整的bmap 如下:
type bmap struct {
topbits [8]uint8 //tophash
keys [8]keytype //key 存储的连续key
values [8]valuetype //value 存储的连续value
pad uintptr //用于对齐的字段填充
overflow uintptr //overflow 指向下一个桶的指针
}
这样编译阶段补全的方式,是为了让bmap结构更灵活,根据不同的key 和 value 类型来生成不同的 bmap 结构。
总结map实现:
- map的实现依赖底层 hmap 数据结构,该数据结构中包含了一个 buckets 数组,数组的每一个元素是一个哈希桶,key和value 就存储在这个 哈希桶中。
- buckets 数组元素是 bmap 数据结构,一个 bmap 最多存储 8对 key-value。
- 如果对key-value指向增删改操作时,会在bmap中寻找匹配的key,如果找不到则判断是否存在溢出桶,如果有则去溢出桶寻找,溢出桶如果还是没有,则返回 value 的默认值。
map 存储key-value 的过程:
- 通过哈希函数得到一个存储 kv的哈希值。
- 根据哈希值的低B位,确定选择数组中的哪一个桶,B对应 hmap 中的一个字段,表示桶数组长度的对数。
- 根据哈希值的高8位,确定哈希桶内部的位置,使用tophash数组来存储每一个key的高8位。
- 如果桶内 或者 关联的溢出桶 能找到相同的key,则覆盖value;
- 如果桶内已经满了 或者 出现冲突,则创建一个新的桶,通过 overflow 字段指向溢出桶。
- 如果 map 的负载因子超过了阈值,则触发扩容操作,将桶的数组长度扩大一倍,并重新分配所有的key-value 键值对。
map 遍历过程(为什么无序)
map遍历使用for range 语句来实现,for range 会调用 map的迭代器,迭代器会返回 map 中的key 和 value。
map 的迭代器 是一个 hiter 结构体,包含了遍历map 所需要的信息,比如当前桶索引、桶中元素索引、溢出桶索引等等。
map 的迭代器会按照以下步骤遍历 map 中的元素:
- 检查 map 是否为空或者正在扩容,如果是直接返回。
- 然后从第一个桶开始,遍历每一个 bmap 的 cell,如果 cell 不为空则返回 cell 的key-value。
- 如果 bmap 的 overflow 不为空,包含溢出桶则继续遍历溢出桶的 cell。
- 直到遍历完所有的桶为止。
map 无序的原因:
- map 的元素首先存储时就是无序的,是按照哈希函数进行分布的。
- map 在每次遍历的时候 都会随机生成序号 bucket,再从其中随机的 cell 进行遍历。具体随机代码如下:
// 生成随机数
r := uintptr(fastrand())
if h.B > 31-bucketCntBits {
r += uintptr(fastrand()) << 31
}
// 决定了从哪个随机的bucket开始
it.startBucket = r & bucketMask(h.B)
// 决定了每个bucket中随机的cell的位置
it.offset = uint8(r >> h.B & (bucketCnt - 1))
注意:如果需要特定顺序遍历,需要先取出key 进行排序,然后按照key 的顺序取出map的value
map 扩容
触发扩容条件:
- 负载因子是否超过 6.5
- 判断是否有过多的溢出桶
总结:map扩容触发条件,一种是map的负载因子超过了6.5,一种是溢出桶太多了。
如何扩容 ?
map 扩容的核心函数如下:
- hashGrow:负责初始化新的桶。
- groWork:迁移数据,每次调用最多迁移两个桶的数据。调用 evacuate。
- evacuate:真正干活的迁移函数,负责迁移指定桶中的数据。
- advanceEvacuationMark:收尾工作,增加 nevacuate,确定所有的 oldbuckets 都迁移完了。
扩容迁移具体步骤
- 开始扩容,准备新的哈希桶数组:
需要扩容时就要创建更多的新桶,然后把旧桶存储的 key - value 都迁移到新桶里,如果哈希表存储的key-value 数据较多,一次性迁移所有桶花费的时间会比较显著,所以通常在分配哈希表进行扩容时,会先分配足够多的新桶,然后用 oldbuckets
字段记录旧桶的位置,再加一个字段记录旧桶迁移的进度(进度:下一个要迁移的旧桶编号)。
- 数据迁移
扩容完成后,需要做数据的迁移。数据的迁移并不是一次完成的,是使用时才会做对应 bucket 的迁移,每次做多迁移两个哈希桶数据,直到逐步完成迁移。
- 从第一个hash桶开始,检查是否已经被搬迁到新的 哈希桶中
- 如果是则跳过;
- 如果不是,则将该桶中的元素重新计算哈希值和位置,并复制到新的桶中。
- 如果该桶存在溢出桶,也按照同样的方式重新计算哈希值和位置,再搬迁溢出桶中的元素,并释放溢出桶内存。
- 最后,如果该桶中没有溢出桶,则继续搬迁下一个桶,直到搬迁完所有桶位置。
map 如何分辨读写数据在新桶还是旧桶
map 会通过 evacuatedX 和 evacuatedY 两个标志位来分辨要读写的数据是在旧桶还是在新桶。
-
当map 进行扩容时,会将原来旧桶赋值给 oldbuckets 字段,然后创建一个新的 哈希桶数组赋值给 buckets。新桶的长度是原来的二倍。
-
然后,会从第一个旧桶开始,将数据迁移到新桶中,迁移时会根据哈希值和新桶的数量重新进行计算元素位置,然后分配到新桶中完成迁移。
-
迁移完成后,会将旧桶中的 tophash 数组每个元素都设置 evacuatedX 或者 evacuatedY。表示该旧桶已经被迁移。
evacuatedX:表示迁移后的位置在:原来位置上 ~ 原来位置 + 旧桶数量 之间,如原来是3,旧桶是8,则迁移后的位置区间是 3 ~11。
evacuatedY:表示迁移后的位置在:原来位置上 + 旧桶数量 ~ 原来位置 + 2倍旧桶数量之间;如果原来是3,旧桶数量是8,则迁移后的位置在 11~19之间。
当map读写时,会先根据哈希值和桶数量计算出桶的位置,并检查该桶是否已经被迁移,如果是则会根据 evacuatedX 和 evacuatedY 的标志位,从新桶中获取元素;如果不是则从原来的旧桶中获取元素。
map 迁移时如何保护读写不受影响?
读取时:
- map遍历时根据 evacuatedX / evacuatedY标志位进行判断,判断k-v 目前是在新桶中还是在旧桶中。
- 通过 hmap.nevacuate 字段的值,可以判断已经搬迁了多少个旧桶,如果某个旧桶的索引小于该值,则说明该旧桶已经完全被搬迁,可以直接读新桶。
写入时:
- 在map写入时,会先检查是否被迁移过,如果已经完成了迁移,则直接到新桶中进行处理。
- 如果没有完成迁移,则会调用 growWork 函数,该函数会根据迁移的进度,找到还未迁移的哈希桶,将其数据复制到新的哈希桶中,在进行写入更新。
map 遍历、读写时,都会根据哈希值和桶数量计算出桶的位置,并检查该桶是否已经被迁移,如果是则会从新的桶数组中获取元素;如果不是,则会从原来的桶数组中获取元素。
Mutex 锁
Mutex 锁的使用:
package main
import (
"fmt"
"sync"
"time"
)
var mu sync.Mutex
var wg sync.WaitGroup
func main() {
// 模拟正常模式
fmt.Println("Normal mode:")
wg.Add(3)
go worker(1, 100*time.Millisecond) // 持有锁100ms
go worker(2, 0) // 立即释放锁
go worker(3, 0) // 立即释放锁
wg.Wait()
fmt.Println()
// 模拟饥饿模式
fmt.Println("Starvation mode:")
wg.Add(3)
go worker(1, 0) // 立即释放锁
go worker(2, 0) // 立即释放锁
go worker(3, 2*time.Second) // 持有锁2s
wg.Wait()
fmt.Println()
}
func worker(id int, duration time.Duration) {
defer wg.Done()
fmt.Printf("worker %d: trying to lock\n", id)
mu.Lock() //上锁
fmt.Printf("worker %d: locked\n", id)
time.Sleep(duration) // 模拟占用锁的时间
mu.Unlock() //解锁
fmt.Printf("worker %d: unlocked\n", id)
}
Mutex 实现原理
Mutex 内置了一个状态值(state) 标识锁的状态,以及一个 sema 信号量。
type Mutex struct {
state int32 //表示互斥锁的状态
sema uint32 //信号量变量,用来控制等待 goroutinee 的阻塞休眠和唤醒
}
const (
mutexLocked = 1 << iota //第一位:是否持有互斥锁
mutexWoken //第二位:是否有被唤醒的等待者
mutexStarving //第三位:互斥锁是否处于饥饿模式
mutexWaiterShift = iota //阻塞等待的数量
starvationThresholdNs = 1e6 //饥饿阈值 1ms
)
state 是int32类型,通过二进制位来表示锁的状态:
- 第1位 Locked:表示锁是否被持有,1 = 加锁,0 = 解锁。
- 第2位 Woken:表示是否有被唤醒的等待者,1 = 有,0 = 没有。
- 第3位 Starving:表示是否处于饥饿模式,1 = 饥饿模式,0 = 正常工作模式。
- 第4~n位:等待偏移量,包含一些排队上锁的
由Lock 和 UnLock 实现加解锁,具体的逻辑工作函数由 LockSlow 和 UnLockSlow 来实现。
mutex 锁升级
针对 goroutines 竞争锁资源,golang有两种处理方法,也是锁升级的一个过程。以下是处理 锁竞争时的两种处理方法:
-
阻塞 / 唤醒:
- 阻塞:当goroutine 试图加锁失败,出现阻塞后,当前 goroutine 会被直接挂起。
- 唤醒:当goroutine 等待锁资源释放,并成功抢到锁资源后,再将goroutine 重新唤醒继续执行。
-
自旋CAS:
- 基于自选结合 CAS 的方式,重复校验锁的状态并尝试获取到锁,始终处于循环状态。
自旋CAS 无序阻塞唤醒goroutine,但是会长时间for循环,占用CPU资源,适合并发情况较低的场景。
阻塞唤醒,需要频繁的挂起协程,进行上下文切换操作较重,适合并发场景高的场景。
golang 基于以上两种 场景设计了一个锁升级机制,针对并发环境由乐观(正常模式)逐渐转向悲观(饥饿模式)的过程,进行了区分实现。
mutex 如何判断锁竞争是否激烈
通过 state 和 seam 信号量来判断。
首先是保持正常模式,正常模式下的 goroutine 会自旋CAS 的方式竞争锁自旋。当mutex 的 state 字段表示mutex锁的状态,如果 state 值经常发生变化,或者有很多的 goroutine 处于排队状态,就说明锁的竞争比较激烈。
seam 信号量字段用于阻塞和唤醒等待的 goroutine。如果 seam 信号量频繁发生变化,就说明有很多阻塞 和 唤醒的goroutine操作,说明锁竞争很激烈。
重点原因:
- mutex 的自旋次数和时间,是源代码中模式切换的阈值
- 自旋时间超过 1ms 没有获取到锁
- 自旋次数超过 4次 没有获取到锁
CAS 自旋
CAS 是 CPU 中的一个指令,自旋的实现依赖 CAS。
当一个 Goroutine 在获取锁的时候,如果锁资源已经被其它 Goroutine 持有,那么当前 Goroutine 会一致循环等待监听 mutex,不断的判断是否能够成功获取到锁,直到获取到锁才会退出循环。
mutex 的正常模式和饥饿模式
- 正常模式过程详解
正常状态的 state = 1,在尝试加锁时 goroutine 会自旋4次,尝试通过原子性操作获取到锁。若无法获取锁,则通过seam信号量进入阻塞状态,进入排队等待状态。所有的goroutine 都会按照先入先出 FIFO 的顺序等待排队。
当一个持有锁的 Goroutine 调用 UnLock 方法时,会检查当前 mutex 的 state字段,判断是否有等待的 goroutine,并且还没有被唤醒,则会选择一个阻塞等待的 goroutine 唤醒。
当 Goroutine 被唤醒后,并不会直接拥有锁,而是需要跟后来者竞争,这些后来者就是处于自旋阶段且尚未排队的goroutine。如果没有竞争到锁资源,就会重新排队,插入到队列的头部等待下一次的唤醒。
为了避免这种频繁唤醒却拿不到锁资源的情况,当一个 goroutine 加锁等待时间超过 1ms还是不能获取到锁,就会将mutex 转变为饥饿模式。
- 饥饿模式过程详解
饥饿模式是指 goroutine 等待锁的时间过长,会切换正常模式到互斥模式。在饥饿模型下,锁的所有权会直接从解锁的 goroutine 传递给等待队列中的第一个 Goroutine,后来的 goroutine 不会再进行自旋或者尝试获取。保证了公平性。
饥饿模式如何转换回正常模式:
- 队列首个 goroutine 的等待时间已经小于 1ms;
- 队列中没有 goroutine 排队,获取到锁之后就会将 mutex state 状态从饥饿模式转变为正常模式。
正常模式和饥饿模式哪个更好
正常模式的优点是能够提高吞吐量,因为频繁的挂起和唤醒 goroutine 会带来较多的开销。缺点是可能会出现队列尾端排队的 goroutine 迟迟抢不到锁。
饥饿模式的优点是能够保证公平性,所有 goroutine 都需要排队,严格的 FIFO。缺点是性能会相对差一些,因为锁的所有权需要传递,不能直接被来的 goroutine 抢占。
mutex 的 midPath 和 osq Lock
mid path:
是一种介于 fast path 和 slow path 之间的代码路径,也叫 optimistic spinning(乐观锁),它是指当 mutex 被其他协程持有时,当前协程不进入等待队列,而是自旋等待锁的释放,希望能够快速获取锁。这种方式可以避免上下文切换的开销,但是也会消耗 CPU 资源。
osq lock:
是一种用于实现 mid path 的锁机制,它是一种结合了 MCS lock 和 qspinlock 的实现,又根据 mutex 的语义量身裁剪过的锁。它可以保证处于 mid path 的协程之间的公平性和高效性,避免过分乐观的自旋等待。
osq lock 使用一个双向链表来组织处于 mid path 的协程,每个协程都有一个 optimistic_spin_node 结构体来记录自己的状态和位置。
osq lock 还使用一个原子变量 tail 来记录链表的尾部节点,方便新来的协程加入链表。当持有 mutex 的协程释放锁时,它会检查 osq lock 队列是否为空,如果不为空,就会释放 osq lock,并让 spinner 们竞争锁。
Lock() 加锁过程
- fast path 快速路径
在 Lock() 中通过原子性操作实现处理理想的锁状态;而哪些非预期的状态放在了 LockSlow 方法中处理。
func (m *Mutex) Lock() {
//atomic,判断当前锁状态是不是0,如果是0,则设置成1,表示加锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
m.lockSlow()
}
Lock方法中会通过原子性操作,判断互斥锁当前是否处于解锁状态,如果是则直接加锁;理想状态下只需要一个自旋操作就能够获取到锁,如果一个CAS 没有获取到锁,就会进入 LockSlow方法处理。
- lockslow goroutine 队列+饥饿模式
- 声明一些局部变量用来存储状态
func (m *Mutex) Lock() {
// 等待时间
var waitStartTime int64
// 饥饿标记
starving := false
// 唤醒标记
awoke := false
// 自旋次数
iter := 0
// 当前的锁的状态
old := m.state
...
}
方法进来之后会先创建几个局部变量:
- waitStartTime:记录goroutine 进入阻塞时的时间点
- starving:是否开启饥饿模式;
- awoke:是否有尝试唤醒的 goroutine 在等锁;
- iter:标记当前 goroutine 参与自旋的次数;
- old:临时存储锁的 state 状态。
- 自旋CAS 空转
func (m *Mutex) Lock() {
...
for {
// 锁是非饥饿状态,锁还没被释放,尝试自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
//将当前goroutine设置成唤醒状态,worken = 1
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 自旋
runtime_doSpin()
// 自旋次数加1
iter++
// 设置当前锁的状态
old = m.state
continue
}
...
}
- 进入 for 循环
- 对锁当前的状态进行判断,满足三个条件:Ⅰ加锁状态、Ⅱ锁处于正常模式、Ⅲruntime_canSpin 还允许继续执行自旋。则进入自旋状态;(检查是否满足自旋状态)
- 进入自旋模式后
- 执行自旋 runtime_doSpin()
- iter 自旋次数++
- 重新赋值当前 mutex 状态
- 自旋后预写状态新值
如果自旋的条件不满足则需要对新的状态代码进行更新,具体如下:
new := old
if old&mutexStarving == 0 {
// 如果当前不是饥饿模式,那么将mutexLocked状态位设置1,表示加锁
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
// 如果当前被锁定或者处于饥饿模式,则waiter加一,表示等待一个等待计数
new += 1 << mutexWaiterShift
}
// 如果是饥饿状态,并且已经上锁了,那么mutexStarving状态位设置为1,设置为饥饿状态
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// awoke为true则表明当前线程在上面自旋的时候,修改mutexWoken状态成功
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 清除唤醒标志位
new &^= mutexWoken
}
- 第一个判断,如果当前加了锁但是没有处于饥饿状态,会重新加一次锁,属于一种保底策略,保证当前 goroutine 一定加锁成功。
- 第二个判断,如果锁处于饥饿模式,或者已经上锁了;则在队列中要加上一个等待的 goroutine。
- 第三个判断,如果锁处于饥饿模式,并且已经上锁了,则将mutex 状态改成饥饿模式
- 第四个判断,如果在自旋时设置成功,那么这里要消除 Woken 标志位。因为后续流程中很有可能当前线程会被挂起,需要等待其它释放锁的 gouroutine来唤醒。如果 unlock 时发现 mutexWoken 位置不是0,就不会去唤醒了。
- 继续判断是否需要加锁
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 1.如果原来状态没有上锁,也没有饥饿,那么直接返回,表示获取到锁
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 2.到这里是没有获取到锁,判断一下等待时长是否不为0
// 如果不为0,那么加入到队列头部
queueLifo := waitStartTime != 0
// 3.如果等待时间为0,那么初始化等待时间
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 4.阻塞等待
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 5.唤醒之后检查锁是否应该处于饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 6.判断是否已经处于饥饿状态
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 7.加锁并且将waiter数减1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// 8.如果当前goroutine不是饥饿状态,就从饥饿模式切换会正常模式
delta -= mutexStarving
}
// 9.设置状态
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
到这里之后,首先会CAS设置新的状态,如果成功则往下走,否则返回之后循环设置状态。
- 首先会判断old状态,如果没有饥饿,也没有获取到锁,那么直接返回,因为这种情况在进入到这段代码之前会将new状态设置为mutexLocked,表示已经获取到锁。这里还判断了一下old状态不能为饥饿状态,否则也不能获取到锁;
- 判断waitStartTime是否已经初始化过了,如果是新的goroutine来抢占锁,那么queueLifo会返回false;如果不是新的goroutine来抢占锁,那么加入到等待队列头部,这样等待最久的 goroutine 优先能够获取到锁;
- 如果等待时间为0,那么初始化等待时间;
- 阻塞等待,当前goroutine进行休眠;
- 唤醒之后检查锁是否应该处于饥饿状态,并设置starving变量值;
- 判断是否已经处于饥饿状态,如果不处于饥饿状态,那么这里直接进入到下一个for循环中获取锁;
- 加锁并且将waiter数减1,这里我看了一会,没用懂什么意思,其实需要分两步来理解,相当于state+mutexLocked,然后state再将waiter部分的数减一;
- 如果当前goroutine不是饥饿状态或者waiter只有一个,就从饥饿模式切换会正常模式;
- 设置状态;
加锁流程面试总结
lock() 方法加锁时分为两种状态:
- fastPath 理想状态,互斥锁处于解锁状态,直接通过CAS获取锁。
- lockSlow goroutine队列 + 饥饿模式,通过 goroutine 排队的方式来获取锁。
具体流程如下:
- 首先通过 CAS 原子性操作尝试将 mutex 加锁。如果成功表示加锁成功。
- CAS 失败后,表示当前 mutex 已经被其它 goroutine 持有,会进入 slowPath 。
- 在 slow Path 中继续判断是否可以进行自旋等待。如果自旋次数 达到4次还没有获取到锁,就会强制进入饥饿模式。
- 在饥饿模式中会排队等待锁的释放。
- 将当前 goroutine 加入到队首。
- 当持有 mutex 的 goroutine 调用Unlock 时,会检查 wait list 队列是否为空,并唤醒排队队首的 goroutine。将锁的所有权传递给它,避免饥饿。
- 如果当前 goroutine 就是 mutex 阻塞队列的最后一个,则将 state 设置为正常模式。
UnLock() 解锁过程
fast path:
slow path:
具体流程如下:
-
首先检查 mutex 的 state 是否 = 1,如果不为1,则表示没有被锁定属于运行时错误,导致panic。
-
fast Path:
- 如果 state = 1,则表示当前mutex 被goroutine 持有,尝试通过原子操作将state 设置为0,如果设置成功,则表示解锁成功。
-
slow path:
- 如果设置 state = 0 失败,就表示 mutex 还有其它协程在等待或者自旋,进入 slow path。
- 在slow path 中,会判断是否有等待的 Goroutine,如果有,则会唤醒队首的 Goroutine,并将锁的所有权传递,避免饥饿。
- 直到 Goroutine 阻塞队列为空时,并且没有自旋的 Goroutine 在抢锁后,就会将 state = 0,并退出临界区。
RWMutex 读写锁
RWMutex 可以理解为一把读锁,一把写锁;
写锁:
- 具有严格的排他性(互斥读、写),当其被占用时,其它视图获取读写锁的 goroutine 将会被阻塞。
读锁:
- 具有共享性,互斥写锁,兼容读锁。视图获取写锁时会进入阻塞,视图读取操作可以与其它goroutine 共享 读锁资源。
RWMutex 适合读多,写少的场景,最理想化的情况是所有操作均可以使用读锁,避免了程序出现阻塞;最坏的情况是所有操作均使用写锁,退化成普通的 Mutex。
底层数据结构实现
RWMutex 读写锁通过 count++ 计数器 和 Sem 信号量的方式记录读锁数量和写锁的数量。
type RWMutex struct {
w Mutex // 互斥锁,用于保护内部字段
writerSem uint32 // 写信号量,用于唤醒阻塞的写者
readerSem uint32 // 读信号量,用于唤醒阻塞的读者
readerCount int32 // 读者计数器,记录当前活跃的读者数量
readerWait int32 // 等待计数器,记录当前等待的读者数量
}
关键字段作用如下:
w:互斥锁,用户保护内部字段,以及实现写优先逻辑。
- 当有写操作时,会先尝试获取 w互斥锁,如果成功则可以进行写入操作。
- 如果失败,则表示有其它写操作 或者 读操作正在进行,需要等待 w 锁的释放。
writerSem:写信号量,用于唤醒阻塞的写 Goroutine。
- 当有写操作被阻塞时,会在 writeSem上等待。
- 当 w锁 被释放时,会检查所有等待的 Goroutine,如果有就会通过 writeSem 唤醒,并将锁的所有权传递。
readerSem:读信号量,用于唤醒阻塞的读 Goroutine。
- 当有读操作被阻塞时,会在 readerSem 上等待。
- 当 w锁 被释放时,会检查所有读的 Goroutine,如果有就通过 readerSem 唤醒所有的 Goroutines,并让它们可以并发读取。
readerCount:读计数器
- 记录当前活跃的读者数量。当有读操作开始时,会原子性的增加 readerCount的值。当有读操作结束时,会原子性的减少 readerCount的值。当readerCount = 0时,代表没有读操作,可以写入。
readerWait:等待计数器
- 记录当前等待的读者数量。当有读操作被阻塞时,会原子性的增加 readerWait的值,当前有读操作被唤醒,则原子性减少 readerWait 的值。当 readerWait = 0 ,代表没有等待的读者,可以释放 w锁。
读锁操作
RLock() 加锁执行过程
func (rw *RWMutex) RLock() {
// 如果启用了 race检测,就先禁用它
if race.Enabled {
_ = rw.w.state
race.Disable()
}
//原子性的增加 readerCount 的值
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
//如果readerCount 小于0,表示有writer 在等待或者执行写操作,阻塞在readerSem上。
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
// 如果启用了 race 检测,就重新启用它,并获取 readerSem 的所有权。
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
- 调用RLock方法,检查是否有写锁 或者 阻塞等待获取写锁 的 Goroutine。
- 使用atomic.AddInt32 原子性的增加 readerCount 的值,表示有一个 reader 获取了读锁。
- 接着判断 当前的 readerCount是否小于0。
- 如果 readerCount 的值小于 0 为负数,就会进入等待状态,说明此时有写入操作,读操作被阻塞在 readerSem上,表示 写 操作完成后唤醒。
为什么ReaderCount 会出现小于 0 的情况
判断 readerCount 是否小于0,是为了判断是否有 writer 写入操作在队列 A 中等待,或者正在执行写操作。如果有 write 在队列A中等待,那么ReaderCount 会减去 MaxReaders,使 ReaderCount 变成负数。
变成负数后,新来的 reader 读操作就会知道此时有 writer 写入操作正在执行,需要在信号量中排队等待。当writer 完成后,会把 readerCount 加回那个很大的数,使其变回正数。
这样后面的 reader 就可以获取读锁了。
RUnlock() 解锁执行过程
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
fatal("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
- 首先使用atmoic.AddInt32 原子性的将 readerCount - 1,表示放当前Goroutine 的读锁。
- 接着判断 readerCount 是否为负数,如果是负数,表示 write 在队列中等待执行,此时需要调用 rUnlockSlow 方法进行慢速路径进行处理。
- 处理完成后,结束函数,表示释放了读锁。
rUnlockSlow 方法:
- 首先判断 readerCount 是否 = 0 或者 -MaxReaders;如果是,表示没有 reader 持有读锁或者读锁已经溢出,此时发生 fatal 错误。
- 使用 atomic.Addint32 原子性的减少 readerWait的值,表示有一个 reader 从队列中退出等待。
- 判断 readerWait 是否为零值,如果是零,则表示队列中已经没有了 reader,此时需要唤醒 write 协程,并将 writerSem 信号量设置为 1。
总结:
- 解锁时有 fast path 和 slow path 两种解锁方式
- 第一种是直接 减去 readerCount 信号量,表示释放了一个读锁。
- 第二种是需要从 readerWait 阻塞队列中释放一个读锁。因为readerCount < 0 表示当前有写入操作正在执行,有写操作进行时,读操作会都进入阻塞等待状态。
写锁操作
lock 加写锁
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
- 调用 w.Lock() 互斥锁加锁的方法,该方法会使用互斥锁来保证只有一个写入操作。
- 接着使用 atomic.AddInt32 原子性的减少 readerCount 的值,并加上 rwmutexMaxReaders,表示有一个 writer 在队列A中等待或者执行写操作。
- 如果 readerCount 不为零,表示还有 reader 持有读锁,此时需要使用 atomic.AddInt32 原子地增加 readerWait 的值,并等待 writerSem 的信号量。
Unlock释放写锁
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
fatal("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
- 使用 atomic.AddInt32 元子地增加 readCount 的值,并加上 MaxReaders 表示释放了写锁。
- 如果 readerCount > MaxReaders 表示没有 reader 持有读锁,或者读锁溢出,触发fatal。
- 根据 readerCount 的值,唤醒相应数量的 reader Goroutine,并将ReaderSem 信号量设置为 1。
- 调用 w.Unlock 方法,该方法使用互斥锁来释放写协程的怕他性。
channel
前置知识 CSP
CSP 是并发编程的一种编程规范,允许多个任务在同一时间执行,从而提高程序的性能和效率。CSP 是一种思想,用于描述两个独立的并发实体通过共享的通信管道进行通信的方式。
“不要以共享内存的方式来通信,相反,要用通信来共享内存”,这样可以避免内存带来的复杂性和不确定性,只需要关注通信的逻辑和顺序。
golang 是支持CSP思想的语言,使用 channel 作为通信的原语,以goroutine 作为并发实体。
channel 特性
goroutine 用于并发执行任务。而 channel 可以同时被多个 goroutine 访问,用于在多个goroutines间并发安全的存储和传递数据。
chan T // 声明一个双向通道
chan<- T // 声明一个只能用于发送的通道
<-chan T // 声明一个只能用于接收的通道
有缓冲channel 的特性
buffer channel 带 size 缓冲区,属于异步模式,只要有剩余容量(生产和消费)双方就不会阻塞。否则写入方将会被阻塞,直到出现接收方才会被唤醒。
从 hchan 数据结构来分析:
- dataqsize: 不为0
- buf: 缓冲区不为空
- sendx 和 recvx 两个字段用来维护缓冲区中下一个要发送或者接收的元素索引。
- sendq 和 recvq 两个字段来维护发送或者接收数据的 goroutine 队列,实现阻塞和唤醒逻辑。
问题一:如果有缓冲channel 写满了,还继续写会发生什么?
答:channel 写满了之后会出现死锁阻塞,必须有接收方读取才能继续写入。
问题二:有缓冲channel 中有数据,但是channel 关闭了还能读吗?
答:channel 被关闭后,如果当中存在数据可以正常被接收者读取。但是如果channel 关闭后还继续写入,则会出现 panic 报错。
无缓冲channel 的特性
unbuffer channel 不带size 缓冲区,属于同步模式,只有(生产和消费)双方同步就绪,数据才能够进行传递。
无缓冲channel 的数据传输是一次性的,不会在channel中存储任何元素。
从 hchan 数据结构来分析
- qcount 肯定=0,因为无缓冲区不会在 channel 中存储任何元素,所以 qcount 始终=0。
- dataqsize 肯定=0,表示无传冲区channel
- recvq 和 sendq
问题一:无缓冲channel 没有接收者会panic吗
答:无缓冲的channel 如果没有接收者是不会报错的,并且可以正常写入数据,但是会一直阻塞直到有消费者消费。
channel 数据结构
hchan 数据结构,通过make 创建channel 的底层数据结构。
type hchan struct {
qcount uint // 元素数量
dataqsiz uint // 环形队列的数组长度
buf unsafe.Pointer // 缓冲区,指向datasize元素类型大小的数组
elemsize uint16 // 元素大小
closed uint32 // channel 是否关闭
elemtype *_type // 元素类型
sendx uint // 已发送元素在循环队列中的索引
recvx uint // 已接收元素在循环队列中的索引
recvq waitq // 等待接收队列
sendq waitq // 等待发送队列
lock mutex //保护hchan的互斥锁
}
qcount:
- 表示channel中当前有效的元素个数,在元素入队和出队时做出改变。
- qcount 永远不会大于 dataqsize。
- 只有 buffer channel 中,hchan 才会存储部分元素数据。
dataqsiz:
- 表示channel缓冲区的大小,在初始化时赋值,之后不会再改变。
- 只有 buffer channel 中才会有缓冲区大小。
buf:
- 表示channel 的缓冲区地址,指向一个循环数组,用于存放channel中的所有元素。
- 只有 buffer channel 中才会有 buffer 循环数组。
elemsize:表示channel中元素的大小,根据元素类型而定。
elemtype:表示channel中元素的类型,是一个 *_type 类型,用于反射和垃圾回收。
closed:表示channel是否已经关闭,用于修改和读取。
sendx:指向buf 循环队列,表示channel中下一个要发送的元素索引位置,用于循环缓冲区。
recvx:指向buf 循环队列,表示channel中下一个要接收的元素索引位置,用于循环缓冲区。
sendq:表示阻塞等待发送数据的 goroutine 队列。指向 waitq
recvq:表示阻塞等待接收数据的 goroutine 队列。
waitq 数据结构
type waitq struct {
first *sudog
last *sudog
}
waitq 是一个链表结构,保存在channel中的 goroutine 信息。
- first 指向头指针
- last 指向尾指针
waitq 作用:
当一个 goroutine 因为channel 操作而被阻塞时,就会创建一个sudog 结构体封装当前阻塞的 goroutine,并将其加入到 waitq 链表中,并等待被唤醒。
当一个goroutine 因为channel 操作而被唤醒时,会从 waitq中移除对应的 sudog结构体,并恢复 goroutine的执行。
recvq 和 sendq:
channel 中有两个waitq 链表,分别是 recvq 和 sendq。
-
recvq:用于保存等待接收数据的 goroutine 信息。
- 当channel 的缓冲区为空时,接收方会被阻塞,并加入到 recvq 中等待执行。如果有发送的数据了,会优先唤醒 recvq 中goroutine。
-
sendq:用于保存等待发送数据的 goroutine 信息。
- 当channel 的缓冲区慢时,发送方会阻塞,并加入的 sendq 中等待发送。如果有接收者了,会优先唤醒 sendq 中的 goroutine。
sudog 数据结构
type sudog struct {
g *g //指向被阻塞或唤醒的 goroutine 的指针
next *sudog //指向waitq中下一个sudog 的指针,实现链表操作
prev *sudog //指向waitq中上一个sudog 的指针,实现链表操作
elem unsafe.Pointer // 指向goroutine 要发送或者接收的数据元素的指针
acquiretime int64 //记录goroutine被阻塞的时间,用于调试和统计。
releasetime int64 //记录goroutine被唤醒的时间,用于调试和统计。
ticket uint32 //记录goroutine 被阻塞或唤醒时的时钟周期,用于调试和统计
isSelect bool //标记 goroutine 正在执行select 语句
success bool //标记goroutine 是否成功发送或者接收数据
parent *sudog //指向select 语句中的sudog结构体,用于实现select逻辑
waitlink *sudog //指向select 语句中的下一个sudog结构体,用于实现select逻辑。
waittail *sudog //指向select 语句中最后一个sudog结构体,用于实现select逻辑
c *hchan //指向goroutine所属的channel结构体。
}
作用:
- sudog用于封装goroutine 的状态和数据。并将其加入到 recvq 和 sendq 中等待被唤醒。当一个 goroutine 因为 channel 操作而被唤醒时,会从channel 的 recvq 和 sendq 链表中移除 sudog,并恢复 goroutine 继续执行。
- 可以实现select 语句的逻辑,多个channel 上进行非阻塞或者随机的选择。
有缓冲channel 的收发过程
初始状态下,channel缓冲区为空,读写下标(sendx 和 recvx )都指向下标0的位置。等待队列 recvq 和 sendq 也都为空。
举例说明:
假设 ch := make(chan int, 5)
创建一个5缓冲区的channel,收发过程如下:
- 首个 goroutine 向channel 中发送5次消息数据,因为此时还没有开启消费者,所以会不断向 buf 缓冲区中填入5次数据。
- 每个填入的数据会封装成 sudog 结构体,以链表的形式存储在 sendq 中。
- 并且sendx 会随着填入的数据不断递增,当写满5个后,会回到循环数组的起始位置。
- 此时发现属于 0索引位置的 数据还没有被接收,就会进入阻塞状态。
接着创建一个接收方Goroutine,不断接收来自channel 中的数据。
- 此时的recvx 指向的是下一个要消费的索引位置,也就是 0 。所以接收开始后,第一个接收的数据就是 recvx = 0 的数据。
- 此时会通过 sudog 中的元素指针,指向内存空间中的数据内容,并将其拷贝到channel 中,发送给消费者。
- 然后从 recvq 链表中删除 goroutine 的 sudog。并唤醒等待阻塞的 sendq。1
- 将sendq 队列中排队阻塞的6 写入到缓冲区数组中,填充到 sendx = 0 的循环数组首位,更新索引位置,最终实现写入。
无缓冲channel 的收发过程
无缓冲cahnnel 的底层数据结构是 hcahn 结构体,主要用到了 互斥锁,sendq 和 recvq 发送队列和接收队列。这两个队列都是由 sudog 结构体组成的链表,sudog 结构体封装了 goroutine 的状态和数据。
向无缓冲channel 发送数据过程:
- 先获取channel 的互斥锁,检查channel是否已经关闭了,如果关闭了,会抛出 panic异常。
- 如果没有关闭,则检查接收队列是否有等待的 goroutine
- 如果有:将数据拷贝到对方的 sudog 中。
- 如果没有,则将自己封装成一个 sudog,加入到发送队列中,并阻塞自己等待唤醒。
接收数据过程:
- 会先获取channel 的互斥锁,然后检查channel 是否已经被关闭了,如果关闭并且发送队列为空,则返回 默认值和false。
- 如果 channel 没有关闭,则检查发送队列中是否有等待执行的 goroutine。
- 如果有,消费者会将发送方 sudog 中的 elem 拷贝到自己的变量中,并将发送发从队列中移除,并唤醒发送者。
- 如果没有,则将自己封装成一个 sudog 加入到 sendq 队列中,并且阻塞自己等待被唤醒。
无缓冲区 与 有缓冲区的最大区别
- 无缓冲区的数据传递是指针拷贝
无缓冲channel 没有缓冲区来存储数据,而缓冲channel 有一定的容量。这意味着当一个 goroutine 像无缓冲区发送数据时,数据是不会存存储在缓冲区中的,而是直接拷贝到接收者的 sudog 中。
无缓冲区通过sudog 来管理等待发送和接收的goroutine,sudog 包含 goroutine 的信息。当有goroutine 向无缓冲 channel 发送或者 接收数据时,会检查是否有匹配的 goroutine 在等待,如果有则直接交换数据并唤醒对方。
- 有缓冲区的数据传递是从buf 的 sendq 和 recvq 中读取
select
select 执行过程大致如下:
- 首先创建一个 selectgo 数据结构,是一个数组,数组的每一个元素就是一个 scase 结构体,表示select 语句中的所有分支。
- selectgo 函数会按照一定的顺序加锁所有涉及到的channel,以避免出现死锁和竞态条件。
- selectgo 函数会生成一个随机的轮询顺序,用于遍历所有的 case,并尝试从channel 中发送或者接收标志,并唤醒对应的 goroutine。
- 如果没有任何的 case 成功执行了数据交换,则判断是否有 default case,如果有default case,则返回default case的索引和接收标志。如果没有则继续等待阻塞。
- selectgo 函数会按照相反的顺序解锁所有涉及到的channel,并返回被 case 选择的索引和接收标志。
scase 结构体
scase 使用来实现 select 的一种数据结构,属于case分支的信息,每一个case分支包含了channel 的指针,操作类型(发送/接收),数据元素的指针等等。
核心思想是实现多路复用,同时等待多个通道上的数据,并执行i相应的代码块,提高代码的性能。
type scase struct {
c *hchan // chan
elem unsafe.Pointer // data element
}