一、基本结构
所有中间件技术都是基于 TCP/IP 协议基础之上进行构建新的协议规范,RabbitMQ遵循的是AMQP协议(Advanced Message Queuing Protocol - 高级消息队列协议)。
生产者发送消息流程:
- 1、生产者和
Broker
建立TCP
连接; - 2、生产者和
Broker
建立通道; - 3、生产者通过通道消息发送给
Broker
,由Exchange
将消息进行转发; - 4、
Exchange
将消息转发到指定的Queue
(队列)。
【详细】
1、消息生产者连接到`RabbitMQ Broker`,建立链接(Connection),在链接(Connection)上开启一个信道(Channel);
2、声明一个交换机(Exchange),并设置相关属性,比如交换机类型、是否持久化等;
3、声明一个队列(Queue),并设置相关属性,比如是否排他、是否持久化、是否自动删除等;
4、使用路由键(RoutingKey)将队列(Queue)和交换机(Exchange)绑定起来;
5、生产者发送消息至 RabbitMQ Broker,其中包含路由键、交换器等信息,根据路由键(RoutingKey)发送消息到交换机(Exchange);
6、相应的交换器(Exchange)根据接收到的路由键(RoutingKey)查找相匹配的队列如果找到 ,则将从生产者发送过来的消息存入相应的队列中;
7、如果没有找到 ,则根据生产者配置的属性选择丢弃还是回退给生产者;
8、关闭信道(Channel);
9、关闭链接(Connection);
消费者接收消息流程:
- 1、消费者和
Broker
建立TCP
连接; - 2、消费者和
Broker
建立通道; - 3、消费者监听指定的
Queue
(队列); - 4、当有消息到达
Queue
时Broker
默认将消息推送给消费者; - 5、消费者接收到消息;
- 6、
ack
回复。
【详细】
- 1、建立链接(Connection);
- 2、在链接(Connection)上开启一个信道(Channel);
- 3、请求消费指定队列(Queue)的消息,并设置回调函数(onMessage);
- 4、[MQ]将消息推送给消费者,消费者接收消息;
- 5、消费者发送消息确定(Ack[acknowledge]);
- 6、[MQ]删除被确认的消息;
- 7、关闭信道(Channel);
- 8、关闭链接(Connection);
MQ消费消息分发原理
1)一种是Pull模式,对应的方法是basicGet。
消息存放在服务端,只有消费者主动获取才能拿到消息。如果每搁一段时间获取一次消息,消息的实时性会降低。
但是好处是可以根据自己的消费能力决定消息的频率。
2)另一种是push,对应的方法是BasicConsume,只要生产者发消息到服务器,就马上推送给消费者,
消息保存客户端,实时性很高,如果消费不过来有可能会造成消息积压。Spring AMQP是push方式,
通过事件机制对队列进行监听,只要有消息到达队列,就会触发消费消息的方法。
二、RabbitMQ组成部分说明
-
Producer: 消息生产者,即生产方客户端,生产方客户端将消息发送;
-
Connection: TCP连接,生产者或消费者与消息队列RabbitMQ版间的物理TCP连接;
1)Connection会执行认证、IP解析、路由等底层网络任务。
2)应用与消息队列RabbitMQ版完成Connection建立大约需要15个TCP报文交互,因而会消耗大量的网络资源和消息队列RabbitMQ版资源。
3)一个进程对应一个Connection,一个进程中的多个线程则分别对应一个Connection中的多个Channel。
4)Producer和Consumer分别使用不同的Connection进行消息发送和消费; -
Channel: 在客户端的每个物理TCP连接里,可建立多个Channel,每个Channel代表一个会话任务。
1)Channel是物理TCP连接中的虚拟连接。
2)当应用通过Connection与消息队列RabbitMQ版建立连接后,所有的AMQP协议操作(例如创建队列、发送消息、接收消息等)都会通过Connection中的Channel完成。
3) Channel可以复用Connection,即一个Connection下可以建立多个Channel。
4) Channel不能脱离Connection独立存在,而必须存活在Connection中。
5) 当某个Connection断开时,该Connection下的所有Channel都会断开。 -
Broker: 消息队列服务进程,此进程包括两个部分:Exchange和Queue;
-
Exchange(交换器): 生产者将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中。Exchange根据消息的属性或内容路由消息。
-
Queue: 消息队列,存储消息的队列,每个消息都会被投入到一个或多个Queue里;
-
Consumer: 消息消费者,即消费方客户端,接收MQ转发的消息;
-
Routing Key(路由键): 生产者在向Exchange发送消息时,需要指定一个Routing Key来设定该消息的路由规则。 Routing Key需要与Exchange类型及Binding Key联合使用才能生效。一般情况下,生产者在向Exchange发送消息时,可以通过指定Routing Key来决定消息被路由到哪个或哪些Queue;
-
Binding: 一套绑定规则,用于告诉Exchange消息应该被存储到哪个Queue。它的作用是把Exchange和Queue按照路由规则绑定起来。
-
Binding Key(绑定键): 用于告知Exchange应该将消息投递到哪些Queue中(生产者将消息发送给哪个Exchange是需要由RoutingKey决定的,生产者需要将Exchange与哪个队列绑定时需要由BindingKey决定的);
-
Virtual Host: 虚拟主机,本质上是一个mini版的RabbitMQ服务器,拥有自己的队列、交换机、绑定和权限机制,vhost是共享相同的身份认证和加密环境的独立服务器域。vhost是AMQP的基础,必须在连接时指定,RabbitMQ默认的vhost是/。
三、交换模式
Direct Exchange(直连模式)
【路由规则】 Direct Exchange根据Binding Key和Routing Key完全匹配的规则路由消息。
【使用场景】 Direct Exchange适用于通过简单字符标识符区分消息的场景。
Direct Exchange常用于单播路由。
Fanout Exchange(广播模式)
【路由规则】 Fanout Exchange忽略Routing Key和Binding Key的匹配规则,将消息路由到所有绑定的Queue。
【使用场景】 Fanout Exchange适用于广播消息的场景。例如,分发系统使用Fanout Exchange来广播各种状态和配置更新。
Topic Exchange(主题模式)
【路由规则】 Topic Exchange根据Binding Key和Routing Key通配符匹配的规则路由消息。
Topic Exchange支持的通配符包括星号(*
)和井号(#
)。 星号(*
)代表一个英文单词(例如cn)。 井号(#
)代表零个、一个或多个英文单词,英文单词间通过英文句号(.)分隔,例如cn.zj.hz。
【使用场景】 Topic Exchange适用于通过通配符区分消息的场景。
Topic Exchange常用于多播路由。例如,使用Topic Exchange分发有关于特定地理位置的数据。
Headers Exchange (头部交换机)
【路由规则】 Headers Exchange可以被视为Direct Exchange的另一种表现形式。
Headers Exchange可以像Direct Exchange一样工作,不同之处在于Headers Exchange使用Headers属性代替Routing Key进行路由匹配。
在绑定Headers Exchange和Queue时,可以设置绑定属性的键值对。然后,在向Headers Exchange发送消息时,设置消息的Headers属性键值对。
Headers Exchange将根据消息Headers属性键值对和绑定属性键值对的匹配情况路由消息。
匹配算法由一个特殊的绑定属性键值对控制。该属性为x-match,只有以下两种取值:
- 1)all:所有除x-match以外的绑定属性键值对必须和消息Headers属性键值对匹配才会路由消息。
- 2)any:只要有一组除x-match以外的绑定属性键值对和消息Headers属性键值对匹配就会路由消息。
以下两种情况下,认为消息Headers属性键值对和绑定属性键值对匹配:- 1、 消息Headers属性的键和值与绑定属性的键和值完全相同;
- 2、 消息Headers属性的键和绑定属性的键完全相同,但绑定属性的值为空。
【使用场景】 Headers Exchange适用于通过多组Headers属性区分消息的场景。Headers Exchange常用于多播路由。例如,涉及到分类或者标签的新闻更新。
生产者确认机制
1、确认原理
生产者将消息发送到exchange,exchange根据路由规则将消息投递到了queue。
- 1)Confirm确认:生产者发送消息到交换机时会存在消息丢失的情景,开启事务会导致吞吐量下降,Confirm机制就是消息发送到交换机(Exchange)时会触发Confirm回调。通过 publisher confirm (发送方确认机制)可以确定消息是否被成功路由到MQ broker从而选择是否重发等步骤。当生产者开启 publisher confirm 消息发送到MQ端之后,MQ会回一个ack给生产者,ack是个boolean值,为true消息成功发送到MQ。反之发送失败。
- 2)Return确认:从交换机到队列也有可能出现路由失败导致消息丢失情景(可能是MQ出问题导致queue和exchange绑定丢失,或者失误删除了绑定关系等),Return机制可解决这个问题,路由失败时可以通过Return回调来将路由失败的消息记录下来。
消费者确认机制
1、消费者确认原理
消费者确认是指当一条消息投递到消费者处理后,消费者发送给MQ broker的确认
(通俗的说就是 告知服务器这条消息已经被我消费了,可以在队列删掉 ,这样以后就不会再发了, 否则消息服务器以为这条消息没处理掉 重启应用后还会在发)。
有auto和manual两种
-
1)auto则由broker自行选择时机,一般可认为消息发送到消费者后就直接被ack,也即消息会被从队列中移除掉而不顾消息的处理逻辑是否成功;
-
2)manual则是需要消费者显式的去手动ack后消息才会被从队列中移除掉,通过这个机制可以限制在消息处理完之后再Ack或者nack; 开启手动确认模式,即由消费方自行决定何时应该ack,通过设置autoAck=false开启手动确认模式;
消息持久化
消息发送并保存到队列之后如果不做特殊处理是保存在内存中,当节点宕机重启或者内存故障等,会导致消息丢失,通过对消息进行持久化到磁盘可以降低这种风险, 除了对消息进行持久化还是不够,还需要对queue、exchange进行持久化。
RabbitMQ解决消息丢失问题
消息确认机制
RabbitMQ
提供了消息确认机制,即生产者在发送消息后,可以等待RabbitMQ服务器返回确认信息,以确保消息已经被正确地接收和处理。如果RabbitMQ服务器没有返回确认信息,生产者可以选择重新发送消息或者采取其他的补救措施。
生产者确认消息和重试
-
使用缓存:在
confirmCallback
中,将ack
为false
的消息存到缓存中。然后,可以使用另外的线程或者定时任务来处理这些失败的消息,进行重试。 -
设置重试次数:为了避免无限重试,我们可以设置一个重试次数的上限。当达到这个上限后,我们可以选择将消息发送到死信队列,或者进行其他的错误处理。
-
使用死信队列:在RabbitMQ中,我们可以设置一个死信队列来存储那些无法被正常处理的消息。当消息在主队列中被拒绝或者过期后,它们会被发送到死信队列。然后,我们可以对死信队列中的消息进行人工处理,或者在一段时间后再次进行处理。
事务机制
1.首先需要配置一个事务管理器
@Bean
PlatformTransactionManager platformTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
2.然后在生产者上添加事务注解以及设置通信通道为事务模式。
@Transactional
- 开启事务机制就三步:
- 配置事务管理器
- 使用 @Transactional 注解开启事务
- 调用 setChannelTransacted 方法设置消息通道为事务模式,即设置为 true
4.当我们开启事务模式之后,RabbitMQ 生产者发送消息会有这样几个步骤:
- (1) 客服端发出请求,将通信管道设置为事务模式
- (2) 服务端给出回复,同意将通信管道设置为事务模式
- (3) 客户端发送消息
- (4) 客户端提交事务
- (5) 服务端给出响应,确认事务提交
6.两步 RabbitMQ 都有提供解决的方案。那么,如果确保消息成功到达 RabbitMQ 呢?
-
(1) 开启事务机制
-
(2) 发送方确认机制
注意:这是两种不同的方案,不可以同时开启,只能二选其一。如果同时开启,则会报错
消息持久化
RabbitMQ支持将消息持久化到磁盘,即使RabbitMQ服务器宕机或重启,消息也不会丢失。在发布消息时,可以设置消息的持久化标志,这样消息就会被写入磁盘中,而不是仅仅保存在内存中。
其中我们交换机和队列
都要设置对应的持久化,在创建时,我们会设置持久化参数。同时为了避免单点故障,RabbitMQ应该做成集群模式,以免一台机器损坏,出现数据丢失的问题。
消费端确认和重试
对于消费者来说,该配置不仅起到了连接作用,同时也启动了重试机制,默认重试 2 次。
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.listener.simple.retry.enabled=true # 开启消费者重试机制
spring.rabbitmq.listener.simple.retry.max-attempts=3 # 最大重试次数
spring.rabbitmq.listener.simple.retry.initial-interval=3000 # 重试时间间隔
确认的话需要我们做签收操作
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
死信队列配置
被拒收的消息,或者是过期的消息,或者是队列已经满了的消息,都会进入死信队列,死信队列有一个默认的生效时间,如果没有做任务配置,到了时间会自动删除消息。
java中配置死信队列
/**
* 延迟队列,又叫死信队列 “
*
* @return
*/
@Bean
public Queue delayQueue() {
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "test_ex");
arguments.put("x-dead-letter-routing-key", "test_ex.dead");
// 消息过期时间 2分钟
arguments.put("x-message-ttl", 60000);
return new Queue("delayQueue", true, false, false, arguments);
}
以上参数说明:
- x-dead-letter-exchange:死信队列过期以后往指定交换机发
- x-dead-letter-routing-key:死信队列过期指定路由键
- x-message-ttl: 死信队列过期时间,单位是毫秒
RabbitMQ解决消息积压问题
RabbitMQ消息积压问题通常是由于消费者无法及时消费消息或消费速度过慢或发送者流量太大导致的。以下是一些解决方法:
1.增加消费者数量: 可以通过增加消费者的数量来提高消费速度,减少消息积压。可以通过添加更多的消费者进程或者增加消费者的线程数来实现。
2.调整消费者的QoS参数: 消费者的QoS参数可以控制消费者每次从RabbitMQ服务器获取的消息数量,以及未确认消息的最大数量。可以适当调整这些参数,以减少消息积压。
3.设置消费者的超时时间: 可以设置消费者的超时时间,如果消费者在指定的时间内没有消费消息,就将消息重新投递到队列中,以便其他消费者消费。
4.增加队列的容量: 可以增加队列的容量,以便存储更多的消息。但是,如果队列容量过大,可能会导致内存占用过高,影响系统的性能。
5.使用死信队列: 可以将未能及时消费的消息转移到死信队列中,以便后续处理。可以设置死信队列的超时时间,以便在一定时间内处理这些消息。
6.监控和调整: 可以使用RabbitMQ的监控工具来监控队列的状态和消费者的消费速度,及时发现并解决消息积压问题。
RabbitMQ解决消息重复消费问题
RabbitMQ提供了消息去重的机制来解决消息重复消费的问题。具体来说,可以使用以下两种方式来实现:
1.消息去重插件
RabbitMQ提供了一个消息去重插件,可以通过在RabbitMQ节点上安装该插件来实现消息去重。该插件会在消息传输之前对消息进行唯一性校验,如果消息已经被消费过,那么该消息将被丢弃。该插件的实现原理是将已经消费过的消息ID保存在内存中,当新消息到达时,会检查该消息ID是否已经存在,如果存在则丢弃该消息。
2.消息幂等性设计
消息幂等性是指对于同一条消息,无论消费多少次,最终的结果都是一致的。因此,可以通过在消息的生产者或消费者端实现消息幂等性来解决消息重复消费的问题。具体实现方式包括:
- 在消息生产者端,为每条消息生成唯一的ID,将该ID与消息一起发送到RabbitMQ,消费者在消费消息时根据该ID进行幂等性校验;
- 在消息消费者端,记录已经消费过的消息ID,当重复消费同一条消息时,直接忽略该消息。
需要注意的是,实现消息幂等性需要考虑业务逻辑的复杂性和消息处理的性能。如果业务逻辑比较简单,可以通过对消息进行去重来解决问题;如果业务逻辑比较复杂,可以通过实现消息幂等性来保证消息的正确性。