MQ - 09 RabbitMQ的架构设计与实现

news2024/9/28 17:32:19

文章目录

  • 导图
  • 概述
  • RabbitMQ 系统架构
  • 协议和网络模块
  • 数据存储
    • 元数据存储 ---> 自带的分布式数据库 Mnesia
    • 消息数据存储
  • 生产者和消费者
  • HTTP 协议支持和管控操作
  • RabbitMQ 从生产到消费的全过程
  • 总结

在这里插入图片描述


导图

在这里插入图片描述


概述

最基础的消息队列应该具备通信协议、网络模块、存储模块、生产者、消费者五个模块。
在这里插入图片描述

接下来我们从消息和流的角度,分别看一下

  • 消息方向的消息队列 RabbitMQ、RocketMQ
  • 流方向的消息队列 Kafka、Pulsar

在这五个模块的实现思路和设计思想 。

今天先看看 RabbitMQ。


RabbitMQ 系统架构

我们先来看一下 RabbitMQ 的系统架构。

在这里插入图片描述

如上图所示,RabbitMQ 由 Producer、Broker、Consumer 三个大模块组成。生产者将数据发送到 Broker,Broker 接收到数据后,将数据存储到对应的 Queue 里面,消费者从不同的 Queue 消费数据。

那么除了 Producer、Broker、Queue、Consumer、ACK 这几个消息队列的基本概念外,它还有 Exchange、Bind、Route 这几个独有的概念。下面来简单解释下。

Exchange 称为交换器,它是一个逻辑上的概念,用来做分发,本身不存储数据。流程上生产者先将消息发送到 Exchange,而不是发送到数据的实际存储单元 Queue 里面。

然后 Exchange 会根据一定的规则将数据分发到实际的 Queue 里面存储。这个分发过程就是 Route(路由),设置路由规则的过程就是 Bind(绑定)。

即 Exchange 会接收客户端发送过来的 route_key,然后根据不同的路由规则,将数据发送到不同的 Queue 里面。

这里需要注意的是,在 RabbitMQ 中是没有 Topic 这个用来组织分区的逻辑概念的。RabbitMQ 中的 Topic 是指 Topic 路由模式,是一种路由模式,和消息队列中的 Topic 意义是完全不同的

那为什么 RabbitMQ 会有 Exchange、Bind、Route 这些独有的概念呢?

在我看来,主要和当时业界的架构设计思想以及主导设计 AMQP 协议的公司背景有关。当时的设计思路是:希望发消息跟写信的流程一样,可以有一个集中的分发点(邮局),通过填写好地址信息,最终将信投递到目的地。这个集中分发点(邮局)就是 Exchange,地址信息就是 Route,填写地址信息的操作就是 Bind,目的地是 Queue。

讲清楚基本概念和架构,我们就围绕着前面提到的五个模块来分析一下 RabbitMQ,先来看一下协议和网络模块。


协议和网络模块

在网络通信协议层面,RabbitMQ 数据流是基于四层 TCP 协议通信的,跑在 TCP 上的应用层协议是 AMQP。

如果开启 Management 插件,也可以支持 HTTP 协议的生产和消费。TCP + AMQP 是数据流的默认访问方式,也是官方推荐的使用方式,因为它性能会比 HTTP 高很多。

RabbitMQ 在协议内容和连接管理方面,都是遵循 AMQP 规范。即 RabbitMQ 的模型架构和 AMQP 的模型架构是一样的,交换器、交换器类型、队列、绑定、路由键等都是遵循 AMQP 协议中相应的概念

AMQP 是一个应用层的通信协议,可以看作一系列结构化命令的集合,用来填充 TCP 层协议的 body 部分。通过协议命令进行交互,可以完成各种消息队列的基本操作,如 Connection.Start(建立连接)、Basic.Publish(发送消息)等等,详细的 AMQP 协议内容可以参考文档 AMQP Working Group 1.0 Final

下面是一张生产消息流程的协议命令交互图,大概包含了建立连接、发送消息、关闭连接三个步骤。

在这里插入图片描述


讲完了协议,我们来看看网络模块。

先来看下面这张图,在 RabbitMQ 的网络层有 Connectoion 和 Channel 两个概念需要关注。

在这里插入图片描述

Connection 是指 TCP 连接,Channel 是 Connection 中的虚拟连接。两者的关系是:

  • 一个客户端和一个 Broker 之间只会建立一条 TCP 连接,就是指 Connection。
  • Channel(虚拟连接)的概念在这个连接中定义,一个 Connection 中可以创建多个 Channel。

客户端和服务端的实际通信都是在 Channel 维度通信的。这个机制可以减少实际的 TCP 连接数量,从而降低网络模块的损耗。从设计角度看,也是基于 IO 复用、异步 I/O 的思路来设计的。

从编码实现的角度,RabbitMQ 的网络模块设计会比较简单。主要包含 tcp_listener、tcp_acceptor、rabbit_reader 三个进程。如下图所示,RabbitMQ 服务端通过

  • tcp_listener 监听端口,
  • tcp_acceptor 接收请求,
  • rabbit_reader 处理和返回请求。

本质上来看是也是一个多线程的网络模型。

在这里插入图片描述


数据存储

接下来我们看看 RabbitMQ 的存储模块。

RabbitMQ 的存储模块也包含元数据存储与消息数据存储两部分。如下图所示,RabbitMQ 的两类数据都是存储在 Broker 节点上的,不会依赖第三方存储引擎。

在这里插入图片描述

我们先来看一下元数据存储。

元数据存储 —> 自带的分布式数据库 Mnesia

RabbitMQ 的元数据都是存在于 Erlang 自带的分布式数据库 Mnesia 中的。即每台 Broker 都会起一个 Mnesia 进程,用来保存一份完整的元数据信息。因为 Mnesia 本身是一个分布式的数据库,自带了多节点的 Mnesia 数据库之间的同步机制。所以在元数据的存储模块,RabbitMQ 的 Broker 只需要调用本地的 Mnesia 接口保存、变更数据即可。不同节点的元数据同步 Mnesia 会自动完成。

Mnesia 对 RabbitMQ 的作用,相当于 ZooKeeper 对于 Kafka、NameServer 对于 RocketMQ 的作用。因为 Mnesia 是内置在 Broker 中,所以部署 RabbitMQ 集群时,你会发现只需要部署 Broker,不需要部署其他的组件。这种部署结构就很简单清晰,从而也降低了后续的运维运营成本。

在一些异常的情况下,如果不同节点上的 Mnesia 之间的数据同步出现问题,就会导致不同的 Mnesia 数据库之间数据不一致,进而导致集群出现脑裂、无法启动等情况。此时就需要手动修复异常的 Mnesia 实例上的数据。

因为 Mnesia 本身是一个数据库,所以它和数据库一样,可以进行增删改查的操作。需要了解 Mnesia 的更多操作,你可以参考 ErLang Mnesia


消息数据存储

如下图所示,RabbitMQ 消息数据的最小存储单元是 Queue,即消息数据是按顺序写入存储到 Queue 里面的。在底层的数据存储方面,所有的 Queue 数据是存储在同一个“文件”里面的。这个“文件”是一个虚拟的概念,表示所有的 Queue 数据是存储在一起的意思。

在这里插入图片描述

这个“文件”由队列索引(rabbit_queue_index)和消息存储(rabbitmq_msg_store)两部分组成。即在节点维度,所有 Queue 数据都是存储在 rabbit_msg_store 里面的,每个节点上只有一个 rabbit_msg_store,数据会依次顺序写入到 rabbit_msg_store 中。

rabbit_msg_store 是一个逻辑概念,底层的实际存储单元分为两个,msg_store_persistent 和 msg_store_transient,分别负责持久化消息和非持久化消息的存储。

msg_store_persistent 和 msg_store_transient 在操作系统上是以文件夹的形式表示的,具体的数据存储是以不同的文件段的形式存储在目录中,所有消息都会以追加的形式写入到文件中。

当一个文件的大小超过了配置的单个文件的最大值,就会关闭这个文件,然后再创建一个文件来存储数据。关于 RabbitMQ 底层的数据存储结构,如下图所示:

在这里插入图片描述
队列索引负责存储、维护队列中落盘消息的信息,包括消息的存储位置、是否交付、是否 ACK 等等信息。队列索引是 Queue 维度的,每个 Queue 都有一个对应的队列索引。

RabbitMQ 也提供了过期时间(TTL)机制,用来删除集群中没用的消息。它支持单条消息和队列两个维度来设置数据过期时间。如果在队列上设置 TTL,那么队列中的所有消息都有相同的过期时间。我们也可以对单条消息单独设置 TTL,每条消息的 TTL 可以不同。如果两种方案一起使用,那么消息的 TTL 就会以两个值中最小的那个为准。如果不设置 TTL,则表示此消息不会过期。

删除消息时,不会立即删除数据,只是从 Erlang 中的 ETS 表删除指定消息的相关信息,同时更新消息对应的存储文件的相关信息。此时文件中的消息不会立即被删除,会被标记为已删除数据,直到一个文件中都是可以删除的数据时,再将这个文件删除,这个动作就是常说的延时删除。另外内核有检测机制,会检查前后两个文件中的数据是否可以合并,当符合合并规则时,会进行段文件的合并。

在了解了 RabbitMQ 的协议、网络模块和数据存储后,我们再来看一下 RabbitMQ 的生产者和消费者的实现。


生产者和消费者

当生产者和消费者连接到 Broker 进行生产消费的时候,是直接和 Broker 交互的,不需要客户端寻址。客户端连接 Broker 的方式,跟我们通过 HTTP 服务访问 Server 是一样的,都是直连的。部署架构如下图所示:

在这里插入图片描述

abbitMQ 集群部署后,为了提高容灾能力,就需要在集群前面挂一层负载均衡来进行灾备。客户端拿到负载均衡 IP 后,在生产或消费时使用这个 IP 和服务端直接建立连接。因为 Queue 是具体存储数据的单元,不同的 Queue 有可能分布在不同的 Broker 上,就有可能出现生产或消费基于负载均衡 IP 请求到的 Broker,并不是当前 Queue 所在的 Broker,从而导致生产消费失败。

为了解决这个问题,在每个 Broker 上会设置有转发的功能。在实现上,每台 Broker 节点都会保存集群所有的元数据信息。当 Broker 收到请求后,根据本地缓存的元数据信息判断 Queue 是否在本机上,如果不在本机,就会将请求转发到 Queue 所在的目标节点。

从客户端的实现来看,因为各个语言的实现机制不太一样,基础模块的连接管理、心跳管理、序列化等部分遵循各编程语言的开发规范去实现。例如网络模块的实现,如果客户端是用 Java 语言写的,那么可以使用 Java NIO 库完成网络模块的开发。客户端和服务端传输协议的内容遵循 AMQP 协议,底层以二进制流的形式序列化数据。即根据 AMQP 协议的格式构建内容后,然后序列化为二进制的格式,传递给 Broker 进行处理。

在这里插入图片描述

生产端发送数据不是直接发送到 Queue,而是直接发送到 Exchange。即发送时需要指定 Exchange 和 route_key,服务端会根据这两个信息,将消息数据分发到具体的 Queue。因为 Exchange 和 route_key 都是一个逻辑概念,数据是直接发送到 Broker 的,然后在服务端根据路由绑定规则,将数据分发到不同的 Queue 中,所以在客户端是没有发送生产分区分配策略的逻辑。其实从某种程度来看,Exchagne 和 Route 的功能就是生产分区分配的过程,只是将这个逻辑从客户端移动到了服务端而已

在消费端,RabbitMQ 支持 Push(推)和 Pull(拉)两种模式,如果

  • 使用了 Push 模式,Broker 会不断地推送消息给消费者。不需要客户端主动来拉,只要服务端有消息就会将数据推给客户端。当然推送消息的个数会受到 channel.basicQos 的限制,不能无限推送,在消费端会设置一个缓冲区来缓冲这些消息。

  • 拉模式是指客户端不断地去服务端拉取消息,RabbitMQ 的拉模式只支持拉取单条消息。

在 AMQP 协议中,是没有定义 Topic 和消费分组的概念的,所以在消费端没有消费分区分配、消费分组 Rebalance 等操作,消费者是直接消费 Queue 数据的。

为了保证消费流程的可靠性,RabbitMQ 也提供了消息确认机制。消费者在消费到数据的时候,会调用 ACK 接口来确认数据是否被成功消费。

底层提供了自动 ACK 和手动 ACK 两种机制。

  • 自动 ACK 表示当客户端消费到数据后,消费者会自动发送 ACK,默认是自动 ACK。
  • 手动 ACK 表示客户端消费到数据后,需要手动调用。

ACK 的时候,支持单条 ACK 和批量 ACK 两种动作,批量 ACK 可以用来提升 ACK 效率。另外,为了提升 ACK 动作的性能,有些客户端也支持异步的 ACK。


在了解了上述的五个模块后,最后我们来看一下 RabbitMQ 对 HTTP 协议的支持和管控操作。

HTTP 协议支持和管控操作

RabbitMQ 内核本身不支持 HTTP 协议的生产、消费和集群管控等操作。如果需要支持,则需要先手动开启 Management 插件,通过插件的形式让内核支持这个功能。

大部分情况下,都会建议你启用 Management 插件,否则集群使用就会不太方便。如下图所示,从实现上来看 Management 插件对 HTTP 协议的支持,就是在开启插件的时候,会启动一个新的 HTTP Server 来监听一个新的端口。

客户端只需要访问这个端口提供的 HTTP 接口,就可以完成 HTTP 读写数据和一些集群管控的操作。如果你想了解更多细节,可以查看这个文档 Management Plugin。

在这里插入图片描述

开启插件后,就可以通过 HTTP 接口实现生产、消费、集群的配置、资源的创建、删除等操作。比如下面是一个查看 Vhost 列表的 curl 命令示例:

curl -i --header "authorization: Bearer <token>" http://localhost:15672/api/vhosts     

RabbitMQ 从生产到消费的全过程

跟经典的消息队列一样,RabbitMQ 的生产到消费总共经过生产者、Broker、消费者三个模块。大致的流程如下:

在生产端,客户端根据 AMQP 协议定义的命令字(如 Connection.Start/Start-Ok、Connection.Tune/Tune-Ok),通过四层的 TCP 协议和 Broker 创建 Connection、Channel 进行通信。

客户端直连 Broker 服务,不需要经过寻址,然后客户端需要指定 Exchange、route_key 发送消息。因为 AMQP 没有支持批量发送的协议,消息会立即发送给给服务端。通信协议的内容格式、序列化和反序列化遵循 AMQP 的标准。

Broker 收到消息后,根据 AMQP 协议反序列化解析出请求内容。根据 Exchange 和 route_key 的信息,结合路由模式,将数据分发到具体的 Queue 中。存储层收到消息后,底层会将这条数据的结构进行整合,添加一些额外信息,如写入时间等,然后将数据写入到同一个文件存储。Broker 支持数据过期机制,当消息过期后,数据会被删除。

消费端直接指定 Queue 消费,不需要经过消费分组、分区分配的过程。消费端跟生产端一样,根据 AMQP 协议连接上 Broker 后,消费端直接从 Queue 中消费数据,消费完成后通过手动 ACK 或自动 ACK 的方式 ACK 消息。


总结

RabbitMQ 主要有 Producer、Broker、Consumer、Exchange、Queue、Route、Bind、Connection、Channel、ACK 等概念。

总结 RabbitMQ,可以从以下七个方面入手:

  • 协议层基于 AMQP 标准开发。

  • 网络层核心数据流基于 TCP 协议通信,并通过 Connection 和 Channel 机制实现连接的复用,以减少创建的 TCP 连接数量。

  • 存储层基于多个 Queue 数据统一到一个文件存储的思路设计,同时支持分段存储和基于时间的数据过期机制。

  • 元数据存储是基于 Erlang 内置的数据库 Mnesia 来实现。

  • 客户端的访问是直连的,没有客户端寻址机制。

  • 生产端是通过 Exchange 和 Route 写入数据的,生产数据的分发是在服务端完成的,其他消息队列的分发一般都是在客户端。

  • 消费端没有消费分组、消费分区分配等概念,直连 Queue 消费,同时也提供了手动和自动两种 ACK 机制。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1025961.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

socket() failed (24: Too many open files) while connecting to upstream, client

一、这个错误通常是因为文件句柄数目超过系统限制导致的。要解决这个问题&#xff0c;您可以尝试以下几个步骤&#xff1a; 调整系统文件句柄限制&#xff1a;您可以通过修改/etc/security/limits.conf文件中的nofile参数来增加系统文件句柄的最大数目。将nofile的值增加到更高…

zabbix监控平台部署(二)

目录 一、自定义监控 二、Nginx监控 三、监控mysql 四、钉钉告警 五、163邮箱报警 总结 zabbix5.0 一、自定义监控 zabbix-agent&#xff08;147&#xff09; agent端操作 vim /etc/zabbix/zabbix_agentd.conf 在配置未文件末尾添加 UserParametermemory_userd,free…

C++中 负数与String字符串的长度 string.size()作比较 输出错误

在刷题的时候&#xff0c;发现用 -1<t.size() 输出的是错误的值&#xff0c;如下&#xff0c;t“ABC”&#xff0c;但重新定义一个变量后又可以了&#xff0c;查阅检查后&#xff0c;发现string.size()返回的是一个无符号的整数&#xff0c;因此与有符号整数比较&#xff0c…

SpingBoot:整合Mybatis-plus+Druid+mysql

SpingBoot&#xff1a;整合Mybatis-plusDruid 一、特别说明二、创建springboot新工程三、配置3.1 配置pom.xml文件3.2 配置数据源和durid连接池3.3 编写拦截器配置类 四、自动生成代码五、测试六、附件-mysql数据库表 本文参考链接&#xff1a; [Java] Spring Boot 集成 MyBati…

免费、安全、可靠!一站式构建平台 ABS 介绍及实例演示 | 龙蜥技术

编者按&#xff1a;操作系统是一个大的软件集合&#xff0c;成百上千个软件之间有相互调用、相互依赖等各种复杂的关联关系&#xff0c;所以统一的软件包格式&#xff0c;能够更友好地管理、定义这些复杂关系。今天&#xff0c;龙蜥社区基础设施 Contributor 单凯伦带大家了解龙…

Python灰帽编程——网页信息爬取

文章目录 网页信息爬取1. 相关模块1.1 requests 模块1.1.1 模块中的请求方法1.1.2 请求方法中的参数1.1.3 响应对象中属性 1.2 RE 模块1.2.1 匹配单个字符1.2.2 匹配一组字符1.2.3 其他元字符1.2.4 核心函数 2. 网页信息爬取2.1 获取网页HTML 源代码2.2 提取图片地址2.3 下载图…

【机器学习】详解回归(Regression)

文章目录 是什么的问题案例说明 是什么的问题 回归分析&#xff08;Regression Analysis&#xff09; 是研究自变量与因变量之间数量变化关系的一种分析方法&#xff0c;它主要是通过因变量Y与影响它的自变量 X i &#xff08; i 1 , 2 , 3 … &#xff09; X_i&#xff08;i1…

鉴源论坛 · 观模丨基于应用程序编程接口(API)的自动化测试(下)

作者 | 黄杉 华东师范大学软件工程学院博士 苏亭 华东师范大学软件工程学院教授 版块 | 鉴源论坛 观模 社群 | 添加微信号“TICPShanghai”加入“上海控安51fusa安全社区” 上文“基于应用程序编程接口&#xff08;API&#xff09;的自动化测试&#xff08;上&#xff09;”…

阿里巴巴全店商品采集教程,阿里巴巴店铺所有商品接口(详解阿里巴巴店铺所有商品数据采集步骤方法和代码示例)

随着电商行业的快速发展&#xff0c;阿里巴巴已成为国内的电商平台之一&#xff0c;拥有着海量的商品资源。对于一些需要大量商品数据的商家或者需求方来说&#xff0c;阿里巴巴全店采集是非常必要的。本文将详细介绍阿里巴巴全店采集的步骤和技巧&#xff0c;帮助大家更好地完…

自定义实现:头像上传View

看看效果&#xff1a; 非常简单&#xff1a;代码直接贴在下面&#xff0c;有需要的直接带走 /*** 带有自定义字体TextView。*/ class EditAvatarUploadView : AppCompatTextView {lateinit var paint:Paintconstructor(context: Context) : this(context, null){iniPaint()}con…

广告电商:一种新型的电商模式

电商行业是一个竞争激烈的领域&#xff0c;要想在这个领域中脱颖而出&#xff0c;就需要不断创新和变革。广告电商就是一种新型的电商模式&#xff0c;它结合了社交电商和广告分佣的优势&#xff0c;让消费者在购物的同时可以获得积分&#xff0c;并且还能通过观看平台对接的广…

【ES6】

ES6 1 ES6简介1.1 什么是ES61.2 为什么使用ES6 2 ES6的新增语法2.1 let2.2 const2.3 let、const、var的区别2.4 解构赋值2.4.1 数组解构2.4.2 对象解构 2.5 箭头函数2.6 剩余参数 3 ES6的内置对象扩展3.1 Array的扩展方法3.1.1 扩展运算符(展开语法)3.1.2 构造函数方法&#xf…

开启潮玩文化新篇章,泡泡玛特首届海外PTS潮玩成功落地新加坡

近日&#xff0c;泡泡玛特2023 PTS潮流玩具展&#xff08;下简称新加坡PTS&#xff09;在新加坡滨海湾金沙成功举办&#xff0c;本届潮玩展主题为“Back to Play in the Garden City”&#xff0c;现场人气爆棚&#xff0c;三天吸引了超过2万观众入场&#xff0c;这也是泡泡玛特…

Hadoop源码阅读(三):HDFS上传

说明&#xff1a; 1.Hadoop版本&#xff1a;3.1.3 2.阅读工具&#xff1a;IDEA 2023.1.2 3.源码获取&#xff1a;Index of /dist/hadoop/core/hadoop-3.1.3 (apache.org) 4.工程导入&#xff1a;下载源码之后得到 hadoop-3.1.3-src.tar.gz 压缩包&#xff0c;在当前目录打开Pow…

【计算机网络】 拥塞控制

文章目录 背景TCP的四种拥塞控制算法慢开始与拥塞避免&#xff1a;快重传&#xff1a;快恢复&#xff1a; 流量控制和拥塞控制本质上的 区别 背景 网络中的链路容量和交换节点中的缓存和处理机都有着工作的极限&#xff0c;当网络的需求超过他们的工作极限时&#xff0c;就出现…

电脑回收站为什么自动清空?win10回收站自动清理的东西怎么找回

“很奇怪&#xff0c;我昨天才删除的文件&#xff0c;但是今天回收站中啥也没有了。难道win10回收站是自动清空的吗&#xff1f;&#xff1f;请问如何恢复这些文件呢&#xff1f;” ——您是否曾经遇到过回收站自动清空的情况&#xff1f;您是如何处理的呢&#xff1f;下面为大…

爬虫 — Js 逆向案例一英汉互译

目标网站&#xff1a;https://fanyi.baidu.com/ 需求&#xff1a;实现英汉互译 案例分析 1、分析网站加载方式 动态加载&#xff0c;目标 url&#xff1a;https://fanyi.baidu.com/v2transapi?fromen&tozh 2、分析请求方式 post&#xff08;携带 data 参数&#xff09…

功率放大器的特点是什么

功率放大器是电子系统中常见的一种设备&#xff0c;其主要功能是将低功率输入信号放大为高功率的信号输出。功率放大器具有多种特点&#xff0c;下面西安安泰电子将详细介绍功率放大器的几个主要特点。 功率放大器的一个主要特点是高功率输出。与信号放大器相比&#xff0c;功率…

Pytorch 深度学习实践 day01(背景)

准备 线性代数&#xff0c;概率论与数理统计&#xff0c;Python理解随机变量和分布之间的关系 人类智能和人工智能 人类智能分为推理和预测 推理&#xff1a;通过外界信息的输入&#xff0c;来进行的推测 预测&#xff1a;例如&#xff0c;看到一个真实世界的实体&#xff…

windows 电脑改成安卓桌面

windows电脑改造为类安卓的操作逻辑&#xff0c;适用于触摸屏windows系统。主要操作逻辑见下面图片列表&#xff1a; 桌面主页面 侧边工具栏 工具栏里有一些常用小工具 打开首页的应用&#xff08;edge浏览器&#xff09; 查看我的所有应用