- 新建类
package test01;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;
/**
 * Demo streaming job: reads text lines from a socket on localhost:9999 and writes them
 * to rolling part files through a row-format {@link FileSink}.
 */
public class TestOutputFile {

    /**
     * Assembles the socket-to-file pipeline and submits it for execution.
     *
     * @param args command-line arguments; not read by this job
     * @throws Exception propagated from building or executing the job
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
        env.setParallelism(1);

        // Listen on the data port.
        DataStreamSource<String> lines = env.socketTextStream("localhost", 9999);

        // Enable checkpointing (every 2 seconds here) so that part files get closed at
        // checkpoints; without it the files stay in the in-progress state indefinitely.
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        // Row-wise output: each record is written as a line, encoded with the given charset.
        Path outputDir = new Path("D:/IT/testfilnk");
        SimpleStringEncoder<String> lineEncoder = new SimpleStringEncoder<>("UTF-8");

        // Prefix and suffix used for the generated part file names.
        OutputFileConfig partNaming = OutputFileConfig.builder()
                .withPartPrefix("test-flink-output-")
                .withPartSuffix(".log")
                .build();

        // Rolling policy: 20 seconds or 1024 bytes (1 KB); a new part file is started
        // as soon as either condition is met.
        DefaultRollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder()
                .withRolloverInterval(Duration.ofSeconds(20))
                .withMaxPartSize(new MemorySize(1024))
                .build();

        // Write the stream to files.
        FileSink<String> sink = FileSink
                .<String>forRowFormat(outputDir, lineEncoder)
                .withOutputFileConfig(partNaming)
                .withRollingPolicy(rollingPolicy)
                .build();

        lines.sinkTo(sink);
        env.execute();
    }
}
- 先执行 `nc -lp 9999` 监听端口(与代码中 socketTextStream 指定的端口一致),再启动程序
输入数据:
正在写入的文件会有inprogress的标识(在指定的目录下生成文件时会按照日期的年月日时进行分目录,因为我在执行时的时间是2023/7/12 22点,所以它就会自动生成一个2023-07-12--22目录,分桶策略也可以自己在代码中配置。):
当满足滚动策略时,会结束当前文件,然后重新写入新文件:
查看文件内容: