RocketMQ mqadmin java springboot python 调用笔记

news2025/1/12 6:06:20

命令

mqadmin命令列表

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin
The most commonly used mqadmin commands are:
   updateTopic               Update or create topic
   deleteTopic               Delete topic from broker and NameServer.
   updateSubGroup            Update or create subscription group
   setConsumeMode            Set consume message mode. pull/pop etc.
   deleteSubGroup            Delete subscription group from broker.
   updateBrokerConfig        Update broker's config
   updateTopicPerm           Update topic perm
   topicRoute                Examine topic route info
   topicStatus               Examine topic Status info
   topicClusterList          Get cluster info for topic
   addBroker                 Add a broker to specified container
   removeBroker              Remove a broker from specified container
   resetMasterFlushOffset    Reset master flush offset in slave
   brokerStatus              Fetch broker runtime status data
   queryMsgById              Query Message by Id
   queryMsgByKey             Query Message by Key
   queryMsgByUniqueKey       Query Message by Unique key
   queryMsgByOffset          Query Message by offset
   queryMsgTraceById         Query a message trace
   printMsg                  Print Message Detail
   printMsgByQueue           Print Message Detail
   sendMsgStatus             Send msg to broker.
   brokerConsumeStats        Fetch broker consume stats data
   producerConnection        Query producer's socket connection and client version
   consumerConnection        Query consumer's socket connection, client version and subscription
   consumerProgress          Query consumers's progress, speed
   consumerStatus            Query consumer's internal data structure
   cloneGroupOffset          Clone offset from other group.
   producer                  Query producer's instances, connection, status, etc.
   clusterList               List cluster infos
   topicList                 Fetch all topic list from name server
   updateKvConfig            Create or update KV config.
   deleteKvConfig            Delete KV config.
   wipeWritePerm             Wipe write perm of broker in all name server you defined in the -n param
   addWritePerm              Add write perm of broker in all name server you defined in the -n param
   resetOffsetByTime         Reset consumer offset by timestamp(without client restart).
   skipAccumulatedMessage    Skip all messages that are accumulated (not consumed) currently
   updateOrderConf           Create or update or delete order conf
   cleanExpiredCQ            Clean expired ConsumeQueue on broker.
   deleteExpiredCommitLog    Delete expired CommitLog files
   cleanUnusedTopic          Clean unused topic on broker.
   startMonitoring           Start Monitoring
   statsAll                  Topic and Consumer tps stats
   allocateMQ                Allocate MQ
   checkMsgSendRT            Check message send response time
   clusterRT                 List All clusters Message Send RT
   getNamesrvConfig          Get configs of name server.
   updateNamesrvConfig       Update configs of name server.
   getBrokerConfig           Get broker config by cluster or special broker
   getConsumerConfig         Get consumer config by subscription group name
   queryCq                   Query cq command.
   sendMessage               Send a message
   consumeMessage            Consume message
   updateAclConfig           Update acl config yaml file in broker
   deleteAclConfig           Delete Acl Config Account in broker
   clusterAclConfigVersion   List all of acl config version information in cluster
   updateGlobalWhiteAddr     Update global white address for acl Config File in broker
   getAclConfig              List all of acl config information in cluster
   updateStaticTopic         Update or create static topic, which has fixed number of queues
   remappingStaticTopic      Update or create static topic, which has fixed number of queues
   exportMetadata            Export metadata
   exportConfigs             Export configs
   exportMetrics             Export metrics
   haStatus                  Fetch ha runtime status data
   getSyncStateSet           Fetch syncStateSet for target brokers
   getBrokerEpoch            Fetch broker epoch entries
   getControllerMetaData     Get controller cluster's metadata
   getControllerConfig       Get controller config.
   updateControllerConfig    Update controller config.
   electMaster               Re-elect the specified broker as master
   cleanBrokerMetadata       Clean metadata of broker on controller
   dumpCompactionLog         parse compaction log to message
   getColdDataFlowCtrInfo    get cold data flow ctr info
   updateColdDataFlowCtrGroupConfig addOrUpdate cold data flow ctr group config
   removeColdDataFlowCtrGroupConfig remove consumer from cold ctr config
   setCommitLogReadAheadMode set read ahead mode for all commitlog files

topicList

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicList  -n localhost:9876
%RETRY%please_rename_unique_group_name
RMQ_SYS_TRANS_HALF_TOPIC
stringRequestTopic
%RETRY%objectRequestConsumer
%RETRY%please_rename_unique_group_name_4
TRANS_CHECK_MAX_TIME_TOPIC
BenchmarkTest
%RETRY%genericRequestConsumer
string-topic
TBW102
rmq_sys_REVIVE_LOG_DefaultCluster
SELF_TEST_TOPIC
%RETRY%string_consumer_newns
SCHEDULE_TOPIC_XXXX
DefaultCluster_REPLY_TOPIC
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23
RMQ_SYS_TRANS_OP_HALF_TOPIC
TopicTest
localhost.localdomain
order-paid-topic
%RETRY%my-group1
user-topic
%RETRY%string_trans_consumer
message-ext-topic
OFFSET_MOVED_EVENT
%RETRY%user_consumer
%RETRY%order-paid-consumer
yeqiang-MS-7B23
DefaultCluster
spring-transaction-topic
%RETRY%stringRequestConsumer
bytesRequestTopic
%RETRY%string_consumer
%RETRY%bytesRequestConsumer
%RETRY%rocketmq-consume-demo-message-ext-consumer

statsAll

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin statsAll  -n localhost:9876 
#Topic                                                            #Consumer Group                                                  #Accumulation      #InTPS     #OutTPS   #InMsg24Hour  #OutMsg24Hour
RMQ_SYS_TRANS_HALF_TOPIC                                          CID_RMQ_SYS_TRANS                                                           0        0.00        0.00              0              0
stringRequestTopic                                                stringRequestConsumer                                                       1        0.00        0.00              0              0
TRANS_CHECK_MAX_TIME_TOPIC                                                                                                                    0        0.00                          0    NO_CONSUMER
BenchmarkTest                                                                                                                                 0        0.00                          0    NO_CONSUMER
string-topic                                                      string_consumer                                                           106        0.00        0.00              0              0
string-topic                                                      string_consumer_newns                                                      63        0.00        0.00              0              0
TBW102                                                                                                                                        0        0.00                          0    NO_CONSUMER
rmq_sys_REVIVE_LOG_DefaultCluster                                                                                                             0        0.00                          0    NO_CONSUMER
SELF_TEST_TOPIC                                                                                                                               0        0.00                          0    NO_CONSUMER
SCHEDULE_TOPIC_XXXX                                                                                                                           0        0.00                          0    NO_CONSUMER
DefaultCluster_REPLY_TOPIC                                                                                                                    0        0.00                          0    NO_CONSUMER
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23                                                                                                    0        0.00                          0    NO_CONSUMER
RMQ_SYS_TRANS_OP_HALF_TOPIC                                       CID_RMQ_SYS_TRANS                                                           0        0.00        0.00              0              0
TopicTest                                                         please_rename_unique_group_name                                           252        0.00        0.00              0              0
TopicTest                                                         please_rename_unique_group_name_4                                           0        0.00        0.00              0              0
localhost.localdomain                                                                                                                         0        0.00                          0    NO_CONSUMER
order-paid-topic                                                  order-paid-consumer                                                         1        0.00        0.00              0              0
user-topic                                                        user_consumer                                                               2        0.00        0.00              0              0
message-ext-topic                                                 rocketmq-consume-demo-message-ext-consumer                                  2        0.00        0.00              0              0
OFFSET_MOVED_EVENT                                                                                                                            0        0.00                          0    NO_CONSUMER
yeqiang-MS-7B23                                                                                                                               0        0.00                          0    NO_CONSUMER
DefaultCluster                                                                                                                                0        0.00                          0    NO_CONSUMER
spring-transaction-topic                                          string_trans_consumer                                                      15        0.00        0.00              0              0
bytesRequestTopic                                                 bytesRequestConsumer                                                        0        0.00        0.00              0              0

topicStatus

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus  -n localhost:9876 -t string-topic
#Broker Name                      #QID  #Min Offset           #Max Offset             #Last Updated
yeqiang-MS-7B23                   0     0                     35                      2023-08-25 16:21:35,786
yeqiang-MS-7B23                   1     0                     52                      2023-08-25 14:55:57,152
yeqiang-MS-7B23                   2     0                     33                      2023-08-25 16:21:35,646
yeqiang-MS-7B23                   3     0                     42                      2023-08-25 14:55:57,172
yeqiang-MS-7B23                   4     0                     1                       2023-08-25 16:21:34,355
yeqiang-MS-7B23                   5     0                     1                       2023-08-25 14:55:57,105
yeqiang-MS-7B23                   6     0                     4                       2023-08-25 16:23:01,489
yeqiang-MS-7B23                   7     0                     1                       2023-08-25 16:21:36,186

Python 生产者:producer.py

from rocketmq.client import Producer, Message

groupName = "my-group1"
nameserver = "127.0.0.0:9876"
topicName = "string-topic"
TAGS = "tag-my-group1"
KEYS = "key-my-group1-0"
# 初始化生产者,并设置生产组信息,组名称使用全称,例:rocketmq-xxx|namespace_python%group1
producer = Producer(groupName)
# 设置服务地址
producer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
# producer.set_session_credentials(
#     accessKey,  # 角色密钥
#     secretKey,  # 角色名称
#     ''
# )
# 启动生产者
producer.start()

# 组装消息   topic名称,在控制台 topic 页面复制。
msg = Message(topicName)
# 设置keys
msg.set_keys(TAGS)
# 设置tags
msg.set_tags(KEYS)
# 消息内容
msg.set_body('This is a new message.')

# 发送同步消息
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
# 资源释放
producer.shutdown()

运行

yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ source /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/activate
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012857767267388CFD61230000 35
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ 

mqadmin查询topic状态

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus  -n localhost:9876 -t string-topic
#Broker Name                      #QID  #Min Offset           #Max Offset             #Last Updated
yeqiang-MS-7B23                   0     0                     36                      2023-08-28 09:03:35,722
yeqiang-MS-7B23                   1     0                     52                      2023-08-25 14:55:57,152
yeqiang-MS-7B23                   2     0                     33                      2023-08-25 16:21:35,646
yeqiang-MS-7B23                   3     0                     42                      2023-08-25 14:55:57,172
yeqiang-MS-7B23                   4     0                     1                       2023-08-25 16:21:34,355
yeqiang-MS-7B23                   5     0                     1                       2023-08-25 14:55:57,105
yeqiang-MS-7B23                   6     0                     4                       2023-08-25 16:23:01,489
yeqiang-MS-7B23                   7     0                     1                       2023-08-25 16:21:36,186

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicRoute  -n localhost:9876 -t string-topic
{
	"brokerDatas":[
		{
			"brokerAddrs":{0:"10.47.76.67:10911"
			},
			"brokerName":"yeqiang-MS-7B23",
			"cluster":"DefaultCluster",
			"enableActingMaster":false
		}
	],
	"filterServerTable":{},
	"queueDatas":[
		{
			"brokerName":"yeqiang-MS-7B23",
			"perm":6,
			"readQueueNums":8,
			"topicSysFlag":0,
			"writeQueueNums":8
		}
	]
}

图形工具rocketmq-dashborad

https://github.com/apache/rocketmq-dashboard

自行编译

mvn clean package -Dmaven.test.skip=true

启动

java -Drocketmq.namesrv.addr=127.0.0.1:9876 -jar target/rocketmq-dashboard-1.0.0.jar

 

 

(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012DF4226307248D16C3250000 36

 

 consoumer.py

import time
from rocketmq.client import PushConsumer, ConsumeStatus
# 消息处理回调

groupName = "my-group1"
nameserver = "127.0.0.0:9876"
topicName = "string-topic"
KEYS = "key-my-group1-0"


def callback(msg):
    # 模拟业务
    print('Received message. messageId: ', msg.id, ' body: ', msg.body)
    # 消费成功回复CONSUME_SUCCESS
    return ConsumeStatus.CONSUME_SUCCESS
    # 消费成功回复消息状态
    # return ConsumeStatus.RECONSUME_LATER


# 初始化消费者,并设置消费者组信息
consumer = PushConsumer(groupName)
# 设置服务地址
consumer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
# consumer.set_session_credentials(
#     accessKey,	 # 角色密钥
#     secretKey,   # 角色名称
#     ''
# )
# 订阅topic
consumer.subscribe(topicName, callback, "*")
print(' [Consumer] Waiting for messages.')
# 启动消费者
consumer.start()

while True:
    time.sleep(3600)
# 资源释放
consumer.shutdown()

启动python消费者

(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/consumer.py
 [Consumer] Waiting for messages.
Received message. messageId:  7F0001012DF4226307248D16C3250000  body:  b'This is a new message.'

可以看到my-group1已被消费 

再启动一个consumer.py,产生一次消息

可以看到,只有一个consumer消费到了消息,说明默认情况下,消息非广播模式。

Java生产一个消息:

training: Java SpringBoot SpringCloud k8s等练习程序 - Gitee.com

 

 

 

python rocketmq依赖

Release rocketmq-client-cpp-2.1.0 · apache/rocketmq-client-cpp · GitHub

python完整程序

python-rocketmq-demo: python3 rocketmq5 的一个例子

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/938582.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

免费数据恢复软件推荐,5步快速恢复数据!

“我是一名学生,前几天把小组汇报资料保存在电脑里,但是不知道为什么这个汇报文件丢失了,请问大家有什么免费的数据恢复软件可以帮我恢复这个文件吗?非常着急!” 数据丢失是每个电脑用户都可能面临的问题。无论是因为误…

github实用指令(实验室打工人入门必备)

​​​​​​​​博主进入实验室啦,作为一只手残党决定在这里分享一些常用的github使用情景和操作指南来解救其他手残党。 内容随着情景增加实时更新。如果只有没几个内容说明场景不多(相信对手残党而言是再好不过的消息) 情景一&#xff1a…

Java会因容器技术盛行而没落吗?

点击下方“JavaEdge”,选择“设为星标” 第一时间关注技术干货! 免责声明~ 任何文章不要过度深思! 万事万物都经不起审视,因为世上没有同样的成长环境,也没有同样的认知水平,更「没有适用于所有人的解决方案…

sdl环境搭建

sdl教程地址 https://lazyfoo.net/tutorials/SDL/index.php 新建C项目&#xff08;空项目&#xff09; 新增src和include目录 新建main.cpp文件 main.cpp #include <SDL/SDL.h> #include <iostream>int main(int argc, char* args[]) {if (SDL_Init(SDL_INIT…

一直在期待的基于 Ubuntu 的滚动发布 Rhino Linux 终于来了

导读现在我们就一起来看看 Rhino Linux 有哪些值得特别关注的地方。 Hands of an office woman typing 你可能还记得我们 去年 报道过&#xff0c;Rhino Linux 将会接替现已停止开发的 “Rolling Rhino Remix”。 经过漫长的等待&#xff0c;它的首个稳定版本终于发布了&…

Python采集关键词结果辅助写作

大家好&#xff01;在进行学术研究和 写作时&#xff0c;获取准确、全面的文献资料和相关研究成果是非常重要的。在本文中&#xff0c;我将与你分享使用Python爬虫 采集 学术关键词结果来辅助 写作的方法&#xff0c;帮助你快速获取与研究主题相关的学术文献和 。 **1. 设置搜索…

旺店通·旗舰奇门和金蝶云星空单据接口对接

旺店通旗舰奇门和金蝶云星空单据接口对接 来源系统:金蝶云星空 金蝶K/3Cloud结合当今先进管理理论和数十万家国内客户最佳应用实践&#xff0c;面向事业部制、多地点、多工厂等运营协同与管控型企业及集团公司&#xff0c;提供一个通用的ERP服务平台。K/3Cloud支持的协同应用包…

Spring Boot 整合 分布式搜索引擎 Elastic Search 实现 数据聚合

文章目录 ⛄引言一、数据聚合⛅简介⚡聚合的分类 二、DSL实现数据聚合⏰Bucket聚合⚡Metric聚合 三、RestAPI实现数据聚合⌚业务需求⏰业务代码实现 ✅效果图⛵小结 ⛄引言 本文参考黑马 分布式Elastic search Elasticsearch是一款非常强大的开源搜索引擎&#xff0c;具备非常…

洁净区环境监测如何操作?

洁净区环境监测 如何操作 洁净区洁净等级划分为&#xff1a; A级&#xff1a;指高风险操作区&#xff0c;如&#xff1a;灌装、放置胶塞桶、敞口安瓿瓶、敞口西林瓶的区域及无菌装配或连接操作的区域。通常用层流操作台&#xff08;罩&#xff09;来维持该区的环境状态。 B级…

【算法与数据结构】404、LeetCode左叶子之和

文章目录 一、题目二、解法三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、解法 思路分析&#xff1a;思路比较简单&#xff0c;遍历所有节点然后判断该节点是否为左叶子节点&#xff0c;如果是&#xff0c…

【React学习】—React中的事件绑定(八)

【React学习】—React中的事件绑定&#xff08;八&#xff09; 一、原生JS <body><button id"btn1">按钮1</button><button id"btn2">按钮2</button><button onclick"demo()">按钮3</button><scr…

华为云服务器以编译方式安装mysql(附带常见报错解决方案)

文章内容较长&#xff0c;请参考目录进行操作。 目录 1、检测系统是否自带mysql 2、下载MySQL 3、安装MySQL 4、环境变量配置 5、下载/升级依赖 1&#xff09;定位问题 2&#xff09;解决问题 gcc版本过低&#xff1a; 使用devtoolset来升级gcc版本 1)安装 centos-re…

实例045 使用任意组件拖动窗体

实例说明 通常将鼠标按住窗口的标题栏才能够拖动窗口&#xff0c;但是&#xff0c;在没有窗口标题栏的情况下如何拖动窗体呢&#xff1f;本例将会利用窗口中的控件拖动窗口&#xff0c;将鼠标放在按钮上然后按住鼠标左键移动鼠标即可拖动窗体。实例效果如图1.46所示。 技术要点…

​什么是502 bad gateway 报错和解决办法

什么是502 bad gateway 报错 简单来说 502 是报错类型代码 bad gateway 错误的网关。是Web服务器作为网关或代理服务器时收到无效的响应。 用我们的口语说就是运行网站的服务器暂时挂了(不响应)。 产生错误的原因 1.连接超时 我们向服务器发送请求 由于服务器当前链接太多&am…

vue三级市区联动

默认返回值格式&#xff1a;all:code、name都返回 name:只返回name code:只返回code&#xff0c;level&#xff1a;可设置显示层级 1&#xff1a; 省 2&#xff1a; 省、市 3&#xff1a; 省、市、区 v-model 默认值 可以是 name: [ "天津市", "天津市",…

Datatable:Python数据分析提速高手,飞一般的感觉!

1 前言 Datatable是一个Python库&#xff1a; 详细介绍大家可以去官网查看&#xff1a; https://datatable.readthedocs.io/en/latest/?badgelatest Datatable的有点包括&#xff1a; 高效的多线程算法 Memory-thrifty 内存映射磁盘上的数据集 本地C实现 完全开源 Da…

iTunes怎么备份?1招教你轻松搞定

相比于苹果手机的iCloud备份&#xff0c;使用iTunes备份具有以下优点&#xff1a;1、备份容量不受限制&#xff1b;2、备份后的文件就像普通文档一样&#xff0c;可以随时进行查看和管理。本文将为大家介绍itunes怎么备份、如何对备份进行加密以及怎么删除备份的方法&#xff0…

nginx配置keepalive长连接

nginx之keepalive详解与其配置_keepalive_timeout_恒者走天下的博客-CSDN博客 为什么要有keepalive? 因为每次建立tcp都要建立三次握手&#xff0c;消耗时间较长&#xff0c;所以为了减少tcp建立连接需要的时间&#xff0c;就可以设置keep_alive长连接。 nginx中keep_alive对…

Java:Map集合的三种遍历方式和常见案例

Map集合的遍历方式 方式一&#xff1a;键找值 遍历方式二&#xff1a;键值对 遍历方式三&#xff1a;Lambda表达式 Map集合的常见案例 需求 某个班级80名学生&#xff0c;现在需要组织秋游活动&#xff0c;班长提供了四个景点依次是(A、B、C、D),每个学生只能选择一个景点&am…

阿里云CDN缓存预热与刷新以及常见的故障汇总

文章目录 1.为CDN缓存的文件增加过期时间2.CDN缓存预热配置3.CDN缓存刷新配置4.常见故障 CDN缓存预热指的是主动将要缓存的文件推送到全国各地的CDN边缘加速器上&#xff0c;减少回源率&#xff0c;提供命中率。 缓存刷新指的是后期上传了同名的文件&#xff0c;之前的缓存已经…