1 Redis 发布订阅
1.1 概述
发布订阅模式(Publish-Subscribe Pattern)是一种消息传递模式,其基本原理是消息的发送者(发布者)不会直接发送消息给特定的接收者(订阅者),而是将消息分成不同的类别(频道),然后将消息发送给订阅了这些类别的所有接收者。发布订阅模式在分布式系统中广泛应用,例如实时消息推送、日志收集等。
在 Redis 中,发布订阅模式有两个主要的角色:发布者和订阅者。发布者通过 PUBLISH 命令向指定的频道发送消息,而订阅者则通过 SUBSCRIBE 命令订阅/取消订阅指定的频道,并通过监听器(Callback)接收到发布者发送的消息。
下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:
当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:
1.2 基本原理
Redis 发布订阅是一种消息通信模式,通过这种模式可以让多个客户端之间进行消息的发布和订阅。Redis 提供了以下几个命令来实现发布订阅的功能:
- PUBLISH channel message:将消息 message 发送到指定的频道 channel 中,返回值为接收到消息的订阅者数量。
- SUBSCRIBE channel [channel …]:订阅一个或多个频道 channel,每当有新消息发布到订阅的频道时,就会收到相应的消息。
- UNSUBSCRIBE [channel [channel …]]:取消订阅一个或多个频道 channel,如果不指定 channel,则取消订阅所有频道。
- PSUBSCRIBE pattern [pattern …]:订阅一个或多个符合指定模式 pattern 的频道,每当有新消息发布到符合模式的频道时,就会收到相应的消息。
- PUNSUBSCRIBE [pattern [pattern …]]:取消订阅一个或多个符合指定模式 pattern 的频道,如果不指定 pattern,则取消订阅所有模式。
- PUBSUB subcommand [argument [argument …]]:查看订阅与发布系统状态,可以用来获取订阅与发布系统的各种信息,比如订阅者数量、频道列表等等。 其中,PUBLISH命令用于向指定的频道发布消息,SUBSCRIBE 命令用于订阅一个或多个频道,PSUBSCRIBE命令用于订阅一个或多个符合指定模式的频道,PUBSUB 命令用于查看订阅与发布系统状态。
1.2.1 基于频道(Channel)的发布/订阅
Redis 中的频道(Channel)相当于消息的分类,一个频道可以有多个订阅者,而一个订阅者也可以订阅多个频道。在 Redis 中,通过 PUBLISH 命令向指定的频道发送消息,而通过 SUBSCRIBE 命令来订阅/取消订阅指定的频道,并通过监听器接收到发布者发送的消息。
以下是 Redis 命令行界面中基于频道的发布/订阅示例:
# 订阅频道
127.0.0.1:6379> SUBSCRIBE news
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news"
3) (integer) 1
# 发布消息
127.0.0.1:6379> PUBLISH news "Hello, world!"
(integer) 1
# 订阅者接收到消息
1) "message"
2) "news"
3) "Hello, world!"
在 Redis 中,发布/订阅模式的实现基于 Redis 的事件机制,即订阅者通过执行 SUBSCRIBE 命令将自己的监听器添加到 Redis 服务器的事件循环器中,当发布者通过 PUBLISH 命令向指定频道发送消息时,Redis 服务器会将消息发送给监听该频道的所有订阅者。
具体来说,Redis 服务器会维护一个事件循环器,并在其中注册所有客户端的监听器。当客户端通过 SUBSCRIBE 命令订阅某个频道时,Redis 服务器会将该客户端的监听器添加到与该频道相关的事件处理器中,并在事件循环器中注册该事件处理器。当发布者通过 PUBLISH 命令向指定频道发送消息时,Redis 服务器会将消息发送给与该频道相关的事件处理器中的所有监听器,从而实现消息的发布和订阅。
1.2.2 基于模式(Pattern)的发布/订阅
Redis 还支持基于模式(Pattern)的发布/订阅,模式是一种特殊的频道,它可以匹配一个或多个频道。在 Redis 中,通过 PSUBSCRIBE 命令订阅/取消订阅匹配指定模式的频道,并通过监听器接收到发布者发送的消息。
以下是 Redis 命令行界面中基于模式的发布/订阅示例:
# 订阅模式
127.0.0.1:6379> PSUBSCRIBE news.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "news.*"
3) (integer) 1
# 发布消息
127.0.0.1:6379> PUBLISH news.world "Hello, world!"
(integer) 1
# 订阅者接收到消息
1) "pmessage"
2) "news.*"
3) "news.world"
4) "Hello, world!"
基于模式的发布/订阅与基于频道的发布/订阅实现原理类似,只是在订阅时可以使用通配符(*)匹配多个频道,从而实现更加灵活的消息过滤和订阅。
具体来说,当客户端通过 PSUBSCRIBE 命令订阅某个模式时,Redis 服务器会将该客户端的监听器添加到所有与该模式匹配的频道相关的事件处理器中,并在事件循环器中注册该事件处理器。当发布者通过 PUBLISH 命令向与匹配该模式的频道发送消息时,Redis 服务器会将消息发送给与该模式相关的事件处理器中的所有监听器,从而实现基于模式的消息发布和订阅。
1.3 Redis 发布订阅命令
下表列出了 redis 发布订阅常用命令:
序号 | 命令及描述 |
1 | PSUBSCRIBE pattern [pattern ...] 订阅一个或多个符合给定模式的频道。 |
2 | PUBSUB subcommand [argument [argument ...]] 查看订阅与发布系统状态。 |
3 | PUBLISH channel message 将信息发送到指定的频道。 |
4 | PUNSUBSCRIBE [pattern [pattern ...]] 退订所有给定模式的频道。 |
5 | SUBSCRIBE channel [channel ...] 订阅给定的一个或多个频道的信息。 |
6 | UNSUBSCRIBE [channel [channel ...]] 指退订给定的频道。 |
2 Redis 发布订阅实际应用
2.1 Redis Sentinel
Redis Sentinel 是 Redis 的一套高可用方案,在主节点故障时可以自动将从节点提升为主节点,从而实现故障转移。Redis Sentinel使用发布订阅机制来实现新节点的发现以及交换主节点之间的状态,并且客户端也可以通过订阅特定频道来获取主节点故障转移的状态信息。
在 Redis Sentinel 中,每个 Sentinel 节点都会定期向 sentinel:hello 频道发送消息,并且每个 Sentinel 节点也都会订阅这个频道,这样一旦有节点往这个频道发送消息,其他节点就可以立刻收到消息,并且将该节点加入本地节点列表。此外,每次往这个频道发送消息时,可以包含节点的状态信息,作为后续 Sentinel 领导者选举的依据。
对于客户端来说,可以通过订阅 +switch-master 频道来获取主节点故障转移的状态信息。一旦 Redis Sentinel 完成了主节点故障转移,就会发布主节点的消息,客户端可以接收到该消息并及时切换到新的主节点上,从而保证系统的可靠性和可用性。
Redis Sentinel 节点主要使用发布订阅机制,实现新节点的发现,以及交换主节点的之间的状态。 如上图所示,每一个 Sentinel 节点将会定时向 sentinel:hello 频道发送消息,并且每个 Sentinel 都会订阅这个节点。 这样一旦有节点往这个频道发送消息,其他节点就可以立刻收到消息。
这样一旦有的新节点加入,它往这个频道发送消息,其他节点收到之后,判断本地列表并没有这个节点,于是就可以当做新的节点加入本地节点列表。
除此之外,每次往这个频道发送消息内容可以包含节点的状态信息,这样可以作为后面 Sentinel 领导者选举的依据。
以上都是对于 Redis 服务端来讲,对于客户端来讲,我们也可以用到发布订阅机制。
当 Redis Sentinel 进行主节点故障转移,这个过程各个阶段会通过发布订阅对外提供。
对于我们客户端来讲,比较关心切换之后的主节点,这样我们及时切换主节点的连接(旧节点此时已故障,不能再接受操作指令),
客户端可以订阅 +switch-master频道,一旦 Redis Sentinel 结束了对主节点的故障转移就会发布主节点的的消息。
2.2 SpringBoot Redis发布/订阅
在 Spring Boot 中,可以通过 Spring Data Redis 提供的 RedisMessageListenerContainer 和 RedisTemplate 类来实现 Redis 的发布/订阅功能。具体步骤如下:
1. 添加 Redis 和 Spring Data Redis 的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2. 配置 Redis 连接信息:
spring.redis.host=localhost
spring.redis.port=6379
3. 创建 RedisMessageListenerContainer 和 RedisTemplate 实例:
@Configuration
public class RedisConfig {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Bean
public RedisMessageListenerContainer messageListenerContainer() {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
return container;
}
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
template.setDefaultSerializer(new StringRedisSerializer());
return template;
}
}
4. 创建消息监听器:
@Component
public class MessageListener implements MessageListenerAdapter {
@Override
public void onMessage(Message message, byte[] pattern) {
String msg = (String) redisTemplate().getValueSerializer().deserialize(message.getBody());
System.out.println("Received message: " + msg);
}
}
5. 订阅消息:
@Autowired
private RedisMessageListenerContainer container;
@Autowired
private MessageListener listener;
@PostConstruct
public void subscribe() {
container.addMessageListener(listener, new PatternTopic("news.*"));
}
6. 发布消息:
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void publish() {
redisTemplate.convertAndSend("news.world", "Hello, world!");
}
在使用 Redis 发布/订阅模式时,需要考虑订阅者的并发处理能力、消息序列化和反序列化等问题,以保证系统的可靠性和性能。
参考链接
Redis 发布订阅 | 菜鸟教程
Redis发布订阅模式(publish/subscribe)
Redis从入门到精通【进阶篇】之消息传递发布订阅模式详解_redis发布订阅模式_冰点.的博客-CSDN博客
Redis进阶——发布订阅详解
redis的发布订阅模式