kubelet源码分析 syncLoopIteration(二) plegCh、syncCh

news2025/1/16 8:02:45

kubelet源码分析 syncLoopIteration(二) plegCh

在这里插入图片描述
上一篇:kubelet源码分析 syncLoopIteration(一) configCh
上一篇说了configCh管道的作用,这一篇说一下plegCh管道。这个管道主要是监听容器运行时状态的,当状态有变化后,会触发event然后到syncLoopIteration重新执行同步。
一、plegCh
1.初始化pleg运行

  • 在kubelet的Run函数中,会执行start
  • 在syncLoop执行watch获得pleg的chan管道,传到syncLoopIteration函数中
  • start函数会执行一个gorounite函数去每一秒执行一次relist
kl.pleg.Start()
plegCh := kl.pleg.Watch()
func (g *GenericPLEG) Start() {
	go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}

2.relist函数

  • 获得当前时间
  • 获得所有pod信息,包括沙箱容器
  • 更新relist时间
  • 将所有pod的数量做记录
  • setCurrent函数,podRecords结构体下有两个podlist,一个新的一个老的。setCurrent是将上面获得的容器运行时的podlist赋值给新的。
  • 所有的新老pod的容器组到一起进行遍历。进行新老的对比,如果有event变化,则记录。这里是根据容器id,去新、老pod中去查找这个容器,根据容器状态的变化做校验来确定是哪种event变化(2.1)
  • 是否需要再次检查(检查上一次未通过的)
  • 遍历有event变化的pod信息
  • 如果需要检查,则将pod信息遍历更新,更新失败的记录到需要再次检查的map中
  • g.podRecords.update(pid)是将老的pod变成新的,新的清空(为下次relist做准备)
  • 遍历这个pod的event变化,如果是不知道的类型,跳过这次。否则,把这次变更推送到eventChannel管道(这个管道上面的kubelet会监听触发plegCh的)
  • 同时如果这个event是删除容器,如果容器还没有退出码并且pod还存在切缓存也存在,获得容器运行时状态,并且记录他的所有退出码(这里的event对应的是容器,pod的status对应的是pod,pod可能有很多容器,所以根据pod下面的容器的id进行存储,一次性把pod的所有容器都存下来)
  • 如果这个容器是删除的,并且已经有了退出码了,则记录一下日志即可(上面一步可能把所有容器的退出码都记录了,这里直接读取)
  • 如果没有event,切需要再次检查一遍上次未通过的,则更新一下最新的信息。
  • 更新relist所耗时间
func (g *GenericPLEG) relist() {
	klog.V(5).InfoS("GenericPLEG: Relisting")
	//获得当前时间
	if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
		metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
	}

	timestamp := g.clock.Now()
	defer func() {
		metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp))
	}()

	// 获得所有pod信息,包括沙箱容器
	podList, err := g.runtime.GetPods(true)
	if err != nil {
		klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")
		return
	}

	g.updateRelistTime(timestamp)
	//类型转换一下
	pods := kubecontainer.Pods(podList)
	//将所有pod的数量做记录
	updateRunningPodAndContainerMetrics(pods)
	//更新新的pod缓存
	g.podRecords.setCurrent(pods)
	eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
	for pid := range g.podRecords {
		oldPod := g.podRecords.getOld(pid)
		pod := g.podRecords.getCurrent(pid)
		// 获得新的老的所有容器
		allContainers := getContainersFromPods(oldPod, pod)
		for _, container := range allContainers {
		    //检查是否有容器变化
			events := computeEvents(oldPod, pod, &container.ID)
			for _, e := range events {
				updateEvents(eventsByPodID, e)
			}
		}
	}

	var needsReinspection map[types.UID]*kubecontainer.Pod
	//缓存是否开启,如果开启,需要再次检查(检查上一次未通过的)
	if g.cacheEnabled() {
		needsReinspection = make(map[types.UID]*kubecontainer.Pod)
	}
	//遍历所有发生的event的pod
	for pid, events := range eventsByPodID {
		pod := g.podRecords.getCurrent(pid)
		if g.cacheEnabled() {
			if err := g.updateCache(pod, pid); err != nil {
				klog.V(4).ErrorS(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))
				needsReinspection[pid] = pod

				continue
			} else {
				delete(g.podsToReinspect, pid)
			}
		}
		//将老的pod信息改成新的,新的变成nil
		g.podRecords.update(pid)

		containerExitCode := make(map[string]int)
		//遍历这个pod的所有event变化
		for i := range events {
		    //如果是不知道的类型跳过
			if events[i].Type == ContainerChanged {
				continue
			}
			select {
			//推送的kubelet的plegChg管道
			case g.eventChannel <- events[i]:
			default:
				metrics.PLEGDiscardEvents.Inc()
				klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
			}
			//如果是失败的容器
			if events[i].Type == ContainerDied {
			//如果没有失败状态(退出码)
				if len(containerExitCode) == 0 && pod != nil && g.cache != nil {
					// 获得最新的容器运行时的pod状态
					status, err := g.cache.Get(pod.ID)
					if err == nil {
					    //遍历容器的状态信息,并且记录退出码
						for _, containerStatus := range status.ContainerStatuses {
							containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode
						}
					}
				}
				//如果容器有了退出码信息并且pod还在,记录一下日志
				if containerID, ok := events[i].Data.(string); ok {
					if exitCode, ok := containerExitCode[containerID]; ok && pod != nil {
						klog.V(2).InfoS("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)
					}
				}
			}
		}
	}
	//如果需要检查,并且pod需要同步的数量大于0
	if g.cacheEnabled() {
		if len(g.podsToReinspect) > 0 {
			klog.V(5).InfoS("GenericPLEG: Reinspecting pods that previously failed inspection")
			for pid, pod := range g.podsToReinspect 
			    //更新这些pod信息
				if err := g.updateCache(pod, pid); err != nil {
					klog.V(5).ErrorS(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
					needsReinspection[pid] = pod
				}
			}
		}
		g.cache.UpdateTime(timestamp)
	}
	g.podsToReinspect = needsReinspection
}

2.1容器变化的校验

func computeEvents(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) []*PodLifecycleEvent {
	var pid types.UID
	if oldPod != nil {
		pid = oldPod.ID
	} else if newPod != nil {
		pid = newPod.ID
	}
	oldState := getContainerState(oldPod, cid)//获得pod的容器状态
	newState := getContainerState(newPod, cid)
	return generateEvents(pid, cid.ID, oldState, newState)//比对两个pod的容器状态
}

func getContainerState(pod *kubecontainer.Pod, cid *kubecontainer.ContainerID) plegContainerState {
	state := plegContainerNonExistent
	if pod == nil {
		return state
	}
	c := pod.FindContainerByID(*cid)//找到这个容器
	if c != nil {
		return convertState(c.State)//验证这个容器状态
	}
	c = pod.FindSandboxByID(*cid)
	if c != nil {
		return convertState(c.State)
	}

	return state
}

func convertState(state kubecontainer.State) plegContainerState {
	switch state {
	case kubecontainer.ContainerStateCreated:
		return plegContainerUnknown
	case kubecontainer.ContainerStateRunning:
		return plegContainerRunning
	case kubecontainer.ContainerStateExited:
		return plegContainerExited
	case kubecontainer.ContainerStateUnknown:
		return plegContainerUnknown
	default:
		panic(fmt.Sprintf("unrecognized container state: %v", state))
	}
}
//这里是新老状态对比的
func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
    //如果新老状态一样,代表没变化
	if newState == oldState {
		return nil
	}

	klog.V(4).InfoS("GenericPLEG", "podUID", podID, "containerID", cid, "oldState", oldState, "newState", newState)
	switch newState {
	//选择新状态变更就可以了
	case plegContainerRunning:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
	case plegContainerExited:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
	case plegContainerUnknown:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
	case plegContainerNonExistent:
		switch oldState {
		case plegContainerExited:
			// We already reported that the container died before.
			return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}}
		default:
			return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}, {ID: podID, Type: ContainerRemoved, Data: cid}}
		}
	default:
		panic(fmt.Sprintf("unrecognized container state: %v", newState))
	}
}

二、syncCh
这个比较简单,所有的pod进入syncLoopIteration后,最终会走到managePodLoop函数中,当这个函数执行完成后,会将pod信息存入到workQueue队列里。同时syncCh会一秒循环一次这个队列,将pod数据再次进入syncLoopIteration函数执行


syncTicker := time.NewTicker(time.Second)

func (p *podWorkers) completeWork(pod *v1.Pod, phaseTransition bool, syncErr error) {
	switch {
	case phaseTransition:
		p.workQueue.Enqueue(pod.UID, 0)
	case syncErr == nil:
		p.workQueue.Enqueue(pod.UID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
	case strings.Contains(syncErr.Error(), NetworkNotReadyErrorMsg):
		p.workQueue.Enqueue(pod.UID, wait.Jitter(backOffOnTransientErrorPeriod, workerBackOffPeriodJitterFactor))
	default:
		p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
	}
	p.completeWorkQueueNext(pod.UID)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/93916.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

搭建Python环境

搭建Python环境 文章目录搭建Python环境需要安装的环境&#xff1a;安装Python1&#xff09;找到官网2&#xff09;找到下载页面3&#xff09;双击安装包4&#xff09;运行 hello world安装 PyCharm1&#xff09;找到官方网站2&#xff09;找到下载页面3&#xff09;双击安装包…

BEVFormer-accelerate:基于 EasyCV 加速 BEVFormer

导言 BEVFormer是一种纯视觉的自动驾驶感知算法&#xff0c;通过融合环视相机图像的空间和时序特征显式的生成具有强表征能力的BEV特征&#xff0c;并应用于下游3D检测、分割等任务&#xff0c;取得了SOTA的结果。我们在EasyCV开源框架&#xff08;https://github.com/alibaba…

照片调色JixiPix Hand Tint Pro

JixiPix Hand Tint Pro带有专业分层系统的简单工作流程具有色调&#xff0c;色调&#xff0c;颜色&#xff0c;乘法&#xff0c;柔和涂料或可以逐层更改的涂料的模式&#xff0c;以及功能强大的选色工具&#xff0c;可在隔离区域内保持刷涂&#xff0c;以实现快速着色和准确性。…

Linux环境下多线程C/C++程序的内存问题诊断

目录说明常见的内存错误举例常见的内存访问错误有以下几种&#xff1a;内存问题定位步骤野指针内存释放后使用&#xff08;UaF&#xff0c;Use after Free&#xff09;内存问题检查工具常见的内存问题检查工具Valgrindgcc 命令行参数 -fsanitizeaddress -fno-omit-frame-pointe…

Prim算法

应用场景 1.如何修路才能保证修路的总路程最短&#xff1f; 特点&#xff1a; 1.将所有节点全部连通&#xff0c;并且边上的权总和最小——>最小生成树 2.N个顶点&#xff0c;有N-1条边 Prim算法图解分析 简而言之&#xff0c;就是先确定顶点A&#xff0c;然后寻找没有遍…

代码随想录训练营第52天|LeetCode 300.最长递增子序列、674. 最长连续递增序列、718. 最长重复子数组

参考 代码随想录 题目一&#xff1a;LeetCode 300.最长递增子序列 确定dp数组下标及其含义 dp[i]&#xff1a;在nums数组中&#xff0c;在下标0~i元素&#xff08;包含i&#xff09;的基础上&#xff0c;以nums[i]作为子序列的最后一个元素&#xff0c;组成的最长严格递增子序…

0126 搜索与回溯算法 Day15

剑指 Offer 34. 二叉树中和为某一值的路径 给你二叉树的根节点 root 和一个整数目标和 targetSum &#xff0c;找出所有 从根节点到叶子节点 路径总和等于给定目标和的路径。 叶子节点 是指没有子节点的节点。 示例 1&#xff1a; 输入&#xff1a;root [5,4,8,11,null,13,…

cuda学习笔记4——cuda 核函数

cuda学习笔记4——cuda 核函数一、CUDA规范二、核函数内部线程的使用2.1 如何启动核函数demo 1&#xff1a;起16个线程来计算&#xff0c;四个线程块&#xff0c;每个块内四个线程例子demo2核函数是指在GPU端运行的代码&#xff0c;核函数内部主要干了什么&#xff1f;简而言之…

一个《跳动的爱心》代码,纯HTML+JS,双击直接运行

HTMLJS实现的一个跳动的爱心。集合了web动画库GSAP JS、OBJ 文件加载器OBJLoader、WebGL第三方库Three.js等。效果非常棒&#xff01; 目录实际效果&#xff1a;目录结构&#xff1a;HTML代码CSS代码js代码&#xff1a;简单的修改完整文件下载实际效果&#xff1a; 由于是纯前端…

学会IDEA这些断点操作,生产问题解决的越来越快了

文章目录IDEA断点高级用法1、断点类型1&#xff09;行断点&#xff08;line breakpoints&#xff09;2&#xff09;字段断点&#xff08;field breakpoints&#xff09;3&#xff09;方法断点&#xff08;method breakpoints&#xff09;1> 加载类名上的断点2> 正常方法断…

xss-labs(WriteUp)

xss-labs 先讲讲什么是跨站脚本攻击XSS(Cross Site Scripting) XSS原理 本质上是针对html的一种注入攻击&#xff0c;没有遵循数据与代码分离的原则&#xff0c;把用户输入的数据当作代码来执行 xss跨站脚本攻击是指恶意攻击者往Web页面里插入恶意脚本代码&#xff08;包括当…

redis之codis和redis cluster对比

写在前面 codis和Redis cluster 都是Redis的集群方案&#xff0c;本文就一起来看下。 1&#xff1a;codis的组件和架构 codis的组件有4个&#xff0c;如下&#xff1a; codis server&#xff1a;基于redis进行了二次开发的组件&#xff0c;负责数据的读写 codis proxy&…

Halcon图像拼接

图像拼接在实际的应用场景很广&#xff0c;比如无人机航拍&#xff0c;遥感图像等等&#xff0c;图像拼接是进一步做图像理解基础步骤&#xff0c;拼接效果的好坏直接影响接下来的工作&#xff0c;所以一个好的图像拼接算法非常重要。 如按下图是将两张楼房图片拼接成一个图像。…

QT 学习笔记(九)

文章目录一、事件的接收和忽略1. 准备工作2. 接收和忽略二、event() 函数1. 简介2. 实例演示3. 总结三、事件过滤器四、总结&#xff08;细看&#xff09;1. 知识点汇总2. QT 的事件处理五、事件、事件的接收和忽略、event() 函数和事件过滤器代码1. 主窗口头文件 mywidget.h2.…

英语文本转语音软件哪个好?分享三个新手也能学会的工具

大家平时都是怎么学习英语的呢&#xff1f;课上老师让我们熟悉单词意思、巩固语法、多练阅读理解&#xff1b;其实通过练习听力来加强语感也很重要。很多小伙伴的阅读理解很好&#xff0c;但是听力却跟不上。这里教大家一个小技巧&#xff0c;就是在做阅读理解的时候&#xff0…

第十章TomCat详解

文章目录Tomcat的部署和启动Tomcat扮演的角色①对外&#xff1a;Web服务器②对内&#xff1a;Servlet容器深入理解为什么需要TomCat从目的开始出发遇到的问题总过程部署前提解压TomCat的目录文件启动Tomcat并访问首页如何部署一个项目访问对应的web资源专业版IDEA创建一个JavaW…

力扣(718.1143)补9.12

718.最长重复子数组 这题真的想不到。 看图的话会好懂很多。 class Solution { public int findLength(int[] nums1, int[] nums2) { int nnums1.length; int n2nums2.length; int[][] dpnew int[n1][n21]; int result0; for(int…

【区块链-智能合约工程师】第二篇:Solidity入门

文章目录Solidity极简入门HelloWorld数值类型三种函数类型函数输出变量作用域引用类型参考文章&#xff1a;一文速览2022十大智能合约开发工具 资料地址&#xff1a;WTF学院 Solidity极简入门 HelloWorld remix&#xff1a;在线智能合约开发IDE&#xff08;Integrated Deve…

DBCO-PEG-Aminooxy, Aminooxy-PEG-DBCO,氨甲基聚乙二醇环辛炔

DBCO-PEG-Aminooxy &#xff0c; Aminooxy-PEG-DBCO&#xff0c;二苯并环辛炔-聚乙二醇-氨甲基&#xff0c;氨甲基聚乙二醇环辛炔 Product specifications&#xff1a; 1.CAS No&#xff1a;N/A 2.Molecular weightMV&#xff1a;1000&#xff0c;2000&#xff0c;34000&#x…

小侃设计模式(十八)-发布订阅模式

1.概述 发布订阅模式又叫观察者模式&#xff08;Observer Pattern&#xff09;&#xff0c;它是指对象之间一对多的依赖关系&#xff0c;每当那个特定对象改变状态时&#xff0c;所有依赖于它的对象都会得到通知并被自动更新&#xff0c;它是行为型模式的一种。观察者模式内部…