最小程序MiniProgram
前面我们已经搭建起了Flink 的基础环境,这一节我们就在上一节的基础上,进行编写我们的第一个Flink 程序,开始之前我们先看一下一个完整的Flink 程序是什么样的
Flink 程序结构
为了演示Flink 程序结构,我们下面写了一个程序,这个程序我称之为 MiniProgram
,也就是流程序的最小结构
package com.kingcall.examples.stream;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
public class MiniProgram {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
ArrayList<String> arras = new ArrayList<>();
arras.add("China");
arras.add("Japan");
arras.add("Russia");
DataStream<String> country= env.fromCollection(arras);
DataStream<String> filters = country.filter(new FilterFunction<String>() {
@Override
public boolean filter(String str) throws Exception {
return !"Japan".equals(str);
}
});
filters.print();
env.execute();
}
}
可以参考下面的注释,一个流程序就这样被创建出来了,一个标准的程序就是这样的结构
- 每个 Flink 应用都需要有执行环境,在该示例中为
env
。流式应用需要用到StreamExecutionEnvironment
。 - DataStream API 将你的应用构建为一个 job graph,并附加到
StreamExecutionEnvironment
。当调用env.execute()
时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。 - 上述示例用
filters.print()
打印其结果到 task manager 的日志中(如果运行在 IDE 中时,将追加到你的 IDE 控制台)。它会对流中的每个元素都调用toString()
方法。
‘
需要特别注意的是,Flink 的流程序需要我们在最后调用env.execute() 才可以执行,在此之前都是在定义程序的执行逻辑,只有最后的这一步骤才会提交任务并执行
如果没有调用 execute(),应用就不会运行
下面就是程序的输出
’
1> 和 5> 指出输出来自哪个 sub-task(即 thread),我们可以对上面的程序进行改造一下,也就是设置并行度env.setParallelism(1);
这个时候输出就没有>
这样的提示了
‘
基本的 stream source
上述示例用 env.fromElements(...)
方法构造 DataStream<Person>
。这样将简单的流放在一起是为了方便用于原型或测试。StreamExecutionEnvironment
上还有一个 fromCollection(Collection)
方法。因此,你可以这样做:
List<Person> people = new ArrayList<Person>();
people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));
DataStream<Person> flintstones = env.fromCollection(people);
另一个获取数据到流中的便捷方法是用 socket
DataStream<String> lines = env.socketTextStream("localhost", 9999)
或读取文件
DataStream<String> lines = env.readTextFile("file:///path");
在真实的应用中,最常用的数据源是那些支持低延迟,高吞吐并行读取以及重复(高性能和容错能力为先决条件)的数据源,例如 Apache Kafka,Kinesis 和各种文件系统。REST API 和数据库也经常用于增强流处理的能力(stream enrichment)。
基本的 stream sink
上述示例用 filters.print()
打印其结果到 task manager 的日志中(如果运行在 IDE 中时,将追加到你的 IDE 控制台)。它会对流中的每个元素都调用 toString()
方法。
输出看起来类似于
1> Fred: age 35
2> Wilma: age 35
- 1> 和 2> 指出输出来自哪个 sub-task(即 thread)在生产中
- 常用的 sink 包括各种数据库和几个 pub-sub 系统
什么是流
前面我们介绍了Flink程序的结构,我们是通过一个叫做最小程序的MiniProgram
进行说明的,我们之前一直说到Flink 程序的数据流,流说明数据是实时的流动的,那这里的数据有什么格式的要求吗,或者什么样的数据才能流动。
DataStream API 得名于特殊的 DataStream
类,该类用于表示 Flink 程序中的数据集合。你可以认为 它们是可以包含重复项的不可变数据集合。这些数据可以是有界(有限)的,也可以是无界(无限)的,但用于处理它们的API是相同的。
DataStream
在用法上类似于常规的 Java 集合
,但在某些关键方面却大不相同。它们是不可变的,这意味着一旦它们被创建,你就不能添加或删除元素。你也不能简单地察看内部元素,而只能使用 DataStream
API 操作来处理它们,DataStream
API 操作也叫作转换(transformation)。
你可以通过在 Flink 程序中添加 source 创建一个初始的 DataStream
。然后,你可以基于 DataStream
派生新的流,并使用 map、filter 等 API 方法把 DataStream
和派生的流连接在一起。
Flink 的 Java DataStream API 可以将任何可序列化的对象转化为流。Flink 自带的序列化器有
- 基本类型,即 String、Long、Integer、Boolean、Array
- 复合类型:Tuples、POJOs 和 Scala case classes
而且 Flink 会交给 Kryo 序列化其他类型。也可以将其他序列化器和 Flink 一起使用。特别是有良好支持的 Avro。
Java tuples 和 POJOs
Flink 的原生序列化器可以高效地操作 tuples 和 POJOs
Tuples
对于 Java,Flink 自带有 Tuple0
到 Tuple25
类型。
Tuple2<String, Integer> person = Tuple2.of("Fred", 35);
// zero based index!
String name = person.f0;
Integer age = person.f1;
POJOs
如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许“按名称”字段引用):
- 该类是公有且独立的(没有非静态内部类)
- 该类有公有的无参构造函数
- 类(及父类)中所有的所有不被 static、transient 修饰的属性要么是公有的(且不被 final 修饰),要么是包含公有的 getter 和 setter 方法,这些方法遵循 Java bean 命名规范。
示例:
public class Person {
public String name;
public Integer age;
public Person() {}
public Person(String name, Integer age) {
. . .
}
}
Person person = new Person("Fred Flintstone", 35);
完整程序
下面的程序将关于人的记录流作为输入,并且过滤后只包含成年人。
package com.kingcall.examples.stream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;
public class Adults {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Person> flintstones = env.fromElements(
new Person("Fred", 35),
new Person("Wilma", 35),
new Person("Pebbles", 2));
DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
@Override
public boolean filter(Person person) throws Exception {
return person.age >= 18;
}
});
adults.print();
env.execute();
}
public static class Person {
public String name;
public Integer age;
public Person() {
}
public Person(String name, Integer age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return this.name.toString() + ": age " + this.age.toString();
}
}
}
下面就是程序的输出
’
Stream 执行环境
每个 Flink 应用都需要有执行环境,在该示例中为 env
。流式应用需要用到 StreamExecutionEnvironment
。
DataStream API 将你的应用构建为一个 job graph,并附加到 StreamExecutionEnvironment
。当调用 env.execute()
时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。
注意,如果没有调用 execute(),应用就不会运行。
‘
调试
在生产中,应用程序将在远程集群或一组容器中运行。如果集群或容器挂了,这就属于远程失败。JobManager 和 TaskManager 日志对于调试此类故障非常有用,但是更简单的是 Flink 支持在 IDE 内部进行本地调试。你可以设置断点,检查局部变量,并逐行执行代码。如果想了解 Flink 的工作原理和内部细节,查看 Flink 源码也是非常好的方法。
总结
Flink 中的 DataStream 程序是对数据流(例如过滤、更新状态、定义窗口、聚合)进行转换的常规程序。数据流的起始是从各种源(例如消息队列、套接字流、文件)创建的。结果通过 sink 返回,例如可以将数据写入文件或标准输出(例如命令行终端)。Flink 程序可以在各种上下文中运行,可以独立运行,也可以嵌入到其它程序中。任务执行可以运行在本地 JVM 中,也可以运行在多台机器的集群上。
Flink 程序看起来像一个转换 DataStream
的常规程序。每个程序由相同的基本部分组成:
- 获取一个
执行环境(execution environment)
; - 加载/创建初始数据;
- 指定数据相关的转换;
- 指定计算结果的存储位置;
- 触发程序执行。