“仿RabbitMQ实现消息队列”---整体架构与模块说明

news2025/1/12 20:42:17

顾得泉:个人主页

个人专栏:《Linux操作系统》 《C++从入门到精通》  《LeedCode刷题》

键盘敲烂,年薪百万!


一、概念性框架理解

我们主要实现的内容:

       1.Broker服务器:消息队列服务器(服务端

       2.消息发布客户端:向服务器发布消息

       3.消息订阅客户端:从服务器订阅消息

    broker服务器是我们最核心的部分,负责消息的存储和转发。

       而我们使用的AMQP(Advanced Message Queuing Protocol-高级消息队列协议,其中一个提供统一消息服务的应用层标准高级消息队列协议,为面向消息的中间件设计,使得遵从该规范的客户端应用和消息中间件服务器的全功能互操作成为可能)模型中,也就是消息中间件服务器Broker中,又存在以下概念:

       虚拟机(VirtualHost):类似于MySQL的"database",是一个逻辑上的集合。一个BrokerServer上可以存在多个VirtualHost
       交换机(Exchange):生产者把消息先发送到Broker的Exchange 上,再根据不同的规则,把消息转发给不同的 Queue
       队列(Queue):真正用来存储消息的部分,每个消费者决定自己从哪个Queue上读取消息
       绑定(Binding):Exchange和Queue之间的关联关系,Exchange和Queue可以理解成"多对多"关系,使用一个关联表就可以把这两个概念联系起来
       消息(Message):传递的内容


二、服务端模块概要设计

一、交换机数据管理模块

1.要管理的数据:描述了一个交换机应该有什么数据

  1.交换机名称:唯一标识
  2.交换机类型:决定了消息的转发方式
       每个队列绑定中有个binding_key,每条消息中有个routing_key

       1.直接交换: binding_key与routing_key相同,则将消息放入队列

       2.广播交换:将消息放入交换机绑定的所有队列中
       3.主题交换: routing_key与多个绑定队列的binding_key有匹配规则,匹配成功了则放入

  3.持久化标志:决定了当前交换机信息是否需要持久化存储
  4.自动删除标志:指的是关联了当前交换机的所有客户端都退出了,是否要自动删除交换机

  5.交换机的其他参数:当前未使用。

2.对交换机的管理操作:

    1.创建交换机:本质上需要的是声明-------强断言的思想-有就OK,没有则创建的意思
    ⒉删除交换机:注意事项--每个交换机都会绑定一个或多个队列(意味着会有一个或多个绑定信息),因此删除交换机需要删除相关绑定信息

    3.获取指定名称交换机
    4.获取当前交换机数量

二、队列数据管理模块

1.要管理的数据:

    1.队列名称:唯一的标识
    2.持久化存储标志:决定了是否将队列信息持久化存储起来,决定了重启后,这个队列还是否存在

    3.是否独占标志:独占就指的是,只有当前客户端自己能够订阅队列消息
    4.自动删除标志:当订阅了当前队列的所有客户端退出后,是否删除队列((暂不考虑)。

    5.其他参数:(暂不考虑)

2.提供的管理操作(还就是增删查三个操作)

    1.创建队列
    2.删除队列
    3.获取指定队列信息

    4.获取队列数量
    5.获取所有队列名称

       当系统重启后,需要重新加载数据,加载历史消息(消息以队列为单元存储在文件中)
而加载消息需要知道队列名称,因为后边消息存储的时候,存储文件以队列名称进行的取名

       一个队列如果持久化标志为false,则意味着重启后,队列就没了,也没有客户端能够订阅队列的消息,因此这个队列的消息如果持久化存储了,是没有意义,因此通常一个队列的持久化标志是false,那么它的消息也就不需要持久化。

三、绑定数据管理模块

管理的数据:

    1.交换机名称

    2.队列名称
    3. binding_key:
绑定密钥--描述了在交换机的主题交换&直接交换的消息发布匹配规则
       由数字,字符,_,#,.,*组成:

           binding_key: news.music.#      routing_key: news.sport.football

管理的操作:

    1.添加绑定

    2.解除绑定
    3.获取交换机相关的所有绑定信息:

       1.删除交换机的时候,要删除相关绑定信息
       2.当消息发布到交换机,交换机得通过这些信息来将消息发布到指定队列

    4.获取队列相关的所有绑定信息:
       删除队列的时候,要删除相关的绑定信息

    5.获取绑定信息数量

四、消息数据管理模块

1.消息信息:

消息属性:

    ID:消息的唯一标识
    持久化标志:表示是否对消息进行持久化(还取决于队列的持久化标志)
    routing_key:决定了当前消息要发布的队列(消息发布到交换机后,根据绑定队列的binding_key决定是否发布到指定队列)

消息主体:消息内容

    --以下是服务端为了管理所添加的信息
    存储偏移量:消息以队列为单元存储在文件中,这个偏移量,是当前消息相对于文件起始位置的偏移量

    消息长度:从偏移量位置取出指定长度的消息(解决粘包问题)
    是否有效标志∶标识当前消息是否已经被删除

       删除一条消息,并不会每次直接将后边的数据拷贝到前边,而只是重置了标志,当一个文件中,有效消息占据总消息比例不到50%,且数据量超过2000,则进行垃圾回收,重新整理文件数据存储*当系统重启,也只需要重新加载有效消息即可(相当于进行了一次垃圾回收)

2.消息的管理

管理方式:以队列为单元进行管理(因为消息的所有操作都是以队列为单元的)

管理数据:

    1.消息链表:保存所有的待推送消息
    2.待确认消息hash:消息推送给客户端后,会等待客户端进行消息确认,收到确认后,才会真正删除消息
    3.持久化消息hash:假设消息都会进行持久化存储,操作过程中会存在垃圾回收操作,但是垃圾回收会改变消息的存储位置。但是内存中的消息也会存储消息的实际存储位置,垃圾回收后就不一致了,因此每次垃圾回收后,都需要用新的位置,去更新持久化消息的信息。垃圾回收:将有效消息读取出来,然后重新截断文件,将消息连续写入文件中(文件中都是有效消息)

    4.持久化的有效消息数量
    5.持久化的总的消息数量:
决定了什么时候进行垃圾回收。

管理操作:

    1.向队列新增消息
    2.获取队首消息:
获取消息后,就会将消息从待推送消息链表删除(不再是待发送消息,而是待确认消息),加入到待确认消息中

    3.对消息进行确认:从待确认消息中移除消息,并进行持久化数据的删除
    4.恢复队列历史消息:主要是在构造函数中进行(只有在重启的时候才会进行)
    5.垃圾回收(消息持久化子模块完成)∶持久化文件中有效消息比例小于50%,且总消息数量超过200进行垃圾回收

    6.删除队列相关消息文件:当一个队列被删除了,那它的消息也就没有存在的意义了。

3.队列消息管理

    1.初始化队列消息结构
    2.移除队列消息结构:
在一个队列创建/删除的时候调用

    3.向队列新增消息
    4.对队列消息进行确认

    5.恢复队列历史消息

五、虚拟机数据管理模块

       对交换机+队列+绑定+消息数据管理的整合

要管理的数据:

    1.交换机数据管理句柄

    2.队列数据管理句柄

    3.绑定信息数据管理句柄

    4.消息数据管理句柄

要管理的操作:

    1.声明/删除交换机:注意---在删除交换机的时候要删除相关的绑定信息
    2.声明/删除队列:注意--在删除队列的时候,要删除相关的绑定信息以及消息数据

    3.队列的绑定/解除绑定:注意--绑定的时候,必须交换机和队列是存在的
    4.获取指定队列的消息
    5.对指定队列的指定消息进行确认
    6.获取交换机相关的所有绑定信息:
一条消息要发布给指定交换机的时候,交换机获取所有绑定信息,来确定消息要发布到哪个队列。

六、交换路由模块

       决定了一条消息是否能够发布到指定的队列

    在每个队列跟交换机的绑定信息中,都有一个binding_key:这是队列发布的匹配规则

    在每条要发布的消息中,都有一个routing_key:是消息的发布规则

    交换机有三种交换类型:直接,广播,主题

       广播:直接将消息发布给交换机的所有绑定队列

       直接: routing_key与binding_key完全一致则匹配成功

       主题: binding_key中是匹配规则news.music.#,routing_key是消息规则news.music.pop,匹配成功才能发布

路由匹配模块本质上来说,没有要管理的数据,只有向外提供的路由匹配操作:

    1.提供一个判断routing_key与binding_key是否能够匹配成功的接口
    2.判断routing_key是否符合规定:
       格式约定:只能由数字,字母,_﹒构成

    3.判断binding_key是否符合规定:
       格式约定∶只能由数字,字母,_.#*构成

七、消费者管理模块

    客户端有两种:发布消息,订阅消息
    因此订阅了指定队列消息的客户端才是一个消费者。
    消费者数据存在的意义:当指定队列有了消息以后,就需要将消息推送给这个消费者客户端(推送的时候就需要找到这个客户端相关的信息--连接)

消费者信息:

1.消费者标识--tag
⒉订阅队列名称:当当前队列有消息就会推送给这个客户端,以及当客户端收到消息,需要对指定队列的消息进行确认
3.自动确认标志:自动确认---推送消息后,直接删除消息不需要额外确认,手动确认---推送消息后,需要等到收到确认回复再去删除消息4.消费处理回调函数指针:队列有一条消息后,通过哪个函数进行处理(函数内部其实逻辑固定---向指定客户端推送消息)

消费者管理:

管理思想:以队列为单元进行管理

    每个消费者订阅的都是指定队列的消息,消费者对消息进行确认也是以队列进行确认。
    最关键的是:当指定队列中有消息了,必然是获取订阅了这个队列的消费者信息进行消息推送

队列消费者管理结构:

    数据信息:消费者链表-…保存当前队列的所有消费者信息(RR轮转每次取出下一个消费者

进行消息推送--一条消息只需要被一个客户端处理即可)

    管理操作:

       1.新增消费者 2.RR轮转获取一个消费者 3.删除消费者 4.队列消费者数量 5.是否为空

管理操作:

    1.初始化队列消费者结构

    2.删除队列消费者结构

    3.向指定队列添加消费者

    4.获取指定队列消费者
    5.删除指定队列消费者

八、信道管理模块

       信道管理: Channel

    信道是网络通信中的一个概念,叫做通信通道。
    网络通信的时候,必然都是通过网络通信连接来完成的,为了能够更加充分的利用资源,因此对通信连接又进行了进一步的细化,细化出了通信通道。

    对于用户来说,一个通信通道,就是进行网络通信的载体,而一个真正的通信连接,可以创建出多个通信通道
    每一个信道之间,在用户的眼中是相互独立的,而在本质的底层它们使用同一个通信连接进行网络通信。
    因此,因为信道是用户眼中的一个通信通道,所以所有的网络通信服务都是由信道提供的。

信道提供的服务操作:

1.声明/删除交换机

2.声明/删除队列
3.绑定/解绑队列与交换机
4.发布消息/订阅队列消息/取消队列订阅/队列消息确认

信道要管理的数据:

    0.信道ID
    1.信道关联的虚拟机句柄

    2.信道关联的消费者句柄:当信道关闭的时候,所有关联的消费者订阅都要取消,相当于删除所有的相关消费者。

    3.工作线程池句柄:信道进行了消息发布到指定队列操作之后;从指定队列获取一个消费者,对这条消息进行消费,也就是将这条消息推送给一个客户端的操作交给线程池执行。并非每个信道都有一个线程池,而是整个服务器有一个线程池,大家所有的信道都是通过同一个线程池进行异步操作而已

信道的管理:

1.创建一个信道   2.关闭一个信道   3.获取指定信道句柄

九、连接管理模块

       概念:网络通信连接

    在网络通信模块中,我们使用muduo库来实现底层通信,muduo库中本身就有Connection连接的概念和对象类。但是我们的连接中,还有一个上层通信信道的概念,这个概念在muduo库中是没有的。
    因此,我们需要在用户的层面,对这个muduo库中的Connection连接进行二次封装。形成我们自己所需的连接管理。

管理数据:

1.muduo库的通信连接
2.当前连接关联的信道管理句柄

连接提供的操作:

1.创建信道   2.关闭信道

管理的操作:

1.新增连接   2.关闭连接   3.获取指定连接信息


三、客户端模块概要设计

一、消费者管理模块

    1.消费者标识

    2.订阅的队列名称

    3.自动确认标志

    4.消息回调处理函数指针

       当当前消费者订阅了某一个队列的消息,这个队列有了消息后,就会将消息推送给这个客户端,这时候收到了消息则使用回调函数进行处理,处理完毕后根据确认标志决定是否进行消息确认。

       管理操作:增删查

二、信道管理模块

       所有提供的操作与服务端雷同,因为客户端给用户要提供什么服务,服务器就要给客户端提供什么服务。

管理信息:

    0.信道ID
    1.消费者管理句柄:
每个信道都有自己相关的消费者
    2.线程池句柄:对推送过来的消息进行回调处理,处理过程通过工作线程来进行

    3.信道关联的连接

信道提供的服务:

    1.声明/删除交换机
    2.声明/删除队列
    3.绑定/解绑队列与交换机
    4.发布消息/确认消息
    5.订阅队列消息/取消订阅队列消息
    6.创建/关闭信道

信道的管理:信道的增删查

三、连接管理模块

    客户端连接的管理,本质上是对客户端TcpClient的二次封装和管理。
    面对用户,不需要有客户端的概念,连接对于用户来说就是客户端,通过连接创建信道,通过信道完成自己所需服务因此,当前客户端这边的连接,对于用户来说就是一个资源的载体。

管理操作:

    1.连接服务器        2.创建信道        

    3.关闭信道        4.关闭连接

管理的资源:工作线程池,连接关联的信道管理句柄

四、异步工作池模块

    1.TcpClient模块需要一个EventLoopThread模块进行IO事件监控。

    2.收到推送消息后,需要对推送过来的消息进行处理,因此需要一个线程池来帮助我们完成消息处理的过程。

       将异步工作线程模块单独拎出来,原因是多个连接用一个EventLoopThread进行I0事件监控就够了,以及所有的推送消息处理也只需要有一个线程池就够了。

       并不需要每个连接都有一个EventLoop,也不需要每个信道的消息处理都有自己的线程池。


四、项目模块整体关系图


结语:关于项目本次的分享到这里就结束了,如果大家有什么问题,欢迎大家在评论区留言~~~ 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1721003.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

新宏观范式和产业趋势下,纷享销客如何助力企业出海?

出海,已不再是企业的“备胎”,而是必须面对的“大考”!在这个全球化的大潮中,有的企业乘风破浪,勇攀高峰,也有的企业在异国他乡遭遇了“水土不服”。 面对“要么出海,要么出局”的抉择&#xff…

阿里云 通过EIP实现VPC下的SNAT以及DNAT

192.168.0.85 有公网地址192.1680.95无公网地址 在192.168.0.85(有公网地址服务器上操作) #开启端口转发 echo "net.ipv4.ip_forward 1" >> /etc/sysctl.conf sysctl -p#仅允许192.168.0.95 iptables -t nat -I POSTROUTING -s 192.16…

【为什么 Google Chrome 打开网页有时极慢?尤其是国内网站,如知网等】

要通过知网搜一点资料,发现怎么都打不开。而且B站,知乎这些速度也变慢了!已经检查过确定不是网络的问题。 清空了记录,清空了已接受Cookie,清空了缓存内容……没用!!! 不断搜索&am…

eDP V1.4协议介绍

一、说明 eDP的全称是Embedded DisplayPort嵌入式显示端口,主要应用与短距离系统内应用,例如手机、一体式台式机等。eDP V1.4b是基于DP V1.3标准制作完成,但因应用场景的不同,还是有很多区别。 电压摆幅不同,eDP相对较低; eDP功耗相对较低; DP有线材和连接器的要求,eD…

【风控】可解释机器学习之InterpretML

【风控】可解释机器学习之InterpretML 在金融风控领域,机器学习模型因其强大的预测能力而备受青睐。然而,随着模型复杂性的增加,模型的可解释性逐渐成为一个挑战。监管要求、业务逻辑的透明度以及对模型决策的信任度,都迫切需要我…

Java垃圾回收_1

一、垃圾回收 1.如何判断对象可以回收 (1)引用计数法 存在循环引用问题, Java未使用这种算法 在引用计数法中,每个对象都有一个引用计数器,记录着指向该对象的引用数量。当引用计数器为零时,表示没有任…

如何矢将量数据转换为栅格数据

在我们分析GIS数据时,有时候也可能需要将矢量数据转换为栅格数据来使用,例如:使用AI图像识别技术进行GIS数据分析或导航的时候!矢量数据就可能不满足需求了! GIS数据转换器-矢量V5.0具有矢量数据转换为栅格数据的功能…

vue:实现丝滑上传进度条

一、效果展示 缓若江海凝清光 . 二、代码 const uploadProgress ref(); //上传进度//进度丝滑更新 //进度,时常 const ProgressChange (targetPercent: number, duration: number) > {//performance.now() 是浏览器提供的一个高性能时间 API,它返…

openfiler安装部署-1

openfiler安装部署 简介1 下载openfiler2 openfiler 安装2.1 vmware 典型配置2.2 稍后安装操作系统2.3 新建虚拟机向导2.4 命名虚拟机2.5 指定磁盘容量2.6 添加系统镜像,准备安装系统2.7 启动安装系统2.8 初始化磁盘,选择"Yes"2.9 创建分区&am…

【NVM】nvm常用命令,切换node版本命令

nvm常用的命令,切换node版本命令 nvm 查看支持安装的node版本 nvm list available nvm安装指定版本node nvm install 版本号 例如:nvm install 10.24.1 nvm查看本机安装所有node版本 nvm list nvm切换node版本 nvm use 10.24.1 检测当前node版本 node -…

Stable Diffusion Webui--安装与使用

最近进行的课程汇报,学习了2023年的CVPR文章《DreamBooth: Fine Tuning Text-to-Image Diffusion Models for Subject-Driven Generation》,因此尝试使用了几种方法对这篇文章的工作进行了一定的复现。本文主要介绍Stable Diffusion Web UI(webui)的安装…

CRMEB多店版v3.0前端技术革新与实践

摘要 随着移动互联网技术的飞速发展,用户对移动应用的体验要求日益提高。CRMEB多店版v3.0作为一款针对多门店管理的电商系统,在前端技术层面进行了全面的革新与优化。本文将从移动端UI设计、页面功能更新、DIY设计功能升级、移动端平台与门店管理、营销…

软件行业人均工资多少?20万已完胜大多数

本篇文章继续讨论中国软件行业的人效比。(金融科技公司的人效比链接在这里。) 这次选择了7家公司:软通动力、用友网络、中科软、东软集团、航天信息、东华软件、中国软件,均是中国软件行业排名比较靠前、业务相对纯粹的软件公司。…

机器学习笔记——逻辑斯蒂回归

参数化模型与非参数化 像前面的KNN模型,不需要对f的形式做出假设,在学习中可以得到任意的模型叫非参数化 而需要对参数进行学习的模型叫参数化模型,参数化限制了f的可能的集合,学习难度相对较低 逻辑斯蒂回归 逻辑斯蒂函数 似…

FPGA-ARM架构与分类

ARM架构,曾称进阶精简指令集机器(Advanced RISC Machine)更早称作Acorn RISC Machine,是一个32位精简指令集(RISC)处理器架构。 主要是根据FPGA zynq-7000的芯片编写的知识思维导图总结,废话不多说自取吧 …

等保测评 | 等保测评简介及流程具体是什么?

等保测评是指对信息系统进行安全性评估和测试,以确保其符合国家相关等级保护要求。在当前信息时代,各类机构和企业面临着日益严峻的网络安全风险,等保测评成为了保障信息系统安全的重要手段之一。本文将介绍等保测评的基本概念、流程和重要性…

通过 SFP 接口实现千兆光纤以太网通信2

Tri Mode Ethernet MAC IP 核结构 时钟网络 IP 核内部时钟网络结构如下图所示。其中,tx_mac_aclk 为 AXI-Stream 发送接口的同步时钟, rx_mac_aclk 为 AXI-Stream 接收接口的同步时钟。由于在设计中没有使用 MDIO 接口,所以不存在时钟信号 …

Linux【安全 02】OpenSSH漏洞修复(离线升级最新版本流程)网盘分享3个安装包+26个离线依赖

OpenSSH离线升级最新版本流程 1. 漏洞信息2. 环境说明3.依赖安装3.1 在线安装3.2 离线安装 4.备份卸载4.1 备份4.2 卸载旧版本 5.安装5.1 zlib5.2 ssl5.3 openssh5.3.1 安装5.3.2 配置 6.脚本整理7.文件资源 本文仅针对CentOS7.8版本,其他版本未测试,安装…

Java18新版本特性!

Java 18引入了多项新特性,主要包括默认UTF-8字符集、简单的Web服务器、栈步进API等。Java 18是Oracle在2022年发布的版本,其旨在通过一系列创新特性来提升开发效率与性能。下面将逐一探讨Java 18的主要新特性以及它们对开发者的具体影响: 默认…

“迎七一、学党史、祭英烈”活动在孙善师孙善帅烈士故居启动

临沂信息联播讯(张春兄、冯爱云) 5月30日,山东省著名烈士孙善师孙善帅故居迎来了山东全味时间企业管理咨询服务有限公司、志林丽虹沂蒙文化传播(临沂)有限公司、山东志林搏击健身有限公司的参观团队,标志着…