文章目录
- 1. 前言
- 2. 消息队列(发布和订阅)
- 应用场景:消息通知、Websocket集群
- 3. WebSocket集群示例
1. 前言
接着上一篇Redis那些事儿(一) ,这一篇主要介绍Redis的发布和订阅功能,可以实现高效的消息通信和事件驱动架构。Redis的发布和订阅功能是一种消息传递模式,通过该功能可以实现消息的广播和订阅。Redis的发布和订阅功能是异步的,发布者和订阅者之间没有直接的通信。当发布者将消息发送到频道后,Redis会将消息传递给所有订阅了该频道的订阅者。订阅者会在接收到消息后立即进行处理,而不需要发布者等待。
2. 消息队列(发布和订阅)
应用场景:消息通知、Websocket集群
这种发布和订阅模式可以用于实现实时消息推送、事件通知、分布式系统之间的信息交互等场景。Redis的发布和订阅功能,可以实现高效的消息通信和事件驱动架构,作为轻量级的消息中间件,从一定程度上可以替代RabbitMQ、ActiveMQ等主流消息中间件的部分功能,非常好用,示例代码如下:
基础发布订阅实体类
用于存放消息、用户ID等信息,广播实体不需要用户ID
@Data
public class SendMsgAll {
//发布业务数据(json字符串)
private String msg;
}
@Data
public class SendMsg {
//发布业务数据(json字符串)
private String msg;
//用户ID
private String userId;
}
发布者
消息发布订阅的发起方,依赖注入StringRedisTemplate工具类,提供两个方法分别用于广播发送和一对一发送,使用convertAndSend方法进行消息发送处理。此处定义了两个channel,分别是all-topic和single-topic,后期在订阅者中会通过这个channel进行绑定。
@Component
public class RedisPublisher {
@Autowired
private StringRedisTemplate redisTemplate;
public void sendMessageAll(String message) {
SendMsgAll sendMsgAll = new SendMsgAll();
sendMsgAll.setMsg(message);
redisTemplate.convertAndSend("all-topic", JSON.toJSONString(sendMsgAll));
}
public void sendMessageById(String userId, String message) {
SendMsg sendMsg = new SendMsg();
sendMsg.setUserId(userId);
sendMsg.setMsg(message);
redisTemplate.convertAndSend("single-topic", JSON.toJSONString(sendMsg));
}
}
订阅者
消息发布订阅的接收方,定义两个方法,分别对应发布者的广播发送方法和一对一发送方法
@Component
public class RedisSubscriber {
/**
* 处理一对一消息
*/
public void sendMsg(String message) {
SendMsg msg = JSON.parseObject(message, SendMsg.class);
//TODO 一对一业务处理
}
/**
* 处理广播消息
*/
public void sendAllMsg(String message){
SendMsgAll msg = JSONObject.parseObject(message, SendMsgAll.class);
//TODO 广播业务处理
}
}
订阅者(发布、订阅关系绑定配置)
与订阅者类RedisSubscriber必须在一个微服务下,与发布者类RedisPublisher不一定在同一个微服务下。添加redis消息队列监听,监听channel消息主题的消息,使用messageListenerAdapter()中设置的类和方法处理消息,添加订阅消息处理类,通过反射获取处理类中的处理方法。
@Configuration
public class RedisMessageListenerConfig {
@Autowired
private RedisSubscriber redisSubscriber;
/**
* 监听redis中的订阅信息
*/
@Bean
public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
//监听single-topic消息主题的消息
redisMessageListenerContainer.addMessageListener(messageListenerAdapter(), new PatternTopic("single-topic"));
//监听all-topic消息主题的消息
redisMessageListenerContainer.addMessageListener(messageAllListenerAdapter(), new PatternTopic("all-topic"));
return redisMessageListenerContainer;
}
/**
* 订阅消息处理类,通过反射获取处理类中的sendMsg方法
*/
@Bean
public MessageListenerAdapter messageListenerAdapter() {
return new MessageListenerAdapter(redisSubscriber, "sendMsg");
}
/**
* 订阅消息处理类,通过反射获取处理类中的sendAllMsg方法
*/
@Bean
public MessageListenerAdapter messageAllListenerAdapter(){
return new MessageListenerAdapter(redisSubscriber, "sendAllMsg");
}
}
3. WebSocket集群示例
由于WebSocket是长连接,session保存在一个Server中,所以在不同Server在使用WebSocket推送消息时就需要获取对应的session进行推送,在分布式系统中就无法获取到所有session。这是Websocket集群中遇到的主要问题,这里就需要使用一个中间件将消息推送到各个系统中,使用Redis的sub/pub功能就可以完美解决。此时,消息发布者位于待推送生产消息微服务中,消息订阅者位于集群下的各个WebSocket服务中。