Message 存储
由于 Message 以后会是一个比较庞大的数据, 放在数据库里并不合适, 因此我们要将它放在二进制文件中
因为 Message 和 Queue 是捆绑在一起的, 因此我们将目录名设置成 QueueName, 然后这些目录是存储在之前的 data 里
就像这样:
在 testQueue 中有包含两个文件:
queue_data.txt 和 queue_stat.txt
queue_data.txt 中包含的是消息具体内容
然后我们规定在消息具体内容中的数据存储方式:
前面是消息长度, 后面接上消息的二进制数据, 这样才取出数据的时候也能计算出这些消息的
offsetBeg 和 offsetEnd
queue_stat.txt 中包含的是总数据数和有效数据数
因为我们在进行文件中消息的删除时不可能直接进行删除, 我们要做的就是把消息给拿出来, 将属性isvalid 设为 false, 再放回去, 同时有效数据数 -1 这样就相当于消息已经被删除了
MessageFileManager
在 datacenter 中创建一个 MessageFileManager 的类
先写对文件和目录进行创建的方法:
// 定义一个内部类, 来标识该队列的统计消息
// 使用 static, 静态内部类
static public class Stat{
public int totalCount;
public int validCount;
}
// 获得消息所在的路径, 里面有 queue_data.txt 和 queue_stat.txt
private String getMessageDir(String queueName){
return "./data/" + queueName;
}
// 获得 queue_data.txt 路径
private String getMessageDataPath(String queueName){
return getMessageDir(queueName) + "/queue_data.txt";
}
// 获得 queue_stat.txt 路径
private String getMessageStatPath(String queueName){
return getMessageDir(queueName) + "./queue_stat.txt";
}
private void writeStat(String queueName, Stat stat) throws IOException {
File statFile = new File(getMessageStatPath(queueName));
try(OutputStream outputStream = new FileOutputStream(statFile)){
try(PrintWriter printWriter = new PrintWriter(outputStream)){
printWriter.write(stat.totalCount+"\t"+stat.validCount);
printWriter.flush();
}
}
}
// 创建对应的文件和目录
public void createMessageFiles(String queueName) throws MqException, IOException {
// 查看目录在不在, 不在就创建
File baseDir = new File(getMessageDir(queueName));
if(!baseDir.exists()){
boolean ok = baseDir.mkdirs();
if(!ok){
throw new MqException("[MessageFileManager] 目录创建失败 queueName=" + queueName);
}
}
// 创建 queue_data.txt
File dataFile = new File(getMessageDataPath(queueName));
if(!dataFile.exists()){
boolean ok = dataFile.createNewFile();
if(!ok){
throw new MqException("[MessageFileManager] queue_data.txt 创建失败 queueName=" + queueName);
}
}
// 创建 queue_stat.txt
File statFile = new File(getMessageStatPath(queueName));
if(!dataFile.exists()){
boolean ok = statFile.createNewFile();
if(!ok){
throw new MqException("[MessageFileManager] queue_stat.txt 创建失败 queueName=" + queueName);
}
}
// 给queue_stat.txt 中插入初始值
Stat stat = new Stat();
stat.totalCount = 0;
stat.validCount = 0;
writeStat(queueName, stat);
}
// 销毁 queueName 对应的文件和目录
public void destroyMessageFile(String queueName) throws MqException {
// 先销毁 queue_data.txt 和 queue_stat.txt
File dataFile = new File(getMessageDataPath(queueName));
if(dataFile.exists()){
boolean ok = dataFile.delete();
if(!ok){
throw new MqException("[MessageFileManager] queue_data.file销毁失败 queueName=" + queueName);
}
}
File statFile = new File(getMessageStatPath(queueName));
if(dataFile.delete()){
boolean ok = statFile.delete();
if(!ok){
throw new MqException("[MessageFileManager] queue_stat.txt销毁失败 queueName=" + queueName);
}
}
File baseDir = new File(getMessageDir(queueName));
if(baseDir.exists()){
boolean ok = baseDir.delete();
if(!ok){
throw new MqException("[MessageFileManager] 目录销毁失败 queueName=" + queueName);
}
}
}
注: 这里的 MqException 是我们自己创建的 Exception
在 common 里进行创建 MqException:
往里面存数据:
因为要存二进制数据, 因此我们要再存之前要将 message 转换成 byte[] 然后存进去, 这就是序列化
// 检查目录和文件是否存在
public boolean checkFilesExits(String queueName) {
// 判定队列的数据文件和统计文件是否都存在!!
File queueDataFile = new File(getMessageDataPath(queueName));
if (!queueDataFile.exists()) {
return false;
}
File queueStatFile = new File(getMessageStatPath(queueName));
if (!queueStatFile.exists()) {
return false;
}
return true;
}
private Stat readStat(String queueName) throws IOException {
// 由于当前消息统计文件是文本文件, 可以直接使用 Scanner 来读取文件内容
Stat stat = new Stat();
File statFile = new File(getMessageStatPath(queueName));
try(InputStream inputStream = new FileInputStream(statFile)){
try(Scanner scanner = new Scanner(inputStream)){
stat.totalCount = scanner.nextInt();
stat.validCount = scanner.nextInt();
}
}
return stat;
}
public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
// 检查当前要写的文件是否存在
if(!checkFilesExits(queue.getName())){
throw new MqException("[MessageFileManager] 要写入的文件不存在 queueName=" +queue.getName());
}
// 把 message 序列化
byte[] messageBinary = BinaryTool.toBytes(message);
synchronized (queue){
// 先获取到当前队列数据文件的长度, 用来计算出该 Message 对象的 offsetBeg 和 offsetEnd
// 把新的 Message 数据, 写入到队列数据文件的末尾. 此时 Message 对象的 offsetNeg, 就是当前文件长度 + 4
// offsetEnd 就是当前文件长度 + 4 + message 自身长度
File dataFile = new File(getMessageDataPath(queue.getName()));
message.setOffsetBeg(dataFile.length()+4);
message.setOffsetEnd(dataFile.length()+4+messageBinary.length);
// message 写入
try(OutputStream outputStream = new FileOutputStream(dataFile)){
try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
// 写入长度
dataOutputStream.writeInt(messageBinary.length);
// 写入消息本体
dataOutputStream.write(messageBinary);
}
}
// 改变 stat 里的值
Stat stat = readStat(queue.getName());
stat.totalCount++;
stat.validCount++;
writeStat(queue.getName(), stat);
}
}
删除消息:
这里涉及到我们的反序列化:
// 通过将 message 拿出来, 然后将 iSValid 设成 null
// 在写入进行
public void deleteMessage(MSGQueue queue, Message message) throws MqException, IOException, ClassNotFoundException {
if(!checkFilesExits(queue.getName())){
throw new MqException("[MessageFileManager] 要删除消息的文件不存在 queueName=" +queue.getName());
}
File dataFile = new File(getMessageDataPath(queue.getName()));
synchronized (queue){
try(RandomAccessFile randomAccessFile = new RandomAccessFile(dataFile, "rw")){
// 1. 先从文件中读取对应的 Message 数据, 这里会把 buffSrc 给装满
byte[] bufferSrc = new byte[(int)(message.getOffsetEnd()- message.getOffsetBeg())];
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.read(bufferSrc);
// 2. 把当前读出来的二进制数据转换成 Message 对象
Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);
// 3. 把 isValid 设置成无效
diskMessage.setIsValid((byte)0x0);
// 4. 重新写入文件
byte[] bufferDest = BinaryTool.toBytes(diskMessage);
// 上面在读之后, 光标移到了下一个消息的位置
// 因此先将光标移回来
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.write(bufferDest);
}
// 更新统计文件
Stat stat = readStat(queue.getName());
if(stat.validCount > 0){
stat.validCount--;
}
writeStat(queue.getName(), stat);
}
}
得到指定目录中的所有消息:
// 将所有消息内容加载到内存中
// 这个方法准备在程序启动时调用
// 使用 LinkedList, 主要目的是为了进行头删操作
// 由于是在程序启动时进行调用, 此时服务器还不能处理请求, 因此不需要进行加锁
public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, ClassNotFoundException, MqException {
LinkedList<Message> messages = new LinkedList<>();
try(InputStream inputStream = new FileInputStream(getMessageDataPath(queueName))){
try(DataInputStream dataInputStream = new DataInputStream(inputStream)){
// 记录当前文件光标
long currentOffset = 0;
// 循环读取消息
while(true){
// 1. 读取当前消息的长度, 这里的 readInt 会读到文章末尾后
// 因此会抛出 EOFException 异常,
int messageSize = dataInputStream.readInt();
// 2. 按照长度读取到消息
byte[] buffer = new byte[messageSize];
int actualSize = dataInputStream.read(buffer);
if(messageSize != actualSize){
throw new MqException("[MessageFileManager] 文件格式错误! queueName=" + queueName);
}
// 3. 将读到的二进制数据反序列化会 Message对象
Message message = (Message) BinaryTool.fromBytes(buffer);
// 4. 判定一下看这个消息对象是不是无效对象
if(message.getIsValid() != 0x1){
// 虽然是无效内容, 但也要将 offset 更新
currentOffset += (4 + messageSize);
continue;
}
// 5. 有效数据则将其加入链表中, 加入前计算 offsetBeg 和 offsetEnd
message.setOffsetBeg(currentOffset + 4);
message.setOffsetBeg(currentOffset + 4 + messageSize);
currentOffset += (4 + messageSize);
messages.add(message);
}
} catch (EOFException e) {
// 这里的 catch 是预料中的情况, 是正常的业务逻辑
// 因此不需要去处理
System.out.println("[MessageFileManager] 恢复 Message 数据完成 queueName=" + queueName);
}
}
return messages;
}
垃圾回收(gc)
这里的垃圾回收策略是: 当总消息数大于 2000 条时, 有效消息数小于一半时. 进行垃圾回收
// GC回收策略
// 约定当消息大于 2000 条并且 有效消息 < 1/2 时 进行 gc
public boolean checkGC(String queueName) throws IOException {
// 判定是否要 GC
Stat stat = readStat(queueName);
if (stat.totalCount > 2000 && stat.totalCount / stat.validCount > 2) {
return true;
}
return false;
}
private String getQueueDataNewPath(String queueName){
return getMessageDir(queueName) + "/queue_data_new.txt";
}
// 使用复制算法进行gc
// 创建新文件 queue_data_new.txt, 将消息放入
// 删除 queue_data.txt
// 将 queue_data_new.txt 改为 queue_data.txt
public void gc(MSGQueue queue) throws IOException, MqException, ClassNotFoundException {
synchronized (queue){
// 计算一下gc消耗的时间
long gcBeg = System.currentTimeMillis();
// 1. 创建一个新文件
File dataNewFile = new File(getQueueDataNewPath(queue.getName()));
if(dataNewFile.exists()){
// 如果存在说明之前 gc 到一半, 程序崩溃了
throw new MqException("[MessageFIleManager] gc 时发现 queue_data_new.txt 存在! queueName=" + queue.getName());
}
boolean ok = dataNewFile.createNewFile();
if(!ok){
throw new MqException("[MessageFileManager] 创建 queue_data_new.txt 失败! DataNewFile=" + dataNewFile.getAbsolutePath());
}
// 2. 从旧的文件中, 读取出所有的有效消息对象(这个方法已经去除掉无效对象了)
LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());
// 3. 将 isValid = true 的消息写入新文件
try(OutputStream outputStream = new FileOutputStream(dataNewFile)){
try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
for(Message message : messages){
byte[] buffer = BinaryTool.toBytes(message);
// 先写消息长度
dataOutputStream.writeInt(buffer.length);
dataOutputStream.write(buffer);
}
}
}
// 4. 删除旧的数据文件, 并且把新的文件重新命名
File dataOldFile = new File(getMessageDataPath(queue.getName()));
ok = dataOldFile.delete();
if(!ok){
throw new MqException("[MessageFileManager] 删除旧的数据文件失败! dataOldFile=" + dataOldFile.getAbsolutePath());
}
// 把 queue_data_new.txt -> queue_data.txt
ok = dataNewFile.renameTo(dataOldFile);
if(!ok){
throw new MqException("[MessageFileManager] 文件重命名失败! dataNameFile=" + dataNewFile.getAbsolutePath());
}
// 更新统计文件
Stat stat = readStat(queue.getName());
stat.totalCount = messages.size();
stat.validCount = messages.size();
writeStat(queue.getName(), stat);
System.out.println("[MessageFileManager] 垃圾回收完成 queueName=" + queue.getName());
}
}