Redis 7.x 系列【14】数据类型之流(Stream)

news2025/1/15 22:35:11

有道无术,术尚可求,有术无道,止于术。

本系列Redis 版本 7.2.5

源码地址:https://gitee.com/pearl-organization/study-redis-demo

文章目录

    • 1. 概述
    • 2. 常用命令
      • 2.1 XADD
      • 2.2 XRANGE
      • 2.3 XREVRANGE
      • 2.4 XDEL
      • 2.5 XLEN
      • 2.6 XREAD
      • 2.7 XGROUP CREATE
      • 2.8 XACK
      • 2.9 XPENDING
    • 3. 应用场景

1. 概述

消息队列:是指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递,生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由服务端给其推送消息。

Redis 也支持消息队列功能,在 5.0 版本之前,基于以下两种方式实现:

  • Pub/Sub
  • List

Pub/Sub 发布订阅模式,消息的发送者不会将消息直接发送给特定的接收者,而是通过消息通道广播出去,让订阅该消息主题的订阅者消费到:

在这里插入图片描述
Pub/Sub 中的消息无法持久化,如果出现网络断开、宕机等,消息就会被丢弃。而且也没有 Ack 机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了。

Redis List 也可以实现消息队列,按照插入顺序排序,可以添加一个元素到列表的头部(左边)或者尾部(右边)。 将需要延后处理的任务结构体序列化成字符串塞进 Redis 的列表,另一个线程从这个列表中轮询数据进行处理:
在这里插入图片描述
Redis List 同样存在诸多问题,比如,不支持多消费者模式,不支持延时消息,不支持优先级,不支持消息确认机制等等。

Redis Stream5.0 版本中引入的一种新的数据结构,用于实现简单但功能强大的消息传递模式。以时间序列的方式存储消息,每个消息都有一个唯一的 ID ,并且可以被多个消费者订阅和消费。是 Redis 实现消息队列的另外一种模式,支持消息的持久化、支持自动生成全局唯一 1D、支持 Ack 确认消息模式、支持消费组模式等,旨在让消息队列更加的稳定和可靠。

其结构图如下:
在这里插入图片描述
各部分解释:

  • Message Content:消息内容
  • Consumer group:消费组,通过 XGROUP CREATE 命令创建,同一个消费组可以有多个消费者
  • Last_delivered_id:游标,每个消费组会有个游标 Last_delivered_id,任意一个消费者读取了消息都会使游标往前移动。
  • Consumer:消费者,消费组中的消费者
  • Pending_ ids:消费者会有一个状态变量,用于记录被当前消费已读取但未 ack 的消息 Id ,如果客户端没有 ack ,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack 它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL(Pending Entries List),记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符),它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理

2. 常用命令

Stream 相关所有命令:

命名描述
XACK确认消费者已经成功处理从 Stream 中获取的消息
XADD添加消息到队列末尾
XAUTOCLAIM转移符合指定条件的待处理流条目的所有权
XCLAIM改变待处理消息的所有权
XDEL删除消息
XGROUP CREATE为存储在 key 的流创建一个新的消费者组
XGROUP CREATECONSUMER要在存储在key的流的消费者组中创建一个消费者
XGROUP DELCONSUMER消费者组中删除一个消费者
XGROUP DESTROY删除一个已存在的消费者组
XGROUP SETID为消费者组设置最后传递的ID
XINFO CONSUMERS返回消费者组中的消费者列表
XINFO GROUPS返回消费者组列表
XINFO STREAM存储在的key流的相关信息
XLEN获取 Stream 中的消息长度
XPENDING通过消费者组从流中获取数据但不确认这些数据,会产生待处理条目
XRANGE获取消息列表(可以指定范围)
XREAD获取消息(阻塞/非阻塞),返回大于指定 ID 的消息
XREADGROUPXREAD命令的一个特殊版本,支持消费者组
XREVRANGEXRANGE 相比区别在于反向获取,ID从大到小
XSETID内部命令。它用于主节点来复制流的最后传递的ID
XTRIM限制 Stream 的长度,如果已经超长会进行截取

2.1 XADD

XADD 命令用于向 Stream(流)数据结构末尾添加消息。

语法格式:

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]

参数说明:

  • key:指定要添加消息的 Stream 的名称。
  • [NOMKSTREAM]:可选参数,用于指定当流不存在时是否报错。默认情况下,如果指定的流不存在,XADD命令会创建。如果使用NOMKSTREAM选项,则流不存在时命令会失败。
  • [MAXLEN|MINID [=|~] threshold [LIMIT count]]:这组选项用于控制流的最大长度或最小消息 ID
    • MAXLEN maxlen:限制 Stream 的最大长度。当长度达到maxlen时,旧的消息会被自动删除。
    • MINID minid:指定最旧的消息ID。当插入新消息时,如果已经存在比minid更旧的消息,则会将这些消息删除。
    • [=|~]:操作符,=表示精确匹配,~表示小于等于(对于MINID而言)或大于等于(对于MAXLEN而言)。
    • [LIMIT count]:当使用MAXLEN~时,指定需要保留的消息数量的最小值。
  • *|ID:消息的ID。使用*表示自动生成一个唯一的ID。如果不使用*,则需要提供一个有效的消息ID,它必须大于流中所有已存在的消息的ID
  • field value [field value ...]:消息的字段和值。可以指定一个或多个字段及其对应的值。

示例,插入消息:

localhost:0>XADD mystream * msg_1 100 msg_2 38
"1719279960591-0"

示例, 插入消息,并限制长度不超过 1000 条:

localhost:0>XADD mystream MAXLEN 1000 * msg_3 100 msg_4 38
"1719279971749-0"

查看控制台:

在这里插入图片描述

2.2 XRANGE

XRANGE 命令用于获取指定范围内的消息。

命令格式:

XRANGE key start end [COUNT count]

参数说明:

  • key:指定 Streamkey
  • start:指定要检索的消息范围的起始 ID 。可以使用特殊值-来表示最小值。
  • end:指定要检索的消息范围的结束 ID 。可以使用特殊值+来表示最大值。
  • [COUNT count]:可选参数,用于限制返回的消息数量。

注意事项:

  • Stream 的消息 ID 由两部分组成:一个时间戳和一个序列号。时间戳表示消息被添加到 Stream 的时间,而序列号则用于在同一时间戳内区分不同的消息。
  • XRANGE 命令返回的消息是按照它们在 Stream 中的顺序排列的,即按照消息 ID 的顺序。
  • 如果在检索消息时使用了 COUNT 参数,但指定的范围内的消息数量少于 COUNT 指定的数量,那么只会返回范围内的所有消息。

示例,检索所有消息:

localhost:0>XRANGE mystream - +
 1)    1)   "1719279960591-0"
  2)      1)    "msg_1"
   2)    "100"
   3)    "msg_2"
   4)    "38"
 2)    1)   "1719279971749-0"
  2)      1)    "msg_3"
   2)    "100"
   3)    "msg_4"
   4)    "38"

示例,检索特定范围内的消息:

localhost:0>XRANGE mystream  1719279960591-0 1719279960600-0
 1)    1)   "1719279960591-0"
  2)      1)    "msg_1"
   2)    "100"
   3)    "msg_2"
   4)    "38"

示例,限制返回的消息数量:

localhost:0>XRANGE mystream - + COUNT 1
 1)    1)   "1719279960591-0"
  2)      1)    "msg_1"
   2)    "100"
   3)    "msg_2"
   4) 

2.3 XREVRANGE

XREVRANGE 命令与 XRANGE 命令类似,但它是按照消息 ID 的递减顺序(用于反向)获取指定范围内的消息。

命令格式:

XREVRANGE key end start [COUNT count]

示例,检索最后两个时间序列的消息:


localhost:0>XREVRANGE mystream + - COUNT 2
 1)    1)   "1719279971749-0"
  2)      1)    "msg_3"
   2)    "100"
   3)    "msg_4"
   4)    "38"

 2)    1)   "1719279960591-0"
  2)      1)    "msg_1"
   2)    "100"
   3)    "msg_2"
   4)    "38"

2.4 XDEL

XDEL 命令用于从 Stream 中删除指定的消息。返回一个整数,表示被成功删除的消息数量。

命令格式:

XDEL key ID [ID ...]

参数说明:

  • key:指定 Streamkey
  • ID:一个或多个要删除的消息的 ID

注意事项:

  • 在使用 XDEL 命令时,你需要确保提供的消息 ID 是存在的,否则命令将不会删除任何消息,并返回0。
  • 可以通过一次 XDEL 命令删除多个消息,只需在命令中提供多个消息 ID 即可。
  • XDEL 命令不会改变 Stream 的其他消息的顺序或 ID

示例,删除消息:

localhost:0>XDEL mystream 1719280747405-0
"1"

2.5 XLEN

XLEN 命令用于获取指定 Stream 中包含的消息数量,即流的长度。如果 Stream 不存在或为空,则返回 0

命令格式:

XLEN key

示例:

localhost:0>XLEN mystream
"1"

2.6 XREAD

XREAD 命令是用于从 Stream 独立消费消息,支持阻塞等待新消息的到来。返回一个数组,其中每个元素都是一个包含 Stream key 和消息列表的数组。消息列表是一个包含消息 ID 和消息数据的数组。

命令格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

参数说明:

  • COUNT count:指定一次读取的最大消息数量。如果未指定,则默认为 1
  • BLOCK milliseconds:用于指定阻塞的时间(以毫秒为单位)。如果指定了此参数,并且 Stream 中没有可消费的消息,客户端将在指定的时间内阻塞等待。如果设置为 0 ,则表示非阻塞模式,即如果没有消息可消费,则立即返回。
  • STREAMS key [key ...]:指定要从中读取消息的 Streamkey 。可以指定一个或多个。
  • ID [ID ...]:对于每个指定的 key ,可以提供一个或多个消息 ID 。这些 ID 用于指定从哪个位置开始读取消息。如果某个 key 后面没有指定 ID ,则默认为从该 Stream 的最新消息开始读取。

示例,非阻塞模式读取最新消息:

XREAD COUNT 1 STREAMS mystream $

示例,阻塞模式,读取最新消息并等待新消息:

XREAD COUNT 1 BLOCK 10000 STREAMS mystream $

2.7 XGROUP CREATE

XGROUP CREATE 命令用于在已存在的流(stream)上创建一个新的消费者组(consumer group)。消费者组允许多个消费者(consumer)协作消费同一个流中的数据,并且每个消费者都可以从自己的位置开始读取流。

命令格式:

XGROUP CREATE <key> <groupname> <id> [MKSTREAM] [MKTABLE] [CREATECONSUMER <consumername>]

参数说明:

  • <key>:流的名称。
  • <groupname>:消费者组的名称。
  • <id>:消费者组初始的最后一个条目 ID ,即消费者组开始读取的起始点。可以使用$表示流的最新条目,或者使用0表示流的起始点,或者使用任何其他有效的 ID
  • [MKSTREAM]:可选参数,如果流不存在,则创建它。
  • [MKTABLE]:在 Redis 6.2 及更高版本中引入的可选参数,用于创建与流关联的二级索引表(secondary index table)。这通常用于支持基于特定字段的查询。
  • [CREATECONSUMER <consumername>]:在 Redis 6.2 及更高版本中引入的可选参数,用于在创建消费者组时同时创建一个消费者。

示例,创建一个新的消费者组,从流的最新条目开始读取:

localhost:0>XGROUP CREATE mystream mygroup $ MKSTREAM
"OK"

2.8 XACK

XACK 命令用于确消费者已经成功处理了一个或多个消息。这些消息通常是从流(Stream)中读取的,并存储在消费者组的待处理条目列表(Pending Entry ListPEL)中。通过发送 XACK 命令,消费者通知 Redis 服务器它已经完成了一个或多个消息的处理,从而将这些消息从 PEL 中移除。

命令格式:

XACK <key> <groupname> <consumername> <ID> [<ID> ...]

参数说明:

  • <key>:流的名称。
  • <groupname>:消费者组的名称。
  • <consumername>:消费者的名称。
  • <ID>:要确认的消息的ID,可以指定一个或多个。

示例,假设消费者已经读取了一些消息,并决定它们已经被成功处理。现在,消费者想要确认这些消息:

XACK mystream mygroup myconsumer 1526569900000-0 1526569900002-0

在这个例子中,消费者确认了两个消息,它们的 ID 分别是 1526569900000-01526569900002-0

一旦消息被确认,它们将从该消费者组的 PEL 中移除,表示这些消息已经被成功处理。注意,即使消息被确认并从 PEL 中移除,它们仍然保留在流中,并且可以被其他消费者组或消费者读取。

如果消费者在处理消息时失败,或者需要稍后重试,它可以选择不发送 XACK 命令,这样消息将保持在 PEL 中,直到消费者准备好确认它们或它们因超时而被自动从 PEL 中移除(取决于消费者组的配置)。

2.9 XPENDING

XPENDING 命令用于查询消费者组中未确认消息的详细信息。允许你查看哪些消息正在等待被处理,以及哪些消费者拥有这些消息。

命令格式:

XPENDING <key> <groupname> [start end count] [consumername]

参数说明:

  • <key>:流的名称。
  • <groupname>:消费者组的名称。
  • [start end count]:这三个参数是可选的,用于限制查询结果的范围。
  • start:查询的开始消息ID
  • end:查询的结束消息ID
  • count:要返回的消息数量。
  • [consumername]:可选参数,指定要查询的消费者的名称。如果不提供此参数,将返回消费者组中的所有未确认消息。

XPENDING 命令返回一个数组,其中包含以下信息:

  • 总未确认消息数:整数,表示在指定范围内未确认的消息总数。
  • 最小消息ID:字符串,表示在指定范围内未确认消息的最小ID
  • 最大消息ID:字符串,表示在指定范围内未确认消息的最大ID
  • 每个消费者的未确认消息:一个数组,其中每个元素都是一个包含消费者名称和该消费者拥有的未确认消息数的数组。

注意事项:

  • XPENDING 是一个只读命令,它不会修改任何数据。
  • 如果提供了 consumername 参数,则只返回该消费者的未确认消息信息。
  • 如果提供了 [start end count] 参数,则只返回指定范围内的未确认消息信息。
  • 通过 XPENDING 命令,你可以轻松地监控消费者组中的未确认消息,从而确保消息得到及时处理,并在必要时进行故障排除。

示例:

XPENDING mystream mygroup
2) "1526569900000-0"  # 最小消息ID  
3) "1526569900002-0"  # 最大消息ID  
4) 1) 1) "myconsumer" # 消费者名称  
     2) (integer) 2   # 该消费者拥有的未确认消息数

3. 应用场景

Redis Stream 主要用于消息队列,所以可以用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。但是更推荐使用专业的消息队列,比如RabbitMQRocketMQ等,对于简单的应用场景,如果能满足需求,也可以使用Redis Stream

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1886000.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Chisel学习笔记(1)——Chisel安装与Verilog代码仿真

参考链接&#xff1a; https://www.chisel-lang.org/docs/installation 使用Chisel语言编写硬件描述语言&#xff0c;相比于使用Verilog会更加地灵敏快捷&#xff0c;Coding效率更高&#xff0c;但似乎debug会出现一些小问题。但新工具还是要尝试一下才知道好不好用。 1 安装C…

武汉星起航:跨境电商领域的领航者,助力合作伙伴全球布局

在跨境电商的汹涌浪潮中&#xff0c;武汉星起航电子商务有限公司如同一颗璀璨的明星&#xff0c;自2017年起便以亚马逊自营店铺为核心业务&#xff0c;不断积累实战运营经验&#xff0c;逐步建立了自己在市场中的稳固地位。随着2020年公司的正式成立&#xff0c;武汉星起航明确…

@RequestMapping属性详解及案例演示

RequestMapping源码 Target({ElementType.TYPE, ElementType.METHOD}) Retention(RetentionPolicy.RUNTIME) Documented Mapping public interface RequestMapping {String name() default "";AliasFor("path")String[] value() default {};AliasFor(&quo…

最快33天录用!一投就中的医学4区SCI,几乎不退稿~

【SciencePub学术】今天小编给大家推荐2本生物医学领域的SCI&#xff0c;此期刊为我处目前合作的重点期刊&#xff01;影响因子0-3.0之间&#xff0c;最重要的是审稿周期较短&#xff0c;对急投的学者较为友好&#xff01; 医学医药类SCI 01 / 期刊概况 【期刊简介】IF&…

从0开始搭建vue项目

#先查下电脑有没有安装过node和npm node -v npm -v #安装vue npm install -g vue #安装webpack npm install webpack -g 都安装好后&#xff0c;进入你想创建的文件夹内 创建名字&#xff1a;vue init webpack <project_name> 就默认回车 然后根据项目需求Y/n 比如…

红酒香气探秘:解锁品味酒香的奥秘,带你领略葡萄酒的魅力

在葡萄酒的世界里&#xff0c;香气是葡萄酒的灵魂&#xff0c;它诉说着葡萄的故事&#xff0c;也展现着酿酒师的匠心独运。每一款红酒都有其不同的香气&#xff0c;如同一位优雅的舞者&#xff0c;用曼妙的舞姿诠释着酒的韵味。今天&#xff0c;就让我们一起走进红酒的香气世界…

24年诺瓦星云入职认知能力测验Verify + 职业性格问卷OPQ可搜索带解析求职SHL题库

一、走进西安诺瓦星云科技股份有限公司 西安诺瓦星云科技股份有限公司(简称诺瓦星云) 是全球极具竞争力的LED显示解决方案供应商&#xff0c;实施"基于西安&#xff0c;围绕北京与深圳&#xff0c;辐射全球"的全球化布局&#xff0c;总部位于西安&#xff0c;西安、…

微服务中的Feign远程调用

Feign的个人理解 Feign在英文中是“装”的意思&#xff0c;但在微服务中他是远程调用的一种方式&#xff0c;我的理解是&#xff1a;他替代了RestTemplateNacos中的URL编码的方式&#xff0c;显得很高大上&#xff0c;所以很装&#xff1a;&#xff08;声明式事务&#xff0c;只…

静态IP代理:保障网络稳定的核心技术

静态IP代理作为一种重要的网络工具&#xff0c;因其稳定性和持久性&#xff0c;受到越来越多用户的青睐。本文将深入探讨静态IP代理的定义和优势。 静态IP代理是什么&#xff1f; 静态IP代理是指在代理服务器中分配一个固定的IP地址&#xff0c;用户在使用过程中始终使用同一个…

当我问AI,智星云算力有什么特点时,答案出乎意料!

当我问AI&#xff0c;智星云算力有什么特点时&#xff0c;它的回答几乎全中。 唯一的错误是在“分钟计费”这里&#xff0c;之前确实是按分钟计费过&#xff0c;今年改成了按小时计费&#xff0c;大约一小时0.75元起。 另外就是一些更具体的特点没有展现出来&#xff0c;比如…

Mysql和ES使用汇总

一、mysql和ES在业务上的配合使用 一般使用时使用ES 中存储全文检索的关键字与获取的商品详情的id&#xff0c;通过ES查询获取查询商品的列表中展示的数据&#xff0c;通过展示id 操作去获取展示商品的所有信息。mysql根据id去查询数据库数据是很快的&#xff1b; 为什么ES一般…

高效除氟:探索CH-87up树脂在氟化工废水处理中的应用

摘要 本研究旨在评估Tulsimer CH-87up树脂针对经钙镁预处理后的氟化工废水的深度处理效果。实验结果显示&#xff0c;CH-87up树脂能显著降低废水中的氟离子浓度&#xff0c;从43.4mg/L降至0.34mg/L&#xff0c;远低于行业排放标准的5mg/L。此外&#xff0c;该树脂表现出卓越的…

Windows打开redis以及Springboot整合redis

目录 前言Windows系统打开redisSpringboot整合redis依赖实体类yml配置文件config配置各个数据存储类型分别说明记录string数据写入redis&#xff0c;并查询通过命令行查询 list插入数据到redis中从redis中读取命令读取数据 hash向redis中逐个添加map键值对获取key对应的map中所…

[附源码]最新springboot线上电商|前后端分离|界面简洁

一. 前言 今天小编给大家带来了一款可学习&#xff0c;可商用的&#xff0c;线上电商的网站源码&#xff0c;支持二开&#xff0c;无加密。代码的后端是SpringBoot技术栈&#xff08;非jsp&#xff09;&#xff0c;前端是Angular。如果您需要定制需求请找小编。 文章第六小节…

Vue 数据大屏适配

1、准备俩个盒子 .dataScreen-content 盒子内容根据设计稿给的px单位进行正常的布局就行 2、盒子的CSS样式 .dataScreen-container {width: 100%;height: 100%;// 有背景图需要的样式background: url("./images/bg.png") no-repeat;background-repeat: no-repeat;b…

让采购和工程师们既爱又恨的任务——BOM

在项目研发与生产过程中&#xff0c;有一个常常让采购经理和工程师们既爱又恨的任务&#xff0c;那就是整理BBOMB。BOM作为连接设计与制造的桥梁&#xff0c;其重要性不言而喻&#xff0c;它详细列出了产品构成所需的所有零部件、材料及其规格、数量&#xff0c;是成本估算、采…

学习记录之数学表达式(6)

目录 十二、图与网络12.1 有向图12.2 元组与对象12.3 二元关系与有向图12.4 无向图12.5 有向网络12.6 作业 十三、树13.1 例子13.2 定义13.3 Java代码13.4 作业 十四、 m \mathbf{m} m叉树14.1 预备知识&#xff1a;字符串14.2 m \mathbf{m} m-叉树的定义14.3 Java代码14.4 作…

代码随想录算法训练营第20天 | 题目: 235. 二叉搜索树的最近公共祖先 701.二叉搜索树中的插入操作 450.删除二叉搜索树中的节点

代码随想录算法训练营第20天 | 题目&#xff1a; 235. 二叉搜索树的最近公共祖先 701.二叉搜索树中的插入操作 450.删除二叉搜索树中的节点 文章来源&#xff1a;代码随想录 题目名称&#xff1a; 235. 二叉搜索树的最近公共祖先 给定一个二叉搜索树, 找到该树中两个指定节点的…

【.Net】Web项目部署腾讯云

文章目录 总述前置准备docker-compose部署普通部署 参考 总述 前置准备 云服务添加端口 另有linux本身防火墙请参考&#xff1a; 【Linux】防火墙命令 需安装.Net SDK和Asp .Net Runtime 注意&#xff1a; 1、sdk也要不只是runtime 2、是Asp .Net Runtime不是.Net Runtime …

Linux socketcan应用编程

一、基本步骤 1、打开并绑定到 CAN 套接字 在执行任何操作之前&#xff0c;第一步是创建一个套接字。此函数接受三个参数 – 域/协议系列 &#xff08;PF_CAN&#xff09;、套接字类型&#xff08;原始或数据报&#xff09;和套接字协议。如果成功&#xff0c;该函数将返回文件…