Redis做消息队列实现异步读写看这篇够了!

news2024/11/18 21:35:38

一、消息队列的简介

在企业的应用中,发送消息方和接收消息方,可以采取同步通信或异步通信。同步通信在实际的应用中效率不高。本文主要介绍异步通信,其中异步通信分为:第一,基于内存的jvm阻塞队列实现异步通信。这种方式面临的问题是:内存空间有限,导致内存泄漏;因为内存的消息泄漏导致敏感信息有泄漏的风险,造成数据安全性问题。第二,基于Redis实现的消息队列,可以解决上述两类问题。

redis在应用中实现消息队列的方式

  1. Redis 列表list结构是按插入顺序排序的字符串列表。
  2. Redis 发布/订阅是一种消息传模式,其中发送者(在Redis术语中称为发布者)发送消息,而接收者(订阅者)接收消息。传递消息的通道称为channel。发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。
  3. Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

MQ的基础概念

生产者:发送消息。消息队列:提供了FIFO的处理机制,具有缓存消息的能力。消费者:接受消息。

Redis 列表list结构实现消息队列

消费者:阻塞获取
brpop l1 30
生产者:
lpush l1 a b c

这种方式导致消息丢失,只适合单消费模式。

Redis 发布/订阅是一种消息传模式

消费者:
psubscribe order.* order.queue1
subscribe order.queue1
生产者:
publish order.queue1 msg1
publish order.* msg1

这种方式导致数据无法持久化、消息丢失、消息堆积有上限。

基于Stream的消息队列---单消费模式

Redis Streams tutorial | Redis 官网

发送消息命令:

 接收消息命令:

 基于Stream的消息队列----消费者组(Consumer groups)

创建消费者组命令:

 从消费组里读取消息命令:

确认消费组消息命令:

 常见的消费组的命令:

# 自动创建消费者组命令
XGROUP CREATE newstream mygroup $ MKSTREAM
# 从消费组读取消息数据 
XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
XREADGROUP GROUP mygroup Alice STREAMS mystream 0
# 确认消费组已经处理的消息
XACK mystream mygroup 1526569495631-0
# 给定的消费组待挂消息
XPENDING mystream mygroup - + 10

详细的Stream的消费组命令参考:https://redis.io/docs/data-types/streams-tutorial/#consumer-groups

二、基于Linux的redis的消息队列的测试

【Stream结构的redis单消费模式】

生产者发送消息:

 消费者1接收消息:

 消费者2接收消息:

【Stream结构的redis消费组模式】 

生产者发送消息:

 XGROUP CREATE s1 g1 $ MKSTREAM

消费者1接收消息:

XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >

消费者2接收消息:

XREADGROUP GROUP g1 c2 COUNT 1 BLOCK 2000 STREAMS s1 >

消费者确认消息:

XACK s1 g1 1683084625732-0

三、SpringBoot整合Redis用Stream结构做消息队列

需求场景:1、创建一个stream类型的消息队列(stream.orders)。2、修改下订单的Lua脚本,向消息队列中添加消息。3、项目启动,开启线程任务,尝试获取消息队列中的消息,完成下单。

【1.创建消息队列】 

# 创建一个stream类型的消息队列(stream.orders)
XGROUP CREATE stream.orders g1 0 MKSTREAM

【2.向消息队列发送消息】

-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)

【3.从线程池获取线程任务接收消息】

//初始化线程池
    private static final ExecutorService seckill_order_executor = Executors.newSingleThreadExecutor();

    @PostConstruct
    private void init() {
        seckill_order_executor.submit(new VoucherOrderHandler());
    }

    //创建线程任务
    private class VoucherOrderHandler implements Runnable {

        //从消息队列中获取线程任务
        @Override
        public void run() {
            while (true){

                try {
                    //1.接收消息队列的订单信息 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]
                    //  [NOACK] STREAMS key [key ...] id [id ...]
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
                    );
                    //2.判断获取消息是否成功
                    if (list == null || list.isEmpty()){
                        //如果失败,进行下一次
                        continue;
                    }
                    //解析消息的订单
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> values = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                    //3.获取消息成功,创建订单
                    handlerVoucherOrder(voucherOrder);
                    //4. 确认消息  XACK key group id [id ...]
                    stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理订单异常");
                    handlerPendinglist();
                }
            }
        }

【4. 使用消息队列和业务整合】

private IVoucherOrderService proxy;

    //消息队列进行下单
    @Override
    public Result seckillVoucherOrder(Long voucherId) {
        //获取用户
        Long userId = UserHolder.getUser().getId();
        //订单id
        long orderId = redisWork.nextId(SECKILL_ORDER_KEY);

        //1.执行Lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString(), String.valueOf(orderId)
        );
        //2.判断执行的结果是否为0
        int r = result.intValue();
        if (r != 0){
            //2.1 不是0,没有抢购资格
            return Result.fail(r == 1 ? "库存不足" : "不允许重复下单");
        }
        //aop获取代理对象
        proxy = (IVoucherOrderService) AopContext.currentProxy();
        return Result.ok(orderId);
    }

【5.测试】

 详细的和业务整合代码实现参考链接:https://gitee.com/hfnu_112/springboot_04_dianping

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/485284.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android Studio Electric Eel 2022.1.1 Patch 2 导入opencv 4.5,并实现图片灰度变换和图片叠加

软件版本&#xff1a; Android Studio Electric Eel 2022.1.1 Patch 2 https://sourceforge.net/projects/opencvlibrary/files/4.5.0/opencv-4.5.0-android-sdk.zip/download 创建工程 with API23: 导入opencv sdk: File->New->Import Module 添加工程依赖&…

MySQL的概念、编译安装

一.数据库的基本概念 1、数据&#xff08;Data&#xff09; • 描述事物的符号记录 • 包括数字&#xff0c;文字&#xff0c;图形&#xff0c;图像&#xff0c;声音&#xff0c;档案记录等 • 以“记录”形式按统一的格式进行存储 2、表 • 将不同的记录组织在一起 • …

( 数组和矩阵) 287. 寻找重复数 ——【Leetcode每日一题】

❓287. 寻找重复数 难度&#xff1a;中等 给定一个包含 n 1 个整数的数组 nums &#xff0c;其数字都在 [1, n] 范围内&#xff08;包括 1 和 n&#xff09;&#xff0c;可知至少存在一个重复的整数。 假设 nums 只有 一个重复的整数 &#xff0c;返回 这个重复的数 。 你…

基于樽海鞘算法的极限学习机(ELM)回归预测-附代码

基于樽海鞘算法的极限学习机(ELM)回归预测 文章目录 基于樽海鞘算法的极限学习机(ELM)回归预测1.极限学习机原理概述2.ELM学习算法3.回归问题数据处理4.基于樽海鞘算法优化的ELM5.测试结果6.参考文献7.Matlab代码 摘要&#xff1a;本文利用樽海鞘算法对极限学习机进行优化&…

TouchGFX界面开发 | 使用STM32CubeMX移植TouchGFX

本文基于STM32F429IGT6 RGB (800 * 480)硬件平台&#xff0c;详细记录了如何利用STM32CubeMX将TouchGFX移植到STM32F429IGT6&#xff0c;并驱动RGB屏幕。相关软件的安装&#xff0c;可参考TouchGFX软件安装一文 TouchGFX的应用框架如下图所示&#xff1a; 一、STM32CubeMX配…

JoJo‘s Incredible Adventures

题目&#xff1a; 题意解析&#xff1a; 这个题目是要求找出输入的字符串&#xff0c;&#xff0c;字符串的循环移位s由k右边是字符串Sn−k1...Sn&#xff0c;S1&#xff0c;S2...Sn−k。直到所有的字符&#xff0c;都循坏出现在字符串的开头&#xff0c;然后输入1形成的长方形…

MySQL之Adaptive Hash Index详解

前言 本文已收录在MySQL性能优化原理实战专栏&#xff0c;点击此处浏览更多优质内容。 目录 一、MySQL InnoDB是否支持哈希索引&#xff1f;1.1 InnoDB不支持Hash Index1.2 InnoDB支持Hash Index 二、Adaptive Hash Index的概念三、涉及Adaptive Hash Index的参数3.1 innodb_ad…

接口优化的策略

1.批处理 批量思想&#xff1a;批量操作数据库&#xff0c;这个很好理解&#xff0c;我们在循环插入场景的接口中&#xff0c;可以在批处理执行完成后一次性插入或更新数据库&#xff0c;避免多次IO。 //批量入库 batchInsert();List的安全操作有以下几种方式&#xff1a; 使…

DAY 50 LVS负载均衡器 NAT模式

群集概述 群集的含义 Cluster&#xff0c;集群、群集由多台主机构成&#xff0c;但对外只表现为一一个整体&#xff0c;只提供一-个访问入口(域名或IP地址)&#xff0c; 相当于一台大型计算机。 为什么使用群集 互联网应用中&#xff0c;随着站点对硬件性能、响应速度、服务…

Python学习日记(2)

有关数字类型&#xff0c;字符串&#xff0c;函数 目录 有关数字类型&#xff0c;字符串&#xff0c;函数 数字 字符串 索引操作 切片操作 单个字符编码 运算符 还有一些常用的内置函数 Python输入函数 输出函数print()语法 python的函数也能给默认值 Python是个脚…

C++系列一: C++简介

C入门简介 1. C语言的特点2. C编译器3. 第一个 C 程序4. 总结&#xff08;手稿版&#xff09; C 是一种高级编程语言&#xff0c;是C语言的扩展和改进版本&#xff0c;由Bjarne Stroustrup于1983年在贝尔实验室为了支持C语言中的面向对象编程而创建。C 既能够进行底层的系统编程…

全注解下的SpringIoc 续4-条件装配bean

Spring Boot默认启动时会加载bean&#xff0c;如果加载失败&#xff0c;则应用就会启动失败。但是部分场景下&#xff0c;我们希望某个bean只有满足一定的条件下&#xff0c;才允许Spring Boot加载&#xff0c;所以&#xff0c;这里就需要使用Conditional注解来协助我们达到这样…

二叉搜索树(BST)详解

文章目录 性质二叉搜索树的遍历遍历伪代码实现 二叉搜索树的查找伪代码实现 二叉搜索树最大元素伪代码实现 二叉搜索树最小元素伪代码实现 二叉搜索树的插入伪代码实现 二叉搜索树的删除删除叶子节点&#xff08;对应上面第一种情况&#xff09;&#xff1a;删除度为1的节点&am…

机械硬盘(HDD)与固态硬盘(SSD)

目录 机械硬盘&#xff08;HDD&#xff09; 最小组成单元是扇区 硬盘结构 硬盘工作原理 硬盘上的数据组织 硬盘指标 影响性能的因素 固态硬盘&#xff08;SSD&#xff09; 最小存储单元是Cell SSD的特点 SSD架构 NAND Flash 闪存介质 地址映射管理 FTL闪存转换层 机械硬盘&…

Python之模块和包(九)

1、模块 1、模块概述 模块是一个包含了定义的函数和变量等的文件。模块可以被程序引入&#xff0c;以使用该模块中的函数等功能。通俗讲&#xff1a;模块就好比是工具包&#xff0c;要想使用这个工具包中的工具(就好比函数)&#xff0c;就需要导入这个模块。 2、import 在P…

Redis分布式锁原理之实现秒杀抢优惠卷业务

Redis分布式锁原理之实现秒杀抢优惠卷业务 1. 实现秒杀下单2. 库存超卖问题分析2.1 乐观锁解决超卖问题 3. 优惠券秒杀-一人一单3.1 集群环境下的并发问题 4、分布式锁4.1 基本原理和实现方式对比4.2 Redis分布式锁的实现核心思路4.3 实现分布式锁版本一4.4 Redis分布式锁误删情…

【Java入门合集】第三章面向对象编程(上)

【Java入门合集】第三章面向对象编程&#xff08;上&#xff09; 博主&#xff1a;命运之光 专栏&#xff1a;JAVA入门 理解面向对象三大主要特征&#xff1b; 掌握类与对象的区别与使用&#xff1b; 掌握类中构造方法以及构造方法重载的概念及使用&#xff1b; 掌握包的定义、…

国民技术N32G430开发笔记(14)-IAP升级 usart2接收数据

IAP升级 Usart2接收数据 1、之前有一节我们将PA6 PA7复用成了usart2的功能&#xff0c;这一节我们用usart2接收来自树莓派的升级请求&#xff0c;然后完成N32G430的Iap升级。 2、接线 PA9 PA10 接usb转串口模块A&#xff0c;A模块插入电脑。 PA6 PA7 接usb转串口模块B&#xf…

【移动端网页布局】流式布局案例 ⑥ ( 多排按钮导航栏 | 设置浮动及宽度 | 设置图片样式 | 设置文本 )

文章目录 一、多排按钮导航栏样式及核心要点1、实现效果2、总体布局设计3、设置浮动及宽度4、设置图片样式5、设置文本 二、完整代码实例1、HTML 标签结构2、CSS 样式3、展示效果 一、多排按钮导航栏样式及核心要点 1、实现效果 要实现下面的导航栏效果 ; 2、总体布局设计 该导…

计算机网络笔记:DNS域名解析过程

基本概念 DNS是域名系统&#xff08;Domain Name System&#xff09;的缩写&#xff0c;也是TCP/IP网络中的一个协议。在Internet上域名与IP地址之间是一一对应的&#xff0c;域名虽然便于人们记忆&#xff0c;但计算机之间只能互相认识IP地址&#xff0c;域名和IP地址之间的转…