《深入探究 RocketMQ:分布式消息中间件的卓越之选》
一、引言
在当今复杂的网络通讯环境中,传统的 Http 请求同步方式存在诸多弊端。当客户端与服务器进行通讯时,客户端必须等待服务端完成处理后返回结果才能继续执行,这种同步调用方式一旦服务器端出现网络延迟或不可达的情况,客户端就会受到影响。为了解决这一问题,消息中间件应运而生,它为分布式系统的通信提供了高效可靠的解决方案。
二、消息中间件概述
(一)什么是消息中间件
消息中间件利用高效可靠的消息传递机制,实现平台无关的数据交流,并基于数据通信进行分布式系统的集成。它通过提供消息传递和消息排队模型,在分布式环境下扩展进程间的通信。常见的角色有 Producer(生产者)和 Consumer(消费者),就如同寄快递一样,生产者发送消息,消费者接收消息。
(二)消息中间件的使用场景
- 异步处理
以用户注册为例,用户注册后需要发送注册邮件和注册短信。传统方式是将注册信息写入数据库成功后,依次发送注册邮件和注册短信,三个任务全部完成后才返回给客户端。这种串行方式响应时间较长,系统吞吐量低。引入消息队列后,注册信息写入数据库成功后,将发送注册邮件和注册短信的任务写入消息队列后直接返回,用户的响应时间相当于注册信息写入数据库的时间,大大缩短了响应时间,提高了系统吞吐量。 - 应用解耦
在用户下单场景中,传统做法是订单系统调用库存系统的接口。但如果库存系统不可用,会影响订单系统的正常下单。解耦后,订单系统下单后将消息写入消息队列,不再关心后续操作,实现了订单系统与库存系统的应用解耦。
三、常见消息中间件比较
对 ActiveMQ、RabbitMQ、RocketMQ 和 Kafka 进行比较,它们在生产者消费者模式、发布订阅模式、请求回应模式、Api 完备性、多语言支持、单机吞吐量、消息延迟、可用性、消息丢失、文档完备性、提供快速入门、社区活跃度和商业支持等方面各有特点。
四、RocketMQ 详解
(一)环境准备
- 下载 RocketMQ:从 http://rocketmq.apache.org/release_notes/release-notes-4.4.0/ 下载。
- 环境要求:64 位操作系统、JDK 1.8+,安装 Maven。
(二)安装 RocketMQ
- 解压缩安装包。
- 配置环境变量:设置变量名 ROCKETMQ_HOME,变量值为 MQ 解压缩路径;编辑 path,添加 % ROCKETMQ_HOME%\bin。
- 启动 RocketMQ:切换到安装目录的 rocketmq 的 bin 目录下,启动 NameServer 和 Broker。如果启动 Broker 时出现错误提示 “错误:找不到或无法加载主类 xxxxxx”,在 bin 下找到并打开 runbroker.cmd,然后将‘% CLASSPATH%’加上英文双引号。
- 安装可视化插件:从 github 下载 rocketmq-externals-rocketmq-console-1.0.0.zip,解压压缩包,进入相应文件夹进行编译和启动,最后在浏览器输入 http://localhost:8085/ 进入控制台。
(三)RocketMQ 的架构及概念
RocketMQ 整体分为四个角色:NameServer、Broker、Producer 和 Consumer。
- Broker:是 RocketMQ 的核心,负责消息的接收、存储、投递等功能,如同邮局和邮递员。
- NameServer:是消息队列的协调者,Broker 向它注册路由信息,Producer 和 Consumer 向其获取路由信息,相当于各个邮局的管理机构。
- Producer:消息的生产者,需要从 NameServer 获取 Broker 信息,然后与 Broker 建立连接,向 Broker 发送消息,如同寄件人。
- Consumer:消息的消费者,需要从 NameServer 获取 Broker 信息,然后与 Broker 建立连接,从 Broker 获取消息,类似收件人。
- Topic:用来区分不同类型的消息,发送和接收消息前都需要先创建 Topic,针对 Topic 来发送和接收消息,如同地区。
- Message Queue:为提高性能和吞吐量引入,一个 Topic 可以设置一个或多个 Message Queue,消息可以并行发送和读取。
- Message:消息的载体。
(四)消息发送接收
-
发送同步消息
代码示例:
//发送消息 public class RocketMQSendTest { public static void main(String[] args) throws Exception { //1. 创建消息生产者, 指定生产者所属的组名 DefaultMQProducer producer = new DefaultMQProducer("myproducer-group"); // 创建一个 DefaultMQProducer 实例,指定生产者所属的组名为"myproducer-group"。这个组名用于区分不同的生产者集合。 //2. 指定Nameserver地址 producer.setNamesrvAddr("192.168.109.131:9876"); // 设置 NameServer 的地址,以便生产者能够连接到 RocketMQ 集群并获取路由信息。 //3. 启动生产者 producer.start(); // 启动生产者,使其准备好发送消息。 //4. 创建消息对象,指定主题、标签和消息体 Message msg = new Message("myTopic", "myTag", ("RocketMQ Message").getBytes()); // 创建一个 Message 对象,指定主题为"myTopic",标签为"myTag",消息体为"RocketMQ Message"的字节数组。 //5. 发送消息 SendResult sendResult = producer.send(msg); // 发送消息,并获取发送结果。 System.out.println(sendResult); // 打印发送结果,以便查看消息是否成功发送。 //6. 关闭生产者 producer.shutdown(); // 关闭生产者,释放资源。 } }
这种可靠性同步地发送方式适用于重要的消息通知、短信通知等场景。消息发送步骤包括创建消息生产者、指定 NameServer 地址、启动生产者、创建消息对象、发送消息和关闭生产者。
2.接收消息
代码示例:
//接收消息 public class RocketMQReceiveTest { public static void main(String[] args) throws MQClientException { //1. 创建消息消费者, 指定消费者所属的组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group"); // 创建一个 DefaultMQPushConsumer 实例,指定消费者所属的组名为"myconsumer-group"。 //2. 指定Nameserver地址 consumer.setNamesrvAddr("192.168.109.131:9876"); // 设置 NameServer 的地址,以便消费者能够连接到 RocketMQ 集群并获取路由信息。 //3. 指定消费者订阅的主题和标签 consumer.subscribe("myTopic", "*"); // 订阅主题为"myTopic",标签为通配符"*",表示订阅该主题下的所有消息。 //4. 设置回调函数,编写处理消息的方法 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println("Receive New Messages: " + msgs); // 返回消费状态 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 设置消息监听器,当有新消息到达时,会调用 consumeMessage 方法进行处理。这里打印接收到的消息,并返回消费成功的状态。 //5. 启动消息消费者 consumer.start(); // 启动消费者,使其开始接收消息。 System.out.println("Consumer Started."); } }
消息接收步骤包括创建消息消费者、指定 NameServer 地址、指定消费者订阅的主题和标签、设置回调函数编写处理消息的方法和启动消息消费者。
-
发送异步消息
异步消息通常用在对响应时间敏感的业务场景,发送端不能容忍长时间等待 Broker 的响应,只会等待 MQ 发送状态。 -
单向发送消息
主要用在不特别关心发送结果的场景,如日志发送。 -
消费消息
负载均衡模式(默认方式):消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同。
广播模式:消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的。
五、使用场景模拟:下单成功后发送短信
(一)订单微服务发送消息
-
在 shop-order 服务中添加 rocketmq 的依赖。
-
添加配置:
<!--rocketmq--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency> rocketmq: name-server: 127.0.0.1:9876 #rocketMQ 服务的地址 producer: group: shop-order #生产者组
3.编写测试代码。
(二)用户微服务订阅消息
-
修改 shop-user 模块配置:
<!--rocketmq--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency> rocketmq: name-server: 127.0.0.1:9876 //广播模式消费 consumer.setMessageModel(MessageModel.BROADCASTING);
-
修改置文件:
rocketmq: name-server: 127.0.0.1:9876
-
编写消息接收服务:
//发送短信的服务 @Slf4j @Service @RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic") public class SmsService implements RocketMQListener<Order> { @Override public void onMessage(Order order) { log.info("收到一个订单信息{},接下来发送短信", JSON.toJSONString(order)); } }
-
启动服务,执行下单操作,观看后台输出。
六、总结
RocketMQ 作为一款强大的分布式消息中间件,在异步处理、应用解耦等场景中发挥着重要作用。通过对其环境准备、安装、架构及概念的理解,以及消息发送接收和实际使用场景的模拟,我们深入了解了其工作原理和优势。在实际应用中,根据不同的业务需求选择合适的消息中间件,并合理利用其特性,可以提高系统的性能、可靠性和可扩展性。未来,消息中间件将继续在构建高效、可靠的分布式系统中发挥重要作用。
-
-