SpringCloud微服务
什么是微服务
维基百科:微服务是一种软件架构风格,它是以专注于单一职责的很多小型项目为基础,组合出复杂的大型应用。
微服务拆分
拆分目标
高内聚:每个微服务的职责要尽量单一,包含的业务相互关联度高、完整度高。
低耦合:每个微服务的功能要相对独立,尽量减少对其它微服务的依赖。
拆分方式
纵向拆分:按照业务模块来拆分。
横向拆分:抽取公共服务,提高复用性。
RabbitMQ
同步调用
优点:时效性强,等待到结果后才返回
缺点:扩展性差、性能下降(调用链越长耗时越久)、级联失败问题(一个调用点卡住,后面的链路都不能执行)
异步调用
异步调用通常是基于消息通知的方式,包含三个角色:
消息发送者:投递消息的人,就是原来的调用者
消息接收者:接收和处理消息的人,就是原来的服务提供者
消息代理:管理、暂存、转发消息,你可以把它理解成微信服务器
优点:
耦合度低,拓展性强
异步调用,无需等待,性能好
故障隔离,下游服务故障不影响上游业务
缓存消息,流量削峰填谷
缺点:
不能立即得到调用结果,时效性差
不确定下游业务执行是否成功
业务安全依赖于Broker(消息代理)的可靠性
MQ技术选型
RabbitMQ安装
-
查找镜像:
docker search rabbitmq
-
拉取镜像:
docker pull rabbitmq:3.8.19
,指定拉取版本为3.18.19,如果不指定则默认拉取latest -
查看镜像:
docker images
-
启动镜像:设置账号登录为admin,登录密码为admin,不指定镜像版本,默认启动rabbitmq:latest
docker run \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin \ --name mq \ --hostname localhost \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3.8.19
-
查看容器:
docker ps
-
进入RabbitMQ容器:
docker exec -it 4df /bin/bash
-
开启RabbitMQ后台访问:
rabbitmq-plugins enable rabbitmq_management
-
退出容器bash:
exit
-
网页访问RabbitMQ后台:访问http://localhost:15672,账号admin,密码admin
常见问题:
-
后台管理系统的可视化界面中出现:All stable feature flags must be enabled after completing an upgrade
**解决方案:**点击Admin -> Feature Flags,确保所有稳定的特性标志都是启用状态。如果有任何标志未启用,请将其启用。
-
后台管理系统的可视化界面中出现:Stats in management UI are disabled on this node
**解决方案:**进入RabbitMQ容器,运行命令:
echo management_agent.disable_metrics_collector=false>/etc/rabbitmq/conf.d/management_agent.disable_metrics_collector.conf
,退出RabbitMQ容器,然后运行docker restart 容器id
重启RabbitMQ容器。 -
后台管理系统的可视化界面中 Overview 不显示图形的问题
解决方案:同《2. 后台管理系统的可视化界面中出现:Stats in management UI are disabled on this node》
RabbitMQ介绍
-
publisher:消息发送者
-
comsumer:消息消费者
-
queue:队列-存储消息
-
exchange:交换机-接收发送者发送的消息,并将消息路由到与其绑定的队列
-
virtual-host:虚拟主机-将数据隔离(多个项目使用同一个RabbitMQ时,可以为每个项目建立一个virtual-host,将不同项目之间的exchange和queue隔离)
Work Queues(任务模型)
任务模型简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。同一个消息只会被一个消费者处理。多个消费者绑定到一个队列,可以加快消息处理速度。
默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。因此我们需要修改消费者的application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息,消费者处理完后再投递下一条消息。
Fanout交换机
Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue,所以也叫广播模式。 Fanout交换机会将收到的消息复制成n份,然后将消息发送到n个与其绑定的队列中。
应用场景:用户支付成功后,交易服务更新订单状态,短信服务通知用户,积分服务为用户增加积分。
实现:交易服务的queue、短信服务的queue、积分服务的queue都绑定到Fanout交换机,用户支付成功后,支付服务将消息发送到Fanout交换机,这样交易服务、短信服务、积分服务九都能收到这条消息了。
案例演示:
实现思路:
- 在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
- 在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定
- 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
- 在publisher中编写测试方法,向hmall.fanout发送消息
代码实现:
发送者:
@SpringBootTest
class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue(){
String exchangeName="hmall.fanout";
String message="hello everyone"; rabbitTemplate.convertAndSend(exchangeName,null,message);
}
}
消费者:
@Component
@Slf4j
public class SpringRabbitListener {
@RabbitListener(queues = "fanout.queue1")
public void listenerWorkQueue1(String message){
log.info("消费者1接收到消息:{},---{}",message, LocalTime.now());
}
@RabbitListener(queues = "fanout.queue2")
public void listenerWorkQueue2(String message){
log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());
}
}
消费者输出:
Direct交换机
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。
- 每一个Queue都与Exchange设置一个Bindingkey(可以为每一个Queue指定相同的Bindingkey,实现和Fanout交换机相同的功能)。
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
应用场景:用户取消后,只需要给交易服务发送消息,通知交易服务更新订单状态,而不需要给短信服务和积分服务发送消息。
案例演示:
实现思路:
- 在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2
- 在RabbitMQ控制台中,声明交换机hmall. direct,将两个队列与其绑定,routeKey 为blue时路由到direct.queue1,为yellow时路由到direct.queue2,为red时路由到direct.queue1和direct.queue2
- 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
- 在publisher中编写测试方法,利用不同的RoutingKey向hmall. direct发送消息
代码实现:
消费者
@Component
@Slf4j
public class SpringRabbitListener {
@RabbitListener(queues = "direct.queue1")
public void listenerWorkQueue1(String message){
log.info("消费者1接收到消息:{},---{}",message, LocalTime.now());
}
@RabbitListener(queues = "direct.queue2")
public void listenerWorkQueue2(String message){
log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());
}
}
发送者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue(){
//交换机名称
String exchangeName="hmall.direct";
//消息
String message_blue="hello blue";
String message_yellow="hello yellow";
String message_red="hello red";
//发送消息
rabbitTemplate.convertAndSend(exchangeName,"blue",message_blue);
rabbitTemplate.convertAndSend(exchangeName,"yellow",message_yellow);
rabbitTemplate.convertAndSend(exchangeName,"red",message_red);
}
消费者输出:
Topic交换机
TopicExchange也是基于routingkey做消息路由,但是routingkey通常是多个单词的组合,并且以.分割。
Queue与Exchange指定routingkey时可以使用通配符:
- #:代指0个或多个单词
- *:代指一个单词
案例演示:
实现思路:
- 在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2
- 在RabbitMQ控制台中,声明交换机hmall. topic,将两个队列与其绑定
- 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
- 在publisher中编写测试方法,利用不同的RoutingKey向hmall. topic发送消息
代码实现:
消费者:
@Component
@Slf4j
public class SpringRabbitListener {
@RabbitListener(queues = "topic.queue1")
public void listenerWorkQueue1(String message){
log.info("消费者1接收到消息:{},---{}",message, LocalTime.now());
}
@RabbitListener(queues = "topic.queue2")
public void listenerWorkQueue2(String message){
log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());
}
}
发送者1:
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue(){
//交换机名称
String exchangeName="hmall.topic";
//消息
String message="中国新闻";
//发送消息
rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
}
消费者输出:
发送者2:
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue(){
//交换机名称
String exchangeName="hmall.topic";
//消息
String message="中国天气";
//发送消息
rabbitTemplate.convertAndSend(exchangeName,"china.weather",message);
}
消费者输出:
AMQP
Advanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
RabbitMQ使用
后台可视化界面操作
-
创建用户
-
创建虚拟主机
-
为用户添加可访问的虚拟主机
注意:当前登录用户默认有权限访问其创建的所有虚拟主机。
-
创建队列
-
Durability:
Durable:持久化队列,Rabbit服务器重启后,这个队列还会存在
Transient:临时队列,Rabbit服务器重启后,这个队列将会被删除
-
-
查看队列的消费者
-
向队列中发布消息
-
获取队列中消息
队列中可以存储消息。当队列中的消息未被消费时,消息将存储在队列中,此时可以查看队列中的消息。
-
Act Mode:
Nack message requeue true:获取消息,但是不做ack应答确认,消息重新入队
Ack message requeue false:获取消息,应答确认,消息不重新入队,将会从队列中删除
reject requeue true:拒绝获取消息,消息重新入队
reject requeue false:拒绝获取消息,消息不重新入队,将会被删除
-
Encoding:可以选择将消息进行base64编码
-
Messages:从队列中获取的消息数量
-
-
清理消息
代码操作
-
引入依赖
<!-- AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
application.yaml中配置RabbitMQ
spring: rabbitmq: host: 192.168.1.2 # RabbitMQ地址 port: 5672 # 端口 virtual-host: /hmall # 虚拟主机 username: jack # 用户名 password: jack # 密码
创建队列和交换机
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
- Queue:用于声明队列,可以用工厂类QueueBuilder构建
- Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
如果已经存在交换机、队列、绑定关系,运行代码时则不会进行创建,而且也不会报错。
通常发送者只需要关心消息发送,消费者关心队列、交换机、以及绑定关系,所以创建操作一般写在消费者中。
Sping提供了基于java bean和基于@RabbitListener
注解两种方式创建。
- 基于bean代码演示:
package com.itheima.consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfiguration {
//声明交换机
@Bean
public FanoutExchange fanoutExchange(){
// 方式1
// return new FanoutExchange("hmall.fanout");
// 方式2
return ExchangeBuilder.fanoutExchange("hmall.fanout").build();
}
//声明队列
@Bean
public Queue fanoutQueue1(){
// 方式1
// return new Queue("fanout.queue1",true);
// 方式2
return QueueBuilder.durable("fanout.queue1").build();
}
//将队列和交换机绑定
@Bean
public Binding fanoutQueue1Binding(Queue fanoutQueue1,FanoutExchange fanoutExchange){//spring会去找beanName='fanoutQueue1'的bean作为参数传进来
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
public Queue fanoutQueue2(){
return QueueBuilder.durable("fanout.queue2").build();
}
@Bean
public Binding fanoutQueue2Binding(Queue fanoutQueue2,FanoutExchange fanoutExchange){//spring会去找beanName='fanoutQueue2'的bean作为参数传进来
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
- 基于
@RabbitListener
注解代码演示:
@Component
@Slf4j
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding( //将交换机和队列绑定
value = @Queue(name="direct.queue1",durable = "true"), //如果没有队列direct.queue1则创建队列,并监听队列direct.queue1
exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT), //如果没有交换机hmall.direct则创建交换机
key = {"blue","red"} //routingKey
))
public void listenerWorkQueue1(String message){
log.info("消费者1接收到消息:{},---{}",message, LocalTime.now());
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name="direct.queue2",durable = "true"),
exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
key = {"yellow","red"}
))
public void listenerWorkQueue2(String message){
log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());
}
}
发送消息
-
直接发送给队列
方法:
public void convertAndSend(String routingKey, final Object object)
,直接发给队列时,routingKey相当于队列名。@Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimp leQueue() { //队列名称 String queueName = "simple. queue"; //消息 String message = "hello, spring amqp!"; //发送消息 rabbitTemplate.convertAndSend(queueName, message); }
注意:队列不显示绑定交换机时,默认还是会绑定到defalut exchange上
-
发送给Fanout Exchange
方法:
public void convertAndSend(String exchange, String routingKey, final Object object)
,使用Fanout Exchange时,routingKey相当于队列名,发送给Fanout Exchange时,routingKey传null或""@Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue(){ //交换机名称 String exchangeName="hmall.fanout"; //消息 String message="hello everyone"; //发送消息 rabbitTemplate.convertAndSend(exchangeName,null,message); }
-
发送给direct交换机
方法:
public void convertAndSend(String exchange, String routingKey, final Object object)
,routingKey就是交换机和队列绑定时的routingKey@Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue(){ //交换机名称 String exchangeName="hmall.direct"; //消息 String message_blue="hello blue"; String message_yellow="hello yellow"; String message_red="hello red"; //发送消息 rabbitTemplate.convertAndSend(exchangeName,"blue",message_blue); rabbitTemplate.convertAndSend(exchangeName,"yellow",message_yellow); rabbitTemplate.convertAndSend(exchangeName,"red",message_red); }
-
发送给topic交换机
方法:方法:
public void convertAndSend(String exchange, String routingKey, final Object object)
@Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue(){ //交换机名称 String exchangeName="hmall.topic"; //消息 String message="中国新闻"; //发送消息 rabbitTemplate.convertAndSend(exchangeName,"china.news",message); }
接收消息
@Component
@Slf4j
public class SpringRabbitListener {
@RabbitListener(queues = "队列名")
public void listenerSimpleQueue(String message){
log.info("消费者收到消息:{}",message);
}
}
配置消息转换器
convertAndSend方法会先将消息进行序列化,然后再发送。
Spring的对消息对象的处理是由org.springframework.amap.support.converter.Messageconverter来处理的。而
默认实现是SimpleMessageConverter,如果消息实现了Serializable接口,则会使用serialize方法进行序列化,而serialize方法是基于JDK的Objectoutputstream完成序列化的。存在下列问题:
- JDK的序列化有安全风险
- JDK序列化的消息太大
- JDK序列化的消息可读性差
建议采用JSON序列化代替默认的JDK序列化,要做两件事情:
-
在publisher和consumer中都要引入jackson依赖,发送者和消费者要使用相同的消息转换器:
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
-
在publisher和consumer中都要配置MessageConverter:
@Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }
测试:
-
使用默认的消息转换器
发送者:
package com.itheima.publisher; import lombok.AllArgsConstructor; import lombok.Data; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import java.io.Serializable; @SpringBootTest class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { User jack = new User("jack", 18); rabbitTemplate.convertAndSend("testConvertMessage", jack); } } @Data @AllArgsConstructor class User implements Serializable { //要实现Serializable接口,否则convertAndSend方法进行消息转换时会抛出异常 private String name; private Integer age; }
查看消息:
消费者:
package com.itheima.consumer.mq; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.Serializable; @Component @Slf4j public class SpringRabbitListener { @RabbitListener(queues = "testConvertMessage") public void listenerWorkQueue(User message){ log.info("消费者接收到消息:{}",message); } } @Data @AllArgsConstructor @NoArgsConstructor class User implements Serializable { private String name; private Integer age; }
消费者输出:
-
配置消息转换器
发送者:
package com.itheima.publisher; import lombok.AllArgsConstructor; import lombok.Data; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { User jack = new User("jack", 18); rabbitTemplate.convertAndSend("testConvertMessage", jack); } } @Data @AllArgsConstructor class User { private String name; private Integer age; }
查看消息:
消费者:
package com.itheima.consumer.mq; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j public class SpringRabbitListener { @RabbitListener(queues = "testConvertMessage") public void listenerWorkQueue(User message){ //自动将json字符串转为User独享 log.info("消费者接收到消息:{}",message); } } @Data @AllArgsConstructor @NoArgsConstructor //消费者将消息转为User对象时,User对象一定要有空参构造器 class User { private String name; private Integer age; }
消费者输出:
消息可靠性
消息丢失三种情况:
- 发送者到MQ服务器时消息丢失
- MQ服务器宕机导致消息丢失
- MQ服务器将消息发送给消费者时消息丢失
发送者的可靠性
发送者重连
有的时候由于网络波动,可能会出现发送者连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制,默认重连机制是关闭的。
spring:
rabbitmq:
connection-timeout: 1s #设置MQ的连接超时时间,超过1秒钟还没有连上MQ则表示连接超时
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 initial-interval * multiplier
max-attempts: 3 # 最大重试次数
案例演示:
-
停止MQ
-
开启重连
spring: rabbitmq: host: 192.168.1.2 port: 5672 virtual-host: /hmall username: jack password: jack connection-timeout: 1s template: retry: enabled: true initial-interval: 1000ms multiplier: 1 max-attempts: 3
-
发送者发送消息
package com.itheima.publisher; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { rabbitTemplate.convertAndSend("testConvertMessage", "你好"); } }
-
消息发送失败
注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能,如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
发送者确认
SpringAMQP提供了Publisher Confirm和Publisher Return两种确认机制。开启确人机制后,当发送者发送消息给MQ后,MQ会返回确认结果给发送者。返回的结果有以下几种情况:
- 消息投递到了MQ,但是MQ路由失败。此时会通过PublisherReturn返回路由异常原因,然后PublisherConfirm返回ACK,告知发送者投递成功
- 临时消息投递到了MQ,并且入队成功,PublisherConfirm返回ACK,告知投递成功
- 持久消息投递到了MQ,并且入队且完成持久化,PublisherConfirm返回ACK,告知投递成功
- 其它情况都会返回NACK,告知投递失败
开启发送者确认机制:
-
开启配置
spring: rabbitmq: publisher-confirm-type: correlated # 开局publisher confirm机制,并设置confirm类型 publisher-returns: true # 开局publisher return机制
这里publisher-confirm-type有三种模式可选:
-
none:关闭confirm机制
-
simple:同步阻塞等待MQ的回执消息
-
correlated:MQ异步回调方式返回回执消息
-
-
为RabbitTemplate配置ReturnsCallback
每个RabbitTemplate只能配置一个ReturnsCallback,因此需要在项目启动过程中配置:
-
每次发送消息时,指定消息ID、消息ConfirmCallback
案例演示:
-
开启发送者确认配置
spring: rabbitmq: host: 192.168.1.2 # RabbitMQ地址 port: 5672 # 端口 virtual-host: /hmall # 虚拟主机 username: jack # 用户名 password: jack # 密码 publisher-confirm-type: correlated # 开局publisher confirm机制,并设置confirm类型 publisher-returns: true # 开局publisher return机制
-
定义ReturnsCallback
package com.itheima.publisher; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; @Configuration @Slf4j @AllArgsConstructor public class MqConfig { private final RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setReturnsCallback(returnedMessage -> { log.info("监听到了消息return callback"); log.info("exchange: {}", returnedMessage.getExchange()); log.info("routingKey: {}", returnedMessage.getRoutingKey()); log.info("message:{}", returnedMessage.getMessage()); log.info("replyCode: {}", returnedMessage.getReplyCode()); log.info("replyText: {}", returnedMessage.getReplyText()); }); } }
-
定义ConfirmCallback并发送消息
3.1 发送成功
package com.itheima.publisher; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.util.concurrent.ListenableFutureCallback; import java.util.UUID; import java.util.concurrent.TimeUnit; @SpringBootTest @Slf4j class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testConfirmCallback() { //0. 创建CorrelationData,并设置消息ID CorrelationData cd = new CorrelationData(UUID.randomUUID().toString()); cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { @Override public void onFailure(Throwable ex) { log.error("spring amqp 处理确认结果异常", ex); } @Override public void onSuccess(CorrelationData.Confirm result) { if (result.isAck()) { log.info("收到ConfirmCallback ack,消息发送成功!"); } else { log.info("收到ConfirmCallback nack,消息发送失败!", result.getReason()); } } }); //1. 交换机名称 String exchangeName = "hmall.direct"; //2. 消息 String message = "测试发送者确认"; //3. 发送消息 rabbitTemplate.convertAndSend(exchangeName, "blue", message, cd); //4. 此单元测试方法执行完,main线程就结束了,因此需要睡眠2s接收回调函数 try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } }
3.2 发送失败-路由失败
rabbitTemplate.convertAndSend(exchangeName, "blue22", message, cd);
注意:发送者确认机制需要发送者和MQ进行确认,会大大影响消息发送的效率,通常情况下不建议开启发送者确认机制。
MQ的可靠性
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:
- 一旦MQ宕机,内存中的消息会丢失
- 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞
数据持久化
RabbitMQ实现数据持久化包括3个方面,设置为持久化后,重启MQ,交换机、队列、消息也不会丢失。
-
交换机持久化(新建交换机默认就是持久化)
D表示持久化
-
队列持久化(新建队列默认就是持久化)
-
消息持久化(可视化界面发送消息时默认是非持久化,SpringAmqp发送消息时默认是持久化的)
案例演示:
MQ接收非持久化消息
发送者发送1百万条非持久化消息
发送耗时:
MQ收到了一百万条非持久化消息
注意:本测试使用的MQ是3.13.3,默认使用的是Lazy Queue模式:所有的消息直接存入磁盘,不再存储到内存,所以In memory显示1。(paged out代表的就是从内存移动到磁盘中的消息的数量,Persistent表示存入磁盘且持久化的消息的数量)
重启MQ后,一百万条非持久化消息全部丢失
MQ接收持久化消息
发送者发送1百万条持久化消息
发送耗时:
MQ收到了一百万条持久化消息
注意:本测试使用的MQ是3.13.3,默认使用的是Lazy Queue模式:所有的消息直接存入磁盘,不再存储到内存,所以In memory显示1。(paged out代表的就是从内存移动到磁盘中的消息的数量,Persistent表示存入磁盘且持久化的消息的数量)
重启MQ后,一百万条持久化消息不会丢失
结论:
在接收非持久化消息时,MQ收到消息后会先将消息存到内存中的队列中,队列满了之后会把先收到的消息存到磁盘中(这个行为称为paged out,paged out会导致MQ阻塞),然后再继续接收消息,把消息存进内存中的队列中,队列满了之后再把队列中的消息存入磁盘中,以此类推。
在接收持久化消息时,MQ会直接将消息存到磁盘中,不会等内存中的队列满了之后再将消息保存到磁盘中。
发送一千万条非持久化消息耗时:
发送一千万条持久化消息耗时:
从上面发送者发送一百万条消息的耗时来看,发送持久化消息比发送非持久化消息耗时更少(不需要paged out),而且持久化消息在MQ重启后不会丢失,所以建议发送持久化消息。
Lazy Queue
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列。
惰性队列的特征如下:
- 接收到消息后直接存入磁盘,不再存储到内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存,最多2048条)
在3.12版本后,所有队列都是Lazy Queue模式,无法更改。
3.12版本之前的MQ设置Lazy Queue模式有三种方式:
-
可视化界面设置
要设置一个队列为情性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可:
-
Spring Bean方式设置
-
注解方式设置
非Lazy Queue模式+持久化消息和Lazy Queue模式+持久化消息MQ接收消息速度对比:
消费者的可靠性
消费者确认机制
消费者确认机制(Consumer Acknowledgement)是为了确认消费者是否成功处理消息。MQ将一条消息发送给消费者后,MQ上的这条消息处理待确认状态,当消费者处理消息结束后,应该向RabbitMO发送一个回执,告知RabbitMQ自己消息处理状态:
- ack:成功处理消息,RabbitMQ从队列中删除该消息
- nack:消息处理失败,RabbitMQ需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:
-
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
-
manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
-
auto:自动模式(默认模式)。SpringAMQP利用 AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack。当业务出现异常时,根据异常判断返回不同结果:
- 如果是业务异常(throw new RuntimeException),会自动返回nack
- 如果是消息处理或校验异常,自动返回reject
案例演示:
-
消费者配置
spring: rabbitmq: host: 192.168.1.2 # RabbitMQ地址 port: 5672 # 端口 virtual-host: /hmall # 虚拟主机 username: jack # 用户名 password: jack # 密码 listener: simple: prefetch: 1 acknowledge-mode: auto
-
消费者
-
发送者
查看消息状态:
-
因为消费者抛出业务异常,所以会给MQ发送nack,然后MQ不停地向消费者投递消息
查看消息内容
- 查看队列中的消息,提示队列是空的,所以得出结论:待确认的消息不保存在队列中
-
失败重试机制
SpringAMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限的requeue到mq。我们可以通过在application.yaml文件中添加配置来开启重试机制:
案例演示:
- 消费者配置
spring:
rabbitmq:
host: 192.168.1.2 # RabbitMQ地址
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: jack # 用户名
password: jack # 密码
listener:
simple:
prefetch: 1
acknowledge-mode: auto
retry:
enabled: true
initial-interval: 1000ms
multiplier: 1
max-attempts: 3
stateless: true
-
消费者
-
发送者
-
消费者输出
-
查看消息状态
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer(默认):重试耗尽后,给MQ返回reject,MQ收到reject后会将消息丢弃。
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。
将失败处理策略改为RepublishMessageRecoverer:
-
首先,定义接收失败消息的交换机、队列及其绑定关系。
-
然后,定义RepublishMessageRecoverer:
案例演示:
- 定义接收失败消息的交换机、队列、绑定关系、RepublishMessageRecoverer
-
消费者
-
消费者输出
-
查看error.queue上的消息
业务幂等性
幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x)),例如求绝对值的函数。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。
消除非幂等性的手段:
-
唯一消息id
案例演示:
-
配置消息转换器
-
发送者发送消息
-
查看消息
-
消费者使用Message接收
-
业务判断
延迟消息
延迟消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延迟任务:设置在一定时间之后才执行的任务
死信交换机
当一个队列中的消息满足下列情况之一时,就会成为死信 (dead letter)
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
- 要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)
案例演示:
-
消费者中定义死信交换机和队列,并监听
-
定义普通交换机,不需要消费者