RocketMQ简介
RocketMQ是一种分布式消息中间件,它由阿里巴巴集团开发,并且后来捐献给了Apache软件基金会。RocketMQ最初是为了解决阿里巴巴内部因业务增长带来的高吞吐量需求而设计的。随着其不断发展和完善,RocketMQ已经成为了一个能够处理从传统发布/订阅模式到大规模实时无差错交易系统的强大消息引擎。
RocketMQ的主要特点
- 架构简洁:相比其他消息队列系统,RocketMQ具有更简单的架构设计。
- 丰富的业务功能:支持多种消息类型和应用场景,比如实时消息处理、顺序消息处理以及事务消息处理。
- 极高的可扩展性:可以轻松地进行水平扩展以满足不同规模的应用需求。
- 金融级可靠性:经过长时间在实际生产环境中的验证,被广泛认可为适用于对数据准确性和一致性要求极高的金融场景下的消息- 传递解决方案。
与其它消息队列系统的比较
- Kafka 更适合于大量数据处理(如离线流处理);
- RabbitMQ 则擅长处理复杂的消息路由及支持多种协议;
- ActiveMQ 提供了广泛的协议支持但性能相对较低。
官方网站:http://rocketmq.apache.org
项目地址:https://github.com/apache/rocketmq
中文社区:https://rocketmq.io/
集群模式介绍
RocketMQ 技术架构中包含的角色有NameServer、Broker、Proxy、Controller、dashboard、Producer、Consumer等。
NameServer
是 RocketMQ 的路由服务集群,它不保存任何消息数据,而是维护着所有 Broker 的路由信息(包括 Broker 存活情况、各 topic 的路由信息等)Broker
是 RocketMQ 的核心服务节点,负责接收来自 Producer(生产者)的消息、存储消息以及将消息转发给 Consumer(消费者)。在 RocketMQ 架构中,Broker 分为 Master Broker 和 Slave Broker(或称为 Replica)两种角色Proxy
是 RocketMQ 为了提高性能和简化客户端接入而引入的一个可选组件。它作为一个轻量级的代理服务器,位于客户端与 Broker 之间Controller
是 RocketMQ 5.0 新增的组件,它充当控制平面的角色,负责管理和协调系统的整体状态。它主要出现在可切换架构的部署过程中,如 Broker 宕机时,它将进行调度,对 Broker 集群进行选举管理。Controller 本身也支持集群部署,基于 Raft 实现容灾。Dashboard
是 RocketMQ 提供的一款可视化管理界面,它允许用户通过网页浏览器直观地监控和管理 RocketMQ 集群的状态,包括但不限于查看消息队列、消费进度、Broker 健康状况、主题配置、消费组详情等信息。Producer
与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。Consumer
与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。
RocketMQ集群迭代历程:
- 在RocketMQ 4.5版本之前,使用简单的Master-slave主从模式,如果Master宕机,不支持自动将Slave切换为Master,需要人工介入
- 在RocketMQ 4.5版本之后,引入了DLedger来解决Master/Slave的自动主从切换问题,该机制基于Raft协议实现,但存在局限性
- 在RocketMQ 5.0版本之后,引入controller组件,基于Dledger Controller实现主从切换
Proxy 组件部署有两种方式:
- Local 模式:将 Proxy 与 Broker 一起部署在同一个进程中,这能够节约一些机器资源。
- Cluster 模式:将 Proxy 作为一个独立集群进行部署。这能够提供较强的横向扩展能力。
Controller 组件部署有两种方式:
- 嵌入于 NameServer 进行部署,可以通过配置 enableControllerInNamesrv 打开。
- 另一种是独立部署,需要单独部署 Controller 组件。
数据存储分为同步刷盘和异步刷盘:
- 同步刷盘:当消息到达 Broker 后,只有把消息写入到 CommitLog 日志文件中,才给生产者返回发送成功的响应。
- 异步刷盘:当消息到达 Broker 后,就给生产者返回数据发送成功了,并启动一个异步线程去把消息写入到 CommitLog 中。
主从同步分为同步复制和异步复制:
-
每个Master配置一个Slave,有多组 Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),优点是性能同多Master模式几乎一样,缺点是Master宕机磁盘损坏情况下会丢失少量消息。
-
每个Master配置一个Slave,有多组 Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功。优点是数据与服务都无单点故障,缺点是性能比异步复制模式略低
集群部署规划
本示例基于RocketMQ 5.0 controller模式,采用异步刷盘、同步复制方式,所有组件分离部署:
- proxy为无状态组件,可以部署1至多个节点,本示例为2节点冗余部署
- nameserver为无状态组件,可以部署1至多个节点,本示例为2节点冗余部署
- controller基于raft协议,需要部署至少3个节点,本示例为3节点冗余部署
- broker部署2个副本组,每个副本组包含1个主节点和1个从节点,主从选举及故障切换由controller组件自动管理
- dashboard为无状态组件,可以部署1至多个节点,本示例为2节点冗余部署
- LB节点通过haproxy和keepalived实现,作为proxy和dashboard的统一负载均衡入口,本示例为2节点冗余部署
节点规划清单如下:
主机名 | IP地址 | 角色 | 操作系统 | CPU/内存/磁盘 |
---|---|---|---|---|
LB01 | 192.168.72.18 | 负载均衡器 | Ubuntu 22.04 LTS | 2C /4GB/100GB DATA DISK |
LB02 | 192.168.72.19 | 负载均衡器 | Ubuntu 22.04 LTS | 2C /4GB/100GB DATA DISK |
proxy01 | 192.168.72.31 | Proxy节点 | Ubuntu 22.04 LTS | 2C /4GB/100GB DATA DISK |
proxy02 | 192.168.72.32 | Proxy节点 | Ubuntu 22.04 LTS | 2C /4GB/100GB DATA DISK |
nameserver01 | 192.168.72.33 | NameServer节点 | Ubuntu 22.04 LTS | 2C /4GB/100GB DATA DISK |
nameserver02 | 192.168.72.34 | NameServer节点 | Ubuntu 22.04 LTS | 2C /4GB/100GB DATA DISK |
broker-1-1 | 192.168.72.51 | Broker副本组1-1 | Ubuntu 22.04 LTS | 2C /8GB/100GB DATA DISK |
broker-1-2 | 192.168.72.52 | Broker副本组1-2 | Ubuntu 22.04 LTS | 2C /8GB/100GB DATA DISK |
broker-2-1 | 192.168.72.53 | Broker副本组2-1 | Ubuntu 22.04 LTS | 2C /8GB/100GB DATA DISK |
broker-2-2 | 192.168.72.54 | Broker副本组2-2 | Ubuntu 22.04 LTS | 2C /8GB/100GB DATA DISK |
controller01 | 192.168.72.40 | Controller节点 | Ubuntu 22.04 LTS | 2C /4GB/100GB DATA DISK |
controller02 | 192.168.72.41 | Controller节点 | Ubuntu 22.04 LTS | 2C /4GB/100GB DATA DISK |
controller03 | 192.168.72.42 | Controller节点 | Ubuntu 22.04 LTS | 2C /4GB/100GB DATA DISK |
dashboard01 | 192.168.72.55 | Dashboard管理节点 | Ubuntu 22.04 LTS | 2C /4GB/100GB DATA DISK |
dashboard02 | 192.168.72.56 | Dashboard管理节点 | Ubuntu 22.04 LTS | 2C /4GB/100GB DATA DISK |
proxy VIP | 192.168.72.250 | proxy VIP | N/A | N/A |
dashboard VIP | 192.168.72.251 | dashboard VIP | N/A | N/A |
节点配置说明:
- 可视情况合并部分组件复用一个节点部署,nameserver、broker为必选组件
- broker 至少8G内存,生产环境建议提高配置,例如8C 16G 500G SSD,数据盘可单独挂载至
/data
目录 - 提供两个分离的VIP地址,一个作为rocketmq proxy统一入口,一个作为rocketmq dashboard统一入口
集群架构如下图所示:
部署目录规划
- 安装目录:统一安装包及配置文件存放路径,如
/opt/rocketmq/
- 数据目录:运行过程中生成的日志和数据文件与安装包分开,存放在
/data/
目录下。 - 服务管理:通过
systemd
管理服务,保证开机自启和管理规范化。
基础环境配置
配置主机名,在所有节点执行,每个节点单独配置。
hostnamectl set-hostname nameserver01
hostnamectl set-hostname nameserver02
hostnamectl set-hostname controller01
hostnamectl set-hostname controller02
hostnamectl set-hostname controller03
hostnamectl set-hostname broker-1-1
hostnamectl set-hostname broker-1-2
hostnamectl set-hostname broker-2-1
hostnamectl set-hostname broker-2-2
hostnamectl set-hostname dashboard01
hostnamectl set-hostname dashboard02
hostnamectl set-hostname proxy01
hostnamectl set-hostname proxy02
hostnamectl set-hostname lb01
hostnamectl set-hostname lb02
配置所有节点时间同步,默认与互联网NTP同步时间
apt install -y chrony
systemctl enable --now chrony
基础软件安装,在所有非LB节点执行。
apt update -y
apt install -y unzip
apt install -y openjdk-11-jdk
依赖说明:
- RocketMQ依赖JAVA环境,因此需要安装openjdk,注意版本兼容性
- 官方二进制包解压需要使用unzip工具
操作系统初始化,在所有非LB节点执行,配置RocketMQ相关系统参数。
wget https://raw.githubusercontent.com/apache/rocketmq/refs/heads/develop/distribution/bin/os.sh
bash os.sh
部署 RocketMQ 软件包
以下操作在所有非LB节点执行。
下载二进制文件,以使用RocketMQ 5.3.1版本二进制包安装为例。
wget https://dist.apache.org/repos/dist/release/rocketmq/5.3.1/rocketmq-all-5.3.1-bin-release.zip
将 rocketmq-all-5.3.1-bin-release.zip
上传到目标服务器。执行以下命令解压到/opt目录下:
# 解压安装包到指定目录
sudo unzip rocketmq-all-5.3.1-bin-release.zip -d /opt
# 创建软链接,便于后续升级管理
sudo ln -s /opt/rocketmq-all-5.3.1-bin-release /opt/rocketmq
配置环境变量,方便调用相关bin二进制命令
cat >/etc/profile.d/rocketmq.sh<<'EOF'
export ROCKETMQ_HOME=/opt/rocketmq
export PATH=$ROCKETMQ_HOME/bin:$PATH
EOF
source /etc/profile
部署 NameServer 组件
在所有nameserver
节点操作。
创建 systemd 服务文件
在 /etc/systemd/system/
目录下创建一个 rocketmq-namesrv.service
文件。写入以下内容:
cat >/etc/systemd/system/rocketmq-namesrv.service<<'EOF'
[Unit]
Description=RocketMQ NameServer
After=network.target
[Service]
Type=simple
User=root
ExecStart=/opt/rocketmq/bin/mqnamesrv
ExecStop=/bin/kill -SIGTERM $MAINPID
Restart=on-failure
RestartSec=5
LimitNOFILE=65536
Environment="ROCKETMQ_HOME=/opt/rocketmq"
Environment="JAVA_OPT=-Duser.home=/data/rocketmq-namesrv"
[Install]
WantedBy=multi-user.target
EOF
启用并启动 NameServer 服务
执行以下命令:
# 启动 NameServer 服务
sudo systemctl start rocketmq-namesrv
# 设置开机自启
sudo systemctl enable rocketmq-namesrv
# 检查服务状态
sudo systemctl status rocketmq-namesrv
# 重启服务(可选)
systemctl daemon-reload
systemctl restart rocketmq-namesrv
示例输出结果
root@nameserver01:~# systemctl status rocketmq-namesrv
● rocketmq-namesrv.service - RocketMQ NameServer
Loaded: loaded (/etc/systemd/system/rocketmq-namesrv.service; enabled; vendor preset: enabled)
Active: active (running) since Sun 2024-12-22 15:27:03 CST; 1min 36s ago
Main PID: 112830 (mqnamesrv)
Tasks: 74 (limit: 4557)
Memory: 240.4M
CPU: 5.388s
CGroup: /system.slice/rocketmq-namesrv.service
├─112830 /bin/sh /opt/rocketmq/bin/mqnamesrv
├─112831 sh /opt/rocketmq/bin/runserver.sh -Drmq.logback.configurationFile=/opt/rocketmq/conf/rmq.namesrv.logback.xml org.apache.rocketmq.namesrv.NamesrvStartup
└─112858 /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Duser.home=/data/rocketmq-namesrv -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m -XX:+UseG1GC -XX:G1Heap>
Dec 22 15:27:03 nameserver01 systemd[1]: Started RocketMQ NameServer.
Dec 22 15:27:05 nameserver01 mqnamesrv[112858]: The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
日志检查与验证
NameServer 默认的日志目录位于home目录下,可通过以下命令检查:
tail -f /data/rocketmq-namesrv/logs/rocketmqlogs/namesrv.log
确保 NameServer 服务正常监听在 9876 端口:
root@nameserver01:~# ss -tlnp | grep 9876
LISTEN 0 1024 *:9876 *:* users:(("java",pid=7363,fd=152))
部署 Controller 组件
以下操作在controller节点执行。
修改controller配置文件
创建controller01
节点配置文件,在Controller01
节点操作。
cat >/opt/rocketmq/conf/controller/controller.conf<<EOF
controllerDLegerGroup = group1
controllerDLegerPeers = n0-192.168.72.40:9878;n1-192.168.72.41:9878;n2-192.168.72.42:9878
controllerDLegerSelfId = n0
controllerStorePath = /data/admin/DledgerController
EOF
创建controller02
节点配置文件,在Controller02
节点操作。
cat >/opt/rocketmq/conf/controller/controller.conf<<EOF
controllerDLegerGroup = group1
controllerDLegerPeers = n0-192.168.72.40:9878;n1-192.168.72.41:9878;n2-192.168.72.42:9878
controllerDLegerSelfId = n1
controllerStorePath = /data/admin/DledgerController
EOF
创建controller03
节点配置文件,在Controller03
节点操作。
cat >/opt/rocketmq/conf/controller/controller.conf<<EOF
controllerDLegerGroup = group1
controllerDLegerPeers = n0-192.168.72.40:9878;n1-192.168.72.41:9878;n2-192.168.72.42:9878
controllerDLegerSelfId = n2
controllerStorePath = /data/admin/DledgerController
EOF
创建 systemd 服务文件
在所有Controller
节点操作。
在 /etc/systemd/system/
目录下创建一个 rocketmq-controller.service
文件。写入以下内容:
cat >/etc/systemd/system/rocketmq-controller.service<<'EOF'
[Unit]
Description=RocketMQ Controller
After=network.target
[Service]
Type=simple
User=root
ExecStart=/opt/rocketmq/bin/mqcontroller -c /opt/rocketmq/conf/controller/controller.conf
ExecStop=/bin/kill -SIGTERM $MAINPID
Restart=on-failure
RestartSec=5
Environment="ROCKETMQ_HOME=/opt/rocketmq"
Environment="JAVA_OPT=-Duser.home=/data/admin"
[Install]
WantedBy=multi-user.target
EOF
启用并启动 controller 服务
在所有Controller
节点操作,执行以下命令:
# 启动 Controller 服务
sudo systemctl start rocketmq-controller
# 设置开机自启
sudo systemctl enable rocketmq-controller
# 检查服务状态
sudo systemctl status rocketmq-controller
# 重启服务(可选)
systemctl daemon-reload
systemctl restart rocketmq-controller
示例输出如下
root@controller01:~# systemctl status rocketmq-controller.service
● rocketmq-controller.service - RocketMQ Controller
Loaded: loaded (/etc/systemd/system/rocketmq-controller.service; enabled; vendor preset: enabled)
Active: active (running) since Sun 2024-12-22 15:31:23 CST; 2min 19s ago
Main PID: 124707 (mqcontroller)
Tasks: 89 (limit: 4556)
Memory: 314.1M
CPU: 26.095s
CGroup: /system.slice/rocketmq-controller.service
├─124707 /bin/sh /opt/rocketmq/bin/mqcontroller -c /opt/rocketmq/conf/controller/controller.conf
├─124708 sh /opt/rocketmq/bin/runserver.sh -Drmq.logback.configurationFile=/opt/rocketmq/conf/rmq.controller.logback.xml org.apache.rocketmq.controller.ControllerStartup -c /opt/rock>
└─124735 /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Duser.home=/data/admin -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m -XX:+UseG1GC -XX:G1HeapRegionSize=>
Dec 22 15:31:23 controller01 systemd[1]: Started RocketMQ Controller.
Dec 22 15:31:25 controller01 mqcontroller[124735]: load config properties file OK, /opt/rocketmq/conf/controller/controller.conf
Dec 22 15:31:26 controller01 mqcontroller[124735]: WARNING: An illegal reflective access operation has occurred
Dec 22 15:31:26 controller01 mqcontroller[124735]: WARNING: Illegal reflective access by com.caucho.hessian.io.JavaDeserializer (file:/opt/rocketmq-all-5.3.1-bin-release/lib/hessian-3.3.6.jar) to>
Dec 22 15:31:26 controller01 mqcontroller[124735]: WARNING: Please consider reporting this to the maintainers of com.caucho.hessian.io.JavaDeserializer
Dec 22 15:31:26 controller01 mqcontroller[124735]: WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
Dec 22 15:31:26 controller01 mqcontroller[124735]: WARNING: All illegal access operations will be denied in a future release
Dec 22 15:31:27 controller01 mqcontroller[124735]: The Controller Server boot success. serializeType=JSON
日志检查与验证
Controller 默认的日志目录位于home目录下,可通过以下命令检查:
tail -f /data/admin/logs/rocketmqlogs/controller.log
确保 Controller 服务正常监听在 9878 端口:
root@controller01:~# ss -tlnp | grep 9878
LISTEN 0 1024 *:9878 *:* users:(("java",pid=121984,fd=157))
查看controller状态
root@controller01:~# mqadmin getControllerMetaData -a 192.168.72.40:9878
#ControllerGroup group1
#ControllerLeaderId n0
#ControllerLeaderAddress 192.168.72.40:9878
#Peer: n0:192.168.72.40:9878
#Peer: n1:192.168.72.41:9878
#Peer: n2:192.168.72.42:9878
参数-a 代表的是任意一个 Controller 的地址
部署 Broker 组件
以下操作在broker节点执行。
修改broker配置文件
备份默认配置文件,在所有broker节点操作。
cp /opt/rocketmq/conf/broker.conf{,.bak}
修改两个broker-1-*
节点配置文件,副本组1,两个节点配置相同
cat >/opt/rocketmq/conf/broker.conf<<EOF
brokerClusterName=DefaultCluster
brokerName=broker-1
listenPort=10911
storePathCommitLog=/data/rocketmq-broker/store/commitlog/
storePathConsumerQueue=/data/rocketmq-broker/store/consumequeue/
allAckInSyncStateSet=true
#brokerId=-1
deleteWhen=04
fileReservedTime=48
#brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
enableControllerMode = true
controllerAddr = 192.168.72.40:9878;192.168.72.41:9878;192.168.72.42:9878
namesrvAddr = 192.168.72.33:9876;192.168.72.34:9876
EOF
参数说明:
brokerClusterName:
设定 Broker 所属的集群名称。默认是DefaultCluster
。brokerName:
指定 Broker 名称,用于区分集群中的不同 Broker,同一个副本组内节点参数相同,不同副本组不同。listenPort:
设置 Broker 的监听端口,默认为10911
。storePathCommitLog:
指定 commitlog 文件的存储路径,RocketMQ 存储消息的文件。storePathConsumerQueue:
指定消费者队列文件的存储路径。allAckInSyncStateSet=true:
若该值为 true,则一条消息需要复制到 SyncStateSet 中的每一个副本才会向客户端返回成功,可以保证消息不丢失。默认为 false。deleteWhen:
该参数指定 commitlog 文件的删除策略,04
表示每天的 4 点删除过期的 commitlog 文件。fileReservedTime:
配置 RocketMQ 存储的日志文件的保留时间,单位为小时。48 小时后未被消费的消息文件会被删除。flushDiskType:
配置日志刷盘策略。ASYNC_FLUSH 表示异步刷盘,通常性能较好,但可能会丢失部分数据。enableControllerMode:
启用控制器模式,用于 RocketMQ 集群管理和控制,通常在 RocketMQ 4.x 版本中开启。controllerAddr:
设置控制器节点的地址,用于集群的管理。namesrvAddr:
设置 NameServer 的地址,RocketMQ 的 NameServer 用于提供消息路由信息。
修改两个broker-2-*
节点配置文件,副本组2,两个节点配置相同,注意与副本组1的brokerName
参数不同
cat >/opt/rocketmq/conf/broker.conf<<EOF
brokerClusterName=DefaultCluster
brokerName=broker-2
listenPort=10911
storePathCommitLog=/data/rocketmq-broker/store/commitlog/
storePathConsumerQueue=/data/rocketmq-broker/store/consumequeue/
allAckInSyncStateSet=true
#brokerId=-1
deleteWhen=04
fileReservedTime=48
#brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
enableControllerMode = true
controllerAddr = 192.168.72.40:9878;192.168.72.41:9878;192.168.72.42:9878
namesrvAddr = 192.168.72.33:9876;192.168.72.34:9876
EOF
创建 systemd 服务文件
在所有broker节点操作。
在 /etc/systemd/system/
目录下创建一个 rocketmq-broker.service
文件。写入以下内容:
cat >/etc/systemd/system/rocketmq-broker.service<<'EOF'
[Unit]
Description=RocketMQ Broker
After=network.target
[Service]
Type=simple
User=root
ExecStart=/opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf
ExecStop=/bin/kill -SIGTERM $MAINPID
Restart=on-failure
RestartSec=5
LimitNOFILE=65536
Environment="ROCKETMQ_HOME=/opt/rocketmq"
Environment="JAVA_OPT=-Duser.home=/data/rocketmq-broker"
[Install]
WantedBy=multi-user.target
EOF
启用并启动 Broker 服务
执行以下命令:
# 启动 Broker 服务
sudo systemctl start rocketmq-broker
# 设置开机自启
sudo systemctl enable rocketmq-broker
# 检查服务状态
sudo systemctl status rocketmq-broker
# 重启服务(可选)
systemctl daemon-reload
systemctl restart rocketmq-broker
示例输出如下:
root@broker-1-1:~# systemctl status rocketmq-broker
● rocketmq-broker.service - RocketMQ Broker
Loaded: loaded (/etc/systemd/system/rocketmq-broker.service; enabled; vendor preset: enabled)
Active: active (running) since Sun 2024-12-22 15:55:35 CST; 4min 36s ago
Main PID: 119620 (mqbroker)
Tasks: 187 (limit: 4556)
Memory: 7.3G
CPU: 1min 21.725s
CGroup: /system.slice/rocketmq-broker.service
├─119620 /bin/sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf
├─119621 sh /opt/rocketmq/bin/runbroker.sh -Drmq.logback.configurationFile=/opt/rocketmq/conf/rmq.broker.logback.xml org.apache.rocketmq.broker.BrokerStartup -c /opt/rocketmq/conf/br>
└─119650 /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Duser.home=/data/rocketmq-broker -server -Xms8g -Xmx8g -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:Initiat>
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.classic.model.processor.LoggerModelHandler - Setting additivity of logger [io.opente>
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.core.model.processor.AppenderRefModelHandler - Attaching appender named [RocketmqBro>
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.classic.model.processor.LoggerModelHandler - Setting level of logger [RocketmqAuthAu>
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.classic.model.processor.LoggerModelHandler - Setting additivity of logger [RocketmqA>
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.core.model.processor.AppenderRefModelHandler - Attaching appender named [RocketmqAut>
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.classic.model.processor.RootLoggerModelHandler - Setting level of ROOT logger to INFO
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.core.model.processor.AppenderRefModelHandler - Attaching appender named [DefaultSift>
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,709 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.core.model.processor.DefaultProcessor@6b4a4e18 - End of configuration.
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,713 |-INFO in org.apache.rocketmq.common.logging.JoranConfiguratorExt@27c86f2d - Registering current configuration as safe fallback point
Dec 22 15:55:51 broker-1-1 mqbroker[119650]: The broker[broker-1, 192.168.72.51:10911] boot success. serializeType=JSON and name server is 192.168.72.33:9876;192.168.72.34:9876
lines 1-22/22 (END)
日志检查与验证
Broker默认的日志目录位于home目录下,可通过以下命令检查:
tail -f /data/rocketmq-broker/logs/rocketmqlogs/broker.log
确保 Broker 服务正常监听端口:
root@broker01-master:~# ss -tlnp | grep 10
LISTEN 0 1024 *:10909 *:* users:(("java",pid=10544,fd=176))
LISTEN 0 1024 *:10911 *:* users:(("java",pid=10544,fd=175))
LISTEN 0 50 *:10912 *:* users:(("java",pid=10544,fd=169))
端口说明:
10911端口
:这是RocketMQ Broker组件默认使用的监听端口(listenPort)。要修改这一设定值,应该直接在Broker的配置文件内调整listenPort参数。10912端口
:被称为haListenPort,专门用于Master Broker与Slave Broker之间进行主备切换时的数据同步。如需改变此端口,同样需要在Broker的配置文件中寻找并更新haListenPort项。10909端口
:一般情况下,它代表了Broker VIPChannel所用到的端口,默认计算方式为listenPort - 2(因此通常是10909)。VIPChannel是一种特别设计的网络通道,旨在优化高负载环境下的性能表现。如果想要变更这个特定端口,可以考虑调整listenPort或直接在配置中指定新的VIPChannel端口。
查看 SyncStateSet
可以通过运维工具查看 SyncStateSet:
mqadmin getSyncStateSet -a 192.168.72.40:9878 -b broker-1
参数-a 代表的是任意一个 Controller 的地址
如果顺利的话,可以看到以下内容:
root@broker-1-1:~# mqadmin getSyncStateSet -a 192.168.72.40:9878 -b broker-1
#brokerName broker-1
#MasterBrokerId 1
#MasterAddr 192.168.72.51:10911
#MasterEpoch 6
#SyncStateSetEpoch 6
#SyncStateSetNums 2
InSyncReplica: ReplicaIdentity{brokerName='broker-1', brokerId=1, brokerAddress='192.168.72.51:10911', alive=true}
InSyncReplica: ReplicaIdentity{brokerName='broker-1', brokerId=2, brokerAddress='192.168.72.52:10911', alive=true}
root@broker-1-1:~# mqadmin getSyncStateSet -a 192.168.72.40:9878 -b broker-2
#brokerName broker-2
#MasterBrokerId 1
#MasterAddr 192.168.72.53:10911
#MasterEpoch 3
#SyncStateSetEpoch 6
#SyncStateSetNums 2
InSyncReplica: ReplicaIdentity{brokerName='broker-2', brokerId=1, brokerAddress='192.168.72.53:10911', alive=true}
InSyncReplica: ReplicaIdentity{brokerName='broker-2', brokerId=2, brokerAddress='192.168.72.54:10911', alive=true}
root@broker-1-1:~#
查看 BrokerEpoch
可以通过运维工具查看 BrokerEpochEntry:
mqadmin getBrokerEpoch -n 192.168.72.33:9876 -b broker-1
参数-n 代表的是任意一个 Namesrv 的地址
如果顺利的话,可以看到以下内容:
root@broker-1-1:~# mqadmin getBrokerEpoch -n 192.168.72.33:9876 -b broker-1
#clusterName DefaultCluster
#brokerName broker-1
#brokerAddr 192.168.72.51:10911
#brokerId 0
#Epoch: EpochEntry{epoch=1, startOffset=0, endOffset=0}
#clusterName DefaultCluster
#brokerName broker-1
#brokerAddr 192.168.72.52:10911
#brokerId 2
#Epoch: EpochEntry{epoch=1, startOffset=0, endOffset=0}
root@broker-1-1:~#
root@broker-1-1:~#
root@broker-1-1:~# mqadmin getBrokerEpoch -n 192.168.72.33:9876 -b broker-2
#clusterName DefaultCluster
#brokerName broker-2
#brokerAddr 192.168.72.54:10911
#brokerId 2
#Epoch: EpochEntry{epoch=1, startOffset=0, endOffset=217}
#Epoch: EpochEntry{epoch=2, startOffset=217, endOffset=217}
#clusterName DefaultCluster
#brokerName broker-2
#brokerAddr 192.168.72.53:10911
#brokerId 0
#Epoch: EpochEntry{epoch=1, startOffset=0, endOffset=217}
#Epoch: EpochEntry{epoch=2, startOffset=217, endOffset=217}
root@broker-1-1:~#
列出集群信息
root@broker-1-1:~# mqadmin clusterlist -n 192.168.72.33:9876
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #Timer(Progress) #PCWait(ms) #Hour #SPACE #ACTIVATED
DefaultCluster broker-1 0 192.168.72.51:10911 V5_3_1 0.00(0,0ms) 0.00(0,0ms) 0-0(0.0w, 0.0, 0.0) 0 481904.09 0.1100 true
DefaultCluster broker-1 2 192.168.72.52:10911 V5_3_1 0.00(0,0ms) 0.00(0,0ms) 2-0(0.0w, 0.0, 0.0) 0 481904.09 0.1000 false
DefaultCluster broker-2 0 192.168.72.53:10911 V5_3_1 0.00(0,0ms) 0.00(0,0ms) 0-0(0.0w, 0.0, 0.0) 0 0.68 0.1100 true
DefaultCluster broker-2 2 192.168.72.54:10911 V5_3_1 0.00(0,0ms) 0.00(0,0ms) 2-0(0.0w, 0.0, 0.0) 0 0.68 0.1100 false
共2个副本组,每个副本组1主1从两个broker节点。
部署 Proxy 组件
在所有proxy节点操作。
创建 systemd 服务文件
在 /etc/systemd/system/
目录下创建一个 rocketmq-proxy.service
文件。写入以下内容:
cat >/etc/systemd/system/rocketmq-proxy.service<<'EOF'
[Unit]
Description=RocketMQ Proxy
After=network.target
[Service]
Type=simple
User=root
Environment="ROCKETMQ_HOME=/opt/rocketmq"
Environment="JAVA_OPT=-Duser.home=/data/rocketmq-proxy"
Environment="NAMESRV_ADDR=192.168.72.33:9876;192.168.72.34:9876"
ExecStart=/opt/rocketmq/bin/mqproxy -n ${NAMESRV_ADDR}
ExecStop=/bin/kill -SIGTERM $MAINPID
Restart=on-failure
RestartSec=5
[Install]
WantedBy=multi-user.target
EOF
启用并启动 proxy服务
执行以下命令:
# 启动 proxy 服务
sudo systemctl start rocketmq-proxy
# 设置开机自启
sudo systemctl enable rocketmq-proxy
# 检查服务状态
sudo systemctl status rocketmq-proxy
# 重新启动服务(可选)
systemctl daemon-reload
systemctl restart rocketmq-proxy
日志检查与验证
Proxy默认的日志目录位于home目录下,可通过以下命令检查:
tail -f /data/rocketmq-proxy/logs/rocketmqlogs/proxy.log
确保 NameServer 服务正常监听端口:
root@proxy01:~# ss -tlnp | grep :80
LISTEN 0 1024 *:8080 *:* users:(("java",pid=8906,fd=229))
LISTEN 0 4096 *:8081 *:* users:(("java",pid=8906,fd=228))
端口说明:
- 8080端口:这是RocketMQ Proxy中默认的remoting协议访问端口(
remotingListenPort
)。此外,在RocketMQ Dashboard中也作为默认访问端口。如果需要更改这个端口号,对于Proxy来说,可以通过修改配置文件conf/rmq-proxy.json
来实现;而对于Dashboard,则是通过调整application.yml
文件中的相关设置完成,参考链接为这里。 - 8081端口:在RocketMQ Proxy里,该端口被指定为gRPC协议访问端口(
grpcServerPort
)。同样地,若需自定义此端口号,应编辑rmq-proxy.json
配置文件。
部署 Dashboard 组件
在所有dashboard节点操作。
下载并解压rocketmq-dashboard
源码包
wget https://dist.apache.org/repos/dist/release/rocketmq/rocketmq-dashboard/2.0.0/rocketmq-dashboard-2.0.0-source-release.zip
sudo unzip rocketmq-dashboard-2.0.0-source-release.zip -d /opt
sudo ln -s /opt/rocketmq-dashboard-2.0.0-source-release /opt/rocketmq-dashboard
修改配置文件
root@dashboard01:~# cat /opt/rocketmq-dashboard/src/main/resources/application.yml
rocketmq:
config:
namesrvAddrs:
- 192.168.72.33:9876;192.168.72.34:9876
dataPath: /data/rocketmq-console/data
loginRequired: true
proxyAddrs:
- 192.168.72.31:8080
- 192.168.72.32:8080
编译 rocketmq-dashboard
cd /opt/rocketmq-dashboard
apt install -y maven
mvn clean package -Dmaven.test.skip=true
创建 systemd 服务文件
在 /etc/systemd/system/
目录下创建一个 rocketmq-dashboard.service
文件。写入以下内容:
cat >/etc/systemd/system/rocketmq-dashboard.service<<'EOF'
[Unit]
Description=RocketMQ Dashboard Service
After=network.target
[Service]
ExecStart=/usr/bin/java $JAVA_OPT -jar /opt/rocketmq-dashboard/target/rocketmq-dashboard-2.0.0.jar
WorkingDirectory=/opt/rocketmq-dashboard
StandardOutput=journal
StandardError=journal
Restart=always
User=root
Group=root
Environment="JAVA_OPT=-server -Xms256m -Xmx256m -Duser.home=/data/rocketmq-console"
[Install]
WantedBy=multi-user.target
EOF
启用并启动 dashboard 服务
执行以下命令:
# 启动 dashboard 服务
sudo systemctl start rocketmq-dashboard
# 设置开机自启
sudo systemctl enable rocketmq-dashboard
# 检查服务状态
sudo systemctl status rocketmq-dashboard
# 重新服务(可选)
systemctl daemon-reload
systemctl restart rocketmq-dashboard
日志检查与验证
Dashboard默认的日志目录位于home目录下,可通过以下命令检查:
tail -f /data/rocketmq-console/logs/dashboardlogs/rocketmq-dashboard.log
确保 Dashboard服务正常监听端口:
root@dashboard01:~# ss -tlnp | grep 8080
提示:Started App in x.xxx seconds (JVM running for x.xxx) 启动成功
浏览器页面访问:http://dashboard-addr:8080
部署 LB 组件
在所有lb节点执行。
安装haproxy和keepalived
apt install -y haproxy keepalived
修改haproxy配置文件,所有lb节点配置相同。
cat >/etc/haproxy/haproxy.cfg<<EOF
global
log /dev/log local0
log /dev/log local1 notice
maxconn 200
uid 99
gid 99
daemon
defaults
log global
option httplog
timeout connect 5000ms
timeout client 50000ms
timeout server 50000ms
frontend proxy_front_remoting
bind *:8080
mode tcp
default_backend proxy_back_remoting
backend proxy_back_remoting
mode tcp
balance roundrobin
option http-server-close
option forwardfor
server proxy01 192.168.72.31:8080 check
server proxy02 192.168.72.32:8080 check
frontend proxy_front_grpc
bind *:8081
mode tcp
default_backend proxy_back_grpc
backend proxy_back_grpc
mode tcp
balance roundrobin
option http-server-close
option forwardfor
server proxy01 192.168.72.31:8081 check
server proxy02 192.168.72.32:8081 check
frontend dashboard_front
bind *:8088
mode http
default_backend dashboard_back
backend dashboard_back
mode http
balance roundrobin
cookie SERVERID insert indirect nocache
option http-server-close
option forwardfor
server dashboard1 192.168.72.55:8080 check cookie dashboard1
server dashboard2 192.168.72.56:8080 check cookie dashboard2
EOF
修改keepalived配置文件,所有lb节点配置相同。
cat >/etc/keepalived/keepalived.conf<<EOF
global_defs {
router_id 51
vrrp_version 2
vrrp_garp_master_delay 1
vrrp_garp_master_refresh 1
script_user root
enable_script_security
}
vrrp_script check_haproxy {
script "/usr/bin/killall -0 haproxy"
timeout 3
interval 5 # check every 5 second
fall 3 # require 3 failures for KO
rise 2 # require 2 successes for OK
}
# VRRP Instance for Proxy VIP
vrrp_instance VI_Proxy {
state BACKUP
interface ens33
virtual_router_id 51
priority 101
advert_int 1
authentication {
auth_type PASS
auth_pass 1234
}
virtual_ipaddress {
192.168.72.250 dev ens33 # Proxy VIP
}
nopreempt
track_script {
check_haproxy
}
}
# VRRP Instance for Dashboard VIP
vrrp_instance VI_Dashboard {
state BACKUP
interface ens33
virtual_router_id 52
priority 101
advert_int 1
authentication {
auth_type PASS
auth_pass 1234
}
virtual_ipaddress {
192.168.72.251 dev ens33 # Dashboard VIP
}
nopreempt
track_script {
check_haproxy
}
}
EOF
参数说明:
interface ens33
:指定节点网卡名称virtual_ipaddress
:指定VIP地址
启动服务
systemctl enable --now haproxy.service keepalived.service
测试故障转移
在副本组1中,IP为192.168.72.51的节点,即broker-1-1
为master节点,示例如下:
将该节点直接关机,模拟broker master节点故障
root@broker-1-1:~# shutdown -h now
重新查看集群状态,节点IP为192.168.72.52的节点自动提升为master节点,继续为客户端提供消息写入及消费能力。
重新恢复IP为192.168.72.51的节点,即broker-1-1
,该节点将成为slave节点。
使用工具生产与消费消息
创建topic
mqadmin updateTopic -n 192.168.72.33:9876 -t TopicTest -c DefaultCluster \
-a +message.type=NORMAL
示例输出
root@nameserver01:~# mqadmin updateTopic -n 192.168.72.33:9876 -t TopicTest -c DefaultCluster \
-a +message.type=NORMAL
create topic to 192.168.72.51:10911 success.
create topic to 192.168.72.53:10911 success.
TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+message.type=NORMAL}]
root@nameserver01:~#
发送消息,连接到proxy VIP地址
export NAMESRV_ADDR=192.168.72.250:8080
tools.sh org.apache.rocketmq.example.quickstart.Producer
示例输出
SendResult [sendStatus=SEND_OK, msgId=C0A84821BD1D5505405770783E0D03E4, offsetMsgId=C0A8483300002A9F000000000009616C, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-1, queueId=2], q
ueueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=C0A84821BD1D5505405770783E0F03E5, offsetMsgId=C0A8483300002A9F000000000009625E, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-1, queueId=3], q
ueueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=C0A84821BD1D5505405770783E1003E6, offsetMsgId=C0A8483300002A9F0000000000096350, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-1, queueId=4], q
ueueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=C0A84821BD1D5505405770783E1203E7, offsetMsgId=C0A8483300002A9F0000000000096442, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-1, queueId=5], q
ueueOffset=125]
消费消息,连接到proxy VIP地址
export NAMESRV_ADDR=192.168.72.250:8080
tools.sh org.apache.rocketmq.example.quickstart.Consumer
示例输出
ConsumeMessageThread_please_rename_unique_group_name_4_5 Receive New Messages: [MessageExt [brokerName=broker-2, queueId=4, storeSize=242, queueOffset=100, sysFlag=0, bornTimestamp=1734869327582, bornHost=/192.168.72.31:57150, storeTimestamp=1734869327582, storeHost=/192.168.72.53:10911, msgId=C0A8483500002A9F000000000008A628, commitLogOffset=566824, bodyCRC=1500772453, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={CONSUME_START_TIME=1734869422248, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A84821BD1D5505405770783ADE026E, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=124}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 54, 50, 50], transactionId='null'}]]
ConsumeMessageThread_please_rename_unique_group_name_4_14 Receive New Messages: [MessageExt [brokerName=broker-2, queueId=4, storeSize=242, queueOffset=98, sysFlag=0, bornTimestamp=1734869327512, bornHost=/192.168.72.31:57162, storeTimestamp=1734869327512, storeHost=/192.168.72.53:10911, msgId=C0A8483500002A9F0000000000089708, commitLogOffset=562952, bodyCRC=1456471771, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={CONSUME_START_TIME=1734869422248, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A84821BD1D5505405770783A98024E, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=124}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53, 57, 48], transactionId='null'}]]
ConsumeMessageThread_please_rename_unique_group_name_4_4 Receive New Messages: [MessageExt [brokerName=broker-1, queueId=2, storeSize=242, queueOffset=120, sysFlag=0, bornTimestamp=1734869328255, bornHost=/192.168.72.31:52084, storeTimestamp=1734869328255, storeHost=/192.168.72.51:10911, msgId=C0A8483300002A9F0000000000093B9C, commitLogOffset=605084, bodyCRC=2121214082, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={CONSUME_START_TIME=1734869422240, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A84821BD1D5505405770783D7F0394, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=126}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 49, 54], transactionId='null'}]]
ConsumeMessageThread_please_rename_unique_group_name_4_3 Receive New Messages: [MessageExt [brokerName=broker-1, queueId=2, storeSize=242, queueOffset=119, sysFlag=0, bornTimestamp=1734869328229, bornHost=/192.168.72.31:52064, storeTimestamp=1734869328229, storeHost=/192.168.72.51:10911, msgId=C0A8483300002A9F000000000009340C, commitLogOffset=603148, bodyCRC=236436726, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={CONSUME_START_TIME=1734869422239, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A84821BD1D5505405770783D650384, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=126}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 48, 48], transactionId='null'}]]
ConsumeMessageThread_please_rename_unique_group_name_4_17 Receive New Messages: [MessageExt [brokerName=broker-2, queueId=5, storeSize=242, queueOffset=97, sysFlag=0, bornTimestamp=1734869327482, bornHost=/192.168.72.31:57146, storeTimestamp=1734869327482, storeHost=/192.168.72.53:10911, msgId=C0A8483500002A9F000000000008906A, commitLogOffset=561258, bodyCRC=942024666, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={CONSUME_START_TIME=1734869422221, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A84821BD1D5505405770783A7A023F, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=124}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53, 55, 53], transactionId='null'}]]
查看dashboard,连接到dashboard VIP
查看topic状态
使用SDK生产与消费消息
使用Idea IDE新建JAVA项目,项目结构如下:
pom.xml
代码示例
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>rocketmq-demo01</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>23</maven.compiler.source>
<maven.compiler.target>23</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.7</version>
</dependency>
<!-- SLF4J API -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.16</version> <!-- 使用当前版本 -->
</dependency>
<!-- Logback Classic (SLF4J's default implementation) -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.5.15</version> <!-- 使用当前版本 -->
</dependency>
<!-- Logback Core -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.5.13</version> <!-- 使用当前版本 -->
</dependency>
</dependencies>
</project>
ProducerExample.java
代码示例
package org.example;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerExample {
private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);
public static void main(String[] args) throws ClientException {
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。
String endpoint = "192.168.72.250:8081";
// 消息发送的目标Topic名称,需要提前创建。
String topic = "TopicTest";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
ClientConfiguration configuration = builder.build();
// 初始化Producer时需要设置通信配置以及预绑定的Topic。
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
// 普通消息发送。
Message message = provider.newMessageBuilder()
.setTopic(topic)
// 设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
// 设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
// 消息体。
.setBody("messageBody".getBytes())
.build();
try {
// 发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (ClientException e) {
logger.error("Failed to send message", e);
}
// producer.close();
}
}
配置参数说明:
String endpoint
指定proxy VIP地址及端口String topic
指定topic,需提前创建
PushConsumerExample.java
代码示例
package org.example;
import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PushConsumerExample {
private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
String endpoints = "192.168.72.250:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
// 订阅消息的过滤规则,表示订阅所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// 为消费者指定所属的消费者分组,Group需要提前创建。
String consumerGroup = "YourConsumerGroup";
// 指定需要订阅哪个目标Topic,Topic需要提前创建。
String topic = "TopicTest";
// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 设置消费者分组。
.setConsumerGroup(consumerGroup)
// 设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 设置消费监听器。
.setMessageListener(messageView -> {
// 处理消息并返回消费结果。
logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// 如果不需要再使用 PushConsumer,可关闭该实例。
// pushConsumer.close();
}
}
配置参数说明:
String endpoint
指定proxy VIP地址及端口String topic
指定topic,需提前创建
测试生产消息
测试消费消息