Redis队列Stream

news2025/1/10 16:54:40

1 缘起

项目中处理文件的场景:
将文件处理请求放入队列,
一方面,缓解服务器文件处理压力;
另一方面,可以根据文件大小拆分到不同的队列,提高文件处理效率。
这是Java开发组Leader佳汇提出的文件处理方案,非常实用。
从他那学习到之后,开始搜集Redis Stream相关的知识,整理成文,帮助开发者轻松应对知识交流和考核。

2 Redis Stream

Redis Stream是Redis 5.0.0版本新增的数据结构,想使用Stream需要Redis的最低版本是5.0
Stream是一个高性能、高可靠的消息队列,用于异步消息处理,就是传统的队列功能,完成流量削峰。Redis 5.0之前的版本就有提供队列功能,如列表、有序集合和Pub/Sub均可实现队列功能。既然Redis已经有了队列功能,为什么还要Stream这个数据结构呢?
按照正常的思考过程,新事物的出现,一般是为了解决旧事物的问题,或者,为了防止垄断,当然, 技术圈也遵循这个理论。

2.1 解决的问题

Stream的出现是为了解决原先队列存在的问题:
(1)Pub/Sub模式无法持久化消息,如果Redis网络异常或者宕机,消息会丢失;
(2)列表和有序集合的方式支持消息持久化,但是,不支持消息多播和分组消费。
Redis Stream一一解决了上面的问题,实现了消息持久化、消息多播和分组消费
Redis Stream既然是队列应用场景和其他队列一样:
(1)异步解耦;
(2)流量削峰;
不过Redis是基于内存的,如果是大量的消息数据建议选择其他消息队列,如RabbitMQ、Kafka、RocketMQ等。

2.2 架构

先来看一下Stream的总体架构:
在这里插入图片描述
Redis Stream有生产者、消费者和消费组,其中,
(1)消费组:有多个消费者,消费者之间是竞争关系,消费组中有一个last_delivered_id,消费组中的任意一消费者消费了消息,都会使last_delivered_id移动;
(2)消费者:消费者消费消息后,会产生pending_id,即消费者的状态变量,当消费者消费消息后,使用pending_ids记录被消费的消息,当客户端没有进行消费确认(ACK)时,pending_ids中的数据会一直增加,当客户端进行消息确认(ACK)后, 会移除pending_id。Redis官方称pending_ids为PEL(Pending Entries List),用于确保客户端至少消费一次消息,而不会在网络传输中丢失了处理。

2.3 数据结构

先从源码简单看下Stream相关的数据结构:

/* Stream item ID: a 128 bit number composed of a milliseconds time and
 * a sequence counter. IDs generated in the same millisecond (or in a past
 * millisecond if the clock jumped backward) will use the millisecond time
 * of the latest generated ID and an incremented sequence. */
typedef struct streamID {
    uint64_t ms;        /* Unix time in milliseconds. */
    uint64_t seq;       /* Sequence number. */
} streamID;

typedef struct stream {
    rax *rax;               /* The radix tree holding the stream. */
    uint64_t length;        /* Current number of elements inside this stream. */
    streamID last_id;       /* Zero if there are yet no items. */
    streamID first_id;      /* The first non-tombstone entry, zero if empty. */
    streamID max_deleted_entry_id;  /* The maximal ID that was deleted. */
    uint64_t entries_added; /* All time count of elements added. */
    rax *cgroups;           /* Consumer groups dictionary: name -> streamCG */
} stream;

由源码知,stream由Radix树和streamID类型的数据构成,
其中,streamID有两部分组成,ms和seq,ms即毫秒(10位),seq即序列号,
Stream中的每一条消息使用:{毫秒}-{序列号}唯一标识。

3 基础操作

3.1 新建数据:XADD

格式:

XADD key ID field value [field value ...]

参数:

XADD mystream-test * name xiaoyi age 10
XADD mystream-test * name xiaoer age 11

在这里插入图片描述

3.2 查询数据:XRANGE

格式:

XRANGE key start end [COUNT count]

参数:

参数描述
key队列名称
start起始ID标识
end结束ID标识
COUNT查询的条数

3.2.1 查询所有数据

XRANGE mystream-test - +

参数:
-:第一条数据
+:最后一条数据
使用- + 表示拆寻所有数据。
在这里插入图片描述

3.2.2 查询指定条数

XRANGE mystream-test - + COUNT 1

在这里插入图片描述

3.2.3 查询指定范围数据

XRANGE mystream-test 1697335440000-0 1697359922197-0

在这里插入图片描述

3.3 读取数据

格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

参数:

参数描述
COUNT返回的条数
BLOCK用于设置XREAD为阻塞模式,单位毫秒,默认为非阻塞模式。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待。如果在这个时间内没有新的数据流入,那么输出(nil) (1.05s)

注:使用Block模式,配合 作为 I D ,表示读取最新的消息(在非阻塞模式 作为ID,表示读取最新的消息(在非阻塞模式 作为ID,表示读取最新的消息(在非阻塞模式无意义),若没有消息,命令阻塞!等待过程中,其他客户端向队列追加消息,则会立即读取到。

3.3.1 直接读取

XREAD STREAMS mystream-test 0

在这里插入图片描述

3.3.2 阻塞读取

XREAD BLOCK 4000 STRRAMS mystream-test $

在这里插入图片描述

3.3.3 非阻塞读取

XREAD STREAMS mystream-test 0

在这里插入图片描述

3.4 删除数据

格式:

XDEL key ID [ID ...]

参数:

参数描述
key队列名称
ID数据ID
XDEL mystream-test 1697376922916-0

在这里插入图片描述

3.5 消费组

3.5.1 创建消费组:XGROUP

格式:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

参数:

参数描述
CREATE创建消费组
key队列名称
groupname消费组名称
id接收指定ID之后的消息
$接收所有的消息
参数描述
DESTROY删除消费组
key队列名称
groupname消费组名称
参数描述
DELCONSUMER删除消费组中的消费者
key队列名称
groupname消费组组名称
consumername消费者名称
# 创建接收最新消息的消费组
XGROUP CREATE mystream-test mygroup-1 $
# 创建接收所有消息的消费组
XGROUP CREATE mystream-test mygroup-2 0

在这里插入图片描述

3.5.2 删除消费组

# 删除消费组
XGROUP DESTROY mystream-test mygroup-1
XGROUP DELCONSUMER mystream-test mygroup-2

3.5.3 消费组消费消息:XREADGROUP

格式:

 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
参数描述你
group消费组名称。
consumer消费者名称。
count要读取的数量。
milliseconds阻塞时间,以毫秒为单位。
key键指定的队列名称。
ID表示消息 ID。
XREADGROUP GROUP mygroup-1 myconsumer-1 COUNT 1 BLOCK 100000 STREAMS mystream-test >

在这里插入图片描述

3.6 查看等待确认状态:XPENDING

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

在这里插入图片描述

3.7 消费信息确认:XACK

格式:

XACK key group ID [ID ...]

参数:

参数描述
key队列名称
group消费组名称
ID消息ID
XACK mystream-test mygroup-1 1698558137966-0

在这里插入图片描述

3.8 查询信息:XINFO

格式:

XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

参数:
查询消费者信息

参数名称
CONSUMERS查询消费者名称
key消费者名称
groupname
查询消费组信息
参数名称
GROUPS查询消费组信息
key消费组名称
查询队列信息
参数名称
STREAM查询队列信息
key队列名称

3.8.1 查询队列信息

XINFO STREAM mystream-test

在这里插入图片描述

3.8.3 查询队列中的消费组

XINFO GROUPS mystream-test

在这里插入图片描述

3.8.4 查询队列消费组中的消费者

XINFO CONSUMERS mystream-test mygroup-1

在这里插入图片描述

4 小结

Stream的出现是为了解决原先队列存在的问题:
(1)Pub/Sub模式无法持久化消息,如果Redis网络异常或者宕机,消息会丢失;
(2)列表和有序集合的方式支持消息持久化,但是,不支持消息多播和分组消费。
Redis Stream一一解决了上面的问题,实现了消息持久化、消息多播和分组消费
Redis Stream既然是队列应用场景和其他队列一样:
(1)异步解耦;
(2)流量削峰;
不过Redis是基于内存的,如果是大量的消息数据建议选择其他消息队列,如RabbitMQ、Kafka、RocketMQ等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1147266.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

了解性能测试流程

性能测试概念 我们经常看到的性能测试概念,有人或称之为性能策略,或称之为性能方法,或称之为性能场景分类,大概可以看到性能测试、负载测试、压力测试、强度测试等一堆专有名词的解释。 针对这些概念,我不知道你看到…

【数据分析】上市公司半年报数据分析

前言 前文介绍过使用网络技术获取上市公司半年报数据的方法,本文将对获取到的数据进行简要的数据分析。 获取数据的代码介绍在下面的两篇文章中 【java爬虫】使用selenium获取某交易所公司半年报数据-CSDN博客 【java爬虫】公司半年报数据展示-CSDN博客 全量数…

【uniapp】JavaScript基础学习-20231027

今天有找到一个比较好的网站 https://www.w3school.com.cn/js/index.asp 介绍也全面,内容也比较多。我觉得把最基本的语法看看,然后可以上手写代码了。其他的就是需要靠长期的学习和积累了。 基础语法的使用: 1、定义一个变量 2、对变量赋值 …

拿到 phpMyAdmin 如何获取权限

文章目录 拿到 phpMyAdmin 如何获取权限1. outfile 写一句话木马2. general_log_file 写一句话木马 拿到 phpMyAdmin 如何获取权限 1. outfile 写一句话木马 尝试使用SQL注入写文件的方式&#xff0c;执行 outfile 语句写入一句话木马。 select "<?php eval($_REQU…

10.29数算小复习(选择题细节,二路归并,结构体排序)

排序、复杂度、细节&#xff08;选择题&#xff0c;判断题&#xff09; 对于一个已经排好序的序列&#xff0c;直接插入排序的复杂度是O(n)&#xff0c;而归并排序的复杂度是O(nlogn)。这时候归并排序就不比直接插入排序速度快了。 归并排序的最好、最坏、平均时间都是O(nlogn)…

【Spring】Spring MVC请求响应

文章目录 1. 请求1.1 传递单个参数1.2 传递多个参数1.3 传递对象1.4 后端参数重命名1.5 传递数组1.6 传递集合1.7 传递JSON对象1.8 获取URL中参数1.9 上传⽂件1.10 获得Cookie1.11 获得Session1.12 获得Header 2. 响应2.1 返回静态界面2.2 返回数据2.3 返回HTML代码片段2.4 返回…

微机原理:汇编语言程序设计

文章目录 一、汇编格式1、文字简述2、代码表述 二、汇编语言结构说明1、方式选择伪指令2、段定义语句3、段约定语句4、汇编结束语句5、返回DOS语句 三、实例1、例子2、汇编语言程序开发过程 四、功能调用DOS功能调用1、功能号01H2、功能号02H3、功能号09H4、功能号0AH5、举例 B…

操作系统——二级页表(王道视频p50)

1.总体概述&#xff1a; 2.二级页表的工作原理——如何实现一个逻辑地址到物理地址的转换 具体工作原理(有一个地方没有弄明白——就是到底是如何通过顶级页表找到 二级页表项的&#xff1f;)

el-input 给icon图标绑定点击事件

选择suffix-icon&#xff0c;添加点击事件 <temeplate><el-form-item :label"$t(company[Company address])" prop"address"><el-input v-model"enterpriseForm.address"><i slot"suffix" class"el-icon-m…

联邦学习与推荐系统

[Personalized Federated Recommendation via Joint Representation Learning, User Clustering, and Model Adaptation] (https://dl.acm.org/doi/abs/10.1145/3511808.3557668) CIKM2022(CCF-B) 论文精读 Abstract 联邦推荐的背景&#xff1a;联邦推荐使用联邦学习技术在推…

四十、【进阶】索引失效情况2

1、or的使用 在使用索引查询时&#xff0c;如果使用了or&#xff0c;会出现以下情况&#xff1a; &#xff08;情况一&#xff09;or左边是索引查询&#xff0c;or右边不是索引查询 结果&#xff1a;索引查询失效 &#xff08;情况二&#xff09;or左边不是索引查询&#x…

Java字节码技术

Java 字节码简介 Java 中的字节码&#xff0c;英文名为 bytecode, 是 Java 代码编译后的中间代码格式。JVM 需要读取并解析字节码才能执行相应的任务。 从技术人员的角度看&#xff0c;Java 字节码是 JVM 的指令集。JVM 加载字节码格式的 class 文件&#xff0c;校验之后通过 J…

B. Qingshan Loves Strings(贪心规律)

Problem - B - Codeforces 解析&#xff1a; 首先判断 t 字符串是不是相邻不同并且两端不同。 然后遍历 s 并且判断每一个相邻的相同字符&#xff0c;必须 t 字符符合并且两侧不同。 #include<bits/stdc.h> using namespace std; #define int long long const int N2e55…

私有云:【15】Composer安装无法使用cloudadmin进行下去

私有云&#xff1a;【15】Composer安装无法使用cloudadmin进行下去 1、Composer安装提示不支持windows授权2、这时候别退出3、稍微等待一会儿即可安装完成 1、Composer安装提示不支持windows授权 2、这时候别退出 上一步确定完之后&#xff0c;下一步让进行安装&#xff0c;不…

【软件测试02】测试方法

测试方法 学习目标&#xff1a; 1、能对穷举场景设计测试点---等价类划分法 2、能对限定边界规则设计测试点---边界值分析法 3、能对多条件依赖关系进行设计测试点---判定表法 4、能对项目业务进行设计测试点 一、等价类划分法 1、说明&#xff1a;在所有的测试数据中&am…

Spring更加简单的读取和存储对象

前言&#xff1a;在上篇文章中&#xff0c;小编写了一个Spring的创建和使用的相关博客&#xff1a;Spring的创建和使用-CSDN博客&#xff0c;但是&#xff0c;操作/思路比较麻烦&#xff0c;那么本文主要带领大家走进&#xff1a;Spring更加简单的读取和存储对象&#xff01; 本…

【数据结构】数组和字符串(十):稀疏矩阵的链接存储:十字链表的矩阵操作(加法、乘法、转置)

文章目录 4.2.1 矩阵的数组表示4.2.2 特殊矩阵的压缩存储a. 对角矩阵的压缩存储b~c. 三角、对称矩阵的压缩存储d. 稀疏矩阵的压缩存储——三元组表4.2.3三元组表的转置、加法、乘法、操作4.2.4十字链表0. 十字链表的基本操作1. 矩阵加法2. 矩阵乘法3. 矩阵转置4. 主函数 5. 代码…

linux 系统编程复习07-信号

1 复习目标 了解信号中的基本概念熟练使用信号相关的函数参考文档使用信号集操作相关函数熟练使用信号捕捉函数signal熟练使用信号捕捉函数sigaction熟练掌握使用信号完成子进程的回收 信号介绍 信号的概念 信号是信息的载体&#xff0c;Linux/UNIX 环境下&#xff0c;古老…

【C语言】优化通讯录管理系统

大家好&#xff0c;我是苏貝&#xff0c;本篇博客带大家优化上一篇的通讯录&#xff0c;如果你觉得我写的还不错的话&#xff0c;可以给我一个赞&#x1f44d;吗&#xff0c;感谢❤️ 目录 一. 前言二. 动态通讯录2.1 通讯录结构体2.2 初始化通讯录2.3 增加联系人2.4 销毁通讯…

【需要理解】80 单词搜索

单词搜索 题解1 回溯&#xff08;需要改变起点&#xff09; 给定一个 m x n 二维字符网格 board 和一个字符串单词 word 。如果 word 存在于网格中&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 单词必须按照字母顺序&#xff0c;通过相邻的单元格内…