Flink 流式读写文件、文件夹

news2025/3/11 8:57:35

文章目录

  • 一、flink 流式读取文件夹、文件
  • 二、flink 写入文件系统——StreamFileSink
  • 三、查看完整代码

一、flink 流式读取文件夹、文件

Apache Flink针对文件系统实现了一个可重置的source连接器,将文件看作流来读取数据。如下面的例子所示:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TextInputFormat textInputFormat = new TextInputFormat(null);
        DataStreamSource<String> source = env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);

StreamExecutionEnvironment.readFile()接收如下参数:

  • FileInputFormat参数,负责读取文件中的内容。
  • 文件路径。如果文件路径指向单个文件,那么将会读取这个文件。如果路径指向一个文件夹,FileInputFormat将会扫描文件夹中所有的文件。
  • PROCESS_CONTINUOUSLY将会周期性的扫描文件,以便扫描到文件新的改变。
  • 30000L表示多久扫描一次监听的文件。

FileInputFormat是一个特定的InputFormat,用来从文件系统中读取文件。FileInputFormat分两步读取文件。首先扫描文件系统的路径,然后为所有匹配到的文件创建所谓的input splits。一个input split将会定义文件上的一个范围,一般通过读取的开始偏移量和读取长度来定义。在将一个大的文件分割成一堆小的splits以后,这些splits可以分发到不同的读任务,这样就可以并行的读取文件了。FileInputFormat的第二步会接收一个input split,读取被split定义的文件范围,然后返回对应的数据。

DataStream应用中使用的FileInputFormat需要实现CheckpointableInputFormat接口。这个接口定义了方法来做检查点和重置文件片段的当前的读取位置。

在Flink 1.7中,Flink提供了一些类,这些类继承了FileInputFormat,并实现了CheckpointableInputFormat接口。TextInputFormat一行一行的读取文件,而CsvInputFormat使用逗号分隔符来读取文件。

二、flink 写入文件系统——StreamFileSink

该Sink不但可以将数据写入到各种文件系统中,而且整合了checkpoint机制来保证Exacly Once语义,还可以对文件进行分桶存储,还支持以列式存储的格式写入,功能更强大。

streamFileSink中输出的文件,其生命周期会经历3中状态:

  • in-progress Files 当前文件正在写入中
  • Pending Files 当处于 In-progress 状态的文件关闭closed了,就变为 Pending 状态
  • Finished Files 在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态
    下面是一个简答的例子 , 将接收到的数据流 ,写入到文件中保存 !

数据文件格式是行式存储格式

        BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));
        StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(
                new Path(savePath),
                new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据
                                .withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB
                                .build())
                .withBucketAssigner(assigner)
                .build();

其中特别说明了,如果使用 FileSink 在 STREAMING 模式的时候,必须开启 checkpoint,不然的话会导致每个分片文件一直处于 in-progress 或者 pending 状态,不能保证整个写入流程的安全性。

所以在我们上述的示例中,我们并未开启 checkpoint 导致写出文件一直处于 inprogress 状态。如果加上 checkpoint 后:
在这里插入图片描述
将数据以列式存储的格式输出到文件中

三、查看完整代码

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

import java.time.ZoneId;
import java.util.concurrent.TimeUnit;

public class WordTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //2.设置CK&状态后端
        env.setStateBackend(new FsStateBackend("hdfs://nameservice1/tmp/kafka_test/data/chatgpt/mnbvc/checkpoint"));
        env.enableCheckpointing(1000*60*3);// 每 ** ms 开始一次 checkpoint
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置模式为精确一次
        env.getCheckpointConfig().setCheckpointTimeout(1000*60*5);// Checkpoint 必须在** ms内完成,否则就会被抛弃
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 同一时间只允许一个 checkpoint 进行
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);// 确认 checkpoints 之间的时间会进行 ** ms
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);// 允许两个连续的 checkpoint 错误
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10,TimeUnit.SECONDS)));//重启策略:重启3次,间隔10s
        // 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        
        String sourcePath = "hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_com";
        String savePath = "hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_filter_01";

        TextInputFormat textInputFormat = new TextInputFormat(null);
        DataStreamSource<String> source = env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);

        BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));
        StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(
                new Path(savePath),
                new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据
                                .withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB
                                .build())
                .withBucketAssigner(assigner)
                .build();


        source.map(line -> JSONObject.parseObject(line))
                .filter(line -> line.getString("text").length() > 200 && line.getInteger("id") % 7 == 0)
                .map(line -> JSON.toJSONString(line))
                .addSink(fileSink);

        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/884263.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

编写一个通用函数,从键盘输入n,显示正n边形。通过调用函数,在屏幕上同时显示下面的四个图形

题目&#xff1a;编写一个通用函数&#xff0c;从键盘输入n&#xff0c;显示正n边形。通过调用函数&#xff0c;在屏幕上同时显示下面的四个图形。 结果&#xff1a; 调用举例&#xff1a; drawShape(3)drawShape(4)drawShape(5)drawShape(6) 代码&#xff1a; import turt…

数据库MySQL 创建查询恢复数据库

创建数据库 查询数据库 备份恢复数据库

【Docker】 使用Docker-Compose 搭建基于 WordPress 的博客网站

引 本文将使用流行的博客搭建工具 WordPress 搭建一个私人博客站点。部署过程中使用到了 Docker 、MySQL 。站点搭建完成后经行了发布文章的体验。 WordPress WordPress 是一个广泛使用的开源内容管理系统&#xff08;CMS&#xff09;&#xff0c;用于构建和管理网站、博客和…

【深入理解ES6】块级作用域绑定

1. var声明及变量提升机制 提升&#xff08;Hoisting&#xff09;机制&#xff1a;通过关键字var声明的变量&#xff0c;都会被当成在当前作用域顶部生命的变量。 function getValue(condition){if(condition){var value "blue";console.log(value);}else{// 此处…

7-2 成绩转换

分数 15 全屏浏览题目 切换布局 作者 沈睿 单位 浙江大学 本题要求编写程序将一个百分制成绩转换为五分制成绩。转换规则&#xff1a; 大于等于90分为A&#xff1b;小于90且大于等于80为B&#xff1b;小于80且大于等于70为C&#xff1b;小于70且大于等于60为D&#xff1b;小…

RK3568 ubuntu18.04环境搭建

一.打开Window10虚拟化功能 打开Windows 10控制面板&#xff0c;选择“应用“ 点击右上角“程序与功能” 点击“启用或关闭Windows 功能”&#xff0c;勾选“适用于Linux 的Windows 子系统”和“虚 拟机平台” 二.VMware新建虚拟机 打开Vmware,选择“创建新的虚拟机” 选择“…

tk-mybatis使用介绍,springboot整合tk-mybatis、PageHelper实现分页查询

Mybatis-Plus极大简化了我们的开发&#xff0c;作为mybatis的增强版&#xff0c;Mybatis-Plus确实帮我们减少了很多SQL语句的编写&#xff0c;通过其提高的API&#xff0c;可以方便快捷第完成增删查改操作。但是&#xff0c;其实除了Mybatis-Plus以外&#xff0c;还有一个技术t…

TikTok带货有什么优势?品牌营销的新趋势

在当今数字化时代&#xff0c;品牌营销正日益倾向于社交媒体平台&#xff0c;而TikTok作为一款全球热门的短视频社交平台&#xff0c;正在成为品牌营销的新趋势。TikTok带货&#xff0c;也就是品牌利用TikTok平台进行商品推广和销售&#xff0c;已成为一种创新的、高效的营销方…

BaseMapper的insert方法快速插入数据未提交问题

一、前言 今天测试一批日志数据插入数据库&#xff0c;发现通过BaseMapper的int insert(T entity);方法在大量数据进行插入的时候插入的数据变成了未提交。意思就是程序运行insert成功&#xff0c;但是数据库里却没有数据。当一条一条数据插入的时候却是可以的&#xff0c;循环…

Linux下设计简易线程池

Linux下设计简易线程池 文章目录 Linux下设计简易线程池1.介绍2.具体实现2.1任务类头文件Task.hpp2.2线程池文件ThreadPool.hpp2.3主函数Main.cc 1.介绍 ​ 线程池是一种池化技术&#xff0c;是消费者生产者模型的具体体现。它能够预先创建一批能够被重复使用的线程&#xff0…

Java是编译型还是解释型

定义 编译语言&#xff08;英语&#xff1a;Compiled language&#xff09;是一种程式语言类型&#xff0c;通过编译器来实作。它不像直译语言一样&#xff0c;由直译器将程式码一句一句执行&#xff0c;而是以编译器&#xff0c;先将程式码编译为机器码&#xff0c;再加以执行…

麦肯锡发布《2023年度科技报告》!

在经历了 2022 年技术投资和人才的动荡之后&#xff0c;2023 年上半年&#xff0c;人们对技术促进商业和社会进步的潜力重新燃起了热情。生成式人工智能&#xff08;Generative AI&#xff09;在这一复兴过程中功不可没&#xff0c;但它只是众多进步中的一个&#xff0c;可以推…

世纪之争:量子物理学解决了「黑洞悖论」

在黑洞内部&#xff0c;20 世纪物理学的两大理论支柱似乎发生了冲突。现在&#xff0c;一群年轻的物理学家认为&#xff0c;他们通过诉诸新世纪的中心支柱——量子信息物理学&#xff0c;已经解决了这一冲突。 2013 年 8 月&#xff0c;数十位著名理论物理学家齐聚加利福尼亚州…

Cpp学习——string模拟实现

目录 一&#xff0c;string的成员变量 二&#xff0c;string的各项功能函数 1.构造函数 2.析构函数 3.扩容函数 4.插入与删除数据的函数 5.运算符重载 6.打印显示函数 7&#xff0c;拷贝构造 8.find函数 一&#xff0c;string的成员变量 在模拟实现string之前&#xff…

怎么把视频转gif图片?视频在线转gif动图的方法

我们在使用gif动图的时候&#xff0c;经常发现有些图片是一些视频片段&#xff0c;那么视频转gif图是怎么制作的呢&#xff1f;可以使用视频转gif工具&#xff0c;市面上许多软件都可以完成&#xff0c;今天就给大家介绍一个视频在线转gif的方法&#xff0c;省去了下载安装的时…

LangChain手记 Question Answer 问答系统

整理并翻译自DeepLearning.AILangChain的官方课程&#xff1a;Question Answer&#xff08;源代码可见&#xff09; 本节介绍使用LangChian构建文档上的问答系统&#xff0c;可以实现给定一个PDF文档&#xff0c;询问关于文档上出现过的某个信息点&#xff0c;LLM可以给出关于该…

redis事务对比Lua脚本区别是什么

redis官方对于lua脚本的解释&#xff1a;Redis使用同一个Lua解释器来执行所有命令&#xff0c;同时&#xff0c;Redis保证以一种原子性的方式来执行脚本&#xff1a;当lua脚本在执行的时候&#xff0c;不会有其他脚本和命令同时执行&#xff0c;这种语义类似于 MULTI/EXEC。从别…

SABO-ELM电力负荷短期预测,MATLAB代码

关于电力负荷预测&#xff0c;后台留言的呼声很高。今天就为大家带来一期关于电力负荷预测的文章。 简介 简单说一下本期内容&#xff1a; ①对电力负荷数据进行处理 ②采用极限学习机(ELM)对电力负荷数据进行训练和预测 ③采用减法平均优化器算法优化极限学习机的权值阈值&…

【Linux命令详解 | du命令】 du命令用于查看文件或目录的磁盘使用情况,帮助管理存储空间。

文章标题 简介一&#xff0c;参数列表二&#xff0c;使用介绍1. 基本用法2. 以人类可读的格式显示大小3. 显示总计磁盘使用量4. 包括每个文件的大小5. 限制显示的目录深度6. 排除特定文件或目录7. 指定块大小总结 简介 在Linux操作系统中&#xff0c;存储空间管理是至关重要的…

docker打包运行中的容器,生成镜像文件保存到本地

因为想着方便部署&#xff0c;将所有没问题的项目容器打包成镜像&#xff0c;走到哪儿都离线安装自动部署。 第一步先把运行中的容器打包成镜像 docker commit 运行中容器id 像打包成的镜像名称第二步将大象装进冰箱&#xff0c;不好意思说错了&#xff0c;把镜像保存到本地 …