golang.org/x/sync/errgroup
errgroup提供了一组并行任务中错误采集的方案。
先看注释
Package errgroup provides synchronization, error propagation, and Context cancelation for groups of goroutines working on subtasks of a common task.
Group 结构体
// A Group is a collection of goroutines working on subtasks that are part of the same overall task.
// A zero Group is valid and does not cancel on error.
type Group struct {
cancel func() // 内部使用的结束方法
wg sync.WaitGroup // 内嵌sync.WaitGroup,用来阻塞
errOnce sync.Once // 只采集一次error
err error // 采集到的error
}
WithContext 返回 *Group 与 context.Context
context.Context 会在子任务发生错误,或Wait方法结束阻塞时被取消
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx) // 创建可以取消的context
return &Group{cancel: cancel}, ctx // 创建并返回句柄
}
Wait 作为阻塞屏障,与WaitGroup的Wai方法作用一样
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
g.wg.Wait() // 阻塞
if g.cancel != nil { // 阻塞结束后,cancel掉整个子Context链,Wait结束阻塞
g.cancel()
}
return g.err // 返回收集的error
}
Go 创建新的协程去执行子任务
// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
g.wg.Add(1) // 令牌+1
go func() {
defer g.wg.Done() // 方法执行结束后令牌-1, 令牌为0的时候WaitGroup的Wait方法结束阻塞
if err := f(); err != nil { //执行传入的f()方法,并检测err
g.errOnce.Do(func() { // 如果err不为空,则将err赋值给g.err,并且只赋值一次
g.err = err
if g.cancel != nil { //如果cancel非空,则执行该方法,通知Context链,做取消操作。
g.cancel()
}
})
}
}()
}
原理简析
eg, ctx := errgroup.WithContext(context.Background()) // ctx还可以作为父ctx传递给其他函数调用
eg.Go(func() error {
return // ...
})
eg.Go(func() error {
return // ...
})
if err := eg.Wait(); err != nil {
// ...
}
errgroup是通过封装WaitGroup,Context,sync.Once来实现的。它利用了WaitGroup的Add,Done,Wait方法实现阻塞屏障。使用context.WithCancel来实现取消策略,取消策略针对WithContext传递出的ctx。使用sync.Once实现只保存一组任务中第一次出现的error。
errgroup.Wait结束阻塞的时机:wg sync.WaitGroup的令牌归零。
ctx 传播信号的几个时机:
- 一组任务中任何一个子任务产生error,执行了cancel。
- 一组任务顺利执行结束,Wait中执行了cancel。
- 外部传入的context为可取消的context。外层调用了cancel方法。
c, cancel := context.WithCancel(context.Background())
eg, ctx := errgroup.WithContext(c)
go func() {
select {
case <-ctx.Done():
fmt.Println("结束")
return
}
}()
eg.Go(func() error {
time.Sleep(time.Second * 10)
return nil
})
eg.Go(func() error {
time.Sleep(time.Second * 10)
return nil
})
cancel()
if err := eg.Wait(); err != nil {
fmt.Println(err)
}