概述
RabbitMQ是流行的开源消息队列系统,使用erlang语言开发。为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。但由于对死信队列的概念及配置不熟悉,导致曾一度陷入百度的汪洋大海,无法自拔,很多文章都看起来可行,但是实际上却并不能帮我解决实际问题。最终,在官网文档中找到了我想要的答案,通过官网文档的学习,才发现对于死信队列存在一些误解,导致配置死信队列之路困难重重。
详细
一、运行效果
二、实现过程
①、先创建一个Springboot项目。然后在pom文件中添加 spring-boot-starter-amqp
和 spring-boot-starter-web
的依赖,接下来创建一个Config类,这里是关键:
package com.zyf.rabbitmqdeadletterdemo.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.dead.letter.business.exchange";
public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.dead.letter.business.queueA";
public static final String BUSINESS_QUEUEB_NAME = "rabbitmq.dead.letter.business.queueB";
public static final String DEAD_LETTER_EXCHANGE = "rabbitmq.dead.letter.deadletter.exchange";
public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "rabbitmq.dead.letter.deadletter.queueA.routingkey";
public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "rabbitmq.dead.letter.deadletter.queueB.routingkey";
public static final String DEAD_LETTER_QUEUEA_NAME = "rabbitmq.dead.letter.deadletter.queueA";
public static final String DEAD_LETTER_QUEUEB_NAME = "rabbitmq.dead.letter.deadletter.queueB";
// 声明业务Exchange
@Bean("businessExchange")
public FanoutExchange businessExchange(){
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
// 声明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 声明业务队列A
@Bean("businessQueueA")
public Queue businessQueueA(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
}
// 声明业务队列B
@Bean("businessQueueB")
public Queue businessQueueB(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();
}
// 声明死信队列A
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
// 声明死信队列B
@Bean("deadLetterQueueB")
public Queue deadLetterQueueB(){
return new Queue(DEAD_LETTER_QUEUEB_NAME);
}
// 声明业务队列A绑定关系
@Bean
public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 声明业务队列B绑定关系
@Bean
public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 声明死信队列A绑定关系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
// 声明死信队列B绑定关系
@Bean
public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
}
}
②、接下来,是业务队列的消费代码:
@Slf4j@Componentpublic class BusinessMessageReceiver { @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("收到业务消息A:{}", msg); boolean ack = true;
Exception exception = null; try { if (msg.contains("deadletter")){ throw new RuntimeException("dead letter exception");
}
} catch (Exception e){
ack = false;
exception = e;
} if (!ack){
log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} else {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
} @RabbitListener(queues = BUSINESS_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
System.out.println("收到业务消息B:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
③、然后配置死信队列的消费者:
@Componentpublic class DeadLetterMessageReceiver { @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
System.out.println("收到死信消息A:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
System.out.println("收到死信消息B:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
④、为了方便测试,写一个简单的消息生产者,并通过controller层来生产消息。
@Componentpublic class BusinessMessageSender { @Autowired
private RabbitTemplate rabbitTemplate; public void sendMsg(String msg){
rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg);
}
}
@RequestMapping("rabbitmq")@RestControllerpublic class RabbitMQMsgController { @Autowired
private BusinessMessageSender sender; @RequestMapping("sendmsg")
public void sendMsg(String msg){
sender.sendMsg(msg);
}
}
三、项目结构图
四、补充总结
死信队列其实并没有什么神秘的地方,不过是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。
总结一下死信消息的生命周期:
-
业务消息被投入业务队列
-
消费者消费业务队列的消息,由于处理过程中发生异常,于是进行了nck或者reject操作
-
被nck或reject的消息由RabbitMQ投递到死信交换机中
-
死信交换机将消息投入相应的死信队列
-
死信队列的消费者消费死信消息
死信消息是RabbitMQ为我们做的一层保证,其实我们也可以不使用死信队列,而是在消息消费异常时,将消息主动投递到另一个交换机中,当你明白了这些之后,这些Exchange和Queue想怎样配合就能怎么配合。比如从死信队列拉取消息,然后发送邮件、短信、钉钉通知来通知开发人员关注。或者将消息重新投递到一个队列然后设置过期时间,来进行延时消费。