How client-go watches the apiserver: an analysis of the HTTP/2 watch logic

2025/1/10 2:18:08

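Preface

A recent project required me to write a controller (a Kubernetes extension) that pulls data from the Kubernetes apiserver, so I used the stock client-go library. I had never looked into how client-go actually fetches data from the apiserver. Articles online and the official documentation say the data is read in chunks, yet what I actually observed was long polling over HTTP/2. I strongly recommend a Linux or macOS development machine for following along.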

1. client-go demo

The demo is essentially the official example code, the classic snippet that circulates online.

client-go v0.25.3

Kubernetes 1.25.4

	// Mind the path: Go does not expand "~", so use the resolved $HOME/.kube/config.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		log.Fatal(err)
	}
	// These two lines are only needed while capturing traffic; omit them in normal use.
	config.TLSClientConfig.CAData = nil
	config.TLSClientConfig.Insecure = true

	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatal(err)
	}

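	// Several options can be tuned here; defaultResync (the second argument) is an important one.
	factory := informers.NewSharedInformerFactoryWithOptions(clientSet, 0, informers.WithNamespace("default"))
	// Get the Pod informer. The client-go factory already provides shared informers for the
	// built-in resources, so take one from it instead of creating duplicates.
	informer := factory.Core().V1().Pods().Informer()
	informer.AddEventHandler(xxx) // event handling: xxx is a placeholder for the callback hook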

	stopper := make(chan struct{}, 1)
	go informer.Run(stopper)
	log.Println("----- list and watch pod starting...")

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	<-sigs

	close(stopper)
	log.Println("main stopped...")

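With the demo built, it can be run once the Kubernetes cluster is up. In essence, Kubernetes components learn what to do by watching the apiserver: the apiserver sends change notifications that drive each component, and that driving force is an HTTP "push". To reproduce this faithfully, I capture the traffic with a packet-capture tool. Any Deployment YAML will do and you can write your own; the one below is based on the official documentation page "Deployments | Kubernetes".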

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deployment
  labels:
    app: nginx
spec:
  replicas: 2
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:1.14.2
        ports:
        - containerPort: 80

I run quite a lot locally, so to save resources only 2 pods are deployed.

2. Capturing and analysing apiserver traffic

With the official YAML above, run

kubectl apply -f xxx.yaml

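to deploy the Deployment's pods; the Deployment creates a ReplicaSet, which in turn creates the pods.

The deployed pods are now visible. The pause container is the infrastructure container that Kubernetes adds to every pod; it holds the pod's network namespace and other shared resources. With a local Docker Desktop cluster, the Kubernetes containers can be made visible by changing a setting.

kubectl works too, of course.

2.1 Capture methods

There are many kinds of capture tools, but they boil down to two approaches: proxying and copying, typified by a VPN and tcpdump respectively. Proxying is straightforward: a proxy sits in the middle of the connection; in Istio, for example, Envoy intercepts traffic through iptables rules. tcpdump is also very common on Linux; goreplay, which I have written about before, likewise uses pcap with BPF filters.

Wireshark is a typical tool of the tcpdump kind, but because configuring certificates for it is cumbersome, I use a proxy for the capture this time. You can also capture with tcpdump and analyse the result in Wireshark. Mind the difference between TLS 1.2 and TLS 1.3: TLS 1.3 is harder to decrypt because its session keys always come from an ephemeral key exchange, whereas TLS 1.2 with RSA key exchange can be decrypted with the server's private key.

Both capture tools used here, Wireshark and Charles, have installers for all three platforms.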

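Hands-on: Wireshark

Run sudo chown -R xxx:admin /dev/bpf* (replace xxx with your user name).

Then capture on the local interface; for 127.x addresses choose lo0. Filter by port with tcp.port == 6443 (6443 is the port of the local Kubernetes apiserver; the usual default TLS port is 443).

Run kubectl delete deployment nginx-deployment

The traffic is TLS 1.3, so decrypting it is not easy. Searching around, I found an Alibaba Cloud community article saying that a proxy makes decryption comparatively easy. I tried it and it does work, although there is one problem the article does not mention. The article: "如何通过抓包来查看Kubernetes API流量" (How to inspect Kubernetes API traffic by packet capture), 阿里云开发者社区 (aliyun.com)

See also: "可能是史上最全的Kubernetes证书解析" (Possibly the most complete analysis of Kubernetes certificates) (qingwave.github.io)

Hands-on: Charles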

# Extract the client certificate
grep client-certificate-data ~/.kube/config | \
  awk '{ print $2 }' | \
  base64 --decode > client-cert.pem
# Extract the client private key
grep client-key-data ~/.kube/config | \
  awk '{ print $2 }' | \
  base64 --decode > client-key.pem
# Extract the server CA certificate
grep certificate-authority-data ~/.kube/config | \
  awk '{ print $2 }' | \
  base64 --decode > cluster-ca-cert.pem

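These commands extract the certificate material from the kubeconfig.

Import the text into Charles: the private key is the secret key, and the cert is the certificate, which contains the public key (asymmetric encryption).

Tick the HTTP/2 option, because Kubernetes uses HTTP/2.

At this point https_proxy must be set before running kubectl; otherwise kubectl still connects directly.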

export https_proxy=http://127.0.0.1:8888/

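When the proxy is no longer needed, there are two ways to remove it.

In bash, export -n stops exporting the variable; it only affects the current shell session: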

export  -n https_proxy=http://127.0.0.1:8888/

unset removes the variable, again only in the current shell environment:

unset https_proxy

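With the proxy configured, the certificate is no longer recognised; one fix is to add the Charles certificate to the system trust store.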

huahua@huahuadeMac-mini kube % kubectl get pod
Unable to connect to the server: x509: certificate signed by unknown authority

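Once traffic goes through the proxy, certificate verification fails because Charles intercepts the request. Either skip certificate verification or import the Charles certificate; to keep things simple, just skip verification.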

kubectl  --insecure-skip-tls-verify get pods -A

huahua@huahuadeMac-mini kube % kubectl --insecure-skip-tls-verify get pods
Error from server (Forbidden): pods is forbidden: User "system:anonymous" cannot list resource "pods" in API group "" in the namespace "default"

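This is simply a lack of permissions. On a local, throwaway cluster it is easy to solve: bind the cluster-admin role to the anonymous user.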

huahua@huahuadeMac-mini .kube % kubectl create clusterrolebinding test:anonymous --clusterrole=cluster-admin --user=system:anonymous

clusterrolebinding.rbac.authorization.k8s.io/test:anonymous created

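The capture now works. Through the proxy, the connection to the apiserver uses TLS 1.2; with TLS 1.3 it would be a bit more trouble.

Build the demo code from above: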

go build -o kube_listen .

Configure the proxy:

export https_proxy=http://127.0.0.1:8888/

./kube_listen  # run the program

Then run

kubectl --insecure-skip-tls-verify apply -f nginx-deployment.yaml

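Analysing the capture

The capture for the watching program is as follows. kubectl also talks to the apiserver over HTTP, so its requests can be captured as well.

At startup the program calls two APIs; for the demo's pods these are exactly the requests made by ListAndWatch.

This is the List API.

The client first requests the pods that currently exist and obtains the latest resource version (used for version control); the Watch API is called next.

When the timeout expires, a new request is issued.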

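The client then uses the new resource version to read subsequent changes. The read is a long-polling process and it runs over HTTP/2; the source code analysis leads to the same conclusion. It is not HTTP/1.1 with chunked transfer encoding: a chunked response has to be flagged with a Transfer-Encoding: chunked header (otherwise the client could not know the body is chunked), and the chunked body has its own framing that needs a dedicated parser. HTTP/2 does not use chunked transfer encoding at all; it streams the body in DATA frames.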

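Next, simulate one of the pods suddenly going down.

Two pod change messages are pushed.

Comparing the two results:

One is the message for the pod going down and the other is the message after the pod is restarted, which is Kubernetes' default scheduling behaviour at work.

After observing for a while, you will see that the watched resource version becomes too old; the client then refreshes the version and issues a new watch request.

The server checks the resource version and returns immediately if it is too old. I am not sure whether this check runs on a timer; that would require reading the apiserver source.

When the version is too old, the client performs the List request again to get the pod list at the current version, and then starts Watch.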

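3. client-go watch source code analysis

While analysing the apiserver, I fetched the apiserver source from GitHub and found that it has no main entry point; searching GitHub showed that the entry point lives in the kubernetes repository.

You can run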

kubectl --namespace="kube-system" describe pod  kube-apiserver-docker-desktop 

to see the details of the apiserver pod.

3.1 client-go Config

The first step is to read the configuration: the certificate files and related information.

clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)

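Loading takes different implementations depending on the arguments; in local mode it uses DeferredLoadingClientConfig.

The config file is loaded and decoded with the decoder that ships with Kubernetes (versioning.go).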

func LoadFromFile(filename string) (*clientcmdapi.Config, error) {
	kubeconfigBytes, err := ioutil.ReadFile(filename)
	if err != nil {
		return nil, err
	}
	config, err := Load(kubeconfigBytes)
	if err != nil {
		return nil, err
	}
	klog.V(6).Infoln("Config loaded from file: ", filename)

	// set LocationOfOrigin on every Cluster, User, and Context
	for key, obj := range config.AuthInfos {
		obj.LocationOfOrigin = filename
		config.AuthInfos[key] = obj
	}
	for key, obj := range config.Clusters {
		obj.LocationOfOrigin = filename
		config.Clusters[key] = obj
	}
	for key, obj := range config.Contexts {
		obj.LocationOfOrigin = filename
		config.Contexts[key] = obj
	}

	if config.AuthInfos == nil {
		config.AuthInfos = map[string]*clientcmdapi.AuthInfo{}
	}
	if config.Clusters == nil {
		config.Clusters = map[string]*clientcmdapi.Cluster{}
	}
	if config.Contexts == nil {
		config.Contexts = map[string]*clientcmdapi.Context{}
	}

	return config, nil
}

What is read from the file is the server URL, the TLS information, the contexts and so on.

kubernetes.NewForConfig(config)

This simply initialises the REST clients.

func NewForConfig(c *rest.Config) (*Clientset, error) {
	configShallowCopy := *c

	if configShallowCopy.UserAgent == "" {
		configShallowCopy.UserAgent = rest.DefaultKubernetesUserAgent()
	}

	// share the transport between all clients
	// As the comment above says: create one http.Client and share its transport.
	httpClient, err := rest.HTTPClientFor(&configShallowCopy)
	if err != nil {
		return nil, err
	}

	// See how it is shared.
	return NewForConfigAndClient(&configShallowCopy, httpClient)
}

Tracing into it shows many clients being built from the same httpClient.

Take the first one as an example:

// NewForConfigAndClient creates a new AdmissionregistrationV1Client for the given config and http client.
// Note the http client provided takes precedence over the configured transport values.
func NewForConfigAndClient(c *rest.Config, h *http.Client) (*AdmissionregistrationV1Client, error) {
	config := *c
	// Set defaults: URL path and so on.
	if err := setConfigDefaults(&config); err != nil {
		return nil, err
	}
	// Reuse the transport.
	client, err := rest.RESTClientForConfigAndClient(&config, h)
	if err != nil {
		return nil, err
	}
	return &AdmissionregistrationV1Client{client}, nil
}

Now look at how the client is created:

func RESTClientForConfigAndClient(config *Config, httpClient *http.Client) (*RESTClient, error) {
	if config.GroupVersion == nil {
		return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")
	}
	if config.NegotiatedSerializer == nil {
		return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
	}
	// Get the base URL from the config.
	baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
	if err != nil {
		return nil, err
	}

	// Rate limiting: defaults apply when nothing is configured. When writing a Kubernetes extension, tune these to your actual needs.
	rateLimiter := config.RateLimiter
	if rateLimiter == nil {
		qps := config.QPS
		if config.QPS == 0.0 {
			qps = DefaultQPS
		}
		burst := config.Burst
		if config.Burst == 0 {
			burst = DefaultBurst
		}
		if qps > 0 {
			rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
		}
	}

	var gv schema.GroupVersion
	if config.GroupVersion != nil {
		gv = *config.GroupVersion
	}
	clientContent := ClientContentConfig{
		AcceptContentTypes: config.AcceptContentTypes,
		ContentType:        config.ContentType,
		GroupVersion:       gv,
		Negotiator:         runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
	}
	// The same httpClient is reused because it is passed as a pointer.
	restClient, err := NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
	if err == nil && config.WarningHandler != nil {
		restClient.warningHandler = config.WarningHandler
	}
	return restClient, err
}

The other clients are created in much the same way, so they are omitted.

informers.NewSharedInformerFactoryWithOptions(clientSet, 0, informers.WithNamespace("default"))

This is the factory, which prepares for creating informers.

factory.Core().V1().Pods().Informer()

The preceding calls only prepare data; the Informer function is the one to focus on.

func (f *podInformer) Informer() cache.SharedIndexInformer {
   return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}

	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		// One of the key parameters.
		resyncPeriod = f.defaultResync
	}

	// Create the informer. Note that newFunc wires up two callbacks (function values).
	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer

	return informer
}

The callbacks are crucial: the List and Watch functions of ListWatch correspond to the requests seen in the capture.

// NewFilteredPodInformer constructs a new informer for Pod type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
			},
		},
		&corev1.Pod{},
		resyncPeriod,
		indexers,
	)
}

Now look at adding a handler:

informer.AddEventHandler(xxx) // delegates to AddEventHandlerWithResyncPeriod, where the resync period set earlier comes into play

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.stopped {
		klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
		return
	}

	// Validate the resync period.
	if resyncPeriod > 0 {
		if resyncPeriod < minimumResyncPeriod {
			klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
			resyncPeriod = minimumResyncPeriod
		}

		if resyncPeriod < s.resyncCheckPeriod {
			if s.started {
				klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
				resyncPeriod = s.resyncCheckPeriod
			} else {
				// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
				// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
				// accordingly
				s.resyncCheckPeriod = resyncPeriod
				s.processor.resyncCheckPeriodChanged(resyncPeriod)
			}
		}
	}

	// Create the new listener.
	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

	if !s.started {
		// The informer has not started yet: just register the listener.
		s.processor.addListener(listener)
		return
	}

	// in order to safely join, we have to
	// 1. stop sending add/update/delete notifications
	// 2. do a list against the store
	// 3. send synthetic "Add" events to the new handler
	// 4. unblock
	s.blockDeltas.Lock()
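	defer s.blockDeltas.Unlock() // block delta processing while joining; part of the informer architecture, used again later

	s.processor.addListener(listener) // the informer is already running: register the listener
	for _, item := range s.indexer.List() { // list the store and hand the current objects straight to the new listener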
		listener.add(addNotification{newObj: item})
	}
}

Look at the key part, addListener:

func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()
	// Append to the slices.
	p.addListenerLocked(listener)
	if p.listenersStarted { // if already started, launch the listener's run and pop; these are the two key channel-processing functions
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}

func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
	p.listeners = append(p.listeners, listener)
	p.syncingListeners = append(p.syncingListeners, listener)
}

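If the informer has already started, the cached data is flushed to the handler as events (this is how a listener can be added dynamically). Now look at the two key processing functions: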

func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne() // read the next pending notification from the buffer
			if !ok { // Nothing to pop; otherwise the value read is sent to nextCh on the next loop iteration
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh: // fed by add() above, both at informer start-up and afterwards, including the replayed List data
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd) // write to the listener's buffer
			}
		}
	}
}

func (p *processorListener) run() {
	// this call blocks until the channel is closed.  When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted.  This is usually better than the alternative of never
	// delivering again.
	stopCh := make(chan struct{})
	wait.Until(func() { 
		// This is where the handler callbacks fire; nextCh is fed by pop above.
		for next := range p.nextCh {
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}

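The analysis shows that listeners can be added both before and after the informer starts. The post-start path has been covered; now look at how the informer starts in the first place.

informer.Run(stopper) // many online tutorials write their own controller, but client-go already wraps this

client-go uses anonymous functions and function values in a great many places and then passes them around in variables, which makes the code tiring to read 😅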

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash() // handle panics

	if s.HasStarted() { // Run starts the informer, so return if it is already running to avoid a double start
		klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")
		return
	}
	// FIFO queue (as opposed to LIFO); this design is one of the keys to the informer.
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	// The key configuration: it ties the whole architecture together.
	cfg := &Config{
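		Queue:            fifo,                // the DeltaFIFO queue
		ListerWatcher:    s.listerWatcher,     // the List/Watch function interface
		ObjectType:       s.objectType,        // the watched type; Pod here, because the demo called Pods()
		FullResyncPeriod: s.resyncCheckPeriod, // resync check period
		RetryOnError:     false,
		// Resync re-synchronises the listeners; see the implementation of this function.
		ShouldResync: s.processor.shouldResync, // function value that decides whether a resync is due

		Process:           s.HandleDeltas,      // function that processes the deltas
		WatchErrorHandler: s.watchErrorHandler, // self-explanatory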
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	// Cache mutation detector.
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run) // sharedProcessor.run, which starts the run and pop functions analysed above for every listener

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}() // deferred: updates the state after the controller's Run returns
	s.controller.Run(stopCh) // the key step
}

Creating the DeltaFIFO: internally it combines a map and a slice, guarded by a lock.

// NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
// items. See also the comment on DeltaFIFO.
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
	if opts.KeyFunction == nil {
		opts.KeyFunction = MetaNamespaceKeyFunc
	}

	f := &DeltaFIFO{
		items:        map[string]Deltas{},
		queue:        []string{},
		keyFunc:      opts.KeyFunction,
		knownObjects: opts.KnownObjects,

		emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
	}
	f.cond.L = &f.lock
	return f
}

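There is also a thread-safe local store, so data can be read directly from the store as well. A common example is ingress-nginx ("Welcome - NGINX Ingress Controller", kubernetes.github.io); its source code shows how the store is used.

Next is HandleDeltas, the function that processes the queue. This is a key step, and it is why both List and Watch results end up triggering the handler.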

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	if deltas, ok := obj.(Deltas); ok {
		return processDeltas(s, s.indexer, s.transform, deltas)
	}
	return errors.New("object given as Process argument is not Deltas")
}

// Multiplexes updates in the form of a list of Deltas into a Store, and informs
// a given handler of events OnUpdate, OnAdd, OnDelete
func processDeltas(
	// Object which receives event notifications from the given deltas
	handler ResourceEventHandler,
	clientState Store,
	transformer TransformFunc,
	deltas Deltas,
) error {
	// from oldest to newest
	for _, d := range deltas {
		obj := d.Object
		if transformer != nil { // nil by default, i.e. no transformation is needed
			var err error
			obj, err = transformer(obj) // apply the user-supplied transform to the object before it is stored
			if err != nil {
				return err
			}
		}
		// Dispatch to the handler; this is why the handlers we registered take effect.
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			if old, exists, err := clientState.Get(obj); err == nil && exists {
				if err := clientState.Update(obj); err != nil { // update the state; this may be a local cache or a queue, and by default it is the local ThreadSafeStore
					return err
				}
				handler.OnUpdate(old, obj)
			} else {
				if err := clientState.Add(obj); err != nil { // same as above
					return err
				}
				handler.OnAdd(obj)
			}
		case Deleted:
			if err := clientState.Delete(obj); err != nil { // same as above
				return err
			}
			handler.OnDelete(obj)
		}
	}
	return nil
}

s.controller.Run(stopCh) // finally, this starts the real work

// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash() // handle panics
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}() // close the queue on stop
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.WatchListPageSize = c.config.WatchListPageSize
	r.clock = c.clock
	if c.config.WatchErrorHandler != nil {
		r.watchErrorHandler = c.config.WatchErrorHandler
	}

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group
	// Run ListAndWatch repeatedly.
	wg.StartWithChannel(stopCh, r.Run)
	// Pop items from the delta queue and process them; this is what happens after watching.
	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}

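Continue with r.Run. It loops, re-running ListAndWatch until stopCh is closed. r.ListAndWatch(stopCh) does the initial read and the subsequent incremental updates, so List and Watch keep repeating, which matches the repeated List and Watch URLs seen in the capture.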

// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

This is the periodic loop.

The processLoop function keeps popping items from the delta queue and processing them:

func (c *controller) processLoop() {
	for {
		// c.config.Process is converted to PopProcessFunc; it was set earlier to s.HandleDeltas.
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

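HandleDeltas was analysed above: it delivers to the handler and the store.

3.2 The ListAndWatch process

The informer's startup has now been fully analysed, but where the informer's data comes from still needs a closer look. The data source is the root of everything; without it the delta queue and the store would be pointless.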

// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)

	err := r.list(stopCh) // the List seen in the capture: reads the resource version and the item list as of now, for use by Watch
	if err != nil {
		return err
	}

	resyncerrc := make(chan error, 1)
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	go func() {
		resyncCh, cleanup := r.resyncChan()
		defer func() {
			cleanup() // Call the last one written into cleanup
		}()
		for {
			select {
			case <-resyncCh:
			case <-stopCh:
				return
			case <-cancelCh:
				return
			}
			if r.ShouldResync == nil || r.ShouldResync() {
				klog.V(4).Infof("%s: forcing resync", r.name)
				if err := r.store.Resync(); err != nil { // resync the delta queue
					resyncerrc <- err
					return
				}
			}
			cleanup()
			resyncCh, cleanup = r.resyncChan()
		}
	}()

	retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
	for {
		// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
		select {
		case <-stopCh:
			return nil
		default:
		}

		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		options := metav1.ListOptions{ // watch options: resource version and a randomised timeout
			ResourceVersion: r.LastSyncResourceVersion(),
			// We want to avoid situations of hanging watchers. Stop any watchers that do not
			// receive any events within the timeout window.
			TimeoutSeconds: &timeoutSeconds,
			// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
			// Reflector doesn't assume bookmarks are returned at all (if the server do not support
			// watch bookmarks, it will ignore this field).
			AllowWatchBookmarks: true,
		}

		// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
		start := r.clock.Now()
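		// Watch itself returns immediately; reading the response body is what blocks, until the apiserver signals the end or the timeout expires.
		w, err := r.listerWatcher.Watch(options) // the core of watch: opens the event stream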
		if err != nil {
			// If this is "connection refused" error, it means that most likely apiserver is not responsive.
			// It doesn't make sense to re-list all objects because most likely we will be able to restart
			// watch where we ended.
			// If that's the case begin exponentially backing off and resend watch request.
			// Do the same for "429" errors.
			if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
				<-r.initConnBackoffManager.Backoff().C()
				continue
			}
			return err
		}

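		// Key step: read from the channel fed by the blocking watch response body and write to the DeltaFIFO.
		// Above: watch -> blocking body read -> decoded events -> channel, until the apiserver ends the stream or the timeout expires.
		// Here: read from the channel -> write to the DeltaFIFO. This blocks while there is no data and ends on timeout or an error from the apiserver.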
		err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)
		retry.After(err)
		if err != nil {
			if err != errorStopRequested {
				switch {
				case isExpiredError(err):
					// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
					// has a semantic that it returns data at least as fresh as provided RV.
					// So first try to LIST with setting RV to resource version of last observed object.
					klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
				case apierrors.IsTooManyRequests(err):
					klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName)
					<-r.initConnBackoffManager.Backoff().C()
					continue
				case apierrors.IsInternalError(err) && retry.ShouldRetry():
					klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.expectedTypeName, err)
					continue
				default:
					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
				}
			}
			return nil // end this round; wait.BackoffUntil (analysed above) then runs ListAndWatch again
		}
	}
}

The watchHandler function:

// watchHandler watches w and sets setLastSyncResourceVersion
func watchHandler(start time.Time,
	w watch.Interface,
	store Store,
	expectedType reflect.Type,
	expectedGVK *schema.GroupVersionKind,
	name string,
	expectedTypeName string,
	setLastSyncResourceVersion func(string),
	clock clock.Clock,
	errc chan error,
	stopCh <-chan struct{},
) error {
	eventCount := 0

	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			if !ok {
				break loop
			}
			if event.Type == watch.Error {
				return apierrors.FromObject(event.Object)
			}
			if expectedType != nil {
				if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
					continue
				}
			}
			if expectedGVK != nil {
				if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))
					continue
				}
			}
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
				continue
			}
			resourceVersion := meta.GetResourceVersion()
			switch event.Type {
			case watch.Added:
				err := store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
				}
			case watch.Modified:
				err := store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
				}
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
				}
			case watch.Bookmark:
				// A `Bookmark` means watch has synced here, just update the resourceVersion
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
			}
			setLastSyncResourceVersion(resourceVersion)
			if rvu, ok := store.(ResourceVersionUpdater); ok {
				rvu.UpdateResourceVersion(resourceVersion)
			}
			eventCount++
		}
	}

	watchDuration := clock.Since(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount)
	return nil
}

The core is updating the DeltaFIFO and the resource version.

List and Watch are both triggered from here; the details follow.

3.3 The List process

r.list(stopCh)

// list simply lists all items and records a resource version obtained from the server at the moment of the call.
// the resource version can be used for further progress notification (aka. watch).
func (r *Reflector) list(stopCh <-chan struct{}) error {
	var resourceVersion string
	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

	initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name})
	defer initTrace.LogIfLong(10 * time.Second)
	var list runtime.Object
	var paginatedResult bool
	var err error
	listCh := make(chan struct{}, 1)
	panicCh := make(chan interface{}, 1)
	go func() {
		defer func() {
			if r := recover(); r != nil {
				panicCh <- r
			}
		}()
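		// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
		// list request will return the full response.
		// Note: "chunks" here means paginated list pages, not HTTP chunked transfer encoding.
		// Locally I got the full list in one response; either paging is not supported there or it has to be enabled.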
		pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
			return r.listerWatcher.List(opts) // the List callback
		}))
		switch {
		case r.WatchListPageSize != 0:
			pager.PageSize = r.WatchListPageSize
		case r.paginatedResult:
			// We got a paginated result initially. Assume this resource and server honor
			// paging requests (i.e. watch cache is probably disabled) and leave the default
			// pager size set.
		case options.ResourceVersion != "" && options.ResourceVersion != "0":
			// User didn't explicitly request pagination.
			//
			// With ResourceVersion != "", we have a possibility to list from watch cache,
			// but we do that (for ResourceVersion != "0") only if Limit is unset.
			// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
			// switch off pagination to force listing from watch cache (if enabled).
			// With the existing semantic of RV (result is at least as fresh as provided RV),
			// this is correct and doesn't lead to going back in time.
			//
			// We also don't turn off pagination for ResourceVersion="0", since watch cache
			// is ignoring Limit in that case anyway, and if watch cache is not enabled
			// we don't introduce regression.
			pager.PageSize = 0
		}

		// Invoke the callback above to obtain the items and the resourceVersion.
		list, paginatedResult, err = pager.List(context.Background(), options)
		if isExpiredError(err) || isTooLargeResourceVersionError(err) {
			r.setIsLastSyncResourceVersionUnavailable(true)
			// Retry immediately if the resource version used to list is unavailable.
			// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
			// continuation pages, but the pager might not be enabled, the full list might fail because the
			// resource version it is listing at is expired or the cache may not yet be synced to the provided
			// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
			// the reflector makes forward progress.
			list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
		}
		close(listCh) // signal completion to the waiting goroutine
	}()
	select {
	case <-stopCh:
		return nil
	case r := <-panicCh:
		panic(r)
	case <-listCh: // the channel was closed, so this no longer blocks
	}
	initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
	if err != nil {
		klog.Warningf("%s: failed to list %v: %v", r.name, r.expectedTypeName, err)
		return fmt.Errorf("failed to list %v: %w", r.expectedTypeName, err)
	}

	// We check if the list was paginated and if so set the paginatedResult based on that.
	// However, we want to do that only for the initial list (which is the only case
	// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
	// situations we may force listing directly from etcd (by setting ResourceVersion="")
	// which will return paginated result, even if watch cache is enabled. However, in
	// that case, we still want to prefer sending requests to watch cache if possible.
	//
	// Paginated result returned for request with ResourceVersion="0" mean that watch
	// cache is disabled and there are a lot of objects of a given type. In such case,
	// there is no need to prefer listing from watch cache.
	if options.ResourceVersion == "0" && paginatedResult {
		r.paginatedResult = true
	}

	// State flag: marks the list as successful, as the upstream comment states
	r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
	listMetaInterface, err := meta.ListAccessor(list)
	if err != nil {
		return fmt.Errorf("unable to understand list result %#v: %v", list, err)
	}
	// The resource version returned by List, i.e. the resource version at the moment of that request to the apiserver
	resourceVersion = listMetaInterface.GetResourceVersion()
	initTrace.Step("Resource version extracted")
	items, err := meta.ExtractList(list) // extract the list items
	if err != nil {
		return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
	}
	initTrace.Step("Objects extracted")
	// Write the items into the DeltaFIFO queue
	if err := r.syncWith(items, resourceVersion); err != nil {
		return fmt.Errorf("unable to sync list result: %v", err)
	}
	initTrace.Step("SyncWith done")
	r.setLastSyncResourceVersion(resourceVersion) // update the resource version
	initTrace.Step("Resource version updated")
	return nil
}
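
The wait at the end of the list step follows a common Go pattern: run the blocking call in a goroutine, close a channel when it finishes, and `select` on that channel together with a stop channel and a panic channel. Below is a minimal stdlib-only sketch of that pattern. The function `listInBackground` and its `list` parameter are hypothetical stand-ins for illustration and are not client-go APIs.

```go
package main

import (
	"errors"
	"fmt"
)

// listInBackground runs a blocking list call in its own goroutine and waits
// until it finishes, a stop signal arrives, or the goroutine panics.
// list is a hypothetical stand-in for pager.List.
func listInBackground(stopCh <-chan struct{}, list func() ([]string, error)) ([]string, error) {
	var (
		items []string
		err   error
	)
	listCh := make(chan struct{})
	panicCh := make(chan interface{}, 1)

	go func() {
		defer func() {
			if r := recover(); r != nil {
				panicCh <- r // forward the panic to the waiting goroutine
			}
		}()
		items, err = list()
		close(listCh) // closing the channel unblocks the select below
	}()

	select {
	case <-stopCh:
		return nil, errors.New("stopped before the list finished")
	case r := <-panicCh:
		panic(r)
	case <-listCh:
		// items and err are safe to read: the close happens after the write.
	}
	return items, err
}

func main() {
	stopCh := make(chan struct{})
	items, err := listInBackground(stopCh, func() ([]string, error) {
		return []string{"pod-a", "pod-b"}, nil
	})
	fmt.Println(items, err)
}
```

Closing the channel rather than sending a value means any number of waiters are released at once and no sender can block if nobody is listening.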

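Next, look at ListPager.List. Its comments stress that it first tries to fetch the list from the server in smaller chunks and falls back to loading the full list if that fails. Note that "chunk" here means a page of results, not HTTP chunked transfer encoding. The function also checks Limit and, when it is unset, uses the page size, which defaults to 500.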

// List returns a single list object, but attempts to retrieve smaller chunks from the
// server to reduce the impact on the server. If the chunk attempt fails, it will load
// the full list instead. The Limit field on options, if unset, will default to the page size.
func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
	if options.Limit == 0 {
		options.Limit = p.PageSize
	}
	requestedResourceVersion := options.ResourceVersion
	requestedResourceVersionMatch := options.ResourceVersionMatch
	var list *metainternalversion.List
	paginatedResult := false

	for {
		select {
		case <-ctx.Done():
			return nil, paginatedResult, ctx.Err()
		default:
		}

		// Call the callback set earlier, r.listerWatcher.List(opts), which in turn calls
		// the originally configured lw.ListFunc(options), to obtain the resource version and the list of items
		obj, err := p.PageFn(ctx, options)
		if err != nil {
			// Only fallback to full list if an "Expired" errors is returned, FullListIfExpired is true, and
			// the "Expired" error occurred in page 2 or later (since full list is intended to prevent a pager.List from
			// failing when the resource versions is established by the first page request falls out of the compaction
			// during the subsequent list requests).
			if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" {
				return nil, paginatedResult, err
			}
			// the list expired while we were processing, fall back to a full list at
			// the requested ResourceVersion.
			options.Limit = 0
			options.Continue = ""
			options.ResourceVersion = requestedResourceVersion
			options.ResourceVersionMatch = requestedResourceVersionMatch
			result, err := p.PageFn(ctx, options)
			return result, paginatedResult, err
		}
		m, err := meta.ListAccessor(obj)
		if err != nil {
			return nil, paginatedResult, fmt.Errorf("returned object must be a list: %v", err)
		}

		// exit early and return the object we got if we haven't processed any pages
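		// No continue token on the first page means it already holds the complete result, so return it directly;
		// with pagination, more pages follow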
		if len(m.GetContinue()) == 0 && list == nil {
			return obj, paginatedResult, nil
		}

		// initialize the list and fill its contents
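		if list == nil {
			list = &metainternalversion.List{Items: make([]runtime.Object, 0, options.Limit+1)}
			list.ResourceVersion = m.GetResourceVersion() // important when paginating: keep the resource version from the first page
			list.SelfLink = m.GetSelfLink()
		}
		// Pagination: a continue token means more data follows, so list is allocated above and
		// every loop iteration appends the objects of the current page until paging ends.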
		if err := meta.EachListItem(obj, func(obj runtime.Object) error {
			list.Items = append(list.Items, obj)
			return nil
		}); err != nil {
			return nil, paginatedResult, err
		}

		// if we have no more items, return the list
		if len(m.GetContinue()) == 0 { // paging is finished
			return list, paginatedResult, nil
		}

		// set the next loop up
		options.Continue = m.GetContinue() // set the continue token for the next page request
		// Clear the ResourceVersion(Match) on the subsequent List calls to avoid the
		// `specifying resource version is not allowed when using continue` error.
		// See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143.
		options.ResourceVersion = ""
		options.ResourceVersionMatch = ""
		// At this point, result is already paginated.
		paginatedResult = true // mark the result as paginated
	}
}

// List a set of apiserver resources
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
	// ListWatch is used in Reflector, which already supports pagination.
	// Don't paginate here to avoid duplication.
	return lw.ListFunc(options)
}

This brings us back to the function pointer configured at the very beginning.
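
The paging loop in ListPager.List can be reduced to a small sketch: keep calling a page function, append the items, and stop when the continue token is empty. The code below is a simplified stdlib-only model, not the client-go implementation. The `page` type, `listAll`, and `fakeServer` are hypothetical names, and the continue token being a plain start index is an assumption made only for this example (the real token is opaque).

```go
package main

import "fmt"

// page is a hypothetical, simplified list response: some items, the resource
// version of the snapshot, and a continue token that is empty on the last page.
type page struct {
	Items           []string
	ResourceVersion string
	Continue        string
}

// pageFn is a stand-in for ListPager.PageFn: it fetches one page.
type pageFn func(limit int, continueToken string) (page, error)

// listAll requests pages until the continue token is empty. It returns the
// merged items, the resource version of the first page, and whether more
// than one page was needed.
func listAll(fetch pageFn, limit int) ([]string, string, bool, error) {
	var items []string
	resourceVersion := ""
	token := ""
	paginated := false
	for {
		p, err := fetch(limit, token)
		if err != nil {
			return nil, "", paginated, err
		}
		if token == "" {
			resourceVersion = p.ResourceVersion // taken from the first page only
		}
		items = append(items, p.Items...)
		if p.Continue == "" {
			return items, resourceVersion, paginated, nil
		}
		token = p.Continue
		paginated = true
	}
}

// fakeServer serves pods from an in-memory slice. The continue token is just
// the next start index, an assumption made for this sketch.
func fakeServer(pods []string) pageFn {
	return func(limit int, continueToken string) (page, error) {
		if limit <= 0 {
			limit = len(pods) // no limit: return everything in one page
		}
		start := 0
		if continueToken != "" {
			if _, err := fmt.Sscanf(continueToken, "%d", &start); err != nil {
				return page{}, err
			}
		}
		end := start + limit
		next := ""
		if end >= len(pods) {
			end = len(pods)
		} else {
			next = fmt.Sprint(end)
		}
		return page{Items: pods[start:end], ResourceVersion: "1000", Continue: next}, nil
	}
}

func main() {
	pods := []string{"pod-0", "pod-1", "pod-2", "pod-3", "pod-4"}
	items, rv, paginated, err := listAll(fakeServer(pods), 2)
	fmt.Println(items, rv, paginated, err)
}
```

With a limit of 2 the five pods arrive in three pages; with a limit of 500 the same call would finish after one page and report that the result was not paginated.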

Next, analyze the List function:

client.CoreV1().Pods(namespace).List(context.TODO(), options)
// List takes label and field selectors, and returns the list of Pods that match those selectors.
func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
	var timeout time.Duration
	if opts.TimeoutSeconds != nil {
		timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
	}
	result = &v1.PodList{}
	err = c.client.Get().
		Namespace(c.ns).
		Resource("pods").
		VersionedParams(&opts, scheme.ParameterCodec).
		Timeout(timeout).
		Do(ctx).
		Into(result)
	return
}

func (r *Request) Do(ctx context.Context) Result {
	var result Result
	err := r.request(ctx, func(req *http.Request, resp *http.Response) {
		result = r.transformResponse(resp, req)
	})
	if err != nil {
		return Result{err: err}
	}
	if result.err == nil || len(result.body) > 0 {
		metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(result.body)))
	}
	return result
}

func (r Result) Into(obj runtime.Object) error {
	if r.err != nil {
		// Check whether the result has a Status object in the body and prefer that.
		return r.Error()
	}
	if r.decoder == nil {
		return fmt.Errorf("serializer for %s doesn't exist", r.contentType)
	}
	if len(r.body) == 0 {
		return fmt.Errorf("0-length response with status code: %d and content type: %s",
			r.statusCode, r.contentType)
	}

	out, _, err := r.decoder.Decode(r.body, nil, obj)
	if err != nil || out == obj {
		return err
	}
	// if a different object is returned, see if it is Status and avoid double decoding
	// the object.
	switch t := out.(type) {
	case *metav1.Status:
		// any status besides StatusSuccess is considered an error.
		if t.Status != metav1.StatusSuccess {
			return errors.FromObject(t)
		}
	}
	return nil
}

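There are three steps: build the request, Do, and decode the response body. The decode uses the JSON decoder by default and is a blocking call, but note that Into decodes r.body, which has already been read into memory by the time Do returns, so any waiting happens while the response is being read. During List, the call did not block on the author's local machine. This may be because chunked responses are not supported, or because the author's Kubernetes version is newer and only older versions support chunking. This accounts for the first request seen in the packet capture: it fetches a Pod list of up to 500 items together with the current resource version.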

3.4 Core watch process

Watch is triggered by the periodic task described above, which eventually invokes the WatchFunc function pointer.

Here is the source:

// Watch returns a watch.Interface that watches the requested pods.
func (c *pods) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
	var timeout time.Duration
	if opts.TimeoutSeconds != nil {
		timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
	}
	opts.Watch = true
	return c.client.Get().
		Namespace(c.ns).
		Resource("pods").
		VersionedParams(&opts, scheme.ParameterCodec).
		Timeout(timeout).
		Watch(ctx)
}

So far this is an ordinary HTTP request; the core is Request.Watch:

// Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error.
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
	// We specifically don't want to rate limit watches, so we
	// don't use r.rateLimiter here.
	if r.err != nil {
		return nil, r.err
	}

	client := r.c.Client
	if client == nil {
		client = http.DefaultClient
	}

	isErrRetryableFunc := func(request *http.Request, err error) bool {
		// The watch stream mechanism handles many common partial data errors, so closed
		// connections can be retried in many cases.
		if net.IsProbableEOF(err) || net.IsTimeout(err) {
			return true
		}
		return false
	}
	retry := r.retryFn(r.maxRetries)
	url := r.URL().String()
	for {
		if err := retry.Before(ctx, r); err != nil {
			return nil, retry.WrapPreviousError(err)
		}

		req, err := r.newHTTPRequest(ctx)
		if err != nil {
			return nil, err
		}
		// Send the HTTP request
		resp, err := client.Do(req)
		updateURLMetrics(ctx, r, resp, err)
		retry.After(ctx, r, resp, err)
		if err == nil && resp.StatusCode == http.StatusOK {
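			// The key step: in the normal case the response is handled here. This is why the watch
			// behaves like HTTP long polling: reads on the response body block until the server sends
			// data, and the events are passed on to the DeltaFIFO and the handlers through a channel.
			return r.newStreamWatcher(resp)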
		}

		// Handle error responses
		done, transformErr := func() (bool, error) {
			defer readAndCloseResponseBody(resp)

			if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
				return false, nil
			}

			if resp == nil {
				// the server must have sent us an error in 'err'
				return true, nil
			}
			if result := r.transformResponse(resp, req); result.err != nil {
				return true, result.err
			}
			return true, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
		}()
		// If error handling is done, return and wait for the next ListAndWatch;
		// otherwise send the request again
		if done {
			if isErrRetryableFunc(req, err) {
				return watch.NewEmptyWatch(), nil
			}
			if err == nil {
				// if the server sent us an HTTP Response object,
				// we need to return the error object from that.
				err = transformErr
			}
			return nil, retry.WrapPreviousError(err)
		}
	}
}

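As the capture shows, the HTTPS response is not chunked and carries no chunk marker. HTTP/2 does not use chunked transfer encoding; the body is streamed in DATA frames instead.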

newStreamWatcher
func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) {
	contentType := resp.Header.Get("Content-Type")
	mediaType, params, err := mime.ParseMediaType(contentType)
	if err != nil {
		klog.V(4).Infof("Unexpected content type from the server: %q: %v", contentType, err)
	}
	objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)
	if err != nil {
		return nil, err
	}

	handleWarnings(resp.Header, r.warningHandler)
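	// Frame handling: despite the name, these are the serializer's frames (one encoded watch event each),
	// not HTTP/2 frames. resp.Body is a stream, and reads on it block until the server sends data.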
	frameReader := framer.NewFrameReader(resp.Body)
	watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)

	// This is where the magic happens: the stream watcher starts receiving data
	return watch.NewStreamWatcher(
		restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
		// use 500 to indicate that the cause of the error is unknown - other error codes
		// are more specific to HTTP interactions, and set a reason
		errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
	), nil
}
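
To see why reading the watch response blocks, the following stdlib-only sketch decodes JSON events one at a time from a stream. It is an illustration under stated assumptions, not client-go code: `decodeStream`, `watchEvent`, and `fakeBody` are hypothetical names, and `io.Pipe` stands in for `resp.Body`, with a short sleep simulating idle time between events.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"time"
)

// watchEvent is a simplified shape of one event on the watch stream.
type watchEvent struct {
	Type   string          `json:"type"`
	Object json.RawMessage `json:"object"`
}

// decodeStream reads events from body one at a time and passes each to handle.
// Decode blocks until a complete JSON object is available or the stream ends.
func decodeStream(body io.Reader, handle func(watchEvent)) error {
	dec := json.NewDecoder(body)
	for {
		var ev watchEvent
		if err := dec.Decode(&ev); err != nil {
			if err == io.EOF {
				return nil // the server closed the stream normally
			}
			return err
		}
		handle(ev)
	}
}

// fakeBody returns a reader that behaves like the body of a watch response:
// each line arrives after a short pause and the stream ends after the last one.
func fakeBody(lines ...string) io.Reader {
	r, w := io.Pipe()
	go func() {
		defer w.Close()
		for _, line := range lines {
			time.Sleep(20 * time.Millisecond) // simulated idle time between events
			fmt.Fprintln(w, line)
		}
	}()
	return r
}

func main() {
	body := fakeBody(
		`{"type":"ADDED","object":{"name":"nginx-1"}}`,
		`{"type":"MODIFIED","object":{"name":"nginx-1"}}`,
	)
	err := decodeStream(body, func(ev watchEvent) {
		fmt.Println(ev.Type, string(ev.Object))
	})
	fmt.Println("stream ended, err =", err)
}
```

The loop never polls: each call to Decode simply waits inside the read until the other side writes, which is the behaviour the watch relies on.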

In practice, a goroutine running sw.receive() reads from the Decoder:

// NewStreamWatcher creates a StreamWatcher from the given decoder.
func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
	sw := &StreamWatcher{
		source:   d,
		reporter: r,
		// It's easy for a consumer to add buffering via an extra
		// goroutine/channel, but impossible for them to remove it,
		// so nonbuffered is better.
		result: make(chan Event),
		// If the watcher is externally stopped there is no receiver anymore
		// and the send operations on the result channel, especially the
		// error reporting might block forever.
		// Therefore a dedicated stop channel is used to resolve this blocking.
		done: make(chan struct{}),
	}
	go sw.receive()
	return sw
}

sw.receive()

// receive reads result from the decoder in a loop and sends down the result channel.
func (sw *StreamWatcher) receive() {
	defer utilruntime.HandleCrash()
	defer close(sw.result)
	defer sw.Stop()
	for {
		// The core of the loop: keep reading until EOF or until sw is done
		action, obj, err := sw.source.Decode()
		if err != nil {
			switch err {
			case io.EOF:
				// watch closed normally
			case io.ErrUnexpectedEOF:
				klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
			default:
				if net.IsProbableEOF(err) || net.IsTimeout(err) {
					klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
				} else {
					select {
					case <-sw.done:
					case sw.result <- Event{
						Type:   Error,
						Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
					}:
					}
				}
			}
			return
		}
		select {
		case <-sw.done:
			return
		case sw.result <- Event{ // the watch result
			Type:   action,
			Object: obj,
		}:
		}
	}
}

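The watchHandler called from cache.(*Reflector).ListAndWatch reads from sw's result channel and writes each event into the DeltaFIFO queue. Popping from the queue then triggers the handlers and updates the store.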

The data handled by sw.source.Decode() is usually JSON, so the JSON decoder is used. On the author's local Kubernetes, the read blocks until the api-server sends data.
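
The hand-off from the receive loop to the consumer can be modelled in a few lines. The sketch below is a simplified stdlib-only model, not the client-go types: `streamWatcher`, `applyEvents`, and `sliceDecoder` are hypothetical names, a plain map stands in for the local store, and the DeltaFIFO is left out so that events go straight from the channel to the store and the handler.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// event is a simplified watch event; the names are hypothetical.
type event struct {
	Type string // "ADDED", "MODIFIED" or "DELETED"
	Name string
}

// streamWatcher mirrors the shape of watch.StreamWatcher: a goroutine decodes
// events and sends them on an unbuffered result channel until done is closed.
type streamWatcher struct {
	result chan event
	done   chan struct{}
	once   sync.Once
}

// Stop tells the receive loop to exit; it is safe to call more than once.
func (sw *streamWatcher) Stop() { sw.once.Do(func() { close(sw.done) }) }

// newStreamWatcher starts the receive loop. decode returns false when the
// stream has ended, which plays the role of io.EOF.
func newStreamWatcher(decode func() (event, bool)) *streamWatcher {
	sw := &streamWatcher{result: make(chan event), done: make(chan struct{})}
	go func() {
		defer close(sw.result)
		for {
			ev, ok := decode()
			if !ok {
				return
			}
			select {
			case <-sw.done:
				return
			case sw.result <- ev:
			}
		}
	}()
	return sw
}

// applyEvents drains the result channel into a local store and calls handler
// for each event, loosely like watchHandler feeding the queue and the store.
func applyEvents(sw *streamWatcher, store map[string]bool, handler func(event)) {
	for ev := range sw.result {
		switch ev.Type {
		case "ADDED", "MODIFIED":
			store[ev.Name] = true
		case "DELETED":
			delete(store, ev.Name)
		}
		handler(ev)
	}
}

// sliceDecoder replays a fixed list of events; it stands in for sw.source.Decode.
func sliceDecoder(events []event) func() (event, bool) {
	i := 0
	return func() (event, bool) {
		if i >= len(events) {
			return event{}, false
		}
		ev := events[i]
		i++
		return ev, true
	}
}

func main() {
	sw := newStreamWatcher(sliceDecoder([]event{
		{Type: "ADDED", Name: "nginx-1"},
		{Type: "ADDED", Name: "nginx-2"},
		{Type: "DELETED", Name: "nginx-1"},
	}))
	defer sw.Stop()

	store := map[string]bool{}
	applyEvents(sw, store, func(ev event) { fmt.Println("handled", ev.Type, ev.Name) })

	names := make([]string, 0, len(store))
	for name := range store {
		names = append(names, name)
	}
	sort.Strings(names)
	fmt.Println("store:", names)
}
```

The separate done channel is what lets the receive loop exit even when nobody reads from the unbuffered result channel any more, which is the reason the upstream comment gives for having it.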

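Summary

The core code of client-go is not actually complicated, but the flow is long: the architecture layers several hand-offs through restClient, followed by the FIFO queue, the listener callbacks, the local store, and so on. The rough logic of the demo's Pod watch is shown in the figure below:

It looks simple, but there are many details and the implementations vary. The hardest part is following the anonymous function pointers as they are passed around; the code only makes sense when read in context.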

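References

How to view Kubernetes API traffic through packet capture, Alibaba Cloud Developer Community (aliyun.com)

Possibly the most complete analysis of Kubernetes certificates ever (qingwave.github.io)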
