仿 RabbitMQ 的消息队列3(实战项目)

news2025/1/24 22:44:47

七. 消息存储设计

上一篇博客已经将消息统计文件的读写代码实现了,下一步我们将实现创建队列文件和目录。

实现创建队列文件和目录

初始化 0\t0 这样的初始值.

//创建队列对应的文件和目录:
    public void createQueueFile(String queueName) throws IOException, MqException {
        //先创建对应的目录:
        File file = new File(getQueueDir(queueName));
        if(!file.exists()){
            boolean ok = file.mkdirs();
            if(!ok) throw new IOException("创建队列目录失败。baseDir:"+file.getAbsolutePath());
        }else{
            throw new MqException("[createQueueFile] 队列对应的目录已经被创建过了,创建失败");
        }
        //下面开始创建 数据文件:
        File dataFile = new File(getQueueDataDir(queueName));
        if(!dataFile.exists()){
            boolean ok = dataFile.createNewFile();
            if(!ok) throw new IOException("创建数据文件失败。queuedataDir:"+dataFile.getAbsolutePath());
        }else{
            throw new MqException("[createQueueFile] 队列对应的数据文件已经被创建过了,创建失败");
        }
        //创建 统计文件:
        File statFile = new File(getQueueStatDir(queueName));
        if(!statFile.exists()){
            boolean ok = statFile.createNewFile();
            if(!ok) throw new IOException("创建统计文件失败。queuestatDir:"+statFile.getAbsolutePath());
        }else{
            throw new MqException("[createQueueFile] 队列对应的统计文件已经被创建过了,创建失败");
        }
        //给消息统计文件设定初始值 0\t0, (消息数量:0,有效消息数量:0)
        // 目的:不用在今后使用的时候对空文件做一些特殊的判定
        Stat stat = new Stat();
        stat.totalCount = 0;
        stat.validCount = 0;
        //再写入:
        writeStat(queueName,stat);
    }

实现删除文件或目录

注意:File 类的 delete ⽅法只能删除空⽬录. 因此需要先把内部的⽂件先删除掉,如果还存在多余文件,就会删除失败。

//删除队列的文件或目录:
    //队列也是可以删除的,当队列删除后,对应的消息文件啥的,也要随之删除。
    public void deleteQueueFile(String queueName) throws IOException{
        //先删除 数据文件:
        File queueDataFile = new File(getQueueDataDir(queueName));
        boolean ok1 = queueDataFile.delete();
        //再删除 统计文件:
        File queueStatFile = new File(getQueueStatDir(queueName));
        boolean ok2 = queueStatFile.delete();
        //再删除目录;
        File file = new File(getQueueDir(queueName));
        boolean ok3 = file.delete();
        if(!(ok1 && ok2 && ok3)){
            //任意一个删除失败,就失败,抛出异常:
            throw new IOException("删除队列文件或目录失败");
        }
    }

检查队列⽂件是否存在

判定该队列的消息⽂件和统计⽂件是否存在. ⼀旦出现缺失, 则不能进⾏后续⼯作.

//检查队列的 文件或目录 是否存在: 目的:判断是否队列之前被 别人用过。
    //用处1:如果后续有生产者给 broker server 生产消息了,这个消息可能需要记录到文件上,此时需要判断文件是否存在(持久化的应用)。
    public boolean checkFilesExists(String queueName){
        //如果队列的 数据文件和统计文件都存在,才存在:
        File queueDataFiles = new File(getQueueDataDir(queueName));
        if(!queueDataFiles.exists()) return false;
        File queueStatFiles = new File(getQueueStatDir(queueName));
        if(!queueStatFiles.exists()) return false;
        //都存在,则返回true;
        return true;
    }

实现消息对象序列化/反序列化

先创建工具类BinaryTool用与序列化/反序列化。
在这里插入图片描述

  • 使⽤ ByteArrayInputStream / ByteArrayOutputStream 针对 byte[] 进⾏封装, ⽅便后续操作. (这两个流对象是纯内存的, 不需要进⾏ close).
  • 使⽤ ObjectInputStream / ObjectOutputStream 进⾏序列化 / 反序列化操作. 通过内部的readObject / writeObject 即可完成对应操作.
/**
 * 这个类用来序列化 与反序列化
 * 此处我们采用的是java标注库里的 ObjectOutputStream 和ObjectInputStream 两个流对象,但是序列化的对象必须要实现Serializable接口、
 *
 * 由于将序列化,反序列化当做一个工具,很多数据都可能用到,所以我们将它的方法搞成静态的
 *
 */
public class BinaryTool {
    //序列化:
    public static byte[] toBytes(Object object) throws IOException {
        //由于在try里面写流对象能自动关闭省去我们不少事,所以,直接写在try()里
        //这里 使用ByteArrayOutputStream是因为 未知的byte数组的长度,这个类能自动记录。
        try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){
            //将byteArrayOutputStream 传入ObjectOutputStream 就相当于将他们相互关联了,当objectOutputStream调用writeObject方法
            //时会将这个对象写入关联的byteArrayOutputStream里,然后直接调用byteArrayOutputStream里的方法,将序列化的数据转换成直接数组就行了
            //其实这个 ObjectOutputStream 不仅可以关联数组,还可以是文件,网络。关联了文件就将对象序列化到文件里,关联了网络,就是网络数据的传输socket。
            try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){
                objectOutputStream.writeObject(object);
            }
            //这个操作就是把byteArrayOutputStream中持有的二进制数据取出来,转成byte[]
            return byteArrayOutputStream.toByteArray();
        }
    }
    //反序列化:
    public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)){
            try(ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){
                return objectInputStream.readObject();
            }
        }
    }

}

实现写入消息文件

  • 考虑线程安全, 按照队列维度进⾏加锁.
  • 使⽤ DataOutputStream 进⾏⼆进制写操作. ⽐原⽣ OutputStream 要⽅便.
  • 需要记录 Message 对象在⽂件中的偏移量. 后续的删除操作依赖这个偏移量定位到消息,这也是message里的偏移量初始化的时候,就是发送消息的时候。offsetBeg是原有⽂件⼤⼩的基础上, 再 + 4. 4 个字节是存放消息大小的空间.
  • 写完消息, 要同时更新统计信息.
 //该方法将传来的一个新的消息放到对应的文件当中:新增消息
    public void sendMessage(MESGQueue queue, Message message) throws IOException, MqException {
        //先检查文件是否存在:如果不存在怎抛出异常,这个异常可以自定义。
        if(!checkFilesExists(queue.getName())){
            throw new MqException("[MessageFileManager对应的文件不存在]!queueName"+queue.getName());
        }
        //先进行序列化:
        byte[] binaryMessage = BinaryTool.toBytes(message);
        //为了解决线程安全问题,我们引入锁,如果此时的锁对象 是同一个队列,那就阻塞等待。
        synchronized (queue){
            //先将数据文件new出来,看看此时文件里已经写入的数据长度,方便我们后续计算offsetbegin和offsetend
            File file = new File(getQueueDataDir(queue.getName()));
            //在写入消息的时候才对message里的offsetbegin和offsetend 进行赋值:
            message.setOffsetBeg(file.length()+4);
            message.setOffsetEnd(file.length()+4+binaryMessage.length);
            //由于我们的写入是追加写入,所以不要忘记 true
            try (OutputStream outputStream = new FileOutputStream(file,true)){
                try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
                    //先写入4个字节的消息长度:
                    dataOutputStream.writeInt(binaryMessage.length);
                    //再写入offsetbegin 和 offsetend
                    dataOutputStream.write(binaryMessage);
                }
            }
            //此时已经将消息数据文件写完了,不要忘记消息统计文件:
            Stat stat = readStat(queue.getName());
            stat.validCount++;
            stat.totalCount++;
            writeStat(queue.getName(),stat);
        }
    }

创建异常类MqException

作为⾃定义异常类. 后续业务上出现问题, 都统⼀抛出这个异常
在这里插入图片描述

public class MqException extends Exception{
    public MqException(String reason){
        super(reason);
    }
}

实现删除消息

此处的删除只是 “逻辑删除”, 即把 Message 类中的 isValid 字段设置为 0.

  • 使⽤ RandomAccessFile 来随机访问到⽂件的内容.(随机访问其实没什么玄乎的,就像数组一样,能通过下标快速访问某个元素,这就是随机访问的原理。内存是支持随机访问的)。
  • 根据 Message 中的 offsetBeg 和 offsetEnd 定位到消息在⽂件中的位置. 通过randomAccessFile.seek 操作⽂件指针偏移过去. 再读取.
  • 读出的结果解析成 Message 对象, 修改 isValid 字段, 再重新写回⽂件. 注意写的时候要重新设定文件指针的位置. ⽂件指针会随着上述的读操作产⽣改变,所以要重新seek,将光标移动到开始。
  • 最后, 要记得更新统计⽂件, 把合法消息 - 1.
 //删除消息:主要的操作歩奏:
    // 1,将消息读出来
    //2,将消息里的isVail 改成0x0
    //3,将消息放回文件中
    public void deleteMessage(MESGQueue queue,Message message) throws IOException, ClassNotFoundException {
        //由于删除消息的时候也可能收到线程安全问题,所以我们要加锁:
        synchronized (queue){
            //先将消息读出来:
            //由于我们正常使用的FileInputStream,只能从头开始读。而此时的场景更倾向于 随机读取,所以我们使用到了RandomAccessFile进行随机读取
            //注意这个RandomAccessFile类的第二个参数:rw可读可写。
            try(RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataDir(queue.getName()),"rw")){
                //先准备一个byte数组用来放 读出来的二进制数据:
                byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];
                //先将光标刷新到 offsetbegin
                randomAccessFile.seek(message.getOffsetBeg());
                //将message二进制数据读出来
                randomAccessFile.read(bufferSrc);
                //转换成message对象:
                Message message1 = (Message) BinaryTool.fromBytes(bufferSrc);
                //将message里的isVail 改成0x0
                message1.setIsVail((byte) 0x0);
                //将新的message1 转成二进制:
                byte[] bufferDest = BinaryTool.toBytes(message1);
                //由于上一次读文件光标已经发生了变化,所以此时还要调整光标到offsetbegin
                randomAccessFile.seek(message.getOffsetBeg());
                //将数据写入文件:
                randomAccessFile.write(bufferDest);

                //此时已经将数据文件里的vail改成无效,那我们需不需要将这个内存中的 message对象里的vail也改成无效呢?
                //可以是可以,但是没有必要:想象一下我们将一个文件标记成无效的场景是不是我们此时要删掉这个文件的时候,
                //此时我们都要删掉这个文件了,当然要连同文件里的数据和内存中的数据都删了呀,文件里的数据可能需要一些歩奏,
                //但是在内存中删一个对象实在太容易了,今后会有内存中的删除消息操作。这就相当于让一个将死之人多活几秒,但他终究逃不过死亡
                //这就是message里的vail 其实不需要改动的原因。
            }
            //不要忘了统计文件也要更新, 由于我们此时已经将数据文件里的一个消息改成无效的,所以此时统计文件里的有效消息就要--了
            Stat stat = readStat(queue.getName());
            if(stat.validCount >0){
                stat.validCount --;
            }
            //再将更新后的统计信息 写入文件
            writeStat(queue.getName(),stat);
        }
    }

实现消息加载

这个功能在服务器重启, 和垃圾回收的时候都很关键

  • 使⽤ DataInputStream 读取数据. 先读 4 个字节为消息的⻓度, 然后再按照这个⻓度来读取实际消息内容.
  • 读取完毕之后, 转换成 Message 对象.
  • 同时计算出该对象的 offsetBeg 和 offsetEnd.
  • 最终把结果整理成链表, 返回出去.
  • 注意, 对于 DataInputStream 来说, 如果读取到 EOF, 会抛出⼀个 EOFException , ⽽不是返回特定值. 因此需要注意上述循环的结束条件.
//从消息数据文件当中读出所有消息:
    //由于是服务器刚启动的时候才会调用这个方法,此时的队列还不能处理各种请求,所以不需要考虑线程安全问题。
    public LinkedList<Message> loadAllMessagesFromQueueDataFile(String queueName) throws IOException, ClassNotFoundException, MqException {
        //先new出来一个linkedList来放所有消息:使用链表是因为要进行头删和尾删等操作:
        LinkedList<Message> messages = new LinkedList<>();
        //创建流对象:
        try(InputStream inputStream = new FileInputStream(getQueueDataDir(queueName))){
            //与上面的DataOutputStream对应,此时用的是DataInputStream
            try(DataInputStream dataInputStream = new DataInputStream(inputStream)){
                //由于要读的消息可能不止一条,所以用一个while循环:
                //但是如果我们直接这样写会一直重复的读一个消息,而DataInputStream不能控制光标的移动,所以要定义一个量来
                //记录我们读到哪里了,另外,这个量也为后续message对象的offsetbegin和offsetend的初始化提供便利
                long currentOffset = 0;
                //写完大概得逻辑以后不知道不会不会有疑问,这个while的条件可是true啊,这不死循环了嘛,
                //其实这也是无奈之举,主要原因是dataInputStream.readInt()读到文件的末尾并不会返回-1,EOF啥的,而是
                //直接抛出 EOFException异常,直接结束循环,因此我们只用在外层catch住这个异常就行了,这是一个很特别的预料之内的循环结束方式
                while(true){
                    //先读4个字节,求出数据的长度:
                    int messageLen = dataInputStream.readInt();
                    //创建一个刚好能装messageLen长度的字节数组:
                    byte[] messageBinary = new byte[messageLen];
                    //读出消息数据:并且用变量接收,判断读出的数据是否和预期的数据长度一致,若不一致,说明格式不正确,错乱了则抛出异常
                    int realMessageLen = dataInputStream.read(messageBinary);
                    if(realMessageLen != messageLen){
                        throw new MqException("[MessageFileManager] 文件格式错误!!!queueName:"+queueName);
                    }
                    //将数组反序列化成message对象
                    Message message = (Message) BinaryTool.fromBytes(messageBinary);
                    //如果读到的消息是无效的,则跳过这个无效消息,更新currentoffset:
                    if(message.getIsVail() == 0x0){
                        currentOffset+=(4+messageLen);
                        continue;
                    }
                    //再将message里的offsetbegin和offsetend给初始化:
                    message.setOffsetBeg(currentOffset+4);
                    message.setOffsetEnd(currentOffset+4+messageLen);
                    //正常读完后,别忘了,将currentoffset更新
                    currentOffset+=(4+messageLen);
                    //再将消息加入到链表当中
                    messages.add(message);
                }
            }catch (EOFException e){
                System.out.println("[MessageFileManager] 恢复Message 数据完成!!!");
            }
        }
        return messages;
    }

实现垃圾回收(GC)

  • 上述删除操作, 只是把消息在⽂件上标记成了⽆效. 并没有腾出硬盘空间. 最终⽂件⼤⼩可能会越积越多. 因此需要定期的进⾏批量清除.
  • 此处使⽤类似于复制算法. 当总消息数超过 2000, 并且有效消息数⽬少于 50% 的时候, 就触发 GC。GC 的时候会把所有有效消息加载出来, 写⼊到⼀个新的消息⽂件中, 使⽤新⽂件, 代替旧⽂件即可.
 public void gc(MESGQueue queue) throws MqException, IOException, ClassNotFoundException {
        //根据以前的写代码经验,次GC过程可能有线程安全问题,所以我们直接加锁:
        //其实这也是为什么形参传入的是一个队列,而不是队列的名字的其中一个原因。
        synchronized (queue){
            //由于GC的执行时间可能很慢,我们手动的将时间计算出来,如果将来服务器运行半天无响应了,如果是GC的问题
            //我们也能知道
            long gcBegin = System.currentTimeMillis();
            //先创建新的文件:
            File newQueueFile = new File(getQueueDataNewPath(queue.getName()));
            //如果文件已经存在了,可能上一次gc有残留,这是不正常的,所以抛出异常
            if(newQueueFile.exists()){
                throw new MqException("[MessageFilemanager] 队列的queue_new_data.txt已经存在!!queueName:"+queue.getName());
            }
            //如果执行到这,说明文件不存在,则创建新文件:
            boolean ok = newQueueFile.createNewFile();
            //如果创建文件失败,则抛出异常:
            if(!ok){
                throw new MqException("[MessageFileManager] 创建文件失败!!newQueueDataFile:"+newQueueFile.getAbsolutePath());
            }
            //先创建一个链表用来存储从原来的文件中取出来的message对象:此处可以用到之前的方法:
            //取出原来文件里的所有有效文件:
            LinkedList<Message> messages = loadAllMessagesFromQueueDataFile(queue.getName());
            //new出相应的流对象用来写入新文件:
            //这里我写错了,将queue.getName()传入了,但是明明是一个不存在的路径,他竟然还能正常写?底层也不抛出异常,
            //我真是又惊讶,又惊吓。
            //之后我又去查了查资料:原来是FileOutputStream的问题啊,FileOutputStream太nb了,
            //如果传入的字符串对应的路径不存在,FileOutputStream会自动给你创建一个文件用于写入,这这这也太贴心了吧,
            //不过我还是希望他能直接抛异常,毕竟我找bug也找了这么久了,况且天知道他会把我的数据写到哪里:
            //其实也知道:如果是绝对路径,他会自动创建路径下的文件;如果是相对路径,他会在当前工作空间创建一个文件。
            //找了一圈以后发下,在我的mq路径下,就存在一个queuetest1的文件,里面正好是之前我写入的数据,呜呜呜,要哭了。
//            try(OutputStream outputStream = new FileOutputStream(queue.getName())){
            try(OutputStream outputStream = new FileOutputStream(newQueueFile)){
                try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
                    //循环读取messages,将对象重新写入新文件:
                    for(Message m : messages){
                        //先将消息序列化:一个字节数组:
                        byte[] buffer = BinaryTool.toBytes(m);
                        //将二进制数组写入新的文件:注意遵循之前的约定:
                        dataOutputStream.writeInt(buffer.length);
                        dataOutputStream.write(buffer);
                    }
                }
            }
            //删除旧文件,这里以前传入的旧文件的路径写错了,直接传成了名字,所以写代码一定要细心啊。
//            File oldQueueFile = new File(queue.getName());
            File oldQueueFile = new File(getQueueDataDir(queue.getName()));
            boolean ok2 = oldQueueFile.delete();
            System.out.println("[ok2]oldQueueFile 文件删除:"+ok2);
            //如果删除失败,可能是没有权限之类的,抛出异常:
            if(!ok2){
                throw new MqException("MessageFileManager 删除旧文件失败!! oldDataQueueFile:"+oldQueueFile.getAbsolutePath());
            }
            //重命名新文件:
            boolean ok3 = newQueueFile.renameTo(oldQueueFile);
            //如果重命名失败,抛出异常:
            if(!ok3) {
                throw new MqException("[MessageFileManager] 新文件重命名失败!!oldDataQueueFile:"+oldQueueFile.getAbsolutePath()
                        +" , newDataQueueFile="+newQueueFile.getAbsolutePath());
            }
            //不要忘记更新统计文件:
            Stat stat = readStat(queue.getName());
            stat.totalCount = messages.size();
            stat.validCount = messages.size();
            writeStat(queue.getName(),stat);
            long gcEnd = System.currentTimeMillis();
            System.out.println("[MessageFileManager] GC执行完毕!!! 执行的时间:"+(gcEnd - gcBegin)+"ms");
        }

    }

测试MessageFileManager

创建MessageFileManagerTest类用于测试:
在这里插入图片描述

测试前的准备:

  • 创建两个队列, ⽤来辅助测试.
  • 使⽤ ReflectionTestUtils.invokeMethod 来调⽤私有⽅法(这就是传说中的反射,注意它的参数,用法)。
  •     ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat",queueName1,stat);
    

这个反射的参数:
第一个参数:类的实例。
第二个参数:你想调用的方法
后面的参数就是不定参数了(数量不确定),能确定的是:后面的参数的就是你想调用的方法的参数。


@SpringBootTest
public class MessageFileManagerTest {
    private MessageFileManager messageFileManager = new MessageFileManager();
    private static final String queueName1 = "queuetest1";
    private static final String queueName2 = "queuetest2";

    @BeforeEach
    public void setUp() throws IOException, MqException {
        //由于我们要测试的是队列,所以准备工作就是先创建队列文件:
        messageFileManager.createQueueFile(queueName1);
        messageFileManager.createQueueFile(queueName2);
    }
    //这个@AfterEach注解我试过了,即使测试方法执行过程中抛出了异常,这个方法还是在每次执行完测试单元以后该执行他还是执行他,
    //无关乎异常,真nb
    @AfterEach
    public void tearDown() throws IOException {
        //首尾工作,将刚才创建的队列文件删掉:
        messageFileManager.deleteQueueFile(queueName1);
        messageFileManager.deleteQueueFile(queueName2);
    }
  }

测试代码:

@Test
    public void testCreateFile(){
        //其实就测试创建的队列文件是否存在:
        //由于我们在MessageFileManager里的get路径方法是 private修饰的,所以不能直接调用get路径方法,只能手动写上
        //检验 队列数据文件是否存在:
        File queueDataFile1 = new File("./data/"+queueName1+"/queue_data.txt");
        //此处用的方法是isFile 而不是exists,因为要判定这是个文件,并不是只是存在就行,存在了也可能是个目录。
        Assertions.assertEquals(true,queueDataFile1.isFile());
        //检验 队列统计文件是否存在:
        File queueStatFile1 = new File("./data/"+queueName1+"/queue_stat.txt");
        Assertions.assertEquals(true,queueStatFile1.isFile());

        //检验 队列数据文件是否存在:
        File queueDataFile2 = new File("./data/"+queueName2+"/queue_data.txt");
        Assertions.assertEquals(true,queueDataFile2.isFile());
        //检验 队列统计文件是否存在:
        File queueStatFile2 = new File("./data/"+queueName2+"/queue_stat.txt");
        Assertions.assertEquals(true,queueStatFile2.isFile());
        System.out.println("[CreateFileText] 测试创建队列文件成功!!!");
    }
    @Test
    public void testReadAndWriteStat(){
        //先创建出stat类,由于他是内部类,所以要类名. 调用出来:
        MessageFileManager.Stat stat = new MessageFileManager.Stat();
        stat.totalCount = 200;
        stat.validCount = 100;
        //此时写入stat 到统计文件当中:但是如果直接用messageFileManager. 由于writeStat是private修饰,所以肯定调用不出来,
        //此时就要用spring带的 反射方法了:
//        messageFileManager.
        //用反射将 stat写入统计文件:
        ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat",queueName1,stat);
        //用反射将 写入的统计文件读出来:
        MessageFileManager.Stat statNew = ReflectionTestUtils.invokeMethod(messageFileManager,"readStat",queueName1);
        //判断读出来的stat和我们设定的stat是否一样
        Assertions.assertEquals(200,statNew.totalCount);
        Assertions.assertEquals(100,statNew.validCount);
        System.out.println("[testReadAndWriteStat] 测试成功!!!");
    }

    //要想测试发送消息,首先要有队列和消息吧,所以,我们先写创建队列和消息的方法:
    private MESGQueue createQueue(){
        MESGQueue queue = new MESGQueue();
        //这里的队列名字不能随便取,因为随便取的队列名字也没有对应的文件啊,要用就要用已经创建了文件的队列名,
        //考虑到这个队列要与文件交互,而我们只创建了queuename1和queuename2两个名字对应的文件,所以只能用这两个名字的一个。
        queue.setName(queueName1);
        queue.setDurable(true);
        queue.setExclusive(false);
        queue.setAutoDelete(false);
        HashMap<String, Object> hashMap = new HashMap<>();
        hashMap.put("aaa", "111");
        hashMap.put("bbb", "222");
        queue.setArguments(hashMap);
        return queue;
    }
    private Message createMessage(String context){
        //此时能用到我们之前在message里写的创建 message的工厂类了:
        Message message = Message.createMessageWithId("testRoutingKey",null,context.getBytes());
        return message;
    }

    @Test
    public void testSendMessage() throws IOException, MqException, ClassNotFoundException {
        //先创建队列与消息:
        MESGQueue queue = createQueue();
        Message message = createMessage("abcdefghijklmnopqrstuvwxyz");
        //发送消息:
        messageFileManager.sendMessage(queue,message);

        //验证stat文件:
        MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager,"readStat",queueName1);
        Assertions.assertEquals(1,stat.totalCount);
        Assertions.assertEquals(1,stat.validCount);

        //验证data文件:使用loadAllMessagesFromQueueDataFile读取文件内容:
        LinkedList<Message> messages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);
        //验证:
        Assertions.assertEquals(1,messages.size());
        Message message1 = messages.get(0);
        //判断这个message1和我们之前的消息message是否一样:
        Assertions.assertEquals(message.getMessageId(),message1.getMessageId());
        Assertions.assertEquals(message.getRoutingKey(),message1.getRoutingKey());
        Assertions.assertEquals(message.getDeliverMode(),message1.getDeliverMode());
        Assertions.assertArrayEquals(message.getBody(),message1.getBody());
        System.out.println("[testSendMessage] 测试成功!!!");
    }

    //虽然上一个testSendMessage 已经间接测试过这个方法,但是为了求稳,再测试一遍
    @Test
    public void testLoadAllMessagesFromQueueDataFile() throws IOException, MqException, ClassNotFoundException {
        //我们需要准备200条数据用来加载:
        //先创建一个队列用来存放消息:注意这个方法使用的是queueName1创建的队列
        MESGQueue queue = createQueue();
        //先创建一个链表用来保存消息,和后面新加载的消息作对比:
        LinkedList<Message> expectedMessages = new LinkedList<>();
        //使用for循环创建消息:
        for(int i =0;i<200;i++){
            Message message = createMessage("testMessage"+i);
            //将消息写入文件:
            messageFileManager.sendMessage(queue,message);
            //记录消息:
            expectedMessages.add(message);
        }
        //调用loadAllMessagesFromQueueDataFile取出所有消息:
        LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);
        //先验证队列的数目是否一致:
        Assertions.assertEquals(200,realMessages.size());
        //验证基本属性:
        for(int i= 0;i<realMessages.size();i++){
            Message realMessage = realMessages.get(i);
            Message expectMessage = expectedMessages.get(i);
            System.out.println("["+i+"]"+realMessage.toString());
            Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());
            Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());
            Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());
            Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());
            Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());
        }
        System.out.println("[testLoadAllMessagesFromQueueDataFile] 测试成功!!!");
    }

    //测试删除消息:
    @Test
    public void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {
        //先创建一个队列:
        MESGQueue queue = createQueue();
        //创建一个链表用来保存预期消息:
        LinkedList<Message> expectedMessages = new LinkedList<>();
        //再将20条消息都写入队列:
        for(int i =0;i<20;i++){
            Message message = createMessage("testMessage"+i);
            //将消息写入队列文件:
            messageFileManager.sendMessage(queue,message);
            //记录消息:
            expectedMessages.add(message);
        }
        //这里 就以删除前三条消息为例:
        messageFileManager.deleteMessage(queue,expectedMessages.get(0));
        messageFileManager.deleteMessage(queue,expectedMessages.get(1));
        messageFileManager.deleteMessage(queue,expectedMessages.get(2));

        //读出消息,对比:
        LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);
        //先判断个数:
        Assertions.assertEquals(17,realMessages.size());
        for(int i =3;i<20;i++){
            Message realMessage = realMessages.get(i-3);
            Message expectMessage = expectedMessages.get(i);
            System.out.println("["+i+"]"+realMessage.toString());
            Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());
            Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());
            Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());
            Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());
            Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());
        }
        System.out.println("[testDeleteMessage] 测试成功!!!");
    }

    //测试GC,这里的GC其实只是测试,不用管消息总数是否大于2000或有效消息占比不到50%,因为那是业务上的判定,会有专门的类来进一步封装
    //而此处我们只进行测试GC这个方法
    //计划将100条消息都存入队列,然后将奇数下标的消息都删除,然后执行GC,验证现在的文件是否比原来的文件小:
    @Test
    public void testGC() throws IOException, MqException, ClassNotFoundException {
        //创建一个队列:
        MESGQueue queue = createQueue();
        //创建一个链表用来记录消息:
        LinkedList<Message> expectedMessages = new LinkedList<>();
        //先发送100条消息:
        for(int i =0;i<100;i++){
            Message message = createMessage("testMessage"+i);
            //发送到队列:
            messageFileManager.sendMessage(queue,message);
            //记录
            expectedMessages.add(message);
        }
        //删除奇数下标的消息:
        for(int i =1;i<100;i+=2){
            messageFileManager.deleteMessage(queue,expectedMessages.get(i));
        }
        //先记录执行GG之前的文件大小:
        File oldFile = new File("./data/"+queueName1+"/queue_data.txt");
        long oldFileLength = oldFile.length();

        //执行GC
        messageFileManager.gc(queue);
        //记录执行完GC之后的文件大小:
        File newFile = new File("./data/"+queueName1+"/queue_data.txt");
        long newFileLength = newFile.length();
        //取出真实的消息:
        LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);
        //先验证消息数量是否对的上:
        Assertions.assertEquals(50,realMessages.size());
        //挨个验证消息:
        for(int i = 0;i<50;i++){
            Message realMessage = realMessages.get(i);
            Message expectMessage = expectedMessages.get(i*2);
            System.out.println("["+i+"]"+realMessage.toString());
            Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());
            Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());
            Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());
            Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());
            Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());
        }
        //验证文件大小:
        //这个验证的原理其实是:
        //删除一个文件并不是直接删除,而是逻辑删除,通过标记统计文件里的vail来标识的,此时的数据文件即使有很多无效的文件,但是他的大小依旧是total
        //而非vail有效文件的大小。但是如果进行了文件的GC迁移,此时的新文件的大小就是旧文件的vail有效文件的大小了。所以,新文件会小于旧文件的大小。
        System.out.println("[oldFileLength]:"+oldFileLength);
        System.out.println("[newFileLength]:"+newFileLength);
        Assertions.assertTrue(newFileLength<oldFileLength);
        System.out.println("[testGC] 测试成功!!!");
    }

测试结果:没问题

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2281662.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

多线程杂谈:惊群现象、CAS、安全的单例

引言 本文是一篇杂谈&#xff0c;帮助大家了解多线程可能会出现的面试题。 目录 引言 惊群现象 结合条件变量 CAS原子操作&#xff08;cmp & swap&#xff09; 线程控制&#xff1a;两个线程交替打印奇偶数 智能指针线程安全 单例模式线程安全 最简单的单例&…

腾讯 Hunyuan3D-2: 高分辨率3D 资产生成

腾讯 Hunyuan3D-2&#xff1a;高分辨率 3D 资产生成的突破 前言 在当今数字化时代&#xff0c;3D 资产生成技术正变得越来越重要。无论是游戏开发、影视制作还是虚拟现实领域&#xff0c;高质量的 3D 模型和纹理都是创造沉浸式体验的关键。然而&#xff0c;传统的 3D 资产制作…

R语言学习笔记之开发环境配置

一、概要 整个安装过程及遇到的问题记录 操作步骤备注&#xff08;包含遇到的问题&#xff09;1下载安装R语言2下载安装RStudio3离线安装pacman提示需要安装Rtools4安装Rtoolspacman、tidyfst均离线安装完成5加载tidyfst报错 提示需要安装依赖&#xff0c;试错逐步下载并安装…

DRG/DIP 2.0时代下基于PostgreSQL的成本管理实践与探索(上)

一、引言 1.1 研究背景与意义 在医疗领域的改革进程中&#xff0c; DRG/DIP 2.0 时代&#xff0c;医院成本管理的重要性愈发凸显。新的医保支付方式下&#xff0c;医院的收入不再单纯取决于医疗服务项目的数量&#xff0c;而是与病种的分组、费用标准以及成本控制紧密相关。这…

【数据结构】_顺序表

目录 1. 概念与结构 1.1 静态顺序表 1.2 动态顺序表 2. 动态顺序表实现 2.1 SeqList.h 2.2 SeqList.c 2.3 Test_SeqList.c 3. 顺序表性能分析 线性表是n个具有相同特性的数据元素的有限序列。 常见的线性表有&#xff1a;顺序表、链表、栈、队列、字符串等&#xff1b…

缓存之美:万文详解 Caffeine 实现原理(下)

上篇文章&#xff1a;缓存之美&#xff1a;万文详解 Caffeine 实现原理&#xff08;上&#xff09; getIfPresent 现在我们对 put 方法有了基本了解&#xff0c;现在我们继续深入 getIfPresent 方法&#xff1a; public class TestReadSourceCode {Testpublic void doRead() …

VSCode下EIDE插件开发STM32

VSCode下STM32开发环境搭建 本STM32教程使用vscode的EIDE插件的开发环境&#xff0c;完全免费&#xff0c;有管理代码文件的界面&#xff0c;不需要其它IDE。 视频教程见本人的 VSCodeEIDE开发STM32 安装EIDE插件 Embedded IDE 嵌入式IDE 这个插件可以帮我们管理代码文件&am…

HTTP 配置与应用(局域网)

想做一个自己学习的有关的csdn账号&#xff0c;努力奋斗......会更新我计算机网络实验课程的所有内容&#xff0c;还有其他的学习知识^_^&#xff0c;为自己巩固一下所学知识&#xff0c;下次更新HTTP 配置与应用&#xff08;不同网段&#xff09;。 我是一个萌新小白&#xf…

LiteFlow Spring boot使用方式

文章目录 概述LiteFlow框架的优势规则调用逻辑规则组件定义组件内数据获取通过 DefaultContext自定义上下文 通过 组件规则定义数据通过预先传入数据 liteflow 使用 概述 在每个公司的系统中&#xff0c;总有一些拥有复杂业务逻辑的系统&#xff0c;这些系统承载着核心业务逻…

mysql学习笔记-数据库的设计规范

1、范式简介 在关系型数据库中&#xff0c;关于数据表设计的基本原则、规则就称为范式。 1.1键和相关属性的概念 超键:能唯一标识元组的属性集叫做超键。 候选键:如果超键不包括多余的属性&#xff0c;那么这个超键就是候选键 主键:用户可以从候选键中选择一个作为主键。 外…

计算机网络 (55)流失存储音频/视频

一、定义与特点 定义&#xff1a;流式存储音频/视频是指经过压缩并存储在服务器上的多媒体文件&#xff0c;客户端可以通过互联网边下载边播放这些文件&#xff0c;也称为音频/视频点播。 特点&#xff1a; 边下载边播放&#xff1a;用户无需等待整个文件下载完成即可开始播放…

60,【1】BUUCF web [RCTF2015]EasySQL1

先查看源码 1&#xff0c;changepwd&#xff08;修改密码&#xff09; <?php // 开启会话&#xff0c;以便使用会话变量 session_start();// 设置页面的内容类型为 HTML 并使用 UTF-8 编码 header("Content-Type: text/html; charsetUTF-8");// 引入配置文件&…

我谈概率论与数理统计的知识体系

学习概率统计二十多年后&#xff0c;在廖老师的指导下&#xff0c;厘清了各章之间的关系。本来就是一条线两个分支&#xff0c;脉络很清晰。 分支一&#xff1a;从随机现象到样本空间到随机事件再到概率。 从随机事件到随机变量&#xff1a;为了进行定量的数学处理&#xff0…

基于海思soc的智能产品开发(视频的后续开发)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 前面我们讨论了camera&#xff0c;也讨论了屏幕驱动&#xff0c;这些都是基础的部分。关键是&#xff0c;我们拿到了这些视频数据之后&#xff0c;…

Python的进程和线程

ref 讲个故事先 这就像一个舞台&#xff08;CPU核心&#xff09;​&#xff0c; 要供多个剧组演出多个剧目&#xff08;进程&#xff09;​&#xff0c; 剧目中有多个各自独立的角色&#xff08;线程&#xff09;​&#xff0c;有跑龙套的&#xff0c;有主角&#xff0c;第一…

Xcode :给模拟器 创建桌面 快捷方式

给模拟器 创建 桌面 快捷方式&#xff1a; 1、找到xcode程序&#xff1b; 2、右击鼠标点击”显示包内容“菜单&#xff1b; 3、打开contents/developer/applications/ 找到Simulator工具图标&#xff0c;右击鼠标点击”制作替身“菜单&#xff1b; 4、将替身拖到桌面上。 …

STM32项目分享:智能厨房安全检测系统

目录 一、前言 二、项目简介 1.功能详解 2.主要器件 三、原理图设计 四、PCB硬件设计 PCB图 五、程序设计 六、实验效果 七、资料内容 项目分享 一、前言 项目成品图片&#xff1a; 哔哩哔哩视频链接&#xff1a; STM32智能厨房安全检测系统 &#xff08;资料分…

STM32_SD卡的SDIO通信_基础读写

本篇将使用CubeMXKeil, 创建一个SD卡读写的工程。 目录 一、SD卡要点速读 二、SDIO要点速读 三、SD卡座接线原理图 四、CubeMX新建工程 五、CubeMX 生成 SD卡的SDIO通信部分 六、Keil 编辑工程代码 七、实验效果 实现效果&#xff0c;如下图&#xff1a; 一、SD卡 速读…

ubuntu20.04安装使用direct_visual_lidar_calibration标定雷达和相机

官方链接GitHub - koide3/direct_visual_lidar_calibration: A toolbox for target-less LiDAR-camera calibration [ROS1/ROS2] 官方安装方式 Installation - direct_visual_lidar_calibration 安装依赖 sudo apt install libomp-dev libboost-all-dev libglm-dev libglfw…

华为EC6110T-海思Hi3798MV310_安卓9.0_通刷-强刷固件包

华为EC6110T-海思Hi3798MV310_安卓9.0_通刷-强刷固件包 刷机教程说明&#xff1a; 适用机型&#xff1a;华为EC6110-T、华为EC6110-U、华为EC6110-M 破解总分为两个部分&#xff1a;拆机短接破解&#xff08;保留IPTV&#xff09;和OTT卡刷&#xff08;不保留IPTV&#xff09…