消息整体处理过程,这里我们将消息的整体处理阶段分为3个阶段进行分析:
1、Producer发送消息阶段。
2、Broker处理消息阶段。
3、Consumer消费消息阶段。
一、Producer发送消息阶段
1、安全机制保障1,发送方式。
1、同步发送
2、异步发送
3、Oneway发送:Oneway 方式只负责发送请求,不等待应答
2、安全机制保障2
如果发送消息失败或者超时,则重新发送。
发送重试源码如下,本质其实就是一个for循环,当发送消息发生异常的时候重新循环发送。默认重试3次,重试次数修改。
3、安全机制保障3
broker提供多master模式,即使某台broker宕机了,保证消息可以投递到另外一台正常的broker上
二、Broker处理消息阶段
1、安全机制1:同步/异步 【刷盘】的策略
当消息投递到broker之后,会先存到page cache,然后根据broker设置的刷盘策略是否立即刷盘,
也就是如果刷盘策略为异步,broker并不会等待消息落盘成功就会返回【producer成功】,还只是保存到了page cache,
也就是说当broker所在的服务器突然宕机,则会丢失部分页的消息。【这就是异步刷盘带来了的问题】
安全机制2:提供主从模式,同时主从支持同步双写
即使broker设置了【同步刷盘】,如果主broker磁盘损坏,也是会导致消息丢失。
因此可以给broker指定slave,然后将slave设置为同步刷盘策略。
此模式下,producer每发送一条消息,都会等消息投递到【master】和【slave】都落盘成功了,
broker才会当作消息投递成功,保证休息不丢失。
缺点:比较慢,而且如果单边失败,引发其他问题。
三、Consumer消费消息阶段
consumer默认提供的是【At least Once】机制
何为【At least Once】:就是Consumer先pull消息到本地,消费完成后,才向服务器返回ack。
通常消费消息的ack机制一般分为两种思路:
(1)先提交后消费;(可以解决重复消费的问题但是会丢失消息)
(2)先消费,消费成功后再提交;
因此Rocketmq默认实现的是思路二,由各自consumer业务方保证幂等来解决重复消费问题。
消费消息重试机制,RocketMQ本身提供了重新消费消息的能力。但是会有重复消费的问题。
重复消费的问题出现原因
RocketMQ是以【consumer group+queue】来确认消息消费进度,通过【gruop+offset】来标记【queue】消费进度,
消费成功之后都会返回一个ack消息告之broker更新offset,但是RocketMQ并不是按一条一条消息来做ack,
而是根据一次拉取批量来做消息ack
如一次从broker拉去10条消息,就按照10条消息整体做offset,为方便理解下面先按照10条来分析
如上一次的offset为101,本次拉取了10调消息,偏移量从101-110
每一条消息消费成功会按照当前消息最小的offset来更新本地的消费进度,怎么理解这句话,
例如:103消息先消费完成,但是101还没有消费完成(消费失败也算作消费完成),这时候更新还是按照101的偏移量来更新本地偏移量;直到所有的消息都消费完成,110这条消息消费完成的时候才会把偏移量更新为110,再通过定时任务将本地偏移量更新到broker(假设恰好更新偏移量等定时任务触发)。
RocketMQ按批次更新进度好处是不需要每一条消息都需要做ack操作,提升了效率,但是随之产生了2个问题:
1、某一条失败,导致整体失败,然后又重行全量消费一次。
2、但是实际是失败的消息,如果处理。
问题1:
如果这一批消息中的101消息由于一些原因一直没有消费完成,即使其它的9条消息都消费完成了,
broker的消费进度依然偏移到101,如果此时该consumer宕机或者实例被kill,该queue通过负载均衡策略会重新被分配给
其它的consumer,这个时候从broker拉去的偏移量为101开始消费,但是实际102-109这9条消息已经消费完成,
造成102-109这9条消息重复消费
解决方案:
3.6版本之前RocketMQ没有给出解决方案,官方强调业务方【需要自己实现消息幂等】逻辑,但是为了避免大量的出现消息重
复消费的问题,RocketMQ也做了一些限制,如果本地的消息量达到2000之后,不会在拉取新的消息,也就是即使出现上面的
极端情况,也只会造成最多1999条消息重复消费。
3.6之后的版本RocketMQ给出了一个解决方案(治标不治本),在消费端设置了一个消费超时时间
【consumeTimeout = 15min】 原理是,RocketMQ启动了一个定时任务来检查所有的消息的消费情况,在消费开始的时
候会记录消息【消费开始时间】,每隔consumeTimeout时间去检查所有消息是不是消费完成了,如果还没有消费完
成并且时间超过了consumeTimeout配置的时间,就当作【消费成功,但是处理失败】(也算作消费完成),既然消费完成了,
自然会把本地消费进度更新到上例中的110,再通过定时同步机制将本地进度同步到broker,达成本地和broker端一致的效果
consumeTimeout支持业务自己配置,为什么说治标不治本,因为始终还是出现2*consumeTimeout时间(比如第一次任务在12点0分,101消息从12点1分开始消费,到12点30分才会发现超时,如果这个时候宕机)的消息会出现无法完成确认造成消息重复消费。
问题2:
既然是按批量来更新消费进度,但是那些虽然消费完成但是实际是【处理失败】的消息(主动返回【RECONSUME_LATER】和
【抛出异常】的)的消息是如何处理的?
rocketmq在消息消费失败的消息会单独把该消息的msgid、偏移量等信息通过rpc调用通知给broker,那broker会把该消息做重新的投递,从而做到了消息的重置机制,消息的重试后面在分析
安全性保障:跳转
重复消费:跳转