Redis实现服务注册与服务发现源码阅读
背景
近期在看开源项目CloudWeGo中看到目前GoLang微服务框架Hertz中支持通过Redis实现服务注册与服务发现功能。便想着阅读下源码
源码阅读
gut clone了hertz-contrib后看到在一级目录下有目前各种主流的服务注册与发现的实现方案。为了便于学习选择阅读redis
服务注册源码分析
看到redis/example/server/main.go中有服务注册的实现示例
func main() {
r := redis.NewRedisRegistry("127.0.0.1:6379")
addr := "127.0.0.1:8888"
h := server.Default(
server.WithHostPorts(addr),
server.WithRegistry(r, ®istry.Info{
ServiceName: "hertz.test.demo",
Addr: utils.NewNetAddr("tcp", addr),
Weight: 10,
Tags: nil,
}),
)
h.GET("/ping", func(_ context.Context, ctx *app.RequestContext) {
ctx.JSON(consts.StatusOK, utils.H{"ping": "pong"})
})
h.Spin()
}
代码主要逻辑是实现一个简单的webservice,其中用到了服务注册机制。可以看到,在hertz中服务注册可以通过配置engine的形式在webservice初始化时定义,其中
r := redis.NewRedisRegistry("127.0.0.1:6379")
定义了一个服务注册的地址,即要把这个微服务注册到哪个主机中。而server.WithRegistry()使得服务初始化时引入了这个服务注册。Info即是服务注册的相关信息
进入redis/registry.go查看服务注册的定义,可以看到redis服务注册是实现的registry.Registry接口
var _ registry.Registry = (*redisRegistry)(nil)
type redisRegistry struct {
client *redis.Client
rctx *registryContext
mu sync.Mutex
wg sync.WaitGroup
}
type registryContext struct {
ctx context.Context
cancel context.CancelFunc
}
// Registry is extension interface of service registry.
type Registry interface {
Register(info *Info) error
Deregister(info *Info) error
}
// Info is used for registry.
// The fields are just suggested, which is used depends on design.
type Info struct {
// ServiceName will be set in hertz by default
ServiceName string
// Addr will be set in hertz by default
Addr net.Addr
// Weight will be set in hertz by default
Weight int
// extend other infos with Tags.
Tags map[string]string
}
registry.Registry通过Register(info *Info)和Deregister(info *Info)描述服务注册与服务发现
接下来看如何创建一个redis服务注册
// NewRedisRegistry creates a redis registry
func NewRedisRegistry(addr string, opts ...Option) registry.Registry {
redisOpts := &redis.Options{
Addr: addr,
Password: "",
DB: 0,
}
for _, opt := range opts {
opt(redisOpts)
}
rdb := redis.NewClient(redisOpts)
return &redisRegistry{
client: rdb,
}
}
我们已经可以猜到了,配置redis客户端连接User Server的redis,用redis来存储服务映射关系,实现服务注册中心,那么是不是这样呢,我们接着往下看服务注册的实现源码
func (r *redisRegistry) Register(info *registry.Info) error {
// 校验配置信息
if err := validateRegistryInfo(info); err != nil {
return err
}
rctx := registryContext{}
rctx.ctx, rctx.cancel = context.WithCancel(context.Background())
m := newMentor()
r.wg.Add(1)
// 并发监控redis
go m.subscribe(rctx.ctx, info, r)
r.wg.Wait()
rdb := r.client
// 将注册信息hash化
hash, err := prepareRegistryHash(info)
if err != nil {
return err
}
// 上锁
r.mu.Lock()
r.rctx = &rctx
// 注册信息写入到redis,即我们的服务注册中心
rdb.HSet(rctx.ctx, hash.key, hash.field, hash.value)
rdb.Expire(rctx.ctx, hash.key, defaultExpireTime)
// 生成服务相关信息和发送
rdb.Publish(rctx.ctx, hash.key, generateMsg(register, info.ServiceName, info.Addr.String()))
// 写完,解锁
r.mu.Unlock()
go m.monitorTTL(rctx.ctx, hash, info, r)
// 保持长连接
go keepAlive(rctx.ctx, hash, r)
return nil
}
Register方法已经对服务注册的主要流程进行了描述,下面来看一些细节
func validateRegistryInfo(info *registry.Info) error {
if info == nil {
return fmt.Errorf("registry.Info can not be empty")
}
if info.ServiceName == "" {
return fmt.Errorf("registry.Info ServiceName can not be empty")
}
if info.Addr == nil {
return fmt.Errorf("registry.Info Addr can not be empty")
}
return nil
}
校验服务注册时并不会对客户端是否连接上进行校验,只会校验参数和结构体是否为空
func prepareRegistryHash(info *registry.Info) (*registryHash, error) {
meta, err := json.Marshal(convertInfo(info))
if err != nil {
return nil, err
}
return ®istryHash{
key: generateKey(info.ServiceName, server),
field: info.Addr.String(),
value: string(meta),
}, nil
}
服务注册信息hash即生成key-velue,方便写入到redis中
func keepAlive(ctx context.Context, hash *registryHash, r *redisRegistry) {
ticker := time.NewTicker(defaultTickerTime)
defer ticker.Stop()
for {
select {
case <-ticker.C:
r.client.Expire(ctx, hash.key, defaultKeepAliveTime)
case <-ctx.Done():
break
}
}
}
最后再起一个协程在生命期内监听保持长连接,这里用到的是多路复用
func keepAlive(ctx context.Context, hash *registryHash, r *redisRegistry) {
ticker := time.NewTicker(defaultTickerTime)
defer ticker.Stop()
for {
select {
case <-ticker.C:
r.client.Expire(ctx, hash.key, defaultKeepAliveTime)
case <-ctx.Done():
break
}
}
}
再来看服务注册退出:
func (r *redisRegistry) Deregister(info *registry.Info) error {
if err := validateRegistryInfo(info); err != nil {
return err
}
rctx := r.rctx
rdb := r.client
hash, err := prepareRegistryHash(info)
if err != nil {
return err
}
r.mu.Lock()
// 删除redis中的注册信息
rdb.HDel(rctx.ctx, hash.key, hash.field)
rdb.Publish(rctx.ctx, hash.key, generateMsg(deregister, info.ServiceName, info.Addr.String()))
rctx.cancel()
r.mu.Unlock()
return nil
}
整体逻辑和服务注册相似,只是最后把注册信息删掉
服务发现源码分析
看到redis/example/client/main.go中有服务注册的实现示例
func main() {
cli, err := client.NewClient()
if err != nil {
panic(err)
}
r := redis.NewRedisResolver("127.0.0.1:6379")
cli.Use(sd.Discovery(r))
for i := 0; i < 10; i++ {
status, body, err := cli.Get(context.Background(), nil, "http://hertz.test.demo/ping", config.WithSD(true))
if err != nil {
hlog.Fatal(err)
}
hlog.Infof("HERTZ: code=%d,body=%s", status, string(body))
}
}
config.WithSD(true)即通过中间件形式,使得客户端发送请求时,并非直接请求服务器,而是请求注册中心,通过服务发现再进一步转到服务器上
接前文中在redis里进行了服务注册,这里客户端想要进行服务发现找到自己请求的微服务。这里服务发现还是通过复用接口实现的
var _ discovery.Resolver = (*redisResolver)(nil)
type redisResolver struct {
client *redis.Client
}
// NewRedisResolver creates a redis resolver
func NewRedisResolver(addr string, opts ...Option) discovery.Resolver {
redisOpts := &redis.Options{Addr: addr}
for _, opt := range opts {
opt(redisOpts)
}
rdb := redis.NewClient(redisOpts)
return &redisResolver{
client: rdb,
}
}
服务发现开始和服务注册一样,需要先连接上redis
func (r *redisResolver) Target(_ context.Context, target *discovery.TargetInfo) string {
return target.Host
}
func (r *redisResolver) Resolve(ctx context.Context, desc string) (discovery.Result, error) {
rdb := r.client
// 查询服务列表
fvs := rdb.HGetAll(ctx, generateKey(desc, server)).Val()
var its []discovery.Instance
for f, v := range fvs {
// 反序列化获取服务信息
var ri registryInfo
err := json.Unmarshal([]byte(v), &ri)
if err != nil {
hlog.Warnf("HERTZ: fail to unmarshal with err: %v, ignore instance Addr: %v", err, f)
continue
}
// 负载均衡参数
weight := ri.Weight
if weight <= 0 {
weight = defaultWeight
}
its = append(its, discovery.NewInstance(tcp, ri.Addr, weight, ri.Tags))
}
return discovery.Result{
// 服务发现的结果
CacheKey: desc,//redis表中的key
Instances: its,//服务表
}, nil
}
func (r *redisResolver) Name() string {
return Redis
}
Target、Name、Resolve即为实现自方法的接口,其中target和Name分别解出redis的地址和Name,Resolve方法用来在Redis中发现服务
我们还可以细扣一下,服务发现中间件进一步是怎么实现的?
/pkg/mod/github.com/cloudwego/hertz@v0.6.0/pkg/common/config/request_option.go:58中WithSD如下:
// WithSD set isSD in RequestOptions.
func WithSD(b bool) RequestOption {
return RequestOption{F: func(o *RequestOptions) {
o.isSD = b
}}
}
可见这里是用来高速请求,这个请求是有服务发现机制的。循着client.Get()方法一路往下找,这项配置写入到了req中:
func GetURL(ctx context.Context, dst []byte, url string, c Doer, requestOptions ...config.RequestOption) (statusCode int, body []byte, err error) {
req := protocol.AcquireRequest()
req.SetOptions(requestOptions...)
statusCode, body, err = doRequestFollowRedirectsBuffer(ctx, req, dst, url, c)
protocol.ReleaseRequest(req)
return statusCode, body, err
}
在hertz中的Request定义中其实是包含有config定义,里面就有sd的flag
type Request struct {
noCopy nocopy.NoCopy //lint:ignore U1000 until noCopy is used
Header RequestHeader
uri URI
postArgs Args
bodyStream io.Reader
w requestBodyWriter
body *bytebufferpool.ByteBuffer
bodyRaw []byte
maxKeepBodySize int
multipartForm *multipart.Form
multipartFormBoundary string
// Group bool members in order to reduce Request object size.
parsedURI bool
parsedPostArgs bool
isTLS bool
multipartFiles []*File
multipartFields []*MultipartField
// Request level options, service discovery options etc.
options *config.RequestOptions
}
也就是会从req中解析出服务地址