PySpark数据计算中常用的成员方法(算子)

news2025/1/16 16:58:31

目录

一.回顾

二.数据计算

map算子

 演示

 flatMap算子

 演示

 reduceByKey算子

 演示

 练习案例1

需求

 解决步骤

完整代码

filter算子

 演示

distinct算子

 演示

 sortBy算子

 演示

 练习案例2

解决步骤

完整代码

三.总结


一.回顾

 1.RDD对象是什么?为什么要使用它?
RDD对象称之为分布式弹性数据集,是PySpark中数据计算的载体,它可以:

  • 提供数据存储
  • 提供数据计算的各类方法
  • 数据计算的方法,返回值依旧是RDD (RDD迭代计算)

后续对数据进行各类计算,都是基于RDD对象进行

2.如何输入数据到Spark(即得到RDD对象)

  • 通过SparkContext的parallelize成员方法,将Python数据容器转换为RDD对象
  • 通过SparkContext的textFile成员方法,读取文本文件得到RDD对象

二.数据计算

PySpark的数据计算,都是基于RDD对象来进行的,那么如何进行呢?自然是依赖,RDD对象内置丰富的:成员方法(算子)

介绍几种常见的成员方法(算子)如下:

  1. map方法
  2. flatmap方法
  3. reduceByKey方法
  4. filter方法
  5. distinct方法
  6. sortBy方法

map算子

功能: map算子,是将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD
语法:

 演示

这样运行会报错,这是因为 Spark 中支持环境变量,设置一个环境变量明确告诉他,python在哪就可以了。

 如上图,告诉spark运行时,在哪找到python解释器就行,

 打开设置你就可以看到你的python解释器的路径,然后导入os包,设置环境就行

代码如下

#导包
from pyspark import  SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"

#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)

rdd=sc.parallelize([1,2,3,4,5])

def func(data):
    return data*10
rdd2=rdd.map(func)

print(rdd2.collect())

这里的函数定义,我们可以用匿名函数(lambda)更简洁

#导包
from pyspark import  SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"

#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)

rdd=sc.parallelize([1,2,3,4,5])

# def func(data):
#     return data*10
rdd2=rdd.map(lambda x:x*10)

print(rdd2.collect())

各算子之间还可以进行链式调用

 flatMap算子

功能:对rdd执行map操作,然后进行解除嵌套操作.

解除嵌套:

 演示代码

 演示

我们先用map试试看是什么结果

 我们可以看到没有解除嵌套
再用flatMap试试
 

#导包
from pyspark import  SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"

#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
#准备一个RDD
rdd=sc.parallelize(["asdf 3rwe dff","sdf 3er gwet","q3w dg xgwe"])
#需求:把RDD数据里面的一个个单词提取出来
rdd2=rdd.flatMap(lambda x:x.split(" "))

print(rdd2.collect())

 结果是

 reduceByKey算子

功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作。

用法

 演示代码

 注意: reduceByKey中接收的函数,只负责聚合,不理会分组
分组是自动by key来分组的.

reduceBeKey中的聚合逻辑是:

 演示

#导包
from pyspark import  SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"

#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
#准备一个RDD
rdd=sc.parallelize([("男",99),("男",88),("女",78),("女",100)])
#需求:分别求出男女的成绩之和
rdd2=rdd.reduceByKey(lambda x,y:x+y)
print(rdd2.collect())

结果是

 练习案例1

需求

读取文件,统计文件中各单词出现的次数

演示代码

 解决步骤

1.用textFile读取文本文件

 2.用flatMap把读取到的单词都一一提取出来

 3.用map将所有单词都转换为二元元组,单词为key,value设置为1

 4.用reduceByKey进行分组并求和

 这样就完成了需求

完整代码

#导包
from pyspark import  SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"

#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
#准备一个RDD读取数据
rdd=sc.textFile("D:/hello.txt")
#取出所有单词
rdd2=rdd.flatMap(lambda x:x.split(" "))
#将所有单词都转换为二元元组,单词为key,value设置为1
word_with_one_rdd=rdd2.map(lambda x:(x,1))
#分组并求和
result_rdd=word_with_one_rdd.reduceByKey(lambda x,y:x+y)
print(result_rdd.collect())

filter算子

功能:过滤想要的数据进行保留

语法:

 演示代码

 演示

#导包
from pyspark import  SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"

#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("text_spark")
#基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
#创建RDD对象
rdd=sc.parallelize([1,2,3,4,5,6,7,8])

rdd2=rdd.filter(lambda x:x%2==0)
print(rdd2.collect())

结果是

distinct算子

功能:对RDD数据进行去重,返回新RDD
语法:rdd.distinct()无需传参

演示代码

 演示

#导包
from pyspark import  SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"

#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("text_spark")
#基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
#创建RDD对象
rdd=sc.parallelize([1,1,1,2,3,3,3,5,4,6,6,6])

rdd2=rdd.distinct()
print(rdd2.collect())

结果是

 sortBy算子

功能:对RDD数据进行排序,基于你指定的排序依据
语法:

 演示

就用上面那个练习,把输出的单词个数进行排序

#导包
from pyspark import  SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"

#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
#准备一个RDD读取数据
rdd=sc.textFile("D:/hello.txt")
#取出所有单词
rdd2=rdd.flatMap(lambda x:x.split(" "))
#将所有单词都转换为二元元组,单词为key,value设置为1
word_with_one_rdd=rdd2.map(lambda x:(x,1))
#分组并求和
result_rdd=word_with_one_rdd.reduceByKey(lambda x,y:x+y)
#对结果进行排序,降序输出
final_rdd=result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print(final_rdd.collect())

结果是

 练习案例2

 需求:复制以上内容到文件中,使用Spark读取文件进行计算:

  1. 各个城市销售额排名,从大到小
  2. 全部城市,有哪些商品类别在售卖
  3. 北京市有哪些商品类别在售卖

 这文件里面的json数据,每一条数据都有‘|’这样一个分隔,所以到时候要用split先分开,再把json数据转为python中的字典

解决步骤

1.先用split取出一个个json字符串

 2.把取出来的json字符串转为字典

3.需求1:城市销售额排名。做二元元组(城市,销售额),然后分组聚合,排序

 4.需求2:全部城市有哪些商品在售卖。先用map把每条数据的“category”提取出来,再用distinct去重

5.需求3:北京市有哪些商品在售卖。先用filter过滤出北京的所有数据,再用map得到北京中的所有“category”再用distinct去重

完整代码

#导包
from pyspark import  SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
import json

#创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
#准备一个RDD读取数据
rdd=sc.textFile("D:/orders.txt")
#需求1:城市销售额排名
#取出一个个json字符串
json_str_rdd=rdd.flatMap(lambda x:x.split("|"))
# 把json转换为字典
dict_rdd=json_str_rdd.map(lambda x:json.loads(x))
#取出城市、销售额作为二元元组(城市,销售额)
city_with_money_rdd=dict_rdd.map(lambda x:(x["areaName"],int(x["money"])))
#按城市分组,并把销售额加起来
city_result_rdd=city_with_money_rdd.reduceByKey(lambda x,y:x+y)
#按销售额聚合结果排序
reslut1_rdd=city_result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print("需求1的结果是:",reslut1_rdd.collect())

#需求2:全部城市有哪些商品在售卖
category_rdd=dict_rdd.map(lambda x:x["category"]).distinct()
print("需求2的结果是:",category_rdd.collect())
#需求3:北京市有那些商品在售卖
#先选出北京市的数据
beijing_data_rdd=dict_rdd.filter(lambda x:x["areaName"]=="北京")
#取出全部商品
result3_rdd=beijing_data_rdd.map(lambda x:x["category"]).distinct()
print("需求3的结果是:",result3_rdd.collect())

三.总结

1. map算子(成员方法)

  • 接受一个处理函数,可用lambda表达式快速编写
  • 对RDD内的元素逐个处理,并返回一个新的RDD

2.链式调用
对于返回值是新RDD的算子,可以通过链式调用的方式多次调用算子。

3.flatMap算子

  • 计算逻辑和map一样
  • 可以比map多出解除一层嵌套的功能

4.reduceByKey算子

  • 接受一个处理函数,对数据进行两两计算

5.filter算子

  • 接受一个处理函数,可用lambda快速编写
  • 函数对RDD数据逐个处理,得到True的保留至返回值的RDD中

6.sortBy算子

  • 接收一个处理函数,可用lambda快速编写
  • 函数表示用来决定排序的依据
  • 可以控制升序或降序
  • 全局排序需要设置分区数为1

7.distinct算子

  • 完成对RDD内数据的去重操作

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/162207.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SegFormer学习笔记(1)安装

一、源码:https://github.com/sithu31296/semantic-segmentation我并没使用SegFormer的官方源码,那个mmcv特磨人了,各种奇葩配置错误。二、环境配置新建conda环境conda create -n segformer3715 python3.7.15 选用python3.7.15(纯粹的3.7.0版…

计算机原理四_内存管理

目录儿三、内存管理3.1 内存管理基础3.1.1存储器的多层结构3.1.2 进程运行基本原理进程的装入3.1.3 内存扩充3.1.4 内存的分配3.1.4.1连续分配3.1.4.2非连续分配3.1.4.2.1基本分页存储管理3.1.4.2.2基本分段存储管理3.1.4.2.3 段页式管理3.2 虚拟内存管理3.2.1 虚拟内存的概念3…

【BP靶场portswigger-客户端11】跨站点脚本XSS-10个实验(下)

前言: 介绍: 博主:网络安全领域狂热爱好者(承诺在CSDN永久无偿分享文章)。 殊荣:CSDN网络安全领域优质创作者,2022年双十一业务安全保卫战-某厂第一名,某厂特邀数字业务安全研究员&…

【Go基础】函数和面向接口编程

文章目录一、函数1. 函数的基本形式2. 递归函数3. 匿名函数4. 闭包5. 延迟调用defer6. 异常处理二、面向接口编程1. 接口的基本概念2. 接口的使用3. 接口的赋值4. 接口嵌入5. 空接口6. 类型断言7. 面向接口编程一、函数 1. 函数的基本形式 // 函数定义:a,b是形参 …

【测试】自动化测试

努力经营当下,直至未来明朗! 文章目录一、自动化概述二、自动化测试的分类三、自动化测试工具:selenium四、一个简单的自动化用例五、Selenium常用方法1. 查找页面元素:2.元素的定位(By类)小结普通小孩也要…

Java中this的用法

一、this关键字 1.this的类型:哪个对象调用就是哪个对象的引用类型 二、用法总结 1.this.data; //访问属性 2.this.func(); //访问方法 3.this(); //调用本类中其他构造方法 三、解释用法 1.this.data 这种是在成员方法中使用 让我们来看看不加this会出现什…

ArcGIS基础实验操作100例--实验95平滑处理栅格数据

本实验专栏参考自汤国安教授《地理信息系统基础实验操作100例》一书 实验平台:ArcGIS 10.6 实验数据:请访问实验1(传送门) 空间分析篇--实验95 平滑处理栅格数据 目录 一、实验背景 二、实验数据 三、实验步骤 (1&…

ST算法解决BMQ问题详解(图文并茂,保证看懂)

一.RMQ问题的概念RMQ(Range Minimum/Maximum Query)问题,简单说就是求区间最值问题,是求区间最大值或最小值,即范围最值问题,若是简单的单次询问或者是区间长度很短的询问,可以用暴力的方法来实…

【web安全】——文件上传漏洞

作者名:白昼安全主页面链接: 主页传送门创作初心: 舞台再大,你不上台,永远是观众,没人会关心你努不努力,摔的痛不痛,他们只会看你最后站在什么位置,然后羡慕或鄙夷座右铭…

【寒假第2天】LeetCode刷题

🌈一、选择题 👿第1题: 下面给出的四种排序法中( )排序法是不稳定性排序法A.插入排序 B.冒泡排序 C.归并排序 D.堆,希尔排序,快速排序 答案:D 为啥堆排序是不稳定的&am…

SCA 工具:开源安全威胁一手掌控

1、什么是 SCA SCA(Software Composition Analysis)软件成分分析,通俗的理解就是通过分析软件包含的一些信息和特征来实现对该软件的识别、管理、追踪的技术。我们知道在当今软件开发中,引入开源软件(注 1)到你的项目中&#xff…

线性DP-----(从某点走到某点求最值问题)

线性DP 线性dp问题是dp问题中比较简单的问题,通常一个状态转移方程就可以搞定,线性dp通常求最大值,最小值问题,下面介绍线性dp中从某点走到某点最值问题。 第一类问题(走一遍) 该类问题只走一遍,动态规划中用到的数组f(i,j)含义就是到达(i,j)点得到的最优解 例题1—数字三角形 …

分享88个JavaScript源码,总有一款适合您

JavaScript源码 分享88个JavaScript源码,总有一款适合您 JavaScript源码下载链接:https://pan.baidu.com/s/1guiYWOPKdP1zNW7T8P0caQ?pwd6666 提取码:6666 采集代码下载链接:采集代码.zip - 蓝奏云 下面是文件的名字&#xf…

jinja2 循环计数内置变量loop

变量内容loop.index循环迭代计数(从1开始)loop.index0循环迭代计数(从0开始)loop.revindex循环迭代倒序计数(从len开始,到1结束)loop.revindex0循环迭代倒序计数(从len-1…

【正点原子FPGA连载】 第十八章双目OV5640摄像头HDMI显示实验 摘自【正点原子】DFZU2EG/4EV MPSoC 之FPGA开发指南V1.0

1)实验平台:正点原子MPSoC开发板 2)平台购买地址:https://detail.tmall.com/item.htm?id692450874670 3)全套实验源码手册视频下载地址: http://www.openedv.com/thread-340252-1-1.html 第十八章双目OV5…

jsp题库管理系统Myeclipse开发sqlserver数据库web结构java编程计算机网页项目

一、源码特点 jsp 题库管理系统 是一套完善的web设计系统,对理解JSP java编程开发语言有帮助,系统具有完整的源代码和数据库,系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5开 发,数据库为sqlserver,使…

Kafka集群安装

Apache kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,是消息中间件的一种,用于构建实时数据管道和流应用程序。Kafka官网:http://kafka.apache.org/安装环…

1-连续系统PID的Simulink仿真

以二阶线性传递函数。为被控对象,进行模拟PID控制。在信号发生器中选择正弦信号,仿真时取,,,输入指令为,其中A1.0,F0.20Hz。采用ODE45迭代方法,仿真时间为10s。PID控制器由Simulink下的工具箱提…

【Ansible】Ansible Jinja2 模板

Ansible Jinja2 模板 文章目录Ansible Jinja2 模板一、Ansible Jinja2 模板背景介绍二、JinJa2 模块1.JinJa2 是什么?2.Jinja2 必知会3.Jinja2 逻辑控制三、如何使用模板四、 实例演示一、Ansible Jinja2 模板背景介绍 目前 nginx 的配置文件在所有的服务器上都是相…

六种常见系统架构

六种常见系统架构 - 基础篇目录概述需求:设计思路实现思路分析1.URL管理2.微服务架构3.四、微服务架构4.多级缓存架构参考资料和推荐阅读Survive by day and develop by night. talk for import biz , show your perfect code,full busy,skip hardness,m…