RabbitMQ 快速入门-消息的收发
- 准备工作
- 一、Connection 方式
- 1. 生产者测试类
- 2. 消费者测试类
- 注意
- 二、RabbitTemplate 方式
- 1. 生产者测试类
- 2. 创建队列
- 3. 消费者
- 注意
准备工作
推荐创建两个 SpringBoot 项目,一个作为生产者,另一个作为消费者
也可使用 Maven 的继承聚合模式管理两个项目
项目中需要引入下面的依赖
<!-- AMQP 依赖,包含了 RabbitMQ 的相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
为便于运行,在测试类中编写代码对消息队列进行操作
一、Connection 方式
RabbitMQ 中有几种概念,分别是:虚拟主机(virtualHost),通道(channel),队列(queue),还有一个交换机(exchanges)的概念在之后会遇到
服务先与虚拟主机建立连接,然后创建通道,声明或创建队列之后发送或接收消息,消息最终会在队列中传输
下面使用 connection 的方式来实现接发消息,以便于理解 RabbitMQ 的模式(不常用到,了解即可)
1. 生产者测试类
@SpringBootTest
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.0.102");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道 Channel
Channel channel = connection.createChannel();
// 3. 声明队列(不存在则创建)
String queueName = "simple.queue"; // 队列名
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:" + message);
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
2. 消费者测试类
@SpringBootTest
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1 设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.0.102");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
// 1.2 建立连接
Connection connection = factory.newConnection();
// 2. 创建通道 Channel
Channel channel = connection.createChannel();
// 3. 声明队列(不存在则创建)
String queueName = "simple.queue"; // 队列名
channel.queueDeclare(queueName, false, false, false, null);
// 4. 获取消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@SneakyThrows
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:" + message);
}
});
System.out.println("等待接收消息......");
}
}
我们可以在 RabbitMQ 的管理页面查看相关信息:
注意
- 消费者接收消息是异步过程,而不会阻塞主线程
- queueDeclare 在不存在该队列时会创建队列,否则不创建
- 队列不会被自动删除,可以在管理页删除(点击队列名称,点击 Delete 选项)
- 消息只会被读取一次,未被读取的消息存放在队列中等待被消费
- 上例消费者没有关闭通道和连接的操作,不会只读取一条消息,而是一直等待不停读取
- RabbitMQ 重启后,队列因未持久化被删除,将 queueDeclare 第二个参数改为 true 以创建持久化队列(已存在的队列不可更改)
- RabbitMQ 重启后,虽有队列但消息没了,因为消息未持久化,发送消息时将 basicPublish 方法第三个参数改为
MessageProperties.PERSISTENT_TEXT_PLAIN
以持久化消息
持久化的队列在 Features 栏会有字母 D 标示,如图:
有持久化的消息可以看到 Properties 信息,未持久化则没有,如图:
二、RabbitTemplate 方式
上面的例子可以看出,大多数代码是重复的,所以 SpringAMQP 中封装了 RabbitTemplate 以便于进行消息队列的操作
首先在项目 yaml 配置文件中假如 RabbitMQ 的连接相关配置
spring:
rabbitmq:
host: 192.168.0.102 # RabbitMQ 服务 ip 地址
port: 5672 # 消息服务端口
username: root # 用户名
password: "123456" # 密码
virtual-host: / #虚拟主机
然后就能自动装配 RabbitTemplate 类了
1. 生产者测试类
@RunWith(SpringRunner.class)
@SpringBootTest()
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue() {
String queueName = "simple.queue";
String message = "Hello, springAMQP!";
rabbitTemplate.convertAndSend(queueName, message);
}
}
只需调用 convertAndSend 方法即可发送消息
注意:此操作不会创建队列,如果队列不存在则没有效果
2. 创建队列
若要创建队列,需要声明一个 Queue 类型的 bean 并受到 Spring 的管理
通常放在一个 Configuration 配置类中,示例如下:
@Configuration
public class RabbitMqConfig {
@Bean
public Queue simpleQueue() {
return new Queue("simple.queue"); // 队列名与函数名无关
}
}
如此启动项目时,bean 被创建,就会创建一个队列(若已存在则不再创建)
3. 消费者
消费者不再在测试类中演示,而是使用监听队列的方式
只需在一个方法上注解 @RabbitListener,并指定队列名
同时方法所在的类也要被 Spring 管理(注解 @Component)
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {
System.out.printf("消费者接收到 simple.queue 的消息:【 %s 】\n", message);
}
}
启动项目即可监听队列并处理接收到的消息
注意:如果监听的队列名不存在,则会报错Failed to declare queue(s):[simple.queue]
,解决方法同前面的配置里创建队列
注意
- 此方式创建的队列默认持久化
- 此方式生产的消息默认持久化