RocketMQ5.0--消息发送

news2024/9/23 11:24:15

RocketMQ5.0–消息发送

一、消息

// 消息所属topic
private String topic;
// 消息Flag(RocketMQ不作处理),即:用户处理
private int flag;
// 扩展属性
private Map<String, String> properties;
// 消息体
private byte[] body;
// 事务ID
private String transactionId;

二、消息生产者

1.生产者UML

默认消息生产者org.apache.rocketmq.client.producer.DefaultMQProducer。消息生产者两个实现类:默认生产者DefaultMQProducer、事务消息生产者TransactionMQProducer。

在这里插入图片描述
2.DefaultMQProducer关键属性

// 封装生产者内部实现方法
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
 
// 生产组,默认DEFAULT_PRODUCER
private String producerGroup;
// 创建消息队列的数量,默认4
private volatile int defaultTopicQueueNums = 4;
// 发送消息超时时间,默认3s
private int sendMsgTimeout = 3000;
// 消息体大小超出4K则压缩消息体
private int compressMsgBodyOverHowmuch = 1024 * 4;
// 同步模式下最大发送消息次数
private int retryTimesWhenSendFailed = 2;
// 异步模式下最大发送消息次数
private int retryTimesWhenSendAsyncFailed = 2;
// 发送失败时,是否选择任意一个Broker发送,默认否
private boolean retryAnotherBrokerWhenNotStoreOK = false;
// 最大发送消息体的大小,默认4MB
private int maxMessageSize = 1024 * 1024 * 4;

3.发送消息方法

RocketMQ支持3种消息发送方式:

  • 同步(sync):发送消息时,同步等待直到消息服务器返回发送结果。
  • 异步(async):发送消息时,指定消息发送成功后的回调函数,然后发送,立即返回,消息发送者线程不阻塞,消息发送成功或失败的回调任务在一个新的线程中执行。
  • 单向(oneway):发送消息后直接返回,不等待Broker的结果,也不注册回调函数,即:只发送,不在乎消息是否成功存储到Broker。

以下是消息发送方法,其中:

  • SendCallback sendCallback:含有SendCallback类的,是指定回调方法,属于异步方式发送消息;
  • MessageQueue messageQueue:指定发送消息存储到哪个消费队列;
  • MessageQueueSelector selector:消息队列选择器,指定发送哪个消费队列;
  • long timeout:发送超时时间;
  • Object arg:传入到MessageQueueSelector的用户参数;
  • Collection msgs:批量发送消息;
  • sendMessageInTransaction()方法:发送事务消息,生产者为TransactionMQProducer;
  • sendOneway():单向方式发送消息。
    在这里插入图片描述
    三、生产者启动流程

生产者启动方法org.apache.rocketmq.client.producer.DefaultMQProducer#start,其方法调用链如下。
在这里插入图片描述
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)是核心方法,如下代码所示。DefaultMQProducerImpl维护生产者示例的ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable(主题映射的发布信息)。

// 启动生产者
public void start(final boolean startFactory) throws MQClientException {
    // 默认仅创建,不启动,即:创建DefaultMQProducerImpl.serviceState = CREATE_JUST
    switch (this.serviceState) {
        // 仅创建,不启动
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
 
            // 检查生产组名称是否符合规范:最大长度为255,不含特殊字符
            this.checkConfig();
            // 生产者的instanceName为进程ID
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
 
            /*
                获取或创建MQClientInstance:
                a. 当前生产者获取clientId
                b. 根据clientId获取或创建MQClientInstance
             */
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
 
            // 当前生产者注册到MQClientInstance(维护所有生产者)
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
 
            // 当前生产者,topic发布信息
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
 
            // 启动MQClientInstance
            if (startFactory) {
                mQClientFactory.start();
            }
 
            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
 
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
 
    RequestFutureHolder.getInstance().startScheduledTask(this);
 
}

注意,整个JVM实例中只存在一个MQClientManager去维护一个MQClientlnstance缓存表ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable,即:同一个clientId只会创建一个MQClientInstance。而clientId组成:当前生产者服务器IP + instanceName(PID#时间戳) + unitName( 可选)。从clientId组成看出,单个JVM有且只有一个MQClientInstance实例。多个生产者和多个消费者共用一个MQClientInstance实例。

如下代码所示MQClientInstance维护的不同容器。示例启动后,定时任务周期维护生产者和消费者的路由信息、Topic发布信息等。

// 生产者容器
private final ConcurrentMap<String/* 生产组 */, MQProducerInner> producerTable = new ConcurrentHashMap<>();
// 消费者容器
private final ConcurrentMap<String/* 消费组 */, MQConsumerInner> consumerTable = new ConcurrentHashMap<>();
// adminExt容器
private final ConcurrentMap<String, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<>();
// 主题的路由信息容器
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<>();
// 主题消费队列容器
private final ConcurrentMap<String/* Topic */, ConcurrentMap<MessageQueue, String/*brokerName*/>> topicEndPointsTable = new ConcurrentHashMap<>();

四、消息发送流程

以下代码是生产者发送消息SendResult send(Message msg, long timeout) 方法为入口讲解。其发送消息的核心实现方法是org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl,其调用链如下图所示。发送主要步骤是:验证消息、查找topic路由信息、选择消息队列、消息发送(包括异常处理)。
在这里插入图片描述
1.验证消息

org.apache.rocketmq.client.Validators#checkMessage方法检查消息是否符合规范:

  • 主题是否规范:Topic不能为空字符串、长度不能超出127
  • 消息体是否规范:消息体不能null、不能为空字符串、长度不能超出4MB
// 检查消息是否符合规范:topic规范、消息体不能为空、消息体长度默认不能大于4MB
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
    if (null == msg) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    // topic
    Validators.checkTopic(msg.getTopic());
    Validators.isNotAllowedSendTopic(msg.getTopic());
 
    // body
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }
 
    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }
 
    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
            "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
    }
}

2.查找主题路由信息

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo方法是获取主题路由信息。从生产者本地缓存的topicPublishInfoTable表获取,没有找到则尝试创建或更新到NameServer,则再从NameServer查找,若获取不到,则根据isDefault是否采用默认主题路由来获取,再找不到,则抛出异常。

获取主题路由信息TopicPublishInfo的目的是:最终获取topic下该生产组的Broker路由信息TopicRouteData(获取到Broker地址及存储的消费队列)。

/**
 * 获取主题路由发布信息
 * 获取逻辑:
 * step1: 本地生产者有缓存该topic路由信息和消息队列,则直接返回
 * step2: 本地生产者没有缓存,则从NameServer查找主题路由信息
 * step3:         没有缓存,从NameServer查找不到,则isDefault是否采用默认主题路由(defaultMQProducer.getCreateTopicKey() —— AUTO_CREATE_TOPIC_KEY_TOPIC)
 * @param topic 主题
 * @return 主题发布信息
 */
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // 获取生产者缓存的主题发布信息
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    // 生产者没有主题发布信息或没有消息队列,则创建并更新NameServer主题路由信息
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        // 本地生产者创建
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // 更新NameServer主题路由信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
 
    // 本地有缓存,则直接获取
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // isDefault为true,采用默认主题发送消息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

3.选择消息队列

根据获取的主题发布信息,进而选择消息队列。当选择消息队列时,生产者和NameServer并不能立即知道Broker是否可用,原因是:

  • NameServer检测Broker是否可用是有延迟的,最短为一次心跳检测间隔(5s);
  • 生产者每隔30s更新一次路由信息,NameServer不会检测到Broker岩机后立刻推送消息给生产者;

因此采用发送消息重试机制(同步次数:retryTimesWhenSendFailed,异步次数:retryTimesWhenSendAsyncFailed),同时引入一种机制,在Broker宕机期间,如果一次消息发送失败后,可以将该Broker暂时排除在消息队列的选择范围中。sendLatencyFaultEnable配置是否启用Broker故障延迟机制,默认不启用。

1). 默认机制

没有启用Broker故障延迟机制,即:sendLatencyFaultEnable为false,采用默认机制,其实现方法org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String),其代码如下。

其算法是:没有失败的Broker,则随机获取一个;有失败的Broker,排除上次失败的Broker。该算法主要是获取上次发送失败的Broker,该缺点是:一个Broker可能有多个消费队列,下次选择可能还是失败Broker队列。则引入了Broker故障延迟机制。

/**
 * 选择一个消息队列
 * @param lastBrokerName 上次发送失败的brokerName
 * @return 发送消息的消息队列
 */
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    // 没有broker发送失败
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    }
    // 有broker发送失败
    else {
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            // sendWhichQueue自增长再获取,原子操作
            int index = this.sendWhichQueue.incrementAndGet();
            // 取模,获取消息队列索引值
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            // 跳过失败的broker,该算法缺陷:多次获取到失败的broker(宕机broker中多个消息队列)
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}
 
public MessageQueue selectOneMessageQueue() {
    // sendWhichQueue自增长再获取,原子操作
    int index = this.sendWhichQueue.incrementAndGet();
    // 取模,获取消息队列索引值
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}

2).Broker故障延迟机制

Broker故障延迟机制核心类,如下:
在这里插入图片描述
启用Broker故障延迟机制,即:sendLatencyFaultEnable为true,则选择消息队列的核心方法是org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue,其代码如下。

/**
 * 选择一个消息队列
 * step1:是否启用故障延迟机制sendLatencyFaultEnable,默认false,不启用
 * step2:若启用,判断上次Broker是否可用
 * @param tpInfo 主题路由信息
 * @param lastBrokerName 上次发送失败的brokerName
 * @return 发送消息的消息队列
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // sendLatencyFaultEnable为true时,启动Broker故障延迟机制;默认false,不启用
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
 
                // 判断broker是否可用,判断依据:当前时间是否在不可用持续时间(notAvailableDuration)范围内
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }
 
            // 从失败的broker中选择一个可用的broker,没有则返回null
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                }
                return mq;
            } else {
                // 移除可用的失败broker
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
 
        return tpInfo.selectOneMessageQueue();
    }
 
    // 不启用Broker故障延迟机制
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

4.消息发送

1).生产者消息发送

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl是消息发送核心方法,其代码如下。

/**
 * 发送消息
 * step1:从MQClientInstance获取broker地址,若没有,则从NameServer获取,再没有则抛出异常;
 * step2:不是批量发送消息,则为消息分配一个全局消息ID;
 * step3:设置消息是否压缩,消息体 > 4KB,则压缩;
 * step4:是否是事务Prepare消息,若是写入事务prepare标签;
 * step5:执行发送前钩子函数;
 * step6:根据不同发送方式,发送消息;
 * step7:执行发送后钩子函数;
 * @param msg 待发送消息
 * @param mq 选择的消息队列(消息发送到该队列)
 * @param communicationMode 发送消息模式,如:同步、异步、单向
 * @param sendCallback 异步发送消息回调函数
 * @param topicPublishInfo topic发布路由信息
 * @param timeout 发送超时时间
 * @return 发送结果
 * @throws MQClientException
 * @throws RemotingException
 * @throws MQBrokerException
 * @throws InterruptedException
 */
private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
 
    // 从MQClientInstance获取broker地址,若没有,则从NameServer获取,再没有则抛出异常
    String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
    if (null == brokerAddr) {
        // 从NameServer获取
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
    }
 
    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
 
        byte[] prevBody = msg.getBody();
        try {
            //for MessageBatch,ID has been set in the generating process
            // 不是批量发送消息,则为消息分配一个全局消息ID
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }
 
            // 设置消息instanceId
            boolean topicWithNamespace = false;
            if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                topicWithNamespace = true;
            }
 
            // 设置消息是否压缩,消息体 > 4KB,则压缩
            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG; // |= ,是二进制位或运算,如:a = a | b
                sysFlag |= compressType.getCompressionFlag();
                msgBodyCompressed = true;
            }
 
            // 是否是事务Prepare消息
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; // 事务消息标签
            }
 
            // 判断是否注册了禁止消息发送钩子函数,即:判定checkForbiddenHookList不为空
            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }
 
            // 判断是否注册了消息发送钩子函数,即:判定sendMessageHookList不为空
            // 注册钩子函数:DefaultMQProducerImpl.registerSendMessageHook
            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                context.setNamespace(this.defaultMQProducer.getNamespace());
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }
 
                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }
 
            // 创建发送消息请求头
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); // 默认主题在该broker的队列数
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag); // 系统标记
            requestHeader.setBornTimestamp(System.currentTimeMillis()); // 消息发送时间
            requestHeader.setFlag(msg.getFlag()); // 消息标记,目前不做处理
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0); // 消息重试次数
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch); // 是否批量消息
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }
 
                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }
 
            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        msg.setBody(prevBody);
                    }
 
                    if (topicWithNamespace) {
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }
 
                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        brokerName,
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    // 发送消息是否超时
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    // 发送消息请求
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        brokerName,
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }
 
            // 执行发送后钩子函数
            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }
 
            return sendResult;
        } catch (RemotingException e) {
            // 执行发送后钩子函数
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }
 
    // 没有broker地址,则抛出异常
    throw new MQClientException("The broker[" + brokerName + "] not exist", null);
}

执行发送消息请求,根据同步、异步、单向发送方式进行网络传输,其发送消息请求码都是RequestCode.SEND_MESSAGE。其实现方法是org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage,其代码如下。

/**
 * 发送消息请求
 * @param addr broker地址
 * @param brokerName broker名称
 * @param msg 待发送消息 {@link Message}
 * @param requestHeader 发送消息请求头 {@link SendMessageRequestHeader}
 * @param timeoutMillis 发送超时时间
 * @param communicationMode 发送消息模式,如:同步、异步、单向
 * @param sendCallback 异步发送消息回调函数 {@link SendCallback}
 * @param topicPublishInfo topic发布路由信息 {@link TopicPublishInfo}
 * @param instance {@link MQClientInstance}
 * @param retryTimesWhenSendFailed 重试次数
 * @param context {@link SendMessageContext}
 * @param producer {@link DefaultMQProducerImpl}
 * @return
 * @throws RemotingException
 * @throws MQBrokerException
 * @throws InterruptedException
 */
public SendResult sendMessage(
    final String addr,
    final String brokerName,
    final Message msg,
    final SendMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    RemotingCommand request = null;
 
    // 根据消息类型,判断发送请求码
    String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
    boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
    if (isReply) {
        if (sendSmartMsg) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
        }
    } else {
        // 批量消息发送,请求码SEND_BATCH_MESSAGE
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        }
        // 单个消息发送,请求码SEND_MESSAGE
        else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }
    }
    request.setBody(msg.getBody()); // 请求体,就是消息体
 
    /*
        发送消息请求到broker端
        broker端处理发送消息总入口:SendMessageProcessor.processRequest
     */
    switch (communicationMode) {
        case ONEWAY:
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
 
            return null;
        case ASYNC:
            final AtomicInteger times = new AtomicInteger();
 
            long costTimeAsync = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTimeAsync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
 
            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                retryTimesWhenSendFailed, times, context, producer);
 
            return null;
        case SYNC:
 
            long costTimeSync = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTimeSync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
 
            return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
        default:
            assert false;
            break;
    }
 
    return null;
}

2).Broker接收发送消息

org.apache.rocketmq.broker.processor.SendMessageProcessor是Broker接收消息的实现类,其代码如下。

// broker处理发送消息的请求
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    SendMessageContext traceContext;
    switch (request.getCode()) {
        // 消费段消费ACK确认发送请求入口MQClientAPIImpl.consumerSendMessageBack
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.consumerSendMsgBack(ctx, request);
        default:
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return null;
            }
            // broker的topic映射队列上下文
            TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true);
            RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
            if (rewriteResult != null) {
                return rewriteResult;
            }
            traceContext = buildMsgContext(ctx, requestHeader);
            String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
            traceContext.setCommercialOwner(owner);
 
            // 执行发送前钩子函数
            try {
                this.executeSendMessageHookBefore(ctx, request, traceContext);
            } catch (AbortProcessException e) {
                final RemotingCommand errorResponse = RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage());
                errorResponse.setOpaque(request.getOpaque());
                return errorResponse;
            }
 
            RemotingCommand response;
            // 批量消息
            if (requestHeader.isBatch()) {
                response = this.sendBatchMessage(ctx, request, traceContext, requestHeader, mappingContext,
                    (ctx1, response1) -> executeSendMessageHookAfter(response1, ctx1));
            }
            // 单个消息
            else {
                response = this.sendMessage(ctx, request, traceContext, requestHeader, mappingContext,
                    (ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12));
            }
 
            return response;
    }
}

五、参考资料
https://www.cnblogs.com/wzh2010/p/16629876.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/728342.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Pandas+Pyecharts | 北京近五年历史天气数据可视化

文章目录 &#x1f3f3;️‍&#x1f308; 1. 导入模块&#x1f3f3;️‍&#x1f308; 2. Pandas数据处理2.1 读取数据2.2 处理最低气温最高气温数据2.3 处理日期数据2.4 处理风力风向数据 &#x1f3f3;️‍&#x1f308; 3. Pyecharts数据可视化3.1 2018-2022年历史温度分布…

漏洞复现 || H3C iMC 存在远程命令执行

免责声明 技术文章仅供参考,任何个人和组织使用网络应当遵守宪法法律,遵守公共秩序,尊重社会公德,不得利用网络从事危害国家安全、荣誉和利益,未经授权请勿利用文章中的技术资料对任何计算机系统进行入侵操作。利用此文所提供的信息而造成的直接或间接后果和损失,均由使…

软件的验收测试应该怎么正确实施?

验收测试的主要目的是为了确定软件系统是否满足客户或最终用户的需求和期望&#xff0c;同时确保软件产品的质量标准达到预期。验收测试还可以提供客户和最终用户关于软件系统质量的反馈和建议&#xff0c;以便软件开发团队能够更好地改进和优化软件产品&#xff0c;那软件的验…

【QT】QtXlsx安装使用

QtXlsx库 QtXlsx介绍QtXlsx Qt配置简单使用示例 QtXlsx介绍 QtXlsx是一个可以读取和写入Excel文件的库。它不需要Microsoft Excel&#xff0c;可以在Qt5支持的任何平台上使用。 这里一定是需要QT5支持的。 生成一个新的 .xlsx 文件从现有的 .xlsx 文件中提取数据编辑现有的 .x…

Linux常用指令(下)

目录 一&#xff1a;Linux基础指令 查看联机手册 文本查看相关 时间相关 查找相关 打包和压缩相关 查看Linux版本和体系 其它指令和热键 二&#xff1a;重定向 输入重定向 输出重定向 三&#xff1a;管道 一&#xff1a;Linux基础指令 查看联机手册 Linux的命令有…

ADS笔记,新旧两组仿真数据进行绘图和列表对比

做个笔记&#xff0c;以防遗忘 ADS版本&#xff1a;2023 原理图器件参数的不同&#xff0c;怎么进行对比观看&#xff0c;操作如下 目录 一、数据绘图对比二、数据列表对比 一、数据绘图对比 选择Simulation Setting 然后修改原理图器件的参数&#xff0c;再次重复之前的操作…

SpringBoot2+Vue2实战(十三)用户前台页面设计与实现

Front.vue <template><div><!--头部--><div style"display: flex; height: 60px;line-height: 60px;border-bottom: 1px solid #ccc"><div style"width: 300px;display: flex;padding-left: 30px"><div style"widt…

CENTOS上的网络安全工具(二十七)SPARK+NetSA Security Tools容器化部署(3)

上回说到在我们搭好的YAF3环境上使用yaf处理pcap文件得到silk flow&#xff0c;再使用super mediator工具转为ipfix&#xff0c;继而在spark中导入mothra&#xff0c;就可以开始数据分析了。然而在我们粗粗一用之下&#xff0c;却发现DPI信息在ipfix文件中找不到&#xff0c;到…

【Excel】csv乱码

原因 CSV用UTF-8编码 Excel用ANSI编码 解决 1 创建一个新的Excel 2 数据 > 从文本/CSV 3 选择文件 4 选择 文件原始格式 和 分隔符 &#xff08;根据自己文件进行选择&#xff0c;如果不知道编码&#xff0c;可以一个一个的试&#xff0c;直到不出现乱码&#xff09;

【Go|第5期】Lorca无法正常运行的解决方案

日期&#xff1a;2023年7月5日 作者&#xff1a;Commas 签名&#xff1a;(ง •_•)ง 积跬步以致千里,积小流以成江海…… 注释&#xff1a;如果您觉得有所帮助&#xff0c;帮忙点个赞&#xff0c;也可以关注我&#xff0c;我们一起成长&#xff1b;如果有不对的地方&#xff…

奇怪的SQL问题+1

我的 VIP 用户又抛给我一个 SQL 问题&#xff0c;我很激动&#xff0c;因为素材又来了&#xff1a; 我一看&#xff0c;这个表没什么花头&#xff0c;不就是没设置主键吗&#xff0c;MySQL 会默认生成一个主键&#xff0c;这跟 delete 不掉数据好像也没啥关系。 然后他说&…

事件监听及DOM操作

1.页面内容实现 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>常见事件案例</title> </head> <body><img id"light" src"img/off.gif"> <br><…

红黑树的介绍

红黑树 1.红黑树的概念2. 红黑树的性质3. 红黑树的结点定义4. 红黑树的插入操作情况一: cur为红&#xff0c;p为红&#xff0c;g为黑&#xff0c;u存在且为红情况二: cur为红&#xff0c;p为红&#xff0c;g为黑&#xff0c;u不存在/u存在且为黑情况三: cur为红&#xff0c;p为…

Distributional Graphormer:从分子结构预测到平衡分布预测

编者按&#xff1a;近年来&#xff0c;深度学习技术在分子微观结构预测中取得了巨大的进展。然而&#xff0c;分子的宏观属性和功能往往取决于分子结构在平衡态下的分布&#xff0c;仅了解分子的微观结构还远远不够。在传统的统计力学中&#xff0c;分子动力学模拟或增强采样等…

【计算机视觉 | 目标检测】arxiv 计算机视觉关于目标检测的学术速递(7 月 6 日论文合集)

文章目录 一、检测相关(16篇)1.1 Large-scale Detection of Marine Debris in Coastal Areas with Sentinel-21.2 Unbalanced Optimal Transport: A Unified Framework for Object Detection1.3 Detecting Images Generated by Deep Diffusion Models using their Local Intrin…

Oracle单行函数(字符,数值,日期,转换)

Oracle单行函数&#xff08;字符&#xff0c;数值&#xff0c;日期&#xff0c;转换&#xff09; 前言 1、字符函数 1.1大小写转换函数 1.2连接字符串X和concat(X,Y) 1.3ASCII码与字符转换 1.4返回字符串索引位置&#xff1a;instr(x,str) 1.5返回字符串长度&#xff1a;length…

使用Plotly创建自定义指标图表

大家好&#xff0c;使用Plotly可以创建和自定义指标图表&#xff0c;本文中将介绍如何使用Plotly库创建指标图表的具体操作步骤。 Plotly简介 Plotly是一个强大的数据可视化工具&#xff0c;允许我们使用Python创建各种交互式绘图和图表。在Plotly提供的无数类型的图表中&…

【MySQL】MySQL里程碑

个人主页&#xff1a;【&#x1f60a;个人主页】 系列专栏&#xff1a;【❤️MySQL】 文章目录 时间表从产品特性的角度梳理其发展过程中了解MySQL里程碑事件 时间表 从产品特性的角度梳理其发展过程中了解MySQL里程碑事件 1995年&#xff0c;MySQL 1.0发布&#xff0c;仅供内…

【LeetCode周赛】2022上半年题目精选集——贪心

文章目录 2136. 全部开花的最早一天&#xff08;贪心&#xff09;⭐⭐⭐⭐⭐思路代码语法解析&#xff1a;Integer[] id IntStream.range(0, plantTime.length).boxed().toArray(Integer[]::new); 2141. 同时运行 N 台电脑的最长时间&#xff08;贪心&#xff09;⭐⭐⭐⭐⭐解…

一分钟带你创建百万测试数据,玩转软件测试

准备测试数据是软件测试中非常重要的一个环节&#xff0c;无论是手工测试、动化测试还是性能测试&#xff0c;生成大量测试数据以评估性能是一项重要任务。 然而&#xff0c;寻找合适的测试数据并确保其质量常常是一项繁琐且耗时的工作。 先来看一下准备测试数据常见的四类方法…