Flink源码解析(一、source原理)

news2025/1/21 21:56:58

文章目录

  • 背景
  • 逻辑原理
    • connector架构
    • sql处理阶段
  • 代码实例
  • 代码debug
  • 参考文献

背景

source/sink 是flink最核心的部分之一,通过对其实现原理的学习,结合源码debug,有助于加深对框架处理过程的理解,以及架构设计上的提升。

逻辑原理

如果我们对自己对接一个数据源,核心的话就是连接器connector,比如关系型数据库就是jdbc。

connector架构

flink官方connector的架构如下
在这里插入图片描述

  • MetaData
    将 sql create source table 转化为实际的 CatalogTable,对应代码RelNode
  • Planning
    创建 RelNode 的过程中使用 SPI 将所有的 source(DynamicTableSourceFactory)\sink(DynamicTableSinkFactory) 工厂动态加载,获取到 connector = kafka,然后从所有 source 工厂中过滤出名称为 kafka 并且 继承自 DynamicTableSourceFactory.class 的工厂类 KafkaDynamicTableFactory,使用 KafkaDynamicTableFactory 创建出 KafkaDynamicSource
  • Runtime
    KafkaDynamicSource 创建出 FlinkKafkaConsumer,负责flink程序实际运行。

sql处理阶段

因为文章采用flink sql作为实例,所以先了解下sql在集群中经历的大致步骤,后续结合源码有助理解。
在这里插入图片描述
从图中可以看出,一段查询 SQL / 使用TableAPI 编写的程序(以下简称 TableAPI 代码)从输入到编译为可执行的 JobGraph 主要经历如下几个阶段

  1. 将 SQL文本 / TableAPI 代码转化为逻辑执行计划(Logical Plan)
  2. Logical Plan 通过优化器优化为物理执行计划(Physical Plan)
  3. 通过代码生成技术生成 Transformations 后进一步编译为可执行的 JobGraph 提交运行

代码实例

版本flink-1.13.1

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class KafkaSourceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        tEnv.executeSql(
                "CREATE TABLE KafkaSourceTable (\n"
                        + "  `f0` STRING,\n"
                        + "  `f1` STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'topic',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'properties.group.id' = 'testGroup',\n"
                        + "  'format' = 'json'\n"
                        + ")"
        );
        Table t = tEnv.sqlQuery("SELECT * FROM KafkaSourceTable");
        tEnv.toAppendStream(t, Row.class).print();
        env.execute();
    }
}

代码debug

  1. 从tEnv.sqlQuery方法断点进入在这里插入图片描述

  2. 解析sql语法
    后面回根据解析返回的操作表类型创建对应的Table
    在这里插入图片描述

  3. parse主要工作
    获取语法解析器parser,查询计划实现类planner。 将sql语句解析成生成AST抽象语法树SqlNode(实际SqlSelector),之后调用convert转换方法。
    在这里插入图片描述

  4. convert处理
    首先validate验证SqlNode的正确性。
    在这里插入图片描述
    之后根据sql kind为QUERY进入converter.convertSqlQuery方法
    在这里插入图片描述

  5. convertSQLQuery处理
    生成逻辑计划,作用是SqlNode–>RelNode。
    在这里插入图片描述

  6. rel方法
    调用sqlToRelConverter.convertQuery方法。
    在这里插入图片描述
    真正的实现是在 convertQueryRecursive() 方法中完成的。
    在这里插入图片描述
    实际根据kind调用convertSelect方法
    在这里插入图片描述

  7. 调用convertIdentifier
    这中间过程省略一部分,实际调用到convertIdentifier方法。参数BlackBoard是对select进行转换时的一个临时工作空间,可以临时记录下转换过程中需要的信息,比如select依赖的scope、当前的root节点、当前节点是否是top节点等。这里还会创建CatalogSourceTable 类,此类继承自 FlinkPreparingTableBase,负责将 Calcite 的 RelOptTable转化为TableSourceTable
    在这里插入图片描述

  8. toRel
    这里会根据指定的connector,创建对应的tableSource,就和我们connector架构部分关联上了。发现 tableSource 已经是 KafkaDynamicSource。另外可以发现创建table source参数catalogTable,包含了所有 sql create source table 中信息的 catalogTable 变量传入了。
    在这里插入图片描述

  9. createDynamicTableSource
    使用 SPI 将所有的 source(DynamicTableSourceFactory)\sink(DynamicTableSinkFactory) 工厂动态加载,然后根据factoryClass过滤出KafkaDynamicTableFactory
    在这里插入图片描述
    10.createTableSource
    使用 kafka 工厂对象创建出 kafka source。在这里插入图片描述

  10. 获取format
    进入factory具体实现可以看到 KafkaDynamicTableFactory.createDynamicTableSource 中调用 KafkaDynamicTableFactory.createKafkaTableSource 来创建 KafkaDynamicSource。 另外这里还有一个重要点就是获取key value反序列化schema
    在这里插入图片描述
    在这里插入图片描述
    spi机制获取factory后,通过参数中的format=json过滤。
    在这里插入图片描述


参考文献

https://developer.aliyun.com/article/765311
https://cloud.tencent.com/developer/article/1864657

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/40606.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

详细介绍 Oracle中的Materialized Views(物化视图/快照)

A materialized view (snapshot) is a table segment whose contents are periodically refereshed based on query (against a local or remote table)(针对的本地或者远程表) The simplest way to achieve replication of data between sites for against remote tables: ma…

中断上下文和进程上下文

中断上下文 参考博客:(https://blog.csdn.net/AndroidBBC/article/details/81911065) 中断上半部分,中断触发;中断下半部分,中断执行。 中断执行一般有tasklet、工作队列实现 工作队列机制 工作队列所执行的中断代码会表现出进…

Linux | 第一篇——常见指令汇总【超全、超详细讲解】

Linux之常见指令🌳前言💻操作系统的概念💻Linux的使用环境介绍🌳基本指令汇总一、【whoami】指令二、【pwd】指令三、【mkdir】指令四、【touch】指令五、【ls】指令1、拓展:文件的概念2、命令 - 命令选项六、【cd】指…

AtCoder Beginner Contest 260 G.Scalene Triangle Area(花式二维差分/二维线段树)

题目 n*n的网格(n<2e3)&#xff0c; 每个网格内的字符是O或者X&#xff0c;其中O表示(i,j)上有一个棋子&#xff0c;X表示没有 位于(s,t)棋子覆盖住了方格(u,v)&#xff0c;当且仅当&#xff1a; 1. 2. 3. q(q<2e5)次询问&#xff0c;第i次给出一个方格位置(xi…

如何给在 SAP Business Application Studio 里开发的 OData 服务准备测试数据试读版

在开始本步骤的学习之前&#xff0c;请大家务必完成前一步骤1. SAP Business Application Studio 里创建一个基于 CAP 模型的最简单的 OData 服务的学习。换言之&#xff0c;大家已经在 SAP Business Technology Platform 上的 Business Application Studio 里&#xff0c;创建…

Python实现基于Optuna超参数自动优化的xgboost回归模型(XGBRegressor算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 Optuna是一个开源的超参数优化(HPO)框架&#xff0c;用于自动执行超参数的搜索空间。 为了找到最佳的超…

docker 部署多个前端vue项目

文章目录一、docker 部署前端项目方案1. 方案12. 方案2二、Nginx配置运行2.1. 拉取nginx镜像2.2. 创建配置目录2.3. 创建Nginx容器三、部署前端项目3.1. 压缩3.2. 上传3.3. 验证附录index.html50x.htmlnginx.conf一、docker 部署前端项目方案 1. 方案1 一个docker容器对应一个…

java中“冷门”工具类的总结

文章目录前言一些不常用的工具类不可变集合多值MapTable表Lists、Maps、Sets字符串操作BagLazyList双向Map并发集合小总结CopyOnWriteArrayListConcurrentHashMap前言 最近挖掘了一些在项目中不常用的工具类&#xff0c;有些实用性还是很高的&#xff0c;特此总结一下。 另外又…

mysql中的这些日志,你都知道吗 2?

上一篇文章&#xff0c;我们介绍了binlog和redo log这两种日志&#xff0c;对这两种日志不熟悉的老铁可以看下"mysql中的这些日志&#xff0c;你都知道吗",在上篇文章的末尾&#xff0c;作者还留了一个问题&#xff1a;binlog 和 redo log两个相互独立的日志模块&…

kube-ovn安装与卸载

1.环境准备 Kube-OVN 是一个符合 CNI 规范的网络组件&#xff0c;其运行需要依赖 Kubernetes 环境及对应的内核网络模块。 以下是通过测试的操作系统和软件版本&#xff0c;环境配置和所需要开放的端口信息。1.1 软件版本 Kubernetes > 1.16 且 < 1.24&#xff0c;推荐…

RK3568平台开发系列讲解(音频篇)Android AudioRecord 采集音频

🚀返回专栏总目录 文章目录 一、Android 平台的音频采集技术选型1.1、SDK 层提供的采集方法1.2、NDK 层提供的采集方法二、AudioRecord 采集音频沉淀、分享、成长,让自己和他人都能有所收获!😄 📢 Android 平台音频采集的技术选型,在 SDK 层和 NDK 层各有两套音频采集…

【MySQL】MySQL参数调优与实战详解(调优篇)(实战篇)(MySQL专栏启动)

&#x1f4eb;作者简介&#xff1a;小明java问道之路&#xff0c;专注于研究 Java/ Liunx内核/ C及汇编/计算机底层原理/源码&#xff0c;就职于大型金融公司后端高级工程师&#xff0c;擅长交易领域的高安全/可用/并发/性能的架构设计与演进、系统优化与稳定性建设。 &#x1…

dataloader重构与keras入门体验

原创文章第117篇&#xff0c;专注“个人成长与财富自由、世界运作的逻辑&#xff0c; AI量化投资”。 北京疫情昨天突破4000&#xff0c;社会面800。看来三天的预期过于乐观了&#xff0c;不知道如何发展。如同资本市场的短期走势&#xff0c;没有人可以预测。但往前看三年&am…

Python性能优化

正文 python为什么性能差&#xff1a; 当我们提到一门编程语言的效率时&#xff1a;通常有两层意思&#xff0c;第一是开发效率&#xff0c;这是对程序员而言&#xff0c;完成编码所需要的时间&#xff1b;另一个是运行效率&#xff0c;这是对计算机而言&#xff0c;完成计算任…

Elasticsearch 8.X DSL 如何优化更有助于提升检索性能?

1、企业级实战 DSL&#xff08;数据已经脱敏&#xff09; 2、大家可以看一下&#xff0c;能发现哪些问题&#xff1f; 根据我的实战和咨询经验&#xff0c;我发现如下几个问题。当然&#xff0c;这是在和球友交流确认问题之后总结出来的。2.1 问题1&#xff1a;bool 组合嵌套过…

动态加载布局的技巧

文章目录动态加载布局的技巧使用限定符使用最小宽度限定符动态加载布局的技巧 使用限定符 在平板上面大多数时候采用的双页的模式,程序会在左侧列表上显示一个包含子项列表,右侧的面版会显示详细的内容的因为平板具有足够大的屏幕.完全能够显示两页的内容.但是在手机上手机只能…

事业编招聘:雄安新区公开选聘专业骨干人才

河北雄安新区公开选聘专业骨干人才公告 根据河北雄安新区建设发展工作需要&#xff0c;决定面向全国机关、企事业单位选聘部分专业骨干人才&#xff0c;现将有关事项公告如下&#xff1a; 一、选聘计划 共选聘20名专业骨干人才。 二、选聘范围 全国机关和企事业单位工作人员。…

Java包装类

为什么有包装类&#xff1f; 在Java中&#xff0c;基本数据类型不是继承自Object&#xff0c;为了在泛型中可以支持基本数据类型&#xff0c;Java给每个基本数据类型都对应了一个包装类。【基本数据类型不符合面向对象思想&#xff0c;基本类型不是对象&#xff0c;从而基本数据…

机器学习中的数学原理——梯度下降法(最速下降法)

好久没更新了&#xff0c;确实是有点懒了&#xff0c;主要是这两天返乡在隔离&#xff08;借口&#xff09;。这个专栏主要是用来分享一下我在机器学习中的学习笔记及一些感悟&#xff0c;也希望对你的学习有帮助哦&#xff01;感兴趣的小伙伴欢迎私信或者评论区留言&#xff0…

开放一批PCB资源(二)

这些板卡&#xff0c;都已经停产&#xff0c;现其PCB和原理图对外开放&#xff0c;都是cadence格式。 有需要的加我微信联系。&#xff08;微信&#xff1a;18633364981&#xff09; 这是开放的第二批&#xff0c;后续还有。这一批的价格象征性的收费每个 2000元。 这些板卡…