kubelet源码分析 status_manager状态管理器篇
右上方的statusManager就是本篇要介绍的内容。上一篇kubelet的sync同步函数也介绍过,这篇内容详细介绍状态管理器的作用。
一、简介
status_manager(状态管理器)是 Kubernetes 中的一个组件,负责管理 Pod、Node、Endpoint 等资源的状态。它的主要功能包括:
- 状态缓存管理:status_manager 维护一个本地缓存,用于存储集群中各个资源的状态信息,如 Pod 的运行状态、Node的健康状态等。通过定期从 API Server 获取最新的状态,并将其存储在缓存中,可以提高性能和响应速度。
- 状态更新和同步:status_manager 负责从 API Server 获取最新的状态信息,并将其更新到本地缓存中。它通过监听 API Server 的事件或定期轮询的方式来获取更新。一旦有新的状态更新,status_manager就会相应地更新本地缓存,确保缓存中的状态信息与 API Server 中的一致。
- 状态处理和计算:status_manager 对获取的状态信息进行处理和计算,以生成更高级的状态信息或指标。例如,它可以根据 Pod的容器状态计算出整个 Pod 的运行状态,根据 Node 的资源使用情况计算出节点的负载情况等。
- 状态监控和报警:status_manager监控集群中各个资源的状态,并根据定义的规则和策略触发报警。它可以检测到异常的状态变化或运行问题,并发送警报通知管理员或自动触发一些处理操作。
- 状态查询和提供接口:status_manager提供查询接口,供其他组件或工具获取状态信息。通过这些接口,其他组件可以获取最新的状态信息,从而进行进一步的处理、分析或展示。
总的来说,status_manager 的主要功能是管理和维护集群中各个资源的状态信息,确保状态的准确性、一致性,并提供对外的接口和功能,以支持状态的查询、处理、监控和报警等操作。
同时,status_manager中的podStatuses也是管理了当前缓存的运行状态,这个状态要同步到API-SERVER中来保证数据的一致性
二、结构体构造
文件位置pkg/kubelet/status/status_manager.go
type manager struct {
//k8s客户端
kubeClient clientset.Interface
podManager kubepod.Manager
// 从pod UID到相应pod的同步状态的映射。也就是缓存的运行时信息
podStatuses map[types.UID]versionedPodStatus
//要保持有锁
podStatusesLock sync.RWMutex
//当管道有内容代表有需要更新的pod
podStatusChannel chan struct{}
//pod UID到成功发送到API服务器的最新状态版本的映射。
apiStatusVersions map[kubetypes.MirrorPodUID]uint64
podDeletionSafety PodDeletionSafetyProvider
podStartupLatencyHelper PodStartupLatencyStateHelper
// 状态允许保存/恢复pod资源分配,并容忍kubelet重新启动。
state state.State
// stateFileDirectory是存放检查点状态文件的目录。
stateFileDirectory string
}
type Manager interface {
PodStatusProvider
// 开始的函数
Start()
// 设置podstatuses的最新状态
SetPodStatus(pod *v1.Pod, status v1.PodStatus)
SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool)
// TerminatePod将所提供的pod的容器状态重置为已终止,并触发状态更新。
TerminatePod(pod *v1.Pod)
//下面的函数作用不大,不详介绍
RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool)
GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool)
SetPodAllocation(pod *v1.Pod) error
SetPodResizeStatus(podUID types.UID, resize v1.PodResizeStatus) error
}
三、注册函数
//这一行是kubelet初始化时候执行的
kl.statusManager.Start()
func (m *manager) Start() {
//做一些初始化
m.state = state.NewNoopStateCheckpoint()
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
stateImpl, err := state.NewStateCheckpoint(m.stateFileDirectory, podStatusManagerStateFile)
if err != nil {
klog.ErrorS(err, "Could not initialize pod allocation checkpoint manager, please drain node and remove policy state file")
panic(err)
}
m.state = stateImpl
}
if m.kubeClient == nil {
klog.InfoS("Kubernetes client is nil, not starting status manager")
return
}
klog.InfoS("Starting to sync pod status with apiserver")
//每十秒进行一次全量同步
syncTicker := time.NewTicker(syncPeriod).C
// 协程获取管道数据,要不十秒全量同步一次,要不有单个pod需要同步
go wait.Forever(func() {
for {
select {
//有pod数据需要同步
case <-m.podStatusChannel:
klog.V(4).InfoS("Syncing updated statuses")
m.syncBatch(false)
//全量同步
case <-syncTicker:
klog.V(4).InfoS("Syncing all statuses")
m.syncBatch(true)
}
}
}, 0)
}
四、主函数执行
下面开始几个重要函数的介绍,分别是syncBatch(批量处理同步)syncpod(与api-server同步)SetPodStatus(更新最新缓存出发syncBatch)
函数1:syncBatch函数
- 如果是全量更新,获得api最新版本的pod。如果不存在,则直接删除掉
- 遍历所有缓存中的数据。
- 如果是静态pod。在源码中通过pod UID识别,但在API中通过镜像pod的UID跟踪。
- 如果不是全量更新,验证一下当期那pod的版本号和缓存的版本号对比,如果大于则跳过,小于则进行更新。将这个pod的数据和缓存数据推到待更新切片中
- 如果是全量更新,验证一下是否需要更新(neesUpdate函数也是验证一下version,并且验证是否可以删除)如果版本过老或者需要删除的,也会进行同步
- 是否需要调谐pod,是将podManager管理的缓存和status管理的实时缓存进行对比,如果有不同,则代表需要调谐
- 删掉当前pod的api版本号(删了也没事,调谐后会立马重写进来)
- 遍历所有需要同步的pod,进行syncpod的同步
func (m *manager) syncBatch(all bool) int {
type podSync struct {
podUID types.UID
statusUID kubetypes.MirrorPodUID
status versionedPodStatus
}
var updatedStatuses []podSync
podToMirror, mirrorToPod := m.podManager.GetUIDTranslations()
func() {
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
if all {
//如果是全量更新,获得api最新版本的pod。如果不存在,则直接删除掉
for uid := range m.apiStatusVersions {
_, hasPod := m.podStatuses[types.UID(uid)]
_, hasMirror := mirrorToPod[uid]
if !hasPod && !hasMirror {
delete(m.apiStatusVersions, uid)
}
}
}
//遍历所有缓存中的数据
for uid, status := range m.podStatuses {
//静态pod在源码中通过pod UID识别,但在API中通过镜像pod的UID跟踪。
uidOfStatus := kubetypes.MirrorPodUID(uid)
if mirrorUID, ok := podToMirror[kubetypes.ResolvedPodUID(uid)]; ok {
if mirrorUID == "" {
klog.V(5).InfoS("Static pod does not have a corresponding mirror pod; skipping",
"podUID", uid,
"pod", klog.KRef(status.podNamespace, status.podName))
continue
}
uidOfStatus = mirrorUID
}
if !all {
//如果不是全量更新,验证一下当期那pod的版本号和缓存的版本号对比,如果大于则跳过,小于则进行更新
if m.apiStatusVersions[uidOfStatus] >= status.version {
continue
}
//将这个pod的数据和缓存数据推到待更新切片中
updatedStatuses = append(updatedStatuses, podSync{uid, uidOfStatus, status})
continue
}
//如果是全量更新,验证一下是否需要更新(neesUpdate函数也是验证一下version,并且验证是否可以删除)如果版本过老或者需要删除的,也会进行同步
if m.needsUpdate(types.UID(uidOfStatus), status) {
updatedStatuses = append(updatedStatuses, podSync{uid, uidOfStatus, status})
//是否需要调谐pod,是将podManager管理的缓存和status管理的实时缓存进行对比,如果有不同,则代表需要调谐
} else if m.needsReconcile(uid, status.status) {
//删掉当前pod的api版本号(删了也没事,调谐后会立马重写进来)
delete(m.apiStatusVersions, uidOfStatus)
updatedStatuses = append(updatedStatuses, podSync{uid, uidOfStatus, status})
}
}
}()
//遍历所有需要同步的pod,进行syncpod的同步
for _, update := range updatedStatuses {
klog.V(5).InfoS("Sync pod status", "podUID", update.podUID, "statusUID", update.statusUID, "version", update.status.version)
m.syncPod(update.podUID, update.status)
}
return len(updatedStatuses)
}
函数2:syncpod函数
- 查询到api-server的pod状态信息
- 如果查询到的uid和要同步的uid不一样,代表pod被删除后重建了,跳过了更新
- 将api-server的pod信息和statusManager的pod信息比较然后合并成最新的(api-server的状态就是statusManager的状态,要确保api-server的状态保持与statusManager状态同步)
- 通过patch方式,将最新的pod信息更新到api-server中
- 更新最新版本号
- 验证是否需要删除,如果需要删除,则删除掉pod
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
//查询到api-server的pod状态信息
pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{})
if errors.IsNotFound(err) {
klog.V(3).InfoS("Pod does not exist on the server",
"podUID", uid,
"pod", klog.KRef(status.podNamespace, status.podName))
return
}
if err != nil {
klog.InfoS("Failed to get status for pod",
"podUID", uid,
"pod", klog.KRef(status.podNamespace, status.podName),
"err", err)
return
}
//转换uid
translatedUID := m.podManager.TranslatePodUID(pod.UID)
//如果查询到的uid和要同步的uid不一样,代表pod被删除后重建了,跳过了更新。
if len(translatedUID) > 0 && translatedUID != kubetypes.ResolvedPodUID(uid) {
klog.V(2).InfoS("Pod was deleted and then recreated, skipping status update",
"pod", klog.KObj(pod),
"oldPodUID", uid,
"podUID", translatedUID)
m.deletePodStatus(uid)
return
}
//将api-server的pod信息和statusManager的pod信息比较然后合并成最新的(api-server的状态就是statusManager的状态,要确保api-server的状态保持与statusManager状态同步)
mergedStatus := mergePodStatus(pod.Status, status.status, m.podDeletionSafety.PodCouldHaveRunningContainers(pod))
//通过patch方式,将最新的pod信息更新到api-server中
newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(context.TODO(), m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus)
klog.V(3).InfoS("Patch status for pod", "pod", klog.KObj(pod), "podUID", uid, "patch", string(patchBytes))
if err != nil {
klog.InfoS("Failed to update status for pod", "pod", klog.KObj(pod), "err", err)
return
}
if unchanged {
klog.V(3).InfoS("Status for pod is up-to-date", "pod", klog.KObj(pod), "statusVersion", status.version)
} else {
klog.V(3).InfoS("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", mergedStatus)
pod = newPod
m.podStartupLatencyHelper.RecordStatusUpdated(pod)
}
if status.at.IsZero() {
klog.V(3).InfoS("Pod had no status time set", "pod", klog.KObj(pod), "podUID", uid, "version", status.version)
} else {
duration := time.Since(status.at).Truncate(time.Millisecond)
metrics.PodStatusSyncDuration.Observe(duration.Seconds())
}
//更新最新版本号
m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version
//验证是否需要删除
if m.canBeDeleted(pod, status.status, status.podIsFinished) {
deleteOptions := metav1.DeleteOptions{
GracePeriodSeconds: new(int64),
Preconditions: metav1.NewUIDPreconditions(string(pod.UID)),
}
//删除pod(就是验证pod是否已经Finished了)
err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions)
if err != nil {
klog.InfoS("Failed to delete status for pod", "pod", klog.KObj(pod), "err", err)
return
}
klog.V(3).InfoS("Pod fully terminated and removed from etcd", "pod", klog.KObj(pod))
m.deletePodStatus(uid)
}
}
函数3:SetPodStatus函数
- 主要在updateStatusInternal中执行。传过来的参数分别是pod和将要变成的status
- 获得之前的status
- 如果存在,验证是否为静态pod
- 检查容器中的非法状态转换
- 更新pod条件的LastTransitionTime。
- 记录一下日志
- 这里的目的是为了防止对一个pod的状态的并发更新(函数4介绍)
- 初始化最新的状态版本
- 记录到缓存.推送管道数据,触发同步
func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
status = *status.DeepCopy()
//主要在updateStatusInternal中执行。传过来的参数分别是pod和将要变成的status
m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil, false)
}
func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate, podIsFinished bool) {
var oldStatus v1.PodStatus
//获得之前的status
cachedStatus, isCached := m.podStatuses[pod.UID]
if isCached {
//如果存在,验证是否为静态pod
oldStatus = cachedStatus.status
if !kubetypes.IsStaticPod(pod) {
if cachedStatus.podIsFinished && !podIsFinished {
klog.InfoS("Got unexpected podIsFinished=false, while podIsFinished=true in status cache, programmer error.", "pod", klog.KObj(pod))
podIsFinished = true
}
}
} else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
oldStatus = mirrorPod.Status
} else {
oldStatus = pod.Status
}
//检查容器中的非法状态转换
if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil {
klog.ErrorS(err, "Status update on pod aborted", "pod", klog.KObj(pod))
return
}
//更新pod条件的LastTransitionTime。
if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil {
klog.ErrorS(err, "Status update on pod aborted", "pod", klog.KObj(pod))
return
}
updateLastTransitionTime(&status, &oldStatus, v1.ContainersReady)
updateLastTransitionTime(&status, &oldStatus, v1.PodReady)
updateLastTransitionTime(&status, &oldStatus, v1.PodInitialized)
updateLastTransitionTime(&status, &oldStatus, kubetypes.PodHasNetwork)
updateLastTransitionTime(&status, &oldStatus, v1.PodScheduled)
if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
updateLastTransitionTime(&status, &oldStatus, v1.DisruptionTarget)
}
if oldStatus.StartTime != nil && !oldStatus.StartTime.IsZero() {
status.StartTime = oldStatus.StartTime
} else if status.StartTime.IsZero() {
now := metav1.Now()
status.StartTime = &now
}
normalizeStatus(pod, &status)
//记录一下日志
if klogV := klog.V(5); klogV.Enabled() {
var containers []string
for _, s := range append(append([]v1.ContainerStatus(nil), status.InitContainerStatuses...), status.ContainerStatuses...) {
var current, previous string
switch {
case s.State.Running != nil:
current = "running"
case s.State.Waiting != nil:
current = "waiting"
case s.State.Terminated != nil:
current = fmt.Sprintf("terminated=%d", s.State.Terminated.ExitCode)
default:
current = "unknown"
}
switch {
case s.LastTerminationState.Running != nil:
previous = "running"
case s.LastTerminationState.Waiting != nil:
previous = "waiting"
case s.LastTerminationState.Terminated != nil:
previous = fmt.Sprintf("terminated=%d", s.LastTerminationState.Terminated.ExitCode)
default:
previous = "<none>"
}
containers = append(containers, fmt.Sprintf("(%s state=%s previous=%s)", s.Name, current, previous))
}
sort.Strings(containers)
klogV.InfoS("updateStatusInternal", "version", cachedStatus.version+1, "podIsFinished", podIsFinished, "pod", klog.KObj(pod), "podUID", pod.UID, "containers", strings.Join(containers, " "))
}
//这里的目的是为了防止对一个pod的状态的并发更新(函数4介绍)
if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
klog.V(3).InfoS("Ignoring same status for pod", "pod", klog.KObj(pod), "status", status)
return
}
//初始化最新的状态版本
newStatus := versionedPodStatus{
status: status,
version: cachedStatus.version + 1,
podName: pod.Name,
podNamespace: pod.Namespace,
podIsFinished: podIsFinished,
}
if cachedStatus.at.IsZero() {
newStatus.at = time.Now()
} else {
newStatus.at = cachedStatus.at
}
//记录到缓存
m.podStatuses[pod.UID] = newStatus
select {
//推送管道数据,触发同步
case m.podStatusChannel <- struct{}{}:
default:
}
}
函数4:isPodStatusByKubeletEqual
- 检查其类型是否属于由 kubelet 跟踪的条件类型
- 如果条件类型属于 kubelet 跟踪的类型,并且在 oldCopy 中存在相同类型的条件,则比较它们的状态、消息和原因是否相等。如果有任何不相等的情况,函数会返回 false,表示两个 PodStatus 不相等。
- 果所有条件都相等,将 status.Conditions 赋值给 oldCopy.Conditions,确保 oldCopy 中的条件与 status 中的条件一致。
- 最后,函数使用 DeepEqual 函数比较 oldCopy 和 status 是否完全相等。如果它们在所有字段上都相等,函数将返回 true,否则返回 false。
func isPodStatusByKubeletEqual(oldStatus, status *v1.PodStatus) bool {
oldCopy := oldStatus.DeepCopy()
for _, c := range status.Conditions {
//检查其类型是否属于由 kubelet 跟踪的条件类型
if kubetypes.PodConditionByKubelet(c.Type) || kubetypes.PodConditionSharedByKubelet(c.Type) {
_, oc := podutil.GetPodCondition(oldCopy, c.Type)
if oc == nil || oc.Status != c.Status || oc.Message != c.Message || oc.Reason != c.Reason {
return false
}
}
}
oldCopy.Conditions = status.Conditions
//最后,函数使用 DeepEqual 函数比较 oldCopy 和 status 是否完全相等。如果它们在所有字段上都相等,函数将返回 true,否则返回 false。
return apiequality.Semantic.DeepEqual(oldCopy, status)
}
func PodConditionByKubelet(conditionType v1.PodConditionType) bool {
for _, c := range PodConditionsByKubelet {
if c == conditionType {
return true
}
}
if utilfeature.DefaultFeatureGate.Enabled(features.PodHasNetworkCondition) {
if conditionType == PodHasNetwork {
return true
}
}
return false
}
status_manager状态管理器的几个主要函数功能大概介绍完。这个函数主要的用到地方就是kubelet层面,当kubelet需要sync代码时,就会触发SetPodStatus函数,然后status_manager就会将最新的数据通过patch的方式同步到api-server中。