Flink中FileSink的使用

news2025/1/10 21:42:17

在Flink中提供了StreamingFileSink用以将数据流输出到文件系统.
这里结合代码介绍如何使用FileSink.
首先FileSink有两种模式forRowFormatforBulkFormat

    public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
            final Path basePath, final Encoder<IN> encoder) {
        return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
    }

    public static <IN> DefaultBulkFormatBuilder<IN> forBulkFormat(
            final Path basePath, final BulkWriter.Factory<IN> bulkWriterFactory) {
        return new DefaultBulkFormatBuilder<>(
                basePath, bulkWriterFactory, new DateTimeBucketAssigner<>());
    }

二者的区别是forRowFormat是一行一行的处理数据,而forBulkFormat则是可以一次处理多条数据,而多条处理的好处就是可以帮助生成列式存储的文件如ParquetFileORCFile,而forRowFormat则做不到这点,关于列式存储和行式存储的区别可通过数据存储格式这篇文章简单做一个了解.

下面以forRowFormat作为示例演示一下代码

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

/**
 * @Author: J
 * @Version: 1.0
 * @CreateTime: 2023/6/27
 * @Description: 测试
 **/
public class FlinkFileSink {
    public static void main(String[] args) throws Exception {
        // 构建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为1
        env.setParallelism(1);
        // 这里是生成数据流,CustomizeSource这个类是自定义数据源(为了方便测试)
        DataStreamSource<CustomizeBean> dataStreamSource = env.addSource(new CustomizeSource());
        // 现将数据转换成字符串形式
        SingleOutputStreamOperator<String> map = dataStreamSource.map(bean -> bean.toString());
        // 构造FileSink对象,这里使用的RowFormat,即行处理类型的
        FileSink<String> fileSink = FileSink
                // 配置文件输出路径及编码格式
                .forRowFormat(new Path("/Users/xxx/data/testData/"), new SimpleStringEncoder<String>("UTF-8"))
                // 设置文件滚动策略(文件切换)
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofSeconds(180)) // 设置间隔时长180秒进行文件切换
                                .withInactivityInterval(Duration.ofSeconds(20)) // 文件20秒没有数据写入进行文件切换
                                .withMaxPartSize(MemorySize.ofMebiBytes(1)) // 设置文件大小1MB进行文件切换
                                .build()
                )
                // 分桶策略(划分子文件夹)
                .withBucketAssigner(new DateTimeBucketAssigner<String>()) // 按照yyyy-mm-dd--h进行分桶
                //设置分桶检查时间间隔为100毫秒
                .withBucketCheckInterval(100)
                // 输出文件文件名相关配置
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("test_") // 文件前缀
                                .withPartSuffix(".txt") // 文件后缀
                                .build()
                )
                .build();
        // 输出到文件
        map.print();
        map.sinkTo(fileSink);
        env.execute();
    }
}

代码内容这里就不详细说明了,注释已经写得很清楚了.有一点要注意使用FileSink的时候我们要加上对应的pom依赖.我这里使用Flink版本是1.15.3

        <!-- File connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>

这里我们先看一下生成的结果文件

-rw-r--r--  1 xxx  staff   1.0M  6 27 14:43 .test_-eb905337-488d-46f1-8177-86fbb46f778f-0.txt.inprogress.91e49c89-cc79-44f5-940d-ded2770b61a1
-rw-r--r--  1 xxx  staff   1.0M  6 27 14:44 .test_-eb905337-488d-46f1-8177-86fbb46f778f-1.txt.inprogress.c548bd30-8583-48d5-91d2-2e11a7dca2cd
-rw-r--r--  1 xxx  staff   1.0M  6 27 14:45 .test_-eb905337-488d-46f1-8177-86fbb46f778f-2.txt.inprogress.a041dba1-8f37-4307-82da-682c48b0796b
-rw-r--r--  1 xxx  staff   280K  6 27 14:45 .test_-eb905337-488d-46f1-8177-86fbb46f778f-3.txt.inprogress.e05d1759-0a38-4a25-bcd0-1216ce6dda59

这里有必要说明一下由于我使用的是Mac在生成文件的时候会出现一个小问题,上面的那种文件会隐藏起来,直接点开文件夹是看不到的可以通过command + shift + .来显示隐藏文件,或者像我这种直接通过终端ll -a来查看,windows没有发现这个问题.
可以看到除了最后一个文件,其他的文件大小基本都是1MB,最后一个是因为写入的数据大小还没有满足1MB,并且写入时间也没有满足滚动条件,所以还在持续写入中.
而且通过文件名我们可以看到所有文件中都带有inprogress这个状态,这是因为我们没有开启checkpoint,这里先说一下FileSink写入文件时的三个文件状态,官网原图如下:
在这里插入图片描述
这三种状态分别是inprogresspendingfinished,对应的就是处理中、挂起和完成,官网中同时也说明了FileSink必须和checkpoint配合使用,不然文件的状态只会出现inprogresspending,原文内容如下:
在这里插入图片描述
下面我们在看一下加入checkpoint的代码和结果文件
代码如下

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

/**
 * @Author: J
 * @Version: 1.0
 * @CreateTime: 2023/6/27
 * @Description: 测试
 **/
public class FlinkFileSink {
    public static void main(String[] args) throws Exception {
        // 构建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为1
        env.setParallelism(1);
        // 这里是生成数据流,CustomizeSource这个类是自定义数据源(为了方便测试)
        DataStreamSource<CustomizeBean> dataStreamSource = env.addSource(new CustomizeSource());
        // 现将数据转换成字符串形式
        SingleOutputStreamOperator<String> map = dataStreamSource.map(bean -> bean.toString());

        // 每20秒作为checkpoint的一个周期
        env.enableCheckpointing(20000);
        // 两次checkpoint间隔最少是10秒
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
        // 程序取消或者停止时不删除checkpoint
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // checkpoint必须在60秒结束,否则将丢弃
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 同一时间只能有一个checkpoint
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 设置EXACTLY_ONCE语义,默认就是这个
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // checkpoint存储位置
        env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");
        // 设置执行模型为Streaming方式
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        
        // 构造FileSink对象,这里使用的RowFormat,即行处理类型的
        FileSink<String> fileSink = FileSink
                // 配置文件输出路径及编码格式
                .forRowFormat(new Path("/Users/xxx/data/testData/"), new SimpleStringEncoder<String>("UTF-8"))
                // 设置文件滚动策略(文件切换)
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofSeconds(180)) // 设置间隔时长180秒进行文件切换
                                .withInactivityInterval(Duration.ofSeconds(20)) // 文件20秒没有数据写入进行文件切换
                                .withMaxPartSize(MemorySize.ofMebiBytes(1)) // 设置文件大小1MB进行文件切换
                                .build()
                )
                // 分桶策略(划分子文件夹)
                .withBucketAssigner(new DateTimeBucketAssigner<String>()) // 按照yyyy-mm-dd--h进行分桶
                //设置分桶检查时间间隔为100毫秒
                .withBucketCheckInterval(100)
                // 输出文件文件名相关配置
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("test_") // 文件前缀
                                .withPartSuffix(".txt") // 文件后缀
                                .build()
                )
                .build();
        // 输出到文件
        map.print();
        map.sinkTo(fileSink);
        env.execute();
    }
}

看一下结果文件:

-rw-r--r--  1 xxx  staff   761K  6 27 15:13 .test_-96ccd42e-716d-4ee0-835e-342618914e7d-2.txt.inprogress.aa5fccaa-f99f-4059-93e7-6d3c548a66b3
-rw-r--r--  1 xxx  staff   1.0M  6 27 15:11 test_-96ccd42e-716d-4ee0-835e-342618914e7d-0.txt
-rw-r--r--  1 xxx  staff   1.0M  6 27 15:12 test_-96ccd42e-716d-4ee0-835e-342618914e7d-1.txt

可以看到已经完成的文件状态中已经没有inprogress和其他的后缀了,而正在写入的文件则是处于inprogress状态.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/692061.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Labview视觉一键尺寸测量仪,多产品,多尺寸,快速编辑, 测量,导出结果

这是一个关于LabVIEW视觉一键尺寸测量仪的描述&#xff0c;它具有以下特点&#xff1a;支持多种产品和尺寸的测量&#xff0c;可以快速进行编辑、测量和导出结果。 这个领域涉及到的知识点和领域范围包括&#xff1a;LabVIEW、视觉测量、尺寸测量、编辑功能和结果导出。 LabV…

HCIP-7.4交换机STP生成树协议原理

HCIP-7.4交换机STP生成树协议原理 1、什么是交换机生成树&#xff1f;2、STP生成树2.1、标准生成树基本计算过程(802.1D)2.2、STP的基本概念2.3、 BPDU格式及字段说明2.4、 STP的选举原则2.4.1 配置案例说明2.4.2 华为设备的COST值 2.5、端口状态描述2.6、cost值修改2.6.1、非根…

【UCOS-III】自我学习笔记→第20讲→时间管理

文章目录 前言实验步骤1.复制任务创建和删除工程文件并删除task3任务&#xff0c;修改任务1和任务2的优先级为22.修改任务1和任务2的内容3.查看示波器现象 测试代码工程文件总结 前言 无&#xff0c;仅作记录&#xff0c;不具有参考价值&#xff0c;所用开发板为STM32F411RET6…

计算机视觉:分割一切AI大模型segment-anything

1 segment-anything介绍 Segment Anything Model (SAM)来源于Facebook公司Meta AI实验室。据Mata实验室介绍&#xff0c;SAM 已经学会了关于物体的一般概念&#xff0c;并且它可以为任何图像或视频中的任何物体生成 mask&#xff0c;甚至包括在训练过程中没有遇到过的物体和图…

刷题日记《链表01》

题目概述&#xff08;力扣&#xff09; 给定循环单调非递减列表中的一个点&#xff0c;写一个函数向这个列表中插入一个新元素 insertVal &#xff0c;使这个列表仍然是循环升序的。 给定的可以是这个列表中任意一个顶点的指针&#xff0c;并不一定是这个列表中最小元素的指针。…

SpringCloud-13_Alibaba OSS

对象存储 OSS 就是所谓的“图床”吗&#xff1f;&#xff08;致敬yupi /捂脸&#xff09; 一图说明&#xff1a; 阿里云对象存储oos 阿里云对象存储 OSS&#xff08;Object Storage Service&#xff09;是一款海量、安全、低成本、高可靠的云存储服务&#xff0c;提供最高可达 …

汉王人脸考勤管理系统 万能密码登录 漏洞复现

fofa&#xff1a;title“汉王人脸考勤管理系统” 漏洞复现 登录页面&#xff1a; 使用万能密码登录 用户名&#xff1a;or’ or 11– 密码&#xff1a;or

Minio的使用

今天学习的时候用到了阿里云的OSS&#xff0c;由于在公司项目上用到了Minio作为云端文件服务器&#xff0c;因此学习了以下Minio&#xff0c;打算替换掉阿里云的OSS 1.Minio的安装 在这里提供了Docker-compose的模式作为Minio的下载(其中9002作为API请求接口端&#xff0c;90…

Matlab【旅行商问题】—— 基于模拟退火算法的无人机药品配送路线最优化

文章目录 问题描述模拟退火算法Metropolis准则算法流程图&#xff1a; Demo1&#xff1a;只考虑累计距离&#xff0c;通过模拟退火算法求解最短路径matlab代码&#xff1a;最优解之一&#xff1a;适应度进行曲线&#xff1a; Demo1&#xff1a;考虑每个站点的病人数量&#xff…

Visual modflow Flex地下水数值模拟及参数优化、抽水实验设计与处理、复杂的饱和/非饱和地下水流分析

专题一 地下水数值软件的操作流程、建模步骤和所需资料处理及相关注意事项 [1] Visual MODFLOW Flex特征 [2] Visual MODFLOW Flex软件界面及模块 [3] 地下水数值模拟的建模步骤及数据需求 专题二 模型建模操作方法 技巧、真实案例演练、特殊问题处理 [1] 直接模型建模的操…

深入浅出设计模式 - 装饰者模式

博主介绍&#xff1a; ✌博主从事应用安全和大数据领域&#xff0c;有8年研发经验&#xff0c;5年面试官经验&#xff0c;Java技术专家✌ Java知识图谱点击链接&#xff1a;体系化学习Java&#xff08;Java面试专题&#xff09; &#x1f495;&#x1f495; 感兴趣的同学可以收…

Python桌面应用开发之PyQt

文章目录 引言&#xff1a;桌面应用开发三大框架介绍一、PyQT介绍二、安装三、使用教程(1)基础窗口(2)分区布局窗口(类似于html中div的使用)(3)栅格布局窗口(类似于html中的table)(4)表单布局窗口(类似于html中的form)(5)事件函数与事件过滤器(6)信号和槽四、实战示例(1)状态栏…

跨链协议悄然升级

当前加密产业公链百家争鸣&#xff0c;群雄割据&#xff0c;每条公链都拥有自身的忠实用户。 然而&#xff0c;公链与公链之间仿佛一座座孤岛&#xff0c;无法进行无缝的交流和联系&#xff0c;仅能通过跨链桥经由在两条不同的链上运用不同处理机制来协助转移用户的资产。但&a…

【深度学习 | CNN】“深入解析卷积神经网络与反卷积:从原理到应用的全面指南” (从一维、二维、三维讲解)

🤵‍♂️ 个人主页: @AI_magician 📡主页地址: 作者简介:CSDN内容合伙人,全栈领域优质创作者。 👨‍💻景愿:旨在于能和更多的热爱计算机的伙伴一起成长!!🐱‍🏍 🙋‍♂️声明:本人目前大学就读于大二,研究兴趣方向人工智能&硬件(虽然硬件还没开始玩…

MySQL数据库的MHA高可用集群部署及故障切换(图文详解!绝对详细!!)

目录 一、MHA概述 1、MHA简介 2、MHA 的组成 &#xff08;1&#xff09;MHA Node&#xff08;数据节点&#xff09; &#xff08;2&#xff09;MHA Manager&#xff08;管理节点&#xff09; 3、MHA 的特点 二、 搭建MySQLMHA 1、实验思路 2、修改mysql节点的主机名 3&…

JS 事件委托

JavaScript事件委托&#xff08;Event delegation&#xff09;又叫事件代理&#xff0c;是一种在父元素上监听事件&#xff0c;然后通过事件冒泡机制来处理子元素的事件的技术。通过事件委托&#xff0c;可以避免为每个子元素都绑定事件处理程序&#xff0c;提高性能并简化代码…

SC7504运算放大器(OPA)可pin对pin兼容OPA4350

SC750x 系列轨至轨 CMOS 运算放大器针对低电压单电源运行进行了优化。轨至轨输入和输出、低噪声(5nV/√Hz) 和高速运行(38MHz, 22V/μs) 使得运算放大器非常适合驱动模数 (A/D) 转换器。可pin对pin兼容OPA4350。而且也适用于手机功率放大器 (PA) 控制环路和视频处理&#xff08…

【嵌入式Qt开发入门】初识Qt——Linux下安装Qt

Qt 是什么&#xff1f; Qt 是一个跨平台的 C开发库。主要用来开发图形用户界面&#xff08;Graphical User Interface&#xff0c;简 称 GUI&#xff09;程序。 Qt 虽然经常被当做一个 GUI 库&#xff0c;用来开发图形界面应用程序&#xff0c;但这并不是 Qt 的全部&#xff1b…

Win11总是出现BitLocker恢复,想要彻底关闭它该如何操作?

win11解除bitlocker加密方法一&#xff1a; 1、首先按下键盘“WinR”打开运行(如图所示)。 2、打开运行后&#xff0c;在其中输入“control”并点击“确定”打开控制面板(如图所示)。 3、打开后&#xff0c;进入“bitlocker驱动器加密”(如图所示)。 4、随后展开被加密的磁盘&…

Pycharm报错Non-zero exit code (2)

问题现象&#xff1a; 通常我们拿到一个Python项目后&#xff0c;项目中有requirement.txt文件&#xff0c;里面有列出需要安装的三方库&#xff0c;使用pycharm直接安装这些库时&#xff0c;报错&#xff1a;Non-zero exit code (2) 解决方案&#xff1a; 第一种临时解决方案&…