rabbit mq 死信队列
什么是死信队列?
DL-Dead Letter 死信队列
死信,在官网中对应的单词为“Dead Letter”,可以看出翻译确实非常的简单粗暴。那么死信是个什么东西呢?
“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:
- 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。通常来说,如果consumer如果消费出现异常,并且没有ack的话,也属于这种情况
- 消息在队列的存活时间超过设置的生存时间(TTL)时间。
- 消息队列的消息数量已经超过最大队列长度。
那么该消息将成为“死信”。
“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
流程
publisher端
下面是生产者需要做的
代码实现
- 创建message 相关
@Configuration
public class DXLOrderMessageConfig {
public static final String DXL_ORDER_QUEUE = "dxl-order-queue";
public static final String DXL_ORDER_EXCHANGE = "dxl-order-exchange";
public static final String DXL_ORDER_KEY = "dxl-order-routingKey";
public static final String ORDER_QUEUE = "order-queue";
public static final String ORDER_ROUTING_KEY = "order-routingKey";
public static final String ORDER_EXCHANGE = "order-exchange";
@Bean
Queue DXLOrderQueue() {
return new Queue(DXL_ORDER_QUEUE, true, false, false);
}
@Bean
DirectExchange DXLOrderExchange() {
return new DirectExchange(DXL_ORDER_EXCHANGE, true, false);
}
@Bean
Binding dxlBinding() {
return BindingBuilder.bind(DXLOrderQueue()).to(DXLOrderExchange()).with(DXL_ORDER_KEY);
}
@Bean
DirectExchange OrderExchange() {
Map<String, Object> deadLetterParams = new HashMap<>(2);
return new DirectExchange(ORDER_EXCHANGE, true, false, deadLetterParams);
}
@Bean
Queue OrderQueue() {
Map<String, Object> args = new HashMap<>(1);
// ms,指定对应的死信队列配置,和该消息的有效期限,这里设置的是60分钟
args.put("x-message-ttl", 60000);
// x-max-length:队列最大容纳消息条数,大于该值,mq拒绝接受消息,消息进入死信队列
// args.put("x-max-length", 5);
args.put("x-dead-letter-exchange", DXL_ORDER_EXCHANGE);
args.put("x-dead-letter-routing-key", DXL_ORDER_KEY);
return new Queue(ORDER_QUEUE, true, false, false, args);
}
@Bean
Binding bindingOrder() {
return BindingBuilder.bind(OrderQueue()).to(OrderExchange()).with(ORDER_ROUTING_KEY);
}
- 发送正常的mq,为了支持自定义的测试case,增加了自定义的orderNo
@GetMapping("/create-order/{orderNo}")
public String createOrder(@PathVariable("orderNo") String orderNo) {
// 发送持久化mq
Map<String, Object> orderMessage = getMqMessage();
if (StringUtils.isEmpty(orderNo)) {
orderNo = "order-" + RandomUtils.nextInt(1, 100);
}
orderMessage.put("orderNo", orderNo);
rabbitTemplate.convertAndSend(DXLOrderMessageConfig.ORDER_EXCHANGE, DXLOrderMessageConfig.ORDER_ROUTING_KEY, orderMessage);
return "ok";
}
调用url: http://localhost:9876/rabbit-mq-sender/create-order/xxxxx即可测试
测试
我们发送一个正常的订单message
在一分钟后,如果不被消费的话,那么就会自动进入binding的DL中
消费端
在消费端,通常我们需要两个消费者,一个正常的业务消费者,一个dlx的消费者,我们首先定义一个消费异常的cosumer,那么消息在异常之后会进入dead letter
代码实现
业务消费
正常的业务代码-OrderChannelListener.java,测试使用
@Service
public class OrderChannelListener extends Loggable implements ChannelAwareMessageListener {
@RabbitListener(queues = "order-queue")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
business(message);
// 手动签收
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
//拒绝消费消息(丢失消息) 给死信队列
channel.basicReject(deliveryTag, false);
}
}
private void business(Message orderMessage) {
String body = new String(orderMessage.getBody(), StandardCharsets.UTF_8);
Map<String, Object> messageInMap = JsonUtils.toMap(body);
String orderNo = (String) messageInMap.get("orderNo");
logger.info("DirectReceiver[OrderQueue]正常订单消费者收到消息 : " + orderNo);
if (orderNo != null && orderNo.contains("404")) {
throw new RuntimeException("illegal order no");
}
}
}
dead letter消费者
死信队列的消费 DLXOrderChannelListener.class
@Service
public class DLXOrderChannelListener extends Loggable implements ChannelAwareMessageListener {
@RabbitListener(queues = "dlx-order-queue")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String s = new String(message.getBody(), StandardCharsets.UTF_8);
logger.info("订单dead letter queue[dead letter queue]订单消费者收到消息 : " + s);
logger.info("开始处理超时的订单.....");
logger.info("结束处理超时的订单.....");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
我们调用http://localhost:9876/rabbit-mq-sender/create-order/404,我们期待,在正常消费的时候,发生异常,然后会调到死信队列的消费者中去了,我们看日志
和预测的一模一样。
实际应用
我们都知道,下单过程中,一般会有半个小时的支付时间,这个时间段,订单会占用库存,那么如果过了这个时间段不支付的话,我们会取消该订单并且释放库存。为了实现这个目的,通常我们会有两个问题
- 后台任务轮询
- mq的死信队列,利用mq信息的ttl,来驱动
流程图如下
从流程图上我们可以看出,我们只需要一个死信队列的消费者就好了,我们使用mq的有效期,ttl去实现定时的mq到死信队列的状态
代码实现
服务器端
@GetMapping("/save-order/{orderNo}")
public String simulateCreateOrder(@PathVariable("orderNo") String orderNo) {
try {
insertOrderTable();
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
// 发送mq去检测延时检测状态
sendCheckMq(orderNo);
return "ok";
}
private void insertOrderTable() throws InterruptedException {
logger.info("start to create order ....");
TimeUnit.SECONDS.sleep(1);
logger.info("inserted to order table and occupy the stock");
}
private void sendCheckMq(String orderNo) {
// 发送持久化mq
Map<String, Object> orderMessage = getMqMessage();
if (StringUtils.isEmpty(orderNo)) {
orderNo = "order-" + RandomUtils.nextInt(1, 100);
}
orderMessage.put("orderNo", orderNo);
rabbitTemplate.convertAndSend(DXLOrderMessageConfig.ORDER_EXCHANGE, DXLOrderMessageConfig.ORDER_ROUTING_KEY, orderMessage);
}
死信队列端
死信端和上述的dead letter代码没啥区别,但是业务的消费者需要注释掉,只能通过ttl来触发正常的mq跳转到死信队列,参考DLXOrderChannelListener.class
代码参考
https://github.com/GitHubsteven/spring-in-action2.0/tree/master/spring-boot-mesage
参考文档
- Spring Boot + RabbitMQ Tutorial - Retry and Error Handling Example | JavaInUse
- RabbitMQ死信队列实战——解决订单超时未支付