Kubernetes: client-go informer source code analysis


1. Concepts

The informer is the core toolkit in client-go. In Kubernetes, the various components talk to the API Server over HTTP. If every component interacted with the API Server directly on each access, it would put enormous pressure on the API Server and etcd. Without relying on any middleware, the informer guarantees the timeliness, reliability, and ordering of messages.

2. Architecture Design

[Figure: informer architecture]
[Figure: how the informer works]

3. Source Code Analysis

3.1 Informer startup

Informer startup involves the following steps:

  1. Register and start processLoop and the reflector
  2. The reflector starts to LIST and WATCH; watched data is compared and processed, then stored into the queue
  3. processLoop starts popping data from the queue in a loop

A typical usage example:
	factory := informers.NewSharedInformerFactory(clientset, 0)
	podInformer := factory.Core().V1().Pods().Informer()
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("New pod added: %s", mObj.GetName())
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oObj := oldObj.(v1.Object)
			nObj := newObj.(v1.Object)
			log.Printf("%s pod updated to %s", oObj.GetName(), nObj.GetName())
		},
		DeleteFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("pod deleted from store: %s", mObj.GetName())
		},
	})
	// start the informer
	podInformer.Run(stopCh)

podInformer.Run enters sharedIndexInformer.Run. Code location: client-go/tools/cache/shared_informer.go

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	......
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,
        // register the callback HandleDeltas; it is used later when items are popped from the queue
		Process:           s.HandleDeltas,
		WatchErrorHandler: s.watchErrorHandler,
	}
	......
	s.controller.Run(stopCh)
}

Code location: client-go/tools/cache/controller.go

func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	// code omitted
    ......
	var wg wait.Group
    // start the reflector
	wg.StartWithChannel(stopCh, r.Run)
    // start processLoop
	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}

The reflector starts its list and watch. Code location: client-go/tools/cache/reflector.go

func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

Inside ListAndWatch, each event received from the watch is dispatched to the store (the DeltaFIFO) according to its type:

			switch event.Type {
			// Added event received from the watch
			case watch.Added:
				err := r.store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			// Modified event received from the watch
			case watch.Modified:
				err := r.store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			// Deleted event received from the watch
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := r.store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}
			case watch.Bookmark:
				// A `Bookmark` means watch has synced here, just update the resourceVersion
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}

Take Update as an example; r.store here is the DeltaFIFO, so the call lands in DeltaFIFO.Update:

func (f *DeltaFIFO) Update(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Updated, obj)
}
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	oldDeltas := f.items[id]
	newDeltas := append(oldDeltas, Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)

	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
		    // append the key to the queue
			f.queue = append(f.queue, id)
		}
		// store newDeltas in items
		f.items[id] = newDeltas
		// broadcast so that consumers blocked in Pop() wake up
		f.cond.Broadcast()
	} else {
		// This never happens, because dedupDeltas never returns an empty list
		// when given a non-empty list (as it is here).
		// If somehow it happens anyway, deal with it but complain.
		if oldDeltas == nil {
			klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
			return nil
		}
		klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
		f.items[id] = newDeltas
		return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
	}
	return nil
}
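
The id computed by f.KeyOf above is, for the shared informer's DeltaFIFO, ultimately produced by MetaNamespaceKeyFunc and has the form "namespace/name". A minimal standalone sketch of that key generation (the pod's namespace and name are hypothetical):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// hypothetical pod, used only to illustrate key generation
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "nginx"}}
	key, err := cache.MetaNamespaceKeyFunc(pod)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // prints: default/nginx
}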

The DeltaFIFO data structure is as follows:

type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond

	// `items` maps a key to a Deltas.
	// Each such Deltas has at least one Delta.
	items map[string]Deltas

	// `queue` maintains FIFO order of keys for consumption in Pop().
	// There are no duplicates in `queue`.
	// A key is in `queue` if and only if it is in `items`.
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update/AddIfNotPresent was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// knownObjects list keys that are "known" --- affecting Delete(),
	// Replace(), and Resync()
	knownObjects KeyListerGetter

	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRUD operations.
	closed bool

	// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
	// DeltaType when Replace() is called (to preserve backwards compat).
	emitDeltaTypeReplaced bool
}
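
The Deltas stored in items are simply an ordered list of (type, object) pairs for one object key, oldest first. Simplified from the same file, client-go/tools/cache/delta_fifo.go:

type DeltaType string

const (
	Added    DeltaType = "Added"
	Updated  DeltaType = "Updated"
	Deleted  DeltaType = "Deleted"
	Replaced DeltaType = "Replaced" // the object list was replaced, e.g. by a re-list
	Sync     DeltaType = "Sync"     // synthetic event produced by a periodic resync
)

// Delta records what change happened and the object's state after that change
// (for Deleted, the last known state).
type Delta struct {
	Type   DeltaType
	Object interface{}
}

// Deltas is a list of one or more Delta for a single key, ordered oldest to newest.
type Deltas []Delta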

At this point, the latest data has been pushed into the DeltaFIFO queue. Next, let's look at how the data in the queue is processed.

Popping from the queue:
Back to the processLoop registered earlier.

func (c *controller) processLoop() {
	for {
	    // pop an item from the queue and hand it to process, i.e. the HandleDeltas registered earlier
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				// re-enqueue into the queue
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.closed {
				return nil, ErrFIFOClosed
			}
            // if the queue is empty, block and wait for new items
			f.cond.Wait()
		}
		id := f.queue[0]
		f.queue = f.queue[1:]
		depth := len(f.queue)
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// This should never happen
			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
			continue
		}
		delete(f.items, id)
		// Only log traces if the queue depth is greater than 10 and it takes more than
		// 100 milliseconds to process one item from the queue.
		// Queue depth never goes high because processing an item is locking the queue,
		// and new items can't be added until processing finish.
		// https://github.com/kubernetes/kubernetes/issues/103789
		if depth > 10 {
			trace := utiltrace.New("DeltaFIFO Pop Process",
				utiltrace.Field{Key: "ID", Value: id},
				utiltrace.Field{Key: "Depth", Value: depth},
				utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
			defer trace.LogIfLong(100 * time.Millisecond)
		}
		// process the item; this is the key call, which enters HandleDeltas
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}

Code location: client-go/tools/cache/shared_informer.go

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			// check whether the object already exists in the local cache (indexer)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
			    // if it exists, update it in the indexer
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}

				isSync := false
				switch {
				case d.Type == Sync:
					// Sync events are only propagated to listeners that requested resync
					isSync = true
				case d.Type == Replaced:
					if accessor, err := meta.Accessor(d.Object); err == nil {
						if oldAccessor, err := meta.Accessor(old); err == nil {
							// Replaced events that didn't change resourceVersion are treated as resync events
							// and only propagated to listeners that requested resync
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
				// distribute to listeners, notifying them of the update
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
			    // if it does not exist, add it to the indexer
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				// distribute to listeners, notifying them of the add
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			// distribute to listeners, notifying them of the delete
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
		    // push the notification to the listener
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
		    // push the notification to the listener
			listener.add(obj)
		}
	}
}
func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}
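
Because HandleDeltas updates the indexer before notifications are distributed, consumers normally read objects from this local cache through a Lister instead of querying the API Server. A small sketch, reusing the factory from the startup example (the pod namespace and name are hypothetical):

	podLister := factory.Core().V1().Pods().Lister()
	// served entirely from the local indexer; no request goes to the API Server
	pod, err := podLister.Pods("default").Get("nginx")
	if err != nil {
		log.Printf("lookup from local cache failed: %v", err)
	} else {
		log.Printf("got pod from cache: %s", pod.GetName())
	}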

The data has now been distributed to the listeners. So when were those listeners registered, and how do they work?
Recall the event handler registered on the informer earlier:

podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("New pod added: %s", mObj.GetName())
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oObj := oldObj.(v1.Object)
			nObj := newObj.(v1.Object)
			log.Printf("%s pod updated to %s", oObj.GetName(), nObj.GetName())
		},
		DeleteFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("pod deleted from store: %s", mObj.GetName())
		},
	})
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
	s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	// code omitted
    // ......
    // create the listener
	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

	if !s.started {
		s.processor.addListener(listener)
		return
	}

	// in order to safely join, we have to
	// 1. stop sending add/update/delete notifications
	// 2. do a list against the store
	// 3. send synthetic "Add" events to the new handler
	// 4. unblock
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()
    // add the listener
	s.processor.addListener(listener)
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}
func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.addListenerLocked(listener)
	if p.listenersStarted {
	    // run the listener in separate goroutines
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
	p.listeners = append(p.listeners, listener)
	p.syncingListeners = append(p.syncingListeners, listener)
}
func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		// recall that distribute above sends the notification to addCh
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}
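
p.pendingNotifications above is an unbounded, growing ring buffer (buffer.RingGrowing from k8s.io/utils/buffer), which is why distribute never blocks on a slow event handler: notifications simply accumulate in the buffer. A minimal sketch of that buffer's behavior, assuming the classic non-generic RingGrowing API (the initial size is an arbitrary choice):

package main

import (
	"fmt"

	"k8s.io/utils/buffer"
)

func main() {
	ring := buffer.NewRingGrowing(2) // grows automatically beyond the initial size
	ring.WriteOne("notification-1")
	ring.WriteOne("notification-2")
	ring.WriteOne("notification-3")

	for {
		item, ok := ring.ReadOne()
		if !ok { // buffer drained
			break
		}
		fmt.Println(item)
	}
}
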
func (p *processorListener) run() {
	// this call blocks until the channel is closed.  When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted.  This is usually better than the alternative of never
	// delivering again.
	stopCh := make(chan struct{})
	wait.Until(func() {
		for next := range p.nextCh {
		    // this is where the user-defined handler methods are invoked
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}
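
In practice, when a SharedInformerFactory is used, the informers are usually started together and callers wait for the initial List to be synced into the cache before relying on it. A minimal sketch, reusing factory, podInformer, and stopCh from the startup example:

	factory.Start(stopCh)
	// block until the initial List of every started informer has been written to its indexer
	if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) {
		log.Fatal("timed out waiting for caches to sync")
	}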

Finally, a look at the informer's detailed overall design:
[Figure: informer overall design]
