先稍微介绍下RocketMQ架构。
主从架构
-
Broker 集群:每个 Broker 分为 Master 和 Slave 角色,Master 负责读写,Slave 作为热备。
-
同步复制(SYNC_MASTER):消息写入 Master 后,需等待 Slave 同步完成才返回成功,保证数据强一致(牺牲部分性能)。
-
异步复制(ASYNC_MASTER):消息写入 Master 后立即返回成功,Slave 异步复制,性能更高但可能丢失少量数据。
-
-
故障自动切换:
-
基于 Dledger(Raft 协议实现):在 RocketMQ 4.5+ 版本中,Dledger 实现自动选主,当 Master 宕机时,Slave 自动晋升为新 Master,避免单点故障。
-
旧版本通过 HA(High Availability) 服务手动切换。
-
NameServer 高可用
-
NameServer 无状态集群:NameServer 作为轻量级服务发现组件,多个节点互相独立,无数据同步依赖,通过客户端轮询访问不同节点实现容灾。
-
NameServer 节点间的路由信息可能存在短暂不一致(如某个 NameServer 尚未收到 Broker 的最新心跳),但通过以下机制保证最终一致:
-
Broker 向所有 NameServer 周期性上报心跳,存活节点最终会同步最新状态。
-
客户端定时拉取路由表,覆盖旧缓存。
-
-
-
客户端缓存路由信息:Producer 和 Consumer 会缓存 Topic 的路由信息,即使 NameServer 短暂不可用,仍能继续收发消息。
消息可靠传输
从消息可靠传输可以看到很多RocketMQ高可用的设计。
生产端
使用Dledger集群方案
DLedger是一套基于Raft协议的分布式日志存储组件,在使用Dledger技术搭建的RocketMQ集群中,Dledger会通过两阶段提交的方式保证文件在主从之间成功同步。
配置同步刷盘
SYNC_FLUSH(同步刷新)相比于ASYNC_FLUSH(异步处理)会损失很多性能,但是也更可靠,所以需要根据实际的业务场景做好权衡。
使用事务消息
基于Apache RocketMQ实现的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。
观察回执状态
对于消费可靠传输,先关注下生产端的回执状态,RocketMQ 的同步发送(syncSend)和异步发送(asyncSend)均会返回以下状态(通过 SendStatus
枚举类定义):
SEND_OK
-
含义:消息发送成功,且满足 Broker 的持久化策略。
-
触发条件:
-
消息已成功写入 Broker 的 CommitLog。
-
若 Broker 配置为
SYNC_MASTER
(同步复制),需主节点和从节点均刷盘完成。 -
若配置为
ASYNC_MASTER
(异步复制),只需主节点刷盘完成。
-
-
处理建议:
-
业务可认为消息已可靠存储,无需重试。
-
需注意:
SEND_OK
仅表示 Broker 接收成功,不保证消费者一定能消费到(例如网络故障导致未推送)。
-
FLUSH_DISK_TIMEOUT
-
含义:Broker 刷盘超时。
-
触发条件:
-
Broker 的刷盘策略为
SYNC_FLUSH
(同步刷盘)时,等待刷盘超时(默认5秒)。 -
磁盘 I/O 压力大或故障导致无法及时刷盘。
-
-
处理建议:
-
消息可能未持久化到磁盘,存在丢失风险。
-
需重试发送,并监控 Broker 磁盘状态。
-
FLUSH_SLAVE_TIMEOUT
-
含义:主节点同步到从节点(Slave)超时。
-
触发条件:
-
Broker 角色为
SYNC_MASTER
(同步复制)。 -
主节点等待 Slave 同步确认超时(默认5秒)。
-
-
处理建议:
-
消息在主节点已持久化,但未同步到 Slave。
-
需重试发送,避免主节点宕机后数据丢失。
-
SLAVE_NOT_AVAILABLE
-
含义:从节点(Slave)不可用。
-
触发条件:
-
Broker 角色为
SYNC_MASTER
,但当前无可用 Slave。 -
Slave 节点宕机或网络隔离。
-
-
处理建议:
-
主节点已接收消息,但无法保证高可用。
-
需检查 Slave 状态,必要时修复集群。
-
Keys的使用
每个消息在业务层面一般建议映射到业务的唯一标识并设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。常见的设置策略使用订单Id、用户Id、请求Id等比较离散的唯一标识来处理。
消费端
不要使用异步消费
正常情况下,消费者端都是需要先处理本地事务,然后再给MQ一个ACK响应,这时MQ就会修改Offset,将消息标记为已消费,从而不再往其他消费者推送消息。所以在Broker的这种重新推送机制下,消息是不会在传输过程中丢失的。但是也会有下面这种情况会造成服务端消息丢失:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
new Thread(){
public void run(){
//处理业务逻辑
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}
};
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
这种异步消费的方式,就有可能造成消息状态返回后消费者本地业务逻辑处理失败造成消息丢失的可能。
RocketMQ不可用降级方案
在这种情况下,RocketMQ相当于整个服务都不可用了,那他本身肯定无法给我们保证消息不丢失了。我们只能自己设计一个降级方案来处理这个问题了。例如在订单系统中,如果多次尝试发送RocketMQ不成功,那就只能另外找给地方,比如本地事务表,把订单消息缓存下来,然后起一个线程定时的扫描这些失败的订单消息,尝试往RocketMQ发送。这样等RocketMQ的服务恢复过来后,就能第一时间把这些消息重新发送出去。