【K8s源码分析(三)】-K8s调度器调度周期介绍

news2024/11/26 16:37:31

本文首发在个人博客上,欢迎来踩!

本次分析参考的K8s版本是v1.27.0。

K8s的整体调度框架如下图所示。
请添加图片描述

调度框架顶层函数

K8s调度器调度的核心函数scheduleronepkg/scheduler/schedule_one.go:62,如下,这里将一些解释写在了注释里

// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
    // 获取调度队列中的下一个 Pod 信息
    podInfo := sched.NextPod()
    // 如果 podInfo 或者其包含的 Pod 为 nil,说明调度队列关闭或者没有 Pod 需要调度,直接返回
    if podInfo == nil || podInfo.Pod == nil {
        return
    }
    // 获取 Pod 对象
    pod := podInfo.Pod
    // 为当前 Pod 选择一个调度框架(scheduler framework)
    fwk, err := sched.frameworkForPod(pod)
    if err != nil {
        // 这种情况不应该发生,因为我们只接受那些指定了匹配调度器名称的 Pod 进行调度
        klog.ErrorS(err, "Error occurred")
        return
    }
    // 如果跳过调度,则直接返回
    if sched.skipPodSchedule(fwk, pod) {
        return
    }

    // 记录尝试调度 Pod 的日志
    klog.V(3).InfoS("Attempting to schedule pod", "pod", klog.KObj(pod))

    // 开始计时,尝试为 Pod 找到合适的宿主机
    start := time.Now()
    // 初始化调度周期状态
    state := framework.NewCycleState()
    // 设置是否记录插件指标的随机概率
    state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)

    // 初始化一个空的 podsToActivate 结构,这个结构将由插件填充或者保持为空
    podsToActivate := framework.NewPodsToActivate()
    // 将 podsToActivate 写入状态中
    state.Write(framework.PodsToActivateKey, podsToActivate)

    // 创建一个新的带有取消功能的上下文,用于调度周期
    schedulingCycleCtx, cancel := context.WithCancel(ctx)
    defer cancel()

    // 执行调度周期,尝试为 Pod 找到合适的宿主机
    scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
    // 如果调度失败,则调用失败处理器
    if !status.IsSuccess() {
        sched.FailureHandler(schedulingCycleCtx, fwk, assumedPodInfo, status, scheduleResult.nominatingInfo, start)
        return
    }

    // 异步绑定 Pod 到其宿主机(可以这样做是因为上面的假设步骤)
    go func() {
        // 创建一个新的带有取消功能的上下文,用于绑定周期
        bindingCycleCtx, cancel := context.WithCancel(ctx)
        defer cancel()

        // 增加绑定阶段的 goroutine 指标
        metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
        defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()
        metrics.Goroutines.WithLabelValues(metrics.Binding).Inc()
        defer metrics.Goroutines.WithLabelValues(metrics.Binding).Dec()

        // 执行绑定周期,尝试将 Pod 绑定到宿主机
        status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)
        // 如果绑定失败,则处理绑定周期错误
        if !status.IsSuccess() {
            sched.handleBindingCycleError(bindingCycleCtx, state, fwk, assumedPodInfo, start, scheduleResult, status)
        }
    }()
}

这段代码的主要功能是:

  1. 从调度队列中获取下一个要调度的 Pod。
  2. 为 Pod 选择一个调度框架。
  3. 如果配置允许,跳过调度。
  4. 记录日志并开始调度周期。
  5. 如果调度成功,异步地尝试将 Pod 绑定到选定的宿主机。
  6. 如果调度或绑定失败,执行相应的错误处理逻辑。

此处也指明了两个周期,分别为调度周期schedulingCycle和绑定周期bindingCycle,绑定周期会在后面一节进行介绍,这里主要关注schedulingCycle

查看关键的schedulingCycle函数,在pkg/scheduler/schedule_one.go:120中,补充了部分注释。

// schedulingCycle tries to schedule a single Pod.
func (sched *Scheduler) schedulingCycle(
    ctx context.Context, // 调度上下文
    state *framework.CycleState, // 调度周期状态
    fwk framework.Framework, // 调度框架
    podInfo *framework.QueuedPodInfo, // 待调度的 Pod 信息
    start time.Time, // 调度开始时间
    podsToActivate *framework.PodsToActivate, // 待激活的 Pods
) (ScheduleResult, *framework.QueuedPodInfo, *framework.Status) {
    // 获取待调度的 Pod
    pod := podInfo.Pod
    // 调用调度器的 SchedulePod 方法尝试调度 Pod
    scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
    if err != nil {
        // 如果没有可用节点,则返回错误状态
        if err == ErrNoNodesAvailable {
            status := framework.NewStatus(framework.UnschedulableAndUnresolvable).WithError(err)
            return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, status
        }

        // 如果错误是 FitError 类型,则说明 Pod 无法适应任何节点
        fitError, ok := err.(*framework.FitError)
        if !ok {
            klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
            return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, framework.AsStatus(err)
        }

        // 如果没有 PostFilter 插件,则不执行抢占
        if !fwk.HasPostFilterPlugins() {
            klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
            return ScheduleResult{}, podInfo, framework.NewStatus(framework.Unschedulable).WithError(err)
        }

        // 运行 PostFilter 插件,尝试使 Pod 在未来的调度周期中可调度
        result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
        msg := status.Message()
        fitError.Diagnosis.PostFilterMsg = msg
        if status.Code() == framework.Error {
            klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
        } else {
            klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
        }

        // 获取 PostFilter 插件返回的 NominatingInfo
        var nominatingInfo *framework.NominatingInfo
        if result != nil {
            nominatingInfo = result.NominatingInfo
        }
        return ScheduleResult{nominatingInfo: nominatingInfo}, podInfo, framework.NewStatus(framework.Unschedulable).WithError(err)
    }

    // 计算并记录调度算法的延迟
    metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
    // 假设 Pod 已经在给定节点上运行,这样子就不用等它实际绑定就可以执行后续的操作了
    assumedPodInfo := podInfo.DeepCopy()
    assumedPod := assumedPodInfo.Pod
    // 假设操作,设置 Pod 的 NodeName 为调度结果推荐的宿主机
    err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
    if err != nil {
        // 如果假设操作失败,这可能是重试逻辑中的一个 BUG
        // 报告错误以便重新调度 Pod
        return ScheduleResult{nominatingInfo: clearNominatedNode},
            assumedPodInfo,
            framework.AsStatus(err)
    }

    // 运行预留插件的 Reserve 方法
    if sts := fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
        // 如果预留失败,触发取消预留以清理与预留 Pod 相关的资源
        fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
        if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
            klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
        }

        return ScheduleResult{nominatingInfo: clearNominatedNode},
            assumedPodInfo,
            sts
    }

    // 运行 "permit" 插件
    runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
    if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
        // 如果许可检查失败,触发取消预留以清理与预留 Pod 相关的资源
        fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
        if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
            klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
        }

        return ScheduleResult{nominatingInfo: clearNominatedNode},
            assumedPodInfo,
            runPermitStatus
    }

    // 成功调度周期结束后,查看是否有必要设置一些pod为可调度的状态
    if len(podsToActivate.Map) != 0 {
        sched.SchedulingQueue.Activate(podsToActivate.Map)
        // 激活后清空条目
        podsToActivate.Map = make(map[string]*v1.Pod)
    }

    // 返回调度结果
    return scheduleResult, assumedPodInfo, nil
}

主要流程包括:

  1. 尝试调度 Pod,并处理可能出现的错误。
  2. 如果调度失败,根据错误类型执行不同的逻辑,如处理节点不可用或 Pod 不适应任何节点的情况。
  3. 如果调度成功,记录调度算法的延迟,并提前假设 Pod 已经在推荐的节点上运行。
  4. 运行预留插件的 Reserve 方法,并处理预留成功或失败的情况。
  5. 运行抢占插件,并根据结果进行相应的处理。
  6. 如果有待转为active的 Pods,执行激活操作。
  7. 返回调度结果。

一般调度

这里最关键的是SchedulePod函数,在pkg/scheduler/schedule_one.go:334

// schedulePod tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError with reasons.
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
	defer trace.LogIfLong(100 * time.Millisecond)

	if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
		return result, err
	}
	trace.Step("Snapshotting scheduler cache and node infos done")

	if sched.nodeInfoSnapshot.NumNodes() == 0 {
		return result, ErrNoNodesAvailable
	}

	feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
	if err != nil {
		return result, err
	}
	trace.Step("Computing predicates done")

	if len(feasibleNodes) == 0 {
		return result, &framework.FitError{
			Pod:         pod,
			NumAllNodes: sched.nodeInfoSnapshot.NumNodes(),
			Diagnosis:   diagnosis,
		}
	}

	// When only one node after predicate, just use it.
	if len(feasibleNodes) == 1 {
		return ScheduleResult{
			SuggestedHost:  feasibleNodes[0].Name,
			EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
			FeasibleNodes:  1,
		}, nil
	}

	priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
	if err != nil {
		return result, err
	}

	host, err := selectHost(priorityList)
	trace.Step("Prioritizing done")

	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
		FeasibleNodes:  len(feasibleNodes),
	}, err
}

在这里我们就能具体的看到predicates筛选过程和Prioritizing打分过程,整体的逻辑也比较简单,首先是筛选出合适的node,如果只有一个node了,那么就直接返回这个node,如果有多个就进行打分,然后选择评分最高的node返回回去。

筛选过程

然后我们查看predicates筛选过程,其代码在pkg/scheduler/schedule_one.go:387中,如下,补充了一些注释

// Filters the nodes to find the ones that fit the pod based on the framework
// filter plugins and filter extenders.
func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
    // 初始化诊断信息,用于记录调度过程中的详细信息
    diagnosis := framework.Diagnosis{
        NodeToStatusMap:      make(framework.NodeToStatusMap),
        UnschedulablePlugins: sets.NewString(),
    }

    // 获取所有节点的信息
    allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
    if err != nil {
        return nil, diagnosis, err
    }
    // 运行 "prefilter" 插件
    preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
    if !s.IsSuccess() {
        if !s.IsUnschedulable() {
            return nil, diagnosis, s.AsError()
        }
        // 如果 PreFilter 插件返回的状态是不可调度的,记录相关信息
        msg := s.Message()
        diagnosis.PreFilterMsg = msg
        klog.V(5).InfoS("Status after running PreFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
        // 如果有插件失败,记录失败的插件名称
        if s.FailedPlugin() != "" {
            diagnosis.UnschedulablePlugins.Insert(s.FailedPlugin())
        }
        return nil, diagnosis, nil
    }

    // 如果 Pod 已经被提名到一个节点上(可能由于之前的抢占操作),
    // 这个节点很可能是唯一一个合适的节点,所以首先评估这个节点
    if len(pod.Status.NominatedNodeName) > 0 {
        feasibleNodes, err := sched.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
        if err != nil {
            klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
        }
        // 如果提名的节点通过了所有的过滤,调度器可以决定将这个节点分配给 Pod
        if len(feasibleNodes) != 0 {
            return feasibleNodes, diagnosis, nil
        }
    }

    // 根据 PreFilter 插件的结果,可能需要过滤掉一些节点
    nodes := allNodes
    if !preRes.AllNodes() {
        nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))
        for n := range preRes.NodeNames {
            nInfo, err := sched.nodeInfoSnapshot.NodeInfos().Get(n)
            if err != nil {
                return nil, diagnosis, err
            }
            nodes = append(nodes, nInfo)
        }
    }
    // 寻找通过过滤的节点
    feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes)
    // 无论是否发生错误,都尝试更新下一次开始搜索节点的索引
    processedNodes := len(feasibleNodes) + len(diagnosis.NodeToStatusMap)
    sched.nextStartNodeIndex = (sched.nextStartNodeIndex + processedNodes) % len(nodes)
    if err != nil {
        return nil, diagnosis, err
    }

    // 检查过滤扩展器以找到更多通过过滤的节点
    feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
    if err != nil {
        return nil, diagnosis, err
    }
    // 返回所有通过过滤的节点
    return feasibleNodes, diagnosis, nil
}

这部分首先运行preFilter插件首先进行一些轻量级的检查,然后再运行filter插件进行正式筛选,然后在运行filter拓展插件。

这里我们主要关注filter插件的运行,查看其对应的findNodesThatPassFilters函数,在pkg/scheduler/schedule_one.go:475中,如下,补充了部分注释

// findNodesThatPassFilters finds the nodes that fit the filter plugins.
func (sched *Scheduler) findNodesThatPassFilters(
    ctx context.Context, // 调度上下文
    fwk framework.Framework, // 调度框架
    state *framework.CycleState, // 调度周期状态
    pod *v1.Pod, // 待调度的 Pod
    diagnosis framework.Diagnosis, // 调度诊断信息
    nodes []*framework.NodeInfo) ([]*v1.Node, error) { // 所有节点信息
    numAllNodes := len(nodes) // 所有节点的数量
    // 计算应该找到的可行节点数量
    numNodesToFind := sched.numFeasibleNodesToFind(fwk.PercentageOfNodesToScore(), int32(numAllNodes))

    // 创建一个足够大的列表来存储通过过滤的节点,以避免在运行时增长该列表
    feasibleNodes := make([]*v1.Node, numNodesToFind)

    // 如果框架没有过滤插件,直接使用所有节点
    if !fwk.HasFilterPlugins() {
        for i := range feasibleNodes {
            // 从上一个调度周期停止的地方开始检查节点
            feasibleNodes[i] = nodes[(sched.nextStartNodeIndex+i)%numAllNodes].Node()
        }
        return feasibleNodes, nil
    }

    // 用于并行处理时的错误通道
    errCh := parallelize.NewErrorChannel()
    var statusesLock sync.Mutex // 用于保护对诊断信息的并发访问
    var feasibleNodesLen int32 // 通过过滤的节点数量
    ctx, cancel := context.WithCancel(ctx) // 创建一个可取消的上下文
    defer cancel()

    // 检查每个节点是否通过过滤
    checkNode := func(i int) {
        nodeInfo := nodes[(sched.nextStartNodeIndex+i)%numAllNodes] // 获取节点信息
        status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo) // 运行过滤插件
        if status.Code() == framework.Error {
            errCh.SendErrorWithCancel(status.AsError(), cancel) // 发送错误并可能取消整个操作
            return
        }
        if status.IsSuccess() {
            // 如果节点通过过滤,将其添加到可行节点列表中
            length := atomic.AddInt32(&feasibleNodesLen, 1)
            if length > numNodesToFind {
                cancel() // 如果找到的节点超过了预定数量,取消剩余的检查
                atomic.AddInt32(&feasibleNodesLen, -1)
            } else {
                feasibleNodes[length-1] = nodeInfo.Node()
            }
        } else {
            // 如果节点没有通过过滤,记录其状态
            statusesLock.Lock()
            diagnosis.NodeToStatusMap[nodeInfo.Node().Name] = status
            diagnosis.UnschedulablePlugins.Insert(status.FailedPlugin())
            statusesLock.Unlock()
        }
    }

    // 记录开始检查节点的时间
    beginCheckNode := time.Now()
    statusCode := framework.Success
    defer func() {
        // 记录 Filter 扩展点的延迟
        metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode))
    }()

    // 并行检查所有节点,直到找到预定数量的可行节点或检查完所有节点
    fwk.Parallelizer().Until(ctx, numAllNodes, checkNode, metrics.Filter)

    // 截断可行节点列表到实际找到的节点数量
    feasibleNodes = feasibleNodes[:feasibleNodesLen]
    if err := errCh.ReceiveError(); err != nil {
        statusCode = framework.Error
        return feasibleNodes, err
    }
    return feasibleNodes, nil
}

注意到这里首先计算了需要筛选的node的数量,这主要是为了在大规模场景下降低筛选的数量,查看其对应的函数,在pkg/scheduler/schedule_one.go:548中,如下,补充了部分注释。

// numFeasibleNodesToFind returns the number of feasible nodes that once found, the scheduler stops
// its search for more feasible nodes.
func (sched *Scheduler) numFeasibleNodesToFind(percentageOfNodesToScore *int32, numAllNodes int32) (numNodes int32) {
    if numAllNodes < minFeasibleNodesToFind {
        // 如果所有节点的数量小于预设的最小可行节点数,则返回所有节点的数量
        return numAllNodes
    }

    // 使用框架(profile)中设置的百分比,如果没有设置,则使用全局的百分比
    var percentage int32
    if percentageOfNodesToScore != nil {
        percentage = *percentageOfNodesToScore
    } else {
        percentage = sched.percentageOfNodesToScore
    }

    if percentage == 0 {
        // 如果没有提供百分比,则使用默认的计算方式
        percentage = int32(50) - numAllNodes/125
        if percentage < minFeasibleNodesPercentageToFind {
            // 确保百分比不低于预设的最小值
            percentage = minFeasibleNodesPercentageToFind
        }
    }

    // 计算基于总节点数和百分比的节点数
    numNodes = numAllNodes * percentage / 100
    if numNodes < minFeasibleNodesToFind {
        // 如果计算出的节点数小于最小可行节点数,则返回最小值
        return minFeasibleNodesToFind
    }

    // 返回计算出的可行节点数
    return numNodes
}

然后定义了内部的checkNode函数,其输入是要检查的node 的id相对于sched.nextStartNodeIndex的偏移。注意这里使用了k8s内部定义的并行函数fwk.Parallelizer().Until,其定义如下,在pkg/scheduler/framework/parallelize/parallelism.go:56staging/src/k8s.io/client-go/util/workqueue/parallelizer.go:46中:

// Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms.
// A given operation will be a label that is recorded in the goroutine metric.
func (p Parallelizer) Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc, operation string) {
	goroutinesMetric := metrics.Goroutines.WithLabelValues(operation)
	withMetrics := func(piece int) {
		goroutinesMetric.Inc()
		doWorkPiece(piece)
		goroutinesMetric.Dec()
	}

	workqueue.ParallelizeUntil(ctx, p.parallelism, pieces, withMetrics, workqueue.WithChunkSize(chunkSizeFor(pieces, p.parallelism)))
}
// ParallelizeUntil is a framework that allows for parallelizing N
// independent pieces of work until done or the context is canceled.
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc, opts ...Options) {
	if pieces == 0 {
		return
	}
	o := options{}
	for _, opt := range opts {
		opt(&o)
	}
	chunkSize := o.chunkSize
	if chunkSize < 1 {
		chunkSize = 1
	}

	chunks := ceilDiv(pieces, chunkSize)
	toProcess := make(chan int, chunks)
	for i := 0; i < chunks; i++ {
		toProcess <- i
	}
	close(toProcess)

	var stop <-chan struct{}
	if ctx != nil {
		stop = ctx.Done()
	}
	if chunks < workers {
		workers = chunks
	}
	wg := sync.WaitGroup{}
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer utilruntime.HandleCrash()
			defer wg.Done()
			for chunk := range toProcess {
				start := chunk * chunkSize
				end := start + chunkSize
				if end > pieces {
					end = pieces
				}
				for p := start; p < end; p++ {
					select {
					case <-stop:
						return
					default:
						doWorkPiece(p)
					}
				}
			}
		}()
	}
	wg.Wait()
}

checkNode函数内部检查对应的node是否能通过所有filter插件的过滤(RunFilterPluginsWithNominatedPods)如果通过就将筛选过的node数量+1,并记录相关的值,同时还会检查是否已经筛选到了足够的node,如果足够了,那么就发送取消信号,停止并行进程,不再继续筛选。

对于每个node进行筛选的函数RunFilterPluginsWithNominatedPodspkg/scheduler/framework/runtime/framework.go:816中,如下

func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(
    ctx context.Context, // 调度上下文
    state *framework.CycleState, // 当前周期状态
    pod *v1.Pod, // 待调度的 Pod
    info *framework.NodeInfo, // 节点信息
) *framework.Status {
    var status *framework.Status

    podsAdded := false
  // We run filters twice in some cases. If the node has greater or equal priority
	// nominated pods, we run them when those pods are added to PreFilter state and nodeInfo.
	// If all filters succeed in this pass, we run them again when these
	// nominated pods are not added. This second pass is necessary because some
	// filters such as inter-pod affinity may not pass without the nominated pods.
	// If there are no nominated pods for the node or if the first run of the
	// filters fail, we don't run the second pass.
	// We consider only equal or higher priority pods in the first pass, because
	// those are the current "pod" must yield to them and not take a space opened
	// for running them. It is ok if the current "pod" take resources freed for
	// lower priority pods.
	// Requiring that the new pod is schedulable in both circumstances ensures that
	// we are making a conservative decision: filters like resources and inter-pod
	// anti-affinity are more likely to fail when the nominated pods are treated
	// as running, while filters like pod affinity are more likely to fail when
	// the nominated pods are treated as not running. We can't just assume the
	// nominated pods are running because they are not running right now and in fact,
	// they may end up getting scheduled to a different node.
    // 我们可能需要两次运行过滤插件。如果节点上有优先级更高或相等的被提名的 Pods,
    // 我们会在这些 Pods 被添加到 PreFilter 状态和 nodeInfo 时运行它们。
    // 如果所有过滤插件在这一轮通过,我们会在这些被提名的 Pods 没有被添加的情况下再次运行它们。
    // 第二轮运行是必要的,因为一些过滤插件(如 Pod 亲和性)可能在没有被提名的 Pods 的情况下无法通过。
    // 如果节点没有被提名的 Pods 或者第一轮过滤插件失败,我们不会进行第二轮。
    // 我们只考虑第一轮中优先级相等或更高的 Pods,因为当前的 "pod" 必须为它们让路,而不是占用为它们运行而开放的空间。
    // 如果当前的 "pod" 占用了为低优先级 Pods 释放的资源,这是可以的。
    // 要求新的 Pod 在这两种情况下都是可调度的,确保我们做出的是保守的决定:
    // 像资源和 Pod 反亲和性这样的过滤器在将被提名的 Pods 视为运行时更有可能失败,
    // 而像 Pod 亲和性这样的过滤器在将被提名的 Pods 视为未运行时更有可能失败。
    // 我们不能仅仅假设被提名的 Pods 正在运行,因为它们现在并没有运行,事实上,
    // 它们最终可能会被调度到一个不同的节点上。
    for i := 0; i < 2; i++ {
        stateToUse := state
        nodeInfoToUse := info
        if i == 0 {
            // 第一轮:添加被提名的 Pods 到周期状态和节点信息
            var err error
            podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, f, pod, state, info)
            if err != nil {
                return framework.AsStatus(err)
            }
        } else if !podsAdded || !status.IsSuccess() {
            break
        }

        // 运行过滤插件
        status = f.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
        if !status.IsSuccess() && !status.IsUnschedulable() {
            return status
        }
    }

    return status
}

注意到这里执行了两遍筛选,主要是考虑到这个node上面可能存在一些预计要被调度过来的pod,在第一轮中会假设这些pod真的会被调度过来,然后查看是否满足pod筛选需求,在第二列会假设这些pod最后没有被调度过来,然后检查是否满足pod的筛选需求。因为在第一轮中可能会存在反亲和性要求,导致无法通过筛选,在第二轮中可能会存在亲和性要求,导致无法通过筛选,这是一种很保守的筛选方式。

利用各个插件进行筛选的函数(RunFilterPlugins)在pkg/scheduler/framework/runtime/framework.go:725中,如下

// RunFilterPlugins runs the set of configured Filter plugins for pod on
// the given node. If any of these plugins doesn't return "Success", the
// given node is not suitable for running pod.
// Meanwhile, the failure message and status are set for the given node.
func (f *frameworkImpl) RunFilterPlugins(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	nodeInfo *framework.NodeInfo,
) *framework.Status {
	for _, pl := range f.filterPlugins {
		if state.SkipFilterPlugins.Has(pl.Name()) {
			continue
		}
		metrics.PluginEvaluationTotal.WithLabelValues(pl.Name(), metrics.Filter, f.profileName).Inc()
		if status := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo); !status.IsSuccess() {
			if !status.IsUnschedulable() {
				// Filter plugins are not supposed to return any status other than
				// Success or Unschedulable.
				status = framework.AsStatus(fmt.Errorf("running %q filter plugin: %w", pl.Name(), status.AsError()))
			}
			status.SetFailedPlugin(pl.Name())
			return status
		}
	}

	return nil
}

这里的逻辑很简单,就是遍历各个筛选的插件,依次检查是否符合要求。

可以继续看runFilterPlugin这运行一个筛选插件进行检查的函数,在pkg/scheduler/framework/runtime/framework.go:750中。

func (f *frameworkImpl) runFilterPlugin(ctx context.Context, pl framework.FilterPlugin, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	if !state.ShouldRecordPluginMetrics() {
		return pl.Filter(ctx, state, pod, nodeInfo)
	}
	startTime := time.Now()
	status := pl.Filter(ctx, state, pod, nodeInfo)
	f.metricsRecorder.ObservePluginDurationAsync(metrics.Filter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
	return status
}

主要也就是调用插件的Filter函数,具体插件的介绍后面再补充。

打分过程

打分的函数prioritizeNodespkg/scheduler/schedule_one.go 中,如下,补充了部分注释

func prioritizeNodes(
    ctx context.Context,
    extenders []framework.Extender,
    fwk framework.Framework,
    state *framework.CycleState,
    pod *v1.Pod,
    nodes []*v1.Node,
) ([]framework.NodePluginScores, error) {
    // 如果没有提供优先级配置,则所有节点的分数都设为 1。
    // 这是为了在所需的格式中生成优先级列表
    if len(extenders) == 0 && !fwk.HasScorePlugins() {
        result := make([]framework.NodePluginScores, 0, len(nodes))
        for i := range nodes {
            result = append(result, framework.NodePluginScores{
                Name:       nodes[i].Name,
                TotalScore: 1,
            })
        }
        return result, nil
    }

    // 运行 PreScore 插件。
    preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
    if !preScoreStatus.IsSuccess() {
        return nil, preScoreStatus.AsError()
    }

    // 运行 Score 插件。
    nodesScores, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
    if !scoreStatus.IsSuccess() {
        return nil, scoreStatus.AsError()
    }

    // 如果启用了详细日志记录,记录每个插件对每个节点的打分
    klogV := klog.V(10)
    if klogV.Enabled() {
        for _, nodeScore := range nodesScores {
            for _, pluginScore := range nodeScore.Scores {
                klogV.InfoS("Plugin scored node for pod", "pod", klog.KObj(pod), "plugin", pluginScore.Name, "node", nodeScore.Name, "score", pluginScore.Score)
            }
        }
    }

    // 如果有扩展器并且有节点,运行扩展器
    if len(extenders) != 0 && nodes != nil {
        allNodeExtendersScores := make(map[string]*framework.NodePluginScores, len(nodes))
        var mu sync.Mutex
        var wg sync.WaitGroup
        // 并发运行每个扩展器的优先级函数
        for i := range extenders {
            if !extenders[i].IsInterested(pod) {
                continue
            }
            wg.Add(1)
            go func(extIndex int) {
                defer wg.Done()
                metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
                metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
                defer func() {
                    metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
                    metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
                }()
                prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
                if err != nil {
                    klog.V(5).InfoS("Failed to run extender's priority function. No score given by this extender.", "error", err, "pod", klog.KObj(pod), "extender", extenders[extIndex].Name())
                    return
                }
                mu.Lock()
                defer mu.Unlock()
                for i := range *prioritizedList {
                    nodename := (*prioritizedList)[i].Host
                    score := (*prioritizedList)[i].Score
                    klogV.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", nodename, "score", score)
                    // 将扩展器的分数转换为调度器使用的分数范围
                    finalscore := score * weight * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
                    if allNodeExtendersScores[nodename] == nil {
                        allNodeExtendersScores[nodename] = &framework.NodePluginScores{
                            Name:   nodename,
                            Scores: make([]framework.PluginScore, 0, len(extenders)),
                        }
                    }
                    allNodeExtendersScores[nodename].Scores = append(allNodeExtendersScores[nodename].Scores, framework.PluginScore{
                        Name:  extenders[extIndex].Name(),
                        Score: finalscore,
                    })
                    allNodeExtendersScores[nodename].TotalScore += finalscore
                }
            }(i)
        }
        wg.Wait() // 等待所有扩展器完成
        // 将扩展器的分数添加到节点分数中
        for i := range nodesScores {
            if score, ok := allNodeExtendersScores[nodes[i].Name]; ok {
                nodesScores[i].Scores = append(nodesScores[i].Scores, score.Scores...)
                nodesScores[i].TotalScore += score.TotalScore
            }
        }
    }

    // 记录每个节点的最终分数
    if klogV.Enabled() {
        for i := range nodesScores {
            klogV.InfoS("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", nodesScores[i].Name, "score", nodesScores[i].TotalScore)
        }
    }
    return nodesScores, nil
}

主要流程包括:

  1. 如果没有提供任何扩展器或打分插件,则为所有节点设置默认分数,并返回。
  2. 运行 PreScore 插件,为打分阶段做准备。
  3. 运行 Score 插件,获取每个节点的分数。
  4. 如果有扩展器并且有节点,则并发运行每个扩展器的优先级函数,获取扩展器为节点分配的分数。
  5. 将扩展器的分数转换为调度器使用的分数范围,并添加到节点分数中。
  6. 记录每个节点的最终分数。

这里补充一下其记录节点分数的结构体NodePluginScores,在文件pkg/scheduler/framework/interface.go:55中,其定义如下:

// NodePluginScores is a struct with node name and scores for that node.
type NodePluginScores struct {
	// Name is node name.
	Name string
	// Scores is scores from plugins and extenders.
	Scores []PluginScore
	// TotalScore is the total score in Scores.
	TotalScore int64
}

// PluginScore is a struct with plugin/extender name and score.
type PluginScore struct {
	// Name is the name of plugin or extender.
	Name  string
	Score int64
}

可以看到每个插件给node打分都是一个int64的类型,一个节点可能会被多个插件进行打分,最后再汇总。

再回到插件打分,这里我们主要关注关键的打分插件RunScorePlugins ,在pkg/scheduler/framework/runtime/framework.go:931中,如下,补充了部分注释

func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (ns []framework.NodePluginScores, status *framework.Status) {
    startTime := time.Now()
    defer func() {
        // 记录打分扩展点的持续时间
        metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Score, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
    }()
    allNodePluginScores := make([]framework.NodePluginScores, len(nodes))
    numPlugins := len(f.scorePlugins) - state.SkipScorePlugins.Len()
    plugins := make([]framework.ScorePlugin, 0, numPlugins)
    pluginToNodeScores := make(map[string]framework.NodeScoreList, numPlugins)
    // 为每个插件创建一个节点分数列表
    for _, pl := range f.scorePlugins {
        if state.SkipScorePlugins.Has(pl.Name()) {
            continue
        }
        plugins = append(plugins, pl)
        pluginToNodeScores[pl.Name()] = make(framework.NodeScoreList, len(nodes))
    }
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    errCh := parallelize.NewErrorChannel()

    if len(plugins) > 0 {
        // 并行地为每个节点运行每个插件的 Score 方法
        f.Parallelizer().Until(ctx, len(nodes), func(index int) {
            nodeName := nodes[index].Name
            for _, pl := range plugins {
                s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
                if !status.IsSuccess() {
                    err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
                    errCh.SendErrorWithCancel(err, cancel)
                    return
                }
                pluginToNodeScores[pl.Name()][index] = framework.NodeScore{
                    Name:  nodeName,
                    Score: s,
                }
            }
        }, metrics.Score)
        if err := errCh.ReceiveError(); err != nil {
            return nil, framework.AsStatus(fmt.Errorf("running Score plugins: %w", err))
        }
    }

    // 并行地为每个打分插件运行 NormalizeScore 方法
    f.Parallelizer().Until(ctx, len(plugins), func(index int) {
        pl := plugins[index]
        if pl.ScoreExtensions() == nil {
            return
        }
        nodeScoreList := pluginToNodeScores[pl.Name()]
        status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList)
        if !status.IsSuccess() {
            err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
            errCh.SendErrorWithCancel(err, cancel)
            return
        }
    }, metrics.Score)
    if err := errCh.ReceiveError(); err != nil {
        return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err))
    }

    // 并行地为每个打分插件应用分数权重,并构建 allNodePluginScores
    f.Parallelizer().Until(ctx, len(nodes), func(index int) {
        nodePluginScores := framework.NodePluginScores{
            Name:   nodes[index].Name,
            Scores: make([]framework.PluginScore, len(plugins)),
        }

        for i, pl := range plugins {
            weight := f.scorePluginWeight[pl.Name()]
            nodeScoreList := pluginToNodeScores[pl.Name()]
            score := nodeScoreList[index].Score

            if score > framework.MaxNodeScore || score < framework.MinNodeScore {
                err := fmt.Errorf("plugin %q returns an invalid score %v, it should in the range of [%v, %v] after normalizing", pl.Name(), score, framework.MinNodeScore, framework.MaxNodeScore)
                errCh.SendErrorWithCancel(err, cancel)
                return
            }
            weightedScore := score * int64(weight)
            nodePluginScores.Scores[i] = framework.PluginScore{
                Name:  pl.Name(),
                Score: weightedScore,
            }
            nodePluginScores.TotalScore += weightedScore
        }
        allNodePluginScores[index] = nodePluginScores
    }, metrics.Score)
    if err := errCh.ReceiveError(); err != nil {
        return nil, framework.AsStatus(fmt.Errorf("applying score defaultWeights on Score plugins: %w", err))
    }

    // 返回所有节点的插件分数
    return allNodePluginScores, nil
}

主要流程包括:

  1. 为每个插件创建一个节点分数列表。
  2. 使用并行处理为每个节点运行每个插件的 Score 方法。
  3. 为每个插件运行 NormalizeScore 方法,以标准化分数。
  4. 应用每个插件的分数权重,构建最终的节点分数。
  5. 返回各个节点的分数

查看插件打分的函数runScorePlugin,在pkg/scheduler/framework/runtime/framework.go:1025 中,如下。

func (f *frameworkImpl) runScorePlugin(ctx context.Context, pl framework.ScorePlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	if !state.ShouldRecordPluginMetrics() {
		return pl.Score(ctx, state, pod, nodeName)
	}
	startTime := time.Now()
	s, status := pl.Score(ctx, state, pod, nodeName)
	f.metricsRecorder.ObservePluginDurationAsync(metrics.Score, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
	return s, status
}

可以看到主要是调用插件的Score方法。

一般调度的后期处理

PostFilter插件

schedulingCycle中可以看到如果上述的一般调度没有为Pod找到合适的node,并且错误不是没有合适的node,即ErrNoNodesAvailable 的话,就会检查是否存在有PostFilterPlugins,如果有就运行,即运行RunPostFilterPlugins函数,来进行相关的处理,例如释放一些资源,从而希望使得该pod在下一次调度时有机会成功调度,当然这被释放的资源也可能被其他不同的pod给占用了,但是这对系统是无害的,所以也不管。

RunPostFilterPlugins函数在pkg/scheduler/framework/runtime/framework.go:762中,如下所示

// RunPostFilterPlugins runs the set of configured PostFilter plugins until the first
// Success, Error or UnschedulableAndUnresolvable is met; otherwise continues to execute all plugins.
func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (_ *framework.PostFilterResult, status *framework.Status) {
	startTime := time.Now()
	defer func() {
		metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PostFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
	}()

	// `result` records the last meaningful(non-noop) PostFilterResult.
	var result *framework.PostFilterResult
	var reasons []string
	var failedPlugin string
	for _, pl := range f.postFilterPlugins {
		r, s := f.runPostFilterPlugin(ctx, pl, state, pod, filteredNodeStatusMap)
		if s.IsSuccess() {
			return r, s
		} else if s.Code() == framework.UnschedulableAndUnresolvable {
			return r, s.WithFailedPlugin(pl.Name())
		} else if !s.IsUnschedulable() {
			// Any status other than Success, Unschedulable or UnschedulableAndUnresolvable is Error.
			return nil, framework.AsStatus(s.AsError()).WithFailedPlugin(pl.Name())
		} else if r != nil && r.Mode() != framework.ModeNoop {
			result = r
		}

		reasons = append(reasons, s.Reasons()...)
		// Record the first failed plugin unless we proved that
		// the latter is more relevant.
		if len(failedPlugin) == 0 {
			failedPlugin = pl.Name()
		}
	}

	return result, framework.NewStatus(framework.Unschedulable, reasons...).WithFailedPlugin(failedPlugin)
}

可以看到他就是遍历了所有的postFilter插件,然后使用函数runPostFilterPlugin运行这些插件,其在pkg/scheduler/framework/runtime/framework.go:796

func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.PostFilterPlugin, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
	if !state.ShouldRecordPluginMetrics() {
		return pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)
	}
	startTime := time.Now()
	r, s := pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)
	f.metricsRecorder.ObservePluginDurationAsync(metrics.PostFilter, pl.Name(), s.Code().String(), metrics.SinceInSeconds(startTime))
	return r, s
}

Reserve插件

得到想要调度到的pod后,可能需要执行一些资源预留的操作,就需要定义在reserve插件中,该插件对应的调用函数为RunReservePluginsReserve,在pkg/scheduler/framework/runtime/framework.go:1144

// RunReservePluginsReserve runs the Reserve method in the set of configured
// reserve plugins. If any of these plugins returns an error, it does not
// continue running the remaining ones and returns the error. In such a case,
// the pod will not be scheduled and the caller will be expected to call
// RunReservePluginsUnreserve.
func (f *frameworkImpl) RunReservePluginsReserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
	startTime := time.Now()
	defer func() {
		metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Reserve, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
	}()
	for _, pl := range f.reservePlugins {
		status = f.runReservePluginReserve(ctx, pl, state, pod, nodeName)
		if !status.IsSuccess() {
			err := status.AsError()
			klog.ErrorS(err, "Failed running Reserve plugin", "plugin", pl.Name(), "pod", klog.KObj(pod))
			return framework.AsStatus(fmt.Errorf("running Reserve plugin %q: %w", pl.Name(), err))
		}
	}
	return nil
}

这里也是遍历所有的reserve插件,如果有任意一个插件失败了那么就失败了。单个插件的调用函数在pkg/scheduler/framework/runtime/framework.go:1160中,如下

func (f *frameworkImpl) runReservePluginReserve(ctx context.Context, pl framework.ReservePlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
	if !state.ShouldRecordPluginMetrics() {
		return pl.Reserve(ctx, state, pod, nodeName)
	}
	startTime := time.Now()
	status := pl.Reserve(ctx, state, pod, nodeName)
	f.metricsRecorder.ObservePluginDurationAsync(metrics.Reserve, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
	return status
}

Permit插件

找到了要调度的pod后还需要运行permit插件,该插件主要用来查看记录是否还需要等待一下其他操作,例如抢占某个pod的资源,那么就需要等待被抢占pod的资源释放掉。

该插件对应的函数RunPermitPluginspkg/scheduler/framework/runtime/framework.go:1200中,如下

// RunPermitPlugins runs the set of configured permit plugins. If any of these
// plugins returns a status other than "Success" or "Wait", it does not continue
// running the remaining plugins and returns an error. Otherwise, if any of the
// plugins returns "Wait", then this function will create and add waiting pod
// to a map of currently waiting pods and return status with "Wait" code.
// Pod will remain waiting pod for the minimum duration returned by the permit plugins.
func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
    startTime := time.Now() // 记录permit插件开始运行的时间
    defer func() {
        // 记录permit插件的运行时间和最终状态
        metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Permit, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
    }()
    pluginsWaitTime := make(map[string]time.Duration) // 存储每个插件的等待时间
    statusCode := framework.Success // 初始化状态码为成功
    for _, pl := range f.permitPlugins {
        // 运行当前permit插件
        status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName)
        if !status.IsSuccess() {
            if status.IsUnschedulable() {
                // 如果插件返回不可调度的状态,则记录日志并返回该状态
                klog.V(4).InfoS("Pod rejected by permit plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", status.Message())
                status.SetFailedPlugin(pl.Name()) // 设置失败的插件名称
                return status
            }
            if status.IsWait() {
                // 如果插件返回等待的状态,则记录等待时间,但不立即返回
                // 允许的最长等待时间由 maxTimeout 限制
                if timeout > maxTimeout {
                    timeout = maxTimeout
                }
                pluginsWaitTime[pl.Name()] = timeout
                statusCode = framework.Wait // 更新状态码为等待
            } else {
                // 如果插件返回错误状态,则记录错误日志并返回错误状态
                err := status.AsError()
                klog.ErrorS(err, "Failed running Permit plugin", "plugin", pl.Name(), "pod", klog.KObj(pod))
                return framework.AsStatus(fmt.Errorf("running Permit plugin %q: %w", pl.Name(), err)).WithFailedPlugin(pl.Name())
            }
        }
    }
    if statusCode == framework.Wait {
        // 如果任何插件返回等待状态,则创建并添加等待中的 Pod 到映射中,并返回等待状态
        waitingPod := newWaitingPod(pod, pluginsWaitTime)
        f.waitingPods.add(waitingPod)
        msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name)
        klog.V(4).InfoS("One or more plugins asked to wait and no plugin rejected pod", "pod", klog.KObj(pod))
        return framework.NewStatus(framework.Wait, msg)
    }
    // 如果所有插件都成功或返回等待,且没有插件拒绝 Pod,则返回 nil 表示没有错误
    return nil
}

主要流程包括:

  1. 记录开始运行许可插件的时间。
  2. 使用 defer 语句确保无论函数如何结束,都记录许可插件的运行时间和状态。
  3. 遍历所有的permit插件。
  4. 运行当前插件,并将结果状态保存到 status
  5. 检查状态:
    • 如果状态是成功的,则继续运行下一个插件。
    • 如果状态是不可调度的,则记录日志并返回该状态。
    • 如果状态是等待的,则记录等待时间,并更新状态码为等待,然后继续运行下一个插件。
    • 如果状态是错误,则记录错误日志,并返回错误状态。
  6. 如果任何插件返回等待状态,则创建等待中的 Pod 并添加到映射中,然后返回等待状态。
  7. 如果所有插件都成功或返回等待,且没有插件拒绝 Pod,则返回 nil

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1805545.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CTF Show MISC做题笔记

MISCX 30 题目压缩包为misc2.rar,其中包含三个文件:misc1.zip, flag.txt, hint.txt。其中后两个文件是加密的。 先解压出misc1.zip, 发现其中包含两个文件&#xff1a;misc.png和music.doc。其中后面文件是加密的。 解压出misc.png,发现图片尾部有消息&#xff1a;flag{flag…

Autosar Dem配置-Condition(TRC)的使用-基于ETAS软件

文章目录 前言Dem配置DemEnableConditionDemEnableConditionIdDemEnableConditionStatus DemEnableConditionGroupDemEventParameter 接口配置代码实现总结 前言 在车辆工作状态下&#xff0c;每个DTC检测可能都需要一个前提条件&#xff0c;否则如果任何条件下都可以进行DTC检…

【ARM Cache 与 MMU 系列文章 7.3 – ARMv8/v9 MMU 块描述符与页表描述符】

请阅读【ARM Cache 及 MMU/MPU 系列文章专栏导读】 及【嵌入式开发学习必备专栏】 上篇文章&#xff1a;【ARM Cache 系列文章 7.2 – ARMv8/v9 MMU 页表配置详细介绍 03 】 文章目录 MMU 块描述符与页描述符Block DescriptorBlock descriptor formatsBlock Entry 介绍Block En…

【C#】开发过程中记录问题

1.DateTimePicker控件获取时间 拖动控件,设置属性format为custom格式。例如我想获得20240101这种类型的string类型的数据: string DateTime = DateTimePicker.Value.ToString("yyyyMMdd");2.ComboBox下拉列表控件 默认为DropDown,下拉可修改。 DropDownList为下…

《Windows API每日一练》

2.2.8 第15练&#xff1a;处理WM_CLOSE消息 /*------------------------------------------------------------------------ 015 编程达人win32 API每日一练 第15个例子WM_CLOSE.C&#xff1a;回调函数---处理WM_CLOSE消息 WM_CLOSE消息 DestroyWindow函数 注意&#xf…

SprirngBoot+Vue房屋租赁系统(前后端分离)

技术栈 JavaSpringBootMavenMySQLMyBatisVueShiroElement-UI 角色对应功能 租客管理员 功能截图

Git【版本控制命令】

02 【本地库操作】 1.git的结构 2.Git 远程库——代码托管中心 2.1 git工作流程 代码托管中心用于维护 Git 的远程库。包括在局域网环境下搭建的 GitLab 服务器&#xff0c;以及在外网环境下的 GitHub 和 Gitee (码云)。 一般工作流程如下&#xff1a; 1&#xff0e;从远程…

[Cesium学习]

Popup弹窗 Cesium点位弹窗_cesium popup弹窗-CSDN博客 Cesium构造popup弹窗函数_cesium popup-CSDN博客 开发之家 - Cesium构造popup弹窗函数 GitHub - cesium-plugin/cesium-popup-es6: 气泡弹窗 热力图分析 // 创建Cesium Viewer实例 const viewer new Cesium.Viewer(c…

C#中使用Mysql批量新增数据 MySqlBulkCopy

在C#中使用MySqlBulkCopy类来批量复制数据到MySQL数据库&#xff0c;首先需要确保你的项目中已经引用了MySQL Connector。以下是使用MySqlBulkCopy的基本步骤&#xff1a; 1.安装MySQL Connector。 可以通过NuGet安装MySQL Connector&#xff1a; 2.在代码中引用必要的命名空间…

安装 JDK 17

安装包 百度网盘 提取码&#xff1a;6666 安装步骤 双击下载得到的安装包&#xff0c;开始安装&#xff1a; 正在安装&#xff1a; 安装完成&#xff1a; 安装路径下&#xff0c;多出来了很多新的内容。安装文件夹所包含的内容及作用&#xff1a; src 是 JDK 的源码包。类库…

DevExpress WPF中文教程:Grid - 如何向项目添加GridControl并绑定到数据

DevExpress WPF拥有120个控件和库&#xff0c;将帮助您交付满足甚至超出企业需求的高性能业务应用程序。通过DevExpress WPF能创建有着强大互动功能的XAML基础应用程序&#xff0c;这些应用程序专注于当代客户的需求和构建未来新一代支持触摸的解决方案。 无论是Office办公软件…

VueRouter路由与Vuex状态管理

前言 随着前端技术的快速发展和前后端分离架构的普及&#xff0c;单页面应用&#xff08;SPA&#xff09;已成为现代Web开发的主流。在SPA中&#xff0c;前端路由和状态管理扮演着至关重要的角色。Vue3作为当前流行的前端框架之一&#xff0c;提供了强大的路由&#xff08;Vue …

sqli-labs 靶场 less-7 第七关详解:OUTFILE注入与配置

SQLi-Labs是一个用于学习和练习SQL注入漏洞的开源应用程序。通过它&#xff0c;我们可以学习如何识别和利用不同类型的SQL注入漏洞&#xff0c;并了解如何修复和防范这些漏洞。Less 7 SQLI DUMB SERIES-7判断注入点 进入页面中&#xff0c;并输入数据查看结果。 发现空数据提…

【JS实战03】学生信息的添加与删除

说明&#xff1a;本文章提供相应源码&#xff0c;需要到主页资源栏下载&#xff0c;并搭配源码看本文档&#xff1b;重点阐述每个JS模块实现过程中的重难点问题。 一&#xff1a;录入模块 1 渲染数据思路 减少DOM相关操作&#xff0c;避免因过多的DOM操作造成程序运行速度的…

机车 - 安驾培训记录

1&#xff0c;先学倒车后扶车。 先断电。脚蹬在外的话要展开&#xff0c;防止推过头。 可以挂档就挂到1档。可以用皮套把刹车拉紧&#xff0c;或手捏在刹车上。防止下坡溜车或扶起时车不稳。 站在车倒向的一侧&#xff0c;车把向内&#xff0c;方便一手抓车把&#xff0c;一…

【iOS】内存泄漏检查及原因分析

目录 为什么要检测内存泄漏&#xff1f;什么是内存泄漏&#xff1f;内存泄漏排查方法1. 使用Zombie Objects2. 静态分析3. 动态分析方法定位修改Leaks界面分析Call Tree的四个选项&#xff1a; 内存泄漏原因分析1. Leaked Memory&#xff1a;应用程序未引用的、不能再次使用或释…

CTFHUB-SQL注入-字符型注入

目录 查询数据库名 查询数据库中的表名 查询表中数据 总结 此题目和上一题相似&#xff0c;一个是整数型注入&#xff0c;一个是字符型注入。字符型注入就是注入字符串参数&#xff0c;判断回显是否存在注入漏洞。因为上一题使用手工注入查看题目 flag &#xff0c;这里就不…

基于微信小程序的社区志愿者服务平台 _8xh87【已测试】

前言&#xff1a;&#x1f469;‍&#x1f4bb; 计算机行业的同仁们&#xff0c;大家好&#xff01;作为专注于Java领域多年的开发者&#xff0c;我非常理解实践案例的重要性。以下是一些我认为有助于提升你们技能的资源&#xff1a; &#x1f469;‍&#x1f4bb; SpringBoot…

轻松掌握系统概况,提升工作效率

作为 Linux 系统管理员,我们经常需要了解系统的基本状况,比如当前时间、系统版本、内核信息、CPU 型号、内存使用等等。但是每次手动执行各种命令来获取这些信息,无疑是一件非常繁琐的事情。 幸运的是,我们可以通过编写一个简单的 shell 脚本来一键获取这些系统信息。让我们一…

Linux用户和用户组的管理

目录 前言一、系统环境二、Linux用户组的管理2.1 新增用户组2.2 删除用户组2.3 修改用户组2.4 查看用户组 三、Linux用户的管理3.1 新增用户3.2 删除用户3.3 修改用户3.4 查看用户3.5 用户口令&#xff08;密码&#xff09;的管理 总结 前言 本篇文章介绍如何在Linux系统上实现…