🎯前言
除了五中基本的数据类型外,Redis还支持两种特殊的数据类型,第一种 Geo (地理位置):用于存储地理位置相关的数据,例如经纬度、距离等。第二种 Stream (流):是一个高级的列表类型,支持对列表的批量操作,如添加多个元素、获取多个元素等。
Redis Stream是Redis数据结构中的一种,用于处理基于事件的消息流。它提供了一种高度可扩展且高效的方式来处理大量的消息,并且可以很容易地与Redis的其他数据结构集成。它是一个可持久化的、可追溯的、能处理大量数据的消息流。
Redis Stream的基本单位是消息条目(Entry),每个消息条目包含一个消息体(Payload)和一些元数据(Metadata)。在Redis Stream中,消息条目按照时间顺序排列,可以理解为是一个时间线上的序列。
Redis Stream具有以下特点:
- 可持久化:Redis Stream中的数据可以被持久化保存,保证了数据的可靠性。
- 可追溯:Redis Stream提供了一种消息的追溯机制,可以追踪到消息的来源和处理过程。
- 高性能:Redis Stream支持高性能的读写操作,可以处理大量数据。
- 灵活性:Redis Stream提供了丰富的操作命令,可以对消息进行各种操作,如读取、写入、删除、标记等。
Redis Stream在很多场景中都有广泛的应用如:
- 消息队列:Redis Stream可以作为一个高性能的消息队列,用于处理大量的消息。
- 事件流:Redis Stream可以用于记录和处理各种事件,如系统日志、用户行为等。
- 通知系统:Redis Stream可以作为一个通知系统,用于将各种通知推送给用户或者系统。
- 社交网络:Redis Stream可以用于实现社交网络中的好友关系、消息传递等功能。
Redis Stream 的结构图:
Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。
在 Redis 中发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息,pub/sub(发布/订阅)模式本身并不支持持久化。当一个订阅者(subscriber)订阅了一个频道(channel)后,它将在内存中保存该频道的状态以及接收到的消息。然而,如果订阅者断开了连接或者 Redis 服务器崩溃,它就无法恢复到之前的状态。
而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
- 它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容
- 每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。
- Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
- last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
- pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。
🎯Stream (流相 关命令介绍)
Redis Stream的核心概念包括:
- Fields:消息由一个或多个字段组成,每个字段有唯一的ID和对应的值。
- Messages:由一个或多个字段组成的序列,形成一个完整的消息。
- Fields和Messages都可以被视为Stream的基本单元。
消息队列相关命令:
- XADD:用于向Stream中添加消息。它需要指定Stream名称、消息ID和消息内容。例如,XADD mystream * field1 value1 field2 value2表示将一个名为mystream的Stream中添加一条消息,消息ID为*,消息内容为两个字段field1和field2及其对应的值value1和value2。
- XTRIM:用于修剪Stream,只保留指定范围内的消息。它需要指定Stream名称和消息ID范围。例如,XTRIM mystream 0 100表示保留mystream中的前100条消息,从第0条消息开始。
- XDEL:用于删除Stream中的指定消息。它需要指定Stream名称和消息ID。例如,XDEL mystream message-id表示删除名为mystream的Stream中指定的消息ID为message-id的消息。
- XLEN:用于获取Stream中消息的数量。它需要指定Stream名称。例如,XLEN mystream表示获取名为mystream的Stream中消息的数量。
- XRANGE:用于获取Stream中指定范围内的消息。它需要指定Stream名称和消息ID范围。例如,XRANGE mystream 0 100表示获取名为mystream的Stream中的前100条消息,从第0条消息开始。
- XREVRANGE:与XRANGE类似,但返回的消息顺序是逆序的,即最新的消息排在前面。
- XREAD:用于读取Stream中的消息。它可以按照指定的规则过滤和排序消息。例如,XREAD STREAM mystream *表示读取名为`mystream
消费者组相关命令:
- XGROUP CREATE:用于创建一个新的消费者组。它需要指定Stream名称、消费者组名称和分片ID。例如,XGROUP CREATE mystream mygroup 0表示创建一个名为mygroup的消费者组,并指定分片ID为0。
- XREADGROUP GROUP:用于读取指定消费者组的消息。它需要指定Stream名称、消费者组名称和分片ID。例如,XREADGROUP GROUP mygroup consumer 0表示读取名为mygroup的消费者组中分片ID为0的消息,并指定一个名为consumer的消费者。
- XACK:用于确认已处理的消息。它需要指定Stream名称、消息ID和消费者组名称。例如,XACK mystream message-id mygroup表示确认名为mygroup的消费者组中处理过的消息ID为message-id的消息。
- XGROUP SETID:用于设置消费者组的偏移量。它需要指定Stream名称、消息ID和消费者组名称。例如,XGROUP SETID mystream message-id mygroup表示将名为mygroup的消费者组的偏移量设置为消息ID为message-id的消息。
- XGROUP DELCONSUMER:用于删除指定的消费者。它需要指定Stream名称和消费者组名称。例如,XGROUP DELCONSUMER mystream mygroup consumer2表示删除名为mygroup的消费者组中名为consumer2的消费者。
- XGROUP DESTROY:用于销毁指定的消费者组。它需要指定Stream名称和消费者组名称。例如,XGROUP DESTROY mystream mygroup表示销毁名为mygroup的消费者组。
- XPENDING:用于获取未处理的消息。它需要指定Stream名称和消费者组名称。例如,XPENDING mystream mygroup表示获取名为mygroup的消费者组中未处理的消息。
- XCLAIM:用于转移未处理的消息所有权。它需要指定Stream名称、消息ID和目标消费者组名称。例如,XCLAIM mystream message-id mygroup consumer2表示将名为`mygroupRedis
Redis Stream还支持一些高级特性如:
- 消费者组:可以将多个消费者组织成一个消费者组,以便同时处理同一个Stream的消息。
- 消息ID生成器:可以使用自动或手动配置的消息ID生成器来生成唯一的消息ID。
- 事务处理:可以将多个操作打包成一个事务,以确保Stream的一致性和可靠性。
🎯Stream (流相 关命令操作)
XADD (添加消息到末尾)
说明:
XADD是一个用于将新元素添加到Redis的有序集合(sorted set)的命令。它可以用于实现简单的消息队列。
语法:
XADD key ID field string [field string ...]
- key:Stream的名称。
- ID:消息ID,可以是一个自动生成的UUID,也可以是一个字符串。如果省略消息ID,则Redis会自动生成一个唯一的消息ID。
- field:消息中的字段名。
- string:消息中的字段对应的字符串值。
- ...:可以多次使用,直到添加完所有的字段。
示例:
127.0.0.1:6379> XADD mystream * field1 value1 field2 value2
"1688711543963-0"
127.0.0.1:6379> XADD mystream * name age mr.xiao 25
"1688711752464-0"
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1688711543963-0"
2) 1) "field1"
2) "value1"
3) "field2"
4) "value2"
2) 1) "1688711752464-0"
2) 1) "name"
2) "age"
3) "mr.xiao"
4) "25"
127.0.0.1:6379>
XTRIM (对流进行修剪,限制长度)
说明:
XTRIM命令用于修剪(trim)有序集合(sorted set)的成员数量,只保留在指定数量范围内的成员。
语法:
XTRIM key MAXLEN [~] count
- key:要修剪的有序集合的键值。
- MAXLEN:要保留的最大成员数量。当有序集合的成员数量大于该值时,命令将修剪掉多余的成员。
- [~]:限制修剪操作的阈值,表示只要有序集合的长度超过该阈值,就执行修剪操作。
- count:要保留的最小成员数量。当有序集合的成员数量小于该值时,命令将修剪掉多余的成员。
示例:
XTRIM mystream MAXLEN 1
这个命令将修剪名为"mystream"的有序集合,使其成员数量最多为1。
XDEL (删除消息)
说明:
XDEL命令用于从Redis的有序集合(sorted set)中删除一个或多个指定成员。
语法:
XDEL key ID [ID ...]
- key:要删除成员的有序集合的键值。
- ID:要删除的成员的成员属性值,可以有多个参数,每个参数表示一个要删除的成员。
示例:
XDEL mystream 1 2 1688711752464-0
这将从名为"mystream"的有序集合中删除ID为1和2的两个成员。
XLEN (获取流包含的元素数量,即消息长度)
说明:
XLEN命令用于获取Redis有序集合(sorted set)的长度,即其中包含的成员数量。
语法:
XLEN key
- key:要查询长度的有序集合的键值。
示例:
XLEN mystream
这将返回名为"mystream"的有序集合的长度,即其中包含的成员数量。
XRANGE (获取消息列表,会自动过滤已经删除的消息)
说明:
XRANGE命令用于从Redis的有序集合(sorted set)中查询指定范围内的成员,并按照成员属性的值进行排序。
语法:
XRANGE key start end [COUNT count]
- key:要查询的有序集合的键值。
- start:要查询的范围的起始成员属性值(包含)。
- end:要查询的范围的结束成员属性值(不包含)。
- [COUNT count]:要返回的成员数量,默认为所有成员。
示例:
127.0.0.1:6379> XADD emp * name xiaowang age 18
"1688716368510-0"
127.0.0.1:6379> XADD emp * name xiaoli age 23
"1688716375775-0"
127.0.0.1:6379> XADD emp * name ergo age 25
"1688716381880-0"
127.0.0.1:6379> XADD emp * name xiaohei age 27
"1688716387217-0"
127.0.0.1:6379> XADD emp * name xiaosong age 32
"1688716392174-0"
127.0.0.1:6379> XLEN emp
(integer) 5
127.0.0.1:6379> XRANGE emp - + COUNT 2
1) 1) "1688716368510-0"
2) 1) "name"
2) "xiaowang"
3) "age"
4) "18"
2) 1) "1688716375775-0"
2) 1) "name"
2) "xiaoli"
3) "age"
4) "23"
127.0.0.1:6379>
XREVRANGE (反向获取消息列表,ID 从大到小)
说明:
XREVRANGE命令用于从Redis的有序集合(sorted set)中查询指定范围内的成员,并按照成员属性的值进行逆序排序。
语法:
XREVRANGE key end start [COUNT count]
- key:要查询的有序集合的键值。
- end:要查询的范围的结束成员属性值(不包含)。
- start:要查询的范围的起始成员属性值(包含)。
- [COUNT count]:要返回的成员数量,默认为所有成员。
示例:
127.0.0.1:6379> XADD emp * name xiaowang age 18
"1688716368510-0"
127.0.0.1:6379> XADD emp * name xiaoli age 23
"1688716375775-0"
127.0.0.1:6379> XADD emp * name ergo age 25
"1688716381880-0"
127.0.0.1:6379> XADD emp * name xiaohei age 27
"1688716387217-0"
127.0.0.1:6379> XADD emp * name xiaosong age 32
"1688716392174-0"
127.0.0.1:6379> XLEN emp
(integer) 5
127.0.0.1:6379> XREVRANGE emp + - COUNT 1
1) 1) "1688716392174-0"
2) 1) "name"
2) "xiaosong"
3) "age"
4) "32"
127.0.0.1:6379>
XREAD (以阻塞或非阻塞方式获取消息列表)
说明:
多个有序集合中读取指定数量的消息,并将这些消息作为流(stream)返回。
语法:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
- COUNT count:要读取的消息数量。
- BLOCK milliseconds:如果使用该参数,命令将在指定的毫秒数内等待,直到有足够数量的消息可用或超时。
- STREAMS key [key ...]:要读取消息的有序集合的键值,可以有多个参数。
- ID [ID ...]:要读取的消息的成员属性值,可以有多个参数。
示例:
127.0.0.1:6379> XREAD COUNT 2 STREAMS mystream emp 0-0 0-0
1) 1) "mystream"
2) 1) 1) "1688714670580-0"
2) 1) "name"
2) "age"
3) "mr.xiao"
4) "25"
2) 1) "emp"
2) 1) 1) "1688716368510-0"
2) 1) "name"
2) "xiaowang"
3) "age"
4) "18"
2) 1) "1688716375775-0"
2) 1) "name"
2) "xiaoli"
3) "age"
4) "23"
127.0.0.1:6379>
XGROUP CREATE (创建消费者组)
说明:
XGROUP CREATE命令用于创建一个新的消费者组
语法:
XGROUP CREATE [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER k
- CREATE key groupname id-or-$:创建一个新的消费者组,其中key是指Stream的名称,groupname是指消费者组的名称,id-or-$是指分片ID。如果省略分片ID参数,则会自动分配一个唯一的ID。
- SETID key id-or-$:设置消费者组的偏移量。其中key是指Stream的名称,id-or-$是指消息ID,用于指定一个唯一的消息ID作为消费者组的偏移量。
- DESTROY key groupname:销毁指定的消费者组。其中key是指Stream的名称,groupname是指消费者组的名称。
- DELCONSUMER key groupname consumer:删除指定的消费者。其中key是指Stream的名称,groupname是指消费者组的名称,consumer是指消费者的名称。
示例:
从头开始消费:
XGROUP CREATE mystream consumer-group-name 0-0
这个命令将创建一个名为mygroup
的消费者组,并将其添加到名为mystream
的Stream中,并将该消费者组绑定到从第0条消息到第0条消息的范围内。
从尾部开始消费:
XGROUP CREATE mystream consumer-group-name $
这个命令将创建一个名为mygroup
的消费者组,并将其偏移量设置为最新消息的偏移量,确保该消费者组可以正确地处理后续的消息。
XREADGROUP GROUP (读取消费组中的消息)
说明:
读取指定消费者组的消息。
语法:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
- GROUP group consumer:指定要读取的消费者组的名称和消费者名称。
- COUNT count:指定要读取的消息数量。
- BLOCK milliseconds:指定在读取消息之前等待的毫秒数。
- STREAMS key [key ...]:指定要读取的Stream名称,可以同时指定多个Stream。
- ID ID ...:指定要读取的消息ID范围,可以是起始消息ID和结束消息ID之间的范围,也可以是一个具体的消息ID。
示例:
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >
这个命令将从名为mystream的Stream中读取并锁定第一条未被锁定的消息,并将其分配给名为myconsumer的消费者。