Flink之DataStream API开发Flink程序过程与Flink常见数据类型

news2024/11/26 18:26:43

开发Flink程序过程与Flink常见数据类型

  • DataStream API
    • Flink三层API
    • DataStream API概述
  • 开发Flink程序过程
    • 添加依赖
    • 创建执行环境
    • 执行模式
    • 创建Data Source
    • 应用转换算子
    • 创建Data Sink
    • 触发程序执行
    • 示例
  • Flink常见数据类型
    • 基本数据类型
    • 字符串类型
    • 时间和日期类型
    • 数组类型
    • 元组类型
    • 列表类型
    • 映射类型
    • POJO类型
    • Row类型
    • 可序列化类型
    • 类型提示

DataStream API

Flink三层API

在这里插入图片描述

SQL & TableAPI

SQL & TableAPI同时适用于批处理和流处理,意味着可以对有界数据流和无界数据流以相同的语义进行查询,并产生相同的结果。除了基本查询外,它还支持自定义的标量函数,聚合函数以及表值函数,可以满足多样化的查询需求。

DataStream & DataSetAPI

DataStream & DataSetAPI是Flink数据处理的核心API,支持使用Java语言或Scala语言进行调用,提供了数据读取,数据转换和数据输出等一系列常用操作的封装。

StatefulStreamProcessing

StatefulStreamProcessing是最低级别的抽象,它通过ProcessFunction函数内嵌到DataStreamAPI中。ProcessFunction是Flink提供的最底层API,具有最大的灵活性,允许开发者对于时间和状态进行细粒度的控制。

DataStream API概述

Flink的DataStream API是Flink中最主要的API之一,它用于处理无限流数据。DataStreamAPI支持高级的流处理操作,例如窗口计算、状态管理、流分区等,并且在处理大规模数据时表现出色。

由于Flink DataSet和DataStream API的高度相似,并且DataStream API提供流批一体处理的能力,官方也推荐直接使用DataStream API,因此学习DataStream API如何使用即可。

流(STREAMING)执行模式适用于需要连续增量处理,而且预计无限期保持在线的无边界作业。

批(BATCH)执行模式适用于有一个已知的固定输入,而且不会连续运行的有边界作业。

开发Flink程序过程

确定需求:明确想要解决的问题或实现的功能。

导入依赖:在项目中导入Apache Flink相关的依赖,可以使用Maven、Gradle或其他构建工具来管理依赖关系。

创建StreamExecutionEnvironment:使用StreamExecutionEnvironment.getExecutionEnvironment()创建Flink的执行环境对象,它用于配置和执行流处理作业。

读取数据:从适合的数据源(例如文件、Kafka、Socket等)读取数据,可以使用readTextFile()、addSource()等方法来读取数据并转换为DataStream。

转换操作:对读取到的数据进行处理和转换操作,可以使用诸如map、flatmap、filter等方法来进行各种转换和处理。

窗口操作(可选):如果需要对数据进行窗口操作(例如滚动窗口、滑动窗口等),可以使用Flink提供的窗口操作方法。

结果处理:将转换后的数据写入文件、数据库、消息队列或其他输出源,或者使用print()、collect()等方法将数据打印到控制台。

设置作业配置和调优(可选):根据需求和性能要求,可以设置作业的并行度、时间特性、状态后端、容错机制、资源配置等。

执行作业:通过调用env.execute()方法来执行流处理作业。作业将提交到Flink集群或本地运行。

监控和调试(可选):可以通过Flink的监控界面查看作业的状态和指标,并使用日志和调试工具追踪和解决问题。

添加依赖

		<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.17.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.0</version>
        </dependency>

创建执行环境

Flink程序可以在各种上下文环境中运行:

可以在本地 JVM 中执行程序

可以提交到远程集群上运行

创建执行环境是使用StreamExecutionEnvironment类,调用这个类的静态方法来创建执行环境。

在这里插入图片描述

获取到程序执行环境后,还可以对执行环境进行灵活的设置。

可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。

1.本地执行环境

使用createLocalEnvironment()方法创建一个本地执行环境

可以在调用时传入一个参数,指定默认的并行度,默认并行度是电脑CPU核心数

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);

2.集群执行环境
使用createRemoteEnvironment("node01", 8888,"/root/demo.jar")方法创建一个集群执行环境

需要在调用时指定JobManager的主机名和端口号,以及在集群中运行的Jar包

/**
 * JobManager 主机名
 * JobManager 进程端口号
 * 提交给JobManager的JAR包
 */
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("node01", 8888,"/root/demo.jar");

3.自适应执行环境

使用getExecutionEnvironment()方法根据当前运行的上下文直接得到正确的执行环境

如果程序独立运行,则返回一个本地执行环境。如果创建了jar包,然后在命令行调用它并提交到集群执行,则返回集群的执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

4.本地执行环境+Web UI
使用createLocalEnvironmentWithWebUI(conf)方法创建一个本地执行环境,同时启动Web监控UI。

需要创建一个配置文件,设置相关参数,如设置Web UI端口,默认使用端口8081

Configuration conf = new Configuration();
conf.set(RestOptions.BIND_PORT, "8082");

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

执行模式

DataStream API执行模式包括:流执行模式、批执行模式和自动模式。

流执行模式Streaming

流执行模式是DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式。

批执行模式Batch

批执行模式是专门用于批处理的执行模式

自动模式AutoMatic

在自动模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。

配置批执行模式

执行模式可以通过 execute.runtime-mode 设置来配置。有三种可选的值:

STREAMING: 经典 DataStream 执行模式(默认)

BATCH:DataStream API 上进行批量式执行

AUTOMATIC: 让系统根据数据源的边界性来决定

1.通过命令行配置

提交作业时,增加execution.runtime-mode参数,指定值为BATCH

bin/flink run -Dexecution.runtime-mode=BATCH

2.通过代码配置

// 创建流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 基于执行环境调用setRuntimeMode方法,传入BATCH模式。不建议,推荐通过命令行传递参数
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

创建Data Source

创建执行环境后,可以使用其提供的一些方法,通过这些方法可以创建Data Source

例如:从文件中读取数据:可以直接逐行读取数据,像读CSV文件一样,或使用任何第三方提供的source

String filePath = "data/test.text";

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile(filePath);

应用转换算子

这将生成一个DataStream,然后可以在上面应用转换算子transformation来创建新的派生DataStream。可以调用DataStream上具有转换功能的方法来应用转换。

例如: 应用一个map的转换算子,它将通过把原始集合中的每一个字符串转换为一个整数来创建一个新的DataStream。

DataStream<String> text = ...;

DataStream<Integer> parsed = text.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

创建Data Sink

一旦有了包含最终结果的DataStream,就可以通过创建sink把它写到外部系统。

// 简单skin:将DataStream以文本格式写入path指定的文件
parsed.writeAsText("data/out");

// 控制台打印
parsed.print();

触发程序执行

需要调用StreamExecutionEnvironment 的execute()、executeAsync()方法来触发程序执行

execute()方法将等待作业完成,然后返回一个JobExecutionResult,其中包含执行时间和累加器结果。

JobExecutionResult result = env.execute();

如果不想等待作业完成,使用executeAsync() 方法来触发作业异步执行。它会返回一个 JobClient,可以通过它与刚刚提交的作业进行通信。

JobClient jobClient = env.executeAsync();
JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();

示例

public static void main(String[] args) throws Exception {
        // 获取运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从文件中读取数据
        DataStreamSource<String> text = env.readTextFile("data/test.text");

        // 应用转换算子
        DataStream<Integer> parsed = text.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) {
                int number = Integer.parseInt(value);
                System.out.println("number = " + number);
                return number;
            }
        });

        // 简单skin:将DataStream以文本格式写入path指定的文件
        parsed.writeAsText("data/out");

        // 控制台打印
        parsed.print();

        // 触发执行
        env.execute();
    }

Flink常见数据类型

原始数据类型:例如布尔值、整数(byte、short、int、long)、浮点数(float、double)和字符(char)

字符串类型:表示为 Java 类型 String 或 scala 类型 String

时间和日期类型:包括 Timestamp 和 Date,以及 Interval 类型,用于表示时间间隔

数组类型:数组是同一类型的元素的有序集合

元组类型:元组是不同类型的元素的有序集合

列表类型:列表是具有相同元素类型的有序元素集合

映射类型:映射是键值对的无序集合,键和值可以是任何类型

POJO类型:POJO 是普通的 Java 对象,它们包含字段或属性,可以通过名称或 getter 和 setter 方法进行访问

Row类型:Row 是一个有序的、命名的字段集合。与POJO类型类似,但没有setter 和getter方法

可序列化类型:即实现 java.io.Serializable 接口的类型

基本数据类型

Flink支持Java中的所有基本数据类型,例如布尔值、整数(byte、short、int、long)、浮点数(float、double)和字符(char)。

在Flink中定义一个int类型的流

DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);

字符串类型

字符串类型在Flink中也很常见,可以使用Java或Scala中的String类型表示。

DataStream<String> stream = env.fromElements("hello", "world");

时间和日期类型

时间和日期类型包括DATE、TIME、TIMESTAMP类型,用于表示时间间隔。

DataStream<Tuple2<String, Timestamp>> stream = env.fromElements(
    Tuple2.of("event-1", new Timestamp(System.currentTimeMillis())),
    Tuple2.of("event-2", new Timestamp(System.currentTimeMillis() - 1000))
);

数组类型

数组是同一类型的元素的有序集合。包括基本数据类型数组(PRIMITIVE_ARRAY)和复杂数据类型数组(OBJECT_ARRAY。其中,基本数据类型数组可以是任意基本数据类型的数组,而复杂数据类型数组则可以是结构体或者嵌套的数组。

DataStream<int[]> stream = env.fromElements(new int[]{1, 2, 3}, new int[]{4, 5, 6});

元组类型

元组是复合类型,包含固定数量的各种类型的字段。Java API提供了从Tuple1到Tuple25,不支持空字段

元组是不同类型的元素的有序集合。也就是说元组的每个字段都可以是任意Flink 类型,包括更多元组,从而产生嵌套元组

DataStream<Tuple3<String, Integer, Double>> stream = env.fromElements(
    Tuple3.of("a", 1, 1.1),
    Tuple3.of("b", 2, 2.2)
);

列表类型

列表是具有相同元素类型的有序元素集合。

DataStream<List<String>> stream = env.fromElements(Arrays.asList("hello", "world"), Arrays.asList("foo", "bar"));

映射类型

映射是键值对的无序集合,键和值可以是任何类型。

Map<String, Integer> map1 = new HashMap<>();
map1.put("a", 1);
map1.put("b", 2);

Map<String, Integer> map2 = new HashMap<>();
map2.put("c", 3);
map2.put("d", 4);

DataStream<Map<String, Integer>> stream = env.fromElements(map1, map2);

POJO类型

POJO是普通的Java对象,它们包含字段或属性,可以通过名称或getter和setter方法进行访问

Flink对POJO 类型的要求如下:

类是公有public的
有一个无参的构造方法
所有属性都是公有public的,要么必须可通过 getter 和 setter 函数访问
所有属性的类型都是可以序列化的
public class Person {
    public String name;
    public int age;


    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}

DataStream<Person> stream = env.fromElements(
    new Person("Alice", 25),
    new Person("Bob", 30)
);

Row类型

Row是一个有序的、命名的字段集合。与 POJO类型类似,但没有setter 和 getter 方法。可以认为是具有任意个字段的元组,并支持空字段。

可序列化类型

即实现 java.io.Serializable 接口的类型。

public class MySerializableClass implements Serializable {
    private int value;

    public MySerializableClass(int value)

类型提示

Flink的类型提示Type Hints机制,它可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。也就是说可以帮助Flink更好地理解数据集中元素的类型,从而提高程序的性能。

使用TypeHint或Types类来指定数据集元素的类型

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> input = env.fromCollection(Arrays.asList("a b", "b c", "c d"));
        
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = input
                .flatMap(
                        (String value, Collector<Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                )
                // 显式地提供类型信息:对于flatMap传入Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式设置系统当前返回类型,才能正确解析出完整数据
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                })
//                .returns(Types.TUPLE(Types.STRING,Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);

        sum.print();
        env.execute();
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1089358.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Java 进阶篇】深入了解JavaScript中的函数

函数是JavaScript编程中的核心概念之一。它们是可重用的代码块&#xff0c;可以帮助您组织和管理程序&#xff0c;使您的代码更具可读性和可维护性。在本篇博客中&#xff0c;我们将深入了解JavaScript中的函数&#xff0c;包括函数的基本语法、参数、返回值、作用域、闭包和高…

网安朝阳·西门子白帽黑客大赛 | 聚焦实战攻防竞赛 促进网安人才发展

2023年10月12日&#xff0c;由西门子&#xff08;中国&#xff09;有限公司、北京市朝阳区人民政府主办&#xff0c;西门子医疗系统有限公司作为赞助单位&#xff0c;北京赛宁网安科技有限公司&#xff08;赛宁网安&#xff09;、北京精典元素公关顾问有限公司承办&#xff0c;…

进程概念[上]

一、冯诺依曼体系结构 冯 • 诺依曼体系结构核心原理为&#xff1a;用户输入的数据先放到内存当中&#xff0c;CPU 读取数据的时候就直接从内存当中读取&#xff0c;CPU 处理完数据后又写回内存当中&#xff0c;然后内存再将数据输出到输出设备当中&#xff0c;最后由输出设备进…

【已解决】HTTP method names must be tokens

该接口应该是http&#xff0c;但是在平台中的填写成https&#xff0c;导致的问题&#xff0c;所以将对应的接口修改为http即可

C#面对对象(英雄联盟人物管理系统)

目录 英雄信息类 因为要在两个窗体里面调用字典&#xff0c;所以要写两个类来构建全局变量 添加功能 查询功能 英雄信息类 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace WindowsFormsApp…

2023-10-13 LeetCode每日一题(避免洪水泛滥)

2023-10-13每日一题 一、题目编号 1488. 避免洪水泛滥二、题目链接 点击跳转到题目位置 三、题目描述 你的国家有无数个湖泊&#xff0c;所有湖泊一开始都是空的。当第 n 个湖泊下雨前是空的&#xff0c;那么它就会装满水。如果第 n 个湖泊下雨前是 满的 &#xff0c;这个…

《Unity Shader入门精要》笔记08

文章目录 Unity的渲染路径前向渲染路径前向渲染路径原理Unity中的前向渲染BassPassAdditional Pass 内置的光照变量和函数 延迟渲染路径延迟渲染的原理Unity中的延迟渲染可访问的内置变量和函数 Unity的光源类型光源类型有什么影响平行光点光源聚光灯 在前向渲染中处理不同的光…

Linux友人帐之系统管理与虚拟机相关

一、虚拟机相关操作 1.1虚拟机克隆 虚拟机克隆是指将一个已经安装好的虚拟机复制出一个或多个完全相同的副本&#xff0c;包括虚拟机的配置、操作系统、应用程序等&#xff0c;从而节省安装和配置的时间和资源。 虚拟机克隆的主要用途有&#xff1a; 创建多个相同或相似的虚拟…

使用Python实现网页中图片的批量下载和水印添加保存

数字时代&#xff0c;图片已经成为我们生活中的一部分。无论是社交媒体上的照片&#xff0c;还是网页中的图片元素&#xff0c;我们都希望能够方便地下载并进行个性化的处理。 假设你是一位设计师&#xff0c;你经常需要从网页上下载大量的图片素材&#xff0c;并为这些图片添加…

深入了解基数排序:原理、性能分析与 Java 实现

基数排序&#xff08;Radix Sort&#xff09;是一种非比较性排序算法&#xff0c;它根据元素的每个位上的值来进行排序。基数排序适用于整数或字符串等数据类型的排序。本文将详细介绍基数排序的原理、性能分析及java实现。 基数排序原理 基数排序的基本原理是按照低位先排序&…

如何解决找不到msvcr100.dll问题,msvcr100.dll丢失的多种修复方案

当我的电脑出现MSVCR100.DLL丢失这个问题时&#xff0c;我感到非常困扰。我试图通过重新安装Visual C 2010 Redistributable Package来解决这个问题&#xff0c;但是这个方法并不总是有效。有些时候&#xff0c;即使我重新安装了整个软件包&#xff0c;MSVCR100.DLL文件仍然找不…

政策加码聚焦工业现代化发展,团队聚能驱动AI机器视觉高质量发展

随着智能制造进程的持续推进&#xff0c;新一代信息技术引领着第四次工业革命&#xff0c;机器视觉技术乘着东风实现高速发展&#xff0c;其视觉创新应用产品全面铺开&#xff0c;新应用、新模式不断涌现。深眸科技紧抓时代发展机遇&#xff0c;基于领先的图像算法和自主研究的…

Windows下DataGrip连接Hive

DataGrip连接Hive 1. 启动Hadoop2. 启动hiveserver2服务3. 启动元数据服务4. 启动DG 1. 启动Hadoop 在控制台中输入start-all.cmd后&#xff0c;弹出下图4个终端&#xff08;注意终端的名字&#xff09;2. 启动hiveserver2服务 单独开一个窗口启动hiveserver2服务&#xff0c;…

clone()方法使用时遇到的问题解决方法(JAVA)

我们平时在自定义类型中使用这个方法时会连续遇到 4 个问题。 基础代码如下&#xff1a; class A {int[] a {1,2,3}; }public class Test {public static void main(String[] args) {} } 第一个&#xff1a; 当我们直接调用时报错原因是Object类中的clone方法是被protecte…

什么叫AI自动直播?

AI自动直播是一种使用人工智能技术进行自动直播的程序或系统。 它可以自动录制视频&#xff0c;并在直播平台上进行展示&#xff0c;以吸引观众并提高品牌知名度。AI自动直播通常需要使用特定的软件或平台来实现&#xff0c;并且需要具备一定的编程和人工智能知识。 AI自动直…

win10搭建gtest测试环境+vs2019

首先是下载gtest&#xff0c;这个我已经放在了博客上方资源绑定处&#xff0c;这个适用于win10vs版本&#xff0c;关于liunx版本的不能用这个。 或者百度网盘链接&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/15m62KAJ29vNe1mrmAcmehA 提取码&#xff1a;vfxz 下…

苹果CMS海螺模版V20修复版/加广告代码 ​适合视频影视类网站使用​

最新苹果CMS海螺模版V20修复版&#xff0c;增加广告代码&#xff0c;适合视频影视类网站使用&#xff0c;有兴趣的可以研究研究。 修复说明&#xff1a; 修复多线路时播放页列表点其他线路还是播放默认线路的问题 修复前台黑白切换和字体颜色切换失效 修复微信二维码没有对…

《进化优化》第4章 遗传算法的数学模型

文章目录 4.1 图式理论4.2 马尔可夫链4.3 进化算法的马尔可夫模型的符号4.4 遗传算法的马尔可夫模型4.4.1 选择4.4.2 变异4.4.3 交叉 4.5 遗传算法的动态系统模型4.5.1 选择4.5.2 变异4.5.3 交叉 4.1 图式理论 图式是描述一组个体的位模式&#xff0c;其中用*来表示不在乎的位…

基于PLC的机械手控制系统设计

目录 摘 要......................................................................................................................... 1 第一章 绪论.............................................................................................................…

什么是promise?如何使用?应用场景?

什么是Promise&#xff1f; Promise是一种用于处理异步操作的JavaScript编程模式。它允许你更优雅地处理异步代码&#xff0c;避免了回调地狱&#xff08;Callback Hell&#xff09;的问题&#xff0c;使代码更易于理解和维护。Promise是ES6&#xff08;ECMAScript 2015&#…