Commands
mqadmin command list
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin
The most commonly used mqadmin commands are:
updateTopic Update or create topic
deleteTopic Delete topic from broker and NameServer.
updateSubGroup Update or create subscription group
setConsumeMode Set consume message mode. pull/pop etc.
deleteSubGroup Delete subscription group from broker.
updateBrokerConfig Update broker's config
updateTopicPerm Update topic perm
topicRoute Examine topic route info
topicStatus Examine topic Status info
topicClusterList Get cluster info for topic
addBroker Add a broker to specified container
removeBroker Remove a broker from specified container
resetMasterFlushOffset Reset master flush offset in slave
brokerStatus Fetch broker runtime status data
queryMsgById Query Message by Id
queryMsgByKey Query Message by Key
queryMsgByUniqueKey Query Message by Unique key
queryMsgByOffset Query Message by offset
queryMsgTraceById Query a message trace
printMsg Print Message Detail
printMsgByQueue Print Message Detail
sendMsgStatus Send msg to broker.
brokerConsumeStats Fetch broker consume stats data
producerConnection Query producer's socket connection and client version
consumerConnection Query consumer's socket connection, client version and subscription
consumerProgress Query consumers's progress, speed
consumerStatus Query consumer's internal data structure
cloneGroupOffset Clone offset from other group.
producer Query producer's instances, connection, status, etc.
clusterList List cluster infos
topicList Fetch all topic list from name server
updateKvConfig Create or update KV config.
deleteKvConfig Delete KV config.
wipeWritePerm Wipe write perm of broker in all name server you defined in the -n param
addWritePerm Add write perm of broker in all name server you defined in the -n param
resetOffsetByTime Reset consumer offset by timestamp(without client restart).
skipAccumulatedMessage Skip all messages that are accumulated (not consumed) currently
updateOrderConf Create or update or delete order conf
cleanExpiredCQ Clean expired ConsumeQueue on broker.
deleteExpiredCommitLog Delete expired CommitLog files
cleanUnusedTopic Clean unused topic on broker.
startMonitoring Start Monitoring
statsAll Topic and Consumer tps stats
allocateMQ Allocate MQ
checkMsgSendRT Check message send response time
clusterRT List All clusters Message Send RT
getNamesrvConfig Get configs of name server.
updateNamesrvConfig Update configs of name server.
getBrokerConfig Get broker config by cluster or special broker
getConsumerConfig Get consumer config by subscription group name
queryCq Query cq command.
sendMessage Send a message
consumeMessage Consume message
updateAclConfig Update acl config yaml file in broker
deleteAclConfig Delete Acl Config Account in broker
clusterAclConfigVersion List all of acl config version information in cluster
updateGlobalWhiteAddr Update global white address for acl Config File in broker
getAclConfig List all of acl config information in cluster
updateStaticTopic Update or create static topic, which has fixed number of queues
remappingStaticTopic Update or create static topic, which has fixed number of queues
exportMetadata Export metadata
exportConfigs Export configs
exportMetrics Export metrics
haStatus Fetch ha runtime status data
getSyncStateSet Fetch syncStateSet for target brokers
getBrokerEpoch Fetch broker epoch entries
getControllerMetaData Get controller cluster's metadata
getControllerConfig Get controller config.
updateControllerConfig Update controller config.
electMaster Re-elect the specified broker as master
cleanBrokerMetadata Clean metadata of broker on controller
dumpCompactionLog parse compaction log to message
getColdDataFlowCtrInfo get cold data flow ctr info
updateColdDataFlowCtrGroupConfig addOrUpdate cold data flow ctr group config
removeColdDataFlowCtrGroupConfig remove consumer from cold ctr config
setCommitLogReadAheadMode set read ahead mode for all commitlog files
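To call these subcommands from a script instead of typing them, one option is a thin wrapper around the launcher. This is only a sketch: the helper names `mqadmin_argv` and `run_mqadmin` are made up for illustration, and the install directory is the one visible in the shell prompt above. The `-n` (NameServer address) and `-t` (topic) options are the ones used by the commands in this post.

```python
import subprocess

# Install directory as it appears in the shell prompt; adjust for your machine.
ROCKETMQ_HOME = "/opt/rocketmq-all-5.1.3-bin-release"


def mqadmin_argv(subcommand, namesrv="localhost:9876", topic=None):
    """Build the argument list for one mqadmin call (hypothetical helper)."""
    argv = ["sh", "bin/mqadmin", subcommand, "-n", namesrv]
    if topic is not None:
        argv += ["-t", topic]
    return argv


def run_mqadmin(subcommand, **kwargs):
    """Run mqadmin from the RocketMQ directory and return its stdout as text."""
    result = subprocess.run(
        mqadmin_argv(subcommand, **kwargs),
        cwd=ROCKETMQ_HOME,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout
```

For example, `run_mqadmin("topicStatus", topic="string-topic")` should return the same text that the topicStatus command prints in the terminal.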
topicList
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicList -n localhost:9876
%RETRY%please_rename_unique_group_name
RMQ_SYS_TRANS_HALF_TOPIC
stringRequestTopic
%RETRY%objectRequestConsumer
%RETRY%please_rename_unique_group_name_4
TRANS_CHECK_MAX_TIME_TOPIC
BenchmarkTest
%RETRY%genericRequestConsumer
string-topic
TBW102
rmq_sys_REVIVE_LOG_DefaultCluster
SELF_TEST_TOPIC
%RETRY%string_consumer_newns
SCHEDULE_TOPIC_XXXX
DefaultCluster_REPLY_TOPIC
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23
RMQ_SYS_TRANS_OP_HALF_TOPIC
TopicTest
localhost.localdomain
order-paid-topic
%RETRY%my-group1
user-topic
%RETRY%string_trans_consumer
message-ext-topic
OFFSET_MOVED_EVENT
%RETRY%user_consumer
%RETRY%order-paid-consumer
yeqiang-MS-7B23
DefaultCluster
spring-transaction-topic
%RETRY%stringRequestConsumer
bytesRequestTopic
%RETRY%string_consumer
%RETRY%bytesRequestConsumer
%RETRY%rocketmq-consume-demo-message-ext-consumer
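The list mixes ordinary topic names with `%RETRY%<group>` entries, which are the retry topics kept for each consumer group. A minimal sketch for separating the two is shown below. The function name `split_topic_list` is hypothetical, and the sketch does not try to tell system topics apart from application topics, since the output above does not mark them.

```python
RETRY_PREFIX = "%RETRY%"


def split_topic_list(output):
    """Split topicList output into (topic names, consumer groups that own a retry topic)."""
    topics, retry_groups = [], []
    for line in output.splitlines():
        name = line.strip()
        if not name:
            continue
        if name.startswith(RETRY_PREFIX):
            # The text after the prefix is the consumer group name.
            retry_groups.append(name[len(RETRY_PREFIX):])
        else:
            topics.append(name)
    return topics, retry_groups


sample = """\
%RETRY%my-group1
string-topic
%RETRY%string_consumer
TopicTest
"""
topics, retry_groups = split_topic_list(sample)
print(topics)
print(retry_groups)
```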
statsAll
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin statsAll -n localhost:9876
#Topic #Consumer Group #Accumulation #InTPS #OutTPS #InMsg24Hour #OutMsg24Hour
RMQ_SYS_TRANS_HALF_TOPIC CID_RMQ_SYS_TRANS 0 0.00 0.00 0 0
stringRequestTopic stringRequestConsumer 1 0.00 0.00 0 0
TRANS_CHECK_MAX_TIME_TOPIC 0 0.00 0 NO_CONSUMER
BenchmarkTest 0 0.00 0 NO_CONSUMER
string-topic string_consumer 106 0.00 0.00 0 0
string-topic string_consumer_newns 63 0.00 0.00 0 0
TBW102 0 0.00 0 NO_CONSUMER
rmq_sys_REVIVE_LOG_DefaultCluster 0 0.00 0 NO_CONSUMER
SELF_TEST_TOPIC 0 0.00 0 NO_CONSUMER
SCHEDULE_TOPIC_XXXX 0 0.00 0 NO_CONSUMER
DefaultCluster_REPLY_TOPIC 0 0.00 0 NO_CONSUMER
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23 0 0.00 0 NO_CONSUMER
RMQ_SYS_TRANS_OP_HALF_TOPIC CID_RMQ_SYS_TRANS 0 0.00 0.00 0 0
TopicTest please_rename_unique_group_name 252 0.00 0.00 0 0
TopicTest please_rename_unique_group_name_4 0 0.00 0.00 0 0
localhost.localdomain 0 0.00 0 NO_CONSUMER
order-paid-topic order-paid-consumer 1 0.00 0.00 0 0
user-topic user_consumer 2 0.00 0.00 0 0
message-ext-topic rocketmq-consume-demo-message-ext-consumer 2 0.00 0.00 0 0
OFFSET_MOVED_EVENT 0 0.00 0 NO_CONSUMER
yeqiang-MS-7B23 0 0.00 0 NO_CONSUMER
DefaultCluster 0 0.00 0 NO_CONSUMER
spring-transaction-topic string_trans_consumer 15 0.00 0.00 0 0
bytesRequestTopic bytesRequestConsumer 0 0.00 0.00 0 0
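The `#Accumulation` column is the quickest way to spot consumer groups that are falling behind. The sketch below pulls it out of the statsAll text. It assumes whitespace-separated columns as printed above, names without spaces, and that rows ending in `NO_CONSUMER` have no consumer group; `backlog_by_group` is a name chosen here for illustration.

```python
def backlog_by_group(output):
    """Return {(topic, group): accumulation} for rows that have a consumer group."""
    backlog = {}
    for line in output.splitlines():
        fields = line.split()
        # Skip blank lines, the header row, and topics without any consumer group.
        if not fields or fields[0].startswith("#") or fields[-1] == "NO_CONSUMER":
            continue
        topic, group, accumulation = fields[0], fields[1], int(fields[2])
        backlog[(topic, group)] = accumulation
    return backlog


sample = """\
#Topic #Consumer Group #Accumulation #InTPS #OutTPS #InMsg24Hour #OutMsg24Hour
string-topic string_consumer 106 0.00 0.00 0 0
string-topic string_consumer_newns 63 0.00 0.00 0 0
TBW102 0 0.00 0 NO_CONSUMER
TopicTest please_rename_unique_group_name 252 0.00 0.00 0 0
"""
backlog = backlog_by_group(sample)
# Largest backlog first.
for (topic, group), count in sorted(backlog.items(), key=lambda kv: -kv[1]):
    print(f"{count:>6}  {topic}  {group}")
```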
topicStatus
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus -n localhost:9876 -t string-topic
#Broker Name      #QID  #Min Offset  #Max Offset  #Last Updated
yeqiang-MS-7B23   0     0            35           2023-08-25 16:21:35,786
yeqiang-MS-7B23   1     0            52           2023-08-25 14:55:57,152
yeqiang-MS-7B23   2     0            33           2023-08-25 16:21:35,646
yeqiang-MS-7B23   3     0            42           2023-08-25 14:55:57,172
yeqiang-MS-7B23   4     0            1            2023-08-25 16:21:34,355
yeqiang-MS-7B23   5     0            1            2023-08-25 14:55:57,105
yeqiang-MS-7B23   6     0            4            2023-08-25 16:23:01,489
yeqiang-MS-7B23   7     0            1            2023-08-25 16:21:36,186
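Each row is one queue of the topic, and the gap between max and min offset is the number of entries currently held in that queue. A small sketch for totalling them follows. It assumes broker names contain no whitespace, and `parse_topic_status` is a hypothetical helper name.

```python
def parse_topic_status(output):
    """Return {(broker, queue id): (min offset, max offset)} from topicStatus output."""
    queues = {}
    for line in output.splitlines():
        fields = line.split()
        if not fields or fields[0].startswith("#"):
            continue
        # Columns: broker name, queue id, min offset, max offset, date, time.
        broker, qid = fields[0], int(fields[1])
        queues[(broker, qid)] = (int(fields[2]), int(fields[3]))
    return queues


sample = """\
#Broker Name #QID #Min Offset #Max Offset #Last Updated
yeqiang-MS-7B23 0 0 35 2023-08-25 16:21:35,786
yeqiang-MS-7B23 1 0 52 2023-08-25 14:55:57,152
yeqiang-MS-7B23 2 0 33 2023-08-25 16:21:35,646
yeqiang-MS-7B23 3 0 42 2023-08-25 14:55:57,172
yeqiang-MS-7B23 4 0 1 2023-08-25 16:21:34,355
yeqiang-MS-7B23 5 0 1 2023-08-25 14:55:57,105
yeqiang-MS-7B23 6 0 4 2023-08-25 16:23:01,489
yeqiang-MS-7B23 7 0 1 2023-08-25 16:21:36,186
"""
queues = parse_topic_status(sample)
total = sum(max_off - min_off for min_off, max_off in queues.values())
print(len(queues), "queues,", total, "messages")  # 8 queues, 169 messages
```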
Python producer: producer.py
from rocketmq.client import Producer, Message
groupName = "my-group1"
nameserver = "127.0.0.1:9876"
topicName = "string-topic"
TAGS = "tag-my-group1"
KEYS = "key-my-group1-0"
# Initialize the producer and set the producer group. Use the full group name, e.g. rocketmq-xxx|namespace_python%group1
producer = Producer(groupName)
# Set the NameServer address
producer.set_name_server_address(nameserver)
# Set credentials (role name and key)
# producer.set_session_credentials(
#     accessKey,  # role key
#     secretKey,  # role name
#     ''
# )
# Start the producer
producer.start()
# Build the message. Copy the topic name from the topic page of the console.
msg = Message(topicName)
# Set keys
msg.set_keys(KEYS)
# Set tags
msg.set_tags(TAGS)
# Message body
msg.set_body('This is a new message.')
# Send the message synchronously
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
# Release resources
producer.shutdown()
Run
yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ source /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/activate
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012857767267388CFD61230000 35
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$
Query the topic status with mqadmin
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus -n localhost:9876 -t string-topic
#Broker Name      #QID  #Min Offset  #Max Offset  #Last Updated
yeqiang-MS-7B23   0     0            36           2023-08-28 09:03:35,722
yeqiang-MS-7B23   1     0            52           2023-08-25 14:55:57,152
yeqiang-MS-7B23   2     0            33           2023-08-25 16:21:35,646
yeqiang-MS-7B23   3     0            42           2023-08-25 14:55:57,172
yeqiang-MS-7B23   4     0            1            2023-08-25 16:21:34,355
yeqiang-MS-7B23   5     0            1            2023-08-25 14:55:57,105
yeqiang-MS-7B23   6     0            4            2023-08-25 16:23:01,489
yeqiang-MS-7B23   7     0            1            2023-08-25 16:21:36,186
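Compared with the topicStatus output taken before the producer ran, only queue 0 has changed: its max offset went from 35 to 36 and its last-updated time moved to 2023-08-28. That agrees with the `35` printed by producer.py, on the assumption that `ret.offset` is the queue offset at which the message was stored. The comparison can be sketched as follows, with the max offsets copied by hand from the two outputs and `changed_queues` being an illustrative name.

```python
# Max offset per queue id, copied from the two topicStatus outputs.
before = {0: 35, 1: 52, 2: 33, 3: 42, 4: 1, 5: 1, 6: 4, 7: 1}
after = {0: 36, 1: 52, 2: 33, 3: 42, 4: 1, 5: 1, 6: 4, 7: 1}


def changed_queues(before, after):
    """Return {queue id: number of new messages} for queues whose max offset changed."""
    return {
        qid: after[qid] - before.get(qid, 0)
        for qid in after
        if after[qid] != before.get(qid, 0)
    }


print(changed_queues(before, after))  # {0: 1}
```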
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicRoute -n localhost:9876 -t string-topic
{
    "brokerDatas":[
        {
            "brokerAddrs":{0:"10.47.76.67:10911"
            },
            "brokerName":"yeqiang-MS-7B23",
            "cluster":"DefaultCluster",
            "enableActingMaster":false
        }
    ],
    "filterServerTable":{},
    "queueDatas":[
        {
            "brokerName":"yeqiang-MS-7B23",
            "perm":6,
            "readQueueNums":8,
            "topicSysFlag":0,
            "writeQueueNums":8
        }
    ]
}
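The `perm` value in the route data is a bit mask. In RocketMQ's Java source (`PermName`) read is 4 and write is 2, so `6` means the topic is both readable and writable on this broker. The sketch below mirrors those constants in Python; the function name `perm_to_string` is chosen here for illustration and is not part of any client API.

```python
# Bit values mirrored from RocketMQ's PermName constants.
PERM_READ = 0x1 << 2     # 4
PERM_WRITE = 0x1 << 1    # 2
PERM_INHERIT = 0x1 << 0  # 1


def perm_to_string(perm):
    """Render a perm bit mask as three flags, e.g. 6 -> 'RW-'."""
    flags = [(PERM_READ, "R"), (PERM_WRITE, "W"), (PERM_INHERIT, "X")]
    return "".join(letter if perm & bit else "-" for bit, letter in flags)


print(perm_to_string(6))  # RW-
```

A broker whose write permission has been removed, for example with wipeWritePerm, would be expected to report `4` (`R--`) here.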
GUI tool: rocketmq-dashboard
https://github.com/apache/rocketmq-dashboard
Build it yourself
mvn clean package -Dmaven.test.skip=true
Start
java -Drocketmq.namesrv.addr=127.0.0.1:9876 -jar target/rocketmq-dashboard-1.0.0.jar
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012DF4226307248D16C3250000 36
consumer.py
import time
from rocketmq.client import PushConsumer, ConsumeStatus
groupName = "my-group1"
nameserver = "127.0.0.1:9876"
topicName = "string-topic"
KEYS = "key-my-group1-0"
# Message handling callback
def callback(msg):
    # Simulate business logic
    print('Received message. messageId: ', msg.id, ' body: ', msg.body)
    # On success, reply CONSUME_SUCCESS
    return ConsumeStatus.CONSUME_SUCCESS
    # On failure, reply RECONSUME_LATER so that the message is delivered again
    # return ConsumeStatus.RECONSUME_LATER
# Initialize the consumer and set the consumer group
consumer = PushConsumer(groupName)
# Set the NameServer address
consumer.set_name_server_address(nameserver)
# Set credentials (role name and key)
# consumer.set_session_credentials(
#     accessKey,  # role key
#     secretKey,  # role name
#     ''
# )
# Subscribe to the topic
consumer.subscribe(topicName, callback, "*")
print(' [Consumer] Waiting for messages.')
# Start the consumer
consumer.start()
try:
    # Keep the main thread alive while the callback handles messages
    while True:
        time.sleep(3600)
except KeyboardInterrupt:
    pass
finally:
    # Release resources
    consumer.shutdown()
Start the Python consumer
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/consumer.py
[Consumer] Waiting for messages.
Received message. messageId: 7F0001012DF4226307248D16C3250000 body: b'This is a new message.'
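You can see that the message has been consumed by my-group1.
Now start a second consumer.py and produce one more message.
Only one of the two consumers receives the message, which shows that by default messages are not delivered in broadcast mode: consumers in the same group share the messages between them.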
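One way to picture why only one consumer receives a given message: in the default (clustering) model the topic's queues are divided among the consumers of a group, and each queue is read by exactly one of them. The sketch below is modeled on the averaged allocation strategy of the Java client (`AllocateMessageQueueAveragely`). Whether the C++ library behind the Python client splits queues in exactly the same way is an assumption, and the consumer ids `c1`, `c2`, `c3` are placeholders.

```python
def allocate_averagely(queue_ids, consumer_ids, current):
    """Return the queues one consumer would own under averaged allocation."""
    queues = sorted(queue_ids)
    consumers = sorted(consumer_ids)
    index = consumers.index(current)
    mod = len(queues) % len(consumers)
    if len(queues) <= len(consumers):
        average = 1
    elif mod > 0 and index < mod:
        # The first `mod` consumers take one extra queue each.
        average = len(queues) // len(consumers) + 1
    else:
        average = len(queues) // len(consumers)
    if mod > 0 and index < mod:
        start = index * average
    else:
        start = index * average + mod
    count = min(average, len(queues) - start)
    return [queues[start + i] for i in range(max(count, 0))]


# string-topic has 8 queues (readQueueNums in the topicRoute output).
for cid in ["c1", "c2"]:
    print(cid, allocate_averagely(range(8), ["c1", "c2"], cid))
```

Under these assumptions `c1` would own queues 0 to 3 and `c2` queues 4 to 7, so a message written to queue 0 reaches only `c1`.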
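Producing a message from Java:
training: practice programs for Java, SpringBoot, SpringCloud, k8s, etc. - Gitee.com
Dependency of the Python rocketmq client (the native rocketmq-client-cpp library)
Release rocketmq-client-cpp-2.1.0 · apache/rocketmq-client-cpp · GitHub
Complete Python program
python-rocketmq-demo: an example of python3 with rocketmq5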