基于redis实现消息队列(更推荐使用专业的mq)

news2024/12/21 18:37:02

目录

 利用redis实现消息队列(基于list,点对点模型)——lpush存放队列(lpush 队列名 队列内容(可一次存放多个内容,用空格隔开)) brpop取队列(brpop 队列名 等待时间单位秒(一次取一个))

 基于pubsub实现消息队列(发布订阅模型)

 基于stream的消息队列

方法一:

存放消息

 读取消息

​编辑  案例

 特点:

 方法二:消费者组

 创建

 读取

 java实现思路

 特点

三种方式的对比:

 案例,更改之前的案例


基于基于秒杀-----分布式锁----lua脚本_xzm_的博客-CSDN博客改进

 利用redis实现消息队列(基于list,点对点模型)——lpush存放队列(lpush 队列名 队列内容(可一次存放多个内容,用空格隔开)) brpop取队列(brpop 队列名 等待时间单位秒(一次取一个))

优缺点:

 基于pubsub实现消息队列(发布订阅模型)

 

 优缺点:

 基于stream的消息队列

方法一:

存放消息

 读取消息

  案例

 特点:

 方法二:消费者组

 创建

 读取

 java实现思路

 特点

三种方式的对比:

 个人感觉:mq>stream>list>pubsub

 案例,更改之前的案例

创建消息队列

 修改Lua秒杀脚本

---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by Lenovo.
--- DateTime: 2023/5/30 16:55
---

-- 1.参数列表
-- 1.1 . 优惠卷id
local voucherId= ARGV[1]
-- 1.2. 用户id
local userId = ARGV[2]
-- 1.3 订单id
local orderId = ARGV[3]

-- 2.数据key
--2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
--2.2 订单key
local orderKey = 'seckill:order:' .. voucherId


-- 3. 脚本业务
-- 3.1. 判断库存是否充足 get stockKey
if (tonumber(redis.call('get',stockKey)) <= 0 ) then
    -- 3.2. 库存不足,返回1
    return 1
end

-- 3.2. 判断用户是否下单 SISMEMBER orderKey userId
if (redis.call('SISMEMBER',orderKey,userId) == 1) then
    -- 3.3. 存在,说明是重复下单,返回2
    return 2
end
-- 3.4. 扣库存 incrby stockKey -1
redis.call('incrby',stockKey, -1)
-- 3.5. 下单(保存用户) sadd orderKey userId
redis.call('sadd',orderKey,userId)
-- 3.6 发送消息到队列中,xadd stream.orders * k1 v1 k2 v2 ...
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
return 0

修改java代码实现发送消息

/**
     * 基于stream的实现秒杀
     * @param voucherId
     * @return
     */
    public Result seckillVoucher(Long voucherId) {
        //获取用户
        Long userId = UserHolder.getUser().getId();
        //生成订单id
        long orderId = redisIdWorker.nextId("order");
        //执行lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,//脚本文件
                Collections.emptyList(),//key的集合
                voucherId.toString(), userId.toString()  ,String.valueOf(orderId)//三个ARGV
        );
        //将返回值转换为int
        int i = result.intValue();
        //判断结果是否为零
        if (i != 0 ){
            //不为零,无法购买 ---1为库存不足,2为用户已经下单
            return Result.fail(i == 1 ? "库存不足" : "不能重复下单");
        }
        //可以购买
        //获取代理对象
        proxy = (IVoucherOrderService) AopContext.currentProxy();
        //返回订单id
        return Result.ok(orderId);
    }

 实现消息的消费

 //创建线程池
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    @PostConstruct
    private void init(){
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    private class VoucherOrderHandler implements Runnable{
        String queueName="stream.orders";
        @Override
        public void run() {
            while (true){
                try {
                    //1.获取消息队列中的订单消息,xreadgroup group g1 c1 count 1 block 2000 streams streams.order >
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );
                    //2.判断消息是否获取成功
                    if (list ==null || list.isEmpty()){
                        //获取失败,进行下一次循环
                        continue;
                    }
                    //3.解析消息中的订单信息
                    MapRecord<String, Object, Object> entries = list.get(0);
                    Map<Object, Object> value = entries.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    //2.创建订单
                    handleVoucherOrder(voucherOrder);
                    //ack确认 ack stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",entries.getId());
                } catch (Exception e) {
                    log.error("处理订单异常",e);
                    handlePendingList();
                }
            }

        }

        private void handlePendingList(){
            while (true){
                try {
                    //1.获取pending-list中的订单消息,xreadgroup group g1 c1 count 1  streams streams.order >
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
                    //2.判断消息是否获取成功
                    if (list ==null || list.isEmpty()){
                        //获取失败,退出循环
                        break;
                    }
                    //3.解析消息中的订单信息
                    MapRecord<String, Object, Object> entries = list.get(0);
                    Map<Object, Object> value = entries.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    //2.创建订单
                    handleVoucherOrder(voucherOrder);
                    //ack确认 ack stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",entries.getId());
                } catch (Exception e) {
                    log.error("处理pending-list订单异常",e);
                    try {
                        Thread.sleep(20);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
    }

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/612051.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器视觉日常习题(更新中。。。)

目录 第二讲 图像处理概述 第二讲 图像处理概述 几何变换&#xff1a;又称为图像空间变换&#xff0c;它将一幅图像中的坐标位置映射到另一幅图像中的新坐标位置。图像分割&#xff1a;把图像分成各具特色的区域并提取感兴趣目标的技术和过程。图像边缘&#xff1a; 平滑&#…

【深入浅出 Spring Security(六)】一文搞懂密码的加密和比对

Spring Security 中的密码加密 一、PasswordEncoder 详解常见的实现类&#xff08;了解&#xff09;DelegatingPasswordEncoder源码分析DelegatingPasswordEncoder 在哪实例化的&#xff1f; 二、自定义加密自定义方式一&#xff1a;使用{id}的形式自定义方式二&#xff1a;向S…

程序员失业转行送外卖,晒出当天收入,还以为看错了!

在程序员的共识中&#xff0c;30岁之前自己是很受企业欢迎的&#xff0c;有经验有技术&#xff0c;能够为公司创造足够多的价值。 但是一旦超过了35岁&#xff0c;如果没有做到架构师或者成为管理人员&#xff0c;那很可能是连工作都找不到的。而且这个年龄的程序员还要面临着…

700MHz设备对广播电视信号的干扰有哪些?

700MHz&#xff0c;由于其较长的波长&#xff0c;良好的传播与覆盖特性&#xff0c;不仅一直被多国用作广播电视信号频率&#xff0c;4G LTE和5G NR也同样看好这一频段&#xff0c;并在此频段上进行了相应的部署和规划。目前已经有超过45个国家和地区&#xff0c;将700MHz频段部…

【网络安全】企业应急响应基础技能

windows 任务计划列表 1. 计算机管理窗口,选择 系统工具 中 任务计划程序 中的 任务计划程序库选项 可以查看任务计划的名称,状态,触发器等详细信息 2.powershell中输入get-scheduledtask 可以查看当前系统所有任务计划信息 任务路径,名称,状态等详细信息 3.命令行中输入s…

C++11:右值引用,实现移动语义和完美转发

目录 1、右值引用 2、移动语义&#xff08;std::move&#xff09; 3、完美转发&#xff08;std::forward&#xff09; 1、右值引用 右值引用&#xff08;Rvalue reference&#xff09;是C11引入的一个新特性&#xff0c;它是一种新的引用类型&#xff0c;用于表示将要被移动…

5个小时,搭出2套应用,这一低代码平台很强劲!

现代管理学之父德鲁克提及创新本质时&#xff0c;说了两点&#xff1a; 一是让昂贵的东西变得便宜&#xff0c;老百姓能用&#xff1b;二是让高门槛东西变得低门槛&#xff0c;普通人可用。 而低代码正符合这两个条件。 一、背景 所谓低代码&#xff0c;是一种软件开发方法&…

常用的LED显示屏驱动芯片和控制系统

常用的LED显示屏驱动芯片包括以下几种&#xff1a; TPIC6B595&#xff1a;这是一种串行输入、并行输出的LED显示屏驱动芯片&#xff0c;适用于驱动7段数码管等简单的LED显示屏。 MAX7219/MAX7221&#xff1a;这是一种常用的LED显示屏驱动器&#xff0c;可驱动8x8点阵LED显示屏。…

npm 发布新包或者新模块后,无法下载最新版本,如何解决?

目录 1、方法一&#xff1a;在npm官网搜索对应的模块&#xff0c;看看是否有最新版本的存在&#xff1f; 2、方法二&#xff1a;排查本地使用的是什么镜像&#xff1f; 3、方法三&#xff1a;将淘宝镜像切换成npm镜像 1、方法一&#xff1a;在npm官网搜索对应的模块&#xf…

运维小白必学篇之基础篇第十四集:DHCP中继实验

DHCP中继实验 目录 服务器端&#xff1a;&#xff08;vmware5&#xff09; 中继器端&#xff1a;(双网卡ens33、vmware5&#xff1b;ens36、vmware6&#xff09; 客户端&#xff1a;&#xff08;vmware6&#xff09; 实验作业&#xff08;主机名为自己的名字&#xff09;&a…

WebGIS学习-01-GIS基础概念与Mapbox基础

1.地图数据来源 1.栅格数据&#xff1a; -.jpg&#xff0c;.png等图片数据&#xff1b; -卫星等拍摄的影像&#xff1b;.tiff 2.矢量数据&#xff1a; -geojson的数据&#xff0c;多用于绘制边界 -放大缩小都不会失真&#xff0c;且高度支持手绘 2.网页是如何渲染地图数据的 …

什么是压力测试?如何进行Jmeter压力测试

一、什么是压力测试 软件测试中&#xff1a;压力测试&#xff08;Stress Test&#xff09;&#xff0c;也称为强度测试、负载测试。压力测试是模拟实际应用的软硬件环境及用户使用过程的系统负荷&#xff0c;长时间或超大负荷地运行测试软件&#xff0c;来测试被测系统的性能、…

深度学习-调参技巧总结

针对CNN优化的总结 使用没有 batchnorm 的 ELU 非线性或者有 batchnorm 的 ReLU。用类似1*1的网络结构预训练RGB数据&#xff0c; 能得到更好的效果。使用线性学习率衰退策略。使用平均和最大池化层的和。使用大约 128&#xff08;0.005&#xff09; 到 256 &#xff08;0.01&a…

5.1 - Web漏洞 - XSS漏洞详解

「作者简介」&#xff1a;CSDN top100、阿里云博客专家、华为云享专家、网络安全领域优质创作者 「推荐专栏」&#xff1a;对网络安全感兴趣的小伙伴可以关注专栏《网络安全入门到精通》 XSS漏洞 一、什么是XSS&#xff1f;二、XSS概述三、靶场练习四、XSS使用步骤五、XSS攻击类…

el-table动态表头数据渲染

desc&#xff1a;el-table的数据是后端动态返回的&#xff0c;包括表头的情况下如何进行渲染 // 第一页的数据 {"code": 1,"message": "查询成功","data": {"current": 1,"size": 10,"total": 41,"…

企业如何通过CRM系统做好客户管理?

企业如何通过CRM系统做好客户管理&#xff1f; CRM客户管理作为一种综合性的管理思想&#xff0c;可以帮助我们更好地了解客户需求和行为&#xff0c;制定更加精准的销售和服务策略&#xff0c;提高客户满意度和忠诚度&#xff0c;从而实现可持续发展。 接下来将给大家详细介…

SQL——事务

&#x1f388; 什么是事务 &#x1f4a7; 概念 事务是用于保证数据的一致性,它由一组相关的DML&#xff08;增、删、改&#xff09;语句&#xff0c;该组的DML语句要么全部成功&#xff0c;要么全部失败。使用事务可以确保数据库的一致性和完整性&#xff0c;避免数据出现异常…

“微商城”项目(5登录和注册)

1.我的信息 在pages\User.vue文件中编写HTML结构代码&#xff0c;示例代码如下。 <template><div class"member"><div class"header-con"><router-link :to"{ name: login }" class"mui-navigate-right">&l…

段 寄 存 器-汇编复习(5)

图解演示8086CPU CS执行过程和逻辑 段 寄 存 器 8086CPU 在访问内存时要由相关部件提供内存单元的段地址和偏移地址,送入地址加法器合成物理地址。这里,需要看一下,是什么部件提供段地址。段地址在 8086CPU 的段寄存器中存放。8086CPU 有 4 个段存器: CS、DS、SS、ES。当80…

力扣刷题笔记——动态规划

动态规划基础 简称DP&#xff0c;如果某⼀问题有很多重叠⼦问题&#xff0c;使⽤动态规划是最有效的。 动态规划中每⼀个状态⼀定是由上⼀个状态推导出来的 1. 确定dp 数组&#xff08; dp table &#xff09;以及下标的含义 2. 确定递推公式 3. dp 数组如何初始化 4. 确定遍…