大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等

news2025/1/15 6:39:45

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink Sink JDBC
  • Flink Sink Kafka

在这里插入图片描述

注意事项

DataSetAPI 和 DataStream API一样有三个部分组成,各部分的作用对应一致,此处不再赘述。

FlinkDataSet

在 Apache Flink 中,DataSet API 是 Flink 批处理的核心接口,它主要用于处理静态数据集。虽然 Flink 的 DataStream API 被广泛用于流式数据处理,但 DataSet API 适用于大规模批处理场景,如数据清洗、ETL、分析等。虽然近年来 Flink 更多地向流处理方向发展,但批处理仍然是数据处理中的一个重要场景。

DataSource

对DataSet批处理而言,较为频繁的操作是读取HDFS中的文件数据,因为这里主要介绍两个 DataSource 组件:

  • 基于集合:fromCollection 主要是为了方便测试
  • 基于文件:readTextFile,基于HDFS中的数据进行计算分析

基本概念

Flink 的 DataSet API 是一个功能强大的批处理 API,专为处理静态、离线数据集设计。DataSet 中的数据是有限的,处理时系统会先等待整个数据集加载完毕。DataSet 可以通过多种方式创建,例如从文件、数据库、集合等加载数据,然后通过一系列转换操作(如 map、filter、join 等)进行处理。

核心特性

  • 支持丰富的转换操作。
  • 提供多种输入输出数据源。
  • 支持复杂的数据类型,包括基本类型、元组、POJO、列表等。
  • 支持优化计划,例如通过 cost-based optimizer 来优化查询执行计划。

DataSet 创建

在 Flink 中,可以通过多种方式创建 DataSet。以下是常见的数据源:

从本地文件读取

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.readTextFile("path/to/file");

从 CSV 文件读取

DataSet<Tuple3<Integer, String, Double>> csvData = env.readCsvFile("path/to/file.csv")
    .types(Integer.class, String.class, Double.class);

从集合中创建

List<Tuple2<String, Integer>> data = Arrays.asList(
    new Tuple2<>("Alice", 1),
    new Tuple2<>("Bob", 2)
);
DataSet<Tuple2<String, Integer>> dataSet = env.fromCollection(data);

从数据库中读取

可以通过自定义的输入格式(如 JDBC 输入格式)从数据库中读取数据,虽然 Flink 本身并没有内置 JDBC 源的批处理 API,但可以通过自定义实现。

DataSet 的转换操作(Transformation)

Flink 的 DataSet API 提供了丰富的转换操作,可以对数据进行各种变换,以下是常用的转换操作:
在这里插入图片描述
在这里插入图片描述

Map

将 DataSet 中的每一条记录进行映射操作,生成新的 DataSet。

DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
DataSet<Integer> squaredNumbers = numbers.map(n -> n * n);

Filter

过滤掉不满足条件的记录。

DataSet<Integer> evenNumbers = numbers.filter(n -> n % 2 == 0);

FlatMap

类似于 map,但允许一条记录生成多条输出记录。

DataSet<String> lines = env.fromElements("hello world", "flink is great");
DataSet<String> words = lines.flatMap((line, collector) -> {
    for (String word : line.split(" ")) {
        collector.collect(word);
    }
});

Reduce

将数据集根据某种聚合逻辑进行合并

DataSet<Integer> sum = numbers.reduce((n1, n2) -> n1 + n2);

GroupBy 和 Reduce

对数据集进行分组,然后在每个组上执行聚合操作

DataSet<Tuple2<String, Integer>> wordCounts = words
    .map(word -> new Tuple2<>(word, 1))
    .groupBy(0)
    .reduce((t1, t2) -> new Tuple2<>(t1.f0, t1.f1 + t2.f1));

Join

类似于 SQL 中的连接操作,连接两个 DataSet。

DataSet<Tuple2<Integer, String>> persons = env.fromElements(
    new Tuple2<>(1, "Alice"),
    new Tuple2<>(2, "Bob")
);
DataSet<Tuple2<Integer, String>> cities = env.fromElements(
    new Tuple2<>(1, "Berlin"),
    new Tuple2<>(2, "Paris")
);
DataSet<Tuple2<String, String>> personWithCities = persons.join(cities)
    .where(0)
    .equalTo(0)
    .with((p, c) -> new Tuple2<>(p.f1, c.f1));

DataSet 输出

DataSet API 提供多种方式将数据写出到外部系统:

写入文件

wordCounts.writeAsCsv("output/wordcounts.csv", "\n", ",");

写入数据库

虽然 DataSet API 没有直接提供 JDBC Sink,可以通过自定义 Sink 实现写入数据库功能。

打印控制台

wordCounts.print();

批处理的优化

DataSet API 提供了优化机制,通过成本模型和执行计划的分析来优化任务执行。在 Flink 内部,编译器会根据任务定义的转换操作生成一个优化的执行计划,这个过程类似于 SQL 查询优化器的工作原理。

  • DataSet 的分区:Flink 可以根据数据集的分区进行优化。例如,通过 partitionByHash 或 partitionByRange 来手动控制数据的分布方式。
  • DataSet 的缓存:可以通过 rebalance()、hashPartition() 等方法来均衡数据负载,以提高并行度和计算效率。

DataSet API 的容错机制

Flink 的 DataSet API 提供了容错机制,支持在发生故障时重新执行失败的任务。虽然 DataSet API 没有像 DataStream 那样依赖于 Checkpoint 机制,但其批处理特性允许任务从头开始重新执行,确保数据处理的正确性。

DataSet 与 DataStream 的对比

DataSet API 与 DataStream API 之间有一些重要的区别:

请添加图片描述

DataSet API 的未来

需要注意的是,Flink 的官方路线图中已经不再优先开发 DataSet API 的新特性,未来的主要开发将集中在 DataStream API,甚至批处理功能都将通过 DataStream API 来实现。
因此,如果可能,建议新项目尽量使用 DataStream API 来替代 DataSet API。
特别是 Flink 的 Table API 和 SQL API 也适用于批处理和流处理,这些高层 API 提供了更简洁的语法和更强的优化能力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2104769.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JVM 垃圾回收机制:GC

目录 一、死亡对象的判断算法 1.1 引用计数算法 1.2 可达性分析算法 二、垃圾回收算法 2.1 标记-清除算法 2.2 复制算法 2.3 标记-整理算法 2.4 分代算法 三、垃圾收集器 3.1 CMS收集器&#xff08;老年代收集器&#xff0c;并发GC&#xff09; 3.2 G1收集器(唯一一…

项目实战 - 贪吃蛇

目录 1. 基本功能 2. 技术要点 3. 环境 4. 效果演示 5. 控制台设置 6. Win32 API介绍 6.1 Win32 API 6.2 程序台控制(Console) 6.3 控制台屏幕上的坐标(COORD) 6.4 GetStdHandle 6.5 GetConsoleCursorInfo 6.5.1 CONSOLE_CURSOR_INFO 6.6 SetConsoleCursorInfo 6…

云原生架构概念

云原生架构概念 云原生架构&#xff08;Cloud Native Architechtrue&#xff09;作为一种现代软件开发的革新力量&#xff0c;正在逐渐改变企业构建、部署和管理应用程序的方式。它的核心优势在于支持微服务架构&#xff0c;使得应用程序能够分解为独立、松耦合的服务&#xf…

详解si5338 si53xx 设计使用及STM32 iic驱动设计

背景 在实际项目中经常使用si5338 si53xx&#xff0c;进行多路时钟的倍频以生成想要的时钟信号&#xff0c;但是针对si5338 si53xx设计使用缺少相关的资料&#xff0c;本文详解si5338 si53xx 设计使用及STM32 iic驱动设计&#xff0c;本文使用工程在项目中得到测试&#xff0c…

基于人体关节夹角的人体动作识别算法(代码+数据集)

为此本文提出了一个基于人体关节夹角的人体动作识别算法&#xff0c;主要做了以下工作&#xff1a; &#xff08;1&#xff09;提出了一个可解释性强&#xff0c;耗费算力较少且鲁棒性较高的基于人体关节夹角的人体动作序列的特征抽取方法。 &#xff08;2&#xff09;本文所使…

PyInstaller实战:打包Python应用并间接指定输出文件名

在深入探讨如何使用PyInstaller打包Python应用并指定输出文件名称的过程中&#xff0c;我们不仅可以了解基本的命令行操作和参数设置&#xff0c;还可以深入了解PyInstaller的工作机制、状态变化以及它在处理复杂Python项目时的优势。下面&#xff0c;我们将详细展开这一过程&a…

提升多跳问答中的语言模型知识编辑能力

人工智能咨询培训老师叶梓 转载标明出处 大模型在静态知识库的更新上存在局限&#xff0c;特别是在面对需要多步骤推理的多跳问题时&#xff0c;难以提供准确和最新的回答。为了解决这一问题&#xff0c;来自美国佐治亚大学、纽约大学、莱斯大学、北卡罗来纳州立大学等机构的研…

STM32F103C8----GPIO(跟着江科大学STM32)

一&#xff0c;GPIO简介 GPIO&#xff08;General Purpose Input Output&#xff09;通用输入输出口 可配置为8种输入输出模式 引脚电平&#xff1a;0V~3.3V&#xff08;0V&#xff09;&#xff0c;部分引脚可容忍5V 输出模式下可控制端口输出高低电平&#xff0c;用以驱动…

idea2021安装教程与常见配置(可激活至2099年)

idea2021安装教程与常见配置(可激活至2099年) 下载 官网下载地址:https://www.jetbrains.com/zh-cn/idea/download/other.html 这里我们选择压缩包安装方式,选择2021.3 - Windows x64 ZIP Archive (zip),也可以选择exe安装方式 安装 解压缩安装方式 创建非中文目录D:\idea…

Win32绕过UAC弹窗获取管理员权限

在早些年写一些桌面软件时&#xff0c;需要管理员权限&#xff0c;但是又不想UAC弹窗&#xff0c;所以一般是直接将UAC的级别拉到最低&#xff0c;或者直接禁用UAC的相关功能。 什么是UAC(User Account Control) 用户帐户控制 (UAC) 是一项 Windows 安全功能&#xff0c;旨在保…

行走挖机多路比例阀控制放大器

挖掘机比例多路阀是挖掘机液压系统中的关键部件&#xff0c;它负责控制挖掘机各执行元件的运动方向、速度和力矩&#xff0c;从而影响挖掘机的作业效果。比例多路阀由多个阀块组成&#xff0c;其中比例控制阀由BEUEC比例放大器控制。每个阀块都有特定功能&#xff0c;如换向阀用…

昇腾大模型性能分析思路

性能分析 模型训练优化流程 我们根据性能问题的场景&#xff0c;按照单机和集群场景进行分类&#xff0c;再明确性能问题属于哪一类&#xff0c;明确好性能问题背景之后&#xff0c;才方便进行下一步问题的定位&#xff1b; 在明确问题背景后&#xff0c;参考性能分析工具介绍…

004、架构_详解(重点)

GoldenDB 分布式数据库框架 DN和RDB增加了备节点;引入新模块CM,且GTM、MDS、PM、CM都增加备节点;MDS、PM、CM、RDB被统一在了管理节点之中;GTM和MDS间多了一条连线,因为GTM的切换由MDS把控;初步系统架构mysqld:一般称为DB节点,负责单个节点的数据处理; dbproxy:一般…

FreeRTOS学习笔记—③RTOS内存管理篇(正在更新中)

二、RTOS的核心功能 RTOS的核心功能块主要分为任务管理、内核管理、时间管理以及通信管理4部分&#xff0c;框架图如下所示&#xff1a; &#xff08;1&#xff09;任务管理&#xff1a;负责管理和调度任务的执行&#xff0c;确保系统中的任务能够按照预期运行。 &#xff08;…

【SpringBoot】使用Nacos服务注册发现与配置管理

前提&#xff1a;需要提前部署好nacos服务&#xff0c;这里可以参考我的文章&#xff1a;Windows下Nacos安装与配置 0. 版本信息 Spring Boot3.2.8Spring Cloud2023.0.1Spring Cloud alibaba2023.0.1.0nacos2.3.2本地安装的nacos2.3.0 Spring Boot、Spring Cloud、Spring Clo…

黑盒闪清 v2.9.9 体积小巧,简洁高效的手机清理神器

黑盒闪清APP是安卓手机上的一款优质文件管理器&#xff0c;拥有存储分析、文件分类、大文件扫描、空文件夹扫描等功能&#xff0c;应用无广告、无推送&#xff0c;完全免费使用&#xff0c;让你手机中的文件管理就跟在电脑上管理一样简单。 链接&#xff1a;https://pan.quark…

C语言学习笔记 Day16(文件管理--下)

Day16 内容梳理&#xff1a; C语言学习笔记 Day14&#xff08;文件管理--上&#xff09;-CSDN博客 C语言学习笔记 Day15&#xff08;文件管理--中&#xff09;-CSDN博客 目录 Chapter 10 文件操作 10.5 文件状态 10.6 文件的随机读写 fseek()、rewind() &#xff08;1&…

对同一文件夹下所有excel表进行相同操作(数据填充、删除、合并)

背景引入&#xff1a;如图所示&#xff0c;笔者需要对数十个表格的银行日记账工作簿合并成一个工作簿&#xff0c;以便与本月银行流水进行核对。 为了方便银行日记账与银行流水进行核对&#xff0c;需要再每个村或小组的表格中&#xff0c;将村或小组的名称放在J列。 clear c…

Java | Leetcode Java题解之第392题判断子序列

题目&#xff1a; 题解&#xff1a; class Solution {public boolean isSubsequence(String s, String t) {int n s.length(), m t.length();int[][] f new int[m 1][26];for (int i 0; i < 26; i) {f[m][i] m;}for (int i m - 1; i > 0; i--) {for (int j 0; j…

9月4日C++作业

#include <iostream> #include <string> using namespace std; class Human {private:string name;int age;public:Human(){} //无参构造函数//有参构造函数Human(string i_name,int i_age):name(i_name),age(i_age){cout<<"调用了Human有参构…