4.2、Flink任务怎样读取文件中的数据

news2025/1/11 6:21:21

目录

1、前言

2、readTextFile(已过时,不推荐使用)

3、readFile(已过时,不推荐使用)

4、fromSource(FileSource) 推荐使用


1、前言

思考: 读取文件时可以设置哪些规则呢?

         1. 文件的格式(txt、csv、二进制...)        

         2. 文件的分隔符(按\n 分割)

         3. 是否需要监控文件变化(一次读取、持续读取)

基于以上规则,Flink为我们提供了非常灵活的 读取文件的方法


2、readTextFile(已过时,不推荐使用)

语法说明:

定义:
    def readTextFile(filePath: String): DataStream[String]
    def readTextFile(filePath: String, charsetName: String)

功能:
    1.读取文本格式的文件
    2.按行读取(\n为分隔符),每行数据被封装为 DataStream 的一个元素
    3.可以指定字符集(默认为UDF-8)
    4.文件只会读取一次

源码分析:
    public DataStreamSource<String> readTextFile(String filePath, String charsetName) {

        // 初始化 TextInputFormat对象
        TextInputFormat format = new TextInputFormat(new Path(filePath));  
        // 指定路径过滤器(使用默认过滤器)
        format.setFilesFilter(FilePathFilter.createDefaultFilter());  
        // 指定Flink中的数据类型    
        TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO; 
        // 指定字符集
        format.setCharsetName(charsetName);     
                                   
        // 调用 readFile 方法
        return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo); 
    }

代码示例:

    public static void readTextFile() throws Exception {
        /*
         * TODO 功能说明
         *   readTextFile(path) - 读取文本文件(一次读取),例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。
         * */
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.将文本文件作为数据源
        env.readTextFile("data/1.txt").setParallelism(4).print();

        // 3.触发程序执行
        env.execute();
    }

3、readFile(已过时,不推荐使用)

语法说明:

定义:
    def readFile[T: TypeInformation](
        inputFormat: FileInputFormat[T],
        filePath: String,
        watchType: FileProcessingMode,
        interval: Long): DataStream[T] = {
      val typeInfo = implicitly[TypeInformation[T]] // 隐私转换(将java 数据类型 转换为 Flink数据类型)
      asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, typeInfo))
    }

参数:
    inputFormat : 指定 FileInputFormat 实现类(根据文件类型 选择相适应的实例)
    filePath    : 指定 文件路径
    watchType   : 指定 读取模式(提供了2个枚举值)
                       PROCESS_ONCE :只读取一次
                       PROCESS_CONTINUOUSLY :按照指定周期扫描文件
    interval    : 指定 扫描文件的周期(单位为毫秒)

功能:
    按照 指定的 文件格式 和 读取方式 读取数据
FileInputFormat 的实现类
FileInputFormat 的实现类

代码示例:

    public static void readFile() throws Exception {
        /*
         * TODO 功能说明
         *    readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。
         *    readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
         *       按照指定的文件输入格式读取(持续的读取)文件
         * */

        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.将文本文件作为数据源
        String filePath = "data/1.txt";

        TextInputFormat textInputFormat = new TextInputFormat(new Path(filePath));
        textInputFormat.setFilesFilter(FilePathFilter.createDefaultFilter()); // 指定过滤器
        textInputFormat.setCharsetName("UTF-8"); // 指定编码格式

        /*
         * readFile(inputFormat: FileInputFormat[OUT], filePath: String, watchType: FileProcessingMode, interval: Long)
         * 参数说明:
         *      @inputFormat : 指定文件输入格式
         *      @filePath    : 指定文件路径
         *      @watchType   : 指定监控类型,提供了两种读取策略
         *            PROCESS_ONCE : 只读取一次
         *            PROCESS_CONTINUOUSLY :持续读取,监控新增数据
         *      @interval : 指定连续扫描文件的周期(毫秒)
         * 重点提示:
         *      1.如果watchType设置为PROCESS_CONTINUOUSLY时,当一个文件被修改时,将会导致重新读取该
         *           文件的全部内容,这将会打破`精确一次`的语义
         * */
        env.readFile(
                textInputFormat
                , filePath
                , FileProcessingMode.PROCESS_CONTINUOUSLY
                , 1000
        ).print();

        // 3.触发程序执行
        env.execute();
    }

4、fromSource(FileSource) 推荐使用

    public static void FileSource() throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.将文本文件作为数据源
        FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat()
                , new Path("data/1.txt")).build();

        env.fromSource(fileSource
                , WatermarkStrategy.noWatermarks()
                , "read fileSource"
        ).print();

        // 3.触发程序执行
        env.execute();
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/855614.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

上海震坤行工业超市入选胡润百富“中国产业互联网30强”

上海震坤行工业超市入选胡润百富“中国产业互联网30强” &#xff08;2023年6月27日&#xff0c;广州&#xff09;全面提供全球最具潜力创业企业榜单的胡润研究院今日发布《2023胡润中国产业互联网30强》&#xff08;Hurun China IIoT Top 30 2023&#xff09;&#xff0c;榜…

利用Ettercap进行DNS欺骗攻击

一、域名系统&#xff08;DNS&#xff09; 域名系统DNS是Internet上使用的命名系统&#xff0c;用于将系统名称转换为人们易于使用的IP地址。域名系统是基于互联网的前身ARPANET开发的&#xff0c;在ARPANET时代&#xff0c;主机名和对应的IP地址是通过HOST.TXT文件集中管理的…

IPO观察丨困于门店扩张的KK集团,还能讲好增长故事吗?

KK集团发起了其IPO之路上的第三次冲击。 近日&#xff0c;KK集团更新了招股书&#xff0c;继续推进港交所上市进程&#xff0c;此前两次上市搁置后终于有了新动向。从更新内容来看&#xff0c;KK集团招股书披露了公司截至2023年一季度的最新业绩&#xff0c;交出一份不错的“成…

你怎么看这MyBatis-flex框架 ?(入门篇)

1、简介 在国内目前使用最多的ORM框架就是Mybatis-Plus&#xff0c;也不得不承认&#xff0c;Mybatis-Plus相对于JPA而言&#xff0c;也确实好用一些&#xff08;就个人而言&#xff09;&#xff0c;但是在国外JPA框架还是挺火的&#xff0c;因为JPA是一个完全的ORM框架&#x…

等了10年,工信部的APP备案终于来了

我是卢松松&#xff0c;点点上面的头像&#xff0c;欢迎关注我哦&#xff01; 2005年3月&#xff0c;工信部要求所有境内网站都要进行网站备案、公安备案。 2023年8月&#xff0c;工信部要求所有的APP、小程序进行备案。否则…… 这绝对是一个移动互联网创业分水岭&#xff0…

希尔排序——C语言andPython

前言 步骤 代码 C语言 Python 总结 前言 希尔排序&#xff08;Shell Sort&#xff09;是一种改进的插入排序算法&#xff0c;它通过将数组分成多个子序列进行排序&#xff0c;逐步减小子序列的长度&#xff0c;最终完成整个数组的排序。希尔排序的核心思想是通过排序较远距…

重温HashMap底层原理

目录 1.HashMap概述 2.JDK7与JDK8的HashMap区别 3.HashMap的主要方法分析 4.常见问题分析总结 1.HashMap概述 HashMap是使用频率最高的用于映射键值对(key和value)处理的数据类型。随着JDK版本的跟新&#xff0c;JDK1.8对HashMap底层的实现进行了优化&#xff0c;列入引入…

JTS Self-intersection异常TopologyException: side location conflict解决办法

JTS Self-intersection异常TopologyException: side location conflict解决办法 举例&#xff1a;问题围栏 MULTIPOLYGON (((114.0905685 32.1120567, 114.0905685 32.112957, 114.0905685 32.1138535, 114.0905685 32.1147537, 114.0905685 32.115654, 114.0905685 32.11655…

深入理解Go语言中的并发编程【30】【多路复用】

文章目录 多路复用 多路复用 操作系统级的I/O模型有&#xff1a; 阻塞I/O非阻塞I/O信号驱动I/O异步I/O多路复用I/O   Linux下&#xff0c;一切皆文件。包括普通文件、目录文件、字符设备文件&#xff08;键盘、鼠标&#xff09;、块设备文件&#xff08;硬盘、光驱&#xff…

【LeetCode】按摩师

按摩师 题目描述算法分析编程代码 链接: 按摩师 题目描述 算法分析 编程代码 class Solution { public:int massage(vector<int>& nums) {int n nums.size();if(n 0) return 0;vector<int> f(n);auto g f;f[0] nums[0];for(int i 1;i<n;i){f[i] g[i…

Winform中DatagridView 表头实现一个加上一个checkBox,实现全选选项功能

实现效果 点击checkBox1或者直接在第一列列表头点击即可实现 代码实现 我的datagridview叫dgv 我在datagridview已经默认添加了一个DataGridViewCheckBoxColumn&#xff0c;勾选时value为1&#xff0c;不勾选时value为0 第一种通过可视化拖动一个checkBox来实现 拖动组…

ElasticSearch单节点部署

&#x1f388; 作者&#xff1a;互联网-小啊宇 &#x1f388; 简介&#xff1a; CSDN 运维领域创作者、阿里云专家博主。目前从事 Kubernetes运维相关工作&#xff0c;擅长Linux系统运维、开源监控软件维护、Kubernetes容器技术、CI/CD持续集成、自动化运维、开源软件部署维护…

餐饮管理系统ssm酒店饭店仓库进销存jsp源代码mysql

本项目为前几天收费帮学妹做的一个项目&#xff0c;Java EE JSP项目&#xff0c;在工作环境中基本使用不到&#xff0c;但是很多学校把这个当作编程入门的项目来做&#xff0c;故分享出本项目供初学者参考。 一、项目描述 餐饮管理系统ssm 系统有1权限&#xff1a;管理员 二…

使用MethodInterceptor和ResponseBodyAdvice做分页处理

目录 一、需求 二、代码实现 父pom文件 pom文件 配置文件 手动注册SqlSessionFactory&#xff08;MyBatisConfig &#xff09; 对象 实体类Users 抽象类AbstractQuery 查询参数类UsersQuery 三层架构 UsersController UsersServiceImpl UsersMapper UsersMapper.…

C++项目:在线五子棋对战网页版--session管理模块开发

session 在WEB开发中&#xff0c;HTTP协议是⼀种⽆状态短链接的协议&#xff0c;这就导致⼀个客⼾端连接到服务器上之后&#xff0c;服务器不知道当前的连接对应的是哪个用户&#xff0c;也不知道客⼾端是否登录成功&#xff0c;这时候为客⼾端提所有服务是不合理的。因此&am…

微信昵称后面的“小耳朵”,原来有这么多用处,让我带你涨知识

微信昵称后面的“小耳朵”&#xff0c;原来有这么多用处&#xff0c;让我带你涨知识 大家都知道&#xff0c;在微信昵称后面加上一个"小耳朵"符号是一种常见的表达方式&#xff0c;但你知道吗&#xff1f;这个看似简单的符号其实有着丰富的用处和意义。让我带你了解…

java中io流、属性集Properties、缓冲流、转换流、序列化和反序列化、打印流、网络编程(TCP通信程序、文件复制案例、文件上传案例、B/S服务案例)

IO流&#xff1a; io流中i表示input输入&#xff0c;o表示output输出&#xff0c;流表示数据&#xff08;字符&#xff0c;字节&#xff0c;1个字符2个字节8个位&#xff09;&#xff1b;这里的输入输出是以内存为基础&#xff0c;将数据从内存中输出到硬盘的过程称为输出&…

CMSIS—OS(V1/V2)

在RTOS基础上再封装一层API。 更换项目中所使用到的RTOS。 例如将freertos项目替换为ucos RTX liteos等其他RTOS。 只需更改该CMSIS-OS的API所调用的RTOS的API。 更换RTOS的意义何在&#xff1f;&#xff1f;&#xff1f;&#xff1f;&#xff1f;&#xff1f;&#xff1f;&am…

函数的学习

函数学习 最后附上全部java源码&#xff0c;可自行下载学习 文章目录 函数入门函数重载函数可变个数参数foreach输出传参 基本数据类型传参_引用数据类型文件夹展示所有里面的文件使用递归算法展示文件夹下所有文件1加到100的递归调用下载链接 函数入门 函数重载 public class…

为什么企业一定要走标准化、体系化的道路?

企业实行标准化、体系化道路&#xff0c;有几个重要原因&#xff1a; 1.一致性和质量&#xff1a;标准化流程和系统可确保任务始终如一且高质量地执行。这种一致性对于提供满足客户期望的产品和服务至关重要&#xff0c;从而在客户之间建立信任和忠诚度。 2.效率和生产力&…