第十七章 : Spring Boot 集成RabbitMQ(一)
前言
本章介绍RabbitMQ的核心概念和消息中间件中非常重要的协议——AMQP协议,然后介绍Direct、Topic、Headers、Fanout等交换机的作用和特点;RabbitMQ的五种消息发送模式-简单队列、工作队列、发布订阅、路由、广播;以及RabbitMQ的消息确认机制。
Springboot 版本 2.3.2.RELEASE ,RabbitMQ 3.9.11,Erlang 24.2
RabbitMQ是什么 ?
RabbitMQ基于开源的AMQP协议实现,服务器端用Erlang语言编写,支持多种客户端,如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP、AJAX等。
RabbitMQ优势
可靠性(Reliability):使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认。
灵活的路由(Flexible Routing):在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,既可以将多个Exchange绑定在一起,又可以通过插件机制实现自己的Exchange。
消息集群(Clustering):多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
高可用(Highly Available Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出现问题的情况下队列仍然可用。
多种协议(Multi-Protocol):支持多种消息队列协议,如STOMP、MQTT等。
多种语言客户端(Many Clients):几乎支持所有常用语言,比如Java、.NET、Ruby等。
管理界面(Management UI):提供了易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。
跟踪机制(Tracing):如果消息异常,RabbitMQ提供了消息的跟踪机制,使用者可以找出发生了什么。
插件机制(Plugin System):提供了许多插件进行扩展,也可以编辑自己的插件。
RabbitMQ作为流行的消息中间件,实现了应用程序的异步和解耦,同时也能起到消息缓冲、消息分发的作用,在易用性、扩展性、高可用性等方面表现不俗。
AMQP
AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是应用层协议的开放标准,是为面向消息的中间件设计。基于此协议的客户端可与消息中间件传递消息,从而不受产品、开发语言等条件限制。
消息中间件主要用于组件之间的解耦,消息发送者无须知道消息使用者的存在,反之亦然。与其他消息队列协议不同的是,AMQP中增加了Exchange和Binging角色。生产者把消息发布到Exchange
上,消息最终到达队列并被消费者接收;而Binding决定Exchange的消息应该发送到哪个队列。
与其他消息队列协议不同的是,AMQP中增加了Exchange
和Binging
角色。生产者把消息发布到Exchange上,消息最终到达队列并被消费者接收;而Binding决定Exchange的消息应该发送到哪个队列。
消息路由传递的过程:生产者首先将消息发送到Exchange,通过Exchange转发到绑定的各个消息队列上,然后消费者从队列中读取消息。
RabbitMQ如何保证数据可靠性?
-
持久化机制:RabbitMQ支持消息持久化,可以将消息保存在磁盘中,确保即使在RabbitMQ服务重启后,消息也不会丢失。持久化队列在RabbitMQ中被管理,而非持久化队列不会被保存在磁盘中,Rabbit服务重启后队列就会消失。
-
事务机制:RabbitMQ支持事务,当生产者将一个持久化消息发送给服务器时,如果服务器崩溃,没有持久化该消息,生产者无法获知该消息已经丢失。
-
发送端的消息确认机制:生产者可以通过设置消息的确认模式(confirm mode),在消息发送到RabbitMQ后等待RabbitMQ的确认响应,以确保消息成功发送到RabbitMQ。
-
消费端的消息确认机制:消费者可以通过设置消息的确认模式(ack mode),在成功处理消息后向RabbitMQ发送确认响应,以确保消息被成功消费。
-
异常捕获机制:RabbitMQ提供了异常捕获机制,可以在消息发送或接收过程中出现异常时进行捕获和处理,避免因异常导致消息丢失或未处理。
-
高可用集群:RabbitMQ支持高可用集群,多个RabbitMQ节点可以组成一个集群,共同分担消息存储和处理的任务,提高系统的可靠性和稳定性。
-
限流机制:在消费端,RabbitMQ提供了限流机制,可以限制消费者接收消息的速度,避免因消息处理速度过快导致消息丢失或处理失败。
-
幂等性:RabbitMQ支持幂等性操作,即对同一个队列进行多次相同的操作只会产生一次效果,避免因重复操作导致数据不一致。
RabbitMQ组件功能
RabbitMQ中有几个非常重要的组件:服务实体(Broker)、虚拟主机(Virtual Host)、交换机(Exchange)、队列(Queue)和绑定(Binging)等,如图12-2所示。
交换机
交换机(Exchange)的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,只是把消息分发给各自的队列。但是我们给交换机发送消息,**它怎么知道给哪个消息队列发送呢?**这里就要用到RoutingKey
和BindingKey
。BindingKey
是交换机和消息队列绑定的规则描述。RoutingKey
是消息发送时携带的消息路由信息描述。当消息发送到交换机(Exchange)时,通过消息携带的RoutingKey
与当前交换机所有绑定的BindingKey
进行匹配,如果满足匹配规则,则往BindingKey
所绑定的消息队列发送消息,这样就解决了向RabbitMQ发送一次消息,可以分发到不同的消息队列,实现消息路由分发的功能。
交换机有Direct
、Topic
、Headers
和Fanout
四种消息分发类型。不同的类型在处理绑定到队列方面的行为时会有所不同。
1)Direct:其类型的行为是“先匹配,再发送”,即在绑定时设置一个BindingKey,当消息的RoutingKey匹配队列绑定的BindingKey时,才会被交换机发送到绑定的队列中。
2)Topic:按规则转发消息(最灵活)。支持用“”或“#”的模式进行绑定。“”表示匹配一个单词,“#”表示匹配0个或者多个单词。比如,某消息队列绑定的BindingKey为“*.user.#”时,能够匹配到RoutingKey为usd.user和eur.user.db的消息,但是不匹配user.hello。
3)Headers:设置header attribute参数类型的交换机。根据应用程序消息的特定属性进行匹配,这些消息可能在绑定key中标记为可选或者必选。
4)Fanout:转发消息到所有绑定队列(广播)。将消息广播到所有绑定到它的队列中,而不考虑队列绑定的BindingKey的值。
消息发送模式
-
基本消息模型 (简单队列) :这是RabbitMQ的核心,生产者不断向队列中添加新的消息,消费者则不断的从队列中获取并且处理这些消息。
-
工作消息模型 (Work模式) :这种模型下,当消费者无法处理消息时,会将该消息重新放回队列中,以便其他消费者进行处理。
-
一个生产者、2个消费者
-
一个消息只能被一个消费者获取
-
-
发布/订阅模型 (Direct模式) :在发布/订阅模型中,生产者将消息发送到交换机,然后交换机将消息路由到一个或多个队列,最后消费者从队列中获取并处理这些消息。
- 1个生产者,多个消费者
- 每一个消费者都有自己的一个队列
- 生产者没有将消息直接发送到队列,而是发送到了交换机
- 每个队列都要绑定到交换机
- 生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
-
路由模型 ( Fanout) :路由模型和发布/订阅模型相似,但不同之处在于交换机根据路由键来决定如何路由消息。
-
Fanout,也称为广播模式;
-
可以有多个消费者;
-
每个消费者有自己的queue(队列)
-
每个队列都要绑定到Exchange(交换机)
-
生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
-
交换机把消息发送给绑定过的所有队列
-
队列的消费者都能拿到消息。实现一条消息被多个消费者消费
-
-
主题模型 ( Topic):在这种模型中,交换机将根据路由键和绑定键(主题)来决定如何路由消息。
-
通配符模式
-
同一个消息被多个消费者获取。
-
一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。
-
消息确认机制
虽然使用RabbitMQ可以降低系统的耦合度,提高整个系统的高并发能力,但是也使得业务变得复杂,可能造成消息丢失,导致业务中断的情况。
消息确认的场景
使用RabbitMQ很可能造成消息丢失,导致业务中断的情况,
例如:
-
生产者发送消息到RabbitMQ服务器失败。
-
RabbitMQ服务器自身故障导致消息丢失。
-
消费者处理消息失败。
针对上面的情况,RabbitMQ提供了多种消息确认机制,确保消息的正常处理,主要有生产者消息确认机制、Return消息机制、消费端ACK和Nack机制3种消息确认模式。
生产者消息确认机制
生产者消息的确认是指生产者发送消息后,如果Broker收到消息,则会给生产者一个应答。生产者接收应答,用来确定这条消息是否正常地发送到Broker,这种方式也是消息可靠性发送的核心保障。如图12-21所示,当生产者发送消息到MQ Broker时,Broker会发送一个确认(Confirm),通知发送端已经收到此消息。
Return机制
我们知道,消息生产者通过指定一个Exchange和routingKey将消息送达某一个队列中,然后消费者监听队列进行消费处理操作。在某些情况下,如果在发送消息的时候,当前的Exchange不存在或者指定的routingKey路由不到,这个时候如果需要监听这种不可达的消息,可以使用RabbitMQ提供的Return机制处理一些不可路由的消息,如图12-24所示。
消费端ACK和NACK机制
消费者在处理消息时,由于业务异常,我们可以进行日志的记录,然后进行补偿。但是,如果由于服务器宕机等严重问题无法记录日志,如何确保消息被正确处理呢?这就需要消费端ACK和NACK机制,手工进行ACK确认,保障消费者成功处理消息,把未成功处理的消息再次发送,直到消息处理成功。RabbitMQ消费端的确认机制分为3种:none
、manual
、auto
(默认)。
none:表示没有任何应答会被发送。
manual:表示监听者必须通过调用channel.basicAck()来告知消息被处理。
① channel.basicAck(long,boolean):确认收到消息,消息将从队列中被移除,为false时只确认当前一个消费者收到的消息,为true时确认所有消费者收到的消息。
② channel.basicNack(long,boolean,boolean):确认没有收到消息,第一个boolean表示是一个消费者还是所有的消费者,第二个boolean表示消息是否重新回到队列,为true时表示重新入队。
③ channel.basicReject(long,boolean):拒绝消息,requeue=false表示消息不再重新入队,如果配置了死信队列,则消息进入死信队列。消息重回队列时,该消息不会回到队列尾部,仍在队列头部,这时消费者又会接收到这条消息,如果想让消息进入队列尾部,需确认消息后再次发送消息。
**auto:**表示自动应答,除非MessageListener抛出异常,这是默认配置方式。
① 如果消息成功处理,则自动确认。
② 当发生异常抛出AmqpRejectAndDontRequeueException时,则消息会被拒绝且不重新进入队列。
③ 当发生异常抛出ImmediateAcknowledgeAmqpException时,则消费者会被确认。
仍在队列头部,这时消费者又会接收到这条消息,如果想让消息进入队列尾部,需确认消息后再次发送消息。
**auto:**表示自动应答,除非MessageListener抛出异常,这是默认配置方式。
① 如果消息成功处理,则自动确认。
② 当发生异常抛出AmqpRejectAndDontRequeueException时,则消息会被拒绝且不重新进入队列。
③ 当发生异常抛出ImmediateAcknowledgeAmqpException时,则消费者会被确认。
④ 当抛出其他的异常时,则消息会被拒绝,且requeue=true时会发生死循环,可以通过setDefaultRequeueRejected(默认是true)设置抛弃消息。