通过写文件方式写入 Hive 数据

news2025/1/8 23:40:31

通过写文件方式写入 Hive 数据

Hive最简单的写入数据方式就是通过Hive Jdbc写入Hive数据,但这并不是写入Hive最高效的方法。

Hive通过读取相关Hdfs的文件来获取数据信息,而通过直接写入Hdfs文件数据达到写入Hive数据的效果,这是目前最高效的方法。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

通用写法

最通用的写法就是通过Serializer配合StandardStructObjectInspector序列化数据,再通过RecordWriter写入数据,它适用于几乎目前所有的文件类型。

StandardStructObjectInspector用于描述表结构和字段类型。

Serializer有多种实现,分别对应每种Hadoop文件格式的序列化器,例如:ParquetHiveSerDe、AvroSerDe、OrcSerde等。

RecordWriter创建需要HiveOutputFormat,HiveOutputFormat也是有多种Hadoop文件格式的实现的,例如:OrcOutputFormat、HiveIgnoreKeyTextOutputFormat、MapredParquetOutputFormat,用于写入相应格式的数据。

通过StorageFormatDescriptor可以快速的获取相应文件格式的Serializer、HiveOutputFormat,只需要StorageFormatFactory#get(formatType)即可创建一个对应文件格式类型的StorageFormatDescriptor,StorageFormatDescriptor也是有各种数据格式类型实现的,例如TextFileStorageFormatDescriptor、ParquetFileStorageFormatDescriptor等等。

StorageFormatDescriptor的getSerde()、getOutputFormat()、getInputFormat()等方法,可以获取Serializer和HiveOutputFormat。

当然你也可以通过Table API获取StorageDescriptor从而获取相应的OutputFormat和Serializer。

@Test
public void test2()
    throws ClassNotFoundException, IllegalAccessException, InstantiationException,
HiveException, IOException, SerDeException {
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS", "");

    StorageDescriptor sd = Table.getEmptyTable(null, null).getSd();
    SerDeInfo serDeInfo = new SerDeInfo();
    HashMap<String, String> parameters = new HashMap<>();
    parameters.put(serdeConstants.SERIALIZATION_FORMAT, "1");
    serDeInfo.setParameters(parameters);
    serDeInfo.setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
    sd.setInputFormat(SequenceFileInputFormat.class.getName());
    sd.setOutputFormat(HiveSequenceFileOutputFormat.class.getName());

    StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
    sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName());
    // 通过格式类型获取StorageFormatDescriptor,这里一般有TEXT、AVRO、PARQUET、ORC这几种,可通过IOConstants查看
    StorageFormatDescriptor storageFormatDescriptor =
        storageFormatFactory.get(IOConstants.TEXTFILE);
    sd.setInputFormat(storageFormatDescriptor.getInputFormat());
    sd.setOutputFormat(storageFormatDescriptor.getOutputFormat());
    String serdeLib = storageFormatDescriptor.getSerde();
    if (serdeLib != null) {
        sd.getSerdeInfo().setSerializationLib(serdeLib);
    }
    SerDeInfo serdeInfo = sd.getSerdeInfo();
    Properties tableProperties = new Properties();
    //        tableProperties.put(serdeConstants.FIELD_DELIM, (byte) 1);
    tableProperties.setProperty(serdeConstants.FIELD_DELIM, ",");
    //        tableProperties.setProperty(serdeConstants.COLLECTION_DELIM, "");
    //        tableProperties.setProperty(serdeConstants.MAPKEY_DELIM, "");
    Serializer recordSerDe =
        (Serializer) (Class.forName(serdeInfo.getSerializationLib()).newInstance());
    SerDeUtils.initializeSerDe(
        (Deserializer) recordSerDe, configuration, tableProperties, null);
    Class<? extends OutputFormat> outputFormatClz =
        HiveFileFormatUtils.getOutputFormatSubstitute(
        Class.forName(storageFormatDescriptor.getOutputFormat()));
    HiveOutputFormat outputFormat = (HiveOutputFormat) outputFormatClz.newInstance();
    // 这里对应hive相应的表、分区路径、还有一个随机的文件名
    Path path =
        new Path( ".../hive/warehouse/table_name/pt_day=12/pt_hour=12/test");
    JobConf jobConf = new JobConf(configuration);
    jobConf.setMapOutputCompressorClass(GzipCodec.class);
    jobConf.set(
        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.COMPRESS_CODEC,
        GzipCodec.class.getName());
    FileSinkOperator.RecordWriter recordWriter =
        HiveFileFormatUtils.getRecordWriter(
        jobConf,
        outputFormat,
        recordSerDe.getSerializedClass(),
        false,
        tableProperties,
        path,
        Reporter.NULL);

    ObjectInspector intInspector =
        ObjectInspectorFactory.getReflectionObjectInspector(
        Integer.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
    StandardListObjectInspector intListInspector =
        ObjectInspectorFactory.getStandardListObjectInspector(intInspector);
    StandardStructObjectInspector standardStructObjectInspector =
        ObjectInspectorFactory.getStandardStructObjectInspector(
        new ArrayList<>(List.of("address")),
        new ArrayList<>(Arrays.asList(intListInspector)));

    Object[] instance = new Object[1];
    ArrayList<Integer> address = new ArrayList<>();
    for (int i = 5; i < 10; i++) {
        address.add(i * i);
    }
    instance[0] = address;
    Writable serialize = recordSerDe.serialize(instance, standardStructObjectInspector);
    recordWriter.write(serialize);
    recordWriter.close(false);
}

其他写法

Text格式

通过TextOutputFormat写入Text格式的Hive表数据文件,以下是一张拥有"id", "address"字段的表,而map是一个Map类型的字段

@Test
public void testWriteMap() {
    UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
    admin.doAs(
        (PrivilegedAction<Void>)
        () -> {
            Configuration configuration = new Configuration();
            configuration.set("fs.defaultFS", "...");
            OrcSerde orcSerde = new OrcSerde();

            Object[] instance = new Object[2];
            instance[0] = 1;
            ArrayList<Integer> address = new ArrayList<>();
            for (int i = 5; i < 10; i++) {
                address.add(i * i);
            }

            instance[1] = address;
            ObjectInspector intInspector =
                ObjectInspectorFactory.getReflectionObjectInspector(
                Integer.class,
                ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
            StandardListObjectInspector intListInspector =
                ObjectInspectorFactory.getStandardListObjectInspector(
                intInspector);
            StandardStructObjectInspector standardStructObjectInspector =
                ObjectInspectorFactory.getStandardStructObjectInspector(
                new ArrayList<>(List.of("id", "address")),
                new ArrayList<>(
                    Arrays.asList(intInspector, intListInspector)));
            Writable serialize =
                orcSerde.serialize(instance, standardStructObjectInspector);
            TextOutputFormat<Object, Object> objectObjectTextOutputFormat =
                new TextOutputFormat<>();
            Path path =
                new Path(
                ".../hive/warehouse/table_name/partition/file");
            try {
                JobConf entries = new JobConf(configuration);
                RecordWriter<Object, Object> recordWriter =
                    objectObjectTextOutputFormat.getRecordWriter(
                    null, entries, path.toString(), Reporter.NULL);
                recordWriter.write(NullWritable.get(), serialize);
                recordWriter.close(Reporter.NULL);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            return null;
        });
}

ORC格式

ORC格式的写入和Text相似,不多说,只示范Map类型写入

写入MAP<STRING, MAP<STRING, STRING>>类型数据
@Test
public void testWriteMap() {
    UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
    admin.doAs(
        (PrivilegedAction<Void>)
        () -> {
            Configuration configuration = new Configuration();
            configuration.set("fs.defaultFS", "...");
            OrcSerde orcSerde = new OrcSerde();
            Object[] instance = new Object[2];
            instance[0] = 1;
            ArrayList<Integer> address = new ArrayList<>();
            for (int i = 5; i < 10; i++) {
                address.add(i * i);
            }
            instance[1] = address;
            ObjectInspector intInspector =
                ObjectInspectorFactory.getReflectionObjectInspector(
                Integer.class,
                ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
            StandardListObjectInspector intListInspector =
                ObjectInspectorFactory.getStandardListObjectInspector(
                intInspector);
            StandardStructObjectInspector standardStructObjectInspector =
                ObjectInspectorFactory.getStandardStructObjectInspector(
                new ArrayList<>(List.of("id", "address")),
                new ArrayList<>(
                    Arrays.asList(intInspector, intListInspector)));
            Writable serialize =
                orcSerde.serialize(instance, standardStructObjectInspector);
            OrcOutputFormat orcOutputFormat = new OrcOutputFormat();
            Path path =
                new Path(".../hive/warehouse/table_name/partition/file");
            try {
                JobConf entries = new JobConf(configuration);
                RecordWriter recordWriter =
                    orcOutputFormat.getRecordWriter(
                    null, entries, path.toString(), Reporter.NULL);
                recordWriter.write(NullWritable.get(), serialize);
                recordWriter.close(Reporter.NULL);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            return null;
        });
}

Parquet格式

Parquest通过MessageType表示表结构,用group存储数据类型和数据,最后通过ParquetWriter写入数据

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

写入MAP<STRING, MAP<STRING, STRING>>类型数据

数据如下:

id: 100
address
  key_value
    key: key0
    value: value0
  key_value
    key: key1
    value: value1
  key_value
    key: key2
    value: value4

格式如下:

message Pair {
  optional int32 id;
  optional group address (MAP) {
    repeated group key_value {
      optional binary key;
      optional binary value;
    }
  }
}

代码如下:

@Test
public void testWriteIdWithMap1() {
    UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
    admin.doAs(
        (PrivilegedAction<Void>)
        () -> {
            Configuration configuration = new Configuration();
            configuration.set("fs.defaultFS", "");
            try {
                Path path =
                    new Path(".../hive/warehouse/table_name/partition/file");
                Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();
                String name = "address";
                // 注意这里的named后面必须是key、value
                PrimitiveType keyType =
                    Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
                    .named("key");
                PrimitiveType valueType =
                    Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
                    .named("value");
                messageTypeBuilder
                    .optional(PrimitiveType.PrimitiveTypeName.INT32)
                    .named("id");
                messageTypeBuilder
                    .optionalMap()
                    .key(keyType)
                    .value(valueType)
                    .named(name);
                MessageType pari = messageTypeBuilder.named("Pair");
                SimpleGroup simpleGroup = new SimpleGroup(pari);
                ParquetWriter<Group> parquetWriter =
                    ExampleParquetWriter.builder(path)
                    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                    .withWriterVersion(
                    ParquetProperties.WriterVersion.PARQUET_1_0)
                    .withCompressionCodec(
                    CompressionCodecName.UNCOMPRESSED)
                    .withConf(configuration)
                    .withType(pari)
                    .withDictionaryEncoding(false)
                    .withRowGroupSize(134217728L)
                    .build();
                simpleGroup.add(0, 100);
                Group mapGroup = simpleGroup.addGroup(1);
                for (int i = 0; i < 3; i++) {
                    Group entry0 = mapGroup.addGroup(0);
                    entry0.add(0, "key" + i);
                    entry0.add(1, "value" + i * i);
                }
                parquetWriter.write(simpleGroup);
                parquetWriter.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            return null;
        });
}
写入ARRAY<ARRAY<INT>>类型数据
@Test
public void testWriteIdWithArrayArray2() {
    UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
    admin.doAs(
        (PrivilegedAction<Void>)
        () -> {
            Configuration configuration = new Configuration();
            configuration.set("fs.defaultFS", "...");
            try {
                Path path =
                    new Path(
                    ".../hive/warehouse/table_name/partition/file");
                Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();
                PrimitiveType named =
                    Types.optional(PrimitiveType.PrimitiveTypeName.INT32)
                    .named("address");
                messageTypeBuilder
                    .optional(PrimitiveType.PrimitiveTypeName.INT32)
                    .named("id");
                messageTypeBuilder
                    .optionalList()
                    .optionalListElement()
                    .element(named)
                    .named("address")
                    .named("address");
                MessageType pari = messageTypeBuilder.named("Pair");
                SimpleGroup simpleGroup = new SimpleGroup(pari);
                ParquetWriter<Group> parquetWriter =
                    ExampleParquetWriter.builder(path)
                    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                    .withWriterVersion(
                    ParquetProperties.WriterVersion.PARQUET_1_0)
                    .withCompressionCodec(
                    CompressionCodecName.UNCOMPRESSED)
                    .withConf(configuration)
                    .withType(pari)
                    .withDictionaryEncoding(false)
                    .withRowGroupSize(134217728L)
                    .build();
                simpleGroup.add(0, 100);
                // add group
                Group address = simpleGroup.addGroup(1);
                for (int i = 0; i < 5; i++) {
                    // group add list entry
                    Group listGroup = address.addGroup(0);
                    // add group
                    Group sublist = listGroup.addGroup(0);
                    for (int j = 5; j < 10; j++) {
                        // group add list entry
                        Group subListGroup = sublist.addGroup(0);
                        subListGroup.add(0, i * i);
                    }
                }
                parquetWriter.write(simpleGroup);
                parquetWriter.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            return null;
        });
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2083022.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《python语言程序设计》2018版第8章第2题检测子串,你可以用str类中的find方法检测一个字符串

我先用in来做一次 def find_text(text_input1, text_input2):a str(text_input1)b str(text_input2)if b in a:print(f"The {b} is in {a} ")else:print(f"The {b} is not in {a} ")text_n1 "Welcome to shenyang" text_n2 "to"fi…

zdppy_cache缓存框架升级,支持用户级别的缓存隔离,支持超级管理员管理普通用户的缓存

启动服务 import zdppy_api as api import zdppy_cachekey1 "admin" key2 "admin"app api.Api(routes[*zdppy_cache.zdppy_api.cache(key1, key2, api) ])if __name__ __main__:import zdppy_uvicornzdppy_uvicorn.run(app, host"0.0.0.0",…

JVM-类加载过程

类加载过程是 Java 虚拟机 (JVM) 将 Java 代码编译后的字节码文件加载到内存中&#xff0c;并进行解析和验证&#xff0c;最终使程序能够运行的关键步骤。 类加载过程&#xff1a;加载->连接->初始化。连接过程又可分为三步&#xff1a;验证->准备->解析。 1. 加载…

Vitis AI 基本认知(Tiny-VGG 标签获取+预测后处理)

目录 1. 简介 2. 解析 2.1 获取标签 2.1.1 载入数据集 2.1.2 标签-Index 2.1.3 保存和读取类别标签 2.2 读取单个图片 2.3 载入模型并推理 2.3.1 tiny-vgg 模型结构 2.3.2 运行推理 2.4 置信度柱状图 2.5 预测标签 3. 完整代码 4. 总结 1. 简介 本博文在《Vitis …

Python酷库之旅-第三方库Pandas(105)

目录 一、用法精讲 456、pandas.DataFrame.rdiv方法 456-1、语法 456-2、参数 456-3、功能 456-4、返回值 456-5、说明 456-6、用法 456-6-1、数据准备 456-6-2、代码示例 456-6-3、结果输出 457、pandas.DataFrame.rtruediv方法 457-1、语法 457-2、参数 457-3…

云计算实训38——docker网络、跨主机容器之间的通讯

一、docker⽹络 1.桥接--bridge 所有容器连接到桥就可以使⽤外⽹&#xff0c;使⽤nat让容器可以访问外⽹ 使⽤ ip a s指令查看桥&#xff0c;所有容器连接到此桥&#xff0c;ip地址都是 172.17.0.0/16 ⽹段&#xff0c;桥是启动docker服务后出现&#xff0c;在centos使⽤ br…

centos安装mysql8.0版本,并且实现远程连接

一、 卸载mysql 查看mysql安装情况 rpm -qa | grep -i mysql 删除上图中所有信息 rpm -ev mysql-community-release-el7-5.noarch --nodeps 再次查询&#xff0c;没有数据&#xff0c;则为删除干净 find / -name mysql rm -rf /var/lib/mysql 将机器上的所有mysql相关文…

一篇文章带你真正了解接口测试(附视频教程+面试真题)

一、什么是接口测试&#xff1f; 所谓接口&#xff0c;是指同一个系统中模块与模块间的数据传递接口、前后端交互、跨系统跨平台跨数据库的对接。而接口测试&#xff0c;则是通过接口的不同情况下的输入&#xff0c;去对比输出&#xff0c;看看是否满足接口规范所规定的功能、…

79.位域

目录 一.位域的概念 二.语法格式 三.无名位域 四.视频教程 一.位域的概念 有些数据在存储的时候并不需要一个完整的字节。比如使用一个变量表示开关的状态&#xff0c;开关只有开和关俩个状态&#xff0c;所以只需要使用0和1表示&#xff0c;也就是一个二进制位。所以这时候…

前端提升之——chrome浏览器插件开发指南——chrome插件介绍及入门

前言 有一天突发奇想&#xff0c;想要自己写一个浏览器插件玩一玩&#xff0c;并不做用于商业或者其他方面&#xff0c;仅仅用于自我技术的练习和提升。 这里的浏览器我选择Chrome&#xff0c;当然chrome插件同样适用于微软自带的 Microsoft Edge 在当今发达的互联网环境下&…

云微客短视频矩阵如何打造多元化的视频内容呢?

随着抖音、快手等平台的兴起&#xff0c;短视频已经成为了人们日常生活的一部分&#xff0c;也有不少企业通过短视频赛道实现了品牌曝光和获客引流&#xff0c;但是单一的视频内容终究很难长久的吸引用户&#xff0c;所以如何打造多元化的视频内容呢&#xff1f; 在这个快节奏的…

【二叉树】OJ题目

&#x1f31f;个人主页&#xff1a;落叶 目录 单值⼆叉树 【单值二叉树】代码 相同的树 【相同二叉树】代码 对称⼆叉树 【对称二叉树】代码 另一颗树的子树 【另一颗树的子树】代码 二叉树的前序遍历 【二叉树前序遍历】代码 二叉树的中序遍历 【二叉树中序遍历】…

【数据结构】栈和队列相互实现

目录 栈实现队列 思路 入队列 出队列 获取队头元素 队列实现栈 思路 入栈 出栈 获取栈顶元素 完整代码 栈实现队列 队列实现栈 栈实现队列 思路 栈的特点是 先进后出&#xff0c; 队列的特点是 先进新出&#xff0c;这就意味着我们无法通过一个栈来实现队列&…

YOLOv5改进 | 融合改进 | C3融合Efficient Multi-Scale Conv Plus【完整代码】

秋招面试专栏推荐 &#xff1a;深度学习算法工程师面试问题总结【百面算法工程师】——点击即可跳转 &#x1f4a1;&#x1f4a1;&#x1f4a1;本专栏所有程序均经过测试&#xff0c;可成功执行&#x1f4a1;&#x1f4a1;&#x1f4a1; 专栏目录&#xff1a; 《YOLOv5入门 改…

生成式AI扩散模型-Diffusion Model【李宏毅2023】概念讲解、原理剖析笔记

目录 一、Diffusion的基本概念和运作方法 1.Diffusion Model是如何运作的&#xff1f; 2.Denoise模块内部正在做的事情 如何训练Noise predictor&#xff1f; 1&#xff09;Forward Process (Diffusion Process) 2&#xff09;noise predictor 3.Text-to-Image 4.两个A…

MySQL必会知识精华3(使用MySQL)

我们的目标是&#xff1a;按照这一套资料学习下来&#xff0c;大家可以完成数据库增删改查的实际操作。轻松应对面试或者笔试题中MySQL相关题目 上篇文章我们先做一下数据库的基础知识以及MySQL的简单介绍。本篇文章主要连接使用MySQL的相关知识。相对简单&#xff0c;争取做到…

Datawhle X 李宏毅苹果书AI夏令营深度学习笔记之——局部最小值与鞍点

深度学习中优化神经网络是一个重要的问题&#xff0c;我们经常沮丧地发现到了一个节点&#xff0c;不管参数怎么更新&#xff0c;训练的损失都不会下降&#xff0c;神经网络似乎训练不起来了。这可能和损失函数收敛在局部最小值与鞍点有关。 一、 局部最小值&#xff08;local…

‌蜘蛛的工作原理及蜘蛛池的搭建与优化

蜘蛛的工作原理主要包括跟踪网页链接、‌采用一定的爬行策略遍历互联网&#xff0c;‌以及将新内容添加到引擎的索引中。‌具体来说&#xff1a;‌ 跟踪网页链接‌&#xff1a;‌蜘蛛会从一个或多个初始URL开始&#xff0c;‌通过这些URL发现新的链接&#xff0c;‌并将这些链接…

数据的基本类型

数据的基本类型 字符串 切片 切片语法&#xff1a; strs "hello" strs[0:]整数型 浮点型 布尔类型

vscode c++和cuda开发环境配置

文章目录 1. vscode 插件安装2. 开发环境配置2.1 bear 安装2.2 代码的编译2.2.1 编写Makefile文件2.2.2 bear make和make命令2.3 debug环境配置2.1 函数跳转设置2.1.1 ` c_cpp_properties.json` 设置2.1.2 settings.json设置2.2 调试环境配置2.2.1 tasks.json2.2.2 launch.json…