6.1、Flink数据写入到文件

news2024/10/6 8:23:31

1、前言

Flink API 提供了FileSink连接器,来帮助我们将数据写出到文件系统中去

版本说明:java1.8、flink1.17

官网链接:官网


2、Format Types - 指定文件格式

FileSink 支持 Row-encoded 、Bulk-encoded 两种格式写入文件系统

        Row-encoded:文本格式

        Bulk-encoded:Parquet、Avro、SequenceFile、Compress、Orc

Row-encoded sink: FileSink.forRowFormat(basePath, rowEncoder)
Bulk-encoded sink: FileSink.forBulkFormat(basePath, bulkWriterFactory)

3、桶分配 - 文件分区策略(分目录)

桶的逻辑定义了如何将数据分配到基本输出目录内的子目录中。(好比Hive中的分区)

Flink 内置了两种同分配策略:

  • DateTimeBucketAssigner :默认的基于时间的分配器
  • BasePathBucketAssigner :分配所有文件存储在基础路径上(单个全局桶)
BasePathBucketAssigner - 不会生成子目录
DateTimeBucketAssigner - 根据时间进行分桶

代码示例:

// TODO 按照时间进行分桶,每分钟生成一个子目录,目录名称为 yyyy-MM-dd HH-mm
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH-mm", ZoneId.systemDefault()))
// TODO 当个全局桶,不生成子目录
.withBucketAssigner(new BasePathBucketAssigner())

4、滚动策略 - 分文件

滚动策略定义`何时生成新的文件`,可以指定 文件创建时间和文件大小 进行配置

// TODO 文件滚动策略:  文件创建后1分钟 或 大小超过1m 时生成新的文件
.withRollingPolicy(
    DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(1))     //   指定文件持续时间
            .withMaxPartSize(new MemorySize(1024 * 1024))    //   指定文件大小
            .build()
)

5、文件命名&生命周期

Part 文件可以处于以下三种状态中的任意一种:

  1. In-progress :当前正在写入的 Part 文件处于 in-progress 状态
  2. Pending :由于指定的滚动策略)关闭 in-progress 状态文件,并且等待提交
  3. Finished :流模式(STREAMING)下的成功的 Checkpoint 或者批模式(BATCH)下输入结束,文件的 Pending 状态转换为 Finished 状态

注意:在 STREAMING 模式下使用 FileSink 需要开启 Checkpoint 功能。 Finished状态的文件只在 Checkpoint 成功时生成。如果没有开启 Checkpoint 功能,文件将永远停留在 in-progress 或者 pending 的状态,并且下游系统将不能安全读取该文件数据。

文件命名策略:

  • In-progress / Pending:prefix-part-<uid>-<partFileIndex>.ext.inprogress.uid
  • Finished:prefix-part-<uid>-<partFileIndex>.ext

    prefix : 文件名称前缀(默认为空)

  ext :文件名称后缀(默认为空)

  uid :uid 是一个分配给 Subtask 的随机 ID 值

└── 2019-08-25--12
    ├── prefix-4005733d-a830-4323-8291-8866de98b582-0.ext
    ├── prefix-4005733d-a830-4323-8291-8866de98b582-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    ├── prefix-81fc4980-a6af-41c8-9937-9939408a734b-0.ext
    └── prefix-81fc4980-a6af-41c8-9937-9939408a734b-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11

代码示例:

// TODO 指定输出文件的名称配置 前缀、后缀
.withOutputFileConfig(
    OutputFileConfig.builder()
            .withPartPrefix("flink") // 指定前缀
            .withPartSuffix("txt")   // 指定后缀
            .build()
)

6、这是一个完整的例子

package com.baidu.datastream.sink;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;
import java.time.ZoneId;

// TODO flink 数据输出到文件系统
public class SinkFiles {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // STREAMING 模式时,必须开启checkpoint,否则文件一直都是 .inprogress
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        // 2.指定数据源
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

        // 3.初始化 FileSink 实例
        FileSink<String> fileSink = FileSink
                // TODO 指定输出方式 行式输出、文件路径、编码
                .<String>forRowFormat(new Path("data/output"), new SimpleStringEncoder<String>("UTF-8"))
                // TODO 指定输出文件的名称配置 前缀、后缀
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("flink") // 指定前缀
                                .withPartSuffix(".txt")   // 指定后缀
                                .build()
                )
                // TODO 按照时间进行目录分桶:每分钟生成一个目录,目录格式为 yyyy-MM-dd HH-mm
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH-mm", ZoneId.systemDefault()))
                // TODO 文件滚动策略:  1分钟 或 1m 生成新的文件
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(1))
                                .withMaxPartSize(new MemorySize(1024 * 1024))
                                .build()
                )
                .build();

        streamSource.sinkTo(fileSink);

        // 3.触发程序执行
        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1004503.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

小程序自定义tabbar

前言 使用小程序默认的tabbar可以满足常规开发&#xff0c;但是满足不了个性化需求&#xff0c;如果想个性化开发就需要用到自定义tabbar,以下图为例子 一、在app.json配置 先按照以往默认的形式配置&#xff0c;如果中间的样式特殊则不需要配置 "tabBar": {&qu…

SpringWeb解析

目录 运行流程 组件介绍 简单搭建一个SpringWeb项目 1.导入依赖 2.配置DispatcherServlet 3.开启注解 4.处理器搭建 5.接收请求 获取请求的数据 中文乱码 返回json格式 拦截器 代码实现 SpringWeb 是 spring 框架中的一个模块&#xff0c;基于 Servlet API 构建的原…

豆瓣图书评分数据的可视化分析

导语 豆瓣是一个提供图书、电影、音乐等文化产品的社区平台&#xff0c;用户可以在上面发表自己的评价和评论&#xff0c;形成一个丰富的文化数据库。本文将介绍如何使用爬虫技术获取豆瓣图书的评分数据&#xff0c;并进行可视化分析&#xff0c;探索不同类型、不同年代、不同…

如何做好医药产品说明书翻译?

近年来&#xff0c;随着世界各国之间的交流日渐紧密&#xff0c;医药产业发达国家的药品和医用器械对于其他国家的输出日益增多&#xff0c;但这些医疗产品在流通过程中&#xff0c;往往需要专业的医药翻译人员进行产品说明书的精确翻译。那么&#xff0c;如何做好医药产品说明…

掌动智能:提升硬件连通性测试效率与精确度

在当今数字化时代&#xff0c;各种智能设备和硬件组件在我们的日常生活和工作中扮演着越来越重要的角色。为确保这些设备正常运行&#xff0c;并实现跨设备的无缝连接&#xff0c;硬件连通性测试变得至关重要。为了提高测试效率与精确度&#xff0c;让我们介绍掌动智能作为一家…

物理层(408)

一、通信基础 【2009】在无噪声的情况下&#xff0c;若某通信链路的带宽为3kHz&#xff0c;采用4个相位&#xff0c;每个相位具有4种振幅的QAM调制技术&#xff0c;则该通信链路的最大数据传输速率是&#xff08;B&#xff09; A、12kb/s B、24kb/s C、48kb/s …

A股风格因子看板 (2023.09 第01期)

该因子看板跟踪A股风格因子&#xff0c;该因子主要解释沪深两市的市场收益、刻画市场风格趋势的系列风格因子&#xff0c;用以分析市场风格切换、组合风格暴 露等。 今日为该因子跟踪第1期&#xff0c;指数组合数据截止日2023-08-31&#xff0c;要点如下 近1年A股风格因子收益走…

《DevOps实践指南》- 读书笔记(五)

DevOps实践指南 Part 4 第二步 &#xff1a;反馈的技术实践14. 建立能发现并解决问题的遥测系统14.1 建设集中式监控架构14.2 建立生产环境的应用程序日志遥测14.3 使用遥测指导问题的解决14.4 将建立生产遥测融入日常工作14.5 建立自助访问的遥测和信息辐射器14.6 发现和填补遥…

元宇宙全球市场规模到2030年将达9805亿美元!

元宇宙是一种新兴的概念&#xff0c;它指的是一个虚拟的世界&#xff0c;由人工智能、虚拟现实、区块链等技术构建而成。元宇宙的起源可以追溯到上世纪90年代的虚拟世界“第二人生”&#xff0c;但直到近年来&#xff0c;随着技术的不断发展&#xff0c;它才逐渐成为了人们关注…

labelme2voc 标签重叠/覆盖问题

使用labelme自带的 labelme2voc.py转换voc数据集时可能标签重叠

C语言实现单链表和双向循环链表

全文目录 链表单链表实现申请节点头插尾插头删尾删任意节点后插入删除单链表的销毁 带头双向循环链表实现链表初始化申请节点头插尾插头删尾删任意节点后插入删除链表的销毁 链表和顺序表对比总结 链表 概念&#xff1a;链表是一种物理存储结构上非连续、非顺序的存储结构&…

GO语言篇之发布开源软件包

GO语言篇之发布开源软件包 文章目录 GO语言篇之发布开源软件包新建仓库拉取到本地初始化项目编写代码提交代码发布引用软件包 我们写GO语言程序的时候难免会引用第三方的软件包&#xff0c;那么你知道别人是怎么发布自己的软件包吗&#xff0c;别急&#xff0c;这篇博客教你怎么…

Apache DolphinScheduler 如何实现自动化打包+单机/集群部署?

Apache DolphinScheduler 是一款开源的分布式任务调度系统&#xff0c;旨在帮助用户实现复杂任务的自动化调度和管理。DolphinScheduler 支持多种任务类型&#xff0c;可以在单机或集群环境下运行。下面将介绍如何实现 DolphinScheduler 的自动化打包和单机/集群部署。 自动化…

【Oracle】数据库导入导出

Oracle数据库导入导出 文章目录 Oracle数据库导入导出一、expdp导出1、管理员身份登录2、删除以前测试的用户及对应的数据3、创建表空间&#xff08;源表--待导出的表&#xff09;4、创建用户&#xff0c;给用户设置默认表空间和临时表空间5、给用户授权&#xff08;创建表和视…

Unity+百度文心大模型驱动AI小姐姐数字人

1.简述 最近看到新闻&#xff0c;说是百度、字节、商汤、百川、智普等几家企业及机构所发布的生成式大语言模型&#xff0c;通过了《生成式人工智能服务管理暂行办法》&#xff0c;成为首批获得官方备案的大语言模型服务提供商。虽然一直在使用包括文心一言、chatglm这些大语言…

怒赞了,阿里P8面试官推荐的Java高并发核心编程文档

前言 学完阿里P8面试官推荐的Java高并发核心编程文档后&#xff0c;终于拿到了蚂蚁p6的offer&#xff0c;这份文档包含的内容有点多。 Java高并发核心编程文档《尼恩Java高并发三部曲》获读者怒赞&#xff01;获取方式见文末 文章目录 前言尼恩Java高并发三部曲卷1&#xff1…

适合引流的运动步数打卡抽奖小程序源码开发

要健康也要瘦&#xff1f;那么有一个可以让你悄悄改变还可以获取奖品的小程序简直不要太入心。用运动步数兑换奖品&#xff0c;每天运动一下&#xff0c;换点小礼品&#xff0c;简直不要太惬意。 运动步数兑换小程序核心亮点&#xff1a; 小程序与微信运动做了关联&#xff…

Android环境配置笔记

文章目录 一、各环境文档二、参考 一、各环境文档 Gradle官方的兼容性文档&#xff1a;Java Compatibility 更新日期&#xff1a;2023.9.12 Android Gradle插件版本&#xff1a;Android Gradle Plugin 二、参考 参考文章&#xff1a;Android问题记录

SS928搭建NNN环境

环境要求&#xff1a;ubuntu18.04 参考文件&#xff1a; 《ATC工具使用指南》《应用开发指南》《驱动和开发环境安装指南》 《昇腾模型压缩工具使用指南&#xff08;ONNX&#xff09;》 交叉编译器的安装-----------------------------------------------------------------…

C语言“牵手”淘宝商品评论数据方法,淘宝商品评论接口,淘宝商品评价接口,淘宝API接口申请指南

淘宝商品评论API是淘宝开放平台为开发者提供的一套应用程序编程接口&#xff0c;通过该接口&#xff0c;开发者可以获取到店铺所有商品的评价数据。 淘宝商品评论API包含以下接口&#xff1a; taobao.item.reviews.get&#xff1a;用于获取指定商品的评价数据&#xff0c;输入…