client-go源码学习(二):Reflector、DeltaFIFO

news2024/9/28 11:13:02

本文基于Kubernetes v1.22.4版本进行源码学习,对应的client-go版本为v0.22.4

3、Informer机制

在Kubernetes系统中,组件之间通过HTTP协议进行通信,在不依赖任何中间件的情况下需要保证消息的实时性、可靠性、顺序性等。那么Kubernetes是如何做到的呢?答案就是Informer机制。Kubernetes的其他组件都是通过client-go的Informer机制与Kubernetes API Server进行通信的

1)、Informer架构

Informer架构设计中,有多个核心组件:

  1. Reflector:用于监听Kubernetes资源,当资源发生变化时,触发相应的变更事件,例如Added、Updated、Deleted事件,并将其资源对象存放到本地缓存DeltaFIFO中
  2. DeltaFIFO:可以分开理解,FIFO是一个先进先出的队列,它拥有队列操作的基本方法,例如Add、Update、Delete、List、Pop、Close等,而Delta是一个资源对象存储,它可以保存资源对象的操作类型,例如Added、Updated、Deleted、Sync等
  3. Indexer:client-go中用来存储资源对象并自带索引功能的本地存储,Informer从DeltaFIFO中将消费出来的资源对象存储至Indexer。Indexer中的数据与etcd集群中的数据保持完全一致。client-go可以很方便地从本地存储中读取相应的资源对象数据,而无须每次都从远程etcd集群中读取,这样可以减轻Kubernetes API Server和etcd集群的压力

2)、Reflector

Reflector从Kubernetes API Server中listAndWatch资源对象,然后将对象的变化包装成Delta并放入到DeltaFIFO中

Reflector首先通过List操作获取全量的资源对象数据,调用DeltaFIFO的Replace方法全量插入DeltaFIFO,然后后续通过Watch操作根据资源对象的变化类型相应的调用DeltaFIFO的Add、Update、Delete方法,将对象及其变化插入到DeltaFIFO中

1)Reflector初始化

代码路径:vendor/k8s.io/client-go/tools/cache/reflector.go

func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}

func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	realClock := &clock.RealClock{}
	r := &Reflector{
		name:          name,
		listerWatcher: lw,
		store:         store,
		// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
		// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
		// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
		backoffManager:         wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
		initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
		resyncPeriod:           resyncPeriod,
		clock:                  realClock,
		watchErrorHandler:      WatchErrorHandler(DefaultWatchErrorHandler),
	}
	r.setExpectedType(expectedType)
	return r
}

通过NewReflector实例化Reflector对象时必须传入ListerWatcher interface的实现,它拥有List和Watch方法,用于获取及监控资源列表

2)ListWatch

代码路径:vendor/k8s.io/client-go/tools/cache/listwatch.go

type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)

type ListWatch struct {
	ListFunc  ListFunc
	WatchFunc WatchFunc
	// DisableChunking requests no chunking for this list watcher.
	DisableChunking bool
}

ListWatch struct实现了ListerWatcher interface

再来看下ListWatch struct初始化的例子:

代码路径:vendor/k8s.io/client-go/informers/core/v1/pod.go

func NewPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
	return NewFilteredPodInformer(client, namespace, resyncPeriod, indexers, nil)
}

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
			},
		},
		&corev1.Pod{},
		resyncPeriod,
		indexers,
	)
}

在NewPodInformer初始化Pod对象的informer中,会初始化ListWatch struct并定义ListFunc和WatchFunc,可以看到ListFunc和WatchFunc即为其资源对象客户端的List与Watch方法

3)ListAndWatch函数

在Reflector源码实现中,其中最重要的是ListAndWatch函数,它负责获取资源列表和监听指定的Kubernetes API Server资源,ListAndWatch函数实现可分为三部分:List操作、Resync操作、Watch操作

a)List操作

List在程序第一次运行时获取该资源下所有的对象数据并将其存储至DeltaFIFO中,相关源码如下:

代码路径:vendor/k8s.io/client-go/tools/cache/reflector.go

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	// 1.List操作(只执行一次)
	klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
	var resourceVersion string

	// 1)设置ListOptions,将resourceVersion设置为"0"
	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

	if err := func() error {
		initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
		defer initTrace.LogIfLong(10 * time.Second)
		var list runtime.Object
		var paginatedResult bool
		var err error
		listCh := make(chan struct{}, 1)
		panicCh := make(chan interface{}, 1)
		// 2)调用r.listerWatcher.List方法,执行list操作,获取全量的资源对象
		go func() {
			defer func() {
				if r := recover(); r != nil {
					panicCh <- r
				}
			}()
			// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
			// list request will return the full response.
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				return r.listerWatcher.List(opts)
			}))
			switch {
			case r.WatchListPageSize != 0:
				pager.PageSize = r.WatchListPageSize
			case r.paginatedResult:
				// We got a paginated result initially. Assume this resource and server honor
				// paging requests (i.e. watch cache is probably disabled) and leave the default
				// pager size set.
			case options.ResourceVersion != "" && options.ResourceVersion != "0":
				// User didn't explicitly request pagination.
				//
				// With ResourceVersion != "", we have a possibility to list from watch cache,
				// but we do that (for ResourceVersion != "0") only if Limit is unset.
				// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
				// switch off pagination to force listing from watch cache (if enabled).
				// With the existing semantic of RV (result is at least as fresh as provided RV),
				// this is correct and doesn't lead to going back in time.
				//
				// We also don't turn off pagination for ResourceVersion="0", since watch cache
				// is ignoring Limit in that case anyway, and if watch cache is not enabled
				// we don't introduce regression.
				pager.PageSize = 0
			}

			list, paginatedResult, err = pager.List(context.Background(), options)
			if isExpiredError(err) || isTooLargeResourceVersionError(err) {
				r.setIsLastSyncResourceVersionUnavailable(true)
				// Retry immediately if the resource version used to list is unavailable.
				// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
				// continuation pages, but the pager might not be enabled, the full list might fail because the
				// resource version it is listing at is expired or the cache may not yet be synced to the provided
				// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
				// the reflector makes forward progress.
				list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
			}
			close(listCh)
		}()
		select {
		case <-stopCh:
			return nil
		case r := <-panicCh:
			panic(r)
		case <-listCh:
		}
		if err != nil {
			return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
		}

		// We check if the list was paginated and if so set the paginatedResult based on that.
		// However, we want to do that only for the initial list (which is the only case
		// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
		// situations we may force listing directly from etcd (by setting ResourceVersion="")
		// which will return paginated result, even if watch cache is enabled. However, in
		// that case, we still want to prefer sending requests to watch cache if possible.
		//
		// Paginated result returned for request with ResourceVersion="0" mean that watch
		// cache is disabled and there are a lot of objects of a given type. In such case,
		// there is no need to prefer listing from watch cache.
		if options.ResourceVersion == "0" && paginatedResult {
			r.paginatedResult = true
		}

		r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
		initTrace.Step("Objects listed")
		listMetaInterface, err := meta.ListAccessor(list)
		if err != nil {
			return fmt.Errorf("unable to understand list result %#v: %v", list, err)
		}
		// 3)根据list返回的结果,获取最新的resourceVersion
		resourceVersion = listMetaInterface.GetResourceVersion()
		initTrace.Step("Resource version extracted")
		// 4)将list返回的结果转换为[]runtime.Object
		items, err := meta.ExtractList(list)
		if err != nil {
			return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
		}
		initTrace.Step("Objects extracted")
		// 5)调用r.syncWith,将资源对象列表和resourceVersion存储(Replace)至DeltaFIFO中
		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("unable to sync list result: %v", err)
		}
		initTrace.Step("SyncWith done")
		// 6)调用r.setLastSyncResourceVersion,更新Reflector中已被处理的最新资源对象的resourceVersion值
		r.setLastSyncResourceVersion(resourceVersion)
		initTrace.Step("Resource version updated")
		return nil
	}(); err != nil {
		return err
	}
  ...

List操作(只执行一次)逻辑如下:

  1. 设置ListOptions,将resourceVersion设置为"0"
  2. 调用r.listerWatcher.List方法,执行list操作,获取全量的资源对象
  3. 根据list返回的结果,获取最新的resourceVersion
  4. 将list返回的结果转换为[]runtime.Object
  5. 调用r.syncWith,将资源对象列表和resourceVersion存储(Replace)至DeltaFIFO中
  6. 调用r.setLastSyncResourceVersion,更新Reflector中已被处理的最新资源对象的resourceVersion值

resourceVersion的作用:

  • 保证客户端数据一致性和顺序性
  • 乐观锁,实现并发控制

设置ListOptions时,resourceVersion有三种设置方法:

  • 不设置,此时会直接从etcd中读取,此时数据是最新的
  • 设置为"0",此时会从API Server Cache中获取数据
  • 设置为指定的resourceVersion,获取resourceVersion大于指定版本的所有资源对象

b)Resync操作

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
  ...
	// 2.Resync操作(异步循环执行)
	resyncerrc := make(chan error, 1)
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	go func() {
		resyncCh, cleanup := r.resyncChan()
		defer func() {
			cleanup() // Call the last one written into cleanup
		}()
		for {
			select {
			case <-resyncCh:
			case <-stopCh:
				return
			case <-cancelCh:
				return
			}
			// 1)判断是否需要执行Resync操作,即重新同步
			if r.ShouldResync == nil || r.ShouldResync() {
				klog.V(4).Infof("%s: forcing resync", r.name)
				// 2)如果需要,则调用r.store.Resync进行重新同步
				if err := r.store.Resync(); err != nil {
					resyncerrc <- err
					return
				}
			}
			cleanup()
			resyncCh, cleanup = r.resyncChan()
		}
	}()
  ...

Resync操作(异步循环执行)逻辑如下

  1. 判断是否需要执行Resync操作,即重新同步
  2. 如果需要,则调用r.store.Resync进行重新同步

c)Watch操作

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	...
	// 3.Watch操作(循环执行)
	for {
		// 1)根据stopCh判断是否需要退出循环
		// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
		select {
		case <-stopCh:
			return nil
		default:
		}

		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		// 2)设置ListOptions,将resourceVersion设置为最新的resourceVersion,即从list返回的最新的resourceVersion开始执行watch操作
		options = metav1.ListOptions{
			ResourceVersion: resourceVersion,
			// We want to avoid situations of hanging watchers. Stop any wachers that do not
			// receive any events within the timeout window.
			TimeoutSeconds: &timeoutSeconds,
			// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
			// Reflector doesn't assume bookmarks are returned at all (if the server do not support
			// watch bookmarks, it will ignore this field).
			AllowWatchBookmarks: true,
		}

		// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
		start := r.clock.Now()
		// 3)调用r.listerWatcher.Watch,开始监听操作
		w, err := r.listerWatcher.Watch(options)
		if err != nil {
			// If this is "connection refused" error, it means that most likely apiserver is not responsive.
			// It doesn't make sense to re-list all objects because most likely we will be able to restart
			// watch where we ended.
			// If that's the case begin exponentially backing off and resend watch request.
			// Do the same for "429" errors.
			if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
				<-r.initConnBackoffManager.Backoff().C()
				continue
			}
			return err
		}

		// 4)调用r.watchHandler,处理watch操作返回来的结果
		if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
			if err != errorStopRequested {
				switch {
				case isExpiredError(err):
					// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
					// has a semantic that it returns data at least as fresh as provided RV.
					// So first try to LIST with setting RV to resource version of last observed object.
					klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
				case apierrors.IsTooManyRequests(err):
					klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName)
					<-r.initConnBackoffManager.Backoff().C()
					continue
				default:
					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
				}
			}
			return nil
		}
	}
}  

Watch操作(循环执行)逻辑如下

  1. 根据stopCh判断是否需要退出循环
  2. 设置ListOptions,将resourceVersion设置为最新的resourceVersion,即从list返回的最新的resourceVersion开始执行watch操作
  3. 调用r.listerWatcher.Watch,开始监听操作
  4. 调用r.watchHandler,处理watch操作返回来的结果

watchHandler用于处理资源的变更事件。当触发Added、Updated、Deleted事件时,将对应的资源对象更新到本地缓存DeltaFIFO并更新resourceVersion。代码如下:

func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	eventCount := 0

	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		// 1)从watch操作返回来的结果中获取event事件
		case event, ok := <-w.ResultChan():
			if !ok {
				break loop
			}
			if event.Type == watch.Error {
				return apierrors.FromObject(event.Object)
			}
			if r.expectedType != nil {
				if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
					continue
				}
			}
			if r.expectedGVK != nil {
				if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
					continue
				}
			}
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			// 2)获得当前watch到资源的resourceVersion
			newResourceVersion := meta.GetResourceVersion()
			// 3)当触发Added、Updated、Deleted事件时,将对应的资源对象更新到本地缓存DeltaFIFO
			switch event.Type {
			case watch.Added:
				err := r.store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Modified:
				err := r.store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := r.store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}
			case watch.Bookmark:
				// A `Bookmark` means watch has synced here, just update the resourceVersion
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			*resourceVersion = newResourceVersion
			// 4)调用r.setLastSyncResourceVersion,更新Reflector中已被处理的最新资源对象的resourceVersion值
			r.setLastSyncResourceVersion(newResourceVersion)
			if rvu, ok := r.store.(ResourceVersionUpdater); ok {
				rvu.UpdateResourceVersion(newResourceVersion)
			}
			eventCount++
		}
	}

	watchDuration := r.clock.Since(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
	return nil
}

3)、DeltaFIFO

DeltaFIFO可以分开理解,FIFO是一个先进先出的队列,它拥有队列操作的基本方法,例如Add、Update、Delete、List、Pop、Close等,而Delta是一个资源对象存储,它可以保存资源对象的操作类型,例如Added、Updated、Deleted、Sync等

1)DeltaFIFO结构体

代码路径:vendor/k8s.io/client-go/tools/cache/delta_fifo.go

type DeltaFIFO struct {
	...
	// 存放Delta,与queue中存放的key是同样的key
	items map[string]Deltas

	// 存储资源对象的key,可以确保顺序性
	queue []string

	// 默认使用MetaNamespaceKeyFunc,默认使用<namespace>/<name>的格式,不指定namespace时用<name>
	keyFunc KeyFunc
	...
}

type Deltas []Delta

type Delta struct {
	Type   DeltaType
	Object interface{}
}

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Replaced DeltaType = "Replaced"
	Sync DeltaType = "Sync"
)

DeltaFIFO会保留所有关于资源对象的操作类型,队列中会存在拥有不同操作类型的同一资源对象,消费者在处理该资源对象时能够了解该资源对象所发生的事情

queue字段存储资源对象的key,该key通过keyFunc计算得出,默认使用MetaNamespaceKeyFunc,默认使用<namespace>/<name>的格式,不指定namespace时用<name>

items字段通过map数据结构的方式存储,value存储的是对象的Delta数组

DeltaFIFO存储结构如下图所示:

DeltaFIFO本质上是一个先进先出的队列,有数据的生产者和消费者:

生产过程:

  • Reflector的List
  • Reflector的Watch
  • Reflector的Resync

消费过程:

  • 事件派发到WorkQueue
  • 刷新本地缓存
2)生产者方法

DeltaFIFO队列中的资源对象在Added、Updated、Deleted事件中都调用了queueActionLocked函数,它是DeltaFIFO实现的关键,代码如下:

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	// 1)计算出资源对象的key
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	oldDeltas := f.items[id]
	// 2)将actionType和资源对象构造成Delta,添加到items中,并通过dedupDeltas函数进行去重操作
	newDeltas := append(oldDeltas, Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)

	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		// 3)更新构造后的Delta并通过cond.Broadcast通知所有消费者解除阻塞
		f.cond.Broadcast()
	} else {
		// This never happens, because dedupDeltas never returns an empty list
		// when given a non-empty list (as it is here).
		// If somehow it happens anyway, deal with it but complain.
		if oldDeltas == nil {
			klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
			return nil
		}
		klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
		f.items[id] = newDeltas
		return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
	}
	return nil
}

queueActionLocked逻辑如下

  1. 通过f.KeyOf函数计算出资源对象的key
  2. 将actionType和资源对象构造成Delta,添加到items中,并通过dedupDeltas函数进行去重操作
  3. 更新构造后的Delta并通过cond.Broadcast通知所有消费者解除阻塞
3)消费者方法

Pop方法作为消费者方法使用,从DeltaFIFO的头部取出最早进入队列中的资源对象数据。Pop方法需要传入process函数,用于接收并处理对象的回调方法,代码如下:

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.closed {
				return nil, ErrFIFOClosed
			}
			// 1)当队列中没有数据时,通过f.cond.Wait阻塞等待数据,只有收到cond.Broadcast时才说明有数据被添加,解决当前阻塞状态
			f.cond.Wait()
		}
		// 2)如果队列中不为空,取出f.queue的头部数据,将该对象传入process回调函数,由上层消费者进行处理
		id := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// This should never happen
			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
			continue
		}
		delete(f.items, id)
		err := process(item)
		// 3)如果process回调函数处理错误,则将该对象重新存入队列
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}

pop逻辑如下

  1. 当队列中没有数据时,通过f.cond.Wait阻塞等待数据,只有收到cond.Broadcast时才说明有数据被添加,解决当前阻塞状态
  2. 如果队列中不为空,取出f.queue的头部数据,将该对象传入process回调函数,由上层消费者进行处理
  3. 如果process回调函数处理错误,则将该对象重新存入队列

参考:

《Kubernetes源码剖析》

2022年最新k8s编程operator篇

k8s client-go源码分析 informer源码分析(3)-Reflector源码分析

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/130667.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

重装操作系统win10+重装sqlserver+数据库可视化工具

sqlserver安装以及使用 一、重装操作系统 操作系统win10镜像 原因&#xff1a;sqlserver无数次失败让我放弃原来操作系统。 重装操作系统三步骤 1>下载大白菜一键装机工具 2>有一个u盘&#xff0c;8G以上存储空间 3>win10系统镜像 详细讲解&#xff1a; win10系统镜…

管道和重定向

1.重定向 1.输出重定向 > 一般用于在控制台上为了避免看到过多冗余的代码代码操作&#xff0c;所以将正确和错误的结果信息写到文件夹中 标准输出 1> 1>> [rootiZ2zef4rb5ixg6sieztgsdZ /]# cat /etc/passwd > file.txt //将/etc/passwd中输出的结果打印…

ElasticSearch6.x版本概念介绍及Kibana操作的增删改查常用API

文章目录一、概念介绍1.接近实时(NRT Near Real Time )2.索引(index)3.类型(type)4.映射(mapping)5.文档(document)6.概念关系图二、Kibana的基本操作1.创建dangdang索引并创建product类型2.删除dangdang索引3.创建id为1的文档记录4.查询id为1的文档记录5.删除id为1的文档记录6…

【2022年终总结】:小伙子还需努力呀~

文章目录前言第一次遇见CSDN我的收获我的迷茫我的展望前言 有一段时间没写博客了&#xff0c;具体什么原因呢&#xff1f;先买个关子&#xff0c;埋在下面的文字里。 眨眼时间&#xff0c;在CSDN待了快一年了&#xff0c;这一年的时间里有收获有感动&#xff0c;当然&#xff0…

IO多路复用【学习笔记】

1.用户空间和内核空间 虚拟内存被操作系统划分成两块&#xff1a;内核空间和用户空间&#xff0c;内核空间是内核代码运行的地方&#xff0c;用户空间是用户程序代码运行的地方。当进程运行在内核空间时就处于内核态&#xff0c;当进程运行在用户空间时就处于用户态。 为了安全…

elasticsearch在linux环境安装使用过程遇到的问题

es在linux环境安装遇到问题 1、启动失败日志 ERROR: [1] bootstrap checks failed [1]: the default discovery settings are unsuitable for production use; at least one of [discovery.seed_hosts, discovery.seed_providers, cluster.initial_master_nodes] must be con…

MCU-51:单片机之AT24C02学习

目录一、存储器介绍1.1 RAM1.2 ROM二、AT24C022.1 AT24C02介绍2.2 引脚及应用电路2.3 内部结构框图三、I2C总线3.1 I2C总线介绍3.2 I2C电路规范3.3 I2C时序结构3.4 I2C数据帧3.5 AT24C02数据帧四、代码演示4.1 AT24C02数据存储4.2 秒表(定时器扫描按键数码管)注意&#xff1a;一…

ECG信号处理——包括基本波检测、信号去噪、信号重建度量(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 心电图&#xff08;ECG&#xff09;信号自动分析与诊断是目前信号处理领域中的研究热点之一,其真正实现将有力地促进医疗事业的…

集合引用类型——对象

介绍&#xff1a; 依据变量分类&#xff1a;ECMAScript变量包括两种不同类型的值&#xff1a;原始值和引用值。 依据类型分类&#xff1a;ECMAScript类型包括两种不同类型的值&#xff1a;基本数据类型和集合数据类型。 原始值和引用值&#xff1a; 原始值就是最简单的数据&am…

网络层IP

文章目录网络层基本概念IP报头分片和组装如何分片如何组合16位标识符13位片偏移3位标志网段划分IP地址的分类CIDRIP地址数量限制私有IP和公网IP运营商路由网络层 http和tcp只考虑了两端的问题&#xff0c;并没有考虑路途中的事情。路由查找 IP的构成&#xff1a;32位网络号主…

【Ctfer训练计划】——(八)

作者名&#xff1a;Demo不是emo 主页面链接&#xff1a;主页传送门 创作初心&#xff1a;舞台再大&#xff0c;你不上台&#xff0c;永远是观众&#xff0c;没人会关心你努不努力&#xff0c;摔的痛不痛&#xff0c;他们只会看你最后站在什么位置&#xff0c;然后羡慕或鄙夷座…

fx5u 脉冲输出指令PLSY(DPLSY)4种写法

本文描述三菱FX5U的 脉冲输出指令PLSY(DPLSY)&#xff0c;4种写法&#xff0c;都有效。 第一行&#xff1a;设置脉冲输出频率 第二行&#xff1a;DPLSY D0 K0 K1,FX5U &#xff0c;第二个参数是脉冲数量&#xff0c;设置为K0表示一值输出脉冲。 第三个参数是轴号K1,表示Y0脉冲…

2023跨年倒计时2023最炫烟花秀烟花代码

&#x1f4cb; 前言 &#x1f5b1; 博客主页&#xff1a;在下马农的碎碎念✍ 本文由在下马农原创&#xff0c;首发于CSDN&#x1f4c6; 首发时间&#xff1a;2022/12/30&#x1f4c5; 最近更新时间&#xff1a;2022/12/30&#x1f935; 此马非凡马&#xff0c;房星本是星。向前…

c#入门-补码

补码 明明我们用正数用的更多&#xff0c;如果把0归到负数里面&#xff0c;那么正数就是整的2n次方了。 为什么不这么做呢&#xff1f; 如果你的手表快了20分钟&#xff0c;你可以&#xff1a; 1.把他调慢20分钟 2.再调快11小时40分钟 其实负数就是一个特别大的正数。CPU没有…

Unity 基于法线和深度实现完美描边,可独立控制物体描边

目录前言自定义PostProcessOutlineShader关键代码说明1 使用深度绘制描边1.1 获得斜四方形UV坐标&#xff1a;1.2 采样四方向深度2 使用法线绘制描边3 解决倾斜表面白块问题3.1 计算视方向3.2 使用视方向修正阈值4 单独控制物体是否显示描边OutlineShader完整代码前言 最近项目…

github上传代码(亲测实用)

又被github上传代码折腾了我3个小时&#xff0c;各种问题都遇到过&#xff0c;最后写篇博客记录一下&#xff0c;方便后续上传。 github创建项目完成后&#xff0c;就会出现上传指令&#xff0c;如下图所示&#xff1a; 现在只需要按着命令的提示一步步执行&#xff1b; 1.点…

一文读懂HTTPS

大家第一次接触 HTTPS 协议的时候是不是和我一样&#xff0c;非常困惑。 这玩意概念又多又繁琐。尤其是里面的公钥私钥啥的。 当时就特别想知道&#xff0c;为什么用公钥加密却不能用公钥解密&#xff1f; 看完这篇文章你会弄明白&#xff0c;同时还会解锁很多HTTPS里的细节…

ansible的安装

自定义环境 1.操作环境 角色主机名IP地址组名 控制主机 server.example.com 192.168.90.134 server 受控主机 node1.example.com 192.168.90.135 node1 受控主机 node2.example.com 192.168.90.133 node2 需要保准三台主机能够互相通信。设置同一种网络模式&#xff0…

Tic-Tac-Toe可能棋局搜索的实现(python)

目录 1. 前言 2. 算法流程 3. 代码实现 3.1 终局及胜负判定方法 3.2 搜索邻节点 3.3 打印棋盘状态 3.4 代码 4. 小结 1. 前言 Tic-Tac-Toe中文常译作井字棋&#xff0c;即在3 x 3的棋盘上&#xff0c;双方轮流落子&#xff0c;先将3枚棋子连成一线的一方获得胜利。Tic-…

✿✿✿JavaScript --- jQuery框架一

目 录 1.jQuery的介绍和在线学习网址以及下载网址 2.jQuery的功能和优势 3.引用jQuery库和第一个案例 4.jQuery代码格式和注释 5.jQuery如何达到获取原生的DOM对象 6.jQuery选择器&#xff08;CSS对比版&#xff09; (1)常见选择器 (2)高级选择器以及方法 (3)属性选择…