【黑马点评Redis——003优惠券秒杀4——消息队列Stream】

news2024/12/26 22:38:54

1. 目前还存在的问题

  • 设置的阻塞队列可能会超出最大长度
  • 系统重启会导致阻塞队列中的信息消失,可能会出现问题

2. 消息队列

  • 消息队列 (Message Queue)。
    • 字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色消息队列:存储和管理消息,也被称为消息代理 (Message Broker)
  • 生产者
    • 发送消息到消息队列
  • 消费者
    • 从消息队列获取消息并处理消息

在这里插入图片描述

  • Redis提供了三种不同的方式来实现消息队列
  • list结构:基于List结构模拟消息队列
  • PubSub:基本的点对点消息队列
  • Stream:比较完善的消息队列模型

2.1 基于List结构模拟消息队列

在这里插入图片描述
优点

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点

  • 无法避免消息丢失(消息拿出来后没来得及处理就挂了,导致这条消息没有处理)
  • 只支持单消费者(一个人获取了这条消息,另一个人就不能获取了)

2.2 PubSub基本的点对点消息队列

在这里插入图片描述
优点

  • 采用发布订阅模型,支持多生产,多消费

缺点

  • 不支持数据持久化(服务一旦关闭就消失了)
  • 无法避免消息丢失(没人接收这条消息就丢了)
  • 消息堆积有上限,超出时消息丢失(缓存在客户端,有上限)

2.3 基于Stream的消息队列(重点)

Redis 中的 Stream 是一种在 Redis 5.0 版本引入的新数据类型,它专为实现高性能、高可靠性的消息队列和流式数据处理而设计。Stream 结构提供了一种有序、可持久化、可重复消费且支持多路写入与多消费者并行消费的消息存储模型。

写入命令XADD

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
  • key:消息队列的名称
  • NOMKSTREAM:如果队列不存在,是否自动创建队列。默认是自动创建
  • MAXLEN:最大长度,默认不设置上限
  • *|ID:消息的唯一id,*代表由Redis自动生成。
  • field value [field value …]:称为一个Entry,格式是多个key-value键值对
    在这里插入图片描述
    读取命令XREAD
    在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

特点

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

2.4 基于Stream的消息队列-消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备以下特点:

  • 消息分流
    • 队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
  • 消息标识
    • 消费者者会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后读取消息,确保每一个消息都会被消费。
  • 消息确认
    • 消费者获取消息后没消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除

2.4.1 创建消费者组

在这里插入图片描述

2.4.2 从消费者组读取消息

在这里插入图片描述

2.4.3 确认消息命令

将pending-list中的某个消息,标记为已处理。

XACK KEY GROUP ID

小结

  • 消息可回溯
  • 可以多消费者争抢消息,加快消息速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消息一次

2.4.4 XPENDING

XPENDING 是 Redis 5.0 引入的一项新命令,用于管理 Redis Streams 中的待处理消息。Redis Streams 是一个用于处理实时数据流的数据结构,而 XPENDING 则允许你查看、管理待处理消息的信息。
XPENDING 命令的一般语法如下:

XPENDING stream_name group_name [start end count] [consumer]

其中:
stream_name 是待处理消息所在的流的名称。
group_name 是消费者组的名称。
start 和 end 是两个可选参数,用于指定待处理消息的范围。
count 是一个可选参数,用于指定要返回的待处理消息的数量。
consumer 是一个可选参数,用于指定特定的消费者。

2.5 Java的伪代码

消费者监听消息的基本思路

while(true){
	//尝试监听队列,使用阻塞模式,最长等待2000ms
	Obejct msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAM s1 >");
	if(msg == null){
		// null说明没有消息,继续下一次
		continue;
	}
	try{
		//处理消息,完成后需要ACK
		handleMessage(msg)
	}catch(Exception e){
		while(true){
			Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAM s1 0");
			if(msg == null){
				//null说明没有异常消息,所有消息都已确认,结束循环
				break;
			}
				try{
					//处理消息,完成后需要ACK
					handleMessage(msg)
				}catch(Exception e){
					//再次出现异常,记录日志,继续循环
					continue;
				}
		}
	}
}

2.6 对比

在这里插入图片描述

3.代码优化目标

在这里插入图片描述

3.1 修改Lua脚本

-- 1.参数列表
-- 1.1. 优惠券id
local voucherId = ARGV[1]
-- 1.2. 用户id
local userId = ARGV[2]
-- 1.3. 订单id
local orderId = ARGV[3]
-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足
if(tonumber(redis.call('get',stockKey))<=0)then
    -- 3.2.库存不足,返回1
    return 1
end

-- 3.3.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call("sismember",orderKey,userId) == 1) then
    -- 3.4.存在,说明是重复下单
    return 2
end

-- 3.5.扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.6.下单(保存用户)sadd orderKey userId
redis.call('sadd',orderKey,userId)
-- 3.7.发送消息到队列中, XADD stream.order * K1 v1 K2 v2 ...
redis.call('xadd','stream.orders','*',"userId",userId,"voucherId",voucherId,"id",orderId)
return 0

3.2 修改Java端代码

    private class VoucherOrderHandler implements Runnable{
        String queueName = "stream.orders";
        @Override
        public void run(){
            while (true){
                try {
                    // 1.获取消息队列中的订单信息
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );
                    // 2.判断消息获取是否成功
                    if ( list==null || list.isEmpty()){
                        // 2.1.如果获取失败,说明没有消息,继续下一次循环
                        continue;
                    }
                    // 3.解析消息中的订单信息
                    MapRecord<String, Object, Object> record = list.get(0);
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(record.getValue(), new VoucherOrder(), true);
                    // 4.如果获取成功,可以下单
                    handleVoucherOrder(voucherOrder);
                    // 5.ACK确认
                    stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
                }catch (Exception e){
                    handlePendingList();
                    log.error("处理订单异常:",e);
                }
            }
        }

        private void handlePendingList() {
            while (true){
                try {
                    // 1.获取pending-list队列中的订单信息,XREADGROUP GROUP g1 c1 COUNT 1 STREAMS streams.order 0
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
                    // 2.判断消息获取是否成功
                    if ( list==null || list.isEmpty()){
                        // 2.1.如果获取失败,说明没有消息,结束循环
                        break;
                    }
                    // 3.解析消息中的订单信息
                    MapRecord<String, Object, Object> record = list.get(0);
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(record.getValue(), new VoucherOrder(), true);
                    // 4.如果获取成功,可以下单
                    handleVoucherOrder(voucherOrder);
                    // 5.ACK确认
                    stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
                }catch (Exception e){
                    log.error("处理订单异常:",e);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException ex) {
                        throw new RuntimeException(ex);
                    }
                }
            }
        }
    }

4. 总结

目前我们已经能够使用Stream消息队列来实现一个功能较为完全的秒杀功能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1620457.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网页自动跳转到其他页面,点击浏览器返回箭头,回不到原来页面的问题

背景&#xff1a;今天产品提个需求&#xff0c;需要从index页面自动触发跳转到下一页面的事件&#xff0c;从而不做任何操作&#xff0c;直接跳转到test页面。 代码是这样的&#xff1a; index.vue: <template><div style"width:500px;height:600px;background-…

20 Debian如何配置DNS服务(2)主从服务器

作者&#xff1a;网络傅老师 特别提示&#xff1a;未经作者允许&#xff0c;不得转载任何内容。违者必究&#xff01; Debian如何配置DNS服务&#xff08;2&#xff09;主从服务器 《傅老师Debian小知识库系列之20》——原创 前言 傅老师Debian小知识库特点&#xff1a; 1、…

纸箱码垛机:从传统到智能,科技如何助力产业升级

随着科技的飞速发展&#xff0c;传统工业领域正经历着一场重要的变革。作为物流行业重要一环的纸箱码垛机&#xff0c;其从传统到智能的转型升级&#xff0c;不仅提高了生产效率&#xff0c;还大幅降低了人工成本&#xff0c;为产业升级提供了强大助力。星派将探讨纸箱码垛机的…

【一些神金】怎么缓解工作压力?使用VS-code彩虹屁插件

怎么缓解工作压力&#xff1f; 其实吃点好的&#xff0c;多睡一会儿&#xff0c;再锻炼锻炼身体就好。 但我只是想炫耀一下这个彩虹屁插件。 原版插件&#xff1a;VS-code-Rainbowfart 我的版本&#xff1a;RainbowFart-Oberon 基于 MIT 开源&#xff0c;包括所有设计资源及音…

大数据信用风险竟然是这样形成的!查询方法也很简单

在大数据时代背景下&#xff0c;大数据信用风险成为了众多机构关注的焦点。这类风险涵盖了多头借贷、履约行为、联系人以及司法等多个方面。本文将深入解析大数据信用风险的形成原因及其查询方法&#xff0c;让我们一起来探索一下。 大数据信用风险主要表现在以下几个方面&…

硬盘删除的文件怎么恢复?这4个方法可以恢复误删文件

在数字时代&#xff0c;硬盘作为我们存储信息的主要工具&#xff0c;承载着大量的重要数据。然而&#xff0c;有时我们可能会因为误操作或病毒攻击等原因&#xff0c;不小心删除了硬盘中的文件。这时&#xff0c;如何有效地恢复这些文件就显得尤为重要。今天给大家分享三种恢复…

币圈是什么意思?币圈开发

币圈是一个涵盖了区块链、加密货币及其应用的独特领域&#xff0c;它的兴起与发展已经彻底改变了我们对金融、科技和未来的认知。 一、什么是币圈&#xff1f; 币圈可以被理解为围绕虚拟货币展开的一系列活动和产业的总称。它包括区块链技术的研发、数字货币的创造、交易、投资…

图形界面挂了?教你如何纯命令行下快速安装CentOS 7

在某些特定的系统或软件环境下&#xff0c;如使用 Parallels Desktop 18&#xff08;PD18&#xff09;虚拟化软件安装较老版本的操作系统&#xff08;如 CentOS 7&#xff09;&#xff0c;可能会遇到只能通过命令行进行安装的情况。这通常是由于内核版本与图形安装器的兼容性问…

JavaScript 进阶 (三)之构造函数/原型对象/对象原型/原型继承/原型链

JavaScript 进阶 &#xff08;三&#xff09;之构造函数/原型对象/对象原型/原型继承/原型链 编程思想面向过程面向对象 构造函数原型对象constructor 属性对象原型原型继承原型链 了解构造函数原型对象的语法特征&#xff0c;掌握 JavaScript 中面向对象编程的实现方式&#x…

【书生浦语第二期实战营学习笔记作业(四)】

课程文档&#xff1a;https://github.com/InternLM/Tutorial/blob/camp2/xtuner/readme.md 作业文档&#xff1a;https://github.com/InternLM/Tutorial/blob/camp2/xtuner/homework.md 书生浦语第二期实战营学习笔记&作业(四) 1.1、微调理论讲解及 XTuner 介绍 两种Fin…

C语言实现简单CRC校验

目录 一、实现题目 二、send模块 三、receive模块 四、运行截图 一、实现题目 二、send模块 #include <stdio.h> #include <string.h>// 执行模2除法&#xff0c;并计算出余数&#xff08;CRC校验码&#xff09; //dividend被除, divisor除数 void divide…

maya blendshape

目录 shape编辑器 maya创建blendshape python 脚本 添加形变动画 查看顶点个数 shape编辑器 打开方式&#xff1a; 窗口-动画编辑器-形变编辑器 maya创建blendshape python 脚本 import maya.cmds as cmds# 创建基础网格 - 球体 baseMesh cmds.polySphere(name"bas…

计算机技术的发展与未来趋势

引言 随着科技的飞速发展&#xff0c;计算机技术已成为现代社会不可或缺的一部分。从最初的简单计算工具到如今的超级计算机、云计算、人工智能等技术的广泛应用&#xff0c;计算机技术的变革深刻影响着我们的生活和工作方式。本文将探讨计算机技术的发展历程、当前的应用领域…

【Qt】.ui文件转.h文件

1、打开qt命令行 2、转换 uic -o ui.h mainwindow.ui

2024HW --->蓝队面试题

这段时间在写横向移动&#xff0c;搞得鸽了很久&#xff08;内网真的很玄学&#xff09; 还没写完。。。 但是这不是准备HW了吗。小编也来整理一下自己收集到的题目吧&#xff01;&#xff01;&#xff01; &#xff08;仅为个人见解&#xff0c;不代表最终答案&#xff09;&…

Android 12 Starting window的添加与移除

添加&#xff1a; 04-13 16:29:55.931 2944 7259 D jinyanmeistart: at com.android.server.wm.StartingSurfaceController.createSplashScreenStartingSurface(StartingSurfaceController.java:87) 04-13 16:29:55.931 2944 7259 D jinyanmeistart: at com.android.server.wm.…

STM32H7使用FileX库BUG,SD卡挂载失败

问题描述&#xff1a; 使用STM32H7ThreadXFileX&#xff0c;之前使用swissbit牌的存储卡可正常使用&#xff0c;最近项目用了金士顿的存储卡&#xff0c;发现无法挂载文件系统。 原因分析&#xff1a; 调试过程发现&#xff0c;关闭D-Cache可以挂载使用exfat文件系统。 File…

edu邮箱官方购买渠道手把手选购指南记录

教育优惠&#xff0c;是一项针对于在校大学生和教职员工推出的特殊优惠活动。一些公司会将旗下产品或服务以一定的折扣&#xff0c;甚至免费提供给高校师生。想想自己上大学的时候啥都不知道,毕业后才发现浪费了这么多优秀的资源.如果你还是一名在校大学生&#xff0c;那么就不…

小程序总结

第一章 微信小程序概述 1.1 认识微信小程序 简介&#xff1a;微信&#xff08;WeChat)是腾讯公司于2011年1月21日推出的一款为智能终端提供即时通信服务的应用程序&#xff0c;于2017年1月上线。 优势&#xff1a;1. 无需安装 2. 触手可及 3. 用完即走 4. 无需卸载…

日志框架整合SpringBoot保姆级教程+日志文件拆分(附源码)

目录 介绍 日志概述 日志文件 调试日志 系统日志 日志框架 日志框架的作用 日志框架的价值 流行的日志框架 SLF4J日志门面 介绍 环境搭建简单测试 集成log4j logback Logback简介 Logback中的组件 Logback配置文件 日志输出格式 控制台输出日志 输出日志到…