当前go的各种源码中应该都可以看到context的使用,Context是golang 1.7的引入的核心结构,本质是为了处理go的并发控制问题。本文主要带大家深入理解context如何使用,为什么需要context和context设计原理。
并发控制问题
先来看下并发控制到底有什么问题要解决,立马能想到什么?如下
1.多个任务并行运行起来
2.控制任务的停止
3.控制任务的超时
多任务并行执行
首先看多个并行任务如何跑起来,经典实现利用WaitGroup,如下go中多个任务跑起来很简单,func+go即可快速定义协程任务,这里利用WaitGroup控制所有任务完成后退出主程序。
// 多个任务并行控制,等待所有任务完成
func TestTaskControl(t *testing.T) {
taskNum := 3
wg := sync.WaitGroup{}
wg.Add(taskNum)
for i := 0; i < taskNum; i++ {
go func(taskNo int) {
t.Logf("Task %d run\n", taskNo)
wg.Done()
}(i)
}
wg.Wait()
}
多任务取消/停止
那么问题来了——如何协程任务运行过程中,取消任务执行呢?在context出现之前,我们一般用两种方法,如下可以对比看
1.数据通道关闭
一般多任务执行时,我们通过channel分发任务,当检测到channel关闭时认为是收到了任务退出信号。由于channel退出是全局广播,所有下游任务都可以接到通知。如下,关闭data时协程任务会退出,简单的任务取消/停止可以使用这种方式。
func TestCancelControl(t *testing.T) {
data := make(chan int, 10)
go func(data chan int) {
for {
select {
case val, ok := <-data:
if !ok {
t.Logf("Channel closed !!!")
return
}
t.Logf("Revice data %d\n", val)
}
}
}(data)
go func() {
data <- 1
time.Sleep(1 * time.Second)
data <- 2
close(data)
}()
time.Sleep(10 * time.Second)
}
2.单独退出通道
和数据通道关闭类似,不同的是和传输数据不共用一个channel,对于复杂任务公用数据channel会带来复杂和不可控,不如单独引入一个退出channel专门接受退出消息,甚至可以复用这个channel做更多的任务控制动作。
如下,引入exit来执行退出监听,一旦exit channel关闭,多个协程任务都退出。
在引入context之前,主流的任务取消/停止就是这样处理,不是特别复杂的多任务控制目前很多地方也保留了这种方式。
func TestMixControl(t *testing.T) {
data := make(chan int, 10)
defer close(data)
exit := make(chan struct{})
taskNum := 3
wg := sync.WaitGroup{}
wg.Add(taskNum)
for i := 0; i < taskNum; i++ {
go func(taskNo int, data chan int, exit chan struct{}) {
defer wg.Done()
for {
select {
case val, ok := <-data:
if !ok {
t.Logf("Task %d channel closed !!!", taskNo)
return
}
t.Logf("Task %d revice data %d\n", taskNo, val)
case <-exit:
t.Logf("Task %d revice exit signal!\n", taskNo)
return
}
}
}(i, data, exit)
}
go func() {
data <- 1
data <- 2
data <- 3
time.Sleep(1 * time.Second)
data <- 4
data <- 5
data <- 6
close(exit)
}()
wg.Wait()
}
多任务超时控制
进一步,再思考一个问题,还是和前述逻辑一样,但是每个任务需要考虑超时,该如何实现呢?如下,和引入exit通道类似,只是引入一个超时time.After通知即可处理任务超时场景。
// 执行任务超时后退出
func TestTimeoutControl(t *testing.T) {
data := make(chan int, 10)
go func(data chan int) {
for {
select {
case val, ok := <-data:
if !ok {
t.Logf("Channel closed——revice exit signal !!!")
return
}
t.Logf("Revice data %d\n", val)
case <-time.After(2 * time.Second):
t.Log("Task time out, exit!\n")
return
}
}
}(data)
go func() {
data <- 1
time.Sleep(3 * time.Second)
data <- 2
}()
time.Sleep(10 * time.Second)
}
那新问题来了,既然channel可以处理这些问题,那么为什么还需要引入context呢?思考这个问题:如下是多个任务执行,每个任务一个协程,现在考虑如下几个目标
1.支持多级嵌套,父任务停止后,子任务自动停止
2.控制停止顺序,先停EFG 再停BCD 最后停A
目标1还好说,目标2好像就没那么灵活了,正式讨论context如何解决这些问题前,我们先看下常规context的使用
context定义和使用
context源码结构定义如下
type Context interface {
// 当 context 被取消或者到了 deadline,返回一个被关闭的 channel
Done() <-chan struct{}
// 在 channel Done 关闭后,返回 context 取消原因
Err() error
// 返回 context 是否会被取消以及自动取消时间(即 deadline)
Deadline() (deadline time.Time, ok bool)
// 获取 key 对应的 value
Value(key interface{}) interface{}
}
使用也很简单——定义好context时指定超时控制或者取消方法,在协程任务中监听ctx.Done通道,一旦超时或者取消则响应退出即可。如下
// 1.先定义context
ctx, cancel := context.WithCancel(context.Background()) // 取消/停止控制
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) // 取消/停止控制 + 超时控制
// 2.执行任务
go Stream(ctx, xxx)
// 3.任务中监听ctx.Done()
func Stream(ctx context.Context, out chan<- Value) error {
for {
// 具体任务
v, err := DoSomething(ctx)
if err != nil {
return err
}
// 检查完成通知
select {
case <-ctx.Done():
return ctx.Err()
case out <- v:
}
}
}
// 4.外部控制退出
cancel()
可以看到,这里使用context统一了之前任务停止和超时控制,
注意这里,ctx.Background 通常用在 main 函数中,作为所有 context 的根节点。
ctx.TODO 通常用在并不知道传递什么 context的情形。例如,调用一个需要传递 context 参数的函数,你手头并没有其他 context 可以传递,这时就可以传递 todo。
context多任务控制
context多任务取消/停止
先类比,之前的任务,实现如下
func TestContextCancelControl(t *testing.T) {
data := make(chan int, 10)
defer close(data)
ctx, cancel := context.WithCancel(context.Background())
taskNum := 3
wg := sync.WaitGroup{}
wg.Add(taskNum)
for i := 0; i < taskNum; i++ {
go func(taskNo int, data chan int, ctx context.Context) {
defer wg.Done()
for {
select {
case val, ok := <-data:
if !ok {
t.Logf("Task %d channel closed !!!", taskNo)
return
}
t.Logf("Task %d revice data %d\n", taskNo, val)
case <-ctx.Done():
t.Logf("Task %d revice exit signal!\n", taskNo)
return
}
}
}(i, data, ctx)
}
go func() {
data <- 1
data <- 2
data <- 3
time.Sleep(1 * time.Second)
data <- 4
data <- 5
data <- 6
cancel()
}()
wg.Wait()
}
context多任务超时
和上述任务一个套路,只是使用WithTimeout定义context,如下
func TestContextTimeoutControl(t *testing.T) {
data := make(chan int, 10)
defer close(data)
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
taskNum := 3
wg := sync.WaitGroup{}
wg.Add(taskNum)
for i := 0; i < taskNum; i++ {
go func(taskNo int, data chan int, ctx context.Context) {
defer wg.Done()
for {
select {
case val, ok := <-data:
if !ok {
t.Logf("Task %d channel closed !!!", taskNo)
return
}
t.Logf("Task %d revice data %d\n", taskNo, val)
case <-ctx.Done():
t.Logf("Task %d revice exit signal!\n", taskNo)
return
}
}
}(i, data, ctx)
}
go func() {
data <- 1
data <- 2
data <- 3
time.Sleep(1 * time.Second)
data <- 4
data <- 5
data <- 6
}()
wg.Wait()
}
context复杂多任务取消
这里看,我们之前提出的问题,先实现协程任务链如下
func TestContextMixCancelControl(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
type FUNC func(ctx context.Context)
runFunc := func(ctx context.Context, fname string, f FUNC) {
t.Logf("Task %s start!\n", fname)
f(ctx)
for {
select {
case <-ctx.Done():
t.Logf("Task %s revice exit signal!\n", fname)
return
}
}
}
go runFunc(ctx, "A", func(ctx context.Context) {
go runFunc(ctx, "B", func(ctx context.Context) {
go runFunc(ctx, "C", func(ctx context.Context) {
go runFunc(ctx, "D", func(ctx context.Context) {})
})
})
go runFunc(ctx, "E", func(ctx context.Context) {
go runFunc(ctx, "F", func(ctx context.Context) {
go runFunc(ctx, "G", func(ctx context.Context) {})
})
})
})
go func() {
time.Sleep(3 * time.Second)
cancel()
}()
time.Sleep(10 * time.Second)
}
执行,可以看到如下,任务执行是按照协程任务链顺序,但是退出是无序的,因为他们都等待同一个ctx.Done通道关系消息,响应是无序的。
context_test.go:141: Task A start! context_test.go:141: Task E start! context_test.go:141: Task F start! context_test.go:141: Task G start! context_test.go:141: Task B start! context_test.go:141: Task C start! context_test.go:141: Task D start! context_test.go:148: Task A revice exit signal! context_test.go:148: Task D revice exit signal! context_test.go:148: Task F revice exit signal! context_test.go:148: Task E revice exit signal! context_test.go:148: Task C revice exit signal! context_test.go:148: Task B revice exit signal! context_test.go:148: Task G revice exit signal!
那么,如何准确控制目标2——“控制停止顺序,先停EFG 再停BCD 最后停A”的退出执行呢,如下操作
func TestContextMixCancelControl2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
type FUNC func(ctx context.Context)
runFunc := func(ctx context.Context, fname string, f FUNC) {
t.Logf("Task %s start!\n", fname)
f(ctx)
for {
select {
case <-ctx.Done():
t.Logf("Task %s revice exit signal!\n", fname)
return
}
}
}
ctxb, cancelb := context.WithCancel(context.Background())
ctxe, cancele := context.WithCancel(context.Background())
go runFunc(ctx, "A", func(ctx context.Context) {
go runFunc(ctxb, "B", func(ctx context.Context) {
go runFunc(ctx, "C", func(ctx context.Context) {
go runFunc(ctx, "D", func(ctx context.Context) {})
})
})
go runFunc(ctxe, "E", func(ctx context.Context) {
go runFunc(ctx, "F", func(ctx context.Context) {
go runFunc(ctx, "G", func(ctx context.Context) {})
})
})
})
go func() {
time.Sleep(3 * time.Second)
cancele()
time.Sleep(3 * time.Second)
cancelb()
}()
time.Sleep(10 * time.Second)
}
然后执行
context_test.go:184: Task A start! context_test.go:184: Task E start! context_test.go:184: Task F start! context_test.go:184: Task G start! context_test.go:184: Task B start! context_test.go:184: Task C start! context_test.go:184: Task D start! context_test.go:191: Task E revice exit signal! context_test.go:191: Task G revice exit signal! context_test.go:191: Task F revice exit signal! context_test.go:191: Task B revice exit signal! context_test.go:191: Task D revice exit signal! context_test.go:191: Task C revice exit signal!
可以看到,通过增加Cancel点,我们可以精准的控制任务的退出,这就是context的复杂任务控制能力。
context原理简述
所以可以看到,引入context的意义在于
1.统一的任务执行/取消/超时控制模型
2.增强的任务取消/停止控制
除此之外,context还支持传入一些简单kv,用于任务参数定义,如下,不赘述
func TestContextValueControl(t *testing.T) {
ctx, cancel := context.WithCancel(context.WithValue(context.Background(), "testkey", "testvalue"))
taskNum := 1
wg := sync.WaitGroup{}
wg.Add(taskNum)
go func(ctx context.Context) {
defer wg.Done()
for {
select {
case <-ctx.Done():
t.Logf("Task revice exit signal, ctx value:%s!\n", ctx.Value("testkey"))
return
}
}
}(ctx)
go func() {
time.Sleep(3 * time.Second)
cancel()
}()
wg.Wait()
}
其实,写到这里,对比channel实现任务和context任务控制,我们也能自然看到context的基础原理,如下
简单来说,就如下几句话
1.创建context时创建一个退出通知通道,同时维持一个协程任务的关系树,如下示意图
树的根节点是backgroud和todo节点,也就是emptyCtx
background = new(emptyCtx)
todo = new(emptyCtx)
树的子节点是cancelCtx,每个子节点包括父节点指向Context和子节点map-children
type cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done atomic.Value // of chan struct{}, created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
2.执行WithCancel/WithTimeout时,更新协程任务的关系树
- 如果父节点已经退出,则遍历子节点退出
- 如果父节点没退出,创建监听协程,一旦父节点收到ctx.Done,子节点cancel
参考函数propagateCancel
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
...
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
// 遍历子节点退出
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
atomic.AddInt32(&goroutines, +1)
go func() {
select {
// 创建监听协程,一旦父节点收到ctx.Done,子节点cancel
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
3.执行cancel/timeout参数时,通知当前cancel对应的根任务和子任务退出
此时当前cancelCtx任务从整颗树上分离,父节点再退出时不会通知已经退出的树节点,参考cancel函数
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
...
// 当前cancel对应的根任务和子任务退出
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
// 当前cancelCtx任务从整颗树上分离
if removeFromParent {
removeChild(c.Context, c)
}
}
参考
https://zhuanlan.zhihu.com/p/68792989
https://zhuanlan.zhihu.com/p/110085652
演示代码地址 https://gitee.com/wenzhou1219/go-in-prod/tree/master/context