【Redis】Redis 的消息队列 List、Streams—(六)

news2024/9/26 5:16:08

目录

    • 一、消息队列
    • 二、List 方案
    • 三、Streams 方案

在这里插入图片描述
在这里插入图片描述

一、消息队列

我们一般把消息队列中发送消息的组件称为生产者,把接收消息的组件称为消费者,下图是一个通用的消息队列的架构模型:
消息队列在存取消息时,必须要满足三个需求,分别是消息保序、处理重复的消息和保证消息可靠性。

  • (1)消息保序

虽然消费者是异步处理消息,但是,消费者仍然需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了。

  • (2)重复消息处理

消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。如果多次处理重复消息的话,就可能造成一个业务逻辑被多次执行,从而出现数据问题。

  • (3)消息可靠性保证

消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。

二、List 方案

List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,已经能满足消息 保序 的需求了。具体来说,生产者可以使用 LPUSH 命令把要发送的消息依次写入 List,而消费者则可以使用 RPOP 命令,从 List 的另一端按照消息的写入顺序,依次读取消息并进行处理。
在这里插入图片描述

List 并不会主动地通知消费者有新消息写入,如果消费者循环调用 RPOP 命令又会带来 CPU 开销问题。Redis 提供了 BRPOP 命令,称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用 RPOP 命令相比,这种方式能节省 CPU 开销。

在解决 重复消息处理 的问题上,一方面,消息队列要能给每一个消息提供全局唯一的 ID,另一方面,消费者程序要把已经处理过的消息的 ID 记录下来,如果已经处理过,消费者程序就不再进行处理了。这种处理特性也称为幂等性,指对于同一条消息,消费者收到一次或多次的处理结果是一致的。

不过,List 本身是不会为每个消息生成 ID,所以,消息的全局唯一 ID 需要生产者程序在发送消息前自行生成,并包含在消息中以供消费者处理。

为了 保证消息可靠性 ,List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存,这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。
在这里插入图片描述

三、Streams 方案

如果生产者消息发送很快,而消费者处理消息的速度比较慢,会导致 List 中的消息越积越多,给 Redis 的内存带来很大压力,而 List 并不支持多个消费者同时处理。这时候就要用到 Redis 从 5.0 版本开始提供的 Streams 数据类型了。Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令:

  • XADD:插入消息,保证有序,可以自动生成全局唯一 ID
  • XREAD:用于读取消息,可以按 ID 读取数据
  • XREADGROUP:按消费组形式读取消息
  • XPENDING:命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息
  • XACK:命令用于向消息队列确认消息处理已完成
  • XADD 命令插入新消息的格式是键 - 值对形式,例如往名称为 mqstream 的消息队

列中插入一条消息:

XADD mqstream * repo 5
"1599203861727-0"

其中,* 表示让 Redis 为插入的数据自动生成一个全局唯一的 ID,例如“1599203861727-0”。也可以不用 *,直接在消息队列名称后自行设定一个 ID,只要保证全局唯一就行。

自动生成的 ID 由两部分组成,第一部分“1599203861727”是数据插入时,以毫秒为单位计算的当前服务器时间,第二部分表示插入消息在当前毫秒内的消息序号,从 0 开始。例如,“1599203861727-0”就表示在“1599203861727”毫秒内的第 1 条消息。

XREAD 在读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取。设定 block 配置项,可实现类似于 BRPOP 的阻塞读取操作,单位是毫秒。例如,从 ID 为 1599203861727-0 的消息开始,读取后续的所有消息(共 3 条)

XREAD BLOCK 100 STREAMS  mqstream 1599203861727-0
1) 1) "mqstream"
   2) 1) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"
            
      2) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"
            
      3) 1) "1599274927910-0"
         2) 1) "repo"
            2) "1"

再看一个例子,命令以 $ 结尾表示读取最新的消息,同时设置了 block 10000 的配置项,表明 XREAD 在读取最新消息时,如果没有消息到来将阻塞 10000 毫秒(即 10 秒),然后再返回。当消息队列 mqstream 中一直没有消息时,XREAD 在 10 秒后返回空值(nil)

XREAD block 10000 streams mqstream $
(nil)
(10.00s)

XGROUP 创建消费组,是区别于 List 的功能,创建后 Streams 可以使用 XREADGROUP 命令让消费组内的消费者读取消息。

例如,我们执行下面的命令,创建一个名为 group1 的消费组,这个消费组消费的消息队列是 mqstream

XGROUP create mqstream group1 0
OK

执行命令,让 group1 消费组里的消费者 consumer1 从 mqstream 中读取所有消息,命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。

在 consumer1 读取消息前,group1 中没有其他消费者读取过消息,所以,consumer1 就得到 mqstream 消息队列中的所有消息共4条。

XREADGROUP group group1 consumer1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599203861727-0"
         2) 1) "repo"
            2) "5"
            
      2) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"
            
      3) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"
            
      4) 1) "1599274927910-0"
         2) 1) "repo"
            2) "1"

如果队列中的消息已经被其他消费者读取,则其他消费者无法读取,例如,再让 group1 内的 consumer2 读取消息时,返回空值。

XREADGROUP group group1 consumer2  streams mqstream 0
1) 1) "mqstream"
   2) (empty list or set)

消费组的目的是让组内的多个消费者共同分担读取,从而实现负载均衡,例如,让 group2 中的 consumer1、2、3 各自读取一条消息

XREADGROUP group group2 consumer1 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599203861727-0"
         2) 1) "repo"
            2) "5"
            
XREADGROUP group group2 consumer2 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"
            
XREADGROUP group group2 consumer3 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"

为了保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息,Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。

例如,查看一下 group2 中各个消费者已读取、但尚未确认的消息个数。其中,XPENDING 返回结果的第二、三行分别表示 group2 中所有消费者读取的消息最小 ID 和最大 ID。

XPENDING mqstream group2
1) (integer) 3
2) "1599203861727-0"
3) "1599274925823-0"
4) 1) 1) "consumer1"
      2) "1"
   2) 1) "consumer2"
      2) "1"
   3) 1) "consumer3"
      2) "1"

如果需要进一步查看某个消费者具体读取了哪些数据,可以执行以下命令,consumer2 已读取的消息的 ID 是 1599274912765-0

XPENDING mqstream group2 - + 10 consumer2
1) 1) "1599274912765-0"
   2) "consumer2"
   3) (integer) 513336
   4) (integer) 1

当 1599274912765-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除。当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。

 XACK mqstream group2 1599274912765-0
(integer) 1
XPENDING mqstream group2 - + 10 consumer2
(empty list or set)

一张表格,汇总了用 List 和 Streams 实现消息队列的特点和区别
在这里插入图片描述

Redis 是一个非常轻量级的键值数据库,Kafka、RabbitMQ 是专门面向消息队列场景的重量级软件,例如 Kafka 的运行就需要再部署 ZooKeeper。

如果分布式系统中的组件消息通信量不大,那么,Redis 只需要使用有限的内存空间就能满足消息存储的需求,而且,Redis 的高性能特性能支持快速的消息读写,不失为消息队列的一个好的解决方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2089534.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于yolov10的PCB检测算法研究

内容:项目将YOLOV10创新后的PCB检测算法成功部署到GD32H757上,实现PCB缺陷的工业产线实时检测。 项目主要支持开源代码:HomiKetalys/gd32ai-modelzoo: Provide deployable deep learning models on gd32 (github.com) (想了解将…

群晖7.2.1 半洗白后安装AME

1. 群晖打开SSH 2. Xshell登录群晖 用管理员账户登录,然后使用sudo -i 获取root权限,sudo -i是要再次验证管理员密码 sudo -iSA6400还需要运行这个命令 /usr/syno/etc/rc.sysv/apparmor.sh stop #DSM7.2 AME版本3.1.0-3005强制解锁激活命令 curl -sk…

最常用集合 - ArrayList详解

ArrayList介绍 ArrayList实现了List接口,是顺序容器,即元素存放的数据与放进去的顺序相同,允许放入null元素,底层通过数组实现。除该类未实现同步外,其余跟Vector大致相同。每个ArrayList都有一个容量(capacity)&…

MeshAnything V2来了!30秒生成建模师级Mesh!最大可生成面数提升至1600.

GitHub已揽星1.9k的MeshAnything项目上新了V2版本,由来自南洋理工大学、清华大学、帝国理工学院、西湖大学等研究人员完成。 MeshAnything V2相比V1,使用了最新提出的Adjacent Mesh Tokenization(AMT)算法,将最大可生…

mysql学习教程,从入门到精通,MySQL介绍(1)

1、MySQL 教程 本教程是为初学者准备的,以帮助他们理解与MySQL语言相关的基础知识和高级概念。 mysql MySQL 是最流行的关系型数据库管理系统,在 WEB 应用方面 MySQL 是最好的 RDBMS(Relational Database Management System:关系数据库管理系…

浏览器的高级搜索

一、背景 通常我们在浏览器搜索内容都是直接在输入框输入我们想要查询的内容,但是这样搜索出来的内容关联性不是很高,很多内容都是与我们搜索内容无关,会浪费我们大量的时间去查找内容。比如:我想要搜索网页中包含《游戏科学》这4…

kafak集群搭建-基于kRaft方式

kafak集群搭建-基于kRaft方式 1、服务器规划2、kafka集群部署配置2.1、解压三个kafka2.2、配置/config/kraft/server.properties 3、启动kafka集群4、SpringBoot集成kafka的kRaft集群4.1、消费者4.2、生产者4.3、配置类4.4、实体类4.5、JSON工具类4.6、项目配置文件4.7、测试类…

【web开发】Spring Boot 快速搭建Web项目

Date: 2024.08.30 13:52:20 author: lijianzhan 简述:【Spring Boot 快速搭建Web项目应用】是一篇关于Java Web项目构建的文章,主要讲解了如何借助Maven工具来管理和构建Web应用程序。Maven是Java开发中广泛使用的自动化构建工具,能够帮助开…

顺序循环队列

顺序循环队列 队头插入元素,队尾删除元素 本来应该判空和判断是否存满的条件都是:队头 队尾,但这样就没办法区分了,所以,就牺牲一个空间(比如长度为10,但只能存9个),这…

基层医疗云HIS系统源码:云计算、大数据等现代信息技术研发

云HIS源码,基层云HIS系统源码,基层医疗云HIS系统 利用云计算、大数据等现代信息技术研发的基层医疗云HIS系统实现了医院信息化从局域网向互联网转型,重新定义医疗卫生信息化建设的理念、构架、功能和运维体系。实现了医院信息化由局域网向互…

CAN协议通信 学习笔记

文章目录 1.CAN通信简介2.物理层2.1 CAN总线的电气特性2.2 CAN的位同步机制(了解,用于理解CAN的初始化参数的配置原理)硬同步方式重新同步方式 2.3 CAN对比其他常用协议的优势 3. 数据链路层3.1 CAN协议的数据帧3.2 仲裁机制3.3 访问控制3.4 …

python-FastApi框架

文章目录 FastApi一. 简介二. 特性三. 安装1. 安装fastapi模块2. 安装ASGI服务器( Uvicorn 或者 Hypercorn) 四. 实例1. 创建**main.py**文件(GET请求)2. 运行3. 测试4. 更新main_py(加入PUT请求) 五. 自动化API文档1. Swagger UI(交互式文档)2. ReDoc(可选式文档) FastApi 一…

华为云征文|Flexus云服务器X,云上性能新飞跃,开启业务增长新纪元

🏆作者简介,黑夜开发者,CSDN领军人物,全栈领域优质创作者✌,CSDN博客专家,阿里云社区专家博主,2023年6月CSDN上海赛道top4。 🏆数年电商行业从业经验,AWS/阿里云资深使用…

想告诉所有人,我找到脸书视频保存方法啦!

各位集美集帅们,我可算找到脸书视频的保存教学啦。作为社媒体人,在脸书看到有趣的素材却保存不了时真的要急的爆炸了。试了好多方式,这软件是最给力哒,我不管,下面的步骤介绍你一定要看完! 打开脸书&#x…

JVM面试(一)什么是虚拟机?什么是class文件?

什么是java虚拟机? 如果通俗点来讲,我们在电脑上一行行敲出来的代码,电脑本身是不认识的,最终是要转成电脑可以运行的101001这种字节。 但是这些我们又不可能手动来转换,所以呢,就需要一个工具&#xff0…

关于redis存储数据类型选择

项目使用的spring-boot,操作redis使用的是spring redis的api 在序列化的时候,如果往redis存入的是比较小的数字,反序列化的时候,会是integer类型 如果字段定义的是Long类型,因为比较小,所以被反序列化成i…

Cadence高速板设计技巧(全志H3)

市场上一般的电视屏幕是4K的: cadence查找: 右侧的面板FIND里面输入要查找的名字就可以进行查找。 全局查找需要鼠标点击到.DSN的,进入全局。 在视图里选择一个层就可以单独查看这个层的东西,屏蔽掉其他层的东西: 共…

linux命令:用于删除空目录的命令行工具rmdir详细介绍

目录 一、概述 二、用法 1、基本语法 (1)选项 (2)目录... 2、主要选项 (1)-p, --parents (2) -v, --verbose (3) -h, --help (4&#x…

Mysql基础练习题 596.查询至少有5个学生的所有班级 (力扣)

596.查询至少有5个学生的所有班级 建表插入数据: Create table If Not Exists Courses (student varchar(255), class varchar(255)) Truncate table Courses insert into Courses (student, class) values (A, Math) insert into Courses (student, class) value…

指针进阶(多级指针)

0.多级指针命名 多级指针命名,最主要的是要知道该指针指向的是什么数据。 一.1级指针 - 指向一个变量 若定义一个变量 int a,那么 目标类型就为 int。 所以该指针应该定义为 int *p; /* 目标 */ int a;/* 目标类型 *p */ int *p;/* 指向目标 */ p a;二…