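1. Environment
- Kubernetes source version: remotes/origin/release-1.25
- Version of the compiled Kubelet: Kubernetes v1.24.0-beta.0.2463+ee7799bab469d7
- Cluster used for the experiments: Kubernetes v1.25.4, a single-node cluster deployed from binaries. For how to set up such a cluster, see: Setting up a single-node Kubernetes v1.25 cluster for debugging the K8S source code
- Golang version: go1.19.3 linux/amd64
- IDEA version: 2022.2.3
- Delve version: 1.9.1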
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# dlv version
Delve Debugger
Version: 1.9.1
Build: $Id: d81b9fd12bfa603f3cf7a4bc842398bd61c42940 $
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# go version
go version go1.19.3 linux/amd64
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# kubectl version
WARNING: This version information is deprecated and will be replaced with the output from kubectl version --short. Use --output=yaml|json to get the full version.
Client Version: version.Info{Major:"1", Minor:"25", GitVersion:"v1.25.4", GitCommit:"872a965c6c6526caa949f0c6ac028ef7aff3fb78", GitTreeState:"clean", BuildDate:"2022-11-09T13:36:36Z", GoVersion:"go1.19.3", Compiler:"gc", Platform:"linux/amd64"}
Kustomize Version: v4.5.7
Server Version: version.Info{Major:"1", Minor:"25", GitVersion:"v1.25.4", GitCommit:"872a965c6c6526caa949f0c6ac028ef7aff3fb78", GitTreeState:"clean", BuildDate:"2022-11-09T13:29:58Z", GoVersion:"go1.19.3", Compiler:"gc", Platform:"linux/amd64"}
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# kubectl get nodes -owide
NAME STATUS ROLES AGE VERSION INTERNAL-IP EXTERNAL-IP OS-IMAGE KERNEL-VERSION CONTAINER-RUNTIME
k8s-master1 Ready <none> 31h v1.25.4 192.168.11.71 <none> CentOS Linux 7 (Core) 3.10.0-1160.80.1.el7.x86_64 containerd://1.6.10
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# kubectl get componentstatus
Warning: v1 ComponentStatus is deprecated in v1.19+
NAME STATUS MESSAGE ERROR
etcd-0 Healthy {"health":"true","reason":""}
controller-manager Healthy ok
scheduler Healthy ok
[root@k8s-master1 kubernetes]#
Kubelet startup arguments:
[root@k8s-master1 kubernetes]# ps -ef|grep "/usr/local/bin/kubelet"
root 7972 1 6 07:06 ? 00:00:06 /usr/local/bin/kubelet --bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.kubeconfig --kubeconfig=/etc/kubernetes/kubelet.kubeconfig --config=/etc/kubernetes/kubelet-conf.yml --container-runtime-endpoint=unix:///run/containerd/containerd.sock --node-labels=node.kubernetes.io/node= --v=8
root 9549 6424 0 07:07 pts/0 00:00:00 grep --color=auto /usr/local/bin/kubelet
[root@k8s-master1 kubernetes]#
Kubelet configuration (the KubeletConfiguration file passed via --config=/etc/kubernetes/kubelet-conf.yml):
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
address: 0.0.0.0
port: 10250
readOnlyPort: 10255
authentication:
  anonymous:
    enabled: false
  webhook:
    cacheTTL: 2m0s
    enabled: true
  x509:
    clientCAFile: /etc/kubernetes/pki/ca.pem
authorization:
  mode: Webhook
  webhook:
    cacheAuthorizedTTL: 5m0s
    cacheUnauthorizedTTL: 30s
cgroupDriver: systemd
cgroupsPerQOS: true
clusterDNS:
- 10.96.0.10
clusterDomain: cluster.local
containerLogMaxFiles: 5
containerLogMaxSize: 10Mi
contentType: application/vnd.kubernetes.protobuf
cpuCFSQuota: true
cpuManagerPolicy: none
cpuManagerReconcilePeriod: 10s
enableControllerAttachDetach: true
enableDebuggingHandlers: true
enforceNodeAllocatable:
- pods
eventBurst: 10
eventRecordQPS: 5
evictionHard:
  imagefs.available: 15%
  memory.available: 100Mi
  nodefs.available: 10%
  nodefs.inodesFree: 5%
evictionPressureTransitionPeriod: 5m0s
failSwapOn: true
fileCheckFrequency: 20s
hairpinMode: promiscuous-bridge
healthzBindAddress: 127.0.0.1
healthzPort: 10248
httpCheckFrequency: 20s
imageGCHighThresholdPercent: 85
imageGCLowThresholdPercent: 80
imageMinimumGCAge: 2m0s
iptablesDropBit: 15
iptablesMasqueradeBit: 14
kubeAPIBurst: 10
kubeAPIQPS: 5
makeIPTablesUtilChains: true
maxOpenFiles: 1000000
maxPods: 110
nodeStatusUpdateFrequency: 10s
oomScoreAdj: -999
podPidsLimit: -1
registryBurst: 10
registryPullQPS: 5
resolvConf: /etc/resolv.conf
rotateCertificates: true
runtimeRequestTimeout: 2m0s
serializeImagePulls: true
staticPodPath: /etc/kubernetes/manifests
streamingConnectionIdleTimeout: 4h0m0s
syncFrequency: 1m0s
volumeStatsAggPeriod: 1m0s
2. Component Overview
The StatusManager has two main responsibilities:
- 1. It maintains a cache of each Pod's status and keeps that cache in sync with the latest v1.PodStatus
- 2. It pushes each Pod's status to the apiserver
3. Source Code Analysis
Let's start with the definition of the StatusManager interface:
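- Manager.Start: starts syncing Pod status with the apiserver
- Manager.SetPodStatus: updates the cached status of the given Pod in the StatusManager and triggers a status update
- Manager.SetContainerReadiness: updates the readiness of a Container in the StatusManager cache and triggers a status update
- Manager.SetContainerStartup: updates the startup state of a Container in the StatusManager cache and triggers a status update
- Manager.TerminatePod: resets the status of the Pod's Containers in the StatusManager cache to Terminated and triggers a status update
- Manager.RemoveOrphanedStatuses: scans the StatusManager's status cache and removes the entries of Pods that no longer exist, meaning Pods that are not in the podUIDs set passed in
- PodStatusProvider.GetPodStatus: returns the cached status of a Pod, looked up by its UID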
// pkg/kubelet/status/status_manager.go
type Manager interface {
	PodStatusProvider
	Start()
	SetPodStatus(pod *v1.Pod, status v1.PodStatus)
	SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
	SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool)
	TerminatePod(pod *v1.Pod)
	RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
}

type PodStatusProvider interface {
	GetPodStatus(uid types.UID) (v1.PodStatus, bool)
}
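The sketch below is a minimal, self-contained model of the cache-plus-channel shape behind SetPodStatus and GetPodStatus. It is for illustration only. PodStatus, toyManager and syncRequest are hypothetical, simplified stand-ins for v1.PodStatus, manager and podStatusSyncRequest. Two details are assumptions of this sketch: each update bumps a per-Pod version, and the send is non-blocking. The reasoning is that a writer holding the lock should never block on a full channel, and the periodic batch sync can pick up anything that was not queued.

```go
package main

import (
	"fmt"
	"sync"
)

// PodStatus is a hypothetical, trimmed-down stand-in for v1.PodStatus.
type PodStatus struct {
	Phase string
}

// versionedStatus pairs a status with a per-Pod version that only grows.
type versionedStatus struct {
	status  PodStatus
	version uint64
}

// syncRequest is a hypothetical stand-in for podStatusSyncRequest.
type syncRequest struct {
	uid    string
	status versionedStatus
}

// toyManager holds the status cache and the notification channel.
type toyManager struct {
	mu          sync.RWMutex
	podStatuses map[string]versionedStatus
	ch          chan syncRequest
}

func newToyManager(buffer int) *toyManager {
	return &toyManager{
		podStatuses: make(map[string]versionedStatus),
		ch:          make(chan syncRequest, buffer),
	}
}

// SetPodStatus caches the status with version+1 and tries to notify the sync
// loop without blocking. It reports whether a sync request was queued.
func (m *toyManager) SetPodStatus(uid string, s PodStatus) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	vs := versionedStatus{status: s, version: m.podStatuses[uid].version + 1}
	m.podStatuses[uid] = vs
	select {
	case m.ch <- syncRequest{uid: uid, status: vs}:
		return true
	default:
		// Channel full: leave it to a periodic batch sync.
		return false
	}
}

// GetPodStatus returns the cached status under the read lock.
func (m *toyManager) GetPodStatus(uid string) (PodStatus, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	vs, ok := m.podStatuses[uid]
	return vs.status, ok
}

func main() {
	m := newToyManager(1)
	fmt.Println(m.SetPodStatus("pod-a", PodStatus{Phase: "Pending"})) // true: queued
	fmt.Println(m.SetPodStatus("pod-a", PodStatus{Phase: "Running"})) // false: buffer of 1 is full
	s, ok := m.GetPodStatus("pod-a")
	fmt.Println(s.Phase, ok, m.podStatuses["pod-a"].version) // Running true 2
}
```

With a buffer of 1, the second update is not queued. The cache still holds the newest status at version 2, and that is the value a later batch pass would compare against apiStatusVersions.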
Next, the concrete implementation of the StatusManager:
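// pkg/kubelet/status/status_manager.go
type manager struct {
	// Client for the apiserver
	kubeClient clientset.Interface
	// The PodManager tracks the Pods on this node, including the mapping between static Pods and their mirror Pods
	podManager kubepod.Manager
	// Status cache: Pod UID -> versioned status of that Pod
	podStatuses map[types.UID]versionedPodStatus
	// RW lock guarding podStatuses, which makes the StatusManager's methods safe for concurrent use
	podStatusesLock sync.RWMutex
	// Carries a sync request whenever a Pod's status changes; consumed by the loop started in Start
	podStatusChannel chan podStatusSyncRequest
	// (Mirror) Pod UID -> latest status version successfully sent to the apiserver; only accessed from the sync goroutine
	apiStatusVersions map[kubetypes.MirrorPodUID]uint64
	// Used to check that a Pod can be safely deleted
	podDeletionSafety PodDeletionSafetyProvider
}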
Next, let's look at how the StatusManager's other methods are implemented.
3.1. Start
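The logic is as follows:
- 1. First check whether the apiserver client has been initialized. If it has not, return immediately: every later operation has to talk to the apiserver, so there is nothing useful to do without a client
- 2. Whenever a sync request arrives on podStatusChannel, call syncPod to sync that Pod
- 3. Every 10s (syncPeriod), discard the requests still queued in podStatusChannel, because the batch pass covers those Pods anyway, and run a batch sync with syncBatch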
// pkg/kubelet/status/status_manager.go
func (m *manager) Start() {
	// The StatusManager exists to update Pod status in the apiserver; without an apiserver client it can do nothing, so return right away
	if m.kubeClient == nil {
		klog.InfoS("Kubernetes client is nil, not starting status manager")
		return
	}
	klog.InfoS("Starting to sync pod status with apiserver")
	// Ticker for the periodic batch sync (syncPeriod is 10s)
	syncTicker := time.NewTicker(syncPeriod).C
	// This loop is essentially the StatusManager's whole life. It does just two things: sync a single Pod whenever that Pod's status changes, and run a batch sync every 10 seconds
	go wait.Forever(func() {
		for {
			select {
			case syncRequest := <-m.podStatusChannel:
				klog.V(5).InfoS("Status Manager: syncing pod with status from podStatusChannel",
					"podUID", syncRequest.podUID,
					"statusVersion", syncRequest.status.version,
					"status", syncRequest.status.status)
				m.syncPod(syncRequest.podUID, syncRequest.status)
			case <-syncTicker:
				klog.V(5).InfoS("Status Manager: syncing batch")
				// Drop the requests still queued in the channel; syncBatch covers every cached status anyway
				for i := len(m.podStatusChannel); i > 0; i-- {
					<-m.podStatusChannel
				}
				m.syncBatch()
			}
		}
	}, 0)
}
3.1.1. syncPod
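Let's see how syncPod syncs a Pod's status to the apiserver. The main logic is:
- 1. Check whether the Pod really needs an update (needsUpdate); if it does not, return immediately
- 2. Fetch the current Pod from the apiserver by its namespace and name (not by UID); if it is not found or the request fails, return
- 3. Translate the fetched Pod's UID through the PodManager (a mirror Pod UID becomes its static Pod UID). If the result differs from the UID being synced, the Pod was deleted and recreated under the same name and the status cached in the StatusManager is stale, so delete that status and return
- 4. Merge the old and new status with mergePodStatus: combine the Conditions of the old and new Pod status and handle the transition to a terminal phase
- 5. Patch the Pod status in the apiserver, then record the synced version in apiStatusVersions
- 6. If the Pod has been marked for deletion and its resources have been reclaimed (canBeDeleted), delete the Pod object from the apiserver with a grace period of 0 and a UID precondition, then remove its cached status from the StatusManager. This step only calls the apiserver; it does not go through CRI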
// pkg/kubelet/status/status_manager.go
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
	// First check whether this Pod needs an update at all. If its status is already up to date, return; the check is version-based (see needsUpdate)
	if !m.needsUpdate(uid, status) {
		klog.V(1).InfoS("Status for pod is up-to-date; skipping", "podUID", uid)
		return
	}
	// Fetch the current Pod from the apiserver by namespace/name
	pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{})
	if errors.IsNotFound(err) {
		// The Pod no longer exists in the apiserver, i.e. it has been deleted, so skip the update
		return
	}
	if err != nil {
		return
	}
	translatedUID := m.podManager.TranslatePodUID(pod.UID)
	// The Pod in the apiserver is authoritative: if its (translated) UID differs from the one being synced, the Pod was deleted and recreated
	if len(translatedUID) > 0 && translatedUID != kubetypes.ResolvedPodUID(uid) {
		klog.V(2).InfoS("Pod was deleted and then recreated, skipping status update",
			"pod", klog.KObj(pod),
			"oldPodUID", uid,
			"podUID", translatedUID)
		// Drop the old Pod's stale status from the StatusManager cache
		m.deletePodStatus(uid)
		return
	}
	// Merge the old (apiserver) and new (cached) status, including the Conditions
	mergedStatus := mergePodStatus(pod.Status, status.status, m.podDeletionSafety.PodCouldHaveRunningContainers(pod))
	// Patch the Pod's status in the apiserver
	newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(context.TODO(), m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus)
	klog.V(3).InfoS("Patch status for pod", "pod", klog.KObj(pod), "patch", string(patchBytes))
	if err != nil {
		klog.InfoS("Failed to update status for pod", "pod", klog.KObj(pod), "err", err)
		return
	}
	if unchanged {
		klog.V(3).InfoS("Status for pod is up-to-date", "pod", klog.KObj(pod), "statusVersion", status.version)
	} else {
		klog.V(3).InfoS("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", mergedStatus)
		pod = newPod
	}
	// Record the version that has now been written to the apiserver
	m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version
	// If the Pod is marked for deletion and its resources have been reclaimed, finish the deletion in the apiserver
	if m.canBeDeleted(pod, status.status) {
		deleteOptions := metav1.DeleteOptions{
			// Grace period 0: the Pod's resources are already reclaimed, so delete immediately
			GracePeriodSeconds: new(int64),
			// Only delete the Pod with this exact UID, never a recreated Pod with the same name
			Preconditions: metav1.NewUIDPreconditions(string(pod.UID)),
		}
		// Delete the Pod object from the apiserver (an API call, not a CRI call)
		err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions)
		if err != nil {
			klog.InfoS("Failed to delete status for pod", "pod", klog.KObj(pod), "err", err)
			return
		}
		klog.V(3).InfoS("Pod fully terminated and removed from etcd", "pod", klog.KObj(pod))
		m.deletePodStatus(uid)
	}
}
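The branch structure of syncPod can be sketched against an in-memory stand-in for the apiserver. Everything in this sketch is hypothetical and heavily simplified. apiPod and syncer replace v1.Pod, the clientset and manager. The UID comparison skips the static/mirror translation, the needsUpdate step only compares versions, and "patching" just overwrites a field. The only point is the order of the checks: version check, lookup by namespace/name, recreated-Pod detection, write and record the version, and finally the delete.

```go
package main

import "fmt"

// apiPod is a hypothetical stand-in for the v1.Pod stored in the apiserver.
type apiPod struct {
	UID             string
	Phase           string
	DeletionPending bool // stands in for DeletionTimestamp != nil
	Reclaimed       bool // stands in for PodResourcesAreReclaimed
}

// syncer holds the state that the simplified syncPod touches.
type syncer struct {
	apiPods           map[string]*apiPod // fake apiserver, keyed by "namespace/name"
	apiStatusVersions map[string]uint64  // UID -> last version written
	podStatuses       map[string]string  // UID -> cached phase
}

// syncPod reports which branch was taken.
func (s *syncer) syncPod(key, uid string, version uint64) string {
	// 1. Version check only (the real needsUpdate also checks canBeDeleted).
	if last, ok := s.apiStatusVersions[uid]; ok && last >= version {
		return "skip: up-to-date"
	}
	// 2. Look the Pod up by namespace/name, not by UID.
	pod, ok := s.apiPods[key]
	if !ok {
		return "skip: not found"
	}
	// 3. Same name, different UID: the Pod was recreated, drop the stale entry.
	if pod.UID != uid {
		delete(s.podStatuses, uid)
		return "skip: recreated"
	}
	// 4+5. "Patch" the status and remember which version was written.
	pod.Phase = s.podStatuses[uid]
	s.apiStatusVersions[uid] = version
	// 6. Final delete once deletion was requested and resources are reclaimed.
	if pod.DeletionPending && pod.Reclaimed {
		delete(s.apiPods, key)
		delete(s.podStatuses, uid)
		return "patched, deleted"
	}
	return "patched"
}

func main() {
	s := &syncer{
		apiPods:           map[string]*apiPod{"default/web": {UID: "u1", Phase: "Pending"}},
		apiStatusVersions: map[string]uint64{},
		podStatuses:       map[string]string{"u1": "Running", "old": "Running"},
	}
	fmt.Println(s.syncPod("default/web", "u1", 1))  // patched
	fmt.Println(s.syncPod("default/web", "u1", 1))  // skip: up-to-date
	fmt.Println(s.syncPod("default/web", "old", 1)) // skip: recreated
	s.podStatuses["u1"] = "Succeeded"
	s.apiPods["default/web"].DeletionPending = true
	s.apiPods["default/web"].Reclaimed = true
	fmt.Println(s.syncPod("default/web", "u1", 2)) // patched, deleted
	fmt.Println(s.syncPod("default/web", "u1", 3)) // skip: not found
}
```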
3.1.1.1. needsUpdate
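needsUpdate decides whether the given status still has to be synced to the apiserver. It returns true if this version is newer than the last version written to the apiserver. Otherwise it returns true only when the Pod is ready for its final deletion (canBeDeleted).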
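func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {
	// As guessed, the decision is based on the Pod's status version
	latest, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(uid)]
	// If no version has been sent to the apiserver for this Pod yet, or the cached version is newer, an update is needed
	if !ok || latest < status.version {
		return true
	}
	// Look the Pod up in the PodManager
	pod, ok := m.podManager.GetPodByUID(uid)
	if !ok {
		// The PodManager no longer knows this Pod. This version has already been synced, and the only remaining
		// reason to sync (final deletion, checked by canBeDeleted) needs the Pod object, so there is nothing left to do
		return false
	}
	// The status is already synced; sync again only if the Pod can now be deleted from the apiserver
	return m.canBeDeleted(pod, status.status)
}

func (m *manager) canBeDeleted(pod *v1.Pod, status v1.PodStatus) bool {
	// Not deletable if the Pod has not been marked for deletion (no DeletionTimestamp), or if it is a mirror Pod
	if pod.DeletionTimestamp == nil || kubetypes.IsMirrorPod(pod) {
		return false
	}
	// Otherwise deletable only once all of the Pod's resources have been reclaimed
	return m.podDeletionSafety.PodResourcesAreReclaimed(pod, status)
}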
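The two predicates reduce to a few map lookups. In this hypothetical sketch, pod and checker stand in for v1.Pod, the PodManager and manager. The three booleans replace DeletionTimestamp != nil, kubetypes.IsMirrorPod and PodDeletionSafety.PodResourcesAreReclaimed.

```go
package main

import "fmt"

// pod is a hypothetical stand-in for the parts of v1.Pod that matter here.
type pod struct {
	deletionRequested bool // DeletionTimestamp != nil
	mirror            bool // kubetypes.IsMirrorPod(pod)
	reclaimed         bool // PodResourcesAreReclaimed(pod, status)
}

type checker struct {
	apiStatusVersions map[string]uint64 // UID -> last version sent to the apiserver
	pods              map[string]pod    // what the PodManager knows
}

// canBeDeleted: only a non-mirror Pod that is marked for deletion and whose
// resources have been reclaimed may be removed from the apiserver.
func (c *checker) canBeDeleted(p pod) bool {
	if !p.deletionRequested || p.mirror {
		return false
	}
	return p.reclaimed
}

// needsUpdate: a newer version always needs a sync; otherwise only a Pod that
// is ready for its final deletion does.
func (c *checker) needsUpdate(uid string, version uint64) bool {
	latest, ok := c.apiStatusVersions[uid]
	if !ok || latest < version {
		return true
	}
	p, ok := c.pods[uid]
	if !ok {
		return false
	}
	return c.canBeDeleted(p)
}

func main() {
	c := &checker{
		apiStatusVersions: map[string]uint64{"a": 3, "b": 3, "m": 3, "gone": 3},
		pods: map[string]pod{
			"a": {},
			"b": {deletionRequested: true, reclaimed: true},
			"m": {deletionRequested: true, mirror: true, reclaimed: true},
		},
	}
	fmt.Println(c.needsUpdate("a", 4))    // true: newer version
	fmt.Println(c.needsUpdate("a", 3))    // false: synced, not being deleted
	fmt.Println(c.needsUpdate("b", 3))    // true: ready for final deletion
	fmt.Println(c.needsUpdate("m", 3))    // false: mirror Pod
	fmt.Println(c.needsUpdate("gone", 3)) // false: unknown to the PodManager
	fmt.Println(c.needsUpdate("x", 1))    // true: never synced
}
```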
3.1.1.2. mergePodStatus
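mergePodStatus merges the status read from the apiserver (oldPodStatus) with the status cached by the kubelet (newPodStatus), and its main job is merging the Conditions of the two. Conditions not owned by the kubelet are kept from the old status, and kubelet-owned Conditions are taken from the new status. It also holds back a transition to a terminal phase while containers could still be running. Once the phase is terminal, it sets the Ready and ContainersReady Conditions to False.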
func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus, couldHaveRunningContainers bool) v1.PodStatus {
	// Preallocate the slice capacity to avoid repeated growth while appending
	podConditions := make([]v1.PodCondition, 0, len(oldPodStatus.Conditions)+len(newPodStatus.Conditions))
	// Keep the Conditions from the old (apiserver) status that are NOT owned by the kubelet, e.g. those written by other controllers
	for _, c := range oldPodStatus.Conditions {
		if !kubetypes.PodConditionByKubelet(c.Type) {
			podConditions = append(podConditions, c)
		}
	}
	// Take the kubelet-owned Conditions from the new (kubelet) status
	for _, c := range newPodStatus.Conditions {
		if kubetypes.PodConditionByKubelet(c.Type) {
			podConditions = append(podConditions, c)
		}
	}
	newPodStatus.Conditions = podConditions
	// The Pod is moving into a terminal phase (Succeeded/Failed); if containers could still be running, keep the old phase, reason and message for now
	if !podutil.IsPodPhaseTerminal(oldPodStatus.Phase) && podutil.IsPodPhaseTerminal(newPodStatus.Phase) {
		if couldHaveRunningContainers {
			newPodStatus.Phase = oldPodStatus.Phase
			newPodStatus.Reason = oldPodStatus.Reason
			newPodStatus.Message = oldPodStatus.Message
		}
	}
	// In a terminal phase, a Ready or ContainersReady Condition that is still True is replaced by its terminal-phase value (status False)
	if podutil.IsPodPhaseTerminal(newPodStatus.Phase) {
		if podutil.IsPodReadyConditionTrue(newPodStatus) || podutil.IsContainersReadyConditionTrue(newPodStatus) {
			containersReadyCondition := generateContainersReadyConditionForTerminalPhase(newPodStatus.Phase)
			podutil.UpdatePodCondition(&newPodStatus, &containersReadyCondition)
			podReadyCondition := generatePodReadyConditionForTerminalPhase(newPodStatus.Phase)
			podutil.UpdatePodCondition(&newPodStatus, &podReadyCondition)
		}
	}
	return newPodStatus
}
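The merge rules can be exercised on simplified types. Condition, Status and merge are hypothetical stand-ins for v1.PodCondition, v1.PodStatus and mergePodStatus. The kubeletOwned set is an assumed copy of the kubelet-owned condition types (PodScheduled, Ready, Initialized, ContainersReady), with feature-gated types omitted. The terminal-phase step is also simplified: it flips any existing Ready or ContainersReady Condition to False instead of generating both Conditions.

```go
package main

import "fmt"

// Condition and Status are hypothetical stand-ins for v1.PodCondition and v1.PodStatus.
type Condition struct {
	Type   string
	Status string
}

type Status struct {
	Phase      string
	Reason     string
	Message    string
	Conditions []Condition
}

// kubeletOwned is an assumed copy of the kubelet-owned condition types.
var kubeletOwned = map[string]bool{
	"PodScheduled": true, "Ready": true, "Initialized": true, "ContainersReady": true,
}

func isTerminal(phase string) bool { return phase == "Succeeded" || phase == "Failed" }

// merge follows the same steps as mergePodStatus, on simplified types.
func merge(oldS, newS Status, couldHaveRunningContainers bool) Status {
	conds := make([]Condition, 0, len(oldS.Conditions)+len(newS.Conditions))
	for _, c := range oldS.Conditions { // non-kubelet Conditions come from the apiserver copy
		if !kubeletOwned[c.Type] {
			conds = append(conds, c)
		}
	}
	for _, c := range newS.Conditions { // kubelet Conditions come from the kubelet copy
		if kubeletOwned[c.Type] {
			conds = append(conds, c)
		}
	}
	newS.Conditions = conds // a fresh slice, so the caller's slices are never modified
	// Hold back the terminal phase while containers may still be running.
	if !isTerminal(oldS.Phase) && isTerminal(newS.Phase) && couldHaveRunningContainers {
		newS.Phase, newS.Reason, newS.Message = oldS.Phase, oldS.Reason, oldS.Message
	}
	// In a terminal phase, the Pod and its containers can no longer be ready.
	if isTerminal(newS.Phase) {
		for i := range newS.Conditions {
			if t := newS.Conditions[i].Type; t == "Ready" || t == "ContainersReady" {
				newS.Conditions[i].Status = "False"
			}
		}
	}
	return newS
}

func main() {
	apiserver := Status{Phase: "Running", Conditions: []Condition{
		{Type: "example.com/gate", Status: "True"}, {Type: "Ready", Status: "False"},
	}}
	kubelet := Status{Phase: "Succeeded", Conditions: []Condition{
		{Type: "example.com/gate", Status: "False"}, {Type: "Ready", Status: "True"},
	}}
	fmt.Printf("%+v\n", merge(apiserver, kubelet, true))
	fmt.Printf("%+v\n", merge(apiserver, kubelet, false))
}
```

In the first call the Pod stays Running because containers might still be running. The custom example.com/gate Condition keeps the apiserver's value, since the kubelet does not own it. In the second call the phase becomes Succeeded and Ready is forced to False.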
3.1.2. syncBatch
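Now let's look at what syncBatch actually does:
- 1. Clean up apiStatusVersions: drop entries whose Pod is neither in the status cache nor a known mirror Pod, to free memory
- 2. For every cached status, resolve the UID used on the apiserver side. A static Pod is synced through its mirror Pod, and a static Pod that has no mirror Pod is skipped. Queue a sync request if needsUpdate says so, or if needsReconcile finds that the cached status differs from the status held by the PodManager (which comes from the apiserver). In the latter case, delete the recorded version first so that the sync is forced
- 3. After releasing the read lock, call syncPod for each queued request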
// pkg/kubelet/status/status_manager.go
func (m *manager) syncBatch() {
	var updatedStatuses []podStatusSyncRequest
	// Static Pod UID <-> mirror Pod UID mappings from the PodManager
	podToMirror, mirrorToPod := m.podManager.GetUIDTranslations()
	func() { // Critical section
		m.podStatusesLock.RLock()
		defer m.podStatusesLock.RUnlock()
		// A Pod that is neither in the StatusManager cache nor a known mirror Pod was deleted long ago; remove its entry to free memory
		for uid := range m.apiStatusVersions {
			_, hasPod := m.podStatuses[types.UID(uid)]
			_, hasMirror := mirrorToPod[uid]
			if !hasPod && !hasMirror {
				delete(m.apiStatusVersions, uid)
			}
		}
		// Walk all cached Pod statuses and check which ones need to be synced
		for uid, status := range m.podStatuses {
			syncedUID := kubetypes.MirrorPodUID(uid)
			// A static Pod's status is written to the apiserver through its mirror Pod
			if mirrorUID, ok := podToMirror[kubetypes.ResolvedPodUID(uid)]; ok {
				if mirrorUID == "" {
					// The static Pod has no mirror Pod, so there is no apiserver object to update
					klog.V(5).InfoS("Static pod does not have a corresponding mirror pod; skipping",
						"podUID", uid,
						"pod", klog.KRef(status.podNamespace, status.podName))
					continue
				}
				syncedUID = mirrorUID
			}
			if m.needsUpdate(types.UID(syncedUID), status) {
				updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
			} else if m.needsReconcile(uid, status.status) {
				// The apiserver's status differs from the cache: forget the recorded version so that syncPod is forced to write it again
				delete(m.apiStatusVersions, syncedUID)
				updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
			}
		}
	}()
	// Sync outside the lock
	for _, update := range updatedStatuses {
		klog.V(5).InfoS("Status Manager: syncPod in syncbatch", "podUID", update.podUID)
		m.syncPod(update.podUID, update.status)
	}
}
3.2. SetPodStatus
TODO
3.3. SetContainerReadiness
TODO
3.4. SetContainerStartup
TODO
3.5. TerminatePod
TODO
3.6. RemoveOrphanedStatuses
TODO