RocketMq源码分析(六)--消息消费者启动流程

news2024/12/27 13:39:01

文章目录

  • 一、消息消费者模式
  • 二、消费者启动流程
    • 1、 push模式
      • 1)类关系
      • 2)类构造器
      • 3)启动流程
    • 2、pull模式
      • 1)类关系
      • 2)类构造器
      • 3)启动流程

一、消息消费者模式

  消息消费分两种模式:推(push)和拉(pull)
  消费由接口MQConsumer定义,由两个方法构成,代码如下:

/**
 * Message queue consumer interface
 */
public interface MQConsumer extends MQAdmin {
    /**
     * If consuming failure,message will be send back to the broker,and delay consuming some time
     */
    void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

    /**
     * Fetch message queues from consumer cache according to the topic
     *
     * @param topic message topic
     * @return queue set
     */
    Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws MQClientException;
}

二、消费者启动流程

1、 push模式

1)类关系

  push模式接口定义为MQPushConsumer,其继承自MQConsumer 接口,默认实现类DefaultMQPushConsumer,类关系如下图:
在这里插入图片描述

2)类构造器

  DefaultMQPushConsumer类核心构造器如下

    /**
     * Constructor specifying namespace, consumer group, RPC hook and message queue allocating algorithm.
     *
     * @param namespace Namespace for this MQ Producer instance.
     * @param consumerGroup Consume queue.
     * @param rpcHook RPC hook to execute before each remoting command.
     * @param allocateMessageQueueStrategy Message queue allocating algorithm.
     */
    public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        //消费者所属组
        this.consumerGroup = consumerGroup;
        //生产者实例的命名空间
        this.namespace = namespace;
        //消息队列分配算法
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        //push模式消费内部实现
        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    }

参数说明:

  • consumerGroup :消费者所属组
  • namespace :生产者实例的命名空间
  • allocateMessageQueueStrategy :消息队列分配算法
  • defaultMQPushConsumerImpl :push模式消费内部实现

其他类成员变量

  • messageModel:消费模式,有集群模式(BROADCASTING)和广播模式(CLUSTERING)两种,默认CLUSTERING
  • consumeFromWhere:从消息服务器拉取不到消息时使用的重新计算消费策略,默认CONSUME_FROM_LAST_OFFSET
  • consumeThreadMin:最小消费线程数,默认20
  • consumeThreadMax:最大消费线程数,默认20
  • pullBatchSize:每次拉取消息的条数,默认32
  • postSubscriptionWhenPull:是否每次拉取消息后更新订阅信息,默认false

3)启动流程

  push模式下,消费者启动通过DefaultMQPushConsumer#start方法启动,start方法代码如下:

    /**
     * This method gets internal infrastructure readily to serve. Instances must call this method after configuration.
     *
     * @throws MQClientException if there is any client error.
     */
    @Override
    public void start() throws MQClientException {
        //设置消费者分组
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        //启动内部消费者
        this.defaultMQPushConsumerImpl.start();
        //异步数据分发器
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

  可以看出,消费者启动时,start方法主要主要做了3个事情:设置消费者分组、调用内部消费者实现实例start方法、如果存在则启动消息异步分发器
  让我们来看看内部消费者实现(DefaultMQPushConsumerImpl)启动时做了哪些事情。

  • 1、DefaultMQPushConsumerImpl在执行start方法时,先对本实例服务的状态(serviceState)进行判断。

//服务实例状态,有:CREATE_JUST、RUNNING、SHUTDOWN_ALREADY、START_FAILED,默认CREATE_JUST
private volatile ServiceState serviceState = ServiceState.CREATE_JUST;

  如果是刚创建实例时(CREATE_JUST状态),则先将附实例状态更改为START_FAILED,然后进行一些列初始化并启动操作(检查配置、复制订阅信息、设置负载均衡实现、创建并加载消息进度offsetStore、创建并启动消息消费consumeMessageService服务、向消费者工厂注册自己),最后变更为RUNNING状态。
  如果是其他状态,则放弃初始化并提示。

switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                //启动前先将服务状态设置为failed
                this.serviceState = ServiceState.START_FAILED;
                //检查配置信息
                this.checkConfig();
                //复制订阅信息
                this.copySubscription();

                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            //广播模式,消息进度存储在消费端
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            //集群模式,消息进度存储在broker端
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();

                //根据消息是否顺序创建消费服务
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }

                this.consumeMessageService.start();

                //注册消费者
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                //只会启动一次
                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
  • 2、更新订阅主题信息
 this.updateTopicSubscribeInfoWhenSubscriptionChanged();
  • 3、向broker发送检查客户端命令
this.mQClientFactory.checkClientInBroker();
  • 4、向所有broker发送心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
  • 5、立即启动负载均衡服务
this.mQClientFactory.rebalanceImmediately();

  至此,push模式消费者启动完成

2、pull模式

  pull模式原实现DefaultMQPullConsumer已过时,官方退出新pull模式实现类DefaultLitePullConsumer

1)类关系

  pull模式由LitePullConsumer接口定义,默认实现类为DefaultLitePullConsumer,类关系如下:
在这里插入图片描述

2)类构造器

  DefaultLitePullConsumer类构造器代码如下:

    /**
     * Constructor specifying namespace, consumer group and RPC hook.
     *
     * @param consumerGroup Consumer group.
     * @param rpcHook RPC hook to execute before each remoting command.
     */
    public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
        //生产者实例命名空间
        this.namespace = namespace;
        //消费者所属组
        this.consumerGroup = consumerGroup;
        //是否允许steam流请求类型
        this.enableStreamRequestType = true;
        //默认内部消费实现
        defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
    }

参数说明:

  • namespace :生产者实例的命名空间
  • consumerGroup:消费者所属组
  • enableStreamRequestType:是否允许steam流请求类型
  • defaultLitePullConsumerImpl:pull模式默认内部消费实现

其他类成员变量

  • messageModel:消费模式,默认CLUSTERING
  • autoCommit:是否自动提交消息偏移量,默认true
  • pullThreadNums:拉取消息线程数,默认20
  • pullBatchSize:每次拉取消息的条数,默认10
  • consumeFromWhere:从消息服务器拉取不到消息时使用的重新计算消费策略,默认CONSUME_FROM_LAST_OFFSET

3)启动流程

  pull模式,消息启动入口为DefaultLitePullConsumer#start方法,代码如下:

    @Override
    public void start() throws MQClientException {
        //初始化异步数据分发器
        setTraceDispatcher();
        //设置消费者所属组
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        //启动内部消息消费实现
        this.defaultLitePullConsumerImpl.start();
        //启动异步数据分发器
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

  可以看出,pull消费实现主要做了3个事情:初始化并启动消息异步数据分发器、设置消费者所属组、启动内部消息消费实现(defaultLitePullConsumerImpl)
  现在,让我们来看看内部消息消费实现(defaultLitePullConsumerImpl)启动时做了哪些事情:

  • 与push模式类似,pull模式defaultLitePullConsumerImpl执行start方法时,同样根据内部服务实例状态(serviceState)进行判断,如果是START_FAILED状态,则进行消费者初始化操作;如果其他状态则放弃并提示错误
    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                //先设置为failed状态
                this.serviceState = ServiceState.START_FAILED;
                //检查配置
                this.checkConfig();

                if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultLitePullConsumer.changeInstanceNameToPID();
                }
                //初始化MQClientInstance,注册消费者实例
                initMQClientFactory();
                //初始化消息消费负载均衡实现
                initRebalanceImpl();
				//初始化消息拉取包装器,注册消息过滤HOOK
                initPullAPIWrapper();
                //初始化消息消费进度
                initOffsetStore();
                //启动MQClientInstance
                mQClientFactory.start();
                //定时任务更新消息队列
                startScheduleTask();
                //服务实例状态更新为RUNNING
                this.serviceState = ServiceState.RUNNING;

                log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup());
                //向broker发送检查客户端命令
                operateAfterRunning();

                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
    }

  至此,pull模式消费者启动流程完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/483280.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Redis】Redis缓存双写一致性之更新策略

介绍 面试题 1、只要用到缓存&#xff0c;就可能会涉及到Redis缓存与数据库双存储双写&#xff0c;只要是双写&#xff0c;就一定会有数据一致性问题&#xff0c;怎么解决一致性问题&#xff1f; 2、双写一致性&#xff0c;先动缓存redis还是数据库mysql&#xff1f;为什么&a…

剪枝与重参第十课:RepVGG重参

目录 RepVGG重参前言1. RepVGG2. RepVGG网络搭建2.1 conv_bn2.2 RepVGG Block初始化2.3 forward2.4 branch的合并2.5 重参的实现2.6 整体网络结构搭建2.7 模型导出 3. 完整示例代码总结 RepVGG重参 前言 手写AI推出的全新模型剪枝与重参课程。记录下个人学习笔记&#xff0c;仅…

了解npm run指令

了解npm run指令 在package.json文件中的script字段&#xff0c;可以定义脚本命令&#xff0c;通过npm run指令运行该脚本。 比如简单定义一个输出打印的shell脚本。 {"script": {"hw": "echo hello world!"} }执行npm run hw可以看到终端上打…

Python---正则表达式与递归

1. 正则表达式&#xff1a; 是一种字符串验证的规则&#xff0c;通过特殊的字符串组合来确立规则 用规则去匹配字符串是否满足 如(^[\w-](\.[\w-])*[\w-](\.[\w-])$)可以表示为一个标准邮箱的格式 re模块的三个主要方法&#xff1a; re.match&#xff1a; re.match(匹配规…

电子电气架构——车辆E/E架构Software独立性

我是穿拖鞋的汉子,魔都中坚持长期主义的工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 人只有在举棋不定,无从把握的时候才感到疲惫。只有去行动就能获得解放,哪怕做的不好也比无所作为强! 本文主要介绍车辆E/E架构常识,主要涉及E/E架构车载…

Java 操作ElasticSearch

Java REST提供了两种风格的客户端连接工具&#xff0c;Java High Level REST Client、Java Low Level REST Client&#xff0c;这里我就不去细说Java Low Level REST Client了&#xff0c;因为这我确实没用到过&#xff0c;也不是很了解&#xff0c;我说一下Java High Level RE…

LVS 负载均衡群集的 NAT 模式和 DR 模式

文章目录 一、NAT 模式和 DR 模式的介绍DR模式NAT模式两种模式的区别 二、DR模式集群构建配置 一、NAT 模式和 DR 模式的介绍 DR模式 当用户向负载均衡调度器&#xff08;Director Server&#xff09;发起请求&#xff0c;调度器将请求发往至内核空间PREROUTING链首先会接收到…

【JavaEE】HTML基础知识

目录 1.HTML结构 2.HTML常见标签 3.表格标签 4.列表标签 5.表单标签 ​6.select 标签 7.textarea 标签 8.无语义标签: div & span 9.标签小练习 1.HTML结构 形如&#xff1a; <body idmyId>hello</body> HTML的书写格式 标签名 (body) 放到 <…

【操作系统OS】学习笔记:第二章 进程与线程 (上)【哈工大李治军老师】

基于本人观看学习 哈工大李治军老师主讲的操作系统课程 所做的笔记&#xff0c;仅进行交流分享 特此鸣谢李治军老师&#xff0c;操作系统的神作&#xff01; 如果本篇笔记帮助到了你&#xff0c;还请点赞 关注 支持一下 ♡>&#x16966;<)!! 主页专栏有更多&#xff0c;…

58.网页设计规则#5_阴影

一些概念 ● 在一个100%平面设计的时代之后&#xff0c;我们现在又回到了在UI设计中使用阴影(“平面设计2.0”) ● 阴影箱深度(3D):阴影越多&#xff0c;离元素界面越远 利用好阴影 你不必使用阴影!只在对网站个性有意义的情况下使用它们 使用少量的阴影:不要给每个元素…

Jmeter接口自动化测试系列之Http接口自动化实战

以下主要介绍Jmeter接口自动化需要哪些控件、接口自动化实战及总结。 前面的系列文章&#xff0c;介绍了常用组件、参数化、接口依赖、断言等知识点&#xff0c;今天我们要将这些结合起来&#xff0c;进行综合实战。 2023年B站最新Jmeter接口测试实战教程&#xff0c;精通接口…

从一到无穷大 #5 公有云时序数据库定价

文章目录 引言serverless实例售卖结论 Azure CosmosDB预配吞吐量自动缩放吞吐量Serverless预留容量存储量 Amazon Timestream写入计费查询计费存储 阿里云TSDB阿里云Lindom时序引擎实例固定费用存储费用节点费用 华为云GaussDB(for Influx)腾讯云CTSDBTDengineInfluxDB CloudAW…

c++11下篇 + 智能指针

c11下篇 智能指针 1 可变参数模板1.1 递归函数方式展开参数包1.2 逗号表达式展开参数包1.3 STL容器中的empalce相关接口函数&#xff1a; 2 lambda达式2.1 c的痛2.2 lambda表达式语法2.3 函数对象与lambda表达式 3 包装器3.1 bind 4 线程库4.1 thread类的简单介绍4.2 面试题&a…

MySQL示例数据库(MySQL Sample Databases) 之 world_x数据库

文章目录 MySQL示例数据库(MySQL Sample Databases) 之 world_x数据库官方示例数据介绍world_x数据库world_x数据库安装world-db/world.sql的脚本内容参考 MySQL示例数据库(MySQL Sample Databases) 之 world_x数据库 官方示例数据介绍 MySQL 官方提供了多个示例数据库&#…

差分数组 技巧小记

差分数组 差分数组二维差分 差分数组 如果两个信息“长得很像”&#xff0c;只要保留一个&#xff0c;对另一个&#xff0c;只要保留它们的差异&#xff0c;然后进行微调就行了。 差分数组&#xff1a; 3210&#xff0c;3208&#xff0c;3206&#xff0c;3211&#xff0c;32…

Three.js--》Gui.js库的使用讲解

目录 Gui.js库基本使用 使用three自带gui库实现基本操作 gui库实现下拉菜单和单选框 gui库分组方法实现 使用dat.gui第三方库 Gui.js库基本使用 gui.js说白了就是一个前端js库&#xff0c;对HTML、CSS和JavaScript进行了封装&#xff0c;学习开发3d技术时借助该库可以快速…

230502-LLM-Vicuna介绍、安装与注意事项整理

最终效果 在对话过程中&#xff0c;GPU与CPU均会有波动&#xff0c;但是主要还是CPU波动为主 相关资料 序号链接说明001本地CPU6G内存部署类ChatGPT模型&#xff08;Vicuna 小羊驼&#xff09; - 知乎极简安装版本&#xff0c;只支持CPU与命令行002最新开源语言模型 Vicuna 媲…

【Linux进阶之路】初始Linux

文章目录 一.时代背景二.硅谷发展模式三.操作系统基本定义常见的操作系统Linux系统的常见安装方式 四.基本指令的使用登录指令与用户相关的指令ls 指令——信息查看pwd指令——打印当前所处的文件位置cd指令——访问文件rm——删除指令touch——创建文件与修改文件信息tree ——…

Rust - 变量与数据的交互方式(clone)

在上一篇文章中我们介绍了变量与数据的交互方式-move&#xff0c;通过底层原理我们知道Rust 永远也不会自动创建数据的 “深拷贝”。因此&#xff0c;任何 自动的复制可以被认为对运行时性能影响较小。 但是如果我们 确实需要深度复制 String中堆上的数据&#xff0c;而不仅仅…

RT1010 PWM 组成配置和 PWMX 的使用

1. 前言 本篇博文将着眼于 i.MX RT1010 内部的 eFlexPWM&#xff0c;介绍其各个功能模块&#xff0c;以及 PWM 产生的原理。 2. 功能模块组成 以下是 RT1010 内部 PWM 的一个 Submoudle 的组成框图&#xff0c;从框图中我们可以看到&#xff1a; 自左向右依次有 Prescaler 对…