RocketMQ单机部署完整学习笔记

news2024/9/21 18:45:38

文章目录

  • 前言
  • 一、RocketMQ是什么?
  • 二、使用步骤
    • 1.安装MQ
      • 1.安装JDK
      • 2.安装mq
      • 3.MQ配置(核心)
    • 2.搭建可视化dashboard
      • 1.下载源码
      • 2.修改配置
      • 3.启动
    • 3.整合java
      • 1.生产者
      • 2.消费者
      • 3.启动生产者
      • 4.启动消费者
      • 5.dashboard添加消费组
  • 三、总结
    • 全部的配置

前言

本文是基于4.X版本RocketMQ,从MQ的搭建,消息推送和消费以及dashboard的使用

一、RocketMQ是什么?

参考文档 https://rocketmq.apache.org/zh/docs/4.x/
重点角色如下

  • Producer:消息的发送者;举例:发信者
  • Consumer:消息接收者;举例:收信者
  • Broker:暂存和传输消息;举例:邮局
  • NameServer:管理Broker;举例:各个邮局的管理机构
  • Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息;举例
  • tag:消息标签,方便服务器过滤使用

二、使用步骤

1.安装MQ

首先安装jdk 再安装mq

1.安装JDK

  1. 查看Linux系统是否有自带的jdk
java -version

如果有 输入 rpm -qa | grep java 检测jdk的安装包
接着进行一个个删除包,输入:rpm -e --nodeps +包名
最后再次:rpm -qa | grep java检查是否删除完即可

  1. 下载jdk

https://www.oracle.com/java/technologies/downloads/#java8
资源连接地址
还在审核中
3. 上传jdk到linux服务器
在这里插入图片描述

  1. 解压jdk
tar -zvxf jdk-8u241-linux-x64..tar.gz
  1. 配置环境变量
    用vim /etc/profile进入编辑状态,加入下边这段配置
    JAVA_HOME 根据自己的解压路径来写
JAVA_HOME=/usr/local/jdk/jdk1.8.0_241
JRE_HOME=$JAVA_HOME/jre
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATH

  1. 重新加载配置
source /etc/profile
  1. 进行测试
java -version

在这里插入图片描述

2.安装mq

  1. 下载mq
    连接 https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-source-release.zip

  2. 解压上传
    我下载的是公司的mq是4.8的,官网链接给的是根4.9的,这个问题不大,不影响

目录结构如下
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

  1. 配置环境变量
vim /etc/profile
# 在末尾加入下面配置 路径和自己解压的mq路径一直 上一步有截图
export ROCKETMQ_HOME=/bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0
# 使环境变量生效
source /etc/profile

3.MQ配置(核心)

这一步很重要,配置完这里,那mq就算部署好了

  1. 修改runserver.sh
    默认配置比较大,修改启动大小
## cd /bin
vim runserver.sh
## 修改启动大小
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

  1. 启动服务nameserver
## 启动
nohup sh bin/mqnamesrv &
## 关闭
sh bin/mqshutdown namesrv

出现下面就算启动成功了
The Name Server boot success. serializeType=JSON
注意目录别进错了
在这里插入图片描述

  1. 指定NameServer地址
    相当于 broker注册到nameserver上
vim /etc/profile
# 在末尾加入下面配置 有多个时以分号隔开,这个是集群时使用的 mq端口默认是9876 
# 192.168.141.101是服务器地址
export NAMESRV_ADDR=192.168.141.101:9876
# 使环境变量生效
source /etc/profile
  1. 修改 runbroker.sh

修改启动参数

vim runbroker.sh
## 
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"

  1. 修改broker.conf
    重要,核心配置,以后关于mq服务的配置都在这里
    conf目录下
    2m-2s-async
    2m-2s-sync
    2m-noslave
    dledger
    这四个目录是集群配置时会用到,我们这路是单机的先不管
vim broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
#brokerid,0就表示是Master,>0的都是表示
brokerId = 0
# 这个就是第三三步配置的export NAMESRV_ADDR=192.168.141.101:9876 多个以分号分割
namesrvAddr=192.168.141.101:9876
#如果是多网卡的机器,比如云服务器,那么需要在broker.conf中增加brokerIP1属性,
#指定所在机器的外网网卡地址
brokerIP1=192.168.141.101
#对外服务的监听端口
listenPort=10911
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
traceTopicEnable=true
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true 


在这里插入图片描述
在这里插入图片描述

  1. 启动broker
    进入bin目录
    注意 -c 很多人启动不起来就是没加上
#启动
nohup sh  bin/mqbroker -c conf/broker.conf &
# 关闭
sh bin/mqshutdown broker

查看nohup.out
出现这样的就说明启动成功了
The broker[broker-a, 192.168.141.101:10911] boot success. serializeType=JSON and name server is 192.168.141.101:9876

在这里插入图片描述

  1. 到此mq服务已启动完成
jps

在这里插入图片描述

  1. 查看日志
## 查看namesrv日志
 tailf /root/logs/rocketmqlogs/namesrv.log 
 ## broker日志
 tailf /root/logs/rocketmqlogs/broker.log 

其实前面启动的时候查看这里的日志也可以看是否启动没
停掉时先停broker 再停namesrv 启动先启 namesrv 再启动broker 因为broker 需要注册到namesrv 上

  1. 发送和接收消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

在这里插入图片描述

在这里插入图片描述

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

在这里插入图片描述

2.搭建可视化dashboard

1.下载源码

点击下面直接下载4.X的
https://github.com/apache/rocketmq-dashboard/tree/release-1.0.0
说下
现在MQ已经到5.X了,但是现在还保留着4的,分支下拉到最后可以看到一个relaese-1.0.0
这个就是4.X用的,下载下来后解压
切记版本要对上,不然你和我一样折腾个一两天

在这里插入图片描述

2.修改配置

主要改下这个

rocketmq.config.namesrvAddr=192.168.141.101:9876

如果自己端口需要修改也可以,我是改成了8078
在这里插入图片描述

3.启动

在这里插入图片描述
访问 http://localhost:8078
在这里插入图片描述

3.整合java

1.生产者

直接上代码

package com.bsoft;

import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.IOException;

/**
 * @author:
 * @time: 2023/12/29 15:40
 */
public class MQPublisher {
    private final static String nameServer = "192.168.141.101:9876";

    private final static String producerGroup = "my_group";
    private final static String consumerGroup = "my_group";

    private final static String topic = "topic_test";
    public static void main(String[] args) throws IOException {
        // 初始化一个producer并设置Producer group name
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        try {
            // 设置NameServer地址
            producer.setNamesrvAddr(nameServer);
            // 启动producer
            producer.start();
            // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
            Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 异步发送消息, 发送结果通过callback返回给客户端
            producer.send(msg, new SendCallback() {
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("OK %s %n",
                            sendResult.getMsgId());
                }

                public void onException(Throwable e) {
                    System.out.printf("Exception %s %n", e);
                    e.printStackTrace();
                }
            },10000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.in.read();
    }
}

2.消费者

package com.bsoft;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;

/**
 * @author: liucheng
 * @time: 2023/12/29 15:39
 */
public class MQConsumer {
    private final static String nameServer = "192.168.141.101:9876";

    private final static String consumerGroup = "my_group_test";

    private final static String topic = "topic_test";
    public static void main(String[] args) throws MQClientException, IOException, InterruptedException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup,true);
        // 设置NameServer的地址
        consumer.setNamesrvAddr(nameServer);
        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe(topic, "tagE");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                msgs.forEach((msg)->{
                    byte[] body = msg.getBody();
                    String s = new String(body, Charset.defaultCharset());
                    System.out.println("msg=================> " +s);

                });
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started......");
//        Thread.sleep(5000);
//        consumer.shutdown();
        System.in.read();
    }
}


3.启动生产者

在这里插入图片描述

在这里插入图片描述

查看详情
在这里插入图片描述

4.启动消费者

package com.bsoft;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;

/**
 * @author: liucheng
 * @time: 2023/12/29 15:39
 */
public class MQConsumer {
    private final static String nameServer = "192.168.141.101:9876";

    private final static String consumerGroup = "my_group_test";

    private final static String topic = "topic_test";
    public static void main(String[] args) throws MQClientException, IOException, InterruptedException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        // 设置NameServer的地址
        consumer.setNamesrvAddr(nameServer);
        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe(topic, "tagA");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                msgs.forEach((msg)->{
                    byte[] body = msg.getBody();
                    String s = new String(body, Charset.defaultCharset());
                    System.out.println("msg=================> " +s);

                });
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started......");
//        Thread.sleep(5000);
//        consumer.shutdown();
        System.in.read();
    }
}

在这里插入图片描述
没日志打印,但是又显示消费了
在这里插入图片描述

5.dashboard添加消费组

没查询到那添加一个
在这里插入图片描述
在这里插入图片描述
重启生产者发现可以消费

对于消费组还是得在dashboard创建好了再去写代码,有的说能够改配置能否直接创建,试过了,没生效,先这样吧,有好的方法或搭建过程中遇到什么问题可以私聊我,看到及时回答

三、总结

整个搭建过程踩了不少坑,比如
版本的不一致导致部分功能一直报错;
启动brocker时未指定实例文件没有加-c来启动导致部署失败;消
费组未在dashboard创建时代码中不显示消费信息
关于5.x无法打包的问题是因为缺少yarn-1.22.10.tar.gz 这个已经上传到jdk那个资源下了,把这个复制到
{path}\maven\repository\com\github\eirslett\yarn\1.22.10再打包即可

全部的配置

#  cat /etc/profile

JAVA_HOME=/usr/local/jdk/jdk1.8.0_241
JRE_HOME=$JAVA_HOME/jre
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATH
export ROCKETMQ_HOME=/bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0
export NAMESRV_ADDR=192.168.141.101:9876
#  cat runbroker.sh 
 
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"
# cat runserver.sh 
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

# cat broker.conf 
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
namesrvAddr=192.168.141.101:9876
brokerIP1=192.168.141.101
listenPort=10911
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
traceTopicEnable=true
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true 


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1354206.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据库攻防学习

免责声明 本文仅供学习和研究使用,请勿使用文中的技术用于非法用途,任何人造成的任何负面影响,与本号及作者无关。 Redis 0x01 redis学习 在渗透测试面试或者网络安全面试中可能会常问redis未授权等一些知识&#xff0c;那么什么是redis&#xff1f;redis就是个数据库&#xff…

【UEFI基础】EDK网络框架(通用函数和数据)

通用函数和数据 DPC DPC全称Deferred Procedure Call。Deferred的意思是“延迟”&#xff0c;这个DPC的作用就是注册函数&#xff0c;然后在之后的某个时刻调用&#xff0c;所以确实是有“延迟”的意思。DPC在UEFI的实现中包括两个部分。一部分是库函数DxeDpcLib&#xff0c;…

知识付费平台搭建?找明理信息科技,专业且高效

明理信息科技知识付费saas租户平台 在当今数字化时代&#xff0c;知识付费已经成为一种趋势&#xff0c;越来越多的人愿意为有价值的知识付费。然而&#xff0c;公共知识付费平台虽然内容丰富&#xff0c;但难以满足个人或企业个性化的需求和品牌打造。同时&#xff0c;开发和…

【MATLAB第88期】基于MATLAB的6种神经网络(ANN、FFNN、CFNN、RNN、GRNN、PNN)多分类预测模型对比含交叉验证

【MATLAB第88期】基于MATLAB的6种神经网络&#xff08;ANN、FFNN、CFNN、RNN、GRNN、PNN&#xff09;多分类预测模型对比含交叉验证 前言 本文介绍六种类型的神经网络分类预测模型 1.模型选择 前馈神经网络 (FFNN) 人工神经网络 (ANN) 级联前向神经网络 (CFNN) 循环神经网…

QT上位机开发(串口界面设计)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 如果上位机要和嵌入式设备进行打交道的话&#xff0c;那么串口可能就是我们遇到的第一个硬件设备。串口的物理接线很简单&#xff0c;基本上就是收…

前端-relation-graph实现关系数据展示(关系图/流程图)

目录 前言&#xff1a; 1. relation-graph 2. relation-graph数据关系组件---官方地址relation-graph - A Relationship Graph Componenthttps://www.relation-graph.com/ 3. 选择relation-graph的理由 4. 项目中引用relation-graph 4.1 下载命令 4.2 在Vue 2 中使用 4…

CGAL的无限制的Delaunay图

本章描述了构建L∞距离下线段Delaunay图的算法和几何特征。这些特征还包括绘制L∞距离下线段Delaunay图对偶&#xff08;即L∞距离下线段Voronoi图&#xff09;边缘的方法。L∞算法和特征依赖于欧几里得&#xff08;或L2&#xff09;距离下的线段Delaunay图算法和特征。L∞度量…

【动态规划】C++算法:44 通配符匹配

作者推荐 【动态规划】【字符串】扰乱字符串 本文涉及的基础知识点 动态规划 LeetCode44 通配符匹配 给你一个输入字符串 (s) 和一个字符模式 &#xff0c;请你实现一个支持 ‘?’ 和 ‘’ 匹配规则的通配符匹配&#xff1a; ‘?’ 可以匹配任何单个字符。 ’ 可以匹配…

动手学深度学习之卷积神经网络之池化层

池化层 卷积层对位置太敏感了&#xff0c;可能一点点变化就会导致输出的变化&#xff0c;这时候就需要池化层了&#xff0c;池化层的主要作用就是缓解卷积层对位置的敏感性 二维最大池化 这里有一个窗口&#xff0c;来滑动&#xff0c;每次我们将窗口中最大的值给拿出来 还是上…

大创项目推荐 深度学习人脸表情识别算法 - opencv python 机器视觉

文章目录 0 前言1 技术介绍1.1 技术概括1.2 目前表情识别实现技术 2 实现效果3 深度学习表情识别实现过程3.1 网络架构3.2 数据3.3 实现流程3.4 部分实现代码 4 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习人脸表情识别系…

【JavaFX】JavaFX11开发踩坑记录

文章目录 技术栈踩坑记录 技术栈 JavaFX 11MavenJDK 11 踩坑记录 这些坑对于初学者很容易踩&#xff0c;JavaFX经常会报错空指针异常遇到其中一个问题可能就会消耗好几天的时间。 JavaFX 采用的是MVC架构设计&#xff0c;页面设计使用 fxml文件&#xff1b;业务逻辑采用Con…

k8s的网络

k8s的网络 k8s中的通信模式&#xff1a; 1、pod内部之间容器与容器之间的通信 在同一个pod中的容器共享资源和网络&#xff0c;使用同一个网络命名空间&#xff0c;可以直接通信的 2、同一个node节点之内&#xff0c;不同pod之间的通信 每个pod都有一个全局的真实的ip地址…

qt 异常汇总

1. C2338 No Q_OBJECT in the class with the signal (编译源文件 ..\..\qt\labelme-master\src\mainwindow.cpp mainwindow头文件中的类没有Q_OBJECT宏定义&#xff0c;或者其子类或者其他依赖没有Q_OBJECT宏定义。 全部qt类都要写上Q_OBJECT. 2. C2385 对connect的访…

AI:116-基于深度学习的视频行为识别与分析

🚀点击这里跳转到本专栏,可查阅专栏顶置最新的指南宝典~ 🎉🎊🎉 你的技术旅程将在这里启航! 从基础到实践,深入学习。无论你是初学者还是经验丰富的老手,对于本专栏案例和项目实践都有参考学习意义。 ✨✨✨ 每一个案例都附带有在本地跑过的关键代码,详细讲解供…

ctfshow——PHP特性

文章目录 web 89web 90web 91web 92web 93web 94web 95web 96web 97web 98web 99 web 89 使用人工分配 ID 键的数值型数组绕过preg_match. 两个函数&#xff1a; preg_match()&#xff1a;执行正则表达式&#xff0c;进行字符串过滤。preg_match函数用法&#xff0c;正则表达式…

uni-app 前后端调用实例 基于Springboot 详情页实现

锋哥原创的uni-app视频教程&#xff1a; 2023版uniapp从入门到上天视频教程(Java后端无废话版)&#xff0c;火爆更新中..._哔哩哔哩_bilibili2023版uniapp从入门到上天视频教程(Java后端无废话版)&#xff0c;火爆更新中...共计23条视频&#xff0c;包括&#xff1a;第1讲 uni…

【Java EE初阶七】多线程案例(生产者消费者模型)

1. 阻塞队列 队列是先进先出的一种数据结构&#xff1b; 阻塞队列&#xff0c;是基于队列&#xff0c;做了一些扩展&#xff0c;适用于多线程编程中&#xff1b; 阻塞队列特点如下&#xff1a; 1、是线程安全的 2、具有阻塞的特性 2.1、当队列满了时&#xff0c;就不能往队列里…

Yapi安装配置(CentOs)

环境要求 nodejs&#xff08;7.6) mongodb&#xff08;2.6&#xff09; git 准备工作 清除yum命令缓存 sudo yum clean all卸载低版本nodejs yum remove nodejs npm -y安装nodejs,获取资源,安装高版本nodejs curl -sL https://rpm.nodesource.com/setup_8.x | bash - #安装 s…

图片中src属性绑定不同的路径

vue3 需求是按钮disable的时候&#xff0c;显示灰色的icon&#xff1b;非disable状态&#xff0c;显示白色的icon 一开始src写成三元表达式&#xff0c;发现不行&#xff0c;网上说src不能写成三元表达式&#xff0c;vue会识别成字符串 最后的解决方案 同时&#xff0c;发现…

win下持续观察nvidia-smi

简介&#xff1a;在Windows操作系统中&#xff0c;没有与Linux中watch命令直接对应的内置工具&#xff0c;但有1种方法快速简单的方法可以实现类似的效果&#xff0c;尤其是用于监控类似于nvidia-smi的命令输出。 历史攻略&#xff1a; Python&#xff1a;查看windows下GPU的…