RocketMQ源码分析 - 环境搭建

news2024/9/21 22:44:22

RocketMQ源码分析 - 环境搭建

    • 环境搭建
      • 源码拉取
      • 导入IDEA
      • 调试
        • 1) 启动NameServer
        • 2) 启动Broker
        • 3) 发送消息
        • 4) 消费消息

环境搭建

依赖工具

  • JDK:1.8+
  • Maven
  • Intellij IDEA

源码拉取

从官方仓库 https://github.com/apache/rocketmq clone或者download源码。
在这里插入图片描述
源码目录结构:

  • broker:broker模块(broker启动进程)
  • client:消息客户端,包含消息生产者、消息消费者相关类
  • common:公共包
  • dev:开发者信息(非源代码)
  • distribution:部署实例文件夹(非源代码)
  • example:RocketMQ例代码
  • filter:消息过滤相关基础类
  • filtersrv:消息过滤服务器实现相关类(Filter启动进程)
  • logappender:日志实现相关类
  • namesrv:NameServer实现相关类(NameServer启动进程)
  • openmessageing:消息开放标准
  • remoting:远程通信模块,给予Netty
  • srcutil:服务工具类
  • store:消息存储实现相关类
  • style:checkstyle相关实现
  • test:测试相关类
  • tools:工具类,监控命令相关实现类

导入IDEA

在这里插入图片描述
执行安装

clean install -Dmaven.test.skip=true

在这里插入图片描述

。。。。。。
[INFO] 
[INFO] Apache RocketMQ 4.5.1 .............................. SUCCESS [ 24.872 s]
[INFO] rocketmq-logging 4.5.1 ............................. SUCCESS [  3.511 s]
[INFO] rocketmq-remoting 4.5.1 ............................ SUCCESS [  4.462 s]
[INFO] rocketmq-common 4.5.1 .............................. SUCCESS [  5.444 s]
[INFO] rocketmq-client 4.5.1 .............................. SUCCESS [  4.268 s]
[INFO] rocketmq-store 4.5.1 ............................... SUCCESS [  3.219 s]
[INFO] rocketmq-srvutil 4.5.1 ............................. SUCCESS [  1.431 s]
[INFO] rocketmq-filter 4.5.1 .............................. SUCCESS [  1.321 s]
[INFO] rocketmq-acl 4.5.1 ................................. SUCCESS [  1.082 s]
[INFO] rocketmq-broker 4.5.1 .............................. SUCCESS [  3.667 s]
[INFO] rocketmq-tools 4.5.1 ............................... SUCCESS [  2.806 s]
[INFO] rocketmq-namesrv 4.5.1 ............................. SUCCESS [  1.228 s]
[INFO] rocketmq-logappender 4.5.1 ......................... SUCCESS [  1.394 s]
[INFO] rocketmq-openmessaging 4.5.1 ....................... SUCCESS [  1.122 s]
[INFO] rocketmq-example 4.5.1 ............................. SUCCESS [  1.282 s]
[INFO] rocketmq-test 4.5.1 ................................ SUCCESS [  1.439 s]
[INFO] rocketmq-distribution 4.5.1 ........................ SUCCESS [  0.147 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:02 min
[INFO] Finished at: 2024-07-19T08:46:25+08:00
[INFO] Final Memory: 57M/913M
[INFO] ------------------------------------------------------------------------

Process finished with exit code 0

调试

创建conf配置文件夹,从distribution拷贝broker.conf和logback_broker.xml和logback_namesrv.xml
在这里插入图片描述

1) 启动NameServer
  • 展开namesrv模块,右键NamesrvStartup.java
    在这里插入图片描述

  • 配置ROCKETMO_HOME
    在这里插入图片描述
    在这里插入图片描述

  • 重新启动
    控制台打印结果

Connected to the target VM, address: '127.0.0.1:65350', transport: 'socket'
The Name Server boot success. serializeType=JSON

在这里插入图片描述

2) 启动Broker
  • broker.conf配置文件内容

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# namesrvAddr地址
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 开启客户端创建主题功能
autoCreateTopicEnable=true

# 存储路径
storePathRootDir=D:\\work\\mq\\rocketmq-master\\dataDir
# commitLog路径
storePathCommitLog=D:\\work\\mq\\rocketmq-master\\dataDir\\commitLog
# 消息队列存储路径
storePathConsumeQueue=D:\\work\\mq\\rocketmq-master\\dataDir\\consumequeue
# 消息索引存储路径
storePathIndex=D:\\work\\mq\\rocketmq-master\\dataDir\\index
# checkpoint文件路径
storeCheckpoint=D:\\work\\mq\\rocketmq-master\\dataDir\\checkpoint
# abort文件存储路径
abortFile=D:\\work\\mq\\rocketmq-master\\dataDir\\abort
  • 创建数据文件夹dataDir
  • 启动BrokerStartup,配置broker.conf和ROCKETMQ_HOME
    在这里插入图片描述
    在这里插入图片描述
3) 发送消息
  • 进入example模块的org.apache.rocketmq.example.quickstart
  • 指定Namesrv地址
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
  • 运行main方法,发送消息

4) 消费消息
  • 进入example模块的org.apache.rocketmq.example.quickstart
  • 指定Namesrv地址
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("127.0.0.1:9876");```
- 运行main方法,发送消息
```javascript
D:\install\jdk\jdk-8u131-windows-x64\bin\java.exe -javaagent:D:\install\idea\ideaIU-2018.3.5.win\lib\idea_rt.jar=57007:D:\install\idea\ideaIU-2018.3.5.win\bin -Dfile.encoding=UTF-8 -classpath D:\install\jdk\jdk-8u131-windows-x64\jre\lib\charsets.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\deploy.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\access-bridge-64.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\cldrdata.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\dnsns.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\jaccess.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\jfxrt.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\localedata.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\nashorn.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\sunec.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\sunjce_provider.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\sunmscapi.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\sunpkcs11.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\zipfs.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\javaws.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\jce.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\jfr.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\jfxswt.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\jsse.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\management-agent.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\plugin.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\resources.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\rt.jar;D:\work\mq\rocketmq-master\example\target\classes;D:\work\mq\rocketmq-master\client\target\classes;D:\work\mq\rocketmq-master\common\target\classes;C:\Users\muxu\.m2\repository\org\apache\commons\commons-lang3\3.4\commons-lang3-3.4.jar;D:\work\mq\rocketmq-master\srvutil\target\classes;D:\work\mq\rocketmq-master\remoting\target\classes;C:\Users\muxu\.m2\repository\com\alibaba\fastjson\1.2.51\fastjson-1.2.51.jar;C:\Users\muxu\.m2\repository\io\netty\netty-all\4.0.42.Final\netty-all-4.0.42.Final.jar;C:\Users\muxu\.m2\repository\io\netty\netty-tcnative-boringssl-static\1.1.33.Fork26\netty-tcnative-boringssl-static-1.1.33.Fork26.jar;C:\Users\muxu\.m2\repository\commons-cli\commons-cli\1.2\commons-cli-1.2.jar;C:\Users\muxu\.m2\repository\com\google\guava\guava\19.0\guava-19.0.jar;C:\Users\muxu\.m2\repository\ch\qos\logback\logback-classic\1.0.13\logback-classic-1.0.13.jar;C:\Users\muxu\.m2\repository\ch\qos\logback\logback-core\1.0.13\logback-core-1.0.13.jar;C:\Users\muxu\.m2\repository\org\slf4j\slf4j-api\1.7.7\slf4j-api-1.7.7.jar;C:\Users\muxu\.m2\repository\org\javassist\javassist\3.20.0-GA\javassist-3.20.0-GA.jar;C:\Users\muxu\.m2\repository\io\openmessaging\openmessaging-api\0.3.1-alpha\openmessaging-api-0.3.1-alpha.jar;D:\work\mq\rocketmq-master\openmessaging\target\classes;D:\work\mq\rocketmq-master\acl\target\classes;D:\work\mq\rocketmq-master\logging\target\classes;C:\Users\muxu\.m2\repository\org\yaml\snakeyaml\1.19\snakeyaml-1.19.jar;C:\Users\muxu\.m2\repository\commons-codec\commons-codec\1.9\commons-codec-1.9.jar org.apache.rocketmq.example.quickstart.Consumer
22:16:16.493 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=2, sysFlag=0, bornTimestamp=1721571319594, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319598, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F0000000000000642, commitLogOffset=1602, bodyCRC=193412630, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382412, UNIQ_KEY=0200000111C818B4AAC26BC5B72A000A, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 48], transactionId='null'}]] 
ConsumeMessageThread_16 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=11, sysFlag=0, bornTimestamp=1721571319797, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319798, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F0000000000001F6E, commitLogOffset=8046, bodyCRC=529756006, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382418, UNIQ_KEY=0200000111C818B4AAC26BC5B7F5002E, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52, 54], transactionId='null'}]] 
ConsumeMessageThread_7 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=3, sysFlag=0, bornTimestamp=1721571319624, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319627, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F000000000000090E, commitLogOffset=2318, bodyCRC=216726031, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382419, UNIQ_KEY=0200000111C818B4AAC26BC5B748000E, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 52], transactionId='null'}]] 
ConsumeMessageThread_11 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=6, sysFlag=0, bornTimestamp=1721571319698, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319702, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F0000000000001172, commitLogOffset=4466, bodyCRC=1237960928, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382419, UNIQ_KEY=0200000111C818B4AAC26BC5B792001A, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50, 54], transactionId='null'}]] 
ConsumeMessageThread_16 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=18, sysFlag=0, bornTimestamp=1721571319880, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319881, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F0000000000003302, commitLogOffset=13058, bodyCRC=1521507721, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382425, UNIQ_KEY=0200000111C818B4AAC26BC5B848004A, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 52], transactionId='null'}]] 
。。。。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2057866.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【微服务】微服务组件之Nacos注册中心和配置中心的使用

背景: 在当前的软件架构领域,微服务架构凭借其高度的可扩展性、灵活性和可维护性,已成为企业构建复杂应用的首选。微服务架构通过将应用拆分成一系列小的、独立的服务,实现了服务的解耦和复用,从而提高了应用的可扩展性…

Sass实现网页背景主题切换

Sass 实现网页背景主题切换 前言准备工作一、 简单的两种主题黑白切换1.定义主题2. 添加主题切换功能3. 修改 data-theme 属性 二、多种主题切换1. 定义主题2. 动态生成 CSS 变量1.遍历列表2.遍历映射3.高级用法 3. 设置默认主题4. 切换功能HTML 三、多种主题多种样式切换1. 定…

在 Fedora 上安装 LAMP(Linux、Apache、MySQL、PHP)的方法

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 关于 LAMP LAMP 栈是一组用于启动和运行 Web 服务器的开源软件。该缩写代表 Linux、Apache、MySQL 和 PHP。由于服务器已经在运行 Fedo…

高性能web服务器1

基础 Web 服务简介 Web 服务是互联网的核心组成部分之一,它允许用户通过浏览器访问信息和应用程序。一个基础的 Web 服务通常由 Web 服务器软件、静态网页内容、以及可选的动态内容生成程序组成。 Web 服务器软件 Web 服务器软件是运行在服务器上的程序&#xff…

【Java 数据结构】PriorityQueue介绍

优先级队列 回顾二叉树堆堆是什么堆的实现初始化堆的创建向下调整建堆复杂度插入向上调整建堆复杂度删除 PriorityQueue类介绍PriorityQueue是什么PriorityQueue使用构造方法常用方法 PriorityQueue源码介绍Top-K问题 回顾二叉树 上一次我们简单的了解了二叉树这个数据结构, 但…

每天五分钟深度学习框架pytorch:神经网络工具箱nn的介绍

本文重点 我们前面一章学习了自动求导,这很有用,但是在实际使用中我们基本不会使用,因为这个技术过于底层,我们接下来将学习pytorch中的nn模块,它是构建于autograd之上的神经网络模块,也就是说我们使用pytorch封装好的神经网络层,它自动会具有求导的功能,也就是说这部…

夏晖WMS是什么?夏晖WMS怎么与金蝶云星空进行集成?

在数字化浪潮席卷全球的今天,企业对于业务流程的高效管理和数据集成的需求愈发迫切。夏晖WMS作为一款领先的仓库管理系统,与金蝶云星空ERP的集成成为了众多企业提升管理效率的关键环节。 夏晖WMS是什么? 夏晖WMS是一款由夏晖物流(上海&…

Golang | Leetcode Golang题解之第355题设计推特

题目: 题解: type Twitter struct {Tweets []intUserTweets map[int][]intFollows map[int][]intIsFollowMy map[int]bool }/** Initialize your data structure here. */ func Constructor() Twitter {// 每一次实例化的时候,都重新分配一次…

C语言 | Leetcode C语言题解之第354题俄罗斯套娃信封问题

题目: 题解: int cmp(int** a, int** b) {return (*a)[0] (*b)[0] ? (*b)[1] - (*a)[1] : (*a)[0] - (*b)[0]; }int maxEnvelopes(int** envelopes, int envelopesSize, int* envelopesColSize) {if (envelopesSize 0) {return 0;}qsort(envelopes, …

宜佰丰超市进销存管理系统

你好呀,我是计算机学姐码农小野!如果有相关需求,可以私信联系我。 开发语言: Java 数据库: MySQL 技术: JavaMysql 工具: IDEA/Eclipse、Navicat、Maven 系统展示 首页 管理员功能模块…

接口测试及常用接口测试工具(postman/jmeter)详解

🍅 点击文末小卡片 ,免费获取软件测试全套资料,资料在手,涨薪更快 首先,什么是接口呢? 接口一般来说有两种,一种是程序内部的接口,一种是系统对外的接口。 系统对外的接口&#x…

【Alibaba Cola 状态机】重点解析以及实践案例

【Alibaba Cola 状态机】重点解析以及实践案例 1. 状态模式 状态模式是一种行为型设计模式,允许对象在内部状态改变时改变其行为,简单地讲就是,一个拥有状态的context对象,在不同状态下,其行为会发生改变。看起来是改…

Spring项目:文字花园(四)

一.实现登录 传统思路: • 登陆⻚⾯把⽤⼾名密码提交给服务器. • 服务器端验证⽤⼾名密码是否正确, 并返回校验结果给后端 • 如果密码正确, 则在服务器端创建 Session . 通过 Cookie 把 sessionId 返回给浏览器. 问题: 集群环境下⽆法直接使⽤Session. 原因分析: 我们开…

渐变纹理的使用

1、渐变纹理的使用 通过单张纹理和凹凸纹理相,我们知道图片中存储的数据不仅仅可以是颜色数据,还可以是高度、法线数据。 理论上来说,图片中存储的数据我们可以自定义规则,我们可以往图片中存储任何满足 我们需求的数据用于渲染。…

原神4.8版本抽到角色和重点培养数据表

<!DOCTYPE html> <html lang"zh-cn"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>原神4.8版本抽到角色和重点培养数据表</title…

vue-element-admin——<keep-alive>不符合预期缓存的原因

vue-element-admin——<keep-alive>不符合预期缓存的原因 本文章&#xff0c;以现在中后台开发用的非常多的开源项目vue-element-admin为案例。首先&#xff0c;列出官方文档与缓存<keep-alive>相关的链接&#xff08;请认真阅读&#xff0c;出现缓存<keep-ali…

MSR配置

公钥私钥 网页上提供的脚本安装客户端??去掉跳板机 history | grep azcopy 44 azcopy 47 azcopy cp --recursive --log-level NONE --overwrite true https://singularitywor9084471172.blob.core.windows.net/yifanyang/thinking.py\?sv\2023-01-03\&st\2024-…

机器学习:逻辑回归实现下采样和过采样

1、概述 逻辑回归本身是一种分类算法&#xff0c;它并不涉及下采样或过采样操作。然而&#xff0c;在处理不平衡数据集时&#xff0c;这些技术经常被用来改善模型的性能。下采样和过采样是两种常用的处理不平衡数据集的方法。 2、下采样 1、概念 下采样是通过减少数量较多的类…

【学习笔记】Day 19

一、进度概述 1、机器学习常识1-11&#xff0c;以及相关代码复现 二、详情 1、不确定性 所谓不确定性, 是指我们在进行预测的时候, 不能够保证 100% 的准确。而机器学习&#xff0c;比的就是谁 “猜的更准”。 不确定性&#xff0c;可能由信息不足、信息模糊等原因产…

编写开放接口与思考

编写开放接口与思考 一、情景描述&#xff1a; 当一个项目开发一定程度时&#xff0c;会有跟合作厂商对接共同开发的情况&#xff0c;那么如果合作厂商想要使用你项目中的某个接口&#xff0c;你该如何把接口暴露给他们&#xff1f; 二、实现方式分析 1、因为现在接口大部分…