claimWorker()
claim worker中循环执行workFunc()
- claim worker从claimQueue中取数据,也就是取出的都是PVC
- workFunc首先从队列中取出一个obj,然后拿name去informer缓存中尝试获取
- 如果在informer缓存。说明不是删除事件,执行
updateClaim()
函数 - 如果不在informer缓存中。
- 如果pvc控制器的缓存中不存在,说明pvc已经被删除,打印日志。
- 如果pvc控制器的缓存中存在,就调用
deleteClaim()
在缓存中删除这个pvc 并且将对应的pv加入volume队列。
- 如果在informer缓存。说明不是删除事件,执行
// claimWorker processes items from claimQueue. It must run only once,
// syncClaim is not reentrant.
func (ctrl *PersistentVolumeController) claimWorker() {
workFunc := func() bool {
keyObj, quit := ctrl.claimQueue.Get()
if quit {
return true
}
defer ctrl.claimQueue.Done(keyObj)
key := keyObj.(string)
klog.V(5).Infof("claimWorker[%s]", key)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
return false
}
claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
if err == nil {
// The claim still exists in informer cache, the event must have
// been add/update/sync
ctrl.updateClaim(claim)
return false
}
if !errors.IsNotFound(err) {
klog.V(2).Infof("error getting claim %q from informer: %v", key, err)
return false
}
// The claim is not in informer cache, the event must have been "delete"
claimObj, found, err := ctrl.claims.GetByKey(key)
if err != nil {
klog.V(2).Infof("error getting claim %q from cache: %v", key, err)
return false
}
if !found {
// The controller has already processed the delete event and
// deleted the claim from its cache
klog.V(2).Infof("deletion of claim %q was already processed", key)
return false
}
claim, ok := claimObj.(*v1.PersistentVolumeClaim)
if !ok {
klog.Errorf("expected claim, got %+v", claimObj)
return false
}
ctrl.deleteClaim(claim)
return false
}
for {
if quit := workFunc(); quit {
klog.Infof("claim worker queue shutting down")
return
}
}
}
updateClaim()
- 执行
storeClaimUpdate()
,将这个pvc存到PVC controller的缓存中- storeClaimUpdate主要判断缓存中是否有这个对象,如果没有就直接添加到缓存中,如果存在就比较缓存中的resourceVersion和事件中的resourceVersion的大小,如果比缓存中新,就更新缓存中的对象。
- 如果不是新创建的对象,直接返回
- 如果是新创建的对象,就执行
syncClaim()
// updateClaim runs in worker thread and handles "claim added",
// "claim updated" and "periodic sync" events.
func (ctrl *PersistentVolumeController) updateClaim(claim *v1.PersistentVolumeClaim) {
// Store the new claim version in the cache and do not process it if this is
// an old version.
new, err := ctrl.storeClaimUpdate(claim)//存储该pvc的新版本
if err != nil {
klog.Errorf("%v", err)
}
if !new {
return
}
err = ctrl.syncClaim(claim)
if err != nil {
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
// recovers from it easily.
klog.V(3).Infof("could not sync claim %q: %+v", claimToClaimKey(claim), err)
} else {
klog.Errorf("could not sync volume %q: %+v", claimToClaimKey(claim), err)
}
}
}
syncClaim()
- syncClaim首先执行updateClaimMigrationAnnotations判断是否需要更新CSI
- syncClaim的最后,无论这个pvc是否被更新了,都会判断是否存在
pv.kubernetes.io/bind-completed的
注解。这个注解通常用于表示PVC是否已经成功绑定到一个PersistentVolume(PV)- 如果不存在执行
syncUnboundClaim
- 如果存在执行
syncBoundClaim
- 如果不存在执行
// syncClaim is the main controller method to decide what to do with a claim.
// It's invoked by appropriate cache.Controller callbacks when a claim is
// created, updated or periodically synced. We do not differentiate between
// these events.
// For easier readability, it was split into syncUnboundClaim and syncBoundClaim
// methods.
func (ctrl *PersistentVolumeController) syncClaim(claim *v1.PersistentVolumeClaim) error {
klog.V(4).Infof("synchronizing PersistentVolumeClaim[%s]: %s", claimToClaimKey(claim), getClaimStatusForLogging(claim))
// Set correct "migrated-to" annotations on PVC and update in API server if
// necessary
newClaim, err := ctrl.updateClaimMigrationAnnotations(claim)
if err != nil {
// Nothing was saved; we will fall back into the same
// condition in the next call to this method
return err
}
claim = newClaim
if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBindCompleted) {
return ctrl.syncUnboundClaim(claim)
} else {
return ctrl.syncBoundClaim(claim)
}
}