消息队列服务器核心功能就是,提供了虚拟主机,交换机, 队列,消息等概念的管理,实现三种典型的消息转发方式,可以实现跨主机/服务器之间的生产者消费模型。
这里,就编写一个demo,实现跨主机的生产者消费者模型。
🍅 完善服务器的启动类
@SpringBootApplication
public class TigerMqApplication {
public static ConfigurableApplicationContext context;
public static void main(String[] args) throws IOException {
context = SpringApplication.run(TigerMqApplication.class, args);
BrokerServer brokerServer = new BrokerServer(9090);
brokerServer.start();
}
}
🍅 创建Demo程序
/*
* 表示一个生产者
* */
public class DemoProducer {
public static void main(String[] args) throws IOException, InterruptedException {
System.out.println("启动生产者");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(9090);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 创建交换机和队列
channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null );
channel.queueDeclare("testQueue",true,false,false,null);
// 创建一个消息并且发送
byte[] body = "hello,TigerMQ".getBytes();
boolean ok = channel.basicPublish("testExchange","testQueue",null,body);
System.out.println("消息投递完成!ok = " + ok);
Thread.sleep(500);
channel.close();
connection.close();
}
}
/*
* 表示一个消费者
* */
public class DemoConsumer {
public static void main(String[] args) throws IOException, MqException, InterruptedException {
System.out.println("启动消费者!");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(9090);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);
channel.queueDeclare("testQueue",true,false,false,null);
channel.basicConsume("testQueue", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
System.out.println("[消费数据]开始!");
System.out.println("consumerTag = " + consumerTag);
System.out.println("basicProperties = " + basicProperties);
String bodyString = new String(body,0, body.length);
System.out.println("body = " + bodyString);
System.out.println("[消费数据]结束!");
}
});
// 一直等待消费
while (true){
Thread.sleep(500);
}
}
}
🍅 启动服务器和客户端程序
启动服务器
启动生产者和消费者
启动生产者
[Connection] 发送请求! type=1, length=188
[Connection] 收到响应! type=1, length=192
[Connection] 发送请求! type=3, length=512
[Connection] 收到响应! type=3, length=192
[Connection] 发送请求! type=5, length=349
[Connection] 收到响应! type=5, length=192
[Connection] 发送请求! type=9, length=437
[Connection] 收到响应! type=9, length=192
消息投递完成!ok = true
[Connection] 发送请求! type=2, length=188
[Connection] 收到响应! type=2, length=192
[Connection] 连接正常断开!
Process finished with exit code 0
启动消费者!
[Connection] 发送请求! type=1, length=188
[Connection] 收到响应! type=1, length=192
[Connection] 发送请求! type=3, length=512
[Connection] 收到响应! type=3, length=192
[Connection] 发送请求! type=5, length=349
[Connection] 收到响应! type=5, length=192
[Connection] 发送请求! type=10, length=315
[Connection] 收到响应! type=10, length=192
[Connection] 收到响应! type=12, length=528
[消费数据]开始!
consumerTag = C-4e9d5324-c197-462a-a0a5-31ffe3bf929a
basicProperties = BasicProperties(messageId=M-69e805c0-8298-4e8f-b737-001c340e18d5, routingKey=testQueue, deliverMode=1)
body = hello,TigerMQ
[消费数据]结束!