22.3 解读k8s服务发现源码

news2024/11/30 7:35:03

本节重点介绍 :

  • discovery.Manager服务发现管理员
    • 注册各个服务发现源
    • 启动各个服务发现源
    • 处理服务发现的结果
  • k8s服务发现
    • k8s-client informer机制

架构图补充

image.png

注册各个服务发现源

  • 位置 D:\go_path\src\github.com\prometheus\prometheus\discovery\manager.go
  • 去掉部分细节 , m.registerProviders代表注册服务发现源
func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
    	for name, scfg := range cfg {
    		failedCount += m.registerProviders(scfg, name)
    		discoveredTargets.WithLabelValues(m.name, name).Set(0)
    	}
}
  • 在registerProviders 调用各个服务发现源的NewDiscoverer方法,然后构造providers对象
		d, err := cfg.NewDiscoverer(DiscovererOptions{
			Logger: log.With(m.logger, "discovery", typ),
		})
		m.providers = append(m.providers, &provider{
			name:   fmt.Sprintf("%s/%d", typ, len(m.providers)),
			d:      d,
			config: cfg,
			subs:   []string{setName},
		})
  • 对应在k8s中就是,在 D:\go_path\src\github.com\prometheus\prometheus\discovery\kubernetes\kubernetes.go

k8s中的NewDiscoverer方法如下

// New creates a new Kubernetes discovery for the given role.
func New(l log.Logger, conf *SDConfig) (*Discovery, error) {
	if l == nil {
		l = log.NewNopLogger()
	}
	var (
		kcfg *rest.Config
		err  error
	)
	if conf.KubeConfig != "" {
		kcfg, err = clientcmd.BuildConfigFromFlags("", conf.KubeConfig)
		if err != nil {
			return nil, err
		}
	} else if conf.APIServer.URL == nil {
		// Use the Kubernetes provided pod service account
		// as described in https://kubernetes.io/docs/admin/service-accounts-admin/
		kcfg, err = rest.InClusterConfig()
		if err != nil {
			return nil, err
		}
		level.Info(l).Log("msg", "Using pod service account via in-cluster config")
	} else {
		rt, err := config.NewRoundTripperFromConfig(conf.HTTPClientConfig, "kubernetes_sd", config.WithHTTP2Disabled())
		if err != nil {
			return nil, err
		}
		kcfg = &rest.Config{
			Host:      conf.APIServer.String(),
			Transport: rt,
		}
	}

	kcfg.UserAgent = userAgent

	c, err := kubernetes.NewForConfig(kcfg)
	if err != nil {
		return nil, err
	}
	return &Discovery{
		client:             c,
		logger:             l,
		role:               conf.Role,
		namespaceDiscovery: &conf.NamespaceDiscovery,
		discoverers:        make([]discovery.Discoverer, 0),
		selectors:          mapSelector(conf.Selectors),
	}, nil
}
  • 如果 用户指定了 kubeconfig_file或者 api_server那么用制定的地址和配置初始化k8s的client
  • 不然使用rest.InClusterConfig方式配合service account初始化

discovery.manager中startProvider开启服务发现worker和结果处理任务

  • 代码如下
func (m *Manager) startProvider(ctx context.Context, p *provider) {
	level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
	ctx, cancel := context.WithCancel(ctx)
	updates := make(chan []*targetgroup.Group)

	m.discoverCancel = append(m.discoverCancel, cancel)

	go p.d.Run(ctx, updates)
	go m.updater(ctx, p, updates)
}

p.d.Run(ctx, updates) 开启服务发现worker

  • 对应k8s的Discovery.Run在 位置 D:\go_path\src\github.com\prometheus\prometheus\discovery\kubernetes\kubernetes.go
  • 以发现node为例,代码如下
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
    namespaces := d.getNamespaces()
    switch d.role {
        	case RoleNode:
        		nlw := &cache.ListWatch{
        			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
        				options.FieldSelector = d.selectors.node.field
        				options.LabelSelector = d.selectors.node.label
        				return d.client.CoreV1().Nodes().List(ctx, options)
        			},
        			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
        				options.FieldSelector = d.selectors.node.field
        				options.LabelSelector = d.selectors.node.label
        				return d.client.CoreV1().Nodes().Watch(ctx, options)
        			},
        		}
        		node := NewNode(
        			log.With(d.logger, "role", "node"),
        			cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncPeriod),
        		)
        		d.discoverers = append(d.discoverers, node)
        		go node.informer.Run(ctx.Done())
}
}
  • 解读一下:创建node的list 和watch方法,执行对应k8s-client informer的Run方法
  • K8S的informer模块封装list-watch API,用户只需要指定资源,编写事件处理函数,AddFunc,UpdateFunc和DeleteFunc等
  • 这样就可以实现对指定资源的变更监听了

执行具体role的run方法

	var wg sync.WaitGroup
	for _, dd := range d.discoverers {
		wg.Add(1)
		go func(d discovery.Discoverer) {
			defer wg.Done()
			d.Run(ctx, ch)
		}(dd)
	}
  • 对应role=node的run
// Run implements the Discoverer interface.
func (n *Node) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
	defer n.queue.ShutDown()

	if !cache.WaitForCacheSync(ctx.Done(), n.informer.HasSynced) {
		if ctx.Err() != context.Canceled {
			level.Error(n.logger).Log("msg", "node informer unable to sync cache")
		}
		return
	}

	go func() {
		for n.process(ctx, ch) {
		}
	}()

	// Block until the target provider is explicitly canceled.
	<-ctx.Done()
}

  • 调用node.process方法处理结果
func (n *Node) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool {
	keyObj, quit := n.queue.Get()
	if quit {
		return false
	}
	defer n.queue.Done(keyObj)
	key := keyObj.(string)

	_, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return true
	}

	o, exists, err := n.store.GetByKey(key)
	if err != nil {
		return true
	}
	if !exists {
		send(ctx, ch, &targetgroup.Group{Source: nodeSourceFromName(name)})
		return true
	}
	node, err := convertToNode(o)
	if err != nil {
		level.Error(n.logger).Log("msg", "converting to Node object failed", "err", err)
		return true
	}
	send(ctx, ch, n.buildNode(node))
	return true
}

  • 内部的send方法是将node结果发往ch,对应就是 discoveryManager.startProvider中的updates

discoveryManager.updater(ctx, p, updates) 处理服务发现的结果

  • 位置 D:\go_path\src\github.com\prometheus\prometheus\discovery\manager.go
func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
	for {
		select {
		case <-ctx.Done():
			return
		case tgs, ok := <-updates:
			receivedUpdates.WithLabelValues(m.name).Inc()
			if !ok {
				level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name)
				return
			}

			for _, s := range p.subs {
				m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
			}

			select {
			case m.triggerSend <- struct{}{}:
			default:
			}
		}
	}
}
  • 调用 更新对象到target map中
func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	if _, ok := m.targets[poolKey]; !ok {
		m.targets[poolKey] = make(map[string]*targetgroup.Group)
	}
	for _, tg := range tgs {
		if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics.
			m.targets[poolKey][tg.Source] = tg
		}
	}
}

updater和sender通过triggerSend通信,告诉 scrapeManager有新的target来了,开始采集

  • 位置 D:\go_path\src\github.com\prometheus\prometheus\cmd\prometheus\main.go
	{
		// Scrape manager.
		g.Add(
			func() error {
				// When the scrape manager receives a new targets list
				// it needs to read a valid config for each job.
				// It depends on the config being in sync with the discovery manager so
				// we wait until the config is fully loaded.
				<-reloadReady.C

				err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
				level.Info(logger).Log("msg", "Scrape manager stopped")
				return err
			},
			func(err error) {
				// Scrape manager needs to be stopped before closing the local TSDB
				// so that it doesn't try to write samples to a closed storage.
				level.Info(logger).Log("msg", "Stopping scrape manager...")
				scrapeManager.Stop()
			},
		)
	}

本节重点总结 :

image.png

  • discovery.Manager服务发现管理员
    • 注册各个服务发现源
    • 启动各个服务发现源
    • 处理服务发现的结果
  • k8s服务发现
    • k8s-client informer机制

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2188844.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ConcurrentHashMap 中的并行性

ConcurrentHashMap 在多线程应用程序中被广泛使用。多线程应用程序的示例包括在线游戏应用程序、聊天应用程序&#xff0c;它为应用程序增加了并发性的好处。为了使应用程序本质上更具并发性&#xff0c;ConcurrentHashMap 引入了一个名为“并行性”的概念。 在本文中&#xf…

飞机导航数据库资料

以上是从网上收集的飞机导航数据库的一些资料。现在放在百度网盘中。 链接&#xff1a;https://pan.baidu.com/s/1fDYuaB0DuyKmYt6C_lXvZQ?pwdkcqj 提取码&#xff1a;kcqj

ZTE RRC重建优化案例

ZTE RRC重建优化案例 随着移动通信网络的不断发展&#xff0c;用户对网络的稳定性和覆盖质量提出了更高的要求。尤其在LTE网络中&#xff0c;RRC&#xff08;Radio Resource Control&#xff09;连接的稳定性直接影响用户体验和业务连续性。然而&#xff0c;在实际网络环境中&a…

案例-表白墙简单实现

文章目录 效果展示初始画面提交内容后画面&#xff08;按键按下&#xff09; 代码区 效果展示 初始画面 提交内容后画面&#xff08;按键按下&#xff09; 代码区 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8">…

C++输⼊输出

1.<iostream> 是 Input Output Stream 的缩写&#xff0c;是标准的输⼊、输出流库&#xff0c;定义了标准的输⼊、输 出对象 2.std::cin 是 istream 类的对象&#xff0c;它主要⾯向窄字符&#xff08;narrow characters (of type char)&#xff09;的标准输 ⼊流。 3…

STL之priority_queue篇——深入剖析C++中优先队列的实现原理、核心特性及其底层机制

文章目录 前言一、补充内容&#xff1a;堆1.1 什么是堆1.2 堆的分类与性质1.3 堆的向下调整算法&#xff08;小根堆&#xff09;实现流程&#xff1a;代码&#xff1a; 1.4 堆的向上调整算法&#xff08;小根堆&#xff09;实现流程&#xff1a;代码&#xff1a; 1.5 数组建堆算…

eclpsexxx

Copyright?2001-2004 International Business Machines Corp. Guidelines Eclipse 用户界面指南 2.1 版 查看目录 作者&#xff1a;Nick Edgar, Kevin Haaland, Jin Li , Kimberley Peter 译者&#xff1a;Bobbie Wang&#xff0c;Qi Liang 最新更新: 2004年2月 注意 您…

kaggle实战2信用卡反欺诈逻辑回归模型案例1

信用卡欺诈案例 数据集下载地址 https://storage.googleapis.com/download.tensorflow.org/data/creditcard.csv 参考不平衡数据的分类 文章目录 只进行特征衍生&#xff0c;未进行数据标准化、上才样处理数据不平衡问题&#xff0c;得到的准确率和召回率居然很高如果不处理数据…

李宏毅 X 苹果书 自注意力机制 学习笔记上

self attention 是一种network架构使用场景&#xff1a;输入一组向量&#xff0c;这组向量的性质&#xff1a;数量有变化&#xff0c;序列长度不一 模型输入 文字处理&#xff1a; 模型输入&#xff1a;句子&#xff08;句子的长度&#xff0c;单词都不一样&#xff09;&am…

qt QMainWindow 自定义标题栏

可以使用setMenuWidget 来将自定义的标题栏 QWidget 设置进去就可以&#xff0c; 用来替代setMenu 菜单栏单一&#xff0c;自定义不高的问题

node_exporter使用textfile collector收集业务数据

上一篇文章讲了使用Pushgateway收集业务数据的方法&#xff0c;今天讲另外一种方式textfile collector The textfile collector is similar to the Pushgateway, in that it allows exporting of statistics from batch jobs. The Pushgateway should be used for service-leve…

解决ModuleNotFoundError: No module named ‘torchcrf‘

运行深度学习程序时候&#xff0c;出现报错&#xff1a;ModuleNotFoundError: No module named torchcrf 将 from torchcrf import CRF 改为 from TorchCRF import CRF

无设计器简单实例

目录 1、界面设计Qt5元对象系统1. **QObject 类**2. **QMetaObject**3. **信号和槽机制**4. **宏&#xff1a;Q_OBJECT**5. **动态属性**6. **反射机制**7. **元对象编译器&#xff08;MOC&#xff09;** 2、完成程序功能 1、界面设计 不点创建界面 在dialog.h中 #ifndef DIA…

树莓派5里使用protobuf

由于现在protobuf越来越复杂了&#xff0c;自己去编译&#xff0c;还是比较麻烦。 比如最新的V28版本&#xff0c;就会要求使用cmake或者bazel来编译了。 如果不要求使用最新的版本&#xff0c;直接使用系统里带的版本也是可以的。 可以进行如下操作&#xff1a; sudo apt …

【算法系列-链表】交换链表节点(反转 + 交换)

【算法系列-链表】交换链表节点(反转 交换) 文章目录 【算法系列-链表】交换链表节点(反转 交换)1. 反转链表1.1 思路分析&#x1f3af;1.2 解题过程&#x1f3ac;1.3 代码示例&#x1f330; 2. 两两交换链表中的节点2.1 思路分析&#x1f3af;2.2 解题过程&#x1f3ac;2.3 …

电器自动化入门08:隔离变压器、行程开关介绍及选型

视频链接&#xff1a;3.4 电工知识&#xff1a;三相交流异步电动机自动往返行程控制及控制变压器选型_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1PJ41117PW?p8&vd_sourceb5775c3a4ea16a5306db9c7c1c1486b5 1.隔离&#xff08;控制&#xff09;变压器 2.行程开…

C++_智能指针详解

什么是智能指针&#xff1f;为什么要有智能指针&#xff1f;到目前为止&#xff0c;我们编写的程序所使用的对象都有着严格定义的生命周期。比如说&#xff0c;全局对象在程序启动时分配&#xff0c;在程序结束时销毁&#xff1b;再比如说局部static对象在第一次使用前分配&…

4.5章节python中的break和continue语句的作用

在Python中&#xff0c;break 和 continue 是两个用于控制循环流程的关键字。它们提供了在特定条件下提前退出循环或跳过当前迭代并进入下一次迭代的机制。 一、break语句 break 语句用于立即终止当前的循环&#xff08;无论是 for 循环还是 while 循环&#xff09;&#xff…

最佳人力资源管理工具,6款热门产品功能对比

文章介绍了ZohoPeople、北森、i人事等六款主流人力资源管理系统&#xff0c;涵盖招聘、培训、考勤等功能&#xff0c;各有特点&#xff0c;适合不同规模企业需求。建议企业试用后选择&#xff0c;提高管理效率。 一、Zoho People Zoho People是一款强大的云端人力资源管理系统…

看Threejs好玩示例,学习创新与技术(LiquidRaymarching)

今天的示例有点超出我的想象&#xff0c;首先会科普下WGSL这种新的着色器脚本&#xff0c;然后说说示例《Liquid Raymarching Scene with Three.js Shading Language | Codrops (tympanus.net)》的技术流程。本示例最终呈现的效果如下。可以看到他跟QQ那个消息拖拽消灭的效果非…