k8s 源码分析 informer篇

news2025/1/11 0:55:12

之前介绍过informer的流程,文章在 informer介绍。今天梳理一下他的源码和流程。

请添加图片描述

一、概念

什么是 Informer
informer 是 client-go 中的核心工具包,informer 其实就是一个带有本地缓存和索引机制的,可以注册 EventHandler 的 client 本地缓存被称为 Store,索引被称为 Index 。使用 informer 的目的是为了减轻 apiserver 数据交互的压力而抽象出来的一个 cache 层,客户端对 apiserver 数据的 读取和监听操作都通过本地 informer进行。

为什么需要 Informer 机制

Kubernetes 各个组件都是通过 REST API 跟 API Server 交互通信的,而如果每次每一个组件都直接跟 API Server 交互去读取/写入到后端的etcd的话,会对 API Server 以及etcd造成非常大的负担。 而 Informer 机制是为了保证各个组件之间通信的实时性、可靠性,并且减缓对 API Server和etcd 的负担。

informer和configch(newSourceApiserverFromLW)区别
之前介绍过一篇configch的文章 configch源码。这篇文章主要是介绍newSourceApiserverFromLW方法,与kubelet交互后进行更新删除等。并且newSourceApiserverFromLW只监听一个api对象(pod)
而informer则监听整个namespace下的api对象。但是informer不会做出动作,只是监听后维护自己的缓存。方便用户去使用最新数据,避免每次都http访问api-server浪费资源

二、注册及接收

大概步骤如下

  • 启动informer
  • 注册processLoop和reflector
  • reflector开始LIST&WATCH。同时processLoop开始循环pop队列数据(步骤三)
  • watch到的数据进行对比处理,不存在的存入的queue队列中

注册
2.1.start注册
代码位置:vendor/k8s.io/client-go/informer/factory.go

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()
	if f.shuttingDown {
		return
	}
	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			f.wg.Add(1)
			informer := informer
			go func() {
				defer f.wg.Done()
				//启动informer
				informer.Run(stopCh)
			}()
			f.startedInformers[informerType] = true
		}
	}
}

2.2 注册processLoop和reflector
代码位置:vendor/k8s.io/client-go/tools/cache/controller.go

//完成一些初始化,总要的是下面两个函数
    //启动监听
    wg.StartWithChannel(stopCh, r.Run)
    //这里是循环delta FIFO的queue,然后pop出来最新的
	wait.Until(c.processLoop, time.Second, stopCh)

2.3 reflector开始LIST&WATCH
代码位置:vendor/k8s.io/client-go/tools/cache/reflector.go
这里开始LIST&WATCH

	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
   ....省略
   //开始watch
   return r.watch(w, stopCh, resyncerrc)
   ...省略
   //watch函数开始
   		err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)

2.4 watch处理数据
还是上面reflector文件,开始处理接收到的最新watch的信息

switch event.Type {
			case watch.Added:
				err := store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
				}
			case watch.Modified:
				err := store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
				}
			case watch.Deleted:
				err := store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
				}
			case watch.Bookmark:
			
				if _, ok := meta.GetAnnotations()["k8s.io/initial-events-end"]; ok {
					if exitOnInitialEventsEndBookmark != nil {
						*exitOnInitialEventsEndBookmark = true
					}
				}
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
			}

代码位置:vendor/k8s.io/client-go/tools/cache/delta_fifo.go
进入到更新。这里通过queueActionLocked函数验证验证一下obj是否存在,不存在则推倒queue队列中

func (f *DeltaFIFO) Update(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Updated, obj)
}
...省略部分代码
//queueActionLocked中就是获取到item映射出来的这个obj信息,如果不存在则推送到queue中,如果存在则替换。
	if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas

到这里,接收的工作就完成了,已经将最新的数据推送到了queue中,下面就是开始处理这个queue了

三、弹出队列处理

  • 开启queue队列的循环接收
  • 获取到最新队列信息开始处理
  • 更新本地indexer缓存
  • 分发给监听informer的用户去触发回调函数

3.1 开始循环接收数据

func (c *controller) processLoop() {
	for {
	    //无限循环处理queue数据
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		}
}

3.2 这里面也是个for循环,如果队列大于0,则读取出来通过process处理,否则等待队列有新数据加入
代码位置 vendor/k8s.io/client-go/tools/cache/delta_fifo.go

for {
		for len(f.queue) == 0 {
			if f.closed {
				return nil, ErrFIFOClosed
			}
			f.cond.Wait()
		}
		id := f.queue[0]
		f.queue = f.queue[1:]
		depth := len(f.queue)
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		delete(f.items, id)
		//进入这里
		err := process(item, isInInitialList)
}

3.3 注册的回调。判断映射是否存在,存在的话则进行处理
代码位置 vendor/k8s.io/client-go/tools/cache/shared_informer.go

func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	if deltas, ok := obj.(Deltas); ok {
	    //如果映射存在,则进行处理
		return processDeltas(s, s.indexer, deltas, isInInitialList)
	}
	return errors.New("object given as Process argument is not Deltas")
}

3.4 处理本地缓存和回调
代码位置:vendor/k8s.io/client-go/tools/cache/controller.go

func processDeltas(
	handler ResourceEventHandler,
	clientState Store,
	deltas Deltas,
	isInInitialList bool,
) error {
	for _, d := range deltas {
		obj := d.Object

		switch d.Type {
		 //如果是添加、同步、替换、更新,则进入这里
		case Sync, Replaced, Added, Updated:
			if old, exists, err := clientState.Get(obj); err == nil && exists {
			    //更新indexer的本地缓存(步骤3.5)
				if err := clientState.Update(obj); err != nil {
					return err
				}
				//处理回调(步骤3.6)
				handler.OnUpdate(old, obj)
			} else {
				if err := clientState.Add(obj); err != nil {
					return err
				}
				handler.OnAdd(obj, isInInitialList)
			}
		case Deleted:
			if err := clientState.Delete(obj); err != nil {
				return err
			}
			handler.OnDelete(obj)
		}
	}
	return nil
}

3.5 更新本地indexer(Storage)缓存
代码位置:vendor/k8s.io/client-go/tools/cache/controller.go

func (c *cache) Update(obj interface{}) error {
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	c.cacheStorage.Update(key, obj)
	return nil
}

3.6处理回调

func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {
	isSync := false
	if accessor, err := meta.Accessor(new); err == nil {
		if oldAccessor, err := meta.Accessor(old); err == nil {
			// Events that didn't change resourceVersion are treated as resync events
			// and only propagated to listeners that requested resync
			isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
		}
	}
	s.cacheMutationDetector.AddObject(new)
	//这个是重点,分发给监听者
	s.processor.distribute(updateNotification{oldObj: old, newObj: new}, isSync)
}

3.7 分发给监听者

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
    //p.listeners代表有几个监听者。这里循环所有监听者,把最新信息发送给监听者。
	for listener, isSyncing := range p.listeners {
		switch {
		case !sync:
			listener.add(obj)
		case isSyncing:		
			listener.add(obj)
		default:
			
		}
	}
}

到这里就处理结束了。如果想要使用监听者,需要用户自己建立informer进行监听。然后添加注册函数,就可以接收了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/457750.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

L2-1 堆宝塔

堆宝塔游戏是让小朋友根据抓到的彩虹圈的直径大小&#xff0c;按照从大到小的顺序堆起宝塔。但彩虹圈不一定是按照直径的大小顺序抓到的。聪明宝宝采取的策略如下&#xff1a; 首先准备两根柱子&#xff0c;一根 A 柱串宝塔&#xff0c;一根 B 柱用于临时叠放。把第 1 块彩虹圈…

Linux下使用Mysql 第一天

目录 安装mysql 更改账户名和密码 启动/关闭mysql mysql的基本操作 数据库CURD 创建数据库 查看数据库 修改数据库 删除数据库 表的CURD 创建表 查看表 修改表 删除表 表数据的CURD create数据 Retrieve数据 update数据 delete数据 DML和DDL的区别&#xf…

波形生成:均匀和非均匀时间向量

波形生成—— 脉冲、chirp、VCO、正弦函数、周期性/非周期性和调制信号 使用 chirp 生成线性、二次和对数 chirp。使用 square、rectpuls 和 sawtooth 创建方波、矩形波和三角形波。 如需了解此处未显示的其他无线波形生成功能&#xff0c;请参阅无线波形发生器 (Communicat…

【腾讯云-2】极简搭建边缘集群

1 创建 流程和https://blog.csdn.net/qq_47058489/article/details/130347795差不多&#xff0c;可参考 查看基本信息&#xff1a; 创建边缘集群的过程中会初始化master&#xff0c;说明包含一个托管master节点 但是没有其他节点 2 开启节点远程登录 通过 SSH 的方式远…

JavaScript模块化开发

目录&#xff1a; 1 认识模块化开发 2 CommonJS和Node 3 require函数解析 4 AMD和CMD&#xff08;了解&#xff09; 5 ESModule用法详解 6 ESModule运行原理 模块化不是两个不同的js文件直接导入到某个页面中的&#xff0c;因为这两个文件只要有相同的变量或函数&#xf…

html学习(标签、css、选择器)

认识HTML HTML是HyperText Markup Language的缩写&#xff0c;中文名为超文本标记语言。它是一种用来创建网页的标准标记语言&#xff0c;由标签&#xff08;tag&#xff09;和文本构成&#xff0c;用于描述网页的结构和内容。HTML文档可以被浏览器解析并呈现出网页的内容和样…

方向梯度直方图(Histogram of Oriented Gradient)

1.方向梯度直方图&#xff08;Histogram of Oriented Gradient&#xff09; 笔记参考&#xff1a;HOG特征提取 笔记参考&#xff1a;一文讲解方向梯度直方图&#xff08;hog&#xff09; 笔记参考&#xff1a;Histogram of Oriented Gradients (HOG) | By Dr. Ry Stemplicity …

【Java-01】深入浅出匿名对象 , 继承 , 抽象类

主要内容 面向对象回顾 匿名对象介绍 面向对象特征 - 继承 抽象类的使用 模板设计模式 1 面向对象回顾 面向对象的核心思想是什么 ? 用代码来模拟现实生活中的事物 , 比如学生类表示学生事物 , 对象表示的就是具体的学生 , 有了类就可以描述万千世界所有的事物了 现有的…

【PaddleNLP-kie】关键信息抽取2:UIE模型做图片信息提取全流程

文章目录 本文参考UIE理论部分step0、UIEX原始模型使用网页体验本机安装使用环境安装快速开始 step1、UIEX模型微调&#xff08;小样本学习&#xff09;微调模型对比step2、服务化部署step3、提升推理速度模型量化更换模型fast-tokenizer提高batch_size&#xff08;没用&#x…

第十一章 组合模式

文章目录 前言一、组合模式基本介绍二、UML类图三、完整代码抽象类&#xff0c;所有类都继承此类学校类以父类型引用组合一个学院类学院类以父类型引用组合一个专业类专业类&#xff0c;叶子节点&#xff0c;不能再组合其他类测试类 四、组合模式在JDK集合的源码分析五、组合模…

51单片机(一)软硬件环境和单片机介绍

❤️ 专栏简介&#xff1a;本专栏记录了从零学习单片机的过程&#xff0c;其中包括51单片机和STM32单片机两部分&#xff1b;建议先学习51单片机&#xff0c;其实STM32等高级单片机的基础&#xff1b;这样再学习STM32时才能融会贯通。 ☀️ 专栏适用人群 &#xff1a;适用于想要…

delta.io 参数 spark.databricks.delta.replaceWhere.constraintCheck.enabled

总结 默认值true 你写入的df分区字段必须全部符合覆盖条件 .option("replaceWhere", "c2 == 2") false: df1 overwrite tb1: df1中每个分区的处理逻辑: - tb1中存在(且谓词中匹配)的分区,则覆盖 - tb1中存在(谓词中不匹配)的分区,则append - tb1中不存…

热闹之后,香港是否会成为Web3的“应许之地”?

出品&#xff5c;欧科云链研究院 作者&#xff5c;Jason Jiang 自从2022年底有关虚拟资产在港发展的政策宣言发布后&#xff0c;香港始终是Web3世界的焦点。当港府官员频繁现身以鼓励Web3创新发展&#xff0c;当数以万计的Web3 Builders时隔三年再次聚首香江&#xff0c;当传…

密码学报Latex模板使用

密码学报Latex模板使用 首先从密码学报下载模板 然后注册overleaf账号 创建新项目&#xff0c;把.zip包导入 修改编译器为XeLaTeX 然后点击重新编译即可

信息安全复习六:公开密钥密码学

一、章节梗概 1.公开密钥密码模型的基本原理 2.两个算法&#xff1a;RSA&D-H算法 主要内容 1.对称密钥密码的密钥交换问题 2.公钥密码模型的提出 3.设计公钥密码的基本要求 4.数字签名 5.RSA算法 6.公钥密码的特征总结 二、对称密钥密码 对称加密算法中&#xff0c;数据…

SpringCloud --- Feign远程调用

一、RestTemplate问题 先来看我们以前利用RestTemplate发起远程调用的代码&#xff1a; 存在下面的问题&#xff1a; 代码可读性差&#xff0c;编程体验不统一参数复杂URL难以维护 Feign是一个声明式的http客户端&#xff0c;官方地址&#xff1a;GitHub - OpenFeign/feign:…

程序员如何提高代码能力

目录 程序员如何提高代码能力——C语言方向阅读优秀的代码不断练习学习新技术与他人合作不断反思和改进 程序员如何提高代码能力——C语言方向 C 是一种功能强大的编程语言&#xff0c;广泛应用于操作系统、数据库、游戏开发等领域。而要成为一名优秀的 C 程序员&#xff0c;不…

动力节点Vue笔记④ Vue与Ajax

四、Vue与AJAX 4.1 回顾发送AJAX异步请求的方式 发送AJAX异步请求的常见方式包括&#xff1a; 原生方式&#xff0c;使用浏览器内置的JS对象XMLHttpRequest const xhr new XMLHttpRequest()xhr.onreadystatechange function(){}xhr.open()xhr.send() 原生方式&#xff0…

RK3568平台开发系列讲解(调试篇)内核函数调用堆栈打印方法汇总

🚀返回专栏总目录 文章目录 一、dump_stack 函数二、WARN_ON(condition)函数三、BUG_ON (condition)函数四、panic (fmt...)函数沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇将对驱动调试方法进行汇总学习。 一、dump_stack 函数 dump_stack 作用:打印内核调…

C++、STL标准模板库和泛型编程 ——迭代器、 算法、仿函数(侯捷)

C、STL标准模板库和泛型编程 ——迭代器、 算法、仿函数 &#xff08;侯捷&#xff09; 迭代器iterator_category 算法accumulatefor_eachreplacecountfindsortbinary_search 仿函数 functors(六大部件中最简单的一种&#xff01;) 使用一个东西&#xff0c;却不明白它的道理&a…