Flink之FileSink将数据写入parquet文件

news2024/9/24 21:19:12

Flink之FileSink将数据写入parquet文件

在使用FileSink将数据写入列式存储文件中时必须使用forBulkFormat,列式存储文件如ORCFileParquetFile,这里就以ParquetFile为例结合代码进行说明.

在Flink1.15.3中是通过构造ParquetWriterFactory然后调用forBulkFormat方法将构造好的ParquetWriterFactory传入,这里先讲一下构造ParquetWriterFactory一共有三种方式

序列API
方式一AvroParquetWriters.forGenericRecord
方式二AvroParquetWriters.forSpecificRecord
方式三AvroParquetWriters.forReflectRecord

其中方式三AvroParquetWriters.forReflectRecord是我们常用的方法,使用起来也是复杂最低、代码变更时灵活度较好方法,方式二AvroParquetWriters.forSpecificRecord使用起来复杂度较高,但是代码变更的时候灵活度相对较好的方法,方式一AvroParquetWriters.forGenericRecord使用起来比较麻烦,而且代码变更时需要更改的也比较多,这里主要介绍方式二和方式三的使用方式.

要说明一点再Flink1.15.3中是通过AvroParquetWriters来构造ParquetWriterFactory,如果是早期版本的Flink可能是要通过ParquetAvroWriters来进行构造,当然在1.15.3中也可以通过这个方式进行构造,不过ParquetAvroWriters已经标注为过时并且建议使用AvroParquetWriters

源码内容如下:

/**
 * Convenience builder to create {@link ParquetWriterFactory} instances for the different Avro
 * types.
 *
 * @deprecated use {@link AvroParquetWriters} instead. // 看这部分是建议使用AvroParquetWriters
 */
@Deprecated // 这里已经标注了过时
public class ParquetAvroWriters {

    /**
     * Creates a ParquetWriterFactory for an Avro specific type. The Parquet writers will use the
     * schema of that specific type to build and write the columnar data.
     *
     * @param type The class of the type to write.
     */
    public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(
            Class<T> type) {
        return AvroParquetWriters.forSpecificRecord(type);
    }
  • AvroParquetWriters.forReflectRecord(方式三)

    这里就先介绍一下AvroParquetWriters.forReflectRecord的使用方式,我们在使用FileSink时最好配合Checkpoint使用,不然文件只会出现inprogress状态,感兴趣的可以自己实验一下,我在Flink中FileSink的使用演示了加Checkpoint和不加Checkpoint的区别感兴趣的可以看一下.

    代码模板内容比较简单,直接代码演示:

    import com.jin.bean.User;
    import com.jin.schema.UserSchemaBean;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.ParquetWriterFactory;
    import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
    
    /**
     * @Author: J
     * @Version: 1.0
     * @CreateTime: 2023/6/28
     * @Description: 测试
     **/
    public class FlinkFileSinkForParquet {
        public static void main(String[] args) throws Exception {
            // 创建流环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 设置并行度
            env.setParallelism(1);
    
            // 每30秒作为checkpoint的一个周期
            env.enableCheckpointing(30000);
            // 两次checkpoint间隔最少是20秒
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000);
            // 程序取消或者停止时不删除checkpoint
            env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            // checkpoint必须在60秒结束,否则将丢弃
            env.getCheckpointConfig().setCheckpointTimeout(60000);
            // 同一时间只能有一个checkpoint
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
            // 设置EXACTLY_ONCE语义,默认就是这个
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            // checkpoint存储位置
            env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");
            
            // 添加数据源(这里使用的是自定义数据源CustomizeSource,方便测试)
            DataStreamSource<CustomizeBean> sourceStream = env.addSource(new CustomizeSource());
            // 将数据流中的数据存储到bean对象中
            SingleOutputStreamOperator<User> userMapStream = sourceStream.map(bean -> new User(bean.getName(), bean.getAge(), bean.getGender(), bean.getHobbit()));
            // 构建parquetWriterFactory
            ParquetWriterFactory<User> parquetWriterFactory2 = AvroParquetWriters.forReflectRecord(User.class);
            // 构建FileSink
            FileSink<User> parquetFileSink = FileSink
                    // 使用Bulk模式,并配置路径和对应的schema
                    .forBulkFormat(new Path("/Users/xxx/data/testData/"), parquetWriterFactory2)
                    // 分桶策略,使用默认的
                    .withBucketAssigner(new DateTimeBucketAssigner<User>())
                    // 每100毫秒检查一次分桶
                    .withBucketCheckInterval(100)
                    // 滚动策略,Bulk的滚动策略只有一种,就是发生Checkpoint的时候才进行滚动(为了保证列式文件的完整性)
                    .withRollingPolicy(OnCheckpointRollingPolicy.build())
                    .build();
            // 输出到文件
            userMapStream.sinkTo(parquetFileSink);
            env.execute();
    
        }
    }
    
    @Getter
    @Setter
    @ToString
    @NoArgsConstructor
    @AllArgsConstructor
    class User {
        private String name;
        private int age;
        private String gender;
        private String hobbit;
    }
    

    代码中注释很详细了,具体使用看注释即可。这里说明一下为什么forBulkFormat的滚动策略只有OnCheckpointRollingPolicy而不是像forRowFormat那样可以通过时间和文件大小来控制文件滚动,注释中我也讲了是为了保证列式存储文件的完整性,因为列式文件中记录了很多信息,并不想行式存储文件一行一行的写就行,写到某一行直接停了也不影响文件的使用,而列式存储文件中不单单是记录了数据本身还有对应的字段类型、文件头信息、文件尾信息、切片索引等很多信息,如果在写入数据时某一刻直接停止了,而文件还没有生成完整的信息那就会导致这个烈士存储文件根本不具备使用性,是无法进行解析的。

    就比如说ParquetFile,它的文件结构如下图
    在这里插入图片描述

    可以看到文件的结构信息是很复杂的,如果感兴了解一下可以看数据存储格式这篇文章了解一下,这里就不细说了,内容还是比较多的.

  • AvroParquetWriters.forSpecificRecord(方式二)

    forSpecificRecord的使用不像forReflectRecord那样自定义一个bean接收数据就行了,使用forSpecificRecord还要结合一下Apache avro的官网看一下,下面我就介绍一下如何使用forSpecificRecord.

    avro的使用有两种方式一是通过API直接调用的方式,二通过配置avsc文件然后进行编译的方式,在代码中我们使用的第二种方式,使用第一种方式同样会出现很多schema的信息在代码中写死修改起来会比较复杂的问题,而且对avroAPI也要足够熟悉,学习成本还是有的.

    1. resource目录中创建avsc文件,文件内容如下

      {
        "namespace": "com.jin.schema",
        "type": "record",
        "name": "UserSchemaBean",
        "fields": [
          {"name": "name", "type": "string"},
          {"name": "age", "type": "int"},
          {"name": "gender",  "type": "string"},
          {"name": "hobbit", "type": "string"}
        ]
      }
      

      文件中的内容就是schema信息,这里我相信大家都能看得明白."namespace": "com.jin.schema"编译后自动创建的bean的存储位置,"name": "UserSchemaBean"就是配置生成bean的名称,fields中就是配置生成bean的成员变量和对应的数据类型.

      官网演示的avsc文件内容如下:

      {"namespace": "example.avro",
       "type": "record",
       "name": "User",
       "fields": [
           {"name": "name", "type": "string"},
           {"name": "favorite_number",  "type": ["int", "null"]},
           {"name": "favorite_color", "type": ["string", "null"]}
       ]
      }
      

      编译后就会根据avsc文件中的schema信息在配置好的目录中自动创建bean.

    2. Maven中添加avsc文件编译插件

      官网内容如下:

      <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.11.1</version>
        <executions>
          <execution>
            <phase>generate-sources</phase>
            <goals>
              <goal>schema</goal>
            </goals>
            <configuration>
              <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
              <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>
      

      要注意<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>是已经配置完的avsc文件的位置,像是我就是在原有的resource目录下配置的就要将内容改成<sourceDirectory>${project.basedir}/src/main/resource/</sourceDirectory>否则在编译时就会报错找不到对应的目录或文件,如果想直接使用<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>那就在项目的main目录下创建一个avro目录并将目录性质改为Source root(这个如果不会可自行百度,关键字我都已经提供了).

      我的项目中实际配置如下:

                  <!-- avro插件 -->
                  <plugin>
                      <groupId>org.apache.avro</groupId>
                      <artifactId>avro-maven-plugin</artifactId>
                      <version>1.10.0</version>
                      <executions>
                          <execution>
                              <phase>generate-sources</phase>
                              <goals>
                                  <goal>schema</goal>
                              </goals>
                              <configuration>
                                  <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
                                  <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                              </configuration>
                          </execution>
                      </executions>
                  </plugin>
      

      选择插件的版本时要注意依赖冲突问题,我们要先看一下Flink的flink-avro下的org.apache.avro:avro是什么版本,如下图:
      在这里插入图片描述

      可以看到1.15.3org.apache.avro:avro的版本是1.10.0,所以我选择的插件也是这个版本.

    3. 编译

      上面步骤都完成了就可以进行编译了,Maven->Lifecycle->compile,这里看一下编译后的结果如下图:
      在这里插入图片描述

      可以看到已经根据我们配置的avsc文件自动创建了对应的bean,这里看一下成员变量内容是否一致,如下:

        /**
         * All-args constructor.
         * @param name The new value for name
         * @param age The new value for age
         * @param gender The new value for gender
         * @param hobbit The new value for hobbit
         */
        public UserSchemaBean(java.lang.CharSequence name, java.lang.Integer age, java.lang.CharSequence gender, java.lang.CharSequence hobbit) {
          this.name = name;
          this.age = age;
          this.gender = gender;
          this.hobbit = hobbit;
        }
      

      可以看到成员变量信息也是完全一致,我这里值展示了小部分代码,编译后的bean中的代码信息很多,不过我们不用关心这个,懂与不懂都不影响使用.

    4. 代码内容

      接下来就到主题了,实际的代码内容如下:

      import com.jin.schema.UserSchemaBean;
      import org.apache.flink.connector.file.sink.FileSink;
      import org.apache.flink.core.fs.Path;
      import org.apache.flink.formats.parquet.ParquetWriterFactory;
      import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
      import org.apache.flink.streaming.api.CheckpointingMode;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
      import org.apache.flink.streaming.api.environment.CheckpointConfig;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
      import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
      
      /**
       * @Author: J
       * @Version: 1.0
       * @CreateTime: 2023/6/28
       * @Description: 测试
       **/
      public class FlinkFileSinkForParquet {
          public static void main(String[] args) throws Exception {
              // 创建流环境
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              // 设置并行度
              env.setParallelism(1);
      
              // 每30秒作为checkpoint的一个周期
              env.enableCheckpointing(30000);
              // 两次checkpoint间隔最少是20秒
              env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000);
              // 程序取消或者停止时不删除checkpoint
              env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
              // checkpoint必须在60秒结束,否则将丢弃
              env.getCheckpointConfig().setCheckpointTimeout(60000);
              // 同一时间只能有一个checkpoint
              env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
              // 设置EXACTLY_ONCE语义,默认就是这个
              env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
              // checkpoint存储位置
              env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");
      
              // 添加数据源(这里使用的是自定义数据源,方便测试)
              DataStreamSource<CustomizeBean> sourceStream = env.addSource(new CustomizeSource());
              // 将数据流中的对象转成UserSchemaBean类型
              SingleOutputStreamOperator<UserSchemaBean> mapStream = sourceStream.map(bean -> new UserSchemaBean(bean.getName(), bean.getAge(), bean.getGender(), bean.getHobbit()));
              // 构建parquetWriterFactory,这里传入的就是编译后的UserSchemaBean
              ParquetWriterFactory<UserSchemaBean> parquetWriterFactory = AvroParquetWriters.forSpecificRecord(UserSchemaBean.class);
              // 构建FileSink
              FileSink<UserSchemaBean> parquetFileSink = FileSink
                      // 使用Bulk模式,并配置路径和对应的schema
                      .forBulkFormat(new Path("/Users/xxx/data/testData/"), parquetWriterFactory)
                      // 分桶策略,使用默认的
                      .withBucketAssigner(new DateTimeBucketAssigner<UserSchemaBean>())
                      // 每100毫秒检查一次分桶
                      .withBucketCheckInterval(100)
                      // 滚动策略,Bulk的滚动策略只有一种,就是发生Checkpoint的时候才进行滚动(为了保证列式文件的完整性)
                      .withRollingPolicy(OnCheckpointRollingPolicy.build())
                      .build();
              // 输出到文件
              mapStream.sinkTo(parquetFileSink);
              env.execute();
      
          }
      }
      

      通过代码我们可以看到,内容基本就是一致的无非就是forSpecificRecord传入的bean不同而已,当然还是建议使用AvroParquetWriters.forReflectRecord这种方式,简易高效,复杂的过程并不一定能提高我们的代码能力.

      到这里这两种方式我都介绍完了,希望看完这篇文章有所收获.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/696575.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第9章 异常处理

第9章 异常处理 9.1 Java异常处理 try-catch-finally ​ ​ 9.2 Scala异常处理 ​ ​ package chapter09object Test01_Exception {def main(args: Array[String]): Unit {try {val n 10 / 1} catch {case e: ArithmeticException > {println("发生算数异常&quo…

【Java高级编程】Java常用类

Java常用类 1、字符串相关的类1.1、字符串相关的类&#xff1a;String1.2、字符串相关的类&#xff1a;String常用方法1.3、String与基本数据类型、包装类之间的转换1.4、String与char[]之间的转换1.5、String与byte[]之间的转换1.6、String、StringBuffer、StringBuilder三者的…

element-plus中的el-table如何动态合并行(复制粘贴即可使用 亲测有效!)

demo场景&#xff1a; 一个 table&#xff0c;由5列组成&#xff0c;其 prop 分别为’resource_name’, ‘scene_name’, ‘type’, ‘content’, ‘desc’&#xff0c;渲染 table 的数据来源于接口。现在需要将 propdesc的这一列&#xff0c;按照resource_name 相等时进行合并…

使用EVPN构建二层VPN

一、业务背景 EVPN实例用于将EVPN路由与公网路由隔离。不同EVPN实例的路由之间也是相互隔离的。在所有EVPN组网方案中,都需要配置EVPN实例。 二、业务拓扑 三、配置步骤 1、配置基本接口IP 2、配置核心网IGP和LDP 3、配置PE和PE之间的BGP EVPN邻居关系 4、配置EVPN的源地址 …

Linux5.10 NoSQL 之 Redis配置与优化及数据类型

文章目录 计算机系统5G云计算第四章 LINUX NOSQL 之 Redis配置与优化及数据类型一、关系数据库与非关系型数据库1.关系型数据库2.非关系型数据库3.关系型数据库和非关系型数据库区别4.非关系型数据库产生背景5.总结 二、Redis简介1.Redis 具有以下几个优点2.使用场景3.哪些数据…

一、枚举类型——使用 EnumMap 分发

使用 EnumMap 可以实现“真正的”双路分发&#xff0c;它是专门为 enum 设计的高效 Map。我们的目标是在两个未知类型中切换&#xff0c;因此由 EnumMap 组成的 EnumMap &#xff08;嵌套 EnumMap &#xff09;可以实现双路分发。 RoShamBo5.java import java.util.EnumMap;imp…

如何 设计一个高质量的 API 接口?

目录 你是否也感同身受&#xff1f; 优秀API的特质 自解释 易学习 易使用 难误用 API 设计原则 1. 充分原则 2. 单一视角原则 3. 单一功能原则 4. 简单原则 5. 抽象原则 6. 兼容扩展原则 7. 最小惊讶原则 8. 低耦合原则 9. 正交原则 10. 易测试原则 11. 统一原…

怎样查看电脑开关机日志

最近想查看家里电脑是否每天都正常关机了。 家里的是Windows电脑。 使用windows自带功能或者说自带工具无疑是最方便的。 按下【开始→运行】&#xff0c;输入eventvwr&#xff0c;按下回车键。 打开事件查看器&#xff0c;展开Windows日志&#xff0c;双击系统。 选择筛…

Redis内存满分析

操作0&#xff1a; dbsize计算db大小&#xff0c;判断是哪个db的问题&#xff0c;发现是db1的问题。 操作1&#xff1a; Redis中先备份xxx.rdb文件&#xff0c;然后使用下面的工具进行分析 Redis内存分析工具之redis-rdb-tools的安装与使用_薛定谔的猫io的博客-CSDN博客 结…

如何在Windows 10中移动文档文件夹位置

默认情况下,Windows将你的个人文档文件夹存储在你帐户的%UserProfile%文件夹中(例如:“C:\Users\Kent”)。 你可以将“文档”文件夹中文件的存储位置更改为硬盘驱动器、其他驱动器或网络上的其他计算机上的其他位置。 一、如果你当前的文档文件夹受 OneDrive 保护,那么你…

记录一个iOS无法找到堆栈信息的崩溃修复

崩溃提示 2023-06-28 21:14:46.9624560800 -[UIDynamicCatalogColor length]: unrecognized selector sent to instance 0x6000073292c0 崩溃如下图所示 思路&#xff0c;既然我们无法通过调用的堆栈信息查找&#xff0c;那就试试通过崩溃对象的内存地址查看该对象的详细信息 …

攻防世界-web-mfw

题目描述&#xff1a;如图&#xff0c;只有这样的三个页面 home页面&#xff1a; about页面&#xff1a; contact页面&#xff1a; burp抓包可以看到返回的html中刚好对应了三个页面&#xff0c;以及注释掉的flag 尝试将page设置成flag&#xff0c;但是并没有什么反应。 1. 思…

【Web 网络管理】网络杂谈(8)之基于 Web 的网络管理

涉及知识点 基于 Web 的网络管理模式&#xff0c;WBM的介绍与标准&#xff0c;WBM的实现方式与关键技术&#xff0c;WBM的安全性考虑。深入了解WBM技术。 原创于&#xff1a;CSDN博主-《拄杖盲学轻声码》&#xff0c;更多内容可去其主页关注下哈&#xff0c;不胜感激 文章目录…

污水厂3D可视化智慧大屏实现统一数据管理和信息化建设

随着城市化进程的加速和人口的不断增长&#xff0c;污水排放量也随之增加。3D可视化技术的出现&#xff0c;为污水厂的管理和运营带来了新的思路和方法。本文将探讨污水厂3D可视化智慧大屏系统的意义。 首先&#xff0c;污水厂3D可视化智慧大屏系统可以帮助管理人员更好地了解…

Unable to open debugger port (127.0.0.1:55305): java.net.BindException

如图idea启动服务端口被占用&#xff0c;如何解决 1.修改启动服务的端口 修改tomcat的启动端口&#xff0c;避免和windows已启动端口占用&#xff08;不推荐&#xff09; 2.杀死占用服务的端口 winR 打开命令行窗口 netstat -aon|findstr “55305” taskkill -f -pid 25512…

汽车租赁系统

汽车租赁系统&#xff0c;java&#xff0c;ssm&#xff0c;tomcat&#xff0c;mysql。包含数据库文件&#xff0c;源码&#xff0c;和项目运行演示介绍视频 idea和eclipse都可以运行。 系统介绍&#xff1a; 角色&#xff1a;管理员&#xff0c;用户 主要功能&#xff1a;汽车资…

解决 vue2 productionTip=false设置无效

原因&#xff1a;在最新版本的Chrome中&#xff0c;在script中使用settimeout&#xff0c;将在允许第一个js完成后立即回调。 第一种&#xff1a; 直接在源码中&#xff0c;将productionTip&#xff1a;true直接改成false 第二种&#xff0c;则是在红框位置改成大于0的任意数值…

建立无需build的react单页面SPA框架

vue、react这种前端渲染的框架&#xff0c;比较适合做SPA。如果用ejs做SPA&#xff08;Single Page Application&#xff09;&#xff0c;js代码控制好全局变量冲突不算严重&#xff0c;但dom元素用jquery操作会遇到很多的名称上的冲突&#xff08;tag、id、name&#xff09;。…

PyTorch Lightning基础入门

Lightning in 15 minutes Lightning in 15 minutes — PyTorch Lightning 2.0.4 documentation 安装 PyTorch Lightning pip install lightning conda install lightning -c conda-forge 定义一个LightningModule LightningModule可以让pytorch的nn.Module可以整合一些训…

一般人不要轻易去学习网络安全(黑客)

笔者本人 17 年就读于一所普通的本科学校&#xff0c;20 年 6 月在三年经验的时候顺利通过校招实习面试进入大厂&#xff0c;现就职于某大厂安全联合实验室。 我为啥说自学黑客&#xff0c;一般人我还是劝你算了吧&#xff01;因为我就是那个不一般的人。 首先我谈下对黑客&a…