13-14-15-RabbitMq工作模式深度剖析与Spring整合MQ以及RabbitMq高级特性

news2024/12/26 11:36:14

RabbitMQ消息传递流程
连接( Connection)
在RabbitMQ中,生产者和消费者与RabbitMQ的通信就是基于TCP连接的。不过呢我们知道TCP连接的创建和销毁在高并发场景下对于操作系统来说都是特别昂贵的开销,所以RabbitMQ又引入了信道的概念
信道(Channel)
信道是一个类似于NIO(一种TCP多路复用技术)的技术,在RabbitMQ中每个生产者、消费者线程各把持一个信道,多个信道复用了同一个TCP 连接。当每个信道的流量不是很大时,复用单连接可以在产生性能瓶颈的情况下有效地节 TCP 连接资源。当信道本身的流量很大时,就会开辟多连接,将这些信道均摊到这些连接中

消息流转过程

生产者消息投递过程
生产者连接到Broker 建立一个连接,然后开启一个信道
接着生产者声明一个交换器 ,并设置相关属性,比如交换机类型、是否持久化、是否自动删除、是否内置等
生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除、消息最大过期时间、消息最大长度、消息最大字节数等
生产者通过路由键将交换器和队列绑定起来
生产者发送消息至Broker ,发送的消息包含消息体和含有路由键、交换器、优先级、是否持久化、过期时间、延时时间等信息的标签
相应的交换器根据接收到的路由键查找相匹配的队列如果找到 ,则将从生产者发送过来的消息存入相应的队列中
如果没有找到 ,则根据生产者配置的属性选择丢弃还是回退给生产者
关闭信道
关闭连接
消费者消费消息过程
消费者连接到Broker ,建立一个连接,开启一个信道
消费者向 RabbitMQ Broker 请求消费相应队列中的消息,在这个过程中可能会设置消费者标签、是否自动确认、是否排他等
等待 RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。
消费者确认接收到的消息
RabbitMQ从队列中删除相应己经被确认的消息
关闭信道
关闭连接。

消息怎么路由?
从概念上来说,消息路由必须有三部分:交换器、路由、绑定。生产者把消息发布到交换器上;绑定决定了消息如何从路由器路由到特定的队列;消息最终到达队列,并被消费者接收。
消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列路由键,可以把队列绑定到交换器上。消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则)。如果能够匹配到队列,则消息会投递到相应队列中;如果不能匹配到任何队列,消息将进入 “黑洞”。
常用的交换器主要分为一下三种:
direct:如果路由键完全匹配,消息就被投递到相应的队列
fanout:如果交换器收到消息,将会广播到所有绑定的队列上
topic:可以使来自不同源头的消息能够到达同一个队列。使用topic交换器时,可以使用通配符。
比如:“*” 匹配特定位置的任意文本, “.” 把路由键分为了几部分,“#” 匹配所有规则等。
特别注意:发往topic交换器的消息不能随意的设置选择键(routing_key),必须是由"."隔开的一系列的标识符组成。

如何避免消息重复投递或重复消费?
消息重复投
在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列;
重复消费
在消息消费时,要求消息体中必须要有一个唯一的bizId作为去重和幂等的依据,避免同一条消息被重复消费。

如何解决丢数据的问题?
1.生产者丢数据
生产者的消息没有投递到MQ中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
transaction机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。
然而缺点就是吞吐量下降了。因此,生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。
2.消息队列丢数据
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
那么如何持久化呢
①、将queue的持久化标识durable设置为true,则代表是一个持久的队列
②、发送消息的时候将deliveryMode=2 标记成持久化。这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉)
3.消费者丢数据
启用手动确认模式可以解决这个问题
rabbitmq如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,rabbitmq认为你都消费了,这数据就丢了。
这个时候得用rabbitmq提供的ack机制,简单来说,就是你关闭rabbitmq自动ack,可以通过一个api来调用就行,然后每次你自己代码里确保处理完的时候,再程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那rabbitmq就认为你还没处理完,这个时候rabbitmq会把这个消费分配给别的consumer去处理,消息是不会丢的。

多个消费者监听一个队列时,消息如何分发?
轮询: 默认的策略,消费者轮流,平均地接收消息
公平分发: 根据消费者的能力来分发消息,给空闲的消费者发送更多消息
//当消费者有x条消息没有响应ACK时,不再给这个消费者发送消息

如何保证消息的顺序性
一个队列只有一个消费者的情况下才能保证顺序,否则只能通过全局ID实现(每条消息都一个msgId,关联的消息拥有一个parentMsgId。可以在消费端实现前一条消息未消费,不处理下一条消息;也可以在生产端实现前一条消息未处理完毕,不发布下一条消息)

RabbitMQ的集群模式和集群节点类型
1:单一模式:
即单机情况不做集群,就单独运行一个rabbitmq而已。
2:普通模式:
默认模式,以两个节点(rabbit01、rabbit02)为例来进行说明。对于Queue来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连rabbit01或rabbit02,出口总在rabbit01,会产生瓶颈。当rabbit01节点故障后,rabbit02节点无法取到rabbit01节点中还未消费的消息实体。如果做了消息持久化,那么得等rabbit01节点恢复,然后才可被消费;如果没有持久化的话,就会产生消息丢失的现象。
3:镜像模式:
把需要的队列做成镜像队列,存在与多个节点属于,RabbitMQ的HA方案。该模式解决了普通模式中的问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在客户端取数据时临时拉取。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。所以在对可靠性要求较高的场合中适用。

死信队列和延迟队列的使用
死信队列
其实和普通的队列没啥大的区别,都需要创建自己的Queue、Exchange,然后通过RoutingKey绑定到Exchange上去,只不过死信队列的RoutingKey和Exchange要作为参数,绑定到正常的队列上去,一种应用场景是正常队列里面的消息被basicNack或者reject时,消息就会被路由到正常队列绑定的死信队列中,还有一种还有常用的场景就是开启了自动签收,然后消费者消费消息时出现异常,超过了重试次数,那么这条消息也会进入死信队列,–补充:DLX,全称为
Dead-Letter-Exchange,死信交换器,死信邮箱。当消息在一个队列中变成死信 (
dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。
延时队列
顾名思义,不是及时的队列,也就是发送者发给的消息要延时一段时间,消费者才能接受的到,这里有个典型的应用场景就是订单30分钟内未支付就关闭订单,当然死信队列也是可以实现的,这里只演示消息的延时消费逻辑,订单逻辑就一个判断,这里不做讨论。

消息的可靠投递
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
confirm 确认模式
return 退回模式

rabbitmq 整个消息投递的路径为:
producer—>rabbitmq broker—>exchange—>queue—>consumer
消息从 producer 到 exchange 则会返回一个 confirmCallback 。
消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。
我们将利用这两个 callback 控制消息的可靠性投递。

消息的可靠投递小结
设置ConnectionFactory的publisher-confirms=“true” 开启 确认模式。

使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

设置ConnectionFactory的publisher-returns=“true” 开启 退回模式。

使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。

Consumer Ack
ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:
自动确认:acknowledge=“none”
手动确认:acknowledge=“manual”
根据异常情况确认:acknowledge=“auto”

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

Consumer Ack 小结
在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认

如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息

如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。
消息可靠性总结
持久化
exchange要持久化
queue要持久化
message要持久化
生产方确认Confirm
消费方确认Ack
Broker高可用

TTL
TTL 全称 Time To Live(存活时间/过期时间)。
当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
TTL 小结
设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。

设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。

如果两者都进行了设置,以时间短的为准。

死信队列
信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
在这里插入图片描述
消息成为死信的三种情况:

  1. 队列消息长度到达限制;

  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

队列绑定死信交换机
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
在这里插入图片描述

死信队列小结

  1. 死信交换机和死信队列和普通的没有区别

  2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

  3. 消息成为死信的三种情况:
    队列消息长度到达限制;
    消费者拒接消费消息,并且不重回队列;
    原队列存在消息过期设置,消息到达超时时间未被消费;

延迟队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
需求:

  1. 下单后,30分钟未支付,取消订单,回滚库存。
  2. 新用户注册成功7天后,发送短信问候。
    实现方式:
    定时器
    延迟队列
    在这里插入图片描述
    很可惜,在RabbitMQ中并未提供延迟队列功能。
    但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
    在这里插入图片描述
    延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
    RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。

消息幂等性保障
等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

消息幂等性保障–乐观锁机制
在这里插入图片描述
消息积压
-消费者宕机积压
-消费者消费能力不足积压
-发送者发流量太大

解决方案:上线更多的消费者,进行正常消费上线专门的队列消费服务,将消息先批量取出来,记录数据库,再慢慢处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/115582.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

云原生之使用Docker部署轻量级web服务器lighthttpd

云原生之使用Docker部署轻量级web服务器lighthttpd一、Lighthttpd介绍二、检查系统版本三、检查docker状态四、下载lighthttpd镜像五、部署lighthttpd1.创建数据目录2.创建lighthttpd容器3.查看容器状态六、访问lighthttpd服务七、编辑index.html1.编辑index.html文件2.重新访问…

Hadoop大数据存算分离方案:计算层无缝对接存储系统

Hadoop的诞生改变了企业对数据的存储、处理和分析的过程,加速了大数据的发展。随着大数据系统建设的深入,企业的数据基础设施易出现计算资源浪费、存储性能低、管理成本过高等挑战。相比存算一体架构,存算分离架构具有性能与成本最优、兼具灵…

3D地图app

3D三维地图APP 发布时间:2018-07-19 版权: 3D地图依据高程数据等对地表进行渲染,实现地表的起伏,模拟出真实的三维场景,让你有如身临其境般的感觉。 (注:Bigemap 3D地图是一个三维地图浏览功能…

项目沟通怎么才能不像在吵架?

项目沟通并非吵架,看起来却总是剑拔弩张。有效沟通才能真正解决问题,笔者给出了一些实用的建议,从对象到场景,再到方法与技巧,应该在沟通中有针对性地注意这些问题。 沟通是个老话题,在项目管理中有专门讲沟…

draw.io使用教程

大部分的绘图应用都离不开三个基本的元素,图形,链接,文本。每个元素都有基本的操作和样式,元素与元素之间又可以进行组合,“三生万物”,生成各种各样的图表。 如果没有这款绘图的 可以点击获取 : drawio文…

企业项目管理的不同与好处

大型企业组织通常同时运行多个复杂项目。尽管这些项目看起来不一定相互关联,但它们都会影响同一个企业组织。企业项目管理(EPM)是指在公司范围内管理项目的实践。它通常涉及实施战略和流程,以大规模简化和提高项目管理的有效性。根据项目管理协会(PMI)的…

burpsuite靶场——XXE

文章目录什么是XML?什么是XML实体?什么是文档类型定义(DTD)?什么是XML自定义实体?什么是XML外部实体?使用外部实体利用 XXE 来检索文件利用 XXE 执行 SSRF 攻击盲XXE漏洞带外交互的盲 XXE过 XML 参数实体进行带外交互的…

【AJAX】AJAX的跨域问题

AJAX的跨域问题跨域的概述区别同源与不同源同源策略有什么用?AJAX跨域解决方案方案一、设置响应头方案二、jsonp方案三、代理机制(httpclient)跨域的概述 跨域是指从一个域名的网页去请求另一个域名的资源。比如从百度(https://ba…

WPF控件模板、数据模板、容器样式选择器

WPF控件模板 利用Tag来绑定控件模板内容 <!--模板定义--> <Style x:Key"ButtonStyle1" TargetType"{x:Type Button}"><Setter Property"Template"><Setter.Value><ControlTemplate TargetType"{x:Type Button…

声音事件检测metric:PSDS

论文&#xff1b;A FRAMEWORK FOR THE ROBUST EVALUATION OF SOUND EVENT DETECTION Abstract 这项工作为多声道声音事件检测&#xff08;SED&#xff09;系统的性能评估定义了一个新的框架&#xff0c;它克服了传统的collar-based事件决定、事件F-cores和事件错误率的限制。…

【Kotlin 协程】Flow 流组合 ( Flow#zip 组合多个流 | 新组合流的元素收集间隔与被组合流元素发射间隔的联系 )

文章目录一、Flow 流组合1、Flow#zip 组合多个流2、新组合流的元素收集间隔与被组合流元素发射间隔的联系一、Flow 流组合 1、Flow#zip 组合多个流 调用 Flow#zip 函数 , 可以将两个 Flow 流合并为一个流 ; Flow#zip 函数原型 : /*** 将来自当前流( this )的值压缩到[其他]流&…

第二十六章 数论——欧拉函数(详解与证明)

第二十六章 数论——欧拉函数&#xff08;详解与证明&#xff09;欧拉函数1、互质2、欧拉函数的定义3、欧拉函数的公式4、欧拉函数的证明5、欧拉函数的使用&#xff08;1&#xff09;问题一&#xff1a;思路代码&#xff08;2&#xff09;问题二&#xff1a;思路case1case1case…

2022/12/17 MySQL索引失效的底层原理

1 复合索引-最左前缀原理 where子句中使用最频繁的一列放在最左边&#xff1b;我们在&#xff08;a,b,c&#xff09;字段上建了一个联合索引&#xff0c;所以这个索引是先按a 再按b 再按c进行排列的&#xff0c;所以&#xff1a;以下的查询方式都可以用到索引 select * from …

emacs下安装eaf

emacs下安装eaf插件 原因 eaf插件一开始还有点排斥&#xff0c;觉得emacs终端下操作多好多流畅。想要浏览器&#xff0c;终端和pdf再快速切换就可以了&#xff0c;毕竟我用i3wm/yabai窗口管理器。 但是想到当初也是vim用的多学得多&#xff0c;emacs就不愿意去接触学习&#…

Linux系统下的压缩和解压指令

Linux系统下的压缩和解压指令 gzip/gunzip指令 gzip&#xff1a;用于压缩文件&#xff1b;gunzip&#xff1a;用于解压的 语法&#xff1a;gzip file 以及 gunzip file.gz (压缩文件&#xff0c;只能将文件压缩为*.gz文件) gzip /home/hello.txt: gzip压缩&#xff0c;将/home下…

Android设计模式详解之观察者模式

前言 观察者常用于订阅-发布系统&#xff0c;能够将观察者和被观察者进行解耦&#xff0c;降低两者之间的依赖&#xff1b; 定义&#xff1a;定义对象间一种一对多的依赖关系&#xff0c;使得每当一个对象改变状态时&#xff0c;则所有依赖于它的对象都会得到通知并被自动更新…

湖南人与江西人,关系有多密切?回顾四省填湘的历史

现在的湖南人&#xff0c;可能90%都是四省填湘的移民后代&#xff0c;这种说法可信吗&#xff1f;而湖南人又与江西人的关系有多密切呢&#xff1f;我们来回顾一下四省填湘的历史吧。 四省填湘的背景 在历史上&#xff0c;湖南属于广义上的荆楚之地&#xff0c;是春秋战国时…

图解深度学习-神经网络

深度学习 深度学习是一种统计学习方法&#xff0c;可以在大量数据中自动提取关键特征信息。 深度学习的分类 深度学习的起源有感知器和基于图模型的玻尔兹曼机。在这两个的基础上引入多层结构形成了现在的深度学习。 神经网络的历史 M-P模型和感知器模型 M-P模型是首个通过…

中断和中断系统

目录 中断的引入 中断的概念 中断源&#xff08;2018年&#xff09;背会 中断系统的功能 1&#xff1a;实现中断及返回 2&#xff1a;实现优先权排队&#xff08;中断判优&#xff09; 3&#xff1a;高级中断源能中断低级的中断处理 CPU对外部可屏蔽中断的响应以及中断过…

AtCoder Grand Contest 060 A - No Majority

比赛名称&#xff1a;AtCoder Grand Contest 060 比赛链接&#xff1a;AtCoder Grand Contest 060 A - No Majority 题意&#xff1a; 一个由小写英文字母组成的字符串x被认为是好的&#xff0c;当且仅当以下条件得到满足。 x的每一个长度为2或更大的&#xff08;连续的&am…