并发控制
我们考虑这么一种场景,协程在A执行过程中需要创建子协程A1、A2、A3…An,协程创建完子协程后就等待子协程退出。
针对这种场景,Go提供了三种解决方案:
- Channel:使用channel控制子协程
- 优点:实现简单
- WaitGroup:使用信号量机制控制子协程
- 优点:子协程个数动态可调整
- Context:使用上下文控制子协程
- 优点:方便控制子协程派生出的孙子协程的控制
1. Channel
channel一般用于协程之间的通信,不过channel也可以用于并发控制。比如主协程启动N个子协程,主协程等待所有子协程退出后再继续后续流程。
场景示例:
下面的程序通过创建N个channel来管理N个协程,每个协程都有一个channel用于与父协程通信,父协程创建完所有协程后等待所有协程结束。
package main
import(
"time"
"fmt"
)
func Process(ch chan int) {
// Do some work...
time.Sleep(time.Second)
ch <- 1 //在管道中写入一个元素表示当前协程已结束
}
func main(){
// 创建一个10个元素的切片,元素类型为channel
channels := make([]chan int,10)
for i:=0;i<10;i++{
channels[i] = make(chan int)//在切片中放入一个channel
go Process(channels[i])//启动协程,传入一个管道用于通信
}
for i,ch := range channels{//遍历子协程结束
<-ch
fmt.Println("Routine",i," quit!")
}
}
小结
优点:实现简单
缺点:需要创建与协程数量相等的channel,且对于子孙协程的控制不方便
2.WaitGroup
WaitGroup可以理解为Wait-Goroutine-Group,即等待一组goroutine结束。比如某个goroutine需要等待其他几个goroutine全部完成,那么使用WaitGroup可以轻松实现。
场景示例:
func main(){
var wg sync.WaitGroup
wg.Add(2)//设置计数器,数值即goroutine的个数
go func(){
//Do some work
time.Sleep(1*time.Second)
fmt.Println("Goroutine 1 finished!")
wg.Done() //goroutine 执行结束后将计数器减1
}()
go func(){
//Do some work
time.Sleep(1*time.Second)
fmt.Println("Goroutine 2 finished!")
wg.Done() //goroutine 执行结束后将计数器减1
}()
wg.Wait() // 主goroutine阻塞等待计数器变为0
fmt.Println("All Goroutine finished!")
}
2.1 基础知识
信号量是Unix系统提供的一种共享保护资源的机制,用于防止多个线程同时访问某个资源
- 当信号量>0时,表示资源可用,获取信号量时系统自动将信号量减1
- 当信号量==0时,表示资源不可用,获取信号量时,当前线程会进入睡眠,当信号量为正时被唤醒
2.2 WaitGroup
2.2.1 数据结构
type WaitGroup struct{
state1 [3]uint32
}
state1 是一个长度为3的数组,包含state和一个信号量,而state实际上又分为两个计数器。
- counter:当前还未执行结束的goroutine的计数器
- waiter count:等待goroutine-group结束的goroutine数量,即又多少个等候者
- semaphore:信号量
WaitGroup对外暴露三个方法
- Add(delta int):将delta值加到counter中
- Wait():waiter递增1,并阻塞等待信号量semaphore
- Done():counter递减1,按照waiter数值释放相应次数信号量
2.2.2 Add(delta int)
Add方法做了两件事:
- 把delta值累加到counter中,因为delta可以为负值,也就是锁counter有可能变成0或负值
- 当counter变为0时,根据waiter数值释放等量信号量,把等待的goroutine全部唤醒,如果counter变为负值,则触发panic
2.2.3 Wait()
Wait方法做了两件事:
- 累加waiter
- 阻塞等待信号量
使用了CAS算法保证了多个goroutine同时执行Wait方法是也能正确累加waiter
2.2.4 Done()
Done方法只做一件事,即把counter减1,Done方法实际上调用的是Add(-1)。
3.Context
Go语言的context是应用开发常用的并发控制技术,对于派生goroutine有更强的控制力,可以控制多级的goroutine。
3.1 context的接口定义
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
基础的context接口只定义了4个方法
- Deadline:返回一个deadline和标识是否已设置deadline的bool值,如果没有设置deadline,则ok=false,此时deadline为一个初始值的time.Time值
- Done:该方法返回一个用来探测Context是否取消的channel,当Context取消时会自动将该channel关闭
- 对于不支持取消的Context,该方法可能会返回nil
- Err:返回context关闭的原因,若context还未关闭,返回nil
- Value:
- 适用于在协程中传递信息
- 根据key值查询map中的value
3.2 空context
context包中定义了一个空的context,名为emptyCtx,用于context的根节点。
同时定义了一个公用的emptyCtx全局变量,名为background,可以使用context.Background()获取
context包中提供了四个方法创建不同类型的context,使用这四个方法时如果没有父节点context,需要传入background作为其父节点。
- WithCancel()
- WithDeadline()
- WithTimeout()
- WithValue()
3.3 cancelCtx
type cancelCtx struct{
Context
mu sync.Mutex
done chan struct{}//lazily初始化
//记录由此context派生出的所有child
//此context被取消时会将其child全部cancel
children map[canceler]struct{}
err error
}
2.3.1 Done方法的实现
Done方法只需要返回一个channel接即可
源码如下:
func (c *cancelCtx) Done() <-chan struct{}{
c.mu.Lock()
if c.done == nil{
c.done = make(chan struct{})
}
d := c.done
c.mu.Lock()
return d
}
2.3.2 Err方法的实现
Err()只需要返回一个error告知context被关闭的原理。
cancelCtx.err默认是nil,在context被”cancel“时指定一个error变量:
var Canceled = errors.New("context canceled")
func(c *cancelCtx) Err() error{
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}
2.3.3 cancel方法的实现
cancel方法是理解cancelCtx的关键,其作用是关闭自己和其后代。
伪代码如下:
func(c *cancelCtx) cancel(removeFromParent bool,err error){
c.mu.Lock()
c.err = err //设置一个error,说明关闭原因
close(c.done) //将channel关闭,依次通知派生的context
for child := range c.children{//遍历child,调用cancel方法
child.cancel(false,err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent{
//正常情况下,需要将自己从parent中删除
removeChild(c.Context, c)
}
}
2.3.4 WithCancel方法的实现
WithCancel()方法做了三件事
- 初始化一个cancelCtx实例
- 将cancelCtx实例添加到其父节点的children中(如果父节点也可以被cancel)
- 返回cancelCtx实例和cancel方法
其实现源码如下:
func WithCancel(parent Context)(ctx Context,cancel CancelFunc){
c:= newCancelCtx(parent)
//将自身添加到支持cancel的祖先节点中,如果都不支持,则启动一个协程等待父节点结束,然后把当前context结束。
propagateCancel(parent, &c)
return &c, func(){c.cancel(true, Canceled)}
}
2.3.5 使用案例
func HandelRequest(ctx context.Context) {
go WriteRedis(ctx)
go WriteDatabase(ctx)
for {
select {
case <-ctx.Done():
fmt.Println("HandelRequest Done.")
return
default:
fmt.Println("HandelRequest running")
time.Sleep(2 * time.Second)
}
}
}
func WriteRedis(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("WriteRedis Done.")
return
default:
fmt.Println("WriteRedis running")
time.Sleep(2 * time.Second)
}
}
}
func WriteDatabase(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("WriteDatabase Done.")
return
default:
fmt.Println("WriteDatabase running")
time.Sleep(2 * time.Second)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
go HandelRequest(ctx)
time.Sleep(5 * time.Second)
fmt.Println("It's time to stop all sub goroutines")
cancel()
// Just for test whether sub goroutines exit or not
time.Sleep(5 * time.Second)
}
main协程创建context,并把context在各个子协程间传递,main协程在适当的时机可以"cancel"所有子协程。
3.4 timerCtx
源码如下:
type timerCtx struct{
cancelCtx
timer *time.Timer
deadline time.Time
}
timerCtx在cancel的基础上增加了deadline,用于标示自动cancel的最终时间,而timer就上触发自动cancel的定时器。
在此基础上衍生出了WithDeadline()和WithTimeout()
- deadline:指定最后期限
- timeout:指定最长存活时间
3.4.1 Deadline方法的实现
该方法仅仅返回timerCtx.deadline而已。
3.4.2 cancel方法的实现
cancel()方法基本继承了cancelCtx,只需要额外把timer关闭
timerCtx被关闭后,timerCtx.cancelCtx.err将存储关闭原因:
- 如果在deadline到来之前手动关闭,则关闭原因与cancelCtx显示一致
- 如果是到期自动关闭,原因为context deadline exceeded
3.4.3 WithDeadline方法的实现
- 初始化一个timerCtx实例
- 将timerCtx实例添加到其父节点的children中(如果父节点可以被“cancel”)
- 启动定时器,定时器到期后会自动"cancel"本context
- 返回timerCtx实例和cancel方法
3.4.4 WithTimeout方法的实现
WithTimeout()实际上调用了WithDeadline,两者的实现原理一致。
func WithTimeout(parent Context,timeout time.Duration)(Context,CancelFunc){
return WithDeadline(parent,time.Now().Add(timeout))
}
3.4.5 典型使用案例
func HandelRequest(ctx context.Context) {
go WriteRedis(ctx)
go WriteDatabase(ctx)
for {
select {
case <-ctx.Done():
fmt.Println("HandelRequest Done.")
return
default:
fmt.Println("HandelRequest running")
time.Sleep(2 * time.Second)
}
}
}
func WriteRedis(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("WriteRedis Done.")
return
default:
fmt.Println("WriteRedis running")
time.Sleep(2 * time.Second)
}
}
}
func WriteDatabase(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("WriteDatabase Done.")
return
default:
fmt.Println("WriteDatabase running")
time.Sleep(2 * time.Second)
}
}
}
func main() {
ctx, _ := context.WithTimeout(context.Background(),5*time.Second)
go HandelRequest(ctx)
time.Sleep(10 * time.Second)
}
3.5 valueCtx
源码如下:
type valueCtx struct{
Context
key,value interface{}
}
valueCtx只是在Context基础上增加了一个key-value对,用于在各级协程间传递一些数据,由于valueCtx不需要cancel,也不需要deadline,那么只需要实现Value接口即可。
3.5.1 Value方法的实现
当前context查询不到key时,会向父节点查找,如果查询不到最终返回interface{}
func(c *valueCtx) Value(key interface{}) interface{}{
if c.key == key{
return c.val
}
return c.Context.Value(key)
}
3.5.2 WithValue方法的实现
伪代码如下:
func WithValue(parent Context,key,val interface{})Context{
if key == nil {
panic("nil key")
}
return &valueCtx{parent,key,val}
}
3.5.3 典型使用案例
该案例中main()通过WithValue()方法获得一个context,需要指定一个父context、key和value。contextkey用来在父子协程中传递信息。由于父节点是一个Timeout类型的,当父节点结束后,子节点也会结束。
func HandelRequest(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("HandelRequest Done.")
return
default:
fmt.Println("HandelRequest running,parameter:",ctx.Value("parameter"))
time.Sleep(2 * time.Second)
}
}
}
func main() {
cancelCtx, _ := context.WithTimeout(context.Background(), 10*time.Second)
ctx := context.WithValue(cancelCtx, "parameter", "1")
go HandelRequest(ctx)
time.Sleep(15 * time.Second)
}
4. 小结
三种context实例可以互为父节点,从而可以组合成不同的应用形式。