Go 语言中 Context 的作用和使用方法详解

news2024/9/22 21:12:53

KDP(数据服务平台)是一款由 KaiwuDB 独立自主研发的数据服务产品,以 KaiwuDB 为核心,面向 AIoT 场景打造的一站式数据服务平台,满足工业物联网、数字能源、车联网、智慧产业等行业核心业务场景下数据采集、处理、计算、分析、应用的综合业务需求,实现“业务即数据,数据即服务”,助力企业从数据中挖掘更大的商业价值。

在开发数据服务平台的实时计算组件时,可能会遇到这样的问题:实时计算组件向用户提供自定义规则功能,用户注册多个规则运行一段时间后,再修改规则的定义并重新启动,会发生协程泄露。

一、真实案例

本文将用伪代码全面介绍开发数据服务平台的实时计算组件过程中,可能遇到的协程泄露问题。

//规则的大致数据结构
type DummyRule struct{
    BaseRule    
    sorce []Source    
    sink  []Sink    
    //flow map key:flow 名称,value:flow 实例    
    flow map[string]Flow    
    ...
}

上述 DummyRule 是本次示例的规则数据结构,它包含多个数据来源 Source,多个数据目标 Sink,及数据流 Flow。规则具体过程如下图:

1 和 2 是两个源,首先分别用加法处理 1 和 2 两个源;其次调用 Merge 操作合成一个流;接着进行 Fanout 操作,生成两个相同的流,分别流入7 和 8;最终经过 7 和 8 的数字类型转成字符串,分别写入到 out1.txt 和 out2.txt 文件中。

type Source struct{
  consumers       []file.reader  
  out             chan interface{}  
  ctx             context.Context  
  cancel          context.CancelFunc  
  ...
}

上图是 Source 类数据源的伪代码,consumers 是用来读取文件数据的读取器,out 是用来传递给下一个数据流的通道,ctx 是 Go 的上下文。consumers 读取文件数据是一个单独的协程,读取的数据将放入 out 中,等待下一个数据流的消费。

type Sink struct{
   producers  []file.writer   
   in         chan interface{}   
   ctx        context.Context   
   cancel context.CancelFunc   
   ...
}

上图是 Sink 类数据目标的伪代码,producers 是用来写文件的写入器,in 是用来接受上一个数据流的通道,ctx 是 Go 的上下文,producers 写文件数据也是一个单独的协程。

func(fm FlatMap) Via(flow streams.Flow) streams.Flow{
    go fm.transmit(flow)
    return flow
}

上图是数据流传递的源码。FlatMap 的用法是 curFlow := prevFlow.Via(nextFlow),这样可以把前一个 Flow 传递给下一个 Flow,可以看到一次数据流传递过程是在一个协程中进行的。

从前面源码可知,这个示例规则至少存在 10 个协程,但实际上,要比 10 个协程多得多。可见,在数据服务平台的实时计算组件中,协程管理是十分复杂的。

使用 go pprof,top,go traces 等工具进行反复测试和排查后,我们才发现协程泄露是由于规则中 Sink 的 Context 未被正确取消导致。

Context 是管理 goroutine 重要的语言特性。学会正确使用 Context,可以更好地厘清 goroutine 的关系并加以管理。从上述实例可以看出 Context 的重要性,学会正确使用 Context,不仅可以提高代码质量,更可以避免大量的协程泄露排查工作。

二、走进 Context

1.介绍

Context 通常被称为上下文,在 Go 语言中,理解为 goroutine 的运行状态、现场,存在上下层 goroutine Context 的传递,上层 goroutine 会把 Context 传递给下层 goroutine。

每个 goroutine 在运行前,都要事先知道程序当前的执行状态,通常将这些状态封装在一个 Context 变量中,传递给要执行的 goroutine。

在网络编程中,当接收到一个网络请求 Request,处理 Request 时,可能会在多个 goroutine 中处理。而这些 goroutine 可能需要共享 Request 的一些信息,当 Request 被取消或者超时,所有从这个 Request 创建的 goroutine 也要被结束。

Go Context 包不仅实现了在程序单元之间共享状态变量的方法,同时能通过简单的方法,在被调用程序单元外部,通过设置 ctx 变量的值,将过期或撤销等信号传递给被调用的程序单元。

在网络编程中,如果存在 A 调用 B 的 API,B 调用 C 的  API,那么假如 A 调用 B 取消,那么 B 调用 C 也应该被取消。通过 Context 包,可以非常方便地在请求 goroutine 之间传递请求数据、取消信号和超时信息。

Context 包的核心时 Context 接口:

// A Context carries a deadline, a cancellation signal, and other values across
// API boundaries
//
// Context's methods may be called by multiple goroutines simultaneously.
type Context interface{     
     // 返回一个超时时间,到期则取消context。在代码中,可以通过deadline为io操作设置超过时间     
     Deadline() (deadline time.Time, ok bool)     
     // 返回一个channel,用于接收context的取消或者deadline信号。     
     // 当channel关闭,监听done信号的函数会立即放弃当前正在执行的操作并返回。     
     // 如果context实例时不可能取消的,那么     
     // 返回nil,比如空context,valueCtx     
     Done()
}

2.使用方法

对于 goroutine,他们的创建和调用关系总是像层层调用进行的,就像一个树状结构,而更靠顶部的 Context 应该有办法主动关闭下属的 goroutine 的执行。为了实现这种关系,Context 也是一个树状结构,叶子节点总是由根节点衍生出来的。

要创建 Context 树,第一步应该得到根节点,Context.Backupgroup 函数的返回值就是根节点。

func Background() Context{
    return background
}

该函数返回空的 Context,该 Context 一般由接收请求的第一个 goroutine 创建,是与进入请求对应的 Context 根节点,他不能被取消,也没有值,也没有过期时间。他常常作为处理 Request 的顶层的 Context 存在。

有了根节点,就可以创建子孙节点了,Context 包提供了一系列方法来创建他们:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {}
func WithDeadline(parent Context, d time.Time)(Context, CancelFunc) {}
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {}
func WithValue(parent Context, key, val interface{}) Context {}

函数都接收一个 Context 类型的 parent,并返回一个 Context 类型的值,这样就层层创建除不同的 Context,子节点是从复制父节点得到,并且根据接收参数设定子节点的一些状态值,接着就可以将子节点传递给下层的 goroutine 了。

怎么通过 Context 传递改变后的状态呢?

在父 goroutine 中可以通过 Withxx 方法获取一个 cancel 方法,从而获得了操作子 Context 的权力。

(1)WithCancel

 WithCancel 函数,是将父节点复制到子节点,并且返回一个额外的 CancelFunc 函数类型变量,该函数类型的定义为:type CancelFunc func()

调用 CancelFunc 将撤销对应的子 Context 对象。在父 goroutine 中,通过 WithCancel 可以创建子节点的  Context,还获得了子 goroutine 的控制权,一旦执行了  CancelFunc 函数,子节点 Context 就结束了,子节点需要如下代码来判断是否已经结束,并退出 goroutine:

select {
case <- ctx.Cone():
    fmt.Println("do some clean work ...... ")
}

(2)WithDeadline

 WithDeadline 函数作用和 WithCancel 差不多,也是将父节点复制到子节点,但是其过期时间是由 deadline 和 parent 的过期时间共同决定。当 parent 的过期时间早于 deadline 时,返回的过期时间与 parent 的过期时间相同。父节点过期时,所有的子孙节点必须同时关闭。

(3)WithTimeout

WithTimeout 函数和 WithDeadline 类似,只不过,他传入的是从现在开始 Context 剩余的生命时长。他们都同样也都返回了所创建的子 Context 的控制权,一个 CancelFunc 类型的函数变量。

当顶层的 Request 请求函数结束时,我们可以 cancel 掉某个 Context,而子孙的 goroutine 根据 select ctx.Done()来判断结束。

(4)Withvalue

WithValue 函数,返回 parent 的一个副本,调用该副本的 Value(key) 方法将得到 value。这样,我们不仅将根节点原有的值保留了, 还在子孙节点中加入了新的值;注意如果存在 key 相同,则会覆盖。

3.例子

package main
import (
        "context"        
        "fmt"        
        "time"
)
func main() {
        ctxWithCancel, cancel := context.WithTimeout(context.Background(), 5 * time.Second)                
        
        go worker(ctxWithCancel, "[1]")        
        go worker(ctxWithCancel, "[2]")                
        
        go manager(cancel)                
        
        <-ctxWithCancel.Done()        
        // 暂停1秒便于协程的打印输出        
        time.Sleep(1 * time.Second)        
        fmt.Println("example closed")
}
func manager(cancel func( )) {
        time.Sleep(10 * time.Second)         
        fmt.Println("manager called cancel()")         
        cancel() 
}                
func worker(ctxWithCancle context.Context, name string) {
        for {
                 select {                 
                 case <- ctxWithCancel.Done():                          
                          fmt.Println(name, "return for ctxWithCancel.Done()")                          
                          return                 
                 default:
                          fmt.Println(name, "working")                 
                          }                 
                          time.Sleep(1 * time.Second)        
        }
}

这个过程的 Context 的架构图:

[1]working
[2]working
[2]working
[1]working
[1]working
[2]working
[2]working
[1]working
[1]working
[2]working
[1]return for ctxWithCancel.Done()
[2]return for ctxWithCancel.Done()example closed

可见,这次 worker 结束是因为 ctxWithCancel 的计时器到点引起的。

把 manager 时长改成 2 秒,WithTimeout 时长不变,再运行一次,worker 只工作了 2 秒就被 manager 提前叫停了。

[1]working
[2]working
[2]working
[1]workingmanager called cancel()
[1]return for ctxWithCancel.Done()
[2]return for ctxWithCancel.Done()example closed

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/690205.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在Azure SQL DB/Azure托管实例里快速查询各数据库大小以及每个数据库下表的大小

目录 &#xff08;一&#xff09;前言 &#xff08;二&#xff09;正文 1. 环境&#xff1a; 2. 查看实例下每个数据库的空间大小 &#xff08;1&#xff09; SQL语法 &#xff08;2&#xff09;运行结果 3. 查看特定数据库下每张表的大小 &#xff08;1&#xff09;SQ…

一个sql中的一张表,最多只会走一个索引吗

目录 先给结论 做实验 1.根据时间范围查询 什么是key_len&#xff1f; 2.根据时间范围和 is_delete 查询 最左匹配原则 2.根据时间范围和 blog_type 查询 如果加上id会怎么样 并不是索引一定会走 1.IN子表数量过多 2.单次查询超过30% 先给结论 先说结论&#xff0c;…

设计模式第14讲——享元模式(Flyweight)

目录 一、什么是享元模式 二、角色组成 三、优缺点 四、应用场景 4.1 生活场景 4.2 java场景 五、代码实现 5.0 代码结构 5.1 Bike——抽象享元类&#xff08;FlyWeight&#xff09; 5.2 具体享元类&#xff08;ConcreteFlyWeight&#xff09; 5.3 BikeFactory——享元…

layui框架学习(28:穿梭框模块)

Layui模块中的穿梭框模块transfer主要支撑穿梭框组件的显示、交互等操作。所谓穿梭框是指左右各有一个复选框列表&#xff0c;可以将左侧选中的项目移动到右边&#xff0c;后者将右侧的选中项移回左边的控件&#xff0c;其样式类似下图所示&#xff08;参考文献5-6&#xff09;…

TI AM62x工业开发板规格书(单/双/四核ARM Cortex-A53 + 单核ARM Cortex-M4F,主频1.4GHz)

1 评估板简介 创龙科技TL62x-EVM是一款基于TI Sitara系列AM62x单/双/四核ARM Cortex-A53 单核ARM Cortex-M4F多核处理器设计的高性能低功耗工业评估板&#xff0c;由核心板和评估底板组成。处理器ARM Cortex-A53(64-bit)主处理单元主频高达1.4GHz&#xff0c;ARM Cortex-M4F实…

如何使用 Flink SQL 探索 GitHub 数据集|Flink-Learning 实战营

为进一步帮助开发者学习使用 Flink&#xff0c;Apache Flink 中文社区近期发起 Flink-Learning 实战营项目。本次实战营通过真实有趣的实战场景帮助开发者实操体验 Flink&#xff0c;课程包括实时数据接入、实时数据分析、实时数据应用的场景实。并结合小松鼠助教模式&#xff…

USR-C216 WIIF连接手机

复位后连接USR-C216无线 浏览器输入10.10.100.254 账号密码为admin 客户端模式服务器地址无效&#xff0c;默认就行 打开手机网络调试助手选择客户端模式&#xff0c;输入10.10.100.254&#xff0c;端口8899 可以透传了 关于AT指令&#xff0c;先发“”&#xff0c;然后3s内发…

【数据管理架构】什么是 OLTP?

OLTP&#xff08;在线事务处理&#xff09;支持在 ATM 和在线银行、收银机和电子商务以及我们每天与之交互的许多其他服务背后进行快速、准确的数据处理。 什么是 OLTP&#xff1f; OLTP 或在线事务处理允许大量人员&#xff08;通常通过 Internet&#xff09;实时执行大量数据…

基于Vue+Node.js的宠物领养网站的设计与开发-计算机毕设 附源码83352

基于VueNode.js的宠物领养网站的 摘 要 随着互联网大趋势的到来&#xff0c;社会的方方面面&#xff0c;各行各业都在考虑利用互联网作为媒介将自己的信息更及时有效地推广出去&#xff0c;而其中最好的方式就是建立网络管理系统&#xff0c;并对其进行信息管理。由于现在网络…

【国产FPGA应用】紫光Pango Design联合 Modelsim 仿真方法

Modelsim 是 FPGA 开发中重要的 EDA 设计仿真工具&#xff0c;主要用于验证数字电路设计是否正确。我们经常用Xilinx的ISE或者Vivado与Modelsim进行联合仿真&#xff0c;其实国产FPGA开发工具也可以与Modelsim进行联合仿真&#xff0c;对于设计比较复杂的应用还是非常方便的&am…

创邻科技与浪潮信息KOS完成澎湃技术认证

近日&#xff0c;浙江创邻科技有限公司&#xff08;简称&#xff1a;创邻科技&#xff09;自主研发的Galaxybase图数据库系统与浪潮信息服务器操作系统KOS V5完成澎湃技术认证。创邻科技作为国内首个成熟的商业图数据库供应商&#xff0c;在同类厂商中率先完成认证。测试结果显…

vue3通过render函数实现一个菜单下拉框

背景说明 鼠标移动到产品服务上时&#xff0c;出现标红的下拉框。 使用纯css的方案实现最简单&#xff0c;但是没什么技术含量&#xff0c;弃之&#xff1b;使用第三方组件库&#xff0c;样式定制麻烦弃之。因此&#xff0c;我们使用vue3直接在页面创建一个dom作为下拉框吧。…

【经验分享】Docker容器部署方法说明

前 言 本案例适用开发环境&#xff1a; Windows开发环境&#xff1a;Windows 7 64bit、Windows 10 64bit Linux开发环境&#xff1a;Ubuntu 18.04.4 64bit 虚拟机&#xff1a;VMware15.1.0 Docker是一个开源的应用容器引擎&#xff0c;让开发者可打包他们的应用以及依赖包…

rust持续学习 声明宏

学习记录&#xff0c;都是学自圣经&#xff0c;macrobook啥的 https://doc.rust-lang.org/reference/macros-by-example.html macro_rules! bar {(3) > {println!("3");};(4) > {println!("4");}; }这个是入门例子&#xff0c;有点像match 调用就是…

【Java|多线程与高并发】线程池详解

文章目录 1. 线程池简介2. 创建线程池3. 工厂模式简介4. 线程池的使用5. 实现线程池6. ThreadPoolExecutor的构造方法讲解7. 线程池的线程数量,如何确定? 1. 线程池简介 Java线程池是一种用于管理和重用线程的机制&#xff0c;它可以在需要执行任务时&#xff0c;从线程池中获…

二叉树遍历方法——前、中、后序遍历(java)

二叉树结构&#xff1a; static class TreeNode{public char val;public TreeNode left;public TreeNode right;public TreeNode(char val) {this.val val;}Overridepublic String toString() {return this.val"";}} 一、前序遍历 前序遍历是一种访问二叉树的每一…

【shell脚本】沐风晓月跟你聊聊shell脚本中的case实战

前言 前面我们已经介绍了while及for循环&#xff0c;结合if语句可以构建一些简单的控制面板及菜单脚本&#xff0c;今天我们来探讨下case语句。 case选择语句&#xff0c;主要用于对多个选择条件进行匹配输出&#xff0c;与if elif语句结构类似&#xff0c;通常用于脚本传递输…

阵列模式合成第 I 部分:清零、窗口化和细化(附源码)

一、前言 本示例说明如何使用相控阵系统工具箱解决一些阵列合成问题。在相控阵设计应用中&#xff0c;通常需要找到一种方法来逐渐减小晶片响应&#xff0c;以使最终的阵列阵列模式满足某些性能标准。典型的性能标准包括主瓣位置、零位置和旁瓣电平。 二、使用旁瓣消除器消除干…

两个进程定时通过共享内存进行通信

进程1-client #include <stdio.h> #include <stdlib.h> #include <sys/ipc.h> #include <sys/shm.h> #include <unistd.h> #include <string.h>#define SHM_SIZE 10 * 1024 * 1024 // 共享内存大小为10M #define WRITE_INTERVAL 1 …

PHP 基础知识

目录 PHP基础 2 PHP代码标记 2 PHP注释 2 PHP语句分隔符 2 PHP变量 3 常量 3 数据类型 4 流程控制 6 文件 7 函数 9 闭包 11 常用系统函数 12 错误处理 13 错误显示设置 15 字符串类型 17 字符串相关函数 19 数组 21 遍历数组 22 数组的相关函数 25 PHP基础 PHP是一种运行在服务…