消息队列之 - 消息持久化设计

news2024/10/5 13:47:56

目录

  • 前言
  • 设计思想
    • 消息文件的格式
    • 垃圾回收的思想
    • 其他情况
    • 序列化和反序列化
  • 具体代码实现
    • 测试消息文件

前言

我们之前说过, 消息队列不管要往硬盘中存储, 还要往内存中存储, 并且是以内存为主 ,硬盘为辅, 接下来, 就将消息队列如何存储到硬盘 做一个设计

设计思想

我们往硬盘中存放的消息, 只是简单的 存 和 取 还有删除 , 并不涉及 复杂的增删改查, 而且消息的数量可能会非常多, 往数据库中存放效率不高

我们之前说过, 消息 是 依附于队列来存在的, 所以存储的时候 就要按照队列的分类来存储 ,在一个虚拟主机中, 会有多个队列 , 每个队列下面, 放着一个文件, 文件中存储我们的消息, 那一个文件不可能只存储一个消息, 而将多个消息存储到一个文件中, 我们需要知道每个消息 的开头 与结尾, 这里我们每个队列下面 存放 俩个文件, 一个文件是单纯的消息文件 , 一个文件是用来存储 这个消息文件中的消息的总数目, 和有效文件的数目, 方便我们后续进行垃圾回收算法的设计

消息文件的格式

我们对存储多个消息的文件 , 进行消息格式的设计 ,我们约定 每个消息的前4个字节用来存放, 这个消息的长度 , 然后跟上 消息本体, 依此内推 , 如图
在这里插入图片描述

对于我们的Broker server 服务器来说, 消息是需要新增 和删除的, 因此 在文件中采取逻辑删除的形式来删除一个消息

垃圾回收的思想

对于一个文件中无效消息过多的情况 , 我们采取垃圾回收 来 清楚这些无效消息, 我们约定 ,加入一个文件中的总消息数量大于2000 , 且无效消息 >50% ,时我们进行垃圾回收, 垃圾回收具体实现为: 我们采取复制算法, 也就是 假如一个文件触发了垃圾回收, 我们就将这个文件中的有效消息, 挪到一个新文件中, 在将旧文件删除掉 , 并将这个新文件的名称改回旧文件的名称

其他情况

我们上面说了 , 垃圾消息过多的情况, 还有一种情况是 ,假如一个文件全部都是有效消息, 并且还不断的持续添加中, 这种状况势必会让文件越来越大, 当文件过大的时候, 我们想要查找消息 就显得有点困难, 因此我们势必要针对这种情况作出处理, 我们约定: 当一个文件过大的时候 ,我们就将它一份为2 , 分成俩个具体的小文件, 同理当一个文件垃圾回收后过小的时候, 我们就将这俩个小文件合并成一个文件

具体思路: 用一个队列来存放每个文件的大小是多少,消息数目的多少, 无效消息是多少,

序列化和反序列化

因为我们存储的是二进制文件 , 所以 势必要序列化和反序列化 消息的, 序列化常用的方式 有 JSON 格式, 但是由于 JSON格式序列化之后产生的是文本文件, 这里我们想要存储的是二进制文件 ,所以我们采用二进制化的序列化方式 , 使用JAVA标准库提供的 ObjectInputStream 和 ObjectOutputStream 的方式来序列化 和反序列化

具体代码实现

该类设计方法
在这里插入图片描述

package com.example.demo.mqServer.dataCenter;

import com.example.demo.Common.BinaryTool;
import com.example.demo.Common.MqException;
import com.example.demo.mqServer.core.MSGQueue;
import com.example.demo.mqServer.core.Message;

import java.io.*;
import java.util.LinkedList;
import java.util.Scanner;

//通过这个类, 来针对硬盘上的消息进行管理
public class MessageFileManager {
    // 定义一个内部类, 来表示该队列的统计信息
    // 有限考虑使用 static, 静态内部类.
    static public class Stat {
        // 此处直接定义成 public, 就不再搞 get set 方法了.
        // 对于这样的简单的类, 就直接使用成员, 类似于 C 的结构体了.
        public int totalCount;  // 总消息数量
        public int validCount;  // 有效消息数量
    }
    // 预定消息文件所在的目录和文件名
    // 这个方法, 用来获取到指定队列对应的消息文件所在路径
    private String getQueueDir(String queueName){
        return "./data/"+queueName;
    }

    // 这个方法用来获取该队列的消息数据文件路径
    private String getQueueDataPath(String queueName){
        return getQueueDir(queueName)+"/queue_data.txt";
    }
    // 这个方法用来获取该队列的消息统计文件路径
    private String getQueueStatPath(String queueName){
        return getQueueDir(queueName)+"/queue_stat.txt";
    }
    public void init(){
        // 暂时不需要单独初始化方法, 只是列出来
    }
    //    读写 消息统计文件
    private Stat readStat(String queueName){
        Stat stat = new Stat();
        try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))){
            Scanner scanner = new Scanner(inputStream);
            stat.totalCount = scanner.nextInt();
            stat.validCount = scanner.nextInt();
            return stat;

        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    private void writeStat(String queueName , Stat stat){
        // 使用 PrintWrite 来写文件.
        // OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.
        try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))){
            PrintWriter printWriter = new PrintWriter(outputStream);
            printWriter.write(stat.totalCount +"\t"+stat.validCount);
            printWriter.flush();
        }catch (IOException e) {
            e.printStackTrace();
        }
    }

//    创建文件目录与相对应的文件
    public void createQueueFiles(String queueName) throws IOException {
        File baseDir = new File(getQueueDir(queueName));
        if (!baseDir.exists()){
            // 如果不存在就去创建它
            boolean ok = baseDir.mkdirs();
            if (!ok){
                throw new IOException("创建目录失败! baseDir=" + baseDir.getAbsolutePath());
            }
        }
        // 2. 创建队列数据文件
        File queueDataFile = new File(getQueueDataPath(queueName));
        if (!queueDataFile.exists()) {
            boolean ok = queueDataFile.createNewFile();
            if (!ok) {
                throw new IOException("创建文件失败! queueDataFile=" + queueDataFile.getAbsolutePath());
            }
        }
        // 3. 创建消息统计文件
        File queueStatFile = new File(getQueueStatPath(queueName));
        if (!queueStatFile.exists()) {
            boolean ok = queueStatFile.createNewFile();
            if (!ok) {
                throw new IOException("创建文件失败! queueStatFile=" + queueStatFile.getAbsolutePath());
            }
        }
        // 4. 给消息统计文件设置初始值
        Stat stat = new Stat();
        stat.validCount = 0;
        stat.totalCount =0;
        writeStat(queueName,stat);
    }

    // 删除队列的目录和文件.
    // 队列也是可以被删除的. 当队列删除之后, 对应的消息文件啥的, 自然也要随之删除.
    public void destroyQueueFiles(String queueName) throws IOException {
        //先删除里面的文件, 在删除目录
        File  queueDataFile = new File(getQueueDataPath(queueName));
        boolean isQueueData = queueDataFile.delete();
        File queueStatFile = new File(getQueueStatPath(queueName));
        boolean isQueueStat = queueStatFile.delete();
        File queueDir = new File(getQueueDir(queueName));
        boolean isQueueDir = queueDir.delete();
        if (!isQueueData || !isQueueStat || !isQueueDir){
            // 只要有一个没有删除成功, 说明整体删除失败,
            throw new IOException("删除队列目录和文件失败! baseDir=" + queueDir.getAbsolutePath());
        }
    }

    // 检查队列的目录 与 文件是否存在
    // 比如后续有生产者给 broker server 生产消息了, 这个消息就可能需要记录到文件上(取决于消息是否要持久化)
    public boolean checkFilesExits(String queueName) {
        // 判定队列的数据文件和统计文件是否都存在!!
        File queueDataFile = new File(getQueueDataPath(queueName));
        if (!queueDataFile.exists()) {
            return false;
        }
        File queueStatFile = new File(getQueueStatPath(queueName));
        if (!queueStatFile.exists()) {
            return false;
        }
        return true;
    }



    // 这个方法用来把一个新的消息, 放到队列对应的文件中.
    // queue 表示要把消息写入的队列. message 则是要写的消息.
    public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
        // 先检查队列是否合法
        if (!checkFilesExits(queue.getName())){
            throw new MqException("[MessageFileManager] 队列对应的文件不存在! queueName=" + queue.getName());
        }
        // 2. 把 Message 对象, 进行序列化, 转成二进制的字节数组.
        byte[] messageBinary = BinaryTool.toBytes(message);
        synchronized (queue){
            // 3. 先获取到当前的队列数据文件的长度, 用这个来计算出该 Message 对象的 offsetBeg 和 offsetEnd
            // 把新的 Message 数据, 写入到队列数据文件的末尾. 此时 Message 对象的 offsetBeg , 就是当前文件长度 + 4
            // offsetEnd 就是当前文件长度 + 4 + message 自身长度.
            File queueDataFile = new File(getQueueDataPath(queue.getName()));
            // 设置 beg 和 end 值
            message.setOffsetBeg(queueDataFile.length()+4);
            message.setOffsetEnd(queueDataFile.length()+4+messageBinary.length);
            // 4. 写入消息到数据文件, 注意, 是追加写入到数据文件末尾.
            try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {
                // DataOutputStream 封装了 将int 一个个字节写入 方法
                try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                    // 接下来要先写当前消息的长度, 占据 4 个字节的~~
                    dataOutputStream.writeInt(messageBinary.length);
                    // 写入消息本体
                    dataOutputStream.write(messageBinary);
                }
            }
            // 5. 更新消息的统计文件
            Stat stat = readStat(queue.getName());
            stat.totalCount+=1;
            stat.validCount+=1;
            writeStat(queue.getName(),stat);
        }
    }

    // 这个是删除消息的方法.
    // 这里的删除是逻辑删除, 也就是把硬盘上存储的这个数据里面的那个 isValid 属性, 设置成 0
    // 1. 先把文件中的这一段数据, 读出来, 还原回 Message 对象;
    // 2. 把 isValid 改成 0;
    // 3. 把上述数据重新写回到文件.
    // 此处这个参数中的 message 对象, 必须得包含有效的 offsetBeg 和 offsetEnd
    public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {
        synchronized (queue) {
            try (RandomAccessFile accessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")) {
                accessFile.seek(message.getOffsetBeg());
                // 1. 先从文件中读取对应的 Message 数据.
                byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];
                accessFile.read(bufferSrc);
                // 2. 把当前读出来的二进制数据, 转换回成 Message 对象
                Message message1 = (Message) BinaryTool.toObject(bufferSrc);
                // 3. 把 isValid 设置为无效.
                message1.setIsValid((byte) 0x0);
                // 此处不需要给参数的这个 message 的 isValid 设为 0, 因为这个参数代表的是内存中管理的 Message 对象
                // 而这个对象马上也要被从内存中销毁了.
                // 4. 重新写入文件
                byte[] bufferDest = BinaryTool.toBytes(message1);
                // 虽然上面已经 seek 过了, 但是上面 seek 完了之后, 进行了读操作, 这一读, 就导致, 文件光标往后移动, 移动到
                // 下一个消息的位置了. 因此要想让接下来的写入, 能够刚好写回到之前的位置, 就需要重新调整文件光标.
                accessFile.seek(message.getOffsetBeg());
                accessFile.write(bufferDest);
                // 通过上述这通折腾, 对于文件来说, 只是有一个字节发生改变而已了~~
            }
            // 不要忘了, 更新统计文件!! 把一个消息设为无效了, 此时有效消息个数就需要 - 1
            Stat stat = readStat(queue.getName());
            if (stat.validCount > 0) {
                stat.validCount -= 1;
            }
            writeStat(queue.getName(), stat);
        }
    }

    // 使用这个方法, 从文件中, 读取出所有的消息内容, 加载到内存中(具体来说是放到一个链表里)
    // 这个方法, 准备在程序启动的时候, 进行调用.
    // 这里使用一个 LinkedList, 主要目的是为了后续进行头删操作.
    // 这个方法的参数, 只是一个 queueName 而不是 MSGQueue 对象. 因为这个方法不需要加锁, 只使用 queueName 就够了.
    // 由于该方法是在程序启动时调用, 此时服务器还不能处理请求呢~~ 不涉及多线程操作文件.
    public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
        LinkedList<Message> messages = new LinkedList<>();
        try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))){
            try (DataInputStream dataInputStream = new DataInputStream(inputStream)){
                // 这个变量记录当前文件光标.
                long currentOffset = 0;
                // 一个文件中包含了很多消息, 此处势必要循环读取.
                while (true){
                    // 1. 读取当前消息的长度, 这里的 readInt 可能会读到文件的末尾(EOF)
                    //    readInt 方法, 读到文件末尾, 会抛出 EOFException 异常. 这一点和之前的很多流对象不太一样.
                    int messageSize = dataInputStream.readInt();
                    // 2 将消息按照这个长度读取到数组中
                    byte[] buffer =  new byte[messageSize];
                    int actualSize = dataInputStream.read(buffer);
                    if (messageSize != actualSize) {
                        // 如果不匹配, 说明文件有问题, 格式错乱了!!
                        throw new MqException("[MessageFileManager] 文件格式错误! queueName=" + queueName);
                    }
                    // 3 将读到的数据 反序列化 成 message 对象
                    Message message =(Message) BinaryTool.toObject(buffer);
                    // 4 判断当前对象是否 有效
                    if (message.getIsValid() != 0x1){
                        // 说明是无效数据, 直接跳过
                        // 虽然消息是无效数据, 但是 offset 不要忘记更新.
                        currentOffset += (4 + messageSize);
                        continue;
                    }
                    // 5. 有效数据, 则需要把这个 Message 对象加入到链表中. 加入之前还需要填写 offsetBeg 和 offsetEnd
                    //    进行计算 offset 的时候, 需要知道当前文件光标的位置的. 由于当下使用的 DataInputStream 并不方便直接获取到文件光标位置
                    //    因此就需要手动计算下文件光标.
                    message.setOffsetBeg(currentOffset + 4);
                    message.setOffsetEnd(currentOffset + 4 + messageSize);
                    currentOffset += (4 + messageSize);
                    messages.add(message);
                }
            }catch (EOFException e) {
                // 这个 catch 并非真是处理 "异常", 而是处理 "正常" 的业务逻辑. 文件读到末尾, 会被 readInt 抛出该异常.
                // 这个 catch 语句中也不需要做啥特殊的事情
                System.out.println("[MessageFileManager] 恢复 Message 数据完成!");
            }
        }
        return messages;
    }

    // 检查当前是否要针对该队列的消息数据文件进行 GC
    public boolean checkGC(String queueName) {
        // 判断是否要GC , 是根据总消息总数 和 有效 消息总数, 这俩个值都是在消息统计文件中的
        Stat stat = readStat(queueName);
        if (stat.totalCount > 2000 && (double)stat.totalCount / (double) stat.validCount <0.5){
            return true;
        }
        return false;
    }
    private String getQueueDataNewPath(String queueName) {
        return getQueueDir(queueName) + "/queue_data_new.txt";
    }
    // 通过这个方法, 真正执行消息数据文件的垃圾回收操作.
    // 使用复制算法来完成.
    // 创建一个新的文件, 名字就是 queue_data_new.txt
    // 把之前消息数据文件中的有效消息都读出来, 写到新的文件中.
    // 删除旧的文件, 再把新的文件改名回 queue_data.txt
    // 同时要记得更新消息统计文件.
    public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {
        // 进行 gc 的时候, 是针对消息数据文件进行大洗牌. 在这个过程中, 其他线程不能针对该队列的消息文件做任何修改.
        synchronized (queue){
            // 由于 gc 操作可能比较耗时, 此处统计一下执行消耗的时间.
            long gcBeg = System.currentTimeMillis();
            // 1. 创建一个新的文件
            File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));
            if (queueDataNewFile.exists()) {
                // 正常情况下, 这个文件不应该存在. 如果存在, 就是意外~~ 说明上次 gc 了一半, 程序意外崩溃了.
                throw new MqException("[MessageFileManager] gc 时发现该队列的 queue_data_new 已经存在! queueName=" + queue.getName());
            }
            // 如果没有新文件, 就去创建新文件
            boolean ok = queueDataNewFile.createNewFile();
            if (!ok) {
                throw new MqException("[MessageFileManager] 创建文件失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath());
            }
            //2 . 从旧的文件中读取所有有效对象 , 直接调用
            LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());
            //3 . 将有效对象 写入到新文件中

            try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)) {
                try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                    for (Message message : messages) {
                        byte[] buffer = BinaryTool.toBytes(message);
                        // 先写四个字节消息的长度
                        dataOutputStream.writeInt(buffer.length);
                        dataOutputStream.write(buffer);
                    }
                }
            }

            // 4. 删除旧的数据文件, 并且把新的文件进行重命名
            File queueDataOldFile = new File(getQueueDataPath(queue.getName()));
            ok = queueDataOldFile.delete();
            if (!ok) {
                throw new MqException("[MessageFileManager] 删除旧的数据文件失败! queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
            }
            // 把 queue_data_new.txt => queue_data.txt
            ok = queueDataNewFile.renameTo(queueDataOldFile);
            if (!ok) {
                throw new MqException("[MessageFileManager] 文件重命名失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath()
                        + ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
            }
            // 5. 更新统计文件
            Stat stat = readStat(queue.getName());
            stat.totalCount = messages.size();
            stat.validCount = messages.size();
            writeStat(queue.getName(), stat);

            long gcEnd = System.currentTimeMillis();
            System.out.println("[MessageFileManager] gc 执行完毕! queueName=" + queue.getName() + ", time="
                    + (gcEnd - gcBeg) + "ms");


        }
    }
}

测试消息文件

package com.example.demo.mqServer.dataCenter;

import com.example.demo.Common.MqException;
import com.example.demo.mqServer.core.MSGQueue;
import com.example.demo.mqServer.core.Message;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.util.ReflectionTestUtils;

import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

import static org.junit.jupiter.api.Assertions.*;

@SpringBootTest
class MessageFileManagerTest {

    private MessageFileManager messageFileManager = new MessageFileManager();

    private static final String queueName1 = "testQueue1";
    private static final String queueName2 = "testQueue2";

    // 这个方法是每个用例执行之前的准备工作
    @BeforeEach
    public void setUp() throws IOException {
        // 准备阶段, 创建出两个队列, 以备后用
        messageFileManager.createQueueFiles(queueName1);
        messageFileManager.createQueueFiles(queueName2);
    }

    // 这个方法就是每个用例执行完毕之后的收尾工作
    @AfterEach
    public void tearDown() throws IOException {
        // 收尾阶段, 就把刚才的队列给干掉.
        messageFileManager.destroyQueueFiles(queueName1);
        messageFileManager.destroyQueueFiles(queueName2);
    }

    @Test
    public void testCreateFiles() {
        // 创建队列文件已经在上面 setUp 阶段执行过了. 此处主要是验证看看文件是否存在.
        File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");
        Assertions.assertEquals(true, queueDataFile1.isFile());
        File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");
        Assertions.assertEquals(true, queueStatFile1.isFile());

        File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");
        Assertions.assertEquals(true, queueDataFile2.isFile());
        File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");
        Assertions.assertEquals(true, queueStatFile2.isFile());
    }
    @Test
    public void testReadWriteStat() {
        MessageFileManager.Stat stat = new MessageFileManager.Stat();
        stat.totalCount = 100;
        stat.validCount = 50;
        // 此处就需要使用反射的方式, 来调用 writeStat 和 readStat 了.
        // Java 原生的反射 API 其实非常难用~~
        // 此处使用 Spring 帮我们封装好的 反射 的工具类.
        ReflectionTestUtils.invokeMethod(messageFileManager, "writeStat", queueName1, stat);

        // 写入完毕之后, 再调用一下读取, 验证读取的结果和写入的数据是一致的.
        MessageFileManager.Stat newStat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);
        Assertions.assertEquals(100, newStat.totalCount);
        Assertions.assertEquals(50, newStat.validCount);
        System.out.println("测试 readStat 和 writeStat 完成!");
    }

    private MSGQueue createTestQueue(String queueName) {
        MSGQueue queue = new MSGQueue();
        queue.setName(queueName);
        queue.setDurable(true);
        queue.setAutoDelete(false);
        queue.setExclusive(false);
        return queue;
    }

    private Message createTestMessage(String content) {
        Message message = Message.createMessageWithId("testRoutingKey", null, content.getBytes());
        return message;
    }

    @Test
    public void testSendMessage() throws IOException, MqException, ClassNotFoundException {
        // 构造出消息, 并且构造出队列.
        Message message = createTestMessage("testMessage");
        // 此处创建的 queue 对象的 name, 不能随便写, 只能用 queueName1 和 queueName2. 需要保证这个队列对象
        // 对应的目录和文件啥的都存在才行.
        MSGQueue queue = createTestQueue(queueName1);

        // 调用发送消息方法
        messageFileManager.sendMessage(queue, message);

        // 检查 stat 文件.
        MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);
        Assertions.assertEquals(1, stat.totalCount);
        Assertions.assertEquals(1, stat.validCount);

        // 检查 data 文件 对比 插入获取到的消息与 准备插入的消息是否一致
        LinkedList<Message> messages = messageFileManager.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(1, messages.size());
        Message curMessage = messages.get(0);
        Assertions.assertEquals(message.getMessageId(), curMessage.getMessageId());
        Assertions.assertEquals(message.getRoutingKey(), curMessage.getRoutingKey());
        Assertions.assertEquals(message.getDeliverMode(), curMessage.getDeliverMode());
        // 比较两个字节数组的内容是否相同, 不能直接使用 assertEquals 了.
        Assertions.assertArrayEquals(message.getBody(), curMessage.getBody());

        System.out.println("message: " + curMessage);
    }


    @Test
    public void testLoadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {
        // 往队列中插入 100 条消息, 然后验证看看这 100 条消息从文件中读取之后, 是否和最初是一致的.
        MSGQueue queue = createTestQueue(queueName1);
        List<Message> expectedMessages = new LinkedList<>();
        for (int i = 0; i < 100; i++) {
            Message message = createTestMessage("testMessage" + i);
            messageFileManager.sendMessage(queue, message);
            expectedMessages.add(message);
        }

        // 读取所有消息
        LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(expectedMessages.size(), actualMessages.size());
        for (int i = 0; i < expectedMessages.size(); i++) {
            Message expectedMessage = expectedMessages.get(i);
            Message actualMessage = actualMessages.get(i);
            System.out.println("[" + i + "] actualMessage=" + actualMessage);

            Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());
            Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
            Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
            Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());
            Assertions.assertEquals(0x1, actualMessage.getIsValid());
        }
    }

    @Test
    public void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {
        // 创建队列, 写入 10 个消息. 删除其中的几个消息. 再把所有消息读取出来, 判定是否符合预期.
        MSGQueue queue = createTestQueue(queueName1);
        List<Message> expectedMessages = new LinkedList<>();
        for (int i = 0; i < 10; i++) {
            Message message = createTestMessage("testMessage" + i);
            messageFileManager.sendMessage(queue, message);
            expectedMessages.add(message);
        }

        // 删除其中的三个消息
        messageFileManager.deleteMessage(queue, expectedMessages.get(7));
        messageFileManager.deleteMessage(queue, expectedMessages.get(8));
        messageFileManager.deleteMessage(queue, expectedMessages.get(9));

        // 对比这里的内容是否正确.
        LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(7, actualMessages.size());
        for (int i = 0; i < actualMessages.size(); i++) {
            Message expectedMessage = expectedMessages.get(i);
            Message actualMessage = actualMessages.get(i);
            System.out.println("[" + i + "] actualMessage=" + actualMessage);

            Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());
            Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
            Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
            Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());
            Assertions.assertEquals(0x1, actualMessage.getIsValid());
        }
    }


    @Test
    public void testGC() throws IOException, MqException, ClassNotFoundException {
        // 先往队列中写 100 个消息. 获取到文件大小.
        // 再把 100 个消息中的一半, 都给删除掉(比如把下标为偶数的消息都删除)
        // 再手动调用 gc 方法, 检测得到的新的文件的大小是否比之前缩小了.
        MSGQueue queue = createTestQueue(queueName1);
        List<Message> expectedMessages = new LinkedList<>();
        for (int i = 0; i < 100; i++) {
            Message message = createTestMessage("testMessage" + i);
            messageFileManager.sendMessage(queue, message);
            expectedMessages.add(message);
        }

        // 获取 gc 前的文件大小
        File beforeGCFile = new File("./data/" + queueName1 + "/queue_data.txt");
        long beforeGCLength = beforeGCFile.length();

        // 删除偶数下标的消息
        for (int i = 0; i < 100; i += 2) {
            messageFileManager.deleteMessage(queue, expectedMessages.get(i));
        }

        // 手动调用 gc
        messageFileManager.gc(queue);

        // 重新读取文件, 验证新的文件的内容是不是和之前的内容匹配
        LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(50, actualMessages.size());
        for (int i = 0; i < actualMessages.size(); i++) {
            // 把之前消息偶数下标的删了, 剩下的就是奇数下标的元素了.
            // actual 中的 0 对应 expected 的 1
            // actual 中的 1 对应 expected 的 3
            // actual 中的 2 对应 expected 的 5
            // actual 中的 i 对应 expected 的 2 * i + 1
            Message expectedMessage = expectedMessages.get(2 * i + 1);
            Message actualMessage = actualMessages.get(i);

            Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());
            Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
            Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
            Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());
            Assertions.assertEquals(0x1, actualMessage.getIsValid());
        }
        // 获取新的文件的大小
        File afterGCFile = new File("./data/" + queueName1 + "/queue_data.txt");
        long afterGCLength = afterGCFile.length();
        System.out.println("before: " + beforeGCLength);
        System.out.println("after: " + afterGCLength);
        Assertions.assertTrue(beforeGCLength > afterGCLength);
    }


}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/822350.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

云服务器无法远程连接服务

问题 今天刚买了华为的云服务器&#xff0c;通过宝塔安装了对应的基础服务&#xff0c;也在华为云的官网上设置了安全组。此时万事俱备只欠东风&#xff0c;我测试使用sqlyog进行远程连接。原本已经在准备部署项目了&#xff0c;现实给了我重重的一拳。 sqlyog爆2003代码错误&…

交换机VLAN技术和实验(eNSP)

目录 一&#xff0c;交换机的演变 1.1&#xff0c;最小网络单元 1.2&#xff0c;中继器&#xff08;物理层&#xff09; 1.3&#xff0c;集线器&#xff08;物理层&#xff09; 1.4&#xff0c;网桥&#xff08;数据链路层&#xff09; 二&#xff0c;交换机的工作行为 2.…

IO进线程——库的制作(静态库、动态库)

库的制作 1、静态库 ①生成二进制文件 gcc -c linkstack.c -o linkstack.o②制作静态库文件&#xff0c;把.o文件转换为.a文件 ar crs liblinkstack.a linkstack.o③编译时链接 gcc linkstack_main.c -L. -llinkstack2、动态库 ①生成地址无关二进制文件 gcc -fPIC -c l…

《工具箱-VNCServer》配置VNCServer,使用VNCViewer实现局域网内页面共享

VNCServer设置 通过VNCServer配置&#xff0c;与VNCviewer配套使用 1.下载并安装VNCServer 2.邮箱密码注册后用户登录 3.设置VNC密码 4.设置viewer不能控制本机 5.打开VNClicensewiz&#xff0c;选择“Enter a license key …” BQ24G-PDXE4-KKKRS-WBHZE-F5RCA BQ24G-PDXE4-…

mysql进阶-修改linux服务器中MySQL的字符集

1.背景 linux中mysql8默认的字符集是latin1&#xff0c;在插入中文时会报错&#xff0c;所以一般在配置好mysql时需要修改字符集为utf8【又叫utfmb3,一般开发够用&#xff0c;一个字符用3个字节表示】或者utfmb4【一个字符用4个字节表示&#xff0c;如果存储emoji表情&#xf…

Linux实战:五子棋

一、五子棋原理 采用二维数组保存棋盘信息,棋盘上面的任何一个位置,里面可以放置三类信息。 空用户1的落子(黑子)用户2的落子(白子)下棋就是在二维数组中找对应的空位置,进行落子落完子之后下来就要考虑该落子位置是否有”五子连珠“,进而进行输赢判定,每一次走棋,多…

如何让python在手机上运行,python在手机上怎么运行

大家好&#xff0c;小编来为大家解答以下问题&#xff0c;python程序如何在手机上运行&#xff0c;如何让python在手机上运行&#xff0c;现在让我们一起来看看吧&#xff01; 在计算机语言的运用过程中PythonS60手机是经常被使用的计算机语言&#xff0c;以下的文章是介绍Pyth…

华为OD机试真题 JavaScript 实现【明明的随机数】【2023Q1 100分】,附详细解题思路

目录 一、题目描述二、输入描述三、输出描述四、解题思路五、JavaScript算法源码 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 刷的越多&#xff0c;抽中的概率越大&#xff0c;每一题都有详细的答题思路、详细的代码注释、样例测试&#xff0c;发现新题目&#x…

Python超实用!批量重命名文件/文件夹,只需1行代码

大家好&#xff0c;这里是程序员晚枫&#xff0c;之前在小破站给大家分享了一个视频&#xff1a;批量重命名文件。 最近在程序员晚枫的读者群里&#xff0c;发现很多朋友对这个功能很感兴趣&#xff0c;尤其是对下一步的优化&#xff1a;批量重命名文件夹。 这周我利用下班时…

Java 解析Excel单元格的富文本

1. 总体介绍 该方法是解析 xlsx 单元格中的富文本&#xff0c;注意不是 xls&#xff0c;xls 的 api 不一样&#xff0c;试了很久没成功。只实现了解析 斜体字、上下标&#xff0c;其它的实现方式应该类似。 2. 具体实现 2.1 代码 package util;import java.io.FileInputStr…

嵌入式一开始该怎么学?学习单片机

学习单片机&#xff1a; 模电数电肯定必须的&#xff0c;玩单片机大概率这两门课都学过&#xff0c;学过微机原理更好。 直接看野火的文档&#xff0c;芯片手册&#xff0c;外设手册。 学单片机不要纠结于某个型号&#xff0c;我认为stm32就OK&#xff0c;主要是原理和感觉。…

【ZYNQ】Linux驱动之梦开始的地方

软件版本&#xff1a;Vivado2021.1 操作系统&#xff1a;WIN10 64bit、Ubuntu18.04 硬件平台&#xff1a;ZYNQ UltraScale 文章目录 1.1系统框图1.2介绍1.2.1寄存器查询手册1.2.2物理地址与虚拟地址1.2.3MIO介绍1.2.4PS的LED 引脚介绍 1.3搭建工程1.4程序分析1.4.1驱动程序分析…

edgeConnector新版兼容ARM-为用户提供更多部署方案

Softing工业自动化的edgeConnector产品新版3.50兼容ARM处理器——大大扩展了edgeConnector产品的应用范围。 &#xff08;edgeConnector新版兼容ARM——为用户提供更多部署方案&#xff09; Softing edgeConnector产品系列是基于Docker的软件应用&#xff0c;可访问SIMATIC S7、…

QT 在label上透明绘图

一、新建TransparentDemo工程 二、在界面上添加label&#xff0c;修改样式表&#xff0c;将底色置为红色&#xff0c;作为北京 三、新建一个TransparentLabel类&#xff0c;继承自QLabel 此时&#xff0c;工程包括文件 五、在transparentlabel.h中添加 头文件 #include …

静态分析全解析:助力高质量软件开发,降低成本风险

按时发布高质量版本、需要满足编码和合规标准&#xff0c;而且决不能出现错误……静态分析为那些面临巨大压力的队伍提供了帮助。 这就是为什么开发团队都在使用静态分析/源代码分析工具。本篇文章将讨论静态分析、使用静态代码分析器的优势&#xff0c;以及静态分析的局限性。…

MySQL语法

Mysql通用语法 SQL可多行书写&#xff0c;分号结尾SQL语句可以使用空格进行缩进&#xff0c;增加可读性MYSQL&#xff0c;不区分大小写&#xff0c;但关键字推荐使用大写注释&#xff1a; 单行注释&#xff1a;--注释内容 或#注释内容 多行注释&#xff1a;/*注释内容*/ SQ…

声音从何而来?如何保护自己免受聊天工具中欺诈性语音邮件的侵害?

&#x1f5e3; 并不是每个人都喜欢在信使上收到语音邮件。 如果它们也是骗子发来的&#xff0c;那就更令人不快了。 让我们来看看攻击者都会使用哪些手段吧 社交工程 骗子可以使用语音信息而不是电话来散布虚假信息&#xff0c;例如&#xff0c;以银行的名义散布有关账户问…

windows下的txt文档,传到ubuntu后,每行后面出现^M,怎么处理?

问题背景&#xff1a;windows下pycharm生成的txt文档&#xff0c;传到ubuntu后&#xff0c;每行后面出现^M 用vim打开显示 使用cat -A filename显示如下 参考https://www.lmlphp.com/user/16697/article/item/579325/给出的几种方法 方法一、dos2unix filename。服务器没装…

《网络是怎样连接的》(二.2)

(6条消息) 《网络是怎样连接的》&#xff08;二.1&#xff09;_qq_38480311的博客-CSDN博客 本文主要取材于 《网络是怎样连接的》 第二章 2.5 2.6章节。 目录 简述&#xff1a; 本文的主要内容是 以太网的收发操作 和 UDP协议的收发操作。 IP与以太网的包收发操作 包是什…

百分点科技参编信通院《数据中台实践指南(1.0版)》

日前&#xff0c;在大数据产业发展大会上&#xff0c;百分点科技参与编写的《数据中台实践指南&#xff08;1.0版&#xff09;》正式发布&#xff0c;该指南由中国信息通信研究院大数据技术标准推进委员会指导和组织&#xff0c;百分点科技、阿里云、中国移动、中国联通、交通银…