第一章 RabbitMQ简介
1.1 什么是消息中间件
消息(message)是指在应用间传递的数据。
消息队列中间件(Message Queue Middleware,简称MQ),是指提供平台无关的、高效可靠的消息传递机制的中间件。
MQ通常又两种传递模式:
- 点对点模式(P2P, Point-To-Point):即一对一模式。该模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输成为可能。
- 发布/订阅模式(Pub/Sub):即一对多模式。发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题 (topic) ,主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用 。
面向消息的中间件(Message Oriented Middleware, 简称MOM)提供了以松散藕合的灵活方式集成应用程序的一种机制。它们提供了基于存储和转发的应用程序之间的异步数据发送,即应用程序彼此不直接通信,而是与作为中介的消息中间件通信 。 消息中间件提供了有保证的消息发送,应用程序开发人员无须了解远程过程调用 ( RPC) 和网络通信协议的细节。
1.2 消息中间件的作用
总体的来说,消息中间件的作用可以概括如下:
- 解耦:消息中间件在处理过程中插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,这允许你独立的扩展和修改两边的处理过程,只要确保它们遵守同样的接口约束即可,这样就把消息收发方进行了解耦。
- 冗余(存储):有些情况下,处理消息的过程会失败,消息中间件可以将数据进行持久化直至它们已经完全被处理,通过这一方式规避了数据丢失风险。
- 扩展性:因为消息中间件解耦了消息收发方的处理过程,所以使得消息收发方有了更好的扩展性。
- 削峰:在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费 。 使用消息中间件能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩惯
- 可恢复性: 当系统一部分组件失效时,不会影响到整个系统。当一个处理消息的进程挂掉后,由于消息中间件对消息做了持久化处理,所以其消息可以在系统恢复后继续处理。
- 顺序保证: 在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性。
- 缓冲:在任何重要的系统中,都会存在需要不同处理时间的元素。消息中间件通过一个缓冲层来帮助任务最高效率地执行。
- 异步通信:在很多时候应用不想也不需要立即处理消息 。 消息中间件提供了异步处理机制,允许应用把一些消息放入消息中间件中,但并不立即处理它,在之后需要的时候再慢慢处理 。
1.3 RabbitMQ的起源
RabbitMQ 是采用 Erlang 语言实现 AMQP (Advanced Message Queuing Protocol ,高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。
第2章 RabbitMQ入门
2.1 相关概念介绍
2.1.1 名词解释
RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息,其整体架构入下图所示:
- Producer:生产者,就是投递消息的一方。生产者创建消息,然后发布到RabbitMQ。
- Consumer: 消费者,就是接受消息的一方。消费者连接至RabbitMQ,并订阅响应队列。当队列中存入消息时,消费者就会进行消费。
- Message: 消息。消息一般包含两部分:标签(Label)和消息体(PayLoad)。标签包含用来表述这条消息的相关信息,比如路由的名称或路由键;消息体一般是一个带有业务逻辑结构的数据。
- Broker::消息中间件的服务节点。对于 RabbitMQ 来说, 一个 RabbitMQ Broker 可以简单地看作一个RabbitMQ 服务节点 ,或者 RabbitMQ 服务实例。
- Queue:队列,是RabbitMQ的内部对象,用于存储消息。多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊 (Round-Robin ,即轮询)给多个消费者进行处理。RabbitMQ 中消息都只能存储在队列中,这一点和 Katka 这种消息中间件相反 。 Katka 将消息存储在 topic (主题)这个逻辑层面,而相对应的队列逻辑只是 topic 实际存储文件中的位移标识。另外RabbitMQ不支持队列层面的广播消费。
- Exchange:交换器。RabbitMQ中,生产者将消息发送到交换器,由交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。RabbitMQ中的交换器有四种类型,不同的类型有着不同的路由策略。
- RoutingKey:路由键。生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键 (BindingKey)联合使用才能最终生效。
- Binding:绑定。RabbitMQ 中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键 ( BindingKey ) ,这样 RabbitMQ 就知道如何正确地将消息路由到队列了。
- BindingKey:绑定键,在绑定的时候使用的路由键,其实也属于路由键的一种。可以这么理解,在使用绑定的时候,其中需要的路由键是BindingKey,在发送消息的时候,其中需要的路由键是 RoutingKey。
2.1.2 交换器类型
RabbitMQ常用的交换器类型有fanout、direct、topic 、headers这四种:
- fanout:会把所有发到该交换器的消息路由到所有与该交换器绑定的队列中。
- direct:它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中。
- topic:direct类型的交换器路由规则是完全匹配BindingKey和RoutingKey,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。topic 类型的交换器在匹配规则上进行了扩展,使其可以支持模糊匹配。
- headers:headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers 属性进行匹配。在绑定队列和交换器时制定一组键值对 ,当发送消息到交换器时,RabbitMQ 会获取到该消息的 headers (也是一个键值对的形式) ,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列 。 headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。
对于topic模式,其扩展的匹配模式如下:
- RoutingKey和BindingKey均为一个点号分隔的字符串,如
com.rabbitmq.client
- BindingKey 中可以存在两种特殊字符串
*
和#
,用于做模糊匹配,其中#
用于匹配一个单词,*
用于匹配多规格单词
2.1.3 Connection 与 Channel
我们知道无论是生产者还是消费者,都需要和RabbitMQ Broker建立连接,这个连接就是一条TCP连接,也就是Connection。一旦 TCP 连接建立起来,客户端紧接着可以创建一个 AMQP 信道 (Channel) ,每个信道都会被指派一个唯一的 ID。信道是建立在Connection之上的虚拟连接,RabbitMQ 处理的每条AMQP指令都是通过信道完成的,其大致结构如下图:
我们完全可以直接使用Connection就能完成信道的工作,为什么还要引入信道呢?这是对于操作系统而言,建立和销毁TCP连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。 RabbitMQ 采用类似 NIO(Non-blocking I/O)的做法,选择TCP连接复用,不仅可以减少性能开销,同时也便于管理。每个线程把持一个信道,所以信道复用了Connection的TCP连接。同时 RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。信道在 AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面展开的。
第三章 客户端开发向导
3.1 连接RabbitMQ
连接RabbitMQ的过程通常分为创建连接和创建信道两部分,你可以通过指定配置进行连接,也可以使用URI的方式进行连接:
import (
"testing"
"github.com/streadway/amqp"
)
func TestRabbitMQ(t *testing.T) {
// 建立连接
conn, err := amqp.DialConfig("amqp://account:password@addrs/virtualHost", amqp.Config{
Vhost: "/K7Game/SNG",
})
if err != nil {
t.Fatal(err)
}
// 创建信道
ch, err := conn.Channel()
if err != nil {
t.Fatal(err)
}
}
Connection 可以用来创建多个Channel实例,但是Channel实例不能在线程问共享,应用程序应该为每一个线程开辟一个Channel。
3.2 使用交换器和队列
交换器和队列是 AMQP 中 highlevel 层面的构建模块,应用程序需确保在使用它们的时候就已经存在了。
你可以通过RabbitMQ配置相应的交换器和队列,也可以在代码中调用 api 声明它们:
import (
"testing"
"github.com/streadway/amqp"
)
func TestRabbitMQ(t *testing.T) {
// 建立连接
conn, err := amqp.DialConfig("amqp://account:password@addrs/virtualHost", amqp.Config{
Vhost: "/K7Game/SNG",
})
if err != nil {
t.Fatal(err)
}
// 创建信道
ch, err := conn.Channel()
if err != nil {
t.Fatal(err)
}
exchageName := "exchange.test"
queueName := "queue.test"
routingKey := "test"
// 声明交换器
if err := ch.ExchangeDeclare(exchageName, "fanout", true, false, false, true, amqp.Table{}); err != nil {
t.Fatal(err)
}
// 声明队列
if _, err := ch.QueueDeclare(queueName, true, false, false, false, amqp.Table{}); err != nil {
t.Fatal(err)
}
// 将队列和交换器绑定
if err := ch.QueueBind(queueName, routingKey, exchageName, false, amqp.Table{}); err != nil {
t.Fatal(err)
}
}
声明时注意交换器与队列名不能使用amq.*
样式的名字,因为这是RabbitMQ内置交换器与队列所使用的名字格式。
如果尝试声明一个已经存在的交换器或者队列 , 只要声明的参数完全匹配现存的交换器或者队列,RabbitMQ 就可以什么都不做 ,并成功返回;如果声明的参数不匹配则会抛出异常。
3.3 发送消息
使用Publish
接口发送消息,接口形式如下:
func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error {
各参数如下:
- exchange: 交换器名字
- key: 路由键
- mandatory:详见4.1
- immediate: 详见4.1
- Publishing: 消息的基本属性集,比如Headers、ContentType、DeliveryMode、Body等
示例如下:
import (
"testing"
"github.com/streadway/amqp"
)
func TestRabbitMQ(t *testing.T) {
// 建立连接
conn, err := amqp.DialConfig("amqp://account:password@addrs/virtualHost", amqp.Config{
Vhost: "/K7Game/SNG",
})
if err != nil {
t.Fatal(err)
}
// 创建信道
ch, err := conn.Channel()
if err != nil {
t.Fatal(err)
}
exchageName := "exchange.test"
queueName := "queue.test"
routingKey := "test"
// 声明交换器
if err := ch.ExchangeDeclare(exchageName, "fanout", true, false, false, true, amqp.Table{}); err != nil {
t.Fatal(err)
}
// 声明队列
if _, err := ch.QueueDeclare(queueName, true, false, false, false, amqp.Table{}); err != nil {
t.Fatal(err)
}
// 将队列和交换器绑定
if err := ch.QueueBind(queueName, routingKey, exchageName, false, amqp.Table{}); err != nil {
t.Fatal(err)
}
// 发送消息
ch.Publish(exchageName, routingKey, false, false, amqp.Publishing{
ContentType: "application/json",
Body: []byte("body"),
})
}
3.4 消费消息
RabbitMQ 的消费模式分两种:推(Push)模式和拉(Pull)模式。
3.4.1 推模式
推模式指RabbitMQ服务器主动将消息推送至消费端 ,消费端调用channel.Consume
接口:
...
// 推模式消费消息
deliveries, err := ch.Consume(queueName, "", true, false, false, false, amqp.Table{})
if err != nil {
t.Fatal(err)
}
for d := range deliveries {
t.Logf(
"got %dB delivery: [%v] %q",
len(d.Body),
d.DeliveryTag,
d.Body,
)
}
3.4.2 拉模式
拉模式值消费端主动去RabbitMQ服务器拉取消息,消费端调用channel.Get
接口:
...
delivery, _, err := ch.Get(queueName, true)
if err != nil {
t.Fatal(err)
}
t.Logf(
"got %dB delivery: [%v] %q",
len(delivery.Body),
delivery.DeliveryTag,
delivery.Body,
)
3.4.3 消费端的确认和拒绝
3.4.3.1 消息确认
为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制( message acknowledgement) 。消费者在订阅队列时,可以指定autoAck参数。
当autoAck等于false时,RabbitMQ 会等待消费者显式地回复确认信号后才移除消息, 如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者己经断开连接,则 RabbitMQ 会安排该消息重新进入队列。
当autoAck等于true时,RabbitMQ 会自动把发送出去的消息置为确认,然后将其删除,而不管消费者是否真正地消费到了这些消息。
RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否己经断开,这么设计的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。
3.4.3.2 消息拒绝
RabbitMQ 在 2 .0.0 版本开始引入了 Basic.Reject
这个命令,消费者客户端可以调用与其对应的 Delivery.Reject(requeue bool)
方法来告诉 RabbitMQ 拒绝这个消息:
...
delivery, _, err := ch.Get(queueName, true)
if err != nil {
t.Fatal(err)
}
delivery.Reject(true)
如果 requeue 参数设置为 false,则 RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。
3.5 关闭连接
在应用程序使用完之后,需要关闭连接,释放资源;显式地关闭 Channel 是个好习惯,但这不是必须的:
...
ch.Close()
conn.Close()
在 Connection 关闭的时候,Channel 也会自动关闭。
Connection关闭前会确保其缓存中的所有消息已经发送或者消费完毕。
第四章 RabbitMQ进阶
4.1 消息何去何从
4.1.1 mandatory参数
当 mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ 会将消息返回给生产者 。当 mandatory 参数设置为 false 时,出现上述情形,则消息直接被丢弃。
4.1.2 immediat参数
当 imrnediate 参数设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中,当与路由键匹配的所有队列都没有消费者时 ,该消息会返回至生产者。
4.2 过期时间(TTL)
TTL, Time to Live 的简称,即过期时间。RabbitMQ 可以对消息和队列设置 TTL 。
设置消息的TTL
目前有两种方法可以设置消息的 TTL。
第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间,通过队列属性设置消息 TTL 的方法是在 channel.queueDeclare 方法中加入x-message-ttl
参数实现的,这个参数的单位是毫秒:
...
if _, err := ch.QueueDeclare(queueName, true, false, false, false, amqp.Table{
"x-message-ttl": 1000,
}); err != nil {
t.Fatal(err)
}
第二种方法是对消息本身进行单独设置,每条消息的 TTL 可以不同, 通过在channel.Publish
方法中加入 expiration
的属性参数,单位为毫秒:
...
ch.Publish(exchageName, routingKey, false, false, amqp.Publishing{
Expiration: "1000",
ContentType: "application/json",
Body: []byte("body"),
})
如果不设置TTL则表示此消息不会过期 ;如果将TTL设置为 0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃。
设置队列的TTL
通过 channel.QueueDeclare
方法中的x-expires
参数可以控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并
且在过期时间段内也未调用过Get命令:
...
// 声明队列
if _, err := ch.QueueDeclare(queueName, true, false, false, false, amqp.Table{
"x-expires": 1000,
}); err != nil {
t.Fatal(err)
}
4.3 死信队列
DLX ,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。
消息变成死信一般是由于以下几种情况:
- 消息被拒绝,且requeue参数为false
- 消息过期
- 队列达到最大长度
DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定 ,实际上就是设置某个队列的属性。当这个队列中存在死信时 , RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去 ,进而被路由到另一个队列,即死信队列。通过在channel.QueueDeclare
方法中设置x-dead-letter-exchange
参数来为这个队列添加DLX,也可以通过设置x-dead-letter-routing-key
来为DLX指定路由键:
...
// 声明队列
if _, err := ch.QueueDeclare(queueName, true, false, false, false, amqp.Table{
"x-dead-letter-exchange": "dlx exchange",
"x-dead-letter-routing-key": "dlx-routing-key",
}); err != nil {
t.Fatal(err)
}
死信消息的流转流程如下:
对于 RabbitMQ 来说,DLX 是一个非常有用的特性。后续分析程序可以通过分析死信队列中的消息来判断消息不能被正确消费的远影,进而可以改善和优化系统。
4.4 延迟队列
在 AMQP 协议中,或者 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的 DLX 和 TTL 模拟出延迟队列的功能。
生产者通过 exchange.normal 这个交换器将发送的消息存储在 queue.normal 这个队列中,并为 queue.narmal队列中的消息设置TTL。消费者订阅的并非是 queue.normal 这个队列,而是 queue.dlx 这个队列 。当消息从 queue.normal 这个队列中过期之后被存入 queue.dlx 这个队列中,消费者就恰巧消费到了延迟 一定时间的这条消息,其大致结构如下:
4.5 优先级队列
优先级队列 ,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。可以通过设置队列的 x-max-priority
参数来实现:
...
// 声明队列
if _, err := ch.QueueDeclare(queueName, true, false, false, false, amqp.Table{
"x-max-priority": 5,
}); err != nil {
t.Fatal(err)
}
通过后台管理页面可以看到优先级队列具有Pri
标记:
需要注意的是,如果在消费者的消费速度大于生产者的速度且Broker中没有消息堆积的情况下 ,对发送的消息设置优先级也就没有什么实际意义。因为生产者刚发送完一条消息就被消费者消费了,那么就相当于Broker中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。
4.6 RPC 实现
RPC ,是 Remote Procedure Call 的简称,即远程过程调用。
一般在 RabbitMQ 中进行 RPC 是很简单。客户端发送请求消息,服务端回复响应的消息 。为了接收响应的消息,我们需要在请求消息中发送一个回调队列,也可以使用默认的队列 :
4.7 持久化
持久化可以提高 RabbitMQ 的可靠性, 以防在异常情况(重启、关闭、右机等)下的数据丢失。RabbitMQ
的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化 。
交换器的持久化是通过在声明队列是将 durable 参数置为 true 实现的,交换器的持久化是通过在声明队列是将 durable 参数置为 true 实现的。
队列的持久化是通过在声明队列时将 durable 参数置为 true 实现的,如果队列不设置持久化,那么在 RabbitMQ 服务重启之后,相关队列的元数据会丢失,此时消息也会丢失。
队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失 , 需要将其设置为持久化。通过将消息的投递模式设置为 2 即可实现消息的持久化。
可以将所有的消息都设直为持久化,但是这样会严重影响 RabbitMQ 的性能。写入磁盘的速度比写入内存的速度慢得不只一点点。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。在选择是否要将消息持久化时,需要在可靠性和吐吞量之间做一个权衡。
将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?答案是否定的 。比如在持久化的消息正确存入 RabbitMQ 之后,还需要有一段时间才能存入磁盘之中。 RabbitMQ 并不会为每条消息都进行同步存盘的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内RabbitMQ 服务节点发生了岩机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。这个问题怎么解决呢?这里可以引入 RabbitMQ 的镜像队列机制相当于配置了副本,如果主节点在此特殊时间内挂掉,可以自动切换到从节点
这样有效地保证了高可用性,除非整个集群都挂掉。虽然这样也不能完全保证 RabbitMQ 消息不丢失,但是配置了镜像队列要比没有配置镜像队列的可靠性要高很多,在实际生产环境中的关键业务队列一般都会设置镜像队列。
4.8 生产者确认
RabbitMQ提供了两种方式进行确认:
- 事务
- 发送方确认机制
4.8.1 事务
RabbitMQ 客户端中与事务机制相关的方法有三个: ch.Tx
、ch.TxCommit
、ch.TxRollback
。
正常事务流程如下:
事物回滚如下:
需要注意的是,使用事务机制会吸干RabbitMQ 的性能,所以应尽量少使用。
4.8.2 发送方确认机制
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID,一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(Ack) 给生产者,这就使得生产者知晓消息已经正确到达了目的地了:
事务机制在一条消息发送之后会使发送端阻塞,以等待 RabbitMQ 的回应,之后才能继续发送下一条消息。相比之下,发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息。
4.9消费端要点介绍
4.9.1 消息分发
当 RabbitMQ 队列拥有多个消费者时,队列收到的消息将以轮询的分发方式发送给消费者。
很多时候轮询的分发机制也不是那么优雅。试想一下,如果某些消费者任务繁重,来不及消费那么多的消息,而某些其他消费者由于某些原因(比如业务逻辑简单、机器性能卓越等)很快地处理完了所分配到的消息,进而进程空闲,这样就会造成整体应用吞吐量的下降。
那么该如何处理这种情况呢?这里就要用到 channel.QosQos(prefetchCount, prefetchSize int, global bool)
这个方法来设置对消费者做一些设置:
- prefetchCount:消费者所能保持的最大未确认消息的数量
- prefetchSize:消费者所能接收未确认消息的总体大小的上限
这种机制可以类比于 TCP/IP中的"滑动窗口"
4.9.2 消息顺序性
消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。
在不使用任何 RabbitMQ 的高级特性 ,也没有消息丢失、网络故障之类异常的情况发生,并且只有一个消费者的情况下,最好也只有一个生产者的情况下可 以保证消息的顺序性。如果有多个生产者同时发送消息,无法确定消息到达 Broker 的前后顺序,也就无法验证消息的顺序性。
4.10 消息传输保证
消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消息传输保障分为三个层级:
- At Most Once:最多一次。消息可能会丢失,但绝不会重复传输
- At least once:最少一次。消息绝不会丢失,但可能会重复传输。
- Exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次。
RabbitMQ 支持其中的"最多一次 “和"最少一次”, "恰好一次"是 RabbitMQ 目前无法保障的。
RabbitMQ 没有去重的机制来保证‘恰好一次’,不仅是 RabbitMQ ,目前大多数主流的消息中间件都没有消息去重机制,也不保障‘恰好一次’。
去重处理一般是在业务客户端实现,比如引入 GUID 的概念。针对 GUID ,如果从客户端的角度去重,那么需要引入集中式缓存,必然会增加依赖复杂度,另外缓存的大小也难以界定。建议在实际生产环境中,业务方根据自身的业务特性进行去重,比如业务消息本身具备幂等性,或者借助 Redis 等其他产品进行去重处理。