大数据Hadoop之——Apache Hudi 数据湖实战操作(Spark,Flink与Hudi整合)

news2025/3/12 22:28:46

文章目录

    • 一、概述
    • 二、Hudi CLI
    • 三、Spark 与 Hudi 整合使用
      • 1)Spark 测试
      • 2)Spark 与 Hudi 整合使用
        • 1、启动spark-shell
        • 2、导入park及Hudi相关包
        • 3、定义变量
        • 4、模拟生成Trip乘车数据
        • 5、将模拟数据List转换为DataFrame数据集
        • 6、将数据写入到hudi
    • 四、Flink 与 Hudi 整合使用
      • 1)启动flink集群
      • 2) 启动flink SQL 客户端
      • 3)添加数据
      • 4)查询数据(批式查询)
      • 5)更新数据
      • 6)Streaming Query(流式查询)

一、概述

Hudi(Hadoop Upserts Deletes and Incrementals),简称Hudi,是一个流式数据湖平台,支持对海量数据快速更新,内置表格式,支持事务的存储层、 一系列表服务、数据服务(开箱即用的摄取工具)以及完善的运维监控工具,它可以以极低的延迟将数据快速存储到HDFS或云存储(S3)的工具,最主要的特点支持记录级别的插入更新(Upsert)和删除,同时还支持增量查询。

GitHub地址:https://github.com/apache/hudi

官方文档:https://hudi.apache.org/cn/docs/overview

关于Apache Hudi 数据湖 也可以参考我这篇文章:大数据Hadoop之——新一代流式数据湖平台 Apache Hudi

在这里插入图片描述

二、Hudi CLI

构建hudi后,可以通过cd hudi cli&&./hudi-cli.sh启动shell。一个hudi表驻留在DFS上的一个称为basePath的位置,我们需要这个位置才能连接到hudi表。Hudi库有效地在内部管理此表,使用.hoodie子文件夹跟踪所有元数据。

编译生成的包如下:
在这里插入图片描述

# 启动
./hudi-cli/hudi-cli.sh

在这里插入图片描述

三、Spark 与 Hudi 整合使用

Hudi 流式数据湖平台,协助管理数据,借助HDFS文件系统存储数据,使用Spark操作数据。
在这里插入图片描述

Hadoop 安装可参考我这篇文章:大数据Hadoop原理介绍+安装+实战操作(HDFS+YARN+MapReduce)
Hadoop HA安装可参考我这篇文章:大数据Hadoop之——Hadoop 3.3.4 HA(高可用)原理与实现(QJM)
Spark 环境配置可以参考我这篇文章:大数据Hadoop之——计算引擎Spark

1)Spark 测试

cd $SPARK_HOME
hdfs dfs -mkdir /tmp/
hdfs dfs -put README.md /tmp/
hdfs dfs -text /tmp/README.md

# 启动spark-shell
./bin/spark-shell --master local[2]

val datasRDD = sc.textFile("/tmp/README.md")
# 行数
datasRDD.count()
# 读取第一行数据
datasRDD.first()
val dataframe = spark.read.textFile("/tmp/README.md")
dataframe.printSchema
dataframe.show(10,false)

在这里插入图片描述

2)Spark 与 Hudi 整合使用

官方示例:https://hudi.apache.org/docs/quick-start-guide/
在spark-shell命令行,对Hudi表数据进行操作,需要运行spark-shell命令是,添加相关的依赖包,命令如下:

1、启动spark-shell

【第一种方式】在线联网下载相关jar包

### 启动spark-shell,使用spark-shell操作hudi数据湖
### 第一种方式
./bin/spark-shell \
  --master local[2] \
  --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.0 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

### 上述命令需要联网,基于ivy下载下载相关jar包到本地,然后加载到CLASSPATH,其中包含三个jar包。

【第二种方式】离线使用已经下载好的jar包

### 第二种方式,使用--jars
cd /opt/apache
wget https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.3.0/spark-avro_2.12-3.3.0.jar

cd $SPARK_HOME
./bin/spark-shell \
--master local[2] \
--jars  /opt/apache/hudi-0.12.0/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.0.jar,/opt/apache/hudi-0.12.0/hudi-examples/hudi-examples-spark/target/lib/unused-1.0.0.jar,/opt/apache/spark-avro_2.12-3.3.0.jar \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"

2、导入park及Hudi相关包

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

3、定义变量

val tableName = "hudi_trips_cow"
# 存储到HDFS
val basePath = "hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/hudi_trips_cow"
# 存储到本地
# val basePath = "file:///tmp/hudi_trips_cow"

4、模拟生成Trip乘车数据

##构建DataGenerator对象,用于模拟生成10条Trip乘车数据
val dataGen = new DataGenerator
 
val inserts = convertToStringList(dataGen.generateInserts(10))

其中,DataGenerator可以用于生成测试数据,用来完成后续操作。

5、将模拟数据List转换为DataFrame数据集

##转成df
val df = spark.read.json(spark.sparkContext.parallelize(inserts,2))

##查看数据结构
df.printSchema()
##查看数据
df.show()
# 指定字段查询
df.select("rider","begin_lat","begin_lon","driver","end_lat","end_lon","fare","partitionpath","ts","uuid").show(10,truncate=false)

6、将数据写入到hudi

# 将数据保存到hudi表中,由于Hudi诞生时基于Spark框架,所以SparkSQL支持Hudi数据源,直接通过format指定数据源Source,设置相关属性保存数据即可,注意,hudi不是正真存储数据,而是管理数据。

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

## 重要参数说明
#参数:getQuickstartWriteConfigs,设置写入/更新数据至Hudi时,Shuffle时分区数目
#参数:PRECOMBINE_FIELD_OPT_KEY,数据合并时,依据主键字段
#参数:RECORDKEY_FIELD_OPT_KEY,每条记录的唯一id,支持多个字段
#参数:PARTITIONPATH_FIELD_OPT_KEY,用于存放数据的分区字段

本地存储
在这里插入图片描述
HDFS 存储
在这里插入图片描述

四、Flink 与 Hudi 整合使用

官方示例:https://hudi.apache.org/docs/flink-quick-start-guide

1)启动flink集群

下载地址:http://flink.apache.org/downloads.html

### 1、下载软件包
wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz
tar -xf flink-1.14.6-bin-scala_2.12.tgz
export FLINK_HOME=/opt/apache/flink-1.14.6

### 2、设置HADOOP_CLASSPATH
# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
export HADOOP_CONF_DIR='/opt/apache/hadoop/etc/hadoop'

### 3、启动单节点flink 集群
# Start the Flink standalone cluster,这里先修改slot数量,默认是1,这里改成4
# taskmanager.numberOfTaskSlots: 4
cd $FLINK_HOME
./bin/start-cluster.sh

# 测试可用性
./bin/flink run  examples/batch/WordCount.jar

在这里插入图片描述

2) 启动flink SQL 客户端

# 【第一种方式】指定jar包
./bin/sql-client.sh embedded -j ../hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.14-bundle-0.12.0.jar shell

# 【第二种方式】还可以将jar包放在$FINK_HOME/lib目录下
./bin/sql-client.sh embedded shell

3)添加数据

-- sets up the result mode to tableau to show the results directly in the CLI
SET 'sql-client.execution.result-mode' = 'tableau';

CREATE TABLE t1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/flink-hudi-t1',
  'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
);

INSERT INTO t1 VALUES ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1');
-- insert data using values
INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

在这里插入图片描述
HDFS上查看
在这里插入图片描述

4)查询数据(批式查询)

select * from t1;

在这里插入图片描述

5)更新数据

-- this would update the record with key 'id1'
insert into t1 values
  ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');

6)Streaming Query(流式查询)

首先创建表t2,设置相关属性,以流的方式查询读取,映射到上面表:t1

  • read.streaming.enabled 设置为true,表明通过streaming的方式读取表数据;
  • read.streaming.check-interval 指定了source监控新的commits的间隔时间4s
  • table.type 设置表类型为 MERGE_ON_READ
CREATE TABLE t2(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/flink-hudi-t1',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',  -- this option enable the streaming read
  'read.start-commit' = '20210316134557', -- specifies the start commit instant time
  'read.streaming.check-interval' = '4' -- specifies the check interval for finding new source commits, default 60s.
);

-- Then query the table in stream mode
select * from t2;

注意:查看可能会遇到如下错误:

[ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

【解决】添加hadoop-mapreduce-client-core-xxx.jarhive-exec-xxx.jar到Flink lib中。

cp /opt/apache/hadoop-3.3.2/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.2.jar $FLINK_HOME/lib
cp ./hudi-0.12.0/hudi-examples/hudi-examples-spark/target/lib/hive-exec-2.3.1-core.jar $FLINK_HOME/lib

在这里插入图片描述
Hive 与 Hudi的整合,小伙伴可以先看官网文档:https://hudi.apache.org/docs/syncing_metastore/#flink-setup

Spark 和 Hudi整合,Flink 与 Hudi整合先到这里了,还有很多其它大数据组件与Hudi的整合示例讲解会放在后面文章讲解,请小伙伴耐心等待,有任何疑问欢迎留言,会持续更新【大数据+云原生】相关的文章~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/132.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前端ES6相关的面试题

前端ES6相关的面试题 文章目录前端ES6相关的面试题一. var let constvarletconst二 . 函数的扩展reset参数箭头函数三 . 数组的扩展四.Set和map数据结构Setmap五. Promise六. 模块化一. var let const var var存在声明提升 >先上车,后买票 var声明的变量会挂载到window下面…

Java实操避坑指南四、spring中的坑

文章目录1. 项目搭建过程1. pom 依赖2. 在没有配置数据库相关时不要引入依赖包,如spring-boot-starter-data-jpa2. spring bean 默认生成策略的正确使用1. 代码示例2. 单元测试3. 工具类 [参考](#test2)4. 报错信息5. 分析6. 使用说明2. 使用了Autowired 注解&#…

【5G RAN】5G gNB间的N2/NGAP切换(handover)那点事儿

博主未授权任何人或组织机构转载博主任何原创文章,感谢各位对原创的支持! 博主链接 本人就职于国际知名终端厂商,负责modem芯片研发。 在5G早期负责终端数据业务层、核心网相关的开发工作,目前牵头6G算力网络技术标准研究。 博客…

@Import注解详解

Import这个注解非常重要,而且在springboot项目当中随处可见,就拿springboot启动类来说,我们经常会遇到一些Enable相关的注解,例如开启异步EnableAsync、开启缓存支持EnableCaching、开启定时任务EnableScheduling等等… 目录一、I…

Oracle通过DBLINK访问达梦数据库

环境需求 需要安装配置以下相关软件: 1、Oracle Gateways 2、ODBC数据源(gateway机器) 3、达梦数据库软件(gateway机器) 安装配置 Windows环境 安装达梦数据库软件 安装步骤省略,可以参考DM 数据库…

【嵌入式Linux开发一路清障-连载02】Ubuntu22.04安装Shutter进行截图和标注

Ubuntu22.04安装Shutter进行截图和标注障碍 05-Ubuntu22.04中不会给截图做标注,写Bolg举步维艰命令行方式安装Shuttershutter中的常用命令为截取活动窗口设置快捷键安装gnome-web-photo截取长图--失败--未完待续小结下节预告障碍 05-Ubuntu22…

一起来部署项目-采购一台云服务器

前言 不会运维的程序员不是一个好程序员,你是这样认为吗?不,不重要,关键是很多小企业是让后端程序员去干运维的,省钱~~~o(╥﹏╥)o。特别是在YQ严重的当下,所以为了提高自己的竞争力,从今天起&…

万字爽文一篇带你掌握Java8新特性

陈老老老板说明:新的专栏,本专栏专门讲Java8新特性,把平时遇到的问题与Java8的写法进行总结,需要注意的地方都标红了,一起加油。本文是介绍Java8新特性与常用方法(此篇只做大体介绍了解,之后会把…

Java8中LocalDate详解Date线程不安全的原因

LocalDate 分类分工 java.time.LocalDate ->只对年月日做出处理 java.time.LocalTime ->只对时分秒纳秒做出处理 java.time.LocalDateTime ->同时可以处理年月日和时分秒优点 除了使用起来更加简单和灵活,主要是传统的时期处理类Date、Calendar不是多线…

刷爆leetcode第十二期 0026 数组中数字出现的次数

编号0026 数组中数字出现的次数 一个整型数组 nums 里除两个数字之外,其他数字都出现了两次。请写程序找出这两个只出现一次的数字。要求时间复杂度是O(n),空间复杂度是O(1)。 题目示例如下 这里其实是一道我一个月之前做的题目 在学弟的博客里刚好看…

【数据结构与算法】Java实现七大排序算法汇总

✨哈喽,进来的小伙伴们,你们好耶!✨ 🛰️🛰️系列专栏:【数据结构与算法】 ✈️✈️本篇内容: Java实现七大排序算法汇总! 🚀🚀由于本篇博客涉及代码较多,博主把代码都提…

刷爆leetcode第十一期 0023~0025

刷爆leetcode第十一期 编号0023 相同的树编号0024 对称二叉树编号0025 另一个树的子树编号0023 相同的树 给你两棵二叉树的根节点 p 和 q ,编写一个函数来检验这两棵树是否相同。 如果两个树在结构上相同,并且节点具有相同的值,则认为它们是…

多旋翼无人机仿真 rotors_simulator:用键盘控制无人机飞行

多旋翼无人机仿真 rotors_simulator:用键盘控制无人机飞行前言书接上文接口测试键盘指令发布指令转换与发布修改 rotors_simulator 的控制接口节点测试前言 RotorS 是一个MAV gazebo 仿真系统。 提供了几种多旋翼仿真模型,例如 AscTec HummingbirdAsc…

PHP反序列化

序列化与反序列化 序列化 反序列是指把对象转换为字符串的过程&#xff0c;便于在内存、文件、数据库中保存、传输&#xff0c;PHP中使用serialize函数进行序列化。 <?phpclass Person{public $name"php";protected $id;private $age;}$a new Person();$a_se…

全排列笔记

14天阅读挑战赛 全排列 题目 给定一个 没有重复 数字的序列&#xff0c;返回其所有可能的全排列。 示例: 输入: [1,2,3] 输出: [ [1,2,3], [1,3,2], [2,1,3], [2,3,1], [3,1,2], [3,2,1] ] 解答 方法一&#xff1a;回溯 思路 从高中的数学知识我们可以知道 从[1,2,3…

如何在Linux上优雅地写代码-Linux生存指南

初入Linux&#xff0c;发现老是要面对一个命令行&#xff0c;大黑框&#xff0c;看不懂各种手册&#xff0c;写代码也是用vi/vim&#xff0c;难受的捉急。其实Linux下的各种工具&#xff0c;强大得超出你的想象&#xff0c;如果你初入Linux&#xff0c;那么你急需阅读这篇文章&…

操作系统的主要功能

目录 一. 处理机管理功能 1.1 进程控制 1.2 进程同步 1.3 进程通信 1.4 进程调度 二. 存储器管理功能 2.1 内存分配 2.2 内存保护 2.3 地址映射 2.4 内存扩充 三. 设备管理功能 3.1 缓冲管理 3.2 设备分配 3.3 设备处理 3.4 设备独立性和虚拟设备 四…

关于Python爬虫兼职,这里有一条高效路径

前言 昨天&#xff0c;一位00后前来报喜&#xff0c;也表达感谢。 他说&#xff0c;当初刚毕业啥也不会也找不到工作&#xff0c;最后听了我的&#xff0c;边学爬虫边做兼职项目&#xff0c;积极主动求职投简历&#xff0c;既可以兼职获得收益&#xff0c;也能积累项目经验谋求…

Linux:以K、M、G查看文件大小;

简介&#xff1a;灵活多变的查看文件的大小 历史攻略&#xff1a; Linux&#xff1a;sudo免密 python&#xff1a;执行dos命令、Linux命令 案例源码&#xff1a; # 以适当方式显示文件大小&#xff1a; ls -lh# 以byte显示文件大小&#xff1a; ls -l# 以M显示文件大小&am…

NR PUSCH(五) DMRS

微信同步更新欢迎关注同名modem协议笔记 PUSCH DMRS和PDSCH DMRS内容基本一样&#xff0c;但也有不同的地方&#xff0c;例如PUSCH 可能需要Transform precoding&#xff0c;port 对应0~11(DMRS configured type2)等等。先简单看看Transformprecoding的相关内容&#xff0c;Tr…