前言
Redis 5.0 及 5.0 以后的版本提供的Streams 是专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令。
消息队列
Streams 操作
XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
名称为 mqstream 的消息队列中插入一条消息:
127.0.0.1:6379> xadd mqstream * high 168
"1683981116051-0"消息的键是 high
值是 168
mqstream 后面的*:表示让 Redis 为插入的数据自动生成一个全局唯一的 ID,也可以不用*,直接在消息队列名称后设定一个全局唯一 ID 号。
XREAD:用于读取消息,可以按 ID 读取数据;
取1683981116051-0后所有消息:
127.0.0.1:6379> xread block 100 streams mqstream 1683981116051-0
1) 1) "mqstream"
2) 1) 1) "1683983220880-0"
2) 1) "high"
2) "169"
2) 1) "1683983231594-0"
2) 1) "high"
2) "170"
block 配置项:阻塞读取操作,类似于Redis的list类型BRPOP操作127.0.0.1:6379> xread block 10000 streams mqstream $
(nil)
(10.09s)
block 10000 的配置项:10000 的单位是毫秒,如果没有新消息,阻塞10秒然后返回(nil):表示没有新消息
XREADGROUP:按消费组形式读取消息;
创建一个名为 grouphigh 的消费组,这个消费组消费的消息队列是 mqstream:
127.0.0.1:6379> xgroup create mqstream grouphigh 0
OK消费组的消费者读取mqstream的所有消息
127.0.0.1:6379> xreadgroup group grouphigh consumer1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1683981116051-0"
2) 1) "high"
2) "168"
2) 1) "1683983220880-0"
2) 1) "high"
2) "169"
3) 1) "1683983231594-0"
2) 1) "high"
2) "170"grouphigh:消费者
consumer1 :消费者
>:从第一条读取未被消费的消息
127.0.0.1:6379> xreadgroup group grouphigh consumer2 streams mqstream 0
1) 1) "mqstream"
2) (empty list or set)因为consumer1 读取了grouphigh中的数据就表示已经消费,consumer2就读取不到了
XPENDING :用来查询每个消费组内所有消费者已读取但尚未确认的消息;
为了保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息,Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。
查看grouphigh消费者 中各个消费者已读取、但未确认的消息个数:
127.0.0.1:6379> xpending mqstream grouphigh
1) (integer) 3
2) "1683981116051-0"
3) "1683983231594-0"
4) 1) 1) "consumer1"
2) "3"查看某个消费者具体读取数据:
127.0.0.1:6379> xpending mqstream grouphigh - + 10 consumer1
1) 1) "1683981116051-0"
2) "consumer1"
3) (integer) 653197
4) (integer) 2
2) 1) "1683983220880-0"
2) "consumer1"
3) (integer) 653197
4) (integer) 2
3) 1) "1683983231594-0"
2) "consumer1"
3) (integer) 653197
4) (integer) 2
XACK :用于向消息队列确认消息处理已完成。
通知 Streams 1683981116051-0消息已经被读取:
xack mqstream grouphigh 1683981116051-0
(integer) 1
127.0.0.1:6379> xpending mqstream grouphigh - + 10 consumer1
1) 1) "1683983220880-0"
2) "consumer1"
3) (integer) 944785
4) (integer) 2
2) 1) "1683983231594-0"
2) "consumer1"
3) (integer) 944785
4) (integer) 2
127.0.0.1:6379>