redis的持久化消息队列

news2025/1/27 12:48:55

Redis Stream

Redis Stream 是 Redis 5.0 版本新增加的数据结构。

Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。

简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。

而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。

上图解析:

  • Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
  • last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
  • pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。

消息队列相关命令:

  • XADD - 添加消息到末尾
  • XTRIM - 对流进行修剪,限制长度
  • XDEL - 删除消息
  • XLEN - 获取流包含的元素数量,即消息长度
  • XRANGE - 获取消息列表,会自动过滤已经删除的消息
  • XREVRANGE - 反向获取消息列表,ID 从大到小
  • XREAD - 以阻塞或非阻塞方式获取消息列表

消费者组相关命令:

  • XGROUP CREATE - 创建消费者组
  • XREADGROUP GROUP - 读取消费者组中的消息
  • XACK - 将消息标记为"已处理"
  • XGROUP SETID - 为消费者组设置新的最后递送消息ID
  • XGROUP DELCONSUMER - 删除消费者
  • XGROUP DESTROY - 删除消费者组
  • XPENDING - 显示待处理消息的相关信息
  • XCLAIM - 转移消息的归属权
  • XINFO - 查看流和消费者组的相关信息;
  • XINFO GROUPS - 打印消费者组的信息;
  • XINFO STREAM - 打印流信息

XADD

使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列,XADD 语法格式:

XADD key ID field value [field value ...]
  • key :队列名称,如果不存在就创建
  • ID :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
  • field value : 记录。

实例

redis> XADD mystream * name Sara surname OConnor
"1601372323627-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) "1601372323627-0"
   2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "OConnor"
2) 1) "1601372323627-1"
   2) 1) "field1"
      2) "value1"
      3) "field2"
      4) "value2"
      5) "field3"
      6) "value3"
redis>

XTRIM

使用 XTRIM 对流进行修剪,限制长度, 语法格式:

XTRIM key MAXLEN [~] count
  • key :队列名称
  • MAXLEN :长度
  • count :数量

实例

127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1601372434568-0"
   2) 1) "field1"
      2) "A"
      3) "field2"
      4) "B"
      5) "field3"
      6) "C"
      7) "field4"
      8) "D"
127.0.0.1:6379>

redis>

XDEL

使用 XDEL 删除消息,语法格式:

XDEL key ID [ID ...]
  • key:队列名称
  • ID :消息 ID

实例

> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
   2) 1) "a"
      2) "1"
2) 1) 1538561701744-0
   2) 1) "c"
      2) "3"

XLEN

使用 XLEN 获取流包含的元素数量,即消息长度,语法格式:

XLEN key
  • key:队列名称

实例

redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3
redis>

XRANGE

使用 XRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

XRANGE key start end [COUNT count]
  • key :队列名
  • start :开始值, - 表示最小值
  • end :结束值, + 表示最大值
  • count :数量

实例

redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) "1601372577811-0"
   2) 1) "name"
      2) "Virginia"
      3) "surname"
      4) "Woolf"
2) 1) "1601372577811-1"
   2) 1) "name"
      2) "Jane"
      3) "surname"
      4) "Austen"
redis>

XREVRANGE

使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

XREVRANGE key end start [COUNT count]
  • key :队列名
  • end :结束值, + 表示最大值
  • start :开始值, - 表示最小值
  • count :数量

实例

redis> XADD writers * name Virginia surname Woolf
"1601372731458-0"
redis> XADD writers * name Jane surname Austen
"1601372731459-0"
redis> XADD writers * name Toni surname Morrison
"1601372731459-1"
redis> XADD writers * name Agatha surname Christie
"1601372731459-2"
redis> XADD writers * name Ngozi surname Adichie
"1601372731459-3"
redis> XLEN writers
(integer) 5
redis> XREVRANGE writers + - COUNT 1
1) 1) "1601372731459-3"
   2) 1) "name"
      2) "Ngozi"
      3) "surname"
      4) "Adichie"
redis>

XREAD

使用 XREAD 以阻塞或非阻塞方式获取消息列表 ,语法格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
  • count :数量
  • milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
  • key :队列名
  • id :消息 ID

实例

# 从 Stream 头部读取两条消息
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) "1532"
            3) "event-id"
            4) "5"
            5) "user-id"
            6) "7782813"
      2) 1) 1526999352406-0
         2) 1) "duration"
            2) "812"
            3) "event-id"
            4) "9"
            5) "user-id"
            6) "388234"
2) 1) "writers"
   2) 1) 1) 1526985676425-0
         2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
         2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"
 

XGROUP CREATE

使用 XGROUP CREATE 创建消费者组,语法格式:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  • key :队列名称,如果不存在就创建
  • groupname :组名。
  • $ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。

从头开始消费:

XGROUP CREATE mystream consumer-group-name 0-0  

从尾部开始消费:

XGROUP CREATE mystream consumer-group-name $

XREADGROUP GROUP

使用 XREADGROUP GROUP 读取消费组中的消息,语法格式:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group :消费组名
  • consumer :消费者名。
  • count : 读取数量。
  • milliseconds : 阻塞毫秒数。
  • key : 队列名。
  • ID : 消息 ID。
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1061816.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OOTD | 美式复古穿搭耳机,复古轻便的头戴式耳机推荐

复古耳机更能带来年代感的复古数码产品,头戴式耳机就好似是时光滤镜的时髦配饰,不说功能实用性,在造型上添加就很酷。 随着时代的发展,时尚有了新的定义。对如今的消费者来说,时尚不仅是美学与个性的展现,…

各种机器码的本质(原码、反码、补码、移码、IEEE754格式阶码)

总述 无论使用什么格式的机器码来表示真值,若取一定位数n以后,各个比特位的排列个数是一定的,为 2 n 2^n 2n种排列,所以选择什么格式的机器码实质上选择什么映射方式来完成从这 2 n 2^n 2n种离散排列到离散的整数真值的映射&…

Open X-Embodiment Robotic Learning Datasets and RT-X Models

文章目录 简介论文链接项目链接Reference 简介 为什么机器人技术远远落后于 NLP、视觉和其他 AI 领域?除其他困难外,数据短缺是罪魁祸首。谷歌 DeepMind 联合其他机构推出了 Open X-Embodiment 数据集,并训练出了能力更强的 RT-X 模型。 Dee…

6-1 选择排序

#include <stdio.h>#define N 1000 int arr[N];/* 对长度为n的数组arr执行选择排序 */ void selectionSort(int arr[], int n);/* 打印长度为n的数组arr */ void printArray(int arr[], int n);void swap(int *xp, int *yp) {int temp *xp;*xp *yp;*yp temp; }int mai…

第八章 排序 二、插入排序

目录 一、算法思想 二、例子 三、代码实现 四、空间复杂度 五、时间复杂度 1、最好的情况 2、最坏的情况 六、优化&#xff08;折半插入排序&#xff09; 七、总结 一、算法思想 每次将一个待排序的记录按其关键字大小插入到前面已排好序的子序列中&#xff0c;直到所…

STL-stack、queue和priority_queue的模拟实现

目录 一、容器适配器 &#xff08;一&#xff09;什么是适配器 &#xff08;二&#xff09;stack和queue的底层结构 二、Stack 三、queue 四、deque双端队列 &#xff08;一&#xff09;优点 &#xff08;二&#xff09;缺陷 五、优先级队列 &#xff08;一&#xff…

如何在终端输出颜色

效果演示: 【看 welcome to here 部分】 环境&#xff1a; Node.js 18.16.0 正文部分 我们可以通过 console.log() 在终端打印字符串。 只要在我们的字符串前面加上转义字符即可。 差不多就是下面这样的结构&#xff1a; 用代码就是&#xff1a; console.log("\x1B…

学习记忆——宫殿篇——记忆宫殿——地点桩——演讲稿定位记忆

其实在演讲的时候有很多人会遇到这样的情况&#xff0c;演讲内容准备的滚瓜烂熟&#xff0c;但是当自己在台上十分紧张的时候&#xff0c;突然忘记要说的内容。 今天在这里就用记忆宫殿的方法为大家解决这样一个问题。 注意&#xff1a;我们在运用这种记忆方法的前提是你已经对…

python笔记:pandas/geopandas DataFrame逐行遍历

在Pandas和GeoPandas中&#xff0c;可以使用几种不同的方法来遍历DataFrame的每一行 0 数据 import pandas as pddata {column1: range(1, 1001),column2: range(1001, 2001) } df pd.DataFrame(data) df 1 iterrows for index, row in df.iterrows():print(index)print(r…

SRT服务器SLS

目前互联网上的视频直播有两种&#xff0c;一种是基于RTMP协议的直播&#xff0c;这种直播方式上行推流使用RTMP协议&#xff0c;下行播放使用RTMP&#xff0c;HTTPFLV或者HLS&#xff0c;直播延时一般大于3秒&#xff0c;广泛应用秀场、游戏、赛事和事件直播&#xff0c;满足了…

MySQL的复合查询

文章目录 1. 多表查询2. 自连接3. 子查询3.1 单行子查询3.2 多行单列子查询3.3 单行多列子查询3.4 在from子句中使用子查询 4. 合并查询4.1 union all4.2 union 5. 内连接6. 外连接6.1 左外连接6.2 右外连接 1. 多表查询 前面我们讲解的mysql表的查询都是对一张表进行查询&…

[架构之路-230]:计算机硬件与体系结构 - 可靠性、可用性、稳定性;MTTF、MTTR、MTBF

目录 一、软件质量属性 二、可靠性、可用性、稳定性区别 2.1 比较 2.2 公式比较 2.3 "正常工作时间"和"正常运行时间" 2.4 比较案例 2.5 可用性好但可靠性较差的示例 三、MTTF、MTTR、MTBF 3.1 图示 3.2 定义 &#xff08;1&#xff09;MTTF&am…

【uniapp+vue3+ts】请求函数封装,请求和上传文件拦截器

1、uniapp 拦截器 uni.addInterceptor(STRING,OBJECT) 拦截器中包括基础地址、超时时间、添加请求头标识、添加token utils文件夹下新建http.ts 拦截uploadFile文件上传&#xff0c;rquest请求接口 cosnt baseUrl xxxx// 添加拦截器 const httpInterceptor {//拦截前触发i…

Python逐日填补Excel中的日期并用0值填充缺失日期的数据

本文介绍基于Python语言&#xff0c;读取一个不同的列表示不同的日期的.csv格式文件&#xff0c;将其中缺失的日期数值加以填补&#xff1b;并用0值对这些缺失日期对应的数据加以填充的方法。 首先&#xff0c;我们明确一下本文的需求。现在有一个.csv格式文件&#xff0c;其第…

几种开源协议的区别(Apache、MIT、BSD、MPL、GPL、LGPL)

作为一名软件开发人员&#xff0c;你一定也是经常接触到开源软件&#xff0c;但你真的就了解这些开源软件使用的开源许可协议吗&#xff1f; 你不会真的认为&#xff0c;开源就是完全免费吧&#xff1f;那么让我们通过本文来寻找答案。 一、开源许可协议简述 开源许可协议是指开…

CI/CD工具中的CI和CD的含义

CI/CD工具中的CI和CD的含义&#xff1f; CI/CD 是现代软件开发方法中广泛使用的一种方法。其中&#xff0c;CI 代表持续集成&#xff08;Continuous Integration&#xff09;&#xff0c;CD 则有两层含义&#xff0c;一是持续交付&#xff08;Continuous Delivery&#xff09;…

Linux CentOS7 vim寄存器

计算机中通常所说的寄存器Register一般指的是CPU中的寄存器&#xff0c;用来暂存CPU处理所需要的指令、数据等。 vim中同样也有寄存器&#xff0c;使用的方式和CPU非常类似。 vim中的寄存器(register)作用和windows中的剪切板类似&#xff0c;不过vim中的寄存器不止一个&…

前后端协议后端统一返回格式Result

/*** 后端统一返回结果* param <T>*/ Data public class Result<T> implements Serializable {private Integer code; //编码&#xff1a;1成功&#xff0c;0和其它数字为失败private String msg; //错误信息private T data; //数据public static <T> Result…

STM32芯片为什么有那么多组VDD?

对于一般的IC元器件通常只有两个电源引脚&#xff0c;一个是Vcc或Vdd&#xff0c;和GND或Vss。学习过32单片机的小伙伴肯定都会有这个发现&#xff0c;32单片机有多组VDD&#xff0c;单片机的引脚资源那么珍贵&#xff0c;为什么要在这里浪费那么多引脚呢&#xff1f;还有就是引…

十三、Django之添加用户(原始方法实现)

修改urls.py path("user/add/", views.user_add),添加user_add.html {% extends layout.html %} {% block content %}<div class"container"><div class"panel panel-default"><div class"panel-heading"><h3 c…