【极数系列】Flink集成DataSource读取文件数据(08)

news2025/1/9 14:38:14

文章目录

  • 01 引言
  • 02 简介概述
  • 03 基于文件读取数据
    • 3.1 readTextFile(path)
    • 3.2 readFile(fileInputFormat, path)
    • 3.3 readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
    • 3.4 实现原理
    • 3.5 注意事项
    • 3.6 支持读取的文件形式
  • 04 源码实战demo
    • 4.1 pom.xml依赖
    • 4.2 创建文件数据流作业
    • 4.3 运行程序查看日志

01 引言

源码地址,一键下载可用:https://gitee.com/shawsongyue/aurora.git
模块:aurora_flink
主类:FlinkFileSourceJob(文件)

02 简介概述

1.Source 是Flink程序从中读取其输入数据的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。

2.Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source。

3.也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。

03 基于文件读取数据

3.1 readTextFile(path)

读取文本文件,例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。

3.2 readFile(fileInputFormat, path)

按照指定的文件输入格式读取(一次)文件。

3.3 readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

这是前两个方法内部调用的方法。它基于给定的 fileInputFormat 读取路径 path 上的文件。根据提供的 watchType 的不同,source 可能定期(每 interval 毫秒)监控路径上的新数据(watchType 为 FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次当前路径中的数据然后退出(watchType 为 FileProcessingMode.PROCESS_ONCE)。使用 pathFilter,用户可以进一步排除正在处理的文件。

3.4 实现原理

底层Flink 将文件读取过程拆分为两个子任务,即 目录监控数据读取。每个子任务都由一个单独的实体实现。监控由单个非并行(并行度 = 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度和作业的并行度相等。单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于 watchType),找到要处理的文件,将它们划分为 分片,并将这些分片分配给下游 reader。Reader 是将实际获取数据的角色。每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。

3.5 注意事项

(1)如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,当一个文件被修改时,它的内容会被完全重新处理。这可能会打破 “精确一次” 的语义,因为在文件末尾追加数据将导致重新处理文件的所有内容。

(2)如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE,source 扫描一次路径然后退出,无需等待 reader 读完文件内容。当然,reader 会继续读取数据,直到所有文件内容都读完。关闭 source 会导致在那之后不再有检查点。这可能会导致节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取。

3.6 支持读取的文件形式

1.本地文件

2.HDFS文件

3.文件夹

4.压缩文件

04 源码实战demo

4.1 pom.xml依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xsy</groupId>
    <artifactId>aurora_flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--属性设置-->
    <properties>
        <!--java_JDK版本-->
        <java.version>11</java.version>
        <!--maven打包插件-->
        <maven.plugin.version>3.8.1</maven.plugin.version>
        <!--编译编码UTF-8-->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!--输出报告编码UTF-8-->
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!--json数据格式处理工具-->
        <fastjson.version>1.2.75</fastjson.version>
        <!--log4j版本-->
        <log4j.version>2.17.1</log4j.version>
        <!--flink版本-->
        <flink.version>1.18.0</flink.version>
        <!--scala版本-->
        <scala.binary.version>2.11</scala.binary.version>
        <!--log4j依赖-->
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <!--通用依赖-->
    <dependencies>

        <!-- json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <!--================================集成外部依赖==========================================-->
        <!--集成日志框架 start-->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!--集成日志框架 end-->
    </dependencies>

    <!--编译打包-->
    <build>
        <finalName>${project.name}</finalName>
        <!--资源文件打包-->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>org.google.code.flindbugs:jar305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <excluder>org.apache.logging.log4j:*</excluder>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <!--插件统一管理-->
        <pluginManagement>
            <plugins>
                <!--maven打包插件-->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                    <configuration>
                        <fork>true</fork>
                        <finalName>${project.build.finalName}</finalName>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>

                <!--编译打包插件-->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>${maven.plugin.version}</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                        <compilerArgs>
                            <arg>-parameters</arg>
                        </compilerArgs>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!--配置Maven项目中需要使用的远程仓库-->
    <repositories>
        <repository>
            <id>aliyun-repos</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!--用来配置maven插件的远程仓库-->
    <pluginRepositories>
        <pluginRepository>
            <id>aliyun-plugin</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

</project>

4.2 创建文件数据流作业

package com.aurora.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * @description flink的文件source应用
 * @author 浅夏的猫
 * @datetime 23:03 2024/1/28
*/
public class FlinkFileSourceJob {

    private static final Logger logger = LoggerFactory.getLogger(FlinkFileSourceJob.class);

    public static void main(String[] args) throws Exception {

        //1.创建Flink运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.设置Flink运行模式:
        //STREAMING-流模式,BATCH-批模式,AUTOMATIC-自动模式(根据数据源的边界性来决定使用哪种模式)
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //3.基于文件的source使用(本地/HDFS文件/文件夹/压缩文件)
        //3.1本地文件
        DataStreamSource<String> dataStreamSourceFile = env.readTextFile("E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties");

        //3.2 HDFS文件,前提你已经搭建环境
//        DataStreamSource<String> dataStreamSourceHdfs = env.readTextFile("hdfs://localhost:8020//source/application.txt");

        //3.3文件夹
        DataStreamSource<String> dataStreamSourceDir = env.readTextFile("E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources");


        //3.4压缩文件
        DataStreamSource<String> dataStreamSourceRar = env.readTextFile("E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\test.rar");

        //4.输出打印
        dataStreamSourceFile.print();
//        dataStreamSourceHdfs.print();
        dataStreamSourceDir.print();
        dataStreamSourceRar.print();

        //5.启动运行
        env.execute();
    }

}

4.3 运行程序查看日志

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1420105.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ROS学习笔记11——ROS中的重名问题

一、ros功能包重名——ros工作空间覆盖 功能包重名时&#xff0c;会按照 ROS_PACKAGE_PATH 查找&#xff0c;在前的会优先执行。ROS 会解析 .bashrc 文件&#xff0c;并生成 ROS_PACKAGE_PATH ROS包路径&#xff0c;即调用功能包的顺序&#xff0c;该变量中按照 .bashrc 中配置…

leetcode—跳跃游戏—贪心算法

1 跳跃游戏1 给你一个非负整数数组 nums &#xff0c;你最初位于数组的 第一个下标 。数组中的每个元素代表你在该位置可以跳跃的最大长度。 判断你是否能够到达最后一个下标&#xff0c;如果可以&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 示例 1&a…

图像畸变校正(2)

畸变校正是一种用于矫正图像或视频中的失真或畸变的技术。这种失真通常是由摄像头镜头的特性或角度造成的&#xff0c;可能会导致图像中的对象形状、大小或位置不准确。以下是畸变校正的一般方法&#xff1a; 摄像头模型建立&#xff1a; 首先&#xff0c;需要建立摄像头的模型…

【React教程】(1) React简介、React核心概念、React初始化

目录 ReactReact 介绍React 特点React 的发展历史React 与 Vue 的对比技术层面开发团队社区Native APP 开发 相关资源链接 EcmaScript 6 补充React 核心概念组件化虚拟 DOM 起步初始化及安装依赖Hello World React React 介绍 React 是一个用于构建用户界面的渐进式 JavaScrip…

海外云手机为什么吸引用户?

近年来&#xff0c;随着全球化的飞速发展&#xff0c;海外云手机逐渐成为各行各业关注的焦点。那么&#xff0c;究竟是什么让海外云手机如此吸引用户呢&#xff1f;本文将深入探讨海外云手机的三大吸引力&#xff0c;揭示海外云手机的优势所在。 1. 高效的社交媒体运营 海外云…

Android 13以上版本读写SD卡权限适配

如题&#xff0c;最近工作上处理的问题&#xff0c;把解决方案简单逻列出来&#xff0c;供有需要的朋友参考之 解决方案&#xff1a; 1、配置权限 <uses-permission android:name"android.permission.READ_MEDIA_IMAGES" /><uses-permission android:name&q…

PyQt5零基础入门(八)——按钮控件(QPushButton、QToolButton)

前言 按钮控件是图形用户界面(GUI)中常用的交互元素&#xff0c;用于触发特定的事件或行为。在Qt框架中&#xff0c;QPushButton和QToolButton是两种常用的按钮控件。 后边我们将以test.png为按钮图标&#xff0c;对比使用两种按钮控件。 普通按钮控件(QPushButton) QPushB…

禁止 ios H5 中 bounces 滑动回弹效果

在开发面向 iOS 设备的 HTML5 应用时&#xff0c;控制页面的滚动行为至关重要&#xff0c;特别是禁用在 Safari 中默认的滑动回弹效果。本文旨在提供一个简洁明了的解决方案&#xff0c;帮助开发者在特定的 Web 应用中禁用这一效果。 1. 什么是滑动回弹效果&#xff1f; 在 iO…

编写交互式 Shell 脚本

在日常的系统管理和自动化任务中&#xff0c;使用 Shell 脚本可以为我们节省大量时间和精力。 文章将以输入 IP 为例&#xff0c;通过几个版本逐步完善一个案例。 原始需求 编写一个交互式的 Shell 脚本&#xff0c;运行时让用户可以输入IP地址&#xff0c;并且脚本会将输入…

【高效开发工具系列】Wolfram Alpha

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

【RT-DETR有效改进】2024.1最新MFDS-DETR的HS-FPN改进特征融合层(降低100W参数,全网独家首发)

👑欢迎大家订阅本专栏,一起学习RT-DETR👑 一、本文介绍 本文给大家带来的改进机制是最近这几天最新发布的改进机制MFDS-DETR提出的一种HS-FPN结构,其是一种为白细胞检测设计的网络结构,主要用于解决白细胞数据集中的多尺度挑战。它的基本原理包括两个关键部分:特征…

“全”实力认可 | 美创科技领跑CCSIP 2023全景图数据安全领域

近日&#xff0c;FreeBuf咨询正式发布《CCSIP&#xff08;China Cyber Security Industry Panorama&#xff09;2023中国网络安全行业全景册&#xff08;第六版&#xff09;》。本次全景册面向广大国内安全厂商&#xff0c;由厂商自主申报并填写信息征集表&#xff0c;经FreeBu…

【俄乌之战】乌克兰声称280台俄罗斯服务器被毁,损失数据超2 PB

乌克兰报告针对俄罗斯政府关键基础设施和私营公司的多次网络攻击&#xff0c;导致全国范围内的中断和大量数据丢失。 乌克兰 HUR&#xff08;乌克兰国防部主要情报局&#xff09;的网络安全专家声称对IPL Consulting进行了成功的网络攻击。据报道&#xff0c;他们摧毁了整个IT…

uniapp H5 实现上拉刷新 以及 下拉加载

uniapp H5 实现上拉刷新 以及 下拉加载 1. 先上图 下拉加载 2. 上代码 <script>import DragableList from "/components/dragable-list/dragable-list.vue";import {FridApi} from /api/warn.jsexport default {data() {return {tableList: [],loadingHi…

SV-7041T 多媒体教学广播IP网络有源音箱

SV-7041T是深圳锐科达电子有限公司的一款2.0声道壁挂式网络有源音箱&#xff0c;具有10/100M以太网接口&#xff0c;可将网络音源通过自带的功放和喇叭输出播放&#xff0c;可达到功率30W。同时它可以外接一个30W的无源副音箱&#xff0c;用在面积较大的场所。5寸进口全频低音喇…

leetcode刷题(剑指offer) 50.Pow(x, n)

50.Pow(x, n) 实现 pow(x, n) &#xff0c;即计算 x 的整数 n 次幂函数&#xff08;即&#xff0c;xn &#xff09;。 示例 1&#xff1a; 输入&#xff1a;x 2.00000, n 10 输出&#xff1a;1024.00000示例 2&#xff1a; 输入&#xff1a;x 2.10000, n 3 输出&#x…

MGRE实验报告二

实验要求&#xff1a; 实验预览图&#xff1a; 实验分析&#xff1a; 1、对R1-R5配置IP地址&#xff0c;同时R1-R5每个路由器各有一个环回 2.1、对R1、R3、R4路由器开启虚拟接口1&#xff0c;分别配置隧道IP、接口封装协议&#xff0c;接口类型、定义封装源、开启伪广播功能&…

【教程】iOS如何抓取HTTP和HTTPS数据包经验分享

&#x1f4f1; 在日常的App开发和研发调研中&#xff0c;对各类App进行深入的研究分析时&#xff0c;我们需要借助专业的抓包应用来协助工作。本文将介绍如何使用iOS手机抓包工具来获取HTTP和HTTPS数据包&#xff0c;并推荐一款实用的抓包应用——克魔助手&#xff0c;希望能够…

vue3 + antd 封装动态表单组件(三)

传送带&#xff1a; vue3 antd 封装动态表单组件&#xff08;一&#xff09; vue3 antd 封装动态表单组件&#xff08;二&#xff09; 前置条件&#xff1a; vue版本 v3.3.11 ant-design-vue版本 v4.1.1 我们发现ant-design-vue Input组件和FormItem组件某些属性支持slot插…

uniapp如何添加多个表单数组?

目录 一、实现思路 二、实现步骤 ①view部分展示 ②JavaScript 内容 ③css中样式展示 三、效果展示 四、小结 注意事项 总结模板&#xff1a; 一、实现思路 1.在 data 中定义一个数组&#xff0c;用于存储表单项的数据 2.在模板中使用 v-for 指令渲染表单项 3.在 methods 中…