0、汇总
========1.14.0========
1.有界流支持 Checkpoint;
2.批执行模式支持 DataStream 和 Table/SQL 混合应用;
3.新增 Hybrid Source 功能;
4.新增 缓冲区去膨胀 功能;
5.新增 细粒度资源管理 功能;
6.新增 DataStream 的 Pulsar 连接器;
========1.15.0========
1.支持增量的 Savepoint;
2.保证作业级别的指标在 Reactive 模式下可以正常工作;
3.为自适应调度器添加了异常历史记录;
4.引入自适应批调度器,支持根据每个节点需要处理的数据量的大小自动决定批处理作业中各节点的并行度;
5.支持跨源节点的 Watermark 对齐;
6.SQL 通过保持拓扑不变的方式使相同的查询在升级 Flink 版本后仍然可以启动和执行;
7.支持基于 Changelog 的状态存储;
8.支持清理重试;
9.增加对 OpenAPI 的支持;
10.默认启用作业结束前的 Checkpoint;
11.优化 Application 模式;
12.批处理支持 Window table-valued 函数;
13.SQL 增加 CAST函数和JSON函数;
14.可以结合任何 Scala 版本 (包括 Scala 3) 使用 Flink的 Java API;
15.新增支持异步输出与端到端一致性的ES Sink;
16.支持 CSV 格式和 小文件压缩功能;
========1.16.0========
1.批处理-SQL Gateway 支持 REST API 和 HiveServer2 协议;
2.批处理-完善Hive语法兼容,使用 HiveServer2 协议连接 SQL Gateway,SQL Gateway 会自动注册 Hive Catalog,自动切换到 Hive 方言,自动使用批处理模式提交作业;
3.批处理-支持Join Hint;
4.批处理-支持自适应 Hash Join;
5.批处理-支持批处理的预测执行;
6.批处理-支持混合 Shuffle 模式(实验);
7.批处理-优化Blocking shuffle;
8.批处理-支持动态分区裁剪;
9.流处理-支持Generalized Incremental Checkpoint;
10.流处理-改进 RocksDB Rescaling;
11.流处理-改善 State Backend 的监测体验和可用性;
12.流处理-支持透支缓冲区;
13.流处理-更新了从 Aligned Checkpoint(AC)切换到 Unaligned Checkpoint(UC)的时间点;
14.流处理-对于复杂的流作业可以在运行前检测并解决潜在的正确性问题;
15.流处理-维表关联-支持了通用的缓存机制和相关指标,可以加速维表查询;
16.流处理-维表关联-通过作业配置或查询提示支持可配置的异步模式(ALLOW_UNORDERED),在不影响正确性的前提下大大提升查询吞吐;
17.流处理-维表关联-支持可重试的查询,解决维表数据更新延迟问题;
18.流处理-异步 I/O 支持重试;
19.流处理-新语法-USING JAR 支持动态加载 UDF jar包,方便平台开发者轻松实现 UDF 的管理和相关作业的提交;
20.流处理-新语法-CREATE TABLE AS SELECT 方便用户基于已有的表和查询创建新的表;
21.流处理-新语法-ANALYZE TABLE 支持用户手工为原表生成统计信息,以便优化器可以生成更优的执行计划;
22.批处理-新语法-支持通过 DataStream#cache 缓存 Transformation 的执行结果;
23.History Server 及已完成作业的信息增强;
24.在 Table API 或 SQL 应用程序中支持 Protobuf 格式;
25.为异步 Sink 引入可配置的 RateLimitingStrategy;
========1.17.0========
1.批处理-支持 Sink 算子预测执行,优化预测执行慢任务的检测;
2.批处理-自适应批处理调度器成为了批作业的默认调度器,改进自适应批调度器的配置,增强了自适应批处理调度器的能力;
3.批处理-混合 Shuffle 模式支持自适应批调度器和预测执行,混合 Shuffle 模式支持重用中间数据;
4.批处理-支持 SQL Client 的 gateway 模式,可以使用 SQL 语句来管理作业的生命周期;
5.批处理-为 Batch 模式引入了新的 Delete 和 Update API,扩展了 ALTER TABLE 语法,包括 ADD/MODIFY/DROP 列、主键和 Watermark 的能力;
6.批处理-优化Hive connector,在流批模式下均能自动地进行文件合并;
7.引入了动态规划 join-reorder 算法,引入了动态 local hash aggregation 策略,移除了不必要的虚拟函数调用;
8.流处理-引入实验性功能 PLAN_ADVICE;
9.流处理-增强 watermark 对齐;
10.流处理-扩展 Streaming FileSink;
11.流处理-解决 UC 会写入过多的小文件,导致 HDFS 的 namenode 负载过高的问题,提供了 REST API,可以在作业运行时手动触发具有自定义 Checkpoint 类型的 Checkpoint;
12.流处理-RocksDBStateBackend 升级,Calcite 升级;
13.在 Slack 频道加入了性能日常监控汇报来帮助开发者快速发现性能回退问题。
14.支持 Task 级别火焰图;
15.支持通用的令牌机制;
========1.18.0========
1.FlinkSQL-新增 Flink SQL Gateway 的 JDBC Driver 支持
2.FlinkSQL-新增 Flink 连接器的存储过程(Stored Procedure)支持
3.FlinkSQL-新增 DDL 支持
4.FlinkSQL-新增 时间旅行(Time Traveling)SQL 支持
5.流处理-Table API & SQL 支持算子级别状态保留时间(TTL)
6.流处理-支持使用 SQL Hint 配置 水印对齐 和 数据源空闲超时
7.批处理-Hybrid Shuffle 支持远程存储
8.批处理-新增 Flink SQL 的运行时过滤(Runtime Filter)
9.批处理-新增 Flink SQL 算子的融合代码生成(Operator Fusion Codegen)
10.云原生-增强自动弹性化(Elasticity)
11.云原生-新增通过 REST API 控制动态细粒度扩缩容
12.云原生-支持更快地 RocksDB 扩缩容
13.新增 Java 17 支持
14.生产可用的水印对齐(Watermark Alignment)功能
15.可插拔式故障处理
16.SQL 客户端改进
17.Apache Pekko 代替 Akka
18.Calcite 升级
19.重要 API 弃用
- SourceFunction已经弃用
- SinkFunction 尚未正式弃用,即将被 SinkV2 所取代
- Queryable State现已弃用
- DataSet API现已弃用
1、Apache Flink 1.14.0 发布公告
0)关键功能汇总
1.有界流支持 Checkpoint;
2.批执行模式支持 DataStream 和 Table/SQL 混合应用;
3.新增 Hybrid Source 功能;
4.新增 缓冲区去膨胀 功能;
5.新增 细粒度资源管理 功能;
6.新增 DataStream 的 Pulsar 连接器;
1)流批一体的处理体验
1.有界流 Checkpoint 机制
Flink 支持在部分任务结束后创建 Checkpoint,以及在有界流处理结束后触发最终 Checkpoint 以确保在作业结束时将所有输出结果提交到 Sink(与 stop-with-savepoint 类似)。
在配置中添加 execution.checkpointing.checkpoints-after-tasks-finish.enabled: true 启用。
2.DataStream 和 Table/SQL 混合应用的批执行模式
有界的批执行模式的 SQL/Table 应用可将其中间数据表转换成数据流,经过由 DataStream API 定义的算子处理,再转换回数据表。
3.混合 Source
混合 Source 能够依次从多个数据源读取数据,在不同数据源之间无缝切换,产出一条由来自多个数据源的数据合并而成的数据流。
例如,将新数据灌入 Kafka,并最终迁移至 S3,混合 Source 可以像读取一条连续的逻辑数据流一样,先从 S3 读取历史数据,然后转换到 Kafka 读取最新的数据。
4.整合 Source 和 Sink
2)运维改进
1.缓冲区去膨胀
可以最小化 Checkpoint 的延迟和开销,通过自动调整网络内存的用量,在确保高吞吐的同时最小化缓冲区中的数据量。
2.细粒度资源管理
TaskManager 上的 Slot 可以动态改变大小。
3)连接器
1.连接器指标
对连接器的指标进行了标准化。
2.Pulsar 连接器
新增 DataStream API 的 Pulsar 连接器。
4)PyFlink
1.基于链接的性能提升
与 Java API 将任务中的转换函数、算子链接起来以避免序列化开销类似,PyFlink 现在也会将 Python 函数链接起来。
2.环回调试模式
该模式下,用户自定义 Python 函数将由运行客户端的 Python 进程执行,该进程是启动 PyFlink 应用的入口,负责执行用于构建数据流 DAG 的所有 DataStream API 和 Table API 代码。
用户现在本地运行 PyFlink 作业时,可以通过在 IDE 中设置断点的方式方便地调试 Python 函数。
3.其他改进
支持用 Yarn Application 模式执行作业、支持使用 tgz 压缩格式的 Python 归档文件等
5)告别旧版 SQL 引擎和 Mesos 支持
2、Apache Flink 1.15 发布公告
0)关键功能汇总
1.支持增量的 Savepoint;
2.保证作业级别的指标在 Reactive 模式下可以正常工作;
3.为自适应调度器添加了异常历史记录;
4.引入自适应批调度器,支持根据每个节点需要处理的数据量的大小自动决定批处理作业中各节点的并行度;
5.支持跨源节点的 Watermark 对齐;
6.SQL 通过保持拓扑不变的方式使相同的查询在升级 Flink 版本后仍然可以启动和执行;
7.支持基于 Changelog 的状态存储;
8.支持清理重试;
9.增加对 OpenAPI 的支持;
10.默认启用作业结束前的 Checkpoint;
11.优化 Application 模式;
12.批处理支持 Window table-valued 函数;
13.SQL 增加 CAST函数和JSON函数;
14.可以结合任何 Scala 版本 (包括 Scala 3) 使用 Flink的 Java API;
15.新增支持异步输出与端到端一致性的ES Sink;
16.支持 CSV 格式和 小文件压缩功能;
1)运维 Apache Flink
1.澄清 Checkpoint 与 Savepoint 语义
如果用户选择使用原生格式并且同时使用了 RocksDB 状态存储,那么Savepoint 将采用增量的方式来执行
2.基于 Reactive 模式与自适应调度器的弹性伸缩
改进了 Reactive 模式,保证了作业级别的指标在 Reactive 模式下也可以正常工作。
为自适应调度器添加了异常历史记录。
提高了缩减作业规模的速度。
3.自适应批调度器
引入了自适应批处理调度器,可以自动根据每个节点需要处理的数据量的大小自动决定批处理作业中各节点的并行度。
此调度器的主要优点包括:
- 易用性:批处理作业的用户不再需要手动调优并行度。
- 自适应:自动调整并行度可以更好地适应节点消费数据集随时间发生变化的情况。
- 细粒度:每个作业节点的并行度可以单独调整。这允许 SQL 批处理作业的节点自动为每个节点单独选择最适合的并行度。
4.跨源节点的 Watermark 对齐
基于新的 Source 接口来实现的数据源节点可以启用 Watermark 对齐功能。
用户可以定义对齐组,如果其中某个源节点与其它节点相比Watermark领先过多,用户可以暂停从该节点中消费数据。
5.SQL 版本升级
问题:SQL 查询的执行计划及其生成的拓扑是通过优化规则和一个基于成本的模型来得到的,即使最小的更改也可能会产生一个完全不同的拓扑,这种动态性使得在不同 Flink 版本间保证快照兼容性非常具有挑战性。
升级:通过保持拓扑不变的方式使相同的查询在升级 Flink 版本后仍然可以启动和执行。
6.基于 Changelog 的状态存储
基于 Changelog 的状态存储通过在后台不断向非易失性存储上上传状态变化的记录来实现以下目标:
更短的端到端延迟:端到端延迟主要取决于 Checkpoint 机制,特别是使用了两阶段提交的支持端到端一致性的 Sink 节点的情况,这种情况下缩短 Checkpoint 周期意味着可以更快的提交数据。
更可预测的 Checkpoint 间隔:目前 Checkpoint 的完成时间很大程度上取决于需要保存在 Checkpoint 中的数据的大小。通过使这一数据总是可以很小,Checkpoint 的完成时间变得更加可以预测。
恢复工作更少:Checkpoint 越频繁,每次重启后重新处理的数据也会越少。
7.可重复的清理
在以前的 Flink 版本中,Flink 在作业结束时只尝试清理一次与作业相关的残留数据,这可能会导致在发生错误时无法完成清理。
在这个版本中,Flink 将尝试重复运行清理以避免残留数据。
默认情况下,Flink 将不断重试,直到运行成功为止,可以通过配置来改变这种行为,禁用重试策略可以恢复 Flink 之前版本的行为。
8.Open API
Flink 现在提供遵循 Open API 标准的 REST API 规范。
这允许 REST API 与遵循 Open API 标准的工具直接交互。
9.Application 模式的改进
在 Application 模式下 运行 Flink 时,通过相关配置,可以保证作业在结束前能够正常完成 stop-with-savepoint 操作。
在 Application 模式下运行的作业的恢复和清理也得到了改进,本地状态的元数据也可以保存在工作目录中,这使得从本地状态恢复更容易 (例如将工作目录设定在非易失的跨机器的存储中的情况,之前本地状态的元数据保存在内存中,因此在作业恢复时无法找回)。
2)流批一体的更多进展
1.作业结束前的 Checkpoint
默认启用对作业结束前等待一次 Checkpoint 操作的支持。
2.Window table-valued 函数
Window table-valued 函数支持在批模式下使用。
3)Flink SQL
1.CAST / 类型系统增强
失败的 CAST 的默认行为已从返回 null 更改为返回错误。
2.JSON 函数
引入多个 JSON 处理函数。
4)社区支持
1.云环境互操作性
2.Elasticsearch Sink
基于最新的 Sink API 来实现的,可以提供异步输出与端到端一致性的能力,可以作为未来更多 Sink 实现的模板。
3.Scala-free 的 Flink
Scala 用户可以结合任何 Scala 版本 (包括 Scala 3) 使用 Flink的 Java API。
4.PyFlink
引入了一种 “线程” 模式的新执行模式:用户自定义的函数将在 JVM 中作为线程执行,而不是在单独的 Python 进程中执行。
5)其它
添加了对CSV 格式 小文件压缩功能的支持。
3、Apache Flink 1.16 发布公告
0)关键功能汇总
1.批处理-SQL Gateway 支持 REST API 和 HiveServer2 协议;
2.批处理-完善Hive语法兼容,使用 HiveServer2 协议连接 SQL Gateway,SQL Gateway 会自动注册 Hive Catalog,自动切换到 Hive 方言,自动使用批处理模式提交作业;
3.批处理-支持Join Hint;
4.批处理-支持自适应 Hash Join;
5.批处理-支持批处理的预测执行;
6.批处理-支持混合 Shuffle 模式(实验);
7.批处理-优化Blocking shuffle;
8.批处理-支持动态分区裁剪;
9.流处理-支持Generalized Incremental Checkpoint;
10.流处理-改进 RocksDB Rescaling;
11.流处理-改善 State Backend 的监测体验和可用性;
12.流处理-支持透支缓冲区;
13.流处理-更新了从 Aligned Checkpoint(AC)切换到 Unaligned Checkpoint(UC)的时间点;
14.流处理-对于复杂的流作业可以在运行前检测并解决潜在的正确性问题;
15.流处理-维表关联-支持了通用的缓存机制和相关指标,可以加速维表查询;
16.流处理-维表关联-通过作业配置或查询提示支持可配置的异步模式(ALLOW_UNORDERED),在不影响正确性的前提下大大提升查询吞吐;
17.流处理-维表关联-支持可重试的查询,解决维表数据更新延迟问题;
18.流处理-异步 I/O 支持重试;
19.流处理-新语法-USING JAR 支持动态加载 UDF jar包,方便平台开发者轻松实现 UDF 的管理和相关作业的提交;
20.流处理-新语法-CREATE TABLE AS SELECT 方便用户基于已有的表和查询创建新的表;
21.流处理-新语法-ANALYZE TABLE 支持用户手工为原表生成统计信息,以便优化器可以生成更优的执行计划;
22.批处理-新语法-支持通过 DataStream#cache 缓存 Transformation 的执行结果;
23.History Server 及已完成作业的信息增强;
24.在 Table API 或 SQL 应用程序中支持 Protobuf 格式;
25.为异步 Sink 引入可配置的 RateLimitingStrategy;
1)批处理
1.SQL Gateway
SQL Gateway 是对 SQL Client 的扩展和增强,支持多租户和插件式 API 协议(Endpoint),解决了 SQL Client 只能服务单用户并且不能对接外部服务或组件的问题。
当前 SQL Gateway 已支持 REST API 和 HiveServer2 协议,用户可以通过 cURL,Postman,各种编程语言的 HTTP 客户端链接到 SQL Gateway 提交流作业、批作业,甚至 OLAP 作业。
2.Hive 语法兼容
HiveServer2 协议允许使用 Hive JDBC/Beeline 和 SQL Gateway 进行交互,Hive 生态(DBeaver, Apache Superset, Apache DolphinScheduler, and Apache Zeppelin)也因此很容易迁移到 Flink。
当用户使用 HiveServer2 协议连接 SQL Gateway,SQL Gateway 会自动注册 Hive Catalog,自动切换到 Hive 方言,自动使用批处理模式提交作业,用户可以得到和直接使用 HiveServer2 一样的体验。
Flink 完善了对 Hive 语法的兼容,增加了对 Hive 若干生产中常用语法的支持。
3.Join Hint
Hint 是业界用来干预执行计划以改善优化器缺点的通用解决方案。
统计信息缺失或优化器的代价模型不完善都会导致选出错误 Join 策略,从而导致作业运行慢甚至有运行失败的风险。
用户通过指定 Join Hint,让优化器尽可能选择指定的 Join 策略,避免优化器的各种不足,以确保批作业的生产可用性。
4.自适应 Hash Join
批作业中数据倾斜是常见的,使用 HashJoin 可能运行失败,因此引入自适应的 HashJoin:
Join 算子运行时一旦 HashJoin 运行失败,可以自动回退到 SortMergeJoin,并且是 Task 粒度,通过该机制确保 HashJoin 算子始终成功,从而提高了作业的稳定性。
5.批处理的预测执行
问题机器:指存在硬件问题、突发 I/O 繁忙或 CPU 负载高等问题的机器,可能会使运行在该机器上的任务比其他机器上的任务要慢得多,从而影响批处理作业的整体执行时间。
启用预测执行时:
- Flink 将持续检测慢任务,一旦检测到慢任务,该任务所在的机器将被识别为问题机器,并通过黑名单机制被加黑,调度器将为慢任务创建新的执行实例并将它们部署到未被加黑的节点,同时现有执行实例也将继续运行。
- 新的执行实例和老的执行实例将处理相同的输入数据并产出相同的结果数据,一旦任何执行实例率先完成,它将被视为该任务的唯一完成执行实例,并且该任务的其余执行实例都将被取消。
大多数现有 Source 都可以使用预测执行,目前 Sink 尚不支持预测执行。
增强了 Web UI 和 REST API,以显示任务的多个执行实例和被加黑的 TaskManager。
6.混合 Shuffle 模式
结合 Blocking Shuffle 和 Pipeline Shuffle 的优点:
- 与 Blocking Shuffle 一样,它不要求上下游任务同时运行,这允许使用很少的资源执行作业。
- 与 Pipeline Shuffle 一样,它不要求上游任务完成后才执行下游任务,这在给定足够资源情况下减少了作业的整体执行时间。
- 用户可以选择不同的落盘策略,以满足减少数据落盘或是降低任务重启代价的不同需求。
注意:该功能为实验性的,并且默认关闭。
7.Blocking shuffle 优化
改进了 Blocking Shuffle 的可用性和性能,包括自适应网络缓冲区分配、顺序 IO 优化和结果分区重用,允许多个消费者节点重用同一个物理结果分区,以减少磁盘 IO 和存储空间。
还引入了两种压缩率更高的压缩算法(LZO 和 ZSTD)与默认的 LZ4 压缩算法相比,进一步减少存储空间,但要付出一些 CPU 成本。
8.动态分区裁剪
静态分区裁剪:即在优化阶段,优化器将 Filter 中的 Partition 相关的过滤条件下推到 Source Connector 中从而减少不必要的分区读取。
动态分区裁剪:即运行时根据其他相关表的数据确定分区裁剪信息从而减少对分区表中无效分区的读取。
2)流处理
1.Generalized Incremental Checkpoint
基于 Changelog State Backend 在自身易用性上和与其他 State Backend 兼容性上做了诸多改进:
- 支持状态迁移
- 支持 Failover 时从本地恢复
- 引入文件缓存优化恢复过程的性能
- 支持从 Checkpoint 进行切换
- 优化监控体验:
- 扩充了 Changelog 的监控指标
- 在 Flink WebUI 上显示 Changelog 相关的配置
2.RocksDB Rescaling 改进
使用了 RocksDB 的区间删除来优化增量 RocksDB State Backend 的 Rescaling 性能。
区间删除被用来避免在 Rescaling 过程中大量的扫描和单点删除操作,对有大量的状态需要删除的扩并发来说,单个并发上的恢复速度可以提高 2~10 倍。
3.改善 State Backend 的监测体验和可用性
之前,RocksDB 的日志位于它自己的 DB 目录中,这个版本让 RocksDB 的日志默认留在 Flink 的日志目录中。
新增了 RocksDB 相关的统计指标,以帮助调试 DB 级别的性能,例如,在 DB 内的总块缓存命中/失败计数。
4.支持透支缓冲区
透支缓冲区(Overdraft Buffers)旨在缓解反压情况下 Subtask 被阻塞的概率,通过设置 taskmanager.network.memory.max-overdraft-buffers-per-gate 开启。
从 1.16 开始,一个 Flink 的 Subtask 可以申请 5 个(默认)额外的透支缓冲区。
透支缓冲区会轻微地增加作业的内存使用量,但可以极大地减少 Checkpoint 的间隔,特别是在开启 Unaligned Checkpoint 情况下。只有当前 Subtask 被下游 Subtasks 反压且当前 Subtask 需要请求超过 1 个网络缓冲区(Network Buffer)才能完成当前的操作时,透支缓冲区才会被使用。
5.对齐 Checkpoint 超时
更新了从 Aligned Checkpoint(AC)切换到 Unaligned Checkpoint(UC)的时间点。
- 在开启 UC 的情况下,如果配置了 execution.checkpointing.aligned-checkpoint-timeout,在启动时每个 Checkpoint 仍然是 AC,但当全局 Checkpoint 持续时间超过 aligned-checkpoint-timeout 时,如果 AC 还没完成,那么 Checkpoint 将会转换为 UC。
- 以前,对一个 Substask 来说,AC 到 UC 的切换需要等所有上游的 Barriers 到达后才能开始,在反压严重的情况下,在 checkpointing-timeout 过期之前,下游的 Substask 可能无法完全地收到所有 Barriers,从而导致 Checkpoint 失败。
- 在这个版本中,如果上游 Subtask 中的 Barrier 无法在 execution.checkpointing.aligned-checkpoint-timeout 内发送到下游,Flink 会让上游的 Subtask 先切换成 UC,以把 Barrier 发送到下游,从而减少反压情况下 Checkpoint 超时的概率。
6.流计算的非确定性
对于复杂的流作业,现在可以在运行前检测并解决潜在的正确性问题。
如果问题不能完全解决,一个详细的消息可以提示用户如何调整 SQL,以避免引入非确定性问题。
7.维表增强
维表关联在流处理中被广泛使用,在 1.16 中加入了多项优化和增强:
- 支持了通用的缓存机制和相关指标,可以加速维表查询。
- 通过作业配置或查询提示支持可配置的异步模式(ALLOW_UNORDERED),在不影响正确性的前提下大大提升查询吞吐。
- 可重试的查询机制让用户解决维表数据更新延迟问题有了更多的手段
8.异步 I/O 支持重试
为异步 I/O引入了内置的重试机制,对用户现有代码是透明的,可以灵活地满足用户的重试和异常处理需求。
3)PyFlink
在 Python DataStream API 中以及在 Table API 和 SQL 的 Python 表值函数中,也支持了新的执行模式:“线程”模式。
在该模式下,用户自定义的 Python 函数将通过 JNI 在 JVM 中执行,而不是在独立的 Python 进程中执行。
4)其他
1.新语法
- USING JAR 支持动态加载 UDF jar包,方便平台开发者轻松实现 UDF 的管理和相关作业的提交。
- CREATE TABLE AS SELECT 方便用户基于已有的表和查询创建新的表。
- ANALYZE TABLE 支持用户手工为原表生成统计信息,以便优化器可以生成更优的执行计划。
2.DataStream 中的缓存
支持通过 DataStream#cache 缓存 Transformation 的执行结果。缓存的中间结果在首次计算中间结果时才生成,以便以后的作业可以重用该结果。
如果缓存丢失,原始的 Transformation 将会被重新计算以得到结果。目前该功能只在批处理模式下支持。这个功能对于 Python 中的 ML 和交互式编程非常有用。
3.History Server 及已完成作业的信息增强
- JobManager / HistoryServer WebUI 提供了详细的执行时间指标,包括任务在每个执行状态下的耗时,以及在运行过程中繁忙/空闲/反压总时间。
- JobManager / HistoryServer WebUI 提供了按 Task 或者 TaskManager 维度分组的主要子任务指标的聚合。
- JobManager / HistoryServer WebUI 提供了更多的环境信息,包括环境变量,JVM 选项和 Classpath。
- HistoryServer 现在支持从外部日志归档服务中浏览日志
4.Protobuf 格式
在 Table API 或 SQL 应用程序中支持 Protobuf 格式。
5.为异步 Sink 引入可配置的 RateLimitingStrategy
1.15 中允许实现自定义异步 Sink。
1.16 中支持可配置的 RateLimitingStrategy,可以自定义其异步 Sink 在请求失败时的行为方式,具体行为取决于特定的 Sink。
如果没有指定 RateLimitingStrategy,它将默认使用 AIMDScalingStrategy。
4、Apache Flink 1.17 发布公告
0)关键功能汇总
1.批处理-支持 Sink 算子预测执行,优化预测执行慢任务的检测;
2.批处理-自适应批处理调度器成为了批作业的默认调度器,改进自适应批调度器的配置,增强了自适应批处理调度器的能力;
3.批处理-混合 Shuffle 模式支持自适应批调度器和预测执行,混合 Shuffle 模式支持重用中间数据;
4.批处理-支持 SQL Client 的 gateway 模式,可以使用 SQL 语句来管理作业的生命周期;
5.批处理-为 Batch 模式引入了新的 Delete 和 Update API,扩展了 ALTER TABLE 语法,包括 ADD/MODIFY/DROP 列、主键和 Watermark 的能力;
6.批处理-优化Hive connector,在流批模式下均能自动地进行文件合并;
7.引入了动态规划 join-reorder 算法,引入了动态 local hash aggregation 策略,移除了不必要的虚拟函数调用;
8.流处理-引入实验性功能 PLAN_ADVICE;
9.流处理-增强 watermark 对齐;
10.流处理-扩展 Streaming FileSink;
11.流处理-解决 UC 会写入过多的小文件,导致 HDFS 的 namenode 负载过高的问题,提供了 REST API,可以在作业运行时手动触发具有自定义 Checkpoint 类型的 Checkpoint;
12.流处理-RocksDBStateBackend 升级,Calcite 升级;
13.在 Slack 频道加入了性能日常监控汇报来帮助开发者快速发现性能回退问题。
14.支持 Task 级别火焰图;
15.支持通用的令牌机制;
1)批处理
1.预测执行
Sink 算子支持预测执行,包括 DiscardingSink、PrintSinkFunction、PrintSink、FileSink、FileSystemOutputFormat 和 HiveTableSink。
优化预测执行慢任务的检测。
2.自适应批处理调度器
自适应批处理调度器成为了批作业的默认调度器。
该调度器可以根据每个 job vertex 处理的数据量,自动为其设置合适的并行度,是唯一支持预测执行的调度器。
改进自适应批调度器的配置,不再需要显式将全局默认并行度设置为-1 来开启自动推导并行度功能,如果设置了全局默认并行度,其会被用做自动推导并行度的上界。
增强了自适应批处理调度器的能力,可以根据细粒度的数据分布信息,将数据更均匀的分配给下游任务,自动推导的并行度现在也不再被限制为 2 的幂。
3.优化混合 Shuffle 模式
混合 Shuffle 模式支持自适应批调度器和预测执行。
混合 Shuffle 模式支持重用中间数据。
提高了稳定性,避免了在大规模生产环境中出现的稳定性问题
4.SQL Client/Gateway
支持了 SQL Client 的 gateway 模式,允许用户将 SQL 查询提交给 SQL Gateway 来使用 Gateway 的各种功能。
可以使用 SQL 语句来管理作业的生命周期,包括显示作业信息和停止正在运行的作业。
5.SQL API
为 Batch 模式引入了新的 Delete 和 Update API,并将其暴露给连接器,这样外部存储系统便可以基于 API 实现行级更新和删除。
扩展了 ALTER TABLE 语法,包括 ADD/MODIFY/DROP 列、主键和 Watermark 的能力。
6.Hive 兼容
优化Hive connector,在之前的版本中,对于 Hive 的写入,只支持在流模式下自动地进行文件合并,从 Flink 1.17 开始,在批模式下也能自动地进行文件合并,可以大大减少小文件的数量。
通过加载HiveModule来使用 Hive 内置函数时,引入了原生的 Hive 聚合函数如 SUM/COUNT/AVG/MIN/MAX 进 HiveModule 中,可以在基于哈希的聚合算子上执行,带来显著的性能提升。
7.TPC-DS
引入了动态规划 join-reorder 算法(注意:join-reorder 默认未开启)
引入了动态 local hash aggregation 策略,根据数据的分布,动态确定是否需要在本地进行聚合操作以提高性能。
移除了不必要的虚拟函数调用,加快执行速度。
2)流处理
1.Streaming SQL 语义完善
引入实验性功能 PLAN_ADVICE,该功能可以检测用户 SQL 潜在的正确性风险,并提供优化建议。
如果用户通过 EXPLAIN PLAN_ADVICE 命令发现查询存在 NDU(非确定性更新) 问题,优化器会在物理计划输出的末尾追加建议,建议会标记到对应操作节点上,并提示用户更新查询和配置,通过提供具体的建议,优化器可以帮助用户提高查询结果的准确性。
== Optimized Physical Plan With Advice ==
...
advice[1]: [WARNING] The column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions.
PLAN_ADVICE 还可以帮助提高查询的性能和效率,如果检测到聚合操作可以优化为更高效的 local-global 聚合操作,优化器会提供相应的优化建议,通过应用这些具体的建议,优化器可以帮用户提高其查询的性能和效率。
== Optimized Physical Plan With Advice ==
...
advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.optimizer.agg-phase-strategy' to 'AUTO').
2.Watermark 对齐增强
Watermark 对齐增强-解决 event time 作业中的源数据倾斜问题;
该方案存在限制:Source 并行度必须和分区数匹配,因为具有多个分区的 Source 算子中,如果一个分区比另一个分区更快地发出数据,此时需要缓存大量数据。
增强 watermark 对齐:考虑 watermark 边界的情况下对 Source 算子内的多个分区进行数据发射对齐,确保了 Source 中的 Watermark 前进更加协调,避免了下游算子缓存过多的数据,从而提高了流作业的执行效率。
3.Streaming FileSink 扩展
添加 ABFS 支持后,StreamingFileSink 可以支持五种不同的文件系统:HDFS、S3、OSS、ABFS 和 Local。
4.Checkpoint 改进
解决了之前版本的 UC 会写入过多的小文件,可能导致 HDFS 的 namenode 负载过高的问题。
提供了 REST API,可以在作业运行时手动触发具有自定义 Checkpoint 类型的 Checkpoint,例如,使用增量 Checkpoint 运行的作业,可以定期或手动触发全量 Checkpoint 来去除多个增量 Checkpoint 之间的关联关系,从而避免引用很久以前的文件。
5.RocksDBStateBackend 升级
将 FRocksDB 的版本升级到 6.20.3-ververica-2.0:
- 支持在 Apple 芯片上构建 FRocksDB Java
- 通过避免昂贵的 ToString() 操作提高 Compaction Filter 的性能
- 升级 FRocksDB 的 ZLIB 版本,避免 Memory Corruption
- 为 RocksJava 添加 periodic_compaction_seconds 选项
提供参数扩大 TaskManager 的 slot 之间共享内存的范围,可以在 TaskManager 中 slot 内存使用不均匀时提高内存效率,基于此在调整参数后可以以资源隔离为代价来降低整体内存消耗。
6.Calcite 升级
将 Calcite 版本升级到 1.29.0 以提高 Flink SQL 系统的性能和效率。
3)其他
1.PyFlink
支持 Python 3.10、支持在 Mac M1 和 M2 电脑上运行 PyFlink,改进了 Java 和 Python 进程之间的跨进程通信的稳定性、支持以字符串的方式声明 Python UDF 的结果类型、支持在 Python UDF 中访问作业参数。
2.性能监控 Benchmark
在 Slack 频道加入了性能日常监控汇报来帮助开发者快速发现性能回退问题。
3.Task 级别火焰图
Flame Graph 功能提供了针对 task 级别的可视化支持。
4.通用的令牌机制
在 Flink 1.17 之前 Flink 只支持 Kerberos 认证和基于 Hadoop 的令牌;
在 Flink 1.17 之后 Flink 的委托令牌框架更加通用,使其认证协议不再局限于 Hadoop。
5、Apache Flink 1.18 发布公告
0、重要功能总结
1)Flink SQL 提升
1.Flink SQL Gateway 的 JDBC Driver
提供了 Flink SQL Gateway 的 JDBC Driver,可以使用支持 JDBC 的任何 SQL 客户端通过 Flink SQL 与表进行交互。
2.Flink 连接器的存储过程(Stored Procedure)支持
可以通过 Catalog 接口自定义存储过程到连接器中。
连接器内的存储过程提高了 Flink 的 SQL 和 Table API 的可扩展性。
可以使用 Call 语句来直接调用 catalog 内置的存储过程。
CREATE TABLE `paimon`.`default`.`T` (
id BIGINT PRIMARY KEY NOT ENFORCED,
dt STRING, -- format 'yyyy-MM-dd'
v STRING
);
-- use catalog before call
proceduresUSE CATALOG `paimon`;
-- compact the whole table using call statement
CALL sys.compact('default.T');
3.DDL 支持扩展
支持以下功能(需要底层连接器支持):
- REPLACE TABLE AS SELECT
- CREATE OR REPLACE TABLE AS SELECT
- ALTER TABLE ADD PARTITION
- ALTER TABLE DROP PARTITION
- SHOW PARTITIONS
- 批处理模式 TRUNCATE TABLE
4.时间旅行(Time Traveling)
支持时间旅行(time travel) SQL 语法,用于查询历史版本的数据,可以指定一个时间点,来检索表在该时间点的数据和架构。
-- 查询表 `paimon_tb` 在 2022年11月11日的数据
SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP '2022-11-11 00:00:00';
2)流处理提升
1.Table API & SQL 支持算子级别状态保留时间(TTL)
Table API 和 SQL 可以为有状态的算子单独设置状态保留时间 (TTL)。
在像流 regular join 这样的场景中,可以为左侧和右侧流设置不同的 TTL。
在以前的版本中,状态保留时间只能在 pipeline 级别使用配置项 table.exec.state.ttl 进行控制,引入算子级别的状态保留后,用户现在可以根据其具体需求优化资源使用。
2.SQL 的水印对齐(Watermark Alignment)和空闲检测(Idleness Detection)
可以使用 SQL Hint 配置 水印对齐 和 数据源空闲超时,之前这些功能仅在 DataStream API 中可用。
3)批处理提升
1.Hybrid Shuffle 支持远程存储
Hybrid Shuffle 支持将 Shuffle 数据存储在远程存储中,可以使用配置项 taskmanager.network.hybrid-shuffle.remote.path 配置远程存储路径。
Hybrid Shuffle 通过将内存用量与并行度解耦,减少了网络内存的使用,提高了稳定性和易用性。
2.性能提升与 TPC-DS 基准测试
a) Flink SQL 的运行时过滤(Runtime Filter)
运行时过滤(Runtime Filter)是用于优化 join 性能的常见方法,旨在动态生成某些 join 查询的运行时过滤条件,以减少扫描或 Shuffle 的数据量,避免不必要的 I/O 和网络传输,从而加速查询。
b) Flink SQL 算子的融合代码生成(Operator Fusion Codegen)
算子融合代码生成(Operator Fusion Codegen)通过将算子 DAG 融合成一个经过优化的单算子,消除了虚函数调用,利用 CPU 寄存器进行中间数据操作,并减少指令缓存不命中的情况,从而提高了查询的执行性能。
**注意:**上述功能默认处于关闭状态,使用 table.optimizer.runtime-filter.enabled 和 table.exec.operator-fusion-codegen.enabled 两个配置项来启用。
4)迈向云原生弹性化
1.自动弹性化(Elasticity)
Flink 1.15 版本,引入自适应调度器,构成了完全弹性 Apache Flink 部署的核心,允许作业在运行时更改其资源要求和并行度,还根据集群中可用的资源进行自适应调整,只有当集群能够满足作业的最低所需资源时才会重新调整。
Flink 1.18 版本之前,自适应调度器主要用于响应模式(Reactive Mode),单个作业始终会使用集群中的所有可用资源。
在 Flink 1.18 版本中,自适应调度器变得更加强大和更广泛适用,并正在成为 Apache Flink 流处理任务的默认调度器。
2.通过 REST API 控制动态细粒度扩缩容
在作业运行时,通过 Flink Web UI 和 REST API 更改作业的任何 task 的并行度。
3.更快地 RocksDB 扩缩容
提升了并行下载的能力,从只并行下载状态句柄(state handle),扩展到并行下载文件,关闭了用于扩缩容的临时 RocksDB 实例在批量插入时的写前日志(write-ahead-logging)。
4.Java 17 支持
5)其他改进
1.生产可用的水印对齐(Watermark Alignment)功能
2.可插拔式故障处理
3.SQL 客户端的改进
4.Apache Pekko 代替 Akka
5.Calcite 升级
6)重要 API 弃用
- SourceFunction已经弃用
- SinkFunction 尚未正式弃用,即将被 SinkV2 所取代
- Queryable State现已弃用
- DataSet API现已弃用