文章目录
- 1、源码环境搭建
- 1.1、主要功能模块
- 1.2、源码启动服务
- 1.2.1、 启动nameServer
- 1.2.2、 启动Broker
- 1.2.3、 发送消息
- 1.2.4、 消费消息
- 2、源码剖析
- 2.1、NameServer的启动过程
- 2.2、Broker服务启动过程
- 2.3、Netty服务注册框架
- 2.3.1、关注重点
- 2.3.2、源码重点
1、源码环境搭建
1.1、主要功能模块
RocketMQ的官方Git仓库地址:https://github.com/apache/rocketmq 可以用git把项目clone下来或者直接下载代码包。
也可以到RocketMQ的官方网站上下载指定版本的源码: http://rocketmq.apache.org/dowloading/releases/
源码下很多的功能模块,很容易让人迷失方向,我们只关注下几个最为重要的模块:
- broker: Broker 模块(broke 启动进程)
- client :消息客户端,包含消息生产者、消息消费者相关类
- example: RocketMQ 例代码
- namesrv:NameServer模块
- store:消息存储模块
- remoting:远程访问模块
1.2、源码启动服务
将源码导入IDEA后,需要先对源码进行编译。编译指令 clean install -Dmaven.test.skip=true
编译完成后就可以开始调试代码了。调试时需要按照以下步骤:
调试时,先在项目目录下创建一个conf目录,并从distribution拷贝broker.conf和logback_broker.xml和logback_namesrv.xml
1.2.1、 启动nameServer
展开namesrv模块,运行NamesrvStartup类即可启动NameServer
启动时,会报错,提示需要配置一个ROCKETMQ_HOME环境变量。
这个环境变量我们可以在机器上配置,跟配置JAVA_HOME环境变量一样。也可以在IDEA的运行环境中配置。目录指向源码目录即可。
配置完成后,再次执行,看到以下日志内容,表示NameServer启动成功
1.2.2、 启动Broker
启动Broker之前,我们需要先修改之前复制的broker.conf文件
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 自动创建Topic
autoCreateTopicEnable=true
# nameServ地址
namesrvAddr=127.0.0.1:9876
# 存储路径
storePathRootDir=E:\\RocketMQ\\data\\rocketmq\\dataDir
# commitLog路径
storePathCommitLog=E:\\RocketMQ\\data\\rocketmq\\dataDir\\commitlog
# 消息队列存储路径
storePathConsumeQueue=E:\\RocketMQ\\data\\rocketmq\\dataDir\\consumequeue
# 消息索引存储路径
storePathIndex=E:\\RocketMQ\\data\\rocketmq\\dataDir\\index
# checkpoint文件路径
storeCheckpoint=E:\\RocketMQ\\data\\rocketmq\\dataDir\\checkpoint
# abort文件存储路径
abortFile=E:\\RocketMQ\\data\\rocketmq\\dataDir\\abort
然后Broker的启动类是broker模块下的BrokerStartup。
启动Broker时,同样需要ROCETMQ_HOME环境变量,并且还需要配置一个-c 参数,指向broker.conf配置文件。
然后重新启动,即可启动Broker。
1.2.3、 发送消息
启动example模块下的org.apache.rocketmq.example.quickstart.Producer类即可发送消息。
但是在测试源码中,需要指定NameServer地址。这个NameServer地址有两种指定方式,一种是配置一个NAMESRV_ADDR的环境变量。另一种是在源码中指定。我们可以在源码中加一行代码指定NameServer
producer.setNamesrvAddr("127.0.0.1:9876");
然后就可以发送消息了。
1.2.4、 消费消息
可以使用同一模块下的org.apache.rocketmq.example.quickstart.Consumer类来消费消息。运行时同样需要指定NameServer地址
consumer.setNamesrvAddr("192.168.232.128:9876");
这样整个调试环境就搭建好了。
2、源码剖析
2.1、NameServer的启动过程
关注重点
在RocketMQ集群中,实际记性消息存储、推送等核心功能点额是Broker。而NameServer的作用,其实和微服务中的注册中心非常类似,他只是提供了Broker端的服务注册与发现功能。
源码重点
NameServer的启动入口类是org.apache.rocketmq.namesrv.NamesrvStartup。其中的核心是构建并启动一个NamesrvController。这个Cotroller对象就跟MVC中的Controller是很类似的,都是响应客户端的请求。只不过,他响应的是基于Netty的客户端请求。
另外,他的实际启动过程,其实可以配合NameServer的启动脚本进行更深入的理解。
从NameServer启动和关闭这两个关键步骤,我们可以总结出NameServer的组件其实并不是很多,整个NameServer的结构是这样的;
从这里也能看出, RocketMQ的整体源码风格就是典型的MVC思想。Controller响应请求,Service处理业务,各种Table保存消息。
2.2、Broker服务启动过程
关注重点
Broker是整个RocketMQ的业务核心。所有消息存储、转发这些重要的业务都是Broker进行处理。
这里重点梳理Broker有哪些内部服务。这些内部服务将是整理Broker核心业务流程的起点。
源码重点
Broker启动的入口在BrokerStartup这个类,可以从他的main方法开始调试。
启动过程关键点:重点也是围绕一个BrokerController对象,先创建,然后再启动。
首先: 在BrokerStartup.createBrokerController方法中可以看到Broker的几个核心配置:
- BrokerConfig : Broker服务配置
- MessageStoreConfig : 消息存储配置。 这两个配置参数都可以在broker.conf文件中进行配置
- NettyServerConfig :Netty服务端占用了10911端口。同样也可以在配置文件中覆盖。
- NettyClientConfig : Broker既要作为Netty服务端,向客户端提供核心业务能力,又要作为Netty客户端,向NameServer注册心跳。
这些配置是我们了解如何优化 RocketMQ 使用的关键。
然后: 在BrokerController.start方法可以看到启动了一大堆Broker的核心服务,我们挑一些重要的
this.messageStore.start();//启动核心的消息存储组件
this.remotingServer.start();
this.fastRemotingServer.start(); //启动两个Netty服务
this.brokerOuterAPI.start();//启动客户端,往外发请求
BrokerController.this.registerBrokerAll: //向NameServer注册心跳。
this.brokerStatsManager.start();
this.brokerFastFailure.start();//这也是一些负责具体业务的功能组件
我们现在不需要了解这些核心组件的具体功能,只要有个大概,Broker中有一大堆的功能组件负责具体的业务。后面等到分析具体业务时再去深入每个服务的细节。
我们需要抽象出Broker的一个整体结构:
可以看到Broker启动了两个Netty服务,他们的功能基本差不多。实际上,在应用中,可以通过producer.setSendMessageWithVIPChannel(true),让少量比较重要的producer走VIP的通道。而在消费者端,也可以通过consumer.setVipChannelEnabled(true),让消费者支持VIP通道的数据。
2.3、Netty服务注册框架
2.3.1、关注重点
网络通信服务是构建分布式应用的基础,也是我们去理解RocketMQ底层业务的基础。这里就重点梳理RocketMQ的这个服务注册框架,理解各个业务进程之间是如何进行RPC远程通信的。
Netty的所有远程通信功能都由remoting模块实现。RemotingServer模块里包含了RPC的服务端RemotingServer以及客户端RemotingClient。在RocketMQ中,涉及到的远程服务非常多,在RocketMQ中,NameServer主要是RPC的服务端RemotingServer,Broker对于客户端来说,是RPC的服务端RemotingServer,而对于NameServer来说,又是RPC的客户端。各种Client是RPC的客户端RemotingClient。
需要理解的是,RocketMQ基于Netty保持客户端与服务端的长连接Channel。只要Channel是稳定的,那么即可以从客户端发请求到服务端,同样服务端也可以发请求到客户端。例如在事务消息场景中,就需要Broker多次主动向Producer发送请求确认事务的状态。所以,RemotingServer和RemotingClient都需要注册自己的服务。
2.3.2、源码重点
1、哪些组件需要Netty服务端?哪些组件需要Netty客户端? 比较好理解的,NameServer需要NettyServer。客户端,Producer和Consuer,需要NettyClient。Broker需要NettyServer响应客户端请求,需要NettyClient向NameServer注册心跳。但是有个问题, 事务消息的Producer也需要响应Broker的事务状态回查,他需要NettyServer吗?
NameServer不需要NettyClient,这也验证了之前介绍的NameServer之间不需要进行数据同步的说法。
2、所有的RPC请求数据都封账成RemotingCommand对象。而每个处理消息的服务逻辑,都会封装成一个NettyRequestProcessor对象。
3、服务端和客户端都维护一个processorTable,这是个HashMap。key是服务码requestCode,value是对应的运行单元 Pair<NettyRequestProcessor,ExecutorService>类型,包含了处理Processor和执行线程的线程池。具体的Processor,由业务系统自行注册。Broker服务注册见,BrokerController.registerProcessor(),客户端的服务注册见MQClientAPIImpl。NameServer则会注册一个大的DefaultRequestProcessor,统一处理所有服务。
4、请求类型分为REQUEST和RESPONSE。这是为了支持异步的RPC调用。NettyServer处理完请求后,可以先缓存到responseTable中,等NettyClient下次来获取,这样就不用阻塞Channel了,可以提升请求吞吐量。猜一猜Producer的同步请求的流程是什么样的?
5、重点理解remoting包中是如何实现全流程异步化。