写在前面
其实很少有公司会使用 Redis 来实现服务注册与发现,通常是ETCD、NACOS、ZOOKEEPER等等,但是也不妨碍我们了解。本文会先介绍 Redis 的发布/订阅模式,接着基于这个模式实现服务注册与发现。
Redis发布订阅流程图:
Redis 发布订阅
1. 简介
Redis的发布订阅功能主要由PUBLISH
、SUBSCRIBE
、PSUBSCRIBE
等命令组成的。
通过执行 SUBSCRIBE 命令,客户端可以订阅一个或多个频道,从而成为这个频道的订阅者。
每当有其他客户端向这个被订阅的频道发送消息的时候,频道的所有订阅者都会收到这条消息。
当然,客户端还可以通过PSUBSCRIBE
订阅一个或多个模式,从而成为这些模式的订阅者,也就是模糊匹配
2. 订阅
每当一个客户端执行SUBSCRIBE
命令订阅某个或某些频道的时候,这个客户端与被订阅者之间就会建立起一种订阅关系。而Redis会将这种订阅关系保存到pubsub_channels
这个字典中,这个字典的键是某个被订阅的频道,而值是一个链表,这个链表记录了所有订阅这个频道的客户端
每当有客户端执行了SUBSCRIBE
命令订阅某个或某些频道的时候,服务器都会将客户端与被订阅的频道在 pubsub_channels
字典中进行关联。
3. 退订
如果进行退订UNSUBSCRIBE
,那么服务器会从pubsub_channels
中接触客户端与被退订频道之间的关联。当这个key中,已经没有订阅者,那么会将这个key进行删除。例如下面的client7
4. 发布消息
当一个Redis客户端执行 PUBLISH channel message
命令将消息 message 发送给channel的时候,将消息发送给channel频道的所有订阅者(本文不讨论pattern模式)
服务注册与发现
我们了解完redis的发布订阅流程之后,我们来基于这个发布订阅来实现一个服务注册与发现的功能。
Redis服务发现与注册流程图:
1. 对象定义
redis服务发现与注册的结构体
type RedisRegistryService struct {
config *RedisConfig // the config about redis
cli *redis.Client // client for redis
rwLock *sync.RWMutex // rwLock lock groupList when update service instance
// vgroupMapping to store the cluster group
// eg: map[cluster_name_key]cluster_name
vgroupMapping map[string]string
// groupList store all addresses under this cluster
// eg: map[cluster_name][]{service_instance1,service_instance2...}
groupList map[string][]*ServiceInstance
ctx context.Context
}
订阅的消息内容,为key 以及 value
,而key就是服务的name,value就是服务的具体地址
type NotifyMessage struct {
// key = registry.redis.${cluster}_ip:port
Key string `json:"key"`
Value string `json:"value"`
}
2. 对象加载
新建一个redis服务注册与发现对象,并且在创建的这个对象的时候,我们会做两件事情
- 将redis中所已存在的key都load一次,存到本地缓存中。
- 开启一些协程进行发布订阅,不断监听上游的注册消息
func newRedisRegisterService(config *ServiceConfig, redisConfig *RedisConfig) RegistryService {
if redisConfig == nil {
log.Fatalf("redis config is nil")
panic("redis config is nil")
}
cfg := &redis.Options{
Addr: redisConfig.ServerAddr,
Username: redisConfig.Username,
Password: redisConfig.Password,
DB: redisConfig.DB,
}
cli := redis.NewClient(cfg)
vgroupMapping := config.VgroupMapping
groupList := make(map[string][]*ServiceInstance)
redisRegistryService := &RedisRegistryService{
config: redisConfig,
cli: cli,
ctx: context.Background(),
rwLock: &sync.RWMutex{},
vgroupMapping: vgroupMapping,
groupList: groupList,
}
// loading all server at init time
redisRegistryService.load()
// subscribe at real time
go redisRegistryService.subscribe()
return redisRegistryService
}
3. 服务加载
load 函数:将所有 key 都 scan 出来,再遍历所有的key,拿到对应的value,进行一次初始化操作,加载到本地缓存中
func (s *RedisRegistryService) load() {
// find all the server list redis register by redisFileKeyPrefix
keys, _, err := s.cli.Scan(s.ctx, 0, fmt.Sprintf("%s*", redisFileKeyPrefix), 0).Result()
if err != nil {
log.Errorf("RedisRegistryService-Scan-Key-Error:%s", err)
return
}
for _, key := range keys {
clusterName := s.getClusterNameByKey(key)
val, err := s.cli.Get(s.ctx, key).Result()
if err != nil {
log.Errorf("RedisRegistryService-Get-Key:%s, Err:%s", key, err)
continue
}
ins, err := s.getServerInstance(val)
if err != nil {
log.Errorf("RedisRegistryService-getServerInstance-val:%s, Err:%s", val, err)
continue
}
// put server instance list in group list
s.rwLock.Lock()
if s.groupList[clusterName] == nil {
s.groupList[clusterName] = make([]*ServiceInstance, 0)
}
s.groupList[clusterName] = append(s.groupList[clusterName], ins)
s.rwLock.Unlock()
}
}
4.服务发现
通过 key 从 vgroupMapping
找到对应的 value
func (s *RedisRegistryService) Lookup(key string) (r []*ServiceInstance, err error) {
s.rwLock.RLock()
defer s.rwLock.RUnlock()
cluster := s.vgroupMapping[key]
if cluster == "" {
err = fmt.Errorf("cluster doesnt exit")
return
}
r = s.groupList[cluster]
return
}
5. 服务注册
- 将
key 和 value
set到 redis 中 - 将
key 和 value
通过 Channel 发布出去 - 另外开启一个协程将进行保活
func (s *RedisRegistryService) register(key, value string) (err error) {
_, err = s.cli.HSet(s.ctx, key, value).Result()
if err != nil {
return
}
msg := &NotifyMessage{
Key: key,
Value: value,
}
s.cli.Publish(s.ctx, redisRegisterChannel, msg)
go func() {
s.keepAlive(s.ctx, key)
}()
return
}
6. 服务订阅
订阅 Subscribe Channel 监听上游服务,并对服务的 key 和 value 进行更新操作。 注意这里对map进行读写的时候要加上读写锁,防止线程不安全。
func (s *RedisRegistryService) subscribe() {
go func() {
msgs := s.cli.Subscribe(s.ctx, redisRegisterChannel).Channel()
for msg := range msgs {
var data *NotifyMessage
err := json.Unmarshal([]byte(msg.Payload), &data)
if err != nil {
log.Errorf("RedisRegistryService-subscribe-Subscribe-Err:%+v", err)
continue
}
// get cluster name by key
clusterName := s.getClusterNameByKey(data.Key)
ins, err := s.getServerInstance(data.Value)
if err != nil {
log.Errorf("RedisRegistryService-subscribe-getServerInstance-value:%s, Err:%s", data.Value, err)
continue
}
s.rwLock.Lock()
if s.groupList[clusterName] == nil {
s.groupList[clusterName] = make([]*ServiceInstance, 0)
}
s.groupList[clusterName] = append(s.groupList[clusterName], ins)
s.rwLock.Unlock()
}
}()
return
}
注意一点:redis的发布订阅的消息是不存储到日志的,也没有ack确认。 所以如果发生的消息的丢失,就需要业务自己承担了,比如自己实现一个ack,发送的时候进行消息日志的存储。
完整代码:
https://github.com/CocaineCong/incubator-seata-go/blob/discovery/redis/pkg/discovery/redis.go