redis【stream】:对redis流数据类型的详细介绍

news2024/9/22 1:21:53

目录

stream产生原因

stream的概念

stream底层实现

stream的常用指令

常用命令一览:

xadd命令

xread命令

xlen命令

xrange命令

xrevrange命令

xtrim命令

xdel命令

xgroup命令

xinfo命令

xpending命令

xreadgroup命令

xack命令

xclaim命令


stream产生原因

redis在设计之初,就试图在保证自身缓存作用在市场上占优的基础上开发与MQ类似的消息队列,以增强自己在市场中的竞争优势,在redis1.0时,我们使用list就能模拟实现一个简单的消息队列,按照插入顺序排序,你可以添加一个元素到列表的头部(左边)或者尾部(右边)。所以常用来做异步队列使用,将需要延后处理的任务结构体序列化成字符串塞进 Redis 的列表,另一个线程从这个列表中轮询数据进行处理。具体如下图:

但是使用list模拟消息队列存在很大的痛点:类似于lpush和lpop这样的指令只能实现点对点的模式,一旦涉及(pub/sub)这样的发布订阅的场景(一对多),list无法满足;除此之外,list存在的一个很大的痛点问题是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。而且也没有 Ack 机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了。所以在redis5.0之后,stream这一更强大的数据结构应运而生

stream的概念

stream是redis5.0之后引入的数据类型,用一句话说,stream是redis版本的MQ中间件+阻塞队列,Stream流实现消息队列,它支持消息的持久化、支持自动生成全局唯一 ID、支持ack确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠

stream底层实现

我们通过一张图和一个表格对redis的底层实现进行说明 :

一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容

1
Message Content
消息内容
2
Consumer group
消费组,通过XGROUP CREATE 命令创建,同一个消费组可以有多个消费者
3
Last_delivered_id
游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
4
Consumer
消费者,消费组中的消费者
5
Pending_ids
消费者会有一个状态变量,用于记录被当前消费已读取但未ack的消息Id,如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack它就开始减少。这个pending_ids变量在Redis官方被称之为 PEL(Pending Entries List),记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符),它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理

stream的常用指令

常用命令一览:

队列相关指令

 消费者相关指令

 四个特殊符号:

xadd命令

语法格式为:

XADD key ID field value [field value ...]

  • key,用来指定 stream 的名字
  • ID,用来指定 ID 值,最常用的是 *
  • field value [field value ...],key-value类型数据

xadd 用来在指定的 key 中添加消息,如果 key 不存在,则自动创建。添加的消息为 key-value 类型,可以一次添加多个消息。

指定 ID,最常用的是 *,表示由 redis 自动生成 ID,自动生成的 ID 为 1526919030474-55 格式,由毫秒时间戳和序列号组成,序列号用于区分同一毫秒内生成的消息,保证 ID 始终是递增的。如果由于一些其他原因系统时钟慢了,导致生成的时间戳小于了 redis 中记录的值,则会取系统中记录的最大值继续递增,保证 ID 的递增状态。

1
2
3
4

127.0.0.1:6379> xadd message 1 key1 value1 key2 value2
"1-0"
127.0.0.1:6379> xadd message 1-2 key1 value1 key2 value2
"1-2"

ID 在一般情况下是由 redis 自动指定的,但其实 ID 也是可以自定义的,为了保证 ID 的自增状态,手动指定的 ID 必须要大于系统中存在的 ID,只不过一般不这么做。

1
2

127.0.0.1:6379> xadd message * key1 value1 key2 value2
"1604475735664-0"

以上命令添加了 key1-value 和 key2-value2 两条消息到 message 这个 key 中,返回值为当前消息的 ID,由 redis 自动生成,此时消息队列中就有一条消息可以被读取了。

1
2

127.0.0.1:6379> xadd message maxlen 10 * key3 value3
"1604476672762-0"

maxlen 参数用来限制 key 中消息的最大数量,但是精确限制 key 中消息的数量是低效的,可以使用 ~ 符号粗略的限制 key 中消息的数量,redis 会在可以删除整个宏结点时才去删除多余的消息,实际数量可能会比限制数量多几十个,这是正常的,但是不会少于限制的数量。

xread命令

语法格式为:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

  • [COUNT count],用来获取消息的数量
  • [BLOCK milliseconds],用来设置阻塞模式和阻塞超时时间,默认为非阻塞
  • id [id ...],用来设置读取的起始 ID,相当于 where id >来获取最新的消息 ID,非阻塞模式下无意义。
  • key,指定 stream 的名字

xread 命令用于从一个或多个 key 中读取消息,仅返回 ID 值大于参数中传入的 ID 的消息。此命令有阻塞用法和非阻塞用法,如果 key 中没有消息,则返回空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

127.0.0.1:6379> xread streams message 0
1) 1) "message"
   2) 1) 1) "1-0"
         2) 1) "key1"
            2) "value1"
            3) "key2"
            4) "value2"
      2) 1) "1-2"
         2) 1) "key1"
            2) "value1"
            3) "key2"
            4) "value2"
      3) 1) "1604476672762-0"
         2) 1) "key3"
            2) "value3"

以上命令在非阻塞模式输出了所有的消息,因为不存在 ID 比 0 还小的消息,所以输出了所有的消息。

阻塞模式中,可以使用 $ 符号来获取最新的消息。如果在指定超时时间内没有新的消息,则返回空。

1
2
3
4
5
6
7
8
9

127.0.0.1:6379> xread count 10 block 10000 streams message $
(nil)
(10.02s)
127.0.0.1:6379> xread count 10 block 10000 streams message $
1) 1) "message"
   2) 1) 1) "1604478070071-0"
         2) 1) "keyblock2"
            2) "value"
(5.00s)

输入命令后,可以观察到命令没有任何输出,此时新开一个 redis-cli,输入 xadd message * keyblock2 value 将一条新的消息添加到 key 中,可以看到上面的命令返回了刚才添加的值和阻塞时间。

xlen命令

语法格式:

XLEN key

返回 key 中消息的数量,如果 key 不存在,则会返回 0。即使 key 中消息的数量为 0,key 也不会被自动删除,因为可能还存在和 key 关联的消费者组。

1
2

127.0.0.1:6379> xlen message
(integer) 5

返回 message 中所有消息的数量。

xrange命令

语法如下:

XRANGE key start end [COUNT count]

  • key,指定 stream 的名字
  • start,起始 ID
  • end,终止 ID
  • [COUNT count],读取的数量

该命令返回与给定的 ID 范围相匹配的消息。ID 的范围由 start 和 end 参数来指定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

127.0.0.1:6379> xrange message - +
1) 1) "1-0"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
2) 1) "1-2"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
3) 1) "1604476672762-0"
   2) 1) "key3"
      2) "value3"
4) 1) "1604478059261-0"
   2) 1) "keyblock"
      2) "value"
5) 1) "1604478070071-0"
   2) 1) "keyblock2"
      2) "value"

此命令由两个特殊的 ID,使用 - 表示最小的 ID 值,使用 + 表示最大的 ID 值,可以查询所有的消息。

1
2
3
4
5
6
7
8
9
10
11

127.0.0.1:6379> xrange message 1 1
1) 1) "1-0"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
2) 1) "1-2"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"

即使 ID 不完整也可以,只输入 ID 值会输出所有拥有相同 ID 不同序列号的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14

127.0.0.1:6379> xrange message - + count 3
1) 1) "1-0"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
2) 1) "1-2"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
3) 1) "1604476672762-0"
   2) 1) "key3"
      2) "value3"

使用 count 参数可以限制输出消息的数量。

通过简单的循环可以使用少量内存迭代一个 key 中所有的值,只需要将上次迭代的结果中最大的 ID 作为下一次迭代的起始 ID 即可。

xrevrange命令

语法说明:

XREVRANGE key end start [COUNT count]

xrevrange 命令和 xrange 命令语法完全相同,只有一点不同,xrevrange 是反向遍历的,不再赘述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

127.0.0.1:6379> xrevrange message + -
1) 1) "1604478070071-0"
   2) 1) "keyblock2"
      2) "value"
2) 1) "1604478059261-0"
   2) 1) "keyblock"
      2) "value"
3) 1) "1604476672762-0"
   2) 1) "key3"
      2) "value3"
4) 1) "1-2"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
5) 1) "1-0"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"

xtrim命令

语法说明:

XTRIM key MAXLEN [~] count

  • key,指定 stream 的名字
  • maxlen,指定修剪策略,当前只实现了这一种
  • [~],是否近似修剪
  • count,修剪后的数量

xtrim 命令会从 ID 值比较小的消息开始丢弃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

127.0.0.1:6379> xread streams message 0
1) 1) "message"
   2) 1) 1) "1-0"
         2) 1) "key1"
            2) "value1"
            3) "key2"
            4) "value2"
      2) 1) "1-2"
         2) 1) "key1"
            2) "value1"
            3) "key2"
            4) "value2"
      3) 1) "1604476672762-0"
         2) 1) "key3"
            2) "value3"
      4) 1) "1604478059261-0"
         2) 1) "keyblock"
            2) "value"
      5) 1) "1604478070071-0"
         2) 1) "keyblock2"
            2) "value"
127.0.0.1:6379> xtrim message maxlen 4
(integer) 1

xtrim 命令的返回值是修剪掉的 ID 的数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

127.0.0.1:6379> xrange message - +
1) 1) "1-2"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
2) 1) "1604476672762-0"
   2) 1) "key3"
      2) "value3"
3) 1) "1604478059261-0"
   2) 1) "keyblock"
      2) "value"
4) 1) "1604478070071-0"
   2) 1) "keyblock2"
      2) "value"

再次查看可以看到 key 中的消息数量被修剪掉了一个,只剩下了四个。

1
2

127.0.0.1:6379> xtrim message maxlen ~ 2
(integer) 0

如果使用了 ~ 参数,则可能不会进行修剪。此参数告诉 redis 在能够删除整个宏节点时才执行修剪,这样做效率更高,并且可以保证消息的数量不小于所需要的数量。

xdel命令

语法说明:

XDEL key ID [ID ...]

  • key,指定 stream 的名字
  • ID [ID ...],需要删除的 ID 值

xdel 命令用于从 key 中删除指定 ID 的消息,当 ID 不存在时,返回的数目可能和删除的数目不一致。在执行 xdel 命令时,redis 并不会在内存中删除对应的消息,而只会把它标记为删除,在所有节点都被删除之后整个节点被销毁,内存被回收。

1
2
3
4
5
6
7
8
9
10
11
12

127.0.0.1:6379> xdel message 1-2
(integer) 1
127.0.0.1:6379> xrange message - +
1) 1) "1604476672762-0"
   2) 1) "key3"
      2) "value3"
2) 1) "1604478059261-0"
   2) 1) "keyblock"
      2) "value"
3) 1) "1604478070071-0"
   2) 1) "keyblock2"
      2) "value"

删除了 ID 为 1-2 的消息。

xgroup命令

语法说明:

XGROUP [CREATE key groupname id-or-] [DESTROY key groupname] [CREATECONSUMER key groupname consumername] [DELCONSUMER key groupname consumername]

  • [CREATE key groupname id-or-,分组将可以读取指定 key 的新消息,将不能读取历史消息。也可以指定任意的开始 ID。
  • [SETID key groupname id-or-$],重新给已存在的分组设置消息读取的起点。例如将起点设置为 0就可以重新读取所有的历史消息
  • [DESTROY key groupname],销毁指定 key 中的一个分组
  • [CREATECONSUMER key groupname consumername],在指定的 key 和指定的分组中创建一个消费者。当某个命令提及了新的消费者名称时,也会自动创建新的消费者。
  • [DELCONSUMER key groupname consumername],在指定的 key 和指定的分组中销毁一个消费者。

xgroup 是一个命令组,可以通过不同的关键字执行不同的命令。

1
2
3
4
5
6
7
8
9
10
11

127.0.0.1:6379> xgroup create message read_group $
OK
127.0.0.1:6379> xreadgroup group read_group read streams message >
(nil)
127.0.0.1:6379> xadd message * readkey readvalue
"1604494179721-0"
127.0.0.1:6379> xreadgroup group read_group read streams message >
1) 1) "message"
   2) 1) 1) "1604494179721-0"
         2) 1) "readkey"
            2) "readvalue"

使用 creat 命令创建一个 read_group 分组,指定 ID 的起点为最后一个 ID,直接读取的话是空值(xreadgroup命令下面说)。

使用 xadd 命令添加一条新的消息,再次读取发现可以正常读取到新加入的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

127.0.0.1:6379> xgroup setid message read_group 0
OK
127.0.0.1:6379> xreadgroup group read_group read streams message >
1) 1) "message"
   2) 1) 1) "1604476672762-0"
         2) 1) "key3"
            2) "value3"
      2) 1) "1604478059261-0"
         2) 1) "keyblock"
            2) "value"
      3) 1) "1604478070071-0"
         2) 1) "keyblock2"
            2) "value"
      4) 1) "1604494179721-0"
         2) 1) "readkey"
            2) "readvalue"

使用 setid 命令重新设置读取 ID 的起点,可以读取到所有的历史消息。

1
2
3
4
5
6
7
8
9

127.0.0.1:6379> xinfo groups message
1) 1) "name"
   2) "read_group"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 4
   7) "last-delivered-id"
   8) "1604494179721-0"

使用 xinfo 命令查询新创建的分组信息,可以看到分组名字,消费者数量,最后加入的消息的 ID 值(xinfo 命令下边说)。

1
2
3
4
5
6
7

127.0.0.1:6379> xinfo consumers message read_group
1) 1) "name"
   2) "read"
   3) "pending"
   4) (integer) 4
   5) "idle"
   6) (integer) 476449

使用 xinfo 命令查看新加入的消费者的信息,可以看到消费者名字,处于 pending(待处理) 状态的消息数量(pending 状态下边说)。

1
2
3
4

127.0.0.1:6379> xgroup delconsumer message read_group read
(integer) 4
127.0.0.1:6379> xinfo consumers message read_group
(empty array)

使用 delconsumer 命令删除分组中的消费者,使用 xinfo 命令查看分组中的消费者,返回一个空数组,说明删除成功。delconsumer 命令的返回值为当前消费者所拥有的 pending(待处理) 状态的消息数量。

1
2
3
4

127.0.0.1:6379> xgroup destroy message read_group
(integer) 1
127.0.0.1:6379> xinfo groups message
(empty array)

使用 destroy 命令删除 message 中的分组,使用 xinfo 命令查看,返回一个空数组,说明删除成功。destroy 命令的返回值为删除成功的分组数量。注意:即使由活跃的消费者和 pending(待处理) 状态的消息,分组仍然会被删除,需要确保在需要时才执行此命令。

xinfo命令

语法说明:

XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

  • [CONSUMERS key groupname],查询指定 key 和指定分组中的消费者信息。
  • [GROUPS key],查询指定 key 中的分组信息。
  • [STREAM key],查询指定 key 中所有的信息。

1
2
3
4
5
6
7

127.0.0.1:6379> xinfo consumers message read_group
1) 1) "name"
   2) "read"
   3) "pending"
   4) (integer) 4
   5) "idle"
   6) (integer) 206796

读取 message 的 read_group 分组中所有的消费者信息。

1
2
3
4
5
6
7
8
9

127.0.0.1:6379> xinfo groups message
1) 1) "name"
   2) "read_group"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 4
   7) "last-delivered-id"
   8) "1604494179721-0"

读取 message 中所有的分组信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

127.0.0.1:6379> xinfo stream message
 1) "length"
 2) (integer) 4
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1604494179721-0"
 9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1604476672762-0"
    2) 1) "key3"
       2) "value3"
13) "last-entry"
14) 1) "1604494179721-0"
    2) 1) "readkey"
       2) "readvalue"

读取 message 所有的信息。

xpending命令

语法说明:

XPENDING key group [start end count] [consumer]

  • key,指定的 key
  • group,指定的分组
  • [start end count],起始 ID 和结束 ID 还有数量
  • consumer,消费者名字

1
2
3
4
5
6

127.0.0.1:6379> xpending message readgroup
1) (integer) 2
2) "1604496633846-0"
3) "1604496640734-0"
4) 1) 1) "read"
      2) "2"

xpending 命令可以查看对应分组中未确认的消息的数量和其所对应的消费者的名字还有起始和终止 ID。

1
2
3
4
5
6
7
8
9

127.0.0.1:6379> xpending message readgroup - + 10 read
1) 1) "1604496633846-0"
   2) "read"
   3) (integer) 513557
   4) (integer) 4
2) 1) "1604496640734-0"
   2) "read"
   3) (integer) 482927
   4) (integer) 1

使用 xpending 命令可以查看处于未确认状态的消息的具体信息。

xreadgroup命令

语法说明:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

  • GROUP,固定
  • group,分组名
  • consumer,消费者名
  • [COUNT count],每次获取消息的数量
  • [BLOCK milliseconds],阻塞模式和超时时间
  • [NOACK],不需要确认消息,适用于不怎么重要的可以丢失的消息
  • STREAMS,固定
  • key [key ...],指定的 key
  • ID [ID ...],指定的消息 ID,> 指定读取所有未消费的消息,其他值指定被挂起的消息

xreadgroup 命令通过与消费者组和消费者的结合可以做到消息的读取与确认,在 xread 的基础上细化了读取消息操作。

从语法上来看,xreadgroup 和 xread 命令几乎相同,xreadgroup 命令多了一个强制性的参数:GROUP groupname consumername。

当多个消费者同时消费同一个消息队列时,会重复消费相同的消息,每条消息都会被每个消费者消费一遍。但是如果想要多个消费者协作消费同一个消息队列时,就需要用到消费者组。

例如推送系统,肯定是不可以重复推送的,也就是说每条消息只可以被消费一遍,这时就可以使用多个消费者来消费同一个推送队列,降低每个消费者系统的压力。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> xadd message * key1 value1
"1604496633846-0"
127.0.0.1:6379> xadd message * key2 value2
"1604496640734-0"
127.0.0.1:6379> xgroup create message readgroup 0
OK
127.0.0.1:6379> xadd message * key3 value3
"1604496696501-0"
127.0.0.1:6379> xadd message * key4 value4
"1604496704823-0"
127.0.0.1:6379> xreadgroup group readgroup read count 1 streams message >
1) 1) "message"
   2) 1) 1) "1604496633846-0"
         2) 1) "key1"
            2) "value1"
127.0.0.1:6379> xreadgroup group readgroup read count 1 streams message >
1) 1) "message"
   2) 1) 1) "1604496640734-0"
         2) 1) "key2"
            2) "value2"

从头开始,清空数据库,重新给 message 添加消息,创建分组,使用 readgroup 命令读取最新的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

127.0.0.1:6379> xinfo consumers message readgroup
1) 1) "name"
   2) "read"
   3) "pending"
   4) (integer) 2
   5) "idle"
   6) (integer) 53146
127.0.0.1:6379> xpending message readgroup - + 10 read
1) 1) "1604496633846-0"
   2) "read"
   3) (integer) 513557
   4) (integer) 4
2) 1) "1604496640734-0"
   2) "read"
   3) (integer) 482927
   4) (integer) 1

读取了两条消息,使用 xinfo 命令查看,可以看到有两条消息处于 pending(待处理)状态,使用 xpending 命令可以查看处于未确认状态的消息的具体信息。

xack命令

语法说明:

XACK key group ID [ID ...]

  • key,指定的 key
  • group,指定的 group
  • D [ID ...],需要确认的消息的 ID

xack 命令从 pending 队列中删除挂起的消息,也就是确认之前未确认的消息。当使用 xreadgroup 命令读取消息时,消息同时被存储到 PEL 中,等待被确认,调用 xack 命令可以从 PEL 中删除挂起的消息并且释放内存,确保不丢失消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

127.0.0.1:6379> xack message readgroup 1604496633846-0
(integer) 1
127.0.0.1:6379> xpending message readgroup
1) (integer) 1
2) "1604496640734-0"
3) "1604496640734-0"
4) 1) 1) "read"
      2) "1"
127.0.0.1:6379> xinfo consumers message readgroup
1) 1) "name"
   2) "read"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 418489

使用 xack 命令确认一条消息,再次使用 xpending 命令查看未确认消息的数量,只剩一条未确认消息,使用 xinfo 命令查看处于 pending 状态的消息数量也为 1,确认消息成功。

xclaim命令

语法说明:

XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]

  • key,指定的 key
  • group,指定的分组
  • consumer,指定的消费者
  • min-idle-time,指定消息最小空闲数,指定空闲了多久的消息会被选中
  • ID [ID ...],消息的 ID
  • [IDLE ms],设置消息的空闲时间,如果不提供,默认为 0
  • [TIME ms-unix-time],和IDLE相同,unix 时间戳
  • RETRYCOUNT,设置重试次数,通常 xclaim 不会改变这个值,它通常用于 xpending 命令,用来发现一些长时间未被处理的消息。
  • FORCE,在 PEL 中创建待处理消息,即使指定的 ID 尚未分配给客户端的PEL。
  • JUSTID,只返回认领的消息 ID 数组,不返回实际消息。

xclaim 命令用于更改未确认消息的所有权,如果有消费者在读取了消息之后未处理完成就挂掉了,那么消息会一直在 pending 队列中,占用内存,这时需要使用 xclaim 命令更改此条消息的所属者,让其他的消费者去消费这条消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14

127.0.0.1:6379> xreadgroup group readgroup read2 count 1 streams message >
1) 1) "message"
   2) 1) 1) "1604496696501-0"
         2) 1) "key3"
            2) "value3"
127.0.0.1:6379> xack message readgroup 1604496696501-0
(integer) 1
127.0.0.1:6379> xpending message readgroup - + 10 read
1) 1) "1604496640734-0"
   2) "read"
   3) (integer) 1258517
   4) (integer) 2
127.0.0.1:6379> xpending message readgroup - + 10 read2
(empty array)

新创建一个消费者,读取一条消息,然后将消息确认掉,查看两个消费者中未确认消息的数量,read 有一条,read2 没有未确认的消息。

1
2
3
4
5
6
7
8
9
10
11

127.0.0.1:6379> xclaim message readgroup read2 0 1604496640734-0
1) 1) "1604496640734-0"
   2) 1) "key2"
      2) "value2"
127.0.0.1:6379> xpending message readgroup - + 10 read2
1) 1) "1604496640734-0"
   2) "read2"
   3) (integer) 3724
   4) (integer) 4
127.0.0.1:6379> xpending message readgroup - + 10 read
(empty array)

使用 xclaim 命令将消息所有权转移给 read2 这个消费者,可以看到,消费者 read2 的 pending 队列中有一条未确认消息,消费者 read 的 pending 队列中已经没有消息了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/564529.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

集合的三种遍历方式

文章目录 目录 文章目录 一.迭代器遍历 二.增强for遍历 三. forEach方法 四. Lambda表达式 函数式接口: 函数式接口是指只有一个抽象方法的接口 为什么Lambda只能用于函数式接口 总结 前言 作者简介:最爱吃兽奶 座右铭:抱怨身处黑暗,不如提灯前行 内容介绍:今天给大家讲一下集合…

学完能拿下阿里23k的JMeter+Grafana+Influxdb搭建可视化性能测试监控平台

【背景说明】 使用jmeter进行性能测试时,工具自带的查看结果方式往往不够直观和明了,所以我们需要搭建一个可视化监控平台来完成结果监控,这里我们采用三种JMeterGrafanaInfluxdb的方法来完成平台搭建 【实现原理】 通过influxdb数据库存储…

Shell运维实战3-while、case、for、select

目录 case基本case 企业级案例 while 循环当型与直到后台运行while 实战 for & selectfor 循环方法linux 生成随机数select case 基本 请注意 case 使用后的闭合问题,开头 case,结尾 esac #! /bin/bashread -p "input your number:" ans…

【C++刷题集】-- day3

目录 选择题 单选 OR59 字符串中找出连续最长的数字串⭐ 【题目解析】 【解题思路】 JZ39 数组中出现次数超过一半的数字⭐ 【题目解析】 【解题思路1】 【解题思路2】 选择题 单选 1、以下程序的输出结果是 ( ) #include <stdio.h> int main() {char a[10] …

vite + vue3 + storybook + ts 搭建组件库记录

目标 只按需引入&#xff0c;不依赖babel-import-plugin 插件。第三方依赖都不打包。用原生fetch请求数据。仅支持esmodule。配置package.json type:"module" 搭建 根据storybook 官网文档&#xff0c;需要在已有的项目中运行 npx storybooklatest init 也就是事…

电源原理分析、波形分析、应力计算、回路布局

1、Flyback变换器工作模态分析&#xff1b; 2、Flyback关键波形分析&#xff1b; 3、RCD吸收电路设计及开关管应力&#xff1b; 4、从噪音回路看布线要点。 5、基于实际项目&#xff0c;原创反激开关电源视频教程曝光 Flyback 变换器模态分析 ​ ON&#xff1a;开关管导通&…

516. 最长回文子序列

516. 最长回文子序列 C代码&#xff1a;dp int longestPalindromeSubseq(char* s) {int n strlen(s);int dp[n][n];memset(dp, 0, sizeof(dp));for (int i 0; i < n; i) {dp[i][i] 1;}for (int i n - 1; i > 0; i--) {for (int j i 1; j < n; j) {if (s[i] s[…

2023最新CTF入门的正确姿势

前言 随着网络安全意识的增强&#xff0c;越来越多的人开始涉足网络安全领域&#xff0c;其中CTF比赛成为了重要的学习和竞赛平台。本人从事网络安全工作多年&#xff0c;也参加过大大小小的CTF比赛&#xff0c;今天就来详细介绍CTF的流程&#xff0c;以及需要具备的知识&…

【前端相关】elementui使用el-upload组件实现自定义上传

elementui使用el-upload组件实现自定义上传 一、问题描述二、实现方式三、实现步骤3.1 方式一&#xff1a;选择后自动上传3.2 方式二&#xff1a;选择图片后手动上传3.3 拓展&#xff1a;上传文件夹 四、服务器相关接口 一、问题描述 elmentui 中的upload默认的提交行为是通过…

[C++]哈希表实现,unordered_map\set封装

目录​​​​​​​ 前言&#xff1a; 1 哈希 1.1 为什么有哈希 1.2 哈希结构 1.3 哈希冲突 2 闭散列 2.1 闭散列结点结构和位置状态表示 2.2 哈希类结构 2.3 插入 2.4 查找 2.5 删除 3 开散列 3.1 哈希表结点结构 3.2 哈希表结构 3.3 插入 3.4 查找、删除 3.5…

Ubuntu/Debian/CentOS搭建Socks5代理一键脚本

说明 Socks5属于明文代理&#xff0c;不要用于科学上网&#xff0c;否则会被阻断端口&#xff0c;可用于正常的跳板使用&#xff1b; 比如SSH转发加速国外VPS的连接速度&#xff0c;特别是一些延迟高或者丢包高的VPS&#xff1b; 使用Socks5转发后SSH就可以快速稳定的连接了&a…

Java中的Replace和ReplaceAll的区别

replace和replaceAll是都是String类中提供的两种用于字符/字符串替换的方法&#xff0c;从字面意思理解&#xff0c;replace表示替换单个匹配项&#xff0c;而replaceAll表示替换所有匹配项&#xff1b;实际上并不是这样子的,replace和replaceAll都是替换所有匹配项,replace是非…

链式二叉树OJ题思路分享

⏩博主CSDN主页:杭电码农-NEO⏩   ⏩专栏分类:刷题分享⏪   ⏩代码仓库:NEO的学习日记⏩   &#x1f339;关注我&#x1faf5;带你刷更多C语言和数据结构的题!   &#x1f51d;&#x1f51d; 链式二叉树OJ题分享 1. 前言&#x1f6a9;2. 单值二叉树&#x1f6a9;2.1 审题…

macOS Ventura 13.5beta OpenCore黑苹果双引导分区原版镜像

镜像特点&#xff08;原文地址&#xff1a;http://www.imacosx.cn/113700.html&#xff0c;转载请注明出处&#xff09; 完全由黑果魏叔官方制作&#xff0c;针对各种机型进行默认配置&#xff0c;让黑苹果安装不再困难。系统镜像设置为双引导分区&#xff0c;全面去除clover引…

六级备考25天|CET-6|听力第五讲|演讲满分技巧|分值最高|2022年6月考题16-18题|18:15~19:00

14.2分一题 抓重点 目录 2. 听力原文复现 问题16 问题17 问题18 3. 听力原文重点词汇 问题16 问题17 问题18 2. 听力原文复现 问题16 What does the speaker say about most American hospitals&#xff1f; visit brief useless Dont challenge with the authority unle…

HEVC熵编码核心点介绍

熵编码基本原理 消息与信息 把客观物质运动和主观思维的活动状态表达出来就成为了消息&#xff1b;消息中包含信息&#xff0c;是信息的载体&#xff1b;因此&#xff0c;信息与消息既有区别又有联系的&#xff1b; 获取信息的过程就是一个消除或部分消除不确定性的过程&…

Linux网络基础-4

在之前的网络基础博客中&#xff0c;我们对网络进行了概要解释&#xff0c;了解了应用层和传输层的知名协议。接下来我们来对网络层的典型协议进行解析。 目录 1.网络层协议 2.IP协议 2.1协议格式 2.2地址管理 2.3特殊网络 2.3.1私网的组建 2.3.2特殊IP地址 2.4路由选…

ACP(MaxCompute篇)-Tunnel上传下载数据

概述 相关命令 odpscmd里面包含了tunnel test11_123>tunnel help; Usage: tunnel <subcommand> [options] [args] Type tunnel help <subcommand> for help on a specific subcommand.Available subcommands:upload (u)download (d)resume (r)show (s)purge …

探索SpringBoot 3.1的惊艳新特性

一、介绍 1.1 新特性概述 经过半年的沉淀 Spring Boot 3.1于2023年5月18日正式发布了&#xff0c;带来了许多令人兴奋的新特性和改进。本篇博客将详细介绍Spring Boot 3.1的新特性、升级说明以及核心功能的改进。 同时&#xff0c;2.6.x 版本线已经停止维护了&#xff0c;最新…

《消息队列高手课》课程笔记(二)

消息模型&#xff1a;主题和队列有什么区别&#xff1f; 两类消息模型 早期的消息队列&#xff0c;就是按照“队列”的数据结构来设计的。 生产者&#xff08;Producer&#xff09;发消息就是入队操作&#xff0c;消费者&#xff08;Consumer&#xff09;收消息就是出队也就是…