map粗略介绍
源码开头注释:
A map is just a hash table. The data is arranged into an array of buckets. Each bucket contains up to 8 key/elem pairs. The low-order bits of the hash are used to select a bucket. Each bucket contains a few high-order bits of each hash to distinguish the entries within a single bucket.
If more than 8 keys hash to a bucket, we chain on extra buckets.
When the hashtable grows, we allocate a new array of buckets twice as big. Buckets are incrementally copied from the old bucket array to the new bucket array.
Map iterators walk through the array of buckets and return the keys in walk order (bucket #, then overflow chain order, then bucket index). To maintain iteration semantics, we never move keys within their bucket (if we did, keys might be returned 0 or 2 times). When growing the table, iterators remain iterating through the old table and must check the new table if the bucket they are iterating through has been moved ("evacuated") to the new table.
map就是一个哈希表。数据放在在一组桶中。每个桶最多包含8对key/elem,称之为8个槽。键的hash值的后面几位用于确定这个数据放在哪个桶中,前面几位用于确定这个数据放在桶的哪个槽。
如果一个桶中超过8个数据,就建立一个桶的链。
当哈希表增长时,我们分配一个两倍大的桶数组,数据从旧桶数组增量复制到新桶数组。
Map迭代器遍历bucket数组,并按遍历顺序返回键(桶内数据,然后是溢出链顺序,然后是桶索引)。为了维护迭代语义,map从不在桶中移动键的位置(如果在桶内移动了键的位置,键可能会被重复遍历或者漏掉)。在扩展表时,迭代器仍对旧表进行迭代,同时检查新表,以防止他们所遍历的存储桶已经迁移到新表中。
一般来说,map使用hash表来实现,不同语言或框架的实现方式不同,但是都会涉及到一个问题:如何避免冲突?不同作者的解决方案不同:
- java的HashMap使用拉链法+红黑树
- redis的字典类型使用拉链法
- cpp的unordered_map也是使用拉链法+红黑树
go语言中的map实现有些特殊,go语言使用拉链法+数组,如下图所示,go语言存储数据是在Bucket数组,下文用桶数组来代替,每个桶数组下用拉链法挂着一系列的bmap,也就是桶。每个桶里可以看成是可以包含8个元素的数组,每个元素的位置下文称之为槽。
这样的结构,在hash冲突时,会优先填满当前桶的8个槽,一个桶被填充满之后,如果还有冲突,就再用拉链法加一个槽,每个桶有指向下一个桶的指针。在拉链法中除了首个被直接访问到的桶,其余的桶下文称之为溢出桶。如图中最右边的那个桶就是溢出桶。
下图中的mapextra字段可以看文章最后的extra字段章节,在此之前,只需要关注mapextra的nextOverflow
字段。
结构体、常量
结构体介绍
接下来根据上图详细介绍用到的一些结构体和常量。
这是go map的主要结构hmap,用于存储map的一些状态和桶数组。
// go map的主要结构
type hmap struct {
count int // map中KV对的数量
flags uint8 // 该map当前的状态,标志位
B uint8 // 桶数基于2的指数,该map最多能存储loadFactor * 2^B个数据对
noverflow uint16 // 溢出桶的大概数量,可以先不关注,详见溢出桶相关章节
hash0 uint32 // 计算键值的哈希种子
buckets unsafe.Pointer // 长为2^B的桶数组,元素是桶,当count==0时为nil
oldbuckets unsafe.Pointer // 仅在扩容时为非nil的旧桶数组,是当前桶数组的一半大小
nevacuate uintptr // 表示下一个待迁移桶的下标,小于该值的桶已经被迁移了
extra *mapextra // 可选字段
}
// mapextra 是hmap的可选字段
type mapextra struct {
// 以下两个字段先不关注,具体看extra相关章节
overflow *[]*bmap
oldoverflow *[]*bmap
// nextOverflow 存储指向预建立的空溢出桶的指针
nextOverflow *bmap
}
- count是表示map中KV对的数量的字段
- noverflow是表示map中溢出桶的大概数量的字段,溢出桶超出一定数量时,map就要扩容,该字段的计算方式见溢出桶章节
- B是表示map中桶数组大小的字段,桶数组有2^B个元素,最多能存储loadFactor * 2^B个数据对,超出这个界限就要扩容
- flags是表示map当前状态的一个标志位,具体的状态看下面的常量介绍章节
- hash0是用于计算key的hash的随机种子
- buckets就是桶数组的变量
- oldbuckets是旧桶数组的变量,只有在扩容的时候不是nil
- nevacuate是下一个待迁移的桶的下标,初始化为0,只有扩容时才用得到
- nextOverflow指向预建立的空溢出桶的指针,预建立的具体操作在初始化章节
- extra的overflow和oldoverflow字段与map的具体操作关系不大,见extra字段章节
以下是桶的结构:
// bucket的结构为bmap
// 表面是以下的情况
type bmap struct {
tophash [8]uint8 // 每个槽对应的tophash值
}
// 以下才是bucket运行时的结构
type bmap struct {
tophash [8]uint8 // 存储hash值的前几位,如果小于5,则表示上述的tophash状态码
// 这里KV对是分别存储的,虽然更复杂一些,但是这方便了内存的对齐
keys [8]keytype // key数组,隐藏字段
values [8]valuetype // value数组,隐藏字段
overflow uintptr // 溢出buceket指针,隐藏字段
}
可以看出,桶在运行时的结构大概是下图这样,黄色的是tophash,蓝色的是KV堆叠排列,绿色的是overflow变量。为什么代码里写的是上面的情况,而真实运行时结构体变成下面的样子呢?还是得看extra字段章节,反正现在知道了每个桶里有8个tophash,8个KV和一个溢出桶指针。
tophash的作用是,当key经过hash后选中了这个桶,使用tophash先进行逐个的比较,tophash一样的才比较key的真实值是否一致。这样设计的好处是,tophash是桶结构的第一个字段,排列紧凑,很容易取到,tophash是uint8字段,很小,比较起来也很快。而且后续还讲到tophash的状态位辅助比较,速度更快。
此处再大概讲一下go的map是如何定位一个KV的:
- key经过hash算法获得一个hash值
- 使用hash值的后B位作为桶数组的下标找到对应的桶
- 使用hash值的前8位作为key的tophash
- 然后在桶中从0到7依次遍历每个槽的tophash,看看是否与key的tophash相等。
- 如果在桶中8个槽都没找到,就看看这个桶有没有下一个溢出桶,有就继续找下一个溢出桶。
- 如果找到了tophash一致的槽,就进一步对比key是否一致
- 如果一致,就定位到KV了,如果不一致就继续找
- 如果找完这个桶及其所有的溢出桶都没找到,就说明这个KV不存在
上述过程是一个粗略的查找过程,细节还包括使用tophash的状态位进行判断等等操作,具体看读取数据章节。
常量介绍
const (
// 每个桶能包含的最大的KV对数量,即槽的数量
bucketCntBits = 3
bucketCnt = 1 << bucketCntBits
// 当哈希表的负载因子达到6.5时应当触发扩容
// 负载因子 = loadFactorNum/loadFactorDen,用两个整数来表示是为了与其他整数乘除
loadFactorNum = 13
loadFactorDen = 2
// KV对内联模式的阈值为128Byte,
// 内联模式是指KV对直接存储在桶里,而不使用指针指向真实值
maxKeySize = 128
maxElemSize = 128
// 用于在结构体中迅速定位到数据
dataOffset = unsafe.Offsetof(struct {
b bmap
v int64
}{}.v)
// tophash字段的几个保留状态
// 也就是tophash字段小于5的都是这些特殊状态,大于5的才是正常的tophash值。
emptyRest = 0 // “后继空状态”这个桶中这个tophash对应的位置是空的,并且这个桶后面的溢出桶的这个tophash对应的位置也是空的
emptyOne = 1 // “当前空状态”这个tophash对应的位置是空的
evacuatedX = 2 // 这个tophash对应的KV对是有效的,但是已经被迁移到新表的前半部分了。
evacuatedY = 3 // 这个tophash对应的KV对是有效的,但是已经被迁移到新表的后半部分了。
evacuatedEmpty = 4 // 这个tophash对应的位置是空的,这个桶已经被迁移了。
minTopHash = 5 // 一个正常的tophash的最小值
// flag的一些状态位
iterator = 1 // 有遍历器在遍历桶
oldIterator = 2 // 有遍历器在遍历旧桶
hashWriting = 4 // 有协程在写map
sameSizeGrow = 8 // 等量扩容,也就是在扩容,但是不是两倍扩容,而是等量扩容
// 用于在遍历map时做指示用的哨兵桶ID,值是-1
noCheck = 1<<(8*sys.PtrSize) - 1
)
- 内联模式相关的两个常量可以先忽略,具体看extra字段章节
- tophash的字段,重点关注
emptyRest
和emptyOne
常量 - flag的状态位有4个
noCheck
是在遍历时用到的
初始化
初始化map的汇编结果:https://go.godbolt.org/z/T6MfPGKjE
map有很多种初始化方式,上面链接中只是提供了其中一种,它使用了makemap
方法,其他初始化的方法都用到了makemap
函数,所以接下来主要讲这个函数的实现。
makemap
实现了go的make(map[k]v, hint)
的初始化方式,如果编译器确定了这个map或第一个桶可以在栈上创建时,入参的hmap可能就不会是nil。如果h != nil,map就直接基于h创建。
func makemap(t *maptype, hint int, h *hmap) *hmap {
// 判断是否溢出
mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
if overflow || mem > maxAlloc {
hint = 0
}
// 创建hmap结构体
if h == nil {
h = new(hmap)
}
h.hash0 = fastrand()
// 选择满足 loadFactor * 2^B > hint的最小B值
// 如果hint连一个桶都占不满的话,B=0
B := uint8(0)
for overLoadFactor(hint, B) {
B++
}
h.B = B
// 初始化hmap
if h.B != 0 {
var nextOverflow *bmap
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}
return h
}
注意,如果make(map[k]v, hint)
中的hint
小于8,即小于一个桶的槽数量,就不会使用makemap
创建了。不过这种情况不在本篇讨论范围,这只是go的一个trade-off而已。
上面代码主要的操作都在makeBucketArray
里,makeBucketArray
主要做的事情就是为map申请一个桶数组,其长度至少为2^b,顺便有可能还会预建立一个溢出桶的数组,这个溢出桶的数组在内存上与map的桶数组是连续的,此处不再展开,详情可以参考溢出桶章节。
dirtyalloc
字段要么是nil,要么是一个桶数组指针,当为nil时,表示申请一个新的桶数组,当传入一个桶数组时,表示清空并重用这个桶数组作为该map的桶数组(目前只在清空map时传入非nil)。
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
// 计算桶数组的大小
base := bucketShift(b)
nbuckets := base
if b >= 4 {
// 加上预估可能用到的溢出桶的大小
// 此处为 2^(b-4) 并进行内存对齐的数字
// 所以最终的要申请的桶数组的大小为 2^(2*b-4) 的内存对齐的数量
nbuckets += bucketShift(b - 4)
sz := t.bucket.size * nbuckets
up := roundupsize(sz)
if up != sz {
nbuckets = up / t.bucket.size
}
}
// 要么新建一个桶数组,要么清空旧的桶数组
if dirtyalloc == nil {
buckets = newarray(t.bucket, int(nbuckets))
} else {
buckets = dirtyalloc
size := t.bucket.size * nbuckets
if t.bucket.ptrdata != 0 {
memclrHasPointers(buckets, size)
} else {
memclrNoHeapPointers(buckets, size)
}
}
// 此处成立就说明之前预估的溢出桶的大小不为0
if base != nbuckets {
// 处理额外添加溢出桶的情况
// step1: 额外添加的初始位置
nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
// step2: 获取桶数组(包含了溢出桶)中的最后一个桶
last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
// step3: 将最后一个bucket的overflow指向buckets的头部
// 这步的用意其实是一个哨兵的思想
last.setoverflow(t, (*bmap)(buckets))
}
return buckets, nextOverflow
}
当B==5时,经过上述的初始化,此时base =2 ^ 5= 32,nbuckets = 2 ^ B + 2 ^ (B-4) = 32+2 = 34,具体如图所示:
总结下来,makemap的流程是:
- 根据输入的大小判断是否溢出
- 创建hmap结构体
- 选取合适的B值
- 初始化hmap的桶数组
- 计算桶数组的大小以及预估溢出桶的大小
- 创建一个新的桶数组
- 如果预估溢出桶大小>0,将hmap.extra.nextOverflow字段设为指向第一个溢出桶的指针,并将最后一个溢出桶的overflow指针指向第一个桶
读取数据
读取map的汇编结果:https://go.godbolt.org/z/8eEe8fesq
map中有两种读取key的方式:
val = m[2] // mapaccess1
val, ok = m[1] // mapaccess2
分别对应着源码的mapaccess1
函数和mapaccess2
函数,这两个函数步骤是一模一样的,此处以mapaccess1
为例。
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
// 如果map为空,返回0值
if h == nil || h.count == 0 {
if t.hashMightPanic() {
t.hasher(key, 0)
}
return unsafe.Pointer(&zeroVal[0])
}
// 如果该map状态为写状态,panic
if h.flags&hashWriting != 0 {
throw("concurrent map read and map write")
}
// 获取key的hash,并根据hash和桶掩码在桶数组找到对应的桶
hash := t.hasher(key, uintptr(h.hash0))
m := bucketMask(h.B)
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
// 如果正在扩容,判断要去新桶中找还是旧桶中找
if c := h.oldbuckets; c != nil {
// 如果是双倍扩容,在查找旧桶时,应当使用旧桶的掩码
if !h.sameSizeGrow() {
m >>= 1
}
// 根据hash和桶掩码在旧桶数组找到对应的桶
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
// 如果当前的桶还在旧桶中,还没被迁移,本次查找应当在旧桶中查找
// 这是根据桶的tophash[0]的状态位来判断的
if !evacuated(oldb) {
b = oldb
}
}
// 根据hash获取tophash
top := tophash(hash)
bucketloop:
// 依次遍历桶以及溢出桶
for ; b != nil; b = b.overflow(t) {
// 在桶内,遍历桶内的8个槽
for i := uintptr(0); i < bucketCnt; i++ {
// 如果槽与键的tophash不相同,判断槽的tophash状态位
// 如果是“后继空状态”就直接提前退出返回零值。
// 否则就继续遍历下一个槽。
if b.tophash[i] != top {
if b.tophash[i] == emptyRest {
break bucketloop
}
continue
}
// 如果槽与键的tophash相同,说明可能找到了
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
// 再判断查找的键与存储的键是否是一样的
// 不一样就继续遍历下一个槽。
// 一样的话,就取出值并返回。
if t.key.equal(key, k) {
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
if t.indirectelem() {
e = *((*unsafe.Pointer)(e))
}
return e
}
}
}
return unsafe.Pointer(&zeroVal[0])
}
从上面看,map读取数据主要有以下几个步骤:
- 判断map是否为空,空则直接返回零值
- 判断该map的状态,如果正在写,则panic
- 获取key的hash值,并根据掩码找到对应的桶
- 判断当前是否正在扩容
- 根据旧桶的掩码找到key在旧桶数组的桶位置
- 该旧桶位置是否已经被迁移
- 如果被迁移,接下来的查找在新桶中查找
- 如果没迁移,接下来的查找在旧桶中查找
- 依次遍历桶以及溢出桶来查找key
- 在桶内依次遍历8个槽
- 判断槽的tophash是否与key的tophash相同
- 不同就判断状态位是否是“后继空状态”。
- “后继空状态”说明这个key在以后的槽中也没有,这个key不存在,直接返回零值
- 不是“后继空状态”,就继续去下一个槽里找
- 相同的tophash就对比key是否一样
- 如果一样,返回对应的值
- 如果不一样,就继续去下一个槽里找
- 不同就判断状态位是否是“后继空状态”。
- 遍历完桶以及溢出桶还没找到key,说明不存在,返回零值
遍历桶及其溢出桶的部分在后续的插入、更新、删除、遍历等操作都会遇到,虽然不完全一致,但是基本的步骤是差不多的。
插入和更新数据
插入和更新map的汇编结果:https://go.godbolt.org/z/s7GPqKxx9
map中插入和更新数据核心函数在mapassign
实现,代码如下:
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
// 空map不可写,
if h == nil {
panic(plainError("assignment to entry in nil map"))
}
// 如果map正在被写,panic
if h.flags&hashWriting != 0 {
throw("concurrent map writes")
}
// 计算key的hash
hash := t.hasher(key, uintptr(h.hash0))
// 置map写状态
h.flags ^= hashWriting
// 如果桶数组为空,初始化桶数组
if h.buckets == nil {
h.buckets = newobject(t.bucket)
}
again:
// 根据key的hash值计算目标桶的位置
bucket := hash & bucketMask(h.B)
// 如果正在扩容,不但要把自己将要使用的桶的数据迁移掉
// 还需要再帮忙迁移一个桶的数据
if h.growing() {
growWork(t, h, bucket)
}
// 获取到目标桶的指针,如果在扩容的话,到这一步已经是迁移到新桶数组中了
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
// 计算tophash
top := tophash(hash)
var inserti *uint8 // 插入或修改目标的槽的tophash指针
var insertk unsafe.Pointer // 插入或修改目标的槽的key指针
var elem unsafe.Pointer // 插入或修改目标的槽的val指针
bucketloop: // 主要是查找map中是否已经存在该key了
for {
// 遍历桶中的8个槽
for i := uintptr(0); i < bucketCnt; i++ {
// 如果槽与key的tophash不等
if b.tophash[i] != top {
// 判断槽的tophash是否是空的
// 如果是空的,那就说明找到了目标的候选插入位置了
if isEmpty(b.tophash[i]) && inserti == nil {
inserti = &b.tophash[i]
insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
}
// 如果此槽的状态位为“后继空状态”,说明这个key在此之前没有插入过
// 刚刚的候选插入位置就是真实的插入位置,退出最外层的for循环
if b.tophash[i] == emptyRest {
break bucketloop
}
// 槽与key的tophash不相等,遍历下一个槽
continue
}
// 槽与key的tophash相同,获取到槽对应的key并进行对比
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
// 虽然tophash相同,但是key不同,继续遍历下一个槽
if !t.key.equal(key, k) {
continue
}
// 找到Key了,将新的val更新到该槽中,然后直接跳到收尾操作
if t.needkeyupdate() {
typedmemmove(t.key, k, key)
}
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
goto done
}
// 遍历下一个溢出桶
ovf := b.overflow(t)
if ovf == nil {
break
}
b = ovf
}
// 到这一步说明没有在map中找到key,要新增一个KV对
// 判断是否满足扩容条件
// 如果满足,先做好扩容准备,返回again再查一次
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again
}
// 到这一步说明,在map中既没找到key
// 在key对应的桶和溢出桶中也没找到空的槽
// 应当申请一个新的溢出桶
if inserti == nil {
// 申请一个新的溢出桶
// 将inserti,insertk,elem全部指向新的溢出桶的第一个槽
newb := h.newoverflow(t, b)
inserti = &newb.tophash[0]
insertk = add(unsafe.Pointer(newb), dataOffset)
elem = add(insertk, bucketCnt*uintptr(t.keysize))
}
// 在insertk,elem中存下新的KV对
if t.indirectkey() {
kmem := newobject(t.key)
*(*unsafe.Pointer)(insertk) = kmem
insertk = kmem
}
if t.indirectelem() {
vmem := newobject(t.elem)
*(*unsafe.Pointer)(elem) = vmem
}
typedmemmove(t.key, insertk, key)
// 将inserti的值更新为key的tophash
*inserti = top
// 字典元素数量++
h.count++
done: // 此时元素已经插入或更新完毕,收尾工作
// 此时又相当于乐观锁
// 再次判断map是否正在被写入,预期是正在被写入
// 如果不是正在写入,说明并发写了,panic
if h.flags&hashWriting == 0 {
throw("concurrent map writes")
}
// 清map的写状态
h.flags &^= hashWriting
if t.indirectelem() {
elem = *((*unsafe.Pointer)(elem))
}
return elem
}
从上面看,map插入或更新数据主要有以下几个步骤:
- 判断是否是空map或是否正在被并发写,如果是,panic
- 计算key的hash,置map为写状态
- 如果桶数组为空,初始化桶数组
- 查找程序:
- 根据key的hash计算出目标桶数组的位置
- 如果正在扩容,需要帮忙扩容至少一个桶以及它的溢出桶的数据
- 首先判断自己需要的这个桶是否未迁移,如果未迁移,就迁移自己需要的这个桶。
- 然后再另外多迁移一个桶。
- 获取目标桶的指针,计算出key的tophash,开始查找key程序
- 查找key程序:
- 依次遍历桶以及它的溢出桶。
- 对每个桶遍历它的8个槽。
- 判断槽与key的tophash
- 不等且为空,如果之前没记过候选位,此时记下候选位。
- 不等且为“后继空状态”,说明在此之前没插入过该key,执行插入key程序。
- 不等且不为空,说明当前槽的tophash不符合,继续下一个槽。
- 相等,判断槽的key与key是否相等。
- 不等,继续下一个槽
- 相等,说明找到了目标key的位置,替换key对应的val,执行收尾程序。
- 插入key程序:
- 找不到key的位置,说明这是一个新key
- 判断是否满足扩容条件
- 如果扩容,返回查找程序重新查找。
- 判断候选位是否为空
- 为空,说明溢出桶中没有空余位置了,需要新建空余位置
- 不为空,说明溢出桶中有空余位置,候选位就是空余位置
- 将新的KV对插入候选位置
- 更新候选位置的槽的tophash以及字典元素数量
- 收尾程序:
- 判断map的并发写状态。
- 清map的写状态。
疑点:
map的mapassign
操作主要可以概括如下,但是下面的代码真的可以防并发吗?
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
// 如果map正在被写,panic
if h.flags&hashWriting != 0 {
throw("concurrent map writes")
}
// 置map写状态
h.flags ^= hashWriting
// 插入、更新、扩容等操作
// 此时又相当于乐观锁
// 再次判断map是否正在被写入,预期是正在被写入
// 如果不是正在写入,说明并发写了,panic
if h.flags&hashWriting == 0 {
throw("concurrent map writes")
}
// 清map的写状态
h.flags &^= hashWriting
return elem
}
扩容操作
go的map扩容操作使用了渐进式扩容,将键的rehash迁移操作分配到每一次插入、更新、删除操作中,每次操作都会先迁移至少一个桶的数据再进行实际的CUD操作。
扩容的条件是:
- map里的数据数量太多,超过了负载因子,此时是两倍扩容。
func overLoadFactor(count int, B uint8) bool {
return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}
- map里的溢出桶数量太多,超出了桶数组的长度,此时是等量扩容。
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
if B > 15 {
B = 15
}
return noverflow >= uint16(1)<<(B&15)
}
两倍扩容是为了让map存储更多的数据,等量扩容是为了让map占的内存尽可能地少,数据在内存中紧凑起来,同时也为了提高操作效率。
开始扩容
开始扩容功能是在hashGrow
中实现,hashGrow
中主要是申请了新的桶数组、初始化了迁移的进度,并将旧的桶数组和溢出桶数组都放到old字段中,并没有真实地对桶数组进行迁移,具体的迁移过程将在帮助扩容的growWork
中进行渐进式地迁移。
func hashGrow(t *maptype, h *hmap) {
// 判断是否是等量扩容
// 如果loadFactor不达标准,就是等量扩容
// 如果是等量扩容,置map等量扩容位
bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) {
bigger = 0
h.flags |= sameSizeGrow
}
// 申请一个更大的新的桶数组
oldbuckets := h.buckets
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)
// 如果map处于遍历状态
// 应当把所有的遍历标志改为旧数组遍历位
flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
flags |= oldIterator
}
// 更新map中的一些字段
// 旧桶数组放到oldbuckets字段中
// 新桶数组放到buckets字段中
h.B += bigger
h.flags = flags
h.oldbuckets = oldbuckets
h.buckets = newbuckets
h.nevacuate = 0
h.noverflow = 0
if h.extra != nil && h.extra.overflow != nil {
// 溢出桶也得放到oldoverflow中
if h.extra.oldoverflow != nil {
throw("oldoverflow is not nil")
}
h.extra.oldoverflow = h.extra.overflow
h.extra.overflow = nil
}
if nextOverflow != nil {
if h.extra == nil {
h.extra = new(mapextra)
}
h.extra.nextOverflow = nextOverflow
}
}
帮助扩容
growWork
中实现了渐进式地迁移,迁移过程是以桶为单位进行迁移的,是map在扩容迁移时的主要操作,其主要操作放在evacuate
中实现。
func growWork(t *maptype, h *hmap, bucket uintptr) {
// 先迁移输入的桶
evacuate(t, h, bucket&h.oldbucketmask())
// 再帮忙迁移一个桶再走
if h.growing() {
evacuate(t, h, h.nevacuate)
}
}
在介绍迁移函数evacuate
之前,先介绍一下迁移结构evacDst
type evacDst struct {
b *bmap // 迁移的目标桶
i int // 迁移的目标槽下标
k unsafe.Pointer // 待迁移的Key指针
e unsafe.Pointer // 待迁移的Val指针
}
另外,由于扩容时有等量扩容和双倍扩容两种:
- 一般来说,等量扩容的Key在重新哈希之后的目标桶位置还是在原先的位置上的,而双倍扩容的Key在重新哈希之后的的目标桶位置只有两个可能:原位置(新桶数组的前一半)和原位置+原容量的偏移(新桶数组的后一半)。
- 比如一个key的哈希值是
100101110101
,如果旧桶的B值是4,那么这个key应当落在0101
也就是3号桶中,而扩容后,桶的B值是5,这个key应当落在10101
,也就是3+16=21号桶中。
- 比如一个key的哈希值是
- 不一般的情况是,Key的每次哈希都不一样,比如
math.NaN()
。
对于上述的一般情况而言,如果Key在重新哈希之后落在原位置,那么迁移后的tophash的状态位就置为*evacuatedX
,如果Key在重新哈希之后落在原位置+原容量的偏移的位置上,那么tophash就置为evacuatedY
。*
对于上述的不一般情况而言,则根据原位置的tophash的最后1bit来决定放到新桶数组的前一半还是后一半。
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
// 获取旧桶的指针、旧桶的大小
b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
newbit := h.noldbuckets()
// 判断旧桶是否已经被迁移了
if !evacuated(b) {
// x为新数组的前一半,y为新数组的后一半
// 此处初始化x的字段
var xy [2]evacDst
x := &xy[0]
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
x.k = add(unsafe.Pointer(x.b), dataOffset)
x.e = add(x.k, bucketCnt*uintptr(t.keysize))
if !h.sameSizeGrow() {
// 如果是两倍扩容,初始化y的字段
y := &xy[1]
y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
y.k = add(unsafe.Pointer(y.b), dataOffset)
y.e = add(y.k, bucketCnt*uintptr(t.keysize))
}
// 开始迁移旧桶以及旧的溢出桶
for ; b != nil; b = b.overflow(t) {
k := add(unsafe.Pointer(b), dataOffset)
e := add(k, bucketCnt*uintptr(t.keysize))
// 开始迁移旧桶内的旧槽
for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
// 获取槽的tophash
top := b.tophash[i]
// 判断是否为空,空则标记这个槽被迁移完成,开始迁移下一个槽
if isEmpty(top) {
b.tophash[i] = evacuatedEmpty
continue
}
if top < minTopHash {
throw("bad map state")
}
// 获取Key
k2 := k
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
// useY用于判断Key被迁移到新桶数组的前一半还是后一半
var useY uint8
if !h.sameSizeGrow() {
// 如果是两倍扩容,根据Key的hash来判断
hash := t.hasher(k2, uintptr(h.hash0))
if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
// 对于那种每次哈希结果不一样的Key,比如math.NaN()
// 用tophash的最后1bit来决定useY的值
useY = top & 1
top = tophash(hash)
} else {
// 对于每次哈希结果一样的正常Key,以hash的结果来决定useY的值
if hash&newbit != 0 {
useY = 1
}
}
}
// 对旧桶进行标记,表示迁移到新桶中
b.tophash[i] = evacuatedX + useY
// 获取最终的目标新桶
dst := &xy[useY]
// 如果目标新桶的目标槽下标等于8
// 新建一个新的溢出桶
if dst.i == bucketCnt {
dst.b = h.newoverflow(t, dst.b)
dst.i = 0
dst.k = add(unsafe.Pointer(dst.b), dataOffset)
dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
}
// 将新的tophash写入目标桶的目标槽中
dst.b.tophash[dst.i&(bucketCnt-1)] = top
// 将KV写入目标桶的目标槽中
if t.indirectkey() {
*(*unsafe.Pointer)(dst.k) = k2
} else {
typedmemmove(t.key, dst.k, k)
}
if t.indirectelem() {
*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
} else {
typedmemmove(t.elem, dst.e, e)
}
// 将目标槽的下标++,下次插入的就是下一个槽位了
dst.i++
dst.k = add(dst.k, uintptr(t.keysize))
dst.e = add(dst.e, uintptr(t.elemsize))
}
}
// 如果没有其他协程在读取旧的桶数组
// 此处要清空溢出桶的数据,方便GC
if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
ptr := add(b, dataOffset)
n := uintptr(t.bucketsize) - dataOffset
memclrHasPointers(ptr, n)
}
}
// 如果要迁移的旧桶正好是下一个要迁移的桶
// 继续往后遍历找下一个待迁移的桶
if oldbucket == h.nevacuate {
advanceEvacuationMark(h, t, newbit)
}
}
// 当前桶已被迁移,继续往后找下一个待迁移的桶,newbit是旧桶数组的大小
func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
// 待迁移下标++
h.nevacuate++
// 一次性查接下来1024个桶,看看是否有下一个待迁移的桶
// 为啥是1024个桶呢?见代码后的解释。
stop := h.nevacuate + 1024
if stop > newbit {
stop = newbit
}
for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
h.nevacuate++
}
// 判断是否迁移完成
if h.nevacuate == newbit {
// 迁移完成,释放旧桶数组和旧溢出桶数组
// 即使有迭代器在迭代旧的桶数组和旧的溢出桶数组
// 等它们迭代完成了,GC才会去清理那些旧的内存
h.oldbuckets = nil
if h.extra != nil {
h.extra.oldoverflow = nil
}
// 清掉map的等量扩容位
h.flags &^= sameSizeGrow
}
}
为啥一次性查1024个桶?这个数字是经过以下两个方面的trade-off的:
- evacuate是在每次调用插入、更新、删除操作时的渐进式迁移函数。如果这个数字太大,会导致每次操作的时间太长,不符合渐进式的原则。
- 如果太小,迁移的速度又太慢。经过测试,1024这个数字既能保证插入、更新、删除操作时间与map总数的长度无关(即复杂度为O(1)),也能保证迁移速度不会太慢。
整理一下渐进式迁移的过程:
- 根据待迁移旧桶下标获取待迁移的旧桶的指针
- 获取旧桶的长度
- 如果旧桶没被迁移过,迁移这个桶
- 初始化迁移结构体
- 如果是等量扩容,只初始化一个迁移结构体x即可
- 如果是两倍扩容,初始化两个迁移结构体,一个表示迁移至目标桶数组的前一半x,一个表示后一半y
- 遍历旧桶数组及其溢出桶数组,遍历桶中的每个槽
- 首先获取槽的tophash,如果是空,标记为*
evacuatedEmpty
*,遍历下一个槽 - 根据是否是等量扩容、是否是正常Key、Key的hash来决定将Key放到目标桶数组的哪个位置。
- 将槽中的数据迁移到新的槽中,如果新槽满了,新建一个溢出桶。
- 如果没有迭代器在迭代旧的桶数组,回收旧桶数组的空间
- 旧桶迁移完成
- 初始化迁移结构体
- 如果旧桶迁移完成了并且正好是下一个待迁移的桶的下标,继续往后寻找下一个待迁移的桶的下标。
- 如果桶迁移完成,清除map的旧数据指针,清除map的等量扩容状态。
删除操作
删除单个KV
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
// 如果是空map,直接返回
if h == nil || h.count == 0 {
if t.hashMightPanic() {
t.hasher(key, 0) // see issue 23734
}
return
}
// 判断map并发写
if h.flags&hashWriting != 0 {
throw("concurrent map writes")
}
hash := t.hasher(key, uintptr(h.hash0))
// 置map写标志
h.flags ^= hashWriting
// 与插入操作一样,帮助扩容、获取桶指针、计算tophash
bucket := hash & bucketMask(h.B)
if h.growing() {
growWork(t, h, bucket)
}
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
bOrig := b
top := tophash(hash)
search: // 一系列寻找Key的操作
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
if b.tophash[i] == emptyRest {
break search
}
continue
}
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
k2 := k
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
if !t.key.equal(key, k2) {
continue
}
// 找到了,删掉该KV对,回收内存
if t.indirectkey() {
*(*unsafe.Pointer)(k) = nil
} else if t.key.ptrdata != 0 {
memclrHasPointers(k, t.key.size)
}
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
if t.indirectelem() {
*(*unsafe.Pointer)(e) = nil
} else if t.elem.ptrdata != 0 {
memclrHasPointers(e, t.elem.size)
} else {
memclrNoHeapPointers(e, t.elem.size)
}
// 置当前槽的tophash为“当前空状态”
b.tophash[i] = emptyOne
// 根据下一个槽的状态判断自己是否要由“当前空状态”转为“后继空状态”
// 下一个槽也许是下一个桶的第一个槽,也许是当前桶的下一个槽
if i == bucketCnt-1 {
if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
goto notLast
}
} else {
if b.tophash[i+1] != emptyRest {
goto notLast
}
}
// 现在确定了下一个槽是后继空状态
// 因为当前槽也是空的,所以要一直向前面的槽进行回溯
// 找到该桶中的第一个后继空状态的槽
for {
b.tophash[i] = emptyRest
if i == 0 {
if b == bOrig {
// 直到第一个桶的第一个槽,回溯终止
break
}
// 想获取上一个槽,如果上一个槽是上一个桶的最后一个槽
// 但是溢出桶都是以单链表连接的,
// 所以只能从第一个桶遍历到上一个桶,再获取上一个槽
c := b
for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
}
i = bucketCnt - 1
} else {
// 上一个槽就是本桶里的上一个槽
i--
}
// 直到找到了一个槽不是“当前空的状态”,终止。
if b.tophash[i] != emptyOne {
break
}
}
notLast:
// map的KV数量--
h.count--
// 如果map被清空了,为了保证安全性,重置hash种子
if h.count == 0 {
h.hash0 = fastrand()
}
break search
}
}
// 判断是否并发写
if h.flags&hashWriting == 0 {
throw("concurrent map writes")
}
// 清除map写状态
h.flags &^= hashWriting
}
所以map删除操作虽然代码挺长的,但是逻辑异常简单。
- 空map判断、map并发写判断、置map写状态
- 计算key的hash获取目标桶指针,计算tophash
- 这里还有个细节:用另一个变量bOrig保存了首个目标桶的指针,后面有用。
- 开始寻找KV,与之前差不多,根据tophash的状态位帮助查找提速。
- 找到对应的槽,删除这个KV,并将当前状态置为“当前空状态”。
- 判断这个槽的下一个槽是否是“后继空状态”
- 如果是,将当前槽也改为“后继空状态”,考虑到当前槽前面连续的槽可能也有很多“当前空状态”,所以进行回溯,回溯的目的是将“后继空状态”尽可能地向前面的槽推进。标志是从后往前找,找到第一个不是“当前空状态”的槽。因为从后往前找的方向和溢出桶单链表的方向相反,所以要找到上一个桶就必须要依靠bOrig变量来遍历获取上一个桶。
- 如果不是,往下走。
- map的数量-1
- 如果map数量归零了,为了保证安全性,重置hash种子。
- 验证是否并发写、清除map写状态。
清空整个map
清空map的编译结果:https://go.godbolt.org/z/83a3a3ffj
当在代码中遍历所有KV并删除时,go编译器就会将其优化为清空map:mapclear
函数。
func main() { // go tool compile -S -l -N main.go
m := make(map[int]int)
m[2] = 2
for k := range m {
delete(m, k)
}
}
mapclear
函数比删除单个KV简单多了,直接一把梭清空就完事。
func mapclear(t *maptype, h *hmap) {
// 空map直接返回
if h == nil || h.count == 0 {
return
}
// 判断并发写
if h.flags&hashWriting != 0 {
throw("concurrent map writes")
}
// 置map写状态
h.flags ^= hashWriting
// 置map等量扩容状态
h.flags &^= sameSizeGrow
// 清空map字段
h.oldbuckets = nil
h.nevacuate = 0
h.noverflow = 0
h.count = 0
// 为了安全性,重置hash种子
h.hash0 = fastrand()
// 清空extra信息
if h.extra != nil {
*h.extra = mapextra{}
}
// 重置桶数组和溢出数组
_, nextOverflow := makeBucketArray(t, h.B, h.buckets)
if nextOverflow != nil {
h.extra.nextOverflow = nextOverflow
}
// 并发写判断
if h.flags&hashWriting == 0 {
throw("concurrent map writes")
}
// 清除map写状态
h.flags &^= hashWriting
}
遍历操作
go的map最让人值得注意的地方就是,每次go的遍历顺序都是不一样的。这是因为go在每次开始遍历前,就随机初始化出两个变量,一个用于表示遍历的起点的桶下标,一个表示桶内遍历的起点槽下标。
在开始了解map的遍历操作之前,先了解遍历器,或者也叫迭代器。
// map遍历器的结构
type hiter struct {
key unsafe.Pointer // 必须放在第一个字段,nil时表示遍历结束
elem unsafe.Pointer // 必须放在第二个字段
t *maptype
h *hmap
buckets unsafe.Pointer // 桶数组指针,指向在hiter初始化成功时的桶数组
bptr *bmap // 当前桶
overflow *[]*bmap // hmap.buckets的溢出桶指针
oldoverflow *[]*bmap // hmap.oldbuckets的溢出桶指针
startBucket uintptr // 开始遍历时,桶数组的下标
offset uint8 // 开始遍历时,某个桶的桶内槽偏移
wrapped bool // 用于判断是否遍历过一圈了,true表示遍历完了
B uint8
i uint8
bucket uintptr
checkBucket uintptr
}
初始化遍历器
在遍历时,可能会发生map的扩容,有以下几个可能:
- 先遍历:遍历后发生的扩容,go的map不关心这种情况,go只遍历开始遍历时的那个桶数组,至于它是否变成oldbuckets,不影响遍历的结果
- 先扩容:遍历前就扩容了,这是最关键的问题,接下来的代码凡是涉及到扩容的地方,都是这种情况,go会继续优先遍历旧的数组,因为此时旧数组还在迁移中,所以为了防止违反“不重不漏”的原则,go做了很多操作。具体看代码。
func mapiterinit(t *maptype, h *hmap, it *hiter) {
// 空map判断
if h == nil || h.count == 0 {
return
}
it.t = t
it.h = h
// 保存map的B值和map的当前桶指针
// 这样避免在扩容时改变了map的B值和桶指针
it.B = h.B
it.buckets = h.buckets
if t.bucket.ptrdata == 0 {
// 如果该map是内联模式,溢出桶的指针是保存在extra里的
h.createOverflow()
it.overflow = h.extra.overflow
it.oldoverflow = h.extra.oldoverflow
}
// 随机找某个桶的某个槽开始
r := uintptr(fastrand())
if h.B > 31-bucketCntBits {
r += uintptr(fastrand()) << 31
}
// 记录下开始时桶的下标和槽的下标
it.startBucket = r & bucketMask(h.B)
it.offset = uint8(r >> h.B & (bucketCnt - 1))
// 记录下开始的桶的指针
it.bucket = it.startBucket
// 如果map不在迭代状态,置map迭代状态
if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {
atomic.Or8(&h.flags, iterator|oldIterator)
}
// 执行一个迭代
mapiternext(it)
}
- 空map判断
- 遍历开始时要保存这一时刻的map的数据快照
- 随机找一个桶下标开始遍历
- 随机找一个槽下标,每个桶都要从这个随机的槽下标开始遍历
- 置map为遍历状态。
- 执行第一次遍历
单次遍历
执行一次遍历用的是mapiternext
函数。
func mapiternext(it *hiter) {
h := it.h
// 判断并发写
if h.flags&hashWriting != 0 {
throw("concurrent map iteration and map write")
}
// 获取上次迭代进度
t := it.t
bucket := it.bucket
b := it.bptr
i := it.i
checkBucket := it.checkBucket
next: // 开始一个迭代
if b == nil { // 说明还未开始遍历或上一个桶遍历完了
// 如果回到了迭代的起点,说明迭代完成了,返回
if bucket == it.startBucket && it.wrapped {
it.key = nil
it.elem = nil
return
}
// 如果迭代在扩容后开始,并且还没迭代完
if h.growing() && it.B == h.B {
oldbucket := bucket & it.h.oldbucketmask()
b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
// 如果将要迭代的数据还在旧桶数组中且没有迁移,只能去迭代旧桶
// 那么本次迭代应当只迭代旧的桶中的一部分数据
// 这部分数据的特征是:在迁移后的桶的下标仍然在这个桶下标
// 所以需要对这种桶进行check
if !evacuated(b) {
checkBucket = bucket
} else {
b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
checkBucket = noCheck
}
} else {
b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
checkBucket = noCheck
}
bucket++
// 遍历到最后一个桶,要绕回0桶继续遍历
if bucket == bucketShift(it.B) {
bucket = 0
it.wrapped = true
}
// 槽的下标归零
i = 0
}
// 开始遍历桶中的槽
for ; i < bucketCnt; i++ {
// 每个桶都是从it.offset槽开始遍历的
offi := (i + it.offset) & (bucketCnt - 1)
if isEmpty(b.tophash[offi]) || b.tophash[offi] == evacuatedEmpty {
// 如果槽为空,或已被迁移,继续遍历下一个槽
// 此处没有用“后继空状态”来提前中断遍历,思考一下为什么?答案看下面的注释
// TODO: emptyRest is hard to use here, as we start iterating
// in the middle of a bucket. It's feasible, just tricky.
continue
}
// 读取KV的值
k := add(unsafe.Pointer(b), dataOffset+uintptr(offi)*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+uintptr(offi)*uintptr(t.elemsize))
if checkBucket != noCheck && !h.sameSizeGrow() {
// 此处就用到了上文中的checkBucket
// 对于在双倍扩容中的map,且当前桶还没有被迁移的
// 只遍历那些在扩容后还落在当前桶的那些数据
// 而判断扩容后落在哪个桶的方法,在帮助扩容那一节已经说明了,此处不再重复
if t.reflexivekey() || t.key.equal(k, k) {
hash := t.hasher(k, uintptr(h.hash0))
if hash&bucketMask(it.B) != checkBucket {
continue
}
} else {
if checkBucket>>(it.B-1) != uintptr(b.tophash[offi]&1) {
continue
}
}
}
if (b.tophash[offi] != evacuatedX && b.tophash[offi] != evacuatedY) ||
!(t.reflexivekey() || t.key.equal(k, k)) {
// 这部分数据是那些迁移后还在这个桶下标的数据
// 或者是每次哈希值都不一样的key,可以直接返回数据
it.key = k
if t.indirectelem() {
e = *((*unsafe.Pointer)(e))
}
it.elem = e
} else {
// 到这一步说明该数据已经被迁移走或被删除了
// 使用mapaccessK去找回这部分的数据
rk, re := mapaccessK(t, h, k)
if rk == nil {
continue // 找不到说明该数据已被删除
}
it.key = rk
it.elem = re
}
// 记住本次迭代的进度,并返回
it.bucket = bucket
if it.bptr != b {
it.bptr = b
}
it.i = i + 1
it.checkBucket = checkBucket
return
}
// 继续迭代下一个溢出桶
b = b.overflow(t)
i = 0
goto next
}
- 判断并发写,获取上次遍历进度
- 开始一次遍历
- 如果待遍历桶是nil,说明还未开始遍历,或者最后一个溢出桶也遍历完了
- 如果全部遍历完成,直接退出
- 如果是在扩容,且当前待遍历的桶的数据还在旧桶中没有被迁移,那就只能去遍历旧桶,而且还只能遍历那些扩容后仍然当前桶的数据。所以在这里需要设置一个checkBucket变量来保存当前桶的下标。
- 如果不在扩容,或者当前遍历的桶的数据已经被迁移完成,checkBucket设置为-1。
- 桶下标++,如果遍历到最后一个桶,绕回来遍历第一个桶,槽下标归零。
- 接下来对待遍历桶的8个槽进行遍历
-
如果槽为空或已被迁移,遍历下一个槽。因为每个桶都是随机地从中间开始遍历,如果遇到“后继空状态”就退出桶遍历的话,前面哪些有数据的槽就无法被遍历到了,所以此处没有使用“后继空状态”来提效,毕竟也就8个槽,但是代码注释中提到,还是有办法利用这个状态的,我的想法是:如果遇到“后继空状态”,就将循环变量直接加到最后一个槽。
-
// 原代码 for ; i < bucketCnt; i++ { offi := (i + it.offset) & (bucketCnt - 1) if isEmpty(b.tophash[offi]) || b.tophash[offi] == evacuatedEmpty { continue } // 其他代码 } // 新代码 for ; i < bucketCnt; i++ { offi := (i + it.offset) & (bucketCnt - 1) if b.tophash[offi] == emptyRest { i = bucketCnt - 1 - it.offset continue } if b.tophash[offi] == emptyOne || b.tophash[offi] == evacuatedEmpty { continue } // 其他代码 }
-
读取KV的值。
-
如果checkBucket不为-1,且不是等量扩容的话,过滤掉那些扩容后桶下标不是当前桶的,过滤的算法与扩容时的算法一致。
-
对于未迁移的数据或者每次哈希值都不一样的key,直接返回数据
-
对于已迁移的数据或被删除的数据,尽力去找回,找不到就继续遍历下一个,找到了就返回数据
-
记住本次迭代的进度,并返回
-
如果迭代了桶里全部的数据,槽下标归零,继续迭代下一个溢出桶。
-
根据某个未迁移的槽在扩容后是否还在这个桶中来判断本次遍历是否输出它,这样保证了遍历的不重不漏。
溢出桶
插入操作中,可能会新建一个溢出桶,先看看初始化时的这张图:
奇怪的step3,为什么step3中最后一个溢出桶的next要指向头部?这个指针后面用在何处?下面这个函数newoverflow
是新建一个溢出桶。
func (h *hmap) newoverflow(t *maptype, b *bmap) *bmap {
var ovf *bmap
if h.extra != nil && h.extra.nextOverflow != nil {
// 如果初始化预建立的溢出桶还有的话,拿一个出来。
ovf = h.extra.nextOverflow
if ovf.overflow(t) == nil {
// 预建立的溢出桶剩下不止一个,直接拿吧。
h.extra.nextOverflow = (*bmap)(add(unsafe.Pointer(ovf), uintptr(t.bucketsize)))
} else {
// 还记得初始化时那个奇怪的step3吗?
// step3: 将最后一个预建立的溢出桶的next指针指向头部
// 其实指谁都可以,只要不是nil就行
// 此处就根据“next不是nil”这个来判断:这是预建立的溢出桶的最后一个了!
// 预建立的溢出桶的最后一个,将其next指针重新设置为nil
// 同时将预建立的溢出桶也设为nil
ovf.setoverflow(t, nil)
h.extra.nextOverflow = nil
}
} else {
// 初始化预建立的溢出桶没了,此时只能自己新建一个了
ovf = (*bmap)(newobject(t.bucket))
}
// 增加一个溢出桶计数
h.incrnoverflow()
if t.bucket.ptrdata == 0 {
h.createOverflow()
*h.extra.overflow = append(*h.extra.overflow, ovf)
}
// 设置溢出桶
b.setoverflow(t, ovf)
return ovf
}
主要逻辑是:
- 如果预建立的溢出桶还有,就直接拿一个来用。
- 如果预建立的溢出桶没有了,就直接新建一个溢出桶,并新增溢出桶的计数。
增加一个溢出桶计数的incrnoverflow
函数也很有意思,注释说,溢出桶计数字段noverflow
是对map中溢出桶数量的大概估计:
- 当桶数量比较小时,是精确数量
- 当桶数量比较大时,是大概估计
func (h *hmap) incrnoverflow() {
// 当map的B值小于16时,是精确数量
if h.B < 16 {
h.noverflow++
return
}
// 当map的B值大于等于15时,使用概率来估计
mask := uint32(1)<<(h.B-15) - 1
// 举例:如果B值为18时,则mask == 7,此时noverflow加1的概率是1/8
// 也就是期望是增加8个溢出桶,才增加1个noverflow计数
if fastrand()&mask == 0 {
h.noverflow++
}
}
与判断溢出桶数量的函数放在一起看,更能体会其设计的精妙。
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
if B > 15 {
B = 15
}
return noverflow >= uint16(1)<<(B&15)
}
分类讨论:
当B<16时,noverflow是精确数量,此时满足noverflow>=桶数组大小
时就会触发等量扩容。
当B>=16时,noverflow会以概率性增长,假设B==18,数值期望上,每增加8个桶,noverflow才会增加一个,此时满足noverflow>=2^15
时,8 * noverflow ≈ 溢出桶数量
,此时也能大致满足溢出桶数量>=2^18
时才触发等量扩容。
extra字段
参考:
- 曹大带我学 Go之从 Map 的 Extra 字段谈起-51CTO.COM
- runtime: Large maps cause significant GC pauses · Issue #9477 · golang/go
- go-review.googlesource.com
内联模式
常量中有以下两个,与内联模式有关,当map存储的key和val都不包含指针,且都不大于128B,此时map就采用内联模式,也就是桶里8个KV槽存储的不是指向KV的指针,而是KV本身的值。
// KV对内联模式的阈值为128Byte,
// 内联模式是指KV对直接存储在桶里,而不使用指针指向真实值
maxKeySize = 128
maxElemSize = 128
同时,桶的结构也有所变化,之所以桶的keys、values、overflow字段要到编译时才能确定,就是因为在没有编译时,编译器不知道是否应该采取内联模式。如果采取了内联模式,桶的overflow字段就不再是一个指针,而是一个uintptr
类型,这个类型是一个uint,但是具有指针的长度。用这样的方式,就使得在内联模式时,map的所有桶是不包含指针的。
// bucket的结构为bmap
// 表面是以下的情况
type bmap struct {
tophash [8]uint8 // 每个槽对应的tophash值
}
// 以下才是bucket编译时确定的结构
type bmap struct {
tophash [8]uint8 // 存储hash值的前几位,如果小于5,则表示上述的tophash状态码
// 这里KV对是分别存储的,虽然更复杂一些,但是这方便了内存的对齐
keys [8]keytype // key数组,隐藏字段
values [8]valuetype // value数组,隐藏字段
overflow uintptr // 溢出buceket指针,隐藏字段
}
为什么要大费周章去掉桶里的指针?因为当桶里包含指针时,GC程序就必须依次遍历map中每个桶,去给每个桶的每个指针的对象染色,这在map很大的时候是很耗时的操作。如果桶中没有指针了,GC程序就不需要遍历每个桶了。上面参考链接中的Issue就是讨论这个问题。
现在还有个问题:没有下一个溢出桶指针了,如何找到下一个溢出桶?
很简单,overflow
是uintptr
类型,里面存储的是溢出桶的地址,虽然不是指针,但是也可以根据这个地址找到溢出桶的位置,然后把它揪出来。
// bmap.overflow用于获取bmap的溢出桶
func (b *bmap) overflow(t *maptype) *bmap {
return *(**bmap)(add(unsafe.Pointer(b), uintptr(t.bucketsize)-sys.PtrSize))
}
细心的人肯定会问出另一个问题:现在overflow
是uintptr
类型了,所有的溢出桶都没有指针指向它们了,难道它们不会被GC吗?会的。所以,该extra上场了。
extra字段
extra中之前一直忽略的两个字段overflow
和oldoverflow
,当map为内联模式时,为了防止没有被指向的溢出桶被GC,就由这两个字段来收集那些溢出桶。这样每个溢出桶都会被收集在这两个字段中,也就不会被GC了。
// mapextra 是hmap的可选字段
type mapextra struct {
// 如果KV对都不包含指针,并且都是内联模式的(小于128B),
// 那桶中(也就是上述的bmap实际结构)的overflow字段就不是指针,这就避免了GC的扫描。
// 但是桶中的overflow指针还是需要的,这就放在下面这两个字段中存储overflow指针。
// 也就是当实际的bmap结构中不存在overflow指针指针时,才用下面这两个指针。
// overflow存储 hmap.buckets 的溢出桶
// oldoverflow存储 hmap.oldbuckets 的溢出桶
overflow *[]*bmap
oldoverflow *[]*bmap
// nextOverflow 存储指向空溢出桶的指针
nextOverflow *bmap
}