【RocketMQ】浅谈消息发送机制
参考资料:
消息发送核心参数与工作原理详解
RocketMQ消息发送流程
RocketMQ 消息发送 原理详解 源码剖析
结合实际应用场景谈消息发送
《RocketMQ技术内幕》
文章目录
- 【RocketMQ】浅谈消息发送机制
- 一、认识RocketMQ消息——Message
- 二、生产者启动流程
- 2.1、初识 DefaultMQProducer 消息发送者
- 2.2、消息生产者启动流程
- 三、消息发送基本流程
- 3.1、消息长度验证
- 3.2、 查找主题路由信息
- 3.3、选择消息队列
- 3.3.1、重试机制
- 3.3.2、默认机制
- 3.3.3、Broker 故障延迟机制
- 3.4、消息发送
- 3.4.1、获取 Broker 的网络地址
- 3.4.2、分配全局唯一ID、设置系统标记
- 3.4.3、执行注册的钩子函数的增强逻辑
- 3.4.4、构建消息发送请求包
- 3.4.5、进行网络传输
- 3.4.6、执行注册的钩子函数的after逻辑
- 四、消息的发送方式
- 4.1、三种发送方式
- 4.2、代码演示
- 4.3、面试&实践经验
- 五、消息批量发送
- 六、总结
发送消息是 MQ 最基础的操作之一。RocketMQ 官方提供了多语言客户端支持消息的发送和消费操作。当然,消息发送并不仅仅牵扯到客户端操作。客户端做的是向 Broker 发送请求,请求中包含了消息的全部信息。而 Broker 需要处理客户端发送来的生产请求,将消息存储起来。在这篇文章中我将解析消息发送流程中生产者和 Broker 的处理流程,揭秘 RocketMQ 消息发送高性能、高可靠的原理。
RocketMQ 发送普通消息有三种实现方式:可靠同步发送、可靠异步发送、单向 (Oneway) 发送。本文主要聚焦在 RocketMQ 如何发送消息,然后从消息的数据结构开始,逐步介绍生产者的启动流程和消息发送的流程,最后再详细阐述批量消息发送。
一、认识RocketMQ消息——Message
RocketMQ 消息封装类是:org.apache.rocketmq.common.message
Message 的基础属性主要包括消息所属主题 tfopic、消息 Flag(RocketMQ 不做处理、 扩展属性、消息体。
RocketMQ 定义的 MessageFlag
如图所示。
Message 扩展属性主要包含下面几个。
tag
:消息 TAG,用于消息过滤。keys
: Message 索引键,多个用空格隔开,RocketMQ 可以根据这些 key 快速检索到消息。waitStoreMsgOK
:消息发送时是否等消息存储完成后再返回。delay TimeLevel
:消息延迟级别,用于定时消息或消息重试。
这些扩展属性存储在 Message 的 properties 中。
二、生产者启动流程
消息生产者的代码都在 client 模块中,相对于 RocketMQ 来说,它就是客户端,也是消息的提供者,我们在应用系统中初始化生产者的一个实例即可使用它来发消息。
2.1、初识 DefaultMQProducer 消息发送者
DefaultMQProducer 是默认的消息生产者实现类,它实现 MQAdmin 的接口。
- DefaultMQProducer 核心属性
- DefaultMQProducer 的主要方法。
2.2、消息生产者启动流程
消息生产者是如何一步一步启动的呢?我们可以从这个类的 DefaultMQProducerlmpl
的 start 方法来跟踪,具体细节如下。
- Step1:检查
productGroup
是否符合要求;并改变生产者的instanceName
为进程ID。
- Step2:创建
MQClientInstance
实例.整个 JVM实例中只存在一个MQClientManager
实例,维护一个MQClientInstance
缓存表 ConcurrentMap<String/* clientld */, MQClientInstance> factory Table =new ConcurrentHashMap<String, MQClientInstance>(),也就是同一个 clientld 只会创建一个MQClientInstance
。
- 创建 clientld 的方法。
clientld 为客户端IP+instance+(unitname 可选),如果在同一台物理服务器部署两个应用程序,应用程序岂不是 clientld 相同,会造成混乱?
为了避免这个问题,如果 instance 为默认值 DEFAULT 的话,RocketMQ会自动将 instance 设置为进程ID,这样避免了不同进程的相互影响,但同一个JVM中的不同消费者和不同生产者在启动时获取到的 MQClientinstane 实例都是同一个。根据后面的介绍, MQClientInstance 封装了 RocketMQ 网络处理 API,是消息生产者(Producer)、消息消费者 (Consumer)与NameServer、Broker 打交道的网络通道。
-
Step3:向 MQClientInstance 注册,将当前生产者加人到 MQClientInstance 管理中,方便后续调用网络请求、进行心跳检测等.
-
Step4:启动 MQClientInstance,如果 MQClientInstance 已经启动,则本次启动不会真正执行。
三、消息发送基本流程
消息发送流程主要的步骤:验证消息、查找路由、消息发送(包含异常处理机制)。
本节主要以 SendResult sendMessage(Messsage message) 方法为突破口,窥探一下消息发送的基本实现流程。
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
return sendByAccumulator(msg, null, null);
} else {
return sendDirect(msg, null, null);
}
}
3.1、消息长度验证
消息发送之前,首先确保生产者处于运行状态,然后验证消息是否符合相应的规范, 具体的规范要求是主题名称、消息体不能为空、消息长度不能等于0且默认不能超过允许发送消息的最大长度 4M (maxMessageSize=1024* 1024* 4)。
3.2、 查找主题路由信息
消息发送之前,首先需要获取主题的路由信息,只有获取了这些信息我们才知道消息要发送到具体的 Broker 节点。
tryToFindTopicPublishinfo 是查找主题的路由信息的方法。如果生产者中缓存了 topic 的路由信息,如果该路由信息中包含了消息队列,则直接返回该路由信息,如果没有缓存或没有包含消息队列,则向 NameServer 查询该 topic 的路由信息。如果最终未找到路由信息,则抛出异常:无法找到主题相关路由信息异常。
第一次发送消息时,本地没有缓存 topic 的路由信息,查询 NameServer 尝试获取,如果路由信息未找到,再次尝试用默认主题 DefautMQProducerlmpl#createTopicKey 去查询, 如果 Brok erConfig#autoCreate TopicEnable 为 true 时,NameServer 将返回路由信息,如果 autoCreateTopicEnable 为 false 将抛出无法找到 topie 路由异常。
3.3、选择消息队列
根据路由信息选择消息队列,返回的消息队列按照 broker、序号排序。举例说明,如果 topicA 在 broker-a,broker-b ,broker-c 上分别创建了 2 个队列,那么返回的消息队列:
[
{
"brokerName":"broker-a",
"queueId":0
},
{
"brokerName":"broker-a",
"queueId":1
},
{
"brokerName":"broker-b",
"queueId":0
},
{
"brokerName":"broker-b",
"queueId":1
},
{
"brokerName":"broker-c",
"queueId":0
},
{
"brokerName":"broker-c",
"queueId":1
}
]
3.3.1、重试机制
首先消息发送端采用重试机制,由 retry Times WhenSendFailed 指定同步方式重试次数,异步重试机制在收到消息发送结构后执行回调之前进行重试。由 retry Times WhenSend- AsyncFailed 指定,接下来就是循环执行,选择消息队列、发送消息,发送成功则返回,收到异常则重试。
/**
* 同步模式下内部尝试发送消息的最大次数
*/
private int retryTimesWhenSendFailed = 2
/**
* 异步模式下内部尝试发送消息的最大次数
*/
private int retryTimesWhenSendAsyncFailed = 2;
选择消息队列有两种方式。
- sendLatencyFaultEnable-false,默认不启用 Broker 故障延迟机制。
- sendLatencyFaultEnable=true,启用 Broker 故障延迟机制。
3.3.2、默认机制
没有开启故障延迟:sendLat encyFaultEnable-false,调用 TopicPublishInfo#selectOneMessageQueue.
首先在一次消息发送过程中,可能会多次执行选择消息队列这个方法,lastBrokerName 就是上一次选择的执行发送消息失败的 Broker。第一次执行消息队列选择时, lastBrokerName 为 null,此时直接用 sendWhichQueue 自增再荻取值,与当前路由表中消息队列个数取模,返回该位置的 MessageQueue(seleetOneMessageQueue( 方法),如果消息发送再失败的话,下次进行消息队列选择时规避上次 MesageQueue 所在的 Broker,否则还是很有可能再次失败。
该算法在一次消息发送过程中能成功规避故障的 Broker,但如果 Broker 宕机,由于路由算法中的消息队列是按 Broker 排序的,如果上一次根据路由算法选择的是宕机的 Broker 的第一个队列,那么随后的下次选择的是宕机 Broker 的第二个队列,消息发送很有可能会失败,再次引发重试,带来不必要的性能损耗,那么有什么方法在一次消息发送失败后, 暂时将该 Broker 排除在消息队列选择范围外呢?或许有朋友会问,Broker 不可用后,路由信息中为什么还会包含该 Broker 的路由信息呢?其实这不难解释:首先,NameServer 检测 Broker 是否可用是有延迟的,最短为一次心跳检测间隔(10s);其次,NameServer 不会检测到 Broker宕机后马上推送消息给消息生产者,而是消息生产者每隔 30s更新一次路由信息,所以消息生产者最快感知 Broker最新的路由信息也需要 30s。如果能引入一种机制, 在 Broker 宕机期间,如果一次消息发送失败后,可以将该 Broker 暂时排除在消息队列的选择范围中。
3.3.3、Broker 故障延迟机制
在发送之前 sendWhichQueue 该值为 broker-a 的 q1,如果由于此时 broker-a 的突发流量异常大导致消息发送失败,会触发重试,按照轮循机制,下一个选择的队列为 broker-a 的 q2 队列,此次消息发送大概率还是会失败,即尽管会重试 2 次,但都是发送给同一个 Broker 处理,此过程会显得不那么靠谱,即大概率还是会失败,那这样重试的意义将大打折扣。
故 RocketMQ 为了解决该问题,引入了故障规避机制,在消息重试的时候,会尽量规避上一次发送的 Broker,回到上述示例,当消息发往 broker-a q1 队列时返回发送失败,那重试的时候,会先排除 broker-a 中所有队列,即这次会选择 broker-b q1 队列,增大消息发送的成功率。
- sendLatencyFaultEnable 设置为 true:开启延迟规避机制。
一旦消息发送失败会将 broker-a “悲观”地认为在接下来的一段时间内该 Broker 不可用,在为未来某一段时间内所有的客户端不会向该 Broker 发送消息。这个延迟时间就是通过 notAvailableDuration、latencyMax 共同计算的,就首先先计算本次消息发送失败所耗的时延,然后对应 latencyMax 中哪个区间,即计算在 latencyMax 的下标,然后返回 notAvailableDuration 同一个下标对应的延迟值。
温馨提示:如果所有的 Broker 都触发了故障规避,并且 Broker 只是那一瞬间压力大,那岂不是明明存在可用的 Broker,但经过你这样规避,反倒是没有 Broker 可用来,那岂不是更糟糕了?针对这个问题,会退化到队列轮循机制,即不考虑故障规避这个因素,按自然顺序进行选择进行兜底。
- 实战经验分享
RocketMQ Broker 的繁忙基本都是瞬时的,而且通常与系统 PageCache 内核的管理相关,很快就能恢复,故不建议开启延迟机制。因为一旦开启延迟机制,例如 5 分钟内不会向一个 Broker 发送消息,这样会导致消息在其他 Broker 激增,从而会导致部分消费端无法消费到消息,增大其他消费者的处理压力,导致整体消费性能的下降。
- 核心源码分析
private long[] latencyMax = {50L, 100L, 550L, 1800L, 3000L, 5000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 2000L, 5000L, 6000L, 10000L, 30000L};
latencyMax,根据 currentLatency 本次消息发送延迟,从latencyMax尾部向前找到第一个比 currentLatency 小的索引index,如果没有找到,返回0。然后根据这个索引从 notAvailableDuration 数组中取出对应的时间,在这个时长内,Broker 将设置为不可用。
下面从源码的角度分析 updateFaultltem、 isAvailable 方法的实现原理,如下所示。
上述代码如果发送过程中抛出了异常,调用 DefaulMQProducerlmpl#updateFaultltem,, 该方法则直接调用 MQFaultStrategy#updateFaultltem 方法,关注一下各个参数的含义。
第一个参数:broker 名称。
第二个参数:本次消息发送延迟时间 currentLatency。
第三个参数:isolation,是否隔离,该参数的含义如果为 true,则使用默认时长30s来计算 Broker 故障规避时长,如果为 false,则使用本次消息发送延迟时间来计算 Broker 故障规避时长。
如果 isolation 为 true,则使用30s 作为 computeNotAvailableDuration 方法的参数;如果 isolation 为 false,则使用本次消息发送时延作为 computeNotAvailableDuration 方法的参数,那 computeNotAvailableDuration 的作用是计算因本次消息发送故障需要将 Broker 规避的时长,也就是接下来多久的时间内该 Broker 将不参与消息发送队列负载。具体算法:从 latencyMax 数组尾部开始寻找,找到第一个比 currentLatency 小的下标,然后从 notAvailableDuration 数组中获取需要规避的时长,该方法最终调用 LatencyFaultTolerance 的 updateFaultltem。
3.4、消息发送
消息发送 API核心人口:DefautMQProducerlmpl#sendKernelImpl。
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout)
消息发送参数详解:
Message msg
:待发送消息.MessageQueue mq
:消息将发送到该消息队列上.CommunicationMode communicationMode
:消息发送模式,SYNC、 ASYNC、 ONEWAY.SendCallback sendCallback
:异步消息回调函数.TopicPublishInfo topicPublishInfo
:主题路由信息 .long timeout
:消息发送超时时间.
3.4.1、获取 Broker 的网络地址
Step1:根据 MessageQueue 获取 Broker 的网络地址。如果 MQClientlnstance 的 brokerAddrTable 未缓存该 Broker 的信息,则从 NameServer 主动更新一下 topic 的路由信息。如果路由更新后还是找不到 Broker 信息,则抛出 MQClientException,提示 Broker 不存在。
3.4.2、分配全局唯一ID、设置系统标记
Step2:为消息分配全局唯一ID,如果消息体默认超过 4K(compressMsgBodyOverHowmuch), 会对消息体采用 zip压缩,并设置消息的系统标记为 MessageSysFlag.COMPRESSED_FLAG。如果是事务 Prepared 消息,则设置消息的系统标记为 MessageSysFlag.TRANSACTION_PREPARED_TYPE。
3.4.3、执行注册的钩子函数的增强逻辑
Step3:如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑。通过DefaultMQProducerlmpl#registerSendMessageHook 注册钩子处理类,并且可以注册多个。
看一下钩子处理类接口。
3.4.4、构建消息发送请求包
Step4:构建消息发送请求包。主要包含如下重要信息:生产者组、主题名称、默认创建主题 Key、该主题在单个 Broker默认队列数、队列ID(队列序号)、消息系统标记 MessageSysFlag)、消息发送时间、消息标记(RocketMQ 对消息中的 flag 不做任何处理, 供应用程序使用)、消息扩展属性、消息重试次数、是否是批量消息等。
3.4.5、进行网络传输
Step5:根据消息发送方式,同步、异步、单向方式进行网络传输。
3.4.6、执行注册的钩子函数的after逻辑
Step6:如果注册了消息发送钩子函数,执行 after 逻辑。注意,就算消息发送过程中发生 Remoting Exception、 MQ BrokerException、 InterruptedException 时该方法也会执行。
四、消息的发送方式
4.1、三种发送方式
RocketMQ 支持同步、异步、Oneway 三种发送方式。
- 同步:客户端发起一次消息发送后会同步等待服务器的响应结果。
- 异步:客户端发起一下消息发起请求后不等待服务器响应结果而是立即返回,这样不会阻塞客户端子线程,当客户端收到服务端(Broker)的响应结果后会自动调用回调函数。
- Oneway:客户端发起消息发送请求后并不会等待服务器的响应结果,也不会调用回调函数,即不关心消息的最终发送结果。
4.2、代码演示
下面首先用 Demo 演示一下异步消息发送模式。
public static void main(String[] args) throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("testProducerGroup");
producer.setNamesrvAddr("192.168.3.10:9876");
try {
producer.start();
//发送单条消息
Message msg = new Message("TOPIC_TEST", "hello rocketmq".getBytes());
producer.send(msg, new SendCallback() {
// 消息发送成功回调函数
public void onSuccess(SendResult sendResult) {
System.out.printf("%s%n", sendResult);
}
// 消息发送失败回调函数
public void onException(Throwable e) {
e.printStackTrace();
// 消息发送失败,可以在这里做补偿,例如将消息存储到数据库,定时重试。
}
});
} catch (Throwable e) {
e.printStackTrace();
//消息发送失败,可以在这里做补偿,例如将消息存储到数据库,定时重试。
}
Thread.sleep(3000);
// 使用完毕后,关闭消息发送者
// 基于 Spring Boot 的应用,在消息发送的时候并不会调用 shutdown 方法,而是等到 spring 容器停止
producer.shutdown();
}
Oneway 方式通常用于发送一些不太重要的消息,例如操作日志,偶然出现消息丢失对业务无影响。那实际生产中,同步发送与异步发送该如何选择呢?
4.3、面试&实践经验
在面试中回答如何选择同步发送还是异步发送时,首先简单介绍一下异步发送的实现原理:
- 每一个消息发送者实例(DefaultMQProducer)内部会创建一个异步消息发送线程池,默认线程数量为 CPU 核数,线程池内部持有一个有界队列,默认长度为 5W,并且会控制异步调用的最大并发度,默认为 65536,其可以通过参数 clientAsyncSemaphoreValue 来配置。
- 客户端使线程池将消息发送到服务端,服务端处理完成后,返回结构并根据是否发生异常调用 SendCallback 回调函数。
实践建议如下:
MQ 与应用服务器都在一个内部网络中,网络通信的流量通常可以忽略,而且 MQ 的设计目的是低延迟、高并发,故通常没有必要使用异步发送,通常都是为了提高 RocketMQ Broker 端相关的参数,特别是刷盘策略和复制策略。如果在一个场景中,一个数据库操作事务中需要发送多条消息,这个时候使用异步发送也会带来一定性能提升。
如果使用异步发送,编程模型上会稍显复杂,其补偿机制、容错机制将会变的较为复杂。
正如上述示例中阐述的那样,补偿代码应该在两个地方调用:
producer#send
方法时需要捕捉异常,常见的异常信息:MQClientException("executor rejected ", e)
。- 在 SendCallback 的 onException 中进行补偿,常见异常有调用超时、RemotingTooMuchRequestException。
五、消息批量发送
批量消息发送是将同一主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。当然,并不是在同一批次中发送的消息数量越多性能就越好,其判断依据是单条消息的长度,如果单条消息内容比较长,则打包多条消息发送会影响其他线程发送消息的响应时间,并且单批次消息发送总长度不能超过 Default MQProducer#maxMessageSize。批量消息发送要解决的是如何将这些消息编码以便服务端能够正确解码出每条消息的消息内容。
批量消息 MessageBatch
类继承普通消息类 Message
,内部仅仅多了消息列表 List<Message> messages
。这样就可以像发送普通消息一样发送批量消息。发送前需要做的就是将多条普通消息体编码,放到 MessageBatch
的消息体中。
服务端接收到后,按照相同规则对批量消息进行解码,即可解码出多条消息。
六、总结
本文重点剖析了消息发送的整个流程,重点如下。
- 消息生产者启动流程
重点理解 MQClientInstance、消息生产者之间的关系。
- 消息队列负载机制
消息生产者在发送消息时,如果本地路由表中未缓存 topic 的路由信息,向 Name- Server 发送获取路由信息请求,更新本地路由信息表,并且消息生产者每隔 30s 从 Name- Server 更新路由表。
- 消息发送异常机制
消息发送高可用主要通过两个手段:重试与 Broker 规避。Broker 规避就是在一次消息发送过程中发现错误,在某一时间段内,消息生产者不会选择该 Broker( 消息服务器)上的消息队列,提高发送消息的成功率。
- 批量消息发送
RocketMQ 支持将同一主题下的多条消息一次性发送到消息服务端。