参考:
- Go并发——singleflight - 知乎
- 十一. Go并发编程–singleflight - failymao - 博客园
一、背景
介绍:Go的singleflight库提供了一个重复的函数调用抑制机制。
场景:适用于并发读请求量较大的后台服务,以降低存储层的压力。
在大量请求同时请求某一个热点key的场景下,singleflight方法可以很好的解决缓存击穿问题。
- 优点:可以避免同一时间内,大量相同的流量请求都打到数据库上进而引起其压力。通过限制对同一个键值对的多次重复请求,减少对下游如MySQL的瞬时流量。
- 缺点:但是依然有可能阻塞大量请求导致系统等待获取缓存数据的goroutine激增,因此需要灵活使用
DoCall()
异步调用方法和Forget()
方法。
二、原理
将一组相同的请求合并成一个请求。实际上最终只会有 第一个 请求会访问DB,在其获取到结果后,通过本地内存对剩余其他阻塞的请求返回相同结果。
如上图所示:请求1、2、3同时请求相同的key,singleflight机制只会让请求1访问DB,请求1返回的value不仅返回给客户端1,也作为请求2、请求3的结果返回给客户端。这里的多请求理解为多个goroutine并发执行。
底层实际上是通过 go map(区分是否是相同key的请求) + lock(线程安全) + waitgroup(阻塞其他请求),将请求1从DB获取的结果直接通过本地内存去返回给其他相同的请求。
三、使用方法
// Do:传入key和fn回调函数,如果key相同,fn方法只会执行一次,同步等待
// 返回值v:表示fn执行结果
// 返回值err:表示fn的返回的err
// 返回值shared:表示是否是真实fn返回的还是从保存的map[key]返回的,也就是共享的
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {...}
// DoChan:与Do方法类似,区别在于执行函数fn非阻塞,结果通过chan返回给同组请求
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {...}
// Forget:用于主动删除Group的m(map类型)成员中的指定key
// 这样同一批中具有相同key的其他请求,就不会被waitgroup阻塞而等待了
// 控制key关联值是否失效,默认以上两方法只要fn执行完成,内部维护的fn值也删除(即并发结束后就失效了)
func (g *Group) Forget(key string) {...}
四、底层数据结构
// Group:实现singleflight机制的对象,多个请求共用一个group。
// mu:锁,该字段保证并发安全
// m:map类型,该字段保存请求键值对,使用m保证同一键只有一个call对象。
type Group struct {
mu sync.Mutex // 锁,保证m的并发安全
m map[string]*call // 保存请求(key),对应的调用信息(value)包括返回结果等 【懒加载】
}
// Call:调用信息,包括结果和一些统计字段。多个请求的key相同,只会有一个请求被调用
type call struct {
// 通过wg的机制可以保证阻塞相同key的其他请求。
wg sync.WaitGroup
// 请求返回结果,保证在wg.Done之前只写入一次,且在wg.Done之后才会读
val interface{}
err error
// 当前key是否调用了Forget方法
forgotten bool
// 统计相同key的次数
dups int
// 请求返回结果,但是DoChan方法调用,用channel进行通知。
chans []chan<- Result
}
// Result:请求的返回结果
type Result struct {
// 返回值
Val interface{}
Err error
// 是否共享(多个相同key的请求等待)
Shared bool
}
Do
这里只介绍关键的Do方法,它用来执行传入的函数fn。更多源码剖析 参考:大佬文章
这里有两个关键步骤:
- ★①★:针对第一个并发请求,这里只有第一个请求会调用 Add(1),其他的都会调用 wait 被阻塞掉
- ★②★:针对除了第一个请求之外的其他并发请求,进来后会阻塞在Wait这里,等待第一个请求执行完毕或超时Forget
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
// 前面Group.m中提到的【懒加载】
if g.m == nil {
g.m = make(map[string]*call)
}
// 先判断 key 是否已经存在
if c, ok := g.m[key]; ok {
// 如果存在就会解锁
c.dups++
g.mu.Unlock()
// ★②★ 除了第一个请求之外的其他请求,进来后会阻塞在Wait这里,等待第一个请求执行完毕或超时Forget
// 然后等待 WaitGroup 请求执行完毕,只要一执行完,所有的 wait 都会被唤醒
c.wg.Wait()
// 这里区分 panic 错误和 runtime 的错误,避免出现死锁,后面可以看到为什么这么做
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
// 如果我们没有找到这个 key 就 new call
c := new(call)
// ★①★ 这里只有第一个请求会调用 Add(1),其他的都会调用 wait 被阻塞掉
// 所以这要这次调用返回,所有阻塞的调用都会被唤醒
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
// 然后我们调用 doCall 去执行
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
五、回顾与总结:
场景: 在大量请求同时请求某一个热点key的场景下,singleflight库可以很好的解决缓存击穿问题。它通过减少对下游的相同请求,来增加系统吞吐量和服务质量。
作用:当多个goroutine并发执行时,它们会共享同一份内存。这意味着如果一个goroutine正在执行某个函数,而另一个goroutine同时或几乎同时也调用了同一个函数,那么第二个goroutine将会等待第一个goroutine完成,然后直接获取和使用第一个goroutine的结果。这就是singleflight库所做的事情:它确保了对函数的重复调用在逻辑上是串行的,以减少不必要的计算或网络请求(比如:可以减少其他被阻塞的一批请求,再次去访问MySQL或Redis获取重复结果,所带来的性能消耗)。
原理: 通过 go map(区分是否是相同key的请求) + lock(线程安全) + waitgroup(阻塞其他请求),将第一个请求从DB获取的结果直接通过本地内存去返回给其他相同的并发请求。
注意点: 不过在使用时,我们也需要注意以下几个问题:
- singleflight 分别提供了同步和异步的调用方式,这让我们使用起来也更加灵活:
Do()
用于同步阻塞调用传入的函数。DoChan
用于异步调用传入的参数并通过 Channel 接收函数返回值。
Forget
用于主动丢弃超时或异常的key(删除map中某个key → Group.m),防止当前请求故障而导致所有相同key的请求都阻塞住。- 一旦调用的函数fn返回了错误,所有在等待的其他 Goroutine 也都会接收到相同的错误。
Do
和DoChan
方法的第一个参数为key,用于标识不同的任务或请求。在真实业务场景下,key的生成方式可以根据具体需求来确定,但通常需要遵循一定的规则和约定。举个例子:
// 场景:用户概览页 → 安全播报功能
// 在安全播报功能中,展示相关产品:功能更新、行业荣誉、紧急通知和版本发布信息。
// 付费用户类型:不同的付费用户类型有不同的安全播报信息(1-基础版、2-专业版、3-旗舰版...)
// 背景:1、日活用户3000;2、安全播报数据存于MySQL中,且数据量较多
// 为了避免缓存击穿问题的发生,这里引入了 singleflight
// getSafetyBroadcastKey 获取key,传入singleflight的Do/DoChan方法的参数key中
func (d *Dao) getSafetyBroadcastKey(userType int) string {
// 1-基础版、2-专业版、3-旗舰版
return fmt.Sprintf("safety_broadcast_%d", userType)
}