文章目录
- 一、并发基础
- 1. 协程(Goroutine)
- 2. Channel
- 2.1 声明
- 2.2 无缓冲 channel
- 2.3 有缓冲 channel
- 2.4 关闭 channel
- 2.5 单向 channel
- 2.6 select+channel 示例
- 二、同步原语
- 1. 资源竞争
- 2. 同步原语
- 2.1 sync.Mutex
- 2.2 sync.RWMutex
- 2.3 sync.WaitGroup
- 2.4 sync.Once
- 2.5 sync.Cond
- 3. sync.Map
- 三、Context
- 1. 协程退出
- 2. Context
- 2.1 使用示例
- 2.2 定义
- 2.3 Context 树
- 2.4 使用 Context 取消多个协程
- 2.5 Context 传值
- 2.6 Context 使用原则
- 3. 实例:通过 Context 实现日志跟踪
- 四、并发模式
- 1. for select 循环模式
- 1.1 无限循环模式
- 1.2 for range select 有限循环
- 2. select timeout 模式
- 3. Pipeline 模式
- 4. 扇出和扇入模式
- 5. Futures 模式
一、并发基础
1. 协程(Goroutine)
Go 语言中没有线程的概念,只有协程,也称为 goroutine。相比线程来说,协程更加轻量,一个程序可以随意启动成千上万个 goroutine。
goroutine 被 Go runtime 所调度,这一点和线程不一样。也就是说,Go 语言的并发是由 Go 自己所调度的,自己决定同时执行多少个 goroutine,什么时候执行哪几个。这些对于开发者来说完全透明,只需要在编码的时候告诉 Go 语言要启动几个 goroutine,至于如何调度执行不用关心。
Go 语言提供了 go 关键字来启动一个 goroutine,语法如下:
go function()
go 关键字后跟一个方法或者函数的调用,就可以启动一个 goroutine,让方法在这个新启动的 goroutine 中运行。
示例:
func main() {
go fmt.Println("小明")
fmt.Println("我是 main goroutine")
time.Sleep(time.Second)
}
这段代码里有两个 goroutine,一个是 main 函数启动的 main goroutine,一个是通过 go 关键字启动的 goroutine。执行时程序是并发的,go 关键字启动的 goroutine 并不阻塞 main goroutine 的执行。
示例中的 time.Sleep(time.Second) 表示等待一秒,这里是让 main goroutine 等一秒,不然 main goroutine 执行完毕程序就退出了,也就看不到启动的新 goroutine 中的打印结果了。
2. Channel
Go 语言提供的 channel(通道)主要用于解决多个 goroutine之间的通信问题。
在 Go 语言中,提倡通过通信来共享内存,而不是通过共享内存来通信,其实就是提倡通过 channel 发送接收消息的方式进行数据传递,而不是通过修改同一个变量,所以在数据流动、传递的场景中要优先使用 channel,它是并发安全的,性能也不错。
channel 内部使用了互斥锁来保证并发的安全
2.1 声明
在 Go 语言中,声明一个 channel 可使用内置的 make 函数,如下所示:
ch:=make(chan string)
其中 chan 是一个关键字,表示是 channel 类型。后面的 string 表示 channel 里的数据是 string 类型。通过 channel 的声明也可以看到,chan 是一个集合类型。
定义好 chan 后就可以使用了,一个 chan 的操作只有两种:发送和接收。
- 接收:获取 chan 中的值,操作符为 <- chan。
- 发送:向 chan 发送值,把值放在 chan 中,操作符为 chan <-。
发送和接收的操作符都是 <- ,只不过位置不同。接收的 <- 操作符在 chan 的左侧,发送的 <- 操作符在 chan 的右侧。
使用 chan 来代替上例中 time.Sleep 函数的等待工作,如下面的代码所示:
func main() {
ch:=make(chan string)
go func() {
fmt.Println("小明")
ch <- "goroutine 完成"
}()
fmt.Println("我是 main goroutine")
v:=<-ch
fmt.Println("接收到的chan中的值为:",v)
}
在上面的示例中,新启动的 goroutine 中向 chan 类型的变量 ch 发送值;在 main goroutine 中,从变量 ch 接收值;如果 ch 中没有值,则阻塞等待到 ch 中有值可以接收为止。
2.2 无缓冲 channel
上面的示例中,使用 make 创建的 chan 就是一个无缓冲 channel,它的容量是 0,不能存储任何数据。所以无缓冲 channel 只起到传输数据的作用,数据并不会在 channel 中做任何停留。这也意味着,无缓冲 channel 的发送和接收操作是同时进行的,它也可以称为同步 channel。
2.3 有缓冲 channel
有缓冲 channel 类似一个可阻塞的队列,内部的元素先进先出。通过 make 函数的第二个参数可以指定 channel 容量的大小,进而创建一个有缓冲 channel,如下面的代码所示:
cacheCh:=make(chan int,5)
上例创建了一个容量为 5 的 channel,内部的元素类型是 int,也就是说这个 channel 内部最多可以存放 5 个类型为 int 的元素,如下图所示:
有缓冲 channel 具备以下特点:
- 有缓冲 channel 的内部有一个缓冲队列;
- 发送操作是向队列的尾部插入元素,如果队列已满,则阻塞等待,直到另一个 goroutine 执行接收操作释放队列的空间;
- 接收操作是从队列的头部获取元素并把它从队列中删除,如果队列为空,则阻塞等待,直到另一个 goroutine 执行发送操作插入新的元素。
通过内置函数 cap 可以获取 channel 的容量,也就是最大能存放多少个元素,通过内置函数 len 可以获取 channel 中元素的个数。
cacheCh:=make(chan int,5)
cacheCh <- 2
cacheCh <- 3
fmt.Println("cacheCh容量为:",cap(cacheCh),",元素个数为:",len(cacheCh))
无缓冲 channel 其实就是一个容量大小为 0 的 channel。比如 make(chan int,0)。
2.4 关闭 channel
channel 可以使用内置函数 close 关闭,如下面的代码所示:
close(cacheCh)
如果一个 channel 被关闭了,就不能向里面发送数据了,如果发送的话,会引起 painc 异常。但是还可以接收 channel 里的数据,如果 channel 里没有数据的话,接收的数据是元素类型的零值。
2.5 单向 channel
有一些特殊的业务需求需要限制一个 channel 只可以接收但是不能发送,或者限制一个 channel 只能发送但不能接收,这种 channel 称为单向 channel。
单向 channel 需要在声明的时候带上 <- 操作符即可,如下面的代码所示:
onlySend := make(chan<- int)
onlyReceive:=make(<-chan int)
声明单向 channel <- 操作符的位置和上面讲到的发送和接收操作是一样的。
在函数或者方法的参数中,使用单向 channel 的较多,这样可以防止一些操作影响了 channel。
示例:
func counter(out chan<- int) {
//函数内容使用变量out,只能进行发送操作
}
上例中的 counter 函数的参数 out 是一个只能发送的 channel,所以在 counter 函数体内使用参数 out 时,只能对其进行发送操作,如果执行接收操作,则程序不能编译通过。
2.6 select+channel 示例
在 Go 语言中,通过 select 语句可以实现多路复用,其语句格式如下:
select {
case i1 = <-c1:
//todo
case c2 <- i2:
//todo
default:
// default todo
}
select 结构和 switch 非常像,都有 case 和 default,只不过 select 的 case 是一个个可以操作的 channel。它可以监听多个 channel 的读写操作,实现处理多个 channel 的并发操作。
多路复用可以简单地理解为,N 个 channel 中,任意一个 channel 有数据产生,select 都可以监听到,然后执行相应的分支,接收数据并处理。
示例:
从网上下载一个文件启动 3 个 goroutine 进行下载,并把结果发送到 3 个 channel 中,哪个先下载好就使用哪个 channel 的结果。如下面的代码所示:
func main() {
//声明三个存放结果的channel
firstCh := make(chan string)
secondCh := make(chan string)
threeCh := make(chan string)
//同时开启3个goroutine下载
go func() {
firstCh <- downloadFile("firstCh")
}()
go func() {
secondCh <- downloadFile("secondCh")
}()
go func() {
threeCh <- downloadFile("threeCh")
}()
//开始select多路复用,哪个channel能获取到值,
//就说明哪个最先下载好,就用哪个。
select {
case filePath := <-firstCh:
fmt.Println(filePath)
case filePath := <-secondCh:
fmt.Println(filePath)
case filePath := <-threeCh:
fmt.Println(filePath)
}
}
func downloadFile(chanName string) string {
//模拟下载文件,可以自己随机time.Sleep点时间试试
time.Sleep(time.Second)
return chanName+":filePath"
}
如果这些 case 中有一个可以执行,select 语句会选择该 case 执行,如果同时有多个 case 可以被执行,则随机选择一个,这样每个 case 都有平等的被执行的机会。如果一个 select 没有任何 case,那么它会一直等待下去。
二、同步原语
1. 资源竞争
如果同一块内存被多个 goroutine 同时访问,就会产生不知道谁先访问也无法预料最后结果的情况。这就是资源竞争,这块内存可以称为共享的资源。
示例:
//共享的资源
var sum = 0
func main() {
//开启100个协程让sum+10
for i := 0; i < 100; i++ {
go add(10)
}
//防止提前退出
time.Sleep(2 * time.Second)
fmt.Println("和为:",sum)
}
func add(i int) {
sum += i
}
当运行程序后,可能和为 1000,但也可能是 990 或者 980。导致这种情况的核心原因是资源 sum 不是并发安全的,因为同时会有多个协程交叉执行 sum+=i,产生不可预料的结果。
使用 go build、go run、go test 这些 Go 语言工具链提供的命令时,添加 -race 标识可以检查 Go 语言代码是否存在资源竞争。
2. 同步原语
同步原语通常用于更复杂的并发控制,如果追求更灵活的控制方式和性能可以使用。
2.1 sync.Mutex
互斥锁,顾名思义,指的是在同一时刻只有一个协程执行某段代码,其他协程都要等待该协程执行完毕后才能继续执行。
示例:
var(
sum int
mutex sync.Mutex
)
func add(i int) {
mutex.Lock()
sum += i
mutex.Unlock()
}
以上被加锁保护的 sum+=i 代码片段又称为临界区。在同步的程序设计中,临界区段指的是一个访问共享资源的程序片段,而这些共享资源又有无法同时被多个协程访问的特性。 当有协程进入临界区段时,其他协程必须等待,这样就保证了临界区的并发安全。
互斥锁的使用非常简单,它只有两个方法 Lock 和 Unlock,代表加锁和解锁。当一个协程获得 Mutex 锁后,其他协程只能等到 Mutex 锁释放后才能再次获得锁。
Mutex 的 Lock 和 Unlock 方法总是成对出现,而且要确保 Lock 获得锁后,一定执行 UnLock 释放锁,所以在函数或者方法中会采用 defer 语句释放锁,如下面的代码所示:
func add(i int) {
mutex.Lock()
defer mutex.Unlock()
sum += i
}
2.2 sync.RWMutex
读取操作使用多个协程的示例:
func main() {
for i := 0; i < 100; i++ {
go add(10)
}
for i:=0; i<10;i++ {
go fmt.Println("和为:",readSum())
}
time.Sleep(2 * time.Second)
}
//增加了一个读取sum的函数,便于演示并发
func readSum() int {
b:=sum
return b
}
这个示例开启了 10 个协程,它们同时读取 sum 的值。因为 readSum 函数并没有任何加锁控制,所以它不是并发安全的,即一个 goroutine 正在执行 sum+=i 操作的时候,另一个 goroutine 可能正在执行 b:=sum 操作,这就会导致读取的 num 值是一个过期的值,结果不可预期。
可以使用互斥锁 sync.Mutex 解决以上资源竞争的问题,如下面的代码所示:
func readSum() int {
mutex.Lock()
defer mutex.Unlock()
b:=sum
return b
}
因为 add 和 readSum 函数使用的是同一个 sync.Mutex,所以它们的操作是互斥的。但这个操作因为每次读写共享资源都要加锁,所以性能低下。
分析读写这个特殊场景,有以下几种情况:
- 写的时候不能同时读,因为这个时候读取的话可能读到脏数据(不正确的数据);
- 读的时候不能同时写,因为也可能产生不可预料的结果;
- 读的时候可以同时读,因为数据不会改变,所以不管多少个 goroutine 读都是并发安全的。
可以通过读写锁 sync.RWMutex 来优化这段代码,提升性能。将以上示例改为读写锁,如下所示:
var mutex sync.RWMutex
func readSum() int {
//只获取读锁
mutex.RLock()
defer mutex.RUnlock()
b:=sum
return b
}
对比互斥锁的示例,读写锁的改动有两处:
- 把锁的声明换成读写锁 sync.RWMutex。
- 把函数 readSum 读取数据的代码换成读锁,也就是 RLock 和 RUnlock。
2.3 sync.WaitGroup
有没有办法监听所有协程的执行,一旦全部执行完毕,程序马上退出,这样既可保证所有协程执行完毕,又可以及时退出节省时间,提升性能。channel 可以解决这个问题,不过非常复杂,Go 语言提供了更简洁的解决办法,它就是 sync.WaitGroup。
示例:
func run(){
var wg sync.WaitGroup
//因为要监控110个协程,所以设置计数器为110
wg.Add(110)
for i := 0; i < 100; i++ {
go func() {
//计数器值减1
defer wg.Done()
add(10)
}()
}
for i:=0; i<10;i++ {
go func() {
//计数器值减1
defer wg.Done()
fmt.Println("和为:",readSum())
}()
}
//一直等待,只要计数器值为0
wg.Wait()
}
sync.WaitGroup 的使用比较简单,一共分为三步:
- 声明一个 sync.WaitGroup,然后通过 Add 方法设置计数器的值,需要跟踪多少个协程就设置多少,这里是 110;
- 在每个协程执行完毕时调用 Done 方法,让计数器减 1,告诉 sync.WaitGroup 该协程已经执行完毕;
- 最后调用 Wait 方法一直等待,直到计数器值为 0,也就是所有跟踪的协程都执行完毕。
sync.WaitGroup 适合协调多个协程共同做一件事情的场景,比如下载一个文件,假设使用 10 个协程,每个协程下载文件的 1/10 大小,只有 10 个协程都下载好了整个文件才算是下载好了。
2.4 sync.Once
在实际的工作中可能会有这样的需求:让代码只执行一次,哪怕是在高并发的情况下,比如创建一个单例。
针对这种情形,Go 语言提供了 sync.Once 来保证代码只执行一次,如下所示:
func main() {
doOnce()
}
func doOnce() {
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
//用于等待协程执行完毕
done := make(chan bool)
//启动10个协程执行once.Do(onceBody)
for i := 0; i < 10; i++ {
go func() {
//把要执行的函数(方法)作为参数传给once.Do方法即可
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
}
这是 Go 语言自带的一个示例,虽然启动了 10 个协程来执行 onceBody 函数,但是因为用了 once.Do 方法,所以函数 onceBody 只会被执行一次。也就是说在高并发的情况下,sync.Once 也会保证 onceBody 函数只执行一次。
sync.Once 适用于创建某个对象的单例、只加载一次的资源等只执行一次的场景。
2.5 sync.Cond
sync.Cond 可以让所有协程可以开始执行,关键点在于协程开始的时候是等待的,要等待 sync.Cond 唤醒才能执行。sync.Cond 从字面意思看是条件变量,它具有阻塞协程和唤醒协程的功能,所以可以在满足一定条件的情况下唤醒协程,但条件变量只是它的一种使用场景。
以 10 个人赛跑为例,在这个示例中有一个裁判,裁判要先等这 10 个人准备就绪,然后一声发令枪响,这 10 个人就可以开始跑了,如下所示:
//10个人赛跑,1个裁判发号施令
func race(){
cond :=sync.NewCond(&sync.Mutex{})
var wg sync.WaitGroup
wg.Add(11)
for i:=0;i<10; i++ {
go func(num int) {
defer wg.Done()
fmt.Println(num,"号已经就位")
cond.L.Lock()
cond.Wait()//等待发令枪响
fmt.Println(num,"号开始跑……")
cond.L.Unlock()
}(i)
}
//等待所有goroutine都进入wait状态
time.Sleep(2*time.Second)
go func() {
defer wg.Done()
fmt.Println("裁判已经就位,准备发令枪")
fmt.Println("比赛开始,大家准备跑")
cond.Broadcast()//发令枪响
}()
//防止函数提前返回退出
wg.Wait()
}
具体步骤如下:
- 通过 sync.NewCond 函数生成一个 *sync.Cond,用于阻塞和唤醒协程;
- 启动 10 个协程模拟 10 个人,准备就位后调用 cond.Wait() 方法阻塞当前协程等待发令枪响,这里需要注意的是调用 cond.Wait() 方法时要加锁;
- time.Sleep 用于等待所有人都进入 wait 阻塞状态,这样裁判才能调用 cond.Broadcast() 发号施令;
- 裁判准备完毕后,就可以调用 cond.Broadcast() 通知所有人开始跑了。
sync.Cond 有三个方法,它们分别是:
5. Wait,阻塞当前协程,直到被其他协程调用 Broadcast 或者 Signal 方法唤醒,使用的时候需要加锁,使用 sync.Cond 中的锁即可,也就是 L 字段。
6. Signal,唤醒一个等待时间最长的协程。
7. Broadcast,唤醒所有等待的协程。
注意:在调用 Signal 或者 Broadcast 之前,要确保目标协程处于 Wait 阻塞状态,不然会出现死锁问题。
3. sync.Map
sync.Map 的使用和内置的 map 类型一样,只不过它是并发安全的。sync.Map 支持的方法如下:
- Store:存储一对 key-value 值。
- Load:根据 key 获取对应的 value 值,并且可以判断 key 是否存在。
- LoadOrStore:如果 key 对应的 value 存在,则返回该 value;如果不存在,存储相应的 value 并返回该值。
- Delete:删除一个 key-value 键值对。
- Range:循环迭代 sync.Map,需要在参数中提供一个回调函数。该回调函数在需要继续迭代遍历时,返回 true,终止迭代遍历时,返回 false。
三、Context
1. 协程退出
下例是一个监控狗的监控程序,使用 select+channel 检测协程退出。
func main() {
var wg sync.WaitGroup
wg.Add(1)
stopCh := make(chan bool) //用来停止监控狗
go func() {
defer wg.Done()
watchDog(stopCh,"【监控狗1】")
}()
time.Sleep(5 * time.Second) //先让监控狗监控5秒
stopCh <- true //发停止指令
wg.Wait()
}
func watchDog(stopCh chan bool,name string){
//开启for select循环,一直后台监控
for{
select {
case <-stopCh:
fmt.Println(name,"停止指令已收到,马上停止")
return
default:
fmt.Println(name,"正在监控……")
}
time.Sleep(1*time.Second)
}
}
上例通过 channel 发送指令让监控狗停止,进而达到协程退出的目的。以上示例主要有两处关键点,具体如下:
- 为 watchDog 函数传递的 stopCh 参数,用于接收停止指令;
- 在 main 函数中,声明用于停止的 stopCh,传递给 watchDog 函数,然后通过 stopCh<-true 发送停止指令让协程退出。
2. Context
Context 通过 With 系列函数生成 Context 树,把相关的 Context 关联起来统一进行控制。取消时关联的 Context 都会发出取消信号,使用这些 Context 的协程就可以收到取消信号,然后清理退出。在定义函数的时候,如果想让外部给函数发取消信号,就可以为这个函数增加一个 Context 参数,让外部的调用者可以通过 Context 进行控制,比如下载一个文件超时退出的需求。
2.1 使用示例
通过 select+channel 让协程退出的方式比较优雅,但是如果希望做到同时取消很多个协程, select+channel 的局限性就凸现出来了,即使定义了多个 channel 解决问题,代码逻辑也会非常复杂、难以维护。
使用 Context 重写上面的示例,实现让监控狗停止的功能,如下所示:
func main() {
var wg sync.WaitGroup
wg.Add(1)
ctx,stop:=context.WithCancel(context.Background())
go func() {
defer wg.Done()
watchDog(ctx,"【监控狗1】")
}()
time.Sleep(5 * time.Second) //先让监控狗监控5秒
stop() //发停止指令
wg.Wait()
}
func watchDog(ctx context.Context,name string) {
//开启for select循环,一直后台监控
for {
select {
case <-ctx.Done():
fmt.Println(name,"停止指令已收到,马上停止")
return
default:
fmt.Println(name,"正在监控……")
}
time.Sleep(1 * time.Second)
}
}
相比 select+channel 的方案,Context 方案主要有 4 个改动点。
- watchDog 的 stopCh 参数换成了 ctx,类型为 context.Context。
- 原来的 case <-stopCh 改为 case <-ctx.Done(),用于判断是否停止。
- 使用 context.WithCancel(context.Background()) 函数生成一个可以取消的 Context,用于发送停止指令。这里的 context.Background() 用于生成一个空 Context,一般作为整个 Context 树的根节点。
- 原来的 stopCh <- true 停止指令,改为 context.WithCancel 函数返回的取消函数 stop()。
2.2 定义
Context 是一个接口,它具备手动、定时、超时发出取消信号、传值等功能,主要用于控制多个协程之间的协作,尤其是取消操作。一旦取消指令下达,那么被 Context 跟踪的这些协程都会收到取消信号,就可以做清理和退出操作。
Context 接口只有四个方法,如下所示:
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
- Deadline 方法可以获取设置的截止时间,第一个返回值 deadline 是截止时间,到了这个时间点,Context 会自动发起取消请求,第二个返回值 ok 代表是否设置了截止时间。
- Done 方法返回一个只读的 channel,类型为 struct{}。在协程中,如果该方法返回的 chan 可以读取,则意味着 Context 已经发起了取消信号。通过 Done 方法收到这个信号后,就可以做清理操作,然后退出协程,释放资源。
- Err 方法返回取消的错误原因,即因为什么原因 Context 被取消。
- Value 方法获取该 Context 上绑定的值,是一个键值对,所以要通过一个 key 才可以获取对应的值。
Context 接口的四个方法中最常用的就是 Done 方法,它返回一个只读的 channel,用于接收取消信号。当 Context 取消的时候,会关闭这个只读 channel,也就等于发出了取消信号。
2.3 Context 树
Go 语言提供了函数可以帮助我们生成不同的 Context,通过这些函数可以生成一颗 Context 树,这样 Context 才可以关联起来,父 Context 发出取消信号的时候,子 Context 也会发出,这样就可以控制不同层级的协程退出。
从使用功能上分,有四种实现好的 Context。
- 空 Context:不可取消,没有截止时间,主要用于 Context 树的根节点。
- 可取消的 Context:用于发出取消信号,当取消的时候,它的子 Context 也会取消。
- 可定时取消的 Context:多了一个定时的功能。
- 值 Context:用于存储一个 key-value 键值对。
从下图 Context 的衍生树可以看到,最顶部的是空 Context,它作为整棵 Context 树的根节点,在 Go 语言中,可以通过 context.Background() 获取一个根节点 Context。
有了根节点 Context 后,这颗 Context 树要怎么生成呢?需要使用 Go 语言提供的四个函数。
- WithCancel(parent Context):生成一个可取消的 Context。
- WithDeadline(parent Context, d time.Time):生成一个可定时取消的 Context,参数 d 为定时取消的具体时间。
- WithTimeout(parent Context, timeout time.Duration):生成一个可超时取消的 Context,参数 timeout 用于设置多久后取消
- WithValue(parent Context, key, val interface{}):生成一个可携带 key-value 键值对的 Context。
以上四个生成 Context 的函数中,前三个都属于可取消的 Context,它们是一类函数,最后一个是值 Context,用于存储一个 key-value 键值对。
2.4 使用 Context 取消多个协程
在上例中增加两个监控狗,即增加两个协程,使得一个 Context 同时控制三个协程,一旦 Context 发出取消信号,这三个协程都会取消退出。
wg.Add(3)
go func() {
defer wg.Done()
watchDog(ctx,"【监控狗2】")
}()
go func() {
defer wg.Done()
watchDog(ctx,"【监控狗3】")
}()
以上示例中的 Context 没有子 Context,如果一个 Context 有子 Context,在该 Context 取消时,它的子 Context 也会被取消。
2.5 Context 传值
Context 不仅可以取消,还可以传值,通过这个能力,可以把 Context 存储的值供其他协程使用。
func main() {
wg.Add(4) //记得这里要改为4,原来是3,因为要多启动一个协程
//省略其他无关代码
valCtx:=context.WithValue(ctx,"userId",2)
go func() {
defer wg.Done()
getUser(valCtx)
}()
//省略其他无关代码
}
func getUser(ctx context.Context){
for {
select {
case <-ctx.Done():
fmt.Println("【获取用户】","协程退出")
return
default:
userId:=ctx.Value("userId")
fmt.Println("【获取用户】","用户ID为:",userId)
time.Sleep(1 * time.Second)
}
}
}
上述示例通过 context.WithValue 函数存储一个 userId 为 2 的键值对,就可以在 getUser 函数中通过 ctx.Value(“userId”) 方法把对应的值取出来,达到传值的目的。
2.6 Context 使用原则
在 Go 语言标准库中也使用了 Context ,比如 net/http 中使用 Context 取消网络的请求。
一些 Context 基本使用原则如下:
- Context 不要放在结构体中,要以参数的方式传递。
- Context 作为函数的参数时,要放在第一位,也就是第一个参数。
- 要使用 context.Background 函数生成根节点的 Context,也就是最顶层的 Context。
- Context 传值要传递必须的值,而且要尽可能地少,不要什么都传。
- Context 多协程安全,可以在多个协程中放心使用。
以上原则是规范类的,Go 语言的编译器并不会做这些检查,要靠自己遵守。
3. 实例:通过 Context 实现日志跟踪
要想跟踪一个用户的请求,必须有一个唯一的 ID 来标识这次请求调用了哪些函数、执行了哪些代码,然后通过这个唯一的 ID 把日志信息串联起来。这样就形成了一个日志轨迹,也就实现了用户的跟踪,于是思路就有了。
- 在用户请求的入口点生成 TraceID。
- 通过 context.WithValue 保存 TraceID。
- 然后这个保存着 TraceID 的 Context 就可以作为参数在各个协程或者函数间传递。
- 在需要记录日志的地方,通过 Context 的 Value 方法获取保存的 TraceID,然后把它和其他日志信息记录下来。
- 这样具备同样 TraceID 的日志就可以被串联起来,达到日志跟踪的目的。
以上思路实现的核心是 Context 的传值功能。
四、并发模式
并发模式和设计模式很相似,都是对现实场景的抽象封装,以便提供一个统一的解决方案。但和设计模式不同的是,并发模式更专注于异步和并发。
1. for select 循环模式
for select 循环模式一般和 channel 组合完成任务,代码格式如下:
for { //for无限循环,或者for range循环
select {
//通过一个channel控制
}
}
这是一种 for 循环 +select 多路复用的并发模式,哪个 case 满足就执行哪个,直到满足一定的条件退出 for 循环(比如发送退出信号)。
从具体实现上讲,for select 循环有两种模式:无限循环模式和 for range select 有限循环
1.1 无限循环模式
只有收到终止指令才会退出,如下所示:
for {
select {
case <-done:
return
default:
//执行具体的任务
}
}
这种模式会一直执行 default 语句中的任务,直到 done 这个 channel 被关闭为止。
1.2 for range select 有限循环
一般用于把可以迭代的内容发送到 channel 上,如下所示:
for _,s:=range []int{}{
select {
case <-done:
return
case resultCh <- s:
}
}
这种模式也会有一个 done channel,用于退出当前的 for 循环,而另外一个 resultCh channel 用于接收 for range 循环的值,这些值通过 resultCh 可以传送给其他的调用者。
2. select timeout 模式
假如需要访问服务器获取数据,因为网络的不同响应时间不一样,为保证程序的质量,不可能一直等待网络返回,所以需要设置一个超时时间,这时候就可以使用 select timeout 模式,如下所示:
func main() {
result := make(chan string)
go func() {
//模拟网络访问
time.Sleep(8 * time.Second)
result <- "服务端结果"
}()
select {
case v := <-result:
fmt.Println(v)
case <-time.After(5 * time.Second):
fmt.Println("网络访问超时了")
}
}
select timeout 模式的核心在于通过 time.After 函数设置一个超时时间,防止因为异常造成 select 语句的无限等待。
该场景也可以使用 Context 的 WithTimeout 函数超时取消。如果可以使用,要优先使用。
3. Pipeline 模式
Pipeline 模式也称为流水线模式。从技术上看,每一道工序的输出,就是下一道工序的输入,在工序之间传递的东西就是数据,这种模式就称为流水线模式,而传递的数据称为数据流。
以组装手机为例,假设一条组装手机的流水线有 3 道工序,分别是配件采购、配件组装、打包成品,如图所示:
从以上示意图中可以看到,采购的配件通过 channel 传递给工序 2 进行组装,然后再通过 channel 传递给工序 3 打包成品。相对工序 2 来说,工序 1 是生产者,工序 3 是消费者。相对工序 1 来说,工序 2 是消费者。相对工序 3 来说,工序 2 是生产者。
代码示例:
首先定义一个采购函数 buy,它有一个参数 n,可以设置要采购多少套配件。采购代码的实现逻辑是通过 for 循环产生配件,然后放到 channel 类型的变量 out 里,最后返回这个 out,调用者就可以从 out 中获得配件。
//工序1采购
func buy(n int) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for i := 1; i <= n; i++ {
out <- fmt.Sprint("配件", i)
}
}()
return out
}
组装函数 build 有一个 channel 类型的参数 in,用于接收配件进行组装,组装后的手机放到 channel 类型的变量 out 中返回。
//工序2组装
func build(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for c := range in {
out <- "组装(" + c + ")"
}
}()
return out
}
函数 pack 的代码实现和组装函数 build 基本相同。
//工序3打包
func pack(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for c := range in {
out <- "打包(" + c + ")"
}
}()
return out
}
流水线上的三道工序都完成后,就可以通过一个组织者把三道工序组织在一起,形成一条完整的手机组装流水线,这个组织者可以用 main 函数,如下面的代码所示:
func main() {
coms := buy(10) //采购10套配件
phones := build(coms) //组装10部手机
packs := pack(phones) //打包它们以便售卖
//输出测试,看看效果
for p := range packs {
fmt.Println(p)
}
}
输出结果为:
打包(组装(配件1))
打包(组装(配件2))
打包(组装(配件3))
打包(组装(配件4))
打包(组装(配件5))
打包(组装(配件6))
打包(组装(配件7))
打包(组装(配件8))
打包(组装(配件9))
打包(组装(配件10))
总结:
- 流水线由一道道工序构成,每道工序通过 channel 把数据传递到下一个工序;
- 每道工序一般会对应一个函数,函数里有协程和 channel,协程一般用于处理数据并把它放入 channel 中,整个函数会返回这个 channel 以供下一道工序使用;
- 最终要有一个组织者(示例中的 main 函数)把这些工序串起来,这样就形成了一个完整的流水线,对于数据来说就是数据流。
4. 扇出和扇入模式
上例中为了提升手机产能,组织者决定对工序 2 增加两班人手。人手增加后,整条流水线的示意图如下所示:
从改造后的流水线示意图可以看到,工序 2 共有工序 2-1、工序 2-2、工序 2-3 三班人手,工序 1 采购的配件会被工序 2 的三班人手同时组装,这三班人手组装好的手机会同时传给merge 组件汇聚,然后再传给工序 3 打包成品。在这个流程中,会产生两种模式:扇出和扇入。
- 示意图中红色的部分是扇出,对于工序 1 来说,它同时为工序 2 的三班人手传递数据(采购配件)。以工序 1 为中点,三条传递数据的线发散出去,就像一把打开的扇子一样,所以叫扇出。
- 示意图中蓝色的部分是扇入,对于 merge 组件来说,它同时接收工序 2 三班人手传递的数据(组装的手机)进行汇聚,然后传给工序 3。以 merge 组件为中点,三条传递数据的线汇聚到 merge 组件,也像一把打开的扇子一样,所以叫扇入。
扇出和扇入都像一把打开的扇子,因为数据传递的方向不同,所以叫法也不一样,扇出的数据流向是发散传递出去,是输出流;扇入的数据流向是汇聚进来,是输入流。
根据扇出扇入的原理,增加一个 merge 函数,如下面的代码所示:
//扇入函数(组件),把多个chanel中的数据发送到一个channel中
func merge(ins ...<-chan string) <-chan string {
var wg sync.WaitGroup
out := make(chan string)
//把一个channel中的数据发送到out中
p:=func(in <-chan string) {
defer wg.Done()
for c := range in {
out <- c
}
}
wg.Add(len(ins))
//扇入,需要启动多个goroutine用于处于多个channel中的数据
for _,cs:=range ins{
go p(cs)
}
//等待所有输入的数据ins处理完,再关闭输出out
go func() {
wg.Wait()
close(out)
}()
return out
}
新增的 merge 函数的核心逻辑就是对输入的每个 channel 使用单独的协程处理,并将每个协程处理的结果都发送到变量 out 中,达到扇入的目的。总结起来就是通过多个协程并发,把多个 channel 合成一个。
在整条手机组装流水线中,merge 函数非常小,而且和业务无关,所以可以把它叫作组件。该 merge 组件是可以复用的,流水线中的任何工序需要扇入的时候,都可以使用 merge 组件。
使用新增的 merge 函数,修改流水线的组织者 main 函数,如下所示:
func main() {
coms := buy(100) //采购100套配件
//三班人同时组装100部手机
phones1 := build(coms)
phones2 := build(coms)
phones3 := build(coms)
//汇聚三个channel成一个
phones := merge(phones1,phones2,phones3)
packs := pack(phones) //打包它们以便售卖
//输出测试,看看效果
for p := range packs {
fmt.Println(p)
}
}
这个示例采购了 100 套配件,同时调用三次 build 函数,也就是为工序 2 增加人手,这里是三班人手同时组装配件,然后通过 merge 函数这个可复用的组件将三个 channel 汇聚为一个,然后传给 pack 函数打包。
因为已经有了通用的扇入组件 merge,所以整条流水中任何需要扇出、扇入提高性能的工序,都可以复用 merge 组件做扇入,并且不用做任何修改。
5. Futures 模式
在实际需求中,有大量的任务之间相互独立、没有依赖,为了提高性能,这些独立的任务应该可以并发执行。
Futures 模式可以理解为未来模式,主协程不用等待子协程返回的结果,可以先去做其他事情,等未来需要子协程结果的时候再来取,如果子协程还没有返回结果,就一直等待。
以做火锅的场景为例,代码如下:
//洗菜
func washVegetables() <-chan string {
vegetables := make(chan string)
go func() {
time.Sleep(5 * time.Second)
vegetables <- "洗好的菜"
}()
return vegetables
}
//烧水
func boilWater() <-chan string {
water := make(chan string)
go func() {
time.Sleep(5 * time.Second)
water <- "烧开的水"
}()
return water
}
洗菜和烧水这两个相互独立的任务可以一起做,所以示例中通过开启协程的方式,实现同时做的功能。当任务完成后,结果会通过 channel 返回。
在启动两个子协程同时去洗菜和烧水的时候,主协程就可以去干点其他事情(示例中是眯一会),等睡醒了,要做火锅的时候,就需要洗好的菜和烧好的水这两个结果了。
func main() {
vegetablesCh := washVegetables() //洗菜
waterCh := boilWater() //烧水
fmt.Println("已经安排洗菜和烧水了,我先眯一会")
time.Sleep(2 * time.Second)
fmt.Println("要做火锅了,看看菜和水好了吗")
vegetables := <-vegetablesCh
water := <-waterCh
fmt.Println("准备好了,可以做火锅了:",vegetables,water)
}
Futures 模式下的协程和普通协程最大的区别是可以返回结果,而这个结果会在未来的某个时间点使用。所以在未来获取这个结果的操作必须是一个阻塞的操作,要一直等到获取结果为止。
如果一个大任务可以拆解为一个个独立并发执行的小任务,并且可以通过这些小任务的结果得出最终大任务的结果,就可以使用 Futures 模式。