Apache Pulsar 技术系列 - PulsarClient 实现解析

news2025/1/12 6:05:26

导语

Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制(GEO replication)、快速扩容、灵活容错等特性。同时为了达到高性能,低延时、高可用,Pulsar 在客户端也做了很多的优化,本文主要讲述 PulsarClient 基本原理和实现。

PulsarClient 简介

Pulsar 客户端 API 设计优雅简洁,使用 PulsarClient 作为客户端的总入口,方便用户记忆和构建出具体的客户端,例如:

  • Producer: 生产者用来发送消息到指定 Topic。

  • Consumer: 消费者通过订阅关联到指定 Topic 并接收消息。

  • Reader: 手动管理 Cursors 的消费者。(内部使用 Consumer 实现)。

PulsarClient 还统一管理客户端系统资源,为具体的客户端提供了部分通用化处理,包括连接管理、线程管理、内存管理等。接下来让我们了解一下 PulsarClient 是如何实现的。

PulsarClient 有哪些功能

作为客户端的统一入口,下面代码片段不难看出 PulsarClient 主要功能是构建、销毁 PulsarClient 实例,以及构建各种具体 Client 和事务实例。

public interface PulsarClient extends Closeable {
    ProducerBuilder<byte[]> newProducer();
    <T> ProducerBuilder<T> newProducer(Schema<T> schema);
    ConsumerBuilder<byte[]> newConsumer();
    <T> ConsumerBuilder<T> newConsumer(Schema<T> schema);
    ReaderBuilder<byte[]> newReader();
    <T> ReaderBuilder<T> newReader(Schema<T> schema);
    void updateServiceUrl(String serviceUrl) throws PulsarClientException;
    CompletableFuture<List<String>> getPartitionsForTopic(String topic);
    CompletableFuture<Void> closeAsync();
    void shutdown() throws PulsarClientException;
    boolean isClosed();
    TransactionBuilder newTransaction() throws PulsarClientException;
}

实现原理

初始化过程

PulsarClient 可以使用以下代码来实例化。

PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://broker:6650").build();

PulsarClient 以及具体客户端都使用 Builder 模式构建,每种客户端都有对应的 ConfigurationData 来管理配置,PulsarClient 核心配置如下:

public class ClientConfigurationData implements Serializable, Cloneable {
    private String serviceUrl;
   // 用来在运行时外部改变url
    private transient ServiceUrlProvider serviceUrlProvider;
    private long operationTimeoutMs = 30000;
    private long statsIntervalSeconds = 60;
    private int numIoThreads = 1;
    private int numListenerThreads = 1;
    private int connectionsPerBroker = 1;
    private boolean useTcpNoDelay = true;
    private int concurrentLookupRequest = 5000;
    private int maxLookupRequest = 50000;
    private int maxLookupRedirects = 20;
    private int maxNumberOfRejectedRequestPerConnection = 50;
    private int keepAliveIntervalSeconds = 30;
    private int connectionTimeoutMs = 10000;
    private int requestTimeoutMs = 60000;
    private long initialBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
    private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(60);
    private boolean enableBusyWait = false;
    private String listenerName;
   // 全局内存限制(producer使用)
    private long memoryLimitBytes = 0;
    private String proxyServiceUrl;
    private ProxyProtocol proxyProtocol;
    long tickDuration = 1;
    // transaction
    private boolean enableTransaction = false;
}

PulsarClient 的初始化过程比较简单,逐个初始化内部模块,以下代码片段展示了 Client 内部主要的模块。

public class PulsarClientImpl implements PulsarClient {
    // 配置
    protected final ClientConfigurationData conf;
   // 本地元数据管理器,主要负责topic分区个数、topic对应的owner节点以及schema信息
    private LookupService lookup;
   // 共享连接池 双层map结构
    private final ConnectionPool cnxPool;
   // 时间轮
    private final Timer timer;
   // 执行外部逻辑线程组(主要消费使用)
    private final ExecutorProvider externalExecutorProvider;
   // 执行内部逻辑线程组(主要消费使用)
    private final ExecutorProvider internalExecutorService;
    private final AtomicReference<State> state = new AtomicReference<>();
   //producer集合
    private final Set<ProducerBase<?>> producers;
   //consumer集合
    private final Set<ConsumerBase<?>> consumers;
   //producer自增Id
    private final AtomicLong producerIdGenerator = new AtomicLong();
   //consumer自增Id
    private final AtomicLong consumerIdGenerator = new AtomicLong();
   // 请求自增Id
    private final AtomicLong requestIdGenerator = new AtomicLong();
   // netty 线程组
    protected final EventLoopGroup eventLoopGroup;
   // 生产本地buffer内存限制器
    private final MemoryLimitController memoryLimitController;
  ...
}

PulsarClient 初始化时主要创建了 Netty 客户端,连接池、时间轮等对象,只是准备好资源,并没有和服务端建立连接进行任何交互。只有在创建具体的客户端时,才会和服务端有交互。

Producer 创建

Pulsar 是以 Topic 粒度对外提供服务,多分区 Topic 等同于多个不同数字后缀的 Topic 集合。下文提到的 Topic-Partition 包含了单分区 Topic 和多分区 Topic 中的一个  Partition。Pulsar 客户端的实现 Topic-Partition 之间是相互独立的,SDK 内部会为每个 Topic-Partition 单独创建一个具体的客户端。我们在这里只介绍 Producer 的初始化流程(Consumer 类似)。

可以用以下代码构建 Producer。

Producer<byte[]> producer = client.newProducer().topic("my-topic").create();

当 My-topic 为 Non-partitioned Topic,会实例化一个 ProducerImpl 对象并返回,当 My-topic 分区数量大于0时,则会创建 PartitionedProducerImpl 对象。PartitionedProducerImpl 对象内包含了 List。可以理解为 PulsarClient 创建 Producer 时,最终会创建和分区数量一致的 ProducerImpl 对象,每个 ProducerImpl 都独立工作,互不影响(Consumer 类似)。

在创建 Producer 时客户端与服务端命令字交互如下:

  1. PulsarClient 通过用户指定的 ServiceUrl 挑选一个 url 来连接服务端,并做认证相关操作。

  2. 使用 LookupService 发送 PARTITIONED_METADATA 命令字查询给定 Topic 的分区数。

  3. 根据 Metadata 返回结果中的分区数循环创建 ProducerImpl 对象。

    3.1 ProducerImpl 对象初始化时会使用 LookupService 发送 LOOKUP 请求查询对应的分区的 Owner 节点 Lookup 过程可参考https://km.woa.com/articles/show/555638。

    3.2 根据 LOOKUP 响应连接到 Owner 节点,并发送 PRODUCER 请求向服务端创建 Producer。

    到这里 Producer 就已经创建完毕,可以正式使用来发送消息了。

ps: 如果创建好 Producer 后,分区数量有变化了,比如服务端扩容了,客户端可以感知到并增加 ProducerImpl 对象数量吗。答案是可以的,默认会定时1分钟发起一次检测,有分区变化会做相应处理。

连接管理

与大部分组件一样,客户端和服务端使用长连接通信。Pulsar 协议设计上不是传统的应答模式,可以同时支持多个客户端使用同一个连接并行发送接收请求(服务端会串行处理单个 Topic-partition 上的请求来保证消息顺序性)。

得益于连接共享,客户端消耗的连接数是很少的,PulsarClient 会为每台 Broker 创建一个连接池,默认连接数为1, 用户可以使用 ConnectionsPerBroker 配置来设置每台 Broker 最大连接数。ProducerImpl、ConsumerImpl 在初始化时,会随机从连接池中获取一个连接用来和服务端通信。

下图中 maxConnectionsPerHosts=2, 连接池中为每个 Broker 创建2个连接,6个客户端会在对应 T opic owner 节点里随机挑选一个连接绑定。

连接健康管理

Pulsar keepAlive 检测是双向的,连接创建成功后,客户端和服务端都会定时30s(KeepAliveIntervalSeconds 配置可修改)发送 Ping 请求到对端,接收到 Ping 请求后会回应 Pong 来标识存活。在以下几种情况下,客户端、服务端都会主动断开连接:

  • 超时时间内没有完成握手动作。

  • 发送 Ping 或者 Pong 命令时,Netty 回调发送失败。

  • 连接 isAutoRead 打开并且超时时间内没有收到任何请求(包含 Ping、Pong)。

连接断开后,会通知绑定在该连接上的所有客户端,这些客户端会重新从连接池中获取健康的连接。Pulsar 中空闲连接不会自动回收。

线程模型

PulsarClient 使用 Netty 作为网络通信框架, 是标准的 Netty 客户端。协议处理和事件驱动都是依托于 Netty。核心处理类直接继承于 Netty Handler。

所以线程模型也主要围绕于 Netty 的 EventLoopGroup。上文提到,客户端资源管理都收敛于 PulsarClient,也就是使用同一个 PulsarClient 创建出来的具体客户端都共享该 PulsarClient 中的线程等资源,比如使用 ClientA 对象分别创建一个或多个 Producer、Consuemer、Reader 客户端,这些客户端都共享 Client 中的线程资源。

PulsarClient 线程、线程组如下:

图中实线表示客户端会从线程池中挑选一个线程绑定运行。

  • Pulsar-client-io: io 线程( Netty 内部线程),负责网络连接和读写。NumIoThreads 参数配置,默认值为1。客户端不直接绑定 IO 线程,而是由其内部的连接来绑定 IO 线程,所以 IO 线程数配置最好小于或者等于总连接数,否则有些线程不会使用到。

  • Pulsar-client-internal: 主要用于 Consumer 内部处理,比如接收到消息后放置到接收队列等。也是通过 NumIoThreads 参数配置,默认值为1。

  • Pulsar-external-listener: 主要用于 Consumer 外部处理,比如用户消费逻辑回调。NumListenerThreads 参数配置,默认值为1。

  • Pulsar-timer: 时间轮内部线程,负责所有定时操作,比如连接重连,发送超时检测等。一个 PulsarClient 对应一个线程。

简单描述一下生产消费时线程是如何交互:

  • 生产: 用户线程创建消息并放置到本地缓存,IO 线程负责把消息发送到服务端。

  • 消费: IO 线程接收到服务端的消息推送,使用  Pulsar-client-internal 线程把消息放在本地缓存队列,然后使用 Pulsar-external-listener 线程执行用户消息处理逻辑。

总结和思考

本文介绍了 Pulsar 整体客户端架构,讲解了 PulsarClient、Producer 初始化过程以及客户端的连接管理和线程模型。并没有涉及到详细的生产消费过程。大家不难发现 Pulsar 客户端和其他组件客户端相比,较大的区别就是会给每个 Topic-partition 创建  Producer/consumer。如果客户端关联的 Topic-partition 数量很大,Producer/consumer 数量会急剧膨胀,从而导致客户端需要消耗更多的资源。也正是因为  Producer/consumer 数量可能较大,连接和线程等资源不可能做到独立,只能是 Producer/consumer 共享。而资源共享就不可避免出现客户端之间会相互影响,比如限流是控制在连接维度,但是由于连接是共享的,某些 Topic 的限流就会影响到该连接上的全部客户端。建议用户客户端关联的 Topic-partition 数量较大时,可以适当调大连接池和线程池大小来缓解影响,或者使用不同的 PulsarClient 来做客户端隔离。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1325176.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux C | 文件I/O】文件的打开关闭 | open、creat、colse 函数

&#x1f601;博客主页&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f911;博客内容&#x1f911;&#xff1a;&#x1f36d;嵌入式开发、Linux、C语言、C、数据结构、音视频&#x1f36d; &#x1f923;本文内容&#x1f923;&a…

windows安装、基本使用vim

标题&#xff1a;windows安装、基本使用vim 1.下载并安装GVIM 百度网盘链接 提取码&#xff1a;2apr 进入安装界面&#xff0c;如下&#xff0c;勾选 其它都是默认即可 参考&#xff1b; 2.在powershell中使用vim 参考blog&#xff1a;window10安装vim编辑器 安装好后&…

SpringBoot+WebSocket

SpringBootWebSocket 1.导入依赖&#xff1a; -- Spring Boot 2.x 使用 javax.websocket-- Spring Boot 3.x 使用 jakarta.websocket<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId&g…

C# WPF上位机开发(业务主流程才是核心)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 前面我们说了很多的c# wpf编程技术&#xff0c;里面有控件&#xff0c;有绘图&#xff0c;有数据库&#xff0c;有多线程等技术。但是他们都属于实…

宝塔Linux面板计划任务:文件夹改名方式天天切割日志脚本

新手第一次操作&#xff0c;目测成功且完美&#xff0c;供大家参考 current_time$(date %Y%m%d%H%M%S) old_folder_name"/www/wwwlogs" new_folder_name"/www/wwwlogs_${current_time}" mv "$old_folder_name" "$new_folder_name" m…

Inkscape SVG 编辑器 导入 Gazebo

概述 本教程描述了拉伸 SVG 文件的过程&#xff0c;这些文件是 2D 的 图像&#xff0c;用于在 Gazebo 中为您的模型创建 3D 网格。有时是 更容易在 Inkscape 或 Illustrator 等程序中设计模型的一部分。 在开始之前&#xff0c;请确保您熟悉模型编辑器。 本教程将向您展示如…

Backend - Django 项目创建 运行

目录 一、配置环境 二、创建 Django 项目 &#xff08;一&#xff09;新建文件夹 &#xff08;二&#xff09;打开文件夹 &#xff08;三&#xff09;打开运行终端 &#xff08;四&#xff09;创建基础项目 &#xff08;五&#xff09;创建app 1. 安装Django &#xf…

app上架-您的应用在运行时,未同步告知权限申请的使用目的,向用户索取(相机)等权限,不符合华为应用市场审核标准。

上架提示 您的应用在运行时&#xff0c;未同步告知权限申请的使用目的&#xff0c;向用户索取&#xff08;相机&#xff09;等权限&#xff0c;不符合华为应用市场审核标准。 测试步骤&#xff1a;管理-添加-点击二维码&#xff0c;申请相机权限 修改建议&#xff1a;APP在调…

Gazebo GUI模型编辑器

模型编辑器 现在我们将构建我们的简单机器人。我们将制作一个轮式车辆&#xff0c;并添加一个传感器&#xff0c;使我们能够让机器人跟随一个斑点&#xff08;人&#xff09;。 模型编辑器允许我们直接在图形用户界面 &#xff08;GUI&#xff09; 中构建简单的模型。对于更复…

MyBatis ORM映射

MyBatis只能自动维护库表”列名“与”属性名“相同时的对应关系&#xff0c;二者不同时无法自动ORM 因此需要使用到ORM映射。 共有两种解决办法&#xff1a;1.列的别名 2.结果映射 1.列的别名 在SQL中使用 as 为查询字段添加列别名&#xff0c;以匹配属性名 public List<…

VCG 获取某个顶点的邻接顶点

文章目录 一、简介二、实现代码三、实现效果参考资料一、简介 与之前的思路相同,这里我们利用VCG为我们提供的拓扑结构,获取某个顶点的邻接顶点,这在我们处理网格数据时往往很有用。 二、实现代码 //VCG #include <vcg/complex/algorithms/create/platonic.h> #inclu…

Java_集合进阶(Collection和List系列)

一、集合概述和分类 1.1 集合的分类 已经学习过了ArrayList集合&#xff0c;但是除了ArrayList集合&#xff0c;Java还提供了很多种其他的集合&#xff0c;如下图所示&#xff1a; 我想你的第一感觉是这些集合好多呀&#xff01;但是&#xff0c;我们学习时会对这些集合进行…

idea 如何使用 JaCoCo 跑覆盖率

背景介绍 什么代码覆盖&#xff1f; 代码覆盖(Code coverage)是软件测试中的一种度量&#xff0c;描述程序中源代码被测试的比例和程度&#xff0c;所得比例称为代码覆盖率。简单来理解&#xff0c;就是单元测试中代码执行量与代码总量之间的比率。 Java常用的单元测试覆盖率…

麒麟V10 ARM 离线生成RabbitMQ docker镜像并上传Harbor私有仓库

第一步在外网主机执行&#xff1a; docker pull arm64v8/rabbitmq:3.8.9-management 将下载的镜像打包给离线主机集群使用 在指定目录下执行打包命令&#xff1a; 执行&#xff1a; docker save -o rabbitmq_arm3.8.9.tar arm64v8/rabbitmq:3.8.9-management 如果懒得打包…

Java开发框架和中间件面试题(1)

1.什么是Spring框架&#xff1f; Spring是一种轻量级框架&#xff0c;旨在提高开发人员的开发效率以及系统的可维护性。 我们一般说的Spring框架就是Spring Framework,它是很多模块的集合&#xff0c;使用这些模块可以很方便的协助我们进行开发。这些模块是核心容器、数据访…

comfyUI + animateDiff video2video AI视频生成工作流介绍及实例

原文&#xff1a;comfyUI animateDiff video2video AI视频生成工作流介绍及实例 - 知乎 目录 收起 前言 准备工作环境 comfyUI相关及介绍 comfyUI安装 生成第一个视频 进一步生成更多视频 注意事项 保存为不同的格式 视频宽高设置 种子值设置 提示词与负向提示词…

【漏洞复现】奥威亚 教学视频应用服务平台任意文件上传漏洞

漏洞描述 AVA 教学视频应用服务平台是由广州市奥威亚电子科技有限公司基于当前教育视频资源建设的背景及用户需求的调研,开发出来能够适应时代发展和满足学校需求,具有实效性、多功能、特点鲜明的平台。 该平台存在任意文件上传漏洞,通过此漏洞攻击者可上传webshell木马,…

【Python-批量修改视频分辨率】

Python-批量修改视频分辨率 1 使用Python修改视频分辨率2 常见的视频编码格式2.1 等效的编码格式表示方式2.2 常见的编码格式 1 使用Python修改视频分辨率 首先拷贝视频文件并修改后缀&#xff0c;然后修改图片的分辨率&#xff0c;实现视频批量修改和转换。 import os impor…

npm安装依赖报错ERESOLVE unable to resolve dependency tree(我是在taro项目中)(node、npm 版本问题)

换了电脑之后新电脑安装包出错 &#x1f447;&#x1f447;&#x1f447; npm install 安装包报错 ERESOLVE unable to resolve dependency tree 百度后尝试使用 npm install --force 还是报错 参考 有人说是 node 版本和 npm 版本的问题 参考 新电脑 node版本&#xff1a;16.1…

在Windows系统平台下部署运行服务端Idea工程的jar服务

前言 目前云原生docker等技术&#xff0c;加上部署流水线大大的简化了各种流程&#xff0c;我们后端开发的人员只需要提交代码后&#xff0c;构建、部署、测试、发布等环节都无需人员接入&#xff0c;完全的自动化交付了。那么你肯定不禁想问&#xff0c;如题的需求不是点击一…