目录
- 死信队列是什么
- 怎样实现一个死信队列
- 说明
- 实现过程
- 导入依赖
- 添加配置
- 编写mq配置类
- 添加业务队列的消费者
- 添加死信队列的消费者
- 添加消息发送者
- 添加消息测试类
- 测试
- 死信队列的应用场景
- 总结
死信队列是什么
“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:
- 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
- 消息在队列的存活时间超过设置的TTL时间。
- 消息队列的消息数量已经超过最大队列长度。
那么该消息将成为“死信”。
“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
怎样实现一个死信队列
说明
配置死信队列大概可以分为三个步骤:
1.配置业务队列,绑定到业务交换机上
2.为业务队列配置死信交换机和路由key
3.为死信交换机配置死信队列
注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。
实现过程
导入依赖
<!--RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
添加配置
spring:
#rabbitmq
rabbitmq:
host: 83.136.16.134
password: guest
username: guest
listener:
type: simple
simple:
default-requeue-rejected: false
acknowledge-mode: manual
编写mq配置类
代码里面有详细说明,这里不在赘述。
package com.miaosha.study.mq;
import com.sun.org.apache.regexp.internal.RE;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @Author: laz
* @CreateTime: 2023-02-27 09:16
* @Version: 1.0
*/
@Configuration
public class RabbitmqConfig {
/**
* 业务交换机
*/
public static final String BUSINESS_EXCHANGE_NAME = "business.exchange";
/**
* 业务队列a
*/
public static final String BUSINESS_QUEUEA_NAME = "business.queue.a";
/**
* 业务交换机b
*/
public static final String BUSINESS_QUEUEB_NAME = "business.queue.b";
/**
* 死信交换机
*/
public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
/**
* 死信队列a
*/
public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.queue.a";
/**
* 死信队列b
*/
public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.queue.b";
/**
* 死信队列路由键a
*/
public static final String DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME = "dead.letter.queue.a.rounting.key";
/**
* 死信队列路由键b
*/
public static final String DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME = "dead.letter.queue.b.rounting.key";
/**
* 申明业务交换机
* @return
*/
@Bean
public FanoutExchange businessExchange(){
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
/**
* 申明死信交换机
* @return
*/
@Bean
public DirectExchange deadletterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
}
/**
* 申明业务队列a
* @return
*/
@Bean
public Queue queuea(){
Map<String,Object> map = new HashMap<>();
//绑定死信交换机
map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);
//绑定的死信路由键
map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME);
return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(map).build();
}
/**
* 申明业务队列b
* @return
*/
@Bean
public Queue queueb(){
Map<String,Object> map = new HashMap<>();
//绑定死信交换机
map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);
//绑定的死信路由键
map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME);
return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(map).build();
}
/**
* 申明死信队列a
* @return
*/
@Bean
public Queue deadletterQueuea(){
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
/**
* 申明死信队列b
* @return
*/
@Bean
public Queue deadletterQueueb(){
return new Queue(DEAD_LETTER_QUEUEB_NAME);
}
/**
* 队列a绑定到业务交换机
* @return
*/
@Bean
public Binding businessBindinga(){
return BindingBuilder.bind(queuea()).to(businessExchange());
}
/**
* 队列b绑定到业务交换机
* @return
*/
@Bean
public Binding businessBindingb(){
return BindingBuilder.bind(queueb()).to(businessExchange());
}
/**
* 死信队列a绑定到死信交换机
* @return
*/
@Bean
public Binding deadletterBindinga(){
return BindingBuilder.bind(deadletterQueuea()).to(deadletterExchange()).with(DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME);
}
/**
* 死信队列b绑定到死信交换机
* @return
*/
@Bean
public Binding deadletterBindingB(){
return BindingBuilder.bind(deadletterQueueb()).to(deadletterExchange()).with(DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME);
}
}
添加业务队列的消费者
package com.miaosha.study.mq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_QUEUEA_NAME;
import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_QUEUEB_NAME;
/**
* @Author: laz
* @CreateTime: 2023-02-27 09:53
* @Version: 1.0
*/
@Slf4j
@Component
public class RabbitmqReceiver {
/**
* 监听业务队列a
* @param message
*/
@RabbitListener(queues = BUSINESS_QUEUEA_NAME)
public void queuea(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("业务队列A接受到消息【{}】",msg);
boolean ack = true;
Exception exception = null;
try {
//这里模拟业务逻辑出现异常的情况
if (msg.contains("fail")){
throw new RuntimeException("dead letter exception");
}
} catch (Exception e){
ack = false;
exception = e;
}
//当ack为false时(业务逻辑出现异常),说明当前消息消费异常,这里直接放入死信队列
if (!ack){
log.error("业务队列A消费发生异常,error msg:{}", exception.getMessage());
/**
* void basicNack(long deliveryTag, boolean multiple, boolean requeue)
* 参数一:当前消息的唯一id
* 参数二:是否针对多条消息
* 参数三:是否从新入队列
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} else {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
/**
* 监听业务队列b
* @param msg
*/
@RabbitListener(queues = BUSINESS_QUEUEB_NAME)
public void queueb(Message msg,Channel channel) throws Exception{
String str = new String(msg.getBody());
log.info("业务队列B接受到消息【{}】",str);
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
}
}
添加死信队列的消费者
package com.miaosha.study.mq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import static com.miaosha.study.mq.RabbitmqConfig.*;
/**
* @Author: laz
* @CreateTime: 2023-02-27 09:58
* @Version: 1.0
*/
@Slf4j
@Component
public class DeadLetterReceiver {
/**
* 监听业务队列a
* @param msg
*/
@RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
public void queuea(Message msg, Channel channel) throws IOException {
String str = new String(msg.getBody());
log.info("死信队列A接受到消息【{}】",str);
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
log.info("死信消息properties:{}", msg.getMessageProperties());
}
/**
* 监听业务队列b
* @param msg
*/
@RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
public void queueb(Message msg, Channel channel) throws IOException {
String str = new String(msg.getBody());
log.info("死信队列B接受到消息【{}】",str);
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
log.info("死信消息properties:{}", msg.getMessageProperties());
}
}
添加消息发送者
package com.miaosha.study.mq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_EXCHANGE_NAME;
/**
* @Author: laz
* @CreateTime: 2023-02-27 09:49
* @Version: 1.0
*/
@Component
public class RabbitmqSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(String msg){
rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME,"",msg);
}
}
添加消息测试类
package com.miaosha.study.controller;
import com.miaosha.study.mq.RabbitmqSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author: laz
* @CreateTime: 2023-02-27 09:59
* @Version: 1.0
*/
@RestController
@RequestMapping("mq")
public class TestController {
@Autowired
private RabbitmqSender rabbitmqSender;
@RequestMapping("testDeadLetterQueue/{msg}")
public void testDeadLetterQueue(@PathVariable("msg")String msg){
rabbitmqSender.sendMsg(msg);
}
}
测试
运行项目,访问:http://localhost:8081/mq/testDeadLetterQueue/msg
可以看到,此时只有业务消费者消费了消息,死信队列并没有消费到消息。
然后根据消费者里面的逻辑,我们发送一条 ‘fail’的消息,再次测试
访问:http://localhost:8081/mq/testDeadLetterQueue/fail
可以看到,死信队列a已收到消息。到此实现死信队列的流程就通了。
注意:我们的死信消息MessageProperties
中的内容比较多,代表的含义分别是:
字段名 | 含义 |
---|---|
x-first-death-exchange | 第一次被抛入的死信交换机的名称 |
x-first-death-reason | 第一次成为死信的原因,rejected:消息在重新进入队列时被队列拒绝,由于default-requeue-rejected 参数被设置为false。expired :消息过期。maxlen : 队列内消息数量超过队列最大容量 |
x-first-death-queue | 第一次成为死信前所在队列名称 |
x-death | 历次被投入死信交换机的信息列表,同一个消息每次进入一个死信交换机,这个数组的信息就会被更新 |
死信队列的应用场景
一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要有由于消息信息本身存在错误导致处理异常,处理过程中参数校验异常,或者因网络波动导致的查询异常等等,当发生异常时,当然不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息。通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好太多了。
总结
死信队列其实并没有什么神秘的地方,不过是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。
死信消息的生命周期:
- 业务消息被投入业务队列
- 消费者消费业务队列的消息,由于处理过程中发生异常,于是进行了nck或者reject操作
- 被nck或reject的消息由RabbitMQ投递到死信交换机中
- 死信交换机将消息投入相应的死信队列
- 死信队列的消费者消费死信消息
本篇文章到此结束!希望对您有所帮助。