Redis 篇-深入了解基于 Redis 实现消息队列(比较基于 List 实现消息队列、基于 PubSub 发布订阅模型之间的区别)

news2025/1/11 18:44:00

🔥博客主页: 【小扳_-CSDN博客】
❤感谢大家点赞👍收藏⭐评论✍

文章目录

        1.0 消息队列的认识

        2.0 基于 List 实现消息队列

        2.1 基于 List 实现消息队列的优缺点

        3.0 基于 PubSub 实现消息队列

        3.1 基于 PubSub 的消息队列优缺点

        4.0 基于 Stream 实现消息队列

        4.1 Stream 的单消费模式

        4.2 Stream 的消费组模式


        1.0 消息队列的认识

        消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包含 3 个角色:

        1)消息队列:存储和管理消息,也被称为消息代理(Message Broker)

        2)生产者:发送消息到消息队列。

        3)消费者:从消息队列获取消息并处理消息。

        2.0 基于 List 实现消息队列

        Redis 的 list 数据结构是一个双向链表,很容易模拟出队列效果。

实现思路:

        队列时入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP 或者 RPUSH 结合 LPOP 来实现。不过需要注意的是,当队列中没有消息时 RPOP 或 LPOP 操作会直接返回 null ,并不像 JVM 的阻塞队列那样会阻塞并等待消息,因此这里应该使用 BRPOP 或者 BLPOP 来实现阻塞效果。

代码演示:

        当数据要进入队列时,那么可以使用 LPUSH KEY VALUE 命令,KEY 为队列名称,VALUE 为数据值,将数据写入 Redis 中。当要获取数据的时,使用 BRPOP KEY TIMEOUT命令,KEY 为队列名称,TIMEOUT 为最大阻塞时间,在最大阻塞时间内,仍旧没有获取数据,则返回 null 。该命令主要做了两步,将数据移除队列中,并将该数据返回。

        2.1 基于 List 实现消息队列的优缺点

        优点:

        1)利用 Redis 存储,不受限于 JVM 内存上限。

        2)基于 Redis 的持久化机制,数据安全性有保证。

        3)可以满足消息有序性。

        缺点:

        1)无法避免消息丢失。

        当在 BRPOP 获取数据的时候,出现异常,返回数据失败,从而导致数据丢失。因为数据已经从队列中移除出来了,所以队列中已经不存在之前的数据了。

        2)只支持单消费者。

        当一个消费者来消费之后,其他再来的消费者就不能再获取到第一个消费者的数据,所以说数据只能给一个消费者。

        3.0 基于 PubSub 实现消息队列

        PubSub 是 Redis2.0 版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个 channel ,生产者向对应 channel 发送消息后,所有订阅者都能收到相关消息。

常用的命令:

        1)SUBSCRIBE channel [channel]:订阅一个或者多个频道。

        2)PUBLISH channel msg:向一个频道发送消息。

        3)PSUBSCRIBE pattern [pattern]:订阅与 pattern 格式匹配的所有频道。* 代表通配符,订阅所有频道。

        这就实现了支持多个消费者获取到相同的消息。当消息被发布了,那么已经订阅该频道的消费者就可以及时获取到消息了。

代码演示:

        先订阅频道:

        发送消息:

        当生产者发送完消息,消费者就会收到通知,从通道中获取到消息。

        3.1 基于 PubSub 的消息队列优缺点

        优点:

        1)采用发布订阅模型,支持多生产、多消费。

        解决了基于 List 实现的消息队列的缺点,单消费。

        缺点:

        1)不支持数据持久化。

        将消息发布出去之后,不会进行数据保存。不管有无消费者订阅,都会将消息直接发布出去。

        2)无法避免消息丢失。

        因为不支持持久化,当消息丢失之后,无法再找到原本的数据。

        3)消息堆积有上限,超出时数据丢失。 

        在消费者中,接收到的数据会暂时存放起来,一旦超过存放的大小,就会导致数据丢失。

        4.0 基于 Stream 实现消息队列

        Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

发送消息的命令:

XADD key ID field string [field string ...]

         key 为队列名称,*|ID 为消息的唯一 id,* 代表由 Redis 自动生成。格式是“时间戳-递增数字”,例如 "1644804662707-0"。field value 代表发送到队列中的消息,称为  Entry 。格式就是多个 key-value 键值对。

代码演示:

        4.1 Stream 的单消费模式

单消费者获取数据的命名:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

        COUNT count 为每次读取消息的最大数量;BLOCK milliseconds 代表当没有消息时,是否阻塞,阻塞时长;STREAMS key 代表要从哪个队列读取消息,key 就是队列名;ID 代表起始 id ,只返回大于该 ID 的消息,0 为从第一个消息开始,而 $ 为从最新的消息开始。

代码演示:

        当 ID 使用 $ 时,不会从原本 s 中直接获取原本的数据,而是在 2 秒内有无最新的数据添加进来,如果有,则返回该数据;如果没有,则返回 null。

        当 ID 使用 0 时,则从原本 s 中直接获取原本的数据。

        在业务开发中,我们可以循环的调用 XREAD 阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:

        需要注意的地方:

        当使用 Stream 单消费者模式的时候,我们指定起始 ID 为 $ 时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过 1 条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。

        XREAD 命令特点:

        1)消息可回溯。

        2)一个消息可以被多个消费者读取。

        3)可以阻塞读取。

        4)有消息漏读的风险。

        4.2 Stream 的消费组模式

        将多个消费者划分到一个组中,监听同一个队列。

特点:

        1)消息分流:

        队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度。

        2)消息标示:

        消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费。

        3)消息确认:

        消费者获取消息后,消息处于 pending 状态,并存入一个 pending-list 。当处理完成后,需要通过 XACK 来确认消息,标记消息为已处理,才会从 pending-list 移除。

创建消费者组:

XGROUP CREATE key groupname id|$ [MKSTREAM]

        key 代表队列名称,groupName 代表消费者组名称,ID 起始 ID 标示,$ 代表队列中最后一个消息,0 则代表队列中第一个消息。MKSTREAMS 代表不存在时自动创建队列。

        如果之前列表的数据要继续获取,则 ID 选为 0;如果之前的列表中的数据不需要了,则 ID 选为 $ 。

从消费者组读取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

        group 代表组名,consumer 代表组内消费者名称,count 代表每次读取的最大数量,milliseconds 代表当没有消息时最长的等待时间,NOACK 代表无需手动 ACK,获取消息后自动确认。key 代表指定队列名称,

        ID 代表获取消息的起始 ID :

                当 ID 为 ">" :从下一个未消费的消息开始。

                当 ID 为其他:根据指定 id 从 pending-list 中获取已消费但未确认的消息,例如 0,是从 pending-list 中的第一个消息开始。

确认消息:

XACK key groupName ID

        key 为队列名,groupName 为组名,ID 为消息唯一 id 。

查看未确认的消息:

XPENDING key group [start end count] [consumer]

        key 为队列名,group 为组名,start 起始地址,count 个数,consumer 组内消费者名称。

消费者监听消息思路:

 

Java 代码实现从消息队列中获取消息:

import cn.hutool.core.bean.BeanUtil;
import com.project.volunteermanagementproject.pojo.StreamObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.List;
import java.util.Map;
@Component
public class StreamUtil {
    @Autowired
    StringRedisTemplate stringRedisTemplate;

    //实现从消息队列中获取消息
    public void getStream(){
        while (true){
            try {
                List<MapRecord<String, Object, Object>> read = stringRedisTemplate.opsForStream().read(
                        Consumer.from("g1", "c1"),
                        StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                        StreamOffset.create("s1", ReadOffset.lastConsumed())
                );

                if (read == null || read.isEmpty()){
                    //如果获取失败,说明没有消息,继续下一次循环
                    continue;
                }
                //解析消息中的消息
                MapRecord<String, Object, Object> entries = read.get(0);
                Map<Object, Object> value = entries.getValue();
                StreamObject streamObject = BeanUtil.fillBeanWithMap(value, new StreamObject(), true);
                //这就拿到了消息队列中的数据了,就可以去使用该对象了
                System.out.println(streamObject);
                //这就需要确认消息队列
                stringRedisTemplate.opsForStream().acknowledge("s1", "g1", entries.getId());
            } catch (Exception e) {
                //如果在获取消息过程中出现异常,则需要再次执行该消息任务
                while (true){
                    try {
                        List<MapRecord<String, Object, Object>> read = stringRedisTemplate.opsForStream().read(
                                Consumer.from("g1", "c1"),
                                StreamReadOptions.empty().count(1),
                                StreamOffset.create("s1", ReadOffset.from("0"))
                        );
                        if (read == null || read.isEmpty()){
                            break;
                        }
                        MapRecord<String, Object, Object> entries = read.get(0);
                        Map<Object, Object> value = entries.getValue();
                        StreamObject streamObject = BeanUtil.fillBeanWithMap(value, new StreamObject(), true);
                        //重新拿到未确认的数据
                        System.out.println(streamObject);
                        //再次进行消息确认
                        Long acknowledge = stringRedisTemplate.opsForStream().acknowledge("s1", "g1", entries.getId());
                    } catch (Exception ex) {
                        throw new RuntimeException(ex);
                    }
                    
                }
            }

        }
    }
}

XREADGROUP 命令特点:

        1)消息可回溯

        2)可以多消费者争抢消息,加快消费速度

        3)可以阻塞读取

        4)没有消息漏读的风险

        5)有消息确认机制,保证消息至少被消费一次

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2131388.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2009-2023年上市公司华证esg评级、评分年度数据(含细分项)

2009-2023年上市公司华证esg评级、评分年度数据&#xff08;含细分项&#xff09; 1、时间&#xff1a;2009-2023年 2、来源&#xff1a;整理自wind 3、指标&#xff1a;证券代码、年份、证券简称、评级日期、综合评级、综合得分、E评级、E得分、S评级、S得分、G评级、G得分…

《论负载均衡技术在Web系统中的应用》写作框架,软考高级系统架构设计师

论文真题 负载均衡技术是提升Web系统性能的重要方法。利用负载均衡技术&#xff0c; 可将负载(工作任务) 进行平衡、分摊到多个操作单元上执行&#xff0c; 从而协同完成工作任务&#xff0c; 达到提升Web系统性能的目的。 请围绕“负载均衡技术在Web系统中的应用”论题&…

《计算机组成原理:探索数字世界的基石》

《计算机组成原理&#xff1a;探索数字世界的基石》 在当今数字化的时代&#xff0c;计算机已经成为人们生活和工作中不可或缺的一部分。而要深入理解计算机的运作&#xff0c;就必须掌握计算机组成原理。 计算机组成原理是一门研究计算机硬件系统的学科&#xff0c;它涵盖了…

Linux-Curl使用

在 Linux 中&#xff0c;curl是一个强大的命令行工具&#xff0c;用于从服务器或其他 URL 地址获取数据或与网络服务进行交互。 对于自己写不明白的curl&#xff0c;可以使用postman、apipost等接口工具生成curl请求&#xff0c;用于测试 # 下载单个文件 默认将输出打印到标准…

Zabbix监控k8s云原生环境

传统监控的本质就是收集、分析和使用信息来观察一段时间内监控对象的运行进度&#xff0c;并且进行相应的决策管理的过程&#xff0c;监控侧重于观察特定指标。是随着云原生时代的到来&#xff0c;我们对监控的功能提出了更多的要求&#xff0c;要实现这些功能&#xff0c;就要…

python画图|3D垂线标记

在前述学习过程中&#xff0c;我们学习了二维坐标上的垂线标记画图&#xff0c;链接如下&#xff1a; python画图|垂线标记系列_python画点相对x轴的垂线-CSDN博客 也学习了3D作图基本方法&#xff1a; python画图|3D图基础教程-CSDN博客 现在我们尝试将这二者结合&#x…

自定义WPF滑块样式-Slider

在Windows应用程序开发中&#xff0c;滑块&#xff08;Slider&#xff09;是一个非常常见且有用的控件。它可以让用户通过拖动滑块来选择一个范围内的值。然而&#xff0c;WPF或UWP应用程序中的默认滑块样式可能并不总是符合我们的设计需求。因此&#xff0c;我们需要自定义滑块…

华为OD机试 - 伐木工 - 动态规划(Java 2024 E卷 200分)

华为OD机试 2024E卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;E卷D卷A卷B卷C卷&#xff09;》。 刷的越多&#xff0c;抽中的概率越大&#xff0c;私信哪吒&#xff0c;备注华为OD&#xff0c;加…

可以实时引入模块

缺点&#xff1a;循环会有点问题,不能死循环,最好在python脚本中起一个计时器 解释器在执行时时同步的,所以会见界面卡住,使用多个线程可以解决这个问题 或者使用 C的异步 一个完整的IDLE 麻烦,得把pyshell.py 弄能才能从tk 改到qt 内嵌到 dock

三维坐标变换

&#xff08;一些困惑梳理记录&#xff09; “坐标转换”的区分 1、坐标系基底变换 2、目标描述向量变换 总的来说&#xff0c; A属于1、坐标系基底变换&#xff0c; B中所述方法&#xff0c;可用于1、坐标系基底变换&#xff0c;也可用于2、目标描述向量变换&#xff0c…

文生视频算法

文生视频 Sora解决问题&#xff1a;解决思路&#xff1a; CogVideoX解决问题&#xff1a;解决思路&#xff1a; Stable Video Diffusion&#xff08;SVD&#xff09;解决问题&#xff1a;解决思路&#xff1a; 主流AI视频技术框架&#xff1a; Sora Sora: A Review on Backg…

SpringBoot项目请求返回json空字段过滤

接口返回的json中有的字段可能是为空的&#xff0c;我们不希望他为空的还返回&#xff0c;如下例子&#xff1a; 解决方案&#xff1a;只需要加一个配置类就行&#xff1a; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.Dese…

【每日刷题】Day120

【每日刷题】Day120 &#x1f955;个人主页&#xff1a;开敲&#x1f349; &#x1f525;所属专栏&#xff1a;每日刷题&#x1f34d; &#x1f33c;文章目录&#x1f33c; 1. 413. 等差数列划分 - 力扣&#xff08;LeetCode&#xff09; 2. 978. 最长湍流子数组 - 力扣&…

知识图谱增强在 360 文档知识问答及管理中的应用实践

主要内容包括以下几大部分&#xff1a; 360 文档云知识管理/问答应用场景 KG 在文档 RAG 问答中的应用 KG 在文档标准化、层次化、结构化应用 KG 与 LLM 在文档场景下的挑战及展望 问答环节 01 360 文档云知识管理/问答应用场景 首先来介绍一下 360 文档云知识管理问答的…

K1计划100%收购 MariaDB; TDSQL成为腾讯云核心战略产品; Oracle@AWS/Google/Azure发布

重要更新 1. 腾讯全球数字生态大会与9月5日-6日举行&#xff0c;发布“5T”战略&#xff0c;包括TDSQL、TencentOS、TCE&#xff08;专有云 &#xff09;、TBDS&#xff08;大数据&#xff09;、TI &#xff08;人工智能开发平台&#xff09;等 ( [2] ) ; 并正式向原子开源基金…

【无人机设计与控制】基于PID控制的四旋翼无人机系统Matlab仿真

摘要 本文基于PID控制设计了一种四旋翼无人机控制系统&#xff0c;并通过Matlab进行仿真验证。研究了姿态控制和位置控制的性能&#xff0c;仿真结果表明该系统在稳定性和响应速度方面具有良好的表现。本文的主要贡献是验证了PID控制器在多轴飞行器控制中的有效性&#xff0c;…

基于mongodb+flask(Python)+vue的实验室器材管理系统

实验室器材管理系统是一个现代化的、高度集成的软件解决方案&#xff0c;它结合了Flask作为后端框架&#xff0c;MongoDB作为数据库&#xff0c;以及Vue.js作为前端用户界面&#xff0c;专为优化和精简实验室设备及耗材的管理流程而设计。此系统旨在为实验室管理员、研究人员和…

3.C++入门(内联函数,c++11,auto,范围for,nullptr)

⭐本篇文章为C学习的第三篇&#xff1a;主要了解内联函数和部分c11新特性 ⭐本人c代码的Gitee仓库&#xff1a;c学习 橘子真甜/yzc的c学习 - 码云 - 开源中国 (gitee.com) 一. 内联函数 以inline修饰的函数称为内联函数&#xff0c;编译的时候c编译器会在内联函数的地方展开&a…

AI 平台 formulabot 介绍

AI 平台 formulabot 介绍 FormulaBot.com 是一个基于人工智能的数据分析平台&#xff0c;旨在简化数据处理和分析任务 主要功能 数据分析与可视化: Formula Bot 提供工具来分析、可视化和转换数据&#xff0c;使用户能够快速理解数据背后的信息。公式生成: 用户可以通过自然…

别再过度复杂化了,实体SEO其实就是SEO

“实体SEO”。听起来有点可怕&#xff0c;是不是&#xff1f;不仅“实体”这个词听起来有点陌生&#xff0c;还感觉又是要在你永无止境的SEO待办清单上再添加一项。你在SEO方面已经捉襟见肘了&#xff0c;但天啊&#xff0c;又有一个新事物需要你投入稀缺的资源。 不过我有好消…