根据源码,模拟实现 RabbitMQ - 内存数据管理(4)

news2024/11/24 10:22:40

目录

一、内存数据管理

1.1、需求分析

1.2、实现 MemoryDataCenter 类

1.2.1、ConcurrentHashMap 数据管理

1.2.2、封装交换机操作

1.2.3、封装队列操作

1.2.4、封装绑定操作

1.2.5、封装消息操作

1.2.6、封装未确认消息操作

1.2.7、封装恢复数据操作


一、内存数据管理


1.1、需求分析

当前已经使用 数据库 管理了 交换机、绑定、队列,又使用 数据文件 管理了 消息.

最后还使用一个类将上述两部分整合在了一起,对上层提供统一的一套接口.

但对于 MQ 来说,是以内存存储数据为主,硬盘存储数据为辅(硬盘数据主要是为了持久化保存,重启之后,数据不丢失).

接下来就需要使用 内存 来管理上述数据~~

这里我们主要使用 ConcurrentHashMap 来进行数据管理(主要是因为线程安全问题).

交换机:使用 ConcurrentHashMap,key 是 name,value 是 Exchange 对象。

队列:使用 ConcurrentHashMap,key 是 name,value 是 MSGQueue 对象。

绑定:使用嵌套的 ConcurrentHashMap,key 是 exchangeName,value 是一个 ConcurrentHashMap(key 是 queueName,value 是 Binding 对象)。

消息:使用 ConcurrentHashMap,key 是 messageId ,value 是 Message 对象。

队列和消息的关联关系:使用嵌套的 ConcurrentHashMap,key 是 queueName,value 是一个 LinkedList(每个元素是一个 Message 对象)。

表示 “未被确认” 的消息:使用嵌套的 ConcurrentHashMap,key 是 queueName,value 是 ConcurrentHashMap(key 是 messageId,value 是 Message 对象,后续实现消息确认的逻辑,需要根据 ack 响应的内容,会提供一个确认的 messageId,根据这个 messageId 来把上述结构中的 Message 对象找到并移除)。

Ps:此处实现的 MQ,支持两种应答模式的 ACK

  1. 自动应答:消费者取了元素,整个消息就算是被应答了,此时整个消息就可以被干掉了。
  2. 手动应答:消费者取了元素,这个消息不算被应答,需要消费者主动再调用一个 basicAck 方法,此时才认为是真正应答了,才能删除这个消息。

1.2、实现 MemoryDataCenter 类

1.2.1、ConcurrentHashMap 数据管理

这里就是用 ConcurrentHashMap 来对上述数据进行统一内存管理.

    //key 是 exchangeName, value 是 Exchange 对象
    private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
    //key 是 queueName, value 是 MSGQueue 对象
    private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();
    //第一个 key 是 exchangeName,第二个 key 是 queueName
    private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
    //key 是 messageId ,value 是 Message 对象
    private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
    //key 是 queueName , value 是 Message 的链表
    private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
    // 第一个 key 是 queueName, 第二个 key 是 messageId
    private ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();

1.2.2、封装交换机操作

主要就是对 exchangeMap 插入、获取、删除交换机.


    public void insertExchange(Exchange exchange) {
        exchangeMap.put(exchange.getName(), exchange);
        System.out.println("[MemoryDataCenter] 新交换机添加成功! exchangeName=" + exchange.getName());
    }

    public Exchange getExchange(String exchangeName) {
        return exchangeMap.get(exchangeName);
    }

    public void deleteExchange(String exchangeName) {
        exchangeMap.remove(exchangeName);
        System.out.println("[MemoryDataCenter] 交换机删除成功! exchangeName=" + exchangeName);
    }

1.2.3、封装队列操作

主要就是对 queueMap 插入、获取、删除队列.

    public void insertQueue(MSGQueue queue) {
        queueMap.put(queue.getName(), queue);
        System.out.println("[MemoryDataCenter] 新队列添加成功! queueName=" + queue.getName());
    }

    public MSGQueue getQueue(String queueName) {
        return queueMap.get(queueName);
    }

    public void deleteQueue(String queueName) {
        queueMap.remove(queueName);
        System.out.println("[MemoryDataCenter] 队列删除成功! queueName=" + queueName);
    }

1.2.4、封装绑定操作

这里值得注意的是加锁逻辑,并不是加了锁就一定安全,也不是说不加锁就一定不安全,如果这段代码前后逻辑性很强,需要打包成一个原子性的操作,那就可以进行加锁,如果不是那么强的因果,就没必要,因为加锁也是需要开销的,加锁之后的锁竞争更是一个时间消耗。

    public void insertBinding(Binding binding) throws MqException {
//        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
//        if(bindingMap == null) {
//            bindingMap = new ConcurrentHashMap<>();
//            bindingsMap.put(binding.getExchangeName(), bindingMap);
//        }
        //上面这段逻辑可以用以下代码来替换
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
                k -> new ConcurrentHashMap<>());

        synchronized(bindingMap) {
            //再根据 queueName 查一下,只有不存在的时候才能插入,存在就抛出异常
            if(bindingMap.get(binding.getQueueName()) != null) {
                throw new MqException("[MemoryDataCenter] 绑定已经存在! exchangeName=" + binding.getExchangeName() +
                        ", queueName=" + binding.getQueueName());
            }
            bindingMap.put(binding.getQueueName(), binding);
        }
        System.out.println("[MemoryDataCenter] 新绑定添加成功!exchangeName=" + binding.getExchangeName() +
                ", queueName=" + binding.getQueueName());

    }

    /**
     * 获取绑定有两个版本
     * 1.根据 exchangeName 和 queueName 确定唯一一个 Binding
     * 2.根据 exchangeName 获取到所有的 Binding
     * @param exchangeName
     * @param queueName
     * @return
     */
    public Binding getBinding(String exchangeName, String queueName) throws MqException {
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);
        if(bindingMap == null) {
            throw new MqException("[MemoryDataCenter] 绑定不存在!exchangeName=" + exchangeName +
                    ", queueName=" + queueName);
        }
        return bindingMap.get(queueName);
    }

    public ConcurrentHashMap<String, Binding> getBindings(String exchangName) {
        return bindingsMap.get(exchangName);
    }

    public void deleteBinding(Binding binding) throws MqException {
        ConcurrentHashMap<String, Binding>  bindingMap = bindingsMap.get(binding.getExchangeName());
        //这里操作不是很关键,因此可以不用加锁(加锁不一定就安全,也不是说不加锁就一定不安全,要结合实际场景)
        //如果这段代码前后逻辑性很强,需要打包成一个原子性的操作,那就可以进行加锁,如果不是那么强的因果,就没必要,因为加锁也是需要开销的,加锁之后的锁竞争更是一个时间消耗
        if(bindingMap == null) {
            throw new MqException("[MemoryDataCenter] 绑定不存在!exchangeName=" + binding.getExchangeName() +
                    ", queueName=" + binding.getQueueName());
        }
        bindingMap.remove(binding.getQueueName());
        System.out.println("[MemoryDataCenter] 绑定删除成功!exchangeName=" + binding.getExchangeName() +
                ", queueName=" + binding.getQueueName());
    }

1.2.5、封装消息操作

这里值得注意的是 LinkedList 是线程不安全的,要特殊处理.

    /**
     * 添加消息
     * @param message
     */
    public void addMessage(Message message) {
        messageMap.put(message.getMessageId(), message);
        System.out.println("[MemoryDataCenter] 新消息添加成功!messageId=" + message.getMessageId());
    }

    /**
     * 根据 id 查询消息
     * @param messageId
     */
    public Message selectMessage(String messageId) {
        return messageMap.get(messageId);
    }

    /**
     * 根据 id 删除消息
     * @param messageId
     */
    public void removeMessage(String messageId) {
        messageMap.remove(messageId);
        System.out.println("[MemoryDataCenter] 消息被移除!messageId=" + messageId);
    }

    /**
     * 发送消息到指定队列
     * @param message
     */
    public void sendMessage(MSGQueue queue, Message message) {
        //先根据队列名字找到指定的链表
        LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), k -> new LinkedList<>());
        //LinkedList 是线程不安全的
        synchronized (messages) {
            messages.add(message);
        }
        //这里把消息在消息中心也插入一下。即使 message 在消息中心存在也没关系
        //因为相同的 messageId 对应的 message 的内容一定是一样的(服务器不会修改 Message 的内容)
        addMessage(message);
        System.out.println("[MemoryDataCenter] 消息被投递到队列当中!messageId=" + message.getMessageId());
    }

    /**
     * 从队列中取消息
     * @param queueName
     * @return
     */
    public Message pollMessage(String queueName) {
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        if(messages == null) {
            return null;
        }
        synchronized (messages) {
            if(messages.size() == 0) {
                return null;
            }
            //链表中有消息就进行头删
            Message currentMessage = messages.remove(0);
            System.out.println("[MemoryDataCenter] 消息从队列中取出! messageId=" + currentMessage.getMessageId());
            return currentMessage;
        }
    }

    /**
     * 获取指定队列的消息个数
     * @param queueName
     * @return
     */
    public int getMessageCount(String queueName) {
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        if(messages == null) {
            return 0;
        }
        synchronized (messages) {
            return messages.size();
        }
    }

1.2.6、封装未确认消息操作

“未被确认” 的消息:使用嵌套的 ConcurrentHashMap,key 是 queueName,value 是 ConcurrentHashMap(key 是 messageId,value 是 Message 对象,后续实现消息确认的逻辑,需要根据 ack 响应的内容,会提供一个确认的 messageId,根据这个 messageId 来把上述结构中的 Message 对象找到并移除)。

    /**
     * 添加未确认的消息
     * @param queueName
     * @param message
     */
    public void addMessageWaitAck(String queueName, Message message) {
        ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName,
                k -> new ConcurrentHashMap<>());
        messageHashMap.put(message.getMessageId(), message);
        System.out.println("[MemoryDataCenter] 消息进入待确认队列!messageId=" + message.getMessageId());
    }

    /**
     * 删除未确认的消息
     * @param messageId
     */
    public void removeMessageWaitAck(String queueName, String messageId) {
        ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
        if(messageHashMap == null) {
            return;
        }
        messageHashMap.remove(messageId);
        System.out.println("[MemoryDataCenter] 消息从待确认队列中删除!messageId=" + messageId);
    }

    public Message getMessageWaitAck(String queueName, String messageId) {
        ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
        if(messageHashMap == null) {
            return null;
        }
        return messageHashMap.get(messageId);
    }

1.2.7、封装恢复数据操作

从硬盘上读取数据,把硬盘中之前持久化存储的各个维度的数据都恢复到内存中.


    public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {
        //1.先清空之前所有的数据
        exchangeMap.clear();
        queueMap.clear();
        bindingsMap.clear();
        messageMap.clear();
        queueMessageMap.clear();
        //2.恢复所有的交换机数据
        List<Exchange> exchanges = diskDataCenter.selectAllExchanges();
        for(Exchange exchange : exchanges) {
            exchangeMap.put(exchange.getName(), exchange);
        }
        //3.恢复所有的队列数据
        List<MSGQueue> queues = diskDataCenter.selectAllQueue();
        for(MSGQueue queue : queues) {
            queueMap.put(queue.getName(), queue);
        }
        //4.恢复所有绑定数据
        List<Binding> bindings = diskDataCenter.selectAllBindings();
        for(Binding binding : bindings) {
            ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
                    k -> new ConcurrentHashMap<>());
            bindingMap.put(binding.getQueueName(), binding);
        }
        //5.恢复所有的消息数据
        for(MSGQueue queue : queues) {
            LinkedList<Message> messages = diskDataCenter.loadAllMessagesFromQueue(queue.getName());
            queueMessageMap.put(queue.getName(), messages);
            //遍历所有的队列,根据每个队列名字。来恢复所有消息
            for(Message message : messages) {
                messageMap.put(message.getMessageId(), message);
            }
        }

    }

Ps;“未确认的消息” 这部分数据不需要从硬盘中恢复,之前硬盘存储也没有考虑过这里~

一旦在等待 ack 的过程中,服务器重启了,这些 “未被确认的消息” 就恢复成了 “未被取走的消息”,这个消息在硬盘上存储的时候,就是当作 “未被取走”。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/903334.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习深度学习——NLP实战(情感分析模型——数据集)

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位即将上大四&#xff0c;正专攻机器学习的保研er &#x1f30c;上期文章&#xff1a;机器学习&&深度学习——BERT&#xff08;来自transformer的双向编码器表示&#xff09; &#x1f4da;订阅专栏&#xff1a;机器…

【数据结构】_7.二叉树

目录 1.树形结构 1.1 树的概念 1.2 树的相关概念 1.3 树的表示 1.4 树在实际中的应用—表示文件系统的目录树结构 ​编辑​2.二叉树 2.1 概念 2.2 特殊二叉树 2.3 二叉树的性质 2.4 二叉树的存储结构 2.4.1 顺序存储结构&#xff08;数组存储结构&#xff09; 2.4.2…

LeetCode--HOT100题(36)

目录 题目描述&#xff1a;146. LRU 缓存&#xff08;中等&#xff09;题目接口解题思路代码 PS: 题目描述&#xff1a;146. LRU 缓存&#xff08;中等&#xff09; 请你设计并实现一个满足 LRU (最近最少使用) 缓存 约束的数据结构。 实现 LRUCache 类&#xff1a; LRUCache…

引人关注的领域 ---- 信号稀疏表示

本篇文章是博主在人工智能等领域学习时&#xff0c;用于个人学习、研究或者欣赏使用&#xff0c;并基于博主对人工智能等领域的一些理解而记录的学习摘录和笔记&#xff0c;若有不当和侵权之处&#xff0c;指出后将会立即改正&#xff0c;还望谅解。文章分类在学习摘录和笔记专…

redis实战-缓存数据解决缓存与数据库数据一致性

缓存的定义 缓存(Cache),就是数据交换的缓冲区,俗称的缓存就是缓冲区内的数据,一般从数据库中获取,存储于本地代码。防止过高的数据访问猛冲系统,导致其操作线程无法及时处理信息而瘫痪&#xff0c;这在实际开发中对企业讲,对产品口碑,用户评价都是致命的;所以企业非常重视缓存…

基于蜉蝣算法优化的BP神经网络(预测应用) - 附代码

基于蜉蝣算法优化的BP神经网络&#xff08;预测应用&#xff09; - 附代码 文章目录 基于蜉蝣算法优化的BP神经网络&#xff08;预测应用&#xff09; - 附代码1.数据介绍2.蜉蝣优化BP神经网络2.1 BP神经网络参数设置2.2 蜉蝣算法应用 4.测试结果&#xff1a;5.Matlab代码 摘要…

MySQL8.0.26-Linux版安装

MySQL8.0.26-Linux版安装 1. 准备一台Linux服务器 云服务器或者虚拟机都可以; Linux的版本为 CentOS7; 2. 下载Linux版MySQL安装包 MySQL :: Download MySQL Community Server (Archived Versions) 3. 上传MySQL安装包 4. 创建目录,并解压 mkdir mysql ​ tar -xvf mysql-8…

网站老域名跳转到新域名有哪些方法?内网穿透内网主机让外网访问

在网站服务器变更及本地主机搭建时&#xff0c;我们经常会遇到老域名地址跳转到新URL的配置&#xff0c;一些朋友还会面对无公网IP让外网访问的问题。今天我们来了解下网站老域名跳转到新域名有哪些方法&#xff0c;以及如何通过内网穿透实现内网主机让外网访问。 网站老域名跳…

【Unity小技巧】Unity2D TileMap的探究(最简单,最全面的TileMap使用介绍)

文章目录 前言介绍一、TileMap简单的使用1、创建Unity工程2、Tilemap的使用2.1、导入素材图片2.2、切割图片2.3、创建画板2.4、创建瓦片2.5、创建网格2.6、在网格上刷瓦片2.7、解决瓦片没有占满格子的问题2.8、解决瓦片之间有缝隙的问题2.9、擦除瓦片2.10、区域瓦片绘制2.11、瓦…

适合上班族做的4个低门槛的副业兼职

对于大多数职场中人来说&#xff0c;如果没有在30岁之后获得晋升&#xff0c;获得更好的发展平台&#xff0c;可能就会感到工作缺乏足够的吸引力了。当我们只有一份工作的时候&#xff0c;就好比把鸡蛋放在一个篮子里&#xff0c;把自己的青春放在一家公司里。这也就好比单一的…

华盛顿大学Baker实验室率先设计出双稳态结构蛋白质

在蛋白质世界&#xff0c;“结构决定功能”是一条基本原则。因此&#xff0c;很多人可能认为&#xff0c;一个蛋白质就应该有一个唯一确定的结构&#xff0c;使得它能够去执行确定的生物学功能。其实&#xff0c;在真实的世界中&#xff0c;蛋白质大多都是处于一种不断起伏的动…

MongDB【CRUD练习-条件查询-文档关系】

练习1-CRUD // 进入test数据库 use test; // 查询文档内容 db.students.find(); // 显示当前数据库中所有集合 show collections; // 向数据库的user集合中插入一个文档 db.users.insertOne({username: "lyh"} ); // 查看当前数据库中所有的集合 发现users集合被创建…

信号处理--基于EEG脑电信号的眼睛状态的分析

本实验为生物信息学专题设计小项目。项目目的是通过提供的14导联EEG 脑电信号&#xff0c;实现对于人体睁眼和闭眼两个状态的数据分类分析。每个脑电信号的时长大约为117秒。 目录 加载相关的库函数 读取脑电信号数据并查看数据的属性 绘制脑电多通道连接矩阵 绘制两类数据…

《强化学习:原理与Python实战》——可曾听闻RLHF

前言&#xff1a; RLHF&#xff08;Reinforcement Learning with Human Feedback&#xff0c;人类反馈强化学习&#xff09;是一种基于强化学习的算法&#xff0c;通过结合人类专家的知识和经验来优化智能体的学习效果。它不仅考虑智能体的行为奖励&#xff0c;还融合了人类专家…

LeetCode算法递归类—二叉树的右视图

目录 199. 二叉树的右视图 题解&#xff1a; 目标&#xff1a; 思路&#xff1a; 过程&#xff1a; 代码&#xff1a; 运行结果&#xff1a; 给定一个二叉树的 根节点 root&#xff0c;想象自己站在它的右侧&#xff0c;按照从顶部到底部的顺序&#xff0c;返回从右侧所…

使用渲染纹理(RenderTexture)来实现3D视觉

如上图的效果&#xff0c;使用2D场景&#xff0c;通过摄像头的改动&#xff0c;使其看起来像是3D效果一样。 Nintendo Switch上刚推出的《超级马里奥》中&#xff0c;有一些关卡混合了2D和3D的画面&#xff0c; 一般来说&#xff0c;摄像机会直接渲染到电脑屏幕&#xff1b;…

【C语言练习】数组OJ题

目录 一.消失的数字思路1&#xff1a;思路2&#xff1a; 二.移除元素三.轮转数组四.删除有序数组中的重复项五.合并两个有序数组 一.消失的数字 题目&#xff1a; 思路1&#xff1a; 数组是从0加到N&#xff0c;所以把0到N的数加起来减去数组中的值&#xff0c;结果就是消失…

阿里云ECS服务器和轻量应用服务器区别?怎么选择?

阿里云轻量应用服务器和云服务器ECS有什么区别&#xff1f;ECS是专业级云服务器&#xff0c;轻量应用服务器是轻量级服务器&#xff0c;轻量服务器使用门槛更低&#xff0c;适合个人开发者或中小企业新手使用&#xff0c;可视化运维&#xff0c;云服务器ECS适合集群类、高可用、…

Cadence 仿真

电路的学习离不开仿真和实践&#xff0c;主流的仿真软件有很多&#xff0c;有用功pretues和mutisim&#xff0c;这次主要是工作中使用cadence&#xff0c;而且带有比较强大的仿真工具Pspice&#xff0c;所以选择了它 &#xff0c;我使用的版本是cadence 17.4自带的。官方提供了…

ViT模型架构和CNN区别

目录 Vision Transformer如何工作 ViT模型架构 ViT工作原理解析 步骤1&#xff1a;将图片转换成patches序列 步骤2&#xff1a;将patches铺平 步骤3&#xff1a;添加Position embedding 步骤4&#xff1a;添加class token 步骤5&#xff1a;输入Transformer Encoder 步…