【Flink】DataStream API使用之输出算子(Sink)

news2025/1/22 19:55:24

输出算子(Sink)

在这里插入图片描述
Flink作为数据处理框架,最终还是需要把计算处理的结果写入到外部存储,为外部应用提供支持。Flink提供了很多方式输出到外部系统。

1. 连接外部系统

Flink中我们可以在各种Fuction中处理输出到外部系统,但是Flink作为一个快速的分布式实时流处理系统,对稳定性和容错性要求极高。一旦出现故障,我们应该有能力恢复之前的状态,保障处理结果的正确性。这种性质一般被称为"状态一致性"。Flink内部提供了一致性检查点checkpoint来保障我们可以回滚到正确的状态,但是我们在处理过程中任意读写外部系统,发生故障后就很难回退到从前了。

所以FlinkDataStream API提供了专门向外部写入数据的方法,通过addSink实现,与addSource类似,addSink方法对应着一个Sink算子,主要就是来实现与外部系统连接,并将数据提交写入;Flink程序中所有对外的输出操作一般都是利用Sink算子完成的。比如我们经常使用的print 方法返回的就是一个 DataStreamSink

Sink算子的创建主要是通过DataSream的.addSink()实现的,并且需要重写default void invoke(IN value, Context context) throws Exception 方法

Flink提供的连接器,这个是1.17版本的,比1.13版本的多很多
在这里插入图片描述
除了官方的,Flink也可以使用Apache Bahir的扩展连接器

在这里插入图片描述

2. 输出到文件

输出文件Flink有writeAsText()writeAsCsv()可以直接输出到文件,但是这种不支持同时写一份文件,必须设定为并行度1,所以Flink又提供了一个专门的流式文件系统的连接器StreamingFileSink

SreamingFileSink继承自抽象类RichSinkFunction,而且集成Flink检查点机制(checkpoint)用来保证精确一次的一致性语义StreamingFileSInk主要操作是将数据写入桶,每个桶中的数据都可以分割成一个个大小有限的分区文件,并且也可以通过各种配置来控制分桶的操作;默认的分桶方式是基于时间的。

StreamingFileSink(Row-encoded)支持行编码批量编码(Bulk-encoded,比如 Parquet)格式,这两种不同的方式都有各自的构建器:

  • 行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)
  • 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)

代码实例:

public static void main(String[] args) throws Exception {
        // 1. 直接调用getExecutionEnvironment 方法,底层源码可以自由判断是本地执行环境还是集群的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        // 2. 从集合中读取数据
        ArrayList<Event> list = new ArrayList<>();
        list.add(new Event("ming","www.baidu1.com",1200L));
        list.add(new Event("xiaohu","www.baidu2.com",1200L));
        list.add(new Event("xiaohu","www.baidu5.com",1267L));
        list.add(new Event("gala","www.baidu6.com",1200L));
        list.add(new Event("ming","www.baidu7.com",4200L));
        list.add(new Event("xiaohu","www.baidu8.com",5500L));
        // 3. 读取数据
        DataStreamSource<Event> eventDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
        // 4. 构建File Sink
        StreamingFileSink<String> streamingFileSink = StreamingFileSink.<String>forRowFormat(new Path("./out"), new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .build()
                ).build();
        eventDataStreamSource.map(Event::toString).addSink(streamingFileSink);
        // 5. 执行程序
        env.execute();
    }

这里设置了并行度是2,所以是两个桶文件。通过.withRollingPolicy()方法指定滚动策略,策略配置说明:

  • withInactivityInterval : 最近 5 分钟没有收到新的数据
  • withRolloverInterval : 至少包含 15 分钟的数据
  • withMaxPartSize : 文件大小已达到 1 GB
    在这里插入图片描述

3. 输出到Kafka

Flink 与 Kafka 的连接器提供了端到端的精确一次(exactly once)语义保证,这在实际项目中是最高级别的一致性保证。
代码实例:

public static void main(String[] args) throws Exception {
        // 1. 直接调用getExecutionEnvironment 方法,底层源码可以自由判断是本地执行环境还是集群的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. 设置属性
        Properties properties = new Properties();
        properties.put("bootstrap.servers","hadoop102:9092");
        // 3. 读取数据
        DataStreamSource<String> stringDataStreamSource = env.readTextFile("input/clicks.csv");
        // 4. 构建File Sink
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("clicks", new SimpleStringSchema(), properties);
        // 5. addSink
        stringDataStreamSource.addSink(kafkaProducer);
        // 6. 执行程序
        env.execute();
    }

addSink 传入的参数是一个 FlinkKafkaProducerFlinkKafkaProducer继承了抽象类TwoPhaseCommitSinkFunction,这是一个实现了两阶段提交的RichSinkFuction,两阶段提交提供了FlinkKafka写入数据的事务性保证,能够真正做到精确一次的状态一致性

4. 自定义Sink输出

如果Flink提供的Sink不满足自己的要求,也可以通过自定义Sink来满足自己的要求,通过Flink提供的SinkFuction接口和对应的RichSinkFuction抽象类重写invoke()就可以自定义Sink

这里以Hbase为例,使用RichSinkFuction,创建Hbase的连接以及关闭Hbase的连接分别放到openclose方法中。

代码实例:

public static void main(String[] args) throws Exception {
        // 1. 直接调用getExecutionEnvironment 方法,底层源码可以自由判断是本地执行环境还是集群的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. 设置属性
        Properties properties = new Properties();
        properties.put("bootstrap.servers","hadoop102:9092");
        // 3. 读取数据
        DataStreamSource<String> stringDataStreamSource = env.readTextFile("input/clicks.csv");
        // 4. 构建File Sink
        stringDataStreamSource.addSink(new RichSinkFunction<String>() {

            public Configuration configuration; // 管理 Hbase 的配置信息,这里因为 Configuration 的重名问题,将类以完整路径
            public Connection connection; // 管理 Hbase 连接

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                configuration = HBaseConfiguration.create();
                configuration.set("hbase.zookeeper.quorum", "hadoop102:2181");
                connection = ConnectionFactory.createConnection(configuration);

            }

            @Override
            public void close() throws Exception {
                super.close();
                connection.close(); // 关闭连接

            }

            @Override
            public void invoke(String value, Context context) throws Exception {
                Table table = connection.getTable(TableName.valueOf("test")); // 表名为 test
                Put put = new Put("rowkey".getBytes(StandardCharsets.UTF_8)); // 指定 rowkey
                put.addColumn("info".getBytes(StandardCharsets.UTF_8) // 指定列名
                        , value.getBytes(StandardCharsets.UTF_8) // 写入的数据
                        , "1".getBytes(StandardCharsets.UTF_8)); // 写入的数据
                table.put(put); // 执行 put 操作
                table.close(); // 将表关闭
            }
        });
        // 6. 执行程序
        env.execute();
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/617412.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C#读写参数到APP.Config

C#读写参数到APP.Config 介绍程序Demo常见错误 介绍 系统在开发时&#xff0c;可能需要设置默认参数&#xff0c;比如数据库的链接参数&#xff0c;某个参数的默认数据等等。对于这些数据&#xff0c;可直接在app.config中读取。 在读写时&#xff0c;需要先了解configuratio…

echo命令在Unix中的作用以及其常见用法

在Unix系统中&#xff0c;"echo"是一个常用的命令&#xff0c;用于在终端或脚本中输出文本。它可以将指定的字符串或变量的值打印到标准输出&#xff0c;从而向用户提供信息或进行调试。 本文将详细介绍"echo"命令在Unix中的作用以及其常见用法。 基本语法…

Keras-3-实例1-二分类问题

1. 二分类问题 1.1 IMDB 数据集加载 IMDB 包含5w条严重两极分化的评论&#xff0c;数据集被分为 2.5w 训练数据 和 2.5w 测试数据&#xff0c;训练集和测试集中的正面和负面评论占比都是50% from keras.datasets import imdb(train_data, train_labels), (test_data, test_l…

UE5 Chaos破碎系统学习1

在UE5中&#xff0c;Chaos破碎系统被直接进行了整合&#xff0c;本篇文章就来讲讲chaos的基础使用。 1.基础破碎 1.首先选中需要进行破碎的模型&#xff0c;例如这里选择一个Box&#xff0c;然后切换至Fracture Mode&#xff08;破碎模式&#xff09;&#xff1a; 2.点击右侧…

JAVA实现打字练习软件

转眼已经学了一学期的java了&#xff0c;老师让我们根据所学知识点写一个打字练习软件的综合练习。一开始我也不是很有思路&#xff0c;我找了一下发现csdn上关于这个小项目的代码也不算很多&#xff0c;所以我最后自己在csdn查了一些资料&#xff0c;写了这么一个简略版本的打…

【C++】——list的介绍及模拟实现

文章目录 1. 前言2. list的介绍3. list的常用接口3.1 list的构造函数3.2 iterator的使用3.3 list的空间管理3.4 list的结点访问3.5 list的增删查改 4. list迭代器失效的问题5. list模拟实现6. list与vector的对比7. 结尾 1. 前言 我们之前已经学习了string和vector&#xff0c…

Remix IDE已支持Sui Move在线开发

网页版Remix IDE与WELLDONE Code插件结合&#xff0c;让您无需本地设置或安装即可开始构建Sui应用程序。 不熟悉Sui的构建者可能想在正式配置开发环境之前&#xff0c;浅尝一下构建Sui应用程序。Remix IDE与WELLDONE Code插件组合&#xff0c;即可帮助构建者实现从浏览器窗口开…

JavaScript函数的增强知识

函数属性和arguments以及剩余参数 函数属性name与length ◼ 我们知道JavaScript中函数也是一个对象&#xff0c;那么对象中就可以有属性和方法。 ◼ 属性name&#xff1a;一个函数的名词我们可以通过name来访问&#xff1b; // 自定义属性foo.message "Hello Foo"…

Nginx 之 Tomcat 负载均衡、动静分离

一.详细安装及操作实例&#xff08;Nginx 七层代理&#xff09; 首先至少准备三台服务器 Nginx 服务器&#xff1a;192.168.247.131:80 Tomcat服务器1&#xff1a;192.168.247.133:80 Tomcat服务器2&#xff1a;192.168.247.134:8080 192.168.247.134:80811.部署Nginx 负载均…

微信自动回复怎么设置呢?

友友们 你们是否有以下这些烦恼 1、每天要手动点击“添加”按钮多次以通过大量好友? 2、你是否经常需要在多个微信帐号之间来回切换&#xff1f; 3、你的回复速度慢&#xff0c;导致客户流失率高&#xff1f; 4、为了及时回复&#xff0c;你总是需要带着多部手机出门&…

二十一、C++11(中)

文章目录 一、左值&右值&#xff08;一&#xff09;基本概念1.左值是什么2.右值是什么 &#xff08;二&#xff09;左值引用和右值引用1.左值引用2.右值引用 二、右值引用使用场景和意义&#xff08;一&#xff09;引入&#xff08;二&#xff09;左值引用的使用场景&#…

Linux编译器(gcc/g++)调试器gdb项目自动化构建工具(make/Makefile)版本管理git

Linux编译器-gcc/g&&调试器gdb&&项目自动化构建工具-make/Makefile&&版本管理git &#x1f506;gcc/g的使用可执行文件的"生产"过程gcc如何完成预处理编译汇编链接 函数库函数库一般分为静态库和动态库两种静态C/C库的安装 gcc选项gcc选项记…

WPF 学习:如何照着MaterialDesign的Demo学习

文章目录 往期回顾对应视频资源如何照着wpf项目学习找到你想要抄的页面查找对应源码演示示例如何认清页面元素抄袭实战 项目地址总结 往期回顾 WPF Debug运行是 实时可视化树无效&#xff0c;无法查看代码 WPF MaterialDesign 初学项目实战&#xff08;0&#xff09;:github …

【Java】线程池的概念及使用、ThreadPoolExecutor的构造方法

什么是线程池为什么用线程池JDK提供的线程池工厂模式如何使用 自定义线程池ThreadPoolExecutor类的构造方法工作原理拒绝策略 线程池的使用 什么是线程池 在之前JDBC编程中&#xff0c;通过DataSource获取Connection的时候就已经用到了池的概念。这里的池指的是数据库连接池。…

Vue电商项目--uuid游客身份获取购物车数据

uuid游客身份获取购物车数据 获取购物车列表 请求地址 /api/cart/cartList 请求方式 GET 参数类型 参数名称 类型 是否必选 描述 无 无 无 无 返回示例 成功&#xff1a; { "code": 200, "message": "成功", "…

马尔萨斯 ( Malthus)人口指数增长模型Logistic 模型

3.要求与任务 从 1790 — 1990 年间美国每隔 10 年的人口记录如下表所示&#xff1a; 用以上数据检验马尔萨斯 ( Malthus)人口指数增长模型&#xff0c;根据检验结果进一步讨论马尔萨斯 人口模型的改进&#xff0c;并利用至少两种模型来预测美国2010 年的人口数量。 提示 1 &…

自学黑客(网络安全),一般人我还是劝你算了吧

作为从16年接触网络安全的小白&#xff0c;谈谈零基础如何入门网络安全&#xff0c;有不对的地方&#xff0c;请多多指教。 这些年最后悔的事情莫过于没有把自己学习的东西积累下来形成一个知识体系。 后续我也会陆续的整理网络安全的相关学习资料及文章&#xff0c;与大家一…

数据结构与算法练习(三)二叉树

文章目录 1、树2、二叉树3、满二叉树4、完全二叉树5、二叉树的遍历&#xff08;前序、中序、后序&#xff09;二叉树删除节点或树 6、顺序存储二叉树顺序存储二叉树遍历&#xff08;前序、中序、后序&#xff09; 7、线索化二叉树中序线索二叉树前序线索二叉树后序线索二叉树 1…

Matlab 之 Curve Fitting APP 使用笔记

文章目录 Part.I IntroductionPart.II 使用笔记Chap.I 拟合函数Chap.II 注意事项 Part.I Introduction 曲线或曲面拟合获取拟合参数。本篇博文主要记录一下 Matlab 拟合 APP Curve Fitting 的使用方法。 Part.II 使用笔记 这个APP用来做拟合的&#xff0c;包括二维数据的线拟…

常见的样本统计量及其数字特征

常见的样本统计量及其数字特征 下图来自《统计学图鉴》 样本统计量有什么作用&#xff1f; 因为总体特征包含有总体均值、总体方差等特征&#xff0c;我们在用样本推断总体时&#xff0c;其实就是用样本特征去估计总体特征&#xff0c;例如&#xff1a;样本均值这个统计量的期…