Istio Pilot源码学习(三):xDS的异步分发

news2024/11/28 4:35:18

本文基于Istio 1.18.0版本进行源码学习

5、xDS的异步分发

DiscoveryService主要包含下述逻辑:

  • 启动GRPC Server并接收来自Envoy端的连接请求
  • 接收Envoy端的xDS请求,从ConfigController和ServiceController中获取配置和服务信息,生成响应消息发送给Envoy
  • 监听来自ConfigController的配置变化消息和ServiceController的服务变化消息,并将配置和服务变化内容通过xDS接口推送到Envoy

1)、DiscoveryService初始化

DiscoveryService初始化流程如下图:

1)xds.NewDiscoveryServer

DiscoveryServer是通过调用xds.NewDiscoveryServer()方法初始化的,返回的是一个DiscoveryServer实例。DiscoveryServer定义如下:

// pilot/pkg/xds/discovery.go
type DiscoveryServer struct {
	// Env is the model environment.
	Env *model.Environment

	// ConfigGenerator is responsible for generating data plane configuration using Istio networking
	// APIs and service registry info
	// xds配置的生成器
	ConfigGenerator core.ConfigGenerator

	// Generators allow customizing the generated config, based on the client metadata.
	// Key is the generator type - will match the Generator metadata to set the per-connection
	// default generator, or the combination of Generator metadata and TypeUrl to select a
	// different generator for a type.
	// Normal istio clients use the default generator - will not be impacted by this.
	// 针对不同配置类型的生成器
	Generators map[string]model.XdsResourceGenerator

	// ProxyNeedsPush is a function that determines whether a push can be completely skipped. Individual generators
	// may also choose to not send any updates.
	// 判断本次推送是否涉及该envoy
	ProxyNeedsPush func(proxy *model.Proxy, req *model.PushRequest) bool

	// concurrentPushLimit is a semaphore that limits the amount of concurrent XDS pushes.
	// 控制推送并发数
	concurrentPushLimit chan struct{}
	// RequestRateLimit limits the number of new XDS requests allowed. This helps prevent thundering hurd of incoming requests.
	RequestRateLimit *rate.Limiter

	// InboundUpdates describes the number of configuration updates the discovery server has received
	InboundUpdates *atomic.Int64
	// CommittedUpdates describes the number of configuration updates the discovery server has
	// received, process, and stored in the push context. If this number is less than InboundUpdates,
	// there are updates we have not yet processed.
	// Note: This does not mean that all proxies have received these configurations; it is strictly
	// the push context, which means that the next push to a proxy will receive this configuration.
	CommittedUpdates *atomic.Int64

	// pushChannel is the buffer used for debouncing.
	// after debouncing the pushRequest will be sent to pushQueue
	// 统一接收其他组件发来的pushRequest的channel
	pushChannel chan *model.PushRequest

	// mutex used for protecting Environment.PushContext
	updateMutex sync.RWMutex

	// pushQueue is the buffer that used after debounce and before the real xds push.
	// 主要是在真正xds推送前做防抖缓存
	pushQueue *PushQueue

	// debugHandlers is the list of all the supported debug handlers.
	debugHandlers map[string]string

	// adsClients reflect active gRPC channels, for both ADS and EDS.
	// ads和eds的grpc连接
	adsClients      map[string]*Connection
	adsClientsMutex sync.RWMutex

	StatusReporter DistributionStatusCache

	// Authenticators for XDS requests. Should be same/subset of the CA authenticators.
	Authenticators []security.Authenticator

	// StatusGen is notified of connect/disconnect/nack on all connections
	StatusGen               *StatusGen
	WorkloadEntryController *autoregistration.Controller

	// serverReady indicates caches have been synced up and server is ready to process requests.
	serverReady atomic.Bool

	debounceOptions debounceOptions

	instanceID string

	clusterID cluster.ID

	// Cache for XDS resources
	// xds资源缓存
	Cache model.XdsCache

	// JwtKeyResolver holds a reference to the JWT key resolver instance.
	JwtKeyResolver *model.JwksResolver

	// ListRemoteClusters collects debug information about other clusters this istiod reads from.
	ListRemoteClusters func() []cluster.DebugInfo

	// ClusterAliases are aliase names for cluster. When a proxy connects with a cluster ID
	// and if it has a different alias we should use that a cluster ID for proxy.
	ClusterAliases map[cluster.ID]cluster.ID
}

2)initGrpcServer

在初始化GRPC Server时,调用了XDSServer.Register()方法,向GRPC Server注册服务:

// pilot/pkg/xds/discovery.go
func (s *DiscoveryServer) Register(rpcs *grpc.Server) {
	// Register v3 server
	discovery.RegisterAggregatedDiscoveryServiceServer(rpcs, s)
}

DiscoveryServer实现了AggregatedDiscoveryServiceServer接口:

// envoy/service/discovery/v3/ads.pb.go
type AggregatedDiscoveryServiceServer interface {
	// This is a gRPC-only API.
	// 全量ads stream接口
	StreamAggregatedResources(AggregatedDiscoveryService_StreamAggregatedResourcesServer) error
	// 增量ads stream接口
	DeltaAggregatedResources(AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error
}

StreamAggregatedResources接收DiscoveryRequest,返回DiscoveryResponse流,包含全量的xDS数据

调用流程如下图:

3)initDiscoveryService

initDiscoveryService()方法中将DiscoveryServer启动函数添加到Server的startFuncs队列中,会在初始化完毕之后调用:

// pilot/pkg/bootstrap/server.go
func (s *Server) initDiscoveryService() {
	log.Infof("starting discovery service")
	// Implement EnvoyXdsServer grace shutdown
	s.addStartFunc(func(stop <-chan struct{}) error {
		log.Infof("Starting ADS server")
		s.XDSServer.Start(stop)
		return nil
	})
}

2)、DiscoveryService启动

DiscoveryServer的Start()方法主要分别启动了四个协程:

// pilot/pkg/xds/discovery.go
func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
	go s.WorkloadEntryController.Run(stopCh)
	go s.handleUpdates(stopCh)
	go s.periodicRefreshMetrics(stopCh)
	go s.sendPushes(stopCh)
	go s.Cache.Run(stopCh)
}

比较重要的是handleUpdates和sendPushes

Config、Service、Endpoint对资源的处理最后都会调用ConfigUpdate()方法向DiscoveryServer的pushChannel队列发送PushRequest实现的,处理流程如下:

DiscoveryServer首先通过handleUpdates协程阻塞式地接收并处理更新请求,并将PushRequest发送到DiscoveryServer的pushQueue中,然后由sendPushes协程并发地将PushRequest发送给每一条连接的pushChannel,最后由DiscoveryServer的流处理接口处理分发请求

3)、handleUpdates

// pilot/pkg/xds/discovery.go
func (s *DiscoveryServer) handleUpdates(stopCh <-chan struct{}) {
	debounce(s.pushChannel, stopCh, s.debounceOptions, s.Push, s.CommittedUpdates)
}

func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, opts debounceOptions, pushFn func(req *model.PushRequest), updateSent *atomic.Int64) {
	var timeChan <-chan time.Time
	var startDebounce time.Time
	var lastConfigUpdateTime time.Time

	pushCounter := 0
	debouncedEvents := 0

	// Keeps track of the push requests. If updates are debounce they will be merged.
	var req *model.PushRequest

	free := true
	freeCh := make(chan struct{}, 1)

	push := func(req *model.PushRequest, debouncedEvents int, startDebounce time.Time) {
		pushFn(req)
		updateSent.Add(int64(debouncedEvents))
		debounceTime.Record(time.Since(startDebounce).Seconds())
		freeCh <- struct{}{}
	}

	pushWorker := func() {
		eventDelay := time.Since(startDebounce)
		quietTime := time.Since(lastConfigUpdateTime)
		// it has been too long or quiet enough
		// 当以下两个条件满足任意一个时,进行更新事件处理
		// 距离本轮第1次更新时间超过最大延迟时间(debounceMax默认10s)
		// 距离上次更新时间超过最大静默时间(debounceAfter默认100ms)
		if eventDelay >= opts.debounceMax || quietTime >= opts.debounceAfter {
			if req != nil {
				pushCounter++
				if req.ConfigsUpdated == nil {
					log.Infof("Push debounce stable[%d] %d for reason %s: %v since last change, %v since last push, full=%v",
						pushCounter, debouncedEvents, reasonsUpdated(req),
						quietTime, eventDelay, req.Full)
				} else {
					log.Infof("Push debounce stable[%d] %d for config %s: %v since last change, %v since last push, full=%v",
						pushCounter, debouncedEvents, configsUpdated(req),
						quietTime, eventDelay, req.Full)
				}
				free = false
				go push(req, debouncedEvents, startDebounce)
				req = nil
				debouncedEvents = 0
			}
		} else {
			timeChan = time.After(opts.debounceAfter - quietTime)
		}
	}

	for {
		select {
		case <-freeCh:
			free = true
			pushWorker()
		case r := <-ch:
			// If reason is not set, record it as an unknown reason
			if len(r.Reason) == 0 {
				r.Reason = []model.TriggerReason{model.UnknownTrigger}
			}
			if !opts.enableEDSDebounce && !r.Full {
				// 立即触发eds推送
				// trigger push now, just for EDS
				go func(req *model.PushRequest) {
					pushFn(req)
					updateSent.Inc()
				}(r)
				continue
			}

			lastConfigUpdateTime = time.Now()
			if debouncedEvents == 0 {
				// 启动新一轮的配置下发定时器,定时长度为最小静默时间
				timeChan = time.After(opts.debounceAfter)
				// 记录第1次事件更新的时间
				startDebounce = lastConfigUpdateTime
			}
			debouncedEvents++

			// 合并pushRequest
			req = req.Merge(r)
		case <-timeChan:
			if free {
				pushWorker()
			}
		case <-stopCh:
			return
		}
	}
}

debounce()方法中通过最小静默时间(debounceAfter)合并更新时间,通过最大延迟时间(debounceMax)控制xDS配置下发的时延。由于DiscoveryServer.Push需要初始化PushContext,会消耗大量内存,所以为了避免OOM,debounce()方法中控制DiscoveryServer.Push串行执行

DiscoveryServer.Push方法会一直往下调用,直到把数据推入到DiscoveryServer的pushQueue管道中,代码调用逻辑如下:

在这里插入图片描述

3)、PushContext的初始化

PushContext是xDS生成中最重要的结构对象,几乎包含所有网格资源信息,PushContext结构体定义如下:

// pilot/pkg/model/push_context.go
type PushContext struct {
	proxyStatusMutex sync.RWMutex
	// ProxyStatus is keyed by the error code, and holds a map keyed
	// by the ID.
	ProxyStatus map[string]map[string]ProxyPushStatus

	// Synthesized from env.Mesh
	exportToDefaults exportToDefaults

	// ServiceIndex is the index of services by various fields.
	// service缓存
	ServiceIndex serviceIndex

	// serviceAccounts contains a map of hostname and port to service accounts.
	serviceAccounts map[serviceAccountKey][]string

	// virtualServiceIndex is the index of virtual services by various fields.
	// virtualService缓存
	virtualServiceIndex virtualServiceIndex

	// destinationRuleIndex is the index of destination rules by various fields.
	// destinationRule缓存
	destinationRuleIndex destinationRuleIndex

	// gatewayIndex is the index of gateways.
	gatewayIndex gatewayIndex

	// clusterLocalHosts extracted from the MeshConfig
	clusterLocalHosts ClusterLocalHosts

	// sidecarIndex stores sidecar resources
	// sidecar缓存
	sidecarIndex sidecarIndex

	// envoy filters for each namespace including global config namespace
	// envoyFilter缓存
	envoyFiltersByNamespace map[string][]*EnvoyFilterWrapper

	// wasm plugins for each namespace including global config namespace
	wasmPluginsByNamespace map[string][]*WasmPluginWrapper

	// AuthnPolicies contains Authn policies by namespace.
	AuthnPolicies *AuthenticationPolicies `json:"-"`

	// AuthzPolicies stores the existing authorization policies in the cluster. Could be nil if there
	// are no authorization policies in the cluster.
	AuthzPolicies *AuthorizationPolicies `json:"-"`

	// Telemetry stores the existing Telemetry resources for the cluster.
	Telemetry *Telemetries `json:"-"`

	// ProxyConfig stores the existing ProxyConfig resources for the cluster.
	ProxyConfigs *ProxyConfigs `json:"-"`

	// The following data is either a global index or used in the inbound path.
	// Namespace specific views do not apply here.

	// Mesh configuration for the mesh.
	Mesh *meshconfig.MeshConfig `json:"-"`

	// PushVersion describes the push version this push context was computed for
	PushVersion string

	// LedgerVersion is the version of the configuration ledger
	LedgerVersion string

	// JwtKeyResolver holds a reference to the JWT key resolver instance.
	JwtKeyResolver *JwksResolver

	// GatewayAPIController holds a reference to the gateway API controller.
	GatewayAPIController GatewayController

	// cache gateways addresses for each network
	// this is mainly used for kubernetes multi-cluster scenario
	networkMgr *NetworkManager

	Networks *meshconfig.MeshNetworks

	InitDone        atomic.Bool
	initializeMutex sync.Mutex
	ambientIndex    AmbientIndexes
}

PushContext对象的缓存为后续xDS配置的生成提供了快捷的资源索引

5)、sendPushes

// pilot/pkg/xds/discovery.go
func (s *DiscoveryServer) sendPushes(stopCh <-chan struct{}) {
	doSendPushes(stopCh, s.concurrentPushLimit, s.pushQueue)
}

func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {
	for {
		select {
		case <-stopCh:
			return
		default:
			// We can send to it until it is full, then it will block until a pushes finishes and reads from it.
			// This limits the number of pushes that can happen concurrently
			// semaphore默认容量100,用来控制推送并发数
			semaphore <- struct{}{}

			// Get the next proxy to push. This will block if there are no updates required.
			// 从pushQueue出队一个xdsConnection
			client, push, shuttingdown := queue.Dequeue()
			if shuttingdown {
				return
			}
			recordPushTriggers(push.Reason...)
			// Signals that a push is done by reading from the semaphore, allowing another send on it.
			doneFunc := func() {
				queue.MarkDone(client)
				<-semaphore
			}

			proxiesQueueTime.Record(time.Since(push.Start).Seconds())
			var closed <-chan struct{}
			if client.stream != nil {
				closed = client.stream.Context().Done()
			} else {
				closed = client.deltaStream.Context().Done()
			}
			go func() {
				// 初始化pushEvent
				pushEv := &Event{
					pushRequest: push,
					done:        doneFunc,
				}

				select {
				// pushEvent添加到xdsConnection的pushChannel中
				case client.pushChannel <- pushEv:
					return
				case <-closed: // grpc stream was closed
					doneFunc()
					log.Infof("Client closed connection %v", client.conID)
				}
			}()
		}
	}
}

doSendPushes()方法内启动了一个无限循环,在default代码块中实现了主要的功能逻辑:

  1. push事件面向所有xDS客户端,使用semaphore来控制推送并发数,当semaphore满了之后会阻塞
  2. 如果semaphore允许,为每个客户端都启动一个发送协程,尝试发送pushEvent到客户端队列pushChannel中

向pushChannel发送的pushEvent会在StreamAggregatedResources()方法中被处理:

// pilot/pkg/xds/ads.go
func (s *DiscoveryServer) StreamAggregatedResources(stream DiscoveryStream) error {
	return s.Stream(stream)
}

func (s *DiscoveryServer) Stream(stream DiscoveryStream) error {
	if knativeEnv != "" && firstRequest.Load() {
		// How scaling works in knative is the first request is the "loading" request. During
		// loading request, concurrency=1. Once that request is done, concurrency is enabled.
		// However, the XDS stream is long lived, so the first request would block all others. As a
		// result, we should exit the first request immediately; clients will retry.
		firstRequest.Store(false)
		return status.Error(codes.Unavailable, "server warmup not complete; try again")
	}
	// Check if server is ready to accept clients and process new requests.
	// Currently ready means caches have been synced and hence can build
	// clusters correctly. Without this check, InitContext() call below would
	// initialize with empty config, leading to reconnected Envoys loosing
	// configuration. This is an additional safety check inaddition to adding
	// cachesSynced logic to readiness probe to handle cases where kube-proxy
	// ip tables update latencies.
	// See https://github.com/istio/istio/issues/25495.
	if !s.IsServerReady() {
		return status.Error(codes.Unavailable, "server is not ready to serve discovery information")
	}

	ctx := stream.Context()
	peerAddr := "0.0.0.0"
	if peerInfo, ok := peer.FromContext(ctx); ok {
		peerAddr = peerInfo.Addr.String()
	}

	if err := s.WaitForRequestLimit(stream.Context()); err != nil {
		log.Warnf("ADS: %q exceeded rate limit: %v", peerAddr, err)
		return status.Errorf(codes.ResourceExhausted, "request rate limit exceeded: %v", err)
	}

	ids, err := s.authenticate(ctx)
	if err != nil {
		return status.Error(codes.Unauthenticated, err.Error())
	}
	if ids != nil {
		log.Debugf("Authenticated XDS: %v with identity %v", peerAddr, ids)
	} else {
		log.Debugf("Unauthenticated XDS: %s", peerAddr)
	}

	// InitContext returns immediately if the context was already initialized.
	if err = s.globalPushContext().InitContext(s.Env, nil, nil); err != nil {
		// Error accessing the data - log and close, maybe a different pilot replica
		// has more luck
		log.Warnf("Error reading config %v", err)
		return status.Error(codes.Unavailable, "error reading config")
	}
	con := newConnection(peerAddr, stream)

	// Do not call: defer close(con.pushChannel). The push channel will be garbage collected
	// when the connection is no longer used. Closing the channel can cause subtle race conditions
	// with push. According to the spec: "It's only necessary to close a channel when it is important
	// to tell the receiving goroutines that all data have been sent."

	// Block until either a request is received or a push is triggered.
	// We need 2 go routines because 'read' blocks in Recv().
	go s.receive(con, ids)

	// Wait for the proxy to be fully initialized before we start serving traffic. Because
	// initialization doesn't have dependencies that will block, there is no need to add any timeout
	// here. Prior to this explicit wait, we were implicitly waiting by receive() not sending to
	// reqChannel and the connection not being enqueued for pushes to pushChannel until the
	// initialization is complete.
	<-con.initialized

	for {
		// Go select{} statements are not ordered; the same channel can be chosen many times.
		// For requests, these are higher priority (client may be blocked on startup until these are done)
		// and often very cheap to handle (simple ACK), so we check it first.
		select {
		case req, ok := <-con.reqChan:
			if ok {
				if err := s.processRequest(req, con); err != nil {
					return err
				}
			} else {
				// Remote side closed connection or error processing the request.
				return <-con.errorChan
			}
		case <-con.stop:
			return nil
		default:
		}
		// If there wasn't already a request, poll for requests and pushes. Note: if we have a huge
		// amount of incoming requests, we may still send some pushes, as we do not `continue` above;
		// however, requests will be handled ~2x as much as pushes. This ensures a wave of requests
		// cannot completely starve pushes. However, this scenario is unlikely.
		select {
		case req, ok := <-con.reqChan:
			if ok {
				if err := s.processRequest(req, con); err != nil {
					return err
				}
			} else {
				// Remote side closed connection or error processing the request.
				return <-con.errorChan
			}
		case pushEv := <-con.pushChannel:
			// 调用pushConnection推送xds配置
			err := s.pushConnection(con, pushEv)
			// 通知semaphore,本次推送完成
			pushEv.done()
			if err != nil {
				return err
			}
		case <-con.stop:
			return nil
		}
	}
}

每个客户端在通过pushConnection将本次xDS推送完后,都会调用pushEv.done()方法,通知semaphore

从pushQueue到最终推送xDS配置流程如下图:

在这里插入图片描述

6)、xDS配置的生成与分发

pushConnection()方法负责向Envoy发送xDS配置,代码如下:

// pilot/pkg/xds/ads.go
func (s *DiscoveryServer) pushConnection(con *Connection, pushEv *Event) error {
	pushRequest := pushEv.pushRequest

	if pushRequest.Full {
		// Update Proxy with current information.
		s.computeProxyState(con.proxy, pushRequest)
	}

	// 根据资源的变化情况,判断是否需要为proxy更新xds
	if !s.ProxyNeedsPush(con.proxy, pushRequest) {
		log.Debugf("Skipping push to %v, no updates required", con.conID)
		if pushRequest.Full {
			// Only report for full versions, incremental pushes do not have a new version.
			reportAllEvents(s.StatusReporter, con.conID, pushRequest.Push.LedgerVersion, nil)
		}
		return nil
	}

	// Send pushes to all generators
	// Each Generator is responsible for determining if the push event requires a push
	wrl, ignoreEvents := con.pushDetails()
	// 遍历proxy监听的资源类型
	for _, w := range wrl {
		// 根据订阅的资源类型生成xds配置并发送到客户端
		if err := s.pushXds(con, w, pushRequest); err != nil {
			return err
		}
	}
	if pushRequest.Full {
		// Report all events for unwatched resources. Watched resources will be reported in pushXds or on ack.
		reportAllEvents(s.StatusReporter, con.conID, pushRequest.Push.LedgerVersion, ignoreEvents)
	}

	proxiesConvergeDelay.Record(time.Since(pushRequest.Start).Seconds())
	return nil
}

pushConnection()方法核心逻辑如下:

  1. 根据资源的变化情况,判断是否需要为该Envoy代理更新xDS,如果不需要更新直接返回
  2. 遍历该Envoy代理监听的资源类型,根据订阅的资源类型生成xds配置并发送到客户端

1)ProxyNeedsPush

ProxyNeedsPush函数根据资源的变化情况,判断是否需要为该Envoy代理更新xDS,默认实现为DefaultProxyNeedsPush函数,最主要使用的是根据SidecarProxy判断是否依赖对应资源,核心方法如下:

// pilot/pkg/model/sidecar.go
var (
	sidecarScopeKnownConfigTypes = map[kind.Kind]struct{}{
		kind.ServiceEntry:    {},
		kind.VirtualService:  {},
		kind.DestinationRule: {},
		kind.Sidecar:         {},
	}

	// clusterScopedConfigTypes includes configs when they are in root namespace,
	// they will be applied to all namespaces within the cluster.
	clusterScopedConfigTypes = map[kind.Kind]struct{}{
		kind.EnvoyFilter:           {},
		kind.AuthorizationPolicy:   {},
		kind.RequestAuthentication: {},
		kind.WasmPlugin:            {},
	}
)

func (sc *SidecarScope) DependsOnConfig(config ConfigKey) bool {
	if sc == nil {
		return true
	}

	// This kind of config will trigger a change if made in the root namespace or the same namespace
	// clusterScopedConfigTypes类型配置,如果变更的配置在root namespace或相同namespace需要xds推送
	if _, f := clusterScopedConfigTypes[config.Kind]; f {
		return config.Namespace == sc.RootNamespace || config.Namespace == sc.Namespace
	}

	// This kind of config is unknown to sidecarScope.
	if _, f := sidecarScopeKnownConfigTypes[config.Kind]; !f {
		return true
	}

	// 如果sidecarScope包含对应配置需要进行xds推送
	_, exists := sc.configDependencies[config.HashCode()]
	return exists
}

DependsOnConfig()方法核心逻辑如下:

  1. 如果变更的clusterScopedConfigTypes类型配置在root namespace或相同namespace需要xDS推送
  2. 如果SidecarScope包含对应配置需要进行xDS推送

2)pushXds

// pilot/pkg/xds/xdsgen.go
func (s *DiscoveryServer) pushXds(con *Connection, w *model.WatchedResource, req *model.PushRequest) error {
	if w == nil {
		return nil
	}
	// 获取xds生成器
	gen := s.findGenerator(w.TypeUrl, con)
	if gen == nil {
		return nil
	}

	t0 := time.Now()

	// If delta is set, client is requesting new resources or removing old ones. We should just generate the
	// new resources it needs, rather than the entire set of known resources.
	// Note: we do not need to account for unsubscribed resources as these are handled by parent removal;
	// See https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#deleting-resources.
	// This means if there are only removals, we will not respond.
	var logFiltered string
	if !req.Delta.IsEmpty() && features.PartialFullPushes &&
		!con.proxy.IsProxylessGrpc() {
		logFiltered = " filtered:" + strconv.Itoa(len(w.ResourceNames)-len(req.Delta.Subscribed))
		w = &model.WatchedResource{
			TypeUrl:       w.TypeUrl,
			ResourceNames: req.Delta.Subscribed.UnsortedList(),
		}
	}
	// xds生成器生成xds配置
	res, logdata, err := gen.Generate(con.proxy, w, req)
	info := ""
	if len(logdata.AdditionalInfo) > 0 {
		info = " " + logdata.AdditionalInfo
	}
	if len(logFiltered) > 0 {
		info += logFiltered
	}
	if err != nil || res == nil {
		// If we have nothing to send, report that we got an ACK for this version.
		if s.StatusReporter != nil {
			s.StatusReporter.RegisterEvent(con.conID, w.TypeUrl, req.Push.LedgerVersion)
		}
		if log.DebugEnabled() {
			log.Debugf("%s: SKIP%s for node:%s%s", v3.GetShortType(w.TypeUrl), req.PushReason(), con.proxy.ID, info)
		}

		// If we are sending a request, we must respond or we can get Envoy stuck. Assert we do.
		// One exception is if Envoy is simply unsubscribing from some resources, in which case we can skip.
		isUnsubscribe := features.PartialFullPushes && !req.Delta.IsEmpty() && req.Delta.Subscribed.IsEmpty()
		if features.EnableUnsafeAssertions && err == nil && res == nil && req.IsRequest() && !isUnsubscribe {
			log.Fatalf("%s: SKIPPED%s for node:%s%s but expected a response for request", v3.GetShortType(w.TypeUrl), req.PushReason(), con.proxy.ID, info)
		}
		return err
	}
	defer func() { recordPushTime(w.TypeUrl, time.Since(t0)) }()

	resp := &discovery.DiscoveryResponse{
		ControlPlane: ControlPlane(),
		TypeUrl:      w.TypeUrl,
		// TODO: send different version for incremental eds
		VersionInfo: req.Push.PushVersion,
		Nonce:       nonce(req.Push.LedgerVersion),
		Resources:   model.ResourcesToAny(res),
	}

	configSize := ResourceSize(res)
	configSizeBytes.With(typeTag.Value(w.TypeUrl)).Record(float64(configSize))

	ptype := "PUSH"
	if logdata.Incremental {
		ptype = "PUSH INC"
	}

	// 调用send方法将discoveryResponse发送出去
	if err := con.send(resp); err != nil {
		if recordSendError(w.TypeUrl, err) {
			log.Warnf("%s: Send failure for node:%s resources:%d size:%s%s: %v",
				v3.GetShortType(w.TypeUrl), con.proxy.ID, len(res), util.ByteCount(configSize), info, err)
		}
		return err
	}

	switch {
	case !req.Full:
		if log.DebugEnabled() {
			log.Debugf("%s: %s%s for node:%s resources:%d size:%s%s",
				v3.GetShortType(w.TypeUrl), ptype, req.PushReason(), con.proxy.ID, len(res), util.ByteCount(configSize), info)
		}
	default:
		debug := ""
		if log.DebugEnabled() {
			// Add additional information to logs when debug mode enabled.
			debug = " nonce:" + resp.Nonce + " version:" + resp.VersionInfo
		}
		log.Infof("%s: %s%s for node:%s resources:%d size:%v%s%s", v3.GetShortType(w.TypeUrl), ptype, req.PushReason(), con.proxy.ID, len(res),
			util.ByteCount(ResourceSize(res)), info, debug)
	}

	return nil
}

pushXds()方法首先根据订阅的资源类型找到对应的xDS生成器,然后通过生成器相应的xDS配置,最后通过send接口发送出去

3)XdsResourceGenerator.Generate

Pilot主要负责6种xDS配置资源(CDS、EDS、LDS、RDS、ECDS、NDS)的生成及下发。接来下以CDS生成器为例,看看DiscoveryServer是如何根据代理的属性及PushContext缓存生成原始的Cluster配置的

CDS配置通过ConfigGenerator.BuildClusters()方法生成,代码如下:`

// pilot/pkg/networking/core/v1alpha3/cluster.go
func (configgen *ConfigGeneratorImpl) BuildClusters(proxy *model.Proxy, req *model.PushRequest) ([]*discovery.Resource, model.XdsLogDetails) {
	// In Sotw, we care about all services.
	var services []*model.Service
	if features.FilterGatewayClusterConfig && proxy.Type == model.Router {
		services = req.Push.GatewayServices(proxy)
	} else {
		services = proxy.SidecarScope.Services()
	}
	return configgen.buildClusters(proxy, req, services)
}

func (configgen *ConfigGeneratorImpl) buildClusters(proxy *model.Proxy, req *model.PushRequest,
	services []*model.Service,
) ([]*discovery.Resource, model.XdsLogDetails) {
	clusters := make([]*cluster.Cluster, 0)
	resources := model.Resources{}
	envoyFilterPatches := req.Push.EnvoyFilters(proxy)
	// 创建cluster生成器
	cb := NewClusterBuilder(proxy, req, configgen.Cache)
	instances := proxy.ServiceInstances
	cacheStats := cacheStats{}
	switch proxy.Type {
	// 生成sidecarProxy cluster
	case model.SidecarProxy:
		// Setup outbound clusters
		outboundPatcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_OUTBOUND}
		// 构建outbound cluster
		ob, cs := configgen.buildOutboundClusters(cb, proxy, outboundPatcher, services)
		cacheStats = cacheStats.merge(cs)
		resources = append(resources, ob...)
		// Add a blackhole and passthrough cluster for catching traffic to unresolved routes
		// 添加blackhole和passthrough cluster为默认的路由转发流量
		clusters = outboundPatcher.conditionallyAppend(clusters, nil, cb.buildBlackHoleCluster(), cb.buildDefaultPassthroughCluster())
		clusters = append(clusters, outboundPatcher.insertedClusters()...)
		// Setup inbound clusters
		inboundPatcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_INBOUND}
		// 构建inbound cluster
		clusters = append(clusters, configgen.buildInboundClusters(cb, proxy, instances, inboundPatcher)...)
		if proxy.EnableHBONE() {
			clusters = append(clusters, configgen.buildInboundHBONEClusters())
		}
		// Pass through clusters for inbound traffic. These cluster bind loopback-ish src address to access node local service.
		// 添加passthrough cluster为默认的路由转发流量
		clusters = inboundPatcher.conditionallyAppend(clusters, nil, cb.buildInboundPassthroughClusters()...)
		clusters = append(clusters, inboundPatcher.insertedClusters()...)
	case model.Waypoint:
		svcs := findWaypointServices(proxy, req.Push)
		// Waypoint proxies do not need outbound clusters in most cases, unless we have a route pointing to something
		outboundPatcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_OUTBOUND}
		ob, cs := configgen.buildOutboundClusters(cb, proxy, outboundPatcher, filterWaypointOutboundServices(req.Push.ServicesAttachedToMesh(), svcs, services))
		cacheStats = cacheStats.merge(cs)
		resources = append(resources, ob...)
		// Setup inbound clusters
		inboundPatcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_INBOUND}
		clusters = append(clusters, configgen.buildWaypointInboundClusters(cb, proxy, req.Push, svcs)...)
		clusters = append(clusters, inboundPatcher.insertedClusters()...)
	default: // Gateways
		patcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_GATEWAY}
		ob, cs := configgen.buildOutboundClusters(cb, proxy, patcher, services)
		cacheStats = cacheStats.merge(cs)
		resources = append(resources, ob...)
		// Gateways do not require the default passthrough cluster as they do not have original dst listeners.
		clusters = patcher.conditionallyAppend(clusters, nil, cb.buildBlackHoleCluster())
		if proxy.Type == model.Router && proxy.MergedGateway != nil && proxy.MergedGateway.ContainsAutoPassthroughGateways {
			clusters = append(clusters, configgen.buildOutboundSniDnatClusters(proxy, req, patcher)...)
		}
		clusters = append(clusters, patcher.insertedClusters()...)
	}

	// OutboundTunnel cluster is needed for sidecar and gateway.
	if proxy.EnableHBONE() {
		clusters = append(clusters, cb.buildConnectOriginate(proxy, req.Push, nil))
	}

	// if credential socket exists, create a cluster for it
	if proxy.Metadata != nil && proxy.Metadata.Raw[security.CredentialMetaDataName] == "true" {
		clusters = append(clusters, cb.buildExternalSDSCluster(security.CredentialNameSocketPath))
	}
	for _, c := range clusters {
		resources = append(resources, &discovery.Resource{Name: c.Name, Resource: protoconv.MessageToAny(c)})
	}
	resources = cb.normalizeClusters(resources)

	if cacheStats.empty() {
		return resources, model.DefaultXdsLogDetails
	}
	return resources, model.XdsLogDetails{AdditionalInfo: fmt.Sprintf("cached:%v/%v", cacheStats.hits, cacheStats.hits+cacheStats.miss)}
}

7)、响应Envoy主动发起的xDS请求

StreamAggregatedResources()方法中开启receive协程接收Envoy的xDS请求,代码如下:

// pilot/pkg/xds/ads.go
func (s *DiscoveryServer) receive(con *Connection, identities []string) {
	defer func() {
		close(con.errorChan)
		close(con.reqChan)
		// Close the initialized channel, if its not already closed, to prevent blocking the stream.
		select {
		case <-con.initialized:
		default:
			close(con.initialized)
		}
	}()

	firstRequest := true
	for {
		// 接收discoveryRequest
		req, err := con.stream.Recv()
		if err != nil {
			if istiogrpc.IsExpectedGRPCError(err) {
				log.Infof("ADS: %q %s terminated", con.peerAddr, con.conID)
				return
			}
			con.errorChan <- err
			log.Errorf("ADS: %q %s terminated with error: %v", con.peerAddr, con.conID, err)
			totalXDSInternalErrors.Increment()
			return
		}
		// This should be only set for the first request. The node id may not be set - for example malicious clients.
		if firstRequest {
			// probe happens before envoy sends first xDS request
			if req.TypeUrl == v3.HealthInfoType {
				log.Warnf("ADS: %q %s send health check probe before normal xDS request", con.peerAddr, con.conID)
				continue
			}
			firstRequest = false
			if req.Node == nil || req.Node.Id == "" {
				con.errorChan <- status.New(codes.InvalidArgument, "missing node information").Err()
				return
			}
			if err := s.initConnection(req.Node, con, identities); err != nil {
				con.errorChan <- err
				return
			}
			defer s.closeConnection(con)
			log.Infof("ADS: new connection for node:%s", con.conID)
		}

		select {
		// 将discoveryRequest添加到reqChan
		case con.reqChan <- req:
		case <-con.stream.Context().Done():
			log.Infof("ADS: %q %s terminated with stream closed", con.peerAddr, con.conID)
			return
		}
	}
}

receive()方法中接收Envoy的DiscoveryRequest,然后将DiscoveryRequest添加到reqChan中

StreamAggregatedResources()方法中从reqChan中获取DiscoveryRequest,然后调用processRequest()方法:

// pilot/pkg/xds/ads.go
func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *Connection) error {
	stype := v3.GetShortType(req.TypeUrl)
	log.Debugf("ADS:%s: REQ %s resources:%d nonce:%s version:%s ", stype,
		con.conID, len(req.ResourceNames), req.ResponseNonce, req.VersionInfo)
	if req.TypeUrl == v3.HealthInfoType {
		s.handleWorkloadHealthcheck(con.proxy, req)
		return nil
	}

	// For now, don't let xDS piggyback debug requests start watchers.
	if strings.HasPrefix(req.TypeUrl, v3.DebugType) {
		return s.pushXds(con,
			&model.WatchedResource{TypeUrl: req.TypeUrl, ResourceNames: req.ResourceNames},
			&model.PushRequest{Full: true, Push: con.proxy.LastPushContext})
	}
	if s.StatusReporter != nil {
		s.StatusReporter.RegisterEvent(con.conID, req.TypeUrl, req.ResponseNonce)
	}
	shouldRespond, delta := s.shouldRespond(con, req)
	if !shouldRespond {
		return nil
	}

	request := &model.PushRequest{
		Full:   true,
		Push:   con.proxy.LastPushContext,
		Reason: []model.TriggerReason{model.ProxyRequest},

		// The usage of LastPushTime (rather than time.Now()), is critical here for correctness; This time
		// is used by the XDS cache to determine if a entry is stale. If we use Now() with an old push context,
		// we may end up overriding active cache entries with stale ones.
		Start: con.proxy.LastPushTime,
		Delta: delta,
	}

	// SidecarScope for the proxy may not have been updated based on this pushContext.
	// It can happen when `processRequest` comes after push context has been updated(s.initPushContext),
	// but proxy's SidecarScope has been updated(s.computeProxyState -> SetSidecarScope) due to optimizations that skip sidecar scope
	// computation.
	if con.proxy.SidecarScope != nil && con.proxy.SidecarScope.Version != request.Push.PushVersion {
		s.computeProxyState(con.proxy, request)
	}
	// 推送xds配置
	return s.pushXds(con, con.Watched(req.TypeUrl), request)
}

processRequest()方法中根据DiscoveryRequest信息推送xDS配置

8)、小结

配置变化后向Envoy推送xDS时序:

在这里插入图片描述

响应Envoy主动发起的xDS时序:

在这里插入图片描述

参考:

《Istio权威指南 下》

4.深入Istio源码:Pilot的Discovery Server如何执行xDS异步分发?

Istio Pilot代码深度解析

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/779987.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用 ChatGPT 碰到的坑

最近在使用 ChatGPT 的时候碰到一个小坑&#xff0c;因为某些特殊情况我需要使用 syslog 向 logbeat 中发送日志。 由于这是一个比较古老的协议&#xff0c;确实也没接触过&#xff0c;所以就想着让 ChatGPT 帮我生成个例子。 原本我已经在 Go 中将这个流程跑通&#xff0c;所…

快速排序qsort讲解

hello大家好&#xff0c;我是c语言boom家宝&#xff0c;今天为大家分享的博客内容是qsort快速排序&#xff0c;简称快排的一个知识点的讲解。 在讲到快排之前&#xff0c;允许博主先提一嘴冒泡排序。大家在c语言的学习过程中&#xff0c;冒泡排序是必不可少会学习到的一个思想&…

Kafka - Primie Number of Partitions Issue Consumer Group Rebalance

文章目录 生产者&#xff1a;将数据写入 Kafka 的客户端。 消费者&#xff1a;从 Kafka 中读取数据的客户端。 Topic&#xff1a;Kafka 中用于组织和存储数据的逻辑概念&#xff0c;类似于数据库表。 Record&#xff1a;发送到 Topic 的消息称为 Record。 Partition&#x…

基于深度学习的高精度交通信号灯检测系统(PyTorch+Pyside6+YOLOv5模型)

摘要&#xff1a;基于深度学习的高精度交通信号灯检测识别可用于日常生活中检测与定位交通信号灯目标&#xff0c;利用深度学习算法可实现图片、视频、摄像头等方式的交通信号灯目标检测识别&#xff0c;另外支持结果可视化与图片或视频检测结果的导出。本系统采用YOLOv5目标检…

斯坦福数据挖掘教程·第三版》读书笔记(英文版)Chapter 13 Neural Nets and Deep Learning

来源&#xff1a;《斯坦福数据挖掘教程第三版》对应的公开英文书和PPT Chapter 13 Neural Nets and Deep Learning In this chapter, we shall consider the design of neural nets, which are collections of perceptrons, or nodes, where the outputs of one rank (or lay…

C# 学习笔记

不再是学生了&#xff0c;成了社畜了&#xff0c;公司主要技术栈是C# 大一时候学C#学的很迷糊&#xff0c;总要重新学一下 入职已经20天了&#xff0c;也开始上手简单增删改查了 记录了一些C#相关的东西&#xff0c;只是还没有系统整理 WinForm 控件命名规范 ADO.NET 连接…

爬虫-微博个人主页的获取

我们在利用爬虫爬取微博个人主页的时候&#xff0c;我们需要获取到个人页面的cookie才能进入到微博的个人主页&#xff0c;否则的话将会是一直跳转到登录页面而导致不能进入个人主页。 import urllib.request url #自己微博个人主页的源代码 headers {User-Agent:Mozilla/5.…

办公软件ppt的制作

毕业找工作太难了&#xff0c;赶紧多学点什么东西吧&#xff0c;今天开始办公软件ppt的制作学习。 本文以WPS作为默认办公软件&#xff0c;问为什么不是PowerPoint&#xff0c;问就是没钱买不起&#xff0c;绝对不是不会破解的原因。 一.认识软件 在快捷工具栏中顾名思义就是一…

什么是框架?为什么要学框架?

一、什么是框架 框架是整个或部分应用的可重用设计&#xff0c;是可定制化的应用骨架。它可以帮开发人员简化开发过程&#xff0c;提高开发效率。 项目里有一部分代码&#xff1a;和业务无关&#xff0c;而又不得不写的代码>框架 项目里剩下的部分代码&#xff1a;实现业务…

机器学习:Bert and its family

Bert 先用无监督的语料去训练通用模型&#xff0c;然后再针对小任务进行专项训练学习。 ELMoBertERNIEGroverBert&PALS Outline Pre-train Model 首先介绍预训练模型&#xff0c;预训练模型的作用是将一些token表示成一个vector 比如&#xff1a; Word2vecGlove 但是对于…

Qt Creator创建控制台项目显示中文乱码

今天在使用Qt Creator创建c项目的时候显示中文乱码&#xff0c;这里分享一下解决办法&#xff0c;主要是由于我们的电脑大部分是GBK编码格式的是&#xff0c;然后Qt默认创建的一般是utf-8编码类型的。编码类型不一致就会导致中文乱码的现象。 从控制台的属性可以看到我们的程序…

Observability:Synthetic monitoring - 动手实践

在我之前的如下文章里&#xff1a; Observability&#xff1a;Synthetic monitoring - 合成监测入门&#xff08;一&#xff09;&#xff08;二&#xff09; Observability&#xff1a;Synthetic monitoring - 创建浏览器监测&#xff0c;配置单独的浏览器监测器及项目 我详…

基于RASC的keil电子时钟制作(瑞萨RA)(3)----使用J-Link烧写程序到瑞萨芯片

基于RASC的keil电子时钟制作3_使用J-Link烧写程序到瑞萨芯片 概述硬件准备视频教程软件准备hex文件准备J-Link与瑞萨开发板进行SWD方式接线烧录 概述 这一节主要讲解如何使用J-Link对瑞萨RA芯片进行烧录。 硬件准备 首先需要准备一个开发板&#xff0c;这里我准备的是芯片型…

【Node.js 安装】Node.js安装与使用教程

Node.js 安装 Node.js 是什么那什么是运行时 如何安装 Node.jsNode 使用教程 Node.js 是什么 先说结论&#xff0c;Node.js 它是一套 JavaScript 运行环境&#xff0c;用来支持 JavaScript 代码的执行 JavaScript 诞生于 1995 年&#xff0c;几乎是和互联网同时出现&#xf…

leetcode-206.反转链表

leetcode-206.反转链表 文章目录 leetcode-206.反转链表一.题目描述二.代码提交三.易错点 一.题目描述 二.代码提交 代码 class Solution {public:ListNode *reverseList(ListNode *head) {ListNode *temp; // 保存cur的下一个节点ListNode *cur head;ListNode *pre nullptr…

scikit-learn集成学习代码批注及相关练习

一、代码批注 代码来自&#xff1a;https://scikit-learn.org/stable/auto_examples/ensemble/plot_adaboost_twoclass.html#sphx-glr-auto-examples-ensemble-plot-adaboost-twoclass-py import numpy as np import matplotlib.pyplot as plt from sklearn.ensemble import …

【stable diffusion】保姆级入门课程02-Stable diffusion(SD)图生图-基础图生图用法

目录 学前视频 0.本章素材 1.图生图是什么 2.图生图能做什么 3.如何使用图生图 4.功能区域 4.1.提示词区域 4.2.图片提示词反推区域 1.CLIP反推 2.DeepBooru 反推 4.3.图片上传区域 4.4.结果图区域 4.5.缩放模式 4.6.重绘幅度 7.结语 8.课后训练 学前视频 …

【Ranking】50 Matplotlib Visualizations, Python实现,源码可复现

详情请参考博客: Top 50 matplotlib Visualizations 因编译更新问题&#xff0c;本文将稍作更改&#xff0c;以便能够顺利运行。 1 Ordered Bar Chart 有序条形图有效地传达项目的排名顺序。但是&#xff0c;将图表上方的指标值相加&#xff0c;用户将从图表本身获得准确的信息…

制造业想要数字化转型应该从哪方面入手?

制造业可以通过关注以下几个关键领域来开启数字化转型之旅&#xff1a; 数据收集和分析&#xff1a;实施系统收集和分析来自各种来源&#xff08;例如机器、传感器和生产过程&#xff09;的数据至关重要。这些数据可以提供有关运营效率、质量控制和预测性维护的见解。 物联网&…

Flask Bootstrap 导航条

(43条消息) Flask 导航栏&#xff0c;模版渲染多页面_U盘失踪了的博客-CSDN博客 (43条消息) 学习记录&#xff1a;Bootstrap 导航条示例_bootstrap导航栏案例_U盘失踪了的博客-CSDN博客 1&#xff0c;引用Bootstrap css样式&#xff0c;导航栏页面跳转 2&#xff0c;页面两列…