消息队列的特性
- durable:队列持久化。如果设置持久化,那么无论RabbitMQ在关闭时,就会将队列存储到本地磁盘,无论宕机还是重启,队列也不会删除;如果设置不持久化,那么在RabbitMQ关闭时,就会将队列删除。
- exclusive:独占队列。如果设置独占,那么当前队列只允许预先设置的Connection访问;如果设置不独占,则所有Connection都可以访问。
- autoDelete:自动删除队列。如果设置自动删除,那么当消费者消费完该队列的消息时,该队列立刻被删除;如果设置不自动删除,那么当消费者消费完该队列的消息时,该队列还会继续被保留。
消息自动确认机制
如果开启消息自动确认,那么一旦MQ把消息发送给消费者,那么该消息就会立即标记为删除。又因为在默认情况下,在MQ把消息发送给对应Consumer时,是一次性把属于该Consumer的所有消息发送到对应的通道内。所有如果其中一个消费者正在消费一个长期的任务却只完成的一部分就死了,那么我们就会丢失正在消费的消息和发送给该消费者中通道内尚未消费的所有消息。
如上模型,如果是循环的分发方式,Consumer-1一次性被分到的消息为1,3,5号消息,Consumer-2一次性被分到的消息为2,4,6号消息。如果Consumer-1在消费3号消息时出错宕机了,那么正在消费的3号消息和通道中还未消费的5号消息就会全部丢失。如果Consumer-2在消费2号消息时宕机了,那么正在消费的2号消息和通道中还未消费的4号、6号消息就会全部丢失。
为了保证业务数据的完整性,我们需要修改两个点:
-
在MQ把消息发送给消费者后,该消息不会标记为删除。
-
当某一消费者在宕机时,能够将正在消费的消息和还未消费的消息交给其他消费者继续消费。
第一点如果要实现,就要关闭消息自动确认机制。目的是为了保证在消费者消费消息宕机时,当前消息不会丢失。在关闭消息自动确认后,我们需要在每一个消息被正常消费后,手动确认该消息,这样该消息才会被标记为删除,MQ才会发送下一个消息给该Consumer。
第二点如果要实现,那么就不能按照默认的那样,MQ一次性将消息发送给Consumer,需要设置MQ每一次只能发给Consumer一个消息。目的是为了保证在消费者消费消息时,对应通道内没有其他消息,以至于如果该消费者宕机了,本应分给该消费者的消息能够被MQ分配给其他消费者消费。
消息模型
RabbitMQ在3.5
版本之前只支持五种消息发布模型,分别是:“Hello World!”,Work queues,Publish/Subscribe,Routing,Topics。在3.7
版本支持六种消息发布模型(新增RPC),在3.9
版本支持七种消息发布模型(新增Publisher Confirms)。
RPC模型和Publisher Confirms模型本文中不会描述,本文只描述常见的五种消息发布模型,足够应对多种业务场景。
生产者和对应的一个或多个消费者必须在同一个用户下,连接同一个Virtual host(虚拟主机)、Channel(通道)、Exchange(交换机)和MQ(消息队列),才能相互进行通信。
1. "Hello World!"模型
点对点模型,一个生产者、一个消息队列和一个消费者。
生产者直接将消息发送进消息队列,消费者监听消息队列,不断地从中获取消息并消费。
点对点模型只能处理一些简单的业务,如果消息比较多,消费者处理消息比较耗时时,那么生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积的越来越多。
2. Work queues模型
任务模型,一个生产者、一个消息队列和多个消费者。
生产者直接将消息发送进消息队列,多个消费者监听同一个消息队列,共同消费消息队列中的消息。
队列中的消息一旦被某一个消费者消费,就会消失,因此消息是不会被重复消费的。
在Work queues模型中,MQ绑定了多个消费者,RabbitMQ默认采用了循环的方式分发消息。
循环:RabbitMQ将按顺序将每个消息发送到下一个使用者,平均每个消费者都会收到相同数量的消息。例如,有编号1-10的消息在MQ中,同时存在两个消费者,则消费者A分到的消息为1,3,5,7,9,消费者B分到的消息为2,4,6,8,10。
因为RabbitMQ默认是以循环的方式分发消息,所以如果消费者中有消费能力较弱的消费者,那么也会造成消息的堆积越来越多,拖慢系统的运行速度。RabbitMQ还提供了能者多劳的分发消息的方式,消费能力强的消费者就会被分到更多的消息,消费能力弱的消费者就会被分到更少的消息,该方式需要作额外设置。
3. Publish/Subscribe模型
广播模型,一个生产者、一个fanout交换机、多个消息队列和多个消费者,一个交换机绑定多个队列,一个消息队列绑定一个消费者。
生产者首先将消息先发给交换机,交换机将消息发送给所有队列,队列接收到消息后再发送给对应绑定的消费者,实现一条消息被所有消费者消费。
根据AMQP协议,前两种模型也需要连接交换机,之所以没有在模型图中显示,是因为前两种模型使用的是RabbitMQ中的default交换机。
4. Routing模型
路由模型,一个生产者、一个direct交换机、多个消息队列和多个消费者,一个交换机绑定多个队列,一个消息队列绑定一个消费者。
与广播模型的不同的是,广播模型的交换机与队列之间是任意绑定,也就是交换机绑定了所有队列。在路由模型中,交换机需要绑定一个RoutingKey(路由key)。队列也需要指定一个RoutingKey,交换机不再把消息发送给所有队列,而是根据队列的RoutingKey进行判断,只有队列的RoutingKey和交换机的RoutingKey完全一致,交换机才把消息发送给该队列进行消费。
例如上述模型,所有MQ绑定了 “info” 的RoutingKey,只有第一个MQ还另外绑定了 “error” 的RoutingKey。那么当生产者发送RoutingKey为 “info” 的消息时,交换机会将该消息发送给所有MQ;当生产者发送RoutingKey为 “error” 的消息时,交换机只会将该消息发送给第一个MQ。
注意:RoutingKey一般由一个或者多个单词组成,如果是以多个单词组成,单词之间以 “.” 分割。
5. Topics模型
动态路由模型,一个生产者、一个topic交换机、多个消息队列和多个消费者,一个交换机绑定多个队列,一个消息队列绑定一个消费者。
在路由模型中,如果队列需要绑定一个RoutingKey,那么就要单独设置一次RoutingKey,如果需要绑定多个RoutingKey,那么就要单独设置多次不同的RoutingKey,这样以来,会造成代码冗余,及其不方便。Topics模型提供了动态路由匹配规则,也就是在路由模型的基础上允许在RoutingKey中使用通配符,这样设置一个使用了统配符的RoutingKey就可以匹配多个符合规则的具体的RoutingKey了,从而不需要设置多次,大大简化了代码。
RabbitMQ中提供了两种通配符:
- *:替换 1 个单词。
- #:替换 0 个或者多个单词。
例如上述模型,第一个MQ绑定了 “*.rabbit.*” 的RoutingKey,最后一个MQ绑定了 “*.rabbit.#” 的RoutingKey。那么当生产者发送RoutingKey为 “user.rabbit.insert” 的消息时,交换机会将该消息发送给第一个MQ;当生产者发送RoutingKey为 “user.rabbit.insert.all” 的消息时,交换机会将该消息发送给最后一个MQ;当生产者发送RoutingKey为 “user.rabbit” 的消息时,交换机也会将该消息发送给最后一个MQ。