Redis Stream Redisson Stream

news2025/1/7 6:29:48

目录

    • 一、Redis Stream
      • 1.1 场景1:多个客户端可以同时接收到消息
        • 1.1.1 XADD - 向stream添加Entry(发消息 )
        • 1.1.2 XREAD - 从stream中读取Entry(收消息)
        • 1.1.3 XRANGE - 从stream指定区间读取Entry(收消息)
      • 1.2 场景2:多个客户端仅收到一部分消息(分片sharded、消费组group)
        • 1.2.1 XGROUP CREATE - 创建消费组
        • 1.2.2 XREADGROUP - 从消费组中读取消息
        • 1.2.3 XACK - 确认消息
        • 1.2.4 XPENDING - 读取PEL消息
        • 1.2.5 XCLAIM & XAUTOCLAIM - 转移PEL中消息的所有权给其他消费者
        • 1.2.6 统计命令
      • 1.3 其他
    • 二、Redisson Stream

一、Redis Stream

之前介绍过Redis Pub/Sub相关内容,通过Redis Pub/Sub可以实现发布/订阅消息传递范式,但是存在丢消息的可能,而本文介绍的Redis Stream是一种可用来实现 可靠消息队列、支持消息分组(类似Kafka Group) 的数据结构。

关于Redis Stream的使用存在如下2个场景

  • 场景1: 多个客户端可以同时接收到消息
  • 场景2: 多个客户端仅收到一部分消息(分片sharded),例如发送消息A,B,C,客户端1收到A,C,客户端2收到B(参考Kafka group概念)。

关于场景1,则可参考XADD、XREAD、XRANGE等相关命令的使用,
关于场景2,则需要了解XGROUP CREATE、XREADGROUP、XACK等相关命令的使用。

1.1 场景1:多个客户端可以同时接收到消息

场景1中相关命令XADD、XREAD、XRANGE的使用汇总如下图:
在这里插入图片描述

1.1.1 XADD - 向stream添加Entry(发消息 )

向stream添加Entry(多个key/value对),XADD命令格式:

XADD stream名称 id key1 value1 key2 value2 …

其中id为此次entry的唯一ID,而key1 value1 key2 value2 …即为entry的具体内容,
id为*则表示由Redis自动生成ID:<millisecondsTime>-<sequenceNumber>
亦可明确指定id。

示例:

XADD mystream * name 罗 age 18
XADD mystream 1692632086370-0 name 刘 age 18
1.1.2 XREAD - 从stream中读取Entry(收消息)

从stream中读取entry,XREAD命令格式:

XREAD COUNT 最多读取数量 BLOCK 阻塞等待毫秒数 STREAMS stream名称 上次接收的id

通过XADD添加一条消息,多个执行XREAD的客户端都会读取到该消息,
XREAD会从参数中指定的 上次接收的id 之后开始读取后续的消息,
上次接受的id 可设置为$,需配合BLOCK使用,表示仅读取从阻塞开始后新添加的消息(即不关心历史消息),
上次接受的id 可设置为+,需要Redis版本>=7.4 RC1,表示仅读取最后一条消息。
阻塞等待的毫秒数 如果为0,则表示一直阻塞,直到读取到一条消息。

示例:

# 从头开始读取1条消息
XREAD STREAMS mystream 0

# 从头开始读取2条消息
XREAD COUNT 2 STREAMS mystream 0-0
# 从指定消息ID之后开始读取2条消息
XREAD COUNT 2 STREAMS mystream 1692632086370-0

# 最长阻塞5秒,最多读取100条消息,仅读取从阻塞开始后新添加的消息
XREAD BLOCK 5000 COUNT 100 STREAMS mystream $
# 继续从上次接受的id之后继续读取
XREAD BLOCK 5000 COUNT 100 STREAMS mystream 1526999644174-3

# 读取最后一条消息(需要Redis版本>=7.4 RC1)
XREAD STREAM mystream +
1.1.3 XRANGE - 从stream指定区间读取Entry(收消息)

从stream指定区间(起始ID范围)正向读取Entry,XRANGE命令格式:

XRANGE stream名称 起始id 结束id COUNT 最多读取数量

按起始到结束正向返回消息,
-表示最小ID,+表示最大ID

示例:

# 返回全部消息(从前到后依次返回)
XRANGE mystream - + 
# 返回5条消息(从前到后依次返回)
XRANGE mystream - + COUNT 5

# 返回指定id(包括指定id)之后5条消息(从前到后依次返回)
XRANGE mystream 1718951980910-0 + COUNT 5

# 返回指定id(不包括指定id)之后5条消息(从前到后依次返回)
XRANGE mystream (1718951980910-0 + COUNT 5

从stream指定区间(起始ID范围)逆向读取Entry,XREVRANGE命令格式:

XREVRANGE stream名称 结束id 起始id COUNT 最多读取数量

按结束到起始逆向返回消息。

示例:

返回全部消息(从后到前逆向依次返回)
XREVRANGE mystream + -
# 返回2条消息(从后到前逆向依次返回)
XREVRANGE mystream + - COUNT 2

1.2 场景2:多个客户端仅收到一部分消息(分片sharded、消费组group)

场景2中相关命令XGROUP CREATE、XREADGROUP、XACK、XPENDING、XCLAIM等使用汇总如下图:

在这里插入图片描述

1.2.1 XGROUP CREATE - 创建消费组

给stream创建消费分组,分组间彼此隔离,分组内多个consumer会轮流消费消息(分片),XGROUP CREATE命令格式:

XGROUP CREATE stream名称 group名称 起始读取id [MKSTREAM]

起始读取id0,表示从头开始读取,
起始读取id$,表示从最后一条消息之后开始读取,
MKSTREAM子命令是可选的,表示自动创建stream。

示例:

# 为mystream创建分组mygroup1,且从最新消息开始消费
 XGROUP CREATE mystream mygroup1 $
1.2.2 XREADGROUP - 从消费组中读取消息

以分组group读取stream中的消息,group中每个客户端需要指定consumer名称,多个consumer分摊group中的消息,而多个group间彼此隔离,XREADGROUP命令格式:

XREADGROUP GROUP group名称 consumer名称 COUNT 最多读取数量 BLOCK 阻塞等待毫秒数 [NOACK] STREAMS stream名称 上次接收的id

PEL(Pending Entries List): 当使用XREADGROUP读取分组下消息时,服务器会记住哪条消息发给了分组下的哪个消费者,该记录存储在消费者组中,称为PEL,即已发送但尚未确认的消息ID列表。后续在消费者处理完消息后,消费者必须手动调用XACK命令对消息ID进行确认,以便从PEL中删除挂起的消息,关于PEL的结构可参见下图(截取自RedisInsight工具):
在这里插入图片描述

上次接收的id>,表示消费者只希望接收从未传递给任何其他消费者的消息,即给我新的信息>号表示从当前消费组的last_delivered_id后面开始读。
上次接收的id 设为0或其他有效的id,则表示仅读取 PEL(当前consumer没有确认的消息) 中指定id之后的消息。

NOACK子命令式可选的,表示无需确认消息,NOACK子命令适用于对可靠性要求不高、偶尔的消息丢失是可以接受的情况,使用NOACK子命令可以避免将消息添加到PEL( Pending Entries List),相当于在读取消息后自动确认消息,后续无需再调用XACK命令进行确认,

示例:

# 消费者c1阻塞读取mystream下分组mygroup1的最新消息(直到读取到1条消息后解除阻塞)
XREADGROUP GROUP mygroup1 c1 BLOCK 0 STREAMS mystream >

# 消费者c1读取mystream下分组mygroup1的PEL消息(即已投递给c1但c1未进行确认的消息列表)
XREADGROUP GROUP mygroup1 c1 STREAMS mystream 0
1.2.3 XACK - 确认消息

确认stream下指定分组group的某条消息已被成功消费,XACK命令格式:

XACK stream名称 group名称 消息id

示例:

# 确认1条消息 
XACK mystream mygroup1 1719206857966-0 

# 同时确认3条消息
XACK mystream mygroup1 1719206857966-0 1719206909894-0 1719207195666-0
1.2.4 XPENDING - 读取PEL消息

读取stream中指定分组group的PEL挂起消息列表,XPENDING命令格式:

XPENDING stream名称 group名称 IDEL 空闲毫秒数 起始消息id 结束消息id 查询数量 consumer名称

示例:

# 查询mystream下mygroup1分组的PEL列表
XPENDING mystream mygroup1

# 查询mystream下mygroup1分组下的消费者c1的空闲9秒的最多10条PEL消息
XPENDING mystream mygroup1 IDLE 9000 - + 10 c1
1.2.5 XCLAIM & XAUTOCLAIM - 转移PEL中消息的所有权给其他消费者

通过XPENDING查询出PEL消息(已投递未确认)后,若原先消息对应的consumer已经挂掉,没有能力继续处理消息,则可通过XCLIAM将对应的消息转移给同分组下的其他consumer进行处理,XCLAIM命令格式如下:

XCLAIM stream名称 group名称 consumer名称 空闲时长毫秒 消息id1 消息id2

转移后消息上次投递时间会重置为当前时间(即消息空闲idle时间为0),
默认会返回已经转移成功的消息内容,且消息投递计数会加1,
也可添加JUSTID子命令,则只返回消息ID不返回消息内容,且消息投递计数不变,
若多个客户端同时通过XCLAIM转移同一条消息的所有权,则只会有一个客户端转移成功。
Redis官方原文如下:

Note that the message is claimed only if its idle time is greater than the minimum idle time we specify when calling XCLAIM. Because as a side effect XCLAIM will also

  • reset the idle time (since this is a new attempt at processing the message),
  • two consumers trying to claim a message at the same time will never both succeed: only one will successfully claim the message. This avoids that we process a given message multiple times in a trivial way (yet multiple processing is possible and unavoidable in the general case).

示例:

# mystream下mygroup1分组下的PEL消息1526569498055-0且空闲时长超过1小时,则将其转移给消费者c2
XCLAIM mystream mygroup1 c2 3600000 1526569498055-0

亦可通过XAUTOCLAIM将PEL中指定起始消息ID后的消息批量进行转移,XAUTOCLIAM命令格式如下:

XAUTOCLAIM stream名称 group名称 consumer名称 空闲时长毫秒 起始消息id COUNT 消息数量

示例:

# 扫描mystream下mygroup1分组下的所有PEL消息,空闲时长超过1小时,则最多转移25条消息给消费者c2
XAUTOCLAIM mystream mygroup1 c2 3600000 0-0 COUNT 25
1.2.6 统计命令
# 查询stream下的分组信息
XINFO GROUPS stream名称

# 查询stream信息
XINFO STREAM stream名称

# 查询stream下指定分组的消费者信息
XINFO CONSUMERS stream名称 group名称

1.3 其他

删除stream中的消息:

XDEL stream名称 id1 id2 …

查询stream中的消息(entry)数量:

XLEN stream名称

压缩stream中的消息数据量:

XTRIM stream名称 MAXLEN 保留的最近消息数量
XTRIM stream名称 MINID 消息ID(小于此ID的消息均会被删除)

二、Redisson Stream

在Redisson中可通过Stream实现Redis Stream,

场景1 相关示例代码如下:

@Test
void testStream() throws InterruptedException {
    String streamName = "mystream";
    MyMessage2 myMessage = this.buildMyMessageWithTimestampId();

    //获取Stream
    RStream<String, Object> stream = this.redisson.getStream(streamName);

    //发消息 - XADD mystream * name 我的消息 age 18
    StreamMessageId entryId = stream.add(StreamAddArgs.entries(myMessage.toMap()));
    log.info("stream[{}] add success, id: {}", streamName, entryId);

    //读消息 - XREAD COUNT 5 BLOCK 5000 STREAMS mystream 0
    Map<StreamMessageId, Map<String, Object>> entries = stream.read(StreamReadArgs.greaterThan(StreamMessageId.ALL).count(5).timeout(Duration.ofSeconds(5)));
    entries.forEach((id, entryMap) -> {
        log.info("stream[{}] read message: id={}, entry: {}", streamName, id, entryMap);
    });

    //读取区间内消息 - XRANGE mystream 0 entryId COUNT 10
    entries = stream.range(10, StreamMessageId.ALL, entryId);
    entries.forEach((id, entryMap) -> {
        log.info("stream[{}] range message: id={}, entry: {}", streamName, id, entryMap);
    });
}

场景2 相关示例代码如下:

@Resource
private RedissonClient redisson;

@Test
void testStreamGroup() throws InterruptedException {
   String streamName = "mystream";
   String groupName = "mygroup1";
   String consumerName = "c1";
   MyMessage2 myMessage = this.buildMyMessageWithTimestampId();

   //获取Stream
   RStream<String, Object> stream = this.redisson.getStream(streamName);

   //发消息 - XADD mystream * name 我的消息 age 18
   StreamMessageId entryId = stream.add(StreamAddArgs.entries(myMessage.toMap()));
   log.info("stream[{}] add success, id: {}", streamName, entryId);

   //查询已存在的分组 - XINFO GROUPS mystream
   List<StreamGroup> streamGroups = stream.listGroups();
   streamGroups.forEach(streamGroup -> {
       log.info("stream[{}] listGroups groupName: {}", streamName, streamGroup.getName());
   });
   Boolean existGroup = streamGroups.stream().anyMatch(group -> groupName.equals(group.getName()));
   if (!existGroup) {
       //创建分组 - XGROUP CREATE mygroup1 $
       stream.createGroup(StreamCreateGroupArgs.name(groupName)
               //此处id支持:NEWEST即$,ALL即0
               .id(StreamMessageId.ALL));
       log.info("stream[{}] createGroup success, groupName: {}", streamName, groupName);
   }

   //读分组消息 - XREADGROUP GROUP mygroup1 c1 COUNT 5 BLOCK 5000 STREAMS mystream >
   Map<StreamMessageId, Map<String, Object>> entries = stream.readGroup(groupName, consumerName,
           //greaterThan即设置从哪个消息ID之后开始读取,支持:NEVER_DELIVERED即>、ALL即0
           StreamReadGroupArgs.greaterThan(StreamMessageId.NEVER_DELIVERED)
                   .count(5)
                   .timeout(Duration.ofSeconds(5)));
   entries.forEach((id, entryMap) -> {
       log.info("stream[{}] readGroup groupName: {}, consumerName: {}, message: id={}, entry: {}",
               streamName, groupName, consumerName, id, entryMap);
   });

   //读取PEL中未确认的消息 - XPENDING mystream mygroup1 - + 100 c1
   Map<StreamMessageId, Map<String, Object>> streamMessageIdMapMap = stream.pendingRange(groupName, consumerName, StreamMessageId.MIN, StreamMessageId.MAX, 100);
   streamMessageIdMapMap.forEach((id, entryMap) -> {
       log.info("stream[{}] pendingRange groupName: {}, consumerName: {}, message: id={}, entry: {}",
               streamName, groupName, consumerName, id, entryMap);
       //确认消息(从PEL中移除) - XACK mystream mygroup1 1600000000000-0
       stream.ack(groupName, id);
       log.info("stream[{}] ack groupName: {}, consumerName: {}, message: id={}",
               streamName, groupName, consumerName, id);
   });

}


参考:

Redis Stream
https://redis.io/docs/latest/develop/data-types/streams/
https://redis.io/docs/latest/commands/xreadgroup/

Redisson Stream
https://github.com/redisson/redisson/wiki/7.-Distributed-collections#720-stream

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1868342.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ubuntu24 安装 docker

更新 apt-get sudo apt-get update 安装软件包 sudo apt-get install apt-transport-https ca-certificates curl software-properties-common 添加Docker的官方GPG密钥 curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add - 添加 Docker 仓库 …

不锈钢氩弧焊丝ER316L

说明&#xff1a;TG316L 是超低碳的不锈钢焊丝。熔敷金属耐蚀、耐热、抗裂性能优良。防腐蚀性能良好。 用途:用于石油化工、化肥设备等。也可用于要求焊接后不进行热处理的高Cr钢的焊接。

Dynamic-Link库 (动态链接库)

一、动态链接库&#xff08;DLL&#xff09;的基本概念 二、动态链接库的优势 三、动态链接库的实现方法 四、动态链接库的版本冲突问题&#xff08;DLL地狱&#xff09; 五、动态链接库与静态链接库的区别 一、动态链接库&#xff08;DLL&#xff09;的基本概念 定义&…

数据结构(Java):ArrayList的应用

1、引言 上一篇博客&#xff0c;已经为大家讲解了集合类ArrayList。 这篇博客&#xff0c;就来帮助大家学会使用ArrayList。 2、题1&#xff1a; 删除字符&#xff08;热身题&#xff09; 题目&#xff1a;给出str1和str2两个字符串&#xff0c;删除str1中出现的所有的str2…

kafka进阶核心原理详解:案例解析(第11天)

系列文章目录 kafka高级&#xff08;重点&#xff09; kafka核心概念汇总 kafka的数据位移offset Kafka的基准/压力测试 Kafka的分片副本机制 kafka如何保证数据不丢失 kafka的消息存储及查询机制 生产者数据分发策略 消费者负载均衡机制 kafka的监控工具:kafka-eagle…

基于Java的多元化智能选课系统-计算机毕业设计源码040909

摘 要 多元化智能选课系统使用Java语言的Springboot框架&#xff0c;采用MVVM模式进行开发&#xff0c;数据方面主要采用的是微软的Mysql关系型数据库来作为数据存储媒介&#xff0c;配合前台技术完成系统的开发。 论文主要论述了如何使用JAVA语言开发一个多元化智能选课系统&a…

本地调试时不将服务挂到nacos

本地调试的时候不将服务挂到nacos从而影响前端和测试使用&#xff0c;需要在edit Configurations...加入一句配置信息。 -Dspring.cloud.nacos.discovery.register-enabledfalse

Flutter Navigator.popUntil 参数传递

Flutter 使用页面传参 以下是 在flutter 中页面传参的常用形式&#xff0c;都可以有有直接的传值参数提供。 Navigator.push #跳转到指定页面 压栈路由表Navigator.pushReplacement #关闭当前页面 跳转到指定页面压栈路由表Navigator.pus…

JMeter安装与使用

安装包下载&#xff1a;https://pan.xunlei.com/s/VNigSM9IEjqNBVkw8by6i-LoA1?pwdu6gq# 也可以官网下载&#xff1a; 1.解压安装包 2.打开方式 &#xff08;1&#xff09;bin->ApacheJMeter.jar->打开界面 &#xff08;2&#xff09;如果&#xff08;1&#xff09;打…

Node.js全栈指南:认识MIME和HTTP

MIME&#xff0c;全称 “多用途互联网邮件扩展类型”。 这名称相当学术&#xff0c;用人话来说就是&#xff1a; 我们浏览一个网页的时候&#xff0c;之所以能看到 html 文件展示成网页&#xff0c;图片可以正常显示&#xff0c;css 样式能正常影响网页效果&#xff0c;js 脚…

Xilinx FPGA:vivado这里记录一个小小的问题

问题描述&#xff1a;uart_data从rx模块输入到ctrl模块后就没有值了。 问题一&#xff1a;首先我仿真例化了两个模块&#xff0c;并且&#xff0c;我选取了单独例化的rx模块中的uart_data 的值&#xff0c;所以在仿真中它是有值的。 timescale 1ns / 1ps module test_bench_TO…

如何高效使用 .http 文件记录和测试API接口

1. 前言 在现代软件开发中&#xff0c;API&#xff08;应用程序接口&#xff09;成为了系统间通信的重要桥梁。.http 文件作为一种轻量级的API请求描述方式&#xff0c;不仅便于开发者记录和分享API接口信息&#xff0c;还能够帮助自动化测试流程。本文将深入介绍如何有效地使…

【论文阅读】-- MultiStream:探索分层时间序列的多分辨率流图方法

MultiStream: A Multiresolution Streamgraph Approach to Explore Hierarchical Time Series 摘要1 引言2相关工作2.1 堆叠图和流图可视化2.2 时间序列的层次结构2.3 交互技术 3 需求分析4 视觉映射和功能4.1 设计原理总结4.2 概述4.3 多分辨率视图4.4 控制器4.5 层次管理器 5…

【知识学习】阐述Unity3D中FogLOD的概念及使用方法示例

在Unity3D中&#xff0c;Fog&#xff08;雾效&#xff09;和LOD&#xff08;Level of Detail&#xff0c;细节层次&#xff09;是两种用于提高场景视觉效果和性能的技术。 Fog&#xff08;雾效&#xff09; 雾效是一种视觉效果&#xff0c;用于模拟大气中的雾或烟&#xff0c…

纠结要不要选计算机专业,问问自己这个问题

又到了一年一度高考填志愿的时候&#xff0c;几年前我做过一个介绍计算机专业的视频。有需要的同学可以去看下 有人看完视频问我&#xff1a; 你到底是在推荐还是在劝退计算机&#xff1f; 还有人说&#xff1a; 我也不知道自己对计算机有没有兴趣&#xff0c;怎么办&#xff1…

应用案例 | 如何监测高价值货物在物流运输过程中受到的振动和冲击?全面保障货物安全

一、货物运输 不同种类的货物对运输的要求不同&#xff0c;钢铁、煤炭、矿石等大宗物资通常对运输要求较低&#xff0c;而电子产品、IT 产品、家电等高价值敏感类货物则更强调运输的安全性和时效性&#xff0c;往往希望能尽可能安全和快速送达这类货物&#xff0c;使之尽快进入…

uni-app与原生插件混合开发调试1-环境准备

uni-app与原生插件混合开发调试系列文章分为3篇&#xff0c;分别详细讲了《环境准备》、《搭建uni-app本地开发调试环境》和《安卓原生插件开发调试和打包》&#xff0c;3篇文章完整详细地介绍了“从环境安装配置到本地开发调试到原生插件打包”整个流程。 相关名词和概念解释…

计算机网络面试TCP篇之TCP三次握手与四次挥手

TCP 三次握手与四次挥手面试题 任 TCP 虐我千百遍&#xff0c;我仍待 TCP 如初恋。 巨巨巨巨长的提纲&#xff0c;发车&#xff01;发车&#xff01; PS&#xff1a;本次文章不涉及 TCP 流量控制、拥塞控制、可靠性传输等方面知识&#xff0c;这些知识在这篇&#xff1a; TCP …

神州信息与国科量子联合进军量子网络应用服务市场(中国军民两用通信技术展览会)

量子通信&#xff0c;智联未来 —— 神州信息与国科量子共启安全通信新纪元 在信息技术飞速发展的今天&#xff0c;信息安全已成为全球关注的焦点。神州数码信息服务股份有限公司&#xff08;神州信息&#xff09;与国科量子通信网络有限公司&#xff08;国科量子&#xff09;…

PyCharm 2024.1最新变化

PyCharm 2024.1 版本带来了一系列激动人心的新功能和改进&#xff0c;以下是一些主要的更新亮点: Hugging Face 模型和数据集文档预览&#xff1a;在 PyCharm 内部快速获取 Hugging Face 模型或数据集的详细信息&#xff0c;通过鼠标悬停或使用 F1 键打开文档工具窗口来预览。 …