Hive的函数高阶应用
explode函数
explode属于UDTF函数,表生成函数,输入一行数据输出多行数据。
功能:
explode() takes in an array (or a map) as an input and outputs the elements of the array (map) as separate rows. --explode接收map array类型的参数 把map或者array的元素输出,一行一个元素。 explode(array(11,22,33)) 11 22 33 select explode(`array`(11,22,33,44,55)); select explode(`map`("id",10086,"name","allen","age",18));
lateral view 侧视图
侧视图的原理是将UDTF的结果构建成一个类似于视图的表,然后将原表中的每一行和UDTF函数输出的每一行进行连接,生成一张新的虚拟表
背景
UDTF函数生成的结果可以当成一张虚拟的表,但是无法和原始表进行组合查询
select name,explode(location) from test_message; --这个sql就是错误的 相当于执行组合查询
从理论层面推导,对两份数据进行join就可以了
但是,hive专门推出了lateral view侧视图的语,满足上述需要。
功能:把UDTF函数生成的结果和原始表进行关联,便于用户在select时间组合查询、 lateral view是UDTf的好基友好搭档,实际中经常配合使用。
语法:
--lateral view侧视图基本语法如下 select …… from tabelA lateral view UDTF(xxx) 别名 as col1,col2,col3……; --针对上述NBA冠军球队年份排名案例,使用explode函数+lateral view侧视图,可以完美解决 select a.team_name ,b.year from the_nba_championship a lateral view explode(champion_year) b as year; --根据年份倒序排序 select a.team_name ,b.year from the_nba_championship a lateral view explode(champion_year) b as year order by b.year desc; --统计每个球队获取总冠军的次数 并且根据倒序排序 select a.team_name ,count(*) as nums from the_nba_championship a lateral view explode(champion_year) b as year group by a.team_name order by nums desc;
行列转换
多行转单列
collect_set --把多行数据收集为一行 返回set集合 去重无序
collect_list --把多行数据收集为一行 返回list集合 不去重有序
字符串拼接函数
concat --直接拼接字符串
concat_ws --指定分隔符拼接
select concat("it","cast","And","heima");
select concat("it","cast","And",null);
select concat_ws("-","itcast","And","heima");
select concat_ws("-","itcast","And",null);
单列转多行
技术原理: explode+lateral view
--原表 +-------+-------+--------+--+ | col1 | col2 | col3 | +-------+-------+--------+--+ | a | b | 1,2,3 | | c | d | 4,5,6 | +-------+-------+--------+--+ --目标表 +----------------+----------------+----------------+--+ | row2col2.col1 | row2col2.col2 | row2col2.col3 | +----------------+----------------+----------------+--+ | a | b | 1 | | a | b | 2 | | a | b | 3 | | c | d | 4 | | c | d | 5 | | c | d | 6 | +----------------+----------------+----------------+--+ --创建表 create table col2row2( col1 string, col2 string, col3 string )row format delimited fields terminated by '\t'; --加载数据 load data local inpath '/root/hivedata/c2r2.txt' into table col2row2; select * from col2row2; select explode(split(col3,',')) from col2row2; --SQL最终实现 select col1, col2, lv.col3 as col3 from col2row2 lateral view explode(split(col3, ',')) lv as col3;
json格式数据处理
在hive中,没有json类的存在,一般使用string类型来修饰,叫做json字符串,简称json串。
在hive中,处理json数据的两种方式
hive内置了两个用于解析json的函数
json_tuple --是UDTF 表生成函数 输入一行,输出多行 一次提取读个值 可以单独使用 也可以配合lateral view侧视图使用 get_json_object --是UDF普通函数,输入一行 输出一行 一次只能提取一个值 多次提取多次使用
使用==JsonSerDe 类解析==,在加载json数据到表中的时候完成解析动作
hive 窗口函数
快速理解窗口函数功能
window function 窗口函数、开窗函数、olap分析函数。
窗口:可以理解为操作数据的范围,窗口有大有小,本窗口中操作的数据有多有少。
可以简单地解释为类似于聚合函数的计算函数,但是通过GROUP BY子句组合的常规聚合会隐藏正在聚合的各个行,最终输出一行;而窗口函数聚合后还可以访问当中的各个行,并且可以将这些行中的某些属性添加到结果集中。
窗口函数语法
具有OVER语句的函数叫做窗口函数。
Function(arg1,..., argn) OVER ([PARTITION BY <...>] [ORDER BY <....>] [<window_expression>]) --其中Function(arg1,..., argn) 可以是下面分类中的任意一个 --聚合函数:比如sum max avg等 --排序函数:比如rank row_number等 --分析函数:比如lead lag first_value等 --OVER [PARTITION BY <...>] 类似于group by 用于指定分组 每个分组你可以把它叫做窗口 --如果没有PARTITION BY 那么整张表的所有行就是一组 --[ORDER BY <....>] 用于指定每个分组内的数据排序规则 支持ASC、DESC --[<window_expression>] 用于指定每个窗口中 操作的数据范围 默认是窗口中所有行
窗口聚合函数
语法
sum|max|min|avg OVER ([PARTITION BY <...>] [ORDER BY <....>] [<window_expression>])
重点:有PARTITION BY 没有PARTITION BY的区别;有ORDER BY没有ORDER BY的区别。
有没有partition by 影响的是全局聚合 还是分组之后 每个组内聚合。
有没有order by的区别:
没有order by,聚合的时候是组内所有的数据聚合再一起 全局聚合
如果有order by,聚合的时候是累加聚合,默认是第一行聚合到当前行。
window_expression
直译叫做window表达式 ,通俗叫法称之为window子句。
功能:控制窗口操作的范围。
语法
rows between - preceding:往前 - following:往后 - current row:当前行 - unbounded:起点 - unbounded preceding 表示从前面的起点 第一行 - unbounded following:表示到后面的终点 最后一行
窗口排序函数、窗口序列函数
功能:主要对数据分组排序之后,组内顺序标号。
核心函数:row_number、rank、dense_rank
适合场景:分组TopN问题(注意哦 不是全局topN)
ntile函数
功能:将分组排序之后的数据分成指定的若干个部分(若干个桶)
规则:尽量平均分配 ,优先满足最小的桶,彼此最多不相差1个。
Hive的数据压缩
Hive的默认执行引擎是MapReduce,因此通常所说的Hive压缩指的是MapReduce的压缩。
压缩是指通过算法对数据进行重新编排,降低存储空间。无损压缩。
MapReduce可以在两个阶段进行数据压缩
map的输出
减少shuffle的数据量 提高shuffle时网络IO的效率
reduce的输出
减少输出文件的大小 降低磁盘的存储空间
压缩的弊端
浪费时间
消耗CPU、内存
某些优秀的压缩算法需要钱
压缩的算法(推荐使用snappy)
Snappy org.apache.hadoop.io.compress.SnappyCodec
Hive中压缩的设置:注意 本质还是指的是MapReduce的压缩
--设置Hive的中间压缩 也就是map的输出压缩 1)开启 hive 中间传输数据压缩功能 set hive.exec.compress.intermediate=true; 2)开启 mapreduce 中 map 输出压缩功能 set mapreduce.map.output.compress=true; 3)设置 mapreduce 中 map 输出数据的压缩方式 set mapreduce.map.output.compress.codec = org.apache.hadoop.io.compress.SnappyCodec; --设置Hive的最终输出压缩,也就是Reduce输出压缩 1)开启 hive 最终输出数据压缩功能 set hive.exec.compress.output=true; 2)开启 mapreduce 最终输出数据压缩 set mapreduce.output.fileoutputformat.compress=true; 3)设置 mapreduce 最终数据输出压缩方式 set mapreduce.output.fileoutputformat.compress.codec =org.apache.hadoop.io.compress.SnappyCodec; 4)设置 mapreduce 最终数据输出压缩为块压缩 还可以指定RECORD set mapreduce.output.fileoutputformat.compress.type=BLOCK;
--设置完毕之后 只有当HiveSQL底层通过MapReduce程序执行 才会涉及压缩。 --已有普通格式的表 select * from student; --ctas语句 create table student_snappy as select * from student;
Hive的数据存储格式
列式存储、行式存储
数据最终在文件中底层以什么样的形成保存。
Hive中表的数据存储格式,不是只支持text文本格式,还支持其他很多格式。
hive表的文件格式是如何指定的呢? 建表的时候通过STORED AS 语法指定。如果没有指定默认都是textfile。
Hive中主流的几种文件格式。
textfile 文件格式
ORC、Parquet 列式存储格式。
都是列式存储格式,底层是以二进制形式存储。数据存储效率极高,对于查询贼方便。 二进制意味着肉眼无法直接解析,hive可以自解析。
在实际开发中,可以根据需求选择不同的文件格式并且搭配不同的压缩算法。可以得到更好的存储效果。
结论建议:在Hive中推荐使用ORC+snappy压缩。
Hive通用调优
Fetch抓取机制
功能:在执行sql的时候,能不走MapReduce程序处理就尽量不走MapReduce程序处理。
尽量直接去操作数据文件。
设置: hive.fetch.task.conversion= more。
--在下述3种情况下 sql不走mr程序 --全局查找 select * from student; --字段查找 select num,name from student; --limit 查找 select num,name from student limit 2;
mapreduce本地模式
功能:如果非要执行MapReduce程序,能够本地执行的,尽量不提交yarn上执行。
默认是关闭的。意味着只要走MapReduce就提交yarn执行。
mapreduce.framework.name = local 本地模式 mapreduce.framework.name = yarn 集群模式
Hive提供了一个参数,自动切换MapReduce程序为本地模式,如果不满足条件,就执行yarn模式。
set hive.exec.mode.local.auto = true; --3个条件必须都满足 自动切换本地模式 The total input size of the job is lower than: hive.exec.mode.local.auto.inputbytes.max (128MB by default) --数据量小于128M The total number of map-tasks is less than: hive.exec.mode.local.auto.tasks.max (4 by default) --maptask个数少于4个 The total number of reduce tasks required is 1 or 0. --reducetask个数是0 或者 1
切换Hive的执行引擎
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases. 如果针对Hive的调优依然无法满足你的需求 还是效率低, 尝试使用spark计算引擎 或者Tez.
join优化
底层还是MapReduce的join优化。
MapReduce中有两种join方式。指的是join的行为发生什么阶段。
map端join
reduce端join
优化1:Hive自动尝试选择map端join提高join的效率 省去shuffle的过程。
开启 mapjoin 参数设置: (1)设置自动选择 mapjoin set hive.auto.convert.join = true; --默认为 true (2)大表小表的阈值设置: set hive.mapjoin.smalltable.filesize= 25000000;
优化2:大表join大表
--背景: 大表join大表本身数据就十分具体,如果join字段存在null空值 如何处理它? --方式1:空key的过滤 此行数据不重要 参与join之前 先把空key的数据过滤掉 SELECT a.* FROM (SELECT * FROM nullidtable WHERE id IS NOT NULL ) a JOIN ori b ON a.id =b.id; --方式2:空Key转换 CASE WHEN a.id IS NULL THEN 'xxx任意字符串' ELSE a.id END CASE WHEN a.id IS NULL THEN concat('hive', rand()) ELSE a.id --避免转换之后数据倾斜 随机分布打散
优化3:桶表join提高优化效率。bucket mapjoin
1.1 条件 1) set hive.optimize.bucketmapjoin = true; 2) 一个表的bucket数是另一个表bucket数的整数倍 3) bucket列 == join列 4) 必须是应用在map join的场景中 1.2 注意 1)如果表不是bucket的,只是做普通join。
group by 数据倾斜优化
(1)是否在 Map 端进行聚合,默认为 True set hive.map.aggr = true; (2)在 Map 端进行聚合操作的条目数目 set hive.groupby.mapaggr.checkinterval = 100000; (3)有数据倾斜的时候进行负载均衡(默认是 false) set hive.groupby.skewindata = true; --Q:在hive中数据倾斜开启负载均衡之后 底层执行机制是什么样? --step1:启动一个MapReduce程序 将倾斜的数据随机发送到各个reduce中 进行打散 每个reduce进行聚合都是局部聚合 --step2:再启动第二个MapReduce程序 将上一步局部聚合的结果汇总起来进行最终的聚合
hive中如何调整底层MapReduce中task的个数(并行度)
maptask个数
如果是在MapReduce中 maptask是通过逻辑切片机制决定的。
但是在hive中,影响的因素很多。比如逻辑切片机制,文件是否压缩、压缩之后是否支持切割。
因此在Hive中,调整MapTask的个数,直接去HDFS调整文件的大小和个数,效率较高。
如果小文件多,就进行小文件的合并 合并的大小最好=block size 如果大文件多,就调整blocl size
reducetask个数
如果在MapReduce中,通过代码可以直接指定 job.setNumReduceTasks(N)
在Hive中,reducetask个数受以下几个条件控制的
(1)每个 Reduce 处理的数据量默认是 256MB hive.exec.reducers.bytes.per.reducer=256000000 (2)每个任务最大的 reduce 数,默认为 1009 hive.exec.reducsers.max=1009 (3)mapreduce.job.reduces 该值默认为-1,由 hive 自己根据任务情况进行判断。 --如果用户用户不设置 hive将会根据数据量或者sql需求自己评估reducetask个数。 --用户可以自己通过参数设置reducetask的个数 set mapreduce.job.reduces = N --用户设置的不一定生效,如果用户设置的和sql执行逻辑有冲突,比如order by,在sql编译期间,hive又会将reducetask设置为合理的个数。 Number of reduce tasks determined at compile time: 1