【消息队列RocketMQ】五、RocketMQ 实战应用与生态拓展

news2025/4/25 6:37:18

本篇文章主要将结合前面几篇文章的基础讲解,来演示RocketMQ的实际场景中的应用。

一、RocketMQ 实战应用场景​

1.1 电商系统中的应用​

在电商系统中,RocketMQ 承担着重要角色。以双十一大促活动为例,短时间内会产生海量的订单请求、库存变更请求和支付请求。​

订单处理:用户下单后,订单服务将下单消息发送至 RocketMQ。订单相关的后续操作,如库存扣减、优惠券核销、物流信息生成等,都通过订阅该订单消息实现异步处理。通过配置broker.conf文件,可设置消息的持久化策略,保证订单消息不丢失。例如,将flushDiskType设置为SYNC_FLUSH,确保消息实时刷盘 。

flushDiskType=SYNC_FLUSH

库存同步:当库存发生变化时,库存服务发送库存变更消息到 RocketMQ,其他依赖库存信息的服务(如商品展示服务、订单服务)订阅该消息,实现库存数据的实时同步。在 CentOS 7 系统中,可使用以下命令启动 Producer 发送库存变更消息:

nohup sh tools.sh org.apache.rocketmq.example.quickstart.Producer &

1.2 金融领域的应用​

在金融行业,对数据一致性和可靠性要求极高。RocketMQ 的事务消息特性在此发挥关键作用。​

转账业务:以跨行转账为例,在转账操作中,涉及到转出账户扣减和转入账户增加两个操作。通过 RocketMQ 的事务消息,确保这两个操作要么都成功,要么都失败。在生产者代码中,需要实现TransactionListener接口来处理事务消息。

TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        // 执行本地事务,如账户扣款
        try {
            // 模拟扣款操作
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        // 检查本地事务状态
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});

对账系统:每日交易结束后,各业务系统将交易数据发送至 RocketMQ,对账系统订阅这些消息,进行数据核对。通过调整broker.conf中的transactionCheckInterval参数,可设置事务消息的回查间隔,确保事务的最终一致性。

transactionCheckInterval=60000 # 单位毫秒,设置1分钟回查一次

1.3 日志处理系统​

日志处理是 RocketMQ 的常见应用场景之一。​

日志收集:各应用服务将日志信息发送到 RocketMQ,日志收集服务订阅相关 Topic,将日志数据存储到分布式文件系统(如 HDFS)中。在 CentOS 7 上,可通过修改 Producer 的配置,设置日志消息的 Topic 和标签。

DefaultMQProducer producer = new DefaultMQProducer("log_producer_group");
producer.setNamesrvAddr("localhost:9876");
Message msg = new Message("LogTopic", "InfoTag", logContent.getBytes());
producer.send(msg);

日志分析:数据分析服务从 RocketMQ 中消费日志消息,进行实时数据分析,如统计接口调用频率、用户行为分析等。为了提高日志消息的消费效率,可在broker.conf中调整defaultTopicQueueNums参数,增加 Topic 的队列数量。

defaultTopicQueueNums=8

二、RocketMQ 与其他技术的集成​

2.1 与 Spring Cloud 的集成​

Spring Cloud 是一套微服务框架,与 RocketMQ 集成后,可实现微服务之间的异步通信和解耦。​

引入依赖:在 Spring Cloud 项目的pom.xml文件中添加 RocketMQ 相关依赖。

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

配置文件修改:在application.yml文件中配置 RocketMQ 的 NameServer 地址和 Producer 组名。

rocketmq:
  name-server: localhost:9876
  producer:
    group: spring_cloud_producer_group

发送与消费消息:在 Spring Boot 应用中,通过注入RocketMQTemplate发送消息,创建@RocketMQMessageListener注解的消费者类接收消息。

// 发送消息
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String message) {
    rocketMQTemplate.convertAndSend("SpringCloudTopic", message);
}

// 消费消息
@Component
@RocketMQMessageListener(topic = "SpringCloudTopic", consumerGroup = "spring_cloud_consumer_group")
public class SpringCloudConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

2.2 与 Kubernetes 的集成​

Kubernetes 是容器编排工具,将 RocketMQ 部署在 Kubernetes 集群中,可提高集群的资源利用率和管理效率。​

创建 RocketMQ 的 Kubernetes 资源文件:包括Deployment、Service和PersistentVolumeClaim等文件。以Deployment为例:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: rocketmq-broker
spec:
  replicas: 2
  selector:
    matchLabels:
      app: rocketmq-broker
  template:
    metadata:
      labels:
        app: rocketmq-broker
    spec:
      containers:
      - name: rocketmq-broker
        image: rocketmqinc/rocketmq-broker:4.9.4
        ports:
        - containerPort: 10911
        - containerPort: 10909
        volumeMounts:
        - name: data-volume
          mountPath: /home/rocketmq/store
      volumes:
      - name: data-volume
        persistentVolumeClaim:
          claimName: rocketmq-pvc

部署到 Kubernetes 集群:在 CentOS 7 上,通过kubectl命令将资源文件应用到集群中。

kubectl apply -f rocketmq-deployment.yaml
kubectl apply -f rocketmq-service.yaml
kubectl apply -f rocketmq-pvc.yaml

三、RocketMQ 生态拓展​

3.1 社区生态发展​

RocketMQ 作为 Apache 顶级项目,拥有活跃的社区。社区成员不断贡献新功能、修复 Bug,推动 RocketMQ 的版本迭代。例如,在 RocketMQ 5.0 版本中,引入了新的存储引擎和通信协议,进一步提升了性能和可扩展性。开发者可以通过 Apache 官方网站(RocketMQ · 官方网站 | RocketMQ)获取最新版本信息和技术文档,也可以在 GitHub 仓库(https://github.com/apache/rocketmq)参与开源项目的开发和讨论。​

3.2 周边工具生态​

管理工具

RocketMQ Console 是一款可视化管理工具,方便用户对 RocketMQ 集群进行监控和管理。在 CentOS 7 上,可通过以下步骤部署:​

        1、下载 RocketMQ Console 的 Jar 包:访问 RocketMQ Console 的 GitHub 仓库(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console),在releases页面下载最新版本的 Jar 包,也可以使用wget命令在 CentOS 7 终端下载,例如:

wget https://github.com/apache/rocketmq-externals/releases/download/v1.0.1/rocketmq-console-ng-1.0.1.jar

        2、执行命令启动:使用以下命令启动 RocketMQ Console,其中--rocketmq.config.namesrvAddr指定 NameServer 的地址和端口,--server.port指定 RocketMQ Console 的服务端口:

nohup java -jar rocketmq-console-ng-1.0.1.jar --rocketmq.config.namesrvAddr=localhost:9876 --server.port=8080 &

         3、访问与使用:通过浏览器访问http://localhost:8080,进入 RocketMQ Console 界面。在该界面中,用户可以直观地查看 Topic、Consumer Group、Broker 等信息,支持创建、删除 Topic,查看 Consumer 的消费进度、消息堆积情况等操作。例如,在 Topic 管理页面,可查看每个 Topic 的消息数量、队列分布,还能手动调整 Topic 的读写权限 。

        4、配置优化:如果需要调整 RocketMQ Console 的配置,如增加内存分配,可修改启动命令为:

nohup java -Xmx512m -Xms256m -jar rocketmq-console-ng-1.0.1.jar --rocketmq.config.namesrvAddr=localhost:9876 --server.port=8080 &

-Xmx512m设置最大堆内存为 512MB,-Xms256m设置初始堆内存为 256MB,以适应大规模集群监控需求。 

    

监控工具

Prometheus 和 Grafana 可以与 RocketMQ 集成,实现对 RocketMQ 集群的监控。通过配置 Prometheus 的采集规则,获取 RocketMQ 的指标数据(如消息发送量、消费延迟等),然后在 Grafana 中进行可视化展示。

        1、Prometheus 配置:首先,在 CentOS 7 上安装 Prometheus。下载 Prometheus 的二进制包,解压后进入目录,编辑prometheus.yml配置文件,添加 RocketMQ 的监控指标采集任务。例如:

global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'rocketmq'
    static_configs:
      - targets: ['localhost:9876']  # NameServer地址,根据实际情况修改
    metrics_path: /metrics  # RocketMQ暴露指标的路径
    params:
      module: [rocketmq_exporter]

保存配置后,使用以下命令启动 Prometheus:

nohup./prometheus --config.file=prometheus.yml &

Prometheus 会按照配置定期从 RocketMQ 中采集消息发送量、消费延迟、Broker 负载等指标数据。​

        2. Grafana 配置:安装 Grafana 后,访问其 Web 界面(默认地址为http://localhost:3000),使用默认账号密码(admin/admin)登录。在 Grafana 中,首先添加 Prometheus 作为数据源,在数据源配置页面,输入 Prometheus 的地址和端口(如http://localhost:9090,9090 为 Prometheus 默认端口),保存测试连接成功后,即可导入 RocketMQ 相关的监控仪表盘模板。可以从 Grafana 官方网站(https://grafana.com/grafana/dashboards/)搜索 RocketMQ 相关模板,下载 JSON 文件后,在 Grafana 中通过 “+” -> “Import” 导入模板,就能直观地查看 RocketMQ 集群的各项监控指标图表,如消息吞吐量趋势图、Consumer 消费速率对比图等 。

测试工具:

除了基础的管理和监控工具,RocketMQ 生态中还有用于性能测试的工具,如rocketmq-tools自带的压力测试功能。在 CentOS 7 的 RocketMQ 安装目录bin文件夹下,通过以下命令可以进行简单的消息发送压力测试:

sh tools.sh org.apache.rocketmq.example.perf.PerfTestProducer -t TestTopic -n 100000 -m 1024

上述命令中,-t指定测试的 Topic 为TestTopic,-n表示发送 100000 条消息,-m指定每条消息大小为 1024 字节。通过调整这些参数,可以模拟不同场景下的消息发送压力,测试 RocketMQ 集群的性能表现。同时,还可以使用Jmeter与 RocketMQ 结合,进行更复杂的性能测试。在Jmeter中,需要添加 “Java 请求”,引入 RocketMQ 的客户端依赖包,编写 Java 代码实现消息的发送和接收测试,从而全面评估 RocketMQ 在高并发场景下的性能和稳定性。

数据迁移工具:

在实际应用中,当需要对 RocketMQ 集群进行升级、数据迁移等操作时,rocketmq-migration-tool可以发挥重要作用。该工具可以实现不同 RocketMQ 集群之间的数据迁移,支持 Topic、Consumer Group 等信息的迁移。使用时,先在 CentOS 7 系统上下载工具包,配置源集群和目标集群的 NameServer 地址、迁移规则等信息,通过执行迁移命令,即可将消息数据、消费进度等从源集群迁移到目标集群。例如:

java -jar rocketmq-migration-tool.jar --sourceNamesrvAddr=source-namesrv:9876 --targetNamesrvAddr=target-namesrv:9876 --topic=MigrationTopic

上述命令将MigrationTopic的相关数据从源集群(source-namesrv:9876)迁移到目标集群(target-namesrv:9876),保障了业务系统在集群变更过程中的数据完整性和连续性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2342200.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机组成与体系结构:直接内存映射(Direct Memory Mapping)

目录 CPU地址怎么找到真实的数据&#xff1f; 内存映射的基本单位和结构 1. Pages&#xff08;页&#xff09;——虚拟地址空间的基本单位 2. Frames&#xff08;页框&#xff09;——物理内存空间的基本单位 3. Blocks&#xff08;块&#xff09;——主存和缓存之间的数据…

STM32提高篇: 蓝牙通讯

STM32提高篇: 蓝牙通讯 一.蓝牙通讯介绍1.蓝牙技术类型 二.蓝牙协议栈1.蓝牙芯片架构2.BLE低功耗蓝牙协议栈框架 三.ESP32-C3中的蓝牙功能1.广播2.扫描3.通讯 四.发送和接收 一.蓝牙通讯介绍 蓝牙&#xff0c;是一种利用低功率无线电&#xff0c;支持设备短距离通信的无线电技…

SpringMVC处理请求映射路径和接收参数

目录 springmvc处理请求映射路径 案例&#xff1a;访问 OrderController类的pirntUser方法报错&#xff1a;java.lang.IllegalStateException&#xff1a;映射不明确 核心错误信息 springmvc接收参数 一 &#xff0c;常见的字符串和数字类型的参数接收方式 1.1 请求路径的…

【程序员 NLP 入门】词嵌入 - 上下文中的窗口大小是什么意思? (★小白必会版★)

&#x1f31f; 嗨&#xff0c;你好&#xff0c;我是 青松 &#xff01; &#x1f308; 希望用我的经验&#xff0c;让“程序猿”的AI学习之路走的更容易些&#xff0c;若我的经验能为你前行的道路增添一丝轻松&#xff0c;我将倍感荣幸&#xff01;共勉~ 【程序员 NLP 入门】词…

从物理到预测:数据驱动的深度学习的结构化探索及AI推理

在当今科学探索的时代&#xff0c;理解的前沿不再仅仅存在于我们书写的方程式中&#xff0c;也存在于我们收集的数据和构建的模型中。在物理学和机器学习的交汇处&#xff0c;一个快速发展的领域正在兴起&#xff0c;它不仅观察宇宙&#xff0c;更是在学习宇宙。 AI推理 我们…

大模型AI的“双刃剑“:数据安全与可靠性挑战与破局之道

在数字经济蓬勃发展的浪潮中&#xff0c;数据要素已然成为驱动经济社会创新发展的核心引擎。从智能制造到智慧城市&#xff0c;从电子商务到金融科技&#xff0c;数据要素的深度融合与广泛应用&#xff0c;正以前所未有的力量重塑着产业格局与经济形态。 然而&#xff0c;随着…

操作系统概述与安装

主流操作系统概述 信创平台概述 虚拟机软件介绍与安装 windows server 安装 centos7 安装 银河麒麟V10 安装 一&#xff1a;主流服务器操作系统 &#xff08;1&#xff09;Windows Server 发展历程&#xff1a; 1993年推出第一代 WindowsNT&#xff08;企业级内核&am…

开发了一个b站视频音频提取器

B站资源提取器-说明书 一、功能说明 本程序可自动解密并提取B站客户端缓存的视频资源&#xff0c;支持以下功能&#xff1a; - 自动识别视频缓存目录 - 将加密的.m4s音频文件转换为标准MP3格式 - 将加密的.m4s视频文件转换为标准MP4格式&#xff08;合并音视频流&#xff09;…

基于javaweb的SpringBoot校园服务平台系统设计与实现(源码+文档+部署讲解)

技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、小程序、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;免费功能设计、开题报告、任务书、中期检查PPT、系统功能实现、代码编写、论文编写和辅导、论文…

PHYBench:首个大规模物理场景下的复杂推理能力评估基准

2025-04-23, 由北京大学物理学院和人工智能研究所等机构共同创建的 PHYBench 数据集&#xff0c;这是一个专门用于评估大型语言模型在物理场景下的复杂推理能力的高质量基准。该数据集包含 500 道精心策划的物理问题&#xff0c;覆盖力学、电磁学、热力学、光学、现代物理和高级…

Red:1靶场环境部署及其渗透测试笔记(Vulnhub )

环境介绍&#xff1a; 靶机下载&#xff1a; https://download.vulnhub.com/red/Red.ova 本次实验的环境需要用到VirtualBox&#xff08;桥接网卡&#xff09;&#xff0c;VMware&#xff08;桥接网卡&#xff09;两台虚拟机&#xff08;网段都在192.168.152.0/24&#xff0…

深入详解人工智能数学基础——概率论中的KL散度在变分自编码器中的应用

🧑 博主简介:CSDN博客专家、CSDN平台优质创作者,高级开发工程师,数学专业,10年以上C/C++, C#, Java等多种编程语言开发经验,拥有高级工程师证书;擅长C/C++、C#等开发语言,熟悉Java常用开发技术,能熟练应用常用数据库SQL server,Oracle,mysql,postgresql等进行开发应用…

测试模版x

本篇技术博文摘要 &#x1f31f; 引言 &#x1f4d8; 在这个变幻莫测、快速发展的技术时代&#xff0c;与时俱进是每个IT工程师的必修课。我是盛透侧视攻城狮&#xff0c;一名什么都会一丢丢的网络安全工程师&#xff0c;也是众多技术社区的活跃成员以及多家大厂官方认可人员&a…

Openharmony 和 HarmonyOS 区别?

文章目录 OpenHarmony 与 HarmonyOS 的区别&#xff1a;开源生态与商业发行版的定位差异一、定义与定位二、技术架构对比1. OpenHarmony2. HarmonyOS 三、应用场景差异四、开发主体与生态支持五、关键区别总结六、如何选择&#xff1f;未来展望 OpenHarmony 与 HarmonyOS 的区别…

uniapp 仿小红书轮播图效果

通过对小红书的轮播图分析&#xff0c;可得出以下总结&#xff1a; 1.单张图片时容器根据图片像素定高 2.多图时轮播图容器高度以首图为锚点 3.比首图长则固高左右留白 4.比首图短则固宽上下留白 代码如下&#xff1a; <template><view> <!--轮播--><s…

R/G-B/G色温坐标系下对横纵坐标取对数的优势

有些白平衡色温坐标系会分别对横纵坐标取对数运算。 这样做有什么优势呢? 我们知道对数函数对0-1之间的因变量值具有扩展作用。即自变量x变化比较小时,经过对数函数作用后可以把因变量扩展到较大范围内,即x变化较小时,y变化较大,增加了识别数据的识别性。 由于Raw数据中的…

AI赋能安全调度系统:智能升级与功能跃迁

安全调度系统通过AI技术的深度整合&#xff0c;实现了从传统监控到智能决策的质变升级。这种智能化转型不仅提升了系统的响应速度和处理精度&#xff0c;更重塑了整个安全管理的运行范式。以下是AI技术为安全调度系统带来的核心功能强化&#xff1a; 智能风险识别与预警能力跃…

数据结构与算法(十二):图的应用-最小生成树-Prim/Kruskal

相关文献&#xff1a; 数据结构与算法(一)&#xff1a;基础理论 数据结构与算法(二)&#xff1a;线性表的实现 数据结构与算法(三)&#xff1a;线性表算法设计练习 数据结构与算法(四)&#xff1a;斐波那契数列 数据结构与算法(五)&#xff1a;LRU 数据结构与算法(六)&#xff…

项目——高并发内存池

目录 项目介绍 做的是什么 要求 内存池介绍 池化技术 内存池 解决的问题 设计定长内存池 高并发内存池整体框架设计 ThreadCache ThreadCache整体设计 哈希桶映射对齐规则 ThreadCache TLS无锁访问 CentralCache CentralCache整体设计 CentralCache结构设计 C…

系统与网络安全------弹性交换网络(2)

资料整理于网络资料、书本资料、AI&#xff0c;仅供个人学习参考。 Eth-Trunk 组网中经常会遇到的问题 链路聚合技术 概述 Eth-Trunk&#xff08;链路聚合技术&#xff09;作为一种捆绑技术&#xff0c;可以把多个独立的物理接口绑定在一起&#xff0c;作为一个大带宽的逻辑…