这篇文章主要介绍AMQP 0-9-1 协议,是RabbitMQ支持的协议之一,理解AQMP对于使用和理解RabbitMQ也很有帮助。
AMQP 0-9-1(高级消息队列协议)是一种消息传递协议,它使客户端应用程序能与消息中间件进行通信。消息中间件接收Producer的消息,并将消息路由到Queue,Consumer订阅Queue,就能消费到消息。从AMQP 0-9-1 模型看,Producer发布消息到Exchange(交换器),Exchange根据绑定规则,将消息路由到一个或多个Queue。消息中间件把消息推送给订阅该Queue的Consumer,或者由Consumer从Queue上拉取消息。
下图直管的展示了AMQP 0-9-1 协议中的角色和消息主要流转过程,其中Exchange、Bindings、Qeue是AMQP 0-9-1的主要实体。
Exchange
AMQP 0-9-1 定义了Exchange,用来把消息路由到Queue,路由算法依赖 Binding Rule 和 Exchange 类型,AMQP 0-9-1 提供了4中Exchange类型。
Exchange type | 默认的预定义的名字 |
---|---|
Direct exchange | (Empty string) and amq.direct |
Fanout exchange | amq.fanout |
Topic exchange | amq.topic |
Headers exchange | amq.match (and amq.headers in RabbitMQ) |
Default Exchange
默认的Exchange是没有名字的,它是一种特殊的Direct Exchange。所有的Queue创建完,如果没有手动设置绑定关系,自动绑定到默认的Exchange,Routing Key 和 Queue 名字相同。
Direct Exchange
Direct Exchange 要求发布消息时设置的Routing Key 必须与 Binding Key 一致,才能路由到Queue。它常用的场景是分发消息,根据Routing Key,把消息分发给不同的Consumer。
Fanout Exchange
Fanout Exchange忽略Binding Key,会把消息路由给所有与它绑定的Queues,类似于广播消息。
Topic Exchange
Topic Exchange是一种比较灵活的Exchange,采用模式匹配的方式路由消息,具体的模式匹配规则由各个消息中间件实现。RabbitMQ的实现规则可以参考这里。
Headers Exchange
Headers Exchange根据消息的属性来路由消息,比如设置绑定的属性是x-product=product1,那么只有消息上有x-product=product1这个属性值时才会路由到队列。
Bindings
Bindings是Exchange和Queue绑定的具体规则,Exchange就是根据这个Bindings路由消息到Queue的。发布消息时的Routing Key必须和Binding Key匹配,消息才能正确路由到Queue。
Queue
Queue是实际保存消息的地方,AMQP 0-9-1规定队列名字的长度不能超过255个字节。如果客户端没有指定队列名字,Broker会自动生成队列名字,不管是哪种方式,必须保证队列名字唯一。如果申明的队列已经在Broker存在,那么就会报403错误。
持久性
AMPQ 0-9-1 规定,队列可以申明为持久化和非持久化的,持久化队列会保存到磁盘,包括它的元数据。非持久化的队列只是保存在内存中。
Consumer
AQMP 0-9-1 规定消费者有两种方式消费消息:
- Consumer订阅,Broker推送消息给Consumer,这种方式也是推荐的一种实现。优点是获取消息实时,缺点是失去了对消息控制的灵活性,比如根据机器的性能决定一次消费多少消息。
- Consumer拉取消息,有点是客户端可控性高,缺点是无法准确获知消息何时到达。
消息确认
由于网络是不可靠的,当Broker推送消息给Consumer后,无法知道这条消息是否已经被Consumer接收并正确处理了,Broker也无法判断是否可以移除这条消息。所以AMQP 0-9-1 定义了消息确认机制,有两种不同的模式:
- 在Broker发出消息之后,这种方式容易导致消息丢失。
- 在应用程序返回ack之后。
默认是自动确认,也支持手动确认,推荐使用手动确认,因为客户端程序可以自己控制在何时进行消息确认。
拒绝消息
消费者处理消息有可能失败,或者当前业务逻辑无法正确处理,就可以拒绝消息,此时,消息可能会被丢弃,也有可能重新入队,取决客户端的实现。注意,当队列只有一个订阅者,避免由于处理逻辑不当造成死循环。
批量拒绝消息
消费者可以批量接受消息,如果这批消息都无法处理,或者都处理失败,可以批量拒绝消息。
消息接收数量
当一个队列有多个消费者订阅时,定义每个消费在下一个ack之前能接收的最大消息数量,可以实现消息的负载均衡和提高吞吐量。
消息属性
AMQP 0-9-1 规范定义消息有元数据和消息体,元数据就是消息的属性,比如:
- Content type
- Content encoding
- Routing key
- Delivery mode (persistent or not)
- Message priority
- Message publishing timestamp
- Expiration period
- Publisher application id
消息体就是发送到队列的具体数据,以二进制格式发送,broker接收到消息,不会处理它,根据客户端的设置,保存到磁盘或者内存。
函数列表
AMQP 0-9-1 定义了非常丰富的函数,比如跟exchange相关的如下:
- exchange.declare
- exchange.declare-ok
- exchange.delete
- exchange.delete-ok
完整的函数定义请参考手册
Connection
AMQP 0-9-1 规范定义了连接是建立在TCP的可靠性连接,并且是长连接。当一个客户端不需要连接Broker,AQMP 0-9-1 规范是应该关闭AMQP的连接,而不是底层的TCP连接。
Channel
客户端有可能需要和Broker建立多个连接,那么就需要建立多个TCP连接,而这是很消耗资源和时间的。所以AMQP 0-9-1 规范定义了通道,通道是建立在连接之上的,一个连接可以有多个通道。客户端需要建立多个连接时,可以创建多个通道,每个通过都有唯一的ID。
当连接关闭时,在这个连接之上的所有通过都会关闭。
虚拟主机
这里的虚拟主机的概念是AQMP 0-9-1 中定义的,在一个Broker中(可以理解为一个服务)创建多个环境,用于资源隔离,每个环境中都有各自的Exchange、Bindings、Queue、用户、元数据等等,环境之间资源不会共享。
好了,以上就是对AMQP 0-9-1 的介绍。