管理流创建schema流程源码解析

news2025/1/10 16:28:14

一、简析

schema是pulsar重要的功能之一,现在就一起从源码的视角看下管理流创建schema时客户端和服务端的表现
在这里插入图片描述

客户端

客户端主要经历以下四个步骤

  1. 创建Schema实例

    根据数据类型创建相对应的实例,例如Avro创建AvroSchema、JSON创建JSONSchema等

  2. 获取处理Schema的对象

    管理流PulsarAdmin对象获取SchemasImpl对象,这个对象是专门处理所有schema相关的操作。除此之外PulsarAdmin对象还维护着Clusters、Brokers、Tenants等等管理维护集群的重要对象,通过这些对象可以很好的管理维护Pulsar集群

  3. 构造SchemaInfo

    通过Schema实例创建其对应的SchemaInfo信息,里面就包括这个schema的名字、schema的结构化信息、schema类型等等,最后SchemaInfo这个对象会转成字符串发到服务端

  4. 发送HTTP请求

    通过post请求将数据发到服务端,这里是通过Java Rest库javax.ws.rs-api进行处理的

服务端

服务端主要经历以下四个步骤

  1. 参数格式校验

    • 校验租户、命名空间的是否有效(判空、是否有特殊字符)
    • 从缓存中根据Topic获取其对应的TopicName对象
  2. 权限校验

    • 判断是否是Topic的owner
    • 判断当前用户是否有操作当前Topic的权限
  3. SchemaRegistry注册schema

    SchemaRegistryService是服务端处理所有schema相关的对象,而schema相关的读写操作是依赖它的成员SchemaStorage进行处理的,SchemaStorage的最终是通过Bookkeeper客户端对象发送写请求

  4. 写Bookkeeper

    通过LedgerHandle对象向Bookkeeper服务端发送写请求

小结

schame新建流程概括起来就是,客户端构造schema信息,服务端负责schema校验,bookkeeper负责schema的存储

二、客户端源码解析

源码跟踪

下面是通过管理流创建schema的样例代码,核心就是通过PulsarAdmin.schemas获取schema对象,这个schema对象负责所有客户端跟schema相关的操作,包括schema的增删改查等。通过方法的第二个参数可以看到是通过Schema接口提供的静态方法AVRO来构造Avro格式的schema对象,除此之外Schema接口还提供了诸如JSON、KeyValue、PROTOBUF等静态方法提供对应数据格式的schema对象,这里如果将这块构造schema对象逻辑抽成简单工厂模式可能会更合适些
在这里插入图片描述

接下来就进入createSchema方法,顾名思义可以知道这个方法就是用于创建schema的,第一个参数是topic,第二个参数是SchemaInfo对象,这个对象包含了所有要新建的schema信息,这里会将它转换为PostSchemaPayload对象传递给下一个方法。PostSchemaPayload是用来请求到服务端的参数
在这里插入图片描述

这个方法并不会有返回值,sync方法是处理异步结果对象,它在正常写成功情况下不会做任何操作,但如果有什么错误会往外抛出异常。这里核心逻辑是在createSchemaAsync方法
在这里插入图片描述

可以看到这个方法的返回值是个异步对象,146行这里会获取当前topic对应的TopicName对象,并通过schemaPath方法构造WebTarget对象,这个对象中就包含着要请求的HTTP地址,主要是根据当前Topic的版本来决定请求服务端哪个版本的处理方法。除此之外还可以看到有通过Entity.json方法将PostSchemaPayload对象转换为HTTP请求的参数对象,转换逻辑是javax.ws.rs-api这个网络库封装的,就不进行跟踪了
在这里插入图片描述

这里就是客户端最后发送的地方,request方法中还会发送前的安全相关检查,async方法基本上就说明本次HTTP请求是异步的,而post方法也能看得出,这是一个POST类型的HTTP请求,再往后就是将请求发送出去了
在这里插入图片描述

不知是否有人好奇参数WebTarget长什么样子,通过通过调试可以看到值为

/admin/v2/schemas/public/test-namespace-jytixthzgatgirem/test-multi-version-schema-one/schema

此值仅供学习参考,具体这个值的构造逻辑如下
在这里插入图片描述

小结

简单归纳如下

  • 通过Schema接口构造对应数据格式的schema对象,由此对象可得到schema相关的元信息SchemaInfo
  • 构造请求目标的HTTP地址
  • 通过javax.ws.rs-api提供的库发送异步HTTP请求到服务端

三、服务端源码解析

源码跟踪

服务端的接收逻辑在SchemasResource类,这个类在org.apache.pulsar.broker.admin包下,这个包下全是处理管理流相关的操作,如果有做pulsar平台化需求的,这个包下的相关逻辑值得一读。

再来看看postSchema方法,首先是validateTopicName方法,这个方法就是对入参进行判空、是否有特殊字符做检查;接下来就是核心方法postSchemaAsync,通过方法名可以推断出这是个异步处理schema写请求的方法
在这里插入图片描述

postSchemaAsync方法看似复杂,实际上核心的就是133行,其余的方法大概说一下,validateOwnershipAndOperationAsync方法主要检查当前用户是否有新建schema的操作权限,getSchemaCompatibilityStrategyAsyncWithoutAuth方法相对复杂一些,放到后面详细讲解。那么再看回133行,其中getSchemaRegistryService方法获取的是SchemaRegistryServiceImpl对象,顾名思义可以知道Pulsar的SchemaResistry相关的功能都是由它进行处理,现在先看它的putSchemaIfAbsent方法
在这里插入图片描述

SchemaStorage对象是SchemaRegistryServiceImpl的核心成员,负责schema存储相关的操作。在新建schema时会调用它的put方法进行创建;这里有个trimDeletedSchemaAndGetList方法,如果put方法在创建schema时有任何异常,则此方法会去删除该新建的schema,避免写"一半"的情况发生,某种意义上这也是一种回滚的设计。
在这里插入图片描述

这里的getAll方法很重要,会根据schema的id来查询是否已经存在当前schema,有的话则将版本号加1。处理完之后就调用put方法
在这里插入图片描述

这里没什么逻辑,继续往下跟踪
在这里插入图片描述

getSchemaLocator方法会构造LocatorEntry对象,调用putSchema
在这里插入图片描述

由于是初次创建schema,因此直接走到337行;如果这个topic已经创建过schema则会读取之前的schema信息再新增,同时把版本号自增

在这里插入图片描述

在这里可以看得到构造IndexEntry对象,这是消息的索引对象,后续用来加速查询schema
在这里插入图片描述

这个方法的内容就很眼熟了(bookkeeper相关内容),createLedger方法会先创建这个Ledger
在这里插入图片描述

在576行可以看到最终调用bookkeeper创建这个Ledger
在这里插入图片描述

再来看看addEntry方法,这里核心也是调用bookkeeper的ledgerHandle进行数据写入
在这里插入图片描述

这个方法是属于Bookkeeper客户端的逻辑了,通过方法注释可以看到,这个方法负责将数据异步写入到一个打开的Ledger。Bookkeeper相关的逻辑后续在单独写post进行讲解
在这里插入图片描述

小结

简单归纳如下

  1. 参数格式校验、操作权限校验
  2. 查询当前Topic是否已经创建过schema,有则以插入时版本号自增
  3. 如果是初次创建Schema,则调用bookkeeper创建Ledger
  4. 往这个Schema对应的Ledger内插入schema元数据信息

四、其他

序列化对象创建流程

现在再专门来看看序列化对象的创建过程,回到开头管理流创建schema的地方,Schema.AVRO方法是咱们本次要看的
在这里插入图片描述

通过注释可以看到,此静态方法是创建一个Avro类型的schema对象,getDefaultImplementation方法是获取实现类(饿汉单例设计模式),而newAvroSchema方法才是本次要看的
在这里插入图片描述

继续往下跟踪
在这里插入图片描述

获取对应处理的类加载器,并通过对应的类加载器创建AvroSchema实例
在这里插入图片描述

54行是核心,其他的都是赋值操作
在这里插入图片描述

super调用父类构造函数做赋值操作,还是继续看
在这里插入图片描述

继续跟踪parse逻辑
在这里插入图片描述

FACTORY.createParser方法是jackson的方法,用于创建JsonParser对象的;因此继续跟踪parse方法
在这里插入图片描述

1471行可以看到返回了我们想要的Schema对象,那么Schema.parse方法就是重中之重
在这里插入图片描述

这个方法是核心,本身会递归的进行解析赋值给schema对象
在这里插入图片描述

相信读者读到这里也好奇schema长什么样,因此提供下图让读者感受下,能大概推测得出来这里已经涵盖了schema的结构信息了
在这里插入图片描述

getSchemaCompatibilityStrategyAsyncWithoutAuth方法

AdminResource#getSchemaCompatibilityStrategyAsyncWithoutAuth方法是在服务端处理schema创建请求阶段会调用的方法,现在就一起跟踪看看

731行和739行分别是获取Topic级别和Namespace级别的schema兼容策略,如果没有定义则默认自动更新。例如Topic A之前已经创建过schema1,那么如果此时再发起schema2创建请求,则服务端会继续保存并且生效schema2,只不过它的版本号会进行累加,当然,也可以配置为不支持schema策略不支持更新,一旦确定了后就不允许再变更
在这里插入图片描述

五、总结

相信大家对schema创建的流程已经很清楚了,再次简单归纳下

  1. 客户端根据用户定义的结构信息创建对应的Schema对象,并将结构信息以HTTP请求发给服务端
  2. 服务端检测并根据Schema兼容策略做相对应的处理,一般情况下会调用Bookkeeper创建Ledger以及Entry
  3. Bookkeeper将此Schema数据持久化到磁盘,相当于Schema信息会被Bookkeeper当作一条消息进行存储

这基本上就是全部内容,当然细节感兴趣的小伙伴可以自行跟踪代码,相信你会有更多收获~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1969288.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

1.1、centos stream 9安装Kubernetes v1.30集群 环境说明

最近正在学习kubernetes,买了一套《Kubernetes权威指南 从Docker到Kubernetes实践全接触(第六版)》这本书讲得很好,上下两册,书中k8s的版本是V1.29,目前官网最新版本是v1.30。强烈建议大家买一套看看。 Kubernetes官网地址&#x…

jenkins使用docker api配置自签证书 +发布项目

配置证书 1、创建目录/etc/docker/certs, 在该目录下执行下列命令 openssl genrsa -aes256 -out ca-key.pem 4096 openssl req -new -x509 -days 3650 -key ca-key.pem -sha256 -out ca.pemopenssl genrsa -out server-key.pem 4096 \ openssl req -subj "/…

常见的应急救援设备有哪些_鼎跃安全

在我们的生活中,应急事件的发生常常是突如其来的,它们对人民的生命财产安全构成重大威胁,同时也对社会稳定提出严峻挑战。在这样的紧急情况下,迅速开展有效的救援工作显得尤为重要。而在整个救援过程中,应急设备的使用…

【简历】湘南某二本学院:前端简历指导,秋招面试通过率低

注:为保证用户信息安全,姓名和学校等信息已经进行同层次变更,内容部分细节也进行了部分隐藏 简历说明 这是一份25届二本同学的前端简历,但是这个简历,因为学校是个二本的专业,虽然说主体是在小公司&#x…

计算机基础(Windows 10+Office 2016)教程 —— 第6章 电子表格软件Excel 2016(下)

电子表格软件Excel 2016 6.4 Excel 2016的公式与函数6.4.1 公式的概念6.4.2 公式的使用6.4.3 单元格的引用6.4.4 函数的使用6.4.5 快速计算与自动求和 6.5 Excel 2016的数据管理6.5.1 数据排序6.5.2 数据筛选6.5.3 分类汇总6.5.4 分组显示6.5.5 合并计算 6.6 Excel 2016的图表6…

什么品牌的开放式耳机好用?南卡、韶音、漫步者 三款口碑超群机型横评

现如今耳机几乎成为了日常标配,因为选择合适的耳机成为我们不可忽视的需求。开放式耳机凭借其既能沉浸于高品质音乐,又能保持对周围环境的敏锐感知的独特优势,在市场中脱颖而出,尤其受到运动爱好者及追求生活品质的朋友们的喜爱。…

风吸杀虫灯采用新型技术 无公害诱虫捕虫

TH-FD2S】风吸杀虫灯利用害虫的趋光性和对特定波长的光源(如紫外光、蓝光)的敏感性,通过光波引诱害虫成虫扑灯。同时,内置的风扇产生强烈的气流,形成负压区,将害虫迅速吸入到收集器中。害虫在收集器内被风干…

排序算法:快速排序,golang实现

目录 前言 快速排序 代码示例 1. 算法包 2. 快速排序代码 3. 模拟程序 4. 运行程序 5. 从大到小排序 快速排序的思想 快速排序的实现逻辑 1. 选择基准值 (Pivot) 2. 分区操作 (Partition) 3. 递归排序 循环次数测试 假如 10 条数据进行排序 假如 20 条数据进行…

从入门到自动化:一篇文章掌握Python的80%

Python作为一种高级编程语言,以其简洁明了的语法和强大的功能性,在全球编程社区内享有极高的声誉。本文将带领你从Python的基础语法入手,介绍其常用库的应用,以及如何将Python用于数据分析、网络爬虫和简单的自动化任务&#xff0…

模板(c++)part2

目录 1.非类型模板参数 2.特化 2.1函数模板特化 2.2类模板特化 2.2.1全特化 2.2.2偏特化 3.模板分离编译 1.非类型模板参数 注意&#xff0c;假如 #define N 10 template<class T> class A { private:T a[N]; }; 这样的一个类模板&#xff0c;a数组的大小是定死的 …

canvas绘制表格

canvas绘制表格 最近在为公司产品做技术预研&#xff0c;经理让用canvas做一个表格&#xff0c;于是就有了这篇博客。 我们的数据是后端通过MQTT推送过来的 我在代码中也直接使用了 具体MQTT的实现代码&#xff0c;可见博客 在vue使用MQTT 在这里为了方便实用我直接封装成组件…

【中项第三版】系统集成项目管理工程师 | 第 11 章 规划过程组⑥ | 11.15 - 11.17

前言 第11章对应的内容选择题和案例分析都会进行考查&#xff0c;这一章节属于10大管理的内容&#xff0c;学习要以教材为准。本章上午题分值预计在15分。 目录 11.15 规划资源管理 11.15.1 主要输入 11.15.2 主要工具与技术 11.15.3 主要输出 11.16 估算活动资源 11.1…

安装jdk和tomcat

安装nodejs 1.安装nodejs&#xff0c;这是一个jdk一样的软件运行环境 yum -y list installed|grep epel yum -y install nodejs node -v 2.下载对应的nodejs软件npm yum -y install npm npm -v npm set config .....淘宝镜像 3.安装vue/cli command line interface 命令行接…

轻松搞定 Nginx 在 CentOS 和 Ubuntu 上的安装与配置

注&#xff1a;这是对我以前博客进行优化后再次发布的&#xff0c;博客中的截图为以前的。原博客已删除。 如何安装nginx nginx是一款开源、高性能的Web和反向代理服务器&#xff0c;支持HTTP、HTTPS、SMTP、POP3和IMAP协议。由于其轻量级、资源占用少和强大的并发能力&#…

时空预测又爆火了!新SOTA实现零样本精准预测

时空预测又有新突破啦&#xff01;港大、华南理工等提出了时空大模型UrbanGPT&#xff0c;在性能上猛超现有SOTA&#xff0c;实现零样本即可时空预测&#xff01; 另外还有清华的首个通用城市时空预测模型UniST、能即插即用快速适配的时空提示调整机制FlashST...这些效果非常ni…

探索计算器存储器的奥秘:数字记忆的科学

在日常生活中&#xff0c;我们经常使用计算器来执行各种数学运算。但你是否曾想过&#xff0c;当按下每个按键时&#xff0c;计算器是如何记住数字和运算符的&#xff1f;本文将深入探讨计算器存储器的工作原理&#xff0c;揭示其背后的科学原理。 引言&#xff1a;数字世界的…

家庭出游新风尚!格瑞维亚改装大赛创意实用并存

在创新浪潮翻涌的当下&#xff0c;汽车已蜕变为个性化生活的璀璨舞台&#xff0c;格瑞维亚改装共创大赛便是这一变革的推动者。这场大赛&#xff0c;不仅汇聚了400余支创意团队的心血结晶&#xff0c;更将汽车改装的魅力推向了新的高度。它不仅仅是对机械与美学的重塑&#xff…

STM32——EXIT外部中断

一、中断系统 以上就是中断的概念&#xff0c;简单理解就是&#xff1a; 当程序运行过程中&#xff0c;如果有中断源向CPU打报告&#xff0c;CPU就会暂停手下的事情去处理中断源提交的事情&#xff0c;然后处理完了在返回到CPU原来的位置继续处理手上的事情。如果同时有多个中…

浏览器指纹技术:如何更改浏览器指纹?

“指纹信息”是一个人独有的身份象征&#xff0c;而“浏览器指纹”&#xff0c;就是网站和在线平台使用浏览器指纹来收集有关您的浏览器、设备和网络的详细信息&#xff0c;它可以说是你上网的身份象征&#xff0c;可让网站跟踪您的在线行为。 下面我们简单科普浏览器指纹的工…

tomato 靶场

1.主机发现 扫描ip及端口 2.端口扫描 nmap192.168.233.131 有三个开放的端口nmap -sC -sV -O 192.168.233.131 -sC常见漏洞脚本扫描 -sV开放端口服务/版本号 -O操作系统探测 3.目录扫描 DIRECTORY: http://192.168.233.131/antibot_image/ http://192.168.233.131/index.h…