kubelet Source Code Analysis: syncLoopIteration
syncLoopIteration selects over four channels: configCh, plegCh, syncCh, and housekeepingCh. This post (and the ones that follow) traces where these four channels come from, starting with configCh.
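Before diving in, here is a minimal, self-contained model of that select loop. The channel payloads and handler calls are simplified stand-ins (the real kubelet uses kubetypes.PodUpdate and pleg.PodLifecycleEvent, and also selects on probe-manager channels), but the shape is the same: one select per iteration, and the loop exits only when configCh is closed.

package main

import (
	"fmt"
	"time"
)

// PodUpdate is a stand-in for kubetypes.PodUpdate.
type PodUpdate struct {
	Op     string
	Source string
}

// syncLoopIteration mirrors the shape of kubelet's loop body: one select
// over the four channels, returning false only when configCh is closed.
func syncLoopIteration(configCh <-chan PodUpdate, plegCh <-chan string,
	syncCh, housekeepingCh <-chan time.Time) bool {
	select {
	case u, open := <-configCh:
		if !open {
			return false // config channel closed: exit the sync loop
		}
		fmt.Println("config update:", u.Op, "from", u.Source)
	case e := <-plegCh:
		fmt.Println("pod lifecycle event:", e)
	case <-syncCh:
		fmt.Println("periodic sync tick")
	case <-housekeepingCh:
		fmt.Println("housekeeping tick")
	}
	return true
}

func main() {
	configCh := make(chan PodUpdate, 1)
	plegCh := make(chan string)
	syncTicker := time.NewTicker(time.Second)
	housekeepingTicker := time.NewTicker(2 * time.Second)
	configCh <- PodUpdate{Op: "ADD", Source: "api"}
	close(configCh) // the buffered update is still delivered before "closed"
	for syncLoopIteration(configCh, plegCh, syncTicker.C, housekeepingTicker.C) {
	}
}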
I. configCh
configCh carries the pod data obtained from the API server via list&watch. Incoming updates are diffed against a local cache and the results are pushed into the configCh channel.
1. Using some of the initialization data from pkg/kubelet/kubelet.go, cmd/kubelet/app/server.go initializes the structures in config.go and wires up the update channel defined there (LIST&WATCH pushes into this channel); finally the kubelet is started with that channel: go k.Run(podCfg.Updates()). The surrounding initialization code is scattered, so it is not quoted here.
2.1 Initializing the data and watching the apiserver
- kubelet.go initializes the PodConfig struct
- Three data sources are registered: api, file, and http
- The Channel function creates a channel and its listener for each source
- ChannelWithContext spawns a goroutine that runs listen
// Initialize the PodConfig struct defined in config.go.
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
// Register the three data sources: file, http, and api.
if kubeCfg.StaticPodPath != "" {
    klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
    config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.FileSource))
}
if kubeCfg.StaticPodURL != "" {
    klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
    config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.HTTPSource))
}
// NewSourceApiserver sets up the LIST&WATCH against the apiserver.
if kubeDeps.KubeClient != nil {
    klog.InfoS("Adding apiserver pod source")
    config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource))
}
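For reference, the value that every source pushes into its channel, and that ultimately arrives on configCh, is kubetypes.PodUpdate; abridged from pkg/kubelet/types/pod_update.go:

// Abridged from pkg/kubelet/types/pod_update.go.
type PodUpdate struct {
    Pods   []*v1.Pod    // the pods affected by this update
    Op     PodOperation // SET, ADD, UPDATE, REMOVE, DELETE, RECONCILE
    Source string       // kubetypes.FileSource, HTTPSource, or ApiserverSource
}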
2.2 Initializing LIST&WATCH
- Create the list&watch source; once the node has synced, start watching
- Register a send function that repackages the data from the apiserver and passes it downstream
- The Run function kicks off ListAndWatch
- ListAndWatch in turn drives the watch handling
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
    lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName)))
    klog.InfoS("Waiting for node sync before watching apiserver pods")
    go func() {
        for {
            if nodeHasSynced() {
                klog.V(4).InfoS("node sync completed")
                break
            }
            time.Sleep(WaitForAPIServerSyncPeriod)
            klog.V(4).InfoS("node sync has not completed yet")
        }
        klog.InfoS("Watching apiserver")
        newSourceApiserverFromLW(lw, updates)
    }()
}
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
    // This send function is what step 2.3 ends up calling: it pushes the data into the channel.
    send := func(objs []interface{}) {
        var pods []*v1.Pod
        for _, o := range objs {
            pods = append(pods, o.(*v1.Pod))
        }
        // Everything coming from the apiserver is re-sent as a single SET update.
        updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
    }
    r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
    go r.Run(wait.NeverStop)
}
func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
    wait.BackoffUntil(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            r.watchErrorHandler(r, err)
        }
    }, r.backoffManager, true, stopCh)
    klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
// ListAndWatch is too long to quote in full; the relevant call is:
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)
2.3 The watch runs and validates incoming data
- add/update/delete events trigger different functions, but the flow is the same for each:
- First, the object is added to / updated in / deleted from the local cache.
- Then every pod currently in the cache is listed and the send function from step 2.2 is called.
- send collects all the pods into one pod list and pushes it into the channel as a whole.
- That completes this watch event; the loop waits for the next change.
case watch.Added:
    err := store.Add(event.Object)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
    }
case watch.Modified:
    err := store.Update(event.Object)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
    }
case watch.Deleted:
    err := store.Delete(event.Object)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
    }
case watch.Bookmark:
default:
    utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
}
func (u *UndeltaStore) Add(obj interface{}) error {
    if err := u.Store.Add(obj); err != nil { // update the local cache
        return err
    }
    u.PushFunc(u.Store.List()) // trigger the send function from step 2.2 with the full pod list
    return nil
}
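The behavior of UndeltaStore is easy to miss: every mutation pushes the entire current state downstream, not a delta, which is why the apiserver source always emits a full SET of pods. Below is a minimal self-contained model of that semantics, using plain strings in place of *v1.Pod:

package main

import "fmt"

// undeltaStore models cache.UndeltaStore: mutate the local cache, then
// hand the FULL current contents to pushFunc.
type undeltaStore struct {
	items    map[string]string
	pushFunc func(all []string)
}

func (u *undeltaStore) Add(key, val string) {
	u.items[key] = val // update the local cache first
	u.push()           // then push a complete snapshot
}

func (u *undeltaStore) Delete(key string) {
	delete(u.items, key)
	u.push()
}

func (u *undeltaStore) push() {
	all := make([]string, 0, len(u.items))
	for _, v := range u.items {
		all = append(all, v)
	}
	u.pushFunc(all)
}

func main() {
	s := &undeltaStore{
		items:    map[string]string{},
		pushFunc: func(all []string) { fmt.Println("send:", all) },
	}
	s.Add("uid-1", "pod-a") // send: [pod-a]
	s.Add("uid-2", "pod-b") // send: both pods, not just the new one
	s.Delete("uid-1")       // send: [pod-b]
}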
3. Everything above is the producer side; now for the consumer side.
- The Channel function creates the channel, starts listening on it, and returns it to the caller in step 2.1
- Data pushed by the send function from step 2.2 eventually arrives on this channel and is received in listen
- listen then hands each update to the Merge function
// The call from step 2.1; the piece that matters here is cfg.Channel.
if kubeDeps.KubeClient != nil {
    klog.InfoS("Adding apiserver pod source")
    config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource))
}
func (c *PodConfig) Channel(ctx context.Context, source string) chan<- interface{} {
    c.sourcesLock.Lock()
    defer c.sourcesLock.Unlock()
    c.sources.Insert(source)
    return c.mux.ChannelWithContext(ctx, source)
}
func (m *Mux) ChannelWithContext(ctx context.Context, source string) chan interface{} {
    if len(source) == 0 {
        panic("Channel given an empty name")
    }
    m.sourceLock.Lock()
    defer m.sourceLock.Unlock()
    channel, exists := m.sources[source]
    if exists {
        return channel
    }
    newChannel := make(chan interface{})
    m.sources[source] = newChannel
    // Listen for new data on this source's channel.
    go wait.Until(func() { m.listen(source, newChannel) }, 0, ctx.Done())
    return newChannel
}
func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
    for update := range listenChannel {
        m.merger.Merge(source, update)
    }
}
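To make the fan-in pattern concrete, here is a simplified, self-contained model of the Mux: one channel plus one listen goroutine per source, all funneling into a single Merge implementation (the real podStorage.Merge additionally serializes calls with a lock):

package main

import (
	"fmt"
	"sync"
)

// merger is the sink every source channel drains into.
type merger interface {
	Merge(source string, update interface{})
}

type printMerger struct{}

func (printMerger) Merge(source string, update interface{}) {
	fmt.Printf("merge from %s: %v\n", source, update)
}

type mux struct {
	mu      sync.Mutex
	sources map[string]chan interface{}
	merger  merger
	wg      sync.WaitGroup
}

// channel lazily creates one channel per source plus its listen goroutine.
func (m *mux) channel(source string) chan interface{} {
	m.mu.Lock()
	defer m.mu.Unlock()
	if ch, ok := m.sources[source]; ok {
		return ch
	}
	ch := make(chan interface{})
	m.sources[source] = ch
	m.wg.Add(1)
	go func() { // the listen goroutine: drain the channel into Merge
		defer m.wg.Done()
		for update := range ch {
			m.merger.Merge(source, update)
		}
	}()
	return ch
}

func main() {
	m := &mux{sources: map[string]chan interface{}{}, merger: printMerger{}}
	api := m.channel("api")
	file := m.channel("file")
	api <- "pod list from apiserver"
	file <- "static pods from disk"
	close(api)
	close(file)
	m.wg.Wait()
}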
4. Processing the data and pushing it to configCh
4.1 Processing the data
- When an update arrives, first check whether its source has been registered
- Work out whether each pod in the list needs an update, a delete, a reconcile, and so on (step 4.2)
- Push the resulting updates into the channel that kubelet listens on
func (s *podStorage) Merge(source string, change interface{}) error {
    s.updateLock.Lock()
    defer s.updateLock.Unlock()
    seenBefore := s.sourcesSeen.Has(source)
    // step 4.2
    adds, updates, deletes, removes, reconciles := s.merge(source, change)
    firstSet := !seenBefore && s.sourcesSeen.Has(source)
    switch s.mode {
    case PodConfigNotificationIncremental:
        if len(removes.Pods) > 0 {
            s.updates <- *removes
        }
        if len(adds.Pods) > 0 {
            s.updates <- *adds
        }
        if len(updates.Pods) > 0 {
            s.updates <- *updates
        }
        if len(deletes.Pods) > 0 {
            s.updates <- *deletes
        }
        if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
            s.updates <- *adds
        }
        if len(reconciles.Pods) > 0 {
            s.updates <- *reconciles
        }
    case PodConfigNotificationSnapshotAndUpdates:
        if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
            s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
        }
        if len(updates.Pods) > 0 {
            s.updates <- *updates
        }
        if len(deletes.Pods) > 0 {
            s.updates <- *deletes
        }
    case PodConfigNotificationSnapshot:
        if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
            s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
        }
    case PodConfigNotificationUnknown:
        fallthrough
    default:
        panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode))
    }
    return nil
}
4.2 Classifying the pods
- Fetch the cached (old) pod map for this source
- Define the updatePodsFunc closure
- Keep the old pods in oldPods and reset pods by allocating a fresh map
- Run the closure:
- For each incoming pod, initialize the Annotations map if it is nil and stamp the config-source annotation
- If oldPods does not contain the pod, this is an add; record the pod in pods
- If oldPods does contain it, store the existing entry in pods (used later to decide which pods were removed)
- Decide what kind of handling the pod needs (step 4.3)
- After the closure, walk oldPods: any pod missing from the new pods map has been removed
- Repackage everything into PodUpdate values and return them
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
    s.podLock.Lock()
    defer s.podLock.Unlock()
    addPods := []*v1.Pod{}
    updatePods := []*v1.Pod{}
    deletePods := []*v1.Pod{}
    removePods := []*v1.Pod{}
    reconcilePods := []*v1.Pod{}
    pods := s.pods[source]
    if pods == nil {
        pods = make(map[types.UID]*v1.Pod)
    }
    updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
        filtered := filterInvalidPods(newPods, source, s.recorder)
        for _, ref := range filtered {
            if ref.Annotations == nil {
                ref.Annotations = make(map[string]string)
            }
            ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
            if existing, found := oldPods[ref.UID]; found {
                pods[ref.UID] = existing
                needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
                if needUpdate {
                    updatePods = append(updatePods, existing)
                } else if needReconcile {
                    reconcilePods = append(reconcilePods, existing)
                } else if needGracefulDelete {
                    deletePods = append(deletePods, existing)
                }
                continue
            }
            recordFirstSeenTime(ref)
            pods[ref.UID] = ref
            addPods = append(addPods, ref)
        }
    }
    update := change.(kubetypes.PodUpdate)
    switch update.Op {
    /* other Op types omitted here */
    case kubetypes.SET:
        klog.V(4).InfoS("Setting pods for source", "source", source)
        s.markSourceSet(source)
        oldPods := pods
        pods = make(map[types.UID]*v1.Pod)
        updatePodsFunc(update.Pods, oldPods, pods)
        for uid, existing := range oldPods {
            if _, found := pods[uid]; !found {
                // this is a delete
                removePods = append(removePods, existing)
            }
        }
    default:
        klog.InfoS("Received invalid update type", "type", update)
    }
    s.pods[source] = pods
    adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
    updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
    deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
    removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
    reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
    return adds, updates, deletes, removes, reconciles
}
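Stripped of the kubelet types, the SET branch boils down to a map diff between the cached pods and the incoming full list. A toy model (the delete/reconcile buckets are omitted for brevity):

package main

import "fmt"

type pod struct {
	UID  string
	Spec string
}

// diffSet buckets a new full pod list against the cached map, mirroring
// the SET handling in merge() above.
func diffSet(old map[string]pod, newList []pod) (adds, updates, removes []pod) {
	current := map[string]pod{}
	for _, p := range newList {
		current[p.UID] = p
		if existing, ok := old[p.UID]; ok {
			if existing.Spec != p.Spec {
				updates = append(updates, p) // seen before, spec changed
			}
			continue
		}
		adds = append(adds, p) // never seen from this source before
	}
	for uid, p := range old {
		if _, ok := current[uid]; !ok {
			removes = append(removes, p) // missing from the new SET => removed
		}
	}
	return adds, updates, removes
}

func main() {
	old := map[string]pod{"a": {"a", "v1"}, "b": {"b", "v1"}}
	newList := []pod{{"b", "v2"}, {"c", "v1"}}
	adds, updates, removes := diffSet(old, newList)
	fmt.Println("adds:", adds, "updates:", updates, "removes:", removes)
	// adds: [{c v1}] updates: [{b v2}] removes: [{a v1}]
}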
4.3 Deciding how to handle a pod
- First compare the new pod against the old one on Spec, Labels, DeletionTimestamp, DeletionGracePeriodSeconds, and Annotations; any difference means the pod definition changed and an update-style action is needed
- If none of those differ, compare the Status of the old and new pod; a status change calls for a reconcile (needReconcile)
- When the definition did change, the cached pod is overwritten with the new fields
- If DeletionTimestamp is non-nil it is a (graceful) delete; otherwise it is an update
func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGracefulDelete bool) {
    if !podsDifferSemantically(existing, ref) {
        if !reflect.DeepEqual(existing.Status, ref.Status) {
            existing.Status = ref.Status
            needReconcile = true
        }
        return
    }
    ref.Annotations[kubetypes.ConfigFirstSeenAnnotationKey] = existing.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]
    existing.Spec = ref.Spec
    existing.Labels = ref.Labels
    existing.DeletionTimestamp = ref.DeletionTimestamp
    existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
    existing.Status = ref.Status
    updateAnnotations(existing, ref)
    if ref.DeletionTimestamp != nil {
        needGracefulDelete = true
    } else {
        needUpdate = true
    }
    return
}
func podsDifferSemantically(existing, ref *v1.Pod) bool {
    if reflect.DeepEqual(existing.Spec, ref.Spec) &&
        reflect.DeepEqual(existing.Labels, ref.Labels) &&
        reflect.DeepEqual(existing.DeletionTimestamp, ref.DeletionTimestamp) &&
        reflect.DeepEqual(existing.DeletionGracePeriodSeconds, ref.DeletionGracePeriodSeconds) &&
        isAnnotationMapEqual(existing.Annotations, ref.Annotations) {
        return false
    }
    return true
}
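Since checkAndUpdatePod is unexported in pkg/kubelet/config, here is a self-contained reimplementation of its decision tree with a stripped-down pod type (Annotations and DeletionGracePeriodSeconds are omitted for brevity):

package main

import (
	"fmt"
	"reflect"
	"time"
)

type pod struct {
	Spec              string
	Labels            map[string]string
	DeletionTimestamp *time.Time
	Status            string
}

// classify mirrors checkAndUpdatePod: definition change + deletion
// timestamp => DELETE; definition change alone => UPDATE; status-only
// change => RECONCILE.
func classify(existing, ref pod) string {
	semanticallyDiffer := !reflect.DeepEqual(existing.Spec, ref.Spec) ||
		!reflect.DeepEqual(existing.Labels, ref.Labels) ||
		!reflect.DeepEqual(existing.DeletionTimestamp, ref.DeletionTimestamp)
	if !semanticallyDiffer {
		if existing.Status != ref.Status {
			return "RECONCILE" // only the status changed
		}
		return "no-op"
	}
	if ref.DeletionTimestamp != nil {
		return "DELETE" // graceful delete requested
	}
	return "UPDATE" // spec/labels changed
}

func main() {
	base := pod{Spec: "v1", Status: "Running"}

	relabeled := base
	relabeled.Labels = map[string]string{"tier": "web"}
	fmt.Println(classify(base, relabeled)) // UPDATE

	now := time.Now()
	deleting := base
	deleting.DeletionTimestamp = &now
	fmt.Println(classify(base, deleting)) // DELETE

	statusOnly := base
	statusOnly.Status = "Succeeded"
	fmt.Println(classify(base, statusOnly)) // RECONCILE
}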
5. At this point kubelet's configCh receives the updates pushed above, and each operation type gets its own handling:
case u, open := <-configCh:
    if !open {
        klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
        return false
    }
    switch u.Op {
    case kubetypes.ADD:
        klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
        handler.HandlePodAdditions(u.Pods)
    case kubetypes.UPDATE:
        klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjs(u.Pods))
        handler.HandlePodUpdates(u.Pods)
    case kubetypes.REMOVE:
        klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjs(u.Pods))
        handler.HandlePodRemoves(u.Pods)
    case kubetypes.RECONCILE:
        klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjs(u.Pods))
        handler.HandlePodReconcile(u.Pods)
    case kubetypes.DELETE:
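        // DELETE is treated as an UPDATE because of graceful deletion.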
klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
klog.ErrorS(nil, "Kubelet does not support snapshot update")
default:
klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
}
kl.sourcesReady.AddSource(u.Source)
Next up: kubelet source code analysis, syncLoopIteration (part 2): plegCh.