【Redis】特殊数据类型 Stream (流)

news2024/11/24 9:10:24

 

 🎯前言

除了五中基本的数据类型外,Redis还支持两种特殊的数据类型,第一种 Geo (地理位置):用于存储地理位置相关的数据,例如经纬度、距离等。第二种 Stream (流):是一个高级的列表类型,支持对列表的批量操作,如添加多个元素、获取多个元素等。

Redis Stream是Redis数据结构中的一种,用于处理基于事件的消息流。它提供了一种高度可扩展且高效的方式来处理大量的消息,并且可以很容易地与Redis的其他数据结构集成。它是一个可持久化的、可追溯的、能处理大量数据的消息流。

Redis Stream的基本单位是消息条目(Entry),每个消息条目包含一个消息体(Payload)和一些元数据(Metadata)。在Redis Stream中,消息条目按照时间顺序排列,可以理解为是一个时间线上的序列。


Redis Stream具有以下特点:

  1. 可持久化:Redis Stream中的数据可以被持久化保存,保证了数据的可靠性。
  2. 可追溯:Redis Stream提供了一种消息的追溯机制,可以追踪到消息的来源和处理过程。
  3. 高性能:Redis Stream支持高性能的读写操作,可以处理大量数据。
  4. 灵活性:Redis Stream提供了丰富的操作命令,可以对消息进行各种操作,如读取、写入、删除、标记等。

Redis Stream在很多场景中都有广泛的应用如:

  1. 消息队列:Redis Stream可以作为一个高性能的消息队列,用于处理大量的消息。
  2. 事件流:Redis Stream可以用于记录和处理各种事件,如系统日志、用户行为等。
  3. 通知系统:Redis Stream可以作为一个通知系统,用于将各种通知推送给用户或者系统。
  4. 社交网络:Redis Stream可以用于实现社交网络中的好友关系、消息传递等功能。

Redis Stream 的结构图:

Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。

在 Redis 中发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息,pub/sub(发布/订阅)模式本身并不支持持久化。当一个订阅者(subscriber)订阅了一个频道(channel)后,它将在内存中保存该频道的状态以及接收到的消息。然而,如果订阅者断开了连接或者 Redis 服务器崩溃,它就无法恢复到之前的状态。

而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

  • 它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容
  • 每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。
  • Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
  • last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
  • pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。

🎯Stream (流相 关命令介绍)

Redis Stream的核心概念包括:

  • Fields:消息由一个或多个字段组成,每个字段有唯一的ID和对应的值。
  • Messages:由一个或多个字段组成的序列,形成一个完整的消息。
  • Fields和Messages都可以被视为Stream的基本单元。

消息队列相关命令:

  • XADD:用于向Stream中添加消息。它需要指定Stream名称、消息ID和消息内容。例如,XADD mystream * field1 value1 field2 value2表示将一个名为mystream的Stream中添加一条消息,消息ID为*,消息内容为两个字段field1和field2及其对应的值value1和value2。
  • XTRIM:用于修剪Stream,只保留指定范围内的消息。它需要指定Stream名称和消息ID范围。例如,XTRIM mystream 0 100表示保留mystream中的前100条消息,从第0条消息开始。
  • XDEL:用于删除Stream中的指定消息。它需要指定Stream名称和消息ID。例如,XDEL mystream message-id表示删除名为mystream的Stream中指定的消息ID为message-id的消息。
  • XLEN:用于获取Stream中消息的数量。它需要指定Stream名称。例如,XLEN mystream表示获取名为mystream的Stream中消息的数量。
  • XRANGE:用于获取Stream中指定范围内的消息。它需要指定Stream名称和消息ID范围。例如,XRANGE mystream 0 100表示获取名为mystream的Stream中的前100条消息,从第0条消息开始。
  • XREVRANGE:与XRANGE类似,但返回的消息顺序是逆序的,即最新的消息排在前面。
  • XREAD:用于读取Stream中的消息。它可以按照指定的规则过滤和排序消息。例如,XREAD STREAM mystream *表示读取名为`mystream

 消费者组相关命令:

  • XGROUP CREATE:用于创建一个新的消费者组。它需要指定Stream名称、消费者组名称和分片ID。例如,XGROUP CREATE mystream mygroup 0表示创建一个名为mygroup的消费者组,并指定分片ID为0。
  • XREADGROUP GROUP:用于读取指定消费者组的消息。它需要指定Stream名称、消费者组名称和分片ID。例如,XREADGROUP GROUP mygroup consumer 0表示读取名为mygroup的消费者组中分片ID为0的消息,并指定一个名为consumer的消费者。
  • XACK:用于确认已处理的消息。它需要指定Stream名称、消息ID和消费者组名称。例如,XACK mystream message-id mygroup表示确认名为mygroup的消费者组中处理过的消息ID为message-id的消息。
  • XGROUP SETID:用于设置消费者组的偏移量。它需要指定Stream名称、消息ID和消费者组名称。例如,XGROUP SETID mystream message-id mygroup表示将名为mygroup的消费者组的偏移量设置为消息ID为message-id的消息。
  • XGROUP DELCONSUMER:用于删除指定的消费者。它需要指定Stream名称和消费者组名称。例如,XGROUP DELCONSUMER mystream mygroup consumer2表示删除名为mygroup的消费者组中名为consumer2的消费者。
  • XGROUP DESTROY:用于销毁指定的消费者组。它需要指定Stream名称和消费者组名称。例如,XGROUP DESTROY mystream mygroup表示销毁名为mygroup的消费者组。
  • XPENDING:用于获取未处理的消息。它需要指定Stream名称和消费者组名称。例如,XPENDING mystream mygroup表示获取名为mygroup的消费者组中未处理的消息。
  • XCLAIM:用于转移未处理的消息所有权。它需要指定Stream名称、消息ID和目标消费者组名称。例如,XCLAIM mystream message-id mygroup consumer2表示将名为`mygroupRedis

Redis Stream还支持一些高级特性如:

  • 消费者组:可以将多个消费者组织成一个消费者组,以便同时处理同一个Stream的消息。
  • 消息ID生成器:可以使用自动或手动配置的消息ID生成器来生成唯一的消息ID。
  • 事务处理:可以将多个操作打包成一个事务,以确保Stream的一致性和可靠性。

🎯Stream (流相 关命令操作)

XADD (添加消息到末尾)

说明:

XADD是一个用于将新元素添加到Redis的有序集合(sorted set)的命令。它可以用于实现简单的消息队列。

语法:

XADD key ID field string [field string ...]
  1. key:Stream的名称。
  2. ID:消息ID,可以是一个自动生成的UUID,也可以是一个字符串。如果省略消息ID,则Redis会自动生成一个唯一的消息ID。
  3. field:消息中的字段名。
  4. string:消息中的字段对应的字符串值。
  5. ...:可以多次使用,直到添加完所有的字段。

示例:

127.0.0.1:6379> XADD mystream * field1 value1 field2 value2
"1688711543963-0"
127.0.0.1:6379> XADD mystream * name age mr.xiao 25
"1688711752464-0"
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1688711543963-0"
   2) 1) "field1"
      2) "value1"
      3) "field2"
      4) "value2"
2) 1) "1688711752464-0"
   2) 1) "name"
      2) "age"
      3) "mr.xiao"
      4) "25"
127.0.0.1:6379>

XTRIM (对流进行修剪,限制长度)

说明:

XTRIM命令用于修剪(trim)有序集合(sorted set)的成员数量,只保留在指定数量范围内的成员。

语法:

XTRIM key MAXLEN [~] count
  1. key:要修剪的有序集合的键值。
  2. MAXLEN:要保留的最大成员数量。当有序集合的成员数量大于该值时,命令将修剪掉多余的成员。
  3. [~]:限制修剪操作的阈值,表示只要有序集合的长度超过该阈值,就执行修剪操作。
  4. count:要保留的最小成员数量。当有序集合的成员数量小于该值时,命令将修剪掉多余的成员。

示例:

XTRIM mystream MAXLEN 1

 这个命令将修剪名为"mystream"的有序集合,使其成员数量最多为1。


XDEL (删除消息)

说明:

XDEL命令用于从Redis的有序集合(sorted set)中删除一个或多个指定成员。

语法:

XDEL key ID [ID ...]
  1. key:要删除成员的有序集合的键值。
  2. ID:要删除的成员的成员属性值,可以有多个参数,每个参数表示一个要删除的成员。

示例:

XDEL mystream 1 2 1688711752464-0

 这将从名为"mystream"的有序集合中删除ID为1和2的两个成员。


XLEN (获取流包含的元素数量,即消息长度)

说明:

XLEN命令用于获取Redis有序集合(sorted set)的长度,即其中包含的成员数量。

语法:

XLEN key
  1. key:要查询长度的有序集合的键值。

示例:

XLEN mystream

 这将返回名为"mystream"的有序集合的长度,即其中包含的成员数量。


XRANGE (获取消息列表,会自动过滤已经删除的消息)

说明:

XRANGE命令用于从Redis的有序集合(sorted set)中查询指定范围内的成员,并按照成员属性的值进行排序。

语法:

XRANGE key start end [COUNT count]
  1. key:要查询的有序集合的键值。
  2. start:要查询的范围的起始成员属性值(包含)。
  3. end:要查询的范围的结束成员属性值(不包含)。
  4. [COUNT count]:要返回的成员数量,默认为所有成员。

示例:

127.0.0.1:6379> XADD emp * name xiaowang age 18
"1688716368510-0"
127.0.0.1:6379> XADD emp * name xiaoli age 23
"1688716375775-0"
127.0.0.1:6379> XADD emp * name ergo age 25
"1688716381880-0"
127.0.0.1:6379> XADD emp * name xiaohei age 27
"1688716387217-0"
127.0.0.1:6379> XADD emp * name xiaosong age 32
"1688716392174-0"
127.0.0.1:6379> XLEN emp
(integer) 5
127.0.0.1:6379> XRANGE emp - + COUNT 2
1) 1) "1688716368510-0"
   2) 1) "name"
      2) "xiaowang"
      3) "age"
      4) "18"
2) 1) "1688716375775-0"
   2) 1) "name"
      2) "xiaoli"
      3) "age"
      4) "23"
127.0.0.1:6379>

XREVRANGE (反向获取消息列表,ID 从大到小)

说明:

XREVRANGE命令用于从Redis的有序集合(sorted set)中查询指定范围内的成员,并按照成员属性的值进行逆序排序。

语法:

XREVRANGE key end start [COUNT count]
  1. key:要查询的有序集合的键值。
  2. end:要查询的范围的结束成员属性值(不包含)。
  3. start:要查询的范围的起始成员属性值(包含)。
  4. [COUNT count]:要返回的成员数量,默认为所有成员。

示例:

127.0.0.1:6379> XADD emp * name xiaowang age 18
"1688716368510-0"
127.0.0.1:6379> XADD emp * name xiaoli age 23
"1688716375775-0"
127.0.0.1:6379> XADD emp * name ergo age 25
"1688716381880-0"
127.0.0.1:6379> XADD emp * name xiaohei age 27
"1688716387217-0"
127.0.0.1:6379> XADD emp * name xiaosong age 32
"1688716392174-0"
127.0.0.1:6379> XLEN emp
(integer) 5
127.0.0.1:6379> XREVRANGE emp + - COUNT 1
1) 1) "1688716392174-0"
   2) 1) "name"
      2) "xiaosong"
      3) "age"
      4) "32"
127.0.0.1:6379>

XREAD (以阻塞或非阻塞方式获取消息列表)

说明:

多个有序集合中读取指定数量的消息,并将这些消息作为流(stream)返回。

语法:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
  1. COUNT count:要读取的消息数量。
  2. BLOCK milliseconds:如果使用该参数,命令将在指定的毫秒数内等待,直到有足够数量的消息可用或超时。
  3. STREAMS key [key ...]:要读取消息的有序集合的键值,可以有多个参数。
  4. ID [ID ...]:要读取的消息的成员属性值,可以有多个参数。

示例:

127.0.0.1:6379> XREAD COUNT 2 STREAMS mystream emp 0-0 0-0
1) 1) "mystream"
   2) 1) 1) "1688714670580-0"
         2) 1) "name"
            2) "age"
            3) "mr.xiao"
            4) "25"
2) 1) "emp"
   2) 1) 1) "1688716368510-0"
         2) 1) "name"
            2) "xiaowang"
            3) "age"
            4) "18"
      2) 1) "1688716375775-0"
         2) 1) "name"
            2) "xiaoli"
            3) "age"
            4) "23"
127.0.0.1:6379>

XGROUP CREATE (创建消费者组)

说明:

XGROUP CREATE命令用于创建一个新的消费者组

语法:

XGROUP CREATE [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER k
  1. CREATE key groupname id-or-$:创建一个新的消费者组,其中key是指Stream的名称,groupname是指消费者组的名称,id-or-$是指分片ID。如果省略分片ID参数,则会自动分配一个唯一的ID。
  2. SETID key id-or-$:设置消费者组的偏移量。其中key是指Stream的名称,id-or-$是指消息ID,用于指定一个唯一的消息ID作为消费者组的偏移量。
  3. DESTROY key groupname:销毁指定的消费者组。其中key是指Stream的名称,groupname是指消费者组的名称。
  4. DELCONSUMER key groupname consumer:删除指定的消费者。其中key是指Stream的名称,groupname是指消费者组的名称,consumer是指消费者的名称。

示例:

从头开始消费:

XGROUP CREATE mystream consumer-group-name 0-0 

这个命令将创建一个名为mygroup的消费者组,并将其添加到名为mystream的Stream中,并将该消费者组绑定到从第0条消息到第0条消息的范围内。

从尾部开始消费:

XGROUP CREATE mystream consumer-group-name $

这个命令将创建一个名为mygroup的消费者组,并将其偏移量设置为最新消息的偏移量,确保该消费者组可以正确地处理后续的消息。


XREADGROUP GROUP (读取消费组中的消息)

说明:

读取指定消费者组的消息。

语法:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
  1. GROUP group consumer:指定要读取的消费者组的名称和消费者名称。
  2. COUNT count:指定要读取的消息数量。
  3. BLOCK milliseconds:指定在读取消息之前等待的毫秒数。
  4. STREAMS key [key ...]:指定要读取的Stream名称,可以同时指定多个Stream。
  5. ID ID ...:指定要读取的消息ID范围,可以是起始消息ID和结束消息ID之间的范围,也可以是一个具体的消息ID。

示例:

XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >

这个命令将从名为mystream的Stream中读取并锁定第一条未被锁定的消息,并将其分配给名为myconsumer的消费者。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/747286.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

学习系统编程No.30【多线程控制实战】

引言: 北京时间:2023/7/7/9:58,耳机正在充电中,所以刚好让我们先把引言写一写,昨天睡觉前听了一会小说,听小说的好处就在于,它可以让你放下手机,快速睡觉,并且还有一定的…

python批量检测网站是否能打开

import requestsif __name__ "__main__":file_name input() #读取文件名fp1 open(file_name, "r") #以只读,打开文件for line in fp1.readlines(): #readlines 按行读取文件,会保留\n,返回一个(文…

3. MySQL - 数据类型 选项约束

目录 回顾 1. 命令行下的 MySql 客户端 2. 图形化界面的 MySQL-Client 3. 数据库概述 3.1 数据库管理系统是什么 3.2 工作模式 3.3 RDBMS 管理数据的结构 3.4 客户端连接服务器的信息 4. MySQL 中的数据类型 4.1 整型类型 4.2 字符串 4.3 日期/时间 5. MySQL 每个字…

【Ubuntu18.04 解决蓝牙wifi 之ax201无线网卡驱动安装】

【Ubuntu18.04 解决蓝牙wifi 之ax201无线网卡驱动安装】 1. 前言2. 更新linux内核 3. 下载安装intel ax201网卡驱动 1. 前言 台式机安装了双系统win11Ubuntu18.04系统,发现没有无线网卡和蓝牙,经查阅资料发现由于网卡刚没多久,Ubuntu没有集成…

服务器反向代理

反向代理作用 隐藏服务器信息 -> 保证内网的安全,通常将反向代理作为公网访问地址,web服务器是内网,即通过nginx配置外网访问web服务器内网 举例 百度的网址是:http://www.baidu.com , 现在我通过自己的服务器地…

Figma源文件导出技巧:提升效率的简易步骤

因为Figma,sketch,xd都支持导入sketch格式,所以我们只要将文件格式转成sketch,就能自由的在不同软件间导入导出。 现在就有一个网站可以帮助你快速简单的导入Figma、Sketch、XD 等格式文件,,还可以导出 Sketch 文件满足跨工具协作…

Nacos 服务注册和配置中心

文章目录 1 应用1.1 依赖1.2 配置文件 2 Nacos发现实例模型3 注册中心对比4 Nacos 支持AP和CP模式的切换4.1 何时选择何种模式? 5 Nacos 服务配置10.5.1 SpringCloud原生注解RefreshScope5.2 配置5.3 分类设计思想 6 Nacos 集群是持久化配置6.1 Nacos支持三种部署模…

Redis实战案例16-redisson的快速入门

1. 可能存在的问题 不可重入:基于SETNX实现的简单分布式锁通常不支持可重入性,即同一个客户端在获取锁后不能再次获取锁,否则会导致死锁。不可重试:如果多个客户端同时尝试获取锁但都失败了,并且没有重试机制&#xff…

基于RWKV-Runner大语言模型系统

RWKV Runner 旨在消除大语言模型的使用门槛,全自动处理AI对话,并且提供了OpenAI API兼容的接口。使用起来简单方便,但是还是比较吃机器,显存2G到32G都可以使用,根据自己的模型选择即可。 总结起来: 使用起来方便简单,上手容易。需要有电脑基础,很多地方还不是傻瓜化。需…

LayUI框架——选项卡等element组件使用

目录 前言 一、element组件 1. element基础方法 2. 更新渲染 二、动态实现选项卡 要求 1. 优化dao类 2. 优化前端JSP页面 3. 引入头部hand.jsp页面 4. 优化后台主界面js 5. 运行效果图 前言 在项目中我们需要编写许多页面,在页面中有许多元素需要自动去完…

【Unity编辑器扩展】字库裁剪工具, 优化字体文件大小,批量修改文本组件字体

原理: 1. 扫描项目中用到的字符集; 2. 把字体文件裁剪掉没用到的字符,仅保留项目中用到的字符; 3. 生成裁剪后的字体文件; 工具功能设计: 1. 支持通过拖拽字体文件或文件夹批量选择需要裁剪的字体文件。…

网络安全设备Bypass功能介绍及分析

网络安全平台厂商往往需要用到一项比较特殊的技术,那就是Bypass,那么到底什么是Bypass呢,Bypass设备又是如何来实现的?下面我就对Bypass技术做一下简单的介绍和说明。 一、 什么是Bypass。 大家知道,网络安全设备一般…

mac安装Golang开发环境及入门

目录 一、Mac brew 安装go环境 1.1 安装步骤 1.2 设置GOPATH 及环境变量 1.3 编写第一个go程序 二、快速入门 1.1 快速入门需求 1.2 go学习(自用) 一、Mac brew 安装go环境 1.1 安装步骤 1)终端输入,也可以指定下载go版本…

SPSSAU方差分析+python

准备数据 将数据格式调整为以下格式: jupyter处理过程 #读取数据 import numpy as np import pandas as pd# 创建一个空的DataFrame t1 pd.DataFrame() t2 pd.DataFrame() t3 pd.DataFrame() T1pd.read_excel(./数据/抑郁_T1.xlsx)T1.columnsT1.iloc[0] T1T1…

模板类与继承

模板类与继承 模板类继承普通类普通类继承模板类的实例化版本。普通类继承模板类模板类继承模板类模板类继承模板参数给出的基类 模板类继承普通类 基类 派生类 测试函数; 普通类继承模板类的实例化版本。 模板基类 普通类继承模板基类的实例化版本: 普通…

PROFINET转DeviceNet网关devicenet通讯模块

远创智控YC-DNT-PN这款神器,连接PROFINET和DeviceNet网络,让两边数据轻松传输。 这个网关不仅从ETHERNET/IP和DEVICENET一侧读写数据,还可以将缓冲区数据交换,这样就可以在两个网络之间愉快地传递数据了!而且&#xff…

云计算的学习(三)

三、云计算中的网络基础知识 3.1虚拟化中网络的架构 a.虚拟化中网络的架构 二层交换机作为接入交换机使用,三层交换机可以作为汇聚交换机或核心交换机,在抛开网络安全设备时,路由器直接连接在互联网上。 b.广播和单播 物理服务器内部主要…

【单片机】MSP430单片机,msp430f5529,DHT11 温湿度检测仪,上限报警,单击双击判定,OLED

文章目录 功能接线示意图效果图原理 功能 硬件 IIC OLED 0.96寸 无源蜂鸣器 低电平触发 DHT11 温湿度传感器 板子上的2个按键 板子上的2个灯 功能: 1 显示温湿度 2 按键单击双击识别 3 按键修改温湿度,双击选择某一个设置项目,单击进行加或…

Python(五):print函数详解

❤️ 专栏简介:本专栏记录了我个人从零开始学习Python编程的过程。在这个专栏中,我将分享我在学习Python的过程中的学习笔记、学习路线以及各个知识点。 ☀️ 专栏适用人群 :本专栏适用于希望学习Python编程的初学者和有一定编程基础的人。无…