这是RocketMQ的第三篇文章,前两篇文章我们说了一下rocketmq的入门安装和开发配置,以及他的一些名词解释,RocketMQ入门第一次,RocketMQ(二) 领域名词。今天我们来说说的他的一些功能特性。明确区分这些功能特性,方便大家在开发的时候进行使用。例如下面要说的延迟消息的应用场景、事务消息的应用场景,这些关键特性特别是在电商领域等关键场景中有一些使用场景。好了,下面开始介绍。
一、普通消息
这是最通用的一种消息。一般都是使用这种格式的。
应用场景
普通消息一般应用于微服务解耦、事件驱动、数据集成等场景。不要求顺序、处理时机。
功能原理
普通消息是Apache RocketMQ基本消息功能,支持生产者和消费者的异步解耦通信。
生命周期
初始化:生产端构建,待发送到服务端。
待消费:消息发送到服务端,等待消息端消费。
消费中:消息者获取消息,等待处理的过程。如果一段时间没有收到消费者的响应,则服务端会进行重试。
消费提交:消费者处理完成之后,提交处理结果到服务端。服务端不会立即删除消息,方便消费者进行消息回溯。
消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。
使用限制
普通消息仅支持使用MessageType为Normal主题,即普通消息只能发送至类型为普通消息的主题中。
demo
//普通消息发送。
MessageBuilder messageBuilder = new MessageBuilder();
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
//消息体。
.setBody("messageBody".getBytes())
.build();
try {
//发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
//消费示例一:使用PushConsumer消费普通消息,只需要在消费监听器中处理即可。
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
//根据消费结果返回状态。
return ConsumeResult.SUCCESS;
}
};
//消费示例二:使用SimpleConsumer消费普通消息,主动获取消息进行消费处理并提交消费结果。
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消费处理完成后,需要主动调用ACK提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}
二、延迟消息
RocketMQ的高级特性。
应用场景
分布式定时调度
任务超时处理
基于定时消息处理具备如下优势:
精度高、开发门槛低:基于消息通知方式不存在定时阶梯间隔。可以轻松实现任意精度事件触发,无需业务去重。
高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。 Apache RocketMQ 的定时消息具有高并发和水平扩展的能力。
功能原理
定时消息是 Apache RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。
定时时间设置原则:
Apache RocketMQ 定时消息设置的定时时间是一个预期触发的系统时间戳,延时时间也需要转换成当前系统时间后的某一个时间戳,而不是一段延时时长。
定时时间的格式为毫秒级的Unix时间戳,您需要将要设置的时刻转换成时间戳形式。
定时时间必须设置在定时时长范围内,超过范围则定时不生效,服务端会立即投递消息。
定时时长最大值默认为24小时,不支持自定义修改。
定时时间必须设置为当前时间之后,若设置到当前时间之前,则定时不生效,服务端会立即投递消息。
定时消息生命周期:
在”初始化“和”待消费“步骤中,插入一个”定时中“生命周期。解释一下这个”定时中”
定时中:消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息单独存储在定时存储系统中,等待定时时刻到达。
使用限制
定时消息仅支持在 MessageType为Delay 的主题内使用,即定时消息只能发送至类型为定时消息的主题中。
定时消息的定时时长参数精确到毫秒级,但是默认精度为1000ms,即定时消息为秒级精度。
demo
//定时/延时消息发送
MessageBuilder messageBuilder = null;
//以下示例表示:延迟时间为10分钟之后的Unix时间戳。
Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
.setDeliveryTimestamp(deliverTimeStamp)
//消息体
.setBody("messageBody".getBytes())
.build();
try {
//发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
//消费示例一:使用PushConsumer消费定时消息,只需要在消费监听器处理即可。
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView.getDeliveryTimestamp());
//根据消费结果返回状态。
return ConsumeResult.SUCCESS;
}
};
//消费示例二:使用SimpleConsumer消费定时消息,主动获取消息进行消费处理并提交消费结果。
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消费处理完成后,需要主动调用ACK提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}
三、顺序消息
也是高级特性。
应用场景
有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。
功能原理
顺序消息是 Apache RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。
Apache RocketMQ 顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。
顺序又分为生产顺序和消费顺序:
生产顺序:
保证消息生产的顺序性,则必须满足以下条件:
单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。
串行发送:Apache RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。
服务端顺序存储逻辑如下:
相同消息组的消息按照先后顺序被存储在同一个队列。
不同消息组的消息可以混合在同一个队列中,且不保证连续。
消费顺序:
如需保证消息消费的顺序性,则必须满足以下条件:
投递顺序
Apache RocketMQ 通过客户端SDK和服务端通信协议保障消息按照服务端存储顺序投递,但业务方消费消息时需要严格按照接收---处理---应答的语义处理消息,避免因异步处理导致消息乱序。
有限重试
Apache RocketMQ 顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。
生产顺序 | 消费顺序 | 顺序性效果 |
设置消息组,保证消息顺序发送。 | 顺序消费 | 按照消息组粒度,严格保证消息顺序。 同一消息组内的消息的消费顺序和发送顺序完全一致。 |
设置消息组,保证消息顺序发送。 | 并发消费 | 并发消费,尽可能按时间顺序处理。 |
未设置消息组,消息乱序发送。 | 顺序消费 | 按队列存储粒度,严格顺序。 基于 Apache RocketMQ 本身队列的属性,消费顺序和队列存储的顺序一致,但不保证和发送顺序一致。 |
未设置消息组,消息乱序发送。 | 并发消费 | 并发消费,尽可能按照时间顺序处理。 |
生命周期同普通消息。
demo
//顺序消息发送。
MessageBuilder messageBuilder = null;
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
//设置顺序消息的排序分组,该分组尽量保持离散,避免热点排序分组。
.setMessageGroup("fifoGroup001")
//消息体。
.setBody("messageBody".getBytes())
.build();
try {
//发送消息,需要关注发送结果,并捕获失败等异常
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
//消费顺序消息时,需要确保当前消费者分组是顺序投递模式,否则仍然按并发乱序投递。
//消费示例一:使用PushConsumer消费顺序消息,只需要在消费监听器处理即可。
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
//根据消费结果返回状态。
return ConsumeResult.SUCCESS;
}
};
//消费示例二:使用SimpleConsumer消费顺序消息,主动获取消息进行消费处理并提交消费结果。
//需要注意的是,同一个MessageGroup的消息,如果前序消息没有消费完成,再次调用Receive是获取不到后续消息的。
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消费处理完成后,需要主动调用ACK提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}
四、事务消息
同为高级特性。
使用场景
分布式事务。
分布式系统调用的特点为一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。
功能原理
事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
处理流程(参照官网):
生产者将消息发送至Apache RocketMQ服务端。
Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
生产者开始执行本地事务逻辑。
生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
生命周期
在初始化和消费中之间,主要有几点不同:
事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。
消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。
提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。
demo
发送注意事项:
发送事务消息前,需要开启事务并关联本地的事务执行。
为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。
//演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。
private static boolean checkOrderById(String orderId) {
return true;
}
//演示demo,模拟本地事务的执行结果。
private static boolean doLocalTransaction() {
return true;
}
public static void main(String[] args) throws ClientException {
ClientServiceProvider provider = new ClientServiceProvider();
MessageBuilder messageBuilder = new MessageBuilder();
//构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。
Producer producer = provider.newProducerBuilder()
.setTransactionChecker(messageView -> {
/**
* 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。
* 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。
*/
final String orderId = messageView.getProperties().get("OrderId");
if (Strings.isNullOrEmpty(orderId)) {
// 错误的消息,直接返回Rollback。
return TransactionResolution.ROLLBACK;
}
return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
})
.build();
//开启事务分支。
final Transaction transaction;
try {
transaction = producer.beginTransaction();
} catch (ClientException e) {
e.printStackTrace();
//事务分支开启失败,直接退出。
return;
}
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
//一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。
.addProperty("OrderId", "xxx")
//消息体。
.setBody("messageBody".getBytes())
.build();
//发送半事务消息
final SendReceipt sendReceipt;
try {
sendReceipt = producer.send(message, transaction);
} catch (ClientException e) {
//半事务消息发送失败,事务可以直接退出并回滚。
return;
}
/**
* 执行本地事务,并确定本地事务结果。
* 1. 如果本地事务提交成功,则提交消息事务。
* 2. 如果本地事务提交失败,则回滚消息事务。
* 3. 如果本地事务未知异常,则不处理,等待事务消息回查。
*
*/
boolean localTransactionOk = doLocalTransaction();
if (localTransactionOk) {
try {
transaction.commit();
} catch (ClientException e) {
// 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。
e.printStackTrace();
}
} else {
try {
transaction.rollback();
} catch (ClientException e) {
// 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。
e.printStackTrace();
}
}
}
五、发送重试和流控
功能背景
消息发送重试
消息发送重试机制主要为您解答如下问题:
部分节点异常是否影响消息发送?
请求重试是否会阻塞业务调用?
请求重试会带来什么不足?
消息流控
流控机制主要为您解答如下问题:
系统在什么情况下会触发流控?
触发流控时客户端行为是什么?
应该如何避免触发流控,以及如何应对突发流控?
消费发送重试
基本概念
客户端连接服务端发起消息发送请求时,可能会因为网络故障、服务异常等原因导致调用失败。为保证消息的可靠性, Apache RocketMQ 在客户端SDK中内置请求重试逻辑,尝试通过重试发送达到最终调用成功的效果。
重试触发条件
触发消息发送重试机制的条件如下:
客户端消息发送请求调用失败或请求超时
网络异常造成连接失败或请求超时。
服务端节点处于重启或下线等状态造成连接失败。
服务端运行慢造成请求超时。
服务端返回失败错误码
系统逻辑错误:因运行逻辑不正确造成的错误。
系统流控错误:因容量超限造成的流控错误。
重试流程
生产者在初始化时设置消息发送最大重试次数,当出现上述触发条件的场景时,生产者客户端会按照设置的重试次数一直重试发送消息,直到消息发送成功或达到最大重试次数重试结束,并在最后一次重试失败后返回调用错误响应。
同步发送:调用线程会一直阻塞,直到某次重试成功或最终重试失败,抛出错误码和异常。
异步发送:调用线程不会阻塞,但调用结果会通过异常事件或者成功事件返回。
重试间隔
除服务端返回系统流控错误场景,其他触发条件触发重试后,均会立即进行重试,无等待间隔。
若由于服务端返回流控错误触发重试,系统会按照指数退避策略进行延迟重试。指数退避算法通过以下参数控制重试行为:
INITIAL_BACKOFF: 第一次失败重试前后需等待多久,默认值:1秒。
MULTIPLIER :指数退避因子,即退避倍率,默认值:1.6。
JITTER :随机抖动因子,默认值:0.2。
MAX_BACKOFF :等待间隔时间上限,默认值:120秒
MIN_CONNECT_TIMEOUT :最短重试间隔,默认值:20秒。
功能约束
链路耗时阻塞评估
最终异常兜底
消息重复问题
消息流控
概念
消息流控指的是系统容量或水位过高, Apache RocketMQ 服务端会通过快速失败返回流控错误来避免底层资源承受过高压力。
触发条件
存储压力大:消费者分组的初始消费位点为当前队列的最大消费位点。若某些场景例如业务上新等需要回溯到指定时刻前开始消费,此时队列的存储压力会瞬间飙升,触发消息流控。
服务端请求任务排队溢出:若消费者消费能力不足,导致队列中有大量堆积消息,当堆积消息超过一定数量后会触发消息流控,减少下游消费系统压力。
流控行为
当系统触发消息发送流控时,客户端会收到系统限流错误和异常,错误码信息如下:
reply-code:530
reply-text:TOO_MANY_REQUESTS
六、消费者分类
官方提供了三种消费者类型:PushConsumer 、SimpleConsumer 和 PullConsumer。
在实际使用场景中,PullConsumer 仅推荐在流处理框架中集成使用,大多数消息收发场景使用 PushConsumer 和 SimpleConsumer 就可以满足需求。
引用官方的对比表格:
对比项 | PushConsumer | SimpleConsumer | PullConsumer |
接口方式 | 使用监听器回调接口返回消费结果,消费者仅允许在监听器范围内处理消费逻辑。 | 业务方自行实现消息处理,并主动调用接口返回消费结果。 | 业务方自行按队列拉取消息,并可选择性地提交消费结果 |
消费并发度管理 | 由SDK管理消费并发度。 | 由业务方消费逻辑自行管理消费线程。 | 由业务方消费逻辑自行管理消费线程。 |
负载均衡粒度 | 5.0 SDK是消息粒度,更均衡,早期版本是队列维度 | 消息粒度,更均衡 | 队列粒度,吞吐攒批性能更好,但容易不均衡 |
接口灵活度 | 高度封装,不够灵活。 | 原子接口,可灵活自定义。 | 原子接口,可灵活自定义。 |
适用场景 | 适用于无自定义流程的业务消息开发场景。 | 适用于需要高度自定义业务流程的业务开发场景。 | 仅推荐在流处理框架场景下集成使用 |
PushConsumer
// 消费示例:使用PushConsumer消费普通消息。
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
// 设置消费者分组。
.setConsumerGroup("YourConsumerGroup")
// 设置接入点。
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())
// 设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 设置消费监听器。
.setMessageListener(new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
// 消费消息并返回处理结果。
return ConsumeResult.SUCCESS;
}
})
.build();
内部原理
在PushConsumer类型中,消息的实时处理能力是基于SDK内部的典型Reactor线程模型实现的。如下图所示,SDK内置了一个长轮询线程,先将消息异步拉取到SDK内置的缓存队列中,再分别提交到消费线程中,触发监听器执行本地消费逻辑。
可靠性重试
注意点:
错误方式一:消息还未处理完成,就提前返回消费成功结果。此时如果消息消费失败,Apache RocketMQ 服务端是无法感知的,因此不会进行消费重试。
错误方式二:在消费监听器内将消息再次分发到自定义的其他线程,消费监听器提前返回消费结果。此时如果消息消费失败,Apache RocketMQ 服务端同样无法感知,因此也不会进行消费重试。
顺序性保障
如果消费者分组设置了顺序消费模式,则PushConsumer在触发消费监听器时,严格遵循消息的先后顺序。
适用场景
消息处理时间可预估:如果不确定消息处理耗时,经常有预期之外的长时间耗时的消息,PushConsumer的可靠性保证会频繁触发消息重试机制造成大量重复消息。
无异步化、高级定制场景:PushConsumer限制了消费逻辑的线程模型,由客户端SDK内部按最大吞吐量触发消息处理。该模型开发逻辑简单,但是不允许使用异步化和自定义处理流程。
SimpleConsumer
// 消费示例:使用 SimpleConsumer 消费普通消息,主动获取消息处理并提交。
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
// 设置消费者分组。
.setConsumerGroup("YourConsumerGroup")
// 设置接入点。
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())
// 设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 设置从服务端接受消息的最大等待时间
.setAwaitDuration(Duration.ofSeconds(1))
.build();
try {
// SimpleConsumer 需要主动获取消息,并处理。
List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// 消费处理完成后,需要主动调用 ACK 提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);
}
});
} catch (ClientException e) {
// 如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
logger.error("Failed to receive message", e);
}
可靠性重试
SimpleConsumer消费者类型中,客户端SDK和服务端通过ReceiveMessage和AckMessage接口通信。客户端SDK如果处理消息成功则调用AckMessage接口;如果处理失败只需要不回复ACK响应,即可在定义的消费不可见时间到达后触发消费重试流程。
顺序性保障
基于 Apache RocketMQ 顺序消息的定义,SimpleConsumer在处理顺序消息时,会按照消息存储的先后顺序获取消息。即需要保持顺序的一组消息中,如果前面的消息未处理完成,则无法获取到后面的消息。
适用场景
消息处理时长不可控:建议使用SimpleConsumer消费类型,可以在消费时自定义消息的预估处理时长,若实际业务中预估的消息处理时长不符合预期,也可以通过接口提前修改。
需要异步化、批量消费等高级定制场景:SimpleConsumer在SDK内部没有复杂的线程封装,完全由业务逻辑自由定制,可以实现异步分发、批量消费等高级定制场景。
需要自定义消费速率:SimpleConsumer是由业务逻辑主动调用接口获取消息,因此可以自由调整获取消息的频率,自定义控制消费速率。
七、消息过滤
消费者订阅了某个主题后,Apache RocketMQ 会将该主题中的所有消息投递给消费者。若消费者只需要关注部分消息,可通过设置过滤条件在 Apache RocketMQ 服务端进行过滤,只获取到需要关注的消息子集,避免接收到大量无效的消息。
应用场景
使用 Apache RocketMQ 的消息过滤功能,可以帮助消费者更高效地过滤自己需要的消息集合,避免大量无效消息投递给消费者,降低下游系统处理压力。
关键流程实现:
生产者:生产者在初始化消息时预先为消息设置一些属性和标签,用于后续消费时指定过滤目标。
消费者:消费者在初始化及后续消费流程中通过调用订阅关系注册接口,向服务端上报需要订阅指定主题的哪些消息,即过滤条件。
服务端:消费者获取消息时会触发服务端的动态过滤计算,Apache RocketMQ 服务端根据消费者上报的过滤条件的表达式进行匹配,并将符合条件的消息投递给消费者。
Tag标签过滤
Tag标签过滤方式是 Apache RocketMQ 提供的基础消息过滤能力,基于生产者为消息设置的Tag标签进行匹配。生产者在发送消息时,设置消息的Tag标签,消费者需指定已有的Tag标签来进行匹配订阅。
Tag标签过滤规则
单Tag匹配:过滤表达式为目标Tag。表示只有消息标签为指定目标Tag的消息符合匹配条件,会被发送给消费者。
多Tag匹配:多个Tag之间为或的关系,不同Tag间使用两个竖线(||)隔开。例如,Tag1||Tag2||Tag3,表示标签为Tag1或Tag2或Tag3的消息都满足匹配条件,都会被发送给消费者进行消费。
全部匹配:使用星号(*)作为全匹配表达式。表示主题下的所有消息都将被发送给消费者进行消费。
demo
发送消息,设置Tag标签:
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
//该示例表示消息的Tag设置为"TagA"。
.setTag("TagA")
//消息体。
.setBody("messageBody".getBytes())
.build();
订阅消息,匹配单个Tag标签:
String topic = "Your Topic";
//只订阅消息标签为"TagA"的消息。
FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG);
pushConsumer.subscribe(topic, filterExpression);
订阅消息,匹配多个Tag标签:
String topic = "Your Topic";
//只订阅消息标签为"TagA"、"TagB"或"TagC"的消息。
FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG);
pushConsumer.subscribe(topic, filterExpression);
订阅消息,匹配Topic中的所有消息,不进行过滤:
String topic = "Your Topic";
//使用Tag标签过滤消息,订阅所有消息。
FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
pushConsumer.subscribe(topic, filterExpression);
SQL属性过滤
SQL属性过滤是 Apache RocketMQ 提供的高级消息过滤方式,通过生产者为消息设置的属性(Key)及属性值(Value)进行匹配。
demo
发送消息,同时设置消息Tag标签和自定义属性:
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
//该示例表示消息的Tag设置为"messageTag"。
.setTag("messageTag")
//消息也可以设置自定义的分类属性,例如环境标签、地域、逻辑分支。
//该示例表示为消息自定义一个属性,该属性为地域,属性值为杭州。
.addProperty("Region", "Hangzhou")
//消息体。
.setBody("messageBody".getBytes())
.build();
订阅消息,根据单个自定义属性匹配消息:
String topic = "topic";
//只订阅地域属性为杭州的消息。
FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND Region='Hangzhou'", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);
订阅消息,同时根据多个自定义属性匹配消息:
String topic = "topic";
//只订阅地域属性为杭州且价格属性大于30的消息。
FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND price IS NOT NULL AND Region = 'Hangzhou' AND price > 30", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);
订阅消息,匹配Topic中的所有消息,不进行过滤:
String topic = "topic";
//订阅所有消息。
FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);
八、消费端负载均衡
消费者从 Apache RocketMQ 获取消息消费时,通过消费者负载均衡策略,可将主题内的消息分配给指定消费者分组中的多个消费者共同分担,提高消费并发能力和消费者的水平扩展能力。
广播消费和共享消费
引用官方的图片:
消费组间广播消费 :如上图所示,每个消费者分组只初始化唯一一个消费者,每个消费者可消费到消费者分组内所有的消息,各消费者分组都订阅相同的消息,以此实现单客户端级别的广播一对多推送效果。
该方式一般可用于网关推送、配置推送等场景。
消费组内共享消费 :如上图所示,每个消费者分组下初始化了多个消费者,这些消费者共同分担消费者分组内的所有消息,实现消费者分组内流量的水平拆分和均衡负载。
该方式一般可用于微服务解耦场景。
消费者负载均衡分类
消费者负载均衡策略分为以下两种模式:
消息粒度负载均衡:PushConsumer和SimpleConsumer默认负载策略
队列粒度负载均衡:PullConsumer默认负载策略
消息粒度负载均衡
原理:
消息粒度负载均衡策略中,同一消费者分组内的多个消费者将按照消息粒度平均分摊主题中的所有消息,即同一个队列中的消息,可被平均分配给多个消费者共同消费。
消息粒度的负载均衡机制,是基于内部的单条消息确认语义实现的。消费者获取某条消息后,服务端会将该消息加锁,保证这条消息对其他消费者不可见,直到该消息消费成功或消费超时。因此,即使多个消费者同时消费同一队列的消息,服务端也可保证消息不会被多个消费者重复消费。
demo
消息粒度负载均衡策略不需要额外设置,对于PushConsumer和SimpleConsumer消费者类型默认启用。
SimpleConsumer simpleConsumer = null;
//消费示例一:使用PushConsumer消费普通消息,只需要在消费监听器处理即可,无需关注消息负载均衡。
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
//根据消费结果返回状态。
return ConsumeResult.SUCCESS;
}
};
//消费示例二:使用SimpleConsumer消费普通消息,主动获取消息处理并提交。会按照订阅的主题自动获取,无需关注消息负载均衡。
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消费处理完成后,需要主动调用ACK提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}
队列粒度负载均衡
原理
对于历史版本(服务端4.x/3.x版本)的消费者,包括PullConsumer、DefaultPushConsumer、DefaultPullConsumer、LitePullConsumer等,默认且仅能使用队列粒度负载均衡策略。
适用场景
队列粒度负载均衡策略适用于流式计算、数据聚合等需要明确对消息进行聚合、批处理的场景。
九、消费进度
原理
消息位点(Offset)
消息是按到达服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。
消费位点(ConsumerOffset)
Apache RocketMQ 通过消费位点管理消息的消费进度。每条消息被某个消费者消费完成后不会立即在队列中删除,Apache RocketMQ 会基于每个消费者分组维护一份消费记录,该记录指定消费者分组消费某一个队列时,消费过的最新一条消息的位点,即消费位点。
消费位点初始值
消费位点初始值指的是消费者分组首次启动消费者消费消息时,服务端保存的消费位点的初始值。
重置消费位点
适用场景
初始消费位点不符合需求:因初始消费位点为当前队列的最大消息位点,即客户端会直接从最新消息开始消费。若业务上线时需要消费部分历史消息,您可以通过重置消费位点功能消费到指定时刻前的消息。
消费堆积快速清理:当下游消费系统性能不足或消费速度小于生产速度时,会产生大量堆积消息。若这部分堆积消息可以丢弃,您可以通过重置消费位点快速将消费位点更新到指定位置,绕过这部分堆积的消息,减少下游处理压力。
业务回溯,纠正处理:由于业务消费逻辑出现异常,消息被错误处理。若您希望重新消费这些已被处理的消息,可以通过重置消费位点快速将消费位点更新到历史指定位置,实现消费回溯。
重置功能
重置到队列中的指定位点。
重置到某一时刻对应的消费位点,匹配位点时,服务端会根据自动匹配到该时刻最接近的消费位点。
使用限制
重置消费位点后消费者将直接从重置后的位点开始消费,对于回溯重置类场景,重置后的历史消息大多属于存储冷数据,可能会造成系统压力上升,一般称为冷读现象。因此,需要谨慎评估重置消费位点后的影响。建议严格控制重置消费位点接口的调用权限,避免无意义、高频次的消费位点重置。
Apache RocketMQ 重置消费位点功能只能重置对消费者可见的消息,不能重置定时中、重试等待中的消息。
十、消费重试
消费者出现异常,消费某条消息失败时, Apache RocketMQ 会根据消费重试策略重新投递该消息进行故障恢复。
应用场景
推荐使用消息重试场景如下:
业务处理失败,且失败原因跟当前的消息内容相关,比如该消息对应的事务状态还未获取到,预期一段时间后可执行成功。
消费失败的原因不会导致连续性,即当前消息消费失败是一个小概率事件,不是常态化的失败,后面的消息大概率会消费成功。此时可以对当前消息进行重试,避免进程阻塞。
应用目的
如何保证业务完整处理消息:了解消费重试策略,可以在设计实现消费者逻辑时保证每条消息处理的完整性,避免部分消息出现异常时被忽略,导致业务状态不一致。
系统异常时处理中的消息状态如何恢复:帮助您了解当系统出现异常(宕机故障)等场景时,处理中的消息状态如何恢复,是否会出现状态不一致。
消费策略
消息重试的触发条件
消费失败,包括消费者返回消息失败状态标识或抛出非预期异常。
消息处理超时,包括在PushConsumer中排队超时。
消息重试策略主要行为
重试过程状态机:控制消息在重试流程中的状态和变化逻辑。
重试间隔:上一次消费失败或超时后,下次重新尝试消费的间隔时间。
最大重试次数:消息可被重试消费的最大次数。
官方给出的比较表格:
消费者类型 | 重试过程状态机 | 重试间隔 | 最大重试次数 |
PushConsumer | 已就绪 处理中 待重试 提交 * 死信 | 消费者分组创建时元数据控制。 无序消息:阶梯间隔 顺序消息:固定间隔时间 | 消费者分组创建时的元数据控制。 |
SimpleConsumer | 已就绪 处理中 提交 死信 | 通过API修改获取消息时的不可见时间。 | 消费者分组创建时的元数据控制。 |
PushConsumer消费重试策略
Ready:已就绪状态。消息在Apache RocketMQ服务端已就绪,可以被消费者消费。
Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态。
WaitingRetry:待重试状态,PushConsumer独有的状态。当消费者消息处理失败或消费超时,会触发消费重试逻辑判断。如果当前重试次数未达到最大次数,则该消息变为待重试状态,经过重试间隔后,消息将重新变为已就绪状态可被重新消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败。
Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机。
DLQ:死信状态。消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试,会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复。
demo
SimpleConsumer simpleConsumer = null;
//消费示例:使用PushConsumer消费普通消息,如果消费失败返回错误,即可触发重试。
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
//返回消费失败,会自动重试,直至到达最大重试次数。
return ConsumeResult.FAILURE;
}
};
SimpleConsumer消费重试策略
Ready:已就绪状态。消息在Apache RocketMQ服务端已就绪,可以被消费者消费。
Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态。
Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机。
DLQ:死信状态。消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试,会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复。
和PushConsumer消费重试策略不同的是,SimpleConsumer消费者的重试间隔是预分配的,每次获取消息消费者会在调用API时设置一个不可见时间参数 InvisibleDuration,即消息的最大处理时长。若消息消费失败触发重试,不需要设置下一次重试的时间间隔,直接复用不可见时间参数的取值。
demo
//消费示例:使用SimpleConsumer消费普通消息,如果希望重试,只需要静默等待超时即可,服务端会自动重试。
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//如果处理失败,希望服务端重试,只需要忽略即可,等待消息再次可见后即可重试获取。
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}
十一、消息的存储和清理
消息存储机制
原理
Apache RocketMQ 使用存储时长作为消息存储的依据,即每个节点对外承诺消息的存储时长。在存储时长范围内的消息都会被保留,无论消息是否被消费;超过时长限制的消息则会被清理掉。
关键问题
消息存储管理粒度:Apache RocketMQ 按存储节点管理消息的存储时长,并不是按照主题或队列粒度来管理。
消息存储判断依据:消息存储按照存储时间作为判断依据,相对于消息数量、消息大小等条件,使用存储时间作为判断依据,更利于业务方对消息数据的价值进行评估。
消息存储和是否消费状态无关:Apache RocketMQ 的消息存储是按照消息的生产时间计算,和消息是否被消费无关。按照统一的计算策略可以有效地简化存储机制。
消息存储管理粒度说明
Apache RocketMQ 按照服务端节点粒度管理存储时长而非队列或主题,原因如下:
消息存储优势权衡:Apache RocketMQ 基于统一的物理日志队列和轻量化逻辑队列的二级组织方式,管理物理数据。这种机制可以带来顺序读写、高吞吐、高性能等优势,但缺点是不支持按主题和队列单独管理。
安全生产和容量保障风险要求:即使Apache RocketMQ 按照主题或者队列独立生成存储文件,但存储层本质还是共享存储介质。单独根据主题或队列控制存储时长,这种方式看似更灵活,但实际上整个集群仍然存在容量风险,可能会导致存储时长SLA被打破。从安全生产角度考虑,最合理的方式是将不同存储时长的消息通过不同集群进行分离治理。
消息过期清理机制
在 Apache RocketMQ中,消息保存时长并不能完整控制消息的实际保存时间,因为消息存储仍然使用本地磁盘,本地磁盘空间不足时,为保证服务稳定性消息仍然会被强制清理,导致消息的实际保存时长小于设置的保存时长。
好了,今天的内容就到这了。