Rocket MQ
一,是啥,从哪来
RocketMQ是一个开源的分布式消息中间件,最初由阿里巴巴集团开发。它的设计目标是为了在高并发、高吞吐量的场景下,实现可靠的消息传输,并且具有良好的可伸缩性和可扩展性。
RocketMQ支持多种消息模式,包括同步、异步、单向和定时消息。同时,RocketMQ还具有高度可靠性、低延迟、高吞吐量、分布式部署等优点,可以在众多领域中广泛应用,比如电商、金融、物流等。
RocketMQ目前已经成为了Apache软件基金会下的顶级开源项目之一,并得到了全球广泛的应用和认可。
消息模型
简单消息模型:
RocketMQ拓展后的消息模型:
ps:
- 相同的ConsumerGroup下的消费者主要有两种负载均衡模式,即广播模式,和集群模式(图中是最常用的集群模式)。
- 在集群模式下,同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费,如图中 ConsumerGroupA 订阅 TopicA,TopicA 对应 3个队列,则 GroupA 中的 Consumer1 消费的是 MessageQueue 0和 MessageQueue 1的消息,Consumer2是消费的是MessageQueue2的消息。
- 在广播模式下,同一个 ConsumerGroup 中的每个 Consumer 实例都处理全部的队列。需要注意的是,广播模式下因为每个 Consumer 实例都需要处理全部的消息(如图中ConsumerGroup B),因此这种模式仅推荐在通知推送、配置同步类小流量场景使用。
二,能干啥
RocketMQ是一款高可靠、高吞吐量、可伸缩、易于部署和管理的分布式消息中间件,可以用来实现消息的异步传输、解耦、削峰填谷等功能。具体来说,RocketMQ可以用于以下场景:
- 事件驱动架构:RocketMQ可以用来将事件异步传输到订阅者,从而实现事件驱动架构,降低系统的耦合度,提高系统的可扩展性和可维护性。
- 流式计算:RocketMQ可以用来传输实时流式数据,如日志、监控数据、交易数据等,用于流式计算和实时数据分析。
- 异步处理:RocketMQ可以用来实现异步处理,将一些耗时的任务转移到异步线程或者其他服务中执行,从而提高系统的吞吐量和性能。
- 削峰填谷:RocketMQ可以用来实现消息队列的削峰填谷功能,当系统承受的请求量超过了系统的处理能力时,可以将请求转化为消息,通过RocketMQ进行缓存,等待系统恢复正常后再进行处理。
- 分布式事务:RocketMQ提供了基于消息的分布式事务支持,可以保证消息在多个节点之间的原子性和一致性。
总之,RocketMQ是一款功能强大的分布式消息中间件,适用于各种场景下的消息传输和异步处理。
RocketMQ强调的就是一个低延迟,高可靠性,高吞吐量的特点。
三,同类产品有哪些
- ActiveMQ(2004):Apache ActiveMQ是一个基于JMS协议的开源消息中间件,使用Java语言编写,可以用于构建分布式系统和企业应用。ActiveMQ支持多种协议,包括OpenWire、Stomp、AMQP、MQTT等,同时也提供了REST和WebSocket接口,支持消息持久化和事务等特性。
- RabbitMQ(2007):RabbitMQ是一个基于AMQP协议的开源消息中间件,使用Erlang语言编写,具有高性能和可扩展性。RabbitMQ支持多种语言客户端,包括Java、Python、Ruby等,支持消息持久化和事务等特性,同时也支持消息的确认和重传机制。插件支持顺序消息。
- Kafka(2011):Kafka是一个分布式的高吞吐量消息队列系统,使用Scala语言编写,由LinkedIn公司开发。Kafka的设计理念是基于发布/订阅模式的消息系统,支持多个消费者并行消费同一个Topic的消息,同时支持水平扩展和数据持久化等特性。
- RocketMQ(2012):RocketMQ是阿里巴巴开源的分布式消息中间件,使用Java语言编写,支持消息顺序性和高可用性。RocketMQ的设计理念是基于Topic和Tag来实现消息的路由,同时支持批量发送和消息事务等特性,适用于大规模的分布式系统和实时数据处理场景。
四,同类产品对比
MQ产品 | 性能 | 集群部署 | 顺序消息 | 定时消息 | 消息存储 | 开发语言 |
---|---|---|---|---|---|---|
ActiveMQ | 低,数据量大时更甚 | 管理起来较复杂 | 支持,通过exclusive consumer(独有消费者)和message group(同一个组的发给同一个consumer)来实现 | 支持 | 支持使用JDBC和高性能日志的快速持久性,如levelDB、kahaDB。 | Java |
RabbitMQ | 低,数据量大时更甚 | 不支持 | 支持(单个队列) | 支持(需要引入插件来支持) | Erlang语言自带的Mnesia数据库或者通过插件支持Mysql,PostageSql | Erlang |
Kafka | 高吞吐量,低延迟 | 支持分布式部署和水平拓展 | 不支持 | 不支持 | 磁盘和内存混合。Kafka将消息存储在磁盘上的一个或多个日志文件中,同时使用内存缓存来提高读写性能和吞吐量 | Scala |
RocketMQ | 高可靠性、高吞吐量、低延迟,适用于高并发、大规模的数据处理场景。 | 支持分布式部署和水平扩展 | 支持(一组顺序性的消息发送至同一台broker中的同一个队列中) | 支持 | CommitLog,该存储方式将消息持久化到磁盘上,同时使用内存映射技术加速读写操作。CommitLog以消息队列的形式存储消息,支持多种索引方式,包括哈希索引、时间索引等。RocketMQ还支持使用外部存储服务来存储消息,如Hadoop、MySQL、MongoDB等。 | Java |
五,原理
RocketMQ的实现原理可以分为以下几个方面:
- 架构设计:RocketMQ的架构分为四层,分别是客户端、Broker、NameServer和存储层。客户端和Broker之间通过网络通信来传递消息,NameServer负责维护Broker的元数据信息,存储层则负责实际存储消息。
- 消息存储:RocketMQ使用CommitLog存储消息,即将消息追加到磁盘上的文件中。每个Broker节点上都有一个或多个CommitLog文件,每个文件分为多个固定大小的消息存储段,每个存储段包含多条消息,每条消息由消息长度、消息内容和消息属性三部分组成。
- 消息路由:RocketMQ通过Topic和Tag来实现消息的路由,Producer发送消息时需要指定消息所属的Topic和Tag,Broker根据Topic和Tag将消息分发到相应的Consumer。每个Topic可以有多个Tag,每个Consumer可以订阅一个或多个Topic和Tag的消息。
- HA机制:RocketMQ通过主从复制的方式实现高可用性,即每个Broker节点都有一个Master节点和多个Slave节点。Master节点负责接收Producer的消息和向Consumer发送消息,同时将消息同步到Slave节点上,Slave节点用于备份和容灾。当Master节点宕机时,会自动切换到Slave节点继续服务。
- 消息顺序保证:RocketMQ可以通过顺序消息的方式来保证消息的顺序,即Producer发送的顺序消息在Broker上存储和消费时都保持顺序不变。RocketMQ通过Topic和Message Queue来实现顺序消息,每个Message Queue只能由一个Consumer消费,Producer发送的所有消息都会按照顺序写入同一个Message Queue中,Consumer则按照顺序消费该Queue中的消息。
总的来说,RocketMQ的实现原理主要包括了消息存储、消息路由、HA机制、消息顺序保证等方面,这些技术手段使得RocketMQ具备了高性能、高可靠性和高可扩展性的特点。
六,如何使用
部署模型
ps:NameServer作用
- 管理Broker,每个Broker都会注册到一个NameServer中,并定时发送心跳,NameServer提供心跳检测机制,检查Broker是否还存活。
- 管理路由信息,每个NameServer将保存关于整个Broker集群的整个路由信息和用于客户端查询的队列信息。Product和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
部署组件
Broker,NameServer,Provider,Consumer
部署过程
目的:在一台linux服务器下部署一个Master+slaver节点的RocketMQ,并在springboot项目中使用它。
-
下载并安装RocketMQ
可以从RocketMQ的官网(http://rocketmq.apache.org/)下载所需版本的安装包,解压后得到RocketMQ的安装目录。将安装目录复制到服务器上的合适位置,比如
/usr/local/rocketmq
。 -
修改配置文件
在
/usr/local/rocketmq/conf
目录下,复制broker.conf
文件并将其重命名为broker-a.properties
(作为Master节点的配置文件)和broker-b.properties
(作为Slave节点的配置文件)。在两个配置文件中,需要修改的主要配置项包括:brokerName
:设置Broker名称,可以自定义。brokerId
:设置Broker的唯一ID,Master和Slave节点的ID不能相同。namesrvAddr
:设置NameServer的地址和端口,格式为<name-server-ip>:9876
。listenPort
:设置Broker的监听端口,Master和Slave节点的端口不能相同。
例:
broker-master.properties:
# Master节点配置 brokerName=master brokerId=0 namesrvAddr=localhost:9876 listenPort=10911 brokerRole=SYNC_MASTER
broker-slave.properties:
# Slaver节点配置 brokerName=slaver brokerId=1 namesrvAddr=localhost:9876 listenPort=10912 brokerRole=SLAVE
-
启动NameServer
在命令行中执行以下命令来启动NameServer:
cd /usr/local/rocketmq/bin nohup sh mqnamesrv &
-
启动Master和Slave节点
在命令行中执行以下命令来启动Master和Slave节点:
cd /usr/local/rocketmq/bin nohup sh mqbroker -c ../conf/broker-master.properties & nohup sh mqbroker -c ../conf/broker-slave.properties &
这将启动两个节点,并将Master节点和Slave节点进行绑定。
查看日志的地方:
当前文件夹下的nohup.out或者~/logs/rocketmqlogs文件下。
-
验证节点状态
执行以下命令来查看节点状态:
cd /usr/local/rocketmq/bin sh mqadmin clusterList -n <name-server-ip>:9876
如果节点正常启动,将输出Master和Slave节点的信息。
-
在Spring Boot项目中使用RocketMQ
首先需要添加RocketMQ的依赖,在
pom.xml
文件中加入以下依赖:<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>${rocketmq.version}</version> </dependency>
在Spring Boot项目的配置文件中,需要添加以下配置项:
rocketmq.name-server=192.168.1.100:9876
其中
192.168.1.100:9876
是RocketMQ的NameServer地址和端口。在需要使用RocketMQ的类中,可以使用
@RocketMQMessageListener
注解来监听消息:@Service @RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-consumer-group", selectorExpression = "my-tag") public class MyRocketMQListener implements RocketMQListener<String> { @Override public void onMessage(String message) { // 处理消息 } }
其中
topic
表示要监听的主题