RocketMQ之 Consumer,消费者消费原理解析

news2025/4/15 1:46:10
  • B站 https://www.bilibili.com/video/BV1rX4y1z72v
  • 在线学习文档 https://d9bp4nr5ye.feishu.cn/wiki/wikcnjjvso9uytlgVJBfKcJh1Kq

今天我们阅读源码的目的:在SpringBoot项目中,RocketMQ是如何通过 @RocketMQMessageListener 来进行消费的。

在SpringBoot项目中,我们要接收消息只需要使用一个注解就好了 @RocketMQMessageListener 下面的代码就是一个简单的 consumer 的案例。

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Component
@RocketMQMessageListener(consumerGroup = "my-consumer_sync-topic", topic = "test-topic")
public class consumer implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        byte[] body = messageExt.getBody();
        System.out.println(new String(body));
    }
}

下面我们就来通过源码的方式来解密它是如何工作的。

RocketMQAutoConfiguration


在我们的项目中,引入了 rocketmq-spring-boot-starter,我们知道在各种 starter中一定有一个自动注入,我们来看看 RocketMQ的自动注入做了什么。

在RocketMQAutoConfiguration中,它导入了一个ListenerContainerConfiguration

在这里插入图片描述


ListenerContainerConfiguration

@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {}

ListenerContainerConfiguration实现了 SmartInitializingSingleton 接口,重写了 afterSingletonsInstantiated 方法,这个方法在单例bean实例化之后会被调用,来看看重写后的方法


afterSingletonsInstantiated()

@Override
public void afterSingletonsInstantiated() {
    Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
        .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    beans.forEach(this::registerContainer);
}

它首先获取了全部使用 RocketMQMessageListener 注解的bean,然后调用 registerContainer 方法。

registerContainer(String beanName, Object bean)

private void registerContainer(String beanName, Object bean) {
    Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
    
    // 数据校验
    if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
        throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());
    }
    if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
        throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
    }
    
    // 获取bean上面的注解
    RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
    // 获取注解上面的数据
    String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
    String topic = this.environment.resolvePlaceholders(annotation.topic());
    
    // 判断当前消费组对当前topic是否开启监听
    boolean listenerEnabled =
        (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
            .getOrDefault(topic, true);

    if (!listenerEnabled) {
        log.debug(
            "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
            consumerGroup, topic);
        return;
    }
    validate(annotation);
    
    // 把当前的bean注册到对应的容器中去
    String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
        counter.incrementAndGet());
    GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;

    genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
        // 创建这个bean对象
        () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
    DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
        DefaultRocketMQListenerContainer.class);
    if (!container.isRunning()) {
        try {
            // 【开启这个容器】
            container.start();
        } catch (Exception e) {
            log.error("Started container failed. {}", container, e);
            throw new RuntimeException(e);
        }
    }

    log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}

createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation)

这里注意,它把我们的监听器塞进去了,后面会用到这个的

private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
    RocketMQMessageListener annotation) {
    DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();

    container.setRocketMQMessageListener(annotation);

    String nameServer = environment.resolvePlaceholders(annotation.nameServer());
    nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
    String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
    container.setNameServer(nameServer);
    if (!StringUtils.isEmpty(accessChannel)) {
        container.setAccessChannel(AccessChannel.valueOf(accessChannel));
    }
    container.setTopic(environment.resolvePlaceholders(annotation.topic()));
    String tags = environment.resolvePlaceholders(annotation.selectorExpression());
    if (!StringUtils.isEmpty(tags)) {
        container.setSelectorExpression(tags);
    }
    container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
    container.setTlsEnable(environment.resolvePlaceholders(annotation.tlsEnable()));
    if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {
        // 这里注意,它把我们的监听器塞进去了,后面会用到这个的
        container.setRocketMQListener((RocketMQListener) bean);
    } else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
        container.setRocketMQReplyListener((RocketMQReplyListener) bean);
    }
    container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
    container.setName(name);

    return container;
}

上面这个代码大家可以简单看下,反正最终创建的是 container 其实是 DefaultRocketMQListenerContainer

DefaultRocketMQListenerContainer

上面我们创建一个container,并开启了它 container.start(); 下面我们就来看看它的 start方法

start()

private DefaultMQPushConsumer consumer;

@Override
public void start() {
    if (this.isRunning()) {
        throw new IllegalStateException("container already running. " + this.toString());
    }

    try {
        consumer.start();
    } catch (MQClientException e) {
        throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
    }
    this.setRunning(true);

    log.info("running container: {}", this.toString());
}

DefaultMQPushConsumer

上面的 start 调用了 consumer.start(),其实就是 DefaultMQPushConsumer 的start方法

start()

protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;

@Override
public void start() throws MQClientException {
    setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    this.defaultMQPushConsumerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

DefaultMQPushConsumerImpl

在上面的 start() 方法里面又调用了 DefaultMQPushConsumerImpl 的 start() 方法

start()

// serviceState 的默认值
private volatile ServiceState serviceState = ServiceState.CREATE_JUST;

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            this.copySubscription();

            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }

            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            this.offsetStore.load();

            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }

            this.consumeMessageService.start();

            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            // 下一步的调用
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    this.mQClientFactory.rebalanceImmediately();
}

这里面的代码也很长了,我们看最后又有一个 start调用 mQClientFactory.start();

MQClientInstance

start()

private ServiceState serviceState = ServiceState.CREATE_JUST;

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

它这个start方法开启了更多的 start,这里我们看到有一个 pullMessageService.start() 以前我们不是说,注解的方式,其实也是主动拉去数据的方式么?毫无疑问,我们需要好好看看这个 pull的start方法。


PullMessageService

start()

PullMessageService 里面并没有 start 方法,而是调用父类 ServiceThread 的 start方法,在start方法中,开启了多线程的 start,我们知道只有等到资源来的时候就会去执行 run方法,所以我们可以看看PullMessageService 的run了

public class PullMessageService extends ServiceThread {}

public abstract class ServiceThread implements Runnable {}

public void start() {
    log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
    if (!started.compareAndSet(false, true)) {
        return;
    }
    stopped = false;
    this.thread = new Thread(this, getServiceName());
    this.thread.setDaemon(isDaemon);
    this.thread.start();
}

run()

很明显,从它的方法名字上就可以看到,它开始 pullMessage 了
它这里几乎是开启了一个死循环,如果当前线程不终止的话,会一直运行。

@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    // 死循环
    while (!this.isStopped()) {
        try {
            // 从阻塞队列中拿到一个可用的请求
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

pullMessage(final PullRequest pullRequest)

通过消费组找到消费者,然后去拉消息

private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}

DefaultMQPushConsumerImpl

pullMessage(final PullRequest pullRequest)

这个方法的代码很拉长,里面创建了一个 PullCallback,并且去重写了它的 onSuccess onException ,我觉得这种完全可以提出到外面。它的前半部分是组装各种参数,后半部分主要是重写 onSuccess方法,最后又调用了 pullKernelImpl 方法,这里我们直接看这个方法吧,对于onSuccess 方法我们后面会再看到

注意CommunicationMode.ASYNC,这个参数,后面会用到

public void pullMessage(final PullRequest pullRequest) {
    // 数据处理,埋点
    
    // 创建PullCallback,并重写里面的方法
    
    try {
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
}

PullAPIWrapper

pullKernelImpl

这个方法主要做两件事

  1. 找到要拉去的 broker
  2. 组装PullMessageRequestHeader ,然后下一步
public PullResult pullKernelImpl(
    final MessageQueue mq,
    final String subExpression,
    final String expressionType,
    final long subVersion,
    final long offset,
    final int maxNums,
    final int sysFlag,
    final long commitOffset,
    final long brokerSuspendMaxTimeMillis,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    FindBrokerResult findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq), false);
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
    }

    if (findBrokerResult != null) {
        {
            // check version
            if (!ExpressionType.isTagType(expressionType)
                && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                    + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
            }
        }
        int sysFlagInner = sysFlag;

        if (findBrokerResult.isSlave()) {
            sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
        }

        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
        requestHeader.setConsumerGroup(this.consumerGroup);
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setQueueOffset(offset);
        requestHeader.setMaxMsgNums(maxNums);
        requestHeader.setSysFlag(sysFlagInner);
        requestHeader.setCommitOffset(commitOffset);
        requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
        requestHeader.setSubscription(subExpression);
        requestHeader.setSubVersion(subVersion);
        requestHeader.setExpressionType(expressionType);

        String brokerAddr = findBrokerResult.getBrokerAddr();
        if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
            brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
        }

        PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
            brokerAddr,
            requestHeader,
            timeoutMillis,
            communicationMode,
            pullCallback);

        return pullResult;
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

MQClientAPIImpl
pullMessage
public PullResult pullMessage(
    final String addr,
    final PullMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);

    switch (communicationMode) {
        case ONEWAY:
            assert false;
            return null;
        case ASYNC:
            this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
            return null;
        case SYNC:
            return this.pullMessageSync(addr, request, timeoutMillis);
        default:
            assert false;
            break;
    }

    return null;
}

pullMessageAsync

它这里发送了一个 netty请求,如果结果不为空,就交给PullCallback 的 onSuccess 方法进行处理

private void pullMessageAsync(
    final String addr,
    final RemotingCommand request,
    final long timeoutMillis,
    final PullCallback pullCallback
) throws RemotingException, InterruptedException {
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            RemotingCommand response = responseFuture.getResponseCommand();
            if (response != null) {
                try {
                    PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);
                    assert pullResult != null;
                    pullCallback.onSuccess(pullResult);
                } catch (Exception e) {
                    pullCallback.onException(e);
                }
            } else {
                if (!responseFuture.isSendRequestOK()) {
                    pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
                } else if (responseFuture.isTimeout()) {
                    pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
                        responseFuture.getCause()));
                } else {
                    pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
                }
            }
        }
    });
}

PullCallback

onSuccess

现在我们就可以回过头来看看,当时的 onSuccess 是如何重写的,这里如果是正常拉取到消息的时候 pullResult.getPullStatus() == FOUND 的。

public void onSuccess(PullResult pullResult) {
    if (pullResult != null) {
        pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
            subscriptionData);

        switch (pullResult.getPullStatus()) {
            case FOUND:
                // 对消息偏移量进行处理
                long prevRequestOffset = pullRequest.getNextOffset();
                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                long pullRT = System.currentTimeMillis() - beginTimestamp;
                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                    pullRequest.getMessageQueue().getTopic(), pullRT);

                long firstMsgOffset = Long.MAX_VALUE;
                if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                } else {
                    firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                        pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                    // 把消息放进消息队列中,注意它这里不是直接立马消费,而是把消息放进容器中
                    // 这个容器是 treeMap, key是消息的偏移量,value 是消息体
                    boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                    // 【消息提交消费】
                    // 我们一般不是顺序消息,所以实现类是 ConsumeMessageConcurrentlyService,
                    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                        pullResult.getMsgFoundList(),
                        processQueue,
                        pullRequest.getMessageQueue(),
                        dispatchToConsume);

                    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                            DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                    } else {
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    }
                }

                if (pullResult.getNextBeginOffset() < prevRequestOffset
                    || firstMsgOffset < prevRequestOffset) {
                    log.warn(
                        "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                        pullResult.getNextBeginOffset(),
                        firstMsgOffset,
                        prevRequestOffset);
                }

                break;
            case NO_NEW_MSG:
            case NO_MATCHED_MSG:
                pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                break;
            case OFFSET_ILLEGAL:
                log.warn("the pull request offset illegal, {} {}",
                    pullRequest.toString(), pullResult.toString());
                pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                pullRequest.getProcessQueue().setDropped(true);
                DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                pullRequest.getNextOffset(), false);

                            DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                            DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                            log.warn("fix the pull request offset, {}", pullRequest);
                        } catch (Throwable e) {
                            log.error("executeTaskLater Exception", e);
                        }
                    }
                }, 10000);
                break;
            default:
                break;
        }
    }
}

ConsumeMessageConcurrentlyService

submitConsumeRequest

最终我们的消息会被封装成一个 ConsumeRequest, 然后丢到一个线程池中去消费

private final ThreadPoolExecutor consumeExecutor;
this.consumeExecutor = new ThreadPoolExecutor(
    this.defaultMQPushConsumer.getConsumeThreadMin(), // 20
    this.defaultMQPushConsumer.getConsumeThreadMax(), // 20
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.consumeRequestQueue,
    new ThreadFactoryImpl(consumeThreadPrefix));


@Override
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}

ConsumeRequest

ConsumeRequest 是 ConsumeMessageConcurrentlyService 的内部类 class ConsumeRequest implements Runnable,我们来看一下它的 run方法

这里面的代码也很多,主要是三点

  1. 找到 listener
  2. 让 listener 去处理消息
  3. 对 处理的结果 status 进行处理
@Override
public void run() {
    // ....
    // 找到 listener   
    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    
    try {
        // ....
        // 消息处理并返回 status
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        // ...
    }
    
    // ...
    // 对结果状态处理
    if (!processQueue.isDropped()) {
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
    }
}

DefaultMessageListenerConcurrently

consumeMessage

这里几乎没做什么操作,把消息交给了 handleMessage 方法处理

@SuppressWarnings("unchecked")
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    for (MessageExt messageExt : msgs) {
        log.debug("received msg: {}", messageExt);
        try {
            long now = System.currentTimeMillis();
            handleMessage(messageExt);
            long costTime = System.currentTimeMillis() - now;
            log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
        } catch (Exception e) {
            log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
            context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }

    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

DefaultRocketMQListenerContainer

handleMessage

DefaultMessageListenerConcurrently 是 DefaultRocketMQListenerContainer 的内部类,所以这个handleMessage 是 DefaultRocketMQListenerContainer 的方法

而这个 DefaultRocketMQListenerContainer 在最开始就已经说了,在创建的时候就把我们的监听器塞进去了。

我们的监听器是重写了这个 onMessage 方法的,所以最终就到了我们的消费者

private void handleMessage(
    MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
    if (rocketMQListener != null) {
        // 消息消费
        rocketMQListener.onMessage(doConvertMessage(messageExt));
    } else if (rocketMQReplyListener != null) {
        Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
        Message<?> message = MessageBuilder.withPayload(replyContent).build();

        org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
        DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
        producer.setSendMsgTimeout(replyTimeout);
        producer.send(replyMessage, new SendCallback() {
            @Override public void onSuccess(SendResult sendResult) {
                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                    log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
                } else {
                    log.debug("Consumer replies message success.");
                }
            }

            @Override public void onException(Throwable e) {
                log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
            }
        });
    }
}

总结

  1. 每个使用了 @RocketMQMessageListener 注解的消费者,都会被解析成一个 ListenerContainer
  2. ListenerContainer 在解析出来后,就被开启了,它会运行一个死循环的代码(如果当前线程不终止的话,会一直运行),这段代码会不停的去 pull 消息
  3. 如果 pull到了消息,就会被丢入一个线程池,等待资源去处理消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/436217.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python数据结构-----leetcode用队列实现栈

目录 前言&#xff1a; 方法步骤 示例 Python代码实现 225. 用队列实现栈 前言&#xff1a; 上一期学习了怎么去通过两个栈来实现队列&#xff0c;同样这一期我就来讲讲怎么去通过两个队列来实现栈的功能&#xff0c;一起来学习吧。&#xff08;上一期链接Python数据结构--…

【Git基础】常用git命令(一)

文章目录 1. 创建仓库1.1 创建仓库1.2 git add和git commit① git add② git commit③ 工作区、暂存区和仓库 2. 创建git服务器2.1 服务器&#xff1a;2.2 本地2.3 修改配置信息 3. git基础原理3.1 四个区域3.2 工作流程3.3 文件的四种状态① git rm② git checkout 4.优雅的提…

setup的两个注意点

setup的两个注意点 首先&#xff0c;我们原本在v2中&#xff0c;父组件给子组件传递参数时&#xff0c;使用props来接收&#xff0c;当然除了这个方法外&#xff0c;我们还可以通过$attr来接收&#xff0c;只不过使用$atter就不能对父组件传来的参数进行类型的限定&#xff0c…

你是一个资深API接口爬虫程序员,现在需要你介绍一下如何通过商品id来获取商品数据并读取出来

获取商品数据通常需要使用API接口&#xff0c;根据接口文档中的说明传递商品id参数&#xff0c;并使用相应的请求方式&#xff08;通常为GET请求&#xff09;向API服务器发送请求即可。 以下是一个获取商品数据的示例请求&#xff1a; 首先打开API接口文档&#xff0c;找到获…

小航助学答题系统编程等级考试scratch二级真题2023年3月(含题库答题软件账号)

青少年编程等级考试scratch真题答题考试系统请点击 电子学会-全国青少年编程等级考试真题Scratch一级&#xff08;2019年3月&#xff09;在线答题_程序猿下山的博客-CSDN博客_小航答题助手 1.小猫的程序如图所示&#xff0c;积木块的颜色与球的颜色一致。点击绿旗执行程序后&a…

USB TO SPI / USB TO I2C 软件概要 7 --- 专业版调试器

所需设备&#xff1a; 1、USB 转 SPI / I2C 适配器&#xff1b; 软件概述&#xff1a; SPI类: USB TO SPI 1.0-Slave SPI从机软件&#xff0c;适合单步调试&#xff0c;支持SPI工作模式0、1、2、3&#xff0c;自动跟随主机通讯速率&#xff0c;自动接收数据&#xff1b; …

【算法宇宙——在故事中学算法】背包dp之完全背包问题

学习者不灵丝相传&#xff0c;而自杖明月相反&#xff0c;子来此事却无得失。 文章目录 前言正文小明的探险之旅&#xff08;2&#xff09;最后的优化代码 前言 尽管计算机是门严谨的学科&#xff0c;但正因为严谨&#xff0c;所以要有趣味才能看得下去。在笔者的前几篇算法类…

C#基础学习--LINQ

什么是LINQ 从对象获取数据的方法一直都是作为程序的一部分而设计的&#xff0c;然而使用LINQ可以很轻松的查询对象集合 LINQ提供程序 匿名类型 匿名类型经常用于LINQ查询的结果之中 匿名类型的对象创建表达式&#xff1a; using System; using System.Collections; using …

BiFormer:基于双层路由注意力的视觉Transformer

文章目录 摘要1、简介2、相关工作3、我们的方法:BiFormer3.1、预备知识&#xff1a;注意力3.2、双层路由注意(BRA)3.3、BRA的复杂性分析 4、实验4.1、ImageNet-1K图像分类4.2. 目标检测与实例分割4.3. 基于ADE20K的语义分割4.4、消融研究4.5、注意图可视化 5、局限性和未来工作…

C++ -3- 类和对象(中) | (三)END

文章目录 6.日期类的实现构造函数赋值运算符 “”前置、后置日期 - 日期日期类实现—代码汇总流插入流提取 7.const成员const 与 权限放大 8.取地址及const取地址操作符重载 6.日期类的实现 #pragma once #include <stdbool.h> #include <iostream> using namespa…

vue vue-json-viewer 展示 JSON 格式数据

1、下载 vue-json-viewer npm 下载 vue-json-viewer &#xff1a; // Vue2 npm install vue-json-viewer2 --save // Vue3 npm install vue-json-viewer3 --saveyarn 下载 vue-json-viewer &#xff1a; // Vue2 yarn add vue-json-viewer2 // Vue3 yarn add vue-json-view…

基于LS1028 TSN 交换机软件系统设计与实现(三)

NXP 推出 OpenIL 作为用于工业领域的 Linux 发行版&#xff0c; OpenIL 新增的部分中 含有&#xff1a;支持实时的操作系统的扩展和支持工业厂房中自动化 OEM 的 Time-Sensitive 网络。 OpenIL 作为开放型的工业 Linux 系统最大的优势便是将实时计算在网络中 的…

JavaWeb——UDP的报文结构和注意事项

目录 一、UDP特点 1、无连接 2、不可靠 3、面向数据报 4、全双工通信 二、UDP报文结构 1、报头 2、载荷 三、端口 四、报文长度 五、校验和 1、定义 六、注意事项 1、UDP只有接收缓冲区、没有发送缓冲区 2、UDP大小受限 3、基于UDP的应用层协议 4、MTU对UDP协议…

《Java8实战》第11章 用 Optional 取代 null

11.1 如何为缺失的值建模 public String getCarInsuranceName(Person person) { return person.getCar().getInsurance().getName(); } 上面的这种代码就很容易出现NullPointerException的异常。 11.1.1 采用防御式检查减少 NullPointerException 为了避免NullPointerExce…

【Linux】基础IO——文件操作|文件描述符|重定向|缓冲区

文章目录 一、文件操作1. 文件预备知识2. 回顾C文件操作3. 文件操作的系统调用标志位的传递openwriteread 二、文件描述符1. 文件描述符的理解2. 文件描述符的分配规则 三、重定向1. 重定向的本质2. dup2系统调用 四、缓冲区1. 缓冲区的刷新策略2. 缓冲区的位置3. 简单模拟实现…

当程序员的好处和坏处,我用七年经历来和大家聊一聊

我想和大家分享一下我做程序员这七年来的一些感受和经验&#xff0c;同时也想和大家聊一聊做程序员的好处和坏处&#xff0c;让大家真正深入了解程序员的工作&#xff0c;是不是和大家想象中的一样。 首先&#xff0c;我毕业于四川某不知名的二本院校&#xff0c;于2016年进入…

【软考备战·希赛网每日一练】2023年4月19日

文章目录 一、今日成绩二、错题总结第一题第二题第三题 三、知识查缺 题目及解析来源&#xff1a;2023年04月19日软件设计师每日一练 一、今日成绩 二、错题总结 第一题 解析&#xff1a; 第二题 解析&#xff1a; server-side n.服务器端 enterprise n.企业 client n.客户 d…

matplotlib的配色(随机颜色函数,各种渐变色,彩虹色)

也是画图的时候经常会遇到的问题&#xff0c;什么颜色好看&#xff1f; 先直接上一个配色表&#xff1a; plt官网&#xff1a;List of named colors — Matplotlib 3.8.0.dev898g4f5b5741ce documentation 需要什么颜色传入就行了。 例如我下面画一个柱状图&#xff0c;自己选…

ctfhub技能树 web sql注入

1.整型注入 页面正常时 判断注入字段数 ?id1 order by 2判断注入回显位 ?id-1 union select 1,2查数据库 ?id-1 union select 1,database()库名&#xff1a;sqli 查数据表 ?id-1 union select 1,group_concat(table_name) from information_schema.tables where tabl…

kotlin协程、线程切换,函数方法委托

kotlin协程、线程切换&#xff0c;函数方法委托 一般编程的技法&#xff0c;比如&#xff0c;在Android中&#xff0c;假设在主线程中实现了一个函数&#xff0c;但该函数是耗时操作&#xff0c;毫无疑问&#xff0c;需要将这个函数的实现切入非主线程中操作&#xff0c;那么可…