深入源码分析kubernetes informer机制(二)Reflector

news2025/1/11 21:38:47

[阅读指南]
这是该系列第二篇
基于kubernetes 1.27 stage版本
为了方便阅读,后续所有代码均省略了错误处理及与关注逻辑无关的部分。


文章目录

  • Reflector是什么
  • 整体结构
  • 工作流程
    • list拉取数据
    • 缓存resync操作
    • watch监听操作
  • 总结

Reflector是什么

reflector在informer中就像是一个对外的窗口,它与api-server建立连接,监听和获取来自api-server的资源变化信息,并把这些信息放进deltaFIFO中,交给下一个环节处理。

整体结构

与api-server进行交互,通过list获取指定的全量资源,watch监听指定的资源变化事件,并将这些事件放入delta FIFO队列中。
结构与交互如下图

// 省略了部分字段,只留下我们关注的
type Reflector struct {
    // name identifies this reflector. By default it will be a file:line if possible.
    name string

    // reflector对象需要监控的资源类型,比如上一节workqueue中的&v1.Pod{}
    expectedType reflect.Type
    
    // deltaFIFO 队列存储对象
    store Store
    
    // 实现list/watch
    listerWatcher ListerWatcher
    
    // 上次更新的资源版本号,用来判断当前的node的资源状况
    lastSyncResourceVersion string
    ......
}

工作流程

reflecter主函数比较简单,循环同步运行ListAndWatch直到收到stop信号。

func (r *Reflector) Run(stopCh <-chan struct{}) {
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
}

ListAndWatch主要做了这几件事:

  1. 通过stream或者chunk方式拉取全量list数据
  2. 开启一个协程进行缓存resync操作。
  3. 循环执行watch监听操作
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	...
    
	fallbackToList := !r.UseWatchList

    // stream式同步
	if r.UseWatchList {
		w, err = r.watchList(stopCh)
    	...
		if err != nil {
            ...
			fallbackToList = true
			w = nil
		}
	}

    // chunk式同步
	if fallbackToList {
		err = r.list(stopCh)
		if err != nil {
			return err
		}
	}

	...
	go r.startResync(stopCh, cancelCh, resyncerrc)
	return r.watch(w, stopCh, resyncerrc)
}

接下来咱一步步来看。

list拉取数据

ListAndWatch拉取全量数据时,出现了两种数据拉取的方式,list /watchstream list /watch

stream list是 kubernetes 1.27 引入的新方案,通过 ENABLE_CLIENT_GO_WATCH_LIST_ALPHA 变量可以启用stream list,默认会使用原有的list/watch。后续会单独开一篇介绍stream list方案,详情可以通过KEP-3157了解

前者在初始化时list拉取全量数据,通过watch更新增量变化。
后者可以通过watch 请求的方式获取list数据,从而减轻大规模集群初始化list数据时的资源消耗。
在建立watch连接时,携带如下两个参数即可告知服务器使用streaming list进行一致性读取。
sendInitialEvents=true
resourceVersionMatch=NotOlderThan

常规的list流程借用这个博主画的时序图来看下。
在这里插入图片描述

缓存resync操作

resync负责定期将本地的缓存重新加入deltaFIFO队列,确保本地缓存与controller的数据一致性。

国内太多博客没了解清楚就介绍这一部分是与api-server交互,进行relist。实际上resync完全没有涉及到服务端的部分,他就是一个本地缓存的同步机制。与服务端的交互使用list/watch已经完全可以确保资源一致性了,基本不怎么需要进行relist操作,并且对于节点非常多的大集群来说,list非常消耗资源,何况是定期relist呢。

关于resync机制的介绍,不在这里展开,详细看下一篇笔记。

watch监听操作

watch的实现非常巧妙,它利用了http的chunk编码传输机制建立长连接,来实现动态的数据监听,可以了解分块传输编码。
同样借用一张时序图来看下watch的流程
在这里插入图片描述

reflector通过Watcher监听api-server端的数据delta事件,并将这些事件放入deltaFIFO中统一处理。

// 在这里向服务端发起watch请求,并接收和处理资源变更事件
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
	...
    
	for {
        ...

        // w == nil表示使用常规的list/watch方式,streaming 方式会创建特殊的watcher
		if w == nil {
			timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
			options := metav1.ListOptions{
                // 上次同步的资源版本,也就是本地的资源版本。以此来获取增量的数据
				ResourceVersion: r.LastSyncResourceVersion(),
                // watch 超时时间,长时间没有接受任务事件的watcher会被关掉,避免长时间挂起。
				TimeoutSeconds: &timeoutSeconds,
                // watch书签,避免watch重启时请求api-server导致的消耗。
				AllowWatchBookmarks: true,
			}
        	// 创建一个watch对象,监听api-server的资源变更事件,将接收到的事件丢进resultChan中
			w, err = r.listerWatcher.Watch(options)
        	...
		}

        // 将resultChan中的取出放入FIFO 队列
		err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)
		
        // 失败重试逻辑
        ...
		
	}
}

建立连接的逻辑在这一行
w, err = r.listerWatcher.Watch(options)

还是用上一篇workqueue来看看这个Watch实例的实现。
从Watch函数一路往上追溯,可以看到先是与server建立了http连接,再通过watch标记建立了watch连接,创建stream watcher对象,并拉起一个协程去处理监听到的事件信息。

  • 此后所有监听的delta事件都会经过receive协程进入到resultChan中。
// reflector调用的watch函数
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
	return lw.WatchFunc(options)
}

// watchFunc函数的定义
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
	...
	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
		options.Watch = true // 向服务端请求chunk连接
		optionsModifier(&options)
		return c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Watch(context.TODO()) // 这里调用了getter的watch函数
                                    // getter是controller初始化时建立的http客户端: clientset.CoreV1().RESTClient()
	}
	return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}


func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
    ...	
    
	url := r.URL().String()
	for {
		req, err := r.newHTTPRequest(ctx)

		resp, err := client.Do(req)
		if err == nil && resp.StatusCode == http.StatusOK {
			return r.newStreamWatcher(resp)
		}
		...
	}
}

func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
	sw := &StreamWatcher{
		source:   d,
		reporter: r,
		result: make(chan Event),
		done: make(chan struct{}),
	}
	go sw.receive() // 处理消息事件的协程
	return sw
}

// 解析接收到的事件,并放到resultChan中等待后续处理。
func (sw *StreamWatcher) receive() {
	for {
        // 解析数据
		action, obj, err := sw.source.Decode()
		select {
		case <-sw.done:
			return
        // 将事件发送到resultChan
		case sw.result <- Event{
			Type:   action,
			Object: obj,
		}:
		}
	}
}
  • 进入resultChan的事件,由watchHandler取出再分类添加到FIFO队列中。
func watchHandler(start time.Time,
	w watch.Interface,	// watch实例
	store Store, // 存储对象 比如delta FIFO queue
	...
) error {
	...
loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err

        // 从ResultChan中取出变更事件,并放进队列中,比如delta FIFO队列中
		case event, ok := <-w.ResultChan():
			// 省略了一些资源过滤和错误处理
            ...

            // 解析监听到的事件数据
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
				continue
			}

            // 解析资源事件的版本
			resourceVersion := meta.GetResourceVersion()
			switch event.Type {
			case watch.Added: 
				err := store.Add(event.Object) // 往队列添加add delta事件
            	... // err handle
			case watch.Modified: 
				err := store.Update(event.Object) // 往队列添加update delta事件
				... // err handle
			case watch.Deleted:	
				err := store.Delete(event.Object) // 往队列添加delete delta事件,在此之前会判断事件对应的资源对象是否存在
				... // err handle
			case watch.Bookmark:
            	...
			default:
				... // err handle
			}
            
            // 更新resourceVersion版本号,下一轮watch就不会再收到重复的更新事件
			setLastSyncResourceVersion(resourceVersion)
			if rvu, ok := store.(ResourceVersionUpdater); ok {
				rvu.UpdateResourceVersion(resourceVersion)
			}
            
			...
		}
	}

    ...
	return nil
}

总结

用一个图来回顾下reflector各个模块的关系~
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/881475.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

神经网络分类算法原理详解

目录 神经网络分类算法原理详解 神经网络工作流程 反向传播算法 1) 反向传播原理 2) 应用示例 总结 正向传播 &#xff08;forward-propagation&#xff09;&#xff1a;指对神经网络沿着输入层到输出层的顺序&#xff0c;依次计算并存储模型的中间变量。 反向传播 &a…

泛微 E-Office文件上传漏洞复现

声明 本文仅用于技术交流&#xff0c;请勿用于非法用途 由于传播、利用此文所提供的信息而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;文章作者不为此承担任何责任。 文章作者拥有对此文章的修改和解释权。如欲转载或传播此文章&#xff0c…

牛客多校题解 | I Non-Puzzle: Segment Pair 扫描线

给n对区间&#xff0c;要求每对区间恰好选一个使得选出来的n个区间有交集&#xff0c;问有多少方案数 可以从每一个点开始考虑 如果前面的点没有任何可行的方案&#xff0c;那么新点就可以作为左端点&#xff0c;对答案的贡献为 向后扫描的过程中&#xff0c;如果新的点有增加…

2023-08-15 linux mipi 屏幕调试:有一个屏幕开机时候不显示,开机后按power 按键休眠唤醒就可以显示。原因是reset gpio 被复用

一、现象&#xff1a;今天更新了一个新版本的buildroot linux sdk &#xff0c;调试两个mipi 屏幕&#xff0c;这两个屏幕之前在其他的sdk都调好了的&#xff0c;所有直接把配置搬过来。但是有一个屏幕可以正常显示&#xff0c;有一个屏幕开机时候不显示&#xff0c;开机后按po…

油画|怀念《记忆中的乌篷船》

《记忆中的乌篷船》 90x60cm 陈可之1998年绘 油画《记忆中的乌篷船》&#xff0c;描绘着晚霞中长江边的几艘乌篷船。寻常的景象&#xff0c;流淌着岁月的痕迹&#xff0c;是许多人的遥远回忆。 乌篷船处于画面中心&#xff0c;用焦点透视法&#xff0c;把近处的石板&#xff0…

k8s集群部署vmalert和prometheusalert实现钉钉告警

先决条件 安装以下软件包&#xff1a;git, kubectl, helm, helm-docs&#xff0c;请参阅本教程。 1、安装 helm wget https://xxx-xx.oss-cn-xxx.aliyuncs.com/helm-v3.8.1-linux-amd64.tar.gz tar xvzf helm-v3.8.1-linux-amd64.tar.gz mv linux-amd64/helm /usr/local/bin…

从零开始的机械臂yolov5抓取gazebo仿真(二)

使用moveit_setup_assistant配置机械臂&#xff08;上&#xff09; 观察机械臂模型 上一节中拿到了sunday_description功能包&#xff0c;将功能包放进工作空间进行编译&#xff0c;可将工作空间路径写进.bashrc文件中&#xff0c;这样就不必每次都source了 例如&#xff1a…

I2C连续读写实现

IIC系列文章: (1)I2C 接口控制器理论讲解 (2)I2C接口控制设计与实现 (3)I2C连续读写实现 文章目录 前言一、 i2c_bit_shift 模块分析二、 i2c_control 模块实现三、 i2c_control 模块仿真测试前言 上文的 i2c_bit_shift 模块说完了,我们发现实现一个字节的写操作还是可以实现…

为什么CAN要采取双绞线布局?

摘要&#xff1a; 在CAN总线应用中&#xff0c;一般建议使用屏蔽双绞线进行组网、布线&#xff0c;本文将详细讲解为什么CAN总线要采取双绞线的布局。 CAN&#xff08;Controller Area Network&#xff09;是一种用于实时应用的串行通讯协议总线&#xff0c;它可以使用双绞线来…

【解决】Kafka Exception thrown when sending a message with key=‘null‘ 异常

问题原因&#xff1a; 如下图&#xff0c;kafka 中配置的是监听域名的方式&#xff0c;但程序里使用的是 ip:port 的连接方式。 解决办法&#xff1a; kafka 中配置的是域名的方式&#xff0c;程序里也相应配置成 域名:port 的方式&#xff08;注意&#xff1a;本地h…

Medical Isolated Power Supply System in Angola

安科瑞 华楠 Abstract: Diagnosis and treatment in modern hospitals are inseparable from advanced medical equipment, which are inseparable from safe and reliable power supply. Many operations often last for several hours, and the consequences of a sudden pow…

js 构造函数

js 构造函数 new Pig() ---- 创建新的空对象 this 指向新对象 this.name name --------修改this&#xff0c;添加新的属性。 最后返回新的对象

C++遍历std::tuple(C++14 ~ C++20)

本文展示了遍历std::tuple的方式&#xff1a; 首先比较容易想到的是利用C14的std::make_index_sequence与std::get结合取值&#xff0c;然后配合std::initializer_list进行包展开&#xff1a; // since C14 class Func0 {template<typename T, typename F, size_t... I>…

bert,transformer架构图及面试题

Transformer详解 - mathor atten之后经过一个全连接层残差层归一化 class BertSelfOutput(nn.Module):def __init__(self, config):super().__init__()self.dense nn.Linear(config.hidden_size, config.hidden_size)self.LayerNorm nn.LayerNorm(config.hidden_size, epscon…

疫情打卡 vue+springboot疾病防控管理系统java jsp源代码

本项目为前几天收费帮学妹做的一个项目&#xff0c;Java EE JSP项目&#xff0c;在工作环境中基本使用不到&#xff0c;但是很多学校把这个当作编程入门的项目来做&#xff0c;故分享出本项目供初学者参考。 一、项目描述 疫情打卡 vuespringboot 系统有1权限&#xff1a;管理…

vue3+ts-tsconfig.json报错Option ‘importsNotUsedAsValues’

vue3ts-tsconfig.json报错Option ‘importsNotUsedAsValues’ is deprecated and will stop functioning in TypeScript 5.5. Specify compilerOption ‘“ignoreDeprecations”: “5.0”’ to silence this error. Use ‘verbatimModuleSyntax’ instead 自我记录 翻译 选项…

网工软考 | 软考到底考哪个证比较好?

最近来咨询软考的同学比较多&#xff0c;对软考有哪些证书&#xff0c;怎么来选择比较困扰&#xff0c;目前刚好是学习的最佳阶段。 本期就统一来解答一下。 01 软考分为五个专业 计算机软件、计算机网络、计算机应用技术、信息系统和信息服务共五个专业&#xff0c;并在各专…

股市杠杆操作是什么意思?从三方面分析

股市杠杆操作是指投资者通过借用资金进行证券交易&#xff0c;以放大投资回报的一种金融工具。这种操作可以使投资者借用额外的资金进行交易&#xff0c;增加投资收益的潜力&#xff0c;但也伴随着更高的风险。下面从三个方面对股市杠杆操作进行分析。 1. 操作原理和优势&#…

Nginx的块、变量以及重定向

目录 绪论 1、location匹配 1.1 常见的Nginx正则表达式 1.2 正则表达式&#xff1a;匹配的是文件内容 1.3 location匹配uri 1.4 location常用的匹配规则 1.5 location优先级 1.6 匹配小结 1.7 生产环境中的匹配规则 2、nginx的内置变量 3、rewrite 3.1 rewrite作用 …

Spring中bean生命周期的PostProcessor的每个方法的作用

可结合这个博客看 https://blog.csdn.net/riemann_/article/details/118500805、https://cloud.tencent.com/developer/article/1409315、https://blog.csdn.net/qq_43631716/article/details/120239438 本篇内容借鉴于chatgtp,应该有错误&#xff0c;仅作方法应用的参考&#…