1 为什么需要消息队列?
消息队列是一种基于消息的异步通信机制,用于在分布式系统中不同组件或服务之间传递数据和通知。实际上可以将消息队列看作为存放消息的容器,参与消息传递的分别称为生产者(发送消息)和消费者(处理消息),这里的消息队列是服务以及系统内部各个组件/模块之间通信的 中间件。
中间件(英语:Middleware),又译中间件、中介层,是一类提供系统软件和应用软件之间连接、便于软件各部件之间的沟通的软件,应用软件可以借助中间件在不同的技术架构之间共享信息与资源。中间件位于客户机服务器的操作系统之上,管理着计算资源和网络通信。——维基百科
通过在系统中使用消息队列,可以实现如下功能:
- 系统解耦: 消息队列允许系统中的不同组件通过发送和接收消息进行通信,而无需直接连接和了解彼此的实现细节。这种解耦使得系统更加灵活,能够更容易地扩展、升级和维护。
- 异步通信: 发送方将消息发送到消息队列后即可继续执行其他操作,而无需等待接收方处理消息。接收方在合适的时候从队列中获取并处理消息。这种异步通信方式可以提升系统的整体响应速度和吞吐量。
- 消息缓冲: 消息队列可以作为临时存储,帮助处理消息的流量峰值和波动。发送方可以将消息发送到队列中,而不必担心接收方是否能立即处理,从而减少了消息丢失的风险。
- 支持分布式系统: 在分布式系统中,消息队列可以跨越多个物理节点传递消息,帮助管理和协调系统中的各种服务和组件。
- 削峰填谷: 消息队列可以帮助平滑处理系统的消息流量,例如在高峰期间缓冲消息并逐渐消费,避免因突发流量导致系统崩溃或性能下降。
- 数据流处理:针对分布式系统产生的海量数据流,如业务日志、监控数据、用户行为等,消息队列可以实时或批量收集这些数据,并将其导入到大数据处理引擎中,实现高效的数据流管理和处理。
1.1 消息队列标准-JMS 和 AMQP
JMS(Java Message Service)和 AMQP(Advanced Message Queuing Protocol)都是消息中间件的标准和协议。
- JMS(Java Message Service, Java 消息服务)是 Java 的消息服务, 是一个消息服务的标准或者说是规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。
- AMQP即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准 高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计且兼容 JMS。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。
对比方向 | JMS | AMQP |
定义 | Java | 协议 |
跨平台/语言 | 否 | 是 |
支持消息模型 | 提供两种消息模型:①Peer-2-Peer;②Pub/sub | 提供了五种消息模型:①direct exchange;②fanout exchange;③topic change;④headers exchange;⑤system exchange。本质来讲,后四种和 JMS 的 pub/sub 模型没有太大差别,仅是在路由机制上做了更详细的划分; |
支持消息类型 | TextMessage:包含字符串作为消息内容的消息类型;可以用来传递简单的文本消息,比如配置信息或者简单的命令。 BytesMessage:包含一个原始的字节流作为消息内容的消息类型;适合传递二进制数据,如图像、音频、视频等。 MapMessage:包含一组名称-值对作为消息内容的消息类型;可以用来传递结构化数据,比如传递一组配置参数。 StreamMessage:包含一系列 Java 基本数据类型作为消息内容的消息类型;适合传递数据流,比如一组数字或者其他基本数据类型的序列。 ObjectMessage:包含一个序列化对象作为消息内容的消息类;可以用来传递 Java 对象,但要求被传递的对象必须实现了 Serializable 接口。 | byte[](二进制) |
1.2 消息队列技术选型
常见消息队列对比如下 [1]:
- Kafka 的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 Kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。Kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,几乎是全世界这个领域的事实性规范。
- RabbitMQ 在吞吐量方面虽然稍逊于 Kafka、RocketMQ 和 Pulsar,但是由于它基于 Erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 Erlang 开发,所以国内很少有公司有实力做 Erlang 源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这几种消息队列中,RabbitMQ 或许是你的首选。
- RocketMQ 和 Pulsar 支持强一致性,对消息一致性要求比较高的场景可以使用。RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的 MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。
- ActiveMQ 的社区算是比较成熟,但是较目前来说,ActiveMQ 的性能比较差,而且版本迭代很慢,不推荐使用,已经被淘汰了。
2 什么是Kafka?
Kafka 是一个开源的分布式消息系统,最初由 LinkedIn 开发并开源,现在归属于 Apache 软件基金会的顶级项目之一。它设计用来处理大规模的实时数据流,具有高吞吐量、低延迟和高可靠性的特点。
官方文档
Apache KafkaApache Kafka: A Distributed Streaming Platform.https://kafka.apache.org/documentation/#gettingStarted
github地址
https://github.com/apache/kafkahttps://github.com/apache/kafka
什么是Kafka?可以结合其使用场景从这三个角度考虑:
- 消息系统:Kafka 用作消息中间件时,除了具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能;同时,还实现的消息顺序性保障及回溯消费。
- 流式处理平台:Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。
- 存储系统:Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置 为“永久”或启用主题的日志压缩功能即可。 [2]
Kafka 的开发社区非常活跃,在消息引擎的领域取的不俗成绩之后,不断拓展自己的领域,在基于事件的流处理平台方向一直发力,不断自我更新迭代力图成为这个领域内的事实标准。参考大佬绘制的Kafka 应用生态架构图 [3]
3 Kafka核心概念
典型的Kafka体系结构通常包括若干生产者Producer、若干服务代理节点Broker、若干消费者Consumer,以及一个ZooKeeper集群,如下图
Kafka 体系架构 [4]
生产者Producer:生产者负责创建消息,并持续不断的向某个特定的主题Topic发送消息
服务代理节点Broker:Broker 负责将收到的消息存储到磁盘中,可以简单地看作一个独立的 Kafka 服务节点或 Kafka 服务实例(大多数情况下也可以将 Broker 看作一台 Kafka 服务器,前提是这台服务器上只部署了一个 Kafka 实例)。一个或多个 Broker 组成了一个 Kafka 集群。通常使用首字母小写的 broker 来表示服务代理节点
消费者Consumer:消费者连接到 Kafka 上并接收消息,进而进行相应的业务逻辑处理。
ZooKeeper集群: Kafka 中使用ZooKeeper来负责集群元数据的管理、控制器的选举等操作。
3.1 主题(Topic)与分区(Partition)
生产者发送消息会以主题Topic进行归类,所以发送到Kafka集群的每一条消息都需要指定主题。Topic是一个逻辑上的概念,通常还会细分为若干分区Partition(也称为主题分区Topic Partition)。
Topic和Partition的关系可以从以下几个方面理解:
- Topic可以看作Partition的集合,Partition独属于单个主题,同一个Topic中的不同Partition中的消息是不同的。关于Topic中的Partition个数可以是创建Topic时通过指定参数设置,也可以在Topic创建后修改,增加Partion数量通常用来实现水平扩展。
- 单个Topic 可以横跨不同的broker上,这也导致了Topic中的消息无法在所有Partition有序,但是每个 Partition 是一个有序的队列(即Partition有序而不是主题有序)。Partition存储层面可以看作一个可追加的日志文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset ,消息在分区中的唯一标识,Kafka 用其来保证消息在Partition内的顺序性,不过 offset 并不跨越分区)。
- 消息发送到broker前已经通过Topic归类,具体到存储在那个分区则依据分区规则,如果分区规则设置合理所有的消息会均匀分配在不同的分区中。试想Topic只对应一个日志文件(Partion),那么这个文件所在的机器 I/O 将会成为这个Topic的性能瓶颈。
Partition数如何设定?
参考ProducerRecord类,它是用于封装生产者将要发送到 Kafka 集群的消息
public class ProducerRecord<K, V> {
private final String topic; // 消息要发送到的Kafka主题
private final Integer partition; // 消息要发送到的分区编号。如果没有指定分区,则为null。
private final Headers headers; // 消息的头信息,可以用于传递额外的元数据
private final K key; // 消息的键
private final V value; // 消息的值
private final Long timestamp; //消息的时间戳
……
}
在ProducerRecord类中显示指定分区号partition,则会发送到topic中指定的分区;
没有指定分区号partition,则会依据消息的键key来执行不同的分区选择方法:没有key,则使用一些默认的策略,比如轮询或随机选择一个分区来发送消息;有key,通常会计算键的哈希值,并使用哈希值与分区数取模的方式来确定分区编号。
Partition是 Kafka中最小的并行操作单元,对生产者而言,每个Partition的数据写入是完全可以并行化的;对消费者而言,Kafka 只允许单个Partition中的消息被一个消费者线程消费,一个消费组的消费并行度完全依赖于所消费的Partition数。如此看来,Topic中的Partition数越多,理论上所能达到的吞吐量就越大,那么事实真的如预想的一样吗?
实际,则是Partition数的增加会带来性能上的提升,但是一旦Partition数超过了某个阈值之后,整体的吞吐也是不升反降的。 [5]某些应用场景会要求Topic中的消息都能保证顺序性,这种情况下在创建主题时可以设定Partition数为 1,通过Partition有序性的这一特性来达到Topic有序性的目的。
合适的Partition数如何确定?实际没有权威的答案,往往是依据开发者实战经验,结合业务应用、硬件资源、环境配置等做出的选择,通常根据预估的吞吐量及是否与 key 相关的规则来设定Partition数即可,后期可以通过增加Partition数、增加 broker 或Partition重分配等手段来进行改进。
注:Partition数只能增加不能减少,Partition删除首先会引起消息丢失的可靠性问题,然后删除Partition中消息的顺序性问题和事务性问题都要考虑。非常复杂,如果真的需要实现此类功能,则完全可以重新创建一个分区数较小的主题,然后将 现有主题中的消息按照既定的逻辑复制过去即可。
Topic和Partition相关名字解释
名词 | 解释 |
主题(Topic) | 一个 Topic 就是一组 Partition 的集合,效果相当于是给一组 Partition 做了个命名,唯一提供的实际功能应该就是增加集合中的 Partition 数量 |
分区(Partition) | 一个独立不可再分割的消息队列,分区中会有多个副本保存消息,他们的状态应该是一致的。 Kafka 分区副本的同步机制不是纯异步的,有高水位机制去跟踪从副本的同步进度,并且有对应的领导者副本选举机制保证分区整体对外可见的消息都是已提交的 |
副本(Replica) | 分区中消息的物理存储单元,通常对应磁盘上的一个日志目录,目录中会对消息文件进一步进行分段保存。 |
主副本、领导者副本(Leader Replica) | 指一个 Partition 的多个副本中,对外提供读写服务的那个副本。 Kafka 集群范围有对等地位的组件是 Controller。 |
已同步副本(In sync replica) | In sync replica 指满足副本同步要求的副本集合,包括领导者副本。副本同步是按照时间差进行判定的,而非消息偏移的延迟 |
从副本(Follower Replica) | 负责与主副本的消息同步 |
3.2 多副本与存储视图
上面我们知道Topic和Partion的关系,实际上Partition还引入多副本(Replica)机制提升了容灾能力。Replica副本之间是一主多从的关系,主副本Leader Replica负责处理读写请求,follower Replica只负责与主副本的消息同步。也就是说生产者和消费者只与Leader Replica交互。你可以理解为其他副本只是leader副本的拷贝,它们的存在只是为了保证消息存储的安全性。
通常副本处于不同的 broker 中,当 Leader Replica发生故障时会从 follower 中选举出一个leader,但是follower中如果有和 leader同步程度达不到要求的参加不了leader的竞选。这里就需要说明同一个Partition的副本中虽然保存相同的消息,但同一时刻,副本之间并非完全一样,这是由于follower Replica中的消息相对leader Replica而言会有一定的滞后。正常情况AR (Assigned Replicas) = ISR (In-Sync Replicas) ,即所有副本AR与一定程度程度同步副本ISR集合是相等的。当存在滞后过多副本OSR (Out-of-Sync Replicas),他们的关系则是:AR =I SR + OSR
只有在 ISR 集合中的副本才有资格被选举为新的leader,ISR和OSR集合是动态变化的,OSR中副本跟上则会移动至ISR,反之亦然。Leader Replica负责维护和跟踪ISR集合中所有follower Replica (Leader Replica也属于ISR)的滞后状态。关于ISR中副本的同步,还涉及HW和LEO 等相关概念,参考下表。
Replica的数据存储视图
为易于理解ISR、HW、LEO等概念,下面解释下下副本Replica在实际物理上的存在形式Log以及LogSegment。如上图Replica存在于不同的broker提供了容错的能力,而副本的实际存储形式是Log,为避免Log太大,会将其平均分割为多个大小相同的LogSegment,也就是日志分段。前者以文件夹的形式存储,后者对应磁盘上一个日志文件和两个索引文件(偏移量索引文件以“.index”为文件后缀和时间戳索引文件以“.timeindex” 为文件后缀),以及其他文件。
每个 LogSegment 中不只包含“.log”“.index”“.timeindex”这 3 种文件,还可能包 含“.deleted”、“.cleaned”、“.swap”等临时文件,以及可能的“.snapshot”、“.txnindex”、 “leader-epoch-checkpoint”等文件。从更加宏观的视角上看,Kafka 中的文件不只上面提及的这些文件,比如还有一些检查点文件。
Log中只有最后一个LogSegment可以执行写入操作,将消息按照顺序追加。每个 LogSegment 都有一个基准偏移量 baseOffset,用来表示当前LogSegment中第一条消息的offset。偏移量是一个64位的长整型数,日志文件和两个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为20位数字,没有达到的位数则用 0 填充。例如第一个LogSegment的基准偏移量为0,其对应的日志文件为 00000000000000000000.log。
多副本和日志视图相关概念
名词 | 解释 |
高水位(High Watermark) | 简写HW是一个特定的消息偏移量(offset)的标识,消费者只能拉取到这个偏移量之前的消息。它与 ISR(In-Sync Replicas,同步副本)以及 LEO(Log End Offset,日志结束偏移量)存在紧密的关系。 |
日志结束偏移量(Log End Offset) | 简写LEO标识当前日志文件中下一条待写入消息的偏移量,其值为当前日志分区中最后一条消息的偏移量值加 1。Partition的 ISR 集合中的每个副本都会维护自身的 Log End,通常 ISR 集合中最小的 Log End Offset 即为分区的“High Watermark”。 |
日志起始偏移量(LogStartOffset) | 用于标识日志文件中起始消息的偏移量 |
3.3 消费者(Consumer)与消费组(Consumer Group)
我们知道消费者通过订阅特定的Topic(主题)来消费消息,其实在消费者上层还有个消费组(Consumer Group)的概念。消费者通过消费组来标识,当消息发布到Topic后,会被分配给每个订阅它的消费组中的一个消费者实例。例如上图消费组A和B都订阅Topic,彼此没有影响,按照Kafka的默认分配规则,消费组A中的每个消费者实例得到一个分区,消费组B中平均2个分区,消费者将消费其分配的Partion。
- 如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例
- 如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程?
注意:一个分区在同一个消费组下只能被一个消费者消费,但一个消费者可以消费多个分区
kafka协议会动态维护消费组中的消费关系,当新的实例加入消费组会从消费组中其他成员处接管部分Partion分区,消费者减少则会分发其分区。由此可见,通过消费者和消费组模型可以让消费能力具备横向的伸缩性,增加消费者来提升消费能力(Partion数固定的情况下,消费组中消费者数目不宜超过Partion数,否则多出的消费者将无分区而空闲),或者减少消费者降低消费能力。
3.4 Zookeeper与Kafka
长期以来ZooKeeper一直是集群管理的核心,ZooKeeper中保存了大量数据,会随着Topic、Partition和消费者组的数量增加,成为约束集群规模的重要条件 [6](作为强一致性的存储系统,写入性能不佳,面对高频率的写入请求将难应付)。随着3.0版本的中 KRaft 协议的推出,Zookeeper的退出进程开启,虽然如此新版本Kafka的生产环境落地不多,Zookeeper 仍然是不可或缺的组件。
Zookeeper主要为Kafka做了下面这些事情:
- Broker注册管理:在Zookeeper上会有一个专门的节点,用来进行Broker服务器列表记录。当Broker启动时,都会到Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点,并在节点中记录IP地址和端口等信息。
- Topic注册管理:在Kafka中,同一个Topic的消息会被分成多个Partition并将其Replica分布在多个不同Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护。例如,创建了一个名为my-topic的主题并且它有两个分区,对应到zookeeper中会创建这些文件夹:/brokers/topics/my-topic/Partitions/0、/brokers/topics/my-topic/Partitions/1
- 负载均衡:Kafka通过将特定Topic分成多个Partition,而各个Partition可以分布在不同的Broker上,这样便能提供比较好的并发能力。 对于同一个Topic的不同Partition,会尽量分布到不同的Broker服务器上。当生产者产生消息后也会尽量投递到不同Broker的Partition里面。当Consumer消费的时候,Zookeeper可以根据当前的Partition数量以及Consumer数量来实现动态负载均衡
- Controller 选举:在Kafka集群中,Controller是一个特殊的 Broker 节点,负责管理和协调整个集群的运行状态。当Kafka集群启动时,Zookeeper会通过选举机制选择一个Broker节点作为Controller。这个选举过程基于Zookeeper的临时节点和顺序节点特性,确保了选举结果的可靠性和一致性。
-
消息队列基础知识总结 | JavaGuide ↩︎
-
https://juejin.cn/post/7146133960865611806 ↩︎
-
https://juejin.cn/post/7176576097205616700?searchId=202407291657482745B92FB71D4D331818 ↩︎
-
https://juejin.cn/post/7146133960865611806 ↩︎
-
https://juejin.cn/post/7146133960865611806 ↩︎
-
生产故障|Kafka消息发送延迟达到几十秒的罪魁祸首竟然是... | HeapDump性能社区 ↩︎