Overview
Abstract: kube-scheduler is an important component of the Kubernetes system. Its core responsibility is to find the most suitable node for each pending pod and then bind the pod to that node; only after the bind does the kubelet on that node observe the pod and create it for real. kube-scheduler is a powerful and complex core component. This article focuses on the scheduler's startup process and walks through the startup logic at the source-code level.
Main text
Note: this source code analysis is based on Kubernetes v1.12.0.
The kube-scheduler scheduling process
When kube-scheduler schedules a pod, it runs the predicates (filtering) phase and the priorities (scoring) phase to pick the most suitable node.
- Filtering (predicates) applies the scheduling algorithms to all nodes in the cluster and filters out the set of nodes that can run the pod.
- Scoring (priorities) then scores the filtered nodes and selects the node with the highest score as the scheduling target.
Let's start by walking through the scheduler's scheduling process. Before reading the source, it helps to understand how kube-scheduler runs and which key components it contains. The figure below shows the core flow of a kube-scheduler scheduling pass.
Components inside kube-scheduler:
- informer: fetches resource information from kube-apiserver and watches for change events, then stores the information in the Scheduling_queue and the ScheduleCache.
- Scheduling_queue: the scheduling queue, a priority queue that holds pods waiting to be scheduled.
- ScheduleCache: the scheduling cache, which holds the already-scheduled pods and the node information, and answers lookups while the scheduler runs scheduling tasks.
- scheduler: the scheduler itself, which uses the scheduling algorithms to carry out scheduling for pending pods.
- Algorithm: the scheduling algorithms. In kube-scheduler they fall into two categories, predicates (filtering) and priorities (scoring); their purpose is to find a suitable node for the pending pod.
The kube-scheduler scheduling flow:
- When kube-scheduler starts, it initializes and starts the informers for the relevant resources, initializes the scheduling cache (ScheduleCache) and the scheduling queue (Scheduling_queue), and loads the scheduling algorithms: the predicates and the priorities.
- Once started, the informers first list all pods from kube-apiserver: pods that are already scheduled are stored in the ScheduleCache, while pending pods are Add()-ed to the Scheduling_queue. They then list all nodes and store the node information in the ScheduleCache. After the initial list, the informers watch pod change events from kube-apiserver and route the changes into the ScheduleCache or the Scheduling_queue.
- The scheduler then runs its scheduling loop, repeatedly executing scheduleOne(): each pass pops one pending pod from the Scheduling_queue and uses the predicates and priorities algorithms to find a suitable node for it (inside the predicate phase the candidate nodes are evaluated by 16 parallel workers).
- Once the best node is chosen, the scheduler issues a bind request to the apiserver for the pod, which effectively sets the pod's spec.NodeName to the chosen node; a minimal sketch of this bind call follows below.
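To make the last step concrete, below is a minimal sketch of what the bind operation boils down to, written against the client-go generation used around v1.12. The clientset variable and the helper function are illustrative assumptions, not code taken from the scheduler:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// bindPodToNode shows, in isolation, what the scheduler's bind step amounts to:
// it POSTs a Binding object whose target references the chosen node, which
// results in the pod's spec.nodeName being set by the apiserver.
func bindPodToNode(clientset kubernetes.Interface, pod *v1.Pod, nodeName string) error {
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID},
		Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
	}
	// In the client-go release vendored by Kubernetes v1.12, Bind takes only the
	// Binding object; newer releases also take a context and options.
	if err := clientset.CoreV1().Pods(pod.Namespace).Bind(binding); err != nil {
		return fmt.Errorf("failed to bind pod %s/%s to node %s: %v", pod.Namespace, pod.Name, nodeName, err)
	}
	return nil
}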
Source code layout
kube-scheduler is a key component of a Kubernetes cluster. Its source lives in the k8s.io/kubernetes repository and is organized mainly as follows:
- cmd/kube-scheduler: the main entry point of kube-scheduler. It parses command-line flags, initializes logging, sets up signal handling, and starts the scheduler.
- pkg/scheduler: the main logic of kube-scheduler, including the scheduling algorithms, the scheduling queue, and the scheduler's core loop.
- pkg/scheduler/algorithm: the scheduling algorithm implementations. Kubernetes ships a variety of algorithms covering priorities, selectors, node affinity, and more.
- pkg/scheduler/algorithm/priorities: the priority functions, which determine how suitable each node is for a pod.
- pkg/scheduler/algorithm/predicates: the predicate functions, which determine whether a node satisfies a pod's requirements.
- pkg/scheduler/algorithm/priorities/node_affinity: the node affinity priority, which favors nodes matching a pod's node affinity preferences.
- pkg/scheduler/api: the scheduler's API types.
- pkg/scheduler/core: the scheduler's core logic.
- pkg/scheduler/framework: the scheduling framework implementation, which drives scheduling decisions.
- pkg/scheduler/util: the scheduler's utility functions.
- pkg/scheduler/volumebinder: the volume binder, which decides how a pod's volumes get bound during scheduling.
- pkg/scheduler/volume: the volume-related implementation.
The kube-scheduler startup flow
The kube-scheduler startup flow is sketched in the figure below; its main steps are:
- Parse command-line flags with cobra
- Register the built-in scheduling algorithms
- Instantiate the scheduler configuration (schedulerConfig)
- Instantiate the scheduler object (scheduler)
- Start the informers and sync resources
- Start leader election
- Start the scheduler with sched.Run()
Command-line parsing with cobra
The kube-scheduler component uses the third-party cobra library for command-line flag parsing and program startup.
Source location: k8s.io/kubernetes/cmd/kube-scheduler/scheduler.go
main() is the entry point:
func main() {
rand.Seed(time.Now().UTC().UnixNano())
// start the scheduler through the cobra command
command := app.NewSchedulerCommand()
// some code omitted...
if err := command.Execute(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}
// NewSchedulerCommand creates a *cobra.Command object with default parameters
func NewSchedulerCommand() *cobra.Command {
opts, err := options.NewOptions()
cmd := &cobra.Command{
Use: "kube-scheduler",
Run: func(cmd *cobra.Command, args []string) {
verflag.PrintAndExitIfRequested()
utilflag.PrintFlags(cmd.Flags())
// some code omitted
// validate the configuration
if errs := opts.Validate(); len(errs) > 0 {
fmt.Fprintf(os.Stderr, "%v\n", utilerrors.NewAggregate(errs))
os.Exit(1)
}
if len(opts.WriteConfigTo) > 0 {
if err := options.WriteConfigFile(opts.WriteConfigTo, &opts.ComponentConfig); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
glog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
return
}
c, err := opts.Config()
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
// c.Complete() fills in the remaining fields of the configuration
// then run the scheduler via Run()
stopCh := make(chan struct{})
if err := Run(c.Complete(), stopCh); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
}
return cmd
}
Registering the built-in scheduling algorithms
The server.Run() function is the heart of the kube-scheduler component: it registers the built-in scheduling algorithms, initializes the scheduler configuration, starts the HTTP/HTTPS servers, creates the scheduler object, starts it, and performs leader election.
Source location: k8s.io/kubernetes/cmd/kube-scheduler/app/server.go
The source is walked through below; see the inline comments.
// Run runs the Scheduler.
func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
glog.Infof("Version: %+v", version.Get())
// Apply algorithms based on feature gates.
// TODO: make configurable?
// load the built-in scheduling algorithms
algorithmprovider.ApplyFeatureGates()
// Configz registration.
if cz, err := configz.New("componentconfig"); err == nil {
cz.Set(c.ComponentConfig)
} else {
return fmt.Errorf("unable to register configz: %s", err)
}
// Build a scheduler config from the provided algorithm source.
// build a scheduler config; it contains the selected scheduling algorithms, the informers for each resource, the clientset, and other key data structures
schedulerConfig, err := NewSchedulerConfig(c)
if err != nil {
return err
}
// Create the scheduler.
// create a scheduler from the parameters specified in schedulerConfig
sched := scheduler.NewFromConfig(schedulerConfig)
// Prepare the event broadcaster.
// prepare the event broadcaster
if c.Broadcaster != nil && c.EventClient != nil {
c.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.EventClient.Events("")})
}
// Start up the healthz server.
// start the healthz server; curl http://<kube-scheduler>:<port>/healthz can be used to check that the HTTP server is healthy
if c.InsecureServing != nil {
separateMetrics := c.InsecureMetricsServing != nil
handler := buildHandlerChain(newHealthzHandler(&c.ComponentConfig, separateMetrics), nil, nil)
if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
return fmt.Errorf("failed to start healthz server: %v", err)
}
}
// start the metrics server
// it serves Prometheus metrics; curl <kube-scheduler>:<port>/metrics returns the monitoring data
if c.InsecureMetricsServing != nil {
handler := buildHandlerChain(newMetricsHandler(&c.ComponentConfig), nil, nil)
if err := c.InsecureMetricsServing.Serve(handler, 0, stopCh); err != nil {
return fmt.Errorf("failed to start metrics server: %v", err)
}
}
// start the HTTPS (secure) healthz server
if c.SecureServing != nil {
handler := buildHandlerChain(newHealthzHandler(&c.ComponentConfig, false), c.Authentication.Authenticator, c.Authorization.Authorizer)
if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
// fail early for secure handlers, removing the old error loop from above
return fmt.Errorf("failed to start healthz server: %v", err)
}
}
// Start all informers.
// start all informers
go c.PodInformer.Informer().Run(stopCh)
c.InformerFactory.Start(stopCh)
// Wait for all caches to sync before scheduling.
// wait for the informers to sync their data into the cache
c.InformerFactory.WaitForCacheSync(stopCh)
controller.WaitForCacheSync("scheduler", stopCh, c.PodInformer.Informer().HasSynced)
// Prepare a reusable run function.
// define a reusable run function that starts the scheduler; ctx is used to stop it
run := func(ctx context.Context) {
sched.Run()
<-ctx.Done()
}
ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
defer cancel()
// start a goroutine that cancels the context once stopCh is closed, so stopCh indirectly controls the scheduler's shutdown
go func() {
select {
case <-stopCh:
cancel()
case <-ctx.Done():
}
}()
// If leader election is enabled, run via LeaderElector until done and exit.
// if leader election is enabled, go through the leader election path
if c.LeaderElection != nil {
c.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
utilruntime.HandleError(fmt.Errorf("lost master"))
},
}
leaderElector, err := leaderelection.NewLeaderElector(*c.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
// run the leader election loop
leaderElector.Run(ctx)
return fmt.Errorf("lost lease")
}
// Leader election is disabled, so run inline until done.
// leader election is disabled, so start the scheduler directly
run(ctx)
return fmt.Errorf("finished without leader elect")
}
Next, let's look in more detail at how the built-in scheduling algorithms are registered.
By the time server.Run() in k8s.io/kubernetes/cmd/kube-scheduler/app/server.go executes, importing the "k8s.io/kubernetes/pkg/scheduler/factory" package has already defined and initialized two map structures, one holding the predicate algorithms and one holding the priority algorithms.
import (
// some code omitted
"k8s.io/kubernetes/pkg/scheduler/factory"
// some code omitted
)
The k8s.io/kubernetes/pkg/scheduler/factory package defines the maps that hold the algorithms: fitPredicateMap and priorityFunctionMap.
- fitPredicateMap: a map holding the predicate (filtering) algorithms
- priorityFunctionMap: a map holding the priority (scoring) algorithms
var (
schedulerFactoryMutex sync.Mutex
// maps that hold registered algorithm types
// map holding the registered predicate algorithms
fitPredicateMap = make(map[string]FitPredicateFactory)
mandatoryFitPredicates = sets.NewString()
// map holding the registered priority algorithms
priorityFunctionMap = make(map[string]PriorityConfigFactory)
// map holding the algorithm providers (named sets of predicates and priorities)
algorithmProviderMap = make(map[string]AlgorithmProviderConfig)
// Registered metadata producers
priorityMetadataProducer PriorityMetadataProducerFactory
predicateMetadataProducer PredicateMetadataProducerFactory
)
Back in server.Run(), let's see how the algorithms get registered; registration simply means storing each algorithm into fitPredicateMap or priorityFunctionMap.
// Run runs the Scheduler.
func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
// Apply algorithms based on feature gates.
// TODO: make configurable?
algorithmprovider.ApplyFeatureGates()
// some code omitted
}
package algorithmprovider
import (
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults"
)
// ApplyFeatureGates applies algorithm by feature gates.
func ApplyFeatureGates() {
defaults.ApplyFeatureGates()
}
The init() function in k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults registers the predicate and priority algorithms.
func init() {
// Register functions that extract metadata used by predicates and priorities computations.
factory.RegisterPredicateMetadataProducerFactory(
func(args factory.PluginFactoryArgs) algorithm.PredicateMetadataProducer {
return predicates.NewPredicateMetadataFactory(args.PodLister)
})
factory.RegisterPriorityMetadataProducerFactory(
func(args factory.PluginFactoryArgs) algorithm.PriorityMetadataProducer {
return priorities.NewPriorityMetadataFactory(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
})
// register the default predicates returned by defaultPredicates()
// and the default priorities returned by defaultPriorities()
registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
// register individual predicates
factory.RegisterFitPredicate("PodFitsPorts", predicates.PodFitsHostPorts)
factory.RegisterFitPredicate(predicates.PodFitsHostPortsPred, predicates.PodFitsHostPorts)
factory.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)
factory.RegisterFitPredicate(predicates.HostNamePred, predicates.PodFitsHost)
factory.RegisterFitPredicate(predicates.MatchNodeSelectorPred, predicates.PodMatchNodeSelector)
// register individual priorities
factory.RegisterPriorityConfigFactory(
"ServiceSpreadingPriority",
factory.PriorityConfigFactory{
MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
return priorities.NewSelectorSpreadPriority(args.ServiceLister, algorithm.EmptyControllerLister{}, algorithm.EmptyReplicaSetLister{}, algorithm.EmptyStatefulSetLister{})
},
Weight: 1,
},
)
factory.RegisterPriorityFunction2("EqualPriority", core.EqualPriorityMap, nil, 1)
factory.RegisterPriorityFunction2("MostRequestedPriority", priorities.MostRequestedPriorityMap, nil, 1)
factory.RegisterPriorityFunction2(
"RequestedToCapacityRatioPriority",
priorities.RequestedToCapacityRatioResourceAllocationPriorityDefault().PriorityMap,
nil,
1)
}
As RegisterFitPredicateFactory() shows, registering an algorithm mainly means adding the algorithm name and its factory function to the map.
// RegisterFitPredicateFactory registers a fit predicate factory with the
// algorithm registry. Returns the name with which the predicate was registered.
func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFactory) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
validateAlgorithmNameOrDie(name)
// add the algorithm name and its factory function to the map
fitPredicateMap[name] = predicateFactory
return name
}
predicateFactory is a function type whose return value is an algorithm.FitPredicate. FitPredicate is itself a function type: given a pod object and a nodeInfo object, it decides whether the node satisfies a particular scheduling condition of the pod.
// FitPredicateFactory produces a FitPredicate from the given args.
// FitPredicateFactory returns a FitPredicate function
type FitPredicateFactory func(PluginFactoryArgs) algorithm.FitPredicate
// FitPredicate is a function that indicates if a pod fits into an existing node.
// The failure information is given by the error.
// FitPredicate is a function type that decides whether a node satisfies a pod's scheduling conditions.
// It returns true when the pod fits; otherwise it returns false along with the failure reasons.
type FitPredicate func(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []PredicateFailureReason, error)
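To make the FitPredicate type concrete, here is a minimal, self-contained sketch of a custom predicate written against the v1.12 types shown above. The label key, the failure-reason type, and the function itself are hypothetical illustrations, not part of the Kubernetes source:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/algorithm"
	schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)

// exampleFailureReason is a hypothetical PredicateFailureReason implementation.
type exampleFailureReason struct{ msg string }

func (r exampleFailureReason) GetReason() string { return r.msg }

// podFitsUnlessExcluded is a hypothetical FitPredicate: it rejects nodes that
// carry an "example.com/exclude-scheduling" label and accepts every other node.
func podFitsUnlessExcluded(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}
	if _, excluded := node.Labels["example.com/exclude-scheduling"]; excluded {
		// Returning false plus a reason marks this node as infeasible for the pod.
		return false, []algorithm.PredicateFailureReason{exampleFailureReason{msg: "node is excluded from scheduling"}}, nil
	}
	return true, nil, nil
}

Registering it would mirror the built-in predicates, e.g. factory.RegisterFitPredicate("ExamplePredicate", podFitsUnlessExcluded).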
RegisterPriorityConfigFactory and the priority function types play the analogous role on the priorities side.
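In the same spirit, here is a minimal sketch of a map-style priority function, assuming the v1.12 PriorityMapFunction signature and the schedulerapi.HostPriority / schedulerapi.MaxPriority types; the function name and its scoring rule are hypothetical:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
	schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)

// fewestPodsPriorityMap is a hypothetical priority map function: it gives a
// higher score to nodes that currently run fewer pods.
func fewestPodsPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
	node := nodeInfo.Node()
	if node == nil {
		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
	}
	score := schedulerapi.MaxPriority - len(nodeInfo.Pods())
	if score < 0 {
		score = 0
	}
	return schedulerapi.HostPriority{Host: node.Name, Score: score}, nil
}

It would be registered the same way as the built-ins, e.g. factory.RegisterPriorityFunction2("FewestPodsPriority", fewestPodsPriorityMap, nil, 1).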
The defaultPredicates() function registers the various built-in predicate algorithms:
func defaultPredicates() sets.String {
return sets.NewString(
// Fit is determined by volume zone requirements.
factory.RegisterFitPredicateFactory(
predicates.NoVolumeZoneConflictPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewVolumeZonePredicate(args.PVInfo, args.PVCInfo, args.StorageClassInfo)
},
),
// Fit is determined by whether or not there would be too many AWS EBS volumes attached to the node
factory.RegisterFitPredicateFactory(
predicates.MaxEBSVolumeCountPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilterType, args.PVInfo, args.PVCInfo)
},
),
// Fit is determined by whether or not there would be too many GCE PD volumes attached to the node
factory.RegisterFitPredicateFactory(
predicates.MaxGCEPDVolumeCountPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilterType, args.PVInfo, args.PVCInfo)
},
),
// Fit is determined by whether or not there would be too many Azure Disk volumes attached to the node
factory.RegisterFitPredicateFactory(
predicates.MaxAzureDiskVolumeCountPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, args.PVInfo, args.PVCInfo)
},
),
factory.RegisterFitPredicateFactory(
predicates.MaxCSIVolumeCountPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewCSIMaxVolumeLimitPredicate(args.PVInfo, args.PVCInfo)
},
),
// Fit is determined by inter-pod affinity.
factory.RegisterFitPredicateFactory(
predicates.MatchInterPodAffinityPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewPodAffinityPredicate(args.NodeInfo, args.PodLister)
},
),
// Fit is determined by non-conflicting disk volumes.
factory.RegisterFitPredicate(predicates.NoDiskConflictPred, predicates.NoDiskConflict),
// GeneralPredicates are the predicates that are enforced by all Kubernetes components
// (e.g. kubelet and all schedulers)
factory.RegisterFitPredicate(predicates.GeneralPred, predicates.GeneralPredicates),
// Fit is determined by node memory pressure condition.
factory.RegisterFitPredicate(predicates.CheckNodeMemoryPressurePred, predicates.CheckNodeMemoryPressurePredicate),
// Fit is determined by node disk pressure condition.
factory.RegisterFitPredicate(predicates.CheckNodeDiskPressurePred, predicates.CheckNodeDiskPressurePredicate),
// Fit is determined by node pid pressure condition.
factory.RegisterFitPredicate(predicates.CheckNodePIDPressurePred, predicates.CheckNodePIDPressurePredicate),
// Fit is determined by node conditions: not ready, network unavailable or out of disk.
factory.RegisterMandatoryFitPredicate(predicates.CheckNodeConditionPred, predicates.CheckNodeConditionPredicate),
// Fit is determined based on whether a pod can tolerate all of the node's taints
factory.RegisterFitPredicate(predicates.PodToleratesNodeTaintsPred, predicates.PodToleratesNodeTaints),
// Fit is determined by volume topology requirements.
factory.RegisterFitPredicateFactory(
predicates.CheckVolumeBindingPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewVolumeBindingPredicate(args.VolumeBinder)
},
),
)
}
The defaultPriorities() function registers the various built-in priority algorithms:
func defaultPriorities() sets.String {
return sets.NewString(
// spreads pods by minimizing the number of pods (belonging to the same service or replication controller) on the same node.
factory.RegisterPriorityConfigFactory(
"SelectorSpreadPriority",
factory.PriorityConfigFactory{
MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
},
Weight: 1,
},
),
// pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.)
// as some other pods, or, conversely, should not be placed in the same topological domain as some other pods.
factory.RegisterPriorityConfigFactory(
"InterPodAffinityPriority",
factory.PriorityConfigFactory{
Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
return priorities.NewInterPodAffinityPriority(args.NodeInfo, args.NodeLister, args.PodLister, args.HardPodAffinitySymmetricWeight)
},
Weight: 1,
},
),
// Prioritize nodes by least requested utilization.
factory.RegisterPriorityFunction2("LeastRequestedPriority", priorities.LeastRequestedPriorityMap, nil, 1),
// Prioritizes nodes to help achieve balanced resource usage
factory.RegisterPriorityFunction2("BalancedResourceAllocation", priorities.BalancedResourceAllocationMap, nil, 1),
// Set this weight large enough to override all other priority functions.
// TODO: Figure out a better way to do this, maybe at same time as fixing #24720.
factory.RegisterPriorityFunction2("NodePreferAvoidPodsPriority", priorities.CalculateNodePreferAvoidPodsPriorityMap, nil, 10000),
// Prioritizes nodes that have labels matching NodeAffinity
factory.RegisterPriorityFunction2("NodeAffinityPriority", priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1),
// Prioritizes nodes that marked with taint which pod can tolerate.
factory.RegisterPriorityFunction2("TaintTolerationPriority", priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1),
// ImageLocalityPriority prioritizes nodes that have images requested by the pod present.
factory.RegisterPriorityFunction2("ImageLocalityPriority", priorities.ImageLocalityPriorityMap, nil, 1),
)
}
Instantiating the scheduler object
The scheduler object is the main object of the running kube-scheduler component; it holds the modules that scheduling depends on, including the schedulerCache, the scheduling queue, the informers, and the event-handler callbacks.
A snippet from server.Run():
// Build a scheduler config from the provided algorithm source.
schedulerConfig, err := NewSchedulerConfig(c)
Source location: k8s.io/kubernetes/cmd/kube-scheduler/app/server.go
// NewSchedulerConfig creates the scheduler configuration. This is exposed for use by tests.
func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Config, error) {
var storageClassInformer storageinformers.StorageClassInformer
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
// storageClassInformer
storageClassInformer = s.InformerFactory.Storage().V1().StorageClasses()
}
// Set up the configurator which can create schedulers from configs.
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
// the scheduler name
SchedulerName: s.ComponentConfig.SchedulerName,
Client: s.Client,
// the informers for the various resources
NodeInformer: s.InformerFactory.Core().V1().Nodes(),
PodInformer: s.PodInformer,
PvInformer: s.InformerFactory.Core().V1().PersistentVolumes(),
PvcInformer: s.InformerFactory.Core().V1().PersistentVolumeClaims(),
ReplicationControllerInformer: s.InformerFactory.Core().V1().ReplicationControllers(),
ReplicaSetInformer: s.InformerFactory.Apps().V1().ReplicaSets(),
StatefulSetInformer: s.InformerFactory.Apps().V1().StatefulSets(),
ServiceInformer: s.InformerFactory.Core().V1().Services(),
PdbInformer: s.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: storageClassInformer,
HardPodAffinitySymmetricWeight: s.ComponentConfig.HardPodAffinitySymmetricWeight,
EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
// whether preemption is disabled
DisablePreemption: s.ComponentConfig.DisablePreemption,
// the percentage of nodes that participate in scoring
PercentageOfNodesToScore: s.ComponentConfig.PercentageOfNodesToScore,
BindTimeoutSeconds: *s.ComponentConfig.BindTimeoutSeconds,
})
// some code omitted
}
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{ … })
Inside NewConfigFactory(), the various listers, the scheduler cache (schedulerCache), the scheduling queue (podQueue), and the event-handler callbacks (AddFunc, UpdateFunc, DeleteFunc) for resources such as pods and nodes are all instantiated.
// NewConfigFactory initializes the default implementation of a Configurator To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator {
stopEverything := make(chan struct{})
schedulerCache := schedulercache.New(30*time.Second, stopEverything)
// storageClassInformer is only enabled through VolumeScheduling feature gate
var storageClassLister storagelisters.StorageClassLister
if args.StorageClassInformer != nil {
storageClassLister = args.StorageClassInformer.Lister()
}
c := &configFactory{
// the clientset
client: args.Client,
// podLister reads its data from the scheduler cache (schedulerCache)
podLister: schedulerCache,
// the scheduling queue holding pending pods
podQueue: core.NewSchedulingQueue(stopEverything),
// the listers for the various resources
pVLister: args.PvInformer.Lister(),
pVCLister: args.PvcInformer.Lister(),
serviceLister: args.ServiceInformer.Lister(),
controllerLister: args.ReplicationControllerInformer.Lister(),
replicaSetLister: args.ReplicaSetInformer.Lister(),
statefulSetLister: args.StatefulSetInformer.Lister(),
pdbLister: args.PdbInformer.Lister(),
storageClassLister: storageClassLister,
// the scheduler cache (schedulerCache)
schedulerCache: schedulerCache,
StopEverything: stopEverything,
// the scheduler name
schedulerName: args.SchedulerName,
hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
enableEquivalenceClassCache: args.EnableEquivalenceClassCache,
disablePreemption: args.DisablePreemption,
percentageOfNodesToScore: args.PercentageOfNodesToScore,
}
c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
// scheduled pod cache
// register event handlers that put already-scheduled pods into the scheduler cache (schedulerCache)
args.PodInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return assignedNonTerminatedPod(t)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return assignedNonTerminatedPod(pod)
}
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
return false
default:
runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: c.addPodToCache,
UpdateFunc: c.updatePodInCache,
DeleteFunc: c.deletePodFromCache,
},
},
)
// unscheduled pod queue
// register event handlers that put pending pods into the scheduling queue
args.PodInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return unassignedNonTerminatedPod(t) && responsibleForPod(t, args.SchedulerName)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return unassignedNonTerminatedPod(pod) && responsibleForPod(pod, args.SchedulerName)
}
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
return false
default:
runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: c.addPodToSchedulingQueue,
UpdateFunc: c.updatePodInSchedulingQueue,
DeleteFunc: c.deletePodFromSchedulingQueue,
},
},
)
// ScheduledPodLister is something we provide to plug-in functions that
// they may need to call.
c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
// register event handlers that reflect node events into the scheduler cache
args.NodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.addNodeToCache,
UpdateFunc: c.updateNodeInCache,
DeleteFunc: c.deleteNodeFromCache,
},
)
c.nodeLister = args.NodeInformer.Lister()
args.PdbInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.addPDBToCache,
UpdateFunc: c.updatePDBInCache,
DeleteFunc: c.deletePDBFromCache,
},
)
c.pdbLister = args.PdbInformer.Lister()
// On add and delete of PVs, it will affect equivalence cache items
// related to persistent volume
args.PvInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
// MaxPDVolumeCountPredicate: since it relies on the counts of PV.
AddFunc: c.onPvAdd,
UpdateFunc: c.onPvUpdate,
DeleteFunc: c.onPvDelete,
},
)
c.pVLister = args.PvInformer.Lister()
// This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
args.PvcInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onPvcAdd,
UpdateFunc: c.onPvcUpdate,
DeleteFunc: c.onPvcDelete,
},
)
c.pVCLister = args.PvcInformer.Lister()
// This is for ServiceAffinity: affected by the selector of the service is updated.
// Also, if new service is added, equivalence cache will also become invalid since
// existing pods may be "captured" by this service and change this predicate result.
args.ServiceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onServiceAdd,
UpdateFunc: c.onServiceUpdate,
DeleteFunc: c.onServiceDelete,
},
)
c.serviceLister = args.ServiceInformer.Lister()
// Existing equivalence cache should not be affected by add/delete RC/Deployment etc,
// it only make sense when pod is scheduled or deleted
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
// Setup volume binder
c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
args.StorageClassInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onStorageClassAdd,
DeleteFunc: c.onStorageClassDelete,
},
)
}
// Setup cache comparer
comparer := &cacheComparer{
podLister: args.PodInformer.Lister(),
nodeLister: args.NodeInformer.Lister(),
pdbLister: args.PdbInformer.Lister(),
cache: c.schedulerCache,
podQueue: c.podQueue,
}
ch := make(chan os.Signal, 1)
signal.Notify(ch, compareSignal)
go func() {
for {
select {
case <-c.StopEverything:
return
case <-ch:
comparer.Compare()
}
}
}()
return c
}
As the code above shows, when these modules are wired up, pending pods are placed into the scheduling queue (podQueue), already-scheduled pods are placed into the scheduler cache (scheduleCache), and node information is placed into the scheduler cache as well. Later, each scheduling pass pops one pending pod from the queue, consults the pod and node information in the cache, and runs the scheduling algorithms against it; together this forms the complete scheduling loop.
Starting the HTTP/HTTPS servers
The kube-scheduler component starts HTTP/HTTPS servers that expose health and monitoring information. The endpoints are:
- /healthz: used for health checks
- /metrics: exposes metrics for Prometheus to scrape
Source location: k8s.io/kubernetes/cmd/kube-scheduler/app/server.go
// Run runs the Scheduler.
func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
// some code omitted
// Start up the healthz server.
if c.InsecureServing != nil {
separateMetrics := c.InsecureMetricsServing != nil
handler := buildHandlerChain(newHealthzHandler(&c.ComponentConfig, separateMetrics), nil, nil)
if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
return fmt.Errorf("failed to start healthz server: %v", err)
}
}
if c.InsecureMetricsServing != nil {
handler := buildHandlerChain(newMetricsHandler(&c.ComponentConfig), nil, nil)
if err := c.InsecureMetricsServing.Serve(handler, 0, stopCh); err != nil {
return fmt.Errorf("failed to start metrics server: %v", err)
}
}
if c.SecureServing != nil {
handler := buildHandlerChain(newHealthzHandler(&c.ComponentConfig, false), c.Authentication.Authenticator, c.Authorization.Authorizer)
if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
// fail early for secure handlers, removing the old error loop from above
return fmt.Errorf("failed to start healthz server: %v", err)
}
}
// some code omitted
}
Starting the informers and syncing resources
The informers for the various resources were instantiated earlier; the next step is to start them and let them sync the resources.
Source location: k8s.io/kubernetes/cmd/kube-scheduler/app/server.go
// Start all informers.
// start all informers
go c.PodInformer.Informer().Run(stopCh)
c.InformerFactory.Start(stopCh)
// Wait for all caches to sync before scheduling.
// wait for the informers to sync resources into the cache
c.InformerFactory.WaitForCacheSync(stopCh)
controller.WaitForCacheSync("scheduler", stopCh, c.PodInformer.Informer().HasSynced)
Once the informers are running, the event handlers configured earlier place already-scheduled pods into the scheduler cache (schedulerCache), place pending pods into the scheduling queue, and also place node information into the scheduler cache.
Source location: k8s.io/kubernetes/pkg/scheduler/factory/factory.go
// scheduled pod cache
args.PodInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return assignedNonTerminatedPod(t)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
// filter for pods that have already been scheduled
return assignedNonTerminatedPod(pod)
}
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
return false
default:
runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: c.addPodToCache,
UpdateFunc: c.updatePodInCache,
DeleteFunc: c.deletePodFromCache,
},
},
)
// unscheduled pod queue
args.PodInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return unassignedNonTerminatedPod(t) && responsibleForPod(t, args.SchedulerName)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
// filter for pods that are still waiting to be scheduled
return unassignedNonTerminatedPod(pod) && responsibleForPod(pod, args.SchedulerName)
}
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
return false
default:
runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
return false
}
},
// add to the scheduling queue (SchedulingQueue)
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: c.addPodToSchedulingQueue,
UpdateFunc: c.updatePodInSchedulingQueue,
DeleteFunc: c.deletePodFromSchedulingQueue,
},
},
)
// ScheduledPodLister is something we provide to plug-in functions that
// they may need to call.
c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
// node events go into the schedulerCache
args.NodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.addNodeToCache,
UpdateFunc: c.updateNodeInCache,
DeleteFunc: c.deleteNodeFromCache,
},
)
Leader election
If leader election is enabled in the configuration, the election runs first, and the elected leader then starts the scheduler via sched.Run().
Source location: k8s.io/kubernetes/cmd/kube-scheduler/app/server.go
// If leader election is enabled, run via LeaderElector until done and exit.
if c.LeaderElection != nil {
c.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
// once elected, the leader starts the scheduler through run()
OnStartedLeading: run,
OnStoppedLeading: func() {
utilruntime.HandleError(fmt.Errorf("lost master"))
},
}
leaderElector, err := leaderelection.NewLeaderElector(*c.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
// start the leader election loop
leaderElector.Run(ctx)
return fmt.Errorf("lost lease")
}
Diagram (image sourced from the internet; please contact the author in case of infringement)
Starting the scheduler: sched.Run()
sched.Run() starts the scheduler. Before scheduling begins, it waits for the informers to finish syncing resource data into the cache; it then starts a background goroutine that uses wait.Until to run sched.scheduleOne in a loop. sched.scheduleOne is the main scheduling logic of the kube-scheduler component: it takes one pod from the scheduling queue and uses the scheduling algorithms to find a suitable node for it.
Source location: k8s.io/kubernetes/pkg/scheduler/scheduler.go
// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
// the cache must be fully synced before the scheduler starts
if !sched.config.WaitForCacheSync() {
return
}
// use wait.Until to run sched.scheduleOne in a loop;
// each call to sched.scheduleOne performs one pod scheduling pass
go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
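To connect the startup flow back to the scheduling flow described at the beginning, the outline below sketches what one iteration of scheduleOne does in v1.12. It is a simplified illustration: nextPod, scheduleAlgorithm and bindToNode stand in for the scheduler's queue pop, scheduling algorithm, and bind step, and are not the actual function names in the source:

package main

import (
	v1 "k8s.io/api/core/v1"
)

// scheduleOnePass is an illustrative outline of a single scheduling iteration,
// not the real scheduler source.
func scheduleOnePass(
	nextPod func() *v1.Pod,
	scheduleAlgorithm func(pod *v1.Pod) (string, error),
	bindToNode func(pod *v1.Pod, nodeName string) error,
) {
	// 1. Pop the next pending pod from the scheduling queue.
	pod := nextPod()
	if pod == nil {
		return
	}
	// 2. Run the predicates to filter nodes and the priorities to score them,
	//    yielding the name of the selected node. On failure the real scheduler
	//    records an event and may attempt preemption.
	nodeName, err := scheduleAlgorithm(pod)
	if err != nil {
		return
	}
	// 3. Bind the pod to the chosen node via the apiserver. The real code first
	//    "assumes" the pod into its cache and performs the bind asynchronously,
	//    rolling the assumption back if the bind fails.
	_ = bindToNode(pod, nodeName)
}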
Conclusion
This article covered the main flow of how kube-scheduler performs scheduling and walked through the scheduler's startup process in the source code. Even this fairly shallow analysis gives a reasonable overall picture of kube-scheduler. The component is complex, and follow-up articles will analyze it further, including (but not limited to) the scheduling algorithms and scheduling process in detail, priority and preemption, affinity-based scheduling, and performance optimization.
References
kubernetes源码-kube-scheduler 原理和源码分析(一)
kubernetes_scheduler_code
nodes-scheduler-default-modifying_nodes-scheduler-default