Redis Stream消息队列

news2024/11/17 1:53:48

什么是Stream?

Stream 实际上是一个具有消息发布/订阅功能的组件,也就常说的消息队列。其实这种类似于 broker/consumer(生产者/消费者)的数据结构很常见,比如 RabbitMQ 消息中间件、Celery 消息中间件,以及 Kafka 分布式消息系统等,而 Redis Stream 正是借鉴了 Kafaka 系统。

1) 优点

Strean 除了拥有很高的性能和内存利用率外, 它最大的特点就是提供了消息的持久化存储,以及主从复制功能,从而解决了网络断开、Redis 宕机情况下,消息丢失的问题,即便是重启 Redis,存储的内容也会存在。

2) 流程

Stream 消息队列主要由四部分组成,分别是:消息本身、生产者、消费者和消费组,对于前述三者很好理解,下面了解什么是消费组。

一个 Stream 队列可以拥有多个消费组,每个消费组中又包含了多个消费者,组内消费者之间存在竞争关系。当某个消费者消费了一条消息时,同组消费者,都不会再次消费这条消息。被消费的消息 ID 会被放入等待处理的 Pending_ids 中。每消费完一条信息,消费组的游标就会向前移动一位,组内消费者就继续去争抢下消息。

 Redis Stream 消息队列结构程如下图所示:

Redis Stream结构图

 

下面对上图涉及的专有名词做简单解释:
  • Stream direction:表示数据流,它是一个消息链,将所有的消息都串起来,每个消息都有一个唯一标识 ID 和对应的消息内容(Message content)。
  • Consumer Group :表示消费组,拥有唯一的组名,使用 XGROUP CREATE 命令创建。一个 Stream 消息链上可以有多个消费组,一个消费组内拥有多个消费者,每一个消费者也有一个唯一的 ID 标识。
  • last_delivered_id :表示消费组游标,每个消费组都会有一个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
  • pending_ids :Redis 官方称为 PEL,表示消费者的状态变量,它记录了当前已经被客户端读取的消息 ID,但是这些消息没有被 ACK(确认字符)。如果客户端没有 ACK,那么这个变量中的消息 ID 会越来越多,一旦被某个消息被 ACK,它就开始减少。
3) ACK 

ACK(Acknowledge character)即确认字符,在数据通信中,接收方传递给发送方的一种传输类控制字符。表示发来的数据已确认接收无误。在 TCP/IP 协议中,如果接收方成功的接收到数据,那么会回复一个 ACK 数据。通常 ACK 信号有自己固定的格式,长度大小,由接收方回复给发送方。

常用命令汇总

Redis Stream命令
命令说明
XADD 添加消息到末尾。
XTRIM对 Stream 流进行修剪,限制长度。
XDEL删除指定的消息。
XLEN获取流包含的元素数量,即消息长度。
XRANGE获取消息列表,会自动过滤已经删除的消息。
XREVRANGE 反向获取消息列表,ID 从大到小。
XREAD以阻塞或非阻塞方式获取消息列表。
XGROUP CREATE创建消费者组。
XREADGROUP GROUP读取消费者组中的消息。
XACK将消息标记为"已处理"。
XGROUP SETID为消费者组设置新的最后递送消息ID。
XGROUP DELCONSUMER删除消费者。
XGROUP DESTROY删除消费者组。
XPENDING显示待处理消息的相关信息。
XCLAIM 转移消息的归属权。
XINFO查看 Stream 流、消费者和消费者组的相关信息。
XINFO GROUPS查看消费者组的信息。
XINFO STREAM 查看 Stream 流信息。
XINFO CONSUMERS key group查看组内消费者流信息。

创建消息ID

当创建一个 Srteam 时, 需要创建消息 ID,该 ID 是唯一、不可重复的,并且只增不减。消息 ID 有两种创建方式,一是系统自动生成,二是自定义创建。

1) 系统自动创建

语法格式如下:

XADD key ID field value [field value ...]

参数说明如下:

  • key :指定队列名称,如果不存就创建;
  • ID :消息 id,我们使用*表示由 redis 生成,可以自定义,但是要自己保证递增性;
  • field value :消息记录。


返回值是毫秒时间戳格式的字符串。比如 1610619132674-2,它表示在该毫秒内产生的第 2 条消息。使用示例:

XADD mystream * username cc 10

2) 自定义ID

自定义 ID 比较简单,但是需要注意的是 ID 的形式必须是 “整数”,并且后面加入消息的 ID 必须大于前面消息的 ID,也就是自定义 ID 也必须遵守递增的规则。示例如下:

XADD mystream1 001 name zhangsan addr hebei

创建消费组

Redis Stream通过XGROUP CREATE指令创建消费组(Consumer Group),在创建时,需要传递起始消息的 ID 用来初始化 last_delivered_id 变量。语法格式如下:

<span style="color:#444444">XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]</span>

参数说明如下:

  • key :指定 Stream 队列名称,若不存在则自动创建。
  • groupname :自定义消费组的名称,不可重复。
  • $ :表示从尾部开始消费,只接受新消息,而当前 Stream 的消息则被忽略。

消费消息

Redis Stream 通过XREADGROUP命令使消费组消费信息,它和XREAD命令一样,都可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的 PLE(正在处理的消息)结构里,客户端处理完毕后使用 XACK 命令通知 Redis 服务器,本条消息已经处理完毕,该消息的 ID 就会从 PEL 中移除。示意图如下

redis stream

XREADGROUP命令的语法格式如下所示:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]  

参数说明如下:

  • group :消费组名称。
  • consumer :消费者名称。
  • count : 要读取的数量。
  • milliseconds : 阻塞时间,以毫秒为单位。
  • key :  键指定的队列名称。
  • ID : 表示消息 ID。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1242610.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023年跨界融合创新应用合作发展大会-核心PPT资料下载

一、峰会简介 本次大会主题为“创新地理信息价值 服务数字中国建设”。1天主论坛和6场专题论坛的报告&#xff0c;围绕主题深入探讨地理信息产业与相关重要应用领域的跨界融合和深化合作。 本届大会将搭建地理信息产业与旅游、林业、环保、气象、住建、水利、农业农村、电力等…

2023亚太杯数学建模竞赛C题新能源电动汽车数据分析与代码讲解

C题论文包括摘要、问题重述、问题分析、模型假设、符号说明、模型的建立和求解&#xff08;问题1模型的建立和求解、问题2模型的建立和求解、问题3模型的建立和求解、问题4模型的建立和求解、问题5模型的建立和求解&#xff09;、模型的评价等等&#xff0c; 视频讲解如下&…

MySQL-01-MySQL基础架构

1-MySQL逻辑结构 如果能在头脑中构建一幅MySQL各组件之间如何协同工作的架构图&#xff0c;有助于深入理解MySQL服务器。下图展示了MySQL的逻辑架构图。 MySQL逻辑架构整体分为三层&#xff0c;最上层为客户端层&#xff0c;并非MySQL所独有&#xff0c;诸如&#xff1a;连接处…

驯服大数据的超强利器——PySpark数据处理引擎

你是否曾经为了处理大规模数据而烦恼&#xff1f;是否曾经为了解决日常的数据科学挑战而彻夜难眠&#xff1f;现在&#xff0c;Spark数据处理引擎正在向你敞开大门。这是一个惊人的分析工厂&#xff0c;输入原始数据&#xff0c;输出洞察。 PySpark&#xff0c;作为Spark的核心…

一文掌握 Spring Boot 常用注解,保姆级整理,建议收藏!

亲兄弟篇&#xff1a; SpringBoot注解大全&#xff08;超详细&#xff09;_Maiko Star的博客-CSDN博客 一、SpringBoot常用注解 二、Bean处理注解 2.1 Resource 依赖注入&#xff0c;自动导入标注的对象到当前类中&#xff0c;比如我们的 Controller 类通常要导入 Service 类…

java中BigDecimal的介绍及使用(二)

系列文章目录 java中BigDecimal的介绍及使用&#xff0c;BigDecimal格式化&#xff0c;BigDecimal常见问题java中BigDecimal的介绍及使用(二) 文章目录 系列文章目录一、前言二、BigDecimal提供的方法2.1、stripTrailingZeros() 去除小数尾部所有的02.2、int signum()2.3、int…

JAVA爬虫2 - Jsoup解析、对接MySQL、多线程爬虫、json库使用

官网:https://jsoup.org/download Jsoup是一款基于Java的HTML解析器,它可以方便地从网页中抓取和解析数据。它的主要作用是帮助开 发者处理HTML文档,提取所需的数据或信息。下面介绍几个常用的API: 选择器(Selector)API:用于根据CSS选择器语法选择HTML元素。 属性(Attribute…

短视频变表情包gif怎么做?这一招最好用

Gif动态表情包是一种有效的表达感情的方式。可以通过添加图像、文字等更加直观的传递情感和信息。在各种聊天软件中gif动态表情包也是非常收欢迎的。当我们看到一段视频想要将其制作成gif动态表情包的时候要怎么操作呢&#xff1f;教大家使用在线制作gif&#xff08;https://ww…

编写自己的CA和TA与逆向

参考内容《手机安全和可信应用开发》 https://note.youdao.com/s/MTlG4c1w 介绍 TA的全称是Trust Application&#xff0c; 即可信任应用程序。 CA的全称是Client Applicant&#xff0c; 即客户端应用程序。 TA运行在OP-TEE的用户空间&#xff0c; CA运行在REE侧。 CA执行时代…

Faster R-CNN源码解析(三)

目录 todaytorch.meshgrid()函数 today 今天我们主要来捋一捋AnchorsGenerator这部分代码,对应在network_files文件夹中的rpn_function文件中&#xff0c;从RegionProposalNetwork()类的forward()函数开始看&#xff0c;首先会进入head部分也就是我们看到的RPNHead部分,也就是…

SVD 最小二乘法解 亲测ok!

线性最小二乘问题 m个方程求解n个未知数&#xff0c;有三种情况&#xff1a; mn且A为非奇异&#xff0c;则有唯一解&#xff0c;xA.inverse()*bm>n&#xff0c;约束的个数大于未知数的个数&#xff0c;称为超定问题&#xff08;overdetermined&#xff09;m<n&#xff0…

2023.11.22 IDEA Spring Boot 项目热部署

目录 引言 操作步骤 1. 在 pom.xml 中添加热部署框架支持 2. Setting 开启项目自动编译 3. 以后创建的新项目进行同步配置 4. 重复 配置 步骤2 的内容 5. 开启运行中的热部署 引言 Spring Boot 的热部署是一种在项目正在运行的时候修改代码&#xff0c;却不需要重新启动…

数字孪生农村供水工程平台:为乡村振兴注入新活力

随着科技的不断进步&#xff0c;数字孪生技术逐渐成为各行业创新发展的重要驱动力。在水利领域&#xff0c;数字孪生农村供水平台以其独特的优势&#xff0c;为农村供水系统带来了革命性的变革。本文将为您详细介绍数字孪生农村供水平台的核心特点及优势&#xff0c;带您领略智…

【软件测试】技术不好?不学这几招你怎么跳槽?

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、软件测试面试环…

解决DaemonSet没法调度到master节点的问题

最近在kubernetes部署一个springcloud微服务项目&#xff0c;到了最后一步部署边缘路由&#xff1a;使用nginx-ingress和traefik都可以&#xff0c;必须使用DaemonSet部署&#xff0c;但是发现三个节点&#xff0c;却总共只有两个pod。 换句话说&#xff0c; DaemonSet没法调度…

Google搜索广告图标详解

Google搜索广告图标是指在Google搜索结果页面中&#xff0c;广告结果前面显示的小图标。这些图标旨在帮助用户更容易地识别哪些结果是广告&#xff0c;并提供更直观的搜索体验。本文小编将对于Google搜索广告图标的类型、作用和设计原则进行介绍。 一、Google搜索广告图标的类型…

化学仿制药参比制剂目录-参比制剂查询网站

2015年以前&#xff0c;参比制剂对于仿制药的研究无关紧要&#xff0c;但推出了’仿制药一致性评价’后&#xff0c;参比制剂的选择成为了决定仿制药成功与否的关键因素&#xff0c;如今在进行仿制药研究时&#xff0c;首要任务就是确定仿制目标&#xff0c;也就是参比制剂。 …

PLC通过lora网关采集温室大棚温湿度数据

概述: 运用lora网关远程控制大棚内风机&#xff0c;日光灯&#xff0c;温湿度传感器等设备。可以实现远程获取现场环境的空气温湿度、土壤水分温度、二氧化碳浓度、光照强度可以自动控制温室湿帘风机、喷淋滴灌、加温补光等设备&#xff0c;并向远程计算机端推送实时数据&…

linux删除oracle数据库:如何在Linux系统中删除Oracle数据库

停止Oracle数据库服务&#xff1a;# su - oracle 1. 停止Oracle数据库服务&#xff1a; # su - oracle $ sqlplus / as sysdba SQL> SHUTDOWN IMMEDIATE; 2. 删除oracle安装目录&#xff1a; # rm -rf /u01/app/oracle 3. 删除oracle用户和组&#xff1a; # userdel…

关于前端上传

类似于 上面的传参form-data形式&#xff0c;第一个参数为上传的文件&#xff0c;第二个参数为json格式