client-go源码学习(三):Indexer、SharedInformer

news2025/1/6 20:34:03

本文基于Kubernetes v1.22.4版本进行源码学习,对应的client-go版本为v0.22.4

3、Informer机制

4)、Indexer

Indexer中有Informer维护的指定资源对象的相对于etcd数据的一份本地缓存,可通过该缓存获取资源对象,以减少对Kubernetes API Server和etcd集群的请求压力

Indexer除了维护一份本地缓存之外,还有一个很重要的功能,便是索引功能。索引的目的是为了快速查找,比如我们需要查找某个node节点上的所有pod、查找某个命名空间下的所有pod等,利用索引可以实现快速查找

1)Indexer的结构定义分析

Indexer接口继承了一个Store接口(实现本地缓存),以及包含几个index索引相关的方法声明(实现索引功能)

// vendor/k8s.io/client-go/tools/cache/index.go
type Indexer interface {
	Store

	Index(indexName string, obj interface{}) ([]interface{}, error)

	IndexKeys(indexName, indexedValue string) ([]string, error)

	ListIndexFuncValues(indexName string) []string

	ByIndex(indexName, indexedValue string) ([]interface{}, error)

	GetIndexers() Indexers

	AddIndexers(newIndexers Indexers) error
}

Store接口中定义了Add、Update、Delete、List、Get等一些资源对象增删改查的方法声明,用于操作Informer的本地缓存

// vendor/k8s.io/client-go/tools/cache/store.go
type Store interface {

	Add(obj interface{}) error

	Update(obj interface{}) error

	Delete(obj interface{}) error

	List() []interface{}

	ListKeys() []string

	Get(obj interface{}) (item interface{}, exists bool, err error)

	GetByKey(key string) (item interface{}, exists bool, err error)
  
	Replace([]interface{}, string) error

	Resync() error
}

cache struct是Indexer接口的一个实现,所以自然也是Store接口的一个实现,cache struct包含一个ThreadSafeStore接口的实现,以及一个计算object key的函数KeyFunc

cache struct会根据KeyFunc生成某个资源对象对应的一个唯一key,然后调用ThreadSafeStore接口中的方法来操作本地缓存中的对象

// vendor/k8s.io/client-go/tools/cache/store.go
type cache struct {

	cacheStorage ThreadSafeStore

	keyFunc KeyFunc
}

ThreadSafeStore接口包含了操作本地缓存的增删改查方法以及索引功能的相关方法,其方法名称与Indexer接口类似,最大的区别是ThreadSafeStore接口的增删改查方法入参基本都有key,由cache struct中的KeyFunc函数计算得出

// vendor/k8s.io/client-go/tools/cache/thread_safe_store.go
type ThreadSafeStore interface {
	Add(key string, obj interface{})
	Update(key string, obj interface{})
	Delete(key string)
	Get(key string) (item interface{}, exists bool)
	List() []interface{}
	ListKeys() []string
	Replace(map[string]interface{}, string)
	Index(indexName string, obj interface{}) ([]interface{}, error)
	IndexKeys(indexName, indexKey string) ([]string, error)
	ListIndexFuncValues(name string) []string
	ByIndex(indexName, indexKey string) ([]interface{}, error)
	GetIndexers() Indexers

	AddIndexers(newIndexers Indexers) error

	Resync() error
}

threadSafeMap struct是ThreadSafeStore接口的一个实现。items字段中存储的是资源对象数据,其中items的key通过keyFunc函数计算得出,计算默认使用MetaNamespaceKeyFunc函数,该函数根据资源对象计算出<namespace>/<name>格式的key,如果资源对象的<namespace>为空,则<name>作为key,而items的value用于存储资源对象。indexers与indices属性则与索引功能有关

// vendor/k8s.io/client-go/tools/cache/thread_safe_store.go
type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{}

	indexers Indexers

	indices Indices
}

Indexer整体结构如下图:

在这里插入图片描述

2)Indexer的索引功能

在threadSafeMap struct中,与索引功能有关的是indexers与indices属性,代码如下:

// vendor/k8s.io/client-go/tools/cache/thread_safe_store.go
type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{}

	indexers Indexers

	indices Indices
}

type Indexers map[string]IndexFunc

type IndexFunc func(obj interface{}) ([]string, error)

type Indices map[string]Index

type Index map[string]sets.String

Indexer数据结构说明如下:

  • Indexers:存储索引器,key为索引器名称,value为索引器的实现函数
  • IndexFunc:索引器函数,定义为接收一个资源对象,返回检索结果列表
  • Indices:存储缓存器,key为缓存器名称,value为缓存数据
  • Index:存储缓存数据,其结构为K/V

indexers包含了所有索引器(索引分类)及其索引器函数IndexFunc,IndexFunc为计算某个索引键下的所有对象键列表的方法:

indexers: {
  "索引器1": 索引函数1,
  "索引器2": 索引函数2,
}

数据示例:

indexers: {
  "namespace": MetaNamespaceIndexFunc,
  "nodeName": NodeNameIndexFunc,
}
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
	meta, err := meta.Accessor(obj)
	if err != nil {
		return []string{""}, fmt.Errorf("object has no meta: %v", err)
	}
	return []string{meta.GetNamespace()}, nil
}

func NodeNameIndexFunc(obj interface{}) ([]string, error) {
	pod, ok := obj.(*api.Pod)
	if !ok {
		return nil, fmt.Errorf("not a pod")
	}
	return []string{pod.Spec.NodeName}, nil
}

indices包含了所有索引器(索引分类)及其所有的索引数据Index,而Index则包含了索引键以及索引键下的所有对象键的列表:

indices: {
  "索引器1": {
    "索引键1": ["对象键1", "对象键2"],
    "索引键2": ["对象键3"],
  },
  "索引器2": {
    "索引键3": ["对象键1"],
    "索引键4": ["对象键2", "对象键3"]
  }
}

数据示例:

	pod1 := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "pod-1",
			Namespace: "default",
		},
		Spec: v1.PodSpec{
			NodeName: "node1",
		},
	}
	pod2 := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "pod-2",
			Namespace: "default",
		},
		Spec: v1.PodSpec{
			NodeName: "node2",
		},
	}
	pod3 := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "pod-3",
			Namespace: "kube-system",
		},
		Spec: v1.PodSpec{
			NodeName: "node2",
		},
	}
indices: {
  "namespace": {
    "default": ["pod-1", "pod-2"],
    "kube-system": ["pod-3"],
  },
  "nodeName": {
  	"node1": ["pod-1"],
  	"node2": ["pod-2", "pod-3"],
  }
}
3)Indexer的索引功能核心实现

index.ByIndex函数通过执行索引器函数得到索引结果,代码如下:

// vendor/k8s.io/client-go/tools/cache/thread_safe_store.go
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	// 1)从c.indexers中查找指定的索引器函数
	indexFunc := c.indexers[indexName]
	if indexFunc == nil {
		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
	}

	// 2)从c.indices中查找指定的缓存器函数
	index := c.indices[indexName]

	// 3)根据需要检索的indexedValue从缓存数据中查到并返回数据
	set := index[indexedValue]
	list := make([]interface{}, 0, set.Len())
	for key := range set {
		list = append(list, c.items[key])
	}

	return list, nil
}

ByIndex接收两个参数:indexName(索引器名称)和indexedValue(需要检索的key)。首先从c.indexers中查找指定的索引器函数,从c.indices中查找指定的缓存器函数,然后根据需要检索的indexedValue从缓存数据中查到并返回数据

threadSafeMap的Add、Update方法将资源对象存入items后,并调用updateIndices方法更新索引,代码如下:

// vendor/k8s.io/client-go/tools/cache/thread_safe_store.go
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
	// if we got an old object, we need to remove it before we add it again
	if oldObj != nil {
		c.deleteFromIndices(oldObj, key)
	}
	// 1)遍历存储索引器indexers
	for name, indexFunc := range c.indexers {
		// 2)调用indexFunc计算出检索结果列表
		indexValues, err := indexFunc(newObj)
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}
		// 3)从存储缓存器indices获取对应的index
		index := c.indices[name]
		if index == nil {
			index = Index{}
			c.indices[name] = index
		}

		// 4)遍历indexFunc计算出的检索结果列表,添加到index中
		for _, indexValue := range indexValues {
			set := index[indexValue]
			if set == nil {
				set = sets.String{}
				index[indexValue] = set
			}
			set.Insert(key)
		}
	}
}

5)、SharedInformer

1)示例代码

使用SharedInformer监听default命名空间下的Pod资源对象的Added、Updated、Deleted事件,代码如下:

package main

import (
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"log"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// 1)初始化informer factory
	factory := informers.NewSharedInformerFactoryWithOptions(
		clientset, 0, informers.WithNamespace("default"))
	// 2)初始化pod informer
	informer := factory.Core().V1().Pods().Informer()

	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("New Pod Added to Store: %s", mObj.GetName())
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oObj := oldObj.(v1.Object)
			nObj := newObj.(v1.Object)
			log.Printf("%s Pod Updated to %s", oObj.GetName(), nObj.GetName())
		},
		DeleteFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("Pod Deleted from Store: %s", mObj.GetName())
		},
	})

	stopCh := make(chan struct{})
	// 3)启动informer factory
	factory.Start(stopCh)
	// 4)等待list操作获取到的对象都同步到informer本地缓存Indexer中
	factory.WaitForCacheSync(stopCh)
	<-stopCh
}
2)sharedInformerFactory的初始化
// vendor/k8s.io/client-go/informers/factory.go
type sharedInformerFactory struct {
	client           kubernetes.Interface
	namespace        string
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	lock             sync.Mutex
	defaultResync    time.Duration
	customResync     map[reflect.Type]time.Duration

	informers map[reflect.Type]cache.SharedIndexInformer

	startedInformers map[reflect.Type]bool
}

sharedInformerFactory结构体中几个比较重要的属性:

  1. client:连接Kubernetes的clientSet
  2. informers:存储了资源类型和对应的SharedIndexInformer的映射关系
  3. startedInformers:记录已经启动的informer

NewSharedInformerFactory方法用于初始化InformerFactory,主要是初始化并返回sharedInformerFactory结构体,代码如下:

// vendor/k8s.io/client-go/informers/factory.go
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
	return NewSharedInformerFactoryWithOptions(client, defaultResync)
}

func NewFilteredSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory {
	return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))
}

func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
	factory := &sharedInformerFactory{
		client:           client,
		namespace:        v1.NamespaceAll,
		defaultResync:    defaultResync,
		informers:        make(map[reflect.Type]cache.SharedIndexInformer),
		startedInformers: make(map[reflect.Type]bool),
		customResync:     make(map[reflect.Type]time.Duration),
	}

	// Apply all options
	for _, opt := range options {
		factory = opt(factory)
	}

	return factory
}

3)informer的初始化

示例代码中,调用factory.Core().V1().Pods().Informer()初始化了sharedInformerFactory中的pod informer

// vendor/k8s.io/client-go/informers/core/v1/pod.go
func (f *podInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

Informer方法中调用f.factory.InformerFor方法来初始化pod informer

// vendor/k8s.io/client-go/informers/factory.go
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	// informers中存储了资源类型和对应的sharedIndexInformer的映射关系
	// 如果已经存在同类型的资源informer,则返回当前的informer,不再继续添加
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}

	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}

	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer

	return informer
}

informers中存储了资源类型和对应的sharedIndexInformer的映射关系,如果已经存在同类型的资源informer,则返回当前的informer,不再继续添加

defaultInformer方法中调用了NewFilteredPodInformer方法来初始化pod informer,最终初始化并返回sharedIndexInformer结构体

// vendor/k8s.io/client-go/informers/core/v1/pod.go
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
			},
		},
		&corev1.Pod{},
		resyncPeriod,
		indexers,
	)
}
// vendor/k8s.io/client-go/tools/cache/shared_informer.go
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	realClock := &clock.RealClock{}
	sharedIndexInformer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: realClock},
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
		objectType:                      exampleObject,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
		clock:                           realClock,
	}
	return sharedIndexInformer
}

sharedIndexInformer结构体代码如下:

// vendor/k8s.io/client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
	indexer    Indexer
	controller Controller

	processor             *sharedProcessor
	cacheMutationDetector MutationDetector

	listerWatcher ListerWatcher

	objectType runtime.Object

	resyncCheckPeriod time.Duration

	defaultEventHandlerResyncPeriod time.Duration

	clock clock.Clock

	started, stopped bool
	startedLock      sync.Mutex

	blockDeltas sync.Mutex

  watchErrorHandler WatchErrorHandler
}

sharedIndexInformer结构体中几个比较重要的属性:

  1. indexer:Indexer中有Informer维护的指定资源对象的相对于etcd数据的一份本地缓存,可通过该缓存获取资源对象,以减少对Kubernetes API Server和etcd集群的请求压力
  2. controller:Controller从DeltaFIFO中pop Deltas出来处理,根据对象的变化更新Indexer中的本地缓存,并通知Processor相关对象有变化时间发生
  3. processor:Processor根据对象的变化事件类型,调用相应的ResourceEventHandler来处理对象的变化

controller结构体代码如下:

// vendor/k8s.io/client-go/tools/cache/controller.go
type controller struct {
	config         Config
	reflector      *Reflector
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}

type Config struct {
	// DeltaFIFO
	Queue

	ListerWatcher

	Process ProcessFunc

	ObjectType runtime.Object

	FullResyncPeriod time.Duration

	ShouldResync ShouldResyncFunc

	RetryOnError bool

	WatchErrorHandler WatchErrorHandler

	WatchListPageSize int64
}

controller结构体中包含了Reflector和DeltaFIFO

  1. Reflector:Reflector从Kubernetes API Server中listAndWatch资源对象,然后将对象的变化包装成Delta并将其存储到DeltaFIFO中
  2. DeltaFIFO:DeltaFIFO存储着map[object key]Deltas以及object key的queue,Delta装有对象及对象的变化类型 ,Reflector负责DeltaFIFO的输入,Controller负责处理DeltaFIFO的输出
3)启动sharedInformerFactory

sharedInformerFactory.Start为sharedInformerFactory的启动方法,其主要逻辑为循环遍历informers,然后运行goroutine调用informer.Run来启动sharedInformerFactory中存储的各个informer

// vendor/k8s.io/client-go/informers/factory.go
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}

a)informer.Run

// vendor/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	// 1)初始化DeltaFIFO
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	// 2)构建controller中的需要的config,Process属性赋值了s.HandleDeltas
	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process:           s.HandleDeltas,
		WatchErrorHandler: s.watchErrorHandler,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		
		// 3)使用config来初始化controller
		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	// 4)调用s.processor.run,启动processor
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	// 5)调用s.controller.Run,启动controller
	s.controller.Run(stopCh)
}

informer.Run方法用于启动Informer,主要逻辑如下:

  1. 初始化DeltaFIFO
  2. 构建controller中的需要的config,Process属性赋值了s.HandleDeltas
  3. 使用config来初始化controller
  4. 调用s.processor.run,启动processor
  5. 调用s.controller.Run,启动controller

b)s.processor.run

s.processor.run启动了processor,其中注意到listener.runlistener.pop两个核心方法即可,暂时没有用到,等下面用到他们的时候再做分析

// vendor/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}

c)s.controller.Run

s.controller.Run为controller的启动方法,代码如下:

// vendor/k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	// 1)初始化Reflector
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.WatchListPageSize = c.config.WatchListPageSize
	r.clock = c.clock
	if c.config.WatchErrorHandler != nil {
		r.watchErrorHandler = c.config.WatchErrorHandler
	}

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group

	// 2)调用r.Run,启动Reflector
	wg.StartWithChannel(stopCh, r.Run)

	// 3)调用c.processLoop,开始controller的核心处理逻辑
	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}

controller.Run主要逻辑如下:

  1. 初始化Reflector
  2. 调用r.Run,启动Reflector
  3. 调用c.processLoop,开始controller的核心处理逻辑

d)r.Run

r.Run方法中启动Reflector,其中调用Reflector的ListAndWatch方法,核心逻辑为从Kubernetes API Server做listAndWatch操作,然后将得到的对象封装存储到DeltaFIFO中,代码如下:

// vendor/k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.BackoffUntil(func() {
		// 调用Reflector的ListAndWatch方法,核心逻辑为从Kubernetes API Server做listAndWatch操作,然后将得到的对象封装存储到DeltaFIFO中
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}s

e)c.processLoop

c.processLoop为controller的核心处理逻辑,代码如下:

// vendor/k8s.io/client-go/tools/cache/controller.go
func (c *controller) processLoop() {
	for {
		// 循环调用c.config.Queue.Pop将DeltaFIFO选中的队头元素给pop出来,然后调用c.config.Process方法来做处理
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		// 当处理出错时,再调用c.config.Queue.AddIfNotPresent将对象重新加入到DeltaFIFO中去
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

processLoop方法中,循环调用c.config.Queue.Pop将DeltaFIFO选中的队头元素给pop出来,然后调用c.config.Process方法来做处理。当处理出错时,再调用c.config.Queue.AddIfNotPresent将对象重新加入到DeltaFIFO中去

根据前面sharedIndexInformer.Run方法的分析中可以得知,c.config.Process其实就是sharedIndexInformer.HandleDeltas方法

f)sharedIndexInformer.HandleDeltas

// vendor/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}

				isSync := false
				switch {
				case d.Type == Sync:
					// Sync events are only propagated to listeners that requested resync
					isSync = true
				case d.Type == Replaced:
					if accessor, err := meta.Accessor(d.Object); err == nil {
						if oldAccessor, err := meta.Accessor(old); err == nil {
							// Replaced events that didn't change resourceVersion are treated as resync events
							// and only propagated to listeners that requested resync
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

HandleDeltas方法中,将从DeltaFIFO中pop出来的对象以及类型,相应的在indexer中做添加、更新、删除操作,并调用s.processor.distribute通知自定义的ResourceEventHandler

g)s.processor.distribute

// vendor/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

distribute方法最终是将构造好的addNotification、updateNotification、deleteNotification对象写入到p.addCh

到这里s.processor.run中调用的listener.runlistener.pop方法终于派上了用场

h)listener.pop

// vendor/k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

pop方法就是将p.addCh中的对象给拿出来,然后放到了p.nextCh

i)listener.run

// vendor/k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) run() {
	// this call blocks until the channel is closed.  When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted.  This is usually better than the alternative of never
	// delivering again.
	stopCh := make(chan struct{})
	wait.Until(func() {
		for next := range p.nextCh {
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}

run方法中循环读取p.nextCh,判断对象类型,是updateNotification则调用p.handler.OnUpdate方法,是addNotification则调用p.handler.OnAdd方法,是deleteNotification则调用p.handler.OnDelete方法做处理

p.handler.OnUpdatep.handler.OnAddp.handler.OnDelete方法实际上就是自定义的的ResourceEventHandlerFuncs了

// vendor/k8s.io/client-go/tools/cache/controller.go
type ResourceEventHandler interface {
	OnAdd(obj interface{})
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
}

type ResourceEventHandlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
	if r.AddFunc != nil {
		r.AddFunc(obj)
	}
}

func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
	if r.UpdateFunc != nil {
		r.UpdateFunc(oldObj, newObj)
	}
}

func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
	if r.DeleteFunc != nil {
		r.DeleteFunc(obj)
	}
}
4)Informer整体架构回顾

Informer整体架构如下图:

client-go的Informer主要包括以下组件:

  1. Reflector:Reflector从Kubernetes API Server中list&watch资源对象,然后调用DeltaFIFO的Add/Update/Delete/Replace方法将资源对象及其变化包装成Delta并将其丢到DeltaFIFO中
  2. DeltaFIFO:DeltaFIFO中存储着一个map和一个queue,即map[object key]Deltas以及object key的queue,Deltas为Delta的切片类型,Delta装有对象及对象的变化类型(Added/Updated/Deleted/Sync),Reflector负责DeltaFIFO的输入,Controller负责处理DeltaFIFO的输出
  3. Controller:Controller从DeltaFIFO的queue中pop一个object key出来,并获取其关联的Deltas出来进行处理,遍历Deltas,根据对象的变化更新Indexer中的本地内存缓存,并通知Processor,相关对象有变化事件发生
  4. Processor:Processor根据对象的变化事件类型,调用相应的ResourceEventHandler来处理对象的变化
  5. Indexer:Indexer中有informer维护的指定资源对象的相对于etcd数据的一份本地内存缓存,可通过该缓存获取资源对象,以减少对Kubernetes API Server、对etcd的请求压力
  6. ResourceEventHandler:用户根据自身处理逻辑需要,注册自定义的的ResourceEventHandler,当对象发生变化时,将触发调用对应类型的ResourceEventHandler来做处理

参考:

《Kubernetes源码剖析》

2022年最新k8s编程operator篇

k8s client-go源码分析 informer源码分析(6)-Indexer源码分析

k8s client-go源码分析 informer源码分析(2)-初始化与启动分析

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/146604.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算Java对象大小(附实际例子分析)

对象大小如何计算 对象大小包括俩部分的内容&#xff0c;对象头和对象内容。&#xff08;图片源于网络&#xff09; 对象头 此处假设是64位的JVM 对象地址&#xff0c;占4个字节。对象标记&#xff0c;占8个字节&#xff0c;包括锁标记&#xff0c;hashcode, age 等。数组…

python 如何使用 pandas 在 flask web 网页中分页显示 csv 文件数据

目录 一、实战场景 二、知识点 python 基础语法 python 文件读写 python 分页 pandas 数据处理 flask web 框架 jinja 模版 三、菜鸟实战 初始化 Flask 框架&#xff0c;设置路由 jinja 模版 渲染列表数据 分页请求数据 显示详情页数据示例 运行结果 运行截图 …

[oeasy]python0040_换行与回车的不同_通用换行符_universal_newlines

换行回车 回忆上次内容 区分概念 terminal终端 主机网络中 最终的 端点 TeleTYpewriter 电传打印机终端硬件 shell 终端硬件基础上的 软件壳子 Console 控制台 主机旁边 的 控制面板 存储文件 的 时候 我 在文件里 打了回车\n系统 将0x0a存入字节 进文件换行 自动就有 回车…

航空客运订票系统(C语言,软件用的DEV)

这两天整理之前的作业代码&#xff0c;把自己一点一点敲出来的系统又看了一下&#xff0c;挑几个发出来供大家参考。想要源码、报告可以找我啦&#xff0c;代码的注释之前写的都是非常详细的&#xff01; 但是不是无偿的啦&#xff08;不坑&#xff0c;一杯奶茶喽&#xff0c;不…

Java逃逸分析(附实际例子分析)

Java逃逸分析 1. 什么是Java逃逸分析 我们知道对象一般是在堆上生成的&#xff0c;但这并不是绝对的。特例就是今天要说的逃逸分析。 JVM 在分析代码以后&#xff0c;发现一个对象在声明之后&#xff0c;只有在它当前声明的这个函数中调用&#xff0c;那么它就会将这个对象在…

《微SaaS创富周刊》第3期:GPT-3\ChatGPT、Stable Diffusion等AI模型驱动的微SaaS创意盘点

大家新年好&#xff01;第3期《微SaaS创富周刊》问世啦&#xff01;本周刊面向独立开发者、早期创业团队&#xff0c;报道他们主要的产品形态——微SaaS如何变现的最新资讯和经验分享等。所谓微SaaS&#xff0c;就是“针对利基市场的SaaS”&#xff0c;特点是一般由个人或者小团…

网络爬虫的危害与防御方法

爬虫程序是一种计算机程序&#xff0c;旨在通过执行自动化或重复性任务来模仿或替代人类的操作。爬虫程序执行任务的速度和准确性比真实用户高得多。爬虫程序在互联网上扮演着各种各样的角色&#xff0c;超过一半的网络流量是由爬虫程序产生的。有些爬虫程序非常有用&#xff0…

v-if和v-show的区别?使用场景?v-if状态改变调用钩子函数的示例

文章目录1、v-show与v-if的共同点2、v-show与v-if的区别3、v-show与v-if的使用场景4、附属到组件和普通元素时的情况4.1、v-show4.2、v-if5、具体实现的效果5.1 查看是否渲染5.2 查看调用的钩子函数6、钩子函数实现的过程分析1、v-show与v-if的共同点 v-show和v-if的作用效果是…

共享模型之管程(五)

1.多线程设计模式 1.1.同步模式之保护性暂停 1.1.1.定义 1>.即Guarded Suspension,用在一个线程等待另一个线程的执行结果的场景中; 2>.使用场景 ①.有一个结果(数据)需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject; ②.如果有结果(数据)不断从一个…

Vitepress(一):基础教程

什么是Vitepress Vitepress是使用Vue3Vite来快速搭建一个个人网站的工具&#xff0c;网站搭建者不需要掌握Vue3&#xff0c;Vite等的具体内容&#xff0c;只需要简单的配置就可以生成Vue风格的个人网站 官方地址&#xff1a;https://vitejs.cn/vitepress/ 本教程希望教会大家…

SD Nand 与 SD卡 SDIO模式应用流程

SD Nand/SD卡 SDIO模式应用流程 文章目录SD Nand/SD卡 SDIO模式应用流程1. 前言1.1 参考文档1.2 概述2. Response响应类型及格式3. 各步骤流程3.1 卡识别流程3.2 通讯速率及总线宽度修改流程3.3 擦除流程3.4 单块读流程3.5 单块写流程3.6 多块读流程3.7 多块写流程4. 结束语SD …

Java初识泛型 | 如何通过泛型类/泛型方法实现求数组元素最大值?

目录 一、引言 二、编程分析 1、泛型类实现 思路 代码 2、泛型方法实现 思路 代码 三、拓展&#xff1a;数组排序&#xff08;以冒泡排序为例&#xff09; 1、int类型 原代码 2、泛型类 3、泛型方法 一、引言 给定一个整型数组&#xff0c;求数组中所有元素的最大…

JVM知识体系学习三:class文件初始化过程、硬件层数据一致性(硬件层)、缓存行、指令乱序执行问题、如何保证不乱序(volatile等)

文章目录前言一、class文件初始化过程1、概述2、初始化过程-案例1a、代码T001_ClassLoadingProcedure 类加载过程b、解析3、初始化过程-案例2a、代码b、解析二、单例模式-双重检查三、硬件层数据一致性1、硬件层的并发优化基础知识b、Intel 的缓存一致性协议&#xff1a;MESI四…

Vivado综合设置之-keep_equivalent_registers

-keep_equivalent_registers即保留等效寄存器&#xff0c;所谓等效寄存器是指共享输入端口&#xff08;输入时钟端口clk和输入数据端口rst&#xff09;的寄存器。 勾选它时&#xff0c;意味着Vivado不会对等效寄存器进行优化&#xff1b; 不勾选它时&#xff08;默认情况&…

eclipse安装UML插件

安装AmaterasUML AmaterasUML 是一个用于 Eclipse 的轻量级 UML 和 ER 图编辑器。 将AmaterasUML的3个jar包拷到Eclpise的plugins文件下&#xff1a; 重启eclipse 在新建菜单中可以发现已经出现了UML文件选项 安装GEF插件&#xff08;Eclipse2018-12 以后无需安装&#xf…

②电子产品拆解分析-电动牙刷

②电子产品拆解分析-电动牙刷一、功能介绍二、电路分析以及器件作用1、振动电机开关控制电路2、锂电池供电与充电电路三、本产品的优缺点1、优点&#xff1a;2、缺点&#xff1a;一、功能介绍 ①5档工作模式&#xff1b;②2分钟倒计时停止工作&#xff1b;③工作续航一个星期以…

【MySQL】详解索引操作

索引什么是索引&#xff1f;索引的优势和劣势索引类型按数据结构分类按物理存储分类按字段特性分类主键索引唯一索引普通索引全文索引前缀索引按字段个数分类索引操作创建索引创建主键索引唯一索引的创建普通索引的创建全文索引的创建explain工具查询索引删除索引索引最好设置为…

SQL 注入学习路线

学习路线&#xff08;大致&#xff09; HTML > SQL > Python > SQL 注入&#xff08;使用 sqli-labs 靶场来学习 SQL 注入&#xff09; HTML 视频 【前端开发入门教程&#xff0c;web前端零基础html5 css3前端项目视频教程】 要求 使用该视频进行 HTML 基础部分…

Python之字符串的特点

1.布尔值 Python2中没有布尔值&#xff0c;直接用数字0表示Flase&#xff0c;用数字1表示True。Python3中&#xff0c;把True和False定义成了关键字&#xff0c;但他们的本质还是1和0&#xff0c;甚至可以和数字相加。 >>> a True >>> b 3 >>> …

[多图,秒懂]如何训练一个“万亿大模型”?

1. 背景近几年&#xff0c;随着“大模型”概念的提出&#xff0c;深度学习模型越来越大&#xff0c;如何训练这些大模型成为一个亟待解决的工程问题。最初的视觉模型只有几百兆的参数量&#xff0c;而现在的语言模型中&#xff0c;动则百亿&#xff0c;千亿的参数量&#xff0c…