✔ ★Java项目——设计一个消息队列(四)【整合数据库和文件、内存数据结构设计】

news2024/11/25 2:57:18

设计一个消息队列

  • ⼋. 整合数据库和⽂件
        • 上述代码中, 使⽤数据库存储了 Exchange, Queue, Binding, 使⽤⽂本⽂件存储了 Message.
        • 接下来我们把两个部分整合起来, 统⼀进⾏管理.
    • 创建 DiskDataCenter
      • 封装 Exchange ⽅法
      • 封装 Queue ⽅法
      • 封装 Binding ⽅法
      • 封装 Message ⽅法
  • ⼩结
  • 九. 内存数据结构设计
    • 创建 MemoryDataCenter
    • 封装 Exchange ⽅法
    • 封装 Queue ⽅法
    • 封装 Binding ⽅法
    • 封装 Message ⽅法
    • 针对未确认的消息的处理
    • 实现重启后恢复内存
    • 测试 MemoryDataCenter

⼋. 整合数据库和⽂件

上述代码中, 使⽤数据库存储了 Exchange, Queue, Binding, 使⽤⽂本⽂件存储了 Message.
接下来我们把两个部分整合起来, 统⼀进⾏管理.

创建 DiskDataCenter

使⽤ DiskDataCenter 来综合管理数据库和⽂本⽂件的内容.
DiskDataCenter 会持有 DataBaseManager 和 MessageFileManager 对象
在这里插入图片描述
实现 initDir
在这里插入图片描述

封装 Exchange ⽅法

在这里插入图片描述

封装 Queue ⽅法

在这里插入图片描述
创建/删除队列的时候同时创建/删除队列⽬录.

封装 Binding ⽅法

在这里插入图片描述

封装 Message ⽅法

在这里插入图片描述
在 deleteMessage 的时候判定是否进⾏ GC

⼩结

通过上述封装, 把数据库和硬盘⽂件两部分合并成⼀个整体. 上层代码在调⽤的时候则不再关⼼该数据
是存储在哪个部分的.

九. 内存数据结构设计

硬盘上存储数据, 只是为了实现 “持久化” 这样的效果. 但是实际的消息存储/转发, 还是主要靠内存的结
构.
对于 MQ 来说, 内存部分是更关键的, 内存速度更快, 可以达成更⾼的并发

创建 MemoryDataCenter

创建 mqserver.datacenter.MemoryDataCenter
在这里插入图片描述
• 使⽤四个哈希表, 管理 Exchange, Queue, Binding, Message.
• 使⽤⼀个哈希表 + 链表管理 队列 -> 消息 之间的关系.
• 使⽤⼀个哈希表 + 哈希表管理所有的未被确认的消息.

为了保证消息被正确消费了, 会使⽤两种⽅式进⾏确认. ⾃动 ACK 和 ⼿动 ACK.
其中⾃动 ACK 是指当消息被消费之后, 就会⽴即被销毁释放.
其中⼿动 ACK 是指当消息被消费之后, 由消费者主动调⽤⼀个 basicAck ⽅法, 进⾏主动确认. 服务器
收到这个确认之后, 才能真正销毁消息.
此处的 “未确认消息” 就是指在⼿动 ACK 模式下, 该消息还没有被调⽤ basicAck. 此时消息不能删除,
但是要和其他未消费的消息区分开. 于是另搞了个结构.
当后续 basicAck 到了, 就可以删除消息了

封装 Exchange ⽅法

在这里插入图片描述

封装 Queue ⽅法

在这里插入图片描述

封装 Binding ⽅法

在这里插入图片描述

封装 Message ⽅法

在这里插入图片描述

针对未确认的消息的处理

在这里插入图片描述

实现重启后恢复内存

在这里插入图片描述

测试 MemoryDataCenter

创建 MemoryDataCenterTests

package com.example.mq;

import com.example.mq.common.MqException;
import com.example.mq.mqserver.core.*;
import com.example.mq.mqserver.datacenter.DiskDataCenter;
import com.example.mq.mqserver.datacenter.MemoryDataCenter;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

@SpringBootTest
public class MemoryDataCenterTests {
    private MemoryDataCenter memoryDataCenter = null;

    @BeforeEach
    public void setUp() {
        memoryDataCenter = new MemoryDataCenter();
    }

    @AfterEach
    public void tearDown() {
        memoryDataCenter = null;
    }

    // 创建一个测试交换机
    private Exchange createTestExchange(String exchangeName) {
        Exchange exchange = new Exchange();
        exchange.setName(exchangeName);
        exchange.setType(ExchangeType.DIRECT);
        exchange.setAutoDelete(false);
        exchange.setDurable(true);
        return exchange;
    }

    // 创建一个测试队列
    private MSGQueue createTestQueue(String queueName) {
        MSGQueue queue = new MSGQueue();
        queue.setName(queueName);
        queue.setDurable(true);
        queue.setExclusive(false);
        queue.setAutoDelete(false);
        return queue;
    }

    // 针对交换机进行测试
    @Test
    public void testExchange() {
        // 1. 先构造一个交换机并插入.
        Exchange expectedExchange = createTestExchange("testExchange");
        memoryDataCenter.insertExchange(expectedExchange);
        // 2. 查询出这个交换机, 比较结果是否一致. 此处直接比较这俩引用指向同一个对象.
        Exchange actualExchange = memoryDataCenter.getExchange("testExchange");
        Assertions.assertEquals(expectedExchange, actualExchange);
        // 3. 删除这个交换机
        memoryDataCenter.deleteExchange("testExchange");
        // 4. 再查一次, 看是否就查不到了
        actualExchange = memoryDataCenter.getExchange("testExchange");
        Assertions.assertNull(actualExchange);
    }

    // 针对队列进行测试
    @Test
    public void testQueue() {
        // 1. 构造一个队列, 并插入
        MSGQueue expectedQueue = createTestQueue("testQueue");
        memoryDataCenter.insertQueue(expectedQueue);
        // 2. 查询这个队列, 并比较
        MSGQueue actualQueue = memoryDataCenter.getQueue("testQueue");
        Assertions.assertEquals(expectedQueue, actualQueue);
        // 3. 删除这个队列
        memoryDataCenter.deleteQueue("testQueue");
        // 4. 再次查询队列, 看是否能查到
        actualQueue = memoryDataCenter.getQueue("testQueue");
        Assertions.assertNull(actualQueue);
    }

    // 针对绑定进行测试
    @Test
    public void testBinding() throws MqException {
        Binding expectedBinding = new Binding();
        expectedBinding.setExchangeName("testExchange");
        expectedBinding.setQueueName("testQueue");
        expectedBinding.setBindingKey("testBindingKey");
        memoryDataCenter.insertBinding(expectedBinding);

        Binding actualBinding = memoryDataCenter.getBinding("testExchange", "testQueue");
        Assertions.assertEquals(expectedBinding, actualBinding);

        ConcurrentHashMap<String, Binding> bindingMap = memoryDataCenter.getBindings("testExchange");
        Assertions.assertEquals(1, bindingMap.size());
        Assertions.assertEquals(expectedBinding, bindingMap.get("testQueue"));

        memoryDataCenter.deleteBinding(expectedBinding);

        actualBinding = memoryDataCenter.getBinding("testExchange", "testQueue");
        Assertions.assertNull(actualBinding);
    }

    private Message createTestMessage(String content) {
        Message message = Message.createMessageWithId("testRoutingKey", null, content.getBytes());
        return message;
    }

    @Test
    public void testMessage() {
        Message expectedMessage = createTestMessage("testMessage");
        memoryDataCenter.addMessage(expectedMessage);

        Message actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());
        Assertions.assertEquals(expectedMessage, actualMessage);

        memoryDataCenter.removeMessage(expectedMessage.getMessageId());

        actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());
        Assertions.assertNull(actualMessage);
    }

    @Test
    public void testSendMessage() {
        // 1. 创建一个队列, 创建 10 条消息, 把这些消息都插入队列中.
        MSGQueue queue = createTestQueue("testQueue");
        List<Message> expectedMessages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message message = createTestMessage("testMessage" + i);
            memoryDataCenter.sendMessage(queue, message);
            expectedMessages.add(message);
        }

        // 2. 从队列中取出这些消息.
        List<Message> actualMessages = new ArrayList<>();
        while (true) {
            Message message = memoryDataCenter.pollMessage("testQueue");
            if (message == null) {
                break;
            }
            actualMessages.add(message);
        }

        // 3. 比较取出的消息和之前的消息是否一致.
        Assertions.assertEquals(expectedMessages.size(), actualMessages.size());
        for (int i = 0; i < expectedMessages.size(); i++) {
            Assertions.assertEquals(expectedMessages.get(i), actualMessages.get(i));
        }
    }

    @Test
    public void testMessageWaitAck() {
        Message expectedMessage = createTestMessage("expectedMessage");
        memoryDataCenter.addMessageWaitAck("testQueue", expectedMessage);

        Message actualMessage = memoryDataCenter.getMessageWaitAck("testQueue", expectedMessage.getMessageId());
        Assertions.assertEquals(expectedMessage, actualMessage);

        memoryDataCenter.removeMessageWaitAck("testQueue", expectedMessage.getMessageId());
        actualMessage = memoryDataCenter.getMessageWaitAck("testQueue", expectedMessage.getMessageId());
        Assertions.assertNull(actualMessage);
    }

    @Test
    public void testRecovery() throws IOException, MqException, ClassNotFoundException {
        // 由于后续需要进行数据库操作, 依赖 MyBatis. 就需要先启动 SpringApplication, 这样才能进行后续的数据库操作.
        MqApplication.context = SpringApplication.run(MqApplication.class);

        // 1. 在硬盘上构造好数据
        DiskDataCenter diskDataCenter = new DiskDataCenter();
        diskDataCenter.init();

        // 构造交换机
        Exchange expectedExchange = createTestExchange("testExchange");
        diskDataCenter.insertExchange(expectedExchange);

        // 构造队列
        MSGQueue expectedQueue = createTestQueue("testQueue");
        diskDataCenter.insertQueue(expectedQueue);

        // 构造绑定
        Binding expectedBinding = new Binding();
        expectedBinding.setExchangeName("testExchange");
        expectedBinding.setQueueName("testQueue");
        expectedBinding.setBindingKey("testBindingKey");
        diskDataCenter.insertBinding(expectedBinding);

        // 构造消息
        Message expectedMessage = createTestMessage("testContent");
        diskDataCenter.sendMessage(expectedQueue, expectedMessage);

        // 2. 执行恢复操作
        memoryDataCenter.recovery(diskDataCenter);

        // 3. 对比结果
        Exchange actualExchange = memoryDataCenter.getExchange("testExchange");
        Assertions.assertEquals(expectedExchange.getName(), actualExchange.getName());
        Assertions.assertEquals(expectedExchange.getType(), actualExchange.getType());
        Assertions.assertEquals(expectedExchange.isDurable(), actualExchange.isDurable());
        Assertions.assertEquals(expectedExchange.isAutoDelete(), actualExchange.isAutoDelete());

        MSGQueue actualQueue = memoryDataCenter.getQueue("testQueue");
        Assertions.assertEquals(expectedQueue.getName(), actualQueue.getName());
        Assertions.assertEquals(expectedQueue.isDurable(), actualQueue.isDurable());
        Assertions.assertEquals(expectedQueue.isAutoDelete(), actualQueue.isAutoDelete());
        Assertions.assertEquals(expectedQueue.isExclusive(), actualQueue.isExclusive());

        Binding actualBinding = memoryDataCenter.getBinding("testExchange", "testQueue");
        Assertions.assertEquals(expectedBinding.getExchangeName(), actualBinding.getExchangeName());
        Assertions.assertEquals(expectedBinding.getQueueName(), actualBinding.getQueueName());
        Assertions.assertEquals(expectedBinding.getBindingKey(), actualBinding.getBindingKey());

        Message actualMessage = memoryDataCenter.pollMessage("testQueue");
        Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());
        Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
        Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
        Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());

        // 4. 清理硬盘的数据, 把整个 data 目录里的内容都删掉(包含了 meta.db 和 队列的目录).
        MqApplication.context.close();
        File dataDir = new File("./data");
        FileUtils.deleteDirectory(dataDir);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1607402.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【CAD建模号】学习笔记(三):图形绘制区1

图形绘制区介绍 CAD建模号的图形绘制区可以绘制我们所需要的各种3D模型&#xff0c;绘制的图形即为模型对象&#xff0c;包括线、面、体等。 1. 二维图形绘制组 二维图形是建模的基础&#xff0c;大多数复杂的模型都是基于二维图形制作出来的&#xff0c;掌握二维图形的绘制等…

视频教程下载:用ChatGPT玩转海外抖音TikTok

CHATGPT for TikTok是一门前沿课程&#xff0c;旨在帮助您充分发挥TikTok广告活动的全部潜力。随着数字营销的爆炸性增长&#xff0c;企业需要使用先进的工具来保持竞争优势。在这门课程中&#xff0c;您将学习如何利用CHATGPT——一种先进的人工智能工具——来创建与目标受众产…

FMEA-MSR:监视及系统响应的补充FMEA——SunFMEA软件

FMEA-MSR与功能安全 FMEA-MSR的研究对象是软件系统、电子系统或机电系统&#xff0c;这些系统中包括至少一个传感器、一个控制单元和一个执行器&#xff0c;或它们的一个子集。分析有关在客户操作条件下的潜在失效&#xff0c;及对系统和车辆的影响后果。该方法考虑的是系统或…

以pytorch pipeline并行为例,分析各kernel的耗时占比及性能瓶颈

以pytorch pipeline并行为例,分析各kernel的耗时占比及性能瓶颈 1.生成pipeline并行的测试代码2.pipeline profing3.生成nsys2json.py代码4.将nsys sqlite格式转chrome json格式5.生成耗时成分统计代码6.统计耗时成分7.耗时成分如下:8.查看GPU PCIE链路状态9.链路状态如下10.Ns…

为什么选择气膜建造室内球馆?

在当今社会&#xff0c;越来越多的人选择使用气膜来建造室内球馆。这一选择背后有着多重原因和优势。 灵活性和便捷性 气膜结构球馆具有极高的灵活性和便捷性。相较于传统的建筑方式&#xff0c;搭建气膜球馆所需的时间更短&#xff0c;而且审批流程更为简单。尽管气膜球馆被视…

OpenHarmony多媒体-mp4parser

简介 一个读取、写入操作音视频文件编辑的工具。 编译运行 1、通过IDE工具下载依赖SDK&#xff0c;Tools->SDK Manager->Openharmony SDK 把native选项勾上下载&#xff0c;API版本>10 2、开发板选择RK3568&#xff0c;ROM下载地址. 选择开发板类型是rk3568&#xf…

Linux读写文件

前言 学习了文件系统&#xff0c;就能理解为什么说Linux下一切皆文件。 语言层面的操作 在c语言的学习中我们可以使用fopen()函数对文件进行操作。 int main() {//FILE * fp fopen("./log.txt", "w");//FILE * fp fopen("./log.txt", "…

C++入门之类和对象(中)

C入门之类和对象(中) 文章目录 C入门之类和对象(中)1. 类的6个默认对象2. 构造函数2.1 概念2.2 特性2.3 补丁 3. 析构函数3.1 概念3.2 特性3.3 总结 4. 拷贝构造函数4.1 概念4.2 特性4.3 总结 1. 类的6个默认对象 如果一个类中什么都没有&#xff0c;那么这个类就是一个空类。…

【每日刷题】Day7

【每日刷题】Day7 &#x1f955;个人主页&#xff1a;开敲&#x1f349; &#x1f525;所属专栏&#xff1a;每日刷题&#x1f34d; &#x1f33c;文章目录&#x1f33c; 1. 206. 反转链表 - 力扣&#xff08;LeetCode&#xff09; 2. 203. 移除链表元素 - 力扣&#xff08;…

介绍与部署 Zabbix 监控系统

目录 前言 一、监控系统 1、主流的监控系统 2、监控系统功能 二、Zabbix 监控系统概述 1、Zabbix 概念 2、Zabbix 主要特点 3、Zabbix 主要功能 4、Zabbix 监控对象 5、Zabbix 主要程序 6、Zabbix 监控模式 7、Zabbix 运行机制 8、Zabbix 监控原理 9、Zabbix 主…

elementui单个输入框回车刷新整个页面

<!-- 搜索 --> <el-form :model"queryParams" ref"queryForm" :inline"true"><el-form-item label"名称" prop"nameLike"><el-input v-model"queryParams.nameLike" placeholder"请输入…

呼叫系统的技术实现原理和运作流程,ai智能系统,呼叫中心外呼软交换部署

呼叫系统的技术实现原理和运作流程可以涉及多个组成部分&#xff0c;包括硬件设备、软件系统和通信协议。以下是一般情况下呼叫系统的技术实现原理和运作流程的概述&#xff1a; 硬件设备&#xff1a; 服务器&#xff1a;用于承载呼叫系统的核心软件和数据库。电话交换机&#…

PyTorch|保存及加载模型、nn.Sequential、ModuleList和ModuleDict

系列文章目录 PyTorch|Dataset与DataLoader使用、构建自定义数据集 PyTorch|搭建分类网络实例、nn.Module源码学习 pytorch|autograd使用、训练模型 文章目录 系列文章目录一、保存及加载模型&#xff08;一&#xff09;保存及加载模型的权重&#xff08;二&#xff09;保存及…

前端 - 基础 表单标签 - label 标签

# label 标签 其实不属于 表单标签名单经常和 表单标签 搭配使用。 # <label> 标签 为 input 元素 定义 标注&#xff08; 标签 &#xff09; 使用场景 # 其实说白&#xff0c;<label> 标签就是为了方便用户体验的,举例说明 就是说&#xff0c;如上示&am…

软件需求开发和管理过程性指导文件

1. 目的 2. 适用范围 3. 参考文件 4. 术语和缩写 5. 需求获取的方式 5.1. 与用户交谈向用户提问题 5.1.1. 访谈重点注意事项 5.1.2. 访谈指南 5.2. 参观用户的工作流程 5.3. 向用户群体发调查问卷 5.4. 已有软件系统调研 5.5. 资料收集 5.6. 原型系统调研 5.6.1. …

【深度学习】yolov5目标检测学习与调试

2024.4.15 -2024.4.16 完结 0.准备&&补充知识点 yolo检测算法可以实现目标检测、分割和分类任务。 项目仓库地址&#xff1a;https://github.com/ultralytics/yolov5 跟练视频&#xff1a;目标检测 YOLOv5 开源代码项目调试与讲解实战 lux下载视频神器&#xff1a;h…

【氮化镓】栅极漏电对阈值电压和亚阈值摆幅影响建模

本文是一篇关于p-GaN门AlGaN/GaN高电子迁移率晶体管&#xff08;HEMTs&#xff09;的研究文章&#xff0c;发表于《应用物理杂志》&#xff08;J. Appl. Phys.&#xff09;2024年4月8日的期刊上。文章的标题为“Analysis and modeling of the influence of gate leakage curren…

从智能家居到智能城市:物联网中的隐私和安全风险

随着科技的不断进步&#xff0c;智能设备和物联网&#xff08;IoT&#xff09;技术已经逐渐渗透到我们的生活中。从智能家居设备到智能城市的实现&#xff0c;这些设备和技术可以让我们的生活变得更加便捷和高效。但是&#xff0c;这些设备也带来了不可忽视的隐私和安全风险。 …

Windows(Win10、Win11)本地部署开源大模型保姆级教程

目录 前言1.安装ollama2.安装大模型3.安装HyperV4.安装Docker5.安装聊天界面6.总结 点我去AIGIS公众号查看本文 本期教程用到的所有安装包已上传到百度网盘 链接&#xff1a;https://pan.baidu.com/s/1j281UcOF6gnOaumQP5XprA 提取码&#xff1a;wzw7 前言 最近开源大模型可谓闹…

内外网文件摆渡系统,如何贯通网络两侧被隔断的工作流?

随着业务范围不断扩大&#xff0c;产生的数据体量越来越多&#xff0c;企业会采取网络隔离&#xff0c;对核心数据进行保护。网络隔离主要目的是保护企业内部的敏感数据和系统不受外部网络攻击的风险&#xff0c;可以通过物理或逻辑方式实现&#xff0c;例如使用防火墙、网闸、…