Source Code Analysis of the kubelet Startup Flow


Overview

Abstract: This article summarizes the role and principles of the kubelet and, assuming some basic familiarity, reads through the kubelet source code to analyze the component's startup flow.

Main Text

The Role of the kubelet

Here is a brief summary of what the kubelet does.

  • Node management

    • Node registration

    • Node status updates

  • Container management (pod lifecycle management)

    • Watching the apiserver for container events

    • Container creation and deletion (CRI)

    • Container network creation and deletion (CNI)

    • Container status monitoring

    • Pod eviction

  • Monitoring

    • cAdvisor
    • healthz

How the kubelet Works

[Figure: kubelet internal component structure (image sourced from the web; contact the author in case of infringement)]

As the kubelet internal component diagram above shows, the kubelet is composed of many internal components:

  • Kubelet API: includes the authenticated API on port 10250, the cAdvisor API on port 4194, the read-only API on port 10255, and the health-check API on port 10248
  • syncLoop: receives Pod updates from the API or the manifest directory and dispatches them to podWorkers; it makes heavy use of channels to handle asynchronous requests
  • auxiliary managers, such as cAdvisor, PLEG, and the Volume Manager, which handle work outside the syncLoop
  • CRI: the container runtime interface, responsible for communicating with the container runtime shim
  • container runtimes, such as dockershim and rkt
  • network plugins; CNI and kubenet are currently supported

kubelet Startup Parameters


/usr/local/bin/kubelet
 --address=<node-name>
 # port on which the kubelet's API server listens (the apiserver connects to the kubelet on this port)
 --port=10250
 --healthz-bind-address=0.0.0.0
 # port for the read-only API
 --read-only-port=10255
 # hostname to use when registering the node
 --hostname_override=<node-name>
 # Important! Path to the kubeletconfig configuration file
 --config=/home/kube/kubernetes/conf/config.yaml
 # image to use for the pause (pod infra) container
 --pod-infra-container-image=mirrors.myoas.com/nebula-docker/seg/pod/pause:3.1
 # root directory where the kubelet stores its data and files on the node
 --root-dir=/home/kube/kubernetes/lib/kubelet
 # maximum number of pods that can run on this host
 --max-pods=60
 # log verbosity level; 4 includes debug-level detail
 --v=4
 # CNI network plugin settings
 --network-plugin=cni
 --cni-conf-dir=/etc/cni/net.d
 --cni-bin-dir=/opt/cni/bin
 # maximum period between synchronizing running containers and config (default 1m0s)
 --sync-frequency=5s
 # path to the kubeconfig used to authenticate to the apiserver, e.g. for node registration
 --kubeconfig=/home/kube/kubernetes/conf/kubelet.kubeconfig
 # directory holding the TLS certificates
 --cert-dir=/home/kube/ssl/pkc
 # resources reserved for system daemons
 --system-reserved=cpu=2000m,memory=20000Mi
 # allow the kubelet to start even if swap is enabled on the host
 --fail-swap-on=false
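
All of these flags, together with the file passed via --config, ultimately populate a single KubeletConfiguration object. As a minimal illustrative sketch (not kubelet code; it assumes the k8s.io/kubelet/config/v1beta1 API package and reuses the config path from the command line above), you could decode such a file yourself:

package main

import (
	"fmt"
	"io/ioutil"

	kubeletv1beta1 "k8s.io/kubelet/config/v1beta1"
	"sigs.k8s.io/yaml"
)

func main() {
	// Read the file that would normally be passed to the kubelet via --config.
	data, err := ioutil.ReadFile("/home/kube/kubernetes/conf/config.yaml")
	if err != nil {
		panic(err)
	}
	var cfg kubeletv1beta1.KubeletConfiguration
	// sigs.k8s.io/yaml converts YAML to JSON and unmarshals it into the struct.
	if err := yaml.Unmarshal(data, &cfg); err != nil {
		panic(err)
	}
	fmt.Println("maxPods:", cfg.MaxPods, "cgroupDriver:", cfg.CgroupDriver)
}

The kubelet itself performs an equivalent decode in loadConfigFile, which we will see later in NewKubeletCommand.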

Ports the kubelet Listens On

By default the kubelet listens on three ports: 10250, 10255, and 10248 (in some Kubernetes versions the kubelet also exposes cAdvisor's port 4194).

  • 10250: the port of the kubelet's API server, used in its communication with the apiserver; through this port you can retrieve node resources and status.

  • 10255: exposes pod, node, metrics, and cAdvisor information through read-only interfaces; accessing this port requires neither authentication nor authorization.

    root@ubuntu:~# curl http://10.234.12.78:10255/stats/summary
    {
     "node": {
      "nodeName": "10.234.12.78",
      "systemContainers": [
       {
        "name": "pods",
        "startTime": "2024-09-07T06:00:48Z",
        "cpu": {
         "time": "2024-09-07T11:49:09Z",
         "usageNanoCores": 12571621,
         "usageCoreNanoSeconds": 261530338504
        },
     // output truncated
    }
    
root@ubuntu:~# curl http://10.234.12.77:10255/metrics  |head -10
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0# HELP apiserver_audit_event_total [ALPHA] Counter of audit events generated and sent to the audit backend.
# TYPE apiserver_audit_event_total counter
apiserver_audit_event_total 0
# HELP apiserver_audit_requests_rejected_total [ALPHA] Counter of apiserver requests rejected due to an error in audit logging backend.
# TYPE apiserver_audit_requests_rejected_total counter
apiserver_audit_requests_rejected_total 0
# HELP apiserver_client_certificate_expiration_seconds [ALPHA] Distribution of the remaining lifetime on the certificate used to authenticate a request.
# TYPE apiserver_client_certificate_expiration_seconds histogram
apiserver_client_certificate_expiration_seconds_bucket{le="0"} 0
apiserver_client_certificate_expiration_seconds_bucket{le="1800"} 0
// output truncated

root@ubuntu:~# curl http://10.234.12.77:4194/metrics  |head -10
# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision.
# TYPE cadvisor_version_info gauge
cadvisor_version_info{cadvisorRevision="511ec9ef",cadvisorVersion="v0.33.0",dockerVersion="18.09.7",kernelVersion="4.15.0-147-generic",osVersion="Alpine Linux v3.8"} 1
# HELP container_cpu_cfs_periods_total Number of elapsed enforcement period intervals.
# TYPE container_cpu_cfs_periods_total counter
container_cpu_cfs_periods_total{container_label_annotation_io_kubernetes_container_hash="",container_label_annotation_io_kubernetes_container_ports="",container_label_annotation_io_kubernetes_container_restartCount="",container_label_annotation_io_kubernetes_container_terminationMessagePath="",container_label_annotation_io_kubernetes_container_terminationMessagePolicy="",container_label_annotation_io_kubernetes_pod_terminationGracePeriod="",container_label_annotation_kubernetes_io_config_seen="",container_label_annotation_kubernetes_io_config_source="",container_label_app="",container_label_controller_revision_hash="",container_label_io_kubernetes_container_logpath="",container_label_io_kubernetes_container_name="",container_label_io_kubernetes_docker_type="",container_label_io_kubernetes_pod_name="",container_label_io_kubernetes_pod_namespace="",container_label_io_kubernetes_pod_uid="",container_label_io_kubernetes_sandbox_id="",container_label_pod_template_generation="",id="/kubepods/burstable/podc387ad42-a56c-4e29-bc78-0264f44614b7",image="",name=""} 212064 1725710124084
// output truncated
  • 10248: querying this port tells you whether the kubelet is working properly; the listen address and port are specified with the kubelet startup flags --healthz-port and --healthz-bind-address.

    root@ubuntu:~# curl http://10.234.12.78:10248/healthz
    ok
    

Querying Node Summary Information

  • Inside the cluster, you can query the kubelet's port 10255 directly:
curl http://<node-name>:10255/stats/summary
  • Query kubelet health:
curl http://<node-name>:10248/healthz
  • Query kubelet metrics through the apiserver proxy:
curl http://<master>:58201/api/v1/nodes/<node-name>/proxy/metrics
  • Query cAdvisor metrics:
curl http://<master>:58201/api/v1/nodes/<node-name>/proxy/metrics/cadvisor
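
The same proxy queries can be issued programmatically. Below is a minimal client-go sketch, assuming a kubeconfig at /root/.kube/config and the node name 10.234.12.78 (both placeholders), and a client-go version recent enough that DoRaw takes a context:

package main

import (
	"context"
	"fmt"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	// Equivalent to GET /api/v1/nodes/<node-name>/proxy/stats/summary on the apiserver.
	raw, err := clientset.CoreV1().RESTClient().Get().
		Resource("nodes").Name("10.234.12.78").
		SubResource("proxy").Suffix("stats/summary").
		DoRaw(context.TODO())
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))
}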

Pod Creation Flow

[Figure: typical pod creation flow (image sourced from the web; contact the author in case of infringement)]

The figure above shows a typical pod creation flow. Through its syncLoop, the kubelet learns from the apiserver that a pod has been scheduled onto this node; it then interacts with dockershim to create the containers, set up CNI networking, and prepare the images. Once the containers are created, the kubelet reports their status back to the apiserver.
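
To make the kubelet-to-runtime leg of this flow concrete, here is a rough sketch of the CRI gRPC sequence behind creating a pod's containers, using the v1alpha2 CRI API of the v1.18 era. This is illustrative rather than kubelet code: the socket path, metadata, and image are placeholder assumptions, and the real kubelet also pulls images, passes a full sandbox config, and handles many more fields.

package main

import (
	"context"

	"google.golang.org/grpc"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)

func main() {
	ctx := context.Background()
	// dockershim's default unix socket in the v1.18 era; depending on the
	// grpc-go version, a custom dialer may be needed for unix sockets.
	conn, err := grpc.Dial("unix:///var/run/dockershim.sock", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	rt := runtimeapi.NewRuntimeServiceClient(conn)

	// 1. Create the pod sandbox (pause container + network namespace; CNI setup happens here).
	sb, err := rt.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
		Config: &runtimeapi.PodSandboxConfig{
			Metadata: &runtimeapi.PodSandboxMetadata{Name: "demo", Namespace: "default", Uid: "uid-1"},
		},
	})
	if err != nil {
		panic(err)
	}

	// 2. Create the application container inside the sandbox (sandbox config omitted here).
	c, err := rt.CreateContainer(ctx, &runtimeapi.CreateContainerRequest{
		PodSandboxId: sb.PodSandboxId,
		Config: &runtimeapi.ContainerConfig{
			Metadata: &runtimeapi.ContainerMetadata{Name: "app"},
			Image:    &runtimeapi.ImageSpec{Image: "nginx:latest"},
		},
	})
	if err != nil {
		panic(err)
	}

	// 3. Start the container; the kubelet then reports its status back to the apiserver.
	if _, err := rt.StartContainer(ctx, &runtimeapi.StartContainerRequest{ContainerId: c.ContainerId}); err != nil {
		panic(err)
	}
}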

kubelet Startup Source Code Analysis

Note: this analysis is based on the Kubernetes v1.18.0 source code.

With a basic understanding of the kubelet in place, we can now formally begin the source-code analysis of its startup flow. The source lives in k8s.io/kubernetes/cmd/kubelet/kubelet.go.

kubeletconfig

Before diving into the source, let's examine the configuration file the kubelet uses: the kubeletconfig, which contains the important parameters the kubelet runs with.

Viewing the contents of the kubeletconfig

  • Method 1: query the kubelet directly

root@ubuntu:~# curl -X GET https://10.234.12.78:10250/configz -k |jq

  • Method 2: query indirectly through kubectl proxy

root@ubuntu:~# kubectl proxy
Starting to serve on 127.0.0.1:8001

Then fetch the kubeletconfig through the proxy:

curl -X GET http://127.0.0.1:8001/api/v1/nodes/<node-name>/proxy/configz | jq .

A full kubeletconfig looks like this:


root@ubuntu:~# curl -X GET http://127.0.0.1:8001/api/v1/nodes/<node-name>/proxy/configz | jq .
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1985  100  1985    0     0   351k      0 --:--:-- --:--:-- --:--:--  387k
{
  "kubeletconfig": {
    "staticPodPath": "/etc/kubernetes/manifests",
    "syncFrequency": "5s",
    "fileCheckFrequency": "20s",
    "httpCheckFrequency": "20s",
    "address": "<node-name>",
    "port": 10250,
    "tlsCertFile": "/home/kube/ssl/pkc/kubelet.crt",
    "tlsPrivateKeyFile": "/home/kube/ssl/pkc/kubelet.key",
    "rotateCertificates": true,
    "authentication": {
      "x509": {
        "clientCAFile": "/home/kube/ssl/pkc/ca.crt"
      },
      "webhook": {
        "enabled": true,
        "cacheTTL": "2m0s"
      },
      "anonymous": {
        "enabled": true
      }
    },
    "authorization": {
      "mode": "Webhook",
      "webhook": {
        "cacheAuthorizedTTL": "5m0s",
        "cacheUnauthorizedTTL": "30s"
      }
    },
    "registryPullQPS": 5,
    "registryBurst": 10,
    "eventRecordQPS": 5,
    "eventBurst": 10,
    "enableDebuggingHandlers": true,
    "healthzPort": 10248,
    "healthzBindAddress": "0.0.0.0",
    "oomScoreAdj": -999,
    "clusterDomain": "cluster.local",
    "clusterDNS": [
      "10.96.0.10"
    ],
    "streamingConnectionIdleTimeout": "4h0m0s",
    "nodeStatusUpdateFrequency": "10s",
    "nodeStatusReportFrequency": "1m0s",
    "nodeLeaseDurationSeconds": 40,
    "imageMinimumGCAge": "2m0s",
    "imageGCHighThresholdPercent": 85,
    "imageGCLowThresholdPercent": 80,
    "volumeStatsAggPeriod": "1m0s",
    "cgroupsPerQOS": true,
    "cgroupDriver": "cgroupfs",
    "cpuManagerPolicy": "none",
    "cpuManagerReconcilePeriod": "10s",
    "topologyManagerPolicy": "none",
    "runtimeRequestTimeout": "2m0s",
    "hairpinMode": "promiscuous-bridge",
    "maxPods": 60,
    "podPidsLimit": -1,
    "resolvConf": "/etc/resolv.conf",
    "cpuCFSQuota": true,
    "cpuCFSQuotaPeriod": "100ms",
    "maxOpenFiles": 1000000,
    "contentType": "application/vnd.kubernetes.protobuf",
    "kubeAPIQPS": 5,
    "kubeAPIBurst": 10,
    "serializeImagePulls": true,
    "evictionHard": {
      "imagefs.available": "15%",
      "memory.available": "100Mi",
      "nodefs.available": "10%",
      "nodefs.inodesFree": "5%"
    },
    "evictionPressureTransitionPeriod": "5m0s",
    "enableControllerAttachDetach": true,
    "makeIPTablesUtilChains": true,
    "iptablesMasqueradeBit": 14,
    "iptablesDropBit": 15,
    "failSwapOn": false,
    "containerLogMaxSize": "10Mi",
    "containerLogMaxFiles": 5,
    "configMapAndSecretChangeDetectionStrategy": "Watch",
    "systemReserved": {
      "cpu": "2000m",
      "memory": "20000Mi"
    },
    "enforceNodeAllocatable": [
      "pods"
    ]
  }
}

main

The kubelet's startup entry point is main(), located in kubernetes/cmd/kubelet/kubelet.go.

Like the other Kubernetes components, the kubelet uses the cobra framework for command-line argument parsing.

func main() {
	rand.Seed(time.Now().UnixNano())
	// build the kubelet command via app.NewKubeletCommand()
	command := app.NewKubeletCommand()
	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}

NewKubeletCommand

The logic of NewKubeletCommand includes:

  1. Read parameters from the command-line flags and from the kubeletconfig

  2. Validate the flags and the configuration file

  3. Initialize the default featureGate configuration.

    The available feature gates are listed in the official Kubernetes feature-gates documentation

  4. Build a KubeletServer from the command-line flags and the configuration-file parameters

  5. Use the KubeletServer to build kubeletDeps; kubeletDeps contains the configuration the kubelet needs to run

  6. Pass the featureGate, KubeletServer, and kubeletDeps created above into the Run() function to start the kubelet


// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand() *cobra.Command {
	cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
	cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
  
  // read configuration from the command-line flags
	kubeletFlags := options.NewKubeletFlags()
  // read configuration from the kubeletconfig file, specified via --config=xxx
	kubeletConfig, err := options.NewKubeletConfiguration()
	// programmer error
	if err != nil {
		klog.Fatal(err)
	}

	cmd := &cobra.Command{
		Use: componentKubelet,
		Long: `The kubelet is the primary "node agent" that runs on each
node. It can register the node with the apiserver using one of: the hostname; `, // description truncated in this excerpt
		DisableFlagParsing: true,
		Run: func(cmd *cobra.Command, args []string) {
			// initial flag parse, since we disable cobra's flag parsing
      // parse the command-line arguments
			if err := cleanFlagSet.Parse(args); err != nil {
				cmd.Usage()
				klog.Fatal(err)
			}

			// check if there are non-flag arguments in the command line
			cmds := cleanFlagSet.Args()
			if len(cmds) > 0 {
				cmd.Usage()
				klog.Fatalf("unknown command: %s", cmds[0])
			}

			// short-circuit on help
			help, err := cleanFlagSet.GetBool("help")
			if err != nil {
				klog.Fatal(`"help" flag is non-bool, programmer error, please correct`)
			}
			if help {
				cmd.Help()
				return
			}

			// short-circuit on verflag
			verflag.PrintAndExitIfRequested()
			utilflag.PrintFlags(cleanFlagSet)

			// set feature gates from initial flags-based config
      // initialize the featureGate configuration
			if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
				klog.Fatal(err)
			}

			// validate the initial KubeletFlags
      // validate the command-line flags
			if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
				klog.Fatal(err)
			}
			// with a remote container runtime, --pod-infra-container-image is ignored by the kubelet; the pause image should be configured on the runtime side instead
			if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
				klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
			}

			// load kubelet config file, if provided
      // load the kubelet config file (--config), if provided
			if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
				kubeletConfig, err = loadConfigFile(configFile)
				if err != nil {
					klog.Fatal(err)
				}
				// We must enforce flag precedence by re-parsing the command line into the new object.
				// This is necessary to preserve backwards-compatibility across binary upgrades.
				// See issue #56171 for more details.
				if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {
					klog.Fatal(err)
				}
				// update feature gates based on new config
				if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
					klog.Fatal(err)
				}
			}

			// We always validate the local configuration (command line + config file).
			// This is the default "last-known-good" config for dynamic config, and must always remain valid.
      // validate the kubeletconfig
			if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
				klog.Fatal(err)
			}

			// use dynamic kubelet config, if enabled
      // handle dynamic kubelet configuration
			var kubeletConfigController *dynamickubeletconfig.Controller
			if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
				var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
				dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
					func(kc *kubeletconfiginternal.KubeletConfiguration) error {
						return kubeletConfigFlagPrecedence(kc, args)
					})
				if err != nil {
					klog.Fatal(err)
				}
				// If we should just use our existing, local config, the controller will return a nil config
				if dynamicKubeletConfig != nil {
					kubeletConfig = dynamicKubeletConfig
					// Note: flag precedence was already enforced in the controller, prior to validation,
					// by our above transform function. Now we simply update feature gates from the new config.
					if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
						klog.Fatal(err)
					}
				}
			}

			// construct a KubeletServer from kubeletFlags and kubeletConfig
      // build a KubeletServer object from the flags and kubeletconfig above
			kubeletServer := &options.KubeletServer{
				KubeletFlags:         *kubeletFlags,
				KubeletConfiguration: *kubeletConfig,
			}

			// use kubeletServer to construct the default KubeletDeps
      // use kubeletServer to build kubeletDeps, the dependencies the kubelet needs to start
			kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
			if err != nil {
				klog.Fatal(err)
			}

			// add the kubelet config controller to kubeletDeps
			kubeletDeps.KubeletConfigController = kubeletConfigController

			// set up stopCh here in order to be reused by kubelet and docker shim
			stopCh := genericapiserver.SetupSignalHandler()

			// start the experimental docker shim, if enabled
			if kubeletServer.KubeletFlags.ExperimentalDockershim {
				if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
					klog.Fatal(err)
				}
				return
			}

			// run the kubelet
      // start the kubelet
			klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
			if err := Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh); err != nil {
				klog.Fatal(err)
			}
		},
	}

	// keep cleanFlagSet separate, so Cobra doesn't pollute it with the global flags
	kubeletFlags.AddFlags(cleanFlagSet)
	options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
	options.AddGlobalFlags(cleanFlagSet)
	cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name()))

	// ugly, but necessary, because Cobra's default UsageFunc and HelpFunc pollute the flagset with global flags
	const usageFmt = "Usage:\n  %s\n\nFlags:\n%s"
	cmd.SetUsageFunc(func(cmd *cobra.Command) error {
		fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
		return nil
	})
	cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
		fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
	})

	return cmd
}

Run

Run runs the specified KubeletServer with the given Dependencies, and it should never exit. The kubeDeps argument may be nil; if it is, it is initialized from the settings on the KubeletServer. Otherwise the caller is assumed to have set up the Dependencies object, and a default one will not be generated.

As the source shows, Run performs some handling when the OS is Windows and then immediately enters the run method.

// Run runs the specified KubeletServer with the given Dependencies. This should never exit.
// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
// not be generated.
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) error {
	// To help debugging, immediately log version
	klog.Infof("Version: %+v", version.Get())
  // perform some OS-specific initialization when running on Windows
	if err := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass); err != nil {
		return fmt.Errorf("failed OS init: %v", err)
	}
  // delegate to the run method for further processing
	if err := run(s, kubeDeps, featureGate, stopCh); err != nil {
		return fmt.Errorf("failed to run Kubelet: %v", err)
	}
	return nil
}

run

The run() function is long; in summary, its work includes:

  • Set the kubelet's default FeatureGates; all of the kubelet's FeatureGates can be inspected via its command-line flags. Feature gates in the Alpha state are disabled by default at component startup, while those in Beta and GA state are enabled by default;
  • Validate the kubelet's parameters;
  • Register the current configuration file at the HTTP server's /configz URL, so the kubeletconfig can be inspected with curl -X GET https://127.0.0.1:10250/configz -k;
  • Check whether the kubelet is started in standalone mode; in this mode it does not interact with the apiserver and it is mainly used for debugging the kubelet;
  • Initialize kubeDeps, which contains the kubelet's dependencies, chiefly KubeClient, EventClient, HeartbeatClient, Auth, cadvisor, and ContainerManager;
  • Configure the cgroup root directories (specified via --cgroup-root=). KubeletCgroups specifies the cgroup for the kubelet process itself; RuntimeCgroups specifies the cgroup for the container runtime; SystemCgroups specifies the cgroup in which remaining non-kernel system processes are placed;
  • Check whether the kubelet is started as the root user;
  • Set the process's OOM score, -999 by default; the score range is [-1000, 1000], and the lower the score, the less likely the process is to be killed (a minimal sketch of the underlying mechanism follows this list);
  • Call the RunKubelet method;
  • Check whether the kubelet has dynamic configuration enabled;
  • Start the healthz HTTP server, on port 10248 by default;
  • If started via systemd, notify systemd that the kubelet has started.
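
As referenced in the OOM-score item above, on Linux the adjustment ultimately comes down to writing /proc/<pid>/oom_score_adj. A minimal sketch of that mechanism (not the kubelet's own oomAdjuster implementation):

package main

import (
	"fmt"
	"io/ioutil"
	"os"
)

// applyOOMScoreAdj writes a score into /proc/<pid>/oom_score_adj; pid 0 means "this process".
func applyOOMScoreAdj(pid, score int) error {
	path := "/proc/self/oom_score_adj"
	if pid != 0 {
		path = fmt.Sprintf("/proc/%d/oom_score_adj", pid)
	}
	return ioutil.WriteFile(path, []byte(fmt.Sprintf("%d", score)), 0644)
}

func main() {
	// -999 mirrors the kubelet default: very unlikely to be OOM-killed, yet still
	// above critical system processes that sit at -1000.
	if err := applyOOMScoreAdj(0, -999); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}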

func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) (err error) {
	// Set global feature gates based on the value on the initial KubeletServer
  // set the global feature gates
	err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
	if err != nil {
		return err
	}
	// validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
  // validate the initial KubeletServer
	if err := options.ValidateKubeletServer(s); err != nil {
		return err
	}

	// Obtain Kubelet Lock File
	if s.ExitOnLockContention && s.LockFilePath == "" {
		return errors.New("cannot exit on lock file contention: no lock file specified")
	}
	done := make(chan struct{})
	if s.LockFilePath != "" {
		klog.Infof("acquiring file lock on %q", s.LockFilePath)
		if err := flock.Acquire(s.LockFilePath); err != nil {
			return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
		}
		if s.ExitOnLockContention {
			klog.Infof("watching for inotify events for: %v", s.LockFilePath)
			if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
				return err
			}
		}
	}

	// Register current configuration with /configz endpoint
  // register the current configuration at the /configz endpoint; fetch it with `curl -X GET https://<nodename>:10250/configz -k`
	err = initConfigz(&s.KubeletConfiguration)
	if err != nil {
		klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
	}

	if len(s.ShowHiddenMetricsForVersion) > 0 {
		metrics.SetShowHidden()
	}

	// About to get clients and such, detect standaloneMode
	standaloneMode := true
	if len(s.KubeConfig) > 0 {
		standaloneMode = false
	}
	
  // initialize kubeDeps
	if kubeDeps == nil {
		kubeDeps, err = UnsecuredDependencies(s, featureGate)
		if err != nil {
			return err
		}
	}

	if kubeDeps.Cloud == nil {
		if !cloudprovider.IsExternal(s.CloudProvider) {
			cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
			if err != nil {
				return err
			}
			if cloud == nil {
				klog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
			} else {
				klog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
			}
			kubeDeps.Cloud = cloud
		}
	}

  // resolve hostName and nodeName
	hostName, err := nodeutil.GetHostname(s.HostnameOverride)
	if err != nil {
		return err
	}
	nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
	if err != nil {
		return err
	}

	// if in standalone mode, indicate as much by setting all clients to nil
  // in standalone mode, set KubeClient, EventClient, and HeartbeatClient to nil
	switch {
	case standaloneMode:
		kubeDeps.KubeClient = nil
		kubeDeps.EventClient = nil
		kubeDeps.HeartbeatClient = nil
		klog.Warningf("standalone mode, no API client")

  // initialize KubeClient, EventClient, and HeartbeatClient
	case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
		clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
		if err != nil {
			return err
		}
		if closeAllConns == nil {
			return errors.New("closeAllConns must be a valid function other than nil")
		}
		kubeDeps.OnHeartbeatFailure = closeAllConns

		kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
		if err != nil {
			return fmt.Errorf("failed to initialize kubelet client: %v", err)
		}

		// make a separate client for events
		eventClientConfig := *clientConfig
		eventClientConfig.QPS = float32(s.EventRecordQPS)
		eventClientConfig.Burst = int(s.EventBurst)
		kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
		if err != nil {
			return fmt.Errorf("failed to initialize kubelet event client: %v", err)
		}

		// make a separate client for heartbeat with throttling disabled and a timeout attached
		heartbeatClientConfig := *clientConfig
		heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
		// The timeout is the minimum of the lease duration and status update frequency
		leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
		if heartbeatClientConfig.Timeout > leaseTimeout {
			heartbeatClientConfig.Timeout = leaseTimeout
		}

		heartbeatClientConfig.QPS = float32(-1)
		kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
		if err != nil {
			return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)
		}
	}

  // initialize the Auth module
	if kubeDeps.Auth == nil {
		auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
		if err != nil {
			return err
		}
		kubeDeps.Auth = auth
		runAuthenticatorCAReload(stopCh)
	}
	
  // Configure cgroupRoots. (Linux limits container resource usage via cgroups; cgroupRoot is the
  // corresponding cgroup root directory, /sys/fs/cgroup/systemd/ by default with the systemd driver.)

  // KubeletCgroups specifies the cgroup for the kubelet process itself.
  // RuntimeCgroups specifies the cgroup for the container runtime.
  // SystemCgroups specifies the cgroup in which remaining non-kernel system processes are placed.
  // These options allow cgroup-level resource limiting and isolation for the kubelet itself and the
  // components around it, and can be customized to meet specific deployment requirements.
	var cgroupRoots []string
  // systemd is one kind of cgroup driver
	cgroupRoots = append(cgroupRoots, cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupDriver))
	kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
	if err != nil {
		klog.Warningf("failed to get the kubelet's cgroup: %v.  Kubelet system container metrics may be missing.", err)
	} else if kubeletCgroup != "" {
		cgroupRoots = append(cgroupRoots, kubeletCgroup)
	}

	runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
	if err != nil {
		klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)
	} else if runtimeCgroup != "" {
		// RuntimeCgroups is optional, so ignore if it isn't specified
		cgroupRoots = append(cgroupRoots, runtimeCgroup)
	}

	if s.SystemCgroups != "" {
		// SystemCgroups is optional, so ignore if it isn't specified
		cgroupRoots = append(cgroupRoots, s.SystemCgroups)
	}

  // configure cAdvisor
	if kubeDeps.CAdvisorInterface == nil {
		imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
		kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
		if err != nil {
			return err
		}
	}

	// Setup event recorder if required.
  // set up the event recorder
	makeEventRecorder(kubeDeps, nodeName)

  // initialize the ContainerManager
	if kubeDeps.ContainerManager == nil {
		if s.CgroupsPerQOS && s.CgroupRoot == "" {
			klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified.  defaulting to /")
			s.CgroupRoot = "/"
		}

		var reservedSystemCPUs cpuset.CPUSet
		var errParse error
		if s.ReservedSystemCPUs != "" {
			reservedSystemCPUs, errParse = cpuset.Parse(s.ReservedSystemCPUs)
			if errParse != nil {
				// invalid cpu list is provided, set reservedSystemCPUs to empty, so it won't overwrite kubeReserved/systemReserved
				klog.Infof("Invalid ReservedSystemCPUs \"%s\"", s.ReservedSystemCPUs)
				return errParse
			}
			// is it safe do use CAdvisor here ??
			machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
			if err != nil {
				// if can't use CAdvisor here, fall back to non-explicit cpu list behavor
				klog.Warning("Failed to get MachineInfo, set reservedSystemCPUs to empty")
				reservedSystemCPUs = cpuset.NewCPUSet()
			} else {
				reservedList := reservedSystemCPUs.ToSlice()
				first := reservedList[0]
				last := reservedList[len(reservedList)-1]
				if first < 0 || last >= machineInfo.NumCores {
					// the specified cpuset is outside of the range of what the machine has
					klog.Infof("Invalid cpuset specified by --reserved-cpus")
					return fmt.Errorf("Invalid cpuset %q specified by --reserved-cpus", s.ReservedSystemCPUs)
				}
			}
		} else {
			reservedSystemCPUs = cpuset.NewCPUSet()
		}

		if reservedSystemCPUs.Size() > 0 {
			// at cmd option valication phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
			klog.Infof("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved=\"%v\", SystemReserved=\"%v\".", s.KubeReserved, s.SystemReserved)
			if s.KubeReserved != nil {
				delete(s.KubeReserved, "cpu")
			}
			if s.SystemReserved == nil {
				s.SystemReserved = make(map[string]string)
			}
			s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
			klog.Infof("After cpu setting is overwritten, KubeReserved=\"%v\", SystemReserved=\"%v\"", s.KubeReserved, s.SystemReserved)
		}
		kubeReserved, err := parseResourceList(s.KubeReserved)
		if err != nil {
			return err
		}
		systemReserved, err := parseResourceList(s.SystemReserved)
		if err != nil {
			return err
		}
		var hardEvictionThresholds []evictionapi.Threshold
		// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
		if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
			hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
			if err != nil {
				return err
			}
		}
		experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
		if err != nil {
			return err
		}

		devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)

		kubeDeps.ContainerManager, err = cm.NewContainerManager(
			kubeDeps.Mounter,
			kubeDeps.CAdvisorInterface,
			cm.NodeConfig{
				RuntimeCgroupsName:    s.RuntimeCgroups,
				SystemCgroupsName:     s.SystemCgroups,
				KubeletCgroupsName:    s.KubeletCgroups,
				ContainerRuntime:      s.ContainerRuntime,
				CgroupsPerQOS:         s.CgroupsPerQOS,
				CgroupRoot:            s.CgroupRoot,
				CgroupDriver:          s.CgroupDriver,
				KubeletRootDir:        s.RootDirectory,
				ProtectKernelDefaults: s.ProtectKernelDefaults,
				NodeAllocatableConfig: cm.NodeAllocatableConfig{
					KubeReservedCgroupName:   s.KubeReservedCgroup,
					SystemReservedCgroupName: s.SystemReservedCgroup,
					EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),
					KubeReserved:             kubeReserved,
					SystemReserved:           systemReserved,
					ReservedSystemCPUs:       reservedSystemCPUs,
					HardEvictionThresholds:   hardEvictionThresholds,
				},
				QOSReserved:                           *experimentalQOSReserved,
				ExperimentalCPUManagerPolicy:          s.CPUManagerPolicy,
				ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
				ExperimentalPodPidsLimit:              s.PodPidsLimit,
				EnforceCPULimits:                      s.CPUCFSQuota,
				CPUCFSQuotaPeriod:                     s.CPUCFSQuotaPeriod.Duration,
				ExperimentalTopologyManagerPolicy:     s.TopologyManagerPolicy,
			},
			s.FailSwapOn,
			devicePluginEnabled,
			kubeDeps.Recorder)

		if err != nil {
			return err
		}
	}
  // check whether we are running as root
	if err := checkPermissions(); err != nil {
		klog.Error(err)
	}

	utilruntime.ReallyCrash = s.ReallyCrashForTesting

	// TODO(vmarmol): Do this through container config.
  // apply the OOMScoreAdj value
	oomAdjuster := kubeDeps.OOMAdjuster
	if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
		klog.Warning(err)
	}

	err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,
		kubeDeps, &s.ContainerRuntimeOptions,
		s.ContainerRuntime,
		s.RuntimeCgroups,
		s.RemoteRuntimeEndpoint,
		s.RemoteImageEndpoint,
		s.NonMasqueradeCIDR)
	if err != nil {
		return err
	}
	// call RunKubelet to carry out the remaining startup work
	if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
		return err
	}

	// If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops
	if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
		kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
		if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
			return err
		}
	}
	// start the healthz health-check HTTP endpoint
	if s.HealthzPort > 0 {
		mux := http.NewServeMux()
		healthz.InstallHandler(mux)
		go wait.Until(func() {
			err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
			if err != nil {
				klog.Errorf("Starting healthz server failed: %v", err)
			}
		}, 5*time.Second, wait.NeverStop)
	}

	if s.RunOnce {
		return nil
	}

	// If systemd is used, notify it that we have started
  // if running under systemd, send it the READY signal
	go daemon.SdNotify(false, "READY=1")

	select {
	case <-done:
		break
	case <-stopCh:
		break
	}

	return nil
}

RunKubelet

run calls the RunKubelet method to perform the subsequent startup work. RunKubelet's tasks include:

  1. Initialize the default capabilities (privileged containers are allowed)
  2. Call createAndInitKubelet to initialize the kubelet component
  3. Check kubeDeps.PodConfig. kubeDeps.PodConfig manages and stores per-pod configuration inside the kubelet, giving the kubelet convenient access to pod configuration information in support of container management and scheduling in Kubernetes
  4. Set MaxOpenFiles, adjustable via the kubelet startup flag --max-open-files=
  5. Call startKubelet to start the components inside the kubelet
// RunKubelet is responsible for setting up and running a kubelet.  It is used in three different applications:
//   1 Integration tests
//   2 Kubelet binary
//   3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
	hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
	if err != nil {
		return err
	}
	// Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
	nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
	if err != nil {
		return err
	}
	// Setup event recorder if required.
	makeEventRecorder(kubeDeps, nodeName)
  // 1. allow privileged containers by default
	capabilities.Initialize(capabilities.Capabilities{
		AllowPrivileged: true,
	})

	credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
	klog.V(2).Infof("Using root directory: %v", kubeServer.RootDirectory)

	if kubeDeps.OSInterface == nil {
		kubeDeps.OSInterface = kubecontainer.RealOS{}
	}
	// 2. call createAndInitKubelet (arguments elided in this excerpt)
	k, err := createAndInitKubelet(.....)
	if err != nil {
		return fmt.Errorf("failed to create kubelet: %v", err)
	}

	// NewMainKubelet should have set up a pod source config if one didn't exist
	// when the builder was run. This is just a precaution.
  // 3. check kubeDeps.PodConfig
  // kubeDeps.PodConfig manages and stores per-pod configuration inside the kubelet, giving it
  // convenient access to pod configuration in support of container management and scheduling
	if kubeDeps.PodConfig == nil {
		return fmt.Errorf("failed to create kubelet, pod source config was nil")
	}
	podCfg := kubeDeps.PodConfig

  // 4. set MaxOpenFiles
	rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))

	// process pods and exit.
	if runOnce {
		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
			return fmt.Errorf("runonce failed: %v", err)
		}
		klog.Info("Started kubelet as runonce")
	} else {
    // 5. call startKubelet
		startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
		klog.Info("Started kubelet")
	}
	return nil
}

createAndInitKubelet

createAndInitKubelet mainly calls three methods to complete kubelet initialization:

  • kubelet.NewMainKubelet creates a new kubelet instance from the supplied configuration. It takes inputs such as the node name, the configuration object, and the master's network address, among other options; it initializes the kubelet's various subsystems and dependencies and returns a pointer to the kubelet instance
  • k.BirthCry() sends the apiserver an event announcing that the kubelet has started;
  • k.StartGarbageCollection() starts garbage collection, reclaiming containers and images;
func createAndInitKubelet(....) (k kubelet.Bootstrap, err error) {
	// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
	// up into "per source" synchronizations
  // instantiate the kubelet object and initialize all the modules it depends on
	k, err = kubelet.NewMainKubelet(....)
	if err != nil {
		return nil, err
	}
	// send the apiserver an event announcing that the kubelet has started
	k.BirthCry()
	
  // start garbage collection of containers and images
	k.StartGarbageCollection()

	return k, nil
}

startKubelet

  1. k.Run() starts all of the kubelet's modules and the main flow
  2. Start the kubelet server, whose HTTP listen port defaults to 10250
  3. Start the read-only status HTTP service, on port 10255 by default
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
	// start the kubelet
  // start all the kubelet's modules and the main flow
	go k.Run(podCfg.Updates())

	// start the kubelet server
  // start the kubelet server, listening on port 10250 by default
	if enableServer {
		go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)

	}
  // read-only status API, port 10255 by default
	if kubeCfg.ReadOnlyPort > 0 {
		go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
	}
  // if the KubeletPodResources feature gate is enabled, serve the pod-resources endpoint
	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
		go k.ListenAndServePodResources()
	}
}

Kubelet.Run

The Run method is the core of kubelet startup: it launches the kubelet's dependent modules as well as the main loop. Its main logic is:

  • 1. Register the logServer;
  • 2. Decide whether the cloud provider sync manager needs to start;
  • 3. Call kl.initializeModules to first start the modules that do not depend on the container runtime;
  • 4. Start the volume manager;
  • 5. Run kl.syncNodeStatus to periodically sync node status;
  • 6. Call kl.fastStatusUpdateOnce to update the container runtime start time and perform the first status sync;
  • 7. Decide whether the NodeLease mechanism is enabled;
  • 8. Run kl.updateRuntimeUp to periodically refresh the runtime status;
  • 9. Run kl.syncNetworkUtil to periodically sync iptables rules;
  • 10. Run kl.podKiller to periodically clean up abnormal pods; when a pod has not been handled properly by a podworker, a goroutine is started to kill it;
  • 11. Start the statusManager;
  • 12. Start the probeManager;
  • 13. Start the runtimeClassManager;
  • 14. Start the PLEG;
  • 15. Call kl.syncLoop to watch for pod changes.

// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
  // set up the logServer
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
  // check the kubeClient
	if kl.kubeClient == nil {
		klog.Warning("No api server defined - no node status update will be sent.")
	}

	// Start the cloud provider sync manager
  // start the cloudResourceSyncManager
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}

  // initializeModules() starts the imageManager, serverCertificateManager, oomWatcher, and resourceAnalyzer
	if err := kl.initializeModules(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		klog.Fatal(err)
	}

	// Start volume manager
  // start the volumeManager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
    // periodically sync node status
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
    // kl.fastStatusUpdateOnce updates the container runtime start time and performs the first status sync
		go kl.fastStatusUpdateOnce()

		// start syncing lease
    // start the nodeLease controller; node leases are a node health-check mechanism
		go kl.nodeLeaseController.Run(wait.NeverStop)
	}
  // kl.updateRuntimeUp periodically refreshes the runtime status; if the runtime (e.g. docker) status check returns false, the kubelet goes NotReady
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	// Set up iptables util rules
  // sync iptables rules
	if kl.makeIPTablesUtilChains {
		kl.initNetworkUtil()
	}

	// Start a goroutine responsible for killing pods (that are not properly
	// handled by pod workers).
  // periodically clean up pods that were not handled properly by pod workers
	go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)

	// Start component sync loops.
  // start the statusManager, probeManager, and runtimeClassManager
	kl.statusManager.Start()
	kl.probeManager.Start()

	// Start syncing RuntimeClasses if enabled.
	if kl.runtimeClassManager != nil {
		kl.runtimeClassManager.Start(wait.NeverStop)
	}

	// Start the pod lifecycle event generator.
  // start the PLEG, the pod lifecycle event generator
	kl.pleg.Start()
  // call kl.syncLoop to watch for pod changes
	kl.syncLoop(updates, kl)
}

The nodeLease mechanism is one of the ways the kubelet interacts with the control plane; it keeps the node alive and its status up to date. It is the node health-check mechanism Kubernetes introduced: the kubelet periodically renews its lease to prove it is still running. If the kubelet stops renewing, the control plane considers the node offline and updates the node's status to reflect that.
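
For illustration, each node's Lease object lives in the kube-node-lease namespace and is named after the node. A hedged client-go sketch for checking when a kubelet last renewed its lease (the kubeconfig path and node name are placeholders):

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	// Node leases are named after the node and live in the kube-node-lease namespace.
	lease, err := clientset.CoordinationV1().Leases("kube-node-lease").
		Get(context.TODO(), "10.234.12.78", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	// RenewTime is advanced periodically while the kubelet is healthy.
	fmt.Println("last renew:", lease.Spec.RenewTime)
}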

initializeModules

The initializeModules function initializes the kubelet's various subsystems and functional modules so that the kubelet can run properly and manage containerized workloads. Concretely, its work includes:

  • 1. Creating the kubelet's working directories, specified by the --root-dir flag

  • 2. Creating ContainerLogsDir

  • 3. Starting the imageManager

  • 4. Starting the certificate manager

  • 5. Starting the oomWatcher

  • 6. Starting the resource analyzer

func (kl *Kubelet) initializeModules() error {
    metrics.Register(
        kl.runtimeCache,
        collectors.NewVolumeStatsCollector(kl),
        collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),
    )
    metrics.SetNodeName(kl.nodeName)
    servermetrics.Register()

    // 1. create the working directories, specified by --root-dir
    if err := kl.setupDataDirs(); err != nil {
        return err
    }

    // 2. create ContainerLogsDir
    if _, err := os.Stat(ContainerLogsDir); err != nil {
        if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
            klog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)
        }
    }

    // 3. start the imageManager
    kl.imageManager.Start()

    // 4. start the certificate manager
    if kl.serverCertificateManager != nil {
        kl.serverCertificateManager.Start()
    }
    // 5. start the oomWatcher
    if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
        return fmt.Errorf("failed to start OOM watcher %v", err)
    }

    // 6. start the resource analyzer
    kl.resourceAnalyzer.Start()

    return nil
}

Kubelet.syncLoop

syncLoop is the main loop for processing pod changes. It watches for changes from three channels (file, apiserver, and http) and creates a union of them. For any new change seen, it runs a sync against the desired state and the running state; if the configuration has not changed, it synchronizes the last known desired state every sync-frequency seconds.

Kubelet.syncLoop starts a for {} loop that calls Kubelet.syncLoopIteration to handle the change events arriving on the watched channels.

// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	klog.Info("Starting kubelet main sync loop.")
	// The syncTicker wakes up kubelet to checks if there are any pod workers
	// that need to be sync'd. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base
	// Responsible for checking limits in resolv.conf
	// The limits do not have anything to do with individual pods
	// Since this is called in syncLoop, we don't need to call it anywhere else
	if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
		kl.dnsConfigurer.CheckLimitsForResolvConf()
	}
	// loop forever, delegating each pass to syncLoopIteration
	for {
		if err := kl.runtimeState.runtimeErrors(); err != nil {
			klog.Errorf("skipping pod synchronization - %v", err)
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base

		kl.syncLoopMonitor.Store(kl.clock.Now())
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}

Kubelet.syncLoopIteration

All of the kubelet's pod synchronization logic lives in syncLoopIteration. syncLoopIteration selects over the channels below and handles each kind of event differently:

  • configCh: watches for update events from the file, http, and apiserver sources
  • syncCh: a timer channel; every second it syncs the most recently stored pod state
  • housekeepingCh: a channel for housekeeping events, used for pod cleanup
  • plegCh: fed by the PLEG submodule of the kubelet object, which periodically queries the container runtime for the current state of all containers
  • livenessManager.Updates: when a health check finds a pod unhealthy, the kubelet automatically takes the correct action based on the pod's restartPolicy
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
  // watch for update events from the file, http, and apiserver sources
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.Errorf("Update channel is closed. Exiting the sync loop.")
			return false
		}

		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
			// DELETE is treated as a UPDATE because of graceful deletion.
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.RESTORE:
			klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
			// These are pods restored from the checkpoint. Treat them as new
			// pods.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.SET:
			// TODO: Do we want to support this?
			klog.Errorf("Kubelet does not support snapshot update")
		}

		if u.Op != kubetypes.RESTORE {
			// If the update type is RESTORE, it means that the update is from
			// the pod checkpoints and may be incomplete. Do not mark the
			// source as ready.

			// Mark the source ready after receiving at least one update from the
			// source. Once all the sources are marked ready, various cleanup
			// routines will start reclaiming resources. It is important that this
			// takes place only after kubelet calls the update handler to process
			// the update to ensure the internal pod cache is up-to-date.
			kl.sourcesReady.AddSource(u.Source)
		}
  // this source is fed by the kubelet's PLEG submodule, which periodically queries the container runtime for the state of all containers
	case e := <-plegCh:
		if isSyncPodWorthy(e) {
			// PLEG event for a pod; sync it.
			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
				klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
				handler.HandlePodSyncs([]*v1.Pod{pod})
			} else {
				// If the pod no longer exists, ignore the event.
				klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
			}
		}

		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}
		}
  // every second, sync the most recently stored pod state
	case <-syncCh:
		// Sync pods waiting for sync
		podsToSync := kl.getPodsToSync()
		if len(podsToSync) == 0 {
			break
		}
		klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
		handler.HandlePodSyncs(podsToSync)
  // a health check found a pod unhealthy; the kubelet acts according to the pod's restartPolicy
	case update := <-kl.livenessManager.Updates():
		if update.Result == proberesults.Failure {
			// The liveness manager detected a failure; sync the pod.

			// We should not use the pod from livenessManager, because it is never updated after
			// initialization.
			pod, ok := kl.podManager.GetPodByUID(update.PodUID)
			if !ok {
				// If the pod no longer exists, ignore the update.
				klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
				break
			}
			klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
			handler.HandlePodSyncs([]*v1.Pod{pod})
		}
  // housekeeping channel: perform pod cleanup
	case <-housekeepingCh:
		if !kl.sourcesReady.AllReady() {
			// If the sources aren't ready or volume manager has not yet synced the states,
			// skip housekeeping, as we may accidentally delete pods from unready sources.
			klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
		} else {
			klog.V(4).Infof("SyncLoop (housekeeping)")
			if err := handler.HandlePodCleanups(); err != nil {
				klog.Errorf("Failed cleaning pods: %v", err)
			}
		}
	}
	return true
}

Schematic of how the kubelet works:

[Figure: kubelet working-principle schematic (image sourced from the web; contact the author in case of infringement)]

To summarize, the call relationships in the kubelet startup logic are as follows:

                                                                                  |--> NewMainKubelet
                                                                                  |
                                                      |--> createAndInitKubelet --|--> BirthCry
                                                      |                           |
                                    |--> RunKubelet --|                           |--> StartGarbageCollection
                                    |                 |
                                    |                 |--> startKubelet --> k.Run --> kl.syncLoop --> kl.syncLoopIteration
                                    |
NewKubeletCommand --> Run --> run --|--> http.ListenAndServe
                                    |
                                    |--> daemon.SdNotify

Conclusion

This article introduced the role and working principles of the kubelet, and then analyzed the kubelet's startup flow from the source code. The analysis shows that the startup flow involves a great many stages, and the kubelet contains a great many modules; to master the kubelet's working mechanism in depth, each module will need its own detailed, separate analysis later.

References

https://feisky.gitbooks.io/kubernetes/content/components/kubelet.html

https://juejin.cn/post/6844903694618607623

https://blog.tianfeiyu.com/source-code-reading-notes/kubernetes/kubelet_init.html
