highlight: arduino-light
1.5 死信队列
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。对应的源码如下:
java public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%"; public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%"; private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response, RemotingCommand request, MessageExt msg, TopicConfig topicConfig) { // 获取topic进行判断逻辑 String newTopic = requestHeader.getTopic(); // 重试队列:%RETRY%+consumerGroup // 如果是重试队列才会进入判断 if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { //获取消费者组 String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName); if (null == subscriptionGroupConfig) { response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); response.setRemark( "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); return false; } // 获取最大重试次数 int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); } // 获取消费次数 int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes(); // 超过最大重试次数之后发送到死信队列 if (reconsumeTimes >= maxReconsumeTimes) { // 死信队列:%DLQ%+consumerGroup newTopic = MixAll.getDLQTopic(groupName); int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; topicConfig = this.brokerController.getTopicConfigManager() .createTopicInSendMessageBackMethod(newTopic, DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0 ); msg.setTopic(newTopic); msg.setQueueId(queueIdInt); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("topic[" + newTopic + "] not exist"); return false; } } } int sysFlag = requestHeader.getSysFlag(); if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG; } msg.setSysFlag(sysFlag); return true; } }
在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
1.5.1 死信特性
死信消息具有以下特性
- 不会再被消费者正常消费。
- 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。
死信队列具有以下特性:
- 一个死信队列对应一个消费者Group, 而不是对应单个消费者实例。
- 如果一个消费者Group未产生死信消息,消息队列RocketMQ 不会为其创建相应的死信队列。
- 一个死信队列包含了对应消费者Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
- 死信队列的topic是在消息发送过程中判断对应的topic是否存在,不存在就动态进行创建
- 重试队列是以%RETRY%+consumerGroup作为TOPIC。
- 死信队列是以%DLQ%+consumerGroup作为TOPIC。
- 死信队列对应Topic的权限为2,只有写权限,所以死信队列没办法读取。需要设置权限为6[2:W; 4:R; 6:RW]
1.5.2 查看死信信息
- 在控制台查询出现死信队列的主题信息
- 在消息界面根据主题查询死信消息
- 选择重新发送消息
一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。
1.5.3处理死信队列
1.订阅并消费死信队列
```java public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstartconsumerdlq"); consumer.setNamesrvAddr("localhost:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUMEFROMFIRSTOFFSET); // 订阅死信队列 consumer.subscribe("%DLQ%quickstartconsumer_dlq", "*"); consumer.setMaxReconsumeTimes(1); consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
} ```
2.重试次数判断,持久化重试
重试次数到达一定次数,可能程序没问题,只是数据有问题,此时无论重试多少次,最终的结果都是一样的,所以此时可以直接返回成功!
由我们自己的通用定时任务重试,消息中注意需要定义1个bizType。