[RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十)

news2025/1/10 11:47:41

asyncSendMessage方法用来处理来自producer发送的消息。

在这里插入图片描述

文章目录

      • 1.asyncSendMessage异步处理单条消息
      • 2.preSend准备响应命令对象
        • 2.1 msgCheck检查并自动创建topic
          • 2.1.1 createTopicInSendMessageMethod创建普通topic
          • 2.1.2 createTopicInSendMessageBackMethod创建重试topc
          • 2.1.3 autoCreateTopicEnable自动创建topic的问题
      • 3.handlePutMessageResultFuture处理消息存放结果

1.asyncSendMessage异步处理单条消息

  1. 调用preSend方法创建响应的命令对象, 包括自动创建topic的逻辑, 随后创建响应头对象。
  2. 随后创建MessageExtBrokerInner对象, 从请求中获取消息的属性并设置到对象属性中, 消息体, topic。
  3. 判断如果是重试或者死信消息, 调用handleRetryAndDLQ方法处理重试和死信队列消息, 如果已重试次数大于最大重试次数, 那么替换topic为死信队列topic, 消息会被发送至死信队列。
  4. 判断如果是事务准备消息, 并且不会拒绝处理事务消息, 调用asyncPrepareMessage方法以异步的方式处理存储事务准备消息。
  5. 如果是普通消息, 调用asyncPutMessage方法处理, 存储普通消息。asyncPutMessage以异步方式将消息存储到存储器中, 处理器可以处理下一个请求而不是等待结, 当结果完成时, 以异步方式通知客户端。
  6. 调用handlePutMessageResultFuture方法处理消息存储的处理结果。

在这里插入图片描述

/**
 * SendMessageProcessor的方法
 * <p>
 * 处理单条消息
 */
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                            SendMessageContext mqtraceContext,
                                                            SendMessageRequestHeader requestHeader) {
    /*
     * 1 创建响应的命令对象,包括自动创建topic的逻辑
     */
    final RemotingCommand response = preSend(ctx, request, requestHeader);
    //获取响应头
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();

    if (response.getCode() != -1) {
        return CompletableFuture.completedFuture(response);
    }
    //获取消息体
    final byte[] body = request.getBody();
    //获取队列id
    int queueIdInt = requestHeader.getQueueId();
    //从broker的topicConfigTable缓存中根据topicName获取TopicConfig
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    //如果队列id小于0,则随机选择一个写队列索引作为id
    if (queueIdInt < 0) {
        queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
    }
    //构建消息对象,保存着要存入commitLog的数据
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    //设置topic
    msgInner.setTopic(requestHeader.getTopic());
    //设置队列id
    msgInner.setQueueId(queueIdInt);
    /*
     * 2 处理重试和死信队列消息,将会对死信消息替换为死信topic
     */
    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
        return CompletableFuture.completedFuture(response);
    }
    /*
     * 设置一系列属性
     */
    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    //设置到properties属性中
    MessageAccessor.setProperties(msgInner, origProps);
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
    //WAIT属性表示 消息发送时是否等消息存储完成后再返回
    if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {
        // There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message.
        // It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it.
        //不需要存储"WAIT=true"属性,从propertiesString中移除它,为每个消息节省9个字节。
        String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
        //将没有WAIT属性的origProps存入msgInner的propertiesString属性
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        // Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later
        //将WAIT属性重新存入origProps集合中,因为msgInner.isWaitStoreMsgOK()稍后将被调用
        origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
    } else {
        //将没有WAIT属性的origProps存入msgInner的propertiesString属性
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    }

    CompletableFuture<PutMessageResult> putMessageResult = null;
    /*
     * 处理事务消息逻辑
     */
    //TRAN_MSG属性值为true,表示为事务消息
    String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    //处理事务消息
    if (transFlag != null && Boolean.parseBoolean(transFlag)) {
        //判断是否需要拒绝事务消息,如果需要拒绝,则返回NO_PERMISSION异常
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                            + "] sending transaction message is forbidden");
            return CompletableFuture.completedFuture(response);
        }
        //调用asyncPrepareMessage方法以异步的方式处理、存储事务准备消息,底层仍是asyncPutMessage方法
        putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
    } else {
        //不是事务消息,那么调用asyncPutMessage方法处理,存储消息
        //以异步方式将消息存储到存储器中,处理器可以处理下一个请求而不是等待结果,当结果完成时,以异步方式通知客户端
        putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    }
    //处理消息存放的结果
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}

2.preSend准备响应命令对象

  • 创建响应的命令对象, 包括topic的校验和自动创建topic的逻辑。
  1. 创建RemotingCommand对象, 设置唯一请求id。
  2. 校验 当前时间是否小于broker的起始服务时间, 如果小于, 返回SYSTEM_ERROR, 表示不可用提供服务。
  3. msgCheck方法进行一系列的校验, 包括topic的自动创建逻辑。

在这里插入图片描述

/**
 * SendMessageProcessor的方法
 * <p>
 * 准备响应数据
 */
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,
                                SendMessageRequestHeader requestHeader) {
    //创建响应命令对象
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    //设置唯一id为请求id
    response.setOpaque(request.getOpaque());
    //添加扩展字段属性"MSG_REGION"、"TRACE_ON"
    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

    log.debug("Receive SendMessage request command {}", request);
    //获取配置的broker的处理请求的起始服务时间,默认为0
    final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    //如果当前时间小于起始时间,那么broker会返回一个SYSTEM_ERROR,表示现在broker还不能提供服务
    if (this.brokerController.getMessageStore().now() < startTimestamp) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));
        return response;
    }
    //设置code为-1
    response.setCode(-1);
    /*
     * 消息校验,包括自动创建topic的逻辑
     */
    super.msgCheck(ctx, requestHeader, response);
    if (response.getCode() != -1) {
        return response;
    }

    return response;
}

2.1 msgCheck检查并自动创建topic

  1. 校验如果当前broker有没有写的权限, 如果没有, 则broker会返回一个NO_PERMISSION异常。
    在这里插入图片描述

  2. 校验topic不能为空, 必须属于合法字符, 且长度不超过127字符。
    在这里插入图片描述在这里插入图片描述

  3. 校验如果当前topic是不为允许使用的系统topic, 抛出异常。
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

  4. 从broker的topicConfigTable缓存中根据topicName获取TopicConfig。
    在这里插入图片描述

    1. 如果不存在该topic信息, 调用createTopicInSendMessageMethod创建topic, 失败了, 判断是否重试, 如果是重试创建topic, 如果创建还是失败的话, 返回TOPIC_NOT_EXIST异常信息。
  5. 如果找到或者创建了topic, 校验queutId 不能大于等于该broker的读或写的最大queueId。
    在这里插入图片描述

/**
 * AbstractSendMessageProcessor的方法
 * <p>
 * 消息校验,包括自动创建topic的逻辑
 */
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
                                   final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
    //如果当前broker没有写的权限,那么broker会返回一个NO_PERMISSION异常,sending message is forbidden,禁止向该broker发送消息
    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
            && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending message is forbidden");
        return response;
    }
    //校验topic不能为空,必须属于合法字符regex: ^[%|a-zA-Z0-9_-]+$,且长度不超过127个字符
    if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {
        return response;
    }
    //校验如果当前topic是不为允许使用的系统topic,那么抛出异常,默认不能为SCHEDULE_TOPIC_XXXX
    if (TopicValidator.isNotAllowedSendTopic(requestHeader.getTopic(), response)) {
        return response;
    }
    //从broker的topicConfigTable缓存中根据topicName获取TopicConfig
    TopicConfig topicConfig =
            this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    //如果不存在该topic信息
    if (null == topicConfig) {
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            } else {
                topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
            }
        }

        log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
        /*
         * 尝试创建普通topic
         */
        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
                requestHeader.getTopic(),
                requestHeader.getDefaultTopic(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
        /*
         * 尝试创建重试topic
         */
        if (null == topicConfig) {
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicConfig =
                        this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                                requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
                                topicSysFlag);
            }
        }

        if (null == topicConfig) {
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
                    + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
            return response;
        }
    }
    //校验queutId 不能大于等于该broker的读或写的最大数量
    int queueIdInt = requestHeader.getQueueId();
    int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
    if (queueIdInt >= idValid) {
        String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
                queueIdInt,
                topicConfig.toString(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

        log.warn(errorInfo);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorInfo);

        return response;
    }
    return response;
}
2.1.1 createTopicInSendMessageMethod创建普通topic

在这里插入图片描述

  • 创建一个新的topic
  1. 获取锁防止并发创建相同的topic, 获取锁之后, 再次尝试从topicConfigTable获取topic信息, 如果获取到了, 直接返回, 如果没有, 创建topic。
    在这里插入图片描述

  2. 获取默认topic的信息, 作为创建新topic的模板, 默认topic实际上就是TBW102, 其有8个读写队列, 权限为读写并且可继承,即7。
    在这里插入图片描述

  3. 如果默认topic就是TBW102, 并且如果broker配置不支持自动创建topic, 即autoCreateTopicEnable为false, 设置权限为可读写, 不可继承。
    在这里插入图片描述
    在这里插入图片描述

  4. 如果默认topic配置的权限包括可继承, 从默认topic继承属性创建新topic。创建一个TopicConfig, 选择默认队列数量与默认topic写队列中数量小的值作为新topic的读写队列数量, 默认为4, 设置权限, 去除可继承权限。
    在这里插入图片描述

  5. 如果topic不是null, 则表示创建了topic, 将新的topic信息存入topicConfigTable缓存中, 生成下一个数据版本。标识位置为true, 调用persist方法将topic配置持久化到配置文件 {user.home}/store/config/topics.json中。
    在这里插入图片描述

  6. 解锁, 判断如果创建了新topic, 调用registerBrokerAll方法向nameServer注册当前broker的新配置路由信息。
    在这里插入图片描述

  /**
   * TopicConfigManager的方法
   * <p>
   * 创建普通topic,并持久化至配置文件 {user.home}/store/config/topics.json中
   *
   * @param topic                       待创建topic
   * @param defaultTopic                默认topic,用于作为模板创建新topic
   * @param remoteAddress               远程地址
   * @param clientDefaultTopicQueueNums 自动创建服务器不存在的topic时,默认创建的队列数,默认为4
*                                    可通过生产者DefaultMQProducer的defaultTopicQueueNums属性进行配置
   * @param topicSysFlag                topic标识
   * @return topic配置
   */
  public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
                                                    final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
      TopicConfig topicConfig = null;
      boolean createNew = false;

      try {
          //需要加锁防止并发创建相同的topic
          if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
              try {
                  //再次尝试从topicConfigTable获取topic信息,如果获取到了,那么直接返回
                  topicConfig = this.topicConfigTable.get(topic);
                  if (topicConfig != null)
                      return topicConfig;
                  //获取默认topic的信息,用于作为模板创建新topic,默认的默认topic实际上就是TBW102,其有8个读写队列,权限为读写并且可继承,即7
                  TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
                  if (defaultTopicConfig != null) {
                   //如果默认topic就是TBW102
                      if (defaultTopic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                       //如果broker配置不支持自动创建topic,那么设置权限为可读写,不可继承,即6
                          if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
                              defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
                          }
                      }
                      //如果默认topic配置的权限包括可继承,那么从默认topic继承属性
                      if (PermName.isInherited(defaultTopicConfig.getPerm())) {
                       //创建topic配置
                          topicConfig = new TopicConfig(topic);
                          //选择默认队列数量与默认topic写队列数中最小的值作为新topic的读写队列数量,默认为4
                          int queueNums = Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums());

                          if (queueNums < 0) {
                              queueNums = 0;
                          }

                          topicConfig.setReadQueueNums(queueNums);
                          topicConfig.setWriteQueueNums(queueNums);
                          //权限
                          int perm = defaultTopicConfig.getPerm();
                          //去掉可继承权限
                          perm &= ~PermName.PERM_INHERIT;
                          topicConfig.setPerm(perm);
                          topicConfig.setTopicSysFlag(topicSysFlag);
                          topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
                      } else {
                          log.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]",
                                  defaultTopic, defaultTopicConfig.getPerm(), remoteAddress);
                      }
                  } else {
                      log.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]",
                              defaultTopic, remoteAddress);
                  }
                  //如果topic不为null,说明创建了新topic
                  if (topicConfig != null) {
                      log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]",
                              defaultTopic, topicConfig, remoteAddress);
                      //将新的topic信息存入topicConfigTable缓存中
                      this.topicConfigTable.put(topic, topicConfig);
                      //生成下一个数据版本
                      this.dataVersion.nextVersion();
                      //标识位置为true
                      createNew = true;
                      /*
                       * 将topic配置持久化到配置文件 {user.home}/store/config/topics.json中
                       */
                      this.persist();
                  }
              } finally {
               //解锁
                  this.topicConfigTableLock.unlock();
              }
          }
      } catch (InterruptedException e) {
          log.error("createTopicInSendMessageMethod exception", e);
      }
      //如果创建了新topic,那么马上向nameServer注册当前broker的新配置路由信息
      if (createNew) {
          this.brokerController.registerBrokerAll(false, true, true);
      }

      return topicConfig;
  }
2.1.2 createTopicInSendMessageBackMethod创建重试topc
  • 自动创建重试topic, 源码和创建普通topic类似,. 不同的是重试topic不需要模板topic, 默认读写队列都是1, 权限为读写。
/**
 * TopicConfigManager的方法
 * <p>
 * 创建重试topic,并持久化至配置文件 {user.home}/store/config/topics.json中
 *
 * @param topic                       待创建topic
 * @param perm                        权限
 * @param clientDefaultTopicQueueNums 自动创建服务器不存在的topic时,默认创建的队列数,默认为4
 *                                    可通过生产者DefaultMQProducer的defaultTopicQueueNums属性进行配置
 * @param topicSysFlag                topic标识
 * @return topic配置
 */
public TopicConfig createTopicInSendMessageBackMethod(
        final String topic,
        final int clientDefaultTopicQueueNums,
        final int perm,
        final int topicSysFlag) {
    //尝试获取topic
    TopicConfig topicConfig = this.topicConfigTable.get(topic);
    //如果存在则直接返回
    if (topicConfig != null)
        return topicConfig;

    boolean createNew = false;

    try {
        //需要加锁防止并发创建相同的topic
        if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                //再次尝试从topicConfigTable获取topic信息,如果获取到了,那么直接返回
                topicConfig = this.topicConfigTable.get(topic);
                if (topicConfig != null)
                    return topicConfig;
                //创建topic
                topicConfig = new TopicConfig(topic);
                //重试topic的默认读写队列数量为1
                topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);
                topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);
                //重试topic的默认权限为读写
                topicConfig.setPerm(perm);
                topicConfig.setTopicSysFlag(topicSysFlag);

                log.info("create new topic {}", topicConfig);
                this.topicConfigTable.put(topic, topicConfig);
                createNew = true;
                //获取下一个版本
                this.dataVersion.nextVersion();
                //持久化broker信息
                this.persist();
            } finally {
                //解锁
                this.topicConfigTableLock.unlock();
            }
        }
    } catch (InterruptedException e) {
        log.error("createTopicInSendMessageBackMethod exception", e);
    }

    if (createNew) {
        //注册broker信息
        this.brokerController.registerBrokerAll(false, true, true);
    }

    return topicConfig;
}
2.1.3 autoCreateTopicEnable自动创建topic的问题

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  • Producer发送消息源码, 客户端发送信息前, 会选择一个topic所在的broker地址, 如果topic不存在, 那么会默认topic的路由信息中的一个broker发送。

  • 信息发送到broker后, 会发送没有指定的topic并且如果broker的autoCreateTopicEnable为true的话, 会走createTopicInSendMessageMethod方法, 会自动创建topic方法的最后马上调用registerBrokerAll方法向nameServer注册当前broker的新配置路由信息。

  • 生产者客户端会定时每30s从nameServer更新路由数据, 如果此时有其他的producer的存在, 并且刚好从nameServer获取到了这个新的topic的路由信息, 假设其他producer也需要向该topic发送信息, 由于发现topic路由信息已存在, 并且只存在于刚才那一个broker中, 此时这些producer都会将该topic的消息发送到这一个broker中来。

  • 接下来所有的Producer都只会向这一个Broker发送消息, 其他broker不再有机会创建新topic。本想topic在所有broker创建, 但是只有一个broker有topic信息, 违背了RocketMQ集群实现高可用的本事。

  • 所以RocketMQ官方建议生产环境下将broker的autoCreateTopicEnable设置为false, 闭自动创建topic, 改为手动在每个broker上创建。

3.handlePutMessageResultFuture处理消息存放结果

存放消息后, 调用handlePutMessageResult方法。

在这里插入图片描述

/**
 * SendMessageProcessor的方法
 * <p>
 * 处理消息存放结果
 *
 * @param putMessageResult   存放结果
 * @param response           响应对象
 * @param request            请求对象
 * @param msgInner           内部消息对象
 * @param responseHeader     响应头
 * @param sendMessageContext 发送消息上下文
 * @param ctx                连接上下文
 * @param queueIdInt         queueId
 * @return
 */
private CompletableFuture<RemotingCommand> handlePutMessageResultFuture(CompletableFuture<PutMessageResult> putMessageResult,
                                                                        RemotingCommand response,
                                                                        RemotingCommand request,
                                                                        MessageExt msgInner,
                                                                        SendMessageResponseHeader responseHeader,
                                                                        SendMessageContext sendMessageContext,
                                                                        ChannelHandlerContext ctx,
                                                                        int queueIdInt) {
    //阻塞,当从存放消息完毕时,执行后续的操作,即执行handlePutMessageResult方法
    return putMessageResult.thenApply((r) ->
            handlePutMessageResult(r, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt)
    );
}

handlePutMessageResult处理存放消息的结果, 响应写回给客户端。

/**
 * SendMessageProcessor的方法
 * <p>
 * 处理存放消息的结果
 *
 * @param putMessageResult   存放结果
 * @param response           响应对象
 * @param request            请求对象
 * @param msg                内部消息对象
 * @param responseHeader     响应头
 * @param sendMessageContext 发送消息上下文
 * @param ctx                连接上下文
 * @param queueIdInt         queueId
 * @return
 */
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
                                               RemotingCommand request, MessageExt msg,
                                               SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
                                               int queueIdInt) {
    //结果为null,那么直接返回系统异常
    if (putMessageResult == null) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store putMessage return null");
        return response;
    }
    boolean sendOK = false;
    //解析存放消息状态码,转换为对应的响应码
    switch (putMessageResult.getPutMessageStatus()) {
        // Success
        case PUT_OK:
            sendOK = true;
            response.setCode(ResponseCode.SUCCESS);
            break;
        case FLUSH_DISK_TIMEOUT:
            response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
            sendOK = true;
            break;
        case FLUSH_SLAVE_TIMEOUT:
            response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
            sendOK = true;
            break;
        case SLAVE_NOT_AVAILABLE:
            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
            sendOK = true;
            break;

        // Failed
        case CREATE_MAPEDFILE_FAILED:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("create mapped file failed, server is busy or broken.");
            break;
        case MESSAGE_ILLEGAL:
        case PROPERTIES_SIZE_EXCEEDED:
            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
            response.setRemark(
                    "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
            break;
        case SERVICE_NOT_AVAILABLE:
            response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
            response.setRemark(
                    "service not available now. It may be caused by one of the following reasons: " +
                            "the broker's disk is full [" + diskUtil() + "], messages are put to the slave, message store has been shut down, etc.");
            break;
        case OS_PAGECACHE_BUSY:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
            break;
        case LMQ_CONSUME_QUEUE_NUM_EXCEEDED:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("[LMQ_CONSUME_QUEUE_NUM_EXCEEDED]broker config enableLmq and enableMultiDispatch, lmq consumeQueue num exceed maxLmqConsumeQueueNum config num, default limit 2w.");
            break;
        case UNKNOWN_ERROR:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("UNKNOWN_ERROR");
            break;
        default:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("UNKNOWN_ERROR DEFAULT");
            break;
    }

    String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
    //如果发送成功
    if (sendOK) {
        //如果topic是SCHEDULE_TOPIC_XXXX,即延迟消息的topic
        if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(msg.getTopic())) {
            //增加统计计数
            this.brokerController.getBrokerStatsManager().incQueuePutNums(msg.getTopic(), msg.getQueueId(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
            this.brokerController.getBrokerStatsManager().incQueuePutSize(msg.getTopic(), msg.getQueueId(), putMessageResult.getAppendMessageResult().getWroteBytes());
        }
        //增加统计计数
        this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
        this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
                putMessageResult.getAppendMessageResult().getWroteBytes());
        this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());

        response.setRemark(null);
        //设置响应头中的migId,实际上就是broker生成的offsetMsgId属性
        responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
        //消息队列Id
        responseHeader.setQueueId(queueIdInt);
        //消息逻辑偏移量
        responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
        //如果不是单向请求,那么将响应写会客户端
        doResponse(ctx, request, response);
        //如果有发送消息的钩子,那么执行
        if (hasSendMessageHook()) {
            sendMessageContext.setMsgId(responseHeader.getMsgId());
            sendMessageContext.setQueueId(responseHeader.getQueueId());
            sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());

            int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
            int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
            int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;

            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
            sendMessageContext.setCommercialSendTimes(incValue);
            sendMessageContext.setCommercialSendSize(wroteSize);
            sendMessageContext.setCommercialOwner(owner);
        }
        return null;
    } else {
        //如果有发送消息的钩子,那么执行
        if (hasSendMessageHook()) {
            int wroteSize = request.getBody().length;
            int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);

            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
            sendMessageContext.setCommercialSendTimes(incValue);
            sendMessageContext.setCommercialSendSize(wroteSize);
            sendMessageContext.setCommercialOwner(owner);
        }
    }
    return response;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/704314.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

浅入浅出Java锁

前提 做分布式爬虫时&#xff0c;结合已有的架构&#xff0c;直接对某网站的详情页进行了爬取&#xff1b;尴尬的是&#xff0c;某网站需先采集列表页&#xff0c;之后才能采集详情页&#xff1b;这种防爬手段使用了用户行为监控&#xff0c;行为异常的访问直接就给屏蔽了。 对…

c++11 日期和时间工具-(std::chrono)

链接 std::chrono是C11引入的日期时间处理库&#xff0c;其中包含3种时钟&#xff1a; system_clock&#xff0c;steady_clock&#xff0c;high_resolution_clock。 定义于头文件 <chrono> std::chrono 库 system_clock steady_clock 链接 链接2 每一次调用time_…

解决go install github.com/mattn/goreman@latest安装报错

go install github.com/mattn/goremanlatest报错&#xff1a; [rootlocalhost ~]# go install github.com/mattn/goremanlatest go: github.com/mattn/goremanlatest: module github.com/mattn/goreman: Get "https://proxy.golang.org/github.com/mattn/goreman/v/list&…

现在都在说 Docker 好,为什么我一用就出现这么多问题?查了一宿才解决!

#配置国内源进行docker安装 报错 HTTP Error 404 - Not Found 原因&#xff1a;由于配置国内镜像源时&#xff0c;把地址写错了&#xff0c;导致后面安装docker提示HTTP Error 404 解决方法&#xff1a; 1&#xff09;进入到 /etc/yum.repos.d目录下 cd /etc/yum.repos.d l…

8. 查询每日新用户数

文章目录 题目需求思路一实现一题目来源 题目需求 从用户登录明细表&#xff08;user_login_detail&#xff09;中查询每天的新增用户数&#xff0c;若一个用户在某天登录了&#xff0c;且在这一天之前没登录过&#xff0c;则任务该用户为这一天的新增用户。 期望结果如下&am…

Baumer工业相机堡盟工业相机如何通过BGAPISDK进行定序器编程:根据每次触发信号移动感兴趣区域(C#)

Baumer工业相机堡盟工业相机如何通过BGAPISDK进行定序器编程:根据每次触发信号移动感兴趣区域&#xff08;C#&#xff09; Baumer工业相机Baumer工业相机BGAPISDK和定序器编程的技术背景Baumer工业相机通过BGAPISDK进行定序器编程功能1.引用合适的类文件2.Baumer工业相机通过BG…

采集发布到WordPress自定义参数

Wordpress有自定义设置的参数&#xff0c;一般是用户自行设置&#xff0c;或主题和插件扩展新增的自定义参数&#xff0c;要怎么发布&#xff1f; WordPress主题或插件扩展新增的自定义参数&#xff0c;一般是保存到数据库的wp_postmeta表中。 先去数据库中确定对应自定义参数…

设计消息模块的持久层

一、创建MessageDao类 在 com.example.emos.wx.db.dao 包中创建 MessageDao.java 类 Repository public class MessageDao {Autowiredprivate MongoTemplate mongoTemplate;public String insert(MessageEntity entity){Date sendTimeentity.getSendTime();sendTimeDateUtil.…

【day4】类和对象

#include <iostream> using namespace std;class Complex {int real;int vir; public:Complex(){}Complex(int a,int b):real(a),vir(b){}void show(){cout << real << "" << vir << "i" << endl;}//成员函数版的运算…

Java基础-lambda表达式

简化匿名内部类的书写 下面两种写法均可&#xff1b; Arrays.sort(arr, new Comparator<Integer>() {Overridepublic int compare(Integer o1, Integer o2) {return o1 - o2;} }); Arrays.sort(arr, (Integer o1, Integer o2) -> {return o1 - o2;} );函数式编程思想&…

山西电力市场日前价格预测【2023-07-01】

日前价格预测 预测明日&#xff08;2023-07-01&#xff09;山西电力市场全天平均日前电价为364.57元/MWh。其中&#xff0c;最高日前价格为451.88元/MWh&#xff0c;预计出现在21: 30。最低日前电价为309.59元/MWh&#xff0c;预计出现在13: 30。以上预测仅供学习参考&#xff…

LabVIEW开发工业物联网状态监测

物理对象的网络&#xff0c;允许在它们之间传输数据。信息通常保存在集中式云数据库中。由于物联网&#xff0c;我们现在可以从远处进行监控和感知。由于网络和通信的增加&#xff0c;越来越多的流程可能会自动化。 调度、维护管理和质量改进等关键领域的决策正受到大数据技术…

python接口自动化(六)--发送get请求接口(详解)

简介 如果想用python做接口测试&#xff0c;我们首先有不得不了解和学习的模块。它就是第三方模块&#xff1a;Requests。 虽然Python内置的urllib模块&#xff0c;用于访问网络资源。但是&#xff0c;它用起来比较麻烦&#xff0c;而且&#xff0c;缺少很多实用的高级功能。更…

Git入门级指南

Git入门级指南 在软件开发和版本控制中&#xff0c;Git是一种非常流行且强大的工具。本文将为你提供关于Git的基本知识&#xff0c;并提 供一些实例来演示如何正确使用Git来管理代码。 关于git的简介 Git是一种分布式版本控制系统&#xff0c;它可以跟踪和管理项目中的代码…

前端安全问题及解决方案

随着互联网的高速发展&#xff0c;信息安全问题已经成为行业最为关注的焦点之一。总的来说安全是很复杂的一个领域&#xff0c;在移动互联网时代&#xff0c;前端人员除了传统的 XSS、CSRF 等安全问题之外&#xff0c;还时常遭遇网络劫持、非法调用 Hybrid API 等新型安全问题。…

(2023最新)互联网1010道Java面试真题汇总

我相信各位小伙伴们都发现了&#xff0c;现在的 IT 的环境并不如以前了&#xff0c;似乎是迎来“寒冬”&#xff0c;再加上最近上热搜的阿里云大裁员事件&#xff0c;又将 Java 开发岗推上了一个新的难度。而被裁员的人&#xff0c;不得不降薪重新找到一份工作&#xff0c;而经…

Django框架-5

路由系统 通过URL&#xff08;Uniform Resource Locator&#xff0c;统一资源定位符&#xff09;可以访问互联网上的资源——用户通过 浏览器向指定URL发起请求&#xff0c;Web服务器接收请求并返回用户请求的资源&#xff0c;因此可以将URL视为用户与服务器之间交互的桥梁。 …

MATLAB中scatter函数用法

目录 语法 说明 ​示例 scatter函数的功能是绘制散点图。 语法 scatter(x,y) scatter(x,y,sz) scatter(x,y,sz,c) scatter(___,"filled") scatter(___,mkr) scatter(tbl,xvar,yvar) scatter(tbl,xvar,yvar,"filled") scatter(ax,___) scatter(___,Nam…

Mybatis面试题--MyBatis延迟加载

Mybatis是否支持延迟加载&#xff1f; 答&#xff1a;Mybatis支持延迟记载&#xff0c;但默认没有开启 什么叫做延迟加载&#xff1f; 查询用户的时候&#xff0c;把用户所属的订单数据也查询出来&#xff0c;这个是立即加载 查询用户的时候&#xff0c;暂时不查询订单数据&…

【C/C++】拷贝构造函数的调用 使用方法

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞 关注支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; &#x1f525;c系列专栏&#xff1a;C/C零基础到精通 &#x1f525; 给大…