实验三 Spark SQL基础编程
1.实验目的
1. 掌握 Spark SQL 的基本编程方法;
2. 熟悉 RDD 到 DataFrame 的转化方法;
3. 熟悉利用 Spark SQL 管理来自不同数据源的数据。
2.实验内容
1.Spark SQL 基本操作 将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。
{ "id":1, "name":"Ella", "age":36 }
{ "id":2, "name":"Bob", "age":29 }
{ "id":3, "name":"Jack", "age":29 }
{ "id":4, "name":"Jim", "age":28 }
{ "id":4, "name":"Jim", "age":28 }
{ "id":5, "name":"Damon" }
{ "id":5, "name":"Damon" }
为 employee.json 创建 DataFrame,并写出 Python 语句完成下列操作:
(1)查询所有数据;
(2)查询所有数据,并去除重复的数据;
(3)查询所有数据,打印时去除 id 字段;
(4)筛选出 age>30 的记录;
(5)将数据按 age 分组;
(6)将数据按 name 升序排列;
(7)取出前 3 行数据;
(8)查询所有记录的 name 列,并为其取别名为 username;
(9)查询年龄 age 的平均值;
(10)查询年龄 age 的最小值。
创建json文件
echo '{ "id":1, "name":"Ella", "age":36 }' > employee.json
echo '{ "id":2, "name":"Bob", "age":29 }' >> employee.json
echo '{ "id":3, "name":"Jack", "age":29 }' >> employee.json
echo '{ "id":4, "name":"Jim", "age":28 }' >> employee.json
echo '{ "id":4, "name":"Jim", "age":28 }' >> employee.json
echo '{ "id":5, "name":"Damon" }' >> employee.json
echo '{ "id":5, "name":"Damon" }' >> employee.json
参考代码:关键代码如下。
#导入
....
//创建sprak对象
....
df = spark.read.json("employee.json")
df.show()
df_distinct = df.distinct()
df_distinct.show()
df_without_id = df.select("name", "age")
df_without_id.show()
df_age_gt_30 = df.filter(col("age") > 30)
df_age_gt_30.show()
df_grouped_by_age = df.groupBy("age").count()
df_grouped_by_age.show()
df_sorted_by_name = df.orderBy("name")
df_sorted_by_name.show()
df_top_3 = df.limit(3)
df_top_3.show()
df_with_username = df.select(col("name").alias("username"))
df_with_username.show()
print(df.select(avg("age")).first()[0])
print(df.select(min("age")).first()[-3] )
//关闭
...
2.编程实现将 RDD 转换为 DataFrame 源文件内容如下(包含 id,name,age):
1,Ella,36
2,Bob,29
3,Jack,29
请将数据复制保存到 Linux 系统中,命名为 employee.txt,实现从 RDD 转换得到 DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。
请写出 程序代码。
关键代码如下:
# 创建SparkSession
.......
# 定义源文件的结构类型
.......
# 读取源文件并创建RDD
....
# 将RDD转换为DataFrame
df = spark.createDataFrame(rdd, schema)
# 打印DataFrame的所有数据
df.show(truncate=False)
# 将DataFrame的数据按指定格式打印出来
df_string = df.rdd \
.map(lambda row: f"id:{row['id']},name:{row['name']},age:{row['age']}") \
.collect()
for data in df_string:
print(data)
.......
3. 编程实现利用 DataFrame 读写 MySQL
(1)在 MySQL 数据库中新建数据库 sparktest,再创建表 employee,包含如表 1 所示 的两行数据
表 1 employee
(2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入如表 2 所 示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。
表 2 employee
sql语句:
CREATE DATABASE sparktest;
USE sparktest;
CREATE TABLE employee (
id INT PRIMARY KEY,
name VARCHAR(50),
gender CHAR(1),
age INT
);
INSERT INTO employee (id, name, gender, age) VALUES
(1, 'Alice', 'F', 22),
(2, 'John', 'M', 25);
python代码
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession.builder \
.appName("MySQL Example") \
.getOrCreate()
# 定义MySQL连接信息
mysql_host = "localhost"
mysql_port = "3306"
mysql_database = "sparktest"
mysql_table = "employee"
mysql_username = "root"
mysql_password = "root"
# 创建DataFrame
data = [("3", "Mary", "F", 26), ("4", "Tom", "M", 23)]
columns = ["id", "name", "gender", "age"]
df = spark.createDataFrame(data, columns)
# 写入MySQL数据库
df.write.format("jdbc") \
.option("url", f"jdbc:mysql://{mysql_host}:{mysql_port}/{mysql_database}&useSSL=false") \
.option("dbtable", mysql_table) \
.option("user", mysql_username) \
.option("password", mysql_password) \
.mode("append") \
.save()
# 从MySQL数据库读取数据
df_mysql = spark.read.format("jdbc") \
.option("url", f"jdbc:mysql://{mysql_host}:{mysql_port}/{mysql_database}") \
.option("dbtable", mysql_table) \
.option("user", mysql_username) \
.option("password", mysql_password) \
.load()
# 计算age的最大值和总和
max_age = df_mysql.selectExpr("max(age)").collect()[0][0]
sum_age = df_mysql.selectExpr("sum(age)").collect()[0][0]
# 打印结果
print("Max Age:", max_age)
print("Sum of Age:", sum_age)
# 关闭SparkSession
spark.stop()
3.参考代码
https://download.csdn.net/download/weixin_41957626/87780630