从0开始学习RocketMQ:快速部署启动

news2024/12/22 23:00:34

快速部署

快速部署一个单节点单副本 RocketMQ 服务,并完成简单的消息收发。

安装Apache RocketMQ

下载地址:RocketMQ官网下载

这里我们下载二进制包:rocketmq-all-5.3.0-bin-release.zip
直接解压即可:tar -zxvf rocketmq-all-5.3.0-bin-release.zip
重命名:mv rocketmq-all-5.3.0-bin-release rocketmq
在这里插入图片描述

启动NameServer

安装完RocketMQ包后,我们启动NameServer

### 启动namesrv
$ nohup sh bin/mqnamesrv &
 
### 验证namesrv是否启动成功
$ tail -f nohup.out
The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876

启动Broker+Proxy

NameServer成功启动后,我们启动Broker和Proxy。这里我们使用 Local 模式部署,即 Broker 和 Proxy 同进程部署。

### 先启动broker
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

### 验证broker是否启动成功
$ tail -f nohup.out
Wed Sep 11 18:35:56 CST 2024 rocketmq-proxy startup successfully

在这里插入图片描述

工具测试消息收发

export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

在这里插入图片描述

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

在这里插入图片描述

命令行测试

创建 Topic

bin/mqadmin updateTopic -c DefaultCluster -n localhost:9876 -t studyMq

在这里插入图片描述

查看 Topic 列表

bin/mqadmin topicList -n localhost:9876

在这里插入图片描述

发送消息

bin/mqadmin sendMessage -n localhost:9876 -t studyMq -p 111111

在这里插入图片描述

消费消息

bin/mqadmin consumeMessage -n localhost:9876 -t studyMq

在这里插入图片描述

SDK测试

生产消息

package rocketmq;


import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

import java.time.Duration;

public class ProducerExample {
    public static void main(String[] args) throws Exception {
        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。
        String endpoint = "127.0.0.1:8080";
        // 消息发送的目标Topic名称,需要提前创建。
        String topic = "studyMq";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        builder.setRequestTimeout(Duration.ofSeconds(10));
        ClientConfiguration configuration = builder.build();
        // 初始化Producer时需要设置通信配置以及预绑定的Topic。
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
        // 普通消息发送。
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("yyh")
                // 消息体。
                .setBody("good night".getBytes())
                .build();
        try {
            // 发送消息,需要关注发送结果,并捕获失败等异常。
            SendReceipt sendReceipt = producer.send(message);
            System.out.println("Send message successfully, messageId:" + sendReceipt.getMessageId());
        } catch (ClientException e) {
            System.out.println("Failed to send message:" + e);

        }
        // producer.close();
    }
}

消费消息

package rocketmq;

import lombok.Data;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;

@Data
public class ConsumerExample {
    private ConsumerExample() {}
    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
        String endpoints = "127.0.0.1:8080";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .setRequestTimeout(Duration.ofSeconds(20))
                .build();
        // 订阅消息的过滤规则,表示订阅所有Tag的消息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // 为消费者指定所属的消费者分组,Group需要提前创建。
        String consumerGroup = "YourConsumerGroup";
        // 指定需要订阅哪个目标Topic,Topic需要提前创建。
        String topic = "studyMq";
        // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // 设置消费者分组。
                .setConsumerGroup(consumerGroup)
                // 设置预绑定的订阅关系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // 设置消费监听器。
                .setMessageListener(messageView -> {
  					// 获取消息体的ByteBuffer
                    ByteBuffer byteBuffer = messageView.getBody();
					 // 将ByteBuffer转换为字节数组
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes); //将ByteBuffer中的数据读取到字节数组
                    // 将字节数组转换为字符串
                    String msgBody = new String(bytes, StandardCharsets.UTF_8);
                    // 处理消息并返回消费结果。
                    System.out.println("Consume message successfully, messageId=" + messageView.getMessageId() +",msg=" + msgBody);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
        // 如果不需要再使用 PushConsumer,可关闭该实例。
        // pushConsumer.close();
    }
}

关闭服务器

通过以下方式关闭服务

sh bin/mqshutdown broker

sh bin/mqshutdown namesrv

注意:如果是部署在云服务器上,需要把9876、 8080、10911端口都对外授权

参考资料
《https://rocketmq.apache.org/zh/docs/quickStart/01quickstart》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2132451.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

光伏开发:工商业光伏的流程管理全面解析

一、项目准备阶段 1、资源寻觅与沟通 首要任务是寻找适合的工商业屋顶或空地资源,并与业主初步交流,了解其意向、屋顶条件及用电情况。这一阶段的关键在于建立信任关系,为后续工作奠定基础。 2、资料收集与核查 全面收集业主资料&#xff…

算法练习题26——多项式输出(模拟)

输入格式 输入共有 2 行 第一行 1 个整数,n,表示一元多项式的次数。 第二行有 n1 个整数,其中第 i 个整数表示第 n−i1 次项的系数,每两个整数之间用空格隔开。 输出格式 输出共 1 行,按题目所述格式输出多项式。…

Navicat BI 中创建自定义字段:计算字段

在数据库设计和开发中,避免存储任何可以从其他字段计算或重建的数据是一种惯例。因此,在 Navicat BI 中构建图表时,你可能会缺少一些数据。但这不是问题,因为 Navicat BI 提供了专门用于此目的的计算字段。在今天的博客中&#xf…

网站按钮检测系统源码分享

网站按钮检测检测系统源码分享 [一条龙教学YOLOV8标注好的数据集一键训练_70全套改进创新点发刊_Web前端展示] 1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 项目来源AACV Association for the Advancement of Computer Vis…

浅谈MVC设计模式

1 前言 1.1 内容概要 熟悉使用JSON工具,完成Java对象(Map)和Json字符串之间的相互转换(注意提供构造器和getter/setter方法) 注意事项:不管使用的是什么JSON工具,都要提供类的无参构造方法和…

基于SpringBoot的宠物寄领养网站管理系统

作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、SSM项目源码 系统展示 【前后端分离】基于JavaSpringBootVueMySQL的宠物寄领养网站…

北斗卫星系统信号介绍

覆盖范围亚太区域全球范围 卫星数量35颗区域服务卫星30颗全球服务卫星 信号频段B1I, B2IB1C, B2a, B3, 兼容GPS/Galileo 定位精度区域内10米全球2.5~5米,中国内更高 新增功能区域短报文通信全球短报文通信、星基增强、精密定位 抗干扰能力相对有限更强 互操作…

无人机 PX4 飞控 | 如何检测状态估计EKF性能

无人机 PX4 飞控 | 如何检测状态估计EKF性能 前言检查EKF性能缺少pyulog问题解决脚本崩溃没有输出文件生成对应文件 结语 前言 ECL (Estimation and Control Library,估计和控制库),其中的状态估计使用扩展卡尔曼滤波算法&#x…

图像检测【YOLOv5】——深度学习

Anaconda的安装配置:(Anaconda是一个开源的Python发行版本,包括Conda、Python以及很多安装好的工具包,比如:numpy,pandas等,其中conda是一个开源包和环境管理器,可以用于在同一个电脑…

计算机网络基本概述

欢迎大家订阅【计算机网络】学习专栏,开启你的计算机网络学习之旅! 文章目录 前言一、网络的基本概念二、集线器、交换机和路由器三、互连网与互联网四、网络的类型五、互连网的组成1. 边缘部分2. 核心部分 六、网络协议 前言 计算机网络是现代信息社会…

安装node 报错需要:glibc >= 2.28

--> 解决依赖关系完成 错误:软件包:2:nodejs-18.20.4-1nodesource.x86_64 (nodesource-nodejs) 需要:libm.so.6(GLIBC_2.27)(64bit) 错误:软件包:2:nodejs-18.20.4-1nodesource.x86_64 (nodesource-nodej…

【数据结构篇】~排序(1)之插入排序

排序~插入排序 前言插入排序1.直接插入排序(时间复杂度:O(N^2))1.思想2.代码 2.希尔排序(时间复杂度:O(N∙))1.思路简易证明希尔排序的复杂度 2.代码 前言 四大排序,今天解决插入排序 堆排序和冒泡排序已经写过了&am…

C++笔记---继承(上)

1. 继承的简单介绍 1.1 继承的概念 继承(inheritance)机制是面向对象程序设计使代码可以复用的最重要的手段,它允许我们在保持原有类特性的基础上进行扩展,增加方法(成员函数)和属性(成员变量),这样产生新的类,称派生类。 继承呈…

如何利用 Smarter Balanced 塑造教育领域的 AI 治理

目录 定义挑战 以人为本的设计引领 融入多样性 探索以学生为中心的价值观 探索效果的层次和不同的影响 部位于加利福尼亚州的Smarter Balanced Assessment Consortium 是一个由会员主导的公共组织,为 K-12 和高等教育领域的教育工作者提供评估系统。该组织成立…

09_Tensorflow2图像处理大赏:让你的图片笑出AI感,惊艳朋友圈!

1. 图像处理案例 1.1 逆时针旋转90度 import tensorflow as tf import matplotlib.pyplot as plt import matplotlib.cm as cm import numpy import osdef show_pic(pic,name,cmapNone):显示图像plt.imshow(pic,cmapcmap) plt.axis(off) # 打开坐标轴为 on # 设置图像标题…

C语言数据类型、变量及数据类型的长度、取值范围

文章目录 一、数据类型介绍1.字符型2.整型3.浮点型4.布尔类型 二、变量1.变量的创建2.变量的分类 三、数据类型的长度(字节)1.sizeof 操作符2.各种数据类型的长度3.sizeof中表达式不计算 四、各种类型的取值范围1.signed和unsigned2.数据类型的取值范围 五、整型提升练习1练习2…

【Obsidian】当笔记接入AI,Copilot插件推荐

当笔记接入AI,Copilot插件推荐 自己的知识库笔记如果增加AI功能会怎样?AI的回答完全基于你自己的知识库余料,是不是很有趣。在插件库中有Copilot插件这款插件,可以实现这个梦想。 一、什么是Copilot? 我们知道githu…

el-input-number设置了min值,希望默认值展示为空

data() {return {editForm: {num: undefined, //input}} } <el-input-number v-model.trim"editForm.num" controls-position"right" :min"1" placeholder"请输入" clearable /> 展示效果如下:

C++中的左值(Lvalue)和右值(Rvalue)详解

C中的左值&#xff08;Lvalue&#xff09;和右值&#xff08;Rvalue&#xff09;详解 在C中&#xff0c;左值&#xff08;Lvalue&#xff09;和右值&#xff08;Rvalue&#xff09;的概念是理解表达式和变量的重要基础。为了提高C的性能和灵活性&#xff0c;C11引入了一些新的…

F1C100S/F1C200S的资料来源说明

文章目录 常用板子开源创客荔枝派榴莲派 我想说是的官网啥资料都没有。但是它的资料又很多&#xff0c;从淘宝或者其他地方能都搜到很多。 http://wiki.lcmaker.com/index.php?titleLC-PI-200S https://github.com/peng-zhihui/Planck-Pi?tabreadme-ov-file#head4 http://do…