redis stream 作为消息队列的最详细的命令说明文档

news2024/11/15 11:31:27

简介

stream 作为消息队列,支持多次消费,重复消费,ack机制,消息异常处理机制。
涉及到以下几个概念,消息流,消费者组,消费者。
在这里插入图片描述
在这里插入图片描述

涉及到以下命令

# 添加消息到流中
XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]
# 创建消费则组(加上MKSTREAM,会校验消息流是否存在,不存在会创建)
XGROUP CREATE key group <id | $> [MKSTREAM] [ENTRIESREAD entries-read]
# 消费者读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]   [NOACK] STREAMS key [key ...] id [id ...]

1. 环境

redis server 7.2.2

> info server
# Server
redis_version:7.2.2

redis可视化工具(可以直接使用命令行) redisInsight 2.44.0

2. 生产消费流程测试

消息队列,涉及到如下几个流程

  1. 发送消息到消息流
  2. 创建消费者组,并进行消费
  3. 正常消费,消息确认 ack
  4. 异常消费,转移消息的归属权 claim

2.1 发送消息到消息流

使用如下命令发送消息

XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]

各参数说明

参数名说明
NOMKSTREAM默认情况下,如果消息流不存在,则会创建消息流。
使用该参数,则不会创建,如果不存在则返回 (nil)
[<MAXLEN | MINID> [= | ~] threshold [LIMIT count]]等同于 xtrim 的参数,在添加消息后,会对 stream 裁剪,将先加入的消息剔除。
MAXLEN 表示stream长度不大于 threshold,MINID 表示stream的消息id不小于 threshold;
= 表示精确删除 ~ 表示近似删除;
threshold 表示长度或者id;
limit count 表示最多剔除多少消息
<* | id>* 表示由系统生成消息id,id 表示用用户指定的消息id
field value [field value …]消息采用键值对列表形式存储
# 1.1 执行 lua 脚本,批量添加 10000 个消息
eval "local key = 'test:stream_1';redis.call('del', key); for i=1,ARGV[1],1 do redis.call('xadd', key, i + '-0', 'index' i) end;local res = redis.call('xinfo', 'stream', key); return res[6];" 0 10000
# 1.2 查看 stream 的信息
xinfo stream test:stream_1

# 1.3 添加消息,后执行精确修剪。不输入 = | ~,表示使用 = 。会添加一条消息,然后删除消息,使流长度为10
xadd test:stream_1 maxlen 10  '10001-0' index 10001

# 1.4 近似删除。stream 中的消息是以基数树的结构存储,一个节点可能存储多个数据,所以当某个节点中存在
# 不能删除的数据时,这个节点就不会删除,因此会导致裁剪后的数据多一些。一个节点会存储 100 个数据。
# 取消 1.3 命令,执行如下命令后,流的长度会变成 101
xadd test:stream_1 maxlen ~ 10 '10001-0' index 10001

# 1.5 精确删除,根据 MINID ;添加一个消息,并将流中所有消息id 小于 1714746952323-9 删除
# 取消 1.3, 1.4 的命令,执行如下命令。结果是,保留 id >= '9001-0' 的所有数据, 流的长度会变成 1001
xadd test:stream_1 minid = '9001-0' '10001-0' index 10001
# 1.6 近似删除,根据 minid 和 limit count。
# 取消 1.3、1.4、1.5 的命令,执行如下命令。结果是,保留 id >= '9001-0' 的所有数据,并且最多删除 8950 个, 流的长度会变成 1101
# 因为限制 删除 8950 个,所以最后一个节点,计算到一半发现不能删除了,所以最后计算的节点的数据全部保留,故只删除了 8900个
xadd test:stream_1 minid ~ '9001-0' limit 8950 '10001-0' index 10001

为什么 xadd 需要添加 xtrim 的操作呢?因为有些消息,如果闲置的时间太长是要废弃掉的;所以可以加上这个。

xinfo stream test:stream_0 返回的结果字段中
radix-tree-keys:表示有几个id节点,一个id节点 至多会存储 100 个 id
radix-tree-nodes: radix tree 节点数量

2. 创建消费者组,消费消息

XGROUP CREATE key group <id | $> [MKSTREAM] [ENTRIESREAD entries-read]
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]   [NOACK] STREAMS key [key ...] id [id ...]

2.1 创建消费者组

MKSTREAM: 表示如果 stream 不存在则创建。如果不加 MKSTREAM参数,且stream不存在,执行的 xgroup create 会报错。
ENTRIESREAD:Redis version 7.0.0 可以添加此参数。如果使用了 ENTRIESREAD entries-read 参数, 设置 entries-read 已消费数量;lag 待消费数量, entries-read + lag 等于总数(包含已删除的消息数)

# id = 0 表示 从头开始消费
# id = 具体id,表示从指定 id 之后开始消费,不包含当前id
# id = $ 消费新消息
# 每一个消费者组都有一个 last_delivered_id 记录发送的最后一个消息id, 相互之间不会影响,比如来了一个新消息加入到队列中,通过 xreadgroup 可以让每一个消费者组都消费
# last_delivered_id = 0-0, 1714581497948-0, stream 中的最大的id
xgroup create test:stream_1 test:stream_1:group_0 0
# last_delivered_id = 1714581497948-0
xgroup create test:stream_1 test:stream_1:group_1 1714581497948-0
# last_delivered_id = stream 中的最大的id
xgroup create test:stream_1 test:stream_1:group_2 $

# 执行一下命令之后 消费者组的 entries-read = 1, lag = stream.entries-added - entries-read
xgroup create test:stream_1 test:stream_1:group_3 0 ENTRIESREAD 1
xgroup create test:stream_1 test:stream_1:group_4 1714581525681-0 ENTRIESREAD 1
xgroup create test:stream_1 test:stream_1:group_5 $ ENTRIESREAD 1

2.2 拉取消息消费

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]   [NOACK] STREAMS key [key ...] id [id ...]
  • GROUP group consumer: 与stream 绑定的 消费者组、消费者
  • COUNT count: 查找最大消息数量
  • BLOCK milliseconds: 如果一条消息都没有,阻塞多少时间
  • NOACK: 无需消息确认。相当于在读取的时候就已经确认消息了。
  • STREAMS key [key …] id [id …]
    • id 为 “>” 表示取 stream 中 message_id > consumer_group.last_delivered_id 的消息
    • id 为特定数字,表示 从 padding_list 中取 message_id > id 消息。 使用了具体id, BLOCK 和 NOACK 无效。
# 获取全部未消费的消息
XREADGROUP GROUP test:stream_1:group_0 test:stream_1:group_0:consumer_0 STREAMS test:stream_1 >
# 获取至多 10 条消息;若一条消息都没有,等待 20 秒。超时返回 nil
XREADGROUP GROUP test:stream_1:group_0 test:stream_1:group_0:consumer_0 count 10 BLOCK 20000 STREAMS test:stream_1 >
# 获取正在消费中的消息
XREADGROUP GROUP test:stream_1:group_0 test:stream_1:group_0:consumer_0 count 100 STREAMS test:stream_1 0

3. 正常消费,消息确认 ack

XACK key group id [id ...]

key 流名称
group 组名称
id 消息id

# 读取pel列表(pedding entries list: 消费中的列表)的消息的id,并确认
eval "local key = 'test:stream_1';local list = redis.call('xreadgroup','group','test:stream_1:group_0','test:stream_1:group_0:consumer_0','STREAMS',key,0); local entries = list[1][2];local sum = 0; for i=1,#entries, 1 do sum = sum + redis.call('xack', key, 'test:stream_1:group_0', entries[i][1]); end; return sum;" 0

4. 异常消费,转移消息的归属权 claim

XCLAIM key group consumer min-idle-time id [id ...] [IDLE ms] [TIME unix-time-milliseconds] [RETRYCOUNT count] [FORCE] [JUSTID] [LASTID lastid]
  • min-idle-time:最小闲置时间,如果闲置时间小于min-idle-time,则不处理
  • IDLE :设置消息的空闲时间(上次发送时间)。如果未指定 IDLE,则假定 IDLE 为 0,即重置时间计数,因为该消息现在有一个新所有者正在尝试处理它。
  • TIME :这与 IDLE 相同,但不是相对的毫秒数,而是将空闲时间设置为特定的 Unix 时间(以毫秒为单位)。像当于设置下发时间
  • RETRYCOUNT :将重试计数器设置为指定值。每次再次传送消息时,该计数器都会递增。通常XCLAIM不会更改此计数器,该计数器仅在调用 XPENDING 命令时提供给客户端:这样客户端可以检测异常情况,例如在大量传递尝试后由于某种原因从未处理的消息。
  • FORCE:即使某些指定的 ID 尚未在分配给其他客户端的 PEL 中,也会在 PEL 中创建待处理消息条目。但是该消息必须存在于流中,否则不存在的消息的 ID 将被忽略。
  • JUSTID:仅返回成功领取的消息ID数组,不返回实际消息。使用此选项意味着重试计数器不会增加。
# 强制处理id为 11-0 的,闲置时间大于 1 小时的消息;设置闲置时间为 0
xclaim test:stream_1 test:stream_1:group_0 test:stream_1:group_0:consumer_2 3600000 '11-0' IDLE 0 TIME 15 RETRYCOUNT 1 FORCE JUSTID
# 设置下发时间,并返回待处理消息列表
eval "local key = 'test:stream_1';local group = 'test:stream_1:group_0';local consumer = 'test:stream_1:group_0:consumer_2';local id = '11-0';local t = redis.call('time');local time = t[1] * 1000;redis.call('xclaim', key, group, consumer, 3600, id, 'TIME', time, 'RETRYCOUNT', 1, 'FORCE', 'JUSTID');return redis.call('xpending', key, group, '-', '+', 10, consumer);" 0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1646193.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

动态规划——背包问题(01,完全,多重)

一、01背包问题 1.题目描述 有 N 件物品和一个容量是 V 的背包。每件物品只能使用一次。第 i 件物品的体积是 vi&#xff0c;价值是 wi。 求解将哪些物品装入背包&#xff0c;可使这些物品的总体积不超过背包容量&#xff0c;且总价值最大。输出最大价值。 01背包问题特点&…

不考408的985,不想考408的有福了!吉林大学计算机考研考情分析

吉林大学&#xff08;Jilin University&#xff09;简称吉大&#xff0c;位于吉林长春&#xff0c;始建于1946年&#xff0c;是中华人民共和国教育部直属的综合性全国重点大学&#xff0c;国家“双一流”、“211工程”、“985工程”、“2011计划”重点建设的著名学府&#xff0…

免费分享一套微信小程序商城系统(电商系统)(SpringBoot+Vue3)【至尊版】,帅呆了~~

大家好&#xff0c;我是java1234_小锋老师&#xff0c;自己原创写了一个不错的微信小程序商城系统(电商系统)(SpringBootVue3)【至尊版】&#xff0c;免费分享下哈。 项目视频演示 【免费】微信小程序商城系统(电商系统)(SpringBootVue3) 【至尊版】Java毕业设计_哔哩哔哩_bi…

【数据结构与算法】之五道链表进阶面试题详解!

目录 1、链表的回文结构 2、相交链表 3、随机链表的复制 4、环形链表 5、环形链表&#xff08;||&#xff09; 6、完结散花 个人主页&#xff1a;秋风起&#xff0c;再归来~ 数据结构与算法 个人格言&#xff1a;悟已往之不谏&#xff0c;知…

自动驾驶主流芯片及平台架构(二)特斯拉自动驾驶芯片平台介绍

早期 对外采购mobileye EyeQ3 芯片摄像头半集成方案&#xff0c;主要是为了满足快速量产需求&#xff0c;且受制于研发资金不足限制&#xff1b; 中期 采用高算力NVIDIA 芯片平台其他摄像头供应商的特斯拉内部集成方案&#xff0c;mobileye开发节奏无法紧跟特斯拉需求&#xff…

pyside6的调色板QPalette的简单应用

使用调色板需要先导入:from PySide6.QtGui import QPalette 调色板QPalette的源代码&#xff1a; class QPalette(Shiboken.Object):class ColorGroup(enum.Enum):Active : QPalette.ColorGroup ... # 0x0Normal : QPalette.ColorGrou…

基于C++基础知识的循环语句

一、while循环 while循环语句形式如下&#xff1a; while(表达式){语句 } 循环每次都是执行完语句后回到表达式处重新开始判断&#xff0c;重新计算表达式的值&#xff0c;一旦表达式的值为假就退出循环。用花括号括起来的多条简单语句&#xff0c;花括号及其包含的语句被称…

公网tcp转流

之前做过几次公网推流的尝试, 今天试了UDP推到公网, 再用TCP从公网拉下来, 发现不行, 就直接改用TCP转TCP了. 中间中转使用的python脚本, 感谢GPT提供技术支持: import socket import threadingdef tcp_receiver(port, forward_queue):"""接收TCP数据并将其放入…

【简单介绍下7-Zip】

&#x1f3a5;博主&#xff1a;程序员不想YY啊 &#x1f4ab;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f917;点赞&#x1f388;收藏⭐再看&#x1f4ab;养成习惯 ✨希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出…

Fireworks AI和MongoDB:依托您的数据,借助优质模型,助力您开发高速AI应用

我们欣然宣布 MongoDB与 Fireworks AI 正携手合作 让客户能够利用生成式人工智能 (AI) 更快速、更高效、更安全地开展创新活动 Fireworks AI由 Meta旗下 PyTorch团队的行业资深人士于 2022 年底创立&#xff0c;他们在团队中主要负责优化性能、提升开发者体验以及大规模运…

五款优秀的局域网监控软件推荐:实时监控电脑屏幕的神器

在现代社会&#xff0c;计算机和网络已经成为工作中不可或缺的部分。随着局域网的普及&#xff0c;如何有效地监控和管理局域网内的电脑成为了许多企业和管理者关心的问题。本文将为您推荐五款优秀的局域网监控软件&#xff0c;帮助您实时监控电脑屏幕&#xff0c;提高工作效率…

宏电全栈式IoT赋能供排水智能监测,护航城市生命线

城市供水、排水系统是维系城市正常运行、满足群众生产生活需要的重要基础设施&#xff0c;是城市的“生命线”。随着城市化进程加快&#xff0c;城市规模不断扩大&#xff0c;地下管线增长迅速&#xff0c;城市“生命线安全”的监管日益面临挑战。 宏电作为物联网行业的领航者…

尊享面试100(272.最接近的二叉树搜索值|| python)

刚开始想着用最小堆&#xff0c;把每个元素都加进去&#xff0c;然后找出最小的k个值&#xff0c;复杂度应该是&#xff08;nklogn) import heapq as pq class Solution:def __init__(self):self.h []pq.heapify(self.h)def closestKValues(self, root: Optional[TreeNode], …

LLVM的ThinLTO编译优化技术在Postgresql中的应用

部分内容引用&#xff1a;https://blog.llvm.org/2016/06/thinlto-scalable-and-incremental-lto.html LTO是什么&#xff1f; 链接时优化&#xff08;Link-time optimization&#xff0c;简称LTO&#xff09;是编译器在链接时对程序进行的一种优化。它适用于以文件为单位编译…

博客系统项目测试报告

文章目录 一.报告概要二.测试环境三.手工测试用例四.编写测试用例五.自动化测试Selenium测试项目主要特点 一.报告概要 项目概要 本项目是一个全功能的个人博客系统&#xff0c;旨在提供一个用户友好、功能全面的平台&#xff0c;允许用户注册、登录、浏览博客、查看详细内容、…

嵌入式学习

笔记 作业 有如下结构体 struct Student{ char name[16]; int age; double math_score; double chinese_score; double english_score; double physics_score; double chemistry…

Linux用户日志审计系统

标题日期版本说明作者 用户日志审计系统 2024.05.06v.0.0.1权限lgb 测试环境&#xff1a;CentOS Stream 9 测试过程&#xff1a; 测试开始前&#xff0c;首先我们先建立一个用户。 将文件备份。 我们通过vim编辑器&#xff0c;打开 /etc/profile 文件进行编辑。 将提前编辑好…

【C语言】第一个C程序:hello world

printf简介 printf是C语言提供的库函数&#xff0c;可以在屏幕上打印格式化数据。这里不作展开&#xff0c;只需要知道&#xff0c;如果要打印hello world&#xff0c;就把双引号引起来的"hello world"作为参数传给printf就行了。如果想要在打印后换行&#xff0c;要…

Scratch编程v3.29.1少儿编程工具

软件介绍 SCRATCH是一款由麻省理工学院&#xff08;MIT&#xff09;媒体实验室开发的图形化编程语言和集成开发环境&#xff08;IDE&#xff09;。它的目标是让编程变得有趣、直观且易学&#xff0c;尤其是针对儿童和青少年群体。通过SCRATCH&#xff0c;用户可以通过拖放代码…

python安装问题及解决办法(pip不是内部或外部命令也不是可运行)

pip是python的包管理工具&#xff0c;使python可在cmd&#xff08;命令行窗口&#xff0c;WinR后输入cmd&#xff09;中执行 针对 “pip不是内部或外部命令也不是可运行” 问题&#xff0c;需要在安装的时候将python添加到环境变量中 上图第三个选项必须勾选才能在cmd中使用pi…