消息中间件之RocketMQ源码分析(三)

news2025/1/22 19:54:56

RocketMQ中的Consumer启动流程

RocketMQ客户端中有两个独立的消费者实现类分别为DefaultMQPullConsumer和DefaultMQPushConsumer,

DefaultMQPullConsumer

DefaultMQPullConsumer,该消费者使用时需要用户主动从Broker中Pull消息和消费消息,提交消费位点

继承关系图

在这里插入图片描述

核心属性

  • namesrvAddr:继承自ClientConfig,表示RocketMQ集群的Namesrv地址,如果是多个,则用逗号分开
    如:127.0.0.1:9876,127.0.0.2:9876
  • clientIP:使用客户端的程序所在机器的IP地址,目前支持IPV4和IPV6,同时排除了本地环会地址(127.0.xxx.xxx)
    和私有内网地址(192.168.xxx.xxx),如果在Docket中运行,获取的IP地址是容器所在的IP地址,而非宿主主机的IP地址
  • instanceName:实例名,顾名思义每个实例都需要取不一样的名字,加入要在多个机器上部署
    多个程序进程,那么每个进程的实例名必须不相同,否则程序会启动失败,因为在创建MQClient时,
    会用到IP和instancename名称来
    在这里插入图片描述

在这里插入图片描述

  • vipChannelEnabled:这是一个boolean值,表示是否开启VIP通道。VIP通道和非VIP通道的区别是使用不同的端口号进行通信
  • clientCallbackExecutorThreads:客户端回调线程数。该线程数等于Netty通信层回调线程的个数,默认值为
    Runtime.getRuntime().availableProcessors();表示当前有效的CPU个数
  • pollNameServerInterval:获取Topic路由信息间隔,单位为ms,默认为30000ms(30s)
  • heartbeatBrokerInterval:客户端和Broker心跳间隔,单位为ms,默认30000ms(30s)
  • persistCOnsumerOffsetInterval:持久化消费位点时间间隔,单位为ms,默认为5000ms(5s)
  • defaultMQPullConsumer:默认pull消费者的具体实现
  • consumerGroup:消费者组名字
  • brokerSuspendMaxTimeMills:在长轮询模式下,Broker的最大挂起请求时间,建议不要修改此值
  • consumerTimeoutMillsWhenSuspend:在长轮询模式下,消费者的最大请求超时时间,必须比brokerSuspendMaxTimeMills大,不建议修改
  • messageModel:消费模式,现在支持集群模式消费和广播模式消费
  • messageQueueListener:消息路由信息变化时回调处理监听器,一般在重新平衡时被调用
  • offsetStore:位点存储模块。集群模式位点会持久化到Broker中,广播模式持久化到本地文件中(某个实例消费失败,生产者也不会重发),位点存储模块有两个实现类,分别为RemoteBrokerOffsetStore和LocalFileOffsetStore
  • allocateMessageQUeueStrategy:消费Queue分配策略管理器,默认是平均分配策略
    private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
  • maxReconsumeTimes:最大重试次数,可以配置

核心方法

  • registerMessageQueueListener():注册队列变化监听器,当队列发生变化是会被监听到
    在这里插入图片描述

  • pull():从Broker中Pull消息,如果有PullCallback参数,则表示异步拉取
    在这里插入图片描述

  • pullBlockIfNotFound():长轮询方式拉取,如果没有拉取到消息,那么Broker会讲请求Hold住一段时间,
    当有消息来临时再发送pull请求
    在这里插入图片描述

  • updateConsumeOffset():更新某一个Queue的消费位点
    在这里插入图片描述

  • fetchConsumeOffset():查找某个Queue的消费位点
    在这里插入图片描述

  • sendMessageBack():如果消费发送失败,则可以讲消息重新发回Broker,这个消费者组延迟一段时间后可以再消费(也就是重试)
    在这里插入图片描述

  • fetchSubscribeMessageQueues():获取一个Topic的全部Queue信息

在这里插入图片描述

Pull启动流程

在这里插入图片描述

  • 1.最初创建defaultMQPullConsumerImpl时的状态为ServiceState.CREATE_JSUT,然后设置消费者的默认启动状态为失败

在这里插入图片描述
在这里插入图片描述

  • 2.检查消费者的配置比,如消费者组名、消费类型、Queue的分配策略等参数是否符合规范,将订阅关系数据发给Rebalance服务对象
    在这里插入图片描述
    在这里插入图片描述

  • 3.校验消费者实例名,如果时默认的名字,则更改为当前的程序进程id
    在这里插入图片描述

  • 4.获取一个MQClientInstance,如果MQClientInstance已经初始化,则直接返回初始化的实例。这是核心对象,每个ClientID缓存一个实例
    在这里插入图片描述

  • 5.设置Rebalance对象消费组、消费类型、Queue分配策略、MQClientInstance等参数
    在这里插入图片描述

  • 6.对BrokerAPI的封装类pullAPIWrapper进行初始化,同时注册消息,过滤filter
    在这里插入图片描述

  • 7.初始化位点管理器并加载位点信息,位点管理器分为本地管理和远程管理,集群消费时
    消费位点保存在Broker中,由远程管理器管理,广播消息时位点在本地,由本地管理其管理
    在这里插入图片描述

  • 8.本地注册消费者实例,如果注册成功,则表示消费者启动成功
    在这里插入图片描述

DefaultMQPushConsumer

大部分属性、方法和DefaultMQPullConsumer是一样的

核心属性和方法

  • defaultMQPushConsumerImpl:默认的Push消费者具体实现类
  • consumeFromWhere:一个枚举,表示从什么位点开始消费,
    CONSUME_FROM_LAST_OFFSET:默认从上次消费的位点开始消费,相当于断点继续
    CONSUME_FROM_TIMESTAMP:从指定时间开始消费
    CONSUME_FROM_FIRST_OFFSET:从ConsumeQueue的最小位点开始消费
  • consumeTimestamp:表示从哪一时刻开始消费,时间格式为yyyyMMDDHHmmss,默认半小时前,当consumeFromWhere=CONSUME_FROM_TIMESTAMP时,consumeTimestamp设置的值才生效
  • allocateMessageQueueStrategy:消费者订阅topic-queue策略
  • subscription:订阅关系,表示当前消费者订阅了哪些Topic的哪些Tag
  • messageListener:消息Push回调监听器
  • consumeThreadMin:最小消费线程数,必须小于consumeThreadMax
    consumeThreadMax:最大线程数,必须大于consumeThreadMin
  • adjustThreadPoolNumsThreshold:动态调整消费线程池的线程数大小,开源版本不支持
  • consumeConcurrentlyMaxSpan:并发消息的最大位点差,,如果Pull消息的位点差超过该值,拉取变慢
  • pullThresholdForQueue:一个Queue能缓存的最大消息数,超过该值则采取拉取流控措施,默认是1000
  • pullThresholdSizeForQueue:一个Queue最大能缓存的消息字节数,单位是MB,默认是10MB
  • pullThresholdForTopic:一个Topic最大能缓存的消息数。超过该值则采取拉取流控措施,该字段值默认是-1,该值根据pullThresholdForQueue的配置决定是否生效,pullThresholdForTopic的优先级低于pullThresholdForQueue
  • pullThreasholdSizeForTopic:一个Topic最大能缓存的消息字节数,单位是MB,默认为-1,结合pullThresholdSizeForQueue配置项生效,该配置项的优先级低于pullThresholdSizeForQueue
  • pullInterval:拉取间隔,单位为ms
  • consumeMessageBatchMaxSize:消费者每次批量消费时,最多消费多少条消息,默认是1
  • pullBatchSize:一次最大拉取多少条消息,默认32条
  • postSubscriptionWhenPull:每次拉取消息时是否更新订阅关系,默认false
  • maxReconsumeTimes:最大重试次数,默认-1,表示最大重试次数为16次
  • suspendCurrentQueueTimeMillis为段轮询场景设置的挂起时间,比如顺序消息场景
  • consumeTimeout:消费超时时间,单位为min,默认是15

Push启动流程

在这里插入图片描述

  • 1-7和Pull模式类似
  • 8.初始化消费服务并启动,之所以用户"感觉"消息是Broker主动推送给自己的,
    是因为DefaultMQPushConsumer通过Pull服务将消息
    拉取到本地,再通过Callbakc的形式,将本地消息Push给用户的消费代码,
    DefaultMQPushConsumer和DefaultMQPullConsumer
    获取消息的方式一样,本质上都是拉取。

消费服务分为两种,即并行消费服务和顺序消费服务,对应的实现类分别是
ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService
根据用户监听器继承的不同接口初始化不同的消费服务程序

在这里插入图片描述

  • 9.启动MQClientInstance实例
    在这里插入图片描述
  • 10.更新本地订阅关系和路由信息,通过Broker检查是否支持消费者的过滤类型;
    向集群中的所有Broker发送消费者组的心跳信息
    在这里插入图片描述
  • 11.立即执行一次Rebalance
    this.mQClientFactory.rebalanceImmediately();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1425171.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Altium Designer的学习

PCB设计流程 1.新建空白工程: 创建一个新的工程 新建四个文件,并且保存: 每次打开文件时,打开以.PrjPcb结尾的文件 2.元件符号的创建: 在绘制图形的时候设置成10mil,为了在原理图中显得不那么大。 在绘制引脚的时候设…

外星人入侵(python)

前言 代码来源《python编程从入门到实践》Eric Matthes 署 袁国忠 译 使用软件:PyCharm Community Editor 2022 目的:记录一下按照书上敲的代码 alien_invasion.py 游戏的一些初始化设置,调用已经封装好的函数方法,一个函数的…

将vant地区数据改为label value children格式

以下代码放到nodejs中运行 a.js文件内容,vant的数据,来自import { areaList } from vant/area-data,形如: const fs require(fs);const a require(./a.js);const b transformData(a); fs.writeFileSync(./b.js, JSON.string…

STM32G4 系列命名规则

STM32G4产品线 基础型系列STM32G4x1 具有入门级模拟外设配置,单存储区Flash,支持的Flash存储器容量范围从32到512KB。 增强型系列STM32G4x3 与基本型器件相比具有更多数量的模拟外设,以及双存储区Flash,Flash存储器容量也提高…

如何在Windows系统使用Plex部署影音服务与公网访问本地资源【内网穿透】

文章目录 1.前言2. Plex网站搭建2.1 Plex下载和安装2.2 Plex网页测试2.3 cpolar的安装和注册 3. 本地网页发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4. 公网访问测试5. 结语 正文开始前给大家推荐个网站,前些天发现了一个巨牛的 人工智能学习网站, 通…

Spring-mybatis

怎样通过Spring整合Mybatis来实现业务 目录 1.导入依赖 <dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency>&l…

快乐学Python,如何正确使用pandas处理时间类型的数据?

在日常的数据分析工作中&#xff0c;常常会有根据日期来对数据进行分析。比如我们需要通过用户的下单时间来分析用户在不同时间段对商品的喜好&#xff1b;如通过访问日志的访问时间来分析系统的访问周期和负载&#xff0c;为不同时间段的资源调配提供依据&#xff1b;如通过用…

vit细粒度图像分类(六)TransFC学习笔记

1.摘要 从判别局部区域学习特征表示在细粒度视觉分类中起着关键作用。利用注意机制提取零件特征已成为一种趋势。然而&#xff0c;这些方法有两个主要的局限性:第一&#xff0c;它们往往只关注最突出的部分&#xff0c;而忽略了其他不明显但可区分的部分。其次&#xff0c;他们…

2024不可不会的StableDiffusion之拼接各组件(五)

1. 引言 在之前的文章中&#xff0c;我介绍了如何安装扩散器库diffuser用以生成 AI 图像和构成stable diffusion的各个关键组件&#xff0c;即 CLIP 文本编码器、VAE 和 U-Net。在这篇文章中&#xff0c;我们将尝试把这些关键组件放在一起&#xff0c;并详细展示生成图像的扩散…

如何在Shopee菲律宾市场进行选品:策略和建议

在Shopee菲律宾市场进行选品时&#xff0c;卖家需要采取一系列策略和建议&#xff0c;以确保他们的产品能够在这个市场上取得成功。这篇文章将介绍一些关键的策略和建议&#xff0c;帮助卖家更好地了解市场趋势、关注热销品类、满足消费者需求、创新营销手段、优化供应链管理、…

大数据分析|从七个特征理解大数据分析

文献来源&#xff1a;Saggi M K, Jain S. A survey towards an integration of big data analytics to big insights for value-creation[J]. Information Processing & Management, 2018, 54(5): 758-790. 下载链接&#xff1a;链接&#xff1a;https://pan.baidu.com/s/1…

如何进行有效的Shopee新店选品

在Shopee平台上开设新店是一个令人兴奋的时刻&#xff0c;但是在开始销售之前&#xff0c;进行有效的选品是至关重要的一步。选品的质量和策略将直接影响你的市场竞争力和销售业绩。下面是一些建议&#xff0c;可以帮助你进行有效的Shopee新店选品。 先给大家推荐一款shopee知虾…

❤ 做一个自己的AI智能机器人吧

❤ 做一个自己的AI智能机器人 看了扣子&#xff08;coze&#xff09;的模型&#xff0c;字节基于chatgpt搭建的一个辅助生成AI的网站&#xff0c;感觉蛮有意思&#xff0c;看了掘金以后&#xff0c;于是动手自己也实现了一个。 官网 https://www.coze.cn/ 进入的网站 1、 创…

算法模板 1.前缀和

前缀和&#xff1a;以O(1)的时间求解一段区间的和&#xff0c;空间复杂度O(n) 一维前缀和 795. 前缀和 - AcWing题库 #include <bits/stdc.h> using namespace std; const int N 100010; int a[N],s[N]; int main(){int n,m;scanf("%d%d",&n,&m);fo…

跟着cherno手搓游戏引擎【15】DrawCall的封装

目标&#xff1a; Application.cpp:把渲染循环里的glad代码封装成自己的类&#xff1a; #include"ytpch.h" #include "Application.h"#include"Log.h" #include "YOTO/Renderer/Renderer.h" #include"Input.h"namespace YO…

个人建站前端篇(二)项目采用服务端渲染SSR

SSR的优点 更好的SEO首屏加载速度更快&#xff0c;用户体验更好可以使用相同的语言以及相同的声明式、面向组件的心智模型来开发整个应用&#xff0c;而不需要在后端模板系统和前端框架之间来回切换。 Vue生态中的SSR通用解决方案 Nuxt是一个构建于 Vue 生态系统之上的全栈框…

Modelarts自动学习之旅,实现智慧食堂的人脸识别提示优化

前言 最近公司食堂进行了升级&#xff0c;不但餐盘更换为智能餐盘&#xff0c;且结账的时候可以刷脸支付。 这些升级让排队结账的速度提升了很多&#xff0c;且食堂员工效率也随之提高了很多。果然&#xff0c;科技改变世界。 我观察了一下&#xff0c;智能餐盘基本没有卡顿…

最新2024如何解决谷歌浏览器Chrome谷歌翻译无法使用问题

快速恢复谷歌浏览器一键翻译功能在Chrome 中安装好【翻译】插件 Macbook 操作步骤&#xff1a; 1点击“前往”&#xff0c;打开“前往文件夹” 2 在对话框中输入“/etc” 囝找到“hosts”文件&#xff0c;复制粘贴到桌面 3 在复制的文件最后新起一行&#xff0c;输入并保存&am…

UnityShader(十二)实现标准光照模型中的高光反射

目录 基本光照模型中的高光反射公式&#xff1a; 逐顶点光照 逐像素光照 基本光照模型中的高光反射公式&#xff1a; 从公式可以看出 要计算高光反射需要知道四个参数&#xff1a;入射光线的颜色和强度clight&#xff0c;材质的高光反射系数mspecular&#xff0c;视角方向v以…

怿星科技荣膺星河智联“2023年度卓越供应商”,共创智能座舱新未来

1月19日&#xff0c;在星河智联2023年度卓越供应商评选活动中&#xff0c;怿星科技凭借卓越的产品和优质的服务&#xff0c;以及在项目管理、设计开发和成本控制等多方面的出色表现&#xff0c;荣获了“年度卓越供应商”的荣誉称号。 添加图片注释&#xff0c;不超过 140 字&am…