Redis实战案例21-消息队列

news2024/11/24 15:01:55

1. 基于JVM的阻塞队列的局限

  1. JVM内存限制问题,大量订单出现时,可能会超过JVM阻塞队列上限;
  2. 阻塞队列并不能持久化,因为内存不能持久化,出现异常或者宕机之类的故障时,出现数据丢失;

所以引出消息队列的概念

消息队列的两个优点:

  1. 消息队列在JVM以外的独立服务,不受JVM的内存限制;
  2. 消息队列不仅仅做数据存储,确保数据安全,会做数据的持久化,并且消费者取数据要做消息确认;如果没有确认,那么消息会在队列中依旧存在,下一次会再投递给消费者,让它继续处理,直到确认为止,确保消息至少消费一次;

在这里插入图片描述

在这里插入图片描述

2. 基于List结构模拟消息队列

在这里插入图片描述

  1. 假设从队列里取到消息,取到还未处理就发生了异常,这是消息就无法处理了,因为pop相当于remove;
  2. 发送消息,一旦被消费者拿走之后,别的消费者就无法获得了,无法解决一条消息多个消费者使用;

在这里插入图片描述

3. 基于PubSub的消息队列(不建议使用)

在这里插入图片描述

PSUBSCRIBE 订阅的格式匹配的三种规则

在这里插入图片描述
在这里插入图片描述

PubSub消息队列在传递消息时并不会将消息持久化到硬盘上,而是将消息存储在内存中,当服务重启或者发生故障时,可能会导致消息丢失。

在这里插入图片描述

4. 基于Stream的消息队列

在这里插入图片描述
在这里插入图片描述

$:返回最新的消息,前提是该条信息并没有被消费者读过,否则就是返回nil

在这里插入图片描述

阻塞等待读取最新的消息,阻塞时间设置为0表示永久等待直到有新等待消息

在这里插入图片描述

在这里插入图片描述

重点:这种读取方式存在着弊端,当指定其实ID为$时,代表读取最新等待消息,此时处理一条消息的过程中,又来了一条以上的信息到队列,则下次获取也只能获取最新的一条,可能就会出现漏读消息的问题;

当消费者读取一次之后,再生产k4、k5,此时消费者再次阻塞读取最新的消息,再生产消息k6,此时消费者只能获取消息k6,出现了消息漏读;
在这里插入图片描述

5. 基于Stream的消息队列问题优化

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
示例:

在这里插入图片描述
消息确认(k1…k5都在pending-list中等待确认)、

在这里插入图片描述
查看pending-list中所有未确认的元素

在这里插入图片描述
从pendin-list确认未确认的消息,此时消息的起始ID为0

在这里插入图片描述

所以可以得出处理消息的大致流程:先利用>的方式去获取所有未消费的消息,然后确认,如果出现异常,在Java中catch采用0的方法去获取pending-list的消息(异常消息),处理完毕再确认,pending-list清空;之后再使用>的方式继续获取未消费的消息,直到阻塞时间过后返回为nil;

在这里插入图片描述

6. 基于Redis的Stream结构实现异步秒杀

在这里插入图片描述

创建消息队列

在这里插入图片描述

修改lua脚本,认定可以抢购直接发送消息到消息队列中

注意

如果 redis.call('get', stockKey) 返回的结果是空值(nil),那么尝试将空值转换为数字时会出现错误。
因为无法将空值转换为数字。
为了避免这种错误,可以在进行比较之前,先检查返回结果是否为非空值。
这样,如果 redis.call('get', stockKey) 返回的结果是空值,就不会进行比较,从而避免了错误。

-- 1.参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]
-- 1.3 订单id
local orderId = ARGV[3]

-- 2.数据key
-- 2.1 库存key
local stockKey = 'seckill:stock' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order' .. voucherId

-- 3.脚本业务
-- 3.1 判断库存是否充足
local stock = redis.call('get', stockKey)
if stock and tonumber(stock) <= 0 then
    -- 3.2 库存不足 返回1
    return 1
end
-- 3.2 判断用户是否下单
if(redis.call('sismember', orderKey, userId) == 1) then
    -- 3.3 存在,说明是重复下单,返回2
    return 2
end
-- 3.4 扣库存
redis.call('incrby', stockKey, -1)
-- 3.5 下单(保存用户)
redis.call('sadd', orderKey, userId)
-- 3.6 发送消息到队列中,XDD stream.orders * k1 v1 k2 v2...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)

return 0

秒杀的逻辑修改(修改之前的阻塞队列方法)

@Override
public Result seckillVoucher(Long voucherId) {
    // 获取用户id
    Long userId = UserHolder.getUser().getId();
    // 订单id(生成唯一ID)
    long orderId = redisIdWorker.nextId("order");
    //1. 执行lua脚本(判断购买资格,发送订单信息到消息队列)
    Long result = stringRedisTemplate.execute(
            SECKILL_SCRIPT,
            Collections.emptyList(),// key参数为0,所以参数传空集合
            voucherId.toString(),
            userId.toString(),
            String.valueOf(orderId)
    );
    //2. 判断是否为0
    int i = result.intValue();
    if(i != 0) {
        //2.1 不为0,没有购买资格
        return Result.fail(i == 1 ? "库存不足" : "不能重复下单");
    }
    //3. 获取代理对象
    proxy = (IVoucherOrderService) AopContext.currentProxy();
    //4. 返回订单id
    return Result.ok(0);
}

开启线程任务

/**
 * 异步线程,从消息队列中取出订单信息,执行保存订单到数据库
 */
private class VoucherOrderHandler implements Runnable{
    String queueName = "stream.orders";
    @Override
    public void run() {
        while (true){
            try {
                // 1. 获取消息队列中的订单信息
                // XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order >
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                        Consumer.from("g1", "c1"),
                        StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                        StreamOffset.create(queueName, ReadOffset.lastConsumed())
                );
                // 2. 判断消息获取是否成功
                if(list == null || list.isEmpty()) {
                    // 2.1 如果获取失败,说明没有消息,继续下一次循环
                    continue;
                }
                // 3. 解析消息中的订单信息,键值类型参考脚本中的'userId', userId, 'voucherId', voucherId, 'id', orderId
                // 前者String指的是消息ID, <Object, Object>指的是上述键值对
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> values = record.getValue();
                // 3.1 转为voucherOrder对象
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                // 4. 如果有,获取成功,可以下单
                handleVoucherOrder(voucherOrder);
                // 5. ACK确认
                // SACK stream.orders g1 id
                stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
            } catch (Exception e) {
                log.error("订单异常信息",e);
                handlePendingList();
            }
        }
    }
    /**
     * 订单异常消息。采用0的方法去处理pending-list中的消息,进行ACK确认
     */
    private void handlePendingList() {
        while (true){
            try {
                // 1. 获取消息队列中的订单信息
                // XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order 0
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                        Consumer.from("g1", "c1"),
                        StreamReadOptions.empty().count(1),
                        StreamOffset.create(queueName, ReadOffset.from("0"))
                );
                // 2. 判断消息获取是否成功
                if(list == null || list.isEmpty()) {
                    // 2.1 如果获取失败,说明pending-list没有异常消息,继续下一次循环
                    break;
                }
                // 3. 解析消息中的订单信息,键值类型参考脚本中的'userId', userId, 'voucherId', voucherId, 'id', orderId
                // 前者String指的是消息ID, <Object, Object>指的是上述键值对
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> values = record.getValue();
                // 3.1 转为voucherOrder对象
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                // 4. 如果有,获取成功,可以下单
                handleVoucherOrder(voucherOrder);
                // 5. ACK确认
                // SACK stream.orders g1 id
                stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
            } catch (Exception e) {
                log.error("订单异常信息",e);
                try {
                    Thread.sleep(20);
                } catch (InterruptedException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/772111.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux内核的任务:

硬件与软件之间的中间层&#xff1a;内核在技术层面上充当硬件和软件之间的中间层&#xff0c;负责将应用程序的请求传递给硬件&#xff0c;并处理硬件设备和组件的寻址和操作。 应用程序的接口&#xff1a;对于应用程序来说&#xff0c;内核是它们与硬件之间的接口。应用程序通…

基于 chinese-roberta-wwm-ext 微调训练 6 分类情感分析模型

一、模型和数据集介绍 1.1 预训练模型 chinese-roberta-wwm-ext 是基于 RoBERTa 架构下开发&#xff0c;其中 wwm 代表 Whole Word Masking&#xff0c;即对整个词进行掩码处理&#xff0c;通过这种方式&#xff0c;模型能够更好地理解上下文和语义关联&#xff0c;提高中文文…

NAS 问题处理记录

在解决自动配网的过程中&#xff0c;突然NAS不给力&#xff0c;偏偏这个时间找事情。上面这两个问题&#xff0c;说不复杂也不复杂&#xff0c;主要是自己在完全远程处理&#xff0c;很多不方便。当然少不了师弟的助攻&#xff0c;很感谢我的师弟帮忙&#xff0c;实验室的网络不…

Flink 启动就报错,但exception没提示。其中一个task failure 该怎么办?

文章目录 前言一、排查二、解决 前言 最近我在生产又遇到一个问题&#xff0c;就是消费着一段时间之后&#xff0c;忽然就不再消费了&#xff0c;但也不报错。观察了几次&#xff0c;我发现时间基本是停留在上下班高峰期数据量最大的时候。我主观猜测可能是同时间进来的数据过…

css通过子元素选择父元素

伪类:has选择父元素 td:has(> .unfoldTable){//可选中所有td下包含unfoldTable的class标签的td属性color: red; }td:has(> div){//可选中所有td下包含div标签的td属性color: red; } 特殊举例分析&#xff1a; 个别UI框架个别标签通过事件直接生成或者无法选中的情况。…

爆肝整理,Postman接口测试-全局变量/接口关联/加密/解密(超细)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 全局变量和环境变…

随手记——前端安全策略 Content-Security-Policy – CSP

随手记——前端安全策略 Content-Security-Policy – CSP 一、问题 1. 问题&#xff1a;前端meta标签中配置了CSP安全策略&#xff0c;导致使用第三方地图插件的时间报错不展示 2. 原安全配置&#xff1a; <meta http-equiv"Content-Security-Policy" content&…

STM32MP157驱动开发——按键驱动(查询方式)

文章目录 概述APP 读取按键的 4 种方法查询方式休眠-唤醒方式poll 方式异步通知方式 查询方式的按键驱动程序&#xff08;框架&#xff09;按键驱动编写思路board_xxx.cbutton_drv.cbutton_drv.hbutton_test.cMakefile编译测试 查询方式的按键驱动程序(stm32mp157)board_stm32m…

常见Redis使用问题

一 lettuce使用问题 1 问题描述 Redis Cluster集群&#xff0c;当master宕机&#xff0c;主从切换&#xff0c;客户端报错 timed out 2 原因 SpringBoot2.X版本开始Redis默认的连接池都是采用的Lettuce。当节点发生改变后&#xff0c;Letture默认是不会刷新节点拓扑的。 3…

Spring+SpringMvc+Mybatis整合小Demo

原始方式整合SSM 不使用spring-mybatis包 项目内容 整合ssm完成对account表新增和查询的操作 项目大体结构 创建mavenWeb项目 pom文件中引入依赖 spring核心、aspectj(aop)、spring-jdbc(jdbcTemplate)、spring-tx(事务)、 数据源&#xff1a;mysql、c3p0、mybatis my…

linux之Ubuntu系列 find 、 ln 、 tar、apt-get 指令 软链接和硬链接

查找文件 find 命令 功能非常强大&#xff0c;通常用来在 特定的目录下 搜索 符合条件的文件 find [path] -name “.txt” 记得要加 “ ” 支持通配符 &#xff0c;正则表达式 包括子目录 ls 不包括 子目录 如果省略路径&#xff0c;表示 在当前路径下&#xff0c;搜索 软链接…

测试老鸟总结,性能测试-最佳并发和最大并发,性能测试实施...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 性能测试&#xf…

无涯教程-Javascript - Switch语句

从JavaScript 1.2开始&#xff0c;您可以使用 switch 语句来处理这种情况&#xff0c;它比重复的 if ... else if 语句更有效。 流程图 以下流程图说明了switch-case语句的工作原理。 switch 语句的目的是给出一个要求值的表达式&#xff0c;并根据表达式的值执行多个不同的语…

云曦暑期学习第一周——sql注入

1浅谈sql注入 1.1sql注入 sql注入是指web应用程序对用户输入数据的合法性没有判断&#xff0c;前端传入后端的参数是攻击者可控的&#xff0c;并且参数带入数据库查询&#xff0c;攻击者可以通过构造不同的sql语句来实现对数据库的任意操作 1.2原理 条件&#xff1a; 1.参…

接入端口与中继端口

交换机端口是支持 IT 的基本组件&#xff0c;可实现网络通信。这些有线硬件设备负责连接并允许在不同设备和连接到其端口的网络部分之间进行数据传输。由于网络管理员在确保网络连接和可用性方面发挥着关键作用&#xff0c;因此网络管理员必须清楚地了解、映射和查看其网络交换…

面向对象Java基础

前言 看大话设计模式的时候&#xff0c;发现自己的基础不是很扎实&#xff0c;重新回顾一些存在有点点不确定的内容&#xff0c;并从书中截取下来&#xff0c;做成笔记快速复习。 1、字段和属性 字段&#xff1a;用private修饰&#xff0c;也叫私有变量。属性&#xff1a;字…

2.Docker操作

文章目录 Docker操作Docker镜像操作搜索镜像获取镜像镜像加速下载查看镜像详细信息为镜像添加标签删除镜像导出导入镜像上传镜像 Docker容器操作创建容器查看容器状态启动容器创建并启动容器进入容器停止容器删除容器复制容器文件到宿主机容器的导出导入 Docker操作 ###查看do…

【天工Godwork精品教程】天工3.1.7安装教程(附Godwork完整版下载地址)

本文讲解天工3.1.7安装过程(附Godwork完整版网盘下载地址)。 文章目录 一、天工3.1.7安装教程1. 安装GodWork-AT 3.1.72. 安装GodWork-AT 3.1.7补丁3. 安装GodWork-EOS-Setup-2017B-12314. 安装GodWork-EOS补丁5. 运行godwokr软件6. 生成ZC码7. 输入ZC码8. eos插件调用二、天…

AtcoderABC245场

A - Good morningA - Good morning 题目大意 给定Takahashi和Aoki的起床时间&#xff0c;判断谁先起床。 思路分析 题目要求比较Takahashi和Aoki的起床时间。首先&#xff0c;将起床时间转换为以分钟为单位。然后&#xff0c;通过比较两者的起床时间来确定谁先起床。 时间复…

文献阅读笔记——求解车辆路径问题及其变体的元启发式算法的分类综述

论文题目&#xff1a;A taxonomic review of metaheuristic algorithms for solving the vehicle routing problem and its variants 其他信息&#xff1a;Computers & Industrial Engineering|2020|Raafat Elshaer⁎, Hadeer Awad 文章贡献&#xff1a;1&#xff09;对使…