Flink 1.16:Hive SQL 如何平迁到 Flink SQL

news2024/11/25 22:46:25

摘要:本文整理自 Apache Flink PMC&Committer 伍翀(云邪)在 9 月 24 日 Apache Flink Meetup 的演讲。主要内容包括:

  1. Hive SQL 迁移的动机

  2. Hive SQL 迁移的挑战

  3. Hive SQL 迁移的实践

  4. Hive SQL 迁移的演示

  5. 未来规划


Tips:点击「阅读原文」获取 PPT~

01

Hive SQL 迁移的动机

9b49f1896d83434df85888cabd80d696.jpeg

Flink 已经是流计算的事实标准,当前国内外做实时计算或流计算一般都会选择 Flink 和 Flink SQL。另外,Flink 也是是家喻户晓的流批一体大数据计算引擎。

然而,目前 Flink 也面临着挑战。比如虽然现在大规模应用都以流计算为主,但 Flink 批计算的应用并不广泛,想要进一步推动真正意义上的流批一体落地,需要推动业界更多地落地 Flink 批计算,需要更积极地拥抱现有的离线生态。当前业界离线生态主要以 Hive 为主,因此我们在过去版本中做了很多与 Hive 相关的集成,包括 Hive Catalog、Hive 语法兼容、Hive UDF 兼容、流式写入 Hive 等。在 Flink 1.16 版本中,我们进一步提升了 HiveSQL 的兼容度,还支持了 HiveServer2 的协议兼容。

所以,为什么 Flink 要去支持 Hive SQL 的迁移?一方面,我们希望吸引更多的 Hive 离线数仓用户,通过用户来不断打磨批计算引擎,对齐主流批计算引擎。另一方面,通过兼容 Hive SQL,来降低现有离线用户使用 Flink 开发离线业务的门槛。除此之外,另外,生态是开源产品的最大门槛。Flink 已经拥有非常丰富的实时生态工具,但离线生态依然较为欠缺。通过兼容 Hive 生态可以快速融入 Hive 离线生态工具和平台,降低用户接入的成本。最后,这也是实现流批一体的重要一环,我们希望推动业界尝试统一的流计算和批计算引擎,再统一流计算和批计算 SQL。

e0ec0829a9e01ba49a742d393229e26e.jpeg

从用户角度来看,Hive SQL 为什么要迁移到 Flink SQL 上?

对于平台方而言,统一流批计算引擎,只需维护一套 Flink 引擎,可以降低维护成本,提升团队研发效率。另外,可以利用 Flink + Gateway+ HiveSQL 兼容,快速建设一套 OLAP 系统。Flink 的另一优势是拥有丰富的 connector 生态,可以借助 Flink 丰富的数据源实现强大的联邦查询。比如不仅可以在 Hive 数仓里做 ad-hoc 查询,也可以将 Hive 表数据与 MySQL、HBase、Iceberg、Hudi 等数据源做联邦查询等。

对于离线数仓用户而言,可以用 Hive SQL 写流计算作业,极大降低实时化改造成本。使用的依然是以前的 HiveSQL 语法,但是可以运行在 streaming 模式下。在此基础之上也可以进一步探索流批一体 SQL 层以及流批一体数仓层的建设。

02

Hive SQL 迁移的挑战

c81fd3cc04218c6002a5302add0024db.jpeg

但是 Flink 支持 HiveSQL 的迁移面临着很多挑战,主要有以下三个方面:

  • 兼容:包括离线数仓作业和Hive平台工具的兼容。主要对应用户层的兼容和平台方的兼容。

  • 稳定性:迁移后的作业首先要保证生产的稳定性。我们在1.16中也做了很多这方面的工作,包括FLIP-168 预测执行和Adaptive Hash Join。后续我们会发表更多的文章来介绍这方面的工作。

  • 性能:最后性能也是很重要的,在1.16中我们也做了很多这方面的工作,包括Dynamic Partition Pruning(DPP)、元数据访问加速等,后续也会发表更多文章来介绍这方面的工作。

接下来我们重点讲解下 Hive 兼容相关的工作。

8b2b13764d655d44b4f8b7455e406c3a.jpeg

Hive 语法的兼容并没有完全造出一套新的 SQL 引擎,而是复用了 Flink SQL 的很多核心流程和代码。我们抽象出了可插拔的 parser 层来支持和扩展不同的语法。Flink SQL 会经过 Flink Parser 转换成 Flink RelNode,再经过 Logical Plan 优化为 Physical Plan,最后转换为 Job Graph 提交执行。为了支持 Hive 语法兼容,我们引入了 Hive Parser 组件,来将 Hive SQL 转化成 Flink RelNode。这个过程中,复用了大部分 Hive 现有的 SQL 解析逻辑,保证语法层的兼容(均基于 Calcite)。之后 RelNode 复用同样的流程和代码转化成 LogicalPlan、Physical Plan、JobGraph,最后提交执行。

58136ab5955fd0306c1e520654c3a05c.jpeg

从架构上看,Hive 语法兼容并不复杂,但这是一个“魔鬼在细节”的工作。上图为部分 Flink1.16 版本里 Flink Hive 兼容相关的 issue,涉及 query 兼容、类型系统、语义、行为、DDL、DML、辅助查询命令等非常多语法功能。累计完成的 issue 数达近百个。

Flink1.16 版本将 Hive 兼容度从 85% 提升至 94.1%。兼容度测试主要依靠 Hive qtest 测试集,其中包含 12,000 多个测试 case,覆盖了 Hive 目前所有主流语法功能。没有兼容的一部分包括 ACID 功能(业界使用较少),如果除去 ACID 功能,兼容度已达 97%以上。

70fdf21431d635be9e539f332f647409.jpeg

SQLGateway 是 Flink SQL 的 server 层组件,是单独的进程,对标 HiveServer2 组件。从 Flink 整体架构上看,SQLGateway 处于中间位置。

向下,封装了用户 API 的 Flink SQL 和 Hive SQL。不管是 Flink SQL 还是 Hive SQL,都使用 Flink 流批一体的 Runtime 来执行,可以运行在批模式,也可以运行在流模式。Flink 的资源也可以部署运行在 YARN、K8S、Flink standalone 集群上。

向上,SQLGateway 提供了可插拔协议层 Endpoint,目前提供了 HiveServer2 和 REST 两种协议实现。通过 HiveServer2 Endpoint,用户可以将 Hive 生态的很多工具和组件(Zeppelin、Superset、Beeline、DBeaver 等)连接到 SQL Gateway,提供流批统一的 SQL 服务并兼容 Hive SQL。通过 REST 协议可以使用 Postman、curl 命令或自己通过 Python、Java 编程来访问,提供完善和灵活的流计算服务。将来,Endpoint 能力也会继续扩展,比如可以提供更高性能的 gRPC 协议或兼容 PG 协议。

03

Hive SQL 迁移的实践

4cb41bfe5baf5c59131555a833186798.jpeg

目前快手正在与 Flink 社区紧密合作,推进流批一体的落地。目前快手迁移 Hive SQL 作业到 Flink SQL 作业已经取得了初步的进展,已有上千个作业完成了迁移。快手的迁移主要策略为双跑平台,已有业务继续运行,双跑平台有智能路由组件,可以通过指定规则或 pattern 识别出作业,投递到 MapReduce、Spark 或 Flink 上运行。初期的运行较为谨慎,会通过白名单机制指定某些作业先运行在 Flink,观察其稳定性与性能,对比其结果一致性,后续逐步通过规则来放量。更多的实践经验与细节可以关注 Flink Forward Asia 2022 上分享的《Hive SQL 迁移到 Flink SQL 在快手的实践》。

04

Hive SQL 迁移的演示

Demo1:Hive SQL 如何迁移到 Flink SQL

f24000d2ab94cbc600b8bfb8981fa308.jpeg

接下来演示一下 Hive SQL 如何迁移到 Flink SQL。我们已经搭建好一个 YARN 集群,以及 Hive 相关组件,包括 HiveServer2 的服务。我们使用 Zeppelin 做数据可视化和 SQL 查询。我们将演示 Hive SQL 迁移到 Flink SQL 只需改一行地址,Zeppelin 体验并无二致,SQL 也无需修改。完整的 Demo 视频请观看完整的演讲视频:https://www.bilibili.com/video/BV1BV4y1T7d4

99680ab44e0caaa7027dc08066885798.jpeg

首先在 Zeppelin 中配置 Hive Interpreter,填入 HiveServer2 的 JDBC 地址和端口、用户名密码、Driver 等信息。

c332ec5c6bc33e46dc36166d248c512c.jpeg

6318681be0eeb45e084205615b1ada96.jpeg

使用当前的 Hive Interpreter,我们可以通过 Hive DDL 命令创建一张打宽的 store_sale_detail 表。使用 Hive SQL 语法关联 store_sales、date_dim、store 三张表,打成一张宽表写到 store_sale_detail。执行该 INSERT INTO 语句后,便能在 Hadoop 平台上看到运行起来的 MapReduce 任务。

07ff8c299ca1f0ae50cde82478834133.jpeg

store_sale_detail 明细宽表生产完后,我们就可以进行查询分析,比如查看星期天每个店铺的销量。运行完后可通过饼图等多种形式展示结果。

baef1cb81bef24cbea261438c255713f.jpeg

fa76a0c6896d38cc8444acac7190f410.jpeg

上面简单演示了使用 Hive 进行数据生产和数据分析,其中计算引擎使用的是 Hive 原生的 Hadoop MapReduce 作业,作业运行在 YARN 集群上。接下来我们开始迁移到 Flink SQL,作业仍然运行在 YARN 集群上。

041246dce6c13617028e6723220f9466.jpeg

首先搭建 Flink SQL 集群以及启动 SQLGateway。我们已经下载并解压了 Flink 1.16 版本。其中 lib 文件夹下也已经提前准备 Hive connector、JDBC connector 和 MySQL Driver。另外,还需要将 flink-table-planner-loader 与 opt/ 目录下的 flink-table-planner JAR 包做个替换,然后启动 YARN session 集群。Session 集群启动后,可在 yarn 上看到 Flink 的 session application。

6dc3813aebea8b4fa22d51425461a976.png

在启动 SQLGateway 之前,需要先修改配置,主要配置 HiveServer2 Endpoint 相关的信息。

4b5a6bc8e15207550922a46ec298967b.png

27c6e5a557d183d07be63460b8cde483.png

此处 SQLGateway 的 endpoint type 是 HiveServer2,以及需要额外设置三个配置:HiveServer2 的 hive-conf-dir、thrift.host 以及 thrift.port。这里注意我们启动的端口号是 20002。然后通过 sql-gateway.sh start 命令启动 SQL Gateway 服务。

28c97131d59640c69751f4f9f015c629.jpeg

启动后便可以进行迁移。因为 HiveServer2 运行在同一台机器上,因此只需修改端口号即可。将此处 10000 端口号改为刚刚启动的 20002 端口号,即 Flink SQLGateway 的端口,无需进行任何其他改动。重启 interpreter,迁移完成!

接着我们可以在 Zeppelin 中重新执行一遍刚刚的 Hive SQL 语句,可以发现结果都是一致的。

f71abffe1d5d068fb50c52046cbc1f0d.png

db69d61c17dbf4a57ddcd62a5714d76a.png

如上图所示,是查询每个商店在周日的销售总额的结果,其饼图结果与使用 Hive 引擎查询的结果完全一致,不同的是这次的查询是运行在 Flink 引擎之上。

Hive SQL 迁移到 Flink SQL 后不仅能获得更好的性能,还能获得 Flink SQL 提供的额外能力,包括更丰富的联邦查询和流批一体能力。

a1db6966e61615a8a4c2df4b61bb2da3.png

我们可以用 Flink DDL 创建新的 catalog,比如 MySQL 表里还有新的额外的维度信息,不在 Hive 中,希望关联它做新数据的探查。只需使用 Flink 的 CREATE CATALOG 语句创建 MySQL catalog,即可实现联邦查询。同时,Flink 会将能下推的 projection、filter 等都下推到 MySQL 进行裁剪。

65dbe007acf096369c9e6e2cfd7ebccd.png

除此之外,也可以使用 Hive SQL 体验流计算的能力。使用 Flink 语法创建一张 datagen 表,该表会源源不断产生随机数据。再切回 Hive 语法创建一张 Hive 结果表 sink。将运行模式改为 streaming,执行 insert into 语句,便提交了一个流作业,该作业会源源不断地将 datagen 中生成的数据流式地写入 Hive。

8e1c5c297d0713c82bf146b7109f6ba6.png

b8cbd4a5c29bd9060ec574dfc828e780.png

8043a556c50edcb2a12e6275fdc65e48.png

为了验证 Hive 结果表一直在被流作业写入数据,我们也可以用 Hive 语法查询写入的表。如上图所示,通过不断执行 count(*) 语句,可以看到该表一直在写入数据,因此查询结果会不断变化。

05

未来规划

f6a63e700901cab9fc60e4bb6f81fdbe.jpeg

未来,Flink 将在以下三个方面持续演进:

  • 第一,持续在 batch 上做更多尝试和投入,提升 batch 的稳定性和性能,目标是短期内能够追齐主流的批计算引擎。

  • 第二,完善数据湖的分析,比如更高效的批式数据湖读写、查询优化下推、列存上的读写优化,Iceberg、Hudi 以及 Flink Table Store 的支持等。另外,也会提供丰富的湖上数据查询和管理功能,比如查询快照版本的能力、查询元数据、更丰富的 DML 语法(UPDATE、DELETE、MERGE INTO)以及管理湖上数据 CALL 命令等。

  • 第三,Flink Batch 生态建设,包括进一步完善 Remote Shuffle Service、血缘管理等。

Q&A

Q:Hive 写通过 Flink 执行,如果 Hive 数据量特别大,是否会出现内存不足、OOM 等报错?

A:目前 Flink 执行 batch 模式下,基本所有算子里都有内存管理机制。数据不是以 Java 对象的方式存在 Flink,而是在 Java 内存里面开辟了单独的内存空间供其使用。如果内存满,则会做落盘、spill,速度可能会略微下降,但一般不会发生内存 OOM。

Q:Flink 是否支持 Hive 自定义 UDF 函数?迁移成本如何?

A:支持,可直接迁移。

Q:现有的离线数仓从 Hive 迁到 Flink 是否存在风险?平滑迁移的注意点有哪些?

A:平滑迁移目前大多使用双跑平台,通过机制挑选出部分作业先进行迁移,迁移的作业在两个平台同时运行,因此需要验证行为、结果是否一致,然后逐渐将老平台的作业下线,变为单跑。整个过程需要循序渐进,通常需要半年至一年的时间。

Q:Demo 中有一个 SQL 查询使用了 Hive on MR 引擎,迁移之后是走 Flink SQLGateway 还是 Hive on MR 模式?

A:迁移后,因为配置的端口是 Flink SQL Gateway 的端口,所以 SQL 请求走的是 Flink SQL Gateway,Gateway 会将 Hive SQL 编译成 Flink 作业提交到 YARN 集群上运行。

Q:Flink 运行批量任务时,TaskManager 个数是我们指定还是自动生成?

A:对于 standalone 模式,包括运行在 k8s 上的 standalone 模式,TM 数量由用户指定。其他模式下,TM 数量都由 Flink 自己决定和拉起,包括 yarn/k8s application 模式,yarn session 模式, yarn per-job 模式,native k8s session 模式。拉起的 TM 数量和作业请求的 slot 数相关,taskmanager.numberOfTaskSlots 参数决定了 slot 与 TM 个数的映射关系,slot 数量则和被调度的作业节点的并发度相关。

Q:Flink 运行在 K8S 上时,如果启用了动态资源分配,shuffle 数据会一直保存在 POD 磁盘上吗?

A:可以选择,可以 on TM 也可以 on RemoteShuffleService。

Q:离线作业迁移后,是否还支持 with as 语法以及 partition by 语法?

A:WITH AS 语法依然支持,CREATE TABLE 中的 PARTITIONED BY 语法也仍然支持。

Flink 1.16.0 已如期发布,欢迎大家体验和使用 Hive SQL 迁移的能力,也欢迎加入【Flink Batch 交流群】交流和反馈相关的问题和想法。

9653529aa7e4e4c036ed89345cfe74ea.jpeg

▼ 关注「Apache Flink」,获取更多技术干货 ▼

ee7716bf304614473e38a8c30f2800fb.png

 17359b83c30bbe2059180d918dfcda83.gif  点击「阅读原文」,获取演讲 PPT

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/90459.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

班级网页制作 HTML个人网页设计 我的班级网站设计与实现 大学生简单班级静态HTML网页设计作品 DIV布局班级网页模板代码 DW学生校园网站制作成品下载

🎉精彩专栏推荐👇🏻👇🏻👇🏻 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 💂 作者主页: 【主页——🚀获取更多优质源码】 🎓 web前端期末大作业…

【数据库数据恢复】SQL server数据库被加密怎么恢复数据?

SQL server数据库故障: SQL server数据库和备份文件被加密,无法使用。数据库MDF、LDF、log日志文件名字被修改。 SQL server数据库数据恢复过程: 1、首先对故障数据库所涉及到的硬盘进行镜像备份,避免对原始数据造成二次破坏&…

python大作业高分项目--射击闯关游戏

项目功能: 地图编辑器:可以实现玩家自己定义每一关卡的样式和难易程度 运行界面:实现了玩家的移动,跳跃,发射子弹,投掷手雷,以及敌人的AL(移动,发射子弹,扔…

Word处理控件Aspose.Words功能演示:在 Python 中将 HTML 转换为 PNG、JPEG、BMP、GIF 或 TIFF 图像

Aspose API支持流行文件格式处理,并允许将各类文档导出或转换为固定布局文件格式和最常用的图像/多媒体格式。 HTML (超文本标记语言)是所有浏览器都支持的主要网页文件格式。它经常用于将数据和信息显示为网页。在某些情况下,我们…

迎接工业互联网的龙卷风暴,软通动力绘制了一张转型地图

《绿野仙踪》一书的开始,主角多萝西被一股龙卷风卷起来,从此离开了平凡无奇的堪萨斯州,来到神奇的奥兹国。这种让人一步登天、进入仙境的“龙卷风暴”,也在科技行业不停上演。在微型计算机和个人电脑PC这两场大型龙卷风市场中&…

cdr最新软件下载2023中文版电脑64位免费安装包

CorelDRAW Graphics Suite2023涵盖了全部CorelDRAW图形处理组件,是一款智能高效的平面设计软件,广泛应用于排版印刷、矢量图形编辑及网页设计等领域,30多年来无数优秀的设计师通过CorelDRAW大胆展现真我,交付了出众的创意作品&…

web前端期末大作业——仿小米商城电商平台(6页) html+css+javascript网页设计实例 企业网站制作

HTML实例网页代码, 本实例适合于初学HTML的同学。该实例里面有设置了css的样式设置,有div的样式格局,这个实例比较全面,有助于同学的学习,本文将介绍如何通过从头开始设计个人网站并将其转换为代码的过程来实践设计。 ⚽精彩专栏推荐&#x1…

【面试】RabbitMQ面试题答案整理

1、RabbitMQ routing路由模式 1、 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息; 2、 根据业务功能定义路由字符串 3、 从系统的…

关于IPv6升级改造的政策文件汇总-中科三方

一、《推进互联网协议第六版(IPv6)规模部署行动计划》 发布时间:2017年11月 发文单位:中共中央办公厅、国务院办公厅 主要内容:用5到10年时间,形成下一代互联网自主技术体系和产业生态,建成全…

Matlab论文插图绘制模板第70期—带误差棒的柱状图(Bar with Errorbar)

在之前的文章中,分享了一系列Matlab柱状图的绘制模板: 这一次,再来分享一种特殊的柱状图:带误差棒的柱状图。 先来看一下成品效果: 特别提示:Matlab论文插图绘制模板系列,旨在降低大家使用Matl…

直播倒计时 1 天 | SOFAChannel#31 RPC 框架设计的考和量

SOFARPC 是蚂蚁集团开源的一款基于 Java 实现的 RPC 服务框架,为应用之间提供远程服务调用能力,具有高可伸缩性,高容错性,目前蚂蚁集团所有的业务的相互间的 RPC 调用都是采用 SOFARPC。SOFARPC 为用户提供了负载均衡,…

全网唯一,不忽悠的ChatGPT

Datawhale干货 作者:Ben,中山大学,Datawhale成员最近ChatGPT火出圈了,它和前阵子的Stable Diffusion(AIGC)一样成为社交媒体上人们津津乐道的话题。“ChatGPT要取代谷歌搜索了?”“ChatGPT要让程…

制作 Python Docker 镜像的最佳实践

概述 📚️Reference: 制作容器镜像的最佳实践 这篇文章是关于制作 Python Docker 容器镜像的最佳实践。(2022 年 12 月更新) 最佳实践的目的一方面是为了减小镜像体积,提升 DevOps 效率,另一方面是为了提高安全性。希望对各位有所…

无忧·企业文档,为企业客户的场景而思考

作为无忧企业文档的设计者,常常被问到一个问题,这个和腾X文档有什么区别?其实这个问题我口头回答了很多次,这次,我将这个问题做个记录与分析。 主要是有以下几个方面不同: 1、客户定位不同 2、核心功能不…

Java基础疑难点梳理(泛型到反射9章内容)

文章目录1. 泛型2. 基础类库3. 集合4. 异常5. 注解6. JDBC7. IO流8. 网络编程9. 类加载和反射1. 泛型 静态方法的形参,静态变量,静态代码块中不能使用泛型(因为即使泛型不同,还是同一个类,静态变量是属于类的&#xff…

锂电池实验室规划设计方案 | 喜格SICOLAB

锂电池实验室规划设计方案 | 喜格SICOLAB 锂电池生产基本工序 正负极混料、涂布、辊压、干燥、裁切、卷绕或叠片、封装、注液、化成、检测、出货等。锂电池实验室设计标准 1、《电子工业洁净厂房设计规范》GB50472-2008 2、《锂离子电池工厂设计标准》GB 51377-2019 《工业建筑…

在 Python 中使用 cv2 进行嗜睡检测

大家好,在这个博客中,我们将构建一个嗜睡检测应用程序,它将检测视频中的人是否变得昏昏欲睡。这是一个非常有趣且简单的项目,代码甚至不到 80 行,让我们开始吧看看最终输出注意——你不会在这里听到警报声,…

Stm32旧版库函数12——定时器的使用\time1

#include "stm32f10x_lib.h" #include "usart.h" #include "delay.h" #include "sys_config.h" #include <math.h> //Keil library #define uchar unsigned char #define uint unsigned int //通用定时器中断初始…

SpringCloud(6)— RabbitMQ(消息队列)

SpringCloud&#xff08;6&#xff09;— RabbitMQ(消息队列) 一 初识MQ 1.同步通信与异步通信 1.同步通信的问题 同步调用的优点在于时效性高&#xff0c;可以立即得到结果 微服务之间基于Feign的调用属于同步方式&#xff0c;存在一些问题 耦合性&#xff1a;业务较多时&a…

硅片检测半导体运动台控制器的设计

多自由度精密运动平台是半导体行业中硅片制造和检测过程里至关重要的问题&#xff0c;采用直线电机和音圈电机等先进驱动方式的精密运动平台相对传统滚珠丝杠旋转电机运动的运动平台&#xff0c;具有精度高、响应快、寿命长、免维护和结构简单等诸多优点&#xff0c;优势十分明…