谷粒商城中消息队列的使用

news2024/12/25 9:19:24

目录

一、概述

二、步骤

三、说明

四、详细步骤

五、总结


一、概述

在订单服务中使用到了消息队列  具体就是解决关单还有自动解锁库存的功能

其实就是使用消息队列的延迟队列的功能 达到一个定时任务的作用

使用消息队列到达最终一致性的效果

比如说库存 当下单之后 执行锁库存的远程方法

如果说下订单的那个方法,在锁库存之后出现了异常,那么这个下订单的方法会回滚,但是这个锁库存的远程方法,已经锁定,就不能够再回滚了。这样就会造成订单没有下成,但是库存却锁定了

二、步骤

一旦开始下单,锁库存之后,就往这个库存的延迟队列里面发送消息,在库存服务中,消费这个消息,来选择是否需要解锁库存

三、说明

其实这一大堆的代码,可以使用分布式事务的一个注解就能解决,因为这就是一个服务与服务之间的那种独立性,造成只能回滚订单,不能回滚库存的这样一种局面,而使用这种分布式事务,就能将多个服务视为一个事务中,也就能够一处出问题,处处回滚。

但是分布式事务相当于整个完全就是一个大事务,相当于串行化了,效率就低了。但是它的好处能够保证即时一致性

而现在这种下单的业务,我们对高并发,效率这方面有要求,而其实并不需要强一致性,虽然当初数据不合理,就让它不合理,只要最终弥补回来就行了

其实就是自动解锁库存,利用了消息队列的这种消息发布与订阅达到最终一致性的效果

四、详细步骤

整个流程图:

1. 搭建消息队列环境

1.1 将消息队列的依赖导入:

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.2 启动类上面启动消息队列服务

加上@EnableRabbit注解

1.3 配置消息转换器使得发消息还有收消息能以实体类的方式接收

@Configuration
public class MyRabbitConfig {
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

创建队列、交换机、绑定关系

创建队列:延迟队列、死信队列

注意:这个延迟队列的超时时间不能使用字符串,得使用整形数字

订单相关:

@Configuration
public class RabbitMQBeanConfig {

    @Bean
    public Queue orderDelayQueue(){
        //String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments
        HashMap<String, Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange","order-event-exchange");
        map.put("x-dead-letter-routing-key","order.release.order");
        map.put("x-message-ttl",60000);
        return new Queue("order-delay-queue",true,false,false,map);
    }

    @Bean
    public Queue orderReleaseStockQueue(){
        return new Queue("order.release.order.queue",true,false,false,null);
    }

    @Bean
    public Exchange orderEventExchange(){
        return new TopicExchange("order-event-exchange",true,false);
    }

    @Bean
    public Binding createOrder(){
        //String destination, DestinationType destinationType,
        // String exchange, String routingKey, @Nullable Map<String, Object> arguments
        return new Binding("order-delay-queue", Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);
    }

    @Bean
    public Binding releaseOrder(){
        //String destination, DestinationType destinationType,
        // String exchange, String routingKey, @Nullable Map<String, Object> arguments
        return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);
    }


    @Bean
    public Binding OrderReleaseOther(){
        return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE,"order-event-exchange","order.release.other.#",null);
    }

}

库存相关:

@Configuration
public class RabbitMQBeanConfig {

    @Bean
    public Queue stockDelayQueue(){
        //String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments
        HashMap<String, Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange","stock-event-exchange");
        map.put("x-dead-letter-routing-key","stock.release.stock");
        map.put("x-message-ttl",120000);
        return new Queue("stock-delay-queue",true,false,false,map);
    }

    @Bean
    public Queue stockReleaseStockQueue(){
        return new Queue("stock.release.stock.queue",true,false,false,null);
    }

    @Bean
    public Exchange stockEventExchange(){
        return new TopicExchange("stock-event-exchange",true,false);
    }

    @Bean
    public Binding stockLocked(){
        //String destination, DestinationType destinationType,
        // String exchange, String routingKey, @Nullable Map<String, Object> arguments
        return new Binding("stock-delay-queue", Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);
    }

    @Bean
    public Binding stockRelease(){
        //String destination, DestinationType destinationType,
        // String exchange, String routingKey, @Nullable Map<String, Object> arguments
        return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);
    }

}

2. 发消息

自动解锁库存:

完整锁库存:

@Override
    @Transactional
    public Boolean lockWareStock(WareStockIdsDto wareStockIdsDto) {

        //创建订单工作单
        WareOrderTask wareOrderTask = new WareOrderTask();
        wareOrderTask.setOrderSn(wareStockIdsDto.getOrderSn());
        wareOrderTaskService.save(wareOrderTask);

        List<OrderItem> orderItems = wareStockIdsDto.getOrderItems();
        orderItems.stream().forEach(item->{

            WareStockIds wareStockIds = new WareStockIds();
            wareStockIds.setSkuId(item.getSkuId());
            //查询商品的所有库存信息
            List<Long> wareIds = this.getBaseMapper().wareStockIds(item.getSkuId());
            wareStockIds.setWareIds(wareIds);
            //锁库存
            if(wareIds!=null && wareIds.size()>0){
                boolean flag = false;
                for (Long wareId : wareIds) {
                    int row = this.getBaseMapper().lockWareStock(item.getSkuId(), wareId, item.getSkuQuantity());
                    if(row==1){
                        flag = true;
                        WareOrderTaskDetail wareOrderTaskDetail = new WareOrderTaskDetail();
                        wareOrderTaskDetail.setTaskId(wareOrderTask.getId());
                        wareOrderTaskDetail.setSkuId(item.getSkuId());
                        wareOrderTaskDetail.setWareId(wareId);
                        wareOrderTaskDetail.setSkuNum(item.getSkuQuantity());
                        wareOrderTaskDetail.setLockStatus(StockStatus.LOCK);
                        wareOrderTaskDetailService.save(wareOrderTaskDetail);

                        WareStockTaskTo wareStockTaskTo = new WareStockTaskTo();
                        wareStockTaskTo.setTaskId(wareOrderTask.getId());
                        wareStockTaskTo.setTaskDetailId(wareOrderTaskDetail.getId());
                        rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",wareStockTaskTo);
                        break;
                    }
                }
                if(!flag){
                    throw new WareStockEmptyException(item.getSkuId()+"的商品库存不足");
                }
            }else{
                //库存不足,则直接抛出异常,整个锁定库存失败
                throw new WareStockEmptyException(item.getSkuId()+"的商品库存不足");
            }
        });
        return true;
    }

发消息:发送到库存延迟队列

一旦锁库存成功,就需要发送消息,检测这个库存锁定是否合理;

rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",wareStockTaskTo);

发送之前需要保存工作单、工作单详情也就是相当于记录日志的方式,方便之后消息消费的时候来解锁库存:

3. 消费消息

注意:使用消息消费监听器进行消费,不要忘记一旦消费失败就要重新将消息放入到消息队列中

当消费成功之后,要ack消息 手动提交 只有这样才能防止消息丢失,因为如果是默认自动提交,那么如果消息消费的时候出现了异常,消息并没有合理消费,此时是需要重试的,但是已经自动ack了,就没有消息了。

@RabbitListener(queues = "stock.release.stock.queue")
@Component
@Slf4j
public class StockReleaseStockQueueListen {


    @Autowired
    private IWareSkuService wareSkuService;

    @RabbitHandler
    public void handle(WareStockTaskTo wareStockTaskTo, Message message, Channel channel) throws IOException {
        try{
            log.info("handle开始解锁库存...{}",wareStockTaskTo);
            wareSkuService.unLock(wareStockTaskTo);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception e){
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        }
    }
    
}
@Override
    public void unLock(WareStockTaskTo wareStockTaskTo) {
        Long taskDetailId = wareStockTaskTo.getTaskDetailId();
        WareOrderTaskDetail orderTaskDetail = wareOrderTaskDetailService.getById(taskDetailId);
        //如果工作单详情为空,则代表库存服务本身自己就回滚了,无需解锁
        if(orderTaskDetail!=null){
            Long taskId = wareStockTaskTo.getTaskId();
            WareOrderTask wareOrderTask = wareOrderTaskService.getById(taskId);
            //通过工作单进而得到订单
            String orderSn = wareOrderTask.getOrderSn();
            OrderTo order = orderFeignService.getOrder(orderSn);
            //当订单不存在或者是订单已被取消则需要解锁库存
            if(order==null || OrderStatusEnum.CANCLED.equals(order.getStatus().intValue())){
                //只有是上锁状态才能解库存
                //这样其实就保证了幂等性
                if(StockStatus.LOCK.equals(orderTaskDetail.getLockStatus())){
                    unlock(orderTaskDetail);
                }
            }
        }
    }
private void unlock(WareOrderTaskDetail orderTaskDetail) {
        getBaseMapper().unLock(orderTaskDetail.getSkuId(), orderTaskDetail.getWareId(), orderTaskDetail.getSkuNum());
        WareOrderTaskDetail wareOrderTaskDetail = new WareOrderTaskDetail();
        wareOrderTaskDetail.setId(orderTaskDetail.getId());
        //将工作单详情状态修改为已解锁状态
        wareOrderTaskDetail.setLockStatus(StockStatus.UN_LOCK);
        wareOrderTaskDetailService.updateById(wareOrderTaskDetail);
    }

自动收单:

提交订单:

@Transactional
    //@GlobalTransactional
    @Override
    public OrderRespTo submitOrder(SubmitOrderDto submitOrderDto) {
        OrderRespTo orderRespTo = createOrder(submitOrderDto);
        if(orderRespTo.getCode().equals(0)){
            //保存订单
            saveOrder(orderRespTo);
            //发送消息 释放订单
            Order order = orderRespTo.getOrder();
            rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order);
            //锁定库存
            WareStockIdsVo wareStockIdsVo = new WareStockIdsVo();
            wareStockIdsVo.setOrderSn(orderRespTo.getOrder().getOrderSn());
            wareStockIdsVo.setOrderItems(orderRespTo.getOrderItems());
            //调用远程库存服务锁库存
            AjaxResult result = wareFeignService.lockWareStok(wareStockIdsVo);
            //int i = 10/0;
            if(result.get("code").equals(200)){
                return orderRespTo;
            }else{
                orderRespTo.setCode(3);
            }
        }
        return orderRespTo;
    }

发消息:只要保存了订单就需要往订单的延迟队列中发消息

rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order);

消费消息:

消息消费监听器

@Component
@RabbitListener(queues = "order.release.order.queue")
public class OrderReleaseOrderQueueListener {


    @Autowired
    private IOrderService orderService;

    @RabbitHandler
    public void handleReleaseOrder(Order order, Message message, Channel channel) throws IOException {
        try{
            orderService.releaseOrder(order);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception e){
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        }


    }
}
@Override
    public void releaseOrder(Order order) {
        Order currentOrder = getById(order.getId());
        //如果此时订单还是一个新建状态 未支付状态 就需要释放订单
        if (OrderStatusEnum.CREATE_NEW.getCode().equals(currentOrder.getStatus().intValue())) {
            Order order1 = new Order();
            order1.setId(currentOrder.getId());
            order1.setStatus(new Long(OrderStatusEnum.CANCLED.getCode().intValue()));
            updateById(order1);
            OrderTo orderTo = BeanCopyUtils.copyBean(order, OrderTo.class);
            //向库存服务发送消息
            rabbitTemplate.convertAndSend("stock-event-exchange","stock.release.other",orderTo);
        }
    }

看到代码中,存在这一步:

//向库存服务发送消息
rabbitTemplate.convertAndSend("stock-event-exchange","stock.release.other",orderTo);

为什么还要向库存服务发送消息,来解锁库存呢?

是为了防止如果在关单的时候阻塞了,订单的状态还没来得及修改,而此时需要消费库存消息了,此时判断订单状态并不是已取消状态,则无法解锁库存,造成库存永远无法解锁

正常情况下消费库存在关单之后就不会出现这种问题,但是发生了阻塞就没办法了。

所以就得在关单之后同时还要发消息,通知库存服务来判断订单的真实状态,如果是取消状态,则需要解锁库存

而且现在不再向延迟队列里面发送消息,而是直接向死信队列中发送消息,来消息这个消息,因此死信队列现在又多了一个消费者,两个消费者同时订阅这个消息,通过消息实体类的不同区分不同的消费逻辑。

@RabbitHandler
    public void handle1(OrderTo orderTo,Message message, Channel channel) throws IOException {
        try{
            log.info("handle1开始解锁库存...{}",orderTo);
            wareSkuService.unLock(orderTo);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception e){
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
        }
    }

 

@Override
    public void unLock(com.sq.gulimall.to.mq.OrderTo orderTo) {
        String orderSn = orderTo.getOrderSn();
        Long status = orderTo.getStatus();
        //当订单是取消状态时解锁库存
        if(OrderStatusEnum.CANCLED.getCode().equals(status.intValue())){
            LambdaQueryWrapper<WareOrderTask> wrapper = new LambdaQueryWrapper<WareOrderTask>().eq(WareOrderTask::getOrderSn, orderSn);
            WareOrderTask orderTask = wareOrderTaskService.getOne(wrapper);
            LambdaQueryWrapper<WareOrderTaskDetail> wrapper1 = new LambdaQueryWrapper<WareOrderTaskDetail>().eq(WareOrderTaskDetail::getTaskId, orderTask.getId());
            List<WareOrderTaskDetail> list = wareOrderTaskDetailService.list(wrapper1);
            for (WareOrderTaskDetail wareOrderTaskDetail : list) {
                //同样也是只有当库存是已锁定状态时才需要去解锁
                if(StockStatus.LOCK.equals(wareOrderTaskDetail.getLockStatus())){
                    unlock(wareOrderTaskDetail);
                }
            }
        }
    }
private void unlock(WareOrderTaskDetail orderTaskDetail) {
        getBaseMapper().unLock(orderTaskDetail.getSkuId(), orderTaskDetail.getWareId(), orderTaskDetail.getSkuNum());
        WareOrderTaskDetail wareOrderTaskDetail = new WareOrderTaskDetail();
        wareOrderTaskDetail.setId(orderTaskDetail.getId());
        //将工作单详情状态修改为已解锁状态
        wareOrderTaskDetail.setLockStatus(StockStatus.UN_LOCK);
        wareOrderTaskDetailService.updateById(wareOrderTaskDetail);
    }

五、总结

使用消息队列解决多服务下面的数据不一致的问题,通过最终一致性解决

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1083301.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

超强大的 Nginx 可视化管理平台 Nginx-Proxy-Manager

一、简介 Nginx-Proxy-Manager 是一个基于 Web 的 Nginx 服务器管理工具&#xff0c;它允许用户通过浏览器界面轻松地管理和监控 Nginx 服务器。通过 Nginx-Proxy-Manager&#xff0c;可以获得受信任的 SSL 证书&#xff0c;并通过单独的配置、自定义和入侵保护来管理多个代理…

通过线程池方式改造Stream.parallel()并行流

目录 一、IntStream.rangeClosed并行流二、线程池方式改造1、创建线程池2、线程类3、信心满满&#xff0c;走起来 三、再次解决并发时i原子性问题四、并行流与多线程1、并行和并发的区别&#xff1f;2、并行和并发的使用场景 大家好&#xff0c;我是哪吒。 上一篇简单聊一聊公…

从解决问题到人生规划

从解决问题到人生规划&#xff0c;如何通过深度思考&#xff0c;让自己成为这个世界上最顶级的人才&#xff1f; 我们对于问题的理解一般有6个层次&#xff0c;每个层次的深度不同&#xff0c;决定了我们思考的深度和看问题的眼界。 首先&#xff0c;来想象这样一个场景&#x…

graphviz 绘制二叉树

代码 digraph BalancedBinaryTree {node [fontname"Arial", shapecircle, stylefilled, color"#ffffff", fillcolor"#0077be", fontsize12, width0.7, height0.7];edge [fontname"Arial", fontsize10, color"#333333", arr…

上海亚商投顾:沪指冲高回落 医药、芯片股全天领涨

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 沪指昨日小幅反弹&#xff0c;创业板指盘中涨超1.6%&#xff0c;午后涨幅有所收窄。医药医疗股全线走强&#…

俩个el-select的联动选择

需求&#xff1a; 1.有俩个select下拉框&#xff0c;之后左边选中后右边根据左边的选择自动选择内容 2.右边自动选择之后可以取消。 3.右侧的下拉框只能选中左侧下拉框的内容&#xff0c;左边没选中的右边也不能被选中 4.左侧下拉添加全选功能 5.左侧选择右侧没选择就把右侧数据…

如何避免 IDEA 每次重启都index

如何避免 IDEA 每次重启都index 在 IntelliJ IDEA 中&#xff0c;可以通过以下几个步骤来避免每次重启时索引&#xff1a; 打开 File -> Settings 菜单。在左侧的菜单栏中选择 “Appearance & Behavior” -> “System Settings” -> “Synchronization”。 在右…

与艺术同频!卡萨帝在海外崭露头角

在品牌全球化步伐日益加快的当下&#xff0c;高端品牌如何真正实现业务全球化、品牌全球化乃至用户圈层全球化&#xff1f; 作为国际高端家电引领者&#xff0c;卡萨帝今年以来在全球范围内展开了一系列的品牌布局活动。1月&#xff0c;卡萨帝于巴基斯坦召开品牌发布会&#x…

生产ERP管理系统源码 ERP系统源码

生产ERP管理系统 1、产品管理系统 产品资料系统包括两方面的内容&#xff1a;物料主文件和产品结构&#xff0c;ERP系统企业管理软件平台最基本的信息&#xff0c;绝大多数物流、制造、甚至财务类系统均要使用到产品资料的信息。 &#xff08;1&#xff09;、全方位描述物料…

网工实验笔记:匹配工具ACL的使用

一、概述 访问控制列表简称为ACL&#xff0c;它使用包过滤技术&#xff0c;在路由器上读取第3层及第4层包头中的信息&#xff0c;如源地址、目的地址、源端口和目的端口等&#xff0c;根据预告定义好的规则对包进行过滤从而达到访问控制的目的。ACL分很多种&#xff0c;不同场…

用ChatGPT+Midjourney 5分钟生成30条爆款小红书图文(内有详细教程)

本期是赤辰第35期AI项目教程&#xff0c;文章底部准备了粉丝福利&#xff0c;看完后可免费领取&#xff01;今天给大家讲一下如何5分钟生成30条爆款小红书图文先说个账号&#xff0c;这个应该有同学也看过&#xff0c;前几个月在小红书有个涨粉很快的AI绘画项目&#xff0c;就是…

python自动化操作邮箱

POP3、IMAP、SMTP&#xff0c;CardDAV、CalDAV协议特点 POP3 POP3是Post Office Protocol 3的简称&#xff0c;即邮局协议的第3个版本,它规定怎样将个人计算机连接到Internet的邮件服务器和下载电子邮件的电子协议。它是因特网电子邮件的第一个离线协议标准,POP3允许用户从服…

微信小程序支持h5实现webrtc h264 h265低延迟传输渲染

微信小程序自成体系&#xff0c;自身也带了很强的rtc音视频能力&#xff0c;但是他捆绑了他自己的服务&#xff0c;开发也相对受限于他的api。基于以前的了解可以采webview的方式内嵌h5网址来实现自定义的webrtc.但实践起来并不轻松&#xff0c;主要是小程序的严格限制&#xf…

docker-compose Install hfish

前言hfish HFish是一款社区型免费蜜罐,侧重企业安全场景,从内网失陷检测、外网威胁感知、威胁情报生产三个场景出发,为用户提供可独立操作且实用的功能,通过安全、敏捷、可靠的中低交互蜜罐增加用户在失陷感知和威胁情报领域的能力。 HFish具有超过40种蜜罐环境、提供免费…

VS编译的时候不生成Release文件夹

方法描述&#xff1a; Build>Configuration Manager>Release 编译》配置管理》选择发布版本 再编译就有了 具体操作过程 第一步&#xff1a; 第二步&#xff1a; 第三步&#xff1a; 特此记录 anlog 2023年10月12日

在线答题+考试出题小程序源码系统,轻松无忧,功能强大

今天给大家分享一款在线答题小程序源码系统&#xff0c;功能强大&#xff0c;搭建起来也比较简单&#xff0c;同时还具有在线考试&#xff0c;轻松出题的功能&#xff0c;适用于各个行业。 功能展示具体罗列部分如下&#xff1a; 基础设置&#xff08;全局设置&#xff0c;…

信创办公–基于WPS的PPT最佳实践系列 (绘制自选图形)

信创办公–基于WPS的PPT最佳实践系列 &#xff08;绘制自选图形&#xff09; 目录 应用背景操作步骤1、记忆复制&#xff1a;CTRLD2、微移&#xff1a;CTRL四个方向键 应用背景 如果想将文字转为简单而形象的smartart图形&#xff0c;但是又找不到自己想要的图形&#xff0c;我…

14.2 Socket 反向远程命令行

在本节&#xff0c;我们将继续深入探讨套接字通信技术&#xff0c;并介绍一种常见的用法&#xff0c;实现反向远程命令执行功能。对于安全从业者而言&#xff0c;经常需要在远程主机上执行命令并获取执行结果。本节将介绍如何利用 _popen() 函数来启动命令行进程&#xff0c;并…

实现paho.mqtt.cpp库编译

编译环境 WIN10 VS2019 CMake 3.27.4 因为paho.mqtt.cpp依赖paho.mqtt.c 安装paho.mqtt.c 先下载paho.mqtt.c安装包&#xff0c;win下有安装包 地址: https://github.com/eclipse/paho.mqtt.c/releases/tag/v1.3.12 编译paho.mqtt.cpp 然后下载 paho.mqtt.cpp 源码Release…

【Python 零基础入门】 函数

【Python 零基础入门】第五课 函数 【Python 零基础入门】第五课 函数函数在生活中的类比函数为什么要使用函数函数的格式无参函数含参函数 参数形参实参 变量作用域局部变量全局变量 递归函数基本的递归斐波那契数列 Lambda 表达式高阶函数map 函数filter 函数reduce 函数结合…