16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

news2025/1/16 1:07:31

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)

16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs

30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)


文章目录

  • Flink 系列文章
  • 一、Table & SQL Connectors 示例: Apache Hive
    • 1、支持的Hive版本
    • 2、依赖项
      • 1)、使用 Flink 提供的 Hive jar
      • 2)、用户定义的依赖项
      • 3)、移动 planner jar 包
    • 3、Maven 依赖
    • 4、连接到Hive
    • 5、DDL&DML


本文介绍了Apache Hive连接器的使用,以具体的示例演示了通过java和flink sql cli创建catalog。
本文依赖环境是hadoop、zookeeper、hive、flink环境好用,本文内容以flink1.17版本进行介绍的,具体示例是在1.13版本中运行的(因为hadoop集群环境是基于jdk8的,flink1.17版本需要jdk11)。
更多的内容详见后续关于hive的介绍。

一、Table & SQL Connectors 示例: Apache Hive

Apache Hive 已经成为了数据仓库生态系统中的核心。 它不仅仅是一个用于大数据分析和ETL场景的SQL引擎,同样它也是一个数据管理平台,可用于发现,定义,和演化数据。

Flink 与 Hive 的集成包含两个层面。

一是利用了 Hive 的 MetaStore 作为持久化的 Catalog,用户可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。 例如,用户可以使用HiveCatalog将其 Kafka 表或 Elasticsearch 表存储在 Hive Metastore 中,并后续在 SQL 查询中重新使用它们。

二是利用 Flink 来读写 Hive 的表。

HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以"开箱即用"的访问其已有的 Hive 数仓。 您不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。

1、支持的Hive版本

Flink 支持以下的 Hive 版本。

  • 2.3
    2.3.0
    2.3.1
    2.3.2
    2.3.3
    2.3.4
    2.3.5
    2.3.6
    2.3.7
    2.3.8
    2.3.9
  • 3.1
    3.1.0
    3.1.1
    3.1.2
    3.1.3

某些功能是否可用取决于您使用的 Hive 版本,这些限制不是由 Flink 所引起的:

  • Hive 内置函数在使用 Hive-2.3.0 及更高版本时支持。
  • 列约束,也就是 PRIMARY KEY 和 NOT NULL,在使用 Hive-3.1.0 及更高版本时支持。
  • 更改表的统计信息,在使用 Hive-2.3.0 及更高版本时支持。
  • DATE列统计信息,在使用 Hive-2.3.0 及更高版时支持。

2、依赖项

要与 Hive 集成,您需要在 Flink 下的/lib/目录中添加一些额外的依赖包, 以便通过 Table API 或 SQL Client 与 Hive 进行交互。 或者,您可以将这些依赖项放在专用文件夹中,并分别使用 Table API 程序或 SQL Client 的-C或-l选项将它们添加到 classpath 中。

Apache Hive 是基于 Hadoop 之上构建的, 首先您需要 Hadoop 的依赖,请参考 Providing Hadoop classes:

export HADOOP_CLASSPATH=`hadoop classpath`

有两种添加 Hive 依赖项的方法。第一种是使用 Flink 提供的 Hive Jar包。您可以根据使用的 Metastore 的版本来选择对应的 Hive jar。第二个方式是分别添加每个所需的 jar 包。如果您使用的 Hive 版本尚未在此处列出,则第二种方法会更适合。

注意:建议您优先使用 Flink 提供的 Hive jar 包。仅在 Flink 提供的 Hive jar 不满足您的需求时,再考虑使用分开添加 jar 包的方式。

1)、使用 Flink 提供的 Hive jar

下表列出了所有可用的 Hive jar。您可以选择一个并放在 Flink 发行版的/lib/ 目录中。
在这里插入图片描述

2)、用户定义的依赖项

您可以在下方找到不同Hive主版本所需要的依赖项。

  • Hive 2.3.4
/flink-1.17.1
   /lib

       // Flink's Hive connector.Contains flink-hadoop-compatibility and flink-orc jars
       flink-connector-hive_2.12-1.17.1.jar

       // Hive dependencies
       hive-exec-2.3.4.jar

       // add antlr-runtime if you need to use hive dialect
       antlr-runtime-3.5.2.jar
  • Hive 3.1.0
/flink-1.17.1
   /lib

       // Flink's Hive connector
       flink-connector-hive_2.12-1.17.1.jar

       // Hive dependencies
       hive-exec-3.1.0.jar
       libfb303-0.9.3.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately

       // add antlr-runtime if you need to use hive dialect
       antlr-runtime-3.5.2.jar

3)、移动 planner jar 包

把 FLINK_HOME/opt 下的 jar 包 flink-table-planner_2.12-1.17.1.jar 移动到 FLINK_HOME/lib 下,并且将 FLINK_HOME/lib 下的 jar 包 flink-table-planner-loader-1.17.1.jar 移出去。 具体原因请参见 FLINK-25128。你可以使用如下命令来完成移动 planner jar 包的工作:

mv $FLINK_HOME/opt/flink-table-planner_2.12-1.17.1.jar $FLINK_HOME/lib/flink-table-planner_2.12-1.17.1.jar
mv $FLINK_HOME/lib/flink-table-planner-loader-1.17.1.jar $FLINK_HOME/opt/flink-table-planner-loader-1.17.1.jar

只有当要使用 Hive 语法 或者 HiveServer2 endpoint, 你才需要做上述的 jar 包移动。 但是在集成 Hive 的时候,推荐进行上述的操作。

3、Maven 依赖

如果您在构建自己的应用程序,则需要在 mvn 文件中添加以下依赖项。 您应该在运行时添加以上的这些依赖项,而不要在已生成的 jar 文件中去包含它们。

<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.12</artifactId>
  <version>1.17.1</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.12</artifactId>
  <version>1.17.1</version>
  <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
</dependency>

4、连接到Hive

通过 TableEnvironment 或者 YAML 配置,使用 Catalog 接口 和 HiveCatalog连接到现有的 Hive 集群。

以下是如何连接到 Hive 的示例:

  • java
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name            = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir     = "/opt/hive-conf";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");

----------------------示例----------------------------
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;

/**
 * @author alanchan
 *
 */
public class TestHiveCatalogDemo {

	/**
	 * @param args
	 * @throws DatabaseNotExistException 
	 * @throws CatalogException 
	 */
	public static void main(String[] args) throws CatalogException, DatabaseNotExistException {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		String name = "alan_hive";
		// testhive 数据库名称
		String defaultDatabase = "testhive";
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";

		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
		tenv.registerCatalog("alan_hive", hiveCatalog);
		// 使用注册的catalog
		tenv.useCatalog("alan_hive");

		List<String> tables = hiveCatalog.listTables(defaultDatabase); 
		for (String table : tables) {
			System.out.println("Database:testhive  tables:" + table);
		}
	}

}
  • sql
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'mydatabase',
    'hive-conf-dir' = '/opt/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;

------------------具体示例如下----------------------------
Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set

Flink SQL> CREATE CATALOG alan_hivecatalog WITH (
>     'type' = 'hive',
>     'default-database' = 'testhive',
>     'hive-conf-dir' = '/usr/local/bigdata/apache-hive-3.1.2-bin/conf'
> );
[INFO] Execute statement succeed.

Flink SQL> show catalogs;
+------------------+
|     catalog name |
+------------------+
| alan_hivecatalog |
|  default_catalog |
+------------------+
2 rows in set

Flink SQL> use alan_hivecatalog;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: A database with name [alan_hivecatalog] does not exist in the catalog: [default_catalog].

Flink SQL> use catalog alan_hivecatalog;
[INFO] Execute statement succeed.

Flink SQL> show tables;
+-----------------------------------+
|                        table name |
+-----------------------------------+
| alan_hivecatalog_hivedb_testtable |
|                         apachelog |
|                          col2row1 |
|                          col2row2 |
|                       cookie_info |
|                              dual |
|                         dw_zipper |
|                               emp |
|                          employee |
|                  employee_address |
|               employee_connection |
|                 ods_zipper_update |
|                          row2col1 |
|                          row2col2 |
|                            singer |
|                           singer2 |
|                           student |
|                      student_dept |
|               student_from_insert |
|                      student_hdfs |
|                    student_hdfs_p |
|                      student_info |
|                     student_local |
|                 student_partition |
|              t_all_hero_part_msck |
|                     t_usa_covid19 |
|                   t_usa_covid19_p |
|                              tab1 |
|                         tb_dept01 |
|                    tb_dept_bucket |
|                            tb_emp |
|                          tb_emp01 |
|                     tb_emp_bucket |
|                     tb_json_test1 |
|                     tb_json_test2 |
|                          tb_login |
|                      tb_login_tmp |
|                          tb_money |
|                      tb_money_mtn |
|                            tb_url |
|              the_nba_championship |
|                             tmp_1 |
|                        tmp_zipper |
|                         user_dept |
|                     user_dept_sex |
|                             users |
|                 users_bucket_sort |
|                   website_pv_info |
|                  website_url_info |
+-----------------------------------+
49 rows in set

  • ymal
execution:
    ...
    current-catalog: alan_hivecatalog  # set the HiveCatalog as the current catalog of the session
    current-database: testhive
    
catalogs:
   - name: alan_hivecatalog  
     type: hive
     hive-conf-dir: /usr/local/bigdata/apache-hive-3.1.2-bin/conf

下表列出了通过 YAML 文件或 DDL 定义 HiveCatalog 时所支持的参数。

在这里插入图片描述

5、DDL&DML

在 Flink 中执行 DDL 操作 Hive 的表、视图、分区、函数等元数据时,参考:33、Flink之hive
Flink 支持 DML 写入 Hive 表,请参考:33、Flink之hive
以上,介绍了Apache Hive连接器的使用,以具体的示例演示了通过java和flink sql cli创建catalog。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/944580.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

strtok, strtok_s(字符串分割符)和memset_s

strtok()函数的原型如下 char *__cdecl strtok(char *_String, const char *_Delimiter)strtok()函数接收两个传入参数&#xff0c;将_String中保存的字符串&#xff08;待处理字符串&#xff09;&#xff0c;按照_Delimiter中的字符作为分隔符进行分割。如果_String为空&…

亚马逊,速卖通,美客多不出单怎么办?测评补单专治出单难

亚马逊不出单怎么办? 一、广告优化 虽然现在平台内部的流量都大幅度减少&#xff0c;但是不能停止对广告的投放&#xff0c;并且更需要你花时间去优化和分析广告报表。如果广告报表显示所有的关键词曝光量都非常低&#xff0c;卖家就需要提高竞价了&#xff0c;这时候你可以…

idea配置注释模板

一、类的模板 设置里面依次找到图中标注的地方 填入 /** ${describe} author 填入你的名字 date ${YEAR}-${MONTH}-${DAY} ${TIME} version 1.0.0 */配置完成后&#xff0c;新创建的类就会自动生成类开头的注释 二、方法的注释模板 如图创建模板 步骤6中填入 *** $descrip…

AP5192 DC-DC降压恒流LED汽车头灯摩托车电动车大灯电源驱动

AP5192是一款PWM工作模式,高效率、外围简单、 内置功率MOS管&#xff0c;适用于4.5-100V输入的高精度 降压LED恒流驱动芯片。最大电流1.5A。 AP5192可实现线性调光和PWM调光&#xff0c;线性调光 脚有效电压范围0.55-2.6V. AP5192 工作频率可以通过RT 外部电阻编程 来设定&…

基于负载均衡的在线OJ实战项目

前言&#xff1a; 该篇讲述了实现基于负载均衡式的在线oj&#xff0c;即类似在线编程做题网站一样&#xff0c;文章尽可能详细讲述细节即实现&#xff0c;便于大家了解学习。 文章将采用单篇不分段形式&#xff08;ps&#xff1a;切着麻烦&#xff09;&#xff0c;附图文&#…

Linux内核源码剖析之kmem_cache_create

写在前面&#xff1a; 版本信息&#xff1a; Linux内核2.6.24&#xff08;大部分centos、ubuntu应该都在3.1。但是2.6的版本适合学习&#xff0c;后续版本本质变化也不是很大&#xff09; 一个操作系统对于内存的管理是非常的重要&#xff0c;关乎到整个系统的运行效率和内存最…

C++新经典 | C语言

目录 一、基础之查漏补缺 1.float精度问题 2.字符型数据 3.变量初值问题 4.赋值&初始化 5.头文件之<> VS " " 6.逻辑运算 7.数组 7.1 二维数组初始化 7.2 字符数组 8.字符串处理函数 8.1 strcat 8.2 strcpy 8.3 strcmp 8.4 strlen 9.函数 …

一篇掌握高级交换技术原理与配置(一):vlan聚合

一、概述 VLAN聚合&#xff08;VLAN Aggregation&#xff0c;也称Super-VLAN&#xff09;: 指在一个物理网络内&#xff0c;用多个VLAN&#xff08;称为Sub-VLAN&#xff09;隔离广播域&#xff0c;并将这些Sub-VLAN聚合成一个逻辑的VLAN&#xff08;称为Super-VLAN&#xff0…

寻找RocketMQ首席评测官【阿里云产品测评】

寻找RocketMQ首席评测官【阿里云产品测评】 前言版权推荐寻找RocketMQ首席评测官开始任务一&#xff1a;免费领取资源任务二&#xff1a;了解评测活动体验普通消息场景体验顺序消息场景体验定时消息场景体验事务消息场景体验消息堆积场景体验消息重投场景体验总结未完待续 最后…

Ubuntu升级Cmake、gcc、g++

背景 最近要安装llvm&#xff0c;我选择的是从源码安装&#xff0c;所以要使用Cmake进行构建项目。但是服务器上的Cmake、gcc、g的版本都太低了&#xff0c;不符合要求&#xff0c;所以要对此进行升级。在本博客中采用的升级方法不一定是最好的方法&#xff08;因为我也是参考…

EXCEL数据处理

1. 自定义数字格式 选中数字--右键--设置单元格格式--自定义--shang ↑ 2.条件格式 如果。。。。就。。。。 选中某列--开始--条件格式--突出显示--大于/小于/等于。。。--设置为&#xff08;可选自定义格式&#xff09; 选中区域--条件格式--清除规则--清除所选单元格的规…

外贸人看过来,这里是WhatsApp宝藏使用技巧!

如今从事外贸的宝子普遍都会用到WhatsApp这款全球即时聊天工具。其免费&#xff0c;且可以直接跟陌生人聊天&#xff0c;是一个跟海外客户建立联系的重要阵地。但是有些宝子刚接触WhatsApp&#xff0c;不知道怎么去使用&#xff0c;今天小S就整理出了几个宝藏使用技巧&#xff…

探讨C#、C++和Java这三门语言在嵌入式的地位

我理解对于初入嵌入式领域的担忧。你是想选择一款通用性最广的语言专心学习&#xff0c;但是不知如何选择&#xff0c;视频后方提供了免费的嵌入式学习资源&#xff0c;内容涵盖入门到进阶&#xff0c;需要的到后方免费获取。因为我也曾是一名计算机专业毕业生。通过一段时间的…

2023-08-29 衣品-甄别与筛选

摘要: 外在形象可以说是内在自我的具象化表现, 自我定位与自我认知的表现便是一个人的形象. 所以对于衣品的甄别, 在很大程度上是体验该衣服所表现出来与内在潜意识契合的地方. 明白了这一点, 那么在做甄别和筛选的时候就能明白很多东西. 本文一方面做一定程度的练习, 一方…

生态环境保护3D数字展厅提供了一个线上环保知识学习平台

在21世纪的今天&#xff0c;科技与环保的交汇点提供了无数令人兴奋的可能性。其中&#xff0c;生态环境保护3D数字展厅就是一个绝佳的例子。这个展厅以其独特的3D技术&#xff0c;为我们带来了一个全新的、互动的学习环境&#xff0c;让我们能够更直观地了解和理解我们的环境。…

架构师日记-软件工程里的组织文化 | 京东云技术团队

一 引言 本文是京东到家自动化测试体系建设过程中的一些回顾和总结&#xff0c;删减了部分系统设计与实践的章节&#xff0c;保留了组织与文化相关的内容&#xff0c;整理成文&#xff0c;以飨读者。 下面就以QA&#xff08;Quality Assurance&#xff09;的视角来探讨工作中经…

一文看懂java集合(图文详细)

java集合框架图 看图可知&#xff0c;主要分为两类&#xff1a;Collection 和 Map&#xff0c;Collection主要用于存储一组对象&#xff0c;Map用于存储键-值对。 对这二者再细分 Collection接口&#xff1a; Map接口 集合框架总结 一、Collection 接口的接口 对象的集合&a…

重要提醒!亚马逊卖家们需关注:Review政策发生重大变化

在电商领域&#xff0c;Review&#xff08;评价&#xff09;对卖家而言至关重要&#xff0c;是客户是否下单的核心考量之一。亚马逊的Review政策变化一直备受卖家关注&#xff0c;任何Review方面的动向都会引发卖家们的关切。近期&#xff0c;亚马逊对Review政策进行了调整&…

鸿鹄企业工程项目管理系统 Spring Cloud+Spring Boot+前后端分离构建工程项目管理系统源代码

鸿鹄工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离构建工程项目管理系统 1. 项目背景 一、随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性&#xff0c;公司对内部工程管…

ST SR5E1 22KW OBC 3KW DC DC Combo System 二合一车载充电器解决方案

ST SR5E1 22KW OBC & 3KW DC DC Combo System 二合一车载充电器解决方案 电动车内一般有两个不同电压等级的电池&#xff0c;高压电池用于驱动电机&#xff0c;低压电瓶用于车内电子设备供电&#xff0c;两个电池之间需要一个DCDC变换器来实现功率的流动&#xff0c;根据主…