KDP(数据服务平台)是一款由 KaiwuDB 独立自主研发的数据服务产品,以 KaiwuDB 为核心,面向 AIoT 场景打造的一站式数据服务平台,满足工业物联网、数字能源、车联网、智慧产业等行业核心业务场景下数据采集、处理、计算、分析、应用的综合业务需求,实现“业务即数据,数据即服务”,助力企业从数据中挖掘更大的商业价值。
在开发数据服务平台的实时计算组件时,可能会遇到这样的问题:实时计算组件向用户提供自定义规则功能,用户注册多个规则运行一段时间后,再修改规则的定义并重新启动,会发生协程泄露。
一、真实案例
本文将用伪代码全面介绍开发数据服务平台的实时计算组件过程中,可能遇到的协程泄露问题。
//规则的大致数据结构
type DummyRule struct{
BaseRule
sorce []Source
sink []Sink
//flow map key:flow 名称,value:flow 实例
flow map[string]Flow
...
}
上述 DummyRule 是本次示例的规则数据结构,它包含多个数据来源 Source,多个数据目标 Sink,及数据流 Flow。规则具体过程如下图:
1 和 2 是两个源,首先分别用加法处理 1 和 2 两个源;其次调用 Merge 操作合成一个流;接着进行 Fanout 操作,生成两个相同的流,分别流入7 和 8;最终经过 7 和 8 的数字类型转成字符串,分别写入到 out1.txt 和 out2.txt 文件中。
type Source struct{
consumers []file.reader
out chan interface{}
ctx context.Context
cancel context.CancelFunc
...
}
上图是 Source 类数据源的伪代码,consumers 是用来读取文件数据的读取器,out 是用来传递给下一个数据流的通道,ctx 是 Go 的上下文。consumers 读取文件数据是一个单独的协程,读取的数据将放入 out 中,等待下一个数据流的消费。
type Sink struct{
producers []file.writer
in chan interface{}
ctx context.Context
cancel context.CancelFunc
...
}
上图是 Sink 类数据目标的伪代码,producers 是用来写文件的写入器,in 是用来接受上一个数据流的通道,ctx 是 Go 的上下文,producers 写文件数据也是一个单独的协程。
func(fm FlatMap) Via(flow streams.Flow) streams.Flow{
go fm.transmit(flow)
return flow
}
上图是数据流传递的源码。FlatMap 的用法是 curFlow := prevFlow.Via(nextFlow),这样可以把前一个 Flow 传递给下一个 Flow,可以看到一次数据流传递过程是在一个协程中进行的。
从前面源码可知,这个示例规则至少存在 10 个协程,但实际上,要比 10 个协程多得多。可见,在数据服务平台的实时计算组件中,协程管理是十分复杂的。
使用 go pprof,top,go traces 等工具进行反复测试和排查后,我们才发现协程泄露是由于规则中 Sink 的 Context 未被正确取消导致。
Context 是管理 goroutine 重要的语言特性。学会正确使用 Context,可以更好地厘清 goroutine 的关系并加以管理。从上述实例可以看出 Context 的重要性,学会正确使用 Context,不仅可以提高代码质量,更可以避免大量的协程泄露排查工作。
二、走进 Context
1.介绍
Context 通常被称为上下文,在 Go 语言中,理解为 goroutine 的运行状态、现场,存在上下层 goroutine Context 的传递,上层 goroutine 会把 Context 传递给下层 goroutine。
每个 goroutine 在运行前,都要事先知道程序当前的执行状态,通常将这些状态封装在一个 Context 变量中,传递给要执行的 goroutine。
在网络编程中,当接收到一个网络请求 Request,处理 Request 时,可能会在多个 goroutine 中处理。而这些 goroutine 可能需要共享 Request 的一些信息,当 Request 被取消或者超时,所有从这个 Request 创建的 goroutine 也要被结束。
Go Context 包不仅实现了在程序单元之间共享状态变量的方法,同时能通过简单的方法,在被调用程序单元外部,通过设置 ctx 变量的值,将过期或撤销等信号传递给被调用的程序单元。
在网络编程中,如果存在 A 调用 B 的 API,B 调用 C 的 API,那么假如 A 调用 B 取消,那么 B 调用 C 也应该被取消。通过 Context 包,可以非常方便地在请求 goroutine 之间传递请求数据、取消信号和超时信息。
Context 包的核心时 Context 接口:
// A Context carries a deadline, a cancellation signal, and other values across
// API boundaries
//
// Context's methods may be called by multiple goroutines simultaneously.
type Context interface{
// 返回一个超时时间,到期则取消context。在代码中,可以通过deadline为io操作设置超过时间
Deadline() (deadline time.Time, ok bool)
// 返回一个channel,用于接收context的取消或者deadline信号。
// 当channel关闭,监听done信号的函数会立即放弃当前正在执行的操作并返回。
// 如果context实例时不可能取消的,那么
// 返回nil,比如空context,valueCtx
Done()
}
2.使用方法
对于 goroutine,他们的创建和调用关系总是像层层调用进行的,就像一个树状结构,而更靠顶部的 Context 应该有办法主动关闭下属的 goroutine 的执行。为了实现这种关系,Context 也是一个树状结构,叶子节点总是由根节点衍生出来的。
要创建 Context 树,第一步应该得到根节点,Context.Backupgroup 函数的返回值就是根节点。
func Background() Context{
return background
}
该函数返回空的 Context,该 Context 一般由接收请求的第一个 goroutine 创建,是与进入请求对应的 Context 根节点,他不能被取消,也没有值,也没有过期时间。他常常作为处理 Request 的顶层的 Context 存在。
有了根节点,就可以创建子孙节点了,Context 包提供了一系列方法来创建他们:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {}
func WithDeadline(parent Context, d time.Time)(Context, CancelFunc) {}
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {}
func WithValue(parent Context, key, val interface{}) Context {}
函数都接收一个 Context 类型的 parent,并返回一个 Context 类型的值,这样就层层创建除不同的 Context,子节点是从复制父节点得到,并且根据接收参数设定子节点的一些状态值,接着就可以将子节点传递给下层的 goroutine 了。
怎么通过 Context 传递改变后的状态呢?
在父 goroutine 中可以通过 Withxx 方法获取一个 cancel 方法,从而获得了操作子 Context 的权力。
(1)WithCancel
WithCancel 函数,是将父节点复制到子节点,并且返回一个额外的 CancelFunc 函数类型变量,该函数类型的定义为:type CancelFunc func()
调用 CancelFunc 将撤销对应的子 Context 对象。在父 goroutine 中,通过 WithCancel 可以创建子节点的 Context,还获得了子 goroutine 的控制权,一旦执行了 CancelFunc 函数,子节点 Context 就结束了,子节点需要如下代码来判断是否已经结束,并退出 goroutine:
select {
case <- ctx.Cone():
fmt.Println("do some clean work ...... ")
}
(2)WithDeadline
WithDeadline 函数作用和 WithCancel 差不多,也是将父节点复制到子节点,但是其过期时间是由 deadline 和 parent 的过期时间共同决定。当 parent 的过期时间早于 deadline 时,返回的过期时间与 parent 的过期时间相同。父节点过期时,所有的子孙节点必须同时关闭。
(3)WithTimeout
WithTimeout 函数和 WithDeadline 类似,只不过,他传入的是从现在开始 Context 剩余的生命时长。他们都同样也都返回了所创建的子 Context 的控制权,一个 CancelFunc 类型的函数变量。
当顶层的 Request 请求函数结束时,我们可以 cancel 掉某个 Context,而子孙的 goroutine 根据 select ctx.Done()来判断结束。
(4)Withvalue
WithValue 函数,返回 parent 的一个副本,调用该副本的 Value(key) 方法将得到 value。这样,我们不仅将根节点原有的值保留了, 还在子孙节点中加入了新的值;注意如果存在 key 相同,则会覆盖。
3.例子
package main
import (
"context"
"fmt"
"time"
)
func main() {
ctxWithCancel, cancel := context.WithTimeout(context.Background(), 5 * time.Second)
go worker(ctxWithCancel, "[1]")
go worker(ctxWithCancel, "[2]")
go manager(cancel)
<-ctxWithCancel.Done()
// 暂停1秒便于协程的打印输出
time.Sleep(1 * time.Second)
fmt.Println("example closed")
}
func manager(cancel func( )) {
time.Sleep(10 * time.Second)
fmt.Println("manager called cancel()")
cancel()
}
func worker(ctxWithCancle context.Context, name string) {
for {
select {
case <- ctxWithCancel.Done():
fmt.Println(name, "return for ctxWithCancel.Done()")
return
default:
fmt.Println(name, "working")
}
time.Sleep(1 * time.Second)
}
}
这个过程的 Context 的架构图:
[1]working
[2]working
[2]working
[1]working
[1]working
[2]working
[2]working
[1]working
[1]working
[2]working
[1]return for ctxWithCancel.Done()
[2]return for ctxWithCancel.Done()example closed
可见,这次 worker 结束是因为 ctxWithCancel 的计时器到点引起的。
把 manager 时长改成 2 秒,WithTimeout 时长不变,再运行一次,worker 只工作了 2 秒就被 manager 提前叫停了。
[1]working
[2]working
[2]working
[1]workingmanager called cancel()
[1]return for ctxWithCancel.Done()
[2]return for ctxWithCancel.Done()example closed