Redis消息队列 | 黑马点评

news2025/1/16 0:56:51

目录

一、认识消息队列

二、List模拟消息队列

三、PubSub的消息队列

四、Stream的消息队列(重点)

        1、单消费模式

        2、消费者组

五、redis三种消息队列对比 

 六、优化秒杀实战

1、创建消息队列

2、修改下单脚本

 3、接收消息处理


一、认识消息队列

消息队列,字面意思就存放消息的队列。最简单的消息队列模型包括3个角色:

消息队列:存储和管理消息,也被称为消息代理

生产者:发送消息到消息队列

消费者:从消息队列获取消息并处理消息

解决了jvm堵塞队列内存不足的问题,而且消息队列是可以持久化的,宕机了依然能够保存。

redis提供三种不同方式实现消息队列:

  • list结构:基于list结构模拟消息队列
  • PubSub:基于的点对点消息队列
  • Stream:比较完善的消息队列模型(推荐)

二、List模拟消息队列

redis的list结构是一个双向链表,很容易模拟出队列效果

队列是入口和出口不在一边,因此可以用LPUSH结合RPOP、或者RPUSH结合LPOP实现

但是,当队列没有消息时pop就会返回null,并不会jvm堵塞队列那样堵塞并等待消息,因此这里应该使用BRPOP或者BLPOP来实现堵塞队列。

缺点:

无法避免消息丢失。从消息队列取到消息,还没来得及处理就挂掉了,这个消息就消失了。

只支持单消费者。一个人拿走就从队列里面弹出了。

三、PubSub的消息队列

PubSub(发布订阅)是redis2.0版本引入的消息传递模型,消费者可以订阅一个或多个channel(频道),生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

支持多生产、多消费

缺点:

不支持数据持久化(刚刚的list本质是做存储的我们拿来当队列所以可以持久化)

无法避免消息丢失。

消息堆积有上限,超出时数据丢失。(缓存空间是有上限的)

四、Stream的消息队列(重点)

Stream是redis5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

1、单消费模式

特点:

  • 消息可回溯。不消失永久保存在队列里。
  • 一个消息可以被多个消费者读取。读完不消失的,可以多个读
  • 可以堵塞读取
  • 有消息漏读的风险

2、消费者组

消费者组(Consumer Group):将多个消费者划分到一个组,监听同一个队列。

 消费者监听消息的基本思路

stream类型消息队列的消费者组特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏镀的风险
  • 有消息确认机制,保证消息至少被消费一次

五、redis三种消息队列对比 

 六、优化秒杀实战

1、创建消息队列

创建一个stream类型的消息队列,名为stream.orders

2、修改下单脚本

修改之前秒杀下单lua脚本,认定有抢购资格后,直接向steam.orders中添加消息,内容包含voucher、userId、orderId

-- 优惠券id
local voucherId = ARGV[1]
-- 用户id
local userId = ARGV[2]
-- 订单id
local orderId = ARGV[3]

-- 库存key
local stockKey = "seckill:stock:"..voucherId
-- 订单key
local orderKey = "seckill:order:"..voucherId

-- 判断库存是否充足
if(tonumber(redis.call('get', stockKey)) <= 0) then
    return 1
end

-- 判断用户是否已经下过单
if(redis.call('sismember', orderKey, userId) == 1) then
    return 2
end

-- 扣减库存
redis.call('incrby', stockKey, -1)

-- 将 userId 存入当前优惠券的 set 集合
redis.call('sadd', orderKey, userId)

-- 将订单信息存入到消息队列中 xadd stream.orders * k1 v1 k2 v2
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

 3、接收消息处理

项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

    /***
     * 创建线程池
     */
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    /***
     * 容器启动时,便开始创建独立线程,从队列中读取数据,创建订单
     */
    @PostConstruct
    private void init(){
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    private class VoucherOrderHandler implements Runnable {

        @Override
        public void run() {
            while(true){
                try {
                    // 获取消息队列中的订单信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2000)),
                            StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
                    );
                    // 判断订单信息是否为空
                    if(list == null || list.isEmpty()){
                        // 如果为 null,说明没有消息,继续下一次循环
                        continue;
                    }
                    // 解析消息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    // 创建订单
                    createVoucherOrder(voucherOrder);
                    // 确认消息 xack s1 g1 id
                    stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理订单异常!", e);
                    handlePendingList();
                }
            }

        }

        private void handlePendingList() {
            while(true){
                try {
                    // 获取 pending-list 中的订单信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
                    );
                    // 判断订单信息是否为空
                    if(list == null || list.isEmpty()){
                        break;
                    }
                    // 解析消息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    // 创建订单
                    createVoucherOrder(voucherOrder);
                    // 确认消息 xack s1 g1 id
                    stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理订单异常!", e);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException interruptedException) {
                        interruptedException.printStackTrace();
                    }
                }
            }

        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/178558.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

设计模式 - 创建型模式_工厂方法模式

文章目录创建型模式概述CaseBad ImplBetter Impl &#xff08;⼯⼚模式优化代码&#xff09;创建型模式 创建型模式提供创建对象的机制&#xff0c; 能够提升已有代码的灵活性和可复⽤性。 类型实现要点工厂方法定义⼀个创建对象的接⼝&#xff0c;让其⼦类⾃⼰决定实例化哪⼀…

【蓝桥杯-筑基篇】基础数学思维与技巧(1)

&#x1f353;系列专栏:蓝桥杯 &#x1f349;个人主页:个人主页 目录 1.一百以内的AB 2.小学生算术求进位次数 3.最大公约数 4.最小公倍数 5.十进制转换其他进制 6.其他进制转十进制 7.天空数 8.求集合的所有子集 9.判断一个数是否为2的次方数 10.二进制中1的个数 1.一…

ISIS简介、NSAP与NET地址、Router-Id转换成NET地址

2.0.0 ISIS简介、NSAP与NET地址、Router-Id转换成NET地址 ISIS简介 IS-IS&#xff08;Intermediate System-to-Intermediate System&#xff09;中间系统到中间系统。 1、该协议最初是ISO国际标准化组织为CLNP&#xff08;Connection Less Network Protocol&#xff0c;无连接…

HashMap 正解

HashMap 实现原理 以及扩容机制 HashMap 的 put 以及扩容基本实现 数据结构 上述截图是 HashMap 的内部存储的数据结构。大体上是通过 hash 值来获取到对应的下标。如果当前下标为 null 的话&#xff0c;直接创建并设置一个新的节点&#xff0c;反之就是添加到该链表的最后 pu…

好客租房-09_学习MongoDB并完善通讯系统

9. 学习MongoDB 并完善租房的通讯系统后端本章目的为MongoDB快速入门, 并完善上一节编写的通讯系统后台, 将DAO层从HashMap迁移到MongoDB中.思考如下问题:MongoDB属于关系型还是非关系型数据库为什么在我们的通讯系统中选择MongoDB作为数据库?9.1 mongoDB概念简介MongoDB是一个…

python+django医院固定资产设备管理系统

管理员功能模块 管理员登录&#xff0c;通过填写用户名、密码、角色等信息&#xff0c;输入完成后选择登录即可进入医院设备管理系统&#xff0c; 管理员登录进入医院设备管理系统可以查看首页、个人中心、科室员管理、维修员管理、设备领用管理、设备信息管理、设备入库管理、…

人工智能入门杂记

本篇文章属于所有发表的文章的导读吧&#xff0c;以后会常更新。 目录 1.数据挖掘、机器学习、深度学习、云计算、人工智能 2.深度学习、强化学习、对抗学习、迁移学习 3.基础知识--线性代数 4.基础知识--概率与数理统计 5.常用工具库 6.机器学习 6.1 什么是训练什么是推…

Java数组

文章目录Java 数组一、数组介绍二、数组1. 数组静态初始化1.1 数组定义格式1.2 数组静态初始化2. 数组元素访问3. 数组遍历操作3.1 数组遍历介绍3.2 数组遍历场景3.3 数组遍历案例1&#xff09;数组遍历-求偶数和2&#xff09;数组遍历-求最大值3&#xff09;数组遍历综合案例4…

【C语言航路】第十四站:文件

目录 一、为什么使用文件 二、什么是文件 1.程序文件 2.数据文件 3.文件名 三、文件的打开和关闭 1.文件指针 2.文件的打开和关闭 四、文件的顺序读写 1.对于输入输出的理解 2.fgetc与fputc &#xff08;1&#xff09;fgetc与fputc的介绍 &#xff08;2&#xff0…

2023年springcloud面试题(第一部分)

1. 什么是微服务架构微服务架构就是将单体的应用程序分成多个应用程序&#xff0c;这多个应用程序就成为微服务&#xff0c;每个微服务运行在自己的进程中&#xff0c;并使用轻量级的机制通信。这些服务围绕业务能力来划分&#xff0c;并通过自动化部署机制来独立部署。这些服务…

MP-4可燃气体传感器介绍

MP-4可燃气体传感器简介MP-4可燃气体传感器采用多层厚膜制造工艺&#xff0c;在微型Al2O3陶瓷基片的两面分别制作加热器和金属氧化物半导体气敏层&#xff0c;封装在金属壳体内。当环境空气中有被检测气体存在时传感器电导率发生变化。该气体的浓度越高&#xff0c;传感器的电导…

JavaWeb | JDBC相关API详解 2 (内附以集合形式输出表)

本专栏主要是记录学习完JavaSE后学习JavaWeb部分的一些知识点总结以及遇到的一些问题等&#xff0c;如果刚开始学习Java的小伙伴可以点击下方连接查看专栏 本专栏地址&#xff1a;&#x1f525;JDBC Java入门篇&#xff1a; &#x1f525;Java基础学习篇 Java进阶学习篇&#x…

C语言编程题

1、求斐波那契数列1&#xff0c;1&#xff0c;2&#xff0c;3&#xff0c;5&#xff0c;8……前20项之和 #include<stdio.h> int main() {int i,j,k,t2;ij1;printf("%d %d\n",i,j);for(k0;k<9;k){iij;jij;ttij;printf("%d %d\n",i,j);}printf(&q…

java七大查找 十大排序 贪心

七大查找 1.1二分查找(前提是 数据有序)说明&#xff1a;元素必须是有序的&#xff0c;从小到大&#xff0c;或者从大到小都是可以的。public static int binarySearc(int[] arr,int number){int min0;int maxarr.length-1;while(true){if(min>max){return -1;}int mid(maxm…

c++二插搜索树

1二插搜索树的概念 二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树&#xff0c;或者是具有以下性质的二叉树: ​ 若它的左子树不为空&#xff0c;则左子树上所有节点的值都小于根节点的值 ​ 若它的右子树不为空&#xff0c;则右子树上所有节点的值都大于根节点的值 …

mongodb shell

连接指定数据库 .\mongosh.exe localhost:27017/test不连接数据库 .\mongosh.exe --nodb然后连接数据库 conn new Mongo("localhost:27017") /// mongodb://localhost:27017/?directConnectiontrue&serverSelectionTimeoutMS2000 db conn.getDB("test&q…

Git学习笔记(黑马)

目录 一、获取本地仓库 二、为常用指令配置别名 三、基础操作指令 四、分支 五、Git远程仓库&#xff08; 码云Gitee&#xff09; &#xff08;一&#xff09;配置SSH公钥 &#xff08;二&#xff09;Gitee设置账户公钥 六、操作远程仓库 &#xff08;一&#xff09;添…

【数据结构】详谈复杂度

目录 1.前言 2.什么是复杂度 3.如何计算时间复杂度 1.引例 2.二分查找 3.常见的复杂度 4.如何计算空间复杂度 5.关于递归 6.总结 1.前言 我们在做一些算法题时&#xff0c;经常会发现题目会对时间复杂度或者空间复杂度有所要求&#xff0c;如果你不知道什么是复杂度时&am…

SQL--DDL

目录 一、数据库的相关概念 二、MySQL数据库 1. 关系型数据库&#xff08;RDBMS&#xff09; 2. 数据数据库 3. MySQL客户端连接的两种方式 方式一&#xff1a;使用MySQL提供的客户端命令行工具 方式二&#xff1a;使用系统自带的命令行工具执行指令 三、SQL SQL的…

【C++】深浅拷贝

最近一些老铁一直问我深浅拷贝的问题&#xff0c;今天我们就来介绍一下深浅拷贝在说深浅拷贝构造之前&#xff0c;我们先介绍一下拷贝构造函数的应用场景&#xff1a;使用另一个同类型的对象来初始化新创建的对象。浅拷贝我们在学类和对象时了解到了类的6大默认函数&#xff0c…