消息队列项目(3)

news2025/1/11 23:58:39

Message 存储

由于 Message 以后会是一个比较庞大的数据, 放在数据库里并不合适,  因此我们要将它放在二进制文件

因为 Message 和 Queue 是捆绑在一起的, 因此我们将目录名设置成 QueueName, 然后这些目录是存储在之前的 data 里

就像这样:

在 testQueue 中有包含两个文件:

queue_data.txt 和 queue_stat.txt 

queue_data.txt 中包含的是消息具体内容

然后我们规定在消息具体内容中的数据存储方式:

 前面是消息长度, 后面接上消息的二进制数据, 这样才取出数据的时候也能计算出这些消息的 

offsetBeg 和 offsetEnd

queue_stat.txt 中包含的是总数据数和有效数据数

因为我们在进行文件中消息的删除时不可能直接进行删除, 我们要做的就是把消息给拿出来, 将属性isvalid 设为 false, 再放回去, 同时有效数据数 -1 这样就相当于消息已经被删除了

MessageFileManager

在 datacenter 中创建一个 MessageFileManager 的类

先写对文件和目录进行创建的方法:

    // 定义一个内部类, 来标识该队列的统计消息
    // 使用 static, 静态内部类
    static public class Stat{
        public int totalCount;
        public int validCount;
    }

    // 获得消息所在的路径, 里面有 queue_data.txt 和 queue_stat.txt
    private String getMessageDir(String queueName){
        return "./data/" + queueName;
    }

    // 获得 queue_data.txt 路径
    private String getMessageDataPath(String queueName){
        return getMessageDir(queueName) + "/queue_data.txt";
    }

    // 获得 queue_stat.txt 路径
    private String getMessageStatPath(String queueName){
        return getMessageDir(queueName) + "./queue_stat.txt";
    }

    private void writeStat(String queueName, Stat stat) throws IOException {
        File statFile = new File(getMessageStatPath(queueName));
        try(OutputStream outputStream = new FileOutputStream(statFile)){
            try(PrintWriter printWriter = new PrintWriter(outputStream)){
                printWriter.write(stat.totalCount+"\t"+stat.validCount);
                printWriter.flush();
            }
        }
    }

    // 创建对应的文件和目录
    public void createMessageFiles(String queueName) throws MqException, IOException {
        // 查看目录在不在, 不在就创建
        File baseDir = new File(getMessageDir(queueName));
        if(!baseDir.exists()){
            boolean ok = baseDir.mkdirs();
            if(!ok){
                throw new MqException("[MessageFileManager] 目录创建失败 queueName=" + queueName);
            }
        }
        // 创建 queue_data.txt
        File dataFile =  new File(getMessageDataPath(queueName));
        if(!dataFile.exists()){
            boolean ok = dataFile.createNewFile();
            if(!ok){
                throw new MqException("[MessageFileManager] queue_data.txt 创建失败 queueName=" + queueName);
            }
        }
        // 创建 queue_stat.txt
        File statFile = new File(getMessageStatPath(queueName));
        if(!dataFile.exists()){
            boolean ok = statFile.createNewFile();
            if(!ok){
                throw new MqException("[MessageFileManager] queue_stat.txt 创建失败 queueName=" + queueName);
            }
        }
        // 给queue_stat.txt 中插入初始值
        Stat stat = new Stat();
        stat.totalCount = 0;
        stat.validCount = 0;
        writeStat(queueName, stat);
    }

    // 销毁 queueName 对应的文件和目录
    public void destroyMessageFile(String queueName) throws MqException {
        // 先销毁 queue_data.txt 和 queue_stat.txt
        File dataFile = new File(getMessageDataPath(queueName));
        if(dataFile.exists()){
            boolean ok = dataFile.delete();
            if(!ok){
                throw new MqException("[MessageFileManager] queue_data.file销毁失败 queueName=" + queueName);
            }
        }

        File statFile = new File(getMessageStatPath(queueName));
        if(dataFile.delete()){
            boolean ok = statFile.delete();
            if(!ok){
                throw new MqException("[MessageFileManager] queue_stat.txt销毁失败 queueName=" + queueName);
            }
        }

        File baseDir = new File(getMessageDir(queueName));
        if(baseDir.exists()){
            boolean ok = baseDir.delete();
            if(!ok){
                throw new MqException("[MessageFileManager] 目录销毁失败 queueName=" + queueName);
            }
        }
    }

注: 这里的 MqException 是我们自己创建的 Exception

在 common 里进行创建 MqException:

往里面存数据:

因为要存二进制数据, 因此我们要再存之前要将 message 转换成 byte[] 然后存进去, 这就是序列化

    // 检查目录和文件是否存在
    public boolean checkFilesExits(String queueName) {
        // 判定队列的数据文件和统计文件是否都存在!!
        File queueDataFile = new File(getMessageDataPath(queueName));
        if (!queueDataFile.exists()) {
            return false;
        }
        File queueStatFile = new File(getMessageStatPath(queueName));
        if (!queueStatFile.exists()) {
            return false;
        }
        return true;
    }

    private Stat readStat(String queueName) throws IOException {
        // 由于当前消息统计文件是文本文件, 可以直接使用 Scanner 来读取文件内容
        Stat stat = new Stat();
        File statFile = new File(getMessageStatPath(queueName));
        try(InputStream inputStream = new FileInputStream(statFile)){
            try(Scanner scanner = new Scanner(inputStream)){
                stat.totalCount = scanner.nextInt();
                stat.validCount = scanner.nextInt();
            }
        }
        return stat;
    }

    public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
        // 检查当前要写的文件是否存在
        if(!checkFilesExits(queue.getName())){
            throw new MqException("[MessageFileManager] 要写入的文件不存在 queueName=" +queue.getName());
        }
        // 把 message 序列化
        byte[] messageBinary = BinaryTool.toBytes(message);
        synchronized (queue){
            // 先获取到当前队列数据文件的长度, 用来计算出该 Message 对象的 offsetBeg 和 offsetEnd
            // 把新的 Message 数据, 写入到队列数据文件的末尾. 此时 Message 对象的 offsetNeg, 就是当前文件长度 + 4
            // offsetEnd 就是当前文件长度 + 4 + message 自身长度
            File dataFile = new File(getMessageDataPath(queue.getName()));
            message.setOffsetBeg(dataFile.length()+4);
            message.setOffsetEnd(dataFile.length()+4+messageBinary.length);
            // message 写入
            try(OutputStream outputStream = new FileOutputStream(dataFile)){
                try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
                    // 写入长度
                    dataOutputStream.writeInt(messageBinary.length);
                    // 写入消息本体
                    dataOutputStream.write(messageBinary);
                }
            }
            // 改变 stat 里的值
            Stat stat = readStat(queue.getName());
            stat.totalCount++;
            stat.validCount++;
            writeStat(queue.getName(), stat);
        }

    }

 删除消息:

这里涉及到我们的反序列化:

// 通过将 message 拿出来, 然后将 iSValid 设成 null
    // 在写入进行
    public void deleteMessage(MSGQueue queue, Message message) throws MqException, IOException, ClassNotFoundException {
        if(!checkFilesExits(queue.getName())){
            throw new MqException("[MessageFileManager] 要删除消息的文件不存在 queueName=" +queue.getName());
        }
        File dataFile = new File(getMessageDataPath(queue.getName()));
        synchronized (queue){
            try(RandomAccessFile randomAccessFile = new RandomAccessFile(dataFile, "rw")){
                // 1. 先从文件中读取对应的 Message 数据, 这里会把 buffSrc 给装满
                byte[] bufferSrc = new byte[(int)(message.getOffsetEnd()- message.getOffsetBeg())];
                randomAccessFile.seek(message.getOffsetBeg());
                randomAccessFile.read(bufferSrc);
                // 2. 把当前读出来的二进制数据转换成 Message 对象
                Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);
                // 3. 把 isValid 设置成无效
                diskMessage.setIsValid((byte)0x0);
                // 4. 重新写入文件
                byte[] bufferDest = BinaryTool.toBytes(diskMessage);
                // 上面在读之后, 光标移到了下一个消息的位置
                // 因此先将光标移回来
                randomAccessFile.seek(message.getOffsetBeg());
                randomAccessFile.write(bufferDest);
            }
            // 更新统计文件
            Stat stat = readStat(queue.getName());
            if(stat.validCount > 0){
                stat.validCount--;
            }
            writeStat(queue.getName(), stat);
        }
    }

得到指定目录中的所有消息:

// 将所有消息内容加载到内存中
    // 这个方法准备在程序启动时调用
    // 使用 LinkedList, 主要目的是为了进行头删操作
    // 由于是在程序启动时进行调用, 此时服务器还不能处理请求, 因此不需要进行加锁
    public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, ClassNotFoundException, MqException {
        LinkedList<Message> messages = new LinkedList<>();
        try(InputStream inputStream = new FileInputStream(getMessageDataPath(queueName))){
            try(DataInputStream dataInputStream = new DataInputStream(inputStream)){
                // 记录当前文件光标
                long currentOffset = 0;
                // 循环读取消息
                while(true){
                    // 1. 读取当前消息的长度, 这里的 readInt 会读到文章末尾后
                    // 因此会抛出 EOFException 异常,
                    int messageSize = dataInputStream.readInt();
                    // 2. 按照长度读取到消息
                    byte[] buffer = new byte[messageSize];
                    int actualSize = dataInputStream.read(buffer);
                    if(messageSize != actualSize){
                        throw new MqException("[MessageFileManager] 文件格式错误! queueName=" + queueName);
                    }
                    // 3. 将读到的二进制数据反序列化会 Message对象
                    Message message = (Message) BinaryTool.fromBytes(buffer);
                    // 4. 判定一下看这个消息对象是不是无效对象
                    if(message.getIsValid() != 0x1){
                        // 虽然是无效内容, 但也要将 offset 更新
                        currentOffset += (4 + messageSize);
                        continue;
                    }
                    // 5. 有效数据则将其加入链表中, 加入前计算 offsetBeg 和 offsetEnd
                    message.setOffsetBeg(currentOffset + 4);
                    message.setOffsetBeg(currentOffset + 4 + messageSize);
                    currentOffset += (4 + messageSize);
                    messages.add(message);

                }
            } catch (EOFException e) {
                // 这里的 catch 是预料中的情况, 是正常的业务逻辑
                // 因此不需要去处理
                System.out.println("[MessageFileManager] 恢复 Message 数据完成 queueName=" + queueName);
            }
        }
        return messages;
    }

垃圾回收(gc)

这里的垃圾回收策略是: 当总消息数大于 2000 条时, 有效消息数小于一半时. 进行垃圾回收

// GC回收策略
    // 约定当消息大于 2000 条并且 有效消息 < 1/2 时 进行 gc
    public boolean checkGC(String queueName) throws IOException {
        // 判定是否要 GC
        Stat stat = readStat(queueName);
        if (stat.totalCount > 2000 && stat.totalCount / stat.validCount > 2) {
            return true;
        }
        return false;
    }

    private String getQueueDataNewPath(String queueName){
        return getMessageDir(queueName) + "/queue_data_new.txt";
    }

    // 使用复制算法进行gc
    // 创建新文件 queue_data_new.txt, 将消息放入
    // 删除 queue_data.txt
    // 将 queue_data_new.txt 改为 queue_data.txt
    public void gc(MSGQueue queue) throws IOException, MqException, ClassNotFoundException {
        synchronized (queue){
            // 计算一下gc消耗的时间
            long gcBeg = System.currentTimeMillis();

            // 1. 创建一个新文件
            File dataNewFile = new File(getQueueDataNewPath(queue.getName()));
            if(dataNewFile.exists()){
                // 如果存在说明之前 gc 到一半, 程序崩溃了
                throw new MqException("[MessageFIleManager] gc 时发现 queue_data_new.txt 存在! queueName=" + queue.getName());
            }
            boolean ok = dataNewFile.createNewFile();
            if(!ok){
                throw new MqException("[MessageFileManager] 创建 queue_data_new.txt 失败! DataNewFile=" + dataNewFile.getAbsolutePath());
            }

            // 2. 从旧的文件中, 读取出所有的有效消息对象(这个方法已经去除掉无效对象了)
            LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());

            // 3. 将 isValid = true 的消息写入新文件
            try(OutputStream outputStream = new FileOutputStream(dataNewFile)){
                try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
                    for(Message message : messages){
                        byte[] buffer = BinaryTool.toBytes(message);
                        // 先写消息长度
                        dataOutputStream.writeInt(buffer.length);
                        dataOutputStream.write(buffer);
                    }
                }
            }
            // 4. 删除旧的数据文件, 并且把新的文件重新命名
            File dataOldFile = new File(getMessageDataPath(queue.getName()));
            ok = dataOldFile.delete();
            if(!ok){
                throw new MqException("[MessageFileManager] 删除旧的数据文件失败! dataOldFile=" + dataOldFile.getAbsolutePath());
            }
            // 把 queue_data_new.txt -> queue_data.txt
            ok = dataNewFile.renameTo(dataOldFile);
            if(!ok){
                throw new MqException("[MessageFileManager] 文件重命名失败! dataNameFile=" + dataNewFile.getAbsolutePath());
            }

            // 更新统计文件
            Stat stat = readStat(queue.getName());
            stat.totalCount = messages.size();
            stat.validCount = messages.size();
            writeStat(queue.getName(), stat);
            System.out.println("[MessageFileManager] 垃圾回收完成 queueName=" + queue.getName());
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/839180.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LeetCode 0021. 合并两个有序链表

【LetMeFly】21.合并两个有序链表 力扣题目链接&#xff1a;https://leetcode.cn/problems/merge-two-sorted-lists/ 将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 示例 1&#xff1a; 输入&#xff1a;l1 [1,2,4], l…

-bash: fork: retry: Resource temporarily unavailable 问题解决

错误提示&#xff1a; -bash: fork: retry: Resource temporarily unavailable 错误分析&#xff1a;之前已经出现过这种资源限制的报错提醒&#xff0c;然后整个系统可用的连接数就已经用完了&#xff0c;无法使用工具来获取系统信息&#xff0c;所以将运行的任务脚本kill后开…

Cesium引入vite + vue3

下载Cesium yarn add cesium下载cesium-vite 插件 yarn add vite-plugin-cesium使用 vite.config.js import { defineConfig } from vite import vue from vitejs/plugin-vue import WindiCSS from vite-plugin-windicss import cesium from vite-plugin-cesium; //引入插件…

Stephen Wolfram:嵌入的概念

The Concept of Embeddings 嵌入的概念 Neural nets—at least as they’re currently set up—are fundamentally based on numbers. So if we’re going to to use them to work on something like text we’ll need a way to represent our text with numbers. And certain…

如何制作网红小和尚视频

文章目录 写在前面需要用到的工具制作过程制作一张自己喜欢的底图制作爆款文案将文案转为语音使用底图和语音生成动画去除水印使用剪映生成视频 写在后面 写在前面 最近我尝试制作小和尚的视频&#xff0c;使用了一些AI技术&#xff0c;制作成本相对较低。这次经历让我感受到了…

工厂模式(C++)

定义 定义一个用于创建对象的接口&#xff0c;让子类决定实例化哪一个类。Factory Method使得一个类的实例化延迟(目的:解耦&#xff0c;手段:虚函数)到子类。 应用场景 在软件系统中&#xff0c;经常面临着创建对象的工作;由于需求的变化&#xff0c;需要创建的对象的具体类…

c语言——计算一串字符的长度

//计算一串字符的长度 //在main函数中输出一个字符&#xff0c;并且计算出该字符的长度。 #include<stdio.h> #include<stdlib.h> int length(char *s){int i0;while(*s!\0){i;s;}return i;} int main() {int len;char str[20];printf("输入字符串&#xff1a…

215. 数组中的第K个最大元素(快排+大根堆+小根堆)

题目链接&#xff1a;力扣 解题思路&#xff1a; 方法一&#xff1a;基于快速排序 因为题目中只需要找到第k大的元素&#xff0c;而快速排序中&#xff0c;每一趟排序都可以确定一个最终元素的位置。 当使用快速排序对数组进行降序排序时&#xff0c;那么如果有一趟排序过程…

大数据课程G2——Hbase的基本架构

文章作者邮箱&#xff1a;yugongshiyesina.cn 地址&#xff1a;广东惠州 ▲ 本章节目的 ⚪ 掌握Hbase的基本架构&#xff1b; ⚪ 掌握Hbase的读写流程&#xff1b; ⚪ 掌握Hbase的设计与优化&#xff1b; 一、基本架构 1. HRegion 1. 在HBase中&#xff0c;会…

林大数据结构【2019】

关键字&#xff1a; 哈夫曼树权值最小、哈夫曼编码、邻接矩阵时间复杂度、二叉树后序遍历、二叉排序树最差时间复杂度、非连通无向图顶点数&#xff08;完全图&#xff09;、带双亲的孩子链表、平衡二叉树调整、AOE网关键路径 一、判断 二、单选 三、填空 四、应用题

【C++】右值引用

文章目录 右值引用值得形式返回对象的缺陷移动语句移动赋值 右值引用 能够取地址、能够被修改的被称之为左值。 不能够取地址、不能够被修改、以及将亡值被称之为右值。 普通类型的变量&#xff0c;因为有名字&#xff0c;可以取地址&#xff0c;都认为是左值。const修饰的常量…

MyBatis查询数据库(4)

前言&#x1f36d; ❤️❤️❤️SSM专栏更新中&#xff0c;各位大佬觉得写得不错&#xff0c;支持一下&#xff0c;感谢了&#xff01;❤️❤️❤️ Spring Spring MVC MyBatis_冷兮雪的博客-CSDN博客 终于到了MyBatis最后一篇&#xff0c;这篇讲的是动态SQL的使用。 复杂情…

docker配置远程连接端口

配置docker 配置远程连接端口 vi /lib/systemd/system/docker.servicesystemctl daemon-reload && systemctl restart docker firewall-cmd --zonepublic --add-port2375/tcp --permanenthttp://node2:2375/version

【类和对象】基础知识

目录 一、类的定义 定义方式一&#xff1a;定义与声明都在类中 定义方式二&#xff1a;定义与声明分离 二、类的实例化&&类对象存储方式 类的实例化 类对象存储模式 三、this指针 一、类的定义 定义方式一&#xff1a;定义与声明都在类中 #include<iostream&…

深入学习 Redis - 谈谈你对 Redis 的 RDB、AOF、混合持久化的了解吧?

目录 一、Redis 是怎么存储数据的&#xff1f; 二、Redis 具体是按照什么样的策略来实现持久化的&#xff1f; 2.1、RDB&#xff08;Redis Database&#xff09; 2.1.1、触发机制 2.1.2、bgsave 命令处理流程 2.1.3、RDB 文件的处理 2.1.4、演示效果 1&#xff09;手动执…

GEE:谐波模型在遥感影像中的应用(季节性变化的拟合与可视化)

作者:CSDN @ _养乐多_ 谐波模型是一种常用的工具,用于拟合和分析影像数据中的周期性和季节性变化。本文将介绍如何使用Google Earth Engine平台实现谐波模型,通过对Landsat影像进行处理和拟合,展示季节性变化的拟合结果,并通过图表和地图可视化展示数据。 谐波模型是一种…

《Java-SE-第二十九章》之Synchronized原理与JUC常用类

前言 在你立足处深挖下去,就会有泉水涌出!别管蒙昧者们叫嚷:“下边永远是地狱!” 博客主页&#xff1a;KC老衲爱尼姑的博客主页 博主的github&#xff0c;平常所写代码皆在于此 共勉&#xff1a;talk is cheap, show me the code 作者是爪哇岛的新手&#xff0c;水平很有限&…

OpenAI 已为 GPT-5 申请商标,GPT-4 发布不到半年,GPT-5 就要来了吗?

据美国专利商标局&#xff08;USPTO&#xff09;信息显示&#xff0c;OpenAI已经在7月18日申请注册了“GPT-5”商标。 在这份新商标申请中&#xff0c;OpenAI将“GPT-5”描述为一种“用于使用语言模型的可下载计算机软件”。 继GPT-4发布之后&#xff0c;它预计将成为OpenAI下一…

【硬件设计】模拟电子基础三--放大电路

模拟电子基础三--放大电路 一、集成运算放大器1.1 定义、组成与性能1.2 电流源电路1.3 差动放大电路1.4 理想运算放大器 二、集成运算放大器的应用2.1 反向比例运算电路2.2 同向比例运算电路2.3 反向加法运算电路2.4 反向减法运算电路2.5 积分运算电路2.6 微分运算电路2.7电压比…

备战秋招 | 笔试强训23

目录 一、选择题 二、编程题 三、选择题题解 四、编程题题解 一、选择题 1、2 —3—6—7—8—14—15—30&#xff0c;下面的数字哪一个是不属于这组数字的系列? A. 3 B. 7 C. 8 D. 15 2、下列关于线性链表的叙述中&#xff0c;正确的是&#xff08; &#xff09; A. 各数…