Spark编程实验三:Spark SQL编程

news2025/1/11 15:58:28

目录

一、目的与要求

二、实验内容

三、实验步骤

1、Spark SQL基本操作

2、编程实现将RDD转换为DataFrame

3、编程实现利用DataFrame读写MySQL的数据

四、结果分析与实验体会


一、目的与要求

1、通过实验掌握Spark SQL的基本编程方法;
2、熟悉RDD到DataFrame的转化方法;
3、熟悉利用Spark SQL管理来自不同数据源的数据。

二、实验内容

1、Spark SQL基本操作

        将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":"Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Python语句完成下列操作:
(1)查询所有数据;
(2)查询所有数据,并去除重复的数据;
(3)查询所有数据,打印时去除id字段;
(4)筛选出age>30的记录;
(5)将数据按age分组;
(6)将数据按name升序排列;
(7)取出前3行数据;
(8)查询所有记录的name列,并为其取别名为username;
(9)查询年龄age的平均值;
(10)查询年龄age的最小值。

2、编程实现将RDD转换为DataFrame

        源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

3、编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

三、实验步骤

1、Spark SQL基本操作

        将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":"Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Python语句完成下列操作:

>>> spark=SparkSession.builder.getOrCreate()
>>> df = spark.read.json("file:///home/zhc/mycode/sparksql/employee.json")

(1)查询所有数据;

>>> df.show()

(2)查询所有数据,并去除重复的数据;

>>> df.distinct().show()

(3)查询所有数据,打印时去除id字段;

>>> df.drop("id").show()

(4)筛选出age>30的记录;

>>> df.filter(df.age > 30).show()

(5)将数据按age分组;

>>> df.groupBy("age").count().show()

(6)将数据按name升序排列;

>>> df.sort(df.name.asc()).show()

(7)取出前3行数据;

>>> df.take(3)

(8)查询所有记录的name列,并为其取别名为username;

>>> df.select(df.name.alias("username")).show()

(9)查询年龄age的平均值;

>>> df.agg({"age": "mean"}).show()

(10)查询年龄age的最小值。

>>> df.agg({"age": "min"}).show()

2、编程实现将RDD转换为DataFrame

        源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

首先,在“/home/zhc/mycode/sparksql”目录下创建文件employee.txt

​​​​​​​[root@bigdata sparksql]# vi employee.txt

然后,在该目录下新建一个py文件命名为rddtodf.py,然后写入如下py程序:

[root@bigdata sparksql]# vi rddtodf.py
#/home/zhc/mycode/sparksql/rddtodf.py
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import Row
from pyspark.sql import SQLContext
if __name__ == "__main__":
        sc = SparkContext("local","Simple App")
        spark=SparkSession(sc)
        peopleRDD = spark.sparkContext.textFile("file:home/zhc/mycode/sparksql/employee.txt")
        rowRDD = peopleRDD.map(lambda line : line.split(",")).map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2]))).toDF()
        rowRDD.createOrReplaceTempView("employee")
        personsDF = spark.sql("select * from employee")
        personsDF.rdd.map(lambda t : "id:"+str(t[0])+","+"Name:"+t[1]+","+"age:"+str(t[2])).foreach(print)

最后,运行该程序:

[root@bigdata sparksql]# python3 rddtodf.py

3、编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

首先,启动mysql服务并进入到mysql数据库中:

[root@bigdata sparksql]# systemctl start mysqld.service
[root@bigdata sparksql]# mysql -u root -p

然后开始接下来的操作。

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

mysql> create database sparktest;
mysql> use sparktest;
mysql> create table employee (id int(4), name char(20), gender char(4), age int(4));
mysql> insert into employee values(1,'Alice','F',22);
mysql> insert into employee values(2,'John','M',25);

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

首先,在“/home/zhc/mycode/sparksql”目录下面新建一个py程序并命名为mysqltest.py。

[root@bigdata sparksql]# vi mysqltest.py

接着,写入如下py程序: 

#/home/zhc/mycode/sparksql/mysqltest.py
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
#下面设置模式信息
schema = StructType([StructField("id",IntegerType(),True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
employeeRDD = spark.sparkContext.parallelize(["3 Mary F 26","4 Tom M 23","5 zhanghc M 21"]).map(lambda x:x.split(" "))
#下面创建Row对象,每个Row对象都是rowRDD中的一行
rowRDD = employeeRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
#建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
employeeDF = spark.createDataFrame(rowRDD, schema)
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = 'MYsql123!'
prop['driver'] = "com.mysql.jdbc.Driver"
employeeDF.write.jdbc("jdbc:mysql://localhost:3306/sparktest?useSSL=false",'employee','append', prop)
employeeDF.collect()
employeeDF.agg({"age": "max"}).show()
employeeDF.agg({"age": "sum"}).show()

然后,直接运行该py程序即可得到结果:

[root@bigdata sparksql]# python3 mysqltest.py

最后,到MySQL Shell中,即可查看employee表中的所有信息。

mysql> select * from employee;

四、结果分析与实验体会

        Spark SQL是Apache Spark中用于处理结构化数据的模块。它提供了一种类似于SQL的编程接口,可以用于查询和分析数据。通过实验掌握了Spark SQL的基本编程方法,SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。
        在使用Spark SQL之前,需要创建一个SparkSession对象。可以使用SparkSession的read方法加载数据。可以使用DataFrame的createOrReplaceTempView方法将DataFrame注册为一个临时视图。可以使用SparkSession的sql方法执行SQL查询。除了使用SQL查询外,还可以使用DataFrame的API进行数据操作和转换。可以使用DataFrame的write方法将数据写入外部存储。在使用完SparkSession后,应该调用其close方法来关闭SparkSession。
        最后,还掌握了RDD到DataFrame的转化方法,并可以利用Spark SQL管理来自不同数据源的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1332630.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot实现发送邮件开箱即用

springboot实现发送邮件开箱即用 环境依赖包yml配置Service层Controller层测试 环境 jdk17 springboot版本3.2.1 依赖包 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId><ver…

【OAuth2】:赋予用户控制权的安全通行证--原理篇

&#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 接下来看看由辉辉所写的关于OAuth2的相关操作吧 目录 &#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 一.什么是OAuth? 二.为什么要用OAuth?…

第11章 GUI Page400~402 步骤二 画直线

运行效果&#xff1a; 源代码&#xff1a; /**************************************************************** Name: wxMyPainterApp.h* Purpose: Defines Application Class* Author: yanzhenxi (3065598272qq.com)* Created: 2023-12-21* Copyright: yanzhen…

《每天一分钟学习C语言·九》引用,指针函数,函数指针等

1、 普通全局变量——作用域是整个源程序&#xff08;含有多个源文件&#xff0c;在各个源文件中都有效&#xff09; static全局变量——作用域在当前源文件 2、 引用 &#xff08;1&#xff09;申明引用的同时必须要初始化 &#xff08;2&#xff09;引用变量是目标变量的一个…

Windows 11中显示文件扩展名的方法与Windows 10大同小异,但前者更人性化

默认情况下&#xff0c;Windows 11会隐藏已知文件类型的文件扩展名。这可能会使在不首先打开文件的情况下很难识别文件类型。 幸运的是&#xff0c;你可以将Windows 11配置为显示已知文件类型的扩展名。该方法类似于Windows 10&#xff0c;但该选项现在组织在下拉菜单中&#…

分布式锁功效初探——以电商问题为例

文章目录 电商库存问题单机处理-Sychronized多机器处理-分布式锁入门级别&#xff0c;用redis实现&#xff0c;setnx问题1&#xff1a;逻辑可能异常&#xff0c;造成死锁问题2&#xff1a;机器宕机问题3&#xff1a;锁一直失效&#xff0c;乱套锁续命 redisson分布式丢锁问题主…

python实现元旦多种炫酷高级倒计时_附源码【第20篇—python过元旦】

文章目录 &#x1f30d;python实现元旦倒计时 — 初级(控制台)⛅实现效果&#x1f30b;实现源码&#x1f31c;源码讲解 &#x1f30d;python实现元旦倒计时 — 中级(精美动态图)⛅实现效果&#x1f30b;实现源码&#x1f31c;源码讲解 &#x1f30d;python实现元旦倒计时 — 高…

C++使用HTTP库和框架轻松发送HTTP请求

编程中使用 HTTP 库或框架发送 HTTP 请求 一、引言二、使用Curl库发送HTTP请求三、使用Boost.Beast库发送HTTP请求四、使用cpp-httplib库发送HTTP请求五、自己实现socket发送 HTTP 请求总结 一、引言 使用C编程发送HTTP请求通常需要使用第三方的HTTP库或框架。在C中&#xff0…

57 代码审计-JAVA项目框架类漏洞分析报告

目录 过滤器及拦截器相关区别解释Struts2-016远程代码执行漏洞分析-黑盒流程SpringBoot-SpEL表达式注入漏洞分析-白盒思路 过滤器及拦截器相关区别解释 Filter是基于函数回调的&#xff0c;而Interceptor则是基于Java反射的。 Filter依赖于Servlet容器&#xff0c;而Intercept…

gitattributes配置文件的作用

0 Preface/Foreword Git版本管控工具功能强大&#xff0c;在使用过程中&#xff0c;在多人合作的项目开发过程中&#xff0c;经常会遇到提交代码时出现的warning提醒&#xff0c;尤其是换行符。 Linux/Unix/Mac OS操作系统的换行符使用LF符号&#xff08;\n&#xff09;&…

Dash中 基本的 callback 5

app.callback 在Dash中&#xff0c;app.callback 被用于创建交互性应用程序&#xff0c;它用于定义一个回调函数&#xff0c;该函数在应用程序中发生特定事件时被触发。回调函数可以修改应用程序的布局或更新图表等内容&#xff0c;从而实现动态交互。 下面是一个简单的 app.…

LaTex详细安装及配置(Windows)

文章目录 引言LaTeX简介优势与应用领域 安装环境安装texlive下载texlive安装 编辑器安装texstudio下载texstudio安装 环境配置 使用第一个LaTex文档新建文件编程查看 效果 结语 引言 在当今信息技术高度发达的时代&#xff0c;文档的编辑和排版是我们日常工作和学习中不可或缺…

JavaScript中的prototype和_proto_的关系是什么

JavaScript中的prototype和_proto_的关系是什么 __proto__ 是 JavaScript 中对象的一个内部属性&#xff0c;它指向该对象的原型。JavaScript 中每个对象都有一个 __proto__ 属性&#xff0c;通过它可以访问对象的原型。prototype 是函数对象特有的属性&#xff0c;每个函数都…

蓝桥杯 1223 第 2 场 小白入门赛

蓝桥小课堂-平方和 模拟 1 2 2 2 3 2 ⋯ n 2 n ⋅ ( n 1 ) ⋅ ( 2 n 1 ) 6 1^22^23^2\cdotsn^2\dfrac{n\;\cdot\;(n 1)\;\cdot\;(2n1)}{6} 122232⋯n26n⋅(n1)⋅(2n1)​。 write(n * (n 1) * (n * 2 1) / 6);房顶漏水啦 m a x ( 最大的行 − 最小的行 , 最大的列 −…

DevC++ 用C语言的多线程 实现简单的客户端和服务器

知识来源一&#xff1a; 使用Dev-C实现简单的客户端和服务器-CSDN博客 此先生的博客使用的是win32 SDK来创建多线程&#xff0c;然后鄙人对这个版本的多线程细节不明。于是又重新用C语言的线程替代win32API,以此继续学习服务器代码。 知识来源二&#xff1a;DevC 多线程创建…

[Netty实践] 简单WebSocket服务实现

目录 一、介绍 二、依赖导入 三、基础类准备 四、Handler实现 五、WebSocketChannelInitializer实现 六、WebSocketServer实现 七、前端实现 八、测试 九、参考链接 一、介绍 关于WebSocket此处不进行过多介绍&#xff0c;本章主要着重通过Netty实现WebSocket通信服务…

在线客服系统:解决常见问题的实用工具与解决方案

市场得不断发展促使着消费者服务意识的觉醒&#xff0c;越来越多的消费者在购买产品的时候不仅看产品的功能、外观、性能&#xff0c;还关注品牌的服务质量。在线客服系统的出现帮助企业解决了客户服务难的问题。接下来&#xff0c;我们具体聊一聊在线客服系统能解决哪些问题&a…

每日一题——LeetCode888

方法一 个人方法&#xff1a; 交换后要达到相同的数量&#xff0c;那么意味着这个相同的数量就是两个人总数的平均值&#xff0c;假设A总共有4个&#xff0c;B总共有8个&#xff0c;那么最后两个人都要达到6个&#xff0c;如果A的第一盒糖果只有1个&#xff0c;那么B就要给出6…

铁山靠之——HarmonyOS基础 - 1.0

HarmonyOS学习第一章 一、HarmonyOS简介1.1 安装和使用DevEco Studio1.2 环境配置1.3 项目创建1.4 运行程序1.5 基本工程目录1.5.1 工程级目录1.5.2 模块级目录1.5.3 app.json51.5.4 module.json51.5.5 main_pages.json 二、TypeScript快速入门2.1 简介2.2 基础类型2.2.1 布尔值…

通过 Nginx 代理实现网页内容替换

突发奇想&#xff0c;用 Nginx 代理一个网站&#xff0c;把网站的一些关键字替换掉&#xff0c;蛮有意思的。 如下图&#xff1a; 一、编译安装 Nginx 一般 Nginx 中不包含 subs_filter 文本替换的模块&#xff0c;需要自己手动编译安装&#xff0c;步骤如下。 克隆 subs_fi…