12. WorkQueue(工作队列)

news2025/1/18 20:32:39

WorkQueue

WorkQueue 称为工作队列,Kubernetes 的 WorkQueue 队列与普通 FIFO(先进先出,First-In,First-Out)队列相比,实现略显复杂,它的主要功能在于标记去重,并支持如下特性。

● 有序 :按照添加顺序处理元素(item)。
● 去重 :相同元素在同一时间不会被重复处理,例如一个元素在处理之前被添加了多次,它只会被处理一次。
● 并发性 :多生产者和多消费者。
● 标记机制 :支持标记功能,标记一个元素是否被处理,也允许元素在处理时重新排队。
● 通知机制 :ShutDown 方法通过信号量通知队列不再接收新的元素,并通知 metric goroutine 退出。
● 延迟 :支持延迟队列,延迟一段时间后再将元素存入队列。
● 限速 :支持限速队列,元素存入队列时进行速率限制。限制一个元素被重新排队(Reenqueued)的次数。
● Metric :支持 metric 监控指标,可用于 Prometheus 监控。

WorkQueue 支持 3 种队列,并提供了 3 种接口,不同队列实现可应对不同的使用场景,分别介绍如下。

● Interface :FIFO 队列接口,先进先出队列,并支持去重机制。
● DelayingInterface :延迟队列接口,基于 Interface 接口封装,延迟一段时间后再将元素存入队列。
● RateLimitingInterface :限速队列接口,基于 DelayingInterface 接口封装,支持元素存入队列时进行速率限制。

FIFO 队列

FIFO 队列支持最基本的队列方法,例如插入元素、获取元素、获取队列长度等。另外,WorkQueue 中的限速及延迟队列都基于 Interface 接口实现,其提供如下方法:

代码路径:vendor/k8s.io/client-go/util/workqueue/queue.go

type Interface interface {
  Add(item interface{})
  Len() int
  Get() (item interface{}, shutdown bool)
  Done(item interface{})
  ShutDown()
  ShutDownWithDrain()
  ShuttingDown() bool
}

FIFO 队列 Interface 方法说明如下。

● Add :给队列添加元素(item),可以是任意类型元素。
● Len :返回当前队列的长度。
● Get :获取队列头部的一个元素。
● Done :标记队列中该元素已被处理。
● ShutDown :关闭队列。
● ShuttingDown :查询队列是否正在关闭。

FIFO队列数据结构如下:

// Type is a work queue (see the package comment).
type Type struct {
  // queue defines the order in which we will work on items. Every
  // element of queue should be in the dirty set and not in the
  // processing set.
  queue []t

  // dirty defines all of the items that need to be processed.
  dirty set

  // Things that are currently being processed are in the processing set.
  // These things may be simultaneously in the dirty set. When we finish
  // processing something and remove it from this set, we'll check if
  // it's in the dirty set, and if so, add it to the queue.
  processing set

  cond *sync.Cond

  shuttingDown bool
  drain        bool

  metrics queueMetrics

  unfinishedWorkUpdatePeriod time.Duration
  clock                      clock.WithTicker
}

FIFO 队列数据结构中最主要的字段有 queue、dirty 和 processing。其中 queue 字段是实际存储元素的地方,它是 slice 结构的,用于保证元素有序;dirty 字段非常关键,除了能保证去重,还能保证在处理一个元素之前哪怕其被添加了多次(并发情况下),但也只会被处理一次;processing 字段用于标记机制,标记一个元素是否正在被处理。应根据 WorkQueue 的特性理解源码的实现,FIFO 存储过程如图

在这里插入图片描述

通过 Add 方法往 FIFO 队列中分别插入 1、2、3 这 3 个元素,此时队列中的 queue 和 dirty 字段分别存有 1、2、3 元素,processing 字段为空。然后通过 Get 方法获取最先进入的元素(也就是 1 元素),此时队列中的 queue 和 dirty 字段分别存有 2、3 元素,而 1 元素会被放入 processing 字段中,表示该元素正在被处理。最后,当我们处理完 1 元素时,通过 Done 方法标记该元素已经被处理完成,此时队列中的 processing 字段中的 1 元素会被删除。

如图所示,这是 FIFO 队列的存储流程,在正常的情况下,FIFO 队列运行在并发场景下。高并发下如何保证在处理一个元素之前哪怕其被添加了多次,但也只会被处理一次?下面进行讲解,FIFO 并发存储过程如图

在这里插入图片描述

如图所示,在并发场景下,假设 goroutine A 通过 Get 方法获取 1 元素,1 元素被添加到 processing 字段中,同一时间,goroutine B 通过 Add 方法插入另一个 1 元素,此时在 processing 字段中已经存在相同的元素,所以后面的 1 元素并不会被直接添加到 queue 字段中,当前 FIFO 队列中的 dirty 字段中存有 1、2、3 元素,processing 字段存有 1 元素。在 goroutine A 通过 Done 方法标记处理完成后,如果 dirty 字段中存有 1 元素,则将 1 元素追加到 queue 字段中的尾部。需要注意的是,dirty 和 processing 字段都是用 Hash Map 数据结构实现的,所以不需要考虑无序,只保证去重即可。

延迟队列

延迟队列,基于 FIFO 队列接口封装,在原有功能上增加了 AddAfter 方法,其原理是延迟一段时间后再将元素插入 FIFO 队列。延迟队列数据结构如下:

代码路径:vendor/k8s.io/client-go/util/workqueue/delaying_queue.go

type DelayingInterface interface {
  Interface
  // AddAfter adds an item to the workqueue after the indicated duration has passed
  AddAfter(item interface{}, duration time.Duration)
}

// delayingType wraps an Interface and provides delayed re-enquing
type delayingType struct {
  Interface

  // clock tracks time for delayed firing
  clock clock.Clock

  // stopCh lets us signal a shutdown to the waiting loop
  stopCh chan struct{}
  // stopOnce guarantees we only signal shutdown a single time
  stopOnce sync.Once

  // heartbeat ensures we wait no more than maxWait before firing
  heartbeat clock.Ticker

  // waitingForAddCh is a buffered channel that feeds waitingForAdd
  waitingForAddCh chan *waitFor

  // metrics counts the number of retries
  metrics retryMetrics
}

AddAfter 方法会插入一个 item(元素)参数,并附带一个 duration(延迟时间)参数,该 duration 参数用于指定元素延迟插入 FIFO 队列的时间。如果 duration 小于或等于 0,会直接将元素插入 FIFO 队列中

delayingType 结构中最主要的字段是 waitingForAddCh,其默认初始大小为 1000,通过 AddAfter 方法插入元素时,是非阻塞状态的,只有当插入的元素大于或等于 1000 时,延迟队列才会处于阻塞状态。waitingForAddCh 字段中的数据通过 goroutine 运行的 waitingLoop 函数持久运行。延迟队列运行原理如图所示:

在这里插入图片描述

如图所示,将元素 1 放入 waitingForAddCh 字段中,通过 waitingLoop 函数消费元素数据。当元素的延迟时间不大于当前时间时,说明还需要延迟将元素插入 FIFO 队列的时间,此时将该元素放入优先队列(waitForPriorityQueue)中。当元素的延迟时间大于当前时间时,则将该元素插入 FIFO 队列中。另外,还会遍历优先队列(waitForPriorityQueue)中的元素,按照上述逻辑验证时间。

限速队列

限速队列,基于延迟队列和 FIFO 队列接口封装,限速队列接口(RateLimitingInterface)在原有功能上增加了 AddRateLimited、Forget、NumRequeues 方法。限速队列的重点不在于 RateLimitingInterface 接口,而在于它提供的 4 种限速算法接口(RateLimiter)。其原理是,限速队列利用延迟队列的特性,延迟某个元素的插入时间,达到限速目的。RateLimiter 数据结构如下:

代码路径:vendor/k8s.io/client-go/util/workqueue/default_rate_limiters.go

type RateLimiter interface {
  // When gets an item and gets to decide how long that item should wait
  When(item interface{}) time.Duration
  // Forget indicates that an item is finished being retried.  Doesn't matter whether it's for failing
  // or for success, we'll stop tracking it
  Forget(item interface{})
  // NumRequeues returns back how many failures the item has had
  NumRequeues(item interface{}) int
}

限速队列接口方法说明:

● When :获取指定元素应该等待的时间。
● Forget :释放指定元素,清空该元素的排队数。
● NumRequeues :获取指定元素的排队数。

注意 :这里有一个非常重要的概念——限速周期,一个限速周期是指从执行 AddRateLimited 方法到执行完 Forget 方法之间的时间。如果该元素被 Forget 方法处理完,则清空排队数。

下面会分别详解 WorkQueue 提供的 4 种限速算法,应对不同的场景,这 4 种限速算法分别如下。

● 令牌桶算法(BucketRateLimiter)。
● 排队指数算法(ItemExponentialFailureRateLimiter)。
● 计数器算法(ItemFastSlowRateLimiter)。
● 混合模式(MaxOfRateLimiter),将多种限速算法混合使用。

1.令牌桶算法

令牌桶算法是通过 Go 语言的第三方库 golang.org/x/time/rate 实现的。令牌桶算法内部实现了一个存放 token(令牌)的“桶”,初始时“桶”是空的,token 会以固定速率往“桶”里填充,直到将其填满为止,多余的 token 会被丢弃。每个元素都会从令牌桶得到一个 token,只有得到 token 的元素才允许通过(accept),而没有得到 token 的元素处于等待状态。令牌桶算法通过控制发放 token 来达到限速目的。令牌桶算法原理如图:

在这里插入图片描述

WorkQueue 在默认的情况下会实例化令牌桶,代码示例如下:

rate.NewLimiter(rate.Limit(10), 100)},

在实例化 rate.NewLimiter 后,传入 r 和 b 两个参数,其中 r 参数表示每秒往“桶”里填充的 token 数量,b 参数表示令牌桶的大小(即令牌桶最多存放的 token 数量)。我们假定 r 为 10,b 为 100。假设在一个限速周期内插入了 1000 个元素,通过 r.Limiter.Reserve().Delay 函数返回指定元素应该等待的时间,那么前 b(即 100)个元素会被立刻处理,而后面元素的延迟时间分别为 item100/100ms、item101/200ms、item102/300ms、item103/400ms,以此类推。

2.排队指数算法

排队指数算法将相同元素的排队数作为指数,排队数增大,速率限制呈指数级增长,但其最大值不会超过 maxDelay。元素的排队数统计是有限速周期的,一个限速周期是指从执行 AddRateLimited 方法到执行完 Forget 方法之间的时间。如果该元素被 Forget 方法处理完,则清空排队数。

排队指数算法的核心实现

代码示例路径:vendor/k8s.io/client-go/util/workqueue/default_rate_limiters.go

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
   r.failuresLock.Lock()
   defer r.failuresLock.Unlock()

   exp := r.failures[item]
   r.failures[item] = r.failures[item] + 1

   // The backoff is capped such that 'calculated' value never overflows.
   backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
   if backoff > math.MaxInt64 {
      return r.maxDelay
   }

   calculated := time.Duration(backoff)
   if calculated > r.maxDelay {
      return r.maxDelay
   }

   return calculated
}

该算法提供了 3 个主要字段:failures、baseDelay、maxDelay。其中,failures 字段用于统计元素排队数,每当 AddRateLimited 方法插入新元素时,会为该字段加 1;另外,baseDelay 字段是最初的限速单位(默认为5ms),maxDelay 字段是最大限速单位(默认为1000s)。排队指数增长趋势如图

在这里插入图片描述

限速队列利用延迟队列的特性,延迟多个相同元素的插入时间,达到限速目的。注意 :在同一限速周期内,如果不存在相同元素,那么所有元素的延迟时间为 baseDelay;而在同一限速周期内,如果存在相同元素,那么相同元素的延迟时间呈指数级增长,最长延迟时间不超过 maxDelay。

我们假定 baseDelay 是 1*time.Millisecond,maxDelay 是 1000*time.Second。假设在一个限速周期内通过 AddRateLimited 方法插入 10 个相同元素,那么第 1 个元素会通过延迟队列的 AddAfter 方法插入并设置延迟时间为 1ms(即 baseDelay),第 2 个相同元素的延迟时间为 2ms,第 3 个相同元素的延迟时间为 4ms,第 4 个相同元素的延迟时间为 8ms,第 5 个相同元素的延迟时间为 16ms…… 第 10 个相同元素的延迟时间为 512ms,最长延迟时间不超过 1000s(即 maxDelay)。

3.计数器算法

计数器算法是限速算法中最简单的一种,其原理是:限制一段时间内允许通过的元素数量,例如在 1 分钟内只允许通过 100 个元素,每插入一个元素,计数器自增 1,当计数器数到 100 的阈值且还在限速周期内时,则不允许元素再通过。但 WorkQueue 在此基础上扩展了 fast 和 slow 速率。

计数器算法提供了 4 个主要字段:failures、fastDelay、slowDelay 及 maxFastAttempts。其中,failures 字段用于统计元素排队数,每当 AddRateLimited 方法插入新元素时,会为该字段加 1;而 fastDelay 和 slowDelay 字段是用于定义 fast、slow 速率的;另外, maxFastAttempts 字段用于控制从 fast 速率转换到 slow 速率。计数器算法核心实现的

代码示例如下:vendor/k8s.io/client-go/util/workqueue/default_rate_limiters.go

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
  r.failuresLock.Lock()
  defer r.failuresLock.Unlock()

  r.failures[item] = r.failures[item] + 1

  if r.failures[item] <= r.maxFastAttempts {
    return r.fastDelay
  }

  return r.slowDelay
}

假设 fastDelay 是 5*time.Millisecond,slowDelay 是 10*time.Second,maxFastAttempts 是 3。在一个限速周期内通过 AddRateLimited 方法插入 4 个相同的元素,那么前 3 个元素使用 fastDelay 定义的 fast 速率,当触发 maxFastAttempts 字段时,第 4 个元素使用 slowDelay 定义的 slow 速率。

4.混合模式

混合模式是将多种限速算法混合使用,即多种限速算法同时生效。例如,同时使用排队指数算法和令牌桶算法,代码示例如下:

// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue.  It has
// both overall and per-item rate limiting.  The overall is a token bucket and the per-item is exponential
func DefaultControllerRateLimiter() RateLimiter {
  return NewMaxOfRateLimiter(
    NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
    // 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
    &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
  )
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1475686.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

linux系统Jenkins的安装

Jenkins安装 安装上传安装包解压包首次登录要去服务器查看密码&#xff0c;更改密码选择需要安装的插件设置Admin用户和密码安装完成 安装 上传安装包 上传 jdk17 tomcat jenkins.war的安装包 . 上传 tomcat安装包解压包 解压jdk tar xf jdk-11.0.18_linux-x64_bin.tar.gz解…

雾锁王国Enshrouded服务器CPU内存配置怎么选择?

雾锁王国/Enshrouded服务器CPU内存配置如何选择&#xff1f;阿里云服务器网aliyunfuwuqi.com建议选择8核32G配置&#xff0c;支持4人玩家畅玩&#xff0c;自带10M公网带宽&#xff0c;1个月90元&#xff0c;3个月271元&#xff0c;幻兽帕鲁服务器申请页面 https://t.aliyun.com…

【HarmonyOS】鸿蒙开发之Video组件——第3.7章

Video组件内VideoOptions属性简介 src&#xff1a;设置视频地址。currentProgressRate&#xff1a;设置视频播放倍速&#xff0c;参数说明如下&#xff1a; number|string&#xff1a;只支持 0.75 &#xff0c; 1.0 &#xff0c; 1.25 &#xff0c; 1.75 &#xff0c; 2.0 。P…

首超星巴克,瑞幸咖啡开始“守擂”?

农历新年开年短短半个月&#xff0c;瑞幸咖啡凭一己之力&#xff0c;似乎拉开了国内现磨咖啡行业竞争的新序幕。 先是新年开工首日&#xff0c;瑞幸咖啡每周“9.9元喝一杯”的可选性品类减少&#xff0c;登上微博热搜&#xff0c;引发市场对于现磨咖啡行业生态的可持续性担忧。…

[计算机网络]--MAC/ARP/DNS协议

前言 作者&#xff1a;小蜗牛向前冲 名言&#xff1a;我可以接受失败&#xff0c;但我不能接受放弃 如果觉的博主的文章还不错的话&#xff0c;还请点赞&#xff0c;收藏&#xff0c;关注&#x1f440;支持博主。如果发现有问题的地方欢迎❀大家在评论区指正 目录 一、认识以…

ARM系列 -- 虚拟化(五)

在ARM体系结构中&#xff0c;处理器内部有通用计时器&#xff0c;通用计时器包含一组比较器&#xff0c;用来与系统计数器进行比较&#xff0c;一旦通用计时器的值小于等于系统计数器时便会产生时钟中断。 大家看到这里是不是想起了前面讲GIC时提到的PPI&#xff08;private p…

招聘系统架构的设计与实现

在当今竞争激烈的人才市场中&#xff0c;有效的招聘系统对企业吸引、筛选和管理人才至关重要。本文将探讨招聘系统的架构设计与实现&#xff0c;帮助企业构建一个高效、可靠的人才招聘平台。 ## 1. 系统架构设计 ### 1.1 微服务架构 招聘系统通常采用微服务架构&#xff0c;将…

移动Web系统中无监督KPI异常检测的监督式微调

简介 本文介绍由清华大学、南开大学、中国移动研究院与必示科技共同合作的论文&#xff1a;移动Web系统中无监督KPI异常检测的监督式微调。该论文已被The Web Conference 2024&#xff08;International World Wide Web Conference&#xff09;会议录用&#xff0c;论文标题为&…

【Hudi】核心概念

https://www.bilibili.com/video/BV1ue4y1i7na?p17&vd_sourcefa36a95b3c3fa4f32dd400f8cabddeaf 大数据新风口&#xff1a;Hudi数据湖&#xff08;尚硅谷&Apache Hudi联合出品&#xff09; 1 基础概念 1.1 时间轴(TimeLine) 1.2 文件布局(File Layout) 1.3 索引(In…

Leetcode—82. 删除排序链表中的重复元素 II【中等】

2024每日刷题&#xff08;117&#xff09; Leetcode—82. 删除排序链表中的重复元素 II 实现代码 /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* ListNode(int x) : val…

npm login报错 ‘proxy‘ config is set properly. See: ‘npm help config‘

报错提示 解决办法 按照以下的顺序执行命令行 检查自己的代理 npm config get proxy npm config get npm config get https-proxy npm config get registry代理和缓存置空并且设置新镜像 npm config set proxy null npm config set https-proxy null npm config set regist…

面试笔记系列七之多线程+分布式系统基础知识点整理及常见面试题

介绍一下线程的生命周期及状态&#xff1f; 1.创建 当程序使用new关键字创建了一个线程之后&#xff0c;该线程就处于一个新建状态&#xff08;初始状态&#xff09;&#xff0c;此时它和其他Java对象一样&#xff0c;仅仅由Java虚拟机为其分配了内存&#xff0c;并初始化了其成…

【Linux】jieba的安装和使用

目录 链接 jieba库简介 ​获得jieba库 怎么使用jieba库 链接 因为github下载东西的时间原因&#xff0c;下面这个网站会定期将jieba从github镜像回来 项目 探索 GitCodehttps://gitcode.net/explore输入cppjieba进行搜索&#xff0c;点击相应链接进入 jieba库简介 需要…

【前端知识点】

虚拟 dom&#xff1a; 虚拟 dom 就是 vue 通过 js 对象渲染虚拟 dom 的&#xff0c;虚拟 dom 的 js 对象包含节点的类型、属性、子节点等信息&#xff0c;这些虚拟 dom 节点会构成一棵树形结构&#xff0c;用来表示整个页面的结构。 当 vue 组件更新时&#xff0c;会通过 diff…

雾锁王国服务器怎么建?雾锁王国服务器搭建方法

雾锁王国Enshrouded服务器搭建怎么搭建&#xff1f;非常简单&#xff0c;阿里云计算巢雾锁王国程序&#xff0c;可以一键搭建雾锁王国多人联机服务器&#xff0c;腾讯云是基于雾锁王国镜像系统&#xff0c;阿里云服务网aliyunfuwuqi.com汇总雾锁王国服务器搭建&#xff0c;超简…

Python爬虫实战第二例【二】

零.前言&#xff1a; 本文章借鉴&#xff1a;Python爬虫实战&#xff08;五&#xff09;&#xff1a;根据关键字爬取某度图片批量下载到本地&#xff08;附上完整源码&#xff09;_python爬虫下载图片-CSDN博客 大佬的文章里面有API的获取&#xff0c;在这里我就不赘述了。 一…

早产儿视网膜病变分期,自动化+半监督(无需大量医生标注数据)

早产儿视网膜病变 ROP 分期 提出背景解法框架解法步骤一致性正则化算法构建思路 实验 提出背景 论文&#xff1a;https://www.cell.com/action/showPdf?piiS2589-0042%2823%2902593-2 早产儿视网膜病变&#xff08;ROP&#xff09;目前是全球婴儿失明的主要原因之一。 这是…

链表基础知识详解(非常详细简单易懂)

概述&#xff1a; 链表作为 C 语言中一种基础的数据结构&#xff0c;在平时写程序的时候用的并不多&#xff0c;但在操作系统里面使用的非常多。不管是RTOS还是Linux等使用非常广泛&#xff0c;所以必须要搞懂链表&#xff0c;链表分为单向链表和双向链表&#xff0c;单向链表很…

求两个向量之间的夹角

求两个向量之间的夹角 介绍Unity的API求向量夹角Vector3.AngleVector3.SignedAngle 自定义获取方法0-360度的夹角 总结 介绍 求两个向量之间的夹角方法有很多&#xff0c;比如说Unity中的Vector3.Angle&#xff0c;Vector3.SignedAngle等方法&#xff0c;具体在什么情况下使用…

Groovy(第九节) Groovy 之单元测试

JUnit 利用 Java 对 Song 类进行单元测试 默认情况下 Groovy 编译的类属性是私有的,所以不能直接在 Java 中访问它们,必须像下面这样使用 setter: 编写这个测试用例余下的代码就是小菜一碟了。测试用例很好地演示了这样一点:用 Groovy 所做的一切都可以轻易地在 Java 程序…