本文基于Kubernetes v1.22.4版本进行源码学习,对应的client-go版本为v0.22.4
4、自定义Controller的工作原理
Controller中主要使用到Informer和WorkQueue两个核心组件
Controller可以有一个或多个Informer来跟踪某一个resource。Informer跟Kubernetes API Server保持通讯获取资源的最新状态并更新到本地的cache中,一旦跟踪的资源有变化,Informer就会调用callback把关心的变更的Object放到WorkQueue里面
Worker执行真正的业务逻辑,计算和比较WorkQueue里items的当前状态和期望状态的差别,然后通过client-go向Kubernetes API Server发送请求,直到驱动这个集群向用户要求的状态演化
5、WorkQueue
WorkQueue支持3种队列类型:
- FIFO队列
- 延迟队列
- 限速队列
1)、FIFO队列
FIFO队列支持最基本的队列方法,WorkQueue中的限速及延迟队列都基于FIFO队列的Interface接口实现,其提供如下方法:
// vendor/k8s.io/client-go/util/workqueue/queue.go
type Interface interface {
Add(item interface{}) // 给队列添加元素
Len() int // 返回当前队列的长度
Get() (item interface{}, shutdown bool) // 获取队列头部的一个元素
Done(item interface{}) // 标记队列中该元素已被处理
ShutDown() // 关闭队列
ShuttingDown() bool // 查询队列是否正在关闭
}
FIFO队列数据结构如下:
// vendor/k8s.io/client-go/util/workqueue/queue.go
type Type struct {
queue []t // 定义队列,具有顺序性,待处理元素列表
dirty set // 标记所有需要被处理的元素
processing set // 当前正在被处理的元素
cond *sync.Cond
shuttingDown bool // 是否正在关闭
metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
type empty struct{}
type t interface{}
type set map[t]empty
FIFO队列的核心方法:Add、Get、Done,代码如下:
// vendor/k8s.io/client-go/util/workqueue/queue.go
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 队列正在关闭,直接返回
if q.shuttingDown {
return
}
// dirty中已经包含该元素,直接返回
if q.dirty.has(item) {
return
}
q.metrics.add(item)
// 添加到dirty中
q.dirty.insert(item)
// 元素正在被处理,直接返回
if q.processing.has(item) {
return
}
// 添加到待处理元素列表
q.queue = append(q.queue, item)
// 通知有新元素到了,此时有协程阻塞就会被唤醒
q.cond.Signal()
}
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 队列中没有数据,阻塞协程
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
// 协程被激活但还没有数据,说明队列被关闭了
if len(q.queue) == 0 {
// We must be shutting down.
return nil, true
}
// 弹出队列中的第一个元素
item, q.queue = q.queue[0], q.queue[1:]
q.metrics.get(item)
// 加入到processing中,从dirty中移除
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
// 从processing中移除
q.processing.delete(item)
// 判断dirty中,看看处理期间是不是又被添加,如果是,就在放到队列中
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
}
}
FIFIO存储过程如下:
- 通过Add方法往FIFO队列中分别插入1、2、3这3个元素,此时queue和dirty中分别存有1、2、3元素,processing为空
- 然后通过Get方法获取最先进入的元素(1元素),此时queue和dirty中分别存有2、3元素,而1元素被放入processing中,表示该元素正在被处理
- 最后,当处理完1元素时,通过Done方法标记该元素已经被处理完成,此时processing中的1元素会被删除
FIFO并发存储过程如下:
- 在并发场景下,goroutine A通过Get方法获取1元素,1元素被添加到processing中
- 此时,goroutine B通过Add方法插入另一个1元素,此时在processing中已经存在相同的元素,所以后面的1元素并不会被直接添加到queue中,当前FIFO队列中的dirty中存有1、2、3元素,processing中存有1元素
- 在goroutine A通过Done方法标记元素1处理完成后,如果dirty中存有1元素,则将1元素追加到queue的尾部。正在处理中的元素还没有调用Done时,此时再添加该元素应该是最新的,处理中的应该是过时的,所以在goroutine A调用Done方法之后,该元素会重新入队
dirty和processing都是用Hash Map实现的,所以不需要考虑无序,只保证去重即可
2)、延迟队列
延迟队列,基于FIFO队列接口封装,在原有功能上增加了AddAfter方法,其原理是延迟一段时间后再将元素插入FIFO队列。延迟队列数据结构如下:
// vendor/k8s.io/client-go/util/workqueue/delaying_queue.go
type DelayingInterface interface {
Interface
AddAfter(item interface{}, duration time.Duration)
}
type delayingType struct {
Interface
clock clock.Clock
stopCh chan struct{}
stopOnce sync.Once
heartbeat clock.Ticker
waitingForAddCh chan *waitFor // 所有延迟添加的元素封装成waitFor放到chan中,默认初始大小为1000
metrics retryMetrics
}
type waitFor struct {
data t
readyAt time.Time
index int
}
延迟队列的AddAfter方法,该方法有一个duration(延迟时间)参数,该参数用于指定元素延迟插入FIFO队列的时间。如果duration小于或等于0,会直接将元素插入FIFO队列中,代码如下:
// vendor/k8s.io/client-go/util/workqueue/delaying_queue.go
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
// 如果队列关闭,直接返回
if q.ShuttingDown() {
return
}
q.metrics.retry()
// 不需要延迟,直接将元素插入FIFO队列中
if duration <= 0 {
q.Add(item)
return
}
select {
case <-q.stopCh:
// 把元素封装成waitFor传入chan
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
AddAfter方法就是简单把元素添加到chan中, 所以核心实现在从chan中获取数据的waitingLoop方法, 如下所示:
// vendor/k8s.io/client-go/util/workqueue/delaying_queue.go
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()
never := make(<-chan time.Time)
var nextReadyAtTimer clock.Timer
// waitForPriorityQueue把需要延迟的元素形成了一个队列,队列按照元素的延迟添加的时间(readyAt)从小到大排序
waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue)
waitingEntryByData := map[t]*waitFor{}
for {
if q.Interface.ShuttingDown() {
return
}
// 获取当前时间
now := q.clock.Now()
// waitingForQueue中是否有元素
for waitingForQueue.Len() > 0 {
// 获取waitingForQueue中的第一个元素
// waitingForQueue中的元素按照延迟添加的时间从小到大排序,第一个元素为时间最小的元素
entry := waitingForQueue.Peek().(*waitFor)
// 元素指定延迟添加的时间过了吗?如果没有过就跳出循环
if entry.readyAt.After(now) {
break
}
// 指定延迟添加的时间过了,就把元素从waitingForQueue中拿出来放入FIFO队列中
entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(waitingEntryByData, entry.data)
}
nextReadyAt := never
// 如果waitingForQueue中有元素,用第一个元素指定的时间减去当前时间作为等待时间
if waitingForQueue.Len() > 0 {
if nextReadyAtTimer != nil {
nextReadyAtTimer.Stop()
}
entry := waitingForQueue.Peek().(*waitFor)
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
nextReadyAt = nextReadyAtTimer.C()
}
select {
case <-q.stopCh:
return
case <-q.heartbeat.C():
// waitingForQueue中需要等待时间信号,时间到就会有信号
case <-nextReadyAt:
// 从chan中获取元素的,AddAfter()放入chan中的元素
case waitEntry := <-q.waitingForAddCh:
// 如果时间已经过了就直接放入FIFO队列中,没过就插入到有序队列
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
// 把chan中的元素全部取出,用了default意味着chan中没有数据就会立刻停止
drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}
延迟队列运行原理如下:
将元素1添加到waitingForAddCh中,通过waitingLoop方法消费元素数据。当元素的延迟时间不大于当前时间时,说明还需要延迟元素插入FIFO队列的时间,此时将该元素放入到waitingForQueue中。当元素的延迟时间大于当前时间时,则将该元素插入FIFO队列中。另外,还会遍历waitingForQueue中的元素,按照上述逻辑验证时间
3)、限速队列
限速队列,基于延迟队列和FIFO队列接口封装,在原有功能上增加了AddRateLimited、Forget、NumRequeues方法。限速队列接口定义如下:
// vendor/k8s.io/client-go/util/workqueue/rate_limiting_queue.go
type RateLimitingInterface interface {
DelayingInterface
AddRateLimited(item interface{}) // 往队列里加入一个元素
Forget(item interface{}) // 停止元素重试
NumRequeues(item interface{}) int // 查询这个元素被处理多少次了
}
限速队列的重点在于它提供的4种限速算法接口。其原理是,限速队列利用延迟队列的特性,延迟某个元素的插入时间,达到限速目的。RateLimiter接口定义如下:
// vendor/k8s.io/client-go/util/workqueue/default_rate_limiters.go
type RateLimiter interface {
When(item interface{}) time.Duration // 获取元素需要等待多长时间
Forget(item interface{}) // 抛弃该元素,意味着该元素已经被处理了
NumRequeues(item interface{}) int // 获取元素放入队列的次数
}
WorkQueue提供了4种限速算法:
- 令牌桶算法(BucketRateLimiter)
- 排队指数算法(ItemExponentialFailureRateLimiter)
- 计数器算法(ItemFastSlowRateLimiter)
- 混合模式(MaxOfRateLimiter),将多种限速算法混合使用
1)令牌桶算法
令牌桶算法是通过Go第三方库golang.org/x/time/rate
实现的。令牌桶算法内部实现了一个存放token的桶,初始时桶是空的,token会以固定速率往桶里填充,直到将其填满为止,多余的token会被丢弃。每个元素都会从令牌桶得到一个token,只有得到token的元素才允许通过,而没有得到token的元素处于等待状态。实现元素如下图:
WorkQueue在默认的情况下会实例化令牌桶,代码如下:
rate.NewLimiter(rate.Limit(10), 100)
在实例化rate.NewLimiter
时传入r和b两个参数:r参数表示每秒往桶里填充的token数量;b参数表示令牌桶的大小,即令牌桶最多存放的token数量
2)排队指数算法
排队指数算法将相同元素的排队数作为指数,排队数增大,速率限制呈指数级增长,但其最大值不会超过maxDelay。代码实现如下:
// vendor/k8s.io/client-go/util/workqueue/default_rate_limiters.go
type ItemExponentialFailureRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int // 记录元素的排队数,每调用一次When累加一次
baseDelay time.Duration // 延迟基数
maxDelay time.Duration // 最大延迟时间
}
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
// 累加元素的排队数
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1
// 通过元素的排队数计算延迟时间,公式是 2^i * baseDelay,呈指数级增长
// The backoff is capped such that 'calculated' value never overflows.
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
// 计算后的延迟时间和最大延迟时间取最小值
if backoff > math.MaxInt64 {
return r.maxDelay
}
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
// 清空元素的排队数
delete(r.failures, item)
}
元素的排队数统计是有限速周期的,一个限速周期是指从执行AddRateLimited方法到执行完Forget方法之间的时间。如果该元素被Forget方法处理完,则清空排队数
该算法提供了3个主要字段:failures用于统计元素排队数,每当AddRateLimited方法插入新元素时,会为该字段加1;baseDelay是延迟基数(默认为5ms),maxDelay是最大延迟时间(默认为1000s)。通过元素的排队数计算延迟时间,公式是 2 i ∗ b a s e D e l a y 2^i * baseDelay 2i∗baseDelay
3)计数器算法
计数器算法的原理是:限制一段时间内允许通过的元素数量,例如在1分钟内只允许通过1000个元素,每插入一个元素,计算器自增1,当计数器到100的阈值且还在限速周期内时,则不允许元素再通过。WorkQueue在此基础上扩展了fast和slow速率,代码实现如下:
// vendor/k8s.io/client-go/util/workqueue/default_rate_limiters.go
type ItemFastSlowRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int // 统计元素排队数
maxFastAttempts int // 用于控制从fastDelay转换到slowDelay
fastDelay time.Duration // 短延迟时间
slowDelay time.Duration // 长延迟时间
}
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
// 累加元素的排队数
r.failures[item] = r.failures[item] + 1
// 元素的排队数超过阈值用长延迟时间,否则用短延迟时间
if r.failures[item] <= r.maxFastAttempts {
return r.fastDelay
}
return r.slowDelay
}
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
// 清空元素的排队数
delete(r.failures, item)
}
假设fastDelay是5ms,slowDelay是10s,maxFastAttempts是3。在一个限速周期内通过AddRateLimited方法插入4个相同的元素,那么前3个元素使用fastDelay的5ms,当触发maxFastAttempts时,第4个元素使用slowDelay的10s
4)混合模式
混合模式内部有多个限速器,每次返回所有限速器里延迟最大的时间。代码如下:
// vendor/k8s.io/client-go/util/workqueue/default_rate_limiters.go
type MaxOfRateLimiter struct {
limiters []RateLimiter // 限速器数组
}
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
ret := time.Duration(0)
// 获取所有限速器里最大的延迟时间
for _, limiter := range r.limiters {
curr := limiter.When(item)
if curr > ret {
ret = curr
}
}
return ret
}
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
ret := 0
for _, limiter := range r.limiters {
curr := limiter.NumRequeues(item)
if curr > ret {
ret = curr
}
}
return ret
}
func (r *MaxOfRateLimiter) Forget(item interface{}) {
for _, limiter := range r.limiters {
limiter.Forget(item)
}
}
参考:
《Kubernetes源码剖析》
深入浅出kubernetes之client-go的workqueue