本文基于Kubernetes v1.22.4版本进行源码学习,对应的client-go版本为v0.22.4
3、Informer机制
4)、Indexer
Indexer中有Informer维护的指定资源对象的相对于etcd数据的一份本地缓存,可通过该缓存获取资源对象,以减少对Kubernetes API Server和etcd集群的请求压力
Indexer除了维护一份本地缓存之外,还有一个很重要的功能,便是索引功能。索引的目的是为了快速查找,比如我们需要查找某个node节点上的所有pod、查找某个命名空间下的所有pod等,利用索引可以实现快速查找
1)Indexer的结构定义分析
Indexer接口继承了一个Store接口(实现本地缓存),以及包含几个index索引相关的方法声明(实现索引功能)
// vendor/k8s.io/client-go/tools/cache/index.go
type Indexer interface {
Store
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexedValue string) ([]string, error)
ListIndexFuncValues(indexName string) []string
ByIndex(indexName, indexedValue string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
}
Store接口中定义了Add、Update、Delete、List、Get等一些资源对象增删改查的方法声明,用于操作Informer的本地缓存
// vendor/k8s.io/client-go/tools/cache/store.go
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error
Resync() error
}
cache struct是Indexer接口的一个实现,所以自然也是Store接口的一个实现,cache struct包含一个ThreadSafeStore接口的实现,以及一个计算object key的函数KeyFunc
cache struct会根据KeyFunc生成某个资源对象对应的一个唯一key,然后调用ThreadSafeStore接口中的方法来操作本地缓存中的对象
// vendor/k8s.io/client-go/tools/cache/store.go
type cache struct {
cacheStorage ThreadSafeStore
keyFunc KeyFunc
}
ThreadSafeStore接口包含了操作本地缓存的增删改查方法以及索引功能的相关方法,其方法名称与Indexer接口类似,最大的区别是ThreadSafeStore接口的增删改查方法入参基本都有key,由cache struct中的KeyFunc函数计算得出
// vendor/k8s.io/client-go/tools/cache/thread_safe_store.go
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexKey string) ([]string, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
Resync() error
}
threadSafeMap struct是ThreadSafeStore接口的一个实现。items字段中存储的是资源对象数据,其中items的key通过keyFunc函数计算得出,计算默认使用MetaNamespaceKeyFunc函数,该函数根据资源对象计算出<namespace>/<name>
格式的key,如果资源对象的<namespace>
为空,则<name>
作为key,而items的value用于存储资源对象。indexers与indices属性则与索引功能有关
// vendor/k8s.io/client-go/tools/cache/thread_safe_store.go
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
indexers Indexers
indices Indices
}
Indexer整体结构如下图:
2)Indexer的索引功能
在threadSafeMap struct中,与索引功能有关的是indexers与indices属性,代码如下:
// vendor/k8s.io/client-go/tools/cache/thread_safe_store.go
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
indexers Indexers
indices Indices
}
type Indexers map[string]IndexFunc
type IndexFunc func(obj interface{}) ([]string, error)
type Indices map[string]Index
type Index map[string]sets.String
Indexer数据结构说明如下:
- Indexers:存储索引器,key为索引器名称,value为索引器的实现函数
- IndexFunc:索引器函数,定义为接收一个资源对象,返回检索结果列表
- Indices:存储缓存器,key为缓存器名称,value为缓存数据
- Index:存储缓存数据,其结构为K/V
indexers包含了所有索引器(索引分类)及其索引器函数IndexFunc,IndexFunc为计算某个索引键下的所有对象键列表的方法:
indexers: {
"索引器1": 索引函数1,
"索引器2": 索引函数2,
}
数据示例:
indexers: {
"namespace": MetaNamespaceIndexFunc,
"nodeName": NodeNameIndexFunc,
}
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return []string{""}, fmt.Errorf("object has no meta: %v", err)
}
return []string{meta.GetNamespace()}, nil
}
func NodeNameIndexFunc(obj interface{}) ([]string, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, fmt.Errorf("not a pod")
}
return []string{pod.Spec.NodeName}, nil
}
indices包含了所有索引器(索引分类)及其所有的索引数据Index,而Index则包含了索引键以及索引键下的所有对象键的列表:
indices: {
"索引器1": {
"索引键1": ["对象键1", "对象键2"],
"索引键2": ["对象键3"],
},
"索引器2": {
"索引键3": ["对象键1"],
"索引键4": ["对象键2", "对象键3"]
}
}
数据示例:
pod1 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "default",
},
Spec: v1.PodSpec{
NodeName: "node1",
},
}
pod2 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-2",
Namespace: "default",
},
Spec: v1.PodSpec{
NodeName: "node2",
},
}
pod3 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-3",
Namespace: "kube-system",
},
Spec: v1.PodSpec{
NodeName: "node2",
},
}
indices: {
"namespace": {
"default": ["pod-1", "pod-2"],
"kube-system": ["pod-3"],
},
"nodeName": {
"node1": ["pod-1"],
"node2": ["pod-2", "pod-3"],
}
}
3)Indexer的索引功能核心实现
index.ByIndex
函数通过执行索引器函数得到索引结果,代码如下:
// vendor/k8s.io/client-go/tools/cache/thread_safe_store.go
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()
// 1)从c.indexers中查找指定的索引器函数
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
// 2)从c.indices中查找指定的缓存器函数
index := c.indices[indexName]
// 3)根据需要检索的indexedValue从缓存数据中查到并返回数据
set := index[indexedValue]
list := make([]interface{}, 0, set.Len())
for key := range set {
list = append(list, c.items[key])
}
return list, nil
}
ByIndex接收两个参数:indexName(索引器名称)和indexedValue(需要检索的key)。首先从c.indexers
中查找指定的索引器函数,从c.indices
中查找指定的缓存器函数,然后根据需要检索的indexedValue从缓存数据中查到并返回数据
threadSafeMap的Add、Update方法将资源对象存入items后,并调用updateIndices方法更新索引,代码如下:
// vendor/k8s.io/client-go/tools/cache/thread_safe_store.go
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
// if we got an old object, we need to remove it before we add it again
if oldObj != nil {
c.deleteFromIndices(oldObj, key)
}
// 1)遍历存储索引器indexers
for name, indexFunc := range c.indexers {
// 2)调用indexFunc计算出检索结果列表
indexValues, err := indexFunc(newObj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
// 3)从存储缓存器indices获取对应的index
index := c.indices[name]
if index == nil {
index = Index{}
c.indices[name] = index
}
// 4)遍历indexFunc计算出的检索结果列表,添加到index中
for _, indexValue := range indexValues {
set := index[indexValue]
if set == nil {
set = sets.String{}
index[indexValue] = set
}
set.Insert(key)
}
}
}
5)、SharedInformer
1)示例代码
使用SharedInformer监听default命名空间下的Pod资源对象的Added、Updated、Deleted事件,代码如下:
package main
import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"log"
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
panic(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
// 1)初始化informer factory
factory := informers.NewSharedInformerFactoryWithOptions(
clientset, 0, informers.WithNamespace("default"))
// 2)初始化pod informer
informer := factory.Core().V1().Pods().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
log.Printf("New Pod Added to Store: %s", mObj.GetName())
},
UpdateFunc: func(oldObj, newObj interface{}) {
oObj := oldObj.(v1.Object)
nObj := newObj.(v1.Object)
log.Printf("%s Pod Updated to %s", oObj.GetName(), nObj.GetName())
},
DeleteFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
log.Printf("Pod Deleted from Store: %s", mObj.GetName())
},
})
stopCh := make(chan struct{})
// 3)启动informer factory
factory.Start(stopCh)
// 4)等待list操作获取到的对象都同步到informer本地缓存Indexer中
factory.WaitForCacheSync(stopCh)
<-stopCh
}
2)sharedInformerFactory的初始化
// vendor/k8s.io/client-go/informers/factory.go
type sharedInformerFactory struct {
client kubernetes.Interface
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
lock sync.Mutex
defaultResync time.Duration
customResync map[reflect.Type]time.Duration
informers map[reflect.Type]cache.SharedIndexInformer
startedInformers map[reflect.Type]bool
}
sharedInformerFactory结构体中几个比较重要的属性:
- client:连接Kubernetes的clientSet
- informers:存储了资源类型和对应的SharedIndexInformer的映射关系
- startedInformers:记录已经启动的informer
NewSharedInformerFactory方法用于初始化InformerFactory,主要是初始化并返回sharedInformerFactory结构体,代码如下:
// vendor/k8s.io/client-go/informers/factory.go
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
return NewSharedInformerFactoryWithOptions(client, defaultResync)
}
func NewFilteredSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory {
return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))
}
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
factory := &sharedInformerFactory{
client: client,
namespace: v1.NamespaceAll,
defaultResync: defaultResync,
informers: make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync: make(map[reflect.Type]time.Duration),
}
// Apply all options
for _, opt := range options {
factory = opt(factory)
}
return factory
}
3)informer的初始化
示例代码中,调用factory.Core().V1().Pods().Informer()
初始化了sharedInformerFactory中的pod informer
// vendor/k8s.io/client-go/informers/core/v1/pod.go
func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
Informer方法中调用f.factory.InformerFor
方法来初始化pod informer
// vendor/k8s.io/client-go/informers/factory.go
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(obj)
// informers中存储了资源类型和对应的sharedIndexInformer的映射关系
// 如果已经存在同类型的资源informer,则返回当前的informer,不再继续添加
informer, exists := f.informers[informerType]
if exists {
return informer
}
resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
}
informers中存储了资源类型和对应的sharedIndexInformer的映射关系,如果已经存在同类型的资源informer,则返回当前的informer,不再继续添加
defaultInformer方法中调用了NewFilteredPodInformer方法来初始化pod informer,最终初始化并返回sharedIndexInformer结构体
// vendor/k8s.io/client-go/informers/core/v1/pod.go
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}
// vendor/k8s.io/client-go/tools/cache/shared_informer.go
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock: realClock,
}
return sharedIndexInformer
}
sharedIndexInformer结构体代码如下:
// vendor/k8s.io/client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
indexer Indexer
controller Controller
processor *sharedProcessor
cacheMutationDetector MutationDetector
listerWatcher ListerWatcher
objectType runtime.Object
resyncCheckPeriod time.Duration
defaultEventHandlerResyncPeriod time.Duration
clock clock.Clock
started, stopped bool
startedLock sync.Mutex
blockDeltas sync.Mutex
watchErrorHandler WatchErrorHandler
}
sharedIndexInformer结构体中几个比较重要的属性:
- indexer:Indexer中有Informer维护的指定资源对象的相对于etcd数据的一份本地缓存,可通过该缓存获取资源对象,以减少对Kubernetes API Server和etcd集群的请求压力
- controller:Controller从DeltaFIFO中pop Deltas出来处理,根据对象的变化更新Indexer中的本地缓存,并通知Processor相关对象有变化时间发生
- processor:Processor根据对象的变化事件类型,调用相应的ResourceEventHandler来处理对象的变化
controller结构体代码如下:
// vendor/k8s.io/client-go/tools/cache/controller.go
type controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
}
type Config struct {
// DeltaFIFO
Queue
ListerWatcher
Process ProcessFunc
ObjectType runtime.Object
FullResyncPeriod time.Duration
ShouldResync ShouldResyncFunc
RetryOnError bool
WatchErrorHandler WatchErrorHandler
WatchListPageSize int64
}
controller结构体中包含了Reflector和DeltaFIFO
- Reflector:Reflector从Kubernetes API Server中listAndWatch资源对象,然后将对象的变化包装成Delta并将其存储到DeltaFIFO中
- DeltaFIFO:DeltaFIFO存储着map[object key]Deltas以及object key的queue,Delta装有对象及对象的变化类型 ,Reflector负责DeltaFIFO的输入,Controller负责处理DeltaFIFO的输出
3)启动sharedInformerFactory
sharedInformerFactory.Start
为sharedInformerFactory的启动方法,其主要逻辑为循环遍历informers,然后运行goroutine调用informer.Run
来启动sharedInformerFactory中存储的各个informer
// vendor/k8s.io/client-go/informers/factory.go
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
}
a)informer.Run
// vendor/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// 1)初始化DeltaFIFO
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})
// 2)构建controller中的需要的config,Process属性赋值了s.HandleDeltas
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 3)使用config来初始化controller
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
// 4)调用s.processor.run,启动processor
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
// 5)调用s.controller.Run,启动controller
s.controller.Run(stopCh)
}
informer.Run
方法用于启动Informer,主要逻辑如下:
- 初始化DeltaFIFO
- 构建controller中的需要的config,Process属性赋值了
s.HandleDeltas
- 使用config来初始化controller
- 调用
s.processor.run
,启动processor - 调用
s.controller.Run
,启动controller
b)s.processor.run
s.processor.run
启动了processor,其中注意到listener.run
与listener.pop
两个核心方法即可,暂时没有用到,等下面用到他们的时候再做分析
// vendor/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
p.wg.Wait() // Wait for all .pop() and .run() to stop
}
c)s.controller.Run
s.controller.Run
为controller的启动方法,代码如下:
// vendor/k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
// 1)初始化Reflector
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
// 2)调用r.Run,启动Reflector
wg.StartWithChannel(stopCh, r.Run)
// 3)调用c.processLoop,开始controller的核心处理逻辑
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
controller.Run
主要逻辑如下:
- 初始化Reflector
- 调用
r.Run
,启动Reflector - 调用
c.processLoop
,开始controller的核心处理逻辑
d)r.Run
r.Run
方法中启动Reflector,其中调用Reflector的ListAndWatch方法,核心逻辑为从Kubernetes API Server做listAndWatch操作,然后将得到的对象封装存储到DeltaFIFO中,代码如下:
// vendor/k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
// 调用Reflector的ListAndWatch方法,核心逻辑为从Kubernetes API Server做listAndWatch操作,然后将得到的对象封装存储到DeltaFIFO中
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}s
e)c.processLoop
c.processLoop
为controller的核心处理逻辑,代码如下:
// vendor/k8s.io/client-go/tools/cache/controller.go
func (c *controller) processLoop() {
for {
// 循环调用c.config.Queue.Pop将DeltaFIFO选中的队头元素给pop出来,然后调用c.config.Process方法来做处理
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
// 当处理出错时,再调用c.config.Queue.AddIfNotPresent将对象重新加入到DeltaFIFO中去
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
processLoop方法中,循环调用c.config.Queue.Pop
将DeltaFIFO选中的队头元素给pop出来,然后调用c.config.Process
方法来做处理。当处理出错时,再调用c.config.Queue.AddIfNotPresent
将对象重新加入到DeltaFIFO中去
根据前面sharedIndexInformer.Run
方法的分析中可以得知,c.config.Process
其实就是sharedIndexInformer.HandleDeltas
方法
f)sharedIndexInformer.HandleDeltas
// vendor/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
isSync := false
switch {
case d.Type == Sync:
// Sync events are only propagated to listeners that requested resync
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Replaced events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
HandleDeltas方法中,将从DeltaFIFO中pop出来的对象以及类型,相应的在indexer中做添加、更新、删除操作,并调用s.processor.distribute
通知自定义的ResourceEventHandler
g)s.processor.distribute
// vendor/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
distribute方法最终是将构造好的addNotification、updateNotification、deleteNotification对象写入到p.addCh
中
到这里s.processor.run
中调用的listener.run
与listener.pop
方法终于派上了用场
h)listener.pop
// vendor/k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
pop方法就是将p.addCh
中的对象给拿出来,然后放到了p.nextCh
中
i)listener.run
// vendor/k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) run() {
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
close(stopCh)
}, 1*time.Second, stopCh)
}
run方法中循环读取p.nextCh
,判断对象类型,是updateNotification则调用p.handler.OnUpdate
方法,是addNotification则调用p.handler.OnAdd
方法,是deleteNotification则调用p.handler.OnDelete
方法做处理
而p.handler.OnUpdate
、p.handler.OnAdd
、p.handler.OnDelete
方法实际上就是自定义的的ResourceEventHandlerFuncs了
// vendor/k8s.io/client-go/tools/cache/controller.go
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
if r.AddFunc != nil {
r.AddFunc(obj)
}
}
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
if r.UpdateFunc != nil {
r.UpdateFunc(oldObj, newObj)
}
}
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
if r.DeleteFunc != nil {
r.DeleteFunc(obj)
}
}
4)Informer整体架构回顾
Informer整体架构如下图:
client-go的Informer主要包括以下组件:
- Reflector:Reflector从Kubernetes API Server中list&watch资源对象,然后调用DeltaFIFO的Add/Update/Delete/Replace方法将资源对象及其变化包装成Delta并将其丢到DeltaFIFO中
- DeltaFIFO:DeltaFIFO中存储着一个map和一个queue,即map[object key]Deltas以及object key的queue,Deltas为Delta的切片类型,Delta装有对象及对象的变化类型(Added/Updated/Deleted/Sync),Reflector负责DeltaFIFO的输入,Controller负责处理DeltaFIFO的输出
- Controller:Controller从DeltaFIFO的queue中pop一个object key出来,并获取其关联的Deltas出来进行处理,遍历Deltas,根据对象的变化更新Indexer中的本地内存缓存,并通知Processor,相关对象有变化事件发生
- Processor:Processor根据对象的变化事件类型,调用相应的ResourceEventHandler来处理对象的变化
- Indexer:Indexer中有informer维护的指定资源对象的相对于etcd数据的一份本地内存缓存,可通过该缓存获取资源对象,以减少对Kubernetes API Server、对etcd的请求压力
- ResourceEventHandler:用户根据自身处理逻辑需要,注册自定义的的ResourceEventHandler,当对象发生变化时,将触发调用对应类型的ResourceEventHandler来做处理
参考:
《Kubernetes源码剖析》
2022年最新k8s编程operator篇
k8s client-go源码分析 informer源码分析(6)-Indexer源码分析
k8s client-go源码分析 informer源码分析(2)-初始化与启动分析