rabbitmq延时队列自动解锁库存

news2024/12/30 3:07:21

在这里插入图片描述
使用了最终一致性来解决分布式事务
当order服务出现异常回滚,此时ware服务无法回滚,怎么办?

使用seata全局事务虽然能在order服务出现异常导致回滚时使其他服务的也能同时回滚,但在流量大的情况下是使用加锁的方式,效率
低不适合并发量大的情况,也可以使用定时任务轮询去查看订单的状态,但是轮询的方式比较占资源和内存,所以选用最终一致性的方案,使用mq延时队列死信路由,然后做出补救方案,只要订单服务出现故障就通过mq定时去判断,只要能保证库存最终能解锁即可

延时队列自动解锁库存业务逻辑
ware服务在完成锁库存时就给mq发消息,把消息存到死信队列中,这个消息记录了那些商品锁定多少库存,当queue到达存活时间就会把消息交给死信路由交换机,死信路由交换机会把消息发到最终的队列,如果订单支付时间为30分钟,我们就把存活时间设置为40分钟,这样就能保证我们监听的消息一定是超过了支付的时间的,然后让ware库存服务去订阅监听最终的队列即可,只要有消息我们就去检验order订单服务,只要证明订单服务出现异常回滚或者订单超过支付时间未支付的订单我们就去做一个解锁还原库存的操作

1.库存锁定成功,给mq发消息

(1)保存 工作单(订单号)、工作单详情(商品锁了多少库存)
(2)把上面的数据给mq发一份

@Transactional
    @Override
    public boolean orderStock(OrderStockRequest orderStockRequest) {
        //保存工作单
        WareOrderTaskEntity wareOrderTaskEntity = new WareOrderTaskEntity();
        wareOrderTaskEntity.setOrderSn(orderStockRequest.getOrderSn());
        wareOrderTaskService.save(wareOrderTaskEntity);

        List<OrderItemVo> itemVos = orderStockRequest.getItemVos();
        List<SkuStockfromWare> collect = itemVos.stream().map(item -> {
            SkuStockfromWare skuStockfromWare = new SkuStockfromWare();
            skuStockfromWare.setSkuId(item.getSkuId());
            skuStockfromWare.setNum(item.getCount());
            //查询该商品在那些仓库有库存
            List<Long> wareId = wareSkuDao.skuStockfromWare(item.getSkuId());
            skuStockfromWare.setWareId(wareId);
            return skuStockfromWare;
        }).collect(Collectors.toList());

        //根据skuId遍历
        for (SkuStockfromWare skuStockfromWare : collect) {
            //判断是否锁定成功
            boolean flag = false;

            //判断该商品是否有仓库存在库存
            List<Long> wareIdList = skuStockfromWare.getWareId();
            if (wareIdList.size() < 0 || wareIdList == null){
                throw new NoWareStockException(skuStockfromWare.getSkuId());
            }
            for (Long wareId : wareIdList) {
                Long count = wareSkuDao.LockedStockFromWare(skuStockfromWare.getSkuId(),wareId,skuStockfromWare.getNum());
                if (count.equals(1L)){
                    //锁定成功
                    flag = true;

                    //保存工作单详情
                    WareOrderTaskDetailEntity wareOrderTaskDetailEntity = new WareOrderTaskDetailEntity();
                    wareOrderTaskDetailEntity.setSkuId(skuStockfromWare.getSkuId());
                    wareOrderTaskDetailEntity.setSkuNum(skuStockfromWare.getNum());
                    wareOrderTaskDetailEntity.setTaskId(wareOrderTaskEntity.getId());
                    wareOrderTaskDetailEntity.setWareId(wareId);
                    wareOrderTaskDetailEntity.setLockStatus(1);
                    wareOrderTaskDetailService.save(wareOrderTaskDetailEntity);
                    //TODO 库存锁定成功->发消息给交换机
                    StockLocked stockLocked = new StockLocked();
                    stockLocked.setTaskId(wareOrderTaskEntity.getId());
                    WareOrderTaskDetailTo wareOrderTaskDetailTo = new WareOrderTaskDetailTo();
                    BeanUtils.copyProperties(wareOrderTaskDetailEntity,wareOrderTaskDetailTo);
                    stockLocked.setDetailTo(wareOrderTaskDetailTo);
                    //convertAndSend(String exchange, String routingKey, Object object)
                    rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",);

                    //该商品锁定库存成功就执行下一个商品
                    break;
                }

            }

            //如果没有一个仓库扣成功,代表此skuId的库存不足
            if (!flag){
                throw new SkuNoStockException(skuStockfromWare.getSkuId());
            }

        }
        return true;
    }

2.监听队列,解锁库存

(1)判断工作单是否存在
不存在代表锁库存操作已回滚,不做处理
(2)查询订单是否存在
如果订单不存在,表示下订单操作已回滚,执行解锁库存操作
如果存在,查询订单状态是否为 4-已关闭,如果是 4-已关闭,执行解锁库存操作,订单其他状态不做处理
(3)解锁前判断工作单的状态是否为 1-已锁定,证明只做了锁定库存操作
(4)解锁库存,修改工作单详情状态为 已解锁

/**
 * 解锁库存
 */
@RabbitListener(queues = {"stock.release.stock.queue"})
@Service
public class UnLockStockListener {

    @Autowired
    WareSkuService wareSkuService;

    @RabbitHandler
    public void UnLockStock(StockLockedTo lockedTo, Channel channel, Message message) throws IOException {
        try {
            wareSkuService.unlockStock(lockedTo);
            //签收
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            //拒签,让消息重新归队,等待服务器重启进行下一次解锁
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        }

    }

}

解锁操作


    /**
     * 解锁库存
     *
     * (1).判断工作单是否存在
     *      不存在代表已回滚,不做处理
     *      (2).查询订单是否存在
     *          如果订单不存在,表示已回滚
     *             (3).执行解锁库存操作
     *          如果存在,查询订单状态是否为 4-已关闭
     *                如果是 4-已关闭,执行解锁库存操作,订单其他状态不做处理
     */
    @Override
    public void unlockStock(StockLockedTo lockedTo) {

        WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(lockedTo.getTaskId());
        //已回滚不做处理
        if (taskEntity != null){
            //查询订单是否存在
            R<OrderVo> r = orderFeignService.orderStatus(taskEntity.getOrderSn());
            if (r.getCode() == 0){
                OrderVo orderVo = r.getData(new TypeReference<OrderVo>() {
                });
                if (orderVo == null || orderVo.getStatus() == 4){
                    WareOrderTaskDetailTo detailTo = lockedTo.getDetailTo();
                    //判断工作单的状态是否为 1-已锁定,证明只做了锁定库存操作
                    if (detailTo.getLockStatus() == 1){
                        //恢复库存
                        unlock(detailTo.getId(),detailTo.getSkuNum(),detailTo.getSkuId(),detailTo.getWareId());
                    }
                }
            }else {
                throw new OrderFeignException();
            }

        }
    }

    /**
     * 解锁库存
     * UPDATE `wms_ware_sku` SET stock_locked = stock_locked - ?
     * WHERE sku_id = ? AND ware_id = ?
     */
    private void unlock(Long id,Integer skuNum, Long skuId, Long wareId) {
        wareSkuDao.unlock(skuNum,skuId,wareId);
        //修改状态为 已解锁
        WareOrderTaskDetailEntity wareOrderTaskDetailEntity = new WareOrderTaskDetailEntity();
        wareOrderTaskDetailEntity.setId(id);
        wareOrderTaskDetailEntity.setLockStatus(2);
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/750941.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

transformer 学习

原理学习: (3条消息) The Illustrated Transformer【译】_于建民的博客-CSDN博客 代码学习: https://github.com/jadore801120/attention-is-all-you-need-pytorch/tree/master/transformer mask学习: (3条消息) NLP 中的Mask全解_mask在自然语言处理代表什么_郝伟博士的…

HTTP原理解析-超详细

作者&#xff1a;20岁爱吃必胜客&#xff08;坤制作人&#xff09;&#xff0c;近十年开发经验, 跨域学习者&#xff0c;目前于海外某世界知名高校就读计算机相关专业。荣誉&#xff1a;阿里云博客专家认证、腾讯开发者社区优质创作者&#xff0c;在CTF省赛校赛多次取得好成绩。…

linux 安装 milvus 和 Attu

效果图 准备 建议使用docker安装&#xff0c;比较简单易操作 查看自己是否安装docker-compose docker-compose --version 如果docker-compose 的版本低于2.0&#xff0c;会报错&#xff0c;报错内容如下&#xff1a; 所以在此之前需要把docker-compose升级到2.0版本 升级d…

Kafka 概述、Filebeat+Kafka+ELK

Kafka 概述、FilebeatKafkaELK 一、为什么需要消息队列&#xff08;MQ&#xff09;1、使用消息队列的好处2、消息队列的两种模式 二、Kafka 定义1、Kafka 简介2、Kafka 的特性3、Kafka 系统架构 三、部署 kafka 集群1.下载安装包2.安装 Kafka3.Kafka 命令行操作 四、Kafka 架构…

解决win11选择打开方式时卡死

解决win11选择打开方式时卡死 问题描述 右键想要打开的文件&#xff0c;选择打开方式&#xff0c;点击在电脑上选择应用&#xff0c;在地址栏输入地址&#xff0c;卡死 解决方法 在桌面底部点击右键&#xff0c;打开“任务管理器” 搜索“选取应用”进程 右键该进程&#…

Java postman+ajax

0目录 1.PostMan 2.实战&#xff08;引入阿贾克斯&#xff09; 1.PostMan 定义 Postman是一个接口测试工具 doPost 和doGet方法 配置xml 测试 PostMan测试 Get 请求 Post请求 测试 新建add.jsp 利用jsp实现post请求 Service方法实现doPost…

实时进度追踪与可视化:Gradio库中的Progress模块详解

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️ &#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…

【剑指offer】20. 链表中环的入口结点(java)

文章目录 链表中环的入口结点描述输入描述&#xff1a;返回值描述&#xff1a; 示例1示例2示例3思路完整代码 链表中环的入口结点 描述 给一个长度为n链表&#xff0c;若其中包含环&#xff0c;请找出该链表的环的入口结点&#xff0c;否则&#xff0c;返回null。 数据范围&…

java学习路程之篇五、知识点、变量、标识符、数据类型、Scanner键盘录入

文章目录 1、变量2、标识符3、数据类型4、Scanner键盘录入 1、变量 2、标识符 3、数据类型 4、Scanner键盘录入

twaver——树中选择子网,拓扑中显示子网里面的拓扑

twaver.network.Network.setCurrentSubNetwork ( currentSubNetwork [animate] [finishFunction] ) 将当前子网设置为指定子网&#xff0c;并且可以设置是否有动画效果&#xff0c;而且能指定设置当前子网结束后执行的动作 Parameters: currentSubNetwork twaver.SubNetwork 子…

OSPF(链路状态路由协议)

目录 OSPF&#xff08;链路状态路由协议&#xff09; 动态路由评判标准&#xff1a; 1.选路佳 2.收敛快 3.资源占用&#xff08;越小越好&#xff09; 相同于不同 RIP 和OSPF相同点&#xff1a; RIP 和OSPF不同点&#xff1a; 结构部署&#xff1a;区域规划 OSPF区域划…

4G 网络跟 5G 的区别

4G网络和5G网络是两种不同的移动通信技术&#xff0c;它们在数据传输速度、延迟、连接密度和网络容量等方面存在一些区别。以下是它们之间的主要区别&#xff1a; 1. 速度&#xff1a;5G网络的速度比4G网络更快。5G网络具备更广的频段和更高的频率&#xff0c;能够提供更大的带…

上位机一般的开发工具?

上位机开发工具是用于开发和构建上位机应用程序的软件工具。它们提供了一系列功能和资源&#xff0c;帮助开发人员设计、编写和调试上位机应用程序。以下是一些常见的上位机开发工具&#xff1a;Visual Studio&#xff1a;作为一种集成开发环境&#xff08;IDE&#xff09;&…

shardingsphere mybatisplus properties和yml配置实现

shardingsphere mybatisplus properties和yml配置实现 目录结构 model package com.oujiong.entity; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; import java.util.Date;/*** user表*/ TableName("user") Data public class Use…

CSS整段文字缩进(一段多行文字中首列位置相对应)

<style>p {text-align: justify;padding-left: 2em;} </style>

webpack5性能优化

webpack构建速度 一、优化babel-loader 注意&#xff1a;开启缓存,配置后打包是就能缓存babel webpack.common.js文件命中缓存cacheDirectory {test: /\.js$/,use: [babel-loader?cacheDirectory],include: srcPath,exclude: /node_modules/ }, 测试&#xff1a; 打包后的…

ChatGLM-6B+LangChain实战

目标&#xff1a;原始使用ChatGLM-6B可接受的文字长度有限&#xff0c;打算结合LangChain实现长文本生成摘要. 方法&#xff1a; step1&#xff1a;自定义一个GLM继承LangChain中的langchain.llms.base.LLM&#xff0c;load自己的模型. step2&#xff1a;使用LangChain的mapred…

Web前端 Day 5

js初体验 使得代码可以具有某些行为 <body><button>点击我变成粉色</button><script>const btn document.querySelector(button)btn.addEventListener(click, () > {btn.style.backgroundColor pink ​})</script> </body> 效果图…

Maven详见及在Idea中的使用方法[保姆级包学包会]

文章目录 Maven详解1.1 目标1.2 Maven概括1.3 多模块开发1.3.1 pom.xml1.3.2 生命周期1.3.3 依赖特性(多模块1)1.3.4 继承特性(多模块2)1.3.5 dependencyManagement标签1.3.6 Maven-聚合(多模块3)聚合 1.3.6.1聚合总结 Maven详解 1.1 目标 maven是什么?maven能干什么?maven…

java并发编程 10:AQS

目录 什么是AQS原理 什么是AQS juc包的结构如下图&#xff1a; AQS就是AbstractQueuedSynchronizer&#xff0c;是个抽象类&#xff0c;实现了自己的一些方法。它是阻塞式锁和相关的同步器工具的框架。很多并发类都是基于它实现的&#xff0c;如&#xff1a;ReentrantLock、Co…