5.与SpringBoot整合
5.1.SpringBoot项目中配置环境
5.1.1.pom.xml配置依赖
在 pom.xml 配置文件中声明依赖, 通过Maven导入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
5.1.2.配置服务器地址
在配置文件中 application.properties 增加 配置信息
其中 账号/密码是 启动RabbitMQ时, 设置的
spring.rabbitmq.host=192.168.3.251(Rabbitmq 服务器的IP)
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
5.2.在配置类中声明队列
通过 new org.springframework.amqp.core.Queue() 创建 队列, 传入队列的 name 属性
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Rabbit 配置类
* 配置 队列 交换机 及 绑定关系
*/
@Configuration
public class RabbitConfig {
@Bean
public Queue stockQueue() {
// 定义 名称为 'fivemall.stock' 的队列
return new Queue("fivemall.stock");
}
@Bean
public Queue chunQueue() {
// 定义 名称为 'yuan.chun.admin' 的队列
return new Queue("yuan.chun.admin");
}
观察操作界面可以看到新的队列
5.3.发送消息
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 消息生产者 Producer
*/
@Component
public class FivemallSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String msg = "to fivemall.stock : " + new Date();
System.out.println("发送消息 : " + msg);
// 指明 接收消息的 队列名称 为 "fivemall.stock"
this.rabbitTemplate.convertAndSend("fivemall.stock", msg);
}
}
5.4.接收消息
@RabbitListener(queues = “fivemall.stock”)
指定 监听的队列 如: “fivemall.stock” , 也可以 将注解放到 类 上
@RabbitHandler
接收消息处理方法
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class FivemallReceiver {
@RabbitListener(queues = "fivemall.stock")
@RabbitHandler
public void goodsProcess(Object message) {
System.out.println("message. = " + message.getClass().getName());
System.out.println("fivemall.stock 队列 接收消息 : " + message);
}
@RabbitListener(queues = "yuan.chun.admin")
@RabbitHandler
public void process(Object message) {
System.out.println("message. = " + message.getClass().getName());
System.out.println("yuan.chun.admin 队列 接收消息 : " + message);
}
}
5.5.测试
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@Autowired
private FivemallSender fivemallSender;
@RequestMapping("/testDirect")
public void testDirect() throws Exception {
fivemallSender.send();
}
}
控制台查看结果, 可以看到接收到参数类型为 org.springframework.amqp.core.Message
同时可以看到 , 虽然监听了两个队列但只有一个队列接收到信息
因为 发送消息时 指定了队列
这里默认的交换机 相当于 直接交换机 Direct Exchange
发送消息 : to fivemall.stock : Sat Feb 18 12:30:45 CST 2023
message. = org.springframework.amqp.core.Message
fivemall.stock 队列 接收消息 : (Body:'to fivemall.stock : Sat Feb 18 12:30:45 CST 2023' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=fivemall.stock, deliveryTag=1, consumerTag=amq.ctag-s2X0i0wE1_VuxMkqyPF0WQ, consumerQueue=fivemall.stock])