文章目录
- 前言
- 一、Rabbit MQ简介
- 1、基本概念
- 2、组件架构
- 二、使用步骤
- 1.引入依赖
- 2.application.properties
- 3、docker 安装Rabbit MQ
- 3、使用案例
- 3.1、定义队列
- 3.2、定义交换机
- 3.3、绑定
- 3.4、发送消息
- 3.5、接受消息
- 3.5、自定义消息序列化方式
- 3.6、演示Fanout 交换机模式
- 3.7、演示Topic 交换机模式
- 三、消息可靠性
- 3.1、ConfirmCallback&ReturnCallback
- 3.2、设置手动ACK
- 3.2.1、演示自动签收消息丢失
- 3.2.2、演示手动签收
- 总结
前言
本篇主要介绍消息中间件Rabbit MQ
的基本概念及使用,以及保证消息的可靠投递
对应视频p248-p261
一、Rabbit MQ简介
1、基本概念
Rabbit MQ是一个消息代理 - 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全。这是官方文档给予Rabbit MQ的定义。
Rabbit MQ是基于AMQP 模型
,AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。和JMS(Java Message Service)JAVA消息服务
最大的区别在于,AMQP
具有跨平台,跨语言的特性,并且支持更多的消息模型,并且简化了消息的类型(网络通信统一转换为流)。
在企业级开发中运用Rabbit MQ,主要是为了达到异步
、削峰
、解耦
的目的:
异步
:假设用户注册完成后需要发送短信和邮件,如果是同步模式,就需要注册->发送短信->发送邮件顺序执行。假设某一个步骤调用第三方的接口执行的时间较长,会极大地拖延整个业务流程的完成时间,需要等到最后一步执行完成后再返回结果,这样对于用户是非常不友好的。而引入Rabbit MQ,可以做到注册完成后直接返回结果给用户,异步通知其他的服务发送短信,发送邮件。削峰
:对于一些电商平台,在双11,618等活动时,用户请求会达到峰值,服务器一般短时间内无法承受这样的负担,通过Rabbit MQ,可以将用户的请求暂时存放入队列中,然后负责业务处理的模块按照一定的规则从队列中获取请求进行处理,也就是起到一个缓冲的作用。解耦
:假设现在有三个系统,B,C系统需要接受A系统的通知,如果后续又加入了一个D系统,那么A系统还需要编写针对D系统发送通知的接口。引入Rabbit MQ,A系统可以将通知发送到队列,需要接受消息的系统自行监听队列即可。
2、组件架构
Rabbit MQ的基本架构,包括生产者
、消费者
、消息
、交换机
、队列
、通道
、虚拟主机
:
生产者
是消息的生产方,负责编辑消息发送到交换机。消费者
是消息的消费方,负责监听队列,从队列中获取消息并处理。交换机
作为消息和队列的中间组件,消息需要经过交换机,而不是直接发送至队列。队列
用来保存消息直到发送给消费者。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。通道
信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。可以复用一条TCP连接。虽然客户端和 RabbitMQ 服务器之间只有一个 TCP 连接
,但可以通过多个信道
并行处理不同的消息通信任务。每个信道在逻辑上是独立的,它们互不干扰。在 RabbitMQ 中,客户端首先与服务器建立一个 TCP 连接,然后通过该连接创建多个信道来进行通信。虚拟主机
上述的交换机,队列,通道都是运行在虚拟主机上的。可以将虚拟主机理解成一个命名空间
,允许你在同一个 RabbitMQ 实例中分隔不同的应用或项目。不同的队列、交换机等资源可以在不同的虚拟主机中拥有相同的名称,而不会产生冲突。并且每个虚拟主机都有一套独立的权限管理。
二、使用步骤
1.引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.application.properties
spring.rabbitmq.host=自己的地址
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
还需要在启动类上加入@EnableRabbit
注解
3、docker 安装Rabbit MQ
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p
25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
3、使用案例
同样的Rabbit MQ提供了两个模版:AmqpAdmin
和RabbitTemplate
。AmqpAdmin
可用于队列,交换机,绑定关系的定义。RabbitTemplate
可以用于发送消息。
3.1、定义队列
/**
* 创建队列
*/
@Test
public void createQueue() {
//参数二控制是否持久化
amqpAdmin.declareQueue(new Queue("hello-java-queue", true, false, false,null));
System.out.println("queue created");
}
3.2、定义交换机
常见的交换机有:
Direct 交换机
根据精确匹配
的路由键来将消息路由到特定队列。Fanout 交换机
将消息广播到所有绑定的队列,无需考虑路由键。Topic 交换机
根据模式匹配
的路由键,将消息路由到匹配的队列。Headers 交换机
根据消息头中的属性值匹配,而非路由键。
前三种模式最为常用,Topic 交换机
的模式匹配
也称为通配符模式
,一般有两种,#
匹配0个或多个单词,*
匹配一
个单词。
/**
* 创建交换机(直连模式)
*/
@Test
public void createExchange() {
//参数二控制是否持久化
amqpAdmin.declareExchange(new DirectExchange("hello-java-exchange", true, false,null));
System.out.println("exchange created");
}
3.3、绑定
/**
* 将队列绑定到交换机上
*/
@Test
public void createBinding() {
//参数一 队列名 参数二:绑定类型 参数三:交换机名 参数四:路由键
amqpAdmin.declareBinding(new Binding("hello-java-queue",
Binding.DestinationType.QUEUE,
"hello-java-exchange",
"hello.java",
null));
System.out.println("binding created");
}
3.4、发送消息
发送的消息通过路由键找到对应的交换机。
/**
* 发送消息
*/
@Test
public void sendMessage() {
//参数一:交换机名 参数二:路由键
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java","这是一个测试信息");
}
3.5、接受消息
需要在方法上加入@RabbitListener
注解,指定监听的队列。
@RabbitListener(queues = "hello-java-queue")
public void listenMessage(Message message, Channel channel) {
}
3.5、自定义消息序列化方式
@Configuration
public class MyRabbitConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
3.6、演示Fanout 交换机模式
/**
* 创建交换机(广播模式)
*/
@Test
public void createFanoutExchange() {
amqpAdmin.declareExchange(new FanoutExchange("hello-java-fanout-exchange", true, false,null));
}
@Test
public void createQueue2() {
amqpAdmin.declareQueue(new Queue("hello-java-fanout-queue", true, false, false,null));
}
/**
* 将队列绑定到交换机上
*/
@Test
public void createBindingFanout() {
amqpAdmin.declareBinding(new Binding("hello-java-queue",
Binding.DestinationType.QUEUE,
"hello-java-fanout-exchange",
"hello.java",
null));
amqpAdmin.declareBinding(new Binding("hello-java-fanout-queue",
Binding.DestinationType.QUEUE,
"hello-java-fanout-exchange",
"hello.java",
null));
System.out.println("binding created");
}
/**
* 发送消息
*/
@Test
public void sendMessage() {
rabbitTemplate.convertAndSend("hello-java-fanout-exchange","hello.js","这是一个测试信息");
}
@RabbitListener(queues = {"hello-java-queue","hello-java-fanout-queue"})
public void receiveQueue(String message) {
System.out.println(message);
}
广播模式下,在发送消息时,即使指定的路由键和队列绑定在交换机上的不同,监听相关队列的消费者也可以接收到消息。即:fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。
3.7、演示Topic 交换机模式
/**
* 创建交换机(通配符模式)
*/
@Test
public void createExchangeTopic() {
amqpAdmin.declareExchange(new TopicExchange("hello-java-topic-exchange", true, false,null));
System.out.println("exchange created");
}
/**
* 创建通配符队列1
*/
@Test
public void createTopicQueue1() {
amqpAdmin.declareQueue(new Queue("hello-java-topic-queue1", true, false, false,null));
}
/**
* 创建通配符队列2
*/
@Test
public void createTopicQueue2() {
amqpAdmin.declareQueue(new Queue("hello-java-topic-queue2", true, false, false,null));
}
/**
* 将队列绑定到交换机上(通配符模式)
*/
@Test
public void createBindingTopic() {
amqpAdmin.declareBinding(new Binding("hello-java-topic-queue1",
Binding.DestinationType.QUEUE,
"hello-java-topic-exchange",
"#.java",
null));
amqpAdmin.declareBinding(new Binding("hello-java-topic-queue2",
Binding.DestinationType.QUEUE,
"hello-java-topic-exchange",
"*.java",
null));
System.out.println("binding created");
}
@GetMapping("/send")
public void sendMessage() {
rabbitTemplate.convertAndSend("hello-java-topic-exchange", "java", "这是一个测试信息");
rabbitTemplate.convertAndSend("hello-java-topic-exchange", "hello.world.java", "这是一个测试信息");
}
/**
* 监听通配符队列1 和交换机绑定的路由键是 #.java
* @param message
* @param channel
*/
@RabbitListener(queues = "hello-java-topic-queue1")
public void receiveTopicQueue(String message,Channel channel) {
System.out.println("监听hello-java-topic-queue1的消息是" + message);
}
首先向通配符模式的交换机发送了两个消息,消费者选择监听与交换机绑定了#.java
路由键的队列hello-java-topic-queue1
,由于发送的消息java
和hello.world.java
与#
通配符能匹配上(前者匹配0个,后者匹配2个),所以两条消息消费者都接受到了。
接下来测试*
通配符,由于*
通配符只能匹配单个单词,所以只监听到了hello.java
@GetMapping("/send")
public void sendMessage() {
rabbitTemplate.convertAndSend("hello-java-topic-exchange", "java", "这是一个测试信息java");
rabbitTemplate.convertAndSend("hello-java-topic-exchange", "hello.world.java", "这是一个测试信息hello.world.java");
rabbitTemplate.convertAndSend("hello-java-topic-exchange", "hello.java", "这是一个测试信息hello.java");
}
/**
* 监听通配符队列2 和交换机绑定的路由键是 *.java
* @param message
* @param channel
*/
@RabbitListener(queues = "hello-java-topic-queue2")
public void receiveTopicQueue(String message,Channel channel) {
System.out.println("监听hello-java-topic-queue2的消息是" + message);
}
三、消息可靠性
在Rabbit MQ中,消息首先需要发送到交换机,再由交换机发送到队列,最后消费者从队列中读取消息。中转的步骤较多,其中每一步都有可能发生消息丢失的问题。Rabbit MQ采用消息投递回调
和消息确认机制
分别保证生产者方和消费者方的消息可靠性。
3.1、ConfirmCallback&ReturnCallback
ConfirmCallback
是消息发送到交换机的回调,无论成功或者失败都会触发。ReturnCallback
是消息从交换机到队列的回调,只有失败了才会触发。
在使用之前,需要在配置文件中加上:
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
可以在自定义配置类中为RabbitTemplate
加上对应的配置:
/**
* 消息发送到交换机的回调 ConfirmCallback(成功和失败都会触发)
* 从交换机到队列投递失败的回调ReturnCallback
* 从队列到消费者是 ack机制 只要消费者没有手动ack,消息就默认未被消费,是unacked状态,服务器宕机,消息会重置为ready状态
*/
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* Confirmation callback.
*
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("触发ConfirmCallback:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* Returned message callback.
*
* @param message the returned message.
* @param replyCode the reply code.
* @param replyText the reply text.
* @param exchange the exchange.
* @param routingKey the routing key.
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("触发ReturnCallback:message"+message+"replyCode:"+replyCode+",exchange:"+exchange+",routingKey:"+routingKey);
}
});
}
3.2、设置手动ACK
如果没有进行任何设置,在Rabbit MQ中默认消息都是自动签收
的:
3.2.1、演示自动签收消息丢失
@GetMapping("/send")
public void sendMessage() {
for (int i = 0; i < 5; i++) {
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java","这是一个测试信息"+i);
}
}
@RabbitListener(queues = "hello-java-queue")
public void listenMessage(Message message, Channel channel) {
System.out.println("listenMessage接收到的消息" + message);
throw new NullPointerException();
}
上图的1条消息已经被自动签收
模拟出现异常,消息不会丢失。
假设此时服务器宕机了,队列中剩下未处理的消息会丢失:
/**
* 同一个消息只能被一个监听者接受
* 一个消息处理完成后,才能处理下一个消息
* @param message
* @param channel
*/
@RabbitListener(queues = "hello-java-queue")
public void listenMessage(Message message, Channel channel) {
System.out.println("listenMessage接收到的消息" + message);
}
通过jps -l
找到当前服务进程,并Stop-Process -Id 25364 -Force
强行终止:
发现剩下的消息全部丢失:
3.2.2、演示手动签收
我们可以设置消息手动确认,在配置文件中加入:spring.rabbitmq.listener.simple.acknowledge-mode=manual
,打上断点:
队列中有5条unack(未确认)的消息。
接下来处理了两个消息,但是没有手动确认:
可以看到在控制台中依旧是5条unack状态:
假设此时服务器宕机了:
可以看到控制台中这5条消息回到了ready状态,没有丢失:
如果需要手动签收,需要在消费者方监听的代码中,使用channel
的basicAck
或basicNack
方法,前者是签收,后者是拒绝,其中拒绝又有两种模式,一种是重新放回队列,另一种是丢弃:
拒绝并丢弃
:
@RabbitListener(queues = "hello-java-queue")
public void listenMessage(Message message, Channel channel) {
System.out.println("监听hello-java-queue的消息是"+message);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//手动确认
if (deliveryTag % 2 == 0) {
//签收
channel.basicAck(deliveryTag, false);
System.out.println("接收到的消息" + deliveryTag);
} else {
//拒绝(丢弃)
channel.basicNack(deliveryTag, false, false);
System.out.println("拒绝了消息" + deliveryTag);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
重新启动项目,消息一部分被拒绝,一部分被签收并消费
队列清空
拒绝并重新放回队列
:当一个消息被拒绝又重新放回队列时,会被再次消费,创建两个消费者监听hello-java-queue
:
@RabbitListener(queues = "hello-java-queue")
public void listenMessage(Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//手动确认
if (deliveryTag % 2 == 0) {
//签收
channel.basicAck(deliveryTag, false);
System.out.println("listenMessage接收到的消息" + deliveryTag);
} else {
//拒绝
channel.basicNack(deliveryTag, false, true);
System.out.println("listenMessage拒绝了消息" + deliveryTag);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@RabbitListener(queues = {"hello-java-queue","hello-java-fanout-queue"})
public void receiveQueue(Message message,Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
channel.basicAck(deliveryTag, false);
System.out.println("receiveQueue接收到的消息"+deliveryTag);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
拒绝的消息可以放回队列被再次消费,直到有消费者签收为止。
总结
下面总结一下Rabbit MQ 的工作流程
- 生产者与虚拟主机建立连接,经过信道,发送消息到交换机(带有路由键)。
- 交换机根据绑定和路由规则,将消息路由到一个或多个队列。
- 队列存储消息,等待消费者处理。
- 消费者从队列中接收消息,并处理。
- 消费者处理消息后,发送 ACK 确认,RabbitMQ 从队列中删除该消息。
- 如果消费者拒绝消息或处理失败,RabbitMQ 可能会将消息重新投递,或者发送到死信队列。
其他诸如死信队列,延迟队列,消息幂等性,消息积压等将在高级篇中讲解。
下一篇:订单服务&分布式事务