RocketMQ 5.0
Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。
1、下载与安装RocketMQ 5.0
为了接近生产环境的开发,我们都是选择直接在Linux服务器上安装。
下载地址:https://rocketmq.apache.org/zh/download
注意:安装包分为二进制包和源码包,二进制包是已经编译好的可以直接运行,而源码包则需要编译后才能运行。
编译命令示例
$ unzip rocketmq-all-5.1.3-source-release.zip
$ cd rocketmq-all-5.1.3-source-release/
$ mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
$ cd distribution/target/rocketmq-5.1.3/rocketmq-5.1.3
我们直接下载最新的二进制包就好了。
安装步骤如下
1、启动NameServer注册中心(存储 Broker 元信息)
# 解压
$ unzip rocketmq-all-5.1.3-bin-release.zip
解压后我们需要改一下启动脚本(如果服务器资源足够多可以忽略这一步)。
runserver.sh
需要修改JVM内存的配置,此脚本默认从JVM申请的内存有4G(我们只是用来测试与学习服务器资源配置根本没有这么高),如下
# 以下为 runserver.sh 截取片段
# 无论走 if 还是 else -Xms和-Xmx的配置都是4g
# 所以我们要重新赋值这个 JAVA_OPT 变量
choose_gc_options()
{
# Example of JAVA_MAJOR_VERSION value : '1', '9', '10', '11', ...
# '1' means releases befor Java 9
JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | awk -F '"' '/version/ {print $2}' | awk -F '.' '{print $1}')
if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; then
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
else
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M"
fi
}
重新定制内存(重新赋值JAVA_OPT
变量)直接加在条件判断代码块后面即可。
# 重新定制内存
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
# JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
# JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M"
runbroker.sh
也是需要修改JVM内存的配置,如下代码默认分配的是8g。
# 修改前
# JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"
# 修改后
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"
修改完后,我们就可以启动 RocketMQ 的 NameServer 了
# 启动 namesrv
$ nohup sh bin/mqnamesrv &
# 验证 namesrv 是否启动成功
$ tail -f -n 500 nohup.out
...
The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
# 或者是
$ tail -f ~/logs/rocketmqlogs/namesrv.log
2023-07-18 23:17:49 INFO NSScanScheduledThread - start scanNotActiveBroker
...
2、启动 Broker 消息存储中心和 Proxy 代理
Proxy组件是 RocketMQ 5.0 版本官方推荐的部署组件,详细说明可查看官方文档
https://rocketmq.apache.org/zh/docs/deploymentOperations/01deploy/
NameServer 成功启动后,我们启动 Broker 和 Proxy 。
# 启动 Broker+Proxy
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
# 指定配置文件启动(broker默认使用的端口是10911,我们也可以在配置文件修改端口)
$ nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf --enable-proxy &
# 注意 --enable-proxy 开启代理后可能会报错
# java.io.IOException: Failed to bind to address 0.0.0.0:8080
# 当端口被占用时 broker/proxy 将无法启动
# 解决方案 https://blog.csdn.net/zooah212/article/details/127994243
# 验证是否启动成功
$ tail -f -n 500 nohup.out
The broker[suzhou-ydshp, 192.168.5.135:10911] boot success. serializeType=JSON and name server is localhost:9876
2、测试消息收发
创建消息发送的目标 Topic,RocketMQ 5.0 版本需要提前创建,例如:
# 可以通过 mqadmin 命令创建
# 注意 TestTopic 是topic名称
$ sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
create topic to 192.168.5.135:10911 success.
TopicConfig [topicName=TestTopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]
1、在IDEA中创建一个Java工程,并引入以下依赖
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client-java -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.5</version>
</dependency>
2、创建发送消息的程序并运行
@SpringBootTest
public class DemoApplicationTest {
private static final Logger logger = LoggerFactory.getLogger(DemoApplicationTest.class);
@Test
public void test() throws ClientException {
// 接入点地址,需要设置成 Proxy 的地址和端口列表,一般是xxx:8081;xxx:8081
String endpoint = "192.168.5.135:8081";
// 消息发送的目标Topic名称,需要提前创建。
String topic = "TestTopic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
ClientConfiguration configuration = builder.build();
// 初始化Producer时需要设置通信配置以及预绑定的Topic
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
// 普通消息发送
Message message = provider.newMessageBuilder()
.setTopic(topic)
// 设置消息索引键,可根据关键字精确查找某条消息
.setKeys("messageKey")
// 设置消息Tag,用于消费端根据指定Tag过滤消息
.setTag("messageTag")
// 消息内容实体(byte[])
.setBody("hello rocketMQ".getBytes())
.build();
try {
// 发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
logger.info("send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (ClientException e) {
logger.error("failed to send message", e);
}
// 关闭
producer.close();
}
}
3、创建订阅消息程序并运行。
Apache RocketMQ 支持SimpleConsumer和PushConsumer两种消费者类型,可以选择任意一种方式订阅消息。这里主要介绍PushConsumer。
@Test
public void pushConsumerTest() throws Exception {
ClientServiceProvider provider = ClientServiceProvider.loadService();
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081
String endpoint = "192.168.5.135:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoint)
.build();
// 订阅消息的过滤规则,表示订阅所有Tag的消息
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// 为消费者指定所属的消费者分组,Group需要提前创建
String consumerGroup = "TestGroup";
// 指定需要订阅哪个目标Topic,Topic需要提前创建
String topic = "TestTopic";
// 初始化 PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 设置消费者分组
.setConsumerGroup(consumerGroup)
// 设置预绑定的订阅关系
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 设置消费监听器
.setMessageListener(messageView -> {
// 处理消息并返回消费结果
logger.info("consume message successfully, messageId={}", messageView.getMessageId());
// 消息内容处理
ByteBuffer body = messageView.getBody();
String message = StandardCharsets.UTF_8.decode(body).toString();
body.flip();
logger.info("message body={}", message);
return ConsumeResult.SUCCESS;
}).build();
Thread.sleep(Long.MAX_VALUE);
// 如果不需要再使用 PushConsumer,可关闭该实例。
pushConsumer.close();
}
3、新版rocketmq-dashboard搭建
rocketmq-dashboard
是由 rocketmq-console
升级而来,整体UI风格更加简洁,新增了很多新功能。支持多种部署方式如 docker 镜像部署,源码手动编译与部署等。
搭建过程可参考如下文章
文章地址:https://blog.csdn.net/m0_46357847/article/details/130476251
官方文档(使用说明):https://rocketmq.apache.org/zh/docs/deploymentOperations/04Dashboard