go-resiliency源码解析之-batcher
源代码地址 : https://github.com/eapache/go-resiliency/blob/master/batcher/batcher.go
1.batcher定义
创建一个batch对象需要2个参数:
Timeout:超时,这是一个batch对象收集输入参数的时间。
work函数变量:在timeout超时后,会调用一次work函数,来处理每一个输入参数。
整体处理流程如下图:
2.核心源码解析
核心结构定义
type work struct {
//收集的一个参数
param interface{}
//参数处理返回
future chan error
}
type Batcher struct {
//收集参数的超时时间
timeout time.Duration
//过滤器函数
prefilter func(interface{}) error
//互斥量,用于参数收集并发控制
lock sync.Mutex
//存储收集到参数的chan
submit chan *work
//批处理函数,超时后,调用该函数一次,处理全部参数
//[]interface{}
doWork func([]interface{}) error
done chan bool
}
Run函数
//param是timeout内可收集参数,业务方调用Run函数传入参数
func (b *Batcher) Run(param interface{}) error {
//先判断是否有过滤器函数。 prefilter相当于一个数据清洗函数,对无效param参数返回err,这样
//在dowork里就不会处理这个输入参数
if b.prefilter != nil {
if err := b.prefilter(param); err != nil {
return err
}
}
//timeout==0表示无收集参数时间,需要立刻执行doWork函数
if b.timeout == 0 {
return b.doWork([]interface{}{param})
}
//当timeout > 0 ,就构造一个work对象放入到chan里
w := &work{
param: param,
future: make(chan error, 1),
}
b.submitWork(w)
return <-w.future
}
func (b *Batcher) Prefilter(filter func(interface{}) error) {
b.prefilter = filter
}
submitWork函数:在Run函数里,当timeout > 0会调用submitWork函数
func (b *Batcher) submitWork(w *work) {
//这里为什么要加一个互斥锁?
//对,主要是防止下面if里的代码被并发执行
b.lock.Lock()
defer b.lock.Unlock()
//创建submit的chan, 开启一个batch协程
if b.submit == nil {
b.done = make(chan bool)
b.submit = make(chan *work, 4)
go b.batch()
}
b.submit <- w
}
func (b *Batcher) batch() {
//params为收集参数集合
var params []interface{}
var futures []chan error
input := b.submit
go b.timer()
//for读取input这个chan,input在没有close前,这个for不会退出
//所以这里就是在等待timeout时间,把输入的参数收集到params这个切片
//?? 那input chan什么时候被close了?? 就是 go b.timer()这一句
for work := range input {
params = append(params, work.param)
futures = append(futures, work.future)
}
//这里就是把收集到的参数传入到你设置的函数,执行业务逻辑
ret := b.doWork(params)
//把doWork执行结果写回到future,这样调用线程就可以读取到执行结果
for _, future := range futures {
future <- ret
close(future)
}
close(b.done)
}
func (b *Batcher) timer() {
//阻塞协程timeout时间,然后调用flush函数
time.Sleep(b.timeout)
//主要就是关闭submit这个chan,让batch里收集参数for循环退出
b.flush()
}
func (b *Batcher) flush() {
b.lock.Lock()
defer b.lock.Unlock()
if b.submit == nil {
return
}
close(b.submit)
b.submit = nil
}
3.测试用例
这个测试用例实现,在1s内收集传入的整形,然后求和
func TestBatcher(t *testing.T) {
wg := &sync.WaitGroup{}
b := New(time.Second, func(params []interface{}) error {
sum := 0
for _, p := range params {
sum += p.(int)
}
t.Logf("sum %d", sum)
return nil
})
b.Prefilter(func(param interface{}) error {
// do some sort of sanity check on the parameter, and return an error if it fails
return nil
})
for i := 1; i <= 10; i++ {
wg.Add(1)
go func(param interface{}) {
go b.Run(i)
wg.Done()
}(i)
}
wg.Wait()
time.Sleep(5 * time.Second)
}