RocketMQ
部署架构图

NameServer:主要是对元数据的管理,包括Topic和路由信息的管理,底层由netty实现,是一个提供路由管理、路由注册和发现的无状态节点,类似于ZooKeeper
Broker:消息中转站,负责收发消息、持久化消息
Producer:消息的生产者,一般由业务系统来产生消息供消费者消费
Consumer:消息的消费者,一般由业务系统来异步消费消息
工作流程
启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
领域模型

消息生产
生产者(Producer):
Apache RocketMQ 中用于产生消息的运行实体,一般集成于业务调用链路的上游。生产者是轻量级匿名无身份的。
消息存储
主题(Topic):Apache RocketMQ 消息传输和存储的分组容器,主题内部由多个队列组成,消息的存储和水平扩展实际是通过主题内的队列实现的。
队列(MessageQueue):Apache RocketMQ 消息传输和存储的实际单元容器,类比于其他消息队列中的分区。 Apache RocketMQ 通过流式特性的无限队列结构来存储消息,消息在队列内具备顺序性存储特征。
消息(Message):Apache RocketMQ 的最小传输单元。消息具备不可变性,在初始化发送和完成存储后即不可变。
消息消费
消费者分组(ConsumerGroup):Apache RocketMQ 发布订阅模型中定义的独立的消费身份分组,用于统一管理底层运行的多个消费者(Consumer)。同一个消费组的多个消费者必须保持消费逻辑和配置一致,共同分担该消费组订阅的消息,实现消费能力的水平扩展。
消费者(Consumer):Apache RocketMQ 消费消息的运行实体,一般集成在业务调用链路的下游。消费者必须被指定到某一个消费组中。
订阅关系(Subscription):Apache RocketMQ 发布订阅模型中消息过滤、重试、消费进度的规则配置。订阅关系以消费组粒度进行管理,消费组通过定义订阅关系控制指定消费组下的消费者如何实现消息过滤、消费重试及消费进度恢复等。Apache RocketMQ 的订阅关系除过滤表达式之外都是持久化的,即服务端重启或请求断开,订阅关系依然保留。
存储机制
RocketMQ
|--store
|-commitlog
| |-00000000000000000000
| |-00000000001073741824
|-config
| |-consumerFilter.json
| |-consumerOffset.json
| |-delayOffset.json
| |-subscriptionGroup.json
| |-topics.json
|-consumequeue
| |-SCHEDULE_TOPIX_XXX
| |-topicA
| |-topicB
| |-0
| |-1
| |-2
| |-3
| |-00000000000000000000
| |-00000000001073741824
|-index
| |-00000000000000000000
| |-00000000001073741824
|-abort
|-checkpoint
物理存储CommitLog
在RocketMQ中CommitLog的作用是存储所有topic的消息,CommitLog的存储文件地址是$HOME\store\commitlog${fileName},每个文件默认大小是1GB,当超过1GB时,会自动创建一个新的CommitLog文件进行存储。
CommitLog的目录结构
CommitLog的文件名长度为20位,左边补齐零,最右边为偏移量。例如:文件名00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;假设第一个文件存满了,则会生成第二个文件00000000001073741824,起始偏移量为1073741824,依次类推。

CommitLog每条消息存储长度不同,其逻辑视图可以参考下图,每条消息的前4个字节存储该消息的总长度。

CommitLog的存储结构

字段简称 | 字段长度(字节) | 含义 |
msgLen | 4 | 消息的长度,消息的长度是整个消息体所占用的字节数的大小 |
magicCode | 4 | 魔数,是固定值,MAGICCODE = 0xdaa320a7 |
bodyCRC | 4 | 消息体的CRC,用于防止网络、硬件等故障导致数据与发送时不一致带来的问题,当broker重启recover时会校验 |
queueId | 4 | 队列id,表示消息发到了哪个consumequeue |
flag | 4 | 网络通信层标记。创建Message对象时由生产者通过构造器设定的flag值,可以用于标记是请求,还是响应亦或是oneway(producer发只管发出消息不管返回值)类型,在RemotingCommand有用到 |
queueOffset | 8 | 在consumequeue中的偏移量 |
physicalPosition | 8 | 代表消息在commitLog中的物理起始地址偏移量 |
sysFlag | 4 | 指明消息是事务状态等消息特征,可参考MessageSysFlag类。二进制为四个字节从右往左数: 当第1个字节为1(值为1)时表示表示消息是压缩的(Compressed); 当第2个字节为1(值为2)表示多消息(MultiTags); 当4个字节均为0(值为0)时表示非事务消息; 当第3个字节为1(值为4)时表示prepared消息; 当第4个字节为1(值为8)时表示commit消息; 当第3/4个字节均为1时(值为12)时表示rollback消息。 |
msg born timestamp | 8 | producer发送消息的时间戳 |
msg host | 8 | producer的host(address:port) |
store timestamp | 8 | 消息存储的时间戳 |
store host | 8 | broker的host(address:port) |
reconsume time | 4 | 消息被某个订阅组重新消费了几次(订阅组之间独立计数),因为重试消息发送到了topic名字为%retry%groupName的队列queueId=0的队列中去了,成功消费一次记录为0; |
prepare transaction offset | 8 | 表示是prepared状态的事务消息偏移量,RocketMQ事务消息基于两阶段提交 |
body length | 4 | 消息体长度 |
msg body | bodyLength | 消息体的内容 |
topic length | 1 | topic的长度,topc的长度最多不能超过127个字节,超过的话存储会出错(有前置校验) |
topic | topicLength | topic的内容值 |
properties Length | 2 | 属性值大小 |
properties | propertiesLength | RocketMQ内部用到的一些属性。例如发送消息的TAG就存放在Properties里面,Properties中的一些常用key都定义在了MessageConst里面 |
commitLog 文件的最大的一个特点就是消息的顺序写入,随机读写,所有的 topic 的消息都存储到 commitLog 文件中,顺序写入可以充分的利用磁盘顺序减少了 IO 争用数据存储的性能,kafka 也是通过硬盘顺序存盘的。
大家都常说硬盘的速度比内存慢,其实这句话也是有歧义的,当硬盘顺序写入和读取的时候,速度不比内存慢,甚至比内存速度快,这种存储方式就好比数组,我们如果知道数组的下标,则可以直接通过下标计算出位置,找到内存地址,众所周知,数组的读取是很快的,但是数组的缺点在于插入数据比较慢,因为如果在中间插入数据需要将后面的数据往后移动。
而对于数组来说,如果我们只会顺序的往后添加,数组的速度也是很快的,因为数组没有后续的数据的移动,这一操作很耗时。
回到 RocketMQ 中的 commitLog 文件,也是同样的道理,顺序的写入文件也就不需要太多的去考虑写入的位置,直接找到文件往后放就可以了,而取数据的时候,也是和数组一样,我们可以通过文件的大小去精准的定位到哪一个文件,然后再精准的定位到文件的位置
逻辑存储ConsumeQueue和Index
RocketMQ的消息存储采用的是混合型的存储结构,所谓混合型存储结构,就是Broker单个实例下所有的队列共用一个CommitLog来存储,这样会导致查询的时候无法快速定位具体的某个消息。于是就有了ConsumeQueue和Index两个逻辑存储文件。

从上图可以看出,Producer生产任意的消息后,无论Topic是否相同,都会将消息内容存储到CommitLog中,当CommitLog存储满了,则会创建一个新的CommitLog文件继续存储新生成的消息。此外,在Broker端,会有一个后台服务线程——ReputMessageService会不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)。而Consumer端可以根据ConsumeQueue中的数据查找具体的消息。IndexFile则为消息查询提供了一种通过key或时间区间来查询消息的方法。下面主要介绍下ConsumeQueue和IndexFile。
ConsumeQueue
RocketMQ基于主题订阅模式实现消息的消费,但是在CommitLog中存储的消息是不连续的,如果从CommitLog检索该消息文件会很慢,为了提高效率,对应的主题的队列建立了索引文件,为了加快消息的检索和节省磁盘空间,每一个ConsumeQueue条目存储了消息的关键信息CommitLog文件中的偏移量、消息长度、tag的hashcode值。

一个ConsumeQueue表示一个topic的一个queue,类似于kafka的一个partition,单个ConsumeQueue文件中默认包含30万个条目,ConsumeQueue文件名采取定长设计,每一个条目共20个字节,分别为8字节的CommitLog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。
ConsumeQueue可以看成是基于topic的CommitLog索引文件,故ConsumeQueue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName},ConsumeQueue具体目录结构如下所示:

上述的TopicTest是topic名,TopicTest下有4个文件夹,分别代表不同的queueId。每个queueId下存储具体的ConsumeQueue文件,如上图的00000000000000000000文件。消费者在读取消息时,先读取ConsumeQueue,再通过ConsumeQueue中的位置信息读取CommitLog,得到原始的消息。
Index
IndexFile(索引文件)用于为生成的索引文件提供访问服务。Index文件的存储位置是:$HOME \store\index${fileName},它通过MsgId或消息Key值查询消息真正的实体内容。在实际的物理存储上,文件名则是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为420M,一个IndexFile可以保存 2000W个索引。IndexFile的文件结构如下所示:

IndexHead 数据
beginTimestamp:该IndexFile包含消息的第一条消息的存储时间。大小为8字节。
endTimestamp:该IndexFile包含消息的最后一条消息的存储时间。大小为8字节。
beginPhyoffset:第一条消息在commitlog的偏移量。大小为8字节。
endPhyoffset:最后一条消息在commitlog的偏移量。大小为8字节。
hashSlotCount:已经填充的slot数。大小为4字节。
indexCount:该IndexFile包含的索引个数。大小为4字节
Hash槽
Hash槽的作用就是存放当前slot下最新index的序号。每当放一个新的消息的index进来,首先取MessageKey的hashCode,然后用hashCode对slot总数取模,得到应该放到哪个slot中,slot总数默认500W个。只要是取hash就必然面临hash冲突的问题,跟HashMap一样,IndexFile也是使用一个链表结构来解决hash冲突。只是这里跟HashMap稍微有点区别的地方是,slot中放的是最新index的指针。这是因为一般查询的时候肯定是优先查最近的消息。

从上图可以看出,IndexFile结构与hash表很相似,固定数量的slot组成数组,每个slot对应一条index链,index之间通过链表方式组织在一起。slot的值对应当前slot下最新的那个index的序号,而index中存储了当前slot下以及当前index的前一个index序号,这样就把slot下的所有index链起来了。
Index 条目列表
每个Index条目固定长度为20字节,存放真正的索引数据。Index总共大概有2000W,也就是说平均每个slot存4个Index。Index组成结构包含下列几种:
hashcode:消息key的hashcode。
phyoffset:消息对应的物理偏移量。
timedif:该消息存储时间与第一条消息的时间戳的差值,小于0表示该消息无效。
preIndexNo:该条目的前一条记录的 Index 索引,hash冲突时,根据该值构建链表结构。
功能特性
普通消息
定时/延迟消息
顺序消息
事务消息
消息发送重试和流控机制
消息过滤
消费者负载均衡
消费进度管理
消费重试
消息存储和清理机制
Kakfa
部署架构图

Broker:消息中间件处理节点(服务器),一个节点就是一个broker,一个Kafka集群由一个或多个broker组成。
Topic:Kafka对消息进行归类,发送到集群的每一条消息都要指定一个topic。
Partition:物理上的概念,每个topic包含一个或多个partition,一个partition对应一个文件夹,这个文件夹下存储partition的数据和索引文件,每个partition内部是有序的。
Producer:生产者,负责发布消息到broker。
Consumer:消费者,从broker读取消息。
ConsumerGroup:每个consumer属于一个特定的consumer group,可为每个consumer指定group name,若不指定,则属于默认的group,一条消息可以发送到不同的consumer group,但一个consumer group中只能有一个consumer能消费这条消息。
kafka的工作流程

kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的
topic是逻辑上的概念,而partition是物理上的概念,每个partition对应一个log文件,该log文件中存储的就是producer生产的数据。producer生产的数据会被不断追加到log文件的末端,且每条数据都有自己的offset
offset是一个long型的数字,通过这个offset可以确定一条在该partition下的唯一消息。在partition下是保证有序的,但是在topic下面没有保证有序性
消费者组中的每个消费者,都会实时记录自己消费到哪个offset以便出错恢复,从上次的位置继续消费
kafka存储机制


由于生产者生产的消息会不断追加到log文件末端,为防止log文件过大导致数据定位效率低,kafka采取了分片和索引机制,将每个partition分为多个segment(逻辑上的概念,index+log文件)
每个partition(目录)相当于一个巨型文件被平均分配到多个大小相等的segment(片段)数据文件中(每个segment文件中消息数量不一定相等),这种特性也方便old segment的删除,即方便已被消费的消息的清理,提高磁盘的利用率。每个partition只需要支持顺序读写就行,segment的文件生命周期由服务端配置参数(log.segment.bytes,log.roll.{ms,hours}等若干参数)决定
每个segment对应两个文件----“.index”和“.log”文件。分别表示为segment索引文件和数据文件(引入索引文件的目的就是便于利用二分查找快速定位message位置)。这两个文件的命名规则为:
partition全局的第一个segment从0开始,后续每个segment文件名以当前segment的第一条消息的offset命名,数值大小为64位,20位数字字符长度,没有数字用0填充。
这些文件位于一个文件夹下(partition目录),改文件夹的命名规则:topic名+分区序号。例如,first这个topic有三个分区,则其对应的文件夹为first-0,first-1,first-2

Index和log文件以当前segment的第一条消息的offset命名。

如何通过offset查找message
例如读取offset=368776的message,需要通过下面2个步骤查找。
第一步查找segment file其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件 00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset **二分查找**文件列表,就可以快速定位到具体文件。当offset=368776时定位到00000000000000368769.index|log
第二步通过segment file查找message通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和 00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到 offset=368776为止。
高效性的保证
每条消息都被append到该partition中,是顺序写入磁盘的,在机械盘中随机写入的效率是很低的,但是如果是顺序写入效率非常高这是kafka高吞吐率的一个很重要的保证。

每一条消息被发送到broker时,会根据paritition规则选择被存储到哪一个partition。如果partition规则设置的合理,所有 消息可以均匀分布到不同的partition里,这样就实现了水平扩展。(如果一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个topic的性能瓶颈,而partition解决了这个问题)。在创建topic时可以在$KAFKA_HOME/config/server.properties中指定这个partition的数量(如下所示),当然也可以在topic创建之后去修改parition数量
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3
在发送一条消息时,可以指定这条消息的key,producer根据这个key和partition机制来判断将这条消息发送到哪个parition。 paritition机制可以通过指定producer的paritition. class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。本例中如果key可以被解析为整数则将对应的整数与partition总数取余,该消息会被发送到该数对应的partition。(每个parition都会有个序号)
对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际 上也没必要),因此Kafka提供两种策略去删除旧数据。一是基于时间,二是基于partition文件大小。例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可通过配置让Kafka在partition文件超过1GB时删除旧数据,如下所示。
############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# By default the log cleaner is disabled and the log retention policy will default to
#just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs
#can then be marked for log compaction.
log.cleaner.enable=false
这里要注意,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除文件与Kafka性能无关,选择怎样的删除策略只与磁盘 以及具体的需求有关。另外,Kafka会为每一个consumer group保留一些metadata信息–当前消费的消息的position,也即offset。这个offset由consumer控制。正常情况下 consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。因为 offet由consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些consumer过,不需要通过broker去保证同一个consumer group只有一个consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。
性能测试
吞吐量
测试场景
对比Kafka、RabbitMQ、RocketMQ发送小消息(124字节)的性能。这次压测我们只关注服务端的性能指标,所以压测的标准是:
不断增加发送端的压力,直到系统吞吐量不再上升,而响应时间拉长。这时服务端已出现性能瓶颈,可以获得相应的系统最佳吞吐量。

在同步发送场景中,三个消息中间件的表现区分明显:
Kafka的吞吐量高达17.3w/s,不愧是高吞吐量消息中间件的行业老大。这主要取决于它的队列模式保证了写磁盘的过程是线性IO。此时broker磁盘IO已达瓶颈。
RocketMQ也表现不俗,吞吐量在11.6w/s,磁盘IO %util已接近100%。RocketMQ的消息写入内存后即返回ack,由单独的线程专门做刷盘的操作,所有的消息均是顺序写文件。
RabbitMQ的吞吐量5.95w/s,CPU资源消耗较高。它支持AMQP协议,实现非常重量级,为了保证消息的可靠性在吞吐量上做了取舍。我们还做了RabbitMQ在消息持久化场景下的性能测试,吞吐量在2.6w/s左右。
测试结论
在服务端处理同步发送的性能上,Kafka>RocketMQ>RabbitMQ。
Topic数量对性能的影响
测试场景
分区数采用8个分区。这次压测我们只关注服务端的性能指标,所以压测的退出标准是:
不断增加发送端的压力,直到系统吞吐量不再上升,而响应时间拉长。此时服务端出现性能瓶颈,获取相应的系统最佳吞吐量,整个过程中保证消息没有累积。
默认每个Topic的分区数为8,每个Topic对应一个订阅者,逐步增加Topic数量。得到如下数据:

可以看到,不论Topic数量是多少,Kafka和RocketMQ均能保证发送端和消费端的TPS持平,就是说,保证了消息没有累积。
根据Topic数量的变化,画出二者的消息处理能力的对比曲线如下图:

从图上可以看出:
Kafka在Topic数量由64增长到256时,吞吐量下降了98.37%。
RocketMQ在Topic数量由64增长到256时,吞吐量只下降了16%。
测试结论
在消息发送端,消费端共存的场景下,随着Topic数的增加Kafka吞吐量会急剧下降,而RocketMQ则表现稳定。因此Kafka适合Topic和消费端都比较少的业务场景,而RocketMQ更适合多Topic,多消费端的业务场景。
稳定性
测试场景
消息收发端共存的情况下,RocketMQ和Kafka各运行约1个小时,观察不同Topic数量时,Kafka、RocketMQ性能指标(TPS&响应时间)的波动性。
默认每个Topic的分区数为8,每个Topic对应一个订阅者,逐步增加Topic的数量,这里性能是否抖动根据趋势图做直观的判断,数据如下:

做完全部的测试场景后会发现,正如之前的猜测,Kafka在32和64个Topic时,就已经出现了不稳定的情况。下面看一下32和64个Topic的详细数据,如下图所示:

蓝色Kafka的TPS曲线在18分钟以后,就开始上下波动,毫无规律,而RocketMQ则表现稳定。下面再看64个Topic的情况,如下图所示:


Kafka的TPS在前20分钟保持稳定,并大幅度领先RocketMQ。20分钟后又开始出现不规则波动,这些波动直接导致响应时间的变化,某个时刻Kafka的客户端响应时间会达到25毫秒,而RocketMQ全程都是5毫秒。
这次的对比3个场景中, Kafka胜出一个,就是8个Topic的场景,如图5所示,由于Topic个数和分区数的限制,导致Kafka只适合小规模的业务系统。

测试结论
Topic数的增加对RocketMQ无影响,长时间运行服务非常稳定。
Kafka 的Topic数量建议不要超过8个。8个以上的Topic会导致Kafka响应时间的剧烈波动,造成部分客户端的响应时间过长,影响客户端投递的实时性以及客户端的业务吞吐量。
可靠性
测试场景
使用多个发送端向一个Topic发送消息,发送方式为同步发送,分区数为8,只启动一个订阅者。
场景1:模拟进程退出
在消息收发过程中,利用Kill -9 命令使Broker进程终止,然后重新启动,得到可靠性数据如下:

注:以上测试场景中Kafka的异步刷盘间隔为1秒钟,同步发送需设置request.required.acks=1,否则会出现消息丢失。
在Broker进程被终止重启,Kafka和RMQ都能保证同步发送的消息不丢,因为进程退出后操作系统能确保将该进程遗留在内存的数据刷到磁盘上。实验中,Kafka出现了极少量的消息重复。再次可以确定此场景中,二者的可靠性都很高。
场景2:模拟机器掉电
在消息收发过程中,直接拔掉Broker所在的宿主机电源,然后重启宿主机和Broker应用。因受到机房断电限制,我们在本场景测试中使用的是普通PC机器。得到可靠性数据如下:

测试发现,即使在并发很低的情况下,Kafka和RMQ都无法保证掉电后不丢消息。这个时候,就需要改变刷盘策略了。我们把刷盘策略由“异步刷盘”变更为“同步刷盘”,就是说,让每一条消息都完成存储后才返回,以保证消息不丢失。
重新执行上面的测试,得到数据如下:

首先,设置同步刷盘时,二者都没出现消息丢失的情况。限于我们使用的是普通PC机器,两者吞吐量都不高。此时Kafka的最高TPS仅有500条/秒,RMQ可以达到4000条/秒,已经是Kafka的8倍。
为什么Kafka的吞吐量如此低呢?因为Kafka本身是没有实现任何同步刷盘机制的,就是说在这种场景下测试,Kafka注定是要丢消息的。但要想做到每一条消息都在落盘后才返回,我们可以通过修改异步刷盘的频率来实现。设置参数log.flush.interval.messages=1,即每条消息都刷一次磁盘。这样的做法,Kafka也不会丢消息了,但是频繁的磁盘读写直接导致性能的下降。
另外,二者在服务恢复后,均出现了消息重复消费的情况,这说明消费位点的提交并不是同步落盘的。不过,幸好Kafka和RMQ都提供了自定义消费位点的接口,来避免大量的重复消费。
测试结论
在Broker进程被Kill的场景, Kafka和RocketMQ都能在保证吞吐量的情况下,不丢消息,可靠性都比较高。
在宿主机掉电的场景,Kafka与RocketMQ均能做到不丢消息,此时Kafka的吞吐量会急剧下跌,几乎不可用。RocketMQ则仍能保持较高的吞吐量。
在单机可靠性方面,RocketMQ综合表现优于Kafka。
同步刷盘和异步刷盘的区别

同步刷盘是在每条消息都确认落盘了之后才向发送者返回响应;而异步刷盘中,只要消息保存到Broker的内存就向发送者返回响应,Broker会有专门的线程对内存中的消息进行批量存储。所以异步刷盘的策略下,当机器突然掉电时,Broker内存中的消息因无法刷到磁盘导致丢失。
为什么Topic增多Kafka的性能会下降很多?
kafka一个topic下面的所有消息都是以partition的方式分布式的存储在多个节点上。存文件需要寻找消息所属的Partition文件,再完成顺序写入,当Topic比较多时,Partition也比较多,甚至说多个topic的Partition会分布在同一台机器上,Partition寻址就会浪费比较多的时间,原本的顺序读写就变为随机读写,partition 越多,随机就越严重,所以Kafka不太适合多Topic的场景。
那RocketMQ为什么在多Topic的情况下,依然还能很好的保持较多的吞吐量呢?我们首先来看一下RocketMQ中比较关键的文件:

rocketmq中的消息主体数据并没有像Kafka一样写入多个文件,而是写入一个文件,这样我们的写入IO竞争就非常小,可以在很多Topic的时候依然保持很高的吞吐量。
有人可能说这里的ConsumeQueue写是在不停的写入呢,并且ConsumeQueue是以Queue维度来创建文件,那么文件数量依然很多,在这里ConsumeQueue的写入的数据量很小,每条消息只有20个字节,30W条数据也才6M左右,所以其实对我们的影响相对Kafka的Topic之间影响是要小很多的。
比较
ActiveMQ | RabbitMQ | RocketMQ | Kafka | |
Conpany | Apache | Rabbit | Apache & Alibaba | Apache & Linkedin |
Language | Go | Erlang | Java | Scala |
GitHub | https://github.com/apache/activemq (2.1K Star) | https://github.com/rabbitmq/rabbitmq-server (10.4K Star) | https://github.com/apache/rocketmq (18.7K Star) | https://github.com/apache/kafka (24.1K Star) |
GitHub Solve Issue | ✅✅ | ✅✅ | ✅✅✅ | ✅✅✅ |
LICENSE | Apache-2.0 license | Apache-2.0 license | Apache-2.0 license | Apache-2.0 license |
Web Tools | ActiveMQ Web Console | 自带的工具 | RocketMQ Dashboard | Cmak(kafka-manager) |
Cloud Products | ❌ Aliyun ✅ AWS Amazon MQ | ✅ Aliyun RabbitMQ ✅ AWS Amazon MQ | ✅ Aliyun RocketMQ ❌ AWS | ✅ Aliyun Kafka ✅ AWS MSK |
单机吞吐量 | 万级 | 万级 | 十万级 | 十万级 |
单机消息堆积 | 百万级 | 百万级 | 十亿级 | 十亿级 |
时效性 | ms级 | μs级 | ms级 | ms级 |
可用性 | 高,主从架构实现高可用 | 高,主从架构实现高可用 | 非常高,分布式架构 | 非常高,分布式架构 |
消息可靠性 | 有较低概率丢失数据 | 经过参数优化配置,可以做到0丢失 | 经过参数优化配置,可以做到0丢失 | |
功能支持 | MQ领域的功能极其完备 | 基于Erlang开发,并发能力很强,性能极其好,延迟很低 | MQ功能较为完善,业内共识的金融级可靠业务消息首选方案 | 功能简单,主要支持简单的MQ功能,在大数据、实时计算、日志采集领域被大规模使用 |
事务消息 | ✅ | ✅ | ✅ | ❌ |
顺序消息 | ✅ | ✅ | ✅ | ✅ |
延迟消息 | ✅ | ❌ | ✅ | ❌ |
消费失败重试 | ✅ | ✅ | ✅ | ❌ |
高可用 | 需要借助Zookeeper | 支持高可用, Master-Slave model, 不需要借助其他组件 | 需要借助Zookeeper | |
消息回溯 | ✅ | ❌ | ✅ 时间戳 + 偏移量 | ✅ 偏移量 |

1、ActiveMQ的MQ的功能很完备,RabbitMQ的延迟非常低,但是他们两个的吞吐量和消息堆积能力不够好,ActiveMQ官方社区不活跃,现在已经用的很少了;RabbitMQ一些中小公司用的还是挺多的,简单够用,因为公司的量级达不到。
2、RocketMQ 的高吞吐量,低延迟、数据堆积能力都非常好,支持成百上千个Topic性能也比较稳定,同时MQ的功能也很完备,像延迟队列,事务消息,消费重试等,由于它的性能强劲且可靠,让很多互联网公司用于电商、金融行业等。
3、当Topic少的时候,Kafka的吞吐量、延迟会比RocketMQ更高一层,所以Kakfa广泛用于在大数据、实时计算、日志采集,但是当Topic增多,Kakfa的性能不稳定,同时Kafka的MQ功能很简单,像延迟队列,事务消息,消费重试都不支持
4、所以我认为RocketMQ是最合适的选择,但是在亚马逊上没有RocketMQ云产品,需要SRE部署集群,所以还需要再思考一下。。