目录
- 一、Redis Stream
- 1.1 场景1:多个客户端可以同时接收到消息
- 1.1.1 XADD - 向stream添加Entry(发消息 )
- 1.1.2 XREAD - 从stream中读取Entry(收消息)
- 1.1.3 XRANGE - 从stream指定区间读取Entry(收消息)
- 1.2 场景2:多个客户端仅收到一部分消息(分片sharded、消费组group)
- 1.2.1 XGROUP CREATE - 创建消费组
- 1.2.2 XREADGROUP - 从消费组中读取消息
- 1.2.3 XACK - 确认消息
- 1.2.4 XPENDING - 读取PEL消息
- 1.2.5 XCLAIM & XAUTOCLAIM - 转移PEL中消息的所有权给其他消费者
- 1.2.6 统计命令
- 1.3 其他
- 二、Redisson Stream
一、Redis Stream
之前介绍过Redis Pub/Sub相关内容,通过Redis Pub/Sub可以实现发布/订阅消息传递范式,但是存在丢消息的可能,而本文介绍的Redis Stream是一种可用来实现 可靠消息队列、支持消息分组(类似Kafka Group) 的数据结构。
关于Redis Stream的使用存在如下2个场景
- 场景1: 多个客户端可以同时接收到消息
- 场景2: 多个客户端仅收到一部分消息(分片sharded),例如发送消息A,B,C,客户端1收到A,C,客户端2收到B(参考Kafka group概念)。
关于场景1,则可参考XADD、XREAD、XRANGE
等相关命令的使用,
关于场景2,则需要了解XGROUP CREATE、XREADGROUP、XACK
等相关命令的使用。
1.1 场景1:多个客户端可以同时接收到消息
场景1中相关命令XADD、XREAD、XRANGE
的使用汇总如下图:
1.1.1 XADD - 向stream添加Entry(发消息 )
向stream添加Entry(多个key/value对),XADD命令格式:
XADD
stream名称 id key1 value1 key2 value2 …
其中id为此次entry的唯一ID,而key1 value1 key2 value2 …即为entry的具体内容,
id为*
则表示由Redis自动生成ID:<millisecondsTime>-<sequenceNumber>
,
亦可明确指定id。
示例:
XADD mystream * name 罗 age 18
XADD mystream 1692632086370-0 name 刘 age 18
1.1.2 XREAD - 从stream中读取Entry(收消息)
从stream中读取entry,XREAD命令格式:
XREAD
COUNT
最多读取数量BLOCK
阻塞等待毫秒数STREAMS
stream名称 上次接收的id
通过XADD添加一条消息,多个执行XREAD的客户端都会读取到该消息,
XREAD会从参数中指定的 上次接收的id 之后开始读取后续的消息,
上次接受的id 可设置为$
,需配合BLOCK使用,表示仅读取从阻塞开始后新添加的消息(即不关心历史消息),
上次接受的id 可设置为+
,需要Redis版本>=7.4 RC1,表示仅读取最后一条消息。
阻塞等待的毫秒数 如果为0,则表示一直阻塞,直到读取到一条消息。
示例:
# 从头开始读取1条消息
XREAD STREAMS mystream 0
# 从头开始读取2条消息
XREAD COUNT 2 STREAMS mystream 0-0
# 从指定消息ID之后开始读取2条消息
XREAD COUNT 2 STREAMS mystream 1692632086370-0
# 最长阻塞5秒,最多读取100条消息,仅读取从阻塞开始后新添加的消息
XREAD BLOCK 5000 COUNT 100 STREAMS mystream $
# 继续从上次接受的id之后继续读取
XREAD BLOCK 5000 COUNT 100 STREAMS mystream 1526999644174-3
# 读取最后一条消息(需要Redis版本>=7.4 RC1)
XREAD STREAM mystream +
1.1.3 XRANGE - 从stream指定区间读取Entry(收消息)
从stream指定区间(起始ID范围)正向读取Entry,XRANGE命令格式:
XRANGE
stream名称 起始id 结束idCOUNT
最多读取数量
按起始到结束正向返回消息,
-
表示最小ID,+
表示最大ID
示例:
# 返回全部消息(从前到后依次返回)
XRANGE mystream - +
# 返回5条消息(从前到后依次返回)
XRANGE mystream - + COUNT 5
# 返回指定id(包括指定id)之后5条消息(从前到后依次返回)
XRANGE mystream 1718951980910-0 + COUNT 5
# 返回指定id(不包括指定id)之后5条消息(从前到后依次返回)
XRANGE mystream (1718951980910-0 + COUNT 5
从stream指定区间(起始ID范围)逆向读取Entry,XREVRANGE命令格式:
XREVRANGE
stream名称 结束id 起始idCOUNT
最多读取数量
按结束到起始逆向返回消息。
示例:
返回全部消息(从后到前逆向依次返回)
XREVRANGE mystream + -
# 返回2条消息(从后到前逆向依次返回)
XREVRANGE mystream + - COUNT 2
1.2 场景2:多个客户端仅收到一部分消息(分片sharded、消费组group)
场景2中相关命令XGROUP CREATE、XREADGROUP、XACK、XPENDING、XCLAIM
等使用汇总如下图:
1.2.1 XGROUP CREATE - 创建消费组
给stream创建消费分组,分组间彼此隔离,分组内多个consumer会轮流消费消息(分片),XGROUP CREATE命令格式:
XGROUP CREATE
stream名称 group名称 起始读取id [MKSTREAM
]
起始读取id 为0
,表示从头开始读取,
起始读取id 为$
,表示从最后一条消息之后开始读取,
MKSTREAM
子命令是可选的,表示自动创建stream。
示例:
# 为mystream创建分组mygroup1,且从最新消息开始消费
XGROUP CREATE mystream mygroup1 $
1.2.2 XREADGROUP - 从消费组中读取消息
以分组group读取stream中的消息,group中每个客户端需要指定consumer名称,多个consumer分摊group中的消息,而多个group间彼此隔离,XREADGROUP
命令格式:
XREADGROUP GROUP
group名称 consumer名称COUNT
最多读取数量BLOCK
阻塞等待毫秒数 [NOACK
]STREAMS
stream名称 上次接收的id
PEL(Pending Entries List): 当使用XREADGROUP
读取分组下消息时,服务器会记住哪条消息发给了分组下的哪个消费者,该记录存储在消费者组中,称为PEL,即已发送但尚未确认的消息ID列表。后续在消费者处理完消息后,消费者必须手动调用XACK命令对消息ID进行确认,以便从PEL中删除挂起的消息,关于PEL的结构可参见下图(截取自RedisInsight工具):
上次接收的id 为>
,表示消费者只希望接收从未传递给任何其他消费者的消息,即给我新的信息,>
号表示从当前消费组的last_delivered_id后面开始读。
上次接收的id 设为0
或其他有效的id,则表示仅读取 PEL(当前consumer没有确认的消息) 中指定id之后的消息。
NOACK
子命令式可选的,表示无需确认消息,NOACK子命令适用于对可靠性要求不高、偶尔的消息丢失是可以接受的情况,使用NOACK子命令可以避免将消息添加到PEL( Pending Entries List),相当于在读取消息后自动确认消息,后续无需再调用XACK命令进行确认,
示例:
# 消费者c1阻塞读取mystream下分组mygroup1的最新消息(直到读取到1条消息后解除阻塞)
XREADGROUP GROUP mygroup1 c1 BLOCK 0 STREAMS mystream >
# 消费者c1读取mystream下分组mygroup1的PEL消息(即已投递给c1但c1未进行确认的消息列表)
XREADGROUP GROUP mygroup1 c1 STREAMS mystream 0
1.2.3 XACK - 确认消息
确认stream下指定分组group的某条消息已被成功消费,XACK
命令格式:
XACK
stream名称 group名称 消息id
示例:
# 确认1条消息
XACK mystream mygroup1 1719206857966-0
# 同时确认3条消息
XACK mystream mygroup1 1719206857966-0 1719206909894-0 1719207195666-0
1.2.4 XPENDING - 读取PEL消息
读取stream中指定分组group的PEL挂起消息列表,XPENDING
命令格式:
XPENDING
stream名称 group名称IDEL
空闲毫秒数 起始消息id 结束消息id 查询数量 consumer名称
示例:
# 查询mystream下mygroup1分组的PEL列表
XPENDING mystream mygroup1
# 查询mystream下mygroup1分组下的消费者c1的空闲9秒的最多10条PEL消息
XPENDING mystream mygroup1 IDLE 9000 - + 10 c1
1.2.5 XCLAIM & XAUTOCLAIM - 转移PEL中消息的所有权给其他消费者
通过XPENDING查询出PEL消息(已投递未确认)后,若原先消息对应的consumer已经挂掉,没有能力继续处理消息,则可通过XCLIAM将对应的消息转移给同分组下的其他consumer进行处理,XCLAIM
命令格式如下:
XCLAIM
stream名称 group名称 consumer名称 空闲时长毫秒 消息id1 消息id2
转移后消息上次投递时间会重置为当前时间(即消息空闲idle时间为0),
默认会返回已经转移成功的消息内容,且消息投递计数会加1,
也可添加JUSTID
子命令,则只返回消息ID不返回消息内容,且消息投递计数不变,
若多个客户端同时通过XCLAIM转移同一条消息的所有权,则只会有一个客户端转移成功。
Redis官方原文如下:
Note that the message is claimed only if its idle time is greater than the minimum idle time we specify when calling XCLAIM. Because as a side effect XCLAIM will also
- reset the idle time (since this is a new attempt at processing the message),
- two consumers trying to claim a message at the same time will never both succeed: only one will successfully claim the message. This avoids that we process a given message multiple times in a trivial way (yet multiple processing is possible and unavoidable in the general case).
示例:
# mystream下mygroup1分组下的PEL消息1526569498055-0且空闲时长超过1小时,则将其转移给消费者c2
XCLAIM mystream mygroup1 c2 3600000 1526569498055-0
亦可通过XAUTOCLAIM
将PEL中指定起始消息ID后的消息批量进行转移,XAUTOCLIAM
命令格式如下:
XAUTOCLAIM
stream名称 group名称 consumer名称 空闲时长毫秒 起始消息idCOUNT
消息数量
示例:
# 扫描mystream下mygroup1分组下的所有PEL消息,空闲时长超过1小时,则最多转移25条消息给消费者c2
XAUTOCLAIM mystream mygroup1 c2 3600000 0-0 COUNT 25
1.2.6 统计命令
# 查询stream下的分组信息
XINFO GROUPS stream名称
# 查询stream信息
XINFO STREAM stream名称
# 查询stream下指定分组的消费者信息
XINFO CONSUMERS stream名称 group名称
1.3 其他
删除stream中的消息:
XDEL
stream名称 id1 id2 …
查询stream中的消息(entry)数量:
XLEN
stream名称
压缩stream中的消息数据量:
XTRIM
stream名称MAXLEN
保留的最近消息数量
XTRIM
stream名称MINID
消息ID(小于此ID的消息均会被删除)
二、Redisson Stream
在Redisson中可通过Stream实现Redis Stream,
场景1 相关示例代码如下:
@Test
void testStream() throws InterruptedException {
String streamName = "mystream";
MyMessage2 myMessage = this.buildMyMessageWithTimestampId();
//获取Stream
RStream<String, Object> stream = this.redisson.getStream(streamName);
//发消息 - XADD mystream * name 我的消息 age 18
StreamMessageId entryId = stream.add(StreamAddArgs.entries(myMessage.toMap()));
log.info("stream[{}] add success, id: {}", streamName, entryId);
//读消息 - XREAD COUNT 5 BLOCK 5000 STREAMS mystream 0
Map<StreamMessageId, Map<String, Object>> entries = stream.read(StreamReadArgs.greaterThan(StreamMessageId.ALL).count(5).timeout(Duration.ofSeconds(5)));
entries.forEach((id, entryMap) -> {
log.info("stream[{}] read message: id={}, entry: {}", streamName, id, entryMap);
});
//读取区间内消息 - XRANGE mystream 0 entryId COUNT 10
entries = stream.range(10, StreamMessageId.ALL, entryId);
entries.forEach((id, entryMap) -> {
log.info("stream[{}] range message: id={}, entry: {}", streamName, id, entryMap);
});
}
场景2 相关示例代码如下:
@Resource
private RedissonClient redisson;
@Test
void testStreamGroup() throws InterruptedException {
String streamName = "mystream";
String groupName = "mygroup1";
String consumerName = "c1";
MyMessage2 myMessage = this.buildMyMessageWithTimestampId();
//获取Stream
RStream<String, Object> stream = this.redisson.getStream(streamName);
//发消息 - XADD mystream * name 我的消息 age 18
StreamMessageId entryId = stream.add(StreamAddArgs.entries(myMessage.toMap()));
log.info("stream[{}] add success, id: {}", streamName, entryId);
//查询已存在的分组 - XINFO GROUPS mystream
List<StreamGroup> streamGroups = stream.listGroups();
streamGroups.forEach(streamGroup -> {
log.info("stream[{}] listGroups groupName: {}", streamName, streamGroup.getName());
});
Boolean existGroup = streamGroups.stream().anyMatch(group -> groupName.equals(group.getName()));
if (!existGroup) {
//创建分组 - XGROUP CREATE mygroup1 $
stream.createGroup(StreamCreateGroupArgs.name(groupName)
//此处id支持:NEWEST即$,ALL即0
.id(StreamMessageId.ALL));
log.info("stream[{}] createGroup success, groupName: {}", streamName, groupName);
}
//读分组消息 - XREADGROUP GROUP mygroup1 c1 COUNT 5 BLOCK 5000 STREAMS mystream >
Map<StreamMessageId, Map<String, Object>> entries = stream.readGroup(groupName, consumerName,
//greaterThan即设置从哪个消息ID之后开始读取,支持:NEVER_DELIVERED即>、ALL即0
StreamReadGroupArgs.greaterThan(StreamMessageId.NEVER_DELIVERED)
.count(5)
.timeout(Duration.ofSeconds(5)));
entries.forEach((id, entryMap) -> {
log.info("stream[{}] readGroup groupName: {}, consumerName: {}, message: id={}, entry: {}",
streamName, groupName, consumerName, id, entryMap);
});
//读取PEL中未确认的消息 - XPENDING mystream mygroup1 - + 100 c1
Map<StreamMessageId, Map<String, Object>> streamMessageIdMapMap = stream.pendingRange(groupName, consumerName, StreamMessageId.MIN, StreamMessageId.MAX, 100);
streamMessageIdMapMap.forEach((id, entryMap) -> {
log.info("stream[{}] pendingRange groupName: {}, consumerName: {}, message: id={}, entry: {}",
streamName, groupName, consumerName, id, entryMap);
//确认消息(从PEL中移除) - XACK mystream mygroup1 1600000000000-0
stream.ack(groupName, id);
log.info("stream[{}] ack groupName: {}, consumerName: {}, message: id={}",
streamName, groupName, consumerName, id);
});
}
参考:
Redis Stream
https://redis.io/docs/latest/develop/data-types/streams/
https://redis.io/docs/latest/commands/xreadgroup/
Redisson Stream
https://github.com/redisson/redisson/wiki/7.-Distributed-collections#720-stream