文章目录
- 一、硬盘管理
- 1, 创建 DiskDataCenter 类
- 2, init() 初始化
- 3, 封装交换机
- 4, 封装队列
- 5, 关于绑定
- 6, 关于消息
- 二、内存管理
- 1, 数据结构的设计
- 2, 创建 MemoryDataCenter 类
- 3, 关于交换机
- 4, 关于队列
- 5, 关于绑定
- 6, 关于消息
- 7, 恢复数据
- 三、小结
创建 Spring Boot 项目, Spring Boot 2 系列版本, Java 8 , 引入 MyBatis, Lombok 依赖
提示:是正在努力进步的小菜鸟一只,如有大佬发现文章欠佳之处欢迎批评指点~ 废话不多说,直接上干货!
整体目录结构 :
本文主要实现 server 包
一、硬盘管理
硬盘管理包括第二篇文章中实现的 DataBaseManager 类, 实现了数据库存储(交换机, 队列, 绑定)和第三篇文章中实现的 MessageFileManager 类, 实现了文件存储(消息)
接下来使用 DiskDataCenter 这个类, 对上述两个类进一步的整合, 封装, 统一对上层提供数据库和文件操作的 API
整合的好处
上层代码(服务器中的 VirtualHost 在操作硬盘数据的时候, 直接调用 DiskDataCenter 类的 API 即可, 不需要关心硬盘上的数据来自数据库还是文件, 也不需要知道具体是怎么实现的)
1, 创建 DiskDataCenter 类
要有两个成员属性, 分别是 dataBaseManager 对象和 messageFileManager 对象, 我们要持有这两个类的 API, 才能进一步封装
public class DiskDataCenter {
private DataBaseManager dataBaseManager = new DataBaseManager();
private MessageFileManager messageFileManager = new MessageFileManager();
}
2, init() 初始化
dataBaseManager 中需要对数据库进行初始化, 所以在当前类提供一个初始化 API
public void init() {
dataBaseManager.init();
}
3, 封装交换机
public void insertExchange(Exchange exchange) {
dataBaseManager.insertExchange(exchange);
}
public void deleteExchange(String exchangeName) {
dataBaseManager.deleteExchange(exchangeName);
}
public List<Exchange> selectAllExchanges() {
return dataBaseManager.selectAllExchanges();
}
4, 封装队列
创建队列的时候要创建对应的目录( queue 目录)和文件( data 文件和 stat 文件), 同理, 删除的时候和也要删除对应的目录和文件
public void insertQueue(MessageQueue queue) throws IOException {
dataBaseManager.insertQueue(queue);
messageFileManager.createQueueFiles(queue.getName());
}
public void deleteQueue(String queueName) throws IOException {
dataBaseManager.deleteQueue(queueName);
messageFileManager.deleteFiles(queueName);
}
public List<MessageQueue> selectAllQueue() {
return dataBaseManager.selectAllQueues();
}
5, 关于绑定
public void insertBinding(Binding binding) {
dataBaseManager.insertBinding(binding);
}
public void deleteBinding(Binding binding) {
dataBaseManager.deleteBinding(binding);
}
public List<Binding> selectAllBindings() {
return dataBaseManager.selectAllBindings();
}
6, 关于消息
删除消息的时候顺便查一下 stat 文件, 如果需要GC, 就调用实现GC的方法(读一下 stat 文件并不是很耗时)
public void sendMessage(MessageQueue queue, Message message) throws IOException, MQException {
messageFileManager.sendMessage(queue, message);
}
public void deleteMessage(MessageQueue queue, Message message) throws IOException, MQException {
messageFileManager.deleteMessage(queue, message);
// 这是逻辑删除, 而不是真正从文件中删掉这个数据, 并且需要判断是否需要进行垃圾回收
if (messageFileManager.isNeedGC(queue.getName())) {
messageFileManager.gc(queue);
}
}
public LinkedList<Message> selectAllMessages(String queueName) throws MQException, IOException {
return messageFileManager.loadAllMessageFromQueue(queueName);
}
二、内存管理
硬盘上存储数据, 只是为了实现 “持久化” 这样的效果. 但是实际的消息存储/转发, 还是主要靠内存的结构
对于 MQ 来说, 内存部分是更关键的, 内存速度更快, 可以达成更⾼的并发
对于交换机, 队列, 绑定, 消息的查找, 直接在内存中找, 增加, 删除需要在管理内存的同时也管理硬盘
1, 数据结构的设计
需要在内存中存储交换机, 队列, 绑定, 交换机, 就需要使用合适的数据结构来辅助我们更方便的存储和管理数据, 并且还需要保证线程安全
- 交换机
使用 ConcurrentHashMap<String, Exchange>,key: exchangeName
,value: exchange
- 队列
使用 ConcurrentHashMap<String, MessageQueue>,key: queueName
,value: queue
- 绑定
使用(嵌套的) ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>>,key: exchangeName
,value: [key: queueName, value: binding]
生产者发布消息的时候, 要指定一个 exchangeName, 服务器需要拿着这个 exchangeName, 在绑定表里查有没有已经绑定过的队列, 如果有队列, 并且符合 bindingKey 和 routingKey 的匹配规则, 才能转发消息
绑定是从交换机和队列这两个维度建立的, 所以使用嵌套的 Map 存储绑定表
-
消息
使用 ConcurrentHashMap<String, Message>,key: messageId
,value: message
-
在队列里的 N 条消息
使用 ConcurrentHashMap<String, LinkedList< Message>>,key: queueName
,value: 该队列中存储消息的链表
-
在队列里的 N 条未确认的消息( “待确认队列” )
使用(嵌套的) ConcurrentHashMap<String, ConcurrentHashMap<String, Message>>,key: queueName
,value: [key: messageId, value: message]
- 之前的文章说明过, 消费者消费消息采用"推"的方式, 即: 队列中有消息之后, 服务器主动推送给订阅了该队列的消费者
- 推送之后, 如果消费者是手动应答
- 在消费者还没应答之前, 服务器视为消费者还没成功消费消息, 需要备份这条消息, 所以这个嵌套的Map 相当于一个 “待确认队列”
- 消费者确认应答之后, 服务器再从这个 “待确认队列” 中删除该消息
2, 创建 MemoryDataCenter 类
public class MemoryDataCenter {
private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, MessageQueue> queueMap = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, LinkedList<Message>> messageInQueueMap = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> messageInQueueNotAckMap = new ConcurrentHashMap<>();
}
3, 关于交换机
public void addExchange(Exchange exchange) {
exchangeMap.put(exchange.getName(), exchange);
System.out.println("[MemoryDataCenter.addExchange()] " + exchange.getName() + "交换机添加成功");
}
public Exchange getExchange(String exchangeName) {
return exchangeMap.get(exchangeName);
}
public void removeExchange(String exchangeName) {
exchangeMap.remove(exchangeName);
System.out.println("[MemoryDataCenter.removeExchange()] " + exchangeName + "交换机删除成功");
}
4, 关于队列
public void addQueue(MessageQueue queue) {
queueMap.put(queue.getName(), queue);
System.out.println("[MemoryDataCenter.addQueue()] " + queue.getName() + "队列添加成功");
}
public MessageQueue getQueue(String queueName) {
return queueMap.get(queueName);
}
public void removeQueue(String queueName) {
queueMap.remove(queueName);
System.out.println("[MemoryDataCenter.removeQueue()] " + queueName + "队列删除成功");
}
5, 关于绑定
- 1, 添加绑定
注意 bindings
Map 和 bindingMap 不同, bindingMap 表示 bindings
Map 的 value 值
使用 bindings
Map.computeIfAbsent(key) 可以创建 bindings
Map 的value 值( bindingMap )
public void addBinding(Binding binding) throws MQException {
String exchangeName = binding.getExchangeName();
String queueName = binding.getQueueName();
// ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
// if (bindingMap == null) {
// bindingMap = new ConcurrentHashMap<>();
// bindingsMap.put(binding.getExchangeName(), bindingMap);
// }
// 1, 先查是否存在exchange绑定的queue, 没有则创建
ConcurrentHashMap<String, Binding> bindingMap =
bindingsMap.computeIfAbsent(exchangeName, V -> new ConcurrentHashMap<>());
// 2, 再查exchange和queue是否已经存在绑定, 没有创建
synchronized (bindingMap) {
if (bindingMap.get(exchangeName) != null) {
// 如果 exchange-queue 这个绑定已经存在, 不能再插入
throw new MQException("[MemoryDataCenter.addBinding()] exchangeName = " + queueName
+ "-queueName = " + queueName + "的绑定已经存在, 添加失败");
}
bindingMap.put(queueName, binding);
}
System.out.println("[MemoryDataCenter.addBinding()] exchangeName = " + exchangeName
+ "-queueName = " + queueName + "的绑定已经添加成功");
}
- 2, 获取绑定(根据交换机和队列获取绑定)
public Binding getBinding(String exchangeName, String queueName) {
ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);
if (bindingMap == null) {
return null;
}
return bindingMap.get(queueName);
}
- 3, 获取绑定(根据交换机, 获取该交换机的所有绑定关系)
public ConcurrentHashMap<String, Binding> getBindings(String exchangeName){
return bindingsMap.get(exchangeName);
}
- 4, 删除绑定
public void removeBinding(Binding binding) throws MQException {
// 1, 先判断该交换机是否存在绑定
String exchangeName = binding.getExchangeName();
ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);
if (bindingMap == null) {
throw new MQException("[MemoryDataCenter.removeBinding()] exchangeName = " + exchangeName + "该交换机的绑定不存在, 删除失败");
}
// 2, 交换机不存在绑定才能删除
bindingsMap.remove(binding.getExchangeName());
System.out.println("[MemoryDataCenter.removeBinding()] exchangeName = " + exchangeName + "该交换机的绑定已删除成功");
}
6, 关于消息
- 关于 messageMap
public void addMessage(Message message) {
messageMap.put(message.getMessageId(), message);
System.out.println("[MemoryDataCenter.addMessage()] 消息添加成功");
}
public Message getMessage(String messageId) {
return messageMap.get(messageId);
}
public void removeMessage(String messageId){
messageMap.remove(messageId);
System.out.println("MemoryDataCenter.removeMessage()] 消息删除成功");
}
- 关于 messageInQueueMap
public void sendMessage(MessageQueue queue, Message message) {
LinkedList<Message> messageList = messageInQueueMap.computeIfAbsent(queue.getName(), V -> new LinkedList<>());
synchronized (messageList) {
messageList.add(message); // 尾插
}
addMessage(message);
System.out.println("[MemoryDataCenter.sendMessage()] 消息发送成功");
}
/**
* 从队列中取走消息, 但不代表消息在内存中没了, 在 messageMap 中还保留
*/
public Message pollMessage(MessageQueue queue) {
LinkedList<Message> messageList = messageInQueueMap.get(queue.getName());
if (messageList == null || messageList.size() == 0) {
return null;
}
synchronized (messageList) {
Message result = messageList.removeFirst(); // 头删
System.out.println("[MemoryDataCenter.pollMessage()] 消息取出成功");
return result;
}
}
/**
* 获取队列中的消息总数
*/
public int getMessageCount(MessageQueue queue) {
LinkedList<Message> messageList = messageInQueueMap.get(queue.getName());
if (messageList == null) {
return 0;
}
return messageList.size();
}
- 关于 messageInQueueNotAckMap
public void addMessageNotAck(String queueName, Message message) {
ConcurrentHashMap<String, Message> messageNotAckMap = messageInQueueNotAckMap.computeIfAbsent(queueName,
V -> new ConcurrentHashMap<>());
messageNotAckMap.put(message.getMessageId(), message);
System.out.println("[MemoryDataCenter.addMessageNotAck()] 未确认消息添加成功");
}
public void removeMessageNotAck(String queueName, String messageId) {
ConcurrentHashMap<String, Message> messageNotAckMap = messageInQueueNotAckMap.get(queueName);
if (messageNotAckMap == null) {
return;
}
messageNotAckMap.remove(messageId);
System.out.println("[MemoryDataCenter.removeMessageNotAck()] 未确认消息删除成功");
}
public Message getMessageNotAck(String queueName, String messageId) {
ConcurrentHashMap<String, Message> messageNotAckMap = messageInQueueNotAckMap.get(queueName);
if (messageNotAckMap == null) {
return null;
}
return messageNotAckMap.get(messageId);
}
–
7, 恢复数据
当服务器重启时, 需要把硬盘上的数据恢复到内存中
public void recover(DiskDataCenter diskDataCenter) throws MQException, IOException {
// 1, 清楚所有内存数据(防止残留)
exchangeMap.clear();
queueMap.clear();
bindingsMap.clear();
messageMap.clear();
messageInQueueMap.clear();
messageInQueueNotAckMap.clear();
// 2. 从硬盘中获取数据并存储在内存
// 2.1, 获取交换机数据
List<Exchange> exchangeList = diskDataCenter.selectAllExchanges();
for (Exchange exchange : exchangeList) {
exchangeMap.put(exchange.getName(), exchange);
}
// 2.2, 获取队列数据
List<MessageQueue> queueList = diskDataCenter.selectAllQueue();
for (MessageQueue queue : queueList) {
queueMap.put(queue.getName(), queue);
}
// 2.3, 获取绑定数据
List<Binding> bindingList = diskDataCenter.selectAllBindings();
for (Binding binding : bindingList) {
ConcurrentHashMap<String, Binding> bindingMap =
bindingsMap.computeIfAbsent(binding.getExchangeName(), V -> new ConcurrentHashMap<>());
bindingMap.put(binding.getQueueName(), binding);
}
// 2.4, 获取消息数据
for (MessageQueue queue : queueList) {
LinkedList<Message> messageList = diskDataCenter.selectAllMessages(queue.getName());
messageInQueueMap.put(queue.getName(), messageList);
for (Message message : messageList) {
messageMap.put(message.getMessageId(), message);
}
}
}
恢复消息数据, 只需要恢复 messageInQueueMap 和 messageMap , 不需要关心 messageInQueueNotAckMap 因为 messageInQueueNotAckMap 只是在内存中存储, 如果服务器重启, 这块数据丢失了就丢失了, 因为 “未确认的消息” 不会被服务器真正的删除, 重启之后, “未确认的消息” 会被重新加载回内存
三、小结
本文主实现了两点:
- 1, 实现了对数据库+文件这两个模块的进一步整合, 封装, 为上层提供了硬盘数据管理的 API
- 2, 实现了内存数据管理, 并考虑了线程安全
-
- 2.1, 设计了消息在内存中存储使用的数据结构
-
- 2.2, 实现了内存中的交换机, 队列, 绑定, 消息, 以及消息和队列的关联, 这些数据的增删查
至此, 服务器对于硬盘数据和内存数据都做好了管理, 下篇文章就可以设计 VirtualHost 了, 需要对硬盘数据和内存数据再做进一步的整合, 封装, 统一管理