Linux部署RocketMQ并使用SpringBoot创建生产、消费者

news2025/1/22 18:58:31
  • 😜           :是江迪呀
  • ✒️本文关键词RocketMQ消息队列
  • ☀️每日   一言在你心灰意冷、心烦意乱时也不要停下你的脚步!

在这里插入图片描述

一、前言

RocketMQ(Apache RocketMQ)是一种开源的分布式消息中间件系统,最初由阿里巴巴开发并捐赠给 Apache 基金会。它提供了可靠的、低延迟的消息传递能力,适用于构建大规模分布式系统中的消息通信。RocketMQ 主要用于解决分布式系统中异步通信、解耦、流量削峰等问题。下面让我们一起看下,如何在Linux上部署RocketMQ~

二、介绍RocketMQ

2.1 RocketMQ产生背景

随着业务规模的扩大,阿里巴巴面临着越来越多的分布式系统构建需求。为了解决这个问题,阿里巴巴集团2012年推出的开源分布式消息中间件 —— RocketMQ

2.1 RocketMQ作用

(1)异步通信和解耦: RocketMQ可以在不同的服务之间实现异步通信,解耦了服务之间的紧耦合关系,提高了系统的可维护性和可扩展性。
(2)流量削峰: RocketMQ支持消息积压和消费速率不匹配时的流量削峰功能,防止系统因突发流量而崩溃。
(3)实时数据同步: 用于将数据实时同步到不同的存储介质,保持数据的一致性。
(4)事件驱动架构: RocketMQ支持事件驱动的架构,使得系统能够更加敏捷地响应业务事件。

2.2 RocketMQ的组件

RocketMQ 的主要组件包括:
(1)Producer:负责发送消息到 RocketMQ 服务器。
(2)Broker:消息中转服务器,负责存储消息并提供消息的读写服务。
(3)Consumer:Broker 订阅并消费消息。
(4)Topic:消息的分类,Producer 发送消息到特定的 TopicConsumer 订阅相应的 Topic
(5)Tag: 对消息的进一步分类,可以用于 Consumer 进一步过滤消息。
(6)Message Queue: 每个 Topic 下可以分成多个 Message Queue,实现消息的分区和负载均衡。

2.3 RocketMQ的优缺点

(1)优点

  • 高吞吐量: RocketMQ具有高吞吐量的特点,适用于大量消息的处理。
  • 可靠性: RocketMQ通过消息的持久化存储和复制机制,确保消息不会丢失。
  • 低延迟: RocketMQ在消息传递过程中能够保持较低的延迟,适用于实时性要求较高的场景。
  • 灵活的消息模式: 支持发布-订阅和点对点两种消息模式,根据业务需求进行选择。
  • 水平扩展: 可以通过增加Broker节点来实现水平扩展,提高消息处理能力。

(2)缺点

  • 维护成本: RocketMQ需要维护多个组件,包括ProducerBrokerConsumer等,涉及到一定的运维成本。
  • 学习曲线: 对于新手来说,学习和理解RocketMQ的一些概念和配置可能需要一定的时间。
  • 一致性保障: 虽然RocketMQ通过复制机制保障了消息的可靠性,但在极端情况下可能会存在消息的重复传递或乱序问题。

三、 RocketMQ如何部署

3.1 下载

RocketMQ下载地址

3.2 上传、解压

上传文件到Linux有两种方式:

(1)上传

  • 通过rz命令
rz

你可以使用rz命令,在使用这个命令之前你必须确保linux已经安装了lrzsz,安装命令如下:

sudo apt-get update
sudo apt-get install lrzsz
  • 使用xftp
    这个我就不赘述了。
    在这里插入图片描述

(2)解压

unzip rocketmq-all-4.5.2-bin-release.zip

如果没有安装unzip,需要安装一下:

// 查看 unzip 包的安装情况
yum list unzip
//没有安装时,使用命令安装 unzip
yum list unzipyum install unzip.x86_64

在这里插入图片描述

3.2 启动RocketMQ

RocketMQ的启动主要涉及到Namesrv(命名服务)Broker(消息存储和消费者服务)两部分。要想启动RocketMQ,首先进入解压后的bin目录:

cd rocketmq-all-4.5.2-bin-release/bin

(1)启动Namesrv并设置输出日志位置

nohup sh mqnamesrv > namesrv.log 2>&1 &

(2)启动Broker并设置输出日志位置

nohup sh mqbroker -n localhost:9876 > broker.log 2>&1 &

查看是否启动:

jps

输出下面的内容说明启动成功了:

2931 NamesrvStartup
25599 Jps
25583 BrokerStartup

在启动Broker会出现失败问题,一般来说就是内存不足RocketMq默认的虚拟机内存较大,因而启动失败,需要编辑如下两个配置文件,修改jvm的内存大小:

//编辑runbroker.sh和runserver.sh修改默认的JVM大小
vim runbroker.sh
vim runserver.sh 

在这里插入图片描述
修改为:

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -xx:metaspaceSize=128m -XX:MaxMetaspaceSize=320m"

如果还是启动不了,需要将NameServer关闭,重新启动一下,同样是先进入bin目录,关闭命令如下:

sh mqshutdown namesrv

四、测试与关闭

4.1 测试

(1) 发送消息(生产者)

//设置环境变量
export NAMESRV_ADDR=localhost:9876
//使用安装包的demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

在这里插入图片描述
上面的信息就是RocketMQproducer发送的消息。特点:启动发送完毕消息后就会停止。

(2) 接收消息(消费者)

//设置环境变量
export NAMESRV_ADDR=localhost:9876
//接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

在这里插入图片描述

4.2 关闭RocketMQ

//关闭namesrv
sh bin/mqshutdown namesrv
//关闭Broker
sh bin/mqshutdown broker

五、SpringBoot连接RocketMQ

5.1 引入依赖

<dependencies>
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-thymeleaf</artifactId>
     <version>2.3.5.RELEASE</version>
 </dependency>
    
 <dependency>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-client</artifactId>
     <version>4.5.2</version> 
 </dependency>
</dependencies>

5.2 配置文件application.properties

# Name Server地址
rocketmq.name-server=your-nameserver-ip:9876
# 生产者组名
rocketmq.producer.group=my-producer-group
# 消费者组名
rocketmq.consumer.group=my-consumer-group

5.3 生产者

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class RocketMQProducer {

    @Value("${rocketmq.name-server}")
    private String nameServer;

    @Value("${rocketmq.producer.group}")
    private String producerGroup;

    public void sendMessage(String topic, String message) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(nameServer);
        producer.start();
        // 创建消息对象,设置消息内容
        org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message(topic, message.getBytes());
        // 发送消息
        producer.send(msg);
        producer.shutdown();
    }
}

5.4 消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class RocketMQConsumer {

    @Value("${rocketmq.name-server}")
    private String nameServer;

    @Value("${rocketmq.consumer.group}")
    private String consumerGroup;
    public void startConsumer(String topic) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(nameServer);
        // 订阅主题和标签,可以根据需要进行过滤
        consumer.subscribe(topic, "*");
        // 注册消息监听器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (org.apache.rocketmq.common.message.MessageExt msg : msgs) {
                System.out.println("Received message: " + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
    }

5.5 启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class RocketMQDemoApplication {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(RocketMQDemoApplication.class, args);

        RocketMQProducer producer = context.getBean(RocketMQProducer.class);
        producer.sendMessage("my-topic", "Hello, RocketMQ!");
        RocketMQConsumer consumer = context.getBean(RocketMQConsumer.class);
        consumer.startConsumer("my-topic");
    }
}

六、RocketMQ集群

上面所述的是单体RocketMQ,也能使用。但是如果你想要实现高可用在实际的业务场景中。RocketMQ大部分都不会单体存在,需要搭建集群来实现高可用

有人已经写好了,而且很详细:传送门

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/938424.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Day-21慢就是快】代码随想录-栈与队列-逆波兰表达式求值

逆波兰表达式&#xff1a;是一种后缀表达式&#xff0c;所谓后缀就是指运算符写在后面。 平常使用的算式则是一种中缀表达式&#xff0c;如 ( 1 2 ) * ( 3 4 ) 。 该算式的逆波兰表达式写法为 ( ( 1 2 ) ( 3 4 ) * ) 。 逆波兰表达式主要有以下两个优点&#xff1a; 去掉…

Cesium 显示经纬高

文章目录 需求分析 需求 页面展示经、纬度和高 分析 html <div id"latlng_show" style"width:340px;height:30px;position:absolute;bottom:40px;right:200px;z-index:1;font-size:15px;"><div style"width:100px;height:30px;float:left;…

浅析三维模型OBJ格式轻量化压缩文件大小的技术方法

浅析三维模型OBJ格式轻量化压缩文件大小的技术方法 在减小三维模型OBJ格式轻量化文件大小方面&#xff0c;有许多技术和方法可以使用。下面我将介绍一些常用的方法来减小OBJ文件的大小。 1、优化顶点数量&#xff1a;减少OBJ文件中的顶点数量是减小文件大小的一种有效方法。可…

LSF 安装目录,快速参考 LSF 命令、守护程序、配置文件、日志文件和重要集群配置参数

样本 UNIX 和 Linux 安装目录 守护程序错误日志文件 守护程序错误日志文件存储在 LSF_LOGDIR 在 lsf.conf 文件中定义的目录中。 LSF 基本系统守护程序日志文件LSF 批处理系统守护程序日志文件pim.log.host_namembatchd.log.host_namembatchd.log.host_namesbatchd.log.host_…

【Go 基础篇】切片:Go语言中的灵活数据结构

在Go语言中&#xff0c;切片&#xff08;Slice&#xff09;是一种强大且灵活的数据结构&#xff0c;用于管理和操作一系列元素。与数组相比&#xff0c;切片的大小可以动态调整&#xff0c;这使得它成为处理动态数据集合的理想选择。本文将围绕Go语言中切片的引入&#xff0c;介…

Java【手撕双指针】LeetCode 18. “四数之和“, 图文详解思路分析 + 代码

文章目录 前言一、四数之和1, 题目2, 思路分析3, 代码 前言 各位读者好, 我是小陈, 这是我的个人主页, 希望我的专栏能够帮助到你: &#x1f4d5; JavaSE基础: 基础语法, 类和对象, 封装继承多态, 接口, 综合小练习图书管理系统等 &#x1f4d7; Java数据结构: 顺序表, 链表, 堆…

C# Emgu.CV 条码检测

效果 项目 代码 using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Windows.Forms; using Emgu.CV; using Emgu.CV.Util; using static Emgu.C…

ThinkPHP 资源路由的简单使用,restfull风格API

ThinkPHP 资源路由的简单使用&#xff0c;restfull风格API 一、资源控制器二、资源控制器简单使用 一、资源控制器 资源控制器可以轻松的创建RESTFul资源控制器&#xff0c;可以通过命令行生成需要的资源控制器&#xff0c;例如生成index应用的TestR资源控制器使用&#xff1a…

layoutdm:discrete diffusion model for controllable layout generation

自动布局生成是我之前做banner生成中非常重要的一步&#xff0c;好的布局是成功的一半&#xff0c;在19年之前甚至更早时间&#xff0c;我们做这块主要是对标阿里的鹿班&#xff0c;那时候的技术方案主要是我之前发的smartbanner中&#xff0c;基本还是要靠模板以及相应的pipel…

软考A计划-网络工程师-必考知识点-上

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例点击跳转>软考全系列点击跳转>蓝桥系列 &#x1f449;关于作者 专注于Android/Unity和各种游…

[C/C++]笔记-函数的栈空间(避免栈空间溢出)

个人主页&#xff1a;北海 &#x1f390;CSDN新晋作者 &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏✨收录专栏&#xff1a;C/C&#x1f91d;希望作者的文章能对你有所帮助&#xff0c;有不足的地方请在评论区留言指正&#xff0c;大家一起学习交流&#xff01;&#x1f9…

机器视觉工程师们,人生而不平等,你更要迈出第一步的勇气

我永远相信人生而不公平&#xff0c;所以你必须跨出第一步。 有些事情试试才知道&#xff0c;每一个人每件事对待每一个人都是公平公正的。 很多小白&#xff0c;做事情总有一种胆怯的心理&#xff0c;我给一句忠告&#xff0c;先干再说&#xff0c;一边干&#xff0c;一边思考…

基于加密接口的测试用例设计

这里写目录标题 一、环境准备二、原理三、实战练习 一、环境准备 1、对响应加密的接口。对它发起一个get请求后&#xff0c;得到一个加密过后的响应信息。(如果有可用的加密过的接口以及了解它的解密方法&#xff0c;可以跳过) 2、准备一个加密文件 加密字段 3、使用python…

Java【手撕双指针】LeetCode 15. “三数之和“, 图文详解思路分析 + 代码

文章目录 前言一、三数之和1, 题目2, 思路分析3, 代码 前言 各位读者好, 我是小陈, 这是我的个人主页, 希望我的专栏能够帮助到你: &#x1f4d5; JavaSE基础: 基础语法, 类和对象, 封装继承多态, 接口, 综合小练习图书管理系统等 &#x1f4d7; Java数据结构: 顺序表, 链表, 堆…

【base64】JavaScriptuniapp 将图片转为base64并展示

Base64是一种用于编码二进制数据的方法&#xff0c;它将二进制数据转换为文本字符串。它的主要目的是在网络传输或存储过程中&#xff0c;通过将二进制数据转换为可打印字符的形式进行传输 JavaScript 压缩图片 <html><body><script src"https://code.j…

Python绘图系统10:在父组件中使用子组件的函数

文章目录 Combobox绑定事件互相调用源代码 Python绘图系统&#xff1a; &#x1f4c8;从0开始实现一个三维绘图系统自定义控件&#xff1a;坐标设置控件&#x1f4c9;坐标列表控件&#x1f4c9;支持多组数据的绘图系统图表类型和风格&#xff1a;散点图和条形图&#x1f4ca;混…

【2023】数字信号处理之Fourier分析

目录 一、基础概念 1. 时域 2. 频域 3. Fourier分析级数变换 Fourier级数 Fourier变换 离散谱 连续谱 4. 欧拉公式&#xff01;&#xff01;&#xff01; 欧拉恒等式 二 、三角函数系及Fourier级数 1. 三角函数系 概念 性质——周期性、正交性、完备性 Fourier系…

AI时代,程序员需要焦虑吗?

原文来自 微信公众号"互联网技术人进阶之路". 目录 前言一、程序员会被 AI 取代么&#xff1f;二、服务端开发尚难被 AI 取代三、服务端开发何去何从&#xff1f;四、业界首部体系化、全景式解读服务端开发的著作第一部分&#xff1a;服务端开发的技术和方法第二部分…

java基于SpringBoot+vue的宠物用品商城交易平台的设计与实现y704t

在此基础上&#xff0c;结合现有宠物用品交易体系的特点&#xff0c;运用新技术&#xff0c;构建了以 springboot为基础的宠物用品交易信息化管理体系。首先&#xff0c;以需求为依据&#xff0c;根据需求分析结果进行了系统的设计&#xff0c;并将其划分为管理员和用户二种角色…

MySQL索引 事物 存储引擎

一 索引 索引的概念 索引就是一种帮助系统能够更快速的查找信息的结构 索引的作用 索引的副作用 创建索引的规则 MySQL的优化 哪些字段/场景适合创建索引 哪些不适合 小字段唯一性强的字段更新不频繁&#xff0c;但查询率比较高的字段表记录超过 300行主键&#xff0c;外键…