Docs: https://github.com/apache/rocketmq/blob/develop/docs/cn/operation.md is very thorough. I got lazy halfway through writing these notes, so I am leaving the link here.
Full command reference: https://github.com/apache/rocketmq/blob/develop/docs/cn/operation.md
1. Getting rid of the annoying warnings
These two warnings show up on every command and are really annoying:
sh: warning: setlocale: LC_ALL: cannot change locale (en_US.UTF-8)
OpenJDK 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
- To remove sh: warning: setlocale: LC_ALL: cannot change locale (en_US.UTF-8)
  - Run localedef -f UTF-8 -i en_US en_US.UTF-8
- To remove OpenJDK 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
  - Edit the tools.sh script and delete -XX:PermSize=128m -XX:MaxPermSize=128m
2. Producer and consumer code
2.1 Producer code
package com.tom.lrocket;

import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer1 {
    public static void main(String[] args) throws Exception {
        // Instantiate the message producer with its producer group name
        DefaultMQProducer producer = new DefaultMQProducer("demo-producer-group");
        // Set the NameServer address
        producer.setNamesrvAddr("localhost:9876");
        // Start the producer instance
        producer.start();
        int count = 1000;
        for (int i = 0; i < count; i++) {
            String topic = "TOPIC-1";
            // Create a message with its topic, tag, and body (encoded explicitly as UTF-8)
            Message msg = new Message(topic, "TagB",
                    (topic + " Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8));
            // Send the message to a broker and print the result
            SendResult send = producer.send(msg);
            System.out.println(send);
            // Send one message per second
            Thread.sleep(1000);
        }
        // Shut the producer down once there is nothing more to send
        producer.shutdown();
    }
}
SendResult [sendStatus=SEND_OK, msgId=AC1086D51C7518B4AAC2948F2B740000, offsetMsgId=AC1086D500002A9F0000000000063E0C, messageQueue=MessageQueue [topic=TOPIC-2, brokerName=broker-a, queueId=1], queueOffset=163]
SendResult [sendStatus=SEND_OK, msgId=AC1086D51C7518B4AAC2948F2F6A0001, offsetMsgId=AC1086D500002A9F0000000000063F7C, messageQueue=MessageQueue [topic=TOPIC-2, brokerName=broker-a, queueId=2], queueOffset=164]
SendResult [sendStatus=SEND_OK, msgId=AC1086D51C7518B4AAC2948F33550002, offsetMsgId=AC1086D500002A9F00000000000640EC, messageQueue=MessageQueue [topic=TOPIC-2, brokerName=broker-a, queueId=3], queueOffset=164]
2.2 Consumer code
package com.tom.lrocket;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer1 {
    public static void main(String[] args) throws Exception {
        // Instantiate the consumer with its consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("lixi");
        // Set the NameServer address
        consumer.setNamesrvAddr("localhost:9876");
        // Subscribe to a topic and filter the messages to consume by tag
        String topic = "TOPIC-1";
        consumer.subscribe(topic, "TagB");
        // Register the callback that handles messages pulled back from the broker
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // Mark the messages as successfully consumed
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer instance
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
Consumer Started.
ConsumeMessageThread_lixi_10 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=1, storeSize=185, queueOffset=287, sysFlag=0, bornTimestamp=1722255624569, bornHost=/172.18.0.1:60040, storeTimestamp=1722255624572, storeHost=/172.16.134.213:10911, msgId=AC1086D500002A9F0000000000065181, commitLogOffset=414081, bodyCRC=1939369001, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TOPIC-1', flag=0, properties={CONSUME_START_TIME=1722255633739, UNIQ_KEY=AC1086D51C6E18B4AAC2948F61790013, MIN_OFFSET=0, TAGS=TagB, WAIT=true, MAX_OFFSET=289}, body=[84, 79, 80, 73, 67, 45, 49, 32, 72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 57], transactionId='null'}]]
ConsumeMessageThread_lixi_19 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=185, queueOffset=288, sysFlag=0, bornTimestamp=1722255625577, bornHost=/172.18.0.1:60040, storeTimestamp=1722255625582, storeHost=/172.16.134.213:10911, msgId=AC1086D500002A9F00000000000652F3, commitLogOffset=414451, bodyCRC=560564046, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TOPIC-1', flag=0, properties={CONSUME_START_TIME=1722255633739, UNIQ_KEY=AC1086D51C6E18B4AAC2948F65690014, MIN_OFFSET=0, TAGS=TagB, WAIT=true, MAX_OFFSET=290}, body=[84, 79, 80, 73, 67, 45, 49, 32, 72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50, 48], transactionId='null'}]]
ConsumeMessageThread_lixi_13 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=184, queueOffset=284, sysFlag=0, bornTimestamp=1722255613500, bornHost=/172.18.0.1:60040, storeTimestamp=1722255613503, storeHost=/172.16.134.213:10911, msgId=AC1086D500002A9F00000000000641A4, commitLogOffset=410020, bodyCRC=852255292, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TOPIC-1', flag=0, properties={CONSUME_START_TIME=1722255633739, UNIQ_KEY=AC1086D51C6E18B4AAC2948F363C0008, MIN_OFFSET=0, TAGS=TagB, WAIT=true, MAX_OFFSET=289}, body=[84, 79, 80, 73, 67, 45, 49, 32, 72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56], transactionId='null'}]]
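The body field in the consumer output above is printed as a list of decimal byte values, which is hard to read. The following is a minimal sketch of turning that printed list back into text. The class name BodyDecoder and its decode method are made up for illustration, and it assumes the producer encoded the body as UTF-8.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of RocketMQ: converts the "[84, 79, ...]" list
// printed in the consumer log back into the original string.
class BodyDecoder {

    static String decode(String printedBody) {
        String inner = printedBody.trim();
        // Drop the surrounding brackets if they are present
        if (inner.startsWith("[")) {
            inner = inner.substring(1);
        }
        if (inner.endsWith("]")) {
            inner = inner.substring(0, inner.length() - 1);
        }
        if (inner.trim().isEmpty()) {
            return "";
        }
        String[] parts = inner.split(",");
        byte[] bytes = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            // The log prints signed byte values, so Byte.parseByte covers the full range
            bytes[i] = Byte.parseByte(parts[i].trim());
        }
        // Assumption: the body was produced with UTF-8 encoding
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String body = "[84, 79, 80, 73, 67, 45, 49, 32, 72, 101, 108, 108, 111, 32, "
                + "82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 57]";
        System.out.println(decode(body)); // prints "TOPIC-1 Hello RocketMQ 19"
    }
}
```

In real consumer code there is no need for this: new String(msg.getBody(), StandardCharsets.UTF_8) inside the listener gives the same text directly.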
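2.3 The difference between Message Key, Unique Key, and Message Id
Source: https://cloud.tencent.com/developer/article/1581366
The explanation on GitHub:
RocketMQ provides three ways to query a message:
- By Message Key: the key is set by the application developer before sending. It is usually a field with business meaning and high selectivity, such as a user id or an order id.
- By Unique Key: besides the key set explicitly by the developer, the RocketMQ producer client automatically generates a UNIQ_KEY before sending and stores it in the message properties. It logically identifies a single message.
- By Message Id: the Message Id is generated on the broker after the message is sent. It contains the broker address and the offset in the CommitLog, and it is returned as part of the send result. A Message Id query is an exact match that physically identifies a single message, so it is more efficient.
On the sending side you can see msgId=AC1086D51C6E18B4AAC2948F16B10000, offsetMsgId=AC1086D500002A9F00000000000639BC. Here msgId is the Unique Key and offsetMsgId is the Message Id.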
SendResult [sendStatus=SEND_OK, msgId=AC1086D51C6E18B4AAC2948F16B10000, offsetMsgId=AC1086D500002A9F00000000000639BC, messageQueue=MessageQueue [topic=TOPIC-1, brokerName=broker-a, queueId=2], queueOffset=284]
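In the received message you can see msgId=AC1086D500002A9F00000000000639BC and UNIQ_KEY=AC1086D51C6E18B4AAC2948F16B10000. In other words, the msgId printed on the consumer side is the producer's offsetMsgId (the Message Id), and UNIQ_KEY is the producer's msgId (the Unique Key).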
ConsumeMessageThread_lixi_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=184, queueOffset=284, sysFlag=0, bornTimestamp=1722255605426, bornHost=/172.18.0.1:60040, storeTimestamp=1722255605436, storeHost=/172.16.134.213:10911, msgId=AC1086D500002A9F00000000000639BC, commitLogOffset=407996, bodyCRC=1008200206, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TOPIC-1', flag=0, properties={CONSUME_START_TIME=1722255633739, UNIQ_KEY=AC1086D51C6E18B4AAC2948F16B10000, MIN_OFFSET=0, TAGS=TagB, WAIT=true, MAX_OFFSET=290}, body=[84, 79, 80, 73, 67, 45, 49, 32, 72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
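Since the Message Id contains the broker address and the CommitLog offset, it can be decoded by hand. The following is a sketch under the assumption that a 32-character offsetMsgId is laid out as 4 bytes of IPv4 address, 4 bytes of port, and 8 bytes of CommitLog offset, all hex-encoded. That layout is consistent with the storeHost and commitLogOffset values in the log above, but the class and method names here are made up and this is not the RocketMQ client API.

```java
// Hypothetical decoder for the IPv4 form of offsetMsgId (32 hex characters).
class OffsetMsgIdDecoder {

    static final class Decoded {
        final String brokerIp;
        final int port;
        final long commitLogOffset;

        Decoded(String brokerIp, int port, long commitLogOffset) {
            this.brokerIp = brokerIp;
            this.port = port;
            this.commitLogOffset = commitLogOffset;
        }
    }

    static Decoded decode(String offsetMsgId) {
        if (offsetMsgId == null || offsetMsgId.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters (IPv4 layout)");
        }
        // Characters 0..7: the four bytes of the broker IPv4 address
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 8; i += 2) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(Integer.parseInt(offsetMsgId.substring(i, i + 2), 16));
        }
        // Characters 8..15: the broker port as a 32-bit integer
        int port = (int) Long.parseLong(offsetMsgId.substring(8, 16), 16);
        // Characters 16..31: the physical offset in the CommitLog
        long offset = Long.parseUnsignedLong(offsetMsgId.substring(16, 32), 16);
        return new Decoded(ip.toString(), port, offset);
    }

    public static void main(String[] args) {
        Decoded d = decode("AC1086D500002A9F00000000000639BC");
        // prints "172.16.134.213:10911 commitLogOffset=407996"
        System.out.println(d.brokerIp + ":" + d.port + " commitLogOffset=" + d.commitLogOffset);
    }
}
```

The decoded values match storeHost=/172.16.134.213:10911 and commitLogOffset=407996 in the consumer log, which is why a query by Message Id can go straight to the physical position of the message.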
Message Key
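3. Viewing topic message statistics and consumer group status
- topicStatus: shows the offsets of a topic's message queues
- consumerProgress: shows the consumption status of a subscription (consumer) group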
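4. Admin Tool commands
4.1 Topic-related commands
| Name | Meaning | Option | Description |
|---|---|---|---|
| updateTopic | Create or update a topic's configuration | -b | Address of the broker hosting the topic; only a single broker is supported, format ip:port |
| | | -c | Name of the cluster hosting the topic (clusters can be listed with clusterList) |
| | | -h | Print help |
| | | -n | NameServer address, format ip:port |
| | | -p | Read/write permission for the new topic (W=2, R=4, WR=6) |
| | | -r | Number of readable queues (default 8) |
| | | -w | Number of writable queues (default 8) |
| | | -t | Topic name (only characters matching ^[a-zA-Z0-9_-]+$ are allowed) |
| deleteTopic | Delete a topic | -c | Cluster name; deletes the topic from that cluster (clusters can be listed with clusterList) |
| | | -h | Print help |
| | | -n | NameServer address, format ip:port |
| | | -t | Topic name (only characters matching ^[a-zA-Z0-9_-]+$ are allowed) |
| topicList | View the topic list | -h | Print help |
| | | -c | Without -c only the topic list is returned; with -c the output includes clusterName, topic, and consumerGroup, i.e. each topic's cluster and subscriptions. Takes no argument |
| | | -n | NameServer address, format ip:port |
| topicRoute | View a topic's routing information | -t | Topic name |
| | | -h | Print help |
| | | -n | NameServer address, format ip:port |
| topicStatus | View the offsets of a topic's message queues | -t | Topic name |
| | | -h | Print help |
| | | -n | NameServer address, format ip:port |
| topicClusterList | View the clusters a topic belongs to | -t | Topic name |
| | | -h | Print help |
| | | -n | NameServer address, format ip:port |
| updateTopicPerm | Update a topic's read/write permission | -t | Topic name |
| | | -h | Print help |
| | | -n | NameServer address, format ip:port |
| | | -b | Address of the broker hosting the topic; only a single broker is supported, format ip:port |
| | | -p | Read/write permission for the topic (W=2, R=4, WR=6) |
| | | -c | Name of the cluster hosting the topic (clusters can be listed with clusterList). -b takes precedence; without -b the command runs against every broker in the cluster |
| updateOrderConf | Create, delete, or get KV configuration in a specific namespace on the NameServer; not enabled yet | -h | Print help |
| | | -n | NameServer address, format ip:port |
| | | -t | Topic (the key) |
| | | -v | orderConf (the value) |
| | | -m | Method: get, put, or delete |
| allocateMQ | Compute how a topic's message queues would be distributed over a list of consumers using the averaging algorithm | -t | Topic name |
| | | -h | Print help |
| | | -n | NameServer address, format ip:port |
| | | -i | ipList, comma-separated; the IPs among which the topic's message queues are distributed |
| statsAll | Print topic subscriptions, TPS, backlog, 24h read/write totals, and similar statistics | -h | Print help |
| | | -n | NameServer address, format ip:port |
| | | -a | Whether to print only active topics |
| | | -t | Restrict the output to this topic |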
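The allocateMQ row above refers to an averaging algorithm for spreading message queues over consumers. The following is only an illustrative sketch of an even, contiguous split, not the RocketMQ source code. The class name AverageAllocation and its parameters are hypothetical, and it assumes queues are numbered from 0 and consumers are identified by their position in a sorted list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of an averaging allocation: each consumer gets a contiguous
// block of queue ids, and the first (queueCount % consumerCount) consumers get one extra.
class AverageAllocation {

    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        List<Integer> result = new ArrayList<>();
        if (queueCount <= 0 || consumerCount <= 0
                || consumerIndex < 0 || consumerIndex >= consumerCount) {
            return result; // nothing to allocate for invalid input
        }
        int base = queueCount / consumerCount;   // queues every consumer gets
        int extra = queueCount % consumerCount;  // leftover queues, one each for the first consumers
        int size = base + (consumerIndex < extra ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, extra);
        for (int i = 0; i < size; i++) {
            result.add(start + i);
        }
        return result;
    }

    public static void main(String[] args) {
        // Three queues shared by two consumers
        System.out.println(allocate(3, 2, 0)); // prints [0, 1]
        System.out.println(allocate(3, 2, 1)); // prints [2]
    }
}
```

With more consumers than queues, the consumers at the end of the list simply receive an empty list, which is why adding consumers beyond the queue count does not increase throughput.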
deleteTopic: delete a topic
sh-4.2$ sh mqadmin deleteTopic -h
usage: mqadmin deleteTopic -c <arg> [-h] [-n <arg>] -t <arg>
-c,--clusterName <arg> delete topic from which cluster
-h,--help Print help
-n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
-t,--topic <arg> topic name
sh-4.2$
sh-4.2$
sh-4.2$ sh mqadmin deleteTopic -n namesrv:9876 -t TopicA -c DefaultCluster
delete topic [TopicA] from cluster [DefaultCluster] success.
delete topic [TopicA] from NameServer success.
topicList: view the topic list
sh-4.2$ sh mqadmin topicList -n namesrv:9876 -c
#Cluster Name #Topic #Consumer Group
DefaultCluster RMQ_SYS_TRANS_HALF_TOPIC
DefaultCluster TOM
DefaultCluster TopicA
DefaultCluster BenchmarkTest
DefaultCluster OFFSET_MOVED_EVENT
DefaultCluster broker-a
DefaultCluster TBW102
DefaultCluster SELF_TEST_TOPIC
DefaultCluster DefaultCluster
sh-4.2$
sh-4.2$
sh-4.2$
topicRoute: view a topic's routing information
sh-4.2$ sh mqadmin topicRoute -n namesrv:9876 -t TOM
{
"brokerDatas":[
{
"brokerAddrs":{0:"172.16.134.213:10911"
},
"brokerName":"broker-a",
"cluster":"DefaultCluster"
}
],
"filterServerTable":{},
"queueDatas":[
{
"brokerName":"broker-a",
"perm":6,
"readQueueNums":3,
"topicSynFlag":0,
"writeQueueNums":3
}
]
}
sh-4.2$
sh-4.2$ sh mqadmin topicStatus -n namesrv:9876 -t TOM
#Broker Name #QID #Min Offset #Max Offset #Last Updated
broker-a 0 0 1 2024-07-29 07:13:33,155
broker-a 1 0 0
broker-a 2 0 1 2024-07-29 07:18:07,281
sh-4.2$
sh-4.2$
topicClusterList: view the clusters a topic belongs to
sh-4.2$
sh-4.2$ sh mqadmin topicClusterList -n namesrv:9876 -t TOM
DefaultCluster
sh-4.2$
updateTopicPerm: update a topic's read/write permission
sh-4.2$
sh-4.2$ sh mqadmin updateTopicPerm -n namesrv:9876 -t TOM -b localhost:10911 -p 2
update topic perm from 6 to 2 in localhost:10911 success.
TopicConfig [topicName=TOM, readQueueNums=3, writeQueueNums=3, perm=-W-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
sh-4.2$
sh-4.2$
sh-4.2$ sh mqadmin consumeMessage -n namesrv:9876 -t TOM -b broker-a -c 1 -g lixi
org.apache.rocketmq.client.exception.MQClientException: Can not find Message Queue for this topic, TOM
See http://rocketmq.apache.org/docs/faq/ for further details.
at org.apache.rocketmq.client.impl.MQAdminImpl.fetchSubscribeMessageQueues(MQAdminImpl.java:162)
at org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.fetchSubscribeMessageQueues(DefaultMQPullConsumerImpl.java:138)
at org.apache.rocketmq.client.consumer.DefaultMQPullConsumer.fetchSubscribeMessageQueues(DefaultMQPullConsumer.java:225)
at org.apache.rocketmq.tools.command.message.ConsumeMessageCommand.executeDefault(ConsumeMessageCommand.java:251)
at org.apache.rocketmq.tools.command.message.ConsumeMessageCommand.execute(ConsumeMessageCommand.java:195)
at org.apache.rocketmq.tools.command.MQAdminStartup.main0(MQAdminStartup.java:135)
at org.apache.rocketmq.tools.command.MQAdminStartup.main(MQAdminStartup.java:86)
Caused by: org.apache.rocketmq.client.exception.MQClientException: Can not find Message Queue for this topic, TOM Namesrv return empty
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
at org.apache.rocketmq.client.impl.MQAdminImpl.fetchSubscribeMessageQueues(MQAdminImpl.java:157)
... 6 more
sh-4.2$
sh-4.2$
sh-4.2$ sh mqadmin updateTopicPerm -n namesrv:9876 -t TOM -b localhost:10911 -p 4
update topic perm from 2 to 4 in localhost:10911 success.
TopicConfig [topicName=TOM, readQueueNums=3, writeQueueNums=3, perm=R--, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
sh-4.2$
sh-4.2$
sh-4.2$
sh-4.2$
sh-4.2$ sh mqadmin consumeMessage -n namesrv:9876 -t TOM -b broker-a -c 1 -g lixi
Consume ok
MSGID: 7F000001000831CEFDE091BEFDDB0000 MessageExt [queueId=0, storeSize=181, queueOffset=0, sysFlag=0, bornTimestamp=1722237213147, bornHost=/172.18.0.1:45436, storeTimestamp=1722237213155, storeHost=/172.16.134.213:10911, msgId=AC1086D500002A9F0000000000000000, commitLogOffset=0, bodyCRC=54431857, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TOM', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, KEYS=key-1, UNIQ_KEY=7F000001000831CEFDE091BEFDDB0000, WAIT=true, TAGS=tag-1}, body=[109, 101, 115, 115, 97, 103, 101, 66, 111, 100, 121, 45, 49], transactionId='null'}] BODY: messageBody-1
MessageQueue [topic=TOM, brokerName=broker-a, queueId=0] print msg finished. status=NO_NEW_MSG, offset=1
The older -1 message of the 1 queue will be provided
sh-4.2$
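The session above shows perm=2 printed as -W- and perm=4 printed as R--, and consumeMessage only works once the topic is readable again. The following is a small sketch of how such a permission value can be read as bit flags. The READ and WRITE values come from the option table (W=2, R=4, WR=6); the INHERIT bit with value 1 and the letter X in the third position are assumptions, and the class TopicPerm is made up for illustration.

```java
// Hypothetical helper that renders a topic permission value the way the
// updateTopicPerm output above prints it.
class TopicPerm {
    static final int READ = 4;     // R, from the option table
    static final int WRITE = 2;    // W, from the option table
    static final int INHERIT = 1;  // assumption: third flag, not shown in this document

    static boolean isReadable(int perm) {
        return (perm & READ) != 0;
    }

    static boolean isWritable(int perm) {
        return (perm & WRITE) != 0;
    }

    static String toPermString(int perm) {
        StringBuilder sb = new StringBuilder("---");
        if (isReadable(perm)) {
            sb.setCharAt(0, 'R');
        }
        if (isWritable(perm)) {
            sb.setCharAt(1, 'W');
        }
        if ((perm & INHERIT) != 0) {
            sb.setCharAt(2, 'X');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        for (int perm : new int[] {2, 4, 6}) {
            // prints "2 -> -W-", "4 -> R--", "6 -> RW-"
            System.out.println(perm + " -> " + toPermString(perm));
        }
    }
}
```

With perm=2 the topic is write-only, which matches the "Can not find Message Queue for this topic" error seen when consuming; after switching to perm=4 the same consumeMessage command succeeds.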
statsAll: print topic subscriptions, TPS, backlog, 24h read/write totals, and similar statistics
sh-4.2$
sh-4.2$
sh-4.2$ sh mqadmin statsAll -n namesrv:9876 -t TOM
#Topic #Consumer Group #Accumulation #InTPS #OutTPS #InMsg24Hour #OutMsg24Hour
TOM 0 0.00 0 NO_CONSUMER
sh-4.2$
Checking the RocketMQ message backlog
1. sh mqadmin consumerProgress -n namesrv:9876 -g lixi -s true
sh-4.2$ sh mqadmin consumerProgress -n namesrv:9876 -g lixi -s true
#Topic #Broker Name #QID #Broker Offset #Consumer Offset #Client IP #Diff #LastTime
%RETRY%lixi broker-a 0 0 0 172.16.134.213 0 N/A
TOM broker-a 0 2 2 172.16.134.213 0 2024-07-29 08:53:11
TOM broker-a 1 1 1 172.16.134.213 0 2024-07-29 09:20:30
TOM broker-a 2 2 2 172.16.134.213 0 2024-07-29 08:58:33
Consume TPS: 0.00
Diff Total: 0
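The #Diff column and the Diff Total line are what indicate a backlog. The following sketch shows the arithmetic as I read it from the output above: the backlog of a queue is its broker offset minus its consumer offset, and Diff Total is the sum over all queues. The class BacklogCalculator is hypothetical, and the rows are copied from the consumerProgress output above.

```java
// Hypothetical sketch of the backlog arithmetic behind the consumerProgress output.
class BacklogCalculator {

    // Backlog of one queue: messages stored on the broker but not yet consumed
    static long diff(long brokerOffset, long consumerOffset) {
        return brokerOffset - consumerOffset;
    }

    // Each row is {brokerOffset, consumerOffset}
    static long totalDiff(long[][] rows) {
        long total = 0;
        for (long[] row : rows) {
            total += diff(row[0], row[1]);
        }
        return total;
    }

    public static void main(String[] args) {
        long[][] rows = {
            {0, 0}, // %RETRY%lixi, queue 0
            {2, 2}, // TOM, queue 0
            {1, 1}, // TOM, queue 1
            {2, 2}  // TOM, queue 2
        };
        System.out.println("Diff Total: " + totalDiff(rows)); // prints "Diff Total: 0"
    }
}
```

A Diff Total that keeps growing over time means the consumer group is falling behind the producers.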
2. Check from the Topic page of the management console
Check from the consumer group page of the management console
sh-4.2$ sh mqadmin consumerStatus -n namesrv:9876 -g lixi
001 172.16.134.213@5084 V4_6_0 1722245834703/172.16.134.213@5084
Same subscription in the same group of consumer
Rebalance OK
sh-4.2$
sh-4.2$
sh-4.2$
sh-4.2$
sh-4.2$
sh-4.2$
sh-4.2$ sh mqadmin consumeMessage -n namesrv:9876 -t TOM -b broker-a -g lixi
Consume ok
MSGID: 7F000001000831CEFDE091BEFDDB0000 MessageExt [queueId=0, storeSize=181, queueOffset=0, sysFlag=0, bornTimestamp=1722237213147, bornHost=/172.18.0.1:45436, storeTimestamp=1722237213155, storeHost=/172.16.134.213:10911, msgId=AC1086D500002A9F0000000000000000, commitLogOffset=0, bodyCRC=54431857, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TOM', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, KEYS=key-1, UNIQ_KEY=7F000001000831CEFDE091BEFDDB0000, WAIT=true, TAGS=tag-1}, body=[109, 101, 115, 115, 97, 103, 101, 66, 111, 100, 121, 45, 49], transactionId='null'}] BODY: messageBody-1
MSGID: 7F000001000831CEFDE0921A38420005 MessageExt [queueId=0, storeSize=175, queueOffset=1, sysFlag=0, bornTimestamp=1722243191874, bornHost=/172.18.0.1:35472, storeTimestamp=1722243191877, storeHost=/172.16.134.213:10911, msgId=AC1086D500002A9F0000000000000166, commitLogOffset=358, bodyCRC=532952986, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TOM', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, KEYS=key, UNIQ_KEY=7F000001000831CEFDE0921A38420005, WAIT=true, TAGS=tag}, body=[109, 101, 115, 115, 97, 103, 101, 66, 111, 100, 121], transactionId='null'}] BODY: messageBody
MessageQueue [topic=TOM, brokerName=broker-a, queueId=0] print msg finished. status=NO_NEW_MSG, offset=2
MessageQueue [topic=TOM, brokerName=broker-a, queueId=1] print msg finished. status=NO_NEW_MSG, offset=0
Consume ok
MSGID: 7F000001000831CEFDE091C32CAC0002 MessageExt [queueId=2, storeSize=177, queueOffset=0, sysFlag=0, bornTimestamp=1722237487276, bornHost=/172.18.0.1:58402, storeTimestamp=1722237487281, storeHost=/172.16.134.213:10911, msgId=AC1086D500002A9F00000000000000B5, commitLogOffset=181, bodyCRC=439861707, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TOM', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, KEYS=key, UNIQ_KEY=7F000001000831CEFDE091C32CAC0002, WAIT=true, TAGS=tag}, body=[109, 101, 115, 115, 97, 103, 101, 66, 111, 100, 121, 45, 50], transactionId='null'}] BODY: messageBody-2
MSGID: 7F000001000831CEFDE0921F1FF30007 MessageExt [queueId=2, storeSize=181, queueOffset=1, sysFlag=0, bornTimestamp=1722243513331, bornHost=/172.18.0.1:40958, storeTimestamp=1722243513334, storeHost=/172.16.134.213:10911, msgId=AC1086D500002A9F0000000000000215, commitLogOffset=533, bodyCRC=439861707, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TOM', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, KEYS=key-2, UNIQ_KEY=7F000001000831CEFDE0921F1FF30007, WAIT=true, TAGS=tag-2}, body=[109, 101, 115, 115, 97, 103, 101, 66, 111, 100, 121, 45, 50], transactionId='null'}] BODY: messageBody-2
MessageQueue [topic=TOM, brokerName=broker-a, queueId=2] print msg finished. status=NO_NEW_MSG, offset=2