DataFrame详解

news2024/11/20 12:43:31

清洗相关的API

清洗相关的API:

1.去重API: dropDupilcates

2.删除缺失值API: dropna

3.替换缺失值API: fillna

去重API: dropDupilcates

dropDuplicates(subset):删除重复数据

1.用来删除重复数据,如果没有指定参数subset,比对行中所有字段内容,如果全部相同,则认为是重复数据,会被删除

2.如果有指定参数subset,只比对subset中指定的字段范围

删除缺失值API: dropna

dropna(thresh,subset):删除缺失值数据.

1.如果不传递参数,只要任意一个字段值为null,就会删除整行数据

2.如果只指定了subset,那么空值的检查,就只会限定在subset指定范围内

3.如果只指定了thresh,那么空值检查的这些字段中,至少需要有thresh(>=thresh)个字段的值不为空,才不会被删除

 替换缺失值API: fillna

fillna(value,subset):替换缺失值数据

1.value:必须要传递参数,指定填充缺失值的数据

2.subset:限定缺失值的替换范围

注意:

        value如果不是字典,那么就只会替换字段类型匹配的空值

        最常用的是value传递字典形式

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F

# 绑定指定的python解释器
"""
基于RDD转换DataFrame的方式

需求分析:

1- 将每行内容切分得到单个的单词

2- 组织DataFrame的数据结构

2.1- 有两列。一列是单词,一列是次数
"""

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    print('API的清洗')
    # 创建Sparksession对象
    spark = SparkSession \
        .builder \
        .appName('api_etl_demo') \
        .master('local[*]') \
        .getOrCreate()
    # 数据输入
    init_df = spark.read.csv(
        path='file:///export/data/pyspark_projects/02_spark_sql/data/clear_data.csv',
        sep=',',
        header=True,
        inferSchema=True,
        encoding='utf8'
    )
    # 查看数据
    init_df.show()
    init_df.printSchema()
    # 数据处理
    print('=' * 50)
    # 去重API:  dropDuplicates
    init_df.dropDuplicates().show()
    # 指定字段去重
    init_df.dropDuplicates(subset=['id', 'name']).show()

    print('=' * 50)
    # 删除缺失值的API:  dropna
    init_df.dropna().show()
    # 指定字段删除
    init_df.dropna(subset='name').show()
    init_df.dropna(subset=['name', 'age', 'address']).show()
    init_df.dropna(thresh=1, subset=['name', 'age', 'address']).show()
    init_df.dropna(thresh=2, subset=['name', 'age', 'address']).show()
    print('=' * 50)
    # 替换缺失值API
    init_df.fillna(9999).show()
    # value传递字典形式
    init_df.fillna(value={'id': 9999, 'name': '刘亦菲', 'address': '北京'}).show()
    # 释放资源
    spark.stop()

Spark SQL的Shuffle分区设置

Spark SQL底层本质上还是Spark的RDD程序,认为 Spark SQL组件就是一款翻译软件,用于将SQL/DSL翻译为Spark RDD程序, 执行运行

Spark SQL中同样也是存在shuffle的分区的,在执行shuffle分区后, shuffle分区数量默认为 200个,但是实际中, 一般都是需要调整这个分区的, 因为当数据量比较少的数据, 200个分区相对来说比较大一些, 但是当数据量比较大的时候, 200个分区显得比较小

调整shuffle分区的数量:

方案一(不推荐):直接修改spark的配置文件spark-defaults.conf,全局设置,默认值为200

修改设置 spark.sql.shuffle.partitions 20

方案二(常用,推荐使用):在客户端通过指令submit命令提交的时候动态设置shuffle的分区数量,部署上线的时候,基于spark-submit提交运行的时候

        "./spark-submit --conf "spark.sql.shuffle.partitions=20"

方案三(比较常用):在代码中设置,主要在测试环境中使用,一般部署上线的时候,会删除,优先级也是最高的,一般的使用场景是数据量未来不会发生太大的波动

sparksession.conf.set("spark.sql.shuffle.partitions",20)

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F

# 绑定指定的python解释器
"""
1.2 直接基于DataFrame来处理

需求分析:

1- 将每行内容切分得到单个的单词

2- 组织DataFrame的数据结构

2.1- 有两列。一列是单词,一列是次数
"""

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    print('直接基于DataFrame来处理')
    spark = SparkSession \
        .builder \
        .config("spark.sql.shuffle.partitions", 1) \
        .appName('dataFrame_world_count_demo') \
        .master('local[*]') \
        .getOrCreate()

    # 数据输入
    # text方式读取hdfs上的文件
    init_df = spark.read.text(paths='hdfs://node1:8020/source/word.txt')
    # # 查看数据
    # init_df.show()
    # # 打印dataframe表结构信息
    # init_df.printSchema()
    # 创建临时视图
    init_df.createTempView('words')
    # 数据处理
    """
    sparksql方式处理数据-子查询
    1.先切分每一行的数据
    2.使用炸裂函数获得一个word单词列
    3.使用子查询方式聚合统计每个单词出现的次数
    """
    spark.sql("""select word,count(*) as cnt 
    from (select explode(split(value,' ')) as word from words)
    group by word order by cnt desc
    """).show()
    """
       sparksql方式处理数据-侧视图
       1.先切分每一行的数据
       2.使用炸裂函数获得一个word单词列
       3.使用侧视图方式聚合统计每个单词出现的次数
       炸裂函数配合侧视图使用如下:
       格式:select 原表别名.字段名,侧视图名.字段名 from 原表 原表别名 lateral view explode(要炸开的字段)
       侧视图名 as 字段名
       """
    spark.sql("""select word,count(*) as cnt
    from words w 
    lateral view explode(split(value,' ')) t as word
    group by word order by cnt desc
    """).show()

    print('=' * 50)
    """
           DSL方式处理数据-方式一
           1.先切分每一行的数据
           2.使用炸裂函数获得一个word单词列
           3.调用API聚合统计单词个数再排序
    """
    init_df.select(
        F.explode(F.split('value', ' ')).alias('word')
    ).groupBy('word').count().orderBy('count', ascending=False).show()

    """
           DSL方式处理数据-方式二
           1.先切分每一行的数据
           2.使用炸裂函数获得一个word单词列
           3.调用API聚合统计单词个数再排序
           4.agg():推荐使用,更加通用。执行聚合操作。如果有多个聚合,聚合之间使用逗号分隔即可
    """
    init_df.select(
        F.explode(F.split('value', ' ')).alias('word')
    ).groupBy('word').agg(
        F.count('word').alias('cnt'),
        F.max('word').alias('max_word'),
        F.min('word').alias('min_word'),
    ).orderBy('cnt', ascending=False).show()

    """
    DSL方式处理数据-方式三
        withColumnRenamed(参数1,参数2):给字段重命名操作。参数1是旧字段名,参数2是新字段名
        withColumn(参数1,参数2):用来产生新列。参数1是新列的名称;参数2是新列数据的来源
    """
    init_df.withColumn(
        'word',
        F.explode(F.split('value', ' '))
    ).groupBy('word').agg(
        F.count('word').alias('cnt'),
        F.max('word').alias('max_word'),
        F.min('word').alias('min_word')
    ).orderBy('cnt', ascending=False).show()

    # 数据输出
    # 是否资源
    spark.stop()

数据写出操作

统一的输出语法:

对应的简写API格式如下,以CSV为例:
init_df.write.csv(
    path='存储路径',
    mode='模式',
    header=True,
    sep='\t',
    encoding='UTF-8'
)

输出到本地文件

常用参数说明:
    1- path:指定结果数据输出路径。支持本地文件系统和HDFS文件系统
    2- mode:当输出目录中文件已经存在的时候处理办法
        2.1- append:追加。如果文件已经存在,那么继续在该目录下产生新的文件
        2.2- overwrite:覆盖。如果文件已经存在,那么就先将已有的文件清除,再写入进去
        2.3- ignore:忽略。如果文件已经存在,那么不执行任何操作
        2.4- error:报错。如果文件已经存在,那么直接报错。会报错AnalysisException: path     
                    file:xxx already exists.
        
    3- sep:字段间的分隔符
    4- header:数据输出的时候,是否要将字段名称输出到文件的第一行。推荐设置为True
    5- encoding:文件输出的编码方式

 

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F

# 绑定指定的python解释器
"""
基于RDD转换DataFrame的方式

需求分析:

1- 将每行内容切分得到单个的单词

2- 组织DataFrame的数据结构

2.1- 有两列。一列是单词,一列是次数
"""

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    print('数据输出本地文件')
    # 创建Sparksession对象
    spark = SparkSession \
        .builder \
        .appName('api_etl_demo') \
        .master('local[*]') \
        .getOrCreate()
    # 数据输入
    init_df = spark.read.csv(
        path='file:///export/data/pyspark_projects/02_spark_sql/data/clear_data.csv',
        sep=',',
        header=True,
        inferSchema=True,
        encoding='utf8'
    )
    # 数据处理
    result = init_df.where('age>20')
    # 数据查看
    result.show()
    result.printSchema()

    # 数据输出
    # 以csv格式输出,简写API
    result.write.csv(
        path='file:///export/data/pyspark_projects/02_spark_sql/data/output',
        mode='append',
        header=True,
        sep=',',
        encoding='utf8'
    )

    # 以json方式输出到本地文件系统,复杂API
    result.write \
        .format('json') \
        .option('encoding', 'utf8') \
        .mode('overwrite') \
        .save('file:///export/data/pyspark_projects/02_spark_sql/data/output_json')

数据输出到数据库

数据库的驱动包, 一般都是一些Jar包

如何放置【mysql-connector-java-5.1.41.jar】驱动包呢?  
    1- 放置位置一: 当spark-submit提交的运行环境为Spark集群环境的时候,以及运行模式为local, 默认从 spark的jars目录下加载相关的jar包,
        目录位置: /export/server/spark/jars
    
    2- 放置位置二: 当我们使用pycharm运行代码的时候, 基于python的环境来运行的, 需要在python的环境中可以加载到此jar包
        目录位置:
            /root/anaconda3/lib/python3.8/site-packages/pyspark/jars/
    
    3- 放置位置三: 当我们提交选择的on yarn模式 需要保证此jar包在HDFS上对应目录下
        hdfs的spark的jars目录下:  hdfs://node1:8020/spark/jars
        

    请注意: 以上三个位置, 主要是用于放置一些 spark可能会经常使用的jar包, 对于一些不经常使用的jar包, 在后续spark-submit 提交运行的时候, 会有专门的处理方案:  spark-submit --jars  ....

将中文输出到了数据表中乱码
解决办法:
1- 数据库连接要加上:useUnicode=true&characterEncoding=utf-8
2- 创建数据库的时候需要指定编码character set utf8

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F

# 绑定指定的python解释器
"""
基于RDD转换DataFrame的方式

需求分析:

1- 将每行内容切分得到单个的单词

2- 组织DataFrame的数据结构

2.1- 有两列。一列是单词,一列是次数
"""

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    print('API的清洗')
    # 创建Sparksession对象
    spark = SparkSession \
        .builder \
        .appName('api_etl_demo') \
        .master('local[*]') \
        .getOrCreate()
    # 数据输入
    init_df = spark.read.csv(
        path='file:///export/data/pyspark_projects/02_spark_sql/data/clear_data.csv',
        sep=',',
        header=True,
        inferSchema=True,
        encoding='utf8'
    )
    # 数据处理
    result = init_df.where('age>20')
    # 数据查看
    result.show()
    result.printSchema()

    # 数据输出
    # 以csv格式输出,简写API
    result.write.jdbc(
        url='jdbc:mysql://node1:3306/day06?useUnicode=true&characterEncoding=utf-8',
        table='student',
        mode='append',
        properties={'user': 'root', 'password': '123456'}
    )

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1370962.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Maven报错:Malformed \uxxxx encoding 解决办法

maven构建出现这个Malformed \uxxxx encoding问题,应该是maven仓库里面有脏东西进入了! 解决: 将仓库中的resolver-status.properties文件全部干掉。 我使用的everything工具全局搜索resolver-status.properties文件,然后Ctrla,再…

Go语言学习笔记(二)

Go语言的学习资源 以下是一些推荐的Go语言学习资源的链接: Go语言教程:https://golang.org/doc/Go by Example:Go by ExampleGolang Tutorials:https://golangtutorials.com/Go语言第一课(慕课网)&#x…

df -h的值详细介绍

正文: 在 Linux 系统中,了解不同类型的文件系统及其作用是非常重要的。这不仅有助于系统管理,还可以在进行数据存储和优化时做出明智的决策。以下是一个常见的 Linux 文件系统配置的概述,包括每个文件系统的作用和重要性。 操作图片: dev…

Genie Nano-10GigE M/C8200工业相机全面投入生产

6700万像素 业界最小 适用于高性能图像采集 近日,Teledyne DALSA宣布基于Teledyne e2v 67M单色和彩色传感器的Nano-10GigE M8200和C8200工业相机现已进入全面生产阶段。 全新Genie Nano-10GigE 67M相机是业界最小的10GigEVision相机型号,可实现高达14…

快乐学Python,数据分析之获取数据方法「公开数据或爬虫」

学习Python数据分析,第一步是先获取数据,为什么说数据获取是数据分析的第一步呢,显而易见:数据分析,得先有数据,才能分析。 作为个人来说,如何获取用于分析的数据集呢? 1、获取现成…

将dumpbin从Visual Studio中抠出来,并使用dumpbin查看exe和dll库的依赖关系

目录 1、初步说明 2、在开发的机器上使用dumpbin工具查看dll库的依赖关系 3、将dumpbin.exe从Visual Studio中抠出来 3.1、找到dumpbin.exe文件及其依赖的dll文件 3.2、在cmd中运行dumpbin,提示找不到link.exe文件 3.3、再次运行dumpbin.exe提示找不到mspdb10…

2024年第九届机器学习技术国际会议(ICMLT 2024) 即将召开

2024年第九届机器学习技术国际会议(ICMLT 2024)将于2024年5月24-26日在挪威奥斯陆举行。ICMLT 2024旨在讨论机器学习技术领域的最新研究技术现状和前沿趋势,为来自世界各地的科学家、工程师、实业家、学者和其他专业人士提供一个互动和交流的…

【算法】LRU算法

LRU算法 LRU(Least Recently Used) 即最近最少使用,属于典型的内存淘汰机制。 根据数据的历史访问记录来进行淘汰数据,其核心思想是“如果数据最近被访问过,那么将来被访问的几率也更高”,其思路如下图所示: 该算法需…

了解ASP.NET Core 中的文件提供程序

写在前面 ASP.NET Core 通过文件提供程序来抽象化文件系统访问。分为物理文件提供程序(PhysicalFileProvider)和清单嵌入的文件提供程序(ManifestEmbeddedFileProvider)还有复合文件提供程序(CompositeFileProvider );其中PhysicalFileProvider 提供对物理文件系统…

PPT插件-大珩助手-选择同类

选择同类-颜色 对于选中的形状,一键选中当前页中的所有相同颜色的形状 选择同类-文本 一键选择当前页中的所有文本对象 选择同类-非文本 一键选择当前页中的所有非文本对象 选择同类-反选 一键选择当前页未选择的对象 软件介绍 PPT大珩助手是一款全新设计的…

【读书笔记】《白帽子讲web安全》浏览器安全

目录 第二篇 客户端脚本安全 第2章 浏览器安全 2.1同源策略 2.2浏览器沙箱 2.3恶意网址拦截 2.4高速发展的浏览器安全 第二篇 客户端脚本安全 第2章 浏览器安全 近年来随着互联网的发展,人们发现浏览器才是互联网最大的入口,绝大多数用户使用互联…

【办公类-19-01】20240108图书统计登记表制作(23个班级)EXCEL复制表格并合并表格

背景需求: 制作一个EXCEL模板,每个班级的班主任统计 班级图书量(一个孩子10本,最多35个孩子350本) EXCEL模板 1.0版本: 将这个模板制作N份——每班一份 项目:班级图书统计表 核心:一个EXCEL模板批量生成…

合宙海外模组硬核出击,Air780UAAir780UU全新上市

简介 随着国内市场竞争日趋激烈,企业产品出海已呈如火如荼之势,向外发展拼商机更需硬核优势。 合宙作为物联网行业的核心器件提供商,将逐步推出系列高性价比海外模组,全面助力行业客户出海。现针对亚太、欧洲地区,全…

ChatGPT知名开源项目有哪些

ChatGPT-Next-Web:基于ChatGPT API的私有化部署网页聊天系统 主要功能: 只需在 1 分钟内即可在 Vercel 上一键免费部署,支持私有服务器快速部署,支持使用私有域名支持ChatGPT3.5、4等常见模型Linux/Windows/MacOS 上的紧凑型客户…

【Java】知识——各类编码格式以及样例

一、 #ASCII 码 计算机内所有的信息都是二进制位。一个字节包含 8 个二进制位,可以表示 256 个状态,每个状态表示一个符号。 ASCII 码一共规定了128个字符的编码,比如空格 SPACE 是32(二进制00100000),大写…

Shopee买家通系统:轻松获取虾皮买手号的智能利器

近来,有一款强大的软件引起了广泛关注,它就是Shopee买家通系统,为用户提供了自动化注册虾皮买手号的便捷途径。目前,该软件已覆盖菲律宾、泰国、马来西亚、越南、巴西、印度尼西亚等多个国家,为用户提供更广泛的服务。…

CUTANA™ pAG-Tn5 for CUTTag

CUTANA pAG-Tn5是靶向剪切及转座酶(CUT&Tag)技术中进行高效绘制染色质特征的关键试剂。与ChIP-seq相比,CUT&Tag在降低细胞需求量和测序深度的信噪比方面进行了显著改进。CUTANA pAG-Tn5是一种高活性的E. coli转座酶突变体(Tn5)与蛋白A/G的融合产物&#xff…

龍运当头--html做一个中国火龙祝大家龙年大吉

🐉效果展示 🐉HTML展示 <body> <!-- partial:index.partial.html --> <svg><defs><g id=

怎么选择数据安全交换系统,能够防止内部员工泄露数据?

数据泄露可能给企业带来诸多风险&#xff1a;财产损失、身份盗窃、骚扰和诈骗、经济利益受损、客户信任度下降、法律风险和责任等&#xff0c;《2021年度数据泄漏态势分析报告》中显示&#xff0c;在数据泄露的主体中&#xff0c;内部人员导致的数据泄漏事件占比接近60%。 员工…