Controller原理
在 K8s 中,用户通过声明式 API 定义资源的“预期状态”,Controller 则负责监视资源的实际状态,当资源的实际状态和“预期状态”不一致时,Controller 则对系统进行必要的更改,以确保两者一致,这个过程被称之为调谐(Reconcile)。
例如下图中,用户定义了一个 Deployment 资源,其中指定了运行的容器镜像,副本数等信息。Deployment Controller 会根据该定义在 K8s 节点上创建对应的 Pod,并对这些 Pod 进行持续监控。如果某个 Pod 异常退出了,Deployment Controller 会重新创建一个 Pod,以保证系统的实际状态和用户定义的“预期状态”(8个副本)一致。
有时候 Controller 也被叫做 Operator。这两个术语的混用有时让人感到迷惑。Controller 是一个通用的术语,凡是遵循 “Watch K8s 资源并根据资源变化进行调谐” 模式的控制程序都可以叫做 Controller。而 Operator 是一种专用的 Controller,用于在 Kubernetes 中管理一些复杂的,有状态的应用程序。例如在 Kubernetes 中管理 MySQL 数据库的 MySQL Operator。
K8s HTTP API的List Watch机制
我们说过Controller需要监控k8s中的资源状态,那么这个过程是怎么实现的呢?
K8s API Server 提供了获取某类资源集合的 HTTP API,此类 API 被称为 List 接口。例如下面的 URL 可以列出 default namespace 下面的 pod。
HTTP GET api/v1/namespaces/default/pods
在该 URL 后面加上参数 ?watch=true
,则 API Server 会对 default namespace 下面的 pod 的状态进行持续监控,并在 pod 状态发生变化时通过 chunked Response (HTTP 1.1) 或者 Server Push(HTTP2)通知到客户端。K8s 称此机制为 watch。
HTTP GET api/v1/namespaces/default/pods?watch=true
我们现在来演示一下这个效果:
我们可以通过kubectl proxy
启动API Server的代理服务器。
kubectl proxy --port 8080
然后通过curl来List Pod资源:
curl http://localhost:8080/api/v1/namespaces/default/pods
在该命令的输出中,我们可以看到 HTTP Response 是一个 json 格式的数据结构,里面列出来目前 default namespace 中的所有 pod。在返回数据结构中有一个 resourceVersion
字段,该字段的值是此次 List 操作得到的资源的版本号。我们在 watch 请求中可以带上该版本号作为参数,API Server 会 watch 将该版本之后的资源变化并通知客户端。
{
"kind": "PodList",
"apiVersion": "v1",
"metadata": {
"resourceVersion": "770715" //资源版本号
},
"items": [
{
"metadata": {
"name": "foo",
"namespace": "default",
"uid": "d6adfe72-4e90-4b6e-bf14-b6192acb5d07",
"resourceVersion": "762448",
"creationTimestamp": "2023-03-10T16:16:02Z",
"annotations": {…},
"managedFields": […]
},
"spec": {…},
"status": {…}
},
{
"metadata": {
"name": "bar",
"namespace": "default",
"uid": "bac55478-ad8d-49a6-bab2-23bfdc788736",
"resourceVersion": "762904",
"creationTimestamp": "2023-03-10T16:19:17Z",
"annotations": {…},
"managedFields": […]
},
"spec": {…},
"status": {…}
}
]
}
在请求中加上 watch 参数,并带上前面 List 返回的版本号,以 watch pod 资源的变化。
curl http://localhost:8080/api/v1/namespaces/default/pods?watch=true&resourceVersion=770715
在另一个终端中创建一个名为 test 的 pod,然后将其删除,可以看到下面的输出:
从上面 HTTP Watch 返回的 Response 中,可以看到有三种类型的事件:ADDED,MODIFIED 和 DELETED。ADDED 表示创建了新的 Pod,Pod 的状态变化会产生 MODIFIED 类型的事件,DELETED 则表示 Pod 被删除。
利用 K8s 的 HTTP API,我们可以编写一个最简化版本的 “Controller”。例如下面的程序,该程序的实现逻辑和前面的 curl 请求是相同的,也是通过 HTTP GET 请求来 watch pod 资源。这个 “Controller” 只是用于展示 HTTP API 的 Watch 机制,其中并没有调谐的业务逻辑,只是将 HTTP Response 中收到的事件打印出来。
package main
import (
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"time"
)
const token = "eyJhbGciOiJSUzI1NiIsImtpZCI6ImFRM2J0Z3NmUk1hR2VhV2VRbE5vbkVHbGRSMUIwdEdTU3ZPb21TSXEtMkUifQ"
const apiServer = "https://127.0.0.1:55429"
type Pod struct {
Metadata struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
CreationTimestamp time.Time `json:"creationTimestamp"`
} `json:"metadata"`
}
type Event struct {
EventType string `json:"type"`
Object Pod `json:"object"`
}
func main() {
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
}
req, err := http.NewRequest(http.MethodGet, apiServer+"/api/v1/namespaces/default/pods?watch=true", nil)
if err != nil {
panic(err)
}
req.Header.Set("Authorization", "Brearer "+token)
resp, err := client.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()
var event Event
decoder := json.NewDecoder(resp.Body)
for {
if err := decoder.Decde(&event); err != nil {
panic(err)
}
fmt.Printf("%s Pod %s \n", event.EventType, event.Object.Metadata)
}
}
为了方便开发者使用,k8s 提供了对封装了 HTTP watch 机制的 go client。如果使用 k8s go client,几十行代码就可以实现一个简单的 Controller,如下所示:
package main
import (
"context"
"fmt"
v12 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
func main() {
// create a kubernetes API client
config, err := rest.InClusterConfig()
if err != nil {
panic(err.Error())
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// watch for changes to pods
watcher, err := clientset.CoreV1().Pods("").Watch(context.Background(), v1.ListOptions{})
if err != nil {
panic(err.Error())
}
// loop through events from the watcher
for event := range watcher.ResultChan() {
pod := event.Object.(*v12.Pod)
switch event.Type {
case watch.Added:
fmt.Printf("Pod %s added\n", pod.Name)
// todo: reconcile logic goes here
case watch.Modified:
fmt.Printf("Pod %s modified\n", pod.Name)
// todo: reconcile logic goes here
case watch.Deleted:
fmt.Printf("Pod %s deleted\n", pod.Name)
// todo: reconcile logic goes here
}
}
}
Informer机制
采用 k8s HTTP API 可以查询 K8s API 资源对象并 Watch 其变化,但大量的 HTTP 调用会对 API Server 造成较大的负荷,而且网络调用可能存在较大的延迟。除此之外,开发者还需要在程序中处理资源的缓存,HTTP 链接出问题后的重连等。为了解决这些问题并简化 Controller 的开发工作,K8s 在 client go 中提供了一个 informer 客户端库。
在 Kubernetes 中,Informer 是一个客户端库,用于监视 Kubernetes API 服务器中的资源并将它们的当前状态缓存到本地。Informer 提供了一种方法,让客户端应用程序可以高效地监视资源的更改,而无需不断地向 API 服务器发出请求。
相比直接使用HTTP Watch,使用Kubernetes Informer有以下优势:
- 减少API服务器的负载:通过在本地缓存资源信息,Informer减少了需要向API服务器发出的请求数量。这可以防止API服务器过载而影响整个集群的性能。
- 提高应用程序性能:使用缓存的数据,客户端应用程序可以快速访问资源信息,而无需等待 API 服务器响应。这可以提高应用程序性能并减少延迟。
采用Informer库编写的Controller架构如下:
图中间的虚线将图分为上下两部分,其中上半部分是 Informer 库中的组件,下半部分则是使用 Informer 库编写的自定义 Controller 中的组件,这两部分一起组成了一个完整的 Controller。
采用 Informer 机制编写的 Controller 中的主要流程如下:
- Reflector采用K8s HTTP API List/Watch API Server中指定的资源。Reflector会先List资源,然后使用List接口返回的resouceVersion来watch后续的资源变化。
- Reflector将List得到的资源列表和后续的资源变化放到一个FIFO(先进先出)队列中。
- 使用List的结果刷新到FIFO队列
- 将Watch收到的事件加入到FIFO队列
- Informer在一个循环中从FIFO队列中拿出资源对象进行处理。
- Informer将从FIFO队列中拿出资源对象放到Infexer。
- Indexer 是 Informer 中的一个本地缓存,该缓存提供了索引功能(这是该组件取名为 Indexer 的原因),允许基于特定条件(如标签、注释或字段选择器)快速有效地查找资源。此处代码中的 clientState 就是 Indexer,来自于NewIndexerInformer方法中构建的 Indexer,该 Indexer 作为 clientState 参数传递给了 newInformer 方法。
- Indexer将收到的资源对象放入其内部的缓存ThreadSafeStore中。
- 回调Controller的ResourceEventHandler,将资源对象变化通知到应用逻辑。
- 在ResourceEventHandler对资源对象的变化进行处理。ResourveEventHandler处于用户的Controller代码中,k8s推荐的编程范式是将收到的消息放入到一个队列中,然后在一个循环中处理队列的消息,执行调谐逻辑。推荐该模式的原因是采用对象可以解耦消息生产者(Informer)和消费者(Controller调谐逻辑),避免消费者阻塞生产者。在用户代码中需要注意几点:
- 前面我们已经讲到,Reflector 会使用 List 的结果刷新 FIFO 队列,因此 ResourceEventHandler 收到的资源变化消息其实包含了 Informer 启动时获取的完整资源列表,Informer 会采用 ADDED 事件将列表的资源通知到用户 Controller。该机制屏蔽了 List 和 Watch 的细节,保证用户的 ResourceEventHandler 代码中会接收到 Controller 监控的资源的完整数据,包括启动 Controller 前已有的资源数据,以及之后的资源变化。
- ResourceEventHandler 中收到的消息中只有资源对象的 key,用户在 Controller 中可以使用该 key 为关键字,通过 Indexer 查询本地缓存中的完整资源对象。
接下来我们来剖析一下client-go官方给出的实例学习代码:
https://github.com/kubernetes/client-go/blob/master/examples/workqueue/main.go#L215
- 在启动 Controller 时需要调用
informer.Run(stopCh)
方法(参见 107 行)。该方法会调用 Reflector 的 ListAndWatch 方法。ListAndWatch 首先采用 HTTP List API 从 K8s API Server 获取当前的资源列表,然后调用 HTTP Watch API 对资源变化进行监控,并把 List 和 Watch 的收到的资源通过 ResourceEventHandlerFuncs 的 AddFunc UpdateFunc DeleteFunc 三个回调接口分发给 Controller。 - 在开始对队列中的资源事件进行处理之前,先调用
cache.WaitForCacheSync(stopCh, c.informer.HasSynced)
(参见 110 行)。正如其方法名所示,该方法确保 Informer 的本地缓存已经和 K8s API Server 的资源数据进行了同步。当 Reflector 成功调用 ListAndWatch 方法从 K8s API Server 获取到需要监控的资源数据并保存到本地缓存后,会将c.informer.HasSynced
设置为 true。在开始业务处理前调用该方法可以确保在本地缓存中的资源数据是和 K8s API Server 中的数据一致的。 - 在对事件进行处理之后,需要调用
queue.Done(key)
方法将事件从队列中删除,以避免重复处理。 - 如果处理时发生异常,可以通过
c.queue.AddRateLimited(key)
将出错事件的 key 重新加入到队列中。该方法会对重新加入队列的错误消息进行限流,缺省的限流规则是 10 qps。这意味着当 1 秒内出错的消息大于 10 条时,10 条后的错误消息就会在等待一段时间后才会被重新加入到队列中(参见 74 行的 handleErr 方法)。
SharedInformer
如果在一个应用中有多处相互独立的业务逻辑都需要监控同一种资源对象,用户会编写多个 Informer 来进行处理。这会导致应用中发起对 K8s API Server 同一资源的多次 ListAndWatch 调用,并且每一个 Informer 中都有一份单独的本地缓存,增加了内存占用。
K8s 在 client go 中基于 Informer 之上再做了一层封装,提供了 SharedInformer 机制。采用 SharedInformer 后,客户端对同一种资源对象只会有一个对 API Server 的 ListAndWatch 调用,多个 Informer 也会共用同一份缓存,减少了对 API Server 的请求,提高了性能。
SharedInformerFactory 中有一个 Informer Map。当应用代码调用 InformerFactory 获取某一资源类型的 Informer 时, SharedInformer 会判断该类型的 Informer 是否存在,如果不存在就新建一个 Informer 并保存到该 Map 中,如果已存在则直接返回该 Informer(参见 SharedInformerFactory 的 InformerFor 方法)。因此应用中所有从 InformerFactory 中取出的同一类型的 Informer 都是同一个实例。
采用Controller来处理自定义CRD
通过之前的讲解,我们了解到了 如何编写一个Controller来监控和处理Kubernetes中内置的Pod资源对象。采用同样的方法我们也可以编写一个Controller来处理自定义的CRD资源对象。
我们首先使用下面的 yaml 片段来在 Kubernetes 中创建一个自定义 CRD。该 yaml 文件中定义了名为 Foo 的自定义资源,该资源的 Spec 中有 deployment 和 replica 两个属性,可以看出是对 Deployment 的一个简单封装,即将一个 Deployment 的副本数设置为指定的数量。
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: foos.samplecontroller.k8s.io
# for more information on the below annotation, please see
# https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/2337-k8s.io-group-protection/README.md
annotations:
"api-approved.kubernetes.io": "unapproved, experimental-only; please get an approval from Kubernetes API reviewers if you're trying to develop a CRD in the *.k8s.io or *.kubernetes.io groups"
spec:
group: samplecontroller.k8s.io
versions:
- name: v1alpha1
served: true
storage: true
schema:
# schema used for validation
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
deploymentName:
type: string
replicas:
type: integer
minimum: 1
maximum: 10
status:
type: object
properties:
availableReplicas:
type: integer
# subresources for the custom resource
subresources:
# enables the status subresource
status: {}
names:
kind: Foo
plural: foos
scope: Namespaced
在 Kubernetes 中创建了 Foo 这个 CRD 之后,我们可以采用 kubectl 命令行工具创建/删除/修改该 CRD 对应的资源。例如下面的代码片段将创建一个 名为 example-foo
的 Foo 资源。该资源要求将 example-foo 这个 Deployment 的副本数设置为5个。
在前面章节的示例中,我们采用 Inoformer 机制来对 Pod 进行监控和调谐;类似地,我们也希望采用类似的方式对新建的该自定义 CRD Foo 进行处理。但是 Kubernetes client go 中只有 Kubernetes 原生的 API 对象相关的接口,并不能处理自定义 CRD。为了对自定义 CRD 进行访问,Kubernetes 提供了 k8s.io/code-generator 代码生成工具,我们可以使用该工具来生成创建 Informer 需要的相关框架代码,包括 clientset,informers,listers 和 API 对象中相关数据结构的 DeepCopy 方法。
为了使用 go-generator 工具来生成我们需要的 go-client 代码,我们先采用 go 来编写和该 CRD 对应的数据结构。如下面的代码片段所示,CRD 的结构中主要包含下列的内容:
- TypeMeta - CRD的Group,Version,Kind
- ObjectMeta - 标准的k8s metadata字段,包括name和namespace
- Spec - CRD中的自定义字段
- Status - Spec对应的状态
我们先来看一个示例:
/* source code from https://github.com/kubernetes/sample-controller/blob/master/pkg/apis/samplecontroller/v1alpha1/types.go */
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type Foo struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec FooSpec `json:"spec"`
Status FooStatus `json:"status"`
}
type FooSpec struct {
DeploymentName string `json:"deploymentName"`
Replicas *int32 `json:"replicas"`
}
type FooStatus struct {
AvailableReplicas int32 `json:"availableReplicas"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// FooList is a list of Foo resources
type FooList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`
Items []Foo `json:"items"`
}
可以看到,在定义 CRD 的 go 代码中有类似 // +...
的注释(称为 Tag),go-generator 会根据这些 Tag 来生成 k8s go client 框架代码。
我们需要在 doc.go 文件中使用一个全局 tag +k8s:deepcopy-gen=package
,来为整个 package 中的所有数据结构生成 DeepCopy 方法。 DeepCopy 方法对数据结构进行深拷贝,当你需要在代码中对该一个对象进行修改,而又不希望影响其他使用到该对象的代码时,可以先对对象进行一次 DeepCopy,拿到该对象的一个副本后再进行操作。
Kubernetes client 要求注册到 Scheme 中的 API 对象必须实现 runtime.Object
接口。因此除了该全局 Tag 之外,我们可以看到在上面代码片段的 Foo
和 FooList
数据结构中,还采用了 +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
本地 Tag 来告诉 deepcoy-gen
工具为这两个数据结构生成返回 runtime.Objec
对象的附加的 DeepCopyObject
方法。
+genclient
和 +groupName=samplecontroller.k8s.io
则被 client-gen
工具用于生成 clientsent。此外,我们还需要使用 informer-gen
和 lister-gen
为自定义 CRD 生成 informer
和 lister
代码。
Leader Election
在实际部署时,为了保证 Controller 的高可用,我们常常同时运行多个 Controller 实例。在这种情况下,多个 Controller 实例之间需要进行 Leader Election。被选中成为 Leader 的 Controller 实例才执行 Watch 和 Reconcile 逻辑,其余 Controller 处于等待状态。当 Leader 出现问题后,另一个实例会被重新选为 Leader,接替原 Leader 继续执行。
Kubernetes Client go 已经封装了上面描述的选举逻辑,我们可以直接使用封装后的代码,不必关心 Leader Election 的实现细节。这里了解一下就可以,我们开发肯定是使用kubebuilder,不可能这么硬撸的。