This article walks through the Istio 1.18.0 source code.
5. Asynchronous xDS distribution
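DiscoveryService is responsible for the following:
- Starting the gRPC server and accepting connection requests from Envoy
- Receiving xDS requests from Envoy, fetching configuration and service information from the ConfigController and ServiceController, and sending the generated responses back to Envoy
- Watching for configuration changes from the ConfigController and service changes from the ServiceController, and pushing those changes to Envoy over the xDS interface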
1) DiscoveryService initialization
The DiscoveryService initialization flow is shown in the figure below:
(1) xds.NewDiscoveryServer
The DiscoveryServer is created by calling xds.NewDiscoveryServer(), which returns a DiscoveryServer instance. DiscoveryServer is defined as follows:
// pilot/pkg/xds/discovery.go
type DiscoveryServer struct {
	// Env is the model environment.
	Env *model.Environment
	// ConfigGenerator is responsible for generating data plane configuration using Istio networking
	// APIs and service registry info
	// Generator of xDS configuration
	ConfigGenerator core.ConfigGenerator
	// Generators allow customizing the generated config, based on the client metadata.
	// Key is the generator type - will match the Generator metadata to set the per-connection
	// default generator, or the combination of Generator metadata and TypeUrl to select a
	// different generator for a type.
	// Normal istio clients use the default generator - will not be impacted by this.
	// Generators for the individual config types
	Generators map[string]model.XdsResourceGenerator
	// ProxyNeedsPush is a function that determines whether a push can be completely skipped. Individual generators
	// may also choose to not send any updates.
	// Decides whether this push concerns a given Envoy at all
	ProxyNeedsPush func(proxy *model.Proxy, req *model.PushRequest) bool
	// concurrentPushLimit is a semaphore that limits the amount of concurrent XDS pushes.
	// Caps the number of concurrent pushes
	concurrentPushLimit chan struct{}
	// RequestRateLimit limits the number of new XDS requests allowed. This helps prevent thundering hurd of incoming requests.
	RequestRateLimit *rate.Limiter
	// InboundUpdates describes the number of configuration updates the discovery server has received
	InboundUpdates *atomic.Int64
	// CommittedUpdates describes the number of configuration updates the discovery server has
	// received, process, and stored in the push context. If this number is less than InboundUpdates,
	// there are updates we have not yet processed.
	// Note: This does not mean that all proxies have received these configurations; it is strictly
	// the push context, which means that the next push to a proxy will receive this configuration.
	CommittedUpdates *atomic.Int64
	// pushChannel is the buffer used for debouncing.
	// after debouncing the pushRequest will be sent to pushQueue
	// Single channel that receives the PushRequests sent by all other components
	pushChannel chan *model.PushRequest
	// mutex used for protecting Environment.PushContext
	updateMutex sync.RWMutex
	// pushQueue is the buffer that used after debounce and before the real xds push.
	// Buffers debounced requests until the actual xDS push
	pushQueue *PushQueue
	// debugHandlers is the list of all the supported debug handlers.
	debugHandlers map[string]string
	// adsClients reflect active gRPC channels, for both ADS and EDS.
	// gRPC connections for ADS and EDS
	adsClients      map[string]*Connection
	adsClientsMutex sync.RWMutex
	StatusReporter  DistributionStatusCache
	// Authenticators for XDS requests. Should be same/subset of the CA authenticators.
	Authenticators []security.Authenticator
	// StatusGen is notified of connect/disconnect/nack on all connections
	StatusGen               *StatusGen
	WorkloadEntryController *autoregistration.Controller
	// serverReady indicates caches have been synced up and server is ready to process requests.
	serverReady     atomic.Bool
	debounceOptions debounceOptions
	instanceID      string
	clusterID       cluster.ID
	// Cache for XDS resources
	// xDS resource cache
	Cache model.XdsCache
	// JwtKeyResolver holds a reference to the JWT key resolver instance.
	JwtKeyResolver *model.JwksResolver
	// ListRemoteClusters collects debug information about other clusters this istiod reads from.
	ListRemoteClusters func() []cluster.DebugInfo
	// ClusterAliases are aliase names for cluster. When a proxy connects with a cluster ID
	// and if it has a different alias we should use that a cluster ID for proxy.
	ClusterAliases map[cluster.ID]cluster.ID
}
(2) initGrpcServer
When the gRPC server is initialized, XDSServer.Register() is called to register the service with it:
// pilot/pkg/xds/discovery.go
func (s *DiscoveryServer) Register(rpcs *grpc.Server) {
	// Register v3 server
	discovery.RegisterAggregatedDiscoveryServiceServer(rpcs, s)
}
DiscoveryServer implements the AggregatedDiscoveryServiceServer interface:
// envoy/service/discovery/v3/ads.pb.go
type AggregatedDiscoveryServiceServer interface {
	// This is a gRPC-only API.
	// State-of-the-world (full) ADS stream
	StreamAggregatedResources(AggregatedDiscoveryService_StreamAggregatedResourcesServer) error
	// Incremental (delta) ADS stream
	DeltaAggregatedResources(AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error
}
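StreamAggregatedResources receives DiscoveryRequests and returns a stream of DiscoveryResponses that carry the full (state-of-the-world) xDS data.
The call flow is shown in the figure below: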
(3) initDiscoveryService
initDiscoveryService() adds the DiscoveryServer start function to the Server's startFuncs queue, and the function is invoked once initialization is complete:
// pilot/pkg/bootstrap/server.go
func (s *Server) initDiscoveryService() {
	log.Infof("starting discovery service")
	// Implement EnvoyXdsServer grace shutdown
	s.addStartFunc(func(stop <-chan struct{}) error {
		log.Infof("Starting ADS server")
		s.XDSServer.Start(stop)
		return nil
	})
}
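The deferred-start idea behind addStartFunc() can be shown with a minimal sketch. The `server` type and its `addStartFunc`/`start` methods below are hypothetical simplifications of bootstrap.Server. The sketch assumes only what the snippet above shows: functions are recorded during initialization and run later with a shared stop channel.

```go
package main

import "fmt"

// startFunc has the same shape as the closures passed to addStartFunc.
type startFunc func(stop <-chan struct{}) error

// server is a hypothetical, stripped-down stand-in for bootstrap.Server.
type server struct {
	startFuncs []startFunc
}

// addStartFunc only records fn; nothing runs during initialization.
func (s *server) addStartFunc(fn startFunc) {
	s.startFuncs = append(s.startFuncs, fn)
}

// start runs the recorded functions in registration order and stops at the
// first error.
func (s *server) start(stop <-chan struct{}) error {
	for _, fn := range s.startFuncs {
		if err := fn(stop); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	s := &server{}
	var events []string
	s.addStartFunc(func(stop <-chan struct{}) error {
		events = append(events, "Starting ADS server")
		return nil
	})
	events = append(events, "initialization finished")

	stop := make(chan struct{})
	defer close(stop)
	if err := s.start(stop); err != nil {
		fmt.Println("start failed:", err)
		return
	}
	fmt.Println(events) // [initialization finished Starting ADS server]
}
```

Deferring the start this way lets every component finish wiring itself up before a long-running goroutine such as the ADS server begins serving.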
2) DiscoveryService startup
The Start() method of DiscoveryServer launches five goroutines:
// pilot/pkg/xds/discovery.go
func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
	go s.WorkloadEntryController.Run(stopCh)
	go s.handleUpdates(stopCh)
	go s.periodicRefreshMetrics(stopCh)
	go s.sendPushes(stopCh)
	go s.Cache.Run(stopCh)
}
The most important of these are handleUpdates and sendPushes.
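Config, Service and Endpoint resource handlers all end up calling ConfigUpdate(), which sends a PushRequest to the DiscoveryServer's pushChannel. The processing flow is as follows:
The handleUpdates goroutine receives and processes update requests in a blocking loop and places the resulting PushRequest into the DiscoveryServer's pushQueue. The sendPushes goroutine then concurrently delivers each PushRequest to the pushChannel of every connection. Finally, the DiscoveryServer's stream handler for that connection carries out the push.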
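The pushQueue between handleUpdates and sendPushes lets a burst of updates for the same connection collapse into a single push. The following is a simplified sketch of that behavior and does not reproduce Istio's PushQueue. Connections are plain string IDs, and the hypothetical `request` type keeps only a Full flag and a count of merged updates. The sketch mirrors the Dequeue/MarkDone calls used by doSendPushes: an update that arrives while a connection is being pushed is held back and re-queued once the push is marked done.

```go
package main

import (
	"fmt"
	"sync"
)

// request is a hypothetical, reduced PushRequest: Full is sticky when merged,
// and Count records how many updates were folded together.
type request struct {
	Full  bool
	Count int
}

func merge(a, b *request) *request {
	if a == nil {
		return b
	}
	return &request{Full: a.Full || b.Full, Count: a.Count + b.Count}
}

// pushQueue holds at most one pending request per connection ID.
type pushQueue struct {
	cond       *sync.Cond
	pending    map[string]*request // queued, not yet dequeued
	order      []string            // FIFO order of queued IDs
	processing map[string]*request // dequeued; value is a merged follow-up, if any
}

func newPushQueue() *pushQueue {
	return &pushQueue{
		cond:       sync.NewCond(&sync.Mutex{}),
		pending:    map[string]*request{},
		processing: map[string]*request{},
	}
}

// Enqueue merges req into whatever is already waiting for id.
func (q *pushQueue) Enqueue(id string, req *request) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	if old, ok := q.pending[id]; ok {
		q.pending[id] = merge(old, req)
		return
	}
	if follow, busy := q.processing[id]; busy {
		// A push for id is in flight: keep the update for MarkDone.
		q.processing[id] = merge(follow, req)
		return
	}
	q.pending[id] = req
	q.order = append(q.order, id)
	q.cond.Signal()
}

// Dequeue blocks until a connection is waiting, then marks it as processing.
func (q *pushQueue) Dequeue() (string, *request) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	for len(q.order) == 0 {
		q.cond.Wait()
	}
	id := q.order[0]
	q.order = q.order[1:]
	req := q.pending[id]
	delete(q.pending, id)
	q.processing[id] = nil
	return id, req
}

// MarkDone finishes a push and re-queues any update that arrived meanwhile.
func (q *pushQueue) MarkDone(id string) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	follow := q.processing[id]
	delete(q.processing, id)
	if follow != nil {
		q.pending[id] = follow
		q.order = append(q.order, id)
		q.cond.Signal()
	}
}

func main() {
	q := newPushQueue()
	q.Enqueue("sidecar-a", &request{Count: 1})
	q.Enqueue("sidecar-a", &request{Full: true, Count: 1})
	q.Enqueue("sidecar-b", &request{Count: 1})
	id, req := q.Dequeue()
	fmt.Println(id, req.Full, req.Count) // sidecar-a true 2
}
```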
3) handleUpdates
// pilot/pkg/xds/discovery.go
func (s *DiscoveryServer) handleUpdates(stopCh <-chan struct{}) {
	debounce(s.pushChannel, stopCh, s.debounceOptions, s.Push, s.CommittedUpdates)
}

func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, opts debounceOptions, pushFn func(req *model.PushRequest), updateSent *atomic.Int64) {
	var timeChan <-chan time.Time
	var startDebounce time.Time
	var lastConfigUpdateTime time.Time
	pushCounter := 0
	debouncedEvents := 0
	// Keeps track of the push requests. If updates are debounce they will be merged.
	var req *model.PushRequest
	free := true
	freeCh := make(chan struct{}, 1)
	push := func(req *model.PushRequest, debouncedEvents int, startDebounce time.Time) {
		pushFn(req)
		updateSent.Add(int64(debouncedEvents))
		debounceTime.Record(time.Since(startDebounce).Seconds())
		freeCh <- struct{}{}
	}
	pushWorker := func() {
		eventDelay := time.Since(startDebounce)
		quietTime := time.Since(lastConfigUpdateTime)
		// it has been too long or quiet enough
		// Process the pending update as soon as either condition holds:
		// - the time since the first update of this round reaches the maximum delay (debounceMax, default 10s)
		// - the time since the last update reaches the minimum quiet period (debounceAfter, default 100ms)
		if eventDelay >= opts.debounceMax || quietTime >= opts.debounceAfter {
			if req != nil {
				pushCounter++
				if req.ConfigsUpdated == nil {
					log.Infof("Push debounce stable[%d] %d for reason %s: %v since last change, %v since last push, full=%v",
						pushCounter, debouncedEvents, reasonsUpdated(req),
						quietTime, eventDelay, req.Full)
				} else {
					log.Infof("Push debounce stable[%d] %d for config %s: %v since last change, %v since last push, full=%v",
						pushCounter, debouncedEvents, configsUpdated(req),
						quietTime, eventDelay, req.Full)
				}
				free = false
				go push(req, debouncedEvents, startDebounce)
				req = nil
				debouncedEvents = 0
			}
		} else {
			timeChan = time.After(opts.debounceAfter - quietTime)
		}
	}
	for {
		select {
		case <-freeCh:
			free = true
			pushWorker()
		case r := <-ch:
			// If reason is not set, record it as an unknown reason
			if len(r.Reason) == 0 {
				r.Reason = []model.TriggerReason{model.UnknownTrigger}
			}
			if !opts.enableEDSDebounce && !r.Full {
				// Trigger the EDS push immediately, bypassing debounce
				// trigger push now, just for EDS
				go func(req *model.PushRequest) {
					pushFn(req)
					updateSent.Inc()
				}(r)
				continue
			}
			lastConfigUpdateTime = time.Now()
			if debouncedEvents == 0 {
				// Start a new debounce round: arm the timer with the minimum quiet period
				timeChan = time.After(opts.debounceAfter)
				// Record when the first event of this round arrived
				startDebounce = lastConfigUpdateTime
			}
			debouncedEvents++
			// Merge into the pending PushRequest
			req = req.Merge(r)
		case <-timeChan:
			if free {
				pushWorker()
			}
		case <-stopCh:
			return
		}
	}
}
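debounce() merges update events using the minimum quiet period (debounceAfter) and bounds the xDS delivery delay using the maximum delay (debounceMax). DiscoveryServer.Push has to initialize a PushContext, which consumes a lot of memory, so debounce() runs DiscoveryServer.Push serially to avoid OOM. While a push is in progress, free is false and timer ticks are ignored. When the push goroutine signals freeCh, pushWorker() re-checks the pending request.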
DiscoveryServer.Push calls down a chain of functions until the request is pushed into the DiscoveryServer's pushQueue. The call chain is shown below:
4) PushContext initialization
PushContext is the most important structure in xDS generation and holds almost all mesh resource information. It is defined as follows:
// pilot/pkg/model/push_context.go
type PushContext struct {
	proxyStatusMutex sync.RWMutex
	// ProxyStatus is keyed by the error code, and holds a map keyed
	// by the ID.
	ProxyStatus map[string]map[string]ProxyPushStatus
	// Synthesized from env.Mesh
	exportToDefaults exportToDefaults
	// ServiceIndex is the index of services by various fields.
	// Service cache
	ServiceIndex serviceIndex
	// serviceAccounts contains a map of hostname and port to service accounts.
	serviceAccounts map[serviceAccountKey][]string
	// virtualServiceIndex is the index of virtual services by various fields.
	// VirtualService cache
	virtualServiceIndex virtualServiceIndex
	// destinationRuleIndex is the index of destination rules by various fields.
	// DestinationRule cache
	destinationRuleIndex destinationRuleIndex
	// gatewayIndex is the index of gateways.
	gatewayIndex gatewayIndex
	// clusterLocalHosts extracted from the MeshConfig
	clusterLocalHosts ClusterLocalHosts
	// sidecarIndex stores sidecar resources
	// Sidecar cache
	sidecarIndex sidecarIndex
	// envoy filters for each namespace including global config namespace
	// EnvoyFilter cache
	envoyFiltersByNamespace map[string][]*EnvoyFilterWrapper
	// wasm plugins for each namespace including global config namespace
	wasmPluginsByNamespace map[string][]*WasmPluginWrapper
	// AuthnPolicies contains Authn policies by namespace.
	AuthnPolicies *AuthenticationPolicies `json:"-"`
	// AuthzPolicies stores the existing authorization policies in the cluster. Could be nil if there
	// are no authorization policies in the cluster.
	AuthzPolicies *AuthorizationPolicies `json:"-"`
	// Telemetry stores the existing Telemetry resources for the cluster.
	Telemetry *Telemetries `json:"-"`
	// ProxyConfig stores the existing ProxyConfig resources for the cluster.
	ProxyConfigs *ProxyConfigs `json:"-"`
	// The following data is either a global index or used in the inbound path.
	// Namespace specific views do not apply here.
	// Mesh configuration for the mesh.
	Mesh *meshconfig.MeshConfig `json:"-"`
	// PushVersion describes the push version this push context was computed for
	PushVersion string
	// LedgerVersion is the version of the configuration ledger
	LedgerVersion string
	// JwtKeyResolver holds a reference to the JWT key resolver instance.
	JwtKeyResolver *JwksResolver
	// GatewayAPIController holds a reference to the gateway API controller.
	GatewayAPIController GatewayController
	// cache gateways addresses for each network
	// this is mainly used for kubernetes multi-cluster scenario
	networkMgr      *NetworkManager
	Networks        *meshconfig.MeshNetworks
	InitDone        atomic.Bool
	initializeMutex sync.Mutex
	ambientIndex    AmbientIndexes
}
The indexes cached in PushContext give the later xDS generation steps fast lookups of mesh resources.
5) sendPushes
// pilot/pkg/xds/discovery.go
func (s *DiscoveryServer) sendPushes(stopCh <-chan struct{}) {
	doSendPushes(stopCh, s.concurrentPushLimit, s.pushQueue)
}

func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {
	for {
		select {
		case <-stopCh:
			return
		default:
			// We can send to it until it is full, then it will block until a pushes finishes and reads from it.
			// This limits the number of pushes that can happen concurrently
			// The semaphore's capacity (100 by default) caps push concurrency
			semaphore <- struct{}{}
			// Get the next proxy to push. This will block if there are no updates required.
			// Dequeue one connection (with its merged PushRequest) from pushQueue
			client, push, shuttingdown := queue.Dequeue()
			if shuttingdown {
				return
			}
			recordPushTriggers(push.Reason...)
			// Signals that a push is done by reading from the semaphore, allowing another send on it.
			doneFunc := func() {
				queue.MarkDone(client)
				<-semaphore
			}
			proxiesQueueTime.Record(time.Since(push.Start).Seconds())
			var closed <-chan struct{}
			if client.stream != nil {
				closed = client.stream.Context().Done()
			} else {
				closed = client.deltaStream.Context().Done()
			}
			go func() {
				// Build the push event
				pushEv := &Event{
					pushRequest: push,
					done:        doneFunc,
				}
				select {
				// Hand the push event to the connection's pushChannel
				case client.pushChannel <- pushEv:
					return
				case <-closed: // grpc stream was closed
					doneFunc()
					log.Infof("Client closed connection %v", client.conID)
				}
			}()
		}
	}
}
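doSendPushes() runs an infinite loop, and its default branch does the main work:
- Push events go to all xDS clients. A semaphore limits the number of concurrent pushes, and the loop blocks once the semaphore is full
- When the semaphore admits another push, the loop dequeues the next client and starts a goroutine that tries to deliver the pushEvent to that client's pushChannel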
A pushEvent sent to pushChannel is handled in StreamAggregatedResources(), which delegates to Stream():
// pilot/pkg/xds/ads.go
func (s *DiscoveryServer) StreamAggregatedResources(stream DiscoveryStream) error {
	return s.Stream(stream)
}

func (s *DiscoveryServer) Stream(stream DiscoveryStream) error {
	if knativeEnv != "" && firstRequest.Load() {
		// How scaling works in knative is the first request is the "loading" request. During
		// loading request, concurrency=1. Once that request is done, concurrency is enabled.
		// However, the XDS stream is long lived, so the first request would block all others. As a
		// result, we should exit the first request immediately; clients will retry.
		firstRequest.Store(false)
		return status.Error(codes.Unavailable, "server warmup not complete; try again")
	}
	// Check if server is ready to accept clients and process new requests.
	// Currently ready means caches have been synced and hence can build
	// clusters correctly. Without this check, InitContext() call below would
	// initialize with empty config, leading to reconnected Envoys loosing
	// configuration. This is an additional safety check inaddition to adding
	// cachesSynced logic to readiness probe to handle cases where kube-proxy
	// ip tables update latencies.
	// See https://github.com/istio/istio/issues/25495.
	if !s.IsServerReady() {
		return status.Error(codes.Unavailable, "server is not ready to serve discovery information")
	}
	ctx := stream.Context()
	peerAddr := "0.0.0.0"
	if peerInfo, ok := peer.FromContext(ctx); ok {
		peerAddr = peerInfo.Addr.String()
	}
	if err := s.WaitForRequestLimit(stream.Context()); err != nil {
		log.Warnf("ADS: %q exceeded rate limit: %v", peerAddr, err)
		return status.Errorf(codes.ResourceExhausted, "request rate limit exceeded: %v", err)
	}
	ids, err := s.authenticate(ctx)
	if err != nil {
		return status.Error(codes.Unauthenticated, err.Error())
	}
	if ids != nil {
		log.Debugf("Authenticated XDS: %v with identity %v", peerAddr, ids)
	} else {
		log.Debugf("Unauthenticated XDS: %s", peerAddr)
	}
	// InitContext returns immediately if the context was already initialized.
	if err = s.globalPushContext().InitContext(s.Env, nil, nil); err != nil {
		// Error accessing the data - log and close, maybe a different pilot replica
		// has more luck
		log.Warnf("Error reading config %v", err)
		return status.Error(codes.Unavailable, "error reading config")
	}
	con := newConnection(peerAddr, stream)
	// Do not call: defer close(con.pushChannel). The push channel will be garbage collected
	// when the connection is no longer used. Closing the channel can cause subtle race conditions
	// with push. According to the spec: "It's only necessary to close a channel when it is important
	// to tell the receiving goroutines that all data have been sent."
	// Block until either a request is received or a push is triggered.
	// We need 2 go routines because 'read' blocks in Recv().
	go s.receive(con, ids)
	// Wait for the proxy to be fully initialized before we start serving traffic. Because
	// initialization doesn't have dependencies that will block, there is no need to add any timeout
	// here. Prior to this explicit wait, we were implicitly waiting by receive() not sending to
	// reqChannel and the connection not being enqueued for pushes to pushChannel until the
	// initialization is complete.
	<-con.initialized
	for {
		// Go select{} statements are not ordered; the same channel can be chosen many times.
		// For requests, these are higher priority (client may be blocked on startup until these are done)
		// and often very cheap to handle (simple ACK), so we check it first.
		select {
		case req, ok := <-con.reqChan:
			if ok {
				if err := s.processRequest(req, con); err != nil {
					return err
				}
			} else {
				// Remote side closed connection or error processing the request.
				return <-con.errorChan
			}
		case <-con.stop:
			return nil
		default:
		}
		// If there wasn't already a request, poll for requests and pushes. Note: if we have a huge
		// amount of incoming requests, we may still send some pushes, as we do not `continue` above;
		// however, requests will be handled ~2x as much as pushes. This ensures a wave of requests
		// cannot completely starve pushes. However, this scenario is unlikely.
		select {
		case req, ok := <-con.reqChan:
			if ok {
				if err := s.processRequest(req, con); err != nil {
					return err
				}
			} else {
				// Remote side closed connection or error processing the request.
				return <-con.errorChan
			}
		case pushEv := <-con.pushChannel:
			// Push the xDS configuration via pushConnection
			err := s.pushConnection(con, pushEv)
			// Release the semaphore: this push is done
			pushEv.done()
			if err != nil {
				return err
			}
		case <-con.stop:
			return nil
		}
	}
}
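After a client's xDS push through pushConnection completes, Stream() calls pushEv.done(). That call marks the connection as done in the pushQueue and releases its slot in the semaphore.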
The flow from the pushQueue to the final xDS push is shown in the figure below:
6) Generating and distributing xDS configuration
pushConnection() sends xDS configuration to Envoy:
// pilot/pkg/xds/ads.go
func (s *DiscoveryServer) pushConnection(con *Connection, pushEv *Event) error {
	pushRequest := pushEv.pushRequest
	if pushRequest.Full {
		// Update Proxy with current information.
		s.computeProxyState(con.proxy, pushRequest)
	}
	// Decide, based on which resources changed, whether this proxy needs an xDS update
	if !s.ProxyNeedsPush(con.proxy, pushRequest) {
		log.Debugf("Skipping push to %v, no updates required", con.conID)
		if pushRequest.Full {
			// Only report for full versions, incremental pushes do not have a new version.
			reportAllEvents(s.StatusReporter, con.conID, pushRequest.Push.LedgerVersion, nil)
		}
		return nil
	}
	// Send pushes to all generators
	// Each Generator is responsible for determining if the push event requires a push
	wrl, ignoreEvents := con.pushDetails()
	// Iterate over the resource types the proxy watches
	for _, w := range wrl {
		// Generate xDS configuration for each subscribed type and send it to the client
		if err := s.pushXds(con, w, pushRequest); err != nil {
			return err
		}
	}
	if pushRequest.Full {
		// Report all events for unwatched resources. Watched resources will be reported in pushXds or on ack.
		reportAllEvents(s.StatusReporter, con.conID, pushRequest.Push.LedgerVersion, ignoreEvents)
	}
	proxiesConvergeDelay.Record(time.Since(pushRequest.Start).Seconds())
	return nil
}
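The core logic of pushConnection() is:
- Based on which resources changed, decide whether this Envoy proxy needs an xDS update, and return immediately if it does not
- Iterate over the resource types the proxy watches, generate the xDS configuration for each subscribed type, and send it to the client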
(1) ProxyNeedsPush
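The ProxyNeedsPush function decides, based on which resources changed, whether this Envoy proxy needs an xDS update. Its default implementation is DefaultProxyNeedsPush. For sidecar proxies, the main check is whether the proxy's SidecarScope depends on the changed resources. The core method is: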
// pilot/pkg/model/sidecar.go
var (
	sidecarScopeKnownConfigTypes = map[kind.Kind]struct{}{
		kind.ServiceEntry:    {},
		kind.VirtualService:  {},
		kind.DestinationRule: {},
		kind.Sidecar:         {},
	}
	// clusterScopedConfigTypes includes configs when they are in root namespace,
	// they will be applied to all namespaces within the cluster.
	clusterScopedConfigTypes = map[kind.Kind]struct{}{
		kind.EnvoyFilter:           {},
		kind.AuthorizationPolicy:   {},
		kind.RequestAuthentication: {},
		kind.WasmPlugin:            {},
	}
)

func (sc *SidecarScope) DependsOnConfig(config ConfigKey) bool {
	if sc == nil {
		return true
	}
	// This kind of config will trigger a change if made in the root namespace or the same namespace
	// A changed clusterScopedConfigTypes config needs an xDS push if it is in the root namespace or the proxy's namespace
	if _, f := clusterScopedConfigTypes[config.Kind]; f {
		return config.Namespace == sc.RootNamespace || config.Namespace == sc.Namespace
	}
	// This kind of config is unknown to sidecarScope.
	if _, f := sidecarScopeKnownConfigTypes[config.Kind]; !f {
		return true
	}
	// An xDS push is needed if the SidecarScope depends on this config
	_, exists := sc.configDependencies[config.HashCode()]
	return exists
}
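The core logic of DependsOnConfig() is:
- A changed config of one of the clusterScopedConfigTypes needs an xDS push if it lives in the root namespace or in the proxy's own namespace
- For the config types SidecarScope tracks, an xDS push is needed only if the SidecarScope depends on that config; types it does not track always trigger a push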
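The decision order in DependsOnConfig() can be exercised in isolation. The sketch below reproduces it on hypothetical simplified types. `configKey` uses string kinds instead of kind.Kind, and `scope.deps` is keyed by the whole key rather than by ConfigKey.HashCode(). The two kind lists are copied from the snippet above.

```go
package main

import "fmt"

// configKey is a simplified ConfigKey; Istio uses kind.Kind and HashCode().
type configKey struct {
	Kind, Name, Namespace string
}

var (
	sidecarScopeKnownKinds = map[string]bool{
		"ServiceEntry": true, "VirtualService": true, "DestinationRule": true, "Sidecar": true,
	}
	clusterScopedKinds = map[string]bool{
		"EnvoyFilter": true, "AuthorizationPolicy": true, "RequestAuthentication": true, "WasmPlugin": true,
	}
)

// scope is a hypothetical, reduced SidecarScope.
type scope struct {
	Namespace, RootNamespace string
	deps                     map[configKey]bool // configs this scope was built from
}

// dependsOn follows the same decision order as DependsOnConfig.
func (sc *scope) dependsOn(c configKey) bool {
	if sc == nil {
		return true // no scope: assume every change matters
	}
	if clusterScopedKinds[c.Kind] {
		return c.Namespace == sc.RootNamespace || c.Namespace == sc.Namespace
	}
	if !sidecarScopeKnownKinds[c.Kind] {
		return true // kinds the scope does not track always trigger a push
	}
	return sc.deps[c]
}

func main() {
	sc := &scope{
		Namespace:     "default",
		RootNamespace: "istio-system",
		deps:          map[configKey]bool{{Kind: "VirtualService", Name: "reviews", Namespace: "default"}: true},
	}
	fmt.Println(sc.dependsOn(configKey{"VirtualService", "reviews", "default"}))    // true
	fmt.Println(sc.dependsOn(configKey{"VirtualService", "ratings", "default"}))    // false
	fmt.Println(sc.dependsOn(configKey{"EnvoyFilter", "stats", "istio-system"}))    // true
	fmt.Println(sc.dependsOn(configKey{"AuthorizationPolicy", "deny", "payments"})) // false
}
```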
(2) pushXds
// pilot/pkg/xds/xdsgen.go
func (s *DiscoveryServer) pushXds(con *Connection, w *model.WatchedResource, req *model.PushRequest) error {
	if w == nil {
		return nil
	}
	// Look up the xDS generator for this type
	gen := s.findGenerator(w.TypeUrl, con)
	if gen == nil {
		return nil
	}
	t0 := time.Now()
	// If delta is set, client is requesting new resources or removing old ones. We should just generate the
	// new resources it needs, rather than the entire set of known resources.
	// Note: we do not need to account for unsubscribed resources as these are handled by parent removal;
	// See https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#deleting-resources.
	// This means if there are only removals, we will not respond.
	var logFiltered string
	if !req.Delta.IsEmpty() && features.PartialFullPushes &&
		!con.proxy.IsProxylessGrpc() {
		logFiltered = " filtered:" + strconv.Itoa(len(w.ResourceNames)-len(req.Delta.Subscribed))
		w = &model.WatchedResource{
			TypeUrl:       w.TypeUrl,
			ResourceNames: req.Delta.Subscribed.UnsortedList(),
		}
	}
	// The generator produces the xDS configuration
	res, logdata, err := gen.Generate(con.proxy, w, req)
	info := ""
	if len(logdata.AdditionalInfo) > 0 {
		info = " " + logdata.AdditionalInfo
	}
	if len(logFiltered) > 0 {
		info += logFiltered
	}
	if err != nil || res == nil {
		// If we have nothing to send, report that we got an ACK for this version.
		if s.StatusReporter != nil {
			s.StatusReporter.RegisterEvent(con.conID, w.TypeUrl, req.Push.LedgerVersion)
		}
		if log.DebugEnabled() {
			log.Debugf("%s: SKIP%s for node:%s%s", v3.GetShortType(w.TypeUrl), req.PushReason(), con.proxy.ID, info)
		}
		// If we are sending a request, we must respond or we can get Envoy stuck. Assert we do.
		// One exception is if Envoy is simply unsubscribing from some resources, in which case we can skip.
		isUnsubscribe := features.PartialFullPushes && !req.Delta.IsEmpty() && req.Delta.Subscribed.IsEmpty()
		if features.EnableUnsafeAssertions && err == nil && res == nil && req.IsRequest() && !isUnsubscribe {
			log.Fatalf("%s: SKIPPED%s for node:%s%s but expected a response for request", v3.GetShortType(w.TypeUrl), req.PushReason(), con.proxy.ID, info)
		}
		return err
	}
	defer func() { recordPushTime(w.TypeUrl, time.Since(t0)) }()
	resp := &discovery.DiscoveryResponse{
		ControlPlane: ControlPlane(),
		TypeUrl:      w.TypeUrl,
		// TODO: send different version for incremental eds
		VersionInfo: req.Push.PushVersion,
		Nonce:       nonce(req.Push.LedgerVersion),
		Resources:   model.ResourcesToAny(res),
	}
	configSize := ResourceSize(res)
	configSizeBytes.With(typeTag.Value(w.TypeUrl)).Record(float64(configSize))
	ptype := "PUSH"
	if logdata.Incremental {
		ptype = "PUSH INC"
	}
	// Send the DiscoveryResponse via send
	if err := con.send(resp); err != nil {
		if recordSendError(w.TypeUrl, err) {
			log.Warnf("%s: Send failure for node:%s resources:%d size:%s%s: %v",
				v3.GetShortType(w.TypeUrl), con.proxy.ID, len(res), util.ByteCount(configSize), info, err)
		}
		return err
	}
	switch {
	case !req.Full:
		if log.DebugEnabled() {
			log.Debugf("%s: %s%s for node:%s resources:%d size:%s%s",
				v3.GetShortType(w.TypeUrl), ptype, req.PushReason(), con.proxy.ID, len(res), util.ByteCount(configSize), info)
		}
	default:
		debug := ""
		if log.DebugEnabled() {
			// Add additional information to logs when debug mode enabled.
			debug = " nonce:" + resp.Nonce + " version:" + resp.VersionInfo
		}
		log.Infof("%s: %s%s for node:%s resources:%d size:%v%s%s", v3.GetShortType(w.TypeUrl), ptype, req.PushReason(), con.proxy.ID, len(res),
			util.ByteCount(ResourceSize(res)), info, debug)
	}
	return nil
}
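pushXds() first finds the xDS generator for the subscribed resource type. It then uses that generator to produce the corresponding xDS configuration and finally sends the result through send.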
(3) XdsResourceGenerator.Generate
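Pilot is mainly responsible for generating and distributing six kinds of xDS resources: CDS, EDS, LDS, RDS, ECDS and NDS. Taking the CDS generator as an example, the following shows how DiscoveryServer builds the raw Cluster configuration from the proxy's attributes and the PushContext cache.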
CDS configuration is generated by ConfigGenerator.BuildClusters():
// pilot/pkg/networking/core/v1alpha3/cluster.go
func (configgen *ConfigGeneratorImpl) BuildClusters(proxy *model.Proxy, req *model.PushRequest) ([]*discovery.Resource, model.XdsLogDetails) {
	// In Sotw, we care about all services.
	var services []*model.Service
	if features.FilterGatewayClusterConfig && proxy.Type == model.Router {
		services = req.Push.GatewayServices(proxy)
	} else {
		services = proxy.SidecarScope.Services()
	}
	return configgen.buildClusters(proxy, req, services)
}

func (configgen *ConfigGeneratorImpl) buildClusters(proxy *model.Proxy, req *model.PushRequest,
	services []*model.Service,
) ([]*discovery.Resource, model.XdsLogDetails) {
	clusters := make([]*cluster.Cluster, 0)
	resources := model.Resources{}
	envoyFilterPatches := req.Push.EnvoyFilters(proxy)
	// Create the cluster builder
	cb := NewClusterBuilder(proxy, req, configgen.Cache)
	instances := proxy.ServiceInstances
	cacheStats := cacheStats{}
	switch proxy.Type {
	// Build clusters for a sidecar proxy
	case model.SidecarProxy:
		// Setup outbound clusters
		outboundPatcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_OUTBOUND}
		// Build outbound clusters
		ob, cs := configgen.buildOutboundClusters(cb, proxy, outboundPatcher, services)
		cacheStats = cacheStats.merge(cs)
		resources = append(resources, ob...)
		// Add a blackhole and passthrough cluster for catching traffic to unresolved routes
		// BlackHole and Passthrough clusters catch traffic that matches no known route
		clusters = outboundPatcher.conditionallyAppend(clusters, nil, cb.buildBlackHoleCluster(), cb.buildDefaultPassthroughCluster())
		clusters = append(clusters, outboundPatcher.insertedClusters()...)
		// Setup inbound clusters
		inboundPatcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_INBOUND}
		// Build inbound clusters
		clusters = append(clusters, configgen.buildInboundClusters(cb, proxy, instances, inboundPatcher)...)
		if proxy.EnableHBONE() {
			clusters = append(clusters, configgen.buildInboundHBONEClusters())
		}
		// Pass through clusters for inbound traffic. These cluster bind loopback-ish src address to access node local service.
		// Inbound passthrough clusters as the default route for inbound traffic
		clusters = inboundPatcher.conditionallyAppend(clusters, nil, cb.buildInboundPassthroughClusters()...)
		clusters = append(clusters, inboundPatcher.insertedClusters()...)
	case model.Waypoint:
		svcs := findWaypointServices(proxy, req.Push)
		// Waypoint proxies do not need outbound clusters in most cases, unless we have a route pointing to something
		outboundPatcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_OUTBOUND}
		ob, cs := configgen.buildOutboundClusters(cb, proxy, outboundPatcher, filterWaypointOutboundServices(req.Push.ServicesAttachedToMesh(), svcs, services))
		cacheStats = cacheStats.merge(cs)
		resources = append(resources, ob...)
		// Setup inbound clusters
		inboundPatcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_INBOUND}
		clusters = append(clusters, configgen.buildWaypointInboundClusters(cb, proxy, req.Push, svcs)...)
		clusters = append(clusters, inboundPatcher.insertedClusters()...)
	default: // Gateways
		patcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_GATEWAY}
		ob, cs := configgen.buildOutboundClusters(cb, proxy, patcher, services)
		cacheStats = cacheStats.merge(cs)
		resources = append(resources, ob...)
		// Gateways do not require the default passthrough cluster as they do not have original dst listeners.
		clusters = patcher.conditionallyAppend(clusters, nil, cb.buildBlackHoleCluster())
		if proxy.Type == model.Router && proxy.MergedGateway != nil && proxy.MergedGateway.ContainsAutoPassthroughGateways {
			clusters = append(clusters, configgen.buildOutboundSniDnatClusters(proxy, req, patcher)...)
		}
		clusters = append(clusters, patcher.insertedClusters()...)
	}
	// OutboundTunnel cluster is needed for sidecar and gateway.
	if proxy.EnableHBONE() {
		clusters = append(clusters, cb.buildConnectOriginate(proxy, req.Push, nil))
	}
	// if credential socket exists, create a cluster for it
	if proxy.Metadata != nil && proxy.Metadata.Raw[security.CredentialMetaDataName] == "true" {
		clusters = append(clusters, cb.buildExternalSDSCluster(security.CredentialNameSocketPath))
	}
	for _, c := range clusters {
		resources = append(resources, &discovery.Resource{Name: c.Name, Resource: protoconv.MessageToAny(c)})
	}
	resources = cb.normalizeClusters(resources)
	if cacheStats.empty() {
		return resources, model.DefaultXdsLogDetails
	}
	return resources, model.XdsLogDetails{AdditionalInfo: fmt.Sprintf("cached:%v/%v", cacheStats.hits, cacheStats.hits+cacheStats.miss)}
}
7) Responding to xDS requests initiated by Envoy
StreamAggregatedResources() (through Stream()) starts a receive goroutine that reads xDS requests from Envoy:
// pilot/pkg/xds/ads.go
func (s *DiscoveryServer) receive(con *Connection, identities []string) {
	defer func() {
		close(con.errorChan)
		close(con.reqChan)
		// Close the initialized channel, if its not already closed, to prevent blocking the stream.
		select {
		case <-con.initialized:
		default:
			close(con.initialized)
		}
	}()
	firstRequest := true
	for {
		// Receive a DiscoveryRequest
		req, err := con.stream.Recv()
		if err != nil {
			if istiogrpc.IsExpectedGRPCError(err) {
				log.Infof("ADS: %q %s terminated", con.peerAddr, con.conID)
				return
			}
			con.errorChan <- err
			log.Errorf("ADS: %q %s terminated with error: %v", con.peerAddr, con.conID, err)
			totalXDSInternalErrors.Increment()
			return
		}
		// This should be only set for the first request. The node id may not be set - for example malicious clients.
		if firstRequest {
			// probe happens before envoy sends first xDS request
			if req.TypeUrl == v3.HealthInfoType {
				log.Warnf("ADS: %q %s send health check probe before normal xDS request", con.peerAddr, con.conID)
				continue
			}
			firstRequest = false
			if req.Node == nil || req.Node.Id == "" {
				con.errorChan <- status.New(codes.InvalidArgument, "missing node information").Err()
				return
			}
			if err := s.initConnection(req.Node, con, identities); err != nil {
				con.errorChan <- err
				return
			}
			defer s.closeConnection(con)
			log.Infof("ADS: new connection for node:%s", con.conID)
		}
		select {
		// Put the DiscoveryRequest on reqChan
		case con.reqChan <- req:
		case <-con.stream.Context().Done():
			log.Infof("ADS: %q %s terminated with stream closed", con.peerAddr, con.conID)
			return
		}
	}
}
receive() reads each DiscoveryRequest from Envoy and puts it on reqChan.
StreamAggregatedResources() takes each DiscoveryRequest off reqChan and calls processRequest():
// pilot/pkg/xds/ads.go
func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *Connection) error {
	stype := v3.GetShortType(req.TypeUrl)
	log.Debugf("ADS:%s: REQ %s resources:%d nonce:%s version:%s ", stype,
		con.conID, len(req.ResourceNames), req.ResponseNonce, req.VersionInfo)
	if req.TypeUrl == v3.HealthInfoType {
		s.handleWorkloadHealthcheck(con.proxy, req)
		return nil
	}
	// For now, don't let xDS piggyback debug requests start watchers.
	if strings.HasPrefix(req.TypeUrl, v3.DebugType) {
		return s.pushXds(con,
			&model.WatchedResource{TypeUrl: req.TypeUrl, ResourceNames: req.ResourceNames},
			&model.PushRequest{Full: true, Push: con.proxy.LastPushContext})
	}
	if s.StatusReporter != nil {
		s.StatusReporter.RegisterEvent(con.conID, req.TypeUrl, req.ResponseNonce)
	}
	shouldRespond, delta := s.shouldRespond(con, req)
	if !shouldRespond {
		return nil
	}
	request := &model.PushRequest{
		Full:   true,
		Push:   con.proxy.LastPushContext,
		Reason: []model.TriggerReason{model.ProxyRequest},
		// The usage of LastPushTime (rather than time.Now()), is critical here for correctness; This time
		// is used by the XDS cache to determine if a entry is stale. If we use Now() with an old push context,
		// we may end up overriding active cache entries with stale ones.
		Start: con.proxy.LastPushTime,
		Delta: delta,
	}
	// SidecarScope for the proxy may not have been updated based on this pushContext.
	// It can happen when `processRequest` comes after push context has been updated(s.initPushContext),
	// but proxy's SidecarScope has been updated(s.computeProxyState -> SetSidecarScope) due to optimizations that skip sidecar scope
	// computation.
	if con.proxy.SidecarScope != nil && con.proxy.SidecarScope.Version != request.Push.PushVersion {
		s.computeProxyState(con.proxy, request)
	}
	// Push the xDS configuration
	return s.pushXds(con, con.Watched(req.TypeUrl), request)
}
processRequest() pushes xDS configuration in response to the DiscoveryRequest.
8) Summary
Sequence for pushing xDS to Envoy after a configuration change:
Sequence for responding to an xDS request initiated by Envoy: