44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例--网上有些说法好像是错误的

news2024/12/28 13:08:08

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)

16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs

30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
41、Flink之Hive 方言介绍及详细示例
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的


文章目录

  • Flink 系列文章
  • 一、模块Modules
    • 1、模块介绍
    • 2、模块类别Module Types
      • 1)、CoreModule
      • 2)、HiveModule
      • 3)、User-Defined Module
    • 3、模块生命周期和解析顺序Module Lifecycle and Resolution Order
    • 4、模块Modules的使用
      • 1)、SQL方式
      • 2)、编码方式-java
  • 二、Hive Functions内置函数和自定义函数使用
    • 1、通过 HiveModule 使用 Hive 内置函数
    • 2、使用原生的hive 聚合函数Native Hive Aggregate Functions
    • 3、hive的自定义函数介绍
    • 4、hive的自定义函数使用-示例
      • 1)、定义函数
        • 1、UDF实现
        • 2、GenericUDF实现
      • 2)、hive中注册函数并使用
      • 3)、flink sql中使用


本文介绍了Flink的module功能以及Flink SQl使用hive的内置函数和hive的自定义函数功能。
本文依赖hadoop、hive、flink集群能正常使用,其版本分别是3.1.4、3.1.2和1.13.6,内容是按照1.17版本写的。
本文分为2个部分,即介绍了Flink 的Module和Flink SQL 使用Hive的内置函数及自定义函数,并提供了完整的可验证通过的示例。

一、模块Modules

1、模块介绍

模块允许用户扩展 Flink 的内置对象,例如定义行为类似于 Flink 内置函数的函数。它们是可插拔的,虽然 Flink 提供了一些预构建的模块,但用户可以编写自己的模块。

例如,用户可以定义自己的地理函数,并将它们作为内置函数插入 Flink 中,用于 Flink SQL 和表 API。另一个例子是用户可以加载一个现成的 Hive 模块,将 Hive 内置函数用作 Flink 内置函数。

此外,模块可以提供内置的表源( table source)和接收器工厂(sink factories),这些工厂禁用了基于 Java 服务提供程序接口 (SPI) 的 Flink 默认发现机制,或者影响如何在没有相应目录的情况下创建临时表的连接器。

模块提供的对象被视为 Flink 系统(内置)对象的一部分;因此,它们没有任何命名空间。

2、模块类别Module Types

1)、CoreModule

CoreModule 包含 Flink 的所有系统(内置)功能,默认情况下加载并启用。

2)、HiveModule

HiveModule 为 SQL 和 Table API 用户提供了 Hive 内置函数作为 Flink 的系统函数。Flink 的 Hive 文档提供了有关设置模块的完整详细信息。

3)、User-Defined Module

用户可以通过实现模块接口来开发自定义模块。若要在 SQL CLI 中使用自定义模块,用户应通过实现模块工厂接口来开发模块及其相应的模块工厂。

模块工厂定义一组属性,用于在 SQL CLI 引导时配置模块。属性将传递给发现服务,该服务尝试将属性与 ModuleFactory 匹配并实例化相应的模块实例。

3、模块生命周期和解析顺序Module Lifecycle and Resolution Order

可以加载、启用、禁用和卸载模块。当 TableEnvironment 最初加载模块时,默认情况下会启用该模块。Flink 支持多个模块,并跟踪加载顺序以解析元数据。此外,Flink 只解析启用模块之间的功能。例如,当两个模块中存在两个同名的函数时,将有三个条件,如下。

  • 如果两个模块都启用了,那么 Flink 会根据模块的解析顺序解析函数。
  • 如果其中一个被禁用,则 Flink 会将函数解析为启用的模块。
  • 如果两个模块都被禁用,那么 Flink 就无法解析该功能。

用户可以通过使用不同声明顺序的模块来更改解析顺序。例如,用户可以指定 Flink 通过 USE MODULES Hive、core 首先在 Hive 中查找函数。

此外,用户还可以通过不声明模块来禁用模块。例如,用户可以指定 Flink 通过 USE MODULES hive 禁用核心模块(但是,强烈建议不要禁用核心模块)。禁用模块不会卸载它,用户可以使用它时再次启用它。例如,用户可以带回核心模块并将其放置在第一个通过USE MODULES core,hive。仅当模块已加载时,才能启用该模块。使用卸载的模块将引发异常。最终,用户可以卸载模块。

禁用和卸载模块的区别在于,TableEnvironment 仍然保留已禁用的模块,用户可以列出所有已加载的模块以查看已禁用的模块。

4、模块Modules的使用

一般有2种使用方式,即sql方式和开发语言编程方式,如java、scala、python。
用户可以使用 SQL 在表 API 和 SQL CLI 中加载/卸载/使用/列出模块。
用户可以使用 Java、Scala 或 Python 以编程方式加载/卸载/使用/列出模块。

1)、SQL方式

sql的方式本文列出了三种,即通过客户端直接使用、java语言中编写sql和配置文件方式。

  • SQL cli
Flink SQL> SHOW MODULES;
+-------------+
| module name |
+-------------+
|        core |
+-------------+
1 row in set

Flink SQL> SHOW FULL MODULES;
+-------------+------+
| module name | used |
+-------------+------+
|        core | true |
+-------------+------+
1 row in set

Flink SQL> LOAD MODULE hive WITH ('hive-version' = '3.1.2');
[INFO] Execute statement succeed.

Flink SQL> SHOW MODULES;
+-------------+
| module name |
+-------------+
|        core |
|        hive |
+-------------+
2 rows in set

Flink SQL> SHOW FULL MODULES;
+-------------+------+
| module name | used |
+-------------+------+
|        core | true |
|        hive | true |
+-------------+------+
2 rows in set

Flink SQL> USE MODULES hive, core ;
[INFO] Execute statement succeed.

Flink SQL> SHOW MODULES;
+-------------+
| module name |
+-------------+
|        hive |
|        core |
+-------------+
2 rows in set

Flink SQL> SHOW FULL MODULES;
+-------------+------+
| module name | used |
+-------------+------+
|        hive | true |
|        core | true |
+-------------+------+
2 rows in set

Flink SQL> UNLOAD MODULE hive;
[INFO] Execute statement succeed.

Flink SQL> SHOW MODULES;
+-------------+
| module name |
+-------------+
|        core |
+-------------+
1 row in set

Flink SQL> SHOW FULL MODULES;
+-------------+------+
| module name | used |
+-------------+------+
|        core | true |
+-------------+------+
1 row in set

  • java
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Show initially loaded and enabled modules
tableEnv.executeSql("SHOW MODULES").print();
// +-------------+
// | module name |
// +-------------+
// |        core |
// +-------------+
tableEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+------+
// | module name | used |
// +-------------+------+
// |        core | true |
// +-------------+------+

// Load a hive module
tableEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '...')");

// Show all enabled modules
tableEnv.executeSql("SHOW MODULES").print();
// +-------------+
// | module name |
// +-------------+
// |        core |
// |        hive |
// +-------------+

// Show all loaded modules with both name and use status
tableEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+------+
// | module name | used |
// +-------------+------+
// |        core | true |
// |        hive | true |
// +-------------+------+

// Change resolution order
tableEnv.executeSql("USE MODULES hive, core");
tableEnv.executeSql("SHOW MODULES").print();
// +-------------+
// | module name |
// +-------------+
// |        hive |
// |        core |
// +-------------+
tableEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+------+
// | module name | used |
// +-------------+------+
// |        hive | true |
// |        core | true |
// +-------------+------+

// Disable core module
tableEnv.executeSql("USE MODULES hive");
tableEnv.executeSql("SHOW MODULES").print();
// +-------------+
// | module name |
// +-------------+
// |        hive |
// +-------------+
tableEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+-------+
// | module name |  used |
// +-------------+-------+
// |        hive |  true |
// |        core | false |
// +-------------+-------+

// Unload hive module
tableEnv.executeSql("UNLOAD MODULE hive");
tableEnv.executeSql("SHOW MODULES").print();
// Empty set
tableEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+-------+
// | module name |  used |
// +-------------+-------+
// |        hive | false |
// +-------------+-------+
  • yaml
    使用 YAML 定义的所有模块都必须提供指定类型的类型属性。现支持以下类型
    在这里插入图片描述
modules:
   - name: core
     type: core
   - name: hive
     type: hive

使用SQL方式时,模块的名称是用于加载模块的,所以是模块的唯一标识,并且大小写敏感

2)、编码方式-java

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Show initially loaded and enabled modules
tableEnv.listModules();
// +-------------+
// | module name |
// +-------------+
// |        core |
// +-------------+
tableEnv.listFullModules();
// +-------------+------+
// | module name | used |
// +-------------+------+
// |        core | true |
// +-------------+------+

// Load a hive module
tableEnv.loadModule("hive", new HiveModule());

// Show all enabled modules
tableEnv.listModules();
// +-------------+
// | module name |
// +-------------+
// |        core |
// |        hive |
// +-------------+

// Show all loaded modules with both name and use status
tableEnv.listFullModules();
// +-------------+------+
// | module name | used |
// +-------------+------+
// |        core | true |
// |        hive | true |
// +-------------+------+

// Change resolution order
tableEnv.useModules("hive", "core");
tableEnv.listModules();
// +-------------+
// | module name |
// +-------------+
// |        hive |
// |        core |
// +-------------+
tableEnv.listFullModules();
// +-------------+------+
// | module name | used |
// +-------------+------+
// |        hive | true |
// |        core | true |
// +-------------+------+

// Disable core module
tableEnv.useModules("hive");
tableEnv.listModules();
// +-------------+
// | module name |
// +-------------+
// |        hive |
// +-------------+
tableEnv.listFullModules();
// +-------------+-------+
// | module name |  used |
// +-------------+-------+
// |        hive |  true |
// |        core | false |
// +-------------+-------+

// Unload hive module
tableEnv.unloadModule("hive");
tableEnv.listModules();
// Empty set
tableEnv.listFullModules();
// +-------------+-------+
// | module name |  used |
// +-------------+-------+
// |        hive | false |
// +-------------+-------+

二、Hive Functions内置函数和自定义函数使用

关于Flink sql使用hive的内置函数可以参考文章:41、Flink之Hive 方言介绍及详细示例

1、通过 HiveModule 使用 Hive 内置函数

HiveModule 为 Flink SQL 和 Table API 用户提供了 Hive 内置函数作为 Flink 系统(内置)函数。

  • java 代码示例
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		String moduleName            = "myhive";
		String hiveVersion         = "3.1.2";
		tenv.loadModule(moduleName, new HiveModule(hiveVersion));
  • yaml配置示例
modules:
   - name: core
     type: core
   - name: myhive
     type: hive
  • hive内置函数使用示例
CREATE CATALOG alan_hivecatalog WITH (
    'type' = 'hive',
    'default-database' = 'testhive',
    'hive-conf-dir' = '/usr/local/bigdata/apache-hive-3.1.2-bin/conf'
);
use catalog alan_hivecatalog;
set table.sql-dialect=hive;
load module hive;
use modules hive,core;
select explode(array(1,2,3));
create table tbl (key int,value string);
set execution.runtime-mode=streaming; 
insert into table tbl values (5,'e'),(1,'a'),(1,'a'),(3,'c'),(2,'b'),(3,'c'),(3,'c'),(4,'d');
select * from tbl;

--------------------flink sql 操作
Flink SQL> select explode(array(1,2,3));
Hive Session ID = 7d3ae2d5-24f3-4d97-9897-83c8a9abda9b
[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.hive.ql.parse.SemanticException: Invalid function explode

Flink SQL> set table.sql-dialect=hive;

Flink SQL> select explode(array(1,2,3));
Hive Session ID = c0b87333-4957-4c18-b197-27649a3f2ae2
[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.hive.ql.parse.SemanticException: Invalid function explode

Flink SQL> load module hive;

Flink SQL> use modules hive,core;

Flink SQL> select explode(array(1,2,3));

+----+-------------+
| op |         col |
+----+-------------+
| +I |           1 |
| +I |           2 |
| +I |           3 |
+----+-------------+
Received a total of 3 rows

Flink SQL> create table tbl (key int,value string);

Flink SQL> insert overwrite table tbl values (5,'e'),(1,'a'),(1,'a'),(3,'c'),(2,'b'),(3,'c'),(3,'c'),(4,'d');
Hive Session ID = 12fe08fa-5e63-44b2-8fc3-a90064959451
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Streaming mode not support overwrite.

Flink SQL> set execution.runtime-mode=batch; 
Hive Session ID = 4f17cc70-165c-4540-a299-874b66458521
[INFO] Session property has been set.

Flink SQL> insert overwrite table tbl values (5,'e'),(1,'a'),(1,'a'),(3,'c'),(2,'b'),(3,'c'),(3,'c'),(4,'d');
Hive Session ID = 1923623f-03d3-44b4-93ab-ee8498c5da06
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Checkpoint is not supported for batch jobs.

Flink SQL> set execution.runtime-mode=streaming; 

Flink SQL> insert into table tbl values (5,'e'),(1,'a'),(1,'a'),(3,'c'),(2,'b'),(3,'c'),(3,'c'),(4,'d');

Flink SQL> select * from tbl;
+----+-------------+--------------------------------+
| op |         key |                          value |
+----+-------------+--------------------------------+
| +I |           5 |                              e |
| +I |           1 |                              a |
| +I |           1 |                              a |
| +I |           3 |                              c |
| +I |           2 |                              b |
| +I |           3 |                              c |
| +I |           3 |                              c |
| +I |           4 |                              d |
+----+-------------+--------------------------------+
Received a total of 8 rows

一些hive的内置函数存在线程安全问题,建议更新hive的版本修复它

2、使用原生的hive 聚合函数Native Hive Aggregate Functions

如果 HiveModule 加载的优先级高于 CoreModule,Flink 会先尝试使用 Hive 内置函数。然后对于 Hive 内置的聚合函数,Flink 现在只能使用基于排序的聚合算子。从 Flink 1.17 开始,我们引入了一些原生的 Hive 聚合函数,可以使用基于哈希的聚合运算符来执行。目前仅支持5个函数,即sum/count/avg/min/max,未来将支持更多聚合函数。用户可以通过打开选项 table.exec.hive.native-agg-function.enabled 来使用原生的聚合函数,这为作业带来了显著的性能改进。
在这里插入图片描述

原生的聚合函数(native aggregation functions)的功能现在与 Hive 内置聚合函数不完全一致,例如,不支持某些数据类型。如果性能不是瓶颈,则无需启用此选项。此外,通过 SqlClient 使用时,无法为每个作业打开 table.exec.hive.native-agg-function.enabled 选项,目前仅支持模块级别。用户应先启用此选项,然后加载 HiveModule。此问题将在将来修复。

3、hive的自定义函数介绍

用户可以在 Flink 中使用他们现有的 Hive 用户定义函数。
当前支持的用户自定义函数包括如下:

  • UDF
  • GenericUDF
  • GenericUDTF
  • UDAF
  • GenericUDAFResolver2

在查询计划和执行时,Hive 的 UDF 和 GenericUDF 会自动转换为 Flink 的 ScalarFunction,Hive 的 GenericUDTF 会自动转换为 Flink 的 TableFunction,Hive 的 UDAF 和 GenericUDAFResolver2 会自动转换为 Flink 的 AggregateFunction。

若要使用 Hive 用户定义函数,前提条件如下:

  • 设置由 Hive metastore支持的 HiveCatalog,该目录包含该函数作为会话的当前目录
  • 在 Flink 的类路径中包含包含该函数的 jar
  • 使用 Blink planner (1.14版本以后没有这个限制)

4、hive的自定义函数使用-示例

关于hive自定义函数的开发过程详见文章关于自定义函数的部分:7、hive shell客户端与属性配置、内置运算符、函数(内置运算符与自定义UDF运算符)
简单来说分为如下几步:

  • 写一个java类,继承UDF,并重载evaluate方法,方法中实现函数的业务逻辑(重载意味着可以在一个java类中实现多个函数功能)
  • 程序打成jar包,上传HS2服务器本地或者HDFS
  • 客户端命令行中添加jar包到Hive的classpath: hive>add JAR /xxxx/udf.jar
  • 注册成为临时函数(给UDF命名):create temporary function 函数名 as ‘UDF类全路径’
  • HQL中使用函数

1)、定义函数

本函数实现功能如下:

  • 能够对输入数据进行非空判断、手机号位数判断
  • 能够实现校验手机号格式,把满足规则的进行****处理
  • 对于不符合手机号规则的数据直接返回,不处理
  • 代码 -有两种实现方式即UDF和GenericUDF

1、UDF实现

public class EncryptPhoneNumber extends UDF {
    /**
     * 重载evaluate方法 实现函数的业务逻辑
     * @param phoNum  入参:未加密手机号
     * @return 返回:加密后的手机号字符串
     */
    public String evaluate(String phoNum){
        String encryptPhoNum = null;
        //手机号不为空 并且为11位
        if (StringUtils.isNotEmpty(phoNum) && phoNum.trim().length() == 11 ) {
            //判断数据是否满足中国大陆手机号码规范
                String regex = "^(1[3-9]\\d{9}$)";
            Pattern p = Pattern.compile(regex);
            Matcher m = p.matcher(phoNum);
            if (m.matches()) {//进入这里都是符合手机号规则的
                //使用正则替换 返回加密后数据
                encryptPhoNum = phoNum.trim().replaceAll("(\\d{3})\\d{4}(\\d{4})","$1****$2");
            }else{
                //不符合手机号规则 数据直接原封不动返回
                encryptPhoNum = phoNum;
            }
        }else{
            //不符合11位 数据直接原封不动返回
            encryptPhoNum = phoNum;
        }
        return encryptPhoNum;
    }
}

2、GenericUDF实现

import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;


public class EncryptPhoneNumber extends GenericUDF {
	StringObjectInspector elementOI;
	/**
	 * Initialize this GenericUDF. This will be called once and only once per
	 * GenericUDF instance.
	 */
	@Override
	public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
		// 1. 检查该记录是否传过来正确的参数数量
		if (arguments.length != 1) {
			throw new UDFArgumentException("输入参数错误,必须是一个参数。");
		}
		// 2. 检查该条记录是否传过来正确的参数类型
		ObjectInspector a = arguments[0];
		if (!(a instanceof StringObjectInspector)) {
			throw new UDFArgumentException("輸入參數錯誤,需要是一個字符串");
		}

		// 3. 检查通过后,将参数赋值给成员变量ObjectInspector,为了在evaluate()中使用
		this.elementOI = (StringObjectInspector) a;
		return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
	}

	/**
	 * Evaluate the GenericUDF with the arguments. 重载evaluate方法 实现函数的业务逻辑
	 */
	@Override
	public Object evaluate(DeferredObject[] arguments) throws HiveException {
		String phoNum = elementOI.getPrimitiveJavaObject(arguments[0].get()).toString();
		String encryptPhoNum = null;
		// 手机号不为空 并且为11位
		if (StringUtils.isNotEmpty(phoNum) && phoNum.trim().length() == 11) {
			// 判断数据是否满足中国大陆手机号码规范
			String regex = "^(1[3-9]\\d{9}$)";
			Pattern p = Pattern.compile(regex);
			Matcher m = p.matcher(phoNum);
			if (m.matches()) {// 进入这里都是符合手机号规则的
				// 使用正则替换 返回加密后数据
				encryptPhoNum = phoNum.trim().replaceAll("(\\d{3})\\d{4}(\\d{4})", "$1****$2");
			} else {
				// 不符合手机号规则 数据直接原封不动返回
				encryptPhoNum = phoNum;
			}
		} else {
			// 不符合11位 数据直接原封不动返回
			encryptPhoNum = phoNum;
		}
		return encryptPhoNum;
	}

	/**
	 * Get the String to be displayed in explain.
	 */
	@Override
	public String getDisplayString(String[] children) {
		return "this is a EncryptPhoneNumber pro.";
	}

	public static void main(String[] args) throws Exception {
		EncryptPhoneNumber ep = new EncryptPhoneNumber();
		JavaStringObjectInspector stringOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;

		JavaStringObjectInspector resultInspector = (JavaStringObjectInspector) ep.initialize(new ObjectInspector[] { stringOI });

		Object result = ep.evaluate(new DeferredObject[] { new DeferredJavaObject("13917885967") });
		System.out.println("result:" + result);
	}

}

2)、hive中注册函数并使用

本处简单的描述过程和命令。

  • 打包
mvn package -Dmaven.test.skip=true
  • 添加jar包到Hive的classpath
0: jdbc:hive2://server4:10000> add jar /usr/local/bigdata/testjar/hive-0.0.1-SNAPSHOT.jar;
No rows affected (0.01 seconds)

  • 注册成为永久函数

该处需要特别注意,同时需要注册函数的时候带上数据库名称,否则默认为default.函数名称,如:default.encryptphonenumber

-- alan_testdatabase 为hive中一个数据库名称
CREATE FUNCTION alan_testdatabase.encryptPhoneNumber AS 'org.hive.udf.EncryptPhoneNumber';

0: jdbc:hive2://server4:10000> CREATE FUNCTION alan_testdatabase.encryptPhoneNumber AS 'org.hive.udf.EncryptPhoneNumber';
No rows affected (0.023 seconds)

  • 验证hive的自定义函数功能
0: jdbc:hive2://server4:10000> select alan_testdatabase.encryptPhoneNumber("13788889999");
+--------------+
|     _c0      |
+--------------+
| 137****9999  |
+--------------+

3)、flink sql中使用

前提:需要将hive 自定义函数的jar包(也就是第二步中注册为函数的那个jar包)放在flink的lib目录下,并重启集群。

  • 在hive中查看自定义的函数
0: jdbc:hive2://server4:10000> show functions;
+---------------------------------------+
|               tab_name                |
+---------------------------------------+
......
| aes_decrypt                           |
| aes_encrypt                           |
| alan_testdatabase.encryptphonenumber  |
| and                                   |
| array                                 |
........
  • 设置Flink sql的环境

Flink SQL> LOAD MODULE hive WITH ('hive-version' = '3.1.2');
[INFO] Execute statement succeed.

Flink SQL> use modules hive,core;
[INFO] Execute statement succeed.

Flink SQL> SHOW FULL MODULES;
+-------------+------+
| module name | used |
+-------------+------+
|        hive | true |
|        core | true |
+-------------+------+
2 rows in set

Flink SQL> use catalog alan_hivecatalog;
[INFO] Execute statement succeed.

Flink SQL> set table.sql-dialect=hive;
[INFO] Session property has been set.

Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Session property has been set.

  • 在flink中查看函数

Flink SQL> use alan_testdatabase;
[INFO] Execute statement succeed.

Flink SQL> show functions;
Hive Session ID = 5d34cbf8-5984-4ec0-8527-e06a948ad7ca
+--------------------------------+
|                  function name |
+--------------------------------+
.........
|             encryptphonenumber |
.........
  • 在flink sql中验证hive 的自定义函数
Flink SQL> select alan_testdatabase.encryptPhoneNumber("13788889999");
+----+--------------------------------+
| op |                         _o__c0 |
+----+--------------------------------+
| +I |                    137****9999 |
+----+--------------------------------+
Received a total of 1 row

以上,介绍了Flink的module功能以及Flink SQl使用hive的内置函数和hive的自定义函数功能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/979388.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

免费教学自制产品图册的方法

制作一个精美的产品图册可以帮助我们更好地展示产品的特点和优势,吸引潜在客户的注意力。而使用免费的工具和资源来制作图册,不仅可以节省成本,还能提高效率。 首先,我们需要选择一个适合的设计工具。例如FLBOOK在线制作电子杂志平…

包管理工具--》yarn的配置及使用

包管理工具系列文章目录 一、包管理工具--》npm的配置及使用(一) 二、包管理工具--》npm的配置及使用(二) 目录 🌟yarn 简介 🌟yarn 的核心命令 初始化 安装 脚本和本地CLI 查询 更新 卸载 &…

Redis:StringRedisTemplate简介

(笔记总结自b站黑马程序员课程) 为了在反序列化时知道对象的类型,JSON序列化器会将类的class类型写入json结果中,存入Redis,会带来额外的内存开销。 为了减少内存的消耗,我们可以采用手动序列化的方式&am…

Python之列表操作和内存模型

Python之列表操作和内存模型 列表list 一个排列整齐的队伍,Python采用顺序表实现 列表内的个体称作元素,由若干元素组成 列表 元素可以是任意对象(数字、字符串、对象、列表等) 列表内元素有顺序,可以使用索引 线性的数据结构 使用 [ ] …

Spring - Cloud (微服务)

SpringCloud 和 SpringBoot 版本选择对应: 版本对应:https://start.spring.io/actuator/info Cloud官网:Spring Cloud 中文网:Spring Cloud中文网-官方文档中文版 在官网可以查看 当前Cloud 推荐的Boot版本: 当前技术…

Yolov8-pose关键点检测:模型轻量化创新 | 轻量级Slim-Neck

💡💡💡本文解决什么问题:轻量级Slim-Neck,缓解 DSC 缺陷对模型的负面影响,并充分利用深度可分离卷积 DSC 的优势。 Slim-Neck | mAP50从0.921提升至0.93, mAP50从0.697提升至0.829 Yolov8-Pose关键点检测专栏介绍:https://blog.csdn.net/m0_63774211/category_1…

从0到1构建界面设计系统思维

用户界面(UI)是人与机器之间发生交互的载体,也是用户体验(UX)的一个组成部分。用户界面由两个主要部分组成:视觉设计(即传达产品的外观和感觉)和交互设计(即元素的功能和…

超详解| Yolov8模型手把手调参 | 配置 | 模型训练 | 验证 | 推理

YOLOv8是一款前沿、最先进(SOTA)的模型,基于先前YOLO版本的成功,引入了新功能和改进,进一步提升性能和灵活性。 然而,要充分发挥Yolov8的潜力,合理的参数配置是至关重要的。本文将带您深入了解…

2023年值得推荐的 API 开发工具

数字化时代,应用程序编程接口(API)的重要性愈发凸显。API 充当着应用程序之间的桥梁,促进数据交换和功能集成。随着 API 的不断增加和复杂化,开发对 API 开发工具的要求也越来越高。 我们一起来盘点下 2023年上半年比…

Ant design table实现单选和点击行选中

实现单选 Antd table实现单选非常方便只需要在rowSelection属性里添加type: radio即可。 实现点击行选中 需要用到onRow属性 在onClick事件里面手动设置selectRowKey,跟Table onChange事件一样 onRow{(record) > ({onClick: () > {console.log(record)…

极智开发 | vscode使用ssh加速git操作

欢迎关注我的公众号 [极智视界],获取我的更多经验分享 大家好,我是极智视界,本文分享一下 vscode使用ssh加速git操作。 邀您加入我的知识星球「极智视界」,星球内有超多好玩的项目实战源码和资源下载,链接:https://t.zsxq.com/0aiNxERDq 之前在 vscode 中主要使用 http …

浅析自动化测试工具的功能与作用

自动化测试工具是一种软件工具,旨在通过脚本或可视化界面自动执行测试任务和验证预期结果。这些工具可以自动识别和执行测试用例,模拟用户操作,比较实际和预期结果,并生成测试报告。自动化测试工具减少了人工干预,提高…

蓝桥杯备赛Day7——算法复杂度分析、前缀和思想

O(nlogn)算法可能达到的最优复杂度。快速排序算法是典型例子。 O(n^2)一个两重循环的算法,复杂度是O(n^2)。例如冒泡排序,是典型的两重循环。 O(n^3)、O(n^4)等等。 O(2n)一般对应集合问题。 例如一个集合中有n个数,要求输出它的所有子集。 O(n!)在集合问题中,如果要求按顺…

算法训练营day42|动态规划 part04(01背包问题基础(两种解决方案)、LeetCode 416.分割等和子集)

文章目录 01背包----二维dp数组01背包----滚动数组416.分割等和子集思路分析背包解法思考总结 有n件物品和一个最多能背重量为w 的背包。第i件物品的重量是weight[i],得到的价值是value[i] 。每件物品只能用一次,求解将哪些物品装入背包里物品价值总和最…

Geek——能力超强的卸载工具

简介 Geek是一款能力超强的卸载工具,旨在帮助用户轻松、彻底地卸载不需要的软件。无论是常见的应用程序、插件还是顽固的垃圾文件,Geek都能够迅速而安全地将其清理殆尽,释放您的存储空间和系统资源。 使用Geek,您可以轻松摆脱不…

【100天精通Python】Day56:Python 数据分析_Pandas数据清洗和处理

目录 数据清洗和处理 1.处理缺失值 1.1 删除缺失值: 1.2 填充缺失值: 1.3 插值: 2 数据类型转换 2.1 数据类型转换 2.2 日期和时间的转换: 2.3 分类数据的转换: 2.4 自定义数据类型的转换: 3 数…

2023,软件测试人的未来在哪里?

2023年,IT行业出现空前的萧条,首先是年初一开始各大厂像着了魔似的不约而同的纷纷裁员、降薪、奖金包缩水,随之而来的是需求萎缩,HC减少或封锁等等。 而有幸未被列入裁员名单的在职人员,庆幸之余也心有余悸&#xff0…

Excel VSTO开发8 -相关控件

版权声明:本文为博主原创文章,转载请在显著位置标明本文出处以及作者网名,未经作者允许不得用于商业目的。 8 相关控件 在VSTO开发中,Ribbon(或称为Ribbon UI)是指Office应用程序中的那个位于顶部的带有选…

it运维监控管理平台,统一运维监控管理平台

随着系统规模的不断扩大和复杂性的提高,IT运维管理的难度也在逐步增加。为了应对这一挑战,IT运维监控管理平台应运而生。本文将详细介绍IT运维监控管理平台的作用和优势以及如何选择合适的平台。 IT运维监控管理平台的作用管理平台 IT运维监控管理平台是…

glb数据介绍

目录 1.什么是glb数据?2.glb数据可以用来干什么?3.glb和gltf有什么区别?4.glb数据怎么打开?5.如何创建glb数据?6.glb数据的结构7.glb数据的优缺点8.glb对pbr渲染的帮助 1.什么是glb数据? GLB文件&#xff0…