秒杀业务优化之从分布式锁到基于消息队列的异步秒杀

news2025/3/25 22:08:32

一、业务场景介绍

        优惠券、门票等限时抢购常常出现在各类应用中,这样的业务一般为了引流宣传而降低利润,所以一旦出现问题将造成较大损失,那么在业务中就要求我们对这类型商品严格限时、限量、每位用户限一次、准确无误的创建订单,这样的要求看似简单,但在分布式系统中,要求我们充分考虑高并发下的线程安全问题,今天我们来看一下两种解决思路。

二、基于Redisson分布式锁的秒杀方案

        这里我们就不进行自定义redis锁了,Redisson 基于 Redis 实现了 Java 驻内存数据网格(In-Memory Data Grid),它不仅提供了对 Redis 原生命令的封装,还提供了一系列高级的分布式数据结构和服务,促进使用者对 Redis 的关注分离,让开发者能够更专注于业务逻辑,所以我们直接使用Redisson,但底层源码还是需要我们去自己学习掌握的。

1.流程概览

        其实单看流程图我们就能发现这一连串的串行逻辑就会非常影响效率,我们先留着这个问题后面优化 。

2.具体实现

    @Override
    public Result generate(Long voucherId) {
        //查询优惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        //活动是否开始/结束
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            return Result.fail("活动未开始!");
        }
        if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
            return Result.fail("活动已结束!");
        }
        //库存表是否充足
        if (voucher.getStock()<1) {
            return Result.fail("库存不足!");
        }
        Long userId = UserHolder.getUser().getId();//只锁同一个id

        //创建锁对象
        RLock lock = redissonClient.getLock("lock:order:" + userId);
        //获取锁,防止同一用户的并发请求
        boolean isLock = lock.tryLock();//默认不等待,30秒过期
        if (!isLock) {
            //获取锁失败
            return Result.fail("网络繁忙!");
        }
        //拿到spring事务代理,这里为了简单解决事务自调用直接去拿代理可能造成问题,建议将事务方法重构至另一服务类并注入
        try {
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
            return proxy.createVoucherOrder(voucherId);
        } finally {
            //释放锁
            lock.unlock();
        }
    }


    @Transactional//要锁住事物,防止事物在锁释放后才提交导致其他线程进入
    public Result createVoucherOrder(Long voucherId) {
        Long userId = UserHolder.getUser().getId();
        //一人一单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        if (count >0) {
            return Result.fail("您最多只可购买一单!");
        }
        //扣减库存
        boolean flag = seckillVoucherService.update()
                .setSql("stock=stock-1")
                .eq("voucher_id", voucherId).gt("stock",0)
                .update();
        if (!flag){
            //高并发下已经被其他用户线程扣减
            return Result.fail("库存不足2!");
        }
        //创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        //唯一ID
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        //用户id
        voucherOrder.setUserId(userId);
        //代金券Id
        voucherOrder.setVoucherId(voucherId);
        save(voucherOrder);
        //返回订单ID
        return Result.ok(orderId);

    }

3.测试分析

 接下来我们登录数据库中所有的用户并记录Authorization

@SpringBootTest
@Component
public class SecKill {
    @Autowired
    private IUserService userService;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Test
    void userLogin() throws IOException {
        // 定义保存 token 的文件路径
        String filePath = "D:\\tokens.txt";

        // 使用 BufferedWriter 写入文件
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(filePath, true))) { // 追加模式
            for (User user : userService.list()) {
                String phone = user.getPhone();
                HttpSession session = null;
                userService.sendCode(phone, session);
                String code = stringRedisTemplate.opsForValue().get("login:code:" + phone);
                LoginFormDTO loginFormDTO = new LoginFormDTO();
                loginFormDTO.setCode(code);
                loginFormDTO.setPhone(phone);
                String token = userService.logIn(loginFormDTO, session);

                // 将 token 写入文件
                writer.write(token);
                writer.newLine(); // 换行
                writer.flush(); // 刷新缓冲区,确保数据写入文件
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

然后我们设置优惠券数量为200,通过jmeter(一款测试工具,大家自行学习如何使用)模拟数据库中1000多个用户总计每秒1000的高并发请求

 从聚合报告中可以看到虽然80%的异常率确实满足了我们对优惠券的限量要求,通过查看数据库订单和库存也不存在问题,但是我们可以看到我们的平均响应时间在高并发下达到了344ms,吞吐量只有1200左右,如果面临更高的并发难免因性能局限出现问题。

三、基于消息队列的异步秒杀

1.问题分析

正如我们一开始发现的,每个请求来到服务器都需要执行一串的数据库读写操作,而写操作耗时是比较久的,可是当我们确定用户抢单成功后只要能确保订单最终写入即可,无需让其阻塞请求,所以我们其实可以将读写操作分离开。

我们可以利用读操作完成下单资格的各种校验,校验成功即可对请求做出响应,那么后续写订单操作怎么完成呢?我们需要根据校验成功的记录完成写操作,那谁来完成校验成功的记录呢,这样记录是不是又和原来的读写串行一样了呢?

2.工具对比

首先我们的目的是加快请求响应效率,减轻数据库压力,其实我们需要的就是一个中间工具做到能够快速存储校验成功的记录并有限制的可控的逐渐将存储起来的记录转发给数据库让其创建订单,能做到上述要求的工具有很多,这里简单对比以下三种供大家参考。

特性/技术阻塞队列RedisMQ消息中间件(如RabbitMQ、Kafka)
系统解耦低,主要用于单机环境中,支持集群部署高,天然用于系统解耦
异步通信支持,但需要手动实现通过发布/订阅模式实现专为异步通信设计
削峰填谷临时存储请求,能力有限缓存请求,需合理设计策略缓存大量请求,后端按速率消费
可靠性和持久性依赖具体实现,需额外持久化支持持久化,可靠性较高高可靠性和持久性,支持消息确认
性能和吞吐量受限于单机处理能力性能较高,支持集群最高,适用于大规模分布式系统
功能丰富性单一,主要用于线程间通信支持多种数据结构和操作支持多种消息协议、路由机制等
开发和维护成本低,但需手动实现异步逻辑中等,易于实现和使用高,需学习和理解相关协议和机制
适用场景小规模、单机环境中小规模、集群部署大规模分布式系统、复杂路由

 3.流程概览

        由于阻塞队列局限较大,MQ中间件比较简单,这里我们以Redis中的stream为例(除此之外,list和PubSub也能实现,但是局限较大)实现异步秒杀。

对于红框部分,为了确保原子性,我们借助lua脚本完成,这样一来我们就将MySQL的读写操作分离开来,请求响应中只需要读取验证,用redis更高效的io操作完成简单记录,随后异步逐渐处理MySQl的订单写入。

4.具体实现

  • lua脚本
    ---
    --- Generated by EmmyLua(https://github.com/EmmyLua)
    --- Created by cds.
    --- DateTime: 2025/3/23 13:03
    ---
    --1.参数列表
    --1.1优惠券id
    local voucherId=ARGV[1]
    --1.2用户id
    local userId=ARGV[2]
    --1.3订单id
    local orderId=ARGV[3]
    
    --2.数据key
    --2.1库存key
    local stockKey='seckill:stock:' .. voucherId
    --2.2订单key
    local orderKey='seckill:order:' .. voucherId
    
    --脚本业务
    --判断库存是否充足
    if (tonumber(redis.call('get',stockKey))<=0) then
        --库存不足返回1
        return 1
    end
    --判断用户是否下单
    if (redis.call('sismember',orderKey,userId)==1) then
        --下过单返回2
        return 2
    end
    --扣库存
    redis.call('incrby',stockKey,-1)
    --下单
    redis.call('sadd',orderKey,userId)
    --发送消息到消息队列  xadd stream.orders * k1 v1 k2 v2 ..
    redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,"id",orderId)
    
    return 0
  •  具体业务
        @Autowired
        private IVoucherOrderService proxy;
        //初始化lua脚本信息
        private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
        static {
            SECKILL_SCRIPT =new DefaultRedisScript<>();
            SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
            SECKILL_SCRIPT.setResultType(Long.class);
        }
        //异步单例线程
        private static final ExecutorService SECKILL_ORDER_EXECTUOR= Executors.newSingleThreadExecutor();
        //在spring的Bean初始化并注入后开始
        @PostConstruct
        private void init(){
            SECKILL_ORDER_EXECTUOR.submit(new VoucherOrderHandler());
        }
        //线程任务
        private class VoucherOrderHandler implements Runnable {
            String queueName = "stream.orders";
            @Override
            public void run() {
                while (true) {
                    try {
                        //获取消息队列中的订单信息  XREAD GROUP group1 c1 count 1 block 2000 streams stream.orders >
                        List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                                Consumer.from("group1", "c1"),
                                StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                                StreamOffset.create(queueName, ReadOffset.lastConsumed())
                        );
                        //判断消息是否获取成
                        if (list == null || list.isEmpty()) {
                            //获取失败 没有消息,继续循环
                            continue;
                        }
                        //获取成功,可以下单
                        //解析消息中的订单信息
                        MapRecord<String, Object, Object> record = list.get(0);
                        Map<Object, Object> value = record.getValue();
                        VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                        handleVoucherOrder(voucherOrder);
                        //ACK确认 SACK stream.orders group1 id
                        stringRedisTemplate.opsForStream().acknowledge(queueName, "group1", record.getId());
                    } catch (Exception e) {
                        log.error("创建订单异常{}", e.getMessage());
                        //有异常去pendingList拿
                        handlePendingList();
                    }
                }
            }
    
            private void handlePendingList() {
                while (true) {
                    try {
                        //获取pending-list队列中的订单信息  XREAD GROUP group1 c1 count 1 block 2000 streams stream.orders 0
                        List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                                Consumer.from("group1", "c1"),
                                StreamReadOptions.empty().count(1),
                                StreamOffset.create(queueName, ReadOffset.from("0"))
                        );
                        //判断消息是否获取成
                        if (list == null || list.isEmpty()) {
                            //获取失败 pending-list没有消息,结束循环
                            break;
                        }
                        //获取成功,可以下单
                        //解析消息中的订单信息
                        MapRecord<String, Object, Object> record = list.get(0);
                        Map<Object, Object> value = record.getValue();
                        VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                        handleVoucherOrder(voucherOrder);
                        //ACK确认 SACK stream.orders group1 id
                        stringRedisTemplate.opsForStream().acknowledge(queueName, "group1", record.getId());
                    } catch (Exception e) {
                        log.error("创建订单异常{}", e.getMessage());
                        try {
                            Thread.sleep(20);
                        } catch (InterruptedException ex) {
                            throw new RuntimeException(ex);
                        }
                    }
                }
            }
        }
    
    
                private void handleVoucherOrder(VoucherOrder voucherOrder) {
                Long userId = voucherOrder.getUserId();
                //创建锁对象
                RLock lock = redissonClient.getLock("lock:order:" + userId);
                //获取锁
                boolean isLock = lock.tryLock();//默认不等待,30秒过期
                if (!isLock) {
                    //获取锁失败
                    log.info("请勿重复购买!");
                    return;
                }
                //拿到spring事务代理
                try {
                    proxy.createVoucherOrder(voucherOrder);
                } finally {
                    //释放锁
                    lock.unlock();
                }
            }
    
            //这部分的检验是以防stream消息队列里出现问题导致重复save操作
            @Transactional//要锁住事物
            public void createVoucherOrder(VoucherOrder voucherOrder) {
                Long userId = voucherOrder.getUserId();
                //一人一单
                int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
                if (count > 0) {
                    log.error("您最多只可购买一单!");
                    return;
                }
                //扣减库存
                boolean flag = seckillVoucherService.update()
                        .setSql("stock=stock-1")
                        .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0)
                        .update();
                if (!flag) {
                    log.info("库存不足!");
                    return;
                }
                //创建订单
                save(voucherOrder);
            }
        }
    
    
    
    
    
            @Override
            public Result secKill(Long voucherId) {
                //查询优惠券
                SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
                //活动是否开始/结束
                if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
                    return Result.fail("活动未开始!");
                }
                if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
                    return Result.fail("活动已结束!");
                }
                //库存表是否充足
                if (voucher.getStock() < 1) {
                    return Result.fail("库存不足!");
                }
                //获取用户
                Long userId = UserHolder.getUser().getId();
                //1执行lua脚本
                //唯一ID
                long orderId = redisIdWorker.nextId("order");
                Long result = stringRedisTemplate.execute(
                        SECKILL_SCRIPT,
                        Collections.emptyList(),
                        voucherId.toString(), userId.toString(), String.valueOf(orderId));
                int r = result.intValue();
                //2判断lua脚本返回值0
                if (r != 0) {
                    //2.1不为零无资格
                    return Result.fail(r == 1 ? "库存不足!" : "不能重复下单!");
                }
                return Result.ok(orderId);
            }
    
    

5.测试分析 

我们再次使用jmeter进行同样的测试,但这次我们需要提前将库存信息同步到redis 

可以看到经过优化的秒杀业务吞吐量大大增加,平均响应时间降低到30ms左右,得到了十倍左右的提升,大大增加了响应处理效率

 redis订单记录

redis消息队列记录

如果去控制台观察日志可以发现,删改请求少量穿插在中间,大部分聚集在查询校验结束的末尾,读操作基本都聚集在最前面,DB操作得到有效控制,这就是异步写入处理的体现

好了,本次分享到这里结束,谢谢阅读!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2321570.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

纯vue手写流程组件

前言 网上有很多的vue的流程组件&#xff0c;但是本人不喜欢很多冗余的代码&#xff0c;喜欢动手敲代码&#xff1b;刚开始写的时候&#xff0c;确实没法下笔&#xff0c;最后一层一层剥离&#xff0c;总算实现了&#xff1b;大家可以参考我写的代码&#xff0c;可以拿过去定制…

WPS宏开发手册——使用、工程、模块介绍

目录 系列文章前言1、开始1.1、宏编辑器使用步骤1.2、工程1.3、工程 系列文章 使用、工程、模块介绍 JSA语法 第三篇练习练习题&#xff0c;持续更新中… 前言 如果你是开发人员&#xff0c;那么wps宏开发对你来说手拿把切。反之还挺吃力&#xff0c;需要嘻嘻&#xf…

django入门教程之request和reponse【二】

接上节&#xff1a;入门【一】 再创建一个orders子应用&#xff0c;python manager.py startapp orders&#xff0c;orders目录中新建一个urls.py文件。结构如图&#xff1a; 通过上节课&#xff0c;我们知道在views.py文件中编写函数时&#xff0c;有一个默认入参request&…

RAG优化:python从零实现[吃一堑长一智]循环反馈Feedback

本文将介绍一种有反馈循环机制的RAG系统,让当AI学会"吃一堑长一智",给传统RAG装了个"后悔"系统,让AI能记住哪些回答被用户点赞/拍砖,从此告别金鱼记忆: 每次回答都像在玩roguelike:失败结局会强化下次冒险悄悄把优质问答变成新知识卡牌,实现"以…

【Linux】VMware17 安装 Ubuntu24.04 虚拟机

目录 安装教程 一、下载 Ubuntu 桌面版iso映像 二、安装 VMware 三、安装 Ubuntu 桌面版 VMware 创建虚拟机 挂载 Ubuntu ISO 安装 Ubuntu 系统 安装教程 一、下载 Ubuntu 桌面版iso映像 链接来自 清华大学开源软件镜像站 ISO文件地址&#xff1a;ubuntu-24.04.2-des…

WPS宏开发手册——JSA语法

目录 系列文章2、JSA语法2.1、打印输出2.2、注释2.3、变量2.4、数据类型2.5、函数2.6、运算符2.7、比较2.8、if else条件语句2.9、for循环2.10、Math对象&#xff08;数字常用方法&#xff09;2.11、字符串常用方法2.12、数组常用方法 系列文章 使用、工程、模块介绍 JSA语…

word中指定页面开始添加页码

第一步&#xff1a; 插入页码 第二步&#xff1a; 把光标放到指定起始页码处 第三步&#xff1a; 取消链接到前一节 此时关掉页脚先添加分节符 添加完分节符后恢复点击 第四步&#xff1a; 设置页码格式&#xff0c;从1开始 第五步&#xff1a; 删掉不要的页码&#xff0c…

Python实现deepseek接口的调用

简介&#xff1a;DeepSeek 是一个强大的大语言模型&#xff0c;提供 API 接口供开发者调用。在 Python 中&#xff0c;可以使用 requests 或 httpx 库向 DeepSeek API 发送请求&#xff0c;实现文本生成、代码补全&#xff0c;知识问答等功能。本文将介绍如何在 Python 中调用 …

文档处理控件Aspose.Words 教程:.NET版中增强的 AI 文档摘要功能

Aspose.Words是一个功能强大的 Word 文档处理库。它可以帮助开发人员自动编辑、转换和处理文档。 自 24.11 版以来&#xff0c;Aspose.Words for .NET 提供了 AI 驱动的文档摘要功能&#xff0c;使用户能够从冗长的文本中快速提取关键见解。在 25.2 版中&#xff0c;我们通过使…

19,C++——11

目录 一、 C11简介 二、 新增的列表初始化 三、 新增的STL容器 四、 简化声明 1&#xff0c;auto 2&#xff0c;decltype 3&#xff0c;nullptr 五、右值引用 1&#xff0c;左值引用和右值引用 2&#xff0c;两种引用的比较 3&#xff0c;左值引用的使用场景 4&…

风尚云网|前端|前后端分离架构深度剖析:技术革新还是过度设计?

前后端分离架构深度剖析&#xff1a;技术革新还是过度设计&#xff1f; 作者&#xff1a;风尚云网 在数字化转型浪潮中&#xff0c;前后端分离架构已成为现代Web开发的主流模式。但这项技术真的是银弹吗&#xff1f;本文将从工程实践角度&#xff0c;剖析其优势与潜在风险&am…

CMS网站模板设计与用户定制化实战评测

内容概要 在数字化转型背景下&#xff0c;CMS平台作为企业内容管理的核心载体&#xff0c;其模板架构的灵活性与用户定制能力直接影响运营效率。通过对WordPress、Baklib等主流系统的技术解构发现&#xff0c;模块化设计理念已成为行业基准——WordPress依托超过6万款主题库实…

搭建个人博客教程(Hexo)

如何快速搭建一套本地的博客系统呢&#xff1f;这里有一套gitNode.jsHexo的部署方案来进行解决。 安装git Git 是一款免费开源的分布式版本控制系统&#xff0c;由 Linus Torvalds 于 2005 年为 Linux 内核开发设计。它通过本地仓库和远程仓库实现代码管理&#xff0c;支持分支…

Docker 可视化工具 Portainer

Docker 可视化工具 Portainer安装 官方安装地址&#xff1a;https://docs.portainer.io/start/install-ce/server/docker/wsl 一&#xff0c;首先&#xff0c;创建 Portainer Server 用来存储数据库的卷&#xff1a; docker volume create portainer_data二&#xff0c;然后…

数据库基础知识点(系列二)

1&#xff0e;关系数据模型由哪三个要素组成。 答&#xff1a;关系数据模型由关系数据结构、关系操作集合和关系完整性约束三部分组成。 2&#xff0e;简述关系的性质。&#xff08;关系就是一张二维表格&#xff0c;但不是任何二维表都叫关系&#xff09; 答&#xff1a;(1…

如何进行灌区闸门自动化改造-闸门远程控制系统建设

改造背景 操作效率低‌&#xff1a;人工启闭耗时耗力&#xff0c;单次操作需2-3人配合&#xff0c;耗时长。 ‌水资源浪费‌&#xff1a;依赖经验估算放水量&#xff0c;易导致漫灌或供水不足。 ‌管理滞后‌&#xff1a;无法实时监控水位、流量&#xff0c;故障响应延迟。 …

【算法笔记】图论基础(二):最短路、判环、二分图

目录 最短路松弛操作Dijkstra朴素Dijkstra时间复杂度算法过程例题 堆优化Dijkstra时间按复杂度算法过程例题 bellman-ford时间复杂度为什么dijkstra不能处理负权边&#xff1f;dijkstra的三个步骤&#xff1a;反例失效的原因 算法过程例题 spfa时间复杂度算法过程例题spfa求最短…

EMS小车技术特点与优势:高效灵活的自动化输送解决方案

北成新控伺服技术丨EMS小车调试视频 EMS小车是一种基于单轨运行的电动输送系统&#xff0c;通过电力驱动实现物料的高效搬运和输送&#xff0c;具有高效灵活、节能环保、多功能集成、行业适配性强等特性&#xff0c;广泛应用于汽车制造、工程机械、家电生产、仓储物流等行业自动…

uniapp运行到支付宝开发者工具

使用uniapp编写专有钉钉和浙政钉出现的样式问题 在支付宝开发者工具中启用2.0构建的时候&#xff0c;在开发工具中页面样式正常 但是在真机调试和线上的时候不正常 页面没问题&#xff0c;所有组件样式丢失 解决 在manifest.json mp-alipay中加入 "styleIsolation&qu…

C++ 性能优化隐藏陷阱:从系统调用到并发开销的深度反思

作为一名C++技术专家,我深知性能优化不仅是代码层面的艺术,更是理解硬件与语言交互的科学。在现代计算中,C++的抽象为开发者提供了便利,却也隐藏了硬件的复杂性。如何揭开这些“谎言”,让代码与硬件协同工作?本文将以小案例为载体,通过优化前后的对比,深入剖析每个章节…