目录
一、引言
二、MemoryDataCenter
1.设计数据结构
2.封装Exchange方法
3.封装MsgQueue方法
4.封装Binding方法
5.封装Message
6.实现待确认消息的管理
7.将数据从硬盘上恢复到内存中
三、测试MemoryDataCenter
1.准备工作
2.测试交换机
3.测试队列
4.测试绑定
5.测试消息
6.测试发送消息
7.测试待确认消息
8.测试从硬盘上读取消息到内存
四、总结
一、引言
上一篇文章介绍了统一硬盘处理的操作,这一篇文章我们就简单介绍一下数据在内存里面的管理。
二、MemoryDataCenter
1.设计数据结构
// All tables are ConcurrentHashMap instances so the data center can be shared between threads.
// exchangeName -> Exchange
private ConcurrentHashMap<String,Exchange> exchangeMap = new ConcurrentHashMap<>();
// queueName -> MsgQueue
private ConcurrentHashMap<String,MsgQueue> queueMap = new ConcurrentHashMap<>();
// Bindings: nested map, exchangeName -> (queueName -> Binding).
// The field is called bindingsMap because that is the name every method of this class uses.
private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
// messageId -> Message
private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
// queueName -> messages held by that queue. LinkedList itself is not thread-safe;
// sendMessage/pollMessage/getMessageCount synchronize on the list before touching it.
private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
// Unacknowledged messages: nested map, queueName -> (messageId -> Message).
// Records which messages of a queue were taken by a consumer but not yet acknowledged.
private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();
2.封装Exchange方法
/*
Exchange operations
*/
/**
 * Stores an exchange in memory under its own name; an existing entry with
 * the same name is replaced.
 *
 * @param exchange exchange to store
 */
public void insertExchange(Exchange exchange){
    String exchangeName = exchange.getName();
    exchangeMap.put(exchangeName, exchange);
    System.out.println("[MemoryDataCenter] 交换机添加成功!exchangeName=" + exchangeName);
}
/**
 * Looks up an exchange by name.
 *
 * @param exchangeName name of the exchange to find
 * @return the stored exchange, or null when no exchange has that name
 */
public Exchange getExchange(String exchangeName){
    Exchange found = exchangeMap.get(exchangeName);
    return found;
}
/**
 * Removes the exchange stored under the given name and logs the removal.
 * The log line is printed whether or not an entry existed.
 *
 * @param exchangeName name of the exchange to remove
 */
public void deleteExchange(String exchangeName){
    exchangeMap.remove(exchangeName);
    String logLine = "[MemoryDataCenter] 交换机删除成功!exchangeName=" + exchangeName;
    System.out.println(logLine);
}
3.封装MsgQueue方法
/*
Queue operations
*/
/**
 * Stores a queue in memory under its own name; an existing entry with the
 * same name is replaced.
 *
 * @param queue queue to store
 */
public void insertQueue(MsgQueue queue){
    String queueName = queue.getName();
    queueMap.put(queueName, queue);
    System.out.println("[MemoryDataCenter] 队列添加成功!queueName=" + queueName);
}
/**
 * Looks up a queue by name.
 *
 * @param queueName name of the queue to find
 * @return the stored queue, or null when no queue has that name
 */
public MsgQueue getQueue(String queueName){
    MsgQueue found = queueMap.get(queueName);
    return found;
}
/**
 * Removes the queue stored under the given name and logs the removal.
 * The log line is printed whether or not an entry existed.
 *
 * @param queueName name of the queue to remove
 */
public void deleteQueue(String queueName){
    queueMap.remove(queueName);
    String logLine = "[MemoryDataCenter] 队列删除成功!queueName=" + queueName;
    System.out.println(logLine);
}
4.封装Binding方法
/*
Binding operations
*/
/**
 * Stores a binding under its exchange name and queue name.
 *
 * @param binding binding to store
 * @throws MqException if a binding for the same exchange/queue pair is already stored
 */
public void insertBinding(Binding binding) throws MqException {
    String exchangeName = binding.getExchangeName();
    String queueName = binding.getQueueName();
    // Create the per-exchange table on first use.
    ConcurrentHashMap<String,Binding> queueBindings =
            bindingsMap.computeIfAbsent(exchangeName, name -> new ConcurrentHashMap<>());
    // The existence check and the insert must happen as one step, hence the lock.
    synchronized (queueBindings){
        if(queueBindings.containsKey(queueName)){
            throw new MqException("[MemoryDataCenter] 绑定已经存在!exchangeName="+exchangeName
                    +",queueName="+queueName);
        }
        queueBindings.put(queueName, binding);
    }
    System.out.println("[MemoryDataCenter] 绑定添加成功!exchangeName="+exchangeName
            +",queueName="+queueName);
}
/**
 * Looks up the binding between one exchange and one queue.
 *
 * @param exchangeName name of the exchange
 * @param queueName name of the queue
 * @return the stored binding, or null when the exchange has no binding table
 *         or the table has no entry for the queue
 */
public Binding getBinding(String exchangeName,String queueName){
    ConcurrentHashMap<String,Binding> queueBindings = bindingsMap.get(exchangeName);
    return queueBindings == null ? null : queueBindings.get(queueName);
}
/**
 * Returns the binding table of one exchange.
 *
 * @param exchangeName name of the exchange
 * @return the live queueName -> Binding map kept for that exchange (not a copy),
 *         or null when the exchange has no binding table
 */
public ConcurrentHashMap<String,Binding> getBindings(String exchangeName){
    ConcurrentHashMap<String,Binding> queueBindings = bindingsMap.get(exchangeName);
    return queueBindings;
}
/**
 * Removes a binding from memory. The binding is located by its exchange
 * name and queue name.
 *
 * @param binding binding to remove
 * @throws MqException if no binding is stored for that exchange/queue pair,
 *         either because the exchange has no binding table or because the
 *         table has no entry for the queue
 */
public void deleteBinding(Binding binding) throws MqException {
    String exchangeName = binding.getExchangeName();
    String queueName = binding.getQueueName();
    ConcurrentHashMap<String,Binding> queueBindings = bindingsMap.get(exchangeName);
    // remove() returns null when the queue was never bound; report that the same
    // way as a missing exchange table instead of logging a deletion that did not happen.
    if(queueBindings==null || queueBindings.remove(queueName)==null){
        throw new MqException("[MemoryDataCenter] 绑定不存在!exchangeName="+exchangeName
                +",queueName="+queueName);
    }
    System.out.println("[MemoryDataCenter] 绑定删除成功!exchangeName="+exchangeName
            +",queueName="+queueName);
}
5.封装Message
/*
Message operations
*/
/**
 * Stores a message in the message table under its message id.
 *
 * @param message message to store
 */
public void addMessage(Message message){
    String messageId = message.getMessageId();
    messageMap.put(messageId, message);
    System.out.println("[MemoryDataCenter] 新消息添加成功!messageId=" + messageId);
}
/**
 * Looks up a message by id.
 *
 * @param messageId id of the message to find
 * @return the stored message, or null when no message has that id
 */
public Message getMessage(String messageId){
    Message found = messageMap.get(messageId);
    return found;
}
/**
 * Removes the message stored under the given id and logs the removal.
 * The log line is printed whether or not an entry existed.
 *
 * @param messageId id of the message to remove
 */
public void removeMessage(String messageId){
    messageMap.remove(messageId);
    String logLine = "[MemoryDataCenter] 消息删除成功!messageId=" + messageId;
    System.out.println(logLine);
}
/**
 * Appends a message to the tail of a queue's message list and registers it
 * in the message table.
 *
 * @param queue queue that receives the message; only its name is used
 * @param message message to deliver
 */
public void sendMessage(MsgQueue queue,Message message){
    String queueName = queue.getName();
    // Create the queue's list on first delivery.
    LinkedList<Message> pending = queueMessageMap.computeIfAbsent(queueName, name -> new LinkedList<>());
    // LinkedList is not thread-safe, so guard the append with the list's own lock.
    synchronized (pending){
        pending.addLast(message);
    }
    addMessage(message);
    System.out.println("[MemoryDataCenter] 消息投递到队列成功!messageId=" + message.getMessageId());
}
/**
 * Takes the message at the head of a queue's message list.
 *
 * @param queueName name of the queue to take from
 * @return the removed head message, or null when the queue has no list or
 *         the list is empty
 */
public Message pollMessage(String queueName){
    LinkedList<Message> pending = queueMessageMap.get(queueName);
    if(pending==null){
        return null;
    }
    synchronized (pending){
        if(pending.isEmpty()){
            return null;
        }
        Message head = pending.removeFirst();
        System.out.println("[MemoryDataCenter] 消息从队列中取出!messageId=" + head.getMessageId());
        return head;
    }
}
/**
 * Counts the messages currently held in a queue's message list.
 *
 * @param queueName name of the queue
 * @return the size of the queue's list, or 0 when the queue has no list
 */
public int getMessageCount(String queueName){
    LinkedList<Message> pending = queueMessageMap.get(queueName);
    if(pending!=null){
        // Read the size under the same lock the writers use.
        synchronized (pending){
            return pending.size();
        }
    }
    return 0;
}
6.实现待确认消息的管理
/*
Management of messages awaiting acknowledgement
*/
/**
 * Records a message as awaiting acknowledgement for the given queue.
 *
 * @param queueName name of the queue the message belongs to
 * @param message message to record; stored under its message id
 */
public void addMessageWaitAck(String queueName,Message message){
    // Create the per-queue table on first use.
    ConcurrentHashMap<String,Message> waiting =
            queueMessageWaitAckMap.computeIfAbsent(queueName, name -> new ConcurrentHashMap<>());
    String messageId = message.getMessageId();
    waiting.put(messageId, message);
    System.out.println("[MemoryDataCenter] 消息进入待确认队列!queueName=" + queueName);
}
/**
 * Removes a message from a queue's awaiting-acknowledgement table.
 * Does nothing, and logs nothing, when the queue has no such table.
 *
 * @param queueName name of the queue
 * @param messageId id of the message to remove
 */
public void removeMessageWaitAck(String queueName,String messageId){
    ConcurrentHashMap<String,Message> waiting = queueMessageWaitAckMap.get(queueName);
    if(waiting!=null){
        waiting.remove(messageId);
        System.out.println("[MemoryDataCenter] 消息从待确认队列中删除!messageId=" + messageId);
    }
}
/**
 * Looks up a message in a queue's awaiting-acknowledgement table.
 *
 * @param queueName name of the queue
 * @param messageId id of the message to find
 * @return the recorded message, or null when the queue has no table or the
 *         table has no entry for the id
 */
public Message getMessageWaitAck(String queueName,String messageId){
    ConcurrentHashMap<String,Message> waiting = queueMessageWaitAckMap.get(queueName);
    return waiting == null ? null : waiting.get(messageId);
}
7.将数据从硬盘上恢复到内存中
/**
 * Rebuilds the in-memory state from what is stored on disk.
 * Every table is emptied first, then exchanges, queues, bindings and the
 * messages of each queue are loaded from the given disk data center.
 *
 * @param diskDataCenter source of the persisted data
 * @throws IOException declared for the disk reads performed through diskDataCenter
 * @throws MqException declared for the disk reads performed through diskDataCenter
 * @throws ClassNotFoundException declared for the disk reads performed through diskDataCenter
 */
public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {
    // 1. Empty every table, including the unacknowledged-message table: the
    //    messages reloaded below go back into their queues, so entries left
    //    over from before the recovery would be stale.
    exchangeMap.clear();
    queueMap.clear();
    bindingsMap.clear();
    messageMap.clear();
    queueMessageMap.clear();
    queueMessageWaitAckMap.clear();
    // 2. Restore all exchanges.
    List<Exchange> exchanges = diskDataCenter.selectAllExchanges();
    for(Exchange exchange:exchanges){
        exchangeMap.put(exchange.getName(),exchange);
    }
    // 3. Restore all queues.
    List<MsgQueue> queues = diskDataCenter.selectAllQueues();
    for(MsgQueue queue:queues){
        queueMap.put(queue.getName(),queue);
    }
    // 4. Restore all bindings into the nested exchangeName -> queueName table.
    List<Binding> bindings = diskDataCenter.selectAllBindings();
    for(Binding binding:bindings){
        ConcurrentHashMap<String,Binding> queueBindings = bindingsMap.computeIfAbsent(binding.getExchangeName()
                ,k-> new ConcurrentHashMap<>());
        queueBindings.put(binding.getQueueName(),binding);
    }
    // 5. Restore the messages of every queue, indexing each one by id as well.
    for(MsgQueue queue:queues){
        LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName());
        queueMessageMap.put(queue.getName(),messages);
        for(Message message:messages){
            messageMap.put(message.getMessageId(),message);
        }
    }
}
三、测试MemoryDataCenter
1.准备工作
/**
 * Tests for MemoryDataCenter. Holds the fixture shared by the test methods
 * plus small factories for test exchanges, queues and messages.
 */
@SpringBootTest
public class MemoryDataCenterTests {
    private MemoryDataCenter memoryDataCenter = null;

    // Each test gets its own freshly constructed data center.
    @BeforeEach
    public void setUp(){
        this.memoryDataCenter = new MemoryDataCenter();
    }

    // Drop the reference so nothing carries over between tests.
    @AfterEach
    public void tearDown(){
        this.memoryDataCenter = null;
    }

    // Builds a durable, non-auto-delete DIRECT exchange with the given name.
    private Exchange createTestExchange(String exchangeName){
        Exchange result = new Exchange();
        result.setName(exchangeName);
        result.setType(ExchangeType.DIRECT);
        result.setDurable(true);
        result.setAutoDelete(false);
        return result;
    }

    // Builds a durable, non-exclusive, non-auto-delete queue with the given name.
    private MsgQueue createTestQueue(String queueName){
        MsgQueue result = new MsgQueue();
        result.setName(queueName);
        result.setDurable(true);
        result.setExclusive(false);
        result.setAutoDelete(false);
        return result;
    }

    // Builds a message whose body is the bytes of the given text.
    private Message createTestMessage(String content){
        byte[] body = content.getBytes();
        return Message.createMessageWithId("testRoutingKey",null,body);
    }
}
2.测试交换机
// Insert an exchange, read it back, delete it, and confirm it is gone.
@Test
public void testExchange(){
    final String name = "testExchange";
    Exchange stored = createTestExchange(name);
    memoryDataCenter.insertExchange(stored);
    Exchange fetched = memoryDataCenter.getExchange(name);
    Assertions.assertEquals(stored,fetched);

    memoryDataCenter.deleteExchange(name);
    Assertions.assertNull(memoryDataCenter.getExchange(name));
}
3.测试队列
// Insert a queue, read it back, delete it, and confirm it is gone.
@Test
public void testQueue(){
    final String name = "testQueue";
    MsgQueue stored = createTestQueue(name);
    memoryDataCenter.insertQueue(stored);
    MsgQueue fetched = memoryDataCenter.getQueue(name);
    Assertions.assertEquals(stored,fetched);

    memoryDataCenter.deleteQueue(name);
    Assertions.assertNull(memoryDataCenter.getQueue(name));
}
4.测试绑定
// Insert a binding, read it back both directly and through the per-exchange
// table, delete it, and confirm it is gone.
@Test
public void testBinding() throws MqException {
    final String exchangeName = "testExchange";
    final String queueName = "testQueue";
    Binding stored = new Binding();
    stored.setExchangeName(exchangeName);
    stored.setQueueName(queueName);
    stored.setBindingKey("testBindingKey");
    memoryDataCenter.insertBinding(stored);

    Binding fetched = memoryDataCenter.getBinding(exchangeName,queueName);
    Assertions.assertEquals(stored,fetched);

    ConcurrentHashMap<String,Binding> exchangeBindings = memoryDataCenter.getBindings(exchangeName);
    Assertions.assertEquals(1,exchangeBindings.size());
    Assertions.assertEquals(stored,exchangeBindings.get(queueName));

    memoryDataCenter.deleteBinding(stored);
    Assertions.assertNull(memoryDataCenter.getBinding(exchangeName,queueName));
}
5.测试消息
// Add a message, read it back by id, remove it, and confirm it is gone.
@Test
public void testMessage(){
    Message stored = createTestMessage("testMessgae");
    memoryDataCenter.addMessage(stored);
    Message fetched = memoryDataCenter.getMessage(stored.getMessageId());
    Assertions.assertEquals(stored,fetched);

    memoryDataCenter.removeMessage(stored.getMessageId());
    Assertions.assertNull(memoryDataCenter.getMessage(stored.getMessageId()));
}
6.测试发送消息
// Deliver ten messages to one queue, drain the queue, and check that the
// messages come out in the order they went in.
@Test
public void testSendMessage(){
    MsgQueue targetQueue = createTestQueue("testQueue");
    List<Message> sent = new ArrayList<>();
    int index = 0;
    while(index<10){
        Message outgoing = createTestMessage("testMessage"+index);
        memoryDataCenter.sendMessage(targetQueue,outgoing);
        sent.add(outgoing);
        index++;
    }

    // Poll until the queue reports it is empty by returning null.
    List<Message> received = new ArrayList<>();
    for(Message polled = memoryDataCenter.pollMessage("testQueue");
            polled!=null;
            polled = memoryDataCenter.pollMessage("testQueue")){
        received.add(polled);
    }

    Assertions.assertEquals(sent.size(),received.size());
    for(int position=0;position<received.size();position++){
        Assertions.assertEquals(sent.get(position),received.get(position));
    }
}
7.测试待确认消息
// Record a message as awaiting acknowledgement, read it back, remove it,
// and confirm it is gone.
@Test
public void testMessageWaitAck(){
    final String queueName = "testQueue";
    Message stored = createTestMessage("expectedMessage");
    memoryDataCenter.addMessageWaitAck(queueName,stored);
    Message fetched = memoryDataCenter.getMessageWaitAck(queueName,stored.getMessageId());
    Assertions.assertEquals(stored,fetched);

    memoryDataCenter.removeMessageWaitAck(queueName,stored.getMessageId());
    Assertions.assertNull(memoryDataCenter.getMessageWaitAck(queueName,stored.getMessageId()));
}
8.测试从硬盘上读取消息到内存
// Write an exchange, a queue, a binding and a message through DiskDataCenter,
// run recovery, and check that each of them is readable from memory.
@Test
public void testRecovery() throws IOException, MqException, ClassNotFoundException {
    SpringDemoMqApplication.context = SpringApplication.run(SpringDemoMqApplication.class);
    try {
        // Put one object of every kind on disk.
        DiskDataCenter diskDataCenter = new DiskDataCenter();
        diskDataCenter.init();
        Exchange expectedExchange = createTestExchange("testExchange");
        diskDataCenter.insertExchange(expectedExchange);
        MsgQueue expectedQueue = createTestQueue("testQueue");
        diskDataCenter.insertQueue(expectedQueue);
        Binding expectedBinding = new Binding();
        expectedBinding.setExchangeName("testExchange");
        expectedBinding.setQueueName("testQueue");
        expectedBinding.setBindingKey("testBindingKey");
        diskDataCenter.insertBinding(expectedBinding);
        Message expectedMessage = createTestMessage("testContent");
        diskDataCenter.sendMessage(expectedQueue,expectedMessage);

        memoryDataCenter.recovery(diskDataCenter);

        // The recovered objects are compared field by field rather than with equals().
        Exchange actualExchange = memoryDataCenter.getExchange("testExchange");
        Assertions.assertEquals(expectedExchange.getName(),actualExchange.getName());
        Assertions.assertEquals(expectedExchange.getType(),actualExchange.getType());
        MsgQueue actualQueue = memoryDataCenter.getQueue("testQueue");
        Assertions.assertEquals(expectedQueue.getName(),actualQueue.getName());
        Assertions.assertEquals(expectedQueue.isExclusive(),actualQueue.isExclusive());
        Binding actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");
        Assertions.assertEquals(expectedBinding.getExchangeName(),actualBinding.getExchangeName());
        Assertions.assertEquals(expectedBinding.getQueueName(),actualBinding.getQueueName());
        Message actualMessage = memoryDataCenter.pollMessage("testQueue");
        Assertions.assertEquals(expectedMessage.getMessageId(),actualMessage.getMessageId());
        Assertions.assertEquals(expectedMessage.getRoutingKey(),actualMessage.getRoutingKey());
        Assertions.assertEquals(expectedMessage.getDeliverMode(),actualMessage.getDeliverMode());
        Assertions.assertArrayEquals(expectedMessage.getBody(),actualMessage.getBody());
    } finally {
        // Clean up even when an assertion or a disk call fails, so a failed run
        // does not leave the Spring context open or ./data behind for later tests.
        SpringDemoMqApplication.context.close();
        File dataDir = new File("./data");
        FileUtils.deleteDirectory(dataDir);
    }
}
四、总结
本篇文章主要介绍了一下如何在内存中管理数据,代码层面也是对相关概念进行操作以及测试,下一篇文章我们将学习虚拟主机设计内容,感谢观看!