PySpark
学习目标
- 了解什么是
Spark
、PySpark
- 了解为什么学习
PySpark
- 了解如何和大数据开发方向进行衔接
Spark是什么?
Apache Spark是用于大规模数据
处理的统一分析引擎
。
简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。
基础准备
学习目标
- 掌握PySpark库的安装
- 掌握PySpark执行环境入口对象的构建
- 理解PySpark的编程模型
安装PySpark
pip install pyspark
或者使用国内代理镜像
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark
构建PySpark执行环境入口对象
后续的工作都要基于类SparkContext
的类对象
from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
运行结果,打印PySpark
的运行版本
PySpark的编程模型
SparkContext
类对象,是PySpark编程中一切功能的入口。
PySpark的编程,主要分为如下三大步骤:
- 数据输入
- 通过
SparkContext
类对象的成员方法 - 完成数据的读取操作
- 读取后得到RDD类对象
- 通过
- 数据处理计算
- 通过RDD类对象的成员方法
- 完成各种数据计算的需求
- 数据输出
- 将处理完成后的RDD对象
- 调用各种成员方法完成
- 写出文件、转换为list等操作
数据输入
学习目标
- 理解RDD对象
RDD对象
RDD全称:弹性分布式数据集(Resilient Distributed Datasets)
PySpark针对数据的处理,都是以RDD对象作为载体,即:
- 数据存储在RDD内
- 各类数据的计算方法,也都是RDD的成员方法
- RDD的数据计算方法,返回值依旧是RDD对象
PySpark支持通过SparkContext对象的parallelize
成员方法,将:
- list
- tuple
- set
- dict
- str
转换为PySpark的RDD对象
注意:
- 字符串会被拆分出一个个的字符,存入RDD对象
- 字典仅有
key
会被存入RDD对象
示例代码
from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize((1, 2, 3, 4, 5))
rdd3 = sc.parallelize("kevindurant")
rdd4 = sc.parallelize({1, 2, 3, 4, 5})
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})
# 如果要查看RDD里面有什么内容,需要用collect()方法
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())
sc.stop()
运行结果
读取文件转RDD对象
PySpark也支持通过SparkContext
入口对象,来读取文件,构建出RDD对象
from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
rdd = sc.textFile("./hello.txt")
print(rdd.collect())
sc.stop()
hello.txt
文件内容如下:
代码运行结果如下: