33、Flink之hive介绍与简单示例

news2025/1/10 23:35:47

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)

16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs

30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)

33、Flink之hive介绍与简单示例

42、Flink 的table api与sql之Hive Catalog


文章目录

  • Flink 系列文章
  • 一、Table & SQL Connectors 示例: Apache Hive
    • 1、支持的Hive版本
    • 2、依赖项
      • 1)、使用 Flink 提供的 Hive jar
      • 2)、用户定义的依赖项
      • 3)、移动 planner jar 包
    • 3、Maven 依赖
    • 4、连接到Hive
    • 5、DDL&DML


本文介绍了Apache Hive连接器的使用,以具体的示例演示了通过java和flink sql cli创建catalog。
本文依赖环境是hadoop、zookeeper、hive、flink环境好用,本文内容以flink1.17版本进行介绍的,具体示例是在1.13版本中运行的(因为hadoop集群环境是基于jdk8的,flink1.17版本需要jdk11)。
更多的内容详见后续关于hive的介绍。

一、Table & SQL Connectors 示例: Apache Hive

Apache Hive 已经成为了数据仓库生态系统中的核心。 它不仅仅是一个用于大数据分析和ETL场景的SQL引擎,同样它也是一个数据管理平台,可用于发现,定义,和演化数据。

Flink 与 Hive 的集成包含两个层面。

一是利用了 Hive 的 MetaStore 作为持久化的 Catalog,用户可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。 例如,用户可以使用HiveCatalog将其 Kafka 表或 Elasticsearch 表存储在 Hive Metastore 中,并后续在 SQL 查询中重新使用它们。

二是利用 Flink 来读写 Hive 的表。

HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以"开箱即用"的访问其已有的 Hive 数仓。 您不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。

关于flink与hive集成的部分请参考:42、Flink 的table api与sql之Hive Catalog

1、支持的Hive版本

Flink 支持以下的 Hive 版本。

  • 2.3
    2.3.0
    2.3.1
    2.3.2
    2.3.3
    2.3.4
    2.3.5
    2.3.6
    2.3.7
    2.3.8
    2.3.9
  • 3.1
    3.1.0
    3.1.1
    3.1.2
    3.1.3

某些功能是否可用取决于您使用的 Hive 版本,这些限制不是由 Flink 所引起的:

  • Hive 内置函数在使用 Hive-2.3.0 及更高版本时支持。
  • 列约束,也就是 PRIMARY KEY 和 NOT NULL,在使用 Hive-3.1.0 及更高版本时支持。
  • 更改表的统计信息,在使用 Hive-2.3.0 及更高版本时支持。
  • DATE列统计信息,在使用 Hive-2.3.0 及更高版时支持。

2、依赖项

要与 Hive 集成,您需要在 Flink 下的/lib/目录中添加一些额外的依赖包, 以便通过 Table API 或 SQL Client 与 Hive 进行交互。 或者,您可以将这些依赖项放在专用文件夹中,并分别使用 Table API 程序或 SQL Client 的-C或-l选项将它们添加到 classpath 中。

Apache Hive 是基于 Hadoop 之上构建的, 首先您需要 Hadoop 的依赖,请参考 Providing Hadoop classes:

export HADOOP_CLASSPATH=`hadoop classpath`

有两种添加 Hive 依赖项的方法。第一种是使用 Flink 提供的 Hive Jar包。您可以根据使用的 Metastore 的版本来选择对应的 Hive jar。第二个方式是分别添加每个所需的 jar 包。如果您使用的 Hive 版本尚未在此处列出,则第二种方法会更适合。

注意:建议您优先使用 Flink 提供的 Hive jar 包。仅在 Flink 提供的 Hive jar 不满足您的需求时,再考虑使用分开添加 jar 包的方式。

1)、使用 Flink 提供的 Hive jar

下表列出了所有可用的 Hive jar。您可以选择一个并放在 Flink 发行版的/lib/ 目录中。
在这里插入图片描述

2)、用户定义的依赖项

您可以在下方找到不同Hive主版本所需要的依赖项。

  • Hive 2.3.4
/flink-1.17.1
   /lib

       // Flink's Hive connector.Contains flink-hadoop-compatibility and flink-orc jars
       flink-connector-hive_2.12-1.17.1.jar

       // Hive dependencies
       hive-exec-2.3.4.jar

       // add antlr-runtime if you need to use hive dialect
       antlr-runtime-3.5.2.jar
  • Hive 3.1.0
/flink-1.17.1
   /lib

       // Flink's Hive connector
       flink-connector-hive_2.12-1.17.1.jar

       // Hive dependencies
       hive-exec-3.1.0.jar
       libfb303-0.9.3.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately

       // add antlr-runtime if you need to use hive dialect
       antlr-runtime-3.5.2.jar

3)、移动 planner jar 包

把 FLINK_HOME/opt 下的 jar 包 flink-table-planner_2.12-1.17.1.jar 移动到 FLINK_HOME/lib 下,并且将 FLINK_HOME/lib 下的 jar 包 flink-table-planner-loader-1.17.1.jar 移出去。 具体原因请参见 FLINK-25128。你可以使用如下命令来完成移动 planner jar 包的工作:

mv $FLINK_HOME/opt/flink-table-planner_2.12-1.17.1.jar $FLINK_HOME/lib/flink-table-planner_2.12-1.17.1.jar
mv $FLINK_HOME/lib/flink-table-planner-loader-1.17.1.jar $FLINK_HOME/opt/flink-table-planner-loader-1.17.1.jar

只有当要使用 Hive 语法 或者 HiveServer2 endpoint, 你才需要做上述的 jar 包移动。 但是在集成 Hive 的时候,推荐进行上述的操作。

3、Maven 依赖

如果您在构建自己的应用程序,则需要在 mvn 文件中添加以下依赖项。 您应该在运行时添加以上的这些依赖项,而不要在已生成的 jar 文件中去包含它们。

<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.12</artifactId>
  <version>1.17.1</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.12</artifactId>
  <version>1.17.1</version>
  <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
</dependency>

4、连接到Hive

通过 TableEnvironment 或者 YAML 配置,使用 Catalog 接口 和 HiveCatalog连接到现有的 Hive 集群。

以下是如何连接到 Hive 的示例:

  • java
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name            = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir     = "/opt/hive-conf";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");

----------------------示例----------------------------
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;

/**
 * @author alanchan
 *
 */
public class TestHiveCatalogDemo {

	/**
	 * @param args
	 * @throws DatabaseNotExistException 
	 * @throws CatalogException 
	 */
	public static void main(String[] args) throws CatalogException, DatabaseNotExistException {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		String name = "alan_hive";
		// testhive 数据库名称
		String defaultDatabase = "testhive";
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";

		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
		tenv.registerCatalog("alan_hive", hiveCatalog);
		// 使用注册的catalog
		tenv.useCatalog("alan_hive");

		List<String> tables = hiveCatalog.listTables(defaultDatabase); 
		for (String table : tables) {
			System.out.println("Database:testhive  tables:" + table);
		}
	}

}
  • sql
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'mydatabase',
    'hive-conf-dir' = '/opt/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;

------------------具体示例如下----------------------------
Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set

Flink SQL> CREATE CATALOG alan_hivecatalog WITH (
>     'type' = 'hive',
>     'default-database' = 'testhive',
>     'hive-conf-dir' = '/usr/local/bigdata/apache-hive-3.1.2-bin/conf'
> );
[INFO] Execute statement succeed.

Flink SQL> show catalogs;
+------------------+
|     catalog name |
+------------------+
| alan_hivecatalog |
|  default_catalog |
+------------------+
2 rows in set

Flink SQL> use alan_hivecatalog;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: A database with name [alan_hivecatalog] does not exist in the catalog: [default_catalog].

Flink SQL> use catalog alan_hivecatalog;
[INFO] Execute statement succeed.

Flink SQL> show tables;
+-----------------------------------+
|                        table name |
+-----------------------------------+
| alan_hivecatalog_hivedb_testtable |
|                         apachelog |
|                          col2row1 |
|                          col2row2 |
|                       cookie_info |
|                              dual |
|                         dw_zipper |
|                               emp |
|                          employee |
|                  employee_address |
|               employee_connection |
|                 ods_zipper_update |
|                          row2col1 |
|                          row2col2 |
|                            singer |
|                           singer2 |
|                           student |
|                      student_dept |
|               student_from_insert |
|                      student_hdfs |
|                    student_hdfs_p |
|                      student_info |
|                     student_local |
|                 student_partition |
|              t_all_hero_part_msck |
|                     t_usa_covid19 |
|                   t_usa_covid19_p |
|                              tab1 |
|                         tb_dept01 |
|                    tb_dept_bucket |
|                            tb_emp |
|                          tb_emp01 |
|                     tb_emp_bucket |
|                     tb_json_test1 |
|                     tb_json_test2 |
|                          tb_login |
|                      tb_login_tmp |
|                          tb_money |
|                      tb_money_mtn |
|                            tb_url |
|              the_nba_championship |
|                             tmp_1 |
|                        tmp_zipper |
|                         user_dept |
|                     user_dept_sex |
|                             users |
|                 users_bucket_sort |
|                   website_pv_info |
|                  website_url_info |
+-----------------------------------+
49 rows in set

  • ymal
execution:
    ...
    current-catalog: alan_hivecatalog  # set the HiveCatalog as the current catalog of the session
    current-database: testhive
    
catalogs:
   - name: alan_hivecatalog  
     type: hive
     hive-conf-dir: /usr/local/bigdata/apache-hive-3.1.2-bin/conf

下表列出了通过 YAML 文件或 DDL 定义 HiveCatalog 时所支持的参数。

在这里插入图片描述

5、DDL&DML

在 Flink 中执行 DDL 操作 Hive 的表、视图、分区、函数等元数据时,参考:33、Flink之hive
Flink 支持 DML 写入 Hive 表,请参考:33、Flink之hive
以上,介绍了Apache Hive连接器的使用,以具体的示例演示了通过java和flink sql cli创建catalog。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/953484.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

油耳的人适合戴哪种耳机,油耳戴什么耳机不会掉

长时间佩戴入耳式耳机导致油耳朵&#xff1f;甚至耳朵受伤。这似乎是一个不可避免的问题&#xff0c;但不同耳机对听力的伤害程度却不尽相同。然而&#xff0c;目前为止&#xff0c;骨传导耳机是唯一一种在相同音量下对听力损伤最低的选择。这种耳机通过骨骼传递声音&#xff0…

亮数据:以色列一家让人向往的互联网公司,很强

大家好&#xff0c;我是二哥呀&#xff01; 今天再给大家推荐一家小而美的互联网公司——亮数据&#xff0c;成立于 2014 年&#xff0c;总部设在以色列&#xff0c;全球目前约有 500 名员工&#xff0c;但却有超过 2000 个专利申报和计数。 我在他们的官网看到这样一段介绍&…

关于Echarts 绘制玫瑰图 (笔记)

目录 基于js文件绘图 基于vue3绘制玫瑰图 基于js文件绘图 // 定义一个配置对象 var option {// 图例设置legend: {top: bottom},// 工具栏设置toolbox: {show: true,feature: {mark: { show: true }, // 标记工具dataView: { show: true, readOnly: false }, // 数据视图工具r…

软件安全检测有哪些测试内容?怎么做

安全测试 信息安全检测是一个统称的概念&#xff0c;其概念的提出对于规范和明确信息安全日常工作具有重要作用。一般企业会委托第三方检测机构进行信息安全检测并且出具相关的软件安全检测报告。 信息安全检测依据是什么 根据国家标准、行业标准、地方标准或相关技术规范&a…

不会电脑分区?看这里,教你轻松学会!

随着计算机技术的不断发展&#xff0c;电脑分区成为管理和优化硬盘空间的重要手段之一。它是对硬盘进行逻辑划分的过程&#xff0c;通过将硬盘分成不同的区域&#xff0c;提高数据访问效率&#xff0c;同时保护系统和用户数据的安全性。本文将为您介绍两种常用的电脑分区方法&a…

螺线管线圈的用途是什么

螺线管线圈是一种电子元器件&#xff0c;通常用于电感器和变压器。螺线管线圈可以是单层的或多层的&#xff0c;并且可以根据特定的电气参数进行设计。它们被广泛应用于电子设备和通信系统中&#xff0c;以满足各种应用的要求。 螺线管线圈主要用于电感器和变压器。电感器是一种…

科技云报道:AI+云计算共生共长,能否解锁下一个高增长空间?

科技云报道原创。 在过去近一年的时间里&#xff0c;AI大模型从最初的框架构建&#xff0c;逐步走到落地阶段。 然而&#xff0c;随着AI大模型深入到千行百业中&#xff0c;市场开始意识到通用大模型虽然功能强大&#xff0c;但似乎并不能完全满足不同企业的个性化需求。 大…

开源项目-内容管理系统

哈喽,大家好,今天给大家带来一个开源项目-内容管理系统。项目通过SpringBoot实现 主要功能有 - 内容:发帖、评论、帖子分类、分页、回帖统计、访问统计、表单验证 - 用户:权限、资料、头像、邮箱验证 - 管理:后台管理、统计图表、帖子/分类管理 注册 通过用户名,邮箱,…

实现基于栈的表达式求值计算器(难度4/10)

本作业主要考察&#xff1a;解释器模式的实现思想/栈结构在表达式求值方面的绝对优势 C数据结构与算法夯实基础作业列表 通过栈的应用&#xff0c;理解特定领域设计的关键作用&#xff0c;给大家眼前一亮的感觉。深刻理解计算机语言和人类语言完美结合的杰作。是作业中的上等…

长胜证券:华为“黑科技”点燃A股炒作激情

8月29日&#xff0c;在未举行相关发布会的情况下&#xff0c;华为新款手机Mate60Pro悄然上线开售&#xff0c;并在一小时内售罄。 金融出资报记者注意到&#xff0c;跟着商场对新机重视的继续发酵&#xff0c;其中的各种技能打破也愈加受到重视&#xff0c;其影响很快扩散到资…

前端(十五)——开源一个用react封装的图片预览组件

&#x1f475;博主&#xff1a;小猫娃来啦 &#x1f475;文章核心&#xff1a;开源一个react封装的图片预览组件 文章目录 组件开源代码下载地址运行效果展示实现思路使用思路和api实现的功能数据和入口部分代码展示 组件开源代码下载地址 Gitee&#xff1a;点此跳转下载 CSDN…

叉式移动机器人(AGV/AMR)解决方案

叉式移动机器人&#xff08;AGV/AMR&#xff09;是在叉车上加载各种导引技术&#xff0c;构建地图算法&#xff0c;辅以避障安全技术&#xff0c;实现叉车的无人化作业。 ▲ 叉式移动机器人无人化作业 ▲叉式移动机器人常见车型 叉式移动机器人在行业初期&#xff0c;各机构、…

Hugging Face--Transformers

pipeline 在这里插入图片描述 AutoClass AutoClass 是一个能够通过预训练模型的名称或路径自动查找其架构的快捷方式. 你只需要为你的任务选择合适的 AutoClass 和它关联的预处理类。 AutoTokenizer AutoModel 保存模型 自定义模型构建 Trainer - PyTorch优化训练循环 参考资…

概念解析 | 量子时代的灵感:探索量子感知技术

注1:本文系“概念解析”系列之一,致力于简洁清晰地解释、辨析复杂而专业的概念。本次辨析的概念是:量子感知技术。 量子时代的灵感:探索量子感知技术 量子感知技术是一个充满希望和挑战的新兴领域。在此,我们将深入探讨这个主题,概述其背景,解释其工作原理,讨论现有的…

AI绘画:多巴胺3D女孩风格分享【附Midjourney关键词Prompt】

文末附完整的教程&#xff0c;已经放在网盘&#xff0c;需要的自己下载 今年&#xff0c;多巴胺风格大火特火&#xff01; 你知道吗&#xff0c;这种色彩斑斓&#xff0c;一看就心情大好的风格&#xff0c;简直就像是甜到心坎的糖果。 而其中最受欢迎的&#xff0c;就是那些多…

服务器数据恢复- RAID5出现故障后恢复数据和操作系统的案例

服务器数据恢复环境&#xff1a; 某品牌服务器中有4块SAS硬盘组建了一组RAID5阵列&#xff0c;另外1块磁盘作为热备盘使用。上层操作系统为redhat linux&#xff0c;部署了一个数据库是oracle的OA。 服务器故障&初检&#xff1a; RAID5中一块磁盘离线后热备盘未自动激活re…

【Mysql问题集锦】:Can‘t create table ‘#sql-58d7_431d‘ (errno: 28)

问题描述&#xff1a; 问题原因&#xff1a; OSError: [Errno 28] No space left on device&#xff0c;即&#xff1a;磁盘空间不足&#xff0c;无法创建文件。因此&#xff0c;导致Mysql无法执行SQL语句。 问题解法&#xff1a; Step 1&#xff0c;查看有哪些目录占用了大量…

抖音企业号无需API开发连接AI图像生成,打造AI智能绘图助手

1. 抖音用户使用场景&#xff1a; 作为抖音企业号的运营人员&#xff0c;我们一直在寻找新的方式来增强我们与用户之间的互动。最近&#xff0c;我们发现了AI绘图技术可以根据用户需求和指令自动创建图片&#xff0c;无需人为干预&#xff0c;这为我们节省了人力和时间。因此&a…

如何设计微服务

一、序幕 最近在思考&#xff0c;自己哪些不足&#xff0c;需要学习点什么&#xff1f;看着Java基础知识&#xff0c;千遍一律&#xff0c;没有太大的动力需深挖&#xff0c;只能在写业务项目的时候边写边思考边夯实自己的基础。于是看了网上的一些资料&#xff0c;结合以前面试…

气传导耳机哪个品牌比较好?市面上爆款气传导耳机推荐!

​随着人们对运动健康的重视&#xff0c;越来越多的人开始关注运动时佩戴的耳机。然而&#xff0c;传统的耳机在运动时往往会受到限制&#xff0c;而气传导耳机则可以避免这些问题&#xff0c;为用户带来更加舒适和便捷的使用体验。今天就来展开说说&#xff0c;市面上气传导耳…