Java项目--仿RabbitMQ的消息队列--内存数据管理

news2024/12/18 6:32:08

目录

一、引言

二、MemoryDataCenter

1.设计数据结构

2.封装Exchange方法

3.封装MsgQueue方法

4.封装Binding方法

5.封装Message

6.实现待确定消息的管理

7.将数据从硬盘上恢复到内存中

三、测试MemoryDataCenter

1.准备工作

2.测试交换机

3.测试队列

4.测试绑定

5.测试消息

6.测试发送消息

7.测试待确认消息

8.测试从硬盘上读取消息到内存

四、总结


一、引言

  上一篇文章介绍了统一硬盘处理的操作,这一篇文章我们就简单介绍一下数据在内存里面的管理。

二、MemoryDataCenter

1.设计数据结构

// 此处为了线程安全,我们使用ConcurrentHashMap这样的数据结构
    private ConcurrentHashMap<String,Exchange> exchangeMap = new ConcurrentHashMap<>();

    private ConcurrentHashMap<String,MsgQueue> queueMap = new ConcurrentHashMap<>();

    // 绑定:使用嵌套的HashMap,key是exchangeName,value也是一个HashMap(key是queueName,value是Binding对象)
    private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingMap = new ConcurrentHashMap<>();

    private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();

    private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();

    // 表示“未被确认”的消息:使用嵌套的HashMap,key是queueName,value是HashMap(key是messageId,value是Message对象)
    // 存储当前队列中哪些消息被消费者取走,但是还没有应答
    private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();

2.封装Exchange方法

/*
    封装交换机
     */
    public void insertExchange(Exchange exchange){
        exchangeMap.put(exchange.getName(),exchange);
        System.out.println("[MemoryDataCenter] 交换机添加成功!exchangeName="+exchange.getName());
    }

    public Exchange getExchange(String exchangeName){
        return exchangeMap.get(exchangeName);
    }

    public void deleteExchange(String exchangeName){
        exchangeMap.remove(exchangeName);
        System.out.println("[MemoryDataCenter] 交换机删除成功!exchangeName="+exchangeName);
    }

3.封装MsgQueue方法

/*
    封装队列
     */
    public void insertQueue(MsgQueue queue){
        queueMap.put(queue.getName(),queue);
        System.out.println("[MemoryDataCenter] 队列添加成功!queueName="+queue.getName());
    }

    public MsgQueue getQueue(String queueName){
        return queueMap.get(queueName);
    }

    public void deleteQueue(String queueName){
        queueMap.remove(queueName);
        System.out.println("[MemoryDataCenter] 队列删除成功!queueName="+queueName);
    }

4.封装Binding方法

/*
    封装绑定
     */
    public void insertBinding(Binding binding) throws MqException {
        ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
                                                        k -> new ConcurrentHashMap<>());
        synchronized (bindingMap){
            if(bindingMap.get(binding.getQueueName())!=null){
                throw new MqException("[MemoryDataCenter] 绑定已经存在!exchangeName="+binding.getExchangeName()
                        +",queueName="+binding.getQueueName());
            }
            bindingMap.put(binding.getQueueName(),binding);
        }
        System.out.println("[MemoryDataCenter] 绑定添加成功!exchangeName="+binding.getExchangeName()
                +",queueName="+ binding.getQueueName());
    }

    public Binding getBinding(String exchangeName,String queueName){
        ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(exchangeName);
        if(bindingMap==null){
            return null;
        }
        return bindingMap.get(queueName);
    }

    public ConcurrentHashMap<String,Binding> getBindings(String exchangeName){
        return bindingsMap.get(exchangeName);
    }

    public void deleteBinding(Binding binding) throws MqException {
        ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
        if(bindingMap==null){
            throw new MqException("[MemoryDataCenter] 绑定不存在!exchangeName="+binding.getExchangeName()
                    +",queueName="+binding.getQueueName());
        }
        bindingMap.remove(binding.getQueueName());
        System.out.println("[MemoryDataCenter] 绑定删除成功!exchangeName="+binding.getExchangeName()
                +",queueName="+binding.getQueueName());
    }

5.封装Message

/*
    封装消息
     */
    public void addMessage(Message message){
        messageMap.put(message.getMessageId(),message);
        System.out.println("[MemoryDataCenter] 新消息添加成功!messageId="+message.getMessageId());
    }

    public Message getMessage(String messageId){
        return messageMap.get(messageId);
    }

    public void removeMessage(String messageId){
        messageMap.remove(messageId);
        System.out.println("[MemoryDataCenter] 消息删除成功!messageId="+messageId);
    }

    public void sendMessage(MsgQueue queue,Message message){
        LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(),k->new LinkedList<>());
        synchronized (messages){
            messages.add(message);
        }
        addMessage(message);
        System.out.println("[MemoryDataCenter] 消息投递到队列成功!messageId="+message.getMessageId());
    }

    public Message pollMessage(String queueName){
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        if(messages==null){
            return null;
        }
        synchronized (messages){
            if(messages.size()==0){
                return null;
            }
            Message currentMessage = messages.remove(0);
            System.out.println("[MemoryDataCenter] 消息从队列中取出!messageId="+currentMessage.getMessageId());
            return currentMessage;
        }
    }

    public int getMessageCount(String queueName){
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        if(messages==null){
            return 0;
        }
        synchronized (messages){
            return messages.size();
        }
    }

6.实现待确定消息的管理

/*
    实现待确定消息的管理
     */
    public void addMessageWaitAck(String queueName,Message message){
        ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName
                                    ,k->new ConcurrentHashMap<>());
        messageHashMap.put(message.getMessageId(),message);
        System.out.println("[MemoryDataCenter] 消息进入待确认队列!queueName="+queueName);
    }

    public void removeMessageWaitAck(String queueName,String messageId){
        ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
        if(messageHashMap==null){
            return;
        }
        messageHashMap.remove(messageId);
        System.out.println("[MemoryDataCenter] 消息从待确认队列中删除!messageId="+messageId);
    }

    public Message getMessageWaitAck(String queueName,String messageId){
        ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
        if(messageHashMap==null){
            return null;
        }
        return messageHashMap.get(messageId);
    }

7.将数据从硬盘上恢复到内存中

public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {
        // 1.清空所有哈希表
        exchangeMap.clear();
        queueMap.clear();
        bindingsMap.clear();
        messageMap.clear();
        queueMessageMap.clear();
        // 2.恢复所有交换机数据
        List<Exchange> exchanges = diskDataCenter.selectAllExchanges();
        for(Exchange exchange:exchanges){
            exchangeMap.put(exchange.getName(),exchange);
        }
        // 3.恢复所有队列数据
        List<MsgQueue> queues = diskDataCenter.selectAllQueues();
        for(MsgQueue queue:queues){
            queueMap.put(queue.getName(),queue);
        }
        // 4.恢复所有绑定数据
        List<Binding> bindings = diskDataCenter.selectAllBindings();
        for(Binding binding:bindings){
            ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName()
                                                            ,k-> new ConcurrentHashMap<>());
            bindingMap.put(binding.getQueueName(),binding);
        }
        // 5.恢复所有消息数据
        for(MsgQueue queue:queues){
            LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName());
            queueMessageMap.put(queue.getName(),messages);
            for(Message message:messages){
                messageMap.put(message.getMessageId(),message);
            }
        }
    }

三、测试MemoryDataCenter

1.准备工作

@SpringBootTest
public class MemoryDataCenterTests {
    private MemoryDataCenter memoryDataCenter = null;

    @BeforeEach
    public void setUp(){
        memoryDataCenter = new MemoryDataCenter();
    }

    @AfterEach
    public void tearDown(){
        memoryDataCenter = null;
    }

    private Exchange createTestExchange(String exchangeName){
        Exchange exchange = new Exchange();
        exchange.setName(exchangeName);
        exchange.setType(ExchangeType.DIRECT);
        exchange.setDurable(true);
        exchange.setAutoDelete(false);
        return exchange;
    }

    private MsgQueue createTestQueue(String queueName){
        MsgQueue queue = new MsgQueue();
        queue.setName(queueName);
        queue.setDurable(true);
        queue.setExclusive(false);
        queue.setAutoDelete(false);
        return queue;
    }
    
    private Message createTestMessage(String content){
        Message message =       Message.createMessageWithId("testRoutingKey",null,content.getBytes());
        return message;
    }


    
}

2.测试交换机

@Test
    public void testExchange(){
        Exchange expectedExchange = createTestExchange("testExchange");
        memoryDataCenter.insertExchange(expectedExchange);

        Exchange actualExchange = memoryDataCenter.getExchange("testExchange");
        Assertions.assertEquals(expectedExchange,actualExchange);
        memoryDataCenter.deleteExchange("testExchange");
        actualExchange = memoryDataCenter.getExchange("testExchange");
        Assertions.assertNull(actualExchange);
    }

3.测试队列

@Test
    public void testQueue(){
        MsgQueue expectedQueue = createTestQueue("testQueue");
        memoryDataCenter.insertQueue(expectedQueue);

        MsgQueue actualQueue = memoryDataCenter.getQueue("testQueue");
        Assertions.assertEquals(expectedQueue,actualQueue);
        memoryDataCenter.deleteQueue("testQueue");
        actualQueue = memoryDataCenter.getQueue("testQueue");
        Assertions.assertNull(actualQueue);
    }

4.测试绑定

@Test
    public void testBinding() throws MqException {
        Binding expectedBinding = new Binding();
        expectedBinding.setExchangeName("testExchange");
        expectedBinding.setQueueName("testQueue");
        expectedBinding.setBindingKey("testBindingKey");
        memoryDataCenter.insertBinding(expectedBinding);

        Binding actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");
        Assertions.assertEquals(expectedBinding,actualBinding);

        ConcurrentHashMap<String,Binding> bindingMap = memoryDataCenter.getBindings("testExchange");
        Assertions.assertEquals(1,bindingMap.size());
        Assertions.assertEquals(expectedBinding,bindingMap.get("testQueue"));

        memoryDataCenter.deleteBinding(expectedBinding);
        actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");
        Assertions.assertNull(actualBinding);
    }

5.测试消息

@Test
    public void testMessage(){
        Message expectedMessage = createTestMessage("testMessgae");
        memoryDataCenter.addMessage(expectedMessage);
        Message actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());
        Assertions.assertEquals(expectedMessage,actualMessage);
        memoryDataCenter.removeMessage(expectedMessage.getMessageId());
        actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());
        Assertions.assertNull(actualMessage);
    }

6.测试发送消息

 @Test
    public void testSendMessage(){
        MsgQueue queue = createTestQueue("testQueue");
        List<Message> expectedMessages = new ArrayList<>();
        for(int i=0;i<10;i++){
            Message message = createTestMessage("testMessage"+i);
            memoryDataCenter.sendMessage(queue,message);
            expectedMessages.add(message);
        }
        List<Message> actualMessage = new ArrayList<>();
        while(true){
            Message message = memoryDataCenter.pollMessage("testQueue");
            if(message==null){
                break;
            }
            actualMessage.add(message);
        }
        Assertions.assertEquals(expectedMessages.size(),actualMessage.size());
        for(int i=0;i<actualMessage.size();i++){
            Assertions.assertEquals(expectedMessages.get(i),actualMessage.get(i));
        }
    }

7.测试待确认消息

@Test
    public void testMessageWaitAck(){
        Message expectedMessage = createTestMessage("expectedMessage");
        memoryDataCenter.addMessageWaitAck("testQueue",expectedMessage);
        Message actualMessage = memoryDataCenter.getMessageWaitAck("testQueue",expectedMessage.getMessageId());
        Assertions.assertEquals(expectedMessage,actualMessage);
        memoryDataCenter.removeMessageWaitAck("testQueue",expectedMessage.getMessageId());
        actualMessage = memoryDataCenter.getMessageWaitAck("testQueue",expectedMessage.getMessageId());
        Assertions.assertNull(actualMessage);
    }

8.测试从硬盘上读取消息到内存

@Test
    public void testRecovery() throws IOException, MqException, ClassNotFoundException {
        SpringDemoMqApplication.context = SpringApplication.run(SpringDemoMqApplication.class);
        DiskDataCenter diskDataCenter = new DiskDataCenter();
        diskDataCenter.init();

        Exchange expectedExchange = createTestExchange("testExchange");
        diskDataCenter.insertExchange(expectedExchange);

        MsgQueue expectedQueue = createTestQueue("testQueue");
        diskDataCenter.insertQueue(expectedQueue);

        Binding expectedBinding = new Binding();
        expectedBinding.setExchangeName("testExchange");
        expectedBinding.setQueueName("testQueue");
        expectedBinding.setBindingKey("testBindingKey");
        diskDataCenter.insertBinding(expectedBinding);

        Message expectedMessage = createTestMessage("testContent");
        diskDataCenter.sendMessage(expectedQueue,expectedMessage);

        memoryDataCenter.recovery(diskDataCenter);

        Exchange actualExchange = memoryDataCenter.getExchange("testExchange");
        Assertions.assertEquals(expectedExchange.getName(),actualExchange.getName());
        Assertions.assertEquals(expectedExchange.getType(),actualExchange.getType());

        MsgQueue actualQueue = memoryDataCenter.getQueue("testQueue");
        Assertions.assertEquals(expectedQueue.getName(),actualQueue.getName());
        Assertions.assertEquals(expectedQueue.isExclusive(),actualQueue.isExclusive());

        Binding actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");
        Assertions.assertEquals(expectedBinding.getExchangeName(),actualBinding.getExchangeName());
        Assertions.assertEquals(expectedBinding.getQueueName(),actualBinding.getQueueName());

        Message actualMessage = memoryDataCenter.pollMessage("testQueue");
        Assertions.assertEquals(expectedMessage.getMessageId(),actualMessage.getMessageId());
        Assertions.assertEquals(expectedMessage.getRoutingKey(),actualMessage.getRoutingKey());
        Assertions.assertEquals(expectedMessage.getDeliverMode(),actualMessage.getDeliverMode());
        Assertions.assertArrayEquals(expectedMessage.getBody(),actualMessage.getBody());

        SpringDemoMqApplication.context.close();
        File dataDir = new File("./data");
        FileUtils.deleteDirectory(dataDir);
    }

四、总结

  本篇文章主要介绍了一下如何在内存中管理数据,代码层面也是对相关概念进行操作以及测试,下一篇文章我们将学习虚拟主机设计内容,感谢观看!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2261448.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

动态导出word文件支持转pdf

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、功能说明二、使用步骤1.controller2.工具类 DocumentUtil 导出样式 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 例如&#xff…

那些不属性的C语言关键字-const

大家都知道const修饰的变量是不可变的&#xff0c;但是到底是怎么实现的那&#xff0c;有方法修改只读变量的值吗&#xff0c;今天我们结合实验代码&#xff0c;分析下const关键字的实现原理 const变量 1.const修饰局部变量 int main(){const int abc 123;printf("%d\…

【Java 数据结构】List -> 给我一个接口!!!

&#x1f525;博客主页&#x1f525;&#xff1a;【 坊钰_CSDN博客 】 欢迎各位点赞&#x1f44d;评论✍收藏⭐ 目录 1. 什么是 List 2. List 常用的方法 3. List 的使用 1. 什么是 List 其实 List 是一个接口&#xff0c;它继承了 Collection 接口 下列为 List 接口中的各种…

【5G】5G的主要架构选项

最初&#xff0c;在3GPP讨论中考虑了所有可能的聚合和核心网络组合&#xff0c;共有八个架构选项。以下重点介绍option2、3、4和7。 1. 独立组网 (Standalone, SA) 架构选项 2 &#xff1a;Standalone architecture with 5G-core 特点&#xff1a; 5G核心网&#xff08;5GC, …

Ajax简单理解

Ajax 1 什么是ajax AJAXAsynchronous JavaScript and XML (异步的JavaScript和XML)AJAX不是新的编程语言&#xff0c;二十一种使用现有标准的新方法 AJAX 最大的优点是在不重新加载整个页面的情况下&#xff0c;可以与服务器交换数据并更新部分网页内容。 AJAX 不需要任何浏…

【GESP】C++二级考试大纲知识点梳理, (2)计算机网络的基本概念及分类

GESP C二级官方考试大纲中&#xff0c;共有9条考点&#xff0c;本文针对C&#xff08;2&#xff09;号知识点进行总结梳理。 &#xff08;2&#xff09;了解计算机网络的概念&#xff0c;了解计算机网络的分类&#xff08;广域网&#xff08;WAN&#xff09;、城域网&#xff0…

相机与NAS的奇妙组合,如何使用相机拍照自动上传或备份到NAS

相机与NAS的奇妙组合&#xff0c;如何使用相机拍照自动上传或备份到NAS 哈喽小伙伴们好&#xff0c;我是Stark-C~ 对于喜欢使用专业器材拍照摄影的小伙伴来说&#xff0c;想要将相机存储卡中的照片或视频导出到电脑上&#xff0c;要么是使用数据线直接和相机连接&#xff0c;…

window下的qt5.14.2配置vs2022

这里做一个笔记&#xff0c;已知qt5.14.2和vs2022不兼容&#xff0c;无法自动扫描到vs的编译器。但由于团队协作原因&#xff0c;必须使用qt5.14.2&#xff0c;并且第三方库又依赖vs2022。其实qt5.15.2是支持vs2022的&#xff0c;如果能够用qt5.15.2&#xff0c;还是建议使用qt…

QT从入门到精通(一)——Qlabel介绍与使用

1. QT介绍——代码测试 Qt 是一个跨平台的应用程序开发框架&#xff0c;广泛用于开发图形用户界面&#xff08;GUI&#xff09;应用程序&#xff0c;也支持非图形应用程序的开发。Qt 提供了一套工具和库&#xff0c;使得开发者能够高效地构建高性能、可移植的应用程序。以下是…

【协作笔记Trilium Notes Docker部署】开源协作笔记Trilium Notes本地Docker部署远程协作

文章目录 前言1. 安装docker与docker-compose2. 启动容器运行镜像3. 本地访问测试4.安装内网穿透5. 创建公网地址6. 创建固定公网地址 前言 今天分享一款在G站获得了26K的强大的开源在线协作笔记软件&#xff0c;Trilium Notes的中文版如何在Linux环境使用docker本地部署&…

app的测试范围以及web和app的测试区别

目录 图1.App的测试范围1.1功能测试1.2专项测试1.3性能测试 2.Web和App的测试区别2.1相同点2.2不同点 &#x1f44d; 点赞&#xff0c;你的认可是我创作的动力&#xff01; ⭐️ 收藏&#xff0c;你的青睐是我努力的方向&#xff01; ✏️ 评论&#xff0c;你的意见是我进步的…

数据分析实战—鸢尾花数据分类

1.实战内容 (1) 加载鸢尾花数据集(iris.txt)并存到iris_df中,使用seaborn.lmplot寻找class&#xff08;种类&#xff09;项中的异常值&#xff0c;其他异常值也同时处理 。 import pandas as pd from sklearn.datasets import load_iris pd.set_option(display.max_columns, N…

docker 拉取镜像 | 创建容器 | 容器运行

拉取镜像 拉取镜像的命令&#xff1a;docker pull name &#xff08;name换为你要拉取的镜像名&#xff09; docker pull docker.1panel.live/hispark/qiankunbp:1.0.0 docker.1panel.live/hispark/qiankunbp:1.0.0为镜像名 拉取海思的镜像&#xff1a;&#xff08;如果之前拉…

添加标签(vue3)

点击添加按钮&#xff1a; 最多添加5个 注意&#xff1a; 不只可以el-form 进行校验&#xff0c;也可以对单个el-form-item 进行校验 vue elementUI form组件动态添加el-form-item并且动态添加rules必填项校验方法-CSDN博客 el-form 里边有el-form-item &#xff0c;el-fo…

Dash for Mac 代码API文档管理软件安装

Mac分享吧 文章目录 Dash for Mac 代码API文档管理软件 效果图展示一、Dash 代码API文档管理软件 Mac电脑版——v7.3.31️⃣&#xff1a;下载软件2️⃣&#xff1a;安装软件2.1 左侧安装包拖入右侧文件夹中&#xff0c;等待安装完成&#xff0c;运行软件2.2 打开软件&#xff…

Unity添加newtonsoft-json

package name "com.unity.nuget.newtonsoft-json": "3.2.1",打开包管理器 输入包名称和版本 点击添加

分布式全文检索引擎ElasticSearch-数据的写入存储底层原理

一、数据写入的核心流程 当向 ES 索引写入数据时&#xff0c;整体流程如下&#xff1a; 1、客户端发送写入请求 客户端向 ES 集群的任意节点&#xff08;称为协调节点&#xff0c;Coordinating Node&#xff09;发送一个写入请求&#xff0c;比如 index&#xff08;插入或更…

TensorRT C++ API模型加速 —— TensorRT配置、模型转换、CUDA C++加速、TensorRT C++ 加速

文章目录 前言&#xff1a;TensorRT简介0.1、TensorRT算法流程0.2、TensorRT主要优化技术 一、TensorRT配置1.1、TensorRT环境配置1.1.1、CUDA安装1.1.2、TensorRT下载1.1.3、TensorRT CUDA配置1.1.4、TensorRT配置1.1.4.1、TensorRT python配置1.1.4.2、TensorRT C配置&#x…

RPC 服务与 gRPC 的入门案例

RPC 协议 RPC&#xff08;Remote Procedure Call Protocol&#xff09;即远程过程调用协议&#xff0c;它是一种通过网络从远程计算机程序上请求服务的协议&#xff0c;允许一个计算机程序可以像调用本地服务一样调用远程服务 。 RPC的主要作用是不同的服务间方法调用就像本地…

基于Spring Boot的体育商品推荐系统

一、系统背景与目的 随着电子商务的快速发展和人们健康意识的提高&#xff0c;体育商品市场呈现出蓬勃发展的态势。然而&#xff0c;传统的体育商品销售方式存在商品种类繁多、用户选择困难、个性化需求无法满足等问题。为了解决这些问题&#xff0c;基于Spring Boot的体育商品…