模拟实现消息队列项目(完结) -- 基于MQ的生产者消费者模型

news2025/1/9 5:22:58

目录

前言

1. 生产者

2. 消费者

3. 启动消息队列服务器

4. 运行效果

 结语


前言

        在上一章节,我们完成了消息队列的客户端部分,至此我们整个消息队列项目就构建完成了,那我们做的这个消息队列到底有什么效果,以及如何去使用我们自己的消息队列呢?那么本文,就将我们的MQ进行实战操作,写一个基于MQ的生产者消费者模型.本项目全部代码已上传Gitee,链接放在文章末尾,欢迎大家访问!


1. 生产者

我们的生产者就是一个客户端,需要将自己生产出来的消息发送到消息队列中,供消费者进行使用.

我们创建一个生产者,在服务器端创建交换机(直接),队列,然后往对应的队列进行投递消息.

1. 实例化创建连接的工厂类

2. 设置消息队列服务器的IP地址以及端口号

3. 新建一个连接,创建Channel,交换机,队列

4. 新建一个消息转换成字节文件进行发送,此时给线程一个休眠的时间,确保已经发送到消息队列服务器

5. 关闭通道,关闭连接

package com.example.demo.demo;

import com.example.demo.mqclient.Channel;
import com.example.demo.mqclient.Connection;
import com.example.demo.mqclient.ConnectionFactory;
import com.example.demo.mqserver.core.ExchangeType;

import java.io.IOException;

/**
 * Created with IntelliJ IDEA.
 * Description:生产者  通常是一个单独的服务器程序
 * User: YAO
 * Date: 2023-08-03
 * Time: 16:06
 */
public class DemoProducer {
    public static void main(String[] args) throws IOException, InterruptedException {
        System.out.println("启动生产者");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 创建交换机和队列
        channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
        channel.queueDeclare("testQueue", true, false, false, null);

        // 创建一个消息并发送
        byte[] body = "hello".getBytes();
        boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);
        System.out.println("消息投递完成! ok=" + ok);

        Thread.sleep(500);
        channel.close();
        connection.close();
    }
}

2. 消费者

消费者也是客户端,所做的前期工作是一样的,只不过是发送的请求不同.

1. 消费者需要进行订阅消息,接收到消息之后,执行回调进行消费消息.

2. 消费者需要循环等待消息队列的响应,等待消费.

package com.example.demo.demo;

import com.example.demo.common.Consumer;
import com.example.demo.common.MqException;
import com.example.demo.mqclient.Channel;
import com.example.demo.mqclient.Connection;
import com.example.demo.mqclient.ConnectionFactory;
import com.example.demo.mqserver.core.BasicProperties;
import com.example.demo.mqserver.core.ExchangeType;

import java.io.IOException;
/**
 * Created with IntelliJ IDEA.
 * Description:消费者  通常是一个单独的服务器程序
 * User: YAO
 * Date: 2023-08-03
 * Time: 16:07
 */
public class DemoConsumer {
    public static void main(String[] args) throws MqException, InterruptedException, IOException {
        System.out.println("启动消费者!");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
        channel.queueDeclare("testQueue", true, false, false, null);

        channel.basicConsume("testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                System.out.println("[消费数据] 开始!");
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("basicProperties=" + basicProperties);
                String bodyString = new String(body, 0, body.length);
                System.out.println("body=" + bodyString);
                System.out.println("[消费数据] 结束!");
            }
        });

        // 由于消费者也不知道生产者要生产多少, 就在这里通过这个循环模拟一直等待消费.
        while (true) {
            Thread.sleep(500);
        }
    }
}

3. 启动消息队列服务器

在Spring Boot 项目的启动类中,实例化Broker Server,传入端口号,进行启动服务器.

package com.example.demo;

import com.example.demo.mqserver.BrokerServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

import java.io.IOException;

@SpringBootApplication
public class DemoApplication {

	public static ConfigurableApplicationContext context;
	public static void main(String[] args) throws IOException {
		context = SpringApplication.run(DemoApplication.class, args);
		BrokerServer brokerServer = new BrokerServer(9090);
		brokerServer.start();
	}

}

4. 运行效果

1. 服务器启动:

2. 此时如果再重启服务器,会提示数据库已经存在,就会将数据恢复到内存

3. 启动生产者进行投递消息

上述就是按照我们自定义的应用层协议进行发送请求. 

我们再来看服务器这边的日志:

4. 启动消费者进行消费消息 

 我们再来看服务器这边日志


 结语

         以上就是一个简单的Demo,实现了基于MQ的生产者消费者模型.其他的功能,大家可以在做完这个项目之后自行进行测试.至此这个消息队列的项目就全部完结了,内容还是很多的,希望可以通过这个系列能够帮助到大家去了解消息队列的实现原理.也希望大家能够有所收获,那就到这里吧.接下来就要开始新的项目了(实现论坛系统),又是一个挑战,我们一起加油!❤️

        完整的项目代码已上传Gitee,欢迎大家访问.👇👇👇

模拟实现消息队列icon-default.png?t=N6B9https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/853019.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

GSEA富集分析结果详解

1. GSEA富集分析原理图 2. GSEA富集分析过程 1. 计算富集分数(ES) 富集分数:S 反应基因集(比如某个通路内的基因集)成员 s 在排序基因集 L(比如根据 logFC 排序的差异基因集,默认降序&#xf…

“为爱起航,一村一书院”在阳朔落地

2023年8月1-5 日,“关爱祖国下一代,助力乡村振兴” 之为爱起航项目在阳朔举行。 本次活动由千里思乡村振兴促进会联合中国文化交流大使组委会携同大湾区19位师生加入到首批“为爱起航,一村一书院”项目中,同时,本项目得…

分页查询从接口到实现,统一对日期类型进行格式化处理

编写Service实现类编写Mapper的sql&#xff0c;但复杂的sql语句需要写到mapper对应的xml文件中日期类型格式化处理 /*** 扩展springmvc框架的消息转换器* param converters*/Overrideprotected void extendMessageConverters(List<HttpMessageConverter<?>> conve…

初识Container

1. 什么是Container&#xff08;容器&#xff09; 要有Container首先要有Image&#xff0c;也就是说Container是通过image创建的。 Container是在原先的Image之上新加的一层&#xff0c;称作Container layer&#xff0c;这一层是可读可写的&#xff08;Image是只读的&#xff0…

天津农商银行智能加密锁管理工具常见问题

天津农商银行智能加密锁管理工具&#xff0c;在使用过程中&#xff0c;可能出现一些莫名的错误&#xff0c;针对亲身遇到的坑&#xff0c;分享给大家&#xff0c;以备不时之需。 一、转账业务导入文件中文汉字出现乱码&#xff0c;如下图。 原因是文件编码不正确&#xff0c;…

MySQL:表的约束和基本查询

表的约束 表的约束——为了让插入的数据符合预期。 表的约束很多&#xff0c;这里主要介绍如下几个&#xff1a; null/not null,default, comment, zerofill&#xff0c;primary key&#xff0c;auto_increment&#xff0c;unique key 。 空属性 两个值&#xff1a;null&am…

【设计模式——学习笔记】23种设计模式——备忘录模式Memento(原理讲解+应用场景介绍+案例介绍+Java代码实现)

案例引入 游戏角色有攻击力和防御力&#xff0c;在大战Boss前保存自身的状态(攻击力和防御力)&#xff0c;当大战Boss后攻击力和防御力下降&#xff0c;可以从备忘录对象恢复到大战前的状态 传统设计方案 针对每一种角色&#xff0c;设计一个类来存储该角色的状态 【分析】…

cpu util margin,cpu freq margin

【cpufreq governor】cpu util 和 cpu margin怎么计算的_悟空明镜的博客-CSDN博客 cpu util margin&#xff0c;cpu freq margin 根据policy_util schedtune_margin 作为算力选对应的cpu cluster或调频

EXCEL表格操作

1.带格式合并&#xff1a;D6&"欢迎光临"&E6 2.带格式复制粘贴&#xff1a;ctrlc 复制&#xff0c;选择对于单元格点击选择性粘贴&#xff1a;粘贴值和数字格式

docker-compose 安装kafka集群

点击关注《golang技术实验室》公众号****&#xff0c;将****获取更多干货 介绍 Kafka是一种高性能的分布式流处理平台&#xff0c;它的集群工作原理如下&#xff1a; 假设你是一个快递员&#xff0c;Kafka集群就是一个快递中转站。在这个中转站中&#xff0c;有很多个小窗口…

基于TF-IDF+TensorFlow+词云+LDA 新闻自动文摘推荐系统—深度学习算法应用(含ipynb源码)+训练数据集

目录 前言总体设计系统整体结构图系统流程图 运行环境Python 环境TensorFlow环境方法一方法二 模块实现1. 数据预处理1&#xff09;导入数据2&#xff09;数据清洗3&#xff09;统计词频 2. 词云构建3. 关键词提取4. 语音播报5. LDA主题模型6. 模型构建 系统测试工程源代码下载…

分布式 - 消息队列Kafka:Kafka生产者发送消息流程和3种方式

文章目录 1. Kafka 生产者2. kafaka 命令行操作3. Kafka 生产者发送消息流程4. Kafka 生产者发送消息的3种方式1. 发送即忘记2. 同步发送3. 异步发送 5. Kafka 消息对象 ProducerRecord 1. Kafka 生产者 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序&#xff0c;它们…

wsl(在windows中使用呢linux系统)适用于windows的linux子系统

步骤可参考微软官方文档https://learn.microsoft.com/zh-cn/windows/wsl/install-manual#step-4—download-the-linux-kernel-update-package 在这里主要列举一些需要注意的点 wsl2的要求 一定要检查下windows版本&#xff0c;版本不对的先升级版本不然无法使用wsl2 wsl支持…

P4381 [IOI2008] Island (求基环树直径)

也许更好的阅读体验 D e s c r i p t i o n \mathcal{Description} Description 给一个基环树森林&#xff0c;求每棵树的直径的和&#xff0c;基环树的直径定义为&#xff0c;从一个点出发只能走到没走过的点&#xff08;即一个环不能把所有边都选&#xff09;&#xff0c;所经…

史上最细,自动化测试-logging日志采集详细实战(二)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、测试场景 给登…

固态硬盘数据恢复方法有哪些?三种恢复方法助您解忧

近年来固态硬盘比较流行&#xff0c;因为工作的需要我也在使用固态硬盘&#xff0c;它真的给我带来了很多的方便。但是最近&#xff0c;我固态硬盘里的文件有些不知道怎么就丢失了&#xff0c;这给我带来了很大的困扰。有什么方法可以找回来吗&#xff1f; 固态硬盘&#xff08…

Netty客户端同步获取结果

上次服务间通信是异步的&#xff0c;现在想实现客户端同步拿到服务端响应结果。实现如下&#xff1a; 在NettyClientHandler类中增加一个结果缓存器 Map<Long,Protocol<ResponseMsg>> resultMap new ConcurrentHashMap<>();修改方法 Override protected vo…

【文献阅读笔记】深度异常检测模型

文章目录 导读相关关键词及其英文描述记录深度异常检测模型Supervised deep anomaly detection 有监督深度异常检测Semi-Supervised deep anomaly detection 半监督深度异常检测Hybrid deep anomaly detection 混合深度异常检测One-class neural network for anomaly detection…

VR全景的盈利模式你知道吗?VR全景能用在哪些领域?

引言&#xff1a; 随着科技的迅猛发展&#xff0c;虚拟现实技术已经逐渐走进我们的生活。这项令人惊叹的技术让我们能够穿越时间与空间的限制&#xff0c;重新定义人们与世界互动的方式。 一&#xff0e;什么是VR全景&#xff1f; VR全景&#xff0c;是一种通过虚拟现实技术&…

万应低代码受邀参加上海电信“大干一场 科创沙龙”活动

7月28日&#xff0c;由上海市宝山区大场镇政府指导、中国电信上海北区局主办的“大干一场 科创沙龙”系列第九期沙龙活动顺利举办。大场镇“数字化转型”领导小组办公室&#xff08;以下简称“数字办”&#xff09;邀请了来自镇域内外的数十家科创服务企业。万应低代码作为天翼…