[k8s源码]9.workqueue

news2025/1/13 6:12:23

client-go 是一个库,提供了与 Kubernetes API 服务器交互的基础设施。它提供了诸如 Informer、Lister、ClientSet 等工具,用于监听、缓存和操作 Kubernetes 资源。而自定义控制器则利用这些工具来实现特定的业务逻辑和自动化任务。业务逻辑实现:client-go 不包含特定的业务逻辑。自定义控制器允许实现特定于您的应用程序或需求的逻辑。扩展 Kubernetes:通过自定义控制器,可以扩展 Kubernetes 的功能,处理自定义资源或实现特定的自动化任务。响应资源变化:自定义控制器可以监听特定资源的变化,并据此执行相应的操作。

而这里的workqueue是costromer Controller的一部分:

逻辑

当我们创建一个workqueue的时候,到底发生了什么

queue := workqueue.New()

该方法调用的new方法又调用了NewWithConfig()以及newQueueWithConfig().可以看到逐级返回以后,返回的是一个type类型的数据。 

func New() *Type {
	return NewWithConfig(QueueConfig{
		Name: "",
	})
}
func NewTyped[T comparable]() *Typed[T] {
	return NewTypedWithConfig(TypedQueueConfig[T]{
		Name: "",
	})
}

func NewWithConfig(config QueueConfig) *Type {
	return NewTypedWithConfig(config)
}

func NewTypedWithConfig[T comparable](config TypedQueueConfig[T]) *Typed[T] {
	return newQueueWithConfig(config, defaultUnfinishedWorkUpdatePeriod)
}

// newQueueWithConfig constructs a new named workqueue
// with the ability to customize different properties for testing purposes
func newQueueWithConfig[T comparable](config TypedQueueConfig[T], updatePeriod time.Duration) *Typed[T] {
	var metricsFactory *queueMetricsFactory
	if config.MetricsProvider != nil {
		metricsFactory = &queueMetricsFactory{
			metricsProvider: config.MetricsProvider,
		}
	} else {
		metricsFactory = &globalMetricsFactory
	}

	if config.Clock == nil {
		config.Clock = clock.RealClock{}
	}

	if config.Queue == nil {
		config.Queue = DefaultQueue[T]()
	}

	return newQueue(
		config.Clock,
		config.Queue,
		metricsFactory.newQueueMetrics(config.Name, config.Clock),
		updatePeriod,
	)
}
TypedInterface

Interface 被标记为废弃(Deprecated),并建议使用 TypedInterface 代替。这种变化主要是因为 Go 语言引入了泛型特性。TypedInterface[T comparable] 使用了泛型,T 是一个类型参数,它必须是可比较的(comparable)。泛型允许在编译时进行类型检查,提供了更好的类型安全性。使用 TypedInterface[T] 可以在编译时捕获类型错误,而不是在运行时。

这里最后返回了一个newQueue,而它的定义如下:

func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMetrics, updatePeriod time.Duration) *Typed[T] {
	t := &Typed[T]{
		clock:                      c,
		queue:                      queue,
		dirty:                      set[T]{},
		processing:                 set[T]{},
		cond:                       sync.NewCond(&sync.Mutex{}),
		metrics:                    metrics,
		unfinishedWorkUpdatePeriod: updatePeriod,
	}
	// Don't start the goroutine for a type of noMetrics so we don't consume
	// resources unnecessarily
	if _, ok := metrics.(noMetrics); !ok {
		go t.updateUnfinishedWorkLoop()
	}
	return t
}

那么Type类型到底是什么:Type 是 Typed[any] 的一个别名。这意味着 Type 可以在任何使用 Typed[any] 的地方使用,它们是完全等价的。

type Type = Typed[any]
type Typed[t comparable] struct {
	queue Queue[t]
	// dirty defines all of the items that need to be processed.
	dirty set[t]
	// Things that are currently being processed are in the processing set.
	// These things may be simultaneously in the dirty set. When we finish
	// processing something and remove it from this set, we'll check if
	// it's in the dirty set, and if so, add it to the queue.
	processing set[t]
	cond *sync.Cond
	shuttingDown bool
	drain        bool
	metrics queueMetrics
	unfinishedWorkUpdatePeriod time.Duration
	clock                      clock.WithTicker
}

type empty struct{}
type t interface{}
type set[t comparable] map[t]empty

 这里有两个set,一个是process一个是dirty,一个项目可能同时存在于这两个集合中。这是因为一个正在处理的项目(在 processing 中)可能在处理过程中被标记为需要重新处理(因此也在 dirty 中)。如果它在 dirty 集合中,说明在处理过程中它被标记为需要重新处理。这时,系统会将它重新加入到处理队列中。

这里的t是一个空接口,允许存储任何形式的kubernetes资源。

这里还定义了接口,而Type实现了这个接口。

type Interface interface {
	Add(item interface{})
	Len() int
	Get() (item interface{}, shutdown bool)
	Done(item interface{})
	ShutDown()
	ShutDownWithDrain()
	ShuttingDown() bool
}
dirty队列

添加任务:当有新任务时,首先检查它是否已经在 dirty 中。如果不在,就添加进去。开始处理:当开始处理一个任务时,将它从 dirty 中移除。重新添加:如果一个正在处理的任务需要重新处理,就把它再次加入 dirty。dirty 帮助工作队列系统更高效地管理需要处理的任务,避免重复工作,并能快速决定是否需要添加新任务到处理队列中。 

各种类型的queue

在k8s.io/client-go/util/workqueue中查看。

从上面的例子可以看到,一个queue是有很多参数的,如果只是简单的通过new来创建,很多参数都是默认的参数。

限速队列

k8s.io/client-go/util/workqueue/default-rate-limiters.go

限速队列应用得非常广泛,比如在我们做一些操作失败后希望重试几次,但是立刻重试很有可能还是会失败,这个时候我们可以延迟一段时间再重试,而且失败次数越多延迟时间越长,这个其实就是限速。首先我们需要来了解下限速器

type RateLimiter TypedRateLimiter[any]

type TypedRateLimiter[T comparable] interface {
	// When gets an item and gets to decide how long that item should wait
	When(item T) time.Duration
	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for failing
	// or for success, we'll stop tracking it
	Forget(item T)
	// NumRequeues returns back how many failures the item has had
	NumRequeues(item T) int
}

TypedBucketRateLimiter (令牌桶限速器)
这个限速器基于令牌桶算法。想象一个固定容量的桶,桶里装着令牌。令牌以固定的速率被加入到桶中。当一个请求(或任务)到来时,它需要从桶中获取一个令牌。如果桶中有令牌,请求可以立即处理。如果桶是空的,请求必须等待直到新的令牌被加入。这种方法可以很好地控制平均处理速率,同时允许短时间的突发流量。
TypedItemExponentialFailureRateLimiter (指数退避限速器)
这个限速器根据失败次数增加等待时间:每次失败,等待时间会指数增加(基础延迟 * 2^失败次数)。有一个最大延迟时间,防止等待时间无限增长。
TypedItemFastSlowRateLimiter (快慢双速限速器)
这个限速器有两种速率:快速和慢速:在最初的几次尝试中使用快速延迟。超过设定的尝试次数后,切换到慢速延迟。适用于需要快速重试几次,然后如果仍然失败就减慢重试频率的场景。
TypedMaxOfRateLimiter (最大值限速器)
这个限速器组合了多个其他限速器:
它包含一个限速器的列表。当需要决定延迟时间时,它会询问所有的限速器。然后返回所有限速器中最长的延迟时间。这允许你组合多种限速策略,总是使用最保守(最慢)的那个。
TypedWithMaxWaitRateLimiter (最大等待时间限速器)

从代码中可以看到,有一个基础的RateLimiter的接口interface,然后其余的结构体都是这个端口的实现:

type TypedBucketRateLimiter[T comparable] struct {
	*rate.Limiter
}
ype TypedItemExponentialFailureRateLimiter[T comparable] struct {
	failuresLock sync.Mutex
	failures     map[T]int

	baseDelay time.Duration
	maxDelay  time.Duration
}
type TypedItemFastSlowRateLimiter[T comparable] struct {
	failuresLock sync.Mutex
	failures     map[T]int

	maxFastAttempts int
	fastDelay       time.Duration
	slowDelay       time.Duration
}
type TypedMaxOfRateLimiter[T comparable] struct {
	limiters []TypedRateLimiter[T]
}
type TypedWithMaxWaitRateLimiter[T comparable] struct {
	limiter  TypedRateLimiter[T]
	maxDelay time.Duration
}

他们的new函数(部分)

func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter {
	return NewTypedWithMaxWaitRateLimiter[any](limiter, maxDelay)
}

func NewTypedWithMaxWaitRateLimiter[T comparable](limiter TypedRateLimiter[T], maxDelay time.Duration) TypedRateLimiter[T] {
	return &TypedWithMaxWaitRateLimiter[T]{limiter: limiter, maxDelay: maxDelay}
}

接口实现:

func (w TypedWithMaxWaitRateLimiter[T]) When(item T) time.Duration {
	delay := w.limiter.When(item)
	if delay > w.maxDelay {
		return w.maxDelay
	}

	return delay
}

func (w TypedWithMaxWaitRateLimiter[T]) Forget(item T) {
	w.limiter.Forget(item)
}

func (w TypedWithMaxWaitRateLimiter[T]) NumRequeues(item T) int {
	return w.limiter.NumRequeues(item)
}

 我们可以看到有的限速器需要一个基础限速器:NewTypedWithMaxWaitRateLimiter是从多个限速器中取得最大的限速时间。(这里函数名称不同,源代码里是NewTypedWithMaxWaitRateLimiter而实际演示代码是NewWithMaxWaitRateLimiter,这是因为源码读的是最新版,而实际安装的go是1.22,所以不一样,但是只有增加和缺少Type的区别)

baseRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 10*time.Second)
ratelimiter :=workqueue.NewWithMaxWaitRateLimiter(baseRateLimiter,10*time.Second)
ratelimitedQueue := workqueue.NewRateLimitingQueue(ratelimiter)
延迟队列 
type DelayingInterface interface {
	Interface
	// AddAfter adds an item to the workqueue after the indicated duration has passed
	AddAfter(item interface{}, duration time.Duration)
}
type delayingType struct {
	Interface

	// clock tracks time for delayed firing
	clock clock.Clock

	// stopCh lets us signal a shutdown to the waiting loop
	stopCh chan struct{}
	// stopOnce guarantees we only signal shutdown a single time
	stopOnce sync.Once

	// heartbeat ensures we wait no more than maxWait before firing
	heartbeat clock.Ticker

	// waitingForAddCh is a buffered channel that feeds waitingForAdd
	waitingForAddCh chan *waitFor

	// metrics counts the number of retries
	metrics retryMetrics
}

func NewDelayingQueue() DelayingInterface {
	return NewDelayingQueueWithConfig(DelayingQueueConfig{})
}

具体实现:可以看到NewDelayingQueue()->NewDelayingQueueWithConfig{return newDelayingQueue(config.Clock, config.Queue, config.Name, config.MetricsProvider)},然后有一个newDelayingQueue但是带有参数的方法,这里的new的n是小写的,代表这是一个私有的方法,可以看到最后返回的是一个delayingType。而NewDelayingQueue()返回的是一个interface。

func NewDelayingQueue() DelayingInterface {
	return NewDelayingQueueWithConfig(DelayingQueueConfig{})
}

func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface {
	if config.Clock == nil {
		config.Clock = clock.RealClock{}
	}

	if config.Queue == nil {
		config.Queue = NewWithConfig(QueueConfig{
			Name:            config.Name,
			MetricsProvider: config.MetricsProvider,
			Clock:           config.Clock,
		})
	}

	return newDelayingQueue(config.Clock, config.Queue, config.Name, config.MetricsProvider)
}

func newDelayingQueue(clock clock.WithTicker, q Interface, name string, provider MetricsProvider) *delayingType {
	ret := &delayingType{
		Interface:       q,
		clock:           clock,
		heartbeat:       clock.NewTicker(maxWait),
		stopCh:          make(chan struct{}),
		waitingForAddCh: make(chan *waitFor, 1000),
		metrics:         newRetryMetrics(name, provider),
	}

	go ret.waitingLoop()
	return ret
}
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
	// don't add if we're already shutting down
	if q.ShuttingDown() {
		return
	}
	q.metrics.retry()
	// immediately add things with no delay
	if duration <= 0 {
		q.Add(item)
		return
	}
	select {
	case <-q.stopCh:
		// unblock if ShutDown() is called
	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
	}
}

可以看到还有很多变种,但是最后都会调用 NewDelayingQueue但是带有参数的方法。


// NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to
// inject custom queue Interface instead of the default one
// Deprecated: Use NewDelayingQueueWithConfig instead.
func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface {
	return NewDelayingQueueWithConfig(DelayingQueueConfig{
		Name:  name,
		Queue: q,
	})
}

// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability.
// Deprecated: Use NewDelayingQueueWithConfig instead.
func NewNamedDelayingQueue(name string) DelayingInterface {
	return NewDelayingQueueWithConfig(DelayingQueueConfig{Name: name})
}

// NewDelayingQueueWithCustomClock constructs a new named workqueue
// with ability to inject real or fake clock for testing purposes.
// Deprecated: Use NewDelayingQueueWithConfig instead.
func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface {
	return NewDelayingQueueWithConfig(DelayingQueueConfig{
		Name:  name,
		Clock: clock,
	})
}

为什么newDelayingQueue返回的是type类型,而他的上级返回的是interface类型呢?以下面的代码为例:advancedAnimal实现的结构体包含一个interface:animal,和一个trick。这个trick字段是为了实现PerformTrick方法。

func (a *advancedAnimalType) PerformTrick() string {
    return a.trick
}

这个接收advancedAnimal的函数实现了PerformTrick(),所以可以看作是advancedAnimal实现了AdvancedAnimal的interface。 所以在下面的New函数中,虽然返回的是advancedAnimalType,但是最后NewAdvancedAnimal返回的是interface类型。

func NewAdvancedAnimal(config AdvancedAnimalConfig) AdvancedAnimal {
    if config.Animal == nil {
        config.Animal = NewAnimal(config.Species, config.Sound, config.Movement)
    }

    return &advancedAnimalType{
        Animal: config.Animal,
        trick:  config.Trick,
    }
}

type Animal interface {
    Speak() string
    Move() string
}
// 扩展的 AdvancedAnimal 接口
type AdvancedAnimal interface {
    Animal
    PerformTrick() string
}
// 基本的动物实现
type basicAnimal struct {
    species string
    sound   string
    movement string
}
func (a *basicAnimal) Speak() string {
    return a.sound
}
func (a *basicAnimal) Move() string {
    return a.movement
}
// 高级动物实现
type advancedAnimalType struct {
    Animal
    trick string
}

func (a *advancedAnimalType) PerformTrick() string {
    return a.trick
}
// 创建基本动物的函数
func NewAnimal(species, sound, movement string) Animal {
    return &basicAnimal{
        species: species,
        sound:   sound,
        movement: movement,
    }
}
// 创建高级动物的函数
func NewAdvancedAnimal(config AdvancedAnimalConfig) AdvancedAnimal {
    if config.Animal == nil {
        config.Animal = NewAnimal(config.Species, config.Sound, config.Movement)
    }

    return &advancedAnimalType{
        Animal: config.Animal,
        trick:  config.Trick,
    }
}
// 配置结构体
type AdvancedAnimalConfig struct {
    Animal   Animal
    Species  string
    Sound    string
    Movement string
    Trick    string
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1946590.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

jmeter实战(2)- 入门使用教程

一、运行Jmeter 参考上一篇博客&#xff1a;jmeter实战&#xff08;1&#xff09;- Mac环境安装 二、创建线程组 JMeter的线程组是进行负载测试的基本构建单元&#xff0c;它用于模拟多个用户对目标系统进行并发访问。线程组中的属性允许你控制测试的并发级别和执行模式。 1.…

聚观早报 | Meta将推出新款AR眼镜;iPhone SE 4将升级显示屏

聚观早报每日整理最值得关注的行业重点事件&#xff0c;帮助大家及时了解最新行业动态&#xff0c;每日读报&#xff0c;就读聚观365资讯简报。 整理丨Cutie 7月24日消息 Meta将推出新款AR眼镜 iPhone SE 4将升级显示屏 华硕天选Air 2024开启预约 巴菲特再次减持比亚迪股…

DT浏览器首页征集收录海内外网址

DT浏览器首页征集收录海内外网址&#xff0c;要求页面整洁&#xff0c;内容丰富&#xff0c;知识性和可读性强&#xff0c;符合大众价值观&#xff0c;不含恶意代码

linux添加普通用户后无法使用K8S的kubectl命令怎么办/Linux普通用户管理K8S/Linux下普通用户无法使用K8S命令

1.给Linux添加普通用户 sudo useradd mqq #添加mqq账号 sudo passwd mqq #给mqq账号设置密码&#xff0c;需要输入2次&#xff0c;我输入密码是Admin1232.利用mqq用户输入K8S命令报错 3.给mqq用户提权 suduers文件位于路径/etc/sudoers #编辑文件/etc/sudoers vim /etc/su…

第十四章 数据库

第十四章 数据库 14.1 引言 数据存储在传统上是使用单独的没有关联的文件&#xff0c;称为平面文件 14.1.1 定义 定义&#xff1a;数据库是一个组织内被应用程序使用的逻辑相一致的相关数据的集合 14.1.2 数据库的优点 数据库的优点&#xff1a; 冗余少避免数据的不一致…

ZLMRTCClient配置说明与用法(含示例)

webRTC播放视频 后面在项目中会用到通过推拉播放视频流的技术&#xff0c;所以最近预研了一下webRTC 首先需要引入封装好的webRTC客户端的js文件ZLMRTCClient.js 下面是地址需要的自行下载 http://my.zsyou.top/2024/ZLMRTCClient.js 配置说明 new ZLMRTCClient.Endpoint…

小猪佩奇.js

闲着没事 使用js 画一个小猪佩奇把 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</tit…

BUUCTF [MRCTF2020]Ezpop

这道题对于刚接触到pop链的我直接把我整懵了&#xff0c;一边看着魔术方法一边分析 魔术方法可以看这里PHP 魔术方法 - 简介 - PHP 魔术方法 - 简单教程&#xff0c;简单编程 (twle.cn) 代码解析 经过以上的分析我们可以理一下解题思路&#xff1a;接收参数反序列化之前先触发…

基于 HTML+ECharts 实现智慧交通数据可视化大屏(含源码)

构建智慧交通数据可视化大屏&#xff1a;基于 HTML 和 ECharts 的实现 随着城市化进程的加快&#xff0c;智慧交通系统已成为提升城市管理效率和居民生活质量的关键。通过数据可视化&#xff0c;交通管理部门可以实时监控交通流量、事故发生率、道路状况等关键指标&#xff0c;…

C#使用csvhelper实现csv的操作

新建控制台项目 安装csvhelper 33.0.1 写入csv 新建Foo.cs namespace CsvSut02;public class Foo {public int Id { get; set; }public string Name { get; set; } }批量写入 using System.Globalization; using CsvHelper; using CsvHelper.Configuration;namespace Csv…

Python3网络爬虫开发实战(2)爬虫基础库

文章目录 一、urllib1. urlparse 实现 URL 的识别和分段2. urlunparse 用于构造 URL3. urljoin 用于两个链接的拼接4. urlencode 将 params 字典序列化为 params 字符串5. parse_qs 和 parse_qsl 用于将 params 字符串反序列化为 params 字典或列表6. quote 和 unquote 对 URL的…

通信原理思科实验五:家庭终端以太网接入Internet实验

实验五 家庭终端以太网接入Internet实验 一实验内容 二实验目的 三实验原理 四实验步骤 1.按照上图选择对应的设备&#xff0c;并连接起来 为路由器R0两个端口配置IP 为路由器R1端口配置IP 为路由器设备增加RIP&#xff0c;配置接入互联网的IP的动态路由项 5.为路由器R1配置静…

Mysql-索引视图

目录 1.视图 1.1什么是视图 1.2为什么需要视图 1.3视图的作用和优点 1.4创建视图 1.5更新视图 1.6视图使用规则 1.7修改视图 1.8删除视图 2.索引 2.1什么是索引 2.2索引特点 2.3索引分类 2.4索引优缺点 2.5创建索引 2.6查看索引 2.7删除索引 1.视图 1.1什么是…

[Javascript】前端面试基础3【每日学习并更新10】

Web开发中会话跟踪的方法有那些 cookiesessionurl重写隐藏inputip地址 JS基本数据类型 String&#xff1a;用于表示文本数据。Number&#xff1a;用于表示数值&#xff0c;包括整数和浮点数。BigInt&#xff1a;用于表示任意精度的整数。Boolean&#xff1a;用于表示逻辑值…

流量录制与回放:jvm-sandbox-repeater工具详解

在软件开发和测试过程中&#xff0c;流量录制与回放是一个非常重要的环节&#xff0c;它可以帮助开发者验证系统在特定条件下的行为是否符合预期。本文将详细介绍一款强大的流量录制回放工具——jvm-sandbox-repeater&#xff0c;以及如何利用它来提高软件测试的效率和质量。 …

linux进程——解析命令行参数——环境变量详解

前言&#xff1a;本节内容还是linux进程&#xff0c; 主要讲解里面的环境变量——我们首先要知道的就是环境变量其实就是操作系统维护的一组kv值&#xff0c; 环境变量是系统提供的一组 变量名变量值 形式的变量。不同的环境变量之间具有不同的用途&#xff0c; 并且具有全局属…

刷机维修进阶教程-----何谓“tee损坏” 指纹丢失 掉帧 传感器失效?详细修复步骤教程

TEE损坏指的是安卓机型中Key Attestation密钥认证所依赖的可信应用中的证书库被破坏了。然后拒绝为指纹密匙认证提供服务。加密的密匙由TEE负责管理。tee损坏只影响当前机型的密匙认证。不影响加密。通俗的理解。如果你机型维修或者刷机或者解锁或者格机 全檫除分区等等后有异常…

Python自然语言处理库之NLTK与spaCy使用详解

概要 自然语言处理(NLP)是人工智能和数据科学领域的重要分支,致力于让计算机理解、解释和生成人类语言。在Python中,NLTK(Natural Language Toolkit)和spaCy是两个广泛使用的NLP库。本文将详细介绍NLTK和spaCy的特点、功能及其使用方法,并通过具体示例展示如何使用这两…

押金原路退回,手机版一键操作秒到账,无需设备收银设备

传统收银 1.操作不熟练&#xff1a;对收银系统的功能和操作不熟悉&#xff0c;可能导致收银过程较长&#xff0c;顾客等待时间增加。 2无法适应新技术&#xff1a;对于一些新的支付方式或技术&#xff0c;不能自主学 3操作不熟练&#xff1a;鼠标使用不熟练&#xff0c;对收…

element DatePicker 日期选择器,只选择月,月份数据从0开始(0月31日),而不是从1开始,JavaScript的原因,如下为解决办法

<el-date-pickerv-model"queryParam.yuedTime"type"month"placeholder"选择月"change"handleMonthChange"></el-date-picker> 方式一&#xff1a;在前端处理 change"handleMonthChange" &#xff0c;增加此方…