Redis队列Stream、Redis多线程详解(一)

news2025/1/20 15:41:10

Redis队列与Stream

Redis5.0 最大的新特性就是多出了一个数据结构 Stream,它是一个新的强大的支持多播的可持久化的消息队列,作者声明Redis Stream地借鉴了 Kafka 的设计。

Redis Stream 的结构如上图所示,每一个Stream都有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的,Redis 重启后,内容还在。

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用xadd指令追加消息时自动创建。

每个 Stream 都可以挂多个消费组,每个消费组会有个游标last_delivered_id在 Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从 Stream 的某个消息 ID 开始消费,这个 ID 用来初始化last_delivered_id变量。

每个消费组 (Consumer Group) 的状态都是独立的,相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到。

同一个消费组 (Consumer Group) 可以挂接多个消费者 (Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id往前移动。每个消费者有一个组内唯一名称。

消费者 (Consumer) 内部会有个状态变量pending_ids,它记录了当前已经被客户端读取,但是还没有 ack的消息。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为PEL,也就是Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。

消息 ID 的形式是timestampInMillis-sequence,例如1527846880572-5,它表示当前的消息在毫米时间戳1527846880572时产生,并且是该毫秒内产生的第 5 条消息。消息 ID 可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的 ID 要大于前面的消息 ID。

消息内容就是键值对,形如 hash 结构的键值对,这没什么特别之处。

常用操作命令

生产端

xadd 追加消息

xdel 删除消息,这里的删除仅仅是设置了标志位,不会实际删除消息。

xrange 获取消息列表,会自动过滤已经删除的消息

xlen 消息长度

del 删除 Stream

xadd streamtest * name mark age 18

 

streamtest 表示当前这个队列的名字,也就是我们一般意义上Redis中的key,* 号表示服务器自动生成 ID,后面顺序跟着“name mark age 18”,是我们存入当前streamtest 这个队列的消息,采用的也是 key/value的存储形式

返回值1626705954593-0 则是生成的消息 ID,由两部分组成:时间戳-序号。时间戳时毫秒级单位,是生成消息的Redis服务器时间,它是个64位整型。序号是在这个毫秒时间点内的消息序号。它也是个64位整型。

为了保证消息是有序的,因此Redis生成的ID是单调递增有序的。由于ID中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis的每个Stream类型数据都维护一个latest_generated_id属性,用于记录最后一个消息的ID。若发现当前时间戳退后(小于latest_generated_id所记录的),则采用时间戳不变而序号递增的方案来作为新消息ID(这也是序号为什么使用int64的原因,保证有足够多的的序号),从而保证ID的单调递增性质。

如果不是非常特别的需求,强烈建议使用Redis的方案生成消息ID,因为这种时间戳+序号的单调递增的ID方案,几乎可以满足全部的需求,但ID是支持自定义的。

 

xrange streamtest - +

其中-表示最小值 , + 表示最大值

 或者我们可以指定消息 ID 的列表:

 xdel streamtest 1626706380924-0

xlen streamtest

 

del streamtest 删除整个 Stream

 消费端

单消费者

虽然Stream中有消费者组的概念,但是可以在不定义消费组的情况下进行 Stream 消息的独立消费,当 Stream 没有新消息时,甚至可以阻塞等待。Redis 设计了一个单独的消费指令xread,可以将 Stream 当成普通的消息队列 (list) 来使用。使用 xread 时,我们可以完全忽略消费组 (Consumer Group) 的存在,就好比 Stream 就是一个普通的列表 (list)。

xread count 1 streams stream2 0-0

“count 1”表示从 Stream 读取1条消息,缺省当然是头部,“streams”可以理解为Redis关键字,“stream2”指明了要读取的队列名称,“0-0”指从头开始

 

xread count 2 streams stream2 1626710882927-0

也可以指定从streams的消息Id开始(不包括命令中的消息id)

 

xread count 1 streams stream2 $

$代表从尾部读取,上面的意思就是从尾部读取最新的一条消息,此时默认不返回任何消息

 

所以最好以阻塞的方式读取尾部最新的一条消息,直到新的消息的到来

xread block 0 count 1 streams stream2 $

block后面的数字代表阻塞时间,单位毫秒

 此时我们新开一个客户端,往stream2中写入一条消息

 可以看到阻塞解除了,返回了新的消息内容,而且还显示了一个等待时间,这里我们等待了127.87s

 一般来说客户端如果想要使用 xread 进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息 ID。下次继续调用 xread 时,将上次返回的最后一个消息 ID 作为参数传递进去,就可以继续消费后续的消息。

消费组

创建消费组

Stream 通过xgroup create指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化last_delivered_id变量。

xgroup create stream2 cg1 0-0

“stream2”指明了要读取的队列名称,“cg1”表示消费组的名称,“0-0”表示从头开始消费

 

xgroup create stream2 cg2 $

$ 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略

 

现在我们可以用xinfo命令来看看stream2的情况:

xinfo stream stream2

 xinfo groups stream2

 消息消费

有了消费组,自然还需要消费者,Stream 提供了 xreadgroup 指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息 ID。

它同 xread 一样,也可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的PEL(正在处理的消息) 结构里,客户端处理完毕后使用 xack 指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除。

xreadgroup GROUP cg1 c1 count 1 streams stream2 >

“GROUP”属于关键字,“cg1”是消费组名称,“c1”是消费者名称,“count 1”指明了消费数量,> 号表示从当前消费组的 last_delivered_id 后面开始读,每当消费者读取一条消息,last_delivered_id 变量就会前进

 

前面我们定义cg1的时候是从头开始消费的,自然就获得Stream2中第一条消息

再执行一次上面的命令

 

自然就读取到了下条消息。

我们将Stream2中的消息读取完

xreadgroup GROUP cg1 c1 count 2 streams stream2 >

很自然就没有消息可读了, xreadgroup GROUP cg1 c1 count 1 streams stream2 >

 

然后设置阻塞等待

xreadgroup GROUP cg1 c1 block 0 count 1 streams stream2 >

 

我们新开一个客户端,发送消息到stream2

xadd stream2 * name lison score 98

 回到原来的客户端,发现阻塞解除,收到新消息

 

我们来观察一下观察消费组状态

 

如果同一个消费组有多个消费者,我们还可以通过 xinfo consumers 指令观察每个消费者的状态

xinfo consumers stream2 cg1

可以看到目前c1这个消费者有 5 条待ACK的消息,空闲了441340 ms 没有读取消息。

如果我们确认一条消息

xack stream2 cg1 1626751586744-0

就可以看到待确认消息变成了4条

 xack允许带多个消息id,比如

 

同时Stream还提供了命令XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息,每个Pending的消息有4个属性:

消息ID

所属消费者

IDLE,已读取时长

delivery counter,消息被读取次数

命令XCLAIM用以进行消息转移的操作,将某个消息转移到自己的Pending列表中。需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。

更多的Redis的Stream命令请大家参考Redis官方文档:

https://redis.io/topics/streams-intro

Commands | Redis

同时Redis文档中,在每个命令的详情页右边会显示“Related commands”,可以通过这个列表快速了解相关的命令和进入具体命令的详情页。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/414559.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Pandas 学习手册中文第二版:1~5

原文:Learning pandas 协议:CC BY-NC-SA 4.0 译者:飞龙 一、Pandas 与数据分析 欢迎来到《Pandas 学习手册》! 在本书中,我们将进行一次探索我们学习 Pandas 的旅程,这是一种用于 Python 编程语言的开源数…

Android:启动流程

Android启动流程 第一步:启动电源以及系统启动 当电源按下,引导芯片代码开始从预定义的地方(固化在ROM)开始执行。加载引导程序到RAM,然后 执行 第二步:引导程序 引导程序是在Android操作系统开始运行前的一个小程序。引导程序…

如何防止设备被重复控制

1. 引言 在一个物联网的系统中,主要有三部分组成:云端、WiFi、电控。当用户在APP上控制设备时,其控制下发链路是:云端>>WIFI>> 电控。当电控收到控制指令后,执行设备控制,控制成功后&#xff…

如何使用Midjourney辅助建筑平面设计,常用的建筑平面效果图提示和使用效果展示(内附Midjourney提示词网站)

文章目录一、室内建筑平面设计1.AutoCAD图纸(别墅首层图)2.平面效果图3.三维平面透视图二、建筑室内设计1.现代简约2.波西米亚风格3.工业风格4.沿海风格5.法国风格6.现代风格7.提示增加颜色倾向8.提示中增加设计师9.其它一些尝试三、好用的Midjourney提示…

Redis 6.x哨兵模式部署(五)

目录 一、主从复架构搭建 二、哨兵模式搭建 2.1背景 2.2哨兵模式介绍 2.3 Sentinel三大工作任务 1监控(Monitoring) 2提醒(Notification) 3自动故障迁移(Automatic failover) 4核心流程 2.4 安装…

企业如何实现数字化转型?

企业如何实现数字化转型? 首先,我需要先跟各位明确,企业数字化转型中很重要的3个“先行”条件: 第一、企业一把手的眼光和格局 一把手的视野、格局、定力是最重要的因素,没有之一。能不能放下自己过去的执念与经验&a…

信息与计算科学有哪些SCI期刊推荐? - 易智编译EaseEditing

以下是信息与计算科学领域的一些知名SCI期刊推荐: Information Sciences: 该期刊是信息科学领域的重要期刊,涵盖了信息科学、计算科学、人工智能、数据挖掘、模式识别、多媒体技术、网络通信、智能系统等方面的研究。 IEEE Transactions on…

电子文件的线上存储工具,你了解多少?

信息化时代的来临,企业也纷纷跟随时代步伐进入现代化办公。信息时代最显著的特征就是纸质文件到电子文件的转变。企业一天的办公中,可能就会产出无数的电子文件,其中很多文件都是珍贵的业务经验,因此线上存储是企业需要考虑的问题…

网页解析--bs4--01

python爬虫之bs4模块(超详细) Beautiful Soup 4.4.0 文档 — Beautiful Soup 4.2.0 documentation (crummy.com) 可以看到bs4库将网页文件变成了一个soup的类型, 事实上,bs4库 是解析、遍历、维护、“标签树“的功能库。 通俗一点…

redis基础总结-常用命令

redis常用指令3. 常用指令3.1 key 操作分析3.1.1 key应该设计哪些操作?3.1.2 key 基本操作3.1.3 key 扩展操作(时效性控制)3.1.4 key 扩展操作(查询模式)3.2 数据库指令3.2.1 key 的重复问题3.2.2 解决方案3.2.3 数据库…

001:Mapbox GL加载基础的地图

第001个 点击查看专栏目录 本示例的目的是介绍演示如何在vue+mapbox中加载最基础的 Mapbox GL地图 。 直接复制下面的 vue+mapbox源代码,操作2分钟即可运行实现效果 文章目录 示例效果配置方式示例源代码(共59行)相关API参考:专栏目标示例效果 配置方式 1)查看基础设置…

无限制翻译软件-中英互译字数无限

翻译软件是我们工作及学习中必不可少的工具,然而许多翻译软件在使用时常常会出现字数限制的问题,这使得用户在处理长文本和大量文本时变得十分麻烦。如果你也遇到了类似的问题,那么哪个翻译软件不限制字数将为您带来全新的翻译体验。 以下是我们的哪个翻…

人人都是ChatGPT prompt 工程师

关于 Prompt ​ 解释这个词之前,首先需要解释 prompt 这个词: 简单的理解它是给 AI 模型的指令。 它可以是一个问题、一段文字描述,甚至可以是带有一堆参数的文字描述。AI 模型会基于 prompt 所提供的信息,生成对应的文本&…

Spark SQL join操作详解

一、 数据准备 本文主要介绍 Spark SQL 的多表连接,需要预先准备测试数据。分别创建员工和部门的 Datafame,并注册为临时视图,代码如下: val spark SparkSession.builder().appName("aggregations").master("lo…

腾讯云服务器CVM标准型S5和S6区别性能评测

腾讯云服务器CVM标准型S5是次新一代云服务器规格,标准型S6是最新一代的云服务器,S6实例的CPU处理器主频性能要高于S5实例,同CPU内存配置下的标准型S6实例要比S5实例性能更好一些,但是目前标准型S5实例活动较多,云服务器…

【ChatGPT】预训练模型微调及其应用(ChatGLM-6B、duckduckgo_search、GPT在科研的应用等)

note instructGPT(基于提示学习的系列模型)——>GPT3.5(大规模预训练语言模型)——>ChatGPT模型(高质量数据标注反馈学习)。chatGPT三大技术:情景学习、思维链、自然指令学习。GPT4飞跃式…

ASEMI代理AD9833BRMZ-REEL原装ADI车规级AD9833BRMZ-REEL

编辑:ll ASEMI代理AD9833BRMZ-REEL原装ADI车规级AD9833BRMZ-REEL 型号:AD9833BRMZ-REEL 品牌:ADI/亚德诺 封装:MSOP-10 批号:2023 引脚数量:10 安装类型:表面贴装型 AD9833BRMZ-REEL汽车…

【都2023年了,还在问网络安全怎么入门】

前言 【都2023年了,还在问网络安全怎么入门】所以这一期就出一一个怎么学习网络安全的学习路线和方法,觉得有用的话点赞收藏下 首先咱们聊聊,学习网络安全方向通常会有哪些问题 1、打基础时间太长 学基础花费很长时间,光语言都有…

数据库系统概论--第二章课后习题

1.试述关系模型的三个组成部分。 答: 关系模型由关系数据结构、(关系操作集合)和(关系完整性约束)三部分组成。 2. 简述关系数据语言的特点和分类。 答:特点:1)集合操作方式; 2)高度非过程化; 3)集查询、DDL、DML、…

【C++STL精讲】string类的基本使用与常用接口

文章目录💐专栏导读💐文章导读🌷为什么要学习string类?🌷string类的基本使用🌷string类的常用接口🌺数据访问函数🌺容量相关函数🌺操作函数🌷迭代器与范围for…