SpringCloud Alibaba】(十三)学习 RocketMQ 消息队列

news2025/1/11 5:10:40

目录

  • 1、MQ 使用场景与选型对比
    • 1.1、MQ 的使用场景
    • 1.2、引入 MQ 后的注意事项
    • 1.3、MQ 选型对比
  • 2、下载、安装 RocketMQ 及 RocketMQ 控制台
    • 2.1、下载安装 RocketMQ
    • 2.2、测试 RocketMQ 环境
    • 2.3、RocketMQ 控制台【图形化管理控制台】
      • 2.3.1、下载、安装
      • 2.3.2、验证 RocketMQ 控制台
  • 3、RocketMQ 快速入门
    • 3.1、导入 RocketMQ 依赖
    • 3.2、编写生产者代码
    • 3.3、编写消费者代码
    • 3.4、测试消息的生产与消费
  • 4、集成 RocketMQ
    • 4.1、用户微服务集成 RocketMQ
    • 4.2、订单微服务整合 RocketMQ
    • 4.3、测试集成的 RocketMQ

1、MQ 使用场景与选型对比

1.1、MQ 的使用场景

MQ 的英文全称是 Message Queue,翻译成中文就是消息队列,队列实现了先进先出(FIFO)的消息模型。通过消息队列,我们可以实现多个进程之间的通信,例如,可以实现多个微服务之间的消息通信。MQ 的最简模型就是生产者生产消息,将消息发送到 MQ,消息消费者订阅 MQ,消费消息

在这里插入图片描述

MQ的使用场景通常包含:异步解耦、流量削峰

1.2、引入 MQ 后的注意事项

引入MQ最大的优点就是异步解耦和流量削峰,但是引入 MQ 后也有很多需要注意的事项和问题,主要包括:系统的整体可用性降低、系统的复杂度变高、引入了消息一致性的问题

  • 系统的整体可用性降低:在对一个系统进行架构设计时,引入的外部依赖越多,系统的稳定性和可用性就会降低。系统中引入了MQ,部分业务就会出现强依赖MQ的现象,此时,如果MQ宕机,则部分业务就会变得不可用。所以,引入MQ时,我们就要考虑如何实现MQ的高可用。
  • 系统的复杂度变高:引入 MQ 后,会使之前的同步接口调用变成通过 MQ 的异步调用,在实际的开发过程中,异步调用会比同步调用复杂的多。并且异步调用出现问题后,重现问题,定位问题和解决问题都会比同步调用复杂的多。并且引入 MQ 后,还要考虑如何保证消息的顺序等问题
  • 消息一致性问题 :引入 MQ 后,不得不考虑的一个问题就是消息的一致性问题。这期间就要考虑如何保证消息不丢失,消息幂等和消息数据处理的幂等性问题

1.3、MQ 选型对比

目前,在行业内使用的比较多的 MQ 包含 RabbitMQ、Kafka 和 RocketMQ。这里,我将三者的对比简单整理了个表格,如下所示:

消息中间件(MQ)优点缺点使用场景
RabbitMQ功能全面、消息的可靠性比较高吞吐量低,消息大量积累会影响性能,使用的开发语言是 erlang,不好定制功能规模不大的场景
Kafka吞吐量最高,性能最好,集群模式下高可用功能上比较单一,会丢失部分数据日志分析,大数据场景
RocketMQ吞吐量高,性能高,可用性高,功能全面。使用 Java 语言开发,容易定制功能开源版不如阿里云上版文档比较简单几乎支持所有场景,包含大数据场景和业务场景

2、下载、安装 RocketMQ 及 RocketMQ 控制台

2.1、下载安装 RocketMQ

Apache RocketMQ开发者指南

Windows 部署 RocketMQ

RocketMQ 下载、安装过程:

  1. 下载、解压。下载地址:二进制版本 4.9.2 官方下载

修改配置文件 conf/broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

# 自动创建Topic
autoCreateTopicEnable=true
# nameServ地址
namesrvAddr=127.0.0.1:9876
# 存储路径
storePathRootDir=D:/dev/rocketmq-4.9.2/data/dataDir
# commitLog路径
storePathCommitLog=D:/dev/rocketmq-4.9.2/data/dataDir/commitlog
# 消息队列存储路径
storePathConsumeQueue=D:/dev/rocketmq-4.9.2/data/dataDir/consumequeue
# 消息索引存储路径
storePathIndex=D:/dev/rocketmq-4.9.2/data/dataDir/index
# checkpoint文件路径
storeCheckpoint=D:/dev/rocketmq-4.9.2/data/dataDir/checkpoint
# abort文件存储路径
abortFile=D:/dev/rocketmq-4.9.2/data/dataDir/abort
  1. 配置 ROCKETMQ 环境变量:否则,启动报错

    • 先添加一个环境变量 ROCKETMQ_HOME
      在这里插入图片描述
    • 在 Path 中进行添加

在这里插入图片描述

  1. 内存分配设置【可选】

①:编辑 server 启动文件:bin/runserver.cmd

set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx512m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

在这里插入图片描述

②:编辑 broker 启动文件:bin/runbroker.cmd

set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx256m -Xmn128m"

在这里插入图片描述

  1. 修改日志存储默认路径:conf/logback_broker.xmlconf/logback_namesrv.xmlconf/logback_tools.xml

在这里插入图片描述

  1. 启动 NameServer

命令行执行:

mqnamesrv.cmd

打印出如下信息,说明 RocketMQ 的 NameServer 启动成功了:

在这里插入图片描述

  1. 启动 Broker

命令行执行:

mqbroker.cmd -n localhost:9876

打印出如下信息,说明 RocketMQ 的 Broker 服务启动成功了:

在这里插入图片描述

2.2、测试 RocketMQ 环境

RocketMQ 内置了大量的测试案例,并且这些测试案例可以通过 RocketMQ 的 bin 目录下的 tools.cmd 命令进行测试

1、启动生产者程序向 RocketMQ 发送消息

重新打开 cmd 命令行,进入 RocketMQ 的 bin 目录,在命令行输入如下命令调用 RocketMQ 自带的生产者程序向 RocketMQ 发送消息:

set NAMESRV_ADDR=localhost:9876
tools.cmd org.apache.rocketmq.example.quickstart.Producer

可以看到,执行完上述两条命令后,生产者程序开始向 RocketMQ 发送消息:

在这里插入图片描述

2、启动消费者程序消费 RocketMQ 中的消息

重新打开 cmd 命令行,进入 RocketMQ 的 bin 目录,在命令行输入如下命令调用 RocketMQ 自带的消费者程序消费 RocketMQ 中的消息

set NAMESRV_ADDR=localhost:9876
tools.cmd org.apache.rocketmq.example.quickstart.Consumer

可以看到,执行完上述两条命令后,消费者程序开始消费 RocketMQ 中的消息:

在这里插入图片描述

2.3、RocketMQ 控制台【图形化管理控制台】

2.3.1、下载、安装

RocketMQ 控制台本质上是一个 SpringBoot 程序,启动后默认监听的端口是 8080。RocketMQ 的新版控制台已经从 RocketMQ 的 rocketmq-externals 项目中分离出来了。也就是说,新版的 RocketMQ 控制台已经从 https://github.com/apache/rocketmq-externals 链接所示的项目中分离出来,新版控制台的链接地址为:https://github.com/apache/rocketmq-dashboard

1、下载 RocketMQ 控制台源码
2、修改配置:src/main/resources/application.properties

  • 端口:7000
  • namesrvAddr:localhost:9876
  • dataPath:D:/dev/rocket-mq-master/data

在这里插入图片描述

3、打开 cmd 命令行,进入 RocketMQ 控制台源码的根目录,输入如下 Maven 命令开始编 RocketMQ 控制台的源码:

mvn clean install -Dmaven.test.skip=true

4、编译完成后,会在 RocketMQ 控制台源码的根目录下生成 target 目录,进入 target 目录,可以看到生成了 rocketmq-dashboard-1.0.1-SNAPSHOT.jar 文件,如下所示

在这里插入图片描述

5、重新打开 cmd 命令行,进入 rocketmq-dashboard-1.0.0.jar 文件所在的目录,在命令行直接输入如下命令启动 RocketMQ 控制台程序

java -jar rocketmq-console-ng-1.0.0.jar

2.3.2、验证 RocketMQ 控制台

在浏览器中输入 http://localhost:7000 后,出现如下画面说明 RocketMQ 启动成功【可切换语言】:

在这里插入图片描述

选择【Topic】菜单想后可以看到目前 RocketMQ 中存在一个名称为 TopicTest 的主题:

在这里插入图片描述

点击 TopicTest 主题的状态按钮,如下所示:

在这里插入图片描述

可以看到,正确显示出了 TopicTest 主题的消息队列信息,说明 RocketMQ 控制台启动成功了

3、RocketMQ 快速入门

3.1、导入 RocketMQ 依赖

在用户微服务 shop-userpom.xml 中,添加 RocketMQ 相关的依赖,如下所示:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.2</version>
</dependency>

3.2、编写生产者代码

在用户微服务的 src/test/java 目录下新建 com.zzc.rocketmq.test包,在包下创建 RocketMQProducer 类,作为 RocketMQ 的生产者,代码如下所示:

public class RocketMQProducer {
    public static void main(String[] args) throws Exception {
        // 1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
        // 2.指定Nameserver地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 3.启动producer
        producer.start();
        // 4.构建消息对象
        Message message = new Message("bingheTopic", "bingheTag", "Hello RocketMQ".getBytes());
        System.out.println("生产者发出的消息为:" + JSONObject.toJSONString(message));
        // 5.发送消息并接收结果
        SendResult sendResult = producer.send(message);
        System.out.println("生产者收到的发送结果信息为:" + JSONObject.toJSONString(sendResult));
        // 6.关闭生产者
        producer.shutdown();
    }
}

3.3、编写消费者代码

com.zzc.rocketmq.test 包下新建 RocketMQConsumer 类,作为 RocketMQ 的消费者,代码如下所示:

public class RocketMQConsumer {
    public static void main(String[] args) throws Exception {
        // 1.创建消息消费者 consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        // 2.指定Nameserver地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 3.订阅 testTopic主题
        consumer.subscribe("testTopic", "*");
        // 4.设置消息监听,当收到消息时 RocketMQ 会回调消息监听
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // 打印消息消费者收到的RocketMQ消息
                System.out.println("消费者收到的消息为:" + list);
                // 返回消息消费成功的标识
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5.启动消费者
        System.out.println("消费者启动成功");
        consumer.start();
    }
}

3.4、测试消息的生产与消费

1、为了便于观察,这里我们先启动消费者程序 RocketMQConsumer,启动 RocketMQConsumer后会在 IDEA 的控制台打印如下信息:

在这里插入图片描述

2、运行生产者程序 RocketMQProducer,运行后 RocketMQProducer 程序控制台会输出如下信息:

在这里插入图片描述

说明生产者程序 RocketMQProducer 成功将消息发送到 RocketMQ

3、接下来,再看下消费者程序 RocketMQConsumer 的控制台,如下所示:

在这里插入图片描述

说明生产者发送到 RocketMQ 的消息,被消费者成功消费到了

4、集成 RocketMQ

在项目中模拟一个用户成功下单后,为用户发送通知,通知用户下单成功的逻辑。

具体的流程:

下单成功后将订单的信息发送到 RocketMQ,然后用户微服务订阅 RocketMQ 的消息,接收到消息后进行打印

  • 用户微服务:消费者
  • 订单微服务:生产者

4.1、用户微服务集成 RocketMQ

1、在用户微服务 shop-user 导入了 RocketMQ 的依赖

2、在用户微服务 shop-userapplication.yml 文件中添加如下 RocketMQ 的配置

rocketmq:
  name-server: 127.0.0.1:9876

3、在用户微服务 shop-user 中创建 com.zzc.user.mq 包,在包下创建 RocketConsumeListener,实现 org.apache.rocketmq.spring.core.RocketMQListener 接口,具体代码如下所示:

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "user-group", topic = "order-topic")
public class RocketConsumeListener implements RocketMQListener<Order> {

    @Override
    public void onMessage(Order order) {
        log.info("用户微服务收到了订单信息:{}", JSONObject.toJSONString(order));
    }
}

@RocketMQMessageListener 注解,表示当前类是一个 RocketMQ 的消费者,在@RocketMQMessageListener 注解中配置了消费者组为 user-group,主题为 order-topic

4.2、订单微服务整合 RocketMQ

1、在订单微服务 shop-orderpom.xml 文件中添加 RocketMQ 的依赖

2、在订单微服务 shop-orderapplication.yml 文件中添加如下配置:

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: order-group

3、修改 OrderServiceImpl

@Slf4j
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {
   // ...

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void saveOrder(OrderParamVo orderParamVo) {
        // ...
        log.info("库存扣减成功");
        // 发送消息
        rocketMQTemplate.convertAndSend("order-topic", order);
    }
}

4.3、测试集成的 RocketMQ

1、分别启动 Nacos,Sentinel,ZipKin 和 RocketMQ

2、分别启动用户微服务、商品微服务、订单微服务和网关服务

3、在浏览器中输入localhost:10001/server-order/order/submit_order?userId=1001&productId=1001&count=1

4、查看用户微服务 shop-user 的控制台,发现会输出订单的信息,如下所示:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2095605.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【困难】 猿人学web第一届 第14题 备而后动-勿使有变

调试干扰 进入题目 打开开发者工具会进入一个无限 debugger; 向上查看堆栈&#xff0c;可以找到生成 debugger 的代码段 手动解混淆后可以知道 debugger 生成的方式 (function () {// 函数内的代码是不需要的&#xff0c;因为里面的代码不会执行 }[constructor](debugger)[call…

Java并发编程面试必备:如何创建线程池、线程池拒绝策略

一、线程池 1. 线程池使用 1.1 如何配置线程池大小 如何配置线程池大小要看业务系统执行的任务更多的是计算密集型任务&#xff0c;还是I/O密集型任务。大家可以从这两个方面来回答面试官。 &#xff08;1&#xff09;如果是计算密集型任务&#xff0c;通常情况下&#xff…

模型 ACT心理灵活六边形

系列文章 分享 模型&#xff0c;了解更多&#x1f449; 模型_思维模型目录。接纳现实&#xff0c;灵活行动&#xff0c;追求价值。 1 ACT心理灵活六边形的应用 1.1 应对工作压力 背景&#xff1a; 在高压的工作环境中&#xff0c;员工经常面临巨大的工作压力&#xff0c;这可…

在VScode中使用Git将本地已有文件夹提交到Github仓库以便于使用版本控制进行项目开发

前置软件 VScode、Git。 Linux系统中安装Git工具请自行百度。可以通过git --version查看对应Git版本号。 Github创建空白仓库 一定要注意创建空白仓库&#xff0c;不要包含任何文件&#xff0c;包括Readme.md文件也不能有。 上面的仓库名&#xff08;Repository name&#xff…

Kaggle克隆github项目+文件操作+Kaggle常见操作问题解决方案——一文搞定,以openpose姿态估计项目为例

文章目录 前言一、Kaggle克隆仓库1、克隆项目2、查看目录 二、安装依赖三、文件的上传、复制、转移操作1.上传.pth文件到input目录2、将权重文件从input目录转移到工作目录 三、修改工作目录里的文件内容1、修改demo_camera.py内容 四、运行&#xff01; 前言 想跑一些深度学习…

【网络安全】条件竞争绕过电子邮件验证

未经许可,不得转载。 文章目录 正文正文 目标:xxx.com 使用电子邮件注册该网站并登录。接着,进入帐户设置,进入更改电子邮件功能: 请求包如下: 接着,发送两个相同的请求包到repeater,第一个中添加攻击者邮件: 第二个中添加正常的邮件: 创建组,以便能够同时发送两个…

手把手教你如果安装激活CleanMyMac X 4.15.6中文破解版

CleanMyMac X 4.15.6中文破解版可以为Mac腾出空间&#xff0c;软件已经更新到CleanMyMac X 4.15.6中文版支持最新版Macos 10.14系统。CleanMyMac X 4.15.6中文破解版具有一系列巧妙的新功能&#xff0c;可让您安全&#xff0c;智能地扫描和清理整个系统&#xff0c;删除大量未使…

NeRF: Representing Scenes asNeural Radiance Fields for View Synthesis 论文解读

目录 一、导言 二、NeRF 1、渲染和反渲染 2、NeRF的基本原理 3、采样点 4、位置编码 5、NeRF网络结构 6、体渲染 三、分层采样 1、均匀采样 2、基于σ的采样 四、损失函数 一、导言 该论文来自于ECCV2020&#xff0c;主要提到一种NeRF的方法来合成复杂场景下的新视…

创建 AD9361 的 vivado 工程,纯FPGA配置,不使用ARM程序

前言 AD9361 的配置程序&#xff0c;如果使用官方的&#xff0c;就必须用ps进行配置&#xff0c;复杂不好使&#xff0c;如果直接使用FPGA配置&#xff0c;将会特别的简单。 配置软件 创建一份完整的寄存器配置表 //*******************************************************…

续:docker 仓库数据传输加密

上一个实验&#xff1a;非加密的形式在企业中是不被允许的。 示例&#xff1a;【为Registry 提供加密传输】 因为传输也是https&#xff0c;所以与ssh一样的加密。 ## 这种方式就不用写这个了。 [rootdocker ~]# cat /etc/docker/daemon.json #{ # "insecure-registrie…

GoodSync Business - 企业级服务器同步与备份工具

现在越来越多公司会搭建服务器&#xff0c;或自建文件共享中心。那么如何才能实现对这些终端的高效管理、安全备份&#xff0c;以保障企业数据的安全呢&#xff1f; GoodSync Business 就是一款企业服务器同步与备份工具&#xff0c;适用于 Win / Mac 工作站&#xff0c;以及 …

C语言程序设计

日落有个小商店&#xff0c;贩卖着橘黄色的温柔。 7.关系操作符 > > < < ! (用于测试“不相等”) &#xff08;用于测试“相等”&#xff0c;但是不是所有的对象都可以用该符号来比较相不相等&#xff09; eg. int main ( ) { if ("abc"&q…

【AI大模型】基于docker部署向量数据库Milvus和可视化工具Attu详解步骤

&#x1f680; 作者 &#xff1a;“大数据小禅” &#x1f680; 文章简介 &#xff1a;本专栏后续将持续更新大模型相关文章&#xff0c;从开发到微调到应用&#xff0c;需要下载好的模型包可私。 &#x1f680; 欢迎小伙伴们 点赞&#x1f44d;、收藏⭐、留言&#x1f4ac; 目…

【王树森】Few-Shot Learning (2/3): Siamese Network 孪生网络(个人向笔记)

Learning Pairwise Similarity Scores Training Data 训练集有很多个类别的图片&#xff0c;每个类别的图片都有标注 Positive Sample&#xff1a;我们需要正样本来告诉神经网路什么东西是同一类 Negative Sample&#xff1a;负样本可以告诉神经网路事物之间的区别 我们用…

【笔记】数据结构04

文章目录 稀疏矩阵压缩存储三元组表示稀疏矩阵转三元组 栈的应用前缀表达式前缀表达式求值 中缀表达式后缀表达式 稀疏矩阵内容参考 强连通文章 稀疏矩阵压缩存储三元组表示 将非零元素所在的行、列以及它的值构成一个三元组&#xff08;i,j,v&#xff09;&#xff0c;然后再…

推荐 3 个好用的桌面管理工具,实用强大,不要错过

BitDock BitDock是一款运行在Windows系统中的桌面停靠栏工具&#xff0c;也被称为比特工具栏。这款软件模仿了Mac系统的Dock栏设计&#xff0c;旨在为用户提供一个简洁、高效且极具美感的桌面环境。BitDock不仅具有个性化的外观&#xff0c;还内置了多种实用功能&#xff0c;如…

Java后端 - 常见BUG及其处理策略(持续更新中~)

Bug 收集与总结 本文记录的是 本人SpringBoot 后端项目使用和运行代码时所遇到的各种问题&#xff0c;全部都已解决&#xff0c;欢迎在评论区补充你遇到的 Bug 哦&#xff01;仅以本文记录学习社区项目时&#xff0c;所遇到的奇奇怪怪的 bug&#xff0c;以及一些很愚蠢的错误&…

从马斯洛需求层次理论谈职场激励

马斯洛需求层次理论是一个广为人知的心理学概念&#xff0c;它是由美国心理学家亚伯拉罕马斯洛&#xff08;Abraham Maslow&#xff09;于1943年提出&#xff0c;常被描述为一个5层的金字塔模型&#xff0c;反映了人类不同需求层级和依赖关系。马斯洛认为人类只有在低层级需求得…

在 VS Code 中使用 Git 源代码管理【Mac 版】

文章目录 一、Git 使用文档二、使用示例1、复制远程仓库地址2、查看当前所在的分支2.1、界面查看2.2、终端查看 3、修改/新增文件4、显示增改的详细内容5、添加暂存区6、查看/取消暂存的更改7、提交本地代码库8、待提交文件9、推送到远程仓库10、验证11、查看推送记录11.1、关于…

六. 部署分类器-trt-engine-explorer

目录 前言0. 简述1. 案例运行2. 补充说明3. engine分析结语下载链接参考 前言 自动驾驶之心推出的 《CUDA与TensorRT部署实战课程》&#xff0c;链接。记录下个人学习笔记&#xff0c;仅供自己参考 本次课程我们来学习课程第六章—部署分类器&#xff0c;一起来学习 trt-engine…