Kubeedge:Metamanager源码速读(不定期更新)

news2024/11/25 14:48:22

Kubeedge源码版本:v1.15.1

在看Metamanager之前,先看一下Metamanager源码的目录结构(位于edge/pkg下)和官方文档:

目录结构如下面的两张图所示。请忽略绿色的文件高亮,这是Jetbrains goland对未提交修改的文件的标记。
在这里插入图片描述
在这里插入图片描述
然后简单地看一下官方文档。

官方文档宣称,Metamanager做的事情主要包括两类,分别是Edged和Edgehub之间的消息中介处理和消息持久化。

此外:

  • Metamanager抽象了一系列客户端接口,允许edged封装资源的变更信息并且发给Metamanager
  • Metamanager开启了一个HTTPServer用来接受k8s API的直连。

Metamanager基于下面的这些操作收发不同种类的message:

Insert
Update
Delete
Query
Response
NodeConnection
MetaSync

其中:

Insert操作(比如,新增一个pod)的主要流程图如下:
在这里插入图片描述

Update操作(比如云端给pod追加标签,或者edged检测到了pod的变更,向云上汇报)的主要流程图如下,根据变更的来源不同,有两种信息流动的流程:
在这里插入图片描述

Delete操作主要流程图如下:
在这里插入图片描述
在delete操作中,云端下达指令,edgehub转发,metamng会先把边缘里的数据删掉,然后把指令下发给edged。

Query操作主要允许edged查询本地的数据库缓存和云上(比如configmap/secret)的etcd。消息源可以拆成3个part(resKey/resType/resId),主要流程如下:
在这里插入图片描述

Response操作:就是上面那些图片里的请求对应的相应。

NodeConnection操作:edgehub会发送向边缘组件广播当前边缘节点的连接状态——告知云边是否连接。metamanager会在内存里维护这个信息,用于特定的操作(比如向云发送query)。

MetaSync操作:定期同步edge上pod的状态。

下面考察Metamanager的源码。从Start函数开始:

func (m *metaManager) Start() {
	if metaserverconfig.Config.Enable {
		imitator.StorageInit() // 初始化资源版本(RV)
		go metaserver.NewMetaServer().Start(beehiveContext.Done())
	}

	m.runMetaManager()
}

StorageInit做的事情主要是初始化RV。从边缘数据库metav2表里拿到最新的资源版本。

// StorageInit must be called before using imitator storage (run metaserver or metamanager)
func StorageInit() {
	m := new(v2.MetaV2)
	// get the most recent record as the init resource version
	_, err := dbm.DBAccess.QueryTable(v2.NewMetaTableName).OrderBy("-" + v2.RV).Limit(1).All(m)
	utilruntime.Must(err)
	DefaultV2Client.SetRevision(m.ResourceVersion)
}

然后,根据指定的配置生成一个metaserver:

func NewMetaServer() *MetaServer {
	ls := MetaServer{
		HandlerChainWaitGroup: new(utilwaitgroup.SafeWaitGroup),
		LongRunningFunc:       genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
		NegotiatedSerializer:  serializer.NewNegotiatedSerializer(),
		Factory:               handlerfactory.NewFactory(),
		Auth:                  buildAuth(),
	}
	return &ls
}

最后Start:

func (ls *MetaServer) Start(stopChan <-chan struct{}) {
	if kefeatures.DefaultFeatureGate.Enabled(kefeatures.RequireAuthorization) {
		ls.startHTTPSServer(stopChan)
	} else {
		ls.startHTTPServer(stopChan)
	}
}

HTTPSServer主要就是增加了openssl x509的那些密钥,用于构建安全的HTTP服务器。

为代码分析方便起见,我们忽略安全性部分,看一下HTTPServer里干了什么事情:

主要是指定了Handler,用于处理云端APIServer直连时收发的信息,最后启动了一个http服务器。

func (ls *MetaServer) startHTTPServer(stopChan <-chan struct{}) {
	h := ls.BuildBasicHandler()
	h = BuildHandlerChain(h, ls)
	s := http.Server{
		Addr:    metaserverconfig.Config.Server,
		Handler: h,
	}

	go func() { // 用于server的退出
		<-stopChan

		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // 把退出的消息往下传
		defer cancel()
		if err := s.Shutdown(ctx); err != nil {
			klog.Errorf("Server shutdown failed: %s", err)
		}
	}()

	klog.Infof("[metaserver]start to listen and server at http://%v", s.Addr)
	utilruntime.HandleError(s.ListenAndServe())
	// When the MetaServer stops abnormally, other module services are stopped at the same time.
	beehiveContext.Cancel()
}

具体的handler函数逻辑见下:

func (ls *MetaServer) BuildBasicHandler() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		ctx := req.Context()
		reqInfo, ok := apirequest.RequestInfoFrom(ctx)
		//klog.Infof("[metaserver]get a req(%v)(%v)", reqInfo.Path, reqInfo.Verb)
		//klog.Infof("[metaserver]get a req(\nPath:%v; \nVerb:%v; \nHeader:%+v)", reqInfo.Path, reqInfo.Verb, req.Header)
		if !ok {
			err := fmt.Errorf("invalid request")
			responsewriters.ErrorNegotiated(errors.NewInternalError(err), ls.NegotiatedSerializer, schema.GroupVersion{}, w, req)
			return
		}

		if reqInfo.IsResourceRequest {
			switch {
			case reqInfo.Verb == "get":
				ls.Factory.Get().ServeHTTP(w, req)
			case reqInfo.Verb == "list", reqInfo.Verb == "watch":
				ls.Factory.List().ServeHTTP(w, req)
			case reqInfo.Verb == "create":
				ls.Factory.Create(reqInfo).ServeHTTP(w, req)
			case reqInfo.Verb == "delete":
				ls.Factory.Delete().ServeHTTP(w, req)
			case reqInfo.Verb == "update":
				ls.Factory.Update(reqInfo).ServeHTTP(w, req)
			case reqInfo.Verb == "patch":
				ls.Factory.Patch(reqInfo).ServeHTTP(w, req)
			default:
				err := fmt.Errorf("unsupported req verb")
				responsewriters.ErrorNegotiated(errors.NewInternalError(err), ls.NegotiatedSerializer, schema.GroupVersion{}, w, req)
			}
			return
		}

		if passthrough.IsPassThroughPath(reqInfo.Path, reqInfo.Verb) {
			ls.Factory.PassThrough().ServeHTTP(w, req)
			return
		}

		err := fmt.Errorf("request[%s::%s] isn't supported", reqInfo.Path, reqInfo.Verb)
		responsewriters.ErrorNegotiated(errors.NewInternalError(err), ls.NegotiatedSerializer, schema.GroupVersion{}, w, req)
	})
}

最后,在Start函数里执行runMetaManager()

func (m *metaManager) runMetaManager() {
	go func() {
		for {
			select {
			case <-beehiveContext.Done():
				klog.Warning("MetaManager main loop stop")
				return
			default:
			}
			msg, err := beehiveContext.Receive(m.Name())
			if err != nil {
				klog.Errorf("get a message %+v: %v", msg, err)
				continue
			}
			klog.V(2).Infof("get a message %+v", msg)
			m.process(msg)
		}
	}()
}

主要的逻辑就是不断地从beehive框架那里拿一个message然后进行处理。但是处理的过程比较长。重点是这个process函数:

func (m *metaManager) process(message model.Message) {
	operation := message.GetOperation()

	switch operation {
	case model.InsertOperation:
		m.processInsert(message)
	case model.UpdateOperation:
		m.processUpdate(message)
	case model.PatchOperation:
		m.processPatch(message)
	case model.DeleteOperation:
		m.processDelete(message)
	case model.QueryOperation:
		m.processQuery(message)
	case model.ResponseOperation:
		m.processResponse(message)
	case constants.CSIOperationTypeCreateVolume,
		constants.CSIOperationTypeDeleteVolume,
		constants.CSIOperationTypeControllerPublishVolume,
		constants.CSIOperationTypeControllerUnpublishVolume:
		m.processVolume(message)
	default:
		klog.Errorf("metamanager not supported operation: %v", operation)
	}
}

可以看到,process函数会处理Insert、Update、Patch、Delete、Query、Response信息,以及处理和卷相关的信息。

我们以m.processUpdate(message)为例考察process的处理逻辑。主要是看kubeedge的源码,跳过migration的部分:

func (m *metaManager) processUpdate(message model.Message) {
	imitator.DefaultV2Client.Inject(message)

	msgSource := message.GetSource()
	_, resType, _ := parseResource(&message)
	if msgSource == modules.EdgedModuleName && resType == model.ResourceTypeLease {
		// 来自于edged的消息(需要转发到云上)并且type为lease(用于节点心跳)
		// edged里的kubelet会定时地向云端发送心跳信息,但是在边缘设备里需要对消息做更进一步的包裹
		if !connect.IsConnected() { // 云边断连就直接返回
			klog.Warningf("process remote failed, req[%s], err: %v", msgDebugInfo(&message), errNotConnected)
			feedbackError(fmt.Errorf("failed to process remote: %s", errNotConnected), message) // 把错误消息返回给edged
			return
		}
		m.processRemote(message) // 这个函数相当于由metamng代替edged发送消息,并且处理自动回复,当然processRemote的功能不止于此
		return
	}

	// 如果不是“edged的心跳信息”,比如是pod的更新信息,那么咱们自己处理一手
	if err := m.handleMessage(&message); err != nil { // 拿到消息后先经过m.handleMessage函数记录到db中
		feedbackError(err, message)
		return
	}

	// 根据edged的模块名,自行决定转发路径
	switch msgSource {
	case modules.EdgedModuleName:
		// For pod status update message, we need to wait for the response message
		// to ensure that the pod status is correctly reported to the kube-apiserver
		sendToCloud(&message)
		resp := message.NewRespByMessage(&message, OK)
		sendToEdged(resp, message.IsSync())
	case cloudmodules.EdgeControllerModuleName, cloudmodules.DynamicControllerModuleName:
		sendToEdged(&message, message.IsSync())
		resp := message.NewRespByMessage(&message, OK)
		sendToCloud(resp)
	case cloudmodules.DeviceControllerModuleName:
		resp := message.NewRespByMessage(&message, OK)
		sendToCloud(resp)
		message.SetRoute(modules.MetaGroup, modules.DeviceTwinModuleName)
		beehiveContext.Send(modules.DeviceTwinModuleName, message)
	case cloudmodules.PolicyControllerModuleName:
		resp := message.NewRespByMessage(&message, OK)
		sendToCloud(resp)
	default:
		klog.Errorf("unsupport message source, %s", msgSource)
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1632514.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用Pandas从Excel文件中提取满足条件的数据并生成新的文件

目录 一、引言 二、环境准备 三、读取Excel文件 四、数据筛选 五、保存为新的Excel文件 六、案例与代码总结 七、进阶用法与注意事项 八、结语 在数据处理的日常工作中&#xff0c;我们经常需要从大量数据中筛选出满足特定条件的数据集。Pandas是一个强大的Python数据分…

【产品经理】如果人人都是产品经理,那么如何提升自己的不可替代性?

任何职业都需要有危机感&#xff0c;不只是产品经理。 人有生老病死&#xff0c;相应的职场上也有升降变离。当乔布斯站在宇宙之巅望着芸芸众生说“活着就是为了改变世界”的时候&#xff0c;这话着实燃烧了我们一把。随之&#xff0c;马化腾、周鸿祎、张小龙、王小川等汹涌而入…

瓦片编辑器成功移植到小熊猫C++ 2.25.1版本,解决_findnext移植问题

移植之后出现绿色屏幕闪退 查了版本回滚直到不闪退&#xff0c;发现是在读取自定义文件上出问题 然后在找读取自定义文件函数&#xff0c;发现是读取图片部分出问题 然后就卡住了 调试半天&#xff0c;不是数据溢出&#xff0c;于是就看 函数_findnext,网上搜 ———_findn…

WebGL/Cesium 大空间相机抖动 RTE(Relative to Eye)实现原理简析

在浏览器中渲染大尺寸 3D 模型&#xff1a;Speckle 处理空间抖动的方法 WebGL/Cesium 大空间相机抖动 RTE(Relative to Eye)实现原理简析 注: 相机空间和视图空间 概念等效混用 1、实现的关键代码 const material new THREE.RawShaderMaterial({uniforms: {cameraPostion: {…

花生壳域名收费?那就用免费的dnsexit动态域名解析保姆级图文教程,效果杠杠的

免费dnsexit动态域名解析教程 在互联网上有很多不同的域名解析服务&#xff0c;其中dnsexit是一个流行的免费动态域名解析服务&#xff0c;它允许用户动态更新其IP地址&#xff0c;确保域名始终指向正确的服务器。以下是一个dnsexit动态域名解析的图文教程&#xff0c;帮助你了…

OpenCV 实现重新映射

返回:OpenCV系列文章目录&#xff08;持续更新中......&#xff09; 上一篇&#xff1a;OpenCV 实现霍夫圆变换 下一篇 :OpenCV实现仿射变换 目标 在本教程中&#xff0c;您将学习如何&#xff1a; 一个。使用 OpenCV 函数 cv&#xff1a;&#xff1a;remap 实现简单的重新…

AIGC遇到制造业

AIGC (AlGeneratedContet&#xff0c;即由AI生成的内容) &#xff0c;是指通过AI技术生成的各种文本、图片、音频和视频等形式的内容。AI技术的发展使得AI生成的内容在质量和多样性方面取得了巨大的进步。通过AI生成的内容&#xff0c;可以快速、批量地满足人们对于个性化、多样…

持续记录|UNIAPP适配APP遇到的问题以及解决方案

在使用UNIAPP开发APP的时候遇到的一些奇奇怪怪问题记录 组件样式丢失 问题&#xff1a;组件引入界面中&#xff0c;在小程序和H5环境下样式正常&#xff0c;而在APP中却出现高度异常问题 解决&#xff1a;增加view标签将组件包裹起来即可正常显示 解决前&#xff1a; 解决后…

考试实况:云计算HCIE考试中我是这样做的

大家好&#xff0c;我是誉天云计算HCIE学员黄同学&#xff0c;4月11日已成功获取到hcie证书。 考取云计算IE大概花费了4个半月时间&#xff08;3个月学习1个半月备考&#xff09;&#xff0c;由于我是脱产考试的&#xff0c;所以备考时间给自己定了1-1个半月时间考下来。整个课…

在config.json文件中配置出来new mars3d.graphic.PolylineCombine({大量线合并渲染类型的geojson图层

在config.json文件中配置出来new mars3d.graphic.PolylineCombine({大量线合并渲染类型的geojson图层 问题场景&#xff1a; 1.浏览官网示例的时候图层看到大量线数据合并渲染的示例 2.矢量数据较大量级的时候&#xff0c;这种时候怎么在config.json文件中尝试配置呢&#x…

【Java并发知识总结 | 第九篇】ThreadLocal总结

文章目录 9.ThreadLocal总结9.1ThreadLocal是什么&#xff1f;9.2ThreadLocal的作用&#xff1f;9.3使用ThreadLocal9.4ThreadLocal原理9.5ThreadLocal问题&#xff1a;内存泄漏/溢出9.6为什么key要设计成弱引用&#xff1f;9.7ThreadLocal中的强弱引用关系9.8ThreadLocalMap怎…

web安全---CSRF漏洞/OWASP-CSRFTester的使用

what 跨站请求伪造 Cross Site Request Forgery how 攻击者诱骗点击恶意网页&#xff0c;盗用&#xff08;伪造&#xff09;受害者的身份&#xff0c;以受害者的名义向服务器发送恶意请求,而这种恶意请求在服务端看起来是正常请求 CSRF&&XSS区别 他们最本质区别就…

利用RunnerGo数据大屏强化测试管理与决策

测试平台中的数据大屏在提供实时监控、统计分析、效率提升、制定策略和促进沟通等方面具有重要的意义。它为测试团队提供更全面、更直观的数据支持&#xff0c;有助于提高测试质量和效率&#xff0c;减少风险&#xff0c;并加强团队协作和沟通。 数据大屏也是RunnerGo的核心特…

21 Debian如何配置Apache2(1)配置文件摊开看

作者&#xff1a;网络傅老师 特别提示&#xff1a;未经作者允许&#xff0c;不得转载任何内容。违者必究&#xff01; Debian如何配置DNS服务&#xff08;2&#xff09;主从服务器 《傅老师Debian小知识库系列之20》——原创 前言 傅老师Debian小知识库特点&#xff1a; 1、…

LLM学习笔记-4

从Hugging Face加载预训练权重 因为每次训练都要有资源消耗 (GPU算力&#xff0c;还有时间成本&#xff09;&#xff0c;所以说及时保存模型是非常重要的。教大家如何去下载Hugging Face的模型进行生成文本 pip install transformers pip install tiktokenfrom importlib.me…

【树莓派】yolov5 Lite,目标检测,行人检测入侵报警,摄像头绑定

延续之前的程序&#xff1a; https://qq742971636.blog.csdn.net/article/details/138172400 文章目录 播放声音pygame不出声音怎么办&#xff08;调节音量&#xff09;树莓派上的音乐播放器&#xff08;可选&#xff09;命令行直接放歌&#xff08;尝试放mp3歌曲&#xff09; …

用vue3实现留言板功能

效果图&#xff1a; 代码&#xff1a; <script setup lang"ts"> import { ref } from vue;interface Message {name: string;phone: string;message: string; }const name ref<string>(); const phone ref<string>(); const message ref<st…

【SQL】❤️数据库理论加实践详细教程❤️实践出真知❤️

SQL(结构化查询语言) 基础部分 SQL作用 按照作用划分可以划分为四个模块&#xff0c;从而由此行文 DDL&#xff08;数据定义语言&#xff09;: DDL涉及的命令允许用户定义或修改数据库的结构。主要命令包括&#xff1a; CREATE&#xff1a;用于创建新的数据库对象&#xff0c;…

使用 BurpSuite 基于 Token 机制实施暴力破解

前言 Token是一种用于身份验证和授权的令牌&#xff0c;通常由服务器生成并发送给客户端&#xff0c;客户端在后续的请求中携带该令牌来进行身份验证和授权操作。Token的使用可以增强应用程序的安全性&#xff0c;避免了直接传递敏感凭证&#xff08;如用户名和密码&#xff0…

SpringMVC整体工作流程

. 用户发起一个请求&#xff0c;请求首先到达前端控制器前端控制器接收到请求后会调用处理器映射器&#xff0c;由此得知&#xff0c;这个请求该由哪一个Controller来进行处理(并未调用Controller)&#xff1b;前端控制器调用处理器适配器&#xff0c;告诉处理器适配器应该要…