文章目录
- 消息发送重试
- 重试触发条件
- 重试流程
- 重试间隔
- 重试常见问题
- 消息流控机制
- 流控触发条件
- 生产者控制消息发送重试次数
- gRPC 客户端
- remoting 客户端
- 消费重试
- 重试触发条件
- PushConsumer 消费重试策略
- PushConsumer 重试间隔时间
- 修改 PushConsumer 最大重试次数
- gRPC 协议端口
- Remoting 协议端口
- SimpleConsumer 消费重试策略
- SimpleConsumer 消费重试时间间隔
- 修改 SimpleConsumer 最大重试次数
- 消息重试注意问题
- 消息重试导致的重复消息和重复消费的问题(消息幂等性)
- gRPC 协议消费者重试示例
- 添加消费者分组,并设置重试次数
- 生产者
- 消费者
- 死信队列
- 消费死信队列
- 如何查看死信消息
RocketMQ 消息重试分为发送重试(生产者)和消费重试(消费者)
消息发送重试
RocketMQ 客户端连接服务端发起消息发送请求时,可能会因为网络故障、服务异常等原因导致调用失败。为保证消息的可靠性, RocketMQ 在客户端SDK中内置请求重试逻辑,尝试通过重试发送达到最终调用成功的效果。同步发送和异步发送模式均支持消息发送重试。
重试触发条件
- 客户端消息发送请求调用失败或请求超时
- 网络异常造成连接失败或请求超时。
- 服务端节点处于重启或下线等状态造成连接失败。
- 服务端运行慢造成请求超时。
- 服务端返回失败错误码
- 系统逻辑错误:因运行逻辑不正确造成的错误。
- 系统流控错误:因容量超限造成的流控错误。
对于事务消息,网络超时或异常等场景不会进行重试。
重试流程
生产者在初始化时设置消息发送最大重试次数,当出现上述触发条件的场景时,生产者客户端会按照设置的重试次数一直重试发送消息,直到消息发送成功或达到最大重试次数重试结束,并在最后一次重试失败后返回调用错误响应。
- 同步发送:调用线程会一直阻塞,直到某次重试成功或最终重试失败,抛出错误码和异常。
- 异步发送:调用线程不会阻塞,但调用结果会通过异常事件或者成功事件返回。
重试间隔
-
除服务端返回系统流控错误场景,其他触发条件触发重试后,均会立即进行重试,无等待间隔。
-
若由于服务端返回流控错误触发重试,系统会按照指数退避策略进行延迟重试。指数退避算法通过以下参数控制重试行为:
-
INITIAL_BACKOFF: 第一次失败重试前后需等待多久,默认值:1秒。
-
MULTIPLIER :指数退避因子,即退避倍率,默认值:1.6。
-
JITTER :随机抖动因子,默认值:0.2。
-
MAX_BACKOFF :等待间隔时间上限,默认值:120秒
-
MIN_CONNECT_TIMEOUT :最短重试间隔,默认值:20秒。
-
计算算法如下:
ConnectWithBackoff()
current_backoff = INITIAL_BACKOFF
current_deadline = now() + INITIAL_BACKOFF
while (TryConnect(Max(current_deadline, now() + MIN_CONNECT_TIMEOUT))!= SUCCESS){
SleepUntil(current_deadline)
current_backoff = Min(current_backoff * MULTIPLIER, MAX_BACKOFF)
current_deadline = now() + current_backoff + UniformRandom(-JITTER * current_backoff, JITTER * current_backoff)
}
重试常见问题
消息肯定不能无限重试,所以生产者可以控制最大重试次数,如果最终重试还是失败,将会抛出异常。重试还可能造成消息重复(这需要根据业务逻辑自己编码处理 —— 幂等性)
消息流控机制
消息流控指的是系统容量或水位过高, RocketMQ 服务端会通过快速失败返回流控错误来避免底层资源承受过高压力。
流控触发条件
- 存储压力大:大量的消息需要同时存储时,MQ 存储压力瞬间飙升,会触发消息流控。
- 服务端请求任务排队溢出:若消费者消费能力不足,导致队列中有大量堆积消息,当堆积消息超过一定数量后会触发消息流控,减少下游消费系统压力。
当前系统触发流控是,客户端一般会收到错误和异常信息如下:
- reply-code:530
- reply-text:TOO_MANY_REQUESTS
生产者控制消息发送重试次数
gRPC 客户端
// 构建生产者
Producer producer = provider.newProducerBuilder()
// Topics 列表:生产者和主题是多对多的关系,同一个生产者可以向多个主题发送消息
.setTopics("MY_FIFO_TOPIC")
.setClientConfiguration(configuration)
// 设置消息发送重试次数(默认:3 次)
.setMaxAttempts(3)
// 构建生产者,此方法会抛出 ClientException 异常
.build();
remoting 客户端
// 设置同步发送重试次数(默认:2)
producer.setRetryTimesWhenSendFailed(2);
// 设置一般发送重试次数(默认:2)
producer.setRetryTimesWhenSendAsyncFailed(2);
消费重试
消费重试指的是,消费者在消费某条消息失败后,RocketMQ 服务端会根据重试策略重新消费该消息,超过一次定数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中。
重试触发条件
- 消费失败:包括消费者返回失败状态或抛出异常
- 消息处理超时
不同的消费者类型,重试触发条件是一样的,但 PushConsumer 和 SimpleConsumer 重试策略稍有不同。
PushConsumer 消费重试策略
PushConsumer 消费的消息,涉及到的状态如下(DLQ:dead letter queue 死信队列):
- Ready:就绪状态的消息才能被消费者获取。(参考 SimpleConsumer 的消费不可见时间和重试等待时间)
- Inflight:处理中,消费者获取消息,执行消息但尚未执行结束返回消费结果。
- Wait Retry:等待重试。PushConsumer独有的状态。当消费失败或超时,但重试次数未达上限时的状态。此状态经过重试时间间隔后消息将重新进入 Ready 状态,等待消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败。
- Acked:成功消费状态。达到此状态,说明消息被成功消费。
- DLQ:死信状态。消费重试到达上限时,消息不会再此重试,会被投递到死信队列。我们可以通过消费死信队列的消息进行业务修复。
PushConsumer 重试间隔时间
- 非顺序消息
第几次重试 | 间隔时间 |
---|---|
1 | 10s |
2 | 30s |
3 | 1m |
4 | 2m |
5 | 3m |
6 | 4m |
7 | 5m |
8 | 6m |
9 | 7m |
10 | 8m |
11 | 9m |
12 | 10m |
13 | 20m |
14 | 30m |
15 | 1h |
16 | 2h |
>16 大于16次,后续间隔都为2h | 2h |
- 顺序消息重试间隔为固定时间,默认为:3000ms(因为前面消息在重试,后面的消息在排队,无法消费)
修改 PushConsumer 最大重试次数
gRPC 协议端口
gRPC 协议端口的重试次数设置,以及顺序消费的设置都是设置在消费者分组创建时的元数据控制,也就是说我们在编写代码都时候不需要在代码中设置,请参考前篇《RocketMQ 消费者分类与分组》创建或修改消费者分组来设置。
PushConsumerBuilder 在 build 方法中实例化 PushConsumer 的实现的时候,会读取 MQ 服务端对应消费者分组的设置,也就是说 gRPC 协议的客户端,不允许在客户端代码中修改相关消费者分组的设置。
Remoting 协议端口
// 默认即为 16 次
consumer.setMaxReconsumeTimes(16);
SimpleConsumer 消费重试策略
SimpleConsumer 没有 wait retry 状态。消费失败后根据 InvisibleDuration (消费不可见时间)来计算时间间隔
SimpleConsumer 消费重试时间间隔
消息重试间隔 = InvisibleDuration (不可见时间)- 消息实际处理时长
例如,消息不可见时间为30 ms,实际消息处理用了10 ms就返回失败响应,则距下次消息重试还需要20 ms,此时的消息重试间隔即为20 ms;若直到30 ms消息还未处理完成且未返回结果,则消息超时,立即重试,此时重试间隔即为0 ms。
为了避免 InvisibleDuration 时间小于消息实际处理时长,在消息消费过程中,我们可以动态的调整InvisibleDuration 的时长,来避免此类情况出现。
// 修改 InvisibleDuration
simpleConsumer.changeInvisibleDuration();
simpleConsumer.changeInvisibleDurationAsync()
修改 SimpleConsumer 最大重试次数
SimpleConsumer 重试次数的修改与PushConsumer 相同
消息重试注意问题
消息重试适用业务处理失败且当前消费为小概率事件的场景,是为了解决偶发情况,消费失败。如果消费重试情况经常出现,请考虑修改相应业务逻辑或修改相关代码。
顺序消息频发重试,可能导致顺序消息堆积。
消息重试导致的重复消息和重复消费的问题(消息幂等性)
- 重复消息:生产者重试可能导致消息重复
- 重复消费:消费者重试可能导致重复消费
RocketMQ 无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)
注:msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。
gRPC 协议消费者重试示例
添加消费者分组,并设置重试次数
$> ./mqadmin updateSubGroup -n 127.0.0.1:9876 -g MY_RETRY_GROUP -r 3 -c DefaultCluster
我们此处为了验证方便,设置重试次数为 3 次。
生产者
import com.yyoo.mq.rocket.MyMQProperties;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class RetryProducerDemo {
public static void main(String[] args) throws ClientException {
// 用于提供:生产者、消费者、消息对应的构建类 Builder
ClientServiceProvider provider = ClientServiceProvider.loadService();
// 构建配置类(包含端点位置、认证以及连接超时等的配置)
ClientConfiguration configuration = ClientConfiguration.newBuilder()
// endpoints 即为 proxy 的地址,多个用分号隔开。如:xxx:8081;xxx:8081
.setEndpoints(MyMQProperties.ENDPOINTS)
.build();
// 构建生产者
Producer producer = provider.newProducerBuilder()
// Topics 列表:生产者和主题是多对多的关系,同一个生产者可以向多个主题发送消息
.setTopics("MY_NORMAL_TOPIC")
.setClientConfiguration(configuration)
// 构建生产者,此方法会抛出 ClientException 异常
.build();
// 构建消息类
Message message = provider.newMessageBuilder()
// 设置消息发送到的主题
.setTopic("MY_NORMAL_TOPIC")
// 设置消息索引键,可根据关键字精确查找某条消息。其一般为业务上的唯一值。如:订单id
.setKeys("order_id_1001")
// 设置消息Tag,用于消费端根据指定Tag过滤消息。其一般用作区分不同的业务,最好给它定义好命名规范
.setTag("RETRY_TEST")
// 消息体,单条消息的传输负载不宜过大。所以此处的字节大小最好有个限制
.setBody("{\"success\":true,\"order_id\":\"1001\",\"msg\":\"消费重试测试!\"}".getBytes())
.build();
// 发送消息(此处最好进行异常处理,对消息的状态进行一个记录)
try {
SendReceipt sendReceipt = producer.send(message);
System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId());
} catch (ClientException e) {
System.out.println("Failed to send message");
}
}
}
生产者代码和普通的消息发送代码一致。
消费者
import com.yyoo.mq.rocket.MyMQProperties;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import java.nio.ByteBuffer;
import java.util.Collections;
public class RetryConsumerDemo {
public static void main(String[] args) throws ClientException {
// 用于提供:生产者、消费者、消息对应的构建类 Builder
ClientServiceProvider provider = ClientServiceProvider.loadService();
// 构建配置类(包含端点位置、认证以及连接超时等的配置)
ClientConfiguration configuration = ClientConfiguration.newBuilder()
// endpoints 即为 proxy 的地址,多个用分号隔开。如:xxx:8081;xxx:8081
.setEndpoints(MyMQProperties.ENDPOINTS)
.build();
// 设置过滤条件(这里为使用 tag 进行过滤)
String tag = "RETRY_TEST";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// 构建消费者
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(configuration)
// 设置消费者分组
.setConsumerGroup("MY_RETRY_GROUP")
// 设置主题与消费者之间的订阅关系
.setSubscriptionExpressions(Collections.singletonMap("MY_NORMAL_TOPIC", filterExpression))
.setMessageListener(messageView -> {
System.out.println("============开始消费!");
System.out.println(messageView);
ByteBuffer rs = messageView.getBody();
byte[] rsByte = new byte[rs.limit()];
rs.get(rsByte);
// 一直失败,测试消费重试机制
if(true){
return ConsumeResult.FAILURE;
}
System.out.println("Message body:" + new String(rsByte));
// 处理消息并返回消费结果。
System.out.println("Consume message successfully, messageId=" + messageView.getMessageId());
return ConsumeResult.SUCCESS;
}).build();
System.out.println(pushConsumer);
// 如果不需要再使用 PushConsumer,可关闭该实例。
// pushConsumer.close();
}
}
达到最大重试次数后,消息将被发送到死信队列。
死信队列
死信队列对应的 TOPIC 名称为:%DLQ% + 消费者分组名称。我们的示例对应的死信 TOPIC 名称为:%DLQ%MY_RETRY_GROUP。我们可以通过消费死信队列的消息进行业务恢复或者进行死信消息的持久化存储。
死信消息在 MQ 中的最长存储时间为 3 天,即便我们消费了死信队列的消息,死信消息依然会存储在MQ,到期后才删除。
消费死信队列
import com.yyoo.mq.rocket.MyMQProperties;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import java.nio.ByteBuffer;
import java.util.Collections;
public class DLQConsumerDemo {
public static void main(String[] args) throws ClientException {
// 用于提供:生产者、消费者、消息对应的构建类 Builder
ClientServiceProvider provider = ClientServiceProvider.loadService();
// 构建配置类(包含端点位置、认证以及连接超时等的配置)
ClientConfiguration configuration = ClientConfiguration.newBuilder()
// endpoints 即为 proxy 的地址,多个用分号隔开。如:xxx:8081;xxx:8081
.setEndpoints(MyMQProperties.ENDPOINTS)
.build();
// 设置过滤条件(这里为使用 tag 进行过滤)
String tag = "RETRY_TEST";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// 构建消费者
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(configuration)
// 设置消费者分组
.setConsumerGroup("DLQ_CONSUMER_GROUP")
// 设置主题与消费者之间的订阅关系
.setSubscriptionExpressions(Collections.singletonMap("%DLQ%MY_RETRY_GROUP", filterExpression))
.setMessageListener(messageView -> {
System.out.println("============开始消费!");
System.out.println(messageView);
ByteBuffer rs = messageView.getBody();
byte[] rsByte = new byte[rs.limit()];
rs.get(rsByte);
// 测试消费重试机制
if(true){
return ConsumeResult.FAILURE;
}
System.out.println("死信消息消费:" + new String(rsByte));
// 处理消息并返回消费结果。
System.out.println("死信消息消费 Consume message successfully, messageId=" + messageView.getMessageId());
return ConsumeResult.SUCCESS;
}).build();
System.out.println(pushConsumer);
// 如果不需要再使用 PushConsumer,可关闭该实例。
// pushConsumer.close();
}
}
死信队列消费与普通消息消费是一样的,它同样有重试逻辑,遵循同样的规律。即死信消息消费时也有重试机制以及超过最大重试次数成为死信消息。
如何查看死信消息
- 通过 Admin Tool 命令在服务端查看
- 通过 RocketMQ Dashboard 后台查看
- 通过 Promethus 查看 MQ 的监控指标(由 RocketMQ Promethus Exporter 实现)
- 通过 Admin Tool 源码接口 MQAdminExt 自行编码实现(其实以上 3 种方式都是通过 MQAdminExt 接口来实现的)
不仅仅是查看死信消息,Dashboard 还提供了后台管理界面,可以新增主题、消费者分组等各种 Admin Tool 支持的操作,但 Dashboard 版本几乎没有更新,比如修改消费者分组为顺序消费等功能没有,而且很多功能可能会报错,但其是我们自定义 MQ 管理功能的重要参考。
Admin Tool 命令涉及到源码主要在: