Spark SQL

news2025/1/11 17:06:49

一、理解介绍

Spark SQL是spark中用于结构化数据处理的组件,可访问多种数据源,如连接Hive、MySQL,实现读写等操作。为什么要用spark去操作这些数据库呢?hive是一个基于Hadoop的数据仓库工具,hive的查询操作语句都要依赖于MapReduce任务进行处理,spark的计算效率比MapReduce高,spark SQL 在hive兼容层面做了进一步优化,所以如果用spark引擎与hive交互性能会显著提高。

二、RDD和DataFrame

RDD:弹性分布式数据集,把要执行的数据集可以通过RDD转换成更方便操作的形式,比如一段话通过RDD处理成一个一个词,再对词做下一步操作(计数、对比等等)

RDD是分布式的java对象集合,但是对象内部结构对于RDD而言是不可知的

DataFrame:是以RDD为基础的分布式数据集,提供详细的结构信息,相当于关系库中的一张表。

RDD可以转换成DataFrame:

1.利用反射机制推断RDD模式 2.编程方式定义RDD模式

利用反射机制推断RDD模式时需要先定义一个case class,因为只有ase class才能被spark隐式的转换成DataFrame。系统把文件employee.txt加载到内存生成一个RDD,每个RDD元素都是string类型,比如Array(“Ella”,"24")这一行记录是一个RDD,然后调用map(_.split(" "))方法得到一个新的RDD,还可根据需求继续对RDD进行转换,最后转换成想要的形式,比如转换成这种类型[namestring , age: int ]然后执行toDF()操作,实现把RDD转换成DataFrame,可以看出,DataFrame就像一张有数据结构的表,不再是一个个没有联系的RDD块。

 2.编程方式定义RDD模式。使用编程接口构造一个模式(schema),并将其应用在已知的RDD上

第一步:制作“表头”

第二步:“制作表中记录”

第三部:把“表头”和“表中记录”拼接

通过val schema = StructType(fields)语句把fields作为输入生成一个StructTyped对象即schema里面包含表的模式信息,
生成表头,把RDD的一行元素转换成想要的和表头类型匹配的样子,比如中age是int型,RDD要toInt
   对应转换,元素转换完成就是“表中记录”,把“表头”和“表中记录”拼接在一起得到一个有结构的DataFrame

三、通过Spark SQL读写数据库

1、读写MySQL

在idea中添加依赖,读写操作的代码都是写在idea上的scala代码,通过打包push到远程仓库gitee上,再拉到linux上再spark-local模式下执行

 读MySQL数据库中的表:

package edu.hnuahe.Yyj_Test
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
object SparkReadMySQL { //edu.hnuahe.Yyj_Test.SparkReadMySQL
  def main(args: Array[String]): Unit = {
    val spark=SparkSession.builder().appName("SparkReadMySQL").getOrCreate()
    val df =spark.read.format("jdbc").option("url","jdbc:mysql://yyj4601:3306/spark")
      .option("driver","com.mysql.jdbc.Driver").option("dbtable","student") //表名
      .option("user","root") //y用户名
      .option("password","***") //密码
      .load()
    df.show()
    spark.stop()

  }

}
//hive和spark尽量不在同一台主机

写入MySQL数据库中的表中:

package edu.hnuahe.Yyj_Test

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import java.util.Properties

object SparkWriteMySQL {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SparkWriteMySQL").getOrCreate()
    val studentRDD = spark.sparkContext
      .parallelize(Array("5 Ironman M 23",
                          "6 Bumblebee M 21"))
      .map(_.split(" "))
    //val schema = StructType(fields)语句把fields作为输入生成一个StructTyped对象即schema里面包含表的模式信息,也就是表头
    val schema = StructType(List(StructField("id",IntegerType,nullable = true),
                                  StructField("name",StringType,nullable = true),
                                   StructField("gender",StringType,nullable = true),
                                  StructField("age",IntegerType,nullable = true)))
    //创建一个row对象,每个row对象是rowRDD中的一行
    val rowRDD = studentRDD.map(p => Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))
    //建立row对象和模式之间的关系也就是把数据和模式对应
    val studentDF =spark.createDataFrame(rowRDD,schema)
    //创建一个prop变量保存jdbc连接参数
    val prop = new Properties()
    prop.put("user","root")
    prop.put("password", "yyj@123")
    prop.put("driver", "com.mysql.jdbc.Driver")
    studentDF.write.mode("append")
      .jdbc("jdbc:mysql://yyj4601:3306/spark",
        "spark.student",prop)
    spark.stop()
  }

}

 

2、读写Hive


读之前Hive里要提前存在要读的表

package edu.hnuahe.Yyj_Test

import org.apache.spark.sql.SparkSession

object sparkReadHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sparkReadHive")
      .enableHiveSupport().getOrCreate()
    import spark.sql
    sql("SELECT * FROM sparktest.student").show(false)
    spark.stop()
  }

}

执行前要在spark-local/conf/spark-env中添加环境

如果报错:“UnresolvedRelation”未解决的关系

1.可能是NameNode和Hive不在同一结点上,2.也可能环境变量中为指明是spark-local模式

对于1.的解决:把hive目录下的conf/hive-site.xml复制一份到spark-local/conf中

对于2.的解决:vim/etc/profile  把spark-local环境变量添加上,退出后执行source/etc/profile刷新环境变量

以上基本就能解决非代码敲错的报错:

读取Hive中的表成功

 写入Hive:

package edu.hnuahe.Yyj_Test

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object sparkWriterHive {
  def main(args: Array[String]): Unit = {
    val warehouseLocation = "spark-warehouse"
    val spark = SparkSession.builder().appName("sparkWriteHive")
      .config("spark.sql.warehouse.dir",warehouseLocation)
      .enableHiveSupport().getOrCreate()
    //2、设置两条数据表示两个学生信息
    val studentRDD = spark.sparkContext
      .parallelize(Array("5 Ironman M 23",
                         "6 Bumblebee M 21" ))
      .map(_.split(" "))
    val schema = StructType(List(StructField("id",IntegerType,nullable = true),
      StructField("name",StringType,nullable = true),
      StructField("gender",StringType,nullable = true),
      StructField("age",IntegerType,nullable = true)))

    //4、创建row对象每个row对象都是rowrdd中的一行
    val rowRDD = studentRDD.map(p => Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))
    //5、建立row对象和模式之间的关系也就是把数据和模式对应
    val studentDF = spark.createDataFrame(rowRDD,schema)
//查看视图
    studentDF.show()

    //6、创建视图
    studentDF.createTempView("tempTable")
    //7、执行插入语句
    import spark.sql
    sql("INSERT INTO sparktest.student SELECT * FROM tempTable")
    spark.stop()
  }

}

四、Spark SQL基本操作

1、Linux中employee.json:/home/test/

 读取json文件创建dataframe:

2、用scala语句完成下列操作:

  1. 查询所有数据

 df.select(df("id"),df("name"),df("age")).show()

2.查询所有数据,并去出重复数据

df.select(df("id"),df("name"),df("age")).distinct().show()或者df.distinct().show()

3.查询所有数据,打印时去除id字段

df.select(df("name"),df("age")).show()或df.drop(“id”).show()

4.筛选出age>30的记录

df.filter(df("age")>30).show()

5将数据按照age分组

df.groupBy("age").count().show()

6.数据按照name升序排列

df.select(df("id"),df("name"),df("age")).sort(df("name").asc).show() 或df.sort(df("name").asc).show()

7.取出前三行数据

df.head(3)或df.take(3)

2.查询所有记录的name列,并为其取别名为username

df.select(df("name").as("username")).show()

9.查询age的平均值

df.agg("age"->"avg").show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/495376.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

走进两邻文化,全民禁种铲毒——禁毒公益大集活动

四月梧桐芳菲尽,五月槐花飘香来。五月的春风赋予了劳动者应有的权利和由衷的自豪感;五月的春雨浇灌了我们肩负禁毒工作的义务和责任的使命感。现在也是今年禁种铲毒工作的深入执行阶段,禁毒工作一直以来都是维护社会稳定的重要工作之一&#…

C语言函数大全-- s 开头的函数(4)

C语言函数大全 本篇介绍C语言函数大全-- s 开头的函数(4) 1. strdup 1.1 函数说明 函数声明函数功能char * strdup(const char *s);用于将一个以 NULL 结尾的字符串复制到新分配的内存空间中 注意: strdup() 函数返回指向新分配的内存空间…

EasyMedia播放rtsp视频流(vue2、vue3皆可用)

之前发布过WebRtc播放rtsp视频流的博客,已经解决了web播放rtsp的问题,但WebRtc太耗内存,且需要命令行启动,对用户不太友好,虽然可以写脚本,让用户一键启动。这是无意间发现的另一种web播放rtsp视频流的办法…

相当Python程序员,选择培训班还是自学?我结合自己的经历谈谈看法

前几天我写了一篇文章,分享了自己当上程序员的经历。然后,我收到了很多小伙伴的提问,都在问同一个问题,即如何选择报培训班还是自学。今天,我结合自己的个人经历,来谈一下个人的看法。 我认为这个问题的第…

C/C++内存管理以及new/delete的底层实现。

一、C/C 内存分布 我们平常写代码所用的内存叫虚拟内存,是操作系统分配给每个进程的4G的内存,其中3G叫用户空间,1G叫内核空间。 我们所用的也就是3G的用户空间,如下图: 说明: 1. 代码段—可执行的代码/只…

【Linux】Linux学习之常用命令一

介绍 这里是小编成长之路的历程,也是小编的学习之路。希望和各位大佬们一起成长! 以下为小编最喜欢的两句话: 要有最朴素的生活和最遥远的梦想,即使明天天寒地冻,山高水远,路远马亡。 一个人为什么要努力&a…

【嵌入式笔/面试】嵌入式软件基础题和真题总结——单片机与Linux

在学习的时候找到几个十分好的工程和个人博客,先码一下,内容都摘自其中,有些重难点做了补充! 才鲸 / 嵌入式软件笔试题汇总 嵌入式与Linux那些事 阿秀的学习笔记 小林coding 百问网linux 嵌入式软件面试合集 2022年春招实习十四面…

在离职1年后,我后悔了,决定再战阿里,涨薪50%,成为卷王

2021年初,我通过一整天的笔试及面试加入一家(某一线城市国资委全资控股)某集团的研究机构(中央研究院),任职中级软件测试工程师;在这边工作了整整一年,目前已经跳槽到一家互联网公司…

Linux之系统j基本设置(四)

1、Linux 系统基本设置 1、系统时间管理 查看系统当前时间和时区 [root192 ~]# date 2023年 05月 04日 星期四 22:43:16 EDT [root192 ~]# date -R Thu, 04 May 2023 22:43:24 -0400 [root192 ~]# date %Y %m %d %H:%M:%S 2023 05 04 22:43:38设置完整时间 [root192 ~]# da…

智能安防系统-视频监控系统

一、智能安防系统 1、智能安防系统介绍 安全防范系统成为了智慧城市与物联网行业应用中的一个非常重要的子系统。 安防系统主要包括:视频监控系统、入侵报警系统、出入口控制系统、电子巡查系统以及智能停车场管理系统等5个子系统。 AI人工智能安防系统功能&#xf…

第三十四章 Unity人形动画(上)

在我们DirectX课程中,我们讲过一个模型最少拥有网格和材质,可以没有动画。游戏场景中的静态物体就可以是这样的模型,例如花草树木,建筑物等等,他们通过MeshRenderer就可以渲染。对于一个带有动画的FBX文件,…

爬虫实验笔记

这里的爬虫实验害暂时没有遇到验证码等问题,步骤可以简单概括为: 1.找到爬虫必要的信息; 2.内容提取; 3.将提取到的内容保存至xlsx文件 1.找到爬虫必要的信息 以zh为例,首先找一个自己感兴趣的贴,进入开…

webp格式转换成jpg,webp转jpg方法步骤

webp格式转换成jpg,webp转jpg方法步骤。办公室工作集科学性、规范性于一体,仅凭过去的一些经验和习惯,很难提升工作的质量和水平。因此,作为办公室工作人员来说,必须要以科学的理念,运用现代办公管理软件来…

PSP - D-I-TASSER DeepMSA2 源码简读

欢迎关注我的CSDN:https://spike.blog.csdn.net/ 本文地址:https://blog.csdn.net/caroline_wendy/article/details/130519945 DIT:https://zhanggroup.org/D-I-TASSER/ D-I-TASSER (Deep-learning based Iterative Threading ASSEmbly Refin…

MySQL学习笔记第八天

第07章单行函数 4. 日期和时间函数 4.6 计算日期和时间的函数 第1组: 函数用法DATE_ADD(datetime,lNTERVAL exprtype),ADDDATE(date,INTERVAL exprtype)返回与给定日期时间相差INTERVAL时间段的日期时间DATE_SUB(date,lNTERVAL expr type)&#xff0…

vulnhub dc-5

1.信息搜集 官方文档描述 主要内容不会使用到ssh,进入的方式很难被发现,是改变页面刷新的方法,只有一个flag nmap扫描 存活主机 192.168.85.176 端口 80 111 中间件: nginx 2.访问网站,进行进一步信息搜集 通过这两张…

加速 AI 训练,如何在云上实现灵活的弹性吞吐?

AI 已经成为各行各业软件研发的基础,带来了前所未有的效率和创新。今天,我们将分享苏锐在AWS量化投研行业活动的演讲实录,为大家介绍JuiceFS 在 AI 量化投研领域的应用经验,也希望为其他正在云上构建机器学习平台,面临…

4面美团测试工程师,因为这个小细节,直接让我前功尽弃.....

说一下我面试别人时候的思路 反过来理解,就是面试时候应该注意哪些东西;用加粗部分标注了 一般面试分为这么几个部分: 一、自我介绍 这部分一般人喜欢讲很多,其实没必要。大约5分钟内说清楚自己的职业经历,自己的核…

C++11入门基础知识

文章目录 C11简介列表初始化std::initializer_list 变量类型推导nullptr范围for循环STL中的一些变化 C11简介 在2003年C标准委员会曾经提交了一份技术勘误表(简称TC1),使得C03这个名字已经取代了C98称为C11之前的最新C标准名称。不过由于C03(TC1)主要是对C98标准中…

阿里云Alibaba Cloud Linux镜像操作系统性能兼容如何?

阿里云服务器操作系统Alibaba Cloud Linux镜像怎么样?可以代替CentOS吗?Alibaba Cloud Linux兼容性如何?有人维护吗?漏洞可以修复吗?Alibaba Cloud Linux完全兼容CentOS,并由阿里云官方免费提供长期维护。 …