欢迎继续跟随《Redis新手指南:从入门到精通》专栏的步伐!在本文中,我们将深入探讨Redis的发布与订阅(Pub/Sub)模式。这是一种强大的消息传递机制,适用于各种实时通信场景,如聊天应用、实时通知和事件驱动架构。通过学习本篇文章,你将能够掌握如何使用Redis的Pub/Sub功能来构建高效的消息传递系统。
在本文中,你将会学到:
- 为什么需要发布与订阅模式: 通过业务实际案例解释为什么需要Pub/Sub模式
- 什么是发布与订阅模式:理解Pub/Sub的基本概念及其在分布式系统中的重要性。
- Redis的Pub/Sub命令:详细介绍如何使用
PUBLISH
,SUBSCRIBE
,UNSUBSCRIBE
, 和PSUBSCRIBE
等命令。 - 频道与模式:了解频道(channels)和模式(patterns)的区别及用法。
- 实战案例:通过一个简单的聊天应用示例,展示如何在实际项目中使用Redis的Pub/Sub功能。
- 最佳实践:分享一些使用Pub/Sub时的最佳实践,包括性能优化和故障处理。
无论你是初学者还是有经验的开发者,本文都将为你提供清晰易懂的指导,帮助你在项目中有效利用Redis的发布与订阅功能。让我们一起探索这一强大的消息传递机制吧!
也许有的小伙伴对这个功能比较陌生,不太清楚这个功能是干什么的. 不妨我们先举个案例
假设我们有这么一个业务场景,在网站下单支付以后,需要通知库存服务进行发货处理。
上面业务实现不难,我们只要让库存服务提供给相关的给口,下单支付之后只要调用库存服务即可。
后面如果又有新的业务,比如说积分服务,他需要获取下单支付的结果,然后增加用户的积分。
这个实现也不难,让积分服务同样提供一个接口,下单支付之后只要调用库存服务即可。
如果就两个业务需要获取下单支付的结果,那也还好,程序改造也快。可是随着业务不断的发展,越来越多的新业务说是要下单支付的结果。
这时我们会发现上面这样的系统架构存在很多问题:
第一,下单支付业务与其他业务重度耦合,每当有个新业务需要支付结果,就需要改动下单支付的业务。
第二,如果调用业务过多,会导致下单支付接口响应时间变长。另外,如果有任一下游接口响应变慢,就会同步导致下单支付接口响应也变长。
第三,如果任一下游接口失败,可能导致数据不一致的情况。比如说下图,先调用 A,成功之后再调用 B,最后再调用 C。
如果在调用 B 接口的发生异常,此时可能就导致下单支付接口返回失败,但是此时 A 接口其实已经调用成功,这就代表它内部已经处理下单支付成功的结果。
这样就会导致 A,B,C 三个下游接口,A 获取成功获取支付结果,但是 B,C 没有拿到,导致三者系统数据不一致的情况。
其实我们仔细想一下,对于下单支付业务来讲,它其实不需要关心下游调用结果,只要有某种机制通知能通知到他们就可以了。
讲到这里,这就需要引入今天需要介绍发布订阅机制
1 什么是发布订阅🍀
Redis 发布订阅(pub/sub) 是一种消息通信模式: 发送者(pub)发送消息,订阅者(sub)接收消息。Redis 的 subscribe 命令可以让客户端订阅任意数量的频道, 每当有新信息发送到被订阅的频道时, 信息就会被发送给所有订阅指定频道的客户端
下图展示了频道channel , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:
当有新消息通过 publish 命令发送给频道 channel 时, 这个消息就会被发送给订阅它的三个客户端:
如上图所示,消息发布者只需要想指定的频道发布消息,订阅该频道的每个客户端都可以接受到到这个消息。
使用 Redis 发布订阅这种机制,对于上面业务,下单支付业务只需要向「支付结果」这个频道发送消息,其他下游业务订阅「支付结果」这个频道,就能收相应消息,然后做出业务处理即可
这样就可以解耦系统上下游之间调用关系
2 为什么要用发布订阅🍀
针对消息订阅发布功能,市面上很多大厂使用的是kafka
、RabbitMQ
、ActiveMQ
, RocketMQ等这几种,
redis的订阅发布功能跟这三者相比,相对轻量,针对数据准确和安全性要求没有那么高可以直接使用,适用于小公司
1.双端队列
redis 的List数据类型结构提供了 blpop 、brpop 命令结合 rpush、lpush 命令可以实现消息队列机制,基于双端链表实现的发布与订阅功能
这种方式存在两个局限性:
-
不能支持一对多的消息分发。
-
如果生产者生成的速度远远大于消费者消费的速度,易堆积大量未消费的消息
◇ 双端队列图解如下:
✦ 解析:双端队列模式只能有一个或多个消费者轮着去消费,却不能将消息同时发给其他消费者
2. 发布/订阅模式
发布/订阅模式图解如下:
✦ 解析:redis订阅发布模式,生产者生产完消息通过频道分发消息,给订阅了该频道的所有消费
3 发布/订阅如何使用🍀
Redis有两种发布/订阅模式
1.基于频道(Channel)的发布/订阅
允许用户订阅一个或多个频道,以接收所有发送到这些频道的消息。这种模型与其它消息队列不同,因为它具有消息的持久性,即使消费者在消息发送时不在线,当它们上线时,它们仍然可以接收到所有的消息
命令 | 命令描述 |
---|---|
subscribe channel [cha...] | 订阅给定的一个或多个频道 |
unsubscribe channel [cha...] | 退订给定的频道.说明:若没有指定channel,则默认退订所有频道 |
publish channel message | 将消息发送给指定频道的channel,返回结果:接收到信息的订阅者数量,无订阅者返回0 |
publish channels [argument [arg]....] | 查看订阅与发布系统的状态.说明:返回活跃频道列表(即至少有一个订阅者的频道,订阅模式的客户端除外) |
"发布/订阅" 包含2种角色:发布者和订阅者。发布者可以向指定的频道(channel)发送消息;订阅者可以订阅一个或者多个频道(channel),所有订阅此频道的订阅者都会收到此消息
- 订阅者订阅频道 subscribe channel [channel ...]
--------------------------客户端1(订阅者) :订阅频道 ---------------------
# 订阅 “meihuashisan” 和 “csdn” 频道(如果不存在则会创建频道)
127.0.0.1:6379> subscribe meihuashisan csdn
Reading messages... (press Ctrl-C to quit)
1) "subscribe" -- 返回值类型:表示订阅成功!
2) "meihuashisan" -- 订阅频道的名称
3) (integer) 1 -- 当前客户端已订阅频道的数量
1) "subscribe"
2) "csdn"
3) (integer) 2
#注意:订阅后,该客户端会一直监听消息,如果发送者有消息发给频道,这里会立刻接收到消息
- 发布者发布消息 publish channel message
-----------------------客户端2(发布者):发布消息给频道 -------------------
# 给“meihuashisan”这个频道 发送一条消息:“I am meihuashisan”
127.0.0.1:6379> publish meihuashisan "I am meihuashisan"
(integer) 1 # 接收到信息的订阅者数量,无订阅者返回0
- 客户端2(发布者)发布消息给频道后,此时我们再来观察 客户端1(订阅者)的客户端窗口变化
--------------------------客户端1(订阅者) :订阅频道 -----------------
127.0.0.1:6379> subscribe meihuashisan csdn
Reading messages... (press Ctrl-C to quit)
1) "subscribe" -- 返回值类型:表示订阅成功!
2) "meihuashisan" -- 订阅频道的名称
3) (integer) 1 -- 当前客户端已订阅频道的数量
1) "subscribe"
2) "csdn"
3) (integer) 2
--------------------变化如下:(实时接收到了该频道的发布者的消息)------------
1) "message" -- 返回值类型:消息
2) "meihuashisan" -- 来源(从哪个频道发过来的)
3) "I am meihuashisan" -- 消息内容
注意:如果是先发布消息,再订阅频道,不会收到订阅之前就发布到该频道的消息!
注意:进入订阅状态的客户端,不能使用除了subscribe
、unsubscribe
、psubscribe
和 punsubscribe
这四个属于"发布/订阅"之外的命令,否则会报错!
这里的客户端指的是 jedis、lettuce的客户端,redis-cli是无法退出订阅状态的
底层实现逻辑
底层通过字典实现。pubsub_channels
是一个字典类型,保存订阅频道的信息:字典的key为订阅的频道, 字典的value是一个链表, 链表中保存了所有订阅该频道的客户端
struct redisServer {
/* General */
pid_t pid;
//省略百十行
// 将频道映射到已订阅客户端的列表(就是保存客户端和订阅的频道信息)
dict *pubsub_channels; /* Map channels to list of subscribed clients */
}
-
数据结构
比如说,在下图展示的这个 pubsub_channels 示例中, client2 、 client5 和 client1 就订阅了 channel1 , 而其他频道也分别被别的客户端所订阅:
-
订阅
当客户端调用 SUBSCRIBE 命令时, 程序就将客户端和要订阅的频道在 pubsub_channels 字典中关联起来。
举个例子,如果客户端 client10086 执行命令 SUBSCRIBE channel1 channel2 channel3
,那么前面展示的 pubsub_channels 将变成下面这个样子:
-
发布
当调用 PUBLISH channel message
命令, 程序首先根据 channel 定位到字典的键, 然后将信息发送给字典值链表中的所有客户端。
比如说,对于以下这个 pubsub_channels 实例, 如果某个客户端执行命令 PUBLISH channel1 "hello moto"
,那么 client2 、 client5 和 client1 三个客户端都将接收到 "hello moto" 信息:
-
退订
使用 UNSUBSCRIBE 命令可以退订指定的频道, 这个命令执行的是订阅的反操作:它从 pubsub_channels 字典的给定频道(键)中, 删除关于当前客户端的信息, 这样被退订频道的信息就不会再发送给这个客户端。
2.基于模式(pattern)的发布/订阅
命令 | 命令描述 |
---|---|
psubscribe pattern1 [pattern....] | 订阅一个或多个符合给定模式的频道.说明:每个模式以*为匹配符;例如cn*匹配所有以cn开头的频道 |
punsubscribe pattern1 [pattern....] | 退订所有给定模式的频道.说明:pattern未指定,则订阅的所有模式都会被退订,否则只退订指定的订阅模式 |
如果有某个/某些模式和该频道匹配,所有订阅这个/这些频道的客户端也同样会收到信息
图解
下图展示了一个带有频道和模式的例子, 其中 com.ahead.* 频道匹配了 com.ahead.juc 频道和 com.ahead.thread 频道, 并且有不同的客户端分别订阅它们三个,如下图:
当有信息发送到com.ahead.thread 频道时, 信息除了发送给 client 4 和 client 5 之外, 还会发送给订阅 com.ahead.* 频道模式的 client x 和 client y
✦ 解析:反之也是,如果当有消息发送给 com.ahead.juc 频道,消息发送给订阅了 juc 频道的客户端之外,还会发送给订阅了 com.ahead.* 频道的客户端: client x 、client y
通配符中?表示1个占位符,表示任意个占位符(包括0),?表示1个以上占位符。
-
订阅者订阅频道 psubscribe pattern [pattern ...]
--------------------------客户端1(订阅者) :订阅频道 -------------------- # 1. ------------订阅 “a?” "com.*" 2种模式频道-------------- 127.0.0.1:6379> psubscribe a? com.* # 进入订阅状态后处于阻塞,可以按Ctrl+C键退出订阅状态 Reading messages... (press Ctrl-C to quit) ---------------订阅成功------------------- 1) "psubscribe" -- 返回值的类型:显示订阅成功 2) "a?" -- 订阅的模式 3) (integer) 1 -- 目前已订阅的模式的数量 1) "psubscribe" 2) "com.*" 3) (integer) 2 ---------------接收消息 (已订阅 “a?” "com.*" 两种模式!)----------------- # ---- 发布者第1条命令:publish ahead "hello" 结果:没有接收到消息,匹配失败,不满足 “a?” ,“?”表示一个占位符, a后面的head有4个占位符 # ---- 发布者第2条命令: publish aa "hello" (满足 “a?”) 1) "pmessage" -- 返回值的类型:信息 2) "a?" -- 信息匹配的模式:a? 3) "aa" -- 信息本身的目标频道:aa 4) "hello" -- 信息的内容:"hello" # ---- 发布者第3条命令:publish com.juc "hello2"(满足 “com.*”, *表示任意个占位符) 1) "pmessage" -- 返回值的类型:信息 2) "com.*" -- 匹配模式:com.* 3) "com.juc" -- 实际频道:com.juc 4) "hello2" -- 信息:"hello2" ---- 发布者第4条命令:publish com. "hello3"(满足 “com.*”, *表示任意个占位符) 1) "pmessage" -- 返回值的类型:信息 2) "com.*" -- 匹配模式:com.* 3) "com." -- 实际频道:com. 4) "hello3" -- 信息:"hello3"
-
发布者发布消息 publish channel message
------------------------客户端2(发布者):发布消息给频道 ------------------ 注意:订阅者已订阅 “a?” "com.*" 两种模式! # 1. ahead 不符合“a?”模式,?表示1个占位符 127.0.0.1:6379> publish ahead "hello" (integer) 0 -- 匹配失败,0:无订阅者 # 2. aa 符合“a?”模式,?表示1个占位符 127.0.0.1:6379> publish aa "hello" (integer) 1 # 3. 符合“com.*”模式,*表示任意个占位符 127.0.0.1:6379> publish com.juc "hello2" (integer) 1 # 4. 符合“com.*”模式,*表示任意个占位符 127.0.0.1:6379> publish com. "hello3" (integer) 1
底层是pubsubPattern节点的链表。
-
数据结构 redisServer.pubsub_patterns 属性是一个链表,链表中保存着所有和模式相关的信息:
struct redisServer {
// ...
list *pubsub_patterns;
// ...
};
链表中的每个节点都包含一个 redis.h/pubsubPattern 结构:
typedef struct pubsubPattern {
redisClient *client;
robj *pattern;
} pubsubPattern;
client 属性保存着订阅模式的客户端,而 pattern 属性则保存着被订阅的模式。
每当调用 PSUBSCRIBE 命令订阅一个模式时, 程序就创建一个包含客户端信息和被订阅模式的 pubsubPattern 结构, 并将该结构添加到 redisServer.pubsub_patterns 链表中。
作为例子,下图展示了一个包含两个模式的 pubsub_patterns 链表, 其中 client123 和 client256 都正在订阅 tweet.shop.* 模式:
-
订阅
如果这时客户端 client10086 执行 PSUBSCRIBE broadcast.list.*
, 那么 pubsub_patterns 链表将被更新成这样:
通过遍历整个 pubsub_patterns 链表,程序可以检查所有正在被订阅的模式,以及订阅这些模式的客户端。
-
发布
发送信息到模式的工作也是由 PUBLISH 命令进行的, 显然就是匹配模式获得Channels,然后再把消息发给客户端。
-
退订
使用 PUNSUBSCRIBE 命令可以退订指定的模式, 这个命令执行的是订阅模式的反操作:程序会删除 redisServer.pubsub_patterns 链表中, 所有和被退订模式相关联的 pubsubPattern 结构, 这样客户端就不会再收到和模式相匹配的频道发来的信息
4 Redis 发布订阅实际应用🍀
Redis Sentinel 节点发现
「Redis Sentinel」 是 Redis 一套高可用方案,可以在主节点故障的时候,自动将从节点提升为主节点,从而转移故障。
今天这里我们不详细解释 「Redis Sentinel」 详细原理,主要来看下 「Redis Sentinel」 如何使用发布订阅机制。
「Redis Sentinel」 节点主要使用发布订阅机制,实现新节点的发现,以及交换主节点的之间的状态。
如下所示,每一个 「Sentinel」 节点将会定时向 _sentinel_:hello
频道发送消息,并且每个 「Sentinel」 都会订阅这个节点。
这样一旦有节点往这个频道发送消息,其他节点就可以立刻收到消息。
这样一旦有的新节点加入,它往这个频道发送消息,其他节点收到之后,判断本地列表并没有这个节点,于是就可以当做新的节点加入本地节点列表。
除此之外,每次往这个频道发送消息内容可以包含节点的状态信息,这样可以作为后面 「Sentinel」 领导者选举的依据。
以上都是对于 Redis 服务端来讲,对于客户端来讲,我们也可以用到发布订阅机制。
当 「Redis Sentinel」 进行主节点故障转移,这个过程各个阶段会通过发布订阅对外提供。
对于我们客户端来讲,比较关心切换之后的主节点,这样我们及时切换主节点的连接(旧节点此时已故障,不能再接受操作指令),
客户端可以订阅 +switch-master
频道,一旦 「Redis Sentinel」 结束了对主节点的故障转移就会发布主节点的的消息。
redission 分布式锁
redission 开源框架提供一些便捷操作 Redis 的方法,其中比较出名的 redission 基于 Redis 的实现分布式锁。
今天我们来看下 Redis 的实现分布式锁中如何使用 Redis 发布订阅机制,提高加锁的性能。
首先我们来看下 redission 加锁的方法:
Redisson redisson = ....
RLock redissonLock = redisson.getLock("xxxx");
redissonLock.lock();
RLock
继承自 Java 标准的 Lock
接口,调用 lock
方法,如果当前锁已被其他客户端获取,那么当前加锁的线程将会被阻塞,直到其他客户端释放这把锁。
这里其实有个问题,当前阻塞的线程如何感知分布式锁已被释放呢?
这里其实有两种实现方法:
第一钟,定时查询分布时锁的状态,一旦查到锁已被释放(Redis 中不存在这个键值),那么就去加锁。
实现伪码如下:
while (true) {
boolean result=lock();
if (!result) {
Thread.sleep(N);
}
}
这种方式实现起来起来简单,不过缺点也比较多。
如果定时任务时间过短,将会导致查询次数过多,其实这些都是无效查询。
如果定时任务休眠时间过长,那又会导致加锁时间过长,导致加锁性能不好。
那么第二种实现方案,就是采用服务通知的机制,当分布式锁被释放之后,客户端可以收到锁释放的消息,然后第一时间再去加锁。
这个服务通知的机制我们可以使用 Redis 发布订阅模式。
当线程加锁失败之后,线程将会订阅 redisson_lock__channel_xxx
(xx 代表锁的名称) 频道,使用异步线程监听消息,然后利用 Java 中 Semaphore
使当前线程进入阻塞。
一旦其他客户端进行解锁,redission 就会往这个redisson_lock__channel_xxx
发送解锁消息。
等异步线程收到消息,将会调用 Semaphore
释放信号量,从而让当前被阻塞的线程唤醒去加锁。
❝ps:这里只是简单描述了 redission 加锁部分原理,出于篇幅,这里就不再消息解析源码。
感兴趣的小伙伴可以自己看下 redission 加锁的源码。
❞
通过发布订阅机制,被阻塞的线程可以及时被唤醒,减少无效的空转的查询,有效的提高的加锁的效率。
❝ps: 这种方式,性能确实提高,但是实现起来的复杂度也很高,这部分源码有点东西,快看晕了。
5 实时聊天应用示例🍀
场景描述
假设你正在开发一个实时聊天应用,用户可以在不同的聊天室中进行交流。每个聊天室都有多个用户,当一个用户发送消息时,其他所有在该聊天室中的用户都应该立即收到这条消息。
传统解决方案的问题
- 轮询(Polling):客户端定期向服务器发送请求,检查是否有新消息。这种方式效率低下,且响应时间较长。
- 长轮询(Long Polling):客户端发送请求后,服务器保持连接打开,直到有新消息或超时。虽然比普通轮询更高效,但仍存在延迟和资源浪费问题。
- WebSocket:使用WebSocket可以实现全双工通信,但设置和维护相对复杂,并且需要处理更多的网络层细节。
使用Redis Pub/Sub的优势
- 实时性:Redis的Pub/Sub机制可以实现实时的消息传递,确保消息几乎即时送达。
- 简单易用:Redis的Pub/Sub命令简单直观,易于集成到现有系统中。
- 高效率:Redis是内存数据库,消息传递速度快,适合高频次的消息传输。
- 灵活性:可以轻松地扩展到多个聊天室和大量用户。
示例实现
-
初始化Redis客户端
在服务器端和客户端分别初始化Redis客户端 -
创建聊天室频道
每个聊天室对应一个Redis频道。例如,chatroom:1
表示第一个聊天室 -
客户端订阅频道
客户端在进入某个聊天室时,订阅对应的频道 -
客户端发布消息
当用户发送消息时,客户端将消息发布到对应的频道 -
服务器端管理聊天室
服务器端可以负责管理和验证用户的订阅和发布操作,确保只有合法用户能够参与聊天
# 相关代码如下:
---------- 聊天室 服务器 代码 如下 ------------
#! /usr/bin/python
# -*- coding: UTF-8 -*-
import redis
# 初始化Redis客户端
r = redis.Redis(host='localhost', port=6379, db=0, password='kdlrdsha2019')
# 订阅聊天室列表
pubsub = r.pubsub()
pubsub.subscribe('chatroom:1')
for message in pubsub.listen():
if message['type'] == 'message':
print(f"Received message: {message['data'].decode('utf-8')}")
---------- 客户端使用 代码如下 ----------------
[root@test ~]# python3
Python 3.12.0 (main, Oct 27 2023, 14:11:11) [GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import redis
>>> r = redis.Redis(host='localhost', port=6379, db=0, password='kdlrdsha2019')
>>> r.publish('chatroom:1', 'Hello, everyone!')
1
---------- 服务端返回如下 ------------------
[root@test Redis]# python3 Redis_Pub_Sub.py
Received message: Hello, everyone!
代码示例
以下是Python示例代码,展示了如何使用Redis的Pub/Sub功能实现简单的实时聊天应用。
#! /usr/bin/python
# -*- coding: UTF-8 -*-
import redis
import threading
# 初始化Redis客户端, Redis服务器有密码的话需要带password参数
r = redis.Redis(host='localhost', port=6379, db=0)
def subscribe_to_chatroom(chatroom):
pubsub = r.pubsub()
pubsub.subscribe(chatroom)
print(f"Subscribed to {chatroom}")
for message in pubsub.listen():
if message['type'] == 'message':
print(f"Received message in {chatroom}: {message['data'].decode('utf-8')}")
def send_message(chatroom, message):
r.publish(chatroom, message)
print(f"Message sent to {chatroom}: {message}")
# 创建两个线程,模拟两个客户端订阅同一个聊天室
thread1 = threading.Thread(target=subscribe_to_chatroom, args=('chatroom:1',))
thread2 = threading.Thread(target=subscribe_to_chatroom, args=('chatroom:1',))
thread1.start()
thread2.start()
# 模拟发送消息
send_message('chatroom:1', 'Hello, everyone!')
send_message('chatroom:1', 'How are you?')
总结
在这个示例中,Redis的Pub/Sub功能使得实时聊天应用变得非常简单和高效。客户端可以轻松地订阅和接收消息,而无需复杂的网络配置。这种模式不仅适用于聊天应用,还可以用于各种需要实时消息传递的场景,如实时通知、日志聚合、事件驱动架构等。
通过这个例子,读者可以清楚地看到Redis的Pub/Sub功能在实际项目中的强大作用和实用性。希望这能帮助你更好地讲解这个功能的重要性!