序言
Redis的发布订阅(Pub/Sub
)是一种消息通信模式,允许发布者(Publisher
)发送消息到频道(Channel
),而订阅者(Subscriber
)可以订阅一个或多个频道来接收消息。
Redis 的发布订阅功能命令:PUBLISH
、SUBSCRIBE
、PSUBSIRIBE
等。
基础使用
频道订阅
执行SUBSCRIBE
命令,客户端可以订阅一个或多个频道,从而成为这些频道的订阅者,当其他客户端向被订阅的频道发送消息时,频道的所有订阅者都会受到该消息。
假设客户端 A 和客户端 B 都执行了SUBSCRIBE news.it
,那么他们就是频道news.it
的订阅者。
消息发布
假设客户端 C执行 PUBLISH news.it hello
,向频道news.it
发送消息,那么客户端 A 和客户端 B 都会收到消息。
频道退订
当客户端 B 执行UNSUBSCRIBE news.it
命令,则它与频道之间的订阅关系就被解除了,当该频道再有消息时,客户端 B 就收不到消息了。
实现原理
SUBSCRIBE
当客户端执行 SUBSCRIBE
命令订阅某个频道的时候,则客户端就与被订阅的频道之间建立了一种订阅关系。
Redis 是将所有频道的订阅关系都保存在服务器状态的pubsub_channels
字典里。
- 键:被订阅频道的名称;
- 值:一个链表,记录了所有订阅这个频道的客户端。
比如上图字典就记录了 - client-1、client-2、client-3 订阅了 news.it 频道;
- client-4 订阅了 news.sport 频道;
- client-5、client-6 订阅了 news.business 频道。
那么在内部,这些订阅关系是怎么维护的呢?
根据频道是否有其他订阅者,分为两种情况:
- 如果频道有其他订阅者,那么它在
pubsub_channel
字典中必然有相对应的订阅者链表,那么就将新客户端添加到链表的尾部; - 如果频道没有订阅者,那么它在
pubsub_channel
字典中就不存在,首先要在pubsub_channel
字典中为该频道创建一个键,并将这个键的值设置为空链表,然后将客户端添加到链表的头节点。
伪代码:
public class SubscribeDemo {
private static Map<String, LinkedList> map = new HashMap<>();
public static void main(String[] args) {
subscribe(Arrays.asList("news.it", "news.support"), "c1");
subscribe(Arrays.asList("news.it"), "c2");
map.forEach((k,v) -> {
System.out.println("频道:" + k +" 订阅者:" +v);
});
}
private static void subscribe(List<String> channels, String client) {
for (String channel : channels) {
if (map.containsKey(channel)) {
LinkedList linkedList = map.get(channel);
linkedList.addLast(client);
} else {
LinkedList<Object> node = new LinkedList<>();
node.addFirst(client);
map.put(channel, node);
}
}
}
}
UNSUBSCRIBE
当客户端退订频道的时候,服务器将从pubsub_channels
中解除客户端与被退订频道之间的关联。
- 根据被退订频道的名字,从字典中找到对应的订阅者链表,从链表中删除退订客户端的信息;
- 删除退订客户端后,如果频道的订阅者链表为空,则说明该频道已经没有任何订阅者了,则从字典中删除该频道信息。
比如此时客户端 4 执行 UNSUBSCRIBE news.sport
那么执行之后的字典信息是
伪代码:
private static void unsubscribe(List<String> channels, String client) {
for (String channel : channels) {
System.out.println(client + " 退定频道:" + channel);
if (map.containsKey(channel)) {
LinkedList subscribers = map.get(channel); // 该频道的订阅者列表
int index = subscribers.indexOf(client); // 查找退订的客户端
subscribers.remove(index); // 从链表中删除
if (subscribers.isEmpty()) {
//如果该频道的订阅者为空,则从字典中删除
map.remove(channel);
}
}
}
}
PUBLISH
当客户端执行PUBLISH <channel> <message>
命令将消息发送给频道channel
的时候,则服务器需要执行以下操作:
- 将消息
message
发送给channel
频道的所有订阅者;
private static void publish(String channel, String message) {
if (!map.containsKey(channel)) {
return;
}
LinkedList nodes = map.get(channel);
for (Object client : nodes) {
System.out.println("给客户端:" + client + " 发送消息:" + message);
}
}
完整伪代码
public class SubscribeDemo {
private static Map<String, LinkedList> map = new HashMap<>();
public static void main(String[] args) {
subscribe(Arrays.asList("news.it", "news.support"), "c1");
subscribe(Arrays.asList("news.it"), "c2");
map.forEach((k, v) -> {
System.out.println("频道:" + k + " 订阅者:" + v);
});
System.out.println("-------------------");
unsubscribe(Arrays.asList("news.support"), "c1");
map.forEach((k, v) -> {
System.out.println("频道:" + k + " 订阅者:" + v);
});
System.out.println("-------------------");
publish("news.it", "测试消息");
}
private static void subscribe(List<String> channels, String client) {
for (String channel : channels) {
if (map.containsKey(channel)) {
//如果订阅的频道在字典中,则将新的客户端添加到链表的尾部
LinkedList linkedList = map.get(channel);
linkedList.addLast(client);
} else {
//如果订阅的频道在字典中不存在,则将新的客户端添加的链表的头部
LinkedList<Object> node = new LinkedList<>();
node.addFirst(client);
map.put(channel, node);
}
}
}
private static void unsubscribe(List<String> channels, String client) {
for (String channel : channels) {
System.out.println(client + " 退定频道:" + channel);
if (map.containsKey(channel)) {
LinkedList subscribers = map.get(channel); // 该频道的订阅者列表
int index = subscribers.indexOf(client); // 查找退订的客户端
subscribers.remove(index); // 从链表中删除
if (subscribers.isEmpty()) {
//如果该频道的订阅者为空,则从字典中删除
map.remove(channel);
}
}
}
}
private static void publish(String channel, String message) {
//如果频道不在字典中,返回
if (!map.containsKey(channel)) {
return;
}
LinkedList nodes = map.get(channel);
//遍历频道的订阅者列表,并发送消息
for (Object client : nodes) {
System.out.println("给客户端:" + client + " 发送消息:" + message);
}
}
}
使用场景
- 实时消息系统:如聊天应用、新闻更新推送;
- 事件通知:如用户行为触发的通知,订单状态变更通知;
- 分布式系统中的数据同步:如数据库的主从复制状态同步。
优缺点
优点
- 简单易用:通过简单的命令即可实现发布和订阅功能;
- 低延迟:消息传递速度快,适用于需要快速响应的场景;
- 可扩展性:可以轻松地添加更多的订阅者。
缺点
- 消息无持久化:Redis不会存储发布的消息,如果订阅者不在线,将错过消息;
- 资源消耗:每个订阅者都需要维护一个与Redis服务器的连接,可能会导致资源消耗;
- 缺乏高级功能:与专业的分布式消息队列系统相比,缺乏消息确认、持久化、事务、死信队列等高级功能。
总结
Redis的发布订阅模式是一种轻量级的消息传递机制,适用于需要快速、简单消息传递的场景。然而,对于需要高可靠性、高吞吐量和复杂消息处理能力的场景,可能需要考虑使用专业的分布式消息队列系统。