RabbitMQ之三种队列之间的区别及如何选型

news2024/12/28 20:56:36

目录

不同队列之间的区别

Classic经典队列

Quorum仲裁队列

Stream流式队列

如何使用不同类型的队列​

Quorum队列

Stream队列


不同队列之间的区别

Classic经典队列

这是RabbitMQ最为经典的队列类型。在单机环境中,拥有比较高的消息可靠性。

 

       经典队列可以选择是否持久化(Durability)以及是否自动删除(Auto delete)两个属性。其中,Durability有两个选项,Durable和Transient。 Durable表示队列会将消息保存到硬盘,这样消息的安全性更高。但是同时,由于需要有更多的IO操作,所以生产和消费消息的性能,相比Transient会比较低。​ Auto delete属性如果选择为是,那队列将在至少一个消费者已经连接,然后所有的消费者都断开连接后删除自己。Arguments部分,还有非常多的参数,可以点击后面的问号逐步了解。

       在RabbitMQ中,经典队列是一种非常传统的队列结构。消息以FIFO先进先出的方式存入队列。消息被Consumer从队列中取出后就会从队列中删除。如果消息需要重新投递,就需要再次入队。这种队列都依靠各个Broker自己进行管理,在分布式场景下,管理效率是不太高的。并且这种经典队列不适合积累太多的消息。如果队列中积累的消息太多了,会严重影响客户端生产消息以及消费消息的性能。因此,经典队列主要用在数据量比较小,并且生产消息和消费消息的速度比较稳定的业务场景。比如内部系统之间的服务调用。


Quorum仲裁队列

       仲裁队列,是RabbitMQ从3.8.0版本,引入的一个新的队列类型,整个3.8.X版本,也都是在围绕仲裁队列进行完善和优化。仲裁队列相比Classic经典队列,在分布式环境下对消息的可靠性保障更高。官方文档中表示,未来会使用Quorum仲裁队列代替传统Classic队列。

 

       Quorum是基于Raft一致性协议实现的一种新型的分布式消息队列,他实现了持久化,多备份的FIFO队列,主要就是针对RabbitMQ的镜像模式设计的。简单理解就是quorum队列中的消息需要有集群中多半节点同意确认后,才会写入到队列中。这种队列类似于RocketMQ当中的DLedger集群。这种方式可以保证消息在集群内部不会丢失。同时,Quorum是以牺牲很多高级队列特性为代价,来进一步保证消息在分布式环境下的高可靠。从整体功能上来说,Quorum队列是在Classic经典队列的基础上做减法,因此对于RabbitMQ的长期使用者而言,其实是会影响使用体验的。

与普通队列的区别:

        Quorum队列大部分功能都是在Classic队列基础上做减法,比如Non-durable queues表示是非持久化的内存队列。Exclusivity表示独占队列,即表示队列只能由声明该队列的Connection连接来进行使用,包括队列创建、删除、收发消息等,并且独占队列会在声明该队列的Connection断开后自动删除。

       特例Poison Message handling(处理有毒的消息)。所谓毒消息是指消息一直不能被消费者正常消费(可能是由于消费者失败或者消费逻辑有问题等),就会导致消息不断的重新入队,这样这些消息就成为了毒消息。这些读消息应该有保障机制进行标记并及时删除。Quorum队列会持续跟踪消息的失败投递尝试次数,并记录在"x-delivery-count"这样一个头部参数中。然后,就可以通过设置 Delivery limit参数来定制一个毒消息的删除策略。当消息的重复投递次数超过了Delivery limit参数阈值时,RabbitMQ就会删除这些毒消息。当然,如果配置了死信队列的话,就会进入对应的死信队列。  

       Quorum队列更适合于长期存在的队列,并且在对容错、数据安全方面有更严格要求的场景。相对于追求低延迟、非持久化等高级队列,Quorum队列提供了更可靠的数据复制机制,以满足对数据一致性和高可用性的要求。

不适合使用的场景:
临时使用的队列:
        比如transient临时队列,exclusive独占队列,或者经常会修改和删除的队列。
对消息低延迟要求高:
       一致性算法会影响消息的延迟。
对数据安全性要求不高:
       Quorum队列需要消费者手动通知或者生产者手动确认。
队列消息积压严重 :
      如果队列中的消息很大,或者积压的消息很多,就不要使用Quorum队列。Quorum队列当前会将所有消息始终保存在内存中,直到达到内存使用极限。


Stream流式队列

​       Stream队列是RabbitMQ自3.9.0版本开始引入的一种新的数据队列类型。这种队列类型的消息是持久化到磁盘并且具备分布式备份的,更适合于消费者多,读消息非常频繁的场景。

       Stream队列的核心是以append-only只添加的日志来记录消息,整体来说,就是消息将以append-only的方式持久化到日志文件中,然后通过调整每个消费者的消费进度offset,来实现消息的多次分发。下方有几个属性也都是来定义日志文件的大小以及保存时间。如果你熟悉Kafka或者RocketMQ,会对这种日志记录消息的方式非常熟悉。这种队列提供了RabbitMQ已有的其他队列类型不太好实现的四个特点:

large fan-outs 大规模分发
       
当想要向多个订阅者发送相同的消息时,以往的队列类型必须为每个消费者绑定一个专用的队列。如果消费者的数量很大,这就会导致性能低下。而Stream队列允许任意数量的消费者使用同一个队列的消息,从而消除绑定多个队列的需求。
Replay/Time-travelling 消息回溯
​       RabbitMQ已有的这些队列类型,在消费者处理完消息后,消息都会从队列中删除,因此,无法重新读取已经消费过的消息。而Stream队列允许用户在日志的任何一个连接点开始重新读取数据。
Throughput Performance 高吞吐性能
​        Strem队列的设计以性能为主要目标,对消息传递吞吐量的提升非常明显。
Large logs 大日志
​       RabbitMQ一直以来有一个让人诟病的地方,就是当队列中积累的消息过多时,性能下降会非常明显。但是Stream队列的设计目标就是以最小的内存开销高效地存储大量的数据。使用Stream队列可以比较轻松的在队列中积累百万级别的消息。

       整体上来说,RabbitMQ的Stream队列,其实有很多地方借鉴了其他MQ产品的优点,在保证消息可靠性的基础上,着力提高队列的消息吞吐量以及消息转发性能。因此,Stream也是在视图解决一个RabbitMQ一直以来,让人诟病的缺点,就是当队列中积累的消息过多时,性能下降会非常明显的问题。RabbitMQ以往更专注于企业级的内部使用,但是从这些队列功能可以看到,Rabbitmq也在向更复杂的互联网环境靠拢,未来对于RabbitMQ的了解,也需要随着版本推进,不断更新。


如何使用不同类型的队列​

这几种不同类型的队列,虽然实现方式各有不同,但是本质上都是一种存储消息的数据结构。

Quorum队列

      Quorum队列与Classic队列的使用方式是差不多的。最主要的差别就是在声明队列时有点不同。如果要声明一个Quorum队列,则只需要在后面的arguments中传入一个参数,x-queue-type,参数值设定为quorum

Map<String,Object> params = new HashMap<>();
params.put("x-queue-type","quorum");
//声明Quorum队列的方式就是添加一个x-queue-type参数,指定为quorum。默认是classic
channel.queueDeclare(QUEUE_NAME, true, false, false, params);

       Quorum队列的消息是必须持久化的,所以durable参数必须设定为true,如果声明为false,就会报错。同样,exclusive参数必须设置为false。这些声明,在Producer和Consumer中是要保持一致的。

Stream队列

​        Stream队列相比于Classic队列,在使用上就要稍微复杂一点。如果要声明一个Stream队列,则 x-queue-type参数要设置为 stream 。

        Map<String,Object> params = new HashMap<>();
        params.put("x-queue-type","stream");
        params.put("x-max-length-bytes", 20_000_000_000L); // maximum stream size: 20 GB
        params.put("x-stream-max-segment-size-bytes", 100_000_000); // size of segment files: 100 MB
        channel.queueDeclare(QUEUE_NAME, true, false, false, params);

       与Quorum队列类似, Stream队列的durable参数必须声明为true,exclusive参数必须声明为false。这其中,x-max-length-bytes 表示日志文件的最大字节数。x-stream-max-segment-size-bytes 每一个日志文件的最大大小。这两个是可选参数,通常为了防止stream日志无限制累计,都会配合stream队列一起声明。

当要消费Stream队列时,要重点注意他的三个必要的步骤:

1. channel必须设置basicQos属性。 与Spring框架集成使用时,channel对象可以在@RabbitListener声明的消费者方法中直接引用,Spring框架会进行注入。

2. 正确声明Stream队列。 在Queue对象中传入声明Stream队列所需要的参数。

3. 消费时需要指定offset。 与Spring框架集成时,可以通过注入Channel对象,使用原生API传入offset属性。

       例如用原生API创建Stream类型的Consumer时,还必须添加一个参数x-stream-offset,表示从队列的哪个位置开始消费。  

 	Map<String,Object> consumeParam = new HashMap<>();
    consumeParam.put("x-stream-offset","last");
    channel.basicConsume(QUEUE_NAME, false,consumeParam, myconsumer);

x-stream-offset的可选值有以下几种:
first: 从日志队列中第一个可消费的消息开始消费
last: 消费消息日志中最后一个消息
next: 相当于不指定offset,消费不到消息。
Offset: 一个数字型的偏移量
Timestamp:一个代表时间的Data类型变量,表示从这个时间点开始消费。例如 一个小时前 Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)

       由于在Consumer中必须传入x-stream-offset这个参数,所以在与SpringBoot集成时,stream队列目前暂时无法正常消费。在目前版本下,使用RabbitMQ的SpringBoot框架集成,可以正常声明Stream队列,往Stream队列发送消息,但是无法直接消费Stream队列了。

       SpringBoot框架集成RabbitMQ后,为了简化编程模型,就把channel,connection等这些关键对象给隐藏了,目前框架下,无法直接接入这些对象的注入过程,所以无法直接使用。

       如果非要使用Stream队列,那么有两种方式,一种是使用原生API的方式,在SpringBoot框架下自行封装。另一种是使用RabbitMQ的Stream 插件。在服务端通过Strem插件打开TCP连接接口,并配合单独提供的Stream客户端使用。这种方式对应用端的影响太重了,并且并没有提供与SpringBoot框架的集成,还需要自行完善,因此Stream队列使用较少。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1420298.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Algorithms 4】算法(第4版)学习笔记 01 - 1.5 案例研究:union-find算法

文章目录 前言参考目录学习笔记1&#xff1a;动态连通性2&#xff1a;UF 实现 1&#xff1a;快速查找 quick-find2.1&#xff1a;demo 演示 12.2&#xff1a;demo 演示 22.3&#xff1a;quick-find 代码实现3&#xff1a;UF 实现 2&#xff1a;快速合并 quick-union3.1&#xf…

Powershell 并发任务 | Runspace 线程 | 结果获取

介绍 在 PowerShell 中进行多任务处理&#xff08;Multithreading 或 Parallel Processing&#xff09;主要目的是提高脚本的执行效率和性能。对于需要处理大量数据或执行多个独立任务的脚本来说尤其有用。 提高性能&#xff1a; 多任务处理允许脚本同时执行多个任务&#xff…

Centos7 双机单网卡安装 OpenStack

虚拟机配置 1&#xff1a;准备虚拟机2台&#xff0c;配置如下 openstack master----192.168.20.205 2cpu&#xff0c;8G内存&#xff0c;200G硬盘&#xff0c;网络桥接方式--静态IP----单网卡 node1计算节点---192.168.20.215 2cpu&#xff0c;8G内存&#xff0c;200G硬盘&a…

数据结构【初阶】--排序(归并排序和基数排序)

目录 一.归并排序的非递归写法 1.思想应用 2.代码基本实现 (1)单趟归并逻辑 (2)多趟&#xff08;循环&#xff09;的控制条件 ① 迭代条件&#xff1a;i2*gap ② 结束条件&#xff1a;i(或i<n-2*gap)<> (3)代码展示 ① 单趟逻辑 ②整体逻辑 3.优化代码…

《HTML 简易速速上手小册》第6章:HTML 语义与结构(2024 最新版)

文章目录 6.1 语义化标签的重要性6.1.1 基础知识6.1.2 案例 1&#xff1a;使用 <article>, <section>, <aside>, <header>, 和 <footer>6.1.3 案例 2&#xff1a;构建带有嵌套语义化标签的新闻网站6.1.4 案例 3&#xff1a;创建一个带有 <mai…

HAL库之看门狗

一、看门狗的介绍 &#xff08;1&#xff09;独立看门狗(IWDG)&#xff1a;独立看门狗由专用的低速时钟(LSI)驱动&#xff0c;即使主时钟发生故障它也仍然有效。因此叫独立&#xff0c;同时因此在低功耗模式下不能启动看门狗&#xff0c;低功耗详情见之前文章。IWDG比WWDG更精…

macos Android平台签名证书(.keystore)

一、申请appid的使用说明&#xff08;有appid的请忽略申请appid&#xff09; 创建应用 申请的appid在源码视图填写后会自动生成一个对应的包名 ⚠️注意&#xff1a;申请appid的时候应用名称和项目名称保持一致。 二、 Android如何使用自用证书进行打包 1.找到安装jdk的路径…

D25XB100-ASEMI整流桥D25XB100参数、封装、规格

编辑&#xff1a;ll D25XB100-ASEMI整流桥D25XB100参数、封装、规格 型号&#xff1a;D25XB100 品牌&#xff1a;ASEMI 正向电流&#xff08;Id&#xff09;&#xff1a;25A 反向耐压&#xff08;VRRM&#xff09;&#xff1a;1000V 正向浪涌电流&#xff1a;350A 正向…

win10设置重启自动运行

大家知道windows系列非常不稳定&#xff0c;经常更新&#xff0c;蓝屏&#xff0c;死机等。 所以服务部署在上面会经常挂掉。当他重启后&#xff0c;对应的服务要是没有开启成功就会出问题。 所以我们需要在重启后启动。 1.首先把你想要的执行的程序写一个bat脚本 这个是我的…

C语言第十三弹---VS使用调试技巧

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】 VS调试技巧 1、什么是bug 2、什么是调试&#xff08;debug&#xff09;&#xff1f; 3、Debug和Release​编辑​ 4、VS调试快捷键 4.1、环境准备 4.2、调试…

学习Spring的第十二天

Bean基本注解开发 创建一个空Maven项目: 创建完如下 之后在pom文件配置坐标 <dependencies><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.3.7</version></depe…

翻译: GPT-4 Vision征服LLM幻觉hallucinations 升级Streamlit六

GPT-4 Vision 系列: 翻译: GPT-4 with Vision 升级 Streamlit 应用程序的 7 种方式一翻译: GPT-4 with Vision 升级 Streamlit 应用程序的 7 种方式二翻译: GPT-4 Vision静态图表转换为动态数据可视化 升级Streamlit 三翻译: GPT-4 Vision从图像转换为完全可编辑的表格 升级St…

Windows系统安装OpenSSH+VS Code结合内网穿透实现远程开发

文章目录 前言1、安装OpenSSH2、vscode配置ssh3. 局域网测试连接远程服务器4. 公网远程连接4.1 ubuntu安装cpolar内网穿透4.2 创建隧道映射4.3 测试公网远程连接 5. 配置固定TCP端口地址5.1 保留一个固定TCP端口地址5.2 配置固定TCP端口地址5.3 测试固定公网地址远程 前言 远程…

Netty-ChannelHandle的业务处理

ChannelHandle结构 ChannelHandler基础接口 基础接口里面定义的基础通用方法。增加handler&#xff0c;移除handler&#xff0c;异常处理。 ChannelInboundHandler public interface ChannelInboundHandler extends ChannelHandler {/*** The {link Channel} of the {link Ch…

学习PyQt5

1、布局之后&#xff0c;无法移动对象到指定区域&#xff0c;无法改变对象大小。 原因&#xff1a;因为CtrlA选中了整个窗口&#xff0c;然后布局的时候就相当于整个窗口都按照这种布局&#xff0c;如选了水平布局&#xff0c;按钮一直在中间&#xff0c;无法拖到其它位置。 …

如何安装配置HFS并实现无公网ip远程访问本地电脑共享文件

文章目录 前言1.下载安装cpolar1.1 设置HFS访客1.2 虚拟文件系统 2. 使用cpolar建立一条内网穿透数据隧道2.1 保留隧道2.2 隧道名称2.3 成功使用cpolar创建二级子域名访问本地hfs 总结 前言 在大厂的云存储产品热度下降后&#xff0c;私人的NAS热度快速上升&#xff0c;其中最…

那些年与指针的情仇(二)---二级指针指针与数组的那点事函数指针

关注小庄 顿顿解馋(&#xff61;&#xff65;∀&#xff65;)&#xff89;&#xff9e; 欢迎回到我们的大型纪录片《那些年与指针的爱恨情仇》&#xff0c;在本篇博客中我们将继续了解指针的小秘密&#xff1a;二级指针&#xff0c;指针与数组的关系以及函数指针。请放心食用&a…

【Kafka】服务器Broker与Controller详解

这里写自定义目录标题 Broker概述Broker总体工作流程Broker重要参数 Controller为什么需要Controller具体作用数据服务Leader选举选举流程脑裂问题羊群效应触发leader选举 Broker 概述 Kafka服务实例&#xff0c;负责消息的持久化、中转等功能。一个独立的Kafka 服务器被就是…

[N-135]基于springboot,vue高校图书馆管理系统

开发工具&#xff1a;IDEA 服务器&#xff1a;Tomcat9.0&#xff0c; jdk1.8 项目构建&#xff1a;gradle-5.6.4 数据库&#xff1a;mysql5.7 系统分前后台&#xff0c;项目采用前后端分离 前端技术&#xff1a;vueelementUI 服务端技术&#xff1a;springbootmybatisred…

【HarmonyOS应用开发】ArkUI 开发框架-基础篇-第一部分(七)

常用基础组件 一、组件介绍 组件&#xff08;Component&#xff09;是界面搭建与显示的最小单位&#xff0c;HarmonyOS ArkUI声明式开发范式为开发者提供了丰富多样的UI组件&#xff0c;我们可以使用这些组件轻松的编写出更加丰富、漂亮的界面。组件根据功能可以分为以下五大类…