从零手搓一个【消息队列】实现虚拟主机的核心功能

news2025/1/15 23:34:07

文章目录

  • 一、虚拟主机设计
  • 二、实现虚拟主机
    • 1, 创建 VirtualHost 类
    • 2, VirtualHost() 构造方法
    • 3, exchangeDeclare() 创建交换机
    • 4, exchageDelete() 删除交换机
    • 5, queueDeclare() 创建队列
    • 6, queueDelete() 删除队列
    • 7, queueBind() 创建绑定
    • 8, queueUnBind() 删除绑定
    • 9, basicPublish() 发布消息
    • 10, basicSubscribe() 订阅消息
    • 11, basicAck() 确认应答
  • 三、实现交换机的转发/绑定规则
    • 1, 设计 bindingKey 和 routingKey
    • 2, checkBindingKey() 检查 bindingKey是否合法
    • 3, checkRoutingKey() 检查 routingKey 是否合法
    • 4, route() 判定转发规则
    • 5, routeFanout() 规定扇出交换机转发规则
    • 6, routeTopic() 规定主题交换机转发规则
  • 四、小结


创建 Spring Boot 项目, Spring Boot 2 系列版本, Java 8 , 引入 MyBatis, Lombok 依赖

提示:是正在努力进步的小菜鸟一只,如有大佬发现文章欠佳之处欢迎批评指点~ 废话不多说,直接上干货!

整体目录结构 :
在这里插入图片描述

本文主要实现 server 包中的 VirtualHost 类


一、虚拟主机设计

到本篇为止, 内存和硬盘的数据都已经组织完成. 接下来使⽤ “虚拟主机” 这个概念, 把这两部分的数据也串起来.并且实现⼀些 MQ 的核心 API

回顾 BrokerServer (中间人服务器) 中的核心概念 :

在这里插入图片描述

声明
在 RabbitMQ 中, 虚拟主机是可以随意创建/删除的. 此处为了实现简单, 并没有实现虚拟主机的管理. 因此我们默认就只有⼀个虚拟主机的存在. 但是在数据结构的设计上我们预留了对于多虚拟主机的管理

虚拟主机存在的目的, 就是为了隔离, 隔离不同业务线上的数据, 所有此处要考虑的是: 交换机和队列从属于虚拟主机中, 如何把不同虚拟主机中的交换机和队列区分开?
此处使用一个简单粗暴的方式, 我们设定交换机和队列的唯一身份标识加一个前缀, 这个前缀就是虚拟主机的唯一标识, 当交换机和队列区分开之后, 绑定和队列中的消息自然就区分开了

虚拟主机中需要的属性和方法 :
在这里插入图片描述

consumerManager 这个对象在下文介绍


二、实现虚拟主机

1, 创建 VirtualHost 类

  • virtualHostName 是虚拟主机的唯一身份标识
  • 虚拟主机要整合硬盘上的数据(数据库, 文件)和内存上的数据, 统一管理和使用, 所以引入 memoryDataCenterdiskDataCenter 这两个成员属性
  • router 用来定义交换机和队列之间的匹配规则和转发规则
  • consumerManager 是虚拟主机实现和消费者相关的 API 时的辅助对象, 这个对象中包含和消费者相关的 API, 仅供内部调用, 不对外暴露
  • exchangeLockqueueLock 是后续实现 API 时保证线程安全的锁对象
public class VirtualHost {
    private String virtualHostName;
    private MemoryDataCenter memoryDataCenter;
    private DiskDataCenter diskDataCenter;
    private Router router;
    private ConsumerManager consumerManager; 

    private final Object exchangeLock = new Object();
    private final Object queueLock = new Object();
}

所以在前几篇文章中介绍的, 文件管理和内存管理, 都是在某个虚拟主机中的, 如果有多个虚拟主机, 每个虚拟主机中都有对应的文件和内存
这篇文章介绍的文件管理
在这里插入图片描述
这篇文章介绍的内存管理
在这里插入图片描述


2, VirtualHost() 构造方法

在构造方法中实现对刚才定义的成员属性的初始化, 对数据库文件的初始化

并且有可能本次不是第一次启动, 而是重启, 就需要恢复硬盘上的数据到内存中

	public VirtualHost(String virtualHostName) {
        this.virtualHostName = virtualHostName;
        this.memoryDataCenter = new MemoryDataCenter();
        this.diskDataCenter = new DiskDataCenter();
        this.router = new Router();
        this.consumerManager = new ConsumerManager(this);

        diskDataCenter.init();
        try {
            memoryDataCenter.recover(diskDataCenter);
        } catch (MQException | IOException e) {
            System.out.println("[VirtualHost] 内存恢复数据失败");
            e.printStackTrace();
        }
    }

另外提供一些 getter()

	public MemoryDataCenter getMemoryDataCenter() {
        return memoryDataCenter;
    }

    public DiskDataCenter getDiskDataCenter () {
        return diskDataCenter;
    }

    public String getVirtualHostName() {
        return virtualHostName;
    }

3, exchangeDeclare() 创建交换机

此处包括下文的核心 API 中的参数都是参考了 RabbitMQ, 并在 这篇文章 中介绍了创建 Exchange, Queue, Binding, Message等核心类时, 已经说明了部分属性本项目中暂不实现

先判断交换机是否已经存在, 如果不存在则创建, 如果存在也不会抛异常, 直接返回即可
根据参数中的 durable 判断是否要写入硬盘

    /**
     * 创建交换机
     * @param exchangeName 名称(唯一标识)
     * @param exchangeTypeEnum 类型
     * @param durable 是否持久化存储
     * @param autoDelete 是否自动删除
     * @param arguments 配置参数
     * @return  已存在返回 true, 不存在则创建
     */
    public boolean exchangeDeclare(String exchangeName,
                                   ExchangeTypeEnum exchangeTypeEnum,
                                   boolean durable,
                                   boolean autoDelete,
                                   Map<String, Object> arguments) {
        exchangeName = virtualHostName + "-" + exchangeName;
        try {
            synchronized (exchangeLock) {
                // 1, 查询是否已经存在交换机
                Exchange exchangeExists = memoryDataCenter.getExchange(exchangeName);
                if (exchangeExists != null) {
                    System.out.println("[VirtualHost.exchangeDeclare()] exchangeName =  " +
                            exchangeName + "的交换机已存在, 创建失败");
                    return true;
                }
                // 2, 创建交换机
                Exchange exchange = new Exchange();
                exchange.setName(exchangeName);
                exchange.setType(exchangeTypeEnum);
                exchange.setDurable(durable);
                exchange.setAutoDelete(autoDelete);
                exchange.setArguments(arguments);
                // 3, 写入硬盘
                if (durable) {
                    diskDataCenter.insertExchange(exchange);
                }
                // 4, 写入内存
                memoryDataCenter.addExchange(exchange);
                System.out.println("[VirtualHost.exchangeDeclare()] exchangeName =  " + 
                        exchangeName + "的交换机创建成功");
                return true;
            }
        } catch (Exception e) {
            System.out.println("[VirtualHost.exchangeDeclare()] exchangeName =  " +
                    exchangeName + "的交换机创建失败");
            e.printStackTrace();
            return false;
        }
    }

按照先写入硬盘, 再写入内存的顺序编写代码, 因为写硬盘失败概率更⼤, 如果硬盘写失败了, 也就不必写内存了


4, exchageDelete() 删除交换机

先使用交换机唯一标识查找交换机, 该交换机如果不存在, 就无法删除

如果该交换机是持久化存储的, 则先删除硬盘, 再删除内存

	/**
     * 删除交换机
     * @param exchangeName 唯一标识
     * @return true/false
     */
    public boolean exchangeDelete(String exchangeName) throws MQException {
        exchangeName = virtualHostName + "-" + exchangeName;
        try {
            synchronized (exchangeLock) {
                // 1, 查找该交换机
                Exchange exchangeExists = memoryDataCenter.getExchange(exchangeName);
                if (exchangeExists == null) {
                    throw new MQException("VirtualHost.exchangeDelete() exchangeName = " +
                            exchangeName + "的交换机不存在, 删除失败");
                }
                // 2, 硬盘删除
                if (exchangeExists.isDurable()) {
                    diskDataCenter.deleteExchange(exchangeName);
                }
                // 3, 内存删除
                memoryDataCenter.removeExchange(exchangeName);
                System.out.println("VirtualHost.exchangeDelete() exchangeName =  "+ exchangeName + "的交换机删除成功");
                return true;
            }
        } catch (MQException e) {
            System.out.println("VirtualHost.exchangeDelete() exchangeName = " + exchangeName + "的交换机删除失败");
            e.printStackTrace();
            return false;
        }
    }

5, queueDeclare() 创建队列

    /**
     * 创建队列
     * @param queueName 唯一标识
     * @param durable 是否持久化
     * @param exclusive 是否被独占
     * @param autoDelete 是否自动删除
     * @param arguments 额外参数
     * @return true/false
     */
    public boolean queueDeclare(String queueName,
                                boolean durable,
                                boolean exclusive,
                                boolean autoDelete,
                                Map<String, Object> arguments) {
        queueName = virtualHostName + "-" + queueName;
        try {
            synchronized (queueLock) {
                // 1, 查找是否存在
                MessageQueue queueExists = memoryDataCenter.getQueue(queueName);
                if (queueExists != null) {
                    System.out.println("[VirtualHost.queueDeclare()] queueName =  " + queueName + "的队列已存在, 创建失败");
                    return true;
                }
                // 2, 创建队列
                MessageQueue queue = new MessageQueue();
                queue.setName(queueName);
                queue.setDurable(durable);
                queue.setExclusive(exclusive);
                queue.setAutoDelete(autoDelete);
                queue.setArguments(arguments);
                // 3, 硬盘存储
                if (durable) {
                    diskDataCenter.insertQueue(queue);
                }
                // 4, 内存存储
                memoryDataCenter.addQueue(queue);
                System.out.println("[VirtualHost.queueDeclare()] queueName =  " + queueName + "的队列创建成功");
                return true;
            }
        } catch (Exception e) {
            System.out.println("[VirtualHost.queueDeclare()] queueName =  " + queueName + "的队列创建失败");
            e.printStackTrace();
            return false;
        }
    }

6, queueDelete() 删除队列

    /**
     * 删除队列
     * @param queueName 唯一标识
     * @return true/false
     */
    public boolean queueDelete(String queueName) {
        queueName = virtualHostName + "-" + queueName;
        try {
            synchronized (queueLock) {
                MessageQueue queueExists = memoryDataCenter.getQueue(queueName);
                // 1, 检查是否存在
                if (queueExists == null) {
                    throw new MQException("VirtualHost.queueDelete() queueName = " + queueName + "的队列已经存在, 删除失败");
                }
                // 2, 硬盘删除
                if (queueExists.isDurable()) {
                    diskDataCenter.deleteQueue(queueName);
                }
                // 3, 内存删除
                memoryDataCenter.removeQueue(queueName);
                System.out.println("VirtualHost.queueDelete() queueName = " + queueName + "的队列删除成功");
                return true;
            }
        } catch (Exception e) {
            System.out.println("VirtualHost.queueDelete() queueName = " + queueName + "的队列删除失败");
            e.printStackTrace();
            return false;
        }
    }

7, queueBind() 创建绑定

  • 先检查交换机和队列的绑定是否存在, 如果存在, 则不创建绑定, 也不抛异常, 直接返回即可
  • 检查 bindingKey 是否合法(后续介绍判定规则)
  • 检查指定的交换机和队列是否存在, 如果不存在, 自然不能创建绑定, 应该抛异常
	public boolean queueBind(String exchangeName, String queueName, String bindingKey) {
        exchangeName = virtualHostName + "-" + exchangeName;
        queueName = virtualHostName + "-" + queueName;
        try {
            synchronized (exchangeLock) {
                synchronized (queueLock) {
                    // 1, 检查绑定是否存在
                    Binding bindingExists = memoryDataCenter.getBinding(exchangeName, queueName);
                    if (bindingExists != null) {
                        System.out.println("[VirtualHost.queueBind()] exchangeName = " + exchangeName
                                + ", queueName = " + queueName + "的绑定已经存在, 无需重复创建");
                        return true; // 这里是否应该允许true??这里是否应该允许true??这里是否应该允许true??这里是否应该允许true??这里是否应该允许true??这里是否应该允许true??
                    }
                    // 2, 检查routingKey是否合法
                    if (!router.checkBindingKey(bindingKey)) {
                        throw new MQException("[VirtualHost.queueBind()] bindingKey = " + bindingKey + "不合法, 绑定创建失败");
                    }
                    // 3, 创建binding对象
                    Binding binding = new Binding();
                    binding.setExchangeName(exchangeName);
                    binding.setQueueName(queueName);
                    binding.setBindingKey(bindingKey);
                    // 4, 检查交换机/队列是否存在
                    Exchange exchangeExists = memoryDataCenter.getExchange(exchangeName);
                    MessageQueue queueExists = memoryDataCenter.getQueue(queueName);
                    if(exchangeExists == null) {
                        throw new MQException("[VirtualHost.queueBind()] exchangeName = " + exchangeName + "的队列不存在, 绑定创建失败");
                    }
                    if(queueExists == null){
                        throw new MQException("[VirtualHost.queueBind()] queueName = " + queueName + "的队列不存在, 绑定创建失败");
                    }
                    // 5, 写入硬盘
                    if (exchangeExists.isDurable() && queueExists.isDurable()) {
                        diskDataCenter.insertBinding(binding);
                    }
                    // 6, 写入内存
                    memoryDataCenter.addBinding(binding);
                    System.out.println("[VirtualHost.queueBind()] exchangeName = " + exchangeName
                            + ", queueName = " + queueName + "的绑定创建成功");
                    return true;
                }
            }
        } catch (Exception e) {
            System.out.println("[VirtualHost.queueBind()] exchangeName = " + exchangeName
                    + ", queueName = " + queueName + "的绑定创建失败");
            e.printStackTrace();
            return false;
        }
    }

8, queueUnBind() 删除绑定

先检查绑定是否存在, 如果不存在, 自然不能删除, 应该抛异常

    public boolean queueUnBind(String exchangeName, String queueName) {
        exchangeName = virtualHostName + "-" + exchangeName;
        queueName = virtualHostName + "-" + queueName;
        try {
            synchronized (exchangeLock) {
                synchronized (queueLock) {
                    // 1, 检查 binding 是否存在
                    Binding bindingExists = memoryDataCenter.getBinding(exchangeName, queueName);
                    if (bindingExists == null) {
                        throw new MQException("[VirtualHost.queueUnBind()] exchangeName = " + exchangeName
                                + ", queueName = " + queueName + "的绑定不存在, 删除失败");
                    }
                    // 2, 硬盘删除
                    diskDataCenter.deleteBinding(bindingExists);
                    // 3, 内存删除
                    memoryDataCenter.removeBinding(bindingExists);
                    System.out.println("[VirtualHost.queueUnBind()] exchangeName = " + exchangeName
                            + ", queueName = " + queueName + "的绑定删除成功");
                    return true;
                }
            }
        } catch (Exception e) {
            System.out.println("[VirtualHost.queueUnBind()] exchangeName = " + exchangeName
            + ", queueName = " + queueName + "的绑定删除失败");
            e.printStackTrace();
            return false;
        }
    }

9, basicPublish() 发布消息

  • 在创建绑定的时候判定了 bindingKey 是否合法, 在此处发布消息的时候要判定 routingKey 是否合法(这个方法下面介绍)
  • 检查交换机是否存在
  • 如果是直接交换机, 直接把 routingKey 作为队列名, 找到该队列
  • 如果是扇出/主题交换机, 要先判定 bindingKey 和 routingKey 是否匹配, 如果匹配才能转发(这个方法下面介绍)
	public boolean basicPublish(String exchangeName,
                                String routingKey,
                                BasicProperties basicProperties,
                                byte[] body) {
        exchangeName = virtualHostName + "-" + exchangeName;
        try {
            // 1, 检查routingKey是否合法
            if (!router.checkRoutingKey(routingKey)) {
                throw new MQException("[VirtualHost.basicPublish()] routingKey = " + routingKey + "不合法, 消息发布失败");
            }
            // 2, 查找交换机是否存在
            Exchange exchangeExists = memoryDataCenter.getExchange(exchangeName);
            if (exchangeExists == null) {
                throw new MQException("[VirtualHost.basicPublish()] exchangeName = " + exchangeName + "的交换机不存在, 消息发布失败");
            }
            // 3, 构造消息对象
            Message message = Message.createMessage(routingKey, basicProperties, body);
            // 4, 判定交换机类型
            if (exchangeExists.getType() == ExchangeTypeEnum.DIRECT) {
                // 如果是直接交换机, routingKey就作为队列名
                String queueName = virtualHostName + "-" + routingKey;
                // 判定队列是否存在
                MessageQueue queueExists = memoryDataCenter.getQueue(queueName);
                if (queueExists == null) {
                    throw new MQException("[VirtualHost.basicPublish()] queueName = " + queueName + "的队列不存在, 消息发布失败");
                }
                sendMessage(queueExists, message);
            } else {
                ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);
                for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {
                    Binding binding = entry.getValue();
                    // 判定队列是否存在, 以及 bindingKey 和 routingKey 是否匹配
                    MessageQueue queueExists = memoryDataCenter.getQueue(binding.getQueueName());
                    if (queueExists == null || !router.route(exchangeExists.getType(), binding, message)) {
                        continue;
                    }
                    sendMessage(queueExists, message);
                }
            }
            return true;
        } catch (MQException e) {
            System.out.println("[VirtualHost.basicPublish()] 消息发布失败");
            e.printStackTrace();
            return false;
        }
    }
  • 首先队列为持久化存储, 并且该消息是持久化存储, 才能写入硬盘
  • 生产者发布了消息, 此时应该提醒消费者消费消息(这个方法后续实现)
    private void sendMessage(MessageQueue queue, Message message) {
        try {
            // 1, 写入硬盘
            if ( queue.isDurable() && message.getDeliverMode() == 2 ) {
                diskDataCenter.sendMessage(queue, message);
            }
            // 2, 写入内存
            memoryDataCenter.sendMessage(queue, message);
            // 3, 给消费者推送消息(message-->queue, queueName-->tokenQueue)
            consumerManager.notifyConsumer(queue.getName());
        } catch (InterruptedException | IOException | MQException e) {
            System.out.println("'[VirtualHost.sendMessage()] 消息发送失败");
            e.printStackTrace();
        }
    }

10, basicSubscribe() 订阅消息

这个方法涉及到 ConsumerManager 这个类的实现, 下篇文章再介绍 ConsumerManager

这个方法只需要做一步: 添加一个消费者, 后续的逻辑(服务器收到消息后就转发给消费者)交给 ConsumerManager 处理

    /**
     * 添加一个指定队列的消费者, 来订阅消息
     * 队列中有消息了就推送给订阅了该队列的消费者(订阅者)
     * @param consumerTag 消费者唯一身份标识
     * @param queueName   队列唯一身份标识
     * @param autoAck     是否自动应答
     * @param consumable  实现把消息推送给订阅者的接口
     */
    public boolean basicSubscribe(String consumerTag,
                                  String queueName,
                                  boolean autoAck,
                                  Consumable consumable) {
        queueName = virtualHostName + "-" + queueName;
        try {
            consumerManager.addConsumer(consumerTag, queueName, autoAck, consumable);
            System.out.println("[VirtualHost.basicSubscribe()] consumerTag = " + consumerTag + "添加消费者成功");
            return true;
        } catch (MQException e) {
            System.out.println("[VirtualHost.basicSubscribe()] consumerTag = " + consumerTag + "添加消费者失败");
            e.printStackTrace();
            return false;
        }
    }

11, basicAck() 确认应答

这个方法用于, 服务器给消费者推送消息之后, 如果消费者选择手动应答, 就应该主动的调用服务器的这个方法

  • 先查询队列和消息是否都存在
  • 如果都存在, 分别在硬盘和内存中, 删除该消息的即可
	public boolean basicAck(String queueName, String messageId) {
        queueName = virtualHostName + "-" + queueName;
        try {
            // 1, 查找队列和消息
            MessageQueue queueExists = memoryDataCenter.getQueue(queueName);
            if (queueExists == null) {
                throw new MQException("[VirtualHost.basicAck()] queueName = " + queueName + "的交换机不存在, 确认应答失败");
            }
            Message messageExists = memoryDataCenter.getMessage(messageId);
            if (messageExists == null) {
                throw new MQException("[VirtualHost.basicAck()] messageId = " + messageId + "的消息不存在, 确认应答失败");
            }
            // 2, 硬盘删除
            if (queueExists.isDurable() && messageExists.getDeliverMode() == 2) {
                diskDataCenter.deleteMessage(queueExists, messageExists);
            }
            // 3, 内存删除
            memoryDataCenter.removeMessage(messageId);
            memoryDataCenter.removeMessageNotAck(queueName, messageId);
            System.out.println("[VirtualHost.basicAck()] queueName = " + queueName +
                    ", messageId = " + messageId + "的确认应答成功");
            return true;
        } catch (MQException | IOException e) {
            System.out.println("[VirtualHost.basicAck()] queueName = " + queueName +
                    ", messageId = " + messageId + "的确认应答失败");
            e.printStackTrace();
            return false;
        }
    }

三、实现交换机的转发/绑定规则

在这里插入图片描述

在 server.core.Router 类中编写代码, 这篇文章 介绍了核心类的实现, 当时没有实现这一块的内容
在这里插入图片描述


1, 设计 bindingKey 和 routingKey

⼀个 routingKey 是由数字, 字⺟, 下划线构成的, 并且可以使⽤ . 分成若⼲部分.
形如 aaa.bbb.ccc
⼀个 bindingKey 是由数字, 字⺟, 下划线构成的, 并且使⽤ . 分成若⼲部分.
另外, ⽀持 *# 两种通配符. (*, # 只能作为 . 切分出来的独⽴部分, 不能和其他数字字⺟混⽤, ⽐如 a.*.b 是合法的, a.*a.b 是不合法的).

  • 其中 * 可以匹配任意⼀个单词.
  • 其中 # 可以匹配任意零个或者多个单词.

bindingKey 为 a.*.b, 可以匹配 routingKey 为 a.a.b 和 a.b.b 和 a.aaa.b
bindingKey 为 a.#.b, 可以匹配 routingKey 为 a.a.b 和 a.b.b 和 a.aaa.b 和 a.aa.bb.b 和 a.b


2, checkBindingKey() 检查 bindingKey是否合法

bindngKey可以为""(空串), 如果交换机类型为直接 / 扇出, bindingKey 用不上, 参数传""即可

  1. 允许是空字符串
  2. 数字字⺟下划线构成
  3. 可以包含通配符
  4. # 不能连续出现
  5. #*不能相邻
	public boolean checkBindingKey(String bindingKey) {
        if (bindingKey.length() == 0) {
            return true;
        }
        for (int i = 0; i < bindingKey.length(); i++) {
            char ch = bindingKey.charAt(i);
            if ((ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z') || (ch >= '0' && ch <= '9')
                    || (ch == '_' || ch == '.' || ch == '*' || ch == '#')) {
                continue;
            }
            return false;
        }
        String[] bindingKeyFragments = bindingKey.split("\\.");
        for (String fragment : bindingKeyFragments) {
            if (fragment.length() > 1 && (fragment.contains("*") || fragment.contains("#"))) {
                return false;
            }
        }
        for (int i = 0; i < bindingKeyFragments.length - 1; i++) {
            // 连续两个 ##
            if (bindingKeyFragments[i].equals("#") && bindingKeyFragments[i + 1].equals("#")) {
                return false;
            }
            // # 连着 *
            if (bindingKeyFragments[i].equals("#") && bindingKeyFragments[i + 1].equals("*")) {
                return false;
            }
            // * 连着 #
            if (bindingKeyFragments[i].equals("*") && bindingKeyFragments[i + 1].equals("#")) {
                return false;
            }
        }
        return true;
    }

3, checkRoutingKey() 检查 routingKey 是否合法

如果是扇出交换机, routingKey 用不上, 设置为""即可

    public boolean checkRoutingKey(String routingKey) {
        if (routingKey.length() == 0) {
            return true;
        }
        for (int i = 0; i < routingKey.length(); i++) {
            char ch = routingKey.charAt(i);
            if ((ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z') || (ch >= '0' && ch <= '9')
                    || (ch == '_' || ch == '.' )) {
                continue;
            }
            return false;
        }
        return true;
    }

4, route() 判定转发规则

直接交换机没有转发这一说, 在上述 basicPublish() 这个方法里已经单独处理过了

	public boolean route(ExchangeTypeEnum exchangeTypeEnum, Binding binding, Message message) throws MQException {
        if (exchangeTypeEnum == ExchangeTypeEnum.FANOUT) {
            return routeFanout(binding, message);
        } else if (exchangeTypeEnum == ExchangeTypeEnum.TOPIC) {
            return routeTopic(binding, message);
        }else {
            throw new MQException("[Router.route()] 非法的交换机类型");
        }
    }

5, routeFanout() 规定扇出交换机转发规则

没有转发规则, 只要是绑定了的队列都能转发, 这里单拎出来一个方法是为了代码风格统一

    public boolean routeFanout(Binding binding, Message message) {
        return true;
    }

6, routeTopic() 规定主题交换机转发规则

这个方法就是用来实现判定 routingKey 和 bindingKey 是否匹配了

	public boolean routeTopic(Binding binding, Message message) {
        String[] bindingKeyFragments = binding.getBindingKey().split("\\.");
        String[] routingKeyFragments = message.getRoutingKey().split("\\.");
        int i = 0;
        int j = 0;
        while (i < bindingKeyFragments.length && j < routingKeyFragments.length) {
            // 遇到 * 只能匹配一个字符
            if (bindingKeyFragments[i].equals("*")) {
                i++;
                j++;
                continue;
            }
            // 遇到 # 就找下一个片段
            if (bindingKeyFragments[i].equals("#")) {
                i++;
                if (i == bindingKeyFragments.length) {
                    return true;
                }
                // 说明#后面还有片段, 让 j 寻找这个片段
                int nextMatchIndex = findNextMatchIndex(bindingKeyFragments[i], j, routingKeyFragments);
                if (nextMatchIndex == -1) {
                    return false;
                }
                j = nextMatchIndex;
                i++;
                j++;
                continue;
            }
            if (!bindingKeyFragments[i].equals(routingKeyFragments[j])) {
                return false;
            }
            i++;
            j++;
        }
        return i == bindingKeyFragments.length && j == routingKeyFragments.length;
    }

    public int findNextMatchIndex(String cur, int j, String[] routingKeyFragments) {
        while (j < routingKeyFragments.length) {
            if (routingKeyFragments[j].equals(cur)) {
                return j;
            }
            j++;
        }
        return -1;
    }

四、小结

本篇主要实现了"虚拟主机"
虚拟主机的作用是为了隔离不同业务线的数据, 本项目暂时只支持一个虚拟主机

  • 虚拟主机把硬盘(数据库+文件)和内存这两个模块的数据管理整合在一起, 并且封装了一系列核心 API, 供上层( BrokerServer )调用
  • 在server.core 包下实现了 Router 类, 这个类主要是在生产者发布消息时, 用来检查 bindingKey 和routingKey 是否合法, routingKey 和 bindingKey 是否匹配等, 为了实现服务器支持三种不同的交换机转发模式

在这里插入图片描述

这些核心 API 基本上都实现了, 还有 basicSubscribe() 这个 API 没有完全实现, 在下篇文章会介绍 ConsumerManager 类, 用来实现和消费者消费消息相关的逻辑

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1057798.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于Java的高校办公室会议行政事务管理系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09;有保障的售后福利 代码参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作…

电气基础——电源、变压器、接触器、断路器、线缆

目录 1.电源 2.变压器 三项电和2相电的转换 3.接触器 4.断路器 5.线缆 1.电源 2.变压器 三项电和2相电的转换 三相电转为2相电 1.变压器 2.相位移转换器 3.电容器 两相电变不了三相电。但是两相电可以通过电容移相后带动三相电机 零线&#xff0c;地线N&#xff…

VD6283TX环境光传感器驱动开发(1)----获取ID

VD6283TX环境光传感器驱动开发----1.获取ID 概述视频教学样品申请源码下载模块参数IIC接线方式设备ID生成STM32CUBEMX串口配置 IIC配置串口重定向模块地址获取ID主函数结果演示 概述 环境光传感器是一种光电探测器&#xff0c;能够将光转换为电压或者电流&#xff0c;使用多光…

适合在校学生的云服务器有哪些?

随着云计算技术的发展&#xff0c;越来越多的学生开始使用云服务器来进行学习和实践。对于学生来说&#xff0c;选择一款便宜的云服务器不仅可以帮助他们降低成本&#xff0c;还可以提高学习和实践的效率。本文将介绍几款适合学生使用的便宜云服务器。 1、腾讯云学生服务器【点…

最新反编译小程序教程(支持分包一键反编译),反编译成功率高达99%

最新反编译小程序教程&#xff08;支持分包一键反编译&#xff09;&#xff0c;反编译成功率高达99% 优点&#xff1a; 1.支持多个分包以及主包一次性反编译&#xff1b; 2.使用wxappUnpacker无法进行解析的小程序包&#xff0c;一键反编译解析&#xff08;咱没有发现反编译失败…

【面试经典150 | 矩阵】生命游戏

文章目录 写在前面Tag题目来源题目解读解题思路方法一&#xff1a; O ( m n ) O(mn) O(mn) 额外空间方法二&#xff1a; O ( 1 ) O(1) O(1) 额外空间 写在最后 写在前面 本专栏专注于分析与讲解【面试经典150】算法&#xff0c;两到三天更新一篇文章&#xff0c;欢迎催更…… 专…

Explain执行计划字段解释说明---select_type、table、patitions字段说明

1、select_type的类型有哪些 2、select_type的查询类型说明 1、SIMPLE 简单的 select 查询,查询中不包含子查询或者UNION 2、PRIMARY 查询中若包含任何复杂的子部分&#xff0c;最外层查询则被标记为Primary 3、DERIVED 在FROM列表中包含的子查询被标记为DERIVED(衍生)&…

cygwin下载、安装

官网 Cygwin Installation 下载链接 http://www.cygwin.com/setup-x86_64.exe 安装 双击安装 从互联网安装&#xff0c;这种模式直接从Internet安装&#xff0c;适合网速较快的情况&#xff1b;下载而不安装&#xff0c;这种模式只从网上下载Cygwin的组件包&#xff0c;但不…

计算机竞赛 深度学习驾驶行为状态检测系统(疲劳 抽烟 喝水 玩手机) - opencv python

文章目录 1 前言1 课题背景2 相关技术2.1 Dlib人脸识别库2.2 疲劳检测算法2.3 YOLOV5算法 3 效果展示3.1 眨眼3.2 打哈欠3.3 使用手机检测3.4 抽烟检测3.5 喝水检测 4 最后 1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 基于深度学习的驾…

【Python 基础 2023 最新】第七课 Pandas

【Python 基础 2022 最新】第七课 Pandas 概述Pandas 是什么?Pandas 的应用场景安装 Pandas Pandas 数据结构Series 数组什么是 Series?Series 创建 Series 数组操作数据检索数据修改过滤Series 数组运算总结 什么是 DataFrameDataFrame 创建 DataFrame 操作数据检索筛选数据…

YAMLException : java.nio.charset.MalformedInputException : Input length = 1

场景还原 有小伙伴反应SpringBoot项目启动异常&#xff0c;但是同组其他伙伴的无问题&#xff01; ERROR org.springframework.boot.SpringApplication - Application run failedorg.yaml.snakeyaml.error.YAMLException: java.nio.charset.MalformedInputException : Inpu…

springboot的配置文件(properties和yml/yaml)

springboot的配置文件有两种格式分别是properties和yml/yaml 创建配置文件 在创建springboot项目时候&#xff0c;会默认生成application.properties这种格式 书写风格 端口 application.propertis server.port8080 application.yml server:port: 8080 连接数据库 applica…

警用装备管理系统|智装备DW-S304的主要功能

东识科技&#xff08;DONWIT&#xff09;警用装备管理系统DW-S304是依托互3D技术、云计算、大数据、RFID技术、数据库技术、AI、视频分析技术对RFID智能仓库进行统一管理、分析的信息化、智能化、规范化的系统。 在国外很早开始便使用警用装备管理系统对警用装备的管理使用进行…

计算机网络(五):运输层

参考引用 计算机网络微课堂-湖科大教书匠计算机网络&#xff08;第7版&#xff09;-谢希仁 1. 运输层概述 之前所介绍的计算机网络体系结构中的物理层、数据链路层以及网络层它们共同解决了将主机通过异构网络互联起来所面临的问题&#xff0c;实现了主机到主机的通信&#xff…

Go开始:Go基本元素介绍

目录 标识符与关键字Go中的标识符Go关键字关键字示例 具名的函数常规函数代码示例 方法代码示例 高阶函数代码示例 匿名函数与Lambda表达式代码示例 闭包代码示例 具名的值变量基本数据类型复合数据类型指针类型 常量基本常量类型枚举常量常量表达式 定义类型和类型别名类型定义…

学过的汇编指令整合

1.数据搬移指令 <opcode>{<cond>}{s} <Rd>, <shifter_operand> 解释&#xff1a; <opcode>&#xff1a;指令码 {<cond>}&#xff1a;条件码 {s}&#xff1a;状态位&#xff0c;如果在指令后面加上s&#xff0c;则运算的结果会影响CPSR的条…

Docker命令起别名

1.打开.bashrc文件 vi ~/.bashrc 2. 起别名 alias dpsdocker ps --format "table{{.ID}}\t{{.Names}}\t{{.Image}}\t{{.Status}}" alias disdocker images 3. 文件生效 source ~/.bashrc 4.展示

计算机竞赛 目标检测-行人车辆检测流量计数

文章目录 前言1\. 目标检测概况1.1 什么是目标检测&#xff1f;1.2 发展阶段 2\. 行人检测2.1 行人检测简介2.2 行人检测技术难点2.3 行人检测实现效果2.4 关键代码-训练过程 最后 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 行人车辆目标检测计数系统 …

2023年全国控制科学与工程学科评估结果 - 自动化考研

考研选择学校时&#xff0c;控制科学与工程考研学校排名情况怎样是广大考研学子十分关心的问题&#xff0c;以下是我们自动化考研联盟为大家整理得最新控制科学与工程学科评估结果情况&#xff0c;还比较权威&#xff0c;供大家参考。 最后祝大家一战成硕,有其他问题欢迎评论区…

【前段基础入门之】=>CSS浮动

浮动的简介 在最初&#xff0c;浮动是用来实现文字环绕图片效果的&#xff0c;现在浮动是主流的页面布局方式之一。 元素浮动后的特点 &#x1f922; 脱离文档流。&#x1f60a; 不管浮动前是什么元素&#xff0c;浮动后&#xff1a;默认宽与高都是被内容撑开&#xff08;尽…