之前介绍过informer的流程,文章在 informer介绍。今天梳理一下他的源码和流程。
一、概念
什么是 Informer
informer 是 client-go 中的核心工具包,informer 其实就是一个带有本地缓存和索引机制的,可以注册 EventHandler 的 client 本地缓存被称为 Store,索引被称为 Index 。使用 informer 的目的是为了减轻 apiserver 数据交互的压力而抽象出来的一个 cache 层,客户端对 apiserver 数据的 读取和监听操作都通过本地 informer进行。
为什么需要 Informer 机制
Kubernetes 各个组件都是通过 REST API 跟 API Server 交互通信的,而如果每次每一个组件都直接跟 API Server 交互去读取/写入到后端的etcd的话,会对 API Server 以及etcd造成非常大的负担。 而 Informer 机制是为了保证各个组件之间通信的实时性、可靠性,并且减缓对 API Server和etcd 的负担。
informer和configch(newSourceApiserverFromLW)区别
之前介绍过一篇configch的文章 configch源码。这篇文章主要是介绍newSourceApiserverFromLW方法,与kubelet交互后进行更新删除等。并且newSourceApiserverFromLW只监听一个api对象(pod)
而informer则监听整个namespace下的api对象。但是informer不会做出动作,只是监听后维护自己的缓存。方便用户去使用最新数据,避免每次都http访问api-server浪费资源
二、注册及接收
大概步骤如下
- 启动informer
- 注册processLoop和reflector
- reflector开始LIST&WATCH。同时processLoop开始循环pop队列数据(步骤三)
- watch到的数据进行对比处理,不存在的存入的queue队列中
注册
2.1.start注册
代码位置:vendor/k8s.io/client-go/informer/factory.go
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
if f.shuttingDown {
return
}
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
f.wg.Add(1)
informer := informer
go func() {
defer f.wg.Done()
//启动informer
informer.Run(stopCh)
}()
f.startedInformers[informerType] = true
}
}
}
2.2 注册processLoop和reflector
代码位置:vendor/k8s.io/client-go/tools/cache/controller.go
//完成一些初始化,总要的是下面两个函数
//启动监听
wg.StartWithChannel(stopCh, r.Run)
//这里是循环delta FIFO的queue,然后pop出来最新的
wait.Until(c.processLoop, time.Second, stopCh)
2.3 reflector开始LIST&WATCH
代码位置:vendor/k8s.io/client-go/tools/cache/reflector.go
这里开始LIST&WATCH
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
....省略
//开始watch
return r.watch(w, stopCh, resyncerrc)
...省略
//watch函数开始
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)
2.4 watch处理数据
还是上面reflector文件,开始处理接收到的最新watch的信息
switch event.Type {
case watch.Added:
err := store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
}
case watch.Modified:
err := store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
}
case watch.Deleted:
err := store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
}
case watch.Bookmark:
if _, ok := meta.GetAnnotations()["k8s.io/initial-events-end"]; ok {
if exitOnInitialEventsEndBookmark != nil {
*exitOnInitialEventsEndBookmark = true
}
}
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
}
代码位置:vendor/k8s.io/client-go/tools/cache/delta_fifo.go
进入到更新。这里通过queueActionLocked函数验证验证一下obj是否存在,不存在则推倒queue队列中
func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Updated, obj)
}
...省略部分代码
//queueActionLocked中就是获取到item映射出来的这个obj信息,如果不存在则推送到queue中,如果存在则替换。
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
到这里,接收的工作就完成了,已经将最新的数据推送到了queue中,下面就是开始处理这个queue了
三、弹出队列处理
- 开启queue队列的循环接收
- 获取到最新队列信息开始处理
- 更新本地indexer缓存
- 分发给监听informer的用户去触发回调函数
3.1 开始循环接收数据
func (c *controller) processLoop() {
for {
//无限循环处理queue数据
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
}
}
3.2 这里面也是个for循环,如果队列大于0,则读取出来通过process处理,否则等待队列有新数据加入
代码位置 vendor/k8s.io/client-go/tools/cache/delta_fifo.go
for {
for len(f.queue) == 0 {
if f.closed {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
depth := len(f.queue)
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
delete(f.items, id)
//进入这里
err := process(item, isInInitialList)
}
3.3 注册的回调。判断映射是否存在,存在的话则进行处理
代码位置 vendor/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
if deltas, ok := obj.(Deltas); ok {
//如果映射存在,则进行处理
return processDeltas(s, s.indexer, deltas, isInInitialList)
}
return errors.New("object given as Process argument is not Deltas")
}
3.4 处理本地缓存和回调
代码位置:vendor/k8s.io/client-go/tools/cache/controller.go
func processDeltas(
handler ResourceEventHandler,
clientState Store,
deltas Deltas,
isInInitialList bool,
) error {
for _, d := range deltas {
obj := d.Object
switch d.Type {
//如果是添加、同步、替换、更新,则进入这里
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
//更新indexer的本地缓存(步骤3.5)
if err := clientState.Update(obj); err != nil {
return err
}
//处理回调(步骤3.6)
handler.OnUpdate(old, obj)
} else {
if err := clientState.Add(obj); err != nil {
return err
}
handler.OnAdd(obj, isInInitialList)
}
case Deleted:
if err := clientState.Delete(obj); err != nil {
return err
}
handler.OnDelete(obj)
}
}
return nil
}
3.5 更新本地indexer(Storage)缓存
代码位置:vendor/k8s.io/client-go/tools/cache/controller.go
func (c *cache) Update(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Update(key, obj)
return nil
}
3.6处理回调
func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {
isSync := false
if accessor, err := meta.Accessor(new); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
s.cacheMutationDetector.AddObject(new)
//这个是重点,分发给监听者
s.processor.distribute(updateNotification{oldObj: old, newObj: new}, isSync)
}
3.7 分发给监听者
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
//p.listeners代表有几个监听者。这里循环所有监听者,把最新信息发送给监听者。
for listener, isSyncing := range p.listeners {
switch {
case !sync:
listener.add(obj)
case isSyncing:
listener.add(obj)
default:
}
}
}
到这里就处理结束了。如果想要使用监听者,需要用户自己建立informer进行监听。然后添加注册函数,就可以接收了