【RocketMQ入门-安装部署与Java API测试】

news2024/9/26 3:30:47

【RocketMQ入门-安装部署与Java API测试】

    • 一、环境说明
    • 二、安装部署
    • 三、Java API 编写Producer和Consumer进行测试
    • 四、小结

一、环境说明

  1. 虚拟机VWMare:安装centos7.6操作系统
  2. 源码包:rocketmq-all-5.1.3-source-release.zip
  3. 单master部署,在一台虚拟机上安装部署name server和proxy以及broker
  4. 流程图:
    在这里插入图片描述

二、安装部署

  1. 源码包安装需要事先安装部署maven,下载apache-maven-3.6.3-bin.tar.gz安装包,然后解压并配置环境变量,如下命令:

    tar -zvxf apache-maven-3.6.3-bin.tar.gz -C /training/
    

    配置环境变量(此处是用root安装),编辑:vi ~/.bash_profile,在文件末尾添加如下内容:

    #maven
    export MVN_HOME=/training/apache-maven-3.6.3
    export PATH=$MVN_HOME/bin:$PATH
    

    执行:source ~/.bash_profile 使环境生效。

  2. 进入/training/apache-maven-3.6.3/conf目录下,配置maven的仓库为阿里云和华为云仓库,执行如下命令:

    cd /training/apache-maven-3.6.3/conf/
    mv settings.xml settings.xml.backup
    vi settings.xml
    

    在打开的settings.xml中,粘贴如下内容即可:

    <?xml version="1.0" encoding="utf-8"?>
    <settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="         http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
        <mirrors>
    		<mirror>
    				<id>aliyunmaven</id>
    				<mirrorOf>*</mirrorOf>
    				<name>阿里云公共仓库</name>
    				<url>https://maven.aliyun.com/repository/public</url>
    		</mirror>
    		 <mirror>
    				<id>huaweicloud</id>
    				<mirrorOf>central</mirrorOf>
    				<name>huaweicloud maven</name>
    				<url>https://mirrors.huaweicloud.com/repository/maven/</url>
    		</mirror>
    
        </mirrors>
        <profiles>
            <profile>
    
    			<repositories>
    					<repository>
    					  <id>central</id>
    					  <url>https://maven.aliyun.com/repository/central</url>
    					  <releases>
    							<enabled>true</enabled>
    					  </releases>
    					  <snapshots>
    							<enabled>true</enabled>
    					  </snapshots>
    					</repository>
    			</repositories>
            </profile>
        </profiles>
    </settings>
    
  3. 由于CentOS7.6最小模式安装没有unzip命令,需要事先安装,执行如下命令安装:

    yum install unzip -y
    
  4. 解压源码包rocketmq-all-5.1.3-source-release.zip,进入到解压后的目录下,然后编译安装,执行如下命令:

    unzip rocketmq-all-5.1.3-source-release.zip
    cd rocketmq-all-5.1.3-source-release/
    mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
    
  5. 第5步骤正确后,进入到 /rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3目录下,然后启动NameServer,执行如下命令:

    cd  /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3
    nohup sh bin/mqnamesrv &
    
  6. 验证NameServer是否启动成功,执行如下命令:

    tail -f ~/logs/rocketmqlogs/namesrv.log
    

    会看到如下内容,说明已经正常启动了

    The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
    或者执行jps命令查看是否已经有了NameServer进程:NamesrvStartup,如有说明ok

  7. 第5、6步骤正确后,进入到 /rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3目录下,然后启动Broker和Proxy,执行如下命令:注意:NameServer成功启动后,我们启动Broker和Proxy,5.x 版本下我们建议使用 Local 模式部署,即 Broker 和 Proxy 同进程部署。5.x 版本也支持 Broker 和 Proxy 分离部署以实现更灵活的集群能力。详情参考其他教程。

    cd  /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3
    nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
    
  8. 验证NameServer是否启动成功,执行如下命令:

    tail -f ~/logs/rocketmqlogs/proxy.log
    

    会看到如下内容,说明已经正常启动了

    The broker[broker-a, 192.168.36.132:10911] boot success. serializeType=JSON and name server is localhost:9876
    或者执行jps命令查看是否已经有了:ProxyStartup 进程,如有说明ok

三、Java API 编写Producer和Consumer进行测试

  1. 上述正常启动NameServer和Broker及Proxy后,首先需要创建名为TestTopic的Topic,执行如下命令:
    cd  /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3
    sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
    
    查看新创建的Topic,验证是否已经创建好,执行:
    sh bin/mqadmin topicList -n localhost:9876
    
    结果如下:
    在这里插入图片描述
  2. 创建消费者组,执行如下命令:
    cd  /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3
    sh bin/sh mqadmin updateSubGroup -g testgroup -c DefaultCluster -n localhost:9876
    
    执行命令无任何错误即说明已经创建成功。
  3. 在Idea中创建Maven工程,添加rocketmq依赖,添加如下依赖到pom.xml中:
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <rocketmq-client-java-version>5.0.5</rocketmq-client-java-version>
        <slf4j.version>1.7.25</slf4j.version>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>${rocketmq-client-java-version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
    </dependencies>
    
  4. 编写ProducerTest生产者,代码如下:
    import org.apache.rocketmq.client.apis.ClientConfiguration;
    import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
    import org.apache.rocketmq.client.apis.ClientException;
    import org.apache.rocketmq.client.apis.ClientServiceProvider;
    import org.apache.rocketmq.client.apis.message.Message;
    import org.apache.rocketmq.client.apis.producer.Producer;
    import org.apache.rocketmq.client.apis.producer.SendReceipt;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    public class ProducerTest {
        private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);
    
        public static void main(String[] args) throws Exception {
            testMain();
        }
    
        public static void testMain() throws ClientException, IOException {
            // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
            String endpoint = "192.168.36.132:8081";
            // 消息发送的目标Topic名称,需要提前创建。
            // 执行:sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
            String topic = "TestTopic";
            ClientServiceProvider provider = ClientServiceProvider.loadService();
            ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
            ClientConfiguration configuration = builder.build();
            // 初始化Producer时需要设置通信配置以及预绑定的Topic。
            Producer producer = provider.newProducerBuilder()
                    .setTopics(topic)
                    .setClientConfiguration(configuration)
                    .build();
            int temp = 0;
            while (true) {
                String msg = "第 " + temp + " 条消息,我喜欢rocketmq!!";
                temp++;
                // 普通消息发送。
                Message message = provider.newMessageBuilder()
                        .setTopic(topic)
                        // 设置消息索引键,可根据关键字精确查找某条消息。
                        .setKeys("messageKey")
                        // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                        .setTag("messageTag")
                        // 消息体。
                        .setBody(msg.getBytes())
                        .build();
                try {
                    // 发送消息,需要关注发送结果,并捕获失败等异常。
                    SendReceipt sendReceipt = producer.send(message);
                    Thread.sleep(1000);
                    logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
                } catch (Exception e) {
                    logger.error("Failed to send message", e);
                }
            }
            // producer.close();
        }
    }
    
  5. 编写CommonUtils工具类,用于将ByteBuffer转成String,代码如下:
    import java.nio.ByteBuffer;
    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    
    public class CommonUtils {
    
        public static void main(String[] args) {
            System.out.println("Hello world!");
        }
    
        public static String decodeKey(ByteBuffer bytes) {
            Charset charset = StandardCharsets.UTF_8;
            return charset.decode(bytes).toString();
        }
    
        
        public static byte[] decodeValue(ByteBuffer bytes) {
            int len = bytes.limit() - bytes.position();
            byte[] bytes1 = new byte[len];
            bytes.get(bytes1);
            return bytes1;
        }
    
        
        public static ByteBuffer encodeKey(String key) {
            return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
        }
    
        public static ByteBuffer encodeValue(byte[] value) {
            ByteBuffer byteBuffer = ByteBuffer.allocate(value.length);
            byteBuffer.clear();
            byteBuffer.get(value, 0, value.length);
            return byteBuffer;
        }
    }
    
  6. 编写ConsumerTest生产者,代码如下:
    import java.util.Collections;
    import org.apache.rocketmq.client.apis.ClientConfiguration;
    import org.apache.rocketmq.client.apis.ClientServiceProvider;
    import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
    import org.apache.rocketmq.client.apis.consumer.FilterExpression;
    import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
    import org.apache.rocketmq.client.apis.consumer.PushConsumer;
    import org.rocketmq.producer.CommonUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class PushConsumerTest {
        private static final Logger logger = LoggerFactory.getLogger(PushConsumerTest.class);
    
        private PushConsumerTest() {
        }
    
        public static void main(String[] args) throws Exception {
            final ClientServiceProvider provider = ClientServiceProvider.loadService();
            // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
            String endpoints = "192.168.36.132:8081";
            ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                    .setEndpoints(endpoints)
                    .build();
            // 订阅消息的过滤规则,表示订阅所有Tag的消息。
            String tag = "*";
            FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
            // 为消费者指定所属的消费者分组,Group需要提前创建。
            // 执行:sh bin/sh mqadmin updateSubGroup -g testgroup -c DefaultCluster -n localhost:9876
            String consumerGroup = "testgroup";
            // 指定需要订阅哪个目标Topic,Topic需要提前创建。
            String topic = "TestTopic";
            // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
            PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                    .setClientConfiguration(clientConfiguration)
                    // 设置消费者分组。
                    .setConsumerGroup(consumerGroup)
                    // 设置预绑定的订阅关系。
                    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                    // 设置消费监听器。
                    .setMessageListener(messageView -> {
                        // 处理消息并返回消费结果。
                        logger.info("Consume message successfully, messageId={},messageBody={}", messageView.getMessageId(), CommonUtils.decodeKey(messageView.getBody()));
                        return ConsumeResult.SUCCESS;
                    })
                    .build();
            Thread.sleep(Long.MAX_VALUE);
            // 如果不需要再使用 PushConsumer,可关闭该实例。
            // pushConsumer.close();
        }
    }
    
  7. 为了能查看到控制台日志输入,需要在resources目录下新建log4j.properties、log4j2.properties,具体内容如下:
    log4j.properties内容:
    log4j.rootLogger=INFO,console
    
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.out
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
    
    log4j2.properties内容:
    name = PropertiesConfig
    property.filename = target/logs
    
    #appenders = console, file
    #配置值是appender的类型,并不是具体appender实例的name
    appenders = rolling
    
    appender.rolling.type = RollingFile
    appender.rolling.name = RollingLogFile
    appender.rolling.fileName=${filename}/automationlogs.log
    appender.rolling.filePattern = ${filename}/automationlogs-%d{MM-dd-yy-HH-mm-ss}-%i.log
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern=[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 5
     
    rootLogger.level = INFO,console
    rootLogger.appenderRef.rolling.ref = RollingLogFile
    
  8. 到此,完成了所有准备工作了,整个工程如下所示:
    在这里插入图片描述
  9. 运行ProducerTest程序进行消息的发送,控制台中会看到如下内容:
    在这里插入图片描述
  10. 运行ConsumerTest程序接收消息,控制台中会看到如下内容:
    在这里插入图片描述

四、小结

至此,一个单节点副本的 RocketMQ 集群已经部署起来了,我们也通过编写Java程序进行简单的消息收发。如本文对您有帮助,麻烦您动动发财的手指点个赞~~~~~,谢谢您的阅读!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/853931.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【论文研读】MARLlib 的架构分析

【论文研读】MARLlib: A Scalable Multi-agent Reinforcement Learning Library 和尚念经 多智能体强化学习框架研究。 多智能体强化学习库。 多智能体强化学习算法实现。 多智能体强化学习环境的统一化&#xff0c;标准化。 多智能体强化学习算法解析。 多智能体强化学习 算法…

Kafka:springboot集成kafka收发消息

kafka环境搭建参考Kafka&#xff1a;安装和配置_moreCalm的博客-CSDN博客 1、springboot中引入kafka依赖 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><…

如何用Postman做接口自动化测试,你知道么?

什么是自动化测试&#xff1f; 把人对软件的测试行为转化为由机器执行测试行为的一种实践。 例如GUI自动化测试&#xff0c;模拟人去操作软件界面&#xff0c;把人从简单重复的劳动中解放出来。 本质是用代码去测试另一段代码&#xff0c;属于一种软件开发工作&#xff0c;已…

数据结构入门:栈

目录 前言 1. 栈 1.1栈的概念及结构 1.2 栈的实现 1.2.1 栈的定义 1.2.2 栈的初始化 1.2.3 入栈 1.2.4 出栈 1.2.5 栈的元素个数 1.2.6 栈顶数据 1.2.7 栈的判空 2.栈的应用 2.1 题目一&#xff1a;括号匹配 2.1.1 思路 2.1.2 分析 2.1.3 题解 总结 前言 无论你是计算机科学专…

初学者自学python哪本书好,python教程自学全套

大家好&#xff0c;小编来为大家解答以下问题&#xff0c;python怎么自学,可以达到什么程度&#xff0c;初学者自学python哪本书好&#xff0c;现在让我们一起来看看吧&#xff01; 前言 Python是一个非常适合自学&#xff0c;0基础的话从入门到精通也只需要花3-4个月PYTHON库“…

诚迈科技亮相华为开发者大会2023,打造万物互联全场景生态

8月4-6日&#xff0c;华为开发者大会2023在中国松山湖盛大举行&#xff0c;诚迈科技作为华为合作伙伴携一系列基于OpenHarmony和HarmonyOS Connect的创新技术及生态成果&#xff0c;精彩亮相OpenHarmony共建展区、OpenHarmony使能展区和鸿蒙智联展区&#xff0c;吸引了众多行业…

nginx负载均衡(nginx结束)

本节主要内容 1、四层&#xff0c;七层代理的配置方法 2、负载均衡的算法 nginx负载均衡&#xff1a;反向代理来实现 反向代理有两种转发方式&#xff1a;1、四层代理 2、七层代理 Nginx的七层代理和四层代理 七层是最常见的反向代理方式&#xff0c;只能配置在nginx配置文…

基于SDK方式的小程序监控

基于SDK方式的小程序监控 一、背景 微信小程序自 2017 年正式上线以来&#xff0c;就受到商家和开发者的青睐。到 2022 年底&#xff0c;我国的互联网普及率已经高达 75.6%。随着互联网的快速发展&#xff0c;小程序也在快速成长&#xff0c;不仅使用人数在逐年攀升&#xff…

宝尊电商短期前景堪忧,宝尊国际能否取得成功还有待验证

来源&#xff1a;猛兽财经 作者&#xff1a;猛兽财经 核心业务面临短期逆风 在2023年第一季度财报中&#xff0c;宝尊电商&#xff08;BZUN&#xff09;表示其电商业务(简称BEC)主要包括&#xff1a;品牌的门店运营、客户服务以及物流和供应链管理、IT和数字营销等增值服务”。…

词嵌入、情感分类任务

目录 1.词嵌入&#xff08;word embedding&#xff09; 对单词使用one-hot编码的缺点是难以看出词与词之间的关系。 所以需要使用更加特征化的表示&#xff08;featurized representation&#xff09;&#xff0c;如下图所示&#xff0c;我们可以得到每个词的向量表达。 假设…

php webshell 免杀入门

webshell 查杀软件&#xff1a; d盾、安全狗、护卫神、Sangfor WebShellKill 在线查杀 百度WEBDIR https://scanner.baidu.com 河马 https://www.shellpub.com cloudwalker牧云 https://webshellchop.chaitin.cn 查杀技术 静态检测、动态检测、日志检查 静态检查&#xff1a…

感觉和身边其他人有差距?你的感觉我懂!

在我们的成长历程中&#xff0c;总要经历不同的人和事&#xff0c;身边不乏比我们优秀&#xff0c;比我们厉害的人&#xff0c;这个是无可避免的&#xff0c;也是无法选择的&#xff0c;但是可以选择的是&#xff1a;我们怎么做&#xff01; 目录 我的情况事件感受 我的解法心态…

【人工智能前沿弄潮】—— 玩转SAM(Segment Anything)

玩转SAM(Segment Anything) 官网链接&#xff1a; Segment Anything | Meta AI (segment-anything.com) github链接&#xff1a; facebookresearch/segment-anything: The repository provides code for running inference with the SegmentAnything Model (SAM), links fo…

企业服务器被devos勒索病毒攻击后怎么处理,devos勒索病毒如何攻击的

众所周知&#xff0c;科学技术是第一生产力&#xff0c;科学技术的发展给企业与人们的生活带来了极大变化&#xff0c;但随之而来的网络安全威胁也不断增加。最近&#xff0c;我们收到很多企业的求助&#xff0c;企业的计算机服务器遭到了devos勒索病毒的攻击&#xff0c;导致企…

华为、腾讯、淘宝面试流程+面试技术题分析,速看!

IT 是一个人才日益紧缺的行业&#xff0c;随着人才的紧缺&#xff0c;IT 业的薪水也是水涨船高。互联网巨头们对人才的争夺&#xff0c;更是日益激烈化。对于从事 IT 的职场人士&#xff0c;绝大部门都想进入那些巨头公司。除了薪水高、办公环境好&#xff0c;发展更有前景外&a…

unity海康威视原生SDK拉取网络摄像头画面,并展示在一个Material上

原理是使用sdk获取视频流&#xff0c;格式为YUV&#xff0c;然后分离YUV通道到三张不同的Texture2D上&#xff0c;通过shader将三个通道重新输出为原始图像。 我将所用的各个部分已经整理成一个压缩包&#xff0c;免积分下载 压缩包结构如下 使用步骤 1 DLL:放在Plugins文件…

东南亚调研

东南亚地理 东南亚经济 https://zh.m.wikipedia.org/zh-hans/%E4%B8%9C%E7%9B%9F%E5%9B%BD%E5%AE%B6GDP%E5%88%97%E8%A1%A8 东南亚人口结构 东南亚一半以上的人口年龄在 30 岁以下 https://population-pyramid.net/zh-cn 东南亚数字经济 原文件&#xff1a; 谷歌关于东南亚数…

Java整合Selenium录制视频

捕捉视频 有时候我们未必能够分析故障只需用日志文件或截图的帮助。有时捕获完整的执行视频帮助。让我们了解如何捕捉视频。 我们将利用Monte媒体库的执行相同。 配置 第1步&#xff1a;导航到URL下载屏幕记录JAR&#xff0c;如下图所示。 http://www.randelshofer.ch/monte…

20230809在WIN10下使用python3批量将TXT文件转换为SRT文件

20230809在WIN10下使用python3批量将TXT文件转换为SRT文件 2023/8/9 17:30 由于喜欢看纪录片等外文视频&#xff0c;通过剪映/PR2023/AUTOSUB识别字幕之后&#xff0c;可以通过google翻译识别为简体中文的DOCX文档。 DOCX文档转换为TXT文档之后&#xff0c;还需要转换为SRT文档…

Linux(CentOS7)搭建达梦数据库

参考官方文档 本文记录一下达梦数据库的安装步骤&#xff0c;在安装的过程中出现了一些问题&#xff0c;进行了处理。 文章目录 安装前准备数据库安装命令行安装图形化安装 配置实例注册服务启动、停止数据库 安装前准备 用户在安装 DM 数据库之前需要检查或修改操作系统的配…