rocketmq5源码系列--(一)--搭建调试环境

news2025/1/19 8:20:44

说在前头:阿里的rocketmq的文档是真他妈的烂的1b,很多东西都不说,全靠自己看源码,摸索,草,真的要吐血了

rocketmq的版本5而不是版本4,版本5比版本4多了个proxy
rocketmq5 三个组件:namesrv、broker、proxy,所以要启动这三个组件,clientsdk是和proxy通信,proxy和broker通信,

0:rocketmq用的jdk1.8


1:配置环境变量,可以直接在idea里配也可以在windows上配,不过windows上配可能要重启才起作用:
(!!!路径一定要用双斜杠,比如 a\\b\\c不能是a\b\c,因为他会解析成转义而不是路径,坑了我好久)
    ROCKETMQ_HOME="D:\\work\\code\\rocketmq"   #namesrv和broker使用的,就是我们的源码路径
    RMQ_PROXY_HOME="D:\\work\\code\\rocketmq"  #proxy使用的,proxy是新增的,所以这个环境变量也是新增的

2:创建配置文件夹
rocketmq默认使用的是ROCKETMQ_HOME\\distribution目录下的配置文件,我们不用,而是新建一个并在启动的时候指定
    2.1:创建文件夹 ROCKETMQ_HOME\\conf
    2.2:创建配置文件 ROCKETMQ_HOME\\conf\\namesrv.conf,ROCKETMQ_HOME\\conf\\broker.conf,ROCKETMQ_HOME\\conf\\rmq-proxy.json
        broker.conf和namesrv.conf的命令可以随便取,内容也可以为空,直接用默认的,
        proxy我偷懒,直接用代码默认的,默认的要求是RMQ_PROXY_HOME\\conf\\rmq-proxy.json这个文件即指定目录下的指定文件
    rmq-proxy.json文件内容如下:
        {
          "rocketMQClusterName": "DefaultCluster",
          "namesrvAddr": "127.0.0.1:9876"  
        }
    broker.conf文件内容如下:
        brokerClusterName = DefaultCluster    #必须和rmq-proxy.json中的clusterName保持一致
        brokerName = broker-a
        namesrvAddr = 127.0.0.1:9876
        storePathRootDir=D:\\work\\code\\rocketmq\\conf\\brokerstore                #!!!我们手动创建就行,还有,路径名一定要双斜杠
        storePathCommitLog=D:\\work\\code\\rocketmq\\conf\\brokerstore\\commitlog   #!!!这个目录他会自动创建,还有,路径名一定要双斜杠
    namesrv.conf文件内容如下:
        listenPort=9876        #指定端口
        (当然也可以啥也不填)
  
3:修改日志文件,以便会在控制台打印日志
    只要修改每个子项目的resource目录下的rmq.xxx.logback.xml文件就行:
    <configuration>
        <root level="INFO">
            <appender-ref ref="STDOUT"/>          #!!!!只要把<root>标签内的ref的名字改成"STDOUT"就会输出到控制台了
        </root>
    </configuration>

4:启动namesrv/broker/proxy,命令行参数:
    broker:  -c D:\\work\\code\\rocketmq\\conf\\broker.conf  #!!!路径一定要双斜杠,否则会启动失败,这个小错误卡了我好久。。
    namesrv: -c D:\\work\\code\\rocketmq\\conf\\namesrv.conf
    proxy:    (空,可以不填,因为我们用的是默认的配置文件,只要配置RMQ_PROXY_HOME以及创建对应的rmq-proxy.json就行)
    proxy:    启动会超级慢,需要三四分钟。。。真的太夸张了,暂不知道为啥

笔记1:namesrv默认端口9876,broker默认端口10911,proxy默认端口8081,默认集群名 DefaultCluster

5:用mqadmin源码来创建topic
    5.1:修改代码。运行mqadmin创建topic前必须先修改源码中的timeout,
        否则因为它连接需要耗时很长但是超时时间只有5s导致原本可以连接却因为超时而中断而topic创建失败
    源码修改如下:
        public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
            @Override
            public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
                throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
                long beginStartTime = System.currentTimeMillis();
                final Channel channel = this.getAndCreateChannel(addr);
                String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
                //TODO 恢复
                timeoutMillis=25000;        #!!!!!!在进入循环前设置timeoutMillis为25s,这样就不会超时了,只要改这里就可以了
                if (channel != null && channel.isActive()) {
                    long left = timeoutMillis;
                    try {
                        long costTime = System.currentTimeMillis() - beginStartTime;
    5.2: 运行mqadmin来创建topic,不要用mqadmin.cmd,这个创建不了,老是连不上,
         而且broker设置autoTopicCreateEnable=true不起作用必须手动创建
         mqadmin启动命令如下:
            mqadmin对应的class为:org.apache.rocketmq.tools.command.MQAdminStartup
            mqadmin:  updateTopic -b 127.0.0.1:10911 -t testx -n 127.0.0.1:9876    #创建topic testx

client测试程序:

pom.xml

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>5.0.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-tools</artifactId>
            <version>4.9.7</version>
        </dependency>
    </dependencies>

Producer代码:

package producer;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import java.io.IOException;
import java.time.Duration;

public class ProducerExample {


    public static void main(String[] args) throws ClientException {
        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。
        String endpoint = "127.0.0.1:8081";
        // 消息发送的目标Topic名称,需要提前创建。
        String topic = "testx";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint)
                .setRequestTimeout(Duration.ofSeconds(25));
        ClientConfiguration configuration = builder.build();
        // 初始化Producer时需要设置通信配置以及预绑定的Topic。
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)

                .build();
        // 普通消息发送。
        String messageBody = "hello world";
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // 设置消息索引键,可根据关键字精确查找某条消息。
                .setKeys("messageKey")
                // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("messageTag")
                // 消息体。
                .setBody(messageBody.getBytes())
                .build();
        try {
            // 发送消息,需要关注发送结果,并捕获失败等异常。
            for (int i = 0; i < 10000; i++) {
                SendReceipt sendReceipt = producer.send(message);
                System.out.println("Send message= {" + messageBody + "} successfully, messageId={" + sendReceipt.getMessageId() + "}");
                int ch = System.in.read();
            }
        } catch (ClientException e) {
            System.out.println("Failed to send message" + e.toString());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        // producer.close();
    }
}

consumer程序:

package consumer;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
        String endpoints = "localhost:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .setRequestTimeout(Duration.ofSeconds(25)) //!!!!这个超时时间一定要设置长一点,不然会导致连不上而报错
                                                           //!!!!搞了一天,真的吐血,还好今天搞完了,虽然搞完了,但还是得骂两句
                                                           //2024/11/18 22:48,又是加班暂调休的一天。。。。。。
                .build();
        // 订阅消息的过滤规则,表示订阅所有Tag的消息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // 为消费者指定所属的消费者分组,Group需要提前创建。
        String consumerGroup = "myconsumer";
        // 指定需要订阅哪个目标Topic,Topic需要提前创建。
        String topic = "testx";
        // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // 设置消费者分组。
                .setConsumerGroup(consumerGroup)
                // 设置预绑定的订阅关系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // 设置消费监听器。
                .setMessageListener(messageView -> {
                    // 处理消息并返回消费结果。
                    System.out.println("Consume message={" + messageView.getBody().toString() + "} successfully, messageId={" + messageView.getMessageId() + "}");
                    return ConsumeResult.SUCCESS;
                })
                .build();

        Thread.sleep(Long.MAX_VALUE);
        // 如果不需要再使用 PushConsumer,可关闭该实例。
        // pushConsumer.close();
    }
}

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2243494.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何通过电脑监控软件远程监控一台电脑的所有屏幕画面记录

7-1 本教程介绍一个简单的工具&#xff0c;可以安装在电脑中&#xff0c;按设置的时间间隔&#xff0c;自动对屏幕截图保存&#xff0c;并且可以在有网络的其它电脑上远程提取截图文件。 该软件用于自动记录电脑的屏幕画面内容和变化&#xff0c;如果你有这方面的使用场景&am…

Redis 概 述 和 安 装

安 装 r e d i s: 1. 下 载 r e dis h t t p s : / / d o w n l o a d . r e d i s . i o / r e l e a s e s / 2. 将 redis 安装包拷贝到 /opt/ 目录 3. 解压 tar -zvxf redis-6.2.1.tar.gz 4. 安装gcc yum install gcc 5. 进入目录 cd redis-6.2.1 6. 编译 make …

Spring Boot汽车资讯:科技与汽车的新篇章

摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了汽车资讯网站的开发全过程。通过分析汽车资讯网站管理的不足&#xff0c;创建了一个计算机管理汽车资讯网站的方案。文章介绍了汽车资讯网站的系统分析部分&…

CSS:高级寄巧

精灵图 为什么需要精灵图呢&#xff1f; 一个网页中往往会应用很多小背景图作为修饰&#xff0c;当网页中的图像过多时&#xff0c;服务器就会频繁地接收和发送 请求图片&#xff0c;造成服务器请求压力过大&#xff0c;这将大大降低页面的加载速度。 因此&#xff0c;为了有…

【原创】如何备份和还原Ubuntu系统,非常详细!!

前言 我在虚拟机装了一个xfce4的Ubuntu桌面版&#xff0c;外加输入法、IDEA等&#xff0c;我想将这个虚拟机里的系统直接搬到物理机中&#xff0c;那我可以省的再重新装一遍、配置xfce4桌面、修改一堆快捷键还有配置idea了&#xff0c;那直接说干就干。 本教程基于Ubuntu24.0…

SAM_Med2D 训练完成后boxes_prompt没有生成mask的问题

之前对着这这篇文章去微调SAM_Med2D(windows环境),发现boxes_prompt空空如也。查找了好长时间问题SAM-Med2D 大模型学习笔记&#xff08;续&#xff09;&#xff1a;训练自己数据集_sam训练自己数据集-CSDN博客 今天在看label2image_test.json文件的时候发现了一些端倪: 官方…

数据结构-二叉搜索树(Java语言)

目录 1.概念 2.查找search 3.插入insert ​编辑4.删除remove&#xff08;难点&#xff09; 5.性能分析 1.概念 二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树&#xff0c;或者是具有以下性质的二叉树 : 1.若它的左子树不为空&#xff0c;则左子树上所有节点的值都…

【蓝桥杯备赛】深秋的苹果

# 4.1.1. 题目解析 要求某个区间内的数字两两相乘的总和想到前缀和&#xff0c;但是这题重点在于两两相乘先硬算&#xff0c;找找规律&#xff1a; 比如要算这串数字的两两相乘的积之和&#xff1a; 1, 2, 3 1*2 1*3 2*3 1*(23) 2*3 前缀和数组&#xff1a; 1 3 6 发现…

go-zero(一) 介绍和使用

go-zero 介绍和使用 一、什么是 go-zero&#xff1f; go-zero 是一个基于 Go 语言的微服务框架&#xff0c;提供了高效、简单并易于扩展的 API 设计和开发模式。它主要目的是为开发者提供一种简单的方式来构建和管理云原生应用。 1.go-zero 的核心特性 高性能&#xff1a; g…

3. Sharding-Jdbc核⼼流 程+多种分⽚策略

1. Sharding-Jdbc 分库分表执⾏核⼼流程 Sharding-JDBC执行流程 1. SQL解析 -> SQL优化 -> SQL路由 -> SQL改写 -> SQL执⾏-> 结果归并 ->返回结果简写为&#xff1a;解析->路由->改写->执⾏->结果归并1.1 SQL解析 1. SQL解析过程分为词法解析…

编程之路,从0开始:结构体详解

目录 前言 正文 1、结构体引入 2、结构体的声明 3、typedef 4、结构体的匿名声明 5、结构的自引用 &#xff08;1&#xff09;链表 &#xff08;2&#xff09;自引用 6、结构体内存对齐 &#xff08;1&#xff09;对齐规则 &#xff08;2&#xff09;题目 &#x…

01_MinIO部署(Windows单节点部署/Docker化部署)

单节点-Windows环境安装部署 在Windows环境安装MinIO&#xff0c;主要包含两个东西&#xff1a; MinIO Server&#xff08;minio.exe&#xff09;&#xff1a;应用服务本身MinIO Client&#xff08;mc.exe&#xff09;&#xff1a;MinIO客户端工具&#xff08;mc&#xff09;…

qt5半成品飞机大战小游戏

最近在学Qt&#xff0c;心血来潮做了个飞机大战小游戏&#xff0c;由于一些资源比较难找&#xff0c;就做了个半成品。效果图如下&#xff1a; 目前已做功能&#xff1a;人物飞机的自由移动&#xff0c;子弹的发射&#xff0c;子弹与敌机的物体碰撞,碰撞特效。 缺少功能&#x…

html 图片转svg 并使用svg路径来裁剪html元素

1.png转svg 工具地址: Vectorizer – 免费图像矢量化 打开svg图片,复制其中的path中的d标签的路径 查看生成的svg路径是否正确 在线SVG路径预览工具 - UU在线工具 2.在html中使用svg路径 <svg xmlns"http://www.w3.org/2000/svg" width"318px" height…

Android OpenGL ES详解——几何着色器

目录 一、概念 1、图元 2、几何着色器 1、输入类型 2、输出类型 3、输出顶点数量最大值限制 二、使用几何着色器 三、应用举例——造几个房子 四、应用举例——爆破物体 1、获取法向量 2、显示法线 五、应用举例——细分三角形 六、应用举例——广告牌技术 一、概…

基因组之全局互作热图可视化

引言 PlotHiC 是一个专为 Hi-C 数据可视化分析而设计的 Python 包。Hi-C 技术是一种能够检测染色体三维结构的实验方法&#xff0c;它能揭示 DNA 在细胞核内的三维组织结构。为了更好地展示和解释这些复杂的数据&#xff0c;PlotHiC[1] 可以帮助用户方便地绘制Hi-C 数据的热图。…

JVM详解:类的加载过程

JVM中类的加载主要分为三个部分&#xff0c;分别为加载&#xff08;loading&#xff09;&#xff0c;链接&#xff08;linking&#xff09;&#xff0c;初始化&#xff08;initing&#xff09;。其中加载负责的主要是讲类文件加载到内存中变为类对象&#xff0c;不过此时只有基…

FPGA开发流程

注&#xff1a;开发板&#xff1a;小梅哥的ACX720。本实验可直接运行在小梅哥的ACX720开发板上&#xff0c;后续的实验都可直接运行在小梅哥的ACX720上。 一、打开VIVADO并创建工程 1、双击VIVADO图标&#xff0c;打开vivado。 2、打开vivado界面打&#xff0c;点击有 Create …

免费开源!DBdoctor推出开源版系统诊断工具systool

​前言 在开发和运维过程中&#xff0c;经常会遇到难以定位的应用问题&#xff0c;我们通常需要借助Linux系统资源监控工具来辅助诊断。然而&#xff0c;系统的IO、网络、CPU使用率以及文件句柄等信息通常需要通过多个独立的命令工具来获取。在没有部署如Prometheus这样的综合…

Restful API接⼝简介及为什么要进⾏接⼝压测

一、RESTful API简介 在现代Web开发中&#xff0c;RESTful API已经成为一种标准的设计模式&#xff0c;用于构建和交互网络应用程序。本文将详细介绍RESTful API的基本概念、特点以及如何使用它来设计高效的API接口。 1. 基于协议 HTTP 或 HTTPS RESTful API通常使用HTTP&am…