模拟实现消息队列项目(系列7) -- 实现BrokerServer

news2025/2/21 22:50:25

目录

前言

1. 创建BrokerServer类

1.1 启动服务器

1.2 停止服务器

1.3 处理一个客户端的连接

1.3.1 解析请求得到Request对象

1.3.2 根据请求计算响应

1.3.3 将响应写回给客户端

1.3.4 遍历Session的哈希表,把断开的Socket对象的键值对进行删除

2. 处理订阅消息请求详解(补充)

3. 序列化/反序列化实现(补充)

结语


前言

        上一章节,我们定义了本项目的应用层传输协议.并且创建了各种参数类.本章节的目标是对BrokerServer(实现一个TCP服务器)进行实现,对连接进行处理,根据请求计算响应返回给客户端.


1. 创建BrokerServer类

public class BrokerServer {
    // 当前考虑一个一个服务器中只有一个虚拟主机
    private VirtualHost virtualHost = new VirtualHost("default");

    // 使用哈希表表示当前会话,也就是有哪些客户端在和服务器进行通信

    // key: channelId value:对应对的Socket对象
    private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>();
    
    private ServerSocket serverSocket = null;

    // 引入线程池来处理多个客户端
    private ExecutorService executorService = null;

    // 控制服务器是否继续运行
    private volatile boolean runnable = true;
}

1.1 启动服务器

1. 首先将线程池进行创建,用来处理多个连接.

2. 设置循环用来监听连接

3. 将处理连接交给线程池.

/**
     * 1. 启动服务器
     */
    public void start() throws IOException {
        System.out.println("[BrokerServer] 启动!");
        // newCachedThreadPool自动申请新的线程
        executorService = Executors.newCachedThreadPool();
        try {
            while (runnable){
                Socket clientSocket = serverSocket.accept();
                // 把处理连接的逻辑发送给线程池
                executorService.submit(()->{
                    processConnection(clientSocket);
                });
            }
        }catch (SocketException e){
            System.out.println("[BrokerServer] 服务器停止运行!");
//            e.printStackTrace();
        }

    }

1.2 停止服务器

1. 将标志位runnable设置为false

2. 停止线程池的服务

3. 关闭服务器套接字

/**
     * 2. 停止服务器
     */
    public void stop() throws IOException {
        runnable = false;
        // 停止线程池
        executorService.shutdownNow();
        serverSocket.close();
    }

1.3 处理一个客户端的连接

1. 我们是从请求中获取的信息是二进制文件,我们不能直接使用InputStream和OutputStream,我们借助DataInputStream和DataOutputStream进行操作字节流.

2. 使用DataInputStream进行读取请求的时候,读到末尾的时候会抛出一个异常,我们将这个异常视作为处理正确的业务逻辑.我么catch掉这个异常就可以.

3. 解析得到请求对象

4. 更具请求计算响应

5. 当处理完响应之后,要进行关闭连接,并且将一个连接中其他Channel进行关闭.

/**
     * 3. 处理一个客户端的连接
     *    在一个连接中会出现多个请求和多个响应.在一个连接中要循环的处理
     */
    private void processConnection(Socket clientSocket) {
        try(InputStream inputStream = clientSocket.getInputStream();
            OutputStream outputStream = clientSocket.getOutputStream()){
            try(DataInputStream dataInputStream = new DataInputStream(inputStream);
                DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
                while (true){
                    // 1. 读取并解析请求
                    Request request = readRequest(dataInputStream);
                    // 2. 根据请求计算响应
                    Response response = process(request,clientSocket);
                    // 3. 将响应写回客户端
                    writeResponse(dataOutputStream, response);
                }
            }
        }
        catch (EOFException | SocketException e){
            // 处理正确的业务逻辑
            // 上述进行读取数据的时候,如果数据读到末尾(EOF) ,就会抛出一个异常
            // 借助这个异常结束上述循环
            System.out.println("[BrokerServer]  连接关闭! 客户端地址:" +
                    clientSocket.getInetAddress().toString()
                    + ",端口号: "+clientSocket.getPort());
        }
        catch (ClassNotFoundException | MqException e) {
            e.printStackTrace();
        } catch (IOException e) {
            // 处理真正的异常
            System.out.println("[BrokerServer] connection 出现异常");
            e.printStackTrace();
        }finally {
            try {
                // 当连接处理完成之后,进行关闭连接
                clientSocket.close();
                // 一个连接中可能会包含多个channel,需要把当前这个Socket对应的所有channel进行关闭
                clearClosedSession(clientSocket);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

1.3.1 解析请求得到Request对象

1. 根据我们自定义的格式,先读前4个字节是请求的类型,在读4个字节是payload的长度,在读就是payload.

2. 读取payload的时候,我们先根据长度创建字符数组,然后按照字符数组进行获取payload,比较读取完的长度是否与原来请求的长度一致,不一致说明有消息的丢失.进行抛出异常.

3. 最后得到完整的请求对象,交给下面的方法进行处理.

 /**
     * 3.1 解析请求得到Request对象
     */
    private Request readRequest(DataInputStream dataInputStream) throws IOException {
        Request request = new Request();
        // 1. 首先读取四个字节,为请求的type
        request.setType(dataInputStream.readInt());
        // 2. 在读四个字节就是payload的长度
        request.setLength(dataInputStream.readInt());
        // 3. 创建字符数组,并进行读取到数组中
        byte[] payload = new byte[request.getLength()];
        int n = dataInputStream.read(payload);
        if (n != request.getLength()){
            throw new IOException("[BrokerServer] 请求格式出错");
        }
        // 4. 将读取的数组内容写入到实体的Request对象中
        request.setPayload(payload);
        return request;
    }

1.3.2 根据请求计算响应

1. 我们根据请求对象的payload进行解析,此处需要注意的是,我们读取到的payload是字节数组,我们需要进行反序列化成字符数组.

2. 根据请求对象的Type值进行区分,到底客户端要调用服务器那些功能.

3. 处理完请求之后就要进行构造响应了.

4. 返回响应对象

/**
     * 3.2 根据请求计算响应
     * @param request
     * @param clientSocket
     * @return
     */
    private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
        // 1. 根据request中的payload进行解析
        //  payload 是根据 request 中 type 进行变化的
        BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());
        // 打印请求的信息
        System.out.println("[Request] rid=" + basicArguments.getRid()
        + ", channelId=" + basicArguments.getChannelId() +
                "type=" + request.getType() + ",length=" + request.getLength());

        // 2. 根据type的值,区分调用哪种功能
        boolean ok = true;
        if (request.getType() == 0X1){
            // 1. 创建一个channel
            sessions.put(basicArguments.getChannelId(),clientSocket);
            System.out.println("[BrokerServer] 创建channel完成 getChannelId="+ basicArguments.getChannelId());
        }else if (request.getType() == 0x2){
            // 2. 销毁一个channel
            sessions.remove(basicArguments.getChannelId());
            System.out.println("[BrokerServer] 销毁channel完成 getChannelId="+ basicArguments.getChannelId());
        }else if (request.getType() == 0x3){
            // 3. 创建交换机
            ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;
            // 调用虚拟主机的功能方法
            ok = virtualHost.exchangeDeclare(arguments.getExchangeName(),
                    arguments.getExchangeType(),
                    arguments.isDurable(),
                    arguments.isAutoDelete(),
                    arguments.getArguments());
            System.out.println("[BrokerServer] 创建交换机完成 ExchangeName="+ arguments.getExchangeName());
        }else if (request.getType() == 0x4){
            // 4. 销毁交换机
            ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;
            ok = virtualHost.exchangeDelete(arguments.getExchangeName());
            System.out.println("[BrokerServer] 删除交换机完成 ExchangeName="+ arguments.getExchangeName());
        }else if (request.getType() == 0x5) {
            // 5. 创建队列
            QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;
            ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),
                    arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments());
            System.out.println("[BrokerServer] 创建队列完成 QueueName="+ arguments.getQueueName());
        } else if (request.getType() == 0x6) {
            // 6. 删除队列
            QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;
            ok = virtualHost.queueDelete((arguments.getQueueName()));
            System.out.println("[BrokerServer] 删除队列完成 QueueName="+ arguments.getQueueName());
        } else if (request.getType() == 0x7) {
            // 7. 创建绑定
            QueueBindArguments arguments = (QueueBindArguments) basicArguments;
            ok = virtualHost.queueBind(arguments.getQueueName(),
                    arguments.getExchangeName(), arguments.getBindingKey());
            System.out.println("[BrokerServer] 创建绑定完成 QueueName="+ arguments.getQueueName()
            + ",ExchangeName=" + arguments.getExchangeName());
        } else if (request.getType() == 0x8) {
            // 8. 删除绑定
            QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;
            ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());
            System.out.println("[BrokerServer] 删除绑定完成 QueueName="+ arguments.getQueueName()
                    + ",ExchangeName=" + arguments.getExchangeName());
        } else if (request.getType() == 0x9) {
            // 9. 发布消息
            BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;
            ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),
                    arguments.getBasicProperties(), arguments.getBody());
            System.out.println("[BrokerServer] 发布消息完成 ExchangeName=" + arguments.getExchangeName());
        }else if (request.getType() == 0xa) {
            // 10. 订阅消息
            BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;
            ok = virtualHost.basicConsume(arguments.getConsumeTag(), arguments.getQueueName(), arguments.isAutoAck(), new Consumer() {
                @Override
                public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws IOException, MqException {
                    // 将服务器收到的消息进行推送给客户端
                    // 先知道当前这个收到的消息, 要发给哪个客户端.
                    // 此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询, 就可以得到对应的
                    // socket 对象了, 从而可以往里面发送数据了
                    // 1. 根据 channelId 找到 socket 对象
                    Socket clientSocket = sessions.get(consumerTag);
                    if (clientSocket == null || clientSocket.isClosed()) {
                        throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");
                    }
                    // 2. 构造响应数据
                    SubScribeReturns subScribeReturns = new SubScribeReturns();
                    subScribeReturns.setChannelId(consumerTag);
                    subScribeReturns.setRid(""); // 由于这里只有响应, 没有请求, 不需要去对应. rid 暂时不需要.
                    subScribeReturns.setOk(true);
                    subScribeReturns.setConsumerTag(consumerTag);
                    subScribeReturns.setBasicProperties(basicProperties);
                    subScribeReturns.setBody(body);
                    byte[] payload = BinaryTool.toBytes(subScribeReturns);
                    Response response = new Response();
                    // 0xc 表示服务器给消费者客户端推送的消息数据.
                    response.setType(0xc);
                    // response 的 payload 就是一个 SubScribeReturns
                    response.setLength(payload.length);
                    response.setPayload(payload);
                    // 3. 把数据写回给客户端.
                    //    注意! 此处的 dataOutputStream 这个对象不能 close !!!
                    //    如果 把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 outputStream 也关了.
                    //    此时就无法继续往 socket 中写入后续数据了.
                    DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());
                    writeResponse(dataOutputStream, response);
                }
            });
        }else if (request.getType() == 0xb) {
            // 10. 调用 basicAck 确认消息.
            BasicAckArguments arguments = (BasicAckArguments) basicArguments;
            ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());
            System.out.println("[BrokerServer] 消费者确认消息完成 QueueName=" + arguments.getQueueName()
            + ", MessageId=" + arguments.getMessageId());
        } else {
            // 当前的 type 是非法的.
            throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());
        }
        // 3. 构造响应
        BasicReturns basicReturns = new BasicReturns();
        basicReturns.setChannelId(basicArguments.getChannelId());
        basicReturns.setRid(basicArguments.getRid());
        basicReturns.setOk(ok);
        byte[] payload = BinaryTool.toBytes(basicReturns);
        Response response = new Response();
        response.setType(request.getType());
        response.setLength(payload.length);
        response.setPayload(payload);
        System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()
                + ", type=" + response.getType() + ", length=" + response.getLength());
        return response;
    }

1.3.3 将响应写回给客户端

1. 注意写入类型和长度是写入固定的4个字节,那么我们就使用dataOutputStream.writeInt()

2. 写完响应之后,记得要刷新缓冲区 dataOutputStream.flush();

/**
     * 3.3 将响应写回给客户端
     * @param dataOutputStream
     * @param response
     */
    private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {
        // 将响应的属性从计算好的响应中进行设置
        dataOutputStream.writeInt(response.getType());
        dataOutputStream.writeInt(response.getLength());
        dataOutputStream.write(response.getPayload());
        // 刷新缓冲区
        dataOutputStream.flush();
    }

1.3.4 遍历Session的哈希表,把断开的Socket对象的键值对进行删除

由于Socket都已经断开连接了,那么存储在内存中的Session也就没有存在的必要了.这个集合中存放的是一个连接中的change对应的Session,当连接断开之后,Channel也就不会再进行工作了,新的连接会创建新的Channel.

注意:我们在使用Map.entrySet进行遍历Map的时候,不要一遍遍历一遍进行删除,这样是不稳定的,我们遍历Map将需要进行移除的Session进行添加到待删除的链表中,最后遍历待删除的数据结构进行删除.


上述就是整个封装好的BrokerServer服务器.


下面呢,我对有关根据请求计算响应中订阅消息这一功能,再进行详细的阐述,这块比较难以理解,因为涉及到回调函数,大家可能不知道这个回调函数掉用的时机是哪里.

2. 处理订阅消息请求详解(补充)

 第二个红框部分是回调函数.

        只有消费者订阅的队列中有消息了,并且轮询的方式选中了这个消费者,才会获得消息的本体,此时线程池才会执行到这个回调方法,此时才拿到消息的本体,可以将消息的属性和本体写入到SubscribeReturn中,进而推送给消费者进行消费消息.如果没有消息给这个消费者,那么也不会进行断开连接,只要服务器不断开连接客户端一直在等待分配的消息进行消费.这一点希望,读者能够进一步的理解.等总结完客户端,那么我就会带着大家,再来理一遍这个订阅消息的这个思路.

3. 序列化/反序列化实现(补充)

要想能进行序列化和反序列化就必须对目标对象进行实现serializable接口.

1. 我们使用ByteArrayOutputStream和ObjectOutputStream进行将一个对象序列化为字节数组(输出的是字节用output)

2. 我们ByteArrayInputStream和ObjectInputStream将一个字节数组反序列化成一个对象(输入的是字节用Input)


结语

        至此,我们就彻底的完成了mqserver 的搭建,只剩下mqclient的搭建,我们在下一系列完成客户端的搭建,请持续关注,谢谢!!!

        完整的项目代码已上传Gitee,欢迎大家访问.👇👇👇

模拟实现消息队列https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/849626.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

树、森林 与 二叉树

树、森林 与 二叉树 树结构树结构的基本概念根节点子节点父节点叶节点兄弟节点子树深度高度 树结构的特点二叉树森林查找与遍历方法查找深度优先搜索广度优先搜索 遍历前序遍历中序遍历后序遍历 应用场景 树结构 树结构是一种非常常见且重要的数据结构&#xff0c;它模拟了自然…

中间件RabbitMQ消息队列介绍

1. MQ的相关概念 1.1 什么是MQ MQ&#xff08;message queue&#xff09;&#xff0c;从字面意思上看&#xff0c;本质是个队列&#xff0c;FIFO先入先出&#xff0c;只不过队列中存放的内容是message而已&#xff0c;还是一种跨进程的通信机制&#xff0c;用于上下游传递消息…

Python Opencv实践 - 在图像上绘制图形

import cv2 as cv import numpy as np import matplotlib.pyplot as pltimg cv.imread("../SampleImages/pomeranian.png") print(img.shape)plt.imshow(img[:,:,::-1])#画直线 #cv.line(img,start,end,color,thickness) #参考资料&#xff1a;https://blog.csdn.ne…

【NLP】深入浅出全面回顾注意力机制

深入浅出全面回顾注意力机制 1. 注意力机制概述2. 举个例子&#xff1a;使用PyTorch带注意力机制的Encoder-Decoder模型3. Transformer架构回顾3.1 Transformer的顶层设计3.2 Encoder与Decoder的输入3.3 高并发长记忆的实现self-attention的矩阵计算形式多头注意力&#xff08;…

三次握手与四次挥手 tcp协议特点 tcp状态转移图 TIME_WAIT 抓包

讲解 三次握手图示理解讲解 四次挥手图示理解理解 tcp协议特点tcp状态转移过程总图四次挥手状态转移过程三次挥手状态转移过程 TIME_WAIT状态存在的原因连接状态的一个测试一个面试题&#xff1a;抓包&#xff1a; 三次握手 图示理解 三次握手发生在客户端执行 connect()的时…

elfk

1. 2. ​​​​​​​ 3. 4. 5.

UML—浅谈常用九种图

目录 概述: 1.用例图 2.静态图 3.行为图&#xff1a; 4.交互图&#xff1a; 5.实现图&#xff1a; 概述: UML的视图是由九种视图组成的&#xff0c;分别是用例图、类图、对象图、状态图、活动图、序列图、协作图、构件图、实施图。我们可以根据这9种图的功能和实现的目的…

redis是单线程的,那么他是怎么样避免阻塞的

Redis 实例有哪些阻塞点&#xff1f; Redis 实例在运行时&#xff0c;要和许多对象进行交互&#xff0c;这些不同的交互就会涉及不同的操作&#xff0c;下 面我们来看看和 Redis 实例交互的对象&#xff0c;以及交互时会发生的操作。 客户端&#xff1a;网络 IO&#xff0c;键…

springboot scheduling实现定时任务

文章目录 springboot实现定时任务开启springboot定时任务原因分析&#xff1a; 配置线程池&#xff0c;让定时任务指定并发执行先要线程异步执行springboot异步线程池设置指定线程池执行任务 springboot实现定时任务 开启springboot定时任务 springboot实现定时任务很简单&am…

Zebec Protocol 将进军尼泊尔市场,通过 Zebec Card 推动地区金融平等

流支付正在成为一种全新的支付形态&#xff0c;Zebec Protocol 作为流支付的主要推崇者&#xff0c;正在积极的推动该支付方案向更广泛的应用场景拓展。目前&#xff0c;Zebec Protocol 成功的将流支付应用在薪酬支付领域&#xff0c;并通过收购 WageLink 将其纳入旗下&#xf…

学习才是测试猿的永动力!超详细的 pytest 钩子函数 之初始钩子和引导钩子来啦

前 言 前几篇文章介绍了 pytest 点的基本使用&#xff0c;学完前面几篇的内容基本上就可以满足工作中编写用例和进行自动化测试的需求。从这篇文章开始会陆续给大家介绍 pytest 中的钩子函数&#xff0c;插件开发等等。仔细去看过 pytest 文档的小伙伴&#xff0c;应该都有发现…

内容创作创新技术-147seo采集工具

对于企业和个人来说&#xff0c;内容创作是推广和营销的重要手段。然而&#xff0c;手动撰写大量原创内容不仅费时费力&#xff0c;也有可能陷入创作的瓶颈。面对这一挑战&#xff0c;147采集图文自动改写原创发布应运而生。 147采集图文自动改写原创发布是一款专业、高效的工具…

js-5:==和===的区别,分别在什么情况下使用

1、等于操作符 等于操作符用两个等号&#xff08;&#xff09;表示&#xff0c;如果操作数相等&#xff0c;则返回true。 javascript中存在隐式转换&#xff0c;等于操作符在比较中会先进行类型转换&#xff0c;再确定操作数是否相等。 遵循以下规则&#xff1a; 如果任一操作数…

武汉多域名https证书能保护几个域名

https证书中可以用一张https证书保护多个域名网站的不止一个&#xff0c;泛域名https证书和多域名https证书都是可以用一张https证书保护多个域名站点&#xff0c;但是两种https证书保护的域名站点类型不同&#xff0c;上一篇我们了解了泛域名https证书&#xff0c;今天就随SSL…

【基于openharmony的多路摄像头功能:USB设备插拔检测】

前言 最近项目接触的模块比较繁多而杂&#xff0c;因此开始写文章记录下用以总结。 目前在做的是基于openharmony3.2的多camera功能主要涉及HDF(HAL)层与framework层。 本文章涉及多路摄像头功能的第一步&#xff1a;支持USB摄像头插拔检测。 内容 目前openharmony在HDF层…

399. 除法求值

题目描述&#xff1a; 主要思路&#xff1a; 本题主要利用并查集的思想&#xff0c;重点是要弄明白分子和分母的指向关系以及一系列的值的变化规则。 查询时如果两个数字不在一个集合里那么结果就为-1. class Solution { public:unordered_map<string,string> f;unorde…

港联证券|早盘三大指数涨跌不一 医药商业板块涨近3%

周二&#xff08;8月8日&#xff09;&#xff0c;三大指数涨跌纷歧&#xff0c;到上午收盘&#xff0c;上证指数涨0.01%&#xff0c;报3269.29点&#xff1b;深证成指和创业板指别离跌0.06%和0.05%&#xff1b;沪深两市合计成交额5062.22亿元&#xff0c;总体来看&#xff0c;两…

通信行业实操技巧,让你的基站无人能及!

当今社会&#xff0c;通信网络已经成为人们生活和工作中不可或缺的一部分&#xff0c;而电信基站作为这一网络的支撑和枢纽&#xff0c;扮演着至关重要的角色。 因此&#xff0c;精密空调监控在现代通信基站的运维中具有重要意义&#xff0c;为通信技术的发展和进步提供了有力支…

新一代构建工具 maven-mvnd

新一代构建工具 maven-mvnd mvnd的前世今生下载安装 mvndIDEA集成 mvnd的前世今生 maven 作为一代经典的构建工具&#xff0c;流行了很多年&#xff0c;知道现在依然是大部分Java项目的构建工具的首选&#xff1b;但随着项目复杂度提高&#xff0c;代码量及依赖库的增多使得ma…

Vue电商项目--服务器

购买服务器 就是如果想要别人访问我们的项目&#xff0c;那么我们就需要服务器。 我们之前使用node搭建服务器&#xff0c;只能在局域网中访问。 购买云服务器的方式有很多&#xff1a;像阿里云&#xff0c;腾讯云等等 腾讯云 产业智变云启未来 - 腾讯 (tencent.com) 安全…