写在文章开头
近期对go语言的map进行深入了解和探究,其中关于map解决大量冲突的扩容操作设计的十分巧妙,所以笔者特地整理了这篇文章来探讨问题。

hmap扩容详解
为什么需要扩容
go语言中map是由无数个bucket构成,假设某个哈希对应的bucket空间已满,则需要创建一个新的bmap存储键值对,无数个bmap通过overflow指针进行关联。 以下图为例,假设我们需要查找key-111元素,就需要经过以下几个步骤:
- 通过哈希运算定位到
bucket[1]。 - 基于哈希值高位得到一个值
tophash。 - 最终遍历当前
bucket[1]中的所有数组,终于在第二个溢出桶找到key-111。

很明显,如果极端情况下因为有限的桶导致大量的冲突就很可能使map元素定位的时间复杂度退化为o(n),所以我们需要重新计算哈希值以及对桶进行扩容,从而解决极端的哈希冲突场景。
hmap扩容过程
默认情况下,map的进行扩容需要符合以下两大条件之一:
- 未处于扩容且键值对数超过bucket数以及当前负载系统超过6.5。
- 未发生扩容且溢出桶数量大于bucket数。
假设我们当前hmap如下,所有key的算法都是通过哈希值的低3位和b(011)进行取模运算,因为符合上述某个条件之一触发了,需要进行扩容。

一般情况扩容都是以原有bucket数*2,所以新的bucket数组长度为16,创建新的数组空间后,hmap的bucket指针该指向这个新的数组,而原有bucket则交由oldbucket管理。 也因为数组长度有8变为16,所以记录底数的变量b也由原来的3变为4(2^3变为2^4),因为原有的数组还有两个空闲的溢出桶,所以新的数组也会创建同等数量的溢出交由管理空闲一处的指针extra.nextoverflow管理。

自此我们完成了扩容操作,go语言中的map为了避免扩容后大量迁移导致的计算耗时,对于旧有的bucket元素采用渐进式再哈希的方式进行迁移。 所以后续我们在进行相同的写入操作时,若发现这个桶正处于扩容状态,那么它就会计算当前桶中每个元素的新位置,然后一次性进行驱逐。
以下图为例,假设此时我们要修改key-111的键值对的值,在进行修改的过程中,map就会通过哈希定位到旧的bucket的key-111的值,然后进行修改,完成后基于全新的哈希算法(由%3改为%4)

源码印证
我们现在通过一段代码调试来了解一下go语言中的map时机的工作流程:
func main() {
m := make(map[int]string)
for i := 0; i < 9; i++ {
m[i] = "xiaoming"
}
}
上述这段代码经过编译之后,实际调用的是mapassign_fast64这个方法:
0x0092 00146 (f:\github\awesomeproject\main.go:10) call runtime.mapassign_fast64(sb)
这里笔者就贴出这段代码的核心片段,从核心代码可以看出当map符合进行扩容的条件时会调用hashgrow进行扩容,再通过goto again到again对应代码段进行驱逐操作:
func mapassign_fast64(t *maptype, h *hmap, key uint64) unsafe.pointer {
// ......
again:
bucket := hash & bucketmask(h.b)
if h.growing() {
growwork_fast64(t, h, bucket)
}
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
var insertb *bmap
var inserti uintptr
var insertk unsafe.pointer
// ......
//如果map未进行扩容,且当前map负载超过最大值(默认6.5)或者溢出桶数量超过了bucket数量则进行扩容
if !h.growing() && (overloadfactor(h.count+1, h.b) || toomanyoverflowbuckets(h.noverflow, h.b)) {
hashgrow(t, h)
goto again // growing the table invalidates everything, so try again
}
// ......
}
hashgrow的扩容逻辑和上图的流程差不多,读者可以基于笔者给出的核心注释进行了解:
func hashgrow(t *maptype, h *hmap) {
//默认b的值是+1
bigger := uint8(1)
//oldbuckets 记录现在的bucket
oldbuckets := h.buckets
//创建一个新的bucket和溢出桶
newbuckets, nextoverflow := makebucketarray(t, h.b+bigger, nil)
//更新b、oldbuckets 指针指向原有bucket、buckets 指向新创建的bucket
h.b += bigger
h.flags = flags
h.oldbuckets = oldbuckets
h.buckets = newbuckets
h.nevacuate = 0
h.noverflow = 0
//如果原有的bucket还有空闲溢出桶,则记录到h.extra.oldoverflow指针上
if h.extra != nil && h.extra.overflow != nil {
// promote current overflow buckets to the old generation.
if h.extra.oldoverflow != nil {
throw("oldoverflow is not nil")
}
h.extra.oldoverflow = h.extra.overflow
h.extra.overflow = nil
}
//如果刚刚有新创建的溢出桶,则用h.extra.nextoverflow指针进行管理
if nextoverflow != nil {
if h.extra == nil {
h.extra = new(mapextra)
}
h.extra.nextoverflow = nextoverflow
}
}
进行修改操作时会触发的again代码段的growwork_fast64方法其内部就涉及了驱逐操作方法evacuate_fast64方法,因为有了上文的图解,所以对于这段代码的解读就很容易了,我们直接查看evacuate_fast64的核心注释即可了解驱逐操作的含义:
func growwork_fast64(t *maptype, h *hmap, bucket uintptr) {
//将对应哈希的oldbucket的元素驱逐到新bucket上
evacuate_fast64(t, h, bucket&h.oldbucketmask())
//......
}
func evacuate_fast64(t *maptype, h *hmap, oldbucket uintptr) {
//获取旧的bucket的指针
b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
//......
//通过[2]evacdst数组的0索引记录oldbucket的bucket、keys、values指针,1记录new buckets的bucket、keys、values指针
if !evacuated(b) {
var xy [2]evacdst
x := &xy[0]
//记录oldbucket的bucket、keys、values指针
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
x.k = add(unsafe.pointer(x.b), dataoffset)
x.e = add(x.k, bucketcnt*8)
//new buckets的bucket、keys、values指针
if !h.samesizegrow() {
// only calculate y pointers if we're growing bigger.
// otherwise gc can see bad pointers.
y := &xy[1]
y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
y.k = add(unsafe.pointer(y.b), dataoffset)
y.e = add(y.k, bucketcnt*8)
}
//循环遍历非空的tophash进行驱逐操作
for ; b != nil; b = b.overflow(t) {
//获取在old bucket上的地址
k := add(unsafe.pointer(b), dataoffset)
e := add(k, bucketcnt*8)
for i := 0; i < bucketcnt; i, k, e = i+1, add(k, 8), add(e, uintptr(t.elemsize)) {
//计算在旧的bucket的tophash以定位键值对
top := b.tophash[i]
if isempty(top) {
b.tophash[i] = evacuatedempty
continue
}
if top < mintophash {
throw("bad map state")
}
//计算在新的bucket上的hash值
var usey uint8
if !h.samesizegrow() {
// compute hash to make our evacuation decision (whether we need
// to send this key/elem to bucket x or bucket y).
hash := t.hasher(k, uintptr(h.hash0))
if hash&newbit != 0 {
usey = 1
}
}
b.tophash[i] = evacuatedx + usey // evacuatedx + 1 == evacuatedy, enforced in makemap
//获取新的bucket的指针地址
dst := &xy[usey] // evacuation destination
//到新bucke的tophash数组上的位置记录这个tophash
dst.b.tophash[dst.i&(bucketcnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check
//将旧的bucket的key复制到新bucket上
if t.key.ptrdata != 0 && writebarrier.enabled {
if goarch.ptrsize == 8 {
// 通过指针操作,将旧的key复制到新的bucket的key上
*(*unsafe.pointer)(dst.k) = *(*unsafe.pointer)(k)
} else {
// there are three ways to squeeze at least one 32 bit pointer into 64 bits.
// give up and call typedmemmove.
typedmemmove(t.key, dst.k, k)
}
} else {
*(*uint64)(dst.k) = *(*uint64)(k)
}
//复制value到新bucket的位置上
typedmemmove(t.elem, dst.e, e)
//....
}
}
// 完成后删除旧的bucket的键值对,辅助gc
if h.flags&olditerator == 0 && t.bucket.ptrdata != 0 {
b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
// preserve b.tophash because the evacuation
// state is maintained there.
ptr := add(b, dataoffset)
n := uintptr(t.bucketsize) - dataoffset
memclrhaspointers(ptr, n)
}
}
if oldbucket == h.nevacuate {
advanceevacuationmark(h, t, newbit)
}
}
经过这段代码之后,我们的旧的bucket上的桶的数据也就都驱逐完成了,随后跳出这些代码段,再次回到mapassign_fast64的赋值操作的代码找到合适key的位置设置键值对。
如下所示,可以看到核心步骤大致是:
- for循环定位到哈希算法得出的
tophash对应key的位置。 - 如果找到空位置则跳出循环进行键值对设置,并更新
count值。 - 若找到和当前key相等的位置,直接跳到
done代码段设置value。
func mapassign_fast64(t *maptype, h *hmap, key uint64) unsafe.pointer {
//......
//定位到key的指针
bucketloop:
for {
for i := uintptr(0); i < bucketcnt; i++ {
//若定位到的bucket为nil,则说明这个位置没有被使用过,则进入该分支定位指针
if isempty(b.tophash[i]) {
if insertb == nil {
insertb = b
inserti = i
}
//如果找到当前tophash为0值,说明这个位置没有被用过,直接退出循环,到外部直接赋值
if b.tophash[i] == emptyrest {
break bucketloop
}
continue
}
//获取k判断和当前key值是否一致,若不一致的continue,反之记录直接到done代码段,直接设置value的值
k := *((*uint64)(add(unsafe.pointer(b), dataoffset+i*8)))
if k != key {
continue
}
insertb = b
inserti = i
goto done
}
//.....
}
//定位到key位置设置key
insertk = add(unsafe.pointer(insertb), dataoffset+inserti*8)
// store new key at insert position
*(*uint64)(insertk) = key
//元素数量+1
h.count++
done:
//设置value
elem := add(unsafe.pointer(insertb), dataoffset+bucketcnt*8+inserti*uintptr(t.elemsize))
if h.flags&hashwriting == 0 {
fatal("concurrent map writes")
}
h.flags &^= hashwriting
return elem
}
扩容未完成时如何读
因为map是在操作时进行驱逐操作的,所以在读取时需要会按照以下步骤执行:
- 定位到当前
hash对应bucket设置给指针b。 - 查看当前
hash对应oldbucket是否发生驱逐操作,若未发生则说明当前的值在oldbucket上,将指针b指向oldbucket。 - 到b指针的
bucket上查到对应的key,若找到则返回value。
对应的核心代码如下,读者可参照注释了解大体流程:
func mapaccess1(t *maptype, h *hmap, key unsafe.pointer) unsafe.pointer {
//......
//定位hash值
hash := t.hasher(key, uintptr(h.hash0))
//先定位到bucket指针
m := bucketmask(h.b)
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
//在基于hash到oldbucket定位旧的bucket指针,若未发生驱逐则上b指针指向oldbucket
if c := h.oldbuckets; c != nil {
if !h.samesizegrow() {
// there used to be half as many buckets; mask down one more power of two.
m >>= 1
}
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
//若未发生驱逐则上b指针指向oldbucket
if !evacuated(oldb) {
b = oldb
}
}
top := tophash(hash)
bucketloop:
//循环定位键值对
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketcnt; i++ {
//......
//如果找到的key和我们要查询的key相等则直接返回
if t.key.equal(key, k) {
e := add(unsafe.pointer(b), dataoffset+bucketcnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
if t.indirectelem() {
e = *((*unsafe.pointer)(e))
}
return e
}
}
}
return unsafe.pointer(&zeroval[0])
}
小结
本文通过图解+代码的形式了解go语言中的map如何通过扩容解决散列性能退化,以及在扩容未完成期间如何实现元素的快速定位查询,希望对你有帮助。
以上就是go语言中的map如何解决散列性能下降的详细内容,更多关于go map散列性能下降的资料请关注代码网其它相关文章!
发表评论