【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)

news2025/1/4 17:08:19

需要源码和依赖请点赞关注收藏后评论区留言私信~~~

一、Dataframe操作

步骤如下

1)利用IntelliJ IDEA新建一个maven工程,界面如下

2)修改pom.XML添加相关依赖包

3)在工程名处点右键,选择Open Module Settings

4)配置Scala Sdk,界面如下

5)新建文件夹scala,界面如下:

6) 将文件夹scala设置成Source Root,界面如下:

 

7) 新建scala类,界面如下:

 此类主要功能是读取D盘下的people.txt文件,使用编程方式操作DataFrame,相关代码如下

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
case class Person(name:String,age:Long)
object sparkSqlSchema {
  def main(args: Array[String]): Unit = {

    //创建Spark运行环境
    val spark = SparkSession.builder().appName("sparkSqlSchema").master("local").getOrCreate()
    val sc = spark.sparkContext;
    //读取文件
    val data: RDD[Array[String]] = sc.textFile("D:/people.txt"). map (x => x.split(","));
    //将RDD与样例类关联
    val personRdd: RDD[Person] = data. map (x => Person(x(0),x(1).toLong))
    //手动导入隐式转换
    import spark.implicits._
    val personDF: DataFrame = personRdd.toDF
    //显示DataFrame的数据
    personDF.show()
    //显示DataFrame的schema信息
    personDF.printSchema()
    //显示DataFrame记录数
    println(personDF.count())
    //显示DataFrame的所有字段
    personDF.columns.foreach(println)
    //取出DataFrame的第一行记录
    println(personDF.head())
    //显示DataFrame中name字段的所有值
    personDF.select("name").show()
    //过滤出DataFrame中年龄大于20的记录
    personDF.filter($"age" > 20).show()
    //统计DataFrame中年龄大于20的人数
    println(personDF.filter($"age" > 20).count())
    //统计DataFrame中按照年龄进行分组,求每个组的人数
    personDF.groupBy("age").count().show()
    //将DataFrame注册成临时表
    personDF.createOrReplaceTempView("t_person")
    //传入sql语句,进行操作
    spark.sql("select * from t_person").show()
    spark.sql("select * from t_person where name='王五'").show()
    spark.sql("select * from t_person order by age desc").show()
    //DataFrame转换成Dataset
    var ds=personDF.as[Person]
    ds.show()
    //关闭操作
    sc.stop()
    spark.stop()
  }



}

二、Spark SQL读写MySQL数据库

下面的代码使用JDBC连接MySQL数据库,并进行读写操作 主要步骤如下

1:新建数据库

2:新建表

3:添加依赖包

4:新建类

5:查看运行结果

代码如下

import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode,SparkSession}

object sparkSqlMysql {
  def main(args: Array[String]): Unit = {
    //创建sparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .appName("sparkSqlMysql")
      .master("local")
      .getOrCreate()
    val sc = spark.sparkContext
    //读取数据
    val data: RDD[Array[String]] = sc.textFile("D:/people.txt").map(x => x.split(","));
    //RDD关联Person
    val personRdd: RDD[Person] = data.map(x => Person(x(0), x(1).toLong))
    //导入隐式转换
    import spark.implicits._
    //将RDD转换成DataFrame
    val personDF: DataFrame = personRdd.toDF()
    personDF.show()
    //创建Properties对象,配置连接mysql的用户名和密码
    val prop =new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","123456")
    //将personDF写入MySQL

    personDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://127.0.0.1:3306/spark?useUnicode=true&characterEncoding=utf8","person",prop)
    //从数据库里读取数据
    val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/spark", "person",                   prop)
    mysqlDF.show()
    spark.stop()
  }


}

三、Spark SQL读写Hive

下面的示例程序连接Hive,并读写Hive下的表 主要步骤如下

1:在pom.xml中添加Hive依赖包

2:连接Hive

3:新建表

4:向Hive表写入数据,新scala类sparksqlToHIVE,主要功能是读取D盘下的people.txt文件,使用编程方式操作DataFrame,然后插入到HIVE的表中。

5:查看运行结果

代码如下

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame,SparkSession}
object  sparksqlToHIVE {
  def main(args: Array[String]): Unit = {
    //设置访问用户名,主要用于访问HDFS下的Hive warehouse目录
    System.setProperty("HADOOP_USER_NAME", "root")
    //创建sparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("sparksqlToHIVE")
      .config("executor-cores",1)
      .master("local")
      .enableHiveSupport() //开启支持Hive
      .getOrCreate()
    val sc = spark.sparkContext
    //读取文件
    val data: RDD[Array[String]] = sc.textFile("D:/people.txt"). map (x => x.split(","));
    //将RDD与样例类关联
    val personRdd: RDD[Person] = data. map (x => Person(x(0),x(1).toLong))
    //手动导入隐式转换
    import spark.implicits._
    val personDF: DataFrame = personRdd.toDF
    //显示DataFrame的数据
    personDF.show()
    //将DataFrame注册成临时表t_person
    personDF.createOrReplaceTempView("t_person")
    //显示临时表t_person的数据
    spark.sql("select * from t_person").show()
    //使用Hive中bigdata的数据库
    spark.sql("use bigdata")
    //将临时表t_person的数据插入使用Hive中bigdata数据库下的person表中
    spark.sql("insert into person select * from t_person")
    //显示用Hive中bigdata数据库下的person表数据
    spark.sql("select * from person").show()
    spark.stop()
  }

}

创作不易 觉得有帮助请点赞关注收藏~~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/100123.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

整数的大小端序

在存储整数时,一般按字节为逻辑单位进行存储,有“小端序”和“大端序”之分。小端序(little-endian) 是指将表示整数的低位字节存储在内存地址的低位,高位字节存储在内存地址的高位。如果将整数 1982062410 存储至内存…

【CANN训练营第三季】2022年度第三季新手班之昇腾AI入门课

本次参加CANN训练营,本来我报名的是进阶班课程,再看一遍新手班,学习一下目前CANN的最新进展也是不错的,巩固一下。 视频课程大家可以从这里看到 (1)【CANN训练营第三季】- 昇腾AI入门课(上&am…

使用Keepalived工具实现集群节点的高可用

GreatSQL社区原创内容未经授权不得随意使用,转载请联系小编并注明来源。GreatSQL是MySQL的国产分支版本,使用上与MySQL一致。作者:蟹黄瓜子文章来源:社区投稿 1.前言 在集群当中离不开的一个词就是是高可用,用本文来…

OpenWrt + 每步科技DDNS 实现ipv6动态域名解析方法

其实好几个月前我就已经把这个动态域名设置好了,后面重新刷了系统,忘记保存,又得重新再来,这次把过程记录一下,免得下次再从头百度。 工具 刷好openWrt的路由器一个每步科技注册的域名(我为什么选择这个&…

数字电子技术(八)D/A和A/D转换

D/A和A/D转换概述D/A转换A/D转换例题练习模拟信号:在时间与数值上都连续 数字信号:在时间与数值上都离散 概述 D/A转换:数字信号——模拟信号 (D/A转换器简称DAC)A/D转换:模拟信号——数字信号 &#xff0…

修改物料编号格式及长度

修改物料编号格式及长度(OMSL) 路径:IMG--后勤常规--物料主数据--基本设置--定义物料编号的输出格式

毕业设计 - 基于JSP的合同信息管理系统【源码+论文】

文章目录前言一、项目设计1. 模块设计数据库设计2. 实现效果二、部分源码项目源码前言 今天学长向大家分享一个 java web jsp 项目: 基于JSP的合同信息管理系统 适合用于毕业设计、课程设计 一、项目设计 1. 模块设计 需求分析是从客户的需求中提取出软件系统能够帮助用户…

java互联网医院系统HIS源码带本地搭建教程

技术架构 技术框架:SpringBoot MySql MyBatis nginx Vue2.6 原生APP 运行环境:jdk8 IntelliJ IDEA maven 宝塔面板 Android Studio 文字本地搭建教程 下载源码,小皮面板安装mysql5.7数据库,创建一个新数据库,…

引力波探测,冷冻电镜研究:两项诺奖GPU功不可没

我们的日常工作固然重要,但并非每一份重要的工作都能够助力他人获得诺贝尔奖。然而,就在2017年10月,GPU 计算便两度成为了助力获得诺贝尔奖的幕后英雄。 三名美国物理学家Rainer Weiss、Barry Barish和Kip Thorne因探测到了爱因斯坦百年前预测…

从“跨域融合”到“中央计算”,这家Tier1如何率先抢跑?

全球汽车产业已经进入以智能化为主旋律的下半场竞赛,同时整车电子电气架构也在加速跨入集中式电子电气架构时代。 在这样的背景之下,智能驾驶域控制器成为了当前最大的增量市场之一,由此也带动了上游芯片、OS、中间件等域控相关软硬件产品的…

第13讲:Python列表对象中元素的删操作

文章目录1.列表元素删操作的方法2.调用remove方法一次删除一个指定的元素3.调用pop方法一次只删除一个指定索引的元素3.1.使用pop方法删除列表中索引为2的元素3.2.使用pop方法不指定索引3.3.使用pop方法指定的索引不存在时同样也会抛出错误4.使用del语句一次至少删除一个元素4.…

nodejs+vue082新生入学管理系统-vscode msyql

一章 绪论 3 1.1课题背景 3 1.2课题研究的目的和意义 3 1.3 研究现状 4 1.4论文所做的主要工作 4 第二章 技术介绍 5 2.1 B/S结构 5 2.2MySQL介绍 5 2.3MySQL环境配置 6 第三章 系统分析与设计 8 3.1系统说明 8 3.2系统可行性分析 8 3.2.1 技术可行性 8 3.2.2 经济可行性 8 3…

Vue3——路由和嵌套路由的使用

路由的作用 用来在前端的页面实现页面的切换,比如下图中acwing的页面应该就是采用了路由来设计导航栏,在每一次点击不同选项时只有网址后面的索引发生变化,网页并没有刷新 路由的使用方法: 根据导航栏处的跳转的页面的不同&…

如何创建vue项目

一. 环境准备 1.安装node.js 推荐地址:Node.js 2.检查是否安装完成:输出版本号说明安装成功 二.搭建vue环境 1.全局安装vue/cli 推荐地址:快速上手 | Vue.js 2.在命令输入 npm install -g vue/cli 如果使用yarn global add vue/cli 需要…

JVM--基础--19.7--垃圾收集器--G1

​ JVM–基础–19.7–垃圾收集器–G1 1、结构图 ​ 2、G1收集器(面向服务端) 2.1、特点 2.1.1、并行于并发 使用多个CPU(CPU或者CPU核心)来缩短stop-The-World停顿时间,其他需要停顿Java线程执行的GC动作,G1收集器仍然可以通过并发的方式让java程序…

​汽车芯片的可靠性设计:控制亚稳态,提升稳定性

【作者简介】Dr. Roy 复睿微 IC后端工程师,南开大学与韩国首尔国立大学联合培养博士。博士期间发表高水平学术期刊论文多篇,其中一作一区封面文章2篇;授权发明专利5项。同时,在先进工艺大芯片的静态时序分析、芯片设计流程提效优化…

对某擦边站点的一次渗透

更新时间:2022.07.05 2022年11月21日21:50:12 1. 说明 在上半年的时候,在线浏览网页的时候,突然跳转到了一个sese的界面,然后要下载app,本着弹出即下载的原则,我就欣然安装了: app本身长这样…

Dubbo架构设计与源码解析(一) 架构设计

作者:黄金 一、架构演变 单应用架构 ----> 垂直架构 ----> 分布式架构 ----> 微服务架构 ----> 云原生架构 二、Dubbo总体架构 1、角色职能 • Container:服务容器 (tomcat、jetty、weblogic) • Provider&#xf…

Web Spider NEX XX国际货币经纪 - PDF下载 提取关键词(二)

Web Spider NEX XX国际货币经纪 - PDF下载 & 解析 首先声明: 此次案例只为学习交流使用,切勿用于其他非法用途 文章目录Web Spider NEX XX国际货币经纪 - PDF下载 & 解析前言一、任务说明1.PDF下载2.PDF解析提取关键词数据二、Pip模块安装三、网站分析四、核…

【解决】Unity Player Log 自生成造成磁盘满占用率问题

开发平台:Unity 2020 编程平台:Visual Studio 2022 编程语言:CSharp   问题描述 Unity 工程完成打包与发布过程后,在运行时生成大量 Player Log 的日志文件导致其所在盘占用率满额问题。通常情况下,这类日志文件信息…