大家心心念念的RocketMQ5.x入门手册来喽

news2024/11/15 11:55:22

1、前言

为了更好的拥抱云原生,RocketMQ5.x架构进行了大的重构,提出了存储与计算分离的设计架构,架构设计图如下所示:

00

RocketMQ5.x提供了一套非常建议的消息发送、消费API,并统一放在Apache顶级开源项目rocketmq-clients下,链接:https://github.com/apache/rocketmq-clients,提供了cpp、go、java、php、rust的实现,多语言生态初现,如下图所示:

01

2、源码级调试 RocketMQ 5.x

当RocketMQ为了顺应云原生大潮,提出存储与计算分离后,想必我相信很多粉丝朋友和我一样,都希望尽快一睹RocketMQ5.x的”芳颜“,如果还没有在IDE中调试通过的小伙伴,那就跟着我的步骤来,带你一起体验RocketMQ 5.x。

Step1:从github(https://github.com/apache/rocketmq)下载源码,并导入到IDEA中,如下图所示:

02

相比RocketMQ4.x,5.x主要是增加了一个代理模块(rocketmq-proxy),将路由、计算等功能从Broker中剥离出来。

Step2:创建一个RocketMQ主目录,并在主目录中创建conf文件夹,并把源码中distribution模块中conf下的文件拷贝到当前目录,如下图所示:

03

Step3:从namesrv模块中找到类NamesrvStartup类,配置后运行,如下图所示:

04

这里的关键点在于需要配置环境变量ROCKETMQ_HOME,其路径设置为【Step2】中创建的目录,然后启动该类,输出如下所示表示NameServer启动成功。

The Name Server boot success. serializeType=JSON

Step4:从broker模块中找到类BrokerStartup,配置后运行,效果如下图所示:

05

这里有两个要点:

  • 通过 -c 参数指定broker配置文件的位置
  • 设置ROCKETMQ_HOME环境变量,其路径就是上文中conf目录所在的父目录

Step5:启动proxy模块,如下图所示:

06

设置好环境变量RMQ_PROXY_HOME环境变量,直接启动,会抛出如下错误:

07

原因是RocketMQ Proxy在启动时会RMQ_PROXY_HOME加载日志文件,我们从源码模块中distribution中logback_proxy.xml拷贝到proxy主目录的conf文件夹下。

再次尝试启动,抛出如下错误:

08

需要再从源码模块中distribution中rmq-proxy.json拷贝到proxy主目录的conf文件夹下,启动成功如下所示:

09

那问题来了,rmq-proxy.json文件中的内容是多少呢?

{
  "rocketMQClusterName": "DefaultCluster"
}

那这个文件中又可以陪着哪些参数呢?这个目前无法从官方网站中获取,大家可以去查看org.apache.rocketmq.proxy.config.ProxyConfig,里面所有的属性都可以在这个文件中配置。

Nameserver、broker、Proxy都已经启动成功了,那我们如何发送消息呢?

由于RocketMQ 5.x引入了Proxy,原先的RocketMQ Client API 不能直接使用,RocketMQ官方提供了一套极简API,API的完整定义在Apache顶级开源项目rocketmq-apis(https://github.com/apache/rocketmq-apis),具体的定义如下图所示:

10

具体的实现在https://github.com/apache/rocketmq-clients,实现了cpp、golang、java、php、rust的实现。

接下来,我们使用一下java版本的客户端尝试发送一条消息,代码如下所示:

<dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client-apis</artifactId>
      <version>5.0.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client-java</artifactId>
      <version>5.0.0</version>
    </dependency>
  
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

public class RocketMQProxyTest {

    public static void main(String[] args) throws Exception {


        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        // Credential provider is optional for client configuration.
        String accessKey = "yourAccessKey";
        String secretKey = "yourSecretKey";
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);

        String endpoints = "127.0.0.1:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .setCredentialProvider(sessionCredentialsProvider)
                .setRequestTimeout(Duration.ofSeconds(30))
                .build();
        String topic = "TopicTest";
        final Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                // Set the topic name(s), which is optional. It makes producer could prefetch the topic route before
                // message publishing.
                .setTopics(topic)
                // May throw {@link ClientException} if the producer is not initialized.
                .build();
        // Define your message body.
        byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";


        final Message message = provider.newMessageBuilder()
                // Set topic for the current message.
                .setTopic(topic)
                // Message secondary classifier of message besides topic.
                .setTag(tag)
                // Key(s) of the message, another way to mark message besides message id.
                .setKeys("yourMessageKey-0e094a5f9d85")
                .setBody(body)
                .build();
        final CompletableFuture<SendReceipt> future = producer.sendAsync(message);
        future.whenComplete((sendReceipt, throwable) -> {
            if (null == throwable) {
                System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId());
            } else {
                System.out.println("Failed to send message");
            }
        });
        // Block to avoid exist of background threads.
        Thread.sleep(Long.MAX_VALUE);
        // Close the producer when you don't need it anymore.
        producer.close();
    }
}

运行结果:

Send message successfully, messageId=01C6A0F34F62CB328C03EFF3EF00000000

运行成功,在这里给大家留一个作业,那消息消费如何写呢?

原文首发:https://www.codingw.net/Article?id=783

一键三连(关注、点赞、留言)是对我最大的鼓励。

各位技术朋友们,我是《RocketMQ技术内幕》一书作者,CSDN2020博客之星TOP2,热衷于中间件领域的技术分享,维护「中间件兴趣圈」公众号,旨在成体系剖析Java主流中间件,构建完备的分布式架构体系,欢迎大家大家关注我,回复「专栏」可获取15个专栏;回复「PDF」可获取海量学习资料,回复「加群」可以拉你入技术交流群,零距离与BAT大厂的大神交流。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/340276.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

T07 奥运排序问题

描述 按要求&#xff0c;给国家进行排名。 输入描述&#xff1a; 有多组数据。 第一行给出国家数N&#xff0c;要求排名的国家数M&#xff0c;国家号从0到N-1。 第二行开始的N行给定国家或地区的奥运金牌数&#xff0c;奖牌数&#xff0c;人口数&#xff08;百万&#xff09…

【c/c++】c语言的自增操作在不同编译器的差别

示例代码 代码如下&#xff1a; #include <stdio.h>#define product(x) ((x)*(x))int main(void) {int i 3, j, k;j product(i); // (i) * (i)k product(i); // (i) * (i)printf("%d %d\n", j, k); }执行结果 在Ubuntu18.04下通过GCC编译和执行的结果…

【在执行make geth报错解决方法】

在执行make geth报错解决方法问题详细描述&#xff1a;详细解决方法对根据报错提示信息对相关文件夹权限进行修改2、再次执行make geth 检查是否还报错问题详细描述&#xff1a; Ubuntu 版本&#xff1a;18.04问题&#xff1a;在编译运行以太坊源码执行make geth命令时报错&am…

*from . import _imaging as core : ImportError: DLL load failed: 找不到指定的模块

错误提示如上。为了解决这个问题&#xff0c;首先参考了解决 from . import _imag…模块。. 首先尝试了彻底卸载pillow&#xff1a;conda uninstall pillow &#xff1b; pip uninstall pillow 然后重装 pip install pillow&#xff0c;发现问题仍然没有解决。 并且尝试了windo…

湿敏电阻的原理,结构,分类与应用总结

🏡《总目录》 0,概述 湿敏电阻是指电阻值随着环境的湿度变化而变化的电阻,本文对其工作原理,结构,分类和应用场景进行总结。 1,工作原理 湿敏电阻是利用湿敏材料制成的,湿敏材料吸收空气中水分时,自身的阻值发生变化。 2,结构 如下图所示,市民电阻包括4个部分构成,…

SpringBoot+Vue实现智能物流管理系统

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7/8.0 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.3.9 浏…

【手写 Vuex 源码】第七篇 - Vuex 的模块安装

一&#xff0c;前言 上一篇&#xff0c;主要介绍了 Vuex 模块收集的实现&#xff0c;主要涉及以下几个点&#xff1a; Vuex 模块的概念&#xff1b;Vuex 模块和命名空间的使用&#xff1b;Vuex 模块收集的实现-构建“模块树”&#xff1b; 本篇&#xff0c;继续介绍 Vuex 模…

gradle命令

环境搭建 $ mkdir /opt/gradle $ unzip -d /opt/gradle gradle-7.6-bin.zip $ ls /opt/gradle/gradle-7.6 LICENSE NOTICE bin getting-started.html init.d lib media配置环境变量 $ export PATH=$PATH:/opt/gradle/gradle-7.6/bin检查配置是否ok gradle -v Android …

Elasticsearch7.8.0版本进阶——分布式集群(应对故障)

目录一、Elasticsearch集群的安装1.1、Elasticsearch集群的安装&#xff08;win10环境&#xff09;1.2、Elasticsearch集群的安装&#xff08;linux环境&#xff09;二、应对故障&#xff08;win10环境集群演示&#xff09;2.1、启动集群&#xff08;三个节点&#xff09;2.2、…

Lecture4 反向传播(Back Propagation)

目录 1 问题背景 1.1计算图&#xff08;Computational Graph&#xff09; 1.2 激活函数&#xff08;Activation Function&#xff09;引入 1.3 问题引入 2 反向传播&#xff08;Back Propagation&#xff09; 2.1 为什么要使用反向传播 2.2 前馈运算(Forward Propagation…

Allegro如何更改临时高亮的颜色设置操作指导

Allegro如何更改临时高亮的颜色设置操作指导 在用Allegro做PCB设计的时候,当移动或者高亮某个对象之前,会被临时高亮一个颜色,方便查看,类似下图 运行高亮命令的时候,器件被临时高亮成了白色 软件默认的是白色,如何更改成其它颜色? 具体操作如下 点击Display选择Color…

西瓜书读书笔记—绪论

文章目录机器学习典型的机器学习过程基本术语归纳偏好机器学习 机器学习&#xff1a;致力于研究如果通过计算的手段&#xff0c;利用经验来改善系统自身的性能 在计算机系统中&#xff0c;“经验” 通常以 “数据” 形式存在&#xff0c;因此&#xff0c;机器学习所研究的主要内…

《计算机组成与设计》05. 大而快:层次化存储

文章目录局部性原理存储层次结构存储层次结构示意图传输数据示意图Cache 基础映射方式直接映射全相连映射组相连映射Cache 访问直接映射例题 —— Cache 容量计算组相联映射处理写操作3C 模型Cache 失效问题 —— 通过更改 Cache 块容量&#xff0c;以此通过空间局部性来降低失…

【Linux常用指令合集】

基本的增删改查 ls&#xff1a;显示文件或目录-l&#xff1a;列出文件详细信息l(list)-a&#xff1a;列出当前目录下所有文件及目录&#xff0c;包括隐藏的a(all) mkdir 目录名&#xff1a;创建目录-p&#xff1a;级联创建 cd 目录&#xff1a;切换目录 pwd&#xff1a;显示当…

SpringMVC--视图、RESTful案例、处理AJAX请求

SpringMVC的视图 SpringMVC中的视图是View接口&#xff0c;视图的作用渲染数据&#xff0c;将模型Model中的数据展示给用户 SpringMVC视图的种类很多&#xff0c;默认有转发视图和重定向视图 当工程引入jstl的依赖&#xff0c;转发视图会自动转换为JstlView 若使用的视图技术为…

GitHub个人资料自述与管理主题设置

目录 关于您的个人资料自述文件 先决条件 添加个人资料自述文件 删除个人资料自述文件 管理主题设置 补充&#xff1a;建立一个空白文件夹 关于您的个人资料自述文件 可以通过创建个人资料 README&#xff0c;在 GitHub.com 上与社区分享有关你自己的信息。 GitHub 在个…

【触摸屏功能测试】MQTT_STD本地调试说明-测试记录

1、MQTT简介 MQTT是一种基于发布/订阅模式的“轻量级”通讯协议。它是针对受限的、低带宽的、高延迟的、网络不可靠的环境下的网络通讯设备设计的。 发布是指客户端将消息传递给服务器&#xff0c;订阅是指客户端接收服务器推送的消息。每个消息有一个主题&#xff0c;包含若干…

七大设计原则之迪米特法则应用

目录1 迪米特法则介绍2 迪米特法则应用1 迪米特法则介绍 迪米特原则&#xff08;Law of Demeter LoD&#xff09;是指一个对象应该对其他对象保持最少的了解&#xff0c;又叫最少知 道原则&#xff08;Least Knowledge Principle,LKP&#xff09;&#xff0c;尽量降低类与类之…

30分钟吃掉wandb可视化自动调参

wandb.sweep: 低代码&#xff0c;可视化&#xff0c;分布式 自动调参工具。使用wandb 的 sweep 进行超参调优&#xff0c;具有以下优点。(1)低代码&#xff1a;只需配置一个sweep.yaml配置文件&#xff0c;或者定义一个配置dict&#xff0c;几乎不用编写调参相关代码。(2)可视化…

Django框架之视图和URL

视图和URL 站点管理页面做好了, 接下来就要做公共访问的页面了.对于Django的设计框架MVT. 用户在URL中请求的是视图.视图接收请求后进行处理.并将处理的结果返回给请求者.使用视图时需要进行两步操作 1.定义视图2.配置URLconf 1. 定义视图 视图就是一个Python函数&#xff0c…