一、什么是RabbitMQ死信队列
RabbitMQ死信队列(Dead-Letter Exchange,简称DLX)是一种特殊类型的交换机,用于处理在队列中无法被消费的消息。当消息无法被消费时,它会被转发到死信队列中,以便进一步处理。
在RabbitMQ中,死信队列通常用于处理以下情况:
- 消息无法被消费者处理:例如,如果消费者崩溃或消息的格式不正确,则无法处理消息。此时,消息将被发送到死信队列进行进一步处理。
- 消息的优先级较低:如果消息的优先级较低,则可能无法在队列中得到及时处理。在这种情况下,消息也会被发送到死信队列中,以确保它最终被处理。
要使用死信队列,需要创建一个普通的交换机和一个普通的队列,然后创建一个死信队列并将其绑定到普通队列上。当消息无法被消费时,它将被发送到死信队列中。
二、RabbitMQ关单逻辑
1. 流程图
- 订单创建成功后, 发送消息给order-event-exchange交换机,采用路由键order.create.order
- order-event-exchange交换机将消息转发给order.delay.queue队列,队列保存时间为30分钟,如果没有消费,则再将消息路由到order-event-exchange交换机,采用路由键order.release.order
- order-event-exchange交换机再将消息转发到死信队列 order-realease-order.queue,采用路由键order.release.order
- 监听死信队列 order-realease-order.queue,如果订单状态为“待付款”的,说明支付不成功,改为“取消”关闭订单
三、Springboot配置RabbitMQ
1. 添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 参数配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
# 虚拟主机
virtual-host: /
# 开启发送端发送确认,无论是否到达broker都会触发回调【发送端确认机制+本地事务表】
publisher-confirm-type: correlated
# 开启发送端抵达队列确认,消息未被队列接收时触发回调【发送端确认机制+本地事务表】
publisher-returns: true
# 消息在没有被队列接收时是否强行退回
template:
mandatory: true
# 消费者手动确认模式,关闭自动确认,否则会消息丢失
listener:
simple:
acknowledge-mode: manual
3、RabbitMQ模板配置
@Configuration
@Slf4j
public class MyRabbitConfig {
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
// TODO 封装RabbitTemplate
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate(rabbitTemplate);
return rabbitTemplate;
}
@Bean
public MessageConverter messageConverter() {
// 使用json序列化器来序列化消息,发送消息时,消息对象会被序列化成json格式
return new Jackson2JsonMessageConverter();
}
/**
* 定制RabbitTemplate
* 1、服务收到消息就会回调
* 1、spring.rabbitmq.publisher-confirms: true
* 2、设置确认回调
* 2、消息正确抵达队列就会进行回调
* 1、spring.rabbitmq.publisher-returns: true
* spring.rabbitmq.template.mandatory: true
* 2、设置确认回调ReturnCallback
* <p>
* 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
*/
//@PostConstruct // (MyRabbitConfig对象创建完成以后,执行这个方法)
public void initRabbitTemplate(RabbitTemplate rabbitTemplate) {
/**
* 发送消息触发confirmCallback回调
* @param correlationData:当前消息的唯一关联数据(如果发送消息时未指定此值,则回调时返回null)
* @param ack:消息是否成功收到(ack=true,消息抵达Broker)
* @param cause:失败的原因
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("发送消息触发confirmCallback回调" +
"\ncorrelationData ===> " + correlationData +
"\nack ===> " + ack + "" +
"\ncause ===> " + cause);
log.info("=================================================");
});
/**
* 消息未到达队列触发returnCallback回调
* 只要消息没有投递给指定的队列,就触发这个失败回调
* @param message:投递失败的消息详细信息
* @param replyCode:回复的状态码
* @param replyText:回复的文本内容
* @param exchange:接收消息的交换机
* @param routingKey:接收消息的路由键
*/
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息未到达队列触发returnCallback回调" +
"\nmessage ===> " + message +
"\nreplyCode ===> " + replyCode +
"\nreplyText ===> " + replyText +
"\nexchange ===> " + exchange +
"\nroutingKey ===> " + routingKey);
// TODO 修改mq_message,设置消息状态为2-错误抵达【后期定时器重发消息】
});
}
}
4、启动RabbitMQ
@SpringBootApplication
@EnableRabbit //启用RabbitMQ自动配置
public class Application implements CommandLineRunner {
public static void main(String[] args) throws Exception {
SpringApplication.run(Application.class, args);
}
}
四、关单业务流程
1. 提交订单完成后,发送关单消息
1. 提交订单控制层
/**
* 创建订单
* 创建成功,跳转订单支付页
* 创建失败,跳转结算页
* 无需提交要购买的商品,提交订单时会实时查询最新的购物车商品选中数据提交
*/
@TokenVerify
@PostMapping(value = "/submitOrder")
public String submitOrder(OrderSubmitVO vo, Model model, RedirectAttributes attributes) {
try {
SubmitOrderResponseVO orderVO = orderService.submitOrder(vo);
// 创建订单成功,跳转收银台
model.addAttribute("submitOrderResp", orderVO);// 封装VO订单数据,供页面解析[订单号、应付金额]
return "pay";
} catch (Exception e) {
// 下单失败回到订单结算页
if (e instanceof VerifyPriceException) {
String message = ((VerifyPriceException) e).getMessage();
attributes.addFlashAttribute("msg", "下单失败" + message);
} else if (e instanceof NoStockException) {
String message = ((NoStockException) e).getMessage();
attributes.addFlashAttribute("msg", "下单失败" + message);
}
return "redirect:http://order.kmall.com/toTrade";
}
}
2. 提交订单实现层
@Transactional(isolation = Isolation.DEFAULT)
@Override
public SubmitOrderResponseVO submitOrder(OrderSubmitVO orderSubmitVO) throws Exception {
SubmitOrderResponseVO result = new SubmitOrderResponseVO();// 返回值
// 创建订单线程共享提交数据
confirmVoThreadLocal.set(orderSubmitVO);
// 1.生成订单实体对象(订单 + 订单项)
OrderCreateTO order = this.createOrder();
// 2.验价应付金额(允许0.01误差,前后端计算不一致)
if (Math.abs(orderSubmitVO.getPayPrice().subtract(order.getPayPrice()).doubleValue()) >= 0.01) {
// 验价不通过
throw new VerifyPriceException();
}
// 验价成功
// 3.保存订单
saveOrder(order);
// 4.库存锁定(wms_ware_sku)
// 封装待锁定商品项TO
WareSkuLockTO lockTO = new WareSkuLockTO();
lockTO.setOrderSn(order.getOrder().getOrderSn());
List<OrderItemVO> itemList = order.getOrderItems().stream().map((item) -> {
OrderItemVO itemVO = new OrderItemVO();
itemVO.setSkuId(item.getSkuId());
itemVO.setCount(item.getSkuQuantity());
itemVO.setTitle(item.getSkuName());
return itemVO;
}).collect(Collectors.toList());
lockTO.setLocks(itemList);
// 待锁定订单项
R response = wmsFeignService.orderLockStock(lockTO);
if (response.getCode() == 0) {
// 锁定成功
// TODO 5.远程扣减积分
// 封装响应数据返回
result.setOrder(order.getOrder());
//System.out.println(10 / 0); // 模拟订单回滚,库存不会滚
// 6.发送创建订单到延时队列
rabbitTemplate.convertAndSend(MQConstant.order_event_exchange, MQConstant.order_create_routekey, order.getOrder());
return result;
} else {
// 锁定失败
throw new NoStockException("");
}
}
2. 在容器注入消息交换机、队列并进行绑定
@Configuration
public class MyRabbitMQConfig {
/**
* 延时队列
*/
@Bean
public Queue orderDelayQueue() {
/**
* Queue(String name, 队列名字
* boolean durable, 是否持久化
* boolean exclusive, 是否排他
* boolean autoDelete, 是否自动删除
* Map<String, Object> arguments) 属性【TTL、死信路由、死信路由键】
*/
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", MQConstant.order_event_exchange);// 死信路由
arguments.put("x-dead-letter-routing-key", MQConstant.order_release_routekey);// 死信路由键
arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
return new Queue(MQConstant.order_delay_queue, true, false, false, arguments);
}
/**
* 交换机(死信路由)
*/
@Bean
public Exchange orderEventExchange() {
return new TopicExchange(MQConstant.order_event_exchange, true, false);
}
/**
* 死信队列
*/
@Bean
public Queue orderReleaseQueue() {
return new Queue(MQConstant.order_release_queue, true, false, false);
}
/**
* 绑定:交换机与订单解锁延迟队列
*/
@Bean
public Binding orderCreateBinding() {
/**
* String destination, 目的地(队列名或者交换机名字)
* DestinationType destinationType, 目的地类型(Queue、Exhcange)
* String exchange,
* String routingKey,
* Map<String, Object> arguments
**/
return new Binding(MQConstant.order_delay_queue,
Binding.DestinationType.QUEUE,
MQConstant.order_event_exchange,
MQConstant.order_create_routekey,
null);
}
/**
* 绑定:交换机与订单解锁死信队列
*/
@Bean
public Binding orderReleaseBinding() {
return new Binding(MQConstant.order_release_queue,
Binding.DestinationType.QUEUE,
MQConstant.order_event_exchange,
MQConstant.order_release_routekey,
null);
}
/**
* 绑定:交换机与库存解锁
*/
@Bean
public Binding orderReleaseOtherBinding() {
return new Binding(MQConstant.stock_release_queue,
Binding.DestinationType.QUEUE,
MQConstant.order_event_exchange,
"order.release.other.#",
null);
}
}
3. 监听死信队列,进行关单,确认消息
@Slf4j
@RabbitListener(queues = MQConstant.order_release_queue)
@Component
public class OrderCloseListener {
@Autowired
OrderService orderService;
/**
* 定时关单,监听死信队列,如果死信队列 消息过期时间 1分钟 后没有消费,就该关单
* @param order
* @param message
* @param channel
* @throws IOException
*/
@RabbitHandler
public void handleOrderRelease(OrderEntity order, Message message, Channel channel) throws IOException {
log.debug("订单解锁,订单号:" + order.getOrderSn());
try {
orderService.closeOrder(order);
// 手动删除消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 解锁失败 将消息重新放回队列,让别人消费
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
4. 关闭订单
只有是待付款状态才要关单
@Override
public void closeOrder(OrderEntity order) {
OrderEntity _order = this.getById(order.getId());
//只有是待付款状态才要关单
if (OrderConstant.OrderStatusEnum.CREATE_NEW.getCode().equals(_order.getStatus())) {
// 待付款状态允许关单
OrderEntity temp = new OrderEntity();
temp.setId(order.getId());
temp.setStatus(OrderConstant.OrderStatusEnum.CANCLED.getCode());
updateById(temp);
try {
// 发送消息给MQ
OrderTO orderTO = new OrderTO();
BeanUtils.copyProperties(_order, orderTO);
//TODO 持久化消息到mq_message表中,并设置消息状态为3-已抵达(保存日志记录)
rabbitTemplate.convertAndSend(MQConstant.order_event_exchange, "order.release.other", orderTO);
} catch (Exception e) {
// TODO 消息为抵达Broker,修改mq_message消息状态为2-错误抵达
}
}
}
五、源码下载
https://gitee.com/charlinchenlin/koo-erp