RocketMQ 架构
源码搭建前, 需要理解 RocketMQ
的四个重要组件, 以及 RocketMQ
的工作流程:
-
NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
-
Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
-
Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
-
Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。
结合部署架构图,描述集群工作流程:
- 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
准备环境
- JDK1.8
- IDEA.2022
- RocketMQ-4.6
- windows11
构建源码
- 拉取源码:
git clone https://github.com/apache/rocketmq.git
我的项目路径在 d:\yyr\zgp\rocketmq
- 执行 maven 命令
mvn clean install -Dmaven.test.skip=true
我们主要关注上图中标注的 4 个模块
- broker
- namesrv
- example
- distribution
运行程序
0. 准备工作
在 rocketmq 工程的根目录下, 新建 conf 目录(暂时不关注目录中的四个文件)
1. 运行 NameServer
- 打开
distribution
模块, 将logback_namesrv.xml
文件放在 rocketmq/conf 目录下
- 首先需要,通过 IDEA 配置环境变量
ROCKETMQ_HOME
, 类似我们装 JDK 一样, 需要配置JAVA_HOME
。
- 运行 namesrv 模块启动类
org/apache/rocketmq/namesrv/NamesrvStartup.java
模块
当出现如下日志时, 通常可以 断定 是启动成功了。
2. 运行 Broker
1.打开 distribution
模块, 将 logback_broker.xml
以及 broker.conf
文件放在 rocketmq/conf 目录下
2.通过 IDEA 配置环境变量 ROCKETMQ_HOME
, 类似我们装 JDK 一样, 需要配置 JAVA_HOME
。并指定程序运行时需要读取的配置文件 broker.conf 的位置
- 运行 broker 模块
启动类 org/apache/rocketmq/broker/BrokerStartup.java
当出现如下日志时, 通常可以 断定 是启动成功了。(很多文章会说控制台打印如下日志就代表 broker 运行成功了, 其实不然。后面会说明原因)
3. 运行消息生产者 Producer
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// TopicTest 可以随意替换
DefaultMQProducer producer = new DefaultMQProducer("TopicTest");
// 指定 namesrv 地址, 默认端口是 9876
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 这里进行了修改, 只发送一条消息
for (int i = 0; i < 1; i++) {
try {
Message msg = new Message("zhangsan" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
消息发送成功
可能会出现的问题
找不到名称为 “zhangsan” 的消息主题
问题出现的主要原因是 broker 没有注册到 namesvr, 要么没有指定 broker.conf 文件, 要么就是 broker.conf 配置文件中没有配置 namesvr 的地址。
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = true
namesrvAddr = localhost:9876 // 注意这里
4. 运行 Consumer
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 与生产者保持一致
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TopicTest
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/*
* Subscribe one more more topics to consume.
*/
consumer.subscribe("TopicTest1", "*");
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/*
* Launch the consumer instance.
*/
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
消息消费成功
5. 安装可视化工具 rocketmq-console
如果源码环境搭建完成后, 消息始终无法消费,或者没有发送出去,但是又无法判断哪个环节出现了问题, 我们就可以搭建可视化工具, 通常情况下, 这样更容易找到哪个模块出现了问题。
rocketmq-console 的搭建非常简单。
- 克隆源码
https://github.com/apache/rocketmq-externals.git
- 切换到对应的分支
- 修改 application.properties 文件, 添加 namesvr 地址
- 运行并访问
http://localhost:8080/
这是验证 namesvr 和 broker 是否启动成功最简单的办法。
除此之外, 我们也可以看出来消息是否发送成功, 是否消费成功。