5.RabbitMQ高级特性
为什么不是RabbitMQ的事务?
这是从官网直接翻译过来的:网络可能以不太明显的方式出现故障,而且检测某些故障需要时间。因此,向套接字编写协议帧或一组帧(例如发布的消息)的客户端不能假定消息已经到达服务器并已成功处理。货物可能在运输途中丢失,或者严重延误交货。使用标准AMQP 0-9-1,保证消息不丢失的唯一方法是使用事务–使通道成为事务性的,然后对每个消息或消息集进行发布、提交。在这种情况下,事务没必要地过于重量级,从而将吞吐量降低了250倍。为了解决这个问题,引入了一种确认机制。它模仿了协议中已经存在的消费者确认机制。
1.1 消息的可靠投递
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
- confirm 确认模式
- return 退回模式
rabbitmg 整个消息投递的路径为:
producer—>rabbitmg broker—>exchange—>queue—>consumer
-
消息从 producer 到 exchange 则会返回一个 confirmCallback。
-
消息从 exchange–>queue 投递失败则会返回一个 returnCallback。
我们将利用这两个 callback 控制消息的可靠性投递。
操作总结
-
设置ConnectionFactorv的publisher-confirms="true"开启 确认模式。
-
使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。
在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。 -
设置ConnectionFactorv的publisher-returns=“true” 开启 退回模式。
-
使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如
果设置了rabbitTemplate.setMandatorv(true)参数,则会将消息退回给producer。并执行回调函数 returnedMessage。
注意:主要针对生产者,发送的时候注意发送完消息让消息休眠下,不然会被关闭,无法接受反馈的信息。
1.2 Consumer Ack
ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。有三种确认方式:
- 自动确认:acknowledge="none“
- 手动确认:acknowledge="manual”
- 根据异常情况确认:acknowledge="auto“
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从
RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用
channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
操作总结(基于SpringMVC)
-
在rabbit:listener-container标签中设置acknowledge属性,设置ack方式none:自动确认。 manual:手动确认
-
如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
-
如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。
1.3 消费端限流
当生产者向MQ中每秒发送5000个请求,消费端可以从connetion-factory 配置中配置一次去多少条消息。
总结:
在rabbit:listener-container 中配置 prefetch属性设置消费端一次拉取多少消息
消费端的确认模式一定为手动确认。acknowledge=“manual”
1.4 TTL(Time To Live)
- TTL 全称 Time To Live(存活时间/过期时间)。
- 当消息到达存活时间后,还没有被消费,会被自动清除。
- RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间,
注意:
-
设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
-
设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
-
如果两者都进行了设置,以时间短的为准。
1.5 死信队列
死信队列,英文缩写:DLX。Dead Letter Exchange (死信交换机),当消息成为Dead message后,可以被重新发动到另一个交换机,这个交换机就是DLX。
消息成为死信的三种情况:
1.队列消息长度到达限制;
2.消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
3.原队列存在消息过期设置,消息到达超时时间未被消费;
队列绑定死信交换机:
给队列设置参数:x-dead-letter-exchange 和 x-dead-letter-routing-key ,使用场景多少分钟后去做一件事情,例如订单30分钟失效,唯品会的购物车商品2小时之后放入库存等一些定时的操作。使用时情况如下
总结:
1.死信交换机和死信队列和普通的没有区别
2.当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
3.消息成为死信的三种情况:
1>队列消息长度到达限制;
2>消费者拒接消费消息,并且不重回队列;
3>原队列存在消息过期设置,消息到达超时时间未被消费;
1.6 延迟队列
延迟队列:顾名思义,消息进入队列后不会立即被消费,只有达到指定时间后,才会被消费。
需求:
- 下单后,30分钟未支付,取消订单,回滚库存。
- 新用户注册成功7天后,发送短信问候。
实现方式:
-
定时器
-
延迟队列
分析:
1.定时器:实现功能没什么问题,但是我们需要每分钟的去扫描订单是否到时间,会对服务本身或数据库造成一定压力,数据少的时候影响不大;一旦过时没有执行再次执行比较困难。
2.延迟队列:可以实现同样的功能,并且可以减轻服务和数据库的压力,若MQ出问题,服务恢复之后还可以继续执行。
1.7 消息幂等性保障
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指,消息多条相同的消息与得到与消费该消息一次相同的结果。将消息和数据库中添加版本号,是最好的做法之一。
实际例子操作如下:
1.8消息积压的情况
-
消费者宕机积压;
-
消费者消费能力不足积压;
-
发送者流量太大;
解决方式:
- 上线更多消费者服务;
- 上线专门的队列消费服务,将消息批量取出来,记录数据库,慢慢处理。