Python大数据处理利器,PySpark的入门实战

news2024/11/14 15:18:16

PySpark极速入门

一:Pyspark简介与安装

什么是Pyspark?

PySpark是Spark的Python语言接口,通过它,可以使用Python API编写Spark应用程序,目前支持绝大多数Spark功能。目前Spark官方在其支持的所有语言中,将Python置于首位。

如何安装?

在终端输入

pip intsall pyspark

或者使用pycharm,在GUI界面安装

二:编程实践

加载、转换数据

# 导入pyspark
# 导入pandas, 稍后与pyspark中的数据结构做对比
import pyspark
import pandas as pd

在编写spark程序前,我们要创建一个SparkSession对象

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark极速入门").getOrCreate()

可以看到会话的一些信息:使用的Spark版本、运行模式、应用程序名字

演示环境用的是local本地模式, * 代表的是使用全部线程 如果想用集群模式的话,可以去查看集群搭建的相关教程 届时pyspark程序作为spark的客户端,设置连接集群,就是真正的分布式计算了 目前只是本地模式,用多线程去模拟分布式计算。

spark

看看我们将用到的test1数据吧

使用read方法,用option设置是否读取csv的头,再指定路径就可以读取数据了

df_spark = spark.read.option("header", "true").csv("./data/test1.csv")

看看是什么类型

type(df_spark)
pyspark.sql.dataframe.DataFrame

再看看用pandas读取是什么类型

type(pd.read_csv("./data/test1.csv"))
pandas.core.frame.DataFrame

可以发现Spark读取这种结构化数据时,用的也是和pandas类似的dataframe结构 这也是Spark应用最广泛的数据结构

使用show方法打印数据

df_spark.show()
+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

使用printSchema方法打印元数据信息,发现明明是数值类型的,它却读取为了字符串类型

df_spark.printSchema()
root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)

在读取时,加上类型推断,发现此时已经能正确读取了

df_spark = spark.read.option("header", "true").csv("./data/test1.csv",inferSchema=True)
df_spark.printSchema()
root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)

选择某些列, 可以发现不管选多列还是选单列,返回的都是dataframe 返回的也同样可以printSchema、show等dataframe使用的方法,做到了结构的统一

df_spark.select(["Name", "age"])
DataFrame[Name: string, age: int]
df_spark.select("Name")
DataFrame[Name: string]
df_spark.select(["Name", "age", "Salary"]).printSchema()
root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Salary: integer (nullable = true)

不用select,而用[]直接选取,就有点类似与pandas的series了

df_spark["Name"]
Column<'Name'>

column就不能直接show了

df_spark["age"].show()
---------------------------------------------------------------------------

TypeError                                 Traceback (most recent call last)

Input In [15], in <cell line: 1>()
----> 1 df_spark["age"].show()


TypeError: 'Column' object is not callable

用describe方法可以对dataframe做一些简单的统计

df_spark.describe().show()
+-------+------+------------------+-----------------+------------------+
|summary|  Name|               age|       Experience|            Salary|
+-------+------+------------------+-----------------+------------------+
|  count|     6|                 6|                6|                 6|
|   mean|  null|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev|  null| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|Harsha|                21|                1|             15000|
|    max| Sunny|                31|               10|             30000|
+-------+------+------------------+-----------------+------------------+

用withColumn方法给dataframe加上一列

df_spark = df_spark.withColumn("Experience After 3 year", df_spark["Experience"] + 3)
df_spark.show()
+---------+---+----------+------+-----------------------+
|     Name|age|Experience|Salary|Experience After 3 year|
+---------+---+----------+------+-----------------------+
|    Krish| 31|        10| 30000|                     13|
|Sudhanshu| 30|         8| 25000|                     11|
|    Sunny| 29|         4| 20000|                      7|
|     Paul| 24|         3| 20000|                      6|
|   Harsha| 21|         1| 15000|                      4|
|  Shubham| 23|         2| 18000|                      5|
+---------+---+----------+------+-----------------------+

用drop方法删除列

df_spark = df_spark.drop("Experience After 3 year")
df_spark.show()
+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

用withColumnRename方法重命名列

df_spark.withColumnRenamed("Name", "New Name").show()
+---------+---+----------+------+
| New Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

处理缺失值

看看接下来要带缺失值的test2数据吧

CSeoe.png

df_spark = spark.read.csv("./data/test2.csv", header=True, inferSchema=True)
df_spark.show()
+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+

用na.drop删除缺失值 how参数设置策略,any意思是只要一行里有缺失值,那就删了 any也是how的默认参数

df_spark.na.drop(how="any").show()
+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

可以通过thresh参数设置阈值,代表超过一行中缺失值的数量超过这个值,才会被删除

df_spark.na.drop(how="any", thresh=2).show()
+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
+---------+----+----------+------+

也可以用subset参数设置关注的列 下面代码意思是,在Experience列中,只要有缺失值就删掉

df_spark.na.drop(how="any", subset=["Experience"]).show()
+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
+---------+---+----------+------+

用fillna填充缺失值, 可以用字典对各列的填充值进行设置

df_spark.fillna({'Name': 'unknown', 'age': 18, 'Experience': 0, 'Salary': 0}).show()
+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|   Mahesh| 18|         0| 40000|
|  unknown| 34|        10| 38000|
|  unknown| 36|         0|     0|
+---------+---+----------+------+

还可以调用机器学习模块的相关方法, 通过设置策略,可以用平均数、众数等方式填充

from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols = ['age', 'Experience', 'Salary'],
    outputCols = [f"{c}_imputed" for c in ['age', 'Experience', 'Salary']]
).setStrategy("mean")
imputer.fit(df_spark).transform(df_spark).show()
+---------+----+----------+------+-----------+------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
|  Shubham|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|null|      null| 40000|         28|                 5|         40000|
|     null|  34|        10| 38000|         34|                10|         38000|
|     null|  36|      null|  null|         36|                 5|         25750|
+---------+----+----------+------+-----------+------------------+--------------+

过滤操作

还是切换到test1数据

df_spark = spark.read.csv("./data/test1.csv", header=True, inferSchema=True)
df_spark.show()
+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

可以使用filter方法对数据进行过滤操作,类似于SQL中的where 可以使用字符串的方式,也可以利用column方式去传递条件

df_spark.filter("Salary <= 20000").show()
+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+

df_spark.filter(df_spark["Salary"]<=20000).show()
+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+

如果是字符串,用 and 表示同时满足多个条件 如果是用column,用( & ) 连接多个条件

df_spark.filter("Salary <= 20000 and age <= 24").show()
+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+

df_spark.filter(
    (df_spark["Salary"]<=20000)
    & (df_spark["age"]<=24)
).show()
+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+

column中,用|表示或, ~表示取反
df_spark.filter(
    (df_spark["Salary"]<=20000)
    | (df_spark["age"]<=24)
).show()
+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+

df_spark.filter(
    (df_spark["Salary"]<=20000)
    | ~(df_spark["age"]<=24)
).show()
+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

分组聚合

换一个数据集test3

df_spark = spark.read.csv("./data/test3.csv", header=True, inferSchema=True)
df_spark.show()
+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+

使用groupby方法对dataframe某些列进行分组

df_spark.groupBy("Name")
<pyspark.sql.group.GroupedData at 0x227454d4be0>

可以看到分组的结果是GroupedData对象,它不能使用show等方法打印 GroupedData对象需要进行聚合操作,才能重新转换为dataframe 聚合函数有sum、count、avg、max、min等

df_spark.groupBy("Departments").sum().show()
+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+

三:总结

Pandas的dataframe与PySpark的dataframe有许多相似之处,熟悉Pandas的同学可以很快适应它的API。目前可以粗浅地把PySpark理解为”分布式的Pandas“,不过,PySpark还有分布式机器学习的功能——Spark MLlib(可以理解为分布式的Sklearn、TensorFlow等),后续会给大家介绍。在集群中,它的dataframe可以分布在不同的机器上,以此处理海量数据。有兴趣的小伙伴可以通过虚拟机搭建一个Spark集群,进一步学习Spark。

Apache Spark™ - 用于大规模数据分析的统一引擎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/188751.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OpenCV-Python学习(18)—— OpenCV 图像几何变换之图像平移(cv.warpAffine)

1. 学习目标 学习图像的平移矩阵&#xff1b;学习 OpenCV 图像平移函数。 2. 图像的平移矩阵 平移是物体位置在水平和垂直方向的移动。 像素点 (x,y) 沿 x 轴平移 dx、沿 y 轴平移 dy&#xff0c;公式&#xff1a; 3. 图像平移函数 3.1 cv.warpAffine() 函数使用 cv.war…

R语言生物群落(生态)数据统计分析与绘图

包含&#xff1a;《R语言基础》、《tidyverse数据清洗》、《多元统计分析》、《随机森林模型》、《回归及混合效应模型》、《结构方程模型》、《统计结果作图》七合一版本 R 语言作的开源、自由、免费等特点使其广泛应用于生物群落数据统计分析。生物群落数据多样而复杂&#…

手动启动Oracle服务和Oracle监听服务和init.ora文件相关

Oracle 11g 安装未完全成功&#xff1b;安装完以后&#xff0c;服务只有2个&#xff1b;这样是用不了&#xff0c;oracle服务和oracle监听服务都没有&#xff1b; 尝试启动一下数据库&#xff0c;出现12560错误&#xff1b; 根据资料&#xff0c;可用如下命令启动Oracle服务&am…

【06】FreeRTOS临界段代码保护及调度器挂起与恢复

目录 1.临界段代码保护简介 2.临界段代码保护函数介绍 2.1任务级临界区调用格式示例 2.2中断级临界区调用格式示例 2.3函数调用特点 2.4任务级进入和退出临界段函数 2.5中断级进入和退出临界段函数 3.任务调度器的挂起和恢复 3.1任务调度器挂起函数vTaskSuspendAll() …

为什么大部分虚拟主机都配置SSD

对于任何站长来说&#xff0c;拥有一个不会加载的漂亮网站可能是毁灭性的。选择正确的托管服务对于确保网站始终以最佳状态运行至关重要。而由于新手站长呈爆发性增长态势&#xff0c;选择虚拟主机的站长日趋增多。本文就将介绍大部分虚拟主机都配置SSD的原因。SSD优势SSD在数据…

Windows下MySQL5与MySQL8的下载、安装、配置

MySQL版本简介 MySQL Community Server 社区版本&#xff0c;开源免费&#xff0c;自由下载&#xff0c;但不提供官方技术支持&#xff0c;适用于 大多数普通用户。MySQL Enterprise Edition 企业版本&#xff0c;需付费&#xff0c;不能在线下载&#xff0c;可以试用30天。提供…

EMT4J详细介绍与使用,帮你找到Java版本升级带来的问题,让你在项目jdk升级不在头疼

Java版本升级带来的问题 前因 java更新迭代速度巨快无比&#xff0c;Spring Framework 6 等项目已经至少需要 Java 17。但是&#xff0c;对于 Java 版本的采用是相对缓慢的。例如&#xff0c;在 Java 11 发布四年之后&#xff08;2022年&#xff09;&#xff0c;只有不到 49%…

[C语言]操作符

目录 1.操作符分类 2.算术操作符 3.位移操作符 3.1左移操作符 3.2右移操作符 4.位操作符 4.1’&‘&#xff08;按位与&#xff09; 4.2’|‘&#xff08;按位或&#xff09; 4.3‘^’&#xff08;按位异或&#xff09; 5.赋值操作符 5.1复合赋值符 6.单目操作…

IDEA搭建Finchley.SR2版本的SpringCloud父子基础项目-------Hystrix断路器

1.1分布式系统面临的问题 复杂分布式体系结构中的应用程序有数十个依赖关系&#xff0c;每个依赖关系在某些时候将不可避免地失败。 服务雪崩 多个微服务之间调用的时候&#xff0c;假设微服务A调用微服务B和微服务C&#xff0c;微服务B和微服务C又调用其它的微服务&#xff0…

高薪前端都应该具备的开发好习惯

格拉德威尔曾提出过一个“一万小时定律”&#xff0c;即任何人从平凡到大师的必要条件&#xff0c;就是历经1万小时的锤炼&#xff0c;而这“1万小时”也不是达到就行&#xff1b;如何构成&#xff0c;才是能否成为行业资深的关键。总结起来&#xff0c;就是四个字&#xff1a;…

Databend 开源周报 第 77 期

Databend 是一款强大的云数仓。专为弹性和高效设计。自由且开源。 即刻体验云服务&#xff1a;https://app.databend.com 。 What’s New 探索 Databend 本周新进展&#xff0c;遇到更贴近你心意的 Databend 。 Features & Improvements Meta 使用 expressin::TableSch…

蓝桥杯单片机快速得奖方法(分享一些实用技巧)

文章目录前言一、蓝桥杯单片机痛点1.LED灯微亮2.数码管微亮3.蜂鸣器乱叫4.驱动程序不会写5.按键程序灵敏度低容易误操作6.矩阵按键代码总是记不住一写就忘记7.使用大量延时函数导致程序运行效率低下二、痛点解决方法1.锁存器的错误操作2.代码不熟练3.没有高效的代码总结前言 又…

微信小程序 java python django加油站服务系统

目 录 摘 要 I ABSTRACT II 第一章 绪 论 1 1.1选题背景 2 1.2研究现状 3 1.3研究内容 7 第二章 开发工具及关键技术介绍 8 2.1微信开发者工具 8 2.2小程序框架以及目录结构介绍 8 第三章 系统分析 10 3.1需求分析 10 3.2可行性分析 1…

Appium是如何工作的

Appium是由node.js开发的开源自动化测试工具&#xff0c;可用来测试移动端的Native、Hybrid和移动Web应用&#xff0c;被测平台包括Android和iOS&#xff08;最近宣称已支持Windows&#xff09;。 Native apps – 使用Android、iOS和Windows SDK开发的应用。 Mobile web apps …

存储区域网络将占下一代数据存储市场的 7%

根据 Future Market Insights 的最新行业分析&#xff0c;全球存储区域网络 (SAN) 市场预计将显示稳定的增长机会&#xff0c;在 2022 年至 2029 年的评估期内复合年增长率约为 3.9%。 2021 年全球市场估值达到 195.76 亿美元&#xff0c;到 2029 年将进一步扩大至 268.67 亿美…

车载以太网 - SomeIP测试专栏 - SomeIP Entry - 04

前面总纲内容有说,车载以太网中的SomeIP内容是非常多的,并且为了实现SOA的相关需求,提供了多种多样的报文类型,因此也就有了今天要说的SomeIP-SD中的重要组成部分之一的条目(Entry)部分,而SomeIP-SD在车载网络中占有相当大的比重,可以当做是一定要实现的,如果这块不实…

实现自己的数据库四

一前言上一篇已经说明了B树的一些原理&#xff0c;也讲到&#xff0c;我们目前采用的持久化数据的方式&#xff0c;而且我们是单独的插入数据&#xff0c;没有任何元数据信息&#xff0c;虽然插入的速度很快&#xff0c;因为是采用追加的方式。但是这种方式插入速度很快&#x…

Pd1 药物研发进展|销售数据|市场规模|竞争格局|前景分析

Programmed Death-1 (PD-1; CD279) 是一种在活化 T 细胞中诱导的抑制性受体&#xff0c;作为多种癌症的一线治疗药物。然而&#xff0c;严重的免疫相关不良反应限制了PD-1/PD-L1单克隆抗体的临床应用&#xff0c;尽管其疗效良好。 也迫切需要开发针对 PD-1/PD-L1 轴的新型抑制剂…

Torch 论文复现:Vision Transformer (ViT)

论文标题&#xff1a;An Image is Worth 16x16 Words: Transformers for Image Recognition at Scale 从 TPUv3-core-days 可以看到&#xff0c;ViT 所需的训练时间比 ResNet 更短&#xff0c;同时 ViT 取得了更高的准确率 ViT 的基本思想是&#xff0c;把一张图片拆分成若干个…

Paddle入门实战系列(四):中文场景文字识别

✨写在前面&#xff1a;强烈推荐给大家一个优秀的人工智能学习网站&#xff0c;内容包括人工智能基础、机器学习、深度学习神经网络等&#xff0c;详细介绍各部分概念及实战教程&#xff0c;通俗易懂&#xff0c;非常适合人工智能领域初学者及研究者学习。➡️点击跳转到网站。…