一.AMQP 是什么
AMQP(Advanced Message Queuing Protocol, 高级消息队列协议)是一个提供统一消息服务的 应用层标准高级 消息队列协议,是 应用层协议的一个 开放标准,为面向消息的中间件设计,是一个进程间传递 异步消息的 网络协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。目前个实现了AMQP协议标准的开源 消息代理和队列服务器的有ActiveMQ、 RabbitMQ,ZeroMQ,炙手可热的 Kafka,MetaMQ,阿里巴巴的 RocketMQ
二.AMQP模型
从上图,可以了解到AMQP包括以下主要组件和逻辑概念:
Publisher(生产者):消息生产者,AMQP定义了消息的格式,所以生产者需要按照AMQP消息格式生产数据
Broker(服务器):Broker是一个物理上的服务器(或虚拟机),它是部署了消息中间件并接收处理客户端请求的实体
Exchange(交换机):它是一个逻辑上的概念,负责接收消息,是整个消息中间件的入口
Queue(队列):它负责保存消息,并将消息转发给消费者
Binding(绑定):它是Exchange与Queue之间的虚拟连接,实现了根据不同的Routing Key(路由规则)将消息路由到对应的Queue上
Message(消息):消息,本质上就是服务器与客户端传输的数据,由元信息和消息体组成
Virtual Host(虚拟主机):它是一个逻辑上的概念,一个Broker上可以有多个Virtual Host,它起到一个命名空间的作用,可以让服务于不同业务的Exchange和Queue隔离开,是实现权限控制的最小单位
Connection(连接):客户端与服务器之间的网络连接
Channel(信道):一次客户端与服务器之间的通信,相当于JMS中的Session的概
1.请求过程
(1).生产者(Producer)发布消息(Message),经由交换机(Exchange)
(2).交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)
(3).最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取
2.深入理解
(1).生产者、交换机、队列、消费者都可以有多个,因为 AMQP 是一个网络协议,所以这个请求过程中的生产者,消费者,消息代理可以分别存在于不同的设备上
(2).生产者发布消息时可以给消息指定各种消息属性(Message Meta-data),有些属性有可能会被消息代理(Brokers)使用,然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用
(3).从安全角度考虑,网络是不可靠的,又或是消费者在处理消息的过程中意外挂掉,这样没有处理成功的消息就会丢失,基于此原因,AMQP 模块包含了一个消息确认(Message Acknowledgements)机制
当一个消息从队列中投递给消费者后,不会立即从队列中删除,直到它收到来自消费者的 确认回执( Acknowledgement)后,才完全从队列中删除
(4).在某些情况下,例如当一个消息无法被成功路由时(无法从交换机分发到队列),消息或许会被返回给发布者并被丢弃,或者,如果消息代理执行了延期操作,消息会被放入一个死信队列中,此时,消息发布者可以选择某些参数来处理这些特殊情况
三.Exchange交换机
1.介绍
交换机是用来发送消息的 AMQP 实体,交换机拿到一个消息之后将它 路由给一个或零个队列,它使用哪种路由算法是由交换机类型和绑定(Bindings)规则所决定的,AMQP 0-9-1 的代理提供了 四种交换机类型:
Name(交换机类型) | Default pre-declared names(预声明的默认名称) |
Direct exchange(直连交换机) | (Empty string) and amq.direct |
Fanout exchange(扇型交换机) | amq.fanout |
Topic exchange(主题交换机) | amq.topic |
Headers exchange(头交换机) | amq.match (and amq.headers in RabbitMQ) |
除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:
Name
Durability (消息代理重启后,交换机是否还存在)
Auto-delete (当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它)
Arguments(依赖代理本身)
交换机可以有两个状态:持久(durable)、暂存(transient)
持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。
注意:
(1).并不是所有的应用场景都需要持久化的交换机
(2).生产者生产的消息中包含了交换机类型,消息中声明的交换机类型不同,路由规则也就不同,也就会采取不同的规则将消息投入队列
2.默认交换机
默认交换机(default exchange)实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的 直连交换机(direct exchange),它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的 路由键(routing key)名称与 队列名称相同
举个例子:
当声明了一个名为 “search-indexing-online” 的队列,AMQP 代理会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为 “search-indexing-online”。因此,当携带着名为 “search-indexing-online” 的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为 “search-indexing-online” 的队列中
3.直连交换机
直连型交换机(direct exchange)是根据消息携带的 路由键(routing key)将消息投递给对应 绑定键的队列。直连交换机用来处理消息的 单播路由(unicast routing)(尽管它也可以处理多播路由)。下边介绍它是如何工作的:
1)将一个队列绑定到某个交换机上时,赋予该绑定一个绑定键(Binding Key),假设为R;
2)当一个携带着路由键(Routing Key)为R的消息被发送给直连交换机时,交换机会把它路由给绑定键为R的队列。
直连交换机的队列通常是循环分发任务给多个消费者(称之为轮询),比如说有3个消费者,4个任务,分别分发每个消费者一个任务后,第4个任务又分发给了第一个消费者。综上,得出一个结论:在 AMQP 0-9-1 中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间
直连型交换机图例:
步骤:
当生产者(P)发送消息,路由规则为 Rotuing key=booking 时,这时候将消息传送给 Exchange,Exchange 获取到生产者发送过来消息后,会根据自身的规则进行与匹配相应的 Queue,这时发现 Queue1 和 Queue2 都符合,就会将消息传送给这两个队列。
如果以 Rotuing key=create 和 Rotuing key=confirm 发送消息时,这时消息只会被推送到 Queue2 队列中,其他 Routing Key 的消息将会被丢弃
4.扇型交换机
扇型交换机(funout exchange)将消息路由给绑定到它身上的 所有队列,而不 理会绑定的路由键。如果 N 个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的 N 个队列,扇型用来交换机处理消息的 广播路由(broadcast routing)
因为扇型交换机投递消息的拷贝到所有绑定到它的队列,所以他的应用案例都极其相似:
大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件
体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
分发系统使用它来广播各种状态和配置更新
在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP 没有内置 presence 的概念,因此 XMPP 可能会是个更好的选择)
扇型交换机图例:
上图所示,生产者(P)生产消息 1 将消息 1 推送到 Exchange,由于 Exchange Type=fanout 这时候会遵循 fanout 的规则将消息推送到所有与它绑定 Queue,也就是图上的两个 Queue 最后两个消费者消费。
5.主题交换机
前面提到的 direct 规则是 严格意义上的 匹配,换言之 Routing Key 必须与 Binding Key 相匹配的时候才将消息传送给 Queue.而Topic 的路由规则是一种 模糊匹配,可以通过通配符满足一部分规则就可以传送
它的约定是:
(1).binding key 中可以存在两种特殊字符 “” 与“#”,用于做模糊匹配,其中 “” 用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
(2).routing key 为一个句点号 “.” 分隔的字符串(我们将被句点号 “. ” 分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”,binding key 与 routing key 一样也是句点号 “.” 分隔的字符串
主题交换机图例:
当生产者发送消息 Routing Key=F.C.E 的时候,这时候只满足 Queue1,所以会被路由到 Queue 中,如果 Routing Key=A.C.E 这时候会被同是路由到 Queue1 和 Queue2 中,如果 Routing Key=A.F.B 时,这里只会发送一条消息到 Queue2 中
主题交换机拥有非常广泛的用户案例。无论何时,当一个问题涉及到那些想要有针对性的选择需要接收消息的 多消费者 / 多应用(multiple consumers/applications) 的时候,主题交换机都可以被列入考虑范围
案例:
分发有关于特定地理位置的数据,例如销售点
由多个工作者(workers)完成的后台任务,每个工作者负责处理某些特定的任务
股票价格更新(以及其他类型的金融数据更新)
涉及到分类或者标签的新闻更新(例如,针对特定的运动项目或者队伍)
云端的不同种类服务的协调
分布式架构 / 基于系统的软件封装,其中每个构建者仅能处理一个特定的架构或者系统。
6.头交换机
headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配
头交换机可以视为 直连交换机的 另一种表现形式,但直连交换机的路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等,灵活性更强(但实际上却很少用到头交换机)
工作步骤:
(1).绑定一个队列到头交换机上时,会同时绑定多个用于匹配的头(header)
(2).传来的消息会携带header,以及会有一个 “x-match” 参数,当 “x-match” 设置为 “any” 时,消息头的任意一个值被匹配就可以满足条件,而当 “x-match” 设置为 “all” 的时候,就需要消息头的所有值都匹配成功
7.交换机小结
类型名称 | 路由规则 |
Default | 自动命名的直交换机 |
Direct | Routing Key==Binding Key,严格匹配 |
Fanout | 把发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中 |
Topic | Routing Key==Binding Key,模糊匹配 |
Headers | 根据发送的消息内容中的 headers 属性进行匹配 |
四.Queue队列
AMQP 中的 队列(queue)跟其他消息队列或任务队列中的队列是很相似的: 它们存储着即将被应用消费掉的消息
队列的特性是先进先出
生产者Send Message “A”被传送到Queue中,消费者发现消息队列Queue中有订阅的消息,就会将这条消息A读取出来进行一些列的业务操作。这里只是一个消费正对应一个队列Queue,也可以多个消费者订阅同一个队列Queue,当然这里就会将Queue里面的消息平分给其他的消费者,但是会存在一个一个问题就是如果每个消息的处理时间不同,就会导致某些消费者一直在忙碌中,而有的消费者处理完了消息后一直处于空闲状态,因为前面已经提及到了Queue会平分这些消息给相应的消费者。这里我们就可以使用prefetchCount来限制每次发送给消费者消息的个数。详情见下图所示:
这里的prefetchCount=1是指每次从Queue中发送一条消息来,等消费者处理完这条消息后Queue会再发送一条消息给消费者
1.队列属性
队列跟交换机共享某些属性,但是队列也有一些另外的属性
Name
Durable(消息代理重启后,队列依旧存在)
Exclusive(只被一个连接(connection)使用,而且当连接关闭后队列即被删除)
Auto-delete(当最后一个消费者退订后即被删除)
Arguments(一些消息代理用他来完成类似与 TTL 的某些额外功能)
2.队列创建
队列在 声明(declare)后才能被使用,如果一个队列尚不存在,声明一个队列会 创建它,如果声明的队列已经存在,并且 属性完全相同,那么此次声明不会对原有队列产生任何影响,如果声明中的属性与已存在队列的属性 有差异,那么一个 错误代码为 406 的通道级 异常就会被抛出
3.队列持久化
持久化队列(Durable queues)会被存储在 磁盘上,当 消息代理(broker)重启的时候,它依旧存在,没有被持久化的队列称作 暂存队列(Transient queues),并不是所有的场景和案例都需要将队列持久化,持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被 重新声明,无论怎样, 只有经过持久化的消息才能被重新恢复
五.Consumer消费者
消息如果只是存储在队列里是没有任何用处的,被应用消费掉,消息的价值才能够体现,在 AMQP 0-9-1 模型中,有两种途径可以达到此目的:
(1).将消息投递给应用 (“push API”)
(2).应用根据需要主动获取消息 (“pull API”)
使用 push API,应用(application)需要明确表示出它在某个特定队列里所感兴趣的,想要消费的消息。如是,我们可以说应用注册了一个消费者,或者说订阅了一个队列。一个队列可以注册多个消费者,也可以注册一个独享的消费者(当独享消费者存在时,其他消费者即被排除在外)。
每个消费者(订阅者)都有一个叫做消费者标签的 标识符,它可以被用来退订消息, 消费者标签实际上是一个字符串
六.消息机制
1.消息确认
消费者应用(Consumer applications)(用来接受和处理消息的应用)在处理消息的时候偶尔会 失败或者有时会 直接崩溃,而且网络原因也有可能引起各种问题。这就给出了个难题, AMQP 代理在什么时候删除消息才是正确的?AMQP 0-9-1 规范给我们两种建议:
(1).自动确认模式
当 消息代理(broker)将消息发送给应用后立即删除。(使用 AMQP 方法: basic.deliver 或 basic.get-ok))
(2).显式确认模式
待应用(application)发送一个 确认回执(acknowledgement)后再删除消息。(使用 AMQP 方法: basic.ack)
如果一个消费者在尚未发送确认回执的情况下挂掉了,那 AMQP 代理会将消息 重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递
2.拒绝消息
当一个消费者接收到某条消息后,处理过程有 可能成功,有 可能失败,应用可以向消息代理表明,本条消息由于 “ 拒绝消息(Rejecting Messages)” 的原因处理失败了(或者未能在此时完成),当拒绝某条消息时,应用可以告诉消息代理如何处理这条消息—— 销毁它或者 重新放入队列
当此队列只有一个消费者时,请确认不要由于拒绝消息并且选择了重新放入队列的行为而引起消息在同一个消费者身上无限循环的情况发生。
在 AMQP 中,basic.reject 方法用来执行拒绝消息的操作。但 basic.reject 有个限制:不能使用它决绝多个带有确认回执(acknowledgements)的消息。但是如果使用的是 RabbitMQ,那么可以使用被称作 negative acknowledgements(也叫 nacks)的 AMQP 0-9-1 扩展来解决这个问题
3.预取消息
在多个消费者共享一个队列的案例中,明确指定在收到下一个确认回执前每个消费者一次可以接受多少条消息是非常有用的。这可以在试图批量发布消息的时候起到简单的负载均衡和提高消息吞吐量的作用
4.消息属性
AMQP 模型中的消息(Message)对象是带有属性(Attributes)的,有些属性及其常见,以至于 AMQP 0-9-1 明确的定义了它们,并且应用开发者们无需费心思思考这些属性名字所代表的具体含义。例如:
Content type | 内容类型 |
Content encoding | 内容编码 |
Routing key | 路由键 |
Delivery mode | persistent or not |
投递模式 | 持久化 或 非持久化 |
Message priority | 消息优先权 |
Message publishing timestamp | 消息发布的时间戳 |
Expiration period | 消息有效期 |
Publisher application id | Publisher application id |
有些属性是被 AMQP 代理所使用的,但是大多数是开放给接收它们的应用解释器用的,有些属性是可选的也被称作消息头(headers),他们跟 HTTP 协议的 X-Headers 很相似,消息属性需要在消息被发布的时候定义
5.消息主体
AMQP 的消息除属性外,也含有一个 有效载荷 - P ayload(消息实际携带的数据),它被 AMQP 代理当作 不透明的字节数组来对待
消息代理不会检查或者修改有效载荷,消息可以只包含属性而不携带有效载荷,它通常会使用类似 JSON 这种序列化的格式数据,为了节省,协议缓冲器和 MessagePack 将结构化数据序列化,以便以消息的有效载荷的形式发布。AMQP 及其同行者们通常使用 “content-type” 和 “content-encoding” 这两个字段来与消息沟通进行有效载荷的辨识工作,但这仅仅是基于约定而已
6.消息持久化
消息能够以持久化的方式发布,AMQP 代理会将此消息存储在 磁盘上,如果服务器重启,系统会确认收到的持久化消息未丢失
简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具有持久化性质:它完全取决与消息本身的持久模式(persistence mode),将消息以持久化方式发布时,会对性能造成一定的影响(就像数据库操作一样,健壮性的存在必定造成一些性能牺牲)
7.确认回执
由于网络的不确定性和应用失败的可能性,处理确认回执(acknowledgement)就变的十分重要,有时确认消费者收到消息就可以了,有时确认回执意味着消息已被验证并且处理完毕,例如对某些数据已经验证完毕并且进行了数据存储或者索引操作。
这种情形很常见,所以 AMQP 0-9-1 内置了一个功能叫做 消息确认(message acknowledgements),消费者用它来确认消息已经被接收或者处理。如果一个应用崩溃掉(此时连接会断掉,所以AMQP代理亦会得知),而且消息的确认回执功能已经被开启,但是消息代理尚未获得确认回执,那么消息会被重新放入队列(并且在还有还有其他消费者存在于此队列的前提下,立即投递给另外一个消费者)。协议内置的消息确认功能将帮助开发者建立强大的软件
七.其他
连接
一个网络连接,比如TCP/IP套接字连接,AMQP 连接通常是长连接,AMQP 是一个使用 TCP 提供可靠投递的应用层协议。AMQP 使用认证机制并且提供 TLS(SSL)保护,当一个应用不再需要连接AMQP 代理的时候,需要优雅的释放掉 AMQP 连接,而不是直接将 TCP 连接关闭
2.通道
有些应用需要与 AMQP 代理建立多个连接,无论怎样,同时开启多个 TCP 连接都是不合适的,因为这样做会消耗掉过多的系统资源并且使得防火墙的配置更加困难,AMQP 0-9-1 提供了通道(channels)来处理多连接,可以把通道理解成共享一个 TCP 连接的多个轻量化连接,在涉及多线程 / 进程的应用中,为每个线程 / 进程开启一个通道(channel)是很常见的,并且这些通道不能被线程 / 进程共享。一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个 AMQP 方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。
3.虚拟主机
为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP 提供了一个虚拟主机(virtual hosts - vhosts)的概念。这跟 Web servers 虚拟主机概念非常相似,这为 AMQP 实体提供了完全隔离的环境。当连接被建立的时候,AMQP 客户端来指定使用哪个虚拟主机
4.AMQP 是可扩展的
(1).定制化交换机类型:可以让开发者们实现一些开箱即用的交换机类型尚未很好覆盖的路由方案:例如 geodata-based routing。)
(2).交换机和队列的声明中可以包含一些消息代理能够用到的额外属性:例如 RabbitMQ 中的 per-queue message TTL 即是使用该方式实现。)
(3).特定消息代理的协议扩展:例如 RabbitMQ 所实现的扩展,新的 AMQP 0-9-1 方法类可被引入)
(4).消息代理可以被其他的插件扩展:例如 RabbitMQ 的管理前端 和 已经被插件化的 HTTP API
参考:RabbitMQ 中文文档-AMQP协议讲解