IDEA 导入 RocketMQ 源码

news2025/1/8 5:11:07

目录

  • 前言
  • 一、RocketMQ 架构
  • 二、环境准备
  • 三、下载源码
  • 四、编译源码
    • 4.1 导入源码
    • 4.2 目录结构
    • 4.3 运行程序
      • 1. 启动 Namesrv
      • 2. 启动 Broker
      • 3. 启动 Producer
      • 4. 启动 Consumer
  • 五、监控平台的搭建
    • 5.1 下载 console 源码
    • 5.2 IDEA 启动


前言

最近项目中有个功能需要在本地调试下 RocketMQ,所以需要在本地导入 RocketMQ 的源码并启动,故做此记录,便于回顾问题和与各位同学一起探讨。


一、RocketMQ 架构

在源码搭建前, 需要先理解 RocketMQ 的四个重要组件, 以及 RocketMQ 的工作流程:

在这里插入图片描述

  • NameServer 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

  • Broker 部署相对复杂,Broker 分为 MasterSlave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 MasterMasterSlave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId0 表示 Master,非 0 表示 SlaveMaster 也可以部署多个。每个 BrokerNameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。 注意:当前 RocketMQ 版本在部署架构上支持一 MasterSlave,但只有 BrokerId=1 的从服务器才会参与消息的读负载。

  • ProducerNameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。

  • ConsumerNameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 MasterSlave 建立长连接,且定时向 MasterSlave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,消费者在向 Master 拉取消息时,Master 服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读 I/O),以及从服务器是否可读等因素建议下一次是从 Master 还是 Slave 拉取。

结合部署架构图,描述集群工作流程:

  • 启动 NameServerNameServer 起来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。

  • Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息 ( IP+ 端口等) 以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 TopicBroker 的映射关系。

  • 收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic

  • Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。

  • ConsumerProducer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。


二、环境准备

  • ① JDK 1.8

    java -version

    在这里插入图片描述

  • ② IntelliJ IDEA 2021

  • ③ RocketMQ-4.9.x 源码

  • ④ Maven

  • ⑤ Git

  • ⑥ Windows 11


三、下载源码

我们可以在 github 或者 gitee 上都能下载到 RocketMQ 的源码

github 上下载:

github 上搜素 rocketmq 就能找到:https://github.com/apache/rocketmq

在这里插入图片描述

gitee上下载:

gitee 上搜素 rocketmq 就能找到:https://gitee.com/apache/rocketmq

在这里插入图片描述

这里我是直接下载 ZIP 压缩包的,可以避免很多问题

在这里插入图片描述

解压:

在这里插入图片描述


四、编译源码

4.1 导入源码

打开 IDEA ,选择 File -> Open

在这里插入图片描述

选中 rocketmq 源码所在目录

在这里插入图片描述

导入进来是这样子的

在这里插入图片描述

项目导进来之后先检查下 JDK 的配置,配置 JDK 版本 1.8 的(建议 JDK 的版本不要配置太高)

在这里插入图片描述

配置你的 Maven

在这里插入图片描述

检查下 git 配置(编译的时候会自动去检测 git,所以需要检查下)

在这里插入图片描述


4.2 目录结构

在这里插入图片描述


4.3 运行程序

本地 Debug 环境搭建过程如下:

  • ① 通过源码启动 Namesrv
  • ② 通过源码启动 broker
  • ③ 通过源码启动 Producer
  • ④ 通过源码启动 Consumer

1. 启动 Namesrv

Namesrv 源码在 rocketmq-namesrv 包下,启动类是 src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java,直接通过NamesrvStartupmain 方法启动会失败

在这里插入图片描述

终端输出提示我们需要配置一个 ROCKETMQ_HOME 环境变量,我们将环境变量配置到 IDEA

在这里插入图片描述

例如:ROCKETMQ_HOME=D:\source-code\rocketmq-4.9.x\rocketmq-4.9.x

在这里插入图片描述

再次启动 NamesrvStartup 后再次报错

在这里插入图片描述

意思是没有读到 conf 目录下的配置文件 logback_namesrv.xml

那就在项目下创建一个 conf 的文件夹

在这里插入图片描述

在这里插入图片描述

logback_namesrv.xml 这个配置文件可以在 distribution 的模块下找到,只需要将该文件复制一份到你所创建 conf 目录下即可

在这里插入图片描述

再启动 namesrv,控制台提示启动成功 The Name Server boot success.

在这里插入图片描述

到此 NameSrver 就算是启动完成了


2. 启动 Broker

Broker 源码在 rocketmq-broker 包下,启动类是 src/main/java/org/apache/rocketmq/broker/BrokerStartup.java,如果直接通过 IDEA 启动也是会失败

在这里插入图片描述

与上面 namesrv 一样,我们在 IDEA 上配置启动环境变量 ROCKETMQ_HOME

例如:ROCKETMQ_HOME=D:\source-code\rocketmq-4.9.x\rocketmq-4.9.x

在这里插入图片描述

再次启动还是会报与 Nameserv 相同的问题,我们只需要在 distribution 模块下找到 broker 相关的两个配置文件 logback_broker.xmlbroker.conf 两个文件复制到 conf 目录下即可

在这里插入图片描述

再次启动,就可以看到 broker 也正常运行了

在这里插入图片描述

但是仔细观察会发现虽然 broker 启动成功了,但是 brokerName 好像和配置文件 broker.conf 中的 brokerName 不一致

在这里插入图片描述

我们可以启动 broker 时让它指定以 conf/broker.conf 的配置文件启动

在这里插入图片描述

再次启动

在这里插入图片描述

broker.conf 的配置文件中还有以下配置可以修改

# namesrv服务地址
namesrvAddr = 127.0.0.1:9876
# 运行自动创建topic,避免调试的时候麻烦
autoCreateTopicEnable = true
# 数据存储路径
storePathRootDir = D:/file/rocketmq/data_store
# commitlog存储文件
storePathCommitLog = D:/file/rocketmq/data_store/commitlog
# 消费队列存储文件
storePathConsumeQueue = D:/file/rocketmq/data_store/consumequeue
# 索引存储文件
storePathIndex = D:/file/rocketmq/data_store/index
# checkpoint存储文件
storeCheckpoint = D:/file/rocketmq/data_store/checkpoint
# abort文件
abortFile = D:/file/rocketmq/data_store/abort

做本地调试的时候最好添加以下两个配置:

# namesrv服务地址
namesrvAddr = 127.0.0.1:9876
# 运行自动创建topic,避免调试的时候麻烦
autoCreateTopicEnable = true

在这里插入图片描述

为了更好的查看启动 broker 的相关配置,可以在 logback_broker.xml 配置文件中的 RocketmqBroker 里面追加 <appender-ref ref="STDOUT"/> 配置,例如:

    <logger name="RocketmqBroker" additivity="false">
        <level value="INFO"/>
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="RocketmqBrokerAppender"/>
    </logger>

在这里插入图片描述


3. 启动 Producer

example 模块中官方给了一个 producer 的示例

在这里插入图片描述

package org.apache.rocketmq.example.quickstart;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {

	// 发送消息次数:原本是 1000 次,作为演示我调整为 1 次
    public static final int MESSAGE_COUNT = 1;
    public static final String PRODUCER_GROUP = "please_rename_unique_group_name";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "TopicTest";
    public static final String TAG = "TagA";

    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        // 下面这行代码原本是注释的,这里要放开注释
        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

        producer.start();

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            try {
                Message msg = new Message(TOPIC /* Topic */,
                    TAG /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

直接运行即可

在这里插入图片描述

可以看到是有发送一条消息的


4. 启动 Consumer

同样的在 example 模块中官方给了一个 consumer 的示例

在这里插入图片描述

package org.apache.rocketmq.example.quickstart;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class Consumer {

    public static final String CONSUMER_GROUP = "please_rename_unique_group_name_4";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "TopicTest";

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
		// 下面这行代码原本是注释的,这里要放开注释
        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe(TOPIC, "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

直接运行

在这里插入图片描述

运行后可以看到 consumer 消费调了之前 producer 生成出来的那条消息


五、监控平台的搭建

RocketMQ 有一个专门的监控平台来查看 MQ 的情况,大概长这样子

在这里插入图片描述

5.1 下载 console 源码

下载链接:https://github.com/apache/rocketmq-externals

在这里插入图片描述可以通过 git 把它给拉下来,可以看到这里面有 rocketmq 与各种各样的技术集成,但是这个监控平台只需要启动 rocketmq-console 这个服务就行了

以下是我下载的:
链接:百度网盘链接
提取码:no5x

找个目录存放,然后解压下来。

在这里插入图片描述

5.2 IDEA 启动

进入 IDEA ,打开 rocketmq-console 项目

在这里插入图片描述

在这里插入图片描述

IDEA 打开以后,修改配置文件 application.properties

在这里插入图片描述
主要是将 rocketmq.config.namesrvAddr 设置成你 RocketMQ 所运行的服务器 IP (公网)地址,然后就可以直接启动了

在这里插入图片描述启动完成之后游览器上输入:localhost:8080,访问

在这里插入图片描述

到目前位置,RocketMQ 的监控平台也用 IDEA 启动成功了


参考文章:

RocketMQ 监控平台搭建与项目引入:https://blog.csdn.net/xhmico/article/details/124489116

基于 IDEA 搭建 RocketMQ-4.6 源码环境:https://juejin.cn/post/7166279522772320286

【RocketMQ | 源码分析】RocketMQ本地调试环境搭建:https://juejin.cn/post/7216729116690694199

手把手教你使用Idea调试RocketMQ源码:https://juejin.cn/post/7166175844145037319

RocketMQ 源码分析: https://gitee.com/haijun1998/rocketmq、https://gitee.com/wen-zhan/rocketmq

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2051893.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SAM2论文核心速览

官方博客&#xff1a; https://ai.meta.com/blog/segment-anything-2/ 官方论文&#xff1a;​​​​​​https://ai.meta.com/research/publications/sam-2-segment-anything-in-images-and-videos/ 一、研究背景 研究问题&#xff1a;这篇文章要解决的问题是如何在图像和视频…

公式编辑器 -vue-formula-editor

前言 公式编辑旨在帮助用户使用可视化的前提&#xff0c;能便捷的使用平台&#xff0c;例如低代码平台使用广泛 vue-formula-editor vue-formula-editor是一款开源的Vue公式计算组件&#xff0c;可以帮助开发者快速集成公式编辑 在线体验 demo & 源码 安装 npm i vue-form…

CentOS 上安装 Java 17

要在 CentOS 上安装 Java 17&#xff0c;您可以使用多种方法。这里我将向您展示如何通过下载 Oracle 提供的 Java 开发工具包 (JDK) 或使用其他开源 JDK 版本&#xff08;如 Adoptium 或 OpenJDK&#xff09;来完成安装。 方法一&#xff1a;使用 Oracle JDK 17 下载 JDK 17&a…

HTB-BoardLight靶机笔记

BoardLight靶机笔记 概述 HTB的靶机BoardLight 靶机地址&#xff1a;https://app.hackthebox.com/machines/BoardLight 一、nmap扫描 1&#xff09;端口扫描 -sT tcp全连接扫描 --min-rate 以最低速率10000扫描 -p- 扫描全端口 sudo nmap -sT --min-rate 10000 -p- -o p…

【论文写作】怎么写一篇学术论文

文章目录 &#xff08;一&#xff09;非匀速地写论文&#xff08;二&#xff09;弄清期刊的投稿要求以及使用论文模板&#xff08;三&#xff09;论文各个部分撰写的顺序&#xff08;四&#xff09;图表比你想象中的要重要许多&#xff08;五&#xff09;结果和讨论&#xff08…

Java | Leetcode Java题解之第349题两个数组的交集

题目&#xff1a; 题解&#xff1a; class Solution {public int[] intersection(int[] nums1, int[] nums2) {Arrays.sort(nums1);Arrays.sort(nums2);int length1 nums1.length, length2 nums2.length;int[] intersection new int[length1 length2];int index 0, index…

CPU占用异常分析

文章目录 问题现象二次排查参考资料 问题现象 执行文件解压&#xff0c;执行过程中被kill掉了&#xff0c;两次均如此。 [rootlocalhost demo_2]# gzip -d demo.sql.gz Killed网上查资料&#xff0c;可能是磁盘不足、系统资源不足&#xff1b; 磁盘查看没有问题&#xff0c;内…

Matlab2021b通过CNN、CNN-LSTM模型实现对声音信号的二分类与四分类

1、利用Matlab2021b训练CNN、CNN-LSTM模型&#xff0c;对采集的一维时序信号进行分类二分类与四分类 2. 声音信号每个样本数据长度3001个采样点&#xff0c;对其进行归一化处理 3、CNN时序信号多分类执行结果截图 3.1 二分类&#xff1a; CNN模型&#xff1a; 训练集损失值…

Linux装ifort环境

下载完成之后&#xff0c;需要解压文件 t tar zxvf IPSXE2020u4Linux.tgz 解压完成之后进入文件夹&#xff0c;我们使用GUI界面安装。 键入./install_GUI.sh 启动安装程序 收集用户信息&#xff0c;选择同意或者不同意都可以 这一步需要等待十几秒 核验不通过 这是缺少运行程…

牛客JS题(四十五)数组去重

注释很详细&#xff0c;直接上代码 涉及知识点&#xff1a; set的灵活用法去除的判别标准 题干&#xff1a; 我的答案 <!DOCTYPE html> <html><head><meta charset"UTF-8" /><style>/* 填写样式 */</style></head><bo…

★ C++基础篇 ★ vector 类

Ciallo&#xff5e;(∠・ω< )⌒☆ ~ 今天&#xff0c;我将继续和大家一起学习C基础篇第六章----vector类 ~ 目录 一 vector的介绍及使用 1.1 vector的介绍 1.2 vector的使用 1.2.1 vector的定义 1.2.2 vector iterator 的使用 1.2.3 vector 空间增长问题 1.2.4 vecto…

网站配置了https证书,但浏览器访问时却访问了http

是由于缺少强制将 HTTP 请求重定向到 HTTPS 的规则 # HTTP 到 HTTPS 重定向配置 server {listen 80;server_name www.xlqd.site xlqd.site;return 301 https://$host$request_uri; } # 那么你原来的server块就要删除 listen 80;

【学习笔记】A2X通信的协议(十二)- PC5信令协议数据错误处理

目录 10. 处理未知、未预见和错误的PC5信令协议数据 10.1 总则 10.2 消息过短或过长 10.2.1 消息过短 10.2.2 消息过长 10.3 未知或未预见的消息类型 10.4 非语义性强制信息元素错误 10.5 非命令性消息部分中的未知和未预见的IE 10.5.1 消息中未知的IEI 10.5.2 乱序的…

虚幻蓝图 | 游戏开发 Randomize Height

当地图上有无数个收集物【如水晶】&#xff0c;一键随机化高度 应用前 应用后 同理带seed的随机化位置摆放如下&#xff1a; https://www.youtube.com/watch?vkGpsMEMDrjQ

【Hot100】LeetCode—142. 环形链表 II

目录 1- 思路快慢指针推导 2- 实现⭐142. 环形链表 II——题解思路 3- ACM 实现 原题连接&#xff1a;141. 环形链表 1- 思路 快慢指针推导 ① 利用快慢指针&#xff0c;定位环② 根据环&#xff0c;从头出发一个指针&#xff0c;从环处出发一个指针 两者相遇的地方就是环的入…

文献引用数据集分类(GCN)

#基于点的任务 from torch_geometric.datasets import Planetoid from torch_geometric.transforms import NormalizeFeatures import matplotlib.pyplot as plt from sklearn.manifold import TSNE import torch import torch.nn.functional as F from torch.nn import Linear…

Mysql原理与调优-事务与MVCC

目录 1.事务 1.1 什么是事务 1.2 事务隔离级别 1.2.1 事务并发执行可能出现的问题 1.2.2 隔离级别 1.2.3 如何查看和设置事务的隔离级别 1.2.3 快照读和当前读 2.MVCC 2.1 版本链机制 2.2 Read View 2.2.1 Read View读取事务的原则 2.4 Read Committed级别查询 2.5…

ACM MM 2024,复旦腾讯优图等提出MDT-A2G,可根据说话语音同步生成手势

复旦&腾讯优图等提出MDT-A2G&#xff0c;这是一个专门用来生成与语音同步手势的先进模型。想象一下&#xff0c;当我们说话时&#xff0c;身体自然会做出手势。这个模型的目的是让计算机也能像人类一样&#xff0c;根据说话的内容来生成合适的手势。它的运作方式像是一个人…

python的多线程实现高速下载PDB数据集

多线程下载数据 最近在某个网站上写了个shell脚本来下载数据集&#xff0c;内容量不大&#xff0c;但是下载的特别的慢&#xff0c;于是想到用多线程下载&#xff0c;发现快了很多。本文主要让大家清楚python中的几个模块区别和关于程序加速的一些方法&#xff0c;以及多线程下…

YOLOv8_det/seg/pose/obb推理流程

本章将介绍目标检测、实例分割、关键点检测和旋转目标检测的推理原理,基于onnx模型推理,那么首先就需要了解onnx模型的输入和输出,对输入的图片需要进行预处理的操作,对输出的结果需要进行后处理的操作,这部分内容在我的另一个专栏《YOLOv8深度剖析》中也有介绍,如果对YO…