环境的搭建
这里是用Maven工程搭建的基础环境项目,这里的dome_rabbitmq就是父工程。
子工程
- publisher:消息发布者,将消息发送到队列queue
- consumer:订阅队列,处理队列中的消息
父工程的pom文件依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>mq_dome</artifactId>
<version>1.0-SNAPSHOT</version>
<modules>
<module>publisher</module>
<module>consumer</module>
</modules>
<packaging>pom</packaging>
<!-- springboot父类依赖 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.10</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!-- 消息转化时候使用 -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>
publisher实现
思路:
- 建立连接
- 创建Channel
- 声明队列
- 发送消息
- 关闭连接和channel
代码实现:
public class FirsExample {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 隔离环境
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
factory.setHost("192.168.103.100");
factory.setPort(5672);
// 获取连接
Connection connection = factory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 发送信息
for (int i = 0; i < 100; i++) {
channel.basicPublish("","basic.queue.xx",null,("8888776"+i+"").getBytes(StandardCharsets.UTF_8));
}
// 释放资源
channel.close();
connection.close();
}
}
consumer实现
代码思路:
- 建立连接
- 创建Channel
- 声明队列
- 订阅消息
代码实现:
public class FirsExample {
public static void main(String[] args) throws Exception {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 隔离环境
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
factory.setHost("192.168.103.100");
factory.setPort(5672);
// 获取连接
Connection connection = factory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 发送信息
channel.basicConsume("basic.queue.xx",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("body="+new String(body));
}
});
释放资源
// channel.close();
// connection.close();
}
}
总结
基本消息队列的消息发送流程:
- 建立connection
- 创建channel
- 利用channel声明队列
- 利用channel向队列发送消息
基本消息队列的消息接收流程: - 建立connection
- 创建channel
- 利用channel声明队列
- 定义consumer的消费行为handleDelivery()
- 利用channel将消费者与队列绑定