Hive分区表数据压缩
1.背景
目前公司的Hive分区表采用的TextFile格式存储,占用的存储空间较大,考虑到存储成本,需要对存储的历史数据进行压缩。
2.压缩格式选择
2.1 snappy压缩
优点:高速压缩速度和合理的压缩率;支持Hadoop native库。
缺点:不支持split;压缩率比gzip要低;Hadoop本身不支持,需要安装;linux系统下没有对应的命令。
应用场景:当MapReduce作业的map输出的数据量比较大的时候,作为map到reduce的中间数据的压缩格式;或者作为一个MapReduce作业的输出和另外一个MapReduce作业的输入。
2.2 lzo压缩
优点:压缩/解压速度也比较快,合理的压缩率;支持split,是Hadoop中最流行的压缩格式;支持Hadoop native库;可以在linux系统下安装lzop命令,使用方便。
缺点:压缩率比gzip要低一些;hadoop本身不支持,需要安装;在应用中对lzo格式的文件需要做一些特殊处理(为了支持split需要建索引,还需要指定inputformat为lzo格式)。
应用场景:一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,lzo优点越明显。
2.3 gzip压缩
优点:压缩率比较高,而且压缩/解压速度也比较快;Hadoop本身支持,在应用中处理gzip格式的文件就和直接处理文本一样;有Hadoop native库;大部分linux系统都自带gzip命令,使用方便。
缺点:不支持split
应用场景:当每个文件压缩之后在130M以内的,都可以考虑用gzip压缩格式。比如每天的日志压缩成一个gzip文件,运行MapReduce程序的时候通过多个gzip文件达到并发。对于处理这些文件的程序(如Hive、流、MapReduce程序)完全和文本处理一样,压缩之后原来的程序不需要做任何修改。
2.4 bzip2压缩
优点:支持split;具有很高的压缩率,比gzip压缩率都高;Hadoop本身支持,但不支持native;在linux系统下自带bzip2命令,使用方便。
缺点:压缩/解压速度慢;不支持native。
应用场景:适合对速度要求不高,但需要较高的压缩率的场景。可以作为MapReduce作业的输出格式;输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况;对单个很大的文本文件想压缩减少存储空间,同时又需要支持split,而且兼容之前的应用程序(即应用程序不需要修改)的情况。
PS: 由于是对历史数据进行压缩,因此选择了Hadoop原生兼容以及压缩比较高且支持split的bzip2压缩算法。
2.5 查看当前集群支持的压缩编码
目前公司使用的CDH集群,在HDFS服务的配置中搜索io.compression.codecs
即可查看当前Hadoop支持的压缩编码格式,如下图所示,org.apache.hadoop.io.compress.BZip2Codec
就是Bzip2压缩格式的编码解码器,如果没有加上配置,重启服务即可。
3.代码实现
Hive的分区表数据存放在HDFS上,并且存放数据的目录是最内层一级的分区,因此使用Spark和HDFS客户端对文件进行压缩,具体实现思路如下。
- 首先接受一个分区路径,这个路径是最内层一级分区的上一级分区路径
- 遍历当前分区下的所有路径,这个路径就是数据存放的路径
- 读取数据存放路径中的数据,使用Spark Core的
saveAsTextFile
指定输出格式为Bzip2对数据进行压缩输出到当前路径下临时路径中。 - 删除原来分区路径下的数据,将临时路径中压缩好的数据移动到原来分区路径,这样就完成一个分区的数据压缩
- 循环完成就完成了一个一级分区的所有二级分区的压缩
- 最后删除临时路径,完成数据压缩
代码如下所示
public class Text2Bzip {
public static void main(String[] args) throws IOException {
// 创建SparkContext
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("test_bizip2");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
// 获取路径
String path = args[0];
// 获取hdfs客户端
Configuration hadoopConf = new Configuration();
FileSystem fs = FileSystem.get(hadoopConf);
// 遍历当前压缩分区的文件
RemoteIterator<LocatedFileStatus> iterator = fs.listLocatedStatus(new Path(path));
while (iterator.hasNext()) {
Path dataPath = iterator.next().getPath();
String tempPath = path + "/compression/" + dataPath.getName();
System.out.println("=======================>");
System.out.println("开始处理: ");
System.out.println("压缩数据目录: " + dataPath);
System.out.println("临时目录: " + tempPath);
// 获取并行度
int parallelize = getParallelize(fs, dataPath.toString());
if (parallelize == -1) {
System.out.println("没有文件!");
} else {
// 执行压缩
zip(sc, dataPath.toString(), tempPath, parallelize);
// 移动文件
mv(fs, tempPath, dataPath.toString());
}
}
// 删除临时目录
fs.delete(new Path(path + "/compression/"), true);
// 停止Spark程序
sc.stop();
}
// 压缩
public static void zip(JavaSparkContext sc, String src, String temp, int parallelize) {
System.out.println("-----------zip-----------------");
sc.textFile(src).coalesce(parallelize).saveAsTextFile(temp, BZip2Codec.class);
}
// 获取并行度
public static int getParallelize(FileSystem fs, String path) throws IOException {
RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(path), false);
long size = 0L;
while (iterator.hasNext()) {
size += iterator.next().getLen();
}
if (size == 0) {
return -1;
} else {
// 控制每个task 读取256MB的文件 减少小文件
return (int) (size / 268435456) + 1;
}
}
// 移动文件
public static void mv(FileSystem fs, String src, String dst) throws IOException {
Path dstPath = new Path(dst);
Path srcPath = new Path(src);
fs.delete(dstPath, true);
fs.mkdirs(dstPath);
RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(srcPath, false);
while (iterator.hasNext()) {
LocatedFileStatus file = iterator.next();
fs.rename(file.getPath(), new Path(dstPath, file.getPath().getName()));
}
}
}
由于Hadoop原生支持Bzip2压缩算法,因此,不会影响到Hive正常读取,但是读取速度相比之前可能会慢一些。
4.效果
通过观察Spark的WebUI,如下图所示,可以看到输入的数据为4GB左右,压缩后输出的数据为80MB左右,压缩比在50倍左右,当然这也与数据的分布规律有关。