RocketMQ 5.0 快速入门

news2024/11/24 6:42:56

RocketMQ 5.0

Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。

1、下载与安装RocketMQ 5.0

为了接近生产环境的开发,我们都是选择直接在Linux服务器上安装。

下载地址:https://rocketmq.apache.org/zh/download

在这里插入图片描述

注意:安装包分为二进制包和源码包,二进制包是已经编译好的可以直接运行,而源码包则需要编译后才能运行。

编译命令示例

$ unzip rocketmq-all-5.1.3-source-release.zip
$ cd rocketmq-all-5.1.3-source-release/
$ mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
$ cd distribution/target/rocketmq-5.1.3/rocketmq-5.1.3

我们直接下载最新的二进制包就好了。

安装步骤如下

1、启动NameServer注册中心(存储 Broker 元信息)

# 解压
$ unzip rocketmq-all-5.1.3-bin-release.zip

解压后我们需要改一下启动脚本(如果服务器资源足够多可以忽略这一步)。

runserver.sh需要修改JVM内存的配置,此脚本默认从JVM申请的内存有4G(我们只是用来测试与学习服务器资源配置根本没有这么高),如下

# 以下为 runserver.sh 截取片段
# 无论走 if 还是 else -Xms和-Xmx的配置都是4g
# 所以我们要重新赋值这个 JAVA_OPT 变量
choose_gc_options()
{
    # Example of JAVA_MAJOR_VERSION value : '1', '9', '10', '11', ...
    # '1' means releases befor Java 9
    JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | awk -F '"' '/version/ {print $2}' | awk -F '.' '{print $1}')
    if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; then
      JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
      JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
      JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
      JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
    else
      JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
      JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
      JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M"
    fi
}

重新定制内存(重新赋值JAVA_OPT变量)直接加在条件判断代码块后面即可。

# 重新定制内存
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
# JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
# JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M"

runbroker.sh也是需要修改JVM内存的配置,如下代码默认分配的是8g。

# 修改前
# JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"
# 修改后
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"

修改完后,我们就可以启动 RocketMQ 的 NameServer 了

# 启动 namesrv
$ nohup sh bin/mqnamesrv &
 
# 验证 namesrv 是否启动成功
$ tail -f -n 500 nohup.out
...
The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876

# 或者是
$ tail -f ~/logs/rocketmqlogs/namesrv.log
2023-07-18 23:17:49 INFO NSScanScheduledThread - start scanNotActiveBroker
...

2、启动 Broker 消息存储中心和 Proxy 代理

Proxy组件是 RocketMQ 5.0 版本官方推荐的部署组件,详细说明可查看官方文档

https://rocketmq.apache.org/zh/docs/deploymentOperations/01deploy/

NameServer 成功启动后,我们启动 Broker 和 Proxy 。

# 启动 Broker+Proxy
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

# 指定配置文件启动(broker默认使用的端口是10911,我们也可以在配置文件修改端口)
$ nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf --enable-proxy &

# 注意 --enable-proxy 开启代理后可能会报错
# java.io.IOException: Failed to bind to address 0.0.0.0:8080
# 当端口被占用时 broker/proxy 将无法启动
# 解决方案 https://blog.csdn.net/zooah212/article/details/127994243

# 验证是否启动成功
$ tail -f -n 500 nohup.out
The broker[suzhou-ydshp, 192.168.5.135:10911] boot success. serializeType=JSON and name server is localhost:9876

2、测试消息收发

创建消息发送的目标 Topic,RocketMQ 5.0 版本需要提前创建,例如:

# 可以通过 mqadmin 命令创建
# 注意 TestTopic 是topic名称
$ sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster

create topic to 192.168.5.135:10911 success.
TopicConfig [topicName=TestTopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]

1、在IDEA中创建一个Java工程,并引入以下依赖

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client-java -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.5</version>
</dependency>

2、创建发送消息的程序并运行

@SpringBootTest
public class DemoApplicationTest {
    private static final Logger logger = LoggerFactory.getLogger(DemoApplicationTest.class);

    @Test
    public void test() throws ClientException {
        // 接入点地址,需要设置成 Proxy 的地址和端口列表,一般是xxx:8081;xxx:8081
        String endpoint = "192.168.5.135:8081";
        // 消息发送的目标Topic名称,需要提前创建。
        String topic = "TestTopic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();

        // 初始化Producer时需要设置通信配置以及预绑定的Topic
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();

        // 普通消息发送
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // 设置消息索引键,可根据关键字精确查找某条消息
                .setKeys("messageKey")
                // 设置消息Tag,用于消费端根据指定Tag过滤消息
                .setTag("messageTag")
                // 消息内容实体(byte[])
                .setBody("hello rocketMQ".getBytes())
                .build();
        try {
            // 发送消息,需要关注发送结果,并捕获失败等异常。
            SendReceipt sendReceipt = producer.send(message);
            logger.info("send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
            logger.error("failed to send message", e);
        }
        // 关闭
        producer.close();
    }
}

3、创建订阅消息程序并运行。

Apache RocketMQ 支持SimpleConsumer和PushConsumer两种消费者类型,可以选择任意一种方式订阅消息。这里主要介绍PushConsumer。

@Test
public void pushConsumerTest() throws Exception {
    ClientServiceProvider provider = ClientServiceProvider.loadService();
    // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081
    String endpoint = "192.168.5.135:8081";
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoint)
            .build();
    // 订阅消息的过滤规则,表示订阅所有Tag的消息
    String tag = "*";
    FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
    // 为消费者指定所属的消费者分组,Group需要提前创建
    String consumerGroup = "TestGroup";

    // 指定需要订阅哪个目标Topic,Topic需要提前创建
    String topic = "TestTopic";
    // 初始化 PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系
    PushConsumer pushConsumer = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            // 设置消费者分组
            .setConsumerGroup(consumerGroup)
            // 设置预绑定的订阅关系
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            // 设置消费监听器
            .setMessageListener(messageView -> {
                // 处理消息并返回消费结果
                logger.info("consume message successfully, messageId={}", messageView.getMessageId());
                // 消息内容处理
                ByteBuffer body = messageView.getBody();
                String message = StandardCharsets.UTF_8.decode(body).toString();
                body.flip();
                logger.info("message body={}", message);
                return ConsumeResult.SUCCESS;
            }).build();
    Thread.sleep(Long.MAX_VALUE);
    // 如果不需要再使用 PushConsumer,可关闭该实例。
    pushConsumer.close();
}

3、新版rocketmq-dashboard搭建

rocketmq-dashboard是由 rocketmq-console 升级而来,整体UI风格更加简洁,新增了很多新功能。支持多种部署方式如 docker 镜像部署,源码手动编译与部署等。

搭建过程可参考如下文章

文章地址:https://blog.csdn.net/m0_46357847/article/details/130476251

官方文档(使用说明):https://rocketmq.apache.org/zh/docs/deploymentOperations/04Dashboard

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/772260.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大二web作业精仿王者荣耀(html+css)

经过漫长的期末考试季节&#xff0c;我成功地完成了一个王者荣耀的仿写项目&#xff0c;并且非常高兴地与大家分享。 作品展示 作业-王者荣耀 作品目录 获取源码 1&#xff0c;复制该网站 https://download.csdn.net/download/qq_42431718/87946610 2&#xff0c;点击上方下…

Lazygit贴合 neovim

功能性要比gitui 好用&#xff0c;vim 的键位习惯 > 嵌入式数据库 &#xff0c;python 的性能够用了 … … ,分析差异&#xff0c;选择 备份和升级

ROS:action通信

目录 一、前言二、概念三、作用四、实际案例4.1需求4.2action通信自定义action文件4.2.1定义action文件4.2.2编辑配置文件4.2.3编译 4.3action通信自定义action文件调用(C)4.3.1流程4.3.2vscode配置4.3.3服务端4.3.4客户端4.3.5编译配置文件4.3.6执行 4.4action通信自定义actio…

python_day11_practice

将文本数据插入数据库 两文本文件为day10面向对象练习案例 将data_define.py文件复制过来&#xff08;导入失败&#xff0c;疑惑&#xff09; 新建数据库&#xff0c;建表orders -- CREATE DATABASE py_sql charset utf8;use py_sql;create table orders(order_date date,…

企业内部FAQ系统的搭建重要性是什么?

企业内部FAQ系统&#xff08;Frequently Asked Questions&#xff0c;常见问题解答系统&#xff09;的搭建对于企业来说具有重要的意义。它可以帮助企业有效地管理和解决员工和客户的常见问题&#xff0c;提高工作效率和服务质量。 企业内部FAQ系统搭建的重要性&#xff1a; …

k8s集群内网与办公网络打通

k8s网络 k8s节点ip段&#xff1a;192.168.1.0/24 k8s pod ip段&#xff1a; 10.233.64.0/18 k8s svc ip段&#xff1a; 10.233.0.0/18办公网络 办公电脑ip段&#xff1a;192.168.2.0/24 交换机ip&#xff1a; 192.168.8.252说明 192.168.2.0/24网段访问192.168.1.0/24网段经…

埋点数据完备性校验及结果分析

一、数据校验功能入口 入口:数据管理——数据质量——数据校验 二、操作步骤 2.1 统计时间区间 根据自身需要进行选择。一般选择埋点严重bug fix后的时间,避免脏数据过多影响分析结果。 2.2 数据抽样 根据自身需要进行选择。全量数据量较大,分析起来会复杂一些,但是结…

Win11中的Swapfile.sys

除了 pagefile.sys 和 hiberfil.sys 文件外&#xff0c;在系统根目录会多出一个 swapfile.sys 虚拟内存文件。Windows 10/8 系统为什么会同时使用 SWAP 交换文件和 Page 页面文件呢&#xff1f; 其实 swapfile.sys 文件目前只被用来交换 Universal App (其实就是Metro App)的个…

pyqt 使用pixmap展示图片时候出现失真(图片偏移)

像上图上面的情况&#xff0c; 都是经过放大、旋转等操作&#xff0c;展示图片的时候出现失真的情况 一般都是显卡的问题 需要在qimage转pixmap时&#xff0c;添加部分参数 修改办法&#xff1a; 原本是&#xff1a; pixMap QImage(self.pic_image, width, height, QImage…

JDK、JRE、JVM三者之间的关系以及区别

一、关系 JDK JRE Java 开发工具包 [Java,Javac,Javadoc,Javap等] JRE JVM Java 的核心类库 二、JDK,JRE与JVM介绍 1、JDK JDK是用于Java程序开发的最小环境&#xff0c;包含&#xff1a;Java程序设计语言&#xff0c;Java虚拟机&#xff08;JVM&#xff09;&#xff0…

喜讯!旭帆科技成功入驻“科大硅谷”!

2023年7月&#xff0c;安徽旭帆信息科技有限公司&#xff08;以下简称“旭帆科技”&#xff09;成功入驻“科大硅谷”&#xff0c;成为合肥城市发展新引擎、科创生态集群企业队伍中的一员。 “科大硅谷”项目建设总投资约75.82亿&#xff0c;共计17.37平方公里&#xff0c;是聚…

MQTT 订阅选项的使用

在 MQTT 发布/订阅模式介绍这篇博客中&#xff0c;我们已经了解到&#xff0c;我们需要先向服务端发起订阅&#xff0c;才能从服务端接收对应的消息。如果说订阅时指定的主题过滤器决定了服务端将向我们转发哪些主题下的消息&#xff0c;那么订阅选项则是允许我们进一步定制服务…

Python学习(十三)

安装包的方法&#xff1a; #python数据和json数据的相互转换 import json #准备列表&#xff0c;列表的每一个元素都是字典&#xff0c;将其转换为JSON data [{"name":"大大","age":21},{"name":"小小","age":21…

Java工程师进阶:50小时,搞定企业级核心框架

哈喽&#xff0c;做Java开发的同学们注意啦&#xff01;&#xff01;&#xff01; 小谷又来分享技术了&#xff0c;关乎Java工程师技能进阶与升职加薪的方向哦~~ 最近老有小伙伴私信小谷&#xff0c;说自己的Java水平已经到了瓶颈期&#xff0c;不知道该咋突破&#xff0c;长久…

STM32定时器中断的使用示例

STM32定时器中断的使用示例 前言硬件和软件cubemx使能定时器中断中断服务函数案例输出结果 前言 上一篇博客实现了定时器输出pwm&#xff0c;这篇接着上次的工程&#xff0c;在上次的工程上做简单的配置即可 硬件和软件 硬件使用的是stm32h750vbt6&#xff1b;软件用到了stm…

JAVA电商 B2B2C商城系统免费搭建 多用户商城系统 直播带货 新零售商城 o2o商城 电子商务 拼团商城 分销商城 手机商城免费搭建

1. 涉及平台 平台管理、商家端&#xff08;PC端、手机端&#xff09;、买家平台&#xff08;H5/公众号、小程序、APP端&#xff08;IOS/Android&#xff09;、微服务平台&#xff08;业务服务&#xff09; 2. 核心架构 Spring Cloud、Spring Boot、Mybatis、Redis 3. 前端框…

星火认知大模型,让我感受到了国产AI的崛起

文章目录 一、申请和测试代码二、实测GPT4.0和星火认知大模型的对比2.1 测试网站2.2 经典问题提问对比2.3 代码问题提问对比2.4 论文问题对比2.5 评价 一、申请和测试代码 在我之前的一篇文章中&#xff0c;我分享了如何申请星火认知大模型的内测&#xff0c;并提供了一份可以…

云曦期末复现

serialize 代码审计&#xff0c;给1传参&#xff0c;满足password的值为yunxi&#xff0c;那么反序列化前就会执行__wakeup函数&#xff0c;从而得到flag.php&#xff0c;但是password的值被定死为1&#xff0c;利用PHP反序列化的字符逃逸: <?php error_reporting(0); hig…

Ubuntu搭建docker+laradock

使用Ubuntu搭建dockerlaradock windows 下载Ubuntu工具二选一 链接&#xff1a;https://pan.baidu.com/s/154K6MKdFZxWqaTn2q-6MSQ 提取码&#xff1a;06lc https://www.jianshu.com/p/b7e11d0dbe8c借鉴地址&#xff1a;https://zhuanlan.zhihu.com/p/547169542 备注&#x…

GO语言semaphore信号量

一般地,我们唤醒在等待队列中的线程会使用系统调用和切换线程这样的开销比较大. 本质上.是结合自旋锁和调度器调度后的锁.不过这种机制适合线程不适合协程.因为调度器调度需要切换线程,而协成切换不能切换线程. 协程等待一个锁如何等待和唤醒呢? 在GO语言中,我们使用semaphor…