k8s client-go源码解析之informer 二

news2025/1/18 3:29:58

Informer(二)

注意:本文内容为学习笔记,内容为个人见解,不保证准确性,但欢迎大家讨论何指教。

本篇介绍cache.SharedIndexInforme中 Controller及其组件
informer大致工作流程如下:
在这里插入图片描述

sharedIndexInformer

根据上一篇使用 Deployment 资源类型的informer创建举例, 我们可以定位到最终实现为:tools/cache/shared_informer.go

结构体定义如下:

type sharedIndexInformer struct {
	// 数据存储结构
	indexer    Indexer
	// 事件处理器
	controller Controller 
	// 用于分发事件到EventHandler
	processor             *sharedProcessor
	// 用于检查对象是否被篡改
	cacheMutationDetector MutationDetector 
	// 用于获取资源数据
	listerWatcher ListerWatcher
	// 当前监听的资源类型
	objectType runtime.Object
	// 资源在被分发前进行的处理操作
	transform TransformFunc
	// 省略部分代码...
}

informer启动函数:

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	// 初始化数据队列,用于资源数据的中转。
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:             fifo,
		ListerWatcher:     s.listerWatcher,
		ObjectType:        s.objectType,
		ObjectDescription: s.objectDescription,
		FullResyncPeriod:  s.resyncCheckPeriod,
		RetryOnError:      false,
		ShouldResync:      s.processor.shouldResync,

		Process:           s.HandleDeltas,
		WatchErrorHandler: s.watchErrorHandler,
	}
    // 省略部分代码...
	// ...
	// 启动数据突变检测器
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    // 启动事件分发器
	wg.StartWithChannel(processorStopCh, s.processor.run)
    // 启动数据控制器
	s.controller.Run(stopCh)
}

SharedProcessor

SharedProcessor用于管理processorListener
processorListener监听OnAdd,OnUpdate,OnDelete这些动作,交由对应的注册的事件函数处理。
sharedProcessor的代码这里就不赘述了,着重看一下processorListener的实现。

type processorListener struct {
	// 事件首先会放到addCh中
	addCh  chan interface{}
	// addCh中的事件,会放到nextCh中,守护协程就可以读到事件了
	// 来不及处理的事件,会放到pendingNotifications这个环形队列中
    pendingNotifications buffer.RingGrowing
	// 事件最终会放到这个chan中
	nextCh chan interface{}
    // 注册的事件处理函数
	handler ResourceEventHandler
    // 省略部分代码...
}

pop函数用于将addCh中的事件取出,基本逻辑如下:
在这里插入图片描述

func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) 

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		// 将取出的事件放到nextCh中
		case nextCh <- notification:
			var ok bool
			// 读取缓冲区,缓冲区为空时将notification和nextCh置为nil
			// 这样就可以继续走下面的写入逻辑
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { 
				// notification 为nil时,代表目前没有未处理的事件。
				// 这时候将nextCh,notification分别赋值,下一次循环就会将事件放到nextCh中
				notification = notificationToAdd
				nextCh = p.nextCh
			} else {
				// notification不为nil时,代表有未处理完的事件,将事件放到环形队列中
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

run方法从nextCh中消费

func (p *processorListener) run() {
	stopCh := make(chan struct{})
	wait.Until(func() {
		for next := range p.nextCh {
			switch notification := next.(type) {
			case updateNotification:
			    p.handler.OnUpdate(notification.oldObj, notification.newObj)
		    // 省略部分代码...
		}
		close(stopCh)
	}, 1*time.Second, stopCh)
}

controller

controllersharedIndexInformer的核心。
它负责从listerWatcher获取资源数据,然后将数据存储到indexer中,同时将数据分发到processor中。
我们来看看controller的定义:

type controller struct {
	config         Config
	// 用于获取数据,并转换成目标资源对象
	reflector      *Reflector
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}

controller的启动函数:

func (c *controller) Run(stopCh <-chan struct{}) {
	// 省略部分代码...
	r := NewReflectorWithOptions(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		ReflectorOptions{
			ResyncPeriod:    c.config.FullResyncPeriod,
			TypeDescription: c.config.ObjectDescription,
			Clock:           c.clock,
		},
	)
    // 省略部分代码...

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()
	var wg wait.Group
	// 启动反射器,获取数据
	wg.StartWithChannel(stopCh, r.Run)
	// 启动processLoop,操作数据
	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}

NewReflectorWithOptions会根据配置创建对应的Reflector对象。
Reflector.Run会启动一个ListAndWatch的goroutine,用于获取资源数据。
ListAndWatch会携带最后一次的资源版本号,加上重试机制来保证不会丢失数据。
然后将数据存储到DeltaFIFO中。

func (r *Reflector) Run(stopCh <-chan struct{}) {
    // 省略部分代码...
    wait.BackoffUntil(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
        r.watchErrorHandler(r, err)
    }
    }, r.backoffManager, true, stopCh)
}
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	// 首先list获取全量数据 
	err := r.list(stopCh)
	if err != nil {
		return err
	}
    // 省略部分代码...
	// 请求APIServer的参数
	options := metav1.ListOptions{
		// 指定了资源版本,这样就可以获取到资源版本之后的数据
        ResourceVersion: r.LastSyncResourceVersion(),
        TimeoutSeconds:
        } &timeoutSeconds,
		AllowWatchBookmarks: true,
    }
    // 重试器,仅限于调用apiServer的网络错误,比如超时、连接中断等
	retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
	for {
        // 省略部分代码...
		// 数据操作函数
		err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)
		retry.After(err)
		if err != nil {
			if err != errorStopRequested {
				switch {
                // 省略部分代码...
				case apierrors.IsTooManyRequests(err):
					<-r.initConnBackoffManager.Backoff().C()
					continue
			}
			return nil
		}
	}
}

watchHandler

func watchHandler() error {
// 省略部分代码...
loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			if !ok {
				// 这里不清楚为什么要用这种方式
				// 可能是历史问题,这部分的代码比较老,是2016年的
				break loop
			}
			// 一堆数据合法性判断,省略部分代码...
			resourceVersion := meta.GetResourceVersion()
			switch event.Type {
			case watch.Added:
				// 写入到DeltaFIFO
				err := store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
				}
			case watch.Modified:
				// 省略部分代码...
			case watch.Deleted:
				// 省略部分代码...
			case watch.Bookmark:
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
			}
			// 更新记录的版本号,请求的时候会用到
			setLastSyncResourceVersion(resourceVersion)
			if rvu, ok := store.(ResourceVersionUpdater); ok {
				rvu.UpdateResourceVersion(resourceVersion)
			}
			eventCount++
		}
	}
	// 省略部分代码...
}

processLoop

processLoop会从DeltaFIFO中获取数据,然后将数据存储到indexer中,同时将数据分发到processor中。

func (c *controller) processLoop() {
	for {
		// 函数是在config中传入的
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		// 省略部分代码...
	}
}
// 最终定位到的函数
func processDeltas(
	handler ResourceEventHandler,
	clientState Store, 
	transformer TransformFunc,
	deltas Deltas,
	isInInitialList bool,
) error {
	// 代码里面有多处用Store关键字的地方
	// 命名也有很多store
	// 不同的位置,代表的含义也不一样,这里需要注意
	for _, d := range deltas {
		obj := d.Object
		// transformer用于在处理数据之前对数据进行转换
		// 默认transformer是nil
		if transformer != nil {
			var err error
			obj, err = transformer(obj)
			if err != nil {
				return err
			}
		}

		switch d.Type {
		case Sync, Replaced, Added, Updated:
			// 省略部分代码...
			// 写入到indexer中,即最终的数据存储位置
			if err := clientState.Update(obj); err != nil {
					return err
				}
			// 执行事件处理函数
			handler.OnUpdate(old, obj)
		case Deleted:
			// 省略部分代码...
		}
	}
	return nil
}	

总结

informercontroller负责数据控制,包括:获取数据、数据处理、数据分发。
controller中由:

  • Reflector负责获取数据并写入到队列。
    数据获取时会携带上一次的版本号,这样就可以获取到版本号之后的数据。
  • processLoop取出数据,写入到indexer中,同时将数据分发到processor中。processor中会根据动作类型,OnAddOnUpdateOnDelete,执行对应的事件处理函数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/398398.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CVS Health 西维斯健康EDI需求

CVS Health西维斯健康在特拉华州成立&#xff0c;通过旗下的 CVS Pharmacy 和 Longs Drugs 零售店以及 CVS.com 电商提供处方药、美容产品、化妆品、电影和照片加工服务、季节性商品、贺卡和方便食品。CVS Health通过使高质量的护理变得更经济、更易获得、更简单、更无缝&#…

中国通信行业十大杰出女性,看看你认识几个?

注&#xff1a;排名不分先后█ 01 中国信息通信研究院副院长 王志勤王志勤王志勤在中国乃至全球通信行业具有极高的知名度。她1992年毕业于北京邮电大学无线通信专业&#xff0c;在通信领域辛勤耕耘了三十余年&#xff0c;长期从事标准研究和制定工作&#xff0c;参与了中国从2…

C 语言网络编程 — 高并发 TCP 网络服务器

目录 文章目录目录TCP Socket 编程示例服务端客户端测试高并发 TCP 网络服务器I/O 并发模型设计系统文件描述符数量限制完全断开连接导致的性能问题关注 TCP 连接的状态合理配置 TCP 连接内核参数使用 shutdown() 来确保 Connection 被正常关闭断开重连问题使用 Heartbeat 来判…

【强化学习】强化学习数学基础:时序差分方法

时序差分方法Temporal Difference Learning举个例子TD learning of state values算法描述TD learning of action values: SarsaTD learning of action values: Expected SarsaTD learning of action values: n-step SarsaTD learning of optimal action values: Q-learningA un…

【Redis】主从集群 实现读写分离(二)

目录 2.Redis主从 2.1.搭建主从架构 2.2.主从数据同步原理 2.2.1.全量同步 2.2.2.增量同步 2.2.3.repl_backlog原理 2.3.主从同步优化 2.4.小结 2.Redis主从 2.1.搭建主从架构 单节点Redis的并发能力是有上限的&#xff0c;要进一步提高Redis的并发能力&#xff0c;…

YOLOv7、YOLOv5改进之打印热力图可视化:适用于自定义模型,丰富实验数据

💡该教程为改进YOLO高阶指南,属于《芒果书》📚系列,包含大量的原创改进方式🚀 💡更多改进内容📚可以点击查看:YOLOv5改进、YOLOv7改进、YOLOv8改进、YOLOX改进原创目录 | 唐宇迪老师联袂推荐🏆 💡🚀🚀🚀内含改进源代码 按步骤操作运行改进后的代码即可�…

【毕业设计】Java局域网聊天室系统的设计与实现

点击免费下载源码 视频聊天系统作为一种新型的通信和交流工具&#xff0c;突破了地域的限制&#xff0c;可以提供更为便捷、灵活、全面的音、视频信息的传递和服务&#xff0c;具有极其广泛的发展前景。 介绍了采用JAVA编程开发视频聊天系统的一套比较常用的解决方案。文字聊…

Spring之实例化Bean _ @Resource和@Autowired实现原理(3)

目录 1. 搜集注解信息 applyMergedBeanDefinitionPostProcessor(*) 2. 将实例化的Bean放入3级缓存中 addSingletonFactory&#xff08;***&#xff09;为循环依赖做准备 3. 根…

RS232/RS485信号接口转12路模拟信号 隔离D/A转换器LED智能调光控制

特点&#xff1a;● RS-485/232接口&#xff0c;隔离转换成12路标准模拟信号输出● 可选型输出4-20mA或0-10V控制其他设备● 模拟信号输出精度优于 0.2%● 可以程控校准模块输出精度● 信号输出 / 通讯接口之间隔离耐压3000VDC ● 宽电源供电范围&#xff1a;10 ~ 30VDC● 可靠…

搭建SpringBoot多模块微服务项目脚手架(一)

搭建SpringBoot多模块微服务项目脚手架(一) 文章目录搭建SpringBoot多模块微服务项目脚手架(一)1.概述2.微服务环境搭建介绍1.微服务环境描述2.搭建环境组件和版本清单3.搭建父模块环境3.1.创建springboot父工程1.创建springboot2.配置maven和java3.精简父模块4.pom文件配置5.父…

记录--服务端推送到Web前端有哪几种方式?

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 这个问题&#xff1f; 这个问题一般会出现在面试题里面&#xff0c;然后回答一些诸如轮询、WebSocket之类的答案。当然&#xff0c;实际开发中&#xff0c;也会遇到类似别人给你赞了&#xff0c;要通知…

华为OD机试题,用 Java 解【一种字符串压缩表示的解压】问题

华为Od必看系列 华为OD机试 全流程解析+经验分享,题型分享,防作弊指南)华为od机试,独家整理 已参加机试人员的实战技巧华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典使用说明 参加华为od机试,一定要注意不…

20230309英语学习

What Is Sleep Talking? We Look at the Science 为什么人睡觉会说梦话&#xff1f;来看看科学咋说 Nearly everyone has a story about people talking in their sleep.Though it tends to be more common in children, it can happen at any age:A 2010 study in the jour…

如何恢复已清空的 Windows 回收站?

Windows 95 中引入的回收站文件夹&#xff08;也称为垃圾桶&#xff09;是有史以来最有用的功能之一&#xff0c;可以保护您免受自己的错误和粗心大意的影响。 通常&#xff0c;从文件夹中恢复丢失的文件就像单击桌面上的回收站图标并执行简单的拖放操作一样简单。 但是&…

Java实例实验项目大全源码企业通讯打印系统计划酒店图书学生管理进销存商城门户网站五子棋

wx供重浩&#xff1a;创享日记 对话框发送&#xff1a;java实例 获取完整源码源文件视频讲解文档资料等 文章目录1、企业通讯2、快递打印系统3、开发计划管理系统4、酒店管理系统5、图书馆管理系统6、学生成绩管理系统7、进销存管理系统8、神奇Book——图书商城9、企业门户网站…

数据库管理-第六十期 监听(20230309)

数据库管理 2023-03-09第六十期期 监听1 无法访问2 监听配置3 问题复现与解决4 静态监听5 记不住配置咋整总结第六十期期 监听 不知不觉又来到了一个整10期数&#xff0c;我承认上一期有很大的划水的。。。嫌疑吧&#xff0c;本期内容是从帮群友解决ADG前置配置时候的一个问题…

C51---定时器中断相关寄存器

1.中断系统&#xff0c;是为使CPU具有对外界紧急事件的实时处理能力而设置的。 当中央处理器CPU正在处理某件事情的时候&#xff0c;要求CPU暂停当前任务或工作&#xff0c;转而去处理这这个紧急事件。处理完以后&#xff0c;再回到原来的被中断的地方&#xff0c;继续原来的工…

华为OD机试题,用 Java 解【寻找相同子串】问题

华为Od必看系列 华为OD机试 全流程解析+经验分享,题型分享,防作弊指南)华为od机试,独家整理 已参加机试人员的实战技巧华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典使用说明 参加华为od机试,一定要注意不…

RoCEv2网络部署实践

延续上篇RoCE网络的介绍&#xff0c;我们知道承载ROCEv2流量必须有一张无损网络。 本章主要介绍在以太网环境部署无损网络的关键点。 首先是QoS&#xff0c;包含流分类和队列调度两部分。 流分类&#xff1a;在网络接入设备&#xff08;TOR&#xff09;配置if-match类的语句&am…

一本通 2.8.1 广度优先搜索算法

1329&#xff1a;【例8.2】细胞 【题目描述】 一矩形阵列由数字0到9组成,数字1到9代表细胞,细胞的定义为沿细胞数字上下左右还是细胞数字则为同一细胞,求给定矩形阵列的细胞个数。如:阵列 有4个细胞。 【题目分析】 遍历所有节点&#xff0c;当无标识且不为零&#xff0c;…