[RocketMQ] Producer发送消息的总体流程 (七)

news2024/9/27 5:49:57
  • 单向发送: 把消息发向Broker服务器, 不管Broker是否接收, 只管发, 不管结果。
  • 同步发送: 把消息发向Broker服务器, 如果Broker成功接收, 可以得到Broker的响应。
  • 异步发送: 把消息发向Broker服务器, 如果Broker成功接收, 可以得到Broker的响应。异步所以发送消息后, 不用等待, 等到Broker服务器的响应调用回调。

文章目录

      • 1.send源码入口
        • 1.1 同步消息
        • 1.2 单向消息
        • 1.3 异步消息
      • 2.sendDefaultImpl发送消息实现
        • 2.1 makeSureStateOK确定生产者服务状态
        • 2.2 checkMessage校验消息的合法性
        • 2.3 tryToFindTopicPublishInfo查找topic的发布信息
        • 2.4 计算发送次数timesTotal
        • 2.5 selectOneMessageQueue选择消息队列
        • 2.6 sendKernelImpl发送消息
          • 2.6.1 findBrokerAddressInPublish查找broker地址
          • 2.6.2 brokerVIPChannel判断vip通道
          • 2.6.3 setUniqID生成uniqId
          • 2.6.4 tryToCompressMessage压缩消息
        • 2.7 updateFaultItem更新故障表
          • 2.7.1 computeNotAvailableDuration计算隔离时间
          • 2.7.2 updateFaultItem更新故障表
      • 3.总结

DefaultMQProducer提供了很多send方法的重载:

在这里插入图片描述
在这里插入图片描述

1.send源码入口

1.1 同步消息

在这里插入图片描述

public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //调用defaultMQProducerImpl#send发送消息
    return this.defaultMQProducerImpl.send(msg);
}

defaultMQProducerImpl#send发送消息。
在这里插入图片描述

调用另一个send(), 超时时间为3s。

public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //调用另一个send方法,设置超时时间参数,默认3000ms
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

在这里插入图片描述

/**
 * DefaultMQProducerImpl的方法
 *
 * @param msg     消息
 * @param timeout 超时时间,毫秒值
 */
public SendResult send(Message msg,
                       long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //调用另一个sendDefaultImpl方法,设置消息发送模式为SYNC,即同步;设置回调函数为null
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

该方法内部又调用另一个sendDefaultImpl方法, 设置消息发送方式为SYNC, 为同步, 设置回调函数为null。

1.2 单向消息

单向消息使用sendOneway发送。

在这里插入图片描述

public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException
 {
    //根据namespace设置topic
    msg.setTopic(withNamespace(msg.getTopic()));
    //调用defaultMQProducerImpl#sendOneway发送消息
    this.defaultMQProducerImpl.sendOneway(msg);
}

defaultMQProducerImpl#sendOneway。

public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
    try {
        //调用sendDefaultImpl方法,设置消息发送模式为ONEWAY,即单向;设置回调函数为null;设置超时时间参数,默认3000ms
        this.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout());
    } catch (MQBrokerException e) {
        throw new MQClientException("unknown exception", e);
    }
}

最终调用sendDefaultImpl方法, 发送模式为ONEWAY, 设置回调函数为null, 超时时间为3s。

1.3 异步消息

异步消息使用带有callback函数的send方法发送。

在这里插入图片描述

public void send(Message msg, SendCallback sendCallback) throws MQClientException,
 RemotingException, InterruptedException {
    //根据namespace设置topic
    msg.setTopic(withNamespace(msg.getTopic()));
    //调用defaultMQProducerImpl#send发送消息,带有sendCallback参数
    this.defaultMQProducerImpl.send(msg, sendCallback);
}

该方法内部调用defaultMQProducerImpl#send方法发送消息, 带sendCallback参数。

public void send(Message msg, SendCallback sendCallback) throws MQClientException, 
RemotingException, InterruptedException {
    //该方法内部又调用另一个send方法,设置超时时间参数,默认3000ms。
    send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
        throws MQClientException, RemotingException, InterruptedException {
    //调用起始时间
    final long beginStartTime = System.currentTimeMillis();
    //获取异步发送执行器线程池
    ExecutorService executor = this.getAsyncSenderExecutor();
    try {
        /*
         * 使用线程池异步的执行sendDefaultImpl方法,即异步发送消息
         */
        executor.submit(new Runnable() {
            @Override
            public void run() {
                /*
                 * 发送之前计算超时时间,如果超时则不发送,直接执行回调函数onException方法
                 */
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeout > costTime) {
                    try {
                        //调用sendDefaultImpl方法执行发送操作
                        sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
                    } catch (Exception e) {
                        //抛出异常,执行回调函数onException方法
                        sendCallback.onException(e);
                    }
                } else {
                    //超时,执行回调函数onException方法
                    sendCallback.onException(
                            new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
                }
            }

        });
    } catch (RejectedExecutionException e) {
        throw new MQClientException("executor rejected ", e);
    }

}

方法内部会获取获取异步发送执行器线程池, 使用线程池异步的执行sendDefaultImpl方法, 即异步发送。

发送之前计算超时时间, 如果超时则执行回调函数onException()。

2.sendDefaultImpl发送消息实现

该方法位于DefaultMQProducerImpl中, 无论是同步, 异步, 还是单向, 最后调用的都是sendDefaultImpl方法。

  1. makeSureStateOK方法, 确定此producer的服务状态正常, 如果服务状态不是RUNNING, 那么抛出异常。
  2. 检查消息的合法性。
  3. 调用tryToFindTopicPublishInfo方法, 尝试查找消息的一个topic路由, 进行发送消息。
  4. 计算循环发送消息的总次数timesTotal, 默认情况下, 同步为3, 允许重试2次, 其他模式为1, 即不允许重试。实际上, 异步发送消息可以最多重试2次, 不是这里实现的。
  5. 调用selectOneMessageQueue方法, 选择一个队列MessageQueue, 支持失败故障转移。
  6. 调用sendKernelImpl方法发送消息, 同步、异步、单向发送消息都是这个方法实现的。
  7. 调用updateFaultItem方法, 更新本地错误表缓存数据, 用于延迟时间的故障转移的功能。
  8. 根据发送模式的不同, 如果是异步或者单向发送则直接返回, 如果是同步的话, 如果开启了retryAnotherBrokerWhenNotStoreOK, 那么如果返回值不返回SEND_OK状态, 则重新执行。
  9. 此过程中, 如果抛出RemotingException、MQClientException、以及MQBrokerException异常, 那么会重试, 如果抛出InterruptedException, 或者超时则不会重试。

在这里插入图片描述
在这里插入图片描述

/**
 * DefaultMQProducerImpl的方法
 *
 * @param msg               方法
 * @param communicationMode 通信模式
 * @param sendCallback      回调方法
 * @param timeout           超时时间
 */
private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    /*
     * 1 确定此producer的服务状态正常,如果服务状态不是RUNNING,那么抛出异常
     */
    this.makeSureStateOK();
    /*
     * 2 校验消息的合法性
     */
    Validators.checkMessage(msg, this.defaultMQProducer);
    //生成本次调用id
    final long invokeID = random.nextLong();
    //开始时间戳
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    //结束时间戳
    long endTimestamp = beginTimestampFirst;
    /*
     * 3 尝试查找消息的一个topic路由,用以发送消息
     */
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    //找到有效的topic信息
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        /*
         * 4 计算发送消息的总次数
         * 同步模式为3,即默认允许重试2次,可更改重试次数;其他模式为1,即不允许重试,不可更改
         */
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        //记录每一次重试时候发送消息目标Broker名字的数组
        String[] brokersSent = new String[timesTotal];
        /*
         * 在循环中,发送消息,包含消息重试的逻辑,总次数默认不超过3
         */
        for (; times < timesTotal; times++) {
            //上次使用过的broker,可以为空,表示第一次选择
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            /*
             * 5 选择一个消息队列MessageQueue
             */
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                //设置brokerName
                brokersSent[times] = mq.getBrokerName();
                try {
                    //调用的开始时间
                    beginTimestampPrev = System.currentTimeMillis();
                    //如果还有可调用次数,那么
                    if (times > 0) {
                        //在重新发送期间用名称空间重置topic
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    //现在调用的开始时间 减去 开始时间,判断时候在调用发起之前就超时了
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    //如果已经超时了,那么直接结束循环,不再发送
                    //即超时的时候,即使还剩下重试次数,也不会再继续重试
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    /*
                     * 6 异步、同步、单向发送消息
                     */
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    //方法调用结束时间戳
                    endTimestamp = System.currentTimeMillis();
                    /*
                     * 7 更新本地错误表缓存数据,用于延迟时间的故障转移的功能
                     */
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    /*
                     * 8 根据发送模式执行不同的处理
                     */
                    switch (communicationMode) {
                        //异步和单向模式直接返回null
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            //同步模式,如果开启了retryAnotherBrokerWhenNotStoreOK开关,那么如果不是返回SEND_OK状态,则仍然会执行重试发送
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            //如果发送成功,则返回
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    //RemotingException异常,会执行重试
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) {
                    //MQClientException异常,会执行重试
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
                    //MQBrokerException异常
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    //如果返回的状态码属于一下几种,则支持重试:
                    //ResponseCode.TOPIC_NOT_EXIST,
                    //ResponseCode.SERVICE_NOT_AVAILABLE,
                    //ResponseCode.SYSTEM_ERROR,
                    //ResponseCode.NO_PERMISSION,
                    //ResponseCode.NO_BUYER_ID,
                    //ResponseCode.NOT_IN_CURRENT_UNIT

                    if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
                        continue;
                    } else {
                        //其他状态码不支持重试,如果有结果则返回,否则直接抛出异常
                        if (sendResult != null) {
                            return sendResult;
                        }

                        throw e;
                    }
                } catch (InterruptedException e) {
                    //InterruptedException异常,不会执行重试
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());

                    log.warn("sendKernelImpl exception", e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }
        /*
         * 抛出异常的操作
         */
        if (sendResult != null) {
            return sendResult;
        }

        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

        MQClientException mqClientException = new MQClientException(info, exception);
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }

        throw mqClientException;
    }

    validateNameServerSetting();

    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

2.1 makeSureStateOK确定生产者服务状态

确定此producer的服务状态正常, 如果服务状态不是RUNNING, 那么抛出异常。

在这里插入图片描述

/**
 * DefaultMQProducerImpl的方法
 */
private void makeSureStateOK() throws MQClientException {
    //服务状态不是RUNNING,那么抛出MQClientException异常。
    if (this.serviceState != ServiceState.RUNNING) {
        throw new MQClientException("The producer service state not OK, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
    }
}

2.2 checkMessage校验消息的合法性

  1. 如果msg消息为null, 抛出异常。
  2. 校验topic, 如果topic为空, 或者长度大于127字符, 或者topic中有非法字符则抛出异常, 如果当前topic是不为允许使用的系统topic, 抛出异常。
  3. 校验消息体, 如果消息体为null, 或者为空数组, 或者消息字节数组长度大于4M, 抛出异常。
    在这里插入图片描述
/**
 * Validators的方法
 */
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
    //如果消息为null,抛出异常
    if (null == msg) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    /*
     * 校验topic
     */
    //如果topic为空,或者长度大于127个字符,或者topic的字符串不符合 "^[%|a-zA-Z0-9_-]+$"模式,即包含非法字符,那么抛出异常
    Validators.checkTopic(msg.getTopic());
    //如果当前topic是不为允许使用的系统topic SCHEDULE_TOPIC_XXXX,那么抛出异常
    Validators.isNotAllowedSendTopic(msg.getTopic());

    // body
    //如果消息体为null,那么抛出异常
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }
    //如果消息体为空数组,那么抛出异常
    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }
    //如果消息 字节数组长度大于4,194,304,即消息的大小大于4M,那么抛出异常
    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
    }
}

public static void checkTopic(String topic) throws MQClientException {
    //如果topic为空,那么抛出异常
    if (UtilAll.isBlank(topic)) {
        throw new MQClientException("The specified topic is blank", null);
    }
    //如果topic长度大于127个字符,那么抛出异常
    if (topic.length() > TOPIC_MAX_LENGTH) {
        throw new MQClientException(
                String.format("The specified topic is longer than topic max length %d.", TOPIC_MAX_LENGTH), null);
    }
    //如果topic字符串包含非法字符,那么抛出异常
    if (isTopicOrGroupIllegal(topic)) {
        throw new MQClientException(String.format(
                "The specified topic[%s] contains illegal characters, allowing only %s", topic,
                "^[%|a-zA-Z0-9_-]+$"), null);
    }
}

2.3 tryToFindTopicPublishInfo查找topic的发布信息

该方法用于查找指定topic的发布信息TopicPublishInfo。

  1. 首先在本地缓存topicPublishInfoTable获取。
  2. updateTopicRouteInfoFromNameServer()获取, 从nameServer同步此topic的路由配置信息。
  3. 从nameServer同步topic数据。

在这里插入图片描述

/**
 * DefaultMQProducerImpl的方法
 * <p>
 * 查找指定topic的推送信息
 */
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    //尝试直接从producer的topicPublishInfoTable中获取topic信息
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    //如果没有获取到有效信息,
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        //那么立即创建一个TopicPublishInfo
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        //立即从nameServer同步此topic的路由配置信息,并且更新本地缓存
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        //再次获取topicPublishInfo
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    //如果找到的路由信息是可用的,直接返回
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        //再次从nameServer同步topic的数据,不过这次使用默认的topic “TBW102”去找路由配置信息作为本topic参数信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
  • 首先在本地缓存topicPublishInfoTable获取, 如果没有获取到, 调用updateTopicRouteInfoFromNameServer方法从nameServer同步此topic的路由配置信息, 并更新缓存。如果还是没有获取到有效数据, 再次从nameServer同步topic数据, 不过这次默认的topic是 “TBW102”去找路由配置信息作为本topic参数信息。

TopicPublishInfo: topic的发布信息

在这里插入图片描述

/**
 * 是否是顺序消息
 */
private boolean orderTopic = false;
/**
 * 是否包含路由信息
 */
private boolean haveTopicRouterInfo = false;
/**
 * topic的消息队列集合
 */
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
/**
 * 当前线程线程的消息队列的下标,循环选择消息队列使用+1
 */
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
/**
 * topic路由信息,包括topic的队列信息queueDatas,topic的broker信息brokerDatas,顺序topic配置orderTopicConf,消费过滤信息filterServerTable等属性
 */
private TopicRouteData topicRouteData;

2.4 计算发送次数timesTotal

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + 
this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

计算循环发送消息的总次数timesTotal, 默认情况下, 同步为3, 允许重试2次, 其他模式为1, 即不允许重试。实际上, 异步发送消息可以最多重试2次, 不是这里实现的。是MQClientAPIImpl#sendMessage方法中。

2.5 selectOneMessageQueue选择消息队列

方法内部调用mqFaultStrategy#selectOneMessageQueue方法:

在这里插入图片描述

/**
 * DefaultMQProducerImpl的方法
 *
 * 选择一个消息队列
 * @param tpInfo topic信息
 * @param lastBrokerName 上次使用过的broker
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    //调用mqFaultStrategy#selectOneMessageQueue方法
    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

mqFaultStrategy#selectOneMessageQueue方法支持故障转移机制:

  1. 首先判断是否开启了发送延迟故障转移机制, 即判断sendLatencyFaultEnable属性是否为true, 默认为false不开启。
    1. 如果开启的话, 首先仍然是遍历消息队列, 按照轮询的方式选取一个消息队列, 当消息队列可用时, 选择消息队列的工作就结束, 否则循环选择其他队列。如果该mq的broker不存在LatencyFaultTolerance维护的faultItemTable集合属性中, 或者当前时间戳大于该broker下一次开始可用的时间戳, 则表示无障碍。
    2. 如果没有选出无故障mq, 那么从LatencyFaultTolerance维护的不是最好的broker集合faultItemTable中随机选择一个broker, 随后判断如果写队列数大于0, 那么选择该Broker。然后遍历消息队列, 采用取模的方式获取一个队列, 重置其brokerName和queueId, 进行消息发送。
    3. 如果上面的步骤抛出了异常, 那么遍历消息队列, 采用取模的方式获取一个队列。
  2. 如果没有开启延迟故障转移机制, 那么遍历消息队列, 采用取模轮询的方式获取一个brokerName与lastBrokerName不相等的队列, 即不会再次选择上次发送失败的broker。如果没有找到一个不同broker的mq, 那么退回到轮询的方式。

selectOneMessageQueue方法选择mq的时候的故障转移机制, 目的是为了保证每次消息尽快的发送, 是一种高可用手段。

  1. 延迟时间的故障转移, 消息队列选择时候, 可用过滤mq认为不可用的broker, 以免不断为宕机的broker发送消息, 选取一个延迟比较短的broker, 实现消息发送高可用。
  2. 没有开启延迟时间的故障转移的时候, 在轮询选择mq的时候, 不会选择上一次发送失败的broker, 实现消息发送高可用。
/**
 * MQFaultStrategy的方法
 * <p>
 * 选择一个消息队列,支持故障延迟转移
 *
 * @param tpInfo         topic信息
 * @param lastBrokerName 上次使用过的broker
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    /*
     * 判断是否开启了发送延迟故障转移机制,默认false不打开
     * 如果开启了该机制,那么每次选取topic下对应的queue时,会基于之前执行的耗时,在有存在符合条件的broker的前提下,优选选取一个延迟较短的broker,否则再考虑随机选取。
     */
    if (this.sendLatencyFaultEnable) {
        try {
            //当前线程线程的消息队列的下标,循环选择消息队列使用+1
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            //遍历消息队列,采用取模的方式获取一个队列,即轮询的方式
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                //取模
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                //获取该消息队列
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                //如果当前消息队列是可用的,即无故障,那么直接返回该mq
                //如果该broker不存在LatencyFaultTolerance维护的faultItemTable集合属性中,或者当前时间已经大于该broker下一次开始可用的时间点,表示无故障
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }
            //没有选出无故障的mq,那么一个不是最好的broker集合中随机选择一个
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            //如果写队列数大于0,那么选择该broker
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                //遍历消息队列,采用取模的方式获取一个队列,即轮询的方式
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    //重置其brokerName,queueId,进行消息发送
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                }
                return mq;
            } else {
                //如果写队列数小于0,那么移除该broker
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        //如果上面的步骤抛出了异常,那么遍历消息队列,采用取模的方式获取一个队列,即轮询的方式
        return tpInfo.selectOneMessageQueue();
    }
    //如果没有发送延迟故障转移机制,那么那么遍历消息队列,即采用取模轮询的方式
    //获取一个brokerName与lastBrokerName不相等的队列,即不会再次选择上次发送失败的broker
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

selectOneMessageQueue方法有两个重载方法, 一个无参的, 一个有参的。

无参的表示选择mq无限制:

/**
 * TopicPublishInfo的方法
 * <p>
 * 轮询的选择一个mq
 */
public MessageQueue selectOneMessageQueue() {
    //获取下一个index
    int index = this.sendWhichQueue.incrementAndGet();
    //取模计算索引
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    //获取该索引的mq
    return this.messageQueueList.get(pos);
}

有参数, 其参数是上一次发送失败的brokerName, 表示不会选择上一次失败的brokerName的mq。如果最后没有选择出来, 那么走轮询的逻辑。

/**
 * TopicPublishInfo的方法
 *
 * @param lastBrokerName 上一次发送失败的brokerName
 */
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    //如果lastBrokerName为null,即第一次发送,那么轮询选择一个
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            //轮询选择一个mq
            int index = this.sendWhichQueue.incrementAndGet();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            //如果mq的brokerName不等于lastBrokerName,就返回,否则选择下一个
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        //没有选出来,那么轮询选择一个
        return selectOneMessageQueue();
    }
}

2.6 sendKernelImpl发送消息

选择了队列后, 调用sendKernelImpl发送消息

  1. 首先调用findBrokerAddressInPublish方法从brokerAddrTable中查找Master broker地址。如果找不到, tryToFindTopicPublishInfo从nameServer远程拉取配置, 并更新本地缓存, 再次尝试获取Master broker地址。
  2. 调用brokerVIPChannel判断是否开启vip通道, 如果开启了, 那么将brokerAddr的port – 2, vip通道的端口为普通端口-2。
  3. 如果不是批量消息, 那么设置唯一的uniqId。
  4. 如果不是批量消息, 且消息体大于4K, 那么压缩消息。
  5. 如果存在CheckForbiddenHook, 则执行checkForbidden钩子方法。如果存在SendMessageHook, 则执行sendMessageBefore钩子方法。
  6. 设置请求头信息SendMessageRequestHeader, 请求头包含各种基本属性, producerGroup, topic, queueId, 并且将消息重试次数和最大重试次数存入请求头中。
  7. 根据不同的发送模式发送消息, 如果是异步, 需要先克隆并还原消息, 最终异步, 同步, 单向都是调用MQClientAPIImpl#sendMessage方法发送消息的。
  8. 如果MQClientAPIImpl#sendMessage方法正常发送或者抛出RemotingException、MQBrokerException、InterruptedException异常, 判断如果存在SendMessageHook, 执行sendMessageAfter钩子方法。
  9. finally中对消息进行恢复。

在这里插入图片描述

在这里插入图片描述

/**
 * DefaultMQProducerImpl的方法
 * 发送消息
 *
 * @param msg               消息
 * @param mq                mq
 * @param communicationMode 发送模式
 * @param sendCallback      发送回调
 * @param topicPublishInfo  topic信息
 * @param timeout           超时时间
 * @return 发送结果
 */
private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //开始时间
    long beginStartTime = System.currentTimeMillis();
    /*
     * 1 根据brokerName从brokerAddrTable中查找broker地址
     */
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    //如果本地找不到 broker 的地址
    if (null == brokerAddr) {
        /*
         * 2 从nameServer远程拉取配置,并更新本地缓存
         * 该方法此前就学习过了
         */
        tryToFindTopicPublishInfo(mq.getTopic());
        //再次获取地址
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        /*
         * 3 vip通道判断
         */
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();
        try {
            //for MessageBatch,ID has been set in the generating process
            /*
             * 4 如果不是批量消息,那么尝试生成唯一uniqId,即UNIQ_KEY属性。MessageBatch批量消息在生成时就已经设置uniqId
             * uniqId也被称为客户端生成的msgId,从逻辑上代表唯一一条消息
             */
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }
            /*
             * 设置nameSpace为实例Id
             */
            boolean topicWithNamespace = false;
            if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                topicWithNamespace = true;
            }
            //消息标识符
            int sysFlag = 0;
            //消息压缩标识
            boolean msgBodyCompressed = false;
            /*
             * 5 尝试压缩消息
             */
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                msgBodyCompressed = true;
            }
            //事务消息标志,prepare消息
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }
            /*
             * 6 如果存在CheckForbiddenHook,则执行checkForbidden方法
             * 为什么叫禁止钩子呢,可能是想要使用者将不可发送消息的检查放在这个钩子函数里面吧(猜测)
             */
            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }
            /*
             * 7 如果存在SendMessageHook,则执行sendMessageBefore方法
             */
            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                context.setNamespace(this.defaultMQProducer.getNamespace());
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }
            /*
             * 8 设置请求头信息
             */
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            //针对重试消息的处理
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                //获取消息重新消费次数属性值
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    //将重新消费次数设置到请求头中,并且清除该属性
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }
                //获取消息的最大重试次数属性值
                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    //将最大重新消费次数设置到请求头中,并且清除该属性
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }
            /*
             * 9 根据不同的发送模式,发送消息
             */
            SendResult sendResult = null;
            switch (communicationMode) {
                /*
                 * 异步发送模式
                 */
                case ASYNC:
                    /*
                     * 首先克隆并还原消息
                     *
                     * 该方法的finally中已经有还原消息的代码了,为什么在异步发送消息之前,还要先还原消息呢?
                     *
                     * 因为异步发送时 finally 重新赋值的时机并不确定,有很大概率是在第一次发送结束前就完成了 finally 中的赋值,
                     * 因此在内部重试前 msg.body 大概率已经被重新赋值过,而 onExceptionImpl 中的重试逻辑 MQClientAPIImpl.sendMessageAsync 不会再对数据进行压缩,
                     * 简言之,在异步发送的情况下,如果调用 onExceptionImpl 内部的重试,有很大概率发送的是无压缩的数据
                     */
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    //如果开启了消息压缩
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        //克隆一个message
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        //恢复原来的消息体
                        msg.setBody(prevBody);
                    }
                    //如果topic整合了namespace
                    if (topicWithNamespace) {
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        //还原topic
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }
                    /*
                     * 发送消息之前,进行超时检查,如果已经超时了那么取消本次发送操作,抛出异常
                     */
                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    /*
                     * 10 发送异步消息
                     */
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                    break;
                /*
                 * 单向、同步发送模式
                 */
                case ONEWAY:
                case SYNC:
                    /*
                     * 发送消息之前,进行超时检查,如果已经超时了那么取消本次发送操作,抛出异常
                     */
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    /*
                     * 10 发送单向、同步消息
                     */
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                    break;
                default:
                    assert false;
                    break;
            }
            /*
             * 9 如果存在SendMessageHook,则执行sendMessageAfter方法
             */
            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }
            //返回执行结果
            return sendResult;

            //如果抛出了异常,如果存在SendMessageHook,则执行sendMessageAfter方法
        } catch (RemotingException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            /*
             * 对消息进行恢复
             * 1、因为客户端可能还需要查看原始的消息内容,如果是压缩消息,则无法查看
             * 2、另外如果第一次压缩后消息还是大于4k,如果不恢复消息,那么客户端使用该message重新发送的时候,还会进行一次消息压缩
             */
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
2.6.1 findBrokerAddressInPublish查找broker地址

根据brokerName向brokerAddrTable查找数据, 因为生产者只会向Master发送数据, 所以返回的是Master地址。

在这里插入图片描述

/**
 * MQClientInstance的方法
 */
public String findBrokerAddressInPublish(final String brokerName) {
    //查询brokerAddrTable缓存的数据
    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    //返回Mater节点的地址
    if (map != null && !map.isEmpty()) {
        return map.get(MixAll.MASTER_ID);
    }

    return null;
}
2.6.2 brokerVIPChannel判断vip通道

调用brokerVIPChannel判断是否开启vip通道, 如果开启了, 那么将brokerAddr的port – 2, vip通道的端口为普通端口-2。

在这里插入图片描述

/**
 * MixAll的方法
 */
public static String brokerVIPChannel(final boolean isChange, final String brokerAddr) {
    //如果开启了vip通道
    if (isChange) {
        int split = brokerAddr.lastIndexOf(":");
        String ip = brokerAddr.substring(0, split);
        String port = brokerAddr.substring(split + 1);
        //重新拼接brokerAddr,其中port - 2
        String brokerAddrNew = ip + ":" + (Integer.parseInt(port) - 2);
        return brokerAddrNew;
    } else {
        //如果没有开启vip通道,那么返回原地址
        return brokerAddr;
    }
}

消费者拉取消息只能请求普通通道, 但是生产者可以选择vip和普通通道。

2.6.3 setUniqID生成uniqId

设置到UNIQ_KEY属性中, 批量消息在生成时就已经设置uniqId。

uniqId表示客户端生成的唯一一条消息。

在这里插入图片描述

/**
 * MessageClientIDSetter的方法
 */
public static void setUniqID(final Message msg) {
    //如果这条消息不存在"UNIQ_KEY"属性,那么创建uniqId并且存入"UNIQ_KEY"属性中
    if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
        msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
    }
}
2.6.4 tryToCompressMessage压缩消息

消息压缩, 压缩比为5, 压缩之后, 设置压缩标志, 批量消息不支持压缩, 消息压缩有利于网络传输数据。

在这里插入图片描述

/**
 * DefaultMQProducerImpl的方法
 */
private boolean tryToCompressMessage(final Message msg) {
    //如果是批量消息,那么不进行压缩
    if (msg instanceof MessageBatch) {
        //batch dose not support compressing right now
        return false;
    }
    byte[] body = msg.getBody();
    if (body != null) {
        //如果消息长度大于4K
        if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
            try {
                //进行压缩,使用的JDK自带的压缩类
                byte[] data = UtilAll.compress(body, zipCompressLevel);
                if (data != null) {
                    //重新设置到body中
                    msg.setBody(data);
                    return true;
                }
            } catch (IOException e) {
                log.error("tryToCompressMessage exception", e);
                log.warn(msg.toString());
            }
        }
    }

    return false;
}

2.7 updateFaultItem更新故障表

发送消息后, 无论正常与否, 调用updateFaultItem更新故障表, 更新本地错误表缓存数据, 用于延迟时间的故障转移功能。

故障转移功能在此前的selectOneMessageQueue方法中被使用到, 用于查找一个可用的消息队列, updateFaultItem方法判断是否开启了故障转移功能, 会更新LatencyFaultTolerance维护的faultItemTable集合属性中的异常broker数据。

在这里插入图片描述

/**
 * DefaultMQProducerImpl的方法
 * @param brokerName brokerName
 * @param currentLatency 当前延迟
 * @param isolation 是否使用默认隔离
 */
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    //调用MQFaultStrategy#updateFaultItem方法
    this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}

MQFaultStrategy#updateFaultItem方法。其根据本次发送消息的延迟时间currentLatency, 计算出broker的隔离时间duration, 即是broker的下一个可用时间点。用于更新故障记录表。

/**
 * MQFaultStrategy的方法
 *
 * @param brokerName     brokerName
 * @param currentLatency 当前延迟
 * @param isolation      是否使用默认隔离时间
 */
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    //如果开启了故障转移,即sendLatencyFaultEnable为true,默认false
    if (this.sendLatencyFaultEnable) {
        //根据消息当前延迟currentLatency计算当前broker的故障延迟的时间duration
        //如果isolation为true,则使用默认隔离时间30000,即30s
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        //更新故障记录表
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}
2.7.1 computeNotAvailableDuration计算隔离时间

根据消息当前延迟currentLatency计算当前broker的故障延迟的时间duration, 据此即可以计算出该broker的下一个可用时间点。

在这里插入图片描述

//延迟等级
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
//不可用时间等级
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

/**
 * MQFaultStrategy的方法
 *
 * @param currentLatency 当前延迟
 * @return 故障延迟的时间
 */
private long computeNotAvailableDuration(final long currentLatency) {
    //倒叙遍历latencyMax
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        //选择broker延迟时间对应的broker不可用时间,默认30000对应的故障延迟的时间为600000,即10分钟
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }

    return 0;
}

latencyMax为延迟等级, notAvailableDuration为隔离时间。之间的关系:

在这里插入图片描述

2.7.2 updateFaultItem更新故障表

该方法更新LatencyFaultToleranceImpl维护的faultItemTable集合属性中的异常broker的故障信息, 将会设置发送消息的延迟时间currentLatency属性, 以及下一个可用时间点LatencyFaultToleranceImpl属性。

下次可用时间LatencyFaultToleranceImpl属性= 现在的时间 + 隔离的时间, 在selectOneMessageQueue方法选取消息队列的时候, 如果开启了故障转移, 那么会查找下一个可用时间点小于当前时间点的broker的队列来发送消息。

在这里插入图片描述

/**
 * LatencyFaultToleranceImpl的方法
 *
 * @param name                 brokerName
 * @param currentLatency       当前延迟
 * @param notAvailableDuration 隔离时间(不可用时间)
 */
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    //获取该broker此前的故障记录数据
    FaultItem old = this.faultItemTable.get(name);
    //如果此前没有数据,那么设置一个新对象肌凝乳
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        //设置当前延迟
        faultItem.setCurrentLatency(currentLatency);
        //设置下一次可用时间点
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        //已有故障记录,更新
        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        //已有故障记录,更新
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}

3.总结

  1. 生产者消息重试: RocketMQ的消费者消息重试和生产者消息重投。

  2. 生产者故障转移: 通过sendLatencyFaultEnable属性配置是否开启, 目的是为了保证每次发送消息成功, 是一种高可用手段。

    1. 延迟时间的故障转移, 需要sendLatencyFaultEnable为true, 消息队列选择时候, 可用过滤mq认为不可用的broker, 以免不断为宕机的broker发送消息, 选取一个延迟比较短的broker, 实现消息发送高可用。
    2. 没有开启延迟时间的故障转移的时候, 在轮询选择mq的时候, 不会选择上一次发送失败的broker, 实现消息发送高可用。
  3. Vip通道: VIP通道用于隔离读写操作, 消费者拉取消息只能请求普通通道, 生产者可用选择vip或者普通通道。

  4. 故障转移表: RocketMQ的Producer生产者故障转移依赖于故障转移表实现, 是一个HasmMap。消息发送结束之后, 会根据本次发送消息的延迟时间currentLatency, 计算该broker的隔离时间duration, 即为broker的下一次可用时间点。然后更新故障记录表。故障转移表的key为brokerName, value为未来该broker可用时间。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/688742.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

css基础知识十:介绍一下CSS中的Grid网格布局?

一、是什么 Grid 布局即网格布局&#xff0c;是一个二维的布局方式&#xff0c;由纵横相交的两组网格线形成的框架性布局结构&#xff0c;能够同时处理行与列 擅长将一个页面划分为几个主要区域&#xff0c;以及定义这些区域的大小、位置、层次等关系 这与之前讲到的flex一维…

操作系统—内存管理

单片机是没有操作系统的&#xff0c;每次写完代码都是通过一些工具将程序直接烧录进去&#xff0c;这样程序才能跑起来。单片机的CPU是直接操作内存的物理地址。在这种情况下&#xff0c;要想在内存中同时运行两个程序是不可能的&#xff0c;程序会崩溃。那么操作系统为了解决这…

LLM相关的一些调研

Prompt Engine 可以参考该项目&#xff0c;该项目提供关于提示词书写的规则。由openai以及吴恩达完成。 https://github.com/datawhalechina/prompt-engineering-for-developers由于目前chatgpt 无法直接在国内访问&#xff0c;推荐在claude on slack上尝试。关于claude api h…

Leetcode:1035. 不相交的线、53. 最大子数组和(C++)

目录 1035. 不相交的线 题目描述&#xff1a; 实现代码与解析&#xff1a; 动态规划 原理解析&#xff1a; 53. 最大子数组和 题目描述&#xff1a; 实现代码与解析&#xff1a; 动态规划 原理思路&#xff1a; 1035. 不相交的线 题目描述&#xff1a; 在两条独立的水…

移动端永不过时的高薪技术岗位,原来是它……

随着 Android 设备的普及和应用领域的不断扩大&#xff0c;Android Framework 开发需求量将会持续增长&#xff0c;并且会越来越多地向行业、企业级应用和系统优化等方向发展。以下是一些 Android Framework 开发相关的应用场景&#xff1a; 1. 特定垂直领域的智能设备&#x…

Jmeter性能测试

一、jmeter多并发 1.线程设置&#xff1a; 线程数——多少个虚拟用户 ramp_up时间(秒)——时间&#xff0c;设置时间内将线程都跑完 循环次数——勾选永远&#xff0c;就一直跑&#xff0c;直到手动停止&#xff1b;输入数字&#xff0c;就是循环多少次 2.jmeter逻辑分支控制…

关于MySQL性能优化方案,掌握这一篇就够了!

目录 前言 一、设置索引 1、索引的优缺点: 2、给表列创建索引 3、查看索引 4、删除索引&#xff1a; 5、索引原理&#xff1a; 二、分类讨论 三、针对偶尔很慢的情况 1、 数据库在刷新脏页&#xff08;flush&#xff09; 2. 拿不到锁我能怎么办 四、针对一直都这…

力扣题库刷题笔记16--最接近的三数之和

1、题目如下&#xff1a; 2、个人Python代码实现 本题的思路应该与很早之前刷的第15题三数之和是一个思路&#xff1a; 1、先将数组排序&#xff0c;然后进行遍历数组 2、确定左指针、右指针 3、判断三个数之和是否接近目标值 4、重点是&#xff0c;为确保左右指针不是同一个元…

Transformer回归预测

一、Attention is all you need——李沐论文精读Transformer 论文地址&#xff1a; https://arxiv.org/pdf/1706.03762.pdf Transformer论文逐段精读【论文精读】 卷积神经网络对较长的序列难以建模&#xff0c;因为他每次看一个比较小的窗口&#xff0c;如果两个像素隔得比较…

6.STM32时钟系统

1.时钟系统框图: HSI&#xff1a;高速的内部时钟->8MHz;HSE&#xff1a;外部高速时钟->8MHz;PLL&#xff1a; 锁相环->用于倍频(放大频率)&#xff1b;CSS&#xff1a;时钟监控系统(一旦检测到HSE(外部晶振)失败&#xff0c;将会自动切换系统时钟源HSI)&#xff1b;LS…

高等数学函数的性质

&#xff08;本文内容为个人笔记分享&#xff09; 牛顿二项公式 ( x y ) n ∑ k 0 n C n k ⋅ x n − k y k (xy)^n\stackrel{n}{\sum\limits_{k0}}C^k_n\sdot x^{n-k}y^k (xy)nk0∑​n​Cnk​⋅xn−kyk. 映射 f : X → Y f:X\rightarrow Y f:X→Y&#xff0c; f f f 为 …

成功解决RuntimeError:Unable to find a valid cuDNN algorithm to run convolution

该错误有可能是由于GPU不足导致的 有两种解决方法: 方法一&#xff1a;指定device 在指定device时&#xff0c;没有指定具体的卡 只用了如下代码 device torch.device("cuda" if torch.cuda.is_available() else "cpu")默认使用了index0的卡&#xff0…

CppUTest——【由JUnit移植过来的】C++单元测试框架——的下载安装

C单元测试框架CppUTest的下载与安装 简介下载地址单元测试框架下载单元测试被测工程下载 安装安装Cygwin下载地址安装步骤手动安装CMake 编译单元测试框架CppUTest 导入到Virtual Studio准备条件根据VS版本选择导入对应的.sln文件 简介 CppUnit是【由JUnit移植过来的】C测试框…

每日一练 | 华为认证真题练习Day65

1、如果一个以太网数据帧的Length/Type0x8100&#xff0c;那么这个数据帧的载荷不可能是&#xff1f;&#xff08;多选&#xff09; A. ARP应答报文 B. OSPF报文 C. RSTP数据帧 D. STP数据帧 2、路由器某接口配置信息如下&#xff0c;则此端口可以接收携带哪个VLAN的数据包…

《C++ Primer》--学习8

vector 对象是如何增长的 当不得不获取新的内存空间时&#xff0c;vector 和 string 的实现通常会分配比新的空间需求更大的内存空间&#xff0c;容器预留这些空间作为备用&#xff0c;这样就不用每次添加新元素都重新分配容器的内存空间了 管理容器的成员函数 capacity 和 …

高速电路设计系列分享-带宽和动态范围

目录 概要 整体架构流程 技术名词解释 1.带宽 2.动态范围 小结 概要 提示&#xff1a;这里可以添加技术概要 本文继续熟悉一些基本概念。 在许多技术领域,我们习惯于把技术进步与更高的速率关联起来:从以太网到无线局域网再到蜂窝移动网络&#xff0c;数据通信的实质就是不断…

MySQL8.0(Win)的安装步骤

MySQL8.0&#xff08;Win&#xff09;的安装步骤 MySql8.0 安装网址MySql8.0 安装界面界面一界面二界面三界面四界面五界面六界面七界面八界面十界面十一 查看 MySql8.0 安装结果配置MySql8.0的环境变量Path使用命令行访问MySQL的安装结果使用命令行操作MySQL数据库显示MySQL中…

Fiddler 简单抓包

文章目录 一、Fiddler 简介二、下载 Fiddler三、功能介绍1、Filters2、Inspectors3、Automatic BreakPoints4、TextWizard5、其他常用功能 一、Fiddler 简介 Fiddler是一个http协议调试代理工具&#xff0c;它能够记录并检查所有你的电脑和互联网之间的http通讯&#xff0c;设…

计算机由于找不到msvcr120.dll无法执行代码的解决方法分享

运行软件程序或游戏的时候&#xff0c;计算机提示由于找不到msvcr120.dll无法执行代码是怎么回事呢&#xff1f;msvcr120.dll是Microsoft Visual C的一部分&#xff0c;用于在Windows操作系统上运行C应用程序。它是一个动态链接库文件&#xff0c;包含了许多C运行时库&#xff…

【日志1】rsyslog,logrotate,post_code,journalctl

文章目录 1.rsyslog&#xff1a;rsyslogd一个进程 &#xff0c;管理每个进程发来的log并往/var/log里写&#xff0c;syslog函数将log写给rsyslogd进程&#xff0c;rsyslogd -v2.logrotate&#xff1a;logrotate /etc/logrotate.rsyslog&#xff08;bb中重命名&#xff09;3.pos…