49、Flink 的数据源的 SplitReader API 详解

news2025/1/13 15:53:12
SplitReader API
a)概述

核心的 SourceReader API 是完全异步的,但实际上,大多数 Sources 都会使用阻塞的操作,例如客户端(如 KafkaConsumer)的 poll() 阻塞调用,或者分布式文件系统(HDFS, S3等)的阻塞I/O操作;为了使其与异步 Source API 兼容,这些阻塞(同步)操作需要在单独的线程中进行,并在之后将数据提交给 reader 的异步线程。

SplitReader 是基于同步读取/轮询的 Source 的高级(high-level)API,例如 file source 和 Kafka source 的实现等

核心是上面提到的 SourceReaderBase 类,其使用 SplitReader 并创建提取器(fetcher)线程来运行 SplitReader,该实现支持不同的线程处理模型。

b)SplitReader

SplitReader API 只有以下三个方法:

  • 阻塞式的提取 fetch() 方法,返回值为 RecordsWithSplitIds。
  • 非阻塞式处理分片变动 handleSplitsChanges() 方法。
  • 非阻塞式的唤醒 wakeUp() 方法,用于唤醒阻塞中的提取操作。

SplitReader 仅需要关注从外部系统读取记录,因此比 SourceReader 简单得多。

c)SourceReaderBase

常见的 SourceReader 实现方式如下:

  • 有一个线程池以阻塞的方式从外部系统提取分片。
  • 解决内部提取线程与其他方法调用(如 pollNext(ReaderOutput))之间的同步。
  • 维护每个分片的水印(watermark)以保证水印对齐。
  • 维护每个分片的状态以进行 Checkpoint。

为了减少开发新的 SourceReader 所需的工作,Flink 提供了 SourceReaderBase 类作为 SourceReader 的基本实现。 SourceReaderBase 已经实现了上述需求,要重新编写新的 SourceReader,只需要让 SourceReader 继承 SourceReaderBase,而后完善一些方法并实现 SplitReader。

d)SplitFetcherManager

SourceReaderBase 支持几个开箱即用的线程模型,取决于 SplitFetcherManager 的行为模式; SplitFetcherManager 创建和维护一个分片提取器(SplitFetchers)池,同时每个分片提取器使用一个 SplitReader 进行提取,它还决定如何分配分片给分片提取器。

如下所示,一个 SplitFetcherManager 可能有固定数量的线程,每个线程对分配给 SourceReader 的一些分片进行抓取。

在这里插入图片描述

以下代码片段实现了此线程模型。

/**
 * 一个SplitFetcherManager,它具有固定数量的分片提取器,
 * 并根据分片ID的哈希值将分片分配给分片提取器。
 */
public class FixedSizeSplitFetcherManager<E, SplitT extends SourceSplit> 
        extends SplitFetcherManager<E, SplitT> {
    private final int numFetchers;

    public FixedSizeSplitFetcherManager(
            int numFetchers,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        super(elementsQueue, splitReaderSupplier);
        this.numFetchers = numFetchers;
        // 创建 numFetchers 个分片提取器.
        for (int i = 0; i < numFetchers; i++) {
            startFetcher(createSplitFetcher());
        }
    }

    @Override
    public void addSplits(List<SplitT> splitsToAdd) {
        // 根据它们所属的提取器将分片聚集在一起。
        Map<Integer, List<SplitT>> splitsByFetcherIndex = new HashMap<>();
        splitsToAdd.forEach(split -> {
            int ownerFetcherIndex = split.hashCode() % numFetchers;
            splitsByFetcherIndex
                    .computeIfAbsent(ownerFetcherIndex, s -> new ArrayList<>())
                    .add(split);
        });
        // 将分片分配给它们所属的提取器。
        splitsByFetcherIndex.forEach((fetcherIndex, splitsForFetcher) -> {
            fetchers.get(fetcherIndex).addSplits(splitsForFetcher);
        });
    }
}

使用这种线程模型的SourceReader可以像下面这样创建:

public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, SplitStateT>
        extends SourceReaderBase<E, T, SplitT, SplitStateT> {

    public FixedFetcherSizeSourceReader(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitFetcherSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                elementsQueue,
                new FixedSizeSplitFetcherManager<>(
                        config.getInteger(SourceConfig.NUM_FETCHERS),
                        elementsQueue,
                        splitFetcherSupplier),
                recordEmitter,
                config,
                context);
    }

    @Override
    protected void onSplitFinished(Map<String, SplitStateT> finishedSplitIds) {
        // 在回调过程中对完成的分片进行处理。
    }

    @Override
    protected SplitStateT initializedState(SplitT split) {
        ...
    }

    @Override
    protected SplitT toSplitType(String splitId, SplitStateT splitState) {
        ...
    }
}

SourceReader 的实现还可以在 SplitFetcherManagerSourceReaderBase 的基础上编写自己的线程模型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1810834.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

包装机故障排查与修复方法:确保生产线的稳定运行

在现代化的工业生产中&#xff0c;包装机作为生产线上的重要一环&#xff0c;其稳定运行直接关系到产品的质量和生产效率。然而&#xff0c;包装机在使用过程中难免会遇到各种故障&#xff0c;如何快速准确地排查并修复这些故障&#xff0c;确保生产线的稳定运行&#xff0c;成…

【Echarts系列】带图片的饼图

【Echarts系列】带图片的饼图 序前提说明示例数据格式代码动态旋转图片 序 为了节省后续开发学习成本&#xff0c;这个系列将记录我工作所用到的一些echarts图表。 前提说明 因为饼图中间需要添加图片&#xff0c;所以比较特殊&#xff0c;对于饼图中间数据的对齐很容易出现…

【npm】console工具(含胶囊,表格,gif图片)

这是一款控制台花样输出工具 相对丰富的输出方式 文本输出属性值输出胶囊样式输出表格输出图片输出&#xff08;含动图&#xff09; 安装 npm install v_aot引用 import v_aot from "v_aot";字段说明 字段类型属性字符串值字符串类型default 、 primary 、 suc…

Centos用自定义java安装包替换系统java版本

一.下载自定义安装java压缩包&#xff0c;比如 jdk-8u191-linux-x64.tar.gz 二.解压到/usr/java/ 下 tar -zxvf jdk-8u191-linux-x64.tar.gz 三.执行命令 update-alternatives --install /usr/bin/java java /usr/java/jdk1.8.0_191/bin/java 300 update-alternatives --i…

【全开源】旅游系统小程序(Uniapp+FastAdmin+ThinkPHP)

&#x1f308;暑假到来&#xff0c;旅游系统小程序助你畅游无忧&#xff01;&#x1f392; 旅游系统&#xff0c;包含消费者端&#xff08;手机端&#xff09;、机构工作人员&#xff08;手机端&#xff09;、机构端&#xff08;PC&#xff09;、平台管理端&#xff08;PC&…

Python基础速成

文件操作 文件读取 with open语法 文件写入 注意事项 文件追加 异常、模块与包 捕获异常 捕获方法 捕获传递 模块的导入与自定义 定义 导入方式 自定义 测试模块 注意事项 python包 定义 操作 第三方包 定义 pip指令安装包

实现思路:Vue 子组件高度不固定下实现瀑布流布局

实现思路&#xff1a;Vue 子组件高度不固定下实现瀑布流布局 一、瀑布流布局基础实现原理 在深入解说不定高度子组件的瀑布流如何实现之前&#xff0c;先大体说一下子组件高度固定已知的这种实现原理&#xff1a; 有一个已知组件高度的数组。定义好这个瀑布流的列数&#xff…

新概念英语视频百度云,新概念英语视频百度网盘,新概念1-4册

在现今数字化时代&#xff0c;英语学习资源丰富多样&#xff0c;其中新概念英语视频因其深入浅出的教学风格和丰富多样的学习内容&#xff0c;备受广大英语学习者的青睐。本文旨在为广大英语学习者提供一份详尽的新概念英语视频下载指南&#xff0c;帮助大家轻松获取优质学习资…

Macbook M芯片Homebrew与git的安装与配置

Macbook M芯片Homebrew与git的安装与配置 Homebrew的安装与配置 搜索Homebrew; 找到如下网址https://brew.sh/ 把以上命令复制到终端 执行后&#xff0c;发现并不能下载&#xff1b; 如果你像我一样也是不通的&#xff0c;可以使用国内源,将如下命令复制到终端&#xff1a;…

【C++题解】1750 - 有0的数

问题&#xff1a;1750 - 有0的数 类型&#xff1a;简单循环 题目描述&#xff1a; 请求出 1∼n 中含有数字 0 的数&#xff0c;有多少个&#xff1f; 输入&#xff1a; 一个整数 n&#xff08;n≤999&#xff09; 。 输出&#xff1a; 一个整数&#xff0c;代表 1∼n 中含…

关于Spring Security的CORS

目录 一、CORS是什么 二、同源安全策略 三、Spring Security中CORS的开启 四、其它处理方法 一、CORS是什么 CORS&#xff08;Cross-Origin Resource Sharing&#xff0c;跨源/域资源共享 &#xff09;是一个W3C标准&#xff0c;一种允许当前域&#xff08;domain&#xff…

苹果WWDC揭晓AI系统、电脑等设备系统全线更新,iPhone将接入ChatGPT

iOS 18可自定义主屏幕、可卫星发短信、钱包App新增点击付款Tap to Cash功能、照片App大改、支持RCS短信&#xff1b;iPad首次有原生计算器App&#xff0c;iPad结合Apple Pencil拥有数学笔记功能&#xff1b;新版macOS可通过Mac查看控制iPhone&#xff0c;Safari有AI驱动的高亮显…

Docker:安装 Orion-Visor 服务器运维的技术指南

请关注微信公众号&#xff1a;拾荒的小海螺 博客地址&#xff1a;http://lsk-ww.cn/ 1、简述 Orion-Visor 是一种用于管理和监控容器的工具。它提供了一个直观的界面&#xff0c;用于查看容器的状态、资源使用情况以及日志等信息。在这篇技术博客中&#xff0c;我们将介绍如何…

01.FastLED库基础

FastLED库基础 FastLED库HSV颜色 HSV颜色基本概念 HSV颜色简介 HSV(Hue, Saturation, Value)是根据颜色的直观特性由A. R. Smith在1978年创建的一种颜色表达方法。该方法中的三个参数分别是&#xff1a;色调&#xff08;H&#xff09;&#xff0c;饱和度&#xff08;S&#…

Jenkins三种构建类型

目录 传送门前言一、概念二、前置处理&#xff08;必做&#xff09;1、赋予777权限2、让jenkins用户拥有root用户的kill权限3、要运行jar包端口号需要大于1024 三、自由风格软件项目&#xff08;FreeStyle Project&#xff09;&#xff08;推荐&#xff09;三、Maven项目&#…

最全面又最浅显易懂的Langchain快速上手教程(下)

最全面又最浅显易懂的Langchain快速上手教程&#xff08;下&#xff09; 三. 深入Langchain 1. 架构设计 从上文知道Langchain在架构上使用了从抽象、到具体、再到整合适配的三层架构&#xff0c;这种一层一层逐渐具体的设计最大可能性的保证了架构的可扩展性和维护性。同时…

【Python】 探索 Python 中的 Ellipsis 对象:一个神奇的省略号

基本原理 在 Python 中&#xff0c;Ellipsis 对象是一个特殊的内置对象&#xff0c;它通常用三个连续的点 ... 来表示。这个对象在 Python 中有几个特定的用途&#xff0c;尤其是在切片操作和迭代器表达式中。虽然它看起来像是一个普通的省略号&#xff0c;但它实际上是 Pytho…

DeepSORT(目标跟踪算法)中自由度决定卡方分布的形状

DeepSORT&#xff08;目标跟踪算法&#xff09;中自由度决定卡方分布的形状 flyfish 重要的两个点 自由度决定卡方分布的形状&#xff08;本文&#xff09; 马氏距离的平方在多维正态分布下服从自由度为 k 的卡方分布 独立的信息 在统计学中&#xff0c;独立的信息是指数据…

MySQL的group by与count(), *字段使用问题

文章目录 问题group by到底做了什么举个例子简单来说为什么select字段&#xff0c;count()不能和*共同使用总结 问题 这是一段摘抄自MySQL官网的文字。其大致意思是MySQL拓展了group by的使用&#xff0c;MySQL允许选择没有出现在group by中的字段。换句话说&#xff0c;标准SQ…

覆盖路径规划经典算法 The Boustrophedon Cellular Decomposition 论文及代码详解

2000年一篇论文 Coverage of Known Spaces: The Boustrophedon Cellular Decomposition 横空出世&#xff0c;解决了很多计算机和机器人领域的覆盖路径问题&#xff0c;今天我来详细解读这个算法。 The Boustrophedon Cellular Decomposition 算法详解 这篇论文标题为"C…