项目实战 — 消息队列(4){消息持久化}

news2024/12/25 0:12:42

目录

 一、消息存储格式设计

       🍅 1、queue_data.txt:保存消息的内容

        🍅 2、queue_stat.txt:保存消息的统计信息

二、消息序列化

三、自定义异常类

四、创建MessageFileManger类

🍅 1、约定消息文件所在的目录和文件名字

 🍅 2、队列的统计信息

🍅 3、创建队列对应的目录和功能

🍅 4、实现删除队列的文件和目录

🍅 5、检查队列的目录和文件是否都存在

🍅 6、把消息写入到文件中

🍅 7、删除文件中的消息

🍅 8、将硬盘中的数据加载到内存中

🍅 9、实现消息文件的垃圾回收

        🎈 检测是否要进行GC

        🎈 构造新目录

        🎈 进行GC操作

五、测试MessageFileManager类

🍅 1、“准备工作”和“收尾工作”

🍅 2、测试创建文件是否存在

 🍅 3、测试writetStat和readStat是否能够通过

 🍅 4、测试sendMessage

🍅 5、测试删除消息

🍅 6、测试垃圾回收

六、小结


 一、消息存储格式设计

对于消息,并不打算存储在数据库中:

        (1)消息操作并不会涉及到复杂的增删改查

        (2)消息的数量可能会非常多,数据库的访问效率并不高

 所以,我们直接把消息存储在文件中。

那么消息要如何在文件中存储呢?

首先消息,它是依附于队列的,所以在存储的时候,就把消息按照队列的维度展开。

我们会将队列存储在和数据库同级的data目录中,在data中创建一些子目录,子目录的名字就是队列名。

然后在每个队列的子目录下面,再分配两个文件,用来存储消息。主要是以下两个文件。


       🍅 1、queue_data.txt:保存消息的内容

        Message是一个二进制格式的文件,包含若干个消息,每个消息都以二进制的方式存储,每个消息都由这几个部分构成:Message对象序列化之后。

        Messag对象,会分别再内存和硬盘上都记录一份。内存中的衣服呢会记录offsetBegin和offsetEnd。这样就可以找到内存中的Message对象,能够找到对应的硬盘上的message对象。

        关于isValid:是用来标识当前消息在文件中是否有效,为1就是有效的消息,为0就是无效的消息。当为0时就相当于逻辑上的删除消息功能。但是,随着时间的推移,消息的增多,那么该消息可能就会大部分都是无效的消息,针对这种情况,就需要对当前消息的文件,进行垃圾回收。

以下是本程序中实现的垃圾回收功能:   

  垃圾回收(GC):使用复制算法,针对消息数据文件中的垃圾回收进行回收。直接遍历原有的消息数据文件,把有效的数据拷贝到一个新的文件中,然后把之前的旧文件都删除掉(逻辑删除)。

作出约定(可以不按这个来),当总消息数目超过了2000,并且有效的消息数目低于总数目的50%,就出发一次垃圾回收。

 对于RabbitMQ解决垃圾回收的方式如下:

 如果某个消息队列中,消息很多,都是有效消息,就会导致整个消息的数据文件特别答,后续针对文件的各种操作,成本就会很高。RabbitMQ解决方案如下:

    文件拆分:当单个文件长度达到一定阈值以后,就会拆分成两个文件(拆分次数越多文件就越多)

    文件合并:每个单独的文件都会进行GC,如果GC之后,就会发现文件变小很多,当小到一定程度,就会和其他文件合并。

这样就会再消息特别多的时候,也能保证性能上的及时响应。


        🍅 2、queue_stat.txt:保存消息的统计信息

        使用这个文件,来保存消息的统计信息。

        只存一行数据,文本格式。然后一行包括两列:两者使用 \t 来分割,形如2000\t500

                第一列是queue_data.txt中总的消息的数目

                第二列是queue_data.txt中有效消息的数目。


二、消息序列化

消息序列化:把一个对象(结构化数据)转成一个字符串/字节数组。

在序列化之后,对象的信息是不会丢失的,这样就会方便与存储和传输(在文件中存储时,只能以字符串/二进制数据的方式存储对象)。后面需要用的时候,就再反序列化。

由于消息的body是二进制数据,所以这里不会使用JSON进行序列化。

针对二进制序列化,有很多解决方案:        

        (1)Java标准库中提供了序列化的方案:ObjectInputStream 和 ObjectOutputStream

        (2)Hession

        (3)protobuffer

        (4)thrift

这里使用第一种,这样就不用再引入额外的依赖。

在commen中创建一个BinaryTool,因为后面客户端还会用到这个类,所以放在公共的包中。


/*
* 序列化和反序列化
* 实现Serializable接口才能让这个对象进行序列化和反序列化
* */
public class BinaryTool {
    //    把一个对象序列化成一个字节数组
    public static byte[] toBytes(Object object) throws IOException {
//        这个流对象相当于一个边长的字节数组
//        就可以把object序列化的数组逐渐写入到byteArrayOutputStream中,然后统一转成byte[]
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
            try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
//                此处的writeObject就会把该对象进行序列化,生成二进制字节数据,就会写入到ObjectOutputStream中
//                由于ObjectOutputStream又关联到了ByteArrayOutputStream,最终结果就写入到ByteArrayOutputStream里面了
                objectOutputStream.writeObject(object);
            }
//            该操作就是把byteArrayOutputStream中持有的二进制数据取出来,转成byte[]
            return byteArrayOutputStream.toByteArray();
        }
    }

    //  把一个字节数组,反序列化成一个对象
    public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        Object object = null;
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {
            try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
//                此处的readObject,就是从data的byte[]中读取数据并且进行反序列化
                object = objectInputStream.readObject();
            }
        }
        return object;
    }
}


三、自定义异常类

 

自定义一个异常类,如果是mq的业务逻辑中出的异常,就抛出这个异常类

/*
*自定义异常类
*/
public class MqException extends Exception{
    public MqException(String reason){
        super(reason);
    }
}


四、创建MessageFileManger类

 对硬盘上的消息进行管理的类。


🍅 1、约定消息文件所在的目录和文件名字

//这里的init()方法暂时不用,只是列在这,后面可能扩展 
public void init(){
//        后续扩展
    }

//    约定的那个消息文件所在的目录和文件名

//    用来获取到指定队列对应的消息文件所在的路径
    private  String getQueueDir(String queueName){
        return "./data/" + queueName;
    }

//    用来获取该队列的消息数据文件路径
    private String getQueueDataPath(String queueName){
        return getQueueDir(queueName) + "/queue_data.txt";
    }

//    用来获取该队列列的消息统计文件路径
    private String getQueueStatPath(String queueName){
        return getQueueDir(queueName) + "/queue_stat.txt";
    }

 🍅 2、队列的统计信息

定义一个内部类,来表示该队列的统计信息:

//    定义一个内部类,来表示该队列的统计信息
    static public class Stat{
//        直接定义成public
        public int totalCount; //总消息数量
        public int validCount;//有效消息的数量
    }

然后实现消息统计的读写功能:

private Stat readStat(String queueName){
//        由于当前的消息统计文件是文本文件,直接使用scanner读取文件内容
        Stat stat = new Stat();
        try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {
            Scanner scanner = new Scanner(inputStream);
            stat.totalCount = scanner.nextInt();
            stat.validCount = scanner.nextInt();
            return stat;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    private void writeStat(String queueName,Stat stat){
//      使用PrinWrite来写
//        OutputStream打开文件,默认情况下,会直接把源文件清空,新数据会覆盖原数据
//        这里直接覆盖就可以了
        try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {
            PrintWriter printWriter = new PrintWriter(outputStream);
            printWriter.write(stat.totalCount + "\t" + stat.validCount);
            printWriter.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

🍅 3、创建队列对应的目录和功能

//    创建队列对应的文件和目录
    public void createQueueFiles(String queueName) throws IOException {
//        1.创建队列对应的消息目录
        File baseDir = new File(getQueueDir(queueName));
        if (!baseDir.exists()){
//            不存在,就创建这个目录
            boolean ok = baseDir.mkdirs();
            if (!ok){
                throw  new IOException("创建目录失败!baseDir = " + baseDir.getAbsolutePath());
            }
        }

//        2.创建队列数据文件
        File queueDataFile = new File(getQueueDataPath(queueName));
        if (!queueDataFile.exists()){
            boolean ok = queueDataFile.createNewFile();
            if (!ok){
                throw new IOException("创建文件失败!queueDataFile = " + queueDataFile.getAbsolutePath());
            }
        }
//       3.创建消息统计文件
        File queueStatFile = new File(getQueueStatPath(queueName));
        if (!queueStatFile.exists()){
            boolean ok = queueStatFile.createNewFile();
            if (!ok){
                throw new IOException("创建文件失败!queueStatFile = " + queueStatFile.getAbsolutePath());
            }
        }

//        4.给消息统计文件,设定初始值 0\t0
        Stat stat = new Stat();
        stat.totalCount = 0;
        stat.validCount = 0;
        writeStat(queueName,stat);
    }

🍅 4、实现删除队列的文件和目录

//    实现删除队列的文件和目录
    public void destroyQueueFiles(String queueName) throws IOException {
//        先删除文件,在删除目录
        File queueDataFile = new File(getQueueDataPath(queueName));
        boolean ok1 = queueDataFile.delete();
        File queueStatFile = new File(getQueueStatPath(queueName));
        boolean ok2 = queueStatFile.delete();
        File baseDir = new File(getQueueDir(queueName));
        boolean ok3 = baseDir.delete();
        if (!ok1 || !ok2 || !ok3){
//            有任意一个删除失败,就算整体删除失败
            throw new IOException("删除队列目录和文件失败!baseDir = " + baseDir.getAbsolutePath());
        }
    }

🍅 5、检查队列的目录和文件是否都存在

后续有生产者给brocker server生产消息了,这个消息就可能需要记录到文件上(取决消息是否要持久化)。但是判断是否要持久化之前,需要检查队列中的文件是否存在。

//    检查队列的目录和文件是否存在
    public boolean checkFileExists(String queueName){
//        判断队列的数据文件和统计文件是否都存在
        File queueDataFile = new File(getQueueDataPath(queueName));
        if (!queueDataFile.exists()){
            return false;
        }
        File queueStatFile = new File(getQueueStatPath(queueName));
        if (!queueStatFile.exists()){
            return false;
        }
        return true;
    }

🍅 6、把消息写入到文件中

把一个新的消息,放到队列对应的文件中。主要包含以下三步:

(1)检查写入的队列是否存在;

(2)把Message对象进行序列化,转成二进制的字节数组;

(3)获取到当前数据的文件长度,使用[offsetBegin,offsetEnd]。把新的Message数据,写入到队列数据文件的末尾,此时,

   Message对象的offsetBegin,就是当前文件长度 +4

   offsetEnd就是当前文件长度 + 4 + message自身长度

(4)写入消息到数据文件,追加到数据文件末尾

(5)更新消息统计文件

写入文件时的线程安全问题:

* 如果两个线程,是往同一个队列中写消息,此时就需要阻塞等待;

假设现在有两个线程t1,t2。如果没有加锁,那么他们的目的就是将一个message写入到104~124之间去。但是,此时可能就会导致t1计算长度以后,没有进行写文件;t2就开始计算长度了,并且执行了写文件操作,写完以后,t1才开始写,但是此时t1就不是从104写了,而是从124开始写。这样会导致queue_data多出一段。

 

 所以这里我们就需要对队列进行加锁。

* 如果两个线程,需要往不同的队列中些消息,此时就不需要阻塞等待。

总体代码如下:

//    该方法用于把一个新的消息,放到队列对应的文件中
//    queue表示要把消息写入的队列,message则是要写的消息
    public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
//        1、检查要写入的队列是否存在
//        如果不存在
        if (!checkFileExists(queue.getName())){
            throw new MqException("[MessageFileManager] 队列对应的文件不存在!queueName = " + queue.getName());
        }

//        2、把Message对象进行序列化,转成二进制的字节数组
        byte[] messageBinary = BinaryTool.toBytes(message);

//        这个锁是,当有两个对象针对同一个对象操作时,锁才会有效
        synchronized (queue){
//        3、先获取到当前的队列数据文件长度,使用[offsetBegin,offsetEnd]
//        把新的Message数据,写入到队列数据文件的末尾,此时Message对象的offsetBegin,就是当前文件长度+4
//        offsetEnd就是当前文件长度 + 4 + message自身长度
            File queueDataFile = new File(getQueueDataPath(queue.getName()));
//        获取到文件的长度:queueDataFile.length();单位字节
            message.setOffsetBegin(queueDataFile.length() + 4);
            message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);

//        4.写入消息到数据文件,追加到数据文件末尾
            try(OutputStream outputStream = new FileOutputStream(queueDataFile,true)){
                try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
//                先写当前消息的长度,占据4个字节
//                writeInt()方法用于将给定的整数值作为4个字节(即32位)写入基本DataOutputStream,并且成功执行时变量计数器加4。
                    dataOutputStream.writeInt(messageBinary.length);
//                写入消息本体
                    dataOutputStream.write(messageBinary);
                }
            }

//        5.更新消息统计文件
            Stat stat = readStat(queue.getName());
            stat.totalCount += 1;
            stat.validCount += 1;
            writeStat(queue.getName(),stat);
        }
    }

🍅 7、删除文件中的消息

这里就是逻辑删除:将isValid设置为0.

主要分为3步:

        (1)把文件中需要删除的一段数据读出来,

        (2)还原回Message对象(反序列化);

        (3)把isValid改为0;

        (4)将上面的数据又写回到文件。

        (5)更新统计文件

这里的message对象,必须要包含offsetBegin和offsetEnd。因为这里是对文件中指定的位置进行读写的(把这个随机访问)。随机访问用到的类RandomAcessFile。

关于RandomAcessFile.seek()是用于设置文件指针(相当于光标)位置,设置后,光标会从当前指针的下一位读取到或写入到。 

//    删除消息的方法
    public void deleteMessage(MSGQueue queue,Message message) throws IOException,ClassNotFoundException{
        synchronized (queue){
            try(RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()),"rw")){
//            1.先从文件中读取对应的Message数据
                byte[] bufferSrc = new byte[(int)(message.getOffsetEnd() - message.getOffsetBegin())];
                randomAccessFile.seek(message.getOffsetBegin());
                randomAccessFile.read(bufferSrc);
//            2.把当前读出来的二进制数据,转回成Message对象
                Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);
//            3.把isValid设置为无效
                diskMessage.setIsValid((byte) 0x0);
//            4.重新写入文件
                byte[] buffserDest = BinaryTool.toBytes(diskMessage);
//        这里还需要设置光标的位置,因为,上面的光标已经随着读出数据而发生了改变,已经走到了下一条message的offsetBegin,
//            这里为了重新写入数据到文件中,就需要将光标移到对应的位置上面
                randomAccessFile.seek(message.getOffsetBegin());
                randomAccessFile.write(buffserDest);
//            5.统计文件-1
//            因为有一条数据无效了
                Stat stat = readStat(queue.getName());
                if (stat.validCount > 0){
                    stat.validCount -= 1;
                }
                writeStat(queue.getName(),stat);
            }
        }
    }

🍅 8、将硬盘中的数据加载到内存中

将数据从文件中,读取出所有的消息内容,加载到内存当中(放到一个链表中),这个方法会在程序启动的时候调用,主要又以下几步

        1.读取当前消息的长度;

        2.按照该长度,读取消息内容;

        3.将读取到的二进制数据,反序列化回Message对象

        4.判断消息对象是不是无效对象

        4.将有效的Message对象插入到链表中

//   将数据从文件中,读取出所有的消息内容,加载到内存当中(放到一个链表中)
//    由于该方法实在程序启动时调用, 此时服务器还不能处理请求,不涉及线程操作文件.
    public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException,MqException,ClassNotFoundException {
        LinkedList<Message> messages = new LinkedList<>();
        try(InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))){
            try(DataInputStream dataInputStream = new DataInputStream(inputStream)){
//               该变量记录当前文件光标位置,初始位置为0
                long currentOffset = 0;
//                一个文件中包含了很多消息,这里要循环读取
                while (true){
//                    1.读取当前消息的长度,这里可能会读到文件末尾
//                    reaIn()方法读到文件末尾,会抛出EOFException异常
//                    readInt()读取出4个字节
                    int messageSize = dataInputStream.readInt();
//                    2.按照这个长度,读取消息内容
//                    buffer是一个盛放消息容器,和消息的长度一般大小
                    byte[] buffer = new byte[messageSize];
                    int actualSize = dataInputStream.read(buffer);
                    if (messageSize != actualSize){
//                        如果不匹配,说明文件有问题,格式错乱了
                        throw new MqException("[MessageFileManager] 文件格式错误!queueName = " + queueName);
                    }
//                    3.将读取到的二进制数据,反序列化回Message对象
                    Message message = (Message) BinaryTool.fromBytes(buffer);
                    //                  4.判定这个消息对象是不是无效对象
                    if(message.getIsValid() != 0x1){
//                        无效数据,直接跳过
//                        虽然消息是无效消息,但是offset要更新
                        currentOffset += (4 + messageSize);
                        continue;
                    }
//                    有效数据,则需要把这个Message对象加入到链表中,加入前还需要填写offsetBegin和offsetEnd;
//                    进行offset的时候,需要知道当前光标的位置,由于当下使用的DataInputStream并不方便计算光标位置
//                    因此这里手动计算文件光标位置
                    message.setOffsetBegin(currentOffset + 4);
                    message.setOffsetEnd(currentOffset + 4 + messageSize);
                    currentOffset += (4 + messageSize);
                    messages.add(message);
                }
            } catch (EOFException e){
                System.out.println("[MessageFileManager]恢复Message数据完成");
            }
        }
        return messages;
    }

🍅 9、实现消息文件的垃圾回收

为什么要实现垃圾回收?(GC)

        由于当前会不停的往消息文件中写入新消息,而且删除也只是逻辑删除(isValid),这样就可能导致消息文件越来越大,并且里面又包含了大量的无效消息。

此处的垃圾回收,使用的是复制算法:

        判断,当文件中消息总数超过了2000,并且有效消息的数目不足50%时,就触发垃圾回收。然后将文件中有效的消息复制出来,单独写入到一个新的文件中,删除旧文件,使用新文件代替。

        🎈 检测是否要进行GC

检查当前是否要针对该队列的消息数据文件进行GC,判断是否要GC,根据总消息数目和有效消息数目判断。

//    检查当前是否要针对该队列的消息数据文件进行GC
    public boolean checkGC(String queueName){
//        判断是否要GC,根据总消息数目和有效消息数目,这两个值都是在消息统计文件中实现的。
        Stat stat = readStat(queueName);
        if (stat.totalCount > 2000 && (double)stat.validCount / (double) stat.totalCount < 0.5){
            return  true;
        }
        return false;
    }

        🎈 构造新目录

构造一个新目录,放置有效的复制信息。

//    构造一个目录结构放置复制的信息
    private String getQueueDataNewPath(String queueName){
        return getQueueDir(queueName) + "/queue_data_new.txt";
    }

        🎈 进行GC操作

执行消息数据文件的垃圾回收操作,使用复制算法完成,主要分为以下几步:

        (1)创建一个新的文件queue_data_new.txt

        (2)从旧文件中读取出所有的有效消息对象

        (3)把有效消息写入到queue_data_new.txt

        (4)删出旧的文件,并把新的文件重命名(queue_data_new.txt => queue_data.txt)

        (5)更新统计文件

public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {
//        进行gc的时候,是针对消息数据文件作出整体性的一个操作,在这个过程中,
//        进行加锁操作,让其他线程不能对该队列的消息文件作出任何修改
        synchronized (queue) {
            // 由于 gc 操作可能比较耗时, 此处统计一下执行消耗的时间.
            long gcBegin = System.currentTimeMillis();

            // 1. 创建一个新的文件
            File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));
            if (queueDataNewFile.exists()) {
                // 正常情况下, 这个文件不应该存在. 如果存在, 就是意外~~ 说明上次 gc 了一半, 程序意外崩溃了.
                throw new MqException("[MessageFileManager] gc 时发现该队列的 queue_data_new 已经存在! queueName=" + queue.getName());
            }
            boolean ok = queueDataNewFile.createNewFile();
            if (!ok) {
                throw new MqException("[MessageFileManager] 创建文件失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath());
            }

            // 2. 从旧的文件中, 读取出所有的有效消息对象了. (这个逻辑直接调用上述方法即可, 不必重新写了)
            LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());

            // 3. 把有效消息, 写入到新的文件中.
            try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)) {
                try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                    for (Message message : messages) {
                        byte[] buffer = BinaryTool.toBytes(message);
                        // 先写四个字节消息的长度
                        dataOutputStream.writeInt(buffer.length);
                        dataOutputStream.write(buffer);
                    }
                }
            }

            // 4. 删除旧的数据文件, 并且把新的文件进行重命名
            File queueDataOldFile = new File(getQueueDataPath(queue.getName()));
            ok = queueDataOldFile.delete();
            if (!ok) {
                throw new MqException("[MessageFileManager] 删除旧的数据文件失败! queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
            }
            // 把 queue_data_new.txt => queue_data.txt
            ok = queueDataNewFile.renameTo(queueDataOldFile);
            if (!ok) {
                throw new MqException("[MessageFileManager] 文件重命名失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath()
                        + ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
            }

            // 5. 更新统计文件
            Stat stat = readStat(queue.getName());
            stat.totalCount = messages.size();
            stat.validCount = messages.size();
            writeStat(queue.getName(), stat);

            long gcEnd = System.currentTimeMillis();
            System.out.println("[MessageFileManager] gc 执行完毕! queueName=" + queue.getName() + ", time="
                    + (gcEnd - gcBegin) + "ms");
        }
    }


五、测试MessageFileManager类


🍅 1、“准备工作”和“收尾工作”

创建MessagefileManagerTests 

@SpringBootTest
public class MessageFileManagerTests {
    private MessageFileManger messageFileManger = new MessageFileManger();

    private static final String queueName1 = "testQueue1";
    private static final String queueName2 = "testQueue2";

//  每个用例执行之前的准备工作
    @BeforeEach
    public void setUp() throws IOException {
//        准备阶段,创建出两个队列,以备后用
        messageFileManger.createQueueFiles(queueName1);
        messageFileManger.createQueueFiles(queueName2);

    }


//    每个用例执行之后的收尾工作
    @AfterEach
    public void tearDown() throws IOException {
//        收尾阶段,把创建出的队列销毁掉
        messageFileManger.destroyQueueFiles(queueName1);
        messageFileManger.destroyQueueFiles(queueName2);
    }
}


🍅 2、测试创建文件是否存在

@Test
    public void testCreateFiles(){
//        创建队列文件在准备工作已经执行过了,这里主要是为了验证文件是否存在
        File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");
//        assertEquals(预期值,实际值)
        Assertions.assertEquals(true,queueDataFile1.isFile());
        File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");
        Assertions.assertEquals(true,queueStatFile1.isFile());

        File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");
        Assertions.assertEquals(true,queueDataFile2.isFile());
        File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");
        Assertions.assertEquals(true,queueStatFile2.isFile());
    }

这里为了方便查看文件是否创建,就把收尾工作注释掉了

 


 🍅 3、测试writetStat和readStat是否能够通过

@Test
    public void testReadWriteStat(){
        MessageFileManger.Stat stat = new MessageFileManger.Stat();
        stat.totalCount = 100;
        stat.validCount =50;

//        由于writeStat和readStat是私有方法,此处就需要使用反射的方式
//        使用Spring封装好的反射的工具类
        ReflectionTestUtils.invokeMethod(messageFileManger,"writeStat", queueName1,stat);

//        写入完毕之后,调用读取,验证读取的结果和写入的数据是一致的
        MessageFileManger.Stat newStat = ReflectionTestUtils.invokeMethod(messageFileManger,"readStat",queueName1);
        Assertions.assertEquals(100,newStat.totalCount);
        Assertions.assertEquals(50,newStat.validCount);

        System.out.println("writetStat和readStat测试通过");
    }


 🍅 4、测试sendMessage

构造创建queue和message的方法:

 private MSGQueue createTestQueue(String queueName){
        MSGQueue queue = new MSGQueue();
        queue.setName(queueName);
        queue.setDurable(true);    //是否要持久化
        return queue;
    }

//    构造出一条消息
    private Message createTestMessage(String content){
        Message message = Message.createMessageWithId("testRoutingKey",null,content.getBytes());
        return message;
    }

测试sendMessage:

@Test
    public void testSendMessage() throws IOException, MqException, ClassNotFoundException {
//        构造出消息,并且构造出队列
        Message message = createTestMessage("testMessage");
//       创建queue对象
        MSGQueue queue = createTestQueue(queueName1);

//      调用发送消息的方法
        messageFileManger.sendMessage(queue,message);

//       检查stat文件
        MessageFileManger.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManger,"readStat",queueName1);
        Assertions.assertEquals(1,stat.totalCount);
        Assertions.assertEquals(1,stat.validCount);

//        检查文件,把消息读出来
        LinkedList<Message> messages = messageFileManger.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(1,messages.size());
        Message curMessage = messages.get(0);
        Assertions.assertEquals(message.getMessageId(), curMessage.getMessageId());
        Assertions.assertEquals(message.getDeliverMode(),curMessage.getDeliverMode());

//      比较两个字节数组的内容是否相同,不能直接使用asserEquals
        Assertions.assertArrayEquals(message.getBody(),curMessage.getBody());

        System.out.println("message = "+ curMessage);
    }

 

构造100条消息, 并且读取出来

 @Test
    public void testLoadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {
//      往队列中插入100条消息,验证100条消息从文件中读取之后,是否和最初是一致的
        MSGQueue queue = createTestQueue(queueName1);
        List<Message> expectedMessages = new LinkedList<>();
        for (int i = 0; i < 100; i++) {
            Message message = createTestMessage("testMessge" + 1);
            messageFileManger.sendMessage(queue,message);
            expectedMessages.add(message);
        }

//        读取所有消息
        LinkedList<Message> actualMessages = messageFileManger.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(expectedMessages.size(),actualMessages.size());
        for (int i = 0; i < expectedMessages.size(); i++) {
            Message expectedMessage = expectedMessages.get(i);
            Message actualMessage = actualMessages.get(i);
            System.out.println("[" + i + "]actualMessage = " + actualMessages);

            Assertions.assertEquals(expectedMessage.getMessageId(),actualMessage.getMessageId());
            Assertions.assertEquals(expectedMessage.getRoutingKey(),actualMessage.getRoutingKey());
            Assertions.assertEquals(expectedMessage.getDeliverMode(),actualMessage.getDeliverMode());
            Assertions.assertArrayEquals(expectedMessage.getBody(),actualMessage.getBody());
            Assertions.assertEquals(0x1,actualMessage.getIsValid());
        }
    }

 


🍅 5、测试删除消息

//    测试删除消息
    @Test
    public void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {
        // 创建队列, 写入 10 个消息. 删除其中的几个消息. 再把所有消息读取出来, 判定是否符合预期.
        MSGQueue queue = createTestQueue(queueName1);
        List<Message> expectedMessages = new LinkedList<>();
        for (int i = 0; i < 10; i++) {
            Message message = createTestMessage("testMessage" + i);
            messageFileManager.sendMessage(queue, message);
            expectedMessages.add(message);
        }

        // 删除其中的三个消息
        messageFileManager.deleteMessage(queue, expectedMessages.get(7));
        messageFileManager.deleteMessage(queue, expectedMessages.get(8));
        messageFileManager.deleteMessage(queue, expectedMessages.get(9));

        // 对比这里的内容是否正确.
        LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(7, actualMessages.size());
        for (int i = 0; i < actualMessages.size(); i++) {
            Message expectedMessage = expectedMessages.get(i);
            Message actualMessage = actualMessages.get(i);
            System.out.println("[" + i + "] actualMessage=" + actualMessage);

            Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());
            Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
            Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
            Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());
            Assertions.assertEquals(0x1, actualMessage.getIsValid());
        }
    }

🍅 6、测试垃圾回收

 @Test
    public void testGC() throws IOException, MqException, ClassNotFoundException {
        // 先往队列中写 100 个消息. 获取到文件大小.
        // 再把 100 个消息中的一半, 都给删除掉(比如把下标为偶数的消息都删除)
        // 再手动调用 gc 方法, 检测得到的新的文件的大小是否比之前缩小了.
        MSGQueue queue = createTestQueue(queueName1);
        List<Message> expectedMessages = new LinkedList<>();
        for (int i = 0; i < 100; i++) {
            Message message = createTestMessage("testMessage" + i);
            messageFileManager.sendMessage(queue, message);
            expectedMessages.add(message);
        }

        // 获取 gc 前的文件大小
        File beforeGCFile = new File("./data/" + queueName1 + "/queue_data.txt");
        long beforeGCLength = beforeGCFile.length();

        // 删除偶数下标的消息
        for (int i = 0; i < 100; i += 2) {
            messageFileManager.deleteMessage(queue, expectedMessages.get(i));
        }

        // 手动调用 gc
        messageFileManager.gc(queue);

        // 重新读取文件, 验证新的文件的内容是不是和之前的内容匹配
        LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(50, actualMessages.size());
        for (int i = 0; i < actualMessages.size(); i++) {
            // 把之前消息偶数下标的删了, 剩下的就是奇数下标的元素了.
            // actual 中的 0 对应 expected 的 1
            // actual 中的 1 对应 expected 的 3
            // actual 中的 2 对应 expected 的 5
            // actual 中的 i 对应 expected 的 2 * i + 1
            Message expectedMessage = expectedMessages.get(2 * i + 1);
            Message actualMessage = actualMessages.get(i);

            Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());
            Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
            Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
            Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());
            Assertions.assertEquals(0x1, actualMessage.getIsValid());
        }
        // 获取新的文件的大小
        File afterGCFile = new File("./data/" + queueName1 + "/queue_data.txt");
        long afterGCLength = afterGCFile.length();
        System.out.println("before: " + beforeGCLength);
        System.out.println("after: " + afterGCLength);
        Assertions.assertTrue(beforeGCLength > afterGCLength);
    }

六、小结

MessageFileManager主要是负责管理消息在文件中的存储:

        (1)设计了目录结构和文件格式

        (2)实现了目录创建和删除

        (3)实现了统计文件的读写

        (4)实现了消息的写入

        (5)实现了消息的删除

        (6)实现了加载所有消息

        (7)垃圾回收

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/840909.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

重锤式表面电阻测试仪的原理和特点

重锤式表面电阻测试仪是一种用于测量材料表面电阻的仪器。它采用了重锤敲击和测量电流的方式进行测试。 工作原理&#xff1a; 重锤式表面电阻测试仪通过将一个金属锤头敲击在待测物体表面&#xff0c;产生一个封闭电路。测量仪器通过检测在敲击区域上下电极之间距离的电流流…

C语言代码的x86-64汇编指令分析过程记录

先通过Xcode创建一个terminal APP&#xff0c;语言选择C。代码如下&#xff1a; #include <stdio.h>int main(int argc, const char * argv[]) {int a[7]{1,2,3,4,5,6,7};int *ptr (int*)(&a1);printf("%d\n",*(ptr));return 0; } 在return 0处打上断点&…

相机传感器格式与镜头光圈参数

相机靶面大小 CCD/CMOS图像传感器尺寸&#xff08;sensor format&#xff09;1/2’‘、1/3’‘、1/4’实际是多大 1英寸——靶面尺寸为宽12.7mm*高9.6mm&#xff0c;对角线16mm。 2/3英寸——靶面尺寸为宽8.8mm*高6.6mm&#xff0c;对角线11mm。 1/2英寸——靶面尺寸为宽6.…

第4章 变量、作用域与内存

引言 由于js是一门只有在声明变量后才能明确类型的语言&#xff0c;并且在任意时刻都可以改变数据类型。这也引起了一些问题 原始值与引用值 原始值就是基本数据类型&#xff0c;引言值就是复杂数据类型 变量在赋值的时候。js会判断如果是原始值&#xff0c;访问时就是按值访问…

Air32 | 合宙Air001单片机内部FLASH读写示例

Air32 | 合宙Air001单片机内部FLASH读写示例 代码已经通过测试&#xff0c;开发环境KEIL-MDK 5.36。 测试代码 void FLASH_RdWrTest(void) {uint32_t Address;uint32_t PageReadBuffer[FLASH_PAGE_SIZE >> 2];uint32_t PageWriteBuffer[FLASH_PAGE_SIZE >> 2];mem…

仓储物流业如何实现精细化工时成本管理,提升出库效率?

面对日益复杂的物流需求&#xff0c;以及不断上涨的人力成本。仓储物流企业想要在市场竞争中脱颖而出&#xff0c;不仅需要提升出库效率&#xff0c;满足消费者次日达甚至当日达的高要求&#xff0c;更需要控制人力成本&#xff0c;保证企业持续盈利的能力。盖雅新推出的 「劳…

恒运资本:史上最强暑期档!总票房突破147亿,前三都是国产片!

暑期档电影又爆了&#xff01; 就在刚刚曩昔的周末&#xff0c;在《封神第一部》《巨齿鲨2&#xff1a;深渊》《火热》等电影的大卖&#xff0c;以及《背注一掷》点映及预售的加持下&#xff0c;短短两天的大盘票房就到达10亿元。 其间&#xff0c;据猫眼专业版数据&#xff0…

Windows下安装Kafka(图文记录详细步骤)

Windows下安装Kafka Kafka简介一、Kafka安装前提安装Kafka之前&#xff0c;需要安装JDK、Zookeeper、Scala。1.1、JDK安装&#xff08;version&#xff1a;1.8&#xff09;1.1.1、JDK官网下载1.1.2、JDK网盘下载1.1.3、JDK安装 1.2、Zookeeper安装1.2.1、Zookeeper官网下载1.2.…

Java方法重写

目录 1.什么是方法重写 2.方法重写的规则 3.重写与重载的区别 1.什么是方法重写 重写&#xff08;override&#xff0c;也称为覆盖&#xff09;&#xff1a;在子类中对父类中允许访问的方法的实现过程进行重新编写&#xff0c;子类中方法的名称、返回值类型、参数列表与父类…

Redis安装以及配置隧道连接(centOs)

目录 1.centOs安装Redis 2. Redis 启动和停⽌ 3. 操作Redis 2.Xshell配置隧道 1.centOs安装Redis #使⽤yum安装Redis yum -y install redis 2. Redis 启动和停⽌ #查看是否启动 ps -ef|grep redis#启动redis: redis-server /etc/redis.conf &#停⽌Redis redis-cli sh…

【web逆向】全报文加密流量的去加密测试方案

aHR0cHM6Ly90ZGx6LmNjYi5jb20vIy9sb2dpbg 国密混合 WEB JS逆向篇 先看报文&#xff1a;请求和响应都是全加密&#xff0c;这种情况就不像参数加密可以方便全文搜索定位加密代码&#xff0c;但因为前端必须解密响应的密文&#xff0c;因此万能的方法就是搜索拦截器&#xff0c…

B站电商分析,如何发现近期热门商品及优质视频带货达人?

哔哩哔哩是我们熟知的以二次元为基调的视频内容生产平台&#xff0c;具有浓厚而专注的二次元社区氛围&#xff0c;随着b站的迅猛发展&#xff0c;b站由原先的二次元市场逐渐扩张到电子竞技、美妆、生活、纪录片等多个领域。而b站的营收主要来自平台广告变现&#xff0c;内容付费…

python爬虫2:requests库-原理

python爬虫2&#xff1a;requests库-原理 前言 ​ python实现网络爬虫非常简单&#xff0c;只需要掌握一定的基础知识和一定的库使用技巧即可。本系列目标旨在梳理相关知识点&#xff0c;方便以后复习。 目录结构 文章目录 python爬虫2&#xff1a;requests库-原理1. 概述2. re…

JSP实训项目设计报告—MVC简易购物商城

JSP实训项目设计报告—MVC简易购物商城 文章目录 JSP实训项目设计报告—MVC简易购物商城设计目的设计要求设计思路系统要求单点登录模块商品展示模块购物车展示模块 概要设计Model层View层Controller层 详细设计Model层View层登录界面系统主界面 Controller层 系统运行效果项目…

基于Orangepi 3 lts 的云台相机

利用orangepi 3 lts 和arduino nano 制作了一个云台相机&#xff0c;可用于室内监控。 硬件&#xff1a; orangepi 3 ,arduino nano ,usb相机&#xff0c;180度舵机两个 WeChat_20230806213004 软件&#xff1a; 整体采用mqtt进行消息的中转。 相机采用python 利用opencv…

PAT(Advanced Level)刷题指南 —— 第四弹

一、1104 Sum of Number Segments 1. 问题描述 2. Sample Input 4 0.1 0.2 0.3 0.43. Sample Output 5.004. 题解 思路:打表,比如4个数,第一个数出现1*(4) = 4次,第二个数出现2*(4 - 1) 

以太网协议学习笔记

以太网接口电路主要由MAC&#xff08;Media Access Control&#xff09;控制器和物理层接口PHY两大部分构成。 PHY在发送数据时&#xff0c;接收MAC发过来的数据&#xff0c;把并行的数据转化为串行流数据&#xff0c;按照物理层的编码规则把数据编码转化为模拟信号发送出去&am…

【Opencv入门到项目实战】(一):Opencv安装及图像基本操作

文章目录 0.Opencv介绍及环境配置1.图像读取1.1 彩色图像读取1.2 灰色图像读取 2.视频读取3.ROI读取3.1 图形切片处理3.2 提取颜色通道 4.图像填充5.数值运算与图像融合5.1 加法运算5.2 图像融合 6. 总结 0.Opencv介绍及环境配置 OpenCV是一个强大的计算机视觉库&#xff0c;它…

Dockerfile部署golang

使用go镜像打包&#xff0c;运行在容器内 redis和mysql用外部的 项目目录结构 w1go项目&#xff1a; Dockerfile # 这种方式是docker项目加上 本地的mysql和redis环境 # go打包的容器 FROM golang:alpine AS builder# 为我们镜像设置一些必要的环境变量 ENV GO111MODULEon …

Maven-生命周期及命令

关于本文 ✍写作原因 之前在学校学习的时候&#xff0c;编写代码使用的项目都是单体架构&#xff0c;导入开源框架依赖时只需要在pom.xml里面添加依赖&#xff0c;点一下reload按钮即可解决大部分需求&#xff1b;但是在公司使用了dubbo微服务架构之后发现只知道使用reload不足…