从零手搓一个【消息队列】实现数据的硬盘管理和内存管理(线程安全)

news2024/11/24 10:59:20

文章目录

  • 一、硬盘管理
    • 1, 创建 DiskDataCenter 类
    • 2, init() 初始化
    • 3, 封装交换机
    • 4, 封装队列
    • 5, 关于绑定
    • 6, 关于消息
  • 二、内存管理
    • 1, 数据结构的设计
    • 2, 创建 MemoryDataCenter 类
    • 3, 关于交换机
    • 4, 关于队列
    • 5, 关于绑定
    • 6, 关于消息
    • 7, 恢复数据
  • 三、小结


创建 Spring Boot 项目, Spring Boot 2 系列版本, Java 8 , 引入 MyBatis, Lombok 依赖

提示:是正在努力进步的小菜鸟一只,如有大佬发现文章欠佳之处欢迎批评指点~ 废话不多说,直接上干货!

整体目录结构 :
在这里插入图片描述

本文主要实现 server 包


一、硬盘管理

硬盘管理包括第二篇文章中实现的 DataBaseManager 类, 实现了数据库存储(交换机, 队列, 绑定)和第三篇文章中实现的 MessageFileManager 类, 实现了文件存储(消息)

接下来使用 DiskDataCenter 这个类, 对上述两个类进一步的整合, 封装, 统一对上层提供数据库和文件操作的 API

在这里插入图片描述

整合的好处
上层代码(服务器中的 VirtualHost 在操作硬盘数据的时候, 直接调用 DiskDataCenter 类的 API 即可, 不需要关心硬盘上的数据来自数据库还是文件, 也不需要知道具体是怎么实现的)


1, 创建 DiskDataCenter 类

要有两个成员属性, 分别是 dataBaseManager 对象和 messageFileManager 对象, 我们要持有这两个类的 API, 才能进一步封装

public class DiskDataCenter {
    private DataBaseManager dataBaseManager = new DataBaseManager();
    private MessageFileManager messageFileManager = new MessageFileManager();
}

2, init() 初始化

dataBaseManager 中需要对数据库进行初始化, 所以在当前类提供一个初始化 API

    public void init() {
        dataBaseManager.init();
    }

3, 封装交换机

    public void insertExchange(Exchange exchange) {
        dataBaseManager.insertExchange(exchange);
    }

    public void deleteExchange(String exchangeName) {
        dataBaseManager.deleteExchange(exchangeName);
    }

    public List<Exchange> selectAllExchanges() {
        return dataBaseManager.selectAllExchanges();
    }


4, 封装队列

创建队列的时候要创建对应的目录( queue 目录)和文件( data 文件和 stat 文件), 同理, 删除的时候和也要删除对应的目录和文件

	public void insertQueue(MessageQueue queue) throws IOException {
        dataBaseManager.insertQueue(queue);
        messageFileManager.createQueueFiles(queue.getName());
    }

    public void deleteQueue(String queueName) throws IOException {
        dataBaseManager.deleteQueue(queueName);
        messageFileManager.deleteFiles(queueName);
    }

    public List<MessageQueue> selectAllQueue() {
        return dataBaseManager.selectAllQueues();
    }

5, 关于绑定

	public void insertBinding(Binding binding) {
        dataBaseManager.insertBinding(binding);
    }
    
    public void deleteBinding(Binding binding) {
        dataBaseManager.deleteBinding(binding);
    }
    
    public List<Binding> selectAllBindings() {
        return dataBaseManager.selectAllBindings();
    }

6, 关于消息

删除消息的时候顺便查一下 stat 文件, 如果需要GC, 就调用实现GC的方法(读一下 stat 文件并不是很耗时)

public void sendMessage(MessageQueue queue, Message message) throws IOException, MQException {
        messageFileManager.sendMessage(queue, message);
    }

    public void deleteMessage(MessageQueue queue, Message message) throws IOException, MQException {
        messageFileManager.deleteMessage(queue, message);
        // 这是逻辑删除, 而不是真正从文件中删掉这个数据, 并且需要判断是否需要进行垃圾回收
        if (messageFileManager.isNeedGC(queue.getName())) {
            messageFileManager.gc(queue);
        }
    }

    public LinkedList<Message> selectAllMessages(String queueName) throws MQException, IOException {
        return messageFileManager.loadAllMessageFromQueue(queueName);
    }

二、内存管理

硬盘上存储数据, 只是为了实现 “持久化” 这样的效果. 但是实际的消息存储/转发, 还是主要靠内存的结构

对于 MQ 来说, 内存部分是更关键的, 内存速度更快, 可以达成更⾼的并发

对于交换机, 队列, 绑定, 消息的查找, 直接在内存中找, 增加, 删除需要在管理内存的同时也管理硬盘
在这里插入图片描述


1, 数据结构的设计

需要在内存中存储交换机, 队列, 绑定, 交换机, 就需要使用合适的数据结构来辅助我们更方便的存储和管理数据, 并且还需要保证线程安全

  • 交换机
    使用 ConcurrentHashMap<String, Exchange>, key: exchangeName, value: exchange
  • 队列
    使用 ConcurrentHashMap<String, MessageQueue>, key: queueName, value: queue
  • 绑定
    使用(嵌套的) ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>>, key: exchangeName, value: [key: queueName, value: binding]

生产者发布消息的时候, 要指定一个 exchangeName, 服务器需要拿着这个 exchangeName, 在绑定表里查有没有已经绑定过的队列, 如果有队列, 并且符合 bindingKey 和 routingKey 的匹配规则, 才能转发消息
绑定是从交换机和队列这两个维度建立的, 所以使用嵌套的 Map 存储绑定表

  • 消息
    使用 ConcurrentHashMap<String, Message>, key: messageId, value: message

  • 在队列里的 N 条消息
    使用 ConcurrentHashMap<String, LinkedList< Message>>, key: queueName, value: 该队列中存储消息的链表

  • 在队列里的 N 条未确认的消息( “待确认队列” )
    使用(嵌套的) ConcurrentHashMap<String, ConcurrentHashMap<String, Message>>, key: queueName, value: [key: messageId, value: message]

  1. 之前的文章说明过, 消费者消费消息采用"推"的方式, 即: 队列中有消息之后, 服务器主动推送给订阅了该队列的消费者
  2. 推送之后, 如果消费者是手动应答
  3. 在消费者还没应答之前, 服务器视为消费者还没成功消费消息, 需要备份这条消息, 所以这个嵌套的Map 相当于一个 “待确认队列”
  4. 消费者确认应答之后, 服务器再从这个 “待确认队列” 中删除该消息

在这里插入图片描述

2, 创建 MemoryDataCenter 类

public class MemoryDataCenter {

    private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, MessageQueue> queueMap = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
	private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, LinkedList<Message>> messageInQueueMap = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> messageInQueueNotAckMap = new ConcurrentHashMap<>();
}

3, 关于交换机

    public void addExchange(Exchange exchange) {
        exchangeMap.put(exchange.getName(), exchange);
        System.out.println("[MemoryDataCenter.addExchange()] " + exchange.getName() + "交换机添加成功");
    }

    public Exchange getExchange(String exchangeName) {
        return exchangeMap.get(exchangeName);
    }

    public void removeExchange(String exchangeName) {
        exchangeMap.remove(exchangeName);
        System.out.println("[MemoryDataCenter.removeExchange()] " + exchangeName + "交换机删除成功");
    }

4, 关于队列

	public void addQueue(MessageQueue queue) {
        queueMap.put(queue.getName(), queue);
        System.out.println("[MemoryDataCenter.addQueue()] " + queue.getName() + "队列添加成功");
    }

    public MessageQueue getQueue(String queueName) {
        return queueMap.get(queueName);
    }

    public void removeQueue(String queueName) {
        queueMap.remove(queueName);
        System.out.println("[MemoryDataCenter.removeQueue()] " + queueName + "队列删除成功");

    }

5, 关于绑定

  • 1, 添加绑定
    注意 bindingsMap 和 bindingMap 不同, bindingMap 表示 bindingsMap 的 value 值
    使用 bindingsMap.computeIfAbsent(key) 可以创建 bindingsMap 的 value 值( bindingMap )
    public void addBinding(Binding binding) throws MQException {
        String exchangeName = binding.getExchangeName();
        String queueName = binding.getQueueName();
//        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
//        if (bindingMap == null) {
//            bindingMap = new ConcurrentHashMap<>();
//            bindingsMap.put(binding.getExchangeName(), bindingMap);
//        }
        // 1, 先查是否存在exchange绑定的queue, 没有则创建
        ConcurrentHashMap<String, Binding> bindingMap =
                bindingsMap.computeIfAbsent(exchangeName, V -> new ConcurrentHashMap<>());
        // 2, 再查exchange和queue是否已经存在绑定, 没有创建
        synchronized (bindingMap) {
            if (bindingMap.get(exchangeName) != null) {
                // 如果 exchange-queue 这个绑定已经存在, 不能再插入
                throw new MQException("[MemoryDataCenter.addBinding()] exchangeName = " + queueName
                        + "-queueName = " + queueName + "的绑定已经存在, 添加失败");
            }
            bindingMap.put(queueName, binding);
        }

        System.out.println("[MemoryDataCenter.addBinding()] exchangeName = " + exchangeName
                + "-queueName = " + queueName + "的绑定已经添加成功");
    }
  • 2, 获取绑定(根据交换机和队列获取绑定)
    public Binding getBinding(String exchangeName, String queueName) {
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);
        if (bindingMap == null) {
            return null;
        }
        return bindingMap.get(queueName);
    }
  • 3, 获取绑定(根据交换机, 获取该交换机的所有绑定关系)
    public ConcurrentHashMap<String, Binding> getBindings(String exchangeName){
        return bindingsMap.get(exchangeName);
    }
  • 4, 删除绑定
public void removeBinding(Binding binding) throws MQException {
        // 1, 先判断该交换机是否存在绑定
        String exchangeName = binding.getExchangeName();
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);
        if (bindingMap == null) {
            throw new MQException("[MemoryDataCenter.removeBinding()] exchangeName = " + exchangeName + "该交换机的绑定不存在, 删除失败");
        }
        // 2, 交换机不存在绑定才能删除
        bindingsMap.remove(binding.getExchangeName());
        System.out.println("[MemoryDataCenter.removeBinding()] exchangeName = " + exchangeName + "该交换机的绑定已删除成功");
    }

6, 关于消息

  • 关于 messageMap
    public void addMessage(Message message) {
        messageMap.put(message.getMessageId(), message);
        System.out.println("[MemoryDataCenter.addMessage()] 消息添加成功");
    }

    public Message getMessage(String messageId) {
        return messageMap.get(messageId);
    }

    public void removeMessage(String messageId){
        messageMap.remove(messageId);
        System.out.println("MemoryDataCenter.removeMessage()] 消息删除成功");
    }
  • 关于 messageInQueueMap
	public void sendMessage(MessageQueue queue, Message message) {
        LinkedList<Message> messageList = messageInQueueMap.computeIfAbsent(queue.getName(), V -> new LinkedList<>());
        synchronized (messageList) {
            messageList.add(message); // 尾插
        }
        addMessage(message);
        System.out.println("[MemoryDataCenter.sendMessage()] 消息发送成功");
    }
    
    /**
     * 从队列中取走消息, 但不代表消息在内存中没了, 在 messageMap 中还保留
     */
    public Message pollMessage(MessageQueue queue) {
        LinkedList<Message> messageList = messageInQueueMap.get(queue.getName());
        if (messageList == null || messageList.size() == 0) {
            return null;
        }
        synchronized (messageList) {
            Message result = messageList.removeFirst(); // 头删
            System.out.println("[MemoryDataCenter.pollMessage()] 消息取出成功");
            return result;
        }
    }
    
    /**
     * 获取队列中的消息总数
     */
    public int getMessageCount(MessageQueue queue) {
        LinkedList<Message> messageList = messageInQueueMap.get(queue.getName());
        if (messageList == null) {
            return 0;
        }
        return messageList.size();
    }
  • 关于 messageInQueueNotAckMap
	public void addMessageNotAck(String queueName, Message message) {
        ConcurrentHashMap<String, Message> messageNotAckMap = messageInQueueNotAckMap.computeIfAbsent(queueName,
                V -> new ConcurrentHashMap<>());
        messageNotAckMap.put(message.getMessageId(), message);
        System.out.println("[MemoryDataCenter.addMessageNotAck()] 未确认消息添加成功");
    }
    
    public void removeMessageNotAck(String queueName, String messageId) {
        ConcurrentHashMap<String, Message> messageNotAckMap = messageInQueueNotAckMap.get(queueName);
        if (messageNotAckMap == null) {
            return;
        }
        messageNotAckMap.remove(messageId);
        System.out.println("[MemoryDataCenter.removeMessageNotAck()] 未确认消息删除成功");
    }
    
    public Message getMessageNotAck(String queueName, String messageId) {
        ConcurrentHashMap<String, Message> messageNotAckMap = messageInQueueNotAckMap.get(queueName);
        if (messageNotAckMap == null) {
            return null;
        }
        return messageNotAckMap.get(messageId);
    }

7, 恢复数据

当服务器重启时, 需要把硬盘上的数据恢复到内存中

	public void recover(DiskDataCenter diskDataCenter) throws MQException, IOException {
        // 1, 清楚所有内存数据(防止残留)
        exchangeMap.clear();
        queueMap.clear();
        bindingsMap.clear();
        messageMap.clear();
        messageInQueueMap.clear();
        messageInQueueNotAckMap.clear();
        // 2. 从硬盘中获取数据并存储在内存
        // 2.1, 获取交换机数据
        List<Exchange> exchangeList =  diskDataCenter.selectAllExchanges();
        for (Exchange exchange : exchangeList) {
            exchangeMap.put(exchange.getName(), exchange);
        }
        // 2.2, 获取队列数据
        List<MessageQueue> queueList = diskDataCenter.selectAllQueue();
        for (MessageQueue queue : queueList) {
            queueMap.put(queue.getName(), queue);
        }
        // 2.3, 获取绑定数据
        List<Binding> bindingList = diskDataCenter.selectAllBindings();
        for (Binding binding : bindingList) {
            ConcurrentHashMap<String, Binding> bindingMap =
                    bindingsMap.computeIfAbsent(binding.getExchangeName(), V -> new ConcurrentHashMap<>());
            bindingMap.put(binding.getQueueName(), binding);
        }
        // 2.4, 获取消息数据
        for (MessageQueue queue : queueList) {
            LinkedList<Message> messageList = diskDataCenter.selectAllMessages(queue.getName());
            messageInQueueMap.put(queue.getName(), messageList);
            for (Message message : messageList) {
                messageMap.put(message.getMessageId(), message);
            }
        }
    }

恢复消息数据, 只需要恢复 messageInQueueMap 和 messageMap , 不需要关心 messageInQueueNotAckMap 因为 messageInQueueNotAckMap 只是在内存中存储, 如果服务器重启, 这块数据丢失了就丢失了, 因为 “未确认的消息” 不会被服务器真正的删除, 重启之后, “未确认的消息” 会被重新加载回内存


三、小结

本文主实现了两点:

  • 1, 实现了对数据库+文件这两个模块的进一步整合, 封装, 为上层提供了硬盘数据管理的 API
  • 2, 实现了内存数据管理, 并考虑了线程安全
    • 2.1, 设计了消息在内存中存储使用的数据结构
    • 2.2, 实现了内存中的交换机, 队列, 绑定, 消息, 以及消息和队列的关联, 这些数据的增删查

至此, 服务器对于硬盘数据和内存数据都做好了管理, 下篇文章就可以设计 VirtualHost 了, 需要对硬盘数据和内存数据再做进一步的整合, 封装, 统一管理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1054079.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Xcode 超简单实用小秘技让撸码进入新境界

概览 Xcode 是开发  应用不可或缺的王牌主力军&#xff0c;虽然 Xcode 中一些常用使用姿势想必大家都已驾轻就熟&#xff0c;但其中仍有一些隐藏宝藏小技巧不为人知。 充分挖掘它们可以极大加速和方便秃头码农们日常的撸码行为。 一般的&#xff0c;它们分为两类&#xff…

机器人制作开源方案 | 四轴飞行器

1. 概述 基于探索者搭建的模块化四轴飞行器研究平台&#xff0c;采用独特的设计方式&#xff0c;可实现在室内完成对四轴飞行器、无人机等运动控制的原理研究&#xff0c;以及学习飞行控制的原理知识。 2. 组装 请按照下图进行机架的组装。 整体图 请解压文末资料中的 /软件/Mi…

前端JavaScript入门到精通,javascript核心进阶ES6语法、API、js高级等基础知识和实战 —— Web APIs(一)

思维导图 学习目标 变量声明 一、Web API 基本认知 作用和分类 什么是DOM DOM树 DOM对象 二、获取DOM对象 三、操作元素内容 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta http-equiv"X-UA-Compa…

基于Java的实验室预约管理系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09;有保障的售后福利 代码参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作…

【数据结构】选择排序 堆排序(二)

目录 一&#xff0c;选择排序 1&#xff0c;基本思想 2&#xff0c; 基本思路 3&#xff0c;思路实现 二&#xff0c;堆排序 1&#xff0c;直接选择排序的特性总结&#xff1a; 2&#xff0c;思路实现 3&#xff0c;源代码 最后祝大家国庆快乐&#xff01; 一&#xf…

设计模式8、装饰者模式 Decorator

解释说明&#xff1a;动态地给一个对象增加一些额外的职责。就扩展功能而言&#xff0c;装饰模式提供了一种比使用子类更加灵活的替代方案 抽象构件&#xff08;Component&#xff09;&#xff1a;定义一个抽象接口以规范准备收附加责任的对象 具体构件&#xff08;ConcreteCom…

10OpenMP

OpenMP概述 通过线程实现并行化&#xff0c;与Pthread一样&#xff0c;是基于线程的共享内存库 与Pthread的不同 简而言之&#xff1a; Pthread更加底层&#xff0c;需要用户自己定义每一个线程的行为&#xff0c;OpenMP虽然更加简单&#xff0c;但是底层的线程交互实现很难 …

【单片机】13-实时时钟DS1302

1.RTC的简介 1.什么是实时时钟&#xff08;RTC&#xff09; &#xff08;rtc for real time clock) &#xff08;1&#xff09;时间点和时间段的概念区分 &#xff08;2&#xff09;单片机为什么需要时间点【一定的时间点干什么事情】 &#xff08;3&#xff09;RTC如何存在于…

【Git】Git 原理和使用

Git 一、Git 本地仓库1. 本地仓库的创建2. 配置 Git3. 工作区、暂存区、版本库4. 添加文件5. 查看 .git 文件6. 修改文件7. 版本回退8. 撤销修改9. 删除文件 二、分支管理1. 理解分支2. 创建分支3. 切换分支4. 合并分支5. 删除分支6. 合并冲突7. 分支管理策略8. bug 分支9. 强制…

基于Java的厨艺交流平台设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09;有保障的售后福利 代码参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作…

JavaScript Web APIs第三天笔记

Web APIs - 第3天 进一步学习 事件进阶&#xff0c;实现更多交互的网页特效&#xff0c;结合事件流的特征优化事件执行的效率 掌握阻止事件冒泡的方法理解事件委托的实现原理 事件流 事件流是对事件执行过程的描述&#xff0c;了解事件的执行过程有助于加深对事件的理解&…

数据结构刷题(三十三):完全背包最小值情况。322. 零钱兑换、279. 完全平方数

题目一&#xff1a; 322. 零钱兑换https://leetcode.cn/problems/coin-change/ 思路&#xff1a;完全背包问题&#xff0c;求解最小组合数。dp[j]&#xff1a;凑足总额为j所需钱币的最少个数为dp[j]。同时需要确保凑足总金额为0所需钱币的个数一定是0&#xff0c;那么dp[0] 0…

如果只是用php纯做api的话,给移动端做数据接口,是否需要用php框架?

API接口对接是现代软件开发中不可或缺的一部分&#xff0c;它允许不同的应用程序之间进行数据交换和服务调用。在PHP中&#xff0c;可以使用多种方式实现API接口的对接&#xff0c;包括基于HTTP协议的传统方法以及现代的API客户端库客户端库客户端库等。 一、实现API接口的对接…

Web开发-登录页面设计流程

目录 确定页面设计样式创建js文件jquery.min.jsbootstrap.min.js 创建css文件bootstrap.min.cssmaterialdesignicons.min.cssstyle.min.css 创建ftl文件header.ftlfooter.ftllogin.ftlcss部分html部分 确定页面设计样式 可以自己用“画图”等软件进行设计&#xff0c;也可以打…

步进电机只响不转

我出现问题的原因是相位线接错。 我使用的滑台上示17H的步进电机&#xff0c;之前用的是57的步进电机。 57步进电机的相位线是A黑、A-绿、B红、B-蓝。 17步进电机的相位线是A红、A-绿、B黑、B-蓝。 这两天被一个问题困扰了好久&#xff0c;在调试步进电机开发板的时候电机发生…

ubuntu安装ROS

进官网&#xff0c;选版本&#xff0c;操作系统 ROS: Home 开始安装&#xff1a; noetic/Installation/Ubuntu - ROS Wiki Installation Configure your Ubuntu repositories Configure your Ubuntu repositories to allow "restricted," "universe,"…

银行金融科技岗笔试题资料大总结

程序员进银行科技岗——简单总结_银行程序员 无水印&#xff0c;可直接打印使用。 中国银行 通用资料 视频资料

IPV6(IPV6,RIPng的配置以及手工配置IPV4隧道)

目录 实验一&#xff1a;IPv6的基本配置 实验二&#xff1a;RIPng基本配置 RIPng RIPng的工作机制 实验三&#xff1a;手工配置IPV4隧道 实验一&#xff1a;IPv6的基本配置 案例如下&#xff1a; 各部分配置如下 配置路由器RTA <Huawei>sys Enter system view, …

【论文笔记】DiffusionTrack: Diffusion Model For Multi-Object Tracking

原文链接&#xff1a;https://arxiv.org/abs/2308.09905 1. 引言 多目标跟踪通常分为两阶段的检测后跟踪&#xff08;TBD&#xff09;和一阶段的联合检测跟踪&#xff08;JDT&#xff09;。TBD对单帧进行目标检测后&#xff0c;使用跟踪器跨帧关联相同物体。使用的跟踪器包括使…

【STM32基础 CubeMX】外部中断

文章目录 前言一、中断是什么二、使用CubeMX配置你的第一个中断三、代码分析CubeMX四、中断函数按键中断点灯示例代码总结 前言 当涉及到STM32基础的外部中断时&#xff0c;我们进入了一个引人入胜的领域&#xff0c;它允许微控制器与外部世界进行互动并实时响应各种事件。外部…