Preface
For a recent project I needed to write a controller (a Kubernetes plugin) that pulls data from the Kubernetes apiserver, so I used the official client-go. I had never looked into how client-go actually fetches that data; posts online and the official docs say it reads the data in chunks, yet in practice I found it uses HTTP/2 long polling. I strongly recommend working on a Linux or macOS development machine.
1. client-go demo
The demo is basically the official sample code, the classic snippet that circulates online.
client-go v0.25.3
Kubernetes 1.25.4
config, err := clientcmd.BuildConfigFromFlags("", "~/.kube/config") // mind the kubeconfig path
if err != nil {
log.Fatal(err)
}
//these 2 lines are only used when capturing traffic; not needed in normal use
config.TLSClientConfig.CAData = nil
config.TLSClientConfig.Insecure = true
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatal(err)
}
//various options can be tuned here; defaultResync is the critical one
factory := informers.NewSharedInformerFactoryWithOptions(clientSet, 0, informers.WithNamespace("default"))
informer := factory.Core().V1().Pods().Informer() //get the pod informer; the client-go API already builds many informers, reuse them instead of creating duplicates
informer.AddEventHandler(xxx) //event handling, a callback hook (a sketch of a handler follows the code below)
stopper := make(chan struct{}, 1)
go informer.Run(stopper)
log.Println("----- list and watch pod starting...")
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
close(stopper)
log.Println("main stopped...")
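The xxx placeholder above is where the callbacks go. A minimal sketch of what an event handler could look like, assuming the additional imports "k8s.io/api/core/v1" (as corev1) and "k8s.io/client-go/tools/cache":
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		pod := obj.(*corev1.Pod)
		log.Printf("pod added: %s/%s", pod.Namespace, pod.Name)
	},
	UpdateFunc: func(oldObj, newObj interface{}) {
		log.Printf("pod updated: %s", newObj.(*corev1.Pod).Name)
	},
	DeleteFunc: func(obj interface{}) {
		// obj can be a cache.DeletedFinalStateUnknown tombstone, so type-check it
		if pod, ok := obj.(*corev1.Pod); ok {
			log.Printf("pod deleted: %s", pod.Name)
		}
	},
})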
With the demo built, it can run against a started cluster. At heart, Kubernetes "listening" means the apiserver drives the various components by pushing instructions to them, and that push is ultimately HTTP. To reproduce it faithfully, let's capture the traffic with a packet-capture tool. Write any Deployment YAML you like; the one below is taken from the official docs (Deployments | Kubernetes).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deployment
  labels:
    app: nginx
spec:
  replicas: 2
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:1.14.2
        ports:
        - containerPort: 80
Since I already run quite a lot locally, I deploy only 2 pods to save resources.
2. Capturing apiserver traffic
For the official YAML above, run
kubectl apply -f xxx.yaml
to deploy it; the Deployment creates a ReplicaSet, which in turn creates the Pods.
You can now see the deployed Pods. The pause container is Kubernetes' own container that holds the pod's network and other shared resources. With a local docker-desktop cluster you can flip a setting to make the Kubernetes system containers visible.
Of course you can also just use kubectl.
2.1 Capture methods
There are many ways to capture traffic, but broadly two kinds of implementation: proxying and copying, typified by a VPN and by tcpdump. The proxy approach is straightforward, a man in the middle; in Istio, Envoy is essentially an iptables-driven proxy. tcpdump is the usual Linux approach; goreplay, which I covered before, works the same way through libpcap and BPF.
Wireshark is the classic tool on the tcpdump side, but configuring its certificates is tedious, so this time I capture through a proxy; you could also run tcpdump and analyze the dump in Wireshark. Mind the difference between TLS 1.2 and TLS 1.3: TLS 1.3 is harder to decrypt because its session keys come from ephemeral key exchange, whereas TLS 1.2 is comparatively easy to decrypt.
Both capture tools ship installers for all three platforms.
Hands-on: Wireshark
Run sudo chown -R xxx:admin /dev/bpf* (replace xxx with your user).
Then capture on the local interface; for 127.x traffic choose lo0, and filter on the port: tcp.port == 6443 (6443 is the local Kubernetes apiserver port; the usual TLS default is 443).
Run kubectl delete deployment nginx-deployment
The connection is TLS 1.3, so decryption is not easy. Searching around, an Alibaba Cloud community post says a proxy makes it much easier; I tried it and it does work, though there is one problem the article does not mention: 如何通过抓包来查看Kubernetes API流量-阿里云开发者社区 (aliyun.com)
可能是史上最全的Kubernetes证书解析 (qingwave.github.io)
Hands-on: Charles
# extract the client certificate
grep client-certificate-data ~/.kube/config | \
awk '{ print $2 }' | \
base64 --decode > client-cert.pem
# extract the client private key
grep client-key-data ~/.kube/config | \
awk '{ print $2 }' | \
base64 --decode > client-key.pem
# extract the server-side CA certificate
grep certificate-authority-data ~/.kube/config | \
awk '{ print $2 }' | \
base64 --decode > cluster-ca-cert.pem
This pulls the certificate material out of the kubeconfig.
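Before importing them into Charles, you can sanity-check the extracted files with openssl (not part of the original workflow, just a quick verification; the last command assumes an RSA client key):
openssl x509 -in client-cert.pem -noout -subject -issuer -dates
openssl x509 -in cluster-ca-cert.pem -noout -subject -dates
openssl rsa -in client-key.pem -check -noout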
Import them into Charles: private key is the private key; cert is the certificate, which carries the public key (asymmetric encryption).
Tick the HTTP/2 option; Kubernetes talks HTTP/2.
At this point kubectl needs https_proxy configured, otherwise it still connects directly:
export https_proxy=http://127.0.0.1:8888/
When you no longer need it, remove the variable. export -n only un-exports it, and only affects the current shell:
export -n https_proxy=http://127.0.0.1:8888/
unset removes it from the current environment entirely:
unset https_proxy
With the proxy configured, the certificate is no longer trusted; you could add the Charles certificate to the system trust store.
huahua@huahuadeMac-mini kube % kubectl get pod
Unable to connect to the server: x509: certificate signed by unknown authority
Once the proxy is in place, certificate verification fails because Charles intercepts the request. Either skip verification or import the Charles certificate; to keep it simple, just skip it:
kubectl --insecure-skip-tls-verify get pods -A
huahua@huahuadeMac-mini kube % kubectl --insecure-skip-tls-verify get pods
Error from server (Forbidden): pods is forbidden: User "system:anonymous" cannot list resource "pods" in API group "" in the namespace "default"
That is simply a missing permission. Locally it is easy to fix: hand the cluster-admin role to the anonymous user.
huahua@huahuadeMac-mini .kube % kubectl create clusterrolebinding test:anonymous --clusterrole=cluster-admin --user=system:anonymous
clusterrolebinding.rbac.authorization.k8s.io/test:anonymous created
With that, the capture works; through the proxy the connection to the apiserver uses TLS 1.2. TLS 1.3 would be a bit more troublesome.
Build the demo code above:
go build -o kube_listen .
Set up the proxy:
export https_proxy=http://127.0.0.1:8888/
./kube_listen # run the program
Then run
kubectl --insecure-skip-tls-verify apply -f nginx-deployment.yaml
Analyzing the capture
The capture for the watcher program is shown below; kubectl also talks to the apiserver over HTTP, so its requests show up as well.
On startup two APIs are called; for the demo's pod informer this is exactly the result of ListAndWatch.
This one is the List API.
The client first requests the pods that exist right now and obtains the current resource version (for version control); then comes the Watch API.
When the timeout is reached, a new request is issued.
The client then reads newly changed resources using the new resource version. The read is a long poll, and over HTTP/2; the source code analysis later reaches the same conclusion. It is not HTTP/1.1 + chunked transfer: chunked encoding has to be flagged in the headers (otherwise how would the client know?), and chunked bodies use a special format that needs special parsing.
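To make that concrete, the two requests look roughly like this (paths and parameters reconstructed, not copied verbatim from the capture):
GET /api/v1/namespaces/default/pods?limit=500&resourceVersion=0                                        (List)
GET /api/v1/namespaces/default/pods?watch=true&resourceVersion=<latest>&allowWatchBookmarks=true&timeoutSeconds=<random 5-10 min>   (Watch)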
Next, simulate a pod suddenly going down.
Two pod-change messages are pushed.
Comparing the two results:
one is the message for the pod going down, the other the message after the pod is restarted, Kubernetes' default scheduling at work.
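Each pushed message is a watch event envelope, roughly of this shape (abbreviated, not the exact captured payload; type can be ADDED, MODIFIED, DELETED, BOOKMARK or ERROR):
{"type":"MODIFIED","object":{"kind":"Pod","apiVersion":"v1","metadata":{"name":"nginx-deployment-...","resourceVersion":"..."}, ...}}
{"type":"DELETED","object":{ ... }}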
Watch for a while and you will see the watched resource version become too old; the client then refreshes the version and re-issues the watch request.
The server checks the resource version and returns immediately when it is too old; whether that check is periodic I am not sure, it would take reading the apiserver source.
When the version is too old the client performs the List request again, gets the pod list at the current version, and then runs Watch.
3. client-go watch source code analysis
While analyzing the apiserver I pulled its source from GitHub and found there is no main entry point; after some digging, the entry point turns out to live in the kubernetes repository.
You can run
kubectl --namespace="kube-system" describe pod kube-apiserver-docker-desktop
to see the apiserver details.
3.1 client-go Config
The first step is reading the configuration: certificates and other file-based settings.
clientcmd.BuildConfigFromFlags("", "~/.kube/config")
Depending on the parameters, different implementations are used; local mode uses DeferredLoadingClientConfig.
It loads the config file with the decoder that ships with Kubernetes' versioning.go:
func LoadFromFile(filename string) (*clientcmdapi.Config, error) {
kubeconfigBytes, err := ioutil.ReadFile(filename)
if err != nil {
return nil, err
}
config, err := Load(kubeconfigBytes)
if err != nil {
return nil, err
}
klog.V(6).Infoln("Config loaded from file: ", filename)
// set LocationOfOrigin on every Cluster, User, and Context
for key, obj := range config.AuthInfos {
obj.LocationOfOrigin = filename
config.AuthInfos[key] = obj
}
for key, obj := range config.Clusters {
obj.LocationOfOrigin = filename
config.Clusters[key] = obj
}
for key, obj := range config.Contexts {
obj.LocationOfOrigin = filename
config.Contexts[key] = obj
}
if config.AuthInfos == nil {
config.AuthInfos = map[string]*clientcmdapi.AuthInfo{}
}
if config.Clusters == nil {
config.Clusters = map[string]*clientcmdapi.Cluster{}
}
if config.Contexts == nil {
config.Contexts = map[string]*clientcmdapi.Context{}
}
return config, nil
}
What gets read boils down to the URL, the TLS material, content settings and so on.
kubernetes.NewForConfig(config)
This essentially initializes the RESTClient.
func NewForConfig(c *rest.Config) (*Clientset, error) {
configShallowCopy := *c
if configShallowCopy.UserAgent == "" {
configShallowCopy.UserAgent = rest.DefaultKubernetesUserAgent()
}
// share the transport between all clients
// as the comment above says: create the http client and share the transport
httpClient, err := rest.HTTPClientFor(&configShallowCopy)
if err != nil {
return nil, err
}
// now let's see how it is shared
return NewForConfigAndClient(&configShallowCopy, httpClient)
}
Chasing that down, a whole batch of typed clients are created from this httpClient.
Take the first one as an example:
// NewForConfigAndClient creates a new AdmissionregistrationV1Client for the given config and http client.
// Note the http client provided takes precedence over the configured transport values.
func NewForConfigAndClient(c *rest.Config, h *http.Client) (*AdmissionregistrationV1Client, error) {
config := *c
// set defaults: url path and so on
if err := setConfigDefaults(&config); err != nil {
return nil, err
}
//reuse the transport
client, err := rest.RESTClientForConfigAndClient(&config, h)
if err != nil {
return nil, err
}
return &AdmissionregistrationV1Client{client}, nil
}
Now the client creation itself:
func RESTClientForConfigAndClient(config *Config, httpClient *http.Client) (*RESTClient, error) {
if config.GroupVersion == nil {
return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")
}
if config.NegotiatedSerializer == nil {
return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
}
// the base URL comes from the config
baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
if err != nil {
return nil, err
}
// rate limiting; unset means defaults. If you are writing a Kubernetes plugin, tune this to your actual needs (see the sketch after this function)
rateLimiter := config.RateLimiter
if rateLimiter == nil {
qps := config.QPS
if config.QPS == 0.0 {
qps = DefaultQPS
}
burst := config.Burst
if config.Burst == 0 {
burst = DefaultBurst
}
if qps > 0 {
rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
}
}
var gv schema.GroupVersion
if config.GroupVersion != nil {
gv = *config.GroupVersion
}
clientContent := ClientContentConfig{
AcceptContentTypes: config.AcceptContentTypes,
ContentType: config.ContentType,
GroupVersion: gv,
Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
}
// the httpClient is reused, since it is passed by pointer
restClient, err := NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
if err == nil && config.WarningHandler != nil {
restClient.warningHandler = config.WarningHandler
}
return restClient, err
}
The other clients are created much the same way, omitted here.
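Since the rate limiter above silently falls back to the defaults, here is a hedged sketch of tuning it on rest.Config before building the clientset (the values are made up; assumes the "k8s.io/client-go/util/flowcontrol" import):
config.QPS = 50    // client-side queries per second, default 5
config.Burst = 100 // burst size, default 10
// or supply a limiter directly; it takes precedence over QPS/Burst
config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(50, 100)
clientSet, err := kubernetes.NewForConfig(config)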
informers.NewSharedInformerFactoryWithOptions(clientSet, 0, informers.WithNamespace("default"))
This is the factory that prepares informer initialization.
factory.Core().V1().Pods().Informer()
The functions so far only prepare data; the interesting one is Informer.
func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(obj)
informer, exists := f.informers[informerType]
if exists {
return informer
}
resyncPeriod, exists := f.customResync[informerType]
if !exists {
//one of the key parameters
resyncPeriod = f.defaultResync
}
// create the informer; note the two callbacks passed in as function values
informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
}
These callbacks are absolutely key: the ListWatch pair, List and Watch, matching what we saw in the capture.
// NewFilteredPodInformer constructs a new informer for Pod type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}
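The tweakListOptions hook above is also how you narrow what a shared informer lists and watches. A hedged sketch of wiring it through the factory options used in the demo (the selector value is made up; assumes the metav1 import "k8s.io/apimachinery/pkg/apis/meta/v1"):
factory := informers.NewSharedInformerFactoryWithOptions(
	clientSet,
	0,
	informers.WithNamespace("default"),
	informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
		// only pods with this label reach the List and Watch calls above
		opts.LabelSelector = "app=nginx"
	}),
)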
Now let's look at adding an event handler.
informer.AddEventHandler(xxx) // this is where the ResyncPeriod set earlier comes into play
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.stopped {
klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
return
}
//validate the resyncPeriod
if resyncPeriod > 0 {
if resyncPeriod < minimumResyncPeriod {
klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
resyncPeriod = minimumResyncPeriod
}
if resyncPeriod < s.resyncCheckPeriod {
if s.started {
klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
resyncPeriod = s.resyncCheckPeriod
} else {
// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
// accordingly
s.resyncCheckPeriod = resyncPeriod
s.processor.resyncCheckPeriodChanged(resyncPeriod)
}
}
}
//add the new listener
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
if !s.started {
// informer not started yet: just register the listener
s.processor.addListener(listener)
return
}
// in order to safely join, we have to
// 1. stop sending add/update/delete notifications
// 2. do a list against the store
// 3. send synthetic "Add" events to the new handler
// 4. unblock
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock() //block delta processing; used by the watch path later, one piece of the informer architecture
s.processor.addListener(listener) // informer already started: register the listener
for _, item := range s.indexer.List() { //replay the current store contents straight to the new listener
listener.add(addNotification{newObj: item})
}
}
The key part is addListener:
func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
// append to the listener slices
p.addListenerLocked(listener)
if p.listenersStarted { //if the processor has already started, kick off the listener's run and pop, the two key channel-handling functions
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
}
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
p.listeners = append(p.listeners, listener)
p.syncingListeners = append(p.syncingListeners, listener)
}
If the informer is already started, the cached data is flushed to the handler (listeners can be added dynamically). Now the two key processing functions:
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne() //read buffered pending notifications
if !ok { // Nothing to pop; when something is read, it is sent to nextCh on the next iteration
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh: //fed by add() above, e.g. the List data written when the informer is already started
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd) //buffer it in the listener's pending queue
}
}
}
}
func (p *processorListener) run() {
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
//here the registered handler callbacks fire; nextCh is fed by pop() above
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
close(stopCh)
}, 1*time.Second, stopCh)
}
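pop and run together form an unbounded, non-blocking hand-off between delta distribution and each handler: add never blocks, and anything the handler cannot keep up with piles into pendingNotifications. Below is a standalone toy version of the same two-channel pattern (not client-go code; it uses a plain slice where client-go uses a growing ring buffer, and it flushes on close rather than dropping):
package main

import "fmt"

func buffer(in <-chan string, out chan<- string) {
	defer close(out)
	var pending []string
	var next string
	var outCh chan<- string // nil disables the send case, like nextCh in pop()
	for {
		select {
		case outCh <- next: // dispatched one notification
			if len(pending) > 0 {
				next, pending = pending[0], pending[1:]
			} else {
				outCh = nil // nothing buffered, disable sending
			}
		case v, ok := <-in:
			if !ok { // producer closed: flush what is left, then stop
				if outCh != nil {
					out <- next
					for _, rest := range pending {
						out <- rest
					}
				}
				return
			}
			if outCh == nil { // nothing waiting, hand it straight to the sender
				next, outCh = v, out
			} else { // something is already waiting, buffer it
				pending = append(pending, v)
			}
		}
	}
}

func main() {
	in, out := make(chan string), make(chan string)
	go buffer(in, out)
	done := make(chan struct{})
	go func() { // the consumer, like processorListener.run
		for ev := range out {
			fmt.Println("handled:", ev)
		}
		close(done)
	}()
	for i := 0; i < 5; i++ {
		in <- fmt.Sprintf("event-%d", i) // like processorListener.add: producer is never held up by the consumer
	}
	close(in)
	<-done
}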
During the analysis we saw listeners being added both before and after the informer starts; the after-start path is covered above, so let's see how the informer itself starts.
informer.Run(stopper) //plenty of tutorials write their own controller, but client-go already wraps this
Go anonymous functions are passed around as values all over client-go and then shuffled between variables, which makes the code tiring to read 😅.
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() //recover from panics
if s.HasStarted() { //this function starts the informer, so return if it is already started to avoid a double start
klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")
return
}
//a FIFO delta queue (as opposed to LIFO); one of the informer's key designs
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})
//the key configuration, tying the whole architecture together
cfg := &Config{
Queue: fifo, //the DeltaFIFO queue
ListerWatcher: s.listerWatcher, //the List/Watch function interface
ObjectType: s.objectType, //the watched type; Pod here, because Pods() was called earlier
FullResyncPeriod: s.resyncCheckPeriod, //resync check period
RetryOnError: false,
//resync re-syncs the listeners; see this function's implementation
ShouldResync: s.processor.shouldResync, //decides whether a resync is needed
Process: s.HandleDeltas, //the function that processes deltas
WatchErrorHandler: s.watchErrorHandler, //self-explanatory
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
//cache mutation detector
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run) //this run is the run function analyzed earlier
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}() //deferred: update the state once the controller has returned
s.controller.Run(stopCh) //the key step
}
Creating the DeltaFIFO: internally a map combined with a slice, guarded by a lock.
// NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
// items. See also the comment on DeltaFIFO.
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
if opts.KeyFunction == nil {
opts.KeyFunction = MetaNamespaceKeyFunc
}
f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: opts.KeyFunction,
knownObjects: opts.KnownObjects,
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
}
f.cond.L = &f.lock
return f
}
There is also a thread-safe local store, so data can equally be read straight from the store; the common example is how nginx-ingress does it (Welcome - NGINX Ingress Controller (kubernetes.github.io)), whose source shows the store-based approach, sketched below.
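A minimal sketch of that store-based usage, reading pods from the informer's local cache through a Lister instead of calling the apiserver (assumes the factory and stopper from the demo plus the "k8s.io/apimachinery/pkg/labels" import; here the informers are started via the factory rather than informer.Run):
podLister := factory.Core().V1().Pods().Lister()
factory.Start(stopper)            // starts all informers registered on the factory
factory.WaitForCacheSync(stopper) // wait until the initial List has been replayed into the store
pods, err := podLister.Pods("default").List(labels.Everything())
if err != nil {
	log.Fatal(err)
}
for _, p := range pods {
	log.Println("cached pod:", p.Name) // served from the local ThreadSafeStore, no apiserver round trip
}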
Now HandleDeltas, the function that processes the queue; it is a key step, and both List and Watch end up triggering the handlers through it.
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
if deltas, ok := obj.(Deltas); ok {
return processDeltas(s, s.indexer, s.transform, deltas)
}
return errors.New("object given as Process argument is not Deltas")
}
// Multiplexes updates in the form of a list of Deltas into a Store, and informs
// a given handler of events OnUpdate, OnAdd, OnDelete
func processDeltas(
// Object which receives event notifications from the given deltas
handler ResourceEventHandler,
clientState Store,
transformer TransformFunc,
deltas Deltas,
) error {
// from oldest to newest
for _, d := range deltas {
obj := d.Object
if transformer != nil { //nil by default, i.e. no transformation
var err error
obj, err = transformer(obj) //transform the object; the config earlier carries the type
if err != nil {
return err
}
}
//dispatch to the handlers; this is why our registered handlers take effect
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil { //update the state: could be a local cache or a queue, by default the local ThreadSafeStore
return err
}
handler.OnUpdate(old, obj)
} else {
if err := clientState.Add(obj); err != nil { //same as above
return err
}
handler.OnAdd(obj)
}
case Deleted:
if err := clientState.Delete(obj); err != nil { //same as above
return err
}
handler.OnDelete(obj)
}
}
return nil
}
s.controller.Run(stopCh) //finally, the real work starts
// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() //recover from panics
go func() {
<-stopCh
c.config.Queue.Close()
}() //the shutdown path
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
// List and Watch on a schedule
wg.StartWithChannel(stopCh, r.Run)
// pop from the delta queue and process; everything downstream of the watch happens here
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
On to r.Run, the function that loops, running ListAndWatch on a schedule until stopCh closes. r.ListAndWatch(stopCh) does the read plus the periodic incremental updates, so List and Watch keep running over and over, matching the repeated List and Watch URLs in the capture.
// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
A periodic loop.
The processLoop function keeps popping items from the delta queue and processing them:
func (c *controller) processLoop() {
for {
//c.config.Process, force-cast to PopProcessFunc, was defined earlier: it is s.HandleDeltas
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
HandleDeltas was analyzed above: it hands the deltas to the handlers and the store.
3.2 The ListAndWatch flow
The informer startup path above is complete, but how does the informer actually get its data? The data source is the root of it all; without it the delta queue and store downstream would be pointless, so let's keep digging.
// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
err := r.list(stopCh) //the List from the capture: read the resource version and the item list (as of this moment), to be used by Watch
if err != nil {
return err
}
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil { //resync the delta queue
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
select {
case <-stopCh:
return nil
default:
}
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options := metav1.ListOptions{ //watch setup: the resource version and a randomized timeout
ResourceVersion: r.LastSyncResourceVersion(),
// We want to avoid situations of hanging watchers. Stop any watchers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timeoutSeconds,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
// watch bookmarks, it will ignore this field).
AllowWatchBookmarks: true,
}
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
start := r.clock.Now()
//this returns immediately, but reading the response body blocks until the apiserver signals the end or the timeout hits
w, err := r.listerWatcher.Watch(options) //the core of watch: this is where the watched data comes from
if err != nil {
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
// It doesn't make sense to re-list all objects because most likely we will be able to restart
// watch where we ended.
// If that's the case begin exponentially backing off and resend watch request.
// Do the same for "429" errors.
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
<-r.initConnBackoffManager.Backoff().C()
continue
}
return err
}
//crucial: read the channel fed by the blocking watch response body above and write into the delta FIFO
// above: watch -- blocks -- apiserver finishes or times out -- writes into the channel
// here: read from the channel -- write into the DeltaFIFO; blocks when there is no data, ends on timeout or an apiserver error
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)
retry.After(err)
if err != nil {
if err != errorStopRequested {
switch {
case isExpiredError(err):
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
case apierrors.IsTooManyRequests(err):
klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName)
<-r.initConnBackoffManager.Backoff().C()
continue
case apierrors.IsInternalError(err) && retry.ShouldRetry():
klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.expectedTypeName, err)
continue
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
}
}
return nil //finish this round and let the periodic loop run again //wait.BackoffUntil, analyzed above
}
}
}
watchHandler
// watchHandler watches w and sets setLastSyncResourceVersion
func watchHandler(start time.Time,
w watch.Interface,
store Store,
expectedType reflect.Type,
expectedGVK *schema.GroupVersionKind,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
clock clock.Clock,
errc chan error,
stopCh <-chan struct{},
) error {
eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way
// we're coming back in with the same watch interface.
defer w.Stop()
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
case event, ok := <-w.ResultChan():
if !ok {
break loop
}
if event.Type == watch.Error {
return apierrors.FromObject(event.Object)
}
if expectedType != nil {
if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
continue
}
}
if expectedGVK != nil {
if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))
continue
}
}
meta, err := meta.Accessor(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
continue
}
resourceVersion := meta.GetResourceVersion()
switch event.Type {
case watch.Added:
err := store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
}
case watch.Modified:
err := store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
}
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
err := store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
}
setLastSyncResourceVersion(resourceVersion)
if rvu, ok := store.(ResourceVersionUpdater); ok {
rvu.UpdateResourceVersion(resourceVersion)
}
eventCount++
}
}
watchDuration := clock.Since(start)
if watchDuration < 1*time.Second && eventCount == 0 {
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)
}
klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount)
return nil
}
The core is updating the DeltaFIFO and the resource version.
List and Watch are both triggered from here; details below.
3.3 The List flow
r.list(stopCh)
// list simply lists all items and records a resource version obtained from the server at the moment of the call.
// the resource version can be used for further progress notification (aka. watch).
func (r *Reflector) list(stopCh <-chan struct{}) error {
var resourceVersion string
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name})
defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object
var paginatedResult bool
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response. (Gather the list in chunks, otherwise return everything; locally mine apparently does not chunk, or chunking may need to be enabled.)
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts) //the List callback
}))
switch {
case r.WatchListPageSize != 0:
pager.PageSize = r.WatchListPageSize
case r.paginatedResult:
// We got a paginated result initially. Assume this resource and server honor
// paging requests (i.e. watch cache is probably disabled) and leave the default
// pager size set.
case options.ResourceVersion != "" && options.ResourceVersion != "0":
// User didn't explicitly request pagination.
//
// With ResourceVersion != "", we have a possibility to list from watch cache,
// but we do that (for ResourceVersion != "0") only if Limit is unset.
// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
// switch off pagination to force listing from watch cache (if enabled).
// With the existing semantic of RV (result is at least as fresh as provided RV),
// this is correct and doesn't lead to going back in time.
//
// We also don't turn off pagination for ResourceVersion="0", since watch cache
// is ignoring Limit in that case anyway, and if watch cache is not enabled
// we don't introduce regression.
pager.PageSize = 0
}
//invoke the callback configured above, obtaining the items and the resourceVersion
list, paginatedResult, err = pager.List(context.Background(), options)
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
r.setIsLastSyncResourceVersionUnavailable(true)
// Retry immediately if the resource version used to list is unavailable.
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
// continuation pages, but the pager might not be enabled, the full list might fail because the
// resource version it is listing at is expired or the cache may not yet be synced to the provided
// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
// the reflector makes forward progress.
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
close(listCh) //signal completion to the outer goroutine
}()
select {
case <-stopCh:
return nil
case r := <-panicCh:
panic(r)
case <-listCh: //channel closed, no longer blocking here
}
initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
if err != nil {
klog.Warningf("%s: failed to list %v: %v", r.name, r.expectedTypeName, err)
return fmt.Errorf("failed to list %v: %w", r.expectedTypeName, err)
}
// We check if the list was paginated and if so set the paginatedResult based on that.
// However, we want to do that only for the initial list (which is the only case
// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
// situations we may force listing directly from etcd (by setting ResourceVersion="")
// which will return paginated result, even if watch cache is enabled. However, in
// that case, we still want to prefer sending requests to watch cache if possible.
//
// Paginated result returned for request with ResourceVersion="0" mean that watch
// cache is disabled and there are a lot of objects of a given type. In such case,
// there is no need to prefer listing from watch cache.
if options.ResourceVersion == "0" && paginatedResult {
r.paginatedResult = true
}
//state flag meaning the list succeeded, as the official comment says
r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("unable to understand list result %#v: %v", list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion() //the version just read by List, i.e. the apiserver's resource version at the moment of that request
initTrace.Step("Resource version extracted")
items, err := meta.ExtractList(list) //extract the list items
if err != nil {
return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
}
initTrace.Step("Objects extracted")
// write into the delta FIFO queue
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion) //update the resource version
initTrace.Step("Resource version updated")
return nil
}
On to List itself. The comments keep talking about trying to read from the server in chunks, otherwise reading everything; it checks and sets Limit, with a default page size of 500.
// List returns a single list object, but attempts to retrieve smaller chunks from the
// server to reduce the impact on the server. If the chunk attempt fails, it will load
// the full list instead. The Limit field on options, if unset, will default to the page size.
func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
if options.Limit == 0 {
options.Limit = p.PageSize
}
requestedResourceVersion := options.ResourceVersion
requestedResourceVersionMatch := options.ResourceVersionMatch
var list *metainternalversion.List
paginatedResult := false
for {
select {
case <-ctx.Done():
return nil, paginatedResult, ctx.Err()
default:
}
//call the callback set just now, r.listerWatcher.List(opts), which in turn calls the originally configured lw.ListFunc(options)
// obtaining the resource version and the item list
obj, err := p.PageFn(ctx, options)
if err != nil {
// Only fallback to full list if an "Expired" errors is returned, FullListIfExpired is true, and
// the "Expired" error occurred in page 2 or later (since full list is intended to prevent a pager.List from
// failing when the resource versions is established by the first page request falls out of the compaction
// during the subsequent list requests).
if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" {
return nil, paginatedResult, err
}
// the list expired while we were processing, fall back to a full list at
// the requested ResourceVersion.
options.Limit = 0
options.Continue = ""
options.ResourceVersion = requestedResourceVersion
options.ResourceVersionMatch = requestedResourceVersionMatch
result, err := p.PageFn(ctx, options)
return result, paginatedResult, err
}
m, err := meta.ListAccessor(obj)
if err != nil {
return nil, paginatedResult, fmt.Errorf("returned object must be a list: %v", err)
}
// exit early and return the object we got if we haven't processed any pages
//no continue token and no pages accumulated yet, so return the object as-is; with pagination there would be more to fetch
if len(m.GetContinue()) == 0 && list == nil {
return obj, paginatedResult, nil
}
// initialize the list and fill its contents
if list == nil {
list = &metainternalversion.List{Items: make([]runtime.Object, 0, options.Limit+1)}
list.ResourceVersion = m.GetResourceVersion() //very important: record the latest resource version when paginating
list.SelfLink = m.GetSelfLink()
}
// pagination: when Continue is set there is more data, so list is built up and objects appended in the loop until paging finishes
if err := meta.EachListItem(obj, func(obj runtime.Object) error {
list.Items = append(list.Items, obj)
return nil
}); err != nil {
return nil, paginatedResult, err
}
// if we have no more items, return the list
if len(m.GetContinue()) == 0 { //pagination finished
return list, paginatedResult, nil
}
// set the next loop up
options.Continue = m.GetContinue() //set the continue token, preparing the next page request
// Clear the ResourceVersion(Match) on the subsequent List calls to avoid the
// `specifying resource version is not allowed when using continue` error.
// See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143.
options.ResourceVersion = ""
options.ResourceVersionMatch = ""
// At this point, result is already paginated.
paginatedResult = true //mark the result as paginated
}
}
// List a set of apiserver resources
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
// ListWatch is used in Reflector, which already supports pagination.
// Don't paginate here to avoid duplication.
return lw.ListFunc(options)
}
And here we are back at the callback configured at the very beginning.
Continuing into the List function:
client.CoreV1().Pods(namespace).List(context.TODO(), options)
// List takes label and field selectors, and returns the list of Pods that match those selectors.
func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
result = &v1.PodList{}
err = c.client.Get().
Namespace(c.ns).
Resource("pods").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Do(ctx).
Into(result)
return
}
func (r *Request) Do(ctx context.Context) Result {
var result Result
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
result = r.transformResponse(resp, req)
})
if err != nil {
return Result{err: err}
}
if result.err == nil || len(result.body) > 0 {
metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(result.body)))
}
return result
}
func (r Result) Into(obj runtime.Object) error {
if r.err != nil {
// Check whether the result has a Status object in the body and prefer that.
return r.Error()
}
if r.decoder == nil {
return fmt.Errorf("serializer for %s doesn't exist", r.contentType)
}
if len(r.body) == 0 {
return fmt.Errorf("0-length response with status code: %d and content type: %s",
r.statusCode, r.contentType)
}
out, _, err := r.decoder.Decode(r.body, nil, obj)
if err != nil || out == obj {
return err
}
// if a different object is returned, see if it is Status and avoid double decoding
// the object.
switch t := out.(type) {
case *metav1.Status:
// any status besides StatusSuccess is considered an error.
if t.Status != metav1.StatusSuccess {
return errors.FromObject(t)
}
}
return nil
}
Three steps: build the request, Do it, decode the response body. Note the decode is blocking and uses the JSON decoder by default. During List it did not block on my machine, possibly because chunking is not in play, or because my Kubernetes is fairly new and older versions behave differently. What I captured is exactly that first request: a pod list of at most 500 items plus the current resource version.
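For reference, the Limit/Continue mechanism the pager wraps can also be driven by hand against the clientset. A hedged sketch (the page size is made up; assumes the clientSet from the demo plus the metav1 and context imports):
opts := metav1.ListOptions{Limit: 100}
for {
	podList, err := clientSet.CoreV1().Pods("default").List(context.TODO(), opts)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("page of %d pods, resourceVersion=%s", len(podList.Items), podList.ResourceVersion)
	if podList.Continue == "" {
		break // no more pages
	}
	opts.Continue = podList.Continue // token for the next page
}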
3.4 The core Watch flow
Watch is triggered from the periodic loop described above and ends up invoking the WatchFunc callback.
Into the source:
// Watch returns a watch.Interface that watches the requested pods.
func (c *pods) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
opts.Watch = true
return c.client.Get().
Namespace(c.ns).
Resource("pods").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Watch(ctx)
}
An ordinary HTTP request; the core is Watch:
// Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error.
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
// We specifically don't want to rate limit watches, so we
// don't use r.rateLimiter here.
if r.err != nil {
return nil, r.err
}
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
isErrRetryableFunc := func(request *http.Request, err error) bool {
// The watch stream mechanism handles many common partial data errors, so closed
// connections can be retried in many cases.
if net.IsProbableEOF(err) || net.IsTimeout(err) {
return true
}
return false
}
retry := r.retryFn(r.maxRetries)
url := r.URL().String()
for {
if err := retry.Before(ctx, r); err != nil {
return nil, retry.WrapPreviousError(err)
}
req, err := r.newHTTPRequest(ctx)
if err != nil {
return nil, err
}
//issue the HTTP request
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
retry.After(ctx, r, resp, err)
if err == nil && resp.StatusCode == http.StatusOK {
return r.newStreamWatcher(resp) //the key path in the normal case, and why this is an HTTP long poll: the response body controls the blocking, and a channel connects it to the deltas and handlers
}
//error handling
done, transformErr := func() (bool, error) {
defer readAndCloseResponseBody(resp)
if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
return false, nil
}
if resp == nil {
// the server must have sent us an error in 'err'
return true, nil
}
if result := r.transformResponse(resp, req); result.err != nil {
return true, result.err
}
return true, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
}()
//if the error was handled, return and wait for the next ListAndWatch
//otherwise send the request once more
if done {
if isErrRetryableFunc(req, err) {
return watch.NewEmptyWatch(), nil
}
if err == nil {
// if the server sent us an HTTP Response object,
// we need to return the error object from that.
err = transformErr
}
return nil, retry.WrapPreviousError(err)
}
}
}
You can see the HTTPS traffic is not chunked: there is no chunked marker.
newStreamWatcher
func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) {
contentType := resp.Header.Get("Content-Type")
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil {
klog.V(4).Infof("Unexpected content type from the server: %q: %v", contentType, err)
}
objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)
if err != nil {
return nil, err
}
handleWarnings(resp.Header, r.warningHandler)
//frame handling; think HTTP/2 frames. resp.Body is a stream the server can hold open and block on
frameReader := framer.NewFrameReader(resp.Body)
watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)
//the interesting bit: this is where the data is actually received
return watch.NewStreamWatcher(
restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
// use 500 to indicate that the cause of the error is unknown - other error codes
// are more specific to HTTP interactions, and set a reason
errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
), nil
}
It boils down to a goroutine reading from the decoder: sw.receive().
// NewStreamWatcher creates a StreamWatcher from the given decoder.
func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
sw := &StreamWatcher{
source: d,
reporter: r,
// It's easy for a consumer to add buffering via an extra
// goroutine/channel, but impossible for them to remove it,
// so nonbuffered is better.
result: make(chan Event),
// If the watcher is externally stopped there is no receiver anymore
// and the send operations on the result channel, especially the
// error reporting might block forever.
// Therefore a dedicated stop channel is used to resolve this blocking.
done: make(chan struct{}),
}
go sw.receive()
return sw
}
sw.receive()
// receive reads result from the decoder in a loop and sends down the result channel.
func (sw *StreamWatcher) receive() {
defer utilruntime.HandleCrash()
defer close(sw.result)
defer sw.Stop()
for {
//the core is here: keep decoding until EOF, or until sw is done
action, obj, err := sw.source.Decode()
if err != nil {
switch err {
case io.EOF:
// watch closed normally
case io.ErrUnexpectedEOF:
klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
default:
if net.IsProbableEOF(err) || net.IsTimeout(err) {
klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
} else {
select {
case <-sw.done:
case sw.result <- Event{
Type: Error,
Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
}:
}
}
}
return
}
select {
case <-sw.done:
return
case sw.result <- Event{ //the watch result
Type: action,
Object: obj,
}:
}
}
}
The sw result channel is then consumed in cache.(*Reflector).ListAndWatch by watchHandler, which writes into the DeltaFIFO; popping the queue triggers the handlers and the store.
sw.source.Decode() normally sees JSON and is handled by the JSON decoder; on my local Kubernetes the read blocks until the apiserver sends data.
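To see the same stream without the informer machinery, the watch API can also be consumed directly; a hedged sketch (assumes the clientSet from the demo plus the context and metav1 imports):
w, err := clientSet.CoreV1().Pods("default").Watch(context.TODO(), metav1.ListOptions{})
if err != nil {
	log.Fatal(err)
}
defer w.Stop()
for event := range w.ResultChan() {
	// event.Type is ADDED/MODIFIED/DELETED/BOOKMARK/ERROR; event.Object is the changed object
	log.Printf("watch event: %s %T", event.Type, event.Object)
}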
Summary
The core of client-go is not really complicated, but the flow is long: the layered hand-offs when building the RESTClient, then the FIFO queue, the listener callbacks, the local store and so on. The rough flow of the demo's pod watch is shown in the diagram below:
It looks simple, but there are many details and several implementation styles. The most painful part is the anonymous functions being passed around; you really need the full context to read the code.
References
如何通过抓包来查看Kubernetes API流量-阿里云开发者社区 (aliyun.com)
可能是史上最全的Kubernetes证书解析 (qingwave.github.io)