Redis实战(5)——Redis实现消息队列

news2025/1/27 13:07:10

消息队列,顾名思义,就是一个存放消息的队列。最简单的消息队列包含3个角色

  • 生产者:将消息存入队列中
  • 队列:存放和管理消息
  • 消费者: 将消息从队列中取出来并做业务处理
    在这里插入图片描述
    R e d i s 提供了三种实现消息队列的方式,基于 L i s t 结构、 P u b S u b 、 S t r e a m 结构 \textcolor{red}{Redis 提供了三种实现消息队列的方式,基于List结构、PubSub、Stream结构} Redis提供了三种实现消息队列的方式,基于List结构、PubSubStream结构

1 基于List 结构实现消息队列

Redis的List是一个双向列表。可以从两端存入数据或者取出数据。

  • LPUSH/RPUSH key element 【elements】
  • BLPOP/RPOP key timeout

利用list 结构实现的消息队列主要是依据阻塞取指令 BLPOP/RPOP 来模拟消费者监听队列,直到队列中有消失时获得该数据
优点: 实现简单,且可以持久化
缺点: 只能有一个消费者来消费数据,且只能消费一次,无法避免消息的丢失

2 基于PubSub(发布/订阅)

PubSub 是一个基于点对点的消息模型,消费者可以订阅一个或者多个chanel,当生产者向队列发送了消息时,消费者只要订阅了频道就可以收到并处理消息
在这里插入图片描述

  • PUBLISH channel message 将信息 message 发送到指定的频道 channel
  • SUBSCRIBE channel [channel …] 订阅一个或多个频道
  • PSUBSCRIBE pattren 订阅与通配符匹配的chanel

在使用PSUBSCRIBE pattren 时,支持多种通配符
1 \textcolor{blue}{1} 1 ?:匹配一个字符
2 \textcolor{blue}{2} 2 * :匹配零个字符或多个字符
3 \textcolor{blue}{3} 3 [] :选择匹配,匹配[]中定义的字符 如hell[ae]o 可以匹配 hello 和 hellao

使用PubSub 实现的消费队列时,支持 多生产、多消费 \textcolor{red}{多生产、多消费} 多生产、多消费的模式,不过PubSub不支持数据的持久化,相较于List,它本身就不是一个数据结构无法利用Redis持久化数据。并且无法避免消息的丢失,如生产者向无人订阅的频道发消息时,数据会丢失。另外还会出现由于消费者的缓存空间有效,超时缓存上限时,将会出现消息的丢失。由于这些缺点,redis的PUBSUB模式,无法满足对可靠性要求较高的服务。

3 基于Stream 数据结构

Stream 是redis5.0 及之后针对消息队列场景设计的 数据结构 \textcolor{red}{数据结构} 数据结构,因此数据的安全性得到了保障,因为可以持久化。相较于List 数据结构实现的消息队列的方式,有更多针对消息队列的单独命令,可以实现一个功能更加完善的消息队列

发送消息

  • XADD k e y \textcolor{red}{key} key [ N O M K S T R E A M ] \textcolor{blue}{[NOMKSTREAM] } [NOMKSTREAM] [ < M A X L E N ∣ M I N I D > [ = ∣   ] t h r e s h o l d [ L I M I T c o u n t ] ] \textcolor{green}{[<MAXLEN | MINID> [= | ~] threshold [LIMIT count]]} [<MAXLENMINID>[= ]threshold[LIMITcount]] < ∗ ∣ i d > \textcolor{orange}{<* | id>} <id> F i e l d v a l u e [ F i e l d v a l u e . . . ] \textcolor{purple}{Field value [Field value ...]} Fieldvalue[Fieldvalue...]

参数说明
K e y \textcolor{red}{Key} Key : 存储消息的队列的名字
[ N O M K S T R E A M ] \textcolor{blue}{[NOMKSTREAM] } [NOMKSTREAM] :可选参数,是否在队列不存在时,创建队列。默认是创建的
[ < M A X L E N ∣ M I N I D > [ = ∣   ] t h r e s h o l d [ L I M I T c o u n t ] ] \textcolor{green}{[<MAXLEN | MINID> [= | ~] threshold [LIMIT count]]} [<MAXLENMINID>[= ]threshold[LIMITcount]] :可选参数,设置消息队列的最大消息数,默认是设上限的
< ∗ ∣ i d > \textcolor{orange}{<* | id>} <id> :消息的唯一id,* 表示有redis自动生成,格式是时间戳_递增值 如 1526919030474-0。Id值也可以自定义。
F i e l d v a l u e [ F i e l d v a l u e . . . ] \textcolor{purple}{Field value [Field value ...]} Fieldvalue[Fieldvalue...] 消息体

读取消息

  • XREAD [ C O U N T c o u n t ] \textcolor{red}{[COUNT count] } [COUNTcount] [ B L O C K m i l l i s e c o n d s ] \textcolor{blue}{[BLOCK milliseconds]} [BLOCKmilliseconds] S T R E A M S k e y [ k e y . . . ] \textcolor{green}{STREAMS key [key ...] } STREAMSkey[key...] i d [ i d . . . ] \textcolor{orange}{id [id ...]} id[id...]

参数说明
[ C O U N T c o u n t ] \textcolor{red}{[COUNT count] } [COUNTcount] 可选参数, 指定读取消息的条数
[ B L O C K m i l l i s e c o n d s ] \textcolor{blue}{[BLOCK milliseconds]} [BLOCKmilliseconds] 当没有消息时,读取队列消息的阻塞时长,当设置为0时,永久等待,直到读取到队列中消息
S T R E A M S k e y [ k e y . . . ] \textcolor{green}{STREAMS key [key ...] } STREAMSkey[key...] 需要读取的队列的key名字,可以从多个队列中读取数据
i d [ i d . . . ] \textcolor{orange}{id [id ...]} id[id...] 读取消息的起始Id 。有两个特殊的id,0 表示从第一个消息读起,$ 表示读取最新的一条消息

在读取消息时,可以通过while(true) 循环 调用xread block 0 streams key $ 去永久的监听队列去获得消息。不过这种模式下会出现一个问题,在获得消息并处理消息这个时间间隙中,可能生产者又往队列中增加了好几条消息,由于Id 为$ 只会读取最新的一条消息,那么可能会出现消息的漏读。这里可以采用基于消费者组去读取消息

3.1 基于消费者组去消费消息

可以将多个消费者划分到一个组中,其中每个组消费消息时都会维护一个最后消费消息的标识 L a s t d e l i v e r e d i d \textcolor{red}{Last delivered id} Lastdeliveredid,当宕机重启后,直接从该标识id之后的消息消费。意味者不会重复消费消息。
在消费者组中还维护了一个 Pending_ids集合,该集合中存放了未确认【ACK】消费数据的消息Id,
机器出现宕机后重启,可继续确认未处理的消息。可以通过 X A C K \textcolor{red}{XACK} XACK来确认客户端确认已经消费了消息,之后从Pending_ids集合中移除。

基于消费者组消费消息时,最大程度的保证了消息的安全消费、不重复消费。
在这里插入图片描述

创建消费者组
XGROUP C R E A T E \textcolor{red}{CREATE} CREATE K E Y \textcolor{green}{KEY } KEY G R O U P N A M E \textcolor{blue}{GROUPNAME } GROUPNAME I D \textcolor{orange}{ID} ID [ M K S T R E A M ] \textcolor{purple}{ [MKSTREAM]} [MKSTREAM]

C R E A T E \textcolor{red}{CREATE} CREATE :创建组
K E Y \textcolor{green}{KEY } KEY :基于哪个队列去创建组
G R O U P N A M E \textcolor{blue}{GROUPNAME } GROUPNAME :创建的消费者组名称
I D \textcolor{orange}{ID} ID 消息的标识id。0 从头消费 $ 消费最新的消息
[ M K S T R E A M ] \textcolor{purple}{ [MKSTREAM]} [MKSTREAM] : 可选参数,当队列不存在时,是否创建队列,默认是创建

从消费者组中消费消息

XGROUPREAD GROUP g r o u p \textcolor{red}{group } group c o n s u m e r \textcolor{green}{consumer } consumer [ C O U N T c o u n t ] \textcolor{blue}{[COUNT count] } [COUNTcount] [ B l o c k m i l l i s e c o n d s ] \textcolor{orange}{[Block milliseconds] } [Blockmilliseconds] [ N O A C K ] \textcolor{purple}{ [NOACK] } [NOACK] S T R E A M S K E Y [ k e y . . . ] \textcolor{red}{STREAMS KEY [key ...]} STREAMSKEY[key...] I D [ I D . . . . . ] \textcolor{green}{ ID [ID.....] } ID[ID.....]

g r o u p \textcolor{red}{group } group : 组的名字,定义从哪个消费者组消费消息
c o n s u m e r \textcolor{green}{consumer } consumer :消费者名字,如果不存在,自动创建
[ C O U N T c o u n t ] \textcolor{blue}{[COUNT count] } [COUNTcount] :消费数量
[ B l o c k m i l l i s e c o n d s ] \textcolor{orange}{[Block milliseconds] } [Blockmilliseconds] :可选参数,阻塞时长【单位ms】,不设置时为非阻塞消费。
[ N O A C K ] \textcolor{purple}{ [NOACK] } [NOACK] :可选参数,是否自动确认。true时消息不会进入pending_ids[] 集合中,可能会有未消费的消息。所以为了安全性,无需设置。
S T R E A M S K E Y [ k e y . . . ] \textcolor{red}{STREAMS KEY [key ...]} STREAMSKEY[key...] : 监听的队列的名字
I D [ I D . . . . . ] \textcolor{green}{ ID [ID.....] } ID[ID.....] :获得消息的起始ID 。
设置成 ">" :从下一个未消费的消息开始消费。
设置成其他:均是从pending-list中获得已消费但是未确认的消息,如0 ,从pending-list中第一个消息开始。
根据实际情况可设置不同的ID 去消费消息。正常读取设置> 异常读取未确认的消息

确认消息
XACK k e y \textcolor{red}{key } key g r o u p \textcolor{green}{group } group I D [ I D . . . ] \textcolor{blue}{ ID [ID...] } ID[ID...]

k e y \textcolor{red}{key } key :队列名称
g r o u p \textcolor{green}{group } group :组名称
I D [ I D . . . ] \textcolor{blue}{ ID [ID...] } ID[ID...] :待确认的消息Id

3.2 数据测试

(1) 向order队列中添加4条消息

xadd order * voucherId 9 userId 150 orderId 79297921056506055
xadd order * voucherId 9 userId 129 orderId 79297921056506083
xadd order * voucherId 9 userId 111 orderId 79297921056506108
xadd order * voucherId 9 userId 111 orderId 79297921056506101

在这里插入图片描述
(2) 向order队列创建消费者组group_1

## 消费者组从头开始消费数据
XGROUP CREATE order  group_1 0 

(3) 从消费者组中消费消息

## 消费最新的未消费的消息,采用阻塞式获取,最长等待2000ms
XREADGROUP GROUP group_1 consumer_1 COUNT 1 BLOCK 2000 STRAEAMS order >

在这里插入图片描述
第五次消费时,阻塞等待后返回空。队列中的消息全部消费,此时都处于为确认状态,全部存入了penging-list中。
此时需要手动确认这些消息确实已经被成功的消费了,需要手动确认将其从pending-list 集合中移除

(4) 手动确认已经消费的消息

 XACK order group_1 1691146911471-0  1691148054821-0 1691148657217-0  1691202770386-0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/840046.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【力扣每日一题】2023.8.5 合并两个有序链表

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 题目给我们两个有序的链表&#xff0c;要我们保持升序的状态合并它们。 我们可以马上想要把两个链表都遍历一遍&#xff0c;把所有节点的…

AI抠图使用指南:Stable Diffusion WebUI Rembg实用技巧

抠图是图像处理工具的一项必备能力&#xff0c;可以用在重绘、重组、更换背景等场景。最近我一直在探索 Stable Diffusion WebUI 的各项能力&#xff0c;那么 SD WebUI 的抠图能力表现如何呢&#xff1f;这篇文章就给大家分享一下。 安装插件 作为一个生成式AI&#xff0c;SD…

aardio:用 WebView 模仿 mdict 界面

aardio&#xff1a;用 WebView 模仿 mdict 界面 import win.ui; /*DSG{{*/ mainForm win.form(text"aardio2";right889;bottom467) mainForm.add( button{cls"button";text"go";left335;top22;right399;bottom41;z2}; button2{cls"button…

基于以太网的煤矿电力监控系统的设计与应用 安科瑞 许敏

摘 要&#xff1a;针对传统煤矿电力监控系统通讯网络性能较差、无法实现准确故障定位及报警、不具备数据交互功能等问题&#xff0c;结合分布式网络及GPS授时技术设计了一套基于工业以太网及RS485总线架构的煤矿电力监控系统&#xff0c;可实现对井下供电网络及设备的远程监控…

C++ 字符串

在C语言中是使用字符型数组来存放字符串&#xff0c;C程序也仍然可以沿用这种方法。不仅如此&#xff0c;C库中还预定义了string类。 1.用字符数组存储和处理字符串 字符串常量是用一对双引号括起来的字符序列。例如"abcd"&#xff0c;"China"都是字符串…

跨域+四种解决方法

文章目录 一、跨域二、JSONP实现跨域请求三、前端代理实现跨域请求四、后端设置请求头实现跨域请求五、Nginx代理实现跨域请求5.1 安装Nginx软件5.2 使用Ubuntu安装nginx 本文是在学习课程满神yyds后记录的笔记&#xff0c;强烈推荐读者去看此课程。 一、跨域 出于浏览器的同…

【力扣】23. 合并 K 个升序链表 <链表指针、堆排序、分治>

目录 【力扣】23. 合并 K 个升序链表题解方法一&#xff1a;暴力&#xff0c;先遍历取出来值到数组中排序&#xff0c;再生成新链表方法二&#xff1a;基础堆排序&#xff08;使用优先队列 PriorityQueue&#xff09;方法三&#xff1a;基础堆排序&#xff08;使用优先队列 Pri…

时序预测 | Matlab实现基于SVR支持向量机回归的电力负荷预测模型

文章目录 预测结果基本介绍程序设计参考资料预测结果 基本介绍 时序预测 | Matlab实现基于SVR支持向量机回归的电力负荷预测模型 支持向量机(英语:support vector machine,常简称为SVM,又名支持向量网络)是在分类与回归分析中分析数据的监督式学习模型与相关的学习算法。给…

微信小程序nodejs+vue+uniapp个人家庭理财系统--论文

随着Internet的发展&#xff0c;人们的日常生活已经离不开网络。未来人们的生活与工作将变得越来越数字化&#xff0c;网络化和电子化。网上管理&#xff0c;它将是直接管理家庭理财系统app的最新形式。本论文是以构建家庭理财系统app为目标&#xff0c;使用nodejs技术制作&…

什么是Linux,如何在Windows操作系统下搭建Linux环境,远程连接Linux系统

文章目录 什么是LinuxLinux的诞生及发展为什么要学习LinuxLinux内核Linux发行版什么是虚拟机如何在VMware虚拟机中搭建Linux系统环境远程连接 Linux 系统Linux 帮助网站 什么是Linux Linux是一套免费使用和自由传播的类Unix操作系统&#xff0c;是一个基于POSIX和UNIX的多用户…

概念解析 | PointNet概述

注1:本文系“概念解析”系列之一,致力于简洁清晰地解释、辨析复杂而专业的概念。本次解析的概念是:点云深度学习及PointNet论文概述 参考论文:Qi C R, Su H, Mo K, et al. Pointnet: Deep learning on point sets for 3d classification and segmentation[C]//Proceedings of …

《向量数据库指南》——GPTCache 中的温度参数

目录 GPTCache 中的温度参数 a. 从多个候选答案中随机选择 b. 调整概率跳过缓存,直接调用模型 GPTCache 中的温度参数 为了平衡响应的随机性和一致性,并满足用户偏好或应用需求,在多模态 AI 应用中选择适当的温度参数值至关重要。GPTCache 保留了机器学习中温度参数的概…

Netty 粘包半包

什么是 TCP 粘包半包&#xff1f; 假设客户端分别发送了两个数据包 D1 和 D2 给服务端&#xff0c;由于服务端一次读取到的字节 数是不确定的&#xff0c;故可能存在以下 4 种情况。 &#xff08;1&#xff09;服务端分两次读取到了两个独立的数据包&#xff0c;分别…

kubernetes基于helm部署gitlab-operator

kubernetes基于helm部署gitlab-operator 这篇博文介绍如何在 Kubernetes 中使用helm部署 GitLab-operator。 先决条件 已运行的 Kubernetes 集群负载均衡器&#xff0c;为ingress-nginx控制器提供EXTERNAL-IP&#xff0c;本示例使用metallb默认存储类&#xff0c;为gitlab p…

ChatGPT“侵入”校园,教学评价体制受冲击,需作出调整

北密歇根大学的教授奥曼在学生作业中发现了一篇关于世界宗教的“完美论文”。“这篇文章写得比大多数学生都要好......好到不符合我对学生的预期&#xff01;”他去问ChatGPT&#xff1a;“这是你写的吗&#xff1f;”ChatGPT回答&#xff1a;“99.9%的概率是的。” ChatGPT“侵…

C++二叉搜索树剖析

目录 &#x1f347;二叉搜索树概念&#x1f348;二叉搜索树查找&#x1f349;二叉搜索树的插入&#x1f34a;二叉搜索树的删除&#x1f34d;二叉搜索树的查找、插入、删除实现&#x1f34b;二叉搜索树的应用&#x1f96d;二叉搜索树的性能分析&#x1f353;总结 &#x1f347;二…

Mac 创建和删除 Automator 工作流程,设置 Terminal 快捷键

1. 创建 Automator 流程 本文以创建一个快捷键启动 Terminal 的自动操作为示例。 点击打开 自动操作&#xff1b; 点击 新建文稿 点击 快速操作 选择 运行 AppleScript 填入以下内容 保存名为 “Open Terminal” 打开 设置 > 键盘&#xff0c;选择 键盘快捷键 以此选择 服…

Python(六十九)为什么要将元组设计成不可变序列

❤️ 专栏简介&#xff1a;本专栏记录了我个人从零开始学习Python编程的过程。在这个专栏中&#xff0c;我将分享我在学习Python的过程中的学习笔记、学习路线以及各个知识点。 ☀️ 专栏适用人群 &#xff1a;本专栏适用于希望学习Python编程的初学者和有一定编程基础的人。无…

git bash 安装sdkadmin

1.下载相关安装包,复制到git 安装目录 D:\software\Git\mingw64\bin 2. 运行 curl -s "https://get.sdkman.io" | bash

心跳跟随的心形灯(STM32(HAL)+WS2812+MAX30102)

文章目录 前言介绍系统框架原项目地址本项目开发开源地址硬件PCB软件功能 详细内容硬件外壳制作WS2812级联及控制MAX30102血氧传感器0.96OLEDFreeRTOS 效果视频总结 前言 在好几年前&#xff0c;我好像就看到了焊武帝 jiripraus在纪念结婚五周年时&#xff0c;制作的一个心跳跟…