68、Flink DataStream Connector 之文件系统详解

news2025/1/11 15:02:49
文件系统
1.概述

连接器提供了 BATCH 模式和 STREAMING 模式统一的 Source 和 Sink。

Flink FileSystem abstraction支持连接器对文件系统进行(分区)文件读写,文件系统连接器为 BATCHSTREAMING 模式提供了相同的保证,而且对 STREAMING 模式执行提供了精确一次(exactly-once)语义保证。

连接器支持对任意(分布式的)文件系统(例如,POSIX、 S3、 HDFS)以某种数据格式 format (例如,Avro、 CSV、 Parquet) 对文件进行写入,或者读取后生成数据流或一组记录。

2.File Source
a)概述

File Source 是基于 Source API 同时支持批模式和流模式文件读取的统一 Source,File Source 分为以下两个部分:SplitEnumerator 和 SourceReader。

  • SplitEnumerator 负责发现和识别需要读取的文件,并将这些文件分配给 SourceReader 进行读取。
  • SourceReader 请求需要处理的文件,并从文件系统中读取该文件。

可能需要指定某种 format 与 File Source 联合进行解析 CSV、解码 AVRO、或者读取 Parquet 列式文件。

b)有界流和无界流

有界的 File Source(通过 SplitEnumerator)列出所有文件(一个过滤出隐藏文件的递归目录列表)并读取。

无界的 File Source 由配置定期扫描文件的 enumerator 创建;在无界的情况下,SplitEnumerator 将像有界的 File Source 一样列出所有文件,但是不同的是,经过一个时间间隔之后,重复上述操作;对于每一次列举操作,SplitEnumerator 会过滤掉之前已经检测过的文件,将新扫描到的文件发送给 SourceReader。

c)使用方法

可以通过调用以下 API 建立一个 File Source:

// 从文件流中读取文件内容
FileSource.forRecordStreamFormat(StreamFormat,Path...);
        
// 从文件中一次读取一批记录
FileSource.forBulkFileFormat(BulkFormat,Path...);

可以通过创建 FileSource.FileSourceBuilder 设置 File Source 的所有参数。

对于有界/批的使用场景,File Source 需要处理给定路径下的所有文件;对于无界/流的使用场景,File Source 会定期检查路径下的新文件并读取。

当创建一个 File Source 时(通过上述任意方法创建的 FileSource.FileSourceBuilder), 默认情况下,Source 为有界/批的模式;可以调用 AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration) 设置 Source 为持续的流模式。

final FileSource<String> source =
        FileSource.forRecordStreamFormat(...)
        .monitorContinuously(Duration.ofMillis(5))  
        .build();
d)Format Types

通过 file formats 定义的文件 readers 读取每个文件,其中定义了解析和读取文件内容的逻辑;Source 支持多个解析类,这些接口是实现简单性和灵活性/效率之间的折衷。

  • StreamFormat 从文件流中读取文件内容。它是最简单的格式实现, 并且提供了许多拆箱即用的特性(如 Checkpoint 逻辑),但是限制了可应用的优化(例如对象重用,批处理等等)。
  • BulkFormat 从文件中一次读取一批记录。 它虽然是最 “底层” 的格式实现,但是提供了优化实现的最大灵活性。

TextLine Format-不支持从 Checkpoint 恢复

使用 StreamFormat 格式化文件中的文本行,Java 中内置的 InputStreamReader 对使用了支持各种字符集的字节流进行解码。

此格式不支持从 Checkpoint 进行恢复优化;在恢复时,将重新读取并放弃在最后一个 Checkpoint 之前处理的行数;这是由于无法通过字符集解码器追踪文件中的行偏移量,及其内部缓冲输入流和字符集解码器的状态。

SimpleStreamFormat 抽象类

这是 StreamFormat 的简单版本,适用于不可拆分的格式;可以通过实现 SimpleStreamFormat 接口自定义读取数组或文件:

private static final class ArrayReaderFormat extends SimpleStreamFormat<byte[]> {
    private static final long serialVersionUID = 1L;

    @Override
    public Reader<byte[]> createReader(Configuration config, FSDataInputStream stream)
            throws IOException {
        return new ArrayReader(stream);
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
    }
}

final FileSource<byte[]> source =
                FileSource.forRecordStreamFormat(new ArrayReaderFormat(), path).build();

CsvReaderFormat 是一个实现 SimpleStreamFormat 接口的例子。

CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
FileSource<SomePojo> source = 
        FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();

对于 CSV Format 的解析,在这个例子中,是根据使用 Jackson 库的 SomePojo 的字段自动生成的。

注意:可能需要添加 @JsonPropertyOrder({field1, field2, ...}) 这个注释到自定义的类上,并且字段顺序与 CSV 文件列的顺序完全匹配。

如果需要对 CSV 模式或解析选项进行更细粒度的控制,可以使用 CsvReaderFormat 的更底层的 forSchema 静态工厂方法:

CsvReaderFormat<T> forSchema(Supplier<CsvMapper> mapperFactory, 
                             Function<CsvMapper, CsvSchema> schemaGenerator, 
                             TypeInformation<T> typeInformation) 

Bulk Format

BulkFormat 一次读取并解析一批记录,BulkFormat 的实现包括 ORC Format 或 Parquet Format 等。

外部的 BulkFormat 类主要充当 reader 的配置持有者和工厂角色;BulkFormat.Reader 是在 BulkFormat#createReader(Configuration, FileSourceSplit) 方法中创建的,然后完成读取操作。

如果在流的 checkpoint 执行期间基于 checkpoint 创建 Bulk reader,那么 reader 是在 BulkFormat#restoreReader(Configuration, FileSourceSplit) 方法中重新创建的。

可以通过将 SimpleStreamFormat 包装在 StreamFormatAdapter 中转换为 BulkFormat

BulkFormat<SomePojo, FileSourceSplit> bulkFormat = 
        new StreamFormatAdapter<>(CsvReaderFormat.forPojo(SomePojo.class));
e)自定义文件枚举类
/**
 * 针对 Hive 数据源的 FileEnumerator 实现类,基于 HiveTablePartition 生成拆分文件
 */
public class HiveSourceFileEnumerator implements FileEnumerator {
    
    // 构造方法
    public HiveSourceFileEnumerator(...) {
        ...
    }

    /***
     * 拆分给定路径下的所有相关文件。{@code minDesiredSplits} 是一个可选项,代表需要多少次拆分才能正确利用并行度
     */
    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
            throws IOException {
        // createInputSplits:splitting files into fragmented collections
        return new ArrayList<>(createInputSplits(...));
    }

    ...

    /***
     * 创建 HiveSourceFileEnumerator 的工厂
     */
    public static class Provider implements FileEnumerator.Provider {

        ...
        @Override
        public FileEnumerator create() {
            return new HiveSourceFileEnumerator(...);
        }
    }
}
// 使用自定义文件枚举类
new HiveSource<>(
        ...,
        new HiveSourceFileEnumerator.Provider(
        partitions != null ? partitions : Collections.emptyList(),
        new JobConfWrapper(jobConf)),
       ...);
f)当前限制

对于大量积压的文件,Watermark 效果不佳;这是因为 Watermark 急于在一个文件中推进,而下一个文件可能包含比 Watermark 更晚的数据。

对于无界 File Sources,枚举器会会将当前所有已处理文件的路径记录到 state 中,在某些情况下,这可能会导致状态变得相当大。 未来计划将引入一种压缩的方式来跟踪已经处理的文件(例如,将修改时间戳保持在边界以下)。

3.File Sink
a)概述

File Sink 将传入的数据写入存储桶中,考虑到输入流可以是无界的,每个桶中的数据被组织成有限大小的 Part 文件;完全可以配置为基于时间的方式往桶中写入数据,比如可以设置每个小时的数据写入一个新桶中,这意味着桶中将包含一个小时间隔内接收到的记录。

桶目录中的数据被拆分成多个 Part 文件,对于相应的接收数据的桶的 Sink 的每个 Subtask,每个桶将至少包含一个 Part 文件;将根据配置的滚动策略来创建其他 Part 文件;对于 Row-encoded Formats(参考 Format Types)默认的策略是根据 Part 文件大小进行滚动,需要指定文件打开状态最长时间的超时以及文件关闭后的非活动状态的超时时间;对于 Bulk-encoded Formats 在每次创建 Checkpoint 时进行滚动,并且用户也可以添加基于大小或者时间等的其他条件。

注意:在 STREAMING 模式下使用 FileSink 需要开启 Checkpoint 功能,文件只在 Checkpoint 成功时生成;如果没有开启 Checkpoint 功能,文件将永远停留在 in-progress 或者 pending 的状态,并且下游系统将不能安全读取该文件数据。

在这里插入图片描述

b)Format Types

FileSink 不仅支持 Row-encoded 也支持 Bulk-encoded,例如 Apache Parquet,这两种格式可以通过如下的静态方法进行构造:

Row-encoded sink: FileSink.forRowFormat(basePath, rowEncoder)
Bulk-encoded sink: FileSink.forBulkFormat(basePath, bulkWriterFactory)

创建 Row-encoded Format 或者 Bulk-encoded Format 的 Sink 时,都必须指定桶的路径以及对数据进行编码的逻辑。

Row-encoded Formats

Row-encoded Format 需要指定一个 Encoder,在输出数据到文件过程中被用来将单个行数据序列化为 OutputStream。

除了 bucket assigner,RowFormatBuilder 还允许用户指定以下属性:

  • Custom RollingPolicy :自定义滚动策略覆盖 DefaultRollingPolicy;
  • bucketCheckInterval (默认值 = 1 min) :基于滚动策略设置的检查时间间隔。

写入字符串的基本用法如下:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

DataStream<String> input = ...;

final FileSink<String> sink = FileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15))
            .withInactivityInterval(Duration.ofMinutes(5))
            .withMaxPartSize(MemorySize.ofMebiBytes(1024))
            .build())
	.build();

input.sinkTo(sink);

上述示例中创建了一个简单的 Sink,默认的将记录分配给小时桶;还指定了滚动策略,当满足以下三个条件的任何一个时都会将 In-progress 状态文件进行滚动:

  • 包含了至少15分钟的数据量
  • 已经5分钟没有接收到新的记录
  • 文件大小已经达到 1GB(写入最后一条记录之后)

Bulk-encoded Formats

Bulk-encoded 的 Sink 的创建和 Row-encoded 的相似,但不需要指定 Encoder,而是需要指定 BulkWriter.Factory,BulkWriter 定义了如何添加和刷新数据以及如何最终确定一批记录使用哪种编码字符集的逻辑。

Flink 内置了5种 BulkWriter 工厂类

  • ParquetWriterFactory
  • AvroWriterFactory
  • SequenceFileWriterFactory
  • CompressWriterFactory
  • OrcBulkWriterFactory

注意:Bulk-encoded Format 仅支持一种继承了 CheckpointRollingPolicy 类的滚动策略;在每个 Checkpoint 都会滚动,也可以根据大小或处理时间进行滚动。

Parquet Format

Flink 内置了为 Avro Format 数据创建 Parquet 写入工厂的快捷方法,在 AvroParquetWriters 类中可以发现那些方法。

为了让 Parquet Format 数据写入更加通用,用户需要创建 ParquetWriterFactory 并且自定义实现 ParquetBuilder 接口。

如果在程序中使用 Parquet 的 Bulk-encoded Format,需要添加如下依赖到项目中:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet</artifactId>
    <version>1.19.0</version>
</dependency>

类似这样使用 FileSink 写入 Parquet Format 的 Avro 数据:

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.avro.Schema;


Schema schema = ...;
DataStream<GenericRecord> input = ...;

final FileSink<GenericRecord> sink = FileSink
	.forBulkFormat(outputBasePath, AvroParquetWriters.forGenericRecord(schema))
	.build();

input.sinkTo(sink);

类似这样使用 FileSink 写入 Parquet Format 的 Protobuf 数据:

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;

// ProtoRecord 是一个生成 protobuf 的类
DataStream<ProtoRecord> input = ...;

final FileSink<ProtoRecord> sink = FileSink
	.forBulkFormat(outputBasePath, ParquetProtoWriters.forType(ProtoRecord.class))
	.build();

input.sinkTo(sink);

Avro Format

Flink 支持写入数据到 Avro Format 文件,在 AvroWriters 类中可以发现一系列创建 Avro writer 工厂的便利方法。

如果在程序中使用 AvroWriters,需要添加如下依赖到项目中:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>1.19.0</version>
</dependency>

类似这样使用 FileSink 写入数据到 Avro Format 文件中:

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.avro.Schema;


Schema schema = ...;
DataStream<GenericRecord> input = ...;

final FileSink<GenericRecord> sink = FileSink
	.forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
	.build();

input.sinkTo(sink);

对于自定义创建的 Avro writers,例如支持压缩功能,用户需要创建 AvroWriterFactory 并且自定义实现 AvroBuilder 接口:

AvroWriterFactory<?> factory = new AvroWriterFactory<>((AvroBuilder<Address>) out -> {
	Schema schema = ReflectData.get().getSchema(Address.class);
	DatumWriter<Address> datumWriter = new ReflectDatumWriter<>(schema);

	DataFileWriter<Address> dataFileWriter = new DataFileWriter<>(datumWriter);
	dataFileWriter.setCodec(CodecFactory.snappyCodec());
	dataFileWriter.create(schema, out);
	return dataFileWriter;
});

DataStream<Address> stream = ...
stream.sinkTo(FileSink.forBulkFormat(
	outputBasePath,
	factory).build());

ORC Format

ORC Format 的数据采用 Bulk-encoded Format,Flink 提供了 Vectorizer 接口的具体实现类 OrcBulkWriterFactory。

像其他列格式一样也是采用 Bulk-encoded Format,Flink 中 OrcBulkWriter 是使用 ORC 的 VectorizedRowBatch 实现批的方式输出数据的。

由于输入数据已经被转换成了 VectorizedRowBatch,所以用户必须继承抽象类 Vectorizer 并且覆写类中 vectorize(T element, VectorizedRowBatch batch) 方法;此方法提供了用户直接使用的 VectorizedRowBatch 类的实例,用户需要编写从输入 element 到 ColumnVectors 的转换逻辑,然后设置在 VectorizedRowBatch 实例中。

示例:是 Person 类型的输入元素

class Person {
    private final String name;
    private final int age;
    ...
}

然后,转换 Person 类型元素的实现并在 VectorizedRowBatch 中设置,如下所示:

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class PersonVectorizer extends Vectorizer<Person> implements Serializable {	
	public PersonVectorizer(String schema) {
		super(schema);
	}
	@Override
	public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
		BytesColumnVector nameColVector = (BytesColumnVector) batch.cols[0];
		LongColumnVector ageColVector = (LongColumnVector) batch.cols[1];
		int row = batch.size++;
		nameColVector.setVal(row, element.getName().getBytes(StandardCharsets.UTF_8));
		ageColVector.vector[row] = element.getAge();
	}
}

如果在程序中使用 ORC 的 Bulk-encoded Format,需要添加如下依赖到项目中:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-orc</artifactId>
    <version>1.19.0</version>
</dependency>

然后,使用 FileSink 以 ORC Format 输出数据:

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;

String schema = "struct<_col0:string,_col1:int>";
DataStream<Person> input = ...;

final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(new PersonVectorizer(schema));

final FileSink<Person> sink = FileSink
	.forBulkFormat(outputBasePath, writerFactory)
	.build();

input.sinkTo(sink);

OrcBulkWriterFactory 还可以采用 Hadoop 的 ConfigurationProperties,以提供自定义的 Hadoop 配置 和 ORC 输出属性。

String schema = ...;
Configuration conf = ...;
Properties writerProperties = new Properties();

writerProperties.setProperty("orc.compress", "LZ4");
// 其他 ORC 属性也可以使用类似方式进行设置

final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(
    new PersonVectorizer(schema), writerProperties, conf);

用户在重写 vectorize(...) 方法时可以调用 addUserMetadata(...) 方法来添加自己的元数据到 ORC 文件中。

public class PersonVectorizer extends Vectorizer<Person> implements Serializable {	
	@Override
	public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
		...
		String metadataKey = ...;
		ByteBuffer metadataValue = ...;
		this.addUserMetadata(metadataKey, metadataValue);
	}
}

Hadoop SequenceFile Format

如果在程序中使用 SequenceFile 的 Bulk-encoded Format,需要添加如下依赖到项目中:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sequence-file</artifactId>
    <version>1.19.0</version>
</dependency>

示例:创建一个简单的 SequenceFile

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;


DataStream<Tuple2<LongWritable, Text>> input = ...;
Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
final FileSink<Tuple2<LongWritable, Text>> sink = FileSink
  .forBulkFormat(
    outputBasePath,
    new SequenceFileWriterFactory<>(hadoopConf, LongWritable.class, Text.class))
	.build();

input.sinkTo(sink);

SequenceFileWriterFactory 提供额外的构造参数设置是否开启压缩功能。

c)桶分配

桶的逻辑定义了如何将数据分配到基本输出目录内的子目录中。

Row-encoded Format 和 Bulk-encoded Format 使用了 DateTimeBucketAssigner 作为默认的分配器,默认的分配器 DateTimeBucketAssigner 会基于使用了格式为 yyyy-MM-dd--HH 的系统默认时区来创建小时桶,日期格式( 桶大小)和时区都可以手动配置。

还可以在格式化构造器中通过调用 .withBucketAssigner(assigner) 方法指定自定义的 BucketAssigner

Flink 内置了两种 BucketAssigners:

  • DateTimeBucketAssigner :默认的基于时间的分配器
  • BasePathBucketAssigner :分配所有文件存储在基础路径上(单个全局桶)
d)滚动策略

RollingPolicy 定义了何时关闭给定的 In-progress Part 文件,并将其转换为 Pending 状态,然后再转换为 Finished 状态;Finished 状态的文件,可供查看并且可以保证数据的有效性,在出现故障时不会恢复。

  • 在 STREAMING 模式下,滚动策略结合 Checkpoint 间隔(到下一个 Checkpoint 成功时,文件的 Pending 状态才转换为 Finished 状态)共同控制 Part 文件对下游 readers 是否可见以及这些文件的大小和数量;
  • 在 BATCH 模式下,Part 文件在 Job 最后对下游才变得可见,滚动策略只控制最大的 Part 文件大小。

Flink 内置了两种 RollingPolicies:

  • DefaultRollingPolicy
  • OnCheckpointRollingPolicy
e)Part 文件生命周期

为了在下游使用 FileSink 作为输出,需要了解生成的输出文件的命名和生命周期。

Part 文件可以处于以下三种状态中的任意一种:

  1. In-progress :当前正在写入的 Part 文件处于 in-progress 状态
  2. Pending :由于指定的滚动策略关闭 in-progress 状态文件,并且等待提交
  3. Finished :流模式(STREAMING)下的成功的 Checkpoint 或者批模式(BATCH)下输入结束,文件的 Pending 状态转换为 Finished 状态

只有 Finished 状态下的文件才能被下游安全读取,并且保证不会被修改。

对于每个活动的桶,在任何给定时间每个写入 Subtask 中都有一个 In-progress 状态的 Part 文件,但可能有多个 Pending 状态和 Finished 状态的文件。

Part 文件示例

为了更好的了解这些文件的生命周期,看一个只有 2 个 Sink Subtask 的简单例子:

└── 2019-08-25--12
    ├── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    └── part-81fc4980-a6af-41c8-9937-9939408a734b-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575

当这个 Part 文件 part-81fc4980-a6af-41c8-9937-9939408a734b-0 滚动时(比如说此文件变的很大时),此文件将进入 Pending 状态并且不能重命名,Sink 就会打开一个新的 Part 文件: part-81fc4980-a6af-41c8-9937-9939408a734b-1

└── 2019-08-25--12
    ├── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    ├── part-81fc4980-a6af-41c8-9937-9939408a734b-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575
    └── part-81fc4980-a6af-41c8-9937-9939408a734b-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11

part-81fc4980-a6af-41c8-9937-9939408a734b-0 现在是 Pending 状态,在下一个 Checkpoint 成功后,立即成为 Finished 状态:

└── 2019-08-25--12
    ├── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    ├── part-81fc4980-a6af-41c8-9937-9939408a734b-0
    └── part-81fc4980-a6af-41c8-9937-9939408a734b-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11

根据桶策略创建新桶时,不会影响当前 In-progress 状态的文件:

└── 2019-08-25--12
    ├── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    ├── part-81fc4980-a6af-41c8-9937-9939408a734b-0
    └── part-81fc4980-a6af-41c8-9937-9939408a734b-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
└── 2019-08-25--13
    └── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.2b475fec-1482-4dea-9946-eb4353b475f1

旧桶仍然可以接收新记录,因为桶策略是在每条记录上进行评估的。

Part 文件配置

Finished 状态与 In-progress 状态的文件只能通过命名来区分。

默认的,文件命名策略如下:

  • In-progress / Pending:part--.inprogress.uid
  • Finished:part-- 当 Sink Subtask 实例化时,这的 uid 是一个分配给 Subtask 的随机 ID 值;这个 uid 不具有容错机制,所以当 Subtask 从故障恢复时,uid 会重新生成。

Flink 允许用户给 Part 文件名添加一个前缀和/或后缀,使用 OutputFileConfig 来完成上述功能;例如 Sink 将在创建文件的文件名上添加前缀 “prefix” 和后缀 “.ext”,如下所示:

└── 2019-08-25--12
    ├── prefix-4005733d-a830-4323-8291-8866de98b582-0.ext
    ├── prefix-4005733d-a830-4323-8291-8866de98b582-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    ├── prefix-81fc4980-a6af-41c8-9937-9939408a734b-0.ext
    └── prefix-81fc4980-a6af-41c8-9937-9939408a734b-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11

用户也可以使用 OutputFileConfig 采用如下方式添加前缀和后缀:

OutputFileConfig config = OutputFileConfig
 .builder()
 .withPartPrefix("prefix")
 .withPartSuffix(".ext")
 .build();
            
FileSink<Tuple2<Integer, Integer>> sink = FileSink
 .forRowFormat((new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
 .withBucketAssigner(new KeyBucketAssigner())
 .withRollingPolicy(OnCheckpointRollingPolicy.build())
 .withOutputFileConfig(config)
 .build();
f)文件合并

从 1.15 版本开始 FileSink 支持已经提交 pending 文件的合并,从而允许应用设置一个较小的时间周期并且避免生成大量的小文件。 尤其是当用户使用 bulk 格式的时候:这种格式要求用户必须在 checkpoint 的时候切换文件。

文件合并功能可以通过以下代码打开:

FileSink<Integer> fileSink=
        FileSink.forRowFormat(new Path(path),new SimpleStringEncoder<Integer>())
            .enableCompact(
                FileCompactStrategy.Builder.newBuilder()
                    .setNumCompactThreads(1024)
                    .enableCompactionOnCheckpoint(5)
                    .build(),
                new RecordWiseFileCompactor<>(
                    new DecoderBasedReader.Factory<>(SimpleStringDecoder::new)))
            .build();

这一功能开启后,在文件转为 pending 状态与文件最终提交之间会进行文件合并。

这些 pending 状态的文件将首先被提交为一个以 . 开头的临时文件,这些文件随后将会按照用户指定的策略和合并方式进行合并,并生成合并后的 pending 状态的文件,然后这些文件将被发送给 Committer 并提交为正式文件,在这之后,原始的临时文件也会被删除掉。

当开启文件合并功能时,用户需要指定 FileCompactStrategy 与 FileCompactor。

FileCompactStrategy 指定何时以及哪些文件将被合并,目前有两个并行的条件:目标文件大小与间隔的 Checkpoint 数量

当目前缓存的文件的总大小达到指定的阈值,或自上次合并后经过的 Checkpoint 次数已经达到指定次数时, FileSink 将创建一个异步任务来合并当前缓存的文件。

FileCompactor 指定如何将给定的路径列表对应的文件进行合并将结果写入到文件中,根据如何写文件,它可以分为两类

  • OutputStreamBasedFileCompactor:用户将合并后的结果写入一个输出流中,通常在用户不希望或者无法从输入文件中读取记录时使用。这种类型的 CompactingFileWriter 的一个例子是 ConcatFileCompactor,它直接将给定的文件进行合并并将结果写到输出流中。
  • RecordWiseFileCompactor: 这种类型的 CompactingFileWriter 会逐条读出输入文件的记录,然后和FileWriter一样写入输出文件中。CompactingFileWriter 的一个例子是 RecordWiseFileCompactor,它从给定的文件中读出记录并写出到 CompactingFileWriter 中。用户需要指定如何从原始文件中读出记录。

注意事项1 一旦启用了文件合并功能,此后若需要再关闭,必须在构建FileSink时显式调用disableCompact方法。

注意事项2 如果启用了文件合并功能,文件可见的时间会被延长。

g)注意事项

注意事项 1:当使用的 Hadoop 版本 < 2.7 时, 当每次 Checkpoint 时请使用 OnCheckpointRollingPolicy 滚动 Part 文件。原因是:如果 Part 文件 “穿越” 了 Checkpoint 的时间间隔, 然后从失败中恢复过来时,FileSink 可能会使用文件系统的 truncate() 方法丢弃处于 In-progress 状态文件中的未提交数据。 这个方法在 Hadoop 2.7 版本之前是不支持的,Flink 将抛出异常。

注意事项 2:鉴于 Flink 的 Sink 和 UDF 通常不会区分正常作业终止(例如 有限输入流)和 由于故障而终止, 在 Job 正常终止时,最后一个 In-progress 状态文件不会转换为 “Finished” 状态。

注意事项 3:Flink 和 FileSink 永远不会覆盖已提交数据,如果一个 In-progress 状态文件被后续成功的 Checkpoint 提交了,当尝试从这个旧的 Checkpoint / Savepoint 进行恢复时,FileSink 将拒绝继续执行并将抛出异常,因为程序无法找到 In-progress 状态的文件。

注意事项 4:目前FileSink 仅支持以下 5 种文件系统:HDFS、 S3、OSS、ABFS 和 Local,如果在运行时使用了不支持的文件系统,Flink 将抛出异常。

BATCH 注意事项

注意事项 1:虽然 Writer 是以用户指定的 parallelism 执行的,然而 Committer 是以 parallelism = 1 执行的。

注意事项 2:Pending 状态文件被提交并且所有输入数据被处理完后,才转换为 Finished 状态。

注意事项 3:当系统处于高可用状态下,并且正当 Committers 进行提交时如果 JobManager 发生了故障,那么可能会有副本。这种情况将会在 Flink 的未来版本中进行修复。

S3 注意事项

注意事项 1:对于 S3,FileSink 仅支持基于 Hadoop-based 文件系统的实现,而不支持基于 Presto 的实现。 如果 Job 中使用 FileSink 写入 S3,但是希望使用基于 Presto 的 Sink 做 Checkpoint,建议明确使用 “s3a://” (对于 Hadoop)作为 Sink 目标路径格式并且使用 “s3p://” 作为 Checkpoint 的目标路径格式(对于 Presto)。 对于 Sink 和 Checkpoint 同时使用 “s3://” 可能导致不可控的行为,由于两者的实现 “监听” 同一格式路径。

注意事项 2:在保证高效的同时还要保证 exactly-once 语义,FileSink 使用了 S3 的 Multi-part Upload 功能(MPU 功能开箱即用)。 此功能允许以独立的块上传文件(因此称为 “multi-part”),当 MPU 的所有块都上传成功时,这些块就可以合并生成原始文件。 对于非活动的 MPU,S3 支持桶生命周期规则,用户可以使用该规则终止在启动后指定天数内未完成的多块上传操作。 即如果设置了这个规则,并在某些文件未完全上传的情况下执行 Savepoint,则其关联的 MPU 可能会在 Job 重启前超时。 这将导致 Job 无法从该 Savepoint 恢复,因为 Pending 状态的 Part 文件已不存在,那么 Flink Job 将失败并抛出异常,因为程序试图获取那些不存在的文件导致了失败。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1928312.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数字孪生Digital Twin 结合建筑信息模型 BIM 在AIoT 智慧城市建设中Web 可视化大屏实践...

智慧城市建设通过将城市中的建筑、基础设施等构建 BIM 模型&#xff0c;并与实时采集的数据相结合&#xff0c;创建数字孪生体。可以实现对城市能源消耗、交通流量、环境质量等的实时监测和预测&#xff0c;优化城市规划和资源分配。 01 数字孪生 Digital Twin 数字孪生 Digita…

Spring MVC 全注解开发

1. Spring MVC 全注解开发 文章目录 1. Spring MVC 全注解开发2. web.xml 文件 的替代2.1 Servlet3.0新特性2.2 编写 WebAppInitializer 3. Spring MVC的配置3.1 Spring MVC的配置&#xff1a;开启注解驱动3.2 Spring MVC的配置&#xff1a;视图解析器3.3 Spring MVC的配置&…

【实战:python-Django发送邮件-短信-钉钉通知】

一 Python发送邮件 1.1 使用SMTP模块发送邮件 import smtplib from email.mime.text import MIMEText from email.header import Headermsg_from 306334678qq.com # 发送方邮箱 passwd luzdikipwhjjbibf # 填入发送方邮箱的授权码(填入自己的授权码&#xff0c;相当于邮箱…

SSE、Webworker 、webSocket、Http、Socket 服务器推送技术

Http协议 受浏览器的同源策略限制 HTTP 协议是一种无状态的、无连接&#xff08;短暂连接&#xff0c;客户端发送请求&#xff0c;服务器响应后即断开连接&#xff09;的、单向的应用层协议。 它采用了请求/响应模型。通信请求只能由客户端发起&#xff0c;服务端对请求做出应…

(day18) leetcode 204.计数质数

描述 给定整数 n &#xff0c;返回 所有小于非负整数 n 的质数的数量 。 示例 1&#xff1a; 输入&#xff1a;n 10 输出&#xff1a;4 解释&#xff1a;小于 10 的质数一共有 4 个, 它们是 2, 3, 5, 7 。示例 2&#xff1a; 输入&#xff1a;n 0 输出&#xff1a;0示例 3…

JVM--自动内存管理--JAVA内存区域

1. 运行时数据区域 灰色的线程共享&#xff0c;白色的线程独享 白色的独享就是根据个体"同生共死" 程序计数器&#xff1a; 是唯一一个没有OOM(内存溢出)的地方 是线程独享的 作用&#xff1a; 是一块较小的内存空间,是当前线程所执行的字节吗的行号指示器 由于…

智慧水利:迈向水资源管理的新时代,结合物联网、云计算等先进技术,阐述智慧水利解决方案在提升水灾害防控能力、优化水资源配置中的关键作用

本文关键词&#xff1a;智慧水利、智慧水利工程、智慧水利发展前景、智慧水利技术、智慧水利信息化系统、智慧水利解决方案、数字水利和智慧水利、数字水利工程、数字水利建设、数字水利概念、人水和协、智慧水库、智慧水库管理平台、智慧水库建设方案、智慧水库解决方案、智慧…

docker 安装 onlyoffice

1.文档地址 Installing ONLYOFFICE Docs for Docker on a local server - ONLYOFFICE 2.安装onlyoffice docker run -i -t -d -p 9000:8000 --restartalways -e JWT_ENABLEDfalse onlyoffice/documentserver 如果发现镜像无法下载,可以尝试更换镜像源 {"registry-mir…

JVM和类加载机制-01[JVM底层架构和JVM调优]

JVM底层 Java虚拟机内存模型JVM组成部分五大内存区域各自的作用虚拟机栈(线程栈)栈帧内存区域 本地方法栈程序计数器为什么jvm要设计程序计数器&#xff1f; 堆方法区 JVM优化-堆详解JVM底层垃圾回收机制jvm调优工具jvisualvm.exeArthas工具使用 Java虚拟机内存模型 JVM跨平台原…

2024年初级注册安全工程师职业资格考试首次开考!

​2024年初级注册安全工程师考试首次开考&#xff08;注&#xff1a;该考试由各省人事考试局组织考试&#xff09;。目前未取得中级注册安全工程师证书的各位同学&#xff0c;可以关注该考试&#xff0c;毕竟初级考证相对较容易&#xff0c;先去考一个。 目前初安开考地区汇总…

PHP多功能投票微信小程序系统源码

&#x1f389;一键决策&#xff0c;尽在掌握&#xff01;多功能投票小程序&#xff0c;让选择不再纠结&#x1f914; &#x1f4f2;【开篇&#xff1a;告别传统&#xff0c;拥抱便捷投票新时代】&#x1f4f2; 还在为组织投票活动手忙脚乱&#xff1f;或是面对众多选项犹豫不…

技术成神之路:设计模式(七)状态模式

1.介绍 状态模式&#xff08;State Pattern&#xff09;是一种行为设计模式&#xff0c;它允许一个对象在其内部状态改变时改变其行为。这个模式将状态的相关行为封装在独立的状态类中&#xff0c;并将不同状态之间的转换逻辑分离开来。 2.主要作用 状态模式的主要作用是让一个…

HTML5+CSS3小实例:纯CSS实现奥运五环

实例:纯CSS实现奥运五环 技术栈:HTML+CSS 效果: 源码: 【HTML】 <!DOCTYPE html> <html lang="zh-CN"> <head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-sca…

算法导论 总结索引 | 第五部分 第十八章:B树

1、B 树是 为磁盘或其他直接存取的辅助存储设备 而设计的一种平衡搜索树。B 树类似于红黑树&#xff0c;在降低磁盘 I/O 操作次数方面要更好一些。许多数据库系统 使用 B 树 或者 B 树 的变种来存储信息 2、B 树与红黑树的不同之处 在于 B 树的结点 可以有很多孩子&#xff0c…

《JavaScript权威指南第7版》中文PDF+英文PDF+源代码 +JavaScript权威指南(第6版)(附源码)PDF下载阅读分享推荐

JavaScript是Web编程语言。绝大多数网站都使用JavaScript&#xff0c;所有现代Web浏览器&#xff08;无论是桌面、平板还是手机浏览器&#xff0c;书中以后统称为浏览器&#xff09;都包含JavaScript解释器&#xff0c;这让JavaScript成为有史以来部署最广泛的编程语言。过去十…

怎么把自己写的组件发布到npm官方仓库??

一.注册npm账号 npm官网 1.注册npm 账号 2.登陆 3.登陆成功 二.搭建一个vue 项目 具体步骤参考liu.z Z 博客 或者初始化一个vue项目 vue create XXX &#xff08;工程名字&#xff09;运行代码 npm run serve三.组件封装 1.在src文件下建一个package文件&#xff0…

EMQX开源版安装

一、EMQX是什么 EMQX 是一款开源的大规模分布式 MQTT 消息服务器&#xff0c;功能丰富&#xff0c;专为物联网和实时通信应用而设计。EMQX 5.0 单集群支持 MQTT 并发连接数高达 1 亿条&#xff0c;单服务器的传输与处理吞吐量可达每秒百万级 MQTT 消息&#xff0c;同时保证毫秒…

参考——CCS联合CLion调试__开发TI芯片

一、简介 随着电赛临近&#xff0c;有些题必须指定使用TI芯片&#xff0c;那么就不得不学一下CCS。虽然CCS相较于Keil和IAR&#xff0c;显得更现代化一些&#xff0c;但还是没有代码样式、代码格式化、代码补全等功能。如果你用惯了CLion再用CCS&#xff0c;就会有些许一言难尽…

LLM之Prompt(四)| OpenAI、微软发布Prompt技术报告

摘要 生成式人工智能 &#xff08;GenAI&#xff09; 系统正越来越多地部署在各行各业和研究机构。开发人员和用户通过使用提示或提示工程与这些系统进行交互。虽然提示是一个广泛提及且被研究的概念&#xff0c;但由于该领域的新生&#xff0c;存在相互矛盾的术语和对构成提示…

十七、【文本编辑器(三)】图像坐标变换

目录 一、缩放功能 二、旋转功能 三、镜像功能 四、QMatrix简单介绍 一、缩放功能 &#xff08;1&#xff09;在头文件中添加 “protected slots:" 变量&#xff1a; void ShowZoomln( ); &#xff08;2&#xff09;在 createActionso函数的最后添力口事件关联&…