Flink Source 详解

news2024/11/16 9:58:22

Flink Source 详解

原文

flip-27
FLIP-27 介绍了新版本Source 接口定义及架构

相比于SourceFunction,新版本的Source更具灵活性,原因是将“splits数据获取”与真“正数据获取”逻辑进行了分离
在这里插入图片描述

重要部件

Source 作为工厂类,会创建以下两个重要部件

  1. SplitEnumerator

    • 通过createEnumerator创建

    • SplitEnumerator 响应request split请求

      • handleSplitRequest
    • 工作在SourceCoordinator (官方描述如下),可以理解为在JobMaster上运行一个单线程的逻辑,所以需要跟在worker上的reader通过rpc通信

      Where to run the enumerator
      There was a long discussion about where to run the enumerator which we documented in the appendix. The final approach we took was very similar to option 3 with a few differences. The approach is following.
      
      Each SplitEnumerator will be encapsulated in one SourceCoordinator. If there are multiple sources, multiple SourceCoordinator will there be. The SourceCoordinators will run in the JobMaster, but not as part of the ExecutionGraph. In this FLIP, we propose to failover the entire execution graph when the SplitEnumerator fails. A finer grained enumerator failover will be proposed in a later FLIP.
      
  2. SourceReader

    • 通过createReader创建

    • 工作在worker

    • 由于单独实现SourceReader过于复杂,官方抽象了3种比较通用的模型供开发者使用,MySqlSourceReader就是继承了SingleThreadMultiplexSourceReaderBase

      1. Sequential Single Split (File, database query, most bounded splits)
      2. Multi-split multiplexed (Kafka, Pulsar, Pravega, …)
      3. Multi-split multi-threaded (Kinesis, …)
        在这里插入图片描述

      在这里插入图片描述

      在这里插入图片描述

    • 使用了抽象后的类,开发者的关注点集中在实现一个SplitReader

      public interface SplitReader<E, SplitT extends SourceSplit> {
      
          RecordsWithSplitIds<E> fetch() throws InterruptedException;
      
          void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges);
      
          void wakeUp();
      }
      
      1. fetch 获取数据,这里是包含了split信息的record
      2. 响应split改变
      3. 唤醒
  3. RecordEmitter

    1. The RecordsWithSplitIds returned by the SplitReader will be passed to an RecordEmitter one by one.
    2. The RecordEmitter is responsible for the following:
      • Convert the raw record type into the eventual record type
      • Provide an event time timestamp for the record that it processes.
    3. 在 emitRecord 方法中实现

由于通信使用mail风格的rpc(单线程串行),所以响应函数需要保证非阻塞,所以后面可以看到无论enumerator还是reader的最终响应都是在异步线程池中

Non-blocking progress methods, to it supports running in an actor/mailbox/dispatcher style operator

MysqlSource 举例

以flink cdc中的MysqlSource来举例分析

  1. MysqlSource
    • 通过 createEnumerator 创建 MySqlSourceEnumerator

      • 初始化调用start
        • 调用splitAssigner.open()
          • splitAssigner 是获取/分配split动作的真正实现
            • 创建异步线程,填充remainingSplits
      • handleSplitRequest 响应空闲worker的请求
        • assignSplits
          • splitAssigner.getNext()
            • 从 remainingSplits 拿一个可用的split
      • 调用 context.assignSplit 发送 AddSplitEvent
      • MySqlSourceEnumerator 中 splitAssigner 的实现说明
        • splitAssigner 默认实现是 MySqlHybridSplitAssigner
          • hybrid的含义,启动分为两个步骤 1. 读取全量数据 2. 全量数据读取完毕后读取增量数据。将两种模式混合在一起被称为hybird。所以MySqlSnapshotSplitAssigner可以创建两种split
            1. 通过MySqlSnapshotSplitAssigner创建存量数据的split
              • 在读取存量数据时通过chunkSplitter切分为多个split,之后分发给多个reader并行读取
                • chunkSplitter 通过 chunkKey 的范围将存量数据切分
                • 用户可以手动设置chunkKey,否则使用主key作为chunkKey,切分split
            2. 通过 createBinlogSplit 创建增量数据的split
              • 只assign一次binlog的split
              • 只能分发给一个reader,所以在进入增量模式后flink实际所有并行度上只有一个source有数据
                在这里插入图片描述
    • 通过 createReader 创建 MySqlSourceReader

      • 创建 SingleThreadFetcherManager 传入 elementQueue splitReaderSupplier
        • elementQueue: io线程和主线程公用队列,io线程写,主线程读
        • splitReaderSupplier: split reader的工厂
        • SingleThreadFetcherManager 启动后创建线程池
      • sourceOperator 收到 AddSplitEvent 调用 sourceReader.addSplits 这里 sourceReader 是 MySqlSourceReader
        • readerBase 中会调用 splitFetcherManager.addSplits(splits);
          • 由于使用的是 SingleThreadFetcherManager,所以addSplits中永远看到只同时存在一个fetcher
            • fetcher 初始化时加入默认任务 FetchTask 构造的时候传入 elementQueue 传入构造好的 splitReader
            • fetcher addSplits时加入任务 AddSplitsTask
            • fetcher 启动时调用 startFetcher
              • 调用 executors.submit(fetcher); 提交到线程池
              • 线程池中运行 runOnce
                • FetchTask 调用 splitReader.fetch() 获取records 写入 elementQueue
      • 主线程 SourceReaderBase 中的 pollNext 会被框架调用
        • 调用 getNextFetch
          • elementsQueue.poll() 取得 records
            在这里插入图片描述

其他

在Flink CDC 3.0 中

Flink Composer 中使用 WatermarkStrategy.noWatermarks()

 return env.fromSource(
                            sourceProvider.getSource(),
                            WatermarkStrategy.noWatermarks(),
                            sourceDef.getName().orElse(generateDefaultSourceName(sourceDef)),
                            new EventTypeInfo())
                    .setParallelism(sourceParallelism);

很合理,因为pipeline的定义中不会出现聚合函数 window函数

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2241428.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

路漫漫其修远兮,吾将上下而求索---第一次使用github的过程记录和个人感受

文章目录 1.仓库位置2.新建仓库3.配置仓库4.克隆和上传5.推荐文章和我的感受 1.仓库位置 这个仓库的位置就是在我们的这个个人主页的右上角&#xff1b;如果是第一次注册账号的话&#xff0c;这个主页里面肯定是不存在仓库的&#xff0c;需要我们自己手动的进行创建&#xff1…

npm list -g --depth=0(用来列出全局安装的所有 npm 软件包而不显示它们的依赖项)

您提供的命令 npm list -g --depth0 是在 Node Package Manager (npm) 的上下文中使用的&#xff0c;用来列出全局安装的所有 npm 软件包而不显示它们的依赖项。 这是它的运作方式&#xff1a; npm list -g --depth0-g: 指定列表应包括全局安装的软件包。--depth0: 限制树形结…

tdengine学习笔记

官方文档&#xff1a;用 Docker 快速体验 TDengine | TDengine 文档 | 涛思数据 整体架构 TDENGINE是分布式&#xff0c;高可靠&#xff0c;支持水平扩展的架构设计 TDengine分布式架构的逻辑结构图如下 一个完整的 TDengine 系统是运行在一到多个物理节点上的&#xff0c;包含…

K8S单节点部署及集群部署

1.Minikube搭建单节点K8S 前置条件&#xff1a;安装docker&#xff0c;注意版本兼容问题 # 配置docker源 wget https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo -O /etc/yum.repos.d/docker-ce.repo# 安装docker环境依赖 yum install -y yum-utils device-m…

以往运维岗本人面试真题分享

以下是本人面试运维岗的一些面试经历&#xff0c;在此做个记录分享 目录 TCP/IP三次握手 IPtables IPtables四表五链都是什么&#xff1f; nat端口如何做&#xff1f; 开放本机的80端口该如何做&#xff1f; 如何在单用户模式下引导Centos&#xff1f; nginx轮询模式都有…

STM32 串口输出调试信息

软硬件信息 CubeMX version 6.12.1Keil uVision V5.41.0.0 注意 串口有多种&#xff1a; TTL232485 串口的相关知识&#xff1a; 01-【HAL库】STM32实现串口打印&#xff08;printf方式) &#xff0c; 内含 TTL 和 232 区别。 我把 232 串口连进 STM32 串口助手收到的信息…

Python 三种方式实现自动化任务

在这篇文章中&#xff0c;我们将介绍一些用Python实现机器人过程自动化的包。机器人流程自动化&#xff08;Robotic process automation&#xff0c;简称RPA&#xff09;是指将鼠标点击和键盘按压自动化的过程&#xff0c;即模拟人类用户的操作。RPA用于各种应用程序&#xff0…

Android ART知多少?

Android 虚拟机 ART&#xff08;Android Runtime&#xff09;是 Android 平台上的应用程序运行时环境&#xff0c;用于执行应用程序的字节码。ART 自 Android 5.0&#xff08;Lollipop&#xff09;开始取代了 Dalvik&#xff0c;成为 Android 的默认运行时环境。本文将从以下几…

Vulnhub靶场 Billu_b0x 练习

目录 0x00 准备0x01 主机信息收集0x02 站点信息收集0x03 漏洞查找与利用1. 文件包含2. SQL注入3. 文件上传4. 反弹shell5. 提权&#xff08;思路1&#xff1a;ssh&#xff09;6. 提权&#xff08;思路2&#xff1a;内核&#xff09;7. 补充 0x04 总结 0x00 准备 下载链接&#…

软间隔支持向量机支持向量的情况以及点的各种情况

软间隔支持向量 ​ 这一节我们要回答的问题是&#xff1f;如何判断一个点是软间隔支持向量机中的支持向量&#xff0c;在硬间隔支持向量机中&#xff0c;支持向量只需要满足一个等式&#xff1a; y i ( w T x i b ) − 1 0 y_i(w^Tx_i b) -1 0 yi​(wTxi​b)−10 ​ 在软间…

PCA 原理推导

针对高维数据的降维问题&#xff0c;PCA 的基本思路如下&#xff1a;首先将需要降维的数据的各个变量标准化&#xff08;规范化&#xff09;为均值为 0&#xff0c;方差为 1 的数据集&#xff0c;然后对标准化后的数据进行正交变换&#xff0c;将原来的数据转换为若干个线性无关…

在Ubuntu 24.04 LTS上安装飞桨PaddleX

前面我们介绍了《在Windows用远程桌面访问Ubuntu 24.04.1 LTS》本文接着介绍安装飞桨PaddleX。 PaddleX 3.0 是基于飞桨框架构建的一站式全流程开发工具&#xff0c;它集成了众多开箱即用的预训练模型&#xff0c;可以实现模型从训练到推理的全流程开发&#xff0c;支持国内外多…

Web_前端_HTML入门学习的案例案例1

HTML入门学习的案例 来源: HTML入门学习的案例_给学生讲html内容案例-CSDN博客 案例1&#xff1a;hello.html <html><body><title>html技术</title></body><body>hello</body> </html>&#xff08;但是有乱码&#xff09; …

【C#】C#编程入门指南:构建你的.NET开发基础

文章目录 前言&#xff1a;1. C# 开发环境 VS的基本熟悉2. 解决方案与项目的关系3. 编辑、编译、链接、运行4. 托管代码和CLR4.1 CLR&#xff1a;4.2 C# 代码第编译过程&#xff08;两次编译的&#xff09; 5. 命名空间6. 类的组成与分析7. C# 的数据类型7.1 值类型7.2 引用类型…

115页PDF | 埃森哲_XX集团信息化能力成熟度评估及能力提升方案(限免下载)

一、前言 这份报告是埃森哲_XX集团信息化能力成熟度评估及能力提升方案&#xff0c;报告首先分析了集团的战略规划&#xff0c;包括调整优化期、转型升级期和跨越发展期的目标&#xff0c;然后识别了集团面临的内部挑战和外部压力&#xff0c;如管控体系不完善、业务板块多样化…

面试时问到软件开发原则,我emo了

今天去一个小公司面试&#xff0c;面试官是公司的软件总监&#xff0c;眼镜老花到看笔记本电脑困难&#xff0c;用win7的IE打开leetcode网页半天打不开&#xff0c;公司的wifi连接不上&#xff0c;用自己手机热点&#xff0c;却在笔记本电脑上找不到。还是我用自己的手机做热点…

Wi-Fi背后的工作原理与技术发展历程介绍【无线通信小百科】

1个视频说清楚WIFI&#xff1a;频段/历程/技术参数/常用模块 智能手机拥有率越来越高的今天&#xff0c;大家已经习惯了通过无线网络上网的方式。除了在外面需要用手机流量&#xff0c;我们通常在家里或者机场&#xff0c;商场都可以通过Wi-Fi连接上网。本期文章将为大家介绍Wi…

【MySQL 保姆级教学】详细讲解视图--(15)

视图 1. 为什么要有视图&#xff1f;2.视图的定义和特点3. 创建视图4. 视图的使用举例4.1 创建表并插入数据4.2 举例 5. 视图和基表之间有什么联系呢&#xff1f; 1. 为什么要有视图&#xff1f; 当我们频繁地使用用多表查询和复合查询出的结果时&#xff0c;就需要频繁的使用…

Python中的HTTP协议

文章目录 一. 网址URL二. HTTP协议1. HTTP协议的概念2. HTTP协议的作用3. HTTP请求报文与响应报文① HTTP请求报文Ⅰ. GET请求报文格式Ⅱ. GET请求报文分析Ⅲ. POST请求报文格式Ⅳ. POST请求报文分析Ⅴ. GET与POST请求报文总结 ② HTTP响应报文Ⅰ. HTTP响应报文格式Ⅱ. HTTP响应…

108. UE5 GAS RPG 实现地图名称更新和加载关卡

在这一篇里&#xff0c;我们将实现对存档的删除功能&#xff0c;在删除时会有弹框确认。接着实现获取玩家的等级和地图名称和存档位置&#xff0c;我们可以通过存档进入游戏&#xff0c;玩家在游戏中可以在存档点存储存档。 实现删除存档 删除存档需要一个弹框确认&#xff0…