文章目录
- 1、AMQP
- 2、基本消息模型队列
- 3、WorkQueue模型
- 4、发布订阅模型
- 5、发布订阅-Fanout Exchange
- 6、发布订阅-DirectExchange
- 7、发布订阅-TopicExchange
- 8、消息转换器
1、AMQP
Advanced Message Queuing Protocol,高级消息队列协议。是用于在应用程序之间传递业务消息的开放标准。
该协议与语言和平台无关,更符合微服务中独立性的要求。
Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。
它包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
接下来对几种常见的MQ模型,用SpringAMQP来演示一下具体实现。
2、基本消息模型队列
消息发送
- 引入AMQP依赖(publisher和consumer服务都需要amqp依赖,因此这里把依赖直接放到父工程)
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 在publisher服务中编写application.yml,添加mq连接信息
spring:
rabbitmq:
host: 192.168.150.101 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: root # 用户名
password: 123321 # 密码
- 在publisher服务中新建一个测试类,
注入RabbitTemplate,调用convertAndSend方法,传入队列名称和消息内容即可
。
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
String queueName = "simple.queue";
String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend(queueName, message);
}
}
消息接收
- 在consumer服务中编写application.yml,添加mq连接信息
spring:
rabbitmq:
host: 192.168.150.101 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: root # 用户名
password: 123321 # 密码
- 在consumer服务中新建一个类,编写消费逻辑
//定义类,添加@Component注解
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息 :【" + msg + "】");
}
}
类中声明方法,添加@RabbitListener注解,注解上写明要消费的队列名称,此时,方法的参数就是消息。
3、WorkQueue模型
和基本模型不一样,WorkQueue模型,即工作队列,有多个消费者。可以提高消息处理速度,避免队列消息堆积。
案例:实现一个队列绑定多个消费者
思路如下:
- 在publisher服务中定义测试方法,每秒产生50条消息,发送到队列simple.queue
- 在consumer服务中定义两个消息监听者,都监听simple.queue队列,但消费者1每秒能处理50条消息,而消费者2每秒只能处理10条消息
演示代码如下,先写生产者:
//在publisher服务中添加一个测试方法,循环发送50条消息到simple.queue队列
@Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, message__";
for (int i = 0; i < 50; i++) {
// 发送消息
rabbitTemplate.convertAndSend(queueName, message + i);
// 避免发送太快
Thread.sleep(20);
}
}
接下来编写两个消费者,都监听上面的队列simple.queue:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage1(String msg) throws InterruptedException {
System.out.println("spring 消费者1接收到消息:【" + msg + "】");
Thread.sleep(25);
}
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage2(String msg) throws InterruptedException {
System.err.println("spring 消费者2接收到消息:【" + msg + "】");
Thread.sleep(100);
}
上面一个用System.out.println一个用System.error.println
,这样一红一白,在控制台查看消费的效果:
可以看到,消费者2处理消息慢(代码里用处理一次休眠的旧来模拟),但它从队列里拿的消息却和消费者1一样,因此导致消费总时间过长。对消费者2来说,这就是没有那个金刚钻,却揽了这么多瓷器活儿。这个现象的原因是 ⇒ 消息预取机制
预取机制,通俗说就是消息到了队列后,消费者通过通道一人一个先拿过来,能不能快速处理完的另说,先取走再说。
接下来加一个消费预取限制
。通过设置prefetch来控制消费者预取的消息数量。修改消费者的application.yml文件,设置preFetch这个值,可以控制预取消息的上限:
spring:
rabbitmq:
host: 192.168.150.101 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: root # 用户名
password: 123321 # 密码
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
prefetch默认无限,这里改为1,即 每次只能获取一条消息,处理完成才能获取下一个消息。重启,这次同样生产50条消息,消费总时间变短了,可以看到消费者2处理的慢,取的也慢。
4、发布订阅模型
和前面两种模型不同,一个消息被一个消费者消费完就没了。发布订阅模式允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。 当前上一章说的支付功能的实现,就得用这个模型。
消息被路由到哪些队列中,由exchange决定。常见exchange类型包括:
- Fanout:广播
- Direct:路由
- Topic:话题
注意:
exchange负责消息路由,而不是存储,路由失败则消息丢失
5、发布订阅-Fanout Exchange
Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue
SpringAMQP提供了声明交换机、队列、绑定关系的API:
接下来写演示代码:
- 步骤1:在消费consumer服务声明Exchange、Queue,并Binding绑定
//在consumer服务配置目录下创建一个类,添加@Configuration注解
@Configuration
public class FanoutConfig {
// 声明FanoutExchange交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("exchange.fanout");
}
// 声明第1个队列
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
//绑定队列1和交换机,方法形参就是队列和交换机类型的参数
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
//以相同方式声明第2个队列,并完成绑定到上面的交换机
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
//绑定队列2和交换机
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
//将来Spring读到这些Bean,就会向RabbitMQ去声明这些队列和交换机,并绑定
启动消费服务,可以看到队列创建并且绑定到了交换机,以后交换机有消息,和交换机绑定的队列都能收到一份!
- 步骤2:在consumer服务声明两个消费者,
即添加两个方法,分别监听fanout.queue1和fanout.queue2
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到fanout.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到fanout.queue2的消息:【" + msg + "】");
}
//这里在一个服务里用两个消费者方法模拟被多个服务消费
//到此,两个队列就分别和这两个消费者方法勾搭上了
//重启消费者服务。
- 步骤3:在publisher服务发送消息到FanoutExchange
//这次不再是send到队列,而是发到交换机
@Test
public void testFanoutExchange() { // 队列名称 String exchangeName = "itcast.fanout"; // 消息 String message = "hello, everyone!"; // 发送消息,参数分别是:交互机名称、RoutingKey(暂时为空)、消息 rabbitTemplate.convertAndSend(exchangeName, "", message);}
//执行这段测试代码,往交换机发消息
查看消费者服务控制台,可以看到一次发送,被多个消费者收到
小总结:
交换机的作用是什么?
➢ 接收publisher发送的消息
➢ 将消息按照规则路由到与之绑定的队列
➢ 不能缓存消息,路由失败,消息丢失
➢FanoutExchange的会将消息路由到每个绑定的队列
6、发布订阅-DirectExchange
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
当然两个队列可以设置同一个key,此时的效果就和上面的Fanout Exchange一样了。
步骤1:在consumer服务声明Exchange、Queue。这里不再用Bean,而是利用@RabbitListener声明Exchange、Queue、RoutingKey
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到Direct消息:【"+msg+"】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到Direct消息:【"+msg+"】 ");
}
看IDEA的提示类型写就行:exchange的type属性默认就是direct类型,这里不加也行。
启动消费者服务,到此,队列和交换机都被创建和绑定好了,查看RabbitMQ控制台:
步骤2:在publisher服务发送消息到DirectExchange,此时convertAndSend方法的第二个参数就要写Routing key了
//在publisher服务的SpringAmqpTest类中添加测试方法
@Test
public void testDirectExchange() {
// 队列名称
String exchangeName = "itcast.direct";
// 消息
String message = "hello,blue!";
// 发送消息,参数依次为:交换机名称,RoutingKey,消息
rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}
运行这个测试方法,查看消费服务:
小总结:
Direct交换机与Fanout交换机的差异?
➢ Fanout交换机将消息路由给每一个与之绑定的队列
➢ Direct交换机根据RoutingKey判断路由给哪个队列
➢ 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
基于@RabbitListener注解声明队列和交换机的常用注解有:
@Queue
@Exchange
7、发布订阅-TopicExchange
TopicExchange与DirectExchange类似,区别在于它的routingKey必须是多个单词的列表,并且以 . 分割
#:代指0个或多个单词
*:代指一个单词
举例:
china.news 代表有中国的新闻消息;
china.weather 代表中国的天气消息;
japan.news 则代表日本新闻
japan.weather 代表日本的天气消息
总之就是用通配符简化了队列和exchange的绑定。用代码演示一下这个TopicExchange类型的使用,测试结构:
步骤1: 在consumer服务利用@RabbitListener
声明Exchange、Queue、RoutingKey
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到Topic消息:【"+msg+"】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到Topic消息:【"+msg+"】");
}
//注意key和exchange中的type
步骤2: 在publisher服务发送消息到TopicExchange
//在publisher服务的SpringAmqpTest类中添加测试方法
@Test
public void testTopicExchange() {
// 队列名称
String exchangeName = "itcast.topic";
// 消息
String message = "喜报!孙悟空大战哥斯拉,胜!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
8、消息转换器
SpringAMQP的发送方法convertAndSend()中,接收消息的类型是Object
,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送
。
//这次不再是用@RabbitListener,因为它用在消费方法上,我们要看消息,不希望它被消费
//因此使用Bean声明队列
@Bean
public Queue objectMessageQueue(){
return new Queue("object.queue");
}
在publisher中发送消息以测试:
@Test
public void testSendMap() throws InterruptedException {
// 准备消息
Map<String,Object> msg = new HashMap<>();
msg.put("name", "Jack");
msg.put("age", 21);
// 发送消息
rabbitTemplate.convertAndSend("object.queue", msg);
}
重启服务,查看RabbitMQ中的消息:
数据展示有问题,Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。接下来修改序列化的方式。
修改序列化方式只需要定义一个MessageConverter 类型的Bean即可
- 在父工程(或者消息生产者服务)中引入依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
- 在配置类中声明MessageConverter的Bean(直接写启动类中也行)
//我们一旦声明MessageConverter,就会覆盖Spring默认的MessageConverter(SpringBoot自动装配的原理)
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
此时再重发消息,RabbitMQ中消息展示正常。接下来看消费者服务:
- 引入Jackson依赖(如果上一步引入依赖放在了父工程,那这里就不用重复引入了)
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
- 在consumer服务定义MessageConverter:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
- 定义一个消费者,监听object.queue队列并消费消息
@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String, Object> msg) {
System.out.println("收到消息:【" + msg + "】");
}
重启消费者服务,序列化方式修改成功!
SpringAMQP中消息的序列化和反序列化是怎么实现的?
利用MessageConverter实现的,默认是JDK的序列化,可重新定义MessageConverter类型的Bean来修改。
注意发送方与接收方必须使用相同的MessageConverter
System.out.println("End!");