【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPushConsumer的实现原理及源码分析

news2024/9/20 9:40:56

RocketMQ开源是使用文件作为持久化工具,阿里内部未开源的性能会更高,使用oceanBase作为持久化工具。 在RocketMQ1.x和2.x使用zookeeper管理集群,3.x开始使用nameserver代替zk,更轻量级,此外RocketMQ的客户端拥有两种的操作方式:DefaultMQPushConsumer和DefaultMQPullConsumer。

<dependency>
   <groupId>org.apache.rocketmqgroupId>
   <artifactId>rocketmq-clientartifactId>
   <version>4.3.0version>
dependency>

以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始

public class MQPushConsumer {
    public static void main(String[] args) throws MQClientException {
        String groupName = "rocketMqGroup1";

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);

        consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setMessageModel(MessageModel.BROADCASTING);

        consumer.subscribe("order-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            public ConsumeConcurrentlyStatus consumeMessage(List mgs,
                    ConsumeConcurrentlyContext consumeconcurrentlycontext) {
                System.out.println(Thread.currentThread().getName()+"Receive New Messages:"+mgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
  • CLUSTERING:默认模式,同一个ConsumerGroup(groupName相同)每个consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消息加起来才是所
  • 订阅topic整体,从而达到负载均衡的目的
  • BROADCASTING:同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息,也就是一个消费会被多次分发,被多个consumer消费。

ConsumeConcurrentlyStatus.RECONSUME_LATER boker会根据设置的messageDelayLevel发起重试,默认16次。

DefaultMQPushConsumerImpl中各个对象的主要功能如下:

RebalancePushImpl:主要负责决定,当前的consumer应该从哪些Queue中消费消息;

  • 1)PullAPIWrapper:长连接,负责从broker处拉取消息,然后利用ConsumeMessageService回调用户的Listener执行消息消费逻辑;
  • 2)ConsumeMessageService:实现所谓的"Push-被动"消费机制;从Broker拉取的消息后,封装成ConsumeRequest提交给ConsumeMessageSerivce,此service负责回调用户的Listener消费消息;
  • 3)OffsetStore:维护当前consumer的消费记录(offset);有两种实现,Local和Rmote,Local存储在本地磁盘上,适用于BROADCASTING广播消费模式;而Remote则将消费进度存储在Broker上,适用于CLUSTERING集群消费模式;
  • 4)MQClientFactory:负责管理client(consumer、producer),并提供多中功能接口供各个Service(Rebalance、PullMessage等)调用;大部分逻辑均在这个类中完成;

consumer.registerMessageListener执行过程:


    public void registerMessageListener(MessageListenerConcurrently messageListener) {
        this.messageListener = messageListener; this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
    }

通过源码可以看出主要实现过程在DefaultMQPushConsumerImpl类中consumer.start后调用DefaultMQPushConsumerImpl的同步start方法

public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                this.copySubscription();
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                  this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }
                this.consumeMessageService.start();
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();
    }

通过mQClientFactory.start();发我们发现他调用

public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;

                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }

                    this.mQClientAPIImpl.start();

                    this.startScheduledTask();

                    this.pullMessageService.start();

                    this.rebalanceService.start();

                  this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

在这个方法中有多个start,我们主要看pullMessageService.start();通过这里我们发现RocketMQ的Push模式底层其实也是通过pull实现的,下面我们来看下pullMessageService处理了哪些逻辑:

private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }

我们发现其实他还是通过DefaultMQPushConsumerImpl类的pullMessage方法来进行消息的逻辑处理.

PullRequest这里说明一下,上面我们已经提了一下rocketmq的push模式其实是通过pull模式封装实现的,pullrequest这里是通过长轮询的方式达到push效果。

长轮询方式既有pull的优点又有push模式的实时性有点。

  • push方式是server端接收到消息后,主动把消息推送给client端,实时性高。弊端是server端工作量大,影响性能,其次是client端处理能力不同且client端的状态不受server端的控制,如果client端不能及时处理消息容易导致消息堆积已经影响正常业务等。
  • pull方式是client循环从server端拉取消息,主动权在client端,自己处理完一个消息再去拉取下一个,缺点是循环的时间不好设定,时间太短容易忙等,浪费CPU资源,时间间隔太长client的处理能力会下降,有时候有些消息会处理不及时。

PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion,long offset, int maxNums, int sysFlag,long commitOffset,long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)方法进行消息拉取操作。

将回调类PullCallback传入该方法中,当采用异步方式拉取消息时,在收到响应之后会回调该回调类的方法。

public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDropped()) {
            log.info("the pull request[{}] is dropped.", pullRequest.toString());
            return;
        }
        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
        try {
            this.makeSureStateOK();
        } catch (MQClientException e) {
            log.warn("pullMessage exception, consumer state not ok", e);
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            return;
        }
        if (this.isPause()) {
            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
            return;
        }
        long cachedMessageCount = processQueue.getMsgCount().get();
        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }
        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }
        if (!this.consumeOrderly) {
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                }
                return;
            }
        } else {
            if (processQueue.isLocked()) {
                if (!pullRequest.isLockedFirst()) {
                    final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                    boolean brokerBusy = offset < pullRequest.getNextOffset();
                    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                        pullRequest, offset, brokerBusy);
                    if (brokerBusy) {
                        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                            pullRequest, offset);
                    }
                    pullRequest.setLockedFirst(true);
                    pullRequest.setNextOffset(offset);
                }
            } else {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                log.info("pull message later because not locked in broker, {}", pullRequest);
                return;
            }
        }
        final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (null == subscriptionData) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            log.warn("find the consumer's subscription failed, {}", pullRequest);
            return;
        }
        final long beginTimestamp = System.currentTimeMillis();
        PullCallback pullCallback = new PullCallback() {

            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);
                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }
                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                    pullResult.getNextBeginOffset(),
                                    firstMsgOffset,
                                    prevRequestOffset);
                            }
                            break;
                        case NO_NEW_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal, {} {}",
                                pullRequest.toString(), pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);
                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                    }
                }
            }

            public void onException(Throwable e) {
                if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("execute the pull request exception", e);
                }
                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            }
        };
        boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {
                commitOffsetEnable = true;
            }
        }
        String subExpression = null;
        boolean classFilter = false;
        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString();
            }
            classFilter = sd.isClassFilterMode();
        }
        int sysFlag = PullSysFlag.buildSysFlag(
            commitOffsetEnable,
            true,
            subExpression != null,
            classFilter
        );
        try {

            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,

                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
    }

在MQClientAPIImpl.pullMessage方法中,根据入参communicationMode的值分为异步拉取和同步拉取方式两种。

无论是异步方式拉取还是同步方式拉取,在发送拉取请求之前都会构造一个ResponseFuture对象,以请求消息的序列号为key值,存入NettyRemotingAbstract.responseTable:ConcurrentHashMap, ResponseFuture>变量中,对该变量有几种情况会处理:

public PullResult pullMessage(
        final String addr,
        final PullMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
        switch (communicationMode) {
            case ONEWAY:
                assert false;
                return null;
            case ASYNC:
                this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
                return null;
            case SYNC:
                return this.pullMessageSync(addr, request, timeoutMillis);
            default:
                assert false;
                break;
        }
        return null;
    }

对于同步发送方式,调用MQClientAPIImpl.pullMessageSync(String addr, RemotingCommand request, long timeoutMillis)方法,大致步骤如下:

  • 以上一步的返回值RemotingCommand对象为参数调用MQClientAPIImpl. processPullResponse (RemotingCommand response)方法将返回对象解析并封装成PullResultExt对象然后返回给调用者,响应消息的结果状态转换如下:
    • 若RemotingCommand对象的Code等于SUCCESS,则PullResultExt.pullStatus=FOUND;
    • 若RemotingCommand对象的Code等于PULL_NOT_FOUND,则PullResultExt.pullStatus= NO_NEW_MSG;
    • 若RemotingCommand对象的Code等于PULL_RETRY_IMMEDIATELY,则PullResultExt.pullStatus= NO_MATCHED_MSG;
    • 若RemotingCommand对象的Code等于PULL_OFFSET_MOVED,则PullResultExt.pullStatus= OFFSET_ILLEGAL;

    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        long beginStartTime = System.currentTimeMillis();
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call timeout");
                }
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                if (this.rpcHook != null) {
                    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                }
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

getMQClientAPIImpl().pullMessage最终通过channel写入并刷新队列中。然后在消息服务端大体的处理逻辑是服务端收到新消息请求后,如果队列中没有消息不急于返回,通过一个循环状态,每次waitForRunning一段时间默认5秒,然后再check,如果broker一直没有新新消息,第三次check的时间等到时间超过SuspendMaxTimeMills就返回空,如果在等待过程中收到了新消息直接调用notifyMessageArriving函数返回请求结果。"长轮询"的核心是,Broker端HOLD住客户端过来的请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给 Consumer 。长轮询的主动权掌握在consumer中,即使broker有大量的消息堆积也不会主动推送给consumer。

分享资源

资源分享
获取以上资源请访问开源项目 点击跳转

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/897955.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【快应用】图片放大预览功能的实现

【关键词】 放大、image、background-position 【问题背景】 快应用中并没有直接的放大图片预览的功能&#xff0c;然而是可以利用现有的功能去实现图片的放大预览功能的。这样在快应用中浏览页面内容遇到图片时&#xff0c;遇到一些小图&#xff0c;觉得图片内容是不错的&am…

基于Three.js的WebXR渲染入门

1、Three.js 渲染管线快速概览 我不会花太多时间讨论 Three.JS 渲染管道的工作原理,因为它在互联网上有详细记录(例如,此链接)。 我将在下图中列出基础知识,以便更容易理解各个部分的去向。 2、WebXR 设备 API 入门 在我们深入了解 WebXR API 本身之前,您应该知道 WebX…

Linux如何改变文件的权限

Linux如何改变文件的权限 权限介绍权限更改关键字chmod通过数字修改通过字母修改 权限介绍 文件类型和文件权限由10个字符组成 文件的类型&#xff1a;- 表示文件&#xff0c; d 表示文件夹文件权限&#xff1a;r 表示读权限&#xff0c;w 表示写权限&#xff0c;x 表示执行权…

vue浏览器插件安装-各种问题

方法1&#xff1a;vue.js devtolls插件下载 https://blog.csdn.net/qq_55640378/article/details/131553642 下载地址&#xff1a; Tags vuejs/devtools GitHub npm install 或是 cnpm install 遇到的报错 设置淘宝镜像源&#xff08;推荐使用nrm&#xff0c;这一步是为…

219、仿真-基于51单片机L298直流电机开始停止正反转加减速Proteus仿真设计(程序+Proteus仿真+配套资料等)

毕设帮助、开题指导、技术解答(有偿)见文未 目录 一、硬件设计 二、设计功能 三、Proteus仿真图 四、程序源码 资料包括&#xff1a; 需要完整的资料可以点击下面的名片加下我&#xff0c;找我要资源压缩包的百度网盘下载地址及提取码。 方案选择 单片机的选择 方案一&a…

韦东山-电子量产工具项目:UI系统

代码结构 所有代码都已通过测试跑通&#xff0c;其中代码结构如下&#xff1a; 一、include文件夹 1.1 common.h #ifndef _COMMON_H #define _COMMON_Htypedef struct Region {int iLeftUpX; //区域左上方的坐标int iLeftUpY; //区域左下方的坐标int iWidth; //区域宽度…

英雄大战反派 Game Jam

围绕英雄与反派之间的冲突&#xff0c;制作惊心动魄的游戏。 所有游戏创作者和梦想家请注意&#xff01;准备释放您的创造力&#xff0c;进入一个充满冲突和传奇战役的世界。我们很高兴为您带来《英雄大战反派》终极 Game Jam&#xff0c;仅限使用 The Sandbox 的 Game Maker 进…

SVD奇异值分解相关技术

重要说明&#xff1a;本文从网上资料整理而来&#xff0c;仅记录博主学习相关知识点的过程&#xff0c;侵删。 一、参考资料 Eigen 矩阵的SVD分解 奇异值分解&#xff08;SVD&#xff09; 二、相关介绍 1. 单位阵 主对角线全为1的方阵&#xff0c;称为单位阵&#xff0c;…

unity 之 GetComponent 获取游戏对象上组件实例方法

GetComponent 简单介绍 GetComponent 是Unity引擎中用于获取游戏对象上组件实例的方法。它允许您从游戏对象中获取特定类型的组件&#xff0c;以便在脚本中进行操作和交互。 GetComponent< ComponentType >(): 这是一个泛型方法&#xff0c;用于从当前游戏对象上获取指定…

解锁数据中心高效监测精密空调,现学现用!

精密空调监控在当今科技发展中扮演着至关重要的角色。无论是数据中心、实验室还是医疗设施&#xff0c;都需要维持恒定的温度、湿度和空气质量&#xff0c;以确保设备的正常运行和数据的准确性。 精密空调监控不仅是现代高科技环境中的必备工具&#xff0c;也是保障设备稳定性、…

idea 本地版本控制 local history

idea 本地版本控制 local history 如何打开 1 自定义快捷键 settings->keymap->搜索框输入 show history -》Add Keyboard Shortcut -》设置为 CtrlAltL 2 右键文件-》local history -》show history 新建文件 版本1&#xff0c;creating class com.geekmice…这个是初…

同一个局域网主机中的一台主机连接另一台主机的虚拟机

星光下的赶路人star的个人主页 理想的路总是为有信心的人预备着 文章目录 1、描述问题2、解决前提3、解决办法4、实操4.1 虚拟机配置4.2 主机防火墙配置&#xff08;是你要连接虚拟机的所在的主机&#xff09;4.3 连接测试 1、描述问题 想要连接朋友主机的虚拟机&#xff0c;利…

光电效应波粒二象性

光电效应&#xff1a;光照射在金属表面时&#xff0c;瞬间&#xff08;t &#xff1c;10^-9s&#xff09;释放出电子的现象。 光电效应规律&#xff1a; 1、任何一种金属都有一个截止频率&#xff0c;低于该截止频率不能发生光电效应&#xff1b; 2、光电子最大初动能与入射光的…

“我30岁了,转行IT行业!还有没有出路?”

人到30&#xff0c;就容易产生中年危机。俗话说30而立&#xff0c;但其实很不容易&#xff0c;成家不易、立业也不易&#xff0c;尤其是现如今房价这么贵&#xff0c;物价那么高&#xff0c;各种压力随之而来。在职场中工作了几年&#xff0c;到近30岁时如果还是没有太大进展&a…

vue2+vue3封装使用svg图标

需求&#xff1a;1.在vue2中封装使用svg 2.在vue3中封装使用svg 3.在vue3中使用自定义插件封装多个组件 1.获取svg图标操作 在阿里巴巴矢量图标库找自己需要的svg图标 地址&#xff1a;阿里巴巴矢量图标库 随便找个图标点击下载 选择好尺寸后就可以点击复制svg代码了 在ass…

基于互斥锁的生产者消费者模型

文章目录 生产者消费者 定义代码实现 / 思路完整代码执行逻辑 / 思路 局部具体分析model.ccfunc&#xff08;消费者线程&#xff09; 执行结果 生产者消费者 定义 生产者消费者模型 是一种常用的 并发编程模型 &#xff0c;用于解决多线程或多进程环境下的协作问题。该模型包含…

Windows如何部署Jenkins

一、简介 Jenkins 是国际上流行的免费开源软件项目&#xff0c;基于Java 开发持续集成工具&#xff0c;用于监控持续重复的工作&#xff0c;提供一个开放的易用的软件平台&#xff0c;使软件的持续集成自动化&#xff0c;大大节约人力和时效。 二、Java JDK 访问 OpenLogic…

入行嵌入式的几个必备技能

嵌入式作为时下最热的行业之一&#xff0c;有不少朋友想入行却不得其法&#xff0c;这里为大家提供几个嵌入式行业必备的技能。 精通C语言 嵌入式系统中&#xff0c;精通C语言至关重要。 对于嵌入式软件开发者而言&#xff0c;掌握C语言是必要条件。在大学期间&#xff0c;你…

【8月19日】红帽openstack管理课程(CL210) 新一轮开课

课程介绍 通过实验室操作练习&#xff0c;学员将能够深入学习红帽企业 Linux OpenStack 平台各服务的手动安装方法&#xff0c;还将了解 OpenStack 开发社区的未来发展计划。 培训地点&#xff1a; 线下面授&#xff1a;苏州市姑苏区干将东路666号和基广场401室&#xff1b;…

原生微信小程序自定义picker多列选择器:picker写法用法

前言: 最近用原生微信小程序写法写医疗相关项目微信小程序&#xff0c;在编辑个人资料的时候&#xff0c;需要很多选择器&#xff0c;比如城市地区选择器&#xff0c;职业职称选择器&#xff0c;科室选择器&#xff0c;学校选择器&#xff0c;学历选择器&#xff0c;年份日期选…