RocketMQ消息轨迹产生的背景以及使用方式

news2025/1/4 15:59:53

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

背景

最近在维护RocketMQ经常会出现这种问题
消息发送方和接收方出现扯皮,消息发送方说我的消息已经发送成功了,消费方说我没接收到消息。两边各持己见,谁也不会说服谁。这时候就非常希望RocketMQ能有消息的一个消息发送和消费的一个业务log了,类似什么时候发送了消息,什么时候消费了消息,消费成功还是失败了

正常的消息查询页面一般只有消息是否消费,没有消息消费成功还是失败

不管消费成功还是失败,这里显式的都是CONSUMED,非常不方便排查问题,那么RocketMQ是不是有类似的log功能呢?
答案是有的,这里就引出了我们今天的主角,消息轨迹

RocketMQ版本

  • 5.1.0

消息轨迹是什么

RocketMQ消息轨迹主要是用来记录消息的发送消费记录,算是一种消息的log

如何使用

RocketMQ的消息轨迹开启主要是三个地方

  1. broker
  2. producer
  3. consumer

broker

broker启动配置文件添加如下配置

traceTopicEnable=true

producer开启消息轨迹

DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, true);

和一般的消息发送不同,主要是添加一个新的构造函数的参数

之前的构造方式

DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);

consumer开启消息轨迹

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, true);

和一般的消息消费也不同,我们也添加了enableMsgTracetrue

测试

消息发送

public class LocalProducer {

    /**
     * The number of produced messages.
     */
    public static final int MESSAGE_COUNT = 1;
    public static final String PRODUCER_GROUP = "xiao-zou-topic-producer";
    
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9000";
    public static final String TOPIC = "xiao-zou-topic";
    public static final String TAG = "TagA";

    public static void main(String[] args) throws MQClientException, InterruptedException {

        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, true);

        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        producer.addRetryResponseCode(RemotingSysResponseCode.SYSTEM_BUSY);
        producer.start();

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            try {
                Message msg = new Message(TOPIC /* Topic */,
                    TAG /* Tag */,
                    ("Hello xiaozou " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
//                msg.setDelayTimeLevel(2);
                SendResult sendResult = producer.send(msg, 5000);
                DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                System.out.printf("%s %s%n", sendResult, dtf2.format(LocalDateTime.now()));
                TimeUnit.SECONDS.sleep(2);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

消息消费

public class LocalConsumer {

    public static final String CONSUMER_GROUP = "gid-xiao-zou-topic";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9001";
    public static final String TOPIC = "xiao-zou-topic";

    public static void main(String[] args) throws MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, true);
        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe(TOPIC, "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

这里消费故意返回失败,便于观察消费轨迹

然后我们发送消息后用msgId去查看消息轨迹看看

消息轨迹查看

我们一般用的是消息的查询,现在我们直接去消息轨迹那里查看

我们查看消息轨迹可以看到非常详细的消费记录
包括消息的

  • 发送时间
  • 消费是否成功还是失败
  • 重试测试等
  • 消费者的ip
  • broker的ip

消息轨迹的存储

消息轨迹默认存储的TopicRMQ_SYS_TRACE_TOPIC,也可以自己设置。
存储方式有两种

普通模式

RocketMQ集群中每一个Broker节点均用于存储Client端收集并发送过来的消息轨迹数据。因此,对于RocketMQ集群中的Broker节点数量并无要求和限制。

物理IO隔离模式

对于消息轨迹数据量较大的场景,可以在RocketMQ集群中选择其中一个Broker节点专用于存储消息轨迹(只在该broker创建轨迹Topic),使得用户普通的消息数据与消息轨迹数据的物理IO完全隔离,互不影响。在该模式下,RocketMQ集群中至少有两个Broker节点,其中一个Broker节点定义为存储消息轨迹数据的服务端。

总结

总的来说消息轨迹开启后会非常方便我们定位问题,但是会增加额外的存储开支,如果消息量很大,推荐使用物理隔离的方式,单独使用一个broker存储消息轨迹

参考

  • 官方文档: https://rocketmq.apache.org/zh/docs/4.x/bestPractice/03messagetra/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/880164.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

苹果笔记本清理内存免费工具CleanMyMac X

苹果笔记本怎么清理内存&#xff1f;最简单粗暴的方式&#xff0c;就是下载专业的清理工具&#xff0c;可以帮助我们高效地清理苹果笔记本上的垃圾文件。比如苹果官方都推荐的CleanMyMac X&#xff0c;就可以自动扫描并清理系统中的垃圾文件&#xff0c;帮助我们省去手动操作的…

开学季值得买电容笔有哪些?推荐平价好用的电容笔

大多数的学生党都没有稳定的经济来源&#xff0c;因此在挑选东西时都追求着高性价比。随着iPad平板电脑的性能不断提高&#xff0c;其所具备的功能将不断增加&#xff0c;它将逐渐融入我们的生活与工作。由于电子产品的不断升级&#xff0c;软件的改进&#xff0c;使得电容笔的…

应用在家庭影院触摸屏中的高性能低功耗触摸芯片

家庭影院的主要思想是获得清晰的画面和令人惊叹的环绕声。这可以通过多个电子元件的组合轻松实现&#xff0c;为您提供真正的剧院体验。家庭影院系统所需的电子设备&#xff0c;主要有&#xff1a;扬声器、电视或投影仪、媒体设备和接收器。这些设备以不同的方式工作&#xff0…

FPGA应用学习笔记-----复位电路(二)和小结

不可复位触发器若和可复位触发器混合写的话&#xff0c;不可复位触发器是由可复位触发器馈电的。 不应该出现的复位&#xff0c;因为延时导致了冒险&#xff0c;异步复位存在静态冒险 附加素隐含项&#xff0c;利用数电方法&#xff0c;消除静态冒险 这样多时钟区域还是算异步的…

20. 有效的括号 题解

题目描述&#xff1a;20. 有效的括号 - 力扣&#xff08;LeetCode&#xff09; 给定一个只包括 (&#xff0c;)&#xff0c;{&#xff0c;}&#xff0c;[&#xff0c;] 的字符串 s &#xff0c;判断字符串是否有效。 有效字符串需满足&#xff1a; 左括号必须用相同类型的右括号…

用于全局复根和极点查找算法的自适应网格生成器(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

JS 将 json 对象转成字符串并保留格式 - JSON.stringify()

JSON.stringify(value, replacer, space) value&#xff1a;将要序列化成一个 JSON 字符串的值。 replacer&#xff08;可选&#xff09;&#xff1a;如果该参数是一个函数&#xff0c;则在序列化过程中&#xff0c;被序列化的值的每个属性都会经过该函数的转换和处理&#xf…

Prometheus的搭建与使用

一、安装Prometheus 官网下载地址&#xff1a;Download | Prometheus 解压&#xff1a;tar -zxvf prometheus-2.19.2.linux-amd64.tar.gz重命名&#xff1a; mv prometheus-2.19.2.linux-amd64 /home/prometheus进入对应目录&#xff1a; cd /home/prometheus查看配置文件&am…

item_get_desc-获得TB商品描述

一、接口参数说明&#xff1a; item_get_desc-获得淘宝商品描述&#xff0c;点击更多API调试&#xff0c;请移步注册API账号点击获取测试key和secret 公共参数 请求地址: https://api-gw.onebound.cn/taobao/item_get_desc 名称类型必须描述keyString是调用key&#xff08;点…

安全加密框架图——Oracle安全开发者

Oracle安全开发者 ACLs 设计 ACLs&#xff08;访问控制列表&#xff09;时&#xff0c;可以根据以下思路进行设计&#xff1a; 所有者文件权限&#xff1a;确定文件的所有者能够对文件执行哪些操作&#xff0c;如读取、写入、执行等。这可以根据文件的性质和拥有者的职责来决…

Cygwin 配置C/C++编译环境以及如何编译项目

文章目录 一、安装C、C编译环境需要的包1. 选择gcc-core、gcc-g2. 选择gdb3. 选择mingw64下的gcc-core、gcc-g4. 选择make5. 选择cmake6. 确认更改7. 查看包安装状态 二、C、C 项目编译示例step1&#xff1a;解压缩sed-4.9.tar.gzstep2&#xff1a;执行./configure生成Makefile…

Nginx网站服务(安装nginx、平滑升级nginx、nginx各种访问配置)

一、Nginx概述 1、什么是nginx&#xff1f; 稳定性高、系统资源消耗低、对HTTP并发连接的处理能力高&#xff08;单台物理器可支持30000-50000个并发请求&#xff09; NG并发连接能力有2个因素的影响 ①CPU的个数 ②本地吴立琪系统的最大文件打开数2、Nginx应用场景 静态服…

日常BUG——普通页面跳转tabbar页面报错

&#x1f61c;作 者&#xff1a;是江迪呀✒️本文关键词&#xff1a;日常BUG、BUG、问题分析☀️每日 一言 &#xff1a;存在错误说明你在进步&#xff01; 一、问题描述 微信小程序页面跳转的时候出现下面的问题&#xff1a; wx.redirectTo({url: /pages/index/i…

行业分析(思维逻辑)

1、 如何着手分析一个行业&#xff1f; - 知乎做行业分析&#xff0c;很容易陷入一个误区&#xff0c;就是认为&#xff1a;我只要能找到最全的、最准的信息&#xff0c;就能做好行研。所以很多知…https://www.zhihu.com/question/20219092/answer/666346625 2、 3、 4、 …

Linux下搭建java环境

文章目录 一&#xff0c;xshell链接linux二&#xff0c;linux安装jdk环境 一&#xff0c;xshell链接linux 这里用到的工具,VMware搭配CentOS7 64位Xshell5 操作之前确保,传输Xshell连接了虚拟机 打开Xshell,文件->新建 主机ip—>进入虚拟机,右键打开终端,输入命令:ifco…

统筹型人才PK执行型人才,有啥区别?

统筹型人才PK执行型人才&#xff0c;有啥区别&#xff1f; 在大企业成长序列中 在着力培养统筹型人才 趣讲大白话&#xff1a;将还是兵&#xff1f; 【趣讲信息科技257期】 **************************** 我有华为公司当员工的同事 当干到一定年龄和职位后 有个判定&#xff1a…

手把手教你更改Vue项目图标

引言&#xff1a; Vue.js 是一款轻量级的前端框架&#xff0c;具有灵活、高效和易用等特点。在开发Vue项目时&#xff0c;我们经常会使用第三方库或工具来为项目添加图标&#xff0c;以提升用户界面的美观度。在本文中&#xff0c;将向大家介绍如何手把手更改Vue项目的图标&am…

R语言实现计算净重新分类指数(NRI)和综合判别改善指数(IDI)

两个模型比较&#xff0c;与第一个模型相比&#xff0c;NRI&#xff08;重新分对的 - 重新分错的&#xff09;/总人数。IDI&#xff08;新模型患者平均预测概率-旧模型患者平均预测概率&#xff09;-&#xff08;新模型非患者平均预测概率-旧模型非患者平均预测概率&#xff09…

【Unity每日一记】关于物体(敌方)检测—(向量点乘相关)

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;uni…

【游戏开发教程】Unity Cinemachine快速上手,详细案例讲解(虚拟相机系统 | 新发出品 | 良心教程)

文章目录 一、前言二、插件下载三、案例1&#xff1a;第三人称自由视角&#xff0c;Free Look character场景1、场景演示2、组件参数2.1、CinemachineBrain&#xff1a;核心2.2、CinemachineFreeLook&#xff1a;第三人称自由视角相机2.2.1、设置Follow&#xff1a;跟随2.2.2、…