一、简介
pulsar,消息中间件,是一个用于服务器到服务器的消息系统,具有多租户、高性能等优势。
pulsar采用发布-订阅的设计模式,producer发布消息到topic,consumer订阅这些topic处理流入的消息,并当处理完成之后发送一个确认。
一旦创建订阅,即使consumer断开连接,pulsar仍然可以保存所有消息,在consumer确认消息已处理成功之后才会删除消息。
Pulsar 的关键特性如下:
-
Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。
-
极低的发布延迟和端到端延迟。
-
可无缝扩展到超过一百万个 topic。
-
简单的客户端 API,支持 Java、Go、Python 和 C++。
-
Topic 支持多种订阅模式(独占订阅、共享订阅、故障转移订阅)。
-
通过 Apache BookKeeper 提供的持久化消息存储机制保证消息传递 。
-
由轻量级的 serverless 计算框架 Pulsar Functions 实现流原生的数据处理。
-
基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得数据更易移入、移出 Apache Pulsar。
-
分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储(如S3、GCS)中。
二、安装部署
pulsar支持在多种环境下以多种安装模式进行安装。
环境:裸金属、Docker、kubernetes、AWS、DC/OS
模式:单机模式、单集群、多集群
1,本地单机安装
2,本地单集群安装
三、目录结构
Pulsar 二进制包目录:
目录 | 内容 |
---|---|
bin | Pulsar的命令行工具,比如 pulsar 和 pulsar-admin. |
conf | Pulsar 的配置文件,包含 broker 配置、ZooKeeper 配置等。 |
examples | Java JAR 包,包含 Pulsar Functions 的示例。 |
lib | Pulsar使用到的 JAR 文件 |
licenses | 开源许可文件,.txt 格式,用于规范 Pulsar 代码库的各个组件。 |
运行 Pulsar 会立即生成的目录:
目录 | 内容 |
---|---|
data | ZooKeeper和BookKeeper使用的数据存储目录 |
instances | 为 Pulsar Functions 创建的组件。 |
logs | 安装时生成的日志文件 |
四、重要概念
1、消息
消息是pulsar的基础单元。从生产者、到消费者、在pulsar中被传递保存的东西。由以下成分组成:
成分 | 描述 |
---|---|
Value / data payload | 消息携带的数据。 |
Key | 消息可选择用key标记,有利于topic压缩等。 |
Properties | 用户定义属性的可选键/值映射。 |
Producer name | 产生消息的生产者的名称。如果不指定生产者名称,则使用默认名称。 |
Topic name | 消息发布到的topic的名称。 |
Schema version | 用于生成消息的模式的版本号。 |
Sequence ID | 每条消息都属于其topic的有序序列。消息的序列ID由生产者分配,表示其在该序列ID中的顺序,也可自定义。 可用于重复消息删除等。 |
Message ID | 消息ID表示消息在ledger中指定位置,在pulsar是唯一的。在持久层存储时由bookies分配。 |
Publish time | 发布消息的时间戳。由生产者自动应用。 |
Event time | 应用程序附加到消息的可选时间戳。例如,当消息被处理时应用程序为其附加一个时间戳。如果没有事件事件,则为0. |
消息默认大小时5M。可在broker.conf和bookeeper.conf配置文件中配置:
2、生产者
producer是一个进程,它依附于一个topic把消息发布给brokers。pulsar borker处理消息。
(1)发送模式
生产者可以以同步(sync)或异步(async)的方式向brokers发送消息。
模式 | 描述 |
---|---|
Sync send | producer发布每条消息后都等待broker的确认。如果没有收到确认,生产者将认为发送失败。 |
Async send | producer将消息放入阻塞队列并立即返回,客户端在后端将消息发送到broker。如果队列已满,则在调用API时,producer将阻塞或立即失败(根据producer的配置参数决定)。 |
(2)访问模式
producer访问topic时,有不同类型的访问模式。
访问模式 | 描述 |
---|---|
Shared | 多着producer能向一个topic发布消息。(默认的模式) |
Exclusive | 只有一个producer能向一个topic发布消息
|
WaitForExclusive | 如果一个topic已经连接了一个生产者,那么其他生产者的创建将被挂起,而不是超时。 |
一旦应用程序成功地创建了具有Exclusive或WaitForExclusive访问模式的生产者,该应用程序的实例就保证是该主题的唯一写入者。任何试图生成关于此主题的消息的其他生产者要么立即得到错误,要么必须等到获得独占访问权。
(3)压缩
可以压缩在传输过程中的消息。支持这几种压缩方式:
- LZ4
- ZLIB
- ZSTD
- SNAPPY
(4)分批
启用batching时,producer将在单个请求中累积并发送一批消息。批处理大小由消息的最大数量和最大发布延迟定义。因此,backlog大小表示批的总数,而不是消息的总数。
此时,批是跟踪和存储的基本单元而不是单个消息了。
消费者将一批消息解压为单个消息。
对于scheduled messages,是否启用批处理,始终作为单个消息发送。
当消费者确认批处理的所有消息时,批处理被确认。这意味着,当不是所有批消息都得到确认时,某个消息的失败、否定确认或者确认超时都可能导致重新交付这批中的所有消息。
pulsar从2.6.0开始引入了批索引确认,来避免将已确认的消息以批方式重新传递给consumer。当开启批索引确认,consumer过滤已确认的批索引,并将确认请求发送给broker(broker维护并跟踪批索引的确认状态,从而避免将确认的消息发送给consumer,当批中的所有索引都被确认时,该批将被删除)。
默认情况下,批处理索引确认时禁用的,可在broker中设置acknowledgmentAtBatchIndexLevelEnabled
。
启用批处理索引确认会导致更多的内存开销。
(5)分块
Message chunking使Pulsar能够通过在producer端将消息分割成块并在consumer端聚合分块消息来处理大型有效负载消息。
启用消息分块后,当消息的大小超过允许的最大有效负载大小(代理的maxMessageSize参数)时,消息传递的工作流程如下:
- producer将原始消息拆分为分块消息,并将它们与分块元数据一起按顺序单独的发布给broker。
- broker以与普通消息相同的方式将分块消息存储在一个managed-ledger中,并使用chunkedMessageRate参数记录topic上的分块消息速率。
- 消费者在接收到消息时,对分块的消息进行缓冲,并接收到时所有块将它们聚合到接收队列中。
- 客户端消费来自接收队列的聚合消息。
限制:
- 分块只适用于persisted topic;
- 分块只对 exclusive 和 failover 订阅类型可用;
- 分块不能与批处理同时使用(启动分块前应禁用批处理)。
consumer为每个大型消息保留一个单独的缓冲区,以便将其所有块聚合到一条消息中。
可以通过配置maxPendingChunkedMessage参数来限制消费者同时维护的分块消息的最大数量。
当达到阈值时,使用者通过静默确认或请求代理稍后重新交付来丢弃挂起的消息,从而优化内存利用率。
如果使用者在指定的时间内未能收到消息的所有块,则消息块将不完整地过期。缺省值为1分钟(由expireTimeOfIncompleteChunkedMessage
配置)。
3、消费者
consumer是一个进程,它通过订阅附加到主题,然后接收消息。
consumer向broker发送 flow permit request去获取消息。在consumer端有一个队列(通过receiverQueueSize参数配置,默认是1000)用于接收从broker推送的消息。每次调用consumer.receive()时,都会从缓冲区中取出一条消息。
(1)接收模式
消息以同步(sync)或异步(async)的方式从broker接收。
Mode | Description |
---|---|
Sync receive | 同步接收被阻塞,直到有消息为止。 |
Async receive | 异步接收立即返回一个future值—比如,在java中为CompletableFuture—一旦有新消息可用,这个过程就完成了。 |
(2)监听
客户端库为consumer提供侦听器实现。例如,Java客户机提供了MesssageListener接口。在此接口中,每当接收到新消息时,都会调用received方法。
(3)Acknowledgement
consumer在成功消费消息后向broker发送确认请求。
然后,这个消费的消息将被永久存储,只有在所有的订阅都确认后才会删除。
可配置 message retention policy使被consumer确认的消息继续保存。
消息可以通过以下两种方式确认:
- 逐个确认(individual acknowledged ),consumer确认每条消息并向borker发送确认请求;
//逐个确认API: consumer.acknowledge(msg);
- 累积确认(cumulative acknowledged ),consumer只确认它收到的最后一条消息。
//累积确认API: consumer.acknowledgeCumulative(msg);
注意:Cumulative acknowledgement不能用在Shared订阅类型中,因为Shared订阅类型涉及多个可以访问同一订阅的consumer。在Shared订阅类型中,消息是单独确认的。
(4)Negative acknowledgement
否定确认,允许程序向broker发送通知,指示consumer没有处理消息。
当consumer没有使用消息,并需要重新使用它时,consumer可以向broker发送否定确认(nack),触发broker将此消息重新发送给consumer。
可单独或累积的否定确认消息。
注意:
- 在Exclusive 和 Failover订阅模式中,消费者仅能否定确认它们收到的最后一条消息。
- 在 Shared 和 Key_Shared订阅模式中,消费者能逐个否定确认消息。
- 对排序订阅类型(比如 Exclusive, Failover and Key_Shared)的否定确认可能导致消息乱序。
- 如果要对消息使用否定确认,要在确认超时之前对消息进行否定确认。
- 如果启用了批处理,则将一个批处理中的所有消息重新交付给使用者。
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-negative-ack")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.negativeAckRedeliveryDelay(2, TimeUnit.SECONDS) // the default value is 1 min
.subscribe();
Message<byte[]> message = consumer.receive();
// call the API to send negative acknowledgement
consumer.negativeAcknowledge(message);
message = consumer.receive();
consumer.acknowledge(message);
(5)Acknowledgement timeout
(6)Retry letter topic
(7)Dead letter topic
4、topic
从生产者到消费者传输信息的通道。
结构:
{persistent|non-persistent}://tenant/namespace/topic
Topic name 组成 | 描述 |
---|---|
persistent / non-persistent | topic类型。pulsar支持持久和非持久两种主题,没有指定时默认是持久的。 持久主题:所有消息都持久的保存在磁盘上 非持久主题:数据不持久的保存在磁盘上 |
tenant | 实例中的topic租户。 租户对于Pulsar的多租户至关重要,并且分布在多个集群中。 |
namespace | topic管理单元,主要是提供了一个相关主题的分组机制。大多数topic配置都是在namespace级别执行的。每个租户有一个或多个名称空间。 |
topic | topic名。topic名在pulsar实例中没有特殊含义。 |
提示:
不需要显式的提前的创建主题。
如果客户端向一个不存在的topic写入或接收消息,pulsar自动在主题名称中提供的namespace创建该主题;
如果客户端创建主题时没有指定租户和命名空间,则pulsar在默认租户和命名空间中创建主题。
5、名称空间(namespace)
命名空间是topic的管理单元,主要是提供了一个相关主题的分组机制,将不同类型的topic放到同一个命名空间管理。
不同租户可创建多个命名空间,一个命名空间中可创建多个topic。
6、订阅(subscription)
订阅是一个配置规则,决定如何将消息发送给consumer。
pulsar中有四种订阅类型:
- exclusive
- shared
- failover
- key_shared
(1)订阅类型
订阅类型是在consumer连接到订阅时确定的;
可以通过重启consumer来更改类型;
当订阅没有连接consumer时,其订阅类型是hi未定义的。
Ⅰ、Exclusive(独占类型)
在独占类型中,只允许将单个consumer附加到订阅。默认的订阅模式。
如果多个consumer使用同一个订阅订阅了一个topic,就会发生错误。
Ⅱ、Failover(灾备类型)
在灾备类型中,多个consumer可以附加到同一个订阅;
非分区主题或分区主题的每个分区都选择一个主consumer去接收消息;
当主consumer断开连接时,未确认的和后续的消息都被传递到consumer等待队列中的下一个consumer中。
对于分区topic:broker根据优先级和消费者名称字典序对消费者排序。
对于非分区topic:broker将根据它们订阅非分区topic的顺序选择消费者。
Ⅲ、Shared(共享类型,round robin)
在共享类型中,多个consumer可以附加到同一个订阅。
消息循环分发订阅的consumer,每个消息都只传递给一个consumer。
当某个consumer断开连接时,所有发送给它但没有被确认的消息将被重新发送个剩余的某个消费者 。
注意:
- 共享类型不能保证消息的顺序;
- 不能在共享类型中使用累积确认。
Ⅳ、 Key_Shared
在key_shared类型中,多个consumer可以附加到同一个订阅;
具有相同key的消息仅传递给一个消费者,无论消息被重传多少次都会只传递给同一个消费者;
当有消费者连接或者断开连接时,
注意:
- 使用key_shared订阅类型时,需要禁用批处理或使用基于key的批处理。
- broker根据消息的key分发消息,但默认批处理方式可能无法将相同key的消息打包到同一批中;
- 因为时consumer从批中获取获取单条消息,而一个批中第一个消息的key被视为批中的所有消息的key。
基于key的批处理:
确保生产者将具有相同key的消息打包到同一批中,没有key的消息被打包到同一批中(broker从这个批分发消息时以NON_KEY作为key)。
//开启基于key的批处理的示例
Producer<byte[]> producer = client.newProducer()
.topic("my-topic")
.batcherBuilder(BatcherBuilder.KEY_BASED)
.create();
注意:
- 使用key_shared类型,需要为消息指定key或者排序key;
- 不能在key_shared类型中用累积确认。
(2)订阅模式
创建订阅时会创建一个关联游标来记录消息最后消耗的位置,当订阅的consumer重新启动时可以继续从它消费的最后一条消息开始消费,订阅模式主要为了指定游标类型。
订阅模式 | 描述 | 备注 |
---|---|---|
Durable | 游标是持久的。保留消息并保持当前消耗位置。 如果broker重启,可从持久存储(bookkeeper)中恢复游标。 | 默认的订阅模式。 |
NonDurable | 游标是非持久的。 broker停止,游标就会丢失,无法恢复。 | Reader的订阅模式本质上是非持久的,它不阻止主题中的数据被删除。Reader订阅模式蹦年被更改。 |
注意:
- 一个订阅可以有一个或多个consumer;
- 当consumer订阅主题时,必须指定订阅名称;
- 持久订阅和非持久订阅可以使用相同的名称,它们彼此独立;
- 如果consumer指定的订阅不存在,则自动创建该订阅。
7、多主题订阅
从pulsar 1.23.0-incubating版本开始,支持pulsar消费者可同时订阅多个topic。
两种订阅方式:
- 正则表达式(所有topic必须在同一名称空间),例如:persistent://public/default/finance-.*
- topic列表
当订阅多个主题时,pulsar客户端自动调用pulsar api以发现与正则表达式/topic列表匹配的topic并订阅,如果任何主题不存在,则在主题创建后,consumer自动订阅它们。
多个topic之间没有顺序保证,当生产者向多个topic发送消息时,不能保证从这些主题读取消息的顺序相同。
8、分区topic
分区topic是由多个broker处理的一种特殊类型的topic,因此可以实现更高的吞吐量(普通topic由一个broker服务,限制了topic的最大吞吐量)。
分区topic实现:为N个内部主题(其中N是分区的数量),当向分区topic发布消息时,每条消息被路由到多个broker中的一个。
pulsar自动维护分区的分布在哪个broker。
分区主题需要通过admin API显式创建。分区的数量可以在创建主题时指定。
路由模式决定了每个消息应该发布到哪个分区,而订阅类型决定了哪些消息将发送到哪个consumer。
通常,吞吐量问题应该指导分区/路由决策,而订阅决策应该由应用程序语义指导。
(1)路由模式
路由模式决定每条消息应该发布到哪个分区(内部topic)。
模式 | 描述 |
---|---|
RoundRobinPartition | 如果消息没有指定了key,生产者将以循环方式给所有分区发布消息(批),以实现最大吞吐量。 如果消息指定了key,消费者将散列key将消息分配给特定分区。 默认模式 |
SinglePartition | 如果消息没有指定key,生产者随机选择一个分区发布消息; 如果消息指定了key,消费者将散列key将消息分配给特定分区 |
CustomPartition | 自定义实现消息路由,该实现将被调用以确定特定的消息分区。 用户可以通过使用Java client和实现MessageRouter接口来创建自定义路由模式。 |
(2)保证顺序
消息的顺序与路由模式和key相关,通常用户希望每个key分区的消息是有序的。
顺序保证方式 | 描述 | 路由模式和key |
---|---|---|
Per-key-partition | 具有相同键的所有消息将按顺序排列并放置在同一个分区中。 | 使用SinglePartition或RoundRobinPartition模式,并且消息指定key |
Per-producer | 来自同一生产者的所有消息将是有序的。 | 使用SinglePartition模式,并且不为每条消息提供Key。 |
(3)Hashing scheme
HashingScheme是一个枚举,表示在为特定消息选择要使用的分区时可用的标准哈希函数集。
有两种类型的标准哈希函数可用:JavaStringHash和Murmur3_32Hash。
producer 的默认哈希函数是JavaStringHash。
注意,当生产者可以来自不同的多语言客户端时,JavaStringHash是没有用的,在这种用例下,建议使用Murmur3_32Hash。
9、非持久topic
消息永远不会持久化到磁盘,而只存在于内存中。
在使用非持久传递时,杀死Pulsar代理或断开与topic的订阅者的连接意味着在该非持久topic上所有正在传输的消息都会丢失。
在非持久topic中,broker立即将消息传递给所有已连接的订阅,而不会持久化到BookKeeper。
如果订阅断开连接,broker无法传递正在传输的消息,订阅将无法再次收到这些消息。
非持久topic的消息传递会稍微比持久topic的快。
性能
非持久化消息传递通常比持久化消息传递更快,因为代理不会持久化消息,而是在消息被传递到连接的代理时立即将ack发送回生产者。生产者因此看到相对较低的非持久性主题的发布延迟。连接方式
生产者和消费者与以持久性topic相同的方式连接到非持久性topic,但需要以non-persistent开头。
支持三种订阅类型:
exclusive、shared、failover
10、消息重传
11、消息保留和过期
12、消息去重
13、消息延迟传递
四、架构
在最高级别上,一个pulsar实例可以由一个或多个集群组成,实例中的集群之间可以复制数据。