目录
- 消息可靠性
- 生产者确保将消息成功送入队列
- 消息确认
- 消息回执
- 消费者确保消息成功从队列中取出并成功消费
- 消费确认机制
- 消费失败重试机制
- 失败策略
- 使用第三种方式:消费者指定失败后转发的交换机
- 使用第一种方式:在队列中指定死信交换机
- 消息持久化问题
- 交换机持久化
- 队列持久化
- 消息持久化
- 死信交换机
- 配置处理失败的死信交换机
- 配置TTL死信交换机
- 延迟消息
- DelayExchange插件的安装
- 惰性队列 Lazy Quenes
消息可靠性
如果是日志收集,那么如果消息在MQ过程有消息丢失(消息者处理失败,网络故障、服务器故障等),此些消息较整体看来微乎其微,不那么重要,丢失就丢失了。如果考虑消息可靠性会使得性能下降,得不偿失。
如果是一个订单服务,向MQ中发送消息,因为是异步,那边已经通知下单成功了,那么就要确保这个消息一定不会丢失。这就一定要确保消息可靠性。虽然是异步调用,但要确保异步调用各个服务一定要处理成功。
消息可靠性考虑的问题:
- 消息一定要成功给到消费者
- 生产者负责部分不会出问题
- 消费者负责部分不会出问题
- 消息持久化问题
- 消费者一定要成功处理消息
生产者确保将消息成功送入队列
消息的可靠性对于生产者负责部分就是是否将消息成功发送到对应队列,这一过程分为两部分,第一部分是将消息发送给交换机过程是否是成功和交换机路由到队列是否成功。分别对应消息确认
和消费回执
消息确认
消息确认部分分为三个可能情况:
- 生产者将消息发送至交换机过程失败
- 交换机故障 如:没有对应的交换机,此时会返回
nack
- 成功将消息送入到指定交换机,返回
ack
默认RabbitMQ是不开启消息确认的,配置步骤如下:
4. 在配置文件中开启消息确认机制
spring:
rabbitmq:
publisher-confirm-type: correlated
- 配置消息,指定为对应失败的回调方法
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 消息的唯一性
correlationData.getFuture().addCallback(
result->{
if(result.isAck()){
// 成功回调
System.out.println("消息成功投递到交换机");
}
else{
// 失败回调 这里可以进行重试
System.out.println("消息未成功投递到交换机");
}
},
ex->{
System.out.println("发送消息失败");
}
);
在发送消息时,将correlationData
对象作为参数传递。
rabbitTemplate.convertAndSend("e1","",user,correlationData);
消息回执
消息回执是当交换机向队列进行路由时出错返回的
配置步骤如下:
- 在配置文件中
spring:
rabbitmq:
publisher-returns: true # 开启消息回执
template:
mandatory: true # true 为自定义消息回执回调 false为直接丢弃掉消息
- 配置消息回执的回调
一个rabbitMQ对象只能配置一个消息回执所以在类,消息回执只有在当交换机向消息队列中路由失败时,才会被执行
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 投递失败,记录日志
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有业务需要,可以重发消息
});
}
}
测试:
测试交换机出现故障:向一个不存在的交换机中发送消息,因为消息没有发送到交换机,所以会在消息确认中会调用nack方法
测试交换机没有路由到队列,将交换机与一个队列进行绑定,然后通过页面控制台将此队列删除
消费者确保消息成功从队列中取出并成功消费
当生产者已经成功将消息送到队列,那么生产者的使命就算完成。剩下一部分是由队列向消费者推送,然后消费者处理消息
但这一部又有两个部分可能出错:
- 消费者从队列中获取消息时失败
- 消息者成功从队列中获取消息,但处理失败
消费确认机制
无论是哪种都是消费者没有成功处理消息,判断思路就是:如果消费者成功处理消息就返回一个确认,只要是出现意外,队列就不会受到确认,就会认为消费者最终没有成功处理掉消息,就不会在队列中删除。
返回确认又分为三种:
- manual:手动返回,需要在代码中调用api
- auto:自动返回,通过spring的Aop特性,对代码进行增强,在代码执行后代理类会为其发送确认
- none:获取消息立即确认,只要消费者从队列中取出消息即默认会成功消费,队列中的消息就不会存在
这种方式不能保证消息的可靠性
修改方式
- 在配置文件中修改
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: 模式
思考:从消费者从队列取出消息到消费者成功消费,这一过程到底发送了几个确认机制?是消费者从队列取出就送一个确认,然后消费成功后再次发送一个确认,还是最终成功消费后只发送一个确认?
队列向队列中发送消息时,会将消息状态设置为unacked
,如果消费者发送了确认就会将消息删除。
测试:先来看一下当消费者从队列中取出消息,但消费失败,默认情况是怎样的呢?
经过测试发现,消费者消费失败,会不断的进行重试
。当停止失败重试后,消息会重新入队。(默认就是这种)
控制台信息,注意这里的控制台是3.9版本的,如果版本过低可能会不一样
这种方式最大的问题就是:一但消费失败,就像踢皮球一样队列中的消息不断向消费者推送,不断的从Unacked状态到ready状态切换,大量耗费MQ的资源,如上图,MQ消息发送条数已经飙到1700条/秒
如果将模式设置为nono,只要消息被消费者取出,即使消费者消费失败
消息也会被队列丢弃
消息重试也可以进行配置,默认是只要失败就不停的重试,不断的由队列向消费者发送
。可以对其进行配置,使得按照指定规则在本地重试。
消费失败重试机制
消费者失败重试机制是另一种消费可靠机制,它不在采用消费确认返回ack,然后队列收到确认删除消息这种机制,而是队列只要发送消息给消费者就认为消费者会成功,然后从队列中删除掉消息(相当于none模式),但消费者消费失败呢?会在本地进行重试,注意是本地进行重试,也就是队列只向消费者发送一次,这样不会影响到MQ的性能。消费者成功处理后会自动向队列返回一个ack,失败了会本地尝试,次数耗尽后,根据对应的失败策略,采取相应措施
配置步骤:
将消费者消息确认改为
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
这时进行测试:
当消费者取出消息时,MQ对此消息标记为Unacked
,当消息处理失败后,会进行本地重试,当重试次数超过配置上限,MQ依然会将消息丢弃。
可以打断点判断最终重试了几次,当消费失败后,如果没有超过失败上限,会自动重新调用回调方法
当重试次数超过上限,也会自动发送一个reject
,队列端删除掉消息
那么这就带来一个问题:如果本地多次尝试仍然失败,消息可靠性依然没有得到保证
那么这就需要修改失败策略,默认是当失败次数达到上限也返回reject
,使得消息在队列中被删除
失败策略
失败策略共有以下几种
-
RejectAndDontRequeueRecoverer:重试耗尽后,直接
reject,丢弃消息
。当选择本地重试后,失败达到上限,默认就是这种,上面已经演示 -
ImmediateRequeueMessageRecoverer:重试耗尽后,返回
nack,消息重新入队
,重新踢皮球 -
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
这里介绍一下最后一种处理方法:,当消费者处理消息失败达到上限,自己会作为一个生产者,再向其他交换机发送消息
使用第三种方式:消费者指定失败后转发的交换机
这种方式就好比我是乙方,但我处理不了,因此我作为甲方再找一个乙方去处理
测试:当向生产者向交换机发送消息,交换机路由到队列,消费者从队列中取出消息时,开始本地处理,处理失败接着重新处理。当处理失败达到上限后,就要转发给配置好的失败处理交换机了。
- 生产者向交换机发送一个消息
- 消费者从对列中取出消息,多次处理失败,但没有达到上限,继续重试
- 当消费者处理失败次数达到上限
那么让我们在错误队列中查看一下消费者处理失败转发的消息,可以看到不仅有原消息内容,消费者还给消息添加上了错误信息
在使用一个服务去监听这个错误队列,这样就使得其他服务失败的消息能够进行统一再次处理
使用第一种方式:在队列中指定死信交换机
由于死信交换机的用途不止这一种场景,为了结构能清晰,这里只大概介绍。后续会详细介绍
使用死信交换机是在消息队列中配置,当消费者本地处理消息失败达到上限后,采用第一种失败策略即返回reject,原本消息队列接收到此确认后和ack一样:直接将消息丢弃,但配置了死信交换机后,会转发给死信交换机,死信交换机会路由到死信队列,然后同样有一个专门处理失败消息的消费者。
这种方式就好比我是乙方,但我处理不了,我直接告诉甲方,甲方再去找其他人作为乙方
消息持久化问题
上面通过生产者消费确认机制,消费者确认机制已经能够实现,生产者转发到的消息一定消费者一定能够成功处理。但如果在消息中间过程,MQ发生宕机,消息已经到达队列,向生产者已经确认,但消费者却不知到有给它的消息,这时宕机没有持久化,整个消息不就丢失了嘛?
为了保证整个链路宕机后,依然能够恢复至宕机前状态,就要考虑三个持久化问题
- 交换机持久化
- 队列持久化
- 消息持久化
值的注意的是:使用SpringAMQP声明或创建的交换机
、队列
以及发送的消息
默认都是持久化
的。
测试:
我刚在在测试错误交换机时,声明的交换机、队列,以及队列中的消息都是还在的,此时我是用docker将RabbitMQ进行重启,判断是否还在
当然,使用springAMQP时也可以显示的声明一下持久化(也是设置非持久化的方法)
交换机持久化
@Bean
public DirectExchange simpleExchange(){
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new DirectExchange("simple.direct", true, false);
}
队列持久化
@Bean
public Queue errorQueue(){
return new Queue("error.queue",true);
}
消息持久化
那当我们只放入一个String会有哪些操作呢?
总结:
上面已经提到消费者消息处理失败后采用本地重试,当重试次数超过规定,可以由消费者向某个交换机转发它未能处理的消息,也可以向队列发出一个reject,让消息成为死信,由队列指定的死信交换机处理。这里就来讲一下队列如何使用死信交换机
死信交换机
死信交换机,顾名思义就是将各个队列中的死信发往到此交换机
,Dead Letter Exchange简称DLK
那么什么是死信呢?死信共分为三种情况
未被成功消费的消息
:消费者采用本地重试机制,失败策略为最终发送reject
,此时队列中的此消息未被成功消费就成了死信超时的消息
:可能是队列设定了ttl,队列超时也可能是消息设置了ttl,消息超时。队列超时则队列中所有消息都成了死信,消息超时只此消息为死信队列溢出的消息
:因为队列空间有限,当消息长时间放在队列中,直到队列没有空间去暂存这些消息,就会将最早的一些消息释放掉,成为死信,再去存放新的消息
如何让一个交换机成为死信交换机?
死信交换机就是一个普通的交换机,死信交换机的生产者是队列
,向其发送的队列指定其死信交换机,然后队列中的死信就会自动向死信交换机发送
配置处理失败的死信交换机
规划
类型 | 名称 |
---|---|
死信交换机 | dl.e2 |
死信队列 | dl.q2 |
普通交换机 | e2 |
普通队列 | q2 |
配置:当q2队列的消费者处理失败后,交给死信队列dlq2的消费者
- 配置死信队列,就是一个普通的一组交换机和队列,以及绑定关系。并为队列指定消费者
@Bean
public DirectExchange dle2(){
return new DirectExchange("dle2");
}
@Bean
public Queue dlq2(){
return new Queue("dlq2");
}
@Bean
public Binding dlbinding(){
return BindingBuilder.bind(dlq2()).to(dle2()).with("a1");
}
@RabbitListener(queues = "dlq2")
public void listenDirectQueue2(String msg) {
System.out.println("死信队列中"+msg);
}
- 配置正常情况下的交换机和队列,并将其绑定。队列要指定死信交换机,然后指定其消费者
@Bean
public Queue q2(){
return QueueBuilder.durable("q2")
.deadLetterExchange("dle2")
.build();
}
@Bean
public DirectExchange e2(){
return new DirectExchange("e2");
}
@Bean
public Binding binding(){
return BindingBuilder.bind(q2()).to(e2()).with("a1");
}
指定消费者,消费者要求是配置本地重试机制,同时消费者会处理失败
- 生产者向交换机发送消息,对于生产者是无感知的
测试结果:
q2队列尝试两次失败,然后dlq2队列获取到消息
配置TTL死信交换机
将队列设置消息有效期,但这个队列消息没有消费者,也就是说这个向这个队列中发送的消息,由于不可能被消费,超过队列的有效期就会成为死信,再为队列设置死信交换机,然后为死信交换机绑定队列,有一个消费者监控这个队列,这样就能够实现将消息定时后发送到这个消者。
因此,TTL死信队列可以用来做消息定时发送(有效期的队列没有消费者,消息必成为死信),超时消息处理(将消费者超时未处理的消息交给其他消费者)。
当然,不仅可以为队列设置有效期,可以为消息设置有效期。队列有效期相当于为队列中所有消息设置了有效期,而消息有效期是指单个,超时了一样会作为此队列的死信
设置队列所有消息延迟消息步骤: 如果为有效期队列指定消费者功能就变为了:超时消息由其他交换机处理
规划
类型 | 名称 |
---|---|
死信交换机 | dl.e1 |
死信队列 | dl.q1 |
普通交换机 | e1 |
普通队列 | q1 |
-
先创建一个死信交换机,然后绑定队列,队列另一端有消费者监听(死信交换机和普通的相同)
-
创建一个交换机和一个定时队列,并指定其死信交换机为前面配置的交换机
值的注意的是:延迟队列不能有消费者,因为有消费者可能消息就不会在队列中堵塞不超过有效期,没等成为死信就被消费了 -
生产者向交换机发送消息,交换机发送会有效期队列
设置单个消息延迟,这里不再为队列指定TTL,而是在生产者端为消息指定TTL,至于死信队列和以上相同
例如:将消息放入Message 对象中,在对象中指定TTL
Message message = MessageBuilder
.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
.setExpiration("5000")
.build();
思考:共有几种方式设置消息有效期?
共有两种,一种是为队列中所有消息指定有效期,一种是为单个消息指定有效期
如何实现订单15分钟有效?
将订单信息放入一个延迟队列中,该队列设置有效期为15分钟,但不设置消费者,订单信息在此队列中阻塞15分钟后交给死信交换机,死信交换机对应的死信队列的消费者获取到消息后根据信息取消订单。
当然,要想实现消息延迟发送,不止可以使用设置队列有效期然后通过死信队列消费这种方式,RabbitMQ还提供了一个专用于延迟消息
的插件。至于由于消息队列已满,当新消息插入时,队列中最老的消息成为死信,这里不再演示。接下来,将会介绍通过使用插件的方式实现延迟消息
延迟消息
延迟消息的应用场景:延迟消息可以用来设置有效期,定时,预约等
官方提供了一个DelayExchange
插件用来实现延迟消息,它是基于交换机实现的而不是队列。虽然交换机不能存储消息,但是使用了插件
DelayExchange插件的安装
- 首先,先下载插件,注意插件的版本要和MQ的版本一致,下载地址https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
- 然后将插件放入MQ的插件目录中
这里的RabbitMQ是使用Dokcer启动的,在启动时已经将插件目录挂载到了外面。
RabbitMQ启动时的命令
docker run \
-e RABBITMQ_DEFAULT_USER=yan\
-e RABBITMQ_DEFAULT_PASS=1234 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
-d \
3.9.27-management-alpine
命令中将MQ的插件目录挂载到了名为mq-plugins
的逻辑卷,使用命令docker volume inspect mq-plugins
查看逻辑卷对应的宿主机目录
将.ez
结尾的插件放入此目录中
为什么我们的插件显得格格不入,因为还需要安装
- 安装插件
安装插件需要进入容器进行安装,使用命令docker exec -it mq bash
,然后使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchange
安装完成
安装完成
那么延迟交换机是怎样使用的呢? 概括就是指定交换机为delayed类型
,设置消息的延迟时间
延迟交换机是在普通交换机的基础上声明为delayed
类型,然后在发送消息时,如果是向此交换机发送的是延迟消息,就要在在消息的头部添加属性x-delay
,值为想要延迟的时间。当发送消息时,延迟交换机会根据消息头部属性x-delay
的值知道延迟时间,并将此消息持久化到磁盘中,当时间一到,重新投递到与之绑定的队列中。
也就是说虽然将交换机声明了为延迟交换机
,但它是在普通交换机基础上进行的增强,仍具有普通交换机的功能。
下面使用代码来演示一下这一过程:
声明交换机为延迟交换机,采用的方式同样有@Bean的方式和注解两种方式
- 使用注解方式
- 使用@Bean的方式
生产者在发送消息时,需要消息在消息的请求头添加属性x-delay
,值为延迟毫秒数
在测试时发现,当发送消息到延迟交换机,由于交换机并未及时将消息路由到队列,会触发消息可靠性的消息回执方法,可根据返回信息进行判断,忽略这种情况
使用延迟消息插件可以替代使用死信队列的方式,在死信问题上还有一个点就是由于队列空间不足导致队列中最老的消息成为死信,可以使用交换机接收死信进行处理,但如何避免这种情况呢?
- 增大消费者的处理消费能力
- 提高队列的容量
提高消费者处理能力不在MQ的范围,这里探讨第二种即提升消息队列的方法
惰性队列 Lazy Quenes
什么是惰性队列? 简单理解就是将消息直接存入磁盘,用时再从磁盘进行读取。
为什么称之为惰性队列? 因此磁盘速度较内存慢
那为什么使用惰性队列? 惰性队列是存在磁盘中的,磁盘虽然速度慢但其空间大,有效解决消息堆积问题!而且速度稳定,不使用惰性队列时,内存中的消息达到一定数量也要放转入磁盘,但这IO过程MQ的并发就会下降,呈现忽高忽低的现象。而是用惰性队列,消息直接放入磁盘,没有间歇性page-out
,速度较为稳定。
那惰性队列有什么缺点呢? 惰性队列的缺点就是磁盘存储数据的缺点:存入与取出都要经过磁盘,时效性差!磁盘IO将相较于内存性能差!
惰性队列的使用
惰性队列是队列的一种,它是MQ3.6
版本后推出的。
惰性队列的使用同样是在声明队列时,也同样会有两种方式@Bean和注解,注意延迟消息是对交换机进行配置,惰性队列是对队列进行配置
- @Bean的方式
- 注解的方式
发送消息是不变的,只是将队列的属性进行修改,使其将消息直接存入磁盘,需要时再从磁盘取出
此外,还可以使用命令将已经存在的队列修改为惰性队列
例如:我要将一个普通名为q4的队列修改为惰性队列
在MQ容器内部使用命令rabbitmqctl set_policy Lazy "^q4$" '{"queue-mode":"lazy"}' --apply-to queues