informer中的WorkQueue机制的实现分析与源码解读(1)

news2025/1/13 13:06:49

背景

client-go中的workqueue包里主要有三个队列,分别是普通队列Queue,延时队列DelayingQueue,限速队列RateLimitingQueue,后一个队列以前一个队列的实现为基础,层层添加新功能。

workqueue是整个client-go源码的重点和难点。采用层层拨开分步理解有助于理解workqueue的源码。本文重点在从源码角度了解下workqueue的add(),get(),done()方法执行的过程。关于延时队列与限速队列是如何实现的后面再单独讨论。

workqueue源码分析

代码结构

源码位于:vendor/k8s.io//client-go/util/workqueue/queue.go

queue类型的定义

下面是queue类型定义。其中queue、dirty、processing 都保存 items。它们的区别是:

  • queue是有序列表用来存储 item 的处理顺序。

  • dirty集合存储的是所有需要处理的 item,是set类型,无序,用于保证items的唯一。dirty的字面意思就是需要被处理的数据。

  • processing集合存储的是当前正在处理的 item,也是set类型,无序,用于保证items的唯一。

// Type is a work queue (see the package comment).
type Type struct {
	// queue defines the order in which we will work on items. Every
	// element of queue should be in the dirty set and not in the
	// processing set.
	queue []t     // 是一个切片保证对象入队的顺序性。每个在queue队列的对象,必须同时也在dirty集合。

	// dirty defines all of the items that need to be processed.
	dirty set    // 是一个set集合,保证对象的唯一性。存放需要被处理的对象

	// Things that are currently being processed are in the processing set.
	// These things may be simultaneously in the dirty set. When we finish
	// processing something and remove it from this set, we'll check if
	// it's in the dirty set, and if so, add it to the queue.
	processing set     // 也是一个set集合,保证对象的唯一性

	cond *sync.Cond    // 安全处理队列里面的对象

	shuttingDown bool    // 队列是否处理关闭中

	metrics queueMetrics // 用于计数统计

	unfinishedWorkUpdatePeriod time.Duration
	clock                      clock.Clock
}

要理解workqueue是的工作机制,必须要了解queue队列3个重要的方法,Add,Get,Done。接下来展开分析。

Add方法

informer机制中,当一个对象从DeltaFIFO队列中pod弹出后,会转到AddEventHandler事件处理函数处理,AddEventHandler需要调用workqueue的Add方法,先把对象加入队列,等下用户任务来处理。

Add方法是将item加入队列q.queue和待处理集合q.dirty。若该item正在被处理只加入q.dirty。

// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
  // 如果队列处于shuttdingDown状态就返回
	if q.shuttingDown {
		return
	}
  // 如果dirty里面有这个item,就返回
	if q.dirty.has(item) {
		return
	}
  // 添加到metric用于计数
	q.metrics.add(item)
  // 插入到dirty队列
	q.dirty.insert(item)
  // 如果processing队列已经有这个item,就返回
	if q.processing.has(item) {
		return
	}
  // 如果processing队列没有这个item,就加入queue队列
	q.queue = append(q.queue, item)
  // 发信号让其他goroutine处理
	q.cond.Signal()
}

在执行Add()添加对象到workqueue时,主要有三种场景,如下图所示

场景一:当3个队列都没有这个对象时,对象插入到queue和dirty

场景二:当某个对象已经加入到了队列,但还未开始被处理时,就直接返回不再加入队列。

场景三:当某个对象处于”处理中“状态,也就是位于processing队列中时,会把元素加入到dirty队列。当处理完,执行Done()方法后,item会被重新加入queue队列。

Get方法

Get方法是从 queue队列中取出一个元素item加入正处理集合q.processing,并从queue队列中删除,从dirty中删除。

// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *Type) Get() (item interface{}, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
  // 如果queue队列为空,并且队列不是处于shuttingDown状态就阻塞等待Add()对象到队列
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
  // 如果queue队列为空了,而且队列处于shuttingDown状态,就返回空值
	if len(q.queue) == 0 {
		// We must be shutting down.
		return nil, true
	}
  // 从queue队列弹出一个元素
	item, q.queue = q.queue[0], q.queue[1:]
  // 计数
	q.metrics.get(item)
  // 插入到processing队列
	q.processing.insert(item)
	q.dirty.delete(item)
	return item, false
}

Get方法的图示

Done方法

Done方法是表明这个元素item被处理完了,从processing队列删除。这里加了一个判断,如果dirty中还存在,还要将其加入 queue队列。

// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Type) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	q.metrics.done(item)
	
  // 直接从processing队列删除
	q.processing.delete(item)
  // 如果dirty队列里面还有这个对象(通常是处理元素过程中,对象再次入队了),就将元素从新加到queue队列。
	if q.dirty.has(item) {
		q.queue = append(q.queue, item)
		q.cond.Signal()
	}
}

在执行Add()添加对象到workqueue时,主要有二种场景,如下图所示

场景一:对象item完成处理,并且处理过程中该对象没有再次入队

场景二:对象item在处理过程中,还没处理完之前,这个对象又入队被加入了dirty队列(也就是此时执行了Add方法)。当执行Done()后,item会被重新添加(re-add)到queue队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1980092.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

每日学术速递8.5—1

1.SV4D: Dynamic 3D Content Generation with Multi-Frame and Multi-View Consistency 标题: SV4D:具有多帧和多视图一致性的动态 3D 内容生成 作者:Yiming Xie, Chun-Han Yao, Vikram Voleti, Huaizu Jiang, Varun Jampani 文章链接&…

LinuxC++(10):调用可执行程序

认识system函数 可以直接用system在代码中实现调用shell命令 /bin/ls -l /tmp表示执行ls -l命令,打开/tmp地址 而前面的/bin/表示这是shell命令,不可少,可以认为,/bin/后面的就是等价于shell里面输入的命令。 然后,cou…

* (头指针分离自 9822ba4) ,提交代码不能到分支——游离分支

背景 通过git checkout commitId(之前的一个版本); 基于这个版本修改提交代码推送代码,但是远端没有更新最新数据。 操作 通过git checkout commitId(之前的一个版本);通过git branch 查看分支情况,发现所处分支在游离分支:切换到master分…

连接池的原理

文章目录 1. 连接池的含义2. 连接池的作用2.1 不使用连接池的情况2.2 使用连接池的情况 3. 连接池和线程池的关系4. 连接池设计要点5. 使用实测 1. 连接池的含义 数据库连接池(Connection pooling)是程序启动时建立足够的数据库连接,并将这些…

《Unity3D网络游戏实战》学习与实践

纸上得来终觉浅,绝知此事要躬行~ Echo 网络上的两个程序通过一个双向的通信连接实现数据交换,这个连接的一端称为一个Socket “端口”是英文port的意译,是设备与外界通信交流的出口。每台计算机可以分配0到65535共65536个端口 每一条Sock…

Java | Leetcode Java题解之第322题零钱兑换

题目&#xff1a; 题解&#xff1a; public class Solution {public int coinChange(int[] coins, int amount) {int max amount 1;int[] dp new int[amount 1];Arrays.fill(dp, max);dp[0] 0;for (int i 1; i < amount; i) {for (int j 0; j < coins.length; j)…

基于springboot+vue+uniapp的智慧物业平台小程序

开发语言&#xff1a;Java框架&#xff1a;springbootuniappJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#…

c++ 容器 vector

vector的意思就是向量&#xff0c;就是一个顺序表的意思&#xff0c;这个顺序表可以存任意的类型&#xff0c;因为其线性的内存特点&#xff0c;所以在stl里是经常被使用的存在。 vector vector既然要能储存任意的变量&#xff0c;那么就必须使用模板: 这里的T就是变量类型&a…

【QT】鼠标按键事件 - QMouseEvent QKeyEvent

qt 事件 事件1. 事件概念2. 事件的处理3. 按键事件&#xff08;1&#xff09;单个按键&#xff08;2&#xff09;组合按键 4. 鼠标事件&#xff08;1&#xff09;鼠标单击事件&#xff08;2&#xff09;鼠标释放事件&#xff08;3&#xff09;鼠标双击事件&#xff08;4&#x…

【多线程-从零开始-叁】线程的核心操作

一、创建一个线程-start() start 和 run 的区别&#xff1a;&#xff08;经典面试题&#xff09; run 描述了线程要执行的任务&#xff0c;也可以称为“线程的入口”此处 start 会根据不同的系统&#xff0c;分别调用不同的 API&#xff0c;来执行系统函数&#xff0c;在系统…

原生多模态跟GPT聊天部分测试,大家都用他来做什么;字节推出AI音乐产品-海绵音乐,可以媲美Udio和Suno

✨ 1: 跟GPT聊天 原生多模态跟GPT聊天部分测试&#xff0c;大家都用他来做什么。 说各个国家的语言&#xff0c;例如普通话&#xff0c;或者是广东话等。 ChatGPT担任富有激情的足球比赛解说员 使用新的高级语音模式 视觉&#xff0c;进行实时日语翻译&#xff01; 地址&…

Java编程规范 空格

public static void main(String[] args) { // 缩进4 个空格 String say "hello"; // 运算符的左右必须有一个空格 int flag 0; // 关键词if 与括号之间必须有一个空格&#xff0c;括号内的f与左括号&#xff0c;0与右括号不需要空格 if (flag 0) { System…

秃姐学AI系列之:模型选择 | 欠拟合和过拟合 | 权重衰退

目录 训练误差 泛化误差 验证数据集和测试数据集 验证数据集 Validation Dataset&#xff1a; 测试数据集&#xff1a; K-则交叉验证 总结 过拟合和欠拟合 模型容量 模型容量的影响 估计模型容量 数据复杂度 总结 权重衰退 weight decay 使用均方范数作为硬性…

【八】Zookeeper3.7.1集成Hadoop3.3.4集群安装

文章目录 1.基本原理2.下载并解压ZooKeeper3.配置环境变量4.配置ZooKeeper5.创建数据目录并初始化myid6.启动ZooKeeper7.配置ZooKeeper集成到Hadoop8.重启Hadoop9.ZooKeeper状态检查 1.基本原理 ZooKeeper 是一个分布式协调服务&#xff0c;用于分布式系统中管理配置信息、命名…

51单片机—智能垃圾桶(定时器)

一. 定时器 1. 简介 C51中的定时器和计数器是同一个硬件电路支持的&#xff0c;通过寄存器配置不同&#xff0c;就可以将他当做定时器或者计数器使用。 确切的说&#xff0c;定时器和计数器区别是致使他们背后的计数存储器加1的信号不同。当配置为定时器使用时&#xff0c;每…

vue3 手写日历组件

找了很久vue3的element样式一直没办法修改实现。只能手写日历了。借鉴了一些大佬的代码 调用&#xff1a; 再要使用的地方引入 import calendarelement from ./calendarelement.vue //日历组件 <div > <calendarelement /> //日历</div> 效果&#…

押金原路退回系统在医院中应用,一键操作秒到账 押金+身份证+电子押金单

一、医院押金管理必要性 保障医疗服务的连续性&#xff1a;患者缴纳押金能够确保在治疗过程中&#xff0c;医院有足够的资金来提供必要的医疗服务、药品和设备&#xff0c;不会因为费用问题而中断治疗。例如&#xff0c;在紧急手术或需要持续使用昂贵药物的情况下&#xff0c;…

【Vue3】组件通信之$attrs

【Vue3】组件通信之$attrs 背景简介开发环境开发步骤及源码总结 背景 随着年龄的增长&#xff0c;很多曾经烂熟于心的技术原理已被岁月摩擦得愈发模糊起来&#xff0c;技术出身的人总是很难放下一些执念&#xff0c;遂将这些知识整理成文&#xff0c;以纪念曾经努力学习奋斗的…

开发在线客服系统新的宣传推广站【微客客服】

打造一个软件宣传官网&#xff0c;这事儿可不简单。咱们得先搞清楚&#xff0c;这个网站要给谁看&#xff0c;要传达啥信息&#xff0c;需要哪些功能。 我们网站是宣传【在线客服系统】的&#xff0c;所以需要把主要功能展示清楚 在线网址&#xff1a;https://weikefu.com.cn 然…

Python面试宝典第27题:全排列

题目 给定一个不含重复数字的数组nums&#xff0c;返回其所有可能的全排列 。备注&#xff1a;可以按任意顺序返回答案。 示例 1&#xff1a; 输入&#xff1a;nums [1,2,3] 输出&#xff1a;[[1,2,3], [1,3,2], [2,1,3], [2,3,1], [3,1,2], [3,2,1]] 示例 2&#xff1a; 输…