kubernetes中的ParallelizeUntil()框架源码解读与使用

news2024/9/24 11:30:03

概述

摘要:本文从源码层面解读了kubernetes源码中常用的workqueue.ParallelizeParallelizeUntil()框架的源码实现,并且本文也将举例说明了workqueue.ParallelizeUntil()方法的典型使用场景。

正文

说明:基于 kubernetes v1.18.0 源码分析

在Kubernetes源码中, 我们经常会读到workqueue.ParallelizeUntil()函数,它的作用是在并发多个worker来处理任务,直到接收到context发出的停止信号或任务完成。workqueue.ParallelizeUntil源码位于k8s.io/client-go/util/workqueue/parallelizer.go,该workqueue是client-go中的一个工作队列,队列包括三种:FIFO、延迟队列和限速队列。关于workqueue我已经在之前的文章中有详细的介绍,如需了解请阅览informer中的WorkQueue机制的实现分析与源码解读(1)

ParallelizeUntil()的源码解读

ParallelizeUntil()方法作用是并发多个worker来处理任务,直到接收到context发出的停止信号或任务完成

workers 表示启动多少个worker并发处理任务

pieces 表示要处理任务对应的index的数量

DoWorkPieceFunc 表示用于处理任务的工作函数

ctx 使用context控制并发任务的停止

// 定义worker函数
type DoWorkPieceFunc func(piece int)

// ParallelizeUntil is a framework that allows for parallelizing N 
// independent pieces of work until done or the context is canceled.
// parallelelizeuntil是一个框架,它允许并行处理任务,直到完成或上下文被取消。
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
  // 定义一个stop信号,当手动ctx.Done()信号后,就让整个 ParallelizeUntil 任务停止
	var stop <-chan struct{}
	if ctx != nil {
		stop = ctx.Done()
	}
	// 将把需要多的工作对应的索引放到chan中,放入完成后就关闭chan,这里借助chan,是因为chan是线程安全的。
	toProcess := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		toProcess <- i
	}
	close(toProcess)

  // 对于要处理的对象数量少于worker数量时
	if pieces < workers {
		workers = pieces
	}

	wg := sync.WaitGroup{}
	wg.Add(workers)
  // 启动多个worker,同时从chan中取对象进行处理,知道收到停止信号或完成任务
	for i := 0; i < workers; i++ {
		go func(x int) {
			defer utilruntime.HandleCrash()
			defer wg.Done()
			for piece := range toProcess {
				fmt.Printf("work %d  ",x)
				select {
				case <-stop:
					return
				default:
					doWorkPiece(piece)
				}
			}
		}(i)
	}
	wg.Wait()
}

从源码分析知道,ParallelizeUntil()的逻辑比较简单,先将要处理的任务,放到chan中,放入完成后就关闭chan,这里借助chan,是因为chan是线程安全的。之后再启动多个worker协程,这些协程不断的从chan去job进行处理。逻辑示意图如下。

在这里插入图片描述

如果在多想象一下,ParallelizeUntil()的逻辑,是不是与工厂中多个机器人不断的从传输带上去东西来进行加工的场景啊。

在这里插入图片描述

接下来我们再看下,worker协程中捕获panic的HandleCrash的源码。源码逻辑比较简单,就是worker协程中如果出现panic将被recover捕获,捕获之后如果定义了额外的handler函数,会遍历执行。

var PanicHandlers = []func(interface{}){logPanic}

// PanicHandlers is a list of functions which will be invoked when a panic happens.
var PanicHandlers = []func(interface{}){logPanic}

// HandleCrash simply catches a crash and logs an error. Meant to be called via
// defer.  Additional context-specific handlers can be provided, and will be
// called in case of panic.  HandleCrash actually crashes, after calling the
// handlers and logging the panic message.
//
// E.g., you can provide one or more additional handlers for something like shutting down go routines gracefully.
func HandleCrash(additionalHandlers ...func(interface{})) {
  // 捕获panic
	if r := recover(); r != nil {
		for _, fn := range PanicHandlers {
			fn(r)
		}
		for _, fn := range additionalHandlers {
			fn(r)
		}
		if ReallyCrash {
			// Actually proceed to panic.
			panic(r)
		}
	}
}

代码测试

编写一个简单程序,测试workqueue.ParallelizeUntil()方法的使用.

需求: 通过workqueue.ParallelizeUntil(),并发的找出100以内的素数

import (
	"context"
	"fmt"
	"k8s.io/client-go/util/workqueue"
	"testing"
	"time"
)

// 判断一个数是否是素数
func isPrieme(num int) bool {
	for i := 2; i < num; i++ {
		if num%i == 0 {
			return false
		}
	}
	return true
}

// 定义用于保存结果的chan
var ResultChan = make(chan int, 100)

func WorkFunc(num int) {
	fmt.Println(" check num: ", num)
	if isPrieme(num) {
		ResultChan <- num
	}
}

// go test -mod=vendor -run="^TestParallelizeUntil" -v
func TestParallelizeUntil(t *testing.T) {

	// 定义超时信号对应的ctx
	ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))

	// 判断一个数是否是素数
	myFunc := WorkFunc

	// 执行查找
	workqueue.ParallelizeUntil(ctx, 3, 100, myFunc)

	// 将结果放入结果chan
	fmt.Println("result:")
	close(ResultChan)
	for i := range ResultChan {
		fmt.Printf("%d ", i)
	}
	fmt.Println()

}




---------------执行与输出如下-----------------
(base) @PM80280051 ➜ utils git:(dev-xw)go test  -run="^TestParallelizeUntil" -v
=== RUN   TestParallelizeUntil
 check num:  0
 check num:  3
 check num:  4
 check num:  5
 check num:  6
 check num:  7
 check num:  8
 check num:  9
 check num:  10
 check num:  11
 check num:  12
 check num:  13
 check num:  14
 check num:  15
 check num:  16
 check num:  17
 check num:  1
 check num:  19
 check num:  20
 check num:  21
 check num:  22
 check num:  23
 check num:  24
 check num:  25
 check num:  26
 check num:  27
 check num:  28
 check num:  29
 check num:  30
 check num:  31
 check num:  32
 check num:  2
 check num:  34
 check num:  33
 check num:  36
 check num:  37
 check num:  38
 check num:  39
 check num:  40
 check num:  41
 check num:  42
 check num:  43
 check num:  44
 check num:  45
 check num:  46
 check num:  47
 check num:  48
 check num:  49
 check num:  35
 check num:  51
 check num:  52
 check num:  53
 check num:  54
 check num:  55
 check num:  56
 check num:  57
 check num:  58
 check num:  59
 check num:  60
 check num:  61
 check num:  62
 check num:  63
 check num:  64
 check num:  65
 check num:  66
 check num:  67
 check num:  68
 check num:  69
 check num:  70
 check num:  18
 check num:  72
 check num:  71
 check num:  74
 check num:  75
 check num:  76
 check num:  77
 check num:  78
 check num:  79
 check num:  80
 check num:  81
 check num:  82
 check num:  50
 check num:  84
 check num:  85
 check num:  86
 check num:  87
 check num:  88
 check num:  89
 check num:  90
 check num:  91
 check num:  92
 check num:  93
 check num:  94
 check num:  95
 check num:  96
 check num:  97
 check num:  83
 check num:  99
 check num:  98
 check num:  73

result: 0 3 5 7 11 13 17 1 19 23 29 31 2 37 41 43 47 53 59 61 67 71 79 89 97 83 73 
--- PASS: TestParallelizeUntil (0.00s)
PASS
ok      kubecmdb/utils  0.552s



在这个例子中,ParallelizeUntil 函数,启动了3个worker,并发的处理10个任务,直到任务处理完,或者接收到ctx定义的超时信号。 注意,ParallelizeUntil 函数不会返回任何值,如果需要记录worker协程中的结果,可以自己定义。`

使用场景

在kubernetes源码中,Kube-scheduler在进行预选算法时,使用了workqueue.ParallelizeUntil(),并发16个worker同时执行预选算法。在集群规模较大时,并发处理能提升kube-scheduler调度任务的整体效能。

预选算法执行预选的源码findNodesThatPassFilters

// k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go

// findNodesThatPassFilters finds the nodes that fit the filter plugins.
// findNodesThatPassFilters使用过滤插件来查找适合的节点。
func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
	allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
	if err != nil {
		return nil, err
	}

	numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes)))

	// Create filtered list with enough space to avoid growing it
	// and allow assigning.
	filtered := make([]*v1.Node, numNodesToFind)
	// 代码略

	// Stops searching for more nodes once the configured number of feasible nodes
	// are found.
  // !!!重点,启动了16个work,并发的执行 checkNode 函数,对nodes节点进行预选操作
	workqueue.ParallelizeUntil(ctx, 16, len(allNodes), checkNode)
	processedNodes := int(filteredLen) + len(statuses)
	g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes)
  
	// filter 列表预选算法,过滤出满足预选算法的node节点
	filtered = filtered[:filteredLen]
	if err := errCh.ReceiveError(); err != nil {
		statusCode = framework.Error
		return nil, err
	}
	return filtered, nil
}

结论

workqueue.ParallelizeUntil框架广泛的适用于kubernetes源码,通过对其源码的解读,我们了解到了其如何实现与使用场景。我们可以在平时日常开发中,也可以多多尝试使用这个成熟的并发任务框架。

我们可以通过阅读kubernetes源码,学习Kubernetes内部机制,同时Kubernets项目中有一些好的成熟框架,我们可以学以致用,多在用在日常开发中才能真正掌握。

参考资料

Kubernete-v1.18源码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2105233.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Scratch教师节:给老师的一封信

小虎鲸Scratch资源站-免费Scratch作品源码,素材,教程分享平台! 【Scratch教师节特别献礼】—— 给老师的一封信&#xff1a;编程之光&#xff0c;照亮梦想之路 在这个金秋送爽、硕果累累的季节里&#xff0c;我们迎来了一个特别而温馨的日子——教师节。在这个充满感激与敬意的…

交叉编译概念

交叉编译概念 目录 交叉编译概念1. 什么是交叉编译2. 交叉编译的作用3. 交叉编译器4. 交叉编译工具链5. 交叉编译的一般步骤6. 交叉编译实例 1. 什么是交叉编译 交叉编译是指在一个平台上编译代码&#xff0c;使其能够在另一个不同的平台上运行的过程。这种编译方式通常用于开…

深入探索JDBC:Java数据库连接技术详解与实战应用

Java Database Connectivity&#xff08;JDBC&#xff09;是Java语言中用于访问关系型数据库的标准接口。它定义了一组API&#xff0c;使得Java程序能够以统一的方式连接、访问和操作不同的关系型数据库。JDBC不仅简化了数据库操作&#xff0c;还提高了Java应用程序的可移植性和…

抢先看:2024云栖大会体验攻略

这是一场「AI」奇妙之旅。 2024云栖大会定档 9月19日&#xff01; 期待与你在 杭州云栖小镇 共度一场为期3天的科技盛会 三日主论坛 400分论坛 与并行话题 4万平米 智能科技展区 免费领取云栖大会门票 怎么看、怎么玩、怎么逛 超长干货攻略奉上&#xff0c;请查收 ⬇️…

将OpenHarmony RK设备散包镜像打包为一个整包

本篇文章教大家使用瑞芯微的Linux_Pack_Firmware工具将rk设备的多个镜像文件打包为一个固件。首先感谢大佬AlgoIdeas开源的打包工具&#xff0c;开源地址&#xff1a;https://gitee.com/openharmony-driver/ril_adapter 接下来进行演示&#xff0c;下面我们使用OpenHarmony 4.…

js运算符----三元运算符

&#xff1f;前为真就执行&#xff1f;号后面的&#xff0c;为假就执行&#xff1a;号后边的 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge…

828华为云征文|Flexus云服务器X实例赋能,用Python将微信公众号秒变智能聊天机器人

&#x1f3c6;作者简介&#xff0c;黑夜开发者&#xff0c;CSDN领军人物&#xff0c;全栈领域优质创作者✌&#xff0c;CSDN博客专家&#xff0c;阿里云社区专家博主&#xff0c;2023年6月CSDN上海赛道top4。 &#x1f3c6;数年电商行业从业经验&#xff0c;AWS/阿里云资深使用…

微服务——服务注册和发现(一)

服务注册和发现 1.1.服务注册背景 假如某项微服务被调用较多&#xff0c;为了应对更高的并发&#xff0c;我们进行了多实例部署&#xff0c;如图&#xff1a; 此时&#xff0c;每个item-service的实例其IP或端口不同&#xff0c;问题来了&#xff1a; item-service这么多实例…

【JUC】12-CAS

1. CAS compare and swap&#xff0c;比较并交换。 包含三个操作数&#xff1a;内存位置、预期原值及更新值。 执行CAS操作的时候&#xff0c;将内存位置的值与预期原值比较&#xff1a; 匹配&#xff0c;更新为新值。不匹配&#xff0c;不进行操作&#xff0c;多个线程同时…

SpringCloud开发实战(五):Feign的一些优化建议

目录 SpringCloud开发实战&#xff08;一&#xff09;&#xff1a;搭建SpringCloud框架 SpringCloud开发实战&#xff08;二&#xff09;&#xff1a;通过RestTemplate实现远程调用 SpringCloud开发实战&#xff08;三&#xff09;&#xff1a;集成Eureka注册中心 SpringCloud开…

nefu暑假集训5 KMP 个人模板+例题汇总

前言&#xff1a; KMP算法用于匹配字符串&#xff0c;假设长字符串为s&#xff0c;需要匹配的字符串是p。KMP算法的基础思想是利用一个next[n]数组&#xff1a;next[i]对应的是&#xff1a;以下标i为结尾的连续子串&#xff0c;与以第一个字符开始的子串&#xff0c;相等的最大…

UE 【材质编辑】自定义ShadingMode

【UE 4.27.2】 在UE中提供了多种多样的ShadingMode&#xff0c;相当于一种风格化的处理方案(整体全面的流程调整)&#xff0c;切换ShadingMode可以看到不同的显示效果&#xff1a; 通过简单的拓展&#xff0c;我们可以实现自定义的ShadingMode&#xff0c;使得我们切换到自己的…

JMeter:如何定制Http请求取样器

一般使用JMeter发送HTTP请求时都会用到HTTP Request取样器&#xff0c;这种取样器大多数时候能够满足压力测试的需要。 图1 但也有一些场景&#xff0c;可能需要更加强大的取样器&#xff0c;或者需要定制一些功能&#xff0c;这时就需要自己手动编写取样器。幸好JMeter为我们提…

leveldb源码剖析(二)——LSM Tree

LSM Tree LSM Tree&#xff1a;Log-Structured Merge Tree&#xff0c;日志结构合并树。是一种频繁写性能很高的数据结构。 LSM Tree将写入操作与合并操作分离&#xff0c;数据首先写入磁盘中的日志文件&#xff08;WAL&#xff09;&#xff0c;随后写入内存缓存&#xff0c;…

Android经典实战之Textview文字设置不同颜色、下划线、加粗、超链接等效果

本文首发于公众号“AntDream”&#xff0c;欢迎微信搜索“AntDream”或扫描文章底部二维码关注&#xff0c;和我一起每天进步一点点 SpannableString 在 Android 开发中是一个非常强大的工具&#xff0c;它允许你在单个字符串范围内应用多种样式。使用 SpannableString&#xf…

【 C++ 】 类和对象的学习 (二)

&#x1f618;我的主页&#xff1a;OMGmyhair-CSDN博客 目录 I、类的默认成员函数 一、构造函数 二、析构函数 三、拷贝构造函数 四、 运算符重载 赋值运算符重载 五、取地址重载_普通对象 六、取地址重载_const对象 I、类的默认成员函数 用户没有显示实现&#xff0…

Linux学习笔记5 值得一读,Linux(ubuntu)软件管理,搜索下载安装卸载全部搞定!(上)

本文记录Ubuntu操作系统的软件包管理。 一、背景 整个Linux系统就是大大小小的软件包构成的&#xff0c;在linux系统中&#xff0c;软件的管理非常重要&#xff0c;与其他操作系统不同&#xff0c;linux的软件包管理比较复杂&#xff0c;有时还需要处理软件包之间的冲突。本文…

Python | 泰勒图

写在前面 最近&#xff0c;开始对于CMIP6的一些数据进行评估。Talor图是一个很好展现模式间误差的方式&#xff0c;这里简单记录一下在python中的实现方式。 主要为半图的画法 参考的代码为&#xff1a; https://zenodo.org/records/5548061 效果大致下面这个样子 这边在原…

maven中如何配置多个仓库使其同时生效

场景 有一个项目&#xff0c;我把代码跟本地maven依赖包从同事那里拷贝过来&#xff0c;然后打包却一直打不了&#xff0c;一直报aliyun仓库找不到这个依赖的错误&#xff0c;无论我改成引用本地仓库还是线上aliyun仓库都不行。 依赖 <dependency><groupId>org.spr…

有三层交换机就不用路由器了?真的假的

号主&#xff1a;老杨丨11年资深网络工程师&#xff0c;更多网工提升干货&#xff0c;请关注公众号&#xff1a;网络工程师俱乐部 晚上好&#xff0c;我的网工朋友。 在现代企业网络环境中&#xff0c;三层交换机因其高效的数据包处理能力和较低的延迟而受到广泛欢迎。 然而&…