目录
第六章-RabbitMQ之死信队列
1. 死信概念
2. 死信架构
3. 死信来源
3.1 消息 TTL 过期
3.2 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
3.3 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false(不再重新入队)
4. 验证代码
4.1 TTL
4.2 超过Length
4.3 拒绝
5. 总结
第六章-RabbitMQ之死信队列
1. 死信概念
未了解死信这块的内容的时候,经常听到别人提及 ‘死信’ - What? 信息死了? 按字面意思去看就是一个信息由于某些原因导致无法正常消费,造成了这个信息死亡的结果,造成的原因我们再单说。
死信- Dead Letter . 由于消息未正常消费,导致了消息的再次转移或被丢弃-成为死信。本文中涉及到的内容,是消息从原正常消费队列中又重定向路由到了另外一个预留的队列-死信队列。
应用场景:我们应用最多的应该是在消息防丢处理中使用,消费者消费的时候,出现了异常,我们可以使用死信队列的机制,将此条信息转移到一个预留的队列中,再有对应的消费者去处理这部分数据内容。-还是模糊-继续往下-
2. 死信架构
如上图所示,我们可以看出 黄色的部分内容 为 正常业务的交换机 Exchange,以及正常业务的对列1 Queue, 而绿色的部分为 另外一套 交换机与队列,他们的创建与正常的类型无异,姑且我们称他们为 死信交换机及死信队列2. 那现在我们就有两套交换机与队列。
那最核心的内容是需要将死信交换机与正常业务的队列1做好绑定关系,这样就能实现当正常队列1无法消费或其他原因时,将消息重定向至死信的交换机中去,死信的交换机再将消息路由至队列2中。
总结一下:生产者生产了一条消息,将消息推送到Broker 中的 正常交换机Exchange,交换机将消息路由至队列1 ,这时由于队列1中的消息未能正常消费,过期了,导致队列1又将消息投递到了死信交换机中,死信交换机再将消息路由到队列2,从而是的消费者2正常消费到了队列2中的死信消息。
3. 死信来源
上文也提到了,消息过期了,导致消息未正常消费,那我们在这里总结一下所有死信的来源
3.1 消息 TTL 过期
TTL意思是 Time to live , 即消息的存活时间,那时间到期了,消息也就达到了他的生命的尽头成为了死信。那如何设置TTL呢?
3.1.1 设置消息的TTL
发送消息的时候设置属性,可以每条消息设置不同的ttl
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("1000").build();
3.1.2 设置队列的TTL
声明消息队列的时候,这个是全局的,所有发到这个队列的消息的过期时间是一样的
deadLetterParams.put("x-message-ttl", 1000);
队列设置好TTL,则面板中会提示:
假如你两种都设置了,以小的ttl为准
那这两者设置有啥区别呢?
区别:queue的全局ttl,消息过期立刻就会被删掉;如果是发送消息时设置的ttl,过期之后并不会立刻删掉,这时候消息是否过期是需要投递给消费者的时候判断的。【这个可以做个小实验-投递两条消息,第一条设置过期时间10s,第二条1s, 投递完成,都不进行消费理论第二条应该先过期,但由于第一条没消费,第二条也不会过期】
原因:queue的全局ttl,队列的有效期都一样,先入队列的队列头部,头部也是最早过期的消息,rabbitmq会有一个定时任务从队列的头部开始扫描是否有过期消息即可。而每条设置不同的ttl,只有遍历整个队列才可以筛选出来过期的消息,这样的效率实在是太低,而且如果消息量大了根本不可行,所以rabbitmq在等到消息投递给消费者的时候判断当前消息是否过期,虽然删除的不及时但是不影响功能。
3.2 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
队列设置最大长度及存储
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-length",10);/∥设置queue的最大长度10
args.put("x-max-length-bytes",1024);//设置最大总字节数1KB
channel.queueDeclare("myqueue", false, false, false, args);
面板展示:
当消息超限会被丢弃或转向至死信队列中。
3.3 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false(不再重新入队)
消息拒绝方式:这两者的区别是 basic.nack 多一个参数,boolean multiple 支持批量拒绝。其余一致。
channel.basicReject(envelope.getDeliveryTag(), false);
channel.basicNack(envelope.getDeliveryTag(), false, false);
4. 验证代码
4.1 TTL
生产者:
/**
* @author rabbit
* @version 1.0.0
* @Description 死信队列-
* 当消息在一个队列中变为死信后,它被重新发送到另一个Exchange。
* @createTime 2022/07/27 19:34:00
*/
public class DeadLetterTTLProducer {
private static String NORMAL_EXCHANGE_NAME = "normal_exchange_ttl";
//生产者
public static void main(String[] args) throws Exception {
//1、获取connection
Connection connection = RabbitCommonConfig.getConnection();
//2、创建channel
Channel channel = connection.createChannel();
for (int i = 0; i < 5; i++) {
sendMsg(channel);
}
//4、关闭管道和连接
channel.close();
connection.close();
}
private static void sendMsg(Channel channel) throws IOException, InterruptedException {
// 1. 设置消息 TTL 过期时间
// 2. 设置 队列 TTL 过期时间
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("1000").build();
String message = "info";
channel.basicPublish(NORMAL_EXCHANGE_NAME, "normal-key", properties, message.getBytes());
System.out.println("消息发送完成:" + message);
}
}
消费者:
/**
* @author rabbit
* @version 1.0.0
* @Description 正常队列消费者,除了正常的消费者 需要创建 队列、交换机,绑定关系外,
* 还需要创建 死信的队列、交换机、绑定关系。
* 最核心的一点还有,需要将 死信队列的交换机信息做为一个参数,绑定到正常的队列中去。
* @createTime 2022/11/17 16:53:00
*/
public class DeadLetterTTLConsumer_Normal {
private static String NORMAL_EXCHANGE_NAME = "normal_exchange_ttl";
private static String NORMAL_QUEUE_NAME = "normal_queue_ttl";
private static String DEAD_EXCHANGE_NAME = "dead_exchange";
private static String DEAD_QUEUE_NAME = "dead-queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1、获取连对象、
Connection connection = RabbitCommonConfig.getConnection();
//2、创建channel
Channel channel = connection.createChannel();
//3. 创建死信队列与交换机及绑定关系
handleQueueAndBinding(channel, DEAD_QUEUE_NAME, null, DEAD_EXCHANGE_NAME, "dead-letter-key");
// 正常队列与死信交换机的绑定关系
Map<String, Object> deadLetterParams = getNormalAndDeadParams();
// 4.声明一个正常队列与交换机及绑定关系
handleQueueAndBinding(channel, NORMAL_QUEUE_NAME, deadLetterParams, NORMAL_EXCHANGE_NAME, "normal-key");
channel.basicQos(1);
//5.开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Normal消费者接收消息: " + new String(body, "UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(NORMAL_QUEUE_NAME, false, consumer);
System.out.println("Normal消费者启动接收消息......");
//5、键盘录入,让程序不结束!
System.in.read();
//6、释放资源
channel.close();
connection.close();
}
private static Map<String, Object> getNormalAndDeadParams() {
Map<String, Object> deadLetterParams = new HashMap<>();
deadLetterParams.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
deadLetterParams.put("x-dead-letter-routing-key", "dead-letter-key");
//队列过期时间限制
//deadLetterParams.put("x-message-ttl", 1000);
return deadLetterParams;
}
/**
* 处理队列与绑定关系
*
* @param channel
* @param deadQueueName
* @param o
* @param deadExchangeName
* @param routingKey
* @throws IOException
*/
private static void handleQueueAndBinding(Channel channel, String deadQueueName, Map<String, Object> o, String deadExchangeName, String routingKey) throws IOException {
// 声明一个队列
channel.queueDeclare(deadQueueName, false, false, false, o);
// 声明一个交换机
channel.exchangeDeclare(deadExchangeName, BuiltinExchangeType.DIRECT);
// 队列与交换机绑定
channel.queueBind(deadQueueName, deadExchangeName, routingKey);
}
}
死信消费者:
/**
* @author rabbit
* @version 1.0.0
* @Description 死信队列消费者
* 与正常消费者一致 监听自己的队列消息即可
* @createTime 2022/11/17 16:54:00
*/
public class DeadLetterConsumer_Dead {
private static String DEAD_QUEUE_NAME = "dead-queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1、获取连对象、
Connection connection = RabbitCommonConfig.getConnection();
//2、创建channel
Channel channel = connection.createChannel();
//3.开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("死信消费者接收消息: " + new String(body, "UTF-8"));
}
};
channel.basicConsume(DEAD_QUEUE_NAME, true, consumer);
System.out.println("死信消费者启动等待消费消息:");
//5、键盘录入,让程序不结束!
System.in.read();
//6、释放资源
channel.close();
connection.close();
}
}
结果:
生产者:
消息发送完成:info
消息发送完成:info
消息发送完成:info
消息发送完成:info
消息发送完成:info
消费者:
Normal消费者启动接收消息......
Normal消费者接收消息: info
死信消费者:
死信消费者启动等待消费消息:
死信消费者接收消息: info
死信消费者接收消息: info
死信消费者接收消息: info
死信消费者接收消息: info
4.2 超过Length
生产者:
/**
* @author rabbit
* @version 1.0.0
* @Description 普通的生产者
* @createTime 2022/11/17 16:51:00
*/
public class DeadLetterLengthProducer {
private static String EXCHANGE_NAME = "normal_exchange";
public static void main(String[] args) throws IOException {
//1、获取连对象、
Connection connection = RabbitCommonConfig.getConnection();
//2、创建channel
Channel channel = connection.createChannel();
//3. 发送消息
for (int i = 1; i <= 10; i++) {
String message = "info" + i;
channel.basicPublish(EXCHANGE_NAME, "normal-key", null, message.getBytes());
System.out.println("生产者已发送消息:" + message);
}
System.out.println("消息发送完成");
}
}
消费者:
/**
* @author rabbit
* @version 1.0.0
* @Description 正常队列消费者,除了正常的消费者 需要创建 队列、交换机,绑定关系外,
* 还需要创建 死信的队列、交换机、绑定关系。
* 最核心的一点还有,需要将 死信队列的交换机信息做为一个参数,绑定到正常的队列中去。
* @createTime 2022/11/17 16:53:00
*/
public class DeadLetterLengthConsumer_Normal {
private static String NORMAL_EXCHANGE_NAME = "normal_exchange";
private static String NORMAL_QUEUE_NAME = "normal-queue";
private static String DEAD_EXCHANGE_NAME = "dead_exchange";
private static String DEAD_QUEUE_NAME = "dead-queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1、获取连对象、
Connection connection = RabbitCommonConfig.getConnection();
//2、创建channel
Channel channel = connection.createChannel();
//3. 创建死信队列与交换机及绑定关系
handleQueueAndBinding(channel, DEAD_QUEUE_NAME, null, DEAD_EXCHANGE_NAME, "dead-letter-key");
// 正常队列与死信交换机的绑定关系
Map<String, Object> deadLetterParams = getNormalAndDeadParams();
// 4.声明一个正常队列与交换机及绑定关系
handleQueueAndBinding(channel, NORMAL_QUEUE_NAME, deadLetterParams, NORMAL_EXCHANGE_NAME, "normal-key");
channel.basicQos(1);
//5.开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Normal消费者接收消息: " + new String(body, "UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(NORMAL_QUEUE_NAME, false, consumer);
System.out.println("Normal消费者启动接收消息......");
//5、键盘录入,让程序不结束!
System.in.read();
//6、释放资源
channel.close();
connection.close();
}
private static Map<String, Object> getNormalAndDeadParams() {
Map<String, Object> deadLetterParams = new HashMap<>();
deadLetterParams.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
deadLetterParams.put("x-dead-letter-routing-key", "dead-letter-key");
deadLetterParams.put("x-max-length", 6);
return deadLetterParams;
}
/**
* 处理队列与绑定关系
*
* @param channel
* @param deadQueueName
* @param o
* @param deadExchangeName
* @param routingKey
* @throws IOException
*/
private static void handleQueueAndBinding(Channel channel, String deadQueueName, Map<String, Object> o, String deadExchangeName, String routingKey) throws IOException {
// 声明一个队列
channel.queueDeclare(deadQueueName, false, false, false, o);
// 声明一个交换机
channel.exchangeDeclare(deadExchangeName, BuiltinExchangeType.DIRECT);
// 队列与交换机绑定
channel.queueBind(deadQueueName, deadExchangeName, routingKey);
}
}
死信消费者:同上
结果:
生产者:
生产者已发送消息:info1
生产者已发送消息:info2
生产者已发送消息:info3
生产者已发送消息:info4
生产者已发送消息:info5
生产者已发送消息:info6
生产者已发送消息:info7
生产者已发送消息:info8
生产者已发送消息:info9
生产者已发送消息:info10
消息发送完成
消费者:
Normal消费者启动接收消息......
Normal消费者接收消息: info1
Normal消费者接收消息: info5
Normal消费者接收消息: info6
Normal消费者接收消息: info7
Normal消费者接收消息: info8
Normal消费者接收消息: info9
Normal消费者接收消息: info10
死信消费者:
死信消费者启动等待消费消息:
死信消费者接收消息: info2
死信消费者接收消息: info3
死信消费者接收消息: info4
4.3 拒绝
生产者:
/**
* @author rabbit
* @version 1.0.0
* @Description 普通的生产者
* @createTime 2022/11/17 16:51:00
*/
public class DeadLetterRejectProducer {
private static String EXCHANGE_NAME = "normal_exchange_reject";
public static void main(String[] args) throws IOException {
//1、获取连对象、
Connection connection = RabbitCommonConfig.getConnection();
//2、创建channel
Channel channel = connection.createChannel();
//3. 发送消息
for (int i = 1; i <= 10; i++) {
String message = "info" + i;
channel.basicPublish(EXCHANGE_NAME, "normal-key", null, message.getBytes());
System.out.println("生产者已发送消息:" + message);
}
System.out.println("消息发送完成");
}
}
消费者:
/**
* @author rabbit
* @version 1.0.0
* @Description 正常队列消费者,除了正常的消费者 需要创建 队列、交换机,绑定关系外,
* 还需要创建 死信的队列、交换机、绑定关系。
* 最核心的一点还有,需要将 死信队列的交换机信息做为一个参数,绑定到正常的队列中去。
* @createTime 2022/11/17 16:53:00
*/
public class DeadLetterRejectConsumer_Normal {
private static String NORMAL_EXCHANGE_NAME = "normal_exchange_reject";
private static String NORMAL_QUEUE_NAME = "normal_queue_reject";
private static String DEAD_EXCHANGE_NAME = "dead_exchange";
private static String DEAD_QUEUE_NAME = "dead-queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1、获取连对象、
Connection connection = RabbitCommonConfig.getConnection();
//2、创建channel
Channel channel = connection.createChannel();
//3. 创建死信队列与交换机及绑定关系
handleQueueAndBinding(channel, DEAD_QUEUE_NAME, null, DEAD_EXCHANGE_NAME, "dead-letter-key");
// 正常队列与死信交换机的绑定关系
Map<String, Object> deadLetterParams = getNormalAndDeadParams();
// 4.声明一个正常队列与交换机及绑定关系
handleQueueAndBinding(channel, NORMAL_QUEUE_NAME, deadLetterParams, NORMAL_EXCHANGE_NAME, "normal-key");
channel.basicQos(1);
//5.开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if ("info5".equals(new String(body, "UTF-8"))) {
System.out.println("Normal消费者接收消息:" + new String(body, "UTF-8") + "并且拒绝签收了");
// 禁止重新入队
//channel.basicReject(envelope.getDeliveryTag(), false);
channel.basicNack(envelope.getDeliveryTag(), false, false);
} else {
System.out.println("Normal消费者接收消息:" + new String(body, "UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(NORMAL_QUEUE_NAME, false, consumer);
System.out.println("Normal消费者启动接收消息......");
//5、键盘录入,让程序不结束!
System.in.read();
//6、释放资源
channel.close();
connection.close();
}
private static Map<String, Object> getNormalAndDeadParams() {
Map<String, Object> deadLetterParams = new HashMap<>();
deadLetterParams.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
deadLetterParams.put("x-dead-letter-routing-key", "dead-letter-key");
return deadLetterParams;
}
/**
* 处理队列与绑定关系
*
* @param channel
* @param deadQueueName
* @param o
* @param deadExchangeName
* @param routingKey
* @throws IOException
*/
private static void handleQueueAndBinding(Channel channel, String deadQueueName, Map<String, Object> o, String deadExchangeName, String routingKey) throws IOException {
// 声明一个队列
channel.queueDeclare(deadQueueName, false, false, false, o);
// 声明一个交换机
channel.exchangeDeclare(deadExchangeName, BuiltinExchangeType.DIRECT);
// 队列与交换机绑定
channel.queueBind(deadQueueName, deadExchangeName, routingKey);
}
}
死信消费者:同上
结果:
生产者:
生产者已发送消息:info1
生产者已发送消息:info2
生产者已发送消息:info3
生产者已发送消息:info4
生产者已发送消息:info5
生产者已发送消息:info6
生产者已发送消息:info7
生产者已发送消息:info8
生产者已发送消息:info9
生产者已发送消息:info10
消息发送完成
消费者:
Normal消费者启动接收消息......
Normal消费者接收消息:info1
Normal消费者接收消息:info2
Normal消费者接收消息:info3
Normal消费者接收消息:info4
Normal消费者接收消息:info5并且拒绝签收了
Normal消费者接收消息:info6
Normal消费者接收消息:info7
Normal消费者接收消息:info8
Normal消费者接收消息:info9
Normal消费者接收消息:info10
死信消费者:
死信消费者启动等待消费消息:
死信消费者接收消息: info5
5. 总结
其实死信队列没什么神秘的内容,只是:
1. 在原来的基础上增加了一套死信Exchange和死信Queue
2. 与原来的Queue 和死信Exchange做好了绑定关系。
绑定关系依靠参数设置在业务正常queue声明的时候进行传入:
Map<String, Object> deadLetterParams = new HashMap<>();
deadLetterParams.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
deadLetterParams.put("x-dead-letter-routing-key", "dead-letter-key");
channel.queueDeclare(deadQueueName, false, false, false, deadLetterParams );
与业务Queue绑定好关系后,看下面板的体现:
这里的 DLX - dead-letter-exchange = 业务中的参数 :"x-dead-letter-exchange" 指向的 死信Exchange
这里的DLK -dead-letter-routing-key" = 业务中的参数:"x-dead-letter-routing-key" 指向的是 死信的 路由键