Go gRPC Client-Side Source Code Analysis (Part 2)



  • DialContext
    • parseTargetAndFindResolver
      • getResolver
    • newCCResolverWrapper
      • ccResolverWrapper.UpdateState
        • cc.maybeApplyDefaultServiceConfig
        • ccBalancerWrapper.updateClientConnState

The previous article analyzed the main struct members of ClientConn; this one walks through the corresponding implementation, the DialContext method.
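
Before diving into the source, here is a minimal sketch of how DialContext is typically called from application code. The target localhost:8002 matches the example used throughout this article; the insecure transport credentials and the 3-second timeout are assumptions for the sketch, not something the analyzed source requires.

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// Non-blocking by default: DialContext returns immediately and the
	// connection is established in the background (see the doc comment below).
	conn, err := grpc.DialContext(ctx, "localhost:8002",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		log.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()
	// conn can now be passed to a generated client constructor, e.g. pb.NewGreeterClient(conn)
	// (pb.NewGreeterClient is just an illustrative name here).
}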

DialContext

// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	// Initialize the context
	cc.ctx, cc.cancel = context.WithCancel(context.Background())
   
	// Apply the dial options passed in by the caller
	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}
	// Chain any configured unary/stream client interceptors
	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	defer func() {
		if err != nil {
			cc.Close()
		}
	}() 

	// (TLS / credentials handling has been trimmed here)

	// Append to (or set) cc.dopts.copts.UserAgent; grpcUA is "grpc-go/1.45.0"
   if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}


	// Find the resolver builder to use
	resolverBuilder, err := cc.parseTargetAndFindResolver()
	if err != nil {
		return nil, err
	}
	// Determine the authority; in this example it is localhost:8002
	cc.authority, err = determineAuthority(cc.parsedTarget.Endpoint, cc.target, cc.dopts)
	if err != nil {
		return nil, err
	}
	
	// Initialize the balancer build options
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		Authority:        cc.authority,
		CustomUserAgent:  cc.dopts.copts.UserAgent,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Wrap the resolverBuilder in a ccResolverWrapper and build the resolver
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()

	return cc, nil
}

The code above has been trimmed heavily by removing logic that is not relevant here. The two methods that matter are the following:

  • parseTargetAndFindResolver
  • newCCResolverWrapper

parseTargetAndFindResolver

First, the implementation:

func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
	channelz.Infof(logger, cc.channelzID, "original dial target is: %q", cc.target)
 
	// Parse the dial target
	var rb resolver.Builder
	parsedTarget, err := parseTarget(cc.target)
	if err != nil {
		channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err)
	} else {
		// Look up the resolver builder by scheme; if one exists, return it directly
		rb = cc.getResolver(parsedTarget.Scheme)
		if rb != nil {
			cc.parsedTarget = parsedTarget
			return rb, nil
		}
	}
 
	// No resolver matched the scheme, so fall back to the default scheme,
	// which is passthrough
	defScheme := resolver.GetDefaultScheme()
	channelz.Infof(logger, cc.channelzID, "fallback to scheme %q", defScheme)
	
	// Here canonicalTarget becomes passthrough:///localhost:8002
	canonicalTarget := defScheme + ":///" + cc.target

	// Parse the canonical target
	parsedTarget, err = parseTarget(canonicalTarget)
	if err != nil {
		return nil, err
	}
	// What comes back is the builder registered for passthrough
	rb = cc.getResolver(parsedTarget.Scheme)

	if rb == nil {
		return nil, fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.Scheme)
	}
	// Record the parsed target on the ClientConn
	cc.parsedTarget = parsedTarget
	return rb, nil
}

Next, look at parseTarget: it takes a target of the form scheme://host:port and parses it into the Target struct, which was covered in the previous article:

type Target struct {
	// Deprecated: use URL.Scheme instead.
	Scheme string
	// Deprecated: use URL.Host instead.
	Authority string
	// Deprecated: use URL.Path or URL.Opaque instead. The latter is set when
	// the former is empty.
	Endpoint string
	// URL contains the parsed dial target with an optional default scheme added
	// to it if the original dial target contained no scheme or contained an
	// unregistered scheme. Any query params specified in the original dial
	// target can be accessed from here.
	URL url.URL
}
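
To make the mapping concrete, here is a minimal sketch of how a dial target string ends up in these fields. gRPC's real parseTarget is likewise built on url.Parse, but the helper below is an illustrative assumption, not the library's code.

package main

import (
	"fmt"
	"net/url"
	"strings"
)

// parseTargetSketch mimics the scheme/authority/endpoint split described above.
func parseTargetSketch(target string) (scheme, authority, endpoint string, err error) {
	u, err := url.Parse(target)
	if err != nil {
		return "", "", "", err
	}
	// The endpoint is the path with its leading "/" stripped, or the opaque
	// part when the path is empty.
	endpoint = u.Path
	if endpoint == "" {
		endpoint = u.Opaque
	}
	endpoint = strings.TrimPrefix(endpoint, "/")
	return u.Scheme, u.Host, endpoint, nil
}

func main() {
	// "passthrough:///localhost:8002" -> scheme "passthrough", authority "", endpoint "localhost:8002"
	fmt.Println(parseTargetSketch("passthrough:///localhost:8002"))
}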

getResolver

getResolver is what looks up the corresponding Builder. Here is the method:

func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
	for _, rb := range cc.dopts.resolvers {
		if scheme == rb.Scheme() {
			return rb
		}
	}
	return resolver.Get(scheme)
}

The Builder interface is defined as:

// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
	// Build creates a new resolver for the given target.
	//
	// gRPC dial calls Build synchronously, and fails if the returned error is
	// not nil.
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	// Scheme returns the scheme supported by this resolver.
	// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
	Scheme() string
}

The ClientConn here is an interface (resolver.ClientConn), defined as:

// ClientConn contains the callbacks for resolver to notify any updates
// to the gRPC ClientConn.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {
	// UpdateState updates the state of the ClientConn appropriately.
	UpdateState(State) error
	// ReportError notifies the ClientConn that the Resolver encountered an
	// error.  The ClientConn will notify the load balancer and begin calling
	// ResolveNow on the Resolver with exponential backoff.
	ReportError(error)
	// NewAddress is called by resolver to notify ClientConn a new list
	// of resolved addresses.
	// The address list should be the complete list of resolved addresses.
	//
	// Deprecated: Use UpdateState instead.
	NewAddress(addresses []Address)
	// NewServiceConfig is called by resolver to notify ClientConn a new
	// service config. The service config should be provided as a json string.
	//
	// Deprecated: Use UpdateState instead.
	NewServiceConfig(serviceConfig string)
	// ParseServiceConfig parses the provided service config and returns an
	// object that provides the parsed config.
	ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult
}

The Resolver interface, mentioned earlier, is:


// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
	// ResolveNow will be called by gRPC to try to resolve the target name
	// again. It's just a hint, resolver can ignore this if it's not necessary.
	//
	// It could be called multiple times concurrently.
	ResolveNow(ResolveNowOptions)
	// Close closes the resolver.
	Close()
}

Resolver builders are registered through Register:

func Register(b Builder) {
	m[b.Scheme()] = b
}
var (
	// m is a map from scheme to resolver builder.
	m = make(map[string]Builder)
	// defaultScheme is the default scheme to use.
	defaultScheme = "passthrough"
)
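
A side note before continuing: defaultScheme can be overridden globally with resolver.SetDefaultScheme. A minimal sketch follows; this must run before any connection is dialed, and it is an application-level choice rather than something the dial path itself does.

package clientsetup // hypothetical package name for this sketch

import "google.golang.org/grpc/resolver"

func init() {
	// Make schemeless targets such as "localhost:8002" fall back to the dns
	// resolver instead of passthrough.
	resolver.SetDefaultScheme("dns")
}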

As shown above, the default scheme is passthrough. Now let's see where the passthrough resolver gets registered.

const scheme = "passthrough"

type passthroughBuilder struct{}

func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	r := &passthroughResolver{
		target: target,
		cc:     cc,
	}
	r.start()
	return r, nil
}

func (*passthroughBuilder) Scheme() string {
	return scheme
}

type passthroughResolver struct {
	target resolver.Target
	cc     resolver.ClientConn
}

func (r *passthroughResolver) start() {
	r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
}

func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOptions) {}

func (*passthroughResolver) Close() {}

func init() {
	resolver.Register(&passthroughBuilder{})
}

As you can see, the registration happens in the package's init function, which registers the passthroughBuilder.

That completes parseTargetAndFindResolver: it returns the passthroughBuilder struct. Its Build method is covered below.
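
For comparison, registering a custom resolver follows exactly the same pattern. The sketch below is a hypothetical static resolver: the scheme name "static", the staticBuilder/staticResolver types, and the hard-coded address are all assumptions for illustration.

package staticresolver // hypothetical package

import "google.golang.org/grpc/resolver"

const staticScheme = "static" // hypothetical scheme name

type staticBuilder struct {
	addrs []string
}

func (b *staticBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	r := &staticResolver{cc: cc, addrs: b.addrs}
	r.start()
	return r, nil
}

func (b *staticBuilder) Scheme() string { return staticScheme }

type staticResolver struct {
	cc    resolver.ClientConn
	addrs []string
}

func (r *staticResolver) start() {
	// Push the fixed address list to the ClientConn, just like passthroughResolver.start does.
	addresses := make([]resolver.Address, 0, len(r.addrs))
	for _, a := range r.addrs {
		addresses = append(addresses, resolver.Address{Addr: a})
	}
	r.cc.UpdateState(resolver.State{Addresses: addresses})
}

func (*staticResolver) ResolveNow(resolver.ResolveNowOptions) {}

func (*staticResolver) Close() {}

func init() {
	resolver.Register(&staticBuilder{addrs: []string{"localhost:8002"}})
}

Alternatively, a builder can be supplied per connection with the grpc.WithResolvers dial option; builders passed that way end up in cc.dopts.resolvers, which is exactly what getResolver above checks before falling back to the global registry.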

newCCResolverWrapper

This method takes two parameters: the first is cc, the ClientConn initialized at the beginning of DialContext; the second is the resolverBuilder, i.e. the passthroughBuilder. Here is its implementation:

// newCCResolverWrapper uses the resolver.Builder to build a Resolver and
// returns a ccResolverWrapper object which wraps the newly built resolver.
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
	// Initialize the ccResolverWrapper
	ccr := &ccResolverWrapper{
		cc:   cc,
		done: grpcsync.NewEvent(),
	}

	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	rbo := resolver.BuildOptions{
		DisableServiceConfig: cc.dopts.disableServiceConfig,
		DialCreds:            credsClone,
		CredsBundle:          cc.dopts.copts.CredsBundle,
		Dialer:               cc.dopts.copts.Dialer,
	}

	var err error
	// We need to hold the lock here while we assign to the ccr.resolver field
	// to guard against a data race caused by the following code path,
	// rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up
	// accessing ccr.resolver which is being assigned here.
	ccr.resolverMu.Lock()
	defer ccr.resolverMu.Unlock()
	// Call Build on the resolver.Builder that was passed in
	ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
	if err != nil {
		return nil, err
	}
	return ccr, nil
}

The logic here is to initialize a ccResolverWrapper, call the resolver.Builder's Build method to obtain a resolver, store it in the wrapper, and return the wrapper.
The rb here is the passthroughBuilder, so look at its Build method.
From the source shown earlier, passthroughBuilder.Build simply constructs a passthroughResolver and calls start, which invokes UpdateState on the ccr that was passed in, with a resolver.State as the argument.


func (r *passthroughResolver) start() {
	r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
}

Since the cc field here is the ccr, i.e. the ccResolverWrapper, next look at ccResolverWrapper's UpdateState method.

ccResolverWrapper.UpdateState

First, the source:

func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
	ccr.incomingMu.Lock()
	defer ccr.incomingMu.Unlock()
 
	ccr.curState = s
	if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
		return balancer.ErrBadResolverState
	}
	return nil
}

ccr.cc is the ClientConn that was initialized at the start of DialContext, so now look at ClientConn's updateResolverState method.

func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
	defer cc.firstResolveEvent.Fire()
	cc.mu.Lock()
	// Check if the ClientConn is already closed. Some fields (e.g.
	// balancerWrapper) are set to nil when closing the ClientConn, and could
	// cause nil pointer panic if we don't have this check.
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil
	}

	// (the handling of a non-nil err has been trimmed here)

	var ret error
	if cc.dopts.disableServiceConfig {
		channelz.Infof(logger, cc.channelzID, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig)
		cc.maybeApplyDefaultServiceConfig(s.Addresses)
	} else if s.ServiceConfig == nil {
		cc.maybeApplyDefaultServiceConfig(s.Addresses)
		// TODO: do we need to apply a failing LB policy if there is no
		// default, per the error handling design?
	} else {
		// The handling of a non-nil ServiceConfig has been trimmed here. In
		// short: a valid config is applied via cc.applyServiceConfigAndBalancer,
		// while an invalid one falls back to an error picker and TransientFailure:
		//   cc.blockingpicker.updatePicker(base.NewErrPicker(err))
		//   cc.csMgr.updateState(connectivity.TransientFailure)
		//   cc.mu.Unlock()
		//   return ret
	}

	var balCfg serviceconfig.LoadBalancingConfig
	if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
		balCfg = cc.sc.lbConfig.cfg
	}

	cbn := cc.curBalancerName
	bw := cc.balancerWrapper
	cc.mu.Unlock()
	if cbn != grpclbName {
		// Filter any grpclb addresses since we don't have the grpclb balancer.
		for i := 0; i < len(s.Addresses); {
			if s.Addresses[i].Type == resolver.GRPCLB {
				copy(s.Addresses[i:], s.Addresses[i+1:])
				s.Addresses = s.Addresses[:len(s.Addresses)-1]
				continue
			}
			i++
		}
	}
	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	if ret == nil {
		ret = uccsErr // prefer ErrBadResolver state since any other error is
		// currently meaningless to the caller.
	}
	return ret
}

Although there is a lot of code above, it essentially boils down to two calls.
The first, taken when s.ServiceConfig == nil, is cc.maybeApplyDefaultServiceConfig(s.Addresses).
The second is:
bw := cc.balancerWrapper
uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})

Let's look at these two in turn.

cc.maybeApplyDefaultServiceConfig

The addresses here are the localhost:8002 passed in at dial time.
maybeApplyDefaultServiceConfig picks the current service config (or the configured default, or an empty one) and then delegates to applyServiceConfigAndBalancer, whose implementation is:

func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
	if sc == nil {
		// should never reach here.
		return
	}
	cc.sc = sc
 
	// Decide which balancer to use
	if cc.dopts.balancerBuilder == nil {
		// Only look at balancer types and switch balancer if balancer dial
		// option is not set.
		var newBalancerName string
		if cc.sc != nil && cc.sc.lbConfig != nil {
			newBalancerName = cc.sc.lbConfig.name
		} else {
			var isGRPCLB bool
			for _, a := range addrs {
				if a.Type == resolver.GRPCLB {
					isGRPCLB = true
					break
				}
			}
			if isGRPCLB {
				newBalancerName = grpclbName
			} else if cc.sc != nil && cc.sc.LB != nil {
				newBalancerName = *cc.sc.LB
			} else {
				newBalancerName = PickFirstBalancerName
			}
		}
		// Switch to the chosen balancer (pick_first in this example)
		cc.switchBalancer(newBalancerName)
	} else if cc.balancerWrapper == nil {
		// Balancer dial option was set, and this is the first time handling
		// resolved addresses. Build a balancer with dopts.balancerBuilder.
		cc.curBalancerName = cc.dopts.balancerBuilder.Name()
		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
	}
}

Since cc.dopts.balancerBuilder is nil here, the else branch runs and newBalancerName ends up as PickFirstBalancerName, i.e. pick_first.
So in this scenario the method boils down to cc.switchBalancer("pick_first").

// Caller must hold cc.mu.
func (cc *ClientConn) switchBalancer(name string) {
	if strings.EqualFold(cc.curBalancerName, name) {
		return
	}

	channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
	if cc.dopts.balancerBuilder != nil {
		channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
		return
	}
	if cc.balancerWrapper != nil {
		// Don't hold cc.mu while closing the balancers. The balancers may call
		// methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex
		// would cause a deadlock in that case.
		cc.mu.Unlock()
		cc.balancerWrapper.close()
		cc.mu.Lock()
	}

	builder := balancer.Get(name)
	if builder == nil {
		channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
		channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
		builder = newPickfirstBuilder()
	} else {
		channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)
	}

	cc.curBalancerName = builder.Name()
	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
}
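
Which name ends up in newBalancerName is normally driven by the service config. As a minimal sketch, an application could select round_robin instead of the default pick_first like this (the service-config JSON, the dns target, and the insecure credentials are illustrative assumptions):

package dialexample // hypothetical package name for this sketch

import (
	"context"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func dialWithRoundRobin() (*grpc.ClientConn, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	// With this default service config, cc.sc.lbConfig.name becomes
	// "round_robin", so switchBalancer builds the round_robin balancer
	// instead of falling back to PickFirstBalancerName.
	return grpc.DialContext(ctx, "dns:///localhost:8002",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
	)
}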

Now look at the pick_first builder:

// PickFirstBalancerName is the name of the pick_first balancer.
const PickFirstBalancerName = "pick_first"

func newPickfirstBuilder() balancer.Builder {
	return &pickfirstBuilder{}
}

type pickfirstBuilder struct{}

func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
	return &pickfirstBalancer{cc: cc}
}

Next, the newCCBalancerWrapper method:

func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
	ccb := &ccBalancerWrapper{
		cc:       cc,
		updateCh: buffer.NewUnbounded(),
		closed:   grpcsync.NewEvent(),
		done:     grpcsync.NewEvent(),
		subConns: make(map[*acBalancerWrapper]struct{}),
	}
	go ccb.watcher()
	ccb.balancer = b.Build(ccb, bopts)
	_, ccb.hasExitIdle = ccb.balancer.(balancer.ExitIdler)
	return ccb
}

This initializes the ccBalancerWrapper struct, starts the watcher goroutine, and calls Build to instantiate the balancer.
The Build here is pickfirstBuilder.Build, which simply returns a pickfirstBalancer struct:

type pickfirstBalancer struct {
	state connectivity.State
	cc    balancer.ClientConn
	sc    balancer.SubConn
}

Then look at the ccb.watcher method:

// watcher balancer functions sequentially, so the balancer can be implemented
// lock-free.
func (ccb *ccBalancerWrapper) watcher() {
	for {
		select {
		case t := <-ccb.updateCh.Get():
			ccb.updateCh.Load()
			if ccb.closed.HasFired() {
				break
			}
			switch u := t.(type) {
			case *scStateUpdate:
				ccb.balancerMu.Lock()
				ccb.balancer.UpdateSubConnState(u.sc, balancer.SubConnState{ConnectivityState: u.state, ConnectionError: u.err})
				ccb.balancerMu.Unlock()
			case *acBalancerWrapper:
				ccb.mu.Lock()
				if ccb.subConns != nil {
					delete(ccb.subConns, u)
					ccb.cc.removeAddrConn(u.getAddrConn(), errConnDrain)
				}
				ccb.mu.Unlock()
			case exitIdle:
				if ccb.cc.GetState() == connectivity.Idle {
					if ei, ok := ccb.balancer.(balancer.ExitIdler); ok {
						// We already checked that the balancer implements
						// ExitIdle before pushing the event to updateCh, but
						// check conditionally again as defensive programming.
						ccb.balancerMu.Lock()
						ei.ExitIdle()
						ccb.balancerMu.Unlock()
					}
				}
			default:
				logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t)
			}
		case <-ccb.closed.Done():
		}

		if ccb.closed.HasFired() {
			ccb.balancerMu.Lock()
			ccb.balancer.Close()
			ccb.balancerMu.Unlock()
			ccb.mu.Lock()
			scs := ccb.subConns
			ccb.subConns = nil
			ccb.mu.Unlock()
			ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil})
			ccb.done.Fire()
			// Fire done before removing the addr conns.  We can safely unblock
			// ccb.close and allow the removeAddrConns to happen
			// asynchronously.
			for acbw := range scs {
				ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
			}
			return
		}
	}
}

The key case here is scStateUpdate: whenever a SubConn's connectivity state changes, the wrapper calls the balancer's UpdateSubConnState method. For pick_first the implementation is:

func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
	if logger.V(2) {
		logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", sc, s)
	}
	if b.sc != sc {
		if logger.V(2) {
			logger.Infof("pickfirstBalancer: ignored state change because sc is not recognized")
		}
		return
	}
	b.state = s.ConnectivityState
	if s.ConnectivityState == connectivity.Shutdown {
		b.sc = nil
		return
	}

	switch s.ConnectivityState {
	case connectivity.Ready:
		b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{result: balancer.PickResult{SubConn: sc}}})
	case connectivity.Connecting:
		b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable}})
	case connectivity.Idle:
		b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &idlePicker{sc: sc}})
	case connectivity.TransientFailure:
		b.cc.UpdateState(balancer.State{
			ConnectivityState: s.ConnectivityState,
			Picker:            &picker{err: s.ConnectionError},
		})
	}
}
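
The picker and idlePicker values passed to UpdateState above are small balancer.Picker implementations. For reference, here is a minimal sketch of what pick_first's "ready" picker amounts to; the type and field names below are assumptions for illustration, not the library's unexported types.

package pickersketch // hypothetical package name for this sketch

import "google.golang.org/grpc/balancer"

// singleSubConnPicker always returns the same SubConn (or a fixed error),
// which is essentially what pick_first's picker does once a connection is READY.
type singleSubConnPicker struct {
	result balancer.PickResult
	err    error
}

func (p *singleSubConnPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	if p.err != nil {
		return balancer.PickResult{}, p.err
	}
	return p.result, nil
}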

Note that the cc here is the ccBalancerWrapper, so this ends up calling ccBalancerWrapper's UpdateState method:

func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
	ccb.mu.Lock()
	defer ccb.mu.Unlock()
	if ccb.subConns == nil {
		return
	}
	// Update picker before updating state.  Even though the ordering here does
	// not matter, it can lead to multiple calls of Pick in the common start-up
	// case where we wait for ready and then perform an RPC.  If the picker is
	// updated later, we could call the "connecting" picker when the state is
	// updated, and then call the "ready" picker after the picker gets updated.

	ccb.cc.blockingpicker.updatePicker(s.Picker)
	ccb.cc.csMgr.updateState(s.ConnectivityState)
}

blockingpicker's updatePicker is:

// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
	pw.mu.Lock()
	if pw.done {
		pw.mu.Unlock()
		return
	}
	pw.picker = p
	// pw.blockingCh should never be nil.
	close(pw.blockingCh)
	pw.blockingCh = make(chan struct{})
	pw.mu.Unlock()
}

This simply swaps the picker held by the pickerWrapper and, by closing blockingCh, notifies any goroutines blocked on a pick that a new picker is available.
Then csMgr.updateState is called:

// updateState updates the connectivity.State of ClientConn.
// If there's a change it notifies goroutines waiting on state change to
// happen.
func (csm *connectivityStateManager) updateState(state connectivity.State) {
	csm.mu.Lock()
	defer csm.mu.Unlock()
	if csm.state == connectivity.Shutdown {
		return
	}
	if csm.state == state {
		return
	}
	csm.state = state
	channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
	if csm.notifyChan != nil {
		// There are other goroutines waiting on this channel.
		close(csm.notifyChan)
		csm.notifyChan = nil
	}
}

This updates the state held by the ClientConn's connectivityStateManager and, by closing notifyChan, wakes up any goroutines waiting for a state change.
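
On the application side, these transitions can be observed with ClientConn.GetState and ClientConn.WaitForStateChange (both experimental APIs). A minimal sketch, assuming conn is the *grpc.ClientConn returned by DialContext:

package statewatch // hypothetical package name for this sketch

import (
	"context"
	"log"

	"google.golang.org/grpc"
)

// logStateChanges prints every connectivity state change until ctx is done.
func logStateChanges(ctx context.Context, conn *grpc.ClientConn) {
	state := conn.GetState()
	for {
		log.Printf("connectivity state: %v", state)
		// WaitForStateChange blocks until the state differs from `state` or
		// ctx is done; it is woken up by csMgr.updateState closing notifyChan,
		// as shown above.
		if !conn.WaitForStateChange(ctx, state) {
			return // ctx expired or was cancelled
		}
		state = conn.GetState()
	}
}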

ccBalancerWrapper.updateClientConnState

With cc.applyServiceConfigAndBalancer covered, the next piece is the updateClientConnState method:

func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
	ccb.balancerMu.Lock()
	defer ccb.balancerMu.Unlock()
	return ccb.balancer.UpdateClientConnState(*ccs)
}

This calls the balancer's UpdateClientConnState; the balancer here is still pick_first. Its implementation:

func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) error {
	if len(cs.ResolverState.Addresses) == 0 {
		b.ResolverError(errors.New("produced zero addresses"))
		return balancer.ErrBadResolverState
	}
	if b.sc == nil {
		var err error
		b.sc, err = b.cc.NewSubConn(cs.ResolverState.Addresses, balancer.NewSubConnOptions{})
		if err != nil {
			if logger.V(2) {
				logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
			}
			b.state = connectivity.TransientFailure
			b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure,
				Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)},
			})
			return balancer.ErrBadResolverState
		}
		b.state = connectivity.Idle
		b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}})
		b.sc.Connect()
	} else {
		b.cc.UpdateAddresses(b.sc, cs.ResolverState.Addresses)
		b.sc.Connect()
	}
	return nil
}

The sc here is the SubConn. On the first update it is nil, so the balancer calls cc.NewSubConn, i.e. ccBalancerWrapper's NewSubConn:

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
	if len(addrs) <= 0 {
		return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
	}
	ccb.mu.Lock()
	defer ccb.mu.Unlock()
	if ccb.subConns == nil {
		return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
	}
	ac, err := ccb.cc.newAddrConn(addrs, opts)
	if err != nil {
		return nil, err
	}
	acbw := &acBalancerWrapper{ac: ac}
	acbw.ac.mu.Lock()
	ac.acbw = acbw
	acbw.ac.mu.Unlock()
	ccb.subConns[acbw] = struct{}{}
	return acbw, nil
}

The ac comes from ClientConn.newAddrConn:

// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
//
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
	ac := &addrConn{
		state:        connectivity.Idle,
		cc:           cc,
		addrs:        addrs,
		scopts:       opts,
		dopts:        cc.dopts,
		czData:       new(channelzData),
		resetBackoff: make(chan struct{}),
	}
	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
	// Track ac in cc. This needs to be done before any getTransport(...) is called.
	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil, ErrClientConnClosing
	}
	if channelz.IsOn() {
		ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
		channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
			Desc:     "Subchannel Created",
			Severity: channelz.CtInfo,
			Parent: &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
				Severity: channelz.CtInfo,
			},
		})
	}
	cc.conns[ac] = struct{}{}
	cc.mu.Unlock()
	return ac, nil
}

So NewSubConn returns an acBalancerWrapper. The balancer then calls ccBalancerWrapper's UpdateState, which was covered above and mainly does
ccb.cc.blockingpicker.updatePicker(s.Picker)
ccb.cc.csMgr.updateState(s.ConnectivityState)
After that comes Connect, which is what actually dials the backend address.

Here is acBalancerWrapper's Connect method:

func (acbw *acBalancerWrapper) Connect() {
	acbw.mu.Lock()
	defer acbw.mu.Unlock()
	go acbw.ac.connect()
}

And addrConn's connect implementation:

// connect starts creating a transport.
// It does nothing if the ac is not IDLE.
// TODO(bar) Move this to the addrConn section.
func (ac *addrConn) connect() error {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return errConnClosing
	}
	if ac.state != connectivity.Idle {
		ac.mu.Unlock()
		return nil
	}
	// Update connectivity state within the lock to prevent subsequent or
	// concurrent calls from resetting the transport more than once.
	// Update the addrConn's connectivity state
	ac.updateConnectivityState(connectivity.Connecting, nil)
	ac.mu.Unlock()
	// Create the transport to the resolved addresses
	ac.resetTransport()
	return nil
}

resetTransport calls ac.tryAllAddrs, which calls ac.createTransport, which in turn calls transport.NewClientTransport. Its implementation:

// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
	return newHTTP2Client(connectCtx, ctx, addr, opts, onPrefaceReceipt, onGoAway, onClose)
}

From here on it is HTTP/2 territory. Here is newHTTP2Client:

// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
	scheme := "http"
	ctx, cancel := context.WithCancel(ctx)
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	// gRPC, resolver, balancer etc. can specify arbitrary data in the
	// Attributes field of resolver.Address, which is shoved into connectCtx
	// and passed to the dialer and credential handshaker. This makes it possible for
	// address specific arbitrary data to reach custom dialers and credential handshakers.
	connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})

	conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
	if err != nil {
		if opts.FailOnNonTempDialError {
			return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
		}
		return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
	}
	// Any further errors will close the underlying connection
	defer func(conn net.Conn) {
		if err != nil {
			conn.Close()
		}
	}(conn)
	kp := opts.KeepaliveParams
	// Validate keepalive parameters.
	if kp.Time == 0 {
		kp.Time = defaultClientKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultClientKeepaliveTimeout
	}
	keepaliveEnabled := false
	if kp.Time != infinity {
		if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
		}
		keepaliveEnabled = true
	}
	var (
		isSecure bool
		authInfo credentials.AuthInfo
	)
	transportCreds := opts.TransportCredentials
	perRPCCreds := opts.PerRPCCredentials

	if b := opts.CredsBundle; b != nil {
		if t := b.TransportCredentials(); t != nil {
			transportCreds = t
		}
		if t := b.PerRPCCredentials(); t != nil {
			perRPCCreds = append(perRPCCreds, t)
		}
	}
	if transportCreds != nil {
		rawConn := conn
		// Pull the deadline from the connectCtx, which will be used for
		// timeouts in the authentication protocol handshake. Can ignore the
		// boolean as the deadline will return the zero value, which will make
		// the conn not timeout on I/O operations.
		deadline, _ := connectCtx.Deadline()
		rawConn.SetDeadline(deadline)
		conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, rawConn)
		rawConn.SetDeadline(time.Time{})
		if err != nil {
			return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
		}
		for _, cd := range perRPCCreds {
			if cd.RequireTransportSecurity() {
				if ci, ok := authInfo.(interface {
					GetCommonAuthInfo() credentials.CommonAuthInfo
				}); ok {
					secLevel := ci.GetCommonAuthInfo().SecurityLevel
					if secLevel != credentials.InvalidSecurityLevel && secLevel < credentials.PrivacyAndIntegrity {
						return nil, connectionErrorf(true, nil, "transport: cannot send secure credentials on an insecure connection")
					}
				}
			}
		}
		isSecure = true
		if transportCreds.Info().SecurityProtocol == "tls" {
			scheme = "https"
		}
	}
	dynamicWindow := true
	icwz := int32(initialWindowSize)
	if opts.InitialConnWindowSize >= defaultWindowSize {
		icwz = opts.InitialConnWindowSize
		dynamicWindow = false
	}
	writeBufSize := opts.WriteBufferSize
	readBufSize := opts.ReadBufferSize
	maxHeaderListSize := defaultClientMaxHeaderListSize
	if opts.MaxHeaderListSize != nil {
		maxHeaderListSize = *opts.MaxHeaderListSize
	}
	t := &http2Client{
		ctx:                   ctx,
		ctxDone:               ctx.Done(), // Cache Done chan.
		cancel:                cancel,
		userAgent:             opts.UserAgent,
		conn:                  conn,
		remoteAddr:            conn.RemoteAddr(),
		localAddr:             conn.LocalAddr(),
		authInfo:              authInfo,
		readerDone:            make(chan struct{}),
		writerDone:            make(chan struct{}),
		goAway:                make(chan struct{}),
		framer:                newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
		fc:                    &trInFlow{limit: uint32(icwz)},
		scheme:                scheme,
		activeStreams:         make(map[uint32]*Stream),
		isSecure:              isSecure,
		perRPCCreds:           perRPCCreds,
		kp:                    kp,
		statsHandler:          opts.StatsHandler,
		initialWindowSize:     initialWindowSize,
		onPrefaceReceipt:      onPrefaceReceipt,
		nextID:                1,
		maxConcurrentStreams:  defaultMaxStreamsClient,
		streamQuota:           defaultMaxStreamsClient,
		streamsQuotaAvailable: make(chan struct{}, 1),
		czData:                new(channelzData),
		onGoAway:              onGoAway,
		onClose:               onClose,
		keepaliveEnabled:      keepaliveEnabled,
		bufferPool:            newBufferPool(),
	}

	if md, ok := addr.Metadata.(*metadata.MD); ok {
		t.md = *md
	} else if md := imetadata.Get(addr); md != nil {
		t.md = md
	}
	t.controlBuf = newControlBuffer(t.ctxDone)
	if opts.InitialWindowSize >= defaultWindowSize {
		t.initialWindowSize = opts.InitialWindowSize
		dynamicWindow = false
	}
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	if t.statsHandler != nil {
		t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{
			Client: true,
		}
		t.statsHandler.HandleConn(t.ctx, connBegin)
	}
	if channelz.IsOn() {
		t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
	}
	if t.keepaliveEnabled {
		t.kpDormancyCond = sync.NewCond(&t.mu)
		go t.keepalive()
	}
	// Start the reader goroutine for incoming message. Each transport has
	// a dedicated goroutine which reads HTTP2 frame from network. Then it
	// dispatches the frame to the corresponding stream entity.
	go t.reader()

	// Send connection preface to server.
	n, err := t.conn.Write(clientPreface)
	if err != nil {
		err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
		t.Close(err)
		return nil, err
	}
	if n != len(clientPreface) {
		err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
		t.Close(err)
		return nil, err
	}
	var ss []http2.Setting

	if t.initialWindowSize != defaultWindowSize {
		ss = append(ss, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(t.initialWindowSize),
		})
	}
	if opts.MaxHeaderListSize != nil {
		ss = append(ss, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *opts.MaxHeaderListSize,
		})
	}
	err = t.framer.fr.WriteSettings(ss...)
	if err != nil {
		err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
		t.Close(err)
		return nil, err
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
			err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
			t.Close(err)
			return nil, err
		}
	}

	t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)

	if err := t.framer.writer.Flush(); err != nil {
		return nil, err
	}
	go func() {
		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
		err := t.loopy.run()
		if err != nil {
			if logger.V(logLevel) {
				logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
			}
		}
		// Do not close the transport.  Let reader goroutine handle it since
		// there might be data in the buffers.
		t.conn.Close()
		t.controlBuf.finish()
		close(t.writerDone)
	}()
	return t, nil
}
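
As one concrete link back to the dial options, the KeepaliveParams validated near the top of newHTTP2Client come from the keepalive dial option. A minimal sketch follows; the specific durations are illustrative assumptions, not recommended values.

package keepalivesketch // hypothetical package name for this sketch

import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

// kaOption feeds opts.KeepaliveParams in newHTTP2Client; if Time/Timeout are
// left at zero they are replaced there by the client defaults.
var kaOption = grpc.WithKeepaliveParams(keepalive.ClientParameters{
	Time:                30 * time.Second, // ping the server after this much idle time
	Timeout:             10 * time.Second, // wait this long for a ping ack before closing the connection
	PermitWithoutStream: true,             // allow pings even when there are no active RPCs
})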

As you can see, the HTTP/2 setup here is much the same as what was covered before, so it is not described again.
[flow chart summarizing the dial flow omitted]
