手写消息队列(基于RabbitMQ)

news2025/1/20 5:56:13

一、什么是消息队列?

提到消息队列是否唤醒了你脑海深处的记忆?回看前面的这篇文章:《Java 多线程系列Ⅳ(单例模式+阻塞式队列+定时器+线程池)》,其中我们在介绍阻塞队列时说过,阻塞队列最大的用途就是实现 生产者消费者模型

我们知道对于生产者消费者模型来说,它具有两个十分亮眼的特点:

  1. 解耦合.
  2. 削峰填谷.

(1)解耦合
在引入生产者消费者模型之前,两台服务器之间通常是直接交互,这种交互模式使得服务器之间的耦合是非常大的。而引入生产者消费者模型之后,两台服务器之间不再进行直接通信,而是借助阻塞队列进行业务处理,起到了解耦的效果。

(2)削峰填谷
在引入生产者消费者模型之前,同样是两台服务器进行直接通信,如果在一个时间点,服务器 A 突然发送一组请求峰值,此刻服务器 B 也会随之感受到峰值,这种情况下很可能造成服务器故障。如果此时使用阻塞队列,A 将收到的请求发给队列,虽然队列中有很多请求,但是服务器 B 仍然和以按照原有的节奏读取请求。

其实正是因为生产者消费者模型具有以上诸多好处,在实际的后端开发中,特别是分布式系统里,跨主机使用生产者消费者模型是非常普遍的需求。因此通常会把阻塞队列单独分离出来,赋予更加丰富的功能,封装成一个独立的服务器程序,这个程序就称为 消息队列

二、需求整理

1、生产者消费者模型核心概念

  1. 生产者 (Producer): 负责将消息发送到消息队列中。

  2. 消费者 (Consumer): 从消息队列中接收和处理消息。。

  3. 中间人 (Broker): 它负责接收发布者发送的消息,并将这些消息存储在队列中,然后将这些消息传递给订阅者。

  4. 发布 (Publish): 生产者将消息投递到中间人的过程。

  5. 订阅 (Subscribe): 消费者在中间人这里注册的过程。只有消费者注册之后,当一个消息发布到消息队列时消息才会被发送给相应的订阅者。

根据以上概念,我们可以大致画出生产者消费者模型概念图:(PS:下面的每个模块均表示服务器)

一个生产者,一个消费者:

N 个生产者,N 个消费者:

2、Broker 设计概要

我们当前的目的是为了实现一个消息队列,其中 Broker 是最核心的部分,它主要负责消息的 存储转发,其中涉及到的核心概念如下:

  1. 虚拟主机 (VirtualHost): 类似于 MySQL 的 “database”,是⼀个逻辑上的集合。在实际的开发中一个 BrokerServer 可能会同时管理多组业务线上的数据,此时可以使用不同的 VirtualHost 进行区分。

  2. 交换机 (Exchange): 生产者把消息先发送到 Broker 的 Exchange 上,再根据不同的规则,把消息转发给不同的 Queue。

  3. 队列 (Queue): 真正用来存储消息的部分,每个消费者决定自己从哪个 Queue 上读取消息(根据订阅的队列)。⼀个 Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息),一个 Queue 也可以被多个 Exchange 绑定
    (一个 Queue 中的消息可以来自于多个 Exchange)。

  4. 绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以理解成 “多对多” 关系。使用一个关联表就可以把这两个概念联系起来。

  5. 消息 (Message): 具体来说是服务器之间的请求和响应。一个消息,可以视为一个字符串(二进制数据),具体由程序员自定义。

上述概念在 Broker 中的体现如图所示:

补充说明1:数据存储

以上这些概念对应的数据,既需要在内存中存储,也需要在硬盘上存储,以内存为主,硬盘为辅:

  • 内存存储:对于 MQ 来说,能够高效的处理转发数据时非常关键的指标,因此使用内存组织上述数据,能够得到较高的效率。
  • 硬盘存储:主要是为了防止内存中的数据随着进程/主机重启而丢失。

补充说明:2: 交换机类型与转发规则

上面我们提到,在生产者发送消息时,首先会将消息发送到 Broker 的交换机上,再由交换机根据不同的规则转发到相应的队列中。在 MQ 中支持四种类型的交换机,它们分别是: Direct(直接交换机)、Fanout(扇出交换机)、Topic(主题交换机)、Header(头部交换机)。其中 Header 这种方式比较复杂,也比较少见,当前项目中主要实现了前三种,下面分别对他们进行详细介绍:

前要说明:

  • 以下 bindingKey(绑定键)是在创建队列和交换机绑定关系时指定的关键字。
  • 以下 routingKey(路由键)是生产者发送消息时指定的关键字。

(1)Direct(直接交换机)

  1. 生产者发送消息时,会指定一个"目标队列"的名字(此时的 routingKey 就是 队列的名字,bindingKey 无效)
  2. 交换机收到消息后,查看当前交换机对应的绑定里面是否存在队列名字为routingKey的队列
  3. 如果有,就转发过去(把消息塞进对应的“目标队列”中)
  4. 如果没有,消息直接丢弃

(2)Fanout(扇出交换机)

  1. 生产者无需指定routingKey,直接发送消息到指定交换机
  2. 交换机收到消息后,直接将消息转发给当前交换机已绑定的所有队列中。(此时的 bindingKey 和 routingKey 对扇出交换机无效。)

(3)Topic(主题交换机)

  1. 生产者发送消息时,指定一个 routingKey
  2. 交换机收到消息后,查看当前交换机对应的绑定中是否存在一个 bindingKey 通过一定的规则和 routingKey 相匹配
  3. 如果有,就将消息转发到对应的绑定队列中。
  4. 如果没有,则将消息丢弃。

PS:以上所有概念出自 AMQP 协议:一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

3、Broker 核心 API

Broker 基于以上概念和功能,需要实现的核心 API 如下:

  1. 创建队列 (queueDeclare)
  2. 销毁队列 (queueDelete)
  3. 创建交换机 (exchangeDeclare)
  4. 销毁交换机 (exchangeDelete)
  5. 创建绑定 (queueBind)
  6. 解除绑定 (queueUnbind)
  7. 发布消息 (basicPublish)
  8. 订阅消息 (basicConsume)
  9. 确认消息 (basicAck)

补充说明:

  1. 从上面可以看出来,在进行创建操作时并没有使用 create,而是使用 declare,这是从语义上说明,这里的创建起到的效果是不存在则创建,存在则啥也不做。
  2. 上述并没有创建一个“消费消息”的API,这是因为当前我们使用的工作模式是 push(推),Broker 会将消息主动推送给订阅的消费者。当然也有 pull(拉) 工作模式,需要消费者主动调用 Broker 的 API 获取消息,当前项目不涉及这种模式。
  3. 在MQ中有两种应答方式,一种是自动应答,这种方式下 Broker 将消息推送给订阅的消费者后就算应答完毕。另一种应答方式是手动应答,上述确认消息(basicAck)起到的效果,是可以让消费者 显式 的告诉 Broker,这个消息我处理完毕了,提高整个系统的可靠性。

4、客户端设计概要

生产者、消费者都是客户端程序,broker 则是作为服务器,通过网络进行通信。此处设定,使用 TCP + 自定义的应用层协议 实现 生产者/消费者 和 BrokerServer 之间的交互工作,这里需要给客户端提供一组 API,让客户端的业务代码来调用,从而通过网络通信的方式远程调用 brokerserver 上的方法。

客户端核心API:

  1. 创建 Connection
  2. 关闭 Connection
  3. 创建 Channel
  4. 关闭 Channel
  5. 创建队列 (queueDeclare)
  6. 销毁队列 (queueDelete)
  7. 创建交换机 (exchangeDeclare)
  8. 销毁交换机 (exchangeDelete)
  9. 创建绑定 (queueBind)
  10. 解除绑定 (queueUnbind)
  11. 发布消息 (basicPublish)
  12. 订阅消息 (basicConsume)
  13. 确认消息 (basicAck)

补充说明:

  1. 和 Broker 服务器 API 相比,客户端程序还提供了如下 4 个 API:创建 Connection、关闭 Connection、创建 Channel、关闭 Channel。
  2. 这里的一个Connection对象代表一个TCP连接。
  3. Channel 是 Connection 内部逻辑上的链接,多个Channel复用同一个TCP连接,一个Connection 中可以包含多个Channel,每个Channel负责客户端中不同的模块,其中传输的数据是互不相干的。
  4. 这样的设定主要是为了能够更好的复用 TCP 连接, 达到长连接的效果, 避免频繁的创建关闭 TCP 连接。
  5. 上述客户端提供的 API 只是给业务代码进行调用,真正的方法执行是交给了BrokerServer。这个过程称为 远程过程调用(Remote Procedure Call,简称RPC)是一种计算机通信协议,它允许程序调用另一个地址空间(通常是不同的计算机)中的过程或函数,而无需程序员显式编写网络代码。通过使用RPC,应用程序可以像调用本地进程一样调用远程服务器上的进程。

5、小结

最后简单总结一下,我们大致需要做的工作,其中涉及到的细节问题,我们后面在进行补充:

  1. 实现生产者、BrokerServer、消费者三个部分
  2. 针对生产者、消费者来说,主要编写客户端和服务器的网络通信部分。
  3. 重点实现BrokerServer,包括内部的基本概念和核心 API。
  4. 数据的持久化存储。

三、具体实现

附上连接:

1、消息队列详细设计与实现思维导图
2、Gitee 完整代码地址

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1228005.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux】:体系结构与进程概念

朋友们、伙计们,我们又见面了,本期来给大家解读一下有关Linux体系结构和进程的知识点,如果看完之后对你有一定的启发,那么请留下你的三连,祝大家心想事成! C 语 言 专 栏:C语言:从入…

「Java开发指南」如何在Spring中使用JAX-WS注释器?

本文将指导您如何使用JAX-WS注释器从Spring服务生成JAX-WS Web服务,在本教程中,您将学习如何: 为Spring服务启用JAX-WS部署应用程序并测试服务 所有与Spring scaffolding相关的任务都需要MyEclipse Spring或Bling授权。 MyEclipse v2023.1…

『力扣刷题本』:二叉树的中序遍历

一、题目 给定一个二叉树的根节点 root ,返回 它的 中序 遍历 。 示例 1: 输入:root [1,null,2,3] 输出:[1,3,2]示例 2: 输入:root [] 输出:[]示例 3: 输入:root [1…

【NI-DAQmx入门】校准

1.设备定期校准的理由 随着时间的推移电子器件的特性会发生自然漂移,可能会导致测量结果的不准确性。防止出现良品和差品筛选出错的情况满足行业国际标准降低设备出现故障的风险使测量结果更具备参考性 2.查找NI设备的校准间隔。 定期校准会使DAQ设备的精度保持在…

Linux远程工具专家推荐(二)

8. Apache Guacamole Apache Guacamole 是一款免费开源的无客户端远程桌面网关,支持 VNC、RDP 和 SSH 等标准协议。无需插件或客户端软件;只需使用 HTML5 Web 应用程序(例如 Web 浏览器)即可。 这意味着您的计算机的使用不受任何一…

线性表--链表-1

文章目录 主要内容一.链表练习题1.设计一个递归算法,删除不带头结点的单链表 L 中所有值为 X 的结点代码如下(示例): 2.设 L为带头结点的单链表,编写算法实现从尾到头反向输出每个结点的值代码如下(示例): …

Jave 定时任务:使用Timer类执行定时任务为何会发生任务阻塞?如何解决?

IDE:IntelliJ IDEA 2022.2.3 x64 操作系统:win10 x64 位 家庭版 JDK: 1.8 文章目录 一、Timer类是什么?二、Timer类主要由哪些部分组成?1.TaskQueue2. TimerThread 三、示例代码分析四、自定义TimerTask为什么会发生任务相互阻塞的…

度加创作工具 演示

度加创作工具 功能图功能测试文比润色测试经验分享测试测试输出测试输出工具地址功能图 功能测试 文比润色测试 经验分享测试 测试输出 在人工智能领域,我们一直在追求一个终极目标:让机器能够像人类一样,能够理解、学习和解决各种复杂问题。而要实现这个目标,我们需要将…

设计模式常见面试题

简单梳理下二十三种设计模式,在使用设计模式的时候,不仅要对其分类了然于胸,还要了解每个设计模式的应用场景、设计与实现,以及其优缺点。同时,还要能区分功能相近的设计模式,避免出现误用的情况。 什么是…

麒麟系统安装找不到安装源!!!!设置基础软件仓库时出错

记录--华为RH2288 V3服务器安装麒麟系统遇到的问题 1.遇到的问题--“设置基础软件仓库时出错”报错导致无法继续安装 没办法下一步 先说结论:系统bug 该问题在CentOS、Rocky Linux最新版中均存在 解决: (一)、如果是外网直接配…

【机器学习基础】决策树(Decision Tree)

🚀个人主页:为梦而生~ 关注我一起学习吧! 💡专栏:机器学习 欢迎订阅!后面的内容会越来越有意思~ ⭐特别提醒:针对机器学习,特别开始专栏:机器学习python实战 欢迎订阅&am…

metinfo 5.0.4 文件包含漏洞复现

metinfo 5.0.4 文件包含漏洞 漏洞环境 metinfo cms 版本 5.0.4 代码审计 在metinfo下的about/index.php代码中发现动态调用 上面没有赋值但是是有具体值的说明在上一个文件包含赋值了 查看这个文件的源代码 可以看到这里做了初始化但是是在fmodule不等于7的时候那假设等…

深入解析具名导入es6规范中的具名导入是在做解构吗

先说答案,不是 尽管es6的具名导入和语法非常相似 es6赋值解构 const obj {a: 1,f() {this.a}}const { a, f } objes6具名导入 //导出文件代码export let a 1export function f() {a}export default {a,f}//导入文件代码import { a, f } from ./tsVolution可以看出…

【Go入门】Web工作方式

【Go入门】 Web工作方式 我们平时浏览网页的时候,会打开浏览器,输入网址后按下回车键,然后就会显示出你想要浏览的内容。在这个看似简单的用户行为背后,到底隐藏了些什么呢? 对于普通的上网过程,系统其实是这样做的&…

gd32关于IO引脚配置的一些问题

一、gd32f103的PA15问题 1、 #define GPIO_SWJ_NONJTRST_REMAP ((uint32_t)0x00300100U) /*!< full SWJ(JTAG-DP SW-DP),but without NJTRST */ #define GPIO_SWJ_SWDPENABLE_REMAP ((uint32_t)0x00300200U) /*!< JTAG-DP disabled and SW-DP enab…

【C++】——阶段性测验(帮助巩固C++前半部分知识)

&#x1f383;个人专栏&#xff1a; &#x1f42c; 算法设计与分析&#xff1a;算法设计与分析_IT闫的博客-CSDN博客 &#x1f433;Java基础&#xff1a;Java基础_IT闫的博客-CSDN博客 &#x1f40b;c语言&#xff1a;c语言_IT闫的博客-CSDN博客 &#x1f41f;MySQL&#xff1a…

安装2023最新版PyCharm来开发Python应用程序

安装2023最新版PyCharm来开发Python应用程序 Install the Latest JetBrains PyCharm Community to Develop Python Applications Python 3.12.0最新版已经由其官网python.org发布&#xff0c;这也是2023年底的最新的版本。 0. PyCharm与Python 自从1991年2月20日&#xff0…

Python---练习:封装一个函数,用于生成指定长度的验证码

练习涉及相关链接&#xff1a;Python---练习&#xff1a;编写一段Python代码&#xff0c;生成一个随机的4位验证码-CSDN博客 Python----函数中的说明文档-CSDN博客Python---return返回值-CSDN博客 代码&#xff1a; # 定义一个generate_code()函数 def generate_code(num): …

语聚AI:无代码开发的API连接新选择,助力电商平台客户服务提升

无代码开发&#xff1a;语聚AI的新选择 在企业运营中&#xff0c;客户服务扮演着重要的角色。然而&#xff0c;许多企业在日常的客服管理中面临着重复咨询、人工接待成本高、缺乏知识库支持以及客服渠道分散等问题。如何提高客服的效率和质量&#xff0c;成为了企业急需解决的…

最全的接口自动化测试思路和实战:【推荐】混合测试自动化框架(关键字+数据驱动)

混合测试自动化框架(关键字数据驱动) 关键字驱动或表驱动的测试框架 这个框架需要开发数据表和关键字。这些数据表和关键字独立于执行它们的测试自动化工具&#xff0c;并可以用来“驱动&#xff02;待测应用程序和数据的测试脚本代码&#xff0c;关键字驱动测试看上去与手工测…