Flink之输出算子Data Sink

news2024/12/24 0:10:13

Flink之输出算子Data Sink

  • Data Sink
  • 常见输出算子
    • print()
    • printToErr()
    • writeAsText()
    • writeAsCsv()
    • writeToSocket()
  • 常用连接器
    • File Sink连接器
    • Kafka Sink连接器
    • RabbitMQ Sink连接器
    • JDBC Sink连接器
    • Elasticsearch Sink连接器
    • MongoDB Sink连接器
  • 自定义Sink
    • RichSinkFunction
    • SinkFunction
    • 验证测试

Data Sink

在Apache Flink中,输出算子(Data Sink)用于将数据流发送到外部系统或存储介质中,如数据库、消息队列、文件系统、Apache Kafka等,以便进行后续的持久化、分析或其他操作。输出算子是数据流处理的最后一步,它决定了数据的最终去向。

Flink提供了各种内置的输出算子,可支持许多常见的数据存储系统,如print()、printToErr()、writeAsText()等。

Flink还提供了一部分框架的Sink连接器,支持与许多外部系统集成的连接器,如Apache Kafka、Elasticsearch、JDBC、MongoDB等。这些连接器提供了专门的输出算子,可以直接与这些外部系统进行交互。

也可以自定义输出算子来与其他系统进行集成,只需实现 SinkFunction 接口,并将其用作 addSink 方法的参数。

常见输出算子

在使用Flink进行数据处理时,数据经Data Source流入,然后通过系列Transformations的转化,最终可以通过Sink 将计算结果进行输出,Flink Data Sinks就是用于定义数据流最终的输出位置。

Flink 提供了几个较为简单的 Sink API 用于日常的开发,具体如下:

print()

将数据打印到标准输出stdout,不需要额外配置,它会将数据元素以字符串形式直接打印到标准输出上

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> dataStream = env.fromElements("Hello", "World", "Flink");

dataStream.print();

env.execute("Print Data");

printToErr()

将数据打印到标准错误输出stderr,类似于 print(),但将数据打印到标准错误输出上。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> dataStream = env.fromElements("Hello", "World", "Flink");

dataStream.printToErr();

env.execute("Print to StdErr");

writeAsText()

将数据写入文本文件,将数据按照行的形式写入文本文件,每行一个数据元素。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> dataStream = env.fromElements("Hello", "World", "Flink");

dataStream.writeAsText("/path/to/output.txt"); // 指定输出路径和文件名

env.execute("Write as Text");

可以通过指定第二个参数来定义输出模式,它有以下两个可选值:

WriteMode.NO_OVERWRITE:当指定路径上不存在任何文件时,才执行写出操作

WriteMode.OVERWRITE:不论指定路径上是否存在文件,都执行写出操作;如果原来已有文件,则进行覆盖

并行的方式写出到多个文件:

 streamSource.writeAsText("D:\\out", FileSystem.WriteMode.OVERWRITE);

将输出结果全部写出到一个文件,设置其并行度为1:

streamSource.writeAsText("D:\\out", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

writeAsCsv()

用于将计算结果以CSV的文件格式写出到指定目录,除了路径参数是必选外,该方法还支持传入输出模式,行分隔符,和字段分隔符三个额外的参数

DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
  new Tuple2<>("Alice", 25),
  new Tuple2<>("Bob", 30),
  new Tuple2<>("Charlie", 35)
);

// writeAsCsv(String path, WriteMode writeMode, String rowDelimiter, String fieldDelimiter) 
dataStream.writeAsCsv("/path/to/output.csv", WriteMode.OVERWRITE, "\n", ",");

env.execute("Write as CSV");

writeToSocket()

将数据通过网络 socket 发送到指定的主机和端口。可以用于将数据流输出到外部系统或进行简单的网络通信。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> dataStream = env.fromElements("Hello", "World", "Flink");

dataStream.writeToSocket("localhost", 9999, new SimpleStringSchema());

env.execute("Write to Socket");

常用连接器

连接器可以和多种多样的第三方系统进行交互。Flink官方目前支持以下第三方系统连接器

Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon DynamoDB (sink)
Amazon Kinesis Data Streams (source/sink)
Amazon Kinesis Data Firehose (sink)
DataGen (source)
Elasticsearch (sink)
Opensearch (sink)
FileSystem (sink)
RabbitMQ (source/sink)
Google PubSub (source/sink)
Hybrid Source (source)
Apache Pulsar (source)
JDBC (sink)
MongoDB (source/sink)

除Flink官方之外,还有一些其他第三方系统与Flink的连接器,通过Apache Bahir发布:

Apache ActiveMQ (source/sink)
Apache Flume (sink)
Redis (sink)
Akka (sink)
Netty (source)

File Sink连接器

Flink提供了一个流式文件系统的连接器FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。

File Sink支持行编码Row-encoded和批量编码Bulk-encoded格式。这两种不同的方式都有各自的构建器builder,可以直接调用FileSink的静态方法:

// 行编码
FileSink.forRowFormat(basePath,rowEncoder)

// 批量编码
FileSink.forBulkFormat(basePath,bulkWriterFactory)

File Sink将传入的数据写入存储桶中。考虑到输入流可以是无界的,每个桶中的数据被组织成有限大小的 Part 文件。 完全可以配置为基于时间的方式往桶中写入数据,比如可以设置每个小时的数据写入一个新桶中。

注意:

在 STREAMING 模式下使用 FileSink 需要开启 Checkpoint 功能。 文件只在 Checkpoint 成功时生成。如果没有开启 Checkpoint 功能,文件将永远停留在 in-progress 或者 pending 的状态,并且下游系统将不能安全读取该文件数据。

 public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 每个目录中,有N个并行度的文件在写入
        env.setParallelism(2);

        // 开启checkpoint,否则生成文件有后缀: .inprogress
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        /**
         * 生成模拟数据
         */
        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                Long.MAX_VALUE, RateLimiterStrategy.perSecond(1000), Types.STRING);

        // 读取模拟数据
        DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");

        // 输出到文件系统
        FileSink<String> fieSink = FileSink
                // 输出行式存储的文件,指定路径、指定编码
                .<String>forRowFormat(new Path("D:/temp"), new SimpleStringEncoder<>("UTF-8"))
                // 输出文件的一些配置: 文件名的前缀、后缀
                .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("fileSink-").withPartSuffix(".log").build())
                // 按照目录分桶:每个小时一个目录
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                // 文件滚动策略:  30秒 或 1m 生成一个文件
                .withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofSeconds(30)).withMaxPartSize(new MemorySize(1024 * 1024)).build())
                .build();

        dataGen.sinkTo(fieSink);
        env.execute();
    }

在这里插入图片描述

Kafka Sink连接器

添加Kafka 连接器依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.17.0</version>
</dependency>
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 写到kafka的一致性级别如果是精准一次,必须开启checkpoint
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        SingleOutputStreamOperator<String> singleOutputStreamOperator = env.socketTextStream("node01", 8888);

        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // 指定kafka地址和端口
                .setBootstrapServers("node01:9092,node01:9092,node03:9092")
                // 指定序列化器、指定Topic名称、具体的序列化
                .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("kafkaSink").setValueSerializationSchema(new SimpleStringSchema()).build())
                // 写到kafka的一致性级别: 精准一次、至少一次
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 如果是精准一次,必须设置 事务的前缀
                .setTransactionalIdPrefix("kafkaSink-")
                // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();


        singleOutputStreamOperator.sinkTo(kafkaSink);

        env.execute();
    }

自定义序列化器,实现带key的record

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());


        SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("node01", 8888);

        /**
         * 自定义序列器:
         * 实现KafkaRecordSerializationSchema接口,重写ProducerRecord序列化方法
         * 指定key、value,转成字节数组
         * @return 返回一个 ProducerRecord对象,把key、value放进去
         */
        class MyKafkaRecordSerializa implements KafkaRecordSerializationSchema<String> {
            @Nullable
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext kafkaSinkContext, Long timestamp) {
                String[] datas = element.split(",");
                byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                byte[] value = element.getBytes(StandardCharsets.UTF_8);
                return new ProducerRecord<>("kafkaSink", key, value);
            }
        }

        KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("node01:9092,node02:9092,node03:9092")
                .setRecordSerializer(new MyKafkaRecordSerializa())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("kafkaSink-")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();

        sensorDS.sinkTo(kafkaSink);

        env.execute();
    }

启动一个消费者,查看是否收到数据

bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --topic ws

RabbitMQ Sink连接器

要在Flink中使用RabbitMQ作为输出连接器,可以使用Flink提供的RabbitMQ Sink。

添加对RabbitMQ连接器的依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-rabbitmq</artifactId>
    <version>3.0.1-1.17</version>
</dependency>

定义一个用于序列化数据的简单类

public class MyFlinkBean {
    public String id;

    public String name;
    public Integer age;

	// getter() setter()

    public MyFlinkBean() {
    }

    public MyFlinkBean(String id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }
}
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        /**
         * 构建数据源
         * 定义要发送到RabbitMQ的数据流
         */
        ArrayList<String> list = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            list.add(UUID.randomUUID() + "," + "flink" + i + "," + i);
        }
        DataStreamSource<String> stream = env.fromCollection(list);

        /**
         * Map算子转换处理
         */
        class MyMapFunction implements MapFunction<String, MyFlinkBean> {
            @Override
            public MyFlinkBean map(String value) throws Exception {
                String[] data = value.split(",");
                String uid = data[0].replace("-", "");
                return new MyFlinkBean(uid, data[1], Integer.valueOf(data[2]));
            }
        }
        SingleOutputStreamOperator<MyFlinkBean> source = stream.map(new MyMapFunction());


        // 定义RabbitMQ连接配置
        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setPort(5672).setUserName("work").setPassword("12345678").setVirtualHost("/").build();

        // 将MyFlinkBean对象序列化为字节数组
        class MySerializationSchema implements SerializationSchema<MyFlinkBean> {
            @Override
            public byte[] serialize(MyFlinkBean flinkBean) {
                return flinkBean.toString().getBytes();
            }
        }

        // 使用RabbitMQ连接器将数据发送到RabbitMQ队列
        RMQSink<MyFlinkBean> rabbitMQSink = new RMQSink<>(connectionConfig, "flinkQueue", new MySerializationSchema());
        source.addSink(rabbitMQSink);

        env.execute("RabbitMQ Example");
    }

在这里插入图片描述

    /**
     * MQ其他参数配置
     */

    public class MyRMQSinkPublishOptions implements RMQSinkPublishOptions<MyFlinkBean> {

        private final String exchangeName;
        private final String routingKey;

        public MyRMQSinkPublishOptions(String exchangeName, String routingKey) {
            this.exchangeName = exchangeName;
            this.routingKey = routingKey;
        }

        @Override
        public String computeRoutingKey(MyFlinkBean element) {
            // 根据具体逻辑计算路由键
            return routingKey;
        }


        @Override
        public AMQP.BasicProperties computeProperties(MyFlinkBean a) {
            // 其他MQ参数配置
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties();
            return basicProperties;
        }

        @Override
        public String computeExchange(MyFlinkBean a) {
            // 根据具体逻辑计算交换机名称
            return exchangeName;
        }
    }
        // 交换机名称
        String exchange = "myExchange";
        // 路由键名称
        String routingKey = "myRoutingKey";

        // 使用RabbitMQ连接器将数据发送到RabbitMQ队列
        RMQSink<MyFlinkBean> rabbitMQSink = new RMQSink<>(connectionConfig, new MySerializationSchema(), new MyRMQSinkPublishOptions(exchange, routingKey), null);
        source.addSink(rabbitMQSink);

JDBC Sink连接器

添加MySQL驱动

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.20</version>
</dependency>

添加jdbc连接器

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.0-1.17</version>
</dependency>

定义一个用于序列化数据的简单类

public class MyFlinkBean {
    public String id;

    public String name;
    public Integer age;

	// getter() setter()

    public MyFlinkBean() {
    }

    public MyFlinkBean(String id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }
}

编写输出到MySQL

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        /**
         * 构建数据源
         */
        ArrayList<String> list = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            list.add(UUID.randomUUID() + "," + "flink" + i + "," + i);
        }
        DataStreamSource<String> stream = env.fromCollection(list);

        /**
         * Map算子转换处理
         */
        class MyMapFunction implements MapFunction<String, MyFlinkBean> {
            @Override
            public MyFlinkBean map(String value) throws Exception {
                String[] data = value.split(",");
                String uid = data[0].replace("-", "");
                return new MyFlinkBean(uid, data[1], Integer.valueOf(data[2]));
            }
        }
        SingleOutputStreamOperator<MyFlinkBean> source = stream.map(new MyMapFunction());


        /**
         * 预编译SQL,对SQL填充占位符
         */
        class MyJdbcStatementBuilder implements JdbcStatementBuilder<MyFlinkBean> {
            @Override
            public void accept(PreparedStatement preparedStatement, MyFlinkBean myFlinkBean) throws SQLException {
                preparedStatement.setString(1, myFlinkBean.getId());
                preparedStatement.setString(2, myFlinkBean.getName());
                preparedStatement.setInt(3, myFlinkBean.getAge());
            }
        }


        // 定义执行的sql
        String sql = "insert into flink values(?,?,?)";

        /**
         * 执行选项参数
         */
        JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder()
                // 重试次数
                .withMaxRetries(3)
                // 批次的大小:条数
                .withBatchSize(100)
                // 批次的时间
                .withBatchIntervalMs(3000)
                .build();

        /**
         * 连接选项参数
         */
        JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/demo?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                .withUsername("root")
                .withPassword("123456")
                // 重试的超时时间
                .withConnectionCheckTimeoutSeconds(60)
                .build();


        SinkFunction<MyFlinkBean> jdbcSink = JdbcSink.sink(sql, new MyJdbcStatementBuilder(), jdbcExecutionOptions, jdbcConnectionOptions);

        source.addSink(jdbcSink);

        env.execute();
    }

创建表

CREATE TABLE `flink` (
	`id` VARCHAR ( 32 ) NOT NULL,
	`name` VARCHAR ( 10 ) DEFAULT NULL,
	`age` TINYINT ( 3 ) DEFAULT NULL,
PRIMARY KEY ( `id` ) 
) ENGINE = INNODB DEFAULT CHARSET = utf8mb4

执行测试

在这里插入图片描述

Elasticsearch Sink连接器

注意:

Flink Elasticsearch Sink的每个并行实例使用一个BulkProcessor向集群发送操作请求。 这会在元素批量发送到集群之前进行缓存。 BulkProcessor 一次执行一个批量请求,即不会存在两个并行刷新缓存的操作。

new Elasticsearch7SinkBuilder<String>()
 // 设置使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来
.setBulkFlushMaxActions(1)

Elasticsearch Sinks容错:

通过启用Flink checkpoint,Elasticsearch Sink保证至少一次将操作请求发送到Elasticsearch集群。 这是通过在进行 checkpoint时等待BulkProcessor中所有挂起的操作请求来实现。 这有效地保证了在触发checkpoint之前所有的请求被Elasticsearch成功确认,然后继续处理发送到sink的记录。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 5000 毫秒执行一次 checkpoint
env.enableCheckpointing(5000);

Elasticsearch请求处理失败:

ES操作请求可能由于多种原因而失败,包括节点队列容量暂时已满或者要被索引的文档格式错误。 Flink Elasticsearch Sink允许用户通过通过指定一个退避策略来重试请求。

	new Elasticsearch7SinkBuilder<String>()
        // 启用一个指数退避重试策略,初始延迟为 1000 毫秒且最大重试次数为 5
        .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)

配置内部批量处理器:

可以进一步配置内部的 BulkProcessor 关于其如何刷新缓存操作请求的行为:

setBulkFlushMaxActions(int numMaxActions):刷新前最大缓存的操作数。
setBulkFlushMaxSizeMb(int maxSizeMb):刷新前最大缓存的数据量(以兆字节为单位)。
setBulkFlushInterval(long intervalMillis):刷新的时间间隔(不论缓存操作的数量或大小如何)。

配置如何对暂时性请求错误进行重试:

// 退避延迟的类型,CONSTANT 或者 EXPONENTIAL,退避重试次数,退避重试的时间间隔。
// 对于常量延迟来说,此值是每次重试间的间隔。对于指数延迟来说,此值是延迟的初始值
setBulkFlushBackoffStrategy(FlushBackoffType flushBackoffType, int maxRetries, long delayMillis)

示例:
添加Elasticsearch7.x依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7</artifactId>
    <version>3.0.1-1.17</version>
</dependency>
public class Demo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 每 5000 毫秒执行一次 checkpoint
        env.enableCheckpointing(5000);

        // 模拟数据流
        DataStream<String> input = env.fromElements("data1", "data2", "data3");

        // 构建 Elasticsearch Sink
        input.sinkTo(new Elasticsearch7SinkBuilder<String>()
                // 设置使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来
                .setBulkFlushMaxActions(1)
                .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
                .setEmitter(
                        (element, context, indexer) -> indexer.add(createIndexRequest(element))
                )
                .build()
        );

        env.execute("Elasticsearch Sink Example");
    }

    /**
     * 对每个传入的元素执行单个索引请求
     * 还有DeleteRequest、 UpdateRequest
     * 创建一个包含要写入的数据的IndexRequest对象,并设置索引名称、文档ID和数据源
     *
     * @param element
     * @return
     */
    private static IndexRequest createIndexRequest(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .id(element)
                .source(json);
    }
}

MongoDB Sink连接器

Flink 提供了 MongoDB 连接器使用至少一次(At-least-once)的语义在 MongoDB collection 中读取和写入数据。

容错:

默认的写入语义为至少一次 (AT_LEAST_ONCE),但检查点并不是默认开启的,这会导致 Sink 缓冲写入请求,直到完成或 MongoWriter 自动刷新。 默认情况下,MongoWriter 会缓冲 1000 个新增的写入请求。

当开启了 Flink checkpoint,Flink MongoDB Sink 保证至少一次 (at-least-once) 的写入语义. MongoWriter 在检查点时,会确认所有被缓存的写入操作被正确写入

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); 

配置 Mongo Writer:

内部的 MongoWriter 可以使用 MongoSinkBuilder 更精细化地配置来实现不一样的写入行为:

setBatchSize(int batchSize):设置写入的批次大小。 可以设置为 -1 来禁用批式写入。
setBatchIntervalMs(long batchIntervalMs):设置写入的最大间隔时间,单位为毫秒。 可以设置为 -1 来禁用批式写入。

当使用如下设置时,会产生不一样的写入行为:

1.当缓存记录数超过最大批次大小,或者写入时间间隔超过限制时写入

batchSize > 1 and batchInterval > 0

2.仅在检查点写入

batchSize == -1 and batchInterval == -1

3.对每条记录进行写入

batchSize == 1 or batchInterval == 0

示例:

添加依赖到项目:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>1.0.1-1.17</version>
</dependency>
    public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 模拟数据流
        DataStream<String> stream = env.fromElements("data1", "data2", "data3");

        // 构建 MongoDB Sink
        MongoSink<String> sink = MongoSink.<String>builder()
                // 设置 MongoDB 连接字符串
                .setUri("mongodb://user:password@127.0.0.1:27017")
                // 设置写入的数据库名称
                .setDatabase("my_db")
                // 设置写入的集合名称
                .setCollection("my_coll")
                // 默认值:1000。 设置写入的最大批次大小。可以设置为 -1 来禁用批式写入
                .setBatchSize(1000)
                // 默认值:1000. 设置写入的最大间隔时间,单位为毫秒。可以设置为 -1 来禁用批式写入
                .setBatchIntervalMs(1000)
                // 默认值:3。设置写入失败时最大重试次数
                .setMaxRetries(3)
                // 默认值:DeliveryGuarantee.AT_LEAST_ONCE. 设置投递保证。 仅一次(EXACTLY_ONCE)的投递保证暂不支持
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                // 将数据对象转换为 MongoDB 中的文档对象
                .setSerializationSchema(
                        (input, context) -> new InsertOneModel<>(BsonDocument.parse(input)))
                .build();

        // 将数据流写入 MongoDB
        stream.sinkTo(sink);

        env.execute("MongoDB Sink Example");
    }

自定义Sink

当Flink没有提供可以直接使用的连接器,就只能自定义Sink进行输出。与Source类似,Flink提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。

在Flink中有两种主要类型的 Sink,分别是SinkFunctionRichSinkFunction

1.SinkFunction:

SinkFunction是 Flink 提供的一个简单的数据接收器接口。可以通过实现SinkFunction接口来定义接收数据和处理数据的逻辑。

它只有一个核心方法`

void invoke(IN value) throws Exception: 用于接收数据并进行处理。

2.RichSinkFunction:

RichSinkFunction是SinkFunction的子类,它提供了更多的生命周期管理方法和访问上下文的功能。通过继承 RichSinkFunction,可以在接收器的生命周期过程中进行一些初始化或清理操作

它定义了三个额外的方法:

void open(Configuration parameters) throws Exception:初始化资源的方法,可以在接收器开始时执行

void close() throws Exception:清理资源的方法,可以在接收器停止时执行

void setRuntimeContext(RuntimeContext t):设置运行时上下文对象,可以通过该对象访问一些运行时环境信息

RichSinkFunction

实现RichSinkFunction,重写关键方法invoke(),在方法中实现将流里的数据发送出去的逻辑。

    public static class MySink extends RichSinkFunction<String> {

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // 初始化操作 例如:创建Kafka生产者
        }

        @Override
        public void close() throws Exception {
            super.close();
            // 清理、销毁操作 例如:关闭Kafka生产者
        }

        /**
         * sink的核心逻辑
         *
         * @param value
         * @param context
         * @throws Exception
         */
        @Override
        public void invoke(String value, Context context) throws Exception {
            // 来一条数据,调用一次
            // 处理接收到的数据  例如:将数据发送到Kafka
            
	        // 打印接收到的数据
	        System.out.println(value);
        }
    }

SinkFunction

根据logLevel参数的不同,在invoke()方法中将数据以不同的日志级别发送到日志系统中

    public class LogSinkFunction<T> implements SinkFunction<T> {
        private final String logLevel; // 日志级别

        public LogSinkFunction(String logLevel) {
            this.logLevel = logLevel;
        }

        @Override
        public void invoke(T value, Context context) throws Exception {
            // 根据日志级别将数据发送到日志系统
            switch (logLevel) {
                case "INFO":
                    // 发送到INFO日志
                    break;
                case "WARN":
                    // 发送到WARN日志
                    break;
                case "ERROR":
                    // 发送到ERROR日志
                    break;
                default:
                    // 默认发送到INFO日志
                    break;
            }
        }
    }

验证测试

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> source = env.fromElements("1", "2", "3");

//        source.addSink(new MySink());
        source.addSink(new LogSinkFunction<>("INFO"));

        source.print();
        env.execute();
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1113560.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

海外展预告 | 同立海源将参展美国圣地亚哥SITC 2023年会

第38届癌症免疫治疗学会&#xff08;Society for Immunotherapy of Cancer, SITC&#xff09;年会将于11月1日-5日在美国圣地亚哥举行。同立海源将携细胞分选磁珠试剂、真核/原核重组蛋白、免疫细胞培养基等CGT上游GMP级核心原料整体解决方案参加此次会议并设立展台&#xff0c…

4.5 互联网的路由器

思维导图&#xff1a; 4.5 互联网的路由选择协议 本节的核心内容是讨论如何确定路由表中的路由&#xff0c;具体通过何种路由选择协议实现。 --- **4.5.1 有关路由选择协议的几个基本概念** - **理想的路由算法:** 路由选择协议的关键是路由算法。一个理想的路由算法应具…

如何打造独立站?这4个要点必须做到!

“什么是独立站”独立站指的是个人或小团队独立创建和管理的网站&#xff0c;与依赖于第三方平台的博客、社交媒体或在线商店不同。独立站的所有权和控制权完全归个人或小团队所有&#xff0c;因此具有更大的自主性和独立性&#xff0c;不受第三方平台的限制。 独立站是由个人…

Lua-http库写一个爬虫程序怎么样 ?

以下是一个使用Lua-http库编写的一个爬虫程序&#xff0c;该爬虫使用Lua语言来抓取www.snapchat.com的内容。 代码必须使用以下代码&#xff1a;get_proxy -- 导入所需的库 local http require("http") local json require("json")-- 定义爬虫IP服务器 …

必示科技发布“早准快全易”智能运维产品,与生态伙伴共谋增长

2023年10月13日&#xff0c;“因智而聚 共谋增长”必示科技产品发布活动在北京中关村智造大街圆满召开&#xff0c;来自智能运维行业领域共40多家企业高层代表出席了本次闭门交流活动。 必示科技发布了三款智能运维产品&#xff1a;应用监控预警系统&#xff08;RiskSeer-App&…

[SQL开发笔记]创建SQL数据库

一、引言 在计算机软件开发以及业务流程中&#xff0c;大量数据不断产生&#xff0c;如何安全有效地存储、检索和管理它们已成为信息时代一个至关重要的问题。解决这个问题的关键在于使用数据库&#xff0c;数据库能够高效且条理分明地存储数据&#xff0c;方便用户进行迅速和…

TikTok Shop新结算政策:卖家选择权加强,电商市场蓄势待发

据悉&#xff0c;从2023年11月1日开始&#xff0c;TikTok Shop将根据卖家的店铺表现来应用3种不同类型的结算期&#xff0c;其中&#xff0c;标准结算期&#xff1a;资金交收期为8个日历日&#xff1b;快速结算期&#xff1a;资金交收期为3个日历日&#xff1b;延长结算期&…

HarmonyOS开发:Log工具类源码分析

前言 一转眼就十月中旬了&#xff0c;国庆的劲真大&#xff0c;到现在还未缓过来&#xff0c;以至于要更新的文章迟迟未发布&#xff0c;大家可以看到&#xff0c;最近一段时间的文章&#xff0c;都是关于HarmonyOS相关的&#xff0c;两个原因吧&#xff0c;一是我司有这样的任…

《数据结构、算法与应用C++语言描述》使用C++语言实现数组双端队列

《数据结构、算法与应用C语言描述》使用C语言实现数组双端队列 定义 队列的定义 队列&#xff08;queue&#xff09;是一个线性表&#xff0c;其插入和删除操作分别在表的不同端进行。插入元素的那一端称为队尾&#xff08;back或rear&#xff09;&#xff0c;删除元素的那一…

网站二级域名怎么部署SSL证书?

二级域名是在主域名下创建的子域名&#xff0c;常用于区分不同功能或部门的网站。随着互联网的发展&#xff0c;越来越多的网站开始采用二级域名来构建更灵活和个性化的网站结构&#xff0c;保护二级域名的数据安全也变得至关重要。为了确保二级域名的安全性&#xff0c;申请SS…

python爬虫requests.get乱码问题

爬取百度图片的时候res.text出现乱码&#xff1a; 解决&#xff1a; 删除请求头中的接受编码项

当量因子法、InVEST、SolVES模型等多技术融合在生态系统服务功能社会价值评估中的应用及论文写作、拓展分析

生态系统服务是人类从自然界中获得的直接或间接惠益&#xff0c;可分为供给服务、文化服务、调节服务和支持服务4类&#xff0c;对提升人类福祉具有重大意义&#xff0c;且被视为连接社会与生态系统的桥梁。自从启动千年生态系统评估项目&#xff08;Millennium Ecosystem Asse…

excel表格怎么设置数据超链接?

在Excel表格中&#xff0c;可以设置超链接来快速导航到其他单元格、工作表、文件、网页等。下面我将详细介绍如何设置数据超链接。 1. 在Excel表格中选择要添加超链接的单元格或文本。 2. 使用鼠标右键点击选定的单元格&#xff0c;然后选择“超链接”选项&#xff0c;或者在…

2023下半年信息系统集成设计师案例

案例题 重要的知识点容易忽略的知识点不错的小题合同管理配置管理变更管理成本管理招标管理人力资源管理质量管理风险管理沟通管理立项和需求 能说专业术语&#xff08;比喻十大领域管理不足&#xff09;就说&#xff0c;没法说的大白话也没问题 重要的知识点 如果题目没有说从…

Bazzite:专为 Steam Deck 和 PC 上的 Linux 游戏打造的发行版

导读对于一个专为 Linux 游戏定制的发行版&#xff0c;你是否感兴趣呢&#xff1f;如果答案是肯定的&#xff0c;那么我们为你准备了绝佳选择。 Bazzite 是一个新推出的基于 Fedora 的发行版&#xff0c;它是为 Linux 桌面上的游戏&#xff0c;以及越来越火热的 Steam Deck 定…

必不可少的UI组件二——组件库开发的基础知识(工程化篇)

组件库工程化概述 在 必不可少的UI组件——组件库开发的基础知识(Vue篇) 中&#xff0c;我们介绍了一些封装 Vue 组件的过程中高频使用到的框架技巧&#xff0c;但是&#xff0c;这并不足以支持我们实现完善的组件库。 建设一个成熟的组件库就像盖一幢大楼&#xff0c;工程化…

开发从0 到1获取代码,提交,推送

1,首先我们要下载git 2,下载一个github desktop 3,下载好git 后拉取代码 git clone 克隆的地址 4&#xff0c;克隆好项目后&#xff0c;配置git 密钥到你的账号上 4.1没有有密钥怎么生成&#xff1f; git config --global user.name "xxx" git config --globa…

无线射频收发芯片:Si24R2F

Si24R2F是一款2.4GHz超低功耗有源RFID标签系统的SoC单芯片&#xff0c;集成嵌入式2.4GHz无线射频发射器模块、64次可编程NVM存储器模块以及自动发射控Z器模块等。 Si24R2F支持4通道轮询发射&#xff0c;4个信道可以轮流发射不同的数据&#xff0c;从而增加系统卡片容量。同时支…

温湿度实时监测,这个方法太强了!

温湿度监控是现代社会中一个日益重要的技术领域&#xff0c;它不仅涉及到人们的日常生活&#xff0c;也在各种产业和领域中发挥着至关重要的作用。随着科技的不断进步&#xff0c;我们对环境条件的监测和控制需求愈发增强。 客户案例 制药行业 在制药行业&#xff0c;药品的质量…

Si24R2H无线射频芯片 125KHz唤醒功能

​ 产品信息: Si24R2H-2.4GHz无线发射单芯片 集成嵌入式基带 发射频率范围&#xff1a;2400MHz~2525MHz 接收频率范围&#xff1a;15KHz~150KHz 支持2Mbps、1Mbps和250Kbps三种发射数据速率 产品功能: 1、高精度的位置定位 2、测温和报J 3、实现与手机的联动 4、外W设…