文章目录
- 简单介绍RabbitMQ
- RabbitMQ架构
- 什么是 RabbitMQ?有什么显著的特点?
- RabbitMQ 有那些基本概念?
- RabbitMQ routing 路由模式
- 消息怎么路由?
- RabbitMQ publish/subscribe 发布订阅(共享资源)
- 能够在地理上分开的不同数据中心使用 RabbitMQ cluster 么?
- 什么情况下会出现 blackholed 问题?
- 消息如何分发?
- Basic.Reject 的用法是什么?
- 什么是 Binding 绑定?
- RabbitMQ如何确保消息的不丢失
- RabbitMQ如何避免消息堆积
- 如何保证RabbitMQ的高可用
- RabbitMQ 如何构建集群?
- RabbitMQ 支持哪些消息模式?
- RabbitMQ 中有哪几种交换机类型?
- RabbitMQ 如何实现消息的持久化?
- RabbitMQ 是如何实现死信队列的?
- RabbitMQ 中如何进行事务处理?
- 聊一聊常用的 RabbitMQ 插件
简单介绍RabbitMQ
RabbitMQ是一款基于AMQP协议的、稳定易用的消息中间件
其稳定体现在其确保消息的不丢失能力,通过从生产端、broker端、消费者端来保障。
另外其支持延时队列、死信机制等,提高了它的使用覆盖场景。
RabbitMQ架构
RabbitMQ架构会涉及如下模型:Producer、Consumer、Queue、Exchange、Broker、RoutingKey、BindingKey。
什么是 RabbitMQ?有什么显著的特点?
RabbitMQ 是一个开源的消息中间件,使用 Erlang 语言开发。这种语言天生非常适合分布式场景,RabbitMQ 也就非常适用于在分布式应用程序之间传递消息。RabbitMQ 有非常多显著的特点:
- 消息传递模式:RabbitMQ 支持多种消息传递模式,包括发布/订阅、点对点和工作队列等,使其更灵活适用于各种消息通信场景。
- 消息路由和交换机:RabbitMQ 引入了交换机(Exchange)的概念,用于将消息路由到一个或多个队列。这允许根据消息的内容、标签或路由键进行灵活的消息路由,从而实现更复杂的消息传递逻辑。
- 消息确认机制:RabbitMQ 支持消息确认机制,消费者可以确认已成功处理消息。这确保了消息不会在传递后被重复消费,增加了消息的可靠性。
- 可扩展性:RabbitMQ 是高度可扩展的,可以通过添加更多的节点和集群来增加吞吐量和可用性。这使得 RabbitMQ 适用于大规模的分布式系统。
- 多种编程语言支持:RabbitMQ 提供了多种客户端库和插件,支持多种编程语言,包括 Java、Python、Ruby、Node.js 等,使其在不同技术栈中都能方便地集成和使用。
- 消息持久性:RabbitMQ 允许消息和队列的持久性设置,确保消息在 RabbitMQ 重新启动后不会丢失。这对于关键的业务消息非常重要。
- 灵活的插件系统:RabbitMQ 具有丰富的插件系统,使其可以扩展功能,包括管理插件、数据复制插件、分布式部署插件等。
- 管理界面:RabbitMQ 提供了一个易于使用的 Web 管理界面,用于监视和管理队列、交换机、连接和用户权限等。
总之,RabbitMQ 是一个功能丰富、高度可扩展且灵活的消息中间件,适用于各种分布式应用程序和消息通信需求。它的强大功能和广泛的社区支持使其成为一个流行的消息中间件解决方案。
RabbitMQ 有那些基本概念?
- Broker:简单来说就是消息队列服务器实体。
- Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
- Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
- Binding:绑定,它的作用就是把 exchange 和 queue 按照路由规则绑定起来。
- Routing Key:路由关键字,exchange 根据这个关键字进行消息投递。
- VHost:vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限 系统,可以做到 vhost 范围的用户控制。当然,从RabbitMQ 的全局⻆度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。
- Producer:消息生产者,就是投递消息的程序。
- Consumer:消息消费者,就是接受消息的程序。
- Channel:消息通道,在客户端的每个连接里,可建立多个channel,每个 channel代表一个会话任务。
- 由 Exchange、Queue、RoutingKey 三个才能决定一个从 Exchange 到 Queue 的唯一的线路。
RabbitMQ routing 路由模式
消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携 带路由字符(对象的方法),交换机根据路由的 key,只能匹配上路由 key 对应的消息队列, 对应的消费者才能消费消息。
根据业务功能定义路由字符串。
从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。
业务场景:error 通知、EXCEPTION、错误通知的功能、传统意义的错误通知、客户 通知、利用 key 路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自 定义消费者,实时接收错误。
消息怎么路由?
消息提供方->路由->一至多个队列消息发布到交换器时,消息将拥有一个路由键 (routing key),在消息创建时设定。通过队列路由键,可以把队列绑定到交换器上。消 息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则)。
常用的交换器主要分为一下三种:
- fanout:如果交换器收到消息,将会广播到所有绑定的队列上。
- direct:如果路由键完全匹配,消息就被投递到相应的队列。
- topic:可以使来自不同源头的消息能够到达同一个队列。 使用 topic 交换器时,可以使用通配符。
RabbitMQ publish/subscribe 发布订阅(共享资源)
- 每个消费者监听自己的队列。
- 生产者将消息发给 broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
能够在地理上分开的不同数据中心使用 RabbitMQ cluster 么?
不能。
- 第一,你无法控制所创建的 queue 实际分布在 cluster 里的哪个 node 上(一般使用HAProxy + cluster 模型时都是这样),这可能会导致各种跨地域访问时的常⻅问题。
- 第二,Erlang 的 OTP 通信框架对延迟的容忍度有限,这可能会触发各种超时,导致业务 疲于处理。
- 第三,在广域网上的连接失效问题将导致经典的“脑裂”问题,而 RabbitMQ 目前无法 处理(该问题主要是说Mnesia)。
什么情况下会出现 blackholed 问题?
blackholed 问题是指,向 exchange 投递了 message ,而由于各种原因导致该 message 丢失,但发送者却不知道。可导致 blackholed 的情况:
- 向未绑定 queue 的 exchange 发送 message;
- exchange 以binding_key key_A 绑定了 queue queue_A,但向该 exchange 发送 message 使用的 routing_key 却是key_B。
消息如何分发?
- 若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。
- 通过路由可实现多消费的功能
Basic.Reject 的用法是什么?
该信令可用于 consumer 对收到的 message 进行 reject 。若在该信令中设置 requeue=true,则当RabbitMQ server 收到该拒绝信令后,会将该 message 重新发 送到下一个处于 consume 状态的 consumer处(理论上仍可能将该消息发送给当前 consumer)。若设置 requeue=false ,则 RabbitMQ server 在收到拒绝信令后,将直 接将该 message 从 queue 中移除。
另外一种移除 queue 中 message 的小技巧是,consumer 回复 Basic.Ack 但不对获 取到的 message 做任何处理。而 Basic.Nack 是对 Basic.Reject 的扩展,以支持一次 拒绝多条 message 的能力。
什么是 Binding 绑定?
通过绑定将交换器和队列关联起来,一般会指定一个 BindingKey,这样 RabbitMq 就知道 如何正确路由消息到队列了。
RabbitMQ如何确保消息的不丢失
RabbitMQ针对消息传递过程中可能发生问题的各个地方,给出了针对性的解决方案:
- 生产者发送消息时可能因为网络问题导致消息没有到达交换机:RabbitMQ提供了publisher confirm机制,生产者发送消息后,可以编写ConfirmCallback函数;消息成功到达交换机后,RabbitMQ会调用ConfirmCallback通知消息的发送者,返回ACK;消息如果未到达交换机,RabbitMQ也会调用ConfirmCallback通知消息的发送者,返回NACK;消息超时未发送成功也会抛出异常。
- 消息到达交换机后,如果未能到达队列,也会导致消息丢失:RabbitMQ提供了publisher return机制,生产者可以定义ReturnCallback函数,消息到达交换机,未到达队列,RabbitMQ会调用ReturnCallback通知发送者,告知失败原因。
- 消息到达队列后,MQ宕机也可能导致丢失消息:RabbitMQ提供了持久化功能,集群的主从备份功能;消息持久化,RabbitMQ会将交换机、队列、消息持久化到磁盘,宕机重启可以恢复消息;镜像集群,仲裁队列,都可以提供主从备份功能,主节点宕机,从节点会自动切换为主,数据依然在。
- 消息投递给消费者后,如果消费者处理不当,也可能导致消息丢失:SpringAMQP基于RabbitMQ提供了消费者确认机制、消费者重试机制,消费者失败处理策略。
消费者的确认机制:
消费者处理消息成功,未出现异常时,Spring返回ACK给RabbitMQ,消息才被移除
消费者处理消息失败,抛出异常,宕机,Spring返回NACK或者不返回结果,消息不被异常
消费者重试机制:
默认情况下,消费者处理失败时,消息会再次回到MQ队列,然后投递给其它消费者。Spring提供的消费者重试机制,则是在处理失败后不返回NACK,而是直接在消费者本地重试。多次重试都失败后,则按照消费者失败处理策略来处理消息。避免了消息频繁入队带来的额外压力。
消费者失败策略:
当消费者多次本地重试失败时,消息默认会丢弃。
Spring提供了Republish策略,在多次重试都失败,耗尽重试次数后,将消息重新投递给指定的异常交换机,并且会携带上异常栈信息,帮助定位问题。
RabbitMQ如何避免消息堆积
消息堆积问题产生的原因往往是因为消息发送的速度超过了消费者消息处理的速度。因此解决方案无外乎以下三点:
- 提高消费者处理速度:优化业务代码,批量处理业务,多线程并发处理业务。
- 增加更多消费者:一个队列绑定多个消费者,共同争抢任务,自然可以提供消息处理的速度。
- 增加队列消息存储上限:在RabbitMQ的1.8版本后,加入了新的队列模式:Lazy Queue。这种队列不会将消息保存在内存中,而是在收到消息后直接写入磁盘中,理论上没有存储上限。可以解决消息堆积问题。
如何保证RabbitMQ的高可用
要实现RabbitMQ的高可用无外乎下面两点:
- 做好交换机、队列、消息的持久化
- 搭建RabbitMQ的镜像集群,做好主从备份。当然也可以使用仲裁队列代替镜像集群。
RabbitMQ 如何构建集群?
RabbitMQ 支持两种主要类型的集群:普通集群(Classic Cluster)和镜像集群(Mirrored Cluster)。他们之间有一些重要的区别:
- 普通集群: 这种模式使用Erlang语言天生具备的集群方式搭建。这种集群模式下,集群的各个节点之间只会有相同的元数据,即队列结构,而消息不会进行冗余,只存在一个节点中。消费时,如果消费的不是存有数据的节点, RabbitMQ会临时在节点之间进行数据传输,将消息从存有数据的节点传输到消费的节点。很显然,这种集群模式的消息可靠性不是很高。因为如果其中有个节点服务宕机了,那这个节点上的数据就无法消费了,需要等到这个节点服务恢复后才能消费,而这时,消费者端已经消费过的消息就有可能给不了服务端正确应答,服务起来后,就会再次消费这些消息,造成这部分消息重复消费。 另外,如果消息没有做持久化,重启就消息就会丢失。并且,这种集群模式也不支持高可用,即当某一个节点服务挂了后,需要手动重启服务,才能保证这一部分消息能正常消费。所以这种集群模式只适合一些对消息安全性不是很高的场景。而在使用这种模式时,消费者应该尽量的连接上每一个节点,减少消息在集群中的传输。
- 镜像集群:这种模式是在普通集群模式基础上的一种增强方案,这也就是RabbitMQ的官方HA高可用方案。需要在搭建了普通集群之后再补充搭建。其本质区别在于,这种模式会在镜像节点中间主动进行消息同步,而不是在客户端拉取消息时临时同步。并且在集群内部有一个算法会选举产生master和slave,当一个master挂了后,也会自动选出一个来。从而给整个集群提供高可用能力。这种模式的消息可靠性更高,因为每个节点上都存着全量的消息。而他的弊端也是明显的,集群内部的网络带宽会被这种同步通讯大量的消耗,进而降低整个集群的性能。这种模式下,队列数量最好不要过多
总的来说,普通集群适用于对性能要求高,但可以接受数据丢失的情况。而镜像集群则适用于对数据持久性和可用性有更高要求,并愿意付出一些性能代价的场景。
RabbitMQ 支持哪些消息模式?
RabbitMQ 支持多种消息传递模式,这些模式允许应用程序在不同的场景下进行灵活的消息交流。以下是几种最常见的消息分发机制:
- workQueue 工作序列机制: Producer 将消息发送到 queue,多个 Consumer 同时消费Queue 上的消息。消息会均匀的分配给多个 Consumer 处理。
- Publish/Subscribe 订阅发布机制: Producer 只负责将消息发送到exchange交换机上。Exchange 将消息转发到所有订阅的 Queue,并由对应的 Consumer 去进行消费
- Routing 基于内容路由机制:在订阅发布机制的基础上,增加一个routingKey,并根据routingKey判断 Exchange 将消息转发到哪些 Queue 上。
- Topic 基于话题路由机制:在基于内容路由的基础上,对routingKey增加了模糊匹配的功能。
另外,RabbitMQ 还支持双向同步的 RPC 机制,不过一般用得比较少。这些消息模式允许开发者根据应用程序的需求选择合适的消息通信方式,以满足不同的业务场景和可靠性要求。不同的模式可以用于构建各种类型的分布式系统和应用程序。
RabbitMQ 中有哪几种交换机类型?
RabbitMQ 支持多种交换机(Exchange)类型,每种类型都用于不同的消息路由和分发策略:
- Direct Exchange:这种交换机根据消息的路由键(Routing Key)将消息发送到与之完全匹配的队列。只有当消息的路由键与队列绑定时指定的路由键完全相同时,消息才会被路由到队列。这是一种简单的路由策略,适用于点对点通信。
- Topic Exchange:这种交换机根据消息的路由键与队列绑定时指定的路由键模式(通配符)匹配程度,将消息路由到一个或多个队列。路由键可以使用通配符符号 *(匹配一个单词)和 #(匹配零个或多个单词),允许更灵活的消息路由。用于发布/订阅模式和复杂的消息路由需求。
- Headers Exchange:这种交换机根据消息的标头信息(Headers)来决定消息的路由,而不是使用路由键。队列和交换机之间的绑定规则是根据标头键值对来定义的,只有当消息的标头与绑定规则完全匹配时,消息才会被路由到队列。适用于需要复杂消息匹配的场景。
- Fanout Exchange:这种交换机将消息广播到与之绑定的所有队列,无论消息的路由键是什么。用于发布/订阅模式,其中一个消息被广播给所有订阅者。
- Default Exchange:这是 RabbitMQ 默认实现的一种交换机,它不需要手动创建。当消息发布到默认交换机时,路由键会被解释为队列的名称,消息会被路由到与路由键名称相同的队列。默认交换机通常用于点对点通信,但不支持复杂的路由策略。
这些不同类型的交换机允许你在 RabbitMQ 中实现各种不同的消息路由和分发策略,以满足不同的应用需求。选择适当的交换机类型对于有效的消息传递非常重要。
RabbitMQ 如何实现消息的持久化?
RabbitMQ 允许消息的持久化,以确保即使在 RabbitMQ 服务器重新启动后,消息也不会丢失。RabbitMQ 可以通过以下方式实现消息的持久化:
- 消息持久化:在 RabbitMQ 中,只需要在发送消息时,将delivery_mode属性设置为 2,就可以将消息标记为持久化。
- 队列持久化:在 RabbitMQ 中声明队列时,也可以将队列声明为持久化。RabbitMQ 中的队列分为三种不同类型经典队列,仲裁队列和流式队列。其中,经典队列需要将durable属性设置为true。而仲裁队列和流式队列默认必须持久化保存。
- 交换机持久化:与经典队列类似,RabbitMQ 也可以在声明交换机时,将交换机的 durable 属性设置为true,这样就可以将交换机标记为持久化。
RabbitMQ 的持久化机制会对其性能产生影响。因此,需要根据具体的业务场景和需求来权衡是否需要持久化以及需要哪种类型的持久化。
RabbitMQ 是如何实现死信队列的?
死信队列是 RabbitMQ 提供的一种特殊序列,处理那些无法被正常消费的消息。有三种情况会产生死信:
- 消息被消费者明确拒绝。
- 消息达到预设的过期时间仍没有消费者消费。
- 消息由于队列已经达到最大长度限制而被丢弃。
在 RabbitMQ 中,实现死信队列只需要给正常队列增加三个核心参数即可:
- dead-letter-exchange:指定当前队列对应的死信队列
- dead-letter-routing-key:指定消息转入死信队列时的路由键
- message-ttl:消息在队列中的过期时间。
接下来,就可以往正常队列中发送消息。如果消息满足了某些条件,就会成为死信,并被重新发送到对应的死信队列中。而此时,RabbitMQ 会在消息的头部添加一些与死信相关的补充信息,例如时间、成为死信的原因、原队列等。应用程序可以按需处理这些补充的信息。
最后,死信队列中的消息都是正常业务处理失败的消息,应用程序需要创建一个消费者来专门处理这些被遗漏的消息。例如记录日志、发送警报等。这样才能保证业务数据的完整性。
RabbitMQ 中如何进行事务处理?
RabbitMQ 提供了事务处理机制,允许生产者在发送消息时将操作包装在一个事务中,以确保消息的可靠性传递。在 RabbitMQ 中,事务是通过通道(Channel)来实现的。可以通过以下步骤进行事务处理:
- 开启事务:在生产者端,可以通过调用 Channel 的 tx_select 方法来开启一个事务。这将启动一个新的事务,并将所有后续的消息发布操作放在该事务内。
- 发送消息:接下来在事务中,可以正常发送消息。如果消息发送失败,事务会自动回滚。
- 提交事务:如果事务中所有消息发送成功后,需要提交事务。可以通过调用 Channel 的tx_commit方法提交事务。
- 处理异常:如果在事务过程中发生异常,可以使用 try/catch 快来捕获异常。然后在异常处理过程中,调用 Channel 的 tx_rollback 方法来回滚 RabbitMQ 相关的事务操作。
需要注意的是,RabbitMQ 的事务处理是基于存储过程的,它可以保证在事务中的操作要么全部成功,要么全部失败。但是,由于 RabbitMQ 是一个异步的消息队列系统,事务处理可能会对其性能产生影响。因此,需要根据具体的应用场景和需求来权衡是否需要使用事务以及如何使用事务。
聊一聊常用的 RabbitMQ 插件
RabbitMQ 支持许多插件,这些插件可以扩展 RabbitMQ 的功能和特性。以下是一些常用的 RabbitMQ 插件:
- Management Plugin:RabbitMQ 管理插件提供了一个 Web 管理界面,用于监控和管理 RabbitMQ 服务器。可以查看队列、交换机、连接、通道等的状态,并进行配置和操作。
- Shovel Plugin:Shovel 插件用于将消息从一个 RabbitMQ 服务器传递到另一个 RabbitMQ 服务器,实现消息复制和跨集群通信。它可以用于实现数据复制、故障恢复、数据中心间同步等。
- Federation Plugin:Federation 插件允许不同 RabbitMQ 集群之间建立联合,实现消息的跨集群传递。这对于构建分布式系统、将消息从一个地理位置传递到另一个地理位置非常有用。
- STOMP Plugin:STOMP插件允许使用 STOMP 协议与 RabbitMQ 进行通信。这对于使用非 AMQP 协议的客户端与 RabbitMQ 交互非常有用,例如使用 WebSocket 的 Web 应用程序。
- Prometheus Plugin:Prometheus 插件用于将 RabbitMQ 的性能指标导出到 Prometheus 监控系统,以便进行性能监控和警报。
- Delayed Message Plugin:延迟消息插件允许发布延迟交付的消息,使你能够在稍后的时间点将消息传递给消费者。这对于实现定时任务、延迟重试等场景非常有用。
这些插件提供了丰富的功能扩展,可以根据需求选择并配置它们