在维护 Kubernetes 系统集群时,kubectl 应该是最常用的工具之一。从 Kubernetes 架构设计的角度看,kubectl 工具是 Kubernetes API Server 的客户端。它的主要工作是向 Kubernetes API Server 发起 HTTP 请求。Kubernetes 是一个完全以资源为中心的系统,而 kubectl 会通过发起 HTTP 请求来操纵这些资源,以控制 Kubernetes 系统集群。
Kubernetes 官方提供了命令行工具(CLI),用户可以通过 kubectl 以命令行交互的方式与 Kubernetes API Server 进行通信,通信协议使用 HTTP/JSON。
Cobra 命令行参数解析
Cobra 是一个创建强大的现代化 CLI 命令行应用程序的 Go 语言库,也可以用来生成应用程序的文件。很多知名的开源软件都使用 Cobra 实现其 CLI 部分,例如 Istio、Docker、Etcd 等。Cobra 提供了如下功能。
下面使用一个 Cobra Example 代码示例描述其应用步骤:
package main
import (
"fmt"
"github.com/spf13/cobra"
"os"
)
func main() {
var Version bool
var rootCmd = &cobra.Command{
Use: "root [sub]",
Short: "root command",
Run: func(cmd *cobra.Command, args []string) {
fmt.Println("Hello, World!: %v\n", args)
if Version {
fmt.Printf("Version:1.0\n")
}
},
}
flags := rootCmd.Flags()
flags.BoolVarP(&Version, "version", "v", false, "Print version information and quit")
if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
Cobra 基本应用步骤分为如下 3 步。
(1)创建 rootCmd 主命令,并定义 Run 执行函数(注意,此处是定义 Run 函数而非直接执行该函数)。也可以通过 rootCmd.AddCommand 方法添加子命令。
(2)为命令添加命令行参数(Flag)。
(3)执行 rootCmd 命令调用的函数,rootCmd.Execute 会在内部回调 Run 执行函数。
在 Kubernetes 系统中,Cobra 被广泛使用,Kubernetes 核心组件(kube-apiserver、kube-controller-manager、kube-scheduler、kubelet 等)都通过 Cobra 来管理 CLI 交互方式。Kubernetes 组件使用 Cobra 的方式类似,下面以 kubectl 为例,分析 Cobra 在 Kubernetes 中的应用,kubectl 命令行示例如图:
kubectl [command] [TYPE] [NAME] [flags]
kubectl ge pod pod_name -n kube-system
kubectl CLI 命令行结构分别为 Command、TYPE、NAME 及 Flag,分别介绍如下。
- Command:指定命令操作,例如 create、get、describe、delete 等。命令后面也可以加子命令,例如 kubectl config view。
- TYPE:指定资源类型,例如 pod、pods、rc 等。资源类型不区分大小写。
- NAME:指定资源名称,可指定多个,例如 name1 name2。资源名称需要区分大小写。
- Flag:指定可选命令行参数,例如 -n 命令行参数用于指定不同的命名空间。
同样,在 Kubernetes 中,Cobra 的应用步骤也分为 3 步:第 1 步,创建 Command;第 2 步,为 get 命令添加命令行参数;第 3 步,执行命令。kubectl 代码示例如下:
1.创建 Command
实例化 cobra.Command 对象,并通过 cmds.AddCommand 方法添加命令或子命令。每个 cobra.Command 对象都可设置 Run 执行函数,代码示例如下:
代码路径:vendor\k8s.io\kubectl\pkg\cmd\cmd.go
// NewKubectlCommand creates the `kubectl` command and its nested children.
func NewKubectlCommand(o KubectlOptions) *cobra.Command {
warningHandler := rest.NewWarningWriter(o.IOStreams.ErrOut, rest.WarningWriterOptions{Deduplicate: true, Color: term.AllowsColorOutput(o.IOStreams.ErrOut)})
warningsAsErrors := false
// Parent command to which all subcommands are added.
cmds := &cobra.Command{
Use: "kubectl",
Short: i18n.T("kubectl controls the Kubernetes cluster manager"),
Long: templates.LongDesc(`
kubectl controls the Kubernetes cluster manager.
Find more information at:
https://kubernetes.io/docs/reference/kubectl/`),
Run: runHelp,
// Hook before and after Run initialize and write profiles to disk,
// respectively.
PersistentPreRunE: func(*cobra.Command, []string) error {
rest.SetDefaultWarningHandler(warningHandler)
return initProfiling()
},
PersistentPostRunE: func(*cobra.Command, []string) error {
if err := flushProfiling(); err != nil {
return err
}
if warningsAsErrors {
count := warningHandler.WarningCount()
switch count {
case 0:
// no warnings
case 1:
return fmt.Errorf("%d warning received", count)
default:
return fmt.Errorf("%d warnings received", count)
}
}
return nil
},
}
// From this point and forward we get warnings on flags that contain "_" separators
// when adding them with hyphen instead of the original name.
cmds.SetGlobalNormalizationFunc(cliflag.WarnWordSepNormalizeFunc)
flags := cmds.PersistentFlags()
addProfilingFlags(flags)
flags.BoolVar(&warningsAsErrors, "warnings-as-errors", warningsAsErrors, "Treat warnings received from the server as errors and exit with a non-zero exit code")
kubeConfigFlags := o.ConfigFlags
if kubeConfigFlags == nil {
kubeConfigFlags = defaultConfigFlags
}
kubeConfigFlags.AddFlags(flags)
matchVersionKubeConfigFlags := cmdutil.NewMatchVersionFlags(kubeConfigFlags)
matchVersionKubeConfigFlags.AddFlags(flags)
// Updates hooks to add kubectl command headers: SIG CLI KEP 859.
addCmdHeaderHooks(cmds, kubeConfigFlags)
f := cmdutil.NewFactory(matchVersionKubeConfigFlags)
// Sending in 'nil' for the getLanguageFn() results in using
// the LANG environment variable.
//
// TODO: Consider adding a flag or file preference for setting
// the language, instead of just loading from the LANG env. variable.
i18n.LoadTranslations("kubectl", nil)
// Proxy command is incompatible with CommandHeaderRoundTripper, so
// clear the WrapConfigFn before running proxy command.
proxyCmd := proxy.NewCmdProxy(f, o.IOStreams)
proxyCmd.PreRun = func(cmd *cobra.Command, args []string) {
kubeConfigFlags.WrapConfigFn = nil
}
// Avoid import cycle by setting ValidArgsFunction here instead of in NewCmdGet()
getCmd := get.NewCmdGet("kubectl", f, o.IOStreams)
getCmd.ValidArgsFunction = utilcomp.ResourceTypeAndNameCompletionFunc(f)
groups := templates.CommandGroups{
{
Message: "Basic Commands (Beginner):",
Commands: []*cobra.Command{
create.NewCmdCreate(f, o.IOStreams),
expose.NewCmdExposeService(f, o.IOStreams),
run.NewCmdRun(f, o.IOStreams),
set.NewCmdSet(f, o.IOStreams),
},
},
{
Message: "Basic Commands (Intermediate):",
Commands: []*cobra.Command{
explain.NewCmdExplain("kubectl", f, o.IOStreams),
getCmd,
edit.NewCmdEdit(f, o.IOStreams),
delete.NewCmdDelete(f, o.IOStreams),
},
},
{
Message: "Deploy Commands:",
Commands: []*cobra.Command{
rollout.NewCmdRollout(f, o.IOStreams),
scale.NewCmdScale(f, o.IOStreams),
autoscale.NewCmdAutoscale(f, o.IOStreams),
},
},
{
Message: "Cluster Management Commands:",
Commands: []*cobra.Command{
certificates.NewCmdCertificate(f, o.IOStreams),
clusterinfo.NewCmdClusterInfo(f, o.IOStreams),
top.NewCmdTop(f, o.IOStreams),
drain.NewCmdCordon(f, o.IOStreams),
drain.NewCmdUncordon(f, o.IOStreams),
drain.NewCmdDrain(f, o.IOStreams),
taint.NewCmdTaint(f, o.IOStreams),
},
},
{
Message: "Troubleshooting and Debugging Commands:",
Commands: []*cobra.Command{
describe.NewCmdDescribe("kubectl", f, o.IOStreams),
logs.NewCmdLogs(f, o.IOStreams),
attach.NewCmdAttach(f, o.IOStreams),
cmdexec.NewCmdExec(f, o.IOStreams),
portforward.NewCmdPortForward(f, o.IOStreams),
proxyCmd,
cp.NewCmdCp(f, o.IOStreams),
auth.NewCmdAuth(f, o.IOStreams),
debug.NewCmdDebug(f, o.IOStreams),
},
},
{
Message: "Advanced Commands:",
Commands: []*cobra.Command{
diff.NewCmdDiff(f, o.IOStreams),
apply.NewCmdApply("kubectl", f, o.IOStreams),
patch.NewCmdPatch(f, o.IOStreams),
replace.NewCmdReplace(f, o.IOStreams),
wait.NewCmdWait(f, o.IOStreams),
kustomize.NewCmdKustomize(o.IOStreams),
},
},
{
Message: "Settings Commands:",
Commands: []*cobra.Command{
label.NewCmdLabel(f, o.IOStreams),
annotate.NewCmdAnnotate("kubectl", f, o.IOStreams),
completion.NewCmdCompletion(o.IOStreams.Out, ""),
},
},
}
groups.Add(cmds)
filters := []string{"options"}
// Hide the "alpha" subcommand if there are no alpha commands in this build.
alpha := NewCmdAlpha(f, o.IOStreams)
if !alpha.HasSubCommands() {
filters = append(filters, alpha.Name())
}
templates.ActsAsRootCommand(cmds, filters, groups...)
utilcomp.SetFactoryForCompletion(f)
registerCompletionFuncForGlobalFlags(cmds, f)
cmds.AddCommand(alpha)
cmds.AddCommand(cmdconfig.NewCmdConfig(clientcmd.NewDefaultPathOptions(), o.IOStreams))
cmds.AddCommand(plugin.NewCmdPlugin(o.IOStreams))
cmds.AddCommand(version.NewCmdVersion(f, o.IOStreams))
cmds.AddCommand(apiresources.NewCmdAPIVersions(f, o.IOStreams))
cmds.AddCommand(apiresources.NewCmdAPIResources(f, o.IOStreams))
cmds.AddCommand(options.NewCmdOptions(o.IOStreams.Out))
// Stop warning about normalization of flags. That makes it possible to
// add the klog flags later.
cmds.SetGlobalNormalizationFunc(cliflag.WordSepNormalizeFunc)
return cmds
}
NewKubectlCommand 函数实例化了 cobra.Command 对象,templates.CommandGroups 定义了 kubectl 的 8 种命令类别,即基础命令(初级)、基础命令(中级)、部署命令、集群管理命令、故障排查和调试命令、高级命令及设置命令,最后通过 cmds.AddCommand 函数添加第 8 种命令类别——其他命令。以基础命令(中级)下的 get 命令为例,get 命令的 Command 定义如下:
代码路径:vendor\k8s.io\kubectl\pkg\cmd\get\get.go
// NewCmdGet creates a command object for the generic "get" action, which
// retrieves one or more resources from a server.
func NewCmdGet(parent string, f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.Command {
o := NewGetOptions(parent, streams)
cmd := &cobra.Command{
Use: fmt.Sprintf("get [(-o|--output=)%s] (TYPE[.VERSION][.GROUP] [NAME | -l label] | TYPE[.VERSION][.GROUP]/NAME ...) [flags]", strings.Join(o.PrintFlags.AllowedFormats(), "|")),
DisableFlagsInUseLine: true,
Short: i18n.T("Display one or many resources"),
Long: getLong + "\n\n" + cmdutil.SuggestAPIResources(parent),
Example: getExample,
// ValidArgsFunction is set when this function is called so that we have access to the util package
Run: func(cmd *cobra.Command, args []string) {
cmdutil.CheckErr(o.Complete(f, cmd, args))
cmdutil.CheckErr(o.Validate())
cmdutil.CheckErr(o.Run(f, cmd, args))
},
SuggestFor: []string{"list", "ps"},
}
o.PrintFlags.AddFlags(cmd)
cmd.Flags().StringVar(&o.Raw, "raw", o.Raw, "Raw URI to request from the server. Uses the transport specified by the kubeconfig file.")
cmd.Flags().BoolVarP(&o.Watch, "watch", "w", o.Watch, "After listing/getting the requested object, watch for changes.")
cmd.Flags().BoolVar(&o.WatchOnly, "watch-only", o.WatchOnly, "Watch for changes to the requested object(s), without listing/getting first.")
cmd.Flags().BoolVar(&o.OutputWatchEvents, "output-watch-events", o.OutputWatchEvents, "Output watch event objects when --watch or --watch-only is used. Existing objects are output as initial ADDED events.")
cmd.Flags().BoolVar(&o.IgnoreNotFound, "ignore-not-found", o.IgnoreNotFound, "If the requested object does not exist the command will return exit code 0.")
cmd.Flags().StringVar(&o.FieldSelector, "field-selector", o.FieldSelector, "Selector (field query) to filter on, supports '=', '==', and '!='.(e.g. --field-selector key1=value1,key2=value2). The server only supports a limited number of field queries per type.")
cmd.Flags().BoolVarP(&o.AllNamespaces, "all-namespaces", "A", o.AllNamespaces, "If present, list the requested object(s) across all namespaces. Namespace in current context is ignored even if specified with --namespace.")
addOpenAPIPrintColumnFlags(cmd, o)
addServerPrintColumnFlags(cmd, o)
cmdutil.AddFilenameOptionFlags(cmd, &o.FilenameOptions, "identifying the resource to get from a server.")
cmdutil.AddChunkSizeFlag(cmd, &o.ChunkSize)
cmdutil.AddLabelSelectorFlagVar(cmd, &o.LabelSelector)
cmdutil.AddSubresourceFlags(cmd, &o.Subresource, "If specified, gets the subresource of the requested object.", supportedSubresources...)
return cmd
}
在 cobra.Command 对象中,Use、Short、Long 和 Example 包含描述命令的信息,其中最重要的是定义了 Run 执行函数。Run 执行函数中定义了 get 命令的实现。Cobra 中的 Run 函数家族成员由很多,在 Run 函数之前调用(PreRun)或之后调用(PostRun)。它们的执行顺序为 PersistentPreRun -> PreRun -> Run -> PostRun -> PersistentPostRun
。
2.为 get 命令添加命令行参数
get 命令下支持的命令行参数较多,这里以 --all-namespaces
参数为例,代码示例如下:
cmd.Flags().BoolVarP(&o.AllNamespaces, "all-namespaces", "A", o.AllNamespaces, "If present, list the requested object(s) across all namespaces. Namespace in current context is ignored even if specified with --namespace.")
cmd.Flags 实现了命令行参数的解析:第 1 个参数,接受命令行参数的变量;第 2 个参数,指定命令行参数的名称;第 3 个参数,指定命令行参数的名称简称;第 4 个参数,设置命令行参数的默认值;第 5 个参数,设置命令行参数的提示信息。
3.执行命令
代码路径:cmd\kubectl\kubectl.go
func main() {
command := cmd.NewDefaultKubectlCommand()
if err := cli.RunNoErrOutput(command); err != nil {
// Pretty-print the error and exit with an error.
util.CheckErr(err)
}
}
kubectl 的 main 函数中定义了执行函数 command.Execute,原理是对命令行中的所有参数解析出 Command 和 Flag,把 Flag 作为参数传递给 Command 并执行。command.Execute -> ExecuteC
代码示例如下:
...
cmd, flags, err = c.Find(args)
...
err = cmd.execute(flags)
args 数组中包含所有命令行参数,通过 c.Find 函数解析出 cmd 和 flags,然后通过 cmd.execute 执行命令中定义的 Run 执行函数。
创建资源对象的过程
Deployment 是一种常见的资源对象。在 Kubernetes 系统中创建资源对象有很多种方法。本节将用 kubectl create 命令创建 Deployment 资源对象的过程进行分析。kubectl 资源对象创建过程如图:
使用kubectl创建资源对象是Kubernetes中最常见的操作之一,内部运行原理是客户端与服务端进行一次HTTP请求的交互。Kubernetes整个系统架构的设计方向是通用和具有高扩展性,所以以上功能在代码实现上略微复杂。
创建资源对象的流程可分为:
- 实例化 Factory 接口
- 通过 Builder 和 Visitor 将资源对象描述文件(deployment.yaml)文本格式转换成资源对象。
- 将资源对象以 HTTP 请求的方式发送给 kube-apiserver,并得到响应结果。
- 最终根据 Visitor 匿名函数集的 errors 判断是否成功创建了资源对象。
1. 编写资源对象描述文件
Kubernetes 系统的资源对象可以使用 JSON 或 YAML 文件来描述,一般使用 YAML 文件居多。下面提供了一个简单的 Deployment Example 资源对象文件:
本文主要了解 k8s 创建资源对象的过程,因此代码使用截图代替。
通过kubectl create命令与kube-apiserver交互并创建资源对象,执行命令如下:
kubectl create -f nginx-deployment.yaml
2. 实例化 Factory 接口
在执行每一个 kubectl 命令之前,都需要执行实例化 cmdutil.Factory接口对象的操作。Factory 是一个通用对象,它提供了与 kube-apiserver 的交互方式,以及验证资源对象等方法。cmdutil.Factory 接口代码示例如下:
源码使用最新版本k8s进行阅读: vendor/k8s.io/kubectl/pkg/cmd/cmd.go:332
cmdutil.Factory 接口说明如下:
• DynamicClient:动态客户端。
• KubernetesClientSet:ClientSet 客户端。
• RESTClient:RESTClient 客户端。
• NewBuilder:实例化 Builder,Builder 用于将命令行获取的参数转换成资源对象。
• Validator:验证资源对象。
cmdutil.Factory 接口封装了 3 种 client-go 客户端与 kube-apiserver 交互的方式,分别是 DynamicClient、KubernetesClientSet(简称ClientSet)及 RESTClient。3 种交互方式各有不同的应用场景。
3. Builder构建资源对象
Builder用于将命令行获取的参数转换成资源对象(Resource Object)。它实现了一种通用的资源对象转换功能。Builder结构体保存了命令行获取的各种参数,并通过不同函数处理不同参数,将其转换成资源对象。Builder的实现类似于Builder建造者设计模式,提供了一种实例化对象的最佳方式。代码示例如下: vendor/k8s.io/kubectl/pkg/cmd/create/create.go:251
首先通过 f.NewBuilder
实例化 Builder 对象,通过函数Unstructured、Schema、ContinueOnError、NamespaceParam、FilenameParam、LabelSelectorParam、Flatten 对参数赋值和初始化,将参数保存到 Builder 对象中。最后通过 Do 函数完成对资源的创建。
其中,FilenameParam 函数用于识别 kubectl create
命令行参数是通过哪种方式传入资源对象描述文件的,kubectl 目前支持3种方式:
- 第1种,标准输入Stdin(即cat deployment.yaml|kubectl create-f);
- 第2种,本地文件(即kubectl create-f deployment.yaml);
- 第3种,网络文件(即kubectl create-f http:///deployment.yaml)。
4. Visitor多层匿名函数嵌套
在 Builder Do 函数中,Result 对象中的结果由 Visitor 执行并产生,Visitor 的设计模式类似于 Visitor 访问者模式。Visitor 接口定义如下:vendor/k8s.io/cli-runtime/pkg/resource/interfaces.go:94
Visitor 接口包含 Visit 方法,实现了 Visit(VisitorFunc) error
的结构体都可以成为 Visitor。其中,VisitorFunc 是一个匿名函数,它接收 Info 与 error 信息,Info 结构用于存储 RESTClient 请求的返回结果,而 VisitorFunc 匿名函数则生成或处理 Info 结构。
Visitor 的设计较为复杂,并非单纯实现了访问者模式,它相当于一个匿名函数集。在 Kubernetes 源码中,Visitor 被设计为可以多层嵌套(即多层匿名函数嵌套,使用一个 Visitor 嵌套另一个 Visitor)。直接阅读 Visitor 源码,会比较晦涩,为了更好地理解 Visitor 的工作原理,这里提供了代码示例。Visitor Example 代码示例如下:
在 Visitor Example 代码示例中,定义了 Visitor 接口,增加了 VisitorList 对象,该对象相当于多个 Visitor 匿名函数的集合。另外,增加了 3 个 Visitor 的类,分别实现 Visit 方法,在每一个 VisitorFunc执行之前(before)和执行之后(after)分别输出 print 信息。Visitor Example代码执行结果输出如下:
通过 Visitor 代码示例的输出,能够更好地理解 Visitor 的多层嵌套关系。在 main 函数中,首先将 Visitor1 嵌入 VisitorList 中,VisitorList 是 Visitor 的集合,可存放多个 Visitor。然后将 VisitorList 嵌入 Visitor2 中,接着将 Visitor2 嵌入 Visitor3 中。最终形成 Visitor3{Visitor2{VisitorList{Visitor1}}}
的嵌套关系。
根据输出结果,最先执行的是 Visitor1 中 fn 匿名函数之前的代码,然后是 VisitorList、Visitor2 和 Visitor3 中 fn 匿名函数之前的代码。紧接着执行 VisitFunc(visitor.Visit)
。最后执行 Visitor3、Visitor2、VisitorList、Visitor1 的 fn 匿名函数之后的代码。整个多层嵌套关系的执行过程有些类似于递归操作。
多层嵌套关系理解起来有点困难,如果读者看过电影《盗梦空间》的话,该过程可以类比为其中的场景。每次执行 Visitor 相当于进入盗梦空间中的另一层梦境,在触发执行了 visitFunc return 后,就开始从每一层梦境中苏醒过来。
回到 Kubernetes 源码中的 Visitor,再次阅读源码时,就容易理解了。Visitor 中的 VisitorList(存放 Visitor 的集合)有两种,定义在vendor/k8s.io/cli-runtime/pkg/resource/visitor.go
中,代码示例如下:
- EagerVisitorList:当遍历执行 Visitor 时,如果遇到错误,则保留错误信息,继续遍历执行下一个 Visitor。最后一起返回所有错误。
- VisitorList:当遍历执行 Visitor 时,如果遇到错误,则立刻返回。
Kubernetes Visitor中存在多种实现方法,不同实现方法的作用不同
下面将资源创建(kubectl create -f yaml/deployment.yaml
)过程中的 Visitor 多层匿名函数嵌套关系整理了出来:
EagerVisitorList{FileVisitor{StreamVisitor{FlattenListVisitor{FlattenListVisitor{ContinueOnErrorVisitor{DecoratedVisitor{result.Visit{}}}}}}}}
EagerVisitorList 是 Visitor 集合,集合中包含 FileVisitor 和 StreamVisitor,执行 FileVisitor 和 StreamVisitor 并保留执行后的 error 信息,然后继续执行下面的 Visitor。FileVisitor 和 StreamVisitor将资源对象描述文件(deployment.yaml)的内容通过 infoForData 函数转换成 Info 对象。FlattenListVisitor 将资源对象描述文件中定义的资源类型转换成 Info 对象。ContinueOnErrorVisitor 将 Visitor 调用过程中产生的错误保留在 []error
中。DecoratedVisitor 会执行注册过的 VisitorFunc,分别介绍如下。
resource.SetNamespace
:设置命名空间(Namespace),确保每个Info对象都有命名空间。resource.RequireNamespace
:设置命名空间,并检查资源对象描述文件中提供的命名空间与命令行参数(-namespace
)提供的命名空间是否相符,如果不相符则返回错误。resource.RetrieveLazy
:如果info.Object
为空,则根据info
的 Namepsace 和 Name 等字段调用 Helper 获取 obj,并更新 info 的 Object 字段。
由result.Visit
执行 createAndRefresh:
第1步,通过Helper.Create
向 kube-apiserver 发送创建资源的请求,Helper 对 client-go 的 RESTClient 进行了封装,在此基础上实现了 Get、List、Watch、Delete、Create、Patch、Replace 等方法,实现了与 kube-apiserver 的交互功能;
第2步,将与 kube-apiserver 交互后得到的结果通过info.Refresh
函数更新到info.Object
中。最后逐个退出 Visitor,其过程为
DecoratedVisitor→ContinueOnErrorVisitor → FlattenListVisitor →FlattenListVisitor → StreamVisitor →FileVisitor→EagerVisitorList
。
最终根据 Visitor 的 error 信息为空判断创建资源请求执行成功。