前言
博主在学习 Redis 实现发布订阅功能的时候,踩了太多的坑。
不是讲解不详细,看的一知半解;就是代码有问题,实际压根跑不起来!
于是博主萌生了自己写一个最新版且全程无错的博客供各位参考。希望各位不要把我才过的坑再踩一遍。(实战篇的所有代码均由本人测试,全程无Bug。)
废话不多说,让我们进入实战篇的学习!
在开始实战篇的之前,我们先一起回顾下原理篇的内容。
Redis 发布/订阅的优点
- 高性能:Redis 作为内存存储,具备极高的读写性能,能够快速处理发布和订阅消息。
- 简单易用:Redis 的发布/订阅接口简单,易于集成和使用。
- 实时性强:发布的消息会立即传递给所有订阅者,具备高实时性。
Redis 发布/订阅的缺点
- 消息丢失:由于 Redis 是内存存储,如果 Redis 实例宕机,未处理的消息可能会丢失。
- 无法持久化:Redis 的发布/订阅模式不支持消息持久化,无法存储和检索历史消息。
- 订阅者不可控:发布者无法控制订阅者的数量和状态,无法保证所有订阅者都能接收到消息。
- 无确认机制:发布者无法确认消息是否被订阅者接收和处理。
正如上述中Redis
的缺点,Redis
的发布订阅功能并不可靠,如果我们需要保证消息的可靠性、包括确认、重试等要求,我们还是要选择使用MQ
实现发布订阅。
Redis的发布/订阅应用场景:
- 对于消息处理可靠性要求不强
- 消息无需持久化
- 消费能力无需通过增加消费方进行增强
- 架构简单 中小型系统不希望应用过多中间件
Redis发布订阅命令
命令 | 描述 |
Redis Unsubscribe 命令 | 指退订给定的频道 |
Redis Subscribe 命令 | 订阅给定的一个或多个频道的信息 |
Redis Pubsub 命令 | 查看订阅与发布系统状态 |
Redis Punsubscribe 命令 | 退订所有给定模式的频道 |
Redis Publish 命令 | 将信息发送到指定的频道 |
Redis Psubscribe 命令 | 订阅一个或多个符合给定模式的频道 |
正式开始
一、添加依赖
首先,确保你已经安装并配置好 Redis 服务器,并创建了 Spring Boot 项目,在pom.xml中引入依赖。
<!-- 所需依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
二、配置文件中配置Redis
spring:
# 项目名称
application:
name: test-redis-boot
# Redis 配置
data:
redis:
host: 填写自己的主机IP
port: 8000
password: 有则填,没有去掉这个属性
database: 1
# 连接超时时间
timeout: 10s
lettuce:
pool:
# 连接池中的最小空闲连接
min-idle: 5
# 连接池中的最大空闲连接
max-idle: 8
# 连接池的最大数据库连接数
max-active: 20
# #连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1ms
三、创建Redis配置类
创建一个配置类,用于配置 Redis
连接工厂和消息监听器监听通道信息。
注:此配置类无需死记硬背。只需大致了解每个方法的作用即可。
/**
* @Description Redis 配置类,用于配置 Redis 连接工厂和消息监听器监听通道信息
* @Author gongming.Zhang
* @Date 2024/9/11 18:27
* @Version 1.0
*/
@Configuration
@Slf4j
public class RedisConfig {
/**
* 自定义 RedisTemplate 序列化方式
*
* @param redisConnectionFactory Redis 连接的线程安全工厂
* @return 模板类
*/
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
// 绑定 RedisConnectionFactory
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 创建 Jackson2JsonRedisSerializer 序列方式,对象类型使用 Object 类型。
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(new LaissezFaireSubTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(objectMapper, Object.class);
// 设置 RedisTemplate 序列化规则,因为 key 通常是普通的字符串,所以使用StringRedisSerializer即可,而 value 是对象时,才需要使用序列化与反序列化。
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
// hash key 序列化规则
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
// 属性设置后操作
redisTemplate.afterPropertiesSet();
log.info("RedisTemplate 自定义序列化配置完毕...");
return redisTemplate;
}
/**
* 配置主题订阅
*
* 可以添加多个监听器,监听多个通道,只需要将消息监听器与订阅的通道/主题绑定即可。
* addMessageListener(MessageListener listener, Collection<? extends Topic> topics):将消息监听器与多个订阅的通道/主题绑定
* addMessageListener(MessageListener listener, Topic topic):将消息监听器与订阅的通道/主题绑定
*
* @param connectionFactory Redis 连接的线程安全工厂
* @return 容器对象
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 设置连接工厂,RedisConnectionFactory 可以直接从容器中取,也可以从 RedisTemplate 中取
container.setConnectionFactory(connectionFactory);
// 订阅名称叫 cache 的通道, 类似 Redis 中的 subscribe 命令
container.addMessageListener(listenerAdapter, new ChannelTopic("cache"));
// 订阅名称以 'test-' 开头的全部通道, 类似 Redis 的 pSubscribe 命令
container.addMessageListener(listenerAdapter, new PatternTopic("test-*"));
log.info("消息监听器和通道绑定完毕...");
return container;
}
/**
* 配置消息监听适配器
*
* @param redisReceiveListener
* @return
*/
@Bean
public MessageListenerAdapter listenerAdapter(RedisReceiveListener redisReceiveListener) {
return new MessageListenerAdapter(redisReceiveListener);
}
}
四、创建消息发送服务端
创建一个消息发布类,用于向客户端发布消息。
/**
* @Description 消息发布服务端
* @Author gongming.Zhang
* @Date 2024/9/12 9:42
* @Version 1.0
*/
@Component
@Slf4j
public class MessagePublisher {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
/**
* 服务端发布消息
*
* @param channel 通道名
* @param message 待发送的消息
*/
public void sendMessage(String channel, String message) {
redisTemplate.convertAndSend(channel, message);
log.info("消息发送成功... channel={}, message={}", channel, message);
}
}
五、创建监听器(客户端)
用于监听服务端发送的消息,每次服务端发送新消息时,都会自动调用onMessage()
方法。
/**
* @Description Redis 消息监听器
* @Author gongming.Zhang
* @Date 2024/9/11 18:53
* @Version 1.0
*/
/*
当收到订阅的消息时,会将消息交给这个类处理。
* 可以直接实现 MessageListener 接口,也可以继承它的实现类 MessageListenerAdapter.
* 自动多线程处理,打印日志即可看出,即使手动延迟,也不会影响后面消息的接收。
*/
@Component
@Slf4j
public class RedisReceiveListener implements MessageListener {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
/**
* 处理回调逻辑。每次新消息到达时,都会调用此方法。通过 onMessage 方法执行业务代码
* <p>
* 该接口不仅可以访问实际消息,还可以访问接收消息的频道(Channel),以及订阅时用于匹配频道(Channel)的模式。
* 此信息使被调用者不仅可以通过内容区分各种消息,还可以检查其他详细信息。
*
* @param message 消息对象,不能为 null
* @param pattern 与通道匹配的模式(如果指定),可以为 null
*/
@Override
public void onMessage(Message message, byte[] pattern) {
// 1.获取消息所属的通道 --> 首先获取 字符串序列化器,再从给定的二进制数据中反序列化对象。
String channel = redisTemplate.getStringSerializer().deserialize(message.getChannel());
// 2.获取客户端发送的消息内容 --> 后期可以根据自己项目中 消息 的类型,来确定用什么序列化器
Object msg = redisTemplate.getValueSerializer().deserialize(message.getBody());
log.info("收到Redis订阅消息: channel={} msg={}", channel, msg);
}
}
六、编写Controller测试
/**
* @Description 测试订阅发布功能
* @Author gongming.Zhang
* @Date 2024/9/12 10:13
* @Version 1.0
*/
@RestController
@RequestMapping(value = "/api/v1/publish")
public class PublisherController {
@Autowired
private MessagePublisher publisher;
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "message") String message, @RequestParam(value = "channel") String channel) {
publisher.sendMessage(channel, message);
return "Message published: " + message;
}
}
七、测试响应数据
至此,我们 SpringBoot 整合 Redis 实现发布订阅功能已经完成!
总结
通过本文,我们详细介绍了如何在 SpringBoot 中整合 Redis 实现发布/订阅功能,并提供了详细的代码示例。Redis 发布/订阅模式以其高性能和简单易用的特点,在实时消息传递场景中有着广泛的应用,但同时也需要注意其消息丢失和无法持久化等缺点,需要根据实际业务需求选择。