SpringBoot下Redis消息队列(基于发布订阅模型)
1. 什么是生产者/消费者模式?
消息队列一般是有两种场景
1、种是发布者订阅者模式
2、种是生产者消费者模式
生产者消费者模式 :生产者生产消息放到队列里,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息;即对于每个消息只能被最多一个消费者拥有。
发布者订阅者模式:发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息;即正常情况下每个消费者收到的消息应该都是一样的。
2. 用Redis 实现发布订阅模型
此处演示用Redis实现的发布订阅模型
也可以用此模型广播消息时, 在消息接收方加入Redis分布式锁 实现单一消费者消费
2.1. 新建通道枚举类RedisMqChannelEnum
import io.swagger.annotations.ApiModel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
/**
* @Author:
* @Date:2023/7/3 16:17
* @Des: MessageChannelEnum 消息队列通道枚举
*/
@Getter
@AllArgsConstructor
@NoArgsConstructor
@ApiModel("消息队列通道枚举类")
public enum RedisMqChannelEnum {
WEBSOCKET_MSG_NOTIFY("channelId-websocket-notify", "websocket消息推送广播通道");
private String channelId;
private String channelString;
}
2.2. 消息队列通道注册类 RedisMqMessageListenerConfig
import com.sinotrans.gtp.enums.RedisMqChannelEnum;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
/**
* @Author:
* @Date:2023/7/3 16:11
* @Des: RedisMessageListenerConfig 消息队列通道注册
*/
@Configuration
public class RedisMqMessageListenerConfig {
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,
RedisMqMessageReceiver redisMqMessageReceiver
) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(redisMqMessageReceiver, new PatternTopic(RedisMqChannelEnum.WEBSOCKET_MSG_NOTIFY.getChannelId())); // 订阅指定频道的消息
// ...可监听多个通道
return container;
}
}
2.3. 消息发布类 RedisMqMessagePublisher
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
/**
* @Author:
* @Date:2023/7/3 16:10
* @Des: MessagePublisher
*/
@Service
@Slf4j
public class RedisMqMessagePublisher {
private final RedisTemplate<String, String> redisTemplate;
public RedisMqMessagePublisher(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
/**
* 往消息队列发布消息
*
* @param channel 通道名称
* @param message 消息
*/
public void publishMessage(String channel, String message) {
log.info("消息队列产生消息: channel:{}, message:{}", channel, message);
redisTemplate.convertAndSend(channel, message);
}
}
2.4. 消息订阅类RedisMqMessageReceiver
import com.sinotrans.gtp.entity.TransmissionReport;
import com.sinotrans.gtp.enums.RedisMqChannelEnum;
import com.sinotrans.gtp.service.CallService;
import com.sinotrans.gtp.utils.GsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
/**
* @Author:
* @Date:2023/7/3 16:07
* @Des: RedisMessageReceiver 消息接收
*/
@Component
@Slf4j
public class RedisMqMessageReceiver implements MessageListener {
@Autowired
private CallService callService;
@Override
public void onMessage(Message message, byte[] pattern) {
// 处理接收到的消息
String channel = new String(message.getChannel());
String body = new String(message.getBody());
log.info("消息队列消费消息: channel:{}, message:{}", channel, body);
// 根据通道名称执行相应的逻辑
if (RedisMqChannelEnum.WEBSOCKET_MSG_NOTIFY.getChannelId().equals(channel)) {
log.info("转发至通道: {}", RedisMqChannelEnum.WEBSOCKET_MSG_NOTIFY.getChannelString());
// 处理 websocket消息推送广播通道 的消息
// callService.sendFrontClientMsg(GsonUtils.fromJson(body, TransmissionReport.class));
}
// ...
}
}
2.5. 使用
@Autowired
private RedisMqMessagePublisher redisMqMessagePublisher;
// 调用消息队列模型进行发布消息
redisMqMessagePublisher.publishMessage(RedisMqChannelEnum.WEBSOCKET_MSG_NOTIFY.getChannelId(), transmissionReportJson);