【kubernetes】kubernetes中的Controller

news2024/12/22 17:38:55

1 什么是Controller?

kubernetes采用了声明式API,与声明式API相对应的是命令式API:

  • 声明式API:用户只需要告诉期望达到的结果,系统自动去完成用户的期望
  • 命令式API:用户需要关注过程,通过命令一步一步完成用户的需求

因此,用户向k8s提交的yaml文件中最重要的部分就是spec,相当于就是用户期望的结果,而使用-o yaml选项查看时,还有一个很重要的部分就是status,它表示的就是当前状态,因此,k8s主要任务就是完成status->spec的转变。这项工作就是Controller(控制器)完成的。

对于不同的资源,控制逻辑是不一样的,因此,就有很多Controller,例如,DeploymentController负责将Deployment的status向spec进行转变,ReplicaSetController负责将ReplicaSet的status向spec进行转变。

从上面可以看出,Controller的工作方式如下:

  • 监听资源变化
  • 得到资源的当前状态status和期望状态spec
  • 执行逻辑使得status->spec

下面以Deployment的创建操作为例说明整个流程:
请添加图片描述

通过上图重新复习下各组件的工作方式:

  • apiserver:为其他组件提供接口,并且所有的组件都通过apiserver进行交互
  • etcd:存储集群的资源对象
  • Controller Manager:管理控制器,Watch -> Analyze -> Act,监听资源的变化,分析出spec和status的差别,执行操作使得status向spec转变
  • Scheduler:监听资源的变化,如果发现未调度的Pod,通过一定的策略选择出Node,设置Pod的Node字段
  • Kubelet:监听调度给当前Node的Pod,并执行对应的操作

可以发现,除了apiserver和etcd,其他组件都可以称为Controller。

2 Controller的实现

知道了Controller的工作方式,如果是我们自己实现Controller,可以会这样来实现:

请添加图片描述

Controller直接通过Apiserver的接口监控对应资源的变化,当资源发生变化时,直接执行对应的业务逻辑,也就是调协循环。

这样会有啥问题呢?

当集群中Node很多时,就会有很多kubelet监控Pod的状态变化,而所有的监听操作都需要通过apiserver,那么apiserver的压力就会很大,就会造成集群的不稳定。

当然,其他资源(例如,Pod或者服务)很多时,同样会造成集群不稳定。

因此,k8s的client-go(client-go)库采用了另外的设计:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

client-go components:

  • Reflector:对特定类型的资源执行ListAndWatch,当监听到资源变更时,通过API获取最新的资源对象,然后将它放到Delta Fifo queue队列中
  • Informer:从Delta Fifo queue队列中弹出对象,然后调用Indexer放到Store里面,同时调用用户提交的回调函数(ResourceEventHandler)
  • Indexer:用于操作Store中的对象

Custom Controller components:

  • Informer Reference和Indexer Reference都是对client-go中对象的引用,用户控制器可以通过cache库直接创建或者使用Factory工厂函数创建
  • ResourceEventHandler:用户控制器接收对象的回调函数,一般来说,里面的逻辑就是,获取对象的key,然后将key写入WorkQueue
  • WorkQueue:用户控制器创建的队列,负责存储用户控制器需要处理的对象的key
  • Process Item:从WorkQueue中读取key,通过key获取对应的对象

上图是通常会给出的关于Controller的实际实现的逻辑,初看还是挺复杂的,大致的模块和功能如下:

请添加图片描述

于是,Controller实现的步骤如下:

  • 获取Informer和Indexer的引用,指定要监控变更的资源类型,注册ResourceEventHandler,并创建WorkQueue,用上述的3个对象初始化我们自己的Controller
  • 编写Process Item Loop,从WorkQueue中读取key,然后执行我们自己的业务逻辑

因此,整个Controller我们需要注入的逻辑只有2个部分,其他都是相对固定的:

  • ResourceEventHandler
  • Process Item

3 Controller的使用

上面介绍了k8s中的Controller的实现,而要使用

下面对client-go中的workqueue的例子进行分析:

workqueue example by client-go

type Controller struct {
	indexer  cache.Indexer // Indexer,缓存的索引
	queue    workqueue.RateLimitingInterface // 带限速功能的WorkQueue
	informer cache.Controller // Informer
}

// 创建控制器
func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
	return &Controller{
		informer: informer,
		indexer:  indexer,
		queue:    queue,
	}
}

// worker的具体执行逻辑
func (c *Controller) processNextItem() bool {
	// 从workqueue中获取key
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	
	// 告诉队列已经处理完毕
	defer c.queue.Done(key)

	err := c.syncToStdout(key.(string))
	
	// 错误处理
	c.handleErr(err, key)
	return true
}

// 控制器的业务逻辑,这里就执行status->spec的转变
func (c *Controller) syncToStdout(key string) error {
	obj, exists, err := c.indexer.GetByKey(key)
	if err != nil {
		klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
		return err
	}

	if !exists {
		// Pod已经不存在
		fmt.Printf("Pod %s does not exist anymore\n", key)
	} else {
		// 这里执行status->spec的转变逻辑
		fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
	}
	return nil
}

// 错误处理,包含重试处理
func (c *Controller) handleErr(err error, key interface{}) {
	if err == nil {
		// 处理完毕
		c.queue.Forget(key)
		return
	}

	// 如果出现问题,会进行重试,也就是重新入workqueue
	// 但是,入workqueue不超过5次
	if c.queue.NumRequeues(key) < 5 {
		klog.Infof("Error syncing pod %v: %v", key, err)

		// 重新入workqueue
		c.queue.AddRateLimited(key)
		return
	}

	c.queue.Forget(key)
	runtime.HandleError(err)
	klog.Infof("Dropping pod %q out of the queue: %v", key, err)
}

// 启动我们自己的控制器
func (c *Controller) Run(workers int, stopCh chan struct{}) {
	defer runtime.HandleCrash()

	defer c.queue.ShutDown()

	// 启动Informer开始监听资源变化
	go c.informer.Run(stopCh)

	// 等待cache同步
	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
		return
	}

	// 运行若干个worker,
	// wait.Until(),每隔1秒执行runWorker()函数,直到stopCh收到结束信号
	for i := 0; i < workers; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	// 读取结束信号,结束控制器
	<-stopCh
	klog.Info("Stopping Pod controller")
}

func (c *Controller) runWorker() {
	for c.processNextItem() {
	}
}

func main() {
	var kubeconfig string
	var master string

	flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
	flag.StringVar(&master, "master", "", "master url")
	flag.Parse()

	// 通过master和kubeconfig生成配置对象
	config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
	if err != nil {
		klog.Fatal(err)
	}

	// 根据配置对象生成clientset,用于连接k8s
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		klog.Fatal(err)
	}

	// 创建Pod的watcher
	podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())

	// 创建workqueue
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// 创建Indexer和Informer,其中重要的是两个参数,Pod的watcher和回调函数
	// 告知Informer,我们只监听Pod的资源变化,并且,给Infomer注册回调函数
	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				queue.Add(key)
			}
		},
		DeleteFunc: func(obj interface{}) {
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
	}, cache.Indexers{})

	// 创建我们自己的控制器
	controller := NewController(queue, indexer, informer)

	// 启动控制器
	stop := make(chan struct{})
	defer close(stop)
	go controller.Run(1, stop)

	// Wait forever
	select {}
}

参考资料:

1 client-go under the hood

2 client-go Examples

3 k8s-client-go demo

4 writing controllers

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1059851.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

EdgeView 4 for Mac:重新定义您的图像查看体验

您是否厌倦了那些功能繁杂、操作复杂的图像查看器&#xff1f;您是否渴望一款简单、快速且高效的工具&#xff0c;以便更轻松地浏览和管理您的图像库&#xff1f;如果答案是肯定的&#xff0c;那么EdgeView 4 for Mac将是您的理想之选&#xff01; EdgeView 4是一款专为Mac用户…

【C语言】结构体内存对齐

结构体内存对齐是很重要的一个考点&#xff0c;但不难掌握&#xff0c;接下来就来了解一下结构体内存对齐 目录 对齐规则&#xff1a;偏移量&#xff1a;例题&#xff1a;为什么存在内存对齐? 对齐规则&#xff1a; 首先来看一下对齐规则 第一个成员在与结构体变量偏移量为0…

交叉编译和GCC编译器

目录 交叉编译 hello.c文件 提问 GCC编译器 GCC编译过程 GCC常用选项 编译多个文件 预处理 编译 汇编 链接 交叉编译 hello.c文件 #include <stdio.h>int main(int argc, char argv) {if(argc > 2)printf("Hello, %s!\n", argv[1]);elseprintf…

【Spring Boot】创建一个 Spring Boot 项目

创建一个 Spring Boot 项目 1. 安装插件2. 创建 Spring Boot 项目3. 项目目录介绍和运行注意事项 1. 安装插件 IDEA 中安装 Spring Boot Helper / Spring Assistant / Spring Initializr and Assistant插件才能创建 Spring Boot 项⽬ &#xff08;有时候不用安装&#xff0c;直…

JVM:如何通俗的理解并发的可达性分析

并发的可达性分析 前面在介绍对象是否已死那一节有说到可达性分析算法&#xff0c;它理论上是要求全过程都基于一个能保障一致性的快照&#xff08;类比 MySQL 的MVCC&#xff09;中才能够进行分析&#xff0c;也就意味着必须全程冻结用户线程的运行&#xff08;STW&#xff0…

实战型开发1/3--结果业务导向

假期难得一段时间把近期一些实战型开发的阅读&#xff0c;实践做一些小结&#xff1b; 风格方面就是包括不限于一些好的开发实践&#xff0c;nb的开发技术流程等&#xff0c;但是总体着力于实战型的开发&#xff1b; 三层视角 业务&团队视角&#xff1a;开发所要最终服务…

单调栈---基础数据结构与算法

简介 栈 (stack) 又名堆栈&#xff0c;是一种数据结构&#xff0c;向一个栈插入新元素又称作进栈、入栈或压栈&#xff0c;从一个栈删除元素又称作出栈或退栈。 栈是一种只允许在表尾进行插入和删除操作的线性表&#xff0c;也就是我们所说的后进先出&#xff0c;我们把栈想象…

软件测试工程师经典面试题,金九银十可以跳槽了。

大家好&#xff0c;前两天跟朋友感慨&#xff0c;今年的铜九铁十、裁员、导致好多人都没拿到offer&#xff01;现在互联网大厂终于迎来了应届生集中求职季。 对于想跳槽的职场人来说&#xff0c;绝对是个找工作的好时机。这时候&#xff0c;很多高薪技术岗、管理岗的缺口和市场…

想要精通算法和SQL的成长之路 - 并查集的运用和案例(省份数量)

想要精通算法和SQL的成长之路 - 并查集的运用 前言一. 并查集的使用和模板1.1 初始化1.2 find 查找函数1.3 union 合并集合1.4 connected 判断相连性1.5 完整代码 二. 运用案例 - 省份数量 前言 想要精通算法和SQL的成长之路 - 系列导航 一. 并查集的使用和模板 先说一下并查集…

记住这份软件测试八股文还怕不能拿offer?你值得拥有

前言 2023秋招即将来临&#xff0c;很多同学会问软件测试面试八股文有必要背吗&#xff1f; 我的回答是&#xff1a;很有必要。你可以讨厌这种模式&#xff0c;但你一定要去背&#xff0c;因为不背你就进不了大厂。 国内的互联网面试&#xff0c;恐怕是现存的、最接近科举考试…

计算机竞赛 车道线检测(自动驾驶 机器视觉)

0 前言 无人驾驶技术是机器学习为主的一门前沿领域&#xff0c;在无人驾驶领域中机器学习的各种算法随处可见&#xff0c;今天学长给大家介绍无人驾驶技术中的车道线检测。 1 车道线检测 在无人驾驶领域每一个任务都是相当复杂&#xff0c;看上去无从下手。那么面对这样极其…

【MySQL】表的约束(一)

文章目录 为什么要有约束一. 空属性二. 默认值三. 列描述四. zerofill结束语 为什么要有约束 数据库是用来存放数据的&#xff0c;所以其需要保证数据的完整性和可靠性 数据类型也算是一种约束&#xff0c;比如&#xff0c;整型的数据无法插入字符型。 通过约束&#xff0c;让…

超详细!手把手带你实现一个完整的Promise

Promise是JavaScript中异步编程的解决方案&#xff0c;一开始在社区中提出和实现&#xff0c;后来ECMAScript将其写进了标准中。Promise有效的解决了异步编程的回调地狱问题&#xff0c;非常受开发者的欢迎。 本文首先介绍了JavaScript中异步编程的几种方式&#xff0c;再对Pr…

Vue学习之页面上中下三层布局

Vue学习之页面上中下三层布局 页面布局&#xff1a;头部&#xff0c;内容区&#xff0c;尾部&#xff0c;其中头部和尾部几乎所有页面都有&#xff0c;可抽成公共组件&#xff0c;内容区是可变的&#xff0c;由路由组件展示 页面效果 实现 &#xff08;1&#xff09;app.vue &…

为什么在使用PageHelper插件时,指定的每页记录数大小失效?显示所有的记录数

1.问题现象&#xff1a; 这里指定每页显示5条&#xff0c;却把所有的记录数都显示出来了 2.分析&#xff1a; 之前是可以的&#xff0c;然后发现&#xff1a;PageHelper.startPage(pageNum,pageSize) 和执行sql的语句 顺序颠倒了&#xff0c;然后就出错了。 3.验证&#xf…

十天学完基础数据结构-第四天(链表(Linked List))

链表的基本概念 链表是一种线性数据结构&#xff0c;与数组不同&#xff0c;链表的元素&#xff08;节点&#xff09;之间通过指针相互连接。链表有以下基本概念&#xff1a; 节点&#xff1a;链表中的每个数据项称为节点&#xff0c;每个节点包含数据和一个指向下一个节点的指…

【Unity2022】Unity实现手机游戏操控摇杆(实现操控轮盘)

文章目录 运行效果预览创建物体脚本获取RectTransform处理玩家拖动事件完整代码 获取输入运行其他文章 运行效果预览 首先展示一下本文章实现的效果&#xff1a; 创建物体 创建两个UI图像&#xff0c;一个用于表示背景&#xff0c;作为父物体&#xff0c;命名为JoyStick&am…

ubuntu安装ROS rosdep init rosdep update报错,完美解决方案!

ubuntu安装ROS rosdep init rosdep update报错&#xff0c;终于让我发现完美解决方法啦&#xff01;清华源解决 问题的原因完美解决&#xff01; 问题的原因 rosdep init&#xff0c;rosdep update报错的根本原因还是国内网络连不上外网。所以改DNS之类的方法都是比较偶然能成…

Windows安装Docker并创建Ubuntu环境及运行神经网络模型

目录 前言在Windows上安装Docker在Docker上创建Ubuntu镜像并运行容器创建Ubuntu镜像配置容器&#xff0c;使其可以在宿主机上显示GUI 创建容器并运行神经网络模型创建容器随便找一个神经网络模型试试 总结 前言 学生党一般用个人电脑玩神经网络&#xff0c;估计很少有自己的服…

nginx下载与安装教程

文章目录 nginx简介nginx的主要应用场景nginx开源项目的源码结构 使用centos7安装nginx检查centos版本号和linux内核版本检查是否安装gcc、pcre、zlib、openssl等依赖 安装nginx启动nginx停止nginx重启nginx nginx简介 nginx是一款业内流行、功能强大的web服务器。 高性能&…