Redis之stream类型解读

news2025/1/16 20:12:53

目录

基本介绍

数据结构

消息

消费组

消费者

基本使用命令

概述 

xadd 命令

xtrim 命令

 xdel 命令

xlen 命令

xrange 命令

xread 命令 

xgroup 命令 

xreadgroup 命令

xack 命令


基本介绍

Redis stream(流)是一种数据结构,其作用类似于仅追加日志,但也实现了多个操作来克服典型仅追加日志的一些限制。其中包括O(1)时间的随机访问和复杂的消费策略,如消费者群体。 您可以使用流实时记录和同时联合事件。 

Redis 为每个stream(流)条目生成一个唯一的 ID。可以在以后使用这些 ID 检索其关联的条目,或读取和处理流中的所有后续条目。

Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。stream 有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的,Redis 重启后,内容还在。

数据结构

Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:

消息

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。XADD key ID field value[field value ...]

  • 消息 ID:消息 ID 的形式是 timestampInMillis-sequence,例如 1527846880572-5,它表示当前的消息在毫米时间戳 1527846880572 时产生,并且是该毫秒内产生的第5条消息。消息 ID 可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的 ID 要大于前面的消息 ID。
  • 消息内容:消息内容就是键值对,形如 hash 结构的键值对。

消费组

每个 Stream 都可以挂多个消费组(Consumer Group),每个消费组会有个游标last_delivered_id在 Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从 Stream 的某个消息ID开始消费,这个 ID 用来初始化 last_delivered_id 变量。每个消费组的状态都是独立的,相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到。同一个消费组可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者有一个组内唯一名称。

消费者

消费者内部会有一个状态变量pending_ids,维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有ack(Acknowledge character:确认字符)。如果客户端没有 ack,这个变量里面的消息 ID 就会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称为 PEL,也就是 Pending Entries List,这是一个核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了而没被处理。

基本使用命令

概述 

消息队列相关命令:

  • XADD - 添加消息到末尾
  • XTRIM - 对流进行修剪,限制长度
  • XDEL - 删除消息
  • XLEN - 获取流包含的元素数量,即消息长度
  • XRANGE - 获取消息列表,会自动过滤已经删除的消息
  • XREVRANGE - 反向获取消息列表,ID 从大到小
  • XREAD - 以阻塞或非阻塞方式获取消息列表

消费者组相关命令:

  • XGROUP CREATE - 创建消费者组
  • XREADGROUP GROUP - 读取消费者组中的消息
  • XACK - 将消息标记为"已处理"
  • XGROUP SETID - 为消费者组设置新的最后递送消息ID
  • XGROUP DELCONSUMER - 删除消费者
  • XGROUP DESTROY - 删除消费者组
  • XPENDING - 显示待处理消息的相关信息
  • XCLAIM - 转移消息的归属权
  • XINFO - 查看流和消费者组的相关信息;
  • XINFO GROUPS - 打印消费者组的信息;
  • XINFO STREAM - 打印流信息

xadd 命令

XADD 命令将指定的流条目追加到指定 key 的流中。如果 key 不存在,将使用流的条目自动创建 key。

一个条目是由一组键值对组成的,它基本上是一个小的字典。键值对以用户给定的顺序存储,并且读取流的命令(如 XRANGE 或者 XREAD)可以保证按照通过 XADD 添加的顺序返回。

XADD key ID field value [field value ...]
  • key :队列名称,如果不存在就创建
  • ID :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
  • field value : 记录。
redis> XADD mystream * name Sara surname OConnor
"1601372323627-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) "1601372323627-0"
   2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "OConnor"
2) 1) "1601372323627-1"
   2) 1) "field1"
      2) "value1"
      3) "field2"
      4) "value2"
      5) "field3"
      6) "value3"

 返回值:该命令返回添加的条目的 ID。如果 ID 参数传的是*,那么 ID 是自动生成的,否则,命令仅返回用户在插入期间指定的相同的 ID。

xtrim 命令

XTRIM 将流裁剪为指定数量的项目,如有需要,将驱逐旧的项目(ID较小的项目)。

XTRIM key MAXLEN [~] count
  • key :队列名称
  • MAXLEN :长度
  • count :数量
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1601372434568-0"
   2) 1) "field1"
      2) "A"
      3) "field2"
      4) "B"
      5) "field3"
      6) "C"
      7) "field4"
      8) "D"

返回值:返回从流中删除的条目数。

 xdel 命令

从指定流中移除指定的条目,并返回成功删除的条目的数量。在传递的ID不存在的情况下,返回的数量可能与传递的ID数量不同。

XDEL key ID[ID ...]
  • key:队列名称。
  • ID:消息 ID 
> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
   2) 1) "a"
      2) "1"
2) 1) 1538561701744-0
   2) 1) "c"
      2) "3"

返回值:删除的条目数量。

xlen 命令

返回流中的条目数。如果指定的key不存在,则此命令返回0,就好像该流为空。

与其他的Redis类型不同,零长度流是可能的,所以你应该调用TYPE或者EXISTS来检查一个key是否存在。一旦内部没有任何的条目(例如调用XDEL后),流不会被自动删除,因为可能还存在与其相关联的消费者组。

redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3

返回值:流中包含的条目数量

xrange 命令

xrange命令返回流中满足给定ID范围的条目。范围由最小和最大ID指定。所有ID在指定的两个ID之间或与其中一个ID相等(闭合区间)的条目将会被返回。

XRANGE key start end [COUNT count]
  • key :队列名
  • start :开始值, - 表示最小值
  • end :结束值, + 表示最大值
  • count :数量
redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) "1601372577811-0"
   2) 1) "name"
      2) "Virginia"
      3) "surname"
      4) "Woolf"
2) 1) "1601372577811-1"
   2) 1) "name"
      2) "Jane"
      3) "surname"
      4) "Austen"

返回值:该命令返回ID与指定范围匹配的条目。返回的条目是完整的,这意味着ID和所有组成条目的字段都将返回。此外,返回的条目及其字段和值的顺序与使用XADD添加它们的顺序完全一致。

xread 命令 

从一个或者多个流中读取数据,仅返回ID大于调用者报告的最后接收ID的条目。此命令有一个阻塞选项,用于等待可用的项目,类似于BRPOP或者BZPOPMIN等等。

XREAD[COUNT count][BLOCK milliseconds] STREAMS key[key ...]id[id ...]
  • count:数量。
  • milliseconds:可选,阻塞毫秒数,没有设置就是非阻塞模式。
  • key :队列名。
  • id:消息 ID。
redis> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) "1532"
            3) "event-id"
            4) "5"
            5) "user-id"
            6) "7782813"
      2) 1) 1526999352406-0
         2) 1) "duration"
            2) "812"
            3) "event-id"
            4) "9"
            5) "user-id"
            6) "388234"
2) 1) "writers"
   2) 1) 1) 1526985676425-0
         2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
         2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"

返回值:该命令返回一个结果数组:返回数组的每个元素都是一个由两个元素组成的数组(键名和为该键报告的条目)。报告的条目是完整的流条目,具有ID以及所有字段和值的列表。返回的条目及其字段和值的顺序与使用XADD添加它们的顺序完全一致。

当使用BLOCK时,超时时将返回一个空回复(nil)。

xgroup 命令 

使用 XGROUP CREATE 创建消费者组,语法格式:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  • key :队列名称,如果不存在就创建
  • groupname :组名。
  • $ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。

从头开始消费:

XGROUP CREATE mystream consumer-group-name 0-0  

从尾部开始消费:

XGROUP CREATE mystream consumer-group-name $

xreadgroup 命令

使用 XREADGROUP GROUP 读取消费组中的消息,语法格式:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group :消费组名
  • consumer :消费者名。
  • count : 读取数量。
  • milliseconds : 阻塞毫秒数。
  • key : 队列名。
  • ID : 消息 ID。

xack 命令

XACK命令用于从流的消费者组的待处理条目列表(简称PEL)中删除一条或多条消息。当一条消息交付到某个消费者时,它将被存储在PEL中等待处理,这通常出现在作为调用XREADGROUP命令的副作用,或者一个消费者通过调用XCLAIM命令接管消息的时候。

一旦消费者成功地处理完一条消息,它应该调用XACK,这样这个消息就不会被再次处理,且作为一个副作用,关于此消息的PEL条目也会被清除,从Redis服务器释放内存。

XACK key group ID[ID ...]

返回值:该命令返回成功确认的消息数。某些消息ID可能不再是PEL的一部分(例如因为它们已经被确认),而且XACK不会把他们算到成功确认的数量中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/930338.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【dasctf】easy_log

base解码可得压缩包密码 二分法盲注 import urllib.parse,re with open(raccess.log,r) as f:logf.readlines() dict1{} count0 #判断逻辑,最后一个fasle则取自身;最后一个为true则加1; for each in log:resre.findall(rflag\),(\d),1\)\)…

CEdit 选中文字实时更新到另一个控件中

有时候,我们会遇到需求,软件中需要让选中一个CEdit控件中的文字实时更新到另一个控件中,实现效果如下所示: 代码如下: BOOL CEditDemoDlg::PreTranslateMessage(MSG* pMsg) { CEdit* pOldEdit (CEdit*)GetDlgIte…

Linux驱动开发一、RK3568把hello编译到Linux内核中运行。‘rk_vendor_read’未定义的引用

1、在字符设备目录下建立hello目录 ~/Linux/rk356x_linux/kernel/drivers/char/hello 2、进入hello目录,新建hello.c、Makefile、Kconfig三个文件 3、Kconfig是打开make menuconfig配置界面是后的选项,这Kconfig是在字符设备下的。 config HELLOtrist…

哪些测试仪器可以用于检测静电中和设备的性能

静电设备性能测试通常需要使用一些专门的仪器来进行。以下是一些常见的静电设备性能测试仪器: 1. 静电电压测试仪:用于测量物体表面的静电电压。它通常可以测量正负电压,并具有高精度和快速响应的特点。 2. 静电电荷仪:用于测量物…

探讨uniapp的页面问题

1 新建页面 uni-app中的页面,默认保存在工程根目录下的pages目录下。 每次新建页面,均需在pages.json中配置pages列表; 未在pages.json -> pages 中注册的页面,uni-app会在编译阶段进行忽略。pages.json的完整配置参考&am…

新能源发电变流关键技术开发

新能源发电变流技术开发 文章目录 新能源发电变流技术开发前言新能源并网电力质量控制电能储存风能、光伏等行业新能源汽车新型城区的能源建设因此,可以说,新能源发电变流技术在电力系统与现代新能源行业中具有重要的应用地位,它对提高新能源发电的可靠性、电力质量、储能等…

基于Java+SpringBoot+Vue前后端分离常规应急物资管理系统设计和实现

博主介绍:✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专…

理解图傅里叶变换和图卷积

图神经网络(GNN)代表了一类强大的深度神经网络架构。在一个日益互联的世界里,因为信息的联通性,大部分的信息可以被建模为图。例如,化合物中的原子是节点,它们之间的键是边。 图神经网络的美妙之处在于它们…

CSS 属性值计算过程

目录 例子1&#xff0c;确定声明值2&#xff0c;层叠冲突2.1&#xff0c;比较源重要性2.2&#xff0c;比较优先级2.3&#xff0c;比较源次序 3&#xff0c;使用继承4&#xff0c;使用默认值其他 例子 我们来举例说明<h1> 标签最终的样式&#xff1a; <div><h1…

Hugo·Stack主题·杂记

运行环境 hugo版本&#xff1a;v0.117.0&#xff08;扩展版&#xff09; go&#xff1a;1.21.0 PowerShell 7&#xff08;x64&#xff09; Windows10 添加主题 git submodule add https://github.com/CaiJimmy/hugo-theme-stack/ themes/hugo-theme-stack修改配置文件 m…

C语言入门 Day_10 判断的进阶

目录 前言 1.多重判断 2.代码块 3.条件运算符 3.易错点 4.思维导图 前言 if和else能够处理两种不同的情况&#xff0c;如果&#xff08;if&#xff09;满足条件&#xff0c;我们就执行这几行代码&#xff1b;否则&#xff08;else&#xff09;的话&#xff0c;我们就执行…

快速排序三种思路详解!

一、快速排序的介绍 快速排序是Hoare于1962年提出的一种二叉树结构的交换排序方法&#xff0c;其基本思想为&#xff1a;任取待排序元素序列中 的某元素作为基准值&#xff0c;按照该排序码将待排序集合分割成两子序列&#xff0c;左子序列中所有元素均小于基准值&#xff0c;…

电脑excel文件误删怎么恢复?分享6种有效方法

在日常办公中&#xff0c;电脑中存储的Excel文件可能会遭遇误删的意外情况&#xff0c;如何快速恢复误删的Excel文件成为许多人关注的热点。本文将介绍几种有效的方法&#xff0c;希望能够帮助您恢复误删的Excel文件。 方法1、从回收站恢复 检查电脑的回收站&#xff0c;如果E…

数字孪生是什么?工厂数字孪生实例分析

数字孪生是建筑物或城市等物理实体的 3D 模型&#xff0c;数字孪生具有实时、连续的数据&#xff0c;可实时更新其功能和流程&#xff0c;从而为工程师提供分析和优化生产流程的数据支撑。简单来说&#xff0c;数字孪生是物理实体的 3D 模型&#xff0c;3D 模型的动画由真实实体…

行业冠军NANK南卡再添新高度,打造百元级开放式蓝牙耳机新标杆!

​最近&#xff0c;国内最受欢迎的开放式耳机品牌NANK南卡推出了一款名为OE CC的产品&#xff0c;它以0感0压为卖点。不断根据用户的反馈进行优化&#xff0c;现如今这款耳机正式在各大平台上架销售。它采用了先进的技术&#xff0c;重新定义了百元级别开放式耳机的三个标准。这…

GD32-舵机的原理

GD32-舵机的原理 舵机的现一脉宽与舵机转动角度 旋转编码器的原理 顺时针&#xff1a;A的下降沿时&#xff0c;B处于高电平&#xff1b; 逆时针&#xff1a;A的下降沿时&#xff0c;B处于低电平&#xff1b; #ifndef _ENCODER_DRIVE_H #define _ENCODER_DRIVE_H#include &quo…

Matlab图像处理运算方法-非线性点运算

常见的非线性灰度变换为对数变换和幂次变换。 对数变换 对数变换的一般表达式为&#xff1a; tclog(1s) 其中c为尺度比例常数&#xff0c;s为输入图像灰度值&#xff0c;t为变换后的输出图像灰度值。在如下图所示的对数曲线上&#xff0c;函数…

node-red - 读写操作redis

node-red - 读写操作redis 一、前期准备二、node-red安装redis节点三、node-red操作使用redis节点3.1 redis-out节点 - 存储数据到redis3.2 redis-in节点 - 查询redis数据 附录附录1&#xff1a;redis -out节点示例代码附录2&#xff1a;redis -in节点示例代码 一、前期准备 安…

ensp-IPsec vpn配置

ensp-IPsec vpn配置 &#x1f4ce;IPsec VPN配置.docx&#x1f4ce;IPSec.zip

Windows和Linux卸载anaconda的完整解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…