Writing Hive data by writing files directly
The simplest way to write data into Hive is through Hive JDBC, but that is not the most efficient way.
Hive gets its data by reading files on HDFS, so writing those files to HDFS directly achieves the same effect as writing to Hive and is currently the most efficient approach. Keep in mind that the files have to land under a directory the metastore already knows about, i.e. the table location or an existing partition directory (otherwise the partition still has to be registered afterwards, e.g. with ALTER TABLE ... ADD PARTITION or MSCK REPAIR TABLE).
The generic approach
The most general approach is to serialize each row with a Serializer plus a StandardStructObjectInspector and then write it out with a RecordWriter; this works for practically every file format supported today.
The StandardStructObjectInspector describes the table structure and the column types.
Serializer has one implementation per Hadoop file format, for example ParquetHiveSerDe, AvroSerDe and OrcSerde.
Creating a RecordWriter requires a HiveOutputFormat, which likewise has an implementation per Hadoop file format, for example OrcOutputFormat, HiveIgnoreKeyTextOutputFormat and MapredParquetOutputFormat, each writing data in its format.
A StorageFormatDescriptor is a quick way to obtain the matching Serializer and HiveOutputFormat: StorageFormatFactory#get(formatType) returns the descriptor for a given format type, and there is a descriptor per format, for example TextFileStorageFormatDescriptor, ParquetFileStorageFormatDescriptor and so on.
Its getSerde(), getOutputFormat() and getInputFormat() methods return the class names of the SerDe and of the output/input formats.
You can also obtain the StorageDescriptor of an existing table through the Table API and take the OutputFormat and Serializer from there (a short sketch of that follows the example below).
@Test
public void test2()
throws ClassNotFoundException, IllegalAccessException, InstantiationException,
HiveException, IOException, SerDeException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "");
StorageDescriptor sd = Table.getEmptyTable(null, null).getSd();
SerDeInfo serDeInfo = new SerDeInfo();
HashMap<String, String> parameters = new HashMap<>();
parameters.put(serdeConstants.SERIALIZATION_FORMAT, "1");
serDeInfo.setParameters(parameters);
serDeInfo.setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
// attach the SerDeInfo to the StorageDescriptor (otherwise the settings above have no effect)
sd.setSerdeInfo(serDeInfo);
// default formats; these are overridden below once the StorageFormatDescriptor is looked up
sd.setInputFormat(SequenceFileInputFormat.class.getName());
sd.setOutputFormat(HiveSequenceFileOutputFormat.class.getName());
StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName());
// Look up the StorageFormatDescriptor by format type; typically TEXT, AVRO, PARQUET or ORC, see IOConstants
StorageFormatDescriptor storageFormatDescriptor =
storageFormatFactory.get(IOConstants.TEXTFILE);
sd.setInputFormat(storageFormatDescriptor.getInputFormat());
sd.setOutputFormat(storageFormatDescriptor.getOutputFormat());
String serdeLib = storageFormatDescriptor.getSerde();
if (serdeLib != null) {
sd.getSerdeInfo().setSerializationLib(serdeLib);
}
SerDeInfo serdeInfo = sd.getSerdeInfo();
Properties tableProperties = new Properties();
// tableProperties.put(serdeConstants.FIELD_DELIM, (byte) 1);
tableProperties.setProperty(serdeConstants.FIELD_DELIM, ",");
// tableProperties.setProperty(serdeConstants.COLLECTION_DELIM, "");
// tableProperties.setProperty(serdeConstants.MAPKEY_DELIM, "");
Serializer recordSerDe =
(Serializer) (Class.forName(serdeInfo.getSerializationLib()).newInstance());
SerDeUtils.initializeSerDe(
(Deserializer) recordSerDe, configuration, tableProperties, null);
Class<? extends OutputFormat> outputFormatClz =
HiveFileFormatUtils.getOutputFormatSubstitute(
Class.forName(storageFormatDescriptor.getOutputFormat()));
HiveOutputFormat outputFormat = (HiveOutputFormat) outputFormatClz.newInstance();
// The path is the Hive table / partition directory plus an arbitrary file name
Path path =
new Path( ".../hive/warehouse/table_name/pt_day=12/pt_hour=12/test");
JobConf jobConf = new JobConf(configuration);
// compression settings on the JobConf; isCompressed is passed as false below, so this file is written uncompressed
jobConf.setMapOutputCompressorClass(GzipCodec.class);
jobConf.set(
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.COMPRESS_CODEC,
GzipCodec.class.getName());
FileSinkOperator.RecordWriter recordWriter =
HiveFileFormatUtils.getRecordWriter(
jobConf,
outputFormat,
recordSerDe.getSerializedClass(),
false,
tableProperties,
path,
Reporter.NULL);
ObjectInspector intInspector =
ObjectInspectorFactory.getReflectionObjectInspector(
Integer.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
StandardListObjectInspector intListInspector =
ObjectInspectorFactory.getStandardListObjectInspector(intInspector);
StandardStructObjectInspector standardStructObjectInspector =
ObjectInspectorFactory.getStandardStructObjectInspector(
new ArrayList<>(List.of("address")),
new ArrayList<>(Arrays.asList(intListInspector)));
Object[] instance = new Object[1];
ArrayList<Integer> address = new ArrayList<>();
for (int i = 5; i < 10; i++) {
address.add(i * i);
}
instance[0] = address;
Writable serialize = recordSerDe.serialize(instance, standardStructObjectInspector);
recordWriter.write(serialize);
recordWriter.close(false);
}
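As mentioned above, the formats can also be read off an existing table instead of being picked through StorageFormatFactory. Below is a minimal sketch of that route; it assumes a reachable metastore (the thrift URI is a placeholder) and the db_name / table_name names are only for illustration:
@Test
public void testFormatsFromExistingTable() throws Exception {
    HiveConf hiveConf = new HiveConf();
    // placeholder metastore address
    hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://...");
    HiveMetaStoreClient client = new HiveMetaStoreClient(hiveConf);
    // hypothetical database and table names
    StorageDescriptor sd = client.getTable("db_name", "table_name").getSd();
    // the same pieces a StorageFormatDescriptor would give us
    String serdeLib = sd.getSerdeInfo().getSerializationLib();
    String outputFormatName = sd.getOutputFormat();
    Serializer serializer = (Serializer) Class.forName(serdeLib).newInstance();
    Class<? extends OutputFormat> outputFormatClz =
            HiveFileFormatUtils.getOutputFormatSubstitute(Class.forName(outputFormatName));
    HiveOutputFormat outputFormat = (HiveOutputFormat) outputFormatClz.newInstance();
    // serializer / outputFormat can now be used exactly as in the example above
    client.close();
}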
Other approaches
Text format
A Text-format Hive data file can be written through TextOutputFormat. The example below targets a table with an INT column "id" and an ARRAY<INT> column "address": the row is serialized into a delimited line with LazySimpleSerDe and then written out by TextOutputFormat.
@Test
public void testWriteText() {
UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
admin.doAs(
(PrivilegedAction<Void>)
() -> {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "...");
// LazySimpleSerDe is the default SerDe of TEXTFILE tables; it serializes a row into a delimited Text line
LazySimpleSerDe textSerDe = new LazySimpleSerDe();
Object[] instance = new Object[2];
instance[0] = 1;
ArrayList<Integer> address = new ArrayList<>();
for (int i = 5; i < 10; i++) {
address.add(i * i);
}
instance[1] = address;
ObjectInspector intInspector =
ObjectInspectorFactory.getReflectionObjectInspector(
Integer.class,
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
StandardListObjectInspector intListInspector =
ObjectInspectorFactory.getStandardListObjectInspector(
intInspector);
StandardStructObjectInspector standardStructObjectInspector =
ObjectInspectorFactory.getStandardStructObjectInspector(
new ArrayList<>(List.of("id", "address")),
new ArrayList<>(
Arrays.asList(intInspector, intListInspector)));
TextOutputFormat<Object, Object> textOutputFormat =
new TextOutputFormat<>();
Path path =
new Path(
".../hive/warehouse/table_name/partition/file");
try {
// describe the columns so LazySimpleSerDe knows names, types and delimiters
Properties tableProperties = new Properties();
tableProperties.setProperty(serdeConstants.LIST_COLUMNS, "id,address");
tableProperties.setProperty(serdeConstants.LIST_COLUMN_TYPES, "int,array<int>");
tableProperties.setProperty(serdeConstants.FIELD_DELIM, ",");
SerDeUtils.initializeSerDe(textSerDe, configuration, tableProperties, null);
// serialize the row into one delimited Text line
Writable serialize =
textSerDe.serialize(instance, standardStructObjectInspector);
JobConf entries = new JobConf(configuration);
RecordWriter<Object, Object> recordWriter =
textOutputFormat.getRecordWriter(
null, entries, path.toString(), Reporter.NULL);
recordWriter.write(NullWritable.get(), serialize);
recordWriter.close(Reporter.NULL);
} catch (IOException | SerDeException e) {
throw new RuntimeException(e);
}
return null;
});
}
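With the delimiters configured above, the written file should contain a single line looking roughly like 1,25\00236\00249\00264\00281, where the comma is the field delimiter we set and \002 (Ctrl-B) is Hive's default collection-items delimiter.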
ORC format
Writing ORC is very similar to Text, so this is kept brief; the example below writes the same INT "id" / ARRAY<INT> "address" row through OrcSerde and OrcOutputFormat, and a sketch for a nested map column follows it.
Writing ARRAY<INT> data
@Test
public void testWriteOrc() {
UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
admin.doAs(
(PrivilegedAction<Void>)
() -> {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "...");
OrcSerde orcSerde = new OrcSerde();
Object[] instance = new Object[2];
instance[0] = 1;
ArrayList<Integer> address = new ArrayList<>();
for (int i = 5; i < 10; i++) {
address.add(i * i);
}
instance[1] = address;
ObjectInspector intInspector =
ObjectInspectorFactory.getReflectionObjectInspector(
Integer.class,
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
StandardListObjectInspector intListInspector =
ObjectInspectorFactory.getStandardListObjectInspector(
intInspector);
StandardStructObjectInspector standardStructObjectInspector =
ObjectInspectorFactory.getStandardStructObjectInspector(
new ArrayList<>(List.of("id", "address")),
new ArrayList<>(
Arrays.asList(intInspector, intListInspector)));
Writable serialize =
orcSerde.serialize(instance, standardStructObjectInspector);
OrcOutputFormat orcOutputFormat = new OrcOutputFormat();
Path path =
new Path(".../hive/warehouse/table_name/partition/file");
try {
JobConf entries = new JobConf(configuration);
RecordWriter recordWriter =
orcOutputFormat.getRecordWriter(
null, entries, path.toString(), Reporter.NULL);
recordWriter.write(NullWritable.get(), serialize);
recordWriter.close(Reporter.NULL);
} catch (IOException e) {
throw new RuntimeException(e);
}
return null;
});
}
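Writing MAP<STRING, MAP<STRING, STRING>> data
For a nested map column only the ObjectInspector changes. The following is a minimal sketch reusing the same OrcSerde / OrcOutputFormat flow as above; fs.defaultFS and the target path are placeholders, the column names ("id", "address") and sample values are just for illustration, and the UserGroupInformation.doAs wrapper used above is omitted for brevity:
@Test
public void testWriteOrcNestedMap() throws Exception {
    Configuration configuration = new Configuration();
    // placeholder file system URI
    configuration.set("fs.defaultFS", "...");
    // struct<id:int, address:map<string,map<string,string>>>
    ObjectInspector stringInspector =
            PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    StandardMapObjectInspector innerMapInspector =
            ObjectInspectorFactory.getStandardMapObjectInspector(stringInspector, stringInspector);
    StandardMapObjectInspector outerMapInspector =
            ObjectInspectorFactory.getStandardMapObjectInspector(stringInspector, innerMapInspector);
    StandardStructObjectInspector structInspector =
            ObjectInspectorFactory.getStandardStructObjectInspector(
                    new ArrayList<>(List.of("id", "address")),
                    new ArrayList<>(
                            Arrays.asList(
                                    PrimitiveObjectInspectorFactory.javaIntObjectInspector,
                                    outerMapInspector)));
    // one row: id = 1, address = {"home" -> {"city" -> "x", "street" -> "y"}}
    Map<String, String> inner = new HashMap<>();
    inner.put("city", "x");
    inner.put("street", "y");
    Map<String, Map<String, String>> outer = new HashMap<>();
    outer.put("home", inner);
    Object[] row = new Object[] {1, outer};
    OrcSerde orcSerde = new OrcSerde();
    Writable serialized = orcSerde.serialize(row, structInspector);
    OrcOutputFormat orcOutputFormat = new OrcOutputFormat();
    // placeholder table / partition path plus a file name
    Path path = new Path(".../hive/warehouse/table_name/partition/file");
    RecordWriter recordWriter =
            orcOutputFormat.getRecordWriter(
                    null, new JobConf(configuration), path.toString(), Reporter.NULL);
    recordWriter.write(NullWritable.get(), serialized);
    recordWriter.close(Reporter.NULL);
}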
Parquet format
Parquet describes the table structure with a MessageType, stores the values in Groups, and finally writes the data with a ParquetWriter.
Writing MAP<STRING, STRING> data
The data:
id: 100
address
  key_value
    key: key0
    value: value0
  key_value
    key: key1
    value: value1
  key_value
    key: key2
    value: value4
The schema:
message Pair {
optional int32 id;
optional group address (MAP) {
repeated group key_value {
optional binary key;
optional binary value;
}
}
}
The code:
@Test
public void testWriteIdWithMap1() {
UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
admin.doAs(
(PrivilegedAction<Void>)
() -> {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "");
try {
Path path =
new Path(".../hive/warehouse/table_name/partition/file");
Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();
String name = "address";
// note: the key and value fields must be named exactly "key" and "value"
PrimitiveType keyType =
Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
.named("key");
PrimitiveType valueType =
Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
.named("value");
messageTypeBuilder
.optional(PrimitiveType.PrimitiveTypeName.INT32)
.named("id");
messageTypeBuilder
.optionalMap()
.key(keyType)
.value(valueType)
.named(name);
MessageType pari = messageTypeBuilder.named("Pair");
SimpleGroup simpleGroup = new SimpleGroup(pari);
ParquetWriter<Group> parquetWriter =
ExampleParquetWriter.builder(path)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withWriterVersion(
ParquetProperties.WriterVersion.PARQUET_1_0)
.withCompressionCodec(
CompressionCodecName.UNCOMPRESSED)
.withConf(configuration)
.withType(pari)
.withDictionaryEncoding(false)
.withRowGroupSize(134217728L)
.build();
// id column
simpleGroup.add(0, 100);
// the MAP group of the "address" column
Group mapGroup = simpleGroup.addGroup(1);
for (int i = 0; i < 3; i++) {
// one repeated key_value entry
Group entry0 = mapGroup.addGroup(0);
entry0.add(0, "key" + i);
entry0.add(1, "value" + i * i);
}
parquetWriter.write(simpleGroup);
parquetWriter.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
return null;
});
}
Writing ARRAY<ARRAY<INT>> data
@Test
public void testWriteIdWithArrayArray2() {
UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
admin.doAs(
(PrivilegedAction<Void>)
() -> {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "...");
try {
Path path =
new Path(
".../hive/warehouse/table_name/partition/file");
Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();
PrimitiveType named =
Types.optional(PrimitiveType.PrimitiveTypeName.INT32)
.named("address");
messageTypeBuilder
.optional(PrimitiveType.PrimitiveTypeName.INT32)
.named("id");
messageTypeBuilder
.optionalList()
.optionalListElement()
.element(named)
.named("address")
.named("address");
MessageType pari = messageTypeBuilder.named("Pair");
SimpleGroup simpleGroup = new SimpleGroup(pari);
ParquetWriter<Group> parquetWriter =
ExampleParquetWriter.builder(path)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withWriterVersion(
ParquetProperties.WriterVersion.PARQUET_1_0)
.withCompressionCodec(
CompressionCodecName.UNCOMPRESSED)
.withConf(configuration)
.withType(pari)
.withDictionaryEncoding(false)
.withRowGroupSize(134217728L)
.build();
simpleGroup.add(0, 100);
// the outer LIST group for the "address" column
Group address = simpleGroup.addGroup(1);
for (int i = 0; i < 5; i++) {
// one repeated entry of the outer list
Group listGroup = address.addGroup(0);
// its element: the inner LIST group
Group sublist = listGroup.addGroup(0);
for (int j = 5; j < 10; j++) {
// one repeated entry of the inner list holding a single INT32 value
Group subListGroup = sublist.addGroup(0);
subListGroup.add(0, j * j);
}
}
parquetWriter.write(simpleGroup);
parquetWriter.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
return null;
});
}