[flink 实时流基础] 输出算子(Sink)

news2025/1/6 20:25:38

学习笔记
Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。
image.png


文章目录

      • **连接到外部系统**
      • **输出到文件**
      • 输出到 Kafka
      • 输出到 mysql
      • 自定义 sink

连接到外部系统

Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。
Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));
addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。
Flink1.12开始,同样重构了Sink架构,
stream.sinkTo(…)
当然,Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:

https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/connectors/datastream/overview/
image.png

我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储系统,则只提供了输出写入的sink连接器。
除Flink官方之外,Apache Bahir框架,也实现了一些其他第三方系统与Flink的连接器。
image.png
除此以外,就需要用户自定义实现sink连接器了。

输出到文件

Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。
FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:

  • 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
  • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。
public class SinkFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 每个目录中,都有 并行度个数的 文件在写入
        env.setParallelism(2);
        
        // 必须开启checkpoint,否则一直都是 .inprogress
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);


        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(1000),
                Types.STRING
        );

        DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");

        // 输出到文件系统
        FileSink<String> fieSink = FileSink
                // 输出行式存储的文件,指定路径、指定编码
                .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
                // 输出文件的一些配置: 文件名的前缀、后缀
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("atguigu-")
                                .withPartSuffix(".log")
                                .build()
                )
                // 按照目录分桶:如下,就是每个小时一个目录
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                // 文件滚动策略:  1分钟 或 1m
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(1))
                                .withMaxPartSize(new MemorySize(1024*1024))
                                .build()
                )
                .build();
        dataGen.sinkTo(fieSink);

        env.execute();
    }
}

输出到 Kafka

(1)添加Kafka 连接器依赖
由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。
(2)启动Kafka集群
(3)编写输出到Kafka的示例代码

public class SinkKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 如果是精准一次,必须开启checkpoint(后续章节介绍)
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);


        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("hadoop102", 7777);

        /**
         * Kafka Sink:
         * TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可
         * 1、开启checkpoint(后续介绍)
         * 2、设置事务前缀
         * 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // 指定 kafka 的地址和端口
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                // 指定序列化器:指定Topic名称、具体的序列化
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("ws")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // 写到kafka的一致性级别: 精准一次、至少一次
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 如果是精准一次,必须设置 事务的前缀
                .setTransactionalIdPrefix("atguigu-")
                // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
                .build();


        sensorDS.sinkTo(kafkaSink);


        env.execute();
    }
}

自定义序列化器,实现带key的record:

public class SinkKafkaWithKey {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());


        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("hadoop102", 7777);


        /**
         * 如果要指定写入kafka的key,可以自定义序列化器:
         * 1、实现 一个接口,重写 序列化 方法
         * 2、指定key,转成 字节数组
         * 3、指定value,转成 字节数组
         * 4、返回一个 ProducerRecord对象,把key、value放进去
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,
        hadoop104:9092")
                .setRecordSerializer(
                        new KafkaRecordSerializationSchema<String>() {

                            @Nullable
                            @Override
                            public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                                String[] datas = element.split(",");
                                byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                                byte[] value = element.getBytes(StandardCharsets.UTF_8);
                                return new ProducerRecord<>("ws", key, value);
                            }
                        }
                )
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("atguigu-")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();


        sensorDS.sinkTo(kafkaSink);


        env.execute();
    }
}

输出到 mysql

写入数据的MySQL的测试步骤如下。
(1)添加依赖
添加MySQL驱动:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>

官方还未提供flink-connector-jdbc的1.17.0的正式依赖,暂时从apache snapshot仓库下载,pom文件中指定仓库路径:

<repositories>
    <repository>
        <id>apache-snapshots</id>
        <name>apache snapshots</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
    </repository>
</repositories>

添加依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>1.17-SNAPSHOT</version>
</dependency>

如果不生效,还需要修改本地maven的配置文件,mirrorOf中添加如下标红内容:

<mirror>
    <id>aliyunmaven</id>
    <mirrorOf>*,!apache-snapshots</mirrorOf>
    <name>阿里云公共仓库</name>
    <url>https://maven.aliyun.com/repository/public</url>
</mirror>

(2)启动MySQL,在test库下建表ws

mysql>
CREATE TABLE ws (
id varchar(100) NOT NULL,
ts bigint(20) DEFAULT NULL,
vc int(11) DEFAULT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

(3)编写输出到MySQL的示例代码

public class SinkMySQL {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("hadoop102", 7777)
.map(new WaterSensorMapFunction());


/**
     * TODO 写入mysql
     * 1、只能用老的sink写法: addsink
     * 2、JDBCSink的4个参数:
     *    第一个参数: 执行的sql,一般就是 insert into
     *    第二个参数: 预编译sql, 对占位符填充值
     *    第三个参数: 执行选项 ---》 攒批、重试
     *    第四个参数: 连接选项 ---》 url、用户名、密码
     */
SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
    "insert into ws values(?,?,?)",
    new JdbcStatementBuilder<WaterSensor>() {
        @Override
        public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
            //每收到一条WaterSensor,如何去填充占位符
            preparedStatement.setString(1, waterSensor.getId());
            preparedStatement.setLong(2, waterSensor.getTs());
            preparedStatement.setInt(3, waterSensor.getVc());
        }
    },
    JdbcExecutionOptions.builder()
    .withMaxRetries(3) // 重试次数
    .withBatchSize(100) // 批次的大小:条数
    .withBatchIntervalMs(3000) // 批次的时间
    .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
    .withUsername("root")
    .withPassword("000000")
    .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
    .build()
);


sensorDS.addSink(jdbcSink);


env.execute();
}
}

(4)运行代码,用客户端连接MySQL,查看是否成功写入数据。

自定义 sink

如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。
stream.addSink(new MySinkFunction());
在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1561714.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

8_springboot_shiro_jwt_多端认证鉴权_多Reaml管理

1. 目标 前面一直讨论的是只有一个Reaml的场景&#xff0c;Shiro是可以管理多个Realm的。那么什么场景下&#xff0c;我们需要定义多个Realm&#xff0c;以及Shiro框架是如何管理多个Realm的&#xff0c;他们是如何工作的。本章将会解释上面的问题&#xff0c;最后会配置前面章…

MySQL中MHA故障排查

文章目录 MySQL故障排查MySQL主从环境常见故障1、故障一1.1 故障现象1.2 报错原因1.3 解决方法 2、故障二2.1 故障现象2.2 报错原因2.3 解决方法 3、故障三3.1 故障现象3.2 报错原因3.3 解决方法 4、故障四4.1 故障现象4.2 问题分析4.3 解决方法 5、故障五5.1 故障现象5.2 报错…

neo4j使用详解(六、cypher即时时间函数语法——最全参考)

Neo4j系列导航&#xff1a; neo4j及简单实践 cypher语法基础 cypher插入语法 cypher插入语法 cypher查询语法 cypher通用语法 cypher函数语法 6.时间函数-即时类型 表示具体的时刻的时间类型函数 6.1.date函数 年-月-日时间函数&#xff1a; yyyy-mm-dd 6.1.1.获取date da…

深度学习500问——Chapter05: 卷积神经网络(CNN)(3)

文章目录 5.14 理解转置卷积与棋盘效应 5.14.1 标准卷积 5.14.2 转置卷积 5.15 卷积神经网络的参数设置 5.16 提高卷积神经网络的泛化能力 5.17 卷积神经网络在不同领域的应用 5.17 .1 联系 5.17.2 区别 5.14 理解转置卷积与棋盘效应 5.14.1 标准卷积 在理解转置卷积之前&…

从学习海底捞到学习巴奴,中国餐饮带洋快餐重归“产品主义”

俗话说“民以食为天”&#xff0c;吃饭一向是国人的头等大事&#xff0c;餐饮业也是经济的强劲助推力。新世纪以来&#xff0c;餐饮业不断讲述着热辣滚烫的商业故事。 2006年&#xff0c;拥有“必胜客”、“肯德基”等品牌的餐饮巨头百胜集团&#xff0c;组织两百多名区域经理…

代码随想录算法训练营第24天|理论基础 |77. 组合

理论基础 jia其实在讲解二叉树的时候&#xff0c;就给大家介绍过回溯&#xff0c;这次正式开启回溯算法&#xff0c;大家可以先看视频&#xff0c;对回溯算法有一个整体的了解。 题目链接/文章讲解&#xff1a;代码随想录 视频讲解&#xff1a;带你学透回溯算法&#xff08;理…

Windows安装禅道系统结合Cpolar实现公网访问内网BUG管理服务

文章目录 前言1. 本地安装配置BUG管理系统2. 内网穿透2.1 安装cpolar内网穿透2.2 创建隧道映射本地服务3. 测试公网远程访问4. 配置固定二级子域名4.1 保留一个二级子域名5.1 配置二级子域名6. 使用固定二级子域名远程 前言 BUG管理软件,作为软件测试工程师的必备工具之一。在…

竞技之道-打造成功竞技游戏的实战指南【文末送书】

文章目录 理解竞技游戏的本质游戏力&#xff1a;竞技游戏设计实战教程【文末送书】 在当今数字化时代&#xff0c;游戏已经不再是一种单纯的娱乐方式&#xff0c;而是成为了一门具有巨大商业潜力的产业。特别是竞技游戏&#xff0c;它们引领着全球数十亿玩家的潮流&#xff0c;…

引用,内联函数,auto函数,指针nullptr

一&#xff1a;引用 1.1 该文章的引用是对上一篇引用的进行补充和完善 按理来说&#xff0c;double可以隐式转换为int&#xff0c;那起别名的时候为什么不可以类型转换呢&#xff1f; 那是因为&#xff0c;在类型转换的时候&#xff0c;会创建一个临时变量&#xff0c;让后再…

基于kalman的单目标追踪,以及demo测试(Python and C++)

一.卡尔曼滤波简单介绍 我们可以在任何含有不确定信息的动态系统中的使用卡尔曼滤波&#xff0c;对系统的下一步动作做出有根据的猜测。猜测的依据是预测值和观测值&#xff0c;首先我们认为预测值和观测值都符合高斯分布且包含误差&#xff0c;然后我们预设预测值的误差Q和观测…

OMNet项目1 —— Linux环境配置

项目环境搭建&#xff0c;软件版本Ubuntu16&#xff0c;OMNet5.0 Linux配置环境步骤 安装VMWare虚拟机16.25&#xff08;个人号养老版本&#xff09;下载ISO镜像文件Ubuntu16 链接&#xff1a;https://pan.baidu.com/s/1SETyn6t4qIUfli1uRRgm3w?pwdf4ua 提取码&#xff1a;f…

软件设计师25--逻辑结构设计

软件设计师25--逻辑结构设计 考点1&#xff1a;关系模式相关概念数据模型关系模型相关概念完整性约束 考点2&#xff1a;E-R图转换关系模式逻辑结构设计 - E-R模型转关系模式E - R图转关系模式 考点1&#xff1a;关系模式相关概念 数据模型 层次模型网状模型关系模型面向对象…

第四百三十六回

文章目录 1. 概念介绍2. 思路与方法2.1 实现思路2.2 实现方法 3. 示例代码4. 内容总结 我们在上一章回中介绍了"不同平台上换行的问题"相关的内容&#xff0c;本章回中将介绍如何在页面上显示蒙板层.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1. 概念介绍 我…

ElMessageBox.confirm中内容换行

ElMessageBox.confirm(导入结果&#xff1a;<br/>成功导入${res.successCount}条数据&#xff0c;导入失败${res.errorList.length}条数据。<br/>${str},"提示",{confirmButtonText: "确定",cancelButtonText: "取消",type: "w…

云原生应用(5)之Dockerfile精讲及新型容器镜像构建技术

一、容器与容器镜像之间的关系 说到Docker管理的容器不得不说容器镜像&#xff0c;主要因为容器镜像是容器模板&#xff0c;通过容器镜像我们才能快速创建容器。 如下图所示&#xff1a; Docker Daemon通过容器镜像创建容器。 二、容器镜像分类 操作系统类 CentOS Ubuntu 在…

38.HarmonyOS鸿蒙系统 App(ArkUI)堆叠布局结合弹性布局

层叠布局用于在屏幕上预留一块区域来显示组件中的元素&#xff0c;提供元素可以重叠的布局。层叠布局通过Stack容器组件实现位置的固定定位与层叠&#xff0c;容器中的子元素&#xff08;子组件&#xff09;依次入栈&#xff0c;后一个子元素覆盖前一个子元素&#xff0c;子元素…

Spring源码分析(BeanDefinition)

文章目录 Spring源码分析&#xff08;BeanDefinition&#xff09;一、概述1、BeanDefinition 的理解2、BeanDefinition 接口3、BeanDefinition 的实现4、BeanDefinitionHolder 类 二、BeanDefinition 的加载1、reader 的获取1&#xff09;registerAnnotationConfigProcessors2&…

浅谈高阶智能驾驶-NOA领航辅助的技术与发展

浅谈高阶智能驾驶-NOA领航辅助的技术与发展 附赠自动驾驶学习资料和量产经验&#xff1a;链接 2019年在国内首次试驾特斯拉NOA领航辅助驾驶的时候&#xff0c;当时兴奋的觉得未来已来;2020年在试驾蔚来NOP领航辅助驾驶的时候&#xff0c;顿时不敢小看国内新势力了;现在如果哪家…

第十八章 算法

一、介绍 1.1 什么是算法 算法&#xff08;Algorithm&#xff09;是指解题方案的准确而完整的描述&#xff0c;是一系列解决问题的清晰指令&#xff0c;算法代表着用系统的方法描述解决问题的策略机制。也就是说&#xff0c;能够对一定规范的输入&#xff0c;在有限时间内获…

kubernetes之实战进阶篇

目录 一、搭建kubenetes集群 1.1、搭建方案选择 1.2、软硬件准备 1.2.1、硬件要求: 1.2.2、软件要求 1.3、安装步骤 1.3.1、初始化操作(三个节点都要执行一遍) 1.3.2、部署kubernetes master节点(控制面板) 1.3.3、node节点加入k8s集群 1.3.4、部署CNI网络插件 1.3.…