自定义函数
Hive 自带了一些函数,比如:max/min等,但是数量有限,自己可以通过自定义UDF来方便的扩展。
在企业中处理数据的时候,对于敏感数据往往需要进行脱敏处理。比如手机号。我们常见的处理方式是将手机号中间4位进行*号处理。 Hive中没有这样的函数可以直接实现功能,虽然可以通过各种函数的嵌套调用最终也能实现,但是效率不高,现要求自定义开发实现Hive函数,满足上述需求。
- 能够对输入数据进行非空判断、手机号位数判断
- 能够实现校验手机号格式,把满足规则的进行*号处理
- 对于不符合手机号规则的数据直接返回,不处理
实现步骤:
- 写一个java类,继承UDF,并重载evaluate方法,方法中实现函数的业务逻辑;
- 重载意味着可以在一个java类中实现多个函数功能;
- 程序打成jar包,上传HS2服务器本地或者HDFS;
- 客户端命令行中添加jar包到Hive的classpath: hive>add JAR /xxxx/udf.jar;
- 注册成为临时函数(给UDF命名):create temporary function 函数名 as ‘UDF类全路径’;
- HQL中使用函数。
开发环境准备:
idea创建Maven工程
添加pom依赖
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.1</version>
</dependency>
</dependencies>
编写代码
public class MyUDF extends UDF {
public String evaluate(String phoneNum){
//如果手机号为null 或者长度不对 直接返回
if(phoneNum == null || phoneNum.trim().length()!=11){
return phoneNum;
}
//判断手机号是否符合规则
String regex = "1[3-9][0-9]{9}";
boolean matches = phoneNum.trim().matches(regex);
//如果不匹配 原样返回
if(!matches){
return phoneNum;
}
//如果匹配 将中间替换为*
String newNum = phoneNum.trim().
replaceAll("([0-9]{3})[0-9]{4}([0-9]{4})", "$1****$2");
return newNum;
}
}
打成jar包
Maven ----> package
到target目录下去找对应的jar包
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FOkQsEO0-1684840129570)(img/35_jar.png)]
上传添加jar包
1.可以上传到hiveserver2所在的服务器
2.也可以上传到HDFS系统
使用命令
add jar jar包位置
为函数命名
create temporary function myphone_num as 'com.doit.demo.MyUDF';
使用函数
select myphone_num('13812341234');
拉链表设计与实现
1 数据同步问题
Hive在实际工作中主要用于构建离线数据仓库,定期的从各种数据源中同步采集数据到Hive中,经过分层转换提供数据应用。比如每天需要从MySQL中同步最新的订单信息、用户信息、店铺信息等到数据仓库中,进行订单分析、用户分析。
例如:MySQL中有一张用户表:tb_user,每个用户注册完成以后,就会在用户表中新增该用户的信息.
由于每天都会有用户注册,产生新的用户信息,那么每天都需要将MySQL中的用户数据同步到Hive数据仓库中.
假如在1号已经在hive中创建了表并拉取了数据,但是在2号时MySQL中新增2条用户注册数据,并且有1条用户数据发生更新.
那么我们需要对2号的数据进行同步到hive中,新增的数据会直接加载到Hive表中,但是更新的数据如何存储在Hive表中?
方案一:直接覆盖
使用2号的数据 直接将1号的数据覆盖掉
优点:实现最简单,使用起来最方便
缺点:没有历史状态 想查询008之前的数据查看不到
方案二:根据日期构建一份全量的快照表
1号创建一张表拉取所有数据
2号再创建一张表拉取所有数据
... 每天都创建一张表
优点:记录了所有数据在不同时间的状态
缺点:冗余存储了很多没有发生变化的数据,导致存储的数据量过大
方案三:构建拉链表,通过时间标记发生变化的数据的每种状态的时间周期
拉链表的设计是将更新的数据进行状态记录,没有发生更新的数据不进行状态存储,用于存储所有数据在不同时间上的所有状态,通过时间进行标记每个状态的生命周期,查询时,根据需求可以获取指定时间范围状态的数据,默认用9999-12-31等最大值来表示最新状态。
2 拉链表实现原理
原理
-
增量采集变化数据,放入增量表中
-
将Hive中的拉链表与临时表的数据进行合并,合并结果写入临时表
-
将临时表的数据覆盖写入拉链表中
3 拉链表实现演示
创建拉链表
-- 数据准备
vi zipper.txt
001 186xxxx1234 laoda 0 sh 2021-01-01 9999-12-31
002 186xxxx1235 laoer 1 bj 2021-01-01 9999-12-31
003 186xxxx1236 laosan 0 sz 2021-01-01 9999-12-31
004 186xxxx1237 laosi 1 gz 2021-01-01 9999-12-31
005 186xxxx1238 laowu 0 sh 2021-01-01 9999-12-31
006 186xxxx1239 laoliu 1 bj 2021-01-01 9999-12-31
007 186xxxx1240 laoqi 0 sz 2021-01-01 9999-12-31
008 186xxxx1241 laoba 1 gz 2021-01-01 9999-12-31
009 186xxxx1242 laojiu 0 sh 2021-01-01 9999-12-31
010 186xxxx1243 laoshi 1 bj 2021-01-01 9999-12-31
--创建拉链表
create table dw_zipper
(
userid string,
phone string,
nick string,
gender int,
addr string,
starttime string,
endtime string
) row format delimited fields terminated by '\t';
load data local inpath '/root/zipper.txt' into table dw_zipper;
select * from dw_zipper;
创建增量表
vi update.txt
008 186xxxx1241 laoba 1 sh 2021-01-02 9999-12-31
011 186xxxx1244 laoshi 1 jx 2021-01-02 9999-12-31
012 186xxxx1245 laoshi 0 zj 2021-01-02 9999-12-31
create table ods_update
(
userid string,
phone string,
nick string,
gender int,
addr string,
starttime string,
endtime string
) row format delimited fields terminated by '\t';
load data local inpath '/root/update.txt' overwrite into table ods_update;
select * from ods_update;
创建临时表
create table tmp_zipper
(
userid string,
phone string,
nick string,
gender int,
addr string,
starttime string,
endtime string
) row format delimited fields terminated by '\t';
合并数据到临时表
insert overwrite table tmp_zipper
select
userid,
phone,
nick,
gender,
addr,
starttime,
endtime
from ods_update
union all
--查询原来拉链表的所有数据,并将这次需要更新的数据的endTime更改为更新值的startTime
select
a.userid,
a.phone,
a.nick,
a.gender,
a.addr,
a.starttime,
--如果这条数据没有更新或者这条数据不是要更改的数据,就保留原来的值,否则就改为新数据的开始时间-1
if(b.userid is null or a.endtime < '9999-12-31', a.endtime , date_sub(b.starttime,1)) as endtime
from dw_zipper a left join ods_update b
on a.userid = b.userid ;
覆盖拉链表数据
insert overwrite table dw_zipper
select * from tmp_zipper;
数据存储
Hive数据存储的本质还是HDFS,所有的数据读写都基于HDFS的文件来实现,为了提高对HDFS文件读写的性能,Hive中提供了多种文件存储格式:TextFile、SequenceFile、ORC、Parquet等。不同的文件存储格式具有不同的存储特点,有的可以降低存储空间,有的可以提高查询性能等,可以用来实现不同场景下的数据存储,以提高对于数据文件的读写效率。
1 列式存储和行式存储
要理解行式存储和列式存储以及他们之间的差异首先就得理解两种存储方式在结构上的差异.
举个例子,如下表所示为一张学生的学科表:
行式存储
列式存储
写操作差异
从他们的结构上可以看出,行存储在写入时比列存储要快〈eg:如果是机械硬盘,行存储时磁头在磁盘上只需要顺序写入,而列存储需要频繁的移动定位到下一个字段需要写入的地址,造成了时间上的开销)﹔同时由于行存储下表的数据是放在一起的,一次写入,所以数据的完整性可以确定;
读数据差异
1.查询某一列数据
假设我们查询sid列
行存储查询时会将课程名和年级一起查询出来,这样会造成数据冗余,如果列数比较多,影响比较大.
列存储是按照列来存储到一起的,每一列单独存放,查询起来方便很多,直接查询即可
2.查询所有数据
行存储查询所有数据直接查询就可以了
列存储,由于每列单独存放,先查询出来一列,然后在通过聚集运算,把列上每个数据拼接成行,没有行存储方便.
使用场景总结
2 主流文件格式特点
TextFile格式
TextFile是Hive中默认的文件格式,存储形式为按行存储。工作中最常见的数据文件格式就是TextFile文件,几乎所有的原始数据生成都是TextFile格式.
建表时不指定存储格式即为textfile,导入数据时把数据文件拷贝至hdfs不进行处理。
优点
1.最简单的数据格式,不需要经过处理,可以直接cat查看
2.可以使用任意的分隔符进行分割
3.便于和其他工具(Pig, grep, sed, awk)共享数据
4.可以搭配Gzip、Bzip2、Snappy等压缩一起使用
缺点
1.耗费存储空间,I/O性能较低
2.结合压缩时Hive不进行数据切分合并,不能进行并行操作,查询效率低
3.按行存储,读取列的性能差
应用场景
1.适合于小量数据的存储查询
2.一般用于做第一层数据加载和测试使用
Parquet格式
Parquet是一种支持嵌套结构的列式存储文件格式.是一种支持嵌套数据模型对的列式存储系统,作为大数据系统中OLAP查询的优化方案,它已经被多种查询引擎原生支持,并且部分高性能引擎将其作为默认的文件存储格式。通过数据编码和压缩,以及映射下推和谓词下推功能,Parquet的性能也较之其它文件格式有所提升。
优点
1.更高效的压缩和编码可压缩、可分割,优化磁盘利用率和I/O
2.可用于多种数据处理框架
缺点
1.不支持update, insert, delete, ACID
应用场景
1.适用于字段数非常多,无更新,只取部分列的查询
Orc格式
ORC(OptimizedRC File)文件格式也是一种Hadoop生态圈中的列式存储格式.它并不是一个单纯的列式存储格式,是首先根据行组分割整个表,在每一个行组内进行按列存储。.
Index Data
Index Data 包括每一列的最小值和最大值,以及每一列中的行位置。(也可以包含bit field or bloom filter ) 行索引项提供了偏移量,使您能够在解压缩块中查找正确的压缩块和字节。
注意,ORC索引仅用于选择stripes 和 row groups.
拥有相对频繁的行索引项可以在stripe 内跳过行,以便快速读取,尽管stripe 很大。默认情况下,可以跳过每10,000行。
Row Data
存的是具体的数据,先取部分行,然后对这些行按列进行存储。对每个列进行了编码,分成多个Stream来存储。
Stripe Footer
存的是各个Stream的类型,长度等信息。
File Footer
每个文件有一个File Footer,这里面存的是每个Stripe的行数,每个Column的数据类型信息等;
PostScript
每个文件的尾部是一个PostScript,这里面记录了整个文件的压缩类型以及FileFooter的长度信息等。
在读取文件时,会seek到文件尾部读PostScript,从里面解析到File Footer长度,再读FileFooter,从里面解析到各个Stripe信息,再读各个Stripe,即从后往前读。
优点
1.列式存储,存储效率非常高
2.可压缩,高效的列存取
3.查询效率较高,支持索引
4.支持各种复杂的数据类型
5.支持矢量化查询
缺点
1.加载时性能消耗较大
2.需要通过text文件转化生成
3.读取全量数据时性能较差
应用场景
适用于Hive中大型的存储、查询
矢量化查询(了解)
Hive的默认查询执行引擎一次处理一行,而矢量化查询执行是一种Hive针对ORC文件操作的特性,目的是按照每批1024行读取数据,并且一次性对整个记录整合(而不是对单条记录)应用操作,提升了像过滤, 联合, 聚合等等操作的性能。
注意:要使用矢量化查询执行,就必须以ORC格式存储数据。
-- 开启矢量化查询
set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;
3 主流文件格式对比
**TextFile **
create table test_textfile
(
stime string,
userid string,
keyword string,
clickorder string,
url string
)row format delimited fields terminated by '\t';
load data local inpath '/root/sogou.txt' into table test_textfile;
select * from test_textfile limit 10;
desc formatted test_textfile;
**ORC 列式存储 **
create table test_orc
(
stime string,
userid string,
keyword string,
clickorder string,
url string
)
stored as ORC ;
-- 不能使用load加载数据
insert into table test_orc select * from test_textfile ;
select * from test_orc limit 10;
parquet 列式存储
create table test_parquet
(
stime string,
userid string,
keyword string,
clickorder string,
url string
)
stored as parquet ;
-- 不能使用load加载数据
insert into table test_parquet select * from test_textfile ;
select * from test_parquet limit 10;
desc formatted test_parquet;
-
文本文件是默认格式 行式存储 不压缩 效率最低 18.1M
-
ORC 列式存储 压缩比高 效率高 2.7M
-
parquet 列式存储 压缩比比ORC低 效率和ORC相当 , 兼容性比ORC好 13.1M
执行效率ORC和parquet格式类似 , 但是ORC的压缩比更好 ,parquet 兼容性好 ,
4 数据压缩(了解)
Hive底层运行MapReduce程序时,磁盘I/O操作、网络数据传输、shuffle和merge要花大量的时间,尤其是数据规模很大和工作负载密集的情况下。鉴于磁盘I/O和网络带宽是Hadoop的宝贵资源,数据压缩对于节省资源、最小化磁盘I/O和网络传输非常有帮助。Hive压缩实际上说的就是MapReduce的压缩。
压缩的优点
减小文件存储所占空间
加快文件传输效率,从而提高系统的处理速度
降低IO读写的次数
压缩的缺点
使用数据时需要先对文件解压,加重CPU负荷,压缩算法越复杂,解压时间越长
Hive中的压缩就是使用了Hadoop中的压缩实现的,所以Hadoop中支持的压缩在Hive中都可以直接使用。
要想在Hive中使用压缩,需要对MapReduce和Hive进行相应的配置
临时配置
--配置MapReduce开启输出压缩及配置压缩类型
--开启输出压缩
set mapreduce.output.fileoutputformat.compress=true;
--配置压缩类型为Snappy
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
--配置Hive开启中间结果压缩和输出压缩及配置压缩类型
-- 中间结果压缩
set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
-- 输出结果压缩
set hive.exec.compress.output=true;
永久配置
将以上MapReduce的配置写入mapred-site.xml中,重启Hadoop
将以上Hive的配置写入hive-site.xml中,重启Hive
压缩测试
TextFile压缩
create table t_textfile_snappy
stored as textfile
as select * from test_textfile;
Orc压缩
create table t_orc_snappy
stored as orc tblproperties ("orc.compress"="SNAPPY")
as select * from test_textfile;
select * from t_orc_snappy limit 10;
Hive优化配置
1 Explain查询计划
explain命令可以帮助用户了解一条HQL语句在底层的实现过程。通俗来说就是Hive打算如何去做这件事。explain会解析HQL语句,将整个HQL语句的实现步骤、依赖关系、实现过程都会进行解析返回,可以了解一条HQL语句在底层是如何实现数据的查询及处理的过程,辅助用户对Hive进行优化。
官网:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Explain
explain select * from test_textfile limit 10;
每个查询计划由以下几个部分组成
- 抽象语法树(AST):Hive使用Antlr解析生成器,可以自动地将HQL生成为抽象语法树
- Stage依赖关系:会列出运行查询划分的stage阶段以及之间的依赖关系
- Stage内容:包含了每个stage非常重要的信息,比如运行时的operator和sort orders等具体的信息
explain select stime,count(*) from test_textfile where stime > '00:00:00' group by stime order by stime limit 5;
2 Join优化
Hive Join的底层是通过MapReduce来实现的,Hive实现Join时,为了提高MapReduce的性能,提供了多种Join方案来实现;
例如适合小表Join大表的Map Join,大表Join大表的Reduce Join,以及大表Join的优化方案Bucket Join等。
2.1 Map Join
适用场景 : 小表join大表
原理
将小表的那份数据放到分布式缓存中,给每个MapTask的内存都放一份完整的数据,大的数据每个部分都可以与小数据的完整数据进行join,底层不需要经过shuffle,需要占用内存空间存放小的数据文件.
explain select * from t_category left join t_product tp on t_category.cid = tp.cid;
使用
尽量使用Map Join来实现Join过程,Hive中默认自动开启了Map Join:
hive.auto.convert.join=true
-- 如果大小表连接时小表数据小于该值,则自动开启mapjoin优化
-- 2.0以前
hive.smalltable.filesize or hive.mapjoin.smalltable.filesize: 默认值是25m
-- 2.0以后
hive.auto.convert.join.noconditionaltask.size :默认值10m
和
hive.auto.convert.join.noconditionaltask: 默认值是true
2.2 Reduce Join
应用场景:适合于大表Join大表
原理:将两张表的数据在shuffle阶段利用shuffle的分组,将数据按照关联字段进行合并,必须经过shuffle,利用Shuffle过程中的分组来实现关联.
使用:Hive会自动判断是否满足Map Join,如果不满足Map Join,则自动执行Reduce Join
2.3 Bucket Join
应用场景:适合于大表Join大表
原理:将两张表按照相同的规则将数据划分,根据对应的规则的数据进行join,减少了比较次数,提高了性能
使用
使用Bucket Join
语法:clustered by colName
参数:set hive.optimize.bucketmapjoin = true;
要求:分桶字段 = Join字段 ,桶的个数相等或者成倍数
使用Sort Merge Bucket Join(SMB)
基于有序的数据Join
语法:clustered by colName sorted by (colName)
参数
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
要求:分桶字段 = Join字段 = 排序字段 ,桶的个数相等或者成倍数
3 数据倾斜
分布式计算中最常见的,最容易遇到的问题就是数据倾斜;
现象:当提交运行一个程序时,这个程序的大多数的Task都已经运行结束了,只有某一个Task一直在运行,迟迟不能结束,导致整体的进度卡在99%或者100%,这时候就可以判定程序出现了数据倾斜的问题。
**原因:**数据分配不均衡
Group by、Count(distinct)造成数据倾斜
当程序中出现group by或者count(distinct)等分组聚合的场景时,如果数据本身是倾斜的,根据MapReduce的Hash分区规则,肯定会出现数据倾斜的现象。
根本原因是因为分区规则导致的,所以可以通过以下几种方案来解决group by导致的数据倾斜的问题
方案一:开启Map端聚合
hive.map.aggr=true;
通过减少shuffle数据量和Reducer阶段的执行时间,避免每个Task数据差异过大导致数据倾斜
需要注意:求平均值不能使用 combiner
方案二:数据倾斜时自动负载均衡
hive.groupby.skewindata=true;
开启该参数以后,当前程序会自动通过两个MapReduce来运行
第一个MapReduce自动进行随机分布到Reducer中,每个Reducer做部分聚合操作,输出结果
第二个MapReduce将上一步聚合的结果再按照业务(group by key)进行处理,保证相同的分布到一起,最终聚合得到结果
键随机数打散
Join造成数据倾斜
Join操作时,如果两张表比较大,无法实现Map Join,只能走Reduce Join,那么当关联字段中某一种值过多的时候依旧会导致数据倾斜的问题;
面对Join产生的数据倾斜,核心的思想是尽量避免Reduce Join的产生,优先使用Map Join来实现;
但往往很多的Join场景不满足Map Join的需求,那么可以以下几种方案来解决Join产生的数据倾斜问题:
方案一:提前过滤,将大数据变成小数据,实现Map Join
select a.id,a.value1,b.value2 from table1
a join (select b.* from table2 b where b.ds>='20181201' and b.ds<'20190101') c
on (a.id=c.id)
方案二:使用Bucket Join
方案二:使用Bucket Join
如果使用方案一,过滤后的数据依旧是一张大表,那么最后的Join依旧是一个Reduce Join
这种场景下,可以将两张表的数据构建为桶表,实现Bucket Map Join,避免数据倾斜.
方案三:使用Skew Join
Skew Join是Hive中一种专门为了避免数据倾斜而设计的特殊的Join过程
这种Join的原理是将Map Join和Reduce Join进行合并,如果某个值出现了数据倾斜,就会将产生数据倾斜的数据单独使用Map Join来实现
其他没有产生数据倾斜的数据由Reduce Join来实现,这样就避免了Reduce Join中产生数据倾斜的问题
最终将Map Join的结果和Reduce Join的结果进行Union合并
hive.optimize.skewjoin = true,
原理