kubernetes-informer机制

news2025/1/4 16:46:02

一、概念

informer 是 client-go 中的核心工具包,在kubernetes中,各个组件通过HTTP协议跟 API Server 进行通信。如果各组件每次都直接和API Server 进行交互,会给API Server 和ETCD造成非常大的压力。在不依赖任何中间件的情况下,通过informer保证了消息的实时性、可靠性和顺序性。

二、架构设计

在这里插入图片描述
informer运行原理
在这里插入图片描述

三、源码分析

3.1 informer启动

informer启动有以下步骤:

  1. 注册及启动processLoop和reflector
  2. reflector开始LIST和WATCH,watch到的数据进行对比处理,存入到queue中
  3. processLoop开始循环pop队列数据
	factory := informers.NewSharedInformerFactory(clientset, 0)
	podInformer := factory.Core().V1().Pods().Informer()
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("New pod added: %s", mObj.GetName())
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oObj := oldObj.(v1.Object)
			nObj := newObj.(v1.Object)
			log.Printf("%s pod updated to %s", oObj.GetName(), nObj.GetName())
		},
		DeleteFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("pod deleted from store: %s", mObj.GetName())
		},
	})
	//启动informer
	podInformer.Run(stopCh)
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	......
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,
        //注册回调函数HandleDeltas,后面从queue弹出数据的时候要用到
		Process:           s.HandleDeltas,
		WatchErrorHandler: s.watchErrorHandler,
	}
	......
	s.controller.Run(stopCh)
}

代码位置:client-go/tools/cache/controller.go

func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	// 省略代码
    ......
	var wg wait.Group
    //启动reflector
	wg.StartWithChannel(stopCh, r.Run)
    //启动processLoop
	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}

reflector开始list and watch,代码位置:client-go/tools/cache/reflector.go

func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
			switch event.Type {
			//watch到add事件
			case watch.Added:
				err := r.store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			//watch到modified事件
			case watch.Modified:
				err := r.store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			//watch到delete事件
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := r.store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}
			case watch.Bookmark:
				// A `Bookmark` means watch has synced here, just update the resourceVersion
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}

以update为例

func (f *DeltaFIFO) Update(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Updated, obj)
}
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	oldDeltas := f.items[id]
	newDeltas := append(oldDeltas, Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)

	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
		    //将key放入到queue
			f.queue = append(f.queue, id)
		}
		//将newDeltas放入到items中
		f.items[id] = newDeltas
		//事件到达广播
		f.cond.Broadcast()
	} else {
		// This never happens, because dedupDeltas never returns an empty list
		// when given a non-empty list (as it is here).
		// If somehow it happens anyway, deal with it but complain.
		if oldDeltas == nil {
			klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
			return nil
		}
		klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
		f.items[id] = newDeltas
		return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
	}
	return nil
}

DeltaFIFO的数据结构如下:

type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond

	// `items` maps a key to a Deltas.
	// Each such Deltas has at least one Delta.
	items map[string]Deltas

	// `queue` maintains FIFO order of keys for consumption in Pop().
	// There are no duplicates in `queue`.
	// A key is in `queue` if and only if it is in `items`.
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update/AddIfNotPresent was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// knownObjects list keys that are "known" --- affecting Delete(),
	// Replace(), and Resync()
	knownObjects KeyListerGetter

	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRUD operations.
	closed bool

	// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
	// DeltaType when Replace() is called (to preserve backwards compat).
	emitDeltaTypeReplaced bool
}

到这里,已经将最新的数据推送到了DeltaFIFO的queue中,接下来看下怎么处理queue中的数据。

queue出队:
回到之前注册的processLoop

func (c *controller) processLoop() {
	for {
	    //从queue弹出数据,交由process处理,也就是之前注册的handleDeltas
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				// 重新入队queue
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.closed {
				return nil, ErrFIFOClosed
			}
            //如果queue中没有数据,阻塞等待
			f.cond.Wait()
		}
		id := f.queue[0]
		f.queue = f.queue[1:]
		depth := len(f.queue)
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// This should never happen
			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
			continue
		}
		delete(f.items, id)
		// Only log traces if the queue depth is greater than 10 and it takes more than
		// 100 milliseconds to process one item from the queue.
		// Queue depth never goes high because processing an item is locking the queue,
		// and new items can't be added until processing finish.
		// https://github.com/kubernetes/kubernetes/issues/103789
		if depth > 10 {
			trace := utiltrace.New("DeltaFIFO Pop Process",
				utiltrace.Field{Key: "ID", Value: id},
				utiltrace.Field{Key: "Depth", Value: depth},
				utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
			defer trace.LogIfLong(100 * time.Millisecond)
		}
		//处理数据,重点看下这个方法,进入HandleDeltas
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}

代码位置 client-go/tools/cache/shared_informer.go

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			//从本地缓存indexer中查询数据是否存在
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
			    //如果存在,则更新indexer中该数据
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}

				isSync := false
				switch {
				case d.Type == Sync:
					// Sync events are only propagated to listeners that requested resync
					isSync = true
				case d.Type == Replaced:
					if accessor, err := meta.Accessor(d.Object); err == nil {
						if oldAccessor, err := meta.Accessor(old); err == nil {
							// Replaced events that didn't change resourceVersion are treated as resync events
							// and only propagated to listeners that requested resync
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
				//分发监听者,通知监听update
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
			    //如果不存在,则在indexer中添加该数据
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				//分发监听者,通知监听add
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			//分发监听者,通知监听delete
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
		    //往监听者加入数据
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
		    //往监听者加入数据
			listener.add(obj)
		}
	}
}
func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

数据分发到了监听者,那么监听者是什么时候注册的,又是怎么工作的呢?
联系到前面informer注册的eventHandler

podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("New pod added: %s", mObj.GetName())
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oObj := oldObj.(v1.Object)
			nObj := newObj.(v1.Object)
			log.Printf("%s pod updated to %s", oObj.GetName(), nObj.GetName())
		},
		DeleteFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("pod deleted from store: %s", mObj.GetName())
		},
	})
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
	s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	//省略代码
    //......
    //创建监听者
	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

	if !s.started {
		s.processor.addListener(listener)
		return
	}

	// in order to safely join, we have to
	// 1. stop sending add/update/delete notifications
	// 2. do a list against the store
	// 3. send synthetic "Add" events to the new handler
	// 4. unblock
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()
    //添加监听者
	s.processor.addListener(listener)
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}
func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.addListenerLocked(listener)
	if p.listenersStarted {
	    //在不同的协程使监听者运行起来
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
	p.listeners = append(p.listeners, listener)
	p.syncingListeners = append(p.syncingListeners, listener)
}
func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		//联系前面distribute分发监听者的时候将notification发送到addCh
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}
func (p *processorListener) run() {
	// this call blocks until the channel is closed.  When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted.  This is usually better than the alternative of never
	// delivering again.
	stopCh := make(chan struct{})
	wait.Until(func() {
		for next := range p.nextCh {
		    //这里调用到用户定义的handler方法
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}

最后看一下informer的详细全局设计
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/567964.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Git简单使用介绍

Git作用 版本控制&#xff08;版本迭代&#xff09;&#xff0c;多人开发&#xff0c;没有版本控制&#xff0c;每修改一下文件就需要备份 常用的版本控制器&#xff1a;Git 和SVN 主要区别&#xff1a; SVN是集中式版本控制系统&#xff0c;版本库是集中放在中央服务器的&a…

半导体芯片划片机怎么使用

使用半导体芯片划片机的方法如下&#xff1a; 准备工作&#xff1a;清洁设备&#xff0c;核对晶圆数量和批次信息&#xff0c;确保晶圆完好无破损。 粘贴晶圆片&#xff1a;将待切割的晶圆片粘贴到蓝膜上&#xff0c;并将蓝膜框架放入划片机。 划片开始&#xff1a;实时清除划…

想劝大家别去外包,干了5年,彻底废了......

先说一下自己的情况&#xff0c;大专生&#xff0c;18年通过校招进入湖南某软件公司&#xff0c;干了接近5年的功能测试&#xff0c;今年年初&#xff0c;感觉自己不能够在这样下去了&#xff0c;长时间呆在一个舒适的环境会让一个人堕落&#xff01; 而我已经在一个企业干了四…

什么是SQL Server 审核

IT 安全专业人员需要定期监视和审核 SQL Server 中的所有活动&#xff0c;以确保关键数据的完整性和机密性不会受到损害。手动监视服务器上的所有这些活动是一个忙碌的过程。为了使管理员的工作更轻松一些&#xff0c;Microsoft 提供了 SQL Server 审核功能来帮助管理员有效地审…

vue不同开发方式引用vue文件讲解

上面这个图是vue构建后dist目录的文件&#xff0c;这里面包含了各种开发环境所需要的vue文件 一、开发环境和生产环境 主要分为带有prod和不带prod的版本&#xff1b; 带有prod为生产环境版本并进行了代码压缩&#xff0c;没有运行中不会有警告、错误等详细的提示信息 不带pr…

《程序员面试金典(第6版)》面试题 02.07. 链表相交(查找节点操作,哈希表,双指针(先走n步法,交替遍历法))

题目描述 给你两个单链表的头节点 headA 和 headB &#xff0c;请你找出并返回两个单链表相交的起始节点。如果两个链表没有交点&#xff0c;返回 null 。 题目传输门&#xff1a;面试题 02.07. 链表相交 图示两个链表在节点 c1 开始相交&#xff1a; 题目数据 保证 整个链式结…

Python——openpyxl读取Excel表格(读取、单元格修改、单元格加底色)

首先python读取Excel的库有很多&#xff0c;包括xlwings&#xff0c;pandas&#xff0c;xlrd等等&#xff0c;我比较常用的是openpyxl&#xff0c;以及pandas&#xff0c;当然还有喜欢将数据量比较大的Excel转为csv格式再读取。 今天我们用openpyxl来读取excel文件&#xff0c…

【Python爬虫】urllib模块:强大的网络爬虫利器,让你轻松获取万千信息!

前言&#xff1a; Python爬虫是一种获取互联网信息的技术&#xff0c;它可以自动化地从网站上抓取数据并进行处理。Python爬虫的优点在于它可以快速地获取大量数据&#xff0c;并且可以自动化地进行数据处理和分析。在Python爬虫中&#xff0c;urllib模块是一个非常重要的模块&…

python---变量(3)

求字符串的长度 使用len来求字符串中有几个字符 字符串的拼接 此时是把a2字符串拼接到a1字符串的末尾&#xff0c;得到更大的字符串&#xff0c;对于原来的a1和a2是没有影响的&#xff01; 不能把字符串和数字混合相加&#xff01; 这个时候程序就会报错&#xff0c;不能…

深入理解Linux虚拟内存管理

系列文章目录 Linux 内核设计与实现 深入理解 Linux 内核&#xff08;一&#xff09; 深入理解 Linux 内核&#xff08;二&#xff09; Linux 设备驱动程序&#xff08;一&#xff09; Linux 设备驱动程序&#xff08;二&#xff09; Linux 设备驱动程序&#xff08;三&#xf…

智慧档案馆库房环境要求中需要做到几防?

《档案馆工作通则&#xff08;国档发〔1983〕14号&#xff09;》要求档案库房必须坚固适用&#xff0c;并应具有抗震、防盗、防火、防水、防潮、防尘、防虫、防鼠、防高温、防强光等设施&#xff1b;《第13号国家档案局令《机关档案管理规定》》要求 做好防火、防盗、防紫外线、…

Docker 快速入门实用教程

文章结构 Docker 的常用使用概念安装Docker 简单使用镜像相关操作容器相关操作 Docker 的常用使用 参考&#xff1a;https://www.runoob.com/docker/macos-docker-install.html 概念 images 镜像&#xff08;镜像相当于类概念&#xff09; container 容器&#xff08;con…

Spring框架之体系结构和目录结构

Spring是由Rob Jonson租住和开发的一个分层的JavaEE/SE一站式&#xff08;full stack&#xff09;轻量级开发框架&#xff0c;他的核心思想是控制翻转(Inversion of Control IOC)和面向切面(Aspect Oriented Programming, aop)的编程&#xff0c;其中IoC是Spring的基础&#xf…

国标28181-2022 变更说明

此为国标28181协议的第3版本&#xff0c;变更调整较大&#xff1a;正式发布之后替代了原国标28181-2011协议和国标28181-2016版本的协议 此次参与修订的企业&#xff1a;公安部第一研究所&#xff0c;视频图像信息智能分析与共享应用技术国家工程实验室&#xff0c;国家信息中…

js正则校验特殊的不可见字符

背景 表单的输入框&#xff0c;用户可能从Excel或者其他地方直接复制粘贴&#xff0c;这时候提交到后端会导致获取的用户输入中包含一些特殊的不可见字符&#xff0c;比如tab键或者制表符等&#xff0c;这时需要在前端对用户输入做一些检验&#xff0c;检查是否存在不可见字符…

制作投票的链接制作投票链接的制作个投票链接

大家在选择投票小程序之前&#xff0c;可以先梳理一下自己的投票评选活动是哪种类型&#xff0c;目前有匿名投票、图文投票、视频投票、赛事征集投票等。 我们现在要以“摄影能力提升”为主题进行一次投票活动&#xff0c;我们可以在在微信小程序搜索&#xff0c;“活动星”投票…

LabVIEWCompactRIO 开发指南35 使用桌面执行节点

使用桌面执行节点 通常建议使用桌面执行节点来验证组件。由于它执行FPGA VI仿真模式&#xff0c;因此可以为包含目标资源&#xff08;如I/O和存储器项目&#xff09;的VI开发测试。本节介绍为组件测试设置桌面执行节点的步骤。 考虑一个以LabVIEW FPGA组件为该组件的示例&…

DNC数控机床联网及数据采集系统@杭州乐芯科技

项目背景 中国政府高度重视工业化与信息化融合&#xff0c;十八大以后&#xff0c;已经将“两化深度融合”上升为国家战略。工信部在2013年9月的“两化深度融合专项行动”中重点强调“培育数字化车间、智能工厂&#xff0c;推广智能制造生产模式”以及“在重点行业组织开展试点…

《分布式缓存(二)- Redis主从》

文章目录 Redis主从1.搭建主从架构1.1.Redis主从Linux版1.2.Redis主从Windows版2.主从数据同步原理2.1.全量同步2.2.增量同步2.3.repl_backlog原理3.主从同步优化4.小结Redis主从 1.搭建主从架构 单节点Redis的并发能力是有上限的,要进一步提高Redis的并发能力,就需要搭建主…

virtualenv使用教程

添加不同版本的python虚拟环境时要把path添加到系统变量里 运行代码 先激活&#xff0c;在对应版本的cd Scripts目录下输入 .\activate 激活后&#xff0c;输入以下&#xff0c;注意路径检查 python xxx.py