目录
前言
1. 生产者
2. 消费者
3. 启动消息队列服务器
4. 运行效果
结语
前言
在上一章节,我们完成了消息队列的客户端部分,至此我们整个消息队列项目就构建完成了,那我们做的这个消息队列到底有什么效果,以及如何去使用我们自己的消息队列呢?那么本文,就将我们的MQ进行实战操作,写一个基于MQ的生产者消费者模型.本项目全部代码已上传Gitee,链接放在文章末尾,欢迎大家访问!
1. 生产者
我们的生产者就是一个客户端,需要将自己生产出来的消息发送到消息队列中,供消费者进行使用.
我们创建一个生产者,在服务器端创建交换机(直接),队列,然后往对应的队列进行投递消息.
1. 实例化创建连接的工厂类
2. 设置消息队列服务器的IP地址以及端口号
3. 新建一个连接,创建Channel,交换机,队列
4. 新建一个消息转换成字节文件进行发送,此时给线程一个休眠的时间,确保已经发送到消息队列服务器
5. 关闭通道,关闭连接
package com.example.demo.demo;
import com.example.demo.mqclient.Channel;
import com.example.demo.mqclient.Connection;
import com.example.demo.mqclient.ConnectionFactory;
import com.example.demo.mqserver.core.ExchangeType;
import java.io.IOException;
/**
* Created with IntelliJ IDEA.
* Description:生产者 通常是一个单独的服务器程序
* User: YAO
* Date: 2023-08-03
* Time: 16:06
*/
public class DemoProducer {
public static void main(String[] args) throws IOException, InterruptedException {
System.out.println("启动生产者");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(9090);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 创建交换机和队列
channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
channel.queueDeclare("testQueue", true, false, false, null);
// 创建一个消息并发送
byte[] body = "hello".getBytes();
boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);
System.out.println("消息投递完成! ok=" + ok);
Thread.sleep(500);
channel.close();
connection.close();
}
}
2. 消费者
消费者也是客户端,所做的前期工作是一样的,只不过是发送的请求不同.
1. 消费者需要进行订阅消息,接收到消息之后,执行回调进行消费消息.
2. 消费者需要循环等待消息队列的响应,等待消费.
package com.example.demo.demo;
import com.example.demo.common.Consumer;
import com.example.demo.common.MqException;
import com.example.demo.mqclient.Channel;
import com.example.demo.mqclient.Connection;
import com.example.demo.mqclient.ConnectionFactory;
import com.example.demo.mqserver.core.BasicProperties;
import com.example.demo.mqserver.core.ExchangeType;
import java.io.IOException;
/**
* Created with IntelliJ IDEA.
* Description:消费者 通常是一个单独的服务器程序
* User: YAO
* Date: 2023-08-03
* Time: 16:07
*/
public class DemoConsumer {
public static void main(String[] args) throws MqException, InterruptedException, IOException {
System.out.println("启动消费者!");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(9090);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
channel.queueDeclare("testQueue", true, false, false, null);
channel.basicConsume("testQueue", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
System.out.println("[消费数据] 开始!");
System.out.println("consumerTag=" + consumerTag);
System.out.println("basicProperties=" + basicProperties);
String bodyString = new String(body, 0, body.length);
System.out.println("body=" + bodyString);
System.out.println("[消费数据] 结束!");
}
});
// 由于消费者也不知道生产者要生产多少, 就在这里通过这个循环模拟一直等待消费.
while (true) {
Thread.sleep(500);
}
}
}
3. 启动消息队列服务器
在Spring Boot 项目的启动类中,实例化Broker Server,传入端口号,进行启动服务器.
package com.example.demo;
import com.example.demo.mqserver.BrokerServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import java.io.IOException;
@SpringBootApplication
public class DemoApplication {
public static ConfigurableApplicationContext context;
public static void main(String[] args) throws IOException {
context = SpringApplication.run(DemoApplication.class, args);
BrokerServer brokerServer = new BrokerServer(9090);
brokerServer.start();
}
}
4. 运行效果
1. 服务器启动:
2. 此时如果再重启服务器,会提示数据库已经存在,就会将数据恢复到内存
3. 启动生产者进行投递消息
上述就是按照我们自定义的应用层协议进行发送请求.
我们再来看服务器这边的日志:
4. 启动消费者进行消费消息
我们再来看服务器这边日志
结语
以上就是一个简单的Demo,实现了基于MQ的生产者消费者模型.其他的功能,大家可以在做完这个项目之后自行进行测试.至此这个消息队列的项目就全部完结了,内容还是很多的,希望可以通过这个系列能够帮助到大家去了解消息队列的实现原理.也希望大家能够有所收获,那就到这里吧.接下来就要开始新的项目了(实现论坛系统),又是一个挑战,我们一起加油!❤️
完整的项目代码已上传Gitee,欢迎大家访问.👇👇👇
模拟实现消息队列https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq