文章目录
- 参考
- 消息队列
- list
- 源码
- pub/sub
- 源码
参考
https://www.cnblogs.com/uniqueDong/p/15904837.html
https://www.cnblogs.com/wzh2010/p/17205390.html
https://blog.csdn.net/qq_16557637/article/details/121015736
https://developer.aliyun.com/article/1095035
https://blog.csdn.net/sco5282/article/details/132904956
消息队列
消息队列可以实现消息解耦、消息路由、异步处理、流量削峰填谷。主流消息队列有kafka, rabbitmq, rocketmq
。
Redis也可以实现消息队列。方式有
- list
- pub/sub
- stream
list
redis的list底层是链表,满足先进先出。
list实现队列比较方便。同时可以满足有序,消息去重。缺点是
- 没有订阅功能,消费者要主动查询队列。而为了避免频繁查询队列消耗CPU资源,可以采用阻塞式查询。redis中阻塞查询命令是
brpop
。 - 无法保证可靠性。缺少消息确认机制,无法及时感知遗漏消息,导致数据不一致。
源码
完整项目在https://gitcode.com/zsss1/redis_mq/overview
在pom.xml
添加redisson
依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.40.2</version>
</dependency>
Redission
封装依赖。
@SpringBootTest(classes = DemoApplication.class)
public class RedisListTest {
@Autowired
private RedissonClient client;
private static final String REDIS_QUEUE = "list_queue";
private static final Logger LOGGER = LoggerFactory.getLogger(RedisListTest.class);
@Test
public void test_redis_list_mq() throws Exception {
RedissonBlockingDeque r;
new Thread(() -> {
for (int i = 0; i < 10; i++) {
producer("message" + i);
}
}).start();
new Thread(() -> {
consumer();
}).start();
Thread.currentThread().join();
}
// 消费者,阻塞
public void consumer() {
RBlockingDeque<String> deque = client.getBlockingDeque(REDIS_QUEUE);
boolean isCheck = true;
while (isCheck) {
try {
String message = deque.takeLast();
System.out.println("consumer: " + message);
} catch (InterruptedException e) {
LOGGER.error("consumer failed, cause: {}", e.getMessage());
}
}
}
// 生产者
public void producer(String message) {
RBlockingDeque<String> deque = client.getBlockingDeque(REDIS_QUEUE);
System.out.println(deque.getClass());
try {
deque.putFirst(message);
} catch (InterruptedException e) {
LOGGER.error("producer failed, msg: {}, cause: {}", message, e.getMessage());
}
}
}
pub/sub
发布订阅模式是一种消息传递模式。发送者将消息发送到频道,订阅者订阅频道即可及时收到消息。
它支持组生产者与消费者。但是它会丢失消息。
Redis在server端为每个消费者保留一块内存区域,存储该消费者订阅的数据。如果消费者处理速度慢,内存区域满了,那么Redis会断开消费者连接,这会导致消息丢失。
源码
- 定义频道。
public class TopicChannel {
public static final String SEND_PHONE = "send_phone";
public static final String SEND_EMAIL = "send_email";
}
- 定义监听频道的订阅者。分清
org.springframework.data.redis.connection.MessageListener
与org.redisson.api.listener.MessageListener
。
public class MyMessageListener implements MessageListener {
private static Map<String, Consumer<String>> RULE = new HashMap<>();
static {
RULE.put(TopicChannel.SEND_EMAIL, MyMessageListener::sendEmail);
RULE.put(TopicChannel.SEND_PHONE, MyMessageListener::sendPhone);
}
public static void sendEmail(String msg) {
System.out.println("listen email:" + msg);
}
public static void sendPhone(String msg) {
System.out.println("listen phone:" + msg);
}
@Override
public void onMessage(Message message, byte[] pattern) {
byte[] byteChannel = message.getChannel();
byte[] byteBody = message.getBody();
try {
String channel = new String(byteChannel);
String body = new String(byteBody);
System.out.println("channel: + " + channel + ", body: " + body);
RULE.get(channel).accept(body);
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
- 在redis注册订阅者。
@Component
public class RedisConfig {
@Bean
public MessageListenerAdapter messageListenerAdapter() {
return new MessageListenerAdapter(new MyMessageListener());
}
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// messageListenerAdapter 订阅 SEND_EMAIL 频道
container.addMessageListener(messageListenerAdapter, new PatternTopic(TopicChannel.SEND_EMAIL));
// messageListenerAdapter 订阅 SEND_PHONE 频道
container.addMessageListener(messageListenerAdapter, new PatternTopic(TopicChannel.SEND_PHONE));
return container;
}
}
- 测试
@SpringBootTest(classes = DemoApplication.class)
public class MyListener {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Test
public void test_pub() {
redisTemplate.convertAndSend(TopicChannel.SEND_EMAIL, "pub email message");
redisTemplate.convertAndSend(TopicChannel.SEND_PHONE, "pub phone message");
}
}
测试结果
channel: + send_email, body: pub email message
listen email:pub email message
channel: + send_phone, body: pub phone message
listen phone:pub phone message