本文也是笔者一直没有去详细学习的一个重要知识点MQ,也是架构中非常重要的一个中间件。
主要从Rabbit官网于Spring AMQP官方文档的角度去详细学习MQ
官方文档
Rabbit
Spring AMQP
学习结果
测试项目地址
导读
本文主要从以下两个角度去学习MQ
一、RabbitMQ 官方文档
该角度主要从中间件服务的角度去分析MQ的一些核心概念与高可用。
二、SpringAMQP 官方文档
该角度主要从客户端角度去学习怎么接入MQ,并且优化配置。
节点和群集存储可视为架构、元数据或拓扑的信息。 用户、虚拟主机、队列、交换、绑定、运行时参数都属于此类别。 此元数据在 RabbitMQ 术语中称为定义。
Rabbit
一、核心概念
首先从官方的架构文档开始看起(点击标题进入官方文档):
英文描述 | 中文描述 | 描述 |
---|---|---|
VHost | 虚拟主机 | 类似分组的存在,可以用来做服务的隔离,如测试环境与生产环境 |
Exchange | 交换机 | 用户投递消息的入口,对消息识别后,投递到不同的队列中 |
Queue | 队列 | 实际存储消息的队列,接收来自交换机的消息 |
Binding | 绑定 | 定义交换机与队列绑定条件,决定不同条件的消息进入不同的队列 |
上文可能还不够自观,下图具体描述主流程,其中Exchange->Queue/Exchange的条件也就是上文所说的Binding(绑定)
以上是最重要的核心概念,在机构描述中还有一些其他内容:
元数据:也就是整个Rabbit服务信息,也可以理解成配置信息,该数据Rabbit提供了多种导出方案,在迁移或集群之间的复制同步信息时被使用到;
角色:也就是权限系统,也有详细的账号权限控制功能;
单节点安装
Rabbit是基于Erlang开发的,所以安装Rabbit需要Erlang环境,且版本需要匹配。可以点击查看版本匹配表。
受限篇幅长度这里就不列出安装步骤了,笔者可以点击连接后,往下拉官方文档里会教安装。
Linux-Rabbit安装包下载
windows-Rabbit安装包下载
Erlang安装包下载
Rabbit HA
本节主要阐述Rabbit高可用方案,主要有三:
Federation and/or Shovel | Clustering |
---|---|
Brokers 在逻辑上是独立的,可能有不同的所有者。 | 集群形成单个逻辑Broker。 |
Broker可以运行不同(并且在某些方面不兼容)版本的 RabbitMQ 和 Erlang。 | 节点必须运行兼容版本的 RabbitMQ 和 Erlang。 |
Broker可以通过不可靠的 WAN 连接 链接。通信通过 AMQP 0-9-1(可选由 TLS 保护),需要设置适当的用户和权限。 | Broker必须通过合理可靠的 LAN 进行连接 链接。节点将使用共享密钥相互进行身份验证 并选择性地使用启用了 TLS 的链接。 |
Broker可以以任何拓扑结构连接。链接可以是单向的,也可以是双向的。 | 所有节点双向连接到所有其他节点。 |
强调可用性和分区容错 (AP) 来自 CAP定理。 | 强调一致性和分区容错 (CP) 来自 CAP 定理。 |
Broker中的一些Exchange可能是联合的,而有些可能是本地的。 | 全有或全无。 |
连接到任何Broker的客户端只能在该代理中使用非独占队列。 | 连接到任何节点的客户端可以在所有节点上使用非独占队列。 |
官方描述可能还是过于晦涩,笔者用自己理解的白话文进行描述。其主要量大类,根据CAP原理分为AP、CP,提供了三种方案
Clustering
其中队列可以只在某个节点也可以集群所有节点同步队列以达到安全高可用,当然全集群的复制队列虽然安全但性能比较低,为此Rabbit提供了其他两种队列类型以达到更优秀的性能同时保证安全,下文会介绍到。
同时这种CP方案应在局域网内,以减少网路损耗,从而达到高效同步队列数据;
Federation
该类型主要是在不同的Rabbit服务下,通过该模式类似投递的方式也向对象Rabbit服务推送消息。
但需要所有同步所需权限账号才可以进行连接。
Shovel
类似Federation,Shovel 插件工作在一个较低的级别。
federation旨在提供交换机和队列的独立分布,而shovel只是从一个代理上的队列中消费消息,并将它们转发到另一个代理上的交换机。
Clustering
思考过的读者应该可以发觉Clustering集群方案是我们常见的高可用方案,而其他两种更像业务之间的解耦,为其他业务转发消息。而本节主要介绍该方案的详细信息。
核心概念
- 集群是一个或 多个节点,每个节点共享用户、虚拟主机、 队列、交换、绑定、运行时参数和其他分布式状态;
- 集群的组成可以动态变更。 所有 RabbitMQ Broker(节点)一开始都是在单个上运行的节点。这些节点可以加入到集群中,也可以退回到一个Borker;
- 默认创建的队列都只在一个节点上,但通过访问集群中的其中一个节点都可以访问到该队列;
- 使用复制队列和流,可以使得队列在集群节点中复制队列副本,以达到消息的安全;
- 不像Zookeeper的集群有Leader节点(主节点),通常情况下每个节点都是平等的,但复制队列类型的情况下些许不同;
- 因为有几个功能(例如复制队列,MQTT中的客户端跟踪) 需要集群成员之间达成共识,强烈建议使用奇数个集群节点: 1、3、5、7 等。
Delay Queues(延迟队列)
延迟队列,在Rabbit中没有直接提供延迟队列的功能,该功能通过队列的dead-letter-exchange参数配置实现:
认真看图后是可以发现:
- 通过设置消息过期时间
- 投递到无消费者的队列中
- 过期后投递到指定的队列,该队列就是延迟队列
浅看一下Rabbit管理端的配置界面
优化方案:
- 设置预拉取数量dead_letter_worker_consumer_prefetch,可以提高吞吐量-默认预拉取32条;
- 延迟队列会尝试尽可能早地将消息移动到磁盘,惰性模式配置。
Quorum Queues(复制队列)
中文直译复制队列,前面提到了那么多次,本节主要对他进行介绍;
- 复制队列是 RabbitMQ 实现持久、 基于Raft共识算法的复制FIFO队列。 它从 RabbitMQ 3.8.0 开始可用。
- 复制队列针对数据安全性为重中之重。复制队列被视为复制队列类型的默认选项。
- 具有Leader节点与副本节点,Leader节点向副本节点确认存储后才确认消费。
复制队列与 RabbitMQ 中其他类型的队列共享许多基本原理。 但是,它们更专门构建,专注于数据安全和可预测的恢复, 并且不支持某些功能。
Raft协议是一个CP协议,以一套规则来保证分布式数据安全,其主要核心概念是:集群内的部分节点状态达成一致即算成功;
特征 | 经典队列(默认类型) | 仲裁队列 |
---|---|---|
是否可以非持久队列 | 可以 | 不 |
排他性 | 是的 | 不 |
每条消息持久性 | 允许 | 总是 |
集群变更 | 自动 | 手动 |
消息 TTL(生存时间) | 是的 | 是(自 3.10 起) |
队列 TTL | 是的 | 是的 |
队列长度限制 | 是的 | 是 |
懒惰的行为 | 是的 | 始终(从 3.10 开始)或通过内存限制功能(3.10 之前) |
消息优先级 | 是的 | 不 |
消费者优先 | 是的 | 是的 |
死信推Exchange | 是的 | 是的 |
遵守政策 | 是的 | 是(请参阅策略支持) |
有害消息处理 | 不 | 是的 |
全局 QoS 预取 | 是的 | 不 |
复制队列-死信
死信的配置也决定了延迟队列的可靠性。如果不能理解这句话请读者重新阅读上文”延迟队列“
复制队列支持死信交换 (DLX),在集群环境中使用 DLX 并不安全,在3.10 复制队列支持更安全的死信形式,该形式对队列之间的消息传输使用至少一次保证 (下面概述了限制和注意事项):
形式 | 默认 | 基础描述 |
---|---|---|
至少一次 | 可选 | 官方推荐,更加安全 |
最多一次 | 默认 | 具有幂等性,但是在一些设置下会变的不安全 |
首先需要读者认真思考,两者的词意
至少一次:在多种情况下,确保消息安全
最多一次:消息幂等性考虑
一、最多一次
对于复制队列,“最多一次”仍然是默认的死信策略,对于死信更多地是一种信息性质的情况,信息性质应该保证消息不会被重复投递(幂等),但是在传输过程中丢失或在溢出策略不合适的情况下丢失,丢失情况如果可以被情况,那该策略方案是好选择。
二、 至少一次
通过实现一个特殊的内部死信消费者进程来完成的。 其工作原理类似于普通队列消费者,手动确认分开 从它只使用已经死信的消息。
这意味着源复制队列将保留 死信消息,直到它们被确认。内部消费者 将使用死信消息并将其发布到目标队列 发布者确认。只有在发布者确认 收到,因此至少提供一次保证。
三、 配置至少一次
要为源复制队列启用至少一次死信,请应用以下所有策略 (参数x- 开头)
- 将dead-letter-strategy设置为at-least-once(默认值为at-most-once)。
- 将overflow设置为reject-publish(默认值为drop-head)。
- 配置dead-letter-exchange。
- 启用功能标志stream_queue(默认启用) 对于在 3.9 或更高版本中创建的 RabbitMQ 集群)。
- (可选)配置死信路由配置dead-letter-routing-key。
建议另外配置最大长度或最大长度字节,以防止源复制队列中过多的消息堆积
Rabbit官方管理端配置:
四、至少一次的局限性/警告
- 至少一次死信不适用于默认的drop-head溢出 策略,即使未设置队列长度限制。 因此,如果配置了drop-head,则死信将回退最多一次。请改用溢出策略拒绝-发布reject-publish。
- 至少一次死信将需要更多的系统资源,例如内存和 CPU。 因此,仅当要求死信不丢失时,才启用至少一次。
- 确认成为死信后被正常投递,避免队列堆积。
- 每个复制队列,将有一个内部死信 消费者流程。内部死信使用者进程位于复制队列领导节点上,默认每次预拉取最大拉取 32条;
- 如果死信吞吐量较高,则可以通过高级配置文件的 rabbit 应用程序部分中的dead_letter_worker_consumer_prefetch设置来增加预取大小 (每秒数千条消息)是必需的。
- 可以通过设置reject-publish可以动态切换
五、优先级
复制队列不支持设置消息的优先级,只能设置消费者的优先级;
但可以通过在Exchange,将不同优先级的消息投递到不同的队列中来实现优先级的划分;
由于篇幅限制,笔者将重新开一篇来介绍复制队列
👇专门介绍复制队列的博客👇
👉复制队列👈
👆专门介绍复制队列的博客👆
懒惰模式
即 尽可能早地将其内容移动到磁盘, 并且仅在消费者请求时才将它们加载到 RAM 中。
惰性模式下有一些内存于磁盘的负载配置,需要读者自己点开标题进入官网详细阅读
经典队列
可以在惰性模式下运行:即 尽可能早地将其内容移动到磁盘, 并且仅在消费者请求时才将它们加载到 RAM 中。
惰性队列的主要目标之一是能够支持非常 长队列(数百万条消息)。队列可能会变得很长 出于各种原因:
- 消费者离线/已崩溃/停机进行维护
- 消息入口突然激增,生产者超过消费者
- 消费者比正常情况慢
以上问题主要来自磁盘I/O速度导致速率变慢
官网也给出了一份报告:
消息数 | 邮件正文大小 | 消息类型 | 生产者 | 消费者 |
---|---|---|---|---|
1,000,000 | 1,000字节 | Durable | 1 | 0 |
队列模式 | 队列进程内存 | 内存中的消息 | 消息使用的内存 | 节点内存 |
---|---|---|---|---|
默认 | 257兆字节 | 386,307 | 368兆字节 | 734兆字节 |
懒惰模式 | 159 KB | 0 | 0 | 117兆字节 |
两个队列都保留了 1,000,000 条消息,并使用了 1.2 GB 的磁盘空间。
在一些业务场景下也有使用
- 节点内存使用率时,延迟队列是合适的 更高的磁盘 I/O 和磁盘利用率是可以接受的。延迟队列还有其他方面 这应该被考虑。
- 非常适合在延迟队列的中转队列。
复制队列
整体语义相同也是消息会存入磁盘来降低内存消耗。
从 RabbitMQ 3.10 之后
复制队列将其消息内容存储在磁盘上(根据 Raft 要求)和 仅在内存中保留每条消息的少量元数据记录。
但是复制队列的内存限制配置已经没有用了,可以配置最大内存数量x-max-in-memory-length=0。
90000000000
Stream队列
流是一种新的持久和复制的数据结构:
- 以性能为主要目标;
- 提供复制队列的CP高可用;
- 非消费后就删除消息的形式消费消息,允许多个消费者同时消费、重复消费;
- 适用日志队列
- 执行任何操作之前将所有数据保存到磁盘
可以看出该队列就像一个数据库表,按照顺序存储消息,消息在一定限定范围内持久化存储。
持久化保存
流作为不可变的仅追加磁盘日志实现。这意味着 日志将无限增长,直到磁盘用完。为了避免这种不良情况 方案可以为每个流设置保留配置,这将 根据日志数据总大小和/或期限丢弃日志中最旧的数据。
参数配置 | 描述 |
---|---|
max-age | Y, M, D, h, m, s-例如7D(七天) |
max-length-bytes | 最大存储字节数 |
性能特点
由于流在执行任何操作之前将所有数据保存到磁盘,因此建议使用 以尽可能使用最快的磁盘。
由于流的磁盘 I/O 密集型性质,它们的吞吐量会降低 随着邮件大小的增加。
就像仲裁队列一样,流也受到群集大小的影响。 流具有的副本越多,其吞吐量通常越低 因为必须做更多的工作来复制数据并达成共识。
优先队列
RabbitMQ 从版本 3.5.0 开始在核心中实现了优先级队列。 该实现支持有限数量的优先级:255。建议使用 1 到 10 之间的值,使用 x-max-priority 可选队列参数设置。 此参数应为 1 到 255 之间的正整数, 指示队列应支持的最大优先级。
意思可以为队列设置255个等级的消息,每个优先级都有一些内存中和磁盘上成本 每个队列。还有额外的 CPU 成本,从该特性考虑,优先级应该控制在10以内;
需要注意的是,如果消息未发生堆积(消费速度比生产速度快),消息的产生都将被即时消费,那么即使设置了优先级,但因为消息被即刻消费导致没有机会被排序,那么该功能将失效;
安全性
保证消息安全也是关键:
默认手动ack前提下
消息的安全也需要在开启ack模式下,Mq告知消息处理失败,且业务端也要做出一定处理
队列 | 安全性 | 描述 |
---|---|---|
经典队列 | 不安全 | 只在单个broker存储 |
复制队列 | 安全 | 在集群大多数可用下安全 |
Stream | 特定情况不安全 | 服务器宕机时缓存数据丢失 |
Spring AMQP
连接池类型
连接类型 | 推荐指数 | 描述 |
---|---|---|
PooledChannelConnectionFactory | 其次 | 基于 Apache Pool2 管理单个连接和两个通道池。 一个池用于事务通道,另两个个池用于非事务通道 |
ThreadChannelConnectionFactory | 推荐 | 一个池用于事务通道,另两个个池用于非事务通道,但确保同线程的消息被相同的连接处理,以确保消费顺序。 |
CachingConnectionFactory | 最推荐 | 单个连接代理,可以通过通道来区分是否支持事务,扩展性更强,也可以设置基于多个连接,更适合集群。 |
PooledChannelConnectionFactory
此工厂基于 Apache Pool2 管理单个连接和两个通道池。 一个池用于事务通道,另一个池用于非事务通道。 池是默认配置的;提供回调以配置池;有关更多信息,请参阅 Apache 文档。GenericObjectPool
@Bean
PooledChannelConnectionFactory pcf() throws Exception {
com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
connectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(connectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// 事务连接池
}
else {
// 非事务连接池
}
});
return pcf;
}
注意要先引入Apache的连接池包
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.11.1</version>
</dependency>
ThreadChannelConnectionFactory
Apache jar 必须位于类路径上才能使用此工厂。commons-pool2
此工厂管理单个连接和两个连接,一个用于事务通道,另一个用于非事务通道。 此工厂确保同一线程上的所有操作都使用相同的通道(只要它保持打开状态)。 这有助于严格的消息排序,而无需作用域内操作。 为避免内存泄漏,如果应用程序使用许多生存期较短的线程,则必须调用工厂的线程来释放通道资源。 从版本 2.3.7 开始,线程可以将其通道传输到另一个线程。 有关详细信息,请参阅多线程环境中的严格消息排序。ThreadLocalcloseThreadChannel()
@Bean
ConnectionFactory tcf() {
com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setVirtualHost(VIRTUAL_HOST);
connectionFactory.setThreadFactory(new NamedThreadFactory("PooledChannelConnectionFactory"));
ThreadChannelConnectionFactory threadChannelConnectionFactory = new ThreadChannelConnectionFactory(connectionFactory);
return threadChannelConnectionFactory;
}
CachingConnectionFactory
既可以单个连接/多个通道也可以多个连接/多个通道,动态性更强,配置为CacheMode.CONNECTION时HA模式下更适用。
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(HOST);
cachingConnectionFactory.setPort(PORT);
cachingConnectionFactory.setUsername(USERNAME);
cachingConnectionFactory.setPassword(PASSWORD);
cachingConnectionFactory.setConnectionLimit(1);
//连接30秒后超时,超时后销毁且
cachingConnectionFactory.setChannelCheckoutTimeout(30 * 1000);
//最大缓存数量
cachingConnectionFactory.setChannelCacheSize(Runtime.getRuntime().availableProcessors());
//CacheMode.CHANNEL模式下,所有的客户端共享一个链接,不同的channel之间相互隔离。
cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
cachingConnectionFactory.setConnectionNameStrategy((connectionFactory) -> "业务系统Connection");
cachingConnectionFactory.setVirtualHost(VIRTUAL_HOST);
cachingConnectionFactory.setPublisherReturns(true);
cachingConnectionFactory.setRequestedHeartBeat(60);
cachingConnectionFactory.setConnectionThreadFactory(new NamedThreadFactory("RabbitConnection"));
cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
return cachingConnectionFactory;
}
动态选这VHost
消息顺序
考虑以下情况:
- 投递时按序投递
- 消费时按序消费
由于 RabbitMQ 的异步性质和缓存通道的使用;不确定是否将使用相同的通道,因此无法保证消息到达队列的顺序。 (在大多数情况下,它们会按顺序到达,但无序交付的概率不为零)。 要解决此用例:
- 生产者在同一通道发送(发送串行化)
- 无并发消费者(单个消费者消费)
串行化通道发送
@Bean(name = "singleConnRabbitTemplate")
public RabbitTemplate singleConnRabbitTemplate() {
//创建连接工厂
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
//只开辟单个连接
connectionFactory.setChannelCacheSize(1);
connectionFactory.setChannelCheckoutTimeout(1000L);
//创建操作模板
return new RabbitTemplate(connectionFactory);
}
ThreadChannelConnectionFactory连接池也可以做到指定线程通道
消费时预拉取数量设置1且设置单个消费者
生产(推送消息)
消费(消费消息)
有两种方法可以接收 :
- 简单的选项是使用轮询方法调用一次轮询一个
- 更复杂但更常见的方法是注册异步按需接收的侦听器
轮询消费者
异步消费
- 支持@Valid 注解验证入参,校验异常被RabbitListenerErrorHandler 监听
- 单队列根据类型不同处理
原始消息信息org.springframework.amqp.core.Message
原始消息属性 MessageProperties Message
通信管道 com.rabbitmq.client.Channel
从传入的 AMQP 消息转换而来。org.springframework.messaging.Message
@Header-带注释的方法参数,用于提取特定的标头值,包括标准 AMQP 标头。
@Headers-带注释的参数,所有标头。java.util.Map
笔者后知后觉还是单写了一篇
笔者后知后觉还是单写了一篇
笔者后知后觉还是单写了一篇