Linux 下 RocketMQ 安装、配置与运维(详细讲解)

news2025/1/12 1:08:10

一  RocketMQ 下载安装

1 下载 RocketMQ:        

    下载当前最新版本RocketMQ

   官网下载:    https://dist.apache.org/repos/dist/release/rocketmq/5.3.0/rocketmq-all-5.3.0-bin-release.zip

wget https://dist.apache.org/repos/dist/release/rocketmq/5.3.0/rocketmq-al                                                                                                            l-5.3.0-bin-release.zip

  执行下载图:

  下载成功图:

2 安装RocketMQ

  安装过程非常简单,解压RocketMQ压缩包即可

unzip rocketmq-all-5.3.0-bin-release.zip

    解压过程中: 

3 验证安装

1 启动NameServer

  以后台启动NameServer服务:

nohup sh bin/mqnamesrv &

  执行后看到创建了个后台进程,但此时并无法看到日记

打开日记查看执行效果:

tail -f nohup.out

2 启动Broker

启动Broker 可以加上--enable-proxy 方式启动代理,也可正常启动不使用代理,如下:

# 开启代理方式启动
nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

# 默认不使用代理方式启动
nohup sh bin/mqbroker -n localhost:9876

启动成功如下图: 

启动成功, 看一下brokerIP xx.xx.xx.xx10911 如果是内网IP外网是无法访问的,需要配置外网IP,云服务器如果使用默认配置一般是内网IP

 
配置broker外网IP

 rocketMQ主目录\conf\broker.conf

brokerIP1=外网IP地址

 增加后如下图: 

3 测试连通

  使用自带工具验证本地环境:

  生产端发送测试

#先设置工具依赖变量
export NAMESRV_ADDR=localhost:9876

#测试生产端发送
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

执行如下: 

 

从上图中可以看到send_ok,说明生产端已正常发送信息到队列。

测试接收端:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

执行后则到队列的信息如下图:

按ctrl+c结束接收测试。 

从上面看,本地生产发送与接收数据正常,基本可以判断本地安装正常。

注意事项

broket 启动时默认启动脚本内存参数是使用8G内存。如果您的内存足够可以继续增加,如果内存有限则要缩小, 如果内存小于8G可能存在报错:

修改默认内存,文件位置:bin/runbroker.sh,如下图:

用vi打开脚本,找到配置内存参数如下图:

配置项:JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g",根据实际内存业务情况,变更-Xms8g -Xmx8g参数大小即可。

二 JAVA客户端连接

1 mvn 项目引入依赖如下:

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.5.2</version>
        </dependency>

2 生产发送端java代码:

 java生产端发送消息代码:


import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

@Service
public class MQRocketServiceImpl {

    private DefaultMQProducer producer;

    private static final Logger logger = LogManager.getLogger(MQRocketServiceImpl.class);

    @PostConstruct
    public void initProducer() throws MQClientException {
        producer = new DefaultMQProducer("CONSUMER_GROUP");
        producer.setNamesrvAddr(xx.xx.xx.xx:9876);
        producer.setInstanceName(RunTimeUtil.getRocketMqUniqeInstanceName());
        producer.start();
    }

    @PreDestroy
    public void shutdownProducer() {
        if (producer != null) {
            producer.shutdown();
        }
    }

    public boolean sendMsg(String text, String key) {
        try {
            Message msg = new Message(
                    MqCfg.TOPIC,
                    MqCfg.SUB_EXPRESSION,
                    key,
                    text.getBytes(StandardCharsets.UTF_8) // 使用标准字符集
            );
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    logger.info("Message sent successfully: {}", sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    logger.error("Failed to send message", e);
                }
            });
            return true;
        } catch (UnsupportedEncodingException | MQClientException | RemotingException | InterruptedException e) {
            logger.error("Error sending message", e);
            return false;
        }
    }
}

执行后发送成功打印结果:

 3 服务端接收代码:

 java服务端接收代码


import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class MsgReceiveServiceImpl implements ApplicationRunner {

    @Autowired
    private PackageHandlerImpl packageHandler;

    private static final Logger logger = LogManager.getLogger(MsgReceiveServiceImpl.class);

    @Override
    public void run(ApplicationArguments args) {
        receiveQueue();
    }

    private void receiveQueue() {
       

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MqCfg.CONSUMER_GROUP);
        consumer.setNamesrvAddr("xx.xx.xx.xx:9076");

        try {
            consumer.subscribe(MqCfg.TOPIC, MqCfg.SUB_EXPRESSION);
            consumer.registerMessageListener((MessageListenerOrderly) this::processMessages);
            consumer.start();
            logger.info("MQ消费者启动成功。");
        } catch (MQClientException e) {
            logger.error("MQ消费者启动失败!", e);
            throw new RuntimeException("连接MQ错误,启动失败!", e);
        }
    }


    private ConsumeOrderlyStatus processMessages(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
            try {
                String text = new String(msg.getBody());
                //消息处理……
            } catch (Exception e) {
                logger.error("处理消息出错,key={},错误信息:", msg.getKeys(), e);
                // 可以在此处根据业务需求返回 SUSPEND_CURRENT_QUEUE_A_MOMENT,或者选择其他处理方式
            }
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
}

 以上代码实现ApplicationRunner接口,使其作为后台一个线程单独处理消息。

三 RocketMQ 运维

1 RocketMQ面板,可视化管理

RocketMQ与apacheMQ不同,本身没有自带面板查看状态工具,查看队列的状态要依赖命令行这对运维或开发都非常不方便,需要另外安装Rocket MQ 面板工具rocketmq-dashboard:

开源地址:https://github.com/apache/rocketmq-dashboard

下载回来后直接用IDE开发环境运行,也可以打包后放在服务器一起运行,如下图:

开发环境运行视图如下 

除了一般的统计信息,还可以进行管理,功能丰富,如下图:

有此神器作观察,相信运维不再什么难事。

2 RocketMQ变更默认端口

1 修改方式一(4.x ,以前的版本是可以的):

 要更改 RocketMQ 的本地部署中的端口,您需要修改 RocketMQ 的配置文件。RocketMQ 的配置  主要包括 broker.confnamesrv.conf 这两个文件。

  1. 找到配置文件

    • 找到 RocketMQ 的安装目录下的 conf 文件夹。
    • 在 conf 文件夹中,你会看到 broker.conf 和 namesrv.conf
  2. 修改 Broker 端口

    • 打开 broker.conf 文件。
    • 查找 listen_port 这一行,这是 Broker 的监听端口,默认通常是 10911
    • 更改 listen_port 的值为所需的端口号。
  3. 修改 NameServer 端口

    • 打开 namesrv.conf 文件。
    • 查找  NAMESRV_PORT 这一行,这是 NameServer 的监听端口,默认通常是 9876
    • 更改 NAMESRV_PORT 的值为所需的端口号。
2 端口修改方式二(当前最新的版本5.x)

在使用上面的方式修改端口后发现失效,只能查看源代码:

看NameSrv模块源码发现服务固定是9876,后面通加载参数c 判断配置文件路径加载,如下图:

跟进MixAll.properties2Object,发现只是根据类的参数与类型匹配加载,如下图:

直接看配置参数类:

这次我们要改端口,所以新建个配置文件,只需增加端口配置即可:

文件内容就一个字段: listenPort = xxxx 端口号,如下图:

启动测试,修改成功如下图:

# 启动名称服务  xxx.conf是配置文件路径,可以使用相对路径
nohup sh bin/mqnamesrv -c xxx.conf &

查看了 broker 模块源码发现启动也是一样,所以端口修改方式也是同上 ,修改后启动成功如下图:

3 启停脚本

  1 启动RocketMQ脚本:

  启动脚本,只需执行脚本就可以快速启动MQ,以下是启动脚本代码如下:

#!/bin/bash

# 启动 Nameserver
echo 'Starting MQ NameServer...'
nohup sh bin/mqnamesrv > mq.log 2>&1 &
sleep 5

# 检查 Nameserver 是否启动成功
if ps aux | grep -v grep | grep -q 'mqnamesrv'; then
    echo 'MQ NameServer started successfully.'
else
    echo 'Failed to start MQ NameServer.'
    exit 1
fi

# 启动 Broker
echo 'Starting MQ Broker...'
nohup sh bin/mqbroker -n 0.0.0.0:9876 >> mq.log 2>&1 &
sleep 5

# 检查 Broker 是否启动成功
if ps aux | grep -v grep | grep -q 'mqbroker'; then
    echo 'MQ Broker started successfully.'
else
    echo 'Failed to start MQ Broker.'
    exit 1
fi

# 显示日志
tail -f mq.log

执行后同时打印日记,退出只需按ctrl+c即可。执行启动脚本成功如下图:

  2 停止脚本: 

  停止RocketMQ脚本:

#!/bin/bash

# 关闭 Nameserver
echo 'Closing MQ NameServer...'
sh bin/mqshutdown namesrv

# 检查 Nameserver 是否成功关闭
sleep 5
if ! ps aux | grep -v grep | grep -q 'mqnamesrv'; then
    echo 'MQ NameServer closed successfully.'
else
    echo 'Failed to close MQ NameServer.'
    exit 1
fi

# 关闭 Broker
echo 'Closing MQ Broker...'
sh bin/mqshutdown broker

# 检查 Broker 是否成功关闭
sleep 20
if ! ps aux | grep -v grep | grep -q 'mqbroker'; then
    echo 'MQ Broker closed successfully.'
else
    echo 'Failed to close MQ Broker.'
    exit 1
fi

执行停止脚本成功停止,如下图:

注意:停止broker服务花的时间通常比较长,如果显示停止失败可以多次调用或者在脚本延长等待时间即可。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2053306.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

安装搭建MongoDB及配置副本集

目录 一、什么是MongoDB的副本集 简介 &#xff08;1&#xff09;冗余和数据可用性 &#xff08;2&#xff09;MongoDB中的复制 &#xff08;3&#xff09;主从复制和副本集区别 二、副本集的架构 三、副本集的成员 四、部署副本集 1、节点划分 2、安装MongoDB 2.1、…

数据结构与算法——平衡二叉树

1、基本介绍 1&#xff09;平衡二叉树又叫平衡二叉搜索树(Self-balanceing binary search tree)&#xff0c;又被称为AVL树&#xff0c;可以保证查询效率较高。 2&#xff09;具有以下特点&#xff1a;它是一颗空树或它的左右两颗子树的高度差绝对值不超过1&#xff0c;并且左…

网络热门编程项目导学:尚医通

本文作者&#xff1a;程序员鱼皮 免费编程学习 - 编程导航网&#xff1a;https://www.code-nav.cn 现在网上有很多播放量巨高的免费编程项目教程&#xff0c;很多学编程的同学可能都看过&#xff0c;就导致大家可能写在简历上的内容都差不多。 于是就有了下面这张图&#xff1…

python之matplotlib (3 坐标轴设置)

写在前面 在说明坐标轴设置之前&#xff0c;我有必要和大家说清楚图像设置的一些方法&#xff0c;避免陷入困扰模糊的地步。前面我们说过&#xff0c;画图的三种方法&#xff08;python之matplotlib &#xff08;1 介绍及基本用法&#xff09;-CSDN博客&#xff09;。而设置也…

2024年证券从业资格考试题型特点及答题技巧

考试题型、题量、分值 证券从业科目题型题量&#xff1a; 一、单选题(每题0.5分&#xff0c;共40题&#xff0c;共20分) 下列每小题的四个选项中&#xff0c;只有一项是最符合题意的正确答案&#xff0c;多选、错选或不选均不得分。 二、多选题(每题1分&#xff0c;共40题&…

我的Vue2/Vue3知识框架汇总

文章目录 一、前言二、Vue3篇Vue3 相对于 Vue2 做了哪些更新&#xff1f;​Vue3响应式Vue3响应式特点​Object.defineProperty 与 Proxy 的区别​什么是Proxy&#xff1f;​为什么需要 Reflect&#xff1f;(目标对象内部的this指向问题)​Vue3 惰性响应式​Proxy 只会代理对象的…

图片文件比较大怎么办?分享4个简单的在线压缩图片工具

现在经常将图片发布到网上用来展示&#xff0c;但是随着图片质量越高相应的文件也比较大&#xff0c;在遇到图片文件较大问题时&#xff0c;经常会无法正常上传到网站使用&#xff0c;所以一般需要使用图片压缩功能来调整大小后使用。对于经常需要处理图片的小伙伴来说&#xf…

Notion 插件开发入门

Notion 插件开发入门 最近想要用 Notion 记笔记&#xff0c;奈何 Notion 的标签分类功能确实不太好用…… 看了看其它文章中配置多级标签的繁杂流程之后&#xff0c;我觉得还是写一个插件比较靠谱…… 本文主要介绍 Notion 简单的插件开发&#xff0c;编程语言使用 JavaScrip…

打工人的“低成本的高生产力”之ToDesk云电脑

在快节奏的现代生活中&#xff0c;是不是经常觉得钱包瘪得快&#xff0c;但工作压力却大得要命&#xff1f;想要提升效率&#xff0c;又不想掏空腰包&#xff1f;来来来&#xff0c;作为职场老油条&#xff0c; 今儿给你们安利个神器——ToDesk云电脑&#xff0c;简直是咱们打工…

子域名太多如何实现HTTPS?一张通配符SSL证书全搞定

在当今数字化时代&#xff0c;网站安全性已经成为网站运营者以及访问者都非常关注的重要问题。部署SSL证书实现HTTPS加密&#xff0c;确保数据传输安全&#xff0c;防止信息被泄露或篡改&#xff0c;消除浏览器“不安全”提示&#xff0c;提高网站安全性以及可信任度已成为必然…

期权应该怎么及时止损?期权止损有哪些方式?

今天带你了解期权应该怎么及时止损&#xff1f;期权止损有哪些方式&#xff1f;如何在期权的交易计划中设置合适的止损点”是相对简单的事情。 如果自己遭遇了一定的损失&#xff0c;就要及时止损。 一般来说如果亏损接近30%的时候就可以进行止损了。但是不同的投资者资金实力…

集合及数据结构第二节————算法、时间复杂度和空间复杂度

系列文章目录 集合及数据结构第二节————算法、时间复杂度和空间复杂度 算法、时间复杂度和空间复杂度 数据结构和算法的关系.算法的定义算法的特性算法设计的要求算法效率时间复杂度的概念大O的渐进表示法常见时间复杂度计算举例常见空间复杂度计算举例 文章目录 系列文…

关于windows环境使用nginx的一些性能问题

遇到的问题 最近在一个windows环境中部署nginx&#xff0c;遇到了以下问题&#xff1a; 1. nginx启动了九个线程&#xff08;1master8woekr&#xff09;&#xff0c;但是所有链接都被1个woker接收&#xff0c;其余worker不工作 2. 用户端访问web很慢&#xff0c;登录服务器使…

xcode配置使用摄像头和相册权限,没有Info.plist文件也可以配置,解决Thread 4: signal SIGABRT报错问题

最新的Xcode更改了相册和相机的权限关键字&#xff0c;在进行真机调试&#xff0c;或真正在用户使用的时候需要添加这些权限&#xff0c;否则在程序正确时仍然会产生下面的错误&#xff1a; Thread 4: signal SIGABRT This app has crashed because it attempted to access pri…

【Web APIs】JavaScript 操作元素 ④ ( 修改元素属性示例 | 密码表单标签结构 | 密码输入框样式设置 | 右侧图标按钮设置 | JavaScript 修改元素属性示例 )

文章目录 一、案例需求二、关键要点1、密码表单标签结构2、设置盒子样式3、密码输入框样式设置4、右侧图标按钮设置5、盒子模型右侧图标按钮设置 三、JavaScript 修改元素属性示例四、完整代码示例 JavaScript 中 可以通过 DOM ( 文档对象模型 ) 操作 来 修改网页的 内容 , 结构…

牛客JS题(四十六)斐波那契数列

注释很详细&#xff0c;直接上代码 涉及知识点&#xff1a; 递归斐波那契数列 题干&#xff1a; 我的答案 <!DOCTYPE html> <html><head><meta charset"UTF-8" /><style>/* 填写样式 */</style></head><body><!-…

宝马销量崩了,不卷价格就卷铺盖

文 | AUTO芯球 作者 | 雷慢 宝马这回真天塌了&#xff0c; 还记得7月初宝马宣布涨价吗&#xff0c; 我当初就劝我那准备宝马i3的同学说&#xff0c; 别急&#xff0c;怎么涨上去的就会怎么跌回去。 这不&#xff0c;一涨价&#xff0c;价格是保住了&#xff0c;但是销量惨…

【获取本机简要配置信息】(bat)

输出结果(示例)如下 如果提示 ‘系统找不到指定的路径’ 请把set Log那行的路径换一下&#xff0c;换成一个存在的路径就行 比如直接放C盘 set LogC:\本机配置信息.txt 如果提示 “客户端没有所需的特权” 请右键后选择 “以管理员身份运行” 上代码 echo off::设置信息保存路…

【17】暴力递归改dp(下)

目录 一.两人玩游戏 二.象棋游戏 三.鲍勃存活 四.凑钱方案数问题 一.两人玩游戏 题目&#xff1a;有一个正整数数组&#xff0c;A和B两个人轮流拿数组的最左或最右的数值&#xff0c;返回最终的最高分数。 暴力递归版本 public static int win1(int[] arr) {if (arr null…

Chat App 项目之解析(三)

Chat App 项目介绍与解析&#xff08;一&#xff09;-CSDN博客文章浏览阅读76次。Chat App 是一个实时聊天应用程序&#xff0c;旨在为用户提供一个简单、直观的聊天平台。该应用程序不仅支持普通用户的注册和登录&#xff0c;还提供了管理员登录功能&#xff0c;以便管理员可以…