使用RabbitMQ死信队列关闭未支付的订单

news2024/12/23 13:25:29

一、什么是RabbitMQ死信队列

RabbitMQ死信队列(Dead-Letter Exchange,简称DLX)是一种特殊类型的交换机,用于处理在队列中无法被消费的消息。当消息无法被消费时,它会被转发到死信队列中,以便进一步处理。

在RabbitMQ中,死信队列通常用于处理以下情况:

  1. 消息无法被消费者处理:例如,如果消费者崩溃或消息的格式不正确,则无法处理消息。此时,消息将被发送到死信队列进行进一步处理。
  2. 消息的优先级较低:如果消息的优先级较低,则可能无法在队列中得到及时处理。在这种情况下,消息也会被发送到死信队列中,以确保它最终被处理。

要使用死信队列,需要创建一个普通的交换机和一个普通的队列,然后创建一个死信队列并将其绑定到普通队列上。当消息无法被消费时,它将被发送到死信队列中。

二、RabbitMQ关单逻辑

1. 流程图

在这里插入图片描述

  • 订单创建成功后, 发送消息给order-event-exchange交换机,采用路由键order.create.order
  • order-event-exchange交换机将消息转发给order.delay.queue队列,队列保存时间为30分钟,如果没有消费,则再将消息路由到order-event-exchange交换机,采用路由键order.release.order
  • order-event-exchange交换机再将消息转发到死信队列 order-realease-order.queue,采用路由键order.release.order
  • 监听死信队列 order-realease-order.queue,如果订单状态为“待付款”的,说明支付不成功,改为“取消”关闭订单

三、Springboot配置RabbitMQ

1. 添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 参数配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    # 虚拟主机
    virtual-host: /
    # 开启发送端发送确认,无论是否到达broker都会触发回调【发送端确认机制+本地事务表】
    publisher-confirm-type: correlated
    # 开启发送端抵达队列确认,消息未被队列接收时触发回调【发送端确认机制+本地事务表】
    publisher-returns: true
    # 消息在没有被队列接收时是否强行退回
    template:
      mandatory: true
    # 消费者手动确认模式,关闭自动确认,否则会消息丢失
    listener:
      simple:
        acknowledge-mode: manual

3、RabbitMQ模板配置

@Configuration
@Slf4j
public class MyRabbitConfig {

    @Primary
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        // TODO 封装RabbitTemplate
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        initRabbitTemplate(rabbitTemplate);
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter messageConverter() {
        // 使用json序列化器来序列化消息,发送消息时,消息对象会被序列化成json格式
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     * 1、spring.rabbitmq.publisher-confirms: true
     * 2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     * 1、spring.rabbitmq.publisher-returns: true
     *    spring.rabbitmq.template.mandatory: true
     * 2、设置确认回调ReturnCallback
     * <p>
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
     */
    //@PostConstruct   // (MyRabbitConfig对象创建完成以后,执行这个方法)
    public void initRabbitTemplate(RabbitTemplate rabbitTemplate) {
        /**
         * 发送消息触发confirmCallback回调
         * @param correlationData:当前消息的唯一关联数据(如果发送消息时未指定此值,则回调时返回null)
         * @param ack:消息是否成功收到(ack=true,消息抵达Broker)
         * @param cause:失败的原因
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("发送消息触发confirmCallback回调" +
                    "\ncorrelationData ===> " + correlationData +
                    "\nack ===> " + ack + "" +
                    "\ncause ===> " + cause);
            log.info("=================================================");
        });

        /**
         * 消息未到达队列触发returnCallback回调
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * @param message:投递失败的消息详细信息
         * @param replyCode:回复的状态码
         * @param replyText:回复的文本内容
         * @param exchange:接收消息的交换机
         * @param routingKey:接收消息的路由键
         */
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息未到达队列触发returnCallback回调" +
                    "\nmessage ===> " + message +
                    "\nreplyCode ===> " + replyCode +
                    "\nreplyText ===> " + replyText +
                    "\nexchange ===> " + exchange +
                    "\nroutingKey ===> " + routingKey);
            // TODO 修改mq_message,设置消息状态为2-错误抵达【后期定时器重发消息】
        });
    }
}

4、启动RabbitMQ

@SpringBootApplication
@EnableRabbit //启用RabbitMQ自动配置
public class Application implements CommandLineRunner {   

    public static void main(String[] args) throws Exception {
        SpringApplication.run(Application.class, args);
    }
}

四、关单业务流程

1. 提交订单完成后,发送关单消息

1. 提交订单控制层

/**
     * 创建订单
     * 创建成功,跳转订单支付页
     * 创建失败,跳转结算页
     * 无需提交要购买的商品,提交订单时会实时查询最新的购物车商品选中数据提交
     */
    @TokenVerify
    @PostMapping(value = "/submitOrder")
    public String submitOrder(OrderSubmitVO vo, Model model, RedirectAttributes attributes) {
        try {
            SubmitOrderResponseVO orderVO = orderService.submitOrder(vo);
            // 创建订单成功,跳转收银台
            model.addAttribute("submitOrderResp", orderVO);// 封装VO订单数据,供页面解析[订单号、应付金额]
            return "pay";
        } catch (Exception e) {
            // 下单失败回到订单结算页
            if (e instanceof VerifyPriceException) {
                String message = ((VerifyPriceException) e).getMessage();
                attributes.addFlashAttribute("msg", "下单失败" + message);
            } else if (e instanceof NoStockException) {
                String message = ((NoStockException) e).getMessage();
                attributes.addFlashAttribute("msg", "下单失败" + message);
            }
            return "redirect:http://order.kmall.com/toTrade";
        }
    }

2. 提交订单实现层

@Transactional(isolation = Isolation.DEFAULT)
    @Override
    public SubmitOrderResponseVO submitOrder(OrderSubmitVO orderSubmitVO) throws Exception {
        SubmitOrderResponseVO result = new SubmitOrderResponseVO();// 返回值

        // 创建订单线程共享提交数据
        confirmVoThreadLocal.set(orderSubmitVO);
        // 1.生成订单实体对象(订单 + 订单项)
        OrderCreateTO order = this.createOrder();
        // 2.验价应付金额(允许0.01误差,前后端计算不一致)
        if (Math.abs(orderSubmitVO.getPayPrice().subtract(order.getPayPrice()).doubleValue()) >= 0.01) {
            // 验价不通过
            throw new VerifyPriceException();
        }
        // 验价成功
        // 3.保存订单
        saveOrder(order);
        // 4.库存锁定(wms_ware_sku)
        // 封装待锁定商品项TO
        WareSkuLockTO lockTO = new WareSkuLockTO();
        lockTO.setOrderSn(order.getOrder().getOrderSn());
        List<OrderItemVO> itemList = order.getOrderItems().stream().map((item) -> {
            OrderItemVO itemVO = new OrderItemVO();
            itemVO.setSkuId(item.getSkuId());
            itemVO.setCount(item.getSkuQuantity());
            itemVO.setTitle(item.getSkuName());
            return itemVO;
        }).collect(Collectors.toList());
        lockTO.setLocks(itemList);

        // 待锁定订单项
        R response = wmsFeignService.orderLockStock(lockTO);
        if (response.getCode() == 0) {
            // 锁定成功
            // TODO 5.远程扣减积分
            // 封装响应数据返回
            result.setOrder(order.getOrder());
            //System.out.println(10 / 0); // 模拟订单回滚,库存不会滚
            // 6.发送创建订单到延时队列
            rabbitTemplate.convertAndSend(MQConstant.order_event_exchange, MQConstant.order_create_routekey, order.getOrder());
            return result;
        } else {
            // 锁定失败
            throw new NoStockException("");
        }
    }

2. 在容器注入消息交换机、队列并进行绑定

@Configuration
public class MyRabbitMQConfig {

    /**
     * 延时队列
     */
    @Bean
    public Queue orderDelayQueue() {
        /**
         * Queue(String name,  队列名字
         *       boolean durable,  是否持久化
         *       boolean exclusive,  是否排他
         *       boolean autoDelete, 是否自动删除
         *       Map<String, Object> arguments) 属性【TTL、死信路由、死信路由键】
         */
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", MQConstant.order_event_exchange);// 死信路由
        arguments.put("x-dead-letter-routing-key", MQConstant.order_release_routekey);// 死信路由键
        arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
        return new Queue(MQConstant.order_delay_queue, true, false, false, arguments);
    }

    /**
     * 交换机(死信路由)
     */
    @Bean
    public Exchange orderEventExchange() {
        return new TopicExchange(MQConstant.order_event_exchange, true, false);
    }

    /**
     * 死信队列
     */
    @Bean
    public Queue orderReleaseQueue() {
        return new Queue(MQConstant.order_release_queue, true, false, false);
    }

    /**
     * 绑定:交换机与订单解锁延迟队列
     */
    @Bean
    public Binding orderCreateBinding() {
        /**
         * String destination, 目的地(队列名或者交换机名字)
         * DestinationType destinationType, 目的地类型(Queue、Exhcange)
         * String exchange,
         * String routingKey,
         * Map<String, Object> arguments
         **/
        return new Binding(MQConstant.order_delay_queue,
                Binding.DestinationType.QUEUE,
                MQConstant.order_event_exchange,
                MQConstant.order_create_routekey,
                null);
    }

    /**
     * 绑定:交换机与订单解锁死信队列
     */
    @Bean
    public Binding orderReleaseBinding() {
        return new Binding(MQConstant.order_release_queue,
                Binding.DestinationType.QUEUE,
                MQConstant.order_event_exchange,
                MQConstant.order_release_routekey,
                null);
    }

    /**
     * 绑定:交换机与库存解锁
     */
    @Bean
    public Binding orderReleaseOtherBinding() {
        return new Binding(MQConstant.stock_release_queue,
                Binding.DestinationType.QUEUE,
                MQConstant.order_event_exchange,
                "order.release.other.#",
                null);
    }
}

3. 监听死信队列,进行关单,确认消息

@Slf4j
@RabbitListener(queues = MQConstant.order_release_queue)
@Component
public class OrderCloseListener {

    @Autowired
    OrderService orderService;

    /**
     * 定时关单,监听死信队列,如果死信队列  消息过期时间 1分钟 后没有消费,就该关单
     * @param order
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitHandler
    public void handleOrderRelease(OrderEntity order, Message message, Channel channel) throws IOException {
        log.debug("订单解锁,订单号:" + order.getOrderSn());
        try {
            orderService.closeOrder(order);
            // 手动删除消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 解锁失败 将消息重新放回队列,让别人消费
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }

}

4. 关闭订单

只有是待付款状态才要关单

@Override
    public void closeOrder(OrderEntity order) {
        OrderEntity _order = this.getById(order.getId());
        //只有是待付款状态才要关单
        if (OrderConstant.OrderStatusEnum.CREATE_NEW.getCode().equals(_order.getStatus())) {
            // 待付款状态允许关单
            OrderEntity temp = new OrderEntity();
            temp.setId(order.getId());
            temp.setStatus(OrderConstant.OrderStatusEnum.CANCLED.getCode());
            updateById(temp);

            try {
                // 发送消息给MQ
                OrderTO orderTO = new OrderTO();
                BeanUtils.copyProperties(_order, orderTO);
                //TODO 持久化消息到mq_message表中,并设置消息状态为3-已抵达(保存日志记录)
                rabbitTemplate.convertAndSend(MQConstant.order_event_exchange, "order.release.other", orderTO);
            } catch (Exception e) {
                // TODO 消息为抵达Broker,修改mq_message消息状态为2-错误抵达
            }
        }
    }

五、源码下载

https://gitee.com/charlinchenlin/koo-erp

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/687047.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

7-WebApis-1

Web APIs - 1 掌握DOM属性操作&#xff0c;完成元素内容设置&#xff0c;元素属性设置&#xff0c;控制元素样式 DOM简介获取DOM元素操作元素内容操作元素属性定时器-间隔函数综合案例 描述属性/方法效果获取DOM对象document.querySelector()获取指定的第一个元素document.que…

nuxt 设置i18n后多语言文件不会动态更新

nuxt 设置i18n后多语言文件不会动态更新 昨天遇到的一个问题&#xff0c;然后研究了一整天&#xff0c;今天才得到解决 nuxt 设置i18n多语言时多语言文件不会动态更新 我的原始代码如下&#xff1a; {modules: [nuxtjs/i18n,],i18n: {locales: [{code: en,iso: en-US,name:…

构建可靠软件的关键步骤之单元测试

引言&#xff1a;在当今快节奏的软件开发环境中&#xff0c;构建可靠的软件是至关重要的。单元测试作为软件开发过程中的关键步骤之一&#xff0c;能够帮助开发者发现和解决代码中的错误&#xff0c;确保代码的正确性。本文将详细介绍单元测试的概念、重要性以及如何有效地进行…

impala远程连接失败排查

周一开发反馈在本地电脑上连接impala失败&#xff0c;怀疑是服务问题。测试后发现服务正常&#xff0c;故障也恢复了&#xff0c;就没追究&#xff0c;第二天又出现相似的故障。服务依然正常。怀疑是网络问题。联系网络同事排查。telnet通。网络负载也不是很高&#xff0c;搁置…

档案库房温湿度标准及措施【档案八防十防解决方案】

档案馆库房温湿度调控标准及相应的措施方案 档案库房是档案保管的基本条件&#xff0c;档案库房温湿度与保护档案&#xff0c;延长档案寿命有很大关系。 档案库房适宜温湿度标准为&#xff1a;温度14℃—24℃&#xff0c;相对湿度45&#xff05;一60 一、库房温湿度对档案的影响…

基于Arduino单片机超声波测距仪设计

文章目录 摘 要 1.课程设计任务 1.1课程设计题目 1.2设计的要求 2.设计总体方案 2.1初步设计方案 2.2各个单元电路的设计要求 2.3主要性能指标 2.4总体方案 3.单元模块设计 3.1显示模块 3.2超声波测距模块 3.3蜂鸣器模块 3.4电机模块 3.5 LED二极管模块 4.软件…

【三维编辑】Editing Conditional Radiance Fields 编辑条件辐射场

Editing Conditional Radiance Fields&#xff08;ICCV 2021&#xff09; 作者单位&#xff1a;Steven Liu, Xiuming Zhang, Zhoutong Zhang, Richard Zhang MIT, Adobe Research, CMU 代码地址&#xff1a;https://github.com/stevliu/editnerf 文章目录 摘要前言一、相关工作…

避雷器带电监测仪

一、产品特点&#xff1a; 本机采用大屏幕液晶显示&#xff0c;全中文菜单操作&#xff0c;使用简便高精度采样、处理电路&#xff0c;先进的付里叶谐波分析技术&#xff0c;确保数据更加可靠 仪器采用独特的高速磁隔离数字传感器直接采ji输入的电压、电流信号&#xff0c;保证…

HTML 全面入门教程:从基础到高级

目录 一、基本结构和标签1. HTML 文档结构2. 常用标签 二、表单和输入元素1. 表单标签&#xff08;<form>&#xff09;2. 输入元素3.实例 三、样式和布局1. 内联样式2. 内部样式表3. 外部样式表 四、多媒体和嵌入内容1. 图像2. 音频和视频3. 嵌入内容 五、语义化标签语义…

胎压计PCBA方案设计

汽车的出现极大的方便了人们的交通出行&#xff0c;随着经济社会的发展&#xff0c;人们生活水平显著提高&#xff0c;不少家庭都购买了汽车。但是车主们不仅要知道开车&#xff0c;更需要知道检测汽车胎压。气压计也称为胎压计&#xff0c;是一种检测胎压的测量仪器。电子产品…

RabbitMQ学习笔记6(小滴课堂)路由,主题模式

我们去修改我们的生产者代码&#xff1a; 我们去修改我们的消费者&#xff1a; 第一个节点&#xff1a; 我们还要去创建其它更多的节点&#xff1a; 这里第二个节点我们只绑定一个交换机队列。 我们去分别启动消费者和生产者&#xff1a; 我们可以看到第一个交换机只绑定了一…

Deepin 20.8 linux convert 一寸照 调整图片尺寸413x579 300dpi

原图 convert修改尺寸指令 convert 一寸照.jpg -resize 413x579 一寸照413x579.jpg 目标图 尺寸已调整&#xff0c;dpi太低了 图片高清修复 提升dpi https://github.com/microsoft/Bringing-Old-Photos-Back-to-Life 官方安装过程参考 Installation Clone the Synchron…

2023腾讯云国际站注册流程介绍-腾讯云国际代充

腾讯云是国内三大云服务商之一&#xff0c;为国内外多个应用程序提供服务器支持。腾讯云的产品比较全面&#xff0c;包括云数据库、 CDN、对象存储&#xff08;COS&#xff09;和高防服务器等&#xff0c;满足各种上云需求。 1.腾讯云区分国内站和国际站&#xff0c;并存在明显…

C# MVC 多图片上传预览

一.效果图&#xff1a; 开发框架&#xff1a;MVC&#xff0c;Layui 列表主界面这里就不展示了&#xff0c;可以去看看这篇文章&#xff1a;Layui项目实战&#xff0c;这里讲的是“上传Banner”界面功能&#xff1a; 其中包括&#xff0c;多文件上传&#xff0c;预览&#xff0c…

【Python】python进阶篇之文件操作

文件操作 编码格式 python3默认的文件编码就是UTF-8 以下内容来源于AI 编码格式是指将字符、符号、数字等信息转化为二进制形式以便计算机能够理解和处理的规则或标准。在计算机领域&#xff0c;常见的编码格式有 ASCII、Unicode、UTF-8 等。 ASCII&#xff08;American Stand…

技术管理三板斧第一板斧拿结果-追过程

一、什么是过程管理&#xff1f; 管理就是追求事务的可持续发展&#xff0c;而想要达成这个目标有两个基本点&#xff1a; 管理动作要形成可持续迭代的闭环&#xff1b; 管理动作足够简单到可以复制和个性化升级。 过程管理当然也遵循这个理念。比如你这次 A 项目做得很好&a…

NXP i.MX 8M Plus工业开发板硬件说明书--上册( 四核ARM Cortex-A53 + 单核ARM Cortex-M7,主频1.6GHz)

前 言 本文档主要介绍创龙科技TLIMX8MP-EVM评估板硬件接口资源以及设计注意事项等内容。 创龙科技TLIMX8MP-EVM是一款基于NXP i.MX 8M Plus的四核ARM Cortex-A53 单核ARM Cortex-M7异构多核处理器设计的高性能工业评估板&#xff0c;由核心板和评估底板组成。ARM Cortex-A5…

FreeRTOS 队列传递结构体、内存块等复合数据类型

1. 队列一般传递的不是单个整型数据或者字符型数据&#xff0c;而是传递结构体或者内存块&#xff0c;一块内存的指针 2. 定义结构体数据类型 /* 定义队列传递的结构类型。 */ typedef struct { unsigned char ucValue; unsigned char ucSource; } xData; /* 声明两个xData类…

使用@RequiredArgsConstructor注入时@Qualifier失效问题

在一个项目中&#xff0c;使用到了 Lombok 的RequiredArgsConstructor注解来注入对象 Service RequiredArgsConstructor public class DeliveryServiceImpl implements DeliveryService {private final ResourceDao resourceDao; }因为ResourceDao接口有两个实现类 Repositor…

5大趋势!牛客CEO叶向宇深度解读《2023春季校园招聘白皮书》

校招并不只是一场求职者与企业间的相互选择&#xff0c;而是一场关乎未来的人才战略布局。 近日&#xff0c;牛客CEO叶向宇在「数智链接 向新而生」牛客青年人才招聘峰会中深度解读了《牛客2023春季校园招聘白皮书》中的5大校招趋势&#xff0c;为我们提供了宝贵的洞察。 01.校…