Redis消息队列实现异步秒杀

news2024/12/24 21:37:15

Redis秒杀优化

改进秒杀业务,提高并发性能

需求:

1.新增秒杀优惠券的同时,将优惠券的信息保存到redis中

2.基于Lua脚本,判断秒杀库存,一人一单,决定用户是否抢购成功

3.如果抢购成功,将优惠券id和用户id封装后存入阻塞队列

4.开启线程任务,不断从阻塞队列中获取消息,实现异步下单功能

秒杀业务的优化思路是什么?

1.先利用Redis完成库存余量、一人一单判断,完成抢单业务

2.再将下单业务放入阻塞队列,利用独立线程异步下单

基于阻塞队列的异步秒杀存在那些问题

1.内存限制问题

2.数据安全问题

Redis消息队列实现异步秒杀

消息队列,字面意思就是存放消息的队列,最简单的消息队列模型包括三个角色:

  • 消息队列:存储和管理消息,也被称为消息代理

  • 生产者:发送消息到消息队列

  • 消费者:从消息队列获取消息并处理

Redis提供了三种不同的方式来实现消息队列:

  • list结构:基于List结构模拟消息队列

  • PubSub:基本的点对点的消息模型

  • Stream:比较完善的消息队列模型

基于list结构模拟消息队列

消息队列,字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果

队列是出口和入口不在一边,因此我们可以利用:LPUSH和LPOP、或者RPUSH和LPOP来实现

不过要注意的是,当队列中没有消息时RPOP和LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP和BLPOP来实现阻塞效果

基于List的消息队列有那些优缺点

优点:

1.利用Redis存储,不受限于JVM内存上限

2.基于Redis的持久化机制,数据安全性有保证

3.可以满足消息有序性

缺点:

1.无法避免消息丢失

2.只支持单消费者

基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应的channel发送消息后,所有订阅者都能收到相关的消息

  • SUBSCRIBU channel[channel]:订阅一个或多个频道

  • PUBLISH channel msg:向一个频道发送消息

  • PSUBSCRIBU pattern[pattern]:订阅与pattern格式匹配的所有频道

基于PubSub的消息队列有那些优缺点:

优点:

1.采用发布订阅模型、支持多生产、多消费

缺点:

1.不支持数据持久化

2.无法避免消息丢失

3.消息堆积有上限,超出时数据丢失

基于Stream的消息队列

Stream 是Redis5.0引入的一种新的数据结构,可以实现一个功能非常完善的消息队列

发送消息的命令:

例如:

读取消息的方式之一:XREAD

XREAD阻塞方式,读取最新的消息:

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:

STREAM类型消息模型的XREAD命令特点:

1.消息可回溯

2.一个消息可以被多个消费者读取

3.可以阻塞读取

4.有消息漏读的风险

基于STREAM的消息队列-消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听头一个队列,具备下列特点

创建消费者组

  • key:队列名称

  • groupName:消费者组名称

  • ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息

  • MkSTREAM:队列不存在时自动创建队列

其他常见命令

从消费者组读取消息

  • grooup:消费者组名称

  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者

  • count:本次查询的最大数量

  • BLOCK milliseconds:当没有消息时最长等待时间

  • NOACK:无需手动ACK,获取到消息后自动确认

  • STREAMS KEY:指定队列名称

  • ID:获取队列的起始ID:

“>”:从下一个未消费的消息开始

其他:根据指定id从pengding-list中获取已消费但未确认的消息,例如0,是从pengding-list中的第一个消息开始

消费者监听消息的基本思路

seckill.lua

-- 参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]
-- 1.3 订单id
local orderId = ARGV[3]

-- 2.数据key

-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.1 库存key
local orderKey = 'seckill:order:' .. voucherId

-- 脚本业务

-- 3.1判断库存是否充足 get stockKey
if (tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.2 库存不足,返回1
    return 1
end
-- 3.3 判断用户是否下单 SISMEMBER orderKey userId
if (redis.call('sismember', orderKey, userId) == 1) then
    -- 3.4 存在,说明重复下单,返回2
    return 2
end
-- 3.5 扣减库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.6下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.7 发送消息到队列中,XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

VoucherOrderServiceImpl部分代码

private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }


    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
    @PostConstruct
    private void init() {
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }
    private class VoucherOrderHandler implements Runnable {
        String queueName = "stream.orders";

        @Override
        public void run() {
            while (true) {
                try {
                    //1.获取消息队列中的订单信息XREADGROUP GROUP g1 c1 count 1 BLOCK 2000 STREAMS stream.orders
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );
                    //2.判断消息获取是否成功
                    if (list == null || list.isEmpty()) {
                        //2.1如果获取失败,说明没有消息继续下一次循环
                        continue;
                    }
                    //3.解析消息中的订单消息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> values = record.getValue();
                    //4.如果获取成功,可以下单
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                    HandleVoucherOrder(voucherOrder);
                    //5.ACK确认 stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理订单异常", e);
                    handlePendingList();
                }
            }
        }
        private void handlePendingList() {
            while (true) {
                try {
                    //1.获取pending-list中的订单信息XREADGROUP GROUP g1 c1  1 BLOCK 2000 STREAMS stream.orders 0
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
                    //2.判断消息获取是否成功
                    if (list == null || list.isEmpty()) {
                        //2.1如果获取失败,说明pending-list没有异常消息,结束循环
                        break;
                    }
                    //3.解析消息中的订单消息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> values = record.getValue();
                    //4.如果获取成功,可以下单
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                    HandleVoucherOrder(voucherOrder);
                    //5.ACK确认 stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理pending-list异常", e);
                    try {
                        Thread.sleep(20);
                    } catch (InterruptedException interruptedException) {
                        interruptedException.printStackTrace();
                    }
                }
            }
        }
    }

STREAN类型消息队列的XREADGROUP命令特点

1.消息可回溯

2.可多个消费者争抢消息,加快消费速度

3.可以阻塞读取

4.没有消息漏读的风险

5.有消息确认机制,保证消息至少被消费一次

视频地址

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/418133.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android系统启动流程--init进程的启动流程

这可能是个系列文章&#xff0c;用来总结和梳理Android系统的启动过程&#xff0c;以加深对Android系统相对全面的感知和理解&#xff08;基于Android11&#xff09;。 1.启动电源&#xff0c;设备上电 引导芯片代码从预定义的地方&#xff08;固化在ROM&#xff0c;全称Read …

hive 入门 一般用于正式环境 修改元数据(二)

安装配置可参考 https://blog.csdn.net/weixin_43205308/article/details/130020674 1、如果启动过derby&#xff0c;最小初始化过 在安装路径下删除 derby.log metastore_db rm -rf derby.log metastore_db此处省略安装mysql数据库 2、配置MySQL 登录mysql mysql -uroot …

EightCap易汇:外汇投资入门需要了解哪些必要知识?

外汇市场是国际投资市场&#xff0c;日内交易量巨大&#xff0c;盈利机会极多。外汇是一种含有杠杆的投资产品&#xff0c;杠杆带来了高收益&#xff0c;也会带来高风险&#xff0c;对于外汇新手来说存在一定难度。新手投资者要如何交易&#xff0c;才能抓住外汇市场的盈利机会…

C++标准库 -- 关联容器 (Primer C++ 第五版 · 阅读笔记)

C标准库 -- 关联容器(Primer C 第五版 阅读笔记&#xff09;第11章 关联容器------(持续更新)11.1、使用关联容器11.2、关联容器概述11.3、关联容器操作11.4、无序容器第11章 关联容器------(持续更新) 关联容器和顺序容器有着根本的不同:关联容器中的元素是按关键字来保存和…

助力AI语音开发者的社区-语音之家

语音之家简介 语音之家成立于2021年4月&#xff0c;是一家助力AI语音开发者的社区&#xff0c;我们希望通过知识传播、在线学习、资源分享、各类活动等方式提供全生命周期的服务&#xff0c;帮助全球的AI语音开发者获得成长&#xff0c;洞见AI语音技术领域的发展。目前&#x…

TiDB实战篇-TiDB Lightning 导入数据

简介 使用TiDB Lightning 导入数据。 原理 TiKV进入导入模式 它是使用物理导入的模式&#xff0c;将SQL文件直接导入到TiKV中&#xff0c;它是一种初始化的导入&#xff0c;也就是说目标的数据库和表都是不能够存在的&#xff08;注意事项&#xff0c;在这种方式导入的时候T…

论文笔记 U-Net: Convolutional Networks for Biomedical Image Segmentation

摘要&#xff1a;人们普遍认为&#xff0c;深度网络的成功训练需要数千个带注释的训练样本。在本文中&#xff0c;我们提出了一种网络和训练策略&#xff0c;该策略依赖于大量使用数据增强来更有效地使用可用的注释样本。该体系结构包括用于捕获上下文的收缩路径和用于实现精确…

计算机组件介绍

1. CPU 1.1 主频 1.2 CPU缓存 注&#xff1a;越高越好 2. Memory 注&#xff1a;只有内存是主存&#xff08;因为CPU只能和内存打交道&#xff09;&#xff0c;硬盘这种就是外存&#xff08;因为硬盘太慢了&#xff0c;跟不上cpu的运行速度&#xff09; 3. I/O 注&#xff1a;输…

Segment Anything论文翻译,SAM模型,SAM论文,SAM论文翻译;一个用于图像分割的新任务、模型和数据集;SA-1B数据集

【论文翻译】- Segment Anything / Model / SAM论文 论文链接&#xff1a; https://arxiv.org/pdf/2304.02643.pdfhttps://ai.facebook.com/research/publications/segment-anything/ 代码连接&#xff1a;https://github.com/facebookresearch/segment-anything 论文翻译&…

微软 AI 作图上线完全免费,“奖励自己”可提升速度

ChatGPT 的横空出世应该已经让大家意识到了 AI 的恐怖。 称不上啥都能干&#xff0c;但给东西它真学&#xff0c;学得还比你快。 最近一段时间 AI 在作图领域又一次人气暴涨。 什么小姐姐写真、突破时间线的历史古图、甚至是抽象的表情包都可能源于 AI 之手。 看着手痒想玩玩…

滴滴滴,请看MYSQL事务的四大特征(ACID)的实现原理:晓其原理而通其实现。

一.什么是事务的四特征 原子性&#xff08;Atomicity&#xff0c;或称不可分割性&#xff09;一致性&#xff08;Consistency&#xff09;隔离性&#xff08;Isolation&#xff09;持久性&#xff08;Durability&#xff09; 接下来&#xff0c;我们将对四大特性的具体概念以及…

本地快速搭建Kubernetes单机版实验环境(含问题解决方案)

Kubernetes是一个容器编排系统&#xff0c;用于自动化应用程序部署、扩展和管理。本指南将介绍Kubernetes的基础知识&#xff0c;包括基本概念、安装部署和基础用法。 一、什么是Kubernetes&#xff1f; Kubernetes是Google开发的开源项目&#xff0c;是一个容器编排系统&…

长沙基层公务员待遇调查结果

之前发放了1000份调查问卷&#xff0c;统计过长沙各个行业&#xff08;其中一半是信息产业从业人员&#xff09;的待遇情况&#xff0c;发现很多人对长沙公务员&#xff08;包含有编制/合同工&#xff09;的待遇很感兴趣。我随手翻了翻几个基层政府单位的财政决算公开说明&…

MySQL笔记2

MySQL笔记2一、CRUD操作1.修改数据&#xff1a;update2.删除数据&#xff1a;delete二、数据库中表的约束1.非空约束 not null2.唯一性约束 unique3.主键约束 primary key4.外键约束三、索引1**什么是索引**&#xff1f;2**MySQL中最经典的两种存储引擎**3**为何需要索引**&…

Linux项目日志管理log4cpp的安装与使用【结合sample】

文章目录前言log4cpp安装log4cpp的使用设置类别输出的&#xff08;category&#xff09;和日志优先级&#xff08;priority&#xff09;定义一个宏用于输出日志配置文件使用log4cpp的栗子结语前言 我们都清楚对于一个项目来说它的日志信息是非常重要的&#xff0c;那么我们应该…

springcloud微服务架构搭建过程

项目地址&#xff1a;源代码 仅作为学习用例使用&#xff0c;是我开发过程中的总结、实际的一部分使用方式 开发环境&#xff1a; jdk11 springboot2.7.6 springcloud2021.0.5 alibabacloud 2021.0.4.0 redis6.0 mysql8.0 一、项目搭建 wdz-api&#xff1a;存放远程服务调用相关…

VS for Qt 向MySql 数据库中插入中文

问题&#xff1a; 今天我想向数据库中插入中文&#xff0c;但是&#xff0c;插入的时候会报错&#xff1a; 首先先看报错&#xff1a; QSqlError("1054", "QMYSQL: Unable to execute query", "Unknown column 韩红 in field list")如果错误码…

将Linux服务器上的项目上传至Github

使用git上传项目到github常规的步骤继续上传注意事项参考文章常规的步骤 初始化git空间 git init向缓冲区添加想要上传的文件 git add -f /data/xuhongbo/xuhongbo.code/unbiased_sgg_xuhongbo_BCL/maskrcnn_benchmark/*添加备注信息告诉机器&#xff0c;你真的要添加上述文…

elasticsearch-7.17.9

1、ElasticSearch 1.1、概念 1.1.1、分片(shard) 1、分片 在ES中所有数据的文件块&#xff0c;也是数据的最小单元块&#xff0c;整个ES集群的核心就是对所有分片的分布、索引、负载、路由等达到惊人的速度。 实列场景&#xff1a; 假设 IndexA 有2个分片&#xff0c;向 I…

数据结构 - 计数排序 | C

什么是计数排序 如上图&#xff0c;统计数组中值的个数&#xff1a; 2个[1]&#xff1a;1&#xff0c;1 1个[2]&#xff1a;2 3个[3]&#xff1a;3&#xff0c;3&#xff0c;3 2个[4]&#xff1a;4&#xff0c;4 传给原数组&#xff1a;&#xff08;即完成排序↓&#xff09; …