1 消费者负载均衡
消费者从Apache RocketMQ获取消息消费时,通过消费者负载均衡策略,可以将主题内的消息分配给指定消费者分组中的多个消费者共同分担,提高消费并发能力和消费者的水平扩展能力。
1.1 背景信息
了解消费者负载均衡策略,可以解决以下问题:
1、消息消费处理的容灾能力:根据消费者负载均衡策略,明确当局部节点出现故障时,消息如何进行消费重试和容灾切换。
2、消息消费的顺序性机制:通过消费者负载均衡策略,可以进一步了解消息消费时,如何保证同一消息组内消息的先后顺序。
3、消息分配的水平拆分策略:了解消费者负载均衡策略,可以明确消息消费压力如何被分配到不同节点,有针对性的进行流量迁移和水平扩缩容。
1.2 广播消息和共享消费
在Apache RocketMQ领域模型中,同一条消息支持被多个消费者分组订阅,同时,对于每个消费者分组可以初始化多个消费者。可以根据消费者分组和消费者的不同组合,实现以下两种不同的消费效果:
消费组间广播消息:如上图所示,每个消费者分组只初始化唯一一个消费者,每个消费者可消费到消费者分组内的所有的消息,各消费者分组都订阅相同的消息,以此实现但客户端级别的广播一对多推送效果。该方式一般可用于网关推送、配置推送等场景。
消费组内共享消费:如上图所示,每个消费者分组下初始化了多个消费者,这些消费者共同分担消费者分组内的所有消息,实现消费者分组内流量的水平拆分和负载均衡。该方式一般可用于微服务解耦场景。
1.3 什么是消费者负载均衡
如上文所述,消费组间广播消费场景下,每个消费者分组内只有一个消费者,因此不涉及消费者的负载均衡。
消费组内共享消费场景下,消费者分组内多个消费者共同分担消息,消息按照哪种逻辑分配给哪个消费者,就是由消费者负载均衡策略决定的。
根据消费者类型的不同,消费者负载均衡策略分为以下两种模式:
消息粒度负载均衡:PushConsumer和SimpleConsumer默认负载策略。
队列粒度负载均衡:PullConsumer默认负载策略
1.4 消息粒度负载均衡
使用范围
对于PushConsumer和SimpleConsumer类型的消费者,默认且仅使用消息粒度负载均衡策略。
【注意】上述说明是指5.0 SDK,PushConsumer默认使用消息粒度负载均衡。
策略原理
消息粒度负载均衡策略中,同一消费者分组内的多个消费者将按照消息粒度平均分摊主题中的所有消息,即同一个队列中的消息,可被平均分配给多个消费者共同消费。
如上图所示,消费者分组Group A 中有三个消费者A1 A2 和A3 ,这三个消费者将共同主题中同一队列Queue1的多条消息。注意 消息粒度负载均衡策略保证同一个队列的消息可以被多个消费者共同处理,但是该策略使用的消息分配算法结果是随机的,并不能指定消息被哪一个特定的消费者处理。
消息粒度负载均衡机制,是基于内部的单条消息确认语义实现的。消费者获取某条消息后,服务端会将该消息加锁,保证这条信息对其他消费者不可见,直到该消息消费成功或消费超时。因此,即使多个消费者同时消费同一队列的消息,服务端也保证消息不回被多个消费者重复消费。
顺序消息负载机制
在顺序消息中,消息的顺序性指的是同一消息组内的多个消息之间的先后顺序。因此,顺序消息场景下,消息粒度负载均衡策略还需要保证同一消息组内的消息,按照服务端存储的先后顺序进行消费。不同消费者处理同一个消费组内的消息时,会严格按照先后顺序锁定消息状态,确保同一消息组的消息串行消费。
如上图所示,队列Queue1中有4条顺序消息,这4条信息属于同一消息组G1,存储顺序由M1到M4。在消费过程中, 前面的消息M1 M2被消费者ConsumerA1处理,只要消费状态没有提交,消费者A2是无法并行消费后面的M3 M4消息的,必须等前面的消息提交消费状态后才能消费后面的消息。
策略特点
相对于队列粒度负载均衡策略,消息粒度负载均衡策略有以下特点:
1、消费分摊更均衡:对于传统队列级的负载均衡策略,如果队列数量和消费者数量不均衡,则可能会出现部分消费者空闲,或部分消费者处理过多消息的情况。消息粒度负载均衡策略无需关注消费者和队列的相对数量,能够更均匀的分摊消息。
2、对非对等消费者更友好:在线上生产环境中,由于网络机房分区延迟、消费者物理资源规格不一致等原因,消费者的处理能力可能会不一致,如果按照队列分配消息,则可能出现部分消费者消息堆积、部分消费者空闲的情况。消息粒度负载均衡策略按需分配,消费者处理任务更均衡。
3、队列分配运维更方便:传统基于绑定队列的负载均衡策略必须保持队列数量大于等于消费者数量,以免产生部分消费者获取不到队列出现空转的情况,而消息粒度负载均衡策略则无需关注队列数量。
适用场景
消息粒度负载均衡策略下,同一队列内的消息离散的分布于多个消费者,适用于绝大多数在线事件处理的场景。只需要基本的消息处理能力,对消息之间没有批量聚合的诉求。而对于流式处理、聚合计算场景,需要明确地对消息进行聚合、批处理时,更适合使用队列粒度的负载均衡策略。
使用示例
消息粒度负载均衡策略不需要额外设置,对于PushConsumer和SimpleConsumer消费者类型默认启用。
public static void test01() throws ClientException {
//消费示例1:使用PushConsumer消息普通消息,只需要在消费监听器处理即可,无需关注消息负载均衡
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
//根据消费结果返回状态
return ConsumeResult.SUCCESS;
}
};
/*消费示例2:使用SimpleConsumer消息普通主题,主动获取消息处理并提交,会按照订阅的主题
自动获取,无需关注消息负载均衡
*/
SimpleConsumerBuilderImpl simpleConsumerBuilder = new SimpleConsumerBuilderImpl();
SimpleConsumer simpleConsumer = simpleConsumerBuilder.build();
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
try {
simpleConsumer.ack(messageView);
}catch (ClientException e){
e.printStackTrace();
}
});
}catch (ClientException e){
e.printStackTrace();
}
}
1.5队列粒度负载均衡
策略原理
队列粒度负载均衡策略中,同一消费者分组内的多个消费者将按照队列粒度消费消息,即每个队列仅被一个消费者消费。
如上图所示,主题中的三个队列Queue1 Queue2 Queue3 被分配给消费者分组中的两个消费者,每个队列只能分配给一个消费者消费,该示例中由于队列数对于消费者数,因此,消费者A2被分配了2个队列。若队列数小于消费者数量,可能会出现部分消费者无法绑定队列的情况。
队列粒度负载均衡,基于队列数、消费者数量等运行数据进行统一的算法分配,将每个队列绑定到特定的消费者,然后每个消费者按照取消息>提交消费位点>持久化消费位点的消费语义处理消息,取消息过程不提交消费状态,因此,为了避免消息被多个消费者重复消费,每个队列仅支持被一个消费者消费。
【注意】队列粒度负载均衡策略保证同一个队列仅被一个消费者处理,该策略的实现依赖消费者和服务端的信息协商机制,Apache RocketMQ并不能保证协商结果完全一致。因此,在消费者数量、队列数量发生变化时,可能会出现短暂的队列分配结果不一致,从而导致少量消息被重复消费。
策略特点
相对于消息粒度负载均衡策略,队列粒度负载均衡策略分配粒度较大,不够灵活。但该策略在流式处理场景下有天然优势,能够保证同一队列的消息被相同的消费者处理,对于批处理、聚合处理更友好。
1.6 使用建议
针对消费逻辑做消息幂等
无论是消息粒度负载均衡策略还是队列粒度负载均衡策略,在消费者上线或下线、服务端扩缩容等场景下,都会触发短暂的重新负载均衡动作。此时可能会存在短暂的负载不一致情况,出现少量消息重复消费的现象。因此,需要在下游消费逻辑中做好消息幂等去重处理。
2 消费重试
消费者出现异常,消费某条消息失败时,Apache RocketMQ会根据消费重试策略重新投递信息进行故障恢复。
2.1 应用场景
Apache RocketMQ的消费重试主要解决的是业务处理逻辑失败导致的消费完整性问题,是一种为业务兜底的策略,不应该被用来做业务流程控制。建议以下消费失败场景使用重试机制:
1、业务处理失败,且失败原因跟当前的消息内容相关,比如该消息对应的事务状态还未获取到,预期一段时间后可执行成功。
2、消费失败的愿意不会导致连续性,即当前消息消费失败是一个小概率事件,不是常态化的失败,后面的消息大概率会消费成功。此时可以对当前消息进行重试,避免进程阻塞。
错误的使用场景如下:
1、消息处理逻辑中使用消费失败来做条件判断的结果分流,是不合理的,因此处理逻辑已经预见了一定会大量出现该判断分支。
2、消费处理使用消费失败来做处理速率限流是不合理的。限流的目的是将超出流量的消息暂时堆积在队列中达到削峰的作用,而不是让消息进入重试链路。
2.2 应用目的
消息中间件做异步解耦时的一个典型问题是如果下游服务处理消息事件失败,如何保证整个调用链路的完整性。Apache RocketMQ作为金融级的可靠业务消息中间件,在消息投递处理机制的设计上天然支持可靠传输策略,通过完成的确认和重试机制保证每条消息都按照业务的预期被处理。
了解Apache RocketMQ的消息确认机制以及消费重试策略可以分析如下问题:
1、如何保证业务完整处理消息:了解消费重试策略,可以在设计实现消费者逻辑时保证每条消息处理的完整性,避免部分信息出现异常时被忽略,导致业务状态不一致。
2、系统异常时处理的消息状态如何恢复:了解当前系统出现异常(宕机故障等)等场景时,处理中的消息状态如何恢复,是否会出现状态不一致。
2.3 消费重试策略概述
消费重试指的是,消费者在消费某条消息失败后,Apache RocketMQ服务端会根据重试策略重新消费该消息,超过一定次数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中。
消息重试的触发条件
消费失败,包括消费者返回消息失败状态标识或抛出非预期异常。
消息处理超时,包括在PushConsumer中排队超时。
消息重试策略主要行为
重试过程状态:控制消息在重试流程中的状态和变化逻辑。
重试间隔:上一次消费失败或超时后,下次重新尝试消费的间隔时间。
最大重试次数:消息可被重试消费的最大次数
消息重试策略差异
根据消费者类型不同,消息重试策略的具体内部机制和设置方法有所不同,具体差异如下
消费者类型 | 重试过程状态机 | 重试间隔 | 最大重试次数 |
PushConsumer | 已就绪 处理中 待重试 提交 死信 | 消费者分组创建时元数据 控制。 无序消息:阶梯间隔 顺序消息:固定间隔时间 | 消费者分组创建时的元 数据控制 |
SimpleConsumer | 已就绪 处理中 提交 死信 | 通过API修改获取消息时的 不可见时间 | 消费者分组创建时的元数据控制 |
2.4 PushConsumer消费重试策略
重试状态机
PushConsumer 消费消息时,消息的主要几个状态如下:
Ready:已就绪装填。消息在Apache RocketMQ服务端已就绪,可以被消费者消费。
Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态。
WaitingRetry:待重试状态,PushConsumer独有的状态。当消费者消息处理失败或消费超时,会触发消费重试逻辑判断。如果当前重试次数未达到最大次数,则该消息变为待重试状态,经过重试间隔后,消息将重新变成已就绪小黄太可被重新消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败。
Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机。
DLQ:死信状态。消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不回再重试,会被投递至死信队列,可以通过消费死信队列的消息进行业务恢复。
消息重试过程中,每次重试消息状态都会经过已就绪>处理中>待重试的变化,两次消费间的间隔时间实际由消费耗时及重试间隔控制,消费耗时的最大上限受服务端系统参数控制,一般不应超超过上限时间。
最大重试次数
PushConsumer的最大重试次数由消费者分组创建时的元数据控制。例如,最大重试次数为3次,则该消息最多可被投递4次,1次为原始信息,3次为重试投递次数。
重试间隔时间
无序消息:重试间隔为阶梯时间,具体时间如下:
【注意】若重试次数超过16次,后面每次重试间隔都为2个小时。
顺序消息:重试间隔为固定时间,具体取值,请参见参数限制
使用示例
PushConsumer触发消息重试只需要返回消息失败的状态码即可,当出现非预期的异常时,也会被SDK捕获。
public static void test02() throws ClientException {
SimpleConsumerBuilderImpl simpleConsumerBuilder = new SimpleConsumerBuilderImpl();
SimpleConsumer simpleConsumer = simpleConsumerBuilder.build();
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
//返回消费失败,会自动重试,直至达到最大重试次数
return ConsumeResult.FAILURE;
}
};
}
2.5 SimpleConsumer消费重试策略
重试状态机
SimpleConsumer消费消息时,消息的几个主要状态如下:
Ready:已就绪装填。消息在Apache RocketMQ服务端已就绪,可以被消费者消费。
Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态。
Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机。
DLQ:死信状态。消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不回再重试,会被投递至死信队列,可以通过消费死信队列的消息进行业务恢复。
和PushConsumer消费重试策略不同的是,SimpleConsumer消费者的重试间隔是预分配的,每次获取消息消费者会在调用API时设置一个不可见时间参数InvisibleDuration,即消息的最大处理时长。若消息消费失败触发重试,不需要设置下一次重试的时间间隔,直接复用不可见时间参数的取值。
由于不可见时间为预分配的,可能是实际业务中的消息处理时间差别较大,可以通过API接口修改不可见时间。
例如,预设消息处理耗时最多20ms,但实际业务中20 ms内消息处理不完,可以修改消息不可见时间,延长消息处理时间,避免消息触发重试机制。
修改消息不可见时间需要满足以下几个条件:
1、消息处理未超时
2、消息处理未提交消费状态
如下图所示,消息不可见时间修改后立即生效,即从调用API时刻开始,重新计算消息不可见时间
最大重试次数
SimpleConsumer的最大重试次数由消费者分组创建时的元数据控制,
消息重试间隔
消息重试间隔=不可见时间-消息实际处理时长
SimpleConsumer的消费重试间隔通过消息的不可见时间控制。例如,消息不可见时间是30 ms,实际消息处理用了10 ms就返回失败相应,则距下次消息重试还需要20 ms,此时的消息重试间隔即为20 ms;若直到30 ms消息还未处理完成且为返回结果,则消息超时,立即重试,此时重试间隔即为0ms。
使用示例
SimpleConsumer触发消息重试只需要等待即可。
public static void test03() throws ClientException {
SimpleConsumerBuilderImpl simpleConsumerBuilder = new SimpleConsumerBuilderImpl();
SimpleConsumer simpleConsumer = simpleConsumerBuilder.build();
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10,Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//如果处理失败,希望服务端重试,需要忽略即可,等待消息再次可见后立即可重试获取
});
}catch (ClientException e){
//如果遇到系统流控第原因造成拉取失败,需要重新发起获取消息请求
e.printStackTrace();
}
}
2.6 使用建议
合理重试,避免因限流等诉求触发消费重试
前面应用场景中提到过,消息重试适用业务处理失败且当前消费失败为小概率事件的场景,不适合在连续性失败的场景下使用,例如消费限流场景。
合理控制重试次数,避免无限重试
虽然Apache RocketMQ支持自定义消费重试次数,但是建议通过减少重试次数+延长重试间隔来降低系统压力,避免出现无限重试或大量重试的情况。
3 消息存储和清理机制
3.1 背景信息
根据Apache RocketMQ中队列的定义,消息按照到达服务器的先后顺序被存储到队列中,理论上每个队列都支持无限存储。
但在实际部署场景中,服务端节点的物理存储空间有限,消息无法做到永久存储,因此,在实际使用中需要考虑以下问题,消息在服务端中的存储以什么维度为判定条件?消息存储以什么粒度进行管理?消息存储超过限制后如何处理?这些问题都是由消息存储和过期清理机制来定义的。
了解消息存储和过期清理机制,可以从以下方面更好的进行运维管理:
1、提供消息存储时间SLA,为业务提供安全冗余空间:消息存储时间的承诺本质上代表业务侧可以自由获取消息的时间范围。对于消息时间长、消息堆积、故障恢复等场景非常关键。
2、评估和控制存储成本:Apache RocketMQ消息一般存储于磁盘上,可以通过存储机制评估存储空间,提前预留存储资源。
3.2 消息存储机制
原理机制
Apache RocketMQ 使用存储时长作为消息存储的依据,即每个节点对外承诺消息的存储时长。在存储时长范围内的消息都会被保留,无论消息是否被消费;超过时长限制的消息则会被清理掉。
消息存储机制主要定义以下关键问题:
1、消息存储管理粒度:Apache RocketMQ按存储节点管理消息的存储时长,并不是按照主题或队列粒度来管理。
2、消息存储判断依据:消息存储按照存储时间作为判断依据,相对于消息数量、消息大小等条件,使用存储时间作为判断依据,更利于业务方队消息数据的价值评估。
3、消息存储和是否消费状态无关:Apache RocketMQ 的消息存储是按照消息的生产时间计算,和消息是否被消费无关。按照统一的计算策略可以有效地简化存储机制。
消息在对队列中存储情况如下:
【备注】消息存储管理粒度说明
Apache RocketMQ按照服务端节点粒度管理存储时长而非队列或主题,原因如下:
1、消息存储优势权衡:Apache RocketMQ基于统一的物理日志队列和轻量化逻辑队列的二级组织方式,管理物理数据。这种机制可以带来顺序读写、高吞吐、高性能等优势,但缺点是不支持按主题和队列单独管理。
2、安全生产和容量保障风险要求:即使Apache RocketMQ按照主题或队列独立生成存储文件,但存储层本质还是共享存储介质。单独根据主题或队列控制存储时长,这种方式看似灵活,但实际上整个集群仍然存在容量风险,可能会导致存储时长SLA被打破。从安全生产角度考虑,最合理的方式还是将不同存储时长的消息通过不同集群进行分离治理。
消息存储和消费状态关系说明
Apache RocketMQ统一管理消息的存储时长,无论消息是否被消费。
当消费者不在线或消息消费异常时,会造成队列中大量消息堆积,且该线程暂时无法有效控制。若此时按照消费状态考虑将未消费的消息全部保留,则很容易导致存储空间不足,进而影响到新消息的读写速度。
根据统一的存储时长管理消息 ,可以帮助消费者业务清晰地判断每条消息的声明周期。只要消息在有效期内可以随时被消费,或通过重置消费位点功能实用新型可被消费多次。
消息存储文件结构说明
Apache RocketMQ消息默认存储在本地磁盘文件中,存储文件的根目录是由配置参数storePathRootDir决定,存储结构如下图所示,其中commitlog文件存储消息物理文件,consumeQueue文件存储逻辑队列索引,其他文件的详细作用可以看到代码解析。
3.3 消息过期清理机制
在Apache RocketMQ中,消息保存时长并不能完整控制消息的实际保存时间,因为消息存储仍然使用本地磁盘,本地磁盘空间不足时,为保证服务稳定性信息仍然会被强制清理,导致消息的实际保存时长小于设置的保存时长。
3.4 使用建议
消息存储时长建议适当增加
Apache RocketMQ按存储时长统一控制信息是否被保留。建议在存储成本可控的前提下,尽可能延长存储时长,可以为紧急故障恢复、应急问题排查和消息回溯带来更多的可操作空间。