目录
- 一、读队列与写队列
- 1.概念介绍
- 2.读写队列个数关系分析
- 二、消息持久化
- 1.持久化文件介绍
- 2.持久化结构介绍:
- 三、过期文件删除
- 1.如何判断文件过期
- 2.什么时候删除过期文件
- 四、高效文件写
- 1.零拷贝技术加速文件读写
- 2.文件顺序写
- 3.刷盘机制
- 五、 消息主从复制
- 六、负载均衡
- 1.Producer负载均衡
- 2.Consumer负载均衡
- 七、死信队列
一、读队列与写队列
1.概念介绍
在RocketMQ控制台创建topic时就需要设置读队列和写队列。写队列负责消息的写入,读队列负责consumer的的消息读取。这其实是一种读写分离的思想
perm字段表示Topic的权限。有三个可选项。 2:禁写禁订阅,4:可订阅,不能写,6:可写可订阅
2.读写队列个数关系分析
我们设置时默认,写队列=读队列。如果写队列>读队列:那么会有一部分写队列数据无法写到读队列,也就无法被消费会出现消息丢失。如果写队列<读队列,那么就有一部分读队列上是没有数据的会造成资源浪费。
这里有一种情景是读队列!=写队列的。
要对Topic的MessageQueue进行缩减的时候,例如原来四个队列,现在要缩减成两个队列。如果立即缩减读写队列,那么被缩减的MessageQueue上没有被消费的消息,就会丢失。这时,可以先缩减写队列,待空出来的读队列上的消息都被消费完了之后,再来缩减读队列,这样就可以比较平稳的实现队列缩减了。
二、消息持久化
RocketMQ消息直接采用磁盘文件保存消息,默认路径在${user_home}/store目录。这些存储目录可以在broker.conf中自行指定。
1.持久化文件介绍
有三个比较重要的文件:
- Commitlog:存储消息的元数据。所有的消息都会顺序写入文件,Commitlog由多个文件组成,每个文件固定为1g大小。以第一条消息偏移量为文件名。
- Consumequeue:存储消息在CommitLog的索引。一个MessageQueue一个文件,记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog。
- IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,例如可以通过MeessageId或者MessageKey来检索文件
另外,还有几个辅助的存储文件:
- checkpoint:数据存盘检查点。里面主要记录commitlog文件、ConsumeQueue文件以及IndexFile文件最后一次刷盘的时间戳。
- config/*.json:这些文件是将RocketMQ的一些关键配置信息进行存盘保存。例如Topic配置、消费者组配置、消费者组消息偏移量Offset 等等一些信息。
- abort:这个文件是RocketMQ用来判断程序是否正常关闭的一个标识文件。正常情况下,会在启动时创建,而关闭服务时删除。但是如果遇到一些服务器宕机,或者kill -9这样一些非正常关闭服务的情况,这个abort文件就不会删除,因此RocketMQ就可以判断上一次服务是非正常关闭的,后续就会做一些数据恢复的操作。
2.持久化结构介绍:
整体消息存储结构如图:
流程图解释:
1.所有生产者发送的消息都存储在Commitlog
2.消费者在消费消息时根据ConsumerQueue中的记录的偏移量单元,就可以定位到具体存储在Commitlog上的消息。
3.通过MeessageId或者MessageKey来查找消息时,会借助IndexFile文件,找到消息在Commitlog的具体偏移位置。
三、过期文件删除
消息既然要持久化,就必须有对应的删除机制。RocketMQ内置了一套过期文件的删除机制。
1.如何判断文件过期
在broker.conf中配置的fileReservedTime属性就是文件保留时间。文件超过了这个时间就认为是过期文件。
2.什么时候删除过期文件
在broker.conf中deleteWhen属性指定文件删除时间。默认是凌晨四点,ocketMQ内部有一个定时任务,对文件进行扫描,并且触发文件删除的操作
四、高效文件写
我们知道RocketMQ的可以抗住很高的并发,并且在高并发场景下也可以保证消息写到文件存储。那么是怎么做到的呢?
1.零拷贝技术加速文件读写
零拷贝(zero-copy)是操作系统层面提供的一种加速文件读写的操作机制,非常多的开源软件都在大量使用零拷贝,来提升IO操作的性能。对于Java应用层,对应着mmap和sendFile两种方式。接下来,咱们深入操作系统来详细理解一下零拷贝。
- 理解CPU拷贝和DMA拷贝
操作系统对于内存空间分为用户态和内核态。用户态应用程序无法直接操作硬件,这其实是为了保证操作系统的安全。所以应用程序在与磁盘发生IO时,需要在用户态和内核态来回复制数据,而这些操作是由CPU进行任务调度、分配,所以当发生大规模IO时CPU占用率会很高。
之后操作系统为了避免CPU被各种IO调用占满,引入DMA(直接存储器存储)。DMA是一套独立的指令集,不会占用CPU的计算资源。这样CPU就不需要参与具体的数据复制的工作,只需要管理DMA的权限即可,DMA拷贝极大的释放了CPU的性能,因此他的拷贝速度会比CPU拷贝要快很多。
虽然 DMA可以独立完成数据在系统内部的复制。但是,数据复制过程中,依然需要借助数据总进线。当系统内的IO操作过多时,还是会占用过多的数据总线,造成总线冲突,最终还是会影响数据读写性能。
后来又引入了Channel通道的方式。Channel,是一个完全独立的处理器,专门负责IO操作。既然是处理器,Channel就有自己的IO指令,与CPU无关,他也更适合大型的IO操作,性能更高
而所谓的零拷贝技术,其实并不是不拷贝,而是要尽量减少CPU拷贝。
-
mmap文件映射机制是怎么回事
在一次文件拷贝过程中,操作系统层面的拷贝已经由CPU拷贝优化成了DMA拷贝。而内核态与用户态之间的拷贝依然是CPU拷贝。所以,在这个场景下,零拷贝技术优化的重点,就是内核态与用户态之间的这两次拷贝。
mmap文件映射的方式,就是在用户态不再保存文件的内容,而只保存文件的映射,包括文件的内存起始地址,文件大小等。真实的数据,也不需要在用户态留存,可以直接通过操作映射,在内核态完成数据复制。
最后,这种mmap的映射机制由于还是需要用户态保存文件的映射信息,数据复制的过程也需要用户态的参与,这其中的变数还是非常多的。所以,mmap机制适合操作小文件,如果文件太大,映射信息也会过大,容易造成很多问题。通常mmap机制建议的映射文件大小不要超过2G 。而RocketMQ做大的CommitLog文件保持在1G固定大小,也是为了方便文件映射。 -
sendFile机制是怎么运行的
早期的sendfile实现机制其实还是依靠CPU进行页缓存与socket缓存区之间的数据拷贝。但是,在后期的不断改进过程中,sendfile优化了实现机制,在拷贝过程中,并不直接拷贝文件的内容,而是只拷贝一个带有文件位置和长度等信息的文件描述符FD,这样就大大减少了需要传递的数据。而真实的数据内容,会交由DMA控制器,从页缓存中打包异步发送到socket中。
最后,sendfile机制在内核态直接完成了数据的复制,不需要用户态的参与,所以这种机制的传输效率是非常稳定的。sendfile机制非常适合大数据的复制转移。
2.文件顺序写
通常应用程序往磁盘写文件时,由于磁盘空间不是连续的,会有很多碎片。所以我们去写一个文件时,也就无法把一个文件写在一块连续的磁盘空间中,而需要在磁盘多个扇区之间进行大量的随机写。这个过程中有大量的寻址操作,会严重影响写数据的性能。而顺序写机制是在磁盘中提前申请一块连续的磁盘空间,每次写数据时,就可以避免这些寻址操作,直接在之前写入的地址后面接着写就行。
3.刷盘机制
在操作系统层次,当应用程序写入一个文件时,文件内容并不会直接写到硬件中。而是会先写到操作系统的PageCache中,这个时候如果机器宕机了那么这部分数据就会丢失。因此,操作系统也提供了一个系统调用,应用程序可以自行调用这个系统调用,完成PageCache的强制刷盘。
RocketMQ中有同步刷盘和异步刷盘两种方式。
- 同步刷盘
在返回成功状态时,消息已经写入磁盘。具体流程是消息写到PageCache后,立刻通知刷盘线程刷盘,等待刷盘完成后,刷盘线程唤醒等待线程返回消息写成功。同步刷盘机制会更频繁的调用fsync,所以吞吐量相比异步刷盘会降低,但是数据的安全性会得到提高。 - 异步刷盘
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大 - 配置方式
刷盘方式是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。
五、 消息主从复制
Broker以一个集群的方式部署,消息需要从Master复制到Slave上。而消息复制的方式分为同步复制和异步复制。
- 同步复制
同步等待master和slave都写入成功,才反馈客户端写入成功。这样数据不易丢失但是增大数据写入的延迟,降低系统的吞吐量 - 异步复制
只要master写入成功就反馈客户端写成功,然后在异步把消息复制给slave.
在异步复制下,系统拥有较低的延迟和较高的吞吐量。但是如果master节点故障,而有些数据没有完成复制,就会造成数据丢失。 - 配置方式
消息复制方式是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个。
六、负载均衡
1.Producer负载均衡
Producer发送消息时,默认会轮询目标Topic下的所有MessageQueue,并采用递增取模的方式往不同的MessageQueue上发送消息,以达到让消息平均落在不同的queue上的目的。而由于MessageQueue是分布在不同的Broker上的,所以消息也会发送到不同的broker上。
2.Consumer负载均衡
Consumer也是以MessageQueue为单位来进行负载均衡。分为集群模式和广播模式。
集群模式
Producer发送消息会均匀的分配到所有MessageQueue上,集群模式下每个consumer都会被均匀的分配一个或者多个MessageQueue(默认采用平均分配策略),这样保证Consumer负载均衡
广播模式
广播模式下,每一条消息都会投递给订阅了Topic的所有消费者实例,所以也就没有消息分配这一说。而在实现上,就是在Consumer分配Queue时,所有Consumer都分到所有的Queue。
七、死信队列
当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。
RocketMQ默认的重试次数是16次。见源码org.apache.rocketmq.common.subscription.SubscriptionGroupConfig中的retryMaxTimes属性。
这个重试次数可以在消费者端进行配置。 例如 DefaultMQPushConsumer实例中有个setMaxReconsumeTimes方法指定重试次数。
死信队列的名称是%DLQ%+ConsumGroup
死信队列的特征:
- 一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。
- 如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
- 一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
- 死信队列中的消息不会再被消费者正常消费。
- 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。
通常,一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃。
默认创建出来的死信队列,他里面的消息是无法读取的,在控制台和消费者中都无法读取。这是因为这些默认的死信队列,他们的权限perm被设置成了2:禁读(这个权限有三种 2:禁读,4:禁写,6:可读可写)。需要手动将死信队列的权限配置成6,才能被消费(可以通过mqadmin指定或者web控制台)。