实验三 Spark SQL基础编程

news2025/1/22 12:38:51

实验三 Spark SQL基础编程

1.实验目的

1. 掌握 Spark SQL 的基本编程方法;

2. 熟悉 RDD 到 DataFrame 的转化方法;

3. 熟悉利用 Spark SQL 管理来自不同数据源的数据。

2.实验内容

1.Spark SQL 基本操作 将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。

{ "id":1, "name":"Ella", "age":36 }

{ "id":2, "name":"Bob", "age":29 }

{ "id":3, "name":"Jack", "age":29 }

{ "id":4, "name":"Jim", "age":28 }

{ "id":4, "name":"Jim", "age":28 }

{ "id":5, "name":"Damon" }

{ "id":5, "name":"Damon" }

为 employee.json 创建 DataFrame,并写出 Python 语句完成下列操作:

(1)查询所有数据;

(2)查询所有数据,并去除重复的数据;

(3)查询所有数据,打印时去除 id 字段;

(4)筛选出 age>30 的记录;

(5)将数据按 age 分组;

(6)将数据按 name 升序排列;

(7)取出前 3 行数据;

(8)查询所有记录的 name 列,并为其取别名为 username;

(9)查询年龄 age 的平均值;

(10)查询年龄 age 的最小值。

创建json文件

echo '{ "id":1, "name":"Ella", "age":36 }' > employee.json
echo '{ "id":2, "name":"Bob", "age":29 }' >> employee.json
echo '{ "id":3, "name":"Jack", "age":29 }' >> employee.json
echo '{ "id":4, "name":"Jim", "age":28 }' >> employee.json
echo '{ "id":4, "name":"Jim", "age":28 }' >> employee.json
echo '{ "id":5, "name":"Damon" }' >> employee.json
echo '{ "id":5, "name":"Damon" }' >> employee.json

参考代码:关键代码如下。

#导入
....
//创建sprak对象
....

df = spark.read.json("employee.json")


df.show()


df_distinct = df.distinct()
df_distinct.show()


df_without_id = df.select("name", "age")
df_without_id.show()

df_age_gt_30 = df.filter(col("age") > 30)
df_age_gt_30.show()

df_grouped_by_age = df.groupBy("age").count()
df_grouped_by_age.show()

df_sorted_by_name = df.orderBy("name")
df_sorted_by_name.show()
df_top_3 = df.limit(3)
df_top_3.show()

df_with_username = df.select(col("name").alias("username"))
df_with_username.show()


print(df.select(avg("age")).first()[0])


print(df.select(min("age")).first()[-3] )

//关闭
...

 

2.编程实现将 RDD 转换为 DataFrame 源文件内容如下(包含 id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

请将数据复制保存到 Linux 系统中,命名为 employee.txt,实现从 RDD 转换得到 DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。

请写出 程序代码。

关键代码如下:

# 创建SparkSession
.......

# 定义源文件的结构类型

.......
# 读取源文件并创建RDD
....

# 将RDD转换为DataFrame
df = spark.createDataFrame(rdd, schema)

# 打印DataFrame的所有数据
df.show(truncate=False)

# 将DataFrame的数据按指定格式打印出来
df_string = df.rdd \
    .map(lambda row: f"id:{row['id']},name:{row['name']},age:{row['age']}") \
    .collect()

for data in df_string:
    print(data)

.......

 

 

3. 编程实现利用 DataFrame 读写 MySQL 

(1)在 MySQL 数据库中新建数据库 sparktest,再创建表 employee,包含如表 1 所示 的两行数据

表 1 employee

 

(2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入如表 2 所 示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。

表 2 employee

sql语句:

CREATE DATABASE sparktest;

USE sparktest;

CREATE TABLE employee (
  id INT PRIMARY KEY,
  name VARCHAR(50),
  gender CHAR(1),
  age INT
);

INSERT INTO employee (id, name, gender, age) VALUES
(1, 'Alice', 'F', 22),
(2, 'John', 'M', 25);

 python代码

from pyspark.sql import SparkSession

# 创建SparkSession
spark = SparkSession.builder \
    .appName("MySQL Example") \
    .getOrCreate()

# 定义MySQL连接信息
mysql_host = "localhost"
mysql_port = "3306"
mysql_database = "sparktest"
mysql_table = "employee"
mysql_username = "root"

mysql_password = "root"

# 创建DataFrame
data = [("3", "Mary", "F", 26), ("4", "Tom", "M", 23)]
columns = ["id", "name", "gender", "age"]
df = spark.createDataFrame(data, columns)

# 写入MySQL数据库
df.write.format("jdbc") \
    .option("url", f"jdbc:mysql://{mysql_host}:{mysql_port}/{mysql_database}&useSSL=false") \
    .option("dbtable", mysql_table) \
    .option("user", mysql_username) \
    .option("password", mysql_password) \
    .mode("append") \
    .save()

# 从MySQL数据库读取数据
df_mysql = spark.read.format("jdbc") \
    .option("url", f"jdbc:mysql://{mysql_host}:{mysql_port}/{mysql_database}") \
    .option("dbtable", mysql_table) \
    .option("user", mysql_username) \
    .option("password", mysql_password) \
    .load()

# 计算age的最大值和总和
max_age = df_mysql.selectExpr("max(age)").collect()[0][0]
sum_age = df_mysql.selectExpr("sum(age)").collect()[0][0]

# 打印结果
print("Max Age:", max_age)
print("Sum of Age:", sum_age)

# 关闭SparkSession
spark.stop()

 

 3.参考代码

https://download.csdn.net/download/weixin_41957626/87780630

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/522192.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CF1245D Shichikuji and Power Grid 题解

CF1245D Shichikuji and Power Grid 题解 题目链接字面描述题面翻译样例 #1样例输入 #1样例输出 #1 样例 #2样例输入 #2样例输出 #2 提示 思路点拨代码实现 题目 链接 https://www.luogu.com.cn/problem/CF1245D 字面描述 题面翻译 已知一个平面上有 n n n 个城市&#x…

计网笔记 数据链路层 (1-2) 封装成帧、差错控制、流量控制与可靠传输、停止等待协议、后退N帧协议(GBN)、选择重传协议(SR)

文章目录 前言在这里插入图片描述 零、数据链路层基本概念一、功能0、数据链路层功能概述1、封装成帧和透明传输1.1封装成帧1.2 透明传输1.3组帧方法 2、数据链路层的差错控制2.0差错从何而来2.1位错(比特错,1变成0,0变成1)2.2帧错…

干货 | ChatGPT使用指南,让你轻松上车AI世界!

Hello,大家好! 这里是壹脑云科研圈,我是喵君姐姐~ 聊天机器人(Chatbot)是一种人工智能应用,可以模拟人类对话行为,以自然语言进行交互。 在过去的几年里,随着自然语言处理技术和深…

Springboot +Flowable,定时器的简单使用

一.流程定义定时激活 之前介绍流程定义的时候,流程都是定义好之后立马就激活了,其实在流程定义的这个过程中,我们还可以设置一个激活时间,也就是流程定义好之后,并不会立马激活(不激活就不能据此流程定义创…

操作系统作业 第37-40章

第四次作业 第37章 本章作业需要使用提供的disk.py程序。该程序可以模拟磁盘的工作。在默认情况下,磁盘调度方法为FIFO。对于时间的计算,假设旋转一度为1个时间单位,旋转完整一圈需要360个时间单位,而一个磁道上默认有12个扇区&…

实验二 RDD基础编程

实验二 RDD基础编程 前提是配置好大数据环节。 hadoop,spark,scala等必须的软件 以及下载pyshark 1.实验目的 1. 掌握 RDD 基本操作; 2. 熟悉使用 RDD 编程解决实际具体问题的方法; 2.实验内容 本人仅提供测试代码!…

策划专业技能提升攻略,让你在职场中脱颖而出

作为一个10多年的老策划,刚入行的时候也走过很多弯路,后来加入到一家在国内比较知名的策划公司(老板也是当年的十大知名策划人)才真正让我实现水平的跃升。 当时公司经常有内训,新人的第一课就是策划人应该如何快速入…

FreeRTOS-事件组详解

✅作者简介:嵌入式入坑者,与大家一起加油,希望文章能够帮助各位!!!! 📃个人主页:rivencode的个人主页 🔥系列专栏:玩转FreeRTOS 💬保持…

深入理解JVM读书笔记与实战_01_Java内存区域与内存溢出异常

文章目录 运行时数据区域问题引入 运行时数据区域 Java虚拟机在执行Java程序的过程中会把所管理的内存划分为若干个不同的数据区。运行时数据区包括了程序计数器、虚拟机栈、本地方法栈、方法区和堆。 程序计数器:程序计数器是线程私有的内存,用来记住…

vue:组件使用v-model实现2个组件间的数据双向绑定

一、需要实现的需求: 子组件输入框的数据发生改变,父组件的数据跟着实时改变; 父组件的数据发生改变,子组件的数据跟着实时改变。 二、实现思路: 1、(1)在父组件引入子组件。(2&…

CAN总线要点总结(CAN2.0A/B)

个人博客原文链接:CAN总线要点总结(CAN2.0A/B) 前言 工作也有几年了,在项目中也接触过几次CAN总线,但总是止步于会用即可,对于很多细节上的东西有时还是稀里糊涂的状态,这几天正好有点时间&am…

【亲测有效】pycharm不显示软件包

http://pypi.hustunique.com/ https://pypi.mirrors.ustc.edu.cn/ http://pypi.tuna.tsinghua.edu.cn/simple/ http://mirrors.aliyun.com/pypi/simple/ http://pypi.douban.com/simple/2023.5.13 亲测有效

单点登录系统:登录,登出,拦截器

什么是单点登录? 单点登录(Single Sign On),简称为 SSO,是目前比较流行的企业业务整合的解决方案之一。SSO 的定义是在多个应用系统中,用户只需要登录一次就可以访问所有相互信任的应用系统。 假设一个企业…

贪心算法(无规则)

目录 1.easy1.455. 分发饼干2.1005. K 次取反后最大化的数组和3.860. 柠檬水找零 2.medium1.序列问题1.376. 摆动序列2.738. 单调递增的数字 2.贪心解决股票问题1.122. 买卖股票的最佳时机 II 3.两个维度权衡问题1.135. 分发糖果*2.406. 根据身高重建队列(linklist,…

【WOA-LSTM】基于WOA优化 LSTM神经网络预测研究(Python代码实现)

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…

Neuron 2.4.0 发布:体验下一代工业物联网连接和管理

近日,EMQ 旗下的工业协议网关软件 Neuron 发布了最新的 2.4.0 版本。 该版本新增了包括 ABB COMLI 在内的四个南向驱动和一个北向应用,同时对现有的插件功能和 UI 进行了优化。 快速体验 Neuron 新版本 新增驱动插件满足不同场景需求 IEC61850 MMS 和 …

springboot项目如何优雅停机

文章目录 前言kill -9 pid的危害如何优雅的停机理论步骤优雅方式1、kill -15 pid 命令停机2、ApplicationContext close停机3、actuator shutdown 停机 前言 相信很多同学都会用Kill -9 PID来杀死进程,如果用在我们微服务项目里面,突然杀死进程会有什么…

计算机组成原理基础练习题第二章

1.下面四种语言中,()更适应网络环境 A、FORTRAN    B、JavaC、C        D、PASCAL 2.下列叙述中正确的是() A.终端是计算机硬件的一部分,好比电视中的小屏幕 B. ALU是代数逻辑单元的缩写 C.导航用计算机属于一般用途计算…

网络数据链路层介绍

文章目录 一、以太网二、以太网的帧格式三、局域网通信的原理四、ARP协议1.ARP协议的介绍2.ARP协议的工作流程3.ARP数据报格式 一、以太网 以太网并不是一种具体的网络,而是一种技术标准,它既包含了数据链路层的内容,也包含了一些物理层的内…

【数据分享】2023年全国A级景区数据(14847个景区)

我国的旅游景区,依据景区质量划分为五级,从高到低依次为5A、4A、3A、2A、A级旅游景区。我国旅游管理部门对于A级景区实行“有进有出”的动态管理,也就是A级景区的名单每年都在变!我们立方数据学社也一直在跟踪整理每年的A级景区数…