业务场景解析之——如何执行批次重要的任务
- 前言
- 一、流程时序
- 二、并发设计
前言
这次业务场景上需要跑一批任务识别任务,而每个具体任务识别都比较重要,需要调用外部接口进行计费,而量又比较大,这就要求这个任务是比较稳定安全的。
一、流程时序
首先可以参照如下的时间线,来确保这批任务能够正常的按照流程进行执行。
其中假如你的任务是并发执行的,应该控制你并发的量(尤其是在go起一个协程代价很低的情况下,java中一般直都是起线程池。)
二、并发设计
所以在批次任务的设计上可以这样进行构思:
可以在配置中心定义三个key。**第一个key为协程数,第二个key为sleep时间,第三个key为任务的开关。**然后在一批次任务分多批次,每一批任务由一个协程来跑。然后每次任务执行前,都进行一次开关的校验,是否执行任务。然后执行任务后再进行睡眠,由此就能对批次任务的速率进行实时调节。
对于并发的代码逻辑可以使用conc中的waitGroup或者conc中的pool来进行编写,能够更加的优雅便捷。
func TestConcPool(t *testing.T) {
p := pool.New().WithMaxGoroutines(2)
for i := 0; i < 100; i++ {
k := i
p.Go(func() {
fmt.Println(GetGID())
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered from panic:", r)
}
}()
if k == 10 {
ExecuteTaskDetail()
} else {
fmt.Println("goroutine_print, k is ", k)
}
})
}
p.Wait()
}
func GetGID() uint64 {
b := make([]byte, 64)
b = b[:runtime.Stack(b, false)]
b = bytes.TrimPrefix(b, []byte("goroutine "))
b = b[:bytes.IndexByte(b, ' ')]
n, _ := strconv.ParseUint(string(b), 10, 64)
return n
只起了两个19、33两个协程。
- 使用waitGroup:
- func TestSplitArray(t *testing.T) {
// 起的协程的数量
goCount := 2
wg := conc.NewWaitGroup()
list := []int{1, 2, 3, 4, 5, 6}
// 将数组按照指定长度切割,这里切割成3个长度为2的数组
lists := SplitArray(list, goCount)
startTime := time.Now()
for _, splitTaskList := range lists {
for _, task := range splitTaskList {
p := task
go func() {
fmt.Print(p)
}()
}
// 等两个协程执行完再执行下一批
wg.WaitAndRecover()
time.Sleep(time.Duration(5) * time.Second)
}
// 因此预期应该为15s
fmt.Printf("Batch task execution successful, time is: %v ", time.Since(startTime))
}