Flink之输出算子Data Sink
- Data Sink
- 常见输出算子
- print()
- printToErr()
- writeAsText()
- writeAsCsv()
- writeToSocket()
- 常用连接器
- File Sink连接器
- Kafka Sink连接器
- RabbitMQ Sink连接器
- JDBC Sink连接器
- Elasticsearch Sink连接器
- MongoDB Sink连接器
- 自定义Sink
- RichSinkFunction
- SinkFunction
- 验证测试
Data Sink
在Apache Flink中,输出算子(Data Sink)用于将数据流发送到外部系统或存储介质中,如数据库、消息队列、文件系统、Apache Kafka等,以便进行后续的持久化、分析或其他操作。输出算子是数据流处理的最后一步,它决定了数据的最终去向。
Flink提供了各种内置的输出算子,可支持许多常见的数据存储系统,如print()、printToErr()、writeAsText()等。
Flink还提供了一部分框架的Sink连接器,支持与许多外部系统集成的连接器,如Apache Kafka、Elasticsearch、JDBC、MongoDB等。这些连接器提供了专门的输出算子,可以直接与这些外部系统进行交互。
也可以自定义输出算子来与其他系统进行集成,只需实现 SinkFunction 接口,并将其用作 addSink 方法的参数。
常见输出算子
在使用Flink进行数据处理时,数据经Data Source流入,然后通过系列Transformations的转化,最终可以通过Sink 将计算结果进行输出,Flink Data Sinks就是用于定义数据流最终的输出位置。
Flink 提供了几个较为简单的 Sink API 用于日常的开发,具体如下:
print()
将数据打印到标准输出stdout,不需要额外配置,它会将数据元素以字符串形式直接打印到标准输出上
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.fromElements("Hello", "World", "Flink");
dataStream.print();
env.execute("Print Data");
printToErr()
将数据打印到标准错误输出stderr,类似于 print(),但将数据打印到标准错误输出上。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.fromElements("Hello", "World", "Flink");
dataStream.printToErr();
env.execute("Print to StdErr");
writeAsText()
将数据写入文本文件,将数据按照行的形式写入文本文件,每行一个数据元素。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.fromElements("Hello", "World", "Flink");
dataStream.writeAsText("/path/to/output.txt"); // 指定输出路径和文件名
env.execute("Write as Text");
可以通过指定第二个参数来定义输出模式,它有以下两个可选值:
WriteMode.NO_OVERWRITE:当指定路径上不存在任何文件时,才执行写出操作
WriteMode.OVERWRITE:不论指定路径上是否存在文件,都执行写出操作;如果原来已有文件,则进行覆盖
并行的方式写出到多个文件:
streamSource.writeAsText("D:\\out", FileSystem.WriteMode.OVERWRITE);
将输出结果全部写出到一个文件,设置其并行度为1:
streamSource.writeAsText("D:\\out", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
writeAsCsv()
用于将计算结果以CSV的文件格式写出到指定目录,除了路径参数是必选外,该方法还支持传入输出模式,行分隔符,和字段分隔符三个额外的参数
DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
new Tuple2<>("Alice", 25),
new Tuple2<>("Bob", 30),
new Tuple2<>("Charlie", 35)
);
// writeAsCsv(String path, WriteMode writeMode, String rowDelimiter, String fieldDelimiter)
dataStream.writeAsCsv("/path/to/output.csv", WriteMode.OVERWRITE, "\n", ",");
env.execute("Write as CSV");
writeToSocket()
将数据通过网络 socket 发送到指定的主机和端口。可以用于将数据流输出到外部系统或进行简单的网络通信。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.fromElements("Hello", "World", "Flink");
dataStream.writeToSocket("localhost", 9999, new SimpleStringSchema());
env.execute("Write to Socket");
常用连接器
连接器可以和多种多样的第三方系统进行交互。Flink官方目前支持以下第三方系统连接器
Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon DynamoDB (sink)
Amazon Kinesis Data Streams (source/sink)
Amazon Kinesis Data Firehose (sink)
DataGen (source)
Elasticsearch (sink)
Opensearch (sink)
FileSystem (sink)
RabbitMQ (source/sink)
Google PubSub (source/sink)
Hybrid Source (source)
Apache Pulsar (source)
JDBC (sink)
MongoDB (source/sink)
除Flink官方之外,还有一些其他第三方系统与Flink的连接器,通过Apache Bahir发布:
Apache ActiveMQ (source/sink)
Apache Flume (sink)
Redis (sink)
Akka (sink)
Netty (source)
File Sink连接器
Flink提供了一个流式文件系统的连接器
FileSink
,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。
File Sink支持行编码Row-encoded和批量编码Bulk-encoded格式。这两种不同的方式都有各自的构建器builder,可以直接调用FileSink的静态方法:
// 行编码
FileSink.forRowFormat(basePath,rowEncoder)
// 批量编码
FileSink.forBulkFormat(basePath,bulkWriterFactory)
File Sink将传入的数据写入存储桶中。考虑到输入流可以是无界的,每个桶中的数据被组织成有限大小的 Part 文件。 完全可以配置为基于时间的方式往桶中写入数据,比如可以设置每个小时的数据写入一个新桶中。
注意:
在 STREAMING 模式下使用 FileSink 需要开启 Checkpoint 功能。 文件只在 Checkpoint 成功时生成。如果没有开启 Checkpoint 功能,文件将永远停留在 in-progress 或者 pending 的状态,并且下游系统将不能安全读取该文件数据。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每个目录中,有N个并行度的文件在写入
env.setParallelism(2);
// 开启checkpoint,否则生成文件有后缀: .inprogress
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
/**
* 生成模拟数据
*/
DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
new GeneratorFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
return "Number:" + value;
}
},
Long.MAX_VALUE, RateLimiterStrategy.perSecond(1000), Types.STRING);
// 读取模拟数据
DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");
// 输出到文件系统
FileSink<String> fieSink = FileSink
// 输出行式存储的文件,指定路径、指定编码
.<String>forRowFormat(new Path("D:/temp"), new SimpleStringEncoder<>("UTF-8"))
// 输出文件的一些配置: 文件名的前缀、后缀
.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("fileSink-").withPartSuffix(".log").build())
// 按照目录分桶:每个小时一个目录
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
// 文件滚动策略: 30秒 或 1m 生成一个文件
.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofSeconds(30)).withMaxPartSize(new MemorySize(1024 * 1024)).build())
.build();
dataGen.sinkTo(fieSink);
env.execute();
}
Kafka Sink连接器
添加Kafka 连接器依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 写到kafka的一致性级别如果是精准一次,必须开启checkpoint
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
SingleOutputStreamOperator<String> singleOutputStreamOperator = env.socketTextStream("node01", 8888);
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
// 指定kafka地址和端口
.setBootstrapServers("node01:9092,node01:9092,node03:9092")
// 指定序列化器、指定Topic名称、具体的序列化
.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("kafkaSink").setValueSerializationSchema(new SimpleStringSchema()).build())
// 写到kafka的一致性级别: 精准一次、至少一次
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
// 如果是精准一次,必须设置 事务的前缀
.setTransactionalIdPrefix("kafkaSink-")
// 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
.build();
singleOutputStreamOperator.sinkTo(kafkaSink);
env.execute();
}
自定义序列化器,实现带key的record
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
env.setRestartStrategy(RestartStrategies.noRestart());
SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("node01", 8888);
/**
* 自定义序列器:
* 实现KafkaRecordSerializationSchema接口,重写ProducerRecord序列化方法
* 指定key、value,转成字节数组
* @return 返回一个 ProducerRecord对象,把key、value放进去
*/
class MyKafkaRecordSerializa implements KafkaRecordSerializationSchema<String> {
@Nullable
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext kafkaSinkContext, Long timestamp) {
String[] datas = element.split(",");
byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
byte[] value = element.getBytes(StandardCharsets.UTF_8);
return new ProducerRecord<>("kafkaSink", key, value);
}
}
KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("node01:9092,node02:9092,node03:9092")
.setRecordSerializer(new MyKafkaRecordSerializa())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("kafkaSink-")
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
.build();
sensorDS.sinkTo(kafkaSink);
env.execute();
}
启动一个消费者,查看是否收到数据
bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --topic ws
RabbitMQ Sink连接器
要在Flink中使用RabbitMQ作为输出连接器,可以使用Flink提供的RabbitMQ Sink。
添加对RabbitMQ连接器的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq</artifactId>
<version>3.0.1-1.17</version>
</dependency>
定义一个用于序列化数据的简单类
public class MyFlinkBean {
public String id;
public String name;
public Integer age;
// getter() setter()
public MyFlinkBean() {
}
public MyFlinkBean(String id, String name, Integer age) {
this.id = id;
this.name = name;
this.age = age;
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
/**
* 构建数据源
* 定义要发送到RabbitMQ的数据流
*/
ArrayList<String> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
list.add(UUID.randomUUID() + "," + "flink" + i + "," + i);
}
DataStreamSource<String> stream = env.fromCollection(list);
/**
* Map算子转换处理
*/
class MyMapFunction implements MapFunction<String, MyFlinkBean> {
@Override
public MyFlinkBean map(String value) throws Exception {
String[] data = value.split(",");
String uid = data[0].replace("-", "");
return new MyFlinkBean(uid, data[1], Integer.valueOf(data[2]));
}
}
SingleOutputStreamOperator<MyFlinkBean> source = stream.map(new MyMapFunction());
// 定义RabbitMQ连接配置
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setPort(5672).setUserName("work").setPassword("12345678").setVirtualHost("/").build();
// 将MyFlinkBean对象序列化为字节数组
class MySerializationSchema implements SerializationSchema<MyFlinkBean> {
@Override
public byte[] serialize(MyFlinkBean flinkBean) {
return flinkBean.toString().getBytes();
}
}
// 使用RabbitMQ连接器将数据发送到RabbitMQ队列
RMQSink<MyFlinkBean> rabbitMQSink = new RMQSink<>(connectionConfig, "flinkQueue", new MySerializationSchema());
source.addSink(rabbitMQSink);
env.execute("RabbitMQ Example");
}
/**
* MQ其他参数配置
*/
public class MyRMQSinkPublishOptions implements RMQSinkPublishOptions<MyFlinkBean> {
private final String exchangeName;
private final String routingKey;
public MyRMQSinkPublishOptions(String exchangeName, String routingKey) {
this.exchangeName = exchangeName;
this.routingKey = routingKey;
}
@Override
public String computeRoutingKey(MyFlinkBean element) {
// 根据具体逻辑计算路由键
return routingKey;
}
@Override
public AMQP.BasicProperties computeProperties(MyFlinkBean a) {
// 其他MQ参数配置
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties();
return basicProperties;
}
@Override
public String computeExchange(MyFlinkBean a) {
// 根据具体逻辑计算交换机名称
return exchangeName;
}
}
// 交换机名称
String exchange = "myExchange";
// 路由键名称
String routingKey = "myRoutingKey";
// 使用RabbitMQ连接器将数据发送到RabbitMQ队列
RMQSink<MyFlinkBean> rabbitMQSink = new RMQSink<>(connectionConfig, new MySerializationSchema(), new MyRMQSinkPublishOptions(exchange, routingKey), null);
source.addSink(rabbitMQSink);
JDBC Sink连接器
添加MySQL驱动
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.20</version>
</dependency>
添加jdbc连接器
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.0-1.17</version>
</dependency>
定义一个用于序列化数据的简单类
public class MyFlinkBean {
public String id;
public String name;
public Integer age;
// getter() setter()
public MyFlinkBean() {
}
public MyFlinkBean(String id, String name, Integer age) {
this.id = id;
this.name = name;
this.age = age;
}
}
编写输出到MySQL
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
/**
* 构建数据源
*/
ArrayList<String> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
list.add(UUID.randomUUID() + "," + "flink" + i + "," + i);
}
DataStreamSource<String> stream = env.fromCollection(list);
/**
* Map算子转换处理
*/
class MyMapFunction implements MapFunction<String, MyFlinkBean> {
@Override
public MyFlinkBean map(String value) throws Exception {
String[] data = value.split(",");
String uid = data[0].replace("-", "");
return new MyFlinkBean(uid, data[1], Integer.valueOf(data[2]));
}
}
SingleOutputStreamOperator<MyFlinkBean> source = stream.map(new MyMapFunction());
/**
* 预编译SQL,对SQL填充占位符
*/
class MyJdbcStatementBuilder implements JdbcStatementBuilder<MyFlinkBean> {
@Override
public void accept(PreparedStatement preparedStatement, MyFlinkBean myFlinkBean) throws SQLException {
preparedStatement.setString(1, myFlinkBean.getId());
preparedStatement.setString(2, myFlinkBean.getName());
preparedStatement.setInt(3, myFlinkBean.getAge());
}
}
// 定义执行的sql
String sql = "insert into flink values(?,?,?)";
/**
* 执行选项参数
*/
JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder()
// 重试次数
.withMaxRetries(3)
// 批次的大小:条数
.withBatchSize(100)
// 批次的时间
.withBatchIntervalMs(3000)
.build();
/**
* 连接选项参数
*/
JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/demo?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
.withUsername("root")
.withPassword("123456")
// 重试的超时时间
.withConnectionCheckTimeoutSeconds(60)
.build();
SinkFunction<MyFlinkBean> jdbcSink = JdbcSink.sink(sql, new MyJdbcStatementBuilder(), jdbcExecutionOptions, jdbcConnectionOptions);
source.addSink(jdbcSink);
env.execute();
}
创建表
CREATE TABLE `flink` (
`id` VARCHAR ( 32 ) NOT NULL,
`name` VARCHAR ( 10 ) DEFAULT NULL,
`age` TINYINT ( 3 ) DEFAULT NULL,
PRIMARY KEY ( `id` )
) ENGINE = INNODB DEFAULT CHARSET = utf8mb4
执行测试
Elasticsearch Sink连接器
注意:
Flink Elasticsearch Sink的每个并行实例使用一个BulkProcessor向集群发送操作请求。 这会在元素批量发送到集群之前进行缓存。 BulkProcessor 一次执行一个批量请求,即不会存在两个并行刷新缓存的操作。
new Elasticsearch7SinkBuilder<String>()
// 设置使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来
.setBulkFlushMaxActions(1)
Elasticsearch Sinks容错:
通过启用Flink checkpoint,Elasticsearch Sink保证至少一次将操作请求发送到Elasticsearch集群。 这是通过在进行 checkpoint时等待BulkProcessor中所有挂起的操作请求来实现。 这有效地保证了在触发checkpoint之前所有的请求被Elasticsearch成功确认,然后继续处理发送到sink的记录。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 5000 毫秒执行一次 checkpoint
env.enableCheckpointing(5000);
Elasticsearch请求处理失败:
ES操作请求可能由于多种原因而失败,包括节点队列容量暂时已满或者要被索引的文档格式错误。 Flink Elasticsearch Sink允许用户通过通过指定一个退避策略来重试请求。
new Elasticsearch7SinkBuilder<String>()
// 启用一个指数退避重试策略,初始延迟为 1000 毫秒且最大重试次数为 5
.setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
配置内部批量处理器:
可以进一步配置内部的 BulkProcessor 关于其如何刷新缓存操作请求的行为:
setBulkFlushMaxActions(int numMaxActions):刷新前最大缓存的操作数。
setBulkFlushMaxSizeMb(int maxSizeMb):刷新前最大缓存的数据量(以兆字节为单位)。
setBulkFlushInterval(long intervalMillis):刷新的时间间隔(不论缓存操作的数量或大小如何)。
配置如何对暂时性请求错误进行重试:
// 退避延迟的类型,CONSTANT 或者 EXPONENTIAL,退避重试次数,退避重试的时间间隔。
// 对于常量延迟来说,此值是每次重试间的间隔。对于指数延迟来说,此值是延迟的初始值
setBulkFlushBackoffStrategy(FlushBackoffType flushBackoffType, int maxRetries, long delayMillis):
示例:
添加Elasticsearch7.x依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7</artifactId>
<version>3.0.1-1.17</version>
</dependency>
public class Demo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 5000 毫秒执行一次 checkpoint
env.enableCheckpointing(5000);
// 模拟数据流
DataStream<String> input = env.fromElements("data1", "data2", "data3");
// 构建 Elasticsearch Sink
input.sinkTo(new Elasticsearch7SinkBuilder<String>()
// 设置使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来
.setBulkFlushMaxActions(1)
.setHosts(new HttpHost("127.0.0.1", 9200, "http"))
.setEmitter(
(element, context, indexer) -> indexer.add(createIndexRequest(element))
)
.build()
);
env.execute("Elasticsearch Sink Example");
}
/**
* 对每个传入的元素执行单个索引请求
* 还有DeleteRequest、 UpdateRequest
* 创建一个包含要写入的数据的IndexRequest对象,并设置索引名称、文档ID和数据源
*
* @param element
* @return
*/
private static IndexRequest createIndexRequest(String element) {
Map<String, Object> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.id(element)
.source(json);
}
}
MongoDB Sink连接器
Flink 提供了 MongoDB 连接器使用至少一次(At-least-once)的语义在 MongoDB collection 中读取和写入数据。
容错:
默认的写入语义为至少一次 (AT_LEAST_ONCE),但检查点并不是默认开启的,这会导致 Sink 缓冲写入请求,直到完成或
MongoWriter
自动刷新。 默认情况下,MongoWriter
会缓冲 1000 个新增的写入请求。
当开启了 Flink checkpoint,Flink MongoDB Sink 保证至少一次 (at-least-once) 的写入语义. MongoWriter 在检查点时,会确认所有被缓存的写入操作被正确写入
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
配置 Mongo Writer:
内部的 MongoWriter 可以使用 MongoSinkBuilder 更精细化地配置来实现不一样的写入行为:
setBatchSize(int batchSize):设置写入的批次大小。 可以设置为 -1 来禁用批式写入。
setBatchIntervalMs(long batchIntervalMs):设置写入的最大间隔时间,单位为毫秒。 可以设置为 -1 来禁用批式写入。
当使用如下设置时,会产生不一样的写入行为:
1.当缓存记录数超过最大批次大小,或者写入时间间隔超过限制时写入
batchSize > 1 and batchInterval > 0
2.仅在检查点写入
batchSize == -1 and batchInterval == -1
3.对每条记录进行写入
batchSize == 1 or batchInterval == 0
示例:
添加依赖到项目:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>1.0.1-1.17</version>
</dependency>
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 模拟数据流
DataStream<String> stream = env.fromElements("data1", "data2", "data3");
// 构建 MongoDB Sink
MongoSink<String> sink = MongoSink.<String>builder()
// 设置 MongoDB 连接字符串
.setUri("mongodb://user:password@127.0.0.1:27017")
// 设置写入的数据库名称
.setDatabase("my_db")
// 设置写入的集合名称
.setCollection("my_coll")
// 默认值:1000。 设置写入的最大批次大小。可以设置为 -1 来禁用批式写入
.setBatchSize(1000)
// 默认值:1000. 设置写入的最大间隔时间,单位为毫秒。可以设置为 -1 来禁用批式写入
.setBatchIntervalMs(1000)
// 默认值:3。设置写入失败时最大重试次数
.setMaxRetries(3)
// 默认值:DeliveryGuarantee.AT_LEAST_ONCE. 设置投递保证。 仅一次(EXACTLY_ONCE)的投递保证暂不支持
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
// 将数据对象转换为 MongoDB 中的文档对象
.setSerializationSchema(
(input, context) -> new InsertOneModel<>(BsonDocument.parse(input)))
.build();
// 将数据流写入 MongoDB
stream.sinkTo(sink);
env.execute("MongoDB Sink Example");
}
自定义Sink
当Flink没有提供可以直接使用的连接器,就只能自定义Sink进行输出。与Source类似,Flink提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。
在Flink中有两种主要类型的 Sink,分别是SinkFunction
和RichSinkFunction
1.SinkFunction:
SinkFunction是 Flink 提供的一个简单的数据接收器接口。可以通过实现SinkFunction接口来定义接收数据和处理数据的逻辑。
它只有一个核心方法`
void invoke(IN value) throws Exception: 用于接收数据并进行处理。
2.RichSinkFunction:
RichSinkFunction是SinkFunction的子类,它提供了更多的生命周期管理方法和访问上下文的功能。通过继承 RichSinkFunction,可以在接收器的生命周期过程中进行一些初始化或清理操作
它定义了三个额外的方法:
void open(Configuration parameters) throws Exception:初始化资源的方法,可以在接收器开始时执行
void close() throws Exception:清理资源的方法,可以在接收器停止时执行
void setRuntimeContext(RuntimeContext t):设置运行时上下文对象,可以通过该对象访问一些运行时环境信息
RichSinkFunction
实现RichSinkFunction,重写关键方法invoke(),在方法中实现将流里的数据发送出去的逻辑。
public static class MySink extends RichSinkFunction<String> {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 初始化操作 例如:创建Kafka生产者
}
@Override
public void close() throws Exception {
super.close();
// 清理、销毁操作 例如:关闭Kafka生产者
}
/**
* sink的核心逻辑
*
* @param value
* @param context
* @throws Exception
*/
@Override
public void invoke(String value, Context context) throws Exception {
// 来一条数据,调用一次
// 处理接收到的数据 例如:将数据发送到Kafka
// 打印接收到的数据
System.out.println(value);
}
}
SinkFunction
根据logLevel参数的不同,在invoke()方法中将数据以不同的日志级别发送到日志系统中
public class LogSinkFunction<T> implements SinkFunction<T> {
private final String logLevel; // 日志级别
public LogSinkFunction(String logLevel) {
this.logLevel = logLevel;
}
@Override
public void invoke(T value, Context context) throws Exception {
// 根据日志级别将数据发送到日志系统
switch (logLevel) {
case "INFO":
// 发送到INFO日志
break;
case "WARN":
// 发送到WARN日志
break;
case "ERROR":
// 发送到ERROR日志
break;
default:
// 默认发送到INFO日志
break;
}
}
}
验证测试
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> source = env.fromElements("1", "2", "3");
// source.addSink(new MySink());
source.addSink(new LogSinkFunction<>("INFO"));
source.print();
env.execute();
}