Spark读取Hive数据的两种方式与保存数据到HDFS

news2025/2/24 12:32:34

Spark读取Hive数据的两种方式与保存数据到HDFS

Spark读取Hive数据的方式主要有两种

1、 通过访问hive metastore的方式,这种方式通过访问hive的metastore元数据的方式获取表结构信息和该表数据所存放的HDFS路径,这种方式的特点是效率高、数据吞吐量大、使用spark操作起来更加友好。

2、 通过spark jdbc的方式访问,就是通过链接hiveserver2的方式获取数据,这种方式底层上跟spark链接其他rdbms上一样,可以采用sql的方式先在其数据库中查询出来结果再获取其结果数据,这样大部分数据计算的压力就放在了数据库上。

两种方式的具体实现示例

首先创建Spark Session对象:

    val spark = SparkSession.builder()
      .appName("test")
      .enableHiveSupport()
      .getOrCreate()

方式一(推荐) 直接采用Spark on Hive的方式读取数据,这样SparkSession在使用sql的时候会去找集群hive中的库表,加载其hdfs数据与其元数据组成DataFrame

val df = spark.sql("select * from test.user_info")

方式二 采用spark jdbc的方式,如果有特别的使用场景的话也可以通过这种方法来实现。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
 
 
object test{
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("test")
      .getOrCreate()
 
    register() //如果不手动注册,只能获取到数据库中的表结构,而不能获取到数据
    val df = spark.read
      .format("jdbc")
      .option("driver","org.apache.hive.jdbc.HiveDriver")
      .option("url","jdbc:hive2://xxx:10000/")
      .option("user","hive")
      .option("password",xxx)
      .option("fetchsize", "2000")
      .option("dbtable","test.user_info")
      .load()
    df.show(10)
  }
 
  def register(): Unit = {
    JdbcDialects.registerDialect(HiveSqlDialect)
  }
 
 
  case object HiveSqlDialect extends JdbcDialect {
    override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")
 
    override def quoteIdentifier(colName: String): String = {
      colName.split('.').map(part => s"`$part`").mkString(".")
    }
  }
 
}

Spark的DataFrame和DataSet使用

​ DataFrame是Spark SQL提供的一个编程抽象,与RDD类似,也是一个分布式的数据集合。但与RDD不同的是,DataFrame的数据都被组织到有名字的列中,就像关系型数据库中的表一样。此外,多种数据都可以转化为DataFrame,例如Spark计算过程中生成的RDD、结构化数据文件、Hive中的表、外部数据库等。

在Spark中,一个DataFrame所代表的是一个元素类型为Row的Dataset,即DataFrame只是Dataset[Row]的一个类型别名。相对于RDD,Dataset提供了强类型支持,在RDD的每行数据加了类型约束。而且使用DatasetAPI同样会经过Spark SQL优化器的优化,从而提高程序执行效率。

DataFrame和R的数据结构以及python pandas DataFrame的数据结构和操作基本一致。

创建DataFrame、DataSet

  • 创建RDD
  • RDD转化为ROW
  • 通过ROW和元数据信息生成DataFrame
  • 然后通过DataFrame和对应的类转化为DataSet
  • 也就是说DataFrame是DataSet[Row],这里可以通过指定的类将其转化,DataSet[User]
  • 需要注意的事转化使用的类需要时内部类,然后就是类里的变量名要和元数据信息的列名保持对齐。
object MovieLenDataSet {
  case class User(UserID:String, Gender:String, Age:String, Occupation:String, Zip_Code:String)
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession.builder()
      .appName("MovieLenDataSet")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val dataPath = "/home/ffzs/data/ml-1m"
    val schema4users = StructType(
      "UserID::Gender::Age::Occupation::Zip_code"
        .split("::")
        .map(it => StructField(it, StringType, nullable = true))
    )

    val usersRdd = spark.sparkContext.textFile(f"$dataPath/users.dat")
    val usersRows = usersRdd.map(_.split("::"))
      .map(it => {
        it.map(_.trim)
      })
      .map(it => Row(it(0), it(1), it(2), it(3), it(4)))
    val usersDF: DataFrame = spark.createDataFrame(usersRows, schema4users)
    val usersDataSet = usersDF.as[User]
    usersDataSet.show(5)
  }
}

Spark的DataFrame存储的Mode模式选择

spark的dataframe存储中都会调用write的mode方法:

data.write.mode(“append”).saveAsTable(s"u s e r i d . {userid}.userid.{datasetid}")
data.write.mode(SaveMode.Overwrite).parquet(hdfspath)

但不同时候的参数是不同的。

先看一下源码:

spark-v2.3.0:

  def mode(saveMode: SaveMode): DataFrameWriter[T] = {
    this.mode = saveMode
    this
  }

  /**
   * Specifies the behavior when data or table already exists. Options include:
   *   - `overwrite`: overwrite the existing data.
   *   - `append`: append the data.
   *   - `ignore`: ignore the operation (i.e. no-op).
   *   - `error` or `errorifexists`: default option, throw an exception at runtime.
   *
   * @since 1.4.0
   */
  def mode(saveMode: String): DataFrameWriter[T] = {
    this.mode = saveMode.toLowerCase(Locale.ROOT) match {
      case "overwrite" => SaveMode.Overwrite
      case "append" => SaveMode.Append
      case "ignore" => SaveMode.Ignore
      case "error" | "errorifexists" | "default" => SaveMode.ErrorIfExists
      case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
        "Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
    }
    this
  }

SaveMode.Overwrite(对应着字符串"overwrite"):表示如果目标文件目录中数据已经存在了,则用需要保存的数据覆盖掉已经存在的数据

SaveMode.Append(对应着字符串"append"):表示如果目标文件目录中数据已经存在了,则将数据追加到目标文件中
数据追加方式是:先将表中的所有索引删除,再追加数据

SaveMode.Ignore(对应着字符串为:“ignore”):表示如果目标文件目录中数据已经存在了,则不做任何操作
SaveMode.ErrorIfExists(对应着字符串"error"):表示如果目标文件目录中数据已经存在了,则抛异常(这个是默认的配置)


spark之Dataframe保存模式

以前spark.write时总要先把原来的删了,但其实是可以设置写入模式的。

val df =  spark.read.parquet(input)
df.write.mode("overwrite").parquet(output)

dataframe写入的模式一共有4种:

  1. overwrite 覆盖已经存在的文件
  2. append 向存在的文件追加
  3. ignore 如果文件已存在,则忽略保存操作
  4. error / default 如果文件存在,则报错
def mode(saveMode: String): DataFrameWriter = {
    this.mode = saveMode.toLowerCase match {
      case "overwrite" => SaveMode.Overwrite              
      case "append" => SaveMode.Append                    
      case "ignore" => SaveMode.Ignore                    
      case "error" | "default" => SaveMode.ErrorIfExists  
      case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
        "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
    }
    this
  }

spark write写入数据task failed失败,两种模式下的不同表现

1、SaveMode.Append

task失败重试,并不会删除上一次失败前写入的数据(文件根据分区号命名),重新执行时会继续追加数据。所以会出现数据重复。

2、SaveMode.Overwrite

task失败重试,会删除该分区上次失败所写入的数据文件,然后创建一个新的数据文件写入数据。所以不会出现数据重复。

启动spark任务报错:ERROR SparkUI: Failed to bind SparkUI

在这里插入图片描述
当启动一个spark任务的时候,就会占用一个端口,默认为4040,从日志可以看到当端口被占用时,它会默认依次增加16次到4056,如果还是失败的话,就会报错退出。

解决方法:

  1. 使用spark-submit提交任务时,在脚本中加配置:–conf spark.port.maxRetries=128(亲测有效)

参考博客:
https://blog.csdn.net/qq_42213403/article/details/117557610

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/193897.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

规则引擎-drools-4-动态生成drl文档

文章目录drools 引擎工作原理动态生成drl文件示例步骤模板文件 decision_rule_template.drt生成规则文件serviceDecisionNodeFact实体对象生成的drl字符串如下KieHealper 执行动态生成drl文件的原理实际应用过程中,很多时候,规则不是一成不变的&#xff…

54.Isaac教程--RealSense相机

RealSense相机 ISAAC教程合集地址: https://blog.csdn.net/kunhe0512/category_12163211.html 文章目录RealSense相机RealsenseCamera Codelet示例应用程序故障排除固件注意事项通过 USB 3.0 电缆使用 USB 3.0 端口x86_64 Linux 主机设置设置电源模型英特尔RealSense 435 摄像头…

分享159个ASP源码,总有一款适合您

ASP源码 分享159个ASP源码,总有一款适合您 下面是文件的名字,我放了一些图片,文章里不是所有的图主要是放不下..., 159个ASP源码下载链接:https://pan.baidu.com/s/1EaQuRA6mxyylrNWLq8iKVA?pwdaljz 提取码&#x…

springmvc知识巩固

文章目录回顾spring知识前言什么是SpringMVCSpringMVC的优点SpringMVC的常用注解Controller注解的作用ResponseBody注解的作用SpringMVC重定向和转发SpringMVC主要组件SpringMVC的执行流程回顾spring知识 上篇整理了“spring知识巩固”常见面试题,有需要的伙伴请点…

Java基础:源码讲解Collection及相关实现List、Set、Queue

1 缘起 说到Java第一问,很多人的第一反应是三大特性,那么接下来,可能就是集合了。 Collection是Java必知必会,即使没有系统学习,在实际的开发过程中,Collection也是应用最广泛的。 当然,一般的…

ESP-IDF:归并排序测试

ESP-IDF:归并排序测试 /归并排序测试/ void printarry18 (int arr[],int length) { for(int i0;i<length;i) { cout<<arr[i]<<" "; } cout<<endl; } void merge(int arr[],int start, int end, int mid,int * temp) { int length 0;//记录te…

进程间通信之管道(匿名管道与命名管道)

进程间通信之管道进程间通信管道什么是管道管道分类——1.匿名管道匿名管道举例管道的特点管道分类——2.命名管道创建一个命名管道举例命名管道的打开规则匿名管道与命名管道的区别具体使用举例&#xff1a;例子1-用命名管道实现文件拷贝例子2-用命名管道实现server&clien…

POI介绍简介

2.1 POI介绍 Apache POI是用Java编写的免费开源的跨平台的Java API&#xff0c;Apache POI提供API给Java程序对Microsoft Office格式档案读和写的功能&#xff0c;其中使用最多的就是使用POI操作Excel文件。 jxl&#xff1a;专门操作Excel maven坐标&#xff1a; <depend…

Linux中安装MySQL及常见问题汇总

一、安装前工作 1、卸载之前的数据库 在安装前需要确定现在这个系统有没有 mysql&#xff0c;如果有那么必须卸载 &#xff08;在 centos7 自带的是 mariaDb 数据库&#xff0c;所以第一步是卸载数据库&#xff09;。 #查看mariadb数据库&#xff1a; rpm -qa | grep maria…

moment.js根据时间戳计算与当前时间相差多少天

Moment旨在在浏览器和Node.js中工作。 所有代码都应该在这两种环境中都有效&#xff0c;并且所有单元测试都在这两种环境中运行。 目前&#xff0c;以下浏览器用于ci系统&#xff1a;Windows XP上的Chrome&#xff0c;Windows 7上的IE 8,9和10&#xff0c;Windows 10上的IE 1…

小车程序、安装vs2019和必要的相关软件

环境 sqlserver2019 vs2019企业版本 安装vs2019步骤 小车程序 C# .net vs2019 sqlserver2019 小车程序地址 小车运行的几种轨迹 一&#xff0c;自动运行到指定节点 找到manual按钮&#xff0c;找到目的节点&#xff0c;search move。 二&#xff0…

[数学] 三次样条

三次样条 已知有一组点x0,x1,x2,⋯,xnx_0, x_1, x_2, \cdots, x_nx0​,x1​,x2​,⋯,xn​, 其中, xt<xt1x_t<x_{t1}xt​<xt1​, y(xt)yty(x_t)y_ty(xt​)yt​, 及该点处的切线y′(xt)yt′y(x_t)y_ty′(xt​)yt′​ 每两个相邻的点之间可以作一个三次曲线 在所有相邻…

let/const相关内容(二)

let/const作用域提升 1&#xff09;根据前面所学的内容知道&#xff0c;var定义的变量是可以作用域提升的。 console.log(foo); // undefined var foo "foo"虽然在第一行中foo还没有被定义&#xff0c;但是在执行代码前&#xff0c;会预编译&#xff0c;先定义f…

ModStartBlog v6.6.0 多语言增强,缓存后台优化

ModStart 是一个基于 Laravel 模块化极速开发框架。模块市场拥有丰富的功能应用&#xff0c;支持后台一键快速安装&#xff0c;让开发者能快的实现业务功能开发。 系统完全开源&#xff0c;基于 Apache 2.0 开源协议。 功能特性 丰富的模块市场&#xff0c;后台一键快速安装会…

【完美解决】Github action报错remote: Write access to repository not granted.

报错及效果图报错代码效果图解决方案必要步骤可能有效的步骤报错及效果图 本解决方案是笔者通过Github action运行项目时报错的解决方案&#xff0c;如果是本地运行报此错&#xff0c;未必有效果。 报错代码 remote: Write access to repository not granted. fatal: unable t…

密评FAQ:如何确定网络和通信安全层面的测评对象?

信息系统一般通过网络技术来实现与外界的互联互通&#xff0c;GB/T 39786-2021《信息安全技术信息系统密码应用基本要求》规定了信息系统在网络和通信安全层面的密码应用技术要求&#xff0c;这些要求涉及到通信的主体&#xff08;通信双方&#xff09;、信息系统与网络边界外建…

(1)深入理解Java虚拟机-内存模型

深入理解Java虚拟机 Java虚拟机运行时数据区 程序计数器 ​ 程序计数器&#xff08;Program Counter Register&#xff09;是一块较小的内存空间&#xff0c;它可以看作是当前线程所执行的 字节码的行号指示器。在Java虚拟机的概念模型里 [1] &#xff0c;字节码解释器工作时…

大学开学必备清单、大学生必备的五件电子产品

转眼间就到了新一年的春季开学&#xff0c;在校生进入了新的一个年级学习。电子产品早就成为每个人的必备&#xff0c;尤其是大学生在校时期&#xff0c;更是上网课、日常查询资料的必备&#xff0c;当然还有一些社交、娱乐的因素也都是通过各式各样的电子产品来满足和实现。接…

android12 rockchip预置APK流程

方法一&#xff1a; 根据RK文档&#xff0c; 预制APK很简单, 首先source 环境之后执行命令&#xff1a;get _build_var TARGET_DEVICE_DIR 查看目标文件夹&#xff0c; 例如: device/rockchip/rk3568s/ 这个目录有三个文件夹&#xff1a; preinstall 不可卸载 preinstall_del_f…

基于text2vec进行文本向量化、聚类

基于text2vec进行文本向量化、聚类基于text2vec进行文本向量化、聚类介绍安装安装text2vec库安装transformers库模型下载文本向量化使用text2vec使用transformers文本聚类训练流程&#xff1a;训练代码推理流程推理代码基于text2vec进行文本向量化、聚类 介绍 文本向量表征工…