Hive数据倾斜以及解决方案
1、什么是数据倾斜
数据倾斜主要表现在,map/reduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢,导致整个程序的处理时间很长,这是因为某一个key的条数比其他key多很多(有时是百倍或者千倍之多),这条Key所在的reduce节点所处理的数据量比其他节点就大很多,从而导致某几个节点迟迟运行不完。
2、数据倾斜的原因
一些操作有关:
关键词 | 情形 | 后果 |
---|---|---|
Join | 其中一个表较小,但是key集中 | 分发到某一个或几个Reduce上的数据远高于平均值 |
大表与大表,但是分桶的判断字段0值或空值过多 | 这些空值都由一个reduce处理,非常慢 | |
group by | group by 维度过小,某值的数量过多 | 处理某值的reduce灰常耗时 |
Count Distinct | 某特殊值过多 | 处理此特殊值的reduce耗时 |
原因归纳:
- key分布不均匀
- 业务数据本身的特性
- 建表时考虑不周
- 某些SQL语句本身就有数据倾斜
现象:
任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。单一reduce的记录数与平均记录数差异过大,通常可能达到3倍甚至更多。 最长时长远大于平均时长。
3、数据倾斜的解决方案
1)参数调节
Map 端部分聚合,相当于Combiner
hive.map.aggr = true
有数据倾斜的时候进行负载均衡,当选项设定为true,生成的查询计划会有两个MR Job。第一个MR Job中,Map 的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MRJob再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key被分布到同一个Reduce中),最后完成最终的聚合操作。
hive.groupby.skewindata=true
2)SQL语句调节
如何join:
关于驱动表的选取,选用join key分布最均匀的表作为驱动表,做好列裁剪和filter操作,以达到两表做join的时候,数据量相对变小的效果。
大小表Join:
使用map join让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce。
大表Join大表:
把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。
count distinct大量相同特殊值:
count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。
group by维度过小:
采用sum() group by的方式来替换count(distinct)完成计算。
特殊情况特殊处理:
在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后union回去。
4、典型的业务场景
1)空值产生的数据倾斜
场景:如日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和 用户表中的user_id 关联,会碰到数据倾斜的问题。
解决方法一:user_id为空的不参与关联
select * from log a
join users b
on a.user_id is not null
and a.user_id = b.user_id
union all
select * from log a
where a.user_id is null;
解决方法二:赋与空值分新的key值
select *
from log a
left outer join users b
on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end =
b.user_id;
结论:
方法2比方法1效率更好,不但io少了,而且作业数也少了。解决方法一中log读取两次,jobs是2。解决方法二job数是1。这个优化适合无效id (比如 -99 , ’’, null 等) 产生的倾斜问题。把空值的key变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上,解决数据倾斜问题。
2)不同数据类型关联产生数据倾斜
场景:用户表中user_id字段为int,log表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的Join操作时,默认的Hash操作会按int型的id来进行分配,这样会导致所有string类型id的记录都分配到一个Reducer中。
解决方法:把数字类型转换成字符串类型
select * from users a
left outer join logs b
on a.usr_id = cast(b.user_id as string);
3)小表不小不大,怎么用 map join 解决倾斜问题
使用map join解决小表(记录数少)关联大表的数据倾斜问题,这个方法使用的频率非常高,但如果小表很大,大到map join会出现bug或异常,这时就需要特别的处理。
例子:
select * from log a
left outer join users b
on a.user_id = b.user_id;
users表有600w+的记录,把users分发到所有的map上也是个不小的开销,而且map join不支持这么大的小表。如果用普通的join,又会碰到数据倾斜的问题。
解决方法:
select /*+mapjoin(x)*/* from log a
left outer join (
select /*+mapjoin(c)*/d.*
from ( select distinct user_id from log ) c
join users d
on c.user_id = d.user_id
) x
on a.user_id = b.user_id;
假如,log里user_id有上百万个,这就又回到原来map join问题。所幸,每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题。
Hive数据同步到HDFS,小文件问题怎么解决的?
我们先考虑小文件的产生和影响:
1)哪里会产生小文件
- 源数据本身有很多小文件
- 动态分区会产生大量小文件
- reduce个数越多, 小文件越多
- 按分区插入数据的时候会产生大量的小文件, 文件个数 = maptask个数 * 分区数
2)小文件太多造成的影响
- 从Hive的角度看,小文件会开很多map,一个map开一个JVM去执行,所以这些任务的初始化,启动,执行会浪费大量的资源,严重影响性能。
- HDFS存储太多小文件, 会导致namenode元数据特别大, 占用太多内存, 制约了集群的扩展
小文件解决方案:
方法一:通过调整参数进行合并
1)在Map输入的时候, 把小文件合并
-- 老版本 每个Map最大输入大小,决定合并后的文件数
set mapred.max.split.size=256000000;
-- hadoop2.x 每个Map最大输入大小,决定合并后的文件数
set mapreduce.input.fileinputformat.split.maxsize=2048000000;(2G)
-- 一个节点上split的至少的大小 ,决定了多个data node上的文件是否需要合并
set mapred.min.split.size.per.node=100000000;
-- 一个交换机下split的至少的大小,决定了多个交换机上的文件是否需要合并
set mapred.min.split.size.per.rack=100000000;
-- 执行Map前进行小文件合并开关开启:true
set hive.hadoop.supports.splittable.combineinputformat = true;
-- 执行Map前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
2)在Reduce输出的时候, 把小文件合并
-- (和下面的2选1)在map-only job后合并文件,默认true
set hive.merge.mapfiles = true;
-- (和下面的2选1)在map-reduce job后合并文件,默认false
set hive.merge.mapredfiles = true;
-- 合并后每个文件的大小,默认256000000
set hive.merge.size.per.task = 256000000;
-- 平均文件大小,是决定是否执行合并操作的阈值,默认16000000
set hive.merge.smallfiles.avgsize = 100000000;
方法二:(Hive sql 执行完创建小文件数过多)针对按分区插入数据的时候产生大量的小文件的问题,可以使用DISTRIBUTE BY rand() 将数据随机分配给Reduce,这样可以使得每个Reduce处理的数据大体一致。
-- 设置每个reducer处理的大小为5个G
set hive.exec.reducers.bytes.per.reducer=5120000000;
-- 使用distribute by rand()将数据随机分配给reduce, 避免出现有的文件特别大, 有的文件特别小
insert overwrite table test partition(dt)
select * from iteblog_tmp
DISTRIBUTE BY rand();
方法三:使用Sequencefile作为表存储格式,不要用textfile,在一定程度上可以减少小文件
方法四:使用hadoop的archive归档
-- 用来控制归档是否可用
set hive.archive.enabled=true;
-- 通知Hive在创建归档时是否可以设置父目录
set hive.archive.har.parentdir.settable=true;
-- 控制需要归档文件的大小
set har.partfile.size=1099511627776;
-- 使用以下命令进行归档
ALTER TABLE srcpart ARCHIVE PARTITION(ds='2008-04-08', hr='12');
-- 对已归档的分区恢复为原文件
ALTER TABLE srcpart UNARCHIVE PARTITION(ds='2008-04-08', hr='12');
-- 注意,归档的分区不能够INSERT OVERWRITE,必须先unarchive
Hadoop自带的三种小文件处理方案
-
Hadoop Archive
Hadoop Archive或者HAR,是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样在减少namenode内存使用的同时,仍然允许对文件进行透明的访问。 -
Sequence file
sequence file由一系列的二进制key/value组成,如果为key小文件名,value为文件内容,则可以将大批小文件合并成一个大文件。 -
CombineFileInputFormat
它是一种新的inputformat,用于将多个文件合并成一个单独的split,另外,它会考虑数据的存储位置。
hive 压缩
压缩Map的输出,这样做有两个好处
- a)压缩是在内存中进行,所以写入map本地磁盘的数据就会变小,大大减少了本地IO次数
- b) Reduce从每个map节点copy数据,也会明显降低网络传输的时间
# 打开job最终输出压缩的开关,设置之后必须设置下面这行,否则还是没有压缩效果
set hive.exec.compress.output=true;
# 设置压缩类型
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
# 大文件压缩仍然会耗时,而且影响mapper并行(mapper并行和文件的个数有关),
# 这个设置,使大的文件可以分割成小文件进行压缩
set mapred.output.compression.type=BLOCK;
# 中间数据map压缩,不影响最终结果。但是job中间数据输出要写在硬盘并通过网络传输到reduce,
# 传送数据量变小,因为shuffle sort(混洗排序)数据被压缩了。
set hive.exec.compress.intermediate=true;
# 为中间数据配置压锁编解码器 ,通常配置Snappy更好。
set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
合并分区小文件操作思路
1.创建备份表
create test.tb_name_bak like test.tb_name;
2.设置合并的相关参数并将原表的数据插入备份表中
#任务结束时合并小文件
SET hive.merge.mapfiles = true;
SET hive.merge.mapredfiles = true;
#合并文件大小256M
SET hive.merge.size.per.task = 256000000;
#当输出文件平均大小小于该值时,启用文件合并
SET hive.merge.smallfiles.avgsize = 134217728;
#压缩输出
SET hive.exec.compress.output = true;
#压缩方式:snappy
SET parquet.compression = snappy;
#默认值为srticat,nonstrict模式表示允许所有分区字段都可以使用动态分区
SET hive.exec.dynamic.partition.mode = nonstrict;
#使用动态分区
SET hive.exec.dynamic.partition = true;
#在所有执行MR的节点上,共可以创建多少个动态分区
SET hive.exec.max.dynamic.partitions=3000;
#在执行MR的单节点上,最大可以创建多少个分区
SET hive.exec.max.dynamic.partitions.pernode=500;
insert overwrite table test.tb_name_bak partition(date_str) select * from test.tb_name;
3.检查备份表和原表数据是否一致,若一致,则进行下一步操作
SELECT count(*) FROM test.tb_name;
SELECT count(*) FROM test.tb_name_bak;
数据转换操作有两种方式:
1.操作表的方式
- 删除原表
drop table test.tb_name;
- 将备份表重命名为原表
alter table test.tb_name_bak rename to test.tb_name;
2.操作hdfs文件方式
- 将原表hdfs路径下的文件移到某个备份目录,确认无误之后,再将文件移动回收站
- 将备份表hdfs路径下的文件移到原表目录下