go-zero的服务发现源码阅读

news2025/1/23 2:01:58

服务发现原理与grpc源码解析_wangxiaoangg的博客-CSDN博客

 

go-zero rpc demo官方文档:rpc编写与调用 | go-zero

目录

一 服务注册

1. 创建rpc服务

2. 启动rpc服务

3. registerEtcd做了什么

4. discov.NewPublisher 服务发布者

二 服务发现

1.定义&注册resolver

2.解析etcd地址&创建链接

3.update方法


一 服务注册

在看rpc服务端服务注册前,可以先看下go-zero的官方的 user rpc服务 demo。

在rpc的配置文件中配置了Etcd信息,以及服务对应的key,如下:user.yaml

Name: user.rpc
ListenOn: 127.0.0.1:8080
Etcd:
  Hosts:
    - $etcdHost
  Key: user.rpc


1. 创建rpc服务


创建rpc服务调用了 zrpc/internal/rpcpubserver.go 中 NewRpcPubServer方法。

该方法返回一个server对象,并将registerEtcd方法注入到该sever。

// NewRpcPubServer returns a Server.
func NewRpcPubServer(etcd discov.EtcdConf, listenOn string, middlewares ServerMiddlewaresConf,
	opts ...ServerOption) (Server, error) {
	registerEtcd := func() error {
		pubListenOn := figureOutListenOn(listenOn)
		var pubOpts []discov.PubOption
		if etcd.HasAccount() {
			pubOpts = append(pubOpts, discov.WithPubEtcdAccount(etcd.User, etcd.Pass))
		}
		if etcd.HasTLS() {
			pubOpts = append(pubOpts, discov.WithPubEtcdTLS(etcd.CertFile, etcd.CertKeyFile,
				etcd.CACertFile, etcd.InsecureSkipVerify))
		}
		if etcd.HasID() {
			pubOpts = append(pubOpts, discov.WithId(etcd.ID))
		}
		pubClient := discov.NewPublisher(etcd.Hosts, etcd.Key, pubListenOn, pubOpts...)
		return pubClient.KeepAlive()
	}
	server := keepAliveServer{
		registerEtcd: registerEtcd,
		Server:       NewRpcServer(listenOn, middlewares, opts...),
	}

	return server, nil
}

2. 启动rpc服务

在启动Server的时候,调用Start方法,在Start方法中会调用registerEtcd进行真正的服务注册。
go-zerozrpc/internal/rpcpubserver.go


type keepAliveServer struct {
	registerEtcd func() error
	Server
}

func (s keepAliveServer) Start(fn RegisterFn) error {
	if err := s.registerEtcd(); err != nil {
		return err
	}

	return s.Server.Start(fn)
}

3. registerEtcd做了什么

	registerEtcd := func() error {
        //解析服务监听的地址
		pubListenOn := figureOutListenOn(listenOn)
		var pubOpts []discov.PubOption
		//etcd的链接方式
        if etcd.HasAccount() {
			pubOpts = append(pubOpts, discov.WithPubEtcdAccount(etcd.User, etcd.Pass))
		}
		if etcd.HasTLS() {
			pubOpts = append(pubOpts, discov.WithPubEtcdTLS(etcd.CertFile, etcd.CertKeyFile,
				etcd.CACertFile, etcd.InsecureSkipVerify))
		}
		if etcd.HasID() {
			pubOpts = append(pubOpts, discov.WithId(etcd.ID))
		}

        //新建puslisher
		pubClient := discov.NewPublisher(etcd.Hosts, etcd.Key, pubListenOn, pubOpts...)
	    //异步etcd 保活
    	return pubClient.KeepAlive()
	}

4. discov.NewPublisher 服务发布者

代码路径core/discov/publisher.go

在KeepAlive方法中,
1.首先创建etcd连接,
2.用register方法进行服务注册。
3.register创建租约,租约默认时间为10秒钟
4.最后通过Put方法进行注册。
5.调用 keepAliveAsync 进行租约的续期,保证服务一直是存活的状态,如果服务异常退出了,那么也就无法进行续期,服务发现也就能自动识别到该服务异常下线了。

// KeepAlive keeps key:value alive.
func (p *Publisher) KeepAlive() error {
	cli, err := p.doRegister()
	if err != nil {
		return err
	}

	proc.AddWrapUpListener(func() {
		p.Stop()
	})

	return p.keepAliveAsync(cli)
}


func (p *Publisher) doRegister() (internal.EtcdClient, error) {
    //链接etcd
	cli, err := internal.GetRegistry().GetConn(p.endpoints)
	if err != nil {
		return nil, err
	}

	p.lease, err = p.register(cli)
	return cli, err
}

func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
    //创建租约

	resp, err := client.Grant(client.Ctx(), TimeToLive)
	if err != nil {
		return clientv3.NoLease, err
	}

	lease := resp.ID
	if p.id > 0 {
		p.fullKey = makeEtcdKey(p.key, p.id)
	} else {
		p.fullKey = makeEtcdKey(p.key, int64(lease))
	}
    //put key 注册
	_, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))

	return lease, err
}


//异步续租 保活
func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
	ch, err := cli.KeepAlive(cli.Ctx(), p.lease)
	if err != nil {
		return err
	}

	threading.GoSafe(func() {
		for {
			select {
			case _, ok := <-ch:
				if !ok {
					p.revoke(cli)
					if err := p.doKeepAlive(); err != nil {
						logx.Errorf("etcd publisher KeepAlive: %s", err.Error())
					}
					return
				}
			case <-p.pauseChan:
				logx.Infof("paused etcd renew, key: %s, value: %s", p.key, p.value)
				p.revoke(cli)
				select {
				case <-p.resumeChan:
					if err := p.doKeepAlive(); err != nil {
						logx.Errorf("etcd publisher KeepAlive: %s", err.Error())
					}
					return
				case <-p.quit.Done():
					return
				}
			case <-p.quit.Done():
				p.revoke(cli)
				return
			}
		}
	})

	return nil
}

二 服务发现

前面的已经介绍了,rpc服务启动时候是如何将服务注册到etcd中的。

在rpc的服务调用方 配置服务提供方的Etcd信息,以及服务对应的key,如下:user.yaml

Name: search-api
Host: 0.0.0.0
Port: 8889
Auth:
  AccessSecret: $AccessSecret
  AccessExpire: $AccessExpire
UserRpc:
  Etcd:
    Hosts:
      - $etcdHost
    Key: user.rpc

1.定义&注册resolver

go-zero的服务发现是在客户端实现的。在创建zRPC客户端的时候,通过init方法进行了自定义Resolver的注册。

go-zero/zrpc/internal/client.go

func init() {
	resolver.Register()
}

 zrpc/resolver/internal/resolver.go

// RegisterResolver registers the direct and discov schemes to the resolver.
func RegisterResolver() {
	resolver.Register(&directResolverBuilder)
	resolver.Register(&discovResolverBuilder)
	resolver.Register(&etcdResolverBuilder)
	resolver.Register(&k8sResolverBuilder)
}

gozero注册了四个revlover builder 这里我们只看etcd reslover。

2.解析etcd地址&创建链接

首先从target中解析出etcd的地址,和服务对应的key。然后创建etcd连接,接着执行update方法,在update方法中,通过调用cc.UpdateState方法进行服务状态的更新。 

zrpc/resolver/internal/discovbuilder.go

func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (
	resolver.Resolver, error) {
	hosts := strings.FieldsFunc(targets.GetAuthority(target), func(r rune) bool {
		return r == EndpointSepChar
	})
	sub, err := discov.NewSubscriber(hosts, targets.GetEndpoints(target))
	if err != nil {
		return nil, err
	}

	update := func() {
		var addrs []resolver.Address
		for _, val := range subset(sub.Values(), subsetSize) {
			addrs = append(addrs, resolver.Address{
				Addr: val,
			})
		}
		if err := cc.UpdateState(resolver.State{
			Addresses: addrs,
		}); err != nil {
			logx.Error(err)
		}
	}
	sub.AddListener(update)
	update()

	return &nopResolver{cc: cc}, nil
}

3.update方法

update方法会被添加到事件监听中,当有PUT和DELETE事件触发,都会调用update方法进行服务状态的更新,事件监听是通过etcd的Watch机制实现,代码如下:

func (c *cluster) watchStream(cli EtcdClient, key string) bool {
    rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix())
    for {
        select {
        case wresp, ok := <-rch:
            if !ok {
                logx.Error("etcd monitor chan has been closed")
                return false
            }
            if wresp.Canceled {
                logx.Errorf("etcd monitor chan has been canceled, error: %v", wresp.Err())
                return false
            }
            if wresp.Err() != nil {
                logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err()))
                return false
            }

            c.handleWatchEvents(key, wresp.Events)
        case <-c.done:
            return true
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/568445.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何在华为OD机试中获得满分?Java实现【差值数组不同的字符串】一文详解!

✅创作者&#xff1a;陈书予 &#x1f389;个人主页&#xff1a;陈书予的个人主页 &#x1f341;陈书予的个人社区&#xff0c;欢迎你的加入: 陈书予的社区 &#x1f31f;专栏地址: Java华为OD机试真题&#xff08;2022&2023) 文章目录 1. 题目描述2. 输入描述3. 输出描述…

本地vue搭建的web网站项目app如何发布到互联网?

对于非专业人来说&#xff0c;提到 Vue并不熟悉。Vue 是一款用于构建用户界面的 JavaScript 框架。它基于标准 HTML、CSS 和 JavaScript 构建。 简单来说&#xff0c;Vue是干什么用的呢&#xff1f;Vue通过提供了一套声明式的、组件化的编程模型&#xff0c;帮助你高效地开发用…

七、SpringBoot从入门到精通

一、SpringBoot概述 Spring Boot是一个基于Spring框架的开发框架&#xff0c;用于快速构建能够立即运行的生产级Spring应用程序。它是Spring的一个子项目&#xff0c;致力于使Spring开发更加简单、快速和便捷。 二、SpringBoot基础程序 1、点击Spring Initializer&#xff0…

苹果MR头显背后的秘密武器,ARKit演进历程一览

苹果MR头显可以说是板上钉钉了&#xff0c;看了这么多的预测&#xff0c;几乎没有从ARKit的角度来看苹果头显的&#xff0c;因此本文我们将回顾ARKit近7年来的发展历程。 实际上&#xff0c;ARKit 3.5开始支持LiDAR&#xff08;首个搭载的设备是iPad Pro 2020&#xff09;就是…

信号隔离器在水处理控制系统的应用

摘要&#xff1a;水处理控制系统中&#xff0c;其控制、监测模块的非电量模拟量传感器采用信号隔离器的接线方式合理地解决了相关模拟量传感器供电电源安全和相对独立的问题&#xff0c;保证了监测模块的电源、模拟量采集模块和输出模块的相对隔离&#xff0c;降低了可能会造成…

我想成为一名黑客

什么是黑客&#xff1f; 首先我们需要知道什么是黑客呢&#xff1f;黑客最初是指水平高超的电脑专家&#xff0c;而骇客就是我们常见的爱搞破坏的家伙。 黑客和骇客有什么区别呢&#xff1f; 如果黑客是制造炸弹的专家&#xff0c;那骇客就是拿着炸弹到处乱炸的坏蛋。 怎么…

如何在华为OD机试中获得满分?Java实现【合并 K 个升序链表】一文详解!

✅创作者&#xff1a;陈书予 &#x1f389;个人主页&#xff1a;陈书予的个人主页 &#x1f341;陈书予的个人社区&#xff0c;欢迎你的加入: 陈书予的社区 &#x1f31f;专栏地址: Java华为OD机试真题&#xff08;2022&2023) 文章目录 1. 题目描述2. 输入描述3. 输出描述…

【JAVA进阶】异常处理

&#x1f4c3;个人主页&#xff1a;个人主页 &#x1f525;系列专栏&#xff1a;JAVASE基础 目录 1.异常概述、体系 2.常见运行时异常 3.常见编译时异常 4.异常的默认处理流程 5.编译时异常的处理机制 6.运行时异常的处理机制 7.finally 8.自定义异常 1.异常概述、体系…

VUE A页面跳转到B页面带参数,且每次点击跳转,数据刷新

这里写目录标题 一、描述二、VUE带参数页面跳转跳转方式&#xff1a;1&#xff09;标签<router-link>2&#xff09;js操作跳转 三、B页面实现每次点击参数变化&#xff0c;数据刷新解决方案&#xff1a;1&#xff09;去除缓存。2&#xff09;watch监听router 背景&#x…

香港:禁止中国内地参与虚拟资产交易!散户不可交易稳定币,放开不意味着放松!

前天&#xff0c;香港证监会&#xff08;SFC&#xff09;公布了对虚拟资产交易的开放态度和监管思路&#xff0c;宣布《适用于虚拟资产交易平台营运者的指引》&#xff08;下简称《指引》&#xff09;将于2023年6月1日生效。 虽然SFC对于公众提出的建议给予了很完整的回应&…

领导下发紧急且风险大的任务,如何处理?

在遇到这种无法拒绝&#xff0c;明显很难按时交付的紧急任务时&#xff0c;项目经理处理的关键&#xff1a; 1、降低关键干系人期望值 降低关键干系人的期望值&#xff0c;是项目管理非常重要的一门艺术&#xff0c;也是让干系人满意&#xff0c;便于与关系人沟通的关键。 在项…

多地住建局推广工程资料电子化,帮助工程企业“降本增效”

工程资料签署和管理是每个在建工程绕不开的课题&#xff0c;庞大的签署量、动则几十万的签署成本如何优化&#xff1b;有关部门的合规审查如何过关…纸质工程资料需要面对的难题还有很多&#xff1a; 麻烦&#xff1a;从工程立项申报、审批、设计、施工到验收等全过程中产生的大…

7-事务

目录 1.什么是事务&#xff1f; 2.为什么用事务&#xff1f; 3.事务怎么用&#xff1f; 4.事务的四大特性&#xff08;ACID&#xff09; ①原子性&#xff08;Atomicity&#xff0c;又称不可分割性&#xff09; ②一致性&#xff08;Consistency&#xff09; ③隔离性&a…

文本转语音怎么转?教你三招轻松搞定

近年来&#xff0c;人工智能技术飞速发展&#xff0c;语音合成技术 (TTS) 已经被广泛应用于各种应用场景中。在日常生活中&#xff0c;人们经常需要阅读长篇文章、新闻报道、科技论文等&#xff0c;但传统的阅读方式不仅效率低下&#xff0c;而且容易让人感到疲劳。随着语音合成…

品牌需要来看看这个UP主眼里的“她困境”

连续三次发布带货视频&#xff0c;但却仍然涨粉3w。 5月16日&#xff0c;时尚区UP主鹦鹉梨在B站发布作品《漫画胸能不能走出现实啊&#xff1f;&#xff1f;&#xff1f;&#xff1f;》&#xff0c;视频中UP主指出现在女性在生活中的一大困境&#xff0c;当下互联网上的审美一…

【突发奇想 之 vector使用时性能测试】

目录&#xff1a; 前言分析vector不同操作对时间的影响1.for循环中使用 size()成员函数2.初始化时初始化为0&#xff0c;与其他值3.vector分配容量问题4. vector赋值操作5. 遍历&#xff1a;下标和迭代器 总结 前言 打怪升级&#xff1a;第90天 分析vector不同操作对时间的影响…

2023百城巡展|首站北京迎来新老朋友,百家聚势共拓数安蓝海

“新起点 新战略 共赢数安蓝海” 2023年4月 在首届渠道高峰论坛上 美创通过一系列革新之举 传递了坚定渠道化战略的决心 2023年5月 步履不停&#xff0c;加速渠道战略下沉 与全国各地伙伴更深入沟通&#xff0c;互信赋能 美创2023百城巡展正式启航 5月23日&#xff0c;美…

python绘制带置信区间的折线图

本文目录 一、数据准备二、添加置信区间三、完整代码四、运行结果五、python绘图往期系列文章目录 在统计学和数据分析领域中&#xff0c;我们常常需要比较两个或多个样本数据之间的差异。而带置信区间的折线图则是一种直观且常用的展示数据差异的方式。在这篇文章中&#xff0…

xml报文转Java实体

公众号推广: 目前CSDN进行VIP可见,文章可在微信公众号进行免费的阅读。 文章内容经过认证实践,比较的清晰易懂,适合初次接触的人员。 请关注微信公众号:菜鸟编程踩坑之路,进入公众号搜索关键词 xml转实体 需求场景: 因为需要对接一些比较老的系统接口,他们的请求方…

AI生成和修改音频音乐类工具网站集合

AI时代&#xff0c;有最强问答ChatGPT,有文本生成图像的Stable Diffusion,Midjourney&#xff0c;当然也少不了AI生成和修改音频的各种工具&#xff0c;我们整理出其中最具影响力的&#xff0c;并且可能最用得上的一些收录到 AI生成和修改音频类工具网站集合​http://www.webhu…