Redis 实现消息队列

news2025/1/8 5:26:26

Redis 实现消息队列

文章目录

  • Redis 实现消息队列
    • 导引
    • 1. 基于List结构的消息队列
    • 2. 基于PubSub的消息队列
    • 3. 基于Stream的消息队列(推荐)
      • 3.1 XADD
      • 3.2 XREAD
      • 3.3 XGROUP

导引

消息队列(Message Queue),从概念上来理解就是用来存放消息的队列,最简单的消息队列模型包括以下三个角色:

  • 生产者:发送消息到消息队列
  • 消息队列:存储和管理信息,也被称为消息代理(Message Broker)
  • 消费者:从消息队列中获取消息并处理消息

Redis也为我们提供了三种不同的方式来实现消息队列:

  1. List结构:基于List结构模拟消息队列
  2. PubSub:基本的点对点消息模型
  3. Stream:比较完善的消息队列模型(推荐

1. 基于List结构的消息队列

这种方式比较简单,因为Redis的list数据结构是一个双向链表,很容易模拟出队列的效果。

队列是入口和出口不在一边,对此我们可以利用:LPUSH 结合 BRPOP,或者 RPUSH 结合 BLPOP 来实现先进先出的效果

在这里插入图片描述

:这里使用BRPOP而不是RPOP是因为BRPOP能够实现阻塞的效果而RPOP不能

使用该方式实现消息队列的优缺点如下:

优点

  • 利用Redis存储,不受限与JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 能够满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

2. 基于PubSub的消息队列

PubSub(发布订阅),是Redis2.0版本引入的消息传递模型,消费者可以订阅一个或多个channel(频道),生产者向对应channel发送消息后,所有订阅者都能收到相关消息

它有以下命令:

  • SUBSCRIBE channel [channel]:订阅一个或多个频道

    在这里插入图片描述

  • PUBLISH channel msg:向一个频道发送消息

    在这里插入图片描述

  • PSUBSCRIBE pattern[pattern]:订阅与pattern格式匹配的所有频道

    在这里插入图片描述

具体操作如下所示:

在这里插入图片描述

该方式实现的消息队列支持多消费者的使用,但也存在着以下弊端:

  • 不能支持数据持久化,一旦redis宕机数据就会丢失
  • 无法避免消息丢失
  • 消息堆积有上限,超出上限后数据会丢失

3. 基于Stream的消息队列(推荐)

Stream是Redis5.0引入的一种新数据类型,能够实现功能完善的消息队列,因为它本身就是一个消息队列,所以我们可以直接通过命令来使用它:

3.1 XADD

作用:发送消息

在这里插入图片描述

其中:

  • key:队列名称

  • [NOMKSTREAM]:如果队列不存在,是否自动创建队列,默认是自动创建

  • [MAXLEN|MINID [=|~] threshold [LIMIT count]]:设置消息队列的最大消息数量

  • *|ID:消息的唯一id,表示由Redis自动生成,格式是“时间戳-递增数字”,一般推荐使用来自动生成

  • field value [field value …]:发送到队列中的消息,以键值对的格式录入,可以多个同时录入

举个栗子🌰:

创建一个名为 users 的队列,并向其中发送一个消息,内容是:{name=Json, age=25},并使用Redis自动生成ID

在这里插入图片描述

3.2 XREAD

作用:读取消息

在这里插入图片描述

其中:

  • [COUNT count]:指定每次读取消息的最大数量
  • [BLOCK milliseconds]:当队列中没有消息时,阻塞指定时长,单位为秒
  • STREAMS key [key …]:要从哪个队列中读取消息,key就是队列名,可以指定多个队列
  • ID [ID …]起始ID,只返回大于该ID的消息,其中0代表从第一个消息开始,$代表从最新的消息开始

举个栗子🌰:

读取users队列中的第一个消息

在这里插入图片描述

:在上述测试中我们只往users队列中添加了一个消息,这个时候如果ID使用$来获取最新消息,且设置了阻塞等待的话,此时读取信息将在阻塞时间过后返回空:

在这里插入图片描述

3.3 XGROUP

消费者组,一个消费者组中可以有多个消费者来操作同一个消息队列

在这里插入图片描述

通常由以下命令组成:

  • 创建消费者组:

    XGROUP Create key groupName ID [MKSTREAM]
    

    其中:

    • key:队列名称
    • groupName:消费者组名称
    • ID:起始ID标识,0代表队列中第一个消息,$代表队列最后一个消息
    • MKSTREAM:队列不存在时自动创建队列

    在这里插入图片描述

    :这里要求队列key已经存在才能创建消费者组,否则需要开启MKSTREAM让其自动创建新的队列

    在这里插入图片描述

  • 删除指定的消费者组:

    XGROUP Destroy key groupName
    

    在这里插入图片描述

  • 给指定消费者组添加消费者

    XGROUP CREATECONSUMER key groupName consumerName
    

    在这里插入图片描述

  • 删除指定消费者组中的消费者

    XGROUP DELCONSUMER key groupName consumerName
    

    在这里插入图片描述

  • 从消费者组中读取消息

    XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
    

    在这里插入图片描述

    其中:

    • group:消费者组名称
    • consumer:消费者名称,如果消费者不存在,会根据该名称自动创建一个消费者
    • count:本次查询的最大数量
    • BLOCK milliseconds:阻塞时间,没有消息时会进行等待,以毫秒为单位
    • NOACK:选择后无需手动ACK,获取到消息后自动确认,一般不建议设置,当我们获取完消息后需要手动确认ack
    • STREAMS key:指定队列名称
    • ID:获取消息的起始ID,它有以下情况:
      • >”:表示从下一个未消费的消息开始
      • 其它:根据指定id从pending-list中获取消息,pending-list用于专门存放那些已消费但未确认的消息;例如此时ID为0,表示获取pending-list中的第一个消息.

    同一个消费者组中的消费者读取同一个消息队列时,若ID使用>来读取,则下一个读取的消息一定是前面的消费者没有读取到的消息,直到消息队列中的消息都被读取过后,最后一个读取的消费者返回nil

    举个栗子🌰:

    我们创建一个队列叫list,再添加几条消息

    在这里插入图片描述

    在这里插入图片描述

    创建一个消费者组g1监听list消息队列

    在这里插入图片描述

    通过XREADGROUP命令为消费者组g1添加消费者c1、c2、c3来读取list队列消息

    在这里插入图片描述

可以看到,同一个消费者组中的消费者,它们都在获取同一个队列中的消息,且ID使用>来读取,下一个读取的消息一定是前面的消费者没有读取到的消息,直到消息全部被读完后只返回nil!

每读取完一条消息,我们需要对它进行手动确认,使其从pending-list中移除,使用下述命令可以查看已读取但还未确认的消息:

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

在这里插入图片描述

我们查看一下list中还有多少未确认的消息:

在这里插入图片描述

好的都没被确认,所以需要我们手动去确认消息,使其从pending-list中移除,操作命令如下:

XACK key group ID [ID ...]

在这里插入图片描述

上述的ID为添加消息时自动创建并返回的ID:

在这里插入图片描述

这样所有已读取的消息就会从pending-list中移除了!

这里贴上该消息队列在Java中的实现方式

//获取消息队列中的信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000ms STEAMS list >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
        Consumer.from("g1", "c1"),
        StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
        StreamOffset.create("list", ReadOffset.lastConsumed())
);

//ACK确认 SACK list g1 id
// 注:因为这里我们只从消息队列中获取一条信息(COUNT 1),所以list.get()使用索引0即可
stringRedisTemplate.opsForStream().acknowledge("list", "g1", list.get(0).getId()); 

//获取pending_list中的消息 XREADGROUP GROUP g1 c1 COUNT 1 STEAMS list 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
        Consumer.from("g1", "c1"),
        StreamReadOptions.empty().count(1),
        StreamOffset.create("list", ReadOffset.from("0"))
);
// 上述代码可以配合循环实现被消费者组不断监听的消息队列

以上便是对Redis实现消息队列的介绍了!!如果内容对大家有帮助的话请给这篇文章一个三连关注吧💕( •̀ ω •́ )✧✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1987766.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

芯片外置电阻RC如何实现振荡器功能?

大家好,这里是大话硬件。 这篇文章来实现DC-DC控制器内部的振荡器模块功能。 在调试DC-DC控制器时,如果要改变其开关频率,通常是修改芯片外围的RC参数, 如下图所示。 结合常用芯片UC3842系列的内部框图,实现方式如下: 实现方案如下: 根据上述的原理,实现的思路:…

springboot整合mybatis-plus和pagehelper插件报错,

在springboot和myabtisplus版本没有冲突的情况下&#xff0c;MybatisPlusAutoConfiguration配置类没有生效&#xff0c;查看该类发现存在生效条件&#xff0c;即&#xff1a; 1.必须存在数据源的Bean对象 2.必须存在SqlSessionFactory和SqlSessionFactoryBean类&#xff08;这…

【QT常用技术讲解】tableWidget右键菜单及多进程编程

前言 本文在QT项目的开发框架的基础上&#xff08;源代码&#xff09;增加tableWidget的右键菜单功能&#xff0c;并使用进程实现ping计算机的功能来讲解&#xff0c;本文不对进程间通信进行讲解。 概述 一个项目在开发过程中&#xff0c;通常面临着引入“第三方应用”&#x…

二叉树:镜像树,子结构,二叉树转链表,二叉树的倒数K个数,对称,Z型打印

1.把一棵二叉树转换为它的镜像树。 void mirror_tree(TreeNode *root) {if(rootNULL) return ;TreeNode *temproot->right;root->rightroot->left;root->lefttemp;mirror_tree(root->right);mirror_tree(root->left);}2、输入两棵二叉树A&#xff0c;B&…

Day 21

Java Script 1.什么是JavaScript 概述 JavaScript是一门世界上最流行的脚本语言 Java、JavaScript 一个合格的后端人员&#xff0c;必须要精通JavaScript 历史 JavaScript 的历史_javascript历史-CSDN博客 ECMAScript它可以理解为是JavaScript的一个标准 2.基本使用及…

AI智能名片微信小程序在社群运营中的价值与应用研究

摘要&#xff1a;在数字化转型的浪潮中&#xff0c;社群运营已成为企业营销策略的重要组成部分&#xff0c;它不仅促进了品牌与消费者之间的深度互动&#xff0c;还为企业带来了持续的用户增长和价值转化。本文深入探讨了AI智能名片微信小程序在社群运营中的创新应用&#xff0…

常用数据库详解:从关系型到非关系型的探索

常用数据库详解&#xff1a;深入探索关系型与非关系型数据库 在数据驱动的世界中&#xff0c;数据库系统是存储、管理、检索和更新数据的核心技术。从历史悠久的关系型数据库到新兴的非关系型数据库&#xff0c;每种数据库都有其独特的设计哲学、优势和适用场景。本文将深入探…

不同环境下RabbitMQ的安装-2 ARM架构、X86架构、Window系统环境下安装RabbitMQ

ARM架构、X86架构、Window系统环境下RabbitMQ的安装 RabbitMQ安装1 Erlang语言介绍2 安装Erlang2.1 ARM架构的CentOS虚拟机中安装Erlang2.2 X86架构的CentOS虚拟机中安装Erlang2.3 Windows系统安装Erlang2.3.1 下载Erlang2.3.2 安装Erlang2.3.3 配置Erlang2.3.4 检测Erlang 3.安…

资质延期成本预测:河南建筑装饰企业预算制定策略

资质延期成本预测对于河南建筑装饰企业来说是确保企业资质顺利延续的重要环节。以下是企业预算制定策略的一些关键点&#xff1a; 一、了解政策与要求 首先&#xff0c;河南建筑装饰企业需要详细了解河南省住房和城乡建设厅及相关部门关于资质延期的具体政策和要求。这包括延…

Linux Vim实用教程:从新手到专家的完全指南

Linux Vim最全面的教程涵盖了从Vim的安装、基本操作、高级功能到个性化配置等各个方面&#xff0c;旨在帮助用户全面掌握这款强大的文本编辑器。下面将详细介绍Linux Vim的功能&#xff1a; 安装过程 在Debian/Ubuntu系统上安装Vim&#xff1a;需要执行sudo apt-get update和su…

目标检测综述文章解读——Object Detection in 20 Years: A Survey

论文&#xff1a;Object Detection in 20 Years: A Survey 作者&#xff1a;Zhengxia Zou, Keyan Chen, Zhenwei Shi, Yuhong Guo, Jieping Ye 链接&#xff1a;https://arxiv.org/abs/1905.05055 这是一篇关于目标检测综述性文章&#xff0c;自2019年5月第一次提交后&#xff…

前端(react)框架nextjs

文章目录 一、什么是next.js1. 路由2. 打包 next build3. 部署 二、 next.js 和react区别三、webstorm使用nextjs四、开发常用总结如何修nextjs 启动监听的端口号&#xff1f;NGINX 反向代理 Next.js 项目配置 参考 一、什么是next.js 官网&#xff1a; https://www.nextjs.cn…

使用C#(winform)调用STK并展示其3D/2D控件

最近有个需求要求对STK做二次开发&#xff0c;要用自己写的界面但又要嵌入STK的3D/2D控件展示&#xff0c;后台调用STK引擎做计算。官方文档语焉不详&#xff0c;网上的资料要么太多重复&#xff08;到处抄来抄去&#xff09;&#xff0c;要么有诸多错漏之处&#xff0c;找了很…

GB/T 28181-2022 公共安全视频监控联网系统:信息传输、交换、控制技术要求pdf协议文档下载,同时附GBT-28181-2016.pdf

国标GB28181 2016标准已经执行很多年了&#xff0c;可以说效果非常好&#xff0c;去年最新的GB28181-2022标准细节也出来了&#xff0c;本来是一个国家级的标准&#xff0c;现在想要下载这个协议标准&#xff0c;一堆网址要会员&#xff0c;要积分&#xff0c;我整理了一下&…

LabVIEW压电陶瓷阻抗测试系统

开发了一种基于LabVIEW软件与PXI模块化仪器的压电陶瓷阻抗测试系统。该系统能在高电压工作条件下测量压电陶瓷的阻抗特性&#xff0c;包括阻抗模值与阻抗角的频率特性&#xff0c;为压电陶瓷的进一步分析与应用提供了重要参考。 项目背景 现有的阻抗测试仪大多只能在低电压条件…

OpenCV图像滤波(5)二维卷积滤波函数filter2D()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 cv::filter2D() 函数用于对图像应用二维卷积滤波器。这个函数可以用来实现多种图像处理操作&#xff0c;如模糊、锐化、边缘检测等。它通过将一个…

什么是Docke,部署dockers,和基本操作命令

Docker简介 什么是容器 容器是用来装东西的&#xff0c;Linux 里面的容器是用来装应用的&#xff1b; 容器就是将软件打包成标准化单元&#xff0c;以用于开发、交付和部署&#xff1b; 容器技术已经成为应用程序封装和交付的核心技术&#xff1b; 容器原理&#xff1a;容器…

如何在STEP 7 (TIA Portal)中配置访问共享的设备及模块内部共享输入/输出(MSI/MSO)功能

通过模块内部共享输入/输出&#xff08;MSI/MSO&#xff09;功能&#xff0c;输入或输出模块可以将其输入或输出数据最多提供给4个IO控制器。 这篇文档介绍了如何在STEP 7 (TIA Portal)中配置访问共享的设备及模块内部共享输入/输出功能。可以在两个不同的项目里或同一个项目里…

24.8.3数据结构|双向循环链表、静态链表

双向循环链表 节点类型与双链表的节点类型完全相同双向循环链表的操作也与双链表的操作基本一致。 例题 将自然数一到N按由小到大的顺序沿顺时针方向围成一个圈&#xff0c;然后以一为起点先沿顺时针方向数到第N个数将其划去&#xff0c;再沿逆时针方向数到第K个数将其滑去&a…

密码加密机的功能模块

密码加密机&#xff0c;也称为加密机或硬件加密模块(HSM&#xff0c;Hardware Security Module)&#xff0c;是一种通过国家商用密码主管部门鉴定并批准使用的国内自主开发的主机加密设备。以下是对密码加密机的详细解析&#xff1a; 一、基本概述 定义&#xff1a;密码加密机是…