Redis实现服务注册与服务发现源码阅读(Go语言)

news2025/1/17 4:50:24

Redis实现服务注册与服务发现源码阅读

背景

近期在看开源项目CloudWeGo中看到目前GoLang微服务框架Hertz中支持通过Redis实现服务注册与服务发现功能。便想着阅读下源码

源码阅读

gut clone了hertz-contrib后看到在一级目录下有目前各种主流的服务注册与发现的实现方案。为了便于学习选择阅读redis
在这里插入图片描述

服务注册源码分析

看到redis/example/server/main.go中有服务注册的实现示例

func main() {
	r := redis.NewRedisRegistry("127.0.0.1:6379")
	addr := "127.0.0.1:8888"
	h := server.Default(
		server.WithHostPorts(addr),
		server.WithRegistry(r, &registry.Info{
			ServiceName: "hertz.test.demo",
			Addr:        utils.NewNetAddr("tcp", addr),
			Weight:      10,
			Tags:        nil,
		}),
	)
	h.GET("/ping", func(_ context.Context, ctx *app.RequestContext) {
		ctx.JSON(consts.StatusOK, utils.H{"ping": "pong"})
	})
	h.Spin()
}

代码主要逻辑是实现一个简单的webservice,其中用到了服务注册机制。可以看到,在hertz中服务注册可以通过配置engine的形式在webservice初始化时定义,其中

r := redis.NewRedisRegistry("127.0.0.1:6379")

定义了一个服务注册的地址,即要把这个微服务注册到哪个主机中。而server.WithRegistry()使得服务初始化时引入了这个服务注册。Info即是服务注册的相关信息
进入redis/registry.go查看服务注册的定义,可以看到redis服务注册是实现的registry.Registry接口

var _ registry.Registry = (*redisRegistry)(nil)

type redisRegistry struct {
	client *redis.Client
	rctx   *registryContext
	mu     sync.Mutex
	wg     sync.WaitGroup
}

type registryContext struct {
	ctx    context.Context
	cancel context.CancelFunc
}

// Registry is extension interface of service registry.
type Registry interface {
	Register(info *Info) error
	Deregister(info *Info) error
}

// Info is used for registry.
// The fields are just suggested, which is used depends on design.
type Info struct {
	// ServiceName will be set in hertz by default
	ServiceName string
	// Addr will be set in hertz by default
	Addr net.Addr
	// Weight will be set in hertz by default
	Weight int
	// extend other infos with Tags.
	Tags map[string]string
}

registry.Registry通过Register(info *Info)和Deregister(info *Info)描述服务注册与服务发现
接下来看如何创建一个redis服务注册

// NewRedisRegistry creates a redis registry
func NewRedisRegistry(addr string, opts ...Option) registry.Registry {
	redisOpts := &redis.Options{
		Addr:     addr,
		Password: "",
		DB:       0,
	}
	for _, opt := range opts {
		opt(redisOpts)
	}
	rdb := redis.NewClient(redisOpts)
	return &redisRegistry{
		client: rdb,
	}
}

我们已经可以猜到了,配置redis客户端连接User Server的redis,用redis来存储服务映射关系,实现服务注册中心,那么是不是这样呢,我们接着往下看服务注册的实现源码

func (r *redisRegistry) Register(info *registry.Info) error {
  // 校验配置信息
	if err := validateRegistryInfo(info); err != nil {
		return err
	}
	rctx := registryContext{}
	rctx.ctx, rctx.cancel = context.WithCancel(context.Background())
	m := newMentor()
	r.wg.Add(1)
	// 并发监控redis
	go m.subscribe(rctx.ctx, info, r)
	r.wg.Wait()
	rdb := r.client
	// 将注册信息hash化
	hash, err := prepareRegistryHash(info)
	if err != nil {
		return err
	}
	// 上锁
	r.mu.Lock()
	r.rctx = &rctx
	// 注册信息写入到redis,即我们的服务注册中心
	rdb.HSet(rctx.ctx, hash.key, hash.field, hash.value)
	rdb.Expire(rctx.ctx, hash.key, defaultExpireTime)
	// 生成服务相关信息和发送
	rdb.Publish(rctx.ctx, hash.key, generateMsg(register, info.ServiceName, info.Addr.String()))
	// 写完,解锁
	r.mu.Unlock()
	go m.monitorTTL(rctx.ctx, hash, info, r)
	// 保持长连接
	go keepAlive(rctx.ctx, hash, r)
	return nil
}

Register方法已经对服务注册的主要流程进行了描述,下面来看一些细节

func validateRegistryInfo(info *registry.Info) error {
	if info == nil {
		return fmt.Errorf("registry.Info can not be empty")
	}
	if info.ServiceName == "" {
		return fmt.Errorf("registry.Info ServiceName can not be empty")
	}
	if info.Addr == nil {
		return fmt.Errorf("registry.Info Addr can not be empty")
	}
	return nil
}

校验服务注册时并不会对客户端是否连接上进行校验,只会校验参数和结构体是否为空

func prepareRegistryHash(info *registry.Info) (*registryHash, error) {
	meta, err := json.Marshal(convertInfo(info))
	if err != nil {
		return nil, err
	}
	return &registryHash{
		key:   generateKey(info.ServiceName, server),
		field: info.Addr.String(),
		value: string(meta),
	}, nil
}

服务注册信息hash即生成key-velue,方便写入到redis中

func keepAlive(ctx context.Context, hash *registryHash, r *redisRegistry) {
	ticker := time.NewTicker(defaultTickerTime)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			r.client.Expire(ctx, hash.key, defaultKeepAliveTime)
		case <-ctx.Done():
			break
		}
	}
}

最后再起一个协程在生命期内监听保持长连接,这里用到的是多路复用

func keepAlive(ctx context.Context, hash *registryHash, r *redisRegistry) {
	ticker := time.NewTicker(defaultTickerTime)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			r.client.Expire(ctx, hash.key, defaultKeepAliveTime)
		case <-ctx.Done():
			break
		}
	}
}

再来看服务注册退出:

func (r *redisRegistry) Deregister(info *registry.Info) error {
	if err := validateRegistryInfo(info); err != nil {
		return err
	}
	rctx := r.rctx
	rdb := r.client
	hash, err := prepareRegistryHash(info)
	if err != nil {
		return err
	}
	r.mu.Lock()
	// 删除redis中的注册信息
	rdb.HDel(rctx.ctx, hash.key, hash.field)
	rdb.Publish(rctx.ctx, hash.key, generateMsg(deregister, info.ServiceName, info.Addr.String()))
	rctx.cancel()
	r.mu.Unlock()
	return nil
}

整体逻辑和服务注册相似,只是最后把注册信息删掉

服务发现源码分析

看到redis/example/client/main.go中有服务注册的实现示例

func main() {
	cli, err := client.NewClient()
	if err != nil {
		panic(err)
	}
	r := redis.NewRedisResolver("127.0.0.1:6379")
	cli.Use(sd.Discovery(r))
	for i := 0; i < 10; i++ {
		status, body, err := cli.Get(context.Background(), nil, "http://hertz.test.demo/ping", config.WithSD(true))
		if err != nil {
			hlog.Fatal(err)
		}
		hlog.Infof("HERTZ: code=%d,body=%s", status, string(body))
	}
}

config.WithSD(true)即通过中间件形式,使得客户端发送请求时,并非直接请求服务器,而是请求注册中心,通过服务发现再进一步转到服务器上
接前文中在redis里进行了服务注册,这里客户端想要进行服务发现找到自己请求的微服务。这里服务发现还是通过复用接口实现的

var _ discovery.Resolver = (*redisResolver)(nil)

type redisResolver struct {
	client *redis.Client
}

// NewRedisResolver creates a redis resolver
func NewRedisResolver(addr string, opts ...Option) discovery.Resolver {
	redisOpts := &redis.Options{Addr: addr}
	for _, opt := range opts {
		opt(redisOpts)
	}
	rdb := redis.NewClient(redisOpts)
	return &redisResolver{
		client: rdb,
	}
}

服务发现开始和服务注册一样,需要先连接上redis

func (r *redisResolver) Target(_ context.Context, target *discovery.TargetInfo) string {
	return target.Host
}

func (r *redisResolver) Resolve(ctx context.Context, desc string) (discovery.Result, error) {
	rdb := r.client
	// 查询服务列表
	fvs := rdb.HGetAll(ctx, generateKey(desc, server)).Val()
	var its []discovery.Instance
	for f, v := range fvs {
	  // 反序列化获取服务信息
		var ri registryInfo
		err := json.Unmarshal([]byte(v), &ri)
		if err != nil {
			hlog.Warnf("HERTZ: fail to unmarshal with err: %v, ignore instance Addr: %v", err, f)
			continue
		}
		// 负载均衡参数
		weight := ri.Weight
		if weight <= 0 {
			weight = defaultWeight
		}
		its = append(its, discovery.NewInstance(tcp, ri.Addr, weight, ri.Tags))
	}
	return discovery.Result{
	  // 服务发现的结果
		CacheKey:  desc,//redis表中的key
		Instances: its,//服务表
	}, nil
}

func (r *redisResolver) Name() string {
	return Redis
}

Target、Name、Resolve即为实现自方法的接口,其中target和Name分别解出redis的地址和Name,Resolve方法用来在Redis中发现服务
我们还可以细扣一下,服务发现中间件进一步是怎么实现的?
/pkg/mod/github.com/cloudwego/hertz@v0.6.0/pkg/common/config/request_option.go:58中WithSD如下:

// WithSD set isSD in RequestOptions.
func WithSD(b bool) RequestOption {
	return RequestOption{F: func(o *RequestOptions) {
		o.isSD = b
	}}
}

可见这里是用来高速请求,这个请求是有服务发现机制的。循着client.Get()方法一路往下找,这项配置写入到了req中:

func GetURL(ctx context.Context, dst []byte, url string, c Doer, requestOptions ...config.RequestOption) (statusCode int, body []byte, err error) {
	req := protocol.AcquireRequest()
	req.SetOptions(requestOptions...)

	statusCode, body, err = doRequestFollowRedirectsBuffer(ctx, req, dst, url, c)

	protocol.ReleaseRequest(req)
	return statusCode, body, err
}

在hertz中的Request定义中其实是包含有config定义,里面就有sd的flag

type Request struct {
	noCopy nocopy.NoCopy //lint:ignore U1000 until noCopy is used

	Header RequestHeader

	uri      URI
	postArgs Args

	bodyStream      io.Reader
	w               requestBodyWriter
	body            *bytebufferpool.ByteBuffer
	bodyRaw         []byte
	maxKeepBodySize int

	multipartForm         *multipart.Form
	multipartFormBoundary string

	// Group bool members in order to reduce Request object size.
	parsedURI      bool
	parsedPostArgs bool

	isTLS bool

	multipartFiles  []*File
	multipartFields []*MultipartField

	// Request level options, service discovery options etc.
	options *config.RequestOptions
}

也就是会从req中解析出服务地址

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/387282.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据结构初阶】二叉树顺序结构:堆的实现

前言 前边077带着大家学习了树与二叉树的相关概念&#xff0c;这篇文章我们来实现一个二叉树的顺序结构。 二叉树的顺序结构 普通的二叉树是不适合用数组来存储的&#xff0c;因为可能会存在大量的空间浪费。而完全二叉树更适合使用顺序结构存储。现实中我们通常把堆(一种二叉…

SQL注入基础入门篇 注入思路及常见的SQL注入类型总结

SQL注入基础入门篇1. SQL注入的概念1.1 什么是SQL注入&#xff1f;1.2 注入过程1.3 SQL注入的分类2. 注入思路3. 第一次注入3.1 寻找注入点3.2 构造攻击语句3.2.1 数据出在哪里&#xff1f;3.2.2 怎么有序的获取核心数据&#xff1f;3.2.2.1 基础信息查询3.2.2.2 表名&#xff…

TCP三次握手与四次挥手(一次明白)

TCP基本信息 默认端口号:80 LINUX中TIME_WAIT的默认时间是30s TCP三次握手 三次握手过程:每行代表发起握手到另一方刚刚收到数据包时的状态 客户端服务端客户端状态服务端状态握手前CLOSELISTEN客户端发送带有SYN标志的数据包到服务端一次握手SYN_SENDLISTEN二次握手服务端发送…

Java项目(一些注解、依赖

文章目录常用的几个注解DataAllArgsConstructorNoArgsConstructorSetterGetterEqualsAndHashCodeLog4j/Slf4jMYBatis-Plus常用注解TableNameTableIdTableFieldTableLogicMapperMapperMapperScanpom.xml中加入依赖创建项目常用的几个注解 Data 注在类上&#xff0c;提供类的ge…

cv2.addWeighted 操作 np.array 踩坑记录

cv2.addWeighted函数是把两张图片img1, img2达到融合的效果&#xff0c; 看官网的解释&#xff0c;下图中f0和f1代表两张图片&#xff0c; 用法是这样的 import cv2alpha 0.6 beta (1.0 - alpha)src1 cv2.imread("img1.jpg") src2 cv2.imread("img2.jpg&q…

MQ-2烟雾传感器模块功能实现(STM32)

认识MQ-2模块与其工作原理 MQ-2型烟雾传感器属于二氧化锡半导体气敏材料&#xff0c;属于表面离子式N型半导体。当处于200~300摄氏度时&#xff0c;二氧化锡吸附空气中的氧&#xff0c;形成氧的负离子吸附&#xff0c;使半导体中的电子密度减少&#xff0c;从而使其电阻值增加。…

【C语言复习】C语言中的数组与指针

数组与指针复习写在前面数组和指针指针基础概念进阶知识指针的分类指针和数组笔试题写在前面 数组和指针小节&#xff0c;主要分为以下关键点&#xff1a; 常见指针分类&#xff0c;如指针数组、数组指针、函数指针等。什么是数组/ 指针有关数组和指针的题目数组传参 我们也…

写字楼/园区/购物中心空置率太高?快用快鲸智慧楼宇系统

客户租不租你的写字楼&#xff0c;事关区位、交通、环境、价格、面积、装修等诸多因素&#xff0c;但很多招商部对这些影响客户决策的数据并不重视&#xff0c;在客户初次上门看房时仅简单记录姓名、联系方式、需求面积&#xff0c;对其他核心数据熟视无睹&#xff0c;也为日后…

第十三届蓝桥杯

这里写目录标题一、刷题统计&#xff08;ceil函数返回的是等值于某最小整数的浮点值&#xff0c;不强制转换回int就wa&#xff0c;没错就连和int整数相加都wa二、修剪灌木&#xff08;主要应看清楚会调转方向三、统计子矩阵&#xff08;前缀和滑动窗口⭐&#xff09;四、[积木画…

【算法】笔记:LeetCode 206. 反转链表

文章目录前言思考问题&#xff1a;把分开的节点连在一起结合原题&#xff1a;使用[迭代]解决卡点引入新指针边界条件代码反转的逻辑代码&#xff08;完整答案&#xff09;结合原题&#xff1a;使用[递归]解决卡点完整代码问题的子问题当前层要干什么递归出口前言 这道题可以拆…

冰箱压缩机 方案

压缩机是制冷系统的心脏&#xff0c;它从吸气管吸入低温低压的制冷剂气体&#xff0c;通过电机运转带动活塞对其进行压缩后&#xff0c;向排气管排出高温高压的制冷剂气体&#xff0c;为制冷循环提供动力&#xff0c;从而实现压缩→冷凝→膨胀→蒸发 ( 吸热 ) 的制冷循环。压缩…

C#开发的OpenRA的游戏主界面怎么样创建

通过前面加载界面布局数据,可以把整个界面逻辑的数据加载到内存, 但是这些数据怎么显示出来,又是没有定义的。比如前面定义了多个界面的布局, 又是怎么样知道需要显示哪一个界面? 现在就来解决这个问题,其实整个游戏都是可以通过yaml文件进行配置的, 所以我们需要从yaml…

水果FLStudio21.0.0中文版全能数字音乐工作站DAW

FL Studio 21.0.0官方中文版重磅发布纯正简体中文支持&#xff0c;更快捷的音频剪辑及素材管理器&#xff0c;多样主题随心换&#xff01;Mac版新增对苹果M2/1家族芯片原生支持。编曲、剪辑、录音、混音&#xff0c;20余年的技术积淀和实力研发&#xff0c;FL Studio 已经从电音…

【基础算法】单链表的OJ练习(3) # 移除链表元素 # 相交链表 #

文章目录前言移除链表元素相交链表写在最后前言 本章的OJ练习也是相对简单的&#xff0c;只要能够理解解题的思路&#xff0c;并且依照这个思路能够快速的写出代码&#xff0c;我相信&#xff0c;你的链表水平已经足够了。 对于OJ练习&#xff08;2&#xff09; : ->传送门…

不平凡的一天——

作者&#xff1a;指针不指南吗 专栏&#xff1a;个人日常记录 &#x1f43e;或许会很慢&#xff0c;但是不可以停下来&#x1f43e; 文章目录1.自我介绍2.上学期3.不凡的一天4.新学期写个博客&#xff0c;简单记录一下&#xff0c;新学期加油&#xff01;&#xff01;&#xff…

day7 同步互斥

作业 1.将一个文件中的数据打印到终端上类似cat一个文件&#xff0c;要求如下 &#xff08;1&#xff09;a线程读取文件中的数据 &#xff08;2&#xff09;B线程将A线程读取到的数据打印到终端上 &#xff08;3&#xff09;文件打印完毕后&#xff0c;结束进程 方法1&#…

CMMI流程规范—服务与维护

服务与维护&#xff08;Service and Maintenance, SM&#xff09;是指产品销售之后的客户服务和产品维护。客户服务和产品维护的宗旨就是提高客户对产品以及对开发方的满意度。服务与维护过程域是SPP模型的重要组成部分。本规范阐述了服务与维护过程域的两个主要规程&#xff1…

蓝库云|五大关键引领制造业数字化智慧升级

蓝库云根据《2023制造产业趋势展望》报告&#xff0c;并归纳出「强化企业韧性与敏捷、提升留才诱因、建构多元供应链、兼顾安全的智慧工厂、循环催化永续经营」是牵动制造产业发展的五大关键。将永续目标整合至企业中长期策略中&#xff1b;数字化方面则搭配五大发展关键&#…

【Redis应用】基于Redis实现共享session登录(一)

&#x1f697;Redis应用学习第一站~ &#x1f6a9;本文已收录至专栏&#xff1a;数据库学习之旅 &#x1f44d;希望您能有所收获 &#x1f449;相关推荐&#xff1a;使用短信服务发送手机验证码进行安全校验 一.引入 ​ 在开发项目过程中&#xff0c;我们常常能碰到需要登录注…

Linux操作系统学习(文件IO)

文章目录基础IO系统相关接口文件描述符一切皆文件文件描述符的分配规则重定向fork后的文件描述符基础IO 系统相关接口 在C语言中对文件的操作有fopen打开、fclose关闭、fread读、fwrite写等函数&#xff1b;其实这些都是在系统调用接口上进行的封装。 这里介绍4个系统调用接…