RabbitMQ作为分布式消息存储和转发系统,已广泛使用于分布式系统中。本文简要介绍RabbitMQ相关概念、集群架构和消息转发流程,并与Kafka做了简要对比,以加深理解。
1、RabbitMQ相关概念
1.1 AMQP介绍
消息(Message)是应用之间的传送数据,包括文本字符串或者嵌入式对象。消息从一端立即返回到另一端称为同步通信;消息从一端发出后先进入临时缓冲存储,达到一定条件后再发送给另一端称为异步通信。这个临时缓冲存储的具体实现是消息队列(Message queue),消息队列是应用之间的通信方式。
那么什么是AMQP呢?AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
AMQP的内部架构如上图所示:
- Publisher:消息的生产者,表示向消息队列系统发布消息的应用程序
- Consumer:消息的消费者,表示从消息队列中取得消息的应用程序
- Exchange:消息交换机,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。生产者不是直接将消息投递到Queue中的,实际上是生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个个Queue中。
- Binding:用于消息队列和交换机之间的关联,将交换机和消息队列按照路由规则绑定
- Queue:用来保存消息,消息可以投入在多个queue中。消息会一直存在queue中,直到消费者将其取走。
- Connection:网络连接,比如TCP/IP套接字连接
- Channel:消息通道,是建立在真实的TCP连接内的虚拟连接。AMQP协议规定只有通过Channel才能执行AMQP命令,大部分的业务操作如定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等都是通过Channel这个接口完成的。使用channel的原因是因为对于操作系统来说建立和销毁TCP是非常昂贵的开销,如果客户端的每个线程都建立一个TCP连接,系统也将无法承受如此多的TCP连接。
- Virtual Host:虚拟主机,每个Broker内可以有多个虚拟主机,实际上是一个独立的域,拥有自己的队列、交换器、绑定和权限机制。
- Broker:消息队列服务器实体
1.2 RabbitMQ特性
RabbitMQ是AMQP协议的开源实现,基于Erlang语言开发,支持多种客户端,多用于分布式系统中的消息存储和转发。RabbitMQ主要特性如下:
- 可靠性(Reliability):RabbitMQ使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
- 灵活的路由(Flexible Routing):在消息进入队列之前,通过Exchange来路由消息的。RabbitMQ提供一些内置的Exchange实现典型的路由功能,也可以通过插件机制实现需要的Exchange。
- 消息集群(Clustering):多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
- 高可用(Highly Available Queues):队列通过在集群中的机器上进行镜像的方式,保证在部分节点出问题的情况下队列仍然可用。
- 多种协议(Multi-protocol):RabbitMQ支持多种消息队列协议,如STOMP、MQTT等等。
- 多语言客户端(Many Clients):RabbitMQ几乎支持所有常用语言,比如Java、Python、.NET、Ruby等。
- 管理界面(Management UI):RabbitMQ提供了一个易用的用户界面,用户可以监控和管理消息Broker。
- 跟踪机制(Tracing):如果消息异常,可以根据消息跟踪机制定位原因。
- 插件机制(Plugin System):RabbitMQ提供了许多插件来进行扩展。
1.3 RabbitMQ使用场景
RabbitMQ作为一个分布式消息存储和转发的系统,适用于以下场景:
- 应用解耦:业务系统中多个系统之间的相互调用很复杂,比如A系统和其它系统严重耦合,A系统产生的关键数据其它系统需要消费,A系统需要时刻维护其它系统的消费情况,是否成功、超时,失败是否需要重发等。如果使用消息队列MQ,A系统只需要将消息发送到MQ中,其它系统只需要从MQ中消费数据,实现了系统之间的解耦。
- 流量消峰:业务系统在秒杀等特殊时段流量出现高峰,但大部分时间段流量相对平稳。这个时候按照流量峰值去部署资源显然是不合理的,是对资源的浪费。因此可以使用消息队列做缓冲,将短时间的业务分散成一段时间来处理,达到消峰的目的。比如A系统每秒钟处理2K个请求,在峰值时段系统请求可能超过10K,已经超过系统的处理能力,此时先将请求接入MQ,A系统再从MQ中慢慢拉取请求。这样处理的缺点就是高峰期会有业务请求积压到MQ中,时效性要求不满足。
- 异步处理:在业务系统中有些处理是可以异步完成的,在同步架构中A系统等待B系统处理完成,B系统却要处理很长时间,在整个业务链上交易的处理时间变长了。如果使用MQ,A系统能够很快的响应客户端的请求,不用等待后面异步处理的结果。
2、RabbitMQ实现原理
2.1 RabbitMQ集群架构
2.1.1 集群模式
RabbitMQ集群设计的目的是允许消费者和生产者在节点奔溃的情况下继续运行,并且通过线性扩展增加节点来提升消息的吞吐量。RabbitMQ集群分为两种模式:
- 普通集群:主备架构,只是实现主备方案,不至于主节点宕机导致整个服务无法使用
- 镜像集群:同步结构,基于普通集群实现的队列同步
1)普通模式
对于普通模式,集群中各节点有相同的队列结构,但消息只会存在于集群中的一个节点也就是Master节点。当消息进入Master节点的Queue后,consumer从slave节点拉取数据,RabbitMQ会临时从Master、slave节点之间进行数据传输,将Master节点中的消息实体数据取出并经过slave节点发送给consumer。
应用场景: 该模式存在一个问题就是当Master节点故障时候,slave节点无法拉取到master节点中的消息数据,因此只是保证了服务可用,但是无法保证高可用。如果做了队列持久化或消息持久化,那么得等Master节点恢复,然后才可被消费,并且在Master节点恢复之前其它节点不能再创建Master节点已经创建过的持久队列;如果没有持久化的话,消息就会失丢。因此,该模式更适合于消息无需持久化的场景,只有当队列非持久化,客户端才能重新连接到集群里的其他节点,并重新创建队列。假如该队列是持久化的,那么唯一办法是将故障节点恢复起来。
2)镜像模式
与普通模式不同之处是消息实体会主动在镜像节点间同步,而不是在获取数据时临时拉取,这种模式能够保证高可用。在镜像模式下包括1个master节点和n个slave节点,当master节点故障时候,会选出新的主并自动切换到新主。在实际的镜像模式架构中,客户端通过LVS负载均衡连接到HAProxy,HAProxy再将访问请求代理到管理的RabbitMQ Server上,实现高可用和负载功能。
应用场景:镜像场景适用于对可靠性要求高的场景,镜像集群也是基于普通集群,只有先搭建普通集群,然后才能设置镜像队列。但是镜像模式中如果镜像队列过多并且消息体量过大,集群内部的网络带宽被这种消息镜像同步占用,影响系统的性能。
2.1.2 集群间数据同步
RabbitMQ在默认情况下考虑存储空间、性能的原因,所以集群内部仅同步元数据
- 队列元数据:包括队列名称和它们的属性,比如是否可持久化、是否自动删除
- 交换器元数据:交换器名称、类型、属性
- 绑定元数据:内部是一张表格记录如何将消息路由到队列
- vhost元数据:为vhost内部的队列、交换器、绑定提供命名空间和安全属性
在RabbitMQ集群中,元数据信息在所有节点中都是一致的。因此,当用户访问其中任何一个RabbitMQ节点时,通过rabbitmqctl查询到的queue/user/exchange/vhost等信息都是相同的。
2.1.3 集群节点类型
RabbitMQ集群中有RAW Node和Disk Node两种类型:
- RAM node内存节点:将所有的队列、交换机、绑定、用户、权限和vhost的元数据定义存储在内存中,好处是可以使得像交换机和队列声明等操作更加的快速。
- Disk node磁盘节点:将元数据存储在磁盘中,单节点系统只允许磁盘类型的节点,防止重启RabbitMQ的时候,丢失系统的配置信息。
对于RabbitMQ集群,要求集群中至少有一个disk node,其它节点可以是RAW Node。当节点加入或离开集群时,需要将变更通知到至少一个磁盘节点。如果只有一个磁盘节点,刚好又是该节点崩溃了,那么集群可以继续路由消息,但是不能创建队列、创建交换器、创建绑定、添加用户、更改权限、添加或删除集群节点。因此通常在集群中添加2台以上的磁盘节点,这样其中一台发生故障了,集群仍然可以保持运行,且能够在任何时候保存元数据变更。
2.2 RabbitMQ消息转发流程
- 建立连接:由生产者和消费者创建连接,连接到broker的物理节点上
- 建立消息通道channel:Channel是建立在连接之上的,一个连接可以建立多个通道。生产者会连接virtual host建立通道;消费者连接到相应的queue上建立通道。
- 发送消息:生产者发送消息到Broker中的Exchange中。
- 路由转发:生产者在发送消息时,需要指定一个RoutingKey和Exchange,Exchange收到消息后可以看到消息中指定的RoutingKey,再根据当前Exchange的ExchangeType,按一定的规则将消息转发到相应的queue中去。
- 消息接收:消费者会监听相应的queue,一旦queue中有可以消费的消息,queue就将消息发送给消费端。
- 消息确认:当消费者完成某一条消息的处理之后,需要发送一条ACK消息给对应的Queue。Queue收到ACK信息后,才会认为消息处理成功,并将消息从Queue中移除;如果在对应的Channel断开后,Queue没有收到这条消息的ACK信息,该消息将被发送给另外的Channel。消息的确认机制提高了通信的可靠性。
2.3 Exchange类型
Exchange根据不同的exchange type和Binding规则进行消息的路由分发,目前常用的有4种exchange:Direct exchange、Fanout exchange、Topic exchange和headers。
2.3.1 Direct exchange直接转发路由
Direct exchange的实现原理是通过消息中的路由键(routing key),与queue中的routing key进行比对,如果二者匹配,则将消息发送到这个消息队列。
如图所示,生产者以routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…)和Queue2(amqp.gen-Agl…);如果以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由到Queue2;如果以其他routingKey发送消息,则消息不会路由到这两个Queue中。
2.3.2 Fanout exchange复制分发路由
Fanout exchange路由不需要routkey,当exchange收到消息后,会将消息复制多份转发给绑定的消息队列。这种路由策略有点像路由广播,每台子网内的主机都获得了一份复制的消息,这种路由机制是最快的。
如图所示,生产者发送到Exchange的所有消息都会路由到图中的两个Queue,并最终被两个消费者C1和C2消费。
2.3.3 Topic exchange通配路由
Topic exchange是direct exchange的通配符模式,通过模式匹配分发消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。exchange支持“#”和“”的通配,“#”匹配0个或多个单词,“”匹配一个单词。
如图所示,routing key为"quick.orange.rabbit"的消息会同时路由到Q1和Q2、routing key为"lazy.orange.elephant"的消息也会同时路由到Q1和Q2。“quick.orange.fox"和"lazy.brown.fox"的消息分别路由到Q1和Q2,但是"lazy.pink.rabbit"只会路由到Q2一次,虽然能匹配到2条规则。其它的比如"quick.brown.fox”,则不能被匹配到会被丢弃。
2.3.4 Headers
Headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在创建queue的时候需要设置绑定的头部信息,分为全部匹配和部分匹配。当消息发送到Exchange时,RabbitMQ会获取到该消息的headers,对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。和其它类型相比,性能会差很多,实际使用的机会很少。
2.4 RabbitMQ消息可靠性
2.4.1 消息持久化
RabbitMQ消息的可靠性是基于持久化实现的,持久化可以防止在异常情况下丢失数据。RabbitMQ的持久化分为三个部分:Exchange持久化、队列持久化和消息的持久化。
1)Exchange持久化
交换器持久化可以通过在声明队列时将durable参数设置为true。如果交换器不设置持久化,那么在RabbitMQ服务重启之后,相关的Exchange元数据会丢失,但是消息不会丢失,只是不能将消息发送到这个Exchange了。
2)队列持久化和消息持久化
队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将其设置为持久化。队列的持久化可以通过在声明队列时将durable参数设置为true。设置了队列和消息的持久化,当RabbitMQ服务重启之后,消息依然存在。如果只设置队列持久化或者消息持久化,重启之后消息都会消失。
持久化是保证在服务器重启的时候可以保持不丢失相关信息,但是将所有的消息设置为持久化会影响RabbitMQ的性能,因为磁盘的写入速度比内存的写入要慢得多。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。在实际中,需要根据实际情况在可靠性和吞吐量之间做一个权衡。
2.4.2 消息确认机制
为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制。消息确认机制分为生产者和消费者。
1)生产者消息确认机制
生产者将消息投递到所有匹配的队列后,会发送一个确认消息给生产者,使得生产者得知消息已经正确达到。如果消息和队列是持久化存储的,那么确认消息会在消息写入磁盘之后发出。
2)消费者消息确认机制
消费者订阅队列的时候,可以指定autoAck参数
- 当autoAck为true,RabbitMQ采用自动确认模式,RabbitMQ自动把发送出去的消息设置为确认,然后从内存或者硬盘中删除,而不管消费者是否真正消费到了这些消息。
- 当autoAck为false,RabbitMQ会等待消费者回复的确认信号,收到确认信号之后才从内存或者磁盘中删除消息。
消息确认机制是RabbitMQ消息可靠性投递的基础,只要设置autoAck参数为false,消费者就有足够的时间处理消息,不用担心处理消息的过程中消费者进程挂掉后消息丢失的问题。因为RabbitMQ会一直等待并持有消息,直到消费者确认了该消息。
2.4.3 死信队列
当消息在一个队列中变成死信(dead message)时,会通过交换机发送到死信队列中,这个交换机称为DLX(Dead Letter Exchange)。以下场景下消息会变成死信:
- 消息被拒绝(basic.reject 或 basic.nack)并且requeue=false.
- 消息TTL过期
- 队列达到最大长度(队列满了,无法再添加数据到队列中)
DLX和一般的交换机一样,当这个队列中有死信的时候,RabbitMQ会自动将这个消息重新发送到设置的交换器上,进而被路由到另一个队列。后续的程序可以根据死信队列中的内容分析当时发生的异常,进而改善和优化系统。定义死信队列指定参数:
- x-dead-letter-exchange: 用来设置死信后发送的交换机
- x-dead-letter-routing-key:用来设置死信的 routingKey
2.4.4 延迟队列
延迟队列就是进入该队列的消息会被消费者延迟消费,也就是当消息被生产者发送以后,等待特定的时间后,消费者才能拿到这个消息进行消费。
- 延迟队列适用于需要延迟处理的场景,比如网上购物后的30分钟内进行支付,如果指定时间内没有支付成功,则订单会自动取消。
- 延迟队列的典型应用场景还有延迟重试。比如消费者从队列里面消费消息失败了,可以延迟一段时间以后进行重试。
2.4.5 过期时间
RabbitMQ可以对消息和队列设置TTL(Time To Live),TTL是一条消息在队列中的最大存活时间,单位是毫秒。
- RabbitMQ支持设置消息的过期时间,在消息发送的时候进行指定,每条消息的过期时间可以不同;
- RabbitMQ支持设置队列的过期时间,从消息入队列开始计算,直到超过了队列的超时时间配置,那么消息会变成死信,自动清除。
如果两种方式一起使用,则过期时间以两者中较小的那个数值为准。当然也可以不设置TTL,如果不设置表示消息不会过期;如果设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息将被立即丢弃。
3、RabbitMQ实战体验
3.1 RabbitMQ集群部署
3.1.1 准备erlang环境
1)安装erlang语言环境
[root@tango-centos01 local]# mkdir erlang
[root@tango-centos01 otp_src_25.1.2]# ./configure --prefix=/usr/local/erlang/
[root@tango-centos01 otp_src_25.1.2]# make
[root@tango-centos01 otp_src_25.1.2]# make install
2)设置环境变量
#更新/etc/profile,添加内容
$PATH:/usr/local/erlang/bin
#source /etc/profile
3)检查erlang版本
[root@tango-rac01 ~]# erl
Erlang/OTP 25 [erts-13.1.2] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1]
Eshell V13.1.2 (abort with ^G)
1>
3.1.2 安装RabbitMQ
1)解压安装包
tar -xvf rabbitmq-server-generic-unix-3.11.3.tar.xz
2)启动服务
[root@tango-rac01 sbin]# ./rabbitmq-server -detached
带-detached参数的命令是后台启动方式
3)检查状态
[root@tango-rac01 sbin]# ./rabbitmqctl status
Status of node rabbit@tango-rac01 ...
Runtime
OS PID: 27316
OS: Linux
Uptime (seconds): 41
Is under maintenance?: false
RabbitMQ version: 3.11.3
RabbitMQ release series support status: supported
4)启用RabbitMQ Management管理台
[root@tango-rac01 sbin]# ./rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@tango-rac01:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@tango-rac01...
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
started 3 plugins.
5)访问控制管理台http://192.168.112.135:15672/
提示guest用户无法远程访问,需修改rabbit.app配置文件
在/usr/local/rabbitmq_server-3.11.3/plugins/rabbit-3.11.3/ebin修改
{loopback_users, [<<"">>]},
6)重启RabbitMQ服务
./rabbitmqctl shutdown
./rabbitmq-server -detached
7)重新访问控制台
4、RabbitMQ和Kafka对比
在前文中介绍了Kafka相关知识,本文又介绍了RabbitMQ相关概念,二者之间有什么样的区别。RabbitMQ作为一种消息中间件,作为服务总线利用消息队列实现分布式系统中的消息存储与转发;Kafka则是一种分布式流式系统实现,利用消息订阅实现分布式的消息发布。
- 架构模型方面
- RabbitMQ:以broker为中心,具备消费端消费消息的确认机制
- Kafka:以consumer为中心,没有消费端的确认机制
- 负载均衡方面
- RabbitMQ:本身不支持负载均衡,需要第三方lvs支持
- Kafka:采用zookeeper对集群中的broker、topic进行管理,将topic注册到zookeeper上,通过zookeeper的协调机制,producer保存对应的topic的broker信息,可以随机或者轮询发送到broker上,producer可以基于语义指定分片,消息发送到broker的某个分片上。
- 消费者拉取方式的不同
- RabbitMQ:采用push的方式,当消息到达队列后,会将消息推送到消费端
- Kafka:采用pull的方式,当消息到达topic后,消费者端主动从订阅拉取数据
- 消息处理完后处理方式的不同
- RabbitMQ:被消费者端确认消费了的消息会从磁盘删除
- Kafka:消息消费后依旧保留在磁盘
- 生产者发送消息到broker的方式不同
- RabbitMQ:当为主从集群的时候,生产者连接到谁,发送消息就到对应的机器上,其他机器只是存储元数据。消费者连接时,只需要连接任意集群中的任意一台服务器,获取数据时都可以通过元数据经过路由到达实际存储队列消息的那台服务器。
- Kafka:当生产者发送消息时,必须发送到master分片所在的机器。为了实现这一个功能,kafka在连接集群时,只要连接到任意一台或多台服务器,就可以知道整个集群的情况,其中包含了集群所有机器的ip地址,分片的信息。
- 副本同步对性能影响
- RabbitMQ:新节点加入时,如果ha-sync-mode=manual,则不会手动同步镜像到新节点。如果ha-sync-mode=automatic时,会自动同步到新节点中。在同步新节点时,主节点不会再接收生产者的消息,也不会push消息到消费者,就是一种stop-the-world的状态。如果存量消息过多,则会导致生产者和消费者请求超时,可以使用设置重试规则解决。
- Kafka:新的节点加入,会主动从主分区拉取数据,等待数据拉取完成(不包含未提交的,只包含所有已提交数据)后才把该节点加入到集群中。
RabbitMQ和Kafka对比如下表所示:
参考资料:
- https://www.rabbitmq.com/getstarted.html
- https://www.jianshu.com/p/79ca08116d57
- https://cloud.tencent.com/developer/article/1391426
- https://zhuanlan.zhihu.com/p/554203671
- https://my.oschina.net/jikeh/blog/2207127
- https://blog.csdn.net/qq_18478183/article/details/113877158
- https://blog.csdn.net/guiripei/article/details/124347066