controller-runtime源码学习

news2024/11/6 3:08:33

本文基于controller-runtime v0.11.2版本进行源码学习

kubebuilder、operator-sdk这些框架都是在controller-runtime基础上做了一层封装,方便开发者快速生成项目的脚手架,本文会以kuebuilder搭建工程作为使用controller-runtime的demo进行源码分析

1、kuebuilder搭建工程

创建脚手架工程

kubebuilder init --domain blog.com

创建API

kubebuilder create api --group apps --version v1alpha1 --kind Application

工程结构如下:

$ tree application-controller
application-controller
├── Dockerfile
├── Makefile
├── PROJECT
├── README.md
├── api
│   └── v1alpha1
│       ├── application_types.go # 自定义CRD的地方
│       ├── groupversion_info.go # GV的通用元数据用于CRD生成,以及Scheme创建方法
│       └── zz_generated.deepcopy.go # 包含代码生成的runtime.Object接口的实现,DeepCopy是核心
├── bin
│   └── controller-gen
├── config
│   ├── crd # 部署CRD的yaml
│   ├── default
│   ├── manager # 部署Controller的yaml
│   ├── prometheus
│   ├── rbac # Controller运行所需的RBAC权限
│   └── samples
├── controllers
│   ├── application_controller.go # 实现自定义Controller业务逻辑的地方
│   └── suite_test.go
├── go.mod
├── go.sum
├── hack
│   └── boilerplate.go.txt
└── main.go # Controller的入口

2、整体架构

  1. Manager管理多个Controller的运行,负责初始化Cache、Client等公共依赖,并提供各个runnbale使用
  2. Client封装了对资源的CRUD操作,其中读操作实际查询的是本地Cache,写操作直接访问API Server
  3. Cache负责在Controller进程里面根据Scheme同步API Server中所有该Controller关心的资源对象,其核心是相关Resource的Informer,Informer会负责监听对应Resource的创建/删除/更新操作,以触发Controller的Reconcile逻辑
  4. Controller是控制器的业务逻辑所在的地方,一个Manager可能会有多个Controller,我们一般只需要实现Reconcile方法即可。上图的Predicate是事件过滤器,我们可以在Controller中过滤掉我们不关心的事件信息
  5. WebHook是我们准入控制实现的地方了,主要是有两类接口,一个是MutatingAdmissionWebhook需要实现Defaulter接口,一个是ValidatingAdmissionWebhook需要实现Validator接口

3、main.go

kubebuilder生成的main.go是整个项目的入口,代码如下:

var (
	scheme   = runtime.NewScheme()
	setupLog = ctrl.Log.WithName("setup")
)

func init() {
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))

	utilruntime.Must(appsv1alpha1.AddToScheme(scheme))
	//+kubebuilder:scaffold:scheme
}

func main() {
	var metricsAddr string
	var enableLeaderElection bool
	var probeAddr string
	flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
	flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
	flag.BoolVar(&enableLeaderElection, "leader-elect", false,
		"Enable leader election for controller manager. "+
			"Enabling this will ensure there is only one active controller manager.")
	opts := zap.Options{
		Development: true,
	}
	opts.BindFlags(flag.CommandLine)
	flag.Parse()

	ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

	// 1)init Manager
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Scheme:                 scheme,
		MetricsBindAddress:     metricsAddr,
		Port:                   9443,
		HealthProbeBindAddress: probeAddr,
		LeaderElection:         enableLeaderElection,
		LeaderElectionID:       "fcd03b9b.blog.com",
	})
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}

	// 2)init Controller
	if err = (&controllers.ApplicationReconciler{
		Client: mgr.GetClient(),
		Scheme: mgr.GetScheme(),
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "Application")
		os.Exit(1)
	}
	//+kubebuilder:scaffold:builder

	if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
		setupLog.Error(err, "unable to set up health check")
		os.Exit(1)
	}
	if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
		setupLog.Error(err, "unable to set up ready check")
		os.Exit(1)
	}

	setupLog.Info("starting manager")
	// 3)start Manager
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		setupLog.Error(err, "problem running manager")
		os.Exit(1)
	}
}

main.go主要逻辑如下:

  1. 初始化Manager
  2. 将Manager的Client传给Controller,并且调用SetupWithManager()方法传入Manager进行Controller的初始化
  3. 启动Manager

4、Manager

Manager是一个用于初始化共享依赖关系的接口,接口定义如下:

// pkg/manager/manager.go
// Manager初始化共享的依赖关系,比如Cache和Client,并将他们提供给Runnables
type Manager interface {
	// cluster中包含了Cache和Client
	cluster.Cluster

	// Add将在组件上设置所需的依赖关系,并在调用Start时启动组件
	Add(Runnable) error

	// Start启动所有已注册的控制器
	Start(ctx context.Context) error
	...
}

type Runnable interface {
  
	Start(context.Context) error
}

Manager可以管理Runnable的生命周期(添加/启动),Controller只是Runnable的一个特例

  1. 持有Runnable共同的依赖:client、cache、scheme等
  2. 提供了getter(例如GetClient()),还有一个简单的依赖注入机制(runtime/inject)
  3. 支持领导者选举,只需用选项指定即可,还提供了一个用于优雅关闭的信号处理程序

1)、Manager实例化

// pkg/manager/manager.go
func New(config *rest.Config, options Options) (Manager, error) {
	// 1)设置options属性的默认值
	options = setOptionsDefaults(options)

	// 2)创建cluster
	cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
		clusterOptions.Scheme = options.Scheme
		clusterOptions.MapperProvider = options.MapperProvider
		clusterOptions.Logger = options.Logger
		clusterOptions.SyncPeriod = options.SyncPeriod
		clusterOptions.Namespace = options.Namespace
		clusterOptions.NewCache = options.NewCache
		clusterOptions.NewClient = options.NewClient
		clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
		clusterOptions.DryRunClient = options.DryRunClient
		clusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck
	})
	if err != nil {
		return nil, err
	}

	recorderProvider, err := options.newRecorderProvider(config, cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster)
	if err != nil {
		return nil, err
	}

	leaderConfig := options.LeaderElectionConfig
	if leaderConfig == nil {
		leaderConfig = rest.CopyConfig(config)
	}
	resourceLock, err := options.newResourceLock(leaderConfig, recorderProvider, leaderelection.Options{
		LeaderElection:             options.LeaderElection,
		LeaderElectionResourceLock: options.LeaderElectionResourceLock,
		LeaderElectionID:           options.LeaderElectionID,
		LeaderElectionNamespace:    options.LeaderElectionNamespace,
	})
	if err != nil {
		return nil, err
	}

	metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)
	if err != nil {
		return nil, err
	}

	metricsExtraHandlers := make(map[string]http.Handler)

	healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress)
	if err != nil {
		return nil, err
	}

	errChan := make(chan error)
	// 3)创建runnables
	runnables := newRunnables(errChan)

	// 4)将cluster和runnables赋值给controllerManager
	return &controllerManager{
		stopProcedureEngaged:          pointer.Int64(0),
		cluster:                       cluster,
		runnables:                     runnables,
		errChan:                       errChan,
		recorderProvider:              recorderProvider,
		resourceLock:                  resourceLock,
		metricsListener:               metricsListener,
		metricsExtraHandlers:          metricsExtraHandlers,
		controllerOptions:             options.Controller,
		logger:                        options.Logger,
		elected:                       make(chan struct{}),
		port:                          options.Port,
		host:                          options.Host,
		certDir:                       options.CertDir,
		webhookServer:                 options.WebhookServer,
		leaseDuration:                 *options.LeaseDuration,
		renewDeadline:                 *options.RenewDeadline,
		retryPeriod:                   *options.RetryPeriod,
		healthProbeListener:           healthProbeListener,
		readinessEndpointName:         options.ReadinessEndpointName,
		livenessEndpointName:          options.LivenessEndpointName,
		gracefulShutdownTimeout:       *options.GracefulShutdownTimeout,
		internalProceduresStop:        make(chan struct{}),
		leaderElectionStopped:         make(chan struct{}),
		leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
	}, nil
}

func newRunnables(errChan chan error) *runnables {
	// 包含Webhooks、Caches、需要支持领导者选举的、Others四种类型的RunnableGroup
	return &runnables{
		Webhooks:       newRunnableGroup(errChan),
		Caches:         newRunnableGroup(errChan),
		LeaderElection: newRunnableGroup(errChan),
		Others:         newRunnableGroup(errChan),
	}
}

New()方法用于Manager实例化,主要逻辑如下:

  1. 调用setOptionsDefaults(),设置options属性的默认值
  2. 创建cluster
  3. 创建runnables,runnables分为Webhooks、Caches、需要支持领导者选举的、Others四种类型的RunnableGroup
  4. 将cluster和runnables赋值给controllerManager,controllerManager结构体是Manager接口的一个具体实现

2)、Manager启动

// pkg/manager/internal.go
func (cm *controllerManager) Start(ctx context.Context) (err error) {
	cm.Lock()
	if cm.started {
		cm.Unlock()
		return errors.New("manager already started")
	}
	var ready bool
	defer func() {
		if !ready {
			cm.Unlock()
		}
	}()

	cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

	stopComplete := make(chan struct{})
	defer close(stopComplete)
	defer func() {
		stopErr := cm.engageStopProcedure(stopComplete)
		if stopErr != nil {
			if err != nil {
				err = kerrors.NewAggregate([]error{err, stopErr})
			} else {
				err = stopErr
			}
		}
	}()

	if err := cm.add(cm.cluster); err != nil {
		return fmt.Errorf("failed to add cluster to runnables: %w", err)
	}

	if cm.metricsListener != nil {
		cm.serveMetrics()
	}

	if cm.healthProbeListener != nil {
		cm.serveHealthProbes()
	}

	// 启动webhooks的runnables
	if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
		if err != wait.ErrWaitTimeout {
			return err
		}
	}

	// 启动caches的runnables
	if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
		if err != wait.ErrWaitTimeout {
			return err
		}
	}

	// 启动不需要领导者选举的runnables
	if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
		if err != wait.ErrWaitTimeout {
			return err
		}
	}

	{
		ctx, cancel := context.WithCancel(context.Background())
		cm.leaderElectionCancel = cancel
		go func() {
			if cm.resourceLock != nil {
				// 启动领导者选举
				if err := cm.startLeaderElection(ctx); err != nil {
					cm.errChan <- err
				}
			} else {
				// 启动需要领导者选举的runnables
				if err := cm.startLeaderElectionRunnables(); err != nil {
					cm.errChan <- err
				}
				close(cm.elected)
			}
		}()
	}

	ready = true
	cm.Unlock()
	select {
	case <-ctx.Done():
		return nil
	case err := <-cm.errChan:
		return err
	}
}

Start()用于Manager启动,其实就是去启动所有添加到Manager中的Runnable(Controller)

3)、小结

1)创建Manager

  • 创建并注册scheme
  • 创建cluster
  • 为runnable创建map

2)注册Runnable

添加runnable到map

3)启动Manager

启动runnable

5、Controller

1)、Controller创建

kuebuilder生成的Controller的具体实现:

type ApplicationReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

// 实现Controller的业务逻辑
func (r *ApplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	_ = log.FromContext(ctx)

	return ctrl.Result{}, nil
}

// 将Controller添加到Manager中	
func (r *ApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&appsv1alpha1.Application{}).
		Complete(r)
}

ApplicationReconciler包含两个方法:

  1. Reconcile()用于实现Controller的业务逻辑
  2. SetupWithManager()将Controller添加到Manager中

SetupWithManager()中先调用了ctrl.NewControllerManagedBy()返回了一个新的控制器构造器Builder对象:

// pkg/builder/controller.go
// 控制器构造器
type Builder struct {
	forInput         ForInput
	ownsInput        []OwnsInput
	watchesInput     []WatchesInput
	mgr              manager.Manager
	globalPredicates []predicate.Predicate
	ctrl             controller.Controller
	ctrlOptions      controller.Options
	name             string
}

// ControllerManagedBy返回一个新的控制器构造器
func ControllerManagedBy(m manager.Manager) *Builder {
	return &Builder{mgr: m}
}

controller-runtime封装了一个Builde的结构体用来生成Controller,将Manager传递给这个构造器,然后是调用构造器的For()方法:

// pkg/builder/controller.go
// ForInput表示For方法设置的信息
type ForInput struct {
	object           client.Object
	predicates       []predicate.Predicate
	objectProjection objectProjection
	err              error
}

// For方法定义了reconciled的对象类型
// 并配置ControllerManagedBy通过调谐对象来响应create/delete/update事件
// 调用For函数相当于调用Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{})
func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {
	if blder.forInput.object != nil {
		blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation")
		return blder
	}
	input := ForInput{object: object}
	for _, opt := range opts {
		opt.ApplyToFor(&input)
	}

	blder.forInput = input
	return blder
}

For()方法就是用来定义我们要处理的对象类型的,然后就是最重要的Complete()方法:

// pkg/builder/controller.go
func (blder *Builder) Complete(r reconcile.Reconciler) error {
	//  调用Build函数构建Controller
	_, err := blder.Build(r)
	return err
}

func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
	if r == nil {
		return nil, fmt.Errorf("must provide a non-nil Reconciler")
	}
	if blder.mgr == nil {
		return nil, fmt.Errorf("must provide a non-nil Manager")
	}
	if blder.forInput.err != nil {
		return nil, blder.forInput.err
	}
	if blder.forInput.object == nil {
		return nil, fmt.Errorf("must provide an object for reconciliation")
	}

	// 配置ControllerManagedBy
	if err := blder.doController(r); err != nil {
		return nil, err
	}

	// 配置Watch
	if err := blder.doWatch(); err != nil {
		return nil, err
	}

	return blder.ctrl, nil
}

Complete()方法通过调用Build函数来构建Controller,其中比较重要的就是doController()doWatch()两个方法,先来看doController()方法:

// pkg/builder/controller.go
func (blder *Builder) doController(r reconcile.Reconciler) error {
	globalOpts := blder.mgr.GetControllerOptions()

	ctrlOptions := blder.ctrlOptions
	if ctrlOptions.Reconciler == nil {
		ctrlOptions.Reconciler = r
	}

	// 获取gvk
	gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
	if err != nil {
		return err
	}

	if ctrlOptions.MaxConcurrentReconciles == 0 {
		groupKind := gvk.GroupKind().String()

		if concurrency, ok := globalOpts.GroupKindConcurrency[groupKind]; ok && concurrency > 0 {
			ctrlOptions.MaxConcurrentReconciles = concurrency
		}
	}

	if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout != nil {
		ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout
	}

	if ctrlOptions.Log.GetSink() == nil {
		ctrlOptions.Log = blder.mgr.GetLogger()
	}
	ctrlOptions.Log = ctrlOptions.Log.WithValues("reconciler group", gvk.Group, "reconciler kind", gvk.Kind)

	// 构造Controller
	blder.ctrl, err = newController(blder.getControllerName(gvk), blder.mgr, ctrlOptions)
	return err
}

doController()方法通过获取资源对象的GVK来获取Controller的名称,最后通过一个newController()方法来实例化一个真正的Controller:

// pkg/controller/controller.go
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
	c, err := NewUnmanaged(name, mgr, options)
	if err != nil {
		return nil, err
	}

	// 将controller注册manager中
	return c, mgr.Add(c)
}

func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
	if options.Reconciler == nil {
		return nil, fmt.Errorf("must specify Reconciler")
	}

	if len(name) == 0 {
		return nil, fmt.Errorf("must specify Name for Controller")
	}

	if options.Log.GetSink() == nil {
		options.Log = mgr.GetLogger()
	}

	if options.MaxConcurrentReconciles <= 0 {
		options.MaxConcurrentReconciles = 1
	}

	if options.CacheSyncTimeout == 0 {
		options.CacheSyncTimeout = 2 * time.Minute
	}

	if options.RateLimiter == nil {
		options.RateLimiter = workqueue.DefaultControllerRateLimiter()
	}

	// 在Reconciler中注入依赖关系
	if err := mgr.SetFields(options.Reconciler); err != nil {
		return nil, err
	}

	// 创建Controller并配置依赖关系
	return &controller.Controller{
		Do: options.Reconciler,
		MakeQueue: func() workqueue.RateLimitingInterface {
			return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
		},
		MaxConcurrentReconciles: options.MaxConcurrentReconciles,
		CacheSyncTimeout:        options.CacheSyncTimeout,
		SetFields:               mgr.SetFields,
		Name:                    name,
		Log:                     options.Log.WithName("controller").WithName(name),
		RecoverPanic:            options.RecoverPanic,
	}, nil
}

NewUnmanaged()方法是真正实例化Controller的地方,Controller实例化完成后,又通过mgr.Add()方法将Controller添加到Manager中去进行管理

Controller结构体定义如下:

// pkg/internal/controller/controller.go
type Controller struct {
  
	Name string

	// 可以运行的最大并发Reconciles数量,默认值为1
	MaxConcurrentReconciles int

	// 定义了Reconcile()方法,包含了Controller同步的业务逻辑
	// Reconcile()能在任意时刻被调用,接收一个对象的Name与Namespace,并同步集群当前实际状态至该对象被设置的期望状态
	Do reconcile.Reconciler

	// 用于在Controller启动时,创建工作队列
	MakeQueue func() workqueue.RateLimitingInterface

	Queue workqueue.RateLimitingInterface

	// 用来将依赖关系注入到其他对象,比如Sources、EventHandlers以及Predicates
	SetFields func(i interface{}) error

	mu sync.Mutex

	// Controller是否已经启动
	Started bool

	ctx context.Context

	CacheSyncTimeout time.Duration

	// 定义了一组watch操作的属性,会在Controller启动时,根据属性进行watch操作
	startWatches []watchDescription

	Log logr.Logger

	RecoverPanic bool
}

// watchDescription包含Event的源Source、event的入队方法EventHandler以及Event的过滤方法Predicate
type watchDescription struct {
	src        source.Source
	handler    handler.EventHandler
	predicates []predicate.Predicate
}

2)、Controller启动

Controller实现了Runnable接口的Start()方法,Controller注册到Manager之后,Manager启动的时候会调用Controller的Start()方法:

// pkg/internal/controller/controller.go
func (c *Controller) Start(ctx context.Context) error {
	c.mu.Lock()
	if c.Started {
		return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
	}

	c.initMetrics()

	c.ctx = ctx

	// 创建工作队列
	c.Queue = c.MakeQueue()
	go func() {
		<-ctx.Done()
		c.Queue.ShutDown()
	}()

	wg := &sync.WaitGroup{}
	err := func() error {
		defer c.mu.Unlock()

		defer utilruntime.HandleCrash()

		for _, watch := range c.startWatches {
			c.Log.Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))

			// 启动startWatches中的watch
			if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
				return err
			}
		}

		c.Log.Info("Starting Controller")

		for _, watch := range c.startWatches {
			syncingSource, ok := watch.src.(source.SyncingSource)
			if !ok {
				continue
			}

			if err := func() error {
				sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
				defer cancel()

				if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
					err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
					c.Log.Error(err, "Could not wait for Cache to sync")
					return err
				}

				return nil
			}(); err != nil {
				return err
			}
		}

		c.startWatches = nil

		c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
		wg.Add(c.MaxConcurrentReconciles)
		for i := 0; i < c.MaxConcurrentReconciles; i++ {
			go func() {
				defer wg.Done()
				// 启动workers来处理资源
				for c.processNextWorkItem(ctx) {
				}
			}()
		}

		c.Started = true
		return nil
	}()
	if err != nil {
		return err
	}

	<-ctx.Done()
	c.Log.Info("Shutdown signal received, waiting for all workers to finish")
	wg.Wait()
	c.Log.Info("All workers finished")
	return nil
}

Start()方法中调用c.processNextWorkItem()来启动workers来处理资源对象,processNextWorkItem()方法代码如下:

// pkg/internal/controller/controller.go
// processNextWorkItem将从工作队列中弹出一个元素,并尝试通过调用reconcileHandler来处理它
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
	// 从队列中弹出元素
	obj, shutdown := c.Queue.Get()
	if shutdown {
		return false
	}

	// 标记为处理完成
	defer c.Queue.Done(obj)

	ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
	defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)

	// 调用reconcileHandler进行元素处理
	c.reconcileHandler(ctx, obj)
	return true
}

func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
	reconcileStartTS := time.Now()
	defer func() {
		c.updateMetrics(time.Since(reconcileStartTS))
	}()

	req, ok := obj.(reconcile.Request)
	if !ok {
		c.Queue.Forget(obj)
		c.Log.Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
		return
	}

	log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace)
	ctx = logf.IntoContext(ctx, log)

	// 调用Reconciler函数来处理这个元素,也就是我们真正去编写业务逻辑的地方
	result, err := c.Reconcile(ctx, req)
	switch {
	case err != nil:
		// 如果业务逻辑处理出错,重新添加到限速队列中去
		c.Queue.AddRateLimited(req)
		ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
		log.Error(err, "Reconciler error")
	case result.RequeueAfter > 0:
		// 如果Reconcile处理结果中包含大于0的RequeueAfter,忘记元素,然后延迟加入队列
		c.Queue.Forget(obj)
		c.Queue.AddAfter(req, result.RequeueAfter)
		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
	case result.Requeue:
		// 重新加入队列
		c.Queue.AddRateLimited(req)
		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
	default:
		// 最后如果没有发生错误,就会Forget这个元素,这样直到发送另一个变化它就不会再被排队了
		c.Queue.Forget(obj)
		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()
	}
}

func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
	if c.RecoverPanic {
		defer func() {
			if r := recover(); r != nil {
				for _, fn := range utilruntime.PanicHandlers {
					fn(r)
				}
				err = fmt.Errorf("panic: %v [recovered]", r)
			}
		}()
	}
	log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace)
	ctx = logf.IntoContext(ctx, log)
	return c.Do.Reconcile(ctx, req)
}

reconcileHandler()方法就是我们真正执行元素业务处理的地方,函数中包含了事件处理以及错误处理,真正的事件处理是通过c.Do.Reconcile()暴露给开发者的,所以对于开发者来说,只需要在Reconcile()方法中去处理业务逻辑就可以了

根据c.Do.Reconcile()方法的返回值来判断是否需要将元素重新加入队列进行处理:

  • 如果返回错误,则将元素重新添加到限速队列中
  • 如果返回的result.RequeueAfter > 0,则先将元素忘记,然后在result.RequeueAfter时间后加入到队列中
  • 如果返回result.Requeue,则直接将元素重新加入到限速队列中
  • 如果正常返回,则直接忘记这个元素

Controller启动流程:

3)、Controller监听事件

在讲解Controller创建的部分时提到,Complete()方法通过调用Build函数来构建Controller,里面会调用doController()doWatch()两个方法,doWatch()方法代码如下:

// pkg/builder/controller.go
func (blder *Builder) doWatch() error {
	// Reconcile type
	typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
	if err != nil {
		return err
	}
	// 创建source.Source
	src := &source.Kind{Type: typeForSrc}
	// 创建handler.EventHandler
	hdler := &handler.EnqueueRequestForObject{}
	allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
	// 调用Controller的Watch方法
	if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
		return err
	}

	for _, own := range blder.ownsInput {
		typeForSrc, err := blder.project(own.object, own.objectProjection)
		if err != nil {
			return err
		}
		src := &source.Kind{Type: typeForSrc}
		hdler := &handler.EnqueueRequestForOwner{
			OwnerType:    blder.forInput.object,
			IsController: true,
		}
		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
		allPredicates = append(allPredicates, own.predicates...)
		if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
			return err
		}
	}

	for _, w := range blder.watchesInput {
		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
		allPredicates = append(allPredicates, w.predicates...)

		if srckind, ok := w.src.(*source.Kind); ok {
			typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
			if err != nil {
				return err
			}
			srckind.Type = typeForSrc
		}

		if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
			return err
		}
	}
	return nil
}

doWatch()方法会调用Controller的Watch()方法,Controller的Watch()方法代码如下:

// pkg/internal/controller/controller.go
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if err := c.SetFields(src); err != nil {
		return err
	}
	if err := c.SetFields(evthdler); err != nil {
		return err
	}
	for _, pr := range prct {
		if err := c.SetFields(pr); err != nil {
			return err
		}
	}

	// Controller还未启动时,将watchDescription添加到startWatches中,Controller启动时会启动startWatches中的watch
	if !c.Started {
		c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
		return nil
	}

	c.Log.Info("Starting EventSource", "source", src)
	return src.Start(c.ctx, evthdler, c.Queue, prct...)
}

此时Controller还未启动,会把watchDescription添加到startWatches中,在Controller启动时会启动startWatches中的watch,代码如下:

// pkg/internal/controller/controller.go
func (c *Controller) Start(ctx context.Context) error {
	c.mu.Lock()
	if c.Started {
		return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
	}

	c.initMetrics()

	c.ctx = ctx

	// 创建工作队列
	c.Queue = c.MakeQueue()
	go func() {
		<-ctx.Done()
		c.Queue.ShutDown()
	}()

	wg := &sync.WaitGroup{}
	err := func() error {
		defer c.mu.Unlock()

		defer utilruntime.HandleCrash()

		for _, watch := range c.startWatches {
			c.Log.Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))

      // 1)启动startWatches中的watch
			if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
				return err
			}
		}

		c.Log.Info("Starting Controller")

		for _, watch := range c.startWatches {
			syncingSource, ok := watch.src.(source.SyncingSource)
			if !ok {
				continue
			}

			if err := func() error {
				sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
				defer cancel()

				if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
					err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
					c.Log.Error(err, "Could not wait for Cache to sync")
					return err
				}

				return nil
			}(); err != nil {
				return err
			}
		}

		c.startWatches = nil

		c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
		wg.Add(c.MaxConcurrentReconciles)
		for i := 0; i < c.MaxConcurrentReconciles; i++ {
			go func() {
				defer wg.Done()
				// 启动workers来处理资源
				for c.processNextWorkItem(ctx) {
				}
			}()
		}

		c.Started = true
		return nil
	}()
	if err != nil {
		return err
	}

	<-ctx.Done()
	c.Log.Info("Shutdown signal received, waiting for all workers to finish")
	wg.Wait()
	c.Log.Info("All workers finished")
	return nil
}

代码1)处可以看到最终是去调用的Source这个参数的Start()方法,Source是事件的源,如对资源对象的Create、Update、Delete操作,需要由event.EventHandlersreconcile.Requests入队列进行处理

  • 使用Kind来处理来自集群的事件(如资源对象的Create、Update、Delete操作)
  • 使用Channel来处理来自集群外部的事件(如GitHub Webhook回调、轮询外部URL)
// pkg/source/source.go
type Source interface {
	// Start是一个内部函数,只应该由Controller调用,向Informer注册一个EventHandler,将reconcile.Request放入队列
	Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}

Source是一个接口,doWatch()方法中可以看到传入的是source.Kind类型,该结构体就实现了source.Source接口:

// pkg/source/source.go
// Kind用于提供来自集群内部的事件源,这些事件来自于Watches(例如创建Pod事件)
type Kind struct {
	// Type是watch对象的类型,比如&v1.Pod{}
	Type client.Object

	// cache用于watch的API接口
	cache cache.Cache

	started     chan error
	startCancel func()
}

func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
	prct ...predicate.Predicate) error {
	if ks.Type == nil {
		return fmt.Errorf("must specify Kind.Type")
	}

	if ks.cache == nil {
		return fmt.Errorf("must call CacheInto on Kind before calling Start")
	}

	ctx, ks.startCancel = context.WithCancel(ctx)
	ks.started = make(chan error)
	go func() {
		var (
			i       cache.Informer
			lastErr error
		)

		if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
			// 从Cache中获取Informer
			i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
			if lastErr != nil {
				kindMatchErr := &meta.NoKindMatchError{}
				if errors.As(lastErr, &kindMatchErr) {
					log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start",
						"kind", kindMatchErr.GroupKind)
				}
				return false, nil
			}
			return true, nil
		}); err != nil {
			if lastErr != nil {
				ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr)
				return
			}
			ks.started <- err
			return
		}

		// 向Informer中添加EventHandler
		i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
		if !ks.cache.WaitForCacheSync(ctx) {
			ks.started <- errors.New("cache did not sync")
		}
		close(ks.started)
	}()

	return nil
}

watch.src.Start()实际调用了Kind.Start()方法,该方法实现的获取资源对象的Informer以及注册事件监听函数。其中调用的AddEventHandler()方法中传入了internal.EventHandler类型的实例,internal.EventHandler结构体实现了client-go中提供的ResourceEventHandler接口,也就是OnAdd()/OnUpdate()/OnDelete()几个方法:

// pkg/source/internal/eventsource.go
// EventHandler实现了client-go中的cache.ResourceEventHandler接口
type EventHandler struct {
	EventHandler handler.EventHandler
	Queue        workqueue.RateLimitingInterface
	Predicates   []predicate.Predicate
}

func (e EventHandler) OnAdd(obj interface{}) {
	// kubernetes对象被创建的事件
	c := event.CreateEvent{}

	if o, ok := obj.(client.Object); ok {
		c.Object = o
	} else {
		log.Error(nil, "OnAdd missing Object",
			"object", obj, "type", fmt.Sprintf("%T", obj))
		return
	}

	// Predicates用于事件过滤,循环调用Predicates的Create方法
	for _, p := range e.Predicates {
		if !p.Create(c) {
			return
		}
	}

	// 调用EventHandler的Create方法
	e.EventHandler.Create(c, e.Queue)
}

func (e EventHandler) OnUpdate(oldObj, newObj interface{}) {
	u := event.UpdateEvent{}

	if o, ok := oldObj.(client.Object); ok {
		u.ObjectOld = o
	} else {
		log.Error(nil, "OnUpdate missing ObjectOld",
			"object", oldObj, "type", fmt.Sprintf("%T", oldObj))
		return
	}

	if o, ok := newObj.(client.Object); ok {
		u.ObjectNew = o
	} else {
		log.Error(nil, "OnUpdate missing ObjectNew",
			"object", newObj, "type", fmt.Sprintf("%T", newObj))
		return
	}

	for _, p := range e.Predicates {
		if !p.Update(u) {
			return
		}
	}

	// 调用EventHandler的Update方法
	e.EventHandler.Update(u, e.Queue)
}

func (e EventHandler) OnDelete(obj interface{}) {
	d := event.DeleteEvent{}

	var ok bool
	if _, ok = obj.(client.Object); !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			log.Error(nil, "Error decoding objects.  Expected cache.DeletedFinalStateUnknown",
				"type", fmt.Sprintf("%T", obj),
				"object", obj)
			return
		}

		obj = tombstone.Obj
	}

	if o, ok := obj.(client.Object); ok {
		d.Object = o
	} else {
		log.Error(nil, "OnDelete missing Object",
			"object", obj, "type", fmt.Sprintf("%T", obj))
		return
	}

	for _, p := range e.Predicates {
		if !p.Delete(d) {
			return
		}
	}

	// 调用EventHandler的Delete方法
	e.EventHandler.Delete(d, e.Queue)
}

EventHandler结构体实现了client-go中的cache.ResourceEventHandler接口,实现过程中调用了Predicates进行事件过滤,过滤后才是真正的事件处理,真正的事件处理通过handler.EventHandler处理的,这个函数通过前面的doWatch()方法可以看出来它是一个&handler.EnqueueRequestForObject{}对象,所以真正的事件处理逻辑是这个函数去实现的

// pkg/handler/enqueue.go
// EnqueueRequestForObject是一个包含了作为事件源的对象的Name和Namespace的入队列的Request
type EnqueueRequestForObject struct{}

func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
	if evt.Object == nil {
		enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
		return
	}
	// 添加一个Request对象到工作队列
	q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
		Name:      evt.Object.GetName(),
		Namespace: evt.Object.GetNamespace(),
	}})
}

func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
	switch {
	// 如果新的meta对象不为空,添加到工作队列中
	case evt.ObjectNew != nil:
		q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
			Name:      evt.ObjectNew.GetName(),
			Namespace: evt.ObjectNew.GetNamespace(),
		}})
	// 如果旧的meta对象不为空,添加到工作队列中
	case evt.ObjectOld != nil:
		q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
			Name:      evt.ObjectOld.GetName(),
			Namespace: evt.ObjectOld.GetNamespace(),
		}})
	default:
		enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
	}
}

func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
	if evt.Object == nil {
		enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
		return
	}
	// 添加一个Request对象到工作队列
	q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
		Name:      evt.Object.GetName(),
		Namespace: evt.Object.GetNamespace(),
	}})
}

通过EnqueueRequestForObject的Create()/Update()/Delete()实现可以看出我们放入到工作队列中的元素不是以前默认的元素唯一的KEY,而是经过封装的reconcile.Request对象,当然通过这个对象也可以很方便获取对象的唯一标识KEY

总结起来就是watch.src.Start()方法就是来实现Informer初始化以及事件监听函数的注册

在这里插入图片描述

Controller监听事件流程:

在这里插入图片描述

4)、小结

1)Controller是如何被Manager管理的?

作为组件,实现了Runnable的接口,由Manager的Add()方法将其加入Manager中,在Manager启动时,调用Start()方法启动

2)Controller是如何创建的?

通过controller-runtime提供的builder,构件好所需要的Controller选项,并调用Controller.New()方法

3)Reconcile是如何被调用的?

Controller.New()方法中,会将我们的Reconcile添加到Controller中,这样当workqueue中存在事件时,会交由Reconcile处理

4)Reconcile的参数和返回值如何处理?

参数被封装为reconcile.Request,其实就是namespace和name

返回值被封装为reconcile.Result,通过它我们可以控制元素是否重新入队

5)Controller是如何添加事件处理方法的?

在builder中的doWatch()方法会根据创建对应的handler,Kind类型的Source,当启动Controller时,启动Source,在Kind.Start()方法中会将handler注册到Cache的Informer中

6)Controller是如何过滤事件的?

在builder中,可以添加Predicates用来决定create、update、delete事件是否处理

7)Builder提供的其他能力?

Builder还支持通过WithOptions()设置Controller的Option,通过WithEventFilter()添加事件过滤方法

6、Cache

Cache负责同步Controller关心的资源,其核心是GVK->Informer的映射,Informer会负责监听对应GVK的GVRs的创建/删除/更新操作,以触发Controller的Reconcile逻辑,Cache接口定义如下:

// pkg/cache/cache.go
type Cache interface {
	// 用于从Cache中获取及列举Kubernetes集群的资源
	client.Reader

	// 为不同的GVK创建或获取对应的Informer,并将Index添加到对应的Informer中
	Informers
}

type Informers interface {
	GetInformer(ctx context.Context, obj client.Object) (Informer, error)

	GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)

	Start(ctx context.Context) error

	WaitForCacheSync(ctx context.Context) bool

	client.FieldIndexer
}

1)、Cache依赖注入

如果想要被注入Cache依赖,需要实现InjectCache()方法,也就实现了Cache接口

// pkg/runtime/inject/inject.go
type Cache interface {
	InjectCache(cache cache.Cache) error
}

func CacheInto(c cache.Cache, i interface{}) (bool, error) {
	if s, ok := i.(Cache); ok {
		return true, s.InjectCache(c)
	}
	return false, nil
}

在Controller创建过程中,NewUnmanaged()方法是真正实例化Controller的地方,这里会调用controllerManager的SetFields()方法,在Reconciler中注入依赖关系,代码如下:

// pkg/controller/controller.go
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
	c, err := NewUnmanaged(name, mgr, options)
	if err != nil {
		return nil, err
	}

	// 将controller注册manager中
	return c, mgr.Add(c)
}

func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
	if options.Reconciler == nil {
		return nil, fmt.Errorf("must specify Reconciler")
	}

	if len(name) == 0 {
		return nil, fmt.Errorf("must specify Name for Controller")
	}

	if options.Log.GetSink() == nil {
		options.Log = mgr.GetLogger()
	}

	if options.MaxConcurrentReconciles <= 0 {
		options.MaxConcurrentReconciles = 1
	}

	if options.CacheSyncTimeout == 0 {
		options.CacheSyncTimeout = 2 * time.Minute
	}

	if options.RateLimiter == nil {
		options.RateLimiter = workqueue.DefaultControllerRateLimiter()
	}

	// 在Reconciler中注入依赖关系
	if err := mgr.SetFields(options.Reconciler); err != nil {
		return nil, err
	}

	// 创建Controller并配置依赖关系
	return &controller.Controller{
		Do: options.Reconciler,
		MakeQueue: func() workqueue.RateLimitingInterface {
			return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
		},
		MaxConcurrentReconciles: options.MaxConcurrentReconciles,
		CacheSyncTimeout:        options.CacheSyncTimeout,
		SetFields:               mgr.SetFields,
		Name:                    name,
		Log:                     options.Log.WithName("controller").WithName(name),
		RecoverPanic:            options.RecoverPanic,
	}, nil
}

controllerManager的SetFields()方法代码如下:

// pkg/manager/internal.go
func (cm *controllerManager) SetFields(i interface{}) error {
	// 调用cluster的SetFields
	if err := cm.cluster.SetFields(i); err != nil {
		return err
	}
	if _, err := inject.InjectorInto(cm.SetFields, i); err != nil {
		return err
	}
	if _, err := inject.StopChannelInto(cm.internalProceduresStop, i); err != nil {
		return err
	}
	if _, err := inject.LoggerInto(cm.logger, i); err != nil {
		return err
	}

	return nil
}
// pkg/cluster/internal.go
func (c *cluster) SetFields(i interface{}) error {
	if _, err := inject.ConfigInto(c.config, i); err != nil {
		return err
	}
	if _, err := inject.ClientInto(c.client, i); err != nil {
		return err
	}
	if _, err := inject.APIReaderInto(c.apiReader, i); err != nil {
		return err
	}
	if _, err := inject.SchemeInto(c.scheme, i); err != nil {
		return err
	}
	if _, err := inject.CacheInto(c.cache, i); err != nil {
		return err
	}
	if _, err := inject.MapperInto(c.mapper, i); err != nil {
		return err
	}
	return nil
}

controllerManager持有了Cache的实例,通过SetFields()方法将其注入到Controller中,Controller的Watch()方法中再将Cache注入到Source中

// pkg/internal/controller/controller.go
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// 将cache注入到Source中
	if err := c.SetFields(src); err != nil {
		return err
	}
	if err := c.SetFields(evthdler); err != nil {
		return err
	}
	for _, pr := range prct {
		if err := c.SetFields(pr); err != nil {
			return err
		}
	}

	// Controller还未启动时,将watchDescription添加到startWatches中,Controller启动时会启动startWatches中的watch
	if !c.Started {
		c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
		return nil
	}

	c.Log.Info("Starting EventSource", "source", src)
	return src.Start(c.ctx, evthdler, c.Queue, prct...)
}

2)、Cache实例化

在Manager实例化时会创建cluster,创建cluster时会调用options.NewCache实例化Cache,默认是是调用cache的New方法进行实例化,代码如下:

// pkg/cache/cache.go
func New(config *rest.Config, opts Options) (Cache, error) {
	// 设置默认参数
	opts, err := defaultOpts(config, opts)
	if err != nil {
		return nil, err
	}
	selectorsByGVK, err := convertToSelectorsByGVK(opts.SelectorsByObject, opts.DefaultSelector, opts.Scheme)
	if err != nil {
		return nil, err
	}
	disableDeepCopyByGVK, err := convertToDisableDeepCopyByGVK(opts.UnsafeDisableDeepCopyByObject, opts.Scheme)
	if err != nil {
		return nil, err
	}
	// 初始化InformersMap
	im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK, disableDeepCopyByGVK)
	return &informerCache{InformersMap: im}, nil
}

NewInformersMap方法实现如下:

// pkg/cache/internal/deleg_map.go
func NewInformersMap(config *rest.Config,
	scheme *runtime.Scheme,
	mapper meta.RESTMapper,
	resync time.Duration,
	namespace string,
	selectors SelectorsByGVK,
	disableDeepCopy DisableDeepCopyByGVK,
) *InformersMap {
	// 为structured、unstructured、metadata分别构建InformersMap
	return &InformersMap{
		structured:   newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
		unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
		metadata:     newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),

		Scheme: scheme,
	}
}

func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
	namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK) *specificInformersMap {
	// 传入createStructuredListWatch函数,通过该函数对GVK进行List和Watch操作
	return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createStructuredListWatch)
}

createStructuredListWatch函数实现如下:

// pkg/cache/internal/informers_map.go
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
	mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	if err != nil {
		return nil, err
	}

	client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
	if err != nil {
		return nil, err
	}
	listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
	listObj, err := ip.Scheme.New(listGVK)
	if err != nil {
		return nil, err
	}

	ctx := context.TODO()
	return &cache.ListWatch{
		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
			ip.selectors(gvk).ApplyToList(&opts)
			res := listObj.DeepCopyObject()
			namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
			isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
			err := client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
			return res, err
		},
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			ip.selectors(gvk).ApplyToList(&opts)
			opts.Watch = true
			namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
			isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
			return client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx)
		},
	}, nil
}

Cache实例化流程:

  1. NewInformersMap:为structured、unstructured、metadata分别构建InformersMap
  2. newStructuredInformersMap:该接口通过不同类型Object(structured、unstructured、metadata)与GVK的组合信息创建并缓存Informers
  3. 定义List-Watch函数:为3种不同类型的Object实现List-Watch函数,通过该函数可对GVK进行List和Watch操作

Cache的初始化流程中,Cache主要创建了InformersMap,Scheme中的每个GVK都会创建对应的Informers,再通过informersByGVKde Map,实现GVK到Informer的映射;每个Informer都会通过List-Watch函数对相应的GVK进行List和Watch操作

3)、Cache启动

Manager启动的时候执行Cache启动的启动,Manager的Start()方法代码如下:

// pkg/manager/internal.go
func (cm *controllerManager) Start(ctx context.Context) (err error) {
	cm.Lock()
	if cm.started {
		cm.Unlock()
		return errors.New("manager already started")
	}
	var ready bool
	defer func() {
		if !ready {
			cm.Unlock()
		}
	}()

	cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

	stopComplete := make(chan struct{})
	defer close(stopComplete)
	defer func() {
		stopErr := cm.engageStopProcedure(stopComplete)
		if stopErr != nil {
			if err != nil {
				err = kerrors.NewAggregate([]error{err, stopErr})
			} else {
				err = stopErr
			}
		}
	}()

	// 将cluster注册manager中
	if err := cm.add(cm.cluster); err != nil {
		return fmt.Errorf("failed to add cluster to runnables: %w", err)
	}

	if cm.metricsListener != nil {
		cm.serveMetrics()
	}

	if cm.healthProbeListener != nil {
		cm.serveHealthProbes()
	}

	// 启动webhooks的runnables
	if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
		if err != wait.ErrWaitTimeout {
			return err
		}
	}

	// 启动caches的runnables
	if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
		if err != wait.ErrWaitTimeout {
			return err
		}
	}

	// 启动不需要领导者选举的runnables
	if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
		if err != wait.ErrWaitTimeout {
			return err
		}
	}

	{
		ctx, cancel := context.WithCancel(context.Background())
		cm.leaderElectionCancel = cancel
		go func() {
			if cm.resourceLock != nil {
				// 启动领导者选举
				if err := cm.startLeaderElection(ctx); err != nil {
					cm.errChan <- err
				}
			} else {s
				// 启动需要领导者选举的runnables
				if err := cm.startLeaderElectionRunnables(); err != nil {
					cm.errChan <- err
				}
				close(cm.elected)
			}
		}()
	}

	ready = true
	cm.Unlock()
	select {
	case <-ctx.Done():
		return nil
	case err := <-cm.errChan:
		return err
	}
}

Start()方法中先调用cm.add()将cluster注册到manager中,cluster中包含了cache,然后调用cm.runnables.Caches.Start()启动caches的runnables,就会启动注册的cluster,实际会调用InformersMap的Start()方法,这里的核心逻辑就是启动所有的Informer,代码如下:

// pkg/cache/internal/deleg_map.go
func (m *InformersMap) Start(ctx context.Context) error {
	go m.structured.Start(ctx)
	go m.unstructured.Start(ctx)
	go m.metadata.Start(ctx)
	<-ctx.Done()
	return nil
}
// pkg/cache/internal/informers_map.go
func (ip *specificInformersMap) Start(ctx context.Context) {
	func() {
		ip.mu.Lock()
		defer ip.mu.Unlock()

		// Set the stop channel so it can be passed to informers that are added later
		ip.stop = ctx.Done()

		// Start each informer
		// 启动Informer
		for _, informer := range ip.informersByGVK {
			go informer.Informer.Run(ctx.Done())
		}

		// Set started to true so we immediately start any informers added later.
		ip.started = true
		close(ip.startWait)
	}()
	<-ctx.Done()
}

Controller监听事件中讲到,Controller会先向Informer注册特定资源的EventHandler,然后这里Cache会启动Informer,Informer向ApiServer发出请求,建立连接。当Informer检测到有资源变动后,使用Controller注册进来的EventHandler判断是否推入队列中

Cache启动流程:

在这里插入图片描述

7、Client

1)、Client实例化

在Manager实例化时会创建cluster,创建cluster时会调用options.NewClient实例化Client,默认是调用DefaultNewClient()方法进行实例化,代码如下:

// pkg/cluster/cluster.go
func DefaultNewClient(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) {
	c, err := client.New(config, options)
	if err != nil {
		return nil, err
	}

	return client.NewDelegatingClient(client.NewDelegatingClientInput{
		CacheReader:     cache,
		Client:          c,
		UncachedObjects: uncachedObjects,
	})
}

DefaultNewClient()方法调用了client.NewDelegatingClient()方法,代码如下:

// pkg/client/split.go
func NewDelegatingClient(in NewDelegatingClientInput) (Client, error) {
	uncachedGVKs := map[schema.GroupVersionKind]struct{}{}
	for _, obj := range in.UncachedObjects {
		gvk, err := apiutil.GVKForObject(obj, in.Client.Scheme())
		if err != nil {
			return nil, err
		}
		uncachedGVKs[gvk] = struct{}{}
	}

	return &delegatingClient{
		scheme: in.Client.Scheme(),
		mapper: in.Client.RESTMapper(),
		Reader: &delegatingReader{
			CacheReader:       in.CacheReader,
			ClientReader:      in.Client,
			scheme:            in.Client.Scheme(),
			uncachedGVKs:      uncachedGVKs,
			cacheUnstructured: in.CacheUnstructured,
		},
		Writer:       in.Client,
		StatusClient: in.Client,
	}, nil
}

Reader中包含了CacheReader,实际是前面传入的Cache,Writer的赋值为Client。所以,调用Client时,读操作实际查询的是本地Cache,写操作直接访问API Server

2)、小结

在创建Manager时,会创建一个Cluster的组件,这个组件中会创建Cache和Client。在controller-runtime中,Cache和Client实际是对client-go中的Informer和RESTClient的封装。最后其他组件,比如Reconciler和Source就可以通过相应的setFields()方法将Cache、Client等依赖注入进去,从而使用它们

8、总结

controller-runtime整体工作流程:

首先Controller会先向Informer注册特定资源的eventHandler;然后Cache会启动Informer,Informer向APIServer发出请求,建立连接;当Informer检测到有资源变动后,使用Controller注册进来的eventHandler判断是否推入队列中;当队列中有元素被推入时,Controller会将元素取出,并执行用户侧的Reconciler

参考:

2022年最新k8s编程operator篇

kubebuilder 进阶: 源码分析

controller-runtime 之 manager 实现

controller-runtime 之控制器实现

Controller Runtime 的四种使用姿势

《云原生应用开发 Operator原理与实践》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/338292.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

paddle表情识别部署

表情识别模块1.环境部署1.1同样采用fastDeploy库1.2相关模型2.封装成静态库2.1参考[百度Paddle中PP-Mattingv2的部署并将之封装并调用一个C静态库](https://blog.csdn.net/weixin_43564060/article/details/128882099)2.2项目依赖添加2.3生成成功3.test3.1创建emotion_test项目…

多传感器融合定位十二-基于图优化的建图方法其一

多传感器融合定位十二-基于图优化的建图方法其一1. 基于预积分的融合方案流程1.1 优化问题分析1.2 预积分的作用1.3 基于预积分的建图方案流程2. 预积分模型设计3. 预积分在优化中的使用3.1 使用方法3.2 残差设计3.3 残差雅可比的推导3.3.1 姿态残差的雅可比3.3.2 速度残差的雅…

Python3.10新特性之match语句示例详解

这篇文章主要为大家介绍了Python3.10新特性之match语句示例详解&#xff0c;有需要的朋友可以借鉴参考下&#xff0c;希望能够有所帮助&#xff0c;祝大家多多进步&#xff0c;早日升职加薪正文在Python 3.10发布之前&#xff0c;Python是没有类似于其他语言中switch语句的&…

Clip-path实现按钮流动边框动画

前言 &#x1f44f;Clip-path实现按钮流动边框动画&#xff0c;速速来Get吧~ &#x1f947;文末分享源代码。记得点赞关注收藏&#xff01; 1.实现效果 2.实现步骤 添加div标签 <div>苏苏_icon</div>添加样式 div {position: relative;width: 220px;height: 6…

1947抓住那头牛(队列 广度优先搜索)

目录 题目描述 解析 解题思路 代码部分 代码部分 运行结果 看看len数组中各个位置的标记值 为什么这样做一定是最短路径&#xff1a; 题目描述 农夫知道一头牛的位置&#xff0c;想要抓住它。农夫和牛都位于数轴上&#xff0c;农夫起始位于点N(0<N<100000)&…

Java八股文(Java面试题)

JDK、JRE、JVM 三者之间的关系&#xff1f;JDK&#xff08;Java Development Kit&#xff09;&#xff1a;是Java开发工具包&#xff0c;是整个Java的核心&#xff0c;包括了Java运行环境JRE、Java工具和Java基础类库。它能够创建和编译程序。JRE&#xff08;Java Runtime Envi…

MySQL-字符集和比较规则

在计算机中只能存储二进制数据&#xff0c;那该怎么存储字符串呢&#xff1f;当然是建立字符与二进制数据的映射关系 了&#xff0c;建立这个关系最起码要搞清楚两件事&#xff1a; 界定清楚字符范围&#xff1a;需要把哪些字符映射成二进制数据&#xff1f;编码与解码&#x…

九龙证券|外资强势出手!这只科创板百元股,被疯狂加仓

本周&#xff0c;北上资金净买入29.32亿元&#xff0c;连续第13周加仓A股。分商场看&#xff0c;北上资金加仓重点倾向于沪市的白马蓝筹股&#xff0c;沪股通取得50.34亿元&#xff0c;深股通则被净卖出21.02亿元。 食品饮料本周取得逾23亿元的增持&#xff0c;居职业首位&…

leaflet 读取上传的geojson文件,转换为wkt文件(057)

第057个 点击查看专栏目录 本示例的目的是介绍演示如何在vue+leaflet中上传geojson文件,解析geojson文件并转换为WKT,并在地图上显示图片。 直接复制下面的 vue+openlayers源代码,操作2分钟即可运行实现效果. 文章目录 示例效果配置方式示例源代码(共128行)安装 @terraf…

软件测试/自动化测试/测试开发/性能测试经典好书推荐

目录 前言 测试理论与实践 性能测试 安全测试 自动化测试 编程语言与开发技能 管理技能 前言 软件测试入行容易进阶难。从测试小白成长为测试经理、总监级别人才&#xff0c;要跨越长长的技术栈升级之路&#xff0c;挑战非常高的综合能力要求。 大牛都是相似的&#xf…

五分钟看懂Java字节码:极简手册

字节码新手很容易被厚厚的 JVM 书籍劝退&#xff0c;即使我看过相关书籍&#xff0c;工作真正用到时也全忘了&#xff0c;还得现学。 等我有了一定的字节码阅读经验&#xff0c;才发现字节码其实非常简单&#xff0c;只需要三步就能快速学会&#xff1a; 先了解 JVM 的基本结…

【设计模式之美 设计原则与思想:面向对象】14 | 实战二(下):如何利用面向对象设计和编程开发接口鉴权功能?

在上一节课中&#xff0c;针对接口鉴权功能的开发&#xff0c;我们讲了如何进行面向对象分析&#xff08;OOA&#xff09;&#xff0c;也就是需求分析。实际上&#xff0c;需求定义清楚之后&#xff0c;这个问题就已经解决了一大半&#xff0c;这也是为什么我花了那么多篇幅来讲…

创建Django项目

创建Django项目 步骤 创建Django项目 django-admin startproject name 创建子应用 python manager.py startapp name创建工程 在使用Flask框架时&#xff0c;项目工程目录的组织与创建是需要我们自己手动创建完成的。 在django中&#xff0c;项目工程目录可以借助django提供…

嵌软工程师要掌握的硬件知识2:一文看懂什么开漏和推挽电路(open-drain / push-pull)

想了解开漏和推挽,就要先了解一下三极管和场效应管是什么,在其他章节有详细介绍,本文就不再进行赘述。 1 推挽(push pull)电路 1.1 理解什么是推挽电路 - 详细介绍 如图所示,Q3是个NPN型三极管,Q4是个PNP型三极管。 1)当Vin电压为正时,上面的N型三极管控制端有电…

ccc-Classification-李宏毅(4)

文章目录Classification 概念Example ApplicationHow to do ClassificationWhy not RegesssionProbability from Class - FeatureProbability from ClassHow’s the results?Modifying ModelThree StepsProbability DistributionClassification 概念 本质是找一个函数&#x…

电商导购CPS,淘宝联盟如何跟单实现用户和订单绑定

前言 大家好&#xff0c;我是小悟 做过自媒体的小伙伴都知道&#xff0c;不管是发图文还是发短视频&#xff0c;直播也好&#xff0c;可以带货。在你的内容里面挂上商品&#xff0c;你自己都不需要囤货&#xff0c;如果用户通过这个商品下单成交了&#xff0c;自媒体平台就会…

基于 MySQL 排它锁实现分布式可重入锁解决方案

一、MySQL 排它锁和共享锁 在进行实验前&#xff0c;先来了解下MySQL 的排它锁和共享锁&#xff0c;在 MySQL 中的锁分为表锁和行锁&#xff0c;在行锁中锁又分成了排它锁和共享锁两种类型。 1. 排它锁 排他锁又称为写锁&#xff0c;简称X锁&#xff0c;是一种悲观锁&#x…

【C++】模板初阶STL简介

今天&#xff0c;你内卷了吗&#xff1f; 文章目录一、泛型编程二、函数模板&#xff08;显示实例化和隐式实例化&#xff09;1.函数模板格式2.单参数模板3.多参数模板4.模板参数的匹配原则三、类模板&#xff08;没有推演的时机&#xff0c;统一显示实例化&#xff09;1.类模…

RTOS之二环境搭建初识RTOS

参考&#xff1a;https://blog.csdn.net/kouxi1/article/details/123650688RTOS本质就是切换线程栈&#xff0c;栈换了环境就换了&#xff0c;一个重要的结构tcb&#xff08;linux叫PCB或thread_info&#xff09;&#xff1a;struct tcb{int32_t *sp; // 重要的sp指针&#xff…

seata【SAGA模式】代码实践(细节未必完全符合saga的配置,仅参考)

seata SAGA模式&#xff1a; 代码仍然是上一篇AT模式的代码&#xff1a;AT模式 不需要undo_log表 下面开始&#xff1a; 首先&#xff0c;saga模式依靠状态机的json文件来执行整个流程&#xff0c;其中的开始节点的服务即TM&#xff0c;然后状态机需要依靠三张表&#xff0…