RPC教程 7.服务发现与注册中心

news2024/12/26 11:23:55

0.前言

这一节的内容只能解决只有一个服务的情况。要是有多个服务(即是多个结构体)这种就解决不了,也即是没有服务ip地址和服务实例的映射关系。

1.为什么需要注册中心

在上一节中,客户端想要找到服务实例的ip,需要硬编码把ip写到代码中。这时可能会出问题,要是该服务实例ip改变了呢,该服务实例下线宕机了呢?这时如何是好。

// 调用单个服务实例
func clientCall(addr1, addr2 string) {
	d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})
	xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
	defer xc.Close()
	//省略其他......
}

这时,注册中心的重要性就出来了。

注册中心主要有三种角色:

  • 服务提供者(RPC Server):在启动时,向 Registry 注册自身服务,并向 Registry 定期发送心跳汇报存活状态。
  • 服务消费者(RPC Client):在启动时,向 Registry 订阅服务,把 Registry 返回的服务节点列表缓存在本地内存中,并与 RPC Sever 建立连接。
  • 服务注册中心(Registry):用于保存 RPC Server 的注册信息,当 RPC Server 节点发生变更时,Registry 会同步变更,RPC Client 感知后会刷新本地 内存中缓存的服务节点列表。

最后,RPC Client 从本地缓存的服务节点列表中,基于负载均衡算法选择一台 RPC Sever 发起调用。

 当然注册中心的功能还有很多,比如配置的动态同步、通知机制等。比较常用的注册中心有 etcd、zookeeper、consul,一般比较出名的微服务或者 RPC 框架,这些主流的注册中心都是支持的。

2.Gee Registry

主流的注册中心 etcd、zookeeper 等功能强大,与这类注册中心的对接代码量是比较大的,需要实现的接口也很多。所以这里我们选择自己实现一个简单的支持心跳保活的注册中心。

GeeRegistry 的代码独立放置在子目录 registry 中。

首先定义 GeeRegistry 结构体,默认超时时间设置为 5 min,也就是说,超过5min没有收到该注册的服务的心跳,即视其为不可用状态。

//registry.go
type ServerItem struct {
	Addr  string
	start time.Time //用于心跳时间计算
}

// GeeRegistry is a simple register center
type GeeRegistry struct {
	timeout time.Duration
	mutex   sync.Mutex //protcect servers
	servers map[string]*ServerItem
}

const (
	defaultPath    = "/_rpc_/registry"
	defaultTimeout = time.Minute * 5
)

func New(timeout time.Duration) *GeeRegistry {
	return &GeeRegistry{
		servers: make(map[string]*ServerItem),
		timeout: timeout,
	}
}

var DefalultGeeRegister = New(defaultTimeout)

然后,为 GeeRegistry 实现添加服务实例和返回服务列表的方法。

  • putServer:添加服务实例,如果服务已经存在,则更新 start。
  • aliveServers:返回可用的服务列表,如果存在超时的服务,则删除。
func (r *GeeRegistry) putServer(addr string) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	s := r.servers[addr]
	if s == nil {
		r.servers[addr] = &ServerItem{Addr: addr, start: time.Now()}
	} else {
		s.start = time.Now() // if exists, update start time to keep alive
	}
}

func (r *GeeRegistry) aliveServers() []string {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	var alive []string
	for addr, s := range r.servers {
		if r.timeout == 0 || s.start.Add(r.timeout).After(time.Now()) {
			alive = append(alive, addr)
		} else {
			delete(r.servers, addr)
		}
	}
	sort.Strings(alive)
	return alive
}

 为了简单,那么rpc客户端通过HTTP去访问注册中心,且所有的有用信息都承载在 HTTP Header 中。

  • Get:返回所有可用的服务列表,通过自定义字段 X-rpc-Servers 承载。
  • Post:添加服务实例或发送心跳,通过自定义字段 X-rpc-Server 承载。
func (r *GeeRegistry) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	switch req.Method {
	case "GET":
		w.Header().Set("X-rpc-Servers", strings.Join(r.aliveServers(), ","))
	case "POST":
		addr := req.Header.Get("X-rpc-Servers")
		if addr == "" {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		r.putServer(addr) //更新保存在注册中心的服务实例
	default:
		w.WriteHeader(http.StatusMethodNotAllowed)
	}
}

func (r *GeeRegistry) HandleHTTP(registryPath string) {
	http.Handle(registryPath, r)
}

func HandleHTTP() {
	DefalultGeeRegister.HandleHTTP(defaultPath)
}

另外,也要提供 Heartbeat 方法,便于服务启动时定时向注册中心发送心跳(也是通过HTTP),默认周期比注册中心设置的过期时间少 1 min。

// only send once
func sendHeartbeat(registryURL, addr string) error {
	httpClient := &http.Client{Timeout: time.Second * 10}
	req, _ := http.NewRequest("POST", registryURL, nil)
	req.Header.Set("X-rpc-Servers", addr)
	resp, err := httpClient.Do(req)
	if err != nil {
		fmt.Println("rpc server: heart beat err:", err)
		return err
	}
	defer resp.Body.Close()
	return nil
}

// Heartbeat send a heartbeat message every once in a while
func Heartbeat(registryURL, addr string, duration time.Duration) {
	if duration == 0 {
		duration = defaultTimeout - time.Duration(1)*time.Minute
	}

	err := sendHeartbeat(registryURL, addr)
	go func() {
		//创建一个定时器
		t := time.NewTicker(duration)
		for err == nil {
			<-t.C
			err = sendHeartbeat(registryURL, addr)
		}
	}()
}

3.需要注册中心的服务发现

上一节我们实现了一个不需要注册中心,服务列表由手工维护的服务发现的结构体MultiServersDiscovery。

而现在我们实现了注册中心,那这一节的服务发现就可以继承上一节的,并添加与注册中心相关的细节

type GeeRegistryDiscovery struct {
	*MultiServerDiscovery
	registryAddr string
	timeout      time.Duration //服务列表的过期时间
	lastUpdate   time.Time
}

const defaultUpdateTimeout = time.Second * 10

func NewGeeRegistryDiscovery(registerAddr string, timeout time.Duration) *GeeRegistryDiscovery {
	if timeout == 0 {
		timeout = defaultUpdateTimeout
	}
	return &GeeRegistryDiscovery{
		MultiServerDiscovery: NewMultiServerDiscovery(make([]string, 0)),
		registryAddr:         registerAddr,
		timeout:              timeout,
	}
}
  • GeeRegistryDiscovery 嵌套了 MultiServersDiscovery,很多能力可以复用。
  • registryAddr 即注册中心的地址
  • timeout 服务列表的过期时间
  • lastUpdate 是代表最后从注册中心更新服务列表的时间,默认 10s 过期,即 10s 之后,需要从注册中心更新新的列表。

实现 Update 和 Refresh 方法,超时重新获取的逻辑在 Refresh 中实现: 

func (d *GeeRegistryDiscovery) Update(servers []string) error {
	d.rwMutex.Lock()
	defer d.rwMutex.Unlock()
	d.servers = servers
	d.lastUpdate = time.Now()
	return nil
}

// 刷新,有了注册中心,在客户端每次获取服务实例时候,需要刷新注册中心的保存的服务实例
func (d *GeeRegistryDiscovery) Refresh() error {
	d.rwMutex.Lock()
	defer d.rwMutex.Unlock()
    //注册中心保存的服务实例还没超时,不用更新
	if d.lastUpdate.Add(d.timeout).After(time.Now()) {
		return nil
	}
	httpClient := http.Client{Timeout: time.Second * 10} //http客户端最好有个超时
	resp, err := httpClient.Get(d.registryAddr)
	if err != nil {
		fmt.Println("rpc registry refresh err:", err)
		return err
	}

	defer resp.Body.Close()
	servers := strings.Split(resp.Header.Get("X-rpc-Servers"), ",")
	d.servers = make([]string, 0, len(servers))
	for _, server := range servers {
		//返回一个string类型,并将最前面和最后面的ASCII定义的空格去掉,中间的空格不会去掉
		s := strings.TrimSpace(server)
		if s != "" {
			d.servers = append(d.servers, s)
		}
	}

	d.lastUpdate = time.Now()
	return nil
}

 Get 和 GetAll 与 MultiServersDiscovery 相似,唯一的不同在于,GeeRegistryDiscovery 需要先调用 Refresh 确保服务列表没有过期

func (d *GeeRegistryDiscovery) Get(mode SelectMode) (string, error) {
	if err := d.Refresh(); err != nil {
		return "", err
	}
	//d.Get(mode) 表示调用的是(GeeRegistryDiscovery).Get
	return d.MultiServerDiscovery.Get(mode) //d.MultiServerDiscovery是调用MultiServerDiscovery的Get()
}

func (d *GeeRegistryDiscovery) GetAll() ([]string, error) {
	if err := d.Refresh(); err != nil {
		return nil, err
	}
	return d.MultiServerDiscovery.GetAll()
}

4.测试

添加函数 startRegistry,之后需要稍微修改 startServer,定期向注册中心发送心跳保活(Heartbeat)。

这里使用sync.WaitGroup是为了等待该操作执行完毕才会往后执行,因为这些函数都是新开协程运行。

func startServer(registryAddr string, wg *sync.WaitGroup) {
	var myServie My

	l, _ := net.Listen("tcp", "localhost:0") //端口是0表示端口随机
	server := geerpc.NewServer()
	//这里一定要用&myServie,因为前面Sum方法的接受者是*My;若接受者是My,myServie或者&myServie都可以
	server.Register(&myServie)
	registry.Heartbeat(registryAddr, "tcp@"+l.Addr().String(), 0)  //定时发送心跳
	wg.Done()
	server.Accept(l)
}

func startRegistry(wg *sync.WaitGroup) {
	l, _ := net.Listen("tcp", "localhost:9999")
	registry.HandleHTTP()
	wg.Done()
	http.Serve(l, nil)
}

接下来,将 call 和 broadcast 的 MultiServersDiscovery 替换为 GeeRegistryDiscovery,不再需要硬编码服务列表。 

这里就重点对比下NewGeeRegistryDiscovery方法和之前的不同之处。

// 调用单个服务实例
func clientCall(registryAddr string) {
	// d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})
	d := xclient.NewGeeRegistryDiscovery(registryAddr, 0)
	xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
	defer xc.Close()

	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			var reply int = 1324
			if err := xc.Call(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {
				log.Println("call Foo.Sum error:", err)
			}
			fmt.Println("reply: ", reply)
		}(i)
	}
	wg.Wait()
}

func broadcast(registryAddr string) {
	// d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})
	d := xclient.NewGeeRegistryDiscovery(registryAddr, 0)
	xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
	defer xc.Close()

	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			var reply int = 1324
			if err := xc.Broadcast(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {
				fmt.Println("Broadcast call Foo.Sum error:", err)
			}
			fmt.Println("Broadcast reply: ", reply)

			ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
			defer cancel()
			var replyTimeout int = 1324
			if err := xc.Broadcast(ctx, "My.Sleep", &Args{Num1: i, Num2: i * i}, &replyTimeout); err != nil {
				fmt.Println("Broadcast call Foo.Sum error:", err)
			}
			fmt.Println("timeout Broadcast reply: ", replyTimeout)

		}(i)
	}
	wg.Wait()
}

最后是main函数。

确保注册中心启动后,再启动 RPC 服务端,最后客户端远程调用。

func main() {
	registryAddr := "http://localhost:9999/_rpc_/registry"

	var wg sync.WaitGroup
	wg.Add(1)
	go startRegistry(&wg) //开启注册中心服务
	wg.Wait()
	time.Sleep(time.Second)

	wg.Add(2)
	go startServer(registryAddr, &wg)
	go startServer(registryAddr, &wg)
	wg.Wait()

	time.Sleep(time.Second)
	clientCall(registryAddr)
	broadcast(registryAddr)
}

运行结果:

代码: https://github.com/liwook/Go-projects/tree/main/geerpc/7-registry

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1415847.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

永磁直驱式风力发电虚拟同步机仿真模型Matlab/Simulink模型

很久没有分享虚拟同步机控制相关的方向了&#xff0c;主要是因为硕士之后&#xff0c;也就没再继续深入研究这个课题了&#xff0c;更多的是在电科院的项目里会接触。这个课题方向其实作为硕士毕业课题还是够用的&#xff0c;相对来说也是比较容易毕业的&#xff0c;因为涉及的…

x-cmd pkg | go - Google 开发的开源编程语言

目录 简介首次用户技术特点竞品分析编译型语言解释型语言JavaWebAssebmly 进一步阅读 简介 Go 语言&#xff08;或 Golang&#xff09;是 Google 开发的开源编程语言&#xff0c;诞生于 2006 年。其设计目标是“兼具 Python 等动态语言的开发速度和 C/C 等编译型语言的性能与安…

Nodejs前端学习Day3_准备工作

妈的&#xff0c;这几天真tm冷&#xff0c;前天上午还下了一整天的雪&#xff0c;大雪 文章目录 前言一、Node.js简介1.1何为1.2有什么 二、Node.js可以做什么三、学习路线四、下载nodejs4.1小坑记录4.2LTS和Current版本的不同 五、什么是终端六、在nodejs中执行js代码七、powe…

Python 中的 strip 函数用法,你真的学会了吗?

Python 提供了大量内置函数&#xff0c;使编程变得更加简单。strip 就是这样一个函数。在本文中&#xff0c;我们将探讨 strip 是什么、为什么有用以及如何有效地使用它。 什么是 strip strip 是一种内置方法&#xff0c;用于删除字符串中的前导字符和尾随字符。这些字符可以…

MYSQL库和表的操作(修改字符集和校验规则,备份和恢复数据库及库和表的增删改查)

文章目录 一、MSYQL库的操作1.连接MYSQL2.查看当前数据库3.创建数据库4.字符集和校验规则5.修改数据库6.删除数据库7.备份和恢复8.查看连接 二、表的操作1.创建表2.查看表结构3.修改表4.删除表 一、MSYQL库的操作 1.连接MYSQL 我们使用下面的语句来连接MSYQL&#xff1a; my…

太阳能 LED 恒流电源 升降压原理图 AP9193 大功率升压恒流IC

特别 宽输入电压范围&#xff1a;3.6V~100V 高效率&#xff1a;可高达 95% 工作频率&#xff1a;1MHz CS 限流保护电压&#xff1a;250mV FB 电流采样电压&#xff1a;250mV 芯片供电欠压保护&#xff1a;2.5V 关断时间可调 外置频率补偿脚 应用领域 LED 灯杯 电池供…

docker容器运维命令

文章目录 docker psdocker execdocker inspectdocker topdocker attachdocker waitdocker exportdocker importdocker portdocker cpdocker diffdocker renamedocker statsdocker update总结 docker ps 列出容器。 docker ps [OPTIONS]OPTIONS说明&#xff1a; -a :显示所有的…

FPGA 通过 UDP 以太网传输 JPEG 压缩图片

FPGA 通过 UDP 以太网传输 JPEG 压缩图片 简介 在 FPGA 上实现了 JPEG 压缩和 UDP 以太网传输。从摄像机的输入中获取单个灰度帧&#xff0c;使用 JPEG 标准对其进行压缩&#xff0c;然后通过UDP以太网将其传输到另一个设备&#xff08;例如计算机&#xff09;&#xff0c;所有…

计算方法实验1:熟悉MATLAB 环境

一、问题描述 熟悉MATLAB 环境。 二、实验目的 了解Matlab 的主要功能&#xff0c;熟悉Matlab 命令窗口及文件管理&#xff0c;Matlab 帮助系统。掌握命令行的输入及编辑&#xff0c;用户目录及搜索路径的配置。了解Matlab 数据的特点&#xff0c;熟悉Matlab 变量的命名规则&a…

如何独立思考?这里有一份全指南

知乎上有一个问题&#xff0c;叫做&#xff1a;为什么我们要独立思考&#xff1f; 排名第一的回答&#xff0c;是凤凰前主笔王路写的&#xff0c;很有意思。他说&#xff1a; 因为别人告诉我们要独立思考。 这个回答非常妙&#xff0c;也非常反讽&#xff0c;有一种「第22条军规…

32GPIO输入&按键控制LED&光敏控制蜂鸣器

一.硬件 光线越强&#xff0c;光敏电阻的阻值越小 温度越高&#xff0c;热敏电阻的阻值就越小 红外光线越强&#xff0c;红外接收管的阻值就越小 类比&#xff1a;电阻阻值越小&#xff0c;上拉或下拉就越强 &#xff08;弹簧的拉力就越强&#xff09; 在上下的电阻分压下&a…

Android Studio 提示Use app:drawableStartCompat instead of android:drawableStart

每次提交代码时&#xff0c;AS这个老妈子总爱唠叨一堆warning&#xff0c;这些Warning都在讲什么&#xff1f; 1.Use app:drawableStartCompat instead of android:drawableStart 在Android开发中&#xff0c;android:drawableStart和app:drawableStartCompat是两个用于设置…

Linux下的进程操作

进程概念 ps -elf&#xff1a;查看操作系统的所有进程&#xff08;Linux命令&#xff09; ctrl z&#xff1a;把进程切换到后台 crtl c&#xff1a;结束进程 fg&#xff1a;把进程切换到前台 获取进程进程号和父进程号 函数原型&#xff1a; pid_t getpid(void); //pid_t…

2.精确度-机器学习模型性能常用的评估指标

一.精确度的定义 精确度&#xff1a;机器学习领域中一项至关重要的评价指标&#xff0c;其专注于评估模型对正样本的预测准确性。 相对于准确率而言&#xff0c;精确度更为细致&#xff0c;它关注的是模型在将实例预测为正样本的情况下&#xff0c;实际为正样本的比例。换句话…

2024 1.20~1.26周报

一、上周工作 了解注意力机制&#xff0c;开始论文的初步阅读 二、本周计划 简单了解transform架构&#xff0c;继续研读论文U-MixFormer: UNet-like Transformer with Mix-Attention for Efficient Semantic Segmentation。 三、完成情况——论文研读 标题&#xff1a;U-Mi…

Android双指缩放ScaleGestureDetector检测放大因子大图移动到双指中心点ImageView区域中心,Kotlin(2)

Android双指缩放ScaleGestureDetector检测放大因子大图移动到双指中心点ImageView区域中心&#xff0c;Kotlin&#xff08;2&#xff09; 在 Android ScaleGestureDetector检测双指缩放Bitmap基于Matrix动画移动到双指捏合中心点ImageView区域中心&#xff0c;Kotlin-CSDN博客 …

12.Elasticsearch应用(十二)

Elasticsearch应用&#xff08;十二&#xff09; 1.单机ES面临的问题 海量数据存储问题单点故障问题 2.ES集群如何解决上面的问题 海量数据存储解决问题&#xff1a; 将索引库从逻辑上拆分为N个分片&#xff08;Shard&#xff09;&#xff0c;存储到多个节点单点故障问题&a…

自动求导与可微分编程

1.张量的自动求导 1.1 自动求导机制 张量的自动求导机制是现代深度学习框架&#xff08;如PyTorch和TensorFlow&#xff09;的核心功能之一&#xff0c;它允许开发者在无需手动计算梯度的情况下&#xff0c;自动获得神经网络中所有参数相对于损失函数的梯度。以下是这一机制的…

Hadoop集群部署流程

前置要求 需要3台虚拟机&#xff0c;系统为Centos7&#xff0c;分别host命名为node1&#xff0c;node2&#xff0c;node3&#xff0c;密码均为root请确保这三台虚拟机已经完成了JDK、SSH免密、关闭防火墙、配置主机名映射等前置操作 在3台虚拟机的/etc/hosts文件中&#xff0…

天津大数据培训班推荐,数据分析过程的常见错误

大数据”是近年来IT行业的热词&#xff0c;目前已经广泛应用在各个行业。大数据&#xff0c;又称海量信息&#xff0c;特点是数据量大、种类多、实时性强、数据蕴藏的价值大。大数据是对大量、动态、能持续的数据&#xff0c;通过运用分析、挖掘和整理&#xff0c;实现数据信息…