pyspark入门基础详细讲解

news2025/1/10 16:59:51

1.前言介绍

学习目标:了解什么是Speak、PySpark,了解为什么学习PySpark,了解课程是如何和大数据开发方向进行衔接

使用pyspark库所写出来的代码,既可以在电脑上简单运行,进行数据分析处理,又可以把代码无缝迁移到成百上千的服务器集群上去做分布式计算。

为什么要学习pyspark呢?

总结

2.基础准备

学习目标:掌握pyspark库的安装,掌握pyspark执行环境入口对象的构建,理解pyspark的编程模型。

建议使用国内代理镜像网站下载更快。

 简化代码,本质上是同一个意思,链式结构,链式调用化简程序 基本原则,就是我不管调用什么方法,我的返回值都是同一个对象啊

代码展示:
"""
演示获取pyspark的执行环境入库对象:SparkContext
并通过SparkContext对象获取当前PySpark的版本
"""

# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象  setMaster是描写运行模式   setAppName是设置当前Spark任务的名字
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 同一个意思,链式结构,链式调用化简程序
# 基本原则,就是我不管调用什么方法,我的返回值都是同一个对象啊
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

spark需要启动时间,所以代码的运行一小会,3.5.3就是当前spark的运行版本

这个sc非常非常重要哦,后续给大家讲解。

通过sc拿到数据输入,数据处理计算是通过RDD类对象的一系列成员方法来对数据进行计算,然后把结果对外进行输出

我们只需要记住后期写spark代码的三大步,把数据加载进来,对数据进行计算,把结果输出去

总结

3.数据输入

学习目标:理解RDD对象,掌握PySpark数据输入的2种方法。

RDD就和列表等数据容器差不多

python数据容器转RDD对象

parallelize成员方法把数据容器存入RDD对象

如果要查看RDD里面有什么内容,需要用collect()方法

字符串会把每一个字符都拆出来,存入RDD对象,字典仅有key被存入RDD对象

读取文件转RDD对象

总结

4.数据计算

map方法

学习目标:掌握RDD的map方法

map会把传入的每一个参数都返回一个值

你会发现报错了,报错的原因是spark没有找到python解释器

给他指定一条路径,这样就没有问题了。如果指定路径之后还是没有解决的,可能是因为pycharm版本太新,降低版本就行了,建议是pycharm3.10

对于简单函数我们可以使用lambda匿名函数。

结果是一样的

链式调用

总结

flatMap方法

学习目标:掌握RDD的flatMap方法对数据进行计算。

通过map,可以看到尽管我们把数据分成一个一个的,但是还是存在嵌套,依旧被嵌套在list当中

当我们使用了flatMap方法后,发现解除了嵌套

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] ="D:/python/venv/Scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize(["ikun 22","ikun 3","ik unh hhhh"])
# 需求:将RDD数据里面的一个个单词提取出来
rdd2 = rdd.flatMap(lambda x: x.split(" ")) # 使用空格进行切分
print(rdd2.collect())

使用flatMap可以解除内部嵌套,语法与map一样

总结:

reduceByKey方法

学习目标:掌握RDD的reduceByKey方法    

 

二元元组指的是元组里面存储的只有两个元素

KV型的RDD一般是两个元素,把第一个元素当成key,第二个当成value,自动按照key分组,然后根据你传入的逻辑计算value

(v,v)->(v)  意思是传入两个相同类型的参数,返回一个返回值,类型和传入要求一致

自动分组并且组内求和

总结

可以完成按key进行分组,并且组内进行逻辑计算

练习案例1

学习目标:完成使用PySpark进行单词计数的案例

数据文件

取出所有的单词,flatMap是把单词一个一个取出来,map是把单词一行一行取出来,一行是一个列表。

把单词转换成二元元组

完整代码

"""
完成练习案例:单词计数统计
"""
from pyspark import SparkConf,SparkContext
import os
# 1.构建执行环境入口对象
os.environ['PYSPARK_PYTHON'] ="D:/python/venv/Scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 2.读取数据文件
rdd = sc.textFile("D:/word.txt")
# 3.取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# 4.将所有单词都转换成二元元组,单词为key,value设置为1
# (hello,1) (spark,1) (itheima,1) (itcast,1)
word_with_one_rdd = word_rdd.map(lambda word: (word,1))
# 5.分组并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b)
# 6.打印输出结果
print(result_rdd.collect())

filter方法

学习目标:掌握RDD的filter方法

True被保留,False被丢弃

总结

distinct方法

学习目标:掌握RDD的distinct方法

不需要传入参数,功能简单就是去重操作

总结

sortBy方法

学习目标:掌握RDD的sortBy方法进行内容的排序

接收函数传入参数并且有一个返回值

目前我们没有解除到分布式,就先写上numPartitions=1

之前写过一个读取文件,统计单词的个数,现在让我们对他进行排序

可以自己控制升序或者降序,True升序,False降序

from pyspark import SparkConf,SparkContext
import os
# 1.构建执行环境入口对象
os.environ['PYSPARK_PYTHON'] ="D:/python/venv/Scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 2.读取数据文件
rdd = sc.textFile("D:/word.txt")
# 3.取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# 4.将所有单词都转换成二元元组,单词为key,value设置为1
# (hello,1) (spark,1) (itheima,1) (itcast,1)
word_with_one_rdd = word_rdd.map(lambda word: (word,1))
# 5.分组并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b)
# 6.对结果进行排序
final_rdd = result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print(final_rdd.collect())

总结:

练习案例2

学习目标:完成练习案例2的开发

完整代码:

"""
完成练习案例:json商品统计
"""
# 1.各个城市销售额排名,从小到大
# 2.全部城市,有哪些商品类别在售卖
# 3.北京市有哪些商品类别在售卖
from pyspark import SparkConf,SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] ='D:/python/venv/Scripts/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 需求1:城市销售额排名
# 1.1 读取文件得到RDD
file_rdd = sc.textFile("D:/2222.txt")
# 1.2 取出一个个json字符串
json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))

# 1.3 将一个个json字符串转换为字典
dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
# 1.4 取出城市和销售额数据
city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'],int(x['money'])))
# 1.5 按城市分组按销售额聚合
city_result_rdd = city_with_money_rdd.reduceByKey(lambda a,b:a+b)
# 1.6 按销售额聚合结果进行排序
result1_rdd = city_result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print("需求1的结果:",result1_rdd.collect())
# 需求2:全部城市有哪些商品类别在售卖
# 2.1 取出全部的商品类别
# 2.2 对全部商品类别进行去重
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("需求2的结果:",category_rdd.collect())
# 需求3:北京市有哪些商品类别在售卖
# 3.1 过滤北京市的数据
beijing_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')
# 3.2 取出全部商品类别
# 3.3 进行商品类别去重
result3_rdd = beijing_data_rdd.map(lambda x: x['category']).distinct()
print("需求3的结果:",result3_rdd.collect())

5.数据输出

输出为python对象

学习目标:掌握将RDD的结果输出为python对象的各类方法

collect算子

reduce算子

reduce和reducebykey的区别是reducebykey是获取key然后组内计算,reduce是单纯的直接计算

take算子

就是取前N个元素

count算子

总结

from pyspark import SparkConf,SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] ='D:/python/venv/Scripts/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备RDD
rdd = sc.parallelize([1,2,3,4,5])
# collect算子,输出RDD为list对象
rdd_list:list = rdd.collect()
print(rdd_list)
print(type(rdd_list))
# reduce算子,对RDD进行两两融合
num = rdd.reduce(lambda a,b: a+b)
print(num)
# take算子,取出RDD前N个元素,组成list返回
take_list = rdd.take(3)
print(take_list)
# count,统计rdd内有多少条数据,返回值为数字
num_count = rdd.count()
print(f"rdd内有{num_count}个元素")
sc.stop()

输出到文件中

学习目标:掌握将RDD的内容输出到文件中,了解如何更改RDD的分区数为1

报错了,原因是配置的问题,接下来我们给他配置

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2237851.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

uniapp上拉刷新下拉加载

方法一: z-paging 的组件库: show-loading-more-no-more-view"false" 该属性控制是否显示 "加载更多" 或 "没有更多" 的提示。如果设为 false,则不会显示这些提示。如果设为 true,当数据加载完毕…

CSS教程(二)- CSS选择器

1. 作用 匹配文档中的某些元素为其应用样式。根据不同需求把不同的标签选出来。 2. 分类 分类 基础选择器 包含 标签选择器、ID选择器、类选择器、通用选择器等 复合选择器 包含 后代选择器、子代选择器、伪类选择器等 1 标签选择器 介绍 又称为元素选择器,根…

LeetCode 56.合并区间

思路: 类似于用最少的箭射气球题目,最主要是要处理区间之间是否有重叠,如果无重叠则加入数组,如果有重叠,则需要重新设判断的边界,与下一个区间继续判断。 难点在于 代码用法 需熟练掌握 思想简单&#…

【MySQL】MySQL基础知识复习(上)

前言 本篇博客将复习MySQL的基础知识,及着重复习CRUD(增删查改)操作。 目录 一.MySQL数据库基础知识 1.数据库操作 1.1显示当前的数据库 1.2 创建数据库 1.3 使用数据库 1.4 删除数据库 2.数据类型 2.1.数字类型 2.2字符串类型 2.3…

华为大变革?仓颉编程语言会代替ArkTS吗?

在华为鸿蒙生态系统中,编程语言的选择一直是开发者关注的焦点。近期,华为推出了自研的通用编程语言——仓颉编程语言,这引发了关于仓颉是否会取代ArkTS的讨论。本文将从多个角度分析这两种语言的特点、应用场景及未来趋势,探讨仓颉…

稀硫酸介质中 V 型球阀的材质选择与选型要点-耀圣

稀硫酸介质中 V 型球阀的材质选择与选型要点 在工业生产中,稀硫酸是一种常见的化学介质,对于输送和控制稀硫酸的阀门,正确的材质选择和选型至关重要。本文将介绍稀硫酸介质中 V 型球阀的材质选择,并提供一些选型的要点。 一、稀硫…

昇思大模型平台打卡体验活动:项目3基于MindSpore的GPT2文本摘要

昇思大模型平台打卡体验活动:项目3基于MindSpore的GPT2文本摘要 1. 环境设置 本项目可以沿用前两个项目的相关环境设置。首先,登陆昇思大模型平台,并进入对应的开发环境: https://xihe.mindspore.cn/my/clouddev 接着&#xff0…

定时器输入捕获实验配置

首先,第一个时基工作参数配置 HAL_TIM_IC_Init( ) 还是一样的套路,传参是一个句柄,先定义一个结构体 Instance:指向TIM_TypeDef的指针,表示定时器的实例。TIM_TypeDef是一个包含了定时器寄存器的结构体,用…

计算机视觉读书系列(1)——基本知识与深度学习基础

研三即将毕业,后续的工作可能会偏AI方向的计算机视觉方面,因此准备了两条线来巩固计算机视觉基础。 一个是本系列,阅读经典《Deep Learning for Vision System》,做一些总结跑一些例子,也对应本系列文章 二是OpenCV实…

运维智能化转型:AIOps引领IT运维新浪潮

1. AIOps是什么? AIOps(Artificial Intelligence for IT Operations),即人工智能在IT运维中的应用,通过机器学习技术处理运维数据(如日志、监控信息和应用数据),解决传统自动化运维…

SkyNet嵌入式系统目标检测实践测试分析

目标检测和跟踪对于资源受限的嵌入式系统来说是具有挑战性的任务。尽管这些任务是人工智能领域中计算量最大的任务之一,但它们在嵌入式设备上只能使用有限的计算和内存资源。与此同时,这种资源受限的实现通常需要满足额外的苛刻要求,如实时响…

「OC」SDWebimage的学习

「OC」SDWebimage的学习 前言 在知乎日报这个项目之中,我在很多情况下都会进行图片资源的网络申请。通过上网搜索我了解到了SDWebimage这个功能丰富的第三方库,进行了较为浅层的学习。因为SDWebimage这个库之中的相关内容还是较为多且复杂的&#xff0…

SIwave:释放 SIwizard 求解器的强大功能

SIwave 是一种电源完整性和信号完整性工具。SIwizard 是 SIwave 中 SI 分析的主要工具,也是本博客的主题。 SIwizard 用于研究 RF、clock 和 control traces 的信号完整性。该工具允许用户进行瞬态分析、眼图分析和 BER 计算。用户可以将 IBIS 和 IBIS-AMI 模型添加…

Kafka 可观测性最佳实践

Kafka 概述 Kafka 是由 LinkedIn 开发一个分布式的基于发布订阅模式的消息队列,是一个实时数据处理系统,可以横向扩展。与 RabbitMQ、RockerMQ 等中间件一样拥有几大特点: 异步处理服务解耦流量削峰 监控 Kafka 是非常重要的,因…

342--358作业整理(错误 + 重点)

目录 1. 在需要运行的类中 定义 main 方法 2. this 。访问逻辑:先访问本类中,再访问父类中可以访问的成员(不包括和本类中重名的成员) 3. super 。访问逻辑:super(父类对象)直接访问父类及以…

Android自启动管控

1. 自启动管控需求来源 自启动、关联启动、交叉启动、推送启动等现象的泛滥除了对个人信息保护带来隐患外,还会导致占用过多的系统CPU和内存资源,造成系统卡顿、发热、电池消耗过快;还可能引入一些包含“恶意代码”的进程在后台隐蔽启动&…

智能的编织:C++中auto的编织艺术

在C的世界里,auto这个关键字就像是一个聪明的助手,它能够自动帮你识别变量的类型,让你的代码更加简洁和清晰。下面,我们就来聊聊auto这个关键字的前世今生,以及它在C11标准中的新用法。 auto的前世 在C11之前&#x…

函数式编程Stream流(通俗易懂!!!)

重点:只关注传入的参数列表和方法体(数据操作) 1.Lambda表达式 本质是匿名内部类的优化,先写匿名内部类 1.1 基本用法 public class lambdaTest {public static void main(String[] args) { // int i calculateNum((…

C#里对数组的排序操作

一般情况下是采用 Array.Sort(a) 来进行排序。 例子代码如下: /** C# Program to Sort a String using Predefined Function*/ using System; class linSearch {public static void Main(){Console.WriteLine("Enter Number of Elements you Want to Hold in the Arra…