【Redis7】10大数据类型之Stream类型

news2024/12/26 13:57:36

文章目录

  • 1. Stream简介
  • 2. 生产消息命令(XADD)
  • 3. 查询相关命令
    • 3.1 获取指定范围内的消息(XRANGE)
    • 3.2 逆序获取指定范围内的消息(XREVRANGE)
    • 3.3 返回消息的数量(XLEN)
  • 4. 删除消息命令(XDEL)
  • 5. 截取消息命令(XTRIM)
  • 6. 消费消息命令(XREAD)
  • 7. 消费者组管理命令
    • 7.1 创建消费者组(XGROUP)
    • 7.2 在消费者组中读取消息(XREADGROUP)
    • 7.3 查询消费者组的信息(XPENDING)
    • 7.4 确认消费者成功处理了消息(XACK)

1. Stream简介

Redis Stream 是在Redis 5.0版本中引入的一种新的数据结构,它主要用于实时数据处理场景,如消息队列、日志记录和实时数据分析等。Stream的设计灵感部分来源于消息队列系统,如Kafka,但它提供了更直接集成到Redis生态系统中的能力。

简单来说:Stream就是消息中间件+阻塞队列

Redis Stream 类型本身设计时就充分考虑了生产者消费者模型的需求。它不仅包含了这一模型,还对其进行了优化和扩展,以便更好地适应现代分布式系统中的消息传递场景。

  • 生产者:任何可以向Redis Stream写入消息的客户端都可以视为生产者。生产者使用XADD命令向Stream中添加消息,每个消息都附有一个全局唯一的ID,确保消息的顺序性和可追踪性。
  • 消费者:消费者使用XREADXREADGROUP命令从Stream中读取消息。特别是XREADGROUP命令,它支持消费者组(Consumer Group)的概念,这是生产者消费者模型中的一个重要组成部分。消费者组让多个消费者可以协作处理Stream中的消息,同时保证了消息不会被重复处理。
  • 消费者组:Stream支持消费者组,组内的消费者共享消息,但每个消息只由组内的一个消费者处理,从而实现了消息的有序和公平分配。消费者组还允许消费者在处理失败时重新分配消息,以及通过XACK命令确认消息已成功处理,实现消息的确认机制。
  • 消息持久化与顺序性:Stream中的消息是持久化的,确保了即使在Redis服务器重启后,消息也不会丢失。同时,Stream保持消息的严格顺序,这对于某些依赖消息顺序的应用场景至关重要。
  • 阻塞读取与自定义读取策略:消费者可以选择阻塞读取模式,这意味着当没有新消息时,消费者会等待直至新消息到达。此外,还可以通过参数定制读取的起始位置、消息数量或时间范围,提供了高度的灵活性。

Stream类型主要特点:

  1. 有序性:Stream中的消息按照ID排序,每个消息都有一个全局唯一的ID,确保了消息的顺序。
  2. 持久化:Stream中的数据是持久化的,即使Redis服务器重启,消息也不会丢失。
  3. 多播与分组消费:支持多个消费者同时消费同一流中的消息,而且可以将消费者组织成消费组,实现消息的分组消费,每个消息可以被一个或多个组消费,但组内每个消息只会被其中一个消费者消费(类似于Kafka的分区消费者模型)。
  4. 灵活的数据结构:每条消息可以包含多个字段(field-value对),提供了高度的灵活性来携带复杂的数据。
  5. 消费者进度跟踪:消费者可以在读取消息时自动追踪自己的消费进度,Redis使用Last Seen Displacement (LSD) 来跟踪消费者的读取位置。
  6. 读取控制:支持多种读取模式,包括从特定消息ID读取、读取最近的N个消息、读取某个时间范围内的消息等。
  7. 阻塞读取:可以使用XREADXREADGROUP命令以阻塞的方式等待新消息,直到有新消息到达或超时。

Stream数据结构图:

一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容

image-20240512222700120

内容含义
Message Content消息内容
Consumer group消费组,通过XGROUP CREATE 命令创建,同一个消费组可以有多个消费者
Last_delivered_id游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
Consumer消费者,消费组中的消费者
Pending_ids消费者会有一个状态变量,用于记录被当前消费已读取但未ack的消息Id,如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack它就开始减少。这个pending_ids变量在Redis官方被称之为 PEL(Pending Entries List),记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符),它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理

2. 生产消息命令(XADD)

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...]:向Stream(key)中添加一条消息,ID可以是自动生成或指定的唯一标识符,后面跟着一个或多个字段值对。

这个命令有三个注意点:

  • 消息id要比上个id大
  • 默认用*表示自动生成规矩
  • *:用于XADD命令,表示让系统自动生成id(类似于MySQL的自增主键)

示例:

127.0.0.1:6379> xadd k1 * name zhangsan age 18
1715525079420-0 # 系统自动生成的id
127.0.0.1:6379> xadd k1 * name lisi age 19
1715525087805-0
127.0.0.1:6379> xadd k1 * name wangwu age 20
1715525094621-0
127.0.0.1:6379> xadd k1 1715525094621-0 name wangwu age 20 # 重复的id会出错
ERR The ID specified in XADD is equal or smaller than the target stream top item
127.0.0.1:6379> xadd k1 1715525094621-1 name zhaoliu age 21 # 注意这里是 -1
1715525094621-1
127.0.0.1:6379> 

生成的消息ID,有两部分组成,毫秒时间戳-该毫秒内产生的第1条消息

信息条目指的是序列号,在相同的毫秒下序列号从0开始递增,序列号是64位长度,理论上在同一毫秒内生成的数据量无法到达这个级别,因此不用担心序列号会不够用。millisecondsTime指的是Redis节点服务器的本地时间,如果存在当前的毫秒时间戳比以前已经存在的数据的时间戳小的话(本地时间钟后跳),那么系统将会采用以前相同的毫秒创建新的ID,也即redis 在增加信息条目时会检查当前 id 与上一条目的 id, 自动纠正错误的情况,一定要保证后面的 id 比前面大,一个流中信息条目的ID必须是单调增的,这是流的基础。

客户端显示传入规则:

Redis对于ID有强制要求,格式必须是时间戳-自增Id这样的方式,且后续ID不能小于前一个ID

Stream的消息内容,它的结构类似Hash结构,以key-value的形式存在。

3. 查询相关命令

3.1 获取指定范围内的消息(XRANGE)

XRANGE key start end [COUNT count]:获取Stream中指定范围内的消息,startend定义了消息ID的范围,COUNT限制返回结果数量。

127.0.0.1:6379> xrange k1 - + count 2 
1715525079420-0
name
zhangsan
age
18
1715525087805-0
name
lisi
age
19
127.0.0.1:6379> 
  • -:表示Stream中的最小ID
  • +:表示Stream中的最大ID

3.2 逆序获取指定范围内的消息(XREVRANGE)

XREVRANGE key end start [COUNT count]:类似于XRANGE,但消息按逆序返回。

示例:

127.0.0.1:6379> xrevrange k1 + - count 2
1715525094621-1
name
zhaoliu
age
21
1715525094621-0
name
wangwu
age
20
127.0.0.1:6379> 

3.3 返回消息的数量(XLEN)

XLEN key:返回Stream中消息的数量。

127.0.0.1:6379> xlen k1
4
127.0.0.1:6379> 

4. 删除消息命令(XDEL)

XDEL key ID [ID ...]:删除Stream中指定ID的消息。

127.0.0.1:6379> xlen k1
4
127.0.0.1:6379> xdel k1 1715525079420-0
1
127.0.0.1:6379> xlen k1
3
127.0.0.1:6379> 

5. 截取消息命令(XTRIM)

XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]:截取Stream

  • MAXLEN:表示允许的最大长度,保留大的
  • MIDID:允许的最小id,这个id之前的消息会被截取掉
127.0.0.1:6379> xrange k1 - + 
1715525087805-0
name
lisi
age
19
1715525094621-0
name
wangwu
age
20
1715525094621-1
name
zhaoliu
age
21
127.0.0.1:6379> xtrim k1 maxlen 2 
1
127.0.0.1:6379> xrange k1 - + 
1715525094621-0
name
wangwu
age
20
1715525094621-1
name
zhaoliu
age
21
127.0.0.1:6379> xtrim k1 minid 1715525094621-1
1
127.0.0.1:6379> xrange k1 - + 
1715525094621-1
name
zhaoliu
age
21
127.0.0.1:6379> 

6. 消费消息命令(XREAD)

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]:非阻塞或阻塞式读取一个或多个Stream中的消息。

  • COUNT指定最多读取的消息数量
  • BLOCK milliseconds指定阻塞等待新消息的最长时间(毫秒)。默认不阻塞,如果milliseconds设置为0,则永远阻塞
127.0.0.1:6379> xread count 2 streams k1 0-0
k1
1715525094621-1
name
zhaoliu
age
21
1715526892718-0
name
aaa
127.0.0.1:6379> 

0-0代表从最小的ID开始获取Stream中的消息,当不指定count,将会返回Stream中的所有消息,注意也可以使用0(00/000也都是可以的)

阻塞读取示例:

客户端1:

127.0.0.1:6379> xread count 1 block 0 streams k1 $
k1
1715527170695-0
name
ddd
127.0.0.1:6379> 

客户端2:

127.0.0.1:6379> xadd k1 * name ddd 
"1715527170695-0"
127.0.0.1:6379> 

$:表示只消费新的消息,比当前id还要大的id

只有当客户端2添加数据之后,客户端1才会进行消费

7. 消费者组管理命令

消息创建好之后,就需要消费者来进行消费.而创建消费者要分组进行创建.

7.1 创建消费者组(XGROUP)

XGROUP create key groupname id|$ [MKSTREAM] [ENTRIESREAD entries_read]:创建一个新的消费者组。

127.0.0.1:6379> XGROUP create k1 group1 $
OK
127.0.0.1:6379> XGROUP create k1 group2 0
OK
127.0.0.1:6379> 
  • $表示消费新来de
  • 0表示从Stream头部开始消费

7.2 在消费者组中读取消息(XREADGROUP)

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]:在消费者组中读取消息,支持消息确认机制(通过ACKNOACK)。

127.0.0.1:6379> XGROUP create k1 group2 0
OK
127.0.0.1:6379> xreadgroup group group2 consumer1 streams k1 > 
k1
1715570723943-0
f1
v1
1715570728646-0
f2
v2
1715570732271-0
f3
v3
1715570735727-0
f4
v4
127.0.0.1:6379> xreadgroup group group2 consumer2 streams k1 > 

127.0.0.1:6379> 
  • >:用于XREADGROUP命令,表示迄今还没有发送给组中使用者的信息,会更新消费者组的最后ID

  • 消费者不存在时,会自动创建

group2这个组中的consumer1读完了k1中的所有消息,那么当group2中consumer2再来读时,就读不到任何消息了.

但当我新建一个group3组,在group3组中读数据时,又能够读到消息.

127.0.0.1:6379> XGROUP create k1 group3 0
OK
127.0.0.1:6379> xreadgroup group group3 consumer3 streams k1 > 
k1
1715570723943-0
f1
v1
1715570728646-0
f2
v2
1715570732271-0
f3
v3
1715570735727-0
f4
v4
127.0.0.1:6379> 

这是因为同组共享一个队列信息.这个组就相当mq中的队列,不同消费者读取同一个队列的时候每条消息只能被消费一次,但是同一条消息可以被不同队列的不同消费者同时消费,也就是广播

为了防止一个消费者读完所有消息,我们可以使用count参数来限制消费者读取几条消息,以此实现负载均衡

示例:

127.0.0.1:6379> xreadgroup group group4 consumer1 count 2 streams k1 > # 限制只能读2条消息
k1
1715570723943-0
f1
v1
1715570728646-0
f2
v2
127.0.0.1:6379> xreadgroup group group4 consumer2 count 2 streams k1 >
k1
1715570732271-0
f3
v3
1715570735727-0
f4
v4
127.0.0.1:6379> 

7.3 查询消费者组的信息(XPENDING)

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]:查询消费者组的信息,包括待处理消息的数量等。

示例:

127.0.0.1:6379> xpending k1 group2 # group2组中消费读取情况
4
1715570723943-0
1715570735727-0
consumer1
4
127.0.0.1:6379> xpending k1 group4 # group4组中消费读取情况
4
1715570723943-0
1715570735727-0
consumer1
2
consumer2
2
127.0.0.1:6379> 

7.4 确认消费者成功处理了消息(XACK)

XACK key group id [id ...]:用于确认(acknowledge)消费者已经成功处理了消息。

示例:

127.0.0.1:6379> xpending k1 group4 - + 5 consumer1
1715570723943-0
consumer1
543165
1
1715570728646-0
consumer1
543165
1
127.0.0.1:6379> xack k1 group4 1715570723943-0
1
127.0.0.1:6379> xpending k1 group4 - + 5 consumer1
1715570728646-0
consumer1
574237
1
127.0.0.1:6379> 

当客户端从Stream中读取消息后,会使用XACK命令告知Redis,它所指定的消息ID对应的消息已经得到了妥善处理。Redis接收到XACK命令后,会将这些消息ID从消费者组的待确认(pending)消息列表中移除,这样,这些消息就不会再次被同一个消费者组内的消费者获取到,实现了消息的确认与去重处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1670101.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

给centos机器打个样格式化挂载磁盘(新机器)

文章目录 一、先安装lvm2二、观察磁盘三、磁盘分区四、建PV五、建VG六、创建LV七、在LV上创建文件系统八、挂载到/home(1)临时挂载(2)永久挂载 九、最后reboot一下 一、先安装lvm2 yum install lvm2二、观察磁盘 三、磁盘分区 四…

Springboot + xxlJob注意事项

1. 部署 就是这个地址: https://gitee.com/xuxueli0323/xxl-job 由于xxl-job的思想是 调度中心负责调度任务,然后有执行器负责接受调度的信息,然后根据调度,执行任务中的具体逻辑 将 xl-job-admin 启动起来,操作xl-job-admin这个文件夹下的配置文件即可: 创建数据库 执行sql…

如何防止公司内部人员有意或无意的把内部核心文件资料泄露,拷贝,打印,上传,社交工具等途径外泄?

防止公司内部人员有意或无意泄露核心文件资料,需要采取一系列综合性的管理和技术措施。 以下是一些有效的策略: 加强员工意识教育:定期举办信息安全培训,提高员工对数据保护的意识,让员工了解数据泄露的风险和后果&…

冯喜运:5.13黄金原油震荡整理是涨还是跌?今日走势分析

【黄金消息面分析】;自5月初以来,黄金和白银一直在享受需求的回归,买家在过去几天加大了力度,一度推动金价重返2370美元上方,白银重返28.5美元上方。不过,经过几天的盘整后,黄金白银价格双双下跌。然而&…

leetcode经典例题之环形队列

P. S.:以下代码均在VS2019环境下测试,不代表所有编译器均可通过。 P. S.:测试代码均未展示头文件stdio.h的声明,使用时请自行添加。 目录 1、题目展示2、问题分析3、完整代码展示4、结语 1、题目展示 在拿到题目时,通…

SSH常用功能介绍-高级功能

一、介绍 SSH(Secure Shell)是一种用于远程登录和执行命令的网络协议,它提供了加密的连接,保证了数据的安全性。除了基本的远程登录功能外,SSH还提供了许多高级功能,以下是一些常用的高级功能介绍&#xf…

26版SPSS操作教程(高级教程第二十章)

目录 前言 粉丝及官方意见说明 第二十章一些学习笔记 第二十章一些操作方法 神经网络与支持向量机 人工神经网络(artificial neural network,ANN) 假设数据 具体操作 结果解释 对案例的进一步分析 结果解释 ​编辑 尝试将模型复…

mmdetection在训练自己数据集时候 报错‘ValueError: need at least one array to concatenate’

问题: mmdetection在训练自己数据集时候 报错‘ValueError: need at least one array to concatenate’ 解决方法: 需要修改数据集加载的代码文件,数据集文件在路径configs/base/datasets/coco_detection.py里面,需要增加meta…

水经微图万能版、专业版与企业版的区别?

水经微图(以下简称“微图”)的版本,主要分为万能版、专业版和企业版三个版本。 什么是万能版? 万能版是指“水经注万能地图下载器”软件功能的授权,虽然该软件已经停止更新,但购买过该软件的用户&#xf…

简单的DbUtils工具类【精细】

目录 单条通用增删改方法 1.创建maven项目,并加载依赖 2.创建数据库连接工具类(Dbutils类) 3.创建一个执行器(SqlExecutor类) 4.通用(增,删,改)方法 1.创建方法 2.创建userInfo实体类 3.创建测试类,测试增,删&#xf…

leetcode-最长公共子序列(二)-103

题目要求 思路 step 1:优先检查特殊情况。 step 2:获取最长公共子序列的长度可以使用动态规划,我们以dp[i][j]dp[i][j]dp[i][j]表示在s1中以iii结尾,s2中以jjj结尾的字符串的最长公共子序列长度。 step 3:遍历两个字…

C++——缺省参数与重载函数

目录 ​前言 一.缺省参数 1.1缺省参数概念 1.2缺省参数分类 注意事项: 二.函数重载 2.1函数重载概念 2.2c支持函数重载原理——命名修饰 前言 本篇文章主要讲述c中有关于缺少参数与函数重载的相关概念与实例,以下是本人拙见,如有错误…

文件夹重命名高效批量技巧:轻松实现在文件夹名称左边添加关键字

在日常工作和生活中,我们经常需要对大量的文件夹进行重命名,以便更好地组织和管理文件。然而,手动一个接一个地修改文件夹名称既费时又费力。幸运的是,有一些高效的批量重命名技巧可以帮助我们快速实现这一目标,特别是…

算法学习笔记(3)-差分

#差分 差分和前缀和互为逆运算: 给定一个原数组s,差分数组h,两者的关系如下所示: s[i] h[1] h[2] h[3] …… h[i] 针对于上面的公式,由差分数组h推导而来 h[1] s[1] h[2] s[2] - s[1] h[3] s[3] - [2] …… h[…

vue前端时间段选择控件

实现效果: 可选具体的某天的某时某分某秒 vue前端代码: <el-form-item label"日期"><el-date-pickerv-model"daterangerq"style"width: 240px"value-format"yyyy-MM-dd HH:mm:ss"type"datetimerange"range-separat…

Spring,SpringMVC,SpringBoot知识总结

1.简述Spring,SpringMVC&#xff0c;SpringBoot各自特点及联系 Spring、Spring MVC 和 Spring Boot 是 Java 开发中常用的三个框架&#xff0c;它们之间有以下关系&#xff1a; Spring&#xff1a;是一个全功能的 JavaEE 应用程序框架。它提供了一系列的解决方案&#xff0c…

【管理咨询宝藏99】离散制造智能工厂战略规划方案

本报告首发于公号“管理咨询宝藏”&#xff0c;如需阅读完整版报告内容&#xff0c;请查阅公号“管理咨询宝藏”。 【管理咨询宝藏99】离散制造智能工厂战略规划方案 【格式】PDF版本 【关键词】智能制造、先进制造业转型、数字化转型 【核心观点】 - 推进EHS、品质一致性、生…

通配符证书260元

通配符SSL证书是一种特殊的数字证书&#xff0c;可以保护域名以及该域名下的所有子域名。不论是个人开发者还是企业开发者&#xff0c;只需要购买一个通配符SSL证书&#xff0c;就可以为多个子域名提供传输数据加密服务&#xff0c;降低了多子域名网站开发者购买SSL证书的成本。…

ArcGIS水文水环境数据编辑、管理、处理与分析;ArcGIS水文分析及流域特征提取;湖泊水库水环境监测及评价;河道水污染预测与水环境容量计算等案例实践

目录 专题一 ArcGIS&#xff1a;数据管理 专题二 ArcGIS&#xff1a;数据转换 专题三 ArcGIS&#xff1a;地图制作 专题四 水文水环境数据编辑与管理 专题五 水文水环境数据处理与分析 专题六 ArcGIS水文分析及流域特征提取 专题七 湖泊水库水环境监测及评价 专题八 河…

redis服务连接及常规操作和redis服务getshell

简介 Redis&#xff0c;英文全称是Remote Dictionary Server&#xff08;远程字典服务&#xff09;&#xff0c;是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库&#xff0c;并提供多种语言的API。 redis服务于传统服务器不同&…