1. 版本说明
本文档内容基于 flink-1.13.x
,其他版本的整理,请查看本人博客的 flink 专栏其他文章。
2. 执行配置
StreamExecutionEnvironment
包含 ExecutionConfig
对象,该对象允许程序指定运行时的配置值。改变默认值可以影响所有的任务,具体查看 Configuration。
Java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();
Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
var executionConfig = env.getConfig
下面的配置选项均可用,加粗的为默认值:
setClosureCleanerLevel()
,默认设置封闭清洁器级别为ClosureCleanerLevel.RECURSIVE
。封闭清洁器可以移除 flink 程序中不再需要的对匿名函数类的引用。禁用封闭清洁器后,匿名用户函数可能会引用周围的类,而这些类通常是不可被序列化的,这将导致序列化异常。
NONE
:完全禁用封闭清洁器。
TOP_LEVEL
:只清理顶级类,不包括属性。
RECURSIVE
:递归清理所有属性。getParallelism()
/setParallelism(int parallelism)
:设置任务默认的并行度。getMaxParallelism()
/setMaxParallelism(int parallelism)
:设置任务默认的最大并行度。该设置决定了最大并行度,并且指定了动态缩放的上限。getNumberOfExecutionRetries()
/setNumberOfExecutionRetries(int numberOfExecutionRetries)
:设置失败任务重新执行的次数,0 表示禁用容错,-1 为系统默认值。该配置已经过时,请使用 restart strategies 。getExecutionRetryDelay()
/setExecutionRetryDelay(long executionRetryDelay)
:设置任务失败后,在重新执行之前等待的延迟毫秒值。延迟会在 TaskManager 上所有的任务成功停止后之后开始计时,延迟一旦完成,任务就会重新启动。该参数对于延迟重新执行是非常有用的,他可以将某些与超时相关的故障充分显示出来,比如还没有超时的连接,以防止重新执行后遇到相同的问题又马上失败。该参数需要想要起作用,需要将任务失败重新尝试值设置为 1 或更大值。该配置已经过时,请使用 restart strategies。getExecutionMode()
/setExecutionMode()
:默认执行模式为 PIPELINED。设置程序执行的模式,执行模式决定数据交互是以批的方式进行,还是以 pipelined 方式进行。enableForceKryo()
/disableForceKryo
:Kryo 默认并不是强制的。尽管我们可以对 POJO 直接进行分析,但 GenericTypeInformation 仍然会强制对 POJO 类型使用 Kryo 序列化器,在有些情况下这是可取的,比如 Flink 内部序列化器无法对 POJO 进行正确处理。enableForceAvro()
/disableForceAvro()
:Avro 默认并不是强制的。Flink 会强制 AvroTypeInfo 使用 Avro 序列化器而不是 Kryo 来序列化器 Avro POJO。enableObjectReuse()
/disableObjectReuse()
:默认情况下,Flink 中的对象是不会被重用的。开启对象重用模式将会指导运行时重用用户对象,以获取更高的性能。注意,如果算子中的用户代码没有注意到该行为时,可能会导致结果不准确。getGlobalJobParameters()
/setGlobalJobParameters()
:该方法用于设置自定义对象为任务的全局配置,尽管ExecutionConfig
已经可以在所有的用户自定义函数中访问了,但是该方法依然是一个简单的方式让配置在任务中全局可用。addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer)
:使用给定的type
注册 Kryo 序列化器实例。addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
:使用给定的type
注册 Kryo 序列化器实例。registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer)
:使用 Kryo 注册给定的类型,并为他指定一个序列化器。使用 Kryo 注册一个类型,可以使该类型的序列化更加有效。registerKryoType(Class<?> type)
:如果类型最终被 Kryo 序列化,之后它将会被注册到 Kryo,以确定值写入标签信息(integer ID)。如果类型没有被 Kryo 序列化,则每个示例的整个类名都将会被序列化,这将导致很多 I/O 消耗。registerPojoType(Class<?> type)
:使用序列化堆栈注册给定的类型。如果类型最终被序列化为 POJO,然后类型就会使用 POJO 序列化器注册。如果类型最终被序列化为 Kryo,然后他将会被注册为 Kryo,以保证值写入标签。如果类型没有被注册为 Kryo,则每个实例的整个类名都将会被序列化,这将导致很高的 I/O 消耗。
注意,使用 registerKryoType()
注册的类型对于 Flink 的 POJO 序列化器实例中是不可用的。
disableAutoTypeRegistration()
:默认开启类型自动注册。用户代码会使用 Kryo 和 POJO 序列化器来自动类型注册所有的类型,包括子类型。setTaskCancellationInterval(long interval)
:设置连续尝试取消运行中任务的间隔,毫秒值。当取消一个任务时,如果任务线程在一个确定的时间内没有停止,则会创建一个新的线程定期调用任务线程的interrupt()
方法。该参数提供连续调用interrupt()
方法的时间,默认为 30000 毫秒,或 30 秒。
在所有的用户自定义的 Rich*
函数中,都可以通过 getRuntimeContext()
方法获取 RuntimeContext
,并且可以访问 ExecutionConfig
。
3. 程序打包
3.1. 程序打包和分布式运行
正如之前所描述的,Flink 程序可以使用 远程环境
在集群上执行。程序也可以被打包成 JAR 文件(Java Archives)来执行。如果使用命令行的方式执行程序,则必须将程序打包。
3.2. 打包程序
为了能够通过命令行或 web 界面执行打包的 JAR 文件,程序必须使用通过 StreamExecutionEnvironment.getExecutionEnvironment()
获取的 environment 对象。当 JAR 被提交到命令行或 web 界面后,该 environment 会扮演集群环境的角色。如果调用 Flink 程序的方式与上述不同,则 environment 对象会扮演本地环境的角色。
打包程序只需要简单地将所有相关的类导出为 JAR 文件即可,JAR 文件的 manifest 必须指向包含程序入口点(拥有公共 main
方法)的类。最简单的实现方法是将 main-class 写入 manifest 中,比如 main-class: org.apache.flinkexample.MyProgram
。main-class 属性与 Java 虚拟机通过指令 java -jar pathToTheJarFile
执行 JAR 文件时寻找 main 方法的类是相同的。大多数 IDE 都提供了在导出 JAR 文件时自动包含该属性的功能。
3.3. 总结
调用打包后程序的完整流程包括两步:
- 搜索 JAR 文件 manifest 中的 main-class 或 program-class 属性。如果两个属性同时存在,则 program-class 属性会优先于 main-class 属性。对于 JAR manifest 中两个属性都不存在的情况,命令行和 web 界面支持手动传入主类名参数。
- 之后系统调用该主类的 main 方法。
4. 并行执行
本节描述了在 Flink 中配置程序的并行执行。一个 Flink 程序由多个任务 task 组成(转换/算子、数据源和数据接收器)。一个 task 包括多个并行执行的实例,且每一个实例都会处理 task 输入数据的一个子集。一个 task 的并行实例数被称为该 task 的 并行度 (parallelism)。
使用 savepoints 时,应该考虑设置最大并行度。当作业从一个 savepoint 恢复时,可以改变特定算子或着整个程序的并行度,并且此设置会限定整个程序的并行度的上限。由于 Flink 内部会将状态划分为 key-groups,且由于性能所限不能无限制地增加 key-groups,因此设定最大并行度是很有必要的。
4.1. 设置并行度
一个 task 的并行度可以从多个层次指定:
4.1.1. 算子层次
单个算子、数据源和数据接收器的并行度可以通过调用 setParallelism()
方法来指定。如下所示:
Java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.keyBy(value -> value.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");
Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = [...]
val wordCounts = text
.flatMap{ _.split(" ") map { (_, 1) } }
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum(1).setParallelism(5)
wordCounts.print()
env.execute("Word Count Example")
4.1.2. 执行环境层次
Flink 程序运行在执行环境的上下文中,执行环境会为所有执行的算子、数据源、数据接收器 (data sink) 定义一个默认的并行度。可以显式配置算子层次的并行度去覆盖执行环境的并行度。
可以通过调用 setParallelism()
方法指定执行环境的默认并行度。如果想以并行度 3
来执行所有的算子、数据源和数据接收器,可以在执行环境上设置默认并行度,如下所示:
Java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();
env.execute("Word Count Example");
Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
val text = [...]
val wordCounts = text
.flatMap{ _.split(" ") map { (_, 1) } }
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum(1)
wordCounts.print()
env.execute("Word Count Example")
4.1.3. 客户端层次
将作业提交到 Flink 时可在客户端设定其并行度。客户端可以是 Java 或 Scala 程序,Flink 的命令行接口(CLI)就是一种典型的客户端。
在 CLI 客户端中,可以通过 -p
参数指定并行度,例如:
./bin/flink run -p 10 ../examples/*WordCount-java*.jar
在 Java/Scala 程序中,可以通过如下方式指定并行度:
Java
try {
PackagedProgram program = new PackagedProgram(file, args);
InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
Configuration config = new Configuration();
Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());
// 这儿设置并行度为 10
client.run(program, 10, true);
} catch (ProgramInvocationException e) {
e.printStackTrace();
}
Scala
try {
PackagedProgram program = new PackagedProgram(file, args)
InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")
Configuration config = new Configuration()
Client client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader())
// 这儿设置并行度为 10
client.run(program, 10, true)
} catch {
case e: Exception => e.printStackTrace
}
4.1.4. 系统层次
可以通过设置 ./conf/flink-conf.yaml
文件中的 parallelism.default
参数,在系统层次指定所有执行环境的默认并行度。可以通过查阅配置文档获取更多细节。
4.2. 设置最大并行度
最大并行度可以在所有设置并行度的地方进行设定(客户端和系统层次除外)。与调用 setParallelism()
方法修改并行度相似,可以通过调用 setMaxParallelism()
方法来设定最大并行度。
默认的最大并行度等于将 operatorParallelism + (operatorParallelism / 2)
值四舍五入到大于等于该值的一个整型值,并且这个整型值是 2
的幂次方,注意默认最大并行度下限为 128
,上限为 32768
。
注意:给最大并行度设置一个非常大的值会降低性能,因为一些状态后端需要维持内部的数据结构,而这些数据结构将会随着 key-groups 的数目而扩张(key-group 是状态重新分配的最小单元)。
5. 执行计划
Flink 的优化器会根据诸如数据量或集群机器数等不同的参数自动地为你的程序选择执行策略。但在大多数情况下,准确地了解 Flink 如何执行你的程序是很有必要的。
5.1. 执行计划可视化工具
Flink 为执行计划提供了可视化工具,它可以把用 JSON 格式表示的作业执行计划以图的形式展现,并且其中包含完整的执行策略标注。
以下代码展示如何在你的程序中打印 JSON 格式的执行计划:
Java
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
...
System.out.println(env.getExecutionPlan());
Scala
val env = ExecutionEnvironment.getExecutionEnvironment
...
println(env.getExecutionPlan())
可以通过如下步骤可视化执行计划:
- 使用你的浏览器打开可视化工具网站
- 将 JSON 字符串拷贝并粘贴到文本框中
- 点击 draw 按钮
完成后,详细的执行计划图会在网页中呈现。
5.2. Web 界面
Flink 提供了用于提交和执行任务的 Web 界面。该界面是 JobManager Web 界面的一部分,起到管理监控的作用,默认情况下运行在 8081 端口。
可视化工具可以在执行 Flink 作业之前展示执行计划图,你可以据该图指定程序的参数。
6. Task故障恢复
当任务发生故障时,Flink 需要重启出错的 Task 以及其他受到影响的 Task ,以将作业恢复到正常执行状态。
Flink 通过重启策略和故障恢复策略来控制 Task 重启:重启策略会决定是否可以重启以及重启的间隔;故障恢复策略决定哪些 Task 需要重启。
6.1. 重启策略
Flink 作业如果没有定义重启策略,则会遵循集群启动时加载的默认重启策略。 如果提交作业时设置了重启策略,则该策略将覆盖掉集群的默认策略。
通过 Flink 的配置文件 flink-conf.yaml
来设置默认的重启策略,配置参数 restart-strategy
定义采取何种策略。如果没有启用 checkpoint,就采用“不重启”策略。如果启用了 checkpoint 且没有配置重启策略,那么就采用固定延时重启策略,此时最大尝试重启次数为 Integer.MAX_VALUE
。下表列出了可用的重启策略与其对应的配置值。
每个重启策略都有自己的一组配置参数来控制其行为,这些参数也可以在配置文件中设置。后文的描述中会详细介绍每种重启策略的配置项。
Key | Default | Type | Description |
---|---|---|---|
restart-strategy | (none) | String | 定义任务失败时的重启策略。可用值有:none 、off 、disable :不重启策略。fixeddelay 、fixed-delay :固定延迟重启策略,可以从 here 获取更多细节。failurerate 、failure-rate :失败率重启策略,可以从 here 获取更多细节。exponentialdelay 、exponential-delay :指数延迟重启策略,可以从 here 获取更多细节。如果禁用了 checkpointing,则默认值为 node 。如果启用了 checkpointing,则默认值为 fixed-delay ,重启间隔为 1 s 延迟,重启次数为 Integer.MAX_VALUE 。 |
除了定义默认的重启策略外,还可以为每个 Flink 作业单独定义重启策略,这个重启策略可以通过在程序中的 ExecutionEnvironment
对象上调用 setRestartStrategy
方法来设置。当然,对于 StreamExecutionEnvironment
对象也同样适用。
下例展示了如何给我们的作业设置固定延时重启策略。如果发生故障,系统会重启作业 3 次,每两次连续的重启尝试之间等待 10 秒钟。
Java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 延时
));
Scala
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 延时
))
以下部分详细描述各重启策略的配置项。
6.1.1. 固定延迟重启策略
固定延时重启策略按照给定的次数尝试重启作业。如果尝试超过了给定的最大次数,作业将会完成失败。在连续的两次重启尝试之间,重启会策略等待一段固定长度的时间。
通过在 flink-conf.yaml
中设置如下配置参数,默认启用此策略。
restart-strategy: fixed-delay
Key | Default | Type | Description |
---|---|---|---|
restart-strategy.fixed-delay.attempts | 1 | Integer | 将 restart-strategy 设置为 fixed-delay 时,该参数表示任务的最大重启次数。 |
restart-strategy.fixed-delay.delay | 1 s | Duration | 将 restart-strategy 设置为 fixed-delay 时,该参数表示两次连续的重启尝试之间的延迟。延迟重启对程序和外部系统的交互是有帮助的,比如连接或等待中的事务,他们应该在重启之前达到一个指定的超时时间。该值可以使用这些符号指定:1 min 、20 s 。 |
例如:
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
固定延迟重启策略也可以在程序中设置:
Java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 延时
));
Scala
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 延时
))
6.1.2. 指数延迟重启策略
指数重启策略会无限尝试重启任务,其重启延迟会不断增加,直到最大延迟。任务永远不会最终失败。在两次连续的重启尝试之间,重启延迟会保持指数增加直到到达最大延迟,之后会一直保持最大延迟。
当任务重启成功之后,指数延迟值会在一段时间之后重置,该阈值是可以配置的。
restart-strategy: exponential-delay
Key | Default | Type | Description |
---|---|---|---|
restart-strategy.exponential-delay.backoff-multiplier | 2.0 | Double | 如果将 restart-strategy 设置为 exponential-delay ,则该值表示:每次失败之后,后退值乘以此值,直到达到最大后退值。 |
restart-strategy.exponential-delay.initial-backoff | 1 s | Duration | 如果将 restart-strategy 设置为 exponential-delay ,则该值表示:两次重启之间的起始时间间隔。可以使用这些符号指定该值:1 min 、20 s 。 |
restart-strategy.exponential-delay.jitter-factor | 0.1 | Double | 如果将 restart-strategy 设置为 exponential-delay ,则该值表示:指定后退值的抖动部分,该值代表后退值可以增加或减少多大的随机值。如果你想避免在同一时间内多次重启任务的话,该设置是非常有帮助的。 |
restart-strategy.exponential-delay.max-backoff | 5 min | Duration | 如果将 restart-strategy 设置为 exponential-delay ,则该值表示:两次重启之间的最大可能值。可以使用这些符号指定该值:1 min 、20 s 。 |
restart-strategy.exponential-delay.reset-backoff-threshold | 1 h | Duration | 如果将 restart-strategy 设置为 exponential-delay ,则该值表示:将回退值重置为初始值的时间间隔阈值。该值指定过多长时间之后,任务必须是正常执行的,并且重置指数增加的回退值为他的初始值。可以使用这些符号指定该值:1 min 、20 s 。 |
For example:
restart-strategy.exponential-delay.initial-backoff: 10 s
restart-strategy.exponential-delay.max-backoff: 2 min
restart-strategy.exponential-delay.backoff-multiplier: 2.0
restart-strategy.exponential-delay.reset-backoff-threshold: 10 min
restart-strategy.exponential-delay.jitter-factor: 0.1
指数延迟重启策略以可以通过程序设置:
Java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
Time.milliseconds(1),
Time.milliseconds(1000),
1.1, // 指数乘子
Time.milliseconds(2000), // 重置重启延迟的时间间隔阈值
0.1 // 抖动
));
Scala
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
Time.of(1, TimeUnit.MILLISECONDS), // 两次重启之前的初始延迟
Time.of(1000, TimeUnit.MILLISECONDS), // 两次重启之间的最大延迟
1.1, // 指数乘子
Time.of(2, TimeUnit.SECONDS), // 重置重启延迟的时间间隔阈值
0.1 // 抖动
))
6.1.3. 失败率重启策略
故障率重启策略在故障发生之后重启作业,但是当故障率(每个时间间隔发生故障的次数)超过设定的限制时,作业会最终失败。在连续的两次重启尝试之间,重启策略会等待一段固定长度的时间。
通过在 flink-conf.yaml
中设置如下配置参数,默认启用此策略。
restart-strategy: failure-rate
Key | Default | Type | Description |
---|---|---|---|
restart-strategy.failure-rate.delay | 1 s | Duration | 如果将 restart-strategy 设置为 failure-rate ,则该值表示:两次连续的重启之间的延迟。可以使用这些符号指定该值:1 min 、20 s 。 |
restart-strategy.failure-rate.failure-rate-interval | 1 min | Duration | 如果将 restart-strategy 设置为 exponential-delay ,则该值表示:评估故障率的时间间隔。可以使用这些符号指定该值:1 min 、20 s 。 |
restart-strategy.failure-rate.max-failures-per-interval | 1 | Integer | 如果将 restart-strategy 设置为 exponential-delay ,则该值表示:在给定的时间间隔内将任务最终失败的最大重启次数。 |
例如:
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
故障率重启策略也可以在程序中设置:
Java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个时间间隔的最大故障次数
Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔
Time.of(10, TimeUnit.SECONDS) // 延时
));
Scala
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个时间间隔的最大故障次数
Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔
Time.of(10, TimeUnit.SECONDS) // 延时
))
6.1.4. 不重启策略
作业直接失败,不尝试重启。
restart-strategy: none
不重启策略也可以在程序中设置:
Java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
Scala
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())
6.1.5. 回退重启策略
使用群集定义的重启策略,这对于启用了 checkpoint 的流处理程序很有帮助。如果没有定义其他重启策略,默认选择固定延时重启策略。
6.2. 故障恢复策略
Flink 支持多种不同的故障恢复策略,该策略需要通过 Flink 配置文件 flink-conf.yaml
中的 jobmanager.execution.failover-strategy
配置项进行配置。
故障恢复策略 | jobmanager.execution.failover-strategy 配置值 |
---|---|
重启所有 | full |
重启局部 pipeline | region |
6.2.1. 重启所有
在整体重启故障恢复策略下,某个 Task 发生故障时会重启作业中的所有的 Task 进行故障恢复。
6.2.2. 重启局部 pipeline
该策略会将作业中的所有 Task 划分为数个 Region。当某个 Task 发生故障时,它会尝试找出进行故障恢复需要重启的最小 Region 集合。相比于全局重启故障恢复策略,这种策略在一些场景下的故障恢复中需要重启的 Task 会更少。
此处 Region 指以 Pipelined 形式进行数据交换的 Task 集合。也就是说,Batch 形式的数据交换会构成 Region 的边界。
- DataStream 和 流式 Table/SQL 作业的所有数据交换都是 Pipelined 形式的。
- 批处理式 Table/SQL 作业的所有数据交换默认都是 Batch 形式的。
- DataSet 作业中的数据交换形式会根据 ExecutionConfig 中配置的
ExecutionMode
决定。
需要重启的 Region 的判断逻辑如下:
- 出错 Task 所在 Region 需要重启。
- 如果要重启的 Region 需要消费的数据有部分无法访问(丢失或损坏),生产该部分数据的 Region 也需要重启。
- 需要重启的 Region 的下游 Region 也需要重启。这是出于保障数据一致性的考虑,因为一些非确定性的计算或者分发会导致同一个结果分区内每次计算的结果数据都不相同。