1.概述
Apache Pulsar 是灵活的发布-订阅消息系统(Flexible Pub/Sub messaging),采用计算与存储分离的架构。雅虎在 2013 年开始开发 Pulsar ,于 2016 年首次开源,目前是 Apache 软件基金会的顶级项目。Pulsar 具有支持多租户、持久化存储、多机房跨区域数据复制、高吞吐、低延迟等特性。
2.Pulsar 组件
Pulsar 集群主要由以下三部分组成:
- Broker:Pulsar 的 broker 是一个无状态组件,本身不存储数据。主要负责处理 producer 和 consumer的请求,消息的复制与分发,数据的计算。
- Zookeeper:主要用于存储元数据、集群配置,任务的协调(例如哪个 broker 负责哪个
topic),服务的发现(例如 broker 发现 bookie 的地址)。 - Bookeeper:主要用于数据的持久化存储。除了消息数据,cursors 也会被持久化到 Bookeeper,cursors 是消费端订阅消费的位移。Bookeeper 中每一个存储节点叫做 bookie。
3. Pulsar 基本概念
Producer & Consumer
身为⼀个 Pub/Sub 系统,⾸先的存在要素必然是 producer(⽣产者)。producer 发送数据给 Pulsar,将消息以 append 的形式追加到 topic 中。发送的数据是 key/value 形式的,并且数据会上 schema 的信息。Pulsar 会确保⼀个 producer 往 topic 发送的消息满⾜⼀定的 schema 格式。
既然有 producer 负责生产消息,那么相应地就有 consumer 负责消费消息。在 Pulsar 中 consumer 可以使用不同的订阅模式来接受消息。
Subscription
Pulsar ⾥将 consumer 接收消息的过程称之为:subscription(订阅),类似于 Kafka 的 consumer group(消费组)。⼀个订阅⾥的所有 consumer,会作为⼀个整体去消费这个 topic ⾥的所有消息。Pulsar 有四种订阅模式:独占(exclusive)、故障转移(failover)、共享(shared)、共享键(key_shared)。
Exclusive
在 exclusive 模式下,一个 subscription 只允许被一个 consumer 用于订阅 topic ,如果多个 consumer 使用相同的 subscription 去订阅同一个 topic,则会发生错误。exclusive 是默认的订阅模式。如下图所示,Consumer A-0 和 Consumer A-1 都使用了相同的 subscription(相同的消费组),只有 Consumer A-0 被允许消费消息。
Failover
在 failover 模式下,多个 consumer 允许使用同一个 subscription 去订阅 topic。但是对于给定的 topic,broker 将选择⼀个 consumer 作为该 topic 的主 consumer ,其他 consumer 将被指定为故障转移 consumer 。当主 consumer 失去连接时,topic 将被重新分配给其中⼀个故障转移 consumer ,⽽新分配的 consumer 将成为新的主 consumer 。发⽣这种情况时,所有未确认的消息都将传递给新的主 consumer ,这个过程类似于 Kafka 中的 consumer 组重平衡(rebalance)。
如下图所示,Consumer B-0 是 topic 的主 consumer ,当 Consumer B-0 失去连接时,Consumer B-1 才能成为新的主 consumer 去消费 topic。
Shared
在 shared 模式下,多个 consumer 可以使用同一个 subscription 去订阅 topic。消息以轮询的方式分发给 consumer ,并且每条消费仅发送给一个 consumer 。当有 consumer 失去连接时,所有发送给该 consumer 但未被确认的消息将被重新安排,以便发送给该 subscription 上剩余的 consumer 。
如下图所示,Consumer C-1,Consumer C-2,Consumer C-3 以轮询的方式接受消息。
shared 模式有以下限制:
- 消息不能保证有序。
- 不支持批量 ack。
Key_Shared
key_shared 是 Pulsar 2.4.0 以后⼀个新订阅模式。在 shared 模式下,多个 consumer 可以使用同一个 subscription 去订阅 topic。消息按照 key 分发给 consumer ,含有相同 key 的消息只被发送给同一个 consumer 。
如下图所示,不同的 consumer 只接受到对应 key 的消息。
key_shared 模式有以下限制:
- 需要为每条消息指定一个 key 或者 orderingKey。
- 不支持批量 ack。
- producer 应该禁用 batch 或者使用基于
key 的 batch。
Cursor
cursor 是用来存储一个 subscription 中消费的状态信息(类似 Kafka 中的 offset,偏移量)。Pulsar 将 subscription 的 cursor 存储至 BookKeeper 的 ledger 中。
存储模型
- 第一层抽象是 topic(partition),topic 是一个逻辑的概念,topic 是消息的集合,所有⽣产者的消息,都会归属到指定的
topic ⾥。所有在 topic ⾥的消息,会按照⼀定的规则,被切分成不同的分区(partition)。在 Kafka 中
partition 是真正的物理单元,但是在 Pulsar 中 partition 仍然是一个逻辑的概念。 - Pulsar 把 partition 进一步分成多个分片(segment),segment 是 Pulsar
中真正的物理单元,Pulsar 中的数据是持久化在 Bookeeper 中的,segment 其实对应的就是 Bookeeper 中的
ledger。 - 在分片中存储了更小粒度的 entry,entry 存储的是一条或者一个 batch 的消息,batch 是一次性批量提交多条消息。
⽽最底层的 message 通常包含 Message ID,由以下几个部分组成: - partition-index
- ledger-id(segment)
- entry-id
- batch-index
Broker
Pulsar 中的 broker 是无状态的,不存储数据,真正的数据存储在 Bookeeper 上。每个 topic 的 partition 都会分配到某一个 borker 上,producer 和 consumer 则会连接到这个 broker,从而向该 topic 的 partition 发送和消费消息。broker 主要负责消息的复制与分发,数据的计算。
Namespace & Tenant
Pulsar 从一开始就支持多租户,topic 的名称是层级化的,最上层是租户(tenant),然后是命名空间(namespace),最后才是 topic。
{persistent|non-persistent}://tenant/namespace/topic
- 租户可以跨集群分布,每个租户都可以有单独的认证和授权机制。租户也是存储配额、消息 TTL 和隔离策略的管理单元。
- 命名空间是租户的管理单元,命名空间上配置的策略适用于在该命名空间中创建的所有 topic。租户可以使用 REST API 和
pulsar-admin CLI 工具来创建多个命名空间。 - persistent|non-persistent 标识了 topic 的类型,默认情况下 topic 是持久化存储到磁盘上的。
Ack 机制
在 Pulsar 中支持了两种 ack 的机制,分别是单条 ack 和批量 ack。单条 ack(AckIndividual)是指 consumer 可以根据消息的 messageID 来针对某一个特定的消息进行 ack 操作;批量 ack(AckCumulative)是指一次 ack 多条消息。
消息生命周期
默认情况下,Pulsar Broker 会对消息做如下处理:
- 当消息被 consumer 确认之后,会立即执行删除操作。
- 对于未被确认的消息会存储到 backlog 中。
但是,很多线上的生产环境下,这种默认行为并不能满足我们的生产需求,所以,Pulsar 提供了
如下配置策略来覆盖这些行为:
- Retention 策略:用户可以将 consumer 已经确认的消息保留下来。
- TTL 策略:对于未确认的消息,用户可以通过设置 TTL 来使未确认的消息到达已经确认的状态。
上述两种策略的设置都是在NameSpace 的级别进行设置。
Backlog
backlog 是未被确认的消息的集合,它有一个大前提是,这些消息所在的 topic 是被 broker 所持久化的,在默认情况下,用户创建的 topic 都会被持久化。换句话说,broker 会将所有未确认或者未处理的消息都存放到 backlog 中。
需要注意的是,对 backlog 进行配置时,我们需要明确以下两点:
- 在当前的 namespace 下,每一个 topic 允许 backlog 的大小是多少。
- 如果超过设定的 backlog 的阈值,将会执行哪些操作。
当超过设定的 backlog 的阈值,Pulsar 提供了以下三种策略供用户选择:
Retention
Retention 策略的设置提供了两种方式:
- 消息的大小,默认值:defaultRetentionSizeInMB=0
- 消息被保存的时间,默认值:defaultRetentionTimeInMinutes=0
Time To Live(TTL)
TTL 参数就像附在每条消息上的秒表,用于定义允许消息停留在未确认状态的时间。当 TTL 过期时,Pulsar 会自动将消息更改为已确认状态(并使其准备删除)。TTL 只去处理一件事情,就是将未被确认的消息变为被确认的状态,TTL 本身不会去涉及相应的删除操作。
消息写入流程
producer 向 topic 的 partition 对应的 broker 发送消息。broker 以并行的方式将消息写到多个 bookie 中,当指定数量的 bookie 写入成功时,broker 会向 producer 响应消息写入成功。
消息读取流程
consumer 向订阅 topic 的 partition 对应的 broker 请求消息,如果消息在 broker 的缓存中存在,则 broker 直接将消息返回给 consumer 。如果缓存中不存在,broker 去 bookie 中读取消息,然后返回给 consumer 。consumer 在完成消费后,向 broker 响应 ack 表示完成消费。consumer ack 的元数据也是会持久化在 bookie 中的。
4.Pulsar vs Kafka
数据存储
- Kafka 的服务层和存储层位于同一节点上,broker 负责数据的计算与存储。
- Pulsar 的架构将服务层与存储层解耦:无状态 broker 节点负责数据服务;bookie 节点负责数据存储。
- 另外 Pulsar 还支持分层存储,如主存储(基于 SSD)、历史存储(S3)等。可以将访问频率较低的数据卸载到低成本的持久化存储(如
AWS S3、Azure 云)中。
存储单元:
- Kafka 和 Pulsar 都有类似的消息概念,客户端通过主题与消息系统进行交互,每个主题都可以分为多个分区。Pulsar 和Kafka 之间的根本区别在于 Kafka 是以分区(partition)作为数据的存储单元,而 Pulsar是以分片(segment)作为为数据的存储单元。
- 在 Kafka中,分区只能存储在单个节点上并复制到其他节点,其容量受最小节点容量的限制。当对集群进行扩容时或者发送副本故障时,会触发数据的拷贝,这将耗费很长的时间。
- 在 Pulsar 中,同样是以分区作为为逻辑单元,但是是以 segment为物理存储单元。分区随着时间的推移会进行分段,并在整个集群中均衡分布,能够有效迅速地扩展。
名词对应表
根据个人对 Pulsar 和 Kafka 的理解,整理如下 Pulsar 和 Kafka 的名词对应表:
Pulsar | Kafka |
---|---|
Topic | Topic |
Partition | Partition |
Segment(Ledger) | Segment |
Bookie | Broker |
Broker | Client SDK |
Write Quorum Size (Qw) | Replica Number |
Ack Quorum Size (Qa) | request.required.acks |
- Pulsar 和 Kafka 都是以 topic 描述一个基本的数据集合,topic 数据又逻辑分为若干个 partition。
- 但 Kafka 以 partition 作为物理存储单位,每个 partition 必须作为一个整体(一个目录)存储在某一个 broker上,虽然 Kafka 也会将一个 partition 分成多个 segment,但是这些 segment 是存在 Kafka broker的同一个目录下。而 Pulsar 的每个 partition 是以 segment(对应到 Bookkeeper 的 ledger)作为物理存储的单位,所以 Pulsar 中的一个逻辑上有序的 partition 数据集合在物理上会均匀分散到多个 bookie 节点中。
- Pulsar 的数据存储节点 Bookkeeper 被称为 bookie,相当于一个 Kafka broker。
- ensemble size 表示 topic 要用到的物理存储节点 bookie 个数,其副本数目 Qw 不能超过 bookie个数,因为一个 bookie 上不能存储超过一个以上的数据副本。
- Qa 是每次写请求发送完毕后需要回复确认的 bookie 的个数。