基于 Redis 发布订阅实现服务注册与发现

news2025/1/12 19:56:10

写在前面

其实很少有公司会使用 Redis 来实现服务注册与发现,通常是ETCD、NACOS、ZOOKEEPER等等,但是也不妨碍我们了解。本文会先介绍 Redis 的发布/订阅模式,接着基于这个模式实现服务注册与发现。

Redis发布订阅流程图:
在这里插入图片描述

Redis 发布订阅

1. 简介

Redis的发布订阅功能主要由PUBLISHSUBSCRIBEPSUBSCRIBE 等命令组成的。

通过执行 SUBSCRIBE 命令,客户端可以订阅一个或多个频道,从而成为这个频道的订阅者。
在这里插入图片描述

每当有其他客户端向这个被订阅的频道发送消息的时候,频道的所有订阅者都会收到这条消息。

在这里插入图片描述
当然,客户端还可以通过PSUBSCRIBE订阅一个或多个模式,从而成为这些模式的订阅者,也就是模糊匹配

2. 订阅

每当一个客户端执行SUBSCRIBE命令订阅某个或某些频道的时候,这个客户端与被订阅者之间就会建立起一种订阅关系。而Redis会将这种订阅关系保存到pubsub_channels 这个字典中,这个字典的键是某个被订阅的频道,而值是一个链表,这个链表记录了所有订阅这个频道的客户端

在这里插入图片描述
每当有客户端执行了SUBSCRIBE命令订阅某个或某些频道的时候,服务器都会将客户端与被订阅的频道在 pubsub_channels字典中进行关联。

3. 退订

如果进行退订UNSUBSCRIBE,那么服务器会从pubsub_channels中接触客户端与被退订频道之间的关联。当这个key中,已经没有订阅者,那么会将这个key进行删除。例如下面的client7
在这里插入图片描述

4. 发布消息

当一个Redis客户端执行 PUBLISH channel message 命令将消息 message 发送给channel的时候,将消息发送给channel频道的所有订阅者(本文不讨论pattern模式)

服务注册与发现

我们了解完redis的发布订阅流程之后,我们来基于这个发布订阅来实现一个服务注册与发现的功能。

Redis服务发现与注册流程图:
在这里插入图片描述

1. 对象定义

redis服务发现与注册的结构体

type RedisRegistryService struct {
	config *RedisConfig // the config about redis
	cli *redis.Client // client for redis
	rwLock *sync.RWMutex // rwLock lock groupList when update service instance
	// vgroupMapping to store the cluster group
	// eg: map[cluster_name_key]cluster_name
	vgroupMapping map[string]string
	// groupList store all addresses under this cluster
	// eg: map[cluster_name][]{service_instance1,service_instance2...}
	groupList map[string][]*ServiceInstance
	ctx context.Context
}

订阅的消息内容,为key 以及 value ,而key就是服务的name,value就是服务的具体地址

type NotifyMessage struct {
	// key = registry.redis.${cluster}_ip:port
	Key   string `json:"key"`
	Value string `json:"value"`
}

2. 对象加载

新建一个redis服务注册与发现对象,并且在创建的这个对象的时候,我们会做两件事情

  1. 将redis中所已存在的key都load一次,存到本地缓存中。
  2. 开启一些协程进行发布订阅,不断监听上游的注册消息
func newRedisRegisterService(config *ServiceConfig, redisConfig *RedisConfig) RegistryService {
	if redisConfig == nil {
		log.Fatalf("redis config is nil")
		panic("redis config is nil")
	}

	cfg := &redis.Options{
		Addr:     redisConfig.ServerAddr,
		Username: redisConfig.Username,
		Password: redisConfig.Password,
		DB:       redisConfig.DB,
	}
	cli := redis.NewClient(cfg)

	vgroupMapping := config.VgroupMapping
	groupList := make(map[string][]*ServiceInstance)

	redisRegistryService := &RedisRegistryService{
		config:        redisConfig,
		cli:           cli,
		ctx:           context.Background(),
		rwLock:        &sync.RWMutex{},
		vgroupMapping: vgroupMapping,
		groupList:     groupList,
	}

	// loading all server at init time
	redisRegistryService.load()
	// subscribe at real time
	go redisRegistryService.subscribe()
	return redisRegistryService
}

3. 服务加载

load 函数:将所有 key 都 scan 出来,再遍历所有的key,拿到对应的value,进行一次初始化操作,加载到本地缓存中

func (s *RedisRegistryService) load() {
	// find all the server list redis register by redisFileKeyPrefix
	keys, _, err := s.cli.Scan(s.ctx, 0, fmt.Sprintf("%s*", redisFileKeyPrefix), 0).Result()
	if err != nil {
		log.Errorf("RedisRegistryService-Scan-Key-Error:%s", err)
		return
	}
	for _, key := range keys {
		clusterName := s.getClusterNameByKey(key)
		val, err := s.cli.Get(s.ctx, key).Result()
		if err != nil {
			log.Errorf("RedisRegistryService-Get-Key:%s, Err:%s", key, err)
			continue
		}
		ins, err := s.getServerInstance(val)
		if err != nil {
			log.Errorf("RedisRegistryService-getServerInstance-val:%s, Err:%s", val, err)
			continue
		}
		// put server instance list in group list
		s.rwLock.Lock()
		if s.groupList[clusterName] == nil {
			s.groupList[clusterName] = make([]*ServiceInstance, 0)
		}
		s.groupList[clusterName] = append(s.groupList[clusterName], ins)
		s.rwLock.Unlock()
	}
}

4.服务发现

通过 key 从 vgroupMapping 找到对应的 value

func (s *RedisRegistryService) Lookup(key string) (r []*ServiceInstance, err error) {
	s.rwLock.RLock()
	defer s.rwLock.RUnlock()
	cluster := s.vgroupMapping[key]
	if cluster == "" {
		err = fmt.Errorf("cluster doesnt exit")
		return
	}
	r = s.groupList[cluster]
	return
}

5. 服务注册

  1. key 和 value set到 redis 中
  2. key 和 value 通过 Channel 发布出去
  3. 另外开启一个协程将进行保活
func (s *RedisRegistryService) register(key, value string) (err error) {
	_, err = s.cli.HSet(s.ctx, key, value).Result()
	if err != nil {
		return
	}

	msg := &NotifyMessage{
		Key:   key,
		Value: value,
	}

	s.cli.Publish(s.ctx, redisRegisterChannel, msg)

	go func() {
		s.keepAlive(s.ctx, key)
	}()

	return
}

6. 服务订阅

订阅 Subscribe Channel 监听上游服务,并对服务的 key 和 value 进行更新操作。 注意这里对map进行读写的时候要加上读写锁,防止线程不安全。

func (s *RedisRegistryService) subscribe() {
	go func() {
		msgs := s.cli.Subscribe(s.ctx, redisRegisterChannel).Channel()
		for msg := range msgs {
			var data *NotifyMessage
			err := json.Unmarshal([]byte(msg.Payload), &data)
			if err != nil {
				log.Errorf("RedisRegistryService-subscribe-Subscribe-Err:%+v", err)
				continue
			}
			// get cluster name by key
			clusterName := s.getClusterNameByKey(data.Key)
			ins, err := s.getServerInstance(data.Value)
			if err != nil {
				log.Errorf("RedisRegistryService-subscribe-getServerInstance-value:%s, Err:%s", data.Value, err)
				continue
			}
			s.rwLock.Lock()
			if s.groupList[clusterName] == nil {
				s.groupList[clusterName] = make([]*ServiceInstance, 0)
			}
			s.groupList[clusterName] = append(s.groupList[clusterName], ins)
			s.rwLock.Unlock()
		}
	}()

	return
}

注意一点:redis的发布订阅的消息是不存储到日志的,也没有ack确认。 所以如果发生的消息的丢失,就需要业务自己承担了,比如自己实现一个ack,发送的时候进行消息日志的存储。

完整代码:
https://github.com/CocaineCong/incubator-seata-go/blob/discovery/redis/pkg/discovery/redis.go

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1629091.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

多端文件互传软件-LocalSend

一、前言 日常学习或者是工作需求,需要手机和电脑互传文件。用到频率低的话,使用即时通讯软件也就够了。 像我日常使用的多端互传文件软件是LocalSend。 二、 LocalSend LocalSend是一款基于局域网的文件传输工具。 LocalSend是一种用于在本地网络中…

系统盘空间不足调优方式1-APPData/大文件清理

作者:私语茶馆 1.前言 Windows系统盘(C盘)很容易剩余空间不足,这种情况下会非常影响Windows系统的运行,系统盘约束非常多,不方便在线扩容,因此规划和利用好系统盘是保障整体运行效率的关键。包…

经典机器学习算法——Pagerank算法

目录 Pagerank介绍 背景介绍 中心思想 一、量化重要性 三大指标 1、数量指标 2、质量指标 3、稀释指标 二、实际应用简化为理论模型 PageRank公式 手动预测网站的重要度 马尔可夫矩阵预测网站的重要度 两个方法的联系 PageRank算法存在的问题 一、Dead Ends问题…

XTuner微调LLM:1.8B、多模态和Agent-笔记四

本次课程由XTuner 贡献者李剑锋、汪周谦、王群老师讲解【XTuner 微调 LLM:1.8B、多模态和 Agent】课程 课程视频:http:// https://b23.tv/QUhT6ni 课程文档:https://github.com/InternLM/Tutorial/blob/camp2/xtuner/readme.md 两种Finetun…

web题目实操 5(备份文件和关于MD5($pass,true)注入的学习)

1.[ACTF2020 新生赛]BackupFile (1)打开页面后根据提示是备份文件 (2)查看源码发现啥都没有 (3)这里啊直接用工具扫描,可以扫描到一个文件名为:/index.php.bak的文件 (…

使用Docker部署Jupyter Notebook并结合花生壳的内网穿透实现远程访问(详文)

一、前言 本文主要介绍如何利用宝塔面板中的Docker 3.9.3管理器,使用Docker本地部署Jupyter Notebook,并结合花生壳内网穿透工具实现任意浏览器公网远程访问Jupyter登录界面。 安装完成后在宝塔面板中图例 Jupyter Notebook是一个交互式笔记本,支持运行40多种编程语言。…

D - Grid and Magnet

思路&#xff1a;标记一下磁铁周围的空地即可&#xff0c;每个连通块一定可以互相到达&#xff0c;我们dfs算出联通块的大小再加上该连通块周围的可达磁场区域即可。 代码&#xff1a; #include <bits/stdc.h> using namespace std; using ll long long; using ld lon…

【Godot4.2】自定义Todo清单类 - myTodoList

概述 在写myList类的时候&#xff0c;就想到可以写一个类似的Todo清单类。 基础思路 本质还是在内部维护一个数组&#xff0c;在其基础上进行增删改查操作的封装为了方便存储数据&#xff0c;编写一个自定义内置类TodoItem&#xff0c;内部数组就变成了Array[TodoItem]类型的…

66、二分-搜索旋转排序数组

思路&#xff1a; 不断二分&#xff0c;首先判断左侧有序还是右侧有序&#xff0c;如果左侧有序那么就在左侧寻找&#xff0c;如果右侧有序那就在右侧寻找。假设左侧有序&#xff0c;那就判断目标值在不在左侧&#xff0c;如果在左侧继续左侧二分。如果不在左侧&#xff0c;那么…

Redis可视化工具RedisInsight

下载地址&#xff1a;RedisInsight - The Best Redis GUIRedisInsight provides an intuitive and efficient graphical interface for Redis, allowing you to interact with your databases and manage your data.https://redis.com/redis-enterprise/redis-insight/#insight…

STM32通过ESP8266连接阿里云 详细步骤

一、烧录MQTT固件 ESP8266出厂时&#xff0c;默认是&#xff1a;AT固件。连接阿里云需要&#xff1a;MQTT固件。 因此&#xff0c;我们需要给8266重新烧录 MQTT固件。 针对“魔女开发板&#xff0c;ESP8266模块烧录MQTT固件&#xff0c;图解教程如下&#xff1a; ESP8266 烧录 …

Spring Task学习记录

介绍 cron表达式 cron表达式在线生成器 链接: link 入门案例 Component Slf4j public class MyTask {/*** 定时任务 每隔5秒触发1次*/Scheduled(cron "0/5 * * * * ?")public void executeTask(){log.info("定时任务开始执行&#xff1a;{}", new Date…

什么是外汇爆仓?怎样避免?

外汇爆仓是指当交易者的保证金低于特定比例时&#xff0c;经纪商会自动平仓一个或所有的开仓头寸。避免外汇爆仓的关键在于合理配置资金、设置止损、适度交易、顺势而为以及调整心态。 外汇爆仓是外汇交易中的一种风险控制机制。当交易者的账户净值低于已用保证金的特定比例时&…

C语言 基本数据类型及大小

一、基本数据类型 1.整型int 整型的关键字是int&#xff0c;定义一个整型变量时&#xff0c;只需要用int来修饰即可。也分为短整型和长整型。 2.浮点型 浮点型又分单精度浮点型float和双精度浮点型double。 3.字符型char 前面的整型和浮点型都是用于存放数字。字符型&…

考研数学|跟完武忠祥基础,刷题还是看张宇基础❓

听完武忠祥老师的课程&#xff0c;当然是趁热打铁&#xff0c;多练题&#xff0c;巩固做题技巧 首先&#xff0c;武忠祥老师和张宇老师在基础阶段的课程质量基本是差不多的&#xff0c;如果你听完武忠祥老师的课程&#xff0c;并且基本都听懂了&#xff0c;真的没有必要再去浪…

Autosar MCAL-RH850P1HC Fls配置

文章目录 FlsFlsGeneralFlsAcLoadOnJobStartFlsBaseAddressFlsBlankCheckApiFlsCancelApiFlsCompareApiFlsCopySupportedFlsCriticalSectionProtectionFlsDevErrorDetectFlsDeviceNameFlsDriverIndexFlsFaciEccCheckFlsGetJobResultApiFlsGetStatusApiFlsLoopCountFlsReadImmed…

大型语言模型高效推理综述

论文地址&#xff1a;2404.14294.pdf (arxiv.org) 大型语言模型&#xff08;LLMs&#xff09;由于在各种任务中的卓越表现而受到广泛关注。然而&#xff0c;LLM推理的大量计算和内存需求给资源受限的部署场景带来了挑战。该领域的努力已经朝着开发旨在提高LLM推理效率的技术方…

libVLC 制作一款精美的播放器

1.简介 本文将简单介绍使用libVLC制作一款精美的播放器。 开发环境:Visual Studio + Qt插件。 Qt版本:Qt5.9。 libVLC版本:3.0.20。 以下是运行界面效果图:截取其中几张。 右键菜单,功能还是比较齐全。 2.ui界面构成 接下来简单介绍一下ui界面构成。 主界面由播放树…

【Linux内核驱动基础】从零开始手搓一个从上层应用到底层驱动的IO口代码

【Linux内核驱动基础】从零开始手搓一个从上层应用到底层驱动的IO口控制代码 文章目录 【Linux内核驱动基础】从零开始手搓一个从上层应用到底层驱动的IO口控制代码一、驱动基础认知1.为什么要学会写驱动2.文件名与设备号3.open函数从上层打通到底层硬件的详细过程 二、基于内核…

22年全国职业技能大赛——Web Proxy配置(web 代理)

前言&#xff1a;原文在我的博客网站中&#xff0c;持续更新数通、系统方面的知识&#xff0c;欢迎来访&#xff01; 系统服务&#xff08;22年国赛&#xff09;—— web Proxy服务&#xff08;web代理&#xff09;https://myweb.myskillstree.cn/114.html 目录 RouterSrv …