RocketMQ教程-(6-5)-运维部署-Promethus Exporter

news2025/1/12 20:50:14

介绍​

Rocketmq-exporter 是用于监控 RocketMQ broker 端和客户端所有相关指标的系统,通过 mqAdmin 从 broker 端获取指标值后封装成 87 个 cache。

警告

过去版本曾是 87 个 concurrentHashMap,由于 Map 不会删除过期指标,所以一旦有 label 变动就会生成一个新的指标,旧的无用指标无法自动删除,久而久之造成内存溢出。而使用 Cache 结构可可以实现过期删除,且过期时间可配置。

Rocketmq-expoter 获取监控指标的流程如下图所示,Expoter 通过 MQAdminExt 向 MQ 集群请求数据,请求到的数据通过 MetricService 规范化成 Prometheus 需要的格式,然后通过 /metics 接口暴露给 Promethus。 

957681249485

Metric 结构​

Metric 类位于 org.apache.rocketmq.expoter.model.metrics 包下,实质上是一些实体类,每个实体类代表一类指标, 总共 14 个 Metric 类。这些类作为 87 个 Cache 的 key, 用不同的 label 值进行区分。

实体类中包含了 LABEL 的三个维度:BROKER、CONSUMER、PRODUCER

  • broker 相关 metric 类有: BrokerRuntimeMetric、BrokerMetric、DLQTopicOffsetMetric、TopicPutNumMetric

  • 消费者相关类有: ConsumerRuntimeConsumeFailedMsgsMetric 、ConsumerRuntimeConsumeFailedTPSMetric 、ConsumerRuntimeConsumeOKTPSMetric、ConsumerRuntimeConsumeRTMetric、ConsumerRuntimePullRTMetric、ConsumerRuntimePullTPSMetric、ConsumerCountMetric、ConsumerMetric、ConsumerTopicDiffMetric

  • 生产者相关 metric 类有: ProducerMetric

Prometheus 拉取 metrics 的过程​

RocketMQ-exporter 项目和 Prometheus 相当于服务器和客户端的关系,RocketMQ-exporter 项目引入了 Prometheus 的 client 包,该包中规定了需要获取的信息的类型即项目中的 MetricFamilySamples 类,Prometheus 向 expoter 请求 metrics,expoter 将信息封装成相应的类型之后返回给 Prometheus。

rocketmq-expoter 项目启动后,会获取 rocketmq 的各项 metrics 收集到 mfs 对象中,当浏览器或 Prometheus 访问相应的接口时,会通过 service 将 mfs 对象中的 samples 生成 Prometheus 所支持的格式化数据。主要包含以下步骤:

浏览器通过访问 ip:5557/metrics,会调用 RMQMetricsController 类下的 metrics 方法,其中 ip 为 rocketmq-expoter 项目运行的主机 ip

    private void metrics(HttpServletResponse response) throws IOException {
    StringWriter writer = new StringWriter();
    metricsService.metrics(writer);
    response.setHeader("Content-Type", "text/plain; version=0.0.4; charset=utf-8");
    response.getOutputStream().print(writer.toString());
}

通过新建 StringWriter 对象用于收集 metrics 指标,调用 MetricsService 类中的方法 metrics 将 expoter 中提取到的指标收集到 writer 对象中,最后将收集到的指标输出到网页上。

收集到的指标格式为:

<metric name>{<label name>=<label value>, ...} <metric value>

如:

rocketmq_group_diff{group="rmq_group_test_20220114",topic="fusion_console_tst",countOfOnlineConsumers="0",msgModel="1",} 23.0

MetricCollectTask 类中的 5 个定时任务​

MetricCollectTask 类中有 5 个定时任务,分别为 collectTopicOffset、collectConsumerOffset、collectBrokerStatsTopic、collectBrokerStats 和 collectBrokerRuntimeStats。用于收集消费位点信息以及 Broker 状态信息等。其 cron 表达式为:cron: 15 0/1 * * * ?,表示每分钟会收集一次。其核心功能是通过 mqAdminExt 对象从集群中获取 broker 中的信息,然后将其添加到对应的 87 个监控指标中,以 collectTopicOffset 为例:

  1. 首先初始化TopicList对象,通过mqAdminExt.fetchAllTopicList()方法获取到集群的所有topic信息。
    TopicList topicList = null;
    try {  topicList = mqAdminExt.fetchAllTopicList();
} catch (Exception ex) {
        log.error(String.format("collectTopicOffset-exception comes getting topic list from namesrv, address is %s",
            JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
        return;
    }

  1. 将 topic 加入到 topicSet 中,循环遍历每一个 topic,通过 mqAdminExt.examineTopicStats(topic)函数来检查 topic 状态。
    Set < String > topicSet = topicList != null ? topicList.getTopicList() : null;
 for (String topic: topicSet) {
     TopicStatsTable topicStats = null;
     try {
         topicStats = mqAdminExt.examineTopicStats(topic);
     } catch (Exception ex) {
         log.error(String.format("collectTopicOffset-getting topic(%s) stats error. the namesrv address is %s",
             topic,
             JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
         continue;}

  1. 初始化 topic 状态 set,用于用于按 broker 划分的 topic 信息位点的 hash 表 brokerOffsetMap,以及一个用于按 broker 名字为 key 的用于存储更新时间戳的 hash 表 brokerUpdateTimestampMap。
        Set<Map.Entry<MessageQueue, TopicOffset>> topicStatusEntries = topicStats.getOffsetTable().entrySet();
        HashMap<String, Long> brokerOffsetMap = new HashMap<>();
        HashMap<String, Long> brokerUpdateTimestampMap = new HashMap<>();
        for (Map.Entry<MessageQueue, TopicOffset> topicStatusEntry : topicStatusEntries) {
            MessageQueue q = topicStatusEntry.getKey();
            TopicOffset offset = topicStatusEntry.getValue();
            if (brokerOffsetMap.containsKey(q.getBrokerName())) {
                brokerOffsetMap.put(q.getBrokerName(), brokerOffsetMap.get(q.getBrokerName()) + offset.getMaxOffset());
            } else {
                brokerOffsetMap.put(q.getBrokerName(), offset.getMaxOffset());
            }
            if (brokerUpdateTimestampMap.containsKey(q.getBrokerName())) {
                if (offset.getLastUpdateTimestamp() > brokerUpdateTimestampMap.get(q.getBrokerName())) {
                    brokerUpdateTimestampMap.put(q.getBrokerName(), offset.getLastUpdateTimestamp());
                }
            } else {
                brokerUpdateTimestampMap.put(q.getBrokerName(),
                offset.getLastUpdateTimestamp());
            }
        }

  1. 最后通过遍历 brokerOffsetMap 中的每一项,通过调用 metricsService 获取到 metricCollector 对象,调用 RMQMetricsCollector 类中的 addTopicOffsetMetric 方法,将相应的值添加到 RMQMetricsCollector 类中 87 个指标对应的其中一个指标的 cache 中。
 Set<Map.Entry<String, Long>> brokerOffsetEntries = brokerOffsetMap.entrySet();
        for (Map.Entry<String, Long> brokerOffsetEntry : brokerOffsetEntries) {
            metricsService.getCollector().addTopicOffsetMetric(clusterName, brokerOffsetEntry.getKey(), topic,
                brokerUpdateTimestampMap.get(brokerOffsetEntry.getKey()), brokerOffsetEntry.getValue());
        }
    }
    log.info("topic offset collection task finished...." + (System.currentTimeMillis() - start));
}

Rocketmq-exporter 收集指标流程图​

95680458234

快速开始​

配置 application.yml

application.yml 中重要的配置主要有:

  • server.port 设置 promethus 监听 rocketmq-exporter 的端口, 默认为 5557

  • rocketmq.config.webTelemetryPath 配置 promethus 获取指标的路径,默认为 /metrics ,使用默认值即可.

  • rocketmq.config.enableACL 如果 RocketMQ 集群开启了 ACL 验证,需要配置为 true, 并在 accessKey 和 secretKey 中配置相应的 ak, sk.

  • rocketmq.config.outOfTimeSeconds 用于配置存储指标和相应的值的过期时间,若超过该时间,cache 中的 key 对应的节点没有发生写更改,则会进行删除.一般配置为 60s 即可(根据 promethus 获取指标的时间间隔进行合理配置,只要保证过期时间大于等于 promethus 收集指标的时间间隔即可)

  • task..cron 配置 exporter 从 broker 拉取指标的定时任务的时间间隔,默认值为"15 0/1 * * ?" 每分钟的 15s 拉取一次指标.

启动 exporter 项目​

按照 promethus 官网配置启动​

配置 promethus 的 static_config: -targets 为 exporter 的启动 IP 和端口,如: localhost:5557

访问 promethus 页面​

本地启动默认为: localhost:9090 ,则可对收集到的指标值进行查看,如下图所示:

906876098423

提示

为了达到更好的可视化效果,观察指标值变化趋势, promethus 搭配 grafana 效果更佳哦!

可观测性指标​

可观测性指标主要包括两个大类: 服务端指标和客户端指标, 服务端指标由服务端直接生成, 客户端指标在客户端产生, 由服务端通过 rpc 请求客户端获取到. 客户端指标又可细分为生产端指标和消费端指标.所有 87 个可观测性指标及其主要含义如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/931226.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

南方CASS软件安装包分享

目录 一、软件简介 二、软件下载 一、软件简介 南方CASS软件是一款基于AutoCAD平台开发的测量和计算设计软件&#xff0c;广泛应用于水利、电力、市政、建筑、交通等领域。 南方CASS软件集成了地形测量、断面测量、土地勘测定界、水文水利和公路设计等功能&#xff0c;为测…

JS 数组中对象 某属性相同对某属性的值进行相加去重(支持多条件多个值判断相加)

/* delSameObjValue 数组对象相同值相加去重arr 需要处理的数组resultNum 最终计算结果的键名keyName 用于计算判断的键名 keyValue 用于计算结果的键名 --> 对应的键值为number类型 */function delSameObjValue(arr, resultNum, keyName, keyValue) {const warp new Map(…

微信开发之一键创建微信群聊的技术实现

创建微信群 本接口为敏感接口&#xff0c;请查阅调用规范手册创建后&#xff0c;手机上不会显示该群&#xff0c;往该群主动发条消息手机即可显示。 请求URL&#xff1a; http://域名地址/createChatroom 请求方式&#xff1a; POST 请求头Headers&#xff1a; Content-T…

Navisworks2020~2023安装包分享

目录 一、软件介绍 二、下载地址 一、软件介绍 Navisworks是一款专业的建筑、工厂、机械和设备设计软件工具&#xff0c;旨在帮助项目相关方可靠地整合、分享和审阅详细的三维设计模型。它提供了一系列功能强大的工具&#xff0c;使设计师、工程师和建筑师能够更好地协作、沟…

PB4引脚作GPIO上电高电平问题

问题说明 给旧项目debug&#xff0c;芯片是国民技术 N32G452VEL7 &#xff08;用起来跟32没多大差 包括PB4在内有多个引脚作为输出&#xff0c;默认低电平&#xff0c;在状态机内先输出高电平再回到低电平&#xff0c;来模拟按键的状态&#xff08;相当于按键按下松开后按键功…

Docker使用mysql:5.6和 owncloud 镜像,构建一个个人网盘,安装搭建私有仓库 Harbor

一、使用mysql:5.6和 owncloud 镜像&#xff0c;构建一个个人网盘。 [rootlocalhost ~]# docker pull mysql:5.6[rootlocalhost ~]# docker pull owncloud[rootlocalhost ~]# docker run -itd --name mysql --env MYSQL_ROOT_PASSWORD123456 mysql:5.6 d45cc5b95f00692881baaf…

vscode的远程代码调试

目录 ssh连接 xdebug调试 ssh连接 在vscode中下载该插件 这里用虚拟机测试&#xff0c;这里用虚拟机测试&#xff0c;注意ssh是可以连接的 然后安装好remote后&#xff0c;点击左下角的>< 在弹出的这个上选择connect to host连接一台主机 配置完用户名和IP后再点一次发…

<C++>泛型编程-模板

1.泛型编程 如何实现一个通用的交换函数呢&#xff1f;可以使用函数重载 void Swap(int &left, int &right) {int temp left;left right;right temp; }void Swap(double &left, double &right) {double temp left;left right;right temp; }void Swap(c…

ES6中promise的使用

ES6中promise的使用 本文目录 ES6中promise的使用基础介绍箭头函数function函数状态 原型方法Promise.prototype.then()Promise.prototype.catch() 静态方法Promise.all()Promise.race()Promise.any() 链式回调 基础介绍 官网&#xff1a;https://promisesaplus.com/ window.…

数据处理 | Python实现基于DFCP张量分解结合贝叶斯优化的缺失数据填补

数据处理 | Python实现基于DFCP张量分解结合贝叶斯优化的缺失数据填补 目录 数据处理 | Python实现基于DFCP张量分解结合贝叶斯优化的缺失数据填补实践过程基本介绍研究背景程序设计参考资料实践过程 基本介绍 数据处理 | Python实现基于DFCP张量分解结合贝叶斯优化的缺失数据填…

bug复刻,解决方案---在改变div层级关系时,导致传参失败

问题描述&#xff1a; 在优化页面时&#xff0c;为了实现网页顶部遮挡效果&#xff08;内容滚动&#xff0c;顶部导航栏不随着一起滚动&#xff0c;并且覆盖&#xff09;&#xff0c;做法是将内容都放在一个div里面&#xff0c;为这个新的div设置样式&#xff0c;margin-top w…

Android 之 LayoutInflater (布局服务)

本节引言&#xff1a; 本节继续带来的是Android系统服务中的LayoutInflater(布局服务)&#xff0c;说到布局&#xff0c;大家第一时间 可能想起的是写完一个布局的xml&#xff0c;然后调用Activity的setContentView()加载布局&#xff0c;然后把他显示 到屏幕上是吧~其实这个底…

lnmp架构-tomcat session

tomcat session共享 部署 tomcat 在server2 和 server3上进行同样的操作 保存的信息都会在tomcat日志中保存 tomcat 与nginx的整合 在server1上 当并发量开始增大时 就得做负载均衡 解决 当一个tomcat服务器挂掉后 另一个服务器上有第一个服务器上提交的数据问题 在server2…

wx:for的使用和事件传参,解构赋值的应用

在页面的.js文件中创建了一个对象&#xff0c; 并且在页面的view中调用了两种不同的方法将对象中的元素显示出来&#xff01; 第2种代码要加强理解&#xff01;&#xff01;&#xff01; 小程序中的wx:if wx:elif wx:else 其实好像c语言中的 if-elif-else 在页面的.j…

【大数据】图解 Hadoop 生态系统及其组件

图解 Hadoop 生态系统及其组件 1.HDFS2.MapReduce3.YARN4.Hive5.Pig6.Mahout7.HBase8.Zookeeper9.Sqoop10.Flume11.Oozie12.Ambari13.Spark 在了解 Hadoop 生态系统及其组件之前&#xff0c;我们首先了解一下 Hadoop 的三大组件&#xff0c;即 HDFS、MapReduce、YARN&#xff0…

Linux 内核定时器

一、相关知识点 (一)知识点 1、内核定时器分类 1)标准定时器或系统定时器 2)高精度定时器(HRT) 头文件:#include <linux/hrtimer.h> 2、检查系统是否可用HRT 1)查看内核配置文件 CONFIG_HIGH_RES_TIMERS=y 2)查看机器 …

Vue2向Vue3过度Vuex核心概念module模块

目录 1 核心概念 - module1.目标2.问题3.模块定义 - 准备 state 2 获取模块内的state数据1.目标&#xff1a;2.使用模块中的数据3.代码示例 3 获取模块内的getters数据1.目标&#xff1a;2.语法&#xff1a;3.代码演示 4 获取模块内的mutations方法1.目标&#xff1a;2.注意&am…

js实现数据关联查找更新。数据求和验证

为了实现这个功能我们和后端定义了数据结构 data:{id&#xff1a;‘’&#xff0c;formInfo:,formInfo2:,formInfo3:,formInfo4:, ......deailData:[ // 明细数据 // saleData 查询带出的对应明细序列号数据{ id:, ocopyId:, copyId:, odoId:, ......, saleData:[ { id:, oc…

unity-AI自动导航

unity-AI自动导航 给人物导航 一.地形创建 1.首先我们在Hierarchy面板中创建一个地形对象terrian&#xff0c;自行设定地形外貌&#xff0c;此时我们设置一个如下的地形外观。 二.创建导航系统 1.在主人公的Inspector、面板中添加Nav Mesh Agent &#xff08;导航网格代理&…

【Linux】手把手教你实现udp服务器

网络套接字~ 文章目录 前言一、udp服务器的实现总结 前言 上一篇文章中我们讲到了很多的网络名词以及相关知识&#xff0c;下面我们就直接进入udp服务器的实现。 一、udp服务器的实现 首先我们需要创建五个文件(文件名可以自己命名也可以和我一样)&#xff0c;分别是makefile…