官方推荐golang中错误处理当做值处理, 既然是值那就可以在channel中传输,本文带你看看golang中channel+error来做异步错误处理有多香,看完本文还会觉得golang的错误处理相比java try catch一点优势都没有吗?
场景
如下,一次任务起多个协程异步处理任务,比如同时做服务/redis/mysql/kafka初始化,当某一个协程出现错误(初始化失败)时,程序是停止还是继续呢?如何记录错误?如何控制优雅的退出全部工作协程呢?
为了解决类似的问题,常见如下三种解决方案:
1.中断退出并记录日志
如下,最简单粗暴的方式就是,一旦协程中发生错误,记录日志立即退出,外层如果想取到错误可以通过共享全局变量,单个协程无法控制所有协程动作。
// 出错中断协程,打印日志,退出
func TestSimpleExit(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
//do something
if err := doSomething(); err != nil {
t.Logf("Error when call doSomething:%v\n", err)
return
}
}()
wg.Wait()
}
2.监控error,可选记录日志或退出
前面讲过,既然error是值,那就可以在channel中传输,可以单独开一个channel,所有协程错误都发送到这个通道。
// 数据处理流程
dataFunc := func(ctx context.Context, dataChan chan int, errChan chan error) {
defer wg.Done()
for {
select {
case v, ok := <-dataChan:
if !ok {
log.Println("Receive data channel close msg!")
return
}
if err := doSomething2(v); err != nil {
errChan <- err
continue
}
// do ...
case <-ctx.Done():
log.Println("Receive exit msg!")
return
}
}
}
wg.Add(1)
go dataFunc(ctx, dataChan, errChan)
wg.Add(1)
go dataFunc(ctx, dataChan, errChan)
监控错误error通道,统一记录和退出,一旦检测到错误可以通过ctx通知所有协程退出,这里可以灵活控制监控到错误时的错误处理策略(是否记录日志/是否退出等),error通道可以同步或异步处理。
整体流程如下
异步监控error
// 错误处理流程,error处理通道异步等待
wg.Add(1)
go func(errChan chan error) {
defer wg.Done()
for {
select {
case v, ok := <-errChan:
if !ok {
log.Println("Receice err channel close msg!")
return
}
// 收到错误时,可选择记录日志或退出
if v != nil {
t.Logf("Error when call doSomething:%v\n", v)
cancel() // 通知全部退出
return
}
case <-ctx.Done():
log.Println("Receive exit msg!")
return
}
}
}(errChan)
dataChan <- 1
wg.Wait()
同步监控error
// 错误处理流程,error处理通道同步等待
for {
select {
case v, ok := <-errChan:
if !ok {
log.Println("Receice err channel close msg!")
goto EXIT
}
// 收到错误时,可选择记录日志或退出
if v != nil {
t.Logf("Error when call doSomething:%v\n", v)
cancel()
goto EXIT
}
case <-ctx.Done():
log.Println("Receive exit msg!")
goto EXIT
}
}
EXIT:
wg.Wait()
3.官方库errgroup
考虑到error输出到通道后统一处理是golang常用手段,官方也针对封装了一个error处理包,errgroup顾名思义,多个协程的error被当做一个组,一旦某个协程出错所有协程都退出,只输出第一个error。
func TestSimpleChannel5(t *testing.T) {
eg, ctx := errgroup.WithContext(context.Background())
dataChan := make(chan int)
defer close(dataChan)
// 数据处理流程
dataFunc := func() error {
for {
select {
case v, ok := <-dataChan:
if !ok {
log.Println("Receive data channel close msg!")
return nil
}
if err := doSomething2(v); err != nil {
return err
}
// do ...
// 增加ctx通知完成
case <-ctx.Done():
log.Println("Receive exit msg!")
return nil
}
}
}
eg.Go(dataFunc)
eg.Go(dataFunc)
eg.Go(dataFunc)
dataChan <- 1
// 错误处理流程,任何一个协程出现error,则会调用ctx对应cancel函数,所有相关协程都会退出
if err := eg.Wait(); err != nil {
fmt.Printf("Something is wrong->%v\n", err)
}
}
类似上一小节,可以看到errgroup就是结合waitgroup cancel和channel通道封装的。
4.监控error,全部日志合并后输出
同样是上述场景,有时候我们的需求是返回所有的错误(不是第一个错误)。
- 比如在服务启动时,对 redis、kafka、mysql 等各种资源初始化场景,可以把所有相关资源初始化的错误都返回,展示给用户统一排查。
- 另一种场景就是在 web 请求中,校验请求参数时,返回所有参数的校验错误给客户端的场景。
这种需求,一般考虑使用多错误管理(hashicorp/go-multierror库),如下一个简答同步任务演示多错误管理,所有返回的错误可以通过Append归并成一个错误,实际上是通过error wrap的方式合并起来的,因此也可以使用Is/As判断嵌套error。
// 多路协程error合并,用于多路check场景
func TestSimpleChannel3(t *testing.T) {
// 同步执行多个任务,返回error合并
var err = func() error {
var result error
if err := doSomething(); err != nil {
result = multierror.Append(result, err)
}
if err := doSomething2(nil); err != nil {
result = multierror.Append(result, err)
}
return result
}()
// 打印输出
if err != nil {
fmt.Printf("%v\n", err)
}
// 获取错误列表
if err != nil {
if merr, ok := err.(*multierror.Error); ok {
fmt.Printf("%v\n", merr.Errors)
}
}
// 判断是否为某种类型
if err != nil && errors.Is(err, Error1) {
fmt.Println("Errors contain error 1")
}
// 判断是否其中一个error能够转换成指定error
var e MyError
if err != nil && errors.As(err, &e) {
fmt.Println("One Error can be convert to nyerror")
}
}
那么,在起多个异步任务时,就可以如下处理,返回的多个error通过channel消费合并展示。
func TestSimpleChannel4(t *testing.T) {
wg := sync.WaitGroup{}
taskNum := 10
errChan := make(chan error, taskNum)
// 异步执行多个任务
step := func(stepNum int, errChan chan error) {
defer wg.Done()
errChan <- fmt.Errorf("step %d error", stepNum)
}
for i := 0; i < taskNum; i++ {
wg.Add(1)
go step(i, errChan)
}
// 等待任务完成
go func() {
wg.Wait()
close(errChan)
}()
// err通道阻塞等待,可能的所有错误合并
var result *multierror.Error
for err := range errChan {
result = multierror.Append(result, err)
}
// 出现一个错误时,选择记录日志或退出
if len(result.Errors) != 0 {
log.Println(result.Errors)
}
}
参考
演示代码 https://gitee.com/wenzhou1219/go-in-prod/tree/master/error_group
errgroup源码解析 https://zhuanlan.zhihu.com/p/416054707
errgroup使用参考 https://zhuanlan.zhihu.com/p/338999914
go-multierror使用参考 https://zhuanlan.zhihu.com/p/581030231