将Parquet文件的数据导入Hive 、JSON文件导入ES

news2025/1/12 19:54:18

文章目录

  • 将Parquet文件的数据导入Hive
    • 查询parquet文件格式
      • 编译cli工具
      • 查看元数据信息
      • 查询抽样数据
    • 创建hive表 数据存储格式采用parquet
    • 加载文件
  • 将json数据导入ES
    • ES批量导入api
    • 原始json文件内容
    • 索引结构
    • 重组json脚本
    • 重组后的json文件
    • bulk api调用

将Parquet文件的数据导入Hive

查询parquet文件格式

主要利用社区工具 https://github.com/apache/parquet-mr/

编译cli工具

 cd parquet-cli;
 mvn clean install -DskipTests;

查看元数据信息

 java -cp parquet-cli-1.13.1.jar;dependency/* org.apache.parquet.cli.Main meta yellow_tripdata_2023-03.parquet

在这里插入图片描述

查询抽样数据

 java -cp parquet-cli-1.13.1.jar;dependency/* org.apache.parquet.cli.Main head -n 2 yellow_tripdata_2023-03.parquet
{"VendorID": 2, "tpep_pickup_datetime": 1677629203000000, "tpep_dropoff_datetime": 1677629803000000, "passenger_count": 1, "trip_distance": 0.0, "RatecodeID": 1, "store_and_fwd_flag": "N", "PULocationID": 238, "DOLocationID": 42, "payment_type": 2, "fare_amount": 8.6, "extra": 1.0, "mta_tax": 0.5, "tip_amount": 0.0, "tolls_amount": 0.0, "improvement_surcharge": 1.0, "total_amount": 11.1, "congestion_surcharge": 0.0, "Airport_fee": 0.0}
{"VendorID": 2, "tpep_pickup_datetime": 1677629305000000, "tpep_dropoff_datetime": 1677631170000000, "passenger_count": 2, "trip_distance": 12.4, "RatecodeID": 1, "store_and_fwd_flag": "N", "PULocationID": 138, "DOLocationID": 231, "payment_type": 1, "fare_amount": 52.7, "extra": 6.0, "mta_tax": 0.5, "tip_amount": 12.54, "tolls_amount": 0.0, "improvement_surcharge": 1.0, "total_amount": 76.49, "congestion_surcharge": 2.5, "Airport_fee": 1.25}      

parquet 和 hive 的 field 类型映射关系

parquet 字段类型hive 字段类型
BINARYSTRING
BOOLEANBOOLEAN
DOUBLEDOUBLE
FLOATFLOAT
INT32INT
INT64BIGINT
INT96TIMESTAMP
BINARY + OriginalType UTF8STRING
BINARY + OriginalType DECIMALDECIMAL

创建hive表 数据存储格式采用parquet

# 创建以parquet存储的表
  CREATE TABLE `test_trino.yellow_taxi_trip_records_tmp`
(
  `VendorID` int COMMENT '仪表供应商ID', 
  `tpep_pickup_datetime` TIMESTAMP COMMENT '仪表启动时间', 
  `tpep_dropoff_datetime` TIMESTAMP COMMENT '仪表关闭时间',
  `passenger_count` bigint COMMENT '乘客数量', 
  `trip_distance` double COMMENT '行程距离',
  `RateCodeID` bigint COMMENT '费率编码',
  `store_and_fwd_flag` string COMMENT '是否存储',
  `PULocationID` bigint COMMENT '上车区域坐标',
  `DOLocationID` bigint COMMENT '下场区域坐标',
  `payment_type` bigint COMMENT '付款方式',
  `fare_amount` double COMMENT '票价',
  `extra` double COMMENT '杂费附加费',
  `mta_tax` double COMMENT '税费',
  `tip_amount` double COMMENT '小费',
  `tolls_amount` double COMMENT '过路费',
  `improvement_surcharge` double COMMENT '改善附加费',
  `total_amount` double COMMENT '费用总计,不包含现金小费',
  `congestion_surcharge` double COMMENT '拥堵费',
  `airport_fee` double COMMENT '机房上下车费用'
)
COMMENT '黄色的出租车记录'
PARTITIONED BY ( 
  `ym` string COMMENT '分区字段,年月(yyyyMM)')
STORED AS PARQUET;

加载文件

  # 利用hive客户端load parquet数据
    LOAD DATA LOCAL INPATH '/opt/yellow_tripdata_2023-02.parquet' OVERWRITE INTO TABLE `test_trino.yellow_taxi_trip_records_tmp` PARTITION (ym=202302);

将json数据导入ES

ES批量导入api

批量写入es需要使用bulk api,这个API支持json文件的数据导入。

原始json文件内容

{"geonameid": 2986043, "name": "Pic de Font Blanca", "latitude": 42.64991, "longitude": 1.53335, "country_code": "AD", "population": 0}
{"geonameid": 2994701, "name": "Roc Mélé", "latitude": 42.58765, "longitude": 1.74028, "country_code": "AD", "population": 0}
{"geonameid": 3007683, "name": "Pic des Langounelles", "latitude": 42.61203, "longitude": 1.47364, "country_code": "AD", "population": 0}
{"geonameid": 3017832, "name": "Pic de les Abelletes", "latitude": 42.52535, "longitude": 1.73343, "country_code": "AD", "population": 0}
{"geonameid": 3017833, "name": "Estany de les Abelletes", "latitude": 42.52915, "longitude": 1.73362, "country_code": "AD", "population": 0}
{"geonameid": 3023203, "name": "Port Vieux de la Coume d’Ose", "latitude": 42.62568, "longitude": 1.61823, "country_code": "AD", "population": 0}
{"geonameid": 3029315, "name": "Port de la Cabanette", "latitude": 42.6, "longitude": 1.73333, "country_code": "AD", "population": 0}
{"geonameid": 3034945, "name": "Port Dret", "latitude": 42.60172, "longitude": 1.45562, "country_code": "AD", "population": 0}
{"geonameid": 3038814, "name": "Costa de Xurius", "latitude": 42.50692, "longitude": 1.47569, "country_code": "AD", "population": 0}
{"geonameid": 3038815, "name": "Font de la Xona", "latitude": 42.55003, "longitude": 1.44986, "country_code": "AD", "population": 0}
{"geonameid": 3038816, "name": "Xixerella", "latitude": 42.55327, "longitude": 1.48736, "country_code": "AD", "population": 0}
{"geonameid": 3038818, "name": "Riu Xic", "latitude": 42.57165, "longitude": 1.67554, "country_code": "AD", "population": 0}
{"geonameid": 3038819, "name": "Pas del Xic", "latitude": 42.49766, "longitude": 1.57597, "country_code": "AD", "population": 0}
{"geonameid": 3038820, "name": "Roc del Xeig", "latitude": 42.56068, "longitude": 1.4898, "country_code": "AD", "population": 0}

索引结构

PUT allcountries
{
  "settings": {
    "index.number_of_replicas": 0
  },
  "mappings": {
        "_doc":{
            "dynamic": "strict",
            "properties": {
              "geonameid": {
                "type": "long"
              },
              "name": {
                "type": "text"
              },
              "latitude": {
                "type": "double"
              },
              "longitude": {
                "type": "double"
              },
              "country_code": {
                "type": "text"
              },
              "population": {
                "type": "long"
              }
            }
        }
  }
}

重组json脚本

# coding=UTF-8
# 将原始josn重组出适合ES bulk API导入的JSON数据
import json
import os
import io
current_path = os.path.dirname(__file__)
#w打开一个文件只用于写入,r用于只读
#如果该文件已存在则打开文件,并从开头开始编辑,即原有内容会被删除
#如果该文件不存在,创建新文件
new_jsonfile = io.open(current_path+'/es-test-bulk.json','w',encoding='utf-8')

with io.open(current_path+'/es-test.json','r',encoding='utf-8')as fp:
    for line in fp.readlines():
        json_data=json.loads(line)
        #添加index行
        new_data={}
        new_data['index']={}
        new_data['index']['_index']="allCountries"
        temp=json.dumps(new_data).encode("utf-8").decode('unicode_escape')
        new_jsonfile.write(temp)
        new_jsonfile.write('\n'.decode('utf-8'))

        #原json对象处理为1行
        old_data={}
        old_data['geonameid']=json_data['geonameid']
        old_data['name']=json_data['name']
        old_data['latitude']=json_data['latitude']
        old_data['longitude']=json_data['longitude']
        old_data['country_code']=json_data['country_code']
        old_data['population']=json_data['population']
        temp=json.dumps(old_data).encode("utf-8").decode('unicode_escape')
        new_jsonfile.write(temp)
        new_jsonfile.write('\n'.decode('utf-8'))
        
new_jsonfile.close()

重组后的json文件

{"index": {"_index": "allcountries"}}
{"name": "El Barrerol", "geonameid": 3040809, "longitude": 1.45207, "country_code": "AD", "latitude": 42.439579999999999, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Camí d’Easagents", "geonameid": 3040810, "longitude": 1.61341, "country_code": "AD", "latitude": 42.53349, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Pleta de Duedra", "geonameid": 3040811, "longitude": 1.4949399999999999, "country_code": "AD", "latitude": 42.625540000000001, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Pleta de Duedra", "geonameid": 3040812, "longitude": 1.5637000000000001, "country_code": "AD", "latitude": 42.61985, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Plana Duedra", "geonameid": 3040813, "longitude": 1.5228900000000001, "country_code": "AD", "latitude": 42.59393, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Planella del Duc", "geonameid": 3040814, "longitude": 1.4995700000000001, "country_code": "AD", "latitude": 42.456490000000002, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Canal del Duc", "geonameid": 3040815, "longitude": 1.6195600000000001, "country_code": "AD", "latitude": 42.576920000000001, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Canal Dreta", "geonameid": 3040816, "longitude": 1.5381, "country_code": "AD", "latitude": 42.551319999999997, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Canal Dreta", "geonameid": 3040817, "longitude": 1.4865900000000001, "country_code": "AD", "latitude": 42.506630000000001, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Port Dret", "geonameid": 3040818, "longitude": 1.7001299999999999, "country_code": "AD", "latitude": 42.573979999999999, "population": 0}

bulk api调用

curl -H "Content-Type: application/x-ndjson"  -XPOST "192.168.1.1:9600/allcountries/_doc/_bulk" --data-binary @"/opt/es-documents-bulk.json"

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/598496.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring Boot如何实现分布式文件系统

Spring Boot如何实现分布式文件系统 随着数据量的不断增长,单机文件系统已经无法满足大规模数据存储和访问的需求,因此分布式文件系统变得越来越重要。本文将介绍如何使用 Spring Boot 实现分布式文件系统。 1. 分布式文件系统的设计 分布式文件系统是…

【JavaSE】Java基础语法(四十):UDP通信程序

文章目录 1. UDP发送数据2. UDP接收数据【应用】3. UDP通信程序练习【应用】4. UDP三种通讯方式 1. UDP发送数据 Java中的UDP通信 UDP协议是一种不可靠的网络协议,它在通信的两端各建立一个Socket对象,但是这两个 Socket只是发送,接收数据的对…

Doris的一些进阶用法

6.doris进阶 6.1修改表 6.1.1修改表名 示例: 将名为 table1 的表修改为 table2 SQLALTER TABLE table1 RENAME table2; -- 示例 ALTER TABLE aggregate_test RENAME aggregate_test1; 将表 example_table 中名为 rollup1 的 rollup index 修改为 rollup2 SQLA…

V2board 1.6.1 提权漏洞(web缓存投毒)

目录 复现环境: 漏洞产生的原因: 漏洞的利用以及复现: 复现环境: 在gethub中直接拉去docker镜像 vulhub/README.zh-cn.md at master vulhub/vulhub GitHub 漏洞产生的原因: 前端的认证方式与后端并没有进行区分…

【LeetCode热题100】打卡第9天:电话号码的字母组合

文章目录 电话号码的字母组合⛅前言🔒题目🔑题解 电话号码的字母组合 ⛅前言 大家好,我是知识汲取者,欢迎来到我的LeetCode热题100刷题专栏! 精选 100 道力扣(LeetCode)上最热门的题目&#xf…

本地运行 LLAMA GPT-3.5-TURBO开源项目

git: nomic-ai/gpt4all: gpt4all: an ecosystem of open-source chatbots trained on a massive collections of clean assistant data including code, stories and dialogue (github.com) 下载好源码后,的目录结构: 视频中说的 chat 目录…

视图和用户管理

目录 视图基本使用视图规则和限制 用户管理用户用户信息创建用户删除用户修改用户密码 数据库的权限给用户授权回收权限 视图 视图是一个虚拟表,其内容由查询定义。同真实的表一样,视图包含一系列带有名称的列和行数据。视图的数据变化会影响到基表&…

基于Python的接口自动化-构建mock接口服务

引言 Mock 即模拟,就是在测试过程中,对于某些不容易构造或者不容易获取的对象,用一个虚拟的对象来创建以便测试的测试方法,其最大的优势就是降级前后端耦合度, 使前端工程师可以不依赖后端返回数据,先开发前…

堆排序及top k 问题

目录 一:堆排序 1.向上调整建堆 2.向下调整建堆 3.向上调整建堆时间复杂度 4.向下调整建堆时间复杂度 二:找 top k 问题 1.造数据 2.进行建堆,查找最大的K个数据 一:堆排序 升序 --- 建大堆 --- 每个父亲节点 > 孩子节…

高德API JS 高德地图获取多个坐标点的中心点

高德API JS 高德地图获取多个坐标点的中心点 一、需求 我需要在地图上展示多个地点,并且展示的同时,地图缩放到合适的大小,要求刚好能显示全部点位,并且边缘留有一部分间隔。 做成如图所示这样。 二、需要用到的 AMap 类库 经…

使用Python绘制6.1儿童节消消乐,素描图,词云图,字符画图,提取轮廓图及蒙太奇效果图

这篇博客将介绍如何使用Python绘制6.1儿童节消消乐,素描图,词云图,字符画图,提取轮廓图及蒙太奇效果图。 使用Python绘制端午dragboat消消乐 美轮美奂的界面效果 1. 效果图 6.1儿童节快乐原始图VS素描图: 素描进阶…

内网穿透-公网ip-方法总结-访问内网服务器-frp-虚拟服务器

文章目录 1.固定IP2.虚拟服务器转发3.IP盒子4.总结 1.固定IP 第一种方式是向三大电信服务商购买专用通道,固定IP,这种方式是最正统,也是各大虚拟服务器服务商采用的方式,宽带带宽有稳定的保障。在访问量不足的前提下,…

anaconda 安装_Linux系统上

安装流程 1 下载安装包 官网 https://www.anaconda.com/download#downloads 2 执行安装 bash Anaconda3-2021.11-Linux-x86_64.sh3 安装过程 一路enteryes,接受licence、指定安装路径和init之后安装完成。 检验anaconda是否安装成功。 conda --version或 con…

【LeetCode】12,整数转罗马数字。 难度等级:中等。易错点:使用 python 字典构建哈希表时要考虑哈希表是否有序

文章目录 一、题目二、我的解法:基于有序哈希表的贪心算法2.1 使用 dict 构建哈希表2.2 使用两个 list / tuple 构建有序哈希表 一、题目 二、我的解法:基于有序哈希表的贪心算法 2.1 使用 dict 构建哈希表 贪心法则:我们每次尽量使用最大的…

基于AT89C52单片机的多功能万年历设计

点击链接获取Keil源码与Project Backups仿真图: https://download.csdn.net/download/qq_64505944/87853675 源码获取 目 录 摘 要 1 1 方案论证 2 1.1 单片机芯片的选择方案和论证 2 1.2 显示模块选择方案和论证 2 1.3 时钟芯片的选择方案和论证 3 1.4 电路设计…

BitLocker加密卷“恢复密钥(数字密码)”提取还原

BitLocker是微软Windows自带的用于加密磁盘分卷的技术。 通常,解开后的加密卷通过Windows自带的命令工具“manage-bde”可以查看其恢复密钥串,如下图所示: 如图,这里的数字密码下面的一长串字符串即是下面要提取恢复密钥。 在计…

IMA/EVM完整性检测代码分析

IMA/EVM完整性检测 IMA(Integrity Measurement Architecture)是一个内核安全子系统,用于检测文件或数据的完整性和安全性。IMA的hook机制指的是内核接口钩子(kernel interface hooks),用于向IMA注册和实现…

第三章 部署Web及WDS服务

♥️作者介绍:奇妙的大歪 ♥️个人名言:但行前路,不负韶华! ♥️个人简介:云计算网络运维专业人员 目录 一.什么是web 1.www(world wide web)万维网 世界 维度 2.www服务软件 3.info…

ElementUI-Form表单二次封装

一、Form组件二次封装考虑组件构成: form组件:input text passworldselectcheckboxradio文本域日期 二、实现Form表单的二次封装: 1. 分析出对应的位置 开始抽离组件 2. 如果需要产生多个form表单,则需要产生多个el-form-item…

学习路之gis--百度离线地图下载制作

在有些情况下需要使用地图,但又不能访问外网,这时你需要一个离线地图。本文介绍如何制作百度离线地图。 下面将介绍如何实现一个离线版百度地图: 1. 下载百度地图瓦片 下载网址:望远网-百度地图下载 首先需选择下载地图瓦片的样式…