Redis:原理速成+项目实战——Redis实战10(Redis消息队列实现异步秒杀)

news2025/1/11 21:07:27

👨‍🎓作者简介:一位大四、研0学生,正在努力准备大四暑假的实习
🌌上期文章:Redis:原理速成+项目实战——Redis实战9(秒杀优化)
📚订阅专栏:Redis:原理速成+项目实战
希望文章对你们有所帮助

上一节已经实现了异步秒杀,也就是将秒杀分为两个环节:
1、判断是否有抢单资格(库存量是否充足、是否满足一人一单)、
2、下单操作(优惠券表中的库存量-1,订单表增加相应信息)
其中,第一步的操作放在了Redis中,可以有效提高效率,而真正大幅度提高效率的点还是因为我们将下单的操作交给了另一个开辟的线程,因为对数据库的操作并不需要什么时效性。

异步执行所需要的信息被封装并保存到了阻塞队列中,上一节分析了这会造成的问题:
1、内存限制问题
2、数据安全问题

消息队列可以解决这个问题,一般建议用专业的消息中间件来使用,最主流的当然就是RabbitMQ了,但是这边也讲解一下用Redis里面的一些数据结构来模拟出消息队列的效果,实现的话我感觉也挺容易的,只演示基于Stream消息队列实现异步秒杀。

Redis消息队列实现异步秒杀

  • 认识消息队列
  • 基于List实现消息队列
  • PubSub实现消息队列
  • Stream的单消费模式
  • Stream的消费者组模式
  • 基于Stream消息队列实现异步秒杀

认识消息队列

消息队列,也就是存放消息的队列,最简单的消息队列包括3个角色:
(1)消息队列(代理):存储和管理信息
(2)生产者:发消息到消息队列
(3)消费者:从消息队列中获取消息并处理
因此,异步秒杀的思路为:
在这里插入图片描述

这个思路与上一节用阻塞队列的思路是差不多的,但是有2点重要区别:
1、消息队列是JVM以外的独立服务,不受JVM内存的限制
2、消息队列不仅仅做数据存储,还确保了数据安全,存到消息队列中的消息会做持久化处理,并要求消费者要做出消息的确认,否则会持续将消息传递给消费者,确保消息至少被“签收”一次

基于List实现消息队列

List是一种双向链表,很容易模拟出队列。
需要注意的是,当消息队列中没有消息的时候,我们应当要让线程等待,而不是直接返回Null,因此这儿要用BRPOPBLPOP来实现阻塞效果(B表示阻塞)

优点:
(1)利用Redis存储,不受限于JVM内存上限
(2)基于Redis的持久化机制,保证数据安全性
(3)满足消息有序性
缺点:
(1)无法避免消息丢失(消息会从队列直接移除)
(2)只支持单消费者

PubSub实现消息队列

PubSub(发布订阅)是Redis2.0引入的,消费者可以订阅一个或多个channel(频道),生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

优点:采用发布订阅模型,支持多生产、多消费
缺点:
(1)不支持数据持久化
(2)无法避免消息丢失
(3)消息堆积有上线,超出时数据丢失

Stream的单消费模式

Stream是Redis5.0引入的一种新数据类型,可以实现功能完善的消息队列。
在这里插入图片描述
例如:
在这里插入图片描述
读取消息:XREAD
在这里插入图片描述
例如,用XREAD读第一个消息:

XREAD COUNT 1 STREAMS users 0

用XREAD阻塞方式读取最新消息:

XREAD COUNT 1 BLOCK STREAMS users $

所以,在开发的时候,可以循环调用XREAD阻塞方式来查询最新消息,从而实现持久监听队列。
但是,当指定起始ID为$读取最新消息,处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取也只能获取到最新的一条,会出现消息漏读

特点:
(1)消息可回溯
(2)一个消息可以被多个消费者读取
(3)可以阻塞读取
(4)有消息漏读的风险

Stream的消费者组模式

这一部分命令还是麻烦了,理解就行,要使用就去看文档就好了。

消费者组可以解决消息漏读的问题。
消费者组:将多个消费者划分到一个组中,监听同一个队列。

特点:
1、消息分流:队列中的消息会分流给组内不同消费者,而不是重复消费,从而加快消息处理速度
2、消息标识:消费者组会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后去读取消息,确保了每一个消息都会被消费
3、消息确认:消费者获取消息后,消息处于pending状态,存入pending-list,当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list中移出,可以解决消息丢失的问题

创建消费者组:
XGROUP CREATE key groupName ID [MKSTREAM]
删除指定消费者组
XGROUP DESTROY key groupName
给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupName consumerName
删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupName consumerName

从消费者组读取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key ID
其中,ID表示获取消息的起始ID:
(1)“>”:从下一个未消费的消息开始
(2)其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中第一个消息开始

基于Stream消费者组,我们利用消费者监听消息的基本思路:
1、使用阻塞模式尝试监听队列,没消息就继续监听,有消息就开线程处理消息,并在完成后ACK。
2、若没有成功ACK,抛出异常,那么消息就会留在padding-list中,这时候就需要读取padding-list获取异常消息并处理。

STREAM类型消息队列的XREADGROUP命令特点:

1、消息可回溯
2、可以多消费者争抢消息,加快消费速度
3、可以阻塞读取
4、没有消息漏读的风险
5、有消息确认机制,保证消息至少被消费一次

基于Stream消息队列实现异步秒杀

1、创建Stream类型的消息队列stream.orders和消费者组:

XGROUP CREATE stream.orders g1 0 MKSTREAM # 组名g1,起始位置为0

在这里插入图片描述
2、修改之前秒杀下单的Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包括voucherId、userId、orderId:

-- 1 参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]
-- 1.3 订单id
local orderId = ARGV[3]

-- 2 数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3 脚本业务
-- 3.1 判断库存是够充足
if(tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.2 库存不足,返回1
    return 1
end
-- 3.2 判断用户是否下单,即判断用户id是不是这个set集合的成员
if(redis.call('sismember', orderKey, userId) == 1) then
    -- 3.2 存在,说明重复下单
    return 2
end
-- 3.4 扣库存
redis.call('incrby', stockKey, -1)
-- 3.5 下单(保存用户)
redis.call('sadd', orderKey, userId)
-- 3.6 发送消息到队列中,orderId的key指定为Id更好,因为订单实体类是这么定义的
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'Id', orderId)
return 0

与上次的代码相比,我们多增加了一个参数,所以我们要修改一下函数的调用:
在这里插入图片描述
这个参数的增加,在后续的编写中会省去一些麻烦。

3、项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单,整体的业务流程的代码如下:

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    //注入秒杀优惠券的service
    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private RedissonClient redissonClient;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    public static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    //线程池
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    //线程任务,用户随时都能抢单,所以应该要在这个类被初始化的时候马上开始执行
    @PostConstruct  //该注解表示在当前类初始化完毕以后立即执行
    private void init(){
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    private void handleVoucherOrder(VoucherOrder voucherOrder) {
        //获取用户,用户id不能再从UserHolder中取了,因为现在是从线程池获取的全新线程,不是主线程
        Long userId = voucherOrder.getUserId();
        //创建锁对象
        RLock lock = redissonClient.getLock("lock:order:" + userId);
        //获取锁
        boolean isLock = lock.tryLock();
        //判断是否获取锁成功
        if(!isLock){
            log.error("不允许重复下单");//理论上不会发生
        }
        try {
            proxy.createVoucherOrder(voucherOrder);
        } finally {
            lock.unlock();
        }
    }

    IVoucherOrderService proxy;

    private class VoucherOrderHandler implements Runnable{
        String queueName = "stream.orders";
        @Override
        public void run() {
            while (true){
                try {
                    //获取消息队列中的订单信息,XREADGROUP GROUP g1 c1 BLOCK 2000 STREAMS stream.orders
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );
                    //判断消息获取是否成功
                    if(list == null || list.isEmpty()) {//获取失败,说明没有消息,继续下一次循环
                        continue;
                    }
                    //解析消息中的订单信息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> values = record.getValue();
                    //将其转变为VoucherOrder对象,忽略异常
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                    //获取成功,下单
                    handleVoucherOrder(voucherOrder);
                    //ACK确认
                    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
                } catch (Exception e) {
                    //异常则表示没有被ACK确认,剩下的操作都是针对pending-list的
                    log.error("处理订单异常", e);
                    handlePendingList();
                }
            }
        }

        private void handlePendingList() {
            while (true){
                try {
                    //获取pending-list中的订单信息 XREADGROUP g1 c1 COUNT 1 STREAMS stream.orders 0
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"), //消费者信息
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
                    if(list == null || list.isEmpty()) {
                        //获取失败,说明没有消息,结束循环
                        break;
                    }
                    //解析消息中的订单信息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> values = record.getValue();
                    //将其转变为VoucherOrder对象,忽略异常
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                    //获取成功,下单
                    handleVoucherOrder(voucherOrder);
                    //ACK确认
                    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理pending-list订单异常", e);
                }
            }
        }
    }

    //秒杀优化,调用Lua的代码
    @Override
    public Result seckillVoucher(Long voucherId) {
        //获取用户
        Long userId = UserHolder.getUser().getId();
        //获取订单id
        long orderId = redisIdWorker.nextId("order");
        //执行Lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString(), String.valueOf(orderId)
        );
        //判断结果是否为0
        int r = result.intValue();
        if(r != 0){
            //不为0,没有购买资格
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }
        //获取代理对象
        proxy = (IVoucherOrderService) AopContext.currentProxy();

        //返回订单id
        return Result.ok(orderId);
    }
    
    @Transactional(rollbackFor = Exception.class)
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        Long userId = voucherOrder.getUserId();
        Long voucherId = voucherOrder.getVoucherId();
        //查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        //判断是否存在
        if (count > 0) {
            log.error("不可重复购买");
        }
        //扣减库存
        boolean success = seckillVoucherService.update().
                setSql("stock = stock - 1").
                eq("voucher_id", voucherId).
                gt("stock", 0).
                update();
        if (!success) {
            log.error("库存不足");
        }
        //保存订单
        this.save(voucherOrder);
    }
}

我觉得真的还是太麻烦了。。。而且我遇到了很多次bug,反正都跟线程池有关系,自己修改bug的能力一般,耽误了不少时间,这方面能力要提高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1374914.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

wxWidgets实战:使用mpWindow绘制阻抗曲线

选择模型时&#xff0c;需要查看model的谐振频率&#xff0c;因此需要根据s2p文件绘制一张阻抗曲线。 如下图所示&#xff1a; mpWindow 左侧使用mpWindow&#xff0c;右侧使用什么&#xff1f; wxFreeChart https://forums.wxwidgets.org/viewtopic.php?t44928 https://…

实战环境搭建-linux下安装tomcat

安装tomcat Index of /dist/tomcat/tomcat-9/v9.0.8/bin 下载apache-tomcat-9.0.8.tar.gz&#xff0c;可以使用wget; 2、将压缩包tar -zxvf apache-tomcat-9.0.8.tar.gz解压到/home/tomcat 3、修改环境变量 vi /etc/profile export JAVA_HOME/home/java/jdk1.8.0_221 expo…

IntelliJ IDEA Java 连接 mysql 配置(附完整 demo)

下载 MySQL 驱动 从MySQL官网下载JDBC驱动的步骤如下&#xff1a; 1&#xff09;访问MySQL的官方网站&#xff1a;MySQL 2&#xff09;点击页面上方的"DOWNLOADS"菜单&#xff1b; 3&#xff09;在下载页面&#xff0c;找到"MySQL Community (GPL) Downloads…

QT第四天

头文件&#xff1a; #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include<QTime>//时间类 #include<QTimerEvent>//定时器事件类 #include<QtTextToSpeech> //语言播报类 #include<QPushButton> namespace Ui { class Widget; }clas…

2023年全国职业院校技能大赛(高职组)“云计算应用”赛项赛卷⑥

2023年全国职业院校技能大赛&#xff08;高职组&#xff09; “云计算应用”赛项赛卷6 目录 需要竞赛软件包环境以及备赛资源可私信博主&#xff01;&#xff01;&#xff01; 2023年全国职业院校技能大赛&#xff08;高职组&#xff09; “云计算应用”赛项赛卷6 模块一…

前端导出Excel文件,部分数字前面0消失处理办法

详细导出可以看之前的文章 js实现导出Excel文档_js 通过 接口 导出 xlsx 代码-CSDN博客 今天的问题是导出一些数据时&#xff0c;有些字段是前面带有0的字符串&#xff0c;而导出后再excel中就被识别成了数字 如图本来字符串前面的0 都没了 解决方案 1. 导出的时候在前面加单…

微信V3支付,JSAPI 支付报错,返回 映射到值字段“子商户号/二级商户号”字符串规则校验失败

问题 最近使用 微信V3 支付的JAVA 版本&#xff0c;调用 JSAPI 支付&#xff0c;报错&#xff1a; httpResponseBody[{"code":"PARAM_ERROR","detail":{"location":null,"value":""},"message":"…

Fluids —— Whitewater (SOP)

目录 Whitewater Lifecycle Workflow Whitewater source Deformation sources Visualizing whitewater Whitewater solver Wind Foam erosion Repellants Whitewater postprocess 基于SOP的白水是对SOP FLIP工作流的增强&#xff1b;该系统与规模无关&#xff0c;无需…

day-06 构造有效字符串的最少插入数

思路 动态规划&#xff1a; Word[i]单独组成abc 如果Word[i]>word[i-1] 则word[i]和word[i-1]一起构成abc 解题方法 关系式&#xff1a;dp[i]dp[i-1]2或dp[i]dp[i-1]-1 时间复杂度&#xff1a; O(n) 空间复杂度&#xff1a; O(1) Code class Solution {/*动态规划&…

摩托车充气泵方案芯片应用设计

技术工程师在做产品方案开发之前&#xff0c;首先也是最重要的就是芯片选型。为什么这样说呢&#xff1f;那是因为芯片是整个方案设计中&#xff0c;最至关重要的一环&#xff0c;没有它&#xff0c;后面的工作将无法进行&#xff0c;只有将芯片核心基础定下来&#xff0c;后面…

zabbix监控windows主机

下载安装zabbix agent安装包 Zabbix官网下载地址: https://www.zabbix.com/cn/download_agents?version5.0LTS&release5.0.40&osWindows&os_versionAny&hardwareamd64&encryptionOpenSSL&packagingMSI&show_legacy0 这里使用zabbix agent2 安装 …

C#,迭代深化搜索(IDS)或迭代深化深度优先搜索(IDDFS)算法的源代码

摘要&#xff1a;本文介绍适合于大数据规模情况下的&#xff0c;新型的迭代深化深度优先搜索(IDDFS)算法的原理、实例及实现的C#源代码。 引言 常用的树&#xff08;或图&#xff09;遍历算法是两种&#xff1a; 广度优先搜索算法&#xff08;BFS&#xff09; 和 深度优先搜索…

LeetCode[105] 从前序与中序遍历序列构造二叉树

给定两个整数数组 preorder 和 inorder &#xff0c;其中 preorder 是二叉树的先序遍历&#xff0c; inorder 是同一棵树的中序遍历&#xff0c;请构造二叉树并返回其根节点。 示例 1: 输入: preorder [3,9,20,15,7], inorder [9,3,15,20,7] 输出: [3,9,20,null,null,15,7] …

Veeam Backup12安装备份恢复ESXI7.0 U3虚拟机

介绍 只需单个平台即可保护并管理所有工作负载、应用及数据&#xff1a;云端、虚拟、物理、SaaS、Kubernetes、VMware、Hyper-V、Windows、Linux、UNIX、NAS、AWS、Azure、企业应用等。 个人主要用于备份ESXi上的虚拟机&#xff0c;可以实现单次完整备份&#xff0c;和定时的…

k8s中top指令使用前提:正确安装metrics-server

参考引用项目&#xff1a;https://www.cnblogs.com/lfl17718347843/p/14283796.html Kubernetes Metrics Server 是 Cluster 的核心监控数据的聚合器&#xff0c;kubeadm 默认是不部署的。 确认metrics-server能否被使用的三个前提&#xff08;验证以及修改方法https://cnblogs…

C#编程-实现多线程

实现多线程 多线程帮助同时执行各种操作。这为用户节省时间。多线程程序包括一个主线程和其他用户定义的线程以同时执行多个任务。 微处理器为执行的进程分配内存。每个进程占有内存中它们自己的地址空间。但是,所有在进程中的线程占有相同的地址空间。多线程允许在一个程序…

k8s的存储卷

存储卷------数据卷 把容器内的目录&#xff0c;和宿主机的目录进行挂载。 容器在系统上的生命周期是短暂的&#xff0c;delete&#xff0c;k8s用控制&#xff08;deployment&#xff09;创建的pod&#xff0c;delete相当于重启&#xff0c;容器的状态也会回复到初始状态。 …

红队打靶练习:BREACH: 1

信息收集 1、arp ┌──(root㉿ru)-[~/kali] └─# arp-scan -l Interface: eth0, type: EN10MB, MAC: 00:0c:29:69:c7:bf, IPv4: 192.168.110.128 Starting arp-scan 1.10.0 with 256 hosts (https://github.com/royhills/arp-scan) 192.168.110.1 00:50:56:c0:00:08 …

Linux基础开发工具--vim

2.vim 代码编辑器 vim共有3种模式&#xff1a; ​ ​ 命令模式&#xff1a; $:将光标定位在当前行最右侧 ^:将光标定位在当前行最左侧 shiftg:将光标移到文本结尾 nshiftg:将光标移到文本第n行 gg:将光标移到文本开始 h:左 j:下 k:上 l:右 nyy:复制当前行/或复制多…

解锁行内元素和块元素的奥秘:网页开发的基础

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…