RocketMQ常用基本操作

news2025/1/20 18:20:08

文章中的rabbitmq使用的是rocketmq-all-5.1.3-bin-release版本,需要安装包的可自行下载

RockerMQ启动停止命令

启动命令

nohup sh bin/mqnamesrv &

nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

查看日志

tail -f ~/logs/rocketmqlogs/namesrv.log

tail -f ~/logs/rocketmqlogs/proxy.log

停止命令

sh bin/mqshutdown broker

sh bin/mqshutdown namesrv

集群状态

sh mqadmin clusterList -n 127.0.0.1:9876

创建topic

sh mqadmin updateTopic -n 127.0.0.1:9876 rocket_test

查看所有topic信息

sh mqadmin topicList -n 127.0.0.1:9876

sh mqadmin topicList -n 127.0.0.1:9876 -c

查看 Topic 路由信息

sh mqadmin topicRoute -n 127.0.0.1:9876 -t TopicTest

发送测试消息

export NAMESRV_ADDR=localhost:9876

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

消费消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

Java代码收发消息

Producer

package com.rocket.demo;

import com.alibaba.fastjson.JSON;

import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.common.message.Message;

import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.HashMap;

import java.util.Map;

public class RocketProducerDemo {

    private final static String nameServer = "127.0.0.1:9876";

    private final static String producerGroup = "my_group2";

    // debezium-mysql-source-topic topic-test

    private final static String topic = "TopicTest";

    public static void main(String[] args) {

        try {

            // 初始化一个producer并设置Producer group name

            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);

//            DefaultMQProducer producer = new DefaultMQProducer();

            // 设置NameServer地址

            producer.setNamesrvAddr(nameServer);

            // 启动producer

            producer.start();

            for (int i = 0; i < 100; i++) {

                Map<String, String> data = new HashMap();

                data.put("id", i+"");

                data.put("name", i+","+System.currentTimeMillis());

                // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤

                Message msg = new Message("TopicTest", "tagA", JSON.toJSONString(data).getBytes(RemotingHelper.DEFAULT_CHARSET));

                // 利用producer进行发送,并同步等待发送结果

                SendResult sendResult = producer.send(msg, 10000);

                System.out.println(sendResult);

            }

            // 一旦producer不再使用,关闭producer

            producer.shutdown();

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

}

Consumer

package com.rocket.demo;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class RocketConsumerDemo {

    public static void main(String[] args) throws Exception {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_group");

        consumer.setNamesrvAddr("localhost:9876");

        // debezium-mysql-source-topic  topic-test debezium-mysql-source db-history-debezium-topic debezium-mysql-source

        consumer.subscribe("TopicTest", "*"); // 订阅主题和标签,* 表示订阅所有标签

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {

                for (MessageExt message : messages) {

                    System.out.println("Received message: " + new String(message.getBody()));

                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

            }

        });

        consumer.start();

        System.out.println("Consumer started");

    }

}

常见问题

service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL: 0.96 CQ: 0.96 INDEX: 0.96], messages are put to the slave, message store has been shut down

错误原因:博主测试的服务器磁盘使用率到0.96了,rocketmq不允许磁盘超过0.9,清理下磁盘数据即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1880698.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

sqlmap注入详解

免责声明:本文仅做分享... 目录 1.介绍 2.特点 3.下载 4.帮助文档 5.常见命令 指定目标 请求 HTTP cookie头 HTTP User-Agent头 HTTP协议的证书认证 HTTP(S)代理 HTTP请求延迟 设定超时时间 设定重试超时 设定随机改变的参数值 利用正则过滤目标网址 避免过多的…

江大白 | 何凯明入职 MIT,首次带队提出Diffusion Loss,扩散模型思想提升生成速度和效果 !

本文来源公众号“江大白”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;何凯明入职 MIT&#xff0c;首次带队提出Diffusion Loss&#xff0c;扩散模型思想提升生成速度和效果 &#xff01; 导读 在图像生成领域中&#xff0c;作…

在数字化转型中,中小企业如何打造数字化产品和服务?

引言&#xff1a;随着社会的发展和消费者行为的变化&#xff0c;市场对数字化产品和服务的需求日益增长。中小企业需要紧跟这一趋势&#xff0c;通过开发数字化产品和服务来满足消费者的新需求。云计算、大数据、人工智能等先进技术的出现&#xff0c;为中小企业提供了更多的机…

志愿填报指南:为什么我强烈建议你报考计算机专业

首先恭喜2024届高考的同学们&#xff0c;你们已经通过了高考的考验&#xff0c;即将进入人生的新阶段——大学。 现在正是高考完填报志愿的时刻&#xff0c;Left听到身边朋友提到报考志愿的诸多问题&#xff1a; 志愿填报怎么填&#xff1f;我要报考什么专业&#xff1f;这个…

基于线调频小波变换的非平稳信号分析方法(MATLAB)

信号处理领域学者为了改进小波变换在各时频区间能量聚集性不高的缺点&#xff0c;有学者在小波分析基础上引入调频算子构成了线性调频小波变换&#xff0c;线调频小波一方面继承了小波变换的理论完善性&#xff0c;另一方面用一个新的参数&#xff08;线调频参数&#xff09;刻…

ONLYOFFICE 桌面编辑器 8.1 现已发布:功能完善的 PDF 编辑器、幻灯片版式、改进从右至左显示、新的本地化选项等

继 ONLYOFFICE 文档 8.1 发布后&#xff0c;适用于 Linux、Windows 和 macOS 的 ONLYOFFICE 桌面应用程序最新版本也已推出。它具有在线套件的最主要功能&#xff0c;例如功能齐全的 PDF 编辑器、演示文稿中的幻灯片版式、改进的 RTL 支持、新的本地化选项等。 点击进入ONLYOF…

vue中【事件修饰符号】详解

在Vue中&#xff0c;事件修饰符是一种特殊的后缀&#xff0c;用于修改事件触发时的默认行为。以下是Vue中常见的事件修饰符的详细解释&#xff1a; .stop 调用event.stopPropagation()&#xff0c;阻止事件冒泡。当你在嵌套元素中都有相同的事件监听器&#xff08;如click事件…

100张linux C/C++工程师面试高质量图

文章目录 杂项BIOSlinux开机启动流程内核启动流程网络编程网络编程流程tcp状态机三次握手四次断开reactor模型proactor模型select原理poll原理epoll原理文件系统虚拟文件系统文件系统调用阻塞IO非阻塞IO异步IO同步阻塞同步非阻塞IO多路复用进程管理进程状态程序加载内存管理MMU…

【高中数学/基本不等式】已知:x,y皆为正实数,且2xy+x+6y=6 求:x+2y的最小值

【题目】 已知&#xff1a;x,y皆为正实数&#xff0c;且2xyx6y6 求&#xff1a;x2y的最小值 【解答】 解法一&#xff1a;因为2xyx6y6 可转换为(x3)(2y1)-36 得到(x3)(2y1)9 而x2yx3-32y1-1 (x3)(2y1)-4 >2*根号下[(x3)(2y1)]-4 2*3-4 2 解法二&#xff1a…

5.4符号三角形问题

#include<iostream> #include<stdio.h> using namespace std; int half; int ssum; int cnt0;//减号的个数 int n; int p[100][100]; int countt0; void BackTrack(int s) {if(cnt>half||s*(s-1)/2-cnt>half)return ;if(s>n){countt;return ;}for(int i0;…

【从零开始学架构 架构基础】五 架构设计的复杂度来源:低成本、安全、规模

架构设计的复杂度来源其实就是架构设计要解决的问题&#xff0c;主要有如下几个&#xff1a;高性能、高可用、可扩展、低成本、安全、规模。复杂度的关键&#xff0c;就是新旧技术之间不是完全的替代关系&#xff0c;有交叉&#xff0c;有各自的特点&#xff0c;所以才需要具体…

Android Studio环境搭建(4.03)和报错解决记录

1.本地SDK包导入 安装好IDE以及下好SDK包后&#xff0c;先不要管IDE的引导配置&#xff0c;直接新建一个新工程&#xff0c;进到开发界面。 SDK路径配置&#xff1a;File---->>Other Settings---->>Default Project Structure 拷贝你SDK解压的路径来这&#xff0c;…

Hugging Face发布重量级版本:Transformer 4.42

Hugging Face 宣布发布Transformer 4.42&#xff0c;该版本为流行的机器学习库带来了许多新功能和增强功能。此版本引入了几个高级模型&#xff0c;支持新工具和检索增强生成 &#xff08;RAG&#xff09;&#xff0c;提供 GGUF 微调&#xff0c;并整合了量化的 KV 缓存&#x…

2029年AI服务器出货量将突破450万台,AI推理服务器即将爆发式增长

在2020年&#xff0c;新冠疫情与远程办公模式的兴起推动了所有类型服务器的出货量达到峰值&#xff0c;随后几年里&#xff0c;除了AI服务器之外的所有类别都回归到了正常水平。 根据Omdia的研究数据&#xff0c;AI服务器的出货量在2020年急剧上升&#xff0c;并且至今未显示出…

每日一题——Python实现PAT乙级1073 多选题常见计分法(举一反三+思想解读+逐步优化)9千字好文

一个认为一切根源都是“自己不够强”的INTJ 个人主页&#xff1a;用哲学编程-CSDN博客专栏&#xff1a;每日一题——举一反三Python编程学习Python内置函数 Python-3.12.0文档解读 目录 初次尝试 再次尝试 有何不同 版本一&#xff08;原始版本&#xff09;&#xff1a;…

CVE-2019-12272 Openwrt可视页面LuCi命令注入漏洞复现(完结)

声明 本文所使用的一些源代码等内容已经上传至github&#xff0c;具体地址如下 Vulnerability_POC-EXP/OpenWrt/CVE-2019-12272 at main a2148001284/Vulnerability_POC-EXP GitHub 漏洞简介 参考内容&#xff1a; CVE-2019-12272 OpenWrt图形化管理界面LuCI命令注入分析 |…

C# YoloV8 模型效果验证工具(OnnxRuntime+ByteTrack推理)

C# YoloV8 模型效果验证工具(OnnxRuntimeByteTrack推理) 目录 效果 项目 代码 下载 效果 模型效果验证工具 项目 代码 using ByteTrack; using OpenCvSharp; using System; using System.Collections.Generic; using System.Diagnostics; using System.Drawing; using Sys…

Knife4j 2.2.X 版本 swagger彻底禁用

官方文档配置权限&#xff1a;https://doc.xiaominfo.com/v2/documentation/accessControl.html#_3-5-1-%E7%94%9F%E4%BA%A7%E7%8E%AF%E5%A2%83%E5%B1%8F%E8%94%BD%E8%B5%84%E6%BA%90 通常有时候我们碰到的问题如下&#xff1a; 在开发Knife4j功能时,同很多开发者经常讨论的问…

通用管理页面的功能实现

在Windows Forms&#xff08;WinForms&#xff09;应用程序中&#xff0c;创建一个通用的管理页面通常涉及对数据的增删改查&#xff08;CRUD&#xff09;操作&#xff0c;以及一些额外的功能&#xff0c;如数据过滤、排序、导出和导入等。 先看一个仓库管理页面要素。 仓库管…

基于elastic stack的docker-compose部署的ELK与LDAP集成

说明&#xff1a; ldap信息配置到es配置文件上&#xff0c;然后kibana读取es的配置信息 用户与角色的关系通过role_mapping.yml文件配置获取 角色与权限的关系通过elastic stack提供的DevTools或API进行维护 一、前置条件&#xff1a; 1.1 es已开启xpack&#xff08;已开启…