Redis之消息队列实现

news2024/11/18 19:37:39

文章目录

      • 秒杀场景
        • 采用消息队列实现
          • List实现消息队列
          • PubSub(发布订阅)实现消息队列
          • 基于Stream实现消息队列
            • 消费者组
        • 实践
    • 总结

秒杀问题是非常重要且比较难实现的,如果不进行架构的优化的话,直接访问会给业务系统造成很大的压力…

秒杀场景

  • 场景一:双十一出现的秒杀场景,比如部分商品开展活动,特价11.11,但是库存一般不多,甚至只有一台。在并发量比较大的情况下,如果我们不进行一些优化的话,很容易出现线程安全问题,而且可能给系统带来太大压力,导致宕机。

如果采用以下的秒杀架构图:

我们可以采用Redis来保证一人一单,我们可以将判断秒杀库存和校验一人一单放到Redis层面来完成,采用异步下单来进行秒杀。

秒杀优化流程.drawio.png

这里我们就需要使用到全面分布式锁中生成的全局ID生成器,因为订单id是需要满足自增、唯一性的条件的,这里也可以采用一些网上的开源方案雪花算法等衍生都可以,但是这样还是会存在问题。什么问题呢?

下单操作的原子性?同步队列的实现?

  • 下单代码是需要校验一人一单和库存等条件的,需要使用lua脚本来保证操作的原子性。

  • 采用阻塞队列来存放数据存在内存限制问题和数据安全问题

采用消息队列实现

消息队列(Message Queue),也叫存放消息的队列,最简单的消息队列模型包括:

  • 消息队列:存储和管理消息,也叫做消息代理。

  • 生产者:发送消息到消息队列

  • 消费者:从消息队列获取消息并处理消息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mtsozweI-1677297911799)(C:\Users\DY\AppData\Roaming\marktext\images\2023-02-21-10-17-22-image.png)]

在Redis中也提供了三种方式实现消息队列:

  • List结构:基于List模拟消息队列

  • PubSub:点对点的消息模型

  • Stream:比较完善的消息队列模型

List实现消息队列

在Redis中List的数据结构本身就是一个双向链表,所以很容易就可以模拟出队列的效果。

我们可以通过LPUSH结合RPOP、RPUSH结合LPOP实现,不过当队列中没有消息时RPOP或LPOP操作会返回null,并不会像阻塞队列一样等待消息,因此我们也可以使用BRPOP或者BLPOP来实现阻塞的效果。

优点

  • 优点非常明显,不再受限于JVM内存(阻塞队列消费)

  • 采用Redis的持久化机制,数据安全有保证

  • 可以保证消息的有序性

缺点:

  • 无法避免消息丢失

  • 仅支持单消费

PubSub(发布订阅)实现消息队列

消费者可以订阅一个或多个channel,生产者发送对应channel消息后,所以订阅者都能收到相关消息。

乍一看,好像这个确实比List强多了,使用发布订阅模式,还支持多订阅。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JJtSFlh0-1677297911800)(C:\Users\DY\AppData\Roaming\marktext\images\2023-02-21-10-37-09-image.png)]

但是缺点也很明显:

  • 不支持数据持久化

  • 无法避免消息丢失

  • 消息堆积有限制,超过时也会出现数据丢失

基于Stream实现消息队列

Stream是Redis5.0以后提出的一种数据类型,是功能比较完善的消息队列

发送消息:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fWu2DdL1-1677297911801)(C:\Users\DY\AppData\Roaming\marktext\images\2023-02-21-10-44-29-image.png)]

读取消息:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KJKu2Q53-1677297911802)(C:\Users\DY\AppData\Roaming\marktext\images\2023-02-21-10-44-50-image.png)]

除去可以实现读取外,还可以设置阻塞时间,从而实现持续监听的效果

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-V62KOP2a-1677297911802)(C:\Users\DY\AppData\Roaming\marktext\images\2023-02-21-10-47-02-image.png)]

上面的ID指定为$时,标识读取的是最新的消息,如果有同时两条消息同时到达MQ,下次读取可能会出现漏读的问题

消费者组

将多个消费者划分到一个组中,监听同一个队列。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MtsRDJTc-1677297911803)(C:\Users\DY\AppData\Roaming\marktext\images\2023-02-21-11-32-58-image.png)]

在Redis的Stream的ID策略中

  • ‘>’: 从下一条未消费的消息开始

  • 其它:根据指定id从pending-list中获取已消费但是未确认的消息

这里的确认是因为前面提到可能会丢失消息,而Stream中的消息在经过消费后,需要进行XACK确认,如果没有进行确认就会把这条消息放到pending-list中。

实践

前面提到如何保证秒杀下单的原子性需要使用到lua脚本,在这里面我把对应的key都写死了

-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]

-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.2.库存不足,返回1
    return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
    -- 3.3.存在,说明是重复下单,返回2
    return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

在消费中,采用ExectorService来创建一个线程池来进行订单的消费

private class VoucherOrderHandler implements Runnable {
        String queueName = "stream.orders";
        @Override
        public void run() {
            while (true) {
                try {
                    // 1. 获取消息队列中的信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order >
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );
                    // 2. 判断消息获取是否成功
                    if (list == null || list.isEmpty()) {
                        // 2.1 获取失败,没有消息,进行下一次循环
                        continue;
                    }
                    // 3. 解析消息中的订单信息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> values = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                    // 3. 获取成功,可以下单
                    handleVoucherOrder(voucherOrder);
                    // 4. ACK确认 XACK stream.orders  g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理订单异常");
                    handlePendingLists();
                }
            }
        }

        private void handlePendingLists() {
            while (true) {
                try {
                    // 1. 获取pendingList队列中的信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.order 0
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
                    // 2. 判断消息获取是否成功
                    if (list == null || list.isEmpty()) {
                        // 2.1 获取失败,说明pending-list没有消息,结束循环
                        break;
                    }
                    // 3. 解析消息中的订单信息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> values = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                    // 3. 获取成功,可以下单
                    handleVoucherOrder(voucherOrder);
                    // 4. ACK确认 XACK stream.orders  g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理pend-list订单异常", e);
                    try {
                        Thread.sleep(20);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
    }

总结

Redis只能满足小项目的需求,更大需求可能需要用到更加高级的消息队列,比如Kafka、RabbitMQ、RocketMQ等等,还有好多学问要学习呢…

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/370340.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux】system V共享内存 | 消息队列 | 信号量

​&#x1f320; 作者&#xff1a;阿亮joy. &#x1f386;专栏&#xff1a;《学会Linux》 &#x1f387; 座右铭&#xff1a;每个优秀的人都有一段沉默的时光&#xff0c;那段时光是付出了很多努力却得不到结果的日子&#xff0c;我们把它叫做扎根 目录&#x1f449;system V共…

go进阶(2) -深入理解Channel实现原理

Go的并发模型已经在https://guisu.blog.csdn.net/article/details/129107148 详细说明。 1、channel使用详解 1、channel概述 Go的CSP并发模型&#xff0c;是通过goroutine和channel来实现的。 channel是Go语言中各个并发结构体(goroutine)之前的通信机制。 通俗的讲&#xf…

通用信息抽取技术UIE产业案例解析,Prompt 范式落地经验分享!

想了解用户的评价究竟是“真心夸赞”还是“阴阳怪气”&#xff1f;想快速从多角色多事件的繁杂信息中剥茧抽丝提取核心内容&#xff1f;想通过聚合相似事件准确地归纳出特征标签&#xff1f;……想了解UIE技术在产业中的实战落地经验&#xff1f;通用信息抽取技术 UIE 产业案例…

FPGA基础知识

FPGA是在PAL、PLA和CPLD等可编程器件的基础上进一步发展起来的一种更复杂的可编程逻辑器件。它是ASIC领域中的一种半定制电路&#xff0c;既解决了定制电路的不足&#xff0c;又克服了原有可编程器件门电路有限的缺点。 由于FPGA需要被反复烧写&#xff0c;它实现组合逻辑的基…

【强化学习】强化学习数学基础:贝尔曼公式

强化学习数学基础&#xff1a;贝尔曼公式强化学习的数学原理课程总览贝尔曼公式&#xff08;Bellman Equation&#xff09;一个示例状态值贝尔曼公式&#xff1a;推导过程贝尔曼公式&#xff1a;矩阵-向量形式&#xff08;Matrix-vector form&#xff09;贝尔曼公式&#xff1a…

(四)应变度量

本文主要内容包括&#xff1a;1. Hill 应变度量 与 Seth 应变度量2. Hill -Seth 应变度量的 Lagrange 描述2.1. Green-Lagrange 应变张量2.2. 物质 Biot 应变张量/工程应变2.3. 右 Henkey 应变张量/Lagrange 型对数应变2.4. Piola 应变张量3. Hill -Seth 应变度量的 Euler 描述…

Tesla都使用什么编程语言?

作者 | 初光 出品 | 车端 备注 | 转载请阅读文中版权声明 知圈 | 进“汽车电子与AutoSAR开发”群&#xff0c;请加微“cloud2sunshine” 总目录链接>> AutoSAR入门和实战系列总目录 带着对更美好未来的愿景&#xff0c;特斯拉不仅成为有史以来最有价值的汽车公司&…

乐友商城学习笔记(五)

什么是Nginx Nginx是一个高性能的web和反向代理服务器 作为web服务器作为负载均衡服务器作为邮件代理服务 树组件的用法 跨域问题 跨域&#xff1a;浏览器对javastript的同源策略的限制。 以下情况都属于跨域&#xff1a; 域名不同域名相同&#xff0c;端口不同二级域名不…

Python每日一练(20230225)

目录 1. 整数反转 2. 求最大公约数和最小公倍数 最大公约数 最小公倍数 3. 单词搜索 II 附录&#xff1a; DFS 深度优先搜索算法 BFS 广度优先搜索算法 BFS 和 DFS 的区别 1. 整数反转 给你一个 32 位的有符号整数 x &#xff0c;返回将 x 中的数字部分反转后的结果。…

大型旋转设备滑动轴承X、Y测点振动值说明(转载的)

滑动轴承支撑的大型旋转设备&#xff0c;绝大部分的故障都表现为不平衡引起的1倍频振动&#xff0c;诊断故障原因要根据振动随转速、负荷、温度、时间的变化情况来具体判断。滑动轴承设备的诊断主要依据电涡流传感器测量轴和轴瓦间的相对振动&#xff0c;判断转子相关的各种问题…

基于SpringBoot的共享汽车管理系统

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7/8.0 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.3.9 浏…

Orin安装ssh、vnc教程

文章目录一&#xff1a;ssh远程终端的配置PC的配置MobaXterm的下载二&#xff1a;VNC Viewer远程图形界面终端配置&#xff1a;PC配置&#xff1a;一&#xff1a;ssh远程 终端的配置 1.ifconfig查看终端ip地址 其中的eth是网口&#xff0c;我们需要看的是wlan0下的inet&#…

5M1270ZT144A5N CPLD 980MC 6.2NS 144TQFP /5M1270ZT144C5N

【产品介绍】MAX V系列低成本和低功耗CPLD提供更大的密度和每占地面积的I/O。MAX V器件的密度从40到2210个逻辑元件(32到1700个等效宏单元)和多达271个I/O&#xff0c;为I/O扩展、总线和协议桥接、电源监控和控制、FPGA配置和模拟IC接口等应用提供可编程解决方案。MAX V器件具有…

MYSQL 索引失效的十个场景(二)

六、对索引列运算&#xff08;如&#xff0c;、-、*、/、%等&#xff09;&#xff0c;索引失效 CREATE TABLE student (id bigint(20) NOT NULL AUTO_INCREMENT,name varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,score decimal(10,2) DEFAULT NULL,subject varchar(…

【华为OD机试模拟题】用 C++ 实现 - 绘图机器(2023.Q1)

最近更新的博客 华为OD机试 - 入栈出栈(C++) | 附带编码思路 【2023】 华为OD机试 - 箱子之形摆放(C++) | 附带编码思路 【2023】 华为OD机试 - 简易内存池 2(C++) | 附带编码思路 【2023】 华为OD机试 - 第 N 个排列(C++) | 附带编码思路 【2023】 华为OD机试 - 考古…

【Leedcode】环形链表必备的面试题和证明题(附图解)

环形链表必备的面试题和证明题&#xff08;附图解&#xff09; 文章目录环形链表必备的面试题和证明题&#xff08;附图解&#xff09;前言一、第一题1.题目2.思路3.代码4.延伸问题(1)证明题一&#xff1a;(2)证明题二&#xff1a;二、第二题1.题目2.思路延伸的证明题总结前言 …

【网络原理8】HTTP请求篇

在上一篇文章当中&#xff0c;我们也提到了什么是HTTP。 每一个HTTP请求&#xff0c;都会对应一个HTTP响应。 下面这一篇文章&#xff0c;将聊一下HTTP请求的一些内容 目录 一、URL 第一部分&#xff1a;协议名称 第二部分:认证信息(新的版本已经没有了) 第三部分&#xf…

这款 Python 调试神器推荐收藏

大家好&#xff0c;对于每个程序开发者来说&#xff0c;调试几乎是必备技能。 代码写到一半卡住了&#xff0c;不知道这个函数执行完的返回结果是怎样的&#xff1f;调试一下看看 代码运行到一半报错了&#xff0c;什么情况&#xff1f;怎么跟预期的不一样&#xff1f;调试一…

【教学类-10-03】python单线程下载哔哩哔哩网址(中间字母不同,前面后面相同)的视频

背景需求&#xff1a;最近测试以前的多线程&#xff08;同时下载5个视频&#xff09;&#xff0c;结果30个视频只下到了3个&#xff0c;于是把“单个下载&#xff08;单线程下载&#xff09;”的一个代码进行拓展研究。前一篇介绍了网址尾数递增的遍历程序&#xff0c;本篇介绍…

【华为OD机试模拟题】用 C++ 实现 - 最大报酬(2023.Q1)

最近更新的博客 华为OD机试 - 入栈出栈(C++) | 附带编码思路 【2023】 华为OD机试 - 箱子之形摆放(C++) | 附带编码思路 【2023】 华为OD机试 - 简易内存池 2(C++) | 附带编码思路 【2023】 华为OD机试 - 第 N 个排列(C++) | 附带编码思路 【2023】 华为OD机试 - 考古…