Hudi-集成 Hive

news2024/12/28 20:34:24

集成 Hive

Hudi 源表对应一份 HDFS 数据,通过 Spark,Flink 组件或者 Hudi CLI,可以将 Hudi 表的数据映射为 *Hive 外部表*,基于该外部表, Hive可以方便的进行实时视图,读优化视图以及增量视图的查询。

集成步骤

以 hive3.1.2、hudi 0.12.0为例,其他版本类似。

(1)拷贝编译好的jar包

将 hudi-hadoop-mr-bundle-0.12.0.jar , hudi-hive-sync-bundle-0.12.0.jar 放到 hive 节点的lib目录下;

cp /opt/software/hudi-0.12.0/packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.12.0.jar /opt/module/hive/lib/

cp /opt/software/hudi-0.12.0/packaging/hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.12.0.jar /opt/module/hive/lib/

(2)配置完后重启 hive

// 按照需求选择合适的方式重启
nohup hive --service metastore &
nohup hive --service hiveserver2 &

Hive同步

(1)Flink同步Hive

Flink hive sync 现在支持两种 hive sync mode, 分别是 hms 和 jdbc 模式。 其中 hms 只需要配置 metastore uris;而 jdbc 模式需要同时配置 jdbc 属性 和 metastore uris,具体配置模版如下:

## hms mode 配置
CREATE TABLE t1(
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
    'connector'='hudi',
    'path' = 'hdfs://xxx.xxx.xxx.xxx:9000/t1',
    'table.type'='COPY_ON_WRITE',        -- MERGE_ON_READ方式在没生成 parquet 文件前,hive不会有输出
    'hive_sync.enable'='true',           -- required,开启hive同步功能
    'hive_sync.table'='${hive_table}',              -- required, hive 新建的表名
    'hive_sync.db'='${hive_db}',             -- required, hive 新建的数据库名
    'hive_sync.mode' = 'hms',            -- required, 将hive sync mode设置为hms, 默认jdbc
    'hive_sync.metastore.uris' = 'thrift://ip:9083' -- required, metastore的端口
);

实例:

CREATE TABLE t10(
    id int,
    num int,
    ts int,
    primary key (id) not enforced
)
PARTITIONED BY (num)
with(
    'connector'='hudi',
    'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t10',
    'table.type'='COPY_ON_WRITE', 
    'hive_sync.enable'='true', 
    'hive_sync.table'='h10', 
    'hive_sync.db'='default', 
    'hive_sync.mode' = 'hms',
    'hive_sync.metastore.uris' = 'thrift://hadoop1:9083'
);

insert into t10 values(1,1,1); 

(2)Spark 同步Hive

参数:https://hudi.apache.org/docs/basic_configurations#Write-Options

option("hoodie.datasource.hive_sync.enable","true").                         //设置数据集注册并同步到hive
option("hoodie.datasource.hive_sync.mode","hms").                         //使用hms
option("hoodie.datasource.hive_sync.metastore.uris", "thrift://ip:9083"). //hivemetastore地址
option("hoodie.datasource.hive_sync.username","").                          //登入hiveserver2的用户
option("hoodie.datasource.hive_sync.password","").                      //登入hiveserver2的密码
option("hoodie.datasource.hive_sync.database", "").                   //设置hudi与hive同步的数据库
option("hoodie.datasource.hive_sync.table", "").                        //设置hudi与hive同步的表名
option("hoodie.datasource.hive_sync.partition_fields", "").               //hive表同步的分区列
option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor"). // 分区提取器 按/ 提取分区

案例:

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
	.withColumn("a",split(col("partitionpath"),"\\/")(0))
	.withColumn("b",split(col("partitionpath"),"\\/")(1))
	.withColumn("c",split(col("partitionpath"),"\\/")(2))
df.write.format("hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option("hoodie.table.name", tableName). 
    option("hoodie.datasource.hive_sync.enable","true").
    option("hoodie.datasource.hive_sync.mode","hms").
    option("hoodie.datasource.hive_sync.metastore.uris", "thrift://hadoop1:9083").
    option("hoodie.datasource.hive_sync.database", "default").
    option("hoodie.datasource.hive_sync.table", "spark_hudi").
    option("hoodie.datasource.hive_sync.partition_fields", "a,b,c").
    option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").
	mode(Overwrite).
	save(basePath)

Flink使用 HiveCatalog

(1)直接使用Hive Catalog

  • 上传hive connector到flink的lib中

    hive3.1.3的connector存在guava版本冲突,需要解决:官网下载connector后,用压缩软件打开jar包,删除/com/google文件夹。处理完后上传flink的lib中。

  • 解决与hadoop的冲突

    避免与hadoop的冲突,拷贝hadoop-mapreduce-client-core-3.1.3.jar到flink的lib中(5.2.1已经做过)

  • 创建catalog

    CREATE CATALOG hive_catalog
      WITH (
          'type' = 'hive',
          'default-database' = 'default',
          'hive-conf-dir' = '/opt/module/hive/conf',
          'hadoop-conf-dir'='/opt/module/hadoop-3.1.3/etc/hadoop'
      );
    
    use catalog hive_catalog;
    
    -- hive-connector内置了hive module,提供了hive自带的系统函数
    load module hive with ('hive-version'='3.1.2');
    show modules;
    show functions;
    
    -- 可以调用hive的split函数
    select split('a,b', ',');
    

(2)Hudi Catalog使用hms

CREATE CATALOG hoodie_hms_catalog
  WITH (
      'type'='hudi',
      'catalog.path' = '/tmp/hudi_hms_catalog',
      'hive.conf.dir' = '/opt/module/hive/conf',
      'mode'='hms',
      'table.external' = 'true'
  );

创建 Hive 外表

一般来说 Hudi 表在用 Spark 或者 Flink 写入数据时会自动同步到 Hive 外部表(同6.2), 此时可以直接通过 beeline 查询同步的外部表,若写入引擎没有开启自动同步,则需要手动利用 hudi 客户端工具 run_hive_sync_tool.sh 进行同步,具体后面介绍。

查询 Hive 外表

(1)设置参数

使用 Hive 查询 Hudi 表前,需要通过set命令设置 hive.input.format,否则会出现数据重复,查询异常等错误,如下面这个报错就是典型的没有设置 hive.input.format 导致的:

java.lang.IllegalArgumentException: HoodieRealtimeReader can oly work on RealTimeSplit and not with xxxxxxxxxx

除此之外对于增量查询,还需要 set 命令额外设置3个参数。

set hoodie.mytableName.consume.mode=INCREMENTAL;
set hoodie.mytableName.consume.max.commits=3;
set hoodie.mytableName.consume.start.timestamp=commitTime;

注意这3个参数是表级别参数。

参数名描述
hoodie.mytableName.consume.modeHudi表的查询模式。增量查询 :INCREMENTAL。非增量查询:不设置或者设为SNAPSHOT
hoodie.mytableName.consume.start.timestampHudi表增量查询起始时间。
hoodie. mytableName.consume.max.commitsHudi表基于 hoodie.mytableName.consume.start.timestamp之后要查询的增量commit次数。例如:设置为3时,增量查询从指定的起始时间之后commit 3次的数据设为-1时,增量查询从指定的起始时间之后提交的所有数据

(2)COW 表查询

这里假设同步的 Hive 外表名为 hudi_cow。

  • 实时视图

    设置 hive.input.format 为以下两个之一:

    • org.apache.hadoop.hive.ql.io.HiveInputFormat
    • org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat

    像普通的hive表一样查询即可:

    set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;
    select count(*) from hudi_cow;
    
  • 增量视图

    除了要设置 hive.input.format,还需要设置上述的3个增量查询参数,且增量查询语句中的必须添加 where 关键字并将 `_hoodie_commit_time > ‘startCommitTime’ 作为过滤条件(这地方主要是hudi的小文件合并会把新旧commit的数据合并成新数据,hive是没法直接从parquet文件知道哪些是新数据哪些是老数据)

    set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;
    set hoodie.hudicow.consume.mode= INCREMENTAL;
    set hoodie.hudicow.consume.max.commits=3;
    set hoodie.hudicow.consume.start.timestamp= xxxx;
    select count(*) from hudicow where `_hoodie_commit_time`>'xxxx'
    -- (这里注意`_hoodie_commit_time` 的引号是反引号(tab键上面那个)不是单引号, 'xxxx'是单引号)
    

(3)MOR 表查询

这里假设 MOR 类型 Hudi 源表的表名为hudi_mor,映射为两张 Hive 外部表hudi_mor_ro(ro表)和 hudi_mor_rt(rt表)。

  • 实时视图

    设置了 hive.input.format 之后,即可查询到Hudi源表的最新数据

    set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;
    select * from hudicow_rt;
    
  • 读优化视图

    ro 表全称 read oprimized table,对于 MOR 表同步的 xxx_ro 表,只暴露压缩后的 parquet。其查询方式和COW表类似。设置完 hiveInputFormat 之后 和普通的 Hive 表一样查询即可。

  • 增量视图

    这个增量查询针对的rt表,不是ro表。同 COW 表的增量查询类似:

    set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat; // 这地方指定为HoodieCombineHiveInputFormat
    set hoodie.hudimor.consume.mode=INCREMENTAL;
    set hoodie.hudimor.consume.max.commits=-1;
    set hoodie.hudimor.consume.start.timestamp=xxxx;
    select * from hudimor_rt where `_hoodie_commit_time`>'xxxx';// 这个表名要是rt表
    索引
    

    说明:

    • set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;最好只用于 rt 表的增量查询 当然其他种类的查询也可以设置为这个,这个参数会影响到普通的hive表查询,因此在rt表增量查询完成后,应该设置 set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; 或者改为默认值set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 用于其他表的查询。
    • set hoodie.mytableName.consume.mode=INCREMENTAL; 仅用于该表的增量查询模式,若要对该表切换为其他查询模式,应设置set hoodie.hudisourcetablename.consume.mode=SNAPSHOT;

hive sync tool

若写入引擎没有开启自动同步,则需要手动利用 Hudi 客户端工具进行同步,Hudi提供Hive sync tool用于同步Hudi最新的元数据(包含自动建表、增加字段、同步分区信息)到hive metastore。

Hive sync tool提供三种同步模式,JDBC,HMS,HIVEQL。这些模式只是针对Hive执行DDL的三种不同方式。在这些模式中,JDBC或HMS优于HIVEQL, HIVEQL主要用于运行DML而不是DDL。

(1)使用语法及参数

脚本位置在hudi源码路径下的hudi-sync/hudi-hive-sync/run_sync_tool.sh

  • 语法

    #查看语法帮助
    ./run_sync_tool.sh --help
    
    #语法:
    ./run_sync_tool.sh  \
    --jdbc-url jdbc:hive2:\/\/hiveserver:10000 \
    --user hive \
    --pass hive \
    --partitioned-by partition \
    --base-path <basePath> \
    --database default \
    --table <tableName>
    

    从Hudi 0.5.1版本开始,读时合并优化版本的表默认带有’_ro’后缀。为了向后兼容旧的Hudi版本,提供了一个可选的配置 --skip-ro-suffix,如果需要,可以关闭’_ro’后缀。

  • 参数说明

    HiveSyncConfigDataSourceWriteOption描述
    –databasehoodie.datasource.hive_sync.database同步到hive的目标库名
    –tablehoodie.datasource.hive_sync.table同步到hive的目标表名
    –userhoodie.datasource.hive_sync.usernamehive metastore 用户名
    –passhoodie.datasource.hive_sync.passwordhive metastore 密码
    –use-jdbchoodie.datasource.hive_sync.use_jdbc使用JDBC连接到hive metastore
    –jdbc-urlhoodie.datasource.hive_sync.jdbcurlHive metastore url
    –sync-modehoodie.datasource.hive_sync.mode同步hive元数据的方式. 有效值为 hms, jdbc 和hiveql.
    –partitioned-byhoodie.datasource.hive_sync.partition_fieldshive分区字段名,多个字段使用逗号连接.
    –partition-value-extractorhoodie.datasource.hive_sync.partition_extractor_class解析分区值的类名,默认SlashEncodedDayPartitionValueExtractor

(2)解决依赖问题

run_sync_tool.sh这个脚本就是查找hadoop、hive和bundle包的依赖,实际上使用的时候会报错各种ClassNotFoundException、NoSuchMethod,所以要动手修改依赖的加载逻辑:

vim /opt/software/hudi-0.12.0/hudi-sync/hudi-hive-sync/run_sync_tool.sh
  • 修改hadoop、hive、hudi-hive-sync-bundle-0.12.0.jar的依赖加载

    a. 将34行 HUDI_HIVE_UBER_JAR=xxxx 注释掉

    b. 将52行 HADOOP_HIVE_JARS=xxx注释掉

    #在 54行 添加如下:
    HADOOP_HIVE_JARS=`hadoop classpath`:$HIVE_HOME/lib/*
    HUDI_HIVE_UBER_JAR=/opt/software/hudi-0.12.0/packaging/hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.12.0.jar
    

  • 解决parquet-column的版本冲突

    a. 上传parquet-column-1.12.2.jar到/opt/software/,脚本中添加如下:

    PARQUET_JAR=/opt/software/parquet-column-1.12.2.jar
    

    b. 拼接路径到命令最前面(只能最前面!)

    c. 保存退出

(3)JDBC模式同步

通过hive2 jdbc协议同步,提供的是hive server2的地址,如jdbc:hive2://hive-server:10000。默认为jdbc。

cd /opt/software/hudi-0.12.0/hudi-sync/hudi-hive-sync

./run_sync_tool.sh \
--base-path hdfs://hadoop1:8020/tmp/hudi_flink/t2/ \
--database default \
--table t2_flink \
--jdbc-url jdbc:hive2://hadoop1:10000 \
--user atguigu \
--pass atguigu \
--partitioned-by num

(4)HMS模式同步

提供hive metastore的地址,如thrift://hms:9083,通过hive metastore的接口完成同步。使用时需要设置 --sync-mode=hms。

如果使用的是远程metastore,那么确保hive-site.xml配置文件中设置hive.metastore.uris。

./run_sync_tool.sh  \
--base-path hdfs://hadoop1:8020/tmp/hudi_flink/t3 \
--database default \
--table t3_flink  \
--user atguigu \
--pass atguigu \
--partitioned-by age \
--sync-mode hms \
--jdbc-url thrift://hadoop1:9083 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/342738.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【软件相关】文献管理工具——Zotero

文章目录0 前期教程1 前言2 一些说明3 下载安装4 功能一&#xff1a;插入文献引用格式5 功能二&#xff1a;从网页下载文献pdf和题录6 功能三&#xff1a;数据多平台同步7 功能四&#xff1a;通过DOI添加条目及添加订阅8 安装xpi插件9 功能五&#xff1a;智能识别中英文文献10 …

The Number Of ThreadPoolExecutor

序言整理下Java 线程池中线程数量如何设置的依据巨人肩膀:https://blog.csdn.net/weilaizhixing007/article/details/125955693https://blog.csdn.net/yuyan_jia/article/details/120298564#:~:text%E4%B8%80%E4%B8%AA%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%A4%84%E7%90%86%E8%AE%A1,…

MongoDB--》文档查询的详细具体操作

目录 统计查询 分页列表查询 排序查询 正则的复杂条件查询 比较查询 包含查询 条件连接查询 统计查询 统计查询使用count()方法&#xff0c;其语法格式如下&#xff1a; db.collection.count(query,options) ParameterTypeDescriptionquerydocument查询选择条件optio…

int和Integer有什么区别?

第7讲 | int和Integer有什么区别&#xff1f; Java 虽然号称是面向对象的语言&#xff0c;但是原始数据类型仍然是重要的组成元素&#xff0c;所以在面试中&#xff0c;经常考察原始数据类型和包装类等 Java 语言特性。 今天我要问你的问题是&#xff0c;int 和 Integer 有什么…

JUC并发编程 Ⅱ -- 共享模型之管程(下)

文章目录wait notifywait / notify的原理API 介绍sleep与wait辨析优雅地使用wait/notify保护性暂停模式超时版 GuardedObjectjoin原理多任务版GuardedObject生产者-消费者模式定义实现Park & Unpark基本使用特点原理重新理解线程状态转换线程的活跃性死锁定位死锁活锁饥饿R…

vs2019 winform安装包,安装完成后默认启动

vs2019 winform安装包&#xff0c;安装完成后默认启动在要打包的项目中&#xff0c;新建安装类选中打包项目&#xff0c;右键&#xff0c;进入文件系统![在这里插入图片描述](https://img-blog.csdnimg.cn/62647c550ffa4d489f1d19f19bbd99b1.png)选中application folder 右键&a…

java微信小程序校园二手闲置商品交易跳蚤市场

基于跳蚤市场小程序的设计基于现有的手机,可以实现个人中心、用户管理、卖家管理、商品类型管理、商品信息管理、购物车管理、私聊信息管理、聊天回复管理、留言板管理、我的收藏管理、系统管理等功能。方便用户对商品信息等详细的了解及统计分析。根据系统功能需求建立的模块关…

后量子 KEM 方案:Kyber

参考文献&#xff1a; Bos J, Ducas L, Kiltz E, et al. CRYSTALS-Kyber: a CCA-secure module-lattice-based KEM[C]//2018 IEEE European Symposium on Security and Privacy (EuroS&P). IEEE, 2018: 353-367.Avanzi R, Bos J, Ducas L, et al. Crystals-kyber[J]. NIST…

AlphaFold 2 处理蛋白质折叠问题

蛋白质是一个较长的氨基酸序列&#xff0c;比如100个氨基酸的规模&#xff0c;如此长的氨基酸序列连在一起是不稳定的&#xff0c;它们会卷在一起&#xff0c;形成一个独特的3D结构&#xff0c;这个3D结构的形状决定了蛋白质的功能。 蛋白质结构预测&#xff08;蛋白质折叠问题…

Android 一体机研发之修改系统设置————屏幕亮度

Android 一体机研发之修改系统设置————屏幕亮度 Android 一体机研发之修改系统设置————声音 Android 一体机研发之修改系统设置————自动锁屏 前言 最近工作略微有点儿空闲&#xff0c;抽空给大家总结一下&#xff1a;近期一直搞得一体机app研发&#xff0c;适用…

recv和明文收包分析

我们CTRLg 跳到recv 分析收包函数 发现函数会断并且收包函数返回值(收包包长)也会不断变化 那么证明recv是真正的收包函数&#xff0c;游戏没有重新实现该函数 我们只要分析该函数即可 在recv函数执行完毕以后下断 eax是包长,esi28是包指针 我们上2个号&#xff0c;让另外…

项目(今日指数之环境搭建)

一 项目架构1.1 今日指数技术选型【1】前端技术【2】后端技术栈【3】整体概览1.2 核心业务介绍【1】业务结构预览【2】业务结构预览1.定时任务调度服务XXL-JOB通过RestTemplate多线程动态拉去股票接口数据&#xff0c;刷入数据库&#xff1b; 2.国内指数服务 3.板块指数服务 4.…

清晰理解并解决二分问题

文章目录二分问题常规解法&#xff1a;使用CSTL自带算法解决二分问题&#xff1a;小数二分二分问题常规解法&#xff1a; 二分问题注意事项&#xff1a; 题目可能无解&#xff0c;但二分一定有解&#xff08;也就是二分问题会得到一个结果&#xff0c;但是该结果可能不符合题目…

RabbitMQ-集群

一、搭建1、创建三个虚拟机2、修改三台主机的hostname,分别为node1,node2,node3,分别重启vi /etc/hostname reboot3、配置各个主机的hosts文件&#xff0c;让各个节点都能互相识别对方vi /etc/hosts #添加下面配置 192.168.xxx.165 node1 192.168.xxx.167 node2 192.168.xxx.16…

Django by Example·第三章|Extending Your Blog Application@笔记

Django by Example第三章|Extending Your Blog Application笔记 之前已经写过两章内容了&#xff0c;继续第三章。第三章继续对博客系统的功能进行拓展&#xff0c;其中将会穿插一些重要的技术要点。 部分内容引用自原书&#xff0c;如果大家对这本书感兴趣 请支持原版Django …

基于模块联邦的微前端实现方案

一、 微前端应用案例概述 当前案例中包含三个微应用&#xff0c;分别为 Marketing、Authentication 和 Dashboard Marketing&#xff1a;营销微应用&#xff0c;包含首页组件和价格组件 Authentication&#xff1a;身份验证微应用&#xff0c;包含登录组件 Dashboard&#x…

B站发帖软件哪个好用?好用的哔哩哔哩发帖工具

B站发帖软件哪个好用?好用的哔哩哔哩发帖工具#发帖软件#哔哩哔哩发帖#视频发布软件 登录成功之后&#xff0c;进入到这样一个界面&#xff0c;默认情况下是这个样子的&#xff0c;我们在这里输入一下我们的一个文件夹的路径&#xff0c;输入到这里&#xff0c;点击添加账号&a…

kettle开发-Day36-循环驱动作业

前言&#xff1a;在日常数据处理时&#xff0c;我们通过变量传参来完成某个日期的数据转换。但可能因程序或者网络原因导致某个时间段的数据抽取失败。常见导致kettle作业失败的原因大概分为三大类&#xff0c;数据源异常、数据库异常、程序异常。因此面对这些异常时&#xff0…

Not available OpenAI s services are not available in your country.

一、准备阶段 1、邮箱账号(qq、网易、谷歌等等) 2、你能够科学上网(下边详细介绍) 3、拥有一个GW手机号&#xff0c;用于接收注册验证码。&#xff08;下边详细介绍&#xff09; 二、开始注册 1、官方注册网址https://beta.openai.com/signup&#xff08;按照步骤注册&am…

RDSDRDSPolarDBPolarDB-X的区别

RDS 阿里云关系型数据库&#xff08;Relational Database Service&#xff0c;简称RDS&#xff09;&#xff0c;是一种稳定可靠、可弹性伸缩的在线数据库服务。 基于阿里云分布式文件系统和高性能存储&#xff0c;RDS支持MySQL、SQL Server、PostgreSQL和PPAS&#xff08;Post…