记一次对Codis的无知引起的逻辑变更

news2025/1/12 2:56:50

先提前说明,对Codis的无知是因为Codis不支持一些Redis的命令,而这次的逻辑变更,就是因为使用了PUBLISH,而Codis又不支持PUBLISH导致的。

1. 前言

前段时间的一次需求中,因为设计到多个服务的注册问题,在项目中没有Zookeeper的情况下,决定采用Redis替代Zookeeper的方案实现多个服务注册、动态扩缩容等逻辑。

最初的服务逻辑是:

基于Redis的注册服务主体逻辑

主要可以分为5个函数:

  1. ServiceRegister: 主要负责服务的注册
    1. 服务启动时,会通过Lua脚本将自己注册到Redis的SortedSet中(其中member代表服务名称(保证唯一),score代表服务的下标)
  2. ServiceUnregister: 主要负责服务的注销
    1. 当服务下线时,会通过Lua脚本将自己从Redis的SortedSet中移除,同时触发其他服务的Rebalance
  3. ReportHeartbeat: 主要负责心跳上报
    1. 当服务注册后,会启动心跳上报机制,主要为了防止服务异常下线,其他服务对异常下线的服务无感知
  4. EvictedExpiredService: 主要负责驱逐心跳数据过期的服务
    1. 这个与ReportHeartbeat相辅相成,如果ReportHeartbeat的心跳数据上报时间戳太老(过期),则会被驱逐,同时会告知其他服务进行Rebalance
  5. ServiceRebalace: 主要负责在服务扩缩容、上下线过程中的服务重平衡逻辑
    1. 如果有服务下线、服务上线都会触发服务重平衡,以使得所有注册的服务均处于正常状态。

介绍完上面的5个函数,下面就记录下为什么会引发对Codis无知的思考。

2. 对Codis的无知

为什么会引发对Codis的无知呢?

这个问题在代码合入Test分支以及自己在本地单测过程中均未发现,未发现的原因是自己本地的Redis是单节点,部署在Test环境的Redis是单节点的Redis Cluster,根本不具备高可用性。而我们的生产环境是采用了Codis proxy进行部署的分布式Redis的集群。

在不了解Codis的情况下,上面5个函数中的ServiceRebalance的逻辑我采用的是Redis Publish/Subscribe的命令来监听其他服务下线时发送到channel中的Rebalance消息,从而触发ServiceRegister的逻辑。

Rebalance逻辑流程示意图

问题?

看起来一切似乎都没有毛病,毕竟从Redis 2.x版本开始,Pub/Sub的逻辑就已经支持了。当一切都准备就绪之后,它们遇到了Codis,这个时候当下线服务Publish Rebalace消息的时候,报错了:

command PUBLISH is not allowed

在没有去了解Codis之前,看到这个报错,第一想法就是SRE从redis.conf中禁止了此命令的使用,想着如果放开,这个报错不就消失了吗?

于是带着这个问题去寻找SRE,SRE反馈”Codis不支持PUBLISH这个命令”,收到反馈后,第一时间就去github寻找答案了,于是发现Codis有一个文档,文档标注了Codis不支持的Redis Command:https://github.com/CodisLabs/codis/blob/release3.2/doc/unsupported_cmds.md,而PUBLISH就包含其中,而不支持的原因主要是因为在分布式环境下,为了确保消息在所有节点之间正确传播和同步,Codis为了简化分布式处理的复杂性,所以就没有实现PUBLISH命令:

var opTable = make(map[string]OpInfo, 256)

func init() {
	for _, i := range []OpInfo{
	  // ...
		{"PUBLISH", FlagNotAllow},
		// ...
	} {
		opTable[i.Name] = i
	}
}

func (s *Session) handleRequest(r *Request, d *Router) error {
	opstr, flag, err := getOpInfo(r.Multi)
	if err != nil {
		return err
	}
	r.OpStr = opstr
	r.OpFlag = flag
	r.Broken = &s.broken

	if flag.IsNotAllowed() {
		return fmt.Errorf("command '%s' is not allowed", opstr)
	}
// ignore...
}

3. 解决办法

因为无法使用Pub/Sub的逻辑,就需要移除基于Pub/Sub机制实现的代码逻辑,从而采用其他的方式来替代,因为Publish是嵌入在Lua脚本中的,所以导致我们没有办法使用Kafka这种消息队列来实现这个服务的注册,再加上我们希望这个服务注册的逻辑本身只依赖Redis,所以采用Kafka消息队列来替代Publish/Subscribe的逻辑就被抛弃掉了。

后面采用了Get/Set来替代,因为从Pub/Sub的逻辑来看,对于Publish和Subscribe来看,在go-redis/v8的实现中,subscribe的实现如下列代码所示:

func (c *PubSub) Channel(opts ...ChannelOption) <-chan *Message {
	c.chOnce.Do(func() {
		c.msgCh = newChannel(c, opts...) // new出新的channel
		c.msgCh.initMsgChan() // 初始化msgChan
	})
	if c.msgCh == nil {
		err := fmt.Errorf("redis: Channel can't be called after ChannelWithSubscriptions")
		panic(err)
	}
	return c.msgCh.msgCh
}

func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel {
	c := &channel{
		pubSub: pubSub,

		chanSize:        100,
		chanSendTimeout: time.Minute,
		checkInterval:   3 * time.Second,
	}
	for _, opt := range opts {
		opt(c)
	}
	if c.checkInterval > 0 {
		c.initHealthCheck() // 这里默认是3s发送一次ping, 保证subscribe与redis的连通性
	}
	return c
}

func (c *channel) initHealthCheck() {
	ctx := context.TODO()
	c.ping = make(chan struct{}, 1)

	go func() {
		timer := time.NewTimer(time.Minute) // 这里默认是1分钟
		timer.Stop()

		for {
			timer.Reset(c.checkInterval) // 重置为3s
			select {
			case <-c.ping: // 这里是与下面的函数(initMsgChan)结合使用的
			// 一旦接收到c.ping, 实际就说明网络是联通的,就没必要再去发送ping消息验证健康状态了
				if !timer.Stop() {
					<-timer.C
				}
			case <-timer.C:
				if pingErr := c.pubSub.Ping(ctx); pingErr != nil {
					c.pubSub.mu.Lock()
					c.pubSub.reconnect(ctx, pingErr)
					c.pubSub.mu.Unlock()
				}
			case <-c.pubSub.exit:
				return
			}
		}
	}()
}

func (c *channel) initMsgChan() {
	ctx := context.TODO()
	c.msgCh = make(chan *Message, c.chanSize)

	go func() {
		timer := time.NewTimer(time.Minute)
		timer.Stop()

		var errCount int
		for {
			// 这里的连接如果往底层追的,主要是一个延迟阻塞读取的逻辑
			msg, err := c.pubSub.Receive(ctx)
			if err != nil {
				if err == pool.ErrClosed {
					close(c.msgCh)
					return
				}
				if errCount > 0 {
					time.Sleep(100 * time.Millisecond)
				}
				errCount++
				continue
			}

			errCount = 0

			// Any message is as good as a ping.
			// 这里只要获取到了消息,就意味着subscribe的连接是健康的
			select {
			case c.ping <- struct{}{}:
			default:
			}

			switch msg := msg.(type) {
			case *Subscription:
				// Ignore.
			case *Pong:
				// Ignore.
			case *Message:
				// 如果获取到消息之后,直接重置定时器
				timer.Reset(c.chanSendTimeout)
				select {
				case c.msgCh <- msg:
					if !timer.Stop() { // 这一步主要是因为Reset只能重置timer已经stopped或者已经过了timer expired的timer,可以看其方法comment
					// 这里做这一步因为如果timer.C在timer.Reset之前就到期了,这里不释放,会导致下次出现命中 <-timer.C的误操作(select多个条件命中会随机选择一个)
						<-timer.C // 如果定时器没有stop,说明可能到期了,先把chan释放
					}
				case <-timer.C:
					internal.Logger.Printf(
						ctx, "redis: %s channel is full for %s (message is dropped)",
						c, c.chanSendTimeout)
				}
			default:
				internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
			}
		}
	}()
}

关于c.pubSub.Receive(ctx)方法,继续向下看,会一直看到

func (c *PubSub) Receive(ctx context.Context) (interface{}, error) {
	return c.ReceiveTimeout(ctx, 0)
}

func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error {
	if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil {
		return err
	}
	return fn(cn.rd)
}

func (c *conn) SetReadDeadline(t time.Time) error {
	if !c.ok() {
		return syscall.EINVAL
	}
	if err := c.fd.SetReadDeadline(t); err != nil {
		return &OpError{Op: "set", Net: c.fd.net, Source: nil, Addr: c.fd.laddr, Err: err}
	}
	return nil
}

func (fd *netFD) SetReadDeadline(t time.Time) error {
	return fd.pfd.SetReadDeadline(t)
}

func (fd *FD) SetReadDeadline(t time.Time) error {
	return setDeadlineImpl(fd, t, 'r')
}

func runtime_pollSetDeadline(ctx uintptr, d int64, mode int)

这里主要是为读取fd数据设置了一个延迟时间,而我们的timeout传递为0,表示的是一直在监听,而我们的initHealthCheck函数会每3s发送一个ping的命令给Redis,以保证这里3s一定会读取到一个”pong”的回复。

关于runtime_pollSetDeadline的进一步的实现原理,可以参考【5-4 Golang】实战—Go服务502总结这篇博客。

3.1 实现

有了上面对subscribe获取数据的分析,对于Get/Set的实现方案,则采用如下的逻辑:

  1. 当服务下线时,之前利用PUBLISH给所有SUBSCRIBE发送消息的行为被改写成了获取所有心跳数据中的服务key,然后为每一个key写一个需要rebalance的数据到这个key中
  2. 其他的服务则按时轮询这个key是否存在,如果存在则相当于收到了类似之前PUBLISH的消息,则触发Rebalance的行为。

上面的实现方案,在某种程度上是可行的,但是相比于go-redis的实现方案,自己写的代码差距还是有些大,因为这样就导致了轮询必须不断地发送Redis的请求,而不是像go-redis一样利用healthcheck每3s发送一个命令,在receive方法中也是采用读到了才返回数据的机制(timeout=0意味着没有任何的deadline)实现的,则保证了服务在没有收到新的请求下每3s才会有一个请求发送给redis。

4. 小结

因为使用了Redis,引入了PUBLISH,因为Codis,PUBLISH不允许使用,导致了需求逻辑的变更。同时也暴露出自己对于服务中一些基础中间件的不了解,作为一个后端服务开发,在对中间件不了解的情况下使用一些中间件的技术来实现服务逻辑,这个行为本身其实就是需求分析不到位导致的。虽然在刚开始自己还在为Redis不允许使用PUBLISH命令感觉到自己也很难去知道命令不可用,但考虑到自己对系统服务使用的是Codis proxy搭建的分布式Redis集群,就让自己对这次的问题产生了一些反思。

希望后续遇到类似的需求,可以先调查清楚这些基础中间件的支持情况,然后再决定业务逻辑代码的编写,而不是先实现了业务逻辑代码,因为遇到基础中间件不支持的情况,再反过来修改业务逻辑代码!

参考

  • Codis:https://github.com/CodisLabs/codis
  • go-redis: https://github.com/redis/go-redis/blob/f3fe61148b2b8fe0a669dc23620690407f5f92af/pubsub.go#L564

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1561155.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

docker容器添加新端口映射的步骤及`wsl$`目录的作用

在Docker容器已经创建后&#xff0c;需要添加新的端口映射&#xff0c;即对已经存在的Docker容器添加新的端口映射&#xff0c;可以通过以下步骤来添加&#xff0c;即通过修改配置文件的方法。 如何新增端口映射&#xff1f; 查找容器的hash值 docker inspect [容器id或名称…

机器学习在智能音箱中的应用探索与实践:让声音更懂你

&#x1f9d1; 作者简介&#xff1a;阿里巴巴嵌入式技术专家&#xff0c;深耕嵌入式人工智能领域&#xff0c;具备多年的嵌入式硬件产品研发管理经验。 &#x1f4d2; 博客介绍&#xff1a;分享嵌入式开发领域的相关知识、经验、思考和感悟,欢迎关注。提供嵌入式方向的学习指导…

椋鸟数据结构笔记#5:树、二叉树基础

文章目录 树树的相关概念树的表示 二叉树基础二叉树分类满二叉树完全二叉树 二叉树的性质二叉树的存储结构顺序存储链式存储 萌新的学习笔记&#xff0c;写错了恳请斧正。 树 树是一种非线性的数据结构&#xff0c;它是由 n 个节点组成的一个具有层次关系的数据集合。其大概结…

算法学习——LeetCode力扣补充篇3(143. 重排链表、141. 环形链表、205. 同构字符串、1002. 查找共用字符、925. 长按键入)

算法学习——LeetCode力扣补充篇3 143. 重排链表 143. 重排链表 - 力扣&#xff08;LeetCode&#xff09; 描述 给定一个单链表 L 的头节点 head &#xff0c;单链表 L 表示为&#xff1a; L0 → L1 → … → Ln - 1 → Ln 请将其重新排列后变为&#xff1a; L0 → Ln → …

题目:小蓝的神秘行囊(蓝桥OJ 3937)

问题描述&#xff1a; 解题思路&#xff1a; 二维优化01背包模板题。与一维优化01背包不同在于多增加一维。 代码&#xff1a; #include <bits/stdc.h> using namespace std;const int N 1e2 9; int dp[N][N]; //二维的01背包&#xff0c;dp[i][j]&#xff1a;i是体…

【SpringCloud】一文详谈Nacos

&#x1f3e1;浩泽学编程&#xff1a;个人主页 &#x1f525; 推荐专栏&#xff1a;《深入浅出SpringBoot》《java对AI的调用开发》 《RabbitMQ》《Spring》《SpringMVC》《项目实战》 &#x1f6f8;学无止境&#xff0c;不骄不躁&#xff0c;知行合一 文章目录 …

metasploit使用及内网笔记

1 基本操作 Metasploit就是一个漏洞框架。它的全称叫做The Metasploit Framework&#xff0c;简称叫做MSF。Metasploit作为全球最受欢迎的工具&#xff0c;不仅仅是因为它的方便性和强大性&#xff0c;更重要的是它的框架。它允许使用者开发自己的漏洞脚本&#xff0c;从而进行…

Dockerfile和Docker-compose

一、概述 Dockerfile和Docker Compose是用于构建和管理 Docker 容器的两个工具&#xff0c;但它们的作用和使用方式不同。 Dockerfile Dockerfile 是一个文本文件&#xff0c;用于定义 Docker 镜像的构建规则。它包含一系列指令&#xff0c;如 FROM&#xff08;指定基础镜像…

RAG:检索增强生成系统如何工作

随着大型语言模型&#xff08;LLM&#xff09;的发展&#xff0c;人工智能世界取得了巨大的飞跃。经过大量数据的训练&#xff0c;LLM可以发现语言模式和关系&#xff0c;使人工智能工具能够生成更准确、与上下文相关的响应。 但LLM也给人工智能工程师带来了新的挑战&#xff…

System.gc 之后到底发生了什么 ?

本文基于 OpenJDK17 进行讨论 在 JDK NIO 针对堆外内存的分配场景中&#xff0c;我们经常会看到 System.gc 的身影&#xff0c;比如当我们通过 FileChannel#map 对文件进行内存映射的时候&#xff0c;如果 JVM 进程虚拟内存空间中的虚拟内存不足&#xff0c;JVM 在 native 层就…

将整数的二进制位的奇偶数位交换

✅博客主页:爆打维c-CSDN博客​​​​​​ &#x1f43e; &#x1f539;分享c语言知识及代码 以下是一个用于交换整数二进制位的宏&#xff1a; #define SWAP_BITS(num) (((num) & 0xAAAAAAAA) >> 1) | (((num) & 0x55555555) << 1)这个宏利用了二进制掩…

WordPress AutomaticPlugin SSRF漏洞复现(CVE-2024-27954)

0x01 产品简介 WordPress是一款免费开源的内容管理系统(CMS),最初是一个博客平台,但后来发展成为一个功能强大的网站建设工具,适用于各种类型的网站,包括个人博客、企业网站、电子商务网站等,并逐步演化成一款内容管理系统软件。 0x02 漏洞概述 WordPress AutomaticPlu…

让工作自动化起来!无所不能的Python

让工作自动化起来&#xff01;无所不能的Python 一、Python是办公自动化的重要工具二、Python是提升职场竞争力的利器三、Python是企业数字化的重要平台四、Python是AI发展的重要通道之一内容简介作者简介前言读者对象如何阅读本书购买链接参与方式 随着我国企业数字化和信息化…

Shell与Bash与POSIX与Linux间的关系

shell是什么&#xff1f; Shell的英语翻译是“壳”&#xff0c;其作用也跟名字差不多&#xff0c;为操作系统套个壳&#xff0c;人与操作系统的壳交互。与壳相对应的则是操作系统内核&#xff0c;一个“壳”一个“核”。核从1970年代开始就基本定型了&#xff0c;没什么大的改…

QA测试开发工程师面试题满分问答4: 如何测试购物车功能?

当测试一个购物车时&#xff0c;我们需要采用全面的测试策略&#xff0c;以确保购物车在各种情况下的功能正常、性能良好和用户体验优秀。以下是一个详细的测试计划&#xff0c;包含了各个方面的测试。 功能测试&#xff1a; 添加商品到购物车&#xff1a;验证能否将商品成功添…

基于深度学习的端到端自动驾驶的最新进展:调研综述

基于深度学习的端到端自动驾驶的最新进展&#xff1a;调研综述 附赠自动驾驶学习资料和量产经验&#xff1a;链接 论文链接&#xff1a;https://arxiv.org/pdf/2307.04370.pdf 调研链接&#xff1a;https://github.com/Pranav-chib/ 摘要 本文介绍了基于深度学习的端到端自…

书生浦语笔记一

2023年6月&#xff0c;InternLM的第一代大模型正式发布。仅一个月后&#xff0c;该模型以及其全套工具链被开源。随后&#xff0c;在8月份&#xff0c;多模态语料库chat7B和lagent也被开源。而在接下来的9月份&#xff0c;InternLM20B的开源发布进一步加强了全线工具链的更新。…

猜数游戏(Python)

一、实验要求&#xff1a; &#xff08;1&#xff09;在游戏开始时&#xff0c;随机生成一个1~100之间的整数。 &#xff08;2&#xff09;在游戏中&#xff0c;玩家有10次机会猜数。如果10次都没有猜中&#xff0c;则游戏失败&#xff1b;否则&#xff0c;游戏成功。 &…

Linux:查询类型的命令type

相关阅读 Linuxhttps://blog.csdn.net/weixin_45791458/category_12234591.html?spm1001.2014.3001.5482 type命令是Linux中一个查询类型的命令&#xff0c;它可以查询name是alias别名、keyword关键字、function函数名、builtin内建命令名&#xff08;这很有用&#xff09;或…

Linux系统使用Docker部署MeterSphere并实现公网访问本地测试平台

文章目录 前言1. 安装MeterSphere2. 本地访问MeterSphere3. 安装 cpolar内网穿透软件4. 配置MeterSphere公网访问地址5. 公网远程访问MeterSphere6. 固定MeterSphere公网地址 前言 MeterSphere 是一站式开源持续测试平台, 涵盖测试跟踪、接口测试、UI 测试和性能测试等功能&am…