Istio Pilot Source Code Study (Part 1): Pilot-Discovery Startup Flow and ConfigController Configuration Rule Discovery


This article studies the source code of Istio 1.18.0.

1. How Pilot-Discovery Works

Pilot-Discovery is the core of the Istio control plane. It is responsible for traffic management in the service mesh and for delivering configuration from the control plane to the data plane.

Pilot-Discovery aggregates service information from service registries (such as Kubernetes), retrieves configuration rules from the Kubernetes API Server, converts the service and configuration data into the standard data structures of the xDS API, and pushes them to the Envoy proxies in the data plane over gRPC.

2. Pilot-Discovery Code Structure

(Figure: Pilot-Discovery code structure)

The entry point of Pilot-Discovery is the main function in pilot/cmd/pilot-discovery/main.go. The main function creates the Pilot Server, which contains three main pieces of logic:

  • ConfigController: manages all kinds of configuration data, including user-created traffic management rules and policies
  • ServiceController: retrieves service discovery data from the Service Registry
  • DiscoveryService: mainly contains the following logic (a minimal sketch of this change-to-push loop follows this list):
    • Starts the gRPC server and accepts connection requests from Envoy
    • Receives xDS requests from Envoy, fetches configuration and service information from the ConfigController and ServiceController, and generates response messages to send back to Envoy
    • Watches for configuration change events from the ConfigController and service change events from the ServiceController, and pushes the changed configuration and service content to Envoy through the xDS API
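
The change-to-push loop described above can be illustrated with a minimal, self-contained Go sketch. This is not Istio's implementation: PushRequest and discoveryServer below are simplified stand-ins for model.PushRequest and xds.DiscoveryServer, and the real server additionally debounces requests and generates per-proxy xDS resources:

package main

import (
	"fmt"
	"time"
)

// PushRequest is a simplified stand-in for Istio's model.PushRequest:
// it only records whether a full push is needed and why.
type PushRequest struct {
	Full   bool
	Reason string
}

// discoveryServer is a toy xDS server: it owns a push channel and a set
// of "connections" (represented here by proxy names only).
type discoveryServer struct {
	pushCh      chan PushRequest
	connections []string
}

// ConfigUpdate is what the config/service controllers call from their
// event handlers; it simply enqueues a push request.
func (s *discoveryServer) ConfigUpdate(req PushRequest) {
	s.pushCh <- req
}

// run drains the push channel and "pushes" to every connected proxy.
func (s *discoveryServer) run(stop <-chan struct{}) {
	for {
		select {
		case req := <-s.pushCh:
			for _, conn := range s.connections {
				fmt.Printf("pushing to %s (full=%v, reason=%s)\n", conn, req.Full, req.Reason)
			}
		case <-stop:
			return
		}
	}
}

func main() {
	s := &discoveryServer{
		pushCh:      make(chan PushRequest, 10),
		connections: []string{"sidecar-a", "sidecar-b"},
	}
	stop := make(chan struct{})
	go s.run(stop)

	// A config controller would call this when, for example, a VirtualService changes.
	s.ConfigUpdate(PushRequest{Full: true, Reason: "ConfigUpdate"})
	time.Sleep(100 * time.Millisecond)
	close(stop)
}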

3. Pilot-Discovery Startup Flow

The code that creates the Pilot Server is as follows:

// pilot/pkg/bootstrap/server.go
func NewServer(args *PilotArgs, initFuncs ...func(*Server)) (*Server, error) {
	e := model.NewEnvironment()
	e.DomainSuffix = args.RegistryOptions.KubeOptions.DomainSuffix
	e.SetLedger(buildLedger(args.RegistryOptions))

	ac := aggregate.NewController(aggregate.Options{
		MeshHolder: e,
	})
	e.ServiceDiscovery = ac

	s := &Server{
		clusterID:               getClusterID(args),
		environment:             e,
		fileWatcher:             filewatcher.NewWatcher(),
		httpMux:                 http.NewServeMux(),
		monitoringMux:           http.NewServeMux(),
		readinessProbes:         make(map[string]readinessProbe),
		readinessFlags:          &readinessFlags{},
		workloadTrustBundle:     tb.NewTrustBundle(nil),
		server:                  server.New(),
		shutdownDuration:        args.ShutdownDuration,
		internalStop:            make(chan struct{}),
		istiodCertBundleWatcher: keycertbundle.NewWatcher(),
		webhookInfo:             &webhookInfo{},
	}

	// Apply custom initialization functions.
	for _, fn := range initFuncs {
		fn(s)
	}
	// Initialize workload Trust Bundle before XDS Server
	e.TrustBundle = s.workloadTrustBundle
	// Initialize the DiscoveryServer
	s.XDSServer = xds.NewDiscoveryServer(e, args.PodName, s.clusterID, args.RegistryOptions.KubeOptions.ClusterAliases)

	prometheus.EnableHandlingTimeHistogram()

	// make sure we have a readiness probe before serving HTTP to avoid marking ready too soon
	s.initReadinessProbes()

	// Initialize the HTTP and gRPC servers and register the DiscoveryServer with the gRPC server
	s.initServers(args)
	if err := s.initIstiodAdminServer(args, s.webhookInfo.GetTemplates); err != nil {
		return nil, fmt.Errorf("error initializing debug server: %v", err)
	}
	if err := s.serveHTTP(); err != nil {
		return nil, fmt.Errorf("error serving http: %v", err)
	}

	// Apply the arguments to the configuration.
	if err := s.initKubeClient(args); err != nil {
		return nil, fmt.Errorf("error initializing kube client: %v", err)
	}

	// used for both initKubeRegistry and initClusterRegistries
	args.RegistryOptions.KubeOptions.EndpointMode = kubecontroller.DetectEndpointMode(s.kubeClient)

	s.initMeshConfiguration(args, s.fileWatcher)
	spiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain())

	s.initMeshNetworks(args, s.fileWatcher)
	s.initMeshHandlers()
	s.environment.Init()
	if err := s.environment.InitNetworksManager(s.XDSServer); err != nil {
		return nil, err
	}

	// Options based on the current 'defaults' in istio.
	caOpts := &caOptions{
		TrustDomain:      s.environment.Mesh().TrustDomain,
		Namespace:        args.Namespace,
		DiscoveryFilter:  args.RegistryOptions.KubeOptions.GetFilter(),
		ExternalCAType:   ra.CaExternalType(externalCaType),
		CertSignerDomain: features.CertSignerDomain,
	}

	if caOpts.ExternalCAType == ra.ExtCAK8s {
		// Older environment variable preserved for backward compatibility
		caOpts.ExternalCASigner = k8sSigner
	}
	// CA signing certificate must be created first if needed.
	if err := s.maybeCreateCA(caOpts); err != nil {
		return nil, err
	}

	// Initialize the ConfigController and ServiceController
	if err := s.initControllers(args); err != nil {
		return nil, err
	}

	s.XDSServer.InitGenerators(e, args.Namespace, s.internalDebugMux)

	// Initialize workloadTrustBundle after CA has been initialized
	if err := s.initWorkloadTrustBundle(args); err != nil {
		return nil, err
	}

	// Parse and validate Istiod Address.
	istiodHost, _, err := e.GetDiscoveryAddress()
	if err != nil {
		return nil, err
	}

	// Create Istiod certs and setup watches.
	if err := s.initIstiodCerts(args, string(istiodHost)); err != nil {
		return nil, err
	}

	// Secure gRPC Server must be initialized after CA is created as may use a Citadel generated cert.
	if err := s.initSecureDiscoveryService(args); err != nil {
		return nil, fmt.Errorf("error initializing secure gRPC Listener: %v", err)
	}

	// common https server for webhooks (e.g. injection, validation)
	if s.kubeClient != nil {
		s.initSecureWebhookServer(args)
		wh, err := s.initSidecarInjector(args)
		if err != nil {
			return nil, fmt.Errorf("error initializing sidecar injector: %v", err)
		}
		s.webhookInfo.mu.Lock()
		s.webhookInfo.wh = wh
		s.webhookInfo.mu.Unlock()
		if err := s.initConfigValidation(args); err != nil {
			return nil, fmt.Errorf("error initializing config validator: %v", err)
		}
	}

	// This should be called only after controllers are initialized.
	// Register event callbacks with the ConfigController and ServiceController
	s.initRegistryEventHandlers()

	// Set the DiscoveryServer start function
	s.initDiscoveryService()

	// Notice that the order of authenticators matters, since at runtime
	// authenticators are activated sequentially and the first successful attempt
	// is used as the authentication result.
	authenticators := []security.Authenticator{
		&authenticate.ClientCertAuthenticator{},
	}
	if args.JwtRule != "" {
		jwtAuthn, err := initOIDC(args)
		if err != nil {
			return nil, fmt.Errorf("error initializing OIDC: %v", err)
		}
		if jwtAuthn == nil {
			return nil, fmt.Errorf("JWT authenticator is nil")
		}
		authenticators = append(authenticators, jwtAuthn)
	}
	// The k8s JWT authenticator requires the multicluster registry to be initialized,
	// so we build it later.
	if s.kubeClient != nil {
		authenticators = append(authenticators,
			kubeauth.NewKubeJWTAuthenticator(s.environment.Watcher, s.kubeClient.Kube(), s.clusterID, s.multiclusterController.GetRemoteKubeClient, features.JwtPolicy))
	}
	if len(features.TrustedGatewayCIDR) > 0 {
		authenticators = append(authenticators, &authenticate.XfccAuthenticator{})
	}
	if features.XDSAuth {
		s.XDSServer.Authenticators = authenticators
	}
	caOpts.Authenticators = authenticators

	// Start CA or RA server. This should be called after CA and Istiod certs have been created.
	s.startCA(caOpts)

	// TODO: don't run this if galley is started, one ctlz is enough
	if args.CtrlZOptions != nil {
		_, _ = ctrlz.Run(args.CtrlZOptions, nil)
	}

	// This must be last, otherwise we will not know which informers to register
	if s.kubeClient != nil {
		s.addStartFunc(func(stop <-chan struct{}) error {
			s.kubeClient.RunAndWait(stop)
			return nil
		})
	}

	return s, nil
}

The core logic of the NewServer() method is:

  1. Initialize the DiscoveryServer
  2. Initialize the HTTP and gRPC servers, and register the DiscoveryServer with the gRPC server
  3. Initialize the ConfigController and ServiceController
  4. Register event callbacks with the ConfigController and ServiceController, so that the DiscoveryServer is notified whenever configuration or service information changes
  5. Set the DiscoveryServer start function (the start-function pattern is sketched after this list)
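
Step 5 relies on a pattern used throughout NewServer(): components do not start immediately, they register start functions that are executed later when the server's Start() runs. The sketch below is a simplified, self-contained illustration of that pattern, not the actual server.Instance implementation; startFunc mirrors the shape of the callbacks passed to s.addStartFunc in the code above:

package main

import "fmt"

// startFunc mirrors the callbacks passed to s.addStartFunc above:
// they receive a stop channel and may return an error.
type startFunc func(stop <-chan struct{}) error

// server is a toy stand-in for the bootstrap Server / server.Instance pair:
// components queue their start logic, and Start runs it in registration order.
type server struct {
	startFuncs []startFunc
}

func (s *server) addStartFunc(fn startFunc) {
	s.startFuncs = append(s.startFuncs, fn)
}

func (s *server) Start(stop <-chan struct{}) error {
	for _, fn := range s.startFuncs {
		if err := fn(stop); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	s := &server{}
	// Each component (config controller, xDS server, ...) registers its
	// start logic during construction; nothing runs until Start is called.
	s.addStartFunc(func(stop <-chan struct{}) error {
		fmt.Println("starting config controller")
		return nil
	})
	s.addStartFunc(func(stop <-chan struct{}) error {
		fmt.Println("starting xDS server")
		return nil
	})

	stop := make(chan struct{})
	if err := s.Start(stop); err != nil {
		fmt.Println("start failed:", err)
	}
	close(stop)
}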

The Pilot Server is defined as follows:

// pilot/pkg/bootstrap/server.go
type Server struct {
	// The xDS DiscoveryServer
	XDSServer *xds.DiscoveryServer

	clusterID cluster.ID
	// The set of APIs that make up the Pilot environment
	environment *model.Environment

	// Handles the registry of the primary Kubernetes cluster
	kubeClient kubelib.Client

	// Handles the registries of multiple Kubernetes clusters
	multiclusterController *multicluster.Controller

	// Controller that handles configuration rules in a unified way
	configController model.ConfigStoreController
	// Configuration rule caches
	ConfigStores []model.ConfigStoreController
	// Handles service discovery for ServiceEntry resources
	serviceEntryController *serviceentry.Controller

	httpServer  *http.Server // debug, monitoring and readiness Server.
	httpAddr    string
	httpsServer *http.Server // webhooks HTTPS Server.

	grpcServer        *grpc.Server
	grpcAddress       string
	secureGrpcServer  *grpc.Server
	secureGrpcAddress string

	// monitoringMux listens on monitoringAddr(:15014).
	// Currently runs prometheus monitoring and debug (if enabled).
	monitoringMux *http.ServeMux
	// internalDebugMux is a mux for *internal* calls to the debug interface. That is, authentication is disabled.
	internalDebugMux *http.ServeMux

	// httpMux listens on the httpAddr (8080).
	// If a Gateway is used in front and https is off it is also multiplexing
	// the rest of the features if their port is empty.
	// Currently runs readiness and debug (if enabled)
	httpMux *http.ServeMux

	// httpsMux listens on the httpsAddr(15017), handling webhooks
	// If the address os empty, the webhooks will be set on the default httpPort.
	httpsMux *http.ServeMux // webhooks

	// fileWatcher used to watch mesh config, networks and certificates.
	fileWatcher filewatcher.FileWatcher

	// certWatcher watches the certificates for changes and triggers a notification to Istiod.
	cacertsWatcher *fsnotify.Watcher
	dnsNames       []string

	CA *ca.IstioCA
	RA ra.RegistrationAuthority

	// TrustAnchors for workload to workload mTLS
	workloadTrustBundle     *tb.TrustBundle
	certMu                  sync.RWMutex
	istiodCert              *tls.Certificate
	istiodCertBundleWatcher *keycertbundle.Watcher
	// All Pilot components register their startup tasks with this object so they can be started and managed together in Start()
	server server.Instance

	readinessProbes map[string]readinessProbe
	readinessFlags  *readinessFlags

	// duration used for graceful shutdown.
	shutdownDuration time.Duration

	// internalStop is closed when the server is shutdown. This should be avoided as much as possible, in
	// favor of AddStartFunc. This is only required if we *must* start something outside of this process.
	// For example, everything depends on mesh config, so we use it there rather than trying to sequence everything
	// in AddStartFunc
	internalStop chan struct{}

	webhookInfo *webhookInfo

	statusReporter *distribution.Reporter
	statusManager  *status.Manager
	// RWConfigStore is the configstore which allows updates, particularly for status.
	RWConfigStore model.ConfigStoreController
}

The Pilot-Discovery startup flow is shown in the figure below:

(Figure: Pilot-Discovery startup flow)

4. Configuration Rule Discovery: ConfigController

Pilot's configuration rules are the network routing and network security rules, covering resources such as VirtualService, DestinationRule, Gateway, PeerAuthentication, and RequestAuthentication. Three types of ConfigController are currently supported:

  • MCP: a service mesh configuration transport protocol that decouples Pilot from the underlying platform (Kubernetes, the file system, or another registry), so that Pilot does not need to be aware of platform differences and can focus on generating and distributing Envoy xDS configuration
  • Kubernetes: Kubernetes-based Config discovery relies on the List-Watch capability of Kubernetes Informers. In a Kubernetes cluster, Config resources exist as CustomResources. Pilot uses a configuration controller (CRD Controller) to watch the configuration rule resources on the Kubernetes API Server, maintain a cache of all resources, and trigger the event-handling callbacks (a generic client-go sketch of this pattern follows this list)
  • File: a file watcher periodically reads local configuration files, caches the configuration rules in memory, and maintains add, update, and delete events; when the cache changes, the in-memory controller is notified asynchronously to execute the event callbacks
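
For the Kubernetes variant, the underlying List-Watch machinery is the standard client-go shared informer. The sketch below is a generic client-go example for a built-in resource (ConfigMap), not Istio code, but the handler shape is the same one the ConfigController registers for each Istio Config kind:

package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from the local kubeconfig; Istio wraps this in its own
	// kube.Client, which exposes separate Istio / Gateway API / CRD informer factories.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// A shared informer factory lists and watches resources, caches them,
	// and fans events out to registered handlers.
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	informer := factory.Core().V1().ConfigMaps().Informer()

	// Add/Update/Delete callbacks: createCacheHandler (shown later) registers
	// the same shape of handler for every Istio Config kind and pushes a task onto a queue.
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			cm := obj.(*corev1.ConfigMap)
			fmt.Println("added:", cm.Namespace+"/"+cm.Name)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			cm := newObj.(*corev1.ConfigMap)
			fmt.Println("updated:", cm.Namespace+"/"+cm.Name)
		},
		DeleteFunc: func(obj interface{}) {
			fmt.Println("deleted a ConfigMap")
		},
	})

	stop := make(chan struct{})
	factory.Start(stop)
	// Block until the initial List is reflected in the cache (HasSynced).
	factory.WaitForCacheSync(stop)
	// Keep watching until the process is killed.
	select {}
}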

1) Core Interfaces of the ConfigController

The ConfigController implements the ConfigStoreController interface:

// pilot/pkg/model/config.go
type ConfigStoreController interface {
	// Configuration cache interface
	ConfigStore

	// Registers an event handler
	// RegisterEventHandler adds a handler to receive config update events for a
	// configuration type
	RegisterEventHandler(kind config.GroupVersionKind, handler EventHandler)

	// Runs the controller
	// Run until a signal is received.
	// Run *should* block, so callers should typically call `go controller.Run(stop)`
	Run(stop <-chan struct{})

	// Reports whether the configuration cache has synced
	// HasSynced returns true after initial cache synchronization is complete
	HasSynced() bool
}
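
A consumer of this interface typically registers its handlers first, starts the controller in a goroutine (Run blocks), and gates readiness on HasSynced; this mirrors how Pilot registers its configHandler in initRegistryEventHandlers before the controllers are started. The following is a minimal, self-contained sketch of that consumption pattern; the types here are simplified stand-ins for config.GroupVersionKind, config.Config, and model.EventHandler, not the real Istio types:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Simplified stand-ins for the Istio types used by the real interface.
type GroupVersionKind struct{ Group, Version, Kind string }
type Config struct{ Name, Namespace string }
type Event int

type EventHandler func(prev, curr Config, event Event)

// controller implements the ConfigStoreController contract in miniature:
// handlers are registered per kind, Run blocks until stop is closed, and
// HasSynced reports whether the initial cache fill has completed.
type controller struct {
	handlers map[GroupVersionKind][]EventHandler
	synced   atomic.Bool
}

func (c *controller) RegisterEventHandler(kind GroupVersionKind, h EventHandler) {
	c.handlers[kind] = append(c.handlers[kind], h)
}

func (c *controller) Run(stop <-chan struct{}) {
	c.synced.Store(true) // pretend the initial List has completed
	<-stop
}

func (c *controller) HasSynced() bool { return c.synced.Load() }

func main() {
	vs := GroupVersionKind{Group: "networking.istio.io", Version: "v1alpha3", Kind: "VirtualService"}
	c := &controller{handlers: map[GroupVersionKind][]EventHandler{}}

	// Handlers must be registered before Run, just as Pilot registers its
	// configHandler before the controllers are started.
	c.RegisterEventHandler(vs, func(prev, curr Config, event Event) {
		fmt.Println("config changed:", curr.Namespace+"/"+curr.Name)
	})

	stop := make(chan struct{})
	go c.Run(stop) // Run blocks, so it is started on its own goroutine

	for !c.HasSynced() {
		time.Sleep(10 * time.Millisecond)
	}
	fmt.Println("caches synced, safe to start serving xDS")
	close(stop)
}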

ConfigStoreController embeds the ConfigStore interface. ConfigStore is the controller's core resource cache interface and provides create, read, update, and delete operations on Config resources:

// pilot/pkg/model/config.go
type ConfigStore interface {
	// Schemas exposes the configuration type schema known by the config store.
	// The type schema defines the bidirectional mapping between configuration
	// types and the protobuf encoding schema.
	Schemas() collection.Schemas

	// Get retrieves a configuration element by a type and a key
	Get(typ config.GroupVersionKind, name, namespace string) *config.Config

	// List returns objects by type and namespace.
	// Use "" for the namespace to list across namespaces.
	List(typ config.GroupVersionKind, namespace string) []config.Config

	// Create adds a new configuration object to the store. If an object with the
	// same name and namespace for the type already exists, the operation fails
	// with no side effects.
	Create(config config.Config) (revision string, err error)

	// Update modifies an existing configuration object in the store.  Update
	// requires that the object has been created.  Resource version prevents
	// overriding a value that has been changed between prior _Get_ and _Put_
	// operation to achieve optimistic concurrency. This method returns a new
	// revision if the operation succeeds.
	Update(config config.Config) (newRevision string, err error)
	UpdateStatus(config config.Config) (newRevision string, err error)

	// Patch applies only the modifications made in the PatchFunc rather than doing a full replace. Useful to avoid
	// read-modify-write conflicts when there are many concurrent-writers to the same resource.
	Patch(orig config.Config, patchFn config.PatchFunc) (string, error)

	// Delete removes an object from the store by key
	// For k8s, resourceVersion must be fulfilled before a deletion is carried out.
	// If not possible, a 409 Conflict status will be returned.
	Delete(typ config.GroupVersionKind, name, namespace string, resourceVersion *string) error
}

2) Initialization of the ConfigController

The Kubernetes ConfigController is essentially a CRD Operator: it watches all of the Istio API resources on the Kubernetes API Server. Its initialization proceeds as follows.

The crdclient.New() method:

// pilot/pkg/config/kube/crdclient/client.go
func New(client kube.Client, opts Option) (*Client, error) {
	schemas := collections.Pilot
	if features.EnableGatewayAPI {
		schemas = collections.PilotGatewayAPI()
	}
	return NewForSchemas(client, opts, schemas)
}

collections.Pilot defines all of Istio's Config resource types:

// pkg/config/schema/collections/collections.gen.go
	// Pilot contains only collections used by Pilot.
	Pilot = collection.NewSchemasBuilder().
		MustAdd(AuthorizationPolicy).
		MustAdd(DestinationRule).
		MustAdd(EnvoyFilter).
		MustAdd(Gateway).
		MustAdd(PeerAuthentication).
		MustAdd(ProxyConfig).
		MustAdd(RequestAuthentication).
		MustAdd(ServiceEntry).
		MustAdd(Sidecar).
		MustAdd(Telemetry).
		MustAdd(VirtualService).
		MustAdd(WasmPlugin).
		MustAdd(WorkloadEntry).
		MustAdd(WorkloadGroup).
		Build()

crdclient.New() calls the NewForSchemas() method:

// pilot/pkg/config/kube/crdclient/client.go
func NewForSchemas(client kube.Client, opts Option, schemas collection.Schemas) (*Client, error) {
	schemasByCRDName := map[string]resource.Schema{}
	for _, s := range schemas.All() {
		// From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>."
		name := fmt.Sprintf("%s.%s", s.Plural(), s.Group())
		schemasByCRDName[name] = s
	}
	// Instantiate the CRD client
	out := &Client{
		domainSuffix:     opts.DomainSuffix,
		schemas:          schemas,
		schemasByCRDName: schemasByCRDName,
		revision:         opts.Revision,
		queue:            queue.NewQueue(1 * time.Second),
		kinds:            map[config.GroupVersionKind]*cacheHandler{},
		handlers:         map[config.GroupVersionKind][]model.EventHandler{},
		client:           client,
		// Create the crdWatcher, which watches for CRD creation
		crdWatcher:       crdwatcher.NewController(client),
		logger:           scope.WithLabels("controller", opts.Identifier),
		namespacesFilter: opts.NamespacesFilter,
		crdWatches: map[config.GroupVersionKind]*waiter{
			gvk.KubernetesGateway: newWaiter(),
			gvk.GatewayClass:      newWaiter(),
		},
	}

	// Register a callback: handleCRDAdd is called whenever a CRD is created
	out.crdWatcher.AddCallBack(func(name string) {
		handleCRDAdd(out, name)
	})
	// Fetch all CRDs currently present in the cluster
	known, err := knownCRDs(client.Ext())
	if err != nil {
		return nil, err
	}
	// Iterate over all of Istio's Config resource types
	for _, s := range schemas.All() {
		// From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>."
		name := fmt.Sprintf("%s.%s", s.Plural(), s.Group())
		if s.IsBuiltin() {
			handleCRDAdd(out, name)
		} else {
			// The CRD for this Istio Config resource type already exists, so call handleCRDAdd
			if _, f := known[name]; f {
				handleCRDAdd(out, name)
			} else {
				out.logger.Warnf("Skipping CRD %v as it is not present", s.GroupVersionKind())
			}
		}
	}
	return out, nil
}

NewForSchemas() instantiates the CRD Client, which is defined as follows:

// pilot/pkg/config/kube/crdclient/client.go
type Client struct {
	// schemas defines the set of schemas used by this client.
	// Note: this must be a subset of the schemas defined in the codegen
	schemas collection.Schemas

	// domainSuffix for the config metadata
	domainSuffix string

	// revision for this control plane instance. We will only read configs that match this revision.
	revision string

	// kinds keeps track of all cache handlers for known types
	// i.e. the informer-backed controller for each resource type
	kinds   map[config.GroupVersionKind]*cacheHandler
	kindsMu sync.RWMutex
	// Event-processing task queue
	queue queue.Instance

	// handlers defines a list of event handlers per-type
	// Resource types and their corresponding event-handling callbacks
	handlers map[config.GroupVersionKind][]model.EventHandler

	// Schemas indexed by CRD name
	schemasByCRDName map[string]resource.Schema
	// Kubernetes client; includes the Istio client for operating on Istio API objects and the Istio informers for watching their change events
	client kube.Client
	// Watches for CRD creation
	crdWatcher *crdwatcher.Controller
	logger     *log.Scope

	// namespacesFilter is only used to initiate filtered informer.
	namespacesFilter func(obj interface{}) bool

	// crdWatches notifies consumers when a CRD is present
	crdWatches map[config.GroupVersionKind]*waiter
	stop       <-chan struct{}
}

3) How the ConfigController Works

The Kubernetes ConfigController creates an Informer for each Config resource type, which watches that resource and has an EventHandler registered on it.

In NewForSchemas(), handleCRDAdd() is called either when the CRD for an Istio Config resource type already exists, or later when the crdWatcher observes the CRD being created:

// pilot/pkg/config/kube/crdclient/client.go
func handleCRDAdd(cl *Client, name string) {
	cl.logger.Debugf("adding CRD %q", name)
	s, f := cl.schemasByCRDName[name]
	if !f {
		cl.logger.Debugf("added resource that we are not watching: %v", name)
		return
	}
	resourceGVK := s.GroupVersionKind()
	gvr := s.GroupVersionResource()

	cl.kindsMu.Lock()
	defer cl.kindsMu.Unlock()
	if _, f := cl.kinds[resourceGVK]; f {
		cl.logger.Debugf("added resource that already exists: %v", resourceGVK)
		return
	}
	var i informers.GenericInformer
	var ifactory starter
	var err error
	// Pick a SharedInformerFactory based on the API group
	switch s.Group() {
	case gvk.KubernetesGateway.Group:
		ifactory = cl.client.GatewayAPIInformer()
		i, err = cl.client.GatewayAPIInformer().ForResource(gvr)
	case gvk.Pod.Group, gvk.Deployment.Group, gvk.MutatingWebhookConfiguration.Group:
		ifactory = cl.client.KubeInformer()
		i, err = cl.client.KubeInformer().ForResource(gvr)
	case gvk.CustomResourceDefinition.Group:
		ifactory = cl.client.ExtInformer()
		i, err = cl.client.ExtInformer().ForResource(gvr)
	default:
		ifactory = cl.client.IstioInformer()
		i, err = cl.client.IstioInformer().ForResource(gvr)
	}
	if err != nil {
		// Shouldn't happen
		cl.logger.Errorf("failed to create informer for %v: %v", resourceGVK, err)
		return
	}
	_ = i.Informer().SetTransform(kube.StripUnusedFields)

	// Call createCacheHandler to register event callbacks on the informer
	cl.kinds[resourceGVK] = createCacheHandler(cl, s, i)
	if w, f := cl.crdWatches[resourceGVK]; f {
		cl.logger.Infof("notifying watchers %v was created", resourceGVK)
		w.once.Do(func() {
			close(w.stop)
		})
	}
	// Start informer. In startup case, we will not start here as
	// we will start all factories once we are ready to initialize.
	// For dynamically added CRDs, we need to start immediately though
	if cl.stop != nil {
		// Start the informer
		ifactory.Start(cl.stop)
	}
}

The event callbacks for each Informer are registered through the createCacheHandler() method:

// pilot/pkg/config/kube/crdclient/cache_handler.go
func createCacheHandler(cl *Client, schema resource.Schema, i informers.GenericInformer) *cacheHandler {
	scope.Debugf("registered CRD %v", schema.GroupVersionKind())
	h := &cacheHandler{
		client: cl,
		schema: schema,
		// Create the informer, with optional namespace-level filtering
		informer: kclient.NewUntyped(cl.client, i.Informer(), kclient.Filter{ObjectFilter: cl.namespacesFilter}),
	}

	kind := schema.Kind()
	// Register the event callbacks
	h.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj any) {
			incrementEvent(kind, "add")
			// Create a task and push it onto the task queue
			cl.queue.Push(func() error {
				return h.onEvent(nil, obj, model.EventAdd)
			})
		},
		UpdateFunc: func(old, cur any) {
			incrementEvent(kind, "update")
			cl.queue.Push(func() error {
				return h.onEvent(old, cur, model.EventUpdate)
			})
		},
		DeleteFunc: func(obj any) {
			incrementEvent(kind, "delete")
			cl.queue.Push(func() error {
				return h.onEvent(nil, obj, model.EventDelete)
			})
		},
	})
	return h
}

When a Config resource is created, updated, or deleted in Kubernetes, the EventHandler wraps the event in a task and pushes it onto the task queue, where it is then handled by the task-processing goroutine. The onEvent() method that handles resource changes is as follows:

// pilot/pkg/config/kube/crdclient/cache_handler.go
func (h *cacheHandler) onEvent(old any, curr any, event model.Event) error {
	currItem := controllers.ExtractObject(curr)
	if currItem == nil {
		return nil
	}

	// Convert the Kubernetes object into an Istio config.Config
	currConfig := TranslateObject(currItem, h.schema.GroupVersionKind(), h.client.domainSuffix)

	var oldConfig config.Config
	if old != nil {
		oldItem, ok := old.(runtime.Object)
		if !ok {
			log.Warnf("Old Object can not be converted to runtime Object %v, is type %T", old, old)
			return nil
		}
		oldConfig = TranslateObject(oldItem, h.schema.GroupVersionKind(), h.client.domainSuffix)
	}

	if h.client.objectInRevision(&currConfig) {
		// Invoke the event-handling callbacks
		h.callHandlers(oldConfig, currConfig, event)
		return nil
	}

	// Check if the object was in our revision, but has been moved to a different revision. If so,
	// it has been effectively deleted from our revision, so process it as a delete event.
	if event == model.EventUpdate && old != nil && h.client.objectInRevision(&oldConfig) {
		log.Debugf("Object %s/%s has been moved to a different revision, deleting",
			currConfig.Namespace, currConfig.Name)
		// Invoke the event-handling callbacks (as a delete)
		h.callHandlers(oldConfig, currConfig, model.EventDelete)
		return nil
	}

	log.Debugf("Skipping event %s for object %s/%s from different revision",
		event, currConfig.Namespace, currConfig.Name)
	return nil
}

func (h *cacheHandler) callHandlers(old config.Config, curr config.Config, event model.Event) {
	// TODO we may consider passing a pointer to handlers instead of the value. While spec is a pointer, the meta will be copied
	// Invoke the event-handling callbacks registered for this resource type
	for _, f := range h.client.handlers[h.schema.GroupVersionKind()] {
		f(old, curr, event)
	}
}

onEvent() converts the object with TranslateObject() and then invokes the event-handling callbacks registered for that resource type.

h.client.handlers is the collection of handlers keyed by resource type; they are registered through the ConfigController's RegisterEventHandler(), as shown below:

// pilot/pkg/bootstrap/server.go
func (s *Server) initRegistryEventHandlers() {
	...
	if s.configController != nil {
		configHandler := func(prev config.Config, curr config.Config, event model.Event) {
			defer func() {
				// Status reporting
				if event != model.EventDelete {
					s.statusReporter.AddInProgressResource(curr)
				} else {
					s.statusReporter.DeleteInProgressResource(curr)
				}
			}()
			log.Debugf("Handle event %s for configuration %s", event, curr.Key())
			// For update events, trigger push only if spec has changed.
			// For update events, an xDS push is triggered only when the object's spec changes
			if event == model.EventUpdate && !needsPush(prev, curr) {
				log.Debugf("skipping push for %s as spec has not changed", prev.Key())
				return
			}
			// Trigger a full xDS push
			pushReq := &model.PushRequest{
				Full:           true,
				ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.MustFromGVK(curr.GroupVersionKind), Name: curr.Name, Namespace: curr.Namespace}),
				Reason:         []model.TriggerReason{model.ConfigUpdate},
			}
			s.XDSServer.ConfigUpdate(pushReq)
		}
		schemas := collections.Pilot.All()
		if features.EnableGatewayAPI {
			schemas = collections.PilotGatewayAPI().All()
		}
		for _, schema := range schemas {
			// This resource type was handled in external/servicediscovery.go, no need to rehandle here.
			// The following three types are handled by the ServiceEntry controller, so no event handler is registered for them here
			if schema.GroupVersionKind() == gvk.ServiceEntry {
				continue
			}
			if schema.GroupVersionKind() == gvk.WorkloadEntry {
				continue
			}
			if schema.GroupVersionKind() == gvk.WorkloadGroup {
				continue
			}

			// Register the event handler for all other API objects
			s.configController.RegisterEventHandler(schema.GroupVersionKind(), configHandler)
		}
		...
}

The complete Config event-handling flow is as follows:

  1. The EventHandler constructs a task, which is essentially a wrapper around the onEvent function
  2. The EventHandler pushes the task onto the task queue
  3. The task-processing goroutine reads the task queue in a blocking loop, executes each task, handles the event through onEvent, and triggers an xDS update through the registered ConfigHandler, as the sketch after this list illustrates
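
The queue between steps 2 and 3 is a simple work queue of closures. Istio's actual queue is the queue.Instance created with queue.NewQueue in NewForSchemas above, which additionally retries tasks that return an error; as an illustration of the pattern only, a minimal channel-backed version looks like this:

package main

import "fmt"

// task mirrors what cl.queue.Push receives in the code above: a closure
// that returns an error.
type task func() error

// workQueue is a toy, channel-backed version of the task queue: Push
// enqueues a task and Run drains tasks on a single worker goroutine.
type workQueue struct {
	tasks chan task
}

func newWorkQueue() *workQueue { return &workQueue{tasks: make(chan task, 100)} }

func (q *workQueue) Push(t task) { q.tasks <- t }

func (q *workQueue) Run(stop <-chan struct{}) {
	for {
		select {
		case t := <-q.tasks:
			if err := t(); err != nil {
				fmt.Println("task failed:", err)
			}
		case <-stop:
			return
		}
	}
}

func main() {
	q := newWorkQueue()
	stop := make(chan struct{})
	go q.Run(stop)

	// An informer event handler pushes a closure like this; the worker runs
	// it, and in Pilot the closure ends up calling onEvent and triggering an
	// xDS push through the registered config handler.
	done := make(chan struct{})
	q.Push(func() error {
		fmt.Println("onEvent: VirtualService default/reviews updated -> full xDS push")
		close(done)
		return nil
	})
	<-done
	close(stop)
}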
