如何使用 client-go 监听自定义资源
在 Kubernetes 中使用 client-go
监听自定义资源(Custom Resource,简称 CR)需要借助 Dynamic Client 或 Custom Informer,因为 client-go
的标准 Clientset
只支持内置资源(如 Pod、Deployment)。自定义资源由 CustomResourceDefinition(CRD)或 Operator 定义,监听它们需要动态处理其 Group、Version 和 Resource(GVR)。以下是详细步骤和实现方法。
前提条件
- CRD 已部署:确保你的自定义资源定义(CRD)已在集群中注册。
- 示例:
myresource.example.com/v1
,Kind 为MyResource
。
- 示例:
- 依赖:
client-go
(推荐与集群版本匹配,例如v0.28.0
对应 Kubernetes 1.28)。- 添加依赖:
go get k8s.io/client-go@v0.28.0
- 权限:确保 ServiceAccount 有权访问 CRD(通过 RBAC 配置)。
方法 1:使用 Dynamic Client 和 Informer
Dynamic Client
是 client-go
提供的通用客户端,支持任意资源类型。结合 SharedInformer
,可以监听自定义资源。
步骤
-
初始化 Dynamic Client:
package main import ( "context" "log" "time" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" ) func getDynamicClient() dynamic.Interface { config, err := rest.InClusterConfig() if err != nil { config, err = clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig") if err != nil { log.Fatalf("Failed to create config: %v", err) } } client, err := dynamic.NewForConfig(config) if err != nil { log.Fatalf("Failed to create dynamic client: %v", err) } return client }
-
定义 GVR:
- 指定自定义资源的 Group、Version 和 Resource。
- 示例:
myresource.example.com/v1
,资源名为myresources
。gvr := schema.GroupVersionResource{ Group: "example.com", Version: "v1", Resource: "myresources", }
-
创建 Dynamic Informer:
func main() { client := getDynamicClient() // 创建 Dynamic Informer Factory factory := dynamicinformer.NewDynamicSharedInformerFactory(client, time.Minute*30) informer := factory.ForResource(gvr).Informer() // 添加事件处理函数 informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { unstructuredObj := obj.(*unstructured.Unstructured) name := unstructuredObj.GetName() namespace := unstructuredObj.GetNamespace() log.Printf("CR Added: %s/%s", namespace, name) }, UpdateFunc: func(oldObj, newObj interface{}) { oldUnstructured := oldObj.(*unstructured.Unstructured) newUnstructured := newObj.(*unstructured.Unstructured) log.Printf("CR Updated: %s/%s", newUnstructured.GetNamespace(), newUnstructured.GetName()) }, DeleteFunc: func(obj interface{}) { unstructuredObj := obj.(*unstructured.Unstructured) log.Printf("CR Deleted: %s/%s", unstructuredObj.GetNamespace(), unstructuredObj.GetName()) }, }) // 启动 Informer ctx, cancel := context.WithCancel(context.Background()) defer cancel() factory.Start(ctx.Done()) factory.WaitForCacheSync(ctx.Done()) // 保持运行 <-ctx.Done() }
-
获取缓存数据:
lister := factory.ForResource(gvr).Lister() items, err := lister.List(labels.Everything()) if err != nil { log.Printf("Failed to list CRs: %v", err) } else { for _, item := range items { unstructuredObj := item.(*unstructured.Unstructured) log.Printf("Current CR: %s/%s", unstructuredObj.GetNamespace(), unstructuredObj.GetName()) } }
说明
- GVR:通过
kubectl api-resources
查看自定义资源的准确 GVR。 - Unstructured:自定义资源以
unstructured.Unstructured
类型返回,需手动解析字段。 - 依赖:需要导入
k8s.io/client-go/dynamic/informer
。
方法 2:生成类型化客户端和 Informer(推荐生产环境)
如果你的 CRD 有明确的 Go 类型(通过代码生成器生成),可以使用类型化的客户端和 Informer。这种方法需要更多前期工作,但更安全和直观。
步骤
-
生成代码:
- 使用
controller-tools
或k8s.io/code-generator
生成 CRD 的客户端代码。 - 示例 CRD 文件(
myresource_v1.yaml
):apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: name: myresources.example.com spec: group: example.com names: kind: MyResource plural: myresources scope: Namespaced versions: - name: v1 served: true storage: true schema: openAPIV3Schema: type: object properties: spec: type: object properties: replicas: type: integer
- 生成命令:
mkdir -p pkg/apis/example.com/v1 controller-gen crd paths=./pkg/apis/example.com/v1 output:crd:dir=./manifests controller-gen object paths=./pkg/apis/example.com/v1 k8s.io/code-generator/generate-groups.sh all ./pkg/client ./pkg/apis example.com:v1
- 使用
-
注册类型:
- 在
pkg/apis/example.com/v1/types.go
中定义类型:package v1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type MyResourceSpec struct { Replicas int32 `json:"replicas"` } type MyResource struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` Spec MyResourceSpec `json:"spec,omitempty"` } type MyResourceList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` Items []MyResource `json:"items"` }
- 在
-
创建 Informer:
package main import ( "context" "log" "time" "k8s.io/client-go/tools/cache" examplev1 "your/module/pkg/apis/example.com/v1" exampleclientset "your/module/pkg/client/clientset/versioned" exampleinformers "your/module/pkg/client/informers/externalversions" ) func main() { config, err := rest.InClusterConfig() if err != nil { log.Fatalf("Failed to create config: %v", err) } client, err := exampleclientset.NewForConfig(config) if err != nil { log.Fatalf("Failed to create clientset: %v", err) } factory := exampleinformers.NewSharedInformerFactory(client, time.Minute*30) informer := factory.Example().V1().MyResources().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { myResource := obj.(*examplev1.MyResource) log.Printf("MyResource Added: %s/%s, Replicas: %d", myResource.Namespace, myResource.Name, myResource.Spec.Replicas) }, UpdateFunc: func(oldObj, newObj interface{}) { newResource := newObj.(*examplev1.MyResource) log.Printf("MyResource Updated: %s/%s", newResource.Namespace, newResource.Name) }, DeleteFunc: func(obj interface{}) { myResource := obj.(*examplev1.MyResource) log.Printf("MyResource Deleted: %s/%s", myResource.Namespace, myResource.Name) }, }) ctx, cancel := context.WithCancel(context.Background()) defer cancel() factory.Start(ctx.Done()) factory.WaitForCacheSync(ctx.Done()) <-ctx.Done() }
说明
- 类型安全:使用生成的类型(如
*examplev1.MyResource
),避免手动解析。 - 依赖:需要自定义的客户端包(
pkg/client
)。 - 复杂度:前期生成代码较繁琐,但长期维护更方便。
注意事项
- CRD 注册:
- 确保 CRD 已应用(
kubectl apply -f myresource_v1.yaml
)。 - 检查:
kubectl get crd myresources.example.com
- 确保 CRD 已应用(
- 权限:
- 为 ServiceAccount 配置 RBAC:
apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: name: myresource-reader rules: - apiGroups: ["example.com"] resources: ["myresources"] verbs: ["get", "list", "watch"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: name: myresource-reader-binding subjects: - kind: ServiceAccount name: default namespace: default roleRef: kind: ClusterRole name: myresource-reader apiGroup: rbac.authorization.k8s.io
- 为 ServiceAccount 配置 RBAC:
- 性能:
- 使用
WithNamespace
或标签过滤减少监听范围。
- 使用
- 错误处理:
- Watch 失败时,Informer 会自动重试,需关注日志。
验证
-
创建自定义资源:
apiVersion: example.com/v1 kind: MyResource metadata: name: test-resource namespace: default spec: replicas: 3
kubectl apply -f test-resource.yaml
-
运行程序,观察日志输出:
MyResource Added: default/test-resource, Replicas: 3
总结
- Dynamic Client:
- 适合快速实现,无需生成代码。
- 使用
unstructured.Unstructured
处理数据。
- 类型化客户端:
- 适合生产环境,类型安全,需生成代码。
- 使用特定类型(如
*MyResource
)操作。
- 选择建议:
- 测试或简单场景:Dynamic Client。
- 长期项目或 Operator:类型化客户端。
如果你有具体的 CRD 定义或需求(例如监听特定字段),可以告诉我,我会进一步定制代码!
---