CDPHudi实战-集成spark

news2025/1/6 6:21:40

[一]使用Spark-shell

1-配置hudi Jar包

[root@cdp73-1 ~]# for i in $(seq 1 6); do scp /opt/software/hudi-1.0.0/packaging/hudi-spark-bundle/target/hudi-spark3.4-bundle_2.12-1.0.0.jar   cdp73-$i:/opt/cloudera/parcels/CDH/lib/spark3/jars/; done
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 418.2MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 304.8MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 365.0MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 406.1MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 472.7MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 447.1MB/s   00:00
[root@cdp73-1 ~]#

2-进入Spark-shell

spark-shell --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:1.0.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'

3-初始化项目

// spark-shell
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._

val tableName = "trips_table"
val basePath = "hdfs:///tmp/trips_table"

4-创建表

首次提交将自动初始化表,如果指定的基本路径中尚不存在该表。

5-出入数据

// spark-shell
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
  Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
    (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
    (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
    (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"    ),
    (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));

var inserts = spark.createDataFrame(data).toDF(columns:_*)
inserts.write.format("hudi").
  option("hoodie.datasource.write.partitionpath.field", "city").
  option("hoodie.embed.timeline.server", "false").
  option("hoodie.table.name", tableName).
  mode(Overwrite).
  save(basePath)

【映射到Hudi写操作】​​​​​​​Hudi提供了多种写操作——包括批量和增量写操作——以将数据写入Hudi表,这些操作具有不同的语义和性能。当未配置记录键(请参见下面的键)时,将选择bulk_insert作为写操作,这与Spark的Parquet数据源的非默认行为相匹配。

6-查询数据

// spark-shell
val tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.createOrReplaceTempView("trips_table")

spark.sql("SELECT uuid, fare, ts, rider, driver, city FROM  trips_table WHERE fare > 20.0").show()
spark.sql("SELECT _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare FROM  trips_table").show()

7-更新数据

// Lets read data from target Hudi table, modify fare column for rider-D and update it. 
val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-D").withColumn("fare", col("fare") * 10)

updatesDf.write.format("hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.embed.timeline.server", "false").
  option("hoodie.datasource.write.partitionpath.field", "city").
  option("hoodie.table.name", tableName).
  mode(Append).
  save(basePath)

8-合并数据

// spark-shell
val adjustedFareDF = spark.read.format("hudi").
  load(basePath).limit(2).
  withColumn("fare", col("fare") * 10)
adjustedFareDF.write.format("hudi").
option("hoodie.datasource.write.payload.class","com.payloads.CustomMergeIntoConnector").
mode(Append).
save(basePath)
// Notice Fare column has been updated but all other columns remain intact.
spark.read.format("hudi").load(basePath).show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2270721.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

leetcode:面试题 17.01. 不用加号的加法(python3解法)

难度:简单 设计一个函数把两个数字相加。不得使用 或者其他算术运算符。 示例: 输入: a 1, b 1 输出: 2 提示: a, b 均可能是负数或 0结果不会溢出 32 位整数 题解: class Solution:def add(self, a: int, b: int) -> int:sum_list [a…

设计模式 结构型 适配器模式(Adapter Pattern)与 常见技术框架应用 解析

适配器模式(Adapter Pattern)是一种结构型设计模式,它允许将一个类的接口转换成客户端所期望的另一个接口,从而使原本因接口不兼容而无法一起工作的类能够协同工作。这种设计模式在软件开发中非常有用,尤其是在需要集成…

二维码文件在线管理系统-收费版

需求背景 如果大家想要在网上管理自己的文件,而且需要生成二维码,下面推荐【草料二维码】,这个系统很好。特别适合那些制造业,实体业的使用手册,你可以生成一个二维码,贴在设备上,然后这个二维码…

MySQL8安装与卸载

1.下载mysql MySQL :: Download MySQL Community Serverhttps://dev.mysql.com/downloads/mysql/ 2.解压mysql安装包 解压到自己定义的目录,这里解压就是安装,解压后的路径不要有空格和中文。 3.配置环境变量 配置环境变量可以方便电脑在任何的路径…

数据挖掘——关联规则挖掘

数据挖掘——关联数据挖掘 关联数据挖掘关联规则关联规则挖掘问题:具体挖掘过程Apriori 产生关联规则 关联数据挖掘 关联分析用于发现隐藏在大型数据集中的令人感兴趣的联系,所发现的模式通常用关联规则或频繁项集的形式表示。 关联规则反映一个事物与…

【74HC192减法24/20/72进制】2022-5-17

缘由用74ls192设计一个72进制的减法计数器,需要有逻辑电路图-硬件开发-CSDN问答

Samsung手机首次主要采用竞对Micron LPDDR5内存

根据韩国媒体《韩国先驱报》(The Korea Herald)的报道,即将在1月底发布的三星 Galaxy S25 系列智能手机将首次主要使用美光科技(Micron Technology)提供的移动DRAM,而非三星自家的产品。这一消息对于三星的…

Linux驱动开发学习准备(Linux内核源码添加到工程-Workspace)

Linux内核源码添加到VsCode工程 下载Linux-4.9.88源码: 没有处理同名文件的压缩包: https://pan.baidu.com/s/1yjIBXmxG9pwP0aOhW8VAVQ?pwde9cv 已把同名文件中以大写命名的文件加上_2后缀的压缩包: https://pan.baidu.com/s/1RIRRUllYFn2…

leetcode题目(3)

目录 1.加一 2.二进制求和 3.x的平方根 4.爬楼梯 5.颜色分类 6.二叉树的中序遍历 1.加一 https://leetcode.cn/problems/plus-one/ class Solution { public:vector<int> plusOne(vector<int>& digits) {int n digits.size();for(int i n -1;i>0;-…

vue3+Echarts+ts实现甘特图

项目场景&#xff1a; vue3Echartsts实现甘特图;发布任务 代码实现 封装ganttEcharts.vue <template><!-- Echarts 甘特图 --><div ref"progressChart" class"w100 h100"></div> </template> <script lang"ts&qu…

接受Header使用错Map类型,导致获取到的Header值不全

问题复现 在 Spring 中解析 Header 时&#xff0c;我们在多数场合中是直接按需解析的。例如&#xff0c;我们想使用一个名为 myHeaderName 的 Header&#xff0c;我们会书写代码如下&#xff1a;RequestMapping(path "/hi", method RequestMethod.GET) public Str…

GitHub的简单操作

引言 今天开始就要开始做项目了&#xff0c;上午是要把git搭好。搭的过程中遇到好多好多的问题。下面就说一下git的简单操作流程。我们是使用的GitHub,下面也就以这个为例了 一、GitHub账号的登录注册 https://github.com/ 通过这个网址可以来到GitHub首页 点击中间绿色的S…

【时时三省】(C语言基础)常见的动态内存错误

山不在高&#xff0c;有仙则名。水不在深&#xff0c;有龙则灵。 ----CSDN 时时三省 对NULL指针的解引用操作 示例&#xff1a; malloc申请空间的时候它可能会失败 比如我申请一块非常大的空间 那么空间可能就会开辟失败 正常的话要写一个if&#xff08;p&#xff1d;&#x…

【51项目】51单片机自制小霸王游戏机

视频演示效果&#xff1a; 纳新作品——小霸王游戏机 目录&#xff1a; 目录 视频演示效果&#xff1a; 目录&#xff1a; 前言&#xff1a; 一、连接方式&#xff1a; 1.1 控制引脚 1.2. 显示模块 1.3. 定时器 1.4. 游戏逻辑与硬件结合 1.5. 中断处理 二、源码分析&#xff1a…

ESP32-S3遇见OpenAI:OpenAI官方发布ESP32嵌入式实时RTC SDK

目录 OpenAI RTC SDK简介应用场景详解智能家居控制系统个人健康助手教育玩具 技术亮点解析低功耗设计快速响应高精度RTC安全性保障开发者指南 最近&#xff0c;OpenAI官方发布了一款针对ESP32-S3的嵌入式实时RTC&#xff08;实时时钟&#xff09;SDK&#xff0c;这标志着ESP32-…

Elasticsearch:减少 Elastic 容器镜像中的 CVE(常见的漏洞和暴露)

作者&#xff1a;来自 Elastic Maxime Greau 在这篇博文中&#xff0c;我们将讨论如何通过在 Elastic 产品中切换到最小基础镜像并优化可扩展漏洞管理程序的工作流程来显著减少 Elastic 容器镜像中的常见漏洞和暴露 (Common Vulnerabilities and Exposures - CVEs)。 基于 Chai…

计算机网络 (21)网络层的几个重要概念

前言 计算机网络中的网络层是OSI&#xff08;开放系统互连&#xff09;模型中的第三层&#xff0c;也是TCP/IP模型中的第二层&#xff0c;它位于数据链路层和传输层之间&#xff0c;负责数据包从源主机到目的主机的路径选择和数据转发。 一、网络层的主要功能 路由选择&#xf…

LED背光驱动芯片RT9293应用电路

一&#xff09;简介&#xff1a; RT9293 是一款高频、异步的 Boost 升压型 LED 定电流驱动控制器&#xff0c;其工作原理如下&#xff1a; 1&#xff09;基本电路结构及原理 RT9293的主要功能为上图的Q1. Boost 电路核心原理&#xff1a;基于电感和电容的特性实现升压功能。当…

第四届计算机、人工智能与控制工程

第四届计算机、人工智能与控制工程 The 4th International Conference on Computer, Artificial Intelligence and Control Engineering 重要信息 大会官网&#xff1a;www.ic-caice.net 大会时间&#xff1a;2025年1月10-12日 大会地点&#xff1a;中国合肥 (安徽大学磬苑…

【Rust 学习笔记】Rust 基础数据类型介绍——指针、元组和布尔类型

博主未授权任何人或组织机构转载博主任何原创文章&#xff0c;感谢各位对原创的支持&#xff01; 博主链接 博客内容主要围绕&#xff1a; 5G/6G协议讲解 高级C语言讲解 Rust语言讲解 文章目录 Rust 基础数据类型介绍——指针、元组和布尔类型一、元组类型…