清洗相关的API
清洗相关的API:
1.去重API: dropDupilcates
2.删除缺失值API: dropna
3.替换缺失值API: fillna
去重API: dropDupilcates
dropDuplicates(subset):删除重复数据
1.用来删除重复数据,如果没有指定参数subset,比对行中所有字段内容,如果全部相同,则认为是重复数据,会被删除
2.如果有指定参数subset,只比对subset中指定的字段范围
删除缺失值API: dropna
dropna(thresh,subset):删除缺失值数据.
1.如果不传递参数,只要任意一个字段值为null,就会删除整行数据
2.如果只指定了subset,那么空值的检查,就只会限定在subset指定范围内
3.如果只指定了thresh,那么空值检查的这些字段中,至少需要有thresh(>=thresh)个字段的值不为空,才不会被删除
替换缺失值API: fillna
fillna(value,subset):替换缺失值数据
1.value:必须要传递参数,指定填充缺失值的数据
2.subset:限定缺失值的替换范围
注意:
value如果不是字典,那么就只会替换字段类型匹配的空值
最常用的是value传递字典形式
# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F
# 绑定指定的python解释器
"""
基于RDD转换DataFrame的方式
需求分析:
1- 将每行内容切分得到单个的单词
2- 组织DataFrame的数据结构
2.1- 有两列。一列是单词,一列是次数
"""
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
print('API的清洗')
# 创建Sparksession对象
spark = SparkSession \
.builder \
.appName('api_etl_demo') \
.master('local[*]') \
.getOrCreate()
# 数据输入
init_df = spark.read.csv(
path='file:///export/data/pyspark_projects/02_spark_sql/data/clear_data.csv',
sep=',',
header=True,
inferSchema=True,
encoding='utf8'
)
# 查看数据
init_df.show()
init_df.printSchema()
# 数据处理
print('=' * 50)
# 去重API: dropDuplicates
init_df.dropDuplicates().show()
# 指定字段去重
init_df.dropDuplicates(subset=['id', 'name']).show()
print('=' * 50)
# 删除缺失值的API: dropna
init_df.dropna().show()
# 指定字段删除
init_df.dropna(subset='name').show()
init_df.dropna(subset=['name', 'age', 'address']).show()
init_df.dropna(thresh=1, subset=['name', 'age', 'address']).show()
init_df.dropna(thresh=2, subset=['name', 'age', 'address']).show()
print('=' * 50)
# 替换缺失值API
init_df.fillna(9999).show()
# value传递字典形式
init_df.fillna(value={'id': 9999, 'name': '刘亦菲', 'address': '北京'}).show()
# 释放资源
spark.stop()
Spark SQL的Shuffle分区设置
Spark SQL底层本质上还是Spark的RDD程序,认为 Spark SQL组件就是一款翻译软件,用于将SQL/DSL翻译为Spark RDD程序, 执行运行
Spark SQL中同样也是存在shuffle的分区的,在执行shuffle分区后, shuffle分区数量默认为 200个,但是实际中, 一般都是需要调整这个分区的, 因为当数据量比较少的数据, 200个分区相对来说比较大一些, 但是当数据量比较大的时候, 200个分区显得比较小
调整shuffle分区的数量:
方案一(不推荐):直接修改spark的配置文件spark-defaults.conf,全局设置,默认值为200
修改设置 spark.sql.shuffle.partitions 20
方案二(常用,推荐使用):在客户端通过指令submit命令提交的时候动态设置shuffle的分区数量,部署上线的时候,基于spark-submit提交运行的时候
"./spark-submit --conf "spark.sql.shuffle.partitions=20"
方案三(比较常用):在代码中设置,主要在测试环境中使用,一般部署上线的时候,会删除,优先级也是最高的,一般的使用场景是数据量未来不会发生太大的波动
sparksession.conf.set("spark.sql.shuffle.partitions",20)
# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F
# 绑定指定的python解释器
"""
1.2 直接基于DataFrame来处理
需求分析:
1- 将每行内容切分得到单个的单词
2- 组织DataFrame的数据结构
2.1- 有两列。一列是单词,一列是次数
"""
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
print('直接基于DataFrame来处理')
spark = SparkSession \
.builder \
.config("spark.sql.shuffle.partitions", 1) \
.appName('dataFrame_world_count_demo') \
.master('local[*]') \
.getOrCreate()
# 数据输入
# text方式读取hdfs上的文件
init_df = spark.read.text(paths='hdfs://node1:8020/source/word.txt')
# # 查看数据
# init_df.show()
# # 打印dataframe表结构信息
# init_df.printSchema()
# 创建临时视图
init_df.createTempView('words')
# 数据处理
"""
sparksql方式处理数据-子查询
1.先切分每一行的数据
2.使用炸裂函数获得一个word单词列
3.使用子查询方式聚合统计每个单词出现的次数
"""
spark.sql("""select word,count(*) as cnt
from (select explode(split(value,' ')) as word from words)
group by word order by cnt desc
""").show()
"""
sparksql方式处理数据-侧视图
1.先切分每一行的数据
2.使用炸裂函数获得一个word单词列
3.使用侧视图方式聚合统计每个单词出现的次数
炸裂函数配合侧视图使用如下:
格式:select 原表别名.字段名,侧视图名.字段名 from 原表 原表别名 lateral view explode(要炸开的字段)
侧视图名 as 字段名
"""
spark.sql("""select word,count(*) as cnt
from words w
lateral view explode(split(value,' ')) t as word
group by word order by cnt desc
""").show()
print('=' * 50)
"""
DSL方式处理数据-方式一
1.先切分每一行的数据
2.使用炸裂函数获得一个word单词列
3.调用API聚合统计单词个数再排序
"""
init_df.select(
F.explode(F.split('value', ' ')).alias('word')
).groupBy('word').count().orderBy('count', ascending=False).show()
"""
DSL方式处理数据-方式二
1.先切分每一行的数据
2.使用炸裂函数获得一个word单词列
3.调用API聚合统计单词个数再排序
4.agg():推荐使用,更加通用。执行聚合操作。如果有多个聚合,聚合之间使用逗号分隔即可
"""
init_df.select(
F.explode(F.split('value', ' ')).alias('word')
).groupBy('word').agg(
F.count('word').alias('cnt'),
F.max('word').alias('max_word'),
F.min('word').alias('min_word'),
).orderBy('cnt', ascending=False).show()
"""
DSL方式处理数据-方式三
withColumnRenamed(参数1,参数2):给字段重命名操作。参数1是旧字段名,参数2是新字段名
withColumn(参数1,参数2):用来产生新列。参数1是新列的名称;参数2是新列数据的来源
"""
init_df.withColumn(
'word',
F.explode(F.split('value', ' '))
).groupBy('word').agg(
F.count('word').alias('cnt'),
F.max('word').alias('max_word'),
F.min('word').alias('min_word')
).orderBy('cnt', ascending=False).show()
# 数据输出
# 是否资源
spark.stop()
数据写出操作
统一的输出语法:
对应的简写API格式如下,以CSV为例:
init_df.write.csv(
path='存储路径',
mode='模式',
header=True,
sep='\t',
encoding='UTF-8'
)
输出到本地文件
常用参数说明:
1- path:指定结果数据输出路径。支持本地文件系统和HDFS文件系统
2- mode:当输出目录中文件已经存在的时候处理办法
2.1- append:追加。如果文件已经存在,那么继续在该目录下产生新的文件
2.2- overwrite:覆盖。如果文件已经存在,那么就先将已有的文件清除,再写入进去
2.3- ignore:忽略。如果文件已经存在,那么不执行任何操作
2.4- error:报错。如果文件已经存在,那么直接报错。会报错AnalysisException: path
file:xxx already exists.
3- sep:字段间的分隔符
4- header:数据输出的时候,是否要将字段名称输出到文件的第一行。推荐设置为True
5- encoding:文件输出的编码方式
# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F
# 绑定指定的python解释器
"""
基于RDD转换DataFrame的方式
需求分析:
1- 将每行内容切分得到单个的单词
2- 组织DataFrame的数据结构
2.1- 有两列。一列是单词,一列是次数
"""
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
print('数据输出本地文件')
# 创建Sparksession对象
spark = SparkSession \
.builder \
.appName('api_etl_demo') \
.master('local[*]') \
.getOrCreate()
# 数据输入
init_df = spark.read.csv(
path='file:///export/data/pyspark_projects/02_spark_sql/data/clear_data.csv',
sep=',',
header=True,
inferSchema=True,
encoding='utf8'
)
# 数据处理
result = init_df.where('age>20')
# 数据查看
result.show()
result.printSchema()
# 数据输出
# 以csv格式输出,简写API
result.write.csv(
path='file:///export/data/pyspark_projects/02_spark_sql/data/output',
mode='append',
header=True,
sep=',',
encoding='utf8'
)
# 以json方式输出到本地文件系统,复杂API
result.write \
.format('json') \
.option('encoding', 'utf8') \
.mode('overwrite') \
.save('file:///export/data/pyspark_projects/02_spark_sql/data/output_json')
数据输出到数据库
数据库的驱动包, 一般都是一些Jar包
如何放置【mysql-connector-java-5.1.41.jar】驱动包呢?
1- 放置位置一: 当spark-submit提交的运行环境为Spark集群环境的时候,以及运行模式为local, 默认从 spark的jars目录下加载相关的jar包,
目录位置: /export/server/spark/jars
2- 放置位置二: 当我们使用pycharm运行代码的时候, 基于python的环境来运行的, 需要在python的环境中可以加载到此jar包
目录位置:
/root/anaconda3/lib/python3.8/site-packages/pyspark/jars/
3- 放置位置三: 当我们提交选择的on yarn模式 需要保证此jar包在HDFS上对应目录下
hdfs的spark的jars目录下: hdfs://node1:8020/spark/jars
请注意: 以上三个位置, 主要是用于放置一些 spark可能会经常使用的jar包, 对于一些不经常使用的jar包, 在后续spark-submit 提交运行的时候, 会有专门的处理方案: spark-submit --jars ....
将中文输出到了数据表中乱码
解决办法:
1- 数据库连接要加上:useUnicode=true&characterEncoding=utf-8
2- 创建数据库的时候需要指定编码character set utf8
# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F
# 绑定指定的python解释器
"""
基于RDD转换DataFrame的方式
需求分析:
1- 将每行内容切分得到单个的单词
2- 组织DataFrame的数据结构
2.1- 有两列。一列是单词,一列是次数
"""
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
print('API的清洗')
# 创建Sparksession对象
spark = SparkSession \
.builder \
.appName('api_etl_demo') \
.master('local[*]') \
.getOrCreate()
# 数据输入
init_df = spark.read.csv(
path='file:///export/data/pyspark_projects/02_spark_sql/data/clear_data.csv',
sep=',',
header=True,
inferSchema=True,
encoding='utf8'
)
# 数据处理
result = init_df.where('age>20')
# 数据查看
result.show()
result.printSchema()
# 数据输出
# 以csv格式输出,简写API
result.write.jdbc(
url='jdbc:mysql://node1:3306/day06?useUnicode=true&characterEncoding=utf-8',
table='student',
mode='append',
properties={'user': 'root', 'password': '123456'}
)