消息队列的模拟实现(一)

news2025/1/16 7:50:33

消息队列的模拟实现(一)

    • 认识消息队列
    • 生产者消费者模型两大特征
      • 市面上可见的消息队列`MQ`
        • 消息队列的特点:
    • 模拟实现消息队列
      • 模型分类
      • 提供的核心API
            • 消息队列的推拉模式
      • 交换机的类型
      • 持久化
      • 网络通信
      • 额外提供的方法
        • 使用一个TCP和信道之间的区别
      • 消息应答
      • 使用数据库`SQLite`
        • 总结

认识消息队列

根据block queue(阻塞队列)->是一个生产者消费者模型。(是在一个进程内部进行的

而所谓的消息队列就是将阻塞队列这种数据结构,单独提取出作为一个功能来实现,独立进行部署也是所谓的生产者消费者模型。(进程和进程之间,服务器和服务器之间)

在一个分布式系统中,由一组服务器构成的“集群”。

综上所述,严谨一点:

  1. 消息队列是一种在应用程序之间进行异步通信的中间件。它允许发送和接收消息的应用程序之间可以独立于彼此的运行,而不需要直接连接。

  2. 消息队列主要用于解耦发送方和接收方之间的耦合性,提高系统的可扩展性和可靠性

生产者消费者模型两大特征

  • 解耦和

    两个服务器或者进程中间有一个相对于缓存区的消息队列,两个服务器在对消息队列进行交互,如:A发请求到消息队列,而B从消息队列中读取请求,然后返回响应到消息队列,A在消息队列中获取响应。

  • 削峰填谷

    根据不同时期数据量的大小,利用消息队列让一个服务器在大多数情况下可以正常运行。

    比如:A是一个入口服务器,A调用B完成一些具体的业务。在A,B直接通信,如果A突然收到大量数据,达到请求峰值,B随着也会感到峰值的出现。而在引入消息队列,就算A达到峰值时,B也可以仍然按照原来的方式进行读取请求,不会感到大量压力。

市面上可见的消息队列MQ

  • RabbitMQ
  • Kafka
  • RockrtMQ
  • ActiveMQ

消息队列的特点:

  1. 可靠性
    • 消息队列提供持久化的传输消息机制,确保发送的消息不会丢失。
  2. 异步型
    • 消费者和生产者的操作是异步执行的,使得应用程序可以在发送和接收之间可以独立运行
  3. 解耦性
    • 分离了两种模式,生产者和消费者之间的解耦性得到减少,从而提高系统的灵活性和可维护性
  4. 缓冲能力
    • 消息队列相对于生产者和消费者之间的缓存区,相对于削峰填谷
  5. 扩展性
    • 可以通过增加生产者和消费者的方式来扩展系统的处理业务能力。

模拟实现消息队列

消息队列的运行原理:

消息队列总的来说是一个生产者消费者模型。生产者负责生产消息存储在队列中,而消费者在队列中消费消息。

既然我们要模拟实现一个消息队列,就不得不分析一下所谓的消息队列究竟是如何实现的。

  1. 需求分析,参考MQ的功能
  2. 了解核心概念(生产者消费者模型)
  3. 中间人(Broker)
  4. 发布(Publish)—》生产者向中间人投递消息的过程
  5. 订阅(Subscribe)—》确认那些消费者在中间人这里取出数据,这个注册过程,叫订阅
  6. 消费(consume)–》消费者从中间人这里取数据

模型分类

服务器中传递数据

  1. 一对一
  2. 一对多
  3. 多对多

在这里插入图片描述

中间人服务器(Broker Server)中包含的内容:

  • 虚拟机(Virtual Host)。类似于MySQL数据库中一个Database,相当于一个数据集合。
  • 交换机(Exchange)。相当于生产者将消息投递给Broke server中的某个交换机,再有交换机将消息传递给对应的队列。
  • 队列(Queue)。真正用于存储消息实体,后续消费者将从对应的队列中读数据。
  • 绑定(Binding)。将交换机和队列,建立关联关系,关系可能是一个交换机对应一个队列,可能多个交换机对应一个队列。
  • 消息(message)。 就像请求与响应都是一个消息,用于传递的一种介质。

RabbitMQ就是以上的概念来组织的,被称为AMQP协议。

提供的核心API

  1. 创建队列(QueueDeclare
    • 存在不创建,不存在创建
  2. 销毁队列(QueueDelete
  3. 创建交换机(exchangeDeclare)
  4. 销毁交换机(exchangeDeclare)
  5. 创建绑定(QueueBind)
  6. 解除绑定(QueueUnbind)
  7. 发布消息(basicPublic)
  8. 订阅消息(basicConsume)
  9. 确认信息(BasicACK
    • 该API主要是让消费者告诉Broke server,这个消息已经处理完毕,提高系统的可靠性和传输成功率高。
消息队列的推拉模式

推拉模式指的是消息队列其中的发送和接收方式

RabbitMQ只支持推的方式,kafka都支持

push:Broke把接收到的数据,主动发给订阅者,订阅者在队列中等待,并实时接收到新的消息,这种模式下,消费者无需主动请求消息,而是由生产者主动推送给他们。

  • 优点:实时性高
  • 缺点:浪费短暂的空闲时间
  • 适用场景:需要及时响应的场景

poll:订阅者主动调取broke中的数据,消费者定期轮询或发送请求来获取新的消息。

  • 优点:消费者有更大的控制权
  • 缺点:实时性低,可能第一时间无法获取到消息
  • 适用场景:对消费者的读取消息有时间要求的场景

交换机的类型

交换机在转发消息时,有一套转发规则,对RubbitMQ存在四种不同的交换机,来描述不同的交换规则。

交换机与交换机的转发规则

  1. Direct直接交换机
    • 生产者发送消息的时候,会指定一个目标队列的名字,交换机收到以后,查看绑定的队列,有则转发,无则丢弃。
  2. Fanout扇出交换机
    • 将收到的消息转发到每一个绑定的队列
  3. Topic主题交换机
    • bindingKey:把队列和交换机绑定的时候,指定的一个单词(暗号)。
    • routingKey:生产者发送消息时,也指定一个单词。
    • 当两个key对上时,才会将消息转发到消息队列中
  4. Header消息头交换机

只实现前三种,前三种比较常用!

三种操作的应用场景:

  • Direct 专属交换机,只有指定队列可以使用
  • FanOut 通用交换机,只要是队列就可以使用
  • Topic 只有对准了"暗号"的队列,才可以使用

持久化

Exchange、Queue、Binding 、Message的这些内容都需要持久化,都需要让BrokeServer组织管理,使用两种存储方式:

  1. 内存:方便使用
  2. 硬盘:重启数据不丢失,需要设定消息持久化

对于消息队列,能够高效的处理数据,是非常关键的指标,使用内存存储数据可以达到很高效的处理数据速度,不过内存有一个缺点就是关机后会自动清楚数据,所以需要使用硬盘来存储数据。

该持久化就是两个存储方式之间的对比,硬盘的持久化也是相对于内存持久

网络通信

各种服务器(生产者/消费者)通过网络,通过broke server进行交互,这里设定使用TCP自定义协议来实现以上两者的交互工作。在网络通信的过程中,客户端需要提供对应的Api来实现对服务器的操作

额外提供的方法

  • 创建 Connection
  • 关闭 Connection

以上两个操作相对于一个TCP连接

  • 创建 Channel
  • 关闭Channel

通信/信道

使用一个TCP和信道之间的区别

一个Connection中包含多个Channel,每个channel中的数据是毫不相干的,就像进程之间是独立的。因为TCP建立连接消耗资源,频繁创建和销毁更是不利于程序的高效运行,所以更多时候不会频繁断开连接。使用Channel是一个轻量型通信,比TCP连接和断开,节约更多的资源。

消息应答

当消息队列中的消息被消费,需要进行应答。

两种应答模式:

  1. 自动应答(消费者自行取走消息,消息丢失也无碍)
  2. 手动应答(可靠性高,通常需要消费者主动来调用方法,一般用于重要信息的应答方式)

应答模式是为了保证消息确实被消费者处理成功了,在需要时间可靠性强的场景中比较常见。

使用数据库SQLite

Mysql是一个比较重量型的数据库,为了让项目更加轻便快捷,采用SQLite数据库。

一个完整的SQLite只是一个可执行文件,是一个本地数据库,该数据库直接操作的是系统中的硬盘文件。

在Java中使用Sqlite不需要安装,只需要引入依赖即可使用,自动加载jar包和动态文件。

手动安装方式:下载SQLite点击安装,只是一个exe文件。

MysqlSQLite之间,后者是不需要设置用户和密码的!原因是Sqlite是一个只允许单个用户使用的数据库,而MySql支持多用户使用,所以需要验证密码准确性。

总结

要实现一个消息队列首先需要进行项目分析,列出项目组成,然后一一细分其中功能。其实做每一个项目都是这样的,当然如果是你自己创建一个新的项目其实也是需要进行项目分析和项目组成,然后得到你所期待的功能

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/926729.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机网络-笔记-第一章-计算机网络概述

目录 一、第一章——计算机网络概述 1、因特网概述 (1)网络、互联网、因特网 (2)因特网发展的三个阶段 (3)因特网服务的提供者(ISP) (4)因特网标准化工…

源代码加密、防泄密软件

企业源代码防泄密是指企业采取措施保护其软件或应用程序源代码不被未授权的人员获取、泄露或盗用的一种安全措施。源代码是软件的核心组成部分,其中包含了程序员编写的具体指令和算法,可以被计算机理解和执行。泄漏企业的源代码可能导致严重的后果&#…

DEIF SCM-1测量模块

参数测量: SCM-1测量模块通常用于测量电力系统的各种参数,例如电压、电流、频率、功率因数等。 监测功能: 它能够实时监测电力系统的性能,以确保其在正常运行范围内。 通信接口: DEIF的测量模块通常具有通信接口&…

批量随机改名并自定义长度,让文件夹命名更随心

大家好!你是否曾经为批量改名文件夹而苦恼过?现在,我们为你带来了解决方案!我们的工具可以帮助你轻松批量给文件夹进行随机改名,并且还可以自定义文件夹名的长度,让你的文件夹命名更加随心和个性化。 首先第…

内存管理:TLSF算法原理分析

1、动态内存分配DSA: 动态内存分配(DSA)在计算机中十分重要,其主要用于在程序运行时,根据需要分配和释放内存。 (1)、DSA的几个要点分别为: 内存管理方式:动态内存分配与静态内存分配 相对应&…

JMeter性能测试(上)

一、基础简介 界面 打开方式 双击 jmeter.bat双击 ApacheJMeter.jsr命令行输入 java -jar ApacheJMeter.jar 目录 BIN 目录:存放可执行文件和配置文件 docs目录:api文档,用于开发扩展组件 printable-docs目录:用户帮助手册 li…

在线流程图软件哪个好?5款打工人必备的效率神器!

​流程图是可视化工具的一种,被广泛用于呈现和理解复杂的流程和工作流程。本篇文章我们将向你介绍5款优秀的在线流程图软件,助你提升工作效率,它们分别是:boardmix、Lucidchart、draw.io、Creately、Coggle。 在选择在线流程图软…

对numpy以及pandas中axis的理解

用线代的概念来理解轴,也就是dimension 在numpy中,最小的一维数组就可以看做是一个行列式,通常一个行列式写作如下形式 在numpy中就是这样的形式 anp.arange(4) #array([0, 1, 2, 3]) 对一个二维的矩阵,通常可以由两个行列式组…

用AI重构的钉钉,“钱”路在何方?

点击关注 文|郝 鑫,编|刘雨琦 钉钉2023年生态大会,离开了两年的无招,遇到了单飞9天的钉钉。 “做小钉钉、做好钉钉、做酷钉钉”,无招重申了钉钉的方向。 无招提到的三点,再加上“高质量增长”…

【Windows系统】资源管理器右键卡顿案例

问题 最近在使用办公电脑过程中,发现在Windows系统资源管理器中使用右键会出现卡顿现象。这是一台经常使用,工作日上班都会使用,以前没有这个问题。 出现问题的环境:windows版本:win10_x64(22H2&#xff…

Camunda 7.x 系列【27】手工任务 业务规则任务

有道无术,术尚可求,有术无道,止于术。 本系列Spring Boot 版本 2.7.10 本系列Camunda 版本 7.19.0 源码地址: 文章目录 1. 手工任务2. 业务规则任务1. 手工任务 Manual Task手工任务是定义在流程引擎之外的任务,流程引擎不需要了解,也不需要提供系统或用户界面的工作。…

iOS脱壳技术(二):深入探讨dumpdecrypted工具的高级使用方法

前言 应用程序脱壳是指从iOS应用程序中提取其未加密的二进制可执行文件,通常是Mach-O格式。这可以帮助我们深入研究应用程序的底层代码、算法、逻辑以及数据结构。这在逆向工程、性能优化、安全性分析等方面都有着重要的应用。 在上一篇内容中我们已经介绍了Clutc…

Mybatis的动态SQL分页及特殊字符应用

目录 ​编辑 前言: 1.mybatis的分页 1.1分页的应用场景 1.2分页的使用方式 2.mybatis中特殊字符处理 2.1mybatis中特殊字符介绍 2.2mybatis中特殊字符的使用方式 前言: 上篇我已经写了Mybatis动态之灵活使用,接着上篇写mybatis的分页…

Redis下载与安装

文章目录 Redis简介下载,安装和配置(cmd)图形化工具 Redis 简介 下载,安装和配置(cmd) 开启redis服务 1.在解压出来的文件夹中打开cmd 2.输入 redis-server.exe redis.windows.conf即可开启服务 可以看到…

Android 之 AlarmManager (闹钟服务)

本节引言: 本节带来的Android中的AlarmManager(闹钟服务),听名字我们知道可以通过它开发手机闹钟类的APP, 而在文档中的解释是:在特定的时刻为我们广播一个指定的Intent,简单说就是我们自己定一个时间, 然后…

Pycharm通过SSH配置centos上Spark环境

直接在shell进行pyspark进行编程,程序没有办法写得太长,而且我们希望能够实现一个及时给出结果的编程环境,可以使用pycharm连接centos上的spark,进行本地编程,同步到centos系统中运行程序,并把结果返回pych…

go:正确引入自己编写的包(如何在 Go 中正确引入自己编写的包)

前言 目录如下: 具体教程 1. 工作空间(我的是根目录)新建 go.work 文件 文件内容如下: go 1.21.0use (./tuchuang./tuchuang/testm ) 2. 添加go.mod文件 1. 包文件夹下 进入testm目录执行 go mod init testModule 2. 引用目…

web3d调试

web里嘛,Spector当然是首选 这个有一点点问题 我希望看见某个shader的执行耗时,这个无法做到诶 然后我查API,发现有个api用来query timestamp,然后兴致冲冲的把他写好了 然后发现报错需要开谷歌的一个设置,而且要在谷歌…

无用小程序之——论如何将导出为txt格式的QQ聊天记录进行合并

众所周知,QQ的PC端向我们提供了导出聊天记录的功能,并且允许我们导出为可读的文本文档(txt)格式,就像这样: 然后导出之后就是这样的(不要怪我啥都看不见,这已经是我能提供的最多的信…

MFC 硬件序列号

获取cpu序列号函数 uint64_t CpuId() { //获取CPU序列号int cpuInfo[4] { 0 };__cpuid(cpuInfo, 1);uint64_t serialNumber 0;if ((cpuInfo[3] & (1 << 3)) ! 0) {serialNumber static_cast<std::uint64_t>(_getwch()) << 32;}return serialNumber; …