Redis【实战篇】---- Redis消息队列

news2025/1/19 22:16:53

Redis【实战篇】---- Redis消息队列

  • 1. Redis消息队列 - 认识消息队列
  • 2. Redis消息队列 - 基于List实现消息队列
  • 3. Redis消息队列 - 基于PubSub的消息队列
  • 4. Redis消息队列 - 基于Stream的消息队列
  • 5. Redis消息队列 - 基于Stream的消息队列-消费组
  • 6. 基于Redis的Stream结构作为消息队列,实现异步秒杀下单

1. Redis消息队列 - 认识消息队列

什么是消息队列:字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

在这里插入图片描述

使用队列的好处在于 **解耦:**所谓解耦,举一个生活中的例子就是:快递员(生产者)把快递放到快递柜里边(Message Queue)去,我们(消费者)从快递柜里边去拿东西,这就是一个异步,如果耦合,那么这个快递员相当于直接把快递交给你,这事固然好,但是万一你不在家,那么快递员就会一直等你,这就浪费了快递员的时间,所以这种思想在我们日常开发中,是非常有必要的。

这种场景在我们秒杀中就变成了:我们下单之后,利用redis去进行校验下单条件,再通过队列把消息发送出去,然后再启动一个线程去消费这个消息,完成解耦,同时也加快我们的响应速度。

这里我们可以使用一些现成的mq,比如kafka,rabbitmq等等,但是呢,如果没有安装mq,我们也可以直接使用redis提供的mq方案,降低我们的部署和学习成本。

2. Redis消息队列 - 基于List实现消息队列

基于List结构模拟消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。

队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

在这里插入图片描述

基于List的消息队列有哪些优缺点?
优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

3. Redis消息队列 - 基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg :向一个频道发送消息
PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道

在这里插入图片描述

基于PubSub的消息队列有哪些优缺点?
优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

4. Redis消息队列 - 基于Stream的消息队列

Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

发送消息的命令:

在这里插入图片描述

例如:

在这里插入图片描述

读取消息的方式之一:XREAD

在这里插入图片描述

例如,使用XREAD读取第一个消息:

在这里插入图片描述

XREAD阻塞方式,读取最新的消息:

在这里插入图片描述

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下

在这里插入图片描述

注意:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题

STREAM类型消息队列的XREAD命令特点:

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

5. Redis消息队列 - 基于Stream的消息队列-消费组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

在这里插入图片描述

创建消费者组:

在这里插入图片描述

key:队列名称
groupName:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列
其它常见命令:

删除指定的消费者组

XGROUP DESTORY key groupName

给指定的消费者组添加消费者

XGROUP CREATECONSUMER key groupname consumername

删除消费者组中的指定消费者

XGROUP DELCONSUMER key groupname consumername

从消费者组读取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间
  • NOACK:无需手动ACK,获取到消息后自动确认
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID:
  • “>”:从下一个未消费的消息开始
    其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

消费者监听消息的基本思路:

在这里插入图片描述

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次

最后我们来个小对比

在这里插入图片描述

6. 基于Redis的Stream结构作为消息队列,实现异步秒杀下单

需求:

  • 创建一个Stream类型的消息队列,名为stream.orders
  • 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
  • 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

修改lua表达式,新增3.6
在这里插入图片描述

VoucherOrderServiceImpl

    private class VoucherOrderHandler implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
                    );
                    // 2.判断订单信息是否为空
                    if (list == null || list.isEmpty()) {
                        // 如果为null,说明没有消息,继续下一次循环
                        continue;
                    }
                    // 解析数据
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    // 3.创建订单
                    createVoucherOrder(voucherOrder);
                    // 4.确认消息 XACK
                    stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理订单异常", e);
                    //处理异常消息
                    handlePendingList();
                }
            }
        }

        private void handlePendingList() {
            while (true) {
                try {
                    // 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create("stream.orders", ReadOffset.from("0"))
                    );
                    // 2.判断订单信息是否为空
                    if (list == null || list.isEmpty()) {
                        // 如果为null,说明没有异常消息,结束循环
                        break;
                    }
                    // 解析数据
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    // 3.创建订单
                    createVoucherOrder(voucherOrder);
                    // 4.确认消息 XACK
                    stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
                } catch (Exception exception) {
                    log.error("处理pendding订单异常", exception);
                    try{
                        Thread.sleep(20);
                    }catch(Exception e){
                        e.printStackTrace();
                    }
                }
            }
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/719142.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

环肽抑制剂:1088715-84-7,LY2510924,LY2510924游离态,试剂相关特点说明

​编辑资料作者&#xff1a;陕西新研博美生物科技有限公司小编MISSwu​ LY2510924&#xff0c;LY2510924游离态&#xff0c;拮抗剂LY2510924&#xff0c;环肽&#xff0c;抑制剂 Product structure&#xff1a; Product specifications&#xff1a; 1.CAS No&#xff1a;10887…

机器学习基础之《特征工程(1)—数据集》

一、数据集 1、目标 知道数据集分为训练集和测试集 会使用sklearn的数据集 2、可用数据集 公司内部&#xff0c;比如百度、微博 数据接口&#xff0c;花钱 政府拥有的数据集 3、在学习阶段用到的数据集 scikit-learn特点&#xff1a; &#xff08;1&#xff09;数据量较小 &…

SpringBoot配置外部Tomcat项目启动流程源码分析

前言 SpringBoot应用默认以Jar包方式并且使用内置Servlet容器(默认Tomcat)&#xff0c;该种方式虽然简单但是默认不支持JSP并且优化容器比较复杂。故而我们可以使用习惯的外置Tomcat方式并将项目打War包。 【1】创建项目并打War包 ① 同样使用Spring Initializer方式创建项目 …

宝塔Panel搭建Python环境

服务器安装python环境 找到软件商店 应用搜索 输入&#xff1a;python 安装Python项目管理器2.4 开启首页显示 回到首页 找到python管理器并点击进入 安装对应的python版本 到这里 服务器就可以告一段落了 在本地开发服务端应用并上传服务器 将写好的python应用 导出依赖…

蓝桥杯专题-真题版含答案-【世纪末的星期】【猜年龄】【组素数】【第39级台阶】

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例点击跳转>软考全系列点击跳转>蓝桥系列 &#x1f449;关于作者 专注于Android/Unity和各种游…

antv-x6在vue中使用:拖拽 Dnd、Stencil——以小诺管理平台为例

1、说明 由于antv-x6刚刚开放不久,一方面网上资料很少,此外antv目前官方的实例基本都是以react作为demo进行演示,所以vue的示例几乎没有,自己按照官方文档的react写了一个vue版本,仅供参考。 2、效果 先看一下demo的效果,如下所示 这是官方网文档的效果,同时官方也给…

【Spark】SparkCore

文章目录 RDD特点&#xff1a;弹性分布式数据集数据抽象不可变可分区、并行计算分区列表分区计算函数RDD 之间的依赖关系分区器&#xff08;可选&#xff09;首选位置&#xff08;可选&#xff09; 执行原理启动 Yarn 集群环境Spark 通过申请资源创建调度节点和计算节点Spark 框…

ue4:Dota总结_BP_CameraPawn篇

设计wasd移动&#xff1a; 鼠标拖动视口&#xff1a; 鼠标滚轮调整远近&#xff1a; Beginplay&#xff1a; qe按键旋转&#xff1a; 变量&#xff1a;

Python练手项目-学生成绩管理系统

之前在最初的学习中&#xff0c;我就写过一个Python的学生管理系统&#xff0c;但是那个很粗糙&#xff0c;很简陋 今天在学习过程中&#xff0c;再次拿出来&#xff0c;重新优化书写 希望对初学者以及需要的朋友有帮助。 文章目录 系统开发环境&#xff1a;1.实现功能2.系统设…

css3动画应用

按钮边框 按钮 首先实现正常边框按钮&#xff0c; 其次按钮相对定位&#xff0c;因为旋转的内容需要绝对定位&#xff0c; 旋转内容做伪元素处理&#xff0c;z-index设置负数是为了把按钮文字显示出来&#xff0c; 定位一半一半是把长方块中心点放按钮正中&#xff0c; 变形原…

照片jpg大小kb如何修改?图片在线压缩大小怎么处理?

最近需要在各种报名平台上传照片的小伙伴比较多&#xff0c;难免会遇到需要压缩jpg图片的情况&#xff0c;那么怎么才能将jpg图片压缩&#xff08;https://www.yasuotu.com/jpg&#xff09;呢&#xff1f;今天介绍一个图片在线压缩大小的方法&#xff0c;不用下载任何软件就可以…

内嵌 iframe 实现PDF预览

效果图如下&#xff1a; 代码如下&#xff1a; <template><div><!-- 控制浮层显示隐藏 --><el-button type"primary" size"small" class"btn" click"dialogVisible true">PDF 预览 (内嵌 iframe)</el-but…

pwn(7.4小学期)

Ret2libc 先查一下壳 32位&#xff0c;放入IDA中看看 查看一下 vulnerable_function();这个函数 Read函数&#xff0c;很明显的栈溢出 但是观察半天&#xff0c;发现这里并没有system函数 字符搜索我们看到了libc.so.6 可以想到libc函数库以及 PLT表中的代码会根据函数在…

[已成功破解] 阿里 taobao 滑条验证码 x5sec解密 slidedata参数

[已破解] 阿里 taobao 滑条验证码 x5sec解密 slidedata参数 今天在爬tb数据的时候 发现老是会触发一个滑块验证 只要过了这个滑块将滑块返回的x5secdata 的cookie 带到请求参数里面去 就能完美避开了 然后去抓了下滑块的包 过了这个滑块拿到cookie就能 访问阿里一直获取数据…

FPGA纯verilog实现UDP协议栈 AXIS用户接口,可替代Tri Mode Ethernet MAC,提供三套工程源码和技术支持

目录 1、前言2、我这里已有的UDP方案3、该UDP协议栈性能4、详细设计方案网络PHYRGMII转GMII模块AXIS FIFOUDP协议栈 5、vivado工程1-->B50610 工程6、vivado工程1-->RTL8211 工程7、vivado工程1-->88E1518 工程8、上板调试验证并演示准备工作查看ARPUDP数据回环测试 9…

1.5 基于MyBatis数据库逆向生成工具,并创建serverice和controller控制层

步骤1&#xff1a;在mybatis-generator中添加要生成的数据库表名 在genratorConfig.xml内容: <!-- 数据库表 --> <table tableName"stu"></table>步骤2&#xff1a;StuMapper.xml和StuMapper拷贝到对应的mapper模块下 步骤3&#xff1a;pojo对应…

计算机视觉颜色校正方法

计算机视觉颜色校正方法 调色和色彩矫正之间的区别直方图均衡化原理实现代码 CCM颜色校正矩阵原理 深度学习Deep_White_Balance什么是sRGB图像问题描述&#xff1a;方法概述&#xff1a;模型架构&#xff1a;训练损失函数实现快速开始 调色和色彩矫正之间的区别 调色是指通过调…

Vue3使用element-plus实现弹窗效果-demo

使用 <ShareDialog v-model"isShow" onChangeDialog"onChangeDialog" /> import ShareDialog from ./ShareDialog.vue; const isShow ref(false); const onShowDialog (show) > {isShow.value show; }; const onChangeDialog (val) > {co…

使用AI人工智能给线稿上色,给漫画上色

【深度学习】AIGC &#xff0c;ControlNet 论文中的图。 一些酷炫的可以做纵深领域的图&#xff0c;这里再放一下。 下图是由手绘草稿给出图&#xff1a; 由线稿得到图&#xff0c;给漫画上色&#xff1a;

简单的PWN学习-ret2shellcode

最近笔者开始钻研pwn的一些知识&#xff0c;发现栈溢出真的非常的有意思&#xff0c;于是经过一个多礼拜的学习&#xff0c;终于是把2016年的一道CTF题给看明白了了&#xff0c;首先我们学习一下前置技能 0x01 shellcode​ 首先简单看一下shellcode是怎么生成的&#xff0c;首…