RabbitMQ
【黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战】
文章目录
- RabbitMQ
- 第二天 高级
- 7 RabbitMQ 高级特性
- 7.6 延迟队列
- 7.6.1 延迟队列概述
- 7.6.2 代码实现
- 7.6.3 小结
第二天 高级
7 RabbitMQ 高级特性
7.6 延迟队列
7.6.1 延迟队列概述
【重点】
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
常见的需求:
- 下单后,30分钟未支付,取消订单,回滚库存。
- 新用户注册成功7天后,发送短信问候。
- …
[实现方式]
- 定时器【这种实现方式 其实并不优雅,定多长时间 不好确定、有误差,体验不好】
- 延迟队列
【优雅】
【但是非常 可惜的是,在RabbitMQ中并未提供延迟队列功能。】
但是!!!!!!可以使用:TTL+死信队列 组合实现延迟队列的效果。
【没毛病】
7.6.2 代码实现
修改生产端 的核心配置文件
<!-- 延迟队列
1. 定义正常交换机【order_exchange】和队列【order_queue】
2. 定义死信交换机【order_exchange_dlx】和队列【order_queue_dlx】
3. 绑定。设置正常队列 过期时间为 30 分钟
-->
<!-- 1. 定义正常交换机【order_exchange】和队列【order_queue】 -->
<rabbit:queue id="order_queue" name="order_queue">
<!-- 3. 绑定。设置正常队列 过期时间为 30 分钟 -->
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="order_exchange_dlx"/>
<entry key="x-dead-letter-routing-key" value="dlx.order.cancel"/>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="order_exchange">
<rabbit:bindings>
<rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 2. 定义死信交换机【order_exchange_dlx】和队列【order_queue_dlx】 -->
<rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="order_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
OK
添加生产消息的 方法
@Test
public void testDelay(){
}
先运行一个另外的方法,让 rabbitMQ 把队列和交换机创建 出来
OK,查看交换机
查看绑定关系
没毛病
补全测试 方法
@Test
public void testDelay() throws InterruptedException {
//1. 发送订单消息。【将来 是在订单系统中,下单成功后,发送消息】
rabbitTemplate.convertAndSend("order_exchange","order.msg","您有一条新的订单 来咯~");
//2. 打印倒计时10 s
for (int i = 0; i < 10; i++) {
System.out.println(i + "...");
Thread.sleep(1000);
}
}
这就是生产者了 ,为了看到效果,再来一个 消费者
package com.dingjiaxiong.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
* ClassName: OrderListener
* date: 2022/11/16 21:16
*
* @author DingJiaxiong
*/
@Component
public class OrderListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1. 接收转换消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
System.out.println("根据订单id 查询其状态...");
System.out.println("判断状态是否为支付成功");
System.out.println("取消订单,回滚库存...");
//3. 手动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
// e.printStackTrace();
// 如果出现异常 4. 拒绝签收
System.out.println("出现异常,拒绝签收");
// 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker 会重新发送给该消息给消费端
channel.basicNack(deliveryTag,true,false);
}
}
}
修改一下监听
<!-- 定义监听器容器 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener>
</rabbit:listener-container>
OK。直接运行消费者
OK,这就在 监听 了
直接发送消息【笔者 这里上 一个动图】
OK,效果很明显【这就是利用 TTL + 死信队列(交换机) 实现的 延迟队列 的效果】
7.6.3 小结
- 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
- RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。