RabbitMQ
【黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战】
文章目录
- RabbitMQ
- 第二天 高级
- 7 RabbitMQ 高级特性
- 7.5 死信队列
- 7.5.1 死信队列概述
- 7.5.2 代码实现
- 7.5.3 小结
第二天 高级
7 RabbitMQ 高级特性
7.5 死信队列
7.5.1 死信队列概述
死信队列,英文缩写:DLX 。
Dead Letter Exchange(死信交换机),当消息成为Dead message( 死信)后,可以被重新发送到另一个交换机,这个交换机就是DLX。
示意图:
【考虑两个问题】
- 队列 如何绑定 死信交换机?
- 消息在什么 时候成为 死信?【或者说 如何成为死信?】
【消息 成为死信的三种情况】
- 队列消息长度到达限制;【存不下 了的消息就会成为 死信】
- 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
- 原队列存在消息过期设置【TTL】,消息到达超时时间未被消费;
【队列 绑定 死信交换机】
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
7.5.2 代码实现
【修改生产者 的核心配置文件】
<!-- 死信队列
1. 声明正常的队列【test_queue_dlx】 和交换机【test_exchange_dlx】
2. 声明死信队列【queue_dlx】 和死信交换机【exchange_dlx】
3. 正常队列绑定 死信交换机
设置两个参数
* x-dead-letter-exchange:死信交换机名称
* x-dead-letter-routing-key:发送给死信交换机的路由key
-->
<!-- 1. 声明正常的队列【test_queue_dlx】 和交换机【test_exchange_dlx】 -->
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
<!--3. 正常队列绑定 死信交换机-->
<rabbit:queue-arguments>
<!-- 3.1 x-dead-letter-exchange:死信交换机名称 -->
<entry key="x-dead-letter-exchange" value="exchange_dlx"></entry>
<!-- 3.2 x-dead-letter-routing-key:发送给死信交换机的路由key -->
<entry key="x-dead-letter-routing-key" value="dlx.hehe"></entry>
<!-- 4.1 设置队列的过期时间ttl -->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
<!-- 4.2 设置队列的长度限制 max-length -->
<entry key="x-max-length" value="10" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 2. 声明死信队列【queue_dlx】 和死信交换机【exchange_dlx】 -->
<rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
OK,现在就可以 进行消息的发送 了
直接来一个 新的测试方法
/**
* 发送测试死信消息
* 1. 过期时间
* 2. 长度限制
* 3. 消息拒收
* */
@Test
public void testDlx(){
//1. 测试过期时间的死信消息
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条要死的 消息嘛?");
}
OK,直接运行
效果很明显,过了 10秒 后,消息从 正常的队列 test_queue_dlx 自动到了 queue_dlx 死信队列中,而且会一直保留
在死信 队列 中读一下 这个消息
没问题,这就是我们发送到正常 交换机的那条消息
【这就是测试 过期时间】
【测试长度】
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条要死的 消息嘛?");
}
预期的效果:有10 条会进入正常 交换机,剩余10 条会 直接进入死信 交换机 并转发到 死信队列【加上之前那条没消费的, 就会有11 条, 而且过 10s 钟 后,全部消息 都会到达死信 队列】
运行结果
10s 后
没毛病【这就是 队列 长度的测试】
【消息拒收】
修改一下消费者代码,我们要做的就是 整一个 正常的消费者 去监听正常的那个 队列,并且在拿到 消息后,直接拒绝接收 且不让其返回 队列,观察消息是否会成为 死信,并进入死信交换机
package com.dingjiaxiong.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
* ClassName: DlxListener
* date: 2022/11/16 21:16
*
* @author DingJiaxiong
*/
@Component
public class DlxListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1. 接收转换消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
int i = 3 / 0; //出现异常
//3. 手动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
// e.printStackTrace();
// 如果出现异常 4. 拒绝签收
System.out.println("出现异常,拒绝签收");
// 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker 会重新发送给该消息给消费端
channel.basicNack(deliveryTag,true,false);
}
}
}
修改一下 消费端 的核心配置文件,让这个listener 监听test_queue_dlx 这个正常的 队列
<!-- 定义监听器容器 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"></rabbit:listener>
</rabbit:listener-container>
OK, 直接启动消费者测试
OK,这样就在一直 监听了
现在 再往正常 队列中发送一条 消息
//3. 测试消息拒收
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hah","准备好,我要被拒收了...");
直接运行生产方法
OK,消息发送 完成,直接查看管控台
没毛病,拒收后的消息 也进入了死信队列
7.5.3 小结
- 死信交换机和死信队列和普通的没有区别
- 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
- 消息成为死信的三种情况:
- 队列消息长度到达限制;
- 消费者拒接消费消息,并且不重回队列;
- 原队列存在消息过期设置,消息到达超时时间未被消费;