您好,我是码农飞哥(wei158556),感谢您阅读本文,欢迎一键三连哦。
💪🏻 1. Python基础专栏,基础知识一网打尽,9.9元买不了吃亏,买不了上当。 Python从入门到精通
😁 2. 毕业设计专栏,毕业季咱们不慌忙,几百款毕业设计等你选。
❤️ 3. Python爬虫专栏,系统性的学习爬虫的知识点。9.9元买不了吃亏,买不了上当 。python爬虫入门进阶
❤️ 4. Ceph实战,从原理到实战应有尽有。 Ceph实战
❤️ 5. Java高并发编程入门,打卡学习Java高并发。 Java高并发编程入门
文章目录
- 1. 消费重试
- 1.1. 消费重试应用场景
- 1.2. 消费重试的原理
- 1.2.1. 消息重试触发的条件
- 1.3. 消费重试次数
- 2. 死信队列
- 3. 消息幂等问题的出现
- 4. 如何保证幂等性消费呢?
1. 消费重试
消息重试是指消费者在消费某条消息失败之后,RocketMQ服务端会根据重试策略重新消费该消息,若超过最大重试次数还未消费成功则不在进行重新消费,而是直接将该消息发送到死信队列中。
1.1. 消费重试应用场景
消息重试主要是为了解决偶发性的消息消费失败导致的消费完整性问题,这些消费失败的原因包括业务处理逻辑的问题,网络抖动问题。
消费重试应用场景主要有两个:
- 业务处理失败,且失败的原因跟当前的消息内容相关,比如该消息对应的事务状态还未获取到,预期一段时间后可执行成功。
- 消息失败的原因是偶发性的,比如由于网络抖动,消费者消费时宕机等偶发性的问题导致的失败,后续的消息大概率会消费成功。
不要把消息失败来作为条件判断的结果分流,也不要通过使用消息失败来对处理速率限流。
1.2. 消费重试的原理
消费重试的状态机如下图所示:会重试的消息可能会经历如下四种状态。
- Ready: 已就绪状态,消息在RocketMQ服务端中准备就绪,可以被消费者消费
- Inflight:处理中状态,消息正在被消费者获取,处于消费中还没返回消费结果的状态。
- Commit: 提交状态:消息被消费者消费成功,消费者返回成功响应,消息会结束重试。
- Wait Retry: 待重试状态:PushComsumer独有的状态,当消费者消费失败或者消费超时,会触发消费重试机制。如果当前重试次数未达到最大重试次数,则该消息会变成待重试状态,经过重试间隔后,消息将重新变为已就绪状态可被重新消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败。
- DLQ:死信队列:当消息消费失败,并且消费重试的次数超过最大重试次数(默认是16次)之后,RocketMQ服务端会结束该消息的重试,并且将该消息直接发送到死信队列中。
1.2.1. 消息重试触发的条件
- 消费失败:
当消息消费失败就会触发消费重试,即消费者没有向RocketMQ服务端返回offset的情况下都被认为是消费失败。都会触发消费重试。
对应的代码没有返回 CONSUME_SUCCESS 的状态是:
// 4.创建一个回调函数
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// 5.处理消息
for (MessageExt msg : msgs) {
System.out.println(msg);
System.out.println("收到的消息内容:" + new String(msg.getBody()));
}
// 1. 消费监听返回null则会消费重试
return null;
//2.消费监听返回RECONSUME_LATER也会消费重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
});
- 消息处理超时,包括在PushConsumer中排队超时。
1.3. 消费重试次数
RocketMQ 会为每个消费组都设置一个Topic名称为"%RETRY%+consumerGroup"的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费者组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而消费失败的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重试间隔,随着重试次数的增多,重试间隔也会越来越大。
RocketMQ对于重试消息的处理是先保存至Topic名为 “SCHEDULE_TOPIC_XXXX” 的延迟队列,后台定时任务按照对应的时间进行Delay后重新保存至 “%RETRY%+consumerGroup” 的重试队列中。
与延迟队列的设置相同,消息默认会重试16次,每次重试的时间间隔如下:
10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
2. 死信队列
前面说某个消息被重试超过最大重试次数16次之后,则会被直接发送到死信队列中。也就是说死信队列用来存放的是无法被正常消费的消息。
RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message), 将存放死信消息的队列称为死信队列(Dead-Letter Queue)。可以使用Console控制台对死信队列里的消息进行重发来使得消费者可以进行重新消费。
死信队列具备如下特点:
- RocketMQ会自动为需要死信队列的消费者组创建死信队列。
- 死信队列与消费者组对应,死信队列中包含该消费者组所有相关Topic的死信消息。
- 死信队列中消息的有效期与正常消息相同,默认48小时。
- 若要消费死信队列中的消息,需要在控制台将死信队列的权限设置为6,即可读可写。
3. 消息幂等问题的出现
幂等的定义:幂等性指的是多次操作造成的结果是一致的。在http接口中查询操作是幂等的,
新增操作:非幂等的,每次都会插入新数据
更新操作:幂等的,对同样的数据进行修改
删除操作:根据id删除是幂等的。
那么,非幂等的操作如何保证幂等性呢?
消息队列中,很可能会存在一条消息被重复发送,或者一条消息被多个消费者消费。对于像用户注册等非幂等操作,就需要做幂等性保证。可以将情况概况为如下几种:
-
生产者重复发送消息:由于网络抖动,导致生产者没有收到broker的ack消息而重发消息,从而造成消息队列中消息重复。
-
消费者重复消费消息:由于网络抖动,导致消费者没有返回ack给broker,导致消费者重复消费。
-
rebalance时的重复消费:由于网络抖动,在rebalance重分配时也可能会出现消费者重复消费某条消息的情况。
4. 如何保证幂等性消费呢?
- mysql 插入业务id作为主键,主键是唯一的,所以一次只能插入一条
- 使用Redis或zk的分布式锁(主流的方案)
比如在创建订单场景下,我们在发送消息的时候传入orderId作为业务唯一ID。当消息重复发送或者重复消息的时候可以根据订单ID 来做一个逻辑判断。
为了防止两个消费者同时消费相同重复消息的情况,这时候可以在OderId上加上分布式锁,保证同一时间内相同的消息只能有一个消费者消费。