1、简介
RocketMQ 是一款开源的分布式消息队列系统,由阿里巴巴集团开发并开源。它是为了满足大规模分布式系统中的消息通信和异步解耦需求而设计的,具有高吞吐量、低延迟、可靠性强等特点。下面将详细介绍 RocketMQ 的架构、组件和关键特性。
以下是RocketMQ界面:
2、基础概念:
- 消息:待传递的数据单元,包含消息主题和消息内容。
- 主题(Topic):消息的分类标识,消费者通过订阅主题来接收相关消息。
- 消费者(Consumer):消息的接收者,通过订阅主题来消费消息。
- 生产者(Producer):消息的发送者,将消息发布到指定的主题。
- 消费者分组(Consumer Group)是一种将多个消费者实例组织在一起的概念。
- ACL(Access Control List):俗称访问控制列表。访问控制,基本上会涉及到用户、资源、权限、角色等
3、架构和组件:
- 名字服务器(Name Server):管理元数据信息、提供路由查询等功能。
- 代理服务器(Broker):消息的中转和存储,负责消息的传递和存储。
- 控制器(Controller):监控和管理整个集群的状态和各个 Broker 的健康状况。
4、消息传输模型
主流的消息中间件的传输模型主要为点对点模型和发布订阅模型。
4.1、点对点模型
点对点模型也叫队列模型,具有如下特点:
- 消费匿名:消息上下游沟通的唯一的身份就是队列,下游消费者从队列获取消息无法申明独立身份。
- 一对一通信:基于消费匿名特点,下游消费者即使有多个,但都没有自己独立的身份,因此共享队列中的消息,每一条消息都只会被唯一一个消费者处理。因此点对点模型只能实现一对一通信。
4.2、发布订阅模型
发布订阅模型具有如下特点:
- 消费独立:相比队列模型的匿名消费方式,发布订阅模型中消费方都会具备的身份,一般叫做订阅组(订阅关系),不同订阅组之间相互独立不会相互影响。
- 一对多通信:基于独立身份的设计,同一个主题内的消息可以被多个订阅组处理,每个订阅组都可以拿到全量消息。因此发布订阅模型可以实现一对多通信。
传输模型对比
点对点模型和发布订阅模型各有优势,点对点模型更为简单,而发布订阅模型的扩展性更高。 RocketMQ 使用的传输模型为发布订阅模型,因此也具有发布订阅模型的特点。
5、生产者
5.1、消息发送类型
1)普通消息发送
Apache RocketMQ可用于以三种方式发送消息:同步、异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。
2)延时消息发送
延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。
在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的延时事件触发。使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
}
}
3)顺序消息发送
顺序消息是一种对消息发送和消费顺序有严格要求的消息。
对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。在 RocketMQ 中支持分区顺序消息,如下图所示。我们可以按照某一个标准对消息进行分区(比如图中的ShardingKey),同一个ShardingKey的消息会被分配到同一个队列中,并按照顺序被消费。
需要注意的是 RocketMQ 消息的顺序性分为两部分,生产顺序性和消费顺序性。只有同时满足了生产顺序性和消费顺序性才能达到上述的FIFO效果。
4)事务消息发送
在一些对数据一致性有强需求的场景,可以用 RocketMQ 事务消息来解决,从而保证上下游数据的一致性。
事务消息交互流程如下图所示。
- 生产者将消息发送至Apache RocketMQ服务端。
- Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
- 生产者开始执行本地事务逻辑。
- 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
- 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
- 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
- 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。 说明 服务端回查的间隔时间和最大回查次数,请参见参数限制。
- 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
5)应答消息发送及接收
RocketMQ中,消息的应答主要分为两个方面:发送方的应答和消费方的应答。
- 发送方的应答:
- 同步发送应答:当发送方使用同步发送方式发送消息时,发送方法会阻塞等待Broker的应答结果,直到收到发送结果的响应或发送超时。
- 异步发送应答:当发送方使用异步发送方式发送消息时,发送方法会立即返回,发送方可以通过注册回调函数来处理发送结果,当收到Broker的应答时,会调用回调函数通知发送结果。
- 单向发送无应答:当发送方使用单向发送方式发送消息时,发送方法会立即返回,不关心发送结果,也不会等待任何应答。
- 消费方的应答:
- 消费成功应答:当消费方成功处理一条消息后,可以向Broker发送消费成功的应答。Broker在收到消费成功的应答后,会将该消息标记为已消费,防止重复消费。
- 消费失败应答:当消费方无法处理一条消息时,可以向Broker发送消费失败的应答。Broker在收到消费失败的应答后,可以根据消费失败的策略进行相应处理,例如重新投递给其他消费者或进行重试等。
通过应答机制,RocketMQ可以确保消息的可靠性传输和处理。发送方可以根据应答结果判断消息是否发送成功,消费方可以通过应答告知Broker消息的处理结果。这样可以保证消息系统的可靠性和一致性。
public class ManualAckConsumer {
public static void main(String[] args) throws MQClientException {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic和Tag
consumer.subscribe("TopicTest", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
try {
// 处理消息
System.out.println("收到消息:" + new String(message.getBody()));
// 模拟业务处理成功
boolean success = true;
if (success) {
// 手动ACK确认消息消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} else {
// 手动ACK确认消息消费失败
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
} catch (Exception e) {
e.printStackTrace();
// 异常情况下,手动ACK确认消息消费失败
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 消费失败
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
// 启动消费者
consumer.start();
System.out.println("消费者已启动");
}
}
5.2、消息发送方式
RocketMQ 提供了三种消息发送方式,分别是同步发送、异步发送和单向发送。
- 同步发送(Synchronous Send):发送方发送消息后,会一直等待消息被成功发送或发送超时。发送方会阻塞等待,直到收到发送结果的响应或者超时。同步发送适用于对消息的可靠性要求较高,需要确保消息被成功发送并得到处理结果的场景。
public class SyncProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("group_name");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
try {
// 创建消息实例,指定Topic、Tag和消息内容
Message message = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
// 发送消息并等待返回结果
SendResult sendResult = producer.send(message);
// 判断消息发送状态
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
}
} catch (Exception e) {
e.printStackTrace();
}
// 关闭生产者实例
producer.shutdown();
}
}
- 异步发送(Asynchronous Send):发送方发送消息后,不会立即等待发送结果,而是通过回调函数来处理发送结果。发送方发送消息后立即返回,并继续执行后续代码,接收方收到消息后会异步处理并回调发送方的回调函数,通知发送结果。异步发送适用于对消息的实时性要求较高,但对消息可靠性要求稍低的场景。
public class AsyncProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("group_name");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
try {
// 创建消息实例,指定Topic、Tag和消息内容
Message message = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
// 发送消息并注册回调
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("消息发送成功");
}
@Override
public void onException(Throwable e) {
System.out.println("消息发送失败:" + e.getMessage());
}
});
// 等待异步发送结果
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
// 关闭生产者实例
producer.shutdown();
}
}
- 单向发送(One-Way Send):发送方发送消息后,不关心发送结果,也不会等待任何响应。发送方只负责将消息发送到消息队列中,不做任何确认或者后续处理。单向发送适用于对消息的可靠性要求较低,无需关注发送结果的场景,如日志收集等。
public class OnewayProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("group_name");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
try {
// 创建消息实例,指定Topic、Tag和消息内容
Message message = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
// 发送单向消息,无需等待发送结果
producer.sendOneway(message);
} catch (Exception e) {
e.printStackTrace();
}
// 关闭生产者实例
producer.shutdown();
}
}
6、消费者
6.1、消费者组
在 RocketMQ 的领域模型中,消费者分组的位置和流程如下:
- 消息由生产者初始化并发送到RocketMQ 服务端。
- 消息按照到达RocketMQ 服务端的顺序存储到主题的指定队列中。
- 消费者按照指定的订阅关系从RocketMQ 服务端中获取消息并消费。
6.2、消费模式
RocketMQ提供两种分组模式,集群模式(Cluster Mode)和广播模式(Broadcast Mode)。
- 集群消费模式:多个消费者实例共同消费同一主题的消息。
- 广播消费模式(topic通配符):每个消费者实例都独立消费所有消息。
集群消费模式适用于每条消息只需要被处理一次的场景,也就是说整个消费组会Topic收到全量的消息,而消费组内的消费分担消费这些消息,因此可以通过扩缩消费者数量,来提升或降低消费能力,具体示例如下图所示,是最常见的消费方式。
// 设置消费模式为集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
广播消费模式适用于每条消息需要被消费组的每个消费者处理的场景,也就是说消费组内的每个消费者都会收到订阅Topic的全量消息,因此即使扩缩消费者数量也无法提升或降低消费能力,具体示例如下图所示。
// 设置消费模式为广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
6.3、消费方式(pull/push)
MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。
Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。
7、消息过滤
RocketMQ在topic过滤基础上可以增加tag进一步筛选
Message message = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
8、持久化的实现:
消息的持久化存储:RocketMQ 将消息存储到磁盘,以确保消息的可靠性。
RocketMQ是一种分布式消息中间件,它支持消息的持久化,以确保消息在系统异常或故障发生时不会丢失。RocketMQ提供了多种持久化方式,包括同步刷盘和异步刷盘,以满足不同场景下的性能和可靠性需求。
RocketMQ的持久化机制如下:
- 异步刷盘(Asynchronous Flush):RocketMQ使用异步方式将消息写入磁盘。当消息到达Broker时,会首先将消息写入内存缓冲区(PageCache),同时异步地将消息刷写到磁盘中的CommitLog文件。异步刷盘可以提高写入性能,但在系统异常崩溃时,可能会导致部分消息的丢失。
- 同步刷盘(Synchronous Flush):除了异步刷盘,RocketMQ还提供了同步刷盘的方式。在同步刷盘模式下,当消息写入内存缓冲区后,Broker会立即将消息刷写到磁盘中的CommitLog文件,并等待刷盘操作的返回结果。这样可以确保消息写入磁盘后的持久化,但会对写入性能产生一定的影响。
- 文件预热(File Preallocation):RocketMQ在启动时会预先分配一些CommitLog文件,这样可以避免消息写入时频繁地创建和分配文件,提高写入性能和稳定性。
- 重试机制(Retry Mechanism):如果消息写入CommitLog文件失败,RocketMQ会进行重试,直到写入成功或达到最大重试次数。这样可以增加消息的可靠性和持久化能力。
- 文件刷写策略(Flush Strategy):RocketMQ提供了多种文件刷写策略,可以根据不同的需求和场景进行配置。例如,可以设置定时刷盘、根据消息数量刷盘或根据时间间隔刷盘等。
9、负载均衡
集群模式下,同一个消费组内的消费者会分担收到的全量消息,这里的分配策略是怎样的?如果扩容消费者是否一定能提升消费能力?
RocketMQ 提供了多种集群模式下的分配策略,包括平均分配策略、机房优先分配策略、一致性hash分配策略等,可以通过如下代码进行设置相应负载均衡策略
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
默认的分配策略是平均分配,这也是最常见的策略。平均分配策略下消费组内的消费者会按照类似分页的策略均摊消费。
10、部署方式
部署方式 | RocketMQrocketmq.apache.org/zh/docs/4.x/deployment/01deploy
=================================
如果文章对你有帮助,请不要忘记加个关注、点个赞!