【Python】PySpark

news2024/11/17 19:46:54

前言

Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。

简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。

Spark对Python语言的支持,重点体现在Python第三方库:PySpark

PySpark是由Spark官方开发的Python语言第三方库。

Python开发者可以使用pip程序快速的安装PySpark并像其它第三方库那样直接使用。

在这里插入图片描述

基础准备

安装

同其它的Python第三方库一样,PySpark同样可以使用pip程序进行安装。

pip install pyspark

或使用国内代理镜像网站(清华大学源)
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark

构建PySpark执行环境入口对象

想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象。

PySpark的执行环境入口对象是:类SparkContext的类对象

# 导包
from pyspark import SparkConf, SparkContext

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

# 打印PySpark的运行版本
print(sc.version)

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

运行需要Java环境,推荐jdk8

PySpark的编程模型

SparkContext类对象,是PySpark编程中一切功能的入口。

PySpark的编程,主要分为如下三大步骤:

在这里插入图片描述

数据输入

PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象

RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)

PySpark针对数据的处理,都是以RDD对象作为载体,即:

  • 数据存储在RDD内
  • 各类数据的计算方法,也都是RDD的成员方法
  • RDD的数据计算方法,返回值依旧是RDD对象

在这里插入图片描述

Python数据容器转RDD对象

PySpark支持通过SparkContext对象的parallelize成员方法,将list/tuple/set/dict/str转换为PySpark的RDD对象

# 导包
from pyspark import SparkConf, SparkContext

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd1 = sc.parallelize([1, 2, 3])    
rdd2 = sc.parallelize((1, 2, 3))    
rdd3 = sc.parallelize({1, 2, 3})    
rdd4 = sc.parallelize({'key1': 'value1', 'key2': 'value2'}) 
rdd5 = sc.parallelize('hello')  

# 输出RDD的内容,需要使用collect()
print(rdd1.collect())   # [1, 2, 3]
print(rdd2.collect())   # [1, 2, 3]
print(rdd3.collect())   # [1, 2, 3]
print(rdd4.collect())   # ['key1', 'key2']
print(rdd5.collect())   # ['h', 'e', 'l', 'l', 'o']

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

注意:

  • 字符串会被拆分出一个个的字符,存入RDD对象
  • 字典仅有key会被存入RDD对象

读取文件转RDD对象

PySpark也支持通过SparkContext入口对象来读取文件,构建出RDD对象。

先提前预备一个txt文件

hello
python
day
# 导包
from pyspark import SparkConf, SparkContext

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.textFile('E:\\code\\py-space\\8.27\\hello.txt')

# 输出RDD的内容,需要使用collect()
print(rdd.collect())    # ['hello', 'python', 'day']

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

数据计算

RDD对象内置丰富的:成员方法(算子)

map算子

将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD

rdd.map(func)
# func: f:(T) -> U
# f: 表示这是一个函数
# (T) -> U 表示的是方法的定义:()表示无需传入参数,(T)表示传入1个参数
# T是泛型的代称,在这里表示 任意类型
# U是泛型的代称,在这里表示 任意类型

# (T) -> U : 这是一个函数,该函数接收1个参数,传入参数类型不限,返回一个返回值,返回值类型不限
# (A) -> A : 这是一个函数,该函数接收1个参数,传入参数类型不限,返回一个返回值,返回值类型和传入参数类型一致

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5, 6])

# 通过map方法将全部数据乘以10,传入参数为函数
rdd2 = rdd.map(lambda x: x * 10)

# 输出RDD的内容,需要使用collect()
print(rdd2.collect())   # [10, 20, 30, 40, 50, 60]

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

由于map()的返回值还是RDD对象,可以继续在尾部进行链式调用

rdd3 = rdd.map(lambda x: x * 10).map(lambda x: x + 9)

flatMap算子

对RDD执行map操作,然后进行解除嵌套操作。

在这里插入图片描述

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize(['a b c', 'd e f'])

# 输出RDD的内容,需要使用collect()
print(rdd.map(lambda x: x.split(' ')).collect())    # [['a', 'b', 'c'], ['d', 'e', 'f']]
print(rdd.flatMap(lambda x:x.split(' ')).collect())   # ['a', 'b', 'c', 'd', 'e', 'f']

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

reduceByKey算子

针对KV型(二元元组)RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作

rdd.reduceByKey(func)
# func: (V, V) -> V
# 接收2个传入参数(类型要一致),返回一个返回值,返回值类型和传入参数类型要求一致

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])

# 输出RDD的内容,需要使用collect()
print(rdd.reduceByKey(lambda a, b: a+b).collect())  # [('b', 3), ('a', 2)]

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

reduceByKey中的聚合逻辑是:比如有[1,2,3,4,5],然后聚合函数是:lambda a,b: a+b

在这里插入图片描述

注意:reduceByKey中接收的函数,只负责聚合,不理会分组;分组是自动by key来分组的

filter算子

过滤想要的数据进行保留。

rdd.filter(func)
# func: (T) -> bool
# 传入一个参数任意类型,返回值必须是True/False,返回是True的数据被保留,False的数据被丢弃

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5, 6])

# 输出RDD的内容,需要使用collect()
print(rdd.filter(lambda x: x % 2 == 0).collect())  # [2, 4, 6]

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

distinct算子

对RDD数据进行去重,返回新的RDD

rdd.distinct() # 无需传参

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 3, 2, 6])

# 输出RDD的内容,需要使用collect()
print(rdd.distinct().collect())  # [6, 1, 2, 3]

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

sortBy算子

对RDD数据进行排序,基于你指定的排序依据。

rdd.sortKey(func, ascending=False, numPartitions=1)
# func: (T) -> U:告知按照RDD中的哪个数据进行排序,比如lambda x: x[1]表示按照RDD中的第二列元素进行排序
# ascending:True升序,False降序
# numPartitions:用多少分区排序,全局排序需要设置为1

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize([('Aiw', 9), ('Tom', 6), ('Jack', 8), ('Bolb', 5)])

# 输出RDD的内容,需要使用collect()
print(rdd.sortBy(lambda x: x[1], ascending=False,
      numPartitions=1).collect())  # [('Aiw', 9), ('Jack', 8), ('Tom', 6), ('Bolb', 5)]

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

数据输出

collect算子

将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象。

rdd.collect()
# 返回值是一个List

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3])

rdd_list: list = rdd.collect()

print(rdd_list)   # [1, 2, 3]
print(type(rdd_list))   # <class 'list'>

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

reduce算子

对RDD数据集按照你传入的逻辑进行聚合

rdd.reduce(func)
# func:(T, T) -> T
# 传入2个参数,1个返回值,要求返回值和参数类型一致

在这里插入图片描述

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(1, 10))

print(rdd.reduce(lambda a, b: a+b))   # 45

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

take算子

取RDD的前N个元素,组合成List进行返回。

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(1, 10))

rdd_take: list = rdd.take(3)

print(rdd_take)   # [1, 2, 3]
print(type(rdd_take))   # <class 'list'>

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

count算子

计算RDD有多少条数据,返回值是一个数字。

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(1, 10))

rdd_count: int = rdd.count()

print(rdd_count)   # 9
print(type(rdd_count))   # <class 'int'>

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

saveAsTextFile算子

将RDD的数据写入文本文件中。支持本地写出、HDFS等文件系统。

注意事项:

在这里插入图片描述

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'
os.environ['HADOOP_HOME'] = 'D:/Hadoop-3.0.0'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(1, 10))

rdd.saveAsTextFile('./8.27/output') # 运行之前确保输出文件夹不存在,否则报错

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

上述代码输出结果,输出文件夹内有多个分区文件

修改RDD分区为1个

方式一:SparkConf对象设置属性全局并行度为1:

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')
# 设置属性全局并行度为1
conf.set('spark.default.parallelism','1')
# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

方式二:创建RDD的时候设置(parallelize方法传入numSlices参数为1)

rdd = sc.parallelize(range(1, 10), numSlices=1)
rdd = sc.parallelize(range(1, 10), 1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/936132.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ADC芯片CS1238,CS1237介绍和代码

一.芯片介绍 CS1238是一款高精度、低功耗 模数转换芯片&#xff0c;两路差分输入通道&#xff0c;内置温度传感器和高精度振荡器。MCU可以通过2线的SPI 接口SCLK、DRDY与CS1237进行通信&#xff0c;对其进行配置&#xff0c;例如通道选择、PGA选择、输出速率选择等。下面是CS1…

9. 解谜游戏

目录 题目 Description Input Notes 思路 暴力方法 递归法 注意事项 C代码&#xff08;递归法&#xff09; 关于DFS 题目 Description 小张是一个密室逃脱爱好者&#xff0c;在密室逃脱的游戏中&#xff0c;你需要解开一系列谜题最终拿到出门的密码。现在小张需要打…

解决CMake报“Compatible with CMake < 2.8.12 will be removed“问题

今天在使用CMake编译zlib开源库时&#xff0c;弹出编译警告"Compatibility with CMake < 2.8.12 will be removed from a future "&#xff0c;如图(1)所示&#xff1a; 图(1) CMake报版本太低错误 将CMakeList.txt里的cmake版本调高 出现该问题的原因是&#xff…

2023.08.27 学习周报

文章目录 摘要文献阅读1.题目2.重点3.引言4.方法5.实验结果6.结论 深度学习Majorization-Minimization算法1.基本思想2.要求3.示意图 总结 摘要 This week, I read a computer science on the prediction of atmospheric pollutants in urban environments based on coupled d…

PostgreSQL命令行工具psql常用命令

1. 概述 通常情况下操作数据库使用图形化客户端工具&#xff0c;在实际工作中&#xff0c;生产环境是不允许直接连接数据库主机&#xff0c;只能在跳板机上登录到Linux服务器才能连接数据库服务器&#xff0c;此时就需要使用到命令行工具。psql是PostgreSQL中的一个命令行交互…

达梦数据库修改超级管理员密码

ENABLE_LOCAL_OSAUTH0 开启认证 ENABLE_LOCAL_OSAUTH1不认证、免密登陆 谨慎操作 目录 修改账户密码 锁定解锁账户 登录成功 修改账户密码 alter user SYSDBA IDENTIFIED by "Passw0rd!!"; 锁定解锁账户 ALTER USER 用户名 ACCOUNT UNLOCK; 问题分析&#xff1a…

SD 总线引脚介绍

参考 https://www.cnblogs.com/justin-y-lin/p/12259851.html SD卡与TF卡的引脚定义 - 360文档中心

网络服务第一次作业

1.配置ntp时间服务器&#xff0c;确保客户端主机能和服务主机同步时间 2.配置ssh免密登陆&#xff0c;能够实现客户端主机通过服务器端的redhat账户进行基于公钥验证方式的远程连接 timedatetectl---查看时间 修改配置文件 文件路径 vim /etc/chrony.conf-----修改服务…

学习ts(十)装饰器

定义 装饰器是一种特殊类型的声明&#xff0c;它能够被附加到类声明&#xff0c;方法&#xff0c;访问符&#xff0c;属性或参数上&#xff0c;是一种在不改变原类和使用继承的情况下&#xff0c;动态的扩展对象功能。 装饰器使用expression形式&#xff0c;其中expression必须…

软考:中级软件设计师:网络类型与拓扑结构,网络规划与设计,ip地址与子网划分,特殊含义的IP地址

软考&#xff1a;中级软件设计师:网络类型与拓扑结构 提示&#xff1a;系列被面试官问的问题&#xff0c;我自己当时不会&#xff0c;所以下来自己复盘一下&#xff0c;认真学习和总结&#xff0c;以应对未来更多的可能性 关于互联网大厂的笔试面试&#xff0c;都是需要细心准…

「料见」vol27.回顾 | 知名视觉SLAM专家高翔:一起来聊聊视觉SLAM在自动驾驶和机器人领域的实际应用

高翔老师继畅销书《视觉SLAM十四讲》之后&#xff0c;又推出了新作《自动驾驶与机器人中的SLAM技术》。该书自出版以来备受瞩目&#xff0c;为读者提供了关于SLAM技术的全面而深入的理解。 第27期“料见”闭门分享会&#xff0c;我“门”非常开心邀请到知名视觉SLAM专家———…

集合类的线程安全问题

集合类 原来的集合类,大部分都不是线程安全的 Vector, Stack, HashTable, 是线程安全的(不建议用), 其他的集合类不是线程安全的. 加了锁,不一定就是线程安全的,不加锁也不一定是线程不安全的,需要具体问题具体分析 虽然get,set方法都加了synchronized,但是如果不能正确使用,也…

RabbitMQ从原理到实战—基于Golang【万字详解】

文章目录 前言一、MQ是什么&#xff1f;优势劣势 二、MQ的用途1、应用解耦2、异步加速3、削峰填谷4、消息分发 三、RabbitMQ是什么1、AMQP 协议2、RabbitMQ 包含的要素3、RabbitMQ 基础架构 四、实战1、Simple模式(即最简单的收发模式)2、Work Queues 模型3、Publish/Subscribe…

ESP8266固件烧录

文章目录 硬件电路烧录工具完整固件资料+烧录工具硬件电路 烧写模式: GPIO0:0 此时通过REST复位引脚复位,8266进入烧写模式。 烧写通过串口烧写,波特率设置115200 运行模式: GPIO0:1 此时通过REST复位引脚复位,8266进入烧写模式。 烧录工具 烧写工具下载链接:https:…

Python功能制作之简单的绘画板

可能需要安装的库 pip install pillow pip install tk制作 我们使用Python的Tkinter库创建的一个简单绘画软件。 首先创建了一个简单的绘画应用&#xff0c;可以选择颜色、切换画笔和橡皮擦模式、清空画布以及绘制自由曲线。 里面的主要结构和功能是&#xff1a; 导入必要的…

计算机组成原理(主存储器的基本组成、 运算器的基本组成、 控制器的基本组成、完成一条指令的三个阶段)

主存储器的基本组成&#xff1a; 这个是读数据操作图&#xff1a; 读入数据与菜鸟驿站的取货流程差不多&#xff1a; 写入数据的过程与读入数据类似&#xff1a; 1、cpu 指明想要写入到那个位置&#xff08;写到MAR中&#xff09; 2、想要写入的数据会放到MDR中 3、c…

ctfshow-web-红包题第七弹

0x00 前言 CTF 加解密合集CTF Web合集 0x01 题目 0x02 Write Up 首先上来访问就是phpinfo。常规思路先扫一下目录。 发现一个.git文件403&#xff0c;这种情况通常都是存在文件夹&#xff0c;但是不能直接访问文件夹导致的。那么我们可以使用git_extract工具进行获取内容。…

WSL2连接不了外网怎么办?

某天忽然WLAN变成地球图标&#xff0c;上不了Internet&#xff0c;搞了半天网络适配器&#xff0c;仍然不行。回忆之前做过的操作&#xff0c;曾经运行过ZoogVPN&#xff0c;试着启动并连接&#xff0c;然后退出&#xff0c;WLAN神奇地恢复了连接&#xff0c;可以上Internet了。…