【天衍系列 01】深入理解Flink的 FileSource 组件:实现大规模数据文件处理

news2024/11/24 6:25:23

文章目录

  • 01 基本概念
  • 02 工作原理
  • 03 数据流实现
  • 04 项目实战
    • 4.1 项目结构
    • 4.2 maven依赖
    • 4.3 StreamFormat读取文件数据
    • 4.4 BulkFormat读取文件数据
    • 4.5 使用小结
  • 05 数据源比较
  • 06 总结

01 基本概念

Apache Flink 是一个流式处理框架,被广泛应用于大数据领域的实时数据处理和分析任务中。在 Flink 中,FileSource 是一个重要的组件,用于从文件系统中读取数据并将其转换为 Flink 的数据流。本文将深入探讨 FileSource 的工作原理、用法以及与其他数据源的比较。

02 工作原理

FileSource 是 Flink 提供的一种用于从文件系统中读取数据的源。它能够处理各种类型的文件,包括文本文件、压缩文件、序列文件等。FileSource 的工作原理可以概括为以下几个步骤:

1.文件分配(File Assignment)

在 Flink 集群中,每个任务都会负责读取文件的一个分片。FileSource 会根据文件的大小和数量将文件分配给不同的任务进行处理。

2.并行读取(Parallel Reading)

每个任务会并行地读取分配给它的文件分片。这意味着文件中的数据会被同时读取,从而提高了整体的读取速度和处理效率。

3.数据解析(Data Parsing)

读取的数据会经过解析器进行解析,将其转换为 Flink 中的数据结构,如 DataSet 或 DataStream。

4.数据分发(Data Distribution)

解析后的数据会被分发到后续的算子中进行进一步的处理和分析。

03 数据流实现

  • 有界流(Bounded Streams)

    有界流是指具有明确结束点的数据流,即数据流在某个时刻会结束,数据量是有限的。例如,从静态文件、数据库或有限数据集中读取的数据流就是有界流。有界流的特点包括:

    • 数据量是有限的,流的结束点是已知的。
    • 可以对整个数据流进行批处理式的分析和处理,因为所有数据都可用且有限。
    • 可以使用批处理算法和优化技术,例如排序、分组聚合等。
  • 无界流(Unbounded Streams)

    无界流是指没有明确结束点的数据流,即数据流会持续不断地产生,数据量可能是无限的。例如,实时传感器数据、日志流、消息队列中的数据等都是无界流。无界流的特点包括:

    • 数据源持续不断地产生数据,流没有明确的结束点。
    • 通常用于实时流式处理,要求系统能够实时处理数据并在流中进行持续的分析和计算。
    • 需要采用流式处理的技术和算法,例如窗口计算、流式聚合、事件时间处理等。
  • 不同数据流实现

    • 创建一个 File Source 时, 默认情况下,Source 为有界/批的模式;

      //创建一个FileSource数据源,并设置为批模式,读取完文件后结束
      final FileSource<String> source = FileSource.forRecordStreamFormat(...)
              .build();
      
    • 设置参数monitorContinuously(Duration.ofMillis(5)) 可以把 Source 设置为持续的流模式

      //创建一个FileSource数据源,并设置为流模式,每隔5分钟检查路径新文件,并读取
      final FileSource<String> source = FileSource.forRecordStreamFormat(...)
              .monitorContinuously(Duration.ofMillis(5))  
              .build();   
      

04 项目实战

1.FileSource支持多种数据格式数据读取与解析,本期以Text File文件为例展开。
2.jdk版本11
3.Flink版本1.18.0
4.下面是两个简单的示例代码,演示如何在 Flink 中使用 FileSource 读取文件数据

4.1 项目结构

在这里插入图片描述

4.2 maven依赖

<!-- flink读取Text File文件依赖 start-->
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-files</artifactId>
	<version>1.18.0</version>
</dependency>
<!-- flink读取Text File文件依赖 end-->

<!-- flink基础依赖 start -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.18.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.18.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.18.0</version>
</dependency>

<!-- flink基础依赖 end -->

4.3 StreamFormat读取文件数据

  • StreamFormat从文件流中读取文件内容。它是最简单的格式实现, 并且提供了许多拆箱即用的特性(如 Checkpoint 逻辑),但是限制了可应用的优化(例如对象重用,批处理等等)。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;

/**
 * 描述:
 * flink集成FileSource & forRecordStreamFormat使用 & 流模式
 * StreamFormat:从文件流中读取文件内容。它是最简单的格式实现,
 * 并且提供了许多拆箱即用的特性(如 Checkpoint 逻辑),
 * 但是限制了可应用的优化(例如对象重用,批处理等等)。
 *
 * @author 浅夏的猫
 * @version 1.0.0
 * @date 2024-02-07 15:30:22
 */
public class FileSourceRecordStreamingJob {

    public static void main(String[] args) throws Exception {

        // 创建 需要读取的文件路径Path
        Path path = new Path("D:\\flink\\file_source.txt");

        // 创建 读取文件的格式函数
        TextLineInputFormat textLineInputFormat = new TextLineInputFormat();

        // 创建 FileSource
        FileSource<String> fileSource = FileSource.
                forRecordStreamFormat(textLineInputFormat, path)
                //放开注释则使用流模式,每隔5分钟检查是否有新文件否则默认使用批模式
//                .monitorContinuously(Duration.ofMillis(5))
                .build();

        // 创建 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 添加 FileSource 到数据流
        env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "FileSource").print();

        // 执行任务
        env.execute("FileSourceRecordStreamingJob");
    }
}

4.4 BulkFormat读取文件数据

  • BulkFormat从文件中一次读取一批记录,虽然是最 “底层” 的格式实现,但是提供了优化实现的最大灵活性。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;


/**
 * 描述:flink集成FileSource & forBulkFileFormat使用 & 流模式
 * BulkFormat:从文件中一次读取一批记录。 它虽然是最 “底层” 的格式实现,但是提供了优化实现的最大灵活性。
 *
 * @author 浅夏的猫
 * @version 1.0.0
 * @date 2024-02-07 15:30:22
 */
public class FileSourceBulkStreamingJob {

    public static void main(String[] args) throws Exception {

        //创建 批量读取文件的格式函数,其实底层还是通过对单行文件读取
        BulkFormat<String, FileSourceSplit> bulkFormat = new StreamFormatAdapter<>(new TextLineInputFormat());

        // 创建 FileSource
        FileSource<String> fileSource = FileSource.
                forBulkFileFormat(bulkFormat, new Path("D:\\flink\\file_source.txt"))
                //放开注释则使用流模式,每隔5分钟检查是否有新文件,否则默认使用批模式
//                .monitorContinuously(Duration.ofMillis(5))
                .build();

        // 创建 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 添加 FileSource 到数据流
        env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "FileSource").print();

        // 执行任务
        env.execute("FileSourceBulkStreamingJob");
    }
}

4.5 使用小结

在上面的示例中,我们使用FileSource方法从指定路径读取文本文件,并将其转换为一个数据流,选择不同的输入格式和解析方式,然后我们调用 print 方法将数据流中的数据打印出来。

05 数据源比较

FileSource 是 Flink 中常用的数据源之一,与其他数据源相比,它具有一些优势和劣势,根据实际情况和需求,可以选择不同的数据源来满足任务的要求。

  • 优势

    • 支持读取大规模的文件数据,适用于大数据处理场景。
    • 支持并行读取和处理,能够充分利用集群资源,提高处理效率。
    • 支持多种文件格式和压缩方式,灵活性强。
  • 劣势

    • 对于小文件的处理效率较低,可能会导致资源浪费和性能下降。
    • 无法实时监控文件的变化,需要手动触发重新读取。

06 总结

FileSource 是 Apache Flink 中用于读取文件数据的重要组件,它能够高效地处理大规模的文件数据,并提供丰富的功能和灵活的用法。通过深入了解 FileSource 的工作原理和用法,可以更好地利用 Flink 来实现大规模数据文件的处理和分析任务。

通过以上详细介绍,可以对 Apache Flink 中的 FileSource 有一个全面的了解,从而更好地应用于实际的数据处理项目中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1457598.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

报表开发工具DevExpress .NET Reporting v23.2亮点 - 支持智能标签

DevExpress Reporting是.NET Framework下功能完善的报表平台&#xff0c;它附带了易于使用的Visual Studio报表设计器和丰富的报表控件集&#xff0c;包括数据透视表、图表&#xff0c;因此您可以构建无与伦比、信息清晰的报表。 DevExpress Reporting控件日前正式发布了v23.2…

STM32学习·HAL库·STM32CubeMX系列学习(安装和使用)

目录 ​编辑 1. STM32CubeMX简介 2. STM32CubeMX安装 2.1 STM32CubeMX软件获取 2.1.1 获取Java软件 2.1.2 STM32CubeMX软件获取 2.2 搭建Java运行环境 2.3 安装STM32CubeMX软件 2.4 下载和关联STM32cube固件包 1. STM32CubeMX简介 STM32CubeMX 是 ST 微电子公…

Go 是否有三元运算符?Rust 和 Python 是怎么做的?

嗨&#xff0c;大家好&#xff01;本文是系列文章 Go 技巧第十四篇&#xff0c;系列文章查看&#xff1a;Go 语言技巧。 今天来聊聊在 Go 语言中是否支持三元运算符。这个问题很简单&#xff0c;没有。 首先&#xff0c;什么是三元运算符&#xff1f; 在其他一些编程语言中&a…

The Sandbox NFT 概览与数据分析

作者&#xff1a;stellafootprint.network 编译&#xff1a;cicifootprint.network 数据源&#xff1a;The Sandbox NFT Collection Dashboard Sandbox NFT 系列包括独特的体素资产和 LAND 地块&#xff0c;使所有者能够在 The Sandbox 元宇宙中构建、玩虚拟体验并从中获…

【VSCode】设置 一键生成vue模板 的快捷入口

问题 每次写一个组件的时候&#xff0c;都需要去手敲默认结构或者是复制粘贴&#xff0c;十分的麻烦&#xff01; 解决办法 文件 > 首选项 > 用户代码片段 > vue.json 配置vue模板 其中prefix是用来触发代码段的内容&#xff0c;即模版的快捷入口&#xff1b;body里…

红帽认证——步入优质职场的第一步

在当今数字化时代&#xff0c;掌握先进的技术和技能是开启成功职业生涯的关键。红帽认证课程将为你提供这样的机会&#xff0c;帮助你成为一名具备实际操作能力的专业人士。Redhat&#xff0c;红帽公司是全球知名的开源技术厂家&#xff0c;领先的开源解决方案供应商。Linux有很…

Python Flask高级编程之RESTFul API前后端分离(学习笔记)

Flask-RESTful是一个强大的Python库&#xff0c;用于构建RESTful APIs。它建立在Flask框架之上&#xff0c;提供了一套简单易用的工具&#xff0c;可以帮助你快速地创建API接口。Flask-RESTful遵循REST原则&#xff0c;支持常见的HTTP请求方法&#xff0c;如GET、POST、PUT和DE…

Datawhale零基础入门金融风控Task1 赛题理解

Task1 赛题理解 Tip:本次新人赛是Datawhale与天池联合发起的0基础入门系列赛事第四场 —— 零基础入门金融风控之贷款违约预测挑战赛。 赛题以金融风控中的个人信贷为背景&#xff0c;要求选手根据贷款申请人的数据信息预测其是否有违约的可能&#xff0c;以此判断是否通过此项…

Office2019安装冲突解决方法 ErrorCode 30182-392

问题描述 挂载安装Office 2019安装镜像后直接安装会出现如下的错误&#xff1a; 问题原因在于Office 365与Offfice2019版本号相同&#xff08;均为16.0&#xff09;官方页面-各Office版本号 解决办法 解决方法就是利用官方部署工具进行安装&#xff0c;绕过版本冲突问题 …

ansible剧本中的角色

1 roles角色 1.1 roles角色的作用&#xff1f; 可以把playbook剧本里的各个play看作为一个角色&#xff0c;将各个角色打的tasks任务、vars变量、template模版和copy、script模块使用的相关文件等内容放置在指定角色的目录里统一管理&#xff0c;在需要的时候可在playbook中使…

从可靠性的角度理解 tcp

可靠性是 tcp 最大的特点。常见的用户层协议&#xff0c;比如 http, ftp, ssh, telnet 均是使用的 tcp 协议。可靠性&#xff0c;即从用户的角度来看是可靠的&#xff0c;只要用户调用系统调用返回成功之后&#xff0c;tcp 协议栈保证将报文发送到对端。引起不可靠的表现主要有…

【conda环境 安装 tensorflow2.2】 解决方案

1.检查anaconda安装&#xff1a;在cmd输入 conda --version 2.检测已经安装的环境&#xff1a;conda info --envs 3.新建一个python3.5的环境&#xff0c;tensorflow&#xff1a; ###conda create -n xxx python3.5 xxx为虚拟环境名 ###conda create -n xxx python3.6 xxx为虚拟…

【求职】搜狗2016 C++笔试题

1.关于重载和多态正确的是&#xff1f; A.如果父类和子类都有相同的方法,参数个数不同,将子类对象赋给父类后,由于子类继承于父类,所以使用父类指针调用父类方法时,实际调用的是子类的方法; B.选项全部都不正确 C.重载和多态在C面向对象编程中经常用到的方法,都只在实现子类…

使用智能电销机器人,拓客效果更佳!

现在很多的企业做销售都离不开电话营销&#xff0c;它是一种能够直接帮助企业获取更多利润的营销模式&#xff0c;目前被各大行业所采用。 znyx222 了解探讨 电话营销是一个压力很大的职业&#xff0c;新员工培养难度大、老员工又不好维护&#xff0c;会有情绪问题出现等&…

Redis篇----第七篇

系列文章目录 文章目录 系列文章目录前言一、Redis 的回收策略(淘汰策略)?二、为什么 edis 需要把所有数据放到内存中?三、Redis 的同步机制了解么?四、Pipeline 有什么好处,为什么要用 pipeline?前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍…

Java 21 新特性的扫盲级别初体验

一、前言 JDK 21 于 2023 年 9 月发布&#xff0c;作为目前讨论热度最高的JDK&#xff0c;虽然大家都开玩笑说你发任你发&#xff0c;我用Java8&#xff0c;但是作为一个Javaer&#xff0c;对JDK21的新特性还是要有所了解的。 以下是 JDK 21 的新功能列表&#xff1a; 虚拟线…

初阶数据结构之---导论,算法时间复杂度和空间复杂度(C语言)

说在整个初阶数据结构开头 数据结构其实也学了挺长时间了&#xff0c;说着是要刷题所以才没怎么去写关于数据结构方面的内容。数据结构作为计算机中及其重要的一环&#xff0c;如果不趁着假期系统整理一下着实可惜&#xff0c;我这里构想的是将初阶数据结构和高阶数据结构&…

Servlet原理学习

一、网站架构和Servlet技术体系架构 1.网站架构 现在的网站架构分为 B/S架构和C/S的架构两种。 这种“B/S”结构有很多好处&#xff0c;维护和升级方式更简单&#xff0c;客户端是浏览器&#xff0c;基本不需要维护&#xff0c;只需要维护升级服务器端就可以&#xff0c; C/S结…

[AudioRecorder]iPhone苹果通话录音汉化破解版-使用巨魔安装-ios17绕道目前还不支持

首先你必须有巨魔才能使用&#xff01;&#xff01; 不会安装的&#xff0c;还没安装的移步这里&#xff0c;ios17 以上目前装不了&#xff0c;别看了&#xff1a;永久签名 | 网址分类目录 | 路灯iOS导航-苹果签名实用知识网址导航-各种iOS技巧-后厂村路灯 视频教程 【Audio…

并发List、Set、ConcurrentHashMap底层原理

并发List、Set、ConcurrentHashMap底层原理 ArrayList: List特点&#xff1a;元素有放入顺序&#xff0c;元素可重复 存储结构&#xff1a;底层采用数组来实现 public class ArrayList<E> extends AbstractList<E>implements List<E>, RandomAccess, Clon…