informer中DeltaFIFO机制的实现分析与源码解读

news2025/1/14 18:10:02

informer中的DeltaFIFO机制的实现分析与源码解读

DeltaFIFO作为informer中重要组件,本文从源码层面了解是如何DelatFIFO是实现的。

DeltaFIFO的定义

找到delta_fifo.go的源码,位于client-go/tools/cache/delta_fifo.go

代码结构大致如下:
在这里插入图片描述

store定义了一个通用的存储接口

type Store interface {
	Add(obj interface{}) error
	Update(obj interface{}) error
	Delete(obj interface{}) error
	List() []interface{}
	ListKeys() []string
	Get(obj interface{}) (item interface{}, exists bool, err error)
	GetByKey(key string) (item interface{}, exists bool, err error)

	// Replace will delete the contents of the store, using instead the
	// given list. Store takes ownership of the list, you should not reference
	// it after calling this function.
	Replace([]interface{}, string) error
	Resync() error
}

Queue接口继承了store,但添加了Pop()重要方法,实现了队列的能力

// Queue is exactly like a Store, but has a Pop() method too.
type Queue interface {
	Store

	// Pop blocks until it has something to process.
	// It returns the object that was process and the result of processing.
	// The PopProcessFunc may return an ErrRequeue{...} to indicate the item
	// should be requeued before releasing the lock on the queue.
	Pop(PopProcessFunc) (interface{}, error)

	// AddIfNotPresent adds a value previously
	// returned by Pop back into the queue as long
	// as nothing else (presumably more recent)
	// has since been added.
	AddIfNotPresent(interface{}) error

	// HasSynced returns true if the first batch of items has been popped
	HasSynced() bool

	// Close queue
	Close()
}


FIFO类型实现了Queue接口

type FIFO struct {
	lock sync.RWMutex
	cond sync.Cond
	// We depend on the property that items in the set are in the queue and vice versa.
	items map[string]interface{}
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item insertion and retrieval, and
	// should be deterministic.
	keyFunc KeyFunc

	// Indication the queue is closed.
	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRED operations.
	closed     bool
	closedLock sync.Mutex
}

var (
	_ = Queue(&FIFO{}) // FIFO is a Queue
)

DeltaFIFO类型也实现了Queue接口,与FIFO主要的区别是有两种特殊的方式:replaced和sync。replaced一般发生在资源版本更新时,而sync由resync定时发起。

type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond

	// We depend on the property that items in the set are in
	// the queue and vice versa, and that all Deltas in this
	// map have at least one Delta.
	items map[string]Deltas
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// knownObjects list keys that are "known", for the
	// purpose of figuring out which items have been deleted
	// when Replace() or Delete() is called.
	knownObjects KeyListerGetter

	// Indication the queue is closed.
	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRED operations.
	closed     bool
	closedLock sync.Mutex
}

var (
	_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
)

DeltaFIFO的构造函数

// NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
// items. See also the comment on DeltaFIFO.
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
	return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KeyFunction:  keyFunc,
		KnownObjects: knownObjects,
	})
}

func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
	if opts.KeyFunction == nil {
		opts.KeyFunction = MetaNamespaceKeyFunc // 如果不指定keyFunc,默认就是MetaNamespaceKeyFunc
	}

	f := &DeltaFIFO{
		items:        map[string]Deltas{}, 	// 存放[]Delta的数组,
		queue:        []string{},						// 存储obj的key,key通常是ns/name格式的字符串
		keyFunc:      opts.KeyFunction,			// 由obj生成key的函数
		knownObjects: opts.KnownObjects,

		emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
		transformer:           opts.Transformer,
	}
	f.cond.L = &f.lock
	return f
}

var (
	_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
)

Delta的定义

根据Delta数据结构的定义,delta包含了一个资源对象的变更类型及变更的内容。这里的Object不一定是完整的资源数据,大部分场景下只会有变更的部分信息。

// Delta is a member of Deltas (a list of Delta objects) which
// in its turn is the type stored by a DeltaFIFO. It tells you what
// change happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
// state of the object before it was deleted.
type Delta struct {
	Type   DeltaType		// 表示对obj的操作类型"Added/Updated/Deleted/Replaced/Sync"
	Object interface{}	// 表示某个资源对象,比如命名为"one"的pod
}


// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta


// DeltaType is the type of a change (addition, deletion, etc)
type DeltaType string


// Change type definition
const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	// Replaced is emitted when we encountered watch errors and had to do a
	// relist. We don't know if the replaced object has changed.
	//
	// NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
	// as well. Hence, Replaced is only emitted when the option
	// EmitDeltaTypeReplaced is true.
	Replaced DeltaType = "Replaced"
	// Sync is for synthetic events during a periodic resync.
	Sync DeltaType = "Sync"
)

Deletas的ADD()入队分析

watch机制监控到事件后,会把事件入队操作。

// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
  return f.queueActionLocked(Added, obj)	// 实际调用的是函数queueActionLocked()
}

queueActionLocked的逻辑主要包括从obj生产key(代码中是id),再有actionType和Obj构建一个新的Delta, 再把Delta加入Deltas切片中,之后,把Deltas放入items哈希表,key放入Queue队列中去。要注意Delta加入Deltas时需要进行出重。

// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)									// 先用keyFunc,通过obj获取到对应的key
	if err != nil {
		return KeyError{obj, err}
	}

  newDeltas := append(f.items[id], Delta{actionType, obj})	// 用actionType和Obj构建一个新的Delta,再把Delta追加到(f.items[id]返回的)Deltas切片
	newDeltas = dedupDeltas(newDeltas)												// 对新的Deltas切去去重

	if len(newDeltas) > 0 {																		// 如果newDeltas切片中存在Delta
		if _, exists := f.items[id]; !exists {									
			f.queue = append(f.queue, id)													// 将key放到queue中
		}
		f.items[id] = newDeltas																	// 把新的Deltes切片,放到items哈希表
		f.cond.Broadcast()
	}
	return nil
}

Delta去重

Delta进行Add()操作时,会对加入的delta进行去重。去重逻辑目前只针对两个delete类型的delta有效:当delta数组中倒数第一个和第二个delta都是delete类型时,将会去掉其中一个

// re-listing and watching can deliver the same update multiple times in any
// order. This will combine the most recent two deltas if they are the same.
func dedupDeltas(deltas Deltas) Deltas {
	n := len(deltas)
	if n < 2 {
		return deltas
	}
	a := &deltas[n-1]
	b := &deltas[n-2]
	if out := isDup(a, b); out != nil {
		d := append(Deltas{}, deltas[:n-2]...)
		return append(d, *out)
	}
	return deltas
}

// If a & b represent the same event, returns the delta that ought to be kept.
// Otherwise, returns nil.
// TODO: is there anything other than deletions that need deduping?
func isDup(a, b *Delta) *Delta {
	if out := isDeletionDup(a, b); out != nil {
		return out
	}
	// TODO: Detect other duplicate situations? Are there any?
	return nil
}

// keep the one with the most information if both are deletions.
func isDeletionDup(a, b *Delta) *Delta {
	if b.Type != Deleted || a.Type != Deleted {					// 仅处理a,b都是"Deleted"类型的事件;
		return nil
	}
	// Do more sophisticated checks, or is this sufficient?
	if _, ok := b.Object.(DeletedFinalStateUnknown); ok { // 如果a,b都是"Deleted",就只返回一个Delta
		return a
	}
	return b
}

在这里插入图片描述

Deltas的pop出队

deltaFIFO出队的操作和普通的队列出队类似,从队头取出一个资源对象key,并删除items中key对应的deltas数组。

pop出队时,会调用传参PopProcessFunc对出队元素进行处理。

// Pop blocks until an item is added to the queue, and then returns it.  If
// multiple items are ready, they are returned in the order in which they were
// added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back
// with AddIfNotPresent().
// process function is called under lock, so it is safe update data structures
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
// may return an instance of ErrRequeue with a nested error to indicate the current
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
//
// Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
    // 如果队列为空,就f.cond.Wait阻塞等待
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.IsClosed() {
				return nil, ErrFIFOClosed
			}

			f.cond.Wait()					
		}
    // 从f.queue中去重第一个元素
		id := f.queue[0]
		f.queue = f.queue[1:]
    
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
    // 从items哈希表中根据id,取出Deltas
		item, ok := f.items[id]
    // 如果itmes哈希表中差不到id对应的Deltas,就结束进入下次循环
		if !ok {
			// Item may have been deleted subsequently.
			continue
		}
    // 从items哈希表中删除id对应的Deltas
		delete(f.items, id)
    
    // process()函数来处理从items哈希表中取出的Deltas
		err := process(item)
    
    // 如果出现错误,就把id加回queue,同时把Deltas加回items
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}

接着我们看看process()函数具体是什么。

如果对informer启动比较熟悉的话,可以知道在创建informer时,newInformer()函数需要指定ProcessFunc。这个处理函数包括数据同步到存储,以及调用注册的用户函数两个操作。

func newInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
	clientState Store,
) Controller {
	// This will hold incoming changes. Note how we pass clientState in as a
	// KeyLister, that way resync operations will result in the correct set
	// of update/delete deltas.
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          clientState,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,
		// 指定处理从deltaFIFO队列pop处理的数据的处理函数ProcessFunc
		Process: func(obj interface{}) error {
			// from oldest to newest
			for _, d := range obj.(Deltas) {
				switch d.Type {
				case Sync, Replaced, Added, Updated:
				   // 同步存储数据,clientState是一个store
					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
						if err := clientState.Update(d.Object); err != nil {
							return err
						}
						// 回调用户定义的hander函数
						h.OnUpdate(old, d.Object)
					} else {
					   // 同步存储数据
						if err := clientState.Add(d.Object); err != nil {
							return err
						}
						// 回调用户定义的hander函数
						h.OnAdd(d.Object)
					}
				case Deleted:
				   // 同步存储数据
					if err := clientState.Delete(d.Object); err != nil {
						return err
					}
					// 回调用户定义的hander函数
					h.OnDelete(d.Object)
				}
			}
			return nil
		},
	}
	return New(cfg)
}

进一步探究一下,informer启动run()后,会调用controller.Run(),最后c.processLoop会循环处理pop出队处理,流程大致如下:

informer.run(stopCh) —> s.controller.Run(stopCh)—>c.processLoop—>c.config.Queue.Pop(PopProcessFunc(c.config.Process))

源码位于:vender/k8s.io/client-go/tools/cache/controller.go



func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	// 构建一个Reflector
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.clock = c.clock

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group

	wg.StartWithChannel(stopCh, r.Run)

  // 调用c.processLoop函数
	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}

func (c *controller) processLoop() {
	for {
	  // 循环的Pop出队,把出队的事件交给PopProcessFunc函数处理
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

DeltaFIFO出队入队的流程图

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1968425.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【priority_queue的模拟实现】

priority_queue的模拟实现 小杨 容器适配器&#xff1a;priority_queue priority_queue即优先级队列是一种容器适配器&#xff0c;根据严格的弱排序标准(即排序规则可以更改),他的第一个元素总是它所包含的元素中最大的。优先队列将特定容器类封装作为其底层容器类。标准容器类…

ElasticSearch优化实战:打造高性能搜索引擎的秘籍

在当今这个大数据时代&#xff0c;信息的海量增长对搜索技术提出了前所未有的挑战。用户不仅需要快速准确地从数以亿计的数据中找到所需信息&#xff0c;还希望搜索引擎能够提供个性化和智能化的搜索体验。ElasticSearch作为市场上领先的搜索引擎&#xff0c;因其强大的全文搜索…

Typora2024最新版破解方法(亲测可用)

此方法非常简单&#xff0c;无需安装dll补丁&#xff0c;无需修改注册表&#xff0c;无需使用老版本。仅需修改部分文件内容即可 方法步骤 步骤一 下载并安装Typora 安装Typora 打开官网 下载并安装最新版即可 点击访问Typora官网 https://typoraio.cn/ 步骤二 修改文件 …

java面向对象重点总结

文章目录 java面向对象重点总结类与实例构造方法方法重载属性与修饰符封装继承多态重构抽象类接口抽象类和接口的区别&#xff1a;集合泛型 java面向对象重点总结 对象是一个自包含的实体&#xff0c;用一组可识别的特性和行为来标识。 面向对象编程&#xff0c;英文叫Object…

mcasttest-tool组播检测工具

作者&#xff1a;广大 检测组播 mcasttest-tool是oracle组播检测工具&#xff0c;组播是oracle 11.2.0.2开始的新功能。 1、上传mcasttest工具解压并授权 [rootrac1 soft]# cd /u01/soft/ [rootrac1 soft]# tar -xvf mcasttest.tgz[rootrac1 soft]# chown -R grid:oinstall…

Animate软件基础:“分散到图层”创建的新图层

FlashASer&#xff1a;AdobeAnimate2021软件零基础入门教程https://zhuanlan.zhihu.com/p/633230084 FlashASer&#xff1a;实用的各种Adobe Animate软件教程https://zhuanlan.zhihu.com/p/675680471 FlashASer&#xff1a;Animate教程及作品源文件https://zhuanlan.zhihu.co…

Pytorch实现线性回归Linear Regression

借助 PyTorch 实现深度神经网络 - 线性回归 - 第 2 周 | Coursera 线性回归预测 用PyTorch实现线性回归模块 创建自定义模块&#xff08;内含一个线性回归&#xff09; 训练线性回归模型 对于线性回归&#xff0c;特定类型的噪声是高斯噪声 平均损失均方误差函数&#xff1a…

Mallet:一款针对任意协议的安全拦截代理工具

关于Mallet Mallet是一款功能强大的协议安全分析工具&#xff0c;该工具支持针对任意协议创建用于安全审计的拦截代理&#xff0c;该工具本质上与我们所熟悉的拦截Web代理类似&#xff0c;只是通用性更强。 工具运行机制 Mallet建立在Netty框架之上&#xff0c;并且依赖于Net…

文案人的梦工场,网易入职指南!

网易云对于咱们一些有点文艺的文案策划来说&#xff0c;简直就是梦中情司。 在这里工作锻炼机会很多&#xff0c;也很开拓眼界&#xff0c;能获得相当于在别处3倍能力的成长速度&#xff0c;福利待遇也是很好的。 要进入网易云音乐做文案策划&#xff0c;你可以按照以下步骤进…

数据结构的基本概念与算法2

线性表 &#xff1a; 线性表 是具有相同数据类型的 n (n > 0) 个数据元素的有限序列&#xff0c;其中 n 为表长&#xff0c;当 n 0 时线性表是一个空表&#xff1b;若用 L 命名线性表&#xff0c;则其一般表示为&#xff1a;L (a1 , a2 , ... ai , ai1 , ... an) 上述中&a…

月木学途开发 2.项目架构

1.项目介绍 月木学途是一款it在线学习网站&#xff0c;项目采用前后端分离架构。前端开发主要使用vue.js&#xff0c;后端使用Spring Cloud Alibaba技术栈。项目包含学习网站的大部分功能&#xff0c;分为管理员端和用户端。管理员端有权限管理、课程管理、网站管理、求职模块管…

Shell函数和Shell 输入/输出重定向

LInux&#xff1a;Shell函数和Shell 输入/输出重定向 Shell函数 参数说明&#xff1a; 可以带function fun() 定义&#xff0c;也可以直接fun() 定义,不带任何参数。参数返回&#xff0c;可以显示加&#xff1a;return 返回&#xff0c;如果不加&#xff0c;将以最后一条命令运…

[Vue warn]: data functions should return an object:

仔细检查你的代码肯定有一个data()内忘记方return{}了

C语言程序设计23

《C程序设计教程&#xff08;第四版&#xff09;——谭浩强》 例题2.11 从键盘输入B、O、Y三个字符&#xff0c;然后把他们输出到屏幕上 代码&#xff1a; //《C程序设计教程&#xff08;第四版&#xff09;——谭浩强》 //例题2.11 从键盘输入B、O、Y三个字符&#xff0c;然…

RabbitMQ:MQ的可靠性

MQ的可靠性 在默认情况下&#xff0c;RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题: 一旦MQ宕机&#xff0c;内存中的消息会丢失 内存空间有限&#xff0c;当消费者故障或处理过慢时&#xff0c;会导致消息积压&#xff0c;引发MQ阻塞。 …

高效、安全、共享|济南市升级教育城域网,重塑教育网络生态

文/济南市电化教育馆 电教教研室主任 张承强 导语: 近年来,济南市教育局以前瞻性的视野,将教育数字化转型视为推动教育高质量发展的基石,全力加速教育现代化进程。在这一蓝图下,教育城域网的升级改造项目被赋予了基础性、先导性和战略性的重要意义,成为探索教育数字化转型新路…

一键搞定PDF翻译,这四款是职场达人常备翻译工具!!!

作为外贸搬砖人的一份子&#xff0c;虽说外语功底还说地过去&#xff0c;但是每天过目大量pdf文件的翻译&#xff0c;难免还有些吃力&#xff0c;这个时候如果有可以辅助翻译的工具那就再好不过了&#xff0c;今天给大家带来四款非常适合pdf文件翻译的工具&#xff0c;总有一款…

C#中的通信

上位机应用开发-串口通信1、基于C#的串口通信对象:SerialPort 2、字段属性 PortName:获取或设置通信端口 BaudRate:获取或设置串行波特率-DataBits:获取或设置每个字节的标准数据位长度 Parity:获取或设置奇偶校验检查协仪I-StopBits;获取或设置每个字节的标准停止位数 3、…

你需要的Node版本管理神器NVM

在做项目的时候&#xff0c;很多人本地的node都是装一个固定版本&#xff0c;一旦有些项目要下的依赖需要更高版本的node支持的时候&#xff0c;此时需要升级node就得把已经安装的低版本node卸载了&#xff0c;然后再重新下载、安装高版本的node,既费时间又抓狂&#xff0c;特别…

大模型算法面试题(十九)

本系列收纳各种大模型面试题及答案。 1、SFT&#xff08;有监督微调&#xff09;、RM&#xff08;奖励模型&#xff09;、PPO&#xff08;强化学习&#xff09;的数据集格式&#xff1f; SFT&#xff08;有监督微调&#xff09;、RM&#xff08;奖励模型&#xff09;、PPO&…