SpringBoot整合RabbitMQ学习笔记
以下三种类型的消息,生产者和消费者需各自启动一个服务,模拟生产者服务发送消息,消费者服务监听消息,分布式开发。
一 Fanout类型信息
- . RabbitMQ创建交换机和队列
在RabbitMQ控制台,新建交换机hmall.fanout,新建两个队列,fanout.queue1和fanout.queue2,并将连个队列和交换机进行绑定即可。
操作如下图所示:
一下操作可以通过代码实现,具体参考配置类
(1)创建队列
(2)创建交换机
(3)绑定
2. 代码实现
(1)引入依赖
<dependency>
<groupId>org.springframework.book</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)配置MQ配置信息
spring:
rabbitmq:
host: 192.168.150.101 #主机ip
port: 5672 #端口
virtual-host: /hmall #虚拟主机
username: hmall #用户名
password: 123 #密码
exchange: hmall.fanout
producer:
queue1: fanout.queue1
(3)声明队列和交换机配置类
@Component
public class FanoutConfg{
@Value("${spring.rabbitmq.exchange}")
private String exchange
@Value("${spring.rabbitmq.producer.queue1}")
private String queueName1
// 声明fanout交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(exchanage);
}
// 声明队列
@Bean
public Queue fanoutQueue1(){
return new Queue(queueName1);
}
//绑定队列和交换机
@Bean
public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
return BindingBuilder,build(fanoutQueue1).to(fanoutExchange);
}
}
(4)生产者
@Component
public class RabbitMqProduce {
@Autowired
private RabbitTemplate rabbitTemplete;
@value("${spring.rabbitmq.producer.queue})
private String queueName;
/**
* 入参说明:
* 第一个参数:queueName:队列名称
* 第二个参数:路由键,fanout类型不需要路由键
* 第三个参数:msg 消息题内容
*/
public void send(String msg){
rabbitTemplete.covertAndSend(queueName,null,msg);
}
}
(4)消费者
@Component
public class RabbitMqListener {
@RabbitListener(queues="${spring.rabbitmq.producer.queue}")
public void counsume(String msg){
System.out.pringln("消费者收到 fanout.queue队列发的消息",msg);
}
}
(5)测试类
@SpringBootTest
public class SpringBootTest{
@AUtowired
private RabbitMqProduce producer;
@Test
public void testSendFanoutMsg(){
producer.send("fanout类型发送消息!!!");
}
}
二 direct类型发送消息
- 控制台操作
(1)交换机和队列的创建参考fanout的操作
(2)绑定:与fanout不同的是 给交换机绑定队列的同时需要指定路由键,如下图所示:
- 代码实现
(1)依赖引入参考fanout类型的消息
(2)mq消息配置
spring:
rabbitmq:
host: 192.168.150.101 #主机ip
port: 5672 #端口
virtual-host: /hmall #虚拟主机
username: hmall #用户名
password: 123 #密码
exchange: hmall.direct
producer:
queue1: direct.queue1
queue2: direct.queue2
routingKey1: red
routingKey2: red2
(3)MQ配置类
以下配置可以在消费者注解上实现
@Component
public class RabbitMqConfig {
@Autowired
private RabbitTemplate rabbitTemplete;
@value("${spring.rabbitmq.producer.exchange}")
private String exchange;
@value("${spring.rabbitmq.producer.queue1}")
private String queueName1;
@value("${spring.rabbitmq.producer.queue2}")
private String queueName2;
@value("${spring.rabbitmq.producer.routingKey1}")
private String routingKey1;
@value("${spring.rabbitmq.producer.routingKey2}")
private String routingKey2;
// 创建交换机
@Bean("directExchange")
public Exchange getExchange(){
return ExchangeBuilder
.topicExchange(exchange) // 交换机类型,交换机名称
.durable(true) //ture为持久化,存到磁盘,false存到内存
.build();
}
// 创建队列
@Bean("directQueue1")
public Queue getDirectQueue1(){
retuen new Queue(queueName1);
}
// 交换机绑定队列
@bean
public Binging bindDirectQueue1(@Qualifier("directExchange") Exchange exchange,@Qualifier("directQueue1") Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with(routingKey1)
.noargs();
}
// 创建队列
@Bean("directQueue2")
public Queue getDirectQueue2(){
retuen new Queue(queueName2);
}
// 交换机绑定队列
@bean
public Binging bindDirectQueue2(@Qualifier("directExchange") Exchange exchange,@Qualifier("directQueue2") Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with(routingKey2)
.noargs();
}
}
(4)生产者发送消息
@Component
public class RabbitMqProduce {
@Autowired
private RabbitTemplate rabbitTemplete;
@value("${spring.rabbitmq.producer.queue1})
private String queueName1;
@value("${spring.rabbitmq.producer.queue2})
private String queueName2;
@value("${spring.rabbitmq.producer.routingKey2})
private String routingKey1;
@value("${spring.rabbitmq.producer.routingKey1})
private String routingKey2;
/**
* 入参说明:
* 第一个参数:queueName:队列名称
* 第二个参数:路由键,fanout类型不需要路由键
* 第三个参数:msg 消息题内容
*/
public void sendQueue1(String msg){
rabbitTemplete.covertAndSend(queueName1,routingKey1,msg);
}
public void sendQueue2(String msg){
rabbitTemplete.covertAndSend(queueName2,routingKey2,msg);
}
}
(5)消费者监听消息
第一种:已经编写了配置类
@Component
public class RabbitMqListener {
@RabbitListener(queues="${spring.rabbitmq.producer.queue1}")
public void counsume(String msg){
System.out.pringln("消费者收到 direct.queue1队列发的消息",msg);
}
@RabbitListener(queues="${spring.rabbitmq.producer.queue2}")
public void counsume(String msg){
System.out.pringln("消费者收到 direct.queue2队列发的消息",msg);
}
}
第二种:在注解上配置交换机和队列以及路由键
@Component
public class RabbitMqListener {
@RabbitListener(bindings =
@QueueBinding(
value = Queue(name="${spring.rabbitmq.producer.queue1}",durable="true"),
exchange = @Exchange(name="${spring.rabbitmq.producer.exchange)",type=ExchangeType.DIRECT),
key = {"${spring.rabbitmq.producer.routingKey1}","${spring.rabbitmq.producer.routingKey2}"}
))
public void counsume(String msg){
System.out.pringln("消费者收到 direct.queue1队列发的消息",msg);
}
@RabbitListener(bindings =
@QueueBinding(
value = Queue(name="${spring.rabbitmq.producer.queue2}",durable="true"),
exchange = @Exchange(name="${spring.rabbitmq.producer.exchange)",type=ExchangeType.DIRECT),
key = {"${spring.rabbitmq.producer.routingKey1}","${spring.rabbitmq.producer.routingKey2}"}
))
public void counsume(String msg){
System.out.pringln("消费者收到 direct.queue2队列发的消息",msg);
}
}
(6)测试类
@SpringBootTest
public class SpringBootTest{
@AUtowired
private RabbitMqProduce producer;
@Test
public void testSendDirectMsg(){
producer.send("direct类型发送消息!!!");
}
}
三 Topic类型消息
- 控制台操作
参考前面的创建交换机,队列,以及绑定关系操作 - 代码实现
(1)依赖引入参考fanout类型的消息
(2)mq消息配置
路由键使用通配符进行匹配,#代表多个,*代表一个
spring:
rabbitmq:
host: 192.168.150.101 #主机ip
port: 5672 #端口
virtual-host: /hmall #虚拟主机
username: hmall #用户名
password: 123 #密码
exchange: hmall.topic
producer:
queue1: topic.queue1
queue2: topic.queue2
routingKey1: china.#
routingKey2: #.news
(3)MQ配置类
@Component
public class RabbitMqConfig {
@Autowired
private RabbitTemplate rabbitTemplete;
@value("${spring.rabbitmq.producer.exchange}")
private String exchange;
@value("${spring.rabbitmq.producer.queue1}")
private String queueName1;
@value("${spring.rabbitmq.producer.queue2}")
private String queueName2;
@value("${spring.rabbitmq.producer.routingKey1}")
private String routingKey1;
@value("${spring.rabbitmq.producer.routingKey2}")
private String routingKey2;
// 创建交换机
@Bean("topicExchange")
public Exchange getExchange(){
return ExchangeBuilder
.topicExchange(exchange) // 交换机类型,交换机名称
.durable(true) //ture为持久化,存到磁盘,false存到内存
.build();
}
// 创建队列
@Bean("topicQueue1")
public Queue getDirectQueue1(){
retuen new Queue(queueName1);
}
// 交换机绑定队列
@bean
public Binging bindDirectQueue1(@Qualifier("topicExchange") Exchange exchange,@Qualifier("topicQueue1") Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with(routingKey1)
.noargs();
}
// 创建队列
@Bean("topicQueue2")
public Queue getDirectQueue2(){
retuen new Queue(queueName2);
}
// 交换机绑定队列
@bean
public Binging bindDirectQueue2(@Qualifier("topicExchange") Exchange exchange,@Qualifier("topicQueue2") Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with(routingKey2)
.noargs();
}
}
(4)生产者发送消息
@Component
public class RabbitMqProduce {
@Autowired
private RabbitTemplate rabbitTemplete;
@value("${spring.rabbitmq.producer.queue1})
private String queueName1;
@value("${spring.rabbitmq.producer.queue2})
private String queueName2;
@value("${spring.rabbitmq.producer.routingKey2})
private String routingKey1;
@value("${spring.rabbitmq.producer.routingKey1})
private String routingKey2;
/**
* 入参说明:
* 第一个参数:queueName:队列名称
* 第二个参数:路由键,fanout类型不需要路由键
* 第三个参数:msg 消息题内容
*/
public void sendQueue1(String msg){
rabbitTemplete.covertAndSend(queueName1,routingKey1,msg);
}
public void sendQueue2(String msg){
rabbitTemplete.covertAndSend(queueName2,routingKey2,msg);
}
}
(5)消费者监听消息
@Component
public class RabbitMqListener {
@RabbitListener(queues="${spring.rabbitmq.producer.queue1}")
public void counsume(String msg){
System.out.pringln("消费者收到 topic.queue1队列发的消息",msg);
}
@RabbitListener(queues="${spring.rabbitmq.producer.queue2}")
public void counsume(String msg){
System.out.pringln("消费者收到 topic.queue2队列发的消息",msg);
}
}
(6)测试类
@SpringBootTest
public class SpringBootTest{
@AUtowired
private RabbitMqProduce producer;
@Test
public void testSendDirectMsg(){
producer.send("direct类型发送消息!!!");
}
}
四 消息转换器
MQ会把消息体变成字节码
解决办法:使用消息转换器,实现如下:
- 在生产者和消费者两个服务引入依赖
<dependency>
<groupId>com.fasterxml.jackson</groupId>
<artifactId>jasckson-databind</artifactId>
</dependency>
- 在生产者和消费者两个服务编写消息转换器配置
@Component
public class JacksonMessageConvertor{
@Bean
public MessageCoverter jacksonMessageConvertor(){
return new Jackson2JsonMessageConverter();
}
}
- 消息体
对于生产者来说,是map类型的,则生成者接收的时候也是map类型
例如:
@Component
public class RabbitMqListener {
@RabbitListener(queues="${spring.rabbitmq.producer.queue1}")
public void counsume(Map<String,Objecct> msg){
System.out.pringln("消费者收到 topic.queue1队列发的消息",msg);
}
}
五 案例演示
支付服务支付成功后通知交易服务进行后续操作
生产者和消费者两个服务都需要进行1,2,3步骤
- 添加依赖
<!--mq依赖-->
<dependency>
<groupId>org.springframework.book</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--消息转换器依赖-->
<dependency>
<groupId>com.fasterxml.jackson</groupId>
<artifactId>jasckson-databind</artifactId>
</dependency>
- 添加MQ配置信息
spring:
rabbitmq:
host: 192.168.150.101 #主机ip
port: 5672 #端口
virtual-host: /hmall #虚拟主机
username: hmall #用户名
password: 123 #密码
exchange: pay.topic
queue: mark.order.pay.queue
routKingKey: pay.success
- 消息转换器配置类
@Component
public class JacksonMessageConvertor{
@Bean
public MessageCoverter jacksonMessageConvertor(){
return new Jackson2JsonMessageConverter();
}
}
- 生产者
(1)生产者的配置
@Component
public class Rabbitroducer {
@Autowired
private RabbitTemplate rabbitTemplete;
@value("${spring.rabbitmq.queue})
private String queueName;
@value("${spring.rabbitmq.routingKey})
private String routingKey;
/**
* 入参说明:
* 第一个参数:queueName:队列名称
* 第二个参数:路由键,fanout类型不需要路由键
* 第三个参数:msg 消息题内容
*/
public void sendMsg(String msg){
// 发送消息
rabbitTemplete.covertAndSend(queueName,routingKey, msg);
}
}
(2)业务代码支付成功发送消息
public class payOrderServiceImpl impletement PayOrderService{
@Autowrid
private RabbitProducer payProducer;
@Overirid
@Transactional(rollback = Exception.class)
public void payOrder(PayOrderDto payOrder){
// 一些列操作最终交易成功
// 发送消息通知
try{
payProducer.send(payOrder.getId());
}catch(AmqpException e){
log.error("交易成功,发送消息异常:{}",e.getMessages(););
}
}
}
- 消费者
@Component
public class PaySatusListener {
@Autowired
private OrderService orderService;
@RabbitListener(bindings =
@QueueBinding(
value = Queue(name="${spring.rabbitmq.queue}",durable="true"),
exchange = @Exchange(name="${spring.rabbitmq.exchange)",type=ExchangeType.TOPIC),
key = {"${spring.rabbitmq.routingKey}"}
))
public void listenOrderPay(Long orderId){
//标记订单为已支付
orderService.markOrderPaySuccess(orderId);
}
}