这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党
Rocketmq版本
- version: 5.1.0
背景
继续上次的高可用topic二开已经有了一段时间,现在我们需要对我们的限流数据进行监控,所以现在我们来研究研究RocketMQ
的监控源码
入口
这里我们源码的切入点还是以client
为切入点
首先我们来看看比如我们要统计topic
发送消息的数量是如何统计的。
入口代码我这里直接看的是rocketmq-exporter
的源代码,我这里给出部分核心代码
@Resource
private MQAdminExt mqAdminExt;
BrokerStatsData bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic);
BrokerStatsData viewBrokerStatsData(final String brokerAddr, final String statsName, final String statsKey)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
InterruptedException;
可以看到核心方法就是通过viewBrokerStatsData
方法
知道了入口后我们就去RocketMQ
源码里面具体分析
客户端的监控指标获取
老规矩我们这里直接进去看看netty的通信code
这里可以看到的通信code就是RequestCode.VIEW_BROKER_STATS_DATA
所以我们直接去找到broker对应的业务handler
很快我们就锁定了方法ViewBrokerStatsData(ctx, request)
这里这个方法命名有点奇怪,竟然是大写开头,不过我们不用在意这些小问题
可以考虑给社区提一个pr,不过merge不merge就不知道了,毕竟现在社区的pr已经高达200多个没处理了
进入到ViewBrokerStatsData
方法后,我们可以看到有有一个比较核心的方法
这里还记得requestHeader.getStatsName()
和requestHeader.getStatsKey()
的值吗
没错就是我们开始传进来的这两个值
requestHeader.getStatsName()
:BrokerStatsManager.TOPIC_PUT_NUMS
requestHeader.getStatsKey()
:topic
监控核心数据结构
在上面的分析我们就已经看到了RocketMQ
的核心数据结构,没错就是
BrokerStatsManager
这里我们重点关注的属性就是
private final HashMap<String, StatsItemSet> statsTable = new HashMap<>();
这里我简单理了一下他们的关系图
这里我可以简单给大家看看debug实际里面的数据结构
其中statsTable
的可以全在Stats
类中,大致有如下一些,我这里简单截个图
之前我们使用的BrokerStatsManager.TOPIC_PUT_NUMS
实际是不推荐使用了,推荐使用Stats
所以如果我们想要自定义一些监控指标就可以在这里面加一些我们自己的key
StatsItem
中的统计维度主要有三个:
- 分钟:csListMinute
- 小时:csListHour
- 天:csListDay
我们在AdminBrokerProcessor
中处理客户端的请求的时候拼装返回数据也可以看到
StatsItemSet的初始化
StatsItemSet
的初始化主要是在BrokerStatsManager
的init
方法
数据是如何写入
通过上面的从client
到broker
我们大致知道了数据查询以及存储的数据结构,接下来我们就来看看数据是如何写入的
通过上面的分析我们知道数据是存储在BrokerStatsManager
的statsTable
中
所以我们看看statsTable
中的调用关系
我们很快就定位到了写入TOPIC_PUT_NUMS
的方法
public void incTopicPutNums(final String topic) {
this.statsTable.get(Stats.TOPIC_PUT_NUMS).addValue(topic, 1, 1);
}
public void incTopicPutNums(final String topic, int num, int times) {
this.statsTable.get(Stats.TOPIC_PUT_NUMS).addValue(topic, num, times);
}
再通过这两个方法的调用关系,我们发现业务处理器即发消息的处理器SendMessageProcessor
有调用
我们可以看到发送消息成功后就会更新改值的内存值
this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
public void incTopicPutNums(final String topic, int num, int times) {
this.statsTable.get(Stats.TOPIC_PUT_NUMS).addValue(topic, num, times);
}
最终将数据写入到了StatsItem
中,那么我们的csListMinute
、csListHour
、csListDay
是如何统计的呢?这里我们有一个多个定时任务,每隔10s会去统计一次
统计逻辑就是在samplingInSeconds();
方法中,可以看到这里启动的定时任务是10s统计一次
public void samplingInSeconds() {
synchronized (this.csListMinute) {
if (this.csListMinute.size() == 0) {
this.csListMinute.add(new CallSnapshot(System.currentTimeMillis() - 10 * 1000, 0, 0));
}
this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.sum(), this.value
.sum()));
if (this.csListMinute.size() > 7) {
this.csListMinute.removeFirst();
}
}
}
这里是总共统计7个值,超过就将之前的移除掉
第一次会添加一个初始化,可以看到我们到第60s刚好会将初始值0移除掉
计算我们的统计指标sum
、tps
则是在computeStatsData
这个方法
private static StatsSnapshot computeStatsData(final LinkedList<CallSnapshot> csList) {
StatsSnapshot statsSnapshot = new StatsSnapshot();
synchronized (csList) {
double tps = 0;
double avgpt = 0;
long sum = 0;
long timesDiff = 0;
if (!csList.isEmpty()) {
CallSnapshot first = csList.getFirst();
CallSnapshot last = csList.getLast();
sum = last.getValue() - first.getValue();
tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp());
timesDiff = last.getTimes() - first.getTimes();
if (timesDiff > 0) {
avgpt = (sum * 1.0d) / timesDiff;
}
}
statsSnapshot.setSum(sum);
statsSnapshot.setTps(tps);
statsSnapshot.setAvgpt(avgpt);
statsSnapshot.setTimes(timesDiff);
}
return statsSnapshot;
}
总结
至此RocketMQ
的一些监控指标的处理就分析完成了,我们从指标的获取
->写入
->统计
都分析到了。
包括如何添加我们自己的监控指标,当然一些小细节就限于篇幅就没具体分析比如,统计计数使用的LongAdder
而不是AtomicLong
等