目录
前言
1. 概述
2. Basic Queue简单队列模型
2.1 消息发送
2.2 消息接收
2.3 总结
3. WorkQueue模型
3.1 消息发送
3.2 消息接收
3.3 测试
3.4 消费预取限制
3.5 总结
4. 发布、订阅
5. Fanout
5.1 声明队列和交换机
5.2 消息发送
5.3 消息接收
5.4 测试
5.5 总结
6. Direct
6.1 基于注解声明队列和交换机
6.2 发送消息
6.3 测试
6.4 总结
7. Topic
7.1 消息发送
7.2 消息接收
7.3 测试
7.4 总结
8. 交换机
8.1 配置JSON转换器
8.2 总结
前言
RabbitMQ帮助我们实现异步处理消息,大大减少了系统的压力,但是利用官方的API来实现RabbitMQ的功能实在是太麻烦了。为了简化发送和接收的API,就出现了SpringAMQP。
1. 概述
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用是来非常方便。
SpringAMQP的官方地址:Spring AMQP。
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
2. Basic Queue简单队列模型
在父工程mq-demo中引入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.1 消息发送
首先配置MQ地址,在publisher服务的application.yml中添加配置:
spring:
rabbitmq:
host: MQ的IP地址 # RabbitMQ的IP地址
port: 5672 # RabbitMQ的通信端口
username: lyf # RabbitMQ的用户名
password: 123456 # RabbitMQ的密码
virtual-host: / # RabbitMQ的虚拟主机,这个可以去RabbitMQ管理界面查看
然后在publisher服务中编写测试类SpringAMQPTest,并利用RabbitTemplate实现消息发送:
package cn.itcast.mq.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello,spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
然后运行这个测试类成功之后,去RabbitMQ的管理平台可以看到:
点击进去可以看到,具体的MQ信息:
2.2 消息接收
首先配置MQ地址,在consumer服务的application.yml文件中添加配置:
spring:
rabbitmq:
host: MQ的IP地址 # RabbitMQ的IP地址
port: 5672 # RabbitMQ的通信端口
username: lyf # RabbitMQ的用户名
password: 123456 # RabbitMQ的密码
virtual-host: / # RabbitMQ的虚拟主机,这个可以去RabbitMQ管理界面查看
然后在consumer服务的cn.itcast.mq.listener包中添加SpringRabbitListener类:
package cn.itcast.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenerSimpleQueueMessage(String msg){
System.out.println("接收到消息:【" + msg + "】");
}
}
找到consumer服务的启动类启动服务,然后在consumer服务的控制台可以看到:
这就是consumer接收到了刚刚publisher发送出去的消息。我们再去RabbitMQ的管理平台看看:
这里的队列显示未接收的消息为0了。
2.3 总结
什么是AMQP?
- 应用消息通信的一种协议,与语言和平台无关。
SpringAMQP如何发送消息?
- 引入amqp的starter依赖
- 配置RabbitMQ地址
- 利用RabbitTemplate的converAndSend方法
SpringAMQP如何接收消息:
- 引入amqp的starter依赖
- 配置RabbitMQ的地址
- 定义类,添加@Component注解
- 类中声明方法,添加@RabbitListener注解,方法参数就是消息
注意:消息一旦被消费就会从队列中删除,RabbitMQ没有消息回溯功能。
3. WorkQueue模型
Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work模型,多个消费者共同处理消息处理,速度就能大大提高了。
现在我们模拟一下WorkQueue,实现一个队列绑定多个消费者。
基本实现思路:
- 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
- 在consumer服务中定义两个消息监听者,都监听simple.queue队列
- 消费者1每秒处理50条消息,消费者2每秒处理10条消息
那么理论上来说,两个消费者加起来是不是就已经超过了50条消息,我们来实践一下。
3.1 消息发送
在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue:
package cn.itcast.mq.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {
@Autowired
private RabbitTemplate rabbitTemplate;
// @Test
// public void testSendMessage2SimpleQueue() {
// // 队列名称
// String queueName = "simple.queue";
// // 消息
// String message = "hello,spring amqp!";
// // 发送消息
// rabbitTemplate.convertAndSend(queueName, message);
// }
/*
* 测试work模式
* */
@Test
public void testSendMessage2WorkQueue() throws InterruptedException {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello,message____";
for (int i = 1; i <= 50; i++) {
// 发送消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20); // 一秒发送50条消息
}
}
}
3.2 消息接收
在consumer服务中定义两个消息监听者,都监听simple.queue队列:
package cn.itcast.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@Component
public class SpringRabbitListener {
// @RabbitListener(queues = "simple.queue")
// public void listenerSimpleQueueMessage(String msg){
// System.out.println("接收到消息:【" + msg + "】");
// }
@RabbitListener(queues = "simple.queue")
public void listenerWorkQueueMessage1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalDateTime.now()); // 看看消费者接收消息的时间
Thread.sleep(20); // 消费者1每秒处理50条消息
}
@RabbitListener(queues = "simple.queue")
public void listenerWorkQueueMessage2(String msg) throws InterruptedException {
System.err.println("消费者2接收到消息:........【" + msg + "】" + LocalDateTime.now()); // 看看消费者接收消息的时间,打印方式跟消费者1区分开
Thread.sleep(100); // 消费者2每秒处理10条消息
}
}
3.3 测试
启动ConsumerApplication后,将ConsumerApplication的日志清空,在执行publisher服务中刚刚编写的发送测试方法testWorkQueue。结果如下:
消费者1接收到消息:【hello,message____1】2024-08-04T15:21:37.238949
消费者1接收到消息:【hello,message____3】2024-08-04T15:21:37.337949300
消费者2接收到消息:........【hello,message____2】2024-08-04T15:21:37.339449800
消费者1接收到消息:【hello,message____5】2024-08-04T15:21:37.364449800
消费者1接收到消息:【hello,message____7】2024-08-04T15:21:37.416949800
消费者2接收到消息:........【hello,message____4】2024-08-04T15:21:37.443450200
消费者1接收到消息:【hello,message____9】2024-08-04T15:21:37.476948600
消费者1接收到消息:【hello,message____11】2024-08-04T15:21:37.537449800
消费者2接收到消息:........【hello,message____6】2024-08-04T15:21:37.549951
消费者1接收到消息:【hello,message____13】2024-08-04T15:21:37.600449200
消费者2接收到消息:........【hello,message____8】2024-08-04T15:21:37.659454600
消费者1接收到消息:【hello,message____15】2024-08-04T15:21:37.661949300
消费者1接收到消息:【hello,message____17】2024-08-04T15:21:37.726449800
消费者2接收到消息:........【hello,message____10】2024-08-04T15:21:37.767950
消费者1接收到消息:【hello,message____19】2024-08-04T15:21:37.785449500
消费者1接收到消息:【hello,message____21】2024-08-04T15:21:37.847451600
消费者2接收到消息:........【hello,message____12】2024-08-04T15:21:37.875949700
消费者1接收到消息:【hello,message____23】2024-08-04T15:21:37.909949400
消费者1接收到消息:【hello,message____25】2024-08-04T15:21:37.971451200
消费者2接收到消息:........【hello,message____14】2024-08-04T15:21:37.984450600
消费者1接收到消息:【hello,message____27】2024-08-04T15:21:38.033449900
消费者2接收到消息:........【hello,message____16】2024-08-04T15:21:38.093949600
消费者1接收到消息:【hello,message____29】2024-08-04T15:21:38.096950100
消费者1接收到消息:【hello,message____31】2024-08-04T15:21:38.159450600
消费者2接收到消息:........【hello,message____18】2024-08-04T15:21:38.201450
消费者1接收到消息:【hello,message____33】2024-08-04T15:21:38.225449200
消费者1接收到消息:【hello,message____35】2024-08-04T15:21:38.281950200
消费者2接收到消息:........【hello,message____20】2024-08-04T15:21:38.310451200
消费者1接收到消息:【hello,message____37】2024-08-04T15:21:38.344949700
消费者1接收到消息:【hello,message____39】2024-08-04T15:21:38.405949800
消费者2接收到消息:........【hello,message____22】2024-08-04T15:21:38.417951100
消费者1接收到消息:【hello,message____41】2024-08-04T15:21:38.473950300
消费者2接收到消息:........【hello,message____24】2024-08-04T15:21:38.530450600
消费者1接收到消息:【hello,message____43】2024-08-04T15:21:38.537450300
消费者1接收到消息:【hello,message____45】2024-08-04T15:21:38.592950300
消费者2接收到消息:........【hello,message____26】2024-08-04T15:21:38.634752300
消费者1接收到消息:【hello,message____47】2024-08-04T15:21:38.652981
消费者1接收到消息:【hello,message____49】2024-08-04T15:21:38.714585
消费者2接收到消息:........【hello,message____28】2024-08-04T15:21:38.745860
消费者2接收到消息:........【hello,message____30】2024-08-04T15:21:38.964359800
消费者2接收到消息:........【hello,message____32】2024-08-04T15:21:39.083360200
消费者2接收到消息:........【hello,message____34】2024-08-04T15:21:39.194360800
消费者2接收到消息:........【hello,message____36】2024-08-04T15:21:39.299858200
消费者2接收到消息:........【hello,message____38】2024-08-04T15:21:39.409246400
消费者2接收到消息:........【hello,message____40】2024-08-04T15:21:39.517245300
消费者2接收到消息:........【hello,message____42】2024-08-04T15:21:39.627854900
消费者2接收到消息:........【hello,message____44】2024-08-04T15:21:39.750355100
消费者2接收到消息:........【hello,message____46】2024-08-04T15:21:39.866354800
消费者2接收到消息:........【hello,message____48】2024-08-04T15:21:39.985855400
消费者2接收到消息:........【hello,message____50】2024-08-04T15:21:40.089854600
我们可以看到,消费者第一次接收消息时间是2024-08-04T15:21:37.238949,而最后一次处理消息的时间是2024-08-04T15:21:40.089854600,这之间相差了3秒左右,跟我们预想的结果相差的有点大。我们再来看一下输出的结果,发现消费者1处理的是奇数的消息,而消费者2处理的是偶数的消息。可以看到消费者1很快完成了自己的25条消息。消费者2却在缓慢的处理自己的25条消息。
也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。
3.4 消费预取限制
俗话说的好,没有那个金刚钻就别揽那个瓷器活。要解决上面的问题,也很简单。我们修改consumer服务中的aoolication.yml文件,添加一个prefetch的配置:
spring:
rabbitmq:
host: 主机IP # RabbitMQ的IP地址
port: 5672 # RabbitMQ的通信端口
username: lyf # RabbitMQ的用户名
password: 123456 # RabbitMQ的密码
virtual-host: / # RabbitMQ的虚拟主机,这个可以去RabbitMQ管理界面查看
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
我们重启ConsumerApplication后,将ConsumerApplication的日志清空,在执行publisher服务中刚刚编写的发送测试方法testWorkQueue。结果如下:
消费者2接收到消息:........【hello,message____2】2024-08-04T15:23:57.373878700
消费者1接收到消息:【hello,message____1】2024-08-04T15:23:57.373878700
消费者1接收到消息:【hello,message____3】2024-08-04T15:23:57.416879100
消费者1接收到消息:【hello,message____4】2024-08-04T15:23:57.445379900
消费者1接收到消息:【hello,message____5】2024-08-04T15:23:57.475857900
消费者2接收到消息:........【hello,message____6】2024-08-04T15:23:57.490857100
消费者1接收到消息:【hello,message____7】2024-08-04T15:23:57.521357400
消费者1接收到消息:【hello,message____8】2024-08-04T15:23:57.552356900
消费者1接收到消息:【hello,message____9】2024-08-04T15:23:57.583357400
消费者1接收到消息:【hello,message____10】2024-08-04T15:23:57.614357400
消费者2接收到消息:........【hello,message____11】2024-08-04T15:23:57.645857400
消费者1接收到消息:【hello,message____12】2024-08-04T15:23:57.677357600
消费者1接收到消息:【hello,message____13】2024-08-04T15:23:57.707357500
消费者1接收到消息:【hello,message____14】2024-08-04T15:23:57.740858700
消费者1接收到消息:【hello,message____15】2024-08-04T15:23:57.770358100
消费者2接收到消息:........【hello,message____16】2024-08-04T15:23:57.802857
消费者1接收到消息:【hello,message____17】2024-08-04T15:23:57.831358
消费者1接收到消息:【hello,message____18】2024-08-04T15:23:57.862357600
消费者1接收到消息:【hello,message____19】2024-08-04T15:23:57.894357300
消费者1接收到消息:【hello,message____20】2024-08-04T15:23:57.925857300
消费者2接收到消息:........【hello,message____21】2024-08-04T15:23:57.955857800
消费者1接收到消息:【hello,message____22】2024-08-04T15:23:57.986357200
消费者1接收到消息:【hello,message____23】2024-08-04T15:23:58.017862
消费者1接收到消息:【hello,message____24】2024-08-04T15:23:58.048857600
消费者1接收到消息:【hello,message____25】2024-08-04T15:23:58.080357600
消费者2接收到消息:........【hello,message____26】2024-08-04T15:23:58.112358100
消费者1接收到消息:【hello,message____27】2024-08-04T15:23:58.141858100
消费者1接收到消息:【hello,message____28】2024-08-04T15:23:58.173358600
消费者1接收到消息:【hello,message____29】2024-08-04T15:23:58.203357100
消费者1接收到消息:【hello,message____30】2024-08-04T15:23:58.237363900
消费者2接收到消息:........【hello,message____31】2024-08-04T15:23:58.265856700
消费者1接收到消息:【hello,message____32】2024-08-04T15:23:58.298358200
消费者1接收到消息:【hello,message____33】2024-08-04T15:23:58.328857400
消费者1接收到消息:【hello,message____34】2024-08-04T15:23:58.387857500
消费者1接收到消息:【hello,message____35】2024-08-04T15:23:58.420358
消费者2接收到消息:........【hello,message____36】2024-08-04T15:23:58.451858400
消费者1接收到消息:【hello,message____37】2024-08-04T15:23:58.482857900
消费者1接收到消息:【hello,message____38】2024-08-04T15:23:58.514357600
消费者1接收到消息:【hello,message____39】2024-08-04T15:23:58.545358700
消费者1接收到消息:【hello,message____40】2024-08-04T15:23:58.576358500
消费者2接收到消息:........【hello,message____41】2024-08-04T15:23:58.606358400
消费者1接收到消息:【hello,message____42】2024-08-04T15:23:58.638357700
消费者1接收到消息:【hello,message____43】2024-08-04T15:23:58.668857600
消费者1接收到消息:【hello,message____44】2024-08-04T15:23:58.699357800
消费者1接收到消息:【hello,message____45】2024-08-04T15:23:58.730357300
消费者2接收到消息:........【hello,message____46】2024-08-04T15:23:58.763358600
消费者1接收到消息:【hello,message____47】2024-08-04T15:23:58.793357500
消费者1接收到消息:【hello,message____48】2024-08-04T15:23:58.823858100
消费者1接收到消息:【hello,message____49】2024-08-04T15:23:58.854859200
消费者1接收到消息:【hello,message____50】2024-08-04T15:23:58.886358300
消费者第一次接收消息是2024-08-04T15:23:57.373878700,最后一次接收消息是2024-08-04T15:23:58.886358300的确跟我们之前预想的耗时一秒一样了。
3.5 总结
WorkQueue模型的使用:
- 多个消费者绑定到一个队列,同一个消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量
4. 发布、订阅
发布订阅的模型如图:
可以看到,再订阅模型中,多了一个exchange角色,而且过程略有变化:
- Publisher:生产者,也就是要发送消息的程序,但是不再是发送到队列中,而是发给exchange(交换机)
- Exchange:交换机。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3个类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
- Consumer:消费者,与以前一样,订阅队列,没有变化
- Queue:消息队列也与以前一样,接收消息、缓存消息
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
5. Fanout
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。
在广播模式下,消息发送流程是这样的:
- 可以有多个队列
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 交换机把消息发送给绑定过的所有队列
- 订阅队列的消费者都能拿到消息
现在我们来利用SpringAMQP演示FanoutExchange的使用。
实现思路如下:
- 在consumer服务中,利用代码声明队列、交换机,并将两者绑定
- 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
- 在publisher中编写测试方法,向itcast.fanout发送消息
5.1 声明队列和交换机
SpringAMQP提供了一个接口Exchange,来表示所有不同类型的交换机:
在consumer服务中,创建一个类,声明队列、交换机,并将两者绑定:
package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
// 声明交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout"); // 创建名为itcast.fanout的交换机
}
// 声明队列-fanout.queue1
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1"); // 创建名为fanout.queue1的队列
}
// 将交换机与队列1绑定
@Bean
public Binding bindingQueue1(FanoutExchange fanoutExchange,Queue fanoutQueue1){
return BindingBuilder
.bind(fanoutQueue1)
.to(fanoutExchange);
}
// 声明队列-fanout.queue2
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2"); // 创建名为fanout.queue2的队列
}
// 将交换机与队列2绑定
@Bean
public Binding bindingQueue2(FanoutExchange fanoutExchange,Queue fanoutQueue2){
return BindingBuilder
.bind(fanoutQueue2)
.to(fanoutExchange);
}
}
5.2 消息发送
在publisher服务中的SpringAmqpTest类中添加测试方法:
// 测试fanoutExchange模式
@Test
public void testSendFanoutExchange() {
// 交换机名称
String exchangeName = "itcast.fanout";
// 消息
String message = "hello,everyone!";
// 发送消息,参数分别是:交互机名称、RoutingKey(暂时为空)、消息
rabbitTemplate.convertAndSend(exchangeName,"",message);
}
5.3 消息接收
在consumer服务中的SpringRabbitListener中添加两个方法,作为消费者:
@RabbitListener(queues = "fanout.queue1")
public void listenerFanoutQueue1(String msg){
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenerFanoutQueue2(String msg){
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
5.4 测试
启动ConsumerApplication后,查看RabbitMQ的管理平台可以看到:
然后点击进去查看,就可以看到我们定义的两个队列:
将ConsumerApplication的日志清空,在执行publisher服务中刚刚编写的发送测试方法testFanoutExchange。结果如下:
两个队列都接收到了消息。
5.5 总结
Fanout交换机的作用是什么?
- 接收publisher发送的消息
- 将消息按照规则路由与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExchange会将消息路由到每个绑定的队列
声明队列、交换机、绑定关系的Bean是什么?
- Queue
- FanoutExchange
- Bingding
6. Direct
在Fanout模式中,一条消息,会被所有订阅的队列消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange:
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
- 消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致才会收到消息
现在我们来利用SpringAMQP演示FanoutExchange的使用。
实现思路如下:
- 利用@RabbitListener注解声明Exchange、Queue、RoutingKey
- 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
- 在publisher中编写测试方法,向itcast.direct发送消息
6.1 基于注解声明队列和交换机
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解的方式来声明。这个不用背,可以根据提示来写,没有提示的按ctrl+p就有了。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
key = {"red","blue"}))
public void listenerDirectQueue1(String msg){
System.out.println("消费者1接收到direct.queue1消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
key = {"red","yellow"}))
public void listenerDirectQueue2(String msg){
System.out.println("消费者2接收到direct.queue2消息:【" + msg + "】");
}
6.2 发送消息
在publisher服务的SpringAmqpTest类中添加测试方法:
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "itcast.direct";
// 消息
String message = "hello,blue!";
// 发送消息,参数分别是:交互机名称、RoutingKey、消息
rabbitTemplate.convertAndSend(exchangeName,"blue",message);
}
6.3 测试
启动ConsumerApplication后,在RabbitMQ的管理平台可以看到:
点击进去可以看到创建的队列:
将ConsumerApplication的日志清空,在执行publisher服务中刚刚编写的发送测试方法testDirectExchange。结果如下:
因为发送消息的RoutingKey写的是blue,所以只有消费者1能收到消息,接下来,把发送消息的RoutingKey改成yellow:
清空控制台日志,再来看看:
因为发送消息的RoutingKey写的是yellow,所以只有消费者2能收到消息,接下来,把发送消息的RoutingKey改成red:
清空控制台日志,再来看看:
6.4 总结
描述一下Direct交换机与Fanout交换机的差异:
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
基于@RabbitListener注解声明队列和交换机有哪些常见注解:
- @Queue
- @Exchange
7. Topic
Topic类型的交换机与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型交换机可以让队列在绑定RoutingKey的时候使用通配符。
RoutingKey一般都是有一个或多个单词组成,多个单词之间以"."分割,例如:item.insert通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
举例:
item.#:能够匹配item.spu.insert或者item.spu
item.*:只能匹配item.spu
图示:
解释:
Queue1:绑定的是china.#,因此凡是以china.开头的routingKey都会被匹配到。包括china.news和china.weather
Queue4:绑定的是#.news,因此凡是以.news开头的routingKey都会被匹配到。包括china.news和japan.news
现在我们用SpringAMQP来试试Topic模型:
实现思路如下:
- 并利用@RabbitListener声明Exchange、Queue、RoutingKey
- 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
- 在publisher中编写测试方法,向itcast.topic发送消息
7.1 消息发送
在publisher服务的SpringAmqpTest类中添加测试方法:
// 测试TopicExchange模式
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "itcast.topic";
// 消息
String message = "恭喜樊振东拿到巴黎奥运会乒乓球男子单打冠军!";
// 发送消息,参数分别是:交互机名称、RoutingKey、消息
rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
}
7.2 消息接收
在consumer服务的SpringRabbitListener中添加方法:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenerTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenerTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2消息:【" + msg + "】");
}
7.3 测试
启动ConsumerApplication后,在RabbitMQ的管理平台可以看到:
然后点击进去查看,就可以看到我们定义的两个队列:
将ConsumerApplication的日志清空,在执行publisher服务中刚刚编写的发送测试方法testTopicExchange。结果如下:
因为发送消息的RoutingKey写的是china.news,所以消费者1和消费者2都能收到消息,接下来,把发送消息的RoutingKey改成china.weather:
然后清空控制台的日志,可以看到:
因为发送消息的RoutingKey写的是china.weather,所以只有消费者1能收到消息。
7.4 总结
描述下Direct交换机与Topic交换机的差异?
- Topic交换机接收的消息RoutingKey必须是多个单词,以
**.**
分割 - Topic交换机与队列绑定时的bindingKey可以指定通配符
#
:代表0个或多个词*
:代表1个词
8. 交换机
在SpringAMQP的发送方法中,接收消息的类型都是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们做序列化为字节后发送。
首先,我们是不是要先声明一下队列?声明队列有两种方式,一种是在SpringRabbitListener基于注解声明,另一种是在FanoutCofig里面注入Bean声明。第一种方式消息一出来就被消费了,所以我们用第二种方式:
@Bean
public Queue objectQueue(){
return new Queue("object.queue");
}
然后重启ConsumerApplication,查看一下RabbitMQ的管理平台:
接下来我们来试试往这个队列里发送消息,在publisher服务中的SpringAMQPTest类中添加一个发送Map类型消息的方法:
@Test
public void testObjectMessage() {
// 消息
Map<String,Object> msg = new HashMap<>();
msg.put("name","樊振东");
msg.put("age",21);
// 发送消息
rabbitTemplate.convertAndSend("object.queue",msg);
}
执行一下,然后去RabbitMQ的管理平台上看看:
发现这里有了一条消息,点进去看一下:
这里没有显示中文,用的还是java序列化,其实就是JDK序列化,这是因为Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
8.1 配置JSON转换器
如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:
在父工程的pom文件引入依赖(因为publisher和consumer都用到,所以我们的依赖加在父工程里):
<!--消息转换器的依赖-->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
然后在publisher服务中的启动类PublisherApplication中添加一个Bean:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
然后在RabbitMQ的管理平台中把刚刚放到object.queue队列里的消息删除:
删除之后,我们再到SpringAMQPTest类中重新发消息。发完之后再到RabbitMQ的管理平台去看:
信息已经能正常显示了。然后在ConsumerApplication也添加转换器的Bean:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
再在consumer的消息监听SpringRabbitListener类上添加接收消息的方法:
@RabbitListener(queues = "object.queue")
public void listenerObjectQueue(Map<String,Object> msg){
System.out.println("消费者接收到object.queue消息:【" + msg + "】");
}
然后重启ConsumerApplication,可以看到:
8.2 总结
SpringAMQP中消息的序列化和反序列化是怎么实现的?
- 利用MessageConverter实现的,默认是JDK的序列化
- 注意发送方和接收方必须使用相同的MessageConverter