RocketMQKafka重试队列

news2024/12/30 1:00:22

为实现服务间的解耦和部分逻辑的异步处理,我们的系统采纳了消息驱动的方法。通过消息队列的使用,各个服务能够基于事件进行通信,从而降低了直接的依赖关系,优化了系统的响应性能和可靠性。

为什么需要考虑消费重试?

在业务开发过程中,异常处理和重试机制是确保系统稳定性和数据一致性的关键环节。例如,Spring Retry 提供了灵活的重试策略,以增强方法调用的健壮性。同样,在使用 OpenFeign 进行远程服务调用时,其集成的 Ribbon 组件也能够处理接口调用过程中的重试逻辑。

对于消息队列的消费场景,消息消费的重试同样至关重要。当消息处理过程中遇到异常时,合理的重试策略可以确保业务逻辑的正确执行和数据的完整性。

rocketmq与kafka如何进行重试的?

rocketmq(4.9.x版本)

重试队列

若Consumer消费某条消息失败,则RocketMQ会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列

消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

  • 最大重试次数:消息消费失败后,可被重复投递的最大次数。
consumer.setMaxReconsumeTimes(10);
  • 重试间隔:消息消费失败后再次被投递给Consumer消费的间隔时间,只在顺序消费中起作用。
consumer.setSuspendCurrentQueueTimeMillis(5000);

顺序消费和并发消费的重试机制并不相同,顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。

并发消费失败后并不是投递回原Topic,而是投递到一个特殊Topic,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。 两者参数差别如下

消费类型重试间隔最大重试次数
顺序消费间隔时间可通过自定义设置,SuspendCurrentQueueTimeMillis最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。该参数取值无最大限制。若未设置参数值,默认最大重试次数为Integer.MAX
并发消费间隔时间根据重试次数阶梯变化,取值范围:1秒~2小时。不支持自定义配置最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。默认值为16次,该参数取值无最大限制,建议使用默认值
死信队列​

当一条消息初次消费失败,RocketMQ会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue),死信队列是死信Topic下分区数唯一的单独队列。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信消息的信息。

消费者示例

springboot 集成 maven

@Component
@RocketMQMessageListener(topic = "topicTest",consumerGroup = "test", selectorExpression =  "tagTest",
       consumeThreadNumber = 10)
@RequiredArgsConstructor
public static class TestListener implements RocketMQListener<String> {
   public void onMessage(String event) {
       log.info("consume event: {}", event);
       // 处理业务
   }
}
rocketmq dashboard

重试队列相关topic见下图
在这里插入图片描述
可以看到有个%RETRY%test的topic,即重试topic为%RETRY%+consumerGroup。

源码解析

自定义实现MessageListener消费消息接口,并发消费消息,如下为spring-rocketmq的实现。业务开发只需要实现RocketMQListener即可,见消费者示例。

org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.DefaultMessageListenerConcurrently
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
     for (MessageExt messageExt : msgs) {
         log.debug("received msg: {}", messageExt);
         try {
             long now = System.currentTimeMillis();
             handleMessage(messageExt);
             long costTime = System.currentTimeMillis() - now;
             log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
         } catch (Exception e) {
             log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
             context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
         }
     }

     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }
private void handleMessage(MessageExt messageExt)  {
     if (rocketMQListener != null) {
         rocketMQListener.onMessage(doConvertMessage(messageExt));
     }
}

根据业务接口返回的状态来决定下一步如何处理

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult
// 根据状态,设置ackIndex
switch (status) {
   case CONSUME_SUCCESS:
       if (ackIndex >= consumeRequest.getMsgs().size()) {
           ackIndex = consumeRequest.getMsgs().size() - 1;
       }
       int ok = ackIndex + 1;
       int failed = consumeRequest.getMsgs().size() - ok;
       this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
       this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
       break;
   case RECONSUME_LATER:
       ackIndex = -1;
       this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
           consumeRequest.getMsgs().size());
       break;
   default:
       break;
}

switch (this.defaultMQPushConsumer.getMessageModel()) {
   case BROADCASTING:
       for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
           MessageExt msg = consumeRequest.getMsgs().get(i);
           // 广播模式仅输出日志
           log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
       }
       break;
   case CLUSTERING:
       // 集群模式下,只要有一条消息消费失败,则遍历本次消费的所有消息,发回broker
       // consumeRequest.getMsgs().size() 由ConsumeMessageBatchMaxSize决定,默认为1
       List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
       // ackidx = -1, i从0开始,遍历所有消息
       for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
           MessageExt msg = consumeRequest.getMsgs().get(i);
           boolean result = this.sendMessageBack(msg, context);
           if (!result) {
               msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
               msgBackFailed.add(msg);
           }
       }

       if (!msgBackFailed.isEmpty()) {
           consumeRequest.getMsgs().removeAll(msgBackFailed);
           this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
       }
       break;
   default:
       break;
}

sendMsgBack 发回broker
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
    int delayLevel = context.getDelayLevelWhenNextConsume();
    // Wrap topic with namespace before sending back message.
    msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
    try {
        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
        return true;
    } catch (Exception e) {
        log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg, e);
    }

    return false;
}

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#sendMessageBack

 public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName){
    try {
        String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
            : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
        this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg,
            this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
    } catch (Exception e) {
        log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);

        Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

        String originMsgId = MessageAccessor.getOriginMessageId(msg);
        MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

        newMsg.setFlag(msg.getFlag());
        MessageAccessor.setProperties(newMsg, msg.getProperties());
        MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
        MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
        MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
        MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
        newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

        this.mQClientFactory.getDefaultMQProducer().send(newMsg);
    } finally {
        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
    }
}

org.apache.rocketmq.client.impl.MQClientAPIImpl#consumerSendMessageBack

ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);

requestHeader.setGroup(consumerGroup);
requestHeader.setOriginTopic(msg.getTopic());
requestHeader.setOffset(msg.getCommitLogOffset());
requestHeader.setDelayLevel(delayLevel);
requestHeader.setOriginMsgId(msg.getMsgId());
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
requestHeader.setBname(brokerName);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
    request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
    case ResponseCode.SUCCESS: {
        return;
    }
    default:
        break;
}

4.9.x 最终还是通过netty发送到broker的
org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync

5.x 通过grpc通信,取代了remoting的通信方式;通过proxy访问,而不是直接链接到broker;

broker端schedule处理
private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx, RemotingCommand request) t {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final ConsumerSendMsgBackRequestHeader requestHeader =
          (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
    ……
    // 读写权限校验等;
    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
        return CompletableFuture.completedFuture(response);
    }
    if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return CompletableFuture.completedFuture(response);
    }

    // 重试topic
    String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
    int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % subscriptionGroupConfig.getRetryQueueNums();
    int topicSysFlag = 0;
    if (requestHeader.isUnitMode()) {
        topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
    }

    // 创建重试topic,并设置读写权限
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
        newTopic,
        subscriptionGroupConfig.getRetryQueueNums(),
        PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
    if (null == topicConfig) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("topic[" + newTopic + "] not exist");
        return CompletableFuture.completedFuture(response);
    }

    if (!PermName.isWriteable(topicConfig.getPerm())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
        return CompletableFuture.completedFuture(response);
    }
    MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
    if (null == msgExt) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("look message by offset failed, " + requestHeader.getOffset());
        return CompletableFuture.completedFuture(response);
    }

    final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
    if (null == retryTopic) {
        MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
    }
    msgExt.setWaitStoreMsgOK(false);

    int delayLevel = requestHeader.getDelayLevel();

    int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
    if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
        Integer times = requestHeader.getMaxReconsumeTimes();
        if (times != null) {
            maxReconsumeTimes = times;
        }
    }

    // 根据重试次数来判断是发送到死信队列或重试队列
    if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
        || delayLevel < 0) {
        newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
        queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;

        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                DLQ_NUMS_PER_GROUP,
                PermName.PERM_WRITE | PermName.PERM_READ, 0);

        if (null == topicConfig) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("topic[" + newTopic + "] not exist");
            return CompletableFuture.completedFuture(response);
        }
        msgExt.setDelayTimeLevel(0);
    } else {
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        msgExt.setDelayTimeLevel(delayLevel);
    }

    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(newTopic);
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

    msgInner.setQueueId(queueIdInt);
    msgInner.setSysFlag(msgExt.getSysFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(msgExt.getStoreHost());
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

    String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
    MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

    // 消息持久化
    CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    return putMessageResult.thenApply(r -> {
        if (r != null) {
            switch (r.getPutMessageStatus()) {
                case PUT_OK:
                    String backTopic = msgExt.getTopic();
                    String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                    if (correctTopic != null) {
                        backTopic = correctTopic;
                    }
                    if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(msgInner.getTopic())) {
                        this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
                        this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), r.getAppendMessageResult().getWroteBytes());
                        this.brokerController.getBrokerStatsManager().incQueuePutNums(msgInner.getTopic(), msgInner.getQueueId());
                        this.brokerController.getBrokerStatsManager().incQueuePutSize(msgInner.getTopic(), msgInner.getQueueId(), r.getAppendMessageResult().getWroteBytes());
                    }
                    this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
                    response.setCode(ResponseCode.SUCCESS);
                    response.setRemark(null);
                    return response;
                default:
                    break;
            }
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(r.getPutMessageStatus().name());
            return response;
        }
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("putMessageResult is null");
        return response;
    });
}

org.apache.rocketmq.store.MessageStore#asyncPutMessage
org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage
org.apache.rocketmq.store.CommitLog#asyncPutMessage

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
//        int queueId msg.getQueueId();
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // 持久化到SCHEDULE_TOPIC_XXXX 中,queueId 根据delayLevel转换
            // org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
            topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
            int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // 将真实的topic、queueId 保存到property中
            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
    if (bornSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setBornHostV6Flag();
    }

    InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
    if (storeSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setStoreHostAddressV6Flag();
    }

    PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
    updateMaxMessageSize(putMessageThreadLocal);
    if (!multiDispatch.isMultiDispatchMsg(msg)) {
        PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
        if (encodeResult != null) {
            return CompletableFuture.completedFuture(encodeResult);
        }
        msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
    }
    PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));

    long elapsedTimeInLock = 0;
    MappedFile unlockMappedFile = null;

    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    try {
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        msg.setStoreTimestamp(beginLockTimestamp);

        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
        }
        // 将消息持久化
        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // XXX: warn and notify me
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                }
                result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
            case UNKNOWN_ERROR:
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            default:
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
        }

        elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
    } finally {
        beginTimeInLock = 0;
        putMessageLock.unlock();
    }

    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }
    ……
}

消费commitLog中的消息

org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup

public void executeOnTimeup() {
    // comsumeQueue 不存在则创建
    ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));

    if (cq == null) {
        this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
        return;
    }

    SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
    if (bufferCQ == null) {
        long resetOffset;
        if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
            log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
                this.offset, resetOffset, cq.getQueueId());
        } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
            log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
                this.offset, resetOffset, cq.getQueueId());
        } else {
            resetOffset = this.offset;
        }

        this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
        return;
    }

    long nextOffset = this.offset;
    try {
        int i = 0;
        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
        for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
            long offsetPy = bufferCQ.getByteBuffer().getLong();
            int sizePy = bufferCQ.getByteBuffer().getInt();
            long tagsCode = bufferCQ.getByteBuffer().getLong();

            if (cq.isExtAddr(tagsCode)) {
                if (cq.getExt(tagsCode, cqExtUnit)) {
                    tagsCode = cqExtUnit.getTagsCode();
                } else {
                    //can't find ext content.So re compute tags code.
                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                        tagsCode, offsetPy, sizePy);
                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                }
            }

            long now = System.currentTimeMillis();
            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

            long countdown = deliverTimestamp - now;
            if (countdown > 0) {
                this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                return;
            }

			// 根据offset查找消息
            MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
            if (msgExt == null) {
                continue;
            }

            MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
            if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                    msgInner.getTopic(), msgInner);
                continue;
            }

            boolean deliverSuc;
            // 投递消息到重试队列
            if (ScheduleMessageService.this.enableAsyncDeliver) {
                deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
            } else {
                deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
            }

            if (!deliverSuc) {
                this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                return;
            }
        }

        nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
    } catch (Exception e) {
        log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
    } finally {
        bufferCQ.release();
    }

    this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}

private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
    int sizePy) {
    PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false);
    PutMessageResult result = resultProcess.get();
    boolean sendStatus = result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK;
    if (sendStatus) {
        ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset());
    }
    return sendStatus;
}

// messageStore 异步持久化消息-->commitLog-->mappedFile
private PutResultProcess deliverMessage(MessageExtBrokerInner msgInner, String msgId, long offset,
    long offsetPy, int sizePy, boolean autoResend) {
    CompletableFuture<PutMessageResult> future =
        ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner);
    return new PutResultProcess()
        .setTopic(msgInner.getTopic())
        .setDelayLevel(this.delayLevel)
        .setOffset(offset)
        .setPhysicOffset(offsetPy)
        .setPhysicSize(sizePy)
        .setMsgId(msgId)
        .setAutoResend(autoResend)
        .setFuture(future)
        .thenProcess();
}
订阅重试队列

消息经过指定延迟时间被投递到延迟队列后,就可以被消费者消费了。

org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#copySubscription

switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        break;
    // 集群模式才会去订阅重试队列
    case CLUSTERING:
        final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
        SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
        this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
        break;
    default:
        break;
}
消息的拉取
topic的重新设置
public void resetRetryAndNamespace(final List<MessageExt> msgs, String consumerGroup) {
    final String groupTopic = MixAll.getRetryTopic(consumerGroup);
    for (MessageExt msg : msgs) {
        String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
        if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
            msg.setTopic(retryTopic);
        }

        if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
        }
    }
}

重新处理业务逻辑
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage

总结

重试的主要流程:
1、consumer消费失败,将消息发送回broker;
2、broker收到重试消息之后,先存储到定时队列里;
3、根据重试次数,经过一定延迟时间后,重新投递到retryTopic;
4、consumer会拉取consumerGroup对应的retryTopic的消息;
5、consumer拉取到retryTopic消息之后,转换为原始的topic,由messageListener实现类消费。

QA

1、为什么不是发送到原队列?
因为可能有多个消费者组消费了同一条消息,而只有某一组报错了。发到原队列,则可能会导致其它消费者组重复订阅。

2、重试队列数量,即consumerGroup数量在哪些方面可能产生影响?
a)、broker向nameserver注册时,会注册topic信息,而consumerGroup会增加重试topic;
b)、每个并发消费的consumer都会启动指定核心线程数量的线程池;
c)、订阅关系一致;
d)、客户端重平衡;

kafka重试

kafka原生支持程度

kafka本身并不支持重试、延迟、死信队列等特性。
spring-kafka 封装了重试队列,每个topic都会产生对应的重试topic。而kafka topic数量如果过多,则会影响到kafka的吞吐量。

自定义实现重试队列逻辑

1、实现KafkaListenerErrorHandler,重写handleError方法,在消费失败后为消息header增加retryCount,记录失败的组id,并根据retryCount来判断发送到retry topic 或 dead topic;

@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception,
                          Consumer<?, ?> consumer) {
    log.warn("消费消息失败,将进行重试或进入死信", exception);
    //重试次数
    ConsumerRecord record = (ConsumerRecord) message.getPayload();
    //读取自定义的请求头;如果有,读取重试过的次数;
    Header retryHeader = record.headers().lastHeader("__retry_dead_head__");
    Integer retryTimes = 0;
    if (retryHeader != null && retryHeader.value() != null) {
        retryTimes = JsonUtils.toData(stringDeserializer.deserialize(null, retryHeader.value()), Map.class).get(RETRY_KEY);;
    }

    Headers headers = record.headers();
    String groupId = consumer.groupMetadata().groupId();
    Map<String, Object> custData = new HashMap<>();
    // 处理自定义请求头,比如原topic、groupId等等;
    assembleCustData(custData );
    if (hasRetry >= maxRetryCount) {
        message2Dead(record, custData, headers);
    } else {
        // 未超过重试次数,仍发送到重试队列里
        message2Retry(record, hasRetry, custData, headers);
    }
    //提交消费点
    consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)));
    return null;
}

private void message2Retry(ConsumerRecord record, Integer retryTimes, Map<String, Object> custData, Headers headers) {
    custData.put(RETRY_KEY, hasRetry + 1);
    try {
    	// 定义一个统一的retry topic
        kafkaTemplate.send(new ProducerRecord<>(RETRY_TOPIC, null, null, record.key(), record.value(), buildNewHeaders(headers, custData))).get();

    } catch (Exception ex) {
        throw new BusinessException("发送到死信队列异常", ex);
    }
}

1.1、在消费者注入retryHandler

@Bean
public KafkaListenerErrorHandler kafkaListenerErrorHandler() {
    Map<String, Object> produceConfig = kafkaProperties.buildProducerProperties();
    produceConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    ProducerFactory producerFactory = new DefaultKafkaProducerFactory(produceConfig);
    KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFactory);
    RetryListenerErrorHandler kafkaListenerErrorHandler = new RetryListenerErrorHandler(kafkaTemplate, appName);
    return kafkaListenerErrorHandler;
}

2、监听 RETRY_TOPIC

@KafkaListener(topics = {RETRY_TOPIC})
public void retryDeal(ConsumerRecord<byte[], byte[]> record, Acknowledgment ack) throws ExecutionException, InterruptedException, UnsupportedEncodingException {
    Header customerHead = record.headers().lastHeader(RetryListenerErrorHandler.CUSTOMER_HEAD_NAME);
    Long timestamp = record.timestamp();
    if (System.currentTimeMillis() < timestamp + RETRY_DELAY_DURATION.toMillis()) {
        try {
            Thread.sleep((timestamp + RETRY_DELAY_DURATION.toMillis()) - System.currentTimeMillis());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    Map<String, Object> custData = JsonUtils.toData(new StringDeserializer().deserialize(null, customerHead.value()), Map.class);
    String topic = (String) custData.get(RetryListenerErrorHandler.CONSUMER_TOPIC);
    Integer partition = (Integer) custData.get(RetryListenerErrorHandler.PARTITION_KEY);
    // 在达到延迟时间后,发送到原队列原partition
    kafkaTemplate.send(new ProducerRecord<>(topic, partition, null, record.key(), record.value(), record.headers())).get();
    ack.acknowledge();
}

3、实现org.apache.kafka.clients.consumer.ConsumerInterceptor,重写onConsume方法

@Override
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {

    KafkaUtils.getConsumerGroupId();

    Map<TopicPartition, List<ConsumerRecord<K, V>>> resultRecordMap = new HashMap<>(records.count());
    records.partitions().forEach(p->{
        List<ConsumerRecord<K,V>> recordList = records.records(p);
        // 根据自定义过滤逻辑(如groupId)来判断是否消费该消息
        List<ConsumerRecord<K, V>> filteredRecords = recordList.stream().filter(this::predict).collect(Collectors.toList());
        if (!filteredRecords.isEmpty()) {
            resultRecordMap.put(p, filteredRecords);
        }
    });
    return new ConsumerRecords<>(resultRecordMap);
}

参考

https://rocketmq.apache.org/zh/docs/4.x/consumer/02push
https://rocketmq.apache.org/zh/docs/featureBehavior/10consumerretrypolicy

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2061694.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

人格凭证(PHC):一种鉴别AI防伪保护隐私的真实身份验证技术

人格凭证&#xff08;PHC&#xff09;&#xff1a;一种鉴别AI防伪保护隐私的真实身份验证技术 引言 随着人工智能&#xff08;AI&#xff09;技术的飞速发展&#xff0c;网络空间中的身份验证问题日益凸显。AI不仅能模仿人类行为&#xff0c;还能创建虚假账户、发布误导性信息…

秒懂Linux之缓冲区

目录 一.何为缓冲区 二. 缓冲区在哪 三. 模拟编码 一.何为缓冲区 缓冲区说白了就是一块内存区域&#xff0c;目的是为了提高使用者的效率以及减少C语言接口的使用频率~ 下面我们用一则小故事来类比出缓冲区的功能~ 张三为了给朋友李四庆祝生日快乐准备了份生日礼物~张三难道…

开源原型设计工具Penpot

Penpot是一个现代化、开源的协同设计平台&#xff0c;专为跨职能团队打造&#xff0c;提供了强大的在线设计和原型制作功能。 以下是对Penpot的详细介绍&#xff1a; 一、平台特点 开源与免费&#xff1a;Penpot是一个完全免费且开放源代码的项目&#xff0c;允许社区贡献和定…

Redis补充

Redis事务 Redis事务的概念 Redis 事务的本质是一组命令的集合。事务支持一次执行多个命令&#xff0c;一个事务中所有命令都会被序列化。在事务执行过程&#xff0c;会按照顺序串行化执行队列中的命令&#xff0c;其他客户端提交的命令请求不会插入到事务执行命令序列中。 …

JAVA多线程等待唤醒机制

为什么要处理线程间通信&#xff1a; 当我们需要多个线程来共同完成一件任务&#xff0c;并且我们希望他们有规律的执行&#xff0c;那么多线程之间需要一些通信机制&#xff0c;可以协调它们的工作&#xff0c;以此实现多线程共同操作一份数据。 比如&#xff1a;线程A用来生…

Java | Leetcode Java题解之第357题统计各位数字都不同的数字个数

题目&#xff1a; 题解&#xff1a; class Solution {public int countNumbersWithUniqueDigits(int n) {if (n 0) {return 1;}if (n 1) {return 10;}int res 10, cur 9;for (int i 0; i < n - 1; i) {cur * 9 - i;res cur;}return res;} }

4-1-5 步进电机原理2(电机专项教程)

4-1-5 步进电机原理2&#xff08;电机专项教程&#xff09; 4-1-5 步进电机原理2永磁式步进电机反应式步进电机混合式步进电机混合式步进电机基本原理 4-1-5 步进电机原理2 新的步进电机分类 永磁式步进电机 目前学习的转子都是永磁铁 反应式步进电机 软磁材料易受到周围磁场…

阿里云魏子珺:阿里云Elasticsearch AI 搜索实践

作者&#xff1a;阿里云魏子珺 【AI搜索 TechDay】是 Elastic 和阿里云联合主办的 AI 技术 Meetup 系列&#xff0c;聚焦企业级 AI 搜索应用和开发者动手实践&#xff0c;旨在帮助开发者在大模型浪潮下升级 AI 搜索&#xff0c;助力业务增长。 阿里云 Elasticsearch 的 AI 搜索…

Nginx笔记(高级)

扩容 通过扩容提升整体吞吐量 单机垂直扩容&#xff1a;硬件资源增加 云服务资源增加 整机&#xff1a;IBM、浪潮、DELL、HP等CPU/主板&#xff1a;更新到主流网卡&#xff1a;10G/40G网卡磁盘&#xff1a;SAS(SCSI) HDD&#xff08;机械&#xff09;、HHD&#xff08;混合&…

OpenCV几何图像变换(5)旋转和缩放计算函数getRotationMatrix2D()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 计算二维旋转的仿射矩阵。 该函数计算以下矩阵&#xff1a; [ α β ( 1 − α ) ⋅ center.x − β ⋅ center.y − β α β ⋅ center.x ( …

Linux 中断处理与内核线程化——以触摸屏中断为例

文章目录 1 什么是中断&#xff1f;2 传统的中断处理模型3 内核线程与用户进程4 中断线程化的理念5 devm_request_threaded_irq 与 request_irq 的比较6 触摸屏驱动中的中断线程化参考链接封面 本文探讨了 Linux 中断处理的传统模型与中断线程化的理念&#xff0c;以及在触摸屏…

【Python】计算直角三角形的 ∠MBC

有一个直角三角形 ABC&#xff0c;其中角 B 是直角&#xff08;90&#xff09;。点 M 是斜边 AC 的中点。我们需要根据边 AB 和 BC 的长度来计算角 ∠MBC。 在直角三角形中&#xff0c;如果一个角是直角&#xff0c;那么另外两个角的和是90。由于 M 是斜边的中点&#xff0c;根…

turtle画图知识

Turtle库是Python编程语言中的一个库&#xff0c;用于创建各种类型的图形&#xff0c;包括简单圆形、线条、路径和图片。它支持多种图形类型&#xff0c;并且可以绘制出各种复杂的形状。 以下是一些基本的使用方法&#xff1a; 1. 创建一个新的Turtle对象&#xff1a; pytho…

hyperf 协程作用和相关的方法

什么是协程 协程是一种轻量级的线程&#xff0c;由用户代码来调度和管理&#xff0c;而不是由操作系统内核来进行调度&#xff0c;也就是在用户态进行 判断当前是否处于协程环境内 在一些情况下我们希望判断一些当前是否运行于协程环境内&#xff0c; 对于一些兼容协程环境与…

RK3568平台(PWM篇)PWM驱动

一.PWM基础知识 PWM 全称为 Pulse Width Modulation&#xff0c;翻译成中文为脉冲宽度调制&#xff0c;它是一种数字信号控 制模拟电路的技术&#xff0c;可以通过改变高/低电平的占空比来控制平均电压或功率,从而达到对模拟 量的控制目的。 周期(T)&#xff1a;指一个完整的…

Vue条件判断:v-if、v-else、v-else-if、v-show 指令

在程序设计中&#xff0c;条件判断是必不可少的技术。在视图中&#xff0c;经常需要通过条件判断来控制 DOM 的显示状态。Vue.js 提供了相应的指令用于实现条件判断&#xff0c;包括&#xff1a;v-if、v-else、v-else-if、v-show 指令。 1、v-if 指令 v-if 指令可以根据表达式…

机器学习 之 线性回归算法

目录 线性回归&#xff1a;理解与应用 什么是线性回归&#xff1f; 一元线性回归 正态分布的重要性 多元线性回归 实例讲解 数据准备 数据分析 构建模型 训练模型 验证模型 应用模型 代码实现 线性回归&#xff1a;理解与应用 线性回归是一种广泛使用的统计方法&…

企业高性能web服务器,原理及实例

一、Web 服务基础介绍 正常情况下的单次web服务访问流程&#xff1a; 1.1 Web 服务介绍 1993年3月2日&#xff0c;中国科学院高能物理研究所租用AT&T公司的国际卫星信道建立的接入美国SLAC国家实 验室的64K专线正式开通&#xff0c;成为我国连入Internet的第一根专线。 1…

Mycat分片-垂直拆分

目录 场景 配置 测试 全局表配置 续接上篇&#xff1a;MySQ分库分表与MyCat安装配置-CSDN博客 续接下篇&#xff1a;Mycat分片-水平拆分-CSDN博客 场景 在业务系统中, 涉及以下表结构 ,但是由于用户与订单每天都会产生大量的数据, 单台服务器的数据 存储及处理能力是有限…

0x01 GlassFish 任意文件读取漏洞复现

参考文章&#xff1a; 应用服务器glassfish任意文件读取漏洞 - SecPulse.COM | 安全脉搏 fofa 搜索使用该服务器的网站 网络空间测绘&#xff0c;网络空间安全搜索引擎&#xff0c;网络空间搜索引擎&#xff0c;安全态势感知 - FOFA网络空间测绘系统 "glassfish"&…