Flink DataStream之输出数据到File中

news2024/11/15 1:34:11
  • 新建类
package test01;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class TestOutputFile {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
        executionEnvironment.setParallelism(1);

        //监听数据端口
        DataStreamSource<String> dataSource = executionEnvironment.socketTextStream("localhost", 9999);

        //开启checkpoint,这样到了一定节点就会关闭文件,否则文件一直都是inprogress,此处设置的检查点是2秒。
        executionEnvironment.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        //输出至文件
        FileSink<String> fileSink = FileSink
                //设置按行输出,指定输出的路径及编码格式,这里的泛型指定的是字符串类型。
                .<String>forRowFormat(new Path("D:/IT/testfilnk"), new SimpleStringEncoder<>("UTF-8"))
                //设置输出文件名的前缀和后缀
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("test-flink-output-")
                        .withPartSuffix(".log")
                        .build())
                //设置文件滚动策略,这里设置的是20s和1024B(1KB),滚动策略满足其一就会重新写新文件。
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofSeconds(20))
                                .withMaxPartSize(new MemorySize(1024))
                                .build()
                )
                .build();

        dataSource.sinkTo(fileSink);
        executionEnvironment.execute();
    }
}
  • 启动程序并启动nc -lp

输入数据:

正在写入的文件会有inprogress的标识(在指定的目录下生成文件时会按照日期的年月日时进行分目录,因为我在执行时的时间是2023/7/12 22点,所以它就会自动生成一个2023-07-12--22目录,分桶策略也可以自己在代码中配置。):

 当满足滚动策略时,会结束当前文件,然后重新写入新文件:

查看文件内容:

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/748996.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Model, ViewModel, EnvironmentObject 的使用

1. Model 数据模型的定义与使用 1.1 案例 struct UserModel: Identifiable{let id: String UUID().uuidStringlet dispalyName: Stringlet userName: Stringlet followerCount: Intlet isVerified: Bool }/// 数据模型 struct ModelBootcamp: View {State var users:[Use…

web 禁用 OPTIONS方法启用【原理扫描】

Web服务器上启用了HTTP OPTIONS方法。 OPTIONS方法提供了Web服务器支持的方法列表&#xff0c;它表示对有关由Request-URI标识的请求/响应链上可用的通信选项的信息的请求。 直接在IIS上进行关闭即可&#xff1a;

osg osgDB::readImageFile 返回空指针 解决中

在 osg功能开发中,需要用到 纹理 加载图片&#xff0c;最神奇的之前 好好的。 现在 把osg 编译成了 osg 342vs2013x86 环境 就出现幺蛾子了&#xff0c;之前是使用的 osg364vs2013x86。结果 命令行运行 加载图片 直接 有 warning 提示。还在处理中&#xff01; 提示 找不到文…

Nginx 的Nacos配置

进入nginx 配置目录 cd /usr/local/nginx/conf 2. 编辑nginx配置文件 vi nginx.conf 3. 增加对Nacos 的代理 upstream nacosServerList {server 192.168.172.102:8848;server 192.168.172.103:8848;server 192.168.172.104:8848; } # Nacos地址服务器寻址配置 server {#监听端…

微信小程序第六节——个体账号如何实现用户自定义内容

&#x1f4cc; 微信小程序第一节 ——自定义顶部、底部导航栏及获取胶囊位置信息。 &#x1f4cc; 微信小程序第二节 —— 微信小程序第二节 —— 自定义组件。 &#x1f4cc; 微信小程序第三节 —— 页面跳转的那些事儿。 &#x1f4cc; 微信小程序第四节 —— 网络请求那些事…

matlab GUI入门

matlab GUI入门 两种方法 法一&#xff1a;使用guide 法二&#xff1a;使用appdesigner&#xff08;推荐&#xff0c;更直观&#xff09; winopen(cd) 打开当前路径。 ctrlI 代码自动对齐 matlab 导入数据文件 导入图片数据 用imread&#xff08;&#xff09;函数导入…

第 3 章 Spark 通讯架构

3.1 Spark 通信架构概述 3.1 Spark 通信架构概述 Spark 中通信框架的发展&#xff1a; ➢ Spark 早期版本中采用 Akka 作为内部通信部件。 ➢ Spark1.3 中引入 Netty 通信框架&#xff0c;为了解决 Shuffle 的大数据传输问题使用 ➢ Spark1.6 中 Akka 和 Netty 可以配置使用。…

bug的合规描述

bug的合格描述&#xff1a; 发现问题的版本bug的合格描述&#xff1a; 开发人员需要知道出现问题的版本&#xff0c;才能够获取对应版本的代码来重现故障问题出现的环境 环境分为硬件环境和软件环境&#xff0c;详细的环境描述有利于故障的重现(如果是web项目&#xff0c;需…

到手价的监测要求和方法

品牌在做电商价格监测时&#xff0c;为什么要对到手价进行监测&#xff0c;这其中的原因还是很显现的&#xff0c;各平台的促销信息众多&#xff0c;如果只监测页面价的低价行为&#xff0c;那将有非常多的低价链接不会被发现&#xff0c;而这也会导致品牌做渠道管控时失去公平…

如何随机切换代理IP以避免被封禁?

在网络爬虫和数据抓取的领域&#xff0c;使用代理IP技术是非常常见的做法。使用代理IP可以有效地绕过网站的访问限制&#xff0c;提高访问速度和稳定性。然而&#xff0c;如果我们在访问网站时只使用一个代理IP&#xff0c;那么可能会被网站封禁&#xff0c;从而导致访问失败。…

开源预训练框架 MMPRETRAIN官方文档(高级指南)

1、准备数据集 1、自定义数据集&#xff08;下面都是分类数据的自定义数据集准备&#xff09; CustomDataset是一个通用数据集类&#xff0c;供您使用自己的数据集。要使用CustomDataset&#xff0c;您需要根据以下两种格式组织数据集文件&#xff1a; 1、子文件夹格式 在这…

正则表达式概念以及语法的使用

目录 1.概念 2. 为什么使用正则表达式&#xff1f; 3. 语法 1.普通字符 非打印字符 2. 特殊字符 3. 限定符 4. 定位符 5. 运算优先级 3.匹配规则 1. 基本模式匹配 2. 字符簇 3. 确定重复出现 1.概念 正则表达式(Regular Expression)是一种文本模式&#xff0c;包…

MAYA挖掘机绑定

打组 少选一个 放中心点 把它放组里 放中心点 创建骨骼 放骨骼 旋转不会带动上面骨骼 中心点的位置 骨骼和组做约束 活塞运行 放中心点 相互目标 管子短&#xff0c;需要加长 又短了 设置中心点 创建IK 制作控制器 让控制器带动模型动 手柄 IK 少一个控制器 删除 不用的…

途乐证券杠杆开户-A股首份半年报出炉 康缘药业净利同比增30.6%

中药职业迎成绩兑现期&#xff0c;多家公司上半年盈利估计倍增 7月12日晚&#xff0c;沪深两市首份半年报出炉。康缘药业半年报显示&#xff0c;公司上半年完成营收25.53亿元&#xff0c;同比添加21.74%&#xff1b;完成归母净利润2.76亿元&#xff0c;同比添加30.6%。 康缘药…

Endnote更新所有信息不全的参考文献(中英文文献信息不全)

方法一&#xff1a;手动 找到reference一个一个输入&#xff08;这里针对某些没有doi的文献&#xff0c;有doi的也可以&#xff0c;只要你愿意&#xff09; 方法二&#xff1a;自动 中文文献------选择txt格式导入endnote&#xff0c;在endnote里选择endnote import&#xf…

得物自建 DTS 平台的技术演进 | 精选

0 前言 DTS是数据传输平台(Data Transfer Platform的缩写) 随着得物App的用户流量增长&#xff0c;业务选择的数据库越来越多样化&#xff0c;异构数据源之间的数据同步需求也逐渐增多。为了控制成本并更好地支持业务发展&#xff0c;我们决定自建DTS平台。本文主要从技术选型、…

Qt6之通用文件格式.dat

dat文件&#xff0c;在Windows中大量存在&#xff0c;到处都有。如下图edge浏览器安装目录下就有一个dat文件&#xff0c;如果你强行打开&#xff0c;发现它是乱码的。 一、什么是dat文件 DAT 文件格式只是一种通用格式&#xff0c;它在文件中包含任何类型的数据&#xff0c;它…

使用 TensorRT、卡尔曼滤波器和 SORT 算法进行实时对象检测和跟踪:第 1 部分训练模型

实时物体检测和跟踪在监控、自动驾驶和机器人等各种应用中至关重要。这些任务需要能够实时处理高分辨率视频流的高效算法。近年来,基于深度学习的目标检测算法(例如YOLO、SSD和Faster R-CNN)在图像和视频中的目标检测和定位方面显示出了令人印象深刻的结果。然而,这些算法的…

JVM面试题详解

JVM介绍 JVM是什么&#xff1f; JVM由哪些部分组成&#xff1f;运行流程是什么&#xff1f; JVM组成 什么是程序计数器 你能给我详细的介绍Java堆吗&#xff1f; 什么是虚拟机栈&#xff1f; 堆栈的区别是什么&#xff1f; 能不能解释一下方法区 你听过直接内存吗 类加载器 …

tx视频 wx小程序 视频缓存方案

本文所有教程及源码、软件仅为技术研究。不涉及计算机信息系统功能的删除、修改、增加、干扰,更不会影响计算机信息系统的正常运行。不得将代码用于非法用途,如侵立删!tx视频 wx小程序 视频缓存方案 环境 win10CharlesInternet Download Manager抓包分析 搜索关键词可以很容…