Redis - 消息队列 Stream

news2025/3/1 6:55:11

一、概述

消息队列

  1. 定义
    1. 消息队列模型:一种分布式系统中的消息传递方案,由消息队列、生产者和消费者组成
    2. 消息队列:负责存储和管理消息的中间件,也称为消息代理(Message Broker)
    3. 生产者:负责 产生并发送 消息到队列的应用程序
    4. 消费者:负责从队列 获取并处理 消息的应用程序
  2. 功能:实现消息发送和处理的解耦,支持异步通信,提高系统的可扩展性和可靠性
  3. 主流消息队列解决方案
    1. RabbitMQ:轻量级,支持多种协议,适合中小规模应用
    2. RocketMQ:阿里开源,高性能,适合大规模分布式应用

Stream

  1. 定义:Stream:Redis 5.0 引入的一种数据类型,用于处理高吞吐量的消息流、事件流等场景
  2. 功能:按时间顺序 ”添加、读取、消费“ 消息,支持消费者组、消息确认等功能

二、Stream 工作流程

  1. 写入消息
    1. 生产者通过 XADD 向 Stream 中添加消息。每条消息自动获得唯一的 ID,按时间顺序存入 Stream。
  2. 创建消费者组
    1. 如果使用消费者组,首先需要通过 XGROUP CREATE 创建消费者组。
    2. 消费者组会根据时间顺序将消息分配给组内的消费者。
  3. 读取消息
    1. 消费者使用 XREADGROUP 命令读取 Stream 中的消息。
    2. 消息按规则分配给不同消费者处理,每个消费者读取到不同的消息。
  4. 确认消息
    1. 消费者在处理完消息后,使用 XACK 命令确认消息,表示该消息已成功处理。
    2. 如果消息未确认(例如消费者崩溃或超时),它将保持在 Pending 状态,等待重新分配给其他消费者。
  5. 重新分配未确认消息
    1. 如果消息在一定时间内没有被确认,其他消费者可以读取未确认的消息并进行处理。
    2. 可通过 XPENDING 命令查看未确认消息,或在消费者组中设置时间阈值自动重新分配。
  6. 删除消费者组
    1. 不再需要消费者组时,使用 XGROUP DESTROY 命令删除消费者组

三、Stream 实现

消费者组模式

  1. 定义:Redis Streams 的一部分,用于处理消息的分布式消费
  2. 优点
    1. 消息分流:多消费者争抢消息,加快消费速度,避免消息堆积
    2. 消息标示:避免消息漏读,消费者读取消息后不马上销毁,加入 consumerGroup 维护的 pending list 队列等待 ACK
    3. 消息确认:通过消息 ACK 机制,保证消息至少被消费一次
    4. 可以阻塞读取,避免盲等
  3. 实现方法 :通过 Stream 数据类型实现消息队列,命令以 “X” 开头

常用命令

XGROUP CREATE key groupName ID [MKSTREAM]

  1. 功能:创建消费者组
  2. 参数
    1. key:队列名称
    2. groupName:组名称
    3. ID:起始 ID 标识,$ 表示队列中最后一个消息,0 表示队列中第一个消息
    4. MKSTREAM:队列不存在则创建队列

XGROUP DESTORY key groupName

  1. 功能:删除指定消费者组

XGROUP CREATECONSUMER key groupName consumerName

  1. 功能:添加组中消费者

XGROUP DELCONSUMER key groupName consumerName

  1. 功能:删除组中消费者

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]

  1. 功能:读取组中的消息
  2. gourp:消费者组名称
  3. consumer:消费者名称(不存在则自动创建)
  4. count:本次查询的最大数量
  5. BLOCK milliseconds:当没有消息时最长等待时间
  6. NOACK:无需手动 ACK,获取到消息后自动确认
  7. STREAMS KEY:指定队列名称
  8. ID:获取消息的起始 ID,> 表示从下一个未消费消息开始 (常用)

XPENDING key group [ [ IDLE min-idle-time ] start end count [consumer] ]

  1. 功能:获取 pending-list 中的消息
  2. IDLE:获取消息后、确认消息前的这段时间,空闲时间超过 min-idle-time 则取出
  3. start:获取的最小目标 ID
  4. end:获取的最大目标 ID
  5. count:获取的数量
  6. consumer:获取 consumer 的 pending-list

XACK key group ID [ ID … ]

  1. 功能:确认从组中读取的消息已被处理
  2. key:队列名称
  3. group:组名称
  4. ID:消息的 ID

表格版命令

  1. 命令

    命令功能
    XGROUP CREATE key groupName ID [MKSTREAM]创建消费者组
    XGROUP DESTORY key groupName删除指定消费者组
    XGROUP CREATECONSUMER key groupName consumerName添加组中消费者
    XGROUP DELCONSUMER key groupName consumerName删除组中消费者
    XREADGROUP GROUP groupName consumerName [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]读取组中的消息,ID 填写 “>” 则读取第一条未读消息
    XACK key group ID [ ID … ]确认从组中读取的消息已被处理
  2. 属性

    属性名定义
    key队列名称
    groupName消费者组名称
    ID起始 ID 标示,$ 代表队列中最后一个消息,0 代表第一个消息
    MKSTREAM队列不存在时自动创建队列
    BLOCK milliseconds没有消息时的最大等待时长
    NOACK无需手动 ACK,获取到消息后自动确认
    STREAMS key指定队列名称

运行逻辑

while(true) {
	// 尝试监听队列,使用阻塞模式,最长等待 2000 ms
	Object msg = redis.call("XREADGROUP GROUP group1 consumer1 COUNT 1 BLOCK 2000 STREAMS s1 >");
	if(msg == null) {
		continue;
	}
	try {
		// 处理消息,完成后一定要 ACK
		handleMessage(msg);
	} catch (Exception e) {
		while(true) {
			// 重新读取阻塞队列消息
			Object msg = redis.call("XREADGROUP GROUP group1 consumer1 COUNT 1 STREAM S1 0");
			if(msg == null)               // 如果阻塞队中的消息已经全部处理则退出pending-list
				break;
			try {
				handleMessage(msg);    			// 重新处理 pending-list 中的消息
			} catch (Exception e){
				continue;                   // 如果还出错, 则继续重新读取
			}
		}
	}
}

四、示例

  1. 目标:消息队列实现数据库异步修改数据库,将下单 message 缓存在 redis 中,减小下单操作对数据库的冲击

  2. 项目结构

    1. RedisConfig 配置类:创建消费者组是一次性的操作,适合放在配置类中
    2. VoucherOrderHandler 内部类:消费者的逻辑和订单业务相关,因此适合放在 VoucherOrderServiceImpl 中
    3. 多线程启动逻辑:消费者线程的启动与订单业务密切相关,直接放在 VoucherOrderServiceImpl 类中更符合职责分离原则
    src/main/java
    ├── com/example
    │   ├── config
    │   │   └── RedisConfig.java                  // Redis 配置类,包含消费者组初始化
    │   ├── service
    │   │   ├── VoucherOrderService.java
    │   │   └── impl
    │   │       └── VoucherOrderServiceImpl.java  // 包含 VoucherOrderHandler 内部类
    │   ├── entity
    │   │   └── VoucherOrder.java                 // 优惠券订单实体
    │   ├── utils
    │   │   └── BeanUtil.java                     // 用于 Map 转 Bean 的工具类
    │   └── controller
    │       └── VoucherOrderController.java       // 如果有 Controller
    
  3. 创建消费者组(config.RedisConfig)

    @Bean
    public void initStreamGroup() {
        // 检查是否存在消费者组 g1
        try {
            stringRedisTemplate.opsForStream().createGroup("stream.orders", "g1");
        } catch (RedisSystemException e) {
            // 如果 group 已存在,抛出异常,可忽略
            log.warn("消费者组 g1 已存在");
        }
    }
    
  4. 创建消费者线程

    1. 位置:作为 VoucherOrderServiceImpl 内的预构造部分
    @PostConstruct
    public void startConsumers() {
        for (int i = 0; i < 5; i++) { // 5 个线程,模拟多个消费者
            new Thread(new VoucherOrderHandler()).start();
        }
    }
    
  5. 添加消息到消息队列 (src/main/resources/lua/SECKILL_SCRIPT.lua)

    --1. 参数列表
    --1.1. 优惠券id
    local voucherId = ARGV[1]
    --1.2. 用户id
    local userId = ARGV[2]
    --1.3. 订单id
    local orderId = ARGV[3]
    
    --2. 数据key
    local stockKey = 'seckill:stock:' .. voucherId          --2.1. 库存key
    local orderKey = 'seckill:order' .. voucherId           --2.2. 订单key
    
    --3. 脚本业务
    --3.1. 判断库存是否充足 get stockKey
    if( tonumber( redis.call('GET', stockKey) ) <= 0 ) then
    	return 1
    end
    --3.2. 判断用户是否重复下单 SISMEMBER orderKey userId
    if( redis.call( 'SISMEMBER', orderKey, userId ) == 1 ) then
    	return 2
    end
    --3.4 扣库存 incrby stockKey -1
    redis.call( 'INCRBY', stockKey, -1 )
    --3.5 下单(保存用户) sadd orderKey userId
    redis.call( 'SADD', orderKey, userId )
    -- 3.6. 发送消息到队列中
    redis.call( 'XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId )
    
  6. 创建消费者类(ServiceImpl)

    1. 位置:作为 VoucherOrderServiceImpl 内的私有类
    // 在ServiceImpl中创建一个VoucherOrderHandler消费者类,专门用于处理消息队列中的消息
    private class VoucherOrderHandler implements Runnable {
    
    	@Override
    	public void run() {
    	
    		while (true) {
    			try {
    				// 1. 获取消息队列中的订单信息
    				List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
    										Consumer.from("g1", "c1"),
    										StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
    										StreamOffset.create( "stream.order", ReadOffset.lastConsumed())
    				);
    				// 2. 没有消息则重新监听
    				if (list == null || list.isEmpty() ) continue;
    				
    				// 3. 获取消息中的 voucherOrder
    				MapRecord<String, Object, Object> record = list.get(0);
    				Map<Object, Object> value = record.getValue();
    				VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
    				
    				// 4. 创建订单
    				createVoucherOrder(voucherOrder);
    				
    				// 5. 确认当前消息已消费 XACK
    				stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
    			} catch ( Exception e) {
    				log.error("处理订单异常", e);
    				// 6. 处理订单失败则消息会加入pending-list,继续处理pending-list
    				handlePendingList();
    			}
    		}
    	}
    	
    	
    	// 处理pending-list中的消息
    	private void handlePendingList() {
    	
    		while(true) {
    			try {
    				// 1. 消费pending-list中的消息
    				List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
    					Consumer.from("g1", "c1"),                                   // 消费者此消息的消费者
    					StreamReadOptions.empty().count(1),                          // 
    					StreamOffset.create("stream.order", ReadOffset.from("0"))     // 从pending-list的第一条消息开始读
    				);
    				// 2. 退出条件, list 为空 -> pending-list 已全部处理
    				if(list == null || list.isEmpty()) break;
    				
    				// 3. 获取消息中的 voucherOrder
    				MapRecord<String, Object, Object> record = list.get(0);
    				Map<Object, Object> value = record.getValue();
    				VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
    				
    				// 4. 创建订单
    				createVoucherOrder(voucherOrder);
    				
    				// 5. 确认消息已消费(XACK)
    				stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
    			} catch (Exception e) {
    				log.error("处理pendding订单异常", e);
    				try{
    					Thread.sleep(20);     // 如果发生异常则休眠一会再重新消费pending-list中的消息
    				} catch (Exception e2) {
    					e.printStackTrace(); 
    				}
    			}
    		}
    	}
    }
    
  7. 创建消息方法

    1. 目标:用户通过这个方法发送一条创建订单的 Message 给 Redis Stream
    // 创建Lua脚本对象
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    
    // Lua脚本初始化 (通过静态代码块)
    static {
    		SECKILL_SCRIPT = new DefaultRedisScript<>();
    		SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/SECKILL_SCRIPT.lua"));
    		SECKILL_SCRIPT.setResultType(Long.class);
    }
    
    @Override
    public void createVoucherOrder(Long voucherId, Long userId) {
        // 生成订单 ID(模拟)
        long orderId = System.currentTimeMillis();
    
        // 执行 Lua 脚本
        Long result = stringRedisTemplate.execute(
            SECKILL_SCRIPT,
            Collections.emptyList(),                    // 使用空的 key 列表
            voucherId.toString(), userId.toString(), String.valueOf(orderId)
        );
    
        // 根据 Lua 脚本返回结果处理
        if (result == 1) {
            throw new RuntimeException("库存不足!");
        } else if (result == 2) {
            throw new RuntimeException("不能重复下单!");
        }
        // 如果脚本执行成功,则订单消息会进入 Redis Stream,消费者组会自动处理
        System.out.println("订单创建成功!");
    }
    

(缺陷) 单消费者模式

  1. 常用命令
    1. XADD key [NOMKSTREAM] [MAXLEN | MINID [=|~] threshold [LIMIT count] * | ID field value [field value …]
    2. XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ ID … ]
  2. 缺陷:有消息漏读风险

五、其他消息队列方案

(缺陷) List 实现

  1. 优点
    1. 不受 JVM 内存上限限制:因为利用 Redis 存储
    2. 数据安全 :因为基于 List 结构本身是数据存储,基于 Redis 持久化机制
    3. 消息有序性:通过 List 结构的 LPUSH & BRPOP 命令实现顺序
  2. 缺点
    1. 消息丢失:BRPOP 的时候如果宕机则消息会丢失
    2. 只支持单消费者

(缺陷) PubSub 实现

  1. 定义
    1. Publish & Subscribe 模型,一种消息队列模型
    2. 生产者向指定的 channel 来 public 消息
    3. 消费者从 subscribe 的 channel 中接收消息
  2. 功能:支持多消费者模式,多个消费者可以同时 subscribe 一个 channel
  3. 优点:采用发布订阅模型,支持多生产者、消费者
  4. 缺点
    1. 不支持数据持久化
    2. 无法避免消息丢失
    3. 消息堆积有上限,超出时数据丢失

三种消息队列对比


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2259622.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

3D 生成重建034-NerfDiff借助扩散模型直接生成nerf

3D 生成重建034-NerfDiff借助扩散模型直接生成nerf 文章目录 0 论文工作1 论文方法2 实验结果 0 论文工作 感觉这个论文可能能shapE差不多同时期工作&#xff0c;但是shapE是生成任意种类。 本文提出了一种新颖的单图像视图合成方法NerfDiff&#xff0c;该方法利用神经辐射场 …

聊聊Oracle自适应查询优化

成也AQO败也AQO 因为工作的原因&#xff0c;我们接触到的客户大部分是金融和运营商行业&#xff0c;这些客户有个最大的特点是追求稳定&#xff0c;对于使用数据库新特性持保守的态度&#xff0c;不会轻易尝试某些可能会导致生产系统不稳定的新特性。上线前通常都会将一些新特…

【数据库】选择题+填空+简答

1.关于冗余数据的叙述中&#xff0c;不正确的是&#xff08;&#xff09; A.冗余的存在容易破坏数据库的完整新 B.冗余的存在给数据库的维护增加困难 C.不应该在数据库中存储任何冗余数据 D.冗余数据是指由基本数据导出的数据 C 2.最终用户使用的数据视图称为&#xff08;&…

Comparator.comparing 排序注意

1. 对数字型字符串排序 List<String> values new ArrayList<>();values.add("10");values.add("6");values.add("20");values.add("30");values.add("50");//方法1 &#xff08;正确的排序方法&#xff09;//倒…

R语言的数据结构-矩阵

【图书推荐】《R语言医学数据分析实践》-CSDN博客 《R语言医学数据分析实践 李丹 宋立桓 蔡伟祺 清华大学出版社9787302673484》【摘要 书评 试读】- 京东图书 (jd.com) R语言医学数据分析实践-R语言的数据结构-CSDN博客 矩阵是一个二维数组&#xff0c;矩阵中的元素都具有相…

动态分区存储管理

一、实验目的 目的&#xff1a;熟悉并掌握动态分区分配的各种算法&#xff0c;熟悉并掌握动态分区中分区回收的各种情况&#xff0c;并能够实现分区合并。 任务&#xff1a;用高级语言模拟实现动态分区存储管理。 二、实验内容 1、实验内容 分区分配算法至少实现首次适应算法、…

JPG 转 PDF:免费好用的在线图片转 PDF 工具

JPG 转 PDF&#xff1a;免费好用的在线图片转 PDF 工具 在日常工作和生活中&#xff0c;我们经常需要将图片转换为 PDF 格式。无论是制作电子文档、准备演示材料&#xff0c;还是整理照片集&#xff0c;将图片转换为 PDF 都是一个常见的需求。今天为大家介绍一款完全免费、无需…

10、C++继承2

本章介绍菱形继承、虚继承和类型适应。 1、菱形继承与虚继承 在多继承中&#xff0c;可能会发生多个父类共基类的问题&#xff0c;即菱形继承。 例如&#xff1a; 解决办法&#xff1a; 父类继承基类时使用虚继承&#xff0c;在继承前加virtual&#xff0c;即&#xff1a;虚…

探索Telnet:实现Windows远程登录Ubuntu的实践指南

前言 在互联网技术日新月异的今天&#xff0c;远程登录已经成为许多开发者和系统管理员日常工作中不可或缺的一部分。虽然SSH已经成为远程登录的首选协议&#xff0c;但了解并掌握Telnet这一经典协议仍然具有重要意义。本文将带您一起探索如何使用Telnet实现Windows远程登录Ub…

字符串的常见操作【C语言】

一、案例内容 字符串的常见操作主要有创建&#xff0c;输出&#xff0c;查找指定字符或查找指定位置的字符、删除指定字符或删除指定位置的字符&#xff0c;在指定位置插入指定字符等。 二、案例代码 #include <stdio.h> #include <stdlib.h> #include <stri…

axios的引入和基本使用

一、axios的引入 使用 pnpm add axios 二、使用axios 三、axios的使用方法补充 axios除了直接使用它实例上的方法&#xff0c;还可以通过配置的方式进行使用axios({})&#xff0c;传入一个对象&#xff0c;这个对象可以有如下属性&#xff1a; url&#xff08;字符串&#…

Dart 3.6 发布,workspace 和 Digit separators

workspace 之前我们就聊过 Flutter 正在切换成 Monorepo 和支持 workspaces &#xff0c;Dart 3.6 开始&#xff0c;Pub 现在正式支持 monorepo 或 workspace 中 package 之间的共享解析。 pub workspaces 功能可确保 monorepo 中的 package 共享一组一致的依赖项&#xff0c…

三、nginx实现lnmp+discuz论坛

lnmp l&#xff1a;linux操作系统 n&#xff1a;nginx前端页面 m&#xff1a;mysql数据库&#xff0c;账号密码&#xff0c;数据库等等都保存在这个数据库里面 p&#xff1a;php——nginx擅长处理的是静态页面&#xff0c;页面登录账户&#xff0c;需要请求到数据库&#…

【MFC】如何读取rtf文件并进行展示

tf是微软的一个带格式的文件&#xff0c;比word简单&#xff0c;我们可以用写字板等程序打开编辑。下面以具体实例讲解如何在自己程序中展示rtf文件。 首先使用VS2022创建一个MFC的工程。 VIEW类需要选择richview类&#xff0c;用于展示&#xff0c;如下图&#xff1a; 运行效…

AudioSegment 将音频分割为指定长度时间片段 - python 实现

DataBall 助力快速掌握数据集的信息和使用方式&#xff0c;会员享有 百种数据集&#xff0c;持续增加中。 需要更多数据资源和技术解决方案&#xff0c;知识星球&#xff1a; “DataBall - X 数据球(free)” -------------------------------------------------------------…

【新人系列】Python 入门(十六):正则表达式

✍ 个人博客&#xff1a;https://blog.csdn.net/Newin2020?typeblog &#x1f4dd; 专栏地址&#xff1a;https://blog.csdn.net/newin2020/category_12801353.html &#x1f4e3; 专栏定位&#xff1a;为 0 基础刚入门 Python 的小伙伴提供详细的讲解&#xff0c;也欢迎大佬们…

Leetcode 每日一题9.回文数

&#x1f308;&#x1f308;&#x1f308;今天给大家分享的是:回文数的解法 目录 ​编辑 问题描述 输入输出格式 示例 约束条件 进阶挑战 解决方案 问题分析 过题图片 字符串转换法 数学方法 算法解释 题目链接 结论 问题描述 给定一个整数 x&#xff0c;我们需要…

Python Turtle 实现动态时钟:十二时辰与星空流星效果

在这篇文章中&#xff0c;我将带你通过 Python 的 turtle 模块构建一个动态可视化时钟程序。这个时钟不仅具备传统的时间显示功能&#xff0c;还融合了中国古代的十二时辰与八卦符号&#xff0c;并通过动态星空、流星效果与昼夜背景切换&#xff0c;为程序增添了观赏性和文化内…

建筑电气火灾是怎么发生的?如何降低电气火灾风险?

电气火灾一直是建筑火灾的主要诱因&#xff0c;占总火灾比例的28.4%。传统的末端配电监控手段存在覆盖范围不足、反应时间滞后等问题&#xff0c;难以及时发现并应对潜在的电气安全隐患。本文提出了一种基于通讯模块、智能微断和智能终端的建筑末端配电解决方案&#xff0c;通过…

Go有限状态机实现和实战

Go有限状态机实现和实战 有限状态机 什么是状态机 有限状态机&#xff08;Finite State Machine, FSM&#xff09;是一种用于建模系统行为的计算模型&#xff0c;它包含有限数量的状态&#xff0c;并通过事件或条件实现状态之间的转换。FSM的状态数量是有限的&#xff0c;因此称…