参考RabbitMQ实现一个消息队列

news2025/1/20 6:02:26

文章目录

  • 前言
  • 小小消息管家
    • 1.项目介绍
    • 2. 需求分析
      • 2.1 API
      • 2.2 消息应答
      • 2.3 网络通信协议设计
    • 3. 开发环境
    • 4. 项目结构介绍
      • 4.1 配置信息
    • 5. 项目演示

前言

消息队列的本质就是阻塞队列,它的最大用途就是用来实现生产者消费者模型,从而实现解耦合以及削峰填谷

在分布式系统中不再是单个服务器而是服务器“集群”,如果我们我们直接A服务器给B服务器发送请求,B服务器给A服务器返回响应,这样的话我们AB的耦合较大,如果A或者B服务器挂了,我们业务也就崩溃了。引入消息队列之后我们将请求和响应都通过消息队列这个中间人来传递,就降低了耦合度。

同样的,如果我们AB服务器直接进行通信,如果A服务器突然发送许多请求,我们B服务器也会收到巨多请求的影响,AB由于硬件资源限制可能都会崩溃。如果我们引入消息队列,消息队列可以将许多请求接收存储下来,B服务器依然可以按照原有节奏取请求,不会一下子接收大量请求。这也就是我们所说的削峰填谷,也是类似的,就算请求过少,我们的服务器依然可以从消息队列中取出挤压得请求。

在分布式系统中,跨主机之间使用生产者消费者模型就显得非常重要了。 我们通常把阻塞队列封装成⼀个独立的服务器程序,并且新增一些功能。这样的程序我们就称为 消息队列

所以此次就仿照市面上得RabbitMQ实现一个简易的消息队列。

小小消息管家

1.项目介绍

将消息队列分成服务器模块、客户端模块、公共模块来实现。

  1. 服务器模块通过虚拟主机实现交换机、队列、绑定、消息等相关操作的隔离。虚拟主机主要负责对上述内容的数据进行硬盘(数据库和文件)以及内存管理、消息的三种转发方式如Direct、Fanout、Topic、为提供客户端API的实现。
  2. 客户端模块:实现连接管理提供建立连接、信道管理通过信道实现TCP连接的复用。在信道管理中实现客户端调用的API。
  3. 公共模块:约定客户端服务器的通信协议、数据传输过程中的序列化以及反序列化。

在这里插入图片描述

2. 需求分析

由于是实现一个生产者消费者模型,在消息队列中我们要实现的逻辑如下:我们的生产者客户端向服务器发送消息,消费者客户端向服务器订阅消息,服务器负责消息的存储和转发。

在这里插入图片描述

概念解读:

  • 虚拟主机 (VirtualHost):是⼀个逻辑上的集合,⼀个 BrokerServer 上可以存在多个 VirtualHost。
  • 交换机 (Exchange):生产者把消息先发送到 BrokerServer 的 Exchange 上。 再根据不同的规则把消息转发给不同的 Queue。
  • 队列 (Queue):真正用来存储消息,每个消费者决定自己从哪个 Queue 上读取消息。
  • 绑定 (Binding):Exchange 和 Queue 之间的关联关系。Exchange 和 Queue 可以理解成 “多对多” 关
    系。
  • 消息 (Message): 传递的内容。

2.1 API

我们的服务器提供以下API是西安消息队列的基本功能。

  1. 创建队列 (queueDeclare)
  2. 销毁队列 (queueDelete)
  3. 创建交换机 (exchangeDeclare)
  4. 销毁交换机 (exchangeDelete)
  5. 创建绑定 (queueBind)
  6. 解除绑定 (queueUnbind)
  7. 发布消息 (basicPublish)
  8. 订阅消息 (basicConsume)
  9. 确认消息 (basicAck)

我们的客户端提供以下API供客户使用消息队列,为了复用TCP连接,我们提供了一个Channel逻辑通道。所以在客户端我们还需要提供Channel的创建和关闭:

  1. 创建 Connection
  2. 关闭 Connection
  3. 创建 Channel
  4. 关闭 Channel
  5. 创建队列 (queueDeclare)
  6. 销毁队列 (queueDelete)
  7. 创建交换机 (exchangeDeclare)
  8. 销毁交换机 (exchangeDelete)
  9. 创建绑定 (queueBind)
  10. 解除绑定 (queueUnbind)
  11. 发布消息 (basicPublish)
  12. 订阅消息 (basicConsume)
  13. 确认消息 (basicAck)

2.2 消息应答

被消费者消费的消息,需要进行应答来确定我们消费者正确消费了消息。我们设置两种应答模式:

  1. 自动应答:消费者只要消费了消息,就算应答完毕。Broker直接删除这个消息。
  2. 手动应答:消费者手动调用应答接口(确认消息),Broker收到应答请求后删除这个消息。

2.3 网络通信协议设计

我们使用TCP 协议,来作为通信的底层协议。在这个基础上自定义应用层协议,实现客户端对服务器提供的功能远程调用。所以我们在协议中约定type标记调用的功能,length为消息的长度,payload为消息的具体内容。

请求和响应的格式如下:
在这里插入图片描述

其中 type取值如下:
• 0x1 创建 channel
• 0x2 关闭 channel
• 0x3 创建 exchange
• 0x4 销毁 exchange
• 0x5 创建 queue
• 0x6 销毁 queue
• 0x7 创建 binding
• 0x8 销毁 binding
• 0x9 发送 message
• 0xa 订阅 message
• 0xb 返回 ack
• 0xc 服务器给客户端推送的消息(被订阅的消息)

其中 payload 部分,会根据不同的 type,存在不同的格式。对于请求来说, payload 表示这次方法调用的各种参数信息,我们定义对应的类实现。
对于响应,payload 表示这次方法调用的返回值。

3. 开发环境

数据库:SQLite
开发语言:Java
技术框架:SpringBoot、SpringMVC、Mybatis
管理工具:Maven
开发工具:Intellij IDEA 2020.1.4
操作系统:Windows10

4. 项目结构介绍

在这里插入图片描述

  • common 包中约定通信协议包括请求响应格式以及不同的payload对应的数据格式;创建自定义异常类;实现序列化反序列化;定义消费者以及处理消息调用的函数接口。
  • demo 创建了一个消费者和生产者用于测试项目。
  • mqclient 客户端模块,提供创建连接的工厂类;定义完整连接的内容;定义channel实现客户端api
  • mqserver.core 定义了交换机、队列、消息、交换机类型、转发规则、消费消息的逻辑。
  • mqserver.datacenter 定义了交换机、队列、消息、绑定的存储以及管理。
  • mqserver.mapper:实现对sqlite数据库的操作。

4.1 配置信息

由于我们对于数据库的存储只涉及一小部分,所以此处我们利用sqlite进行数据管理。

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.14</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>mq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>mq</name>
    <description>mq</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc -->
        <dependency>
            <groupId>org.xerial</groupId>
            <artifactId>sqlite-jdbc</artifactId>
            <version>3.41.0.1</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter-test</artifactId>
            <version>2.3.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

application.yml: 配置数据库信息
在这里插入图片描述

5. 项目演示

创建一个生产者客户端向服务器发送一个消息:

package com.example.mq.demo;

import com.example.mq.mqclient.Channel;
import com.example.mq.mqclient.Connection;
import com.example.mq.mqclient.ConnectionFactory;
import com.example.mq.mqserver.core.ExchangeType;

import java.io.IOException;

/**
 * 这个类用来表示一个生产者.
 *  通常这是一个单独的服务器程序.
 */
public class DemoProducer {
    public static void main(String[] args) throws IOException, InterruptedException {
        System.out.println("启动生产者");
        //创建一个连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);

        //得到逻辑链接,这个就类似于socket
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 通过逻辑连接创建交换机和队列
        channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
        channel.queueDeclare("testQueue", true, false, false, null);

        // 创建一个消息并发送
        byte[] body = "hello 欢迎消费消息".getBytes();
        boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);
        System.out.println("消息投递完成! ok=" + ok);

        Thread.sleep(500);
        channel.close();
        connection.close();
    }
}

创建一个消费者客户端,订阅消费消息

package com.example.mq.demo;

import com.example.mq.common.Consumer;
import com.example.mq.common.MqException;
import com.example.mq.mqclient.Channel;
import com.example.mq.mqclient.Connection;
import com.example.mq.mqclient.ConnectionFactory;
import com.example.mq.mqserver.core.BasicProperties;
import com.example.mq.mqserver.core.ExchangeType;

import java.io.IOException;

/**
 * @author zq
 * @date 2023-08-05 19:29
 */

/*
 * 这个类表示一个消费者.
 * 通常这个类也应该是在一个独立的服务器中被执行
 */
public class DemoConsumer {
    public static void main(String[] args) throws IOException, MqException, InterruptedException {
        System.out.println("启动消费者!");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
        channel.queueDeclare("testQueue", true, false, false, null);

        //订阅消息,并定义如何消费消息
        channel.basicConsume("testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                System.out.println("[消费数据] 开始!");
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("basicProperties=" + basicProperties);
                String bodyString = new String(body, 0, body.length);
                System.out.println("body=" + bodyString);
                System.out.println("[消费数据] 结束!");
            }
        });

        // 通过这个循环模拟一直等待消费完生产者生产的所有消息
        while (true) {
            Thread.sleep(500);
        }
    }
}

实现结果如下 :

在这里插入图片描述
在这里插入图片描述

通过结果我们可以看出,我们消费者成功取出订阅的队列中的消息进行消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/842937.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

有什么好用的PNG素材网站吗?看看这6个

高品质PNG素材无疑能提升网站的质量&#xff0c;给用户带来更美好的使用体验&#xff0c;今天本文会与大家分享6个好用的PNG素材网站&#xff0c;一起来看看吧&#xff01; 1、即时设计资源广场 即时设计资源广场集成了多种大厂素材&#xff0c;不只是PNG素材&#xff0c;还有…

试用AI生成代码工具Fauxpilot,详细安装过程

设置服务 预先说明 需要预先安装支持NVIDIA的docker,docker compose > 1.28不能再容器里运行&#xff0c;否则出现以下报错&#xff1a; rootc536ca0dbd64:/test/fauxpilot-main# ./setup.sh Checking for curl ... /usr/bin/curl Checking for zstd ... /opt/conda/bin…

Java-认识String

目录 一、String概念及创建 1.1 String概念 1.2 String的创建 二、String常用方法 2.1 String对象的比较 2.2 字符串查找 2.3 转化 2.4 字符串替换 2.5 字符串拆分 2.6字符串的截取 2.7 其他操作方法 2.8 字符串修改 三、面试题 一、String概念及创建 1.1 String概念 Java中…

PVE虚拟化平台之安装openKylin开源操作系统

PVE虚拟化平台之安装openKylin开源操作系统 一、openKylin介绍1.1 openKylin简介1.2 openKylin特性 二、下载openKylin系统镜像2.1 官方网址2.2 下载openKylin系统镜像 三、上传镜像到PVE存储3.1 检查PVE环境3.2 上传镜像 四、创建虚拟机4.1 设置虚拟机名称4.2 操作系统设置4.3…

【六袆 - 国际化】SpringBoot国际化Message

模拟场景校验请求参数 private void checkParam(List<ReqAppAdminDTO> req) {// 校验管理员如果已存在&#xff0c;则抛出已存在异常req.forEach(item -> {AppAdminDO appAdminDO appAdminMapper.selectByAppIdAndAdminNo(item.getAppId(), item.getAdminNo());if (O…

ubuntu上回环设备/dev/loop0占用100%清理

查看磁盘占用情况时&#xff1a; df -h/dev/loopn这些设备在Linux下被称为回环设备。 终端输入&#xff1a; sudo apt autoremove --purge snapd再次查看&#xff1a;

Android安卓实战项目(8)---自行车fitting计算软件(源码在文末)可用于比赛项目或者作业参考中

Android安卓实战项目&#xff08;8&#xff09;—自行车fitting计算软件&#xff08;源码在文末&#x1f415;&#x1f415;&#x1f415;&#xff09;可用于比赛项目或者作业参考中 【bilibili演示地址】 https://www.bilibili.com/video/BV1eu4y1B7yA/?share_sourcecopy_we…

常用抓包工具

Fiddler Fiddler 是一个很好用的抓包工具&#xff0c;可以用于抓取http/https的数据包&#xff0c;常用于Windows系统的抓包&#xff0c;它有个优势就是免费 Charles Charles是由JAVA开发的&#xff0c;可以运行在window Linux MacOS&#xff0c;但它是收费的&#xff0c;和…

大厂容器云实践之路(一)

1-华为CCE容器云实践 华为企业云 | CCE容器引擎实践 ——从IaaS到PaaS到容器集群 容器部署时代的来临 IaaS服务如日中天 2014-2015年&#xff0c;大家都在安逸的使用IaaS服务&#xff1b; 亚马逊AWS的部署能力方面比所有竞争对手…

有血有肉的PPT

1、PPT是Powerpoint缩写 2、引申的含义是Powerpoint Power(力量/能量&#xff09; Point(观点/要点) 3、用PPT做的文档是讲演稿&#xff0c;讲演的内容要有力度&#xff0c;之所以要去演讲是为了能够影响受众 4、其次演讲稿上的内容要列出要点、表明观点&#xff0c;所以一般P…

Java 并发容器和框架Fork/Join详解

目 录 一 使用场景 1 大规模数据处理 2 复杂计算 3 并行搜索 4 并行排序 二 Fork/Join框架介绍 三 Fork/Join框架模块 四 Fork/Join框架核心思想 1分治思想(Divide-and-Conquer) 2 work-stealing(工作窃取)算法 五 Fork/Join框架执行流程 1 实现原理&#xff1a; 2…

Vue3 第二节 Vue3的响应式

1.Vue3的响应式原理 2.ref函数和reactive函数的对比 3.setup注意点 一.Vue3的响应式原理 1.Vue2.x中的响应式原理 ① 实现原理 对象类型&#xff1a;通过Object.defineProperty() 对属性的读取&#xff0c;修改进行拦截&#xff08;数据劫持&#xff09;数组类型&#xf…

上位机是什么?有什么实际用途?

上位机是指控制、监测或管理下位机的计算机系统&#xff0c;也可以称为主机。它通常用于工业自动化、机器人控制、数据采集和处理等领域。在工业自动化中&#xff0c;上位机负责向下位机下发指令并获取反馈信息&#xff0c;以控制生产流程。在机器人控制中&#xff0c;上位机负…

详细教程:如何搭建废品回收小程序

废品回收是一项环保举措&#xff0c;通过回收和再利用废弃物品&#xff0c;可以减少资源浪费和环境污染。近年来&#xff0c;随着智能手机的普及&#xff0c;小程序成为了推广和运营的重要工具。本文将详细介绍如何搭建一个废品回收小程序。 1. 进入乔拓云网后台 首先&#xf…

Maven: ‘mvn‘ is not recognized as an internal or external command

下载并配置好Maven之后&#xff0c;CMD测试安装是否成功&#xff1a;mvn -v 提示&#xff1a; mvn is not recognized as an internal or external command, operable program or batch file. 检查环境变量&#xff1a; MAVEN_HOME: %MAVEN_HOME%\bin: 看上去没问题&#x…

pinctrl设备树节点映射详细分析imx_dt_node_to_map

pinctrl设备树节点映射详细分析imx_dt_node_to_map 文章目录 pinctrl设备树节点映射详细分析imx_dt_node_to_mapstruct pinctrl_mapreally_probepinctrl_bind_pinscreate_pinctrlpinctrl_dt_to_mapdt_to_map_one_configdt_remember_or_free_mappinctrl_register_map add_settin…

Linux网络服务之部署yum仓库

yum &#xff1f; yum ! 一、YUM概述1.1 yum简介1.2 yum工作原理 二、yum 配置文件2.1 yum主配置文件2.2 yum仓库设置文件2.2.1 配置文件主要格式2.2.2 软件仓库的提供方式2.2.3 日志文件 三、yum命令详解3.1 安装和升级3.2 查询3.2.1 显示可用的安装包 ----- yum list3.2.2 显…

数据结构笔记--归并排序及其拓展题(小和问题、逆序对问题)

目录 1--归并排序 2--小和问题 3--逆序对问题 1--归并排序 归并排序的核心思想&#xff1a;将一个无序的序列归并排序为一个有序的系列&#xff1b;通过递归将无序的序列二分&#xff0c;从底层开始将二分的序列归并排序为有序序列&#xff1b; #include <iostream> #…

手工测试VS自动化测试到底那个更胜一筹?

手工与自动化只是一种形式&#xff0c;真正的核心是测试用例、业务模型和测试分析。当企业的产品规模开始膨胀的时候&#xff0c;尤其是产品迭代加快是不是能及时得到测试验证支持是很重要的。这些靠手工测试是基本无法实现的&#xff0c;手工测试会严重的拖慢产品进度&#xf…

快速排序和qsort函数详解详解qsort函数

&#x1f495;是非成败转头空&#xff0c;青山依旧在&#xff0c;几度夕阳红&#x1f495; 作者&#xff1a;Mylvzi 文章主要内容&#xff1a;快速排序和qsort函数详解 前言&#xff1a; 我们之前学习过冒泡排序&#xff0c;冒泡排序尽管很方便&#xff0c;但也存在一些局限性…