基于Controller模式部署RocketMQ集群

news2024/12/23 16:13:51

RocketMQ简介

RocketMQ是一种分布式消息中间件,它由阿里巴巴集团开发,并且后来捐献给了Apache软件基金会。RocketMQ最初是为了解决阿里巴巴内部因业务增长带来的高吞吐量需求而设计的。随着其不断发展和完善,RocketMQ已经成为了一个能够处理从传统发布/订阅模式到大规模实时无差错交易系统的强大消息引擎。
在这里插入图片描述

RocketMQ的主要特点

  • 架构简洁:相比其他消息队列系统,RocketMQ具有更简单的架构设计。
  • 丰富的业务功能:支持多种消息类型和应用场景,比如实时消息处理、顺序消息处理以及事务消息处理。
  • 极高的可扩展性:可以轻松地进行水平扩展以满足不同规模的应用需求。
  • 金融级可靠性:经过长时间在实际生产环境中的验证,被广泛认可为适用于对数据准确性和一致性要求极高的金融场景下的消息- 传递解决方案。

与其它消息队列系统的比较

  • Kafka 更适合于大量数据处理(如离线流处理);
  • RabbitMQ 则擅长处理复杂的消息路由及支持多种协议;
  • ActiveMQ 提供了广泛的协议支持但性能相对较低。

官方网站:http://rocketmq.apache.org

项目地址:https://github.com/apache/rocketmq

中文社区:https://rocketmq.io/

集群模式介绍

RocketMQ 技术架构中包含的角色有NameServer、Broker、Proxy、Controller、dashboard、Producer、Consumer等。
在这里插入图片描述

  • NameServer 是 RocketMQ 的路由服务集群,它不保存任何消息数据,而是维护着所有 Broker 的路由信息(包括 Broker 存活情况、各 topic 的路由信息等)
  • Broker 是 RocketMQ 的核心服务节点,负责接收来自 Producer(生产者)的消息、存储消息以及将消息转发给 Consumer(消费者)。在 RocketMQ 架构中,Broker 分为 Master Broker 和 Slave Broker(或称为 Replica)两种角色
  • Proxy 是 RocketMQ 为了提高性能和简化客户端接入而引入的一个可选组件。它作为一个轻量级的代理服务器,位于客户端与 Broker 之间
  • Controller是 RocketMQ 5.0 新增的组件,它充当控制平面的角色,负责管理和协调系统的整体状态。它主要出现在可切换架构的部署过程中,如 Broker 宕机时,它将进行调度,对 Broker 集群进行选举管理。Controller 本身也支持集群部署,基于 Raft 实现容灾。
  • Dashboard 是 RocketMQ 提供的一款可视化管理界面,它允许用户通过网页浏览器直观地监控和管理 RocketMQ 集群的状态,包括但不限于查看消息队列、消费进度、Broker 健康状况、主题配置、消费组详情等信息。
  • Producer 与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
  • Consumer 与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。

RocketMQ集群迭代历程:

  • 在RocketMQ 4.5版本之前,使用简单的Master-slave主从模式,如果Master宕机,不支持自动将Slave切换为Master,需要人工介入
  • 在RocketMQ 4.5版本之后,引入了DLedger来解决Master/Slave的自动主从切换问题,该机制基于Raft协议实现,但存在局限性
  • 在RocketMQ 5.0版本之后,引入controller组件,基于Dledger Controller实现主从切换

Proxy 组件部署有两种方式:

  • Local 模式:将 Proxy 与 Broker 一起部署在同一个进程中,这能够节约一些机器资源。
  • Cluster 模式:将 Proxy 作为一个独立集群进行部署。这能够提供较强的横向扩展能力。

Controller 组件部署有两种方式:

  • 嵌入于 NameServer 进行部署,可以通过配置 enableControllerInNamesrv 打开。
  • 另一种是独立部署,需要单独部署 Controller 组件。

数据存储分为同步刷盘和异步刷盘:

  • 同步刷盘:当消息到达 Broker 后,只有把消息写入到 CommitLog 日志文件中,才给生产者返回发送成功的响应。
  • 异步刷盘:当消息到达 Broker 后,就给生产者返回数据发送成功了,并启动一个异步线程去把消息写入到 CommitLog 中。

主从同步分为同步复制和异步复制:

  • 每个Master配置一个Slave,有多组 Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),优点是性能同多Master模式几乎一样,缺点是Master宕机磁盘损坏情况下会丢失少量消息。

  • 每个Master配置一个Slave,有多组 Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功。优点是数据与服务都无单点故障,缺点是性能比异步复制模式略低

集群部署规划

本示例基于RocketMQ 5.0 controller模式,采用异步刷盘、同步复制方式,所有组件分离部署:

  • proxy为无状态组件,可以部署1至多个节点,本示例为2节点冗余部署
  • nameserver为无状态组件,可以部署1至多个节点,本示例为2节点冗余部署
  • controller基于raft协议,需要部署至少3个节点,本示例为3节点冗余部署
  • broker部署2个副本组,每个副本组包含1个主节点和1个从节点,主从选举及故障切换由controller组件自动管理
  • dashboard为无状态组件,可以部署1至多个节点,本示例为2节点冗余部署
  • LB节点通过haproxy和keepalived实现,作为proxy和dashboard的统一负载均衡入口,本示例为2节点冗余部署

节点规划清单如下:

主机名IP地址角色操作系统CPU/内存/磁盘
LB01192.168.72.18负载均衡器Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
LB02192.168.72.19负载均衡器Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
proxy01192.168.72.31Proxy节点Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
proxy02192.168.72.32Proxy节点Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
nameserver01192.168.72.33NameServer节点Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
nameserver02192.168.72.34NameServer节点Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
broker-1-1192.168.72.51Broker副本组1-1Ubuntu 22.04 LTS2C /8GB/100GB DATA DISK
broker-1-2192.168.72.52Broker副本组1-2Ubuntu 22.04 LTS2C /8GB/100GB DATA DISK
broker-2-1192.168.72.53Broker副本组2-1Ubuntu 22.04 LTS2C /8GB/100GB DATA DISK
broker-2-2192.168.72.54Broker副本组2-2Ubuntu 22.04 LTS2C /8GB/100GB DATA DISK
controller01192.168.72.40Controller节点Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
controller02192.168.72.41Controller节点Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
controller03192.168.72.42Controller节点Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
dashboard01192.168.72.55Dashboard管理节点Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
dashboard02192.168.72.56Dashboard管理节点Ubuntu 22.04 LTS2C /4GB/100GB DATA DISK
proxy VIP192.168.72.250proxy VIPN/AN/A
dashboard VIP192.168.72.251dashboard VIPN/AN/A

节点配置说明:

  • 可视情况合并部分组件复用一个节点部署,nameserver、broker为必选组件
  • broker 至少8G内存,生产环境建议提高配置,例如8C 16G 500G SSD,数据盘可单独挂载至/data目录
  • 提供两个分离的VIP地址,一个作为rocketmq proxy统一入口,一个作为rocketmq dashboard统一入口

集群架构如下图所示:
在这里插入图片描述
部署目录规划

  • 安装目录:统一安装包及配置文件存放路径,如 /opt/rocketmq/
  • 数据目录:运行过程中生成的日志和数据文件与安装包分开,存放在 /data/目录下。
  • 服务管理:通过 systemd 管理服务,保证开机自启和管理规范化。

基础环境配置

配置主机名,在所有节点执行,每个节点单独配置。

hostnamectl set-hostname nameserver01
hostnamectl set-hostname nameserver02
hostnamectl set-hostname controller01
hostnamectl set-hostname controller02
hostnamectl set-hostname controller03
hostnamectl set-hostname broker-1-1
hostnamectl set-hostname broker-1-2
hostnamectl set-hostname broker-2-1
hostnamectl set-hostname broker-2-2
hostnamectl set-hostname dashboard01
hostnamectl set-hostname dashboard02
hostnamectl set-hostname proxy01
hostnamectl set-hostname proxy02
hostnamectl set-hostname lb01
hostnamectl set-hostname lb02

配置所有节点时间同步,默认与互联网NTP同步时间

apt install -y chrony
systemctl enable --now chrony

基础软件安装,在所有非LB节点执行。

apt update -y
apt install -y unzip
apt install -y openjdk-11-jdk

依赖说明:

  • RocketMQ依赖JAVA环境,因此需要安装openjdk,注意版本兼容性
  • 官方二进制包解压需要使用unzip工具

操作系统初始化,在所有非LB节点执行,配置RocketMQ相关系统参数。

wget https://raw.githubusercontent.com/apache/rocketmq/refs/heads/develop/distribution/bin/os.sh
bash os.sh

部署 RocketMQ 软件包

以下操作在所有非LB节点执行。

下载二进制文件,以使用RocketMQ 5.3.1版本二进制包安装为例。

wget https://dist.apache.org/repos/dist/release/rocketmq/5.3.1/rocketmq-all-5.3.1-bin-release.zip

rocketmq-all-5.3.1-bin-release.zip 上传到目标服务器。执行以下命令解压到/opt目录下:

# 解压安装包到指定目录
sudo unzip rocketmq-all-5.3.1-bin-release.zip -d /opt

# 创建软链接,便于后续升级管理
sudo ln -s /opt/rocketmq-all-5.3.1-bin-release /opt/rocketmq

配置环境变量,方便调用相关bin二进制命令

cat >/etc/profile.d/rocketmq.sh<<'EOF'
export ROCKETMQ_HOME=/opt/rocketmq
export PATH=$ROCKETMQ_HOME/bin:$PATH
EOF

source /etc/profile

部署 NameServer 组件

在所有nameserver节点操作。

创建 systemd 服务文件

/etc/systemd/system/ 目录下创建一个 rocketmq-namesrv.service 文件。写入以下内容:

cat >/etc/systemd/system/rocketmq-namesrv.service<<'EOF'
[Unit]
Description=RocketMQ NameServer
After=network.target

[Service]
Type=simple
User=root
ExecStart=/opt/rocketmq/bin/mqnamesrv
ExecStop=/bin/kill -SIGTERM $MAINPID
Restart=on-failure
RestartSec=5
LimitNOFILE=65536
Environment="ROCKETMQ_HOME=/opt/rocketmq"
Environment="JAVA_OPT=-Duser.home=/data/rocketmq-namesrv"

[Install]
WantedBy=multi-user.target
EOF

启用并启动 NameServer 服务

执行以下命令:

# 启动 NameServer 服务
sudo systemctl start rocketmq-namesrv

# 设置开机自启
sudo systemctl enable rocketmq-namesrv

# 检查服务状态
sudo systemctl status rocketmq-namesrv

# 重启服务(可选)
systemctl daemon-reload
systemctl restart rocketmq-namesrv

示例输出结果

root@nameserver01:~# systemctl status rocketmq-namesrv
● rocketmq-namesrv.service - RocketMQ NameServer
     Loaded: loaded (/etc/systemd/system/rocketmq-namesrv.service; enabled; vendor preset: enabled)
     Active: active (running) since Sun 2024-12-22 15:27:03 CST; 1min 36s ago
   Main PID: 112830 (mqnamesrv)
      Tasks: 74 (limit: 4557)
     Memory: 240.4M
        CPU: 5.388s
     CGroup: /system.slice/rocketmq-namesrv.service
             ├─112830 /bin/sh /opt/rocketmq/bin/mqnamesrv
             ├─112831 sh /opt/rocketmq/bin/runserver.sh -Drmq.logback.configurationFile=/opt/rocketmq/conf/rmq.namesrv.logback.xml org.apache.rocketmq.namesrv.NamesrvStartup
             └─112858 /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Duser.home=/data/rocketmq-namesrv -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m -XX:+UseG1GC -XX:G1Heap>

Dec 22 15:27:03 nameserver01 systemd[1]: Started RocketMQ NameServer.
Dec 22 15:27:05 nameserver01 mqnamesrv[112858]: The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876

日志检查与验证

NameServer 默认的日志目录位于home目录下,可通过以下命令检查:

tail -f /data/rocketmq-namesrv/logs/rocketmqlogs/namesrv.log

确保 NameServer 服务正常监听在 9876 端口

root@nameserver01:~# ss -tlnp | grep 9876
LISTEN 0      1024               *:9876            *:*    users:(("java",pid=7363,fd=152))   

部署 Controller 组件

以下操作在controller节点执行。

修改controller配置文件

创建controller01节点配置文件,在Controller01节点操作。

cat >/opt/rocketmq/conf/controller/controller.conf<<EOF
controllerDLegerGroup = group1
controllerDLegerPeers = n0-192.168.72.40:9878;n1-192.168.72.41:9878;n2-192.168.72.42:9878
controllerDLegerSelfId = n0
controllerStorePath = /data/admin/DledgerController
EOF

创建controller02节点配置文件,在Controller02节点操作。

cat >/opt/rocketmq/conf/controller/controller.conf<<EOF
controllerDLegerGroup = group1
controllerDLegerPeers = n0-192.168.72.40:9878;n1-192.168.72.41:9878;n2-192.168.72.42:9878
controllerDLegerSelfId = n1
controllerStorePath = /data/admin/DledgerController
EOF

创建controller03节点配置文件,在Controller03节点操作。

cat >/opt/rocketmq/conf/controller/controller.conf<<EOF
controllerDLegerGroup = group1
controllerDLegerPeers = n0-192.168.72.40:9878;n1-192.168.72.41:9878;n2-192.168.72.42:9878
controllerDLegerSelfId = n2
controllerStorePath = /data/admin/DledgerController
EOF

创建 systemd 服务文件

在所有Controller节点操作。

/etc/systemd/system/ 目录下创建一个 rocketmq-controller.service 文件。写入以下内容:

cat >/etc/systemd/system/rocketmq-controller.service<<'EOF'
[Unit]
Description=RocketMQ Controller
After=network.target

[Service]
Type=simple
User=root
ExecStart=/opt/rocketmq/bin/mqcontroller -c /opt/rocketmq/conf/controller/controller.conf
ExecStop=/bin/kill -SIGTERM $MAINPID
Restart=on-failure
RestartSec=5
Environment="ROCKETMQ_HOME=/opt/rocketmq"
Environment="JAVA_OPT=-Duser.home=/data/admin"

[Install]
WantedBy=multi-user.target
EOF

启用并启动 controller 服务

在所有Controller节点操作,执行以下命令:

# 启动 Controller 服务
sudo systemctl start rocketmq-controller

# 设置开机自启
sudo systemctl enable rocketmq-controller

# 检查服务状态
sudo systemctl status rocketmq-controller

# 重启服务(可选)
systemctl daemon-reload
systemctl restart rocketmq-controller

示例输出如下

root@controller01:~# systemctl status rocketmq-controller.service 
● rocketmq-controller.service - RocketMQ Controller
     Loaded: loaded (/etc/systemd/system/rocketmq-controller.service; enabled; vendor preset: enabled)
     Active: active (running) since Sun 2024-12-22 15:31:23 CST; 2min 19s ago
   Main PID: 124707 (mqcontroller)
      Tasks: 89 (limit: 4556)
     Memory: 314.1M
        CPU: 26.095s
     CGroup: /system.slice/rocketmq-controller.service
             ├─124707 /bin/sh /opt/rocketmq/bin/mqcontroller -c /opt/rocketmq/conf/controller/controller.conf
             ├─124708 sh /opt/rocketmq/bin/runserver.sh -Drmq.logback.configurationFile=/opt/rocketmq/conf/rmq.controller.logback.xml org.apache.rocketmq.controller.ControllerStartup -c /opt/rock>
             └─124735 /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Duser.home=/data/admin -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m -XX:+UseG1GC -XX:G1HeapRegionSize=>

Dec 22 15:31:23 controller01 systemd[1]: Started RocketMQ Controller.
Dec 22 15:31:25 controller01 mqcontroller[124735]: load config properties file OK, /opt/rocketmq/conf/controller/controller.conf
Dec 22 15:31:26 controller01 mqcontroller[124735]: WARNING: An illegal reflective access operation has occurred
Dec 22 15:31:26 controller01 mqcontroller[124735]: WARNING: Illegal reflective access by com.caucho.hessian.io.JavaDeserializer (file:/opt/rocketmq-all-5.3.1-bin-release/lib/hessian-3.3.6.jar) to>
Dec 22 15:31:26 controller01 mqcontroller[124735]: WARNING: Please consider reporting this to the maintainers of com.caucho.hessian.io.JavaDeserializer
Dec 22 15:31:26 controller01 mqcontroller[124735]: WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
Dec 22 15:31:26 controller01 mqcontroller[124735]: WARNING: All illegal access operations will be denied in a future release
Dec 22 15:31:27 controller01 mqcontroller[124735]: The Controller Server boot success. serializeType=JSON

日志检查与验证

Controller 默认的日志目录位于home目录下,可通过以下命令检查:

tail -f /data/admin/logs/rocketmqlogs/controller.log

确保 Controller 服务正常监听在 9878 端口:

root@controller01:~# ss -tlnp | grep 9878
LISTEN 0      1024               *:9878            *:*    users:(("java",pid=121984,fd=157))   

查看controller状态

root@controller01:~# mqadmin getControllerMetaData -a 192.168.72.40:9878

#ControllerGroup        group1
#ControllerLeaderId     n0
#ControllerLeaderAddress        192.168.72.40:9878
#Peer:  n0:192.168.72.40:9878
#Peer:  n1:192.168.72.41:9878
#Peer:  n2:192.168.72.42:9878

参数-a 代表的是任意一个 Controller 的地址

部署 Broker 组件

以下操作在broker节点执行。

修改broker配置文件

备份默认配置文件,在所有broker节点操作。

cp /opt/rocketmq/conf/broker.conf{,.bak}

修改两个broker-1-*节点配置文件,副本组1,两个节点配置相同

cat >/opt/rocketmq/conf/broker.conf<<EOF
brokerClusterName=DefaultCluster
brokerName=broker-1
listenPort=10911
storePathCommitLog=/data/rocketmq-broker/store/commitlog/
storePathConsumerQueue=/data/rocketmq-broker/store/consumequeue/
allAckInSyncStateSet=true
#brokerId=-1
deleteWhen=04
fileReservedTime=48
#brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
enableControllerMode = true
controllerAddr = 192.168.72.40:9878;192.168.72.41:9878;192.168.72.42:9878
namesrvAddr = 192.168.72.33:9876;192.168.72.34:9876
EOF

参数说明:

  • brokerClusterName:设定 Broker 所属的集群名称。默认是 DefaultCluster
  • brokerName:指定 Broker 名称,用于区分集群中的不同 Broker,同一个副本组内节点参数相同,不同副本组不同。
  • listenPort:设置 Broker 的监听端口,默认为 10911
  • storePathCommitLog:指定 commitlog 文件的存储路径,RocketMQ 存储消息的文件。
  • storePathConsumerQueue:指定消费者队列文件的存储路径。
  • allAckInSyncStateSet=true:若该值为 true,则一条消息需要复制到 SyncStateSet 中的每一个副本才会向客户端返回成功,可以保证消息不丢失。默认为 false。
  • deleteWhen:该参数指定 commitlog 文件的删除策略,04 表示每天的 4 点删除过期的 commitlog 文件。
  • fileReservedTime:配置 RocketMQ 存储的日志文件的保留时间,单位为小时。48 小时后未被消费的消息文件会被删除。
  • flushDiskType:配置日志刷盘策略。ASYNC_FLUSH 表示异步刷盘,通常性能较好,但可能会丢失部分数据。
  • enableControllerMode:启用控制器模式,用于 RocketMQ 集群管理和控制,通常在 RocketMQ 4.x 版本中开启。
  • controllerAddr:设置控制器节点的地址,用于集群的管理。
  • namesrvAddr:设置 NameServer 的地址,RocketMQ 的 NameServer 用于提供消息路由信息。

修改两个broker-2-*节点配置文件,副本组2,两个节点配置相同,注意与副本组1的brokerName参数不同

cat >/opt/rocketmq/conf/broker.conf<<EOF
brokerClusterName=DefaultCluster
brokerName=broker-2
listenPort=10911
storePathCommitLog=/data/rocketmq-broker/store/commitlog/
storePathConsumerQueue=/data/rocketmq-broker/store/consumequeue/
allAckInSyncStateSet=true
#brokerId=-1
deleteWhen=04
fileReservedTime=48
#brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
enableControllerMode = true
controllerAddr = 192.168.72.40:9878;192.168.72.41:9878;192.168.72.42:9878
namesrvAddr = 192.168.72.33:9876;192.168.72.34:9876
EOF

创建 systemd 服务文件

在所有broker节点操作。

/etc/systemd/system/ 目录下创建一个 rocketmq-broker.service 文件。写入以下内容:

cat >/etc/systemd/system/rocketmq-broker.service<<'EOF'
[Unit]
Description=RocketMQ Broker
After=network.target

[Service]
Type=simple
User=root
ExecStart=/opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf
ExecStop=/bin/kill -SIGTERM $MAINPID
Restart=on-failure
RestartSec=5
LimitNOFILE=65536
Environment="ROCKETMQ_HOME=/opt/rocketmq"
Environment="JAVA_OPT=-Duser.home=/data/rocketmq-broker"

[Install]
WantedBy=multi-user.target
EOF

启用并启动 Broker 服务

执行以下命令:

# 启动 Broker 服务
sudo systemctl start rocketmq-broker

# 设置开机自启
sudo systemctl enable rocketmq-broker

# 检查服务状态
sudo systemctl status rocketmq-broker

# 重启服务(可选)
systemctl daemon-reload
systemctl restart rocketmq-broker

示例输出如下:

root@broker-1-1:~# systemctl status rocketmq-broker
● rocketmq-broker.service - RocketMQ Broker
     Loaded: loaded (/etc/systemd/system/rocketmq-broker.service; enabled; vendor preset: enabled)
     Active: active (running) since Sun 2024-12-22 15:55:35 CST; 4min 36s ago
   Main PID: 119620 (mqbroker)
      Tasks: 187 (limit: 4556)
     Memory: 7.3G
        CPU: 1min 21.725s
     CGroup: /system.slice/rocketmq-broker.service
             ├─119620 /bin/sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf
             ├─119621 sh /opt/rocketmq/bin/runbroker.sh -Drmq.logback.configurationFile=/opt/rocketmq/conf/rmq.broker.logback.xml org.apache.rocketmq.broker.BrokerStartup -c /opt/rocketmq/conf/br>
             └─119650 /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Duser.home=/data/rocketmq-broker -server -Xms8g -Xmx8g -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:Initiat>

Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.classic.model.processor.LoggerModelHandler - Setting additivity of logger [io.opente>
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.core.model.processor.AppenderRefModelHandler - Attaching appender named [RocketmqBro>
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.classic.model.processor.LoggerModelHandler - Setting level of logger [RocketmqAuthAu>
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.classic.model.processor.LoggerModelHandler - Setting additivity of logger [RocketmqA>
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.core.model.processor.AppenderRefModelHandler - Attaching appender named [RocketmqAut>
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.classic.model.processor.RootLoggerModelHandler - Setting level of ROOT logger to INFO
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,708 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.core.model.processor.AppenderRefModelHandler - Attaching appender named [DefaultSift>
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,709 |-INFO in org.apache.rocketmq.logging.ch.qos.logback.core.model.processor.DefaultProcessor@6b4a4e18 - End of configuration.
Dec 22 15:55:44 broker-1-1 mqbroker[119650]: 15:55:44,713 |-INFO in org.apache.rocketmq.common.logging.JoranConfiguratorExt@27c86f2d - Registering current configuration as safe fallback point
Dec 22 15:55:51 broker-1-1 mqbroker[119650]: The broker[broker-1, 192.168.72.51:10911] boot success. serializeType=JSON and name server is 192.168.72.33:9876;192.168.72.34:9876
lines 1-22/22 (END)

日志检查与验证

Broker默认的日志目录位于home目录下,可通过以下命令检查:

tail -f /data/rocketmq-broker/logs/rocketmqlogs/broker.log 

确保 Broker 服务正常监听端口:

root@broker01-master:~# ss -tlnp | grep 10
LISTEN 0      1024               *:10909            *:*    users:(("java",pid=10544,fd=176))        
LISTEN 0      1024               *:10911            *:*    users:(("java",pid=10544,fd=175))        
LISTEN 0      50                 *:10912            *:*    users:(("java",pid=10544,fd=169))  

端口说明:

  • 10911端口:这是RocketMQ Broker组件默认使用的监听端口(listenPort)。要修改这一设定值,应该直接在Broker的配置文件内调整listenPort参数。
  • 10912端口:被称为haListenPort,专门用于Master Broker与Slave Broker之间进行主备切换时的数据同步。如需改变此端口,同样需要在Broker的配置文件中寻找并更新haListenPort项。
  • 10909端口:一般情况下,它代表了Broker VIPChannel所用到的端口,默认计算方式为listenPort - 2(因此通常是10909)。VIPChannel是一种特别设计的网络通道,旨在优化高负载环境下的性能表现。如果想要变更这个特定端口,可以考虑调整listenPort或直接在配置中指定新的VIPChannel端口。

查看 SyncStateSet

可以通过运维工具查看 SyncStateSet:

mqadmin getSyncStateSet -a 192.168.72.40:9878 -b broker-1

参数-a 代表的是任意一个 Controller 的地址

如果顺利的话,可以看到以下内容:

root@broker-1-1:~# mqadmin getSyncStateSet -a 192.168.72.40:9878 -b broker-1

#brokerName     broker-1
#MasterBrokerId 1
#MasterAddr     192.168.72.51:10911
#MasterEpoch    6
#SyncStateSetEpoch      6
#SyncStateSetNums       2

InSyncReplica:  ReplicaIdentity{brokerName='broker-1', brokerId=1, brokerAddress='192.168.72.51:10911', alive=true}

InSyncReplica:  ReplicaIdentity{brokerName='broker-1', brokerId=2, brokerAddress='192.168.72.52:10911', alive=true}
root@broker-1-1:~# mqadmin getSyncStateSet -a 192.168.72.40:9878 -b broker-2

#brokerName     broker-2
#MasterBrokerId 1
#MasterAddr     192.168.72.53:10911
#MasterEpoch    3
#SyncStateSetEpoch      6
#SyncStateSetNums       2

InSyncReplica:  ReplicaIdentity{brokerName='broker-2', brokerId=1, brokerAddress='192.168.72.53:10911', alive=true}

InSyncReplica:  ReplicaIdentity{brokerName='broker-2', brokerId=2, brokerAddress='192.168.72.54:10911', alive=true}
root@broker-1-1:~# 

查看 BrokerEpoch

可以通过运维工具查看 BrokerEpochEntry:

mqadmin getBrokerEpoch -n 192.168.72.33:9876 -b broker-1

参数-n 代表的是任意一个 Namesrv 的地址

如果顺利的话,可以看到以下内容:

root@broker-1-1:~# mqadmin getBrokerEpoch -n 192.168.72.33:9876 -b broker-1

#clusterName    DefaultCluster
#brokerName     broker-1
#brokerAddr     192.168.72.51:10911
#brokerId       0
#Epoch: EpochEntry{epoch=1, startOffset=0, endOffset=0}

#clusterName    DefaultCluster
#brokerName     broker-1
#brokerAddr     192.168.72.52:10911
#brokerId       2
#Epoch: EpochEntry{epoch=1, startOffset=0, endOffset=0}
root@broker-1-1:~# 
root@broker-1-1:~# 
root@broker-1-1:~# mqadmin getBrokerEpoch -n 192.168.72.33:9876 -b broker-2

#clusterName    DefaultCluster
#brokerName     broker-2
#brokerAddr     192.168.72.54:10911
#brokerId       2
#Epoch: EpochEntry{epoch=1, startOffset=0, endOffset=217}
#Epoch: EpochEntry{epoch=2, startOffset=217, endOffset=217}

#clusterName    DefaultCluster
#brokerName     broker-2
#brokerAddr     192.168.72.53:10911
#brokerId       0
#Epoch: EpochEntry{epoch=1, startOffset=0, endOffset=217}
#Epoch: EpochEntry{epoch=2, startOffset=217, endOffset=217}
root@broker-1-1:~# 

列出集群信息

root@broker-1-1:~# mqadmin clusterlist -n 192.168.72.33:9876
#Cluster Name           #Broker Name            #BID  #Addr                  #Version              #InTPS(LOAD)     #OutTPS(LOAD)  #Timer(Progress)        #PCWait(ms)  #Hour         #SPACE    #ACTIVATED
DefaultCluster          broker-1                0     192.168.72.51:10911    V5_3_1                 0.00(0,0ms)       0.00(0,0ms)  0-0(0.0w, 0.0, 0.0)               0  481904.09     0.1100          true
DefaultCluster          broker-1                2     192.168.72.52:10911    V5_3_1                 0.00(0,0ms)       0.00(0,0ms)  2-0(0.0w, 0.0, 0.0)               0  481904.09     0.1000         false
DefaultCluster          broker-2                0     192.168.72.53:10911    V5_3_1                 0.00(0,0ms)       0.00(0,0ms)  0-0(0.0w, 0.0, 0.0)               0  0.68          0.1100          true
DefaultCluster          broker-2                2     192.168.72.54:10911    V5_3_1                 0.00(0,0ms)       0.00(0,0ms)  2-0(0.0w, 0.0, 0.0)               0  0.68          0.1100         false

共2个副本组,每个副本组1主1从两个broker节点。

部署 Proxy 组件

在所有proxy节点操作。

创建 systemd 服务文件

/etc/systemd/system/ 目录下创建一个 rocketmq-proxy.service 文件。写入以下内容:

cat >/etc/systemd/system/rocketmq-proxy.service<<'EOF'
[Unit]
Description=RocketMQ Proxy
After=network.target

[Service]
Type=simple
User=root
Environment="ROCKETMQ_HOME=/opt/rocketmq"
Environment="JAVA_OPT=-Duser.home=/data/rocketmq-proxy"
Environment="NAMESRV_ADDR=192.168.72.33:9876;192.168.72.34:9876"
ExecStart=/opt/rocketmq/bin/mqproxy -n ${NAMESRV_ADDR}
ExecStop=/bin/kill -SIGTERM $MAINPID
Restart=on-failure
RestartSec=5

[Install]
WantedBy=multi-user.target
EOF

启用并启动 proxy服务

执行以下命令:

# 启动 proxy 服务
sudo systemctl start rocketmq-proxy

# 设置开机自启
sudo systemctl enable rocketmq-proxy

# 检查服务状态
sudo systemctl status rocketmq-proxy

# 重新启动服务(可选)
systemctl daemon-reload
systemctl restart rocketmq-proxy

日志检查与验证

Proxy默认的日志目录位于home目录下,可通过以下命令检查:

tail -f /data/rocketmq-proxy/logs/rocketmqlogs/proxy.log 

确保 NameServer 服务正常监听端口:

root@proxy01:~# ss -tlnp | grep :80
LISTEN 0      1024               *:8080            *:*    users:(("java",pid=8906,fd=229))         
LISTEN 0      4096               *:8081            *:*    users:(("java",pid=8906,fd=228)) 

端口说明:

  • 8080端口:这是RocketMQ Proxy中默认的remoting协议访问端口(remotingListenPort)。此外,在RocketMQ Dashboard中也作为默认访问端口。如果需要更改这个端口号,对于Proxy来说,可以通过修改配置文件conf/rmq-proxy.json来实现;而对于Dashboard,则是通过调整application.yml文件中的相关设置完成,参考链接为这里。
  • 8081端口:在RocketMQ Proxy里,该端口被指定为gRPC协议访问端口(grpcServerPort)。同样地,若需自定义此端口号,应编辑rmq-proxy.json配置文件。

部署 Dashboard 组件

在所有dashboard节点操作。

下载并解压rocketmq-dashboard源码包

wget https://dist.apache.org/repos/dist/release/rocketmq/rocketmq-dashboard/2.0.0/rocketmq-dashboard-2.0.0-source-release.zip
sudo unzip rocketmq-dashboard-2.0.0-source-release.zip -d /opt
sudo ln -s /opt/rocketmq-dashboard-2.0.0-source-release /opt/rocketmq-dashboard

修改配置文件

root@dashboard01:~# cat /opt/rocketmq-dashboard/src/main/resources/application.yml 
rocketmq:
  config:
    namesrvAddrs:
      - 192.168.72.33:9876;192.168.72.34:9876
    dataPath: /data/rocketmq-console/data
    loginRequired: true
    proxyAddrs:
      - 192.168.72.31:8080
      - 192.168.72.32:8080

编译 rocketmq-dashboard

cd /opt/rocketmq-dashboard
apt install -y maven
mvn clean package -Dmaven.test.skip=true

创建 systemd 服务文件

/etc/systemd/system/ 目录下创建一个 rocketmq-dashboard.service 文件。写入以下内容:

cat >/etc/systemd/system/rocketmq-dashboard.service<<'EOF'
[Unit]
Description=RocketMQ Dashboard Service
After=network.target

[Service]
ExecStart=/usr/bin/java $JAVA_OPT -jar /opt/rocketmq-dashboard/target/rocketmq-dashboard-2.0.0.jar
WorkingDirectory=/opt/rocketmq-dashboard
StandardOutput=journal
StandardError=journal
Restart=always
User=root
Group=root
Environment="JAVA_OPT=-server -Xms256m -Xmx256m -Duser.home=/data/rocketmq-console"

[Install]
WantedBy=multi-user.target
EOF

启用并启动 dashboard 服务

执行以下命令:

# 启动 dashboard 服务
sudo systemctl start rocketmq-dashboard

# 设置开机自启
sudo systemctl enable rocketmq-dashboard

# 检查服务状态
sudo systemctl status rocketmq-dashboard

# 重新服务(可选)
systemctl daemon-reload
systemctl restart rocketmq-dashboard

日志检查与验证

Dashboard默认的日志目录位于home目录下,可通过以下命令检查:

tail -f /data/rocketmq-console/logs/dashboardlogs/rocketmq-dashboard.log

确保 Dashboard服务正常监听端口

root@dashboard01:~# ss -tlnp | grep 8080

提示:Started App in x.xxx seconds (JVM running for x.xxx) 启动成功

浏览器页面访问:http://dashboard-addr:8080
在这里插入图片描述

部署 LB 组件

在所有lb节点执行。

安装haproxy和keepalived

apt install -y haproxy keepalived

修改haproxy配置文件,所有lb节点配置相同。

cat >/etc/haproxy/haproxy.cfg<<EOF
global
    log /dev/log local0
    log /dev/log local1 notice
    maxconn 200
    uid 99
    gid 99
    daemon

defaults
    log     global
    option  httplog
    timeout connect 5000ms
    timeout client  50000ms
    timeout server  50000ms

frontend proxy_front_remoting
    bind *:8080
    mode tcp
    default_backend proxy_back_remoting

backend proxy_back_remoting
    mode tcp
    balance roundrobin
    option http-server-close
    option forwardfor
    server proxy01 192.168.72.31:8080 check
    server proxy02 192.168.72.32:8080 check


frontend proxy_front_grpc
    bind *:8081
    mode tcp
    default_backend proxy_back_grpc

backend proxy_back_grpc
    mode tcp
    balance roundrobin
    option http-server-close
    option forwardfor
    server proxy01 192.168.72.31:8081 check
    server proxy02 192.168.72.32:8081 check

frontend dashboard_front
    bind *:8088
    mode http
    default_backend dashboard_back

backend dashboard_back
    mode http
    balance roundrobin
    cookie SERVERID insert indirect nocache
    option http-server-close
    option forwardfor
    server dashboard1 192.168.72.55:8080 check cookie dashboard1
    server dashboard2 192.168.72.56:8080 check cookie dashboard2
EOF

修改keepalived配置文件,所有lb节点配置相同。

cat >/etc/keepalived/keepalived.conf<<EOF
global_defs {
    router_id 51
    vrrp_version 2
    vrrp_garp_master_delay 1
    vrrp_garp_master_refresh 1
    script_user root
    enable_script_security
}

vrrp_script check_haproxy {
    script "/usr/bin/killall -0 haproxy"
    timeout 3
    interval 5   # check every 5 second
    fall 3       # require 3 failures for KO
    rise 2       # require 2 successes for OK
}

# VRRP Instance for Proxy VIP
vrrp_instance VI_Proxy {
    state BACKUP
    interface ens33
    virtual_router_id 51
    priority 101
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass 1234
    }
    virtual_ipaddress {
        192.168.72.250 dev ens33 # Proxy VIP
    }
    nopreempt
    track_script {
        check_haproxy
    }
}

# VRRP Instance for Dashboard VIP
vrrp_instance VI_Dashboard {
    state BACKUP
    interface ens33
    virtual_router_id 52
    priority 101
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass 1234
    }
    virtual_ipaddress {
        192.168.72.251 dev ens33 # Dashboard VIP
    }
    nopreempt
    track_script {
        check_haproxy  
    }
}
EOF

参数说明:

  • interface ens33:指定节点网卡名称
  • virtual_ipaddress:指定VIP地址

启动服务

systemctl enable --now haproxy.service keepalived.service

测试故障转移

在副本组1中,IP为192.168.72.51的节点,即broker-1-1为master节点,示例如下:
在这里插入图片描述
将该节点直接关机,模拟broker master节点故障

root@broker-1-1:~# shutdown -h now

重新查看集群状态,节点IP为192.168.72.52的节点自动提升为master节点,继续为客户端提供消息写入及消费能力。
在这里插入图片描述
重新恢复IP为192.168.72.51的节点,即broker-1-1,该节点将成为slave节点。
在这里插入图片描述

使用工具生产与消费消息

创建topic

mqadmin updateTopic -n 192.168.72.33:9876 -t TopicTest -c DefaultCluster \
-a +message.type=NORMAL

示例输出

root@nameserver01:~# mqadmin updateTopic -n 192.168.72.33:9876 -t TopicTest -c DefaultCluster \
-a +message.type=NORMAL
create topic to 192.168.72.51:10911 success.
create topic to 192.168.72.53:10911 success.
TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+message.type=NORMAL}]
root@nameserver01:~# 

发送消息,连接到proxy VIP地址

export NAMESRV_ADDR=192.168.72.250:8080
tools.sh org.apache.rocketmq.example.quickstart.Producer

示例输出

SendResult [sendStatus=SEND_OK, msgId=C0A84821BD1D5505405770783E0D03E4, offsetMsgId=C0A8483300002A9F000000000009616C, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-1, queueId=2], q
ueueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=C0A84821BD1D5505405770783E0F03E5, offsetMsgId=C0A8483300002A9F000000000009625E, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-1, queueId=3], q
ueueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=C0A84821BD1D5505405770783E1003E6, offsetMsgId=C0A8483300002A9F0000000000096350, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-1, queueId=4], q
ueueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=C0A84821BD1D5505405770783E1203E7, offsetMsgId=C0A8483300002A9F0000000000096442, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-1, queueId=5], q
ueueOffset=125]

消费消息,连接到proxy VIP地址

export NAMESRV_ADDR=192.168.72.250:8080
tools.sh org.apache.rocketmq.example.quickstart.Consumer

示例输出

ConsumeMessageThread_please_rename_unique_group_name_4_5 Receive New Messages: [MessageExt [brokerName=broker-2, queueId=4, storeSize=242, queueOffset=100, sysFlag=0, bornTimestamp=1734869327582, bornHost=/192.168.72.31:57150, storeTimestamp=1734869327582, storeHost=/192.168.72.53:10911, msgId=C0A8483500002A9F000000000008A628, commitLogOffset=566824, bodyCRC=1500772453, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={CONSUME_START_TIME=1734869422248, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A84821BD1D5505405770783ADE026E, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=124}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 54, 50, 50], transactionId='null'}]] 
ConsumeMessageThread_please_rename_unique_group_name_4_14 Receive New Messages: [MessageExt [brokerName=broker-2, queueId=4, storeSize=242, queueOffset=98, sysFlag=0, bornTimestamp=1734869327512, bornHost=/192.168.72.31:57162, storeTimestamp=1734869327512, storeHost=/192.168.72.53:10911, msgId=C0A8483500002A9F0000000000089708, commitLogOffset=562952, bodyCRC=1456471771, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={CONSUME_START_TIME=1734869422248, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A84821BD1D5505405770783A98024E, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=124}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53, 57, 48], transactionId='null'}]] 
ConsumeMessageThread_please_rename_unique_group_name_4_4 Receive New Messages: [MessageExt [brokerName=broker-1, queueId=2, storeSize=242, queueOffset=120, sysFlag=0, bornTimestamp=1734869328255, bornHost=/192.168.72.31:52084, storeTimestamp=1734869328255, storeHost=/192.168.72.51:10911, msgId=C0A8483300002A9F0000000000093B9C, commitLogOffset=605084, bodyCRC=2121214082, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={CONSUME_START_TIME=1734869422240, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A84821BD1D5505405770783D7F0394, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=126}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 49, 54], transactionId='null'}]] 
ConsumeMessageThread_please_rename_unique_group_name_4_3 Receive New Messages: [MessageExt [brokerName=broker-1, queueId=2, storeSize=242, queueOffset=119, sysFlag=0, bornTimestamp=1734869328229, bornHost=/192.168.72.31:52064, storeTimestamp=1734869328229, storeHost=/192.168.72.51:10911, msgId=C0A8483300002A9F000000000009340C, commitLogOffset=603148, bodyCRC=236436726, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={CONSUME_START_TIME=1734869422239, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A84821BD1D5505405770783D650384, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=126}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 48, 48], transactionId='null'}]] 
ConsumeMessageThread_please_rename_unique_group_name_4_17 Receive New Messages: [MessageExt [brokerName=broker-2, queueId=5, storeSize=242, queueOffset=97, sysFlag=0, bornTimestamp=1734869327482, bornHost=/192.168.72.31:57146, storeTimestamp=1734869327482, storeHost=/192.168.72.53:10911, msgId=C0A8483500002A9F000000000008906A, commitLogOffset=561258, bodyCRC=942024666, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={CONSUME_START_TIME=1734869422221, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A84821BD1D5505405770783A7A023F, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=124}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53, 55, 53], transactionId='null'}]] 

查看dashboard,连接到dashboard VIP
在这里插入图片描述

查看topic状态
在这里插入图片描述

使用SDK生产与消费消息

使用Idea IDE新建JAVA项目,项目结构如下:
在这里插入图片描述
pom.xml代码示例

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>rocketmq-demo01</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>23</maven.compiler.source>
        <maven.compiler.target>23</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>5.0.7</version>
        </dependency>

        <!-- SLF4J API -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.16</version> <!-- 使用当前版本 -->
        </dependency>

        <!-- Logback Classic (SLF4J's default implementation) -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.5.15</version> <!-- 使用当前版本 -->
        </dependency>

        <!-- Logback Core -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.5.13</version> <!-- 使用当前版本 -->
        </dependency>

    </dependencies>
</project>

ProducerExample.java代码示例

package org.example;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerExample {
    private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);


    public static void main(String[] args) throws ClientException {
        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。
        String endpoint = "192.168.72.250:8081";
        // 消息发送的目标Topic名称,需要提前创建。
        String topic = "TopicTest";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();
        // 初始化Producer时需要设置通信配置以及预绑定的Topic。
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
        // 普通消息发送。
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // 设置消息索引键,可根据关键字精确查找某条消息。
                .setKeys("messageKey")
                // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("messageTag")
                // 消息体。
                .setBody("messageBody".getBytes())
                .build();
        try {
            // 发送消息,需要关注发送结果,并捕获失败等异常。
            SendReceipt sendReceipt = producer.send(message);
            logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
            logger.error("Failed to send message", e);
        }
        // producer.close();
    }
}

配置参数说明:

  • String endpoint 指定proxy VIP地址及端口
  • String topic 指定topic,需提前创建

PushConsumerExample.java代码示例

package org.example;

import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PushConsumerExample {
    private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
        String endpoints = "192.168.72.250:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        // 订阅消息的过滤规则,表示订阅所有Tag的消息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // 为消费者指定所属的消费者分组,Group需要提前创建。
        String consumerGroup = "YourConsumerGroup";
        // 指定需要订阅哪个目标Topic,Topic需要提前创建。
        String topic = "TopicTest";
        // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // 设置消费者分组。
                .setConsumerGroup(consumerGroup)
                // 设置预绑定的订阅关系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // 设置消费监听器。
                .setMessageListener(messageView -> {
                    // 处理消息并返回消费结果。
                    logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
        // 如果不需要再使用 PushConsumer,可关闭该实例。
        // pushConsumer.close();
    }
}

配置参数说明:

  • String endpoint 指定proxy VIP地址及端口
  • String topic 指定topic,需提前创建

测试生产消息
在这里插入图片描述
测试消费消息
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2264313.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

性能】JDK和Jmeter的安装与配置

一、JDK环境配置 1. 下载JDK 官网下载地址&#xff1a;http://www.oracle.com/technetwork/java/javase/downloads/java-archive-downloads-javase7-521261.html 选择对应系统的安装包&#xff0c;下载后安装&#xff0c;安装中记录JDK安装的地址&#xff0c;之后一直点击下一…

分布式协同 - 分布式事务_2PC 3PC解决方案

文章目录 导图Pre2PC&#xff08;Two-Phase Commit&#xff09;协议准备阶段提交阶段情况 1&#xff1a;只要有一个事务参与者反馈未就绪&#xff08;no ready&#xff09;&#xff0c;事务协调者就会回滚事务情况 2&#xff1a;当所有事务参与者均反馈就绪&#xff08;ready&a…

ubuntu 如何重装你的apt【apt-get报错: symbol lookup error/undefined symbol】

副标题:解决error:apt-get: symbol lookup error: /lib/x86_64-linux-gnu/libapt-private.so.0.0: undefined symbol: _ZNK13pkgTagSection7FindULLENS_3KeyERKy, version APTPKG_6.0 文章目录 问题描述报错分析解决方案:重装你的apt1、查看你的ubuntu版本2、下载适配你的ap…

RK3588 , mpp硬编码yuv, 保存MP4视频文件.

RK3588 , mpp硬编码yuv, 保存MP4视频文件. ⚡️ 传送 ➡️ Ubuntu x64 架构, 交叉编译aarch64 FFmpeg mppRK3588, FFmpeg 拉流 RTSP, mpp 硬解码转RGBRk3588 FFmpeg 拉流 RTSP, 硬解码转RGBRK3588 , mpp硬编码yuv, 保存MP4视频文件.

纯血鸿蒙APP实战开发——动态注册字体案例

介绍 本示例介绍利用上传下载模块和注册自定义字体模块实现从网络上下载字体并注册应用字体的功能&#xff0c;该场景多用于由特殊字体要求的场景。 效果图预览 使用说明 进入本案例页面后&#xff0c;可点击下方按钮切换字体。目前仅提供了思源宋体的注册&#xff0c;第一次…

Oracle中间件 SOA之 OSB 12C服务器环境搭建

环境信息 服务器基本信息 如下表&#xff0c;本次安装总共使用1台服务器&#xff0c;具体信息如下&#xff1a; App1服务器 归类 APP服务器 Ip Address 172.xx.30.xx HostName appdev01. xxxxx.com Alias appdev01 OSB1服务器 归类 OSB服务器 Ip Address 172.xx3…

数据结构---------二叉树前序遍历中序遍历后序遍历

以下是用C语言实现二叉树的前序遍历、中序遍历和后序遍历的代码示例&#xff0c;包括递归和非递归&#xff08;借助栈实现&#xff09;两种方式&#xff1a; 1. 二叉树节点结构体定义 #include <stdio.h> #include <stdlib.h>// 二叉树节点结构体 typedef struct…

前置知识补充—JavaScript

JavaScript 简介 JavaScript 是什么 JavaScript (简称 JS), 是⼀个脚本语⾔, 解释型或即时编译型的编程语⾔. 虽然它是作为开发Web⻚⾯的脚本语⾔⽽出名&#xff0c;但是它也被⽤到了很多⾮浏览器环境中 HTML&#xff1a; ⽹⻚的结构 CSS&#xff1a; …

Mac上详细配置java开发环境和软件(更新中)

文章目录 概要JDK的配置JDK下载安装配置JDK环境变量文件 Idea的安装Mysql安装和配置Navicat Premium16.1安装安装Vscode安装和配置Maven配置本地仓库配置阿里云私服Idea集成Maven Cpolar快速入门 概要 这里使用的是M3型片 14.6版本的Mac 用到的资源放在网盘 链接: https://pan…

第二十六周机器学习笔记:PINN求正反解求PDE文献阅读——正问题

第二十六周周报 摘要Abstract文献阅读《Physics-informed neural networks: A deep learning framework for solving forward and inverse problems involving nonlinear partial differential equations》1. 引言2. 问题的设置3.偏微分方程的数据驱动解3.1 连续时间模型3.1.1 …

米思奇图形化编程之ESP32控制LED灯闪烁方案实现

目录 一、项目概述 二、硬件准备 三、硬件连接 四、软件编程 五、验证效果 六、总结 一、项目概述 本项目使用米思奇图形化编程环境&#xff0c;编写micropython软件代码&#xff0c;实现了控制ESP32开发板上LED灯闪烁效果。该项目可为后续更复杂的物联网项目打下基础。…

完全离线使用,效率直接拉满

现在越来越多的人使用OCR软件来提高自己的工作效率&#xff0c;今天给大家推荐一款电脑端的文字识别工具&#xff0c;对比以往的软件来说&#xff0c;功能更加丰富全面。 Umi-OCR 美术、舞蹈、音乐 打开软件之后需要安装一下。 软件主要有截图OCR识别、批量OCR识别、批量文档识…

CSDN外链失效3:

参考我之前的博客&#xff1a; 外链失效博客1&#xff1a;随想笔记1&#xff1a;CSDN写博客经常崩溃&#xff0c;遇到外链图片转存失败怎么办_csdn外链图片转存失败-CSDN博客 外链失效博客2&#xff1a;网络随想2&#xff1a;转语雀_md格式转语雀lake格式-CSDN博客 markdown…

Java 中的字符串

目录 Java 中的字符串字符串的创建字符串的比较字符串的拼接如何定义一个空的字符串 Java 中的字符串 字符串的创建 在 Java 中&#xff0c;可以通过以下几种方式创建字符串&#xff1a; 1.使用字符串字面量&#xff1a; String str "Hello, World!";2.使用 new…

U盘结构损坏且无法访问:原因、恢复方案与预防措施

U盘结构损坏现象描述 U盘&#xff0c;这一小巧便捷的存储设备&#xff0c;在日常工作和学习中扮演着重要角色。然而&#xff0c;当U盘出现结构损坏且无法访问时&#xff0c;用户往往会陷入焦虑与困惑。具体表现为&#xff0c;将U盘插入电脑后&#xff0c;系统无法识别U盘&…

basic_ios及其衍生库(附 GCC libstdc++源代码)

basic_ios及其衍生库(附 GCC libstdc源代码) 我们由这张图展开我们的讨论 对于Date对象&#xff0c;只有实现了<<重载到输出流才可以插入到stringstream ss中 现在我有疑问stringstream是怎么做到既能输出又能输入的&#xff1f; 而且为什么stringstream对象能传给ostre…

【开源库 | minizip】Linux(Ubuntu18.04)下,minizip的编译、交叉编译

&#x1f601;博客主页&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f911;博客内容&#x1f911;&#xff1a;&#x1f36d;嵌入式开发、Linux、C语言、C、数据结构、音视频&#x1f36d; ⏰发布时间⏰&#xff1a; 2024-12-20 …

Gin-vue-admin(1):环境配置和安装

目录 环境配置如果443网络连接问题&#xff0c;需要添加代理服务器 后端运行前端运行 环境配置 git clone https://gitcode.com/gh_mirrors/gi/gin-vue-admin.git到server文件目录下 go mod tidygo mod tidy 是 Go 语言模块系统中的一个命令&#xff0c;用于维护 go.mod 文件…

java: 无效的目标发行版: xx

java: 无效的目标发行版: xx 背景java: 无效的目标发行版: xx 在 Intellij 的修复 背景 这里单独针对Intellij开发工具对 “java: 无效的目标发行版: xx”错误的修复。 java: 无效的目标发行版: xx 在 Intellij 的修复 同一台电脑使用多个JDK的时候容易出现在运行程序时容易…

vscode+编程AI配置、使用说明

文章目录 [toc]1、概述2、github copilot2.1 配置2.2 使用文档2.3 使用说明 3、文心快码&#xff08;Baidu Comate&#xff09;3.1 配置3.2 使用文档3.3 使用说明 4、豆包&#xff08;MarsCode&#xff09;4.1 配置4.2 使用文档4.3 使用说明 5、通义灵码&#xff08;TONGYI Lin…