背景
client-go中的workqueue包里主要有三个队列,分别是普通队列Queue,延时队列DelayingQueue,限速队列RateLimitingQueue,后一个队列以前一个队列的实现为基础,层层添加新功能。
workqueue是整个client-go源码的重点和难点。采用层层拨开分步理解有助于理解workqueue的源码。本文重点在从源码角度了解下workqueue的add(),get(),done()方法执行的过程。关于延时队列与限速队列是如何实现的后面再单独讨论。
workqueue源码分析
代码结构
源码位于:vendor/k8s.io//client-go/util/workqueue/queue.go
queue类型的定义
下面是queue类型定义。其中queue、dirty、processing 都保存 items。它们的区别是:
-
queue是有序列表用来存储 item 的处理顺序。
-
dirty集合存储的是所有需要处理的 item,是set类型,无序,用于保证items的唯一。dirty的字面意思就是需要被处理的数据。
-
processing集合存储的是当前正在处理的 item,也是set类型,无序,用于保证items的唯一。
// Type is a work queue (see the package comment).
type Type struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
queue []t // 是一个切片保证对象入队的顺序性。每个在queue队列的对象,必须同时也在dirty集合。
// dirty defines all of the items that need to be processed.
dirty set // 是一个set集合,保证对象的唯一性。存放需要被处理的对象
// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set // 也是一个set集合,保证对象的唯一性
cond *sync.Cond // 安全处理队列里面的对象
shuttingDown bool // 队列是否处理关闭中
metrics queueMetrics // 用于计数统计
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
要理解workqueue是的工作机制,必须要了解queue队列3个重要的方法,Add,Get,Done。接下来展开分析。
Add方法
informer机制中,当一个对象从DeltaFIFO队列中pod弹出后,会转到AddEventHandler事件处理函数处理,AddEventHandler需要调用workqueue的Add方法,先把对象加入队列,等下用户任务来处理。
Add方法是将item加入队列q.queue和待处理集合q.dirty。若该item正在被处理只加入q.dirty。
// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 如果队列处于shuttdingDown状态就返回
if q.shuttingDown {
return
}
// 如果dirty里面有这个item,就返回
if q.dirty.has(item) {
return
}
// 添加到metric用于计数
q.metrics.add(item)
// 插入到dirty队列
q.dirty.insert(item)
// 如果processing队列已经有这个item,就返回
if q.processing.has(item) {
return
}
// 如果processing队列没有这个item,就加入queue队列
q.queue = append(q.queue, item)
// 发信号让其他goroutine处理
q.cond.Signal()
}
在执行Add()添加对象到workqueue时,主要有三种场景,如下图所示
场景一:当3个队列都没有这个对象时,对象插入到queue和dirty
场景二:当某个对象已经加入到了队列,但还未开始被处理时,就直接返回不再加入队列。
场景三:当某个对象处于”处理中“状态,也就是位于processing队列中时,会把元素加入到dirty队列。当处理完,执行Done()方法后,item会被重新加入queue队列。
Get方法
Get方法是从 queue队列中取出一个元素item加入正处理集合q.processing,并从queue队列中删除,从dirty中删除。
// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 如果queue队列为空,并且队列不是处于shuttingDown状态就阻塞等待Add()对象到队列
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
// 如果queue队列为空了,而且队列处于shuttingDown状态,就返回空值
if len(q.queue) == 0 {
// We must be shutting down.
return nil, true
}
// 从queue队列弹出一个元素
item, q.queue = q.queue[0], q.queue[1:]
// 计数
q.metrics.get(item)
// 插入到processing队列
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}
Get方法的图示
Done方法
Done方法是表明这个元素item被处理完了,从processing队列删除。这里加了一个判断,如果dirty中还存在,还要将其加入 queue队列。
// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
// 直接从processing队列删除
q.processing.delete(item)
// 如果dirty队列里面还有这个对象(通常是处理元素过程中,对象再次入队了),就将元素从新加到queue队列。
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
}
}
在执行Add()添加对象到workqueue时,主要有二种场景,如下图所示
场景一:对象item完成处理,并且处理过程中该对象没有再次入队
场景二:对象item在处理过程中,还没处理完之前,这个对象又入队被加入了dirty队列(也就是此时执行了Add方法)。当执行Done()后,item会被重新添加(re-add)到queue队列