RocketMQ源码阅读-Producer消息发送

news2025/4/28 8:28:12

RocketMQ源码阅读-Producer消息发送

  • 1. 从单元测试入手
  • 2. 启动过程
  • 3. 同步消息发送过程
  • 4. 异步消息发送过程
  • 5. 小结

Producer是消息的生产者。

Producer和Consummer对Rocket来说都是Client,Server是Broker。

客户端在源码中是一个单独的Model,目录为rocketmq/client。

类DefaultMQProducer是Producer的默认入口实现类。
image.png
继承了类ClientConfig,客户端配置类,存储上下文配置信息。
实现接口MQProducer:
image.png
定义了Producer对外提供的接口,也就是所有的发送消息的方法,同时MQProducer接口继承了MQAdmin接口。
如上图中MQAdmin是元数据管理接口,定义了对Message操作的一些方法。

DefaultMQProducer中有一个重要的成员变量:
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;

DefaultMQProducer的所有操作,基本没什么业务逻辑,都是调用DefaultMQProducerImpl类中的方法。

DefaultMQProducerImpl类是Producer操作的具体实现类。

1. 从单元测试入手

看源码的流程都是从单元测试入手

Producer的单元测试在类org.apache.rocketmq.client.producer.DefaultMQProducerTest中。
单元测试的所有方法,就对应着这个类的全部功能。
DefaultMQProducerTest的方法列表如下:

其中 init 和 terminate 是测试开始初始化和测试结束销毁时需要执行的代码。
其他的方法是测试不同功能的测试用例,也就是测试不同的发消息的方式。

2. 启动过程

从单元测试中的init方法入手:

@Before
public void init() throws Exception {
    String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
    // 创建一个Producer,并赋予名字
    producer = new DefaultMQProducer(producerGroupTemp);
	// 设置NameServer的地址
    producer.setNamesrvAddr("127.0.0.1:9876");
	// 消息长度大于16开启压缩
    producer.setCompressMsgBodyOverHowmuch(16);
	// 创建不同的Message
    message = new Message(topic, new byte[] {'a'});
    zeroMsg = new Message(topic, new byte[] {});
    bigMessage = new Message(topic, "This is a very huge message!".getBytes());

	// 启动Producer
    producer.start();

	// 设置mQClientFactory和mQClientAPIImpl,后面再讲
    Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
    field.setAccessible(true);
    field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);

    field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
    field.setAccessible(true);
    field.set(mQClientFactory, mQClientAPIImpl);

	// 注册Producer
    producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());

    when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
        nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
    when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
        nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
        .thenReturn(createSendResult(SendStatus.SEND_OK));
}

这段代码就是创建了一个DefaultMQProducer,设置参数,并调用start()方法启动,之后注册到NameServer中。

首先,看下DefaultMQProducer#start()方法:

@Override
public void start() throws MQClientException {
    this.setProducerGroup(withNamespace(this.producerGroup));
	// 直接调用defaultMQProducerImpl的start方法
    this.defaultMQProducerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

其中直接调用了DefaultMQProducerImpl的start方法:

public void start() throws MQClientException {
    this.start(true);
}
public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }

            // 获取MQClientInstance实例
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
        	// 注册
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                                            null);
            }

            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            if (startFactory) {
                // 启动mQClientFactory
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                     this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                                        + this.serviceState
                                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                                        null);
        default:
            break;
    }
	// 给所有的broker发心跳
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

RocketMQ 使用一个成员变量 serviceState 来记录和管理自身的服务状态,这实际上是状态模式 (State Pattern) 这种设计模式的变种实现。

对于启动过程,状态serviceState为CREATE_JUST。
CREATE_JUST分支中,会获取一个MQClientManager(单例模式),通过方法getAndCreateMQClientInstance()获取一个MQClientInstance实例,赋值给成员变量:

private MQClientInstance mQClientFactory;

然后调用MQClientInstance的registerProducer()方法,将自己注册到MQClientInstance中。
随后,调用MQClientInstance的start(),启动mQClientFactory。
最后,给所有的broker发心跳。

进一步看MQClientInstance的start()方法:

public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

这一部分代码的注释比较清楚,流程是这样的:

  1. 启动实例 mQClientAPIImpl,其中 mQClientAPIImpl 是类 MQClientAPIImpl 的实例,封装了客户端与 Broker 通信的方法;
  2. 启动各种定时任务,包括与 Broker 之间的定时心跳,定时与 NameServer 同步数据等任务;
  3. 启动拉取消息服务;
  4. 启动 Rebalance 服务;
  5. 启动默认的 Producer 服务。

以上就是 Producer 的启动流程。

3. 同步消息发送过程

分析 Producer 发送消息的流程。
接口 MQProducer 中,定义了 19 个不同参数的发消息的方法。
这19个接口可以分为3类:

  • 单向发送(Oneway):发送消息后立即返回,不处理响应,不关心是否发送成功;
  • 同步发送(Sync):发送消息后等待响应;
  • 异步发送(Async):发送消息后立即返回,在提供的回调方法中处理响应。

先看下同步发送消息的方法(异步发送消息,只是将同步发送方法提交给线程池)。
DefaultMQProducer中对同步发送方法的实现为:

@Override
public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    Validators.checkMessage(msg, this);
    msg.setTopic(withNamespace(msg.getTopic()));
    return this.defaultMQProducerImpl.send(msg);
}

实际调用的为DefaultMQProducerImpl的send()方法:

public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

最终调用的方法为DefaultMQProducerImpl的sendDefaultImpl()方法,源码位置为517行:
(源码偏长,在下方进行解读)

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);

    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // 获取Topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // 选择一个消息要发送的队列
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                	// 发消息
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    switch (e.getResponseCode()) {
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                        default:
                            if (sendResult != null) {
                                return sendResult;
                            }

                            throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());

                    log.warn("sendKernelImpl exception", e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }

        if (sendResult != null) {
            return sendResult;
        }

        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
            times,
            System.currentTimeMillis() - beginTimestampFirst,
            msg.getTopic(),
            Arrays.toString(brokersSent));

        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

        MQClientException mqClientException = new MQClientException(info, exception);
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }

        throw mqClientException;
    }

    List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
    if (null == nsList || nsList.isEmpty()) {
        throw new MQClientException(
            "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
    }

    throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

首先是获取Topic信息,DefaultMQProducerImpl#tryToFindTopicPublishInfo():

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // 缓存中获取 Topic发布信息
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    // 当无可用的 Topic发布信息时,从Namesrv获取一次
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
	// 若获取的 Topic发布信息时候可用,则返回
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // 使用 {@link DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。用于 Topic发布信息不存在 && Broker支持自动创建Topic
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

优先从缓存中获取Topic路由信息,如果缓存没有,就从NameServer中获取。

然后选择消息要发送的队列,DefaultMQProducerImpl#selectOneMessageQueue():

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

实际调用MQFaultStrategy的selectOneMessageQueue方法:
根据 Topic发布信息 选择一个消息队列

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            // 获取 brokerName=lastBrokerName && 可用的一个消息队列
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
			// 选择一个相对好的broker,并获得其对应的一个消息队列,不考虑该队列的可用性
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
    	// 选择一个消息队列,不考虑队列的可用性
        return tpInfo.selectOneMessageQueue();
    }
	// 获得 lastBrokerName 对应的一个消息队列,不考虑该队列的可用性
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

选好队列后返回到DefaultMQProducerImpl#sendDefaultImpl()方法中。

然后DefaultMQProducerImpl#sendDefaultImpl()方法中继续调用sendKernelImpl()方法发送消息,sendKernelImpl()源码如下:

private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    // 获取 broker地址
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        // 是否使用broker vip通道。broker会开启两个端口对外服务。
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();// 记录消息内容。下面逻辑可能改变消息内容,例如消息压缩。
        try {
            //for MessageBatch,ID has been set in the generating process
            if (!(msg instanceof MessageBatch)) {
                // 设置唯一编号
                MessageClientIDSetter.setUniqID(msg);
            }

            boolean topicWithNamespace = false;
            if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                topicWithNamespace = true;
            }

            // 消息压缩
            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                msgBodyCompressed = true;
            }
            // 事务
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }
        	// hook:发送消息校验
            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }
        	// hook:发送消息前逻辑
            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                context.setNamespace(this.defaultMQProducer.getNamespace());
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }
			// 构建发送消息请求
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }

                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }
        	// 发送消息
            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        msg.setBody(prevBody);
                    }

                    if (topicWithNamespace) {
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }

                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    // 同步发送消息
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }
        	// hook:发送消息后逻辑
            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }
        	// 返回发送结果
            return sendResult;
        } catch (RemotingException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }
    // broker为空抛出异常
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

也很长,但是逻辑简单,主要功能就是构建发送消息的头 RequestHeader 和上下文 SendMessageContext,然后调用方法 MQClientAPIImpl#sendMessage()(上述代码第150行),将消息发送给队列所在的 Broker。

至此,消息被发送给远程调用的封装类 MQClientAPIImpl,完成后续序列化和网络传输等步骤。

这一篇主要看发送代码,后续序列化和网络传输等步骤的代码后面再去探究。

ps:推荐IEDA插件SequenceDiagram,能一键生成时序图

整体的时序图为:
同步发送消息时序图.png

4. 异步消息发送过程

上一节讲到异步发送消息,只是将同步发送方法提交给线程池。对于MQProducer接口中方法为:

void send(final Message msg, final SendCallback sendCallback, final long timeout)
	throws MQClientException, RemotingException, InterruptedException;

也就是带有回调方法,DefaultMQProducer中的实现为:

@Override
public void send(Message msg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
    msg.setTopic(withNamespace(msg.getTopic()));
    this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
}

调用了DefaultMQProducerImpl的send方法:

@Deprecated
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
    final long beginStartTime = System.currentTimeMillis();
    ExecutorService executor = this.getAsyncSenderExecutor();
    try {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeout > costTime) {
                    try {
                        sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
                    } catch (Exception e) {
                        sendCallback.onException(e);
                    }
                } else {
                    sendCallback.onException(
                        new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
                }
            }

        });
    } catch (RejectedExecutionException e) {
        throw new MQClientException("executor rejected ", e);
    }

}

可以看到,实际上是将DefaultMQProducerImpl的sendDefaultImpl()方法(对应源码517行)提交到线程池执行,后面的流程都和同步发送一致,参看上一节分析。

异步发送消息时序图为:
异步发消息时序图.png
至此,异步消息发送也完成了。

5. 小结

MQProducer定义了19种发送消息的方法,其默认实现为DefaultMQProducer。DefaultMQProducer中的方法没有业务逻辑,最终会调用DefaultMQProducerImpl类中的具体逻辑。

DefaultMQProducerImpl中发消息的方法最终都会调用其sendKernelImpl()方法,其主要功能就是构建发送消息的头 RequestHeader 和上下文 SendMessageContext,然后调用方法 MQClientAPIImpl#sendMessage(),将消息发送给队列所在的 Broker。

最终消息被发送给远程调用的封装类 MQClientAPIImpl,完成后续序列化和网络传输等步骤。

类之间的关系为:
发消息类图.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1383430.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

精确掌控并发:固定时间窗口算法在分布式环境下并发流量控制的设计与实现

这是《百图解码支付系统设计与实现》专栏系列文章中的第&#xff08;14&#xff09;篇。点击上方关注&#xff0c;深入了解支付系统的方方面面。 本篇主要介绍分布式场景下常用的并发流量控制方案&#xff0c;包括固定时间窗口、滑动时间窗口、漏桶、令牌桶、分布式消息中间件…

python + ddt数据驱动 之 多个参数

案例&#xff1a;打开https://www.csdn.net/&#xff0c;进行登录&#xff0c;查看结果 不使用ddt数据驱动&#xff1a; import unittest from selenium import webdriver import timeclass CSDNTestCase(unittest.TestCase):def setUp(self):# 打开chrome浏览器self.driver …

vue2实现日历12个月平铺,显示工作日休息日

参考&#xff1a;https://blog.csdn.net/weixin_40292154/article/details/125312368 1.组件DateCalendar.vue&#xff0c;sass改为less <template><div class"cc-calendar"><div class"calendar-title"><span>{{ year }}年{{ mo…

线性调频信号的解线调(dechirp,去斜)处理matlab仿真

线性调频信号的解线调 线性调频信号的回波模型参考信号去斜处理去斜处理傅里叶变换得到脉压结果解线调仿真总结 线性调频信号的回波模型 对于线性调频脉冲压缩雷达&#xff0c;其发射信号为&#xff1a; s ( t ) r e c t ( t T ) e x p ( j π μ t 2 ) \begin{equation} s(…

C++深入学习之STL:1、容器部分

标准模板库STL的组成 主要由六大基本组件组成&#xff1a;容器、迭代器、算法、适配器、函数对象(仿函数)以及空间配置器。 容器&#xff1a;就是用来存数据的&#xff0c;也称为数据结构。 本文要详述的是容器主要如下&#xff1a; 序列式容器&#xff1a;vector、list 关联…

网络爬虫丨基于scrapy+mysql爬取博客信息并保存到数据库中

文章目录 写在前面实验描述实验框架实验需求 实验内容1.安装依赖库2.创建Scrapy项目3.配置系统设置4.配置管道文件5.连接数据库6.分析要爬取的内容7.编写爬虫文件 运行结果写在后面 写在前面 本期内容&#xff1a;基于scrapymysql爬取博客信息并保存到数据库中 实验需求 ana…

人大金仓参与起草《数据库运维管理能力成熟度模型》标准

近日&#xff0c;由中国信息通信研究院、中国移动通信集团有限公司、人大金仓等单位参与起草的《数据库运维管理能力成熟度模型》标准正式发布。本标准适用于金融、电信、互联网、能源等重点行业对内部数据库运维管理能力进行全面综合的评价。 数据库作为基础软件的核心组成部分…

18k+ start开源项目管理工具Focalboard centos部署教程

1.下载安装包 官方github地址 https://github.com/mattermost/focalboard 发行版下载地址 https://github.com/mattermost/focalboard/releases/download/v7.10.6/focalboard-server-linux-amd64.tar.gz 插件下载地址 https://github.com/mattermost/focalboard/releases/down…

Http协议、HttpClient

HTTP请求协议包 http服务器 HTTP Server 也是我们常说的Web服务器 在网络中传递的信息都是以【二进制】形式存在的&#xff0c;接收方在接收信息后需要把二进制数据编译为原数据。 弊端&#xff1a;HTTP协议无法实现服务器主动向客户端发起消息。 http服务器需要 1、可以接…

Apollo之原理和使用讲解

文章目录 1 Apollo1.1 简介1.1.1 背景1.1.2 简介1.1.3 特点 1.2 基础模型1.3 Apollo 四个维度1.3.1 application1.3.2 environment1.3.3 cluster1.3.4 namespace 1.4 本地缓存1.5 客户端设计1.5.1 客服端拉取原理1.5.2 配置更新推送实现 1.6 总体设计1.7 可用性考虑 2 操作使用…

鸿蒙应用开发尝鲜:初识HarmonyOS

初识HarmonyOS 来源:华为官方网站 : https://developer.huawei.com/ 相信大家对鸿蒙应用开发也不在陌生,很多身处互联网行业或者不了解的人们现在也一定都听说过华为鸿蒙.这里我将不再说废话,直接步入正题 鸿蒙应用开发语言 HarmonyOS应用开发采用的是ArkTS语言,ArkTS是在Typ…

sublime中添加GBK编码模式

当写代码的中文注释时&#xff0c;编译代码出现如下错误&#xff1a; 解决办法&#xff0c;添加GBK模式&#xff1a; &#xff11;. 点击Preferences -> Package Control&#xff1a; 2. 在跳出来的搜索框里搜索conver, 点击ConverToUTF8 3. File左上角会多出GBK的选项 由…

arcgis javascript api4.x加载天地图web墨卡托(wkid:3857)坐标系

效果&#xff1a; 示例代码&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><meta http-equiv&quo…

HarmonyOS-LocalStorage:页面级UI状态存储

管理应用拥有的状态概述 上一个章节中介绍的装饰器仅能在页面内&#xff0c;即一个组件树上共享状态变量。如果开发者要实现应用级的&#xff0c;或者多个页面的状态数据共享&#xff0c;就需要用到应用级别的状态管理的概念。ArkTS根据不同特性&#xff0c;提供了多种应用状态…

LeetCode讲解篇之2280. 表示一个折线图的最少线段数

文章目录 题目描述题解思路题解代码 题目描述 题解思路 折线图中如果连续的线段共线&#xff0c;那么我们可以可以将其合并成一条线段 首先将坐标点按照横坐标升序排序 然后遍历数组 我们可以通过计算前一个线段的斜率和当前线段的斜率来判断是否共线 如果二者相等&#x…

[NSSCTF Round#16 Basic]RCE但是没有完全RCE

[NSSCTF Round#16 Basic]RCE但是没有完全RCE 第一关 <?php error_reporting(0); highlight_file(__file__); include(level2.php); if (isset($_GET[md5_1]) && isset($_GET[md5_2])) {if ((string)$_GET[md5_1] ! (string)$_GET[md5_2] && md5($_GET[md…

【剑指offer】数组中重复的数字

&#x1f451;专栏内容&#xff1a;力扣刷题⛪个人主页&#xff1a;子夜的星的主页&#x1f495;座右铭&#xff1a;前路未远&#xff0c;步履不停 目录 一、题目描述1、题目2、示例 二、题目分析1、双重for循环2、for-each 循环3、set集合 一、题目描述 1、题目 剑指offer&a…

2024.1.13力扣每日一题——构造限制重复的字符串

2024.1.13 题目来源我的题解方法一 计数模拟 题目来源 力扣每日一题&#xff1b;题序&#xff1a;2182 我的题解 方法一 计数模拟 因为字符串s由小写字母构成&#xff0c;因此使用一个int[26]的数组保存每个字符的数量&#xff0c;然后从最大的字符开始构造结果字符串sb&…

自编C++题目——输入程序

预估难度 简单 题目描述 小明编了一个输入程序&#xff0c;当用户的输入之中有<时&#xff0c;光标移动到最右边&#xff1b;当输入有>时&#xff0c;光标移动到最左边&#xff0c;当输入有^时&#xff0c;光标移动到前一个字符&#xff0c;当输入为#时&#xff0c;清…

深度学习中的稀疏注意力

稀疏注意力 文章目录 一、稀疏注意力的特点 1. 单头注意力&#xff08;Single-Head Attention&#xff09; 2. 多头注意力&#xff08;Multi-Head Attention&#xff09; 3. 稀疏注意力&#xff08;Sparse Attention&#xff09; 二、稀疏注意力的示意图 三、与Flash Attention…