【Flink-1.17-教程】-【四】Flink DataStream API(7)输出算子(Sink)

news2024/9/25 2:28:09

【Flink-1.17-教程】-【四】Flink DataStream API(7)输出算子(Sink)

  • 1)连接到外部系统
  • 2)输出到文件
  • 3)输出到 Kafka
  • 4)输出到 MySQL(JDBC)
  • 5)自定义 Sink 输出

Flink 作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。

在这里插入图片描述

1)连接到外部系统

Flink 的 DataStream API 专门提供了向外部写入数据的方法:addSink。与 addSource 类似,addSink 方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink 程序中所有对外的输出操作,一般都是利用 Sink 算子完成的。

Flink1.12 以前,Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。

stream.addSink(new SinkFunction());

addSink 方法同样需要传入一个参数,实现的是 SinkFunction 接口。在这个接口中只需要重写一个方法 invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。

Flink1.12 开始,同样重构了 Sink 架构,

stream.sinkTo()

当然,Sink 多数情况下同样并不需要我们自己实现。之前我们一直在使用的 print 方法其实就是一种 Sink,它表示将数据流写入标准控制台打印输出。Flink 官方为我们提供了一部分的框架的 Sink 连接器。如下图所示,列出了 Flink 官方目前支持的第三方系统连接器:

在这里插入图片描述

我们可以看到,像 Kafka 之类流式系统,Flink 提供了完美对接,source / sink 两端都能连接,可读可写;而对于 Elasticsearch、JDBC 等数据存储系统,则只提供了输出写入的 sink 连接器。

除 Flink 官方之外,Apache Bahir 框架,也实现了一些其他第三方系统与 Flink 的连接器。

在这里插入图片描述

除此以外,就需要用户自定义实现 sink 连接器了。

2)输出到文件

Flink 专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink 支持的文件系统。

FileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用 FileSink 的静态方法:

  • 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
  • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。

示例:

public class SinkFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 每个目录中,都有 并行度个数的 文件在写入
        env.setParallelism(2);

        // 必须开启checkpoint,否则一直都是 .inprogress
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);


        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(1000),
                Types.STRING
        );

        DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");

        // TODO 输出到文件系统
        FileSink<String> fieSink = FileSink
                // 输出行式存储的文件,指定路径、指定编码
                .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
                // 输出文件的一些配置: 文件名的前缀、后缀
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("atguigu-")
                                .withPartSuffix(".log")
                                .build()
                )
                // 按照目录分桶:如下,就是每个小时一个目录
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                // 文件滚动策略:  1分钟 或 1m
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(1))
                                .withMaxPartSize(new MemorySize(1024*1024))
                                .build()
                )
                .build();


        dataGen.sinkTo(fieSink);

        env.execute();
    }
}

3)输出到 Kafka

1、添加 Kafka 连接器依赖。

由于我们已经测试过从 Kafka 数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。

2、启动 Kafka 集群。

3、编写输出到 Kafka 的示例代码。

(1)输出无 key 的 record:

public class SinkKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 如果是精准一次,必须开启checkpoint(后续章节介绍)
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("hadoop102", 7777);

        /**
         * Kafka Sink:
         * TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可
         * 1、开启checkpoint(后续介绍)
         * 2、设置事务前缀
         * 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // 指定 kafka 的地址和端口
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                // 指定序列化器:指定Topic名称、具体的序列化
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("ws")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // 写到kafka的一致性级别: 精准一次、至少一次
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 如果是精准一次,必须设置 事务的前缀
                .setTransactionalIdPrefix("atguigu-")
                // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
                .build();

        sensorDS.sinkTo(kafkaSink);

        env.execute();
    }
}

(2)自定义序列化器,实现带 key 的 record:

public class SinkKafkaWithKey {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());

        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("hadoop102", 7777);

        /**
         * 如果要指定写入kafka的key
         * 可以自定义序列器:
         * 1、实现 一个接口,重写 序列化 方法
         * 2、指定key,转成 字节数组
         * 3、指定value,转成 字节数组
         * 4、返回一个 ProducerRecord对象,把key、value放进去
         *
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                .setRecordSerializer(

                        new KafkaRecordSerializationSchema<String>() {

                            @Nullable
                            @Override
                            public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                                String[] datas = element.split(",");
                                byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                                byte[] value = element.getBytes(StandardCharsets.UTF_8);
                                return new ProducerRecord<>("ws", key, value);
                            }
                        }
                )
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("atguigu-")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();

        sensorDS.sinkTo(kafkaSink);

        env.execute();
    }
}

4、运行代码,在 Linux 主机启动一个消费者,查看是否收到数据。

[hadoop102 ~]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws

4)输出到 MySQL(JDBC)

写入数据的 MySQL 的测试步骤如下。

1、添加依赖。

添加 MySQL 驱动:

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>

官方还未提供 flink-connector-jdbc 的 1.17.0 的正式依赖,暂时从 apache snapshot 仓库下载,pom 文件中指定仓库路径:

<repositories>
<repository>
<id>apache-snapshots</id>
<name>apache snapshots</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
</repositories>

添加依赖:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>1.17-SNAPSHOT</version>
</dependency>

如果不生效,还需要修改本地 maven 的配置文件,mirrorOf 中添加如下标红内容:

<mirror>
<id>aliyunmaven</id>
<mirrorOf>*,!apache-snapshots</mirrorOf>
<name>阿里云公共仓库</name>
<url>https://maven.aliyun.com/repository/public</url>
</mirror>

2、启动 MySQL,在 test 库下建表 ws。

mysql>
CREATE TABLE `ws` (
`id` varchar(100) NOT NULL,
`ts` bigint(20) DEFAULT NULL,
`vc` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

3、编写输出到 MySQL 的示例代码。

public class SinkMySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());

        /**
         * TODO 写入mysql
         * 1、只能用老的sink写法: addsink
         * 2、JDBCSink的4个参数:
         *    第一个参数: 执行的sql,一般就是 insert into
         *    第二个参数: 预编译sql, 对占位符填充值
         *    第三个参数: 执行选项 ---》 攒批、重试
         *    第四个参数: 连接选项 ---》 url、用户名、密码
         */
        SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
                "insert into ws values(?,?,?)",
                new JdbcStatementBuilder<WaterSensor>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                        //每收到一条WaterSensor,如何去填充占位符
                        preparedStatement.setString(1, waterSensor.getId());
                        preparedStatement.setLong(2, waterSensor.getTs());
                        preparedStatement.setInt(3, waterSensor.getVc());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withMaxRetries(3) // 重试次数
                        .withBatchSize(100) // 批次的大小:条数
                        .withBatchIntervalMs(3000) // 批次的时间
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("000000")
                        .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                        .build()
        );

        sensorDS.addSink(jdbcSink);

        env.execute();
    }
}

4、运行代码,用客户端连接 MySQL,查看是否成功写入数据。

5)自定义 Sink 输出

如果我们想将数据存储到我们自己的存储设备中,而 Flink 并没有提供可以直接使用的连接器,就只能自定义 Sink 进行输出了。与 Source 类似,Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkDunction 抽象类,只要实现它,通过简单地调用 DataStream 的 .addSink() 方法就可以自定义写入任何外部存储。

stream.addSink(new MySinkFunction<String>());

在实现 SinkFunction 的时候,需要重写的一个关键方法 invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。

这种方式比较通用,对于任何外部存储系统都有效;不过自定义 Sink 想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器 Flink 官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1408618.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数论问题(算法村第十三关黄金挑战)

辗转相除法 8 和 12 的最大公因数是 4&#xff0c;记作 gcd(8,12)4。辗转相除法最重要的规则是&#xff1a; 若 mod 是 a b 的余数, 则gcd(a, b) gcd(b, mod)&#xff0c;直到a % b 0时&#xff0c;返回 b的值 gcd(546, 429) gcd(429, 117) gcd(117, 78) gcd(78, 39) …

Pytorch神经网络模型nn.Sequential与nn.Linear

1、定义模型 对于标准深度学习模型&#xff0c;我们可以使用框架的预定义好的层。这使我们只需关注使用哪些层来构造模型&#xff0c;而不必关注层的实现细节。 我们首先定义一个模型变量net&#xff0c;它是一个Sequential类的实例。 Sequential类将多个层串联在一起。 当给…

Redis:定时清理垃圾图片

首先理解清理垃圾文件的原理 在填写表单信息时上传图片后就已经存入云中&#xff0c;但是此时取消表单的填写这个图片就变成垃圾图片&#xff0c;所以在点击新建填写表单的方法/upload中&#xff0c;把上传的图片名字存入Redis的value中&#xff0c;key&#xff08;key1&#…

深入理解badblocks

文章目录 一、概述二、安装2.1、源码编译安装2.2、命令行安装2.3、安装确认 三、重要参数详解3.1、查询支持的参数3.2、参数说明 四、实例4.1、全面扫描4.2、破坏性写入并修复4.3、非破坏性写入测试 五、实现原理六、注意事项 团队博客: 汽车电子社区 一、概述 badblocks命令是…

NODE笔记 2 使用node操作飞书多维表格

前面简单介绍了node与简单的应用&#xff0c;本文通过结合飞书官方文档 使用node对飞书多维表格进行简单的操作&#xff08;获取token 查询多维表格recordid&#xff0c;删除多行数据&#xff0c;新增数据&#xff09; 文章目录 前言 前两篇文章对node做了简单的介绍&#xff…

【若依】关于对象查询list返回,进行业务处理以后的分页问题

1、查询对象Jglkq返回 list&#xff0c;对 list 进行业务处理后返回&#xff0c;但分页出现问题。 /*** 嫁功率考勤查询*/RequiresPermissions("hr:kq:list")PostMapping("/list")ResponseBodypublic TableDataInfo list(Jglkq jglkq) throws ParseExcepti…

骨传导耳机综评:透视南卡、韶音和墨觉三大品牌的性能与特点

在当前的蓝牙音频设备领域中&#xff0c;骨传导蓝牙运动耳机以其出色的安全特性和舒适的体验&#xff0c;受到了健身爱好者们的广泛好评。这类耳机不同于我们常见的入耳式耳机&#xff0c;它的工作方式是直接通过振动将声音传递到用户的耳骨中&#xff0c;这样既可以享受音乐&a…

鸿蒙系统的APP开发

鸿蒙系统&#xff08;HarmonyOS&#xff09;是由华为公司开发的一款分布式操作系统。它被设计用于在各种设备上实现无缝的、统一的用户体验&#xff0c;包括智能手机、平板电脑、智能电视、智能穿戴等设备。鸿蒙系统的核心理念是支持多终端协同工作&#xff0c;使应用能够更灵活…

unity学习笔记----游戏练习06

一、豌豆射手的子弹控制 创建脚本单独控制子弹的运动 用transform来控制移动 void Update() { transform.Translate(Vector3.right * speed * Time.deltaTime); } 创建一个控制子弹速度的方法&#xff0c;方便速度的控制 private void SetSpeed(float spee…

DS:顺序表的实现(超详细!!)

创作不易&#xff0c;友友们给个三连呗&#xff01; 本文为博主在DS学习阶段的第一篇博客&#xff0c;所以会介绍一下数据结构&#xff0c;并在最后学习对顺序表的实现&#xff0c;在友友们学习数据结构之前&#xff0c;一定要对三个部分的知识——指针、结构体、动态内存管理的…

springboot118共享汽车管理系统

简介 【毕设源码推荐 javaweb 项目】基于springbootvue 的共享汽车管理系统 适用于计算机类毕业设计&#xff0c;课程设计参考与学习用途。仅供学习参考&#xff0c; 不得用于商业或者非法用途&#xff0c;否则&#xff0c;一切后果请用户自负。 看运行截图看 第五章 第四章 获…

Java 集合List相关面试题

&#x1f4d5;作者简介&#xff1a; 过去日记&#xff0c;致力于Java、GoLang,Rust等多种编程语言&#xff0c;热爱技术&#xff0c;喜欢游戏的博主。 &#x1f4d7;本文收录于java面试题系列&#xff0c;大家有兴趣的可以看一看 &#x1f4d8;相关专栏Rust初阶教程、go语言基…

图形用户界面(GUI)开发教程

文章目录 写在前面MATLAB GUI启动方式按钮&#xff08;Push Button&#xff09;查看属性tag的命名方式回调函数小小的总结 下拉菜单&#xff08;Pop-up Menu&#xff09;单选框&#xff08;Radio Button&#xff09;和复选框&#xff08;Check Box&#xff09;静态文本&#xf…

将vue组件发布成npm包

文章目录 前言一、环境准备1.首先最基本的需要安装nodejs&#xff0c;版本推荐 v10 以上&#xff0c;因为需要安装vue-cli2.安装vue-cli 二、初始化项目1.构建项目2.开发组件/加入组件3. 修改配置文件 三、调试1、执行打包命令2、发布本地连接包3、测试项目 四、发布使用1、注册…

开源客户沟通平台Chatwoot账号激活问题

安装docker docker-compose 安装git clone https://github.com/chatwoot/chatwoot 下载之后根目录有一个docker-compose.production.yaml将其复制到一个目录 重命名 docker-compose.yaml 执行docker-compose up -d 构建 构建之后所有容器都安装好了 直接访问http://ip:3…

基于 Docker 部署 Pingvin Share 文件共享平台

一、Pingvin Share 介绍 Pingvin Share 简介 Pingvin Share 是自托管文件共享平台&#xff0c;是 WeTransfer 的替代方案。 Pingvin Share 特点 在 2 分钟内启动您的实例使用可通过链接访问的文件创建共享没有文件大小限制&#xff0c;只有你的磁盘是你的限制设置共享到期时间…

Mysql全局优化

Mysql全局优化总结 从上图可以看出SQL及索引的优化效果是最好的&#xff0c;而且成本最低&#xff0c;所以工作中我们要在这块花更多时间。 补充一点配置文件my.ini或my.cnf的全局参数&#xff1a; 假设服务器配置为&#xff1a; CPU&#xff1a;32核内存&#xff1a;64GDIS…

highcharts.css文件的样式覆盖了options的series里面的color问题解决

文章目录 一、问题背景二、解决问题 一、问题背景 原本的charts我们的每个数据是有对应的color显示的&#xff0c;如下图&#xff1a; 后面我们系统做了黑白模式&#xff0c;引入了highcharts的css文件&#xff0c;结果highcharts的css文件中class的颜色样式覆盖了我们数据中的…

vue打包后与本地测试样式不同问题,element-ui样式打包部署前后样式不同。

个别文件的样式中<style>未加scope。 查找到一些文件中修改了对应页面的elementUI的样式&#xff0c;但未加scope 给<style>加上scope&#xff0c;就好了。

shell脚本登录dlut-lingshui并设置开机连网和断网重连

本文提供了一个用于无图形界面linux系统自动连接dlut-lingshui校园网的shell脚本&#xff0c;并提供了设置开机联网以及断网重连的详细操作步骤。本文的操作在ubuntu 22.04系统上验证有效&#xff0c;在其他版本的linux系统上操作时遇到问题可以自行百度。 1. 获取校园网认证界…