5.2 消费端保障
5.2.1 注意幂等性
应用程序在使用RocketMQ进行消息消费时必须支持幂等消费,即同一个消息被消费多次和消费一次的结果一样,这一点在使用RoketMQ或者分析RocketMQ源代码之前再怎么强调也不为过。
“至少一次送达”的消息交付策略,和消息重复消费是一对共生的因果关系,要做到不丢消息就无法避免消息重复消费,原因很简单,试想一下这样的场景:客户端接收到消息并完成了消费,在消费确认过程中发生了通讯错误,从Broker的角度是无法得知客户端是在接收消息过程中出错还是在消费确认过程中出错,为了确保不丢消息,重发消息是唯一的选择。
有了消息幂等消费约定的基础,RocketMQ就能够有针对性地采取一些性能优化措施,例如:并行消费、消费进度同步机制等,这也是RocketMQ性能优异的原因之一。
5.2.2 消息消费模式
从不同的维度划分,Consumer支持以下消费模式:
- 广播消费模式下,消息消费失败不会进行重试,消费进度保存在Consumer端;
- 集群消费模式下,消息消费失败有机会进行重试,消费进度集中保存在Broker端。
5.2.2.1 集群消费
使用相同 Group ID 的订阅者属于同一个集群,同一个集群下的订阅者消费逻辑必须完全一致(包括 Tag 的使用),这些订阅者在逻辑上可以认为是一个消费节点
注意事项
- 消费端集群化部署, 每条消息只需要被处理一次。
- 由于消费进度在服务端维护, 可靠性更高。
- 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
- 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。
5.2.2.2 广播消费
广播消费指的是:一条消息被多个consumer消费,即使这些consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次,广播消费中ConsumerGroup概念可以认为在消息划分方面无意义。
注意事项
- 广播消费模式下不支持顺序消息。
- 广播消费模式下不支持重置消费位点。
- 每条消息都需要被相同逻辑的多台机器处理。
- 消费进度在客户端维护,出现重复的概率稍大于集群模式。
- 广播模式下,消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
- 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过, 请谨慎选择。
- 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
- 目前仅 Java 客户端支持广播模式。
- 广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
5.2.2.3 集群模式模拟广播
如果业务需要使用广播模式,也可以创建多个 Group ID,用于订阅同一个 Topic。
注意事项
- 每条消息都需要被多台机器处理,每台机器的逻辑可以相同也可以不一样。
- 消费进度在服务端维护,可靠性高于广播模式。
- 对于一个 Group ID 来说,可以部署一个消费端实例,也可以部署多个消费端实例。当部署多个消费端实例时,实例之间又组成了集群模式(共同分担消费消息)。假设 Group ID 1 部署了三个消费者实例 C1、C2、C3,那么这三个实例将共同分担服务器发送给 Group ID 1 的消息。同时,实例之间订阅关系必须保持一致。
5.2.3 消息消费模式
RocketMQ消息消费本质上是基于的拉(pull)模式,consumer主动向消息服务器broker拉取消息。
- 推消息模式下,消费进度的递增是由RocketMQ内部自动维护的;
- 拉消息模式下,消费进度的变更需要上层应用自己负责维护,RocketMQ只提供消费进度保存和查询功能。
5.2.3.1 推模式(PUSH)
我们上面使用的消费者都是PUSH模式,也是最常用的消费模式
由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常。
实现方式,代码上使用 DefaultMQPushConsumer
consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送(push)过来的。主要用的也是这种方式。
5.2.3.2 拉模式(PULL)
RocketMQ的PUSH模式是由PULL模式来实现的
由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能。
5.2.3.3 注意事项
注意:RocketMQ 4.6.0版本后将弃用DefaultMQPullConsumer
DefaultMQPullConsumer方式需要手动管理偏移量,官方已经被废弃,将在2022年进行删除
DefaultLitePullConsumer
该类是官方推荐使用的手动拉取的实现类,偏移量提交由RocketMQ管理,不需要手动管理
5.2.4 消息确认机制
consumer的每个实例是靠队列分配来决定如何消费消息的,那么消费进度具体是如何管理的,又是如何保证消息成功消费的?(RocketMQ有保证消息肯定消费成功的特性,失败则重试)
为了保证数据不被丢失,RocketMQ支持消息确认机制,即ack。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。
5.2.4.1 确认消费
业务实现消费回调的时候,当且仅当此回调函数返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS
,RocketMQ才会认为这批消息(默认是1条)是消费完成的。
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
execute();//执行真正消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
})
5.2.4.2 消费异常
如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回
ConsumeConcurrentlyStatus.RECONSUME_LATER
,RocketMQ就会认为这批消息消费失败了。
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
execute();//执行真正消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER
}
})
为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费组的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup,而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列,应用可以监控死信队列来做人工干预。
5.2.5 消息重试机制
5.2.5.1 顺序消息的重试
对于顺序消息,当消费者消费消息失败后,消息队列RocketMQ版会自动不断地进行消息重试(每次间隔时间为1秒),这时,应用会出现消息消费被阻塞的情况,因此,建议您使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
5.2.5.2 无序消息的重试
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
5.2.5.3 重试次数
消息队列RocketMQ版默认允许每条消息最多重试16次,每次重试的间隔时间如下。
第几次重试 | 与上次重试的间隔时间 | 第几次重试 | 与上次重试的间隔时间 |
---|---|---|---|
1 | 10秒 | 9 | 7分钟 |
2 | 30秒 | 10 | 8分钟 |
3 | 1分钟 | 11 | 9分钟 |
4 | 2分钟 | 12 | 10分钟 |
5 | 3分钟 | 13 | 20分钟 |
6 | 4分钟 | 14 | 30分钟 |
7 | 5分钟 | 15 | 1小时 |
8 | 6分钟 | 16 | 2小时 |
如果消息重试16次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的4小时46分钟之内进行16次重试,超过这个时间范围消息将不再重试投递。
5.2.5.4 和生产端重试区别
消费者和生产者的重试还是有区别的,主要有两点
- 默认重试次数:Product默认是2次,而Consumer默认是16次。
- 重试时间间隔:Product是立刻重试,而Consumer是有一定时间间隔的。它照
1S,5S,10S,30S,1M,2M····2H
进行重试。
注意:Product在异步情况重试失效,而对于Consumer在广播情况下重试失效。
5.2.5.5 重试配置方式
需要重试
消费失败后,重试配置方式,集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
- 方式1:返回RECONSUME_LATER(推荐)
- 方式2:返回Null
- 方式3:抛出异常
无需重试
集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回Action.CommitMessage,此后这条消息将不会再重试。
//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
//消息处理逻辑抛出异常,消息将重试。
try {
doConsumeMessage(list);
}catch (Exception e){
//捕获消费逻辑中的所有异常,并返回Action.CommitMessage;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//业务方正常消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
5.3 死信队列
在正常情况下无法被消费(超过最大重试次数)的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列就称为死信队列(Dead-Letter Queue)
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次 数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。 在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
5.3.1 死信特性
5.3.1.1 死信消息特性
- 不会再被消费者正常消费
- 有效期与正常消息相同,均为 3 天,3 天后会被自动删除,故死信消息应在产生的 3 天内及时处理
5.3.1.2 死信队列特性
- 一个死信队列对应一个消费者组,而不是对应单个消费者实例
- 一个死信队列包含了对应的 Group ID 所产生的所有死信消息,不论该消息属于哪个 Topic
- 若一个 Group ID 没有产生过死信消息,则 RocketMQ 不会为其创建相应的死信队列
6. Redis 轮询队列
redis队列中存放车辆信息,调度系统从队列中获取车辆信息,打车完成后再将车辆信息放回队列中
6.1 相关代码
6.1.1 redis获取车辆
从list左侧弹出一个车辆
/**
* 从Redis List列表中拿取一个车辆ID
* 如果没有获取到延时10S
*
* @return
*/
public String takeVehicle() {
//从Redis List列表中拿取一个车辆ID
return redisTemplate.opsForList().leftPop(DispatchConstant.VEHICLE_QUEUE, 1, TimeUnit.SECONDS);
}
6.1.2 redis压入车辆
检查车辆状态,并从右侧压入车辆
/**
* 设置车辆状态为Ready
*
* @param vehicleId
*/
public void readyDispatch(String vehicleId) {
//检查车辆状态
DispatchConstant.DispatchType vehicleDispatchType = taxiVehicleStatus(vehicleId);
//如果车辆时运行状态
if (vehicleDispatchType.isRunning() || vehicleDispatchType.isReady()) {
redisTemplate.opsForValue().set(DispatchConstant.VEHICLE_STATUS_PREFIX + vehicleId, DispatchConstant.DispatchType.READY.toString());
//从右侧压入车辆
redisTemplate.opsForList().rightPush(DispatchConstant.VEHICLE_QUEUE, vehicleId);
}
}