深入探究 RocketMQ:分布式消息中间件的卓越之选》

news2024/11/15 11:57:03

《深入探究 RocketMQ:分布式消息中间件的卓越之选》

一、引言

在当今复杂的网络通讯环境中,传统的 Http 请求同步方式存在诸多弊端。当客户端与服务器进行通讯时,客户端必须等待服务端完成处理后返回结果才能继续执行,这种同步调用方式一旦服务器端出现网络延迟或不可达的情况,客户端就会受到影响。为了解决这一问题,消息中间件应运而生,它为分布式系统的通信提供了高效可靠的解决方案。

二、消息中间件概述

(一)什么是消息中间件
消息中间件利用高效可靠的消息传递机制,实现平台无关的数据交流,并基于数据通信进行分布式系统的集成。它通过提供消息传递和消息排队模型,在分布式环境下扩展进程间的通信。常见的角色有 Producer(生产者)和 Consumer(消费者),就如同寄快递一样,生产者发送消息,消费者接收消息。

image-20240904191016839

(二)消息中间件的使用场景

  1. 异步处理
    以用户注册为例,用户注册后需要发送注册邮件和注册短信。传统方式是将注册信息写入数据库成功后,依次发送注册邮件和注册短信,三个任务全部完成后才返回给客户端。这种串行方式响应时间较长,系统吞吐量低。引入消息队列后,注册信息写入数据库成功后,将发送注册邮件和注册短信的任务写入消息队列后直接返回,用户的响应时间相当于注册信息写入数据库的时间,大大缩短了响应时间,提高了系统吞吐量。
  2. 应用解耦
    在用户下单场景中,传统做法是订单系统调用库存系统的接口。但如果库存系统不可用,会影响订单系统的正常下单。解耦后,订单系统下单后将消息写入消息队列,不再关心后续操作,实现了订单系统与库存系统的应用解耦。

image-20240904191207176

三、常见消息中间件比较

对 ActiveMQ、RabbitMQ、RocketMQ 和 Kafka 进行比较,它们在生产者消费者模式、发布订阅模式、请求回应模式、Api 完备性、多语言支持、单机吞吐量、消息延迟、可用性、消息丢失、文档完备性、提供快速入门、社区活跃度和商业支持等方面各有特点。

image-20240904191451170

四、RocketMQ 详解

(一)环境准备

  1. 下载 RocketMQ:从 http://rocketmq.apache.org/release_notes/release-notes-4.4.0/ 下载。
  2. 环境要求:64 位操作系统、JDK 1.8+,安装 Maven。

(二)安装 RocketMQ

  1. 解压缩安装包。
  2. 配置环境变量:设置变量名 ROCKETMQ_HOME,变量值为 MQ 解压缩路径;编辑 path,添加 % ROCKETMQ_HOME%\bin。
  3. 启动 RocketMQ:切换到安装目录的 rocketmq 的 bin 目录下,启动 NameServer 和 Broker。如果启动 Broker 时出现错误提示 “错误:找不到或无法加载主类 xxxxxx”,在 bin 下找到并打开 runbroker.cmd,然后将‘% CLASSPATH%’加上英文双引号。
  4. 安装可视化插件:从 github 下载 rocketmq-externals-rocketmq-console-1.0.0.zip,解压压缩包,进入相应文件夹进行编译和启动,最后在浏览器输入 http://localhost:8085/ 进入控制台。

(三)RocketMQ 的架构及概念
RocketMQ 整体分为四个角色:NameServer、Broker、Producer 和 Consumer。

  1. Broker:是 RocketMQ 的核心,负责消息的接收、存储、投递等功能,如同邮局和邮递员。
  2. NameServer:是消息队列的协调者,Broker 向它注册路由信息,Producer 和 Consumer 向其获取路由信息,相当于各个邮局的管理机构。
  3. Producer:消息的生产者,需要从 NameServer 获取 Broker 信息,然后与 Broker 建立连接,向 Broker 发送消息,如同寄件人。
  4. Consumer:消息的消费者,需要从 NameServer 获取 Broker 信息,然后与 Broker 建立连接,从 Broker 获取消息,类似收件人。
  5. Topic:用来区分不同类型的消息,发送和接收消息前都需要先创建 Topic,针对 Topic 来发送和接收消息,如同地区。
  6. Message Queue:为提高性能和吞吐量引入,一个 Topic 可以设置一个或多个 Message Queue,消息可以并行发送和读取。
  7. Message:消息的载体。

image-20240904191559595

(四)消息发送接收

  1. 发送同步消息

    代码示例:

    //发送消息
    public class RocketMQSendTest {
        public static void main(String[] args) throws Exception {
            //1. 创建消息生产者, 指定生产者所属的组名
            DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
            // 创建一个 DefaultMQProducer 实例,指定生产者所属的组名为"myproducer-group"。这个组名用于区分不同的生产者集合。
            //2. 指定Nameserver地址
            producer.setNamesrvAddr("192.168.109.131:9876");
            // 设置 NameServer 的地址,以便生产者能够连接到 RocketMQ 集群并获取路由信息。
            //3. 启动生产者
            producer.start();
            // 启动生产者,使其准备好发送消息。
            //4. 创建消息对象,指定主题、标签和消息体
            Message msg = new Message("myTopic", "myTag", ("RocketMQ Message").getBytes());
            // 创建一个 Message 对象,指定主题为"myTopic",标签为"myTag",消息体为"RocketMQ Message"的字节数组。
            //5. 发送消息
            SendResult sendResult = producer.send(msg);
            // 发送消息,并获取发送结果。
            System.out.println(sendResult);
            // 打印发送结果,以便查看消息是否成功发送。
            //6. 关闭生产者
            producer.shutdown();
            // 关闭生产者,释放资源。
        }
    }
    

    这种可靠性同步地发送方式适用于重要的消息通知、短信通知等场景。消息发送步骤包括创建消息生产者、指定 NameServer 地址、启动生产者、创建消息对象、发送消息和关闭生产者。

    2.接收消息

    代码示例:

    //接收消息
    public class RocketMQReceiveTest {
        public static void main(String[] args) throws MQClientException {
            //1. 创建消息消费者, 指定消费者所属的组名
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
            // 创建一个 DefaultMQPushConsumer 实例,指定消费者所属的组名为"myconsumer-group"。
            //2. 指定Nameserver地址
            consumer.setNamesrvAddr("192.168.109.131:9876");
            // 设置 NameServer 的地址,以便消费者能够连接到 RocketMQ 集群并获取路由信息。
            //3. 指定消费者订阅的主题和标签
            consumer.subscribe("myTopic", "*");
            // 订阅主题为"myTopic",标签为通配符"*",表示订阅该主题下的所有消息。
            //4. 设置回调函数,编写处理消息的方法
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.println("Receive New Messages: " + msgs);
                    // 返回消费状态
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 设置消息监听器,当有新消息到达时,会调用 consumeMessage 方法进行处理。这里打印接收到的消息,并返回消费成功的状态。
            //5. 启动消息消费者
            consumer.start();
            // 启动消费者,使其开始接收消息。
            System.out.println("Consumer Started.");
        }
    }
    

    消息接收步骤包括创建消息消费者、指定 NameServer 地址、指定消费者订阅的主题和标签、设置回调函数编写处理消息的方法和启动消息消费者。

    1. 发送异步消息
      异步消息通常用在对响应时间敏感的业务场景,发送端不能容忍长时间等待 Broker 的响应,只会等待 MQ 发送状态。

    2. 单向发送消息
      主要用在不特别关心发送结果的场景,如日志发送。

    3. 消费消息

      负载均衡模式(默认方式):消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同。

      广播模式:消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的。

      image-20240904191727230

    五、使用场景模拟:下单成功后发送短信

    (一)订单微服务发送消息

    1. 在 shop-order 服务中添加 rocketmq 的依赖。

    2. 添加配置:

      <!--rocketmq-->
      <dependency>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-spring-boot-starter</artifactId>
          <version>2.0.2</version>
      </dependency>
      rocketmq:
      name-server: 127.0.0.1:9876 #rocketMQ 服务的地址
      producer:
      group: shop-order #生产者组
      

      3.编写测试代码。

      (二)用户微服务订阅消息

      1. 修改 shop-user 模块配置:

        <!--rocketmq-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        rocketmq:
        name-server: 127.0.0.1:9876
        //广播模式消费
        consumer.setMessageModel(MessageModel.BROADCASTING);
        
      2. 修改置文件:

        rocketmq:
        name-server: 127.0.0.1:9876
        
      3. 编写消息接收服务:

        //发送短信的服务
        @Slf4j
        @Service
        @RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")
        public class SmsService implements RocketMQListener<Order> {
            @Override
            public void onMessage(Order order) {
                log.info("收到一个订单信息{},接下来发送短信", JSON.toJSONString(order));
            }
        }
        
      4. 启动服务,执行下单操作,观看后台输出。

      六、总结

      RocketMQ 作为一款强大的分布式消息中间件,在异步处理、应用解耦等场景中发挥着重要作用。通过对其环境准备、安装、架构及概念的理解,以及消息发送接收和实际使用场景的模拟,我们深入了解了其工作原理和优势。在实际应用中,根据不同的业务需求选择合适的消息中间件,并合理利用其特性,可以提高系统的性能、可靠性和可扩展性。未来,消息中间件将继续在构建高效、可靠的分布式系统中发挥重要作用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2104947.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

万字详解 Redis

1 Redis 是什么 1.1 定义 Redis是一种开源的、基于内存的数据结构存储系统&#xff0c;可以用作数据库、缓存、消息队列等。它支持多种数据结构&#xff0c;如字符串&#xff08;String&#xff09;、哈希&#xff08;Hash&#xff09;、列表&#xff08;List&#xff09;、集…

黑马点评2——商户查询缓存(P37店铺类型查询业务添加缓存练习题答案)redis缓存、更新、穿透、雪崩、击穿、工具封装

文章目录 什么是缓存&#xff1f;添加Redis缓存店铺类型查询业务添加缓存练习题 缓存更新策略给查询商铺的缓存添加超时剔除和主动更新的策略 缓存穿透缓存空对象布隆过滤 缓存雪崩解决方案 缓存击穿解决方案基于互斥锁方式解决缓存击穿问题基于逻辑过期的方式解决缓存击穿问题…

DrissionPage设置启动浏览器为edge

1.查看浏览器启动路径 在浏览器地址栏输入下面地址&#xff0c;拿到可执行文件的路径 。 edge://version/ 2.替换路径 打开DrissionPage._configs. chromium_options.py文件&#xff0c;找到def browser_path(self)这个函数&#xff0c;将返回内容替换为edge的启动路径&#x…

xacro->urdf->pdf

在ROS 2系统中&#xff0c;要将xacro文件转换为PDF文件&#xff0c;可以按照以下步骤操作&#xff1a; 步骤1&#xff1a;将Xacro文件转换为URDF文件 首先&#xff0c;需要将xacro文件转换为urdf文件。可以使用ROS 2提供的xacro工具来完成这个转换。 ros2 run xacro xacro p…

6.1.数据结构-c/c++模拟实现堆上篇(向下,上调整算法,建堆,增删数据)

目录 一.堆(Heap)的基本介绍 二.堆的常用操作&#xff08;以小根堆为例&#xff09; 三.实现代码 3.1 堆结构定义 3.2 向下调整算法* 3.3 初始化堆* 3.4 销毁堆 3.4 向上调整算法* 3.5 插入数据 3.6 删除数据 3.7 返回堆顶数据 四.下篇内容 1.堆排序 2.TopK问题 一…

案例-KVM+GFS分布式存储系统构建KVM高可用(虚拟化实战)

NFS GlusterFS 基于共享存储 采用GFS做共享存储&#xff1b; 实验环境&#xff1a;101 102 103 104 做gfs集群&#xff1b;201 202做虚拟机&#xff1b; 同步一下会话&#xff1b; 为了方便使用主机名进行通信&#xff0c;修改hosts文件&#xff1b; 为了使用GlusterFS的仓库…

国内独家首发 | OpenCSG开源中文版fineweb edu数据集

01 背景 近年来&#xff0c;人工智能&#xff08;AI&#xff09;技术&#xff0c;特别是自然语言处理&#xff08;NLP&#xff09;的飞速发展深刻影响着各个行业。从智能客服到内容生成&#xff0c;从语音识别到翻译工具&#xff0c;NLP的应用已经无处不在。在这一领域中&…

[线程]JUC中常见的类 及 集合类在多线程下的线程安全问题

文章目录 一. JUC中常见的类1. Callable接口2. ReentrantLock3. Semaphore 信号量4. CountDownLatch 二. 集合类在多线程下的线程安全问题多线程下使用ArrayList多线程下使用哈希表(重要) 下面介绍的内容是面试中常考, 但是实际开发中用不到的知识 一. JUC中常见的类 JUC : ja…

『功能项目』主角身旁召唤/隐藏坐骑【20】

本章项目成果展示 我们打开上一篇19坐骑UI搭建及脚本控制显/隐的项目&#xff0c; 本章要做的事情是在坐骑UI界面点击召唤及隐藏坐骑的功能 首先在外包中拖拽一个坐骑熊的预制体 完全解压缩 重命名为MountBear 在资源文件夹Resources下的/预制体文件夹Prefabs下新建坐骑文件夹…

海外合规|新加坡网络安全认证计划简介(三)-Cyber Trust

一、 认证简介&#xff1a; Cyber Trust标志是针对数字化业务运营更为广泛的组织的网络安全认证。该标志针对的是规模较大或数字化程度较高的组织&#xff0c;因为这些组织可能具有更高的风险水平&#xff0c;需要他们投资专业知识和资源来管理和保护其 IT 基础设施和系统。Cy…

新质生产力人工智能+系列5-智能业务识别研究(含任务、数据、算力资源)

在新质生产力高质量发展的要求下&#xff0c;中国移动在“人工智能”和 “数据要素X”方面不断发力&#xff0c;持续发布高质量电信数据集。围绕网元智能、运维智能、服务智能三大方向建设&#xff0c;涵盖无线信道、基站、云网、核心网、哑资源等多领域&#xff0c;支持感知、…

护工系统|护工陪护系统|护工小程序

在医疗服务行业日新月异的今天&#xff0c;陪护机构正乘着数字化转型的东风&#xff0c;扬帆远航&#xff0c;其中&#xff0c;护工系统的引入无疑为其插上了一对强劲的翅膀&#xff0c;引领着行业向更加高效、精细化的方向迈进。这一系统不仅是对传统陪护模式的深刻重塑&#…

oracle物理存储结构文件详解

文章目录 oracle物理文件结构图① 控制文件&#xff1a;② 数据文件&#xff1a;③ 联机Redo日志文件&#xff1a;④ 参数文件&#xff1a;⑤ 归档文件&#xff1a;⑥ 密码文件&#xff1a; oracle物理文件结构图 Oracle数据库的物理结构由控制文件&#xff08;Control f…

硬件工程师笔试面试知识器件篇——电容

目录 电容 2.1、基础 电容原理图 电容实物图 2.1.1、定义 2.1.2、原理 2.1.3、电容的类型 分类1: 分类2: 2.1.4、电容的应用 2.2、相关问题 2.2.1、电容器的电容值如何测量 2.2.2、不同类型的电容器在实际应用中有那些具体差异 2.2.3、如何选择合适的电容器来满…

OrangePi AIpro 香橙派 昇腾 Ascend C 算子开发 与 调用 - Tiling实现 2

OrangePi AIpro 香橙派 昇腾 Ascend C 算子开发 与 调用 - Tiling实现 2 flyfish 前置知识 1 前置知识 2 Host侧CPU和Device侧NPU的主要区别 不同的硬件资源 CPU是为了执行通用计算任务而设计的&#xff0c;但在处理大量的并行计算&#xff08;如矩阵乘、批数据处理&#…

智能优化算法-北方苍鹰优化算法(NGO)(附源码)

目录 1.内容介绍 2.部分代码 3.实验结果 4.内容获取 1.内容介绍 北方苍鹰优化算法 (Northern Goshawk Optimizer, NGO) 是一种基于群体智能的元启发式优化算法&#xff0c;它模拟了北方苍鹰&#xff08;Northern Goshawk&#xff09;的捕食行为、领地行为以及社交互动&#x…

input系统之InputDispatcher

往期文章&#xff1a; Input系统之IMS的启动流程 input系统之InputReader 1.概述 InputReader和InputDispatcher是Input系统的重要组成部分&#xff0c;InputReader主要负责从设备节点获取原始输入事件&#xff0c;并将封装好的事件交给InputDispatcher&#xff1b;InputDis…

数据库 | 子查询 | sql执行顺序 | mysql是否运行

1.系统&#xff08;客户端&#xff09;访问 MySQL 服务器前&#xff0c;做 的第一件事就是建立 TCP 连接。 Caches & Buffers&#xff1a; 查询缓存组件SQL Interface: SQL接口 接收用户的SQL命 令&#xff0c;并且返回用户需要查询的结果。比如 SELECT … FROM就是调用SQ…

防止goroutine崩溃导致主进程崩溃

在Go语言中&#xff0c;当一个goroutine发生异常时&#xff0c;它会直接退出&#xff0c;并不会影响其他goroutine或者主进程。Go语言的设计哲学是“不要通过共享内存来通信&#xff0c;而应该通过通信来共享内存”。这就意味着&#xff0c;goroutine之间的协作通常是通过chann…

Azure AI Search 中的二进制量化:优化存储和加快搜索速度

随着组织继续利用生成式 AI 的强大功能来构建检索增强生成 (RAG) 应用程序和代理&#xff0c;对高效、高性能和可扩展解决方案的需求从未如此强烈。 今天&#xff0c;我们很高兴推出二进制量化&#xff0c;这项新功能可将向量大小减少高达 96%&#xff0c;同时将搜索延迟减少高…