一、Pulsar客户端简析
pulsar服务是经典的C/S架构,由客户端和服务端构成。服务端提供处理读写请求服务,客户端负责发起读写请求。pulsar将客户端按照读写分成了生产者和消费者,但是无论怎么分,它们本质上都是Pulsar客户端并有很多相同的地方,本篇就针对客户端进行分析。
下图是客户端的组成部分
-
ConnectionPool连接池(值得深挖)
连接池是客户端的核心,维护着跟服务端的TCP连接,客户端的读写(生产者/消费者)都强依赖网络连接,因为最后数据还是要通过底层的网络进行通信
-
LookupService
具有路有属性的客户端角色,负责查找Topic归属的Broker节点
-
线程池
-
事件线程池
-
外部线程池
-
内部线程池
-
二、创建客户端对象
1、创建流程
先看看下面一段代码
这是通过生产者写数据到Pulsar服务端的case,相信大家不会感到陌生,其中34行就是创建客户端的逻辑,因此就从这里进行跟踪
通过跟踪PulsarClient.builder方法可以看到这是一个静态方法,可以看到返回值是ClientBuilder对象,具体实现逻辑咱们先不关心,咱们只需要知道这个方法会创建一个ClientBuilder就够了,结合之前的创建的逻辑可以知道最终会走到ClientBuilder#build方法
build方法中核心就是50行的通过PulsarClientImpl构造函数创建客户端对象,其他的都是一些配置校验工作。下面就直接进入PulsarClientImpl的构造函数进行分析
这个构造方法虽然逻辑不少,但实际上重要的就是201行的初始化
三、创建网络连接池
1. 初始化
这里的ClientCnx对象非常重要,后面再细讲。现在先继续跟踪构造函数的逻辑
这里初始化了Netty客户端,在跟外部创建网络连接时直接通过Netty客户端进行即可。客户端初始化阶段,网络连接池基本上就做了这些事,那么它是怎么工作的呢,让我们来往下看
2. 工作流程
在创建生产者对象时,会走到PulsarClientImpl#getConnection这个方法,从这里开始进行分析
此方法第一行是通过Lookup服务查找Topic归属的Broker节点信息,Lookup相关的感兴趣的可以看 Apache Pulsar源码解析之Lookup机制 这篇文章,在969行会将Lookup查到的Broker地址进行创建网络连接
可以看到获取网络连接方法最终会调用ConnectionPool类的方法进行获取
这里会根据参数randomKey(Broker IP地址)进行网络连接的复用,如果还没创建的话则进行创建,同时检查清理不可用的网络连接进行释放
调用createConnection方法进行连接创建,同时也在通道创建通道成功后绑定监听器,在通道被关闭时一起关闭当前的网络连接。
将unresolvedPhysicalAddress解析出正确的地址,现在还不太清楚已经有了logicalAddress(目标Broker地址),还解析这个的用途是什么
调用connectToAddress方法创建网络连接,如果出现异常时,如果服务端在Lookup阶段返回不止一个地址,那么就尝试跟下一个地址创建网络连接
这里应该就很熟悉了,就是通过Netty客户端创建跟目标Broker节点的TCP连接。
3. 小结
连接池是一个ConcurrentHashMap,key是IP+端口,value是ClientCnx来缓存这些连接。ClientCnx除了管理连接还管理所有的业务命令,例如发送消息是Send命令,服务端会对应一个handleSend方法来处理这个命令。
连接池是客户端最重要的内容,无论是生产者还是消费者在创建的时候,都会去连接池获取/创建跟Broker的网络连接,后续的数据读写本质上都是通过Netty的channel进行的,因此可见它是相当重要的。连接池的设计将网络连接跟其他读写功能剥离开来达到职责分离的效果,同时通过池化概念,在多个生产者/消费者情况下不仅节约了网络连接的创建,同时还提升网络连接创建的性能(复用思路)。
四、其他功能
1. LookupService
Lookup机制工作逻辑在 Apache Pulsar源码解析之Lookup机制 这篇文章已经说明了,这里主要一起看看它的创建逻辑。在PulsarClientImpl构造函数中,我们能看到以下代码
Pulsar支持两种Lookup实现,一种是通过http协议去跟服务端通信,另一种是通过二进制方式查询(性能更好)。本次就跟着比较常见的http方式进行分析
通过构造函数可以看到是通过创建HttpClient对象进行的,这里我一开始以为是用的开源的HttpClient就没继续跟,后来在调试的时候跟进去才发现,这是Pulsar 自定义对象
跟踪进去看,其他都是赋值、安全检查的工作,核心在161行的httpClient创建。 这里可以看到是创建的AsyncHttpClient对象,这是一个封装Netty的async-http-client-2.12.1.jar的外部包,这是支持异步处理的高性能HTTP工具包
2. MemoryLimitController
由于MemoryLimitController的内容不多,就看看它的创建以及工作流程
在PulsarClientImpl的构造函数中可以看到会调用MemoryLimitController构造函数进行创造,从这里进去分析
构造逻辑很简单,就只有赋值操作,到这里就成功创建MemoryLimitController对象了,那就再看看它是怎么工作的吧。MemoryLimitController的核心工作逻辑是checkTrigger方法
这里会调用trigger的run方法,这个trigger的逻辑其实就是创建MemoryLimitController的构造方法第三个参数,也就是上面的reduceConsumerReceiverQueueSize方法,那么继续跟踪
循环调用所有的消费者的reduceCurrentReceiverQueueSize方法,也就是说MemoryLimitController目前只能限制消费者的内存。
这里的限制逻辑相当于将所有消费者的接收队列的容量减半,避免队列中的数据占用过多内存。由此可见MemoryLimitController目前做的还是比较小范围的内存控制,并且整体逻辑并不复杂。
3. 线程池
客户端对象初始化时候,会创建三个线程池,现在就来分析下它们的用途
首先是externalExecutorProvider线程池通过查看调用的方法,可以看到是被消费者使用,里面的工作线程主要是用于消费服务端的消息
再来看看internalExecutorProvider线程池,可以看到是用来是一些跟偏向Pulsar系统层面的工作
至于scheduledExecutorProvider相信就更简单了,顾名思义都是处理一些周期性的任务,例如下面的周期性的同步Topic分区信息到客户端
五、总结
- PulsarClient中的EventLoopGroup负责创建TCP连接,ConnectionPool对象负责管理连接,在创建ConnectionPool对象时会通过EventLoopGroup创建连接。
- PulsarClient使用Netty来创建TCP连接,并管理一个连接池和两个线程池,所有Producer和Consumer都会复用PulsarClient的连接池和线程池,这样可以避免客户端创建过多的连接和线程。因此通常一个进程中只创建一个PulsarClient,每个Topic可以自己单独创建Producer和Consumer。
- 由于Broker使用了Reactor模型,单线程只负责转发事件,而数据的读取、解码、处理等都是在工作线程中完成的,也就是服务端所有请求都是异步完成的;客户端Producer/Consumer都是异步的,因此不存在单连接的性能问题。
- PulsarClient默认只会与每个Broker建立一个连接,如果觉得不够可以通过配置来调大。
- ClientCnx非常重要,这是netty的入队处理逻辑类。后续会专门针对这个类进行解析