10分钟完全理解golang context

news2025/1/19 17:09:42

当前go的各种源码中应该都可以看到context的使用,Context是golang 1.7的引入的核心结构,本质是为了处理go的并发控制问题。本文主要带大家深入理解context如何使用,为什么需要context和context设计原理。

并发控制问题

先来看下并发控制到底有什么问题要解决,立马能想到什么?如下
1.多个任务并行运行起来
2.控制任务的停止
3.控制任务的超时

多任务并行执行

首先看多个并行任务如何跑起来,经典实现利用WaitGroup,如下go中多个任务跑起来很简单,func+go即可快速定义协程任务,这里利用WaitGroup控制所有任务完成后退出主程序。

// 多个任务并行控制,等待所有任务完成
func TestTaskControl(t *testing.T) {

    taskNum := 3

    wg := sync.WaitGroup{}
    wg.Add(taskNum)

    for i := 0; i < taskNum; i++ {
        go func(taskNo int) {
            t.Logf("Task %d run\n", taskNo)

            wg.Done()
        }(i)
    }

    wg.Wait()
}

多任务取消/停止

那么问题来了——如何协程任务运行过程中,取消任务执行呢?在context出现之前,我们一般用两种方法,如下可以对比看

1.数据通道关闭

一般多任务执行时,我们通过channel分发任务,当检测到channel关闭时认为是收到了任务退出信号。由于channel退出是全局广播,所有下游任务都可以接到通知。如下,关闭data时协程任务会退出,简单的任务取消/停止可以使用这种方式。

func TestCancelControl(t *testing.T) {
	data := make(chan int, 10)

	go func(data chan int) {
		for {
			select {
			case val, ok := <-data:
				if !ok {
					t.Logf("Channel closed !!!")
					return
				}

				t.Logf("Revice data %d\n", val)
			}
		}
	}(data)

	go func() {
		data <- 1
		time.Sleep(1 * time.Second)
		data <- 2

		close(data)
	}()

	time.Sleep(10 * time.Second)
}

2.单独退出通道

和数据通道关闭类似,不同的是和传输数据不共用一个channel,对于复杂任务公用数据channel会带来复杂和不可控,不如单独引入一个退出channel专门接受退出消息,甚至可以复用这个channel做更多的任务控制动作。
如下,引入exit来执行退出监听,一旦exit channel关闭,多个协程任务都退出。
在引入context之前,主流的任务取消/停止就是这样处理,不是特别复杂的多任务控制目前很多地方也保留了这种方式。

func TestMixControl(t *testing.T) {
	data := make(chan int, 10)
	defer close(data)
	exit := make(chan struct{})

	taskNum := 3

	wg := sync.WaitGroup{}
	wg.Add(taskNum)

	for i := 0; i < taskNum; i++ {
		go func(taskNo int, data chan int, exit chan struct{}) {
			defer wg.Done()

			for {
				select {
				case val, ok := <-data:
					if !ok {
						t.Logf("Task %d channel closed !!!", taskNo)
						return
					}

					t.Logf("Task %d  revice data %d\n", taskNo, val)

				case <-exit:
					t.Logf("Task %d  revice exit signal!\n", taskNo)
					return
				}
			}

		}(i, data, exit)
	}

	go func() {
		data <- 1
		data <- 2
		data <- 3
		time.Sleep(1 * time.Second)
		data <- 4
		data <- 5
		data <- 6

		close(exit)
	}()

	wg.Wait()
}

多任务超时控制

进一步,再思考一个问题,还是和前述逻辑一样,但是每个任务需要考虑超时,该如何实现呢?如下,和引入exit通道类似,只是引入一个超时time.After通知即可处理任务超时场景。

// 执行任务超时后退出
func TestTimeoutControl(t *testing.T) {
	data := make(chan int, 10)

	go func(data chan int) {
		for {
			select {
			case val, ok := <-data:
				if !ok {
					t.Logf("Channel closed——revice exit signal !!!")
					return
				}

				t.Logf("Revice data %d\n", val)

			case <-time.After(2 * time.Second):
				t.Log("Task time out, exit!\n")
				return
			}
		}
	}(data)

	go func() {
		data <- 1
		time.Sleep(3 * time.Second)
		data <- 2
	}()

	time.Sleep(10 * time.Second)
}

那新问题来了,既然channel可以处理这些问题,那么为什么还需要引入context呢?思考这个问题:如下是多个任务执行,每个任务一个协程,现在考虑如下几个目标
1.支持多级嵌套,父任务停止后,子任务自动停止
2.控制停止顺序,先停EFG 再停BCD 最后停A
image.png
目标1还好说,目标2好像就没那么灵活了,正式讨论context如何解决这些问题前,我们先看下常规context的使用

context定义和使用

context源码结构定义如下

type Context interface {
    // 当 context 被取消或者到了 deadline,返回一个被关闭的 channel
    Done() <-chan struct{}

    // 在 channel Done 关闭后,返回 context 取消原因
    Err() error

    // 返回 context 是否会被取消以及自动取消时间(即 deadline)
    Deadline() (deadline time.Time, ok bool)

    // 获取 key 对应的 value
    Value(key interface{}) interface{}
}

使用也很简单——定义好context时指定超时控制或者取消方法,在协程任务中监听ctx.Done通道,一旦超时或者取消则响应退出即可。如下

// 1.先定义context
ctx, cancel := context.WithCancel(context.Background())  // 取消/停止控制
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)  // 取消/停止控制 + 超时控制

// 2.执行任务
go Stream(ctx, xxx)

// 3.任务中监听ctx.Done()
func Stream(ctx context.Context, out chan<- Value) error {
	for {
		// 具体任务
		v, err := DoSomething(ctx)
		if err != nil {
			return err
		}
		
		// 检查完成通知
		select {
		case <-ctx.Done():
			return ctx.Err()
		case out <- v:
		}
	}
 }

 // 4.外部控制退出
 cancel()

可以看到,这里使用context统一了之前任务停止和超时控制,
注意这里,ctx.Background 通常用在 main 函数中,作为所有 context 的根节点。
ctx.TODO 通常用在并不知道传递什么 context的情形。例如,调用一个需要传递 context 参数的函数,你手头并没有其他 context 可以传递,这时就可以传递 todo。

context多任务控制

context多任务取消/停止

先类比,之前的任务,实现如下

func TestContextCancelControl(t *testing.T) {
	data := make(chan int, 10)
	defer close(data)

	ctx, cancel := context.WithCancel(context.Background())

	taskNum := 3

	wg := sync.WaitGroup{}
	wg.Add(taskNum)

	for i := 0; i < taskNum; i++ {
		go func(taskNo int, data chan int, ctx context.Context) {
			defer wg.Done()

			for {
				select {
				case val, ok := <-data:
					if !ok {
						t.Logf("Task %d channel closed !!!", taskNo)
						return
					}

					t.Logf("Task %d  revice data %d\n", taskNo, val)

				case <-ctx.Done():
					t.Logf("Task %d  revice exit signal!\n", taskNo)
					return
				}
			}

		}(i, data, ctx)
	}

	go func() {
		data <- 1
		data <- 2
		data <- 3
		time.Sleep(1 * time.Second)
		data <- 4
		data <- 5
		data <- 6

		cancel()
	}()

	wg.Wait()
}

context多任务超时

和上述任务一个套路,只是使用WithTimeout定义context,如下

func TestContextTimeoutControl(t *testing.T) {
	data := make(chan int, 10)
	defer close(data)

	ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)

	taskNum := 3

	wg := sync.WaitGroup{}
	wg.Add(taskNum)

	for i := 0; i < taskNum; i++ {
		go func(taskNo int, data chan int, ctx context.Context) {
			defer wg.Done()

			for {
				select {
				case val, ok := <-data:
					if !ok {
						t.Logf("Task %d channel closed !!!", taskNo)
						return
					}

					t.Logf("Task %d  revice data %d\n", taskNo, val)

				case <-ctx.Done():
					t.Logf("Task %d  revice exit signal!\n", taskNo)
					return
				}
			}

		}(i, data, ctx)
	}

	go func() {
		data <- 1
		data <- 2
		data <- 3
		time.Sleep(1 * time.Second)
		data <- 4
		data <- 5
		data <- 6
	}()

	wg.Wait()
}

context复杂多任务取消

这里看,我们之前提出的问题,先实现协程任务链如下

func TestContextMixCancelControl(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())

	type FUNC func(ctx context.Context)
	runFunc := func(ctx context.Context, fname string, f FUNC) {

		t.Logf("Task %s start!\n", fname)
		f(ctx)

		for {
			select {

			case <-ctx.Done():
				t.Logf("Task %s  revice exit signal!\n", fname)
				return
			}
		}

	}

	go runFunc(ctx, "A", func(ctx context.Context) {
		go runFunc(ctx, "B", func(ctx context.Context) {
			go runFunc(ctx, "C", func(ctx context.Context) {
				go runFunc(ctx, "D", func(ctx context.Context) {})
			})
		})

		go runFunc(ctx, "E", func(ctx context.Context) {
			go runFunc(ctx, "F", func(ctx context.Context) {
				go runFunc(ctx, "G", func(ctx context.Context) {})
			})
		})
	})

	go func() {
		time.Sleep(3 * time.Second)
		cancel()
	}()

	time.Sleep(10 * time.Second)
}

执行,可以看到如下,任务执行是按照协程任务链顺序,但是退出是无序的,因为他们都等待同一个ctx.Done通道关系消息,响应是无序的。

context_test.go:141: Task A start!
context_test.go:141: Task E start!
context_test.go:141: Task F start!
context_test.go:141: Task G start!
context_test.go:141: Task B start!
context_test.go:141: Task C start!
context_test.go:141: Task D start!
context_test.go:148: Task A  revice exit signal!
context_test.go:148: Task D  revice exit signal!
context_test.go:148: Task F  revice exit signal!
context_test.go:148: Task E  revice exit signal!
context_test.go:148: Task C  revice exit signal!
context_test.go:148: Task B  revice exit signal!
context_test.go:148: Task G  revice exit signal!

那么,如何准确控制目标2——“控制停止顺序,先停EFG 再停BCD 最后停A”的退出执行呢,如下操作

func TestContextMixCancelControl2(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	type FUNC func(ctx context.Context)
	runFunc := func(ctx context.Context, fname string, f FUNC) {

		t.Logf("Task %s start!\n", fname)
		f(ctx)

		for {
			select {

			case <-ctx.Done():
				t.Logf("Task %s  revice exit signal!\n", fname)
				return
			}
		}

	}

	ctxb, cancelb := context.WithCancel(context.Background())
	ctxe, cancele := context.WithCancel(context.Background())

	go runFunc(ctx, "A", func(ctx context.Context) {

		go runFunc(ctxb, "B", func(ctx context.Context) {
			go runFunc(ctx, "C", func(ctx context.Context) {
				go runFunc(ctx, "D", func(ctx context.Context) {})
			})
		})

		go runFunc(ctxe, "E", func(ctx context.Context) {
			go runFunc(ctx, "F", func(ctx context.Context) {
				go runFunc(ctx, "G", func(ctx context.Context) {})
			})
		})
	})

	go func() {
		time.Sleep(3 * time.Second)
		cancele()
		time.Sleep(3 * time.Second)
		cancelb()
	}()

	time.Sleep(10 * time.Second)
}

然后执行

context_test.go:184: Task A start!
context_test.go:184: Task E start!
context_test.go:184: Task F start!
context_test.go:184: Task G start!
context_test.go:184: Task B start!
context_test.go:184: Task C start!
context_test.go:184: Task D start!
context_test.go:191: Task E  revice exit signal!
context_test.go:191: Task G  revice exit signal!
context_test.go:191: Task F  revice exit signal!
context_test.go:191: Task B  revice exit signal!
context_test.go:191: Task D  revice exit signal!
context_test.go:191: Task C  revice exit signal!

可以看到,通过增加Cancel点,我们可以精准的控制任务的退出,这就是context的复杂任务控制能力。
image.png

context原理简述

所以可以看到,引入context的意义在于
1.统一的任务执行/取消/超时控制模型
2.增强的任务取消/停止控制
除此之外,context还支持传入一些简单kv,用于任务参数定义,如下,不赘述

func TestContextValueControl(t *testing.T) {

	ctx, cancel := context.WithCancel(context.WithValue(context.Background(), "testkey", "testvalue"))

	taskNum := 1
	wg := sync.WaitGroup{}
	wg.Add(taskNum)

	go func(ctx context.Context) {
		defer wg.Done()

		for {
			select {
			case <-ctx.Done():
				t.Logf("Task revice exit signal, ctx value:%s!\n", ctx.Value("testkey"))
				return
			}
		}

	}(ctx)

	go func() {
		time.Sleep(3 * time.Second)
		cancel()
	}()

	wg.Wait()
}

其实,写到这里,对比channel实现任务和context任务控制,我们也能自然看到context的基础原理,如下

简单来说,就如下几句话
1.创建context时创建一个退出通知通道,同时维持一个协程任务的关系树,如下示意图
image.png
树的根节点是backgroud和todo节点,也就是emptyCtx

	background = new(emptyCtx)
	todo       = new(emptyCtx)

树的子节点是cancelCtx,每个子节点包括父节点指向Context和子节点map-children

type cancelCtx struct {
	Context

	mu       sync.Mutex            // protects following fields
	done     atomic.Value          // of chan struct{}, created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
}

2.执行WithCancel/WithTimeout时,更新协程任务的关系树

  • 如果父节点已经退出,则遍历子节点退出
  • 如果父节点没退出,创建监听协程,一旦父节点收到ctx.Done,子节点cancel

参考函数propagateCancel

// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
	...

	if p, ok := parentCancelCtx(parent); ok {
		p.mu.Lock()
		if p.err != nil {
			// parent has already been canceled
            // 遍历子节点退出
			child.cancel(false, p.err)
		} else {
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			p.children[child] = struct{}{}
		}
		p.mu.Unlock()
	} else {
		atomic.AddInt32(&goroutines, +1)
		go func() {
			select {
            // 创建监听协程,一旦父节点收到ctx.Done,子节点cancel
			case <-parent.Done():
				child.cancel(false, parent.Err())
			case <-child.Done():
			}
		}()
	}
}

3.执行cancel/timeout参数时,通知当前cancel对应的根任务和子任务退出
此时当前cancelCtx任务从整颗树上分离,父节点再退出时不会通知已经退出的树节点,参考cancel函数

// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
   ...
   // 当前cancel对应的根任务和子任务退出
   for child := range c.children {
      // NOTE: acquiring the child's lock while holding parent's lock.
      child.cancel(false, err)
   }
   c.children = nil
   c.mu.Unlock()

   // 当前cancelCtx任务从整颗树上分离
   if removeFromParent {
      removeChild(c.Context, c)
   }
}

参考

https://zhuanlan.zhihu.com/p/68792989
https://zhuanlan.zhihu.com/p/110085652

演示代码地址 https://gitee.com/wenzhou1219/go-in-prod/tree/master/context

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/150638.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity 3D 使用高度图创建地形|| Unity 3D 使用笔刷绘制地形

Unity 3D 使用高度图创建地形 在 Unity 3D 中编辑地形有两种方法&#xff1a; 通过地形编辑器编辑地形。通过导入一幅预先渲染好的灰度图来快速地为地形建模。 地形上每个点的高度被表示为一个矩阵中的一列值。这个矩阵可以用一个被称为高度图&#xff08;heightmap&#xff0…

Win11关闭Windows Defender实时保护,暂时关闭和永久关闭方法 | Win10怎么永久关闭Windows Defender实时保护

文章目录1. 按2. 暂时关闭Windows Defender实时保护3. 永久关闭实时保护3.1. 方法一&#xff1a;改组策略&#xff08;Windows11实测可以&#xff09;3.2. 方法二&#xff1a;改注册表&#xff08;Windows11实测不行&#xff09;1. 按 开启Windows Defender实时保护有时候会导…

HJ2 计算某字符出现次数

HJ2 计算某字符出现次数1 题目2 解法2.1 count_if 本题代码2.1.1 C STL非更易型算法--count_if介绍2.1.2 C中cin(),cin.get(),cin.getline(),getline()总结&#xff1a;2.2 一般做法3 【扩展】C STL--非更易型算法1 题目 题源链接 描述 写出一个程序&#xff0c;接受一个由字…

【HTTP】浏览器缓存(HTTP缓存)

文章目录一、强制缓存1.1、ExPires1.2、Cache-Control二、协商缓存2.1、last-modified2.2、etag浏览器缓存&#xff08;Browser Caching&#xff09;是为了节约网络的资源加速浏览&#xff0c;浏览器在用户磁盘上对最近请求过的文档进行存储&#xff0c;当访问者再次请求这个页…

在国内 PMP 有多少含金量?

PMP 证书已经在全球206个国家和地区得到认可&#xff0c;据 PMI 官方数据统计&#xff0c;截至2021&#xff0c;全球持有效 PMP 证书人数达110 W&#xff0c;国内占比28.98%&#xff0c;超33 W人次。 第一&#xff0c;PMP证书有什么价值&#xff1f; 01.PMP认证的重要性 PMP是…

Unreal UPROPERTY属性标记宏

BlueprintReadOnly,让该变量可在蓝图中访问。新建一个继承AActor的C类CustomActor,添加int变量TestProperty,并给他加上BlueprintReadOnly标记:这样,就能在蓝图中引用该变量。BlueprintReadWrite,让该变量可以在蓝图中使用以及修改。给TestProperty变量添加BlueprintReadWrite标…

【云原生进阶之容器】第二章Controller Manager原理2.7节--Indexer剖析

7 Indexer Indexer是client-go用来存储资源对象并自带索引功能的本地存储,Reflector从DeltaFIFO中将消费出来的资源对象存储至Indexer。Indexer中的数据与Etcd集群中的数据保持完全一致。client-go可以很方便地从本地存储中读取相应的资源对象数据,而无须每次都从远程Etcd集群…

复现yolov5CPP经验贴

源码&#xff1a; https://github.com/Hexmagic/ONNX-yolov5/blob/master/src/test.cpp 该源码亲测可行&#xff0c;但是还是有一些问题 此处改成False 此处改成自己转换好的onnx模型路径 用神经网络工具&#xff1a; https://github.com/lutzroeder/netron 用该工具可查看输入…

【UE4 第一人称射击游戏】24-添加人工智能的敌人跟随功能

上一篇&#xff1a;【UE4 第一人称射击游戏】23-添加子弹伤害本篇效果&#xff1a;步骤&#xff1a;将 导航网格体边界体积 拖入视口按P键显示区域将导航区域扩大一些如果不想让导航体覆盖上面的区域可以将导航体的高度降低一些打开“SimpleAI”&#xff0c;添加一个“Pawn感应…

Java Type

Type 是Java 编程语言中所有类型的公共高级接口&#xff08;官方解释&#xff09;&#xff0c;也就是Java 中所有类型的”爹“。其中”所有类型“的描述尤为指的关注。它并不是我们平常工作中经常使用的int、String、List、Map等数据类型&#xff0c;而是从Java语言角度磊说&am…

shell练习之安全脚本

题目&#xff1a; 将密码输入错误超过4次的IP地址通过firewalld防火墙阻止访问 1.初始配置 首先使用systemctl工具启用firewalld服务&#xff1a; ​[rootlocalhost ~]# systemctl enable firewalld如果已经启用了&#xff0c;我们现在可以通过执行以下命令启动firewalld&a…

失败就是差一点的成功,社科院与杜兰大学金融管理硕士项目为你在职读研助力

失败的人&#xff0c;只差了一点点&#xff1b;成功的人&#xff0c;是多做了一点点&#xff1b;顶尖的人&#xff0c;则是再多做一点点。小事成就大事&#xff0c;细节成就完美&#xff0c;所以&#xff0c;千万不要只差那么一点&#xff0c;就放弃了。都说失败是成功之母&…

从Reactor模式俯瞰Nginx,你会发现你与高手的差距就在设计模式上

我们知道了Nginx是做什么的以及它为何如此高效&#xff0c;以至于全宇宙拿它来做负载均衡或者说web server。 但是如果你只是了解了使用和知道了原理就认为已经掌握了它&#xff0c;那只能说你肤浅了&#xff0c;原理和使用技能看看大家都知道了&#xff0c;没必要拿出去和别人…

快排递归、迭代的实现和两种优化方法

目录 快速排序 实现代码 时间复杂度 快排的优化 随机选择策略 三位取中法 非递归的快排 快速排序 快速排序算法是基于分治策略的一个排序算法&#xff0c;其基本思想是对于输入的子数组进行分解、递归求解&#xff0c;最后合并。 分解&#xff1a;以数组中左边第一个数作…

运行flutter doctor命令检测环境是否配置成功报错及解决方案

/** 运行flutter doctor命令检测环境是否配置成功&#xff0c;报如下错误**/ 1. cmdline-tools component is missing & Android licenses status unknown 1.1.安装cmdline-tools 1.2.配置android-licenses 运行命令flutter doctor --android-licenses&#xff0c;提示…

封装一个帧动画组件,使用的是精灵图

我写的是淘宝小部件&#xff0c;限制很多&#xff0c;用的是精灵图&#xff0c;说下大概思路&#xff0c;主要是通过背景图片的X Y轴去控制&#xff0c;首先创建一个组件 例&#xff1a; 然后在props定义需要的参数&#xff0c;可通过父组件传递修改 需要传入精灵图地址、单…

【云原生】Prometheus监控docker容器

部署node-exporter用于搜集硬件和系统信息 // 全部主机都要做 docker run -d -p 9100:9100 -v /proc:/host/proc -v /sys:/host/sys -v /:/rootfs --nethost prom/node-exporter --path.procfs /host/proc --path.sysfs /host/sys --collector.filesystem.ignored-mount-point…

Windows系统pagefile.sys删除、移动

背景 在使用windows系统中通常会发现c盘系统盘容量和实际容量不符。以至于你以为还有几十个G的空间&#xff0c;但操作程序时会出现空间不足的情况 。 例如以下错误&#xff1a; # There is insufficient memory for the Java Runtime Environment to continue. # Native memo…

【六】Netty Google Protobuf 编解码

Netty Google Protobuf 编解码Google Protobuf 介绍Protobuf 的入门Protobuf 开发环境搭建Protobuf 下载创建.proto文件第五节的 对应实体&#xff08;SubscribeReq&#xff0c;SubscribeResp &#xff09;SubscribeReq.proto 文件SubscribeResp.proto利用命令生成对应的java文…

详解c++---string模拟实现

这里写目录标题前言准备工作构造函数析构函数迭代器的实现插入数据有关的函数实现reservepush_backoperatorappendinserterasefindresize[ ]clear>>>>新式拷贝构造函数新式赋值重载前言 在前面的文章里我们学习了c中string的用法&#xff0c;那么这篇文章我们将带…