PulsarClient源码解析

news2024/12/23 16:28:57

一、Pulsar客户端简析

pulsar服务是经典的C/S架构,由客户端和服务端构成。服务端提供处理读写请求服务,客户端负责发起读写请求。pulsar将客户端按照读写分成了生产者和消费者,但是无论怎么分,它们本质上都是Pulsar客户端并有很多相同的地方,本篇就针对客户端进行分析。

下图是客户端的组成部分

在这里插入图片描述

  1. ConnectionPool连接池(值得深挖)

    连接池是客户端的核心,维护着跟服务端的TCP连接,客户端的读写(生产者/消费者)都强依赖网络连接,因为最后数据还是要通过底层的网络进行通信

  2. LookupService

    具有路有属性的客户端角色,负责查找Topic归属的Broker节点

  3. 线程池

    1. 事件线程池

    2. 外部线程池

    3. 内部线程池

二、创建客户端对象

1、创建流程

先看看下面一段代码
在这里插入图片描述

这是通过生产者写数据到Pulsar服务端的case,相信大家不会感到陌生,其中34行就是创建客户端的逻辑,因此就从这里进行跟踪

在这里插入图片描述

通过跟踪PulsarClient.builder方法可以看到这是一个静态方法,可以看到返回值是ClientBuilder对象,具体实现逻辑咱们先不关心,咱们只需要知道这个方法会创建一个ClientBuilder就够了,结合之前的创建的逻辑可以知道最终会走到ClientBuilder#build方法
在这里插入图片描述

build方法中核心就是50行的通过PulsarClientImpl构造函数创建客户端对象,其他的都是一些配置校验工作。下面就直接进入PulsarClientImpl的构造函数进行分析

在这里插入图片描述

这个构造方法虽然逻辑不少,但实际上重要的就是201行的初始化

三、创建网络连接池

1. 初始化

在这里插入图片描述

这里的ClientCnx对象非常重要,后面再细讲。现在先继续跟踪构造函数的逻辑

在这里插入图片描述

这里初始化了Netty客户端,在跟外部创建网络连接时直接通过Netty客户端进行即可。客户端初始化阶段,网络连接池基本上就做了这些事,那么它是怎么工作的呢,让我们来往下看

2. 工作流程

在创建生产者对象时,会走到PulsarClientImpl#getConnection这个方法,从这里开始进行分析
在这里插入图片描述

此方法第一行是通过Lookup服务查找Topic归属的Broker节点信息,Lookup相关的感兴趣的可以看 Apache Pulsar源码解析之Lookup机制 这篇文章,在969行会将Lookup查到的Broker地址进行创建网络连接

在这里插入图片描述

可以看到获取网络连接方法最终会调用ConnectionPool类的方法进行获取

在这里插入图片描述

这里会根据参数randomKey(Broker IP地址)进行网络连接的复用,如果还没创建的话则进行创建,同时检查清理不可用的网络连接进行释放

在这里插入图片描述

调用createConnection方法进行连接创建,同时也在通道创建通道成功后绑定监听器,在通道被关闭时一起关闭当前的网络连接。

在这里插入图片描述

将unresolvedPhysicalAddress解析出正确的地址,现在还不太清楚已经有了logicalAddress(目标Broker地址),还解析这个的用途是什么

在这里插入图片描述

调用connectToAddress方法创建网络连接,如果出现异常时,如果服务端在Lookup阶段返回不止一个地址,那么就尝试跟下一个地址创建网络连接

在这里插入图片描述

这里应该就很熟悉了,就是通过Netty客户端创建跟目标Broker节点的TCP连接。

3. 小结

连接池是一个ConcurrentHashMap,key是IP+端口,value是ClientCnx来缓存这些连接。ClientCnx除了管理连接还管理所有的业务命令,例如发送消息是Send命令,服务端会对应一个handleSend方法来处理这个命令。

连接池是客户端最重要的内容,无论是生产者还是消费者在创建的时候,都会去连接池获取/创建跟Broker的网络连接,后续的数据读写本质上都是通过Netty的channel进行的,因此可见它是相当重要的。连接池的设计将网络连接跟其他读写功能剥离开来达到职责分离的效果,同时通过池化概念,在多个生产者/消费者情况下不仅节约了网络连接的创建,同时还提升网络连接创建的性能(复用思路)。

四、其他功能

1. LookupService

Lookup机制工作逻辑在 Apache Pulsar源码解析之Lookup机制 这篇文章已经说明了,这里主要一起看看它的创建逻辑。在PulsarClientImpl构造函数中,我们能看到以下代码

在这里插入图片描述

Pulsar支持两种Lookup实现,一种是通过http协议去跟服务端通信,另一种是通过二进制方式查询(性能更好)。本次就跟着比较常见的http方式进行分析

在这里插入图片描述

通过构造函数可以看到是通过创建HttpClient对象进行的,这里我一开始以为是用的开源的HttpClient就没继续跟,后来在调试的时候跟进去才发现,这是Pulsar 自定义对象

在这里插入图片描述

跟踪进去看,其他都是赋值、安全检查的工作,核心在161行的httpClient创建。 这里可以看到是创建的AsyncHttpClient对象,这是一个封装Netty的async-http-client-2.12.1.jar的外部包,这是支持异步处理的高性能HTTP工具包

2. MemoryLimitController

由于MemoryLimitController的内容不多,就看看它的创建以及工作流程

在这里插入图片描述

在PulsarClientImpl的构造函数中可以看到会调用MemoryLimitController构造函数进行创造,从这里进去分析

在这里插入图片描述

构造逻辑很简单,就只有赋值操作,到这里就成功创建MemoryLimitController对象了,那就再看看它是怎么工作的吧。MemoryLimitController的核心工作逻辑是checkTrigger方法

在这里插入图片描述

这里会调用trigger的run方法,这个trigger的逻辑其实就是创建MemoryLimitController的构造方法第三个参数,也就是上面的reduceConsumerReceiverQueueSize方法,那么继续跟踪

在这里插入图片描述

循环调用所有的消费者的reduceCurrentReceiverQueueSize方法,也就是说MemoryLimitController目前只能限制消费者的内存。

在这里插入图片描述

这里的限制逻辑相当于将所有消费者的接收队列的容量减半,避免队列中的数据占用过多内存。由此可见MemoryLimitController目前做的还是比较小范围的内存控制,并且整体逻辑并不复杂。

3. 线程池

客户端对象初始化时候,会创建三个线程池,现在就来分析下它们的用途

在这里插入图片描述

首先是externalExecutorProvider线程池通过查看调用的方法,可以看到是被消费者使用,里面的工作线程主要是用于消费服务端的消息

在这里插入图片描述

再来看看internalExecutorProvider线程池,可以看到是用来是一些跟偏向Pulsar系统层面的工作

在这里插入图片描述

至于scheduledExecutorProvider相信就更简单了,顾名思义都是处理一些周期性的任务,例如下面的周期性的同步Topic分区信息到客户端

在这里插入图片描述

五、总结

  1. PulsarClient中的EventLoopGroup负责创建TCP连接,ConnectionPool对象负责管理连接,在创建ConnectionPool对象时会通过EventLoopGroup创建连接。
  2. PulsarClient使用Netty来创建TCP连接,并管理一个连接池和两个线程池,所有Producer和Consumer都会复用PulsarClient的连接池和线程池,这样可以避免客户端创建过多的连接和线程。因此通常一个进程中只创建一个PulsarClient,每个Topic可以自己单独创建Producer和Consumer。
  3. 由于Broker使用了Reactor模型,单线程只负责转发事件,而数据的读取、解码、处理等都是在工作线程中完成的,也就是服务端所有请求都是异步完成的;客户端Producer/Consumer都是异步的,因此不存在单连接的性能问题。
  4. PulsarClient默认只会与每个Broker建立一个连接,如果觉得不够可以通过配置来调大。
  5. ClientCnx非常重要,这是netty的入队处理逻辑类。后续会专门针对这个类进行解析

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1931728.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

QT--控件篇四

一、对话框 在软件开发中,对话框(Dialog)是一种常见的用户界面元素,用于与用户进行交互和获取信息。它通常以模态或非模态的形式出现,模态对话框会阻止用户与应用程序的其他部分交互,直到对话框关闭为止&a…

快速排序(quick sort)

欢迎来到一夜看尽长安花 博客,您的点赞和收藏是我持续发文的动力 对于文章中出现的任何错误请大家批评指出,一定及时修改。有任何想要讨论的问题可联系我:3329759426qq.com 。发布文章的风格因专栏而异,均自成体系,不足…

nftables(9)NAT、FLOWTABLES

NAT NAT简介 我们在iptables、firewalld中都介绍过有关NAT的相关部分。那么在nftables中,我们继续介绍nftables中NAT的功能实现方式,配置方法和与前两者的区别。 我们先简单回顾一下NAT的类型和其功能: 这些是不同的网络地址转换&#xf…

在 Windows 上开发.NET MAUI 应用_1.安装开发环境

开发跨平台的本机 .NET Multi-platform App UI (.NET MAUI) 应用需要 Visual Studio 2022 17.8 或更高版本,或者具有 .NET MAUI 扩展的最新 Visual Studio Code。要开始在 Windows 上开发本机跨平台 .NET MAUI 应用,请按照安装步骤安装 Visual Studio 20…

leetcode94. 二叉树的中序遍历,递归法+迭代法。附带前序遍历方法

leetcode94. 二叉树的中序遍历 给定一个二叉树的根节点 root ,返回 它的 中序 遍历 。 示例 1: 输入:root [1,null,2,3] 输出:[1,3,2] 示例 2: 输入:root [] 输出:[] 示例 3: …

高职综合布线实训室

一、高职综合布线实训室建设背景 随着《国民经济和社会发展第十四个五年规划和2035年远景目标纲要》的深入实施,数字化转型已成为国家发展的核心战略之一,计算机网络技术作为数字化建设的基石,其重要性日益凸显。然而,面对数字时代…

【手撕RLHF-DPO(1)】不是PPO训不起,而是DPO更有性价比!

Introduction Direct Preference Optimization: Your Language Model is Secretly a Reward Model 在LLM对齐问题上,OpenAI提出的RLHF训练范式最为人熟知,同时也是ChatGPT行之有效的对齐方案。 RLHF通常包含三个步骤:SFT, Reward Model, PPO…

【STM32】RTT-Studio中HAL库开发教程三:IIC通信--AHT20

文章目录 一、I2C总线通信协议二、AHT20传感器介绍三、STM32CubeMX配置硬件IIC四、RTT中初始化配置五、具体实现代码六、实验现象 一、I2C总线通信协议 使用奥松的AHT20温湿度传感器,对环境温湿度进行采集。AHT20采用的是IIC进行通信,可以使用硬件IIC或…

Visual Studio使用——在vs中给vb.net项目添加新的窗口:新建的方式、添加已有窗口的方式

目录 引出Visual Studio使用vb添加新的窗体自定义代码片段vs显示所有文件 总结Idea安装和使用0.Java下载 和 IDEA工具1.首次新建项目2.隐藏文件不必要显示文件3.目录层级设置4.Settings设置选择idea的场景提示代码不区分大小写 取消git的代码作者显示 引出 Visual Studio使用—…

trl - 微调、对齐大模型的全栈工具

文章目录 一、关于 TRL亮点 二、安装1、Python包2、从源码安装3、存储库 三、命令行界面(CLI)四、如何使用1、SFTTrainer2、RewardTrainer3、PPOTrainer4、DPOTrainer 五、其它开发 & 贡献参考文献最近策略优化 PPO直接偏好优化 DPO 一、关于 TRL T…

安全防御,防火墙配置NAT转换智能选举综合实验

目录: 一、实验拓扑图 二、实验需求 三、实验大致思路 四、实验步骤 1、防火墙的相关配置 2、ISP的配置 2.1 接口ip地址配置: 3、新增设备地址配置 4、多对多的NAT策略配置,但是要保存一个公网ip不能用来转换,使得办公区的…

c++入门----类与对象(上)

大家好啊,好久没有更新了。因为本人的愚笨,想与大家分享的话肯定还得自己明白了才能给大家分享吧。所以这几天都在内部消化。好给大家优质的文章。当然我写的肯定还是很有问题的,希望大家可以在评论区里面指出来。好,废话不多说&a…

LabVIEW 与 PLC 通讯方式

在工业自动化中,LabVIEW 与 PLC(可编程逻辑控制器)的通信至关重要,常见的通信方式包括 OPC、Modbus、EtherNet/IP、Profibus/Profinet 和 Serial(RS232/RS485)。这些通信协议各有特点和应用场景&#xff0c…

从图表访问Data Store Memory

Simulink模型将全局变量实现为数据存储,可以是数据存储内存块,也可以是Simulink.Signal的实例。您可以使用数据存储在多个Simulink块之间共享数据,而无需显式的输入或输出连接来将数据从一个块传递到另一个块。Stateflow图表通过符号化地读取…

警惕预言成真!3本预警、On Hold已被剔除,新增8本SCI/SSCI被除名!7月WOS更新(附下载)

本周投稿推荐 SCI • 能源科学类,1.5-2.0(25天来稿即录) • IEEE计算机类,4.0-5.0(48天录用) • 生物医学制药类(2天逢投必中) EI • 各领域沾边均可(2天录用&…

精益思维在数字工厂建设中的具体应用

在数字化浪潮席卷全球的今天,数字工厂建设已成为企业转型升级的必由之路。然而,如何确保数字工厂的高效运行和持续创新,成为摆在众多企业面前的难题。精益思维,作为一种追求持续改进和卓越绩效的管理理念,正成为助力数…

iPhone手机怎么识别藏文?藏语翻译通App功能介绍:藏文OCR识别提取文字

在工作学习的过程中,遇到不会的藏文,也不知道怎么把文字打出来,这个时候可以试试《藏语翻译通》App的图片识别功能,支持拍照识别和图片识别,拍一拍就能提取藏文文字,并支持一键翻译和复制分享。 跟着小编的…

汽车免拆诊断案例 | 2017 款林肯大陆车发动机偶尔无法起动

故障现象 一辆2017款林肯大陆车,搭载2.0T发动机,累计行驶里程约为7.5万km。车主进厂反映,有时按下起动按钮,起动机不工作,发动机无法起动,组合仪表点亮正常;多次按下起动按钮,发动机…

01大学物理电磁篇 静电场

5-6 静电场的环路定理 电势能 5-7电势 5-8电场强度与电势梯度

背部筋膜炎最有效的治疗方法

背部筋膜炎症状:背部筋膜炎引起的疼痛通常是钝痛或酸痛,且这种疼痛是无菌性炎症产生的炎症因子、疼痛因子刺激局部神经引起的。疼痛主要发生在腰背部,特别是两侧腰肌和髂嵴上方可能会更加明显。长时间不活动或活动过度都可能诱发疼痛。疼痛可…