工作中查看项目代码,发现会存在使用 GO 语言做并发的时候出现各种各样的异常情况,有的输出结果和自己期望和设计的不一致,有的是程序直接阻塞住,更有甚者直接是程序 crash 掉。
实际上,出现上述的情况,还是因为我们对于 GO 语言的并发模型和涉及的 GO 语言基础不够扎实,误解了语言的用法。
那么,对于 GO 语言的并发模式,我们一起来梳理一波。 GO 语言常见的并发模式有这些:
- 创建模式
- 退出模式
- 管道模式
- 超时模式和取消模式
在 GO 语言里面,咱们使用使用并发,自然离不开使用 GO 语言的协程 goroutine,通道 channel 和 多路复用 select,接下来就来看看各种模式都是如何去搭配使用这三个关键原语的
创建模式
使用过通道和协程的朋友对于创建模式肯定不会模式,这是一个非常常用的方式,也是一个非常简单的使用方式:
- 主协程中调用 help 函数,返回一个通道 ch 变量
- 通道 ch 用于主协程和 子协程之间的通信,其中通道的数据类型完全可以自行定义
type XXX struct{...}
func help(fn func()) chan XXX {
ch := make(chan XXX)
// 开启一个协程
go func(){
// 此处的协程可以控制和外部的 主协程 通过 ch 来进行通信,达到一定逻辑便可以执行自己的 fn 函数
fn()
ch <- XXX
}()
}
func main(){
ch := help(func(){
fmt.Println("这是GO 语言 并发模式之 创建模式")
})
<- ch
}
退出模式
程序的退出我们应该也不会陌生,对于一些常驻的服务,如果是要退出程序,自然是不能直接就断掉,此时会有一些连接和业务并没有关闭,直接关闭程序会导致业务异常,例如在关闭过程中最后一个 http 请求没有正常响应等等等
此时,就需要做优雅关闭了,对于协程 goroutine 退出有 3 种模式
- 分离模式
- join 模式
- notify-and-wait 模式
分离模式
此处的分离模式,分离这个术语实际上是线程中的术语,pthread detached
分离模式可以理解为,咱们创建的协程 goroutine,直接分离,创建子协程的父协程不用关心子协程是如何退出的,子协程的生命周期主要与它执行的主函数有关,咱们 return 之后,子协程也就结束了
对于这类分离模式的协程,咱们需要关注两类,一种是一次性的任务,咱们 go 出来后,执行简单任务完毕后直接退出,一种是常驻程序,需要优雅退出,处理一些垃圾回收的事情
例如这样:
- 主程序中设置一个通道变量 ch ,类型为 os.Signal
- 然后主程序就开始各种创建协程执行自己的各种业务
- 直到程序收到了 syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT 任意一个信号的时候,则会开始进行垃圾回收等清理工作,执行完毕后,程序再进行退出
func main(){
ch := make(chan os.Signal)
signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
// ...
// go 程序执行其他业务
// ...
for i := range ch {
switch i {
case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
// 做一些清理工作
os.Exit(0)
}
}
}
join 模式
看到这个关键字,是不是也似曾相识,和线程貌似很像,例如 线程中 父线程可以通过 pthread_join 来等待子线程结束,并且还可以获取子线程的结束状态
GO 语言中等待子协程退出并且获取子协程的退出状态,咱们就可以使用通道 channel 的方式来进行处理
例子1
等待一个子协程退出,并获取退出状态
- 主协程中调用 help 方法得到一个 ch 通道变量,主协程阻塞着读 ch
- help 中开辟一个子协程去执行传入的 fn 回调函数,并传参为 ok bool
- 实际 fn 函数判断传参 ok 是否是 true,若不是则返回具体的错误信息,若是 true 则返回 nil
func help(f func(bool) error, ok bool) <-chan error {
ch := make(chan error)
go func() {
ch <- f(ok)
}()
return ch
}
func fn(ok bool) error {
if !ok {
return errors.New("not ok ... ")
}
return nil
}
func main() {
ch := help(fn, true)
fmt.Println("help 111")
err := <-ch
fmt.Println("help 111 done ", err)
ch = help(fn, false)
fmt.Println("help 222")
err = <-ch
fmt.Println("help 222 done ", err)
}
看上如上程序,我们就可以知道,第一次调用 help(fn , true)
,主协程等待子协程退出的时候,会得到一个错误信息,为 not ok ...
, 第二次调用 help(fn , false)
的时候,返回的 err 是一个 nil
通过上述这种方式,主协程不仅可以轻易的等待一个子协程退出,还可以获取到子协程退出的状态
那么,主协程如果是等待多个协程退出呢?需要如何处理?
例子2
主协程等待多个协程退出咱们就需要使用到 GO 中的 sync.WaitGroup
- 使用 help 函数,传入回调函数,参数1 bool,参数2 int ,其中参数 2 表示开辟子协程的个数,返回值为一个无缓冲的 channel 变量,数据类型是 struct{}
- 使用 var wg sync.WaitGroup ,开辟子协程的时候记录一次 wg.Add(1),当子协程退出时 ,记录退出 wg.Done()
- help 中再另起一个协程 wg.Wait() 等待所有子协程退出,并将 ch 变量写入值
- 主协程阻塞读取 ch 变量的值,待所有子协程都退出之后,help 中写入到 ch 中的数据,主协程就能马上收到 ch 中的数据,并退出程序
func help(f func(bool)error, ok bool, num int)chan struct{}{
ch := make(chan struct{})
var wg sync.WaitGroup
for i:=0; i<num; i++ {
wg.Add(1)
go func(){
f(ok)
fmt.Println(" f done ")
wg.Done()
}()
}
go func(){
// 等待所有子协程退出
wg.Wait()
ch <- struct{}{}
}()
return ch
}
func fn(ok bool) error{
time.Sleep(time.Second * 1)
if !ok{
return errors.New("not ok ... ")
}
return nil
}
func main(){
ch := help(fn , true)
fmt.Println("help 111")
<- ch
fmt.Println("help 111 done ",err)
}
notify-and-wait 模式
可以看到上述模式,都是主协程等待一个子协程,或者多个子协程结束后,主协程再进行退出,或者处理完垃圾回收后退出
那么如果主协程要主动通知子协程退出,我们应该要如何处理呢?
同样的问题,如果主协程自己退出了,而没有通知其他子协程退出,这是会导致业务数据异常或者丢失的,那么此刻我们就可以使用到 notify-and-wait 模式 来进行处理
我们就直接来写一个主协程通知并等待多个子协程退出的 demo:
- 主协程调用 help 函数,得到一个 quit chan struct{} 类型的通道变量,主协程阻塞读取 quit 的值
- help 函数根据传入的参数 num 来创建 num 个子协程,并且使用 sync.WaitGroup 来控制
- 当主协程在 quit 通道中写入数据时,主动通知所有子协程退出
- help 中的另外一个协程读取到 quit 通道中的数据,便 close 掉 j 通道,触发所有的子协程读取 j 通道值的时候,得到的 ok 为 false,进而所有子协程退出
- wg.Wait() 等待所有子协程退出后,再在 quit 中写入数据
- 主协程此时从 quit 中读取到数据,则知道所有子协程全部退出,自己的主协程即刻退出
func fn(){
// 模拟在处理业务
time.Sleep(time.Second * 1)
}
func help(num int, f func()) chan struct{}{
quit := make(chan struct{})
j := make(chan int)
var wg sync.WaitGroup
// 创建子协程处理业务
for i:=0;i<num;i++{
wg.Add(1)
go func(){
defer wg.Done()
_,ok:=<-j
if !ok{
fmt.Println("exit child goroutine .")
return
}
// 子协程 正常执行业务
f()
}()
}
go func(){
<-quit
close(j)
// 等待子协程全部退出
wg.Wait()
quit <- struct{}{}
}()
return quit
}
func main(){
quit := help(10, fn)
// 模拟主程序处理在处理其他事项
// ...
time.Sleep(time.Second * 10)
quit <- struct{}{}
// 此处等待所有子程序退出
select{
case <- quit:
fmt.Println(" programs exit. ")
}
}
上述程序执行结果如下,可以看到 help 函数创建了 10 个子协程,主协程主动通知子协程全部退出,退出的时候也是 10 个子协程退出了,主协程才退出
上述程序,如果某一个子协程出现了问题,导致子协程不能完全退出,也就是说某些子协程在 f 函数中阻塞住了,那么这个时候主协程岂不是一直无法退出???
那么此时,在主协程通知子协程退出的时候,我们加上一个超时时间,表达意思为,超过某个时间,如果子协程还没有全部退出完毕,那么主协程仍然主动关闭程序,可以这样写:
- 设定一个定时器, 3 秒后会触发,即可以从 t.C 中读取到数据
t := time.NewTimer(time.Second * 3)
defer t.Stop()
// 此处等待所有子程序退出
select{
case <-t.C:
fmt.Println("timeout programs exit. ")
case <- quit:
fmt.Println(" 111 programs exit. ")
}
管道模式
说到管理,或许大家对 linux 里面的管道更加熟悉吧,例如使用 linux 命令找到文件中的 golang 这个字符串
cat xxx.txt |grep "golang"
那么对于 GO 语言并发模式中的管道模式也是类似的效果,我们就可以用这个管道模式来过滤数据
例如我们可以设计这样一个程序,兄弟们可以动起手来写一写,评论区见哦:
- 整个程序总共使用 2 个通道
- help 函数中传输数据量 50 ,逻辑计算能够被 5 整除的数据写到第一个通道 ch1 中
- 另一个协程阻塞读取 ch1 中的内容,并将取出的数据乘以 3 ,将结果写入到 ch2 中
- 主协程就阻塞读取 ch2 的内容,读取到内容后,挨个打印出来
管道模式有两种模式,扇出模式 和 扇入模式,这个比较好理解
- 扇出模式:多种类型的数据从同一个通道 channel 中读取数据,直到通道关闭
- 扇入模式:输入的时候有多个通道channel,程序将所有的通道内数据汇聚,统一输入到另外一个通道channel A 里面,另外一个程序则从这个通道channel A 中读取数据,直到这个通道A关闭为止
超时模式和取消模式化
超时模式
上述例子中有专门说到如何去使用他,实际上我们还可以这样用:
select{
case <- time.Afer(time.Second * 2):
fmt.Println("timeout programs exit. ")
case <- quit:
fmt.Println(" 111 programs exit. ")
}
取消模式
则是使用了 GO 语言的 context 包中的提供了上下文机制,可以在协程 goroutine 之间传递 deadline,取消等信号
我们使用的时候例如可以这样:
- 使用 context.WithCancel 创建一个可以被取消的上下文,启动一个协程 在 3 秒后关闭上下文
- 使用 for 循环模拟处理业务,默认会走 select 的 default 分支
- 3 秒后 走到 select 的 ctx.Done(),则进入到了取消模式,程序退出
ctx, cancelFunc := context.WithCancel(context.Background())
go func() {
time.Sleep(time.Second * 3)
cancelFunc()
}()
for {
select {
case <-ctx.Done():
fmt.Println("program exit .")
return
default:
fmt.Println("I'm still here.")
time.Sleep(time.Second)
}
}
总的来说,今天分享了 GO 语言中常见的几种并发模式:创建模式,退出模式,管道模式,超时模式和取消模式,更多的,还是要我们要思考其原理和应用起来,学习他们才能更加的有效
欢迎点赞,关注,收藏
朋友们,你的支持和鼓励,是我坚持分享,提高质量的动力
好了,本次就到这里
技术是开放的,我们的心态,更应是开放的。拥抱变化,向阳而生,努力向前行。
我是阿兵云原生,欢迎点赞关注收藏,下次见~
可以进入地址进行体验和学习:https://xxetb.xet.tech/s/3lucCI