概述
摘要:本文从源码层面解读了kubernetes源码中常用的workqueue.ParallelizeParallelizeUntil()
框架的源码实现,并且本文也将举例说明了workqueue.ParallelizeUntil()
方法的典型使用场景。
正文
说明:基于 kubernetes v1.18.0
源码分析
在Kubernetes源码中, 我们经常会读到workqueue.ParallelizeUntil()函数,它的作用是在并发多个worker来处理任务,直到接收到context发出的停止信号或任务完成。workqueue.ParallelizeUntil源码位于k8s.io/client-go/util/workqueue/parallelizer.go
,该workqueue是client-go中的一个工作队列,队列包括三种:FIFO、延迟队列和限速队列。关于workqueue我已经在之前的文章中有详细的介绍,如需了解请阅览informer中的WorkQueue机制的实现分析与源码解读(1)
ParallelizeUntil()的源码解读
ParallelizeUntil()
方法作用是并发多个worker来处理任务,直到接收到context发出的停止信号或任务完成
workers
表示启动多少个worker并发处理任务
pieces
表示要处理任务对应的index的数量
DoWorkPieceFunc
表示用于处理任务的工作函数
ctx
使用context控制并发任务的停止
// 定义worker函数
type DoWorkPieceFunc func(piece int)
// ParallelizeUntil is a framework that allows for parallelizing N
// independent pieces of work until done or the context is canceled.
// parallelelizeuntil是一个框架,它允许并行处理任务,直到完成或上下文被取消。
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
// 定义一个stop信号,当手动ctx.Done()信号后,就让整个 ParallelizeUntil 任务停止
var stop <-chan struct{}
if ctx != nil {
stop = ctx.Done()
}
// 将把需要多的工作对应的索引放到chan中,放入完成后就关闭chan,这里借助chan,是因为chan是线程安全的。
toProcess := make(chan int, pieces)
for i := 0; i < pieces; i++ {
toProcess <- i
}
close(toProcess)
// 对于要处理的对象数量少于worker数量时
if pieces < workers {
workers = pieces
}
wg := sync.WaitGroup{}
wg.Add(workers)
// 启动多个worker,同时从chan中取对象进行处理,知道收到停止信号或完成任务
for i := 0; i < workers; i++ {
go func(x int) {
defer utilruntime.HandleCrash()
defer wg.Done()
for piece := range toProcess {
fmt.Printf("work %d ",x)
select {
case <-stop:
return
default:
doWorkPiece(piece)
}
}
}(i)
}
wg.Wait()
}
从源码分析知道,ParallelizeUntil()
的逻辑比较简单,先将要处理的任务,放到chan中,放入完成后就关闭chan,这里借助chan,是因为chan是线程安全的。之后再启动多个worker协程,这些协程不断的从chan去job进行处理。逻辑示意图如下。
如果在多想象一下,ParallelizeUntil()
的逻辑,是不是与工厂中多个机器人不断的从传输带上去东西来进行加工的场景
啊。
接下来我们再看下,worker协程中捕获panic的HandleCrash
的源码。源码逻辑比较简单,就是worker协程中如果出现panic将被recover捕获,捕获之后如果定义了额外的handler函数,会遍历执行。
var PanicHandlers = []func(interface{}){logPanic}
// PanicHandlers is a list of functions which will be invoked when a panic happens.
var PanicHandlers = []func(interface{}){logPanic}
// HandleCrash simply catches a crash and logs an error. Meant to be called via
// defer. Additional context-specific handlers can be provided, and will be
// called in case of panic. HandleCrash actually crashes, after calling the
// handlers and logging the panic message.
//
// E.g., you can provide one or more additional handlers for something like shutting down go routines gracefully.
func HandleCrash(additionalHandlers ...func(interface{})) {
// 捕获panic
if r := recover(); r != nil {
for _, fn := range PanicHandlers {
fn(r)
}
for _, fn := range additionalHandlers {
fn(r)
}
if ReallyCrash {
// Actually proceed to panic.
panic(r)
}
}
}
代码测试
编写一个简单程序,测试workqueue.ParallelizeUntil()
方法的使用.
需求: 通过workqueue.ParallelizeUntil()
,并发的找出100以内的素数
。
import (
"context"
"fmt"
"k8s.io/client-go/util/workqueue"
"testing"
"time"
)
// 判断一个数是否是素数
func isPrieme(num int) bool {
for i := 2; i < num; i++ {
if num%i == 0 {
return false
}
}
return true
}
// 定义用于保存结果的chan
var ResultChan = make(chan int, 100)
func WorkFunc(num int) {
fmt.Println(" check num: ", num)
if isPrieme(num) {
ResultChan <- num
}
}
// go test -mod=vendor -run="^TestParallelizeUntil" -v
func TestParallelizeUntil(t *testing.T) {
// 定义超时信号对应的ctx
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
// 判断一个数是否是素数
myFunc := WorkFunc
// 执行查找
workqueue.ParallelizeUntil(ctx, 3, 100, myFunc)
// 将结果放入结果chan
fmt.Println("result:")
close(ResultChan)
for i := range ResultChan {
fmt.Printf("%d ", i)
}
fmt.Println()
}
---------------执行与输出如下-----------------
(base) @PM80280051 ➜ utils git:(dev-xw) ✗ go test -run="^TestParallelizeUntil" -v
=== RUN TestParallelizeUntil
check num: 0
check num: 3
check num: 4
check num: 5
check num: 6
check num: 7
check num: 8
check num: 9
check num: 10
check num: 11
check num: 12
check num: 13
check num: 14
check num: 15
check num: 16
check num: 17
check num: 1
check num: 19
check num: 20
check num: 21
check num: 22
check num: 23
check num: 24
check num: 25
check num: 26
check num: 27
check num: 28
check num: 29
check num: 30
check num: 31
check num: 32
check num: 2
check num: 34
check num: 33
check num: 36
check num: 37
check num: 38
check num: 39
check num: 40
check num: 41
check num: 42
check num: 43
check num: 44
check num: 45
check num: 46
check num: 47
check num: 48
check num: 49
check num: 35
check num: 51
check num: 52
check num: 53
check num: 54
check num: 55
check num: 56
check num: 57
check num: 58
check num: 59
check num: 60
check num: 61
check num: 62
check num: 63
check num: 64
check num: 65
check num: 66
check num: 67
check num: 68
check num: 69
check num: 70
check num: 18
check num: 72
check num: 71
check num: 74
check num: 75
check num: 76
check num: 77
check num: 78
check num: 79
check num: 80
check num: 81
check num: 82
check num: 50
check num: 84
check num: 85
check num: 86
check num: 87
check num: 88
check num: 89
check num: 90
check num: 91
check num: 92
check num: 93
check num: 94
check num: 95
check num: 96
check num: 97
check num: 83
check num: 99
check num: 98
check num: 73
result: 0 3 5 7 11 13 17 1 19 23 29 31 2 37 41 43 47 53 59 61 67 71 79 89 97 83 73
--- PASS: TestParallelizeUntil (0.00s)
PASS
ok kubecmdb/utils 0.552s
在这个例子中,ParallelizeUntil 函数,启动了3个worker,并发的处理10个任务,直到任务处理完,或者接收到ctx定义的超时信号。
注意,ParallelizeUntil 函数不会返回任何值,如果需要记录worker协程中的结果,可以自己定义。`
使用场景
在kubernetes源码中,Kube-scheduler在进行预选算法时,使用了workqueue.ParallelizeUntil()
,并发16个worker同时执行预选算法。在集群规模较大时,并发处理能提升kube-scheduler调度任务的整体效能。
预选算法执行预选的源码findNodesThatPassFilters
// k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go
// findNodesThatPassFilters finds the nodes that fit the filter plugins.
// findNodesThatPassFilters使用过滤插件来查找适合的节点。
func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
if err != nil {
return nil, err
}
numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes)))
// Create filtered list with enough space to avoid growing it
// and allow assigning.
filtered := make([]*v1.Node, numNodesToFind)
// 代码略
// Stops searching for more nodes once the configured number of feasible nodes
// are found.
// !!!重点,启动了16个work,并发的执行 checkNode 函数,对nodes节点进行预选操作
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), checkNode)
processedNodes := int(filteredLen) + len(statuses)
g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes)
// filter 列表预选算法,过滤出满足预选算法的node节点
filtered = filtered[:filteredLen]
if err := errCh.ReceiveError(); err != nil {
statusCode = framework.Error
return nil, err
}
return filtered, nil
}
结论
workqueue.ParallelizeUntil
框架广泛的适用于kubernetes源码,通过对其源码的解读,我们了解到了其如何实现与使用场景。我们可以在平时日常开发中,也可以多多尝试使用这个成熟的并发任务框架。
我们可以通过阅读kubernetes源码,学习Kubernetes内部机制,同时Kubernets项目中有一些好的成熟框架,我们可以学以致用,多在用在日常开发中才能真正掌握。
参考资料
Kubernete-v1.18源码