目录
- 概述
- 实践
- 如何启用消息轨迹
- 配置
- 创建Topic
- 代码
- 测试
- 结束
概述
阅读此文可以解决 RocketMQ 中消息是否发送成功,是否消费成功。
查询消息轨迹可作为生产环境中排查问题强有力的数据支持 ,也是研发同学解决线上问题的重要武器之一。
详细如下:
- 引入轨迹的目的
- 轨迹是实实在在的话,那么可以避免一些相互
扯皮
的问题 - 证实生产者是否发送了消息,也可以证实消费者是否消费了消息
- 通过轨迹,有如下的信息:
- 从哪里来
- 什么时间到来
- 停留在哪里
- 停留的时间
- 去向哪里
- 去向的时间是多少
- 轨迹是实实在在的话,那么可以避免一些相互
- 如何实践
RocketMQ 源码中有相关文档
实践
默认情况下消息轨迹是存储在 RMQ_SYS_TRACE_TOPIC
,此外消息轨迹还可以存储在用户自定义的 topic 中。
注意:一般采取默认即可,自定义不在此介绍。
如何启用消息轨迹
- broker端
需要在broker端的配置文件中添加配置项:traceTopicEnable=true
,注意:对于消息轨迹数据量较大的场景,可以在 RocketMQ 集群中选择其中一个 Broker 节点专用于存储消息轨迹,使得用户普通的消息数据与消息轨迹数据的物理 IO 完全隔离,互不影响。 - 客户端
producer 端和 consumer 端需要启用消息轨迹,具体是在初始化客户端时打开打开启用消息轨迹的开关并根据实际需求决定是否使用默认的 topic 来存储消息轨迹
配置
[root@hadoop02 conf]# pwd
/data/soft/rocketmq-all-5.1.4-bin-release/conf
[root@hadoop02 conf]#
[root@hadoop02 conf]# vi broker.conf
## if msg tracing is open,the flag will be true
traceTopicEnable=true
创建Topic
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t test-topic3
代码
注意: new DefaultMQProducer("ProducerGroupName", true);
及 trace-key
的设置。
public class RocketMQMsgTraceProducer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", true);
producer.setNamesrvAddr("10.32.36.143:9876");
producer.start();
try {
{
Message msg = new Message("test-topic3",
"TagA",
"trace-key",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
测试
执行消息发送,得到如下结果 ,证明配置成功。
上图主要内容如下:
- topic:消息主题
- msgId:消息唯一ID
- offsetMsgId:消息偏移量ID,该ID中包含了Broker的IP以及偏移量
- tags:消息标志
- keys:消息索引key,根据该key可快速检索消息
- storeHost:跟踪类型为Pub时存储该消息的Broker服务器IP
- msgType:消息的类型,可选值为Normal_Msg(普通消息)、Trans_Msg_Half(预提交消息)、Trans_msg_Commit(提交消息)、Delay_Msg(延迟消息)
结束
至此,RocketMQ 支持消息轨迹详查
就结束了,如有疑问,欢迎评论区留言。