基于 IDEA 搭建 RocketMQ-4.6 源码环境

news2025/1/15 17:35:24

RocketMQ 架构

源码搭建前, 需要理解 RocketMQ 的四个重要组件, 以及 RocketMQ 的工作流程:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NsC5WRMG-1668600773110)(https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/18fd4bb6a5994ffd8e0f47dc49441784~tplv-k3u1fbpfcp-watermark.image?)]

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。

  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

结合部署架构图,描述集群工作流程:

  • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

准备环境

  1. JDK1.8
  2. IDEA.2022
  3. RocketMQ-4.6
  4. windows11

构建源码

  1. 拉取源码:

git clone https://github.com/apache/rocketmq.git

我的项目路径在 d:\yyr\zgp\rocketmq

  1. 执行 maven 命令

mvn clean install -Dmaven.test.skip=true

在这里插入图片描述

我们主要关注上图中标注的 4 个模块

  • broker
  • namesrv
  • example
  • distribution

运行程序

0. 准备工作

在 rocketmq 工程的根目录下, 新建 conf 目录(暂时不关注目录中的四个文件)

在这里插入图片描述

1. 运行 NameServer

  1. 打开 distribution 模块, 将 logback_namesrv.xml 文件放在 rocketmq/conf 目录下

在这里插入图片描述

  1. 首先需要,通过 IDEA 配置环境变量 ROCKETMQ_HOME, 类似我们装 JDK 一样, 需要配置 JAVA_HOME

在这里插入图片描述

  1. 运行 namesrv 模块启动类 org/apache/rocketmq/namesrv/NamesrvStartup.java 模块

当出现如下日志时, 通常可以 断定 是启动成功了。

在这里插入图片描述

2. 运行 Broker

1.打开 distribution 模块, 将 logback_broker.xml 以及 broker.conf 文件放在 rocketmq/conf 目录下

在这里插入图片描述

2.通过 IDEA 配置环境变量 ROCKETMQ_HOME, 类似我们装 JDK 一样, 需要配置 JAVA_HOME。并指定程序运行时需要读取的配置文件 broker.conf 的位置

在这里插入图片描述

  1. 运行 broker 模块

启动类 org/apache/rocketmq/broker/BrokerStartup.java

当出现如下日志时, 通常可以 断定 是启动成功了。(很多文章会说控制台打印如下日志就代表 broker 运行成功了, 其实不然。后面会说明原因

在这里插入图片描述

3. 运行消息生产者 Producer

在这里插入图片描述

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        // TopicTest 可以随意替换
        DefaultMQProducer producer = new DefaultMQProducer("TopicTest");
        // 指定 namesrv 地址, 默认端口是 9876
        producer.setNamesrvAddr("localhost:9876");
      
        producer.start();

        // 这里进行了修改, 只发送一条消息
        for (int i = 0; i < 1; i++) {
            try {
                Message msg = new Message("zhangsan" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

     
        producer.shutdown();
    }
}

消息发送成功

在这里插入图片描述

可能会出现的问题

找不到名称为 “zhangsan” 的消息主题

在这里插入图片描述

问题出现的主要原因是 broker 没有注册到 namesvr, 要么没有指定 broker.conf 文件, 要么就是 broker.conf 配置文件中没有配置 namesvr 的地址。

在这里插入图片描述

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = true
namesrvAddr = localhost:9876 // 注意这里

4. 运行 Consumer

在这里插入图片描述

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 与生产者保持一致
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TopicTest
        consumer.setNamesrvAddr("localhost:9876");

       
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        /*
         * Subscribe one more more topics to consume.
         */
        consumer.subscribe("TopicTest1", "*");

        /*
         *  Register callback to execute on arrival of messages fetched from brokers.
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /*
         *  Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

消息消费成功

在这里插入图片描述

5. 安装可视化工具 rocketmq-console

如果源码环境搭建完成后, 消息始终无法消费,或者没有发送出去,但是又无法判断哪个环节出现了问题, 我们就可以搭建可视化工具, 通常情况下, 这样更容易找到哪个模块出现了问题。

rocketmq-console 的搭建非常简单。

  1. 克隆源码

https://github.com/apache/rocketmq-externals.git

  1. 切换到对应的分支

在这里插入图片描述

  1. 修改 application.properties 文件, 添加 namesvr 地址

在这里插入图片描述

  1. 运行并访问

http://localhost:8080/

这是验证 namesvr 和 broker 是否启动成功最简单的办法。
除此之外, 我们也可以看出来消息是否发送成功, 是否消费成功。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/10494.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java基础之《undertow容器》

一、什么是undertow 1、undertow是springboot默认支持的三种servlet容器之一。 tomcat、jetty、undertow 2、undertow怎么读 under-tow 3、undertow是RedHat&#xff08;红帽公司&#xff09;的开源产品&#xff0c;采用java开发&#xff0c;是一款灵活、高性能的web服务器&…

大学生游戏静态HTML网页设计-(北京冬奥会12页 带js 带视频 轮播图)

⛵ 源码获取 文末联系 ✈ Web前端开发技术 描述 网页设计题材&#xff0c;DIVCSS 布局制作,HTMLCSS网页设计期末课程大作业 | HTML期末大学生网页设计作业&#xff0c;Web大学生网页 HTML&#xff1a;结构 CSS&#xff1a;样式 在操作方面上运用了html5和css3&#xff0c; 采用…

stack容器、queue容器(20221116)

一、stack容器 1、基本概念 先进后出的数据结构&#xff0c;只有一个出口&#xff08;栈顶&#xff09;。 栈不允许有遍历行为&#xff0c;可以判断是否为空(empty)&#xff0c;也可以知道其元素个数&#xff08;size&#xff09; 2、常用接口 构造函数&#xff1a; stac…

初始MySQL

目录 一、什么是数据库 二、SQL分类 三、库的操作 四、表的操作 五、数据类型 六、表的约束 什么是数据库 存储数据用文件就可以了&#xff0c;为什么还要有数据库&#xff1f; 文件保存数据有以下几个缺点&#xff1a; 文件的安全性问题文件不利于数据查询和管理 文件…

电脑视频怎么录制?好用的电脑录屏方法

在日常使用电脑的时候&#xff0c;很多小伙伴经常会遇到需要录制电脑视频的时候。但网上各种眼花缭乱的电脑录屏方法&#xff0c;很多小伙伴看了表示自己根本没有学会。今天就给大家分享2个简单好用的电脑录屏方法&#xff0c;看完后轻松掌握电脑录屏。 一&#xff0e;使用Wind…

主成分分析法在图像压缩和重建中的应用研究-含Matlab代码

目录一、引言二、主成分分析法概念及性质2.1 概念2.2 性质三、计算步骤3.1 计算相关系数矩阵3.2 计算特征值与特征向量3.3 计算主成分贡献率及累计贡献率3.4 计算主成分载荷3.5 各主成分的得分四、图像压缩与重建实验分析五、参考文献六、Matlab代码获取一、引言 主成分分析法…

【附源码】Python计算机毕业设计网上购物平台

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

最火后台管理系统 RuoYi 项目探秘,之二

上篇中&#xff0c;我们初步观察了 RuoYi 的项目结构&#xff0c;并在最后实际运行起了项目。我们也发现了作者不好的代码习惯&#xff0c;作为反例&#xff0c;我们应该要养成良好的编码习惯。本篇开始&#xff0c;我们会按照 Web 界面逐一对具体子项目的实现的功能进行探秘。…

Qt使用7z压缩和解压示例(支持文件夹递归、多文件不同位置)

1&#xff0c;简介 Qt自带的压缩处理类功能不太完善&#xff0c;也不支持中文路径。 这是我封装好的一个Qt调用7z处理压缩解压的工具类 ZipAPI&#xff0c;提供了几个简单易用的接口。 写压缩解压代码从此非常方便快捷&#xff01; 支持中文路径&#xff0c;支持常规的压缩解…

Cell:水平基因转移在昆虫中广泛存在,增强鳞翅目雄性昆虫求偶行为

期刊&#xff1a;Cell 影响因子&#xff1a;66.85 发表时间&#xff1a;2022年8月 一、研究背景 昆虫起源于约4.8亿年前&#xff0c;是地球上最繁盛的动物类群&#xff0c;已被描述种超过100万&#xff0c;占所有动物物种50%以上。这个古老的动物类群在…

插画、插图网站,免费(商用)

本期分享5个高质量插画网站&#xff0c;免费可商用&#xff0c;设计必备&#xff0c;建议收藏&#xff01;1、Undraw https://undraw.co/illustrationsUndraw是一个扁平风格插画图库&#xff0c;里面有大量的插画&#xff0c;可以支持在线更改配色&#xff0c;网站提供免费下载…

【JavaSE】类和对象(下)(访问限定符 包的概念 导入包中的类 自定义包 包的访问权限控制举例 常见的包 实例内部类 静态内部类 局部内部类 对象的打印)

文章目录六、 封装6.1 封装的概念6.2 访问限定符6.3 封装扩展之包6.3.1 包的概念6.3.2 导入包中的类6.3.3 自定义包6.3.4 包的访问权限控制举例6.3.5 常见的包七、内部类7.1 内部类7.1.1 实例内部类7.1.2 静态内部类7.2 局部内部类7.3 匿名内部类八、对象的打印六、 封装 6.1 …

人工智能-线性回归2--房价预测、欠拟合过拟合、正则化、模型保存加载

7&#xff0c;案例&#xff1a;波士顿房价预测 回归性能评估MSE from sklearn.datasets import load_boston from sklearn.model_selection import train_test_split from sklearn.linear_model import LinearRegression,SGDRegressor from sklearn.meyrics import mean_squa…

详解PHP解决swoole守护进程Redis假死 ,mysql断线重连问题

详解PHP解决swoole守护进程Redis假死 &#xff0c;mysql断线重连问题最近公司有个项目&#xff0c;要举办一个线上活动&#xff0c;我这边负责提供接口记录用户访问记录&#xff0c;与操作记录&#xff0c;由于活动参与人数可能比较多&#xff0c;为了不影响正常业务运行&#…

安全性归约(游戏)

文章目录基于游戏的安全性定义归约中的概率关系某事件发生某事件不发生互斥事件基于游戏的安全性定义 在将攻击 Γ\GammaΓ 的算法 A′AA′ 归约到攻击 Π\PiΠ 的算法 AAA 时&#xff0c; 让 A′AA′ 根据 ChΓCh_\GammaChΓ​ 提供的信息&#xff0c;为 AAA 模拟出同分布的 …

股票自动下单接口够接入多种股票数据源吗?

很多的股票交易接口在原先只能接入行情的CTP程序&#xff0c;那么股票自动下单接口现在可以通过openctp提供的CTPAPI&#xff0c;可以接入到多种多样的股票数据源&#xff01;但是目前由于大多的股票交易接口是受到监管限制的就很难接入实盘&#xff0c;那么股票自动下单接口通…

知心世界姐王瑞平:谷传民与大衣哥朱之文是沟通问题不是人品问题

大衣哥和谷传民的官司走到现在&#xff0c;互联网上面也出现了两大阵营对垒&#xff0c;一方面是大衣哥的粉丝&#xff0c;旗帜鲜明地支持自己的偶像&#xff0c;另一大阵营的人&#xff0c;则一心一意支持谷传民。虽然每个阵营都有自己的道理&#xff0c;但是毕竟都太过极端&a…

Virtual Data Augmentation: 虚拟数据扩增技术

听说过数据扩增&#xff08;Data Augmentation&#xff09;&#xff0c;也听说过虚拟对抗训练&#xff08;Virtual Adversarial Traning&#xff09;&#xff0c;但是我没想到会有人将其结合&#xff0c;谓之虚拟数据扩增&#xff08;Virtual Data Augmentation&#xff09;。这…

CANoe诊断测试

诊断协议那些事儿 本文为诊断协议那些事儿专栏文章&#xff0c;当我们在开发工程中越来越多的需要使用到总线测试工具&#xff0c;其中包括BUSMASTER、周立功、PCAN、CANOE等&#xff0c;本文将使用德国Vector公司的CANoe介绍诊断测试的基本环境。 文章目录诊断协议那些事儿一…

Python编程从入门到实践 第五章:if语句 练习答案记录

Python编程从入门到实践 第五章&#xff1a;if语句 练习答案记录 练习题导航Python编程从入门到实践 第五章&#xff1a;if语句 练习答案记录5.1 一个简单示例5.2 条件测试5.2.1 检查是否相等5.2.2 检查是否相等时忽略大小写5.2.3 检查是否不相等5.2.4 数值比较5.2.5 检查多个文…