python学习之路 - PySpark快速入门

news2024/11/15 23:53:29

目录

  • 一、PySpark实战
    • 1、前言介绍
    • 2、基础准备
        • a、pySpark库的安装
        • b、构建pySpark执行环境入口对象
        • c、pySpark编程模型
    • 3、数据输入
        • a、python数据容器转RDD对象
        • b、读取文件内容转RDD对象
    • 4、数据计算
        • a、map算子
        • b、flatMap算子
        • c、reduceByKey算子
        • d、综合案例
        • e、filter算子
        • f、distinct算子
        • g、sortBy算子
    • 5、数据输出
        • a、collect算子
        • b、reduce算子
        • c、take算子
        • e、count算子
        • f、saveAsTextFile算子

一、PySpark实战

1、前言介绍

Spark:Spark是用于大规模数据处理的统一分析引擎,是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB、EB等海量数据

pySpark:Spark对python的支持,就体现在python的第三方库pySpark上

2、基础准备

a、pySpark库的安装

命令:pip install pyspark
不知道操作步骤的可以看此文章 第六节 安装第三方python包

b、构建pySpark执行环境入口对象
from pyspark import SparkConf,SparkContext
#创建SparkConf类对象   setMaster:设置运行模式   setAppName:当前spark类的名称
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")       
#基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
#打印pySpark的运行版本
print(sc.version)
#停止Sparkcontext对象的运行
sc.stop()
c、pySpark编程模型
  • 数据输入:通过SparkContext类对象的成员方法完成数据的读取操作,读取后得到RDD类对象
  • 数据处理:通过RDD类对象的成员方法完成各种数据计算的需求
  • 数据输出:将处理完成后的RDD对象调用各种成员方法完成写出文件等操作

3、数据输入

a、python数据容器转RDD对象
  • 支持的数据容器有:list,tuple,set,dict,str
  • str容器会输出单个字符,字典容器会输出所有key,其他容器会输出原本内容
from pyspark import SparkConf,SparkContext

#定义数据容器
list = ['1', '2', '3']
tuple = ('1', '2', '3')
set = {'1', '2', '3'}
dict = {'1': 'abc', '2': 'def', '3': 'ghi'}
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
#将数据容器转换为RDD
rdd = sc.parallelize(dict)
print(rdd.collect())
sc.stop()
b、读取文件内容转RDD对象
  • 文件的每一行会变为一个元素

如创建一个文件,内容如图。
用下面代码取文件内容转换为RDD对象并输出

在这里插入图片描述

from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.textFile(文件地址)
print(rdd.collect())
sc.stop()

输出结果为:
['这是一个文件内容。', '但是', '这才是第三行内容', '你猜这是第几行', '对了,这是第五行']

4、数据计算

RDD中内置了丰富的成员方法,也叫“算子”

a、map算子
  • 功能:将RDD的数据一条一条处理(处理的逻辑是基于map算子中接收的处理函数),返回新的RDD
  • 多个map方法之间可以链式调用

案例1:将list中的每个元素都乘以10

from pyspark import SparkConf,SparkContext
#如果报错Python worker failed to connect back,需要引入os设置python安装位置
import os
os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list = ['1', '2', '3']
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(list)
#写法一
# rdd2 = rdd.map(lambda x:int(x) * 10)

#写法二
def func(x):
    return int(x) * 10
rdd2 = rdd.map(func)
rdd2 = rdd.map(func)
print(rdd2.collect())
sc.stop()

结果为:
[10, 20, 30]

案例2:将list中的每个元素都先乘以10,再加上5,分为两个map写

from pyspark import SparkConf,SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list = ['1', '2', '3']
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(list)
rdd2 = rdd.map(lambda x:int(x) * 10).map(lambda x: int(x) + 5)		#这里支持链式调用
print(rdd2.collect())
sc.stop()
b、flatMap算子
  • 功能:对RDD执行map操作,然后解除嵌套操作
  • 解除嵌套:假如输入的list的多层嵌套的,那么最后的结果全部元素都为list的一层

案例:将多层嵌套的 list 取出所有元素放到一层中

from pyspark import SparkConf,SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list = [[1,2,3],[4,5,6],[7,8]]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(list)
#写法一
rdd2 = rdd.flatMap(lambda x: x)
print(rdd2.collect())
sc.stop()

结果为:
[1, 2, 3, 4, 5, 6, 7, 8]
c、reduceByKey算子
  • 功能:针对KV型的RDD,自动按照key分组,然后根据提供的聚合逻辑,完成组内数据(value)的聚合操作
  • KV型的RDD其实就是二元元组,比如:[(‘a’,1) , (‘b’,1) , (‘c’,1)],每个元组中第一个值为key,第二个值为value

案例:将男女分组,并且计算两组的分数总和

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list = [('男', 99), ('男', 88), ('女', 77), ('女', 66), ('男', 55)]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(list)
# 写法一
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd2.collect())
sc.stop()

结果为:
[('女', 143), ('男', 242)]
d、综合案例

读取文件内容,统计各个元素出现次数,文件内容如下:

在这里插入图片描述

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list1 = []
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
#读取文件内容
rdd = sc.textFile("C://Users//HLY//Desktop//test.txt")
#先将内容切割成单个元素并一层展示,再将元素设置成二元元组,最后将元素分组统计
rdd2 = rdd.flatMap(lambda x : x.strip().split(" ")).map(lambda x : (x,1)).reduceByKey(lambda a, b: a + b)
print(rdd2.collect())
sc.stop()

结果为:
[('test2', 3), ('test3', 4), ('test', 3), ('test1', 3), ('test4', 4), ('test5', 4)]
e、filter算子
  • 功能:过滤想要的数据进行保留

案例:过滤出所有偶数

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list1 = [1,2,3,4,5,6,7,8,9,10]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
#读取文件内容
rdd = sc.parallelize(list1)
rdd2 = rdd.filter(lambda x : x % 2 == 0)
print(rdd2.collect())
sc.stop()

结果为:
[2, 4, 6, 8, 10]
f、distinct算子
  • 功能:对RDD中的数据进行去重,返回新的RDD

案例:对已有的列表进行去重

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list1 = [1,1,2,2,2,2,3,3,3,4,4,4]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
#读取文件内容
rdd = sc.parallelize(list1)
rdd2 = rdd.distinct()
print(rdd2.collect())
sc.stop()

结果为:
[1, 2, 3, 4]
g、sortBy算子
  • 功能:对RDD数据进行排序,基于指定的排序依据
  • 参数:
    • func:告知RDD是对那个数据进行排序,比如lambda x:x[1] 表示对rdd中第二列元素进行排序
    • ascending:True升序,False降序
    • numPartitions:用多少分区排序,单个分区时传1

案例:对给出的二元集合根据第二个元素进行降序排列

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list1 = [('test', 1), ('test1', 5), ('test3', 2), ('test4', 4), ('test5', 8), ('test6', 7), ('test7', 6)]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
rdd2 = rdd.sortBy(lambda x: x[1], ascending=False)
print(rdd2.collect())
sc.stop()

结果为:
[('test5', 8), ('test6', 7), ('test7', 6), ('test1', 5), ('test4', 4), ('test3', 2), ('test', 1)]

5、数据输出

a、collect算子
  • 功能:将RDD各个分区内的数据统一收集到Driver当中,形成一个List对象

案例:输出RDD的内容

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list1 = [1,2,3,4,5]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
print(rdd.collect())
sc.stop()

结果为:
[1,2,3,4,5]
b、reduce算子
  • 对RDD的全部数据按照传入的逻辑进行聚合,返回一个数字

案例:计算列表中的所有元素和

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list1 = [1, 2, 3, 4, 5]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
num = rdd.reduce(lambda a, b: a + b)
print(num)
sc.stop()

结果为:
15
c、take算子
  • 功能:取RDD的前N个元素,组成 List 返回

案例:取出列表前3个元素

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list1 = [1, 2, 3, 4, 5]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
list = rdd.take(3)
print(list)
sc.stop()

结果为:
[1, 2, 3]
e、count算子
  • 功能:计算RDD有多少条数据,返回一个数字

案例:获取列表中的元素个数

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list1 = [1, 2, 3, 4, 5]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
num = rdd.count()
print(num)
sc.stop()

结果为:
5
f、saveAsTextFile算子
  • 功能:将RDD的数据写入文本文件中
  • 执行此方法需要安装hadoop环境,具体配置过程可以看 这篇文章
  • 其输出内容是根据区分决定的,有多少分区就会输出多少个文件。内容会均匀分摊到各个文件中。分区数默认与电脑的CPU内核一致

案例1:输出列表内容到各个文件中

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"
os.environ["HADOOP_HOME"] = "D:/hadoop/hadoop-3.0.0"

list1 = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
rdd.saveAsTextFile("C:/Users/HLY/Desktop/test")
sc.stop()

在这里插入图片描述
结果会生成16个内容文件和2个状态文件,并且16个内容文件中每个文件中都有一个数字

案例2:将列表内容输出到一个文件中

#方法一:配置全局并行度为1

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"
os.environ["HADOOP_HOME"] = "D:/hadoop/hadoop-3.0.0"

list1 = [1, 2, 3, 4, 5]
#设置全局并行度为1
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app").set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
rdd.saveAsTextFile("C:/Users/HLY/Desktop/test")
sc.stop()
#方法二:设置分区数为1

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"
os.environ["HADOOP_HOME"] = "D:/hadoop/hadoop-3.0.0"

list1 = [1, 2, 3, 4, 5]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
#设置分区数为1
rdd = sc.parallelize(list1,numSlices= 1)
rdd.saveAsTextFile("C:/Users/HLY/Desktop/test")
sc.stop()

在这里插入图片描述

最后会生成1个结果文件,3个其他文件,并且内容都会在 part-00000 文件中显示

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2093180.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据可视化库(Matplotlib)

目录 常规绘图方法 细节设置 子图和标注 风格设置 常用图表绘制 盒图 直方图和散点图 3D图 布局设置 常规绘图方法 首先导入工具包,一般用plt来当作Matplotlib的别名: import matplotlib.pyplot as plt %matplotlib inline 指定魔法指令之后…

刚刚放出GPT-5上线时间,转身就一个限制,OpenAI你真行!

大家好,我是AI肝铁侠。 在6月23日,OpenAI 首席技术官米拉穆拉蒂 (Mira Murati) 表示,GPT-5 是 OpenAI 的下一代人工智能产品,预计将在 2025 年底或 2026 年初,实现博士级别的智能。 说实话OpenAI又把GPT5计划发布时间…

Leetcode面试经典150题-45.跳跃游戏II

解法都在代码里,不懂就留言或者私信,这个题绝对比动态规划的解法强 class Solution {/**本题我们先不用动态规划了,因为从任何一个位置都可能跳到最后一个位置,用动态规划的成本太高了本题的解题思路:看看某个步数内最…

Vue项目安装依赖(npm install)报错的解决

天行健,君子以自强不息;地势坤,君子以厚德载物。 每个人都有惰性,但不断学习是好好生活的根本,共勉! 文章均为学习整理笔记,分享记录为主,如有错误请指正,共同学习进步。…

OLED模块

目录 一:OLED使用原理 硬件部分 SSD1306框图及引脚定义 选择通信接口 原理图 软件部分 4线SPI的传输时序 3线SPI的传输时序 I2C的通信时序 执行逻辑框图 二:基本命令表 滚动命令表 寻址设置命令表 硬件配置命令表 时间及驱动命令 初始化过…

《自然语言处理》—— 词向量之CountVectorizer方法实现

文章目录 一、什么是词向量,为什么要进行词向量二、CountVectorizer方法简单介绍1、基本作用2、参数详解 三、示例:代码实现 一、什么是词向量,为什么要进行词向量 词向量是一种将单词或短语映射到实数向量空间的技术。在自然语言处理中&…

Loki Unable to fetch labels from Loki (no org id)

应该是多租户相关导致的 参考文档: 参考文档cMulti-tenancy | Grafana Loki documentationDescribes how Loki implements multi-tenancy to isolate tenant data and queries.https://grafana.com/docs/loki/latest/operations/multi-tenancy/ https://github.com/grafana…

中国招标投标平台JS逆向:DES加密与Python纯算还原

中国招标投标平台JS逆向:DES加密与Python纯算还原 目录 🔐 JS DES解密🧮 Python版本的纯算实现 🔐 JS DES解密 在中国招标投标公共服务平台的分析过程中,发现了数据加密采用了DES算法。DES(数据加密标准&…

JS运行机制及事件循环机制

进程:独立运行,拥有资源空间的应用程序 线程:CPU调度的最小单位 浏览器: 多进程 浏览器有哪些进程? Browser进程,也是主进程 负责各个页面的管理 创建 销毁前进后退等网络资源下载 插件进程:比如Chrome的…

多线程+连接池+代理 运行一段时间线程阻塞,如何解决??

🏆本文收录于《CSDN问答解惑-专业版》专栏,主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收…

【CSP:202012-2】期末预测之最佳阈值(Java)

题目链接 202012-2 期末预测之最佳阈值 题目描述 求解思路 前缀和 根据题意我们可以得知: θ θ θ 值为 a[i].y 时的预测正确的次数等于 a[i].y 前面有多少个 result 0 以及后面有多少个result 1。定义Score类用来存储y和result,其中sum0表示a[1]…

苹果macOS 15.1 Beta 3发布 允许用户将App Store应用下载到外置硬盘

8 月 29 日消息,苹果今日向 Mac 电脑用户推送了 macOS 15.1 开发者预览版 Beta 3 更新(内部版本号:24B5035e),本次更新距离上次发布 Beta / RC 间隔 16 天。 苹果公司在发布 iOS / iPadOS 18.1 Beta 3 更新之外&#x…

redis的共享session应用

项目背景: 该项目背景就是黑马的黑马点评项目。 一:基于Session实现验证码登录流程 基本的登录流程我们做了很多了。这个是短信登录流程 其实和普通的登录流程就多了一个生成验证码,并将验证码保存在session中,并且呢&#xf…

20240831 每日AI必读资讯

Runway 突然删除清空了 HuggingFace 上的所有内容!原因不明... - 之前的项目也无法访问了,比如 Stable Diffusion v1.5也被删了 🔗https://huggingface.co/runwayml/stable-diffusion-v1-5 🔗GitHub 也空了:https:…

ue Rotate to face BB entry转向不对

可能原因: 角色模型没有到正向。 错误: 正确:

C语言:ASCII码表和字符操作

目录 目录 1. 引言 2. ASCII码表 2.1 控制字符 2.2 可显示字符 3. 例子 3.1 相关函数 3.2 打印能够显示的 ASCII码 3.3 字母大小写转换 3.4 数字转数字字符 1. 引言 因为计算机只是认识 0 和 1组成的一串串的二进制数字,为了将人类认识的文…

【时间盒子】-【1.序言】高效人士都在用的时间管理方法。我是如何通过鸿蒙元服务APP实现?

一、介绍 【时间盒子】系列内容将帮助开发者学习如何构建一个全新的HarmonyOS元服务应用,学习使用DevEco Studio创建新项目、使用预览器预览页面、使用真机调试APP、自定义弹窗、使用系统提醒能力(闹钟)、使用首选项数据持久化、熟悉ArkUI页…

Centos 下载和 VM 虚拟机安装

1. Centos 下载 阿里云下载地址 centos-7.9.2009-isos-x86_64安装包下载_开源镜像站-阿里云 2. VM 中创建 Centos 虚拟机 2.1 先打开 VM 虚拟机,点击首页的创建新的虚拟机 2.2 选择自定义,然后点击下一步。 2.3 这里默认就好,继续选择下一…

PE文件结构详解(非常详细)

最近在参考OpenShell为任务栏设置图片背景时,发现里面使用了IAT Hook,这一块没有接触过,去查资料的时候发现IAT Hook需要对PE文件结构有一定的了解,索性将PE文件结构的资料找出来,系统学习一下。 PE文件结构 Portable…

C++基础(1)——入门知识

目录 1.C版本更新 2.C参考⽂档: 3.C书籍推荐 4.C的第⼀个程序 5.命名空间 5.1namespace的价值 5.2namespace的定义 5.3 命名空间使⽤ 6.C输⼊&输出 7.缺省参数 8.函数重载 9.引⽤ 9.1引⽤的概念和定义 9.2引⽤的特性 9.3引⽤的使用 9.4const引⽤…