1. RocketMQ 介绍
RocketMQ 是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。简单的来说,它由 Broker 服务器和客户端两部分组成,其中客户端一个是消息发布者客户端(Producer),它负责向 Broker 服务器发送消息;另外一个是消息的消费者客户端(Consumer),多个消费者可以组成一个消费组,来订阅和拉取消费 Broker 服务器上存储的消息。
正由于它具有高性能、高可靠性和高实时性的特点,与其他协议组件在 MQTT 等各种消息场景中的结合也越来越多,应用越来越广泛。而对于这样一个强大的消息中间件平台,在实际使用的时候还缺少一个监控管理平台。
当前在开源界,使用最广泛监控解决方案的就是 Prometheus。与其它传统监控系统相比较,Prometheus 具有易于管理,监控服务的内部运行状态,强大的数据模型,强大的查询语言 PromQL,高效的数据处理,可扩展,易于集成,可视化,开放性等优点。并且借助于 Prometheus 可以很快速的构建出一个能够监控 RocketMQ 的监控平台。
2. Prometheus、Alertmanager、Grafana环境搭建
安装方式参考 Prometheus Operator 极简配置方式在k8s一条龙安装Prometheus 监控这里就不细说了。
3. 在 K8s上部署RocketMQ Exporter
RocketMQ 本身不支持 Prometheus 需要使用 rocketmq-exporter
3.1 Apache RocketMQ Exporter for Prometheus.
项目地址 https://github.com/apache/rocketmq-exporter
制作我们自己的docker镜像
-
git clone https://github.com/apache/rocketmq-exporter.git
-
cd xx/rocketmq-exporter
-
build
mvn clean package docker:build
-
修改tag,并push
docker tag rocketmq-exporter admin4j/rocketmq-exporter docker push admin4j/rocketmq-exporter
不想自己弄镜像的直接使用我做好的即可docker pull admin4j/rocketmq-exporter:latest
3.2 RocketMQ-Exporter 的具体实现
当前在 Exporter 当中,实现原理如下图所示:
整个系统基于 spring boot 框架来实现。由于 MQ 内部本身提供了比较全面的数据统计信息,所以对于 Exporter 而言,只需要将 MQ 集群提供的统计信息取出然后进行加工而已。所以 RocketMQ-Exporter 的基本逻辑是内部启动多个定时任务周期性的从 MQ 集群拉取数据,然后将数据规范化后通过端点暴露给 Prometheus 即可。其中主要包含如下主要的三个功能部分:
- MQAdminExt 模块通过封装 MQ 系统客户端提供的接口来获取 MQ 集群内部的统计信息。
- MetricService 负责将 MQ 集群返回的结果数据进行加工,使其符合 Prometheus 要求的格式化数据。
- Collect 模块负责存储规范化后的数据,最后当 Prometheus 定时从 Exporter 拉取数据的时候,Exporter 就将 Collector 收集的数据通过 HTTP 的形式在/metrics 端点进行暴露。
3.3 RocketMQ-Exporter 的监控指标和告警指标
RocketMQ-Exporter 主要是配合 Prometheus 来做监控,下面来看看当前在 Expoter 中定义了哪些监控指标和告警指标。
- 监控指标
rocketmq_message_accumulation 是一个聚合指标,需要根据其它上报指标聚合生成。
- 告警指标
消费者堆积告警指标也是一个聚合指标,它根据消费堆积的聚合指标生成,value 这个阈值对每个消费者是不固定的,当前是根据过去 5 分钟生产者生产的消息数量来定,用户也可以根据实际情况自行设定该阈值。告警指标设置的值只是个阈值只是象征性的值,用户可根据在实际使用 RocketMQ 的情况下自行设定。这里重点介绍一下消费者堆积告警指标,在以往的监控系统中,由于没有像 Prometheus 那样有强大的 PromQL 语言,在处理消费者告警问题时势必需要为每个消费者设置告警,那这样就需要 RocketMQ 系统的维护人员为每个消费者添加,要么在系统后台检测到有新的消费者创建时自动添加。在 Prometheus 中,这可以通过一条如下的语句来实现:
(sum(rocketmq_producer_offset) by (topic) - on(topic) group_right sum(rocketmq_consumer_offset) by (group,topic))
- ignoring(group) group_left sum (avg_over_time(rocketmq_producer_tps[5m])) by (topic)*5*60 > 0
借助 PromQL 这一条语句不仅可以实现为任意一个消费者创建消费告警堆积告警,而且还可以使消费堆积的阈值取一个跟生产者发送速度相关的阈值。这样大大增加了消费堆积告警的准确性。
3.4 RocketMQ-Exporter 使用示例
apiVersion: apps/v1
kind: Deployment
metadata:
name: rocketmq-exporter
spec:
replicas: 1
selector:
matchLabels:
app: rocketmq-exporter
template:
metadata:
labels:
app: rocketmq-exporter
annotations:
prometheus.io/path: metrics
prometheus.io/port: '5557'
prometheus.io/scrape: 'true'
spec:
containers:
- name: rocketmq-exporter
image: admin4j/rocketmq-exporter
imagePullPolicy: IfNotPresent
resources:
limits:
memory: 1G
env:
- name: rocketmq.config.namesrvAddr
value: 192.168.0.192:9876
这里使用注解 prometheus.io/scrape: 'true'
的方式让prometheus自定方式rocketmq-exporter的pod
使用注解
prometheus.io/scrape: 'true'
需要提前配置,参考 【k8s 实战】Prometheus Operator 高级配置- 监控Kubernetes自动发现
3.5 查看 prometheus targets 自动发现rocketmq-exporter
3. 使用 Grafana Dashboard 展示监控项数据
在Grafana 官网找一个 mysql 监控模板 https://grafana.com/grafana/dashboards/?search=RocketMQ
我们这里选择 ID: 14612,查看效果
4. 利用AlertManager 发送RockectMq报警消息
4.1 告警规则添加
添加PrometheusRule
CRD
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
labels:
prometheus: k8s
role: alert-rules
name: prometheus-rocketmq-rules
spec:
groups:
- name: RocketmqAlerts
rules:
- alert: RocketMQClusterProduceHigh
expr: sum(rocketmq_producer_tps) by (cluster) >= 10
for: 3m
labels:
severity: warning
annotations:
description: '{{$labels.cluster}} Sending tps too high.'
summary: cluster send tps too high
- alert: RocketMQClusterProduceLow
expr: sum(rocketmq_producer_tps) by (cluster) < 1
for: 3m
labels:
severity: warning
annotations:
description: '{{$labels.cluster}} Sending tps too low.'
summary: cluster send tps too low
- alert: RocketMQClusterConsumeHigh
expr: sum(rocketmq_consumer_tps) by (cluster) >= 10
for: 3m
labels:
severity: warning
annotations:
description: '{{$labels.cluster}} consuming tps too high.'
summary: cluster consume tps too high
- alert: RocketMQClusterConsumeLow
expr: sum(rocketmq_consumer_tps) by (cluster) < 1
for: 3m
labels:
severity: warning
annotations:
description: '{{$labels.cluster}} consuming tps too low.'
summary: cluster consume tps too low
- alert: ConsumerFallingBehind
expr: (sum(rocketmq_producer_offset) by (topic) - on(topic) group_right sum(rocketmq_consumer_offset) by (group,topic)) - ignoring(group) group_left sum (avg_over_time(rocketmq_producer_tps[5m])) by (topic)*5*60 > 0
for: 3m
labels:
severity: warning
annotations:
description: 'consumer {{$labels.group}} on {{$labels.topic}} lag behind
and is falling behind (behind value {{$value}}).'
summary: consumer lag behind
- alert: GroupGetLatencyByStoretime
expr: rocketmq_group_get_latency_by_storetime > 1000
for: 3m
labels:
severity: warning
annotations:
description: 'consumer {{$labels.group}} on {{$labels.broker}}, {{$labels.topic}} consume time lag behind message store time
and (behind value is {{$value}}).'
summary: message consumes time lag behind message store time too much