RocketMQ5.0.0消息发送

news2025/1/16 8:19:34

一、消息

消息实体类为org.apache.rocketmq.common.message.Message,其主要属性如下。

// 消息所属topic
private String topic;
// 消息Flag(RocketMQ不作处理),即:用户处理
private int flag;
// 扩展属性
private Map<String, String> properties;
// 消息体
private byte[] body;
// 事务ID
private String transactionId;

二、消息生产者

1.生产者UML

默认消息生产者org.apache.rocketmq.client.producer.DefaultMQProducer。消息生产者两个实现类:默认生产者DefaultMQProducer、事务消息生产者TransactionMQProducer

2.DefaultMQProducer关键属性

// 封装生产者内部实现方法
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;

// 生产组,默认DEFAULT_PRODUCER
private String producerGroup;
// 创建消息队列的数量,默认4
private volatile int defaultTopicQueueNums = 4;
// 发送消息超时时间,默认3s
private int sendMsgTimeout = 3000;
// 消息体大小超出4K则压缩消息体
private int compressMsgBodyOverHowmuch = 1024 * 4;
// 同步模式下最大发送消息次数
private int retryTimesWhenSendFailed = 2;
// 异步模式下最大发送消息次数
private int retryTimesWhenSendAsyncFailed = 2;
// 发送失败时,是否选择任意一个Broker发送,默认否
private boolean retryAnotherBrokerWhenNotStoreOK = false;
// 最大发送消息体的大小,默认4MB
private int maxMessageSize = 1024 * 1024 * 4;

3.发送消息方法

RocketMQ支持3种消息发送方式:

  • 同步(sync):发送消息时,同步等待直到消息服务器返回发送结果。

  • 异步(async):发送消息时,指定消息发送成功后的回调函数,然后发送,立即返回,消息发送者线程不阻塞,消息发送成功或失败的回调任务在一个新的线程中执行。

  • 单向(oneway):发送消息后直接返回,不等待Broker的结果,也不注册回调函数,即:只发送,不在乎消息是否成功存储到Broker

以下是消息发送方法,其中:

  • SendCallback sendCallback:含有SendCallback类的,是指定回调方法,属于异步方式发送消息;

  • MessageQueue messageQueue:指定发送消息存储到哪个消费队列;

  • MessageQueueSelector selector:消息队列选择器,指定发送哪个消费队列;

  • long timeout:发送超时时间;

  • Object arg:传入到MessageQueueSelector的用户参数;

  • Collection<Message> msgs:批量发送消息;

  • sendMessageInTransaction()方法:发送事务消息,生产者为TransactionMQProducer

  • sendOneway():单向方式发送消息。

三、生产者启动流程

生产者启动方法org.apache.rocketmq.client.producer.DefaultMQProducer#start,其方法调用链如下。

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)是核心方法,如下代码所示。DefaultMQProducerImpl维护生产者示例的ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable(主题映射的发布信息)

// 启动生产者
public void start(final boolean startFactory) throws MQClientException {
    // 默认仅创建,不启动,即:创建DefaultMQProducerImpl.serviceState = CREATE_JUST
    switch (this.serviceState) {
        // 仅创建,不启动
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            // 检查生产组名称是否符合规范:最大长度为255,不含特殊字符
            this.checkConfig();
            // 生产者的instanceName为进程ID
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }

            /*
                获取或创建MQClientInstance:
                a. 当前生产者获取clientId
                b. 根据clientId获取或创建MQClientInstance
             */
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

            // 当前生产者注册到MQClientInstance(维护所有生产者)
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            // 当前生产者,topic发布信息
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            // 启动MQClientInstance
            if (startFactory) {
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

    RequestFutureHolder.getInstance().startScheduledTask(this);

}

注意,整个JVM实例中只存在一个MQClientManager去维护一个MQClientlnstance缓存表ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable,即:同一个clientId只会创建一个MQClientInstance。而clientId组成:当前生产者服务器IP + instanceName(PID#时间戳) + unitName( 可选)。从clientId组成看出,单个JVM有且只有一个MQClientInstance实例。多个生产者和多个消费者共用一个MQClientInstance实例

如下代码所示MQClientInstance维护的不同容器。示例启动后,定时任务周期维护生产者和消费者的路由信息、Topic发布信息等

// 生产者容器
private final ConcurrentMap<String/* 生产组 */, MQProducerInner> producerTable = new ConcurrentHashMap<>();
// 消费者容器
private final ConcurrentMap<String/* 消费组 */, MQConsumerInner> consumerTable = new ConcurrentHashMap<>();
// adminExt容器
private final ConcurrentMap<String, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<>();
// 主题的路由信息容器
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<>();
// 主题消费队列容器
private final ConcurrentMap<String/* Topic */, ConcurrentMap<MessageQueue, String/*brokerName*/>> topicEndPointsTable = new ConcurrentHashMap<>();

四、消息发送流程

以下代码是生产者发送消息SendResult send(Message msg, long timeout) 方法为入口讲解。其发送消息的核心实现方法是org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl,其调用链如下图所示。发送主要步骤是:验证消息、查找topic路由信息、选择消息队列、消息发送(包括异常处理)。

1.验证消息

org.apache.rocketmq.client.Validators#checkMessage方法检查消息是否符合规范:

  • 主题是否规范:Topic不能为空字符串、长度不能超出127

  • 消息体是否规范:消息体不能null、不能为空字符串、长度不能超出4MB

// 检查消息是否符合规范:topic规范、消息体不能为空、消息体长度默认不能大于4MB
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
    if (null == msg) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    // topic
    Validators.checkTopic(msg.getTopic());
    Validators.isNotAllowedSendTopic(msg.getTopic());

    // body
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }

    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }

    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
            "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
    }
}

2.查找主题路由信息

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo方法是获取主题路由信息。从生产者本地缓存的topicPublishInfoTable表获取,没有找到则尝试创建或更新到NameServer,则再从NameServer查找,若获取不到,则根据isDefault是否采用默认主题路由来获取,再找不到,则抛出异常

获取主题路由信息TopicPublishInfo的目的是:最终获取topic下该生产组的Broker路由信息TopicRouteData(获取到Broker地址及存储的消费队列)

/**
 * 获取主题路由发布信息
 * 获取逻辑:
 * step1: 本地生产者有缓存该topic路由信息和消息队列,则直接返回
 * step2: 本地生产者没有缓存,则从NameServer查找主题路由信息
 * step3:         没有缓存,从NameServer查找不到,则isDefault是否采用默认主题路由(defaultMQProducer.getCreateTopicKey() —— AUTO_CREATE_TOPIC_KEY_TOPIC)
 * @param topic 主题
 * @return 主题发布信息
 */
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // 获取生产者缓存的主题发布信息
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    // 生产者没有主题发布信息或没有消息队列,则创建并更新NameServer主题路由信息
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        // 本地生产者创建
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // 更新NameServer主题路由信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    // 本地有缓存,则直接获取
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // isDefault为true,采用默认主题发送消息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

3.选择消息队列

根据获取的主题发布信息,进而选择消息队列。当选择消息队列时,生产者和NameServer并不能立即知道Broker是否可用,原因是

  • NameServer检测Broker是否可用是有延迟的,最短为一次心跳检测间隔(5s);

  • 生产者每隔30s更新一次路由信息,NameServer不会检测到Broker岩机后立刻推送消息给生产者;

因此采用发送消息重试机制(同步次数:retryTimesWhenSendFailed,异步次数:retryTimesWhenSendAsyncFailed),同时引入一种机制,在Broker宕机期间,如果一次消息发送失败后,可以将该Broker暂时排除在消息队列的选择范围中。sendLatencyFaultEnable配置是否启用Broker故障延迟机制,默认不启用

1). 默认机制

没有启用Broker故障延迟机制,即:sendLatencyFaultEnable为false,采用默认机制,其实现方法org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String),其代码如下。

其算法是:没有失败的Broker,则随机获取一个;有失败的Broker,排除上次失败的Broker。该算法主要是获取上次发送失败的Broker,该缺点是:一个Broker可能有多个消费队列,下次选择可能还是失败Broker队列。则引入了Broker故障延迟机制

/**
 * 选择一个消息队列
 * @param lastBrokerName 上次发送失败的brokerName
 * @return 发送消息的消息队列
 */
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    // 没有broker发送失败
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    }
    // 有broker发送失败
    else {
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            // sendWhichQueue自增长再获取,原子操作
            int index = this.sendWhichQueue.incrementAndGet();
            // 取模,获取消息队列索引值
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            // 跳过失败的broker,该算法缺陷:多次获取到失败的broker(宕机broker中多个消息队列)
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}

public MessageQueue selectOneMessageQueue() {
    // sendWhichQueue自增长再获取,原子操作
    int index = this.sendWhichQueue.incrementAndGet();
    // 取模,获取消息队列索引值
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}

2).Broker故障延迟机制

Broker故障延迟机制核心类,如下:

启用Broker故障延迟机制,即:sendLatencyFaultEnable为true,则选择消息队列的核心方法是org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue,其代码如下。

/**
 * 选择一个消息队列
 * step1:是否启用故障延迟机制sendLatencyFaultEnable,默认false,不启用
 * step2:若启用,判断上次Broker是否可用
 * @param tpInfo 主题路由信息
 * @param lastBrokerName 上次发送失败的brokerName
 * @return 发送消息的消息队列
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // sendLatencyFaultEnable为true时,启动Broker故障延迟机制;默认false,不启用
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);

                // 判断broker是否可用,判断依据:当前时间是否在不可用持续时间(notAvailableDuration)范围内
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }

            // 从失败的broker中选择一个可用的broker,没有则返回null
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                }
                return mq;
            } else {
                // 移除可用的失败broker
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    // 不启用Broker故障延迟机制
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

4.消息发送

1).生产者消息发送

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl是消息发送核心方法,其代码如下。

/**
 * 发送消息
 * step1:从MQClientInstance获取broker地址,若没有,则从NameServer获取,再没有则抛出异常;
 * step2:不是批量发送消息,则为消息分配一个全局消息ID;
 * step3:设置消息是否压缩,消息体 > 4KB,则压缩;
 * step4:是否是事务Prepare消息,若是写入事务prepare标签;
 * step5:执行发送前钩子函数;
 * step6:根据不同发送方式,发送消息;
 * step7:执行发送后钩子函数;
 * @param msg 待发送消息
 * @param mq 选择的消息队列(消息发送到该队列)
 * @param communicationMode 发送消息模式,如:同步、异步、单向
 * @param sendCallback 异步发送消息回调函数
 * @param topicPublishInfo topic发布路由信息
 * @param timeout 发送超时时间
 * @return 发送结果
 * @throws MQClientException
 * @throws RemotingException
 * @throws MQBrokerException
 * @throws InterruptedException
 */
private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();

    // 从MQClientInstance获取broker地址,若没有,则从NameServer获取,再没有则抛出异常
    String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
    if (null == brokerAddr) {
        // 从NameServer获取
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();
        try {
            //for MessageBatch,ID has been set in the generating process
            // 不是批量发送消息,则为消息分配一个全局消息ID
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }

            // 设置消息instanceId
            boolean topicWithNamespace = false;
            if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                topicWithNamespace = true;
            }

            // 设置消息是否压缩,消息体 > 4KB,则压缩
            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG; // |= ,是二进制位或运算,如:a = a | b
                sysFlag |= compressType.getCompressionFlag();
                msgBodyCompressed = true;
            }

            // 是否是事务Prepare消息
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; // 事务消息标签
            }

            // 判断是否注册了禁止消息发送钩子函数,即:判定checkForbiddenHookList不为空
            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }

            // 判断是否注册了消息发送钩子函数,即:判定sendMessageHookList不为空
            // 注册钩子函数:DefaultMQProducerImpl.registerSendMessageHook
            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                context.setNamespace(this.defaultMQProducer.getNamespace());
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }

            // 创建发送消息请求头
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); // 默认主题在该broker的队列数
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag); // 系统标记
            requestHeader.setBornTimestamp(System.currentTimeMillis()); // 消息发送时间
            requestHeader.setFlag(msg.getFlag()); // 消息标记,目前不做处理
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0); // 消息重试次数
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch); // 是否批量消息
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }

                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }

            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        msg.setBody(prevBody);
                    }

                    if (topicWithNamespace) {
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }

                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        brokerName,
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    // 发送消息是否超时
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    // 发送消息请求
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        brokerName,
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

            // 执行发送后钩子函数
            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }

            return sendResult;
        } catch (RemotingException e) {
            // 执行发送后钩子函数
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }

    // 没有broker地址,则抛出异常
    throw new MQClientException("The broker[" + brokerName + "] not exist", null);
}

执行发送消息请求,根据同步、异步、单向发送方式进行网络传输,其发送消息请求码都是RequestCode.SEND_MESSAGE。其实现方法是org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage,其代码如下。

/**
 * 发送消息请求
 * @param addr broker地址
 * @param brokerName broker名称
 * @param msg 待发送消息 {@link Message}
 * @param requestHeader 发送消息请求头 {@link SendMessageRequestHeader}
 * @param timeoutMillis 发送超时时间
 * @param communicationMode 发送消息模式,如:同步、异步、单向
 * @param sendCallback 异步发送消息回调函数 {@link SendCallback}
 * @param topicPublishInfo topic发布路由信息 {@link TopicPublishInfo}
 * @param instance {@link MQClientInstance}
 * @param retryTimesWhenSendFailed 重试次数
 * @param context {@link SendMessageContext}
 * @param producer {@link DefaultMQProducerImpl}
 * @return
 * @throws RemotingException
 * @throws MQBrokerException
 * @throws InterruptedException
 */
public SendResult sendMessage(
    final String addr,
    final String brokerName,
    final Message msg,
    final SendMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    RemotingCommand request = null;

    // 根据消息类型,判断发送请求码
    String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
    boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
    if (isReply) {
        if (sendSmartMsg) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
        }
    } else {
        // 批量消息发送,请求码SEND_BATCH_MESSAGE
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        }
        // 单个消息发送,请求码SEND_MESSAGE
        else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }
    }
    request.setBody(msg.getBody()); // 请求体,就是消息体

    /*
        发送消息请求到broker端
        broker端处理发送消息总入口:SendMessageProcessor.processRequest
     */
    switch (communicationMode) {
        case ONEWAY:
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);

            return null;
        case ASYNC:
            final AtomicInteger times = new AtomicInteger();

            long costTimeAsync = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTimeAsync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }

            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                retryTimesWhenSendFailed, times, context, producer);

            return null;
        case SYNC:

            long costTimeSync = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTimeSync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }

            return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
        default:
            assert false;
            break;
    }

    return null;
}

2).Broker接收发送消息

org.apache.rocketmq.broker.processor.SendMessageProcessor是Broker接收消息的实现类,其代码如下。

// broker处理发送消息的请求
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    SendMessageContext traceContext;
    switch (request.getCode()) {
        // 消费段消费ACK确认发送请求入口MQClientAPIImpl.consumerSendMessageBack
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.consumerSendMsgBack(ctx, request);
        default:
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return null;
            }
            // broker的topic映射队列上下文
            TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true);
            RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
            if (rewriteResult != null) {
                return rewriteResult;
            }
            traceContext = buildMsgContext(ctx, requestHeader);
            String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
            traceContext.setCommercialOwner(owner);

            // 执行发送前钩子函数
            try {
                this.executeSendMessageHookBefore(ctx, request, traceContext);
            } catch (AbortProcessException e) {
                final RemotingCommand errorResponse = RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage());
                errorResponse.setOpaque(request.getOpaque());
                return errorResponse;
            }

            RemotingCommand response;
            // 批量消息
            if (requestHeader.isBatch()) {
                response = this.sendBatchMessage(ctx, request, traceContext, requestHeader, mappingContext,
                    (ctx1, response1) -> executeSendMessageHookAfter(response1, ctx1));
            }
            // 单个消息
            else {
                response = this.sendMessage(ctx, request, traceContext, requestHeader, mappingContext,
                    (ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12));
            }

            return response;
    }
}

五、参考资料

https://www.cnblogs.com/wzh2010/p/16629876.html

https://blog.csdn.net/m0_37543627/article/details/128542505

https://blog.csdn.net/m0_37543627/article/details/128551723

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/187594.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

零代码实现EDI标准报文转换

在与客户进行沟通的时候&#xff0c;经常有客户对EDI实施很感兴趣&#xff0c;一方面是客户具有相应的IT基础和技术力量&#xff0c;并且后续可能会有更多合作伙伴的EDI接入&#xff0c;因此客户有自主实施的想法&#xff1b;另一方面也可以在一定程度上为企业节约成本。 知行…

谷歌seo排名需要的链接数量?谷歌seo排名需要多久?

本文主要分享要实现谷歌排名需要多少条英文外链&#xff0c;以及时间成本的预估。 本文由光算创作&#xff0c;有可能会被修改或剽窃&#xff0c;我们佛系对待这种行为吧。 谷歌seo排名需要的链接数量是多少&#xff1f; 答案是&#xff1a;需要1000~2000条GPB外链 为什么一…

对数据中台的梳理与思考

Gartmer:《数据中台在中国已经接近炒作的顶峰》 PowerData&#xff1a;接近顶峰?那就说明还有上升的空间嘛 本篇文章聊聊数据中台爆火背后的逻辑。 一、概念篇 1、什么是中台 中台是将系统的通用化能力进行打包整合&#xff0c;通过接口的形式赋能到外部系统&#xff0c;从而…

嵌入式Linux-线程的回收/取消/分离

1. 线程的回收 1.1 回收线程的概念 春节七天连假已经过完啦&#xff0c;也该回收一下我们放假的线程了&#xff01; 听过很多回收旧手机、旧冰箱和旧彩电…&#xff0c;那么回收线程又是什么呢&#xff1f; 在父、子进程当中&#xff0c;父进程可通过 wait()函数&#xff08;…

尚硅谷谷粒商城Rabbit MQ

文章目录1. 概述2. 相关概念2.1 RabbitMQ简介&#xff1a;2.2核心概念2.2.1 Message2.2.2 Publisher2.2.3 Exchange2.2.4 Queue2.2.5 Binding2.2.6Connection2.2.7 Channel2.2.8 Consumer2.2.9Virtual Host2.2.10Broker3.Docker安装rabbit MQ4、RabbitMQ运行机制4.1AMQP 中的消…

【信管10.2】规划识别风险及定性分析

规划识别风险及定性分析了解完风险相关的知识以及项目风险的管理过程之后&#xff0c;我们就进入到每个风险过程的学习。风险管理过程的内容并不算少&#xff0c;直逼范围、进度、成本、质量四大核心模块&#xff0c;也是我们需要重点关注的内容。当年的论文我写得就是风险管理…

IDEA中Maven打包遇到的问题

问题1 问题描述 使用Maven进行打包&#xff0c;点击package&#xff0c;Run控制台的信息出现中文乱码的情况 解决方法 -DarchetypeCataloginternal -Dfile.encodingGBK问题2 问题描述 程序能够正常运行&#xff0c;但是使用Maven对程序进行打包&#xff0c;在编译过程中出现…

注册Github账号详细教程【超详细篇 适合新手入门】

前言 &#x1f4dc; “ 作者 久绊A ” 专注记录自己所整理的Java、web、sql等&#xff0c;IT技术干货、学习经验、面试资料、刷题记录&#xff0c;以及遇到的问题和解决方案&#xff0c;记录自己成长的点滴 目录 一、GitHub的简介 1、大概介绍 2、详细介绍 二、如何注册自己…

算法训练营 day29 回溯算法 组合总和III 电话号码的字母组合

算法训练营 day29 回溯算法 组合总和III 电话号码的字母组合 组合总和III 216. 组合总和 III - 力扣&#xff08;LeetCode&#xff09; 找出所有相加之和为 n 的 k 个数的组合&#xff0c;且满足下列条件&#xff1a; 只使用数字1到9 每个数字 最多使用一次 返回 所有可能的…

16.Map系列、集合嵌套、不可变集合

目录 一.Map 1.1 Map集合概述 1.2 Map集合体系 1.3 Map集合体系特点 1.4 Map集合实现类特点 1.5 Map集合的API 1.6 Map集合的遍历方式 1.6.1 键找值的方式遍历 1.6.2 键值对的方式遍历 1.6.3 Lambda表达式的方式 1.7 HashMap 1.7.1 HashMap的特点 1.7.2 底层原理 …

python求不同分辨率图像的峰值信噪比,一文搞懂

可以使用 Python 的 NumPy 和 OpenCV 库来实现这个任务。提前准备一张图片作为素材。 文章目录什么是峰值信噪比PSNR 峰值信噪比补充说明使用 OpenCV 库来实现这个任务PSNR 的计算值受图像的亮度影响计算不同分辨率图像的 PSNRpython 求不同分辨率图像的峰值信噪比 | 其他知识点…

Java面试题:finalize的原理和工作缺点是什么

finalize是 Object 中的一个方法&#xff0c;如果子类重写它&#xff0c;垃圾回收时此方法会被调用&#xff0c;可以在其中进行资源释放和清理工作。其次将资源释放和清理放在 finalize 方法中非常不好&#xff0c;非常影响性能&#xff0c;严重时甚至会引起 OOM&#xff0c;从…

LabVIEW对NI Linux RT应用程序性能进行基准测试

LabVIEW对NI Linux RT应用程序性能进行基准测试如果应用程序具有苛刻的性能要求&#xff0c;则应为应用程序创建性能基准测试&#xff0c;以确保它满足性能要求。性能要求高度依赖于应用程序&#xff0c;应确定哪些性能指标很重要。下面介绍了典型的实时应用程序性能指标。如果…

USBIP

USBIP USB/IP 是一个开源项目&#xff0c;已合入 Kernel&#xff0c;在 Linux 环境下可以通过使用 USB/IP 远程共享 USB 设备。 USB Client&#xff1a;使用USB的终端&#xff0c;将server共享的usb设备挂载到本地。 USB Server&#xff1a;分享本地的usb设备至远程。 USBIP…

Python的入门知识汇集

创建 Python的创始人为Guido van Rossum。1989年圣诞节期间,Guido为了打发圣诞节的无趣,决心开发一个新的脚本解释程序,做为ABC 语言的一种继承。之所以选中Python(大蟒蛇的意思)作为程序的名字,是因为他是一个叫Monty Python的喜剧团体的爱好者。 什么是Pyhton Pytho…

委派模式——从SLF4J说起

作者&#xff1a;vivo 互联网服务器团队- Xiong yangxin 将某个通用解决方案包装成成熟的工具包&#xff0c;是每一个技术建设工作者必须思考且必须解决的问题。本文从业内流行的既有工具包入手&#xff0c;解析实现思路&#xff0c;沉淀一般方法。为技术建设的初学者提供一些实…

Gorm连接以及CURD实战+测试

Gorm CRUD 前言 Gorm是go的一个ORM框架&#xff0c;官方文档地址为-> GORM 指南 本文将介绍与gorm有关的CRUD操作&#xff0c;操作数据库类型为mysql数据库 数据库连接 func Open(dialector Dialector, opts …Option) (db *DB, err error) 该函数用于进行gorm连接对应…

中国市场手机出货量跌穿3亿部,苹果也顶不住了,只有三星暗爽

多家市调机构都给出了2022年中国智能手机市场的数据&#xff0c;数据虽然有些出入&#xff0c;不过都认为中国市场的手机出货量跌穿了3亿部&#xff0c;创下近10年来的新低纪录&#xff0c;国产手机尤其惨&#xff0c;而曾逆势增长的苹果也开始出现下滑。市调机构IDC给出的数据…

词法分析器Flex学习1 - Flex识别关键字

以前曾写过2篇Flex和Bison入门应用的文章&#xff1b; https://blog.csdn.net/bcbobo21cn/article/details/112343850 https://blog.csdn.net/bcbobo21cn/article/details/106193648 我只记得Flex是词法分析器&#xff0c;Bison是语法分析器&#xff1b; 只是一些入门的介绍&…

基于SSM+Layui的图书管理系统(Java版 附源代码及数据库)

目录 功能要求 技术栈 项目架构 登录界面 系统首页 借阅管理 图书管理 读者管理 类型管理 公告管理 管理员管理 统计分析 数据库设计 源代码数据库资料 毕设项目专栏 功能要求 &#xff08;1&#xff09;对系统登陆后进行增删改查功能 &#xff08;2&#xff09;…