Flink将数据写入CSV文件后文件中没有数据

news2025/1/12 22:46:18

Flink中有一个过时的sink方法:writeAsCsv,这个方法是将数据写入CSV文件中,有时候我们会发现程序启动后,打开文件查看没有任何数据,日志信息中也没有任何报错,这里我们结合源码分析一下这个原因.

这里先看一下数据处理的代码
代码中我是使用的自定义数据源生产数据的方式,为了方便测试

import lombok.*;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

/**
 * @Author: J
 * @Version: 1.0
 * @CreateTime: 2023/6/19
 * @Description: 自定义数据源测试
 **/
public class FlinkCustomizeSource {
    public static void main(String[] args) throws Exception {
        // 创建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(1); // 这里的并行度设置为几就会生成多少个csv文件
        // 添加自定义数据源
         DataStreamSource<CustomizeBean> dataStreamSource = env.addSource(new customizeSource());
        // 先将数据转换成Tuple类型,这样才能写入csv中
        SingleOutputStreamOperator<Tuple4<String, Integer, String, String>> tuple4Stream = dataStreamSource.map(
                bean -> Tuple4.of(bean.getName(), bean.getAge(), bean.getGender(), bean.getHobbit())
        ).returns(new TypeHint<Tuple4<String, Integer, String, String>>() {});
        // 选择csv类型的sink,模式使用的覆盖
        tuple4Stream.writeAsCsv("/Users/xxx/data/testData/test.csv", FileSystem.WriteMode.OVERWRITE);
        env.execute();
    }
}

// 自定义数据源需要实现SourceFunction接口,注意这个接口是单机的数据源,如果是想自定义分布式的数据源需要集成RichParallelSourceFunction类
class customizeSource implements SourceFunction<CustomizeBean> {
    int flag;
    // Job执行的线程
    @Override
    public void run(SourceContext ctx) throws Exception {
        /*这个方法里就是具体的数据逻辑,实际内容要根据业务需求编写,这里只是为了演示方便*/
        CustomizeBean customizeBean = new CustomizeBean();
        String[] genders = {"M", "W"};
        String[] hobbits = {"篮球运动爱好者", "钓鱼爱好者", "乒乓球运动爱好者", "美食爱好者", "羽毛球运动爱好者", "天文知识爱好者", "旅游爱好者", "书法爱好者", "非遗文化爱好者", "网吧战神"};
        while (flag != 100) {
            // 这里自定义的Bean作为数据源
            customizeBean.setAge(RandomUtils.nextInt(18, 80)); // 年龄
            customizeBean.setName("A-" + new Random().nextInt()); // 姓名
            customizeBean.setGender(genders[RandomUtils.nextInt(0, genders.length)]); // 性别
            customizeBean.setHobbit(hobbits[RandomUtils.nextInt(0, hobbits.length)]); // 爱好
            // 将数据收集
            ctx.collect(customizeBean);
            // 睡眠时间是为了控制数据生产的速度,演示效果更加明显
            Thread.sleep(1000);
        }
    }

    // Job取消时就会调用cancel方法
    @Override
    public void cancel() {
        // flag为100时就会停止程序
        flag = 100;
    }
}

@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
class CustomizeBean{
    private String name;
    private int age;
    private String gender;
    private String hobbit;
}

上面的代码中我们使用自定义数据源的方式(java bean[CustomizeBean]),通过设置Thread.sleep(1000)可以固定每秒生成一条数据.这里我们先看一下存储CSV文件的目录
在这里插入图片描述
通过上图可以看到程序没有启动时,目录是空的,这里我们启动一下程序
日志内容如下

[2023-06-19 15:26:37,755]-[INFO] -org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader -3206 -org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader.load(StateChangelogStorageLoader.java:98).load(98) | Creating a changelog storage with name 'memory'.
[2023-06-19 15:26:37,766]-[INFO] -org.apache.flink.runtime.taskexecutor.TaskExecutor -3217 -org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:757).submitTask(757) | Received task Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203), deploy into slot with allocation id b691e34573507d585516decbedb36384.
[2023-06-19 15:26:37,768]-[INFO] -org.apache.flink.runtime.taskmanager.Task -3219 -org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1080).transitionState(1080) | Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203) switched from CREATED to DEPLOYING.
[2023-06-19 15:26:37,769]-[INFO] -org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl -3220 -org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl.markExistingSlotActive(TaskSlotTableImpl.java:388).markExistingSlotActive(388) | Activate slot b691e34573507d585516decbedb36384.
[2023-06-19 15:26:37,773]-[INFO] -org.apache.flink.runtime.taskmanager.Task -3224 -org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:623).doRun(623) | Loading JAR files for task Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203) [DEPLOYING].
[2023-06-19 15:26:37,788]-[INFO] -org.apache.flink.streaming.runtime.tasks.StreamTask -3239 -org.apache.flink.runtime.state.StateBackendLoader.loadFromApplicationOrConfigOrDefaultInternal(StateBackendLoader.java:257).loadFromApplicationOrConfigOrDefaultInternal(257) | No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@4e1fcd2f
[2023-06-19 15:26:37,789]-[INFO] -org.apache.flink.runtime.state.StateBackendLoader -3240 -org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:315).fromApplicationOrConfigOrDefault(315) | State backend loader loads the state backend as HashMapStateBackend
[2023-06-19 15:26:37,789]-[INFO] -org.apache.flink.streaming.runtime.tasks.StreamTask -3240 -org.apache.flink.runtime.state.CheckpointStorageLoader.createJobManagerCheckpointStorage(CheckpointStorageLoader.java:274).createJobManagerCheckpointStorage(274) | Checkpoint storage is set to 'jobmanager'
[2023-06-19 15:26:37,793]-[INFO] -org.apache.flink.runtime.taskmanager.Task -3244 -org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1080).transitionState(1080) | Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203) switched from DEPLOYING to INITIALIZING.
[2023-06-19 15:26:37,795]-[INFO] -org.apache.flink.runtime.executiongraph.ExecutionGraph -3246 -org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1416).transitionState(1416) | Source: Custom Source -> Map -> Sink: Unnamed (1/1) (965035c5eef2b8f28ffcfc309b92e203) switched from DEPLOYING to INITIALIZING.
[2023-06-19 15:26:37,836]-[INFO] -org.apache.flink.runtime.taskmanager.Task -3287 -org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1080).transitionState(1080) | Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203) switched from INITIALIZING to RUNNING.
[2023-06-19 15:26:37,837]-[INFO] -org.apache.flink.runtime.executiongraph.ExecutionGraph -3288 -org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1416).transitionState(1416) | Source: Custom Source -> Map -> Sink: Unnamed (1/1) (965035c5eef2b8f28ffcfc309b92e203) switched from INITIALIZING to RUNNING.

这里的日志我截取了最后的部分,可以看到没有任何报错的,我们在看一下生成的CSV文件
在这里插入图片描述
这里我们再将文件打开,看一下有没有数据
在这里插入图片描述
通过图片可以看到这个文件中是没有任何数据的.
这里我先说一下原因,然后再结合源码看一下,没有数据的原因是数据在内存中还没有达到4k的缓存,没有到这个数据量就不会将数据刷新到磁盘上,代码中我们加入了睡眠时间Thread.sleep(1000)就是为了看到这个效果,接下来我们就结合源码看一下.writeAsCsv这个方法的缓存刷新是不是4k,我们先看一下.writeAsCsv的内容,点击去源码后我们先找到下面这段代码

    @Deprecated
    @PublicEvolving
    public <X extends Tuple> DataStreamSink<T> writeAsCsv(
            String path, WriteMode writeMode, String rowDelimiter, String fieldDelimiter) {
        Preconditions.checkArgument(
                getType().isTupleType(),
                "The writeAsCsv() method can only be used on data streams of tuples.");

        CsvOutputFormat<X> of = new CsvOutputFormat<>(new Path(path), rowDelimiter, fieldDelimiter);// 着重看这里,我们在看一下CsvOutputFormat里面的内容

        if (writeMode != null) {
            of.setWriteMode(writeMode);
        }

        return writeUsingOutputFormat((OutputFormat<T>) of);
    }

这里我们在点击去看CsvOutputFormat这个输出,找到如下内容

 @Override
    public void writeRecord(T element) throws IOException {
        int numFields = element.getArity();

        for (int i = 0; i < numFields; i++) {
            Object v = element.getField(i);
            if (v != null) {
                if (i != 0) {
                    this.wrt.write(this.fieldDelimiter);
                }

                if (quoteStrings) {
                    if (v instanceof String || v instanceof StringValue) {
                        this.wrt.write('"'); // 我们要注意到wrt这个变量
                        this.wrt.write(v.toString());
                        this.wrt.write('"');
                    } else {
                        this.wrt.write(v.toString());
                    }
                } else {
                    this.wrt.write(v.toString());
                }
            } else {
                if (this.allowNullValues) {
                    if (i != 0) {
                        this.wrt.write(this.fieldDelimiter);
                    }
                } else {
                    throw new RuntimeException(
                            "Cannot write tuple with <null> value at position: " + i);
                }
            }
        }

        // add the record delimiter
        this.wrt.write(this.recordDelimiter);
    }

这里我们先看一下writeRecord(T element)这个方法,实际上在我们调用writeAsCsv的时候底层就是通过writeRecord方法将数据写入csv文件,我们看上面代码的时候要注意到this.wrt这个变量,通过wrt我们就可以找到,对数据刷新到磁盘定义的数据量的大小,看一下对wrt的定义,源码内容如下

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks);
        this.wrt =
                this.charsetName == null
                        ? new OutputStreamWriter(new BufferedOutputStream(this.stream, 4096)) // 看一下这里
                        : new OutputStreamWriter(
                                new BufferedOutputStream(this.stream, 4096), this.charsetName); // 还有这里
    }

通过上面的源码我们可以看到BufferedOutputStream的缓冲流定义死了为4096,也就是4k大小,这个参数是写死的,我们改变不了,所以在使用writeAsCsv这个方法时,代码没有报错,并且文件中也没有数据时先不要慌,通过源码先看看具体的实现逻辑,我们就可以很快定位到问题,如果代码中我将Thread.sleep(1000)这行代码删除掉的话CSV文件中很快就会有数据的,代码中我使用的自定义数据源,并且每条数据其实很小,还有睡眠1秒的限制,所以导致很久CSV文件中都没有数据生成.
文章内容写到现在也过了很久了,数据的大小也满足4k的条件了,我们看一下文件内容
在这里插入图片描述
可以看到文件中已经生成了数据,我们在看一下文件的大小
在这里插入图片描述
说到这里我想大家应该都理解了,虽然说了这么多关于writeAsCsv这个方法的内容,但是不建议大家使用这个方法毕竟属于过时的方法,用起来弊端也比较大.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/662522.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AIVA.AI:AI音乐作曲创作平台

【产品介绍】 AIVA.AI是一个人工智能音乐创作平台&#xff0c;可以根据用户的需求和喜好&#xff0c;自动生成不同风格和情感的音乐。目标是成为创意人士的创意助手&#xff0c;无论是独立游戏开发者、音乐新手还是专业作曲家&#xff0c;都可以利用AIVA.AI的技术来为自己的项目…

分布式各系统时间统一程序

目录 1、背景2、Cristians algorithm 算法&#xff08;克里斯蒂安算法&#xff09;3、实现思路3.1、步骤&#xff1a;3.2、公式 4、具体代码4.1、构建时间戳4.2、定义数据包4.3、客户端实现4.3、服务端实现 说明 1、背景 使用场景是在一个大型分布式系统下&#xff0c;对时间有…

第一天,掌握PyTorch的张量创建

文章目录 一&#xff0c;张量二&#xff0c;创建张量1. 直接从数据来创建张量Tensor函数TODO &#xff1a;从数据直接创建张量 2. 从numpy数据创建张量from_numpy函数从numpy数据创建张量 3. 从另一个张量来进行创建张量4. 使用随机值或者常数值来创建张量5. 张量的属性&#x…

【C】C语言数据类型、常量变量的介绍

C语言基础 数据类型常量和变量变量全局变量和局部变量变量的作用域和生命周期作用域生命周期 常量 数据类型 下图为C语言常见的数据类型&#xff1a; &#xff08;浮点数就是我们常见的小数&#xff0c;字符类型要用‘’引起来&#xff0c;在C语言中字符串类型要用字符数组来…

如何知识变现?介绍几个变现途径

哈喽&#xff0c;大家好&#xff0c;我是海哥&#xff0c;知识付费变现创业教练&#xff0c;教育公司培训总监&#xff0c;从事知识付费变现咨询10年&#xff0c;已助力3000人实现知识付费变现。 在互联网时代&#xff0c;所有线下的产业都可以在线上再做一遍&#xff0c;知识产…

简易版python爬虫--通过关键字爬取网页

背景&#xff1a; 帮同学写了个爬虫程序&#xff0c;特此记录&#xff0c;怕以后忘了 这里是爬取百度https://www.baidu.com 不为什么&#xff0c;主要就是百度老实&#xff0c;能爬&#xff0c;爬着简单&#xff0c;爬着不犯法。。。 关键字爬取基本模板&#xff1a; import…

Git第一章、Git的原理与使用

背景知识&#xff1a; 我们在编写各种文档时&#xff0c;为了防止文档丢失&#xff0c;更改失误&#xff0c;失误后能恢复到原来的版本&#xff0c;不得不复制出一个副本。每个版本有各自的内容&#xff0c;但最终会只有一份报告需要被我们使用 。但在此之前的工作都需要这些不…

[Flutter]理解Widget-Key的作用

这里主要是理解在Widget中key的作用/用途。 import dart:math;import package:flutter/material.dart;/// 这里主要是理解在Widget中key的作用/用途。 void main() {runApp(const MyApp()); }class MyApp extends StatelessWidget {const MyApp({super.key});overrideWidget b…

Vault从入门到精通系列之二:启动Vault服务器

Vault从入门到精通系列之二&#xff1a;启动Vault服务器 一、启动开发服务器二、设置环境变量三、验证服务器正在运行四、vault命令汇总 Vault 作为客户端-服务器应用程序运行。Vault 服务器是唯一与数据存储和后端交互的 Vault 架构。通过 Vault CLI 完成的所有操作都通过 TLS…

【Leetcode60天带刷】day09字符串—— 459.重复的子字符串,28. 实现 strStr(),剑指Offer 05.替换空格

题目&#xff1a; 459. 重复的子字符串 给定一个非空的字符串 s &#xff0c;检查是否可以通过由它的一个子串重复多次构成。 示例 1: 输入: s "abab" 输出: true 解释: 可由子串 "ab" 重复两次构成。示例 2: 输入: s "aba" 输出: false示例…

Grammarly:AI语法检测写作助手工具

【产品介绍】 Grammarly是于2009年发布&#xff0c;当前使用最普遍和准确的英语语法检查&#xff08;grammar checker&#xff09;、拼写、校对&#xff08;proofreading&#xff09;检查和抄袭&#xff08;plagiarism&#xff09;检测软件工具&#xff0c;其中Grammarly校对检…

pytorch笔记:transformer 和 vision transformer

来自B站视频&#xff0c;API查阅&#xff0c;TORCH.NN seq2seq 可以是 CNN&#xff0c;RNN&#xff0c;transformer nn.Transformer 关键源码&#xff1a; encoder_layer TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout,activation, layer_norm_eps, ba…

杂谈 | 人类微生物组研究:解析挑战与前景

谷禾健康 人类微生物组研究正在从描述关联发展到了解整个微生态对人类的影响。虽然存在挑战&#xff0c;但在应用数据驱动的微生物组诊断和干预方面正在取得进展&#xff0c;这可能会在未来十年内带来精准医学的突破。 本文我们来探讨关于微生物组的研究进展及其对人类健康的影…

Windows10配置Kiosk(展台)模式

Windows10配置Kiosk&#xff08;展台&#xff09;模式 新建Kiosk专用用户 进入windows设置->账户->家庭与其他用户&#xff0c;点击’将其他人添加到这台电脑’ 在弹出的对话框里&#xff0c;点击我没有这个人的登录信息&#xff0c;然后添加一个没有Microsoft账户的用…

什么是眼图

眼图是指示波器用余辉方式将捕获的波形按每3bit的形式累积叠加显示采集到的串行信号的比特位的结果&#xff0c;叠加后的图形形状看起来和眼睛很像 眼图有很多参数&#xff0c;可以用来量化信号的质量&#xff0c;例如&#xff1a; 眼高&#xff1a;指眼图中最大和最小电压之差…

WOT全球技术创新大会2023在京召开:创新不止,实战为王

AIGC、大模型、大算力异常火爆的背后&#xff0c;其应用场景是什么、商业实践有何规律可循&#xff1f;多云实践、业务架构演进、研发效能等看似“老生常谈”的技术热点&#xff0c;在崇尚业务创新的当前时代有何新的发展趋势&#xff1f; 6月16-17日&#xff0c;51CTO集结50来…

android存储2--初始化.存储service的启动

android版本&#xff1a;android-11.0.0_r21 http://aospxref.com/android-11.0.0_r21/ android存储初始化分3个阶段&#xff1a; 1&#xff09;清理环境。因android支持多用户&#xff0c;解锁后登录的可能是另一个用户&#xff0c;需把之前用户执行的一些信息清理干净。《an…

Net6.0项目升级到Net7.0

NetCore3.1升级到Net6.0&#xff0c;可参考此文章&#xff1a;NetCore3.1项目升级到Net6.0_vs2022 没有startup_csdn_aspnet的博客-CSDN博客 其实与我之前发布的步骤基本一致&#xff0c;升级到net6.0之后&#xff0c;在升级net7.0基本没有可修改的代码&#xff0c;只是升级一…

NOTA双功能螯合剂:NOTA PEG11 MeTz,NOTA PEG11 Azide,两者试剂信息知识总结说明

NOTA及其衍生物是新型双功能整合剂之一。NOTA及其衍生物具有良好的配位和鳌合能力&#xff0c;可作为过渡金属离子的配体。 本文主要就NOTA PEG11 MeTz&#xff0c;NOTA PEG11 Azide两者进行说明&#xff0c;以下内容希望可以帮助到大家。 一、MeTz-PEG11-NOTA 理论分析&#…

【spring源码系列-06】refresh中obtainFreshBeanFactory方法的执行流程

Spring源码系列整体栏目 内容链接地址【一】spring源码整体概述https://blog.csdn.net/zhenghuishengq/article/details/130940885【二】通过refresh方法剖析IOC的整体流程https://blog.csdn.net/zhenghuishengq/article/details/131003428【三】xml配置文件启动spring时refres…